<a href="https://colab.research.google.com/github/sanchezcarlosjr/eva-broker/blob/main/mqtt_producer_consumer.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [3]:
! pip install paho-mqtt

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting paho-mqtt
  Downloading paho-mqtt-1.6.1.tar.gz (99 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m99.4/99.4 KB[0m [31m4.2 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: paho-mqtt
  Building wheel for paho-mqtt (setup.py) ... [?25l[?25hdone
  Created wheel for paho-mqtt: filename=paho_mqtt-1.6.1-py3-none-any.whl size=62135 sha256=2449f3cc79f902d58644453917308c5652c81ccbe2b8151e65e4dc8b5e594d52
  Stored in directory: /root/.cache/pip/wheels/0f/90/29/db29bb8ddc98ec5f2363b959130c9ddbcf5cfdb4a00b6184dd
Successfully built paho-mqtt
Installing collected packages: paho-mqtt
Successfully installed paho-mqtt-1.6.1


In [4]:
! pip install asyncio-mqtt

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting asyncio-mqtt
  Downloading asyncio_mqtt-0.16.1-py3-none-any.whl (17 kB)
Installing collected packages: asyncio-mqtt
Successfully installed asyncio-mqtt-0.16.1


# Coroutines version

In [None]:
import asyncio
import asyncio_mqtt as aiomqtt

queue = asyncio.Queue()

async def listen():
    async with aiomqtt.Client("test.mosquitto.org") as client:
        async with client.messages() as messages:
            for key in ['hermes/asr/stopListening', 'hermes/hotword/toggleOn']:
                await client.subscribe(key)
            async for message in messages:
                await queue.put((message.topic.value, message.payload.decode()))

async def animate(payload=[None]):
   print("ANIMATE", payload)
   await asyncio.sleep(0.5)

async def off(payload=[None]):
   print("OFF", payload)
   await asyncio.sleep(0.5)

async def start_service():
     translator = {
         'hermes/asr/stopListening': 'off',
         'hermes/hotword/toggleOn': 'animate'
     }
     current_state = 'off'
     payload = [None]
     while True:
        await asyncio.sleep(0.5)
        await globals()[current_state](payload)
        topic, payload = await queue.get()
        current_state = \
          'animate' if current_state == 'off' and translator[topic] == 'animate' else \
          'off' if current_state == 'animate' and translator[topic] == 'off' else current_state
async def main():
    loop = asyncio.get_event_loop()
    task = loop.create_task(listen())
    service  = loop.create_task(start_service())

    await task
    await service


await main() # asyncio.run(main())

OFF {"text":0}
OFF {"text":1}
OFF {"text":2}
OFF {"text":3}
ANIMATE {"text":0}
ANIMATE {"text":1}
ANIMATE {"text":2}
ANIMATE {"text":3}
ANIMATE {"text":0}
ANIMATE {"text":1}
ANIMATE {"text":2}
ANIMATE {"text":3}
ANIMATE {"text":4}
ANIMATE {"text":5}
ANIMATE {"text":6}
ANIMATE {"text":7}
ANIMATE {"text":8}
ANIMATE {"text":9}
OFF {"text":0}
OFF {"text":1}
OFF {"text":2}
OFF {"text":3}
OFF {"text":4}
OFF {"text":5}
OFF {"text":6}
OFF {"text":7}
OFF {"text":8}
OFF {"text":9}


CancelledError: ignored

# Coroutine version 2

In [1]:
! pip install jsonschema

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [4]:
# https://matrix-io.github.io/matrix-documentation/matrix-lite/py-reference/everloop/
from time import sleep
from math import pi, sin

everloop = ['black'] * 5

ledAdjust = 0.0
if len(everloop) == 35:
    ledAdjust = 0.51 # MATRIX Creator
else:
    ledAdjust = 1.01 # MATRIX Voice

frequency = 0.375
counter = 0.0
tick = len(everloop) - 1
animation  = {
    'onInit': {
        'frames': ['black']
    },
    'onExec': {
        'unlimited': True,
        'frames': []
    },
    'onDestroy': {
        'frames': ['black']
    }
}

for i in range(0, 6): # led.length
    # Create rainbow
    for i in range(len(everloop)):
        r = round(max(0, (sin(frequency*counter+(pi/180*240))*155+100)/10))
        g = round(max(0, (sin(frequency*counter+(pi/180*120))*155+100)/10))
        b = round(max(0, (sin(frequency*counter)*155+100)/10))

        counter += ledAdjust

        everloop[i] = {'r':r, 'g':g, 'b':b}

    # Slowly show rainbow
    if tick != 0:
        for i in reversed(range(tick)):
            everloop[i] = {}
        tick -= 1
    animation['onExec']['frames'].append(everloop)

animation

{'onInit': {'frames': ['black']},
 'onExec': {'unlimited': True,
  'frames': [[{'r': 24, 'g': 0, 'b': 9},
    {'r': 25, 'g': 1, 'b': 4},
    {'r': 25, 'g': 6, 'b': 0},
    {'r': 22, 'g': 12, 'b': 0},
    {'r': 18, 'g': 18, 'b': 0}],
   [{'r': 24, 'g': 0, 'b': 9},
    {'r': 25, 'g': 1, 'b': 4},
    {'r': 25, 'g': 6, 'b': 0},
    {'r': 22, 'g': 12, 'b': 0},
    {'r': 18, 'g': 18, 'b': 0}],
   [{'r': 24, 'g': 0, 'b': 9},
    {'r': 25, 'g': 1, 'b': 4},
    {'r': 25, 'g': 6, 'b': 0},
    {'r': 22, 'g': 12, 'b': 0},
    {'r': 18, 'g': 18, 'b': 0}],
   [{'r': 24, 'g': 0, 'b': 9},
    {'r': 25, 'g': 1, 'b': 4},
    {'r': 25, 'g': 6, 'b': 0},
    {'r': 22, 'g': 12, 'b': 0},
    {'r': 18, 'g': 18, 'b': 0}],
   [{'r': 24, 'g': 0, 'b': 9},
    {'r': 25, 'g': 1, 'b': 4},
    {'r': 25, 'g': 6, 'b': 0},
    {'r': 22, 'g': 12, 'b': 0},
    {'r': 18, 'g': 18, 'b': 0}],
   [{'r': 24, 'g': 0, 'b': 9},
    {'r': 25, 'g': 1, 'b': 4},
    {'r': 25, 'g': 6, 'b': 0},
    {'r': 22, 'g': 12, 'b': 0},
    {'r': 

In [5]:
import asyncio
from jsonschema import validate
import asyncio_mqtt as aiomqtt
import json

queue = asyncio.Queue()

async def listen():
    async with aiomqtt.Client("test.mosquitto.org") as client:
        async with client.messages() as messages:
            for key in ['eva/matrixvoice/leds']:
                await client.subscribe(key)
            async for message in messages:
                await queue.put((message.topic.value, json.loads(str(message.payload.decode("utf-8")))))

schema = {
  "type": "object",
  "properties": {
    "onInit": {
      "type": "object",
      "properties": {
        "frames": {
          "type": "array",
          "items": {}
        }
      },
      "required": [
        "frames"
      ]
    },
    "onExec": {
      "type": "object",
      "properties": {
        "sleep": {
          "type": "number"
        },
        "unlimited": {
          "type": "boolean"
        },
        "frames": {
          "type": "array",
          "items": {}
        }
      },
      "required": [
        "sleep",
        "unlimited",
        "frames"
      ]
    },
    "onDestroy": {
      "type": "object",
      "properties": {
        "frames": {
          "type": "array",
          "items": {}
        }
      },
      "required": [
        "frames"
      ]
    }
  },
  "required": [
    "onInit",
    "onExec",
    "onDestroy"
  ]
}

def onInit(payload):
  print("onInit", payload)
  # led.set(payload['frames'][0])

def onExec(payload, frame=0):
   unlimited = 'unlimited' in payload and payload['unlimited']
   while unlimited or frame < len(payload['frames']):
     print(payload['frames'][frame%len(payload['frames'])], end=" ")
     # led.set(payload['frames'][frame%len(payload['frames'])])
     yield True
   yield False

def onDestroy(payload):
  print("\nonDestroy", payload)
  # led.set(payload['frames'][0])

async def start_service():
     while True:
        topic, payload = await queue.get()
        try:
            validate(instance=payload, schema=schema)
            onInit(payload['onInit'])
            frame = 0
            while queue.empty() and next(onExec(payload['onExec'], frame)):
               frame += 1
               await asyncio.sleep(payload['onExec']['sleep'])
            onDestroy(payload['onDestroy'])
        except Exception as e:
            print(payload,e)

async def main():
    loop = asyncio.get_event_loop()
    task = loop.create_task(listen())
    service  = loop.create_task(start_service())
    await service
    await task


await main() # asyncio.run(main())

CancelledError: ignored

# Thread version

In [None]:
import threading
import time
import queue
from dataclasses import dataclass, field
from typing import Any

class Subject:
  def __iter__(self):
    self.queue = queue.Queue()
    return self
  def __next__(self):
    value = None if self.queue.empty() else self.queue.get()
    return value
  def publish(self, value):
    self.queue.put(value)

@dataclass(order=True)
class PrioritizedItem:    
    data: Any=field(compare=False)
    state: Any=field(compare=False)
    priority: int=field(default=3)
    def __post_init__(self):
      self.calculate_priority()
    def __repr__(self):
      return f"{self.state}"
    def calculate_priority(self):
         try:
             self.priority = self.data['priority'] if 'priority' in self.data and \
                                           (type(self.data['priority']) == int or type(self.data['priority']) == float) \
                                           else self.priority
         except:
             return self
         return self

# Pushdown automata - Priority Queue

class State:
  def __init__(self, resource):
    self.resource = resource
  def keep_in_state(self, next_state):
     print("State Queue ", self.resource.buffer.queue.queue)
     state = next(self.resource.buffer)
     return state == None or state.__class__.__name__ != next_state.__class__.__name__ 
  def __repr__(self):
     return f'State={self.__class__.__name__}'
  def exec(self, payload):
    pass

class Idle(State):
  def __init__(self, resource):
    super().__init__(resource)
  def exec(self, payload):
        print(self)

class Animation(State):
  def __init__(self, resource, next_state):
    super().__init__(resource)
    self.next_state = next_state
  def exec(self, payload):
      while self.keep_in_state(self.next_state):
           print(self)
           time.sleep(0.5)

class ResourceManagementThread(threading.Thread):
    def __init__(self):
        super().__init__(daemon=True)
        self.resource_queue = queue.PriorityQueue()
        self.lock = threading.Lock()
        self.subject = Subject()
        self.buffer = iter(self.subject)
    def front(self):
        self.lock.acquire()
        temp = None if self.resource_queue.empty() else self.resource_queue.queue[0].state
        self.lock.release()
        return temp
    def run(self):
        while True:
             print("Resource Queue ", self.resource_queue.queue)
             prioritizedItem = self.resource_queue.get()
             print(f'Working on ', prioritizedItem)
             prioritizedItem.state.exec(prioritizedItem)
             print(f'Finished ', prioritizedItem)
             self.resource_queue.task_done()
    def put(self, input):
        self.resource_queue.put(input)
        self.subject.publish(input.state)

In [None]:
import paho.mqtt.client as mqtt
import json

resource = ResourceManagementThread()
idle = Idle(resource)
animation = Animation(resource, idle)

# The callback for when the client receives a CONNACK response from the server.
def on_connect(client, userdata, flags, rc):
    print("Connected with result code "+str(rc))

    # Subscribing in on_connect() means that if we lose the connection and
    # reconnect then subscriptions will be renewed.
    client.subscribe("hermes/hotword/toggleOn")
    client.subscribe("hermes/asr/stopListening")

# The callback for when a PUBLISH message is received from the server.
def on_message(client, userdata, msg):
    print(msg.topic+" "+str(msg.payload))


def toggle_on(client, userdata, msg):
    print("Turn on "+msg.topic)
    resource.put(PrioritizedItem(json.loads(str(msg.payload.decode("utf-8"))), animation))

def stop_listening(client, userdata, msg):
    print("Turn off "+msg.topic)
    resource.put(PrioritizedItem(json.loads(str(msg.payload.decode("utf-8"))), idle))

client = mqtt.Client()
client.on_connect = on_connect
client.message_callback_add('hermes/hotword/toggleOn', toggle_on)
client.message_callback_add('hermes/asr/stopListening', stop_listening)
client.on_message = on_message

client.connect("mqtt.eclipseprojects.io", 1883, 60)


resource.start()
client.loop_forever()

Resource Queue  []
Connected with result code 0
Turn on hermes/hotword/toggleOn
Working on  State=Animation
State Queue  deque([State=Animation])
State=Animation
State Queue  deque([])
State=Animation
State Queue  deque([])
State=Animation
State Queue  deque([])
State=Animation
State Queue  deque([])
State=Animation
State Queue  deque([])
State=Animation
State Queue  deque([])
State=Animation
State Queue  deque([])
State=Animation
State Queue  deque([])
State=Animation
State Queue  deque([])
State=Animation
State Queue  deque([])
State=Animation
State Queue  deque([])
State=Animation
State Queue  deque([])
State=Animation
State Queue  deque([])
State=Animation
State Queue  deque([])
State=Animation
State Queue  deque([])
State=Animation
State Queue  deque([])
State=Animation
State Queue  deque([])
State=Animation
State Queue  deque([])
State=Animation
State Queue  deque([])
State=Animation
State Queue  deque([])
State=Animation
State Queue  deque([])
State=Animation
State Queue  deque(

KeyboardInterrupt: ignored