/
grpc_server_workers.go
255 lines (236 loc) · 6.41 KB
/
grpc_server_workers.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
package server
import (
"context"
"math/rand"
"time"
"github.com/palantir/stacktrace"
"github.com/tnyim/jungletv/buildconfig"
"github.com/tnyim/jungletv/proto"
"github.com/tnyim/jungletv/segcha"
"github.com/tnyim/jungletv/server/components/chatmanager"
"github.com/tnyim/jungletv/server/components/notificationmanager/notifications"
"github.com/tnyim/jungletv/utils/event"
"github.com/tnyim/jungletv/utils/transaction"
)
// Worker launches the server's background goroutines and then blocks,
// passing every error they report to errorCb, until ctx is cancelled.
//
// errorCb is only ever invoked from the goroutine that called Worker, one
// error at a time. Errors that background goroutines try to report after ctx
// has been cancelled are dropped, so that those goroutines can terminate
// instead of blocking on a channel nobody reads anymore.
func (s *grpcServer) Worker(ctx context.Context, errorCb func(error)) {
	errChan := make(chan error)

	// reportErr hands err to the dispatch loop at the bottom of this function.
	// It gives up as soon as ctx is cancelled: at that point the dispatch loop
	// may already have returned, and a bare send on the unbuffered channel
	// would block the calling goroutine forever. It reports whether the error
	// was actually delivered.
	reportErr := func(err error) bool {
		select {
		case errChan <- err:
			return true
		case <-ctx.Done():
			return false
		}
	}

	// supervise runs a worker in a loop, restarting it every time it returns
	// an error. It stops when the worker returns nil or when ctx is cancelled.
	supervise := func(startMsg, errMsg, doneMsg string, run func() error) {
		for {
			s.log.Println(startMsg)
			err := run()
			if err == nil {
				return
			}
			reportErr(stacktrace.Propagate(err, "%s", errMsg))
			if ctx.Err() != nil {
				s.log.Println(doneMsg)
				return
			}
		}
	}

	go func() {
		if err := s.appRunner.LaunchAutorunApplications(); err != nil {
			reportErr(stacktrace.Propagate(err, "failed to launch autorun applications"))
		}
	}()

	go supervise(
		"Payments processor starting/restarting",
		"payments processor error",
		"Payments processor done",
		func() error { return s.paymentAccountPool.Worker(ctx, s.ticketCheckPeriod) },
	)

	go supervise(
		"Rewards handler starting/restarting",
		"rewards handler error",
		"Rewards handler done",
		func() error { return s.rewardsHandler.Worker(ctx) },
	)

	if buildconfig.AllowWithdrawalsAndRefunds {
		go supervise(
			"Withdrawal handler starting/restarting",
			"withdrawal handler error",
			"Withdrawal handler done",
			func() error { return s.withdrawalHandler.Worker(ctx) },
		)
	}

	go supervise(
		"Skip manager starting/restarting",
		"skip manager error",
		"Skip manager done",
		func() error { return s.skipManager.Worker(ctx) },
	)

	go supervise(
		"Skip manager balances checker starting/restarting",
		"skip manager balances worker error",
		"Skip manager balances worker done",
		func() error { return s.skipManager.BalancesWorker(ctx, s.ticketCheckPeriod) },
	)

	// Collector account worker: runs the functions received on
	// collectorAccountQueue one at a time, handing each of them the collector
	// account and the wallet RPC clients.
	go func() {
		for {
			select {
			case f := <-s.collectorAccountQueue:
				f(s.collectorAccount, &s.wallet.RPC, &s.wallet.RPCWork)
			case <-ctx.Done():
				s.log.Println("Collector account worker done")
				return
			}
		}
	}()

	// Challenge creation is unfortunately slower than it should be, so
	// challenges are generated ahead of time (using a remote worker when one
	// is configured) and cached in a queue so they can be used later.
	go func() {
		// makeChallenge keeps trying until a challenge is produced. It returns
		// nil only when ctx was cancelled before local generation succeeded.
		makeChallenge := func() *segcha.Challenge {
			for {
				if s.segchaClient != nil {
					ctxT, cancelFn := context.WithTimeout(ctx, 10*time.Second)
					challenge, err := segcha.NewChallengeUsingClient(ctxT, segchaChallengeSteps, s.segchaClient)
					cancelFn()
					if err == nil {
						return challenge
					}
					s.log.Printf("remote segcha challenge creation failed: %v", err)
					// fall through to local generation
				}
				challenge, err := segcha.NewChallenge(segchaChallengeSteps, s.captchaImageDB, s.captchaFontPath)
				if err == nil {
					return challenge
				}
				if !reportErr(stacktrace.Propagate(err, "failed to locally create segcha challenge")) {
					// ctx is cancelled; stop retrying so the mutex held by
					// the caller is released and this goroutine can exit.
					return nil
				}
			}
		}

		t := time.NewTicker(5 * time.Second)
		defer t.Stop()
		for {
			select {
			case <-t.C:
				inCache := len(s.captchaChallengesQueue)
				if inCache < segchaPremadeQueueSize {
					func() {
						s.captchaGenerationMutex.Lock()
						defer s.captchaGenerationMutex.Unlock()
						c := makeChallenge()
						if c == nil {
							// generation was abandoned; never queue a nil challenge
							return
						}
						s.captchaChallengesQueue <- c
						latestGeneratedChallenge = c
						inCache++
						s.log.Printf("generated cached segcha challenge (%d in cache)", inCache)
					}()
				}
				go s.statsClient.Gauge("segcha_cached", inCache)
			case <-ctx.Done():
				s.log.Println("segcha challenge creator worker done")
				return
			}
		}
	}()

	go s.mediaQueue.ProcessQueueWorker(ctx)
	go s.staffActivityManager.StatsWorker(ctx)
	go s.ipReputationChecker.Worker(ctx)

	go supervise(
		"Chat system messages worker starting/restarting",
		"chat system message worker error",
		"Chat system message worker done",
		func() error { return s.ChatSystemMessagesWorker(ctx) },
	)

	// Auto-enqueue worker: when its timer fires while the queue is empty and
	// automatic enqueuing is enabled and allowed, it tries (up to three times)
	// to enqueue a new video.
	go func() {
		mediaChangedC, mediaChangedU := s.mediaQueue.MediaChanged().Subscribe(event.BufferFirst)
		defer mediaChangedU()

		// randomWait picks a delay between 90 and 269 seconds.
		randomWait := func() time.Duration {
			return time.Duration(90+rand.Intn(180)) * time.Second
		}

		t := time.NewTimer(randomWait())
		defer t.Stop()

		// rearm restarts the timer with a fresh random delay. The timer is
		// stopped and its channel drained first, so that a tick which fired
		// before the reset cannot be observed afterwards as a spurious expiry.
		rearm := func() {
			if !t.Stop() {
				select {
				case <-t.C:
				default:
				}
			}
			t.Reset(randomWait())
		}

		for {
			select {
			case v := <-mediaChangedC:
				// presumably a nil value means nothing is playing anymore — TODO confirm
				if v == nil {
					rearm()
				}
			case <-t.C:
				// NOTE(review): when enqueuing is not possible here, or all
				// three attempts fail, the timer stays expired until the next
				// nil media-changed event re-arms it — confirm this is intended.
				allowed := s.getAllowMediaEnqueuing() == proto.AllowedMediaEnqueuingType_ENABLED
				if s.mediaQueue.Length() == 0 && s.autoEnqueueVideos && allowed {
					for attempt := 0; attempt < 3; attempt++ {
						err := func() error {
							tx, err := transaction.Begin(ctx)
							if err != nil {
								return stacktrace.Propagate(err, "")
							}
							defer tx.Commit() // read-only tx
							return s.autoEnqueueNewVideo(tx)
						}()
						if err == nil {
							rearm()
							break
						}
						if !reportErr(stacktrace.Propagate(err, "")) {
							// ctx is cancelled; the outer select will exit
							break
						}
					}
				}
			case <-ctx.Done():
				s.log.Println("Auto enqueue worker done")
				return
			}
		}
	}()

	// While Worker is running, notify the author of a referenced chat message
	// whenever a message referencing it is created by somebody else (or by an
	// unidentified author).
	unsubscribe := s.chat.OnMessageCreated().SubscribeUsingCallback(event.BufferAll, func(evt chatmanager.MessageCreatedEventArgs) {
		m := evt.Message
		if m.Reference != nil && m.Reference.Author != nil && !m.Reference.Author.IsUnknown() &&
			(m.Author == nil || m.Reference.Author.Address() != m.Author.Address()) {
			s.notificationManager.Notify(notifications.NewChatMentionNotification(m.Reference.Author, m.ID, !s.chat.IsUserConnected(m.Reference.Author)))
		}
	})
	defer unsubscribe()

	// Dispatch loop: the single place where reported errors reach errorCb.
	for {
		select {
		case err := <-errChan:
			errorCb(err)
		case <-ctx.Done():
			return
		}
	}
}