In [None]:
import rootutils
rootutils.setup_root('.', indicator='.project-root', pythonpath=True)

import os

import pandas as pd
import json

from relbench.base import Table
from IPython.display import display
import duckdb

from src.helpers.task_vector_generation_helper import get_task_columns_dict, generate_task_vector_clauses, get_timestamps_for_split, transform_features
from src.helpers.relbench_helper import get_dataset
from src.helpers.dataset_helper import get_default_db
from src.helpers.database_helper import read_stypes_cache
from src.definitions import DATA_DIR

# --- Configuration shared by every cell below -------------------------------
dataset_name = 'rel-amazon'
# Semantic types per table/column, cached on disk under DATA_DIR/<dataset>/stypes.json.
stype_dict = read_stypes_cache(os.path.join(DATA_DIR, dataset_name, 'stypes.json'))
dataset = get_dataset(dataset_name)

# Prediction-window length: a quarter of a year (365 // 4 == 91 days). It is interpolated
# into the SQL below as INTERVAL '{timedelta}'.
timedelta = pd.Timedelta(days=365 // 4)
num_eval_timestamps = 1

# Unique categories; the path is relative to the kernel's current working directory.
with open('unique_categories.json', mode='r') as categories_file:
    categories = json.load(categories_file)

In [None]:
db, _, _ = get_default_db(dataset_name, dataset, 'glove')
task_columns_dict = get_task_columns_dict(db, stype_dict)

# Join paths rooted at `customer` (keyed by customer_id) for which SQL fragments are generated.
# Each result is unpacked below into (sub-select clauses, outer-select clauses, label WHEN
# predicate), and these are interpolated into `sql_query`. The two customer-review-review
# paths differ only in `postfix`. Insertion order keeps the call order of the original code.
_task_vector_paths = {
    'customer_review': (['customer', 'review'], {}),
    'customer_review_product': (['customer', 'review', 'product'], {}),
    'customer_review_review_0': (['customer', 'review', 'review'], {'postfix': 0}),
    'customer_review_review_1': (['customer', 'review', 'review'], {'postfix': 1}),
}
_task_vector_clauses = {
    path_name: generate_task_vector_clauses(path_tables, 'customer_id', task_columns_dict, stype_dict, **extra_kwargs)
    for path_name, (path_tables, extra_kwargs) in _task_vector_paths.items()
}

(sub_customer_review_clauses,
 final_customer_review_clauses,
 when_clause_customer_review) = _task_vector_clauses['customer_review']
(sub_customer_review_product_clauses,
 final_customer_review_product_clauses,
 when_clause_customer_review_product) = _task_vector_clauses['customer_review_product']
(sub_customer_review_review_0_clauses,
 final_customer_review_review_0_clauses,
 when_clause_customer_review_review_0) = _task_vector_clauses['customer_review_review_0']
(sub_customer_review_review_1_clauses,
 final_customer_review_review_1_clauses,
 when_clause_customer_review_review_1) = _task_vector_clauses['customer_review_review_1']
# DuckDB query producing one row per (timestamp, customer_id) pair with:
#   * the outer-select task-vector columns of each join path (customer-review,
#     customer-review-product, and two customer-review-review variants), and
#   * `label`: 0 when any path's WHEN predicate is true, 1 otherwise. A predicate that evaluates
#     to NULL (presumably when a LEFT JOIN finds no match; depends on the generated clauses) falls
#     through to ELSE 1.
#
# Structure:
#   - `universal`: every timestamp in `timestamp_df` crossed with every customer who has at
#     least one review. A pair is kept when the customer's first review falls in some window
#     (t2, t2 + timedelta] with t2 <= timestamp (t2 taken from `all_timestamp_df`), or when the
#     first review is on or before the earliest timestamp in `all_timestamp_df`.
#   - One LEFT JOIN subquery per join path. Each aggregates over reviews with
#     review_time in (timestamp, timestamp + timedelta], grouped by (timestamp, customer_id).
#     The two customer-review-review variants pair each review with earlier-or-equal reviews
#     by the same customer (postfix 0) or of the same product (postfix 1).
#
# `timestamp_df`, `all_timestamp_df`, `customer`, `review` and `product` are not Python names
# here. DuckDB resolves them from the DataFrames registered in each split's cell below.
# `{timedelta}` is interpolated as str(pd.Timedelta), e.g. '91 days 00:00:00'.
# NOTE(review): this relies on DuckDB accepting that interval string form; re-check if
# `timedelta` changes. The interpolated clauses come from project code, not user input.
sql_query = f"""
    SELECT
        universal.timestamp as timestamp,
        universal.customer_id as customer_id,
        
        -- customer-review task
        {final_customer_review_clauses},
        
        -- customer-review-product task
        {final_customer_review_product_clauses},
        
        -- customer-review-review task
        {final_customer_review_review_0_clauses},
        {final_customer_review_review_1_clauses},
        
        -- The label column
        CASE 
            WHEN 
                {when_clause_customer_review} OR 
                {when_clause_customer_review_product} OR 
                {when_clause_customer_review_review_0} OR 
                {when_clause_customer_review_review_1}
            THEN 0 
            ELSE 1 
        END AS label
    FROM
        -- universal table
        (
            SELECT DISTINCT
                t.timestamp,
                r.customer_id
            FROM
                timestamp_df t
                CROSS JOIN (
                    SELECT 
                        customer_id,
                        MIN(review_time) AS first_review
                    FROM review
                    GROUP BY customer_id
                ) r
            WHERE
                -- Case 1: Reviews that fall within a window
                EXISTS (
                    SELECT 1
                    FROM all_timestamp_df t2
                    WHERE t2.timestamp <= t.timestamp
                    AND r.first_review > t2.timestamp
                    AND r.first_review <= t2.timestamp + INTERVAL '{timedelta}'
                )
                -- Case 2 (modified): If first review is before earliest timestamp, 
                -- include for ALL timestamps
                OR (
                    r.first_review <= (SELECT MIN(timestamp) FROM all_timestamp_df)
                )
        ) as universal
        -- customer-review task
            LEFT JOIN
        (
            SELECT
                t.timestamp,
                customer__0.customer_id as customer__0__customer_id,
                {sub_customer_review_clauses}
            FROM
                timestamp_df as t
                LEFT JOIN (
                    customer as customer__0
                        INNER JOIN 
                    review as review__0 ON review__0.customer_id = customer__0.customer_id
                )
                ON
                    review__0.review_time > t.timestamp AND
                    review__0.review_time <= t.timestamp + INTERVAL '{timedelta}'
            GROUP BY
                t.timestamp,
                customer__0.customer_id
        ) as customer_review_0
            ON
                universal.customer_id = customer_review_0.customer__0__customer_id
                AND universal.timestamp = customer_review_0.timestamp
        -- customer-review-product task
            LEFT JOIN
        (   
            SELECT
                t.timestamp,
                customer__0.customer_id as customer__0__customer_id,
                {sub_customer_review_product_clauses}
            FROM
                timestamp_df as t
                LEFT JOIN (
                    customer as customer__0
                        INNER JOIN 
                    review as review__0 ON review__0.customer_id = customer__0.customer_id
                        INNER JOIN
                    product as product__0 ON product__0.product_id = review__0.product_id
                )
                ON
                    review__0.review_time > t.timestamp AND
                    review__0.review_time <= t.timestamp + INTERVAL '{timedelta}'
            GROUP BY
                t.timestamp,
                customer__0.customer_id
        ) as customer_review_product_0
            ON 
                universal.customer_id = customer_review_product_0.customer__0__customer_id
                AND universal.timestamp = customer_review_product_0.timestamp
        -- customer-review-review task (JOIN on customer_id)
            LEFT JOIN
        (
            SELECT
                t.timestamp,
                customer__0.customer_id as customer__0__customer_id,
                {sub_customer_review_review_0_clauses}
            FROM
                timestamp_df as t
                LEFT JOIN (
                    customer as customer__0
                        INNER JOIN 
                    review as review__0 ON review__0.customer_id = customer__0.customer_id
                        INNER JOIN
                    review as review__1 ON review__1.customer_id = review__0.customer_id AND review__1.review_time <= review__0.review_time
                )
                ON
                    review__0.review_time > t.timestamp AND
                    review__0.review_time <= t.timestamp + INTERVAL '{timedelta}'
            GROUP BY
                t.timestamp,
                customer__0.customer_id
        ) as customer_review_review_0
            ON
                universal.customer_id = customer_review_review_0.customer__0__customer_id
                AND universal.timestamp = customer_review_review_0.timestamp
        -- customer-review-review task (JOIN on product_id)
            LEFT JOIN
        (
            SELECT
                t.timestamp,
                customer__0.customer_id as customer__0__customer_id,
                {sub_customer_review_review_1_clauses}
            FROM
                timestamp_df as t
                LEFT JOIN (
                    customer as customer__0
                        INNER JOIN 
                    review as review__0 ON review__0.customer_id = customer__0.customer_id
                        INNER JOIN
                    review as review__1 ON review__1.product_id = review__0.product_id AND review__1.review_time <= review__0.review_time
                )
                ON
                    review__0.review_time > t.timestamp AND
                    review__0.review_time <= t.timestamp + INTERVAL '{timedelta}'
            GROUP BY
                t.timestamp,
                customer__0.customer_id
        ) as customer_review_review_1
            ON
                universal.customer_id = customer_review_review_1.customer__0__customer_id
                AND universal.timestamp = customer_review_review_1.timestamp
"""
# Show the fully rendered query (with all interpolated clauses) for inspection.
print(sql_query)

In [None]:
# --- Train split: build the task table and obtain the feature transformers ---
split = 'train'
# NOTE: rebinds `db` to the database returned for this split.
db, train_timestamps = get_timestamps_for_split(dataset, stype_dict, split, timedelta, num_eval_timestamps)

# Register every relation referenced by `sql_query` on DuckDB's default connection.
# For train, the timestamps defining the customer universe are the train timestamps themselves.
timestamp_df = pd.DataFrame({"timestamp": train_timestamps})
all_timestamp_df = pd.DataFrame({"timestamp": train_timestamps})
df_dict = {f"{table_name}": table.df for table_name, table in db.table_dict.items()}
duckdb.register("timestamp_df", timestamp_df)
duckdb.register("all_timestamp_df", all_timestamp_df)
for table_name, df in df_dict.items():
    duckdb.register(table_name, df)

new_df = duckdb.sql(sql_query).df()
# Every customer_id must be below the customer table's row count
# (presumably ids are 0-based row indices; confirm against the dataset loader).
assert (new_df['customer_id'].max() < len(db.table_dict['customer']))

# Split rows by label. Only label-0 rows go through transform_features; label-1 rows get
# all-zero feature vectors further below.
label_0_rows = new_df[new_df['label'] == 0]
label_1_rows = new_df[new_df['label'] == 1]

print(label_0_rows.shape)
print(label_1_rows.shape)
n_labeled = len(label_0_rows) + len(label_1_rows)
if n_labeled:
    print(f"Label ratio: {len(label_0_rows) / n_labeled} / {len(label_1_rows) / n_labeled}")
else:
    # Avoid ZeroDivisionError when the query returns no rows.
    print("Label ratio: n/a (no rows)")

# Transform training features. The returned scalers / one-hot encoder (is_train=True here)
# are reused by the val and test cells.
scaled_train_df, std_scalers, onehot_encoder, _ = transform_features(label_0_rows, is_train=split == 'train')
display(scaled_train_df.head(3))
print(scaled_train_df.columns[2:])

# Feature columns are all columns except the identifiers and the label. Select them by name
# rather than by position, so the zero-fill does not depend on transform_features' column order.
id_and_label_cols = ['timestamp', 'customer_id', 'label']
feature_cols = [col for col in scaled_train_df.columns if col not in id_and_label_cols]

# Label-1 rows: zero for every feature, keeping their timestamp, customer_id and label.
label_1_zeros = pd.DataFrame(0, index=label_1_rows.index, columns=feature_cols)
label_1_zeros['timestamp'] = label_1_rows['timestamp']
label_1_zeros['customer_id'] = label_1_rows['customer_id']
label_1_zeros['label'] = label_1_rows['label']

# concat aligns columns by name, so column order in label_1_zeros does not matter.
scaled_train_df = pd.concat([scaled_train_df, label_1_zeros], axis=0, ignore_index=True)

print("Number of customers: ", len(scaled_train_df['customer_id'].unique()))

In [None]:
# --- Validation split: build the task table and apply the train-fitted transformers ---
split = 'val'
# NOTE: rebinds `db` to the database returned for this split.
db, val_timestamps = get_timestamps_for_split(dataset, stype_dict, split, timedelta, num_eval_timestamps)

# Register every relation referenced by `sql_query` on DuckDB's default connection.
# The customer universe is defined over both val and train timestamps.
timestamp_df = pd.DataFrame({"timestamp": val_timestamps})
all_timestamp_df = pd.DataFrame({"timestamp": pd.concat([pd.Series(val_timestamps), pd.Series(train_timestamps)], ignore_index=True)})
df_dict = {f"{table_name}": table.df for table_name, table in db.table_dict.items()}
duckdb.register("timestamp_df", timestamp_df)
duckdb.register("all_timestamp_df", all_timestamp_df)
for table_name, df in df_dict.items():
    duckdb.register(table_name, df)

new_df = duckdb.sql(sql_query).df()
# Every customer_id must be below the customer table's row count
# (presumably ids are 0-based row indices; confirm against the dataset loader).
assert (new_df['customer_id'].max() < len(db.table_dict['customer']))

# Split rows by label. Only label-0 rows go through transform_features; label-1 rows get
# all-zero feature vectors further below.
label_0_rows = new_df[new_df['label'] == 0]
label_1_rows = new_df[new_df['label'] == 1]

print(label_0_rows.shape)
print(label_1_rows.shape)
n_labeled = len(label_0_rows) + len(label_1_rows)
if n_labeled:
    print(f"Label ratio: {len(label_0_rows) / n_labeled} / {len(label_1_rows) / n_labeled}")
else:
    # Avoid ZeroDivisionError when the query returns no rows.
    print("Label ratio: n/a (no rows)")

# Transform val features with the scalers / encoder returned by the train cell (is_train=False).
scaled_val_df, _, _, _ = transform_features(label_0_rows, std_scalers=std_scalers, onehot_encoder=onehot_encoder, is_train=split == 'train')
display(scaled_val_df.head(3))

# Feature columns are all columns except the identifiers and the label, selected by name
# rather than by position.
id_and_label_cols = ['timestamp', 'customer_id', 'label']
feature_cols = [col for col in scaled_val_df.columns if col not in id_and_label_cols]

# Label-1 rows: zero for every feature, keeping their timestamp, customer_id and label.
label_1_zeros = pd.DataFrame(0, index=label_1_rows.index, columns=feature_cols)
label_1_zeros['timestamp'] = label_1_rows['timestamp']
label_1_zeros['customer_id'] = label_1_rows['customer_id']
label_1_zeros['label'] = label_1_rows['label']

# concat aligns columns by name, so column order in label_1_zeros does not matter.
scaled_val_df = pd.concat([scaled_val_df, label_1_zeros], axis=0, ignore_index=True)

print("Number of customers: ", len(scaled_val_df['customer_id'].unique()))

In [None]:
# --- Test split: build the task table and apply the train-fitted transformers ---
split = 'test'
# NOTE: rebinds `db` to the database returned for this split.
db, test_timestamps = get_timestamps_for_split(dataset, stype_dict, split, timedelta, num_eval_timestamps)

# Register every relation referenced by `sql_query` on DuckDB's default connection.
# The customer universe is defined over test, val and train timestamps.
timestamp_df = pd.DataFrame({"timestamp": test_timestamps})
all_timestamp_df = pd.DataFrame({"timestamp": pd.concat([pd.Series(test_timestamps), pd.Series(val_timestamps), pd.Series(train_timestamps)], ignore_index=True)})
df_dict = {f"{table_name}": table.df for table_name, table in db.table_dict.items()}
duckdb.register("timestamp_df", timestamp_df)
duckdb.register("all_timestamp_df", all_timestamp_df)
for table_name, df in df_dict.items():
    duckdb.register(table_name, df)

new_df = duckdb.sql(sql_query).df()
# Every customer_id must be below the customer table's row count
# (presumably ids are 0-based row indices; confirm against the dataset loader).
assert (new_df['customer_id'].max() < len(db.table_dict['customer']))

# Split rows by label. Only label-0 rows go through transform_features; label-1 rows get
# all-zero feature vectors further below.
label_0_rows = new_df[new_df['label'] == 0]
label_1_rows = new_df[new_df['label'] == 1]

print(label_0_rows.shape)
print(label_1_rows.shape)
n_labeled = len(label_0_rows) + len(label_1_rows)
if n_labeled:
    print(f"Label ratio: {len(label_0_rows) / n_labeled} / {len(label_1_rows) / n_labeled}")
else:
    # Avoid ZeroDivisionError when the query returns no rows.
    print("Label ratio: n/a (no rows)")

# Transform test features with the scalers / encoder returned by the train cell (is_train=False).
scaled_test_df, _, _, _ = transform_features(label_0_rows, std_scalers=std_scalers, onehot_encoder=onehot_encoder, is_train=split == 'train')
display(scaled_test_df.head(3))

# Feature columns are all columns except the identifiers and the label, selected by name
# rather than by position.
id_and_label_cols = ['timestamp', 'customer_id', 'label']
feature_cols = [col for col in scaled_test_df.columns if col not in id_and_label_cols]

# Label-1 rows: zero for every feature, keeping their timestamp, customer_id and label.
label_1_zeros = pd.DataFrame(0, index=label_1_rows.index, columns=feature_cols)
label_1_zeros['timestamp'] = label_1_rows['timestamp']
label_1_zeros['customer_id'] = label_1_rows['customer_id']
label_1_zeros['label'] = label_1_rows['label']

# concat aligns columns by name, so column order in label_1_zeros does not matter.
scaled_test_df = pd.concat([scaled_test_df, label_1_zeros], axis=0, ignore_index=True)

print("Number of customers: ", len(scaled_test_df['customer_id'].unique()))

In [6]:
# All three splits must have the same number of columns before they are persisted.
_column_counts = [frame.shape[1] for frame in (scaled_train_df, scaled_val_df, scaled_test_df)]
assert len(set(_column_counts)) == 1

# Task-table schema: customer_id references the `customer` entity table; rows are timestamped.
entity_col, entity_table, time_col = "customer_id", "customer", "timestamp"

# Wrap each split in a relbench Table (no primary-key column).
_split_tables = {
    split_name: Table(
        df=split_df,
        fkey_col_to_pkey_table={entity_col: entity_table},
        pkey_col=None,
        time_col=time_col,
    )
    for split_name, split_df in (('train', scaled_train_df), ('val', scaled_val_df), ('test', scaled_test_df))
}
train_table, val_table, test_table = (_split_tables[name] for name in ('train', 'val', 'test'))

task_name = "user-tve-2-hop"
path_dir = os.path.join(DATA_DIR, 'relbench', dataset_name, 'tasks', task_name)
os.makedirs(path_dir, exist_ok=True)

# Writes <path_dir>/train.parquet, val.parquet, test.parquet (in that order).
for split_name, split_table in _split_tables.items():
    split_table.save(os.path.join(path_dir, f'{split_name}.parquet'))