In [1]:
import requests
import datetime
import time
import pulsar
from pulsar import PartitionsRoutingMode
from pulsar import MessageId
from pulsar import Function
import _pulsar

In [7]:
class PulsarConnection():

    def __init__(self):
        self.client = pulsar.Client('pulsar://localhost:6650')
        self.tenant = 'public'
        self.namespace = 'default'
        self.static_namespace = 'static'
        self.initializing = False
        self.initialized = False
        self._set_init_status() # updates 'initialized' and 'initializing'
        # Initialize the system if it hasn't
        if not self.initialized:
            self._initialize_pulsar()
        self.day_consumer = None
    
    def close(self):
        """ Remeber to close when finished working """
        try: self.client.close()
        except Exception as e: print(f"\n*** Exception: {e} ***\n")
        
    def callback(self, res, msg_id):
        """ Needed for 'send_async', for when the broker receives the message """
        return

    def _set_init_status(self):
        """ Identifying the status based on messages of 'initialized' topic.
        If there are no messages its because it hasn't been initialized.
        A first 'Initializing' message is because its starting, and a second
        'Initialized' message is because it has already started
        """
        try:
            has_messages = False
            curr_time = str(int(time.time()))
            topic_name = 'initialized'
            reader = self.client.create_reader(
                topic=f'persistent://{self.tenant}/{self.static_namespace}/{topic_name}',
                reader_name=topic_name+'_read_'+curr_time,
                start_message_id=MessageId.earliest)
            has_messages = reader.has_message_available()
        except Exception as e: print(f"\n*** Exception: {e} ***\n")
        # If there are no messages, it hasn't been initialized
        if not has_messages:
            print("\nIt seems the system needs initializing\n")
            reader.close()
            return
    
        # Check now if it is initializing
        msg = reader.read_next()
        message = str(msg.value().decode())
        if message == "Initializing":
            print("Found 'Initializing' message")
            self.initializing = True
        else:
            print("Didn't found 'Initializing' message")
        
        # Check if it has already initialized
        try:
            msg = reader.read_next(timeout_millis=3000)
            message = str(msg.value().decode())
        except Exception as e:
            print(f"\n*** Didn't found 'Initilized' message : {e} ***\n")
            
        if message == "Initialized":
            print("Found 'Initialized' message")
            self.initialized = True
            self.initializing = False
        reader.close()
        
        return True
    
    def _initialize_pulsar(self):
        """ Initialize relevant topics: 'initialized', 'days_processed' """
        
        # If its initilizing elsewhere, wait until it has finished
        while self.initializing:
            print("\nThe system is initializing elsewhere, checking again in 5 seconds..\n")
            time.sleep(5)
            self._set_init_status()
            
        
        # Send an Initializing message, so other workers don't do the same
        print("\n*** Initializing the system *** \n")
        try:
            curr_time = str(int(time.time()))
            topic_name = 'initialized'
            init_producer = self.client.create_producer(
                topic=f'persistent://{self.tenant}/{self.static_namespace}/{topic_name}',
                producer_name=f'{topic_name}_prod_{curr_time}',
                message_routing_mode=PartitionsRoutingMode.UseSinglePartition)
        except Exception as e:
            print(f"\n*** Exception: {e} ***\n")
            init_producer.close()
            return
        
        try:
            init_producer.send(("Initializing").encode('utf-8'))      
        except Exception as e:
            print(f"\n*** Exception sending Initializing message: {e} ***\n")
            init_producer.close()
            return
        
        self.initializing = True
            
        # Create the 'day_to_process' topic with 365 'YYYY-MM-DD' values
        print("\n*** Populating 'day_to_process' topic ***\n")
        try:
            curr_time = str(int(time.time()))
            topic_name = 'day_to_process'
            day_producer = self.client.create_producer(
                topic=f'persistent://{self.tenant}/{self.namespace}/{topic_name}',
                producer_name=f'{topic_name}_prod',
                message_routing_mode=PartitionsRoutingMode.UseSinglePartition)
        except Exception as e:
            print(f"\n*** Exception creating 'day_to_process' topic: {e} ***\n")
            day_producer.close()
            return
        init_date = datetime.datetime(2021, 1, 1)
        dates = [(init_date + datetime.timedelta(days=idx)).strftime('%Y-%m-%d') for idx in range(365)]
        
        for date in dates:
            try:
                day_producer.send((f"{date}").encode('utf-8'))      
            except Exception as e:
                print(f"\n*** Exception sending date message: {e} ***\n")
                day_producer.close()
                return
        day_producer.close()
        
        try:
            init_producer.send(("Initialized").encode('utf-8'))      
        except Exception as e:
            print(f"\n*** Exception sending Initialized message: {e} ***\n")
            init_producer.close()
            return
        init_producer.close()
        
        self.initializing = False
        self.initialized = True
        print("\n*** The system has been initialized. Now get working! ***\n")
        
        return True
    
    def get_day_to_process(self):
        """ Pops a ‘YYYY-MM-DD’ string value from the topic 'day_to_process' """        
        # Create a consumer to receive days to process (if it doesn't exist already)
        if (self.day_consumer == None):
            topic_name = 'day_to_process'
            # Create a consumer on persistent topic, with a unique subscription name,
            # so not to overlap with consumers from other workers
            try:
                curr_time = str(int(time.time()))
                self.day_consumer = self.client.subscribe(
                    topic=f"persistent://{self.tenant}/{self.namespace}/{topic_name}",
                    subscription_name=f'{topic_name}_sub_{curr_time}',
                    initial_position=_pulsar.InitialPosition.Earliest)
            except Exception as e:
                print(f"\n*** Exception creating day_consumer: {e} ***\n")
                self.day_consumer.close()
                return
        try:
            msg = self.day_consumer.receive(timeout_millis=3000)
            # Save the string message (decode from byte value)
            message = str(msg.value().decode())
            # Acknowledge that the message was received          
            self.day_consumer.acknowledge(msg)
        except Exception as e:
            print(f"\n*** Exception receiving value from 'day_consumer': {e} ***\n")
            self.day_consumer.negative_acknowledge(msg)
            self.day_consumer.close()
            return
        return message
            
    def get_initializing(self):
        return self.initializing
    
    def get_initialized(self):
        return self.initialized
    
    def put_basic_repo_info(self):
        """ Todo """
        return

    def get_basic_repo_info(self):
        """ Todo """
        return

In [3]:
my_pulsar = PulsarConnection()

2022-05-26 06:33:10.071 INFO  [139960910792512] ClientConnection:189 | [<none> -> pulsar://localhost:6650] Create ClientConnection, timeout=10000
2022-05-26 06:33:10.072 INFO  [139960910792512] ConnectionPool:96 | Created connection for pulsar://localhost:6650
2022-05-26 06:33:10.077 INFO  [139960380487424] ClientConnection:375 | [127.0.0.1:37640 -> 127.0.0.1:6650] Connected to broker
2022-05-26 06:33:10.092 INFO  [139960380487424] HandlerBase:64 | [persistent://public/static/initialized, reader-452d8ce14b, 0] Getting connection from pool
2022-05-26 06:33:10.097 INFO  [139960380487424] ConsumerImpl:224 | [persistent://public/static/initialized, reader-452d8ce14b, 0] Created consumer on broker [127.0.0.1:37640 -> 127.0.0.1:6650] 
Found 'Initializing' message
Found 'Initialized' message
2022-05-26 06:33:10.115 INFO  [139960910792512] ConsumerImpl:999 | [persistent://public/static/initialized, reader-452d8ce14b, 0] Closing consumer for topic persistent://public/static/initialized
2022-05-

In [4]:
my_pulsar.get_initializing()

False

In [5]:
my_pulsar.get_initialized()

True

In [8]:
day = my_pulsar.get_day_to_process()


*** Exception creating day_consumer: subscribe() missing 1 required positional argument: 'subscription_name' ***



AttributeError: 'NoneType' object has no attribute 'close'

In [9]:
my_pulsar.close()

2022-05-26 06:36:24.276 INFO  [139960910792512] ClientImpl:496 | Closing Pulsar client with 0 producers and 1 consumers
2022-05-26 06:36:24.281 INFO  [139960910792512] ClientConnection:1559 | [127.0.0.1:37640 -> 127.0.0.1:6650] Connection closed
2022-05-26 06:36:24.281 INFO  [139960910792512] ClientConnection:263 | [127.0.0.1:37640 -> 127.0.0.1:6650] Destroyed connection


# Test code below

In [None]:
client = pulsar.Client('pulsar://localhost:6650')

try:
    curr_time = str(int(time.time()))
    topic_name = 'days_processed'
    reader = client.create_reader(
        topic=f'persistent://public/default/{topic_name}',
        start_message_id=MessageId.earliest,
        reader_name=topic_name+'_read_'+curr_time)
    print("\n\nAny messages around?: ", reader.has_message_available())
except Exception as e: print(f"\n*** Exception: {e} ***\n")

client.close()

In [None]:
# Testing to see if we can check for a topic's existance.
# However, it might be better to do this using putstate
if False:
    test_client = pulsar.Client('pulsar://localhost:6650')
    persistent_path = 'persistent://public/default'

    try:
        curr_time = str(int(time.time()))
        topic_name = 'days_processed'
        consumer = test_client.subscribe(
            topic=f'{persistent_path}/{topic_name}',
            subscription_name=topic_name+'_sub_'+curr_time,
            initial_position=_pulsar.InitialPosition.Earliest)
    except Exception as e:
        print('Subscription exception:')
        print(e)

    msg = consumer.receive()

    try:
        # Save the string message (decode from byte value)
        message = str(msg.value().decode())
        # Acknowledge that the message was received
        consumer.acknowledge(msg)

        print(message)

    except Exception as e:
        print("Message reception exception")
        consumer.negative_acknowledge(msg)


    test_client.close()