Skip to content

Commit

Permalink
feat: Postgres partition implementation (#2506)
Browse files Browse the repository at this point in the history
* postgres: first step to implement partition management
* postgres_driver: use of times.now().toTime().toUnix() instead of Moment.now()
* postgres migrations: set new version to 2
* test_driver_postgres: use of assert instead of require and avoid using times.now()
* postgres_driver: better implementation of the reset method with partitions
* Remove createMessageTable, init, and deleteMessageTable procs
* postgres: ensure we use the version 15.4 in tests
* postgres_driver.nim: enhance debug logs partition addition
* ci.yml: ensure logs are printed without colors
* postgres_driver: starting the loop factory in an asynchronous task
* postgres_driver: log partition name and size when removing a partition
  • Loading branch information
Ivansete-status committed Mar 6, 2024
1 parent beba14d commit 161a10e
Show file tree
Hide file tree
Showing 11 changed files with 445 additions and 148 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ concurrency:
env:
NPROC: 2
MAKEFLAGS: "-j${NPROC}"
NIMFLAGS: "--parallelBuild:${NPROC}"
NIMFLAGS: "--parallelBuild:${NPROC} --colors:off -d:chronicles_colors:none"

jobs:
changes: # changes detection
Expand Down Expand Up @@ -115,7 +115,7 @@ jobs:
fi
if [ ${{ runner.os }} == "Linux" ]; then
sudo docker run --rm -d -e POSTGRES_PASSWORD=test123 -p 5432:5432 postgres:9.6-alpine
sudo docker run --rm -d -e POSTGRES_PASSWORD=test123 -p 5432:5432 postgres:15.4-alpine3.18
fi
make V=1 LOG_LEVEL=DEBUG QUICK_AND_DIRTY_COMPILER=1 POSTGRES=1 test testwakunode2
Expand Down
2 changes: 1 addition & 1 deletion tests/postgres-docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ version: "3.8"

services:
db:
image: postgres:9.6-alpine
image: postgres:15.4-alpine3.18
restart: always
environment:
POSTGRES_PASSWORD: test123
Expand Down
52 changes: 24 additions & 28 deletions tests/waku_archive/test_driver_postgres.nim
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{.used.}

import
std/[sequtils,times,options],
std/[sequtils,options],
testutils/unittests,
chronos
import
Expand All @@ -13,8 +13,6 @@ import
../testlib/testasync,
../testlib/postgres

proc now():int64 = getTime().toUnix()

proc computeTestCursor(pubsubTopic: PubsubTopic,
message: WakuMessage):
ArchiveCursor =
Expand Down Expand Up @@ -56,7 +54,7 @@ suite "Postgres driver":
# Actually, the diff randomly goes between 1 and 2 seconds,
# although in theory it should take ~1s because we establish 100
# connections and we spawn 100 tasks that take ~1s each.
require diff < 20
assert diff < 20_000_000_000

asyncTest "Insert a message":
const contentTopic = "test-content-topic"
Expand All @@ -69,14 +67,14 @@ suite "Postgres driver":
assert putRes.isOk(), putRes.error

let storedMsg = (await driver.getAllMessages()).tryGet()
require:
storedMsg.len == 1
storedMsg.all do (item: auto) -> bool:
let (pubsubTopic, actualMsg, digest, storeTimestamp) = item
actualMsg.contentTopic == contentTopic and
pubsubTopic == DefaultPubsubTopic and
toHex(computedDigest.data) == toHex(digest) and
toHex(actualMsg.payload) == toHex(msg.payload)

assert storedMsg.len == 1

let (pubsubTopic, actualMsg, digest, storeTimestamp) = storedMsg[0]
assert actualMsg.contentTopic == contentTopic
assert pubsubTopic == DefaultPubsubTopic
assert toHex(computedDigest.data) == toHex(digest)
assert toHex(actualMsg.payload) == toHex(msg.payload)

asyncTest "Insert and query message":
const contentTopic1 = "test-content-topic-1"
Expand All @@ -96,31 +94,30 @@ suite "Postgres driver":

let countMessagesRes = await driver.getMessagesCount()

require countMessagesRes.isOk() and countMessagesRes.get() == 2
assert countMessagesRes.isOk(), $countMessagesRes.error
assert countMessagesRes.get() == 2

var messagesRes = await driver.getMessages(contentTopic = @[contentTopic1])

require messagesRes.isOk()
require messagesRes.get().len == 1
assert messagesRes.isOk(), $messagesRes.error
assert messagesRes.get().len == 1

# Get both content topics, check ordering
messagesRes = await driver.getMessages(contentTopic = @[contentTopic1,
contentTopic2])
assert messagesRes.isOk(), messagesRes.error

require:
messagesRes.get().len == 2 and
messagesRes.get()[0][1].contentTopic == contentTopic1
assert messagesRes.get().len == 2
assert messagesRes.get()[0][1].contentTopic == contentTopic1

# Descending order
messagesRes = await driver.getMessages(contentTopic = @[contentTopic1,
contentTopic2],
ascendingOrder = false)
assert messagesRes.isOk(), messagesRes.error

require:
messagesRes.get().len == 2 and
messagesRes.get()[0][1].contentTopic == contentTopic2
assert messagesRes.get().len == 2
assert messagesRes.get()[0][1].contentTopic == contentTopic2

# cursor
# Get both content topics
Expand All @@ -130,25 +127,24 @@ suite "Postgres driver":
cursor = some(
computeTestCursor(pubsubTopic1,
messagesRes.get()[1][1])))
require messagesRes.isOk()
require messagesRes.get().len == 1
assert messagesRes.isOk()
assert messagesRes.get().len == 1

# Get both content topics but one pubsub topic
messagesRes = await driver.getMessages(contentTopic = @[contentTopic1,
contentTopic2],
pubsubTopic = some(pubsubTopic1))
assert messagesRes.isOk(), messagesRes.error

require:
messagesRes.get().len == 1 and
messagesRes.get()[0][1].contentTopic == contentTopic1
assert messagesRes.get().len == 1
assert messagesRes.get()[0][1].contentTopic == contentTopic1

# Limit
messagesRes = await driver.getMessages(contentTopic = @[contentTopic1,
contentTopic2],
maxPageSize = 1)
assert messagesRes.isOk(), messagesRes.error
require messagesRes.get().len == 1
assert messagesRes.get().len == 1

asyncTest "Insert true duplicated messages":
# Validates that two completely equal messages can not be stored.
Expand All @@ -164,5 +160,5 @@ suite "Postgres driver":

putRes = await driver.put(DefaultPubsubTopic,
msg2, computeDigest(msg2), computeMessageHash(DefaultPubsubTopic, msg2), msg2.timestamp)
require not putRes.isOk()
assert not putRes.isOk()

3 changes: 2 additions & 1 deletion waku/waku_archive/driver.nim
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,8 @@ method deleteOldestMessagesNotWithinLimit*(driver: ArchiveDriver,
Future[ArchiveDriverResult[void]] {.base, async.} = discard

method decreaseDatabaseSize*(driver: ArchiveDriver,
targetSizeInBytes: int64):
targetSizeInBytes: int64,
forceRemoval: bool = false):
Future[ArchiveDriverResult[void]] {.base, async.} = discard

method close*(driver: ArchiveDriver):
Expand Down
13 changes: 13 additions & 0 deletions waku/waku_archive/driver/builder.nim
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,19 @@ proc new*(T: type ArchiveDriver,
if migrateRes.isErr():
return err("ArchiveDriver build failed in migration: " & $migrateRes.error)

## This should be started once we make sure the 'messages' table exists
## Hence, this should be run after the migration is completed.
asyncSpawn driver.startPartitionFactory(onFatalErrorAction)

info "waiting for a partition to be created"
for i in 0..<100:
if driver.containsAnyPartition():
break
await sleepAsync(chronos.milliseconds(100))

if not driver.containsAnyPartition():
onFatalErrorAction("a partition could not be created")

return ok(driver)

else:
Expand Down
11 changes: 9 additions & 2 deletions waku/waku_archive/driver/postgres_driver.nim
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,13 @@ when (NimMajor, NimMinor) < (1, 4):
else:
{.push raises: [].}

import ./postgres_driver/postgres_driver
import
./postgres_driver/postgres_driver,
./postgres_driver/partitions_manager,
./postgres_driver/postgres_healthcheck

export
postgres_driver,
partitions_manager,
postgres_healthcheck

export postgres_driver
2 changes: 1 addition & 1 deletion waku/waku_archive/driver/postgres_driver/migrations.nim
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import
logScope:
topics = "waku archive migration"

const SchemaVersion* = 1 # increase this when there is an update in the database schema
const SchemaVersion* = 2 # increase this when there is an update in the database schema

proc breakIntoStatements*(script: string): seq[string] =
## Given a full migration script, that can potentially contain a list
Expand Down
105 changes: 105 additions & 0 deletions waku/waku_archive/driver/postgres_driver/partitions_manager.nim
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@

## This module aims to handle the creation and truncation of partition tables
## in order to limit the disk space occupied by the database.
##
## The created partitions are referenced by the 'storedAt' field.
##

import
std/deques
import
chronos,
chronicles

logScope:
topics = "waku archive partitions_manager"

## Time window delimited by `beginning` and `end`, with seconds resolution.
type
  TimeRange* = tuple
    beginning: int64
    `end`: int64

type Partition = object
  ## Bookkeeping entry describing one partition table.
  name: string         # name of the partition table
  timeRange: TimeRange # time window associated with the partition

type PartitionManager* = ref object
  ## Keeps track of the known partitions.
  partitions: Deque[Partition]
    ## FIFO of tracked partitions. The first element is the oldest partition

proc new*(T: type PartitionManager): T =
  ## Creates a manager whose partition queue is default-initialized,
  ## i.e. it does not track any partition yet.
  PartitionManager()

proc getPartitionFromDateTime*(self: PartitionManager,
                               targetMoment: int64):
                               Result[Partition, string] =
  ## Looks up the tracked partition whose time range covers the passed moment,
  ## i.e. the one where beginning <= targetMoment < end.
  ## The partitions are scanned from the oldest to the newest and the first
  ## match is returned.
  ## targetMoment - represents the time of interest, measured in seconds since epoch.
  ## Returns an error when no partition is tracked at all, or when none of the
  ## tracked partitions covers the given moment.

  if self.partitions.len == 0:
    return err("There are no partitions")

  for candidate in self.partitions:
    let window = candidate.timeRange
    if targetMoment >= window.beginning and targetMoment < window.`end`:
      return ok(candidate)

  return err("Couldn't find a partition table for given time: " & $targetMoment)

proc getNewestPartition*(self: PartitionManager): Result[Partition, string] =
  ## Returns the partition at the back of the queue, i.e. the last one added.
  ## Returns an error if no partition is being tracked.
  if self.partitions.len > 0:
    return ok(self.partitions.peekLast)

  return err("there are no partitions allocated")

proc getOldestPartition*(self: PartitionManager): Result[Partition, string] =
  ## Returns the partition at the front of the queue, i.e. the oldest one.
  ## Returns an error if no partition is being tracked.
  if self.partitions.len > 0:
    return ok(self.partitions.peekFirst)

  return err("there are no partitions allocated")

proc addPartitionInfo*(self: PartitionManager,
                       partitionName: string,
                       beginning: int64,
                       `end`: int64) =
  ## Appends a new entry at the back of the tracked partitions queue.
  ## `beginning` and `end` delimit the partition range, with seconds resolution.
  ## This proc only records the information; it merely keeps track of the partition.
  let window: TimeRange = (beginning, `end`)
  let entry = Partition(name: partitionName, timeRange: window)
  trace "Adding partition info"
  self.partitions.addLast(entry)

proc removeOldestPartitionName*(self: PartitionManager) =
  ## Simply removes the oldest partition from the tracked/known partitions queue.
  ## The removed entry is discarded. Only the bookkeeping queue is touched here.
  ## It is a no-op when no partition is being tracked.
  if self.partitions.len == 0:
    # popFirst on an empty Deque raises IndexDefect; there is nothing to forget.
    return

  discard self.partitions.popFirst()

proc isEmpty*(self: PartitionManager): bool =
  ## True when no partition is currently being tracked.
  self.partitions.len == 0

proc getLastMoment*(partition: Partition): int64 =
  ## Gives the `end` bound (number of seconds since epoch) of the time
  ## range covered by the partition.
  partition.timeRange.`end`

proc containsMoment*(partition: Partition, time: int64): bool =
  ## Tells whether the given moment falls inside the partition window.
  ## The window includes its `beginning` and excludes its `end`.
  ## time - number of seconds since epoch
  let window = partition.timeRange
  return time >= window.beginning and time < window.`end`

proc getName*(partition: Partition): string =
  ## Gives the name stored in the partition entry.
  partition.name

func `==`*(a, b: Partition): bool {.inline.} =
  ## Two partitions are considered equal when their names match.
  ## The time range is not taken into account.
  a.name == b.name

0 comments on commit 161a10e

Please sign in to comment.