In [1]:
import ibis
from ibis import deferred as c

# Interactive mode: expressions are executed and rendered when they are
# displayed, without an explicit .execute() call.
ibis.options.interactive = True
# Display the installed ibis version.
ibis.__version__

'5.0.0'

In [2]:
import duckdb

# Display the installed DuckDB version.
duckdb.__version__

'0.7.1'

In [3]:
# Connect to DuckDB without arguments (an in-memory database, as the variable
# name indicates) and expose the transactions Parquet file as an ibis table.
duckdb_in_mem_conn = ibis.duckdb.connect()
transactions = duckdb_in_mem_conn.read_parquet("transactions.parquet")
# Display the table expression (rendered as a preview in interactive mode).
transactions

In [4]:
# Total number of transaction rows, expressed in millions.
transactions.count().execute() / 1e6

21.547746

In [5]:
# Window partitioned by msno and ordered by transaction date, covering the
# rows from the start of the partition up to the current row.
entity_window = ibis.cumulative_window(
    group_by=c.msno, order_by=c.transaction_date
)
# Maximum gap allowed between two consecutive transactions of one session.
threshold = ibis.interval(days=60)
# Previous transaction date of the same msno plus the threshold. It is null on
# the first transaction of each msno, where lag() has no previous row.
deadline_date = c.transaction_date.lag().over(entity_window) + threshold

# Preview the deadline next to each transaction date.
(
    transactions
    .select([c.msno, c.transaction_date])
    .mutate(deadline_date=deadline_date)
)

In [6]:
# Flag the transactions that fall after the deadline derived from the previous
# transaction of the same msno. The comparison is null on the first row of
# each msno; fillna(False) maps that to "not a new session".
(
    transactions
    .select([c.msno, c.transaction_date])
    .mutate(
        is_new_session=(c.transaction_date > deadline_date).fillna(False)
    )
)

In [7]:
# Same flag as in the previous cell, followed by a running sum of the flag
# within each msno: session_id starts at 0 and increases by one every time a
# row is flagged as the start of a new session.
(
    transactions
    .select([c.msno, c.transaction_date])
    .mutate(
        is_new_session=(c.transaction_date > deadline_date).fillna(False)
    )
    .mutate(session_id=c.is_new_session.sum().over(entity_window))
)

In [8]:
# Complete pipeline in a single cell: flag session starts, number the sessions
# per msno, then collapse each session to its first and last transaction date.
entity_window = ibis.cumulative_window(
    group_by=c.msno, order_by=c.transaction_date
)
threshold = ibis.interval(days=60)
deadline_date = c.transaction_date.lag().over(entity_window) + threshold
# The first transaction of each msno has no previous row, so the comparison is
# null there; fillna(False) keeps that row in the initial session.
is_new_session = (c.transaction_date > deadline_date).fillna(False)

sessionized = (
    transactions
    .mutate(is_new_session=is_new_session)
    # Running count of session starts within each msno gives a 0-based id.
    .mutate(session_id=c.is_new_session.sum().over(entity_window))
    .drop("is_new_session")
)
sessions = (
    sessionized
    .group_by([c.msno, c.session_id])
    .aggregate(
        session_start_date=c.transaction_date.min(),
        session_end_date=c.transaction_date.max(),
    )
    .order_by([c.msno, c.session_start_date])
)
# Execute explicitly so the cell yields a plain Python number, like the other
# row-count cells, instead of relying on interactive rendering of an ibis
# expression.
sessions.count().execute() / 1e6

3.096563

In [9]:
# ibis.show_sql(sessions)

In [10]:
def sessionize(table, threshold, entity_col, date_col):
    """Add a 0-based ``session_id`` column numbering each entity's sessions.

    Rows are partitioned by ``entity_col`` and ordered by ``date_col``. A row
    opens a new session when its date is later than the previous row's date
    plus ``threshold``. The first row of a partition has no previous row, so
    it never opens one and belongs to session 0.

    The temporary ``is_new_session`` column is dropped from the result.
    """
    per_entity = ibis.cumulative_window(group_by=entity_col, order_by=date_col)
    previous_date = date_col.lag().over(per_entity)
    # Null on the first row of each entity (nothing to lag to) -> False.
    opens_session = (date_col > previous_date + threshold).fillna(False)

    flagged = table.mutate(is_new_session=opens_session)
    # c.is_new_session refers to the column created just above; its running
    # sum within the entity is the session number.
    numbered = flagged.mutate(
        session_id=c.is_new_session.sum().over(per_entity)
    )
    return numbered.drop("is_new_session")


def extract_sessions(table, entity_col, date_col, session_col):
    """Collapse a sessionized table to one row per (entity, session).

    Each output row carries the group keys plus ``session_start_date`` and
    ``session_end_date``, the minimum and maximum of ``date_col`` within the
    group. Rows are ordered by entity, then by session start date.
    """
    per_session = table.group_by([entity_col, session_col])
    bounds = per_session.aggregate(
        session_start_date=date_col.min(),
        session_end_date=date_col.max(),
    )
    # c.session_start_date refers to the aggregate column defined above.
    return bounds.order_by([entity_col, c.session_start_date])


def preprocess_transactions(transactions, threshold_days=60):
    """Turn a transactions table into one row per (msno, session).

    The table is piped through ``sessionize`` and then ``extract_sessions``,
    using ``msno`` as the entity column and ``transaction_date`` as the date
    column.

    Parameters
    ----------
    transactions
        Table expression with ``msno`` and ``transaction_date`` columns.
    threshold_days
        Maximum number of days between two consecutive transactions of the
        same ``msno`` for them to share a session. Defaults to 60, the value
        that used to be hard-coded.

    Returns
    -------
    The expression produced by ``extract_sessions``.
    """
    return (
        transactions
        .pipe(
            sessionize,
            threshold=ibis.interval(days=threshold_days),
            entity_col=c.msno,
            date_col=c.transaction_date,
        )
        .pipe(
            extract_sessions,
            entity_col=c.msno,
            date_col=c.transaction_date,
            session_col=c.session_id,
        )
    )

In [11]:
# Build the sessions expression, then time counting its rows (in millions).
sessions = preprocess_transactions(transactions)
%time sessions.count().execute() / 1e6

CPU times: user 31 s, sys: 3.92 s, total: 34.9 s
Wall time: 6.47 s


3.096563

In [12]:
# Time materializing all sessions as a pandas DataFrame, then display it.
%time sessions_df = sessions.to_pandas()
sessions_df

CPU times: user 34.4 s, sys: 4.49 s, total: 38.9 s
Wall time: 8.86 s


Unnamed: 0,msno,session_id,session_start_date,session_end_date
0,+++FOrTS7ab3tIgIh8eWwX4FqRv8w/FoiOuyXsFvphY=,0,2016-09-09,2016-09-09
1,+++IZseRRiQS9aaSkH6cMYU6bGDcxUieAi/tH67sC5s=,0,2015-11-21,2015-11-21
2,+++hVY1rZox/33YtvDgmKA2Frg/2qhkz12B9ylCvh8o=,0,2016-11-16,2017-02-15
3,+++l/EXNMLTijfLBa8p2TUVVVp2aFGSuUI/h7mLmthw=,0,2015-01-31,2016-01-31
4,+++l/EXNMLTijfLBa8p2TUVVVp2aFGSuUI/h7mLmthw=,1,2016-07-31,2017-01-31
...,...,...,...,...
3096558,zzz9+ZF4+GMyt63oU8xfjo1EkvRqH5OINlES0RUJI6I=,0,2015-10-14,2016-11-03
3096559,zzzF1KsGfHH3qI6qiSNSXC35UXmVKMVFdxkp7xmDMc0=,0,2017-02-05,2017-02-05
3096560,zzzN9thH22os1dRS0VHReY/8FTfGHOi86//d+wGGFsQ=,0,2016-03-04,2017-02-02
3096561,zzztsqkufVj9DPVJDM3FxDkhlbCL5z4aiYxgPSGkIK4=,0,2015-06-08,2015-06-08


In [13]:
# clickhouse_conn = ibis.clickhouse.connect(host="localhost", port=9000)
# transactions = clickhouse_conn.table("transactions")
# %time preprocess_transactions(transactions).count().execute()

In [14]:
# ibis.show_sql(preprocess_transactions(transactions))

In [15]:
# Expose the user logs Parquet file through the same DuckDB connection and
# count its rows, in millions.
user_logs = duckdb_in_mem_conn.read_parquet("user_logs.parquet")
user_logs.count().execute() / 1e6

392.106543

In [16]:
# XXX: the following raises OutOfMemoryException after 5 minutes on my laptop

# def preprocess_user_logs(user_logs):
#     return (
#         user_logs
#         .pipe(sessionize, threshold=ibis.interval(days=2), entity_col=c.msno, date_col=c.date)
#         .pipe(extract_sessions, entity_col=c.msno, date_col=c.date, session_col=c.session_id)
#     )
# %time first_10_user_logs_sessions = preprocess_user_logs(user_logs).limit(10).execute()
# first_10_user_logs_sessions

In [17]:
import polars as pl


# Display the installed polars version.
pl.__version__

'0.16.18'

In [18]:
# Eagerly load the transactions Parquet file into a polars DataFrame.
transactions_df = pl.read_parquet("transactions.parquet")
transactions_df

msno,payment_method_id,payment_plan_days,plan_list_price,actual_amount_paid,is_auto_renew,transaction_date,membership_expire_date,is_cancel
str,i32,i32,i32,i32,bool,date,date,bool
"""YyO+tlZtAXYXoZ…",41,30,129,129,true,2015-09-30,2015-11-01,false
"""AZtu6Wl0gPojrE…",41,30,149,149,true,2015-09-30,2015-10-31,false
"""UkDFI97Qb6+s2L…",41,30,129,129,true,2015-09-30,2016-04-27,false
"""M1C56ijxozNaGD…",39,30,149,149,true,2015-09-30,2015-11-28,false
"""yvj6zyBUaqdbUQ…",39,30,149,149,true,2015-09-30,2015-11-21,false
"""KN7I82kjY0Tn76…",21,30,149,149,true,2015-09-30,2015-11-07,false
"""m5ptKif9BjdUgh…",39,30,149,149,true,2015-09-30,2015-11-28,false
"""uQxbyACsPOEkTI…",39,30,149,149,true,2015-09-30,2015-11-25,false
"""LUPRfoE2r3WwVW…",39,30,149,149,true,2015-09-30,2015-12-22,false
"""pMVjPLgVknaJYm…",39,30,149,149,true,2015-09-30,2015-11-18,false


In [19]:
# Lazily scan the same file: this builds a LazyFrame whose data is only
# materialized when .collect() is called.
transactions_lazy_df = pl.scan_parquet("transactions.parquet")
transactions_lazy_df

In [20]:
# Materialize only the first 10 rows of the lazy frame.
transactions_lazy_df.head(10).collect()

msno,payment_method_id,payment_plan_days,plan_list_price,actual_amount_paid,is_auto_renew,transaction_date,membership_expire_date,is_cancel
str,i32,i32,i32,i32,bool,date,date,bool
"""YyO+tlZtAXYXoZ…",41,30,129,129,True,2015-09-30,2015-11-01,False
"""AZtu6Wl0gPojrE…",41,30,149,149,True,2015-09-30,2015-10-31,False
"""UkDFI97Qb6+s2L…",41,30,129,129,True,2015-09-30,2016-04-27,False
"""M1C56ijxozNaGD…",39,30,149,149,True,2015-09-30,2015-11-28,False
"""yvj6zyBUaqdbUQ…",39,30,149,149,True,2015-09-30,2015-11-21,False
"""KN7I82kjY0Tn76…",21,30,149,149,True,2015-09-30,2015-11-07,False
"""m5ptKif9BjdUgh…",39,30,149,149,True,2015-09-30,2015-11-28,False
"""uQxbyACsPOEkTI…",39,30,149,149,True,2015-09-30,2015-11-25,False
"""LUPRfoE2r3WwVW…",39,30,149,149,True,2015-09-30,2015-12-22,False
"""pMVjPLgVknaJYm…",39,30,149,149,True,2015-09-30,2015-11-18,False


In [21]:
def sessionize_pl(df, entity_col, date_col, threshold):
    """Add a 0-based ``session_id`` column numbering each entity's sessions.

    ``entity_col`` and ``date_col`` are column names and ``threshold`` is a
    number of days. The frame is sorted by entity, then date. A row opens a
    new session when it is more than ``threshold`` days after the previous
    row of the same entity. The first row of an entity has a null difference,
    which is treated as "no new session".

    The temporary ``is_new_session`` column is dropped from the result.
    """
    days_since_previous = pl.col(date_col).diff().over(entity_col).dt.days()
    opens_session = (days_since_previous > threshold).fill_null(False)
    # Running count of session starts within each entity.
    session_number = pl.col("is_new_session").cumsum().over(entity_col)

    ordered = df.sort([entity_col, date_col])
    flagged = ordered.with_columns([opens_session.alias("is_new_session")])
    numbered = flagged.with_columns([session_number.alias("session_id")])
    return numbered.drop(["is_new_session"])

def extract_sessions_pl(df, entity_col, date_col, session_col):
    """Collapse a sessionized frame to one row per (entity, session).

    All column arguments are column names. Each output row carries the group
    keys plus ``session_start_date`` and ``session_end_date``, the minimum and
    maximum of ``date_col`` within the group. Rows are sorted by entity, then
    by session start date.
    """
    first_date = pl.col(date_col).min().alias("session_start_date")
    last_date = pl.col(date_col).max().alias("session_end_date")
    per_session = df.groupby([entity_col, session_col]).agg([first_date, last_date])
    return per_session.sort([entity_col, "session_start_date"])


def preprocess_transactions_pl(df, threshold_days=60):
    """Turn a transactions frame into one row per (msno, session).

    The frame is piped through ``sessionize_pl`` and then
    ``extract_sessions_pl``, using ``"msno"`` as the entity column and
    ``"transaction_date"`` as the date column.

    Parameters
    ----------
    df
        Frame with ``msno`` and ``transaction_date`` columns; it only needs
        to provide ``.pipe`` and the methods used by the two helpers.
    threshold_days
        Maximum number of days between two consecutive transactions of the
        same ``msno`` for them to share a session. Defaults to 60, the value
        that used to be hard-coded.

    Returns
    -------
    The frame produced by ``extract_sessions_pl``.
    """
    return (
        df
        .pipe(
            sessionize_pl,
            entity_col="msno",
            date_col="transaction_date",
            threshold=threshold_days,
        )
        .pipe(
            extract_sessions_pl,
            entity_col="msno",
            date_col="transaction_date",
            session_col="session_id",
        )
    )


# Time the whole lazy pipeline end to end (collect() triggers execution),
# then display the resulting frame.
%time sessions_collected = preprocess_transactions_pl(transactions_lazy_df).collect()
sessions_collected

CPU times: user 40.9 s, sys: 3.2 s, total: 44.1 s
Wall time: 10.5 s


msno,session_id,session_start_date,session_end_date
str,u32,date,date
"""+++FOrTS7ab3tI…",0,2016-09-09,2016-09-09
"""+++IZseRRiQS9a…",0,2015-11-21,2015-11-21
"""+++hVY1rZox/33…",0,2016-11-16,2017-02-15
"""+++l/EXNMLTijf…",0,2015-01-31,2016-01-31
"""+++l/EXNMLTijf…",1,2016-07-31,2017-01-31
"""+++snpr7pmobhL…",0,2015-01-26,2017-02-26
"""++/9R3sX37Cjxb…",0,2016-03-15,2017-02-15
"""++/Gw1B9K+XOlB…",0,2015-01-13,2015-08-19
"""++/TR7WI15q2ZC…",0,2015-01-24,2015-08-26
"""++/UDNo9DLrxT8…",0,2015-01-31,2016-01-31


In [22]:
# (rows, columns) of the collected sessions frame.
sessions_collected.shape

(3096563, 4)

The following crashes the kernel after a few minutes:

In [23]:
# def preprocess_user_logs_pl(df):
#     return (
#         df
#         .pipe(
#             sessionize_pl,
#             threshold=2,
#             entity_col="msno",
#             date_col="date",
#         )
#         .pipe(
#             extract_sessions_pl,
#             entity_col="msno",
#             date_col="date",
#             session_col="session_id",
#         )
#     )

# user_logs_lazy_df = pl.scan_parquet("user_logs.parquet")
# %time first_10_user_logs_sessions = preprocess_user_logs_pl(user_logs_lazy_df).head(10).collect()
# first_10_user_logs_sessions