Skip to content

Commit

Permalink
chore(rest): refactor message cache (#2221)
Browse files Browse the repository at this point in the history
  • Loading branch information
SionoiS committed Nov 28, 2023
1 parent 9f4e6f4 commit bebaa59
Show file tree
Hide file tree
Showing 15 changed files with 589 additions and 314 deletions.
4 changes: 2 additions & 2 deletions apps/chat2bridge/chat2bridge.nim
Original file line number Diff line number Diff line change
Expand Up @@ -229,11 +229,11 @@ when isMainModule:

# Install enabled API handlers:
if conf.relay:
let cache = MessageCache[string].init(capacity=30)
let cache = MessageCache.init(capacity=30)
installRelayApiHandlers(node, rpcServer, cache)

if conf.filter:
let messageCache = filter_api.MessageCache.init(capacity=30)
let messageCache = MessageCache.init(capacity=30)
installFilterApiHandlers(node, rpcServer, messageCache)

if conf.store:
Expand Down
24 changes: 11 additions & 13 deletions apps/wakunode2/app.nim
Original file line number Diff line number Diff line change
Expand Up @@ -687,18 +687,17 @@ proc startRestServer(app: App, address: ValidIpAddress, port: Port, conf: WakuNo

## Relay REST API
if conf.relay:
let cache = MessageCache[string].init(capacity=conf.restRelayCacheCapacity)
let cache = MessageCache.init(int(conf.restRelayCacheCapacity))

let handler = messageCacheHandler(cache)
let autoHandler = autoMessageCacheHandler(cache)

for pubsubTopic in conf.pubsubTopics:
cache.subscribe(pubsubTopic)
cache.pubsubSubscribe(pubsubTopic)
app.node.subscribe((kind: PubsubSub, topic: pubsubTopic), some(handler))

for contentTopic in conf.contentTopics:
cache.subscribe(contentTopic)
app.node.subscribe((kind: ContentSub, topic: contentTopic), some(autoHandler))
cache.contentSubscribe(contentTopic)
app.node.subscribe((kind: ContentSub, topic: contentTopic), some(handler))

installRelayApiHandlers(server.router, app.node, cache)
else:
Expand All @@ -709,10 +708,10 @@ proc startRestServer(app: App, address: ValidIpAddress, port: Port, conf: WakuNo
app.node.wakuFilterClient != nil and
app.node.wakuFilterClientLegacy != nil:

let legacyFilterCache = rest_legacy_filter_api.MessageCache.init()
let legacyFilterCache = MessageCache.init()
rest_legacy_filter_api.installLegacyFilterRestApiHandlers(server.router, app.node, legacyFilterCache)

let filterCache = rest_filter_api.MessageCache.init()
let filterCache = MessageCache.init()

let filterDiscoHandler =
if app.wakuDiscv5.isSome():
Expand Down Expand Up @@ -765,23 +764,22 @@ proc startRpcServer(app: App, address: ValidIpAddress, port: Port, conf: WakuNod
installDebugApiHandlers(app.node, server)

if conf.relay:
let cache = MessageCache[string].init(capacity=30)
let cache = MessageCache.init(capacity=50)

let handler = messageCacheHandler(cache)
let autoHandler = autoMessageCacheHandler(cache)

for pubsubTopic in conf.pubsubTopics:
cache.subscribe(pubsubTopic)
cache.pubsubSubscribe(pubsubTopic)
app.node.subscribe((kind: PubsubSub, topic: pubsubTopic), some(handler))

for contentTopic in conf.contentTopics:
cache.subscribe(contentTopic)
app.node.subscribe((kind: ContentSub, topic: contentTopic), some(autoHandler))
cache.contentSubscribe(contentTopic)
app.node.subscribe((kind: ContentSub, topic: contentTopic), some(handler))

installRelayApiHandlers(app.node, server, cache)

if conf.filternode != "":
let filterMessageCache = rpc_filter_api.MessageCache.init(capacity=30)
let filterMessageCache = MessageCache.init(capacity=50)
installFilterApiHandlers(app.node, server, filterMessageCache)

installStoreApiHandlers(app.node, server)
Expand Down
190 changes: 127 additions & 63 deletions tests/test_message_cache.nim
Original file line number Diff line number Diff line change
@@ -1,153 +1,217 @@
{.used.}

import
std/sets,
stew/[results, byteutils],
testutils/unittests,
chronicles
testutils/unittests
import
../../waku/waku_core,
../../waku/waku_api/message_cache,
./testlib/common,
./testlib/wakucore


type TestMessageCache = MessageCache[(PubsubTopic, ContentTopic)]

suite "MessageCache":
test "subscribe to topic":
setup:
## Given
let testTopic = (PubsubTopic("test-pubsub-topic"), ContentTopic("test-content-topic"))
let cache = TestMessageCache.init()
let capacity = 3
let testPubsubTopic = DefaultPubsubTopic
let testContentTopic = DefaultContentTopic
let cache = MessageCache.init(capacity)

test "subscribe to topic":
## When
cache.subscribe(testTopic)
cache.pubsubSubscribe(testPubsubTopic)
cache.pubsubSubscribe(testPubsubTopic)

# idempotence of subscribe is also tested
cache.contentSubscribe(testContentTopic)
cache.contentSubscribe(testContentTopic)

## Then
check:
cache.isSubscribed(testTopic)

cache.isPubsubSubscribed(testPubsubTopic)
cache.isContentSubscribed(testContentTopic)
cache.pubsubTopicCount() == 1
cache.contentTopicCount() == 1

test "unsubscribe from topic":
## Given
let testTopic = (PubsubTopic("test-pubsub-topic"), ContentTopic("test-content-topic"))
let cache = TestMessageCache.init()

# Init cache content
cache.subscribe(testTopic)
cache.pubsubSubscribe(testPubsubTopic)
cache.contentSubscribe(testContentTopic)

cache.pubsubSubscribe("AnotherPubsubTopic")
cache.contentSubscribe("AnotherContentTopic")

## When
cache.unsubscribe(testTopic)
cache.pubsubUnsubscribe(testPubsubTopic)
cache.contentUnsubscribe(testContentTopic)

## Then
check:
not cache.isSubscribed(testTopic)

not cache.isPubsubSubscribed(testPubsubTopic)
not cache.isContentSubscribed(testContentTopic)
cache.pubsubTopicCount() == 1
cache.contentTopicCount() == 1

test "get messages of a subscribed topic":
## Given
let testTopic = (PubsubTopic("test-pubsub-topic"), ContentTopic("test-content-topic"))
let testMessage = fakeWakuMessage()
let cache = TestMessageCache.init()

# Init cache content
cache.subscribe(testTopic)
cache.addMessage(testTopic, testMessage)
cache.pubsubSubscribe(testPubsubTopic)
cache.addMessage(testPubsubTopic, testMessage)

## When
let res = cache.getMessages(testTopic)
let res = cache.getMessages(testPubsubTopic)

## Then
check:
res.isOk()
res.get() == @[testMessage]


test "get messages with clean flag shoud clear the messages cache":
## Given
let testTopic = (PubsubTopic("test-pubsub-topic"), ContentTopic("test-content-topic"))
let testMessage = fakeWakuMessage()
let cache = TestMessageCache.init()

# Init cache content
cache.subscribe(testTopic)
cache.addMessage(testTopic, testMessage)
cache.pubsubSubscribe(testPubsubTopic)
cache.addMessage(testPubsubTopic, testMessage)

## When
var res = cache.getMessages(testTopic, clear=true)
var res = cache.getMessages(testPubsubTopic, clear=true)
require(res.isOk())

res = cache.getMessages(testTopic)
res = cache.getMessages(testPubsubTopic)

## Then
check:
res.isOk()
res.get().len == 0


test "get messages of a non-subscribed topic":
## Given
let testTopic = (PubsubTopic("test-pubsub-topic"), ContentTopic("test-content-topic"))
let cache = TestMessageCache.init()

## When
let res = cache.getMessages(testTopic)
cache.pubsubSubscribe(PubsubTopic("dummyPubsub"))
let res = cache.getMessages(testPubsubTopic)

## Then
check:
res.isErr()
res.error() == "Not subscribed to topic"

res.error() == "not subscribed to this pubsub topic"

test "add messages to subscribed topic":
## Given
let testTopic = (PubsubTopic("test-pubsub-topic"), ContentTopic("test-content-topic"))
let testMessage = fakeWakuMessage()
let cache = TestMessageCache.init()

cache.subscribe(testTopic)
cache.pubsubSubscribe(testPubsubTopic)

## When
cache.addMessage(testTopic, testMessage)
cache.addMessage(testPubsubTopic, testMessage)

## Then
let messages = cache.getMessages(testTopic).tryGet()
let messages = cache.getMessages(testPubsubTopic).tryGet()
check:
messages == @[testMessage]


test "add messages to non-subscribed topic":
## Given
let testTopic = (PubsubTopic("test-pubsub-topic"), ContentTopic("test-content-topic"))
let testMessage = fakeWakuMessage()
let cache = TestMessageCache.init()

## When
cache.addMessage(testTopic, testMessage)
cache.addMessage(testPubsubTopic, testMessage)

## Then
let res = cache.getMessages(testTopic)
let res = cache.getMessages(testPubsubTopic)
check:
res.isErr()
res.error() == "Not subscribed to topic"

res.error() == "not subscribed to any pubsub topics"

test "add messages beyond the capacity":
## Given
let testTopic = (PubsubTopic("test-pubsub-topic"), ContentTopic("test-content-topic"))
let testMessages = @[
fakeWakuMessage(toBytes("MSG-1")),
fakeWakuMessage(toBytes("MSG-2")),
fakeWakuMessage(toBytes("MSG-3"))
]
var testMessages = @[fakeWakuMessage(toBytes("MSG-1"))]

# Prevent duplicate messages timestamp
for i in 0..<5:
var msg = fakeWakuMessage(toBytes("MSG-1"))

let cache = TestMessageCache.init(capacity = 2)
cache.subscribe(testTopic)
while msg.timestamp <= testMessages[i].timestamp:
msg = fakeWakuMessage(toBytes("MSG-1"))

testMessages.add(msg)

cache.pubsubSubscribe(testPubsubTopic)

## When
for msg in testMessages:
cache.addMessage(testTopic, msg)
cache.addMessage(testPubsubTopic, msg)

## Then
let messages = cache.getMessages(testTopic).tryGet()
let messages = cache.getMessages(testPubsubTopic).tryGet()
let messageSet = toHashSet(messages)

let testSet = toHashSet(testMessages)

check:
messageSet.len == capacity
messageSet < testSet
testMessages[0] notin messages

test "get messages on pubsub via content topics":
cache.pubsubSubscribe(testPubsubTopic)

let fakeMessage = fakeWakuMessage()

cache.addMessage(testPubsubTopic, fakeMessage)

let getRes = cache.getAutoMessages(DefaultContentTopic)

check:
getRes.isOk
getRes.get() == @[fakeMessage]

test "add same message twice":
cache.pubsubSubscribe(testPubsubTopic)

let fakeMessage = fakeWakuMessage()

cache.addMessage(testPubsubTopic, fakeMessage)
cache.addMessage(testPubsubTopic, fakeMessage)

check:
cache.messagesCount() == 1

test "unsubscribing remove messages":
let topic0 = "PubsubTopic0"
let topic1 = "PubsubTopic1"
let topic2 = "PubsubTopic2"

let fakeMessage0 = fakeWakuMessage(toBytes("MSG-0"))
let fakeMessage1 = fakeWakuMessage(toBytes("MSG-1"))
let fakeMessage2 = fakeWakuMessage(toBytes("MSG-2"))

cache.pubsubSubscribe(topic0)
cache.pubsubSubscribe(topic1)
cache.pubsubSubscribe(topic2)
cache.contentSubscribe("ContentTopic0")

cache.addMessage(topic0, fakeMessage0)
cache.addMessage(topic1, fakeMessage1)
cache.addMessage(topic2, fakeMessage2)

cache.pubsubUnsubscribe(topic0)

# at this point, fakeMessage0 is only ref by DefaultContentTopic

let res = cache.getAutoMessages(DefaultContentTopic)

check:
res.isOk()
res.get().len == 3
cache.isPubsubSubscribed(topic0) == false
cache.isPubsubSubscribed(topic1) == true
cache.isPubsubSubscribed(topic2) == true

cache.contentUnsubscribe(DefaultContentTopic)

# msg0 was delete because no refs

check:
messages == testMessages[1..2]
cache.messagesCount() == 2
10 changes: 2 additions & 8 deletions tests/wakunode_jsonrpc/test_jsonrpc_filter.nim
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@ import
std/options,
stew/shims/net as stewNet,
testutils/unittests,
chronicles,
libp2p/crypto/crypto,
json_rpc/[rpcserver, rpcclient]
import
../../../waku/waku_core,
Expand All @@ -20,11 +18,6 @@ import
../testlib/wakucore,
../testlib/wakunode


proc newTestMessageCache(): filter_api.MessageCache =
filter_api.MessageCache.init(capacity=30)


procSuite "Waku v2 JSON-RPC API - Filter":
let
bindIp = ValidIpAddress.init("0.0.0.0")
Expand All @@ -49,7 +42,8 @@ procSuite "Waku v2 JSON-RPC API - Filter":
ta = initTAddress(bindIp, rpcPort)
server = newRpcHttpServer([ta])

installFilterApiHandlers(node2, server, newTestMessageCache())
let cache = MessageCache.init(capacity=30)
installFilterApiHandlers(node2, server, cache)
server.start()

let client = newRpcHttpClient()
Expand Down

0 comments on commit bebaa59

Please sign in to comment.