### 黄牛检测

In [1]:
import pandas as pd
from tqdm import tqdm
import matplotlib.pyplot as plt
from collections import defaultdict
import numpy as np
import csv
import random

In [2]:
# 读取数据
file_path = './data/raw_data.xlsx'
sheet_name = 'Sheet1'
data = pd.read_excel(file_path, sheet_name=sheet_name)

In [3]:
# 时间预处理
data['订单创建时间'] = pd.to_datetime(data['订单创建时间'])
data['就诊日期'] = pd.to_datetime(data['就诊日期'])

In [4]:
# 写入到指定log文件中
def write_log(log_file_path):
    # 将数据逐行写入.log 文件
    with open(log_file_path, 'w', encoding='utf-8') as log_file:
        for index, row in data.iterrows():
            # 将每一行转换为字符串并写入日志文件
            row = row.to_dict()
            log_file.write(row['患者ID'] + " : ")
            for k, v in row.items():
                if k != "患者ID":
                    log_file.write(str(v) + " ")
            log_file.write("\n")
    print(f"Data has been written to {log_file_path}")

#### 1. 基于规则的检测

1. IP重复，来自同一个IP，且为超过3个人挂号
2. 用户重复，来自同一个用户，且挂号了超过3个科室/超过两个app_id
3. 时间过早，每天5:00-5:01进行操作的

In [5]:
# 重复值筛选，找出data的seg_name字段中重复数大于limit的行, sort_add和asc_add是检测完之后添加的排序要求
def duplicate_detect(data, seg_name, limit, up=True, sort_add=[], asc_add=[]):
    assert len(sort_add) == len(asc_add)
    value_counts = data[seg_name].value_counts()
    if up:
        # 向上筛
        dup_row = data[data[seg_name].isin(value_counts[value_counts > limit].index)]
    else:
        # 向下筛
        dup_row = data[data[seg_name].isin(value_counts[value_counts < limit].index)]
    sort_by = [seg_name] + sort_add
    asc = [True] + asc_add
    dup_row = dup_row.sort_values(by=sort_by, ascending=asc)
    return dup_row

In [6]:
# 筛选出同一个dup_seg中有超过limit个unique_seg的索引(从0开始)
def unique_dup_filter(data, dup_seg, unique_seg, limit, up=True, get_row=False):
    # 确定是黄牛的行编号
    selects = []
    # IP重复检测
    dup = duplicate_detect(data, dup_seg, limit, up=up, sort_add=[unique_seg], asc_add=[True])
    print("init dup_num:", len(dup))
    if len(dup) == 0:
        return selects
    pos = 0          # 这个IP的起点
    count = 1      # 涉及多少个用户ID
    last = dup[unique_seg].iloc[0] # 上一个用户的ID
    dup_num = len(dup)
    for i in tqdm(range(1, dup_num)):
        if dup[dup_seg].iloc[i] == dup[dup_seg].iloc[pos]:
            if dup[unique_seg].iloc[i] != last:
                # 相同IP下一个新的患者
                count += 1
                last = dup[unique_seg].iloc[i]
        if dup[dup_seg].iloc[i] != dup[dup_seg].iloc[pos] or i == dup_num - 1:
            # 开始检测下一个IP
            if count > limit:
                # 达到重复人数条件
                for j in range(pos, i):
                    selects.append(dup['ID'].iloc[j])
            # 重置
            pos = i
            count = 1
            last = dup[unique_seg].iloc[i]
    print("filtered dup_num:", len(selects))
    if not get_row:
        selects.sort()
        return selects
    else:
        dup_rows = data[data['ID'].isin(selects)]
        dup_rows = dup_rows.sort_values(by=dup_seg, ascending=True)
        return dup_rows

In [7]:
# 对unique_dup_filter, 在一定范围内遍历limit
# 使用: limits, select_list = grid_traverse(data, '患者ID', '就诊科室名称', 5, 10)
def grid_traverse(data, dup_seg, unique_seg, start, end, gap=1):
    limits = []
    selects_list = []
    for limit in range(start, end, gap):
        print(f"limit: {limit}")
        selects = unique_dup_filter(data, dup_seg, unique_seg, limit)
        limits.append(limit)
        selects_list.append(selects)
    return limits, selects_list

In [8]:
# 将一个list写作答案
def write_list(lis):
    # 打开一个文件进行写入，如果文件不存在则创建
    with open('./data/result.txt', 'w', encoding='utf-8') as file:
        # 遍历列表中的每个元素
        for item in lis:
            # 将每个元素写入文件，每个元素后面加上换行符
            file.write(str(item) + '\n')
        print(f"Total line: {len(lis)}")

In [9]:
# 时间段过滤器，过滤出每天一段时间内的数据
# 形如daily_filter(data, '5:00:00', '5:01:00')
def daily_filter(data, start, end):
    return data[(data['订单创建时间'].dt.time >= pd.to_datetime(start).time()) &
                    (data['订单创建时间'].dt.time <= pd.to_datetime(end).time())]

In [10]:
# 时间分段统计
def hour_count():
    print("\t\t总数\t已挂号  医保换号  已退号  窗口退号  无号退款  超时取消")
    for i in range(5, 23):
        time_filter = data[(data['订单创建时间'].dt.time >= pd.to_datetime(f'{i}:00:00').time()) &
                     (data['订单创建时间'].dt.time <= pd.to_datetime(f'{i+1}:00:00').time())]
        counts = time_filter['状态'].value_counts()
        print(f"{i}:00 - {i+1}:00 \t{len(time_filter)}\t{counts['已挂号']}\t{counts['医保换号']}\t {counts['已退号']}\t"
              f"   {counts['窗口退号']}\t   {counts['无号退款']}\t    {counts['超时取消']}\t")

def minute_line(data, hour):
    count = []
    for i in tqdm(range(0, 59, 5)):
        start = str(i)
        end = str(i+5)
        if len(start) < 2:
            start = '0' + start
        if len(end) < 2:
            end = '0' + end 
        time_filter = data[(data['订单创建时间'].dt.time >= pd.to_datetime(f'{hour}:{start}:00').time()) &
                     (data['订单创建时间'].dt.time <= pd.to_datetime(f'{hour}:{end}:00').time())]
        count.append(len(time_filter))
    plt.plot(count)

In [11]:
# 恰好在16:00进行第二天/下一周操作
def hurry_sixteen(data, get_row=False):
    gap = daily_filter(data, '16:00:00', '16:00:01')
    time_diff = (gap['就诊日期'] - gap['订单创建时间']).dt.days
    hurry_row = gap[(time_diff == 0) | (time_diff == 6)]
    if get_row:
        return hurry_row
    else:
        return hurry_row['ID'].tolist()

In [12]:
# 将之前一次答案中的数据提取成列表
def get_list(res_id):
    file_path = f'./data/result{res_id}.txt'
    with open(file_path, 'r') as file:
        lis = [int(line.strip()) for line in file]
    return lis

In [13]:
# 两个list进行对比
def lis_cmp(lis1, lis2, ret=False, show=False):
    set1 = set(lis1)
    set2 = set(lis2)
    new_ele = set2 - set1
    miss_ele = set1 - set2
    same_ele = set1.intersection(set2)
    if show:
        print(f"lis1: {len(lis1)},\tlis2: {len(lis2)},\tmore: {len(new_ele)},"
            f"\tmiss: {len(miss_ele)},\tsame = {len(same_ele)}")
    if ret:
        return list(new_ele), list(miss_ele), list(same_ele)

# 与之前一次答案进行对比
def res_cmp(res_id, lis, ret=False, show=False):
    lis1 = get_list(res_id)
    res = lis_cmp(lis1, lis, ret, show)
    if ret:
        return res

In [14]:
# 频繁退号
def frequent_drop(data, limit, get_row=False):
    drop = data[data['状态'] == '已退号']
    mass_drop = duplicate_detect(drop, "患者ID", limit)
    if get_row:
        return mass_drop['ID'].tolist()
    else:
        return mass_drop

In [15]:
# 不是黄牛的用户
def normal_user(data):
    # 黑名单
    ip_black = duplicate_detect(data, 'IP_ADDRESS', 2)['IP_ADDRESS'].to_list()
    user_black = duplicate_detect(data, '患者ID', 4)['患者ID'].to_list()
    # 删除黑名单中对应的所有ip和患者ID
    white_rows = data[~data['IP_ADDRESS'].isin(ip_black)]
    white_rows = white_rows[~data['患者ID'].isin(user_black)]
    return white_rows
    # 基于用户分组的进一步筛选
    # depart_limit = white_rows.groupby('患者ID').filter(lambda group: group['就诊科室名称'].nunique() <= 2)
    # app_limit = depart_limit.groupby('患者ID').filter(lambda group: group['APPID'].nunique() <= 2)
    # area_limit = app_limit[(app_limit['省份'] == '北京') | (app_limit['省份'] == '河北')]
    # time_limit = area_limit[((area_limit['订单创建时间'].dt.time >= pd.to_datetime(f'6:00:00').time()) &
    #                  (area_limit['订单创建时间'].dt.time <= pd.to_datetime(f'16:00:00').time())) | 
    #                  (area_limit['订单创建时间'].dt.time >= pd.to_datetime(f'17:00:00').time())]
    # return time_limit

In [16]:
# 根据权重在多个不同的list中进行合并筛选
def weighted_selection(lists, weights, n, limit=0):
    # 创建一个字典来存储元素的总权重
    total_weights = defaultdict(float)

    # 遍历每个列表及其对应的权重
    for lst, weight in zip(lists, weights):
        for element in lst:
            total_weights[element] += weight  # 累加权重

    # 将字典转换为列表，并筛选出权重大于 limit 的元素
    filtered_elements = {k: v for k, v in total_weights.items() if v >= limit}

    # 按照权重排序
    sorted_elements = sorted(filtered_elements.items(), key=lambda x: x[1], reverse=True)

    # 选择前 n 个元素及其权重
    top_n_elements = sorted_elements[:n]

    return top_n_elements

#### 2. 基于聚类的分析

In [17]:
def ip_encoder(ip):
    parts = list(map(int, ip.split('.')))
    return (parts[0] << 24) + (parts[1] << 16) + (parts[2] << 8) + parts[3]

#### 3. 基于学习

In [None]:
state2num = {
    '已退号': 0,
    '已挂号': 1,
    '窗口退号': 2,
    '医保换号': 3,
    '无号退款': 4,
    '超时取消': 5,
    '主动取消': 6,
    np.nan: 7
}

def get_embed_list(state_list_pd, seg_name, series_name):
    state_lists = state_list_pd.groupby(seg_name)[series_name].apply(list).reset_index()
    state_lists = state_lists[series_name].to_list()
    for i in range(len(state_lists)):
        state_lists[i] = [str(state2num[str_]) for str_ in state_lists[i]]
    return state_lists

def generate_train_test(data, seg_name, series_name, normal_func, limit=2000, path='../LogBERT/output/rd/'):
    # train
    normal_rows = normal_func(data)
    normal_id = normal_rows[seg_name].to_list()
    sample_id = random.sample(normal_id, limit)
    normals = data[data[seg_name].isin(sample_id)]
    state_lists = get_embed_list(normals, seg_name, series_name)
    with open(path + "train", 'w') as train_file:
        for states in state_lists:
            train_file.write(" ".join(states) + "\n")

    # test_normal
    unsample_id = list(set(normal_id) - set(sample_id))
    test_normals = data[data[seg_name].isin(unsample_id)]
    state_lists = get_embed_list(test_normals, seg_name, series_name)
    with open(path + "test_normal", 'w') as train_file:
        for states in state_lists:
            train_file.write(" ".join(states) + "\n")

    # test_abnormal
    res_id = get_list(5)
    test_abnormals = data[data['ID'].isin(res_id)]
    state_lists = get_embed_list(test_abnormals, seg_name, series_name)
    with open(path + "test_abnormal", 'w') as train_file:
        for states in state_lists:
            train_file.write(" ".join(states) + "\n")

##### LogBERT 魔改记录

1. logbert.py里的options["min_len"]改成了1, epoch也改成了2
2. sample.py中line = line.squeeze()改为line.ravel()
3. sample.py中logkey_seq_pairs = np.array(logkey_seq_pairs,)加上了dtype=object，predict_log.py中如果报错也添加这个就行
4. train_log.py中epoch > 0及生成center

#### 工作区

In [23]:
generate_train_test(data, "患者ID", "状态", normal_user)

  white_rows = white_rows[~data['患者ID'].isin(user_black)]


3210 3210


#### 流放地

In [20]:
# from sklearn.preprocessing import OneHotEncoder
# from sklearn.cluster import KMeans
# from sklearn.pipeline import Pipeline
# from sklearn.compose import ColumnTransformer
# from sklearn.impute import SimpleImputer
# from sklearn.preprocessing import MinMaxScaler
# # 特征选择
# used_data = pd.DataFrame()
# used_data['ID'] = data['ID']
# used_data['start_t'] = data['订单创建时间'].dt.hour * 3600 + data['订单创建时间'].dt.minute * 60 + data['订单创建时间'].dt.second
# used_data['delta_t'] = (data['就诊日期'] - data['订单创建时间']).astype('int64') / 10**9
# used_data['pid'] = data['患者ID']

# # 统计每个患者ID对应的不同APPID数量
# app_num = data.groupby('患者ID')['APPID'].nunique().reset_index()
# app_num.columns = ['pid', 'app_num']  # 重命名列
# used_data = used_data.merge(app_num, on='pid', how='left')

# # 每个患者的科室挂号数
# dorm_num = data.groupby('患者ID')['就诊科室名称'].nunique().reset_index()
# dorm_num.columns = ['pid', 'dorm_num']  # 重命名列
# used_data = used_data.merge(dorm_num, on='pid', how='left')

# used_data['province'] = data['省份']
# used_data['ip'] = data['IP_ADDRESS'].apply(ip_encoder)
# used_data['status'] = data['状态']

# features = ['start_t', 'delta_t', 'pid', 'province', 'ip', 'status', 'app_num']

# # 归一化
# scaler = MinMaxScaler()  # 或使用 StandardScaler()
# used_data[['start_t', 'delta_t', 'ip']] = scaler.fit_transform(used_data[['start_t', 'delta_t', 'ip']])

# # 使用 ColumnTransformer 进行特征处理
# preprocessor = ColumnTransformer(
#     transformers=[
#         ('num', SimpleImputer(strategy='constant'), ['delta_t', 'start_t', 'ip', 'app_num']),
#         ('cat', OneHotEncoder(), ['pid', 'province', 'status'])  # 分类特征处理
#     ],
#     remainder='drop'
# )
# # 创建聚类管道
# pipeline = Pipeline(steps=[
#     ('preprocessor', preprocessor),
#     ('kmeans', KMeans(n_clusters=2, random_state=0))
# ])

# # 拟合模型
# pipeline.fit(used_data[features])

# # 获取聚类标签
# used_data[f'cluster'] = pipeline.predict(used_data[features])