In [68]:
import polars as pl
from pathlib import Path
import numpy as np
import datetime

In [69]:
# Root folder of the dataset and the size variant to load.
dpath = Path('../dataset')
dtype = 'small'

# Every parquet file of the chosen variant lives under this folder.
dataset_dir = dpath / f'ebnerd_{dtype}'

articles = pl.read_parquet(dataset_dir / 'articles.parquet')

# Train split: impressions plus the per-user click history.
behaviors_train = pl.read_parquet(dataset_dir / 'train' / 'behaviors.parquet')
history_train = pl.read_parquet(dataset_dir / 'train' / 'history.parquet')

# Validation split, same layout as the train split.
behaviors_val = pl.read_parquet(dataset_dir / 'validation' / 'behaviors.parquet')
history_val = pl.read_parquet(dataset_dir / 'validation' / 'history.parquet')

In [70]:
# First timestamp, last timestamp and covered span of the train history and train behaviors.
for label, timestamps in (
    ('History train: ', history_train['impression_time_fixed'].explode()),
    ('Behaviors train: ', behaviors_train['impression_time'].explode()),
):
    earliest, latest = timestamps.min(), timestamps.max()
    print(label, earliest, latest, latest - earliest)

History train:  2023-04-27 07:00:00 2023-05-18 06:59:59 20 days, 23:59:59
Behaviors train:  2023-05-18 07:00:01 2023-05-25 06:59:58 6 days, 23:59:57


In [71]:
# First timestamp, last timestamp and covered span of the validation history and validation behaviors.
for label, timestamps in (
    ('History val: ', history_val['impression_time_fixed'].explode()),
    ('Behaviors val: ', behaviors_val['impression_time'].explode()),
):
    earliest, latest = timestamps.min(), timestamps.max()
    print(label, earliest, latest, latest - earliest)

History val:  2023-05-04 07:00:00 2023-05-25 06:59:59 20 days, 23:59:59
Behaviors val:  2023-05-25 07:00:02 2023-06-01 06:59:59 6 days, 23:59:57


In [72]:
# Span from the first train impression to the last validation impression.
first_train_impression = behaviors_train['impression_time'].explode().min()
last_val_impression = behaviors_val['impression_time'].explode().max()
last_val_impression - first_train_impression

datetime.timedelta(days=13, seconds=86398)

In [73]:
def behaviors_to_history(behaviors: pl.DataFrame) -> pl.DataFrame:
    """Reshape an impression-level behaviors frame into the per-user history layout.

    The impressions are sorted by ``impression_time``, reduced to the user id
    plus the four columns that have a ``*_fixed`` counterpart in the history
    frames, exploded to one row per clicked article, and finally collected
    into one row per ``user_id`` whose other columns are lists.

    Parameters
    ----------
    behaviors : pl.DataFrame
        Frame with the columns ``user_id``, ``impression_time``,
        ``next_scroll_percentage``, ``article_ids_clicked`` and
        ``next_read_time``.

    Returns
    -------
    pl.DataFrame
        Columns ``user_id``, ``impression_time_fixed``,
        ``scroll_percentage_fixed``, ``article_id_fixed`` and
        ``read_time_fixed``, one row per user.
    """
    # Selecting with aliases keeps the same column order as the history frames.
    history_columns = [
        pl.col('user_id'),
        pl.col('impression_time').alias('impression_time_fixed'),
        pl.col('next_scroll_percentage').alias('scroll_percentage_fixed'),
        pl.col('article_ids_clicked').alias('article_id_fixed'),
        pl.col('next_read_time').alias('read_time_fixed'),
    ]
    chronological = behaviors.sort('impression_time')
    # One row per clicked article; the other columns are repeated for each click.
    per_click = chronological.select(history_columns).explode('article_id_fixed')
    return per_click.group_by('user_id').agg(pl.all())
        
# Preview the train behaviors after conversion to the history layout.
train_behaviors_as_history = behaviors_to_history(behaviors_train)
train_behaviors_as_history.head(2)

user_id,impression_time_fixed,scroll_percentage_fixed,article_id_fixed,read_time_fixed
u32,list[datetime[μs]],list[f32],list[i32],list[f32]
1802107,"[2023-05-18 09:04:03, 2023-05-18 13:07:43, … 2023-05-24 21:28:28]","[34.0, null, … 33.0]","[9770886, 9771351, … 9776967]","[3.0, 10.0, … 2.0]"
475476,"[2023-05-18 12:52:59, 2023-05-20 13:31:53, … 2023-05-24 23:44:29]","[44.0, 100.0, … 46.0]","[9769497, 9755712, … 9779867]","[7.0, 3.0, … 14.0]"


In [74]:
# Flatten every list column so that each history entry becomes its own row.
list_columns = pl.all().exclude('user_id')
history_train.explode(list_columns)

user_id,impression_time_fixed,scroll_percentage_fixed,article_id_fixed,read_time_fixed
u32,datetime[μs],f32,i32,f32
13538,2023-04-27 10:17:43,100.0,9738663,17.0
13538,2023-04-27 10:18:01,35.0,9738569,12.0
13538,2023-04-27 10:18:13,100.0,9738663,4.0
13538,2023-04-27 10:18:17,24.0,9738490,5.0
13538,2023-04-27 10:18:23,100.0,9738663,4.0
…,…,…,…,…
1710834,2023-05-17 21:09:45,20.0,9770741,9.0
1710834,2023-05-17 21:09:55,43.0,9770594,44.0
1710834,2023-05-17 21:10:39,99.0,9728166,35.0
1710834,2023-05-17 21:11:15,99.0,9769433,44.0


In [75]:
# Work on one-row-per-entry frames so the three sources can be stacked.
list_columns = pl.all().exclude('user_id')
history_train_rows = history_train.explode(list_columns)
history_val_rows = history_val.explode(list_columns)
behaviors_val_rows = behaviors_to_history(behaviors_val).explode(list_columns)

# Keep only the train-history entries that the validation history does not
# already contain (matched on user and timestamp), so nothing is duplicated.
train_only_rows = history_train_rows.join(
    history_val_rows,
    on=['user_id', 'impression_time_fixed'],
    how='anti',
)

# Stack everything, order each user's entries by time and fold back to one row per user.
history_all = (
    pl.concat([train_only_rows, history_val_rows, behaviors_val_rows])
    .sort(['user_id', 'impression_time_fixed'])
    .group_by('user_id')
    .agg(pl.all())
)
history_all.head(2)

user_id,impression_time_fixed,scroll_percentage_fixed,article_id_fixed,read_time_fixed
u32,list[datetime[μs]],list[f32],list[i32],list[f32]
10068,"[2023-04-27 14:22:26, 2023-04-27 18:51:53, … 2023-05-16 19:48:21]","[100.0, 100.0, … 100.0]","[9735753, 9739065, … 9759717]","[39.0, 21.0, … 7.0]"
10200,"[2023-05-14 05:46:27, 2023-05-14 05:46:44, … 2023-05-29 05:09:35]","[36.0, 24.0, … null]","[9764325, 9763923, … 9784952]","[16.0, 15.0, … 1.0]"


In [76]:
# First and last calendar day covered by the merged history.
history_all_dates = history_all['impression_time_fixed'].explode().dt.date()
(history_all_dates.min(), history_all_dates.max())

(datetime.date(2023, 4, 27), datetime.date(2023, 6, 1))

## Moving window

In [77]:
def moving_window_split_iterator(history: pl.DataFrame, behaviors: pl.DataFrame, window:int=4, window_val:int=2, stride:int=2, verbose=True, history_days:int=21):
    """Yield rolling (train, validation) folds over time-sorted behaviors.

    For fold ``k`` (0-based) the windows are, in calendar days:

    * train behaviors: ``[b0 + k*stride, b0 + k*stride + window)``
    * validation behaviors: the ``window_val`` days right after the train window
    * train history: ``[h0 + k*stride, h0 + k*stride + history_days)``
    * validation history: the train history window shifted by ``window`` days

    where ``b0`` is the date of the first behavior and ``h0`` the date of the
    first history entry. Every boundary is taken at 07:00 and every window is
    half-open (start included, end excluded). Behaviors are additionally
    restricted to users that have at least one entry in the matching history
    window. Folds are produced while the validation window ends on or before
    the date of the last behavior.

    Parameters
    ----------
    history : pl.DataFrame
        One row per ``user_id``; all other columns are lists, including
        ``impression_time_fixed``.
    behaviors : pl.DataFrame
        Impressions with ``user_id`` and ``impression_time``; must already be
        sorted by ``impression_time``.
    window : int
        Length in days of the train behaviors window.
    window_val : int
        Length in days of the validation behaviors window.
    stride : int
        Number of days the windows advance between consecutive folds.
    verbose : bool
        Print the overall date range and the boundaries of each fold.
    history_days : int
        Length in days of each history window (previously hard-coded to 21).

    Yields
    ------
    tuple
        ``(history_k_train, behaviors_k_train, history_k_val, behaviors_k_val)``
        as polars DataFrames; the history frames are grouped back to one row
        per user.

    Raises
    ------
    AssertionError
        If ``behaviors['impression_time']`` is not sorted.
    ValueError
        If any of ``window``, ``window_val``, ``stride`` or ``history_days`` is
        smaller than 1, or if ``history`` / ``behaviors`` hold no timestamps.
        Being a generator, these checks run on the first iteration.
    """
    assert behaviors['impression_time'].is_sorted(), "behaviors must be sorted by 'impression_time'"
    # A non-positive stride would never advance the window and loop forever.
    if min(window, window_val, stride, history_days) < 1:
        raise ValueError('window, window_val, stride and history_days must all be >= 1')

    # Loop-invariant: explode the history once instead of twice per fold.
    history_rows = history.explode(pl.all().exclude('user_id'))

    first_history_time = history_rows['impression_time_fixed'].min()
    last_history_time = history_rows['impression_time_fixed'].max()
    first_behavior_time = behaviors['impression_time'].min()
    last_behavior_time = behaviors['impression_time'].max()
    if first_history_time is None or first_behavior_time is None:
        raise ValueError('history and behaviors must both contain at least one timestamp')

    history_window_train_start_date = first_history_time.date()
    start_window_train_behavior_date = first_behavior_time.date()
    last_date = last_behavior_time.date()

    # Day 0 of the printed indices is the earliest date found in either frame.
    origin_date = min(history_window_train_start_date, start_window_train_behavior_date)
    final_date = max(last_history_time.date(), last_date)

    # All windows are cut at this time of day rather than at midnight.
    start_window_hour = datetime.time(7, 0, 0)

    def _boundary(day):
        """Timestamp at which a window starting or ending on ``day`` is cut."""
        return datetime.datetime.combine(day, start_window_hour)

    def _day_index(day):
        """Day offset from ``origin_date``; unlike a lookup table it cannot
        fail for a boundary date that has no rows in the data."""
        return (day - origin_date).days

    def _history_window(start_day, end_day):
        """History entries inside [start_day, end_day), one row per user."""
        return history_rows.filter(
            pl.col('impression_time_fixed') >= _boundary(start_day),
            pl.col('impression_time_fixed') < _boundary(end_day),
        ).group_by('user_id').agg(pl.all())

    def _behaviors_window(start_day, end_day, known_users):
        """Behaviors inside [start_day, end_day) for users in ``known_users``."""
        return behaviors.filter(
            pl.col('impression_time') >= _boundary(start_day),
            pl.col('impression_time') < _boundary(end_day),
            pl.col('user_id').is_in(known_users),
        )

    if verbose:
        print(f'Date range: [{origin_date}:{_day_index(origin_date)} - {final_date}:{_day_index(final_date)}]')

    i = 0
    while start_window_train_behavior_date + datetime.timedelta(days=window + window_val) <= last_date:
        end_window_train_behavior_date = start_window_train_behavior_date + datetime.timedelta(days=window)
        # Validation behaviors start exactly where the train behaviors end.
        start_window_val_behavior_date = end_window_train_behavior_date
        end_window_val_behavior_date = start_window_val_behavior_date + datetime.timedelta(days=window_val)

        history_window_train_end_date = history_window_train_start_date + datetime.timedelta(days=history_days)
        # The validation history is the train history shifted by the train window length.
        history_window_val_start_date = history_window_train_start_date + datetime.timedelta(days=window)
        history_window_val_end_date = history_window_val_start_date + datetime.timedelta(days=history_days)

        if verbose:
            print(f'Fold {i}: ')
            print(f'Train: [[{history_window_train_start_date} - {history_window_train_end_date}] - [{start_window_train_behavior_date} - {end_window_train_behavior_date}]] [{_day_index(history_window_train_start_date)} - {_day_index(history_window_train_end_date)} - {_day_index(end_window_train_behavior_date)}]')
            print(f'Validation: [[{history_window_val_start_date} - {history_window_val_end_date}] - [{start_window_val_behavior_date} - {end_window_val_behavior_date}]] [{_day_index(history_window_val_start_date)} - {_day_index(history_window_val_end_date)} - {_day_index(end_window_val_behavior_date)}]')

        history_k_train = _history_window(history_window_train_start_date, history_window_train_end_date)
        behaviors_k_train = _behaviors_window(
            start_window_train_behavior_date, end_window_train_behavior_date, history_k_train['user_id'])

        history_k_val = _history_window(history_window_val_start_date, history_window_val_end_date)
        behaviors_k_val = _behaviors_window(
            start_window_val_behavior_date, end_window_val_behavior_date, history_k_val['user_id'])

        yield history_k_train, behaviors_k_train, history_k_val, behaviors_k_val

        # Slide both the behaviors window and the history window for the next fold.
        start_window_train_behavior_date += datetime.timedelta(days=stride)
        history_window_train_start_date += datetime.timedelta(days=stride)
        i += 1

In [78]:
# Train and validation impressions stacked into one frame, ordered by time.
behaviors_all = (
    behaviors_train
    .vstack(behaviors_val)
    .sort('impression_time')
    .set_sorted('impression_time')
)

fold_iterator = moving_window_split_iterator(history_all, behaviors_all, window=4, window_val=2, stride=2)

# Advance the generator up to and including fold index 1; the frames of that
# fold stay bound to the names below for the following cells.
for i, fold in enumerate(fold_iterator):
    history_k_train, behaviors_k_train, history_k_val, behaviors_k_val = fold
    if i == 1:
        break

Date range: [2023-04-27:0 - 2023-06-01:35]
Fold 0: 
Train: [[2023-04-27 - 2023-05-18] - [2023-05-18 - 2023-05-22]] [0 - 21 - 25]
Validation: [[2023-05-01 - 2023-05-22] - [2023-05-22 - 2023-05-24]] [4 - 25 - 27]
Fold 1: 
Train: [[2023-04-29 - 2023-05-20] - [2023-05-20 - 2023-05-24]] [2 - 23 - 27]
Validation: [[2023-05-03 - 2023-05-24] - [2023-05-24 - 2023-05-26]] [6 - 27 - 29]


In [86]:
# Distinct users of the current fold's train history and train behaviors.
history_users = history_k_train['user_id'].unique().to_list()
behaviors_users = behaviors_k_train['user_id'].unique().to_list()

# Test membership against a set: scanning the list for every behaviors user is
# quadratic. The loop variable is also renamed so it no longer shadows `id`.
history_user_set = set(history_users)
not_in_history = [user_id for user_id in behaviors_users if user_id not in history_user_set]
not_in_history

[349106, 581021, 1467116, 1795668, 2439844]

In [87]:
# Impressions of the fold's train behaviors whose user is listed in `not_in_history`.
missing_history_mask = pl.col('user_id').is_in(not_in_history)
behaviors_k_train.filter(missing_history_mask)

impression_id,article_id,impression_time,read_time,scroll_percentage,device_type,article_ids_inview,article_ids_clicked,user_id,is_sso_user,gender,postcode,age,is_subscriber,session_id,next_read_time,next_scroll_percentage
u32,i32,datetime[μs],f32,f32,i8,list[i32],list[i32],u32,bool,i8,i8,i8,bool,u32,f32,f32
501830347,,2023-05-20 07:20:01,16.0,,1,"[9772300, 9772475, … 9772601]",[9755712],1795668,false,,,,false,1906844,3.0,52.0
501830339,,2023-05-20 07:20:22,3.0,,1,"[9462356, 9559366, … 9773307]",[9773307],1795668,false,,,,false,1906844,2.0,35.0
501830348,,2023-05-20 07:20:28,14.0,,1,"[9773210, 9500202, … 9769504]",[9772635],1795668,false,,,,false,1906844,16.0,60.0
501830344,,2023-05-20 07:20:59,10.0,,1,"[9518647, 9500202, … 9773137]",[9773137],1795668,false,,,,false,1906844,11.0,27.0
501830342,,2023-05-20 07:21:22,12.0,,1,"[9773210, 9746360, … 9769624]",[9769624],1795668,false,,,,false,1906844,11.0,100.0
…,…,…,…,…,…,…,…,…,…,…,…,…,…,…,…,…
384454257,,2023-05-23 20:24:52,55.0,,1,"[9735234, 9747985, … 9749058]",[9735234],349106,false,,,,false,2007716,641.0,100.0
158439798,,2023-05-24 06:16:08,52.0,,1,"[9778351, 9778021, … 9778628]",[9778369],581021,false,,,,false,1802320,23.0,46.0
158439781,,2023-05-24 06:17:55,28.0,,1,"[9778351, 9778375, … 9695098]",[9778318],581021,false,,,,false,1802320,11.0,100.0
158439784,,2023-05-24 06:18:35,22.0,,1,"[9746342, 9761586, … 9142564]",[9778310],581021,false,,,,false,1802320,3.0,21.0


In [79]:
# First and last timestamp inside the fold's validation history window.
val_history_times = history_k_val['impression_time_fixed'].explode()
(val_history_times.min(), val_history_times.max())

(datetime.datetime(2023, 5, 3, 7, 0),
 datetime.datetime(2023, 5, 24, 6, 59, 59))

In [None]:
# Display the train behaviors of whichever fold the fold loop last bound to this name.
behaviors_k_train

In [89]:
# Build the set of history users once: the original rebuilt and scanned the
# full list inside the comprehension for every behaviors user.
history_train_users = set(history_train['user_id'].unique().to_list())

# Users with train impressions but no row in the train history.
not_in_history = [
    user_id
    for user_id in behaviors_train['user_id'].unique().to_list()
    if user_id not in history_train_users
]
not_in_history

[]