# Introducción Paho MQTT en Python

## Paho MQTT Python 

* Es una librería que implementa el cliente MQTT v3.1 y 3.1.1.
* Provee una clase cliente que activa aplica para conectar a un *broker* MQTT para publicar mensajes, y suscribir a los tópicos y recibir mensajes publicados. 
* Incluye funciones de ayuda para publicar y recibir mensajes MQTT de manera sencilla.

https://pypi.org/project/paho-mqtt/

## Crearemos un Cliente sencillo basado en Paho que suscribe y publica mensajes sobre un tópico

Importar librería

In [16]:
import paho.mqtt.client as mqtt
import time

El cliente de Python tiene 4 librería principales

```
* connect()
* subscribe()
* unsubscribe()
* publish()

```

Cada una esta asociada a una funcion de respuesta (```callback()```)
```
* connect() - on_connect()
* subscribe() - on_subscribe()
* unsubscribe() - on_unsubscribe()
* publish() - on_publish()
```

Otras mas utilizadas

```
ON_MESSAGE()
ON_LOG()
ON_SOCKET_OPEN()
ON_SOCKET_CLOSE()
```

Para otras consultar https://pypi.org/project/paho-mqtt/#callbacks


Una Instancia del cliente se crean con ```mqtt.Client```:

**Ejemplo: Creando una instancia del Cliente MQTT**

In [17]:
def prueba():
    print("hola")


In [18]:
help(mqtt.Client.on_message)

Help on property:

    If implemented, called when a message has been received on a topic
    that the client subscribes to.
    
    This callback will be called for every message received. Use
    message_callback_add() to define multiple callbacks that will be called
    for specific topic filters.



In [19]:
host = "10.12.40.58"
ncliente = "rod2"

def on_message(client, userdata, message):
    print("Mensaje Recibido " ,str(message.payload.decode("utf-8")))
    print("Mensaje Topico=",message.topic)
    print("Mensaje qos=",message.qos)
    print("Mensaje retain flag = ",message.retain)

#creando el cliente
cliente = mqtt.Client(ncliente)

In [20]:
#conectandose al cliente
cliente.connect(host)

0

In [21]:
print("Suscribiendo a un tópico","mensajex")
cliente.subscribe("mensajex")

('Suscribiendo a un t\xc3\xb3pico', 'mensajex')


(0, 1)

In [22]:
#publicar
cliente.publish("mensajex",payload="prueba",retain=True,qos=0)


<paho.mqtt.client.MQTTMessageInfo at 0x10898d680>

In [23]:
#asociar mensaje
cliente.on_message = on_message

In [25]:
host = "10.12.40.58"
host = "192.168.1.93"
ncliente = "rod2"

def on_message(client, userdata, message):
    print("Mensaje Recibido " ,str(message.payload.decode("utf-8")))
    print("Mensaje Topico=",message.topic)
    print("Mensaje qos=",message.qos)
    print("Mensaje retain flag = ",message.retain)

#creando el cliente
cliente = mqtt.Client(ncliente)

#conectandose al cliente
cliente.connect(host)

print("Suscribiendo a un tópico","mensajex")
cliente.subscribe("mensajex")

#publicar
#cliente.publish("mensajex",payload="prueba",retain=False,qos=2)

#asociar mensaje
#cliente.on_message = on_message


#iniciar loop
cliente.loop_start()


('Suscribiendo a un t\xc3\xb3pico', 'mensajex')
0x20
0x90
0x30
0xd0


In [29]:
#cliente.on_message = on_message
#cliente.subscribe("mensajex")
cliente.publish("mensajex",payload="prueba",retain=False,qos=2)

<paho.mqtt.client.MQTTMessageInfo at 0x10899b158>

0x50
0x70
0x30
0xd0
0xd0


In [None]:
time.sleep(10)
#detener cliente
cliente.loop_stop()

#desconectar
cliente.disconnect()


In [32]:
#desconectar
cliente.disconnect()

4

In [9]:
#help(mqtt.Client)

**Conectandose con un broker**

In [10]:
#help(cliente.connect)

### Publicando Mensajes

help(cliente.publish)

publish(topic, payload=None, qos=0, retain=False)

In [6]:
help(cliente.publish)

Help on method publish in module paho.mqtt.client:

publish(self, topic, payload=None, qos=0, retain=False) method of paho.mqtt.client.Client instance
    Publish a message on a topic.
    
    This causes a message to be sent to the broker and subsequently from
    the broker to any clients subscribing to matching topics.
    
    topic: The topic that the message should be published on.
    payload: The actual message to send. If not given, or set to None a
    zero length message will be used. Passing an int or float will result
    in the payload being converted to a string representing that number. If
    you wish to send a true int/float, use struct.pack() to create the
    payload you require.
    qos: The quality of service level to use.
    retain: If set to true, the message will be set as the "last known
    good"/retained message for the topic.
    
    Returns a MQTTMessageInfo class, which can be used to determine whether
    the message has been delivered (using info.is_

In [7]:
cliente.publish("mensajex",payload="prueba",retain=True)

<paho.mqtt.client.MQTTMessageInfo at 0x10c0e48e8>

### Suscribiendo a tópicos

In [12]:
print("Suscribiendo a un tópico","mensajex")
cliente.subscribe("mensajex",qos=0)

('Suscribiendo a un t\xc3\xb3pico', 'mensajex')


(0, 2)

In [13]:
help(cliente.subscribe)

Help on method subscribe in module paho.mqtt.client:

subscribe(self, topic, qos=0) method of paho.mqtt.client.Client instance
    Subscribe the client to one or more topics.
    
    This function may be called in three different ways:
    
    Simple string and integer
    -------------------------
    e.g. subscribe("my/topic", 2)
    
    topic: A string specifying the subscription topic to subscribe to.
    qos: The desired quality of service level for the subscription.
         Defaults to 0.
    
    String and integer tuple
    ------------------------
    e.g. subscribe(("my/topic", 1))
    
    topic: A tuple of (topic, qos). Both topic and qos must be present in
           the tuple.
    qos: Not used.
    
    List of string and integer tuples
    ------------------------
    e.g. subscribe([("my/topic", 0), ("another/topic", 2)])
    
    This allows multiple topic subscriptions in a single SUBSCRIPTION
    command, which is more efficient than using multiple calls to
    

### Procesando los mensajes recibidos

Para ver los mensajes recibidos es necesario definir una función ```callback```. 

Para procesar los ```callbacks``` se requiere:
1. Implementar una función ```callback```.
2. Ejecutar un ```loop``` para revisar mensajes de respuesta

[Referencia sobre ```callbacks``` Client Docs](https://pypi.org/project/paho-mqtt/#callbacks)


Consultar (```help(mqtt.Client)```), para conocer el prototipo de la función.

Por ejemplo, para connect tenemos el siguiente prototipo descrito en la documentación

```
def on_connect(client, userdata, flags, rc):
 |      print("Connection returned " + str(rc))
 |  
 |  client.on_connect = on_connect
 |  
 |  All of the callbacks as described below have a "client" and an "userdata"
 |  argument. "client" is the Client instance that is calling the callback.
 |  "userdata" is user data of any type and can be set when creating a new client
 |  instance or with user_data_set(userdata).
 |  
 |  The callbacks:
 |  
 |  on_connect(client, userdata, flags, rc): called when the broker responds to our connection
 |    request.
 |    flags is a dict that contains response flags from the broker:
 |      flags['session present'] - this flag is useful for clients that are
 |          using clean session set to 0 only. If a client with clean
 |          session=0, that reconnects to a broker that it has previously
 |          connected to, this flag indicates whether the broker still has the
 |          session information for the client. If 1, the session still exists.
 |    The value of rc determines success or not:
 |      0: Connection successful
 |      1: Connection refused - incorrect protocol version
 |      2: Connection refused - invalid client identifier
 |      3: Connection refused - server unavailable
 |      4: Connection refused - bad username or password
 |      5: Connection refused - not authorised
 |      6-255: Currently unused.
```

In [14]:
def on_message(client, userdata, message):
    print("Mensaje Recibido " ,str(message.payload.decode("utf-8")))
    print("Mensaje Topico=",message.topic)
    print("Mensaje qos=",message.qos)
    print("Mensaje retain flag = ",message.retain)

In [15]:
help(cliente.on_message)

Help on NoneType object:

class NoneType(object)
 |  Methods defined here:
 |  
 |  __hash__(...)
 |      x.__hash__() <==> hash(x)
 |  
 |  __repr__(...)
 |      x.__repr__() <==> repr(x)



In [16]:
cliente.on_message = on_message

In [17]:
cliente.loop_start()

0x20
0x90
0x30
('Mensaje Recibido ', 'prueba')
('Mensaje Topico=', u'mensajex')
('Mensaje qos=', 0)
('Mensaje retain flag = ', 1)


In [20]:
#cliente.subscribe("mensajex")
print("Publishing message to topic","mensajex")
cliente.publish("mensajex","desde")

('Publishing message to topic', 'mensajex')


<paho.mqtt.client.MQTTMessageInfo at 0x103698a48>

In [19]:

#time.sleep(4) # wait
cliente.loop_stop() #stop the loop

### Referencia
    
* https://pypi.org/project/paho-mqtt/#callbacks

In [21]:
import paho.mqtt.client as mqtt
import time

def on_message(client, userdata, message):
    print("Mensaje Recibido: " ,str(message.payload.decode("utf-8")))
    print("Mensaje topic = ",message.topic)
    print("Mensaje qos = ",message.qos)
    print("Mensaje retain flag =",message.retain)
    
def on_log(client, userdata, level, buf):
    print("log: ",buf)
    
def on_disconnect(client, userdata,rc):
    logging.debug("DisConnected result code "+str(rc))
    cliente.loop_stop()

host = "192.168.1.99"
ncliente = "rod3"
cliente = mqtt.Client(ncliente,clean_session=False)
cliente.on_message = on_message
cliente.on_log = on_log
#cliente.on_disconnect = on_disconnect

print("connecting to broker")
cliente.connect(host) 







connecting to broker
('log: ', 'Sending CONNECT (u0, p0, wr0, wq0, wf0, c0, k60) client_id=rod3')


0

In [22]:
cliente.loop_start() 
#cliente.disconnect()

0x20
('log: ', 'Received CONNACK (0, 0)')


In [54]:
print("Suscribir a tópico","mensajex")
cliente.subscribe("mensajenb2")

('Suscribir a t\xc3\xb3pico', 'mensajex')
('log: ', "Sending SUBSCRIBE (d0) [('mensajenb2', 0)]")


(0, 1)

In [24]:
#print("Publishing message to topic","mensajex")
cliente.publish("mensajenb2","hola")

('log: ', "Sending PUBLISH (d0, q0, r0, m2), 'mensajenb2', ... (4 bytes)")


<paho.mqtt.client.MQTTMessageInfo at 0x103698e68>

('log: ', 'Sending PINGREQ')
0xd0
('log: ', 'Received PINGRESP')


In [52]:
cliente.publish("mensajenb2","hola")

<paho.mqtt.client.MQTTMessageInfo at 0x10ba76f70>

('log: ', 'Sending CONNECT (u0, p0, wr0, wq0, wf0, c0, k60) client_id=rod3')
0x20
('log: ', 'Received CONNACK (1, 0)')
('log: ', 'Sending CONNECT (u0, p0, wr0, wq0, wf0, c0, k60) client_id=rod3')
0x20
('log: ', 'Received CONNACK (1, 0)')
('log: ', 'Sending CONNECT (u0, p0, wr0, wq0, wf0, c0, k60) client_id=rod3')
0x20
('log: ', 'Received CONNACK (1, 0)')
('log: ', 'Sending CONNECT (u0, p0, wr0, wq0, wf0, c0, k60) client_id=rod3')
0x20
('log: ', 'Received CONNACK (1, 0)')
('log: ', 'Sending CONNECT (u0, p0, wr0, wq0, wf0, c0, k60) client_id=rod3')
0x20
('log: ', 'Received CONNACK (1, 0)')
('log: ', 'Sending CONNECT (u0, p0, wr0, wq0, wf0, c0, k60) client_id=rod3')
0x20
('log: ', 'Received CONNACK (1, 0)')
('log: ', 'Sending CONNECT (u0, p0, wr0, wq0, wf0, c0, k60) client_id=rod3')
0x20
('log: ', 'Received CONNACK (1, 0)')
('log: ', 'Sending CONNECT (u0, p0, wr0, wq0, wf0, c0, k60) client_id=rod3')
0x20
('log: ', 'Received CONNACK (1, 0)')
('log: ', 'Sending CONNECT (u0, p0, wr0, wq0, wf

In [58]:
#time.sleep(30) # wait
cliente.loop_stop() #stop the loop

3

In [59]:
cliente.disconnect()

4

# Extras:  Particularidades de la Conexión  al broker

El método ```connect```, recibe 4 parámetros:

El Unico parámetro obligatorio es el ```host```.

Cuando se establece conexión con el broker, este responde con un ```CONNACK```, el cual activa la función callback **on_connect** (la descripción de los mensajes que activan todos los ```callbacks``` están en  https://pypi.org/project/paho-mqtt/#callbacks)


![image.png](attachment:image.png) 

Los callbacks son prototipos de funciones (sin definir), que son activadas por los mensajes recibidos del broker.

Por ejemplo, el evento de recibir la confirmación de la conexion (CONNACK),  activa la función ```on_connect.``` el cual recibe  recibe 4 parámetros: ```(client, userdata, flags, rc)``` 

```on_connect(client, userdata, flags, rc)```

Una implementación sencilla de la funcion puede ser:

```
def on_connect(client, userdata, flags, rc):
    print("Connection returned result: "+connack_string(rc))

mqttc.on_connect = on_connect

```

o tambien puede ser mas elaborada.

```

def on_connect(client, userdata, flags, rc):
    if rc==0:
        print("connected OK Returned code=",rc)
    else:
        print("Bad connection Returned code=",rc)
```

Por ejemplo, podemos imprimir que pasó con la conexión si conocemos los return codes (rc):

0: Connection successful  
1: Connection refused – incorrect protocol version  
2: Connection refused – invalid client identifier  
3: Connection refused – server unavailable  
4: Connection refused – bad username or password  
5: Connection refused – not authorised  
6-255: Currently unused.  
    


## Procesando ```on_connect callback```

Otra aplicación es el procesamiento continuo de los mensajes que el cliente recibe del broker.

Para la implementación de ```on_connect```

1. Se crea el cliente
2. Se define la función ```callback``` respetando los parámetros de entrada
3. **Se enlaza una implementación función callback** <-- Necesario antes de establecer la conexión
4. Se conecta al broker
5. Se ejecuta el "loop"

La ejecución de la funcion ```callback``` se ejecuta de manera independiene en un hilo generado por ```loop_start()```, por lo tanto es asíncrona.


Ejemplo:


```
import paho.mqtt.client as mqtt
def on_connect(clientE, userdata, flags, rc):
   if rc==0
      print("CONEXION EXITOSA")
client = mqtt.Client(“python1”)   # crear una nueva instancia
client.on_connect=on_connect      # enlazar callback function
client.connect(broker_address)    # conectar con el broker
client.loop_start()               # Iniciar un thread que ejecuta el loop
time.sleep(4)                     # Dormir la ejecución principal 4 segundos

client.loop_stop()                 #Deterner loop 
```

**Fallas en la conexión**

1. Configuración incorrecta del cliente
2. No hay coenxión 
3. Parametros incorrectos para conectarse a la red como puerto incorrecto

**EXCEPCIONES**

```
try:
    cliente.connect(broker,port) # Conectar al broker
except:
    print(“Fallo conexión”)
    exit(1)                      # Detener la ejecución, modificar bandera, intentar reconexión
```

**Manejo de fallas en la conexión**

El codigo de retorno (0) en ```on_connect```, nos ayuda a determinar si la conexión fué exitosa. 

En caso de detactar una falla en la conexión (e.g, password incorrecto lo que se debe hacer es)

1. Detener el loop
2. Detener el script


Para esto las funciones ```callback``` se definen:

In [85]:
def on_connect(client, userdata, flags, rc):
    if rc==0:
        client.connected_flag=True #set flag
        print("connected OK")
    else:
        print("Bad connection Returned code=",rc)
        client.bad_connection_flag=True

```

while not client.connected_flag and not client.bad_connection_flag: #wait in loop
    print("In wait loop")
    time.sleep(1)
    
#detenemos el loop solo si hay fallo en la conexi con este if que funciona como filtro.
if client.bad_connection_flag:
    client.loop_stop()    #Stop loop
    sys.exit()
```    
# de otra manera

**Desconexión**
```
def on_disconnect(client, userdata, rc):
    logging.info("disconnecting reason  "  +str(rc))
    client.connected_flag=False
    client.disconnect_flag=True
    
```

**REFERENCIA** http://www.steves-internet-guide.com/client-connections-python-mqtt/

**Loop**

* Los mensajes llegan al cliente , llegan antes a un buffer llamado *buffer receptor*
* Los mensajes en el buffer receptor esperan ser leídos
* Del lado receptor ve los mensajes y dependiendo del tipo de mensaje acciona una funcion callback.
* una vez que son recibidos los mensajes, estos son procesados


![image.png](attachment:image.png)




El Cliente Paho Python provee 3 métodos para iniciar la ejecución de un loop.

```
1. loop_start(): Comienza un loop en un hilo y es ejecutado en intervalos de tiempo. Puede ser detenido con un un loop_stop() e intenta reconectarser al broker automáticamente.
2. loop_forever(): BLoquea el programa y se detetiene desde una interrupción del sistema operativo
3. loop(), es una sola llamada a la función y se ejecuta de manera manual. 
```

## QOS parte 2
Intervenir client.py y simular:

Que el broker no responde con connack  
Que PUBREC no llega al cliente  
Que PUBCOMP no llega al cliente  
Que PUBACK no llega al cliente.  
  
De acuerdo al diagrama de de bloques dado en clase, que sucede?  



### Práctica 2.1



1. Probar el funcionamiento de la bandera Clean session.

2. Ejecutar unsuscribe() y probar que funciona.

3. Implemntar y probar callback on_publish(client, userdata, mid) e investigar e implementar el callback ```on_message``` mas otro de libre elección y explicar su funcionamiento.

4. Detener el loop automáticamente si la conexión falla. usando ```def on_disconnect(client, userdata,rc=0)```

5. Implementar un mecanismo para detener el loop cuando hay desconexión con las banderas ```client.connected_flag``` - ```client.bad_connection_flag``

6. Simular un sensor con un generador de señales virtual. Esta señal es transmitida por el protocolo MQTT utilizando Paho. 
  1. El cliente que recibe debe ser capaz de almacenar el dato en una base de datos (.csv, .txt, mysql, mongodb, panda database)


client.connected_flag = TRUE cuando rc = 0
client.bad_connection_flag = 1 cuando rc>0




Ejecutar y detener por medio de mensajes un cliente de MQTT*

In [None]:
## Notas: 
    
Returning Values from Callbacks
Due to the way they operate It is not possible to return values from a callback using the standard return command.

### Referencia
    
* https://pypi.org/project/paho-mqtt/#callbacks