From ac716e151457fec04cf63f323767cee086600f00 Mon Sep 17 00:00:00 2001 From: Qweder93 Date: Thu, 2 Jul 2020 16:54:32 +0300 Subject: [PATCH] storagenode/heldamount: payment receipt added to monthly paystub, heldamount.service separated for service and endpoint Change-Id: Id759586c6362edbef34c230d4f0d2585c11c9b47 --- storagenode/console/consoleapi/heldamount.go | 8 +- .../console/consoleapi/storagenode_test.go | 28 +-- storagenode/console/service.go | 14 +- storagenode/heldamount/db_test.go | 58 +---- storagenode/heldamount/endpoint.go | 236 ++++++++++++++++++ storagenode/heldamount/heldamount.go | 19 +- storagenode/heldamount/service.go | 158 +----------- storagenode/nodestats/cache.go | 55 ++-- storagenode/peer.go | 8 +- storagenode/storagenodedb/heldamount.go | 99 ++------ 10 files changed, 350 insertions(+), 333 deletions(-) create mode 100644 storagenode/heldamount/endpoint.go diff --git a/storagenode/console/consoleapi/heldamount.go b/storagenode/console/consoleapi/heldamount.go index 5eeb3662b909..44f62dded303 100644 --- a/storagenode/console/consoleapi/heldamount.go +++ b/storagenode/console/consoleapi/heldamount.go @@ -52,7 +52,7 @@ func (heldAmount *HeldAmount) PayStubMonthly(w http.ResponseWriter, r *http.Requ id := queryParams.Get("id") if id == "" { - payStubs, err := heldAmount.service.AllPayStubsMonthlyCached(ctx, period) + payStubs, err := heldAmount.service.AllPayStubsMonthly(ctx, period) if err != nil { heldAmount.serveJSONError(w, http.StatusInternalServerError, ErrHeldAmountAPI.Wrap(err)) return @@ -69,7 +69,7 @@ func (heldAmount *HeldAmount) PayStubMonthly(w http.ResponseWriter, r *http.Requ return } - payStub, err := heldAmount.service.SatellitePayStubMonthlyCached(ctx, satelliteID, period) + payStub, err := heldAmount.service.SatellitePayStubMonthly(ctx, satelliteID, period) if err != nil { if heldamount.ErrNoPayStubForPeriod.Has(err) { heldAmount.serveJSONError(w, http.StatusNotFound, ErrHeldAmountAPI.Wrap(err)) @@ -112,7 +112,7 @@ func (heldAmount *HeldAmount) PayStubPeriod(w http.ResponseWriter, r *http.Reque id := queryParams.Get("id") if id == "" { - payStubs, err := heldAmount.service.AllPayStubsPeriodCached(ctx, start, end) + payStubs, err := heldAmount.service.AllPayStubsPeriod(ctx, start, end) if err != nil { if heldamount.ErrBadPeriod.Has(err) { heldAmount.serveJSONError(w, http.StatusBadRequest, ErrHeldAmountAPI.Wrap(err)) @@ -134,7 +134,7 @@ func (heldAmount *HeldAmount) PayStubPeriod(w http.ResponseWriter, r *http.Reque return } - payStubs, err := heldAmount.service.SatellitePayStubPeriodCached(ctx, satelliteID, start, end) + payStubs, err := heldAmount.service.SatellitePayStubPeriod(ctx, satelliteID, start, end) if err != nil { if heldamount.ErrBadPeriod.Has(err) { heldAmount.serveJSONError(w, http.StatusBadRequest, ErrHeldAmountAPI.Wrap(err)) diff --git a/storagenode/console/consoleapi/storagenode_test.go b/storagenode/console/consoleapi/storagenode_test.go index b0485a4cd781..d87a067ef27d 100644 --- a/storagenode/console/consoleapi/storagenode_test.go +++ b/storagenode/console/consoleapi/storagenode_test.go @@ -150,13 +150,13 @@ func TestStorageNodeApi(t *testing.T) { CurrentMonthEstimatedAmount: estimated1 + estimated2, CurrentMonthHeld: int64(sum1 + sum2 - sum1AfterHeld - sum2AfterHeld), PreviousMonthPayout: heldamount.PayoutMonthly{ - EgressBandwidth: 0, - EgressPayout: 0, - EgressRepairAudit: 0, - RepairAuditPayout: 0, - DiskSpace: 0, - DiskSpaceAmount: 0, - HeldPercentRate: 0, + EgressBandwidth: 0, + EgressBandwidthPayout: 0, + EgressRepairAudit: 0, + EgressRepairAuditPayout: 0, + DiskSpace: 0, + DiskSpacePayout: 0, + HeldRate: 0, }, }) require.NoError(t, err) @@ -180,13 +180,13 @@ func TestStorageNodeApi(t *testing.T) { CurrentMonthEstimatedAmount: estimated1, CurrentMonthHeld: int64(sum1 - sum1AfterHeld), PreviousMonthPayout: heldamount.PayoutMonthly{ - EgressBandwidth: 0, - EgressPayout: 0, - EgressRepairAudit: 0, - RepairAuditPayout: 0, - DiskSpace: 0, - DiskSpaceAmount: 0, - HeldPercentRate: 75, + EgressBandwidth: 0, + EgressBandwidthPayout: 0, + EgressRepairAudit: 0, + EgressRepairAuditPayout: 0, + DiskSpace: 0, + DiskSpacePayout: 0, + HeldRate: 75, }, }) require.NoError(t, err) diff --git a/storagenode/console/service.go b/storagenode/console/service.go index 161417c1ba67..32e7bc1c0de3 100644 --- a/storagenode/console/service.go +++ b/storagenode/console/service.go @@ -407,11 +407,11 @@ func (s *Service) GetAllSatellitesEstimatedPayout(ctx context.Context) (payout h payout.CurrentMonthEstimatedAmount += current payout.CurrentMonthHeld += held - payout.PreviousMonthPayout.DiskSpaceAmount += previous.DiskSpaceAmount + payout.PreviousMonthPayout.DiskSpacePayout += previous.DiskSpacePayout payout.PreviousMonthPayout.DiskSpace += previous.DiskSpace payout.PreviousMonthPayout.EgressBandwidth += previous.EgressBandwidth - payout.PreviousMonthPayout.EgressPayout += previous.EgressPayout - payout.PreviousMonthPayout.RepairAuditPayout += previous.RepairAuditPayout + payout.PreviousMonthPayout.EgressBandwidthPayout += previous.EgressBandwidthPayout + payout.PreviousMonthPayout.EgressRepairAuditPayout += previous.EgressRepairAuditPayout payout.PreviousMonthPayout.EgressRepairAudit += previous.EgressRepairAudit } @@ -487,7 +487,7 @@ func (s *Service) estimatedPayoutPreviousMonth(ctx context.Context, satelliteID } heldRate := s.getHeldRate(stats.JoinedAt) - payoutData.HeldPercentRate = heldRate + payoutData.HeldRate = heldRate bandwidthDaily, err := s.bandwidthDB.GetDailySatelliteRollups(ctx, satelliteID, from, to) if err != nil { @@ -497,10 +497,10 @@ func (s *Service) estimatedPayoutPreviousMonth(ctx context.Context, satelliteID for i := 0; i < len(bandwidthDaily); i++ { payoutData.EgressBandwidth += bandwidthDaily[i].Egress.Usage usagePayout := float64(bandwidthDaily[i].Egress.Usage*priceModel.EgressBandwidth*heldRate/100) / math.Pow10(12) - payoutData.EgressPayout += int64(usagePayout) + payoutData.EgressBandwidthPayout += int64(usagePayout) payoutData.EgressRepairAudit += bandwidthDaily[i].Egress.Audit + bandwidthDaily[i].Egress.Repair repairAuditPayout := float64((bandwidthDaily[i].Egress.Audit*priceModel.AuditBandwidth+bandwidthDaily[i].Egress.Repair*priceModel.RepairBandwidth)*heldRate/100) / math.Pow10(12) - payoutData.RepairAuditPayout += int64(repairAuditPayout) + payoutData.EgressRepairAuditPayout += int64(repairAuditPayout) } storageDaily, err := s.storageUsageDB.GetDaily(ctx, satelliteID, from, to) @@ -510,7 +510,7 @@ func (s *Service) estimatedPayoutPreviousMonth(ctx context.Context, satelliteID for j := 0; j < len(storageDaily); j++ { payoutData.DiskSpace += storageDaily[j].AtRestTotal - payoutData.DiskSpaceAmount += int64(storageDaily[j].AtRestTotal / 730 / math.Pow10(12) * float64(priceModel.DiskSpace*heldRate/100)) + payoutData.DiskSpacePayout += int64(storageDaily[j].AtRestTotal / 730 / math.Pow10(12) * float64(priceModel.DiskSpace*heldRate/100)) } return payoutData, nil diff --git a/storagenode/heldamount/db_test.go b/storagenode/heldamount/db_test.go index 94d44e00a19d..931e0598b7ea 100644 --- a/storagenode/heldamount/db_test.go +++ b/storagenode/heldamount/db_test.go @@ -11,7 +11,6 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "storj.io/common/rpc" "storj.io/common/storj" "storj.io/common/testcontext" "storj.io/storj/storagenode" @@ -138,45 +137,6 @@ func TestHeldAmountDB(t *testing.T) { assert.NoError(t, err) }) - t.Run("Test GetPayment", func(t *testing.T) { - paym, err := heldAmount.GetPayment(ctx, satelliteID, period) - assert.NoError(t, err) - assert.Equal(t, paym.Created, payment.Created) - assert.Equal(t, paym.SatelliteID, payment.SatelliteID) - assert.Equal(t, paym.Period, payment.Period) - assert.Equal(t, paym.ID, payment.ID) - assert.Equal(t, paym.Amount, payment.Amount) - assert.Equal(t, paym.Notes, payment.Notes) - assert.Equal(t, paym.Receipt, payment.Receipt) - - paym, err = heldAmount.GetPayment(ctx, satelliteID, "") - assert.Error(t, err) - assert.Equal(t, true, heldamount.ErrNoPayStubForPeriod.Has(err)) - assert.Nil(t, paym) - - paym, err = heldAmount.GetPayment(ctx, storj.NodeID{1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1}, period) - assert.Error(t, err) - assert.Equal(t, true, heldamount.ErrNoPayStubForPeriod.Has(err)) - assert.Nil(t, paym) - }) - - t.Run("Test StorePayment", func(t *testing.T) { - payments, err := heldAmount.AllPayments(ctx, period) - assert.NoError(t, err) - assert.Equal(t, 1, len(payments)) - assert.Equal(t, payments[0].Created, payment.Created) - assert.Equal(t, payments[0].SatelliteID, payment.SatelliteID) - assert.Equal(t, payments[0].Period, payment.Period) - assert.Equal(t, payments[0].ID, payment.ID) - assert.Equal(t, payments[0].Amount, payment.Amount) - assert.Equal(t, payments[0].Notes, payment.Notes) - assert.Equal(t, payments[0].Receipt, payment.Receipt) - - payments, err = heldAmount.AllPayments(ctx, "") - assert.NoError(t, err) - assert.Equal(t, len(payments), 0) - }) - t.Run("Test SatellitesHeldbackHistory", func(t *testing.T) { heldback, err := heldAmount.SatellitesHeldbackHistory(ctx, satelliteID) assert.NoError(t, err) @@ -233,7 +193,7 @@ func TestSatellitePayStubPeriodCached(t *testing.T) { storagenodedbtest.Run(t, func(ctx *testcontext.Context, t *testing.T, db storagenode.DB) { heldAmountDB := db.HeldAmount() reputationDB := db.Reputation() - service := heldamount.NewService(nil, heldAmountDB, reputationDB, rpc.Dialer{}, nil) + service := heldamount.NewService(nil, heldAmountDB, reputationDB, nil) payStub := heldamount.PayStub{ SatelliteID: storj.NodeID{1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1}, @@ -264,15 +224,15 @@ func TestSatellitePayStubPeriodCached(t *testing.T) { require.NoError(t, err) } - payStubs, err := service.SatellitePayStubPeriodCached(ctx, payStub.SatelliteID, "2020-01", "2020-03") + payStubs, err := service.SatellitePayStubPeriod(ctx, payStub.SatelliteID, "2020-01", "2020-03") require.NoError(t, err) require.Equal(t, 3, len(payStubs)) - payStubs, err = service.SatellitePayStubPeriodCached(ctx, payStub.SatelliteID, "2019-01", "2021-03") + payStubs, err = service.SatellitePayStubPeriod(ctx, payStub.SatelliteID, "2019-01", "2021-03") require.NoError(t, err) require.Equal(t, 3, len(payStubs)) - payStubs, err = service.SatellitePayStubPeriodCached(ctx, payStub.SatelliteID, "2019-01", "2020-01") + payStubs, err = service.SatellitePayStubPeriod(ctx, payStub.SatelliteID, "2019-01", "2020-01") require.NoError(t, err) require.Equal(t, 1, len(payStubs)) }) @@ -282,7 +242,7 @@ func TestAllPayStubPeriodCached(t *testing.T) { storagenodedbtest.Run(t, func(ctx *testcontext.Context, t *testing.T, db storagenode.DB) { heldAmountDB := db.HeldAmount() reputationDB := db.Reputation() - service := heldamount.NewService(nil, heldAmountDB, reputationDB, rpc.Dialer{}, nil) + service := heldamount.NewService(nil, heldAmountDB, reputationDB, nil) payStub := heldamount.PayStub{ SatelliteID: storj.NodeID{1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1}, @@ -316,19 +276,19 @@ func TestAllPayStubPeriodCached(t *testing.T) { } } - payStubs, err := service.AllPayStubsPeriodCached(ctx, "2020-01", "2020-03") + payStubs, err := service.AllPayStubsPeriod(ctx, "2020-01", "2020-03") require.NoError(t, err) require.Equal(t, 9, len(payStubs)) - payStubs, err = service.AllPayStubsPeriodCached(ctx, "2019-01", "2021-03") + payStubs, err = service.AllPayStubsPeriod(ctx, "2019-01", "2021-03") require.NoError(t, err) require.Equal(t, 9, len(payStubs)) - payStubs, err = service.AllPayStubsPeriodCached(ctx, "2019-01", "2020-01") + payStubs, err = service.AllPayStubsPeriod(ctx, "2019-01", "2020-01") require.NoError(t, err) require.Equal(t, 3, len(payStubs)) - payStubs, err = service.AllPayStubsPeriodCached(ctx, "2019-01", "2019-01") + payStubs, err = service.AllPayStubsPeriod(ctx, "2019-01", "2019-01") require.NoError(t, err) require.Equal(t, 0, len(payStubs)) }) diff --git a/storagenode/heldamount/endpoint.go b/storagenode/heldamount/endpoint.go new file mode 100644 index 000000000000..c7bf121f67e2 --- /dev/null +++ b/storagenode/heldamount/endpoint.go @@ -0,0 +1,236 @@ +// Copyright (C) 2020 Storj Labs, Inc. +// See LICENSE for copying information. + +package heldamount + +import ( + "context" + + "github.com/zeebo/errs" + "go.uber.org/zap" + + "storj.io/common/pb" + "storj.io/common/rpc" + "storj.io/common/rpc/rpcstatus" + "storj.io/common/storj" + "storj.io/storj/private/date" + "storj.io/storj/storagenode/trust" +) + +// Client encapsulates HeldAmountClient with underlying connection. +// +// architecture: Client +type Client struct { + conn *rpc.Conn + pb.DRPCHeldAmountClient +} + +// Close closes underlying client connection. +func (c *Client) Close() error { + return c.conn.Close() +} + +// Endpoint retrieves info from satellites using an rpc client. +// +// architecture: Endpoint +type Endpoint struct { + log *zap.Logger + + dialer rpc.Dialer + trust *trust.Pool +} + +// NewEndpoint creates new instance of endpoint. +func NewEndpoint(log *zap.Logger, dialer rpc.Dialer, trust *trust.Pool) *Endpoint { + return &Endpoint{ + log: log, + dialer: dialer, + trust: trust, + } +} + +// GetPaystub retrieves held amount for particular satellite from satellite using RPC. +func (endpoint *Endpoint) GetPaystub(ctx context.Context, satelliteID storj.NodeID, period string) (_ *PayStub, err error) { + defer mon.Task()(&ctx)(&err) + + client, err := endpoint.dial(ctx, satelliteID) + if err != nil { + return nil, ErrHeldAmountService.Wrap(err) + } + defer func() { err = errs.Combine(err, client.Close()) }() + + requestedPeriod, err := date.PeriodToTime(period) + if err != nil { + return nil, ErrHeldAmountService.Wrap(err) + } + + resp, err := client.GetPayStub(ctx, &pb.GetHeldAmountRequest{Period: requestedPeriod}) + if err != nil { + if rpcstatus.Code(err) == rpcstatus.OutOfRange { + return nil, ErrNoPayStubForPeriod.Wrap(err) + } + + return nil, ErrHeldAmountService.Wrap(err) + } + + return &PayStub{ + Period: period[0:7], + SatelliteID: satelliteID, + Created: resp.CreatedAt, + Codes: resp.Codes, + UsageAtRest: resp.UsageAtRest, + UsageGet: resp.UsageGet, + UsagePut: resp.UsagePut, + UsageGetRepair: resp.UsageGetRepair, + UsagePutRepair: resp.UsagePutRepair, + UsageGetAudit: resp.UsageGetAudit, + CompAtRest: resp.CompAtRest, + CompGet: resp.CompGet, + CompPut: resp.CompPut, + CompGetRepair: resp.CompGetRepair, + CompPutRepair: resp.CompPutRepair, + CompGetAudit: resp.CompGetAudit, + SurgePercent: resp.SurgePercent, + Held: resp.Held, + Owed: resp.Owed, + Disposed: resp.Disposed, + Paid: resp.Paid, + }, nil +} + +// GetAllPaystubs retrieves all paystubs for particular satellite. +func (endpoint *Endpoint) GetAllPaystubs(ctx context.Context, satelliteID storj.NodeID) (_ []PayStub, err error) { + defer mon.Task()(&ctx)(&err) + + client, err := endpoint.dial(ctx, satelliteID) + if err != nil { + return nil, ErrHeldAmountService.Wrap(err) + } + defer func() { err = errs.Combine(err, client.Close()) }() + + resp, err := client.GetAllPaystubs(ctx, &pb.GetAllPaystubsRequest{}) + if err != nil { + return nil, ErrHeldAmountService.Wrap(err) + } + + var payStubs []PayStub + + for i := 0; i < len(resp.Paystub); i++ { + paystub := PayStub{ + Period: resp.Paystub[i].Period.String()[0:7], + SatelliteID: satelliteID, + Created: resp.Paystub[i].CreatedAt, + Codes: resp.Paystub[i].Codes, + UsageAtRest: resp.Paystub[i].UsageAtRest, + UsageGet: resp.Paystub[i].UsageGet, + UsagePut: resp.Paystub[i].UsagePut, + UsageGetRepair: resp.Paystub[i].UsageGetRepair, + UsagePutRepair: resp.Paystub[i].UsagePutRepair, + UsageGetAudit: resp.Paystub[i].UsageGetAudit, + CompAtRest: resp.Paystub[i].CompAtRest, + CompGet: resp.Paystub[i].CompGet, + CompPut: resp.Paystub[i].CompPut, + CompGetRepair: resp.Paystub[i].CompGetRepair, + CompPutRepair: resp.Paystub[i].CompPutRepair, + CompGetAudit: resp.Paystub[i].CompGetAudit, + SurgePercent: resp.Paystub[i].SurgePercent, + Held: resp.Paystub[i].Held, + Owed: resp.Paystub[i].Owed, + Disposed: resp.Paystub[i].Disposed, + Paid: resp.Paystub[i].Paid, + } + + payStubs = append(payStubs, paystub) + } + + return payStubs, nil +} + +// GetPayment retrieves payment data from particular satellite using grpc. +func (endpoint *Endpoint) GetPayment(ctx context.Context, satelliteID storj.NodeID, period string) (_ *Payment, err error) { + defer mon.Task()(&ctx)(&err) + + client, err := endpoint.dial(ctx, satelliteID) + if err != nil { + return nil, ErrHeldAmountService.Wrap(err) + } + defer func() { err = errs.Combine(err, client.Close()) }() + + requestedPeriod, err := date.PeriodToTime(period) + if err != nil { + return nil, ErrHeldAmountService.Wrap(err) + } + + resp, err := client.GetPayment(ctx, &pb.GetPaymentRequest{Period: requestedPeriod}) + if err != nil { + if rpcstatus.Code(err) == rpcstatus.OutOfRange { + return nil, nil + } + + return nil, ErrHeldAmountService.Wrap(err) + } + + return &Payment{ + ID: resp.Id, + Created: resp.CreatedAt, + SatelliteID: satelliteID, + Period: period[0:7], + Amount: resp.Amount, + Receipt: resp.Receipt, + Notes: resp.Notes, + }, nil +} + +// GetAllPayments retrieves all payments for particular satellite. +func (endpoint *Endpoint) GetAllPayments(ctx context.Context, satelliteID storj.NodeID) (_ []Payment, err error) { + defer mon.Task()(&ctx)(&err) + + client, err := endpoint.dial(ctx, satelliteID) + if err != nil { + return nil, ErrHeldAmountService.Wrap(err) + } + defer func() { err = errs.Combine(err, client.Close()) }() + + resp, err := client.GetAllPayments(ctx, &pb.GetAllPaymentsRequest{}) + if err != nil { + return nil, ErrHeldAmountService.Wrap(err) + } + + var payments []Payment + + for i := 0; i < len(resp.Payment); i++ { + payment := Payment{ + ID: resp.Payment[i].Id, + Created: resp.Payment[i].CreatedAt, + SatelliteID: satelliteID, + Period: resp.Payment[i].Period.String()[0:7], + Amount: resp.Payment[i].Amount, + Receipt: resp.Payment[i].Receipt, + Notes: resp.Payment[i].Notes, + } + + payments = append(payments, payment) + } + + return payments, nil +} + +// dial dials the HeldAmount client for the satellite by id +func (endpoint *Endpoint) dial(ctx context.Context, satelliteID storj.NodeID) (_ *Client, err error) { + defer mon.Task()(&ctx)(&err) + + nodeurl, err := endpoint.trust.GetNodeURL(ctx, satelliteID) + if err != nil { + return nil, errs.New("unable to find satellite %s: %w", satelliteID, err) + } + + conn, err := endpoint.dialer.DialNodeURL(ctx, nodeurl) + if err != nil { + return nil, errs.New("unable to connect to the satellite %s: %w", satelliteID, err) + } + + return &Client{ + conn: conn, + DRPCHeldAmountClient: pb.NewDRPCHeldAmountClient(conn), + }, nil +} diff --git a/storagenode/heldamount/heldamount.go b/storagenode/heldamount/heldamount.go index 4cd2743e617a..3e4709211315 100644 --- a/storagenode/heldamount/heldamount.go +++ b/storagenode/heldamount/heldamount.go @@ -32,10 +32,6 @@ type DB interface { AllPeriods(ctx context.Context) ([]string, error) // StorePayment inserts or updates payment into the DB StorePayment(ctx context.Context, payment Payment) error - // GetPayment retrieves payment stats for specific satellite in specific period. - GetPayment(ctx context.Context, satelliteID storj.NodeID, period string) (*Payment, error) - // AllPayments retrieves payment stats from all satellites in specific period from DB. - AllPayments(ctx context.Context, period string) ([]Payment, error) } // ErrNoPayStubForPeriod represents errors from the heldamount database. @@ -64,6 +60,7 @@ type PayStub struct { Owed int64 `json:"owed"` Disposed int64 `json:"disposed"` Paid int64 `json:"paid"` + Receipt string `json:"receipt"` } // AmountPeriod is node's held amount for period. @@ -81,13 +78,13 @@ type EstimatedPayout struct { // PayoutMonthly contains bandwidth and payout amount for month. type PayoutMonthly struct { - EgressBandwidth int64 `json:"egressBandwidth"` - EgressPayout int64 `json:"egressPayout"` - EgressRepairAudit int64 `json:"egressRepairAudit"` - RepairAuditPayout int64 `json:"repairAuditPayout"` - DiskSpace float64 `json:"diskSpace"` - DiskSpaceAmount int64 `json:"diskSpaceAmount"` - HeldPercentRate int64 `json:"heldRate"` + EgressBandwidth int64 `json:"egressBandwidth"` + EgressBandwidthPayout int64 `json:"egressBandwidthPayout"` + EgressRepairAudit int64 `json:"egressRepairAudit"` + EgressRepairAuditPayout int64 `json:"egressRepairAuditPayout"` + DiskSpace float64 `json:"diskSpace"` + DiskSpacePayout int64 `json:"diskSpacePayout"` + HeldRate int64 `json:"heldRate"` } // Payment is node payment data for specific period. diff --git a/storagenode/heldamount/service.go b/storagenode/heldamount/service.go index 7eeb8a845aca..f1f99e94ccba 100644 --- a/storagenode/heldamount/service.go +++ b/storagenode/heldamount/service.go @@ -14,9 +14,6 @@ import ( "github.com/zeebo/errs" "go.uber.org/zap" - "storj.io/common/pb" - "storj.io/common/rpc" - "storj.io/common/rpc/rpcstatus" "storj.io/common/storj" "storj.io/storj/private/date" "storj.io/storj/storagenode/reputation" @@ -33,21 +30,6 @@ var ( mon = monkit.Package() ) -// Client encapsulates HeldAmountClient with underlying connection -// -// architecture: Client -type Client struct { - conn *rpc.Conn - pb.DRPCHeldAmountClient -} - -// Close closes underlying client connection -func (c *Client) Close() error { - return c.conn.Close() -} - -// TODO: separate service on service and endpoint. - // Service retrieves info from satellites using an rpc client // // architecture: Service @@ -56,121 +38,21 @@ type Service struct { db DB reputationDB reputation.DB - - dialer rpc.Dialer - trust *trust.Pool + trust *trust.Pool } // NewService creates new instance of service. -func NewService(log *zap.Logger, db DB, reputationDB reputation.DB, dialer rpc.Dialer, trust *trust.Pool) *Service { +func NewService(log *zap.Logger, db DB, reputationDB reputation.DB, trust *trust.Pool) *Service { return &Service{ log: log, db: db, reputationDB: reputationDB, - dialer: dialer, trust: trust, } } -// GetPaystubStats retrieves held amount for particular satellite from satellite using RPC. -func (service *Service) GetPaystubStats(ctx context.Context, satelliteID storj.NodeID, period string) (_ *PayStub, err error) { - defer mon.Task()(&ctx)(&err) - - client, err := service.dial(ctx, satelliteID) - if err != nil { - return nil, ErrHeldAmountService.Wrap(err) - } - defer func() { err = errs.Combine(err, client.Close()) }() - - requestedPeriod, err := date.PeriodToTime(period) - if err != nil { - return nil, ErrHeldAmountService.Wrap(err) - } - - resp, err := client.GetPayStub(ctx, &pb.GetHeldAmountRequest{Period: requestedPeriod}) - if err != nil { - if rpcstatus.Code(err) == rpcstatus.OutOfRange { - return nil, ErrNoPayStubForPeriod.Wrap(err) - } - - return nil, ErrHeldAmountService.Wrap(err) - } - - return &PayStub{ - Period: period[0:7], - SatelliteID: satelliteID, - Created: resp.CreatedAt, - Codes: resp.Codes, - UsageAtRest: resp.UsageAtRest, - UsageGet: resp.UsageGet, - UsagePut: resp.UsagePut, - UsageGetRepair: resp.UsageGetRepair, - UsagePutRepair: resp.UsagePutRepair, - UsageGetAudit: resp.UsageGetAudit, - CompAtRest: resp.CompAtRest, - CompGet: resp.CompGet, - CompPut: resp.CompPut, - CompGetRepair: resp.CompGetRepair, - CompPutRepair: resp.CompPutRepair, - CompGetAudit: resp.CompGetAudit, - SurgePercent: resp.SurgePercent, - Held: resp.Held, - Owed: resp.Owed, - Disposed: resp.Disposed, - Paid: resp.Paid, - }, nil -} - -// GetAllPaystubs retrieves all paystubs for particular satellite. -func (service *Service) GetAllPaystubs(ctx context.Context, satelliteID storj.NodeID) (_ []PayStub, err error) { - defer mon.Task()(&ctx)(&err) - - client, err := service.dial(ctx, satelliteID) - if err != nil { - return nil, ErrHeldAmountService.Wrap(err) - } - defer func() { err = errs.Combine(err, client.Close()) }() - - resp, err := client.GetAllPaystubs(ctx, &pb.GetAllPaystubsRequest{}) - if err != nil { - return nil, ErrHeldAmountService.Wrap(err) - } - - var payStubs []PayStub - - for i := 0; i < len(resp.Paystub); i++ { - paystub := PayStub{ - Period: resp.Paystub[i].Period.String()[0:7], - SatelliteID: satelliteID, - Created: resp.Paystub[i].CreatedAt, - Codes: resp.Paystub[i].Codes, - UsageAtRest: resp.Paystub[i].UsageAtRest, - UsageGet: resp.Paystub[i].UsageGet, - UsagePut: resp.Paystub[i].UsagePut, - UsageGetRepair: resp.Paystub[i].UsageGetRepair, - UsagePutRepair: resp.Paystub[i].UsagePutRepair, - UsageGetAudit: resp.Paystub[i].UsageGetAudit, - CompAtRest: resp.Paystub[i].CompAtRest, - CompGet: resp.Paystub[i].CompGet, - CompPut: resp.Paystub[i].CompPut, - CompGetRepair: resp.Paystub[i].CompGetRepair, - CompPutRepair: resp.Paystub[i].CompPutRepair, - CompGetAudit: resp.Paystub[i].CompGetAudit, - SurgePercent: resp.Paystub[i].SurgePercent, - Held: resp.Paystub[i].Held, - Owed: resp.Paystub[i].Owed, - Disposed: resp.Paystub[i].Disposed, - Paid: resp.Paystub[i].Paid, - } - - payStubs = append(payStubs, paystub) - } - - return payStubs, nil -} - -// SatellitePayStubMonthlyCached retrieves held amount for particular satellite for selected month from storagenode database. -func (service *Service) SatellitePayStubMonthlyCached(ctx context.Context, satelliteID storj.NodeID, period string) (payStub *PayStub, err error) { +// SatellitePayStubMonthly retrieves held amount for particular satellite for selected month from storagenode database. +func (service *Service) SatellitePayStubMonthly(ctx context.Context, satelliteID storj.NodeID, period string) (payStub *PayStub, err error) { defer mon.Task()(&ctx, &satelliteID, &period)(&err) payStub, err = service.db.GetPayStub(ctx, satelliteID, period) @@ -181,8 +63,8 @@ func (service *Service) SatellitePayStubMonthlyCached(ctx context.Context, satel return payStub, nil } -// AllPayStubsMonthlyCached retrieves held amount for all satellites per selected period from storagenode database. -func (service *Service) AllPayStubsMonthlyCached(ctx context.Context, period string) (payStubs []PayStub, err error) { +// AllPayStubsMonthly retrieves held amount for all satellites per selected period from storagenode database. +func (service *Service) AllPayStubsMonthly(ctx context.Context, period string) (payStubs []PayStub, err error) { defer mon.Task()(&ctx, &period)(&err) payStubs, err = service.db.AllPayStubs(ctx, period) @@ -193,8 +75,8 @@ func (service *Service) AllPayStubsMonthlyCached(ctx context.Context, period str return payStubs, nil } -// SatellitePayStubPeriodCached retrieves held amount for all satellites for selected months from storagenode database. -func (service *Service) SatellitePayStubPeriodCached(ctx context.Context, satelliteID storj.NodeID, periodStart, periodEnd string) (payStubs []PayStub, err error) { +// SatellitePayStubPeriod retrieves held amount for all satellites for selected months from storagenode database. +func (service *Service) SatellitePayStubPeriod(ctx context.Context, satelliteID storj.NodeID, periodStart, periodEnd string) (payStubs []PayStub, err error) { defer mon.Task()(&ctx, &satelliteID, &periodStart, &periodEnd)(&err) periods, err := parsePeriodRange(periodStart, periodEnd) @@ -218,8 +100,8 @@ func (service *Service) SatellitePayStubPeriodCached(ctx context.Context, satell return payStubs, nil } -// AllPayStubsPeriodCached retrieves held amount for all satellites for selected range of months from storagenode database. -func (service *Service) AllPayStubsPeriodCached(ctx context.Context, periodStart, periodEnd string) (payStubs []PayStub, err error) { +// AllPayStubsPeriod retrieves held amount for all satellites for selected range of months from storagenode database. +func (service *Service) AllPayStubsPeriod(ctx context.Context, periodStart, periodEnd string) (payStubs []PayStub, err error) { defer mon.Task()(&ctx, &periodStart, &periodEnd)(&err) periods, err := parsePeriodRange(periodStart, periodEnd) @@ -325,26 +207,6 @@ func (service *Service) AllHeldbackHistory(ctx context.Context) (result []HeldHi return result, nil } -// dial dials the HeldAmount client for the satellite by id -func (service *Service) dial(ctx context.Context, satelliteID storj.NodeID) (_ *Client, err error) { - defer mon.Task()(&ctx)(&err) - - nodeurl, err := service.trust.GetNodeURL(ctx, satelliteID) - if err != nil { - return nil, errs.New("unable to find satellite %s: %w", satelliteID, err) - } - - conn, err := service.dialer.DialNodeURL(ctx, nodeurl) - if err != nil { - return nil, errs.New("unable to connect to the satellite %s: %w", satelliteID, err) - } - - return &Client{ - conn: conn, - DRPCHeldAmountClient: pb.NewDRPCHeldAmountClient(conn), - }, nil -} - // TODO: move to separate struct. func parsePeriodRange(periodStart, periodEnd string) (periods []string, err error) { var yearStart, yearEnd, monthStart, monthEnd int diff --git a/storagenode/nodestats/cache.go b/storagenode/nodestats/cache.go index 5c7c89549be1..fb4b3c099a3a 100644 --- a/storagenode/nodestats/cache.go +++ b/storagenode/nodestats/cache.go @@ -46,10 +46,11 @@ type CacheStorage struct { type Cache struct { log *zap.Logger - db CacheStorage - service *Service - heldamountService *heldamount.Service - trust *trust.Pool + db CacheStorage + service *Service + heldamountEndpoint *heldamount.Endpoint + heldamountService *heldamount.Service + trust *trust.Pool maxSleep time.Duration Reputation *sync2.Cycle @@ -57,16 +58,17 @@ type Cache struct { } // NewCache creates new caching service instance -func NewCache(log *zap.Logger, config Config, db CacheStorage, service *Service, heldamountService *heldamount.Service, trust *trust.Pool) *Cache { +func NewCache(log *zap.Logger, config Config, db CacheStorage, service *Service, heldamountEndpoint *heldamount.Endpoint, heldamountService *heldamount.Service, trust *trust.Pool) *Cache { return &Cache{ - log: log, - db: db, - service: service, - heldamountService: heldamountService, - trust: trust, - maxSleep: config.MaxSleep, - Reputation: sync2.NewCycle(config.ReputationSync), - Storage: sync2.NewCycle(config.StorageSync), + log: log, + db: db, + service: service, + heldamountEndpoint: heldamountEndpoint, + heldamountService: heldamountService, + trust: trust, + maxSleep: config.MaxSleep, + Reputation: sync2.NewCycle(config.ReputationSync), + Storage: sync2.NewCycle(config.StorageSync), } } @@ -75,7 +77,7 @@ func (cache *Cache) Run(ctx context.Context) error { var group errgroup.Group err := cache.satelliteLoop(ctx, func(satelliteID storj.NodeID) error { - stubHistory, err := cache.heldamountService.GetAllPaystubs(ctx, satelliteID) + stubHistory, err := cache.heldamountEndpoint.GetAllPaystubs(ctx, satelliteID) if err != nil { return err } @@ -87,6 +89,18 @@ func (cache *Cache) Run(ctx context.Context) error { } } + paymentHistory, err := cache.heldamountEndpoint.GetAllPayments(ctx, satelliteID) + if err != nil { + return err + } + + for j := 0; j < len(paymentHistory); j++ { + err := cache.db.HeldAmount.StorePayment(ctx, paymentHistory[j]) + if err != nil { + return err + } + } + pricingModel, err := cache.service.GetPricingModel(ctx, satelliteID) if err != nil { return err @@ -185,7 +199,7 @@ func (cache *Cache) CacheHeldAmount(ctx context.Context) (err error) { } previousMonth := yearAndMonth.AddDate(0, -1, 0).String() - payStub, err := cache.heldamountService.GetPaystubStats(ctx, satellite, previousMonth) + payStub, err := cache.heldamountEndpoint.GetPaystub(ctx, satellite, previousMonth) if err != nil { if heldamount.ErrNoPayStubForPeriod.Has(err) { return nil @@ -200,6 +214,17 @@ func (cache *Cache) CacheHeldAmount(ctx context.Context) (err error) { } } + payment, err := cache.heldamountEndpoint.GetPayment(ctx, satellite, previousMonth) + if err != nil { + return err + } + + if payment != nil { + if err = cache.db.HeldAmount.StorePayment(ctx, *payment); err != nil { + return err + } + } + return nil }) } diff --git a/storagenode/peer.go b/storagenode/peer.go index 644dcbaf4928..1589f7601ce8 100644 --- a/storagenode/peer.go +++ b/storagenode/peer.go @@ -256,7 +256,8 @@ type Peer struct { } Heldamount struct { - Service *heldamount.Service + Service *heldamount.Service + Endpoint *heldamount.Endpoint } Bandwidth *bandwidth.Service @@ -530,6 +531,10 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB, revocationDB exten peer.Log.Named("heldamount:service"), peer.DB.HeldAmount(), peer.DB.Reputation(), + peer.Storage2.Trust, + ) + peer.Heldamount.Endpoint = heldamount.NewEndpoint( + peer.Log.Named("heldamount:endpoint"), peer.Dialer, peer.Storage2.Trust, ) @@ -553,6 +558,7 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB, revocationDB exten Satellites: peer.DB.Satellites(), }, peer.NodeStats.Service, + peer.Heldamount.Endpoint, peer.Heldamount.Service, peer.Storage2.Trust, ) diff --git a/storagenode/storagenodedb/heldamount.go b/storagenode/storagenodedb/heldamount.go index 0a77ee6749d8..ae928f18deb7 100644 --- a/storagenode/storagenodedb/heldamount.go +++ b/storagenode/storagenodedb/heldamount.go @@ -91,7 +91,7 @@ func (db *heldamountDB) GetPayStub(ctx context.Context, satelliteID storj.NodeID Period: period, } - row := db.QueryRowContext(ctx, + rowStub := db.QueryRowContext(ctx, `SELECT created_at, codes, usage_at_rest, @@ -115,7 +115,7 @@ func (db *heldamountDB) GetPayStub(ctx context.Context, satelliteID storj.NodeID satelliteID, period, ) - err = row.Scan( + err = rowStub.Scan( &result.Created, &result.Codes, &result.UsageAtRest, @@ -143,6 +143,19 @@ func (db *heldamountDB) GetPayStub(ctx context.Context, satelliteID storj.NodeID return nil, ErrHeldAmount.Wrap(err) } + rowPayment := db.QueryRowContext(ctx, + `SELECT receipt FROM payments WHERE satellite_id = ? AND period = ?`, + satelliteID, period, + ) + + err = rowPayment.Scan(&result.Receipt) + if err != nil { + if sql.ErrNoRows == err { + return &result, nil + } + return nil, ErrHeldAmount.Wrap(err) + } + return &result, nil } @@ -340,88 +353,6 @@ func (db *heldamountDB) StorePayment(ctx context.Context, payment heldamount.Pay return ErrHeldAmount.Wrap(err) } -// GetPayment retrieves payment data for a specific satellite. -func (db *heldamountDB) GetPayment(ctx context.Context, satelliteID storj.NodeID, period string) (_ *heldamount.Payment, err error) { - defer mon.Task()(&ctx)(&err) - - result := heldamount.Payment{ - SatelliteID: satelliteID, - Period: period, - } - - row := db.QueryRowContext(ctx, - `SELECT id, - created_at, - amount, - receipt, - notes - FROM payments WHERE satellite_id = ? AND period = ?`, - satelliteID, period, - ) - - err = row.Scan( - &result.ID, - &result.Created, - &result.Amount, - &result.Receipt, - &result.Notes, - ) - if err != nil { - if sql.ErrNoRows == err { - return nil, heldamount.ErrNoPayStubForPeriod.Wrap(err) - } - return nil, ErrHeldAmount.Wrap(err) - } - - return &result, nil -} - -// AllPayments retrieves all payment stats from DB for specific period. -func (db *heldamountDB) AllPayments(ctx context.Context, period string) (_ []heldamount.Payment, err error) { - defer mon.Task()(&ctx)(&err) - - query := `SELECT - satellite_id, - id, - created_at, - amount, - receipt, - notes - FROM payments WHERE period = ?` - - rows, err := db.QueryContext(ctx, query, period) - if err != nil { - return nil, err - } - - defer func() { err = errs.Combine(err, rows.Close()) }() - - var paymentList []heldamount.Payment - for rows.Next() { - var payment heldamount.Payment - payment.Period = period - - err := rows.Scan(&payment.SatelliteID, - &payment.ID, - &payment.Created, - &payment.Amount, - &payment.Receipt, - &payment.Notes, - ) - - if err != nil { - return nil, ErrHeldAmount.Wrap(err) - } - - paymentList = append(paymentList, payment) - } - if err = rows.Err(); err != nil { - return nil, ErrHeldAmount.Wrap(err) - } - - return paymentList, nil -} - // SatellitesDisposedHistory returns all disposed amount for specific satellite from DB. func (db *heldamountDB) SatellitesDisposedHistory(ctx context.Context, satelliteID storj.NodeID) (_ int64, err error) { defer mon.Task()(&ctx)(&err)