In [1]:
import polars as pl
import numpy as np
import string
# Single seed value reused by the random operations in this notebook
# (the global RNG seeding below and the `sample(..., seed=manual_seed)` calls).
manual_seed = 23

# Seed NumPy's global RNG and Polars' global random seed with the same value.
np.random.seed(manual_seed)
pl.set_random_seed(manual_seed)

# Reading database

In [2]:
# Location of the SQLite file and the URI handed to polars.
db_path = 'dbs/db.sqlite3'
connection_string = 'sqlite://' + db_path

# One row per review (left-joined to its product), limited to the columns
# used below; `text` is renamed to `review_text`.
review_query = '''SELECT product_id, text AS review_text, recommended, found_helpful, found_funny 
    FROM review LEFT JOIN product ON product_id = product.id'''

df = pl.read_database_uri(review_query, connection_string)

# Data cleanup

In [3]:
# convert to integers
# Cast features to minimal viable types. The two vote counters get a
# non-strict cast to UInt16 (values that cannot be cast become null) and
# every null is then replaced by zero; `recommended` is cast to Int8.
vote_count_exprs = [
    pl.col(counter).cast(pl.UInt16, strict=False).fill_null(strategy="zero")
    for counter in ("found_funny", "found_helpful")
]
recommended_expr = pl.col("recommended").cast(pl.Int8)
df = df.with_columns(vote_count_exprs + [recommended_expr])

In [5]:
# Filter out reviews that don't contain any letters.
# Only ASCII letters (a-z, A-Z) are checked, so a review written entirely in
# non-ASCII characters is removed as well.
# `string` is already imported in the notebook's first cell, so it is not
# re-imported here.
df = df.filter(pl.col('review_text').str.contains_any(list(string.ascii_letters)))

# Regression Metric

In [7]:
def clip_column(df: pl.DataFrame, column_name: str, quantile: float=0.999, new_column_name: str=None) -> pl.Expr:
    """
    Build an expression that caps a column at one of its own quantiles.

    Every value of `column_name` that is larger than the `quantile`-th quantile
    of that column in `df` is replaced by the quantile value; all other values
    are passed through unchanged. This is used to clip big outliers.

    Args:
        df: Frame the cutoff value is computed from.
        column_name: Name of the column to clip.
        quantile: Quantile (between 0 and 1) used as the upper cutoff.
        new_column_name: Name of the resulting column. Defaults to
            `column_name`, i.e. the clipped values replace the original ones.

    Returns:
        A polars expression to be used in e.g. `DataFrame.with_columns`.
    """
    if new_column_name is None:
        new_column_name = column_name
    # Compute the cutoff as a plain scalar, honouring the `quantile` argument.
    cutoff_value = df.get_column(column_name).quantile(quantile)
    if cutoff_value is None:
        # No quantile could be computed (e.g. the column is empty), so there
        # is nothing to clip against: pass the column through unchanged.
        return pl.col(column_name).alias(new_column_name)
    return pl.when(pl.col(column_name) > cutoff_value).then(cutoff_value).otherwise(pl.col(column_name)).alias(new_column_name)

In [9]:
# create normalized metrics

# Step 1: clip outliers and keep the clipped values in temporary
# "*_cutoff" columns.
funny_clip_expr = clip_column(df, 'found_funny', new_column_name='found_funny_cutoff')
helpful_clip_expr = clip_column(df, 'found_helpful', new_column_name='found_helpful_cutoff')
df = df.with_columns(funny_clip_expr, helpful_clip_expr)

# Step 2: each metric becomes the mean of two ratios: the value divided by
# the maximum over the whole frame, and the value divided by the maximum
# within the same product (`.over("product_id")`).
funny_clipped = pl.col("found_funny_cutoff")
helpful_clipped = pl.col("found_helpful_cutoff")

funny_overall = funny_clipped / funny_clipped.max()
funny_per_product = (funny_clipped / funny_clipped.max()).over("product_id")
helpful_overall = helpful_clipped / helpful_clipped.max()
helpful_per_product = (helpful_clipped / helpful_clipped.max()).over("product_id")

# NaN results (e.g. 0 / 0 when a maximum is zero) are replaced by 0.0.
df = df.with_columns(
    ((funny_overall + funny_per_product) / 2).fill_nan(0.0).alias("found_funny"),
    ((helpful_overall + helpful_per_product) / 2).fill_nan(0.0).alias("found_helpful"),
)

# Step 3: the temporary clipped columns are no longer needed.
df = df.drop(["found_funny_cutoff", "found_helpful_cutoff"])

# Data split

In [None]:
# split into train test dev
# The split is drawn per product, so all reviews of one product share a split.
# Product ids are sorted before the random numbers are attached.
df_split = df.select("product_id").unique("product_id").sort("product_id")

# One uniform draw in [0, 1) per product.
uniform_draws = np.random.rand(df_split.height)
df_split = df_split.with_columns(pl.lit(uniform_draws).alias("split"))

# Turn the draw into a label: below 0.8 -> train, below 0.9 -> test, rest -> dev.
split_label = (
    pl.when(pl.col("split") < 0.8).then(pl.lit("train"))
    .when(pl.col("split") < 0.9).then(pl.lit("test"))
    .otherwise(pl.lit("dev"))
    .alias("split")
)
df_split = df_split.with_columns(split_label)

# Attach the label to every review and break the frame up by label; the
# "split" column itself is left out of the resulting frames.
df_labelled = df.join(df_split, on="product_id", how="left")
df_dict = df_labelled.partition_by("split", as_dict=True, include_key=False)

In [11]:
def write_parquet(df_dict, filename):
    """
    Write the train, test and dev frames of `df_dict` to parquet files.

    Args:
        df_dict: Mapping with the keys ("train",), ("test",) and ("dev",),
            each holding an object with a `write_parquet(path)` method.
        filename: Path prefix; the files are written to
            `<filename>_train.parquet`, `<filename>_test.parquet` and
            `<filename>_dev.parquet`, in that order.
    """
    for split_name in ("train", "test", "dev"):
        df_dict[(split_name,)].write_parquet(filename + f'_{split_name}.parquet')

# Persist the complete train/test/dev frames under the "data/complete" prefix.
write_parquet(df_dict=df_dict, filename="data/complete")


In [12]:
# Draw a smaller, shuffled subsample from every split: 500000 training rows
# and 50000 rows each for test and dev, all with the notebook-wide seed.
subsample_sizes = {("train",): 500000, ("test",): 50000, ("dev",): 50000}
df_dict_500k = {
    split_key: df_dict[split_key].sample(n_rows, seed=manual_seed, shuffle=True)
    for split_key, n_rows in subsample_sizes.items()
}

write_parquet(df_dict_500k, "data/500k_50k")