Skip to content

Commit

Permalink
Update to match the .NET code here - boskjoett@716c0e5
Browse files Browse the repository at this point in the history
  • Loading branch information
lukebakken committed Sep 20, 2022
1 parent 53caab3 commit e847411
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 18 deletions.
22 changes: 11 additions & 11 deletions consumer/consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,21 +13,21 @@

logging.basicConfig(level=logging.INFO, format=LOG_FORMAT)

i = 0
messageCounter = 0


def on_message(chan, method_frame, _header_frame, body):
global i
dt = datetime.datetime.now()
msg_dt = datetime.datetime.fromtimestamp(pickle.loads(body))
delta = dt - msg_dt
i = i + 1
global messageCounter
received = datetime.datetime.now()
sent = datetime.datetime.fromtimestamp(pickle.loads(body))
delay = received - sent
messageCounter = messageCounter + 1
LOGGER.info(
"CONSUMER received %s iteration %d at now %s - delta: %s",
msg_dt,
i,
dt,
delta,
"CONSUMER received at %s, sent at %s - iteration %d, delay: %s",
received,
sent,
messageCounter,
delay,
)
chan.basic_ack(delivery_tag=method_frame.delivery_tag)

Expand Down
20 changes: 13 additions & 7 deletions producer/producer.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
# -*- coding: utf-8 -*-
import datetime
import logging
import pickle
import pika
import random
import time

LOG_FORMAT = (
Expand Down Expand Up @@ -42,15 +44,19 @@ def main():
)

try:
messageCounter = 0
props = pika.BasicProperties(content_type="text/plain")
while True:
t = time.time()
msg = str(t)
tp = pickle.dumps(t)
channel.basic_publish(
exchange="", routing_key=queue_name, body=tp, properties=props
)
LOGGER.info("PRODUCER sent %s", msg)
burstSize = random.randrange(10)
for _ in range(burstSize):
messageCounter = messageCounter + 1
t = time.time()
tp = pickle.dumps(t)
channel.basic_publish(
exchange="", routing_key=queue_name, body=tp, properties=props
)
sendTime = datetime.datetime.fromtimestamp(t)
LOGGER.info("PRODUCER sent message %d at %s", messageCounter, sendTime);
connection.process_data_events(5)
except KeyboardInterrupt:
channel.close()
Expand Down

0 comments on commit e847411

Please sign in to comment.