In [4]:
import pandas as pd
import numpy as np
import random
import logging
import os

# 设置日志
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")

def generate_test_data(file_path="user_login_test_data.csv", num_records=100):
    """
    生成测试数据集，包括正常和异常数据，列与原始数据一致
    """
    user_ids = [f"test_user{i}" for i in range(1, 21)]
    ip_addresses = ["192.168.1." + str(i) for i in range(50, 60)] + \
                   ["10.0.0." + str(i) for i in range(50, 60)] + \
                   ["172.16.0." + str(i) for i in range(50, 60)] + \
                   ["203.0.113." + str(i) for i in range(50, 60)]
    login_resources = ["server1", "server2", "server3"]
    login_results = ["success", "failure"]

    data = []

    # 生成正常数据 (98条)
    for _ in range(num_records - 2):
        user = random.choice(user_ids)
        login_times = pd.date_range(
            start="2023-02-01 08:00:00",
            periods=random.randint(5, 10),
            freq="H"
        )
        for login_time in login_times:
            record = {
                "用户ID": user,
                "登录时间": login_time,
                "登录地址": random.choice(ip_addresses),
                "登录资源": random.choice(login_resources),
                "登录结果": np.random.choice(["success", "failure"], p=[0.9, 0.1])  # 修正
            }
            data.append(record)

    # 生成异常数据 (2条)

    # 异常1：频繁登录失败
    user = random.choice(user_ids)
    failure_times = pd.date_range("2023-02-02 09:00:00", periods=5, freq="T")
    for login_time in failure_times:
        record = {
            "用户ID": user,
            "登录时间": login_time,
            "登录地址": random.choice(ip_addresses),
            "登录资源": random.choice(login_resources),
            "登录结果": "failure"
        }
        data.append(record)

    # 异常2：同账号多地址登录
    user = random.choice(user_ids)
    login_time = pd.Timestamp("2023-02-03 10:00:00")
    ip_list = random.sample(ip_addresses, 5)
    for ip in ip_list:
        record = {
            "用户ID": user,
            "登录时间": login_time,
            "登录地址": ip,
            "登录资源": random.choice(login_resources),
            "登录结果": "success"
        }
        data.append(record)

    # 转换为 DataFrame
    df = pd.DataFrame(data)

    # 打乱数据顺序
    df = df.sample(frac=1).reset_index(drop=True)

    # 保存测试数据
    df.to_csv(file_path, index=False, encoding="utf-8")
    logging.info(f"测试数据已生成并保存到 {file_path}")

# 运行函数生成测试数据
generate_test_data()



  login_times = pd.date_range(
  failure_times = pd.date_range("2023-02-02 09:00:00", periods=5, freq="T")
2024-11-23 14:24:46,606 - INFO - 测试数据已生成并保存到 user_login_test_data.csv


In [6]:
import joblib
import pandas as pd

# 1. 加载优化后的模型
optimized_model_file = "optimized_anomaly_detection_model.pkl"
optimized_model = joblib.load(optimized_model_file)
print("模型已加载！")

# 2. 加载测试数据
test_file_path = "user_login_test_data.csv"
test_data = pd.read_csv(test_file_path, encoding="utf-8")
print("测试数据已加载！")

# 3. 数据预处理（根据训练时的流程处理测试数据）
def preprocess_test_data(df):
    """
    对测试数据进行清洗和特征提取，确保与训练时的特征一致。
    """
    df["登录时间"] = pd.to_datetime(df["登录时间"], errors="coerce")
    df["登录成功"] = df["登录结果"].apply(lambda x: 1 if x == "success" else 0)
    df["登录小时"] = df["登录时间"].dt.hour

    # 按用户统计特征
    user_stats = df.groupby("用户ID").agg(
        登录失败次数=("登录成功", lambda x: (x == 0).sum()),
        登录成功次数=("登录成功", lambda x: (x == 1).sum()),
        总登录次数=("登录成功", "count"),
        活跃小时数=("登录小时", "nunique")
    ).reset_index()
    user_stats["登录成功率"] = user_stats["登录成功次数"] / user_stats["总登录次数"]

    # 平均登录间隔
    df.sort_values(by=["用户ID", "登录时间"], inplace=True)
    df["时间差"] = df.groupby("用户ID")["登录时间"].diff().dt.total_seconds()
    avg_time_diff = df.groupby("用户ID")["时间差"].mean().reset_index(name="平均登录间隔")
    user_stats = user_stats.merge(avg_time_diff, on="用户ID", how="left")
    user_stats["平均登录间隔"].fillna(0, inplace=True)

    # 返回测试数据的统计特征
    return user_stats

# 对测试数据进行预处理
test_user_stats = preprocess_test_data(test_data)

# 4. 使用模型进行预测
features = ["登录失败次数", "登录成功率", "平均登录间隔", "活跃小时数"]
X_test = test_user_stats[features]
test_user_stats["模型预测"] = optimized_model.predict(X_test)

# 5. 添加预测结果到测试数据中
test_user_stats["预测结果"] = test_user_stats["模型预测"].apply(lambda x: "正常" if x == 1 else "异常")

# 6. 保存预测结果
output_file = "test_predictions_results.csv"
test_user_stats.to_csv(output_file, index=False, encoding="utf-8")
print(f"预测结果已保存到：{output_file}")

# 7. 输出部分预测结果
print("\n部分预测结果：")
print(test_user_stats[["用户ID", "登录失败次数", "登录成功率", "平均登录间隔", "活跃小时数", "预测结果"]].head(100))


模型已加载！
测试数据已加载！
预测结果已保存到：test_predictions_results.csv

部分预测结果：
           用户ID  登录失败次数     登录成功率       平均登录间隔  活跃小时数 预测结果
0    test_user1       1  0.977778   736.363636     10   异常
1   test_user10       7  0.875000   589.090909     10   异常
2   test_user11       3  0.940000   661.224490     10   异常
3   test_user12       4  0.826087   981.818182      7   异常
4   test_user13       3  0.918919   900.000000     10   异常
5   test_user14       4  0.809524  1260.000000      8   异常
6   test_user15       2  0.866667  1800.000000      8   异常
7   test_user16       0  1.000000  3600.000000      6   异常
8   test_user17       2  0.962963   611.320755     10   异常
9   test_user18       8  0.900000   410.126582     10   异常
10  test_user19       0  1.000000  1350.000000     10   异常
11   test_user2       9  0.625000  1252.173913      9   异常
12  test_user20      12  0.777778  1702.641509     10   异常
13   test_user3       4  0.897436   757.894737      9   异常
14   test_user4       2  0.935484   960.000000      

The behavior will change in pandas 3.0. This inplace method will never work because the intermediate object on which we are setting values always behaves as a copy.

For example, when doing 'df[col].method(value, inplace=True)', try using 'df.method({col: value}, inplace=True)' or df[col] = df[col].method(value) instead, to perform the operation inplace on the original object.


  user_stats["平均登录间隔"].fillna(0, inplace=True)
