Skip to content

Commit

Permalink
feat: archive update for store v3 (#2451)
Browse files Browse the repository at this point in the history
  • Loading branch information
SionoiS committed Mar 12, 2024
1 parent 059cb97 commit 505479b
Show file tree
Hide file tree
Showing 20 changed files with 502 additions and 382 deletions.
5 changes: 3 additions & 2 deletions tests/waku_archive/archive_utils.nim
Original file line number Diff line number Diff line change
Expand Up @@ -30,15 +30,16 @@ proc computeArchiveCursor*(
pubsubTopic: pubsubTopic,
senderTime: message.timestamp,
storeTime: message.timestamp,
digest: waku_archive.computeDigest(message),
digest: computeDigest(message),
hash: computeMessageHash(pubsubTopic, message),
)

proc put*(
driver: ArchiveDriver, pubsubTopic: PubSubTopic, msgList: seq[WakuMessage]
): ArchiveDriver =
for msg in msgList:
let
msgDigest = waku_archive.computeDigest(msg)
msgDigest = computeDigest(msg)
msgHash = computeMessageHash(pubsubTopic, msg)
_ = waitFor driver.put(pubsubTopic, msg, msgDigest, msgHash, msg.timestamp)
# discard crashes
Expand Down
9 changes: 6 additions & 3 deletions tests/waku_archive/test_driver_postgres.nim
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ proc computeTestCursor(pubsubTopic: PubsubTopic,
pubsubTopic: pubsubTopic,
senderTime: message.timestamp,
storeTime: message.timestamp,
digest: computeDigest(message)
digest: computeDigest(message),
hash: computeMessageHash(pubsubTopic, message),
)

suite "Postgres driver":
Expand Down Expand Up @@ -62,19 +63,21 @@ suite "Postgres driver":
let msg = fakeWakuMessage(contentTopic=contentTopic)

let computedDigest = computeDigest(msg)
let computedHash = computeMessageHash(DefaultPubsubTopic, msg)

let putRes = await driver.put(DefaultPubsubTopic, msg, computedDigest, computeMessageHash(DefaultPubsubTopic, msg), msg.timestamp)
let putRes = await driver.put(DefaultPubsubTopic, msg, computedDigest, computedHash, msg.timestamp)
assert putRes.isOk(), putRes.error

let storedMsg = (await driver.getAllMessages()).tryGet()

assert storedMsg.len == 1

let (pubsubTopic, actualMsg, digest, storeTimestamp) = storedMsg[0]
let (pubsubTopic, actualMsg, digest, _, hash) = storedMsg[0]
assert actualMsg.contentTopic == contentTopic
assert pubsubTopic == DefaultPubsubTopic
assert toHex(computedDigest.data) == toHex(digest)
assert toHex(actualMsg.payload) == toHex(msg.payload)
assert toHex(computedHash) == toHex(hash)

asyncTest "Insert and query message":
const contentTopic1 = "test-content-topic-1"
Expand Down
43 changes: 41 additions & 2 deletions tests/waku_archive/test_driver_postgres_query.nim
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import
import
../../../waku/waku_archive,
../../../waku/waku_archive/driver as driver_module,
../../../waku/waku_archive/driver/builder,
../../../waku/waku_archive/driver/postgres_driver,
../../../waku/waku_core,
../../../waku/waku_core/message/digest,
Expand All @@ -33,7 +32,8 @@ proc computeTestCursor(pubsubTopic: PubsubTopic, message: WakuMessage): ArchiveC
pubsubTopic: pubsubTopic,
senderTime: message.timestamp,
storeTime: message.timestamp,
digest: computeDigest(message)
digest: computeDigest(message),
hash: computeMessageHash(pubsubTopic, message)
)

suite "Postgres driver - queries":
Expand Down Expand Up @@ -652,6 +652,45 @@ suite "Postgres driver - queries":
check:
filteredMessages == expectedMessages[4..5].reversed()

asyncTest "only hashes - descending order":
## Given
let timeOrigin = now()
var expected = @[
fakeWakuMessage(@[byte 0], ts=ts(00, timeOrigin)),
fakeWakuMessage(@[byte 1], ts=ts(10, timeOrigin)),
fakeWakuMessage(@[byte 2], ts=ts(20, timeOrigin)),
fakeWakuMessage(@[byte 3], ts=ts(30, timeOrigin)),
fakeWakuMessage(@[byte 4], ts=ts(40, timeOrigin)),
fakeWakuMessage(@[byte 5], ts=ts(50, timeOrigin)),
fakeWakuMessage(@[byte 6], ts=ts(60, timeOrigin)),
fakeWakuMessage(@[byte 7], ts=ts(70, timeOrigin)),
fakeWakuMessage(@[byte 8], ts=ts(80, timeOrigin)),
fakeWakuMessage(@[byte 9], ts=ts(90, timeOrigin)),
]
var messages = expected

shuffle(messages)
debug "randomized message insertion sequence", sequence=messages.mapIt(it.payload)

let hashes = messages.mapIt(computeMessageHash(DefaultPubsubTopic, it))

for (msg, hash) in messages.zip(hashes):
require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), hash, msg.timestamp)).isOk()

## When
let res = await driver.getMessages(
hashes=hashes,
ascendingOrder=false
)

## Then
assert res.isOk(), res.error

let expectedMessages = expected.reversed()
let filteredMessages = res.tryGet().mapIt(it[1])
check:
filteredMessages == expectedMessages

asyncTest "start time only":
## Given
const contentTopic = "test-content-topic"
Expand Down
40 changes: 20 additions & 20 deletions tests/waku_archive/test_driver_queue.nim
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ import

# Helper functions

proc genIndexedWakuMessage(i: int8): IndexedWakuMessage =
## Use i to generate an IndexedWakuMessage
proc genIndexedWakuMessage(i: int8): (Index, WakuMessage) =
## Use i to generate an Index WakuMessage
var data {.noinit.}: array[32, byte]
for x in data.mitems: x = i.byte

Expand All @@ -27,14 +27,14 @@ proc genIndexedWakuMessage(i: int8): IndexedWakuMessage =
pubsubTopic: "test-pubsub-topic"
)

IndexedWakuMessage(msg: message, index: cursor)
(cursor, message)

proc getPrepopulatedTestQueue(unsortedSet: auto, capacity: int): QueueDriver =
let driver = QueueDriver.new(capacity)

for i in unsortedSet:
let message = genIndexedWakuMessage(i.int8)
discard driver.add(message)
let (index, message) = genIndexedWakuMessage(i.int8)
discard driver.add(index, message)

driver

Expand All @@ -49,12 +49,12 @@ procSuite "Sorted driver queue":
## When
# Fill up the queue
for i in 1..capacity:
let message = genIndexedWakuMessage(i.int8)
require(driver.add(message).isOk())
let (index, message) = genIndexedWakuMessage(i.int8)
require(driver.add(index, message).isOk())

# Add one more. Capacity should not be exceeded
let message = genIndexedWakuMessage(capacity.int8 + 1)
require(driver.add(message).isOk())
let (index, message) = genIndexedWakuMessage(capacity.int8 + 1)
require(driver.add(index, message).isOk())

## Then
check:
Expand All @@ -68,14 +68,14 @@ procSuite "Sorted driver queue":
## When
# Fill up the queue
for i in 1..capacity:
let message = genIndexedWakuMessage(i.int8)
require(driver.add(message).isOk())
let (index, message) = genIndexedWakuMessage(i.int8)
require(driver.add(index, message).isOk())

# Attempt to add message with older value than oldest in queue should fail
let
oldestTimestamp = driver.first().get().index.senderTime
message = genIndexedWakuMessage(oldestTimestamp.int8 - 1)
addRes = driver.add(message)
oldestTimestamp = driver.first().get().senderTime
(index, message) = genIndexedWakuMessage(oldestTimestamp.int8 - 1)
addRes = driver.add(index, message)

## Then
check:
Expand All @@ -93,14 +93,14 @@ procSuite "Sorted driver queue":
let driver = getPrepopulatedTestQueue(unsortedSet, capacity)

# Walk forward through the set and verify ascending order
var prevSmaller = genIndexedWakuMessage(min(unsortedSet).int8 - 1).index
var (prevSmaller, _) = genIndexedWakuMessage(min(unsortedSet).int8 - 1)
for i in driver.fwdIterator:
let (index, _) = i
check cmp(index, prevSmaller) > 0
prevSmaller = index

# Walk backward through the set and verify descending order
var prevLarger = genIndexedWakuMessage(max(unsortedSet).int8 + 1).index
var (prevLarger, _) = genIndexedWakuMessage(max(unsortedSet).int8 + 1)
for i in driver.bwdIterator:
let (index, _) = i
check cmp(index, prevLarger) < 0
Expand All @@ -122,7 +122,7 @@ procSuite "Sorted driver queue":

let first = firstRes.tryGet()
check:
first.msg.timestamp == Timestamp(1)
first.senderTime == Timestamp(1)

test "get first item from empty queue should fail":
## Given
Expand Down Expand Up @@ -153,7 +153,7 @@ procSuite "Sorted driver queue":

let last = lastRes.tryGet()
check:
last.msg.timestamp == Timestamp(5)
last.senderTime == Timestamp(5)

test "get last item from empty queue should fail":
## Given
Expand All @@ -176,8 +176,8 @@ procSuite "Sorted driver queue":
let driver = getPrepopulatedTestQueue(unsortedSet, capacity)

let
existingIndex = genIndexedWakuMessage(4).index
nonExistingIndex = genIndexedWakuMessage(99).index
(existingIndex, _) = genIndexedWakuMessage(4)
(nonExistingIndex, _) = genIndexedWakuMessage(99)

## Then
check:
Expand Down
23 changes: 12 additions & 11 deletions tests/waku_archive/test_driver_queue_pagination.nim
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,16 @@ proc getTestQueueDriver(numMessages: int): QueueDriver =
for x in data.mitems: x = 1

for i in 0..<numMessages:
let msg = IndexedWakuMessage(
msg: WakuMessage(payload: @[byte i], timestamp: Timestamp(i)),
index: Index(
receiverTime: Timestamp(i),
senderTime: Timestamp(i),
digest: MessageDigest(data: data)
)

let msg = WakuMessage(payload: @[byte i], timestamp: Timestamp(i))

let index = Index(
receiverTime: Timestamp(i),
senderTime: Timestamp(i),
digest: MessageDigest(data: data)
)
discard testQueueDriver.add(msg)

discard testQueueDriver.add(index, msg)

return testQueueDriver

Expand All @@ -37,7 +38,7 @@ procSuite "Queue driver - pagination":
let driver = getTestQueueDriver(10)
let
indexList: seq[Index] = toSeq(driver.fwdIterator()).mapIt(it[0])
msgList: seq[WakuMessage] = toSeq(driver.fwdIterator()).mapIt(it[1].msg)
msgList: seq[WakuMessage] = toSeq(driver.fwdIterator()).mapIt(it[1])

test "Forward pagination - normal pagination":
## Given
Expand Down Expand Up @@ -211,7 +212,7 @@ procSuite "Queue driver - pagination":
cursor: Option[Index] = none(Index)
forward = true

proc onlyEvenTimes(i: IndexedWakuMessage): bool = i.msg.timestamp.int64 mod 2 == 0
proc onlyEvenTimes(index: Index, msg: WakuMessage): bool = msg.timestamp.int64 mod 2 == 0

## When
let page = driver.getPage(pageSize=pageSize, forward=forward, cursor=cursor, predicate=onlyEvenTimes)
Expand Down Expand Up @@ -392,7 +393,7 @@ procSuite "Queue driver - pagination":
cursor: Option[Index] = none(Index)
forward = false

proc onlyOddTimes(i: IndexedWakuMessage): bool = i.msg.timestamp.int64 mod 2 != 0
proc onlyOddTimes(index: Index, msg: WakuMessage): bool = msg.timestamp.int64 mod 2 != 0

## When
let page = driver.getPage(pageSize=pageSize, forward=forward, cursor=cursor, predicate=onlyOddTimes)
Expand Down
3 changes: 2 additions & 1 deletion tests/waku_archive/test_driver_queue_query.nim
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@ proc computeTestCursor(pubsubTopic: PubsubTopic, message: WakuMessage): ArchiveC
pubsubTopic: pubsubTopic,
senderTime: message.timestamp,
storeTime: message.timestamp,
digest: computeDigest(message)
digest: computeDigest(message),
hash: computeMessageHash(pubsubTopic, message),
)


Expand Down
8 changes: 5 additions & 3 deletions tests/waku_archive/test_driver_sqlite.nim
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,10 @@ suite "SQLite driver":
let driver = newSqliteArchiveDriver()

let msg = fakeWakuMessage(contentTopic=contentTopic)
let msgHash = computeMessageHash(DefaultPubsubTopic, msg)

## When
let putRes = waitFor driver.put(DefaultPubsubTopic, msg, computeDigest(msg), computeMessageHash(DefaultPubsubTopic, msg), msg.timestamp)
let putRes = waitFor driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msgHash, msg.timestamp)

## Then
check:
Expand All @@ -53,9 +54,10 @@ suite "SQLite driver":
check:
storedMsg.len == 1
storedMsg.all do (item: auto) -> bool:
let (pubsubTopic, msg, digest, storeTimestamp) = item
let (pubsubTopic, msg, _, _, hash) = item
msg.contentTopic == contentTopic and
pubsubTopic == DefaultPubsubTopic
pubsubTopic == DefaultPubsubTopic and
hash == msgHash

## Cleanup
(waitFor driver.close()).expect("driver to close")
2 changes: 1 addition & 1 deletion tests/waku_archive/test_retention_policy.nim
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ suite "Waku Archive - Retention policy":
check:
storedMsg.len == capacity
storedMsg.all do (item: auto) -> bool:
let (pubsubTopic, msg, digest, storeTimestamp) = item
let (pubsubTopic, msg, _, _, _) = item
msg.contentTopic == contentTopic and
pubsubTopic == DefaultPubsubTopic

Expand Down
14 changes: 7 additions & 7 deletions waku/node/waku_node.nim
Original file line number Diff line number Diff line change
Expand Up @@ -777,14 +777,14 @@ proc mountArchive*(node: WakuNode,
driver: ArchiveDriver,
retentionPolicy = none(RetentionPolicy)):
Result[void, string] =
node.wakuArchive = WakuArchive.new(
driver = driver,
retentionPolicy = retentionPolicy,
).valueOr:
return err("error in mountArchive: " & error)

let wakuArchiveRes = WakuArchive.new(driver,
retentionPolicy)
if wakuArchiveRes.isErr():
return err("error in mountArchive: " & wakuArchiveRes.error)
node.wakuArchive.start()

node.wakuArchive = wakuArchiveRes.get()
asyncSpawn node.wakuArchive.start()
return ok()

## Waku store
Expand Down Expand Up @@ -1194,7 +1194,7 @@ proc stop*(node: WakuNode) {.async.} =
error "exception stopping the node", error=getCurrentExceptionMsg()

if not node.wakuArchive.isNil():
await node.wakuArchive.stop()
await node.wakuArchive.stopWait()

node.started = false

Expand Down

0 comments on commit 505479b

Please sign in to comment.