Skip to content

Commit

Permalink
satellite/satellitedb: use utilities for conversions
Browse files Browse the repository at this point in the history
This avoids some potential typos.

Change-Id: Icc5262e1f96fe220dd07212c00acacf6960ee909
  • Loading branch information
egonelbre committed Jun 8, 2023
1 parent 974b4f9 commit df53914
Show file tree
Hide file tree
Showing 8 changed files with 139 additions and 139 deletions.
31 changes: 9 additions & 22 deletions satellite/satellitedb/billingdb.go
Expand Up @@ -224,34 +224,21 @@ func (db billingDB) List(ctx context.Context, userID uuid.UUID) (txs []billing.T
return nil, Error.Wrap(err)
}

for _, dbxTX := range dbxTXs {
tx, err := fromDBXBillingTransaction(dbxTX)
if err != nil {
return nil, Error.Wrap(err)
}
txs = append(txs, *tx)
}

return txs, nil
txs, err = convertSlice(dbxTXs, fromDBXBillingTransaction)
return txs, Error.Wrap(err)
}

func (db billingDB) ListSource(ctx context.Context, userID uuid.UUID, txSource string) (txs []billing.Transaction, err error) {
defer mon.Task()(&ctx)(&err)
dbxTXs, err := db.db.All_BillingTransaction_By_UserId_And_Source_OrderBy_Desc_Timestamp(ctx,
dbx.BillingTransaction_UserId(userID[:]), dbx.BillingTransaction_Source(txSource))
dbx.BillingTransaction_UserId(userID[:]),
dbx.BillingTransaction_Source(txSource))
if err != nil {
return nil, Error.Wrap(err)
}

for _, dbxTX := range dbxTXs {
tx, err := fromDBXBillingTransaction(dbxTX)
if err != nil {
return nil, Error.Wrap(err)
}
txs = append(txs, *tx)
}

return txs, nil
txs, err = convertSlice(dbxTXs, fromDBXBillingTransaction)
return txs, Error.Wrap(err)
}

func (db billingDB) GetBalance(ctx context.Context, userID uuid.UUID) (_ currency.Amount, err error) {
Expand All @@ -269,12 +256,12 @@ func (db billingDB) GetBalance(ctx context.Context, userID uuid.UUID) (_ currenc
}

// fromDBXBillingTransaction converts *dbx.BillingTransaction to *billing.Transaction.
func fromDBXBillingTransaction(dbxTX *dbx.BillingTransaction) (*billing.Transaction, error) {
func fromDBXBillingTransaction(dbxTX *dbx.BillingTransaction) (billing.Transaction, error) {
userID, err := uuid.FromBytes(dbxTX.UserId)
if err != nil {
return nil, errs.Wrap(err)
return billing.Transaction{}, errs.Wrap(err)
}
return &billing.Transaction{
return billing.Transaction{
ID: dbxTX.Id,
UserID: userID,
Amount: currency.AmountFromBaseUnits(dbxTX.Amount, currency.USDollarsMicro),
Expand Down
21 changes: 6 additions & 15 deletions satellite/satellitedb/coinpaymentstxs.go
Expand Up @@ -53,17 +53,8 @@ func (db *coinPaymentsTransactions) ListAccount(ctx context.Context, userID uuid
return nil, err
}

var txs []stripe.Transaction
for _, dbxTX := range dbxTXs {
tx, err := fromDBXCoinpaymentsTransaction(dbxTX)
if err != nil {
return nil, errs.Wrap(err)
}

txs = append(txs, *tx)
}

return txs, nil
txs, err := convertSlice(dbxTXs, fromDBXCoinpaymentsTransaction)
return txs, Error.Wrap(err)
}

// TestInsert inserts new coinpayments transaction into DB.
Expand Down Expand Up @@ -126,17 +117,17 @@ func (db *coinPaymentsTransactions) TestLockRate(ctx context.Context, id coinpay
return Error.Wrap(err)
}

// fromDBXCoinpaymentsTransaction converts *dbx.CoinpaymentsTransaction to *stripecoinpayments.Transaction.
func fromDBXCoinpaymentsTransaction(dbxCPTX *dbx.CoinpaymentsTransaction) (*stripe.Transaction, error) {
// fromDBXCoinpaymentsTransaction converts *dbx.CoinpaymentsTransaction to stripecoinpayments.Transaction.
func fromDBXCoinpaymentsTransaction(dbxCPTX *dbx.CoinpaymentsTransaction) (stripe.Transaction, error) {
userID, err := uuid.FromBytes(dbxCPTX.UserId)
if err != nil {
return nil, errs.Wrap(err)
return stripe.Transaction{}, errs.Wrap(err)
}

// TODO: the currency here should be passed in to this function or stored
// in the database.

return &stripe.Transaction{
return stripe.Transaction{
ID: coinpayments.TransactionID(dbxCPTX.Id),
AccountID: userID,
Address: dbxCPTX.Address,
Expand Down
13 changes: 5 additions & 8 deletions satellite/satellitedb/projectinvitations.go
Expand Up @@ -204,12 +204,9 @@ func projectInvitationFromDBX(dbxInvite *dbx.ProjectInvitation) (_ *console.Proj
// projectInvitationSliceFromDBX converts a project member invitation slice from the database to a
// slice of console.ProjectInvitation.
func projectInvitationSliceFromDBX(dbxInvites []*dbx.ProjectInvitation) (invites []console.ProjectInvitation, err error) {
for _, dbxInvite := range dbxInvites {
invite, err := projectInvitationFromDBX(dbxInvite)
if err != nil {
return nil, err
}
invites = append(invites, *invite)
}
return invites, nil
return convertSlice(dbxInvites,
func(i *dbx.ProjectInvitation) (console.ProjectInvitation, error) {
r, err := projectInvitationFromDBX(i)
return *r, err
})
}
24 changes: 9 additions & 15 deletions satellite/satellitedb/projectmembers.go
Expand Up @@ -213,19 +213,13 @@ func sanitizeOrderDirectionName(pmo console.OrderDirection) string {
// projectMembersFromDbxSlice is used for creating []ProjectMember entities from autogenerated []*dbx.ProjectMember struct.
func projectMembersFromDbxSlice(ctx context.Context, projectMembersDbx []*dbx.ProjectMember) (_ []console.ProjectMember, err error) {
defer mon.Task()(&ctx)(&err)
var projectMembers []console.ProjectMember
var errors []error

// Generating []dbo from []dbx and collecting all errors
for _, projectMemberDbx := range projectMembersDbx {
projectMember, err := projectMemberFromDBX(ctx, projectMemberDbx)
if err != nil {
errors = append(errors, err)
continue
}

projectMembers = append(projectMembers, *projectMember)
}

return projectMembers, errs.Combine(errors...)
rs, errors := convertSliceWithErrors(projectMembersDbx,
func(v *dbx.ProjectMember) (r console.ProjectMember, _ error) {
p, err := projectMemberFromDBX(ctx, v)
if err != nil {
return r, err
}
return *p, err
})
return rs, errs.Combine(errors...)
}
22 changes: 8 additions & 14 deletions satellite/satellitedb/projects.go
Expand Up @@ -427,20 +427,14 @@ func projectFromDBX(ctx context.Context, project *dbx.Project) (_ *console.Proje
func projectsFromDbxSlice(ctx context.Context, projectsDbx []*dbx.Project) (_ []console.Project, err error) {
defer mon.Task()(&ctx)(&err)

var projects []console.Project
var errors []error

// Generating []dbo from []dbx and collecting all errors
for _, projectDbx := range projectsDbx {
project, err := projectFromDBX(ctx, projectDbx)
if err != nil {
errors = append(errors, err)
continue
}

projects = append(projects, *project)
}

projects, errors := convertSliceWithErrors(projectsDbx,
func(v *dbx.Project) (r console.Project, _ error) {
p, err := projectFromDBX(ctx, v)
if err != nil {
return r, err
}
return *p, nil
})
return projects, errs.Combine(errors...)
}

Expand Down
133 changes: 74 additions & 59 deletions satellite/satellitedb/storagenodeaccounting.go
Expand Up @@ -53,37 +53,21 @@ func (db *StoragenodeAccounting) SaveTallies(ctx context.Context, latestTally ti
func (db *StoragenodeAccounting) GetTallies(ctx context.Context) (_ []*accounting.StoragenodeStorageTally, err error) {
defer mon.Task()(&ctx)(&err)
raws, err := db.db.All_StoragenodeStorageTally(ctx)
out := make([]*accounting.StoragenodeStorageTally, len(raws))
for i, r := range raws {
nodeID, err := storj.NodeIDFromBytes(r.NodeId)
if err != nil {
return nil, Error.Wrap(err)
}
out[i] = &accounting.StoragenodeStorageTally{
NodeID: nodeID,
IntervalEndTime: r.IntervalEndTime,
DataTotal: r.DataTotal,
}
if err != nil {
return nil, Error.Wrap(err)
}
out, err := convertSlice(raws, fromDBXStoragenodeStorageTally)
return out, Error.Wrap(err)
}

// GetTalliesSince retrieves all raw tallies since latestRollup.
func (db *StoragenodeAccounting) GetTalliesSince(ctx context.Context, latestRollup time.Time) (_ []*accounting.StoragenodeStorageTally, err error) {
defer mon.Task()(&ctx)(&err)
raws, err := db.db.All_StoragenodeStorageTally_By_IntervalEndTime_GreaterOrEqual(ctx, dbx.StoragenodeStorageTally_IntervalEndTime(latestRollup))
out := make([]*accounting.StoragenodeStorageTally, len(raws))
for i, r := range raws {
nodeID, err := storj.NodeIDFromBytes(r.NodeId)
if err != nil {
return nil, Error.Wrap(err)
}
out[i] = &accounting.StoragenodeStorageTally{
NodeID: nodeID,
IntervalEndTime: r.IntervalEndTime,
DataTotal: r.DataTotal,
}
if err != nil {
return nil, Error.Wrap(err)
}
out, err := convertSlice(raws, fromDBXStoragenodeStorageTally)
return out, Error.Wrap(err)
}

Expand Down Expand Up @@ -132,16 +116,11 @@ func (db *StoragenodeAccounting) getBandwidthByNodeSince(ctx context.Context, la
}
cursor = next
for _, r := range rollups {
nodeID, err := storj.NodeIDFromBytes(r.StoragenodeId)
v, err := fromDBXStoragenodeBandwidthRollup(r)
if err != nil {
return Error.Wrap(err)
return err
}
err = cb(ctx, &accounting.StoragenodeBandwidthRollup{
NodeID: nodeID,
IntervalStart: r.IntervalStart,
Action: r.Action,
Settled: r.Settled,
})
err = cb(ctx, &v)
if err != nil {
return err
}
Expand Down Expand Up @@ -171,16 +150,11 @@ func (db *StoragenodeAccounting) getBandwidthPhase2ByNodeSince(ctx context.Conte
}
cursor = next
for _, r := range rollups {
nodeID, err := storj.NodeIDFromBytes(r.StoragenodeId)
v, err := fromDBXStoragenodeBandwidthRollupPhase2(r)
if err != nil {
return Error.Wrap(err)
}
err = cb(ctx, &accounting.StoragenodeBandwidthRollup{
NodeID: nodeID,
IntervalStart: r.IntervalStart,
Action: r.Action,
Settled: r.Settled,
})
err = cb(ctx, &v)
if err != nil {
return err
}
Expand Down Expand Up @@ -642,18 +616,13 @@ func (db *StoragenodeAccounting) GetRollupsSince(ctx context.Context, since time
return nil, Error.Wrap(err)
}
cursor = next
for _, dbxRollup := range dbxRollups {
id, err := storj.NodeIDFromBytes(dbxRollup.StoragenodeId)
if err != nil {
return nil, Error.Wrap(err)
}
bwRollups = append(bwRollups, accounting.StoragenodeBandwidthRollup{
NodeID: id,
IntervalStart: dbxRollup.IntervalStart,
Action: dbxRollup.Action,
Settled: dbxRollup.Settled,
})

rollups, err := convertSlice(dbxRollups, fromDBXStoragenodeBandwidthRollup)
if err != nil {
return nil, Error.Wrap(err)
}
bwRollups = append(bwRollups, rollups...)

if cursor == nil {
return bwRollups, nil
}
Expand All @@ -678,20 +647,66 @@ func (db *StoragenodeAccounting) GetArchivedRollupsSince(ctx context.Context, si
return nil, Error.Wrap(err)
}
cursor = next
for _, dbxRollup := range dbxRollups {
id, err := storj.NodeIDFromBytes(dbxRollup.StoragenodeId)
if err != nil {
return nil, Error.Wrap(err)
}
bwRollups = append(bwRollups, accounting.StoragenodeBandwidthRollup{
NodeID: id,
IntervalStart: dbxRollup.IntervalStart,
Action: dbxRollup.Action,
Settled: dbxRollup.Settled,
})

rollups, err := convertSlice(dbxRollups, fromDBXStoragenodeBandwidthRollupArchive)
if err != nil {
return nil, Error.Wrap(err)
}
bwRollups = append(bwRollups, rollups...)

if cursor == nil {
return bwRollups, nil
}
}
}

func fromDBXStoragenodeStorageTally(r *dbx.StoragenodeStorageTally) (*accounting.StoragenodeStorageTally, error) {
nodeID, err := storj.NodeIDFromBytes(r.NodeId)
if err != nil {
return nil, Error.Wrap(err)
}
return &accounting.StoragenodeStorageTally{
NodeID: nodeID,
IntervalEndTime: r.IntervalEndTime,
DataTotal: r.DataTotal,
}, nil
}

func fromDBXStoragenodeBandwidthRollup(v *dbx.StoragenodeBandwidthRollup) (r accounting.StoragenodeBandwidthRollup, _ error) {
id, err := storj.NodeIDFromBytes(v.StoragenodeId)
if err != nil {
return r, Error.Wrap(err)
}
return accounting.StoragenodeBandwidthRollup{
NodeID: id,
IntervalStart: v.IntervalStart,
Action: v.Action,
Settled: v.Settled,
}, nil
}

func fromDBXStoragenodeBandwidthRollupPhase2(v *dbx.StoragenodeBandwidthRollupPhase2) (r accounting.StoragenodeBandwidthRollup, _ error) {
id, err := storj.NodeIDFromBytes(v.StoragenodeId)
if err != nil {
return r, Error.Wrap(err)
}
return accounting.StoragenodeBandwidthRollup{
NodeID: id,
IntervalStart: v.IntervalStart,
Action: v.Action,
Settled: v.Settled,
}, nil
}

func fromDBXStoragenodeBandwidthRollupArchive(v *dbx.StoragenodeBandwidthRollupArchive) (r accounting.StoragenodeBandwidthRollup, _ error) {
id, err := storj.NodeIDFromBytes(v.StoragenodeId)
if err != nil {
return r, Error.Wrap(err)
}
return accounting.StoragenodeBandwidthRollup{
NodeID: id,
IntervalStart: v.IntervalStart,
Action: v.Action,
Settled: v.Settled,
}, nil
}
8 changes: 2 additions & 6 deletions satellite/satellitedb/storjscanpayments.go
Expand Up @@ -126,12 +126,7 @@ func (storjscanPayments *storjscanPayments) ListWallet(ctx context.Context, wall
return nil, Error.Wrap(err)
}

var payments []storjscan.CachedPayment
for _, dbxPmnt := range dbxPmnts {
payments = append(payments, fromDBXPayment(dbxPmnt))
}

return payments, nil
return convertSliceNoError(dbxPmnts, fromDBXPayment), nil
}

// LastBlock returns the highest block known to DB.
Expand Down Expand Up @@ -160,6 +155,7 @@ func (storjscanPayments storjscanPayments) DeletePending(ctx context.Context) er
func (storjscanPayments storjscanPayments) ListConfirmed(ctx context.Context, blockNumber int64, logIndex int) (_ []storjscan.CachedPayment, err error) {
defer mon.Task()(&ctx)(&err)

// TODO: use DBX here
query := `SELECT block_hash, block_number, transaction, log_index, from_address, to_address, token_value, usd_value, status, timestamp
FROM storjscan_payments WHERE (storjscan_payments.block_number, storjscan_payments.log_index) > (?, ?) AND storjscan_payments.status = ?
ORDER BY storjscan_payments.block_number, storjscan_payments.log_index`
Expand Down

0 comments on commit df53914

Please sign in to comment.