# Feature Engineering


In [1]:
from google.colab import drive
drive.mount('/content/drive')
!ls /content/drive/MyDrive/RecSys2024/
base_path = '/content/drive/MyDrive/RecSys2024/'

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).
feature_output	output			  Recsys2024_EDA.ipynb	      Recsys2024_LGBM_train.ipynb
input		preprocess2.ipynb	  Recsys2024_ensemble.ipynb   Recsys2024_preprocess.ipynb
models		preprocess_article.ipynb  Recsys2024_LGBM_test.ipynb


In [2]:
import pandas as pd
import polars as pl
import numpy as np
import pickle
import gc

In [3]:
#DEBUG_MODE = True
DEBUG_MODE = False
DATA_TYPE = 'train'
#DATA_TYPE = 'valid'
#DATA_TYPE = 'test'

# train, validにはあるがtestにはない特徴量
ignore_col_train = ['next_scroll_percentage', 'next_read_time', 'article_id']
out_path = base_path + 'feature_output'

# Preprocessing

In [4]:
if DEBUG_MODE:
  input_path = base_path + 'input/ebnerd_small/'
else:
  input_path = base_path + 'input/ebnerd_large/'

In [5]:
!ls {input_path}

articles.parquet  train  validation


In [6]:
if DATA_TYPE == 'test':
    input_path = base_path + 'input/ebnerd_testset/'

if DATA_TYPE == 'train':
    df_impression = pl.read_parquet(input_path + 'train/behaviors.parquet')
elif DATA_TYPE == 'valid':
    df_impression = pl.read_parquet(input_path + 'validation/behaviors.parquet')
else:
    df_impression = pl.read_parquet(input_path + 'test/behaviors.parquet')

# Feature Engineering

In [7]:
def show_df(df, n=5):
    print(df.shape)
    display(df.head(n))


def expand_behavior(df, data_type='train'):
    df = df.explode("article_ids_inview").rename({"article_ids_inview": "article_id_inview"})
    if data_type in ('train', 'valid'):
        df = df.with_columns(
            (pl.col("article_id_inview").is_in(pl.col("article_ids_clicked"))).alias("clicked")
        )
        df = df.drop("article_ids_clicked")
    return df


def datetime_to_unix(df, col_name):
    return df.with_columns(
        pl.col(col_name).map_elements(lambda lst: [int(dt.timestamp()) for dt in lst])
    )


def user_features(df):
  return df.select([
      pl.col("user_id"),
      pl.col("impression_time_fixed").list.len().alias("u_history_len"),
      pl.col("impression_time_fixed").list.last().alias("u_impression_time_last"),
      pl.col("impression_time_fixed").list.mean().alias("u_impression_time_mean"),
      pl.col("read_time_fixed").list.last().alias("u_read_time_last"),
      pl.col("read_time_fixed").list.mean().alias("u_read_time_mean"),
      pl.col("scroll_percentage_fixed").list.last().alias("u_scroll_percentage_last"),
      pl.col("scroll_percentage_fixed").list.mean().alias("u_scroll_percentage_mean"),
  ])


def exact_first(df, col_name):

    new_col_name = f"{col_name}_first"
    df = df.with_columns(
        pl.col(col_name).list.first().alias(new_col_name)
    )
    return df

def pop_count_list(df, out_col_name, bins):

    pop_list = []
    for bin in bins:
        tmp = df.with_columns(
            (df["impression_time"] / bin).cast(pl.Int64).alias(f"time_bin_{bin}")
        ).group_by([f"time_bin_{bin}", "article_id"]).agg(
            pl.count().alias(out_col_name)
        )
        pop_list.append(tmp)
    return pop_list


In [8]:
print('impression_time min:', df_impression.select(pl.col("impression_time")).min().item())
print('impression_time max:', df_impression.select(pl.col("impression_time")).max().item())

df_impression = df_impression.with_columns(
    pl.col("article_ids_inview").list.len().alias("view_num").cast(pl.UInt8),
    #impression["impression_time"].dt.timestamp("ms") / 1000).cast(pl.Int64),
    (pl.col("impression_time").dt.timestamp("ms") / 1000).cast(pl.Int64),
)

if DATA_TYPE in ('train', 'valid'):
    df_impression = df_impression.drop(ignore_col_train)
show_df(df_impression, 3)

impression_time min: 2023-05-18 07:00:00
impression_time max: 2023-05-25 06:59:59
(12063890, 15)


impression_id,impression_time,read_time,scroll_percentage,device_type,article_ids_inview,article_ids_clicked,user_id,is_sso_user,gender,postcode,age,is_subscriber,session_id,view_num
u32,i64,f32,f32,i8,list[i32],list[i32],u32,bool,i8,i8,i8,bool,u32,u8
47727,1684704907,20.0,,1,"[9482380, 9775183, … 9538375]",[9775183],18293,False,,,,False,265,6
47731,1684704753,13.0,,1,"[9774557, 9774516, … 9759966]",[9759966],18293,False,,,,False,265,5
47736,1684704812,17.0,,1,"[9759966, 9774557, … 9775323]",[9774652],18293,False,,,,False,265,13


In [9]:
if DATA_TYPE == 'test':
  # testの場合はimpression_id=0が複数含まれており、user_idも入れないとuniqueにならないので入れる
  exp_col = ["article_ids_inview"]
  df_impression_article = df_impression.select(["impression_id", "user_id"] + exp_col)
else:
  exp_col = ["article_ids_inview", "article_ids_clicked"]
  df_impression_article = df_impression.select(["impression_id"] + exp_col)

df_impression = df_impression.drop(exp_col)

In [10]:
# expand list
df_impression_article = expand_behavior(df_impression_article, DATA_TYPE)
show_df(df_impression_article, 2)

(133810641, 3)


impression_id,article_id_inview,clicked
u32,i32,bool
47727,9482380,False
47727,9775183,True


In [11]:
# save
if DEBUG_MODE:
    df_impression.write_parquet(f"{out_path}/small_{DATA_TYPE}_impression.parquet")
    df_impression_article.write_parquet(f"{out_path}/small_{DATA_TYPE}_impression_article.parquet")
else:
    df_impression.write_parquet(f"{out_path}/{DATA_TYPE}_impression.parquet")
    df_impression_article.write_parquet(f"{out_path}/{DATA_TYPE}_impression_article.parquet")
del df_impression, df_impression_article
gc.collect()

0

## History features (key: user_id)

In [12]:
if DATA_TYPE == 'train':
    df_user = pl.read_parquet(input_path + 'train/history.parquet')
elif DATA_TYPE == 'valid':
    df_user = pl.read_parquet(input_path +  'validation/history.parquet')
else:
    df_user = pl.read_parquet(input_path + 'test/history.parquet')

In [13]:
df_user = datetime_to_unix(df_user, "impression_time_fixed")
show_df(df_user, 3)

(788090, 5)


user_id,impression_time_fixed,scroll_percentage_fixed,article_id_fixed,read_time_fixed
u32,list[i64],list[f32],list[i32],list[f32]
10029,"[1682662617, 1682662651, … 1684393190]","[23.0, 69.0, … null]","[9735579, 9739888, … 9770541]","[28.0, 24.0, … 0.0]"
10033,"[1682593892, 1682593976, … 1684354962]","[33.0, 41.0, … 29.0]","[9738139, 9738263, … 9769404]","[2.0, 2.0, … 1.0]"
10034,"[1682848017, 1682848053, … 1684226452]","[null, 88.0, … 100.0]","[9742693, 9742686, … 9767363]","[21.0, 103.0, … 9.0]"


In [14]:
# User * Article (過去に同一記事のclickがあった場合)
df_user_article = df_user.explode(["impression_time_fixed", "scroll_percentage_fixed", "article_id_fixed", "read_time_fixed"]).rename({"article_id_fixed": "article_id"})
df_user_article = df_user_article.group_by(["user_id", "article_id"]).agg([
    pl.col("impression_time_fixed").mean().alias("ua_impression_time_mean"),
    pl.col("impression_time_fixed").last().alias("ua_impression_time_last"),
    pl.col("scroll_percentage_fixed").mean().alias("ua_scroll_percentage_mean"),
    pl.col("scroll_percentage_fixed").last().alias("ua_scroll_percentage_last"),
    pl.col("read_time_fixed").mean().alias("ua_read_time_mean"),
    pl.col("read_time_fixed").last().alias("ua_read_time_last"),
    pl.count().alias("ua_count")
])
show_df(df_user_article, 3)

(105979276, 9)


user_id,article_id,ua_impression_time_mean,ua_impression_time_last,ua_scroll_percentage_mean,ua_scroll_percentage_last,ua_read_time_mean,ua_read_time_last,ua_count
u32,i32,f64,i64,f32,f32,f32,f32,u32
10029,9740249,1682700000.0,1682682168,68.0,68.0,19.0,19.0,1
10029,9739938,1682700000.0,1682682471,21.0,21.0,22.0,22.0,1
10029,9742625,1682800000.0,1682835151,60.0,60.0,37.0,37.0,1


In [15]:
df_user = user_features(df_user)
show_df(df_user, 2)

(788090, 8)


user_id,u_history_len,u_impression_time_last,u_impression_time_mean,u_read_time_last,u_read_time_mean,u_scroll_percentage_last,u_scroll_percentage_mean
u32,u32,i64,f64,f32,f32,f32,f32
10029,678,1684393190,1683500000.0,0.0,29.384956,,49.521538
10033,587,1684354962,1683500000.0,1.0,86.124359,29.0,67.584702


In [None]:
# save
if DEBUG_MODE:
    df_user.write_parquet(f"{out_path}/small_{DATA_TYPE}_user.parquet")
    df_user_article.write_parquet(f"{out_path}/small_{DATA_TYPE}_user_article.parquet")
else:
    df_user.write_parquet(f"{out_path}/{DATA_TYPE}_user.parquet")
    df_user_article.write_parquet(f"{out_path}/{DATA_TYPE}_user_article.parquet")
del df_user, df_user_article
gc.collect()

0

## Article Features (key: article_id)

In [None]:
# read file
df_article = pl.read_parquet(input_path + '/articles.parquet')
# 使い道がなさそうなものは一旦けずる
ignore_cols = ["url"]
df_article = df_article.drop(ignore_cols)

# setで残しておくfeature, 後でuser素性と突き合わせて使う
#set_cols = ["subcategory", "topics", "entity_groups", "ner_clusters"]
#for i in set_cols:
#    df_article = df_article.with_columns(pl.col(i).map_elements(lambda x: set(x)).alias(f"{i}_set"))

# 後段で処理しやすいようにunixtimeに直しておく
df_article = df_article.with_columns(
    (df_article["published_time"].dt.timestamp("ms") / 1000).cast(pl.Int64),
    (df_article["last_modified_time"].dt.timestamp("ms") / 1000).cast(pl.Int64),
)

# 文字数だけをfeatureとして加える
str_cols = ["title", "subtitle", "body"]
for i in str_cols:
  df_article = df_article.with_columns(pl.col(i).str.len_bytes().cast(pl.UInt16).alias(f'{i}_len'))
  df_article = df_article.drop(i)

# listの最初の要素
first_element_list = ["ner_clusters", "entity_groups", "topics", "subcategory"]
for i in first_element_list:
  df_article = exact_first(df_article, i)

# 大事そうなやつはlastも加える
last_element_list = ["ner_clusters"]
for i in first_element_list:
  df_article = exact_first(df_article, i)

# listの要素数をfeatureとして加える
list_cols = ["image_ids", "ner_clusters", "entity_groups", "topics", "subcategory"]
for i in list_cols:
    df_article = df_article.with_columns(pl.col(i).list.len().cast(pl.UInt8).alias(f'{i}_len'))
    df_article = df_article.drop(i)

# fill null
num_features = ["total_inviews", "total_pageviews", "total_read_time", "sentiment_score"]
df_article = df_article.with_columns(pl.col(num_features).fill_null(0))

original_names = df_article.columns
new_names = [f"a_{name}" if name != 'article_id' else name for name in original_names]
df_article = df_article.rename(dict(zip(original_names, new_names)))
show_df(df_article, 3)

(125541, 24)


article_id,a_last_modified_time,a_premium,a_published_time,a_article_type,a_category,a_category_str,a_total_inviews,a_total_pageviews,a_total_read_time,a_sentiment_score,a_sentiment_label,a_title_len,a_subtitle_len,a_body_len,a_ner_clusters_first,a_entity_groups_first,a_topics_first,a_subcategory_first,a_image_ids_len,a_ner_clusters_len,a_entity_groups_len,a_topics_len,a_subcategory_len
i32,i64,bool,i64,str,i16,str,i32,i32,f32,f32,str,u16,u16,u16,str,str,str,i16,u8,u8,u8,u8,u8
3000022,1688019632,False,1158744258,"""article_defaul…",414,"""underholdning""",0,0,0.0,0.9911,"""Negative""",30,120,787,"""David Gardner""","""PER""","""Kriminalitet""",432,1,1,1,5,1
3000063,1688019632,False,1159083930,"""article_defaul…",118,"""nyheder""",0,0,0.0,0.5155,"""Neutral""",32,45,2433,,,"""Kendt""",133,2,0,0,4,1
3000613,1688019633,False,1147174140,"""article_defaul…",142,"""sport""",0,0,0.0,0.9876,"""Negative""",36,116,921,"""Frankrig""","""LOC""","""Kendt""",196,1,4,4,5,2


In [None]:
# save
if DEBUG_MODE:
    df_article.write_parquet(f"{out_path}/small_{DATA_TYPE}_article.parquet")
else:
    df_article.write_parquet(f"{out_path}/{DATA_TYPE}_article.parquet")