-
Notifications
You must be signed in to change notification settings - Fork 19
/
orders.go
339 lines (291 loc) · 11.7 KB
/
orders.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
// Copyright (C) 2023 Gobalsky Labs Limited
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as
// published by the Free Software Foundation, either version 3 of the
// License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package sqlstore
import (
"context"
"errors"
"fmt"
"strings"
"code.vegaprotocol.io/vega/datanode/entities"
"code.vegaprotocol.io/vega/datanode/metrics"
"code.vegaprotocol.io/vega/libs/ptr"
"code.vegaprotocol.io/vega/logging"
v2 "code.vegaprotocol.io/vega/protos/data-node/api/v2"
"github.com/georgysavva/scany/pgxscan"
)
const (
sqlOrderColumns = `id, market_id, party_id, side, price,
size, remaining, time_in_force, type, status,
reference, reason, version, batch_id, pegged_offset,
pegged_reference, lp_id, created_at, updated_at, expires_at,
tx_hash, vega_time, seq_num, post_only, reduce_only, reserved_remaining,
peak_size, minimum_visible_size`
ordersFilterDateColumn = "vega_time"
OrdersTableName = "orders"
)
var ErrLastPaginationNotSupported = errors.New("'last' pagination is not supported")
type Orders struct {
*ConnectionSource
batcher MapBatcher[entities.OrderKey, entities.Order]
}
var ordersOrdering = TableOrdering{
ColumnOrdering{Name: "created_at", Sorting: ASC},
ColumnOrdering{Name: "id", Sorting: DESC},
ColumnOrdering{Name: "vega_time", Sorting: ASC},
}
func NewOrders(connectionSource *ConnectionSource) *Orders {
a := &Orders{
ConnectionSource: connectionSource,
batcher: NewMapBatcher[entities.OrderKey, entities.Order](
OrdersTableName,
entities.OrderColumns),
}
return a
}
func (os *Orders) Flush(ctx context.Context) ([]entities.Order, error) {
defer metrics.StartSQLQuery("Orders", "Flush")()
return os.batcher.Flush(ctx, os.Connection)
}
// Add inserts an order update row into the database if a row for this (block time, order id, version)
// does not already exist; otherwise update the existing row with information supplied.
// Currently we only store the last update to an order per block, so the order history is not
// complete if multiple updates happen in one block.
func (os *Orders) Add(o entities.Order) error {
os.batcher.Add(o)
return nil
}
// GetAll returns all updates to all orders (including changes to orders that don't increment the version number).
func (os *Orders) GetAll(ctx context.Context) ([]entities.Order, error) {
defer metrics.StartSQLQuery("Orders", "GetAll")()
orders := []entities.Order{}
query := fmt.Sprintf("SELECT %s FROM orders", sqlOrderColumns)
err := pgxscan.Select(ctx, os.Connection, &orders, query)
return orders, err
}
// GetOrder returns the last update of the order with the given ID.
func (os *Orders) GetOrder(ctx context.Context, orderIDStr string, version *int32) (entities.Order, error) {
var err error
order := entities.Order{}
orderID := entities.OrderID(orderIDStr)
defer metrics.StartSQLQuery("Orders", "GetByOrderID")()
if version != nil && *version > 0 {
query := fmt.Sprintf("SELECT %s FROM orders_current_versions WHERE id=$1 and version=$2", sqlOrderColumns)
err = pgxscan.Get(ctx, os.Connection, &order, query, orderID, version)
} else {
query := fmt.Sprintf("SELECT %s FROM orders_current_desc WHERE id=$1", sqlOrderColumns)
err = pgxscan.Get(ctx, os.Connection, &order, query, orderID)
}
return order, os.wrapE(err)
}
// GetByMarketAndID returns all orders with given IDs for a market.
func (os *Orders) GetByMarketAndID(ctx context.Context, marketIDstr string, orderIDs []string) ([]entities.Order, error) {
if len(orderIDs) == 0 {
os.log.Warn("GetByMarketAndID called with an empty order slice",
logging.String("market ID", marketIDstr),
)
return nil, nil
}
defer metrics.StartSQLQuery("Orders", "GetByMarketAndID")()
marketID := entities.MarketID(marketIDstr)
// IDs := make([]entities.OrderID, 0, len(orderIDs))
IDs := make([]interface{}, 0, len(orderIDs))
in := make([]string, 0, len(orderIDs))
bindNum := 2
for _, o := range orderIDs {
IDs = append(IDs, entities.OrderID(o))
in = append(in, fmt.Sprintf("$%d", bindNum))
bindNum++
}
bind := make([]interface{}, 0, len(in)+1)
// set all bind vars
bind = append(bind, marketID)
bind = append(bind, IDs...)
// select directly from orders_live table, the current view searches in orders
// this is used to expire orders, which have to be, by definition, live. This table uses ID as its PK
// so this is a more optimal way of querying the data.
query := fmt.Sprintf(`SELECT %s from orders_live WHERE market_id=$1 AND id IN (%s) order by id`, sqlOrderColumns, strings.Join(in, ", "))
orders := make([]entities.Order, 0, len(orderIDs))
err := pgxscan.Select(ctx, os.Connection, &orders, query, bind...)
return orders, os.wrapE(err)
}
func (os *Orders) GetByTxHash(ctx context.Context, txHash entities.TxHash) ([]entities.Order, error) {
defer metrics.StartSQLQuery("Orders", "GetByTxHash")()
orders := []entities.Order{}
query := fmt.Sprintf(`SELECT %s FROM orders WHERE tx_hash=$1`, sqlOrderColumns)
err := pgxscan.Select(ctx, os.Connection, &orders, query, txHash)
if err != nil {
return nil, fmt.Errorf("querying orders: %w", err)
}
return orders, nil
}
// GetByReference returns the last update of orders with the specified user-suppled reference.
func (os *Orders) GetByReferencePaged(ctx context.Context, reference string, p entities.CursorPagination) ([]entities.Order, entities.PageInfo, error) {
return os.ListOrders(ctx, p, entities.OrderFilter{
Reference: &reference,
})
}
// GetLiveOrders fetches all currently live orders so the market depth data can be rebuilt
// from the orders data in the database.
func (os *Orders) GetLiveOrders(ctx context.Context) ([]entities.Order, error) {
defer metrics.StartSQLQuery("Orders", "GetLiveOrders")()
query := fmt.Sprintf(`select %s from orders_live order by vega_time, seq_num`, sqlOrderColumns)
return os.queryOrders(ctx, query, nil)
}
// -------------------------------------------- Utility Methods
func (os *Orders) queryOrders(ctx context.Context, query string, args []interface{}) ([]entities.Order, error) {
orders := []entities.Order{}
err := pgxscan.Select(ctx, os.Connection, &orders, query, args...)
if err != nil {
return nil, fmt.Errorf("querying orders: %w", err)
}
return orders, nil
}
func (os *Orders) queryOrdersWithCursorPagination(ctx context.Context, query string, args []interface{},
pagination entities.CursorPagination, alreadyOrdered bool,
) ([]entities.Order, entities.PageInfo, error) {
var (
err error
orders []entities.Order
pageInfo entities.PageInfo
)
// This is a bit subtle - if we're selecting from a view that's doing DISTINCT ON ... ORDER BY
// it is imperative that we don't apply an ORDER BY clause to the outer query or else postgres
// will try and materialize the entire view; so rely on the view to sort correctly for us.
ordering := ordersOrdering
paginateQuery := PaginateQuery[entities.OrderCursor]
if alreadyOrdered {
paginateQuery = PaginateQueryWithoutOrderBy[entities.OrderCursor]
}
// We don't have views and indexes for iterating backwards for now so we can't use 'last'
// as it requires us to order in reverse
if pagination.HasBackward() {
return nil, entities.PageInfo{}, ErrLastPaginationNotSupported
}
query, args, err = paginateQuery(query, args, ordering, pagination)
if err != nil {
return orders, pageInfo, err
}
err = pgxscan.Select(ctx, os.Connection, &orders, query, args...)
if err != nil {
return nil, pageInfo, fmt.Errorf("querying orders: %w", err)
}
orders, pageInfo = entities.PageEntities[*v2.OrderEdge](orders, pagination)
return orders, pageInfo, nil
}
func currentView(f entities.OrderFilter, p entities.CursorPagination) (string, bool, error) {
if !p.NewestFirst {
return "", false, fmt.Errorf("oldest first order query is not currently supported")
}
if f.LiveOnly {
return "orders_live", false, nil
}
if f.Reference != nil {
return "orders_current_desc_by_reference", true, nil
}
if len(f.PartyIDs) > 0 {
return "orders_current_desc_by_party", true, nil
}
if len(f.MarketIDs) > 0 {
return "orders_current_desc_by_market", true, nil
}
return "orders_current_desc", true, nil
}
func (os *Orders) ListOrders(
ctx context.Context,
p entities.CursorPagination,
orderFilter entities.OrderFilter,
) ([]entities.Order, entities.PageInfo, error) {
table, alreadyOrdered, err := currentView(orderFilter, p)
if err != nil {
return nil, entities.PageInfo{}, err
}
bind := make([]interface{}, 0, len(orderFilter.PartyIDs)+len(orderFilter.MarketIDs)+1)
where := strings.Builder{}
where.WriteString("WHERE 1=1 ")
whereStr, args := applyOrderFilter(where.String(), bind, orderFilter)
query := fmt.Sprintf(`SELECT %s from %s %s`, sqlOrderColumns, table, whereStr)
query, args = filterDateRange(query, ordersFilterDateColumn, ptr.UnBox(orderFilter.DateRange), false, args...)
defer metrics.StartSQLQuery("Orders", "GetByMarketPaged")()
return os.queryOrdersWithCursorPagination(ctx, query, args, p, alreadyOrdered)
}
func (os *Orders) ListOrderVersions(ctx context.Context, orderIDStr string, p entities.CursorPagination) ([]entities.Order, entities.PageInfo, error) {
if orderIDStr == "" {
return nil, entities.PageInfo{}, errors.New("orderID is required")
}
orderID := entities.OrderID(orderIDStr)
query := fmt.Sprintf(`SELECT %s from orders_current_versions WHERE id=$1`, sqlOrderColumns)
defer metrics.StartSQLQuery("Orders", "GetByOrderIDPaged")()
return os.queryOrdersWithCursorPagination(ctx, query, []interface{}{orderID}, p, true)
}
func applyOrderFilter(whereClause string, args []any, filter entities.OrderFilter) (string, []any) {
if filter.ExcludeLiquidity {
whereClause += " AND COALESCE(lp_id, '') = ''"
}
if len(filter.PartyIDs) > 0 {
parties := strings.Builder{}
for i, party := range filter.PartyIDs {
if i > 0 {
parties.WriteString(",")
}
parties.WriteString(nextBindVar(&args, entities.PartyID(party)))
}
whereClause += fmt.Sprintf(" AND party_id IN (%s)", parties.String())
}
if len(filter.MarketIDs) > 0 {
markets := strings.Builder{}
for i, market := range filter.MarketIDs {
if i > 0 {
markets.WriteString(",")
}
markets.WriteString(nextBindVar(&args, entities.MarketID(market)))
}
whereClause += fmt.Sprintf(" AND market_id IN (%s)", markets.String())
}
if filter.Reference != nil {
args = append(args, filter.Reference)
whereClause += fmt.Sprintf(" AND reference = $%d", len(args))
}
if len(filter.Statuses) > 0 {
states := strings.Builder{}
for i, status := range filter.Statuses {
if i > 0 {
states.WriteString(",")
}
states.WriteString(nextBindVar(&args, status))
}
whereClause += fmt.Sprintf(" AND status IN (%s)", states.String())
}
if len(filter.Types) > 0 {
types := strings.Builder{}
for i, orderType := range filter.Types {
if i > 0 {
types.WriteString(",")
}
types.WriteString(nextBindVar(&args, orderType))
}
whereClause += fmt.Sprintf(" AND type IN (%s)", types.String())
}
if len(filter.TimeInForces) > 0 {
timeInForces := strings.Builder{}
for i, timeInForce := range filter.TimeInForces {
if i > 0 {
timeInForces.WriteString(",")
}
timeInForces.WriteString(nextBindVar(&args, timeInForce))
}
whereClause += fmt.Sprintf(" AND time_in_force IN (%s)", timeInForces.String())
}
return whereClause, args
}