Skip to content

Commit

Permalink
feat: Rest API interface for legacy (v1) filter service. (#1851)
Browse files Browse the repository at this point in the history
* Added Rest API interface for legacy (v1) filter service with tests.
  • Loading branch information
NagyZoltanPeter committed Aug 4, 2023
1 parent 831a093 commit 08ff667
Show file tree
Hide file tree
Showing 7 changed files with 581 additions and 18 deletions.
3 changes: 3 additions & 0 deletions .gitignore
Expand Up @@ -38,6 +38,9 @@ node_modules/
# Ignore Jetbrains IDE files
.idea/

# ignore vscode files
.vscode/

# RLN / keystore
rlnKeystore.json
*.tar.gz
Expand Down
6 changes: 6 additions & 0 deletions apps/wakunode2/app.nim
Expand Up @@ -45,6 +45,7 @@ import
../../waku/v2/node/rest/debug/handlers as rest_debug_api,
../../waku/v2/node/rest/relay/handlers as rest_relay_api,
../../waku/v2/node/rest/relay/topic_cache,
../../waku/v2/node/rest/filter/handlers as rest_filter_api,
../../waku/v2/node/rest/store/handlers as rest_store_api,
../../waku/v2/node/jsonrpc/admin/handlers as rpc_admin_api,
../../waku/v2/node/jsonrpc/debug/handlers as rpc_debug_api,
Expand Down Expand Up @@ -566,6 +567,11 @@ proc startRestServer(app: App, address: ValidIpAddress, port: Port, conf: WakuNo
let relayCache = TopicCache.init(capacity=conf.restRelayCacheCapacity)
installRelayApiHandlers(server.router, app.node, relayCache)

## Filter REST API
if conf.filter:
let filterCache = rest_filter_api.MessageCache.init(capacity=rest_filter_api.filterMessageCacheDefaultCapacity)
installFilterApiHandlers(server.router, app.node, filterCache)

## Store REST API
installStoreApiHandlers(server.router, app.node)

Expand Down
191 changes: 191 additions & 0 deletions tests/v2/wakunode_rest/test_rest_filter.nim
@@ -0,0 +1,191 @@
{.used.}

import
std/sequtils,
stew/byteutils,
stew/shims/net,
testutils/unittests,
presto, presto/client as presto_client,
libp2p/crypto/crypto
import
../../waku/v2/node/message_cache,
../../waku/common/base64,
../../waku/v2/waku_core,
../../waku/v2/waku_node,
../../waku/v2/node/peer_manager,
../../waku/v2/waku_filter,
../../waku/v2/node/rest/server,
../../waku/v2/node/rest/client,
../../waku/v2/node/rest/responses,
../../waku/v2/node/rest/filter/types,
../../waku/v2/node/rest/filter/handlers as filter_api,
../../waku/v2/node/rest/filter/client as filter_api_client,
../../waku/v2/waku_relay,
../testlib/wakucore,
../testlib/wakunode


proc testWakuNode(): WakuNode =
let
privkey = generateSecp256k1Key()
bindIp = ValidIpAddress.init("0.0.0.0")
extIp = ValidIpAddress.init("127.0.0.1")
port = Port(0)

return newTestWakuNode(privkey, bindIp, port, some(extIp), some(port))


type RestFilterTest = object
node1: WakuNode
node2: WakuNode
restServer: RestServerRef
messageCache: filter_api.MessageCache
client: RestClientRef


proc setupRestFilter(): Future[RestFilterTest] {.async.} =
result.node1 = testWakuNode()
result.node2 = testWakuNode()

await allFutures(result.node1.start(), result.node2.start())

await result.node1.mountFilter()
await result.node2.mountFilterClient()

result.node2.peerManager.addServicePeer(result.node1.peerInfo.toRemotePeerInfo(), WakuFilterCodec)

let restPort = Port(58011)
let restAddress = ValidIpAddress.init("0.0.0.0")
result.restServer = RestServerRef.init(restAddress, restPort).tryGet()

result.messageCache = filter_api.MessageCache.init(capacity=filter_api.filterMessageCacheDefaultCapacity)

installFilterPostSubscriptionsV1Handler(result.restServer.router, result.node2, result.messageCache)
installFilterDeleteSubscriptionsV1Handler(result.restServer.router, result.node2, result.messageCache)
installFilterGetMessagesV1Handler(result.restServer.router, result.node2, result.messageCache)

result.restServer.start()

result.client = newRestHttpClient(initTAddress(restAddress, restPort))

return result


proc shutdown(self: RestFilterTest) {.async.} =
await self.restServer.stop()
await self.restServer.closeWait()
await allFutures(self.node1.stop(), self.node2.stop())


suite "Waku v2 Rest API - Filter":
asyncTest "Subscribe a node to an array of topics - POST /filter/v1/subscriptions":
# Given
let restFilterTest: RestFilterTest = await setupRestFilter()

# When
let contentFilters = @[DefaultContentTopic
,ContentTopic("2")
,ContentTopic("3")
,ContentTopic("4")
]

let requestBody = FilterSubscriptionsRequest(contentFilters: contentFilters,
pubsubTopic: DefaultPubsubTopic)
let response = await restFilterTest.client.filterPostSubscriptionsV1(requestBody)

# Then
check:
response.status == 200
$response.contentType == $MIMETYPE_TEXT
response.data == "OK"

check:
restFilterTest.messageCache.isSubscribed(DefaultContentTopic)
restFilterTest.messageCache.isSubscribed("2")
restFilterTest.messageCache.isSubscribed("3")
restFilterTest.messageCache.isSubscribed("4")

# When - error case
let badRequestBody = FilterSubscriptionsRequest(contentFilters: @[], pubsubTopic: "")
let badResponse = await restFilterTest.client.filterPostSubscriptionsV1(badRequestBody)

check:
badResponse.status == 400
$badResponse.contentType == $MIMETYPE_TEXT
badResponse.data == "Invalid content body, could not decode. Unable to deserialize data"


await restFilterTest.shutdown()


asyncTest "Unsubscribe a node from an array of topics - DELETE /filter/v1/subscriptions":
# Given
let
restFilterTest: RestFilterTest = await setupRestFilter()

# When
restFilterTest.messageCache.subscribe("1")
restFilterTest.messageCache.subscribe("2")
restFilterTest.messageCache.subscribe("3")
restFilterTest.messageCache.subscribe("4")

let contentFilters = @[ContentTopic("1")
,ContentTopic("2")
,ContentTopic("3")
# ,ContentTopic("4") # Keep this subscription for check
]

# When
let requestBody = FilterSubscriptionsRequest(contentFilters: contentFilters,
pubsubTopic: DefaultPubsubTopic)
let response = await restFilterTest.client.filterDeleteSubscriptionsV1(requestBody)

# Then
check:
response.status == 200
$response.contentType == $MIMETYPE_TEXT
response.data == "OK"

check:
not restFilterTest.messageCache.isSubscribed("1")
not restFilterTest.messageCache.isSubscribed("2")
not restFilterTest.messageCache.isSubscribed("3")
restFilterTest.messageCache.isSubscribed("4")

await restFilterTest.shutdown()


asyncTest "Get the latest messages for topic - GET /filter/v1/messages/{contentTopic}":
# Given

let
restFilterTest = await setupRestFilter()

let pubSubTopic = "/waku/2/default-waku/proto"
let contentTopic = ContentTopic( "content-topic-x" )

let messages = @[
fakeWakuMessage(contentTopic = "content-topic-x", payload = toBytes("TEST-1")),
fakeWakuMessage(contentTopic = "content-topic-x", payload = toBytes("TEST-1")),
fakeWakuMessage(contentTopic = "content-topic-x", payload = toBytes("TEST-1")),
]

restFilterTest.messageCache.subscribe(contentTopic)
for msg in messages:
restFilterTest.messageCache.addMessage(contentTopic, msg)

# When
let response = await restFilterTest.client.filterGetMessagesV1(contentTopic)

# Then
check:
response.status == 200
$response.contentType == $MIMETYPE_JSON
response.data.len == 3
response.data.all do (msg: FilterWakuMessage) -> bool:
msg.payload == base64.encode("TEST-1") and
msg.contentTopic.get().string == "content-topic-x" and
msg.version.get() == 2 and
msg.timestamp.get() != Timestamp(0)

await restFilterTest.shutdown()
68 changes: 68 additions & 0 deletions waku/v2/node/rest/filter/client.nim
@@ -0,0 +1,68 @@
when (NimMajor, NimMinor) < (1, 4):
{.push raises: [Defect].}
else:
{.push raises: [].}

import
std/sets,
stew/byteutils,
chronicles,
json_serialization,
json_serialization/std/options,
presto/[route, client, common]
import
../../../waku_core,
../serdes,
../responses,
./types

export types

logScope:
topics = "waku node rest client"

proc encodeBytes*(value: FilterSubscriptionsRequest,
contentType: string): RestResult[seq[byte]] =
if MediaType.init(contentType) != MIMETYPE_JSON:
error "Unsupported contentType value", contentType = contentType
return err("Unsupported contentType")

let encoded = ?encodeIntoJsonBytes(value)
return ok(encoded)

proc decodeBytes*(t: typedesc[string], value: openarray[byte],
contentType: Opt[ContentTypeData]): RestResult[string] =
if MediaType.init($contentType) != MIMETYPE_TEXT:
error "Unsupported contentType value", contentType = contentType
return err("Unsupported contentType")

var res: string
if len(value) > 0:
res = newString(len(value))
copyMem(addr res[0], unsafeAddr value[0], len(value))
return ok(res)

# TODO: Check how we can use a constant to set the method endpoint (improve "rest" pragma under nim-presto)
proc filterPostSubscriptionsV1*(body: FilterSubscriptionsRequest):
RestResponse[string]
{.rest, endpoint: "/filter/v1/subscriptions", meth: HttpMethod.MethodPost.}

# TODO: Check how we can use a constant to set the method endpoint (improve "rest" pragma under nim-presto)
proc filterDeleteSubscriptionsV1*(body: FilterSubscriptionsRequest):
RestResponse[string]
{.rest, endpoint: "/filter/v1/subscriptions", meth: HttpMethod.MethodDelete.}

proc decodeBytes*(t: typedesc[FilterGetMessagesResponse],
data: openArray[byte],
contentType: Opt[ContentTypeData]): RestResult[FilterGetMessagesResponse] =
if MediaType.init($contentType) != MIMETYPE_JSON:
error "Unsupported response contentType value", contentType = contentType
return err("Unsupported response contentType")

let decoded = ?decodeFromJsonBytes(FilterGetMessagesResponse, data)
return ok(decoded)

# TODO: Check how we can use a constant to set the method endpoint (improve "rest" pragma under nim-presto)
proc filterGetMessagesV1*(contentTopic: string):
RestResponse[FilterGetMessagesResponse]
{.rest, endpoint: "/filter/v1/messages/{contentTopic}", meth: HttpMethod.MethodGet.}

0 comments on commit 08ff667

Please sign in to comment.