-
Notifications
You must be signed in to change notification settings - Fork 43
/
waku_rpc.go
121 lines (97 loc) · 2.76 KB
/
waku_rpc.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
package rpc
import (
"context"
"fmt"
"net/http"
"net/http/pprof"
"time"
"github.com/gorilla/mux"
"github.com/gorilla/rpc/v2"
"github.com/waku-org/go-waku/waku/v2/node"
"go.uber.org/zap"
)
type WakuRPC struct {
node *node.WakuNode
server *http.Server
log *zap.Logger
relayService *RelayService
filterService *FilterService
adminService *AdminService
}
func NewWakuRPC(node *node.WakuNode, address string, port int, enableAdmin bool, enablePProf bool, cacheCapacity int, log *zap.Logger) *WakuRPC {
wrpc := new(WakuRPC)
wrpc.log = log.Named("rpc")
s := rpc.NewServer()
s.RegisterCodec(NewSnakeCaseCodec(), "application/json")
s.RegisterCodec(NewSnakeCaseCodec(), "application/json;charset=UTF-8")
mux := mux.NewRouter()
mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
t := time.Now()
s.ServeHTTP(w, r)
wrpc.log.Info("served request", zap.String("path", r.URL.Path), zap.Duration("duration", time.Since(t)))
})
if enablePProf {
mux.PathPrefix("/debug/").Handler(http.DefaultServeMux)
mux.HandleFunc("/debug/pprof/", pprof.Index)
}
debugService := NewDebugService(node)
err := s.RegisterService(debugService, "Debug")
if err != nil {
wrpc.log.Error("registering debug service", zap.Error(err))
}
var relayService *RelayService
if node.Relay() != nil {
relayService = NewRelayService(node, cacheCapacity, log)
err = s.RegisterService(relayService, "Relay")
if err != nil {
wrpc.log.Error("registering relay service", zap.Error(err))
}
}
err = s.RegisterService(&StoreService{node, log}, "Store")
if err != nil {
wrpc.log.Error("registering store service", zap.Error(err))
}
if enableAdmin {
adminService := &AdminService{node, log.Named("admin")}
err = s.RegisterService(adminService, "Admin")
if err != nil {
wrpc.log.Error("registering admin service", zap.Error(err))
}
wrpc.adminService = adminService
}
filterService := NewFilterService(node, cacheCapacity, log)
err = s.RegisterService(filterService, "Filter")
if err != nil {
wrpc.log.Error("registering filter service", zap.Error(err))
}
listenAddr := fmt.Sprintf("%s:%d", address, port)
server := &http.Server{
Addr: listenAddr,
Handler: mux,
}
server.RegisterOnShutdown(func() {
filterService.Stop()
if relayService != nil {
relayService.Stop()
}
})
wrpc.node = node
wrpc.server = server
wrpc.relayService = relayService
wrpc.filterService = filterService
return wrpc
}
func (r *WakuRPC) Start() {
if r.relayService != nil {
go r.relayService.Start()
}
go r.filterService.Start()
go func() {
_ = r.server.ListenAndServe()
}()
r.log.Info("server started", zap.String("addr", r.server.Addr))
}
func (r *WakuRPC) Stop(ctx context.Context) error {
r.log.Info("shutting down server")
return r.server.Shutdown(ctx)
}