This repository has been archived by the owner on Mar 23, 2023. It is now read-only.
/
live.go
253 lines (227 loc) · 6.4 KB
/
live.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
package main
import (
tgbotapi "github.com/go-telegram-bot-api/telegram-bot-api"
"github.com/zshorz/fkbro/data"
"github.com/zshorz/fkbro/util"
"math"
"strings"
"sync"
"time"
)
// Package-level state shared by the polling loop (live/look) and the
// Telegram broadcasters (alert/doReport).
var (
	// liveMap maps a username to the chat ID that receives pushes.
	liveMap = make(map[string]int64)

	// liveMapLock is held by alert and doReport while they iterate liveMap.
	liveMapLock sync.Mutex

	// liveExitChan carries the stop request for live; a buffered value makes
	// the polling loop terminate.
	liveExitChan = make(chan int, 1)

	// liveStateMap holds the polling position per currency symbol
	// (live registers "btc" and "usdt").
	liveStateMap = make(map[string]*liveState)
)

// liveState is the polling position for a single currency.
type liveState struct {
	// LastTime is sent (as a Unix timestamp) with every query made by look.
	LastTime time.Time

	// LastCursor is the cursor returned by the last successful query;
	// "0-0-0" is the placeholder used before any query has succeeded.
	LastCursor string
}
// report is the aggregated flow summary for one currency over one reporting
// period. calc builds it and doReport renders it through
// ParseToString("report", ...).
//
// NOTE(review): the exported field/method names (including the underscore
// ones) are presumably referenced by name from the "report" template, so they
// are left untouched — confirm before renaming.
type report struct {
	Name    string                 // currency symbol the report covers
	Time    string                 // period label; daily sets "#日报" or "#周报"
	Data    map[string]*reportData // per-owner totals, keyed by owner name
	IN      int64                  // amount that flowed into exchanges
	OUT     int64                  // amount that flowed out of exchanges
	IN_usd  int64                  // USD value that flowed into exchanges
	OUT_usd int64                  // USD value that flowed out of exchanges
}

// GetINOUT returns the signed net flow IN - OUT (positive means net inflow).
func (re *report) GetINOUT() int64 {
	return re.IN - re.OUT
}

// AbsUsd returns the magnitude of the net USD flow, |IN_usd - OUT_usd|.
func (re *report) AbsUsd() int64 {
	return absInt64(re.IN_usd - re.OUT_usd)
}

// Abs returns the magnitude of the net flow, |IN - OUT|.
func (re *report) Abs() int64 {
	return absInt64(re.IN - re.OUT)
}

// absInt64 returns |v| using integer arithmetic only. Going through float64
// would silently round values above 2^53. math.MinInt64 has no positive
// counterpart in int64, so it saturates to math.MaxInt64 instead of wrapping.
func absInt64(v int64) int64 {
	switch {
	case v == math.MinInt64:
		return math.MaxInt64
	case v < 0:
		return -v
	default:
		return v
	}
}

// reportData holds the totals accumulated by calc for a single named owner.
// The "Unknown" counters cover only transfers whose counterparty is
// "unknown"; the "_usd" fields are the same sums expressed in USD.
type reportData struct {
	TotalIn        int64 // everything received by the owner
	TotalOut       int64 // everything sent by the owner
	UnknownIN      int64 // received from "unknown" senders
	UnknownOut     int64 // sent to "unknown" receivers
	TotalIn_usd    int64
	TotalOut_usd   int64
	UnknownIN_usd  int64
	UnknownOut_usd int64
}
// live registers the polling state for "btc" and "usdt", and — when a Whale
// API key is configured — sets up the database, starts the daily report
// goroutine and then polls both currencies once every 60 seconds until a
// value arrives on liveExitChan.
//
// Without an API key the function returns right after registering the state.
//
// NOTE(review): the goroutine started for daily is not stopped when live
// returns.
func live() {
	// Both currencies start at "now" with the placeholder cursor.
	liveStateMap["btc"] = &liveState{LastTime: time.Now(), LastCursor: "0-0-0"}
	liveStateMap["usdt"] = &liveState{LastTime: time.Now(), LastCursor: "0-0-0"}
	if util.Config.WhaleApikey == "" {
		return
	}
	data.Setup(util.Config.DbAddr, util.Config.DbName, util.Config.DbUser, util.Config.DbPasswd)
	go daily()

	for {
		// A stop request that is already queued skips this cycle's lookups.
		select {
		case <-liveExitChan:
			return
		default:
		}

		look("btc")
		look("usdt")

		// Sleep until the next cycle, but react to a stop request right away
		// instead of only noticing it after the full 60 seconds have elapsed.
		select {
		case <-liveExitChan:
			return
		case <-time.After(60 * time.Second):
		}
	}
}
func look(currency string) { // btc usdc
w := WhaleAPI.WhaleQuery(util.Config.WhaleApikey, currency, liveStateMap[currency].LastCursor, liveStateMap[currency].LastTime.Unix(), util.Config.LookMinV)
if w == nil || w.Result != "success" || w.Cursor == "0-0-0" { // 没查到
Log.Debug(w)
if time.Now().Unix() - liveStateMap[currency].LastTime.Unix() > 60*60 && liveStateMap[currency].LastCursor == "0-0-0"{ // 60 分钟没查好
liveStateMap[currency].LastTime = time.Now()
}
} else {
liveStateMap[currency].LastCursor = w.Cursor
Log.Debug(w)
for _,trans := range w.Transactions {
if trans.TransactionType != "transfer" {
continue
}
if trans.From.Owner_type == "unknown" {
trans.From.Owner = "unknown"
}
if trans.To.Owner_type == "unknown" {
trans.To.Owner = "unknown"
}
al := data.Alert{
TimeStamp: trans.Timestamp,
Symbol: trans.Symbol,
Hash: trans.Hash,
Amount: int64(trans.Amount+0.5),
AmountUsd: int64(trans.Amount_usd+0.5),
FromAddr: trans.From.Address,
FromOwner: trans.From.Owner,
TomAddr: trans.To.Address,
TomOwner: trans.To.Owner,
}
switch trans.Blockchain {
case "bitcoin":
al.SetURL("https://blockchair.com/zh/bitcoin/transaction/" + al.Hash)
case "ethereum":
al.SetURL("https://blockchair.com/zh/ethereum/transaction/0x" + al.Hash)
case "tron":
al.SetURL("https://tronscan.org/#/transaction/" + al.Hash)
}
Log.Debug(trans)
if trans.From.Owner != trans.To.Owner{ // 双方账户不一样才处理
err := al.Insert() // 存入数据库
if err != nil && strings.HasPrefix(err.Error(), "Error 1062") { // 有重复,就不通知了
continue
}
if int64(trans.Amount_usd) >= util.Config.AlertMinV {
alert(&al) // 机器人推送
}
}
}
}
}
// alert renders al with the "alert" template and sends the result as a
// Markdown message to every chat ID registered in liveMap.
//
// liveMapLock is held for the whole broadcast and released via defer, so it
// is not left locked if rendering or sending panics.
func alert(al *data.Alert) {
	liveMapLock.Lock()
	defer liveMapLock.Unlock()

	if len(liveMap) == 0 {
		return // nobody subscribed: nothing to render or send
	}
	// The text does not depend on the recipient, so render it once.
	text := ParseToString("alert", al)
	for _, id := range liveMap {
		msg := tgbotapi.NewMessage(id, text) // NewMessage already targets chat id
		msg.ParseMode = "Markdown"
		send(&msg, 1)
	}
}
// daily loops forever: it sleeps until the next local midnight, publishes the
// btc and usdt reports covering the day that just ended, and — when it wakes
// on a Sunday — also publishes the reports for the past seven days and asks
// the data layer to delete alerts using that seven-day cutoff.
func daily() {
	Log.Info("daily 任务开启")

	// publish computes both currency reports first, labels them, and only
	// then sends them (btc before usdt).
	publish := func(since, minVal int64, label string) {
		btc := calc("btc", since, minVal)
		usdt := calc("usdt", since, minVal)
		btc.Time = label
		usdt.Time = label
		doReport(btc)
		doReport(usdt)
	}

	for {
		// Work out how long to wait until the upcoming midnight.
		start := time.Now()
		tomorrow := start.AddDate(0, 0, 1)
		nextMidnight := time.Date(tomorrow.Year(), tomorrow.Month(), tomorrow.Day(), 0, 0, 0, 0, tomorrow.Location())
		wait := time.Duration(nextMidnight.Unix()-start.Unix()) * time.Second
		Log.Info("等待执行", int64(wait/time.Hour), "小时后")
		<-time.After(wait)

		// Midnight task: report on everything since the start of the day
		// during which the wait began. Owners below 5,000,000 USD are hidden.
		dayStart := time.Date(start.Year(), start.Month(), start.Day(), 0, 0, 0, 0, start.Location())
		publish(dayStart.Unix(), 5000000, "#日报")

		// Weekly task, run when the wake-up time falls on a Sunday.
		if woke := time.Now(); woke.Weekday() == time.Sunday {
			weekAgo := woke.AddDate(0, 0, -7)
			publish(weekAgo.Unix(), 25000000, "#周报")
			data.DeleteAlert(weekAgo.Unix())
			Log.Info("周任务 本次完成")
		}
		Log.Info("daily 本次完成")
	}
}
// doReport renders rep with the "report" template and sends the result as a
// Markdown message to every chat ID registered in liveMap.
//
// liveMapLock is held for the whole broadcast and released via defer, so it
// is not left locked if rendering or sending panics.
func doReport(rep *report) {
	liveMapLock.Lock()
	defer liveMapLock.Unlock()

	if len(liveMap) == 0 {
		return // nobody subscribed: nothing to render or send
	}
	// The text does not depend on the recipient, so render it once.
	text := ParseToString("report", rep)
	for _, id := range liveMap {
		msg := tgbotapi.NewMessage(id, text) // NewMessage already targets chat id
		msg.ParseMode = "Markdown"
		send(&msg, 1)
	}
}
// calc summarises the stored alerts for currency selected by timestamp
// (via data.GetAlertsByTimeStamp). It never returns nil.
//
// Every owner other than "unknown" gets a reportData entry with its total
// in/out amounts. Transfers between a named owner and an "unknown"
// counterparty additionally count towards the report-wide IN/OUT exchange
// flows, except when the named owner is "tether treasury". Owners whose
// combined USD volume is below minVal are dropped from the result.
func calc(currency string, timestamp int64, minVal int64) *report {
	result := &report{
		Name: currency,
		Data: make(map[string]*reportData),
	}
	records := data.GetAlertsByTimeStamp(currency, timestamp)
	if records == nil {
		return result
	}

	const (
		unknownOwner = "unknown"
		treasury     = "tether treasury" // the Tether treasury is not treated as an exchange
	)

	// statsFor returns the bucket for owner, creating it on first use.
	statsFor := func(owner string) *reportData {
		if bucket, ok := result.Data[owner]; ok {
			return bucket
		}
		bucket := &reportData{}
		result.Data[owner] = bucket
		return bucket
	}

	for _, rec := range records {
		sender, receiver := rec.FromOwner, rec.TomOwner

		// Outgoing side.
		if sender != unknownOwner {
			out := statsFor(sender)
			out.TotalOut += rec.Amount
			out.TotalOut_usd += rec.AmountUsd
			if receiver == unknownOwner && sender != treasury {
				out.UnknownOut += rec.Amount
				out.UnknownOut_usd += rec.AmountUsd
				result.OUT += rec.Amount        // left the exchanges
				result.OUT_usd += rec.AmountUsd // left the exchanges
			}
		}

		// Incoming side.
		if receiver != unknownOwner {
			in := statsFor(receiver)
			in.TotalIn += rec.Amount
			in.TotalIn_usd += rec.AmountUsd
			if sender == unknownOwner && receiver != treasury {
				in.UnknownIN += rec.Amount
				in.UnknownIN_usd += rec.AmountUsd
				result.IN += rec.Amount        // entered the exchanges
				result.IN_usd += rec.AmountUsd // entered the exchanges
			}
		}
	}

	// Hide owners whose total USD volume is below minVal.
	for owner, stats := range result.Data {
		if stats.TotalIn_usd+stats.TotalOut_usd < minVal {
			delete(result.Data, owner)
		}
	}
	return result
}