-
Notifications
You must be signed in to change notification settings - Fork 167
/
reconnecting-client.go
96 lines (85 loc) · 2.47 KB
/
reconnecting-client.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
package tasks
import (
"context"
"time"
"github.com/micro/go-micro/client"
"go.uber.org/zap"
"github.com/pydio/cells/common"
"github.com/pydio/cells/common/log"
"github.com/pydio/cells/common/proto/jobs"
"github.com/pydio/cells/common/registry"
)
// ReconnectingClient forwards task updates to the jobs service over a stream
// that is re-opened whenever it breaks.
type ReconnectingClient struct {
	// parentCtx is the context the stream contexts are derived from; it is also
	// used for logging and for direct (non-streamed) requests.
	parentCtx context.Context

	// stopChan carries the stop signal sent by Stop to the streaming goroutine.
	stopChan chan bool
	// closed is set once the streaming goroutine has received the stop signal;
	// it prevents any further reconnection.
	closed bool
}
// NewTaskReconnectingClient builds a ReconnectingClient bound to parentCtx.
// The stop channel is created unbuffered and the client starts in the
// not-closed state; no goroutine is started here.
func NewTaskReconnectingClient(parentCtx context.Context) *ReconnectingClient {
	return &ReconnectingClient{
		stopChan:  make(chan bool),
		parentCtx: parentCtx,
	}
}
// StartListening begins forwarding the values received on tasksChan by
// delegating to chanToStream, which does its work in a separate goroutine;
// this call therefore returns immediately. No task is passed for requeuing.
func (s *ReconnectingClient) StartListening(tasksChan chan interface{}) {
s.chanToStream(tasksChan)
}
// Stop signals the streaming goroutine to terminate by sending true on
// stopChan. The channel is created unbuffered by NewTaskReconnectingClient, so
// this call blocks until a goroutine started by chanToStream receives the
// value; if none is receiving (for instance StartListening was never called),
// it does not return.
func (s *ReconnectingClient) Stop() {
s.stopChan <- true
}
// chanToStream starts a goroutine that opens a PutTaskStream on the jobs
// service and forwards every *jobs.Task received on ch through it, waiting for
// the server response after each send.
//
// The stream context is derived from s.parentCtx with a 5 minute timeout. When
// the stream cannot be opened, the goroutine pauses 10 seconds and tries again.
// When a Send or Recv fails, the task is posted with a direct PutTask request,
// then, after a 1 second pause, a new stream is opened by calling chanToStream
// again. Every pause can be interrupted by Stop.
//
// Values on ch that are not *jobs.Task are logged and skipped. If ch gets
// closed, the goroutine stops reading from it and simply waits for Stop.
//
// requeue is optional: when provided, its first element is sent right after the
// stream is opened, before any value is read from ch. Additional elements are
// ignored.
func (s *ReconnectingClient) chanToStream(ch chan interface{}, requeue ...*jobs.Task) {
	go func() {
		taskClient := jobs.NewJobServiceClient(registry.GetClient(common.ServiceJobs))
		ctx, cancel := context.WithTimeout(s.parentCtx, 5*time.Minute)
		defer cancel()
		streamer, e := taskClient.PutTaskStream(ctx, client.WithRequestTimeout(5*time.Minute))
		if e != nil {
			log.Logger(s.parentCtx).Error("Streamer PutTaskStream", zap.Error(e))
			if s.waitOrStop(10 * time.Second) {
				return
			}
			// Nothing has been sent yet: carry the pending task over to the next attempt.
			s.chanToStream(ch, requeue...)
			return
		}
		defer streamer.Close()

		if len(requeue) > 0 {
			// Skip Recv when Send failed: there is no response to wait for.
			if sE := streamer.Send(&jobs.PutTaskRequest{Task: requeue[0]}); sE != nil {
				log.Logger(s.parentCtx).Debug("Cannot requeue task", zap.Error(sE))
			} else if _, rE := streamer.Recv(); rE != nil {
				log.Logger(s.parentCtx).Debug("Error while requeuing task", zap.Error(rE))
			}
		}

		// fallbackAndReconnect posts task with a plain request, then replaces the
		// broken stream with a new one unless the client has been stopped.
		fallbackAndReconnect := func(task *jobs.Task) {
			if _, rE := taskClient.PutTask(s.parentCtx, &jobs.PutTaskRequest{Task: task}); rE == nil {
				log.Logger(s.parentCtx).Debug("Posted with a direct request")
			} else {
				// Neither the stream nor the direct request worked: the update is lost.
				log.Logger(s.parentCtx).Error("Could not post task with a direct request", zap.Error(rE))
			}
			if s.closed || s.waitOrStop(1*time.Second) {
				return
			}
			s.chanToStream(ch)
		}

		for {
			select {
			case val, open := <-ch:
				if !open {
					// A closed channel is always ready and would make this loop spin.
					// A nil channel is never ready, so from now on only stopChan can
					// wake the goroutine up.
					ch = nil
					continue
				}
				t, ok := val.(*jobs.Task)
				if !ok {
					log.Logger(s.parentCtx).Error("Could not cast value to jobs.Task", zap.Any("val", val))
					continue
				}
				task := t.WithoutLogs()
				if sE := streamer.Send(&jobs.PutTaskRequest{Task: task}); sE != nil {
					log.Logger(s.parentCtx).Debug("Cannot post task - break and reconnect streamer", zap.Error(sE))
					fallbackAndReconnect(task)
					return
				}
				if _, rE := streamer.Recv(); rE != nil {
					log.Logger(s.parentCtx).Debug("Error while posting task - reconnect streamer", zap.Error(rE))
					fallbackAndReconnect(task)
					return
				}
			case <-s.stopChan:
				s.closed = true
				return
			}
		}
	}()
}

// waitOrStop pauses for d before a reconnection attempt. It returns false when
// the full delay has elapsed, and true - after marking the client as closed -
// when Stop is called during the pause.
func (s *ReconnectingClient) waitOrStop(d time.Duration) bool {
	timer := time.NewTimer(d)
	defer timer.Stop()
	select {
	case <-timer.C:
		return false
	case <-s.stopChan:
		s.closed = true
		return true
	}
}