# pylint: disable=global-statement,unused-argument
"""Exercise gmqtt's connect/disconnect handling against a local endpoint.

The script tries to connect to 127.0.0.1:2000 using MQTT v3.1.1 with the
client's ``reconnect_retries`` setting configured to 0. It waits up to ten
seconds for either the connect call to finish or the disconnect callback to
fire, logs which happened, calls ``disconnect()``, and finally idles for
twenty seconds before exiting.
"""
import asyncio
import logging

from gmqtt import Client as MQTTClient
from gmqtt.mqtt.constants import MQTTv311

# Event set by _on_disconnect(); it is created inside _main().
_DISC = None


async def _cancel_and_drain(tasks):
    """Cancel every task in *tasks* and await it, ignoring the cancellation."""
    for unfinished in tasks:
        unfinished.cancel()
        try:
            await unfinished
        except asyncio.CancelledError:
            pass


async def _main():
    """Run the connect / wait / disconnect sequence, logging each step."""
    global _DISC
    logging.basicConfig(level="DEBUG")
    _DISC = asyncio.Event()

    mqtt = MQTTClient(None)
    mqtt.on_disconnect = _on_disconnect
    mqtt.set_config({"reconnect_retries": 0})

    # Race the connection attempt against the disconnect notification.
    connecting = asyncio.create_task(
        mqtt.connect("127.0.0.1", port=2000, version=MQTTv311)
    )
    disconnected = asyncio.create_task(_DISC.wait())
    finished, unfinished = await asyncio.wait(
        [connecting, disconnected],
        timeout=10,
        return_when=asyncio.FIRST_COMPLETED,
    )
    logging.debug(
        "wait for connect() returned, done: %d, pending: %d",
        len(finished),
        len(unfinished),
    )

    connect_finished = connecting in finished
    logging.debug("Client.connect() completed: %d", connect_finished)
    if connect_finished:
        # result() re-raises whatever exception the connect attempt ended with.
        try:
            outcome = connecting.result()
        except ConnectionRefusedError:
            logging.debug("Connection was refused.")
        else:
            logging.debug("Client.connect() result: %s", outcome)

    logging.debug("_DISC.wait() completed: %d", disconnected in finished)
    logging.debug("_DISC.is_set(): %d", _DISC.is_set())

    logging.debug("Cancelling pending tasks")
    await _cancel_and_drain(unfinished)
    logging.debug("Cancelled pending tasks")

    logging.debug("Wait for disconnect()")
    await mqtt.disconnect()
    logging.debug("disconnect() returned")

    # Keep the event loop running for a while after disconnect() has returned.
    logging.debug("waiting for 20 seconds")
    await asyncio.sleep(20)
    logging.debug("waited for 20 seconds; exiting _main()")


def _on_disconnect(client, packet, exc=None):
    """Disconnect callback: log the exception argument and set ``_DISC``."""
    logging.debug("_on_disconnect(exc='%s')", exc)
    _DISC.set()


if __name__ == '__main__':
    asyncio.run(_main())