Skip to content

Commit

Permalink
satellite/{db,analytics,payments}: add chore for auto account freeze
Browse files Browse the repository at this point in the history
This change adds a new chore that will check for failed invoices and
potentially freeze corresponding accounts.
It makes slight modifications to stripemock.go and invoices.go (adding
stripe CustomerID to the Invoice struct).

Issue: storj/storj-private#140

Change-Id: I161f4037881222003bd231559c75f43360509894
  • Loading branch information
wilfred-asomanii authored and Storj Robot committed Mar 1, 2023
1 parent 31ec4fa commit faeea88
Show file tree
Hide file tree
Showing 17 changed files with 734 additions and 3 deletions.
52 changes: 52 additions & 0 deletions satellite/analytics/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,9 @@ const (
eventProjectDescriptionUpdated = "Project Description Updated"
eventProjectStorageLimitUpdated = "Project Storage Limit Updated"
eventProjectBandwidthLimitUpdated = "Project Bandwidth Limit Updated"
eventAccountFrozen = "Account Frozen"
eventAccountFreezeWarning = "Account Freeze Warning"
eventUnpaidLargeInvoice = "Large Invoice Unpaid"
)

var (
Expand Down Expand Up @@ -303,6 +306,55 @@ func (service *Service) TrackProjectCreated(userID uuid.UUID, email string, proj
})
}

// TrackAccountFrozen sends an account frozen event to Segment.
func (service *Service) TrackAccountFrozen(userID uuid.UUID, email string) {
if !service.config.Enabled {
return
}

props := segment.NewProperties()
props.Set("email", email)

service.enqueueMessage(segment.Track{
UserId: userID.String(),
Event: service.satelliteName + " " + eventAccountFrozen,
Properties: props,
})
}

// TrackAccountFreezeWarning sends an account freeze warning event to Segment.
func (service *Service) TrackAccountFreezeWarning(userID uuid.UUID, email string) {
if !service.config.Enabled {
return
}

props := segment.NewProperties()
props.Set("email", email)

service.enqueueMessage(segment.Track{
UserId: userID.String(),
Event: service.satelliteName + " " + eventAccountFreezeWarning,
Properties: props,
})
}

// TrackLargeUnpaidInvoice sends an event to Segment indicating that a user has not paid a large invoice.
func (service *Service) TrackLargeUnpaidInvoice(invID string, userID uuid.UUID, email string) {
if !service.config.Enabled {
return
}

props := segment.NewProperties()
props.Set("email", email)
props.Set("invoice", invID)

service.enqueueMessage(segment.Track{
UserId: userID.String(),
Event: service.satelliteName + " " + eventUnpaidLargeInvoice,
Properties: props,
})
}

// TrackAccessGrantCreated sends an "Access Grant Created" event to Segment.
func (service *Service) TrackAccessGrantCreated(userID uuid.UUID, email string) {
if !service.config.Enabled {
Expand Down
26 changes: 26 additions & 0 deletions satellite/console/accountfreezes.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ type AccountFreezeEvents interface {
Upsert(ctx context.Context, event *AccountFreezeEvent) (*AccountFreezeEvent, error)
// Get is a method for querying account freeze event from the database by user ID and event type.
Get(ctx context.Context, userID uuid.UUID, eventType AccountFreezeEventType) (*AccountFreezeEvent, error)
// GetAll is a method for querying all account freeze events from the database by user ID.
GetAll(ctx context.Context, userID uuid.UUID) (*AccountFreezeEvent, *AccountFreezeEvent, error)
// DeleteAllByUserID is a method for deleting all account freeze events from the database by user ID.
DeleteAllByUserID(ctx context.Context, userID uuid.UUID) error
}
Expand Down Expand Up @@ -189,3 +191,27 @@ func (s *AccountFreezeService) UnfreezeUser(ctx context.Context, userID uuid.UUI

return ErrAccountFreeze.Wrap(s.freezeEventsDB.DeleteAllByUserID(ctx, userID))
}

// WarnUser adds a warning event to the freeze events table.
func (s *AccountFreezeService) WarnUser(ctx context.Context, userID uuid.UUID) (err error) {
defer mon.Task()(&ctx)(&err)

_, err = s.freezeEventsDB.Upsert(ctx, &AccountFreezeEvent{
UserID: userID,
Type: Warning,
})

return ErrAccountFreeze.Wrap(err)
}

// GetAll returns all events for a user.
func (s *AccountFreezeService) GetAll(ctx context.Context, userID uuid.UUID) (freeze *AccountFreezeEvent, warning *AccountFreezeEvent, err error) {
defer mon.Task()(&ctx)(&err)

freeze, warning, err = s.freezeEventsDB.GetAll(ctx, userID)
if err != nil {
return nil, nil, ErrAccountFreeze.Wrap(err)
}

return freeze, warning, nil
}
24 changes: 24 additions & 0 deletions satellite/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,9 @@ import (
"storj.io/storj/satellite/accounting/rollup"
"storj.io/storj/satellite/accounting/rolluparchive"
"storj.io/storj/satellite/accounting/tally"
"storj.io/storj/satellite/analytics"
"storj.io/storj/satellite/audit"
"storj.io/storj/satellite/console"
"storj.io/storj/satellite/console/consoleauth"
"storj.io/storj/satellite/console/emailreminders"
"storj.io/storj/satellite/gracefulexit"
Expand All @@ -46,6 +48,7 @@ import (
"storj.io/storj/satellite/overlay/offlinenodes"
"storj.io/storj/satellite/overlay/straynodes"
"storj.io/storj/satellite/payments"
"storj.io/storj/satellite/payments/accountfreeze"
"storj.io/storj/satellite/payments/billing"
"storj.io/storj/satellite/payments/storjscan"
"storj.io/storj/satellite/payments/stripecoinpayments"
Expand Down Expand Up @@ -143,6 +146,7 @@ type Core struct {
}

Payments struct {
AccountFreeze *accountfreeze.Chore
Accounts payments.Accounts
BillingChore *billing.Chore
StorjscanClient *storjscan.Client
Expand Down Expand Up @@ -612,6 +616,26 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB,
})
}

{ // setup account freeze
if config.AccountFreeze.Enabled {
peer.Payments.AccountFreeze = accountfreeze.NewChore(
peer.Log.Named("payments.accountfreeze:chore"),
peer.DB.StripeCoinPayments(),
peer.Payments.Accounts,
peer.DB.Console().Users(),
console.NewAccountFreezeService(db.Console().AccountFreezeEvents(), db.Console().Users(), db.Console().Projects()),
analytics.NewService(peer.Log.Named("analytics:service"), config.Analytics, config.Console.SatelliteName),
config.AccountFreeze,
)

peer.Services.Add(lifecycle.Item{
Name: "accountfreeze:chore",
Run: peer.Payments.AccountFreeze.Run,
Close: peer.Payments.AccountFreeze.Close,
})
}
}

{ // setup graceful exit
log := peer.Log.Named("gracefulexit")
switch {
Expand Down
135 changes: 135 additions & 0 deletions satellite/payments/accountfreeze/chore.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
// Copyright (C) 2023 Storj Labs, Inc.
// See LICENSE for copying information.

package accountfreeze

import (
"context"
"time"

"github.com/spacemonkeygo/monkit/v3"
"github.com/zeebo/errs"
"go.uber.org/zap"

"storj.io/common/sync2"
"storj.io/storj/satellite/analytics"
"storj.io/storj/satellite/console"
"storj.io/storj/satellite/payments"
"storj.io/storj/satellite/payments/stripecoinpayments"
)

var (
// Error is the standard error class for automatic freeze errors.
Error = errs.Class("account-freeze-chore")
mon = monkit.Package()
)

// Config contains configurable values for account freeze chore.
type Config struct {
Enabled bool `help:"whether to run this chore." default:"false"`
Interval time.Duration `help:"How often to run this chore, which is how often unpaid invoices are checked." default:"24h"`
GracePeriod time.Duration `help:"How long to wait between a warning event and freezing an account." default:"720h"`
PriceThreshold int64 `help:"The failed invoice amount beyond which an account will not be frozen" default:"2000"`
}

// Chore is a chore that checks for unpaid invoices and potentially freezes corresponding accounts.
type Chore struct {
log *zap.Logger
freezeService *console.AccountFreezeService
analytics *analytics.Service
usersDB console.Users
payments payments.Accounts
accounts stripecoinpayments.DB
config Config
nowFn func() time.Time
Loop *sync2.Cycle
}

// NewChore is a constructor for Chore.
func NewChore(log *zap.Logger, accounts stripecoinpayments.DB, payments payments.Accounts, usersDB console.Users, freezeService *console.AccountFreezeService, analytics *analytics.Service, config Config) *Chore {
return &Chore{
log: log,
freezeService: freezeService,
analytics: analytics,
usersDB: usersDB,
accounts: accounts,
config: config,
payments: payments,
nowFn: time.Now,
Loop: sync2.NewCycle(config.Interval),
}
}

// Run runs the chore.
func (chore *Chore) Run(ctx context.Context) (err error) {
defer mon.Task()(&ctx)(&err)
return chore.Loop.Run(ctx, func(ctx context.Context) (err error) {

invoices, err := chore.payments.Invoices().ListFailed(ctx)
if err != nil {
chore.log.Error("Could not list invoices", zap.Error(Error.Wrap(err)))
return nil
}

for _, invoice := range invoices {
userID, err := chore.accounts.Customers().GetUserID(ctx, invoice.CustomerID)
if err != nil {
chore.log.Error("Could not get userID", zap.String("invoice", invoice.ID), zap.Error(Error.Wrap(err)))
continue
}

user, err := chore.usersDB.Get(ctx, userID)
if err != nil {
chore.log.Error("Could not get user", zap.String("invoice", invoice.ID), zap.Error(Error.Wrap(err)))
continue
}

if invoice.Amount > chore.config.PriceThreshold {
chore.analytics.TrackLargeUnpaidInvoice(invoice.ID, userID, user.Email)
continue
}

freeze, warning, err := chore.freezeService.GetAll(ctx, userID)
if err != nil {
chore.log.Error("Could not check freeze status", zap.String("invoice", invoice.ID), zap.Error(Error.Wrap(err)))
continue
}
if freeze != nil {
// account already frozen
continue
}

if warning == nil {
err = chore.freezeService.WarnUser(ctx, userID)
if err != nil {
chore.log.Error("Could not add warning event", zap.String("invoice", invoice.ID), zap.Error(Error.Wrap(err)))
continue
}
chore.analytics.TrackAccountFreezeWarning(userID, user.Email)
continue
}

if chore.nowFn().Sub(warning.CreatedAt) > chore.config.GracePeriod {
err = chore.freezeService.FreezeUser(ctx, userID)
if err != nil {
chore.log.Error("Could not freeze account", zap.String("invoice", invoice.ID), zap.Error(Error.Wrap(err)))
continue
}
chore.analytics.TrackAccountFrozen(userID, user.Email)
}
}

return nil
})
}

// TestSetNow sets nowFn on chore for testing.
func (chore *Chore) TestSetNow(f func() time.Time) {
chore.nowFn = f
}

// Close closes the chore.
func (chore *Chore) Close() error {
chore.Loop.Close()
return nil
}
Loading

0 comments on commit faeea88

Please sign in to comment.