A simple controller for MQTT and MQTT-SN pipelines using Mosquitto and RSMB
You can install the package through pip:
pip install gabby
Before running the examples, please start the Mosquitto or RSMB server.
from gabby.gabby import Gabby, Topic
from gabby.controller import Controller
class PrinterGabby(Gabby):
    """Gabby subclass whose ``transform`` prints the data of the message."""

    def transform(self, message):
        """Print ``message.data`` to stdout and return an empty list.

        NOTE(review): the empty list presumably means "nothing to forward
        to the output topics" -- confirm against the gabby documentation.
        """
        print(f'ARRIVED! Data: {message.data}')
        return []
if __name__ == "__main__":
    controller = Controller()

    # Two topics on the default transport; 'i' is passed through to Topic
    # unchanged (presumably the payload format -- confirm in gabby docs).
    incoming = Topic('queue/a', 'i')
    outgoing = Topic('queue/b', 'i')

    # First list is handed to the gabby as its input side, second as output.
    printer_gabby = PrinterGabby([incoming], [outgoing])

    # Register the gabby and hand control over to the controller.
    controller.add_gabby(printer_gabby)
    controller.run()
from gabby.gabby import Gabby, Topic
from gabby.message import Message
if __name__ == "__main__":
    topic_A = Topic('queue/a', 'i')
    topic_B = Topic('queue/b', 'i')

    # Topics are given in the opposite order to the receiver example:
    # 'queue/a' is this gabby's output side.
    publisher = Gabby([topic_B], [topic_A])

    # One-element tuple payload, sent to every output topic of the gabby.
    payload = (1,)
    outgoing_message = Message(payload, publisher.output_topics)
    publisher.send(outgoing_message)
from gabby.gabby import Gabby, Topic
from gabby.controller import Controller
class PrinterGabby(Gabby):
    """Gabby subclass that echoes the data of a message to stdout."""

    def transform(self, message):
        """Write ``message.data`` to stdout, then return an empty list.

        NOTE(review): returning ``[]`` presumably signals that no message
        should be forwarded -- confirm against the gabby documentation.
        """
        print(f'ARRIVED! Data: {message.data}')
        return []
if __name__ == "__main__":
    controller = Controller()

    # Same topic names as the TCP example, but each Topic is tagged 'udp'.
    incoming = Topic('queue/a', 'i', 'udp')
    outgoing = Topic('queue/b', 'i', 'udp')

    # The gabby itself is also told to use the 'udp' transmission.
    printer_gabby = PrinterGabby(
        [incoming],
        [outgoing],
        transmission='udp',
    )

    controller.add_gabby(printer_gabby)
    controller.run()
You can also use TCP and UDP brokers together in the same Gabby:
import logging
from gabby import Gabby, Topic, Controller
class LoggerGabby(Gabby):
    """Gabby subclass whose ``transform`` logs the data of the message."""

    def transform(self, message):
        """Log ``message.data`` at DEBUG level and return an empty list.

        Nothing is visible unless logging is configured at DEBUG level or
        lower, since the record is emitted through ``logging.debug``.
        """
        logging.debug(f"ARRIVED! Data: {message.data}")
        # Return an explicit empty list instead of falling off the end with
        # an implicit None, so the caller always receives a list.
        # NOTE(review): assumes gabby treats [] as "nothing to forward".
        return []
def setup_logging(level):
    """Configure the root logger via ``logging.basicConfig``.

    Args:
        level: Name of a ``logging`` level constant in any letter case,
            e.g. ``'debug'``. It is upper-cased and looked up as an
            attribute of the ``logging`` module.

    Raises:
        AttributeError: If the upper-cased name is not an attribute of
            the ``logging`` module.
    """
    # Resolve the level before touching the logging configuration.
    resolved_level = getattr(logging, level.upper())
    logging.basicConfig(
        level=resolved_level,
        format='%(levelname)s: %(message)s',
    )
if __name__ == "__main__":
    setup_logging('debug')
    controller = Controller()

    # Topics are created in the same order as they are later listed:
    # the two UDP topics first, then the four TCP topics.
    udp_topics = [Topic(name, 'i', 'udp') for name in ('qa', 'qb')]
    tcp_topics = [
        Topic(name, 'i', 'tcp')
        for name in ('queue/1', 'queue/a', 'queue/b', 'queue/c')
    ]

    # A single gabby is given topics of both transports and is told to use
    # both transmissions.
    logger_gabby = LoggerGabby(
        udp_topics + tcp_topics,
        transmission=['tcp', 'udp'],
    )

    controller.add_gabby(logger_gabby)
    controller.run()