Skip to content

Commit

Permalink
feat(postgres): integration of postgres in wakunode2 (#1808)
Browse files Browse the repository at this point in the history
* Making the wakunode2 to support postgres driver

* driver/builder.nim: controling possible errors when creating the messages table

* postgres_driver.nim: adding protection in getInt and fixing typo
  • Loading branch information
Ivansete-status committed Jun 28, 2023
1 parent cf46fb7 commit 88b7481
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 8 deletions.
27 changes: 24 additions & 3 deletions waku/v2/waku_archive/driver/builder.nim
Original file line number Diff line number Diff line change
Expand Up @@ -6,18 +6,21 @@ else:

import
stew/results,
chronicles
chronicles,
chronos
import
../driver,
../../../common/databases/dburl,
../../../common/databases/db_sqlite,
./sqlite_driver,
./sqlite_driver/migrations as archive_driver_sqlite_migrations,
./queue_driver
./queue_driver,
./postgres_driver

export
sqlite_driver,
queue_driver
queue_driver,
postgres_driver

proc new*(T: type ArchiveDriver,
url: string,
Expand Down Expand Up @@ -69,6 +72,24 @@ proc new*(T: type ArchiveDriver,

return ok(res.get())

of "postgres":
const MaxNumConns = 5 #TODO: we may need to set that from app args (maybe?)
let res = PostgresDriver.new(url, MaxNumConns)
if res.isErr():
return err("failed to init postgres archive driver: " & res.error)

let driver = res.get()

try:
# The table should exist beforehand.
let newTableRes = waitFor driver.createMessageTable()
if newTableRes.isErr():
return err("error creating table: " & newTableRes.error)
except CatchableError:
return err("exception creating table: " & getCurrentExceptionMsg())

return ok(driver)

else:
debug "setting up in-memory waku archive driver"
let driver = QueueDriver.new() # Defaults to a capacity of 25.000 messages
Expand Down
11 changes: 6 additions & 5 deletions waku/v2/waku_archive/driver/postgres_driver/postgres_driver.nim
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,8 @@ proc new*(T: type PostgresDriver,

return ok(PostgresDriver(connPool: connPoolRes.get()))

proc createMessageTable(s: PostgresDriver):
Future[ArchiveDriverResult[void]] {.async.} =
proc createMessageTable*(s: PostgresDriver):
Future[ArchiveDriverResult[void]] {.async.} =

let execRes = await s.connPool.exec(createTableQuery())
if execRes.isErr():
Expand Down Expand Up @@ -238,9 +238,10 @@ proc getInt(s: PostgresDriver,
if fields.len != 1:
return err("failed in getRow: Expected one field but got " & $fields.len)

var retInt: int64
var retInt = 0'i64
try:
retInt = parseInt(fields[0])
if fields[0] != "":
retInt = parseInt(fields[0])
except ValueError:
return err("exception in getRow, parseInt: " & getCurrentExceptionMsg())

Expand Down Expand Up @@ -269,7 +270,7 @@ method getNewestMessageTimestamp*(s: PostgresDriver):

let intRes = await s.getInt("SELECT MAX(storedAt) FROM messages")
if intRes.isErr():
return err("error in getOldestMessageTimestamp: " & intRes.error)
return err("error in getNewestMessageTimestamp: " & intRes.error)

return ok(Timestamp(intRes.get()))

Expand Down

0 comments on commit 88b7481

Please sign in to comment.