This repository has been archived by the owner on Nov 20, 2022. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 0
/
conn.go
108 lines (87 loc) · 2.34 KB
/
conn.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
package plugin
import (
"context"
"fmt"
"github.com/iyear/go-plugin-grpc/internal/pb"
"github.com/iyear/go-plugin-grpc/shared"
"google.golang.org/grpc"
"google.golang.org/protobuf/proto"
)
func (p *Plugin) Mount(target string, port int) error {
//defer func() {
// _ = p.Shutdown(UnbindExit, nil) // TODO 错误处理
//}()
ctx, cancel := context.WithCancel(context.Background())
p.cancel = cancel
conn, err := grpc.DialContext(ctx, fmt.Sprintf("%s:%d", target, port), p.opts.DialOpts...)
if err != nil {
return err
}
//fmt.Println("conn success")
p.conn = conn
p.clients.conn = pb.NewConnClient(conn)
// TODO log是否开另一条连接专用还在考虑,涉及到token验证,不利于复用
logClient, err := p.clients.conn.Log(ctx, p.opts.CallOpts...)
if err != nil {
return err
}
p.clients.log = logClient
//fmt.Println("log success")
commClient, err := p.clients.conn.Communicate(ctx, p.opts.CallOpts...)
if err != nil {
return err
}
p.clients.comm = commClient
//fmt.Println("comm success")
// bind
b, err := proto.Marshal(&pb.BindRequest{
Token: p.token,
Name: p.name,
Version: p.version,
Functions: p.Funcs(),
})
if err != nil {
return err
}
if err = p.clients.comm.Send(&pb.CommunicateMsg{Type: pb.CommunicateType_Bind, Data: b}); err != nil {
return err
}
// 开始心跳
_, err = p.cron.AddFunc(fmt.Sprintf("@every %ds", int(p.opts.Heartbeat.Seconds())), p.heartbeat())
if err != nil {
return err
}
p.cron.Start()
// 置连接状态
p.status = pb.PluginStatus_Connected
return p.recv(ctx)
}
//unbind 填写参数msg将在Core打印解绑原因,如不需要传入nil
func (p *Plugin) unbind(reason shared.UnbindReason, msg *string) error {
b, err := proto.Marshal(&pb.UnbindRequest{
Reason: pb.UnbindReason(reason),
Token: p.token,
Msg: msg,
})
if err != nil {
return err
}
return p.clients.comm.Send(&pb.CommunicateMsg{Type: pb.CommunicateType_Unbind, Data: b})
}
func (p *Plugin) Shutdown(reason shared.UnbindReason, msg *string) []error {
p.cron.Stop()
// 关闭连接
errs := make([]error, 0)
if err := p.unbind(reason, msg); err != nil {
errs = append(errs, err)
}
if p.conn != nil {
if err := p.conn.Close(); err != nil {
errs = append(errs, err)
}
}
// set status to disconnected
p.status = pb.PluginStatus_Disconnected
p.cancel()
return errs
}