In [1]:
from collections import OrderedDict
import os
import threading
import time
import yaml
import torch
import paho.mqtt.client as mqtt
import json
import pickle
import logging
from logging.handlers import TimedRotatingFileHandler
from typing import List
import pandas as pd
from influxdb import InfluxDBClient
import numpy as np
import random
from get_data import new_uid, Get_last_data,Get_all_training_data
# from output_data import updown, oa_desc
from utils.output_data import Make_point_Energy_Saving_Suggestions, Make_point_Predict_Result, Make_point_Algorithm_Accuracy, Make_point_Algorithm_Log, Make_control_list, Make_together_desc, Output_Alldata_to_Db
from model import train
from sklearn.metrics import mean_squared_error
from Model_Manager.model_manager import Model_Manager
# from OptimizationAlgorithm_Manager.oa_manager import OptimizationAlgorithm_Manager
from utils.interfaces import SafetyBoundary,OptimizationInput,OptimizationOutput,ObservingBoundary
from Optimization_Algorithm.custom_oa import Custom_Optimization

### function

In [2]:
def thread_update(event, model_args, database, input_uid_list, output_uid_list, all_uid_list,model_path):
    """
    模型更新
    """
    init = 0
    try:
        while True:
            event.wait()
            logger.info("模型更新中")
            #用uid_list根据不同房间获取数据
            all_data = Get_all_training_data(all_uid_list, average=False, init=init, database=database)
            logger.info("获取最新数据中")
            try: 
                save_model = train(df=all_data, 
                                   manager=model_manager, 
                                   model_name=model_name, 
                                   model_path=model_path,
                                   input_columns=input_uid_list,
                                   output_columns=output_uid_list, 
                                   args=model_args,
                                   init=init) 
                logger.info("模型更新完成")
            except Exception as e:
                logger.error("An error occurred when updating model: %s", str(e))
            
            lock.acquire()
            try:
                model_update_flag = 1
            finally:
                lock.release()
                
            del save_model
            del all_data
            event.clear()
    except Exception as e:
        logger.error("An error occurred: %s", str(e))


def thread_monitor(event):
    """
    1分钟内未传输完成, 传上一时刻的值
    """
    try:
        while True:
            event.wait()
            start_time = time.time()
            while predicted == 0:
                current_time = time.time()
                elapsed_time = current_time - start_time
                if elapsed_time > 60:
                    # client.publish(topic, predict_data)
                    logger.info("传输上一时刻结果成功！")
                    start_time = time.time()
            event.clear()
    except Exception as e:
        logger.error("An error occurred: %s", str(e))


def Init_predict_model(model_args, database, input_uid_list, output_uid_list, all_uid_list, model_path):
    '''
    初始化预测模型
    '''
    init = 1
    if not os.path.exists(model_path):         
        logger.info("准备训练模型中...") 
        logger.info("读取训练数据中...")
        all_data = Get_all_training_data(all_uid_list, average=False, init=init, database=database)
        logger.info("开始训练...")
        model = train(df=all_data, 
                    manager=model_manager, 
                    model_name=model_name, 
                    model_path=model_path,
                    input_columns=input_uid_list,
                    output_columns=output_uid_list, 
                    args=model_args,
                    init=init) 
        logger.info("模型已保存，训练完成！")
        del all_data
    else:
        logger.info("检测到当前路径已有模型，直接载入")
        model = torch.load(model_path)
    return model

def Init_logger():
    '''
    初始化日志
    '''
    logger = logging.getLogger("my_logger")
    logger.setLevel(logging.INFO)
    handler = logging.FileHandler("run.log",encoding="utf-8", mode="a")
    handler = TimedRotatingFileHandler(
        "run.log", when="midnight", interval=1, backupCount=1
    )
    handler.setLevel(logging.INFO)
    handler.setFormatter(logging.Formatter("%(asctime)s - %(levelname)s - %(message)s"))
    logger.addHandler(handler)
    return logger

def Init_MQTT_client(configs):
    '''
    初始化MQTT客户端
    '''
    
    broker_address = configs["mqtt"]["broker_address"]
    mqtt_port = configs["mqtt"]["port"]
    topic = configs["mqtt"]["node"] + "/data/raw/point"
    client_id = configs["mqtt"]["client_id"]
    client = mqtt.Client(
        client_id=client_id,
        transport=configs["mqtt"]["transport"],
        clean_session=False,
    )
    try:
        client.connect(broker_address, mqtt_port)
        logger.info("连接MQTT成功!")
    except Exception as e:
        logger.error("An error occurred when connecting MQTT: %s", str(e))
        raise e
    return client, topic

def Init_input_database(configs):
    '''
    初始化 输入数据库
    '''
    host = configs["influxdb"]["influxdbin"]["host"]
    port = configs["influxdb"]["influxdbin"]["port"]
    username = configs["influxdb"]["influxdbin"]["username"]
    password= configs["influxdb"]["influxdbin"]["password"]
    database_name = configs["influxdb"]["influxdbin"]["database_name"]
    timeout= 10
    database = InfluxDBClient(host=host, port=port, timeout=timeout, username=username,password=password)
    database.switch_database(database_name)
    return database

def Init_output_database(configs):
    '''
    初始化 输出数据库
    '''
    host = configs["influxdb"]["influxdbout"]["host"]
    port = configs["influxdb"]["influxdbout"]["port"]
    timeout= 10
    # database = InfluxDBClient(host=host, port=port, timeout=timeout, username=username,password=password)
    database = InfluxDBClient(host=host, port=port, timeout=timeout)
    database.switch_database('iot_origin_database')
    return database

def Init_custom_optimization(oa_configs:dict,last_optimization_input_data:OptimizationInput)->Custom_Optimization:
    '''
    初始化 定制化优化算法
    '''
    custom_optimization = Custom_Optimization(
        last_optimization_input=last_optimization_input_data,**oa_configs
    )
    return custom_optimization

def Get_predict_result(input_data:pd.DataFrame, model):
    '''
    获取预测结果
    输入{
        model 预测模型
        input_data 预测模型输入数据
        }
    返回 predict_result 预测结果
    '''
    with torch.no_grad():
        input_data = input_data.values
        output = model.pred(input_data)
        output = torch.Tensor(output)
        scaler = pickle.load(open("model//scaler.pkl", "rb"))
        if len(output.shape) == 1:
            output = output.reshape(-1, 1)
        predict_result = scaler.inverse_transform(output.cpu().detach().numpy())
    return predict_result[0]

def Count_accuracy(real_data, predict_result):
    '''
    获取准确率
    输入{
       测试数据
        }
    返回 mse 准确率的值
    '''
    if predict_result == 0:
        return 0
    mse = mean_squared_error(real_data, predict_result)
    return mse

def Get_optimization_result(input_data:OptimizationInput,custom_optimization:Custom_Optimization):
    '''
    获得优化结果
    '''
    optimization_output = custom_optimization.handle_optimization_process(input_data)
    return optimization_output


def Renew_output_data(predict_result, optimize_result):
    '''
    更新输出数据  
    '''
    output_data = {}
    output_data["predict_result"] = predict_result
    output_data["optimize_result"] = optimize_result
    return output_data


def Make_PredictResult_to_Dict(output_name:list,predict_result:list)->dict:
    '''
    将预测模型输出结果和对应output_uid_list合并为后续需要的dict字典形式
    '''
    result_dict = dict(zip(output_name, predict_result))
    return result_dict


def Filter_Current_temperature(new_data:pd.DataFrame,current_temp_uid_list:list)->pd.DataFrame:
    '''
    从新读取的数据中筛选获得用于作为优化决策的房间当前温度
    '''
    return new_data[current_temp_uid_list]

def Filter_PredictModel_input_data(new_data:pd.DataFrame,in_predict_uid_list:list)->pd.DataFrame:
    '''
    从新读取的数据中筛选获得用于作为预测输入的参数
    '''
    return new_data[in_predict_uid_list]

def Filter_Optimization_input_data(new_data:pd.DataFrame,input_optimization_uid_list:dict[list])->OptimizationInput:
    '''
    从新读取的数据中筛选获得用于作为优化输入的参数
    '''
    # print('new data is:')
    # print(new_data)
    room_temperature = new_data[input_optimization_uid_list['room_temperature']].values.tolist()[0]
    ac_onoff_setting = new_data[input_optimization_uid_list['ac_onoff_setting']].values.tolist()[0]
    ac_temperature = new_data[input_optimization_uid_list['ac_temperature']].values.tolist()[0]
    ac_temperatue_settings = new_data[input_optimization_uid_list['ac_temperatue_settings']].values.tolist()[0]
    optimizationinput_key = ['room_temperature', 'ac_onoff_setting', 'ac_temperature', 'ac_temperatue_settings']
    input = [room_temperature, ac_onoff_setting, ac_temperature, ac_temperatue_settings]
    optimization_input = dict(zip(optimizationinput_key, input))
    return optimization_input

def Filter_PredictModel_output_data(new_data:pd.DataFrame,out_predict_uid_list:list)->pd.DataFrame:
    '''
    从新读取的数据中筛选获得用于作为预测输出(标签)的参数
    '''
    return new_data[out_predict_uid_list] 

In [3]:

# 创建日志
global logger

try:
    # 载入参数
    with open("./config/configs.yaml", "r",encoding="utf-8") as f:
        configs = yaml.load(f, Loader=yaml.FullLoader)
    with open("./config/model_args.yaml", "r",encoding="utf-8") as f:
        model_args = yaml.load(f, Loader=yaml.FullLoader)
    with open("./config/oa_args.yaml", "r",encoding='utf-8') as f:
        oa_args = yaml.load(f, Loader=yaml.FullLoader)
    
    # 数据库初始化
    input_database = Init_input_database(configs)
    output_database = Init_output_database(configs)
    logger = Init_logger()

    # uid列表初始化
    all_uid_list = []
    optimizationinput_key = ['room_temperature', 'ac_onoff_setting', 'ac_temperature', 'ac_temperatue_settings']
    for input_key in optimizationinput_key:
        all_uid_list += configs["uid"]["input_optimization_uid"][input_key]
    input_optimization_uid_list = configs["uid"]["input_optimization_uid"]
    output_optimization_uid_list = configs["uid"]["output_optimization_uid"]

    print("初始化完毕")

    # 初始化最后一条数据
    old_data = Get_last_data(all_uid_list, input_database)
    print('获取最后一条数据')
    last_optimization_input_data = Filter_Optimization_input_data(old_data, input_optimization_uid_list)
    print('获取最后一条优化输入数据')

    # 初始化自定义优化算法
    custom_optimization = Init_custom_optimization(oa_configs=oa_args["Custom_Optimization"],last_optimization_input_data=last_optimization_input_data)

    while True:
        start_time = time.time()
        
        # 获取最新一条数据
        new_data = Get_last_data(all_uid_list, input_database)
        if not new_data.equals(old_data): # 如果数据不相等，即发生数据更新
            old_data = new_data # 更新数据

        # 获取优化结果
        optimization_input_data:OptimizationInput = Filter_Optimization_input_data(new_data, input_optimization_uid_list)
        print(f'optimization_input is {optimization_input_data}')

        optimization_result:OptimizationOutput = Get_optimization_result(optimization_input_data,custom_optimization)
        print(f"optimization_result is {optimization_result}")

        end_time = int(time.time())
        # sleep_time = 60 - (end_time - start_time) % 60
        sleep_time = 1
        time.sleep(sleep_time)
        
except Exception as e:
    logger.error("An error occurred: %s", str(e))

初始化完毕
获取最后一条数据
获取最后一条优化输入数据
optimization_input is {'room_temperature': [26.6, 25.9], 'ac_onoff_setting': [1.0, 0.0, 0.0, 1.0, 0.0], 'ac_temperature': [26.8, 19.7, 26.1, 28.3, 26.7], 'ac_temperatue_settings': [27.0, 26.0, 26.0, 26.5, 25.5]}
close_uid:['FoShan.50.4739072', 'FoShan.50.4741120', 'FoShan.50.4751360']
Adjusting_AC_Temperature
optimization_result is {'ac_onoff_status_setting': [1, 0, 0, 1, 0], 'ac_temp_setting': [27.5, 26.0, 26.0, 26.5, 25.5]}
optimization_input is {'room_temperature': [26.6, 25.9], 'ac_onoff_setting': [1.0, 0.0, 0.0, 1.0, 0.0], 'ac_temperature': [26.8, 19.7, 26.1, 28.3, 26.7], 'ac_temperatue_settings': [27.0, 26.0, 26.0, 26.5, 25.5]}
close_uid:['FoShan.50.4739072', 'FoShan.50.4741120', 'FoShan.50.4751360']
Sleeping
optimization_result is {'ac_onoff_status_setting': [1, 0, 0, 1, 0], 'ac_temp_setting': [27.5, 26.0, 26.0, 26.5, 25.5]}
optimization_input is {'room_temperature': [26.6, 25.9], 'ac_onoff_setting': [1.0, 0.0, 0.0, 1.0, 0.0], 'ac_temperature': [26.8,

KeyboardInterrupt: 

### main

In [None]:
main()

初始化完毕
线程设置完毕
                   time  abs_value  origin_value  value
0  2024-08-30T06:27:28Z       26.6          26.6   26.6
                   time  abs_value  origin_value  value
0  2024-08-30T06:27:28Z       26.0          26.0   26.0
                   time  abs_value  origin_value  value
0  2024-08-30T06:27:28Z        1.0           1.0    1.0
                   time  abs_value  origin_value  value
0  2024-08-30T06:27:28Z        0.0           0.0    0.0
                   time  abs_value  origin_value  value
0  2024-08-30T06:27:28Z        0.0           0.0    0.0
                   time  abs_value  origin_value  value
0  2024-08-30T06:27:29Z        1.0           1.0    1.0
                   time  abs_value  origin_value  value
0  2024-08-30T06:27:29Z        0.0           0.0    0.0
                   time  abs_value  origin_value  value
0  2024-08-30T06:27:28Z       26.8          26.8   26.8
                   time  abs_value  origin_value  value
0  2024-08-30T06:27:28Z       19.7 