compensation: always generate invoices for every node
instead of only generating invoices for nodes that had some
activity, we generate one for every node so that we can find
and pay terminal nodes that did not meet thresholds before
we recognized them as terminal.

Change-Id: Ibb3433e1b35f1ddcfbe292c034238c9fa1b66c44
zeebo committed Mar 29, 2021
1 parent 035c393 commit a65aecf
Showing 8 changed files with 236 additions and 47 deletions.
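
In outline, generateInvoicesCSV now indexes the period usage by node ID and
then walks every node dossier, instead of visiting only nodes that had usage
rows. A condensed paraphrase of the new flow in the diff below (error
handling and invoice construction elided):

	// Index usage so idle nodes fall back to the zero value.
	periodUsageByNode := make(map[storj.NodeID]accounting.StorageNodePeriodUsage, len(periodUsage))
	for _, usage := range periodUsage {
		periodUsageByNode[usage.NodeID] = usage
	}

	// Collect every known node, not just the active ones.
	var allNodes []*overlay.NodeDossier
	err = db.OverlayCache().IterateAllNodeDossiers(ctx,
		func(ctx context.Context, node *overlay.NodeDossier) error {
			allNodes = append(allNodes, node)
			return nil
		})

	for _, node := range allNodes {
		usage := periodUsageByNode[node.Id] // zero value when the node had no activity
		// ... build and append a compensation.Invoice for this node ...
	}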
38 changes: 27 additions & 11 deletions cmd/satellite/compensation.go
@@ -12,7 +12,10 @@ import (
"github.com/zeebo/errs"
"go.uber.org/zap"

"storj.io/common/storj"
"storj.io/storj/satellite/accounting"
"storj.io/storj/satellite/compensation"
"storj.io/storj/satellite/overlay"
"storj.io/storj/satellite/satellitedb"
)

@@ -48,36 +51,49 @@ func generateInvoicesCSV(ctx context.Context, period compensation.Period, out io
return err
}

invoices := make([]compensation.Invoice, 0, len(periodUsage))
periodUsageByNode := make(map[storj.NodeID]accounting.StorageNodePeriodUsage, len(periodUsage))
for _, usage := range periodUsage {
totalAmounts, err := db.Compensation().QueryTotalAmounts(ctx, usage.NodeID)
periodUsageByNode[usage.NodeID] = usage
}

var allNodes []*overlay.NodeDossier
err = db.OverlayCache().IterateAllNodeDossiers(ctx,
func(ctx context.Context, node *overlay.NodeDossier) error {
allNodes = append(allNodes, node)
return nil
})
if err != nil {
return err
}

invoices := make([]compensation.Invoice, 0, len(allNodes))
for _, node := range allNodes {
totalAmounts, err := db.Compensation().QueryTotalAmounts(ctx, node.Id)
if err != nil {
return err
}

node, err := db.OverlayCache().Get(ctx, usage.NodeID)
if err != nil {
zap.L().Warn("failed to get node, skipping", zap.String("nodeID", usage.NodeID.String()), zap.Error(err))
continue
}
var gracefulExit *time.Time
if node.ExitStatus.ExitSuccess {
gracefulExit = node.ExitStatus.ExitFinishedAt
}
nodeAddress, _, err := net.SplitHostPort(node.Address.Address)
if err != nil {
return errs.New("unable to split node %q address %q", usage.NodeID, node.Address.Address)
return errs.New("unable to split node %q address %q", node.Id, node.Address.Address)
}
var nodeLastIP string
if node.LastIPPort != "" {
nodeLastIP, _, err = net.SplitHostPort(node.LastIPPort)
if err != nil {
return errs.New("unable to split node %q last ip:port %q", usage.NodeID, node.LastIPPort)
return errs.New("unable to split node %q last ip:port %q", node.Id, node.LastIPPort)
}
}

// the zero value of period usage is acceptable if the node does not
// have any usage for the period.
usage := periodUsageByNode[node.Id]
nodeInfo := compensation.NodeInfo{
ID: usage.NodeID,
ID: node.Id,
CreatedAt: node.CreatedAt,
LastContactSuccess: node.Reputation.LastContactSuccess,
Disqualified: node.Disqualified,
@@ -96,7 +112,7 @@ func generateInvoicesCSV(ctx context.Context, period compensation.Period, out io

invoice := compensation.Invoice{
Period: period,
NodeID: compensation.NodeID(usage.NodeID),
NodeID: compensation.NodeID(node.Id),
NodeWallet: node.Operator.Wallet,
NodeWalletFeatures: node.Operator.WalletFeatures,
NodeAddress: nodeAddress,
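
The usage lookup above leans on Go's map semantics: indexing a missing key
returns the element type's zero value, which is exactly what an idle node
should be billed against. A self-contained illustration with a stand-in
struct (not the real accounting.StorageNodePeriodUsage):

	package main

	import "fmt"

	// periodUsage stands in for accounting.StorageNodePeriodUsage.
	type periodUsage struct {
		GetTotal int64
		PutTotal int64
	}

	func main() {
		usageByNode := map[string]periodUsage{
			"node-a": {GetTotal: 100, PutTotal: 50},
		}

		// "node-b" has no entry, so the lookup yields the zero value,
		// matching how the invoice loop treats nodes with no activity.
		fmt.Println(usageByNode["node-a"]) // {100 50}
		fmt.Println(usageByNode["node-b"]) // {0 0}
	}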
2 changes: 2 additions & 0 deletions satellite/overlay/service.go
@@ -106,6 +106,8 @@ type DB interface {

// IterateAllNodes will call cb on all known nodes (used in restore trash contexts).
IterateAllNodes(context.Context, func(context.Context, *SelectedNode) error) error
// IterateAllNodeDossiers will call cb on all known nodes (used for invoice generation).
IterateAllNodeDossiers(context.Context, func(context.Context, *NodeDossier) error) error
}

// NodeCheckInInfo contains all the info that will be updated when a node checks in.
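
The new interface method follows the same callback-iteration shape as
IterateAllNodes: the DB layer owns paging, and the caller supplies a visitor
that can abort by returning an error. A toy implementation of that shape
(stand-in types, not the satellite's):

	package main

	import (
		"context"
		"fmt"
	)

	// NodeDossier and DB are toy stand-ins showing the callback shape.
	type NodeDossier struct{ ID string }

	type DB interface {
		IterateAllNodeDossiers(ctx context.Context, cb func(context.Context, *NodeDossier) error) error
	}

	type memDB struct{ nodes []NodeDossier }

	func (db *memDB) IterateAllNodeDossiers(ctx context.Context, cb func(context.Context, *NodeDossier) error) error {
		for i := range db.nodes {
			if err := cb(ctx, &db.nodes[i]); err != nil {
				return err
			}
		}
		return nil
	}

	func main() {
		var db DB = &memDB{nodes: []NodeDossier{{ID: "a"}, {ID: "b"}}}
		_ = db.IterateAllNodeDossiers(context.Background(),
			func(_ context.Context, n *NodeDossier) error {
				fmt.Println(n.ID)
				return nil
			})
	}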
10 changes: 7 additions & 3 deletions satellite/satellitedb/dbx/satellitedb.dbx
@@ -144,7 +144,7 @@ model node (
field vetted_at timestamp ( updatable, nullable )
field uptime_success_count int64 ( updatable, default 0 )
field total_uptime_count int64 ( updatable, default 0 )

field created_at timestamp ( autoinsert, default current_timestamp )
field updated_at timestamp ( autoinsert, autoupdate, default current_timestamp )
field last_contact_success timestamp ( updatable, default "epoch" )
@@ -192,9 +192,13 @@ read all (
select node.id
)

read paged (
select node
)

read all (
select node.id node.piece_count
where node.piece_count != 0
select node.id node.piece_count
where node.piece_count != 0
)

//--- audit history ---//
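
The new read paged ( select node ) directive tells dbx to emit
keyset-paginated accessors; concretely, it generates the continuation type
and method seen in the generated file below (signature reproduced from this
diff):

	// Generated by dbx from the read paged ( select node ) directive:
	Paged_Node(ctx context.Context,
		limit int, start *Paged_Node_Continuation) (
		rows []*Node, next *Paged_Node_Continuation, err error)

Callers pass a nil continuation to start and feed the returned next back in
until it comes back nil; that is exactly the loop IterateAllNodeDossiers
runs in overlaycache.go further down.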
137 changes: 137 additions & 0 deletions satellite/satellitedb/dbx/satellitedb.dbx.go
@@ -9146,6 +9146,11 @@ type Paged_BucketBandwidthRollup_By_IntervalStart_GreaterOrEqual_Continuation struct {
_set bool
}

type Paged_Node_Continuation struct {
_value_id []byte
_set bool
}

type Paged_StoragenodeBandwidthRollupArchive_By_IntervalStart_GreaterOrEqual_Continuation struct {
_value_storagenode_id []byte
_value_interval_start time.Time
@@ -10293,6 +10298,65 @@ func (obj *pgxImpl) All_Node_Id(ctx context.Context) (

}

func (obj *pgxImpl) Paged_Node(ctx context.Context,
limit int, start *Paged_Node_Continuation) (
rows []*Node, next *Paged_Node_Continuation, err error) {
defer mon.Task()(&ctx)(&err)

var __embed_stmt = __sqlbundle_Literal("SELECT nodes.id, nodes.address, nodes.last_net, nodes.last_ip_port, nodes.protocol, nodes.type, nodes.email, nodes.wallet, nodes.wallet_features, nodes.free_disk, nodes.piece_count, nodes.major, nodes.minor, nodes.patch, nodes.hash, nodes.timestamp, nodes.release, nodes.latency_90, nodes.audit_success_count, nodes.total_audit_count, nodes.vetted_at, nodes.uptime_success_count, nodes.total_uptime_count, nodes.created_at, nodes.updated_at, nodes.last_contact_success, nodes.last_contact_failure, nodes.contained, nodes.disqualified, nodes.suspended, nodes.unknown_audit_suspended, nodes.offline_suspended, nodes.under_review, nodes.online_score, nodes.audit_reputation_alpha, nodes.audit_reputation_beta, nodes.unknown_audit_reputation_alpha, nodes.unknown_audit_reputation_beta, nodes.exit_initiated_at, nodes.exit_loop_completed_at, nodes.exit_finished_at, nodes.exit_success, nodes.id FROM nodes WHERE (nodes.id) > ? ORDER BY nodes.id LIMIT ?")

var __embed_first_stmt = __sqlbundle_Literal("SELECT nodes.id, nodes.address, nodes.last_net, nodes.last_ip_port, nodes.protocol, nodes.type, nodes.email, nodes.wallet, nodes.wallet_features, nodes.free_disk, nodes.piece_count, nodes.major, nodes.minor, nodes.patch, nodes.hash, nodes.timestamp, nodes.release, nodes.latency_90, nodes.audit_success_count, nodes.total_audit_count, nodes.vetted_at, nodes.uptime_success_count, nodes.total_uptime_count, nodes.created_at, nodes.updated_at, nodes.last_contact_success, nodes.last_contact_failure, nodes.contained, nodes.disqualified, nodes.suspended, nodes.unknown_audit_suspended, nodes.offline_suspended, nodes.under_review, nodes.online_score, nodes.audit_reputation_alpha, nodes.audit_reputation_beta, nodes.unknown_audit_reputation_alpha, nodes.unknown_audit_reputation_beta, nodes.exit_initiated_at, nodes.exit_loop_completed_at, nodes.exit_finished_at, nodes.exit_success, nodes.id FROM nodes ORDER BY nodes.id LIMIT ?")

var __values []interface{}

var __stmt string
if start != nil && start._set {
__values = append(__values, start._value_id, limit)
__stmt = __sqlbundle_Render(obj.dialect, __embed_stmt)
} else {
__values = append(__values, limit)
__stmt = __sqlbundle_Render(obj.dialect, __embed_first_stmt)
}
obj.logStmt(__stmt, __values...)

for {
rows, next, err = func() (rows []*Node, next *Paged_Node_Continuation, err error) {
__rows, err := obj.driver.QueryContext(ctx, __stmt, __values...)
if err != nil {
return nil, nil, err
}
defer __rows.Close()

var __continuation Paged_Node_Continuation
__continuation._set = true

for __rows.Next() {
node := &Node{}
err = __rows.Scan(&node.Id, &node.Address, &node.LastNet, &node.LastIpPort, &node.Protocol, &node.Type, &node.Email, &node.Wallet, &node.WalletFeatures, &node.FreeDisk, &node.PieceCount, &node.Major, &node.Minor, &node.Patch, &node.Hash, &node.Timestamp, &node.Release, &node.Latency90, &node.AuditSuccessCount, &node.TotalAuditCount, &node.VettedAt, &node.UptimeSuccessCount, &node.TotalUptimeCount, &node.CreatedAt, &node.UpdatedAt, &node.LastContactSuccess, &node.LastContactFailure, &node.Contained, &node.Disqualified, &node.Suspended, &node.UnknownAuditSuspended, &node.OfflineSuspended, &node.UnderReview, &node.OnlineScore, &node.AuditReputationAlpha, &node.AuditReputationBeta, &node.UnknownAuditReputationAlpha, &node.UnknownAuditReputationBeta, &node.ExitInitiatedAt, &node.ExitLoopCompletedAt, &node.ExitFinishedAt, &node.ExitSuccess, &__continuation._value_id)
if err != nil {
return nil, nil, err
}
rows = append(rows, node)
next = &__continuation
}

if err := __rows.Err(); err != nil {
return nil, nil, err
}

return rows, next, nil
}()
if err != nil {
if obj.shouldRetry(err) {
continue
}
return nil, nil, obj.makeErr(err)
}
return rows, next, nil
}

}

func (obj *pgxImpl) All_Node_Id_Node_PieceCount_By_PieceCount_Not_Number(ctx context.Context) (
rows []*Id_PieceCount_Row, err error) {
defer mon.Task()(&ctx)(&err)
@@ -15727,6 +15791,65 @@ func (obj *pgxcockroachImpl) All_Node_Id(ctx context.Context) (

}

func (obj *pgxcockroachImpl) Paged_Node(ctx context.Context,
limit int, start *Paged_Node_Continuation) (
rows []*Node, next *Paged_Node_Continuation, err error) {
defer mon.Task()(&ctx)(&err)

var __embed_stmt = __sqlbundle_Literal("SELECT nodes.id, nodes.address, nodes.last_net, nodes.last_ip_port, nodes.protocol, nodes.type, nodes.email, nodes.wallet, nodes.wallet_features, nodes.free_disk, nodes.piece_count, nodes.major, nodes.minor, nodes.patch, nodes.hash, nodes.timestamp, nodes.release, nodes.latency_90, nodes.audit_success_count, nodes.total_audit_count, nodes.vetted_at, nodes.uptime_success_count, nodes.total_uptime_count, nodes.created_at, nodes.updated_at, nodes.last_contact_success, nodes.last_contact_failure, nodes.contained, nodes.disqualified, nodes.suspended, nodes.unknown_audit_suspended, nodes.offline_suspended, nodes.under_review, nodes.online_score, nodes.audit_reputation_alpha, nodes.audit_reputation_beta, nodes.unknown_audit_reputation_alpha, nodes.unknown_audit_reputation_beta, nodes.exit_initiated_at, nodes.exit_loop_completed_at, nodes.exit_finished_at, nodes.exit_success, nodes.id FROM nodes WHERE (nodes.id) > ? ORDER BY nodes.id LIMIT ?")

var __embed_first_stmt = __sqlbundle_Literal("SELECT nodes.id, nodes.address, nodes.last_net, nodes.last_ip_port, nodes.protocol, nodes.type, nodes.email, nodes.wallet, nodes.wallet_features, nodes.free_disk, nodes.piece_count, nodes.major, nodes.minor, nodes.patch, nodes.hash, nodes.timestamp, nodes.release, nodes.latency_90, nodes.audit_success_count, nodes.total_audit_count, nodes.vetted_at, nodes.uptime_success_count, nodes.total_uptime_count, nodes.created_at, nodes.updated_at, nodes.last_contact_success, nodes.last_contact_failure, nodes.contained, nodes.disqualified, nodes.suspended, nodes.unknown_audit_suspended, nodes.offline_suspended, nodes.under_review, nodes.online_score, nodes.audit_reputation_alpha, nodes.audit_reputation_beta, nodes.unknown_audit_reputation_alpha, nodes.unknown_audit_reputation_beta, nodes.exit_initiated_at, nodes.exit_loop_completed_at, nodes.exit_finished_at, nodes.exit_success, nodes.id FROM nodes ORDER BY nodes.id LIMIT ?")

var __values []interface{}

var __stmt string
if start != nil && start._set {
__values = append(__values, start._value_id, limit)
__stmt = __sqlbundle_Render(obj.dialect, __embed_stmt)
} else {
__values = append(__values, limit)
__stmt = __sqlbundle_Render(obj.dialect, __embed_first_stmt)
}
obj.logStmt(__stmt, __values...)

for {
rows, next, err = func() (rows []*Node, next *Paged_Node_Continuation, err error) {
__rows, err := obj.driver.QueryContext(ctx, __stmt, __values...)
if err != nil {
return nil, nil, err
}
defer __rows.Close()

var __continuation Paged_Node_Continuation
__continuation._set = true

for __rows.Next() {
node := &Node{}
err = __rows.Scan(&node.Id, &node.Address, &node.LastNet, &node.LastIpPort, &node.Protocol, &node.Type, &node.Email, &node.Wallet, &node.WalletFeatures, &node.FreeDisk, &node.PieceCount, &node.Major, &node.Minor, &node.Patch, &node.Hash, &node.Timestamp, &node.Release, &node.Latency90, &node.AuditSuccessCount, &node.TotalAuditCount, &node.VettedAt, &node.UptimeSuccessCount, &node.TotalUptimeCount, &node.CreatedAt, &node.UpdatedAt, &node.LastContactSuccess, &node.LastContactFailure, &node.Contained, &node.Disqualified, &node.Suspended, &node.UnknownAuditSuspended, &node.OfflineSuspended, &node.UnderReview, &node.OnlineScore, &node.AuditReputationAlpha, &node.AuditReputationBeta, &node.UnknownAuditReputationAlpha, &node.UnknownAuditReputationBeta, &node.ExitInitiatedAt, &node.ExitLoopCompletedAt, &node.ExitFinishedAt, &node.ExitSuccess, &__continuation._value_id)
if err != nil {
return nil, nil, err
}
rows = append(rows, node)
next = &__continuation
}

if err := __rows.Err(); err != nil {
return nil, nil, err
}

return rows, next, nil
}()
if err != nil {
if obj.shouldRetry(err) {
continue
}
return nil, nil, obj.makeErr(err)
}
return rows, next, nil
}

}

func (obj *pgxcockroachImpl) All_Node_Id_Node_PieceCount_By_PieceCount_Not_Number(ctx context.Context) (
rows []*Id_PieceCount_Row, err error) {
defer mon.Task()(&ctx)(&err)
@@ -21263,6 +21386,16 @@ func (rx *Rx) Paged_BucketBandwidthRollup_By_IntervalStart_GreaterOrEqual(ctx context.Context,
return tx.Paged_BucketBandwidthRollup_By_IntervalStart_GreaterOrEqual(ctx, bucket_bandwidth_rollup_interval_start_greater_or_equal, limit, start)
}

func (rx *Rx) Paged_Node(ctx context.Context,
limit int, start *Paged_Node_Continuation) (
rows []*Node, next *Paged_Node_Continuation, err error) {
var tx *Tx
if tx, err = rx.getTx(ctx); err != nil {
return
}
return tx.Paged_Node(ctx, limit, start)
}

func (rx *Rx) Paged_StoragenodeBandwidthRollupArchive_By_IntervalStart_GreaterOrEqual(ctx context.Context,
storagenode_bandwidth_rollup_archive_interval_start_greater_or_equal StoragenodeBandwidthRollupArchive_IntervalStart_Field,
limit int, start *Paged_StoragenodeBandwidthRollupArchive_By_IntervalStart_GreaterOrEqual_Continuation) (
@@ -22074,6 +22207,10 @@ type Methods interface {
limit int, start *Paged_BucketBandwidthRollup_By_IntervalStart_GreaterOrEqual_Continuation) (
rows []*BucketBandwidthRollup, next *Paged_BucketBandwidthRollup_By_IntervalStart_GreaterOrEqual_Continuation, err error)

Paged_Node(ctx context.Context,
limit int, start *Paged_Node_Continuation) (
rows []*Node, next *Paged_Node_Continuation, err error)

Paged_StoragenodeBandwidthRollupArchive_By_IntervalStart_GreaterOrEqual(ctx context.Context,
storagenode_bandwidth_rollup_archive_interval_start_greater_or_equal StoragenodeBandwidthRollupArchive_IntervalStart_Field,
limit int, start *Paged_StoragenodeBandwidthRollupArchive_By_IntervalStart_GreaterOrEqual_Continuation) (
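
Two details of the generated SQL are worth noting: the SELECT list ends with
a second nodes.id, which is scanned into __continuation._value_id so the
last row of each page becomes the next cursor, and the
WHERE (nodes.id) > ? ... ORDER BY nodes.id predicate resumes strictly after
that row. This keyset style stays consistent under concurrent inserts and
deletes, unlike OFFSET-based paging. A minimal, self-contained demo of the
same pattern over a sorted slice:

	package main

	import (
		"fmt"
		"sort"
	)

	// page mirrors the generated "WHERE id > ? ORDER BY id LIMIT ?"
	// keyset query over an in-memory sorted slice.
	func page(ids []string, after string, limit int) (rows []string, next string) {
		i := sort.SearchStrings(ids, after)
		// skip the cursor row itself (strictly greater-than)
		if i < len(ids) && ids[i] == after {
			i++
		}
		for ; i < len(ids) && len(rows) < limit; i++ {
			rows = append(rows, ids[i])
			next = ids[i] // last row of the page is the next cursor
		}
		return rows, next
	}

	func main() {
		ids := []string{"a", "b", "c", "d", "e"}
		cursor := ""
		for {
			rows, next := page(ids, cursor, 2)
			if len(rows) == 0 {
				break
			}
			fmt.Println(rows) // [a b], then [c d], then [e]
			cursor = next
		}
	}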
4 changes: 2 additions & 2 deletions satellite/satellitedb/migrate.go
@@ -256,10 +256,10 @@ func (db *satelliteDB) PostgresMigration() *migrate.Migration {
exit_initiated_at timestamp with time zone,
exit_finished_at timestamp with time zone,
exit_success boolean NOT NULL DEFAULT FALSE,
last_ip_port text,
suspended timestamp with time zone,
unknown_audit_reputation_alpha double precision NOT NULL DEFAULT 1,
unknown_audit_reputation_beta double precision NOT NULL DEFAULT 0,
suspended timestamp with time zone,
last_ip_port text,
vetted_at timestamp with time zone,
PRIMARY KEY ( id )
);`,
30 changes: 30 additions & 0 deletions satellite/satellitedb/overlaycache.go
@@ -1802,3 +1802,33 @@ func (cache *overlaycache) IterateAllNodes(ctx context.Context, cb func(context.Context, *SelectedNode) error) (err error) {

return rows.Err()
}

// IterateAllNodeDossiers will call cb on all known nodes (used for invoice generation).
func (cache *overlaycache) IterateAllNodeDossiers(ctx context.Context, cb func(context.Context, *overlay.NodeDossier) error) (err error) {
defer mon.Task()(&ctx)(&err)

const nodesPerPage = 1000
var cont *dbx.Paged_Node_Continuation
var dbxNodes []*dbx.Node

for {
dbxNodes, cont, err = cache.db.Paged_Node(ctx, nodesPerPage, cont)
if err != nil {
return err
}

for _, node := range dbxNodes {
dossier, err := convertDBNode(ctx, node)
if err != nil {
return err
}
if err := cb(ctx, dossier); err != nil {
return err
}
}

if cont == nil {
return nil
}
}
}
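
Because the callback's error aborts the remaining pages, callers are not
forced to drain the whole table. A hedged usage sketch (errFound and wantID
are illustrative names, not part of this commit; assumes an errors import):

	// Stop iterating once a particular node turns up; errFound is a
	// sentinel used only to break out of the scan early.
	var errFound = errors.New("found")

	var found *overlay.NodeDossier
	err := cache.IterateAllNodeDossiers(ctx, func(ctx context.Context, node *overlay.NodeDossier) error {
		if node.Id == wantID {
			found = node
			return errFound
		}
		return nil
	})
	if errors.Is(err, errFound) {
		err = nil // found is set; the early exit is not a real failure
	}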
