In [None]:
import mysql.connector
import pandas as pd
import numpy as np
from sklearn.metrics import pairwise_distances
import joblib
from datetime import datetime
import time
import logging

# -------------------------------
# تنظیمات لاگ (اختیاری: می‌تونی به فایل هم ذخیره کنی)
# -------------------------------
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s | %(levelname)s | %(message)s',
    handlers=[
        logging.FileHandler("anomaly_detection.log", encoding='utf-8'),
        logging.StreamHandler()
    ]
)

# -------------------------------
# 1. بارگذاری مدل
# -------------------------------
try:
    scaler = joblib.load('scaler.pkl')
    dbscan = joblib.load('dbscan_model.pkl')
    cluster_points = np.load('cluster_points.npy')
    logging.info("مدل و اجزا با موفقیت بارگذاری شدند.")
except Exception as e:
    logging.error(f"خطا در بارگذاری مدل: {e}")
    exit()

# -------------------------------
# 2. تابع تشخیص ناهنجاری
# -------------------------------
def is_anomalous(input_dict, threshold=10.0):
    input_df = pd.DataFrame([input_dict])
    scaled_input = scaler.transform(input_df)
    distance = pairwise_distances(scaled_input, cluster_points).min()
    anomaly_weight = distance
    return {
        'is_anomaly': anomaly_weight > threshold,
        'anomaly_weight': anomaly_weight
    }

# -------------------------------
# 3. اتصال به MySQL
# -------------------------------
def get_connection():
    return mysql.connector.connect(
        host='127.0.0.1',
        port=3306,
        user='root',
        password='',
        database='dsas'
    )

# -------------------------------
# 4. تابع اصلی پردازش
# -------------------------------
def process_new_records():
    conn = get_connection()
    cursor = conn.cursor(dictionary=True)

    try:
        # آخرین TimeStamps در جدول نتایج
        cursor.execute("SELECT MAX(TimeStamps) AS last_ts FROM results_table_mhi_lube_oil_g11")
        result = cursor.fetchone()
        last_processed_ts = result['last_ts'] if result and result['last_ts'] else None

        # شرط برای رکوردهای جدید
        if last_processed_ts:
            query_main = """
                SELECT 
                    `id`, `AssetID_8341`, `AssetID_8342`, `AssetID_8343`, `AssetID_8344`, 
                    `AssetID_8346`, `AssetID_9286`, `AssetID_9287`, `unitID`, 
                    `DateTime`, `RecordDate`, `RecordTime`, `TimeStamps`
                FROM `main_table_mhi_lube_oil_11`
                WHERE `TimeStamps` > %s
                ORDER BY `TimeStamps` ASC
            """
            cursor.execute(query_main, (last_processed_ts,))
        else:
            query_main = """
                SELECT 
                    `id`, `AssetID_8341`, `AssetID_8342`, `AssetID_8343`, `AssetID_8344`, 
                    `AssetID_8346`, `AssetID_9286`, `AssetID_9287`, `unitID`, 
                    `DateTime`, `RecordDate`, `RecordTime`, `TimeStamps`
                FROM `main_table_mhi_lube_oil_11`
                ORDER BY `TimeStamps` ASC
            """
            cursor.execute(query_main)

        new_records = cursor.fetchall()

        if not new_records:
            logging.info("رکورد جدیدی برای پردازش یافت نشد.")
            return 0

        inserted = 0
        for record in new_records:
            ts = record['TimeStamps']
            sample_input = {f'AssetID_{aid}': record[f'AssetID_{aid}'] for aid in [8341, 8342, 8343, 8344, 8346, 9286, 9287]}

            # مدل
            result = is_anomalous(sample_input, threshold=10.0)
            anomaly_weight = result['anomaly_weight']
            model_result = "Abnormal" if anomaly_weight >= 5 else "Normal"
            model_name = "dbscan"
            now = datetime.now().strftime('%Y-%m-%d %H:%M:%S')

            # داده برای درج
            insert_data = {
                **sample_input,
                'unitID': record['unitID'],
                'model_name': model_name,
                'model_result': model_result,
                'model_value': anomaly_weight,
                'DateTime': record['DateTime'],
                'RecordDate': record['RecordDate'],
                'RecordTime': record['RecordTime'],
                'TimeStamps': ts,
                'created_at': now,
                'updated_at': now
            }

            query_insert = """
                INSERT INTO `results_table_mhi_lube_oil_g11` 
                (`AssetID_8341`, `AssetID_8342`, `AssetID_8343`, `AssetID_8344`, 
                 `AssetID_8346`, `AssetID_9286`, `AssetID_9287`, `unitID`, 
                 `model_name`, `model_result`, `model_value`, 
                 `DateTime`, `RecordDate`, `RecordTime`, `TimeStamps`, 
                 `created_at`, `updated_at`)
                VALUES 
                (%(AssetID_8341)s, %(AssetID_8342)s, %(AssetID_8343)s, %(AssetID_8344)s,
                 %(AssetID_8346)s, %(AssetID_9286)s, %(AssetID_9287)s, %(unitID)s,
                 %(model_name)s, %(model_result)s, %(model_value)s,
                 %(DateTime)s, %(RecordDate)s, %(RecordTime)s, %(TimeStamps)s,
                 %(created_at)s, %(updated_at)s)
            """

            cursor.execute(query_insert, insert_data)
            inserted += 1

        conn.commit()
        logging.info(f"{inserted} رکورد جدید با موفقیت پردازش و ثبت شد.")
        return inserted

    except mysql.connector.Error as e:
        logging.error(f"خطا در دیتابیس: {e}")
        conn.rollback()
        return 0
    except Exception as e:
        logging.error(f"خطای غیرمنتظره: {e}")
        return 0
    finally:
        cursor.close()
        conn.close()

# -------------------------------
# 5. حلقه اصلی: هر 60 ثانیه
# -------------------------------
logging.info("سرویس تشخیص ناهنجاری شروع شد. هر 60 ثانیه اجرا می‌شود...")

while True:
    start_time = time.time()
    try:
        count = process_new_records()
        if count > 0:
            logging.info(f"پردازش موفق: {count} رکورد جدید")
        else:
            logging.info("بدون رکورد جدید.")
    except KeyboardInterrupt:
        logging.info("سرویس با Ctrl+C متوقف شد.")
        break
    except Exception as e:
        logging.critical(f"خطای بحرانی: {e}")

    # صبر تا 60 ثانیه (با احتساب زمان اجرا)
    elapsed = time.time() - start_time
    sleep_time = max(0, 60 - elapsed)
    time.sleep(sleep_time)

2025-11-14 15:55:35,968 | INFO | مدل و اجزا با موفقیت بارگذاری شدند.
2025-11-14 15:55:35,968 | INFO | سرویس تشخیص ناهنجاری شروع شد. هر 60 ثانیه اجرا می‌شود...
2025-11-14 15:55:35,982 | INFO | رکورد جدیدی برای پردازش یافت نشد.
2025-11-14 15:55:35,983 | INFO | بدون رکورد جدید.
