充分理解赛题是进行特征工程和模型构建的基础。
- 了解赛题概况和数据，分析赛题以及大致的处理方式
- 了解模型评测指标
# 赛题背景
新闻 APP 中新闻推荐，要求根据用户历史浏览数据，预测用户下一次点击的新闻。
# 数据概况
来自某新闻 APP 的用户交互数据，包括 30 万用户，近 300 万次点击，涉及 36 万篇不同的文章，每个文章有对应的 embedding 向量表示。
数据集划分：训练集 20 万，测试集A 5万，测试集B 5万
数据表：
- *train_click_log.csv*：训练集用户点击日志
| 字段名              | 含义         |
|---------------------|--------------|
| user_id             | 用户id       |
| click_article_id    | 点击文章id   |
| click_timestamp     | 点击时间戳   |
| click_environment   | 点击环境     |
| click_deviceGroup   | 点击设备组   |
| click_os            | 点击操作系统 |
| click_country       | 点击城市     |
| click_region        | 点击地区     |
| click_referrer_type | 点击来源类型 |

- *testA_click_log.csv*：测试集用户点击日志

- *articles.csv*: 新闻文章信息数据表
| 字段名        | 含义                         |
|---------------|------------------------------|
| article_id     | 文章id，与 click_article_id 相对应 |
| category_id    | 文章类型id                   |
| created_at_ts  | 文章创建时间戳               |
| words_count    | 文章字数                     |
- *articles_emb.csv*：新闻文章embedding向量表示

- *sample_submit.csv*：提交样例文件
| user_id | article_1 | article_2 | article_3 | article_4 | article_5 |
|--------:|----------:|----------:|----------:|----------:|----------:|
|  200000 |         1 |         2 |         3 |         4 |         5 |

# 评价指标
根据 *sample_submit.scv*，最后提交的格式，是针对每个用户，给出5篇推荐结果，按照点击的概率从前往后排序。实际每个用户最后一次点击只会是某一篇文章，因为评价指标的公式是
$$\text{score(user)} = \sum^5_{k=1}\frac{s(\text{user}, k)}{k}$$
如果五篇推荐结果中，包含用户真实点击的文章，针对该用户得分 $1/k$，$k$ 是这篇文章在推荐结果中的位置。因此排序越靠前，得分越高；如果不在推荐列表中，得分为0.


# 问题分析
目标：根据用户历史点击，预测用户最后一次点击的新闻文章
任务类型：看似是推荐，实质是排序 - 需要对候选文章打分，点击概率从高到低。问题在于， 36 万的候选集太大
- 召回：将问题看作二元分类{点击，不点击}，筛出候选集 -> 多大？
- 精排：把问题当作 CRT 预测，对候选集打分并排序

#Baseline

In [4]:
# import packages
import time, math, os
from tqdm import tqdm
import gc
import pickle
import random
from datetime import datetime
from operator import itemgetter
import numpy as np
import pandas as pd
import warnings
from collections import defaultdict
import collections
warnings.filterwarnings('ignore')

In [1]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [8]:
data_path = '/content/drive/MyDrive/news-recommendation/data/' # 天池平台路径
save_path = '/content/drive/MyDrive/news-recommendation/temp_results/'  # 天池平台路径

# df节省内存函数
输入：任意一个 pandas.DataFrame

输出：同一个 DataFrame，数值列被转成合适的最小数据类型

In [5]:
# 节约内存的一个标配函数
def reduce_mem(df):
    starttime = time.time()
    numerics = ['int16', 'int32', 'int64', 'float16', 'float32', 'float64']

    # 计算压缩前内存
    start_mem = df.memory_usage().sum() / 1024**2

    # 遍历每一列
    for col in df.columns:
        col_type = df[col].dtypes
        # 如果是数值类型（int, float), 找到最大最小值，降精度后，在可表示范围内，但是一定会有精度损失
        if col_type in numerics:
            c_min = df[col].min()
            c_max = df[col].max()
            if pd.isnull(c_min) or pd.isnull(c_max):
                continue
            if str(col_type)[:3] == 'int':
                if c_min > np.iinfo(np.int8).min and c_max < np.iinfo(np.int8).max:
                    df[col] = df[col].astype(np.int8)
                elif c_min > np.iinfo(np.int16).min and c_max < np.iinfo(np.int16).max:
                    df[col] = df[col].astype(np.int16)
                elif c_min > np.iinfo(np.int32).min and c_max < np.iinfo(np.int32).max:
                    df[col] = df[col].astype(np.int32)
                elif c_min > np.iinfo(np.int64).min and c_max < np.iinfo(np.int64).max:
                    df[col] = df[col].astype(np.int64)
            else: # float
                if c_min > np.finfo(np.float16).min and c_max < np.finfo(np.float16).max:
                    df[col] = df[col].astype(np.float16)
                elif c_min > np.finfo(np.float32).min and c_max < np.finfo(np.float32).max:
                    df[col] = df[col].astype(np.float32)
                else:
                    df[col] = df[col].astype(np.float64)
    # 计算压缩后内存
    end_mem = df.memory_usage().sum() / 1024**2

    # 输出数据压缩比例和耗时
    print('-- Mem. usage decreased to {:5.2f} Mb ({:.1f}% reduction),time spend:{:2.2f} min'.format(end_mem,                                                                                                       100*(start_mem-end_mem)/start_mem,
                                                                                                           (time.time()-starttime)/60))
    return df

# 读取采样或全量数据
1. 小样本用户调试
- 显著降低内存与运行时间，适合“debug 模式”
2. 线下/线上两套数据视图
- 线下验证只用训练期数据，防止数据泄漏，评估更可信
- 线上提交前，把测试点击也并进来，让召回/共现统计覆盖测试期，提高召回质量与覆盖率

In [6]:
# debug模式：从训练集中划出一部分数据来调试代码
def get_all_click_sample(data_path, sample_nums=10000):
    """
        训练集中采样一部分数据调试
        data_path: 原数据的存储路径
        sample_nums: 采样数目（这里由于机器的内存限制，可以采样用户做（相对少））
    """
    np.random.seed(42) # 保证复现实验

    all_click = pd.read_csv(data_path + "train_click_log.csv")
    # user_id 去重，拿到所有用户id
    all_user_ids = all_click.user_id.unique()

    # random.choice: 随机抽取 id，数量 sample_nums, replace=False 表示不放回抽样 -> 不重复 id
    sample_user_ids = np.random.choice(all_user_ids, size=sample_nums, replace=False)
    # 获取这些抽样的用户的点击记录
    all_click = all_click[all_click['user_id'].isin(sample_user_ids)]
    # 去除重复记录，主要是看 click_timestamp (日志偶尔会重复写入)
    all_click = all_click.drop_duplicates((['user_id', 'click_article_id', 'click_timestamp']))

    return all_click

# 线下 / 线上两种模式的“全量点击日志”，所有用户，非抽样
# 线上提交结果：使用所有数据进行训练 train + testA
# 线下验证模型：只使用训练集 train
def get_all_click_df(data_path, offline=True):
    if offline:
        all_click = pd.read_csv(data_path + 'train_click_log.csv')
    else:
        trn_click = pd.read_csv(data_path + 'train_click_log.csv')
        tst_click = pd.read_csv(data_path + 'testA_click_log.csv')

        # append 在 pandas 2.x 已移除，用 concat
        all_click = pd.concat([trn_click, tst_click], ignore_index=True)
    # 去重
    all_click = all_click.drop_duplicates((['user_id', 'click_article_id', 'click_timestamp']))

    return all_click

In [9]:
# 全量训练集
all_click_df = get_all_click_df(data_path, offline=False)

# 获取`用户-文章-点击时间`字典
为每个用户构造一个按时间升序的**点击序列**，元素是（文章 id，时间戳），便于做序列建模。

- 以 user_id 分组，只保留文章ID和时间戳两列。sort=False 表示不改变分组键的顺序，因此每个用户组内的行顺序沿用你在上游对 df 的排序（通常你已经按 click_timestamp 升序排过）
- .apply(lambda g: ...)：对每个用户组 g 执行函数
 - g['click_article_id'].to_numpy() 和 g['click_timestamp'].to_numpy() 转成 NumPy 数组（比逐行遍历更高效）
 - zip(...) 把两列一一配对成 (article_id, timestamp) 元组
 - list(...) 实体化为列表
 - seq.to_dict()： 把这个 Series 转成 Python 字典

In [10]:
# 根据点击时间获取用户的点击文章序列   {user1: [(item1, time1), (item2, time2)..]...}
# 传入数据集
def get_user_item_time(click_df):
    # 按用户+时间戳排序；mergesort 稳定，时间相同不打乱原序
    click_df = click_df.sort_values(['user_id', 'click_timestamp'], kind='mergesort')

    # 分组后把每个用户的 (item, ts) 序列打包成 list[tuple]
    seq = (click_df.groupby('user_id', sort=False)[['click_article_id', 'click_timestamp']]
           .apply(lambda g: list(zip(g['click_article_id'].to_numpy(),
                                     g['click_timestamp'].to_numpy()))))
    return seq.to_dict() # 转换为 字典

# top-k 点击的文章

In [11]:
def get_item_topk_click(click_df, k):
  # count 文章 id
  topk_click = click_df['click_article_id'].value_counts().index[:k]

  return topk_click

# ItemCF 计算物品相似度
文章与文章之间的相似性矩阵计算
- input:
  - df 数据表
  - item_created_time_dict 文章创建时间的字典
- return: 文章和文章的相似性矩阵

In [12]:
def itemcf_sim(df):
  # 先获得 用户-文章-时间戳 字典
  user_item_time_dict = get_user_item_time(df)

  # 计算物品相似度
  i2i_sim = {}
  item_cnt = defaultdict(int) # 统计物品点击数量

  for user, item_time_list in tqdm(user_item_time_dict.items()):
    # 对用户的每一次点击
    for i, i_click_time in item_time_list:
      item_cnt[i] += 1 # 统计文章点击次数
      i2i_sim.setdefault(i, {}) # 初始化第 i 个物品
      for j, j_click_time in item_time_list:
          if(i == j):
            continue
          # （i， j）初始化
          i2i_sim[i].setdefault(j, 0)
          # 相似度计算：对用户点击序列里 i,j 共现逐一累加权重，权重是用户点击序列长度
          i2i_sim[i][j] += 1 / math.log(len(item_time_list) + 1)

  i2i_sim_ = i2i_sim.copy()
  for i, related_items in i2i_sim.items():
      for j, wij in related_items.items():
          # 用文章被点击数量做归一化
          i2i_sim_[i][j] = wij / math.sqrt(item_cnt[i] * item_cnt[j])

  # 将得到的相似性矩阵保存到本地
  pickle.dump(i2i_sim_, open(save_path + 'itemcf_i2i_sim.pkl', 'wb'))

  return i2i_sim_

In [13]:
i2i_sim = itemcf_sim(all_click_df)

100%|██████████| 250000/250000 [00:36<00:00, 6903.77it/s]


# 基于 ItemCF 做文章推荐
- 从用户历史点击文章出发
- 通过 文章-文章 相似度矩阵，找到最相似的候选文章
- 累加相似分数，得到候选文章的打分
- 如果召回数不足，热门文章做补充
- 返回排序后的候选文章列表

In [18]:
# 基于 item 做召回 i2i
def item_based_recommend(user_id, user_hist_items, i2i_sim, sim_item_topk, recall_item_number, item_topk_click):
    # 用户历史交互过的文章
    # set，去重后物品
    user_hist_items_ = {item_id for item_id, _ in user_hist_items}

    item_rank = {} # 候选物品
    for loc, (i, click_time) in enumerate(user_hist_items):
        for j, wij in sorted(i2i_sim[i].items(), key=lambda x: x[1], reverse=True)[:sim_item_topk]:
            # 去除用户已经浏览过的
            for j in user_hist_items_:
              continue
            item_rank.setdefault(j, 0)
            item_rank[j] += wij

    if len(item_rank) < recall_item_number:
        for i, item in enumerate(item_topk_click):
            if item in item_rank.items():
                continue
            item_rank[item] = - i - 100 # 给负值
            if len(item_rank) == recall_item_number:
                break

    item_rank = sorted(
        item_rank.items(),              # 把字典变成 [(item, score), ...] 的列表
        key=lambda x: x[1],             # 按元组的第二个元素（score）排序
        reverse=True                    # 倒序 → 分数高的在前
    )[:recall_item_number]              # 只保留前K个

    return item_rank

In [19]:
# 给每个用户根据 ItemCF 推荐文章
user_recall_items_dict = collections.defaultdict(dict)

user_item_time_dict = get_user_item_time(all_click_df)

i2i_sim = pickle.load(open(save_path + 'itemcf_i2i_sim.pkl', 'rb'))

sim_item_topk = 10 # 每篇文章选取相似文章数量

recall_item_num = 10 # 召回文章数量

item_topk_click = get_item_topk_click(all_click_df, k=50) # 热门文章数量

for user in tqdm(all_click_df['user_id'].unique()): # 遍历所有用户
    user_recall_items_dict[user] = item_based_recommend(user, user_item_time_dict[user],
                                                        i2i_sim, sim_item_topk, recall_item_num,
                                                        item_topk_click)


100%|██████████| 250000/250000 [1:05:23<00:00, 63.72it/s]


In [20]:
# 召回的字典转换为 df
user_item_score_list = []

for user, items in tqdm(user_recall_items_dict.items()):
    for item, score in items:
        user_item_score_list.append([user, item, score])

recall_df = pd.DataFrame(user_item_score_list, columns=['user_id', 'click_article_id', 'pred_score'])

100%|██████████| 250000/250000 [00:07<00:00, 34621.39it/s]


# 生成提交文件


In [23]:
def submit(recall_df, topk=5, model_name=None):
    recall_df = recall_df.sort_values(by=['user_id', 'pred_score'])
    recall_df['rank'] = recall_df.groupby(['user_id'])['pred_score'].rank(ascending=False, method='first')

    # 删除分数列
    del recall_df['pred_score']
    submit = recall_df[recall_df['rank'] <= topk].set_index(['user_id', 'rank']).unstack(-1).reset_index()

    # 设置列名
    submit.columns = [int(col) if isinstance(col, int) else col for col in submit.columns.droplevel(0)]
    submit = submit.rename(columns={'': 'user_id', 1: 'article_1', 2: 'article_2',
                                                  3: 'article_3', 4: 'article_4', 5: 'article_5'})

    save_name = save_path + model_name + '_' + datetime.today().strftime('%m-%d') + '.csv'
    submit.to_csv(save_name, index=False, header=True)

In [24]:
# 获取测试集
tst_click = pd.read_csv(data_path + 'testA_click_log.csv')
tst_users = tst_click['user_id'].unique()

# 从所有的召回数据中将测试集中的用户选出来
# 用的是全量训练集，所以包含 testA 中的用户数据
tst_recall = recall_df[recall_df['user_id'].isin(tst_users)]

# 生成提交文件
submit(tst_recall, topk=5, model_name='itemcf_baseline')
