-
Notifications
You must be signed in to change notification settings - Fork 22
/
position.go
124 lines (107 loc) · 3.87 KB
/
position.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
// Copyright (c) 2022 Gobalsky Labs Limited
//
// Use of this software is governed by the Business Source License included
// in the LICENSE.DATANODE file and at https://www.mariadb.com/bsl11.
//
// Change Date: 18 months from the later of the date of the first publicly
// available Distribution of this version of the repository, and 25 June 2022.
//
// On the date above, in accordance with the Business Source License, use
// of this software will be governed by version 3 or later of the GNU General
// Public License.
package service
import (
"context"
"fmt"
"code.vegaprotocol.io/vega/datanode/entities"
"code.vegaprotocol.io/vega/datanode/utils"
"code.vegaprotocol.io/vega/logging"
lru "github.com/hashicorp/golang-lru"
)
type PositionStore interface {
Flush(ctx context.Context) ([]entities.Position, error)
Add(ctx context.Context, p entities.Position) error
GetByMarketAndParty(ctx context.Context, marketID string, partyID string) (entities.Position, error)
GetByMarket(ctx context.Context, marketID string) ([]entities.Position, error)
GetByParty(ctx context.Context, partyID string) ([]entities.Position, error)
GetByPartyConnection(ctx context.Context, partyID string, marketID string, pagination entities.CursorPagination) ([]entities.Position, entities.PageInfo, error)
GetAll(ctx context.Context) ([]entities.Position, error)
}
type positionCacheKey struct {
MarketID entities.MarketID
PartyID entities.PartyID
}
type Position struct {
log *logging.Logger
store PositionStore
observer utils.Observer[entities.Position]
cache *lru.Cache
}
func NewPosition(store PositionStore, log *logging.Logger) *Position {
cache, err := lru.New(10000)
if err != nil {
panic(err)
}
return &Position{
store: store,
log: log,
observer: utils.NewObserver[entities.Position]("positions", log, 0, 0),
cache: cache,
}
}
func (p *Position) Flush(ctx context.Context) error {
flushed, err := p.store.Flush(ctx)
if err != nil {
return err
}
p.observer.Notify(flushed)
return nil
}
func (p *Position) Add(ctx context.Context, pos entities.Position) error {
key := positionCacheKey{pos.MarketID, pos.PartyID}
p.cache.Add(key, pos)
return p.store.Add(ctx, pos)
}
func (p *Position) GetByMarketAndParty(ctx context.Context, marketID string, partyID string) (entities.Position, error) {
key := positionCacheKey{entities.MarketID(marketID), entities.PartyID(partyID)}
value, ok := p.cache.Get(key)
if !ok {
pos, err := p.store.GetByMarketAndParty(
ctx, marketID, partyID)
if err == nil {
p.cache.Add(key, pos)
} else { // If store errors in the cache too
p.cache.Add(key, err)
}
return pos, err
}
switch v := value.(type) {
case entities.Position:
return v, nil
case error:
return entities.Position{}, v
default:
return entities.Position{}, fmt.Errorf("unknown type in cache")
}
}
func (p *Position) GetByMarket(ctx context.Context, marketID string) ([]entities.Position, error) {
return p.store.GetByMarket(ctx, marketID)
}
func (p *Position) GetByParty(ctx context.Context, partyID entities.PartyID) ([]entities.Position, error) {
return p.store.GetByParty(ctx, partyID.String())
}
func (p *Position) GetByPartyConnection(ctx context.Context, partyID entities.PartyID, marketID entities.MarketID, pagination entities.CursorPagination) ([]entities.Position, entities.PageInfo, error) {
return p.store.GetByPartyConnection(ctx, partyID.String(), marketID.String(), pagination)
}
func (p *Position) GetAll(ctx context.Context) ([]entities.Position, error) {
return p.store.GetAll(ctx)
}
func (p *Position) Observe(ctx context.Context, retries int, partyID, marketID string) (<-chan []entities.Position, uint64) {
ch, ref := p.observer.Observe(ctx,
retries,
func(pos entities.Position) bool {
return (len(marketID) == 0 || marketID == pos.MarketID.String()) &&
(len(partyID) == 0 || partyID == pos.PartyID.String())
})
return ch, ref
}