/
logic.go
119 lines (92 loc) · 2.97 KB
/
logic.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
package bfldb
import (
"context"
"fmt"
"time"
)
// SubscribePositions subscribes to the user's position details in a new goroutine.
//
// It returns two receive-only channels: the first carries position updates,
// the second carries any errors that occurred during the subscription. Both
// channels are unbuffered and are closed when the goroutine exits, which
// happens once ctx is cancelled.
//
// The goroutine polls u.GetOtherPosition in a loop, pausing for u.Delay()
// between attempts. A fetch error, or a response whose Success flag is false,
// is reported on the error channel and the loop carries on with the next poll.
//
// Error delivery and the pause between polls both abort as soon as ctx is
// cancelled, so a caller that cancels ctx and stops reading the error channel
// does not leave the goroutine blocked. Position updates are sent by
// handlePositions, which does not observe ctx: those sends still block until
// the caller receives them.
func (u *User) SubscribePositions(ctx context.Context) (<-chan Position, <-chan error) {
	cp := make(chan Position)
	ce := make(chan error)

	// report delivers err on the error channel. It returns false when ctx was
	// cancelled before a receiver took the value.
	report := func(err error) bool {
		select {
		case ce <- err:
			return true
		case <-ctx.Done():
			return false
		}
	}

	// pause waits for d. It returns false when ctx was cancelled before d
	// elapsed, so cancellation is noticed without sleeping out the full delay.
	pause := func(d time.Duration) bool {
		t := time.NewTimer(d)
		defer t.Stop()
		select {
		case <-t.C:
			return true
		case <-ctx.Done():
			return false
		}
	}

	go func() {
		// Closing both channels signals the end of the subscription to the caller.
		defer close(cp)
		defer close(ce)

		for {
			// Stop before issuing another request once ctx is cancelled.
			if ctx.Err() != nil {
				return
			}

			// u.log.Printf("[%s] Checking for new positions\n", u.id)
			res, err := u.GetOtherPosition(ctx)
			switch {
			case err != nil:
				if !report(fmt.Errorf("failed to fetch positions: %w", err)) {
					return
				}
			case !res.Success:
				if !report(fmt.Errorf("failed to fetch positions, bad response message: %v", res.Message)) {
					return
				}
			default:
				// u.log.Printf("[%s] Updating %d positions\n", u.id, len(res.Data.OtherPositionRetList))
				u.handlePositions(res.Data.OtherPositionRetList, cp, ce)
			}

			// Wait between polls on success and on failure alike.
			if !pause(u.Delay()) {
				return
			}
		}
	}()

	return cp, ce
}
// handlePositions compares a freshly fetched batch of raw positions with the
// positions remembered in u.positions and publishes every change on cp.
//
// For each fetched position:
//   - if its Amount equals the remembered Amount, only MarkPrice, Pnl and Roe
//     of the remembered entry are refreshed and nothing is sent;
//   - otherwise PrevAmount and Type are filled in, the change is logged, the
//     position is sent on cp (except during the first fetch) and it replaces
//     the remembered entry.
//
// Any remembered position whose ticker is missing from the batch is sent on cp
// with Type Closed and Amount 0, then removed from u.positions.
//
// The ce parameter is accepted but not used by this function.
func (u *User) handlePositions(rps []rawPosition, cp chan<- Position, ce chan<- error) {
	// seen holds the tickers present in this batch; anything remembered but
	// absent from it is treated as closed further down.
	seen := make(map[string]struct{}, len(rps))

	for _, raw := range rps {
		cur := newPosition(raw)
		seen[cur.Ticker] = struct{}{}

		// A ticker that was never stored yields the zero Position here.
		prev := u.positions[cur.Ticker]

		if prev.Amount != cur.Amount {
			// The amount moved: describe the transition on the new position.
			cur.PrevAmount = prev.Amount
			cur.Type = DeterminePositionType(cur.Amount, prev.Amount)

			u.log.Printf("[%s] {send: %t} Position change: %d %s %f -> %f %s @ %f\n", u.UID, !u.firstFetch, cur.Type, cur.Direction, cur.PrevAmount, cur.Amount, cur.Ticker, cur.EntryPrice)

			// Everything looks "new" on the very first fetch, so those
			// positions are remembered without being published.
			if !u.firstFetch {
				cp <- cur
			}

			u.positions[cur.Ticker] = cur
			continue
		}

		// Same amount: refresh only the fields that change on every poll and
		// keep the rest of the remembered position as it was.
		prev.MarkPrice, prev.Pnl, prev.Roe = cur.MarkPrice, cur.Pnl, cur.Roe
		u.positions[cur.Ticker] = prev
	}

	// Remembered positions that did not show up in this batch have been closed.
	for ticker, old := range u.positions {
		if _, present := seen[ticker]; !present {
			old.Type = Closed
			old.PrevAmount, old.Amount = old.Amount, 0
			cp <- old

			// Forget the closed position.
			delete(u.positions, ticker)
		}
	}

	// The first fetch is over once a batch has been processed.
	u.firstFetch = false
}