In [1]:
import time
from api.interface import *
from datetime import datetime

Let's start with Kafka

To run server & consumer, open three terminals in order (in kafka directory)
- bin/zookeeper-server-start.sh config/zookeeper.properties 
- bin/kafka-server-start.sh config/server.properties
- bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic testing

In [2]:
# Generate a config object
config = Config(
    provider = 'kafka', server = 'localhost:9092'
)

# Generate a producer object
producer = Producer(config)

# Generate or add an existing a topic
# producer.create_topic("testing_topic-1234")
producer.add_topic("testing_topic-1234")

# Send a message to the topic
producer.send_message(f"Hello again! {datetime.now().time()}", "testing_topic-1234")

Sent message to testing_topic-1234


In [3]:
# Topics stored under config, so other producer objects with same config can access them
# Built in warnings when reproducing a topic
producer2 = Producer(config)
producer.create_topic("testing_topic-1234")

# Topics exist outside of API so even this will cause error
config2 = Config(
    provider = 'kafka', server = 'localhost:9092'
)
producer3 = Producer(config)
producer3.create_topic("testing_topic")

Already a topic named testing_topic-1234!
Caught: [Error 36] TopicAlreadyExistsError: Request 'CreateTopicsRequest_v3(create_topic_requests=[(topic='testing_topic-1234', num_partitions=1, replication_factor=1, replica_assignment=[], configs=[])], timeout=30000, validate_only=False)' failed with response 'CreateTopicsResponse_v3(throttle_time_ms=0, topic_errors=[(topic='testing_topic-1234', error_code=36, error_message="Topic 'testing_topic-1234' already exists.")])'.
Topic added anyway
Caught: [Error 36] TopicAlreadyExistsError: Request 'CreateTopicsRequest_v3(create_topic_requests=[(topic='testing_topic', num_partitions=1, replication_factor=1, replica_assignment=[], configs=[])], timeout=30000, validate_only=False)' failed with response 'CreateTopicsResponse_v3(throttle_time_ms=0, topic_errors=[(topic='testing_topic', error_code=36, error_message="Topic 'testing_topic' already exists.")])'.
Topic added anyway


In [4]:
# However, another config object can add the topic to itself
# NB: in this example, while creating the topic failed in the previous cell, it added it anyway
# so here we get told, "Already a topic named testing_topic!"
producer3.add_topic("testing_topic")

Already a topic named testing_topic!


In [5]:
# Multiple topics with each config object
producer.create_topic("lunch")
producer.send_message(f"Grilled turkey + cheese. Yum! {datetime.now().time()}", "lunch")

Caught: [Error 36] TopicAlreadyExistsError: Request 'CreateTopicsRequest_v3(create_topic_requests=[(topic='lunch', num_partitions=1, replication_factor=1, replica_assignment=[], configs=[])], timeout=30000, validate_only=False)' failed with response 'CreateTopicsResponse_v3(throttle_time_ms=0, topic_errors=[(topic='lunch', error_code=36, error_message="Topic 'lunch' already exists.")])'.
Topic added anyway
Sent message to lunch


In [8]:
# 
consumer = Consumer(config)

message = consumer.get_next_message("lunch")

### Message sent here ###

print(message)

None


In [15]:
# Start lisening to a topic
consumer.start_listening('lunch')

for i in range(5):
    producer.send_message(f"Grilled turkey + cheese. Yum! {datetime.now().time()}", "lunch")
    time.sleep(2)
    print(consumer.message_bank['lunch'][-1].value)
    time.sleep(2)

Already a topic named lunch!
Started collecting in self.message_bank[lunch]
Sent message to lunch
b'"Grilled turkey + cheese. Yum! 16:03:16.273632"'
Sent message to lunch
b'"Grilled turkey + cheese. Yum! 16:03:22.287039"'
Sent message to lunch


KeyboardInterrupt: 

In [37]:
print(len(consumer.message_bank['lunch']))

# Will stop listening AFTER the next message is received
consumer.stop_listening('lunch')

5
Stopped collecting in self.message_bank[lunch]


--

Now, let's try Google Pub Sub

In [1]:
import time
from api.interface import *
from datetime import datetime

In [2]:
gconfig = Config(
    provider = 'google_pubsub', project_id='vector2021-kafka', credentials_path="api/pub_sub_account_api.json~"
)
# Again, regenerating a topic causes issue - I've created this topic before so we see the handling message below
gproducer = Producer(gconfig)
gproducer.create_topic('dinner')
gproducer.send_message(f"I think it's going to be curry {datetime.now().time()}", 'dinner')

Caught: 409 Resource already exists in the project (resource=dinner).
Topic added anyway
3126843497211796Sent message to dinner



In [11]:
gconfig = Config(
    provider = 'google_pubsub', project_id='vector2021-kafka', credentials_path="api/pub_sub_account_api.json~"
)
gconsumer = Consumer(gconfig)

message = gconsumer.get_next_message("dinner")

### Message sent here ###

print(message.message.data)

b"I think it's going to be curry 17:37:15.982667"


In [13]:
gconfig = Config(
    provider = 'google_pubsub', project_id='vector2021-kafka', credentials_path="api/pub_sub_account_api.json~"
)
gconsumer = Consumer(gconfig)
gconsumer.start_listening('dinner')

# Google can be a bit slow, so for the sake of this demonstration I'll make it wait until the message 
# is received
def wait_until(somepredicate, timeout, period=0.25, *args, **kwargs):
  mustend = time.time() + timeout
  while time.time() < mustend:
    if somepredicate(*args, **kwargs): return True
    time.sleep(period)
  return False

for i in range(5):
    gproducer.send_message(f"Grilled turkey + cheese. Yum! {datetime.now().time()}", "dinner")
    wait_until(lambda x: len(x)>=i+1, 10, x=gconsumer.message_bank['dinner'])
    print(gconsumer.message_bank['dinner'][-1].data)
    time.sleep(2)

gconsumer.stop_listening('dinner')

projects/vector2021-kafka/subscriptions/dinner-subscription-42-25-17
Created subscription dinner-subscription-42-25-17 for topic dinner
Started collecting in self.message_bank[dinner]
3126672323791342Sent message to dinner

b'Grilled turkey + cheese. Yum! 17:25:52.152187'
3126483677532092Sent message to dinner

b'Grilled turkey + cheese. Yum! 17:26:01.225722'
3126671431665340Sent message to dinner

b'Grilled turkey + cheese. Yum! 17:26:03.558225'
3126673231019989Sent message to dinner

b'Grilled turkey + cheese. Yum! 17:26:05.877113'
3126582245108625Sent message to dinner

b'Grilled turkey + cheese. Yum! 17:26:08.205376'
Stopped collecting in self.message_bank[dinner]
