Skip to content

Commit

Permalink
feat: update various protocols to autoshard (#1857)
Browse files Browse the repository at this point in the history
* feat: update FILTER & LIGHTPUSH to autoshard
  • Loading branch information
SionoiS committed Aug 17, 2023
1 parent 8c568ca commit cf30139
Show file tree
Hide file tree
Showing 18 changed files with 233 additions and 198 deletions.
8 changes: 4 additions & 4 deletions apps/chat2/chat2.nim
Expand Up @@ -212,7 +212,7 @@ proc publish(c: Chat, line: string) =

if not c.node.wakuLightPush.isNil():
# Attempt lightpush
asyncSpawn c.node.lightpushPublish(DefaultPubsubTopic, message)
asyncSpawn c.node.lightpushPublish(some(DefaultPubsubTopic), message)
else:
asyncSpawn c.node.publish(DefaultPubsubTopic, message)

Expand Down Expand Up @@ -267,7 +267,7 @@ proc writeAndPrint(c: Chat) {.async.} =
if not c.node.wakuFilter.isNil():
echo "unsubscribing from content filters..."

await c.node.unsubscribe(pubsubTopic=DefaultPubsubTopic, contentTopics=c.contentTopic)
await c.node.unsubscribe(pubsubTopic=some(DefaultPubsubTopic), contentTopics=c.contentTopic)

echo "quitting..."

Expand Down Expand Up @@ -473,7 +473,7 @@ proc processInput(rfd: AsyncFD, rng: ref HmacDrbgContext) {.async.} =
trace "Hit filter handler", contentTopic=msg.contentTopic
chat.printReceivedMessage(msg)

await node.subscribe(pubsubTopic=DefaultPubsubTopic, contentTopics=chat.contentTopic, filterHandler)
await node.subscribe(pubsubTopic=some(DefaultPubsubTopic), contentTopics=chat.contentTopic, filterHandler)

else:
error "Filter not mounted. Couldn't parse conf.filternode",
Expand All @@ -488,7 +488,7 @@ proc processInput(rfd: AsyncFD, rng: ref HmacDrbgContext) {.async.} =
chat.printReceivedMessage(msg)

let topic = DefaultPubsubTopic
node.subscribe(topic, handler)
await node.subscribe(some(topic), @[ContentTopic("")], handler)

when defined(rln):
if conf.rlnRelay:
Expand Down
4 changes: 2 additions & 2 deletions apps/wakunode2/app.nim
Expand Up @@ -135,7 +135,7 @@ proc init*(T: type App, rng: ref HmacDrbgContext, conf: WakuNodeConf): T =
error "failed to parse content topic", error=res.error
quit(QuitFailure)

let shardsRes = contentTopicsRes.mapIt(singleHighestWeigthShard(it.get()))
let shardsRes = contentTopicsRes.mapIt(getShard(it.get()))

for res in shardsRes:
if res.isErr():
Expand Down Expand Up @@ -363,7 +363,7 @@ proc setupProtocols(node: WakuNode,
# TODO autoshard content topics only once.
# Already checked for errors in app.init
let contentTopics = conf.contentTopics.mapIt(NsContentTopic.parse(it).expect("Parsing"))
let shards = contentTopics.mapIt($(singleHighestWeigthShard(it).expect("Sharding")))
let shards = contentTopics.mapIt($(getShard(it).expect("Sharding")))

let pubsubTopics = conf.topics & conf.pubsubTopics & shards
try:
Expand Down
2 changes: 1 addition & 1 deletion tests/test_waku_lightpush.nim
Expand Up @@ -115,4 +115,4 @@ suite "Waku Lightpush":
requestError == error

## Cleanup
await allFutures(clientSwitch.stop(), serverSwitch.stop())
await allFutures(clientSwitch.stop(), serverSwitch.stop())
3 changes: 2 additions & 1 deletion tests/test_wakunode_filter.nim
@@ -1,6 +1,7 @@
{.used.}

import
std/options,
stew/shims/net as stewNet,
testutils/unittests,
chronicles,
Expand Down Expand Up @@ -43,7 +44,7 @@ suite "WakuNode - Filter":
filterPushHandlerFut.complete((pubsubTopic, msg))

## When
await client.filterSubscribe(pubsubTopic, contentTopic, filterPushHandler, peer=serverPeerInfo)
await client.filterSubscribe(some(pubsubTopic), contentTopic, filterPushHandler, peer=serverPeerInfo)

# Wait for subscription to take effect
waitFor sleepAsync(100.millis)
Expand Down
3 changes: 2 additions & 1 deletion tests/test_wakunode_lightpush.nim
@@ -1,6 +1,7 @@
{.used.}

import
std/options,
stew/shims/net as stewNet,
testutils/unittests,
chronicles,
Expand Down Expand Up @@ -54,7 +55,7 @@ suite "WakuNode - Lightpush":
await sleepAsync(100.millis)

## When
await lightNode.lightpushPublish(DefaultPubsubTopic, message)
await lightNode.lightpushPublish(some(DefaultPubsubTopic), message)

## Then
check await completionFutRelay.withTimeout(5.seconds)
Expand Down
24 changes: 3 additions & 21 deletions tests/waku_core/test_namespaced_topics.nim
Expand Up @@ -13,7 +13,6 @@ suite "Waku Message - Content topics namespacing":
## Given
var ns = NsContentTopic()
ns.generation = none(int)
ns.bias = Unbiased
ns.application = "toychat"
ns.version = "2"
ns.name = "huilong"
Expand All @@ -39,15 +38,14 @@ suite "Waku Message - Content topics namespacing":
let ns = nsRes.get()
check:
ns.generation == none(int)
ns.bias == Unbiased
ns.application == "toychat"
ns.version == "2"
ns.name == "huilong"
ns.encoding == "proto"

test "Parse content topic string - Valid string with sharding":
## Given
let topic = "/0/lower20/toychat/2/huilong/proto"
let topic = "/0/toychat/2/huilong/proto"

## When
let nsRes = NsContentTopic.parse(topic)
Expand All @@ -58,7 +56,6 @@ suite "Waku Message - Content topics namespacing":
let ns = nsRes.get()
check:
ns.generation == some(0)
ns.bias == Lower20
ns.application == "toychat"
ns.version == "2"
ns.name == "huilong"
Expand Down Expand Up @@ -122,11 +119,11 @@ suite "Waku Message - Content topics namespacing":
let err = ns.tryError()
check:
err.kind == ParsingErrorKind.InvalidFormat
err.cause == "invalid topic structure"
err.cause == "generation should be a numeric value"

test "Parse content topic string - Invalid string: non numeric generation":
## Given
let topic = "/first/unbiased/toychat/2/huilong/proto"
let topic = "/first/toychat/2/huilong/proto"

## When
let ns = NsContentTopic.parse(topic)
Expand All @@ -139,21 +136,6 @@ suite "Waku Message - Content topics namespacing":
err.kind == ParsingErrorKind.InvalidFormat
err.cause == "generation should be a numeric value"

test "Parse content topic string - Invalid string: invalid bias":
## Given
let topic = "/0/no/toychat/2/huilong/proto"

## When
let ns = NsContentTopic.parse(topic)

## Then
assert ns.isErr(), $ns.get()

let err = ns.tryError()
check:
err.kind == ParsingErrorKind.InvalidFormat
err.cause == "bias should be one of; unbiased, lower20 or higher80"

suite "Waku Message - Pub-sub topics namespacing":

test "Stringify named sharding pub-sub topic":
Expand Down
75 changes: 21 additions & 54 deletions tests/waku_core/test_sharding.nim
Expand Up @@ -4,7 +4,6 @@ import
std/options,
std/strutils,
std/sugar,
std/algorithm,
std/random,
stew/results,
testutils/unittests
Expand Down Expand Up @@ -34,88 +33,60 @@ suite "Waku Sharding":

let enc = "cbor"

NsContentTopic.init(none(int), Unbiased, app, version, name, enc)
NsContentTopic.init(none(int), app, version, name, enc)

test "Implicit content topic generation":
## Given
let topic = "/toychat/2/huilong/proto"

## When
let ns = NsContentTopic.parse(topic).expect("Parsing")

let paramRes = shardCount(ns)
let parseRes = NsContentTopic.parse(topic)

## Then
assert paramRes.isOk(), paramRes.error
assert parseRes.isOk(), $parseRes.error

let count = paramRes.get()
let nsTopic = parseRes.get()
check:
count == GenerationZeroShardsCount
ns.bias == Unbiased
nsTopic.generation == none(int)

test "Valid content topic":
## Given
let topic = "/0/lower20/toychat/2/huilong/proto"
let topic = "/0/toychat/2/huilong/proto"

## When
let ns = NsContentTopic.parse(topic).expect("Parsing")

let paramRes = shardCount(ns)
let parseRes = NsContentTopic.parse(topic)

## Then
assert paramRes.isOk(), paramRes.error
assert parseRes.isOk(), $parseRes.error

let count = paramRes.get()
let nsTopic = parseRes.get()
check:
count == GenerationZeroShardsCount
ns.bias == Lower20
nsTopic.generation.get() == 0

test "Invalid content topic generation":
## Given
let topic = "/1/unbiased/toychat/2/huilong/proto"
let topic = "/1/toychat/2/huilong/proto"

## When
let ns = NsContentTopic.parse(topic).expect("Parsing")

let paramRes = shardCount(ns)
let shardRes = getShard(ns)

## Then
assert paramRes.isErr(), $paramRes.get()
assert shardRes.isErr(), $shardRes.get()

let err = paramRes.error
let err = shardRes.error
check:
err == "Generation > 0 are not supported yet"

test "Weigths bias":
## Given
let count = 5

## When
let anonWeigths = biasedWeights(count, ShardingBias.Lower20)
let speedWeigths = biasedWeights(count, ShardingBias.Higher80)

## Then
check:
anonWeigths[0] == 2.0
anonWeigths[1] == 1.0
anonWeigths[2] == 1.0
anonWeigths[3] == 1.0
anonWeigths[4] == 1.0

speedWeigths[0] == 1.0
speedWeigths[1] == 2.0
speedWeigths[2] == 2.0
speedWeigths[3] == 2.0
speedWeigths[4] == 2.0

test "Sorted shard list":
#[ test "Sorted shard list":
## Given
let topic = "/0/unbiased/toychat/2/huilong/proto"
let topic = "/0/toychat/2/huilong/proto"
## When
let contentTopic = NsContentTopic.parse(topic).expect("Parsing")
let count = shardCount(contentTopic).expect("Valid parameters")
let weights = biasedWeights(count, contentTopic.bias)
let weights = repeat(1.0, count)
let shardsRes = weightedShardList(contentTopic, count, weights)
Expand All @@ -125,7 +96,7 @@ suite "Waku Sharding":
let shards = shardsRes.get()
check:
shards.len == count
isSorted(shards, hashOrder)
isSorted(shards, hashOrder) ]#

test "Shard Choice Reproducibility":
## Given
Expand All @@ -134,15 +105,11 @@ suite "Waku Sharding":
## When
let contentTopic = NsContentTopic.parse(topic).expect("Parsing")

let res = singleHighestWeigthShard(contentTopic)
let pubsub = getGenZeroShard(contentTopic, GenerationZeroShardsCount)

## Then
assert res.isOk(), res.error

let pubsubTopic = res.get()

check:
pubsubTopic == NsPubsubTopic.staticSharding(ClusterIndex, 3)
pubsub == NsPubsubTopic.staticSharding(ClusterIndex, 3)

test "Shard Choice Simulation":
## Given
Expand All @@ -154,7 +121,7 @@ suite "Waku Sharding":

## When
for topic in topics:
let pubsub = singleHighestWeigthShard(topic).expect("Valid Topic")
let pubsub = getShard(topic).expect("Valid Topic")
counts[pubsub.shard] += 1

## Then
Expand Down
1 change: 0 additions & 1 deletion tests/waku_filter_v2/test_waku_filter.nim
Expand Up @@ -10,7 +10,6 @@ import
../../../waku/node/peer_manager,
../../../waku/waku_filter_v2,
../../../waku/waku_filter_v2/client,
../../../waku/waku_filter_v2/rpc,
../../../waku/waku_core,
../testlib/common,
../testlib/wakucore
Expand Down
2 changes: 1 addition & 1 deletion tests/waku_store/test_waku_store.nim
Expand Up @@ -115,4 +115,4 @@ suite "Waku Store - query handler":
error.kind == HistoryErrorKind.BAD_REQUEST

## Cleanup
await allFutures(serverSwitch.stop(), clientSwitch.stop())
await allFutures(serverSwitch.stop(), clientSwitch.stop())
2 changes: 1 addition & 1 deletion tests/waku_store/test_wakunode_store.nim
Expand Up @@ -232,7 +232,7 @@ procSuite "WakuNode - Store":
proc filterHandler(pubsubTopic: PubsubTopic, msg: WakuMessage) {.async, gcsafe, closure.} =
filterFut.complete((pubsubTopic, msg))

waitFor server.filterSubscribe(DefaultPubsubTopic, DefaultContentTopic, filterHandler, peer=filterSourcePeer)
waitFor server.filterSubscribe(some(DefaultPubsubTopic), DefaultContentTopic, filterHandler, peer=filterSourcePeer)

waitFor sleepAsync(100.millis)

Expand Down
6 changes: 3 additions & 3 deletions tests/wakunode_jsonrpc/test_jsonrpc_filter.nim
Expand Up @@ -61,9 +61,9 @@ procSuite "Waku v2 JSON-RPC API - Filter":

let contentFilters = @[
ContentFilter(contentTopic: DefaultContentTopic),
ContentFilter(contentTopic: ContentTopic("2")),
ContentFilter(contentTopic: ContentTopic("3")),
ContentFilter(contentTopic: ContentTopic("4")),
ContentFilter(contentTopic: ContentTopic("/waku/2/default-content2/proto")),
ContentFilter(contentTopic: ContentTopic("/waku/2/default-content3/proto")),
ContentFilter(contentTopic: ContentTopic("/waku/2/default-content4/proto")),
]
var response = await client.post_waku_v2_filter_v1_subscription(contentFilters=contentFilters, topic=some(DefaultPubsubTopic))
check:
Expand Down
6 changes: 3 additions & 3 deletions tests/wakunode_rest/test_rest_filter.nim
Expand Up @@ -90,7 +90,7 @@ suite "Waku v2 Rest API - Filter":
]

let requestBody = FilterSubscriptionsRequest(contentFilters: contentFilters,
pubsubTopic: DefaultPubsubTopic)
pubsubTopic: some(DefaultPubsubTopic))
let response = await restFilterTest.client.filterPostSubscriptionsV1(requestBody)

# Then
Expand All @@ -106,7 +106,7 @@ suite "Waku v2 Rest API - Filter":
restFilterTest.messageCache.isSubscribed("4")

# When - error case
let badRequestBody = FilterSubscriptionsRequest(contentFilters: @[], pubsubTopic: "")
let badRequestBody = FilterSubscriptionsRequest(contentFilters: @[], pubsubTopic: none(string))
let badResponse = await restFilterTest.client.filterPostSubscriptionsV1(badRequestBody)

check:
Expand Down Expand Up @@ -137,7 +137,7 @@ suite "Waku v2 Rest API - Filter":

# When
let requestBody = FilterSubscriptionsRequest(contentFilters: contentFilters,
pubsubTopic: DefaultPubsubTopic)
pubsubTopic: some(DefaultPubsubTopic))
let response = await restFilterTest.client.filterDeleteSubscriptionsV1(requestBody)

# Then
Expand Down

0 comments on commit cf30139

Please sign in to comment.