/
socket_babble_proxy_server.go
121 lines (95 loc) · 2.82 KB
/
socket_babble_proxy_server.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
package babble
import (
"net"
"net/rpc"
"net/rpc/jsonrpc"
"time"
"github.com/mosaicnetworks/babble/src/hashgraph"
"github.com/mosaicnetworks/babble/src/node/state"
"github.com/mosaicnetworks/babble/src/proxy"
"github.com/sirupsen/logrus"
)
// SocketBabbleProxyServer is the server component of the BabbleProxy which
// responds to RPC requests from the client component of the AppProxy
type SocketBabbleProxyServer struct {
netListener *net.Listener
rpcServer *rpc.Server
handler proxy.ProxyHandler
timeout time.Duration
logger *logrus.Entry
}
// NewSocketBabbleProxyServer creates a new SocketBabbleProxyServer
func NewSocketBabbleProxyServer(
bindAddress string,
handler proxy.ProxyHandler,
timeout time.Duration,
logger *logrus.Entry,
) (*SocketBabbleProxyServer, error) {
server := &SocketBabbleProxyServer{
handler: handler,
timeout: timeout,
logger: logger,
}
if err := server.register(bindAddress); err != nil {
return nil, err
}
return server, nil
}
func (p *SocketBabbleProxyServer) register(bindAddress string) error {
rpcServer := rpc.NewServer()
rpcServer.RegisterName("State", p)
p.rpcServer = rpcServer
l, err := net.Listen("tcp", bindAddress)
if err != nil {
return err
}
p.netListener = &l
return nil
}
func (p *SocketBabbleProxyServer) listen() error {
for {
conn, err := (*p.netListener).Accept()
if err != nil {
return err
}
go (*p.rpcServer).ServeCodec(jsonrpc.NewServerCodec(conn))
}
}
// CommitBlock implements the AppProxy interface
func (p *SocketBabbleProxyServer) CommitBlock(block hashgraph.Block, response *proxy.CommitResponse) (err error) {
*response, err = p.handler.CommitHandler(block)
p.logger.WithFields(logrus.Fields{
"block": block.Index(),
"response": response,
"err": err,
}).Debug("BabbleProxyServer.CommitBlock")
return
}
// GetSnapshot implements the AppProxy interface
func (p *SocketBabbleProxyServer) GetSnapshot(blockIndex int, snapshot *[]byte) (err error) {
*snapshot, err = p.handler.SnapshotHandler(blockIndex)
p.logger.WithFields(logrus.Fields{
"block": blockIndex,
"snapshot": snapshot,
"err": err,
}).Debug("BabbleProxyServer.GetSnapshot")
return
}
// Restore implements the AppProxy interface
func (p *SocketBabbleProxyServer) Restore(snapshot []byte, stateHash *[]byte) (err error) {
*stateHash, err = p.handler.RestoreHandler(snapshot)
p.logger.WithFields(logrus.Fields{
"state_hash": stateHash,
"err": err,
}).Debug("BabbleProxyServer.Restore")
return
}
// OnStateChanged implements the AppProxy interface
func (p *SocketBabbleProxyServer) OnStateChanged(state state.State, obj *struct{}) (err error) {
err = p.handler.StateChangeHandler(state)
p.logger.WithFields(logrus.Fields{
"state": state.String(),
"err": err,
}).Debug("BabbleProxyServer.OnStateChanged")
return
}