Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 45 additions & 2 deletions umqtt.robust/umqtt/robust.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,37 @@ class MQTTClient(simple.MQTTClient):
DEBUG = False

def delay(self, i):
"""
Sleeps the thread for 2 seconds.

:param i: Does not seem to be used.
:return: None
"""
utime.sleep(self.DELAY)

def log(self, in_reconnect, e):
def log(self, in_reconnect: bool, e: Exception):
"""
Simple logging function.

:param in_reconnect: Boolean flag for if function is called in reconnect.
:type in_reconnect: bool
:param e: Exception that occured.
:type e: Exception
:return:None
"""
if self.DEBUG:
if in_reconnect:
print("mqtt reconnect: %r" % e)
else:
print("mqtt: %r" % e)

def reconnect(self):
"""
Reconnects to MQTT broker.

:return: Call to super method connect. Connection response.
:rtype: int
"""
i = 0
while 1:
try:
Expand All @@ -26,7 +47,22 @@ def reconnect(self):
i += 1
self.delay(i)

def publish(self, topic, msg, retain=False, qos=0):
def publish(self, topic: str, msg: str, retain=False, qos=0):
"""
Calls super publish method. If this method fails,
the error is logged and then the client attempts to reconnect
to the MQTT broker.

:param topic: Topic you wish to publish to. Takes the form "path/to/topic"
:type topic: str
:param msg: Message to publish to topic.
:type msg: str
:param retain: Have the MQTT broker retain the message.
:type retain: bool
:param qos: Sets quality of service level. Accepts values 0 to 2. PLEASE NOTE qos=2 is not actually supported.
:type qos: int
:return: None
"""
while 1:
try:
return super().publish(topic, msg, retain, qos)
Expand All @@ -35,6 +71,13 @@ def publish(self, topic, msg, retain=False, qos=0):
self.reconnect()

def wait_msg(self):
"""
Calls super wait_msg. In the event of an exception, the
exception is logged and then the client attempts to
reconnect to the MQTT broker.

:return:None
"""
while 1:
try:
return super().wait_msg()
Expand Down
133 changes: 117 additions & 16 deletions umqtt.simple/umqtt/simple.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,34 @@
import ustruct as struct
from ubinascii import hexlify


class MQTTException(Exception):
pass


class MQTTClient:

def __init__(self, client_id, server, port=0, user=None, password=None, keepalive=0,
def __init__(self, client_id: str, server: str, port=0, user=None, password=None, keepalive=0,
ssl=False, ssl_params={}):
"""
Default constructor, initializes MQTTClient object.

:param client_id: Unique MQTT ID attached to client.
:type client_id: str
:param server: MQTT host address.
:type server str
:param port: MQTT Port, typically 1883. If unset, the port number will default to 1883 of 8883 base on ssl.
:type port: int
:param user: Username if your server requires it.
:type user: str
:param password: Password if your server requires it.
:type password: str
:param keepalive:
:param ssl: Require SSL for the connection.
:type ssl: bool
:param ssl_params: Required SSL parameters.
:type ssl_params: dict
"""
if port == 0:
port = 8883 if ssl else 1883
self.client_id = client_id
Expand All @@ -28,10 +49,20 @@ def __init__(self, client_id, server, port=0, user=None, password=None, keepaliv
self.lw_retain = False

def _send_str(self, s):
"""
Private class method.
:param s:
:return: None
"""
self.sock.write(struct.pack("!H", len(s)))
self.sock.write(s)

def _recv_len(self):
"""
Private class method.
:return:
:rtype int
"""
n = 0
sh = 0
while 1:
Expand All @@ -42,9 +73,30 @@ def _recv_len(self):
sh += 7

def set_callback(self, f):
"""
Sets the callback function.

:param f: A function to execute when a message is received from the MQTT server.
:return: None
"""
self.cb = f

def set_last_will(self, topic, msg, retain=False, qos=0):
def set_last_will(self, topic: str, msg: str, retain=False, qos=0):
"""
Sets the last will and testament of the client. This is used to perform an action by the broker
in the event that the client "dies".
Learn more at https://www.hivemq.com/blog/mqtt-essentials-part-9-last-will-and-testament/

:param topic: Topic of LWT. Takes the from "path/to/topic"
:type topic: str
:param msg: Message to be published to LWT topic.
:type msg: str
:param retain: Have the MQTT broker retain the message.
:type retain: bool
:param qos: Sets quality of service level. Accepts values 0 to 2. PLEASE NOTE qos=2 is not actually supported.
:type qos: int
:return: None
"""
assert 0 <= qos <= 2
assert topic
self.lw_topic = topic
Expand All @@ -53,6 +105,14 @@ def set_last_will(self, topic, msg, retain=False, qos=0):
self.lw_retain = retain

def connect(self, clean_session=True):
"""
Establishes connection with the MQTT server.

:param clean_session: Starts new session on true, resumes past session if false.
:type clean_session: bool
:return: Connection response.
:rtype: int
"""
self.sock = socket.socket()
addr = socket.getaddrinfo(self.server, self.port)[0][-1]
self.sock.connect(addr)
Expand Down Expand Up @@ -85,7 +145,7 @@ def connect(self, clean_session=True):

self.sock.write(premsg, i + 2)
self.sock.write(msg)
#print(hex(len(msg)), hexlify(msg, ":"))
# print(hex(len(msg)), hexlify(msg, ":"))
self._send_str(self.client_id)
if self.lw_topic:
self._send_str(self.lw_topic)
Expand All @@ -100,13 +160,36 @@ def connect(self, clean_session=True):
return resp[2] & 1

def disconnect(self):
"""
Disconnects from the MQTT server.

:return: None
"""
self.sock.write(b"\xe0\0")
self.sock.close()

def ping(self):
"""
Pings the MQTT server.

:return: None
"""
self.sock.write(b"\xc0\0")

def publish(self, topic, msg, retain=False, qos=0):
def publish(self, topic: str, msg: str, retain=False, qos=0):
"""
Publishes a message to a specified topic.

:param topic: Topic you wish to publish to. Takes the form "path/to/topic"
:type topic: str
:param msg: Message to publish to topic.
:type msg: str
:param retain: Have the MQTT broker retain the message.
:type retain: bool
:param qos: Sets quality of service level. Accepts values 0 to 2. PLEASE NOTE qos=2 is not actually supported.
:type qos: int
:return: None
"""
pkt = bytearray(b"\x30\0\0\0")
pkt[0] |= qos << 1 | retain
sz = 2 + len(topic) + len(msg)
Expand All @@ -119,7 +202,7 @@ def publish(self, topic, msg, retain=False, qos=0):
sz >>= 7
i += 1
pkt[i] = sz
#print(hex(len(pkt)), hexlify(pkt, ":"))
# print(hex(len(pkt)), hexlify(pkt, ":"))
self.sock.write(pkt, i + 1)
self._send_str(topic)
if qos > 0:
Expand All @@ -141,30 +224,44 @@ def publish(self, topic, msg, retain=False, qos=0):
elif qos == 2:
assert 0

def subscribe(self, topic, qos=0):
def subscribe(self, topic: str, qos=0):
"""
Subscribes to a given topic.

:param topic: Topic you wish to publish to. Takes the form "path/to/topic"
:type topic: str
:param qos: Sets quality of service level. Accepts values 0 to 2. PLEASE NOTE qos=2 is not actually supported.
:type qos: int
:return: None
"""
assert self.cb is not None, "Subscribe callback is not set"
pkt = bytearray(b"\x82\0\0\0")
self.pid += 1
struct.pack_into("!BH", pkt, 1, 2 + 2 + len(topic) + 1, self.pid)
#print(hex(len(pkt)), hexlify(pkt, ":"))
# print(hex(len(pkt)), hexlify(pkt, ":"))
self.sock.write(pkt)
self._send_str(topic)
self.sock.write(qos.to_bytes(1, "little"))
while 1:
op = self.wait_msg()
if op == 0x90:
resp = self.sock.read(4)
#print(resp)
# print(resp)
assert resp[1] == pkt[2] and resp[2] == pkt[3]
if resp[3] == 0x80:
raise MQTTException(resp[3])
return

# Wait for a single incoming MQTT message and process it.
# Subscribed messages are delivered to a callback previously
# set by .set_callback() method. Other (internal) MQTT
# messages processed internally.
#
def wait_msg(self):
"""
Wait for a single incoming MQTT message and process it.
Subscribed messages are delivered to a callback previously
set by .set_callback() method. Other (internal) MQTT
messages processed internally.

:return: None
"""
res = self.sock.read(1)
self.sock.setblocking(True)
if res is None:
Expand Down Expand Up @@ -196,9 +293,13 @@ def wait_msg(self):
elif op & 6 == 4:
assert 0

# Checks whether a pending message from server is available.
# If not, returns immediately with None. Otherwise, does
# the same processing as wait_msg.
def check_msg(self):
"""
Checks whether a pending message from server is available.
If not, returns immediately with None. Otherwise, does
the same processing as wait_msg.

:return: None
"""
self.sock.setblocking(False)
return self.wait_msg()
return self.wait_msg()