
Add postgres basic support #1590

Closed
wants to merge 2 commits into from

Conversation

@cammellos (Contributor) commented Mar 6, 2023

I have added support for storing messages on postgres:

A few things left unresolved:

  1. Payload is stored as HEX in a VARCHAR field. We should use bytea, but I'm not sure yet exactly how to convert it. I can store it fine, but when I pull it out, it's converted to a string by getAllRows (https://nim-lang.org/docs/db_postgres.html#getAllRows%2CDbConn%2CSqlQuery%2Cvarargs%5Bstring%2C%5D), and I want to convert the \x format back. I will look into it, but if you know off the top of your head, that would save me some time ;)

  2. I have added a docker-compose file to start the container and run tests against it. We probably want to decide on when/how to run the tests, and how to pass the container info to them (env variables, I guess? but this is something @jakubgs will also have an opinion on)
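For point 1, the `\x`-prefixed hex string that libpq hands back for a bytea column can be decoded manually. A minimal sketch (the helper name is mine, not part of this PR):

```nim
import std/strutils

# Hypothetical helper: decode the "\x..."-style hex string returned by
# getAllRows for a bytea column back into raw bytes.
proc parseHexBytea(s: string): seq[byte] =
  var hex = s
  if hex.startsWith("\\x"):
    hex = hex[2 .. ^1]
  result = newSeq[byte](hex.len div 2)
  for i in 0 ..< result.len:
    result[i] = byte(parseHexInt(hex[2*i .. 2*i + 1]))
```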

@cammellos self-assigned this Mar 6, 2023
@@ -40,6 +40,7 @@ test: | test1 test2

v1: | wakunode1 example1 sim1
v2: | wakunode2 example2 wakubridge chat2 chat2bridge
v2light: | wakunode2

@cammellos (author):

Used for testing, will remove


test2: | build deps librln
echo -e $(BUILD_MSG) "build/$@" && \
$(ENV_SCRIPT) nim test2 $(NIM_PARAMS) $(EXPERIMENTAL_PARAMS) waku.nims


mytests: | build deps librln
@cammellos (author):

Used for testing, will remove

@@ -13,7 +13,7 @@ import
../../waku/v2/node/rest/debug/handlers as debug_api,
../../waku/v2/node/rest/relay/handlers as relay_api,
../../waku/v2/node/rest/relay/topic_cache,
./config
../../waku/v2/config
@cammellos (author):

Moved config to its own namespace so it can be reused

@@ -0,0 +1,11 @@
version: "3.8"
@cammellos (author):

Dockerfile to run tests against it, not sure how/when we want to run this

Contributor:

If you are asking about CI, we can do it like we do it with status-go:
https://github.com/status-im/status-go/blob/070ffd9f5c455e2e8de4ef5ac17b39bb6064a9ef/_assets/ci/Jenkinsfile.tests#L64-L76
But we don't need a Docker Compose for that.

@@ -0,0 +1,5 @@
## Waku v2
@cammellos (author):

Used for tests, will remove

"CREATE TABLE IF NOT EXISTS messages (" &
" pubsubTopic VARCHAR NOT NULL," &
" contentTopic VARCHAR NOT NULL," &
" payload VARCHAR," &
@cammellos (author):

This should be a bytea; I will investigate a bit more how to do that and avoid using varchar
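For reference, the bytea variant discussed here would look roughly like the following (a sketch of the suggested change, not the schema this PR ships):

```sql
-- payload as raw bytes instead of hex text in a VARCHAR
CREATE TABLE IF NOT EXISTS messages (
  pubsubTopic VARCHAR NOT NULL,
  contentTopic VARCHAR NOT NULL,
  payload BYTEA,
  version INTEGER NOT NULL,
  timestamp BIGINT NOT NULL,
  id VARCHAR NOT NULL,
  storedAt BIGINT NOT NULL,
  CONSTRAINT messageIndex PRIMARY KEY (storedAt, id, pubsubTopic)
);
```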



method getOldestMessageTimestamp*(s: PostgresDriver): ArchiveDriverResult[Timestamp] =
return err("not implemented")
@cammellos (author):

I'll have a look at whether it needs implementing; otherwise I can do it in a separate PR

Contributor:

These methods are called by the message retention policy (e.g., time retention policy, max capacity retention policy, etc.)

The question is: how do we want to manage the message retention policy in the "shared backend" scenario?

I am not sure how to perform this without involving a 3rd party (e.g., Kubernetes cron job).

cc @jakubgs @jm-clius

method getOldestMessageTimestamp*(s: PostgresDriver): ArchiveDriverResult[Timestamp] =
return err("not implemented")

method getNewestMessageTimestamp*(s: PostgresDriver): ArchiveDriverResult[Timestamp] =
@cammellos (author):

I'll have a look at whether it needs implementing; otherwise I can do it in a separate PR

return err("not implemented")


method deleteMessagesOlderThanTimestamp*(s: PostgresDriver, ts: Timestamp): ArchiveDriverResult[void] =
@cammellos (author):

I'll have a look at whether it needs implementing; otherwise I can do it in a separate PR

method deleteMessagesOlderThanTimestamp*(s: PostgresDriver, ts: Timestamp): ArchiveDriverResult[void] =
return err("not implemented")

method deleteOldestMessagesNotWithinLimit*(s: PostgresDriver, limit: int): ArchiveDriverResult[void] =
@cammellos (author):

I'll have a look at whether it needs implementing; otherwise I can do it in a separate PR

@cammellos force-pushed the feature/postgres branch 3 times, most recently from 14240b8 to a8c053e on March 6, 2023 13:35
@arnetheduck (Contributor) commented Mar 6, 2023

interesting, this is for performance reasons?

@cammellos (author):

interesting, this is for performance reasons?

I believe so, and I think the idea is also to be able to share a single instance between multiple nodes, but I'm not so sure about the latter

@LNSD (Contributor) commented Mar 8, 2023

interesting, this is for performance reasons?

Partially, yes.


Context and rationale

At this moment, in the message history durability engine (Waku Archive), there is a performance bottleneck:

We are using a single SQLite connection to perform all queries and insert messages into the DB. As SQLite is synchronous, this blocks the scheduler until the DB query returns its results. This limits the number of Waku Store requests we can handle per second.

Moreover, IIRC, we can only serve one concurrent request, as the Waku Store query handling blocks the async handler. This "blocking query" also steals CPU cycles from other protocols (e.g., Waku Relay, Waku Filter, etc.) since it blocks the nim-chronos scheduler thread.

To avoid blocking the scheduler thread, we could build an SQLite connection pool, move all the processing into a thread pool and build input/output query queues with backpressure. But if we go with a standalone DB (e.g., PostgreSQL), we will get exactly that.
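The thread-offloading idea can be illustrated with plain Nim threads and channels (a conceptual sketch only; the real integration would go through a chronos-aware driver, and all names here are mine):

```nim
# Conceptual sketch: run blocking DB queries on a worker thread so the
# scheduler thread is never blocked. Real code would use chronos and a
# connection pool; this only illustrates the shape of the idea.
# Compile with --threads:on.
var
  requests: Channel[string]    # SQL text in
  responses: Channel[string]   # (serialized) rows out

proc dbWorker() {.thread.} =
  while true:
    let query = requests.recv()
    # ... the blocking libpq/SQLite call would execute here ...
    responses.send("rows for: " & query)

requests.open()
responses.open()
var worker: Thread[void]
createThread(worker, dbWorker)

requests.send("SELECT 1")
echo responses.recv()  # blocks here only because this demo waits inline
```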

Additionally, there are operational requirements for the fleet nodes that @jakubgs asked for:

As the Status fleet nodes' retention policy is set to several days (IIRC, 15 days), we are holding in a single node millions of messages (around 2 million messages at this moment). This translates into storage requirements of many GBs per node.

The idea is to share a PostgreSQL backend amongst the nodes in the same cluster (e.g., one DB per AWS region). One could argue for a single central DB instead, but this would be impractical: the latency between datacenters would add to the query processing time, significantly impacting the performance perceived by the Waku Store client.

This approach has other benefits (and challenges):

Currently, there is no history synchronization mechanism between the different nodes. This leads to fleet nodes' message histories being inconsistent. Some nodes have more messages recorded than others.

Having a shared backend will simplify this situation: all nodes would read from and write to the same database. This brings other challenges, such as message deduplication. The proposed solution is to use a Message Unique ID (see the MUID ADR).

To sum up, all this development aims to improve the history query performance metrics (e.g., concurrent request handling, requests/sec, etc.) and reduce the operational requirements and costs for the "professional" waku node operators (like Status during the MVP).
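One common way to get deduplication in a shared Postgres backend, given the primary key defined in this PR's schema, is an idempotent insert (a sketch; the MUID-based design is specified in the ADR, not here):

```sql
-- Several nodes may write the same message concurrently; the primary
-- key (storedAt, id, pubsubTopic) makes the duplicate inserts no-ops.
INSERT INTO messages (pubsubTopic, contentTopic, payload, version, timestamp, id, storedAt)
VALUES ($1, $2, $3, $4, $5, $6, $7)
ON CONFLICT ON CONSTRAINT messageIndex DO NOTHING;
```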

{.push raises: [].}

import
std/db_postgres,
Contributor:

I noticed that here we use the standard library PostgreSQL driver std/db_postgres. This is an idiomatic wrapper library for the PostgreSQL C library libpq.

Regarding the request concurrency issue I described above, I think using this library "as is" (without notifying the nim-chronos scheduler) may present the same limitation as the SQLite driver: it will block the scheduler thread, preventing new Waku Store requests from being processed.

Maybe @arnetheduck or @dryajov have an idea of how we could achieve this integration with the nim-chronos async runtime.

Contributor:

achieve this integration with the nim-chronos async runtime.

porting https://github.com/treeform/pg to chronos is probably the way to go here. It actually looks pretty trivial to do; chances are it would work with minor modifications (above all, to handle cancellation)

Comment on lines +32 to +42
proc createTableQuery(): string =
"CREATE TABLE IF NOT EXISTS messages (" &
" pubsubTopic VARCHAR NOT NULL," &
" contentTopic VARCHAR NOT NULL," &
" payload VARCHAR," &
" version INTEGER NOT NULL," &
" timestamp BIGINT NOT NULL," &
" id VARCHAR NOT NULL," &
" storedAt BIGINT NOT NULL," &
" CONSTRAINT messageIndex PRIMARY KEY (storedAt, id, pubsubTopic)" &
");"
@LNSD (Contributor) commented Mar 9, 2023:

We aim to deploy this in a cloud environment and the DB will be shared amongst nodes. To avoid inconsistencies between nodes and race conditions, the table and index creation (together with the DB migrations) should happen before the node bootup/upgrade. And it should be performed by the node operator (cc @jakubgs).

For testing purposes having it here may make sense, but it cannot be shipped as it is right now :)

Contributor:

Indeed. It would at least require a flag to make this step optional.

std/sequtils,
std/parseutils,
std/strformat,
std/nre,
Contributor:

We are using nim-regex, a pure Nim regular expression library, instead of the standard library to avoid compile-time linking issues.

@@ -156,36 +153,6 @@ proc setupWakuArchiveRetentionPolicy(retentionPolicy: string): SetupResult[Optio
else:
return err("unknown retention policy")

proc setupWakuArchiveDriver(dbUrl: string, vacuum: bool, migrate: bool): SetupResult[ArchiveDriver] =
Contributor:

While I understand the reasons behind these changes (e.g., sharing the initialization code), we cannot accept them as part of this PR. I see these changes as out of scope.

We aim to simplify the WakuNode type as much as possible. Moving this logic inside the waku_node module increases the code coupling affecting the code testability and maintainability.

If it is strictly necessary, I suggest you duplicate the code you need. But, please, do not remove (or move) already existing code :)

@cammellos (author):

I think I understand what you mean: you prefer a dependency injection pattern. I can move things back as they were.

The reason I moved the code is that this namespace/file will know about postgres, which to me indicates the logic is not particularly well encapsulated. This namespace is doing a lot of things already, some of them low-level concerns such as setting up the sqlite/in-memory (and eventually postgres) driver, so I would suggest moving those decisions somewhere else.

Nothing in apps/wakunode2 seems to be tested, by the way (though I may be wrong, apologies if that's the case; as far as I can see it's not imported in tests/, and breaking any of the functions does not seem to make any of the tests fail), so I'd also recommend having at least a few tests making sure it works, as there's definitely some non-trivial logic.

As a side note, "please, do not remove (or move) already existing code" is not something I can agree with; it's not an acceptable statement when working on a codebase. I'm happy to agree with you on what changes need making and the overall strategy, of course (ergo the draft PR); it's always a discussion, but that's just not how software development is done.

@LNSD (Contributor) commented Mar 10, 2023:

As a side note "please, do not remove (or move) already existing code", it's not something I can agree with, and that's not an acceptable statement for working on a codebase, happy to agree with you on what changes needs making and the overall strategy of course (ergo the draft PR), it's always a discussion, but it's just not how software development is done.

Sorry if it sounded bad. Very far from my intention 😅

I meant that these big, "unrelated" changes touch common parts of the app and the codebase that are very prone to git conflicts. This part has not been refactored and cleaned up yet, and this will affect our near-future work and productivity on the app.

@cammellos (author):

That's cool, I thought that might be the case :)
I'll update the PR according to this discussion, thanks for the feedback

@Ivansete-status (Collaborator):

I'm closing this PR because its changes were adapted in the next one:
#1764

Thanks so much indeed @cammellos for this great contribution!
