forked from vitessio/vitess
-
Notifications
You must be signed in to change notification settings - Fork 1
/
server.go
90 lines (79 loc) · 1.8 KB
/
server.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
package pbrpc
import (
"io"
"sync"
"github.com/golang/protobuf/proto"
rpc "github.com/youtube/vitess/go/rpcplus"
)
type pbServerCodec struct {
mu sync.Mutex
rwc io.ReadWriteCloser
}
// NewServerCodec returns a new ServerCodec.
func NewServerCodec(rwc io.ReadWriteCloser) rpc.ServerCodec {
return &pbServerCodec{rwc: rwc}
}
// ReadRequestHeader reads a Request.
func (c *pbServerCodec) ReadRequestHeader(r *rpc.Request) error {
data, err := ReadNetString(c.rwc)
if err != nil {
return err
}
rtmp := new(Request)
err = proto.Unmarshal(data, rtmp)
if err != nil {
return err
}
r.ServiceMethod = *rtmp.ServiceMethod
r.Seq = *rtmp.Seq
return nil
}
// ReadRequestBody reads a body structure from the codec.
func (c *pbServerCodec) ReadRequestBody(body interface{}) error {
data, err := ReadNetString(c.rwc)
if err != nil {
return err
}
if body != nil {
return proto.Unmarshal(data, body.(proto.Message))
}
return nil
}
type flusher interface {
Flush() error
}
// WriteResponse writes a response on the codec.
func (c *pbServerCodec) WriteResponse(r *rpc.Response, body interface{}, last bool) (err error) {
// Use a mutex to guarantee the header/body are written in the correct order.
c.mu.Lock()
defer c.mu.Unlock()
rtmp := &Response{ServiceMethod: &r.ServiceMethod, Seq: &r.Seq, Error: &r.Error}
data, err := proto.Marshal(rtmp)
if err != nil {
return
}
_, err = WriteNetString(c.rwc, data)
if err != nil {
return
}
if pb, ok := body.(proto.Message); ok {
data, err = proto.Marshal(pb)
if err != nil {
return
}
} else {
data = nil
}
_, err = WriteNetString(c.rwc, data)
if err != nil {
return
}
if flusher, ok := c.rwc.(flusher); ok {
err = flusher.Flush()
}
return
}
// Close the underlying connection.
func (c *pbServerCodec) Close() error {
return c.rwc.Close()
}