Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use message callback on subscriptions rather than globally #60

Open
DurandA opened this issue Sep 30, 2019 · 9 comments
Open

Use message callback on subscriptions rather than globally #60

DurandA opened this issue Sep 30, 2019 · 9 comments

Comments

@DurandA
Copy link

DurandA commented Sep 30, 2019

Right now, message handlers are attached globally on the mqtt client: mqtt_client.on_message = on_message

Would it be possible to attach a message handler to a Subscription? I am thinking of something along the lines of:

hello_sub = mqtt_client.subscribe('hello', qos=0)
hello_sub.on_message = on_message
@Lenka42
Copy link
Collaborator

Lenka42 commented Oct 1, 2019

@DurandA I like this approach, we will start working on this 👍

@edocod1
Copy link

edocod1 commented May 24, 2020

Hello, is somebody working on this? Thinking about implementing it myself.

@skewty
Copy link

skewty commented Jun 3, 2020

@edocod1 go for it! perhaps @Lenka42 would be willing to assist. I can assist if required as well.

@edocod1
Copy link

edocod1 commented Jun 3, 2020

Hello. I've extended the mqtt client.

Features:

  • Can use (multiple!) function callbacks on subscriptions
  • Can decode/encode yaml messages
  • Can define a "namespace" that will be prepended to every topic (i use it to make a unique server act as multiple ones)
from gmqtt import Client
from gmqtt.mqtt.constants import MQTTv50
import os, yaml, re, asyncio, logging

log = logging.getLogger('mqtt_client')
class MQTTClient(Client):

	def _handle_msg(self, client, topic, payload, qos, properties):

		if not 'subscription_identifier' in properties: return
		sub_id = properties['subscription_identifier'][0]

		if topic.startswith(self.namespace): topic = topic[len(self.namespace):]
		if properties['content_type'][0] == 'text/yaml':
			payload = yaml.safe_load(payload)
		else:
			payload = payload.decode()

		for fun in self.topics[sub_id][1]:
			log.debug(f'Executing {fun.__name__} because of {self.topics[sub_id][0]} ({sub_id}) match')
			if asyncio.iscoroutinefunction(fun):
				result = asyncio.create_task(fun(client, topic, payload))
			else:
				result = fun(client, topic, payload)
		
	def _handle_conn(self, *a):
		while 1:
			try:
				topic = self.pending_subscriptions.pop()
			except IndexError:
				break
			self.subscribe(*topic)

	def add_func_handler(self, topic, func):
		sub_id = max(self.topics.keys())+1 if self.topics else 1 # sub_id range is 1 to 2^28-1
		if topic in self.topics:
			self.topics[sub_id][1].append(func)
		else:
			self.topics[sub_id] = (topic, [func,])
			self.subscribe(topic, sub_id)

	def subscribe_func(self, topic):
		return lambda x: self.add_func_handler(topic, x)

	def __init__(self, client_id, namespace=None):
		self.topics = {}
		self.pending_subscriptions = []
		self.namespace = namespace+'/' if namespace and namespace[-1] != '/' else namespace

		Client.__init__(self, client_id)
		self.on_message = self._handle_msg
		self.on_connect = self._handle_conn

	def publish(self, *args, **kwargs):
		user_property = kwargs['user_property'] if 'user_property' in kwargs else tuple()
				
		payload = args[1]
		if isinstance(payload, str):
			content_type = 'text'
		else:
			payload = yaml.safe_dump(payload)
			content_type = 'text/yaml'
		return Client.publish(self, os.path.join(self.namespace, args[0]), payload, *(args[2:]), **kwargs, content_type=content_type)

	# Subscribe or queue the subscription
	def subscribe(self, topic, sub_id):
		if not self.is_connected:
			return self.pending_subscriptions.append((topic, sub_id))
		Client.subscribe(self, os.path.join(self.namespace, topic), no_local=True, subscription_identifier=sub_id)

License of this snippet is whatever is the most permissive license compatible with this library.

Also, you can also use this decorator:

@client.subscribe_func('sensors/temp/+')
def need(client, topic, payload):
	print('Temperature:', topic, payload)

@Mixser
Copy link
Contributor

Mixser commented Jun 4, 2020

Hi @edocod1,

Thanks for your snippet, but we can't do in that way - because properties is a feature of MQTT5, but we are supporting MQTT3.11 too;

Fill free to create a PR for that feature, we will review it and help you with implementation

@edocod1
Copy link

edocod1 commented Jun 4, 2020

Aww, i actually had an implementation compatible with 3.11, but I've dropped it since i prefer doing it in a cleaner way with subscription_identifier.

Maybe I'll reimplement it later.

@Mixser
Copy link
Contributor

Mixser commented Jun 5, 2020

You can create a new example (in examples folder) with your code of extended MQTTClient instance and make a note that it will work only in 5 version of protocol;

@skewty
Copy link

skewty commented Jun 5, 2020

@edocod1 would you be open to collaboration on your fork? If yes, please give me access to your fork and we can work away at the issues until the PR is accepted.

@edocod1
Copy link

edocod1 commented Jun 5, 2020

Hi @skewty! That snippet there is all i have. I just expanded the Client class to add my own convenience method! The whole repo is actually a p2p web-crawler using mqtt for metrics and communication between peers

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

5 participants