-
Notifications
You must be signed in to change notification settings - Fork 48
/
archive.nim
235 lines (181 loc) · 6.56 KB
/
archive.nim
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
# Turn on exception tracking for the declarations that follow.
# Compilers older than Nim 1.4 get `Defect` listed explicitly; newer ones
# use an empty raises list.
when (NimMajor, NimMinor) < (1, 4):
  {.push raises: [Defect].}
else:
  {.push raises: [].}
import
std/[times, options, sequtils, strutils, algorithm],
stew/[results, byteutils],
chronicles,
chronos,
metrics
import
../common/paging,
./driver,
./retention_policy,
../waku_core,
../waku_core/message/digest,
./common,
./archive_metrics
# Log statements in this module are emitted under the "waku archive" topic.
logScope:
  topics = "waku archive"
const
  # Paging limits applied by `findMessages`: the default is used when the
  # query carries no positive page size, and larger requests are capped at
  # the maximum.
  DefaultPageSize*: uint = 20
  MaxPageSize*: uint = 100

  # Retention policy
  # Pause between two consecutive runs of the retention policy loop.
  WakuArchiveDefaultRetentionPolicyInterval* = chronos.minutes(30)

  # Metrics reporting
  # Pause between two consecutive reports of the stored message count.
  WakuArchiveDefaultMetricsReportInterval* = chronos.minutes(1)

  # Message validation
  # 20 seconds maximum allowable sender timestamp "drift"
  MaxMessageTimestampVariance* = getNanoSecondTime(20)
# Callback used by the archive to accept or reject a message before it is
# stored. An error result carries a string; `handleMessage` uses that string
# as the label of the error metric it increments.
type MessageValidator* =
  proc(msg: WakuMessage): Result[void, string] {.closure, gcsafe, raises: [].}
## Archive

# Archive instance: couples a storage driver with a message validator, an
# optional retention policy and the handles of its background loops.
type WakuArchive* = ref object
  driver: ArchiveDriver # backend used for inserts, queries and message counts
  validator: MessageValidator # run on every message before it is inserted
  retentionPolicy: Option[RetentionPolicy] # when set, executed periodically
  retentionPolicyHandle: Future[void] # retention loop future, assigned by `start`
  metricsHandle: Future[void] # metrics loop future, assigned by `start`
proc validate*(msg: WakuMessage): Result[void, string] =
  ## Default message validator.
  ##
  ## - Ephemeral messages leave through a bare `return` (see note below).
  ## - A message whose timestamp is `0` is accepted without further checks.
  ## - Any other message is accepted only when its timestamp lies within
  ##   `MaxMessageTimestampVariance` of the current time; otherwise the
  ##   result is `err(invalidMessageOld)` or `err(invalidMessageFuture)`.
  if msg.ephemeral:
    # Ephemeral message, do not store
    # NOTE(review): the bare `return` hands back the implicit,
    # default-initialised `result` rather than an explicit `ok()`/`err()`.
    # Confirm which state the default `Result` represents before changing it.
    return

  if msg.timestamp != 0:
    let
      currentTime = getNanosecondTime(getTime().toUnixFloat())
      oldestAccepted = currentTime - MaxMessageTimestampVariance
      newestAccepted = currentTime + MaxMessageTimestampVariance

    # Sender timestamp is too far in the past.
    if msg.timestamp < oldestAccepted:
      return err(invalidMessageOld)

    # Sender timestamp is too far in the future.
    if msg.timestamp > newestAccepted:
      return err(invalidMessageFuture)

  return ok()
proc new*(
    T: type WakuArchive,
    driver: ArchiveDriver,
    validator: MessageValidator = validate,
    retentionPolicy = none(RetentionPolicy),
): Result[T, string] =
  ## Build a `WakuArchive` on top of `driver`.
  ##
  ## `validator` defaults to `validate` and `retentionPolicy` defaults to
  ## none. Returns an error when `driver` is nil; no background task is
  ## started here (see `start`).
  if driver.isNil():
    return err("archive driver is Nil")

  return ok(
    WakuArchive(driver: driver, validator: validator, retentionPolicy: retentionPolicy)
  )
proc handleMessage*(
    self: WakuArchive, pubsubTopic: PubsubTopic, msg: WakuMessage
) {.async.} =
  ## Validate `msg` and, when it is accepted, insert it through the driver.
  ##
  ## A validator error increments `waku_archive_errors` (labelled with the
  ## validator's error string) and nothing is inserted. A driver failure is
  ## counted and logged but not propagated to the caller. The insert duration
  ## is observed whether or not the insert succeeded.
  let verdict = self.validator(msg)
  if verdict.isErr():
    waku_archive_errors.inc(labelValues = [verdict.error])
    return

  let
    archiveDigest = computeDigest(msg)
    archiveHash = computeMessageHash(pubsubTopic, msg)
    # Messages without a positive sender timestamp are stamped with "now".
    effectiveTimestamp =
      if msg.timestamp > 0:
        msg.timestamp
      else:
        getNanosecondTime(getTime().toUnixFloat())

  trace "handling message",
    pubsubTopic = pubsubTopic,
    contentTopic = msg.contentTopic,
    msgTimestamp = msg.timestamp,
    usedTimestamp = effectiveTimestamp,
    digest = toHex(archiveDigest.data),
    messageHash = toHex(archiveHash)

  let startedAt = getTime().toUnixFloat()

  let putRes = await self.driver.put(
    pubsubTopic, msg, archiveDigest, archiveHash, effectiveTimestamp
  )
  if putRes.isErr():
    waku_archive_errors.inc(labelValues = [insertFailure])
    debug "failed to insert message", err = putRes.error

  let elapsed = getTime().toUnixFloat() - startedAt
  waku_archive_insert_duration_seconds.observe(elapsed)
proc findMessages*(
    self: WakuArchive, query: ArchiveQuery
): Future[ArchiveResult] {.async, gcsafe.} =
  ## Return a single page of archived messages matching `query`.
  ##
  ## The page holds at most `MaxPageSize` entries (`DefaultPageSize` when the
  ## query gives no positive page size). A cursor is returned only when the
  ## driver produced more rows than fit in the page. Queries with more than
  ## 10 content topics are rejected, and driver failures are reported as
  ## `DRIVER_ERROR`.

  let pageLimit =
    if query.pageSize <= 0:
      DefaultPageSize
    else:
      min(query.pageSize, MaxPageSize)

  let ascending = query.direction.into()

  if query.contentTopics.len > 10:
    return err(ArchiveError.invalidQuery("too many content topics"))

  let startedAt = getTime().toUnixFloat()

  # One row more than the page limit is requested: getting it back means the
  # result set continues beyond this page.
  let rows = (
    await self.driver.getMessages(
      contentTopic = query.contentTopics,
      pubsubTopic = query.pubsubTopic,
      cursor = query.cursor,
      startTime = query.startTime,
      endTime = query.endTime,
      hashes = query.hashes,
      maxPageSize = pageLimit + 1,
      ascendingOrder = ascending,
    )
  ).valueOr:
    return err(ArchiveError(kind: ArchiveErrorKind.DRIVER_ERROR, cause: error))

  let elapsed = getTime().toUnixFloat() - startedAt
  waku_archive_query_duration_seconds.observe(elapsed)

  if rows.len == 0:
    return ok(
      ArchiveResponse(
        hashes: newSeq[WakuMessageHash](),
        messages: newSeq[WakuMessage](),
        cursor: none(ArchiveCursor),
      )
    )

  ## Messages
  let returnedCount = min(rows.len, int(pageLimit))

  #TODO once store v2 is removed, unzip instead of collecting both columns
  var
    pageMessages = newSeqOfCap[WakuMessage](returnedCount)
    pageHashes = newSeqOfCap[WakuMessageHash](returnedCount)
  for idx in 0 ..< returnedCount:
    # Row layout: (pubsubTopic, message, digest, storeTimestamp, hash)
    pageMessages.add(rows[idx][1])
    pageHashes.add(rows[idx][4])

  ## Cursor
  ## Built from the last message INCLUDED in the response, i.e. the second
  ## last entry of the rows list.
  #TODO Once Store v2 is removed keep only message and hash
  #TODO Once Store v2 is removed, the cursor becomes the hash of the last message
  let nextCursor =
    if rows.len > int(pageLimit):
      let lastIncluded = rows[^2]
      some(
        ArchiveCursor(
          digest: MessageDigest.fromBytes(lastIncluded[2]),
          storeTime: lastIncluded[3],
          sendertime: lastIncluded[1].timestamp,
          pubsubTopic: lastIncluded[0],
          hash: lastIncluded[4],
        )
      )
    else:
      none(ArchiveCursor)

  # All messages MUST be returned in chronological order
  if not ascending:
    pageMessages.reverse()
    pageHashes.reverse()

  return ok(
    ArchiveResponse(hashes: pageHashes, messages: pageMessages, cursor: nextCursor)
  )
proc periodicRetentionPolicy(self: WakuArchive) {.async.} =
  ## Endless loop that executes the configured retention policy against the
  ## driver, sleeping `WakuArchiveDefaultRetentionPolicyInterval` between runs.
  ## A failed run is counted and logged; the loop keeps going.
  ## Expects `self.retentionPolicy` to be set (it is unwrapped with `get`).
  debug "executing message retention policy"

  let activePolicy = self.retentionPolicy.get()
  while true:
    let outcome = await activePolicy.execute(self.driver)
    if outcome.isErr():
      waku_archive_errors.inc(labelValues = [retPolicyFailure])
      error "failed execution of retention policy", error = outcome.error

    await sleepAsync(WakuArchiveDefaultRetentionPolicyInterval)
proc periodicMetricReport(self: WakuArchive) {.async.} =
  ## Endless loop that publishes the driver's message count on the
  ## `waku_archive_messages` gauge (label "stored"), sleeping
  ## `WakuArchiveDefaultMetricsReportInterval` between samples.
  ## When the count cannot be read, the failure is logged and the gauge is
  ## left as it was.
  while true:
    let countRes = await self.driver.getMessagesCount()
    if countRes.isOk():
      let stored = countRes.get()
      waku_archive_messages.set(stored, labelValues = ["stored"])
    else:
      error "loopReportStoredMessagesMetric failed to get messages count",
        error = countRes.error

    await sleepAsync(WakuArchiveDefaultMetricsReportInterval)
proc start*(self: WakuArchive) =
  ## Launch the background loops and remember their futures on `self`.
  ## The retention loop is started only when a retention policy is
  ## configured; the metrics loop is always started.
  if self.retentionPolicy.isSome():
    self.retentionPolicyHandle = periodicRetentionPolicy(self)

  self.metricsHandle = periodicMetricReport(self)
proc stopWait*(self: WakuArchive) {.async.} =
  ## Cancel the background loops that were started and wait until all the
  ## cancellations have completed. Handles that are nil are skipped.
  var pending: seq[Future[void]]

  # The retention loop only exists when a policy was configured.
  if self.retentionPolicy.isSome():
    if not self.retentionPolicyHandle.isNil():
      pending.add(cancelAndWait(self.retentionPolicyHandle))

  if not self.metricsHandle.isNil():
    pending.add(cancelAndWait(self.metricsHandle))

  await noCancel(allFutures(pending))