-
Notifications
You must be signed in to change notification settings - Fork 1
/
rpc.go
141 lines (115 loc) · 3.59 KB
/
rpc.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
package rpc
import (
"bytes"
"encoding/gob"
"reflect"
"github.com/pkg/errors"
)
// RPCdata represents the serializing format of structured data
type RPCdata struct {
Name string // name of the function
Args []interface{} // request's or response's body expect error.
Err string // Error any executing remote server
}
// Execute the given function if present
func Execute(req RPCdata, fFunc interface{}) ([]byte, error) {
f := reflect.ValueOf(fFunc)
// log.Printf("func %s is called\n", req.Name)
// unpackage request arguments
inArgs := make([]reflect.Value, len(req.Args))
for i := range req.Args {
inArgs[i] = reflect.ValueOf(req.Args[i])
}
// invoke requested method
out := f.Call(inArgs)
// now since we have followed the function signature style where last argument will be an error
// so we will pack the response arguments expect error.
resArgs := make([]interface{}, len(out)-1)
for i := 0; i < len(out)-1; i++ {
// Interface returns the constant value stored in v as an interface{}.
resArgs[i] = out[i].Interface()
}
// pack error argument
var er string
if e, ok := out[len(out)-1].Interface().(error); ok {
// convert the error into error string value
er = e.Error()
}
respRPCData := RPCdata{Name: req.Name, Args: resArgs, Err: er}
return Encode(respRPCData)
}
// Call makes rpc call
func Call(rpcName string, fPtr interface{}, rpcSend func(reqBytes []byte) ([]byte, error)) {
container := reflect.ValueOf(fPtr).Elem()
f := func(req []reflect.Value) []reflect.Value {
errorHandler := func(err error) []reflect.Value {
outArgs := make([]reflect.Value, container.Type().NumOut())
for i := 0; i < len(outArgs)-1; i++ {
outArgs[i] = reflect.Zero(container.Type().Out(i))
}
outArgs[len(outArgs)-1] = reflect.ValueOf(&err).Elem()
return outArgs
}
// Process input parameters
inArgs := make([]interface{}, 0, len(req))
for _, arg := range req {
inArgs = append(inArgs, arg.Interface())
}
// ReqRPC
reqRPC := RPCdata{Name: rpcName, Args: inArgs}
reqBytes, err := Encode(reqRPC)
if err != nil {
return errorHandler(err)
}
respBytes, err := rpcSend(reqBytes)
if err != nil {
return errorHandler(err)
}
rspRPC, err := Decode(respBytes)
if err != nil {
return errorHandler(err)
}
if rspRPC.Err != "" { // remote server error
return errorHandler(errors.New(rspRPC.Err))
}
if len(rspRPC.Args) == 0 {
rspRPC.Args = make([]interface{}, container.Type().NumOut())
}
// unpackage response arguments
numOut := container.Type().NumOut()
outArgs := make([]reflect.Value, numOut)
for i := 0; i < numOut; i++ {
if i != numOut-1 { // unpackage arguments (except error)
if rspRPC.Args[i] == nil { // if argument is nil (gob will ignore "Zero" in transmission), set "Zero" value
outArgs[i] = reflect.Zero(container.Type().Out(i))
} else {
outArgs[i] = reflect.ValueOf(rspRPC.Args[i])
}
} else { // unpackage error argument
outArgs[i] = reflect.Zero(container.Type().Out(i))
}
}
return outArgs
}
container.Set(reflect.MakeFunc(container.Type(), f))
}
// Encode The RPCdata in binary format which can
// be sent over the network.
func Encode(data RPCdata) ([]byte, error) {
var buf bytes.Buffer
encoder := gob.NewEncoder(&buf)
if err := encoder.Encode(data); err != nil {
return nil, err
}
return buf.Bytes(), nil
}
// Decode the binary data into the Go RPC struct
func Decode(b []byte) (RPCdata, error) {
buf := bytes.NewBuffer(b)
decoder := gob.NewDecoder(buf)
var data RPCdata
if err := decoder.Decode(&data); err != nil {
return RPCdata{}, err
}
return data, nil
}