/
rpc_server.go
92 lines (83 loc) · 2.22 KB
/
rpc_server.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
package main
import (
"fmt"
"net"
"time"
"github.com/alipay/sofa-mosn/pkg/buffer"
"github.com/alipay/sofa-mosn/pkg/protocol/sofarpc"
"github.com/alipay/sofa-mosn/pkg/protocol/sofarpc/codec"
)
type SofaRPCServer struct {
Listener net.Listener
}
func (s *SofaRPCServer) Run() {
for {
conn, err := s.Listener.Accept()
if err != nil {
if ne, ok := err.(net.Error); ok && ne.Temporary() {
fmt.Printf("[RPC Server] Accept temporary error: %v\n", ne)
continue
}
return //not temporary error, exit
}
fmt.Println("[RPC Server] get connection :", conn.RemoteAddr().String())
go s.Serve(conn)
}
}
func (s *SofaRPCServer) Serve(conn net.Conn) {
iobuf := buffer.NewIoBuffer(102400)
for {
now := time.Now()
conn.SetReadDeadline(now.Add(30 * time.Second))
buf := make([]byte, 10*1024)
bytesRead, err := conn.Read(buf)
if err != nil {
if err, ok := err.(net.Error); ok && err.Timeout() {
fmt.Printf("[RPC Server] Connect read error: %v\n", err)
continue
}
}
if bytesRead > 0 {
iobuf.Write(buf[:bytesRead])
for iobuf.Len() > 1 {
cmd, _ := codec.BoltV1.GetDecoder().Decode(nil, iobuf)
if cmd == nil {
break
}
if req, ok := cmd.(*sofarpc.BoltRequestCommand); ok {
resp := buildBoltV1Response(req)
iobufresp, err := codec.BoltV1.GetEncoder().EncodeHeaders(nil, resp)
if err != nil {
fmt.Printf("[RPC Server] build response error: %v\n", err)
} else {
fmt.Printf("[RPC Server] reponse connection: %s, requestId: %d\n", conn.RemoteAddr().String(), resp.GetReqID())
respdata := iobufresp.Bytes()
conn.Write(respdata)
}
}
}
}
}
}
func buildBoltV1Response(req *sofarpc.BoltRequestCommand) *sofarpc.BoltResponseCommand {
return &sofarpc.BoltResponseCommand{
Protocol: req.Protocol,
CmdType: sofarpc.RESPONSE,
CmdCode: sofarpc.RPC_RESPONSE,
Version: req.Version,
ReqID: req.ReqID,
CodecPro: req.CodecPro,
ResponseStatus: sofarpc.RESPONSE_STATUS_SUCCESS,
HeaderLen: req.HeaderLen,
HeaderMap: req.HeaderMap,
}
}
func main() {
ln, err := net.Listen("tcp", "127.0.0.1:8080")
if err != nil {
fmt.Println(err)
return
}
server := &SofaRPCServer{ln}
server.Run()
}