In [1]:
import sys
import time
import json
from mns.account import Account
from mns.queue import *
from mns.topic import *
from mns.subscription import *
import configparser as ConfigParser


cfgFN = "sample.cfg"
required_ops = [("Base", "AccessKeyId"),
                ("Base", "AccessKeySecret"), ("Base", "Endpoint")]
optional_ops = [("Optional", "SecurityToken")]

parser = ConfigParser.ConfigParser()
parser.read(cfgFN)


['sample.cfg']

In [9]:
# 定义错误类型，分为消息获取错误、消息发送错误、删除错误、修改错误
class MNSMessageRetrieveException(Exception):
    pass

class MNSMessageSendException(Exception):
    pass

class MNSMessageDeleteException(Exception):
    pass

class MNSMessageModifyException(Exception):
    pass



In [2]:
accessKeyId = parser.get("Base", "AccessKeyId")
accessKeySecret = parser.get("Base", "AccessKeySecret")
endpoint = parser.get("Base", "Endpoint")
securityToken = ""
if parser.has_option("Optional", "SecurityToken") and parser.get("Optional", "SecurityToken") != "$SecurityToken":
    securityToken = parser.get("Optional", "SecurityToken")

# 初始化my_account
my_account = Account(endpoint, accessKeyId, accessKeySecret, securityToken)


In [3]:
import warnings

from transformers import pipeline

warnings.filterwarnings("ignore")

# Load the model
classifier = pipeline("zero-shot-classification", model="./model", device=0)

def classify_text(classifier, text, labels):
    return classifier(
        text,
        labels,
        multi_label=True
    )



In [4]:
receive_queue = my_account.get_queue(queue_name='python-label-task')


In [6]:
msg = receive_queue.receive_message(wait_seconds=3)

In [8]:
eval(msg.message_body)

{'active': 'local',
 'clueId': 123456,
 'tagGroup': [{'groupId': 123,
   'tagList': [{'tagId': 1, 'tagName': '1节课'},
    {'tagId': 2, 'tagName': '2节课'},
    {'tagId': 3, 'tagName': '3节课'},
    {'tagId': 4, 'tagName': '4节课'},
    {'tagId': 5, 'tagName': '5节课'}],
   'hitWord': [{'role': '销售', 'word': '今天有个5节,哦不是，是9节，的编程特惠课给到孩子'},
    {'role': '销售', 'word': '明天有个4节的编程特惠课给到孩子'}]},
  {'groupId': 123,
   'tagList': [{'tagId': 1, 'tagName': '1节课'},
    {'tagId': 2, 'tagName': '2节课'},
    {'tagId': 3, 'tagName': '3节课'},
    {'tagId': 4, 'tagName': '4节课'},
    {'tagId': 5, 'tagName': '5节课'}],
   'hitWord': [{'role': '销售', 'word': '今天有个5节,哦不是，是9节，的编程特惠课给到孩子'},
    {'role': '销售', 'word': '明天有个4节的编程特惠课给到孩子'}]}]}

In [None]:
def receive_raw_messages(receive_queue):
    wait_seconds = 3
    try:
        recv_msg = receive_queue.receive_message(wait_seconds)
        return recv_msg
    except MNSExceptionBase as e:
        if e.type == "QueueNotExist":
            print("Queue not exist, please create queue before receive message.")
        elif e.type == "MessageNotExist":
            print("Queue is empty!")
        
        # 返回消息接收错误
        raise MNSMessageRetrieveException("Message retrieve error")



In [64]:
def process_raw_messages(message):
    try:
        # Extract the JSON data from the message body
        message_body = json.loads(message.message_body)

        activeEnv = message_body['active']
        clueId = message_body['clueId']

        # extract tagNames from data
        tags = [[tag["tagName"] for tag in i["tagList"]]
                for i in message_body['tagGroup']]
        # extract hitWord from data
        text = [[hit["word"] for hit in i["hitWord"]]
                for i in message_body['tagGroup']]

        output = [classify_text(classifier, text[i], tags[i])
                  for i in range(len(tags))]
        tagId = []
        # return the ID corresponding to the label of the maximum probability of each sequence
        for tagIter, items in enumerate(output):
            for item in items:
                matched_tags = item["labels"][0]
                # query the ID corresponding to the label in the tagList
                for tag in message_body['tagGroup'][tagIter]["tagList"]:
                    if tag["tagName"] == matched_tags:
                        tagId.append(tag["tagId"])
                        break
        return [activeEnv, clueId, tagId]
    except Exception as e:
        print(e)
        # 返回消息处理错误
        raise MNSMessageModifyException("Message modify error")


In [None]:
# 发送返回队列消息
def send_back_processed_messages(queue_name, clueId, tagId):
    try:
        # 初始化my_queue
        my_queue = my_account.get_queue(queue_name=queue_name)

        # 构造返回数据
        body = {
            "clueId": clueId,
            "tagId": tagId
        }

        # 发送返回数据
        msg_body = json.dumps(body)
        msg_tag = "test"
        msg = Message(msg_body)

        # 发送消息到my_queue
        msg = Message()
        msg.message_body = json.dumps(tagId)

        re_msg = my_queue.send_message(msg)
        print("Send Message Succeed! ReceiptHandle:%s MessageBody:%s MessageID:%s" %
              (re_msg.receipt_handle, msg.message_body, re_msg.message_id))
    except MNSExceptionBase:
        # 返回消息发送错误
        raise MNSMessageSendException("Message send error")



In [None]:
def delete_processed_messages(receive_queue, message):
    try:
        receive_queue.delete_message(message.receipt_handle)
        print("Delete Message Succeed!  ReceiptHandle:%s" %
              message.receipt_handle)
    except MNSExceptionBase as e:
        if e.type == "QueueNotExist":
            print("Queue not exist, please create queue before delete message.")
        elif e.type == "MessageNotExist":
            print("Message not exist, maybe deleted already!")
        raise MNSMessageDeleteException("Message delete error")


In [None]:
def send_test_message(receive_queue):
    with open('test.json', 'r') as f:
        test_data = json.load(f)

    # 测试发送消息
    msg_count = 10

    for i in range(msg_count):
        msg_body = json.dumps(test_data)
        msg_tag = "test"
        msg = Message(msg_body)
        try:
            re_msg = receive_queue.send_message(msg)
            print("Publish Message Succeed.\nMessageBody:%s\nMessageTag:%s\nMessageId:%s\nMessageBodyMd5:%s\n\n" % (msg_body, msg_tag, re_msg.message_id, re_msg.message_body_md5))
        except MNSExceptionBase as e:
            print("Send Message Fail! Exception:%s\n" % e)


In [None]:
wait_seconds = 3

receive_queue = my_account.get_queue(queue_name='python-label-task')
send_test_message(receive_queue)

send_back_queue = my_account.get_queue(queue_name='python-label-result')

while True:
    try:
        # 接收消息
        recv_msg = receive_raw_messages(receive_queue)
    except MNSMessageRetrieveException as e:
        # 暂停wait_seconds秒
        print(e)
        time.sleep(wait_seconds)
        continue

    try:
        # 处理消息
        [clueId, tagId] = process_raw_messages(recv_msg)
    except MNSMessageModifyException as e:
        # 定义下次重新接收消息的时间间隔
        time_interval = 2**(recv_msg.dequeue_count - 1)
        # 将该消息修改为三十秒内不可见
        if recv_msg.dequeue_count <= 8:
            receive_queue.change_message_visibility(recv_msg.receipt_handle, 30)
        else:
            # 删除消息
            delete_processed_messages(receive_queue, recv_msg)
        continue

    try:
        # 发回消息
        send_back_processed_messages(send_back_queue, clueId, tagId)
    except MNSMessageSendException as e:
        print(e)
        time.sleep(wait_seconds)

    try:
        # 删除消息
        delete_processed_messages(receive_queue, recv_msg)
        
    except MNSMessageDeleteException as e:
        print(e)
        time.sleep(wait_seconds)
        
    except Exception as e:
        print(e)
        time.sleep(wait_seconds)
