-
Notifications
You must be signed in to change notification settings - Fork 49
/
protocol.nim
137 lines (113 loc) · 4.21 KB
/
protocol.nim
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
## Waku Store protocol for historical messaging support.
## See spec for more details:
## https://github.com/vacp2p/specs/blob/master/specs/waku/v2/waku-store.md
when (NimMajor, NimMinor) < (1, 4):
{.push raises: [Defect].}
else:
{.push raises: [].}
import
std/options,
stew/results,
chronicles,
chronos,
bearssl/rand,
libp2p/crypto/crypto,
libp2p/protocols/protocol,
libp2p/protobuf/minprotobuf,
libp2p/stream/connection,
metrics
import
../waku_core,
../node/peer_manager,
./common,
./rpc,
./rpc_codec,
./protocol_metrics,
../common/ratelimit,
../common/waku_service_metrics
logScope:
topics = "waku store"
const MaxMessageTimestampVariance* = getNanoSecondTime(20)
# 20 seconds maximum allowable sender timestamp "drift"
type HistoryQueryHandler* =
proc(req: HistoryQuery): Future[HistoryResult] {.async, gcsafe.}
type WakuStore* = ref object of LPProtocol
peerManager: PeerManager
rng: ref rand.HmacDrbgContext
queryHandler*: HistoryQueryHandler
requestRateLimiter*: Option[TokenBucket]
## Protocol
proc initProtocolHandler(ws: WakuStore) =
proc handler(conn: Connection, proto: string) {.async.} =
let buf = await conn.readLp(MaxRpcSize.int)
let decodeRes = HistoryRPC.decode(buf)
if decodeRes.isErr():
error "failed to decode rpc", peerId = $conn.peerId
waku_store_errors.inc(labelValues = [decodeRpcFailure])
# TODO: Return (BAD_REQUEST, cause: "decode rpc failed")
return
let reqRpc = decodeRes.value
if reqRpc.query.isNone():
error "empty query rpc", peerId = $conn.peerId, requestId = reqRpc.requestId
waku_store_errors.inc(labelValues = [emptyRpcQueryFailure])
# TODO: Return (BAD_REQUEST, cause: "empty query")
return
if ws.requestRateLimiter.isSome() and not ws.requestRateLimiter.get().tryConsume(1):
trace "store query request rejected due rate limit exceeded",
peerId = $conn.peerId, requestId = reqRpc.requestId
let error = HistoryError(kind: HistoryErrorKind.TOO_MANY_REQUESTS).toRPC()
let response = HistoryResponseRPC(error: error)
let rpc = HistoryRPC(requestId: reqRpc.requestId, response: some(response))
await conn.writeLp(rpc.encode().buffer)
waku_service_requests_rejected.inc(labelValues = ["Store"])
return
waku_service_requests.inc(labelValues = ["Store"])
let
requestId = reqRpc.requestId
request = reqRpc.query.get().toAPI()
info "received history query",
peerId = conn.peerId, requestId = requestId, query = request
waku_store_queries.inc()
var responseRes: HistoryResult
try:
responseRes = await ws.queryHandler(request)
except Exception:
error "history query failed",
peerId = $conn.peerId, requestId = requestId, error = getCurrentExceptionMsg()
let error = HistoryError(kind: HistoryErrorKind.UNKNOWN).toRPC()
let response = HistoryResponseRPC(error: error)
let rpc = HistoryRPC(requestId: requestId, response: some(response))
await conn.writeLp(rpc.encode().buffer)
return
if responseRes.isErr():
error "history query failed",
peerId = $conn.peerId, requestId = requestId, error = responseRes.error
let response = responseRes.toRPC()
let rpc = HistoryRPC(requestId: requestId, response: some(response))
await conn.writeLp(rpc.encode().buffer)
return
let response = responseRes.toRPC()
info "sending history response",
peerId = conn.peerId, requestId = requestId, messages = response.messages.len
let rpc = HistoryRPC(requestId: requestId, response: some(response))
await conn.writeLp(rpc.encode().buffer)
ws.handler = handler
ws.codec = WakuStoreCodec
proc new*(
T: type WakuStore,
peerManager: PeerManager,
rng: ref rand.HmacDrbgContext,
queryHandler: HistoryQueryHandler,
rateLimitSetting: Option[RateLimitSetting] = none[RateLimitSetting](),
): T =
# Raise a defect if history query handler is nil
if queryHandler.isNil():
raise newException(NilAccessDefect, "history query handler is nil")
let ws = WakuStore(
rng: rng,
peerManager: peerManager,
queryHandler: queryHandler,
requestRateLimiter: newTokenBucket(rateLimitSetting),
)
ws.initProtocolHandler()
ws