Skip to content

Commit

Permalink
satellite/{console,payments}: freeze/warn storjscan users
Browse files Browse the repository at this point in the history
This change enables the freezing/warning of users who use storjscan.

Issue: #6164

Change-Id: I7b00ee09d6527b3818b72326e9065c82ef5a2ac8
  • Loading branch information
wilfred-asomanii authored and Storj Robot committed Aug 31, 2023
1 parent ca0ea50 commit dcc4bd0
Show file tree
Hide file tree
Showing 11 changed files with 350 additions and 86 deletions.
12 changes: 12 additions & 0 deletions satellite/console/accountfreezes.go
Expand Up @@ -77,6 +77,18 @@ const (
Warning AccountFreezeEventType = 1
)

// String returns a string representation of this event.
func (et AccountFreezeEventType) String() string {
switch et {
case Freeze:
return "Freeze"
case Warning:
return "Warning"
default:
return ""
}
}

// AccountFreezeService encapsulates operations concerning account freezes.
type AccountFreezeService struct {
freezeEventsDB AccountFreezeEvents
Expand Down
48 changes: 48 additions & 0 deletions satellite/console/observerpayinvoicewithtokens.go
@@ -0,0 +1,48 @@
// Copyright (C) 2023 Storj Labs, Inc.
// See LICENSE for copying information.

package console

import (
"context"

"storj.io/storj/satellite/payments"
"storj.io/storj/satellite/payments/billing"
)

var _ billing.Observer = (*InvoiceTokenPaymentObserver)(nil)

// InvoiceTokenPaymentObserver used to pay pending payments with STORJ tokens.
type InvoiceTokenPaymentObserver struct {
consoleDB DB
payments payments.Accounts
}

// NewInvoiceTokenPaymentObserver creates new observer instance.
func NewInvoiceTokenPaymentObserver(consoleDB DB, payments payments.Accounts) *InvoiceTokenPaymentObserver {
return &InvoiceTokenPaymentObserver{
consoleDB: consoleDB,
payments: payments,
}
}

// Process attempts to pay user's pending payments with tokens.
func (o *InvoiceTokenPaymentObserver) Process(ctx context.Context, transaction billing.Transaction) (err error) {
defer mon.Task()(&ctx)(&err)

user, err := o.consoleDB.Users().Get(ctx, transaction.UserID)
if err != nil {
return err
}

if !user.PaidTier {
return nil
}

err = o.payments.Invoices().AttemptPayOverdueInvoicesWithTokens(ctx, user.ID)
if err != nil {
return err
}

return nil
}
1 change: 1 addition & 0 deletions satellite/core.go
Expand Up @@ -521,6 +521,7 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB,

choreObservers := billing.ChoreObservers{
UpgradeUser: console.NewUpgradeUserObserver(peer.DB.Console(), peer.DB.Billing(), config.Console.UsageLimits, config.Console.UserBalanceForUpgrade),
PayInvoices: console.NewInvoiceTokenPaymentObserver(peer.DB.Console(), peer.Payments.Accounts),
}

peer.Payments.BillingChore = billing.NewChore(
Expand Down
24 changes: 20 additions & 4 deletions satellite/payments/accountfreeze/chore.go
Expand Up @@ -33,7 +33,7 @@ type Config struct {
Interval time.Duration `help:"How often to run this chore, which is how often unpaid invoices are checked." default:"24h"`
GracePeriod time.Duration `help:"How long to wait between a warning event and freezing an account." default:"360h"`
PriceThreshold int64 `help:"The failed invoice amount (in cents) beyond which an account will not be frozen" default:"10000"`
ExcludeStorjscan bool `help:"whether to exclude storjscan-paying users from automatic warn/freeze" default:"true"`
ExcludeStorjscan bool `help:"whether to exclude storjscan-paying users from automatic warn/freeze" default:"false"`
}

// Chore is a chore that checks for unpaid invoices and potentially freezes corresponding accounts.
Expand Down Expand Up @@ -116,6 +116,7 @@ func (chore *Chore) attemptFreezeWarn(ctx context.Context) {

debugLog := func(message string) {
chore.log.Debug(message,
zap.String("process", "freeze/warn"),
zap.String("invoiceID", invoice.ID),
zap.String("customerID", invoice.CustomerID),
zap.Any("userID", userID),
Expand All @@ -124,6 +125,7 @@ func (chore *Chore) attemptFreezeWarn(ctx context.Context) {

errorLog := func(message string, err error) {
chore.log.Error(message,
zap.String("process", "freeze/warn"),
zap.String("invoiceID", invoice.ID),
zap.String("customerID", invoice.CustomerID),
zap.Any("userID", userID),
Expand Down Expand Up @@ -284,26 +286,40 @@ func (chore *Chore) attemptUnfreezeUnwarn(ctx context.Context) {
}

for _, event := range events.Events {
errorLog := func(message string, err error) {
chore.log.Error(message,
zap.String("process", "unfreeze/unwarn"),
zap.Any("userID", event.UserID),
zap.String("eventType", event.Type.String()),
zap.Error(Error.Wrap(err)),
)
}

usersCount++
invoices, err := chore.payments.Invoices().ListFailed(ctx, &event.UserID)
if err != nil {
chore.log.Error("Could not get failed invoices for user", zap.Error(Error.Wrap(err)))
errorLog("Could not get failed invoices for user", err)
continue
}
if len(invoices) > 0 {
// try to pay the invoices.
err = chore.payments.Invoices().AttemptPayOverdueInvoices(ctx, event.UserID)
if err != nil {
errorLog("Could not attempt payment", err)
}
continue
}

if event.Type == console.Freeze {
err = chore.freezeService.UnfreezeUser(ctx, event.UserID)
if err != nil {
chore.log.Error("Could not unfreeze user", zap.Error(Error.Wrap(err)))
errorLog("Could not unfreeze user", err)
}
unfrozenCount++
} else {
err = chore.freezeService.UnWarnUser(ctx, event.UserID)
if err != nil {
chore.log.Error("Could not unwarn user", zap.Error(Error.Wrap(err)))
errorLog("Could not unwarn user", err)
}
unwarnedCount++
}
Expand Down
168 changes: 95 additions & 73 deletions satellite/payments/accountfreeze/chore_test.go
Expand Up @@ -323,90 +323,112 @@ func TestAutoFreezeChore(t *testing.T) {
err = service.UnfreezeUser(ctx, user2.ID)
require.NoError(t, err)
})
})
}

t.Run("Storjscan exceptions", func(t *testing.T) {
// AnalyticsMock tests that events are sent once.
service.TestChangeFreezeTracker(newFreezeTrackerMock(t))
// reset chore clock
chore.TestSetNow(time.Now)

storjscanUser, err := sat.AddUser(ctx, console.CreateUser{
FullName: "Test User",
Email: "storjscanuser@mail.test",
}, 1)
require.NoError(t, err)
func TestAutoFreezeChore_StorjscanExclusion(t *testing.T) {
testplanet.Run(t, testplanet.Config{
SatelliteCount: 1, StorageNodeCount: 0, UplinkCount: 0,
Reconfigure: testplanet.Reconfigure{
Satellite: func(log *zap.Logger, index int, config *satellite.Config) {
config.AccountFreeze.Enabled = true
config.AccountFreeze.ExcludeStorjscan = true
},
},
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
sat := planet.Satellites[0]
stripeClient := sat.API.Payments.StripeClient
invoicesDB := sat.Core.Payments.Accounts.Invoices()
customerDB := sat.Core.DB.StripeCoinPayments().Customers()
usersDB := sat.DB.Console().Users()
projectsDB := sat.DB.Console().Projects()
service := console.NewAccountFreezeService(sat.DB.Console().AccountFreezeEvents(), usersDB, projectsDB, newFreezeTrackerMock(t))
chore := sat.Core.Payments.AccountFreeze
chore.TestSetFreezeService(service)

// create a wallet and transaction for the new user in storjscan
address, err := blockchain.BytesToAddress(testrand.Bytes(20))
require.NoError(t, err)
require.NoError(t, sat.DB.Wallets().Add(ctx, storjscanUser.ID, address))
cachedPayments := []storjscan.CachedPayment{
{
From: blockchaintest.NewAddress(),
To: address,
TokenValue: currency.AmountFromBaseUnits(1000, currency.StorjToken),
USDValue: currency.AmountFromBaseUnits(testrand.Int63n(1000), currency.USDollarsMicro),
BlockHash: blockchaintest.NewHash(),
Transaction: blockchaintest.NewHash(),
Status: payments.PaymentStatusConfirmed,
Timestamp: time.Now(),
},
}
require.NoError(t, sat.DB.StorjscanPayments().InsertBatch(ctx, cachedPayments))
amount := int64(100)
curr := string(stripe.CurrencyUSD)

storjscanCus, err := customerDB.GetCustomerID(ctx, storjscanUser.ID)
require.NoError(t, err)
// AnalyticsMock tests that events are sent once.
service.TestChangeFreezeTracker(newFreezeTrackerMock(t))
// reset chore clock
chore.TestSetNow(time.Now)

item, err := stripeClient.InvoiceItems().New(&stripe.InvoiceItemParams{
Params: stripe.Params{Context: ctx},
Amount: &amount,
Currency: &curr,
Customer: &storjscanCus,
})
require.NoError(t, err)
storjscanUser, err := sat.AddUser(ctx, console.CreateUser{
FullName: "Test User",
Email: "storjscanuser@mail.test",
}, 1)
require.NoError(t, err)

items := make([]*stripe.InvoiceUpcomingInvoiceItemParams, 0, 1)
items = append(items, &stripe.InvoiceUpcomingInvoiceItemParams{
InvoiceItem: &item.ID,
Amount: &amount,
Currency: &curr,
})
inv, err := stripeClient.Invoices().New(&stripe.InvoiceParams{
Params: stripe.Params{Context: ctx},
Customer: &storjscanCus,
InvoiceItems: items,
})
require.NoError(t, err)
// create a wallet and transaction for the new user in storjscan
address, err := blockchain.BytesToAddress(testrand.Bytes(20))
require.NoError(t, err)
require.NoError(t, sat.DB.Wallets().Add(ctx, storjscanUser.ID, address))
cachedPayments := []storjscan.CachedPayment{
{
From: blockchaintest.NewAddress(),
To: address,
TokenValue: currency.AmountFromBaseUnits(1000, currency.StorjToken),
USDValue: currency.AmountFromBaseUnits(testrand.Int63n(1000), currency.USDollarsMicro),
BlockHash: blockchaintest.NewHash(),
Transaction: blockchaintest.NewHash(),
Status: payments.PaymentStatusConfirmed,
Timestamp: time.Now(),
},
}
require.NoError(t, sat.DB.StorjscanPayments().InsertBatch(ctx, cachedPayments))

paymentMethod := stripe1.MockInvoicesPayFailure
inv, err = stripeClient.Invoices().Pay(inv.ID, &stripe.InvoicePayParams{
Params: stripe.Params{Context: ctx},
PaymentMethod: &paymentMethod,
})
require.Error(t, err)
require.Equal(t, stripe.InvoiceStatusOpen, inv.Status)
storjscanCus, err := customerDB.GetCustomerID(ctx, storjscanUser.ID)
require.NoError(t, err)

failed, err := invoicesDB.ListFailed(ctx, nil)
require.NoError(t, err)
require.Equal(t, 1, len(failed))
invFound := false
for _, failedInv := range failed {
if failedInv.ID == inv.ID {
invFound = true
break
}
}
require.True(t, invFound)
item, err := stripeClient.InvoiceItems().New(&stripe.InvoiceItemParams{
Params: stripe.Params{Context: ctx},
Amount: &amount,
Currency: &curr,
Customer: &storjscanCus,
})
require.NoError(t, err)

chore.Loop.TriggerWait()
items := make([]*stripe.InvoiceUpcomingInvoiceItemParams, 0, 1)
items = append(items, &stripe.InvoiceUpcomingInvoiceItemParams{
InvoiceItem: &item.ID,
Amount: &amount,
Currency: &curr,
})
inv, err := stripeClient.Invoices().New(&stripe.InvoiceParams{
Params: stripe.Params{Context: ctx},
Customer: &storjscanCus,
InvoiceItems: items,
})
require.NoError(t, err)

// user should not be warned or frozen due to storjscan payments
freeze, warning, err := service.GetAll(ctx, storjscanUser.ID)
require.NoError(t, err)
require.Nil(t, warning)
require.Nil(t, freeze)
paymentMethod := stripe1.MockInvoicesPayFailure
inv, err = stripeClient.Invoices().Pay(inv.ID, &stripe.InvoicePayParams{
Params: stripe.Params{Context: ctx},
PaymentMethod: &paymentMethod,
})
require.Error(t, err)
require.Equal(t, stripe.InvoiceStatusOpen, inv.Status)

failed, err := invoicesDB.ListFailed(ctx, nil)
require.NoError(t, err)
require.Equal(t, 1, len(failed))
invFound := false
for _, failedInv := range failed {
if failedInv.ID == inv.ID {
invFound = true
break
}
}
require.True(t, invFound)

chore.Loop.TriggerWait()

// user should not be warned or frozen due to storjscan payments
freeze, warning, err := service.GetAll(ctx, storjscanUser.ID)
require.NoError(t, err)
require.Nil(t, warning)
require.Nil(t, freeze)
})
}

Expand Down
28 changes: 20 additions & 8 deletions satellite/payments/billing/chore.go
Expand Up @@ -22,6 +22,7 @@ type Observer interface {
// ChoreObservers holds functionality to process confirmed transactions using different types of observers.
type ChoreObservers struct {
UpgradeUser Observer
PayInvoices Observer
}

// ChoreErr is billing chore err class.
Expand Down Expand Up @@ -87,16 +88,21 @@ func (chore *Chore) Run(ctx context.Context) (err error) {
break
}

if chore.observers.UpgradeUser == nil {
continue
if chore.observers.UpgradeUser != nil {
err = chore.observers.UpgradeUser.Process(ctx, transaction)
if err != nil {
// we don't want to halt storing transactions if upgrade user observer fails
// because this chore is designed to store new transactions.
// So auto upgrading user is a side effect which shouldn't interrupt the main process.
chore.log.Error("error upgrading user", zap.Error(ChoreErr.Wrap(err)))
}
}

err = chore.observers.UpgradeUser.Process(ctx, transaction)
if err != nil {
// we don't want to halt storing transactions if upgrade user observer fails
// because this chore is designed to store new transactions.
// So auto upgrading user is a side effect which shouldn't interrupt the main process.
chore.log.Error("error upgrading user", zap.Error(ChoreErr.Wrap(err)))
if chore.observers.PayInvoices != nil {
err = chore.observers.PayInvoices.Process(ctx, transaction)
if err != nil {
chore.log.Error("error paying invoices", zap.Error(ChoreErr.Wrap(err)))
}
}
}
}
Expand All @@ -110,3 +116,9 @@ func (chore *Chore) Close() (err error) {
chore.TransactionCycle.Close()
return nil
}

// TestSetPaymentTypes is used in tests to change the payment
// types this chore tracks.
func (chore *Chore) TestSetPaymentTypes(types []PaymentType) {
chore.paymentTypes = types
}

0 comments on commit dcc4bd0

Please sign in to comment.