Skip to content

Commit

Permalink
chore: move SubscriptionManager under waku_core (#2025)
Browse files Browse the repository at this point in the history
* chore: cherry-pick from Filter V2 RestApi PR: move FilterPushHandler and SubscriptionManager from Filter V1 to under waku_core
  • Loading branch information
NagyZoltanPeter committed Sep 12, 2023
1 parent ebe715e commit 563b2b2
Show file tree
Hide file tree
Showing 5 changed files with 72 additions and 45 deletions.
6 changes: 4 additions & 2 deletions waku/waku_core.nim
Expand Up @@ -2,10 +2,12 @@ import
./waku_core/topics,
./waku_core/time,
./waku_core/message,
./waku_core/peers
./waku_core/peers,
./waku_core/subscription

export
topics,
time,
message,
peers
peers,
subscription
7 changes: 7 additions & 0 deletions waku/waku_core/subscription.nim
@@ -0,0 +1,7 @@
import
./subscription/subscription_manager,
./subscription/push_handler

export
subscription_manager,
push_handler
13 changes: 13 additions & 0 deletions waku/waku_core/subscription/push_handler.nim
@@ -0,0 +1,13 @@
when (NimMajor, NimMinor) < (1, 4):
{.push raises: [Defect].}
else:
{.push raises: [].}

import
chronos

import
../topics,
../message

type FilterPushHandler* = proc(pubsubTopic: PubsubTopic, message: WakuMessage) {.async, gcsafe, closure.}
48 changes: 48 additions & 0 deletions waku/waku_core/subscription/subscription_manager.nim
@@ -0,0 +1,48 @@
when (NimMajor, NimMinor) < (1, 4):
{.push raises: [Defect].}
else:
{.push raises: [].}

import
std/tables,
stew/results,
chronicles,
chronos

import
./push_handler,
../topics,
../message

## Subscription manager
type SubscriptionManager* = object
subscriptions: TableRef[(string, ContentTopic), FilterPushHandler]

proc init*(T: type SubscriptionManager): T =
SubscriptionManager(subscriptions: newTable[(string, ContentTopic), FilterPushHandler]())

proc clear*(m: var SubscriptionManager) =
m.subscriptions.clear()

proc registerSubscription*(m: SubscriptionManager, pubsubTopic: PubsubTopic, contentTopic: ContentTopic, handler: FilterPushHandler) =
try:
# TODO: Handle over subscription surprises
m.subscriptions[(pubsubTopic, contentTopic)]= handler
except CatchableError:
error "failed to register filter subscription", error=getCurrentExceptionMsg()

proc removeSubscription*(m: SubscriptionManager, pubsubTopic: PubsubTopic, contentTopic: ContentTopic) =
m.subscriptions.del((pubsubTopic, contentTopic))

proc notifySubscriptionHandler*(m: SubscriptionManager, pubsubTopic: PubsubTopic, contentTopic: ContentTopic, message: WakuMessage) =
if not m.subscriptions.hasKey((pubsubTopic, contentTopic)):
return

try:
let handler = m.subscriptions[(pubsubTopic, contentTopic)]
asyncSpawn handler(pubsubTopic, message)
except CatchableError:
discard

proc getSubscriptionsCount*(m: SubscriptionManager): int =
m.subscriptions.len()
43 changes: 0 additions & 43 deletions waku/waku_filter/client.nim
Expand Up @@ -27,50 +27,7 @@ logScope:

const Defaultstring = "/waku/2/default-waku/proto"


### Client, filter subscripton manager

type FilterPushHandler* = proc(pubsubTopic: PubsubTopic, message: WakuMessage) {.async, gcsafe, closure.}


## Subscription manager

type SubscriptionManager = object
subscriptions: TableRef[(string, ContentTopic), FilterPushHandler]

proc init(T: type SubscriptionManager): T =
SubscriptionManager(subscriptions: newTable[(string, ContentTopic), FilterPushHandler]())

proc clear(m: var SubscriptionManager) =
m.subscriptions.clear()

proc registerSubscription(m: SubscriptionManager, pubsubTopic: PubsubTopic, contentTopic: ContentTopic, handler: FilterPushHandler) =
try:
m.subscriptions[(pubsubTopic, contentTopic)]= handler
except: # TODO: Fix "BareExcept" warning
error "failed to register filter subscription", error=getCurrentExceptionMsg()

proc removeSubscription(m: SubscriptionManager, pubsubTopic: PubsubTopic, contentTopic: ContentTopic) =
m.subscriptions.del((pubsubTopic, contentTopic))

proc notifySubscriptionHandler(m: SubscriptionManager, pubsubTopic: PubsubTopic, contentTopic: ContentTopic, message: WakuMessage) =
if not m.subscriptions.hasKey((pubsubTopic, contentTopic)):
return

try:
let handler = m.subscriptions[(pubsubTopic, contentTopic)]
asyncSpawn handler(pubsubTopic, message)
except: # TODO: Fix "BareExcept" warning
discard

proc getSubscriptionsCount(m: SubscriptionManager): int =
m.subscriptions.len()


## Client

type MessagePushHandler* = proc(requestId: string, msg: MessagePush): Future[void] {.gcsafe, closure.}

type WakuFilterClientLegacy* = ref object of LPProtocol
rng: ref rand.HmacDrbgContext
peerManager: PeerManager
Expand Down

0 comments on commit 563b2b2

Please sign in to comment.