-
Notifications
You must be signed in to change notification settings - Fork 21
/
batch_market_instructions_processor.go
127 lines (113 loc) · 3.35 KB
/
batch_market_instructions_processor.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
package processor
import (
"context"
"errors"
"fmt"
"code.vegaprotocol.io/vega/commands"
"code.vegaprotocol.io/vega/core/idgeneration"
"code.vegaprotocol.io/vega/core/types"
"code.vegaprotocol.io/vega/logging"
commandspb "code.vegaprotocol.io/vega/protos/vega/commands/v1"
)
// BMIProcessor processes batches of market instructions (order
// cancellations, amendments and submissions) by forwarding each instruction
// to an execution engine.
type BMIProcessor struct {
// log is the logger supplied at construction time.
log *logging.Logger
// exec is the engine on which CancelOrder, AmendOrder and SubmitOrder are
// invoked for the instructions of a batch.
exec ExecutionEngine
}
// NewBMIProcessor builds a BMIProcessor holding the given logger and
// execution engine. Neither argument is validated here.
func NewBMIProcessor(log *logging.Logger, exec ExecutionEngine) *BMIProcessor {
	processor := new(BMIProcessor)
	processor.log = log
	processor.exec = exec
	return processor
}
// ProcessBatch runs every instruction of a batch on behalf of party.
//
// Instructions are always executed in the same order: cancellations first,
// then amendments, then submissions. A failing instruction does not interrupt
// the batch: its error is recorded under the position of the instruction in
// the whole batch, and processing moves on to the next instruction.
//
// An order may be successfully amended only once per batch; any later
// amendment of the same order is rejected without reaching the engine.
//
// The returned error aggregates all recorded errors, and is nil when every
// instruction succeeded.
func (p *BMIProcessor) ProcessBatch(
	ctx context.Context,
	batch *commandspb.BatchMarketInstructions,
	party, determinitisticID string,
	stats Stats,
) error {
	failures := commands.NewErrors()

	// report records err, if any, against the position of the instruction
	// in the whole batch, e.g. for a batch with 10 instructions in each
	// array, position 11 is the second instruction of the amendment array.
	report := func(position int, err error) {
		if err != nil {
			failures.AddForProperty(fmt.Sprintf("%d", position), err)
		}
	}

	// The IDs of all new orders are generated up front, before any
	// instruction is executed; these need to be deterministic.
	idgen := idgeneration.New(determinitisticID)
	orderIDs := make([]string, len(batch.Submissions))
	for n := range orderIDs {
		orderIDs[n] = idgen.NextID()
	}

	// Position of the first amendment / first submission in the whole batch.
	amendOffset := len(batch.Cancellations)
	submitOffset := amendOffset + len(batch.Amendments)

	// 1. cancellations
	for n, cancel := range batch.Cancellations {
		err := commands.CheckOrderCancellation(cancel)
		if err == nil {
			stats.IncTotalCancelOrder()
			_, err = p.exec.CancelOrder(
				ctx, types.OrderCancellationFromProto(cancel), party, idgen)
		}
		report(n, err)
	}

	// 2. amendments
	//
	// Orders already amended successfully in this batch; it is not legal to
	// amend the same order twice.
	alreadyAmended := make(map[string]struct{})
	for n, protoAmend := range batch.Amendments {
		var err error
		if _, dup := alreadyAmended[protoAmend.OrderId]; dup {
			// nothing is executed for a repeated amend, it only yields an error
			err = errors.New("order already amended in batch")
		} else if err = commands.CheckOrderAmendment(protoAmend); err == nil {
			stats.IncTotalAmendOrder()
			var amend *types.OrderAmendment
			if amend, err = types.NewOrderAmendmentFromProto(protoAmend); err == nil {
				_, err = p.exec.AmendOrder(ctx, amend, party, idgen)
			}
		}
		if err == nil {
			// only a successful amend blocks later amends of the same order
			alreadyAmended[protoAmend.OrderId] = struct{}{}
		}
		report(amendOffset+n, err)
	}

	// 3. submissions
	for n, protoSubmit := range batch.Submissions {
		err := commands.CheckOrderSubmission(protoSubmit)
		if err == nil {
			stats.IncTotalCreateOrder()
			var submit *types.OrderSubmission
			submit, err = types.NewOrderSubmissionFromProto(protoSubmit)
			if err == nil {
				var conf *types.OrderConfirmation
				conf, err = p.exec.SubmitOrder(ctx, submit, party, idgen, orderIDs[n])
				if conf != nil {
					tradeCount := uint64(len(conf.Trades))
					stats.AddCurrentTradesInBatch(tradeCount)
					stats.AddTotalTrades(tradeCount)
					stats.IncCurrentOrdersInBatch()
				}
				// counted whether or not the engine accepted the order
				stats.IncTotalOrders()
			}
		}
		report(submitOffset+n, err)
	}

	return failures.ErrorOrNil()
}