# 互评作业2: 频繁模式挖掘

## 读取数据

In [None]:
import pandas as pd
import json
import ast
import os
from tqdm import tqdm

input_dir = "./data/30G_data"
output_dir = "./data/30G_data"
file_count = 16


def _parse_purchase_history(raw):
    """Decode one ``purchase_history`` cell into a dict.

    Strings are tried as JSON first and as a Python literal second.

    Returns:
        The decoded dict, or ``None`` when the cell is missing, cannot be
        decoded, or decodes to something that is not a dict.
    """
    if isinstance(raw, str):
        try:
            raw = json.loads(raw)
        except json.JSONDecodeError:
            # Fallback for payloads that are not valid JSON but are valid
            # Python literals (e.g. single-quoted dict dumps).
            try:
                raw = ast.literal_eval(raw)
            except (ValueError, SyntaxError):
                return None
    return raw if isinstance(raw, dict) else None


# Flatten the nested purchase_history of every input part file into plain
# columns and write the result next to the input under a shorter file name.
for i in range(file_count):
    input_filename = f"part-{i:05d}.parquet"
    output_filename = f"part-{i:02d}.parquet"

    input_path = os.path.join(input_dir, input_filename)
    output_path = os.path.join(output_dir, output_filename)

    print(f"\n正在处理文件: {input_filename}")

    df = pd.read_parquet(input_path, columns=['id', 'user_name', 'fullname', 'purchase_history'])
    processed_rows = []
    unparsed_rows = 0

    # Zipping the columns yields the same per-row values as iterrows() without
    # building a Series object for every row.
    row_values = zip(df['id'], df['user_name'], df['fullname'], df['purchase_history'])
    for row_id, user_name, fullname, raw_ph in tqdm(row_values, total=len(df), desc=f"处理 {input_filename}"):
        ph = _parse_purchase_history(raw_ph)
        if ph is None:
            # Keep the row (with empty purchase fields) so that one malformed
            # record does not abort the whole file or drop the user.
            unparsed_rows += 1
            ph = {}

        items = ph.get('items')
        if items is None:
            items = []
        items_id = [item.get('id') for item in items if isinstance(item, dict)]

        processed_rows.append({
            'id': row_id,
            'user_name': user_name,
            'fullname': fullname,
            'items': items_id,
            'payment_method': ph.get('payment_method'),
            'payment_status': ph.get('payment_status'),
            'purchase_date': ph.get('purchase_date')
        })

    if unparsed_rows:
        print(f"警告：{unparsed_rows} 行的 purchase_history 无法解析，已按空记录写入")

    new_df = pd.DataFrame(processed_rows)
    new_df.to_parquet(output_path, index=False)
    print(f"写入完成：{output_filename}")


## 商品类别关联规则挖掘

In [None]:
import pandas as pd
import json
import os
from tqdm import tqdm
from mlxtend.preprocessing import TransactionEncoder
from mlxtend.frequent_patterns import fpgrowth, association_rules

data_dir = './data/30G_data'
product_catalog_path = './data/product_catalog.json'
categories_path = './data/categories.json'
output_dir = './output'
os.makedirs(output_dir, exist_ok=True)

# Item id -> value of the catalog entry's 'category' field.
with open(product_catalog_path, 'r', encoding='utf-8') as f:
    product_catalog = json.load(f)
item2category2 = {item['id']: item['category'] for item in product_catalog}

with open(categories_path, 'r', encoding='utf-8') as f:
    category_hierarchy = json.load(f)

# Invert the hierarchy file: every entry of a value list maps back to its key.
# Presumably key = top-level category, list entries = sub-categories; verify
# against categories.json.
category2_to_category1 = {}
for cat1, cat2_list in category_hierarchy.items():
    for cat2 in cat2_list:
        category2_to_category1[cat2] = cat1

# The 16 part files are mined four at a time; each batch writes its own CSV.
files = [f'part-{i:02d}.parquet' for i in range(16)]
batches = [files[i:i + 4] for i in range(0, len(files), 4)]

for batch_id, batch_files in enumerate(batches):
    print(f"\n正在处理第 {batch_id + 1} 批文件：{batch_files}")
    transactions = []
    skipped_rows = 0

    for fname in tqdm(batch_files, desc="读取事务"):
        path = os.path.join(data_dir, fname)
        df = pd.read_parquet(path, columns=["items"])

        for items in df['items']:
            categories = set()
            try:
                for item in items:
                    cat1 = category2_to_category1.get(item2category2.get(item))
                    if cat1:
                        categories.add(cat1)
            except TypeError:
                # `items` is not iterable (missing cell) or holds an unhashable
                # id; skip the row but keep count instead of hiding it.
                skipped_rows += 1
                continue
            if categories:
                transactions.append(list(categories))

    if skipped_rows:
        print(f"批次 {batch_id + 1} 跳过了 {skipped_rows} 条无法解析的记录")

    if not transactions:
        # Nothing to encode; do not hand an empty matrix to fpgrowth.
        print(f"批次 {batch_id + 1} 没有可用事务，跳过")
        continue

    te = TransactionEncoder()
    te_array = te.fit(transactions).transform(transactions)
    df_trans = pd.DataFrame(te_array, columns=te.columns_)

    frequent_itemsets = fpgrowth(df_trans, min_support=0.02, use_colnames=True)
    frequent_itemsets.to_csv(f"{output_dir}/frequent_itemsets_{batch_id}.csv", index=False)

    print(f"批次 {batch_id + 1} 完成，共 {len(frequent_itemsets)} 个频繁项集")



In [None]:
import os
import pandas as pd
import ast
from mlxtend.frequent_patterns import association_rules

def parse_frozenset_string(s):
    """Rebuild a frozenset from its textual form.

    Accepts either the ``repr`` of a frozenset (``"frozenset({'a', 'b'})"``)
    or a bare literal collection (``"{'a', 'b'}"``, ``"['a', 'b']"``).
    Surrounding whitespace is ignored.

    Args:
        s: Text to parse.

    Returns:
        A frozenset with the elements of the parsed collection; an empty
        frozenset for ``"frozenset()"`` or blank text.

    Raises:
        ValueError, SyntaxError: If the remaining text is not a Python literal.
    """
    text = s.strip()
    if text.startswith("frozenset(") and text.endswith(")"):
        text = text[len("frozenset("):-1].strip()
    if not text:
        # repr(frozenset()) is "frozenset()", which leaves nothing to evaluate.
        return frozenset()
    return frozenset(ast.literal_eval(text))

all_fi = []

# Sorted so the merge does not depend on the directory listing order.
for f in sorted(os.listdir(output_dir)):
    if f.startswith("frequent_itemsets_") and f.endswith(".csv"):
        fi = pd.read_csv(os.path.join(output_dir, f))
        fi['itemsets'] = fi['itemsets'].apply(parse_frozenset_string)
        all_fi.append(fi)

if not all_fi:
    raise ValueError(f"{output_dir} 中没有找到 frequent_itemsets_*.csv 文件")

# The same itemset appears once per batch with a slightly different support,
# so dropping fully identical rows does not deduplicate it. Collapse every
# itemset to a single support: the sum over batches divided by the number of
# batch files. A batch where the itemset was below min_support contributes 0,
# which keeps support(superset) <= support(subset) and so confidence <= 1.
# NOTE(review): this treats all batches as equally sized — confirm.
support_totals = {}
for fi in all_fi:
    for itemset, support in zip(fi['itemsets'], fi['support']):
        support_totals[itemset] = support_totals.get(itemset, 0.0) + support

batch_count = len(all_fi)
all_fi_df = pd.DataFrame({
    'support': [total / batch_count for total in support_totals.values()],
    'itemsets': pd.Series(list(support_totals), dtype=object),
})
print(f"共 {len(all_fi_df)} 个去重后的频繁项集")

rules = association_rules(all_fi_df, metric="confidence", min_threshold=0.5)

rules.to_csv(os.path.join(output_dir, "final_rules_category.csv"), index=False)
print("关联规则已保存到 final_rules_category.csv")


## 支付方式与商品类别的关联分析

In [None]:
import pandas as pd
import json
import os
from mlxtend.frequent_patterns import apriori, association_rules, fpgrowth
from tqdm import tqdm

data_dir = "./data/30G_data"
catalog_path = "./data/product_catalog.json"

with open(catalog_path, 'r', encoding='utf-8') as f:
    product_catalog = json.load(f)

# Both lookups are keyed by the string form of the product id.
id2cat = {str(p["id"]): p["category"] for p in product_catalog}
id2price = {str(p["id"]): p["price"] for p in product_catalog}

transactions = []   # one list per order: its categories plus a "支付:<method>" token
high_value_pm = []  # payment method of every order containing an item priced above 5000

for i in range(16):
    file = os.path.join(data_dir, f"part-{i:02d}.parquet")
    df = pd.read_parquet(file, columns=["items", "payment_method"])

    # Zipping the two columns yields the same values as iterrows() without
    # creating a Series per row.
    order_rows = zip(df['items'], df['payment_method'])
    for items, payment_method in tqdm(order_rows, desc=f"处理文件 {file}", total=len(df)):
        if items is None:
            # Missing item list: nothing to map, skip instead of crashing.
            continue

        categories = set()
        is_high_value = False
        for item in items:
            item_id = str(item)
            category = id2cat.get(item_id)
            price = id2price.get(item_id)
            if category:
                categories.add(category)
            if price and price > 5000:
                is_high_value = True

        if categories:
            transactions.append(list(categories) + [f"支付:{payment_method}"])

        if is_high_value:
            high_value_pm.append(payment_method)

from mlxtend.preprocessing import TransactionEncoder

te = TransactionEncoder()
te_ary = te.fit(transactions).transform(transactions)
df_encoded = pd.DataFrame(te_ary, columns=te.columns_)

frequent_itemsets = fpgrowth(df_encoded, min_support=0.01, use_colnames=True)
rules = association_rules(frequent_itemsets, metric="confidence", min_threshold=0.6)

# Keep rules whose antecedent mentions a payment method and whose consequent
# consists of categories only.
rules_pm2cat = rules[
    rules['antecedents'].apply(lambda x: any(str(a).startswith("支付:") for a in x)) &
    rules['consequents'].apply(lambda x: all(not str(c).startswith("支付:") for c in x))
]

# This cell does not otherwise create the target directory, so make sure it
# exists before writing into it.
os.makedirs("output", exist_ok=True)

rules_pm2cat.to_csv("output/payment_category_rules.csv", index=False)

high_value_pm_df = pd.Series(high_value_pm, name="payment_method").value_counts(normalize=True)
high_value_pm_df.to_csv("output/high_value_payment_distribution.csv")


## 时间序列模式挖掘

In [None]:
import pandas as pd
import numpy as np
import os
import ast
import json
from tqdm import tqdm
from collections import Counter, defaultdict

data_dir = './data/30G_data'
output_dir = './output'
os.makedirs(output_dir, exist_ok=True)
files = [f'part-{i:02d}.parquet' for i in range(16)]
product_catalog_path = './data/product_catalog.json'

with open(product_catalog_path, 'r', encoding='utf-8') as f:
    catalog = json.load(f)
item2category = {int(item['id']): item['category'] for item in catalog}

# Counters keyed by (period, category); sequence_counter is keyed by
# (category of the earlier order, category of the next order).
monthly_stats = Counter()
quarterly_stats = Counter()
weekday_stats = Counter()
sequence_counter = Counter()


def parse_items(items_str):
    """Normalise one ``items`` cell into a list of item ids.

    Args:
        items_str: A string holding a literal list (optionally wrapped in
            ``array(...)``), or an already decoded list / tuple / numpy array.

    Returns:
        A list of item ids; an empty list when the cell cannot be interpreted.
    """
    if isinstance(items_str, str):
        try:
            arr = ast.literal_eval(items_str.replace('array', ''))
            return [int(i) for i in arr]
        except (ValueError, SyntaxError, TypeError):
            return []
    if isinstance(items_str, np.ndarray):
        # The pyarrow engine hands list columns back as numpy arrays.
        return items_str.tolist()
    if isinstance(items_str, (list, tuple)):
        return list(items_str)
    return []


def map_categories(item_list):
    """Return the distinct categories of the items that exist in the catalog."""
    return list({item2category[i] for i in item_list if i in item2category})


for file in files:
    file_path = os.path.join(data_dir, file)
    print(f"正在处理文件：{file_path}")
    df = pd.read_parquet(file_path, engine='pyarrow', columns=['id', 'items', 'purchase_date'])

    df['items'] = df['items'].apply(parse_items)
    df['purchase_date'] = pd.to_datetime(df['purchase_date'], errors='coerce')
    # Rows whose date could not be parsed carry no time information.
    df.dropna(subset=['purchase_date'], inplace=True)
    df['month'] = df['purchase_date'].dt.month
    df['quarter'] = df['purchase_date'].dt.to_period('Q').astype(str)
    df['weekday'] = df['purchase_date'].dt.day_name()
    df['categories'] = df['items'].apply(map_categories)

    # Seasonal counts: one increment per (period, category) per order.
    period_rows = zip(df['month'], df['quarter'], df['weekday'], df['categories'])
    for month, quarter, weekday, cats in period_rows:
        for cat in cats:
            monthly_stats[(month, cat)] += 1
            quarterly_stats[(quarter, cat)] += 1
            weekday_stats[(weekday, cat)] += 1

    # Sequential patterns: for consecutive orders of the same id, count every
    # (earlier category -> later category) pair with differing categories.
    # The frame is sorted by id and date once; groupby keeps that row order
    # inside each group, so no per-group re-sort is needed.
    # NOTE(review): grouping happens per file, so orders of the same id stored
    # in different part files are never paired — confirm ids do not span files.
    df.sort_values(by=['id', 'purchase_date'], inplace=True)
    for _, ordered_cats in df.groupby('id')['categories']:
        last_cats = None
        for cats in ordered_cats:
            curr_cats = set(cats)
            if last_cats:
                for a in last_cats:
                    for b in curr_cats:
                        if a != b:
                            sequence_counter[(a, b)] += 1
            last_cats = curr_cats

def save_counter_to_csv(counter, columns, filename):
    """Write a counter with iterable keys as a CSV table under ``output_dir``.

    Each key is unpacked into the leading columns and its count becomes the
    last column.

    Args:
        counter: Mapping from an iterable key to a count.
        columns: Column names, one per key element plus one for the count.
        filename: Name of the CSV file created inside ``output_dir``.
    """
    records = []
    for key, count in counter.items():
        records.append((*key, count))

    table = pd.DataFrame(records, columns=columns)
    target_path = os.path.join(output_dir, filename)
    table.to_csv(target_path, index=False)
    print(f"写入 {filename} 成功，共 {len(table)} 条")

# Each entry: (counter, CSV header, output file name); written in this order.
for stats_counter, csv_header, csv_name in (
    (monthly_stats, ['month', 'category', 'count'], 'monthly_stats.csv'),
    (quarterly_stats, ['quarter', 'category', 'count'], 'quarterly_stats.csv'),
    (weekday_stats, ['weekday', 'category', 'count'], 'weekday_stats.csv'),
    (sequence_counter, ['category_A', 'category_B', 'count'], 'sequence_patterns.csv'),
):
    save_counter_to_csv(stats_counter, csv_header, csv_name)

## 退款模式分析

In [None]:
import os
import pandas as pd
import ast
import json
from mlxtend.frequent_patterns import fpgrowth, association_rules
from tqdm import tqdm

# Input/output locations for the refund analysis.
data_dir, output_dir = './data/30G_data', './output'
os.makedirs(output_dir, exist_ok=True)

# Names of the 16 preprocessed part files.
files = [f'part-{i:02d}.parquet' for i in range(16)]

# Lookup table: integer item id -> value of the catalog entry's 'category'.
with open('./data/product_catalog.json', 'r', encoding='utf-8') as f:
    catalog = json.loads(f.read())
item2category = dict((int(entry['id']), entry['category']) for entry in catalog)

def parse_items(items_val, category_map=None):
    """Map one ``items`` cell to the distinct categories of its items.

    Args:
        items_val: A string holding a literal list (optionally wrapped in
            ``array(...)``) or any iterable of item ids (list, tuple,
            numpy array, ...).
        category_map: Mapping from item id to category. Defaults to the
            module-level ``item2category``.

    Returns:
        A list of distinct categories (order unspecified). Ids missing from
        the mapping are ignored; a cell that cannot be interpreted gives ``[]``.
    """
    if category_map is None:
        category_map = item2category

    if isinstance(items_val, str):
        try:
            items_val = ast.literal_eval(items_val.replace('array', ''))
        except (ValueError, SyntaxError):
            return []
    elif items_val is None:
        return []

    try:
        # Iterating directly (instead of checking for list/tuple) also covers
        # the numpy arrays that pyarrow returns for list columns.
        return list({category_map[i] for i in items_val if i in category_map})
    except TypeError:
        # Not iterable (e.g. NaN for a missing cell) or unhashable elements.
        return []

def one_hot_encode(categories_list, all_categories):
    """Build a boolean membership row for one transaction.

    Args:
        categories_list: Categories present in the transaction.
        all_categories: Every category that should get an entry.

    Returns:
        A ``pd.Series`` indexed by the entries of ``all_categories`` (in
        iteration order) whose values tell whether that category occurs in
        ``categories_list``.
    """
    flags = {}
    for category in all_categories:
        flags[category] = category in categories_list
    return pd.Series(flags)

# Every category known to the catalog becomes one column of the one-hot table.
all_categories = set(item2category.values())
all_rules = []

for file in files:
    file_path = os.path.join(data_dir, file)
    print(f"处理文件: {file_path}")

    df = pd.read_parquet(file_path, engine='pyarrow', columns=['payment_status', 'items'])
    df['categories'] = df['items'].apply(parse_items)

    # Only refunded / partially refunded orders take part in the mining.
    refund_mask = df['payment_status'].isin(['已退款', '部分退款'])
    df_refund = df[refund_mask].copy()
    if df_refund.empty:
        print(f"文件 {file} 中无退款订单，跳过。")
        continue

    # One boolean row per refunded order.
    encoded_rows = []
    for cats in df_refund['categories']:
        encoded_rows.append(one_hot_encode(cats, all_categories))
    ohe_df = pd.DataFrame(encoded_rows)

    frequent_itemsets = fpgrowth(ohe_df, min_support=0.005, use_colnames=True)
    if frequent_itemsets.empty:
        print(f"文件 {file} 无满足支持度的频繁项集。")
        continue

    rules = association_rules(frequent_itemsets, metric="confidence", min_threshold=0.4)
    if rules.empty:
        print(f"文件 {file} 无满足置信度的关联规则。")
        continue

    # Per-file result, named after the part file without its extension.
    out_file = os.path.join(output_dir, f"refund_rules_{file.replace('.parquet', '')}.csv")
    rules.to_csv(out_file, index=False)
    print(f"规则保存到 {out_file}，共 {len(rules)} 条规则。")

    all_rules.append(rules)

# NOTE(review): drop_duplicates compares every column, so the same rule mined
# from two files is kept twice whenever its metrics differ.
if not all_rules:
    print("未发现任何符合条件的退款关联规则。")
else:
    combined_rules = pd.concat(all_rules, ignore_index=True).drop_duplicates()
    combined_rules.to_csv(os.path.join(output_dir, 'refund_rules_all.csv'), index=False)
    print(f"合并所有规则完成，保存到 refund_rules_all.csv ，共 {len(combined_rules)} 条规则。")
