-
Notifications
You must be signed in to change notification settings - Fork 16
/
disbursement_instructions.go
230 lines (205 loc) · 9.37 KB
/
disbursement_instructions.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
package data
import (
"context"
"errors"
"fmt"
"github.com/stellar/stellar-disbursement-platform-backend/db"
)
type DisbursementInstruction struct {
Phone string `csv:"phone"`
ID string `csv:"id"`
Amount string `csv:"amount"`
VerificationValue string `csv:"verification"`
ExternalPaymentId *string `csv:"paymentID"`
}
type DisbursementInstructionModel struct {
dbConnectionPool db.DBConnectionPool
receiverVerificationModel *ReceiverVerificationModel
receiverWalletModel *ReceiverWalletModel
receiverModel *ReceiverModel
paymentModel *PaymentModel
disbursementModel *DisbursementModel
}
type InstructionLine struct {
line int
disbursementInstruction *DisbursementInstruction
}
const MaxInstructionsPerDisbursement = 10000
// NewDisbursementInstructionModel creates a new DisbursementInstructionModel.
func NewDisbursementInstructionModel(dbConnectionPool db.DBConnectionPool) *DisbursementInstructionModel {
return &DisbursementInstructionModel{
dbConnectionPool: dbConnectionPool,
receiverVerificationModel: &ReceiverVerificationModel{},
receiverWalletModel: &ReceiverWalletModel{dbConnectionPool: dbConnectionPool},
receiverModel: &ReceiverModel{},
paymentModel: &PaymentModel{dbConnectionPool: dbConnectionPool},
disbursementModel: &DisbursementModel{dbConnectionPool: dbConnectionPool},
}
}
var (
ErrMaxInstructionsExceeded = errors.New("maximum number of instructions exceeded")
ErrReceiverVerificationMismatch = errors.New("receiver verification mismatch")
)
// ProcessAll Processes all disbursement instructions and persists the data to the database.
//
// |--- For each phone number in the instructions:
// | |--- Check if a receiver exists.
// | | |--- If a receiver does not exist, create one.
// | |--- For each receiver:
// | | |--- Check if the receiver verification exists.
// | | | |--- If the receiver verification does not exist, create one.
// | | | |--- If the receiver verification exists:
// | | | | |--- Check if the verification value matches.
// | | | | | |--- If the verification value does not match and the verification is confirmed, return an error.
// | | | | | |--- If the verification value does not match and the verification is not confirmed, update the verification value.
// | | | | | |--- If the verification value matches, continue.
// | | |--- Check if the receiver wallet exists.
// | | | |--- If the receiver wallet does not exist, create one.
// | | | |--- If the receiver wallet exists and it's not REGISTERED, retry the invitation SMS.
// | | |--- Delete all payments tied to this disbursement.
// | | |--- Create all payments passed in the instructions.
func (di DisbursementInstructionModel) ProcessAll(ctx context.Context, userID string, instructions []*DisbursementInstruction, disbursement *Disbursement, update *DisbursementUpdate, maxNumberOfInstructions int) error {
if len(instructions) > maxNumberOfInstructions {
return ErrMaxInstructionsExceeded
}
// We need all the following logic to be executed in one transaction.
return db.RunInTransaction(ctx, di.dbConnectionPool, nil, func(dbTx db.DBTransaction) error {
// Step 1: Fetch all receivers by phone number and create missing ones
phoneNumbers := make([]string, 0, len(instructions))
for _, instruction := range instructions {
phoneNumbers = append(phoneNumbers, instruction.Phone)
}
existingReceivers, err := di.receiverModel.GetByPhoneNumbers(ctx, dbTx, phoneNumbers)
if err != nil {
return fmt.Errorf("error fetching receivers by phone number: %w", err)
}
receiverMap := make(map[string]*Receiver)
for _, receiver := range existingReceivers {
receiverMap[receiver.PhoneNumber] = receiver
}
instructionMap := make(map[string]InstructionLine)
for line, instruction := range instructions {
instructionMap[instruction.Phone] = InstructionLine{
line: line + 1,
disbursementInstruction: instruction,
}
}
for _, instruction := range instructions {
_, exists := receiverMap[instruction.Phone]
if !exists {
receiverInsert := ReceiverInsert{
PhoneNumber: instruction.Phone,
ExternalId: &instruction.ID,
}
receiver, insertErr := di.receiverModel.Insert(ctx, dbTx, receiverInsert)
if insertErr != nil {
return fmt.Errorf("error inserting receiver: %w", insertErr)
}
receiverMap[instruction.Phone] = receiver
}
}
// Step 2: Fetch all receiver verifications and create missing ones.
receiverIDs := make([]string, 0, len(receiverMap))
for _, receiver := range receiverMap {
receiverIDs = append(receiverIDs, receiver.ID)
}
verifications, err := di.receiverVerificationModel.GetByReceiverIDsAndVerificationField(ctx, dbTx, receiverIDs, disbursement.VerificationField)
if err != nil {
return fmt.Errorf("error fetching receiver verifications: %w", err)
}
verificationMap := make(map[string]*ReceiverVerification)
for _, verification := range verifications {
verificationMap[verification.ReceiverID] = verification
}
for _, receiver := range receiverMap {
verification, verificationExists := verificationMap[receiver.ID]
instruction := instructionMap[receiver.PhoneNumber]
if !verificationExists {
verificationInsert := ReceiverVerificationInsert{
ReceiverID: receiver.ID,
VerificationValue: instruction.disbursementInstruction.VerificationValue,
VerificationField: disbursement.VerificationField,
}
hashedVerification, insertError := di.receiverVerificationModel.Insert(ctx, dbTx, verificationInsert)
if insertError != nil {
return fmt.Errorf("error inserting receiver verification: %w", insertError)
}
verificationMap[receiver.ID] = &ReceiverVerification{
ReceiverID: verificationInsert.ReceiverID,
VerificationField: verificationInsert.VerificationField,
HashedValue: hashedVerification,
}
} else {
if verified := CompareVerificationValue(verification.HashedValue, instruction.disbursementInstruction.VerificationValue); !verified {
if verification.ConfirmedAt != nil {
return fmt.Errorf("%w: receiver verification for %s doesn't match. Check line %d on CSV file - Internal ID %s", ErrReceiverVerificationMismatch, receiver.PhoneNumber, instruction.line, instruction.disbursementInstruction.ID)
}
err = di.receiverVerificationModel.UpdateVerificationValue(ctx, dbTx, verification.ReceiverID, verification.VerificationField, instruction.disbursementInstruction.VerificationValue)
if err != nil {
return fmt.Errorf("error updating receiver verification for disbursement id %s: %w", disbursement.ID, err)
}
}
}
}
// Step 3: Fetch all receiver wallets and create missing ones
receiverWallets, err := di.receiverWalletModel.GetByReceiverIDsAndWalletID(ctx, dbTx, receiverIDs, disbursement.Wallet.ID)
if err != nil {
return fmt.Errorf("error fetching receiver wallets: %w", err)
}
receiverIDToReceiverWalletIDMap := make(map[string]string)
for _, receiverWallet := range receiverWallets {
receiverIDToReceiverWalletIDMap[receiverWallet.Receiver.ID] = receiverWallet.ID
}
for _, receiverId := range receiverIDs {
receiverWalletId, exists := receiverIDToReceiverWalletIDMap[receiverId]
if !exists {
receiverWalletInsert := ReceiverWalletInsert{
ReceiverID: receiverId,
WalletID: disbursement.Wallet.ID,
}
rwID, insertErr := di.receiverWalletModel.Insert(ctx, dbTx, receiverWalletInsert)
if insertErr != nil {
return fmt.Errorf("error inserting receiver wallet for receiver id %s: %w", receiverId, insertErr)
}
receiverIDToReceiverWalletIDMap[receiverId] = rwID
} else {
_, retryErr := di.receiverWalletModel.RetryInvitationSMS(ctx, dbTx, receiverWalletId)
if retryErr != nil {
if !errors.Is(retryErr, ErrRecordNotFound) {
return fmt.Errorf("error retrying invitation: %w", retryErr)
}
}
}
}
// Step 4: Delete all payments tied to this disbursement for each receiver in one call
if err = di.paymentModel.DeleteAllForDisbursement(ctx, dbTx, disbursement.ID); err != nil {
return fmt.Errorf("error deleting payments: %w", err)
}
// Step 5: Create payments for all receivers
payments := make([]PaymentInsert, 0, len(instructions))
for _, instruction := range instructions {
receiver := receiverMap[instruction.Phone]
payment := PaymentInsert{
ReceiverID: receiver.ID,
DisbursementID: disbursement.ID,
Amount: instruction.Amount,
AssetID: disbursement.Asset.ID,
ReceiverWalletID: receiverIDToReceiverWalletIDMap[receiver.ID],
ExternalPaymentID: instruction.ExternalPaymentId,
}
payments = append(payments, payment)
}
if err = di.paymentModel.InsertAll(ctx, dbTx, payments); err != nil {
return fmt.Errorf("error inserting payments: %w", err)
}
// Step 6: Persist Payment file to Disbursement
if err = di.disbursementModel.Update(ctx, update); err != nil {
return fmt.Errorf("error persisting payment file: %w", err)
}
// Step 7: Update Disbursement Status
if err = di.disbursementModel.UpdateStatus(ctx, dbTx, userID, disbursement.ID, ReadyDisbursementStatus); err != nil {
return fmt.Errorf("error updating status: %w", err)
}
return nil
})
}