/
main_loop.go
77 lines (64 loc) · 1.89 KB
/
main_loop.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
package metathings_device_service
import (
"math"
"time"
log "github.com/sirupsen/logrus"
session_helper "github.com/nayotta/metathings/pkg/common/session"
deviced_pb "github.com/nayotta/metathings/proto/deviced"
)
// main_loop runs internal_main_loop forever, sleeping between consecutive
// runs. The sleep interval starts at opt.MinReconnectInterval. Each time
// internal_main_loop returns an error the interval is doubled and capped at
// opt.MaxReconnectInterval; each time it returns nil the interval is reset to
// opt.MinReconnectInterval. The doubling is applied before sleeping, so the
// first failed run already waits twice the minimum (subject to the cap).
// This function never returns.
func (self *MetathingsDeviceServiceImpl) main_loop() {
	wait := self.opt.MinReconnectInterval
	for {
		if err := self.internal_main_loop(); err == nil {
			// Clean return: start over from the shortest interval.
			wait = self.opt.MinReconnectInterval
		} else {
			// Failure: back off exponentially, bounded by the configured maximum.
			doubled := float64(wait * 2)
			ceiling := float64(self.opt.MaxReconnectInterval)
			wait = time.Duration(math.Min(doubled, ceiling))
		}
		time.Sleep(wait)
	}
}
// _refresh_startup_session overwrites self.startup_session with the value
// returned by session_helper.GenerateStartupSession.
func (self *MetathingsDeviceServiceImpl) _refresh_startup_session() {
	fresh := session_helper.GenerateStartupSession()
	self.startup_session = fresh
}
// internal_main_loop performs one connect-and-serve cycle against deviced.
//
// It creates a deviced client, refreshes the startup session, opens the
// connection stream (stored in self.conn_stm under conn_stm_rwmtx), and then
// receives requests from the stream in a loop, handing each one to
// self.handle in its own goroutine.
//
// Returns a non-nil error when the client cannot be created or the stream
// cannot be opened. Returns nil when Recv on an established stream fails; the
// failure is logged but not propagated to the caller.
func (self *MetathingsDeviceServiceImpl) internal_main_loop() error {
	logger := self.get_logger().WithField("method", "internal_main_loop")

	var (
		err error
		msg *deviced_pb.ConnectRequest
	)

	// Create the deviced client; cfn is deferred so it runs whenever this
	// cycle ends.
	cli, cfn, err := self.cli_fty.NewDevicedServiceClient()
	if err != nil {
		logger.WithError(err).Errorf("failed to connect to deviced service")
		return err
	}
	defer cfn()

	// TODO(Peer): DONT refresh startup session
	self._refresh_startup_session()
	ctx := self.context_with_sesion()

	// Publish the new stream under the write lock. The field is assigned
	// even when Connect fails; the error is checked after unlocking.
	self.conn_stm_rwmtx.Lock()
	self.conn_stm, err = cli.Connect(ctx)
	self.conn_stm_rwmtx.Unlock()
	if err != nil {
		logger.WithError(err).Errorf("failed to build connection to deviced")
		return err
	}

	// After a 200ms pause, call conn_stm_wg.Done().
	// NOTE(review): conn_stm_wg_once is presumably a sync.Once, so this would
	// fire only for the first successful connection — confirm.
	self.conn_stm_wg_once.Do(func() {
		time.Sleep(200 * time.Millisecond)
		self.conn_stm_wg.Done()
	})

	// Message loop: fetch the current stream under the read lock, then block
	// on Recv outside of it.
	for {
		self.conn_stm_rwmtx.RLock()
		stream := self.connection_stream()
		self.conn_stm_rwmtx.RUnlock()

		msg, err = stream.Recv()
		if err != nil {
			// A failed Recv ends the cycle; nil is returned rather than err.
			logger.WithError(err).Errorf("failed to recv message from connection stream")
			return nil
		}

		fields := log.Fields{
			"session": msg.GetSessionId().GetValue(),
			"kind":    msg.GetKind(),
		}
		logger.WithFields(fields).Debugf("recv msg")

		// Each request is handled in its own goroutine so the receive loop
		// is not held up by the handler.
		go self.handle(msg)
	}
}