In [1]:
# 导入库
import paho.mqtt.client as ph_mqtt_clt
import json
import threading
import base64
import requests
import time
from enum import Enum


In [2]:
class Mqtt_Clt():
    """
    用于表征Mqtt客户端的类
    """
    def __init__(self, ip_broker, port_broker, topic_sub, topic_pub, time_out_secs):
        """
        parameters:ip_broker:broker的IP地址
                   port_broker:连接服务的端口
                   topic_sub:订阅话题/订阅名称
                   topic_pub:发布话题/发布名称
                   time_out_secs:连接超时的时间
        """
        self.mqtt_clt = ph_mqtt_clt.Client()    # 构造Mqtt客户端
        self.mqtt_clt.on_message = self.on_message  # 设置Mqtt客户端回调on_message为本类成员方法on_message
        self.topic_sub = topic_sub  # 设置订阅话题
        self.topic_pub = topic_pub  # 设置发布话题
        self.msg = {}   # 初始化接收消息的字典类成员变量msg为空({})
        self.mqtt_clt.connect(ip_broker, port_broker, time_out_secs)    # 连接Mqtt Broker
        self.mqtt_clt.subscribe(self.topic_sub,qos=0)  # 订阅相关话题
        self.mqtt_clt.loop_start()  # 开启接收循环 开启独立的线程来循环是否有消息通讯

    def __del__(self):
        self.mqtt_clt.loop_forever()    # 结束接收循环
        self.mqtt_clt.disconnect()  # 断开连接


    def on_message(self, client, userdata, message):
        """
        parameters:client:回调返回的客户端实例
                   userdata:Client()或user_data_set()中设置的私有用户数据
                   msg:MQTTMessage实例,包含topic,payload,qos,retain
        functions:接收消息的回调,提取消息中的相关内容
        """
        self.msg = json.loads(message.payload.decode())
        # print("%%%%%%%%%%%%")
        # print("msg_json:", self.msg)

    def send_json_msg(self, msg):
        """
        parameters:msg:json消息
        returns:无
        functions:在指定的话题上发布消息
        """
        # retain参数:一个 topic 只能存在一条 Retained 消息，发布新的 Retained 消息将会覆盖旧消息
        # 只有新的订阅者才能够收到 Retained 消息
        info = self.mqtt_clt.publish(self.topic_pub, payload="{}".format(msg))
        # print("published:True",end=" ") if info.is_published else print("publish:False",end=" ")
        # print(msg)
        
    def control_device(self, str_key, str_value):
        """
        parameters:str_key:键的字符串
                   str_value:值的字符串
        functions:控制设备
        """
        self.send_json_msg(json.dumps({str_key: str_value}))

In [3]:
# 最后没用上，全部删除了
class PosObj(Enum):
    """
    物体的位置
    """
    before_1st = 0  # 在1号对管之前
    at_1st = 1  # 在1号对管处
    between_1st_and_2nd = 2 # 在1,2号对管之间
    at_2nd = 3  # 在2号对管处
    between_2nd_and_3rd = 4 # 在2,3号对管之间
    at_3rd = 5  # 在3号对管处
    beyond_3rd = 6  # 在3号对管之后

In [4]:
class StateRod(Enum):
    """
    推杆状态
    """
    stop = 0    # 停止
    push = 1    # 推出
    pull = 2    # 拉回

In [5]:
class FS_IIOTA():
    """
    表征智能分拣系统行为的类
    """
    def __init__(self, ip_mqtt_broker, website_back_end):
        """
        parameters:ip_mqtt_broker:Mqtt broker的IP地址
                   website_back_end:用于识别的服务端(后端)网址
        """
        self.mqtt_clt = Mqtt_Clt(ip_mqtt_broker, 1883, "bb", "aa",
                                200)    # 构造Mqtt客户端
        self.website_back_end = website_back_end    # 获取用于识别的后端网址
        self.stat_stack_rod = StateRod.stop # 认为当前上货杆(1号推杆)停止
        self.stat_classify_rod = [StateRod.pull for i in range(3)] # 为每个分类杆单独保存状态
        self.mqtt_clt.control_device("conveyor", "run") # 控制传送带运行
        time.sleep(3)
        self.timer_stack_rod = threading.Timer(1, self.handle_timer_statck_rod)  
        self.mqtt_clt.control_device("rod_control", "first_push")   # 控制上货杆推
        self.stat_stack_rod = StateRod.push # 将上货杆的状态设为推出
        self.timer_stack_rod.start()    # 启动上货杆再次动作的定时器


    def __del__(self):
        self.mqtt_clt.control_device("rod_control", "all_pull") # 控制所有推杆拉回
        time.sleep(3)
        self.mqtt_clt.control_device("conveyor", "stop")    # 控制传送带停止
        time.sleep(1)
        
    def handle_mqtt_json_msg(self, total_tolerance):
        """
        functions:以循环的方式对Mqtt客户端的有效消息进行处理,在发生异常的时候记录异常总次数;并触发相应的状态机
        """
        cnt_except = 0  # 定义发生异常的计数量(初始化为0)
        while cnt_except < total_tolerance: # 在发生异常的计数量小于容忍值时一直循环
            if self.mqtt_clt.msg != {}: # 在mqtt接收到的消息非空时 
                # 这里并不是每0.3s就有图片发过来的 应该是有图片过的时候才发送消息 
                if "image" in self.mqtt_clt.msg:    # 如果接收到的是图像的话
                    img_data = base64.b64decode(self.mqtt_clt.msg["image"]) # 解码获得图像数据
                    try:
                        result = json.loads(requests.post(self.website_back_end, data=img_data).text)   # 将图像数据img_data给POST出去并获得相应的结果result
                        try:
                            assert len(result["results"]) == 1  # 确保识别结果result中的键"results"中的值只有1个
                            label = result["results"][0]["label"]   # 获得识别结果中的label
                            # print("lable=",label)
                            # 这是在图像采集区域，位置都是1号对管之前
                            if label == "ripe": 
                                threading.Timer(1.5,self.handle_timer_classify_rod,(2,)).start()
                            elif label == "half-ripe":  
                                threading.Timer(3,self.handle_timer_classify_rod,(3,)).start()
                            elif label == "raw":   
                                threading.Timer(4.2,self.handle_timer_classify_rod,(4,)).start()
                            else:
                                print("三分类以外的分支")
                        except: # 获得结果时发生异常,则cnt_except自加1
                            print("未获得有效结果!")
                            cnt_except += 1
                            print("cnt_except += 1")
                    except: # post时发生异常,则cnt_except自加1
                        print("无法post!")
                        cnt_except += 1
                        print("cnt_except += 1")
                self.mqtt_clt.msg = {}  # 清空mqtt接收到的消息

    def handle_timer_statck_rod(self):
        """
        functions:对上货杆的状态进行分情况讨论:
        如果当前上货杆状态为推出,则需要令上货杆拉回;
        否则,如果上货杆状态为拉回,则需要令上货杆推出
        这里是自主循环上货了 
        """
        if self.stat_stack_rod == StateRod.push:
            self.mqtt_clt.control_device("rod_control", "first_pull")
            self.stat_stack_rod = StateRod.pull
            self.timer_stack_rod = threading.Timer(0.5, self.handle_timer_statck_rod) 
            self.timer_stack_rod.start()
        elif self.stat_stack_rod == StateRod.pull:
            self.stat_stack_rod = StateRod.stop
            self.mqtt_clt.control_device("rod_control", "first_push")
            self.stat_stack_rod = StateRod.push
            self.timer_stack_rod = threading.Timer(0.5, self.handle_timer_statck_rod)
            self.timer_stack_rod.start()
    

    # 试一试将分类杆按照上货杆自主完成推出和拉回 把statemachine删掉
    def handle_timer_classify_rod(self,act_num):
        """
        functions:对分类杆的状态进行分情况讨论:
        如果当前分类杆的状态为推出,则需要让相应的分类杆拉回
        否则,如果当前分类杆的状态为拉回,则需要让相应的分类杆推出
        这里也是自主循环收、拉分类杆了,而且因为是从摄像头分类开始就定时推出相应的分类杆,所以statemachine函数也删除了
        """
        if act_num == 2:
            # 如果第二个杆处于pull状态，则可以push并设置为push状态
            if self.stat_classify_rod[0] != StateRod.push:
                self.mqtt_clt.control_device("rod_control", "second_push")
                threading.Timer(0.5,self.handle_timer_classify_rod,(act_num,)).start()
                self.stat_classify_rod[0] = StateRod.push
            else:
                # 处于push状态，将其拉回并设置为pull状态
                self.mqtt_clt.control_device("rod_control", "second_pull")
                self.stat_classify_rod[0] = StateRod.pull
        elif act_num == 3:
            if self.stat_classify_rod[1] != StateRod.push:
                self.mqtt_clt.control_device("rod_control", "third_push")
                threading.Timer(0.5,self.handle_timer_classify_rod,(act_num,)).start()
                self.stat_classify_rod[1] = StateRod.push
            else:
                self.mqtt_clt.control_device("rod_control", "third_pull")
                self.stat_classify_rod[1] = StateRod.pull
        elif act_num == 4:
            if self.stat_classify_rod[2] != StateRod.push:
                self.mqtt_clt.control_device("rod_control", "fourth_push")
                threading.Timer(0.5,self.handle_timer_classify_rod,(act_num,)).start()
                self.stat_classify_rod[2] = StateRod.push
            else:
                self.mqtt_clt.control_device("rod_control", "fourth_pull")
                self.stat_classify_rod[2] = StateRod.pull
           

In [6]:
if __name__ == "__main__":
    fs_iiota = FS_IIOTA("127.0.0.1", "http://127.0.0.1:8088/aisim_tf_pre")    # 构造类FS_IIOTA的对象
    fs_iiota.handle_mqtt_json_msg(5)    # 调用成员函数处理通过Mqtt发来的json消息

KeyboardInterrupt: 