FastMQTT is a high-performance, easy-to-use MQTT v5 client library for Python, built on top of asyncio. It provides a simple and intuitive API for working with MQTT, offering features like automatic reconnection, custom encoders/decoders, and full support for MQTT v5 capabilities.
- Full support for MQTT v5
- Asynchronous API based on asyncio
- Automatic reconnection
- Flexible subscription management
- Custom payload encoders and decoders
- Request-response pattern support
- Router-based topic handling
Install FastMQTT using pip:
pip install fastmqtt
Here's a simple example demonstrating how to use FastMQTT:
import asyncio
import logging
from fastmqtt import FastMQTT, Message
logging.basicConfig(level=logging.INFO)
fastmqtt = FastMQTT("test.mosquitto.org")
@fastmqtt.on_message("my/topic/1")
async def message_handler(message: Message):
print(f"Message received: {message.payload.decode()} on topic {message.topic}")
async def main():
async with fastmqtt:
await fastmqtt.publish("my/topic/1", "Hello from FastMQTT!")
await asyncio.sleep(5)
if __name__ == "__main__":
asyncio.run(main())
FastMQTT offers multiple ways to manage subscriptions:
- Using the
@on_message
decorator, can be used only before connecting:
@fastmqtt.on_message("my/topic")
async def handler(message: Message):
...
- Using the
register
method before connecting, can be used only before connecting:
fastmqtt.register(handler, "my/topic")
- Using the
subscribe
method after connecting, can be used always:
await fastmqtt.subscribe(handler, "my/topic")
For better organization of your MQTT handlers, use the MQTTRouter
:
from fastmqtt import MQTTRouter
router = MQTTRouter()
@router.on_message("my/topic")
async def handler(message: Message):
...
fastmqtt = FastMQTT("test.mosquitto.org", routers=[router])
FastMQTT supports custom payload encoding and decoding:
from fastmqtt.encoders import JsonEncoder, JsonDecoder
fastmqtt = FastMQTT(
"test.mosquitto.org",
payload_encoder=JsonEncoder(),
payload_decoder=JsonDecoder()
)
# Now you can publish and receive JSON payloads
await fastmqtt.publish("my/topic", {"key": "value"})
FastMQTT provides a convenient way to implement request-response patterns:
async with fastmqtt.response_context("response/topic") as ctx:
response = await ctx.request("request/topic", "Hello")
print(f"Response: {response.payload.decode()}")
FastMQTT fully supports MQTT v5 features. Here are some examples:
- Using MQTT v5 properties:
from fastmqtt.properties import PublishProperties
props = PublishProperties(
content_type="application/json",
user_property=[("key", "value")]
)
await fastmqtt.publish("my/topic", payload, properties=props)
- Handling retained messages:
from fastmqtt.types import RetainHandling
@fastmqtt.on_message("my/topic", retain_handling=RetainHandling.DO_NOT_SEND)
async def handler(message: Message):
...
- Working with shared subscriptions:
@fastmqtt.on_message("$share/group/my/topic")
async def shared_handler(message: Message):
...
Contributions to FastMQTT are welcome! Please follow these steps to contribute:
- Fork the repository
- Create a new branch for your feature or bug fix
- Write your code and tests
- Run the example scripts to ensure everything passes
- Submit a pull request with a clear description of your changes
FastMQTT is released under the MIT License. See the LICENSE file for details.