/
streams.go
68 lines (53 loc) · 1.3 KB
/
streams.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
package node
import (
	"fmt"
	"time"

	"github.com/libp2p/go-libp2p-core/network"
	"github.com/libp2p/go-libp2p-core/protocol"
	"github.com/tcfw/didem/internal/comm"
	"github.com/tcfw/didem/internal/did"
	"github.com/tcfw/didem/internal/utils/logging"
)
// handlerSetup constructs the stream handling for a single protocol on node n.
//
// It returns the libp2p stream handler to register, an instance value that
// setupStreamHandlers stores in n.handlers under the protocol ID, and an error.
// A nil instance tells setupStreamHandlers to skip the protocol entirely.
type handlerSetup func(n *Node) (network.StreamHandler, interface{}, error)
// streamHandlers is the registry of protocol IDs and the setup function used
// to build each protocol's stream handler. setupStreamHandlers ranges over it.
var streamHandlers = map[protocol.ID]handlerSetup{
	did.ProtocolID:  newDidStreamHandler,
	comm.ProtocolID: newCommStreamHandler,
}
// setupStreamHandlers runs every setup function in streamHandlers against n,
// registers the resulting stream handler on the node's p2p host and records
// the handler instance in n.handlers keyed by protocol ID.
//
// A setup function that returns a nil instance is skipped: nothing is
// registered on the host and nothing is stored for that protocol.
//
// The first setup error aborts the loop and is returned wrapped with the
// protocol ID that failed. Because map iteration order is unspecified,
// handlers for other protocols may already have been registered when an
// error is returned.
func (n *Node) setupStreamHandlers() error {
	host := n.p2p.host

	for id, setup := range streamHandlers {
		handler, inst, err := setup(n)
		if err != nil {
			// Include the protocol so the caller can tell which handler failed;
			// %w keeps the original error available to errors.Is / errors.As.
			return fmt.Errorf("setting up stream handler for protocol %q: %w", id, err)
		}

		// NOTE(review): this only detects an untyped nil; a nil pointer
		// wrapped in the interface value would not be caught here.
		if inst == nil {
			continue
		}

		host.SetStreamHandler(id, handler)
		n.handlers[id] = inst
	}

	return nil
}
// newDidStreamHandler builds the did protocol handler for n.
//
// When did.NewHandler returns nil a warning is logged and all three results
// are nil, which makes setupStreamHandlers skip the protocol. Otherwise the
// handler is started in a background goroutine and its Handle method is
// returned as the stream handler, together with the handler itself as the
// instance value.
func newDidStreamHandler(n *Node) (network.StreamHandler, interface{}, error) {
	h := did.NewHandler(n)
	if h == nil {
		logging.Entry().Warn("skipping did consensus handler")
		return nil, nil, nil
	}

	// Pause between attempts when Start reports an error.
	const retryDelay = 10 * time.Second

	// Keep calling Start until it returns without error; each failure is
	// logged and retried after retryDelay.
	// NOTE(review): there is no stop signal visible here, so this goroutine
	// keeps retrying for as long as Start keeps failing.
	go func() {
		for {
			err := h.Start()
			if err == nil {
				return
			}

			logging.WithError(err).Error("running did handler")
			time.Sleep(retryDelay)
		}
	}()

	return h.Handle, h, nil
}
// newCommStreamHandler builds the comm protocol handler for n and returns its
// Handle method as the stream handler, with the handler itself as the
// instance value. The returned error is always nil.
func newCommStreamHandler(n *Node) (network.StreamHandler, interface{}, error) {
	h := comm.NewHandler(n)

	return h.Handle, h, nil
}