forked from stellar/go
-
Notifications
You must be signed in to change notification settings - Fork 0
/
actions_trade.go
172 lines (152 loc) · 3.99 KB
/
actions_trade.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
package ticker
import (
"context"
"errors"
"fmt"
"math/big"
"strconv"
"time"
horizonclient "github.com/pownieh/stellar_go/clients/horizonclient"
hProtocol "github.com/pownieh/stellar_go/protocols/horizon"
"github.com/pownieh/stellar_go/services/ticker/internal/scraper"
"github.com/pownieh/stellar_go/services/ticker/internal/tickerdb"
hlog "github.com/pownieh/stellar_go/support/log"
)
// StreamTrades constantly streams and ingests new trades directly from horizon.
func StreamTrades(
ctx context.Context,
s *tickerdb.TickerSession,
c *horizonclient.Client,
l *hlog.Entry,
) error {
sc := scraper.ScraperConfig{
Client: c,
Logger: l,
Ctx: &ctx,
}
handler := func(trade hProtocol.Trade) {
l.Infof("New trade arrived. ID: %v; Close Time: %v\n", trade.ID, trade.LedgerCloseTime)
scraper.NormalizeTradeAssets(&trade)
bID, cID, err := findBaseAndCounter(ctx, s, trade)
if err != nil {
l.Error(err)
return
}
dbTrade, err := hProtocolTradeToDBTrade(trade, bID, cID)
if err != nil {
l.Error(err)
return
}
err = s.BulkInsertTrades(ctx, []tickerdb.Trade{dbTrade})
if err != nil {
l.Error("Could not insert trade in database: ", trade.ID)
}
}
// Ensure we start streaming from the last stored trade
lastTrade, err := s.GetLastTrade(ctx)
if err != nil {
return err
}
cursor := lastTrade.HorizonID
return sc.StreamNewTrades(cursor, handler)
}
// BackfillTrades ingest the most recent trades (limited to numDays) directly from Horizon
// into the database.
func BackfillTrades(
ctx context.Context,
s *tickerdb.TickerSession,
c *horizonclient.Client,
l *hlog.Entry,
numHours int,
limit int,
) error {
sc := scraper.ScraperConfig{
Client: c,
Logger: l,
}
now := time.Now()
since := now.Add(time.Hour * -time.Duration(numHours))
trades, err := sc.FetchAllTrades(since, limit)
if err != nil {
return err
}
var dbTrades []tickerdb.Trade
for _, trade := range trades {
var bID, cID int32
bID, cID, err = findBaseAndCounter(ctx, s, trade)
if err != nil {
continue
}
var dbTrade tickerdb.Trade
dbTrade, err = hProtocolTradeToDBTrade(trade, bID, cID)
if err != nil {
l.Error("Could not convert entry to DB Trade: ", err)
continue
}
dbTrades = append(dbTrades, dbTrade)
}
l.Infof("Inserting %d entries in the database.\n", len(dbTrades))
err = s.BulkInsertTrades(ctx, dbTrades)
if err != nil {
fmt.Println(err)
}
return nil
}
// findBaseAndCounter tries to find the Base and Counter assets IDs in the database,
// and returns an error if it doesn't find any.
func findBaseAndCounter(ctx context.Context, s *tickerdb.TickerSession, trade hProtocol.Trade) (bID int32, cID int32, err error) {
bFound, bID, err := s.GetAssetByCodeAndIssuerAccount(
ctx,
trade.BaseAssetCode,
trade.BaseAssetIssuer,
)
if err != nil {
return
}
cFound, cID, err := s.GetAssetByCodeAndIssuerAccount(
ctx,
trade.CounterAssetCode,
trade.CounterAssetIssuer,
)
if err != nil {
return
}
if !bFound || !cFound {
err = errors.New("base or counter asset no found")
return
}
return
}
// hProtocolTradeToDBTrade converts from a hProtocol.Trade to a tickerdb.Trade
func hProtocolTradeToDBTrade(
hpt hProtocol.Trade,
baseAssetID int32,
counterAssetID int32,
) (trade tickerdb.Trade, err error) {
fBaseAmount, err := strconv.ParseFloat(hpt.BaseAmount, 64)
if err != nil {
return
}
fCounterAmount, err := strconv.ParseFloat(hpt.CounterAmount, 64)
if err != nil {
return
}
rPrice := big.NewRat(int64(hpt.Price.D), int64(hpt.Price.N))
fPrice, _ := rPrice.Float64()
trade = tickerdb.Trade{
HorizonID: hpt.ID,
LedgerCloseTime: hpt.LedgerCloseTime,
OfferID: hpt.OfferID,
BaseOfferID: hpt.BaseOfferID,
BaseAccount: hpt.BaseAccount,
BaseAmount: fBaseAmount,
BaseAssetID: baseAssetID,
CounterOfferID: hpt.CounterOfferID,
CounterAccount: hpt.CounterAccount,
CounterAmount: fCounterAmount,
CounterAssetID: counterAssetID,
BaseIsSeller: hpt.BaseIsSeller,
Price: fPrice,
}
return
}