# Publish-Subscribe

[![Open in Colab](https://img.shields.io/badge/Open%20in%20Colab-F9AB00?style=for-the-badge&logo=googlecolab&color=525252)](https://colab.research.google.com/github/H-IAAC/CST-Python/blob/main/examples/Publisher-Subscriber.ipynb) [![Open in Github](https://img.shields.io/badge/Open%20in%20Github-100000?style=for-the-badge&logo=github&logoColor=white)](https://github.com/H-IAAC/CST-Python/blob/main/examples/Publisher-Subscriber.ipynb)

Sometimes we want a codelet to execute only when its input value changes. For that, we can use the publish-subscribe mechanism.

To illustrate this, we are going to implement an agent that computes the average of its input values:

![](./Publisher-Subscriber/diagram.png)

Let's start by importing the necessary modules:

In [None]:
try:
    import cst_python
except ImportError: # Install the package only when the import fails
    !python3 -m pip install cst_python

In [1]:
import time # Sleep

import cst_python as cst # CST-Python Module

## Naive

The first implementation is a naive approach: we store the timestamp of the last change to the input value, and only compute the average when the timestamp increases:

In [2]:
class NaiveAverageCodelet(cst.Codelet):
    def __init__(self):
        super().__init__()

        # These attributes hold memories, not codelets
        self._value_mo : cst.Memory | None = None

        self._counter_mo : cst.Memory | None = None
        self._avg_mo : cst.Memory | None = None

        self.last_timestamp = 0

    def access_memory_objects(self):
        self._value_mo = self.get_input(name="Value")

        self._counter_mo = self.get_output(name="Counter")
        self._avg_mo = self.get_output(name="Average")

    def calculate_activation(self):
        pass

    def proc(self):
        
        # Skip the computation if there is no new value
        if self._value_mo.get_timestamp() != 0 and self._value_mo.get_timestamp() <= self.last_timestamp:
            return
        self.last_timestamp = self._value_mo.get_timestamp()

        counter : int = self._counter_mo.get_info()
        avg : float = self._avg_mo.get_info()

        # Retrieve the previous sum
        avg *= counter

        # Update the values
        avg += self._value_mo.get_info()
        counter += 1
        avg /= counter
        
        self._avg_mo.set_info(avg)
        self._counter_mo.set_info(counter)
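
The timestamp check can also be tried in isolation, without CST-Python. The following is a minimal sketch, assuming a hypothetical `FakeMemory` class that only mimics the `get_info`, `set_info` and `get_timestamp` calls used in this notebook, and a hypothetical `PollingAverager` that plays the role of the codelet. It is not the library's implementation:

```python
import itertools


class FakeMemory:
    """Hypothetical stand-in for a memory object (not the CST-Python class)."""

    _clock = itertools.count(1)  # Fake monotonic clock: 1, 2, 3, ...

    def __init__(self, info):
        self._info = info
        self._timestamp = 0

    def get_info(self):
        return self._info

    def set_info(self, info):
        self._info = info
        self._timestamp = next(self._clock)  # Every write advances the timestamp

    def get_timestamp(self):
        return self._timestamp


class PollingAverager:
    """Recomputes the average only when the memory timestamp advances."""

    def __init__(self, memory):
        self.memory = memory
        self.last_timestamp = memory.get_timestamp()
        self.counter = 0
        self.average = 0.0

    def step(self):
        timestamp = self.memory.get_timestamp()
        if timestamp <= self.last_timestamp:
            return False  # Nothing new since the last step
        self.last_timestamp = timestamp

        total = self.average * self.counter + self.memory.get_info()
        self.counter += 1
        self.average = total / self.counter
        return True


memory = FakeMemory(0.0)
averager = PollingAverager(memory)

memory.set_info(10)
# Three polls, but only the first one sees a new value
ran = [averager.step(), averager.step(), averager.step()]

memory.set_info(20)
ran.append(averager.step())
```

Every call to `step` costs a timestamp comparison even when nothing changed, which is the overhead that the publish-subscribe mechanism avoids.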

The `prepare_mind` function creates a new mind with all the necessary memories:

In [3]:
def prepare_mind(average_codelet:cst.Codelet):
    mind = cst.Mind()

    avg_mo = mind.create_memory_object("Average", 0.0)
    counter_mo = mind.create_memory_object("Counter", 0)
    value_mo = mind.create_memory_object("Value", 0.0)
    
    average_codelet.add_input(value_mo)

    average_codelet.add_output(avg_mo)
    average_codelet.add_output(counter_mo)

    # Prevent the naive codelet from counting the initial value in the average
    average_codelet.last_timestamp = value_mo.get_timestamp() 
    
    average_codelet.time_step = 10
    mind.insert_codelet(average_codelet)

    return mind, value_mo, avg_mo

We then create the codelet, prepare the mind, and start it:

In [4]:
naive_codelet = NaiveAverageCodelet()

mind, value_mo, avg_mo = prepare_mind(naive_codelet)

mind.start()

For testing, we can set the "Value" memory info and check the current average:

In [5]:
value_mo.set_info(10)

time.sleep(0.020)

avg_mo.get_info()

10.0

In [6]:
value_mo.set_info(20)

time.sleep(0.020)

avg_mo.get_info()

15.0
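
Both results follow from the update performed in `proc`: the previous sum is recovered as `avg * counter`, the new value is added, and the total is divided by the incremented counter. A minimal standalone sketch of that arithmetic, where the `update_average` helper is a hypothetical name and not part of CST-Python:

```python
def update_average(avg: float, counter: int, value: float) -> tuple[float, int]:
    """Fold one new value into a running average (same steps as `proc`)."""
    total = avg * counter + value  # Recover the previous sum and add the new value
    counter += 1
    return total / counter, counter


avg, counter = 0.0, 0
history = []
for value in (10, 20):
    avg, counter = update_average(avg, counter, value)
    history.append(avg)

print(history)  # [10.0, 15.0]
```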

We then stop the running mind:

In [7]:
mind.shutdown()

## Memory Observer

In this version, the codelet does not check whether the input value changed: the `proc` method is cleaner and performs only the codelet's actual operation. The codelet also becomes stateless:

In [8]:
class PubSubAverageCodelet(cst.Codelet):
    def __init__(self):
        super().__init__()

        # These attributes hold memories, not codelets
        self._value_mo : cst.Memory | None = None

        self._counter_mo : cst.Memory | None = None
        self._avg_mo : cst.Memory | None = None
        

    def access_memory_objects(self):
        self._value_mo = self.get_input(name="Value")

        self._counter_mo = self.get_output(name="Counter")
        self._avg_mo = self.get_output(name="Average")

    def calculate_activation(self):
        pass

    def proc(self):
        counter : int = self._counter_mo.get_info()
        avg : float = self._avg_mo.get_info()

        # Retrieve the previous sum
        avg *= counter

        # Update the values
        avg += self._value_mo.get_info()
        counter += 1
        avg /= counter
        
        self._avg_mo.set_info(avg)
        self._counter_mo.set_info(counter)

We then create the codelet and the mind as before:

In [9]:
average_codelet = PubSubAverageCodelet()

mind, value_mo, avg_mo = prepare_mind(average_codelet)

Here we configure the codelet as a "memory observer": it executes its `proc` method only when an observed memory changes. We then register the codelet as an observer of the "Value" memory and start the mind:

In [10]:
average_codelet.is_memory_observer = True
value_mo.add_memory_observer(average_codelet)

mind.start()
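
Conceptually, a memory observer follows the classic observer pattern: the memory keeps a list of observers and notifies them whenever its content is set. The following is a minimal sketch of that idea, assuming hypothetical `ObservableMemory` and `AverageObserver` classes with a hypothetical `notify` callback. CST-Python's internal implementation may differ:

```python
class ObservableMemory:
    """Hypothetical memory that notifies its observers on every write."""

    def __init__(self, info):
        self._info = info
        self._observers = []

    def add_observer(self, observer):
        self._observers.append(observer)

    def get_info(self):
        return self._info

    def set_info(self, info):
        self._info = info
        for observer in self._observers:
            observer.notify(self)  # Push the change instead of waiting to be polled


class AverageObserver:
    """Hypothetical observer that updates a running average when notified."""

    def __init__(self):
        self.counter = 0
        self.average = 0.0
        self.calls = 0  # How many times the observer was triggered

    def notify(self, memory):
        self.calls += 1
        total = self.average * self.counter + memory.get_info()
        self.counter += 1
        self.average = total / self.counter


value = ObservableMemory(0.0)
observer = AverageObserver()
value.add_observer(observer)

value.set_info(10)
value.set_info(20)
```

In this sketch the observer runs exactly once per write, so no timestamp bookkeeping is needed.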

We test the codelet with the same example as before:

In [11]:
value_mo.set_info(10)

time.sleep(0.020)

avg_mo.get_info()

10.0

In [12]:
value_mo.set_info(20)

time.sleep(0.020)

avg_mo.get_info()

15.0

In [13]:
mind.shutdown()

## Publisher-Subscriber

The previous example shows how to create a codelet that selectively waits for a specific input to change.

Sometimes we want the codelet to run when any of its inputs (or its only input) changes. In this case, we can use a "publish-subscribe" codelet:

In [14]:
average_codelet = PubSubAverageCodelet()

mind, value_mo, avg_mo = prepare_mind(average_codelet)

average_codelet.set_publish_subscribe(True)

mind.start()
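
The idea of running when any input changes can be sketched without the library: the consumer subscribes to every one of its inputs instead of a single chosen memory. The `Topic` and `SumSubscriber` classes below are hypothetical names used only for illustration, and summing two inputs is an assumed toy task. This is not how CST-Python implements `set_publish_subscribe`:

```python
class Topic:
    """Hypothetical named memory that calls its subscribers on every write."""

    def __init__(self, name, info=None):
        self.name = name
        self.info = info
        self._subscribers = []

    def subscribe(self, callback):
        self._subscribers.append(callback)

    def set_info(self, info):
        self.info = info
        for callback in self._subscribers:
            callback(self)


class SumSubscriber:
    """Hypothetical consumer that reruns whenever any of its inputs changes."""

    def __init__(self, inputs):
        self.inputs = list(inputs)
        self.triggered_by = []  # Names of the inputs that caused each run
        self.total = None
        for topic in self.inputs:  # Subscribe to every input, not a chosen one
            topic.subscribe(self.run)

    def run(self, changed):
        self.triggered_by.append(changed.name)
        self.total = sum(topic.info for topic in self.inputs)


a = Topic("A", 1)
b = Topic("B", 2)
subscriber = SumSubscriber([a, b])

a.set_info(10)  # total becomes 10 + 2
b.set_info(20)  # total becomes 10 + 20
```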

In [15]:
value_mo.set_info(10)

time.sleep(0.020)

avg_mo.get_info()

10.0

In [16]:
value_mo.set_info(20)

time.sleep(0.020)

avg_mo.get_info()

15.0