Skip to content

Commit

Permalink
chore: Better postgres duplicate insert (#2535)
Browse files Browse the repository at this point in the history
* postgres_driver: add ON CONFLICT DO NOTHING in the insert statement
* test_driver_postgres: adapt test to ON CONFLICT DO NOTHING
  The insert does not fail when duplicate, it returns a positive response
  when doing 'put' of a duplicated row. The test is adapted so that
  we just check that the number of messages doesn't grow after
  trying to add a duplicated row.
  • Loading branch information
Ivansete-status committed Mar 18, 2024
1 parent 3ba4378 commit 693a177
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 10 deletions.
18 changes: 17 additions & 1 deletion tests/waku_archive/test_driver_postgres.nim
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,9 @@ suite "Postgres driver":
let msg1 = fakeWakuMessage(ts = now)
let msg2 = fakeWakuMessage(ts = now)

let initialNumMsgs = (await driver.getMessagesCount()).valueOr:
raiseAssert "could not get num mgs correctly: " & $error

var putRes = await driver.put(
DefaultPubsubTopic,
msg1,
Expand All @@ -173,11 +176,24 @@ suite "Postgres driver":
)
assert putRes.isOk(), putRes.error

var newNumMsgs = (await driver.getMessagesCount()).valueOr:
raiseAssert "could not get num mgs correctly: " & $error

assert newNumMsgs == (initialNumMsgs + 1.int64),
"wrong number of messages: " & $newNumMsgs

putRes = await driver.put(
DefaultPubsubTopic,
msg2,
computeDigest(msg2),
computeMessageHash(DefaultPubsubTopic, msg2),
msg2.timestamp,
)
assert not putRes.isOk()

assert putRes.isOk()

newNumMsgs = (await driver.getMessagesCount()).valueOr:
raiseAssert "could not get num mgs correctly: " & $error

assert newNumMsgs == (initialNumMsgs + 1.int64),
"wrong number of messages: " & $newNumMsgs
9 changes: 1 addition & 8 deletions waku/waku_archive/archive.nim
Original file line number Diff line number Diff line change
Expand Up @@ -113,14 +113,7 @@ proc handleMessage*(

(await self.driver.put(pubsubTopic, msg, msgDigest, msgHash, msgTimestamp)).isOkOr:
waku_archive_errors.inc(labelValues = [insertFailure])
# Prevent spamming the logs when multiple nodes are connected to the same database.
# In that case, the message cannot be inserted but is an expected "insert error"
# and therefore we reduce its visibility by having the log in trace level.
if "duplicate key value violates unique constraint" in error:
trace "failed to insert message", err = error
else:
debug "failed to insert message", err = error

debug "failed to insert message", err = error
let insertDuration = getTime().toUnixFloat() - insertStartTime
waku_archive_insert_duration_seconds.observe(insertDuration)

Expand Down
3 changes: 2 additions & 1 deletion waku/waku_archive/driver/postgres_driver/postgres_driver.nim
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ type PostgresDriver* = ref object of ArchiveDriver
const InsertRowStmtName = "InsertRow"
const InsertRowStmtDefinition = # TODO: get the sql queries from a file
"""INSERT INTO messages (id, messageHash, storedAt, contentTopic, payload, pubsubTopic,
version, timestamp) VALUES ($1, $2, $3, $4, $5, $6, $7, $8);"""
version, timestamp) VALUES ($1, $2, $3, $4, $5, $6, $7, $8) ON CONFLICT DO NOTHING;"""

const SelectNoCursorAscStmtName = "SelectWithoutCursorAsc"
const SelectNoCursorAscStmtDef =
Expand Down Expand Up @@ -679,6 +679,7 @@ proc loopPartitionFactory(
debug "creating a new partition for the future"
## The current used partition is the last one that was created.
## Thus, let's create another partition for the future.

(
await self.addPartition(
newestPartition.getLastMoment(), PartitionsRangeInterval
Expand Down

0 comments on commit 693a177

Please sign in to comment.