-
Notifications
You must be signed in to change notification settings - Fork 387
/
invoiceprojectrecords.go
224 lines (183 loc) · 7.66 KB
/
invoiceprojectrecords.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information.
package satellitedb
import (
"context"
"database/sql"
"errors"
"time"
"github.com/zeebo/errs"
"storj.io/common/tagsql"
"storj.io/common/uuid"
"storj.io/storj/satellite/payments/stripe"
"storj.io/storj/satellite/satellitedb/dbx"
)
// ensure that invoiceProjectRecords implements stripecoinpayments.ProjectRecordsDB.
var _ stripe.ProjectRecordsDB = (*invoiceProjectRecords)(nil)
// invoiceProjectRecordState defines states of the invoice project record.
type invoiceProjectRecordState int
const (
// invoice project record is not yet applied to customer invoice.
invoiceProjectRecordStateUnapplied invoiceProjectRecordState = 0
// invoice project record has been used during creating customer invoice.
invoiceProjectRecordStateConsumed invoiceProjectRecordState = 1
// invoice project record is not yet applied to customer invoice and has to be aggregated with other items.
invoiceProjectRecordStateToBeAggregated invoiceProjectRecordState = 2
)
// Int returns intent state as int.
func (intent invoiceProjectRecordState) Int() int {
return int(intent)
}
// invoiceProjectRecords is stripecoinpayments project records DB.
//
// architecture: Database
type invoiceProjectRecords struct {
db *satelliteDB
}
// Create creates new invoice project record in the DB.
func (db *invoiceProjectRecords) Create(ctx context.Context, records []stripe.CreateProjectRecord, start, end time.Time) (err error) {
defer mon.Task()(&ctx)(&err)
return db.createWithState(ctx, records, invoiceProjectRecordStateUnapplied, start, end)
}
// CreateToBeAggregated creates new to be aggregated invoice project record in the DB.
func (db *invoiceProjectRecords) CreateToBeAggregated(ctx context.Context, records []stripe.CreateProjectRecord, start, end time.Time) (err error) {
defer mon.Task()(&ctx)(&err)
return db.createWithState(ctx, records, invoiceProjectRecordStateToBeAggregated, start, end)
}
func (db *invoiceProjectRecords) createWithState(ctx context.Context, records []stripe.CreateProjectRecord, state invoiceProjectRecordState, start, end time.Time) error {
return Error.Wrap(db.db.WithTx(ctx, func(ctx context.Context, tx *dbx.Tx) error {
for _, record := range records {
id, err := uuid.New()
if err != nil {
return Error.Wrap(err)
}
_, err = tx.Create_StripecoinpaymentsInvoiceProjectRecord(ctx,
dbx.StripecoinpaymentsInvoiceProjectRecord_Id(id[:]),
dbx.StripecoinpaymentsInvoiceProjectRecord_ProjectId(record.ProjectID[:]),
dbx.StripecoinpaymentsInvoiceProjectRecord_Storage(record.Storage),
dbx.StripecoinpaymentsInvoiceProjectRecord_Egress(record.Egress),
dbx.StripecoinpaymentsInvoiceProjectRecord_PeriodStart(start),
dbx.StripecoinpaymentsInvoiceProjectRecord_PeriodEnd(end),
dbx.StripecoinpaymentsInvoiceProjectRecord_State(state.Int()),
dbx.StripecoinpaymentsInvoiceProjectRecord_Create_Fields{
Segments: dbx.StripecoinpaymentsInvoiceProjectRecord_Segments(int64(record.Segments)),
},
)
if err != nil {
return Error.Wrap(err)
}
}
return nil
}))
}
// Check checks if invoice project record for specified project and billing period exists.
func (db *invoiceProjectRecords) Check(ctx context.Context, projectID uuid.UUID, start, end time.Time) (err error) {
defer mon.Task()(&ctx)(&err)
_, err = db.db.Get_StripecoinpaymentsInvoiceProjectRecord_By_ProjectId_And_PeriodStart_And_PeriodEnd(ctx,
dbx.StripecoinpaymentsInvoiceProjectRecord_ProjectId(projectID[:]),
dbx.StripecoinpaymentsInvoiceProjectRecord_PeriodStart(start),
dbx.StripecoinpaymentsInvoiceProjectRecord_PeriodEnd(end),
)
if err != nil {
if errors.Is(err, sql.ErrNoRows) {
return nil
}
return err
}
return stripe.ErrProjectRecordExists
}
// Get returns record for specified project and billing period.
func (db *invoiceProjectRecords) Get(ctx context.Context, projectID uuid.UUID, start, end time.Time) (record *stripe.ProjectRecord, err error) {
defer mon.Task()(&ctx)(&err)
dbxRecord, err := db.db.Get_StripecoinpaymentsInvoiceProjectRecord_By_ProjectId_And_PeriodStart_And_PeriodEnd(ctx,
dbx.StripecoinpaymentsInvoiceProjectRecord_ProjectId(projectID[:]),
dbx.StripecoinpaymentsInvoiceProjectRecord_PeriodStart(start),
dbx.StripecoinpaymentsInvoiceProjectRecord_PeriodEnd(end),
)
if err != nil {
if errors.Is(err, sql.ErrNoRows) {
return nil, nil
}
return nil, err
}
return fromDBXInvoiceProjectRecord(dbxRecord)
}
// Consume consumes invoice project record.
func (db *invoiceProjectRecords) Consume(ctx context.Context, id uuid.UUID) (err error) {
defer mon.Task()(&ctx)(&err)
_, err = db.db.Update_StripecoinpaymentsInvoiceProjectRecord_By_Id(ctx,
dbx.StripecoinpaymentsInvoiceProjectRecord_Id(id[:]),
dbx.StripecoinpaymentsInvoiceProjectRecord_Update_Fields{
State: dbx.StripecoinpaymentsInvoiceProjectRecord_State(invoiceProjectRecordStateConsumed.Int()),
},
)
return err
}
// ListToBeAggregated returns to be aggregated project records page with unapplied project records.
// Cursor is not included into listing results.
func (db *invoiceProjectRecords) ListToBeAggregated(ctx context.Context, cursor uuid.UUID, limit int, start, end time.Time) (page stripe.ProjectRecordsPage, err error) {
defer mon.Task()(&ctx)(&err)
return db.list(ctx, cursor, limit, invoiceProjectRecordStateToBeAggregated.Int(), start, end)
}
// ListUnapplied returns project records page with unapplied project records.
// Cursor is not included into listing results.
func (db *invoiceProjectRecords) ListUnapplied(ctx context.Context, cursor uuid.UUID, limit int, start, end time.Time) (page stripe.ProjectRecordsPage, err error) {
defer mon.Task()(&ctx)(&err)
return db.list(ctx, cursor, limit, invoiceProjectRecordStateUnapplied.Int(), start, end)
}
func (db *invoiceProjectRecords) list(ctx context.Context, cursor uuid.UUID, limit, state int, start, end time.Time) (page stripe.ProjectRecordsPage, err error) {
err = withRows(db.db.QueryContext(ctx, db.db.Rebind(`
SELECT
id, project_id, storage, egress, segments, period_start, period_end, state
FROM
stripecoinpayments_invoice_project_records
WHERE
id > ? AND period_start = ? AND period_end = ? AND state = ?
ORDER BY id
LIMIT ?
`), cursor, start, end, state, limit+1))(func(rows tagsql.Rows) error {
for rows.Next() {
var record stripe.ProjectRecord
err := rows.Scan(&record.ID, &record.ProjectID, &record.Storage, &record.Egress, &record.Segments, &record.PeriodStart, &record.PeriodEnd, &record.State)
if err != nil {
return Error.New("failed to scan stripe invoice project records: %w", err)
}
page.Records = append(page.Records, record)
}
return nil
})
if err != nil {
return stripe.ProjectRecordsPage{}, err
}
if len(page.Records) == limit+1 {
page.Next = true
page.Records = page.Records[:len(page.Records)-1]
page.Cursor = page.Records[len(page.Records)-1].ID
}
return page, nil
}
// fromDBXInvoiceProjectRecord converts *dbx.StripecoinpaymentsInvoiceProjectRecord to *stripecoinpayments.ProjectRecord.
func fromDBXInvoiceProjectRecord(dbxRecord *dbx.StripecoinpaymentsInvoiceProjectRecord) (*stripe.ProjectRecord, error) {
id, err := uuid.FromBytes(dbxRecord.Id)
if err != nil {
return nil, errs.Wrap(err)
}
projectID, err := uuid.FromBytes(dbxRecord.ProjectId)
if err != nil {
return nil, errs.Wrap(err)
}
var segments float64
if dbxRecord.Segments != nil {
segments = float64(*dbxRecord.Segments)
}
return &stripe.ProjectRecord{
ID: id,
ProjectID: projectID,
Storage: dbxRecord.Storage,
Egress: dbxRecord.Egress,
Segments: segments,
PeriodStart: dbxRecord.PeriodStart,
PeriodEnd: dbxRecord.PeriodEnd,
State: dbxRecord.State,
}, nil
}