-
Notifications
You must be signed in to change notification settings - Fork 19
/
liquidity_provision.go
403 lines (341 loc) · 15 KB
/
liquidity_provision.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
// Copyright (C) 2023 Gobalsky Labs Limited
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as
// published by the Free Software Foundation, either version 3 of the
// License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package sqlstore
import (
"context"
"errors"
"fmt"
"code.vegaprotocol.io/vega/datanode/entities"
"code.vegaprotocol.io/vega/datanode/metrics"
"code.vegaprotocol.io/vega/datanode/utils"
"code.vegaprotocol.io/vega/logging"
v2 "code.vegaprotocol.io/vega/protos/data-node/api/v2"
"code.vegaprotocol.io/vega/protos/vega"
"github.com/georgysavva/scany/pgxscan"
"github.com/jackc/pgx/v4"
)
// Sort orders handed to PaginateQuery by the queries in this file.
var (
	// lpOrdering sorts liquidity provisions by vega_time and then id, both
	// ascending.
	lpOrdering = TableOrdering{
		ColumnOrdering{Name: "vega_time", Sorting: ASC},
		ColumnOrdering{Name: "id", Sorting: ASC},
	}

	// providerOrdering sorts liquidity providers by market_id, party_id and
	// ordinality, all ascending.
	providerOrdering = TableOrdering{
		ColumnOrdering{Name: "market_id", Sorting: ASC},
		ColumnOrdering{Name: "party_id", Sorting: ASC},
		ColumnOrdering{Name: "ordinality", Sorting: ASC},
	}
)
// LiquidityProvision is the SQL store for liquidity provisions. Entities
// passed to Upsert are added to batcher; Flush writes the batch out through
// the embedded ConnectionSource and passes the flushed entities to observer.
type LiquidityProvision struct {
	*ConnectionSource
	// batcher holds the liquidity provisions added by Upsert until Flush.
	batcher MapBatcher[entities.LiquidityProvisionKey, entities.LiquidityProvision]
	// observer is notified by Flush and serves ObserveLiquidityProvisions.
	observer utils.Observer[entities.LiquidityProvision]
}
// LiquidityProviderFeeShare is the row shape ListProviders scans from the
// query built by buildLiquidityProviderFeeShareQuery. That query extracts the
// values as text from the liquidity_provider_fee_shares JSON array of
// current_market_data.
type LiquidityProviderFeeShare struct {
	// Ordinality is the element's position within the market's JSON array.
	Ordinality int64
	MarketID entities.MarketID
	PartyID string
	// AverageLiquidityScore is read from the average_score column.
	AverageLiquidityScore string `db:"average_score"`
	EquityLikeShare string
	AverageEntryValuation string
	VirtualStake string
}
// LiquidityProviderSLA is the row shape ListProviders scans from the query
// built by buildLiquidityProviderSLA. That query extracts the values from the
// liquidity_provider_sla JSON array of current_market_data.
type LiquidityProviderSLA struct {
	// Ordinality is the element's position within the market's JSON array.
	Ordinality int64
	MarketID entities.MarketID
	PartyID string
	CurrentEpochFractionOfTimeOnBook string
	LastEpochFractionOfTimeOnBook string
	LastEpochFeePenalty string
	LastEpochBondPenalty string
	// HysteresisPeriodFeePenalties is selected as a JSON value (->), unlike
	// the other fields which are selected as text (->>).
	HysteresisPeriodFeePenalties []string
	RequiredLiquidity string
	NotionalVolumeBuys string
	NotionalVolumeSells string
}
const (
	// sqlOracleLiquidityProvisionColumns is the column list GetByTxHash
	// selects from the liquidity_provisions table.
	sqlOracleLiquidityProvisionColumns = `id, party_id, created_at, updated_at, market_id,
commitment_amount, fee, sells, buys, version, status, reference, tx_hash, vega_time`
)
// NewLiquidityProvision returns a LiquidityProvision store bound to
// connectionSource. Its batcher and observer are both created under the
// "liquidity_provisions" name; log is handed to the observer.
func NewLiquidityProvision(connectionSource *ConnectionSource, log *logging.Logger) *LiquidityProvision {
	const table = "liquidity_provisions"

	store := &LiquidityProvision{ConnectionSource: connectionSource}
	store.batcher = NewMapBatcher[entities.LiquidityProvisionKey, entities.LiquidityProvision](
		table, entities.LiquidityProvisionColumns)
	store.observer = utils.NewObserver[entities.LiquidityProvision](table, log, 10, 10)
	return store
}
// Flush writes the batched liquidity provisions through the store's
// connection and, when that succeeds, notifies the observer with the flushed
// entities. A flush error is returned as-is and the observer is not notified.
func (lp *LiquidityProvision) Flush(ctx context.Context) error {
	stopTimer := metrics.StartSQLQuery("LiquidityProvision", "Flush")
	defer stopTimer()

	written, err := lp.batcher.Flush(ctx, lp.Connection)
	if err == nil {
		lp.observer.Notify(written)
	}
	return err
}
// ObserveLiquidityProvisions subscribes to the store's observer and returns
// the resulting channel together with the subscription reference.
//
// A nil market or party means "no filter" on that attribute; when set, only
// liquidity provisions whose market/party ID string equals the given value
// are let through.
func (lp *LiquidityProvision) ObserveLiquidityProvisions(ctx context.Context, retries int,
	market *string, party *string,
) (<-chan []entities.LiquidityProvision, uint64) {
	matches := func(candidate entities.LiquidityProvision) bool {
		if market != nil && candidate.MarketID.String() != *market {
			return false
		}
		if party != nil && candidate.PartyID.String() != *party {
			return false
		}
		return true
	}

	return lp.observer.Observe(ctx, retries, matches)
}
// Upsert adds liquidityProvision to the batcher; nothing is written until
// Flush is called. ctx is not used here and the returned error is always nil.
func (lp *LiquidityProvision) Upsert(ctx context.Context, liquidityProvision entities.LiquidityProvision) error {
	lp.batcher.Add(liquidityProvision)
	return nil
}
// Get returns liquidity provisions (each paired with its previous version)
// filtered by party, market and reference. At least one of partyID and
// marketID must be non-empty, otherwise an error is returned. live is passed
// through to the query builder, which uses it to choose the source table.
//
// Only entities.CursorPagination is supported; any other pagination value
// panics.
func (lp *LiquidityProvision) Get(ctx context.Context, partyID entities.PartyID, marketID entities.MarketID,
	reference string,
	live bool,
	pagination entities.Pagination,
) ([]entities.CurrentAndPreviousLiquidityProvisions, entities.PageInfo, error) {
	if partyID == "" && marketID == "" {
		return nil, entities.PageInfo{}, errors.New("market or party filters are required")
	}

	cursor, ok := pagination.(entities.CursorPagination)
	if !ok {
		panic("unsupported pagination")
	}
	return lp.getWithCursorPagination(ctx, partyID, marketID, reference, live, cursor)
}
// GetByTxHash returns every row of liquidity_provisions whose tx_hash equals
// txHash. On a query error the result slice is nil.
func (lp *LiquidityProvision) GetByTxHash(ctx context.Context, txHash entities.TxHash) ([]entities.LiquidityProvision, error) {
	defer metrics.StartSQLQuery("LiquidityProvision", "GetByTxHash")()

	query := fmt.Sprintf(`SELECT %s FROM liquidity_provisions WHERE tx_hash = $1`, sqlOracleLiquidityProvisionColumns)

	var found []entities.LiquidityProvision
	if err := pgxscan.Select(ctx, lp.Connection, &found, query, txHash); err != nil {
		return nil, err
	}
	return found, nil
}
// ListProviders returns the liquidity providers matching the given party
// and/or market, each with its fee share and, when one exists for the same
// market and party, its SLA statistics.
//
// At least one of partyID and marketID must be non-nil, otherwise an error is
// returned. The fee share and SLA queries are paginated with the same
// ordering and cursor and sent to the database as a single batch; the
// resulting providers are then paged with entities.PageEntities.
func (lp *LiquidityProvision) ListProviders(ctx context.Context, partyID *entities.PartyID,
	marketID *entities.MarketID, pagination entities.CursorPagination) (
	[]entities.LiquidityProvider, entities.PageInfo, error,
) {
	var pageInfo entities.PageInfo
	var feeShares []LiquidityProviderFeeShare
	var slas []LiquidityProviderSLA
	var err error

	if partyID == nil && marketID == nil {
		return nil, pageInfo, errors.New("market, party or both filters are required")
	}

	// query providers fee shares
	feeQuery, feeArgs := buildLiquidityProviderFeeShareQuery(partyID, marketID)
	feeQuery, feeArgs, err = PaginateQuery[entities.LiquidityProviderCursor](feeQuery, feeArgs, providerOrdering, pagination)
	if err != nil {
		return nil, pageInfo, err
	}

	// query providers sla
	slaQuery, slaArgs := buildLiquidityProviderSLA(partyID, marketID)
	slaQuery, slaArgs, err = PaginateQuery[entities.LiquidityProviderCursor](slaQuery, slaArgs, providerOrdering, pagination)
	if err != nil {
		return nil, pageInfo, err
	}

	batch := &pgx.Batch{}
	batch.Queue(feeQuery, feeArgs...)
	batch.Queue(slaQuery, slaArgs...)

	results := lp.Connection.SendBatch(ctx, batch)
	defer results.Close()

	// Batch results come back in the order the queries were queued.
	feeRows, err := results.Query()
	if err != nil {
		return nil, pageInfo, err
	}
	if err := pgxscan.ScanAll(&feeShares, feeRows); err != nil {
		return nil, pageInfo, fmt.Errorf("querying fee shares: %w", err)
	}

	slaRows, err := results.Query()
	if err != nil {
		return nil, pageInfo, err
	}
	if err := pgxscan.ScanAll(&slas, slaRows); err != nil {
		return nil, pageInfo, fmt.Errorf("querying SLAs: %w", err)
	}

	// SLAs must be matched on market AND party: when only a party filter is
	// given the rows span several markets, and indexing by party alone would
	// let one market's SLA overwrite another's and be attached to the wrong
	// provider.
	type providerKey struct {
		marketID entities.MarketID
		partyID  string
	}

	slaPerProvider := make(map[providerKey]LiquidityProviderSLA, len(slas))
	for _, sla := range slas {
		slaPerProvider[providerKey{marketID: sla.MarketID, partyID: sla.PartyID}] = sla
	}

	// Kept non-nil even when empty, as before.
	providers := make([]entities.LiquidityProvider, 0, len(feeShares))
	for _, feeShare := range feeShares {
		provider := entities.LiquidityProvider{
			Ordinality: feeShare.Ordinality,
			PartyID:    entities.PartyID(feeShare.PartyID),
			MarketID:   feeShare.MarketID,
			FeeShare: &vega.LiquidityProviderFeeShare{
				Party:                 feeShare.PartyID,
				EquityLikeShare:       feeShare.EquityLikeShare,
				AverageEntryValuation: feeShare.AverageEntryValuation,
				AverageScore:          feeShare.AverageLiquidityScore,
				VirtualStake:          feeShare.VirtualStake,
			},
		}

		key := providerKey{marketID: feeShare.MarketID, partyID: feeShare.PartyID}
		if sla, ok := slaPerProvider[key]; ok {
			provider.SLA = &vega.LiquidityProviderSLA{
				Party:                            sla.PartyID,
				CurrentEpochFractionOfTimeOnBook: sla.CurrentEpochFractionOfTimeOnBook,
				LastEpochFractionOfTimeOnBook:    sla.LastEpochFractionOfTimeOnBook,
				LastEpochFeePenalty:              sla.LastEpochFeePenalty,
				LastEpochBondPenalty:             sla.LastEpochBondPenalty,
				HysteresisPeriodFeePenalties:     sla.HysteresisPeriodFeePenalties,
				RequiredLiquidity:                sla.RequiredLiquidity,
				NotionalVolumeBuys:               sla.NotionalVolumeBuys,
				NotionalVolumeSells:              sla.NotionalVolumeSells,
			}
		}

		providers = append(providers, provider)
	}

	providers, pageInfo = entities.PageEntities[*v2.LiquidityProviderEdge](providers, pagination)

	return providers, pageInfo, nil
}
// buildLiquidityProviderFeeShareQuery returns the SQL and bind arguments that
// list liquidity provider fee shares.
//
// The fee shares are unpacked from the liquidity_provider_fee_shares JSON
// array of current_market_data, optionally filtered by party and/or market
// (a nil filter is skipped), and joined with live_liquidity_provisions on
// party and market.
func buildLiquidityProviderFeeShareQuery(partyID *entities.PartyID, marketID *entities.MarketID) (string, []interface{}) {
	// Wraps the fee share rows in a CTE and joins them with the live
	// liquidity provisions table so that only currently active liquidity
	// providers are returned.
	const activeProvidersSQL = `WITH liquidity_provider_fee_share(ordinality, market_id, party_id, average_score, equity_like_share, average_entry_valuation, virtual_stake) as (%s)
SELECT fs.ordinality, fs.market_id, fs.party_id, fs.average_score, fs.equity_like_share, fs.average_entry_valuation, fs.virtual_stake
FROM liquidity_provider_fee_share fs
JOIN live_liquidity_provisions lps ON encode(lps.party_id, 'hex') = fs.party_id
AND lps.market_id = fs.market_id`

	bindArgs := []interface{}{}

	// The lp data is available in the current market data table
	feeShareRows := `
select
ordinality,
cmd.market,
coalesce(lpfs.fee_share ->> 'party', '') as party,
coalesce(lpfs.fee_share ->> 'average_score', '') as average_score,
coalesce(lpfs.fee_share ->> 'equity_like_share', '') as equity_like_share,
coalesce(lpfs.fee_share ->> 'average_entry_valuation', '') as average_entry_valuation,
coalesce(lpfs.fee_share ->> 'virtual_stake', '') as virtual_stake
from current_market_data cmd,
jsonb_array_elements(liquidity_provider_fee_shares) with ordinality lpfs(fee_share, ordinality)
where liquidity_provider_fee_shares != 'null' and liquidity_provider_fee_shares is not null
`

	// Optional party filter.
	if partyID != nil {
		placeholder := nextBindVar(&bindArgs, partyID)
		feeShareRows = fmt.Sprintf("%s and decode(lpfs.fee_share ->>'party', 'hex') = %s", feeShareRows, placeholder)
	}

	// Optional market filter.
	if marketID != nil {
		placeholder := nextBindVar(&bindArgs, *marketID)
		feeShareRows = fmt.Sprintf("%s and cmd.market = %s", feeShareRows, placeholder)
	}

	return fmt.Sprintf(activeProvidersSQL, feeShareRows), bindArgs
}
// buildLiquidityProviderSLA returns the SQL and bind arguments that list
// liquidity provider SLA statistics.
//
// The statistics are unpacked from the liquidity_provider_sla JSON array of
// current_market_data, optionally filtered by party and/or market (a nil
// filter is skipped), and joined with live_liquidity_provisions on party and
// market.
func buildLiquidityProviderSLA(partyID *entities.PartyID, marketID *entities.MarketID) (string, []interface{}) {
	// Wraps the SLA rows in a CTE and joins them with the live liquidity
	// provisions table so that only currently active liquidity providers are
	// returned.
	const activeProvidersSQL = `WITH liquidity_provider_sla(ordinality, market_id, party_id, current_epoch_fraction_of_time_on_book, last_epoch_fraction_of_time_on_book, last_epoch_fee_penalty, last_epoch_bond_penalty, required_liquidity, notional_volume_buys, notional_volume_sells, hysteresis_period_fee_penalties) as (%s)
SELECT fs.ordinality, fs.market_id, fs.party_id, fs.current_epoch_fraction_of_time_on_book, fs.last_epoch_fraction_of_time_on_book, fs.last_epoch_fee_penalty, fs.last_epoch_bond_penalty, fs.required_liquidity, fs.notional_volume_buys, fs.notional_volume_sells, fs.hysteresis_period_fee_penalties
FROM liquidity_provider_sla fs
JOIN live_liquidity_provisions lps ON encode(lps.party_id, 'hex') = fs.party_id
AND lps.market_id = fs.market_id`

	bindArgs := []interface{}{}

	// The lp data is available in the current market data table
	slaRows := `
select
ordinality,
cmd.market,
lpsla.sla ->> 'party' as party,
coalesce(lpsla.sla ->> 'current_epoch_fraction_of_time_on_book', '') as current_epoch_fraction_of_time_on_book,
coalesce(lpsla.sla ->> 'last_epoch_fraction_of_time_on_book', '') as last_epoch_fraction_of_time_on_book,
coalesce(lpsla.sla ->> 'last_epoch_fee_penalty', '') as last_epoch_fee_penalty,
coalesce(lpsla.sla ->> 'last_epoch_bond_penalty', '') as last_epoch_bond_penalty,
coalesce(lpsla.sla ->> 'required_liquidity', '') as required_liquidity,
coalesce(lpsla.sla ->> 'notional_volume_buys', '') as notional_volume_buys,
coalesce(lpsla.sla ->> 'notional_volume_sells', '') as notional_volume_sells,
lpsla.sla -> 'hysteresis_period_fee_penalties' as hysteresis_period_fee_penalties
from current_market_data cmd,
jsonb_array_elements(liquidity_provider_sla) with ordinality lpsla(sla, ordinality)
where liquidity_provider_sla != 'null' and liquidity_provider_sla is not null
`

	// Optional party filter.
	if partyID != nil {
		placeholder := nextBindVar(&bindArgs, partyID)
		slaRows = fmt.Sprintf("%s and decode(lpsla.sla ->>'party', 'hex') = %s", slaRows, placeholder)
	}

	// Optional market filter.
	if marketID != nil {
		placeholder := nextBindVar(&bindArgs, *marketID)
		slaRows = fmt.Sprintf("%s and cmd.market = %s", slaRows, placeholder)
	}

	return fmt.Sprintf(activeProvidersSQL, slaRows), bindArgs
}
// getWithCursorPagination builds the liquidity provisions select for the
// given filters, applies cursor pagination ordered by lpOrdering, runs the
// query and pages the scanned rows. On any error the returned slice is nil
// and the page info is the zero value.
func (lp *LiquidityProvision) getWithCursorPagination(ctx context.Context, partyID entities.PartyID, marketID entities.MarketID,
	reference string, live bool, pagination entities.CursorPagination,
) ([]entities.CurrentAndPreviousLiquidityProvisions, entities.PageInfo, error) {
	baseQuery, baseArgs := lp.buildLiquidityProvisionsSelect(partyID, marketID, reference, live)

	pagedQuery, pagedArgs, err := PaginateQuery[entities.LiquidityProvisionCursor](baseQuery, baseArgs, lpOrdering, pagination)
	if err != nil {
		return nil, entities.PageInfo{}, err
	}

	var rows []entities.CurrentAndPreviousLiquidityProvisions
	if err := pgxscan.Select(ctx, lp.Connection, &rows, pagedQuery, pagedArgs...); err != nil {
		return nil, entities.PageInfo{}, err
	}

	page, pageInfo := entities.PageEntities[*v2.LiquidityProvisionWithPendingEdge](rows, pagination)
	return page, pageInfo, nil
}
// buildLiquidityProvisionsSelect returns the SQL and bind arguments selecting
// liquidity provisions together with their previous active version.
//
// Rows are read from live_liquidity_provisions when live is true and from
// liquidity_provisions otherwise; the last_active CTE always reads
// liquidity_provisions. Each row is left-joined with the STATUS_ACTIVE row
// that has the same id and the preceding version. Empty partyID, marketID or
// reference values add no filter; non-empty ones are combined with "and".
func (lp *LiquidityProvision) buildLiquidityProvisionsSelect(partyID entities.PartyID, marketID entities.MarketID,
	reference string, live bool,
) (string, []interface{}) {
	var bindVars []interface{}

	table := "liquidity_provisions"
	if live {
		table = "live_liquidity_provisions"
	}

	selectSQL := fmt.Sprintf(`with last_active as (
select distinct on (id, version) *
FROM liquidity_provisions
WHERE status = 'STATUS_ACTIVE'
order by id, version, vega_time desc
), lps as (
select llp.id, llp.party_id, llp.created_at, llp.updated_at, llp.market_id,
llp.commitment_amount, llp.fee, llp.sells, llp.buys, llp.version,
llp.status, llp.reference, llp.tx_hash, llp.vega_time,
lp.id previous_id, lp.party_id previous_party_id, lp.created_at previous_created_at,
lp.updated_at previous_updated_at, lp.market_id previous_market_id,
lp.commitment_amount previous_commitment_amount, lp.fee previous_fee,
lp.sells previous_sells, lp.buys previous_buys, lp.version previous_version,
lp.status previous_status, lp.reference previous_reference, lp.tx_hash previous_tx_hash,
lp.vega_time previous_vega_time
from %s llp
left join last_active lp on llp.id = lp.id and llp.version - 1 = lp.version
)
select *
from lps
`, table)

	conditions := ""
	// addFilter appends "<column>=<bind var>" to conditions, chaining it to
	// any earlier condition with " and ".
	addFilter := func(column string, value interface{}) {
		if len(conditions) > 0 {
			conditions = conditions + " and "
		}
		conditions = fmt.Sprintf("%s %s=%s", conditions, column, nextBindVar(&bindVars, value))
	}

	if partyID != "" {
		addFilter("party_id", partyID)
	}
	if marketID != "" {
		addFilter("market_id", marketID)
	}
	if reference != "" {
		addFilter("reference", reference)
	}

	if len(conditions) > 0 {
		conditions = fmt.Sprintf("where %s", conditions)
	}

	return fmt.Sprintf(`%s %s`, selectSQL, conditions), bindVars
}