Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Publish Example, protocol Version as required field and logging #63

Closed
tropxy opened this issue Jun 7, 2021 · 8 comments
Closed

Publish Example, protocol Version as required field and logging #63

tropxy opened this issue Jun 7, 2021 · 8 comments

Comments

@tropxy
Copy link
Contributor

tropxy commented Jun 7, 2021

Hi,

I just started using your lib and after struggling a bit I managed to do what I wanted. I have a topic that I subscribed to and I published some json data and printed it back in the subscriber:

Subscriber code:

async with Client("www.maqiatto.com", username="username", password="password", protocol=ProtocolVersion.V31) as client:
    async with client.filtered_messages("dummy/contactor") as messages:
        await client.subscribe("dummy/#")
        async for message in messages:
            print(message.topic)
            print(json.loads(message.payload))

Publisher code:

async with Client("www.maqiatto.com", username="username", password="password", protocol=ProtocolVersion.V31) as client:
    message = {"state": 3}
    await client.publish("dummy/contactor", payload=json.dumps(message), qos=2, retain=False)

This works and I get the following printed out in the terminal:

dummy/contactor
{'state': 3}

So, looks good ;)

But I ran into some issues before I could make it work:

  1. There is no example or test for publish, so I had to dive into the details of your lib and also paho-mqtt and figure out what to do. Also for the Client connection just by inspecting the code one can check that Client class also accepts username, password and other properties necessary for the connection. So, I guess if an example directory with different possibilities and even some more detailed doc page would suffice to solve this.
  2. First I tried to connect without specifying the Protocol used and it actually connects to the broker without an issue. The problem later is that I send a message to the broker, the process finishes and no error whatsoever is raised and I received no message in the subscriber side. So, I was quite puzzled about what was wrong and it was not until I defined the version that started working. Thus, I think the protocol version shouldnt be an optional field
  3. This brings me to the point where I tried to import ProtocolVersion Enum class directly from the lib package, but it wouldnt work, because ProtocolVersion is not part of the all list in the init.py. So, I had to import it specifying the client script:
from asyncio_mqtt.client import ProtocolVersion

I guess it would be better to add the ProtocolVersion to the init.py

  1. Last issue was receiving the JSON payload. I tried to decode the message directly when I received it:
async for message in messages:
            print(json.loads(message))

But this wouldnt work, because message is of MQTT data type. This was not clear for me and I had to search around to understand that the message received can be stripped into two properties: "topic" and "payload" and by decoding the payload it worked. Maybe is due to my inexperience that I didnt know that, but it would be cool if somewhere this is explained or showed in an example.

  1. I dont understand the purpose of the filtered_messages, because I created another topic called "test" and I sent a message to that topic, but I ended up receiving the message in the Subscriber whose code I provided above and that has a filter for the "contactor" topic. I guess this was not suppose to happen or was it? I even tried to filter and to subscribe to the "contactor" topic, only, like this:
async with client.filtered_messages("dummy/contactor") as messages:
        await client.subscribe("dummy/contactor")
        async for message in messages:
            print(message.topic)
            print(json.loads(message.payload))

And I sent a message, using the following code:

async with Client("www.maqiatto.com", username="username", password="password", protocol=ProtocolVersion.V31) as client:
    message = {"state": 3}
    await client.publish("dummy/test", payload=json.dumps(message), qos=2, retain=False)

But, I still got the message in the subscriber:

dummy/test
{'state': 3}

So what am I doing wrong here?

Thank you for your work and let me know what you think about this.
Best regards,
André

@frederikaalund
Copy link
Collaborator

frederikaalund commented Jun 8, 2021

Hi André, thanks for raising this issue. Let me have a look. :)

I think you make some good points. 👍 asyncio-mqtt is still a young library and it definitely lacks on the documentation side of things. In other words, there is a great opportunity for an open source contribution here (hint hint). :) We're a small team and I welcome all contributions. You seem to have a lot to contribute, which is great! If you're up for the task, we'll upgrade you to "maintainer" in no time.

Anyways, let me go through your list.

  1. I agree. I'd gladly welcome a PR that adds proper documentation. Good find!
  2. Sounds like a bug. Personally, I always specify MQTTv5, so I didn't run into it myself. In any case, asyncio-mqtt should work out-of-the-box without any protocol specified.
  3. Good idea. This would make an easy first pull request. :)
  4. Actually, that's by design (see Provide information about message instead of only the payload in unfiltered_messages #2). Maybe you can think of a way that we can support both use cases at the same time.
  5. Sounds like a bug. Very strange that nobody else found this. Under the hood, I simply forward the filters to paho-mqtt that does the filtering for us. Maybe I mixed up the logic there. :/

Again thanks for raising this issue, Andŕe. You have some well-thought-out ideas/findings and I'd like to have them as part of asyncio-mqtt. I have no immediate plans to implement them myself due to personal time constraints. I welcome all pull requests and I'll gladly review and provide feedback. 👍

@tropxy
Copy link
Contributor Author

tropxy commented Jun 8, 2021

Hi @frederikaalund thanks for your fast reply! Yes, I would be happy to help around where I am able to. I think we all have the same problem: Time xD I am also a maintainer of another library (https://github.com/mobilityhouse/ocpp/tree/master/ocpp) and with work and other side projects gets tricky. Nevertheless, I think I can easily do some changes that will flatten the learning curve.

Specifically, regarding question number 5.
I wonder if this may be some related to the broker I am using?
I will try again with another broker and I will get back to you on this...

P.S.: Regardless of the young state of this lib, I think is a cool addition to the community, so thanks for your work!

@tropxy
Copy link
Contributor Author

tropxy commented Jun 18, 2021

Hi, I am writing some code to contribute to the lib and as such I am also diving into the details of it and I have a question.
Why do you declare the get here [1] and create a task to get the MQTT message using message.get(), instead of just using the await asyncio.wait(...) with the message.get() given that message is an asyncio.Queue?:

try: 
    done, _ = await asyncio.wait(
            (message.get(), self._disconnected), return_when=asyncio.FIRST_COMPLETED)
except asyncio.CancelledError:
    ....

[1] - https://github.com/sbtinstruments/asyncio-mqtt/blob/f4736adf0d3c5b87a39ea27afd025ed58c7bb54c/asyncio_mqtt/client.py#L310

@tropxy
Copy link
Contributor Author

tropxy commented Jun 18, 2021

I tested issue number 2 with another broker and it works without specifying the Protocol, so I guess it is a faulty implementation of the broker I used before...

Also I have tested issue number 5, I mentioned before, in my comments with another broker and it works, great! My question is why do we need to specify two times the topic we subscribe to? cant we somehow just specify it once ?

@frederikaalund
Copy link
Collaborator

Hi, I am writing some code to contribute to the lib and as such I am also diving into the details of it and I have a question.

Great to hear that. 👍

Why do you declare the get here [1] and create a task to get the MQTT message using message.get(), instead of just using the await asyncio.wait(...) with the message.get() given that message is an asyncio.Queue?:

try: 
    done, _ = await asyncio.wait(
            (message.get(), self._disconnected), return_when=asyncio.FIRST_COMPLETED)
except asyncio.CancelledError:
    ....

Good question. :) The very first version of asyncio-mqtt that I wrote simply used await message.get(). This approach caused the following issue: When the user disconnects, the message loops hang forever. This is unwanted. Therefore, we now wait until either:

  1. We get a message (message.get())
  2. The client disconnects (self._disconnected)

This way, the message loop ends when the client disconnects. No wanted hangs. :) Does it make sense?


I tested issue number 2 with another broker and it works without specifying the Protocol, so I guess it is a faulty implementation of the broker I used before...

Also I have tested issue number 5, I mentioned before, in my comments with another broker and it works, great!

Great to hear that it was a broker issue and not an issue with asyncio-mqtt. :) Thanks for the investigation. 👍

My question is why do we need to specify two times the topic we subscribe to? cant we somehow just specify it once ?

You can specify that you are interested in all messages like this:

async with client.unfiltered_messages() as all_messages:
        await client.subscribe("dummy/contactor")
        async for message in all_messages:
            print(message.topic)
            print(json.loads(message.payload))

I honestly don't know how useful this filtered/unfiltered messages mechanic is in practice for our users. I just copied it over from paho-mqtt (we basically get it for free).

@tropxy
Copy link
Contributor Author

tropxy commented Jun 18, 2021

Good question. :) The very first version of asyncio-mqtt that I wrote simply used await message.get(). This approach caused the following issue: When the user disconnects, the message loops hang forever. This is unwanted. Therefore, we now wait until either:
We get a message (message.get())
The client disconnects (self._disconnected)
This way, the message loop ends when the client disconnects. No wanted hangs. :) Does it make sense?

OK, yeah, I see what you doing now, makes sense. Usually, I use something like this to create a list of tasks that we await for:

async def cancel_task(task):
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass


async def wait_untill_done(awaitables: List[Awaitable[Any]],
                             finished_when=asyncio.FIRST_COMPLETED):
    tasks = []

    for awaitable in awaitables:
        if not isinstance(awaitable, asyncio.Task):
            awaitable = asyncio.create_task(awaitable)
        tasks.append(awaitable)

    done, pending = \
        await asyncio.wait(tasks, return_when=finished_when)

    for task in pending:
        await cancel_task(task)

    errors = []
    for task in done:
        try:
            task.result()
        except Exception as e:
            errors.append(e)

    if len(errors) == 1:
        raise errors[0]

    if errors:
        raise Error(errors)
  
class Error(Exception):
    def __init__(self, errors: List[Exception]):
        self.errors = errors

You can specify that you are interested in all messages like this:
async with client.unfiltered_messages() as all_messages:
await client.subscribe("dummy/contactor")
async for message in all_messages:
print(message.topic)
print(json.loads(message.payload))
I honestly don't know how useful this filtered/unfiltered messages mechanic is in practice for our users. I just copied it over from paho-mqtt (we basically get it for free).

I see...What bugs me is that we can influence the filtering in two ways: either messing with the filtered_messages() or when we subscribe. Like according to the lib example we have this:

async with Client("test.mosquitto.org") as client:
    async with client.filtered_messages("floors/+/humidity") as messages:
        await client.subscribe("floors/#")
        async for message in messages:
            print(message.payload.decode())

The fact that we subscribe to the floors/# entire directory of topics, makes no difference because we filtered for the humidity ones. If we had subscribed outside the async context manager, then it would make more sense for me, because then I could call another async context manager to deal with the unfiltered_messages or call another filtered_messages with a different topic, but given the fact the client is gone when the context manager is finished, this seems weird to me. Am I making any sense?

Maybe we could move the subscribe to the aenter ? taking into account the topic in the filtered_messages? Not sure if it is possible tho

@frederikaalund
Copy link
Collaborator

In general, I find that it's very awkward to handle task wait/cancellation in asyncio. This is why I'm tinkering a bit with an anyio-based implementation. It reads more naturally to me.


Maybe we could move the subscribe to the aenter ? taking into account the topic in the filtered_messages? Not sure if it is possible tho

Indeed, it's the common use case to filter for the very same topic that you subscribe to. There are, however, use cases where you, e.g., subscribe to a general topic ("floors/#") and then have different message loops each with their own filter ("floors/+/humidity", "floors/+/temperature", etc.). I want our API to support that use case.

All that being said, I'm all ears for a wrapper around all this that simplifies the common use case (same topic for filter and subscription). Here is an example of a simple wrapper around asyncio-mqtt:

@asynccontextmanager
async def subscribe(host: str, topic: str, *args, **kwargs):
    async with Client(host, *args, **kwargs) as client:
        async with client.unfiltered_messages() as messages:
            await client.subscribe(topic)
            yield from messages

You use it as follows:

async with subscribe("test.mosquitto.org", "floors/#") as messages:
    async for message in messages:
        print(message.payload.decode())

That's just one example. Do you have any suggestions? :)

@empicano
Copy link
Owner

Given that this issue was addressed with a pull request and we now touch on these points in the documentation, I hope that I can close this. If there's anything left unsolved or unclear, please reopen! 😊

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

3 participants