/
submit_pending.go
64 lines (54 loc) · 1.71 KB
/
submit_pending.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
package orders
import (
"context"
"encoding/json"
"strconv"
"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/service/sqs"
"github.com/aws/aws-sdk-go-v2/service/sqs/types"
"github.com/kiran94/dca-manager/pkg"
"github.com/sirupsen/logrus"
)
// PendingOrderQueue is an abstraction to submit pending orders to a queue.
type PendingOrderQueue interface {
SubmitPendingOrder(ctx context.Context, sc pkg.SQSAccess, po *PendingOrders, exchange string, real bool, sqsQueue string) error
}
// PendingOrderSubmitter submits pending order to a queue
type PendingOrderSubmitter struct{}
// SubmitPendingOrder submits a pending order to queue.
func (p PendingOrderSubmitter) SubmitPendingOrder(ctx context.Context, sc pkg.SQSAccess, po *PendingOrders, exchange string, real bool, sqsQueue string) error {
sqsMessageBodyBytes, err := json.Marshal(po)
if err != nil {
return err
}
sqsMessage := string(sqsMessageBodyBytes)
sqsMessageInput := &sqs.SendMessageInput{
QueueUrl: &sqsQueue,
MessageBody: aws.String(sqsMessage),
MessageAttributes: map[string]types.MessageAttributeValue{
"Exchange": {
DataType: aws.String("String"),
StringValue: aws.String(exchange),
},
"TransactionId": {
DataType: aws.String("String"),
StringValue: aws.String(po.TransactionID),
},
"Real": {
DataType: aws.String("String"),
StringValue: aws.String(strconv.FormatBool(real)),
},
},
}
logrus.WithFields(logrus.Fields{
"transactionId": po.TransactionID,
"queue": sqsQueue,
"real": real,
"exchange": exchange,
}).Info("Submitting Transaction to Queue")
_, sqsErr := sc.SendMessage(ctx, sqsMessageInput)
if sqsErr != nil {
return sqsErr
}
return nil
}