/
hdl_grpc.go
117 lines (97 loc) · 2.22 KB
/
hdl_grpc.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
/******************************************************************************
*
* Description :
*
* Handler of gRPC connections. See also hdl_websock.go for websockets and
* hdl_longpoll.go for long polling.
*
*****************************************************************************/
package main
import (
"io"
"log"
"net"
"github.com/tinode/chat/pbx"
"google.golang.org/grpc"
)
type grpcNodeServer struct {
}
func (sess *Session) closeGrpc() {
if sess.proto == GRPC {
sess.grpcnode = nil
}
}
// Equivalent of starting a new session and a read loop in one
func (*grpcNodeServer) MessageLoop(stream pbx.Node_MessageLoopServer) error {
sess, _ := globals.sessionStore.Create(stream, "")
defer func() {
sess.closeGrpc()
sess.cleanUp()
log.Println("grpc exited", sess.sid)
}()
go sess.writeGrpcLoop()
for sess.grpcnode != nil {
in, err := stream.Recv()
if err == io.EOF {
return nil
}
if err != nil {
return err
}
log.Println("grpc in:", truncateStringIfTooLong(in.String()))
sess.dispatch(pbCliDeserialize(in))
}
return nil
}
func (sess *Session) writeGrpcLoop() {
defer func() {
sess.closeGrpc() // exit MessageLoop
}()
for {
select {
case msg, ok := <-sess.send:
if !ok {
// channel closed
return
}
if err := grpcWrite(sess, msg); err != nil {
log.Println("g.writeGrpcLoop", err)
return
}
case msg := <-sess.stop:
// Shutdown requested, don't care if the message is delivered
if msg != nil {
grpcWrite(sess, msg)
}
return
case topic := <-sess.detach:
sess.delSub(topic)
}
}
}
func grpcWrite(sess *Session, msg interface{}) error {
out := sess.grpcnode
if out != nil {
// Will panic if msg is not of *pbx.ServerMsg type. This is an intentional panic.
return out.Send(msg.(*pbx.ServerMsg))
}
return nil
}
func serveGrpc(addr string) (*grpc.Server, error) {
if addr == "" {
return nil, nil
}
lis, err := net.Listen("tcp", addr)
if err != nil {
return nil, err
}
srv := grpc.NewServer()
pbx.RegisterNodeServer(srv, &grpcNodeServer{})
log.Printf("gRPC server is registered at [%s]", addr)
go func() {
if err := srv.Serve(lis); err != nil {
log.Println("gRPC server failed:", err)
}
}()
return srv, nil
}