/
rpc.go
149 lines (125 loc) · 3.78 KB
/
rpc.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
package p2p
import (
"bufio"
inet "github.com/libp2p/go-libp2p-net"
peer "github.com/libp2p/go-libp2p-peer"
protocol "github.com/libp2p/go-libp2p-protocol"
protobufCodec "github.com/multiformats/go-multicodec/protobuf"
"github.com/orbit-drive/orbit-drive/pb"
"github.com/orbit-drive/orbit-drive/utils"
log "github.com/sirupsen/logrus"
)
const (
// ProtocolRequestID - protocol header id for request traffic
ProtocolRequestID string = "/od/syncreq/1.0.0"
// ProtocolResponseID - protocol header id for response traffic
ProtocolResponseID string = "/od/syncresp/1.0.0"
)
type ReqResp struct {
requestPb *pb.Request
respChan chan *pb.Response
}
func newReqResp(reqPb *pb.Request) *ReqResp {
return &ReqResp{
requestPb: reqPb,
respChan: make(chan *pb.Response),
}
}
// RPC handles the data stream from/to libp2p host and stores all
// incoming/outgoing request until fully resolve or timeout is reached.
type RPC struct {
lnode *LNode
// TODO: Might need to create a self contained context queue to timeout request pending for x amount of seconds.
// ReqOut represents the queue outgoing requests from the current node.
ReqOut map[string]*ReqResp
// ReqIn represents a chan of incoming request to process.
ReqIn chan *pb.Request
}
func NewRpc(lnode *LNode) *RPC {
return &RPC{
lnode: lnode,
ReqOut: make(map[string]*ReqResp),
ReqIn: make(chan *pb.Request),
}
}
// RequestID returns the procotol resquest id.
func (rpc *RPC) RequestID() protocol.ID {
return protocol.ID(ProtocolRequestID)
}
// ResponseID returns the protol response id.
func (rpc *RPC) ResponseID() protocol.ID {
return protocol.ID(ProtocolResponseID)
}
func (rpc *RPC) initHandlers() {
rpc.lnode.SetStreamHandler(rpc.RequestID(), rpc.reqHandler)
rpc.lnode.SetStreamHandler(rpc.ResponseID(), rpc.respHandler)
}
func (rpc *RPC) createReq(method string) *pb.Request {
return &pb.Request{
PeerId: string(rpc.lnode.GetPeerID()),
RequestId: utils.RandUUID(),
Method: method,
}
}
func (rpc *RPC) registerReqOut(reqPb *pb.Request) *ReqResp {
reqResp := newReqResp(reqPb)
rpc.ReqOut[reqPb.GetRequestId()] = reqResp
return reqResp
}
func (rpc *RPC) findReqOut(reqID string) *ReqResp {
reqResp, ok := rpc.ReqOut[reqID]
if ok {
return reqResp
}
return nil
}
// RequestToPeer opens a stream to a single peer and sends a proto request.
func (rpc *RPC) RequestToPeer(peerID peer.ID, method string) (*pb.Response, error) {
stream, err := rpc.lnode.NewStream(rpc.lnode.GetContext(), peerID, rpc.RequestID())
if err != nil {
return nil, err
}
writer := bufio.NewWriter(stream)
requestPayload := rpc.createReq(method)
enc := protobufCodec.Multicodec(nil).Encoder(writer)
if err = enc.Encode(requestPayload); err != nil {
return nil, err
}
writer.Flush()
reqResp := rpc.registerReqOut(requestPayload)
respPb := <-reqResp.respChan
return respPb, nil
}
// reqHandler: remote peer request handler (received request from peer)
func (rpc *RPC) reqHandler(s inet.Stream) {
req := &pb.Request{}
reader := bufio.NewReader(s)
decoder := protobufCodec.Multicodec(nil).Decoder(reader)
if err := decoder.Decode(req); err != nil {
log.Warn(err)
return
}
log.WithFields(log.Fields{
"peer-id": req.GetPeerId(),
"req-id": req.GetRequestId(),
"method": req.GetMethod(),
}).Info("Received request from peer")
}
func (rpc *RPC) respHandler(s inet.Stream) {
resp := &pb.Response{}
reader := bufio.NewReader(s)
decoder := protobufCodec.Multicodec(nil).Decoder(reader)
if err := decoder.Decode(resp); err != nil {
log.Warn(err)
return
}
reqResp := rpc.findReqOut(resp.GetRequestId())
if reqResp != nil {
reqResp.respChan <- resp
return
}
log.WithFields(log.Fields{
"peer-id": resp.GetPeerId(),
"request-uuid": resp.GetRequestId(),
}).Warn("Received response from peer with no corresponding request")
}