Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

pgmq-py bench script #281

Merged
merged 24 commits into from
Apr 26, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
135 changes: 135 additions & 0 deletions pgmq/coredb-pgmq-python/benches/bench.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
import random
import time

import numpy as np

from coredb_pgmq_python import Message, PGMQueue


def bench(bench_name: str, port: str, username: str) -> dict:
rnd = random.randint(0, 100)
test_queue = f"bench_queue_{rnd}"
test_message = {"hello": "world"}

queue = PGMQueue(host="localhost", port=port, username=username, password="postgres", database="postgres")
try:
queue.create_queue(test_queue)
except Exception as e:
print("table exists?")

num_iters = 1000

bench_0_start = time.time()
vt = 30

print(f"""
Starting benchmark
Total messages: {num_iters}
""")

writes = []
total_write_start = time.time()
# publish messages
print("Writing Messages")
for x in range(num_iters):
test_message["hello"] = x
start = time.time()
msg_id = queue.send(test_queue, test_message)
writes.append(time.time() - start)
total_write_duration = time.time() - total_write_start
print(f"total write time: {total_write_duration}")
write_results = summarize("writes", writes)

reads = []
total_read_start = time.time()
# read them all once, each
print("Reading Messages")
for x in range(num_iters):
start = time.time()
message: Message = queue.read(test_queue, vt = vt)
reads.append(time.time() - start)
if x % 100 == 0:
print(f"read {x} messages")

total_read_time = time.time() - total_read_start
print(f"total read time: {total_read_time}")
read_results = summarize("reads", reads)

# wait for all VT to expire
while time.time() - bench_0_start < vt:
print("waiting for all VTs to expire")
time.sleep(1)

# deletes
deletes = []
delete_start = time.time()
print("Deleting Messages")
for x in range(num_iters):
start = time.time()
queue.delete(test_queue, x)
deletes.append(time.time() - start)
total_delete_time = time.time() - delete_start
print(f"total delete time: {total_delete_time}")
delete_results = summarize("deletes", deletes)

# archives
print("Benchmarking: Archiving Messages")
writes = []
total_write_start = time.time()
# publish messages
print("Writing Messages")
for x in range(num_iters):
test_message["hello"] = x
start = time.time()
msg_id = queue.send(test_queue, test_message)
writes.append(time.time() - start)
total_write_duration = time.time() - total_write_start
print(f"total write time: {total_write_duration}")
summarize("writes", writes)

archives = []
archive_start = time.time()
print("Archiving Messages")
for x in range(num_iters):
start = time.time()
queue.archive(test_queue, x)
archives.append(time.time() - start)
total_archive_time = time.time() - archive_start
print(f"total archive time: {total_delete_time}")
archive_results = summarize("archives", reads)

results = {"bench_name": bench_name}
results.update(write_results)
results.update(read_results)
results.update(delete_results)
results.update(archive_results)

return results


def summarize(cat: str, timings: list[float]) -> None:
total = len(timings)
mean = round(np.mean(timings), 4)
stdev = round(np.std(timings), 4)
_min = round(np.min(timings), 4)
_max = round(np.max(timings), 4)
print(f"Summary: {cat}")
print(f"Count: {total}, mean: {mean}, stdev: {stdev}, min: {_min}, max: {_max}")
return {
# f"{cat}_count": total,
f"{cat}_mean": mean,
f"{cat}_stdev": stdev,
f"{cat}_min": _min,
f"{cat}_max": _max
}
if __name__ == "__main__":
trials = [
("docker", 5432, "postgres"), # docker
("native", 28815, "username") # pgrx
]
all_results = []
for t in trials:
all_results.append(
bench(t[0], t[1], t[2])
)
print(all_results)
2 changes: 2 additions & 0 deletions pgmq/coredb-pgmq-python/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,13 @@ orjson = "^3.8.10"

[tool.poetry.group.dev.dependencies]
mypy = "1.1.1"
pandas = "^2.0.1"
pytest = "^7.3.0"
debugpy = "^1.6.7"
black = "^23.3.0"
isort = "^5.12.0"
flake8 = "^6.0.0"
numpy = "^1.24.3"

[tool.black]
line-length = 120
Expand Down
4 changes: 1 addition & 3 deletions pgmq/coredb-pgmq-python/tests/test_integration.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
# these tests require an externally running Postgres instance
# with the PGMQ Extension installed

from datetime import datetime

from coredb_pgmq_python import Message, PGMQueue


Expand All @@ -20,4 +18,4 @@ def test_lifecycle() -> None:
message: Message = queue.read(test_queue, vt = 20)

assert message.message == test_message
assert message.msg_id == msg_id
assert message.msg_id == msg_id