Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Mqtt client connection issues #111

Open
pikowai opened this issue Jan 2, 2022 · 0 comments
Open

Mqtt client connection issues #111

pikowai opened this issue Jan 2, 2022 · 0 comments

Comments

@pikowai
Copy link

pikowai commented Jan 2, 2022

I am using flask with uwsgi and nginx to accept webhooks from a private instance of The Things Stack and then trying to post mqtt commands to control the IO devices in the field. I can connect using paho to create a client and post data. When I try to use flask_mqtt the client connects and then gives an error message:

16 Error: Received CONNACK (0, 0)
16 Error: Sending PINGREQ
16 Error: Received PINGRESP
16 Error: Sending PINGREQ
etc

On the server side, the logs show:

stack_1 | INFO Connected application_uid=pump-control namespace=applicationserver/io/mqtt remote_addr=10.10.10.204:53803
stack_1 | WARN Failed to setup connection error=read tcp 172.18.0.4:8883->10.10.10.204:58855: i/o timeout namespace=applicationserver/io/mqtt remote_addr=10.10.10.204:58855
stack_1 | WARN Error receiving packet client_id=10.10.10.204:53803-62209089971114 error=read tcp 172.18.0.4:8883->10.10.10.204:53803: i/o timeout namespace=applicationserver/io/mqtt server_name=barn.blammo.co.nz username=pump-control
stack_1 | WARN Error when reading packet application_uid=pump-control error=read tcp 172.18.0.4:8883->10.10.10.204:53803: i/o timeout namespace=applicationserver/io/mqtt remote_addr=10.10.10.204:53803

It works fine if I use the paho MQTT client directly; that version runs happily as a service.

Here is the working source (paho only), with some connection details removed:

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

from flask import Flask, request
import paho.mqtt.client as mqttClient
import json
import re
import time
import base64
import ssl

# Flask application object; the route decorators further down register on it.
app = Flask(__name__)


# --- MQTT broker connection settings (placeholder values in this listing) ---
BROKER_ENDPOINT = "test"
TLS_PORT = 8883  # Port handed to connect(), which configures TLS before connecting
MQTT_USERNAME = "test"
MQTT_PASSWORD = ""  # Left blank in this listing
# send_command() joins TOPIC + DEVICE_LABEL + QUEUE_REPLACE into the publish
# topic. QUEUE_PUSH is not referenced by the code visible here.
TOPIC = "v3/x/devices/"
DEVICE_LABEL = "dam-pump/"
QUEUE_REPLACE = "down/replace"
QUEUE_PUSH = "down/push"
TLS_CERT_PATH = "isrgrootx1.pem"  # CA certificate file passed to tls_set() in connect()
# Raw command frames; send_command() base64-encodes one into "frm_payload".
# NOTE(review): only the final byte differs between the four frames —
# presumably it selects the output state; confirm against the device docs.
OFF_CMD = bytes([8,2,0,0,1,0,0])
ON_CMD = bytes([8,2,0,0,1,0,3])
ON1_CMD = bytes([8,2,0,0,1,0,1])
ON2_CMD = bytes([8,2,0,0,1,0,2])

# Module-level state shared between webhook requests.
PUMP_STATE = 0  # processIO() sets this to 1 on a DI1 == 1 frame and back to 0 in its final branch
LOW_BATT = 0  # Flag name used by processP2()
NIGHT = 0  # Flag name used by processP1()
connected = 0  # on_connect() sets this to True when the broker reports rc == 0



def connect(mqtt_client, mqtt_username, mqtt_password, broker_endpoint, port):
    """Connect *mqtt_client* to the broker over TLS and wait for the CONNACK.

    Args:
        mqtt_client: paho client instance to configure and connect.
        mqtt_username: Username sent to the broker.
        mqtt_password: Password sent to the broker.
        broker_endpoint: Broker host name.
        port: Broker TCP port.

    Returns:
        True once ``on_connect`` has signalled a successful connection,
        False if that did not happen within roughly five seconds.
    """
    global connected

    if not mqtt_client.is_connected():
        # The flag is module-level and shared by every client ever created.
        # Without this reset a previous successful connection leaves it True,
        # the wait loop below is skipped, and the caller publishes before
        # this client has finished its handshake.
        connected = False

        mqtt_client.username_pw_set(mqtt_username, password=mqtt_password)
        mqtt_client.on_connect = on_connect
        mqtt_client.on_publish = on_publish
        mqtt_client.tls_set(ca_certs=TLS_CERT_PATH, certfile=None,
                            keyfile=None, cert_reqs=ssl.CERT_REQUIRED,
                            tls_version=ssl.PROTOCOL_TLSv1_2, ciphers=None)
        mqtt_client.tls_insecure_set(False)
        mqtt_client.connect(broker_endpoint, port=port)
        mqtt_client.loop_start()

        attempts = 0

        while not connected and attempts < 5:  # Wait for on_connect to fire
            print(connected)
            print("Attempting to connect...")
            time.sleep(1)
            attempts += 1

        if not connected:
            # Do not leave the network thread started above running for a
            # client the caller is about to abandon.
            mqtt_client.loop_stop()

    if not connected:
        print("[ERROR] Could not connect to broker")
        return False

    return True


def publish(mqtt_client, topic, payload):
    """Publish *payload* on *topic*, printing (rather than raising) any error.

    Always returns None; the client's own return value is discarded.
    """
    try:
        mqtt_client.publish(topic, payload)
    except Exception as err:
        report = "[ERROR] Could not publish data, error: {}".format(err)
        print(report)


def send_command(mqtt,cmd):
    """Publish *cmd* as a single downlink on the device's replace queue.

    *cmd* is a bytes command frame; it is base64-encoded into the
    ``frm_payload`` field of a one-element ``downlinks`` JSON document and
    published through ``mqtt.publish``. The JSON, the topic and the publish
    result are printed. Returns None.
    """
    encoded = base64.b64encode(cmd).decode("utf8")
    downlink = {"f_port": 1, "frm_payload": encoded, "priority": "NORMAL"}
    payload = json.dumps({"downlinks": [downlink]})
    print(payload)

    topic = "{}{}{}".format(TOPIC, DEVICE_LABEL, QUEUE_REPLACE)
    print (topic)

    ret = mqtt.publish (topic, payload)
    print('return from mqtt is ',ret)

    
def processP1(payload):
    """Handle a 'DataFrameP1' uplink: record the night flag from PV voltage.

    Reads ``pvArray1V`` from the decoded payload and sets the module-level
    ``NIGHT`` flag to 1 when it is below 24.5, otherwise to 0.

    Returns:
        Always ``''`` (this frame type never triggers a command).
    """
    # Without this declaration the assignments below would create a local
    # variable and the module-level flag would never change.
    global NIGHT

    array_voltage = payload['uplink_message']['decoded_payload']['pvArray1V']
    print('p1 array voltage ', array_voltage)

    NIGHT = 1 if array_voltage < 24.5 else 0
    return ''

def processP2(payload):
    """Handle a 'DataFrameP2' uplink: react to the battery voltage.

    Reads ``batt1V`` from the decoded payload and updates the module-level
    ``LOW_BATT`` flag (1 below 24, otherwise 0).

    Returns:
        ``OFF_CMD`` when the battery is low, otherwise ``''``.
    """
    # Declared global so the module-level flag is actually updated; a plain
    # assignment would only bind a throwaway local.
    global LOW_BATT

    batt_voltage = payload['uplink_message']['decoded_payload']['batt1V']
    print('p2 payload', batt_voltage)
    print('bat voltage is ', batt_voltage)

    if batt_voltage < 24:
        LOW_BATT = 1
        return OFF_CMD

    LOW_BATT = 0
    return ''

def processIO(payload):
    """Handle an 'IOFrame' uplink and decide which command, if any, to send.

    Decisions are based on the ``DI1``, ``DI2`` and ``DO1`` fields of the
    decoded payload and on the module-level ``PUMP_STATE`` /
    ``PUMP_START_TIME`` values, which this function also updates.

    Returns:
        ``ON1_CMD``, ``OFF_CMD`` or ``''`` (no command).
    """
    global PUMP_STATE, PUMP_START_TIME

    io_state = payload["uplink_message"]["decoded_payload"]
    print('IO payload', io_state)

    # Both inputs high.
    if io_state['DI2'] == 1 and io_state['DI1'] == 1:
        return ON1_CMD

    if io_state['DI1'] == 1:
        if PUMP_STATE == 0:
            # First frame with DI1 high: remember when the run started.
            PUMP_STATE = 1
            PUMP_START_TIME = time.time()
            print('pump started at ', PUMP_START_TIME)
            return ''

        if PUMP_STATE != 1:
            cmd = OFF_CMD
            print('pump is over time')
            return cmd

        print("pump running time is ", time.time() - PUMP_START_TIME)
        if time.time() - PUMP_START_TIME > 1500:
            return ON1_CMD
        if io_state['DI2'] == 1:
            return ON1_CMD
        return ''

    if io_state['DO1'] == 1:
        return OFF_CMD

    # DI1 low and DO1 not set: reset the run-state tracking.
    PUMP_STATE = 0
    print('pump stopped')
    return ''

def ackManage(payload):
    """Handle a 'modbus_write_ack' uplink: log it and request no command.

    The payload is not inspected. Always returns ``''``.
    """
    print('ack recieved')
    return ''

def payload_switch(payload):
    """Dispatch a decoded uplink to the handler for its ``msgType``.

    Args:
        payload: Parsed webhook body; ``msgType`` is read from
            ``payload["uplink_message"]["decoded_payload"]``.

    Returns:
        Whatever the selected handler returns (a command or ``''``).
        An unrecognised ``msgType`` yields ``''`` (no command).
    """
    handlers = {
        'DataFrameP1': processP1,
        'DataFrameP2': processP2,
        'IOFrame': processIO,
        'modbus_write_ack': ackManage,
    }
    msg_type = payload["uplink_message"]["decoded_payload"]['msgType']
    print('here', msg_type)

    handler = handlers.get(msg_type)
    if handler is None:
        # Previously the fallback was the string '' which was then called,
        # raising TypeError for any message type not listed above.
        print('unhandled msgType', msg_type)
        return ''

    return handler(payload)

def parse_request(req):
    """Decode the JSON body of *req* and return the command it calls for.

    The raw body is parsed with ``json.loads`` and passed to
    ``payload_switch``; the resulting command is printed and returned.
    Note that the return value is the command, not the parsed body.
    """
    decoded = json.loads(req.get_data())
    cmd = payload_switch(decoded)
    print(cmd)
    return cmd

def on_connect(client, userdata, flags, rc):
    """Connect callback: flag a successful connection in ``connected``.

    Only ``rc`` is examined. ``rc == 0`` sets the module-level ``connected``
    to True; any other value just prints a failure message.
    """
    global connected  # Module-level flag polled by connect()

    if rc != 0:
        print("[INFO] Error, connection failed")
        return

    print("[INFO] Connected to broker")
    connected = True  # Signal connection


def on_publish(client, userdata, result):
    """Publish callback: print the value delivered in *result*."""
    print("Published!", result)


@app.route('/', methods=['GET'])
def index():
    """Serve a fixed plain-text message at the site root (GET /)."""
    body = 'This is a website.'
    return (body, 200, None)


@app.route('/api/print', methods=['POST'])
def print_test():
    """
    Send a POST request to /api/print with a JSON body with a "p" key
    to print that message in the server console.
    """
    # parse_request() dispatches Things Stack uplinks and returns a command,
    # not the parsed body, so it cannot be used for this endpoint: decode the
    # JSON body directly.
    payload = json.loads(request.get_data())
    print (payload['p'])
    return ("", 200, None)



@app.route('/api/pv-data', methods=['POST'])
def pv_webhook():
    """Webhook endpoint: turn an uplink into a downlink command when needed.

    The request body is handed to ``parse_request``; if it yields a command,
    a new MQTT client is created, connected and used to publish it.

    Returns:
        ``("OK", 200, None)`` on success (including when no command is
        needed), or a 503 response when the broker cannot be reached.
    """
    print ("Processing request...")
    cmd = parse_request(request)
    print ("cmd is ", cmd)
    if cmd != '':
        print('sending cmd', cmd)
        mqtt = mqttClient.Client()
        if not connect(mqtt, MQTT_USERNAME,
                   MQTT_PASSWORD, BROKER_ENDPOINT, TLS_PORT):
            # A bare False is not a valid Flask view return value and would
            # surface as an internal error; report the failure explicitly.
            return ("MQTT broker unavailable", 503, None)
        send_command(mqtt, cmd)
        # NOTE(review): the client created above is never disconnected and
        # its loop thread is never stopped — consider a shared client.
    return ("OK",200, None)



if __name__ == '__main__':
    # Only reached when the file is executed directly as a script.
    app.run(host='0.0.0.0', port=9000, debug=True, use_reloader=False)

The flask_mqtt version below, by contrast, connects and then immediately disconnects:

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

from flask import Flask, request
from flask_mqtt import Mqtt
import json
import re
import time
import base64
import ssl

# Flask application object; the route decorators further down register on it.
app = Flask(__name__)

# --- flask_mqtt settings, read from app.config when Mqtt(app) is created ---
app.config['MQTT_BROKER_URL'] = 'test'  # broker host name (placeholder in this listing)
app.config['MQTT_BROKER_PORT'] = 8883  # used together with MQTT_TLS_ENABLED = True below
app.config['MQTT_USERNAME'] = 'test'  # set the username here if you need authentication for the broker
app.config['MQTT_PASSWORD'] = ''  # set the password here if the broker demands authentication
app.config['MQTT_KEEPALIVE'] = 5  # set the time interval for sending a ping to the broker to 5 seconds
app.config['MQTT_TLS_ENABLED'] = True
# NOTE(review): True presumably relaxes certificate host-name checking —
# confirm that is intended for this broker.
app.config['MQTT_TLS_INSECURE'] = True
app.config['MQTT_TLS_VERSION'] = ssl.PROTOCOL_TLSv1_2
app.config['MQTT_TLS_CA_CERTS'] = 'isrgrootx1.pem'# path to the CA certificate file (TLS is enabled above)
app.config['SECRET'] = 'my secret key'
app.config['TEMPLATES_AUTO_RELOAD'] = True
app.config['MQTT_CLIENT_ID'] = '111222555'
app.config['MQTT_CLEAN_SESSION'] = True
app.config['MQTT_LAST_WILL_TOPIC'] = 'home/lastwill'
app.config['MQTT_LAST_WILL_MESSAGE'] = 'bye'
app.config['MQTT_LAST_WILL_QOS'] = 0
# NOTE(review): confirm flask_mqtt recognises this key name and value type.
app.config['MQTT_PROTOCOL'] = 'MQTTv31'

# send_command() joins TOPIC + DEVICE_LABEL + QUEUE_REPLACE into the publish
# topic. QUEUE_PUSH is not referenced by the code visible here.
TOPIC = "v3/pump-control/devices/"
QUEUE_REPLACE = "down/replace"
QUEUE_PUSH = "down/push"
DEVICE_LABEL = 'dam-pump/'
# Raw command frames; send_command() base64-encodes one into "frm_payload".
# NOTE(review): only the final byte differs between the four frames —
# presumably it selects the output state; confirm against the device docs.
OFF_CMD = bytes([8,2,0,0,1,0,0])
ON_CMD = bytes([8,2,0,0,1,0,3])
ON1_CMD = bytes([8,2,0,0,1,0,1])
ON2_CMD = bytes([8,2,0,0,1,0,2])
# Module-level state shared between webhook requests.
PUMP_STATE = 0  # processIO() sets this to 1 on a DI1 == 1 frame and back to 0 in its final branch
LOW_BATT = 0  # Flag name used by processP2()
NIGHT = 0  # Flag name used by processP1()


# MQTT extension bound to the app at import time; provides the publish()
# used by send_command() and the on_log() decorator used below.
mqtt = Mqtt(app)


def send_command(mqtt,cmd):
    """Publish *cmd* as a single downlink on the device's replace queue.

    *cmd* is a bytes command frame; it is base64-encoded into the
    ``frm_payload`` field of a one-element ``downlinks`` JSON document and
    published through ``mqtt.publish``. The JSON, the topic and the publish
    result are printed. Returns None.
    """
    encoded = base64.b64encode(cmd).decode("utf8")
    downlink = {"f_port": 1, "frm_payload": encoded, "priority": "NORMAL"}
    payload = json.dumps({"downlinks": [downlink]})
    print(payload)

    topic = "{}{}{}".format(TOPIC, DEVICE_LABEL, QUEUE_REPLACE)
    print (topic)

    ret = mqtt.publish (topic, payload)
    print('return from mqtt is ',ret)

    
def processP1(payload):
    """Handle a 'DataFrameP1' uplink: record the night flag from PV voltage.

    Reads ``pvArray1V`` from the decoded payload and sets the module-level
    ``NIGHT`` flag to 1 when it is below 24.5, otherwise to 0.

    Returns:
        Always ``''`` (this frame type never triggers a command).
    """
    # Without this declaration the assignments below would create a local
    # variable and the module-level flag would never change.
    global NIGHT

    array_voltage = payload['uplink_message']['decoded_payload']['pvArray1V']
    print('p1 array voltage ', array_voltage)

    NIGHT = 1 if array_voltage < 24.5 else 0
    return ''

def processP2(payload):
    """Handle a 'DataFrameP2' uplink: react to the battery voltage.

    Reads ``batt1V`` from the decoded payload and updates the module-level
    ``LOW_BATT`` flag (1 below 24, otherwise 0).

    Returns:
        ``OFF_CMD`` when the battery is low, otherwise ``''``.
    """
    # Declared global so the module-level flag is actually updated; a plain
    # assignment would only bind a throwaway local.
    global LOW_BATT

    batt_voltage = payload['uplink_message']['decoded_payload']['batt1V']
    print('p2 payload', batt_voltage)
    print('bat voltage is ', batt_voltage)

    if batt_voltage < 24:
        LOW_BATT = 1
        return OFF_CMD

    LOW_BATT = 0
    return ''

def processIO(payload):
    """Handle an 'IOFrame' uplink and decide which command, if any, to send.

    Decisions are based on the ``DI1``, ``DI2`` and ``DO1`` fields of the
    decoded payload and on the module-level ``PUMP_STATE`` /
    ``PUMP_START_TIME`` values, which this function also updates.

    Returns:
        ``ON1_CMD``, ``OFF_CMD`` or ``''`` (no command).
    """
    global PUMP_STATE, PUMP_START_TIME

    io_state = payload["uplink_message"]["decoded_payload"]
    print('IO payload', io_state)

    # Both inputs high.
    if io_state['DI2'] == 1 and io_state['DI1'] == 1:
        return ON1_CMD

    if io_state['DI1'] == 1:
        if PUMP_STATE == 0:
            # First frame with DI1 high: remember when the run started.
            PUMP_STATE = 1
            PUMP_START_TIME = time.time()
            print('pump started at ', PUMP_START_TIME)
            return ''

        if PUMP_STATE != 1:
            cmd = OFF_CMD
            print('pump is over time')
            return cmd

        print("pump running time is ", time.time() - PUMP_START_TIME)
        if time.time() - PUMP_START_TIME > 300:
            return ON1_CMD
        if io_state['DI2'] == 1:
            return ON1_CMD
        return ''

    if io_state['DO1'] == 1:
        return OFF_CMD

    # DI1 low and DO1 not set: reset the run-state tracking.
    PUMP_STATE = 0
    print('pump stopped')
    return ''

def ackManage(payload):
    """Handle a 'modbus_write_ack' uplink: log it and request no command.

    The payload is not inspected. Always returns ``''``.
    """
    print('ack recieved')
    return ''

def payload_switch(payload):
    """Dispatch a decoded uplink to the handler for its ``msgType``.

    Args:
        payload: Parsed webhook body; ``msgType`` is read from
            ``payload["uplink_message"]["decoded_payload"]``.

    Returns:
        Whatever the selected handler returns (a command or ``''``).
        An unrecognised ``msgType`` yields ``''`` (no command).
    """
    handlers = {
        'DataFrameP1': processP1,
        'DataFrameP2': processP2,
        'IOFrame': processIO,
        'modbus_write_ack': ackManage,
    }
    msg_type = payload["uplink_message"]["decoded_payload"]['msgType']
    print('here', msg_type)

    handler = handlers.get(msg_type)
    if handler is None:
        # Previously the fallback was the string '' which was then called,
        # raising TypeError for any message type not listed above.
        print('unhandled msgType', msg_type)
        return ''

    return handler(payload)

def parse_request(req):
    """Decode the JSON body of *req* and return the command it calls for.

    The raw body is parsed with ``json.loads`` and passed to
    ``payload_switch``; the resulting command is printed and returned.
    Note that the return value is the command, not the parsed body.
    """
    decoded = json.loads(req.get_data())
    cmd = payload_switch(decoded)
    print(cmd)
    return cmd

@mqtt.on_log()
def handle_logging(client, userdata, level, buf):
    """Print each MQTT client log line, labelled with its severity.

    Args:
        client: Client that produced the log line (unused).
        userdata: User data attached to the client (unused).
        level: Numeric paho log level.
        buf: Log message text.
    """
    # Every line used to be prefixed with "Error:", which made routine
    # debug output (level 16: CONNACK, PINGREQ/PINGRESP) look like failures.
    # The numeric values follow paho's MQTT_LOG_* constants.
    level_names = {1: 'INFO', 2: 'NOTICE', 4: 'WARNING', 8: 'ERROR', 16: 'DEBUG'}
    label = level_names.get(level, 'LEVEL {}'.format(level))
    print(level, '{}: {}'.format(label, buf))


@app.route('/', methods=['GET'])
def index():
    """Serve a fixed plain-text message at the site root (GET /)."""
    body = 'This is a website.'
    return (body, 200, None)


@app.route('/api/print', methods=['POST'])
def print_test():
    """
    Send a POST request to /api/print with a JSON body with a "p" key
    to print that message in the server console.
    """
    # parse_request() dispatches Things Stack uplinks and returns a command,
    # not the parsed body, so it cannot be used for this endpoint: decode the
    # JSON body directly.
    payload = json.loads(request.get_data())
    print (payload['p'])
    return ("", 200, None)


@app.route('/api/sum', methods=['POST'])
def sum():
    """
    Send a POST request to /api/sum with a JSON body with an "a" and "b" key
    to have the app add those numbers together and return a response string with their sum.
    """
    # NOTE: the function name shadows the builtin sum(); it is kept because
    # Flask derives the endpoint name from it.
    print ("Processing request...")
    # parse_request() dispatches Things Stack uplinks and returns a command,
    # not the parsed body, so decode the JSON body directly here.
    payload = json.loads(request.get_data())
    print ("Receieved following paylod:")
    print (payload)

    print ("Adding sum...")
    summation = payload['a'] + payload['b']
    print ("Found sum: %s" % summation)

    print ("Creating response string...")
    resp = '%s + %s = %s' % (payload['a'], payload['b'], summation)
    print ("Sending the following response:")
    print (resp)

    return (resp, 200, None)


@app.route('/api/pv-data', methods=['POST'])
def pv_webhook():
    """Webhook endpoint: publish a downlink when the uplink calls for one.

    The request is passed to ``parse_request``; a non-empty command is sent
    through the module-level ``mqtt`` object. Always answers ``OK`` / 200.
    """
    ok_response = ("OK",200, None)

    print ("Processing request...")
    cmd = parse_request(request)
    print ("cmd is ", cmd)

    needs_command = cmd != ''
    if needs_command:
        print('sending cmd', cmd)
        send_command(mqtt, cmd)

    return ok_response



if __name__ == '__main__':
    # Only reached when the file is executed directly as a script.
    app.run(host='0.0.0.0', port=9000, debug=True, use_reloader=False)

The following is the uwsgi.ini I use:

[uwsgi]
module = pumphook:app

processes = 1
single-interpreter = true

socket = flask.sock
chmod-socket = 664
vacuum = true

die-on-term = true

Any advice on where I'm going wrong would be much appreciated,
cheers

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

1 participant