In [50]:
#处理redis初始化从数据库中加载车辆信息以及定时将redis最新数据同步到数据库
import redis
import sched
import time
import psycopg2 as pg
import requests as req
import logging
import configparser
import pika

In [87]:
logging.basicConfig(filename="redis-mq.log", filemode="w", format="%(asctime)s %(name)s:%(levelname)s:%(message)s", datefmt="%Y-%M-%d %H:%M:%S", level=logging.INFO)
parser = configparser.ConfigParser()
parser.read('application.ini')
logging.info('加载application.ini配置')

#获取redis配置信息
redis_host = parser.get('redis', 'host')
bus_pos_key = parser.get('redis', 'bus_pos_key')
syn_time = float(parser.get('redis', 'syn_time'))

#获取postgresql配置信息
pg_host = parser.get('postgresql', 'host')
pg_user = parser.get('postgresql', 'user')
pg_password = parser.get('postgresql', 'password')
pg_database = parser.get('postgresql', 'database')

#获取rabbitmq配置信息
rq_host = parser.get('rabbitmq', 'host')
rq_queue_name = parser.get('rabbitmq', 'queue_name')

In [52]:
#初始化redis数据连接
def init_redis(host):
    try:
        pool = redis.ConnectionPool(host=redis_host)
        rd = redis.Redis(connection_pool=pool)
        logging.info('redis已连接')
        return rd
    except Exception as ex:
        logging.exception('redis初始化错误')

In [53]:
#初始化db数据连接
def init_db(host, database, user_name, password):
    try:
        pgconn = pg.connect(database = database, user = user_name, password = password, host = host)
        curs = pgconn.cursor()
        logging.info('postgresql已连接')
        return pgconn, curs
    except Exception as ex:
        logging.exception('postgresql初始化错误')   

In [None]:
#初始化rabbitmq连接
def init_rabbitmq(host):
    try:
        parameters = pika.ConnectionParameters(rq_host, credentials=pika.credentials.PlainCredentials('admin','admin'), heartbeat=0)
        connection = pika.BlockingConnection(parameters)
        logging.info('rabbitmq已连接')
        return connection
        return pgconn, curs
    except Exception as ex:
        logging.exception('rabbitmq初始化错误') 

In [55]:
def close(rd, pgconn, curs, rqconn):
    if not rd is None:
        logging.info('redis已关闭')
        rd.close()
    if not curs is None:
        curs.close()
        pgconn.close()
        logging.info('postgresql已关闭')
    if not rqconn is None:
        rqconn.close()
        logging.info('rq已关闭')

In [75]:
rd = init_redis(redis_host)
conn, curs = init_db(pg_host, pg_database, pg_user, pg_password)
rqconn = init_rabbitmq(rq_host)

In [None]:
#定义消息接受处理业务
def on_message(channel, method_frame, header_frame, body):
    #res.set(str(method_frame.delivery_tag), str(method_frame.delivery_tag))
    try:
        #TODO 处理消息 此处需要获取车辆机构信息(从缓存中直接获取，如果缓存中没有从数据库中获取)
        #上线 下线既时更新数据库
        channel.basic_ack(delivery_tag=method_frame.delivery_tag)
    except Exception as ex:
        logging.exception('消息处理失败,忽略')  

In [None]:
channel = rqconn.channel()
channel.basic_consume(rq_queue_name, on_message)
try:
    logging.info('开始监听mq消息队列...')  
    channel.start_consuming()
except Exception as ex:
    channel.stop_consuming()
    close(rd, conn, curs, rqconn)
    logging.exception('mq任务监听失败')   