In [None]:
!pip install pyspark

from pyspark.sql import SparkSession
from pyspark.sql import Window
from pyspark.sql import functions as F
from pyspark.sql.functions import udf,pandas_udf
import pyspark.sql.types as T
from  pyspark.ml.feature import QuantileDiscretizer
import pandas as pd
import numpy as np
import seaborn as sns
import matplotlib.pyplot as plt
from datetime import datetime, timedelta
from scipy.interpolate import interp1d
from itertools import product
import math
plt.style.use('seaborn-whitegrid')
%matplotlib inline

In [None]:
def create_window(partitionby, orderby = None, rangebetween = None):
    """Build a Spark window specification.

    The window is constructed through the ``Window`` API directly instead of
    assembling a source string and ``eval``-ing it, so column names that
    contain quotes no longer break the call.

    Args:
        partitionby: Column (name) to partition by.
        orderby: Optional column (name) to order each partition by.
        rangebetween: Optional two-item sequence ``(start, end)`` handed to
            ``rangeBetween``; ``Window.unboundedPreceding`` and friends are
            accepted as boundaries.

    Returns:
        The resulting window specification.
    """
    spec = Window.partitionBy(partitionby)
    if orderby is not None:
        spec = spec.orderBy(orderby)
    if rangebetween is not None:
        spec = spec.rangeBetween(rangebetween[0], rangebetween[1])
    return spec
    
    
def plot_frames_train_val(train, validation, frames, main, x = "id", y1 = "cumsum", y2 = "cumsum", ax1_lab = "train", ax2_lab = "Validation", rows = 7, cols = 20):
    """Draw a grid of small scatter plots, one panel per entry of ``frames``.

    Each panel overlays the rows of ``train`` (red) and ``validation`` (blue)
    whose ``sku`` equals the panel's frame value.

    Args:
        train, validation: pandas DataFrames with a ``sku`` column plus the
            columns named by ``x``, ``y1`` and ``y2``.
        frames: Sequence of sku values; at most ``rows * cols`` are drawn.
        main: Figure title (title-cased before display).
        x: Column plotted on the x axis of both series.
        y1, y2: Columns plotted on the y axis for ``train`` / ``validation``.
        ax1_lab, ax2_lab: Legend labels (title-cased before display).
        rows, cols: Shape of the panel grid.
    """
    import warnings

    # squeeze=False keeps `axs` two-dimensional even when rows or cols is 1.
    fig, axs = plt.subplots(rows, cols, figsize = (20,10), squeeze = False)
    panels = (axs[i][j] for i in range(rows) for j in range(cols))
    handles = None
    # zip stops at the shorter input, so spare panels are simply left empty.
    for ax, frame in zip(panels, frames):
        try:
            train_data = train[train.sku.isin([frame])]
            val_data = validation[validation.sku.isin([frame])]
            l1 = ax.scatter(train_data[x], train_data[y1], c = "r", label = ax1_lab, alpha = 0.5)
            l2 = ax.scatter(val_data[x], val_data[y2], c = "b", label = ax2_lab, alpha = 0.5)
        except Exception as err:
            # Stay best-effort (one bad panel must not kill the figure) but
            # report the problem instead of swallowing it silently.
            warnings.warn(f"Could not plot sku {frame!r}: {err}")
            continue
        ax.set_xticks([])
        ax.set_yticks([])
        ax.title.set_text(frame)
        handles = [l1, l2]
    # Only build a legend when at least one panel was actually drawn.
    if handles is not None:
        fig.legend(handles, labels = [ax1_lab.title(), ax2_lab.title()])
    plt.subplots_adjust(right=0.9)
    fig.suptitle(main.title())
    plt.show()
     
        
@udf(T.IntegerType())
def count_zeros(x):
    """Return how many entries at the start of ``x`` are equal to zero.

    Scanning stops at the first entry that is not equal to 0.0.
    """
    leading = 0
    for value in x:
        if value != 0.0:
            return leading
        leading += 1
    return leading
    
    
@udf(T.BooleanType())
def are_consecutive_dates(x):
    """Tell whether the given dates form an unbroken day-by-day sequence.

    The UDF is declared as ``BooleanType`` because it returns a Python bool;
    with the former ``IntegerType`` declaration Spark could not convert the
    value and produced null, so comparisons against True/False never matched.

    Args:
        x: Iterable of date strings formatted as ``%Y-%m-%d`` (any order).

    Returns:
        True when every pair of neighbouring dates (after sorting) is exactly
        one day apart; also True for zero or one date. False otherwise.
    """
    days = sorted(datetime.strptime(i, "%Y-%m-%d") for i in x)
    return all((later - earlier).days == 1 for earlier, later in zip(days, days[1:]))


def forward_fill(window, data, column):
    """Overwrite ``column`` with its last non-null value seen over ``window``."""
    last_seen = F.last(column, ignorenulls=True).over(window)
    return data.withColumn(column, last_seen)
    
def write_data(data, name):
    """Save ``data`` as parquet under ``name``, replacing any existing output.

    The frame is coalesced to a single partition before writing.
    """
    single_partition = data.coalesce(1)
    writer = single_partition.write.format("parquet").mode("overwrite")
    writer.save(name)
    
    
def proportion_transform(x, var, drop_first = False):
    """Per-sku share of rows falling into each distinct value of ``var``.

    For every distinct non-null value ``v`` of ``var`` the output carries a
    column ``{v}_prop`` = (rows of the sku with ``var == v``) / (rows of the
    sku with any non-null ``var``).

    The previous implementation built the denominator with
    ``set(distinct_values) - set(i)``; ``set(i)`` on a string is a set of
    characters, so the value was only excluded when it was one character long
    (and a null value raised ``TypeError``). The share over the total is now
    computed explicitly and without ``eval``.

    Args:
        x: Spark DataFrame with a ``sku`` column and the column ``var``.
        var: Name of the categorical column.
        drop_first: When True, omit the share of the first value (values are
            ordered by their string form, so the choice is deterministic).

    Returns:
        Spark DataFrame with ``sku`` and one ``*_prop`` column per kept value;
        nulls are filled with 0.
    """
    levels = sorted(
        {row[0] for row in x.select(var).distinct().collect() if row[0] is not None},
        key = str,
    )
    names = [str(level) for level in levels]

    counts = x.groupBy(["sku", var]).count()
    # Passing the values explicitly fixes the column set/order and keeps a
    # null category from becoming a pivot column.
    out = counts.groupBy("sku").pivot(var, levels).sum("count").na.fill(0)

    # Back-ticks make names containing dots resolve as plain column names.
    total = sum(F.col(f"`{name}`") for name in names)
    kept = names[1:] if drop_first else names
    shares = [(F.col(f"`{name}`") / total).alias(f"{name}_prop") for name in kept]
    return out.select("sku", *shares).na.fill(0)


def create_proportion_columns(x):
    """Assemble one feature row per sku.

    Combines the category shares from ``proportion_transform`` for
    ``listing_type``, ``shipping_payment`` and ``shipping_logistic_type`` with
    the per-sku averages of ``minutes_active``, ``selling_rate`` and
    ``current_price``, joined on ``sku``.
    """
    categorical = ("listing_type", "shipping_payment", "shipping_logistic_type")
    averaged = (
        ("minutes_active", "minutes_active_avg"),
        ("selling_rate", "selling_rate_avg"),
        ("current_price", "price_avg"),
    )

    parts = [proportion_transform(x, column) for column in categorical]
    parts += [
        x.groupBy("sku").agg(F.avg(source).alias(target))
        for source, target in averaged
    ]

    features = parts[0]
    for part in parts[1:]:
        features = features.join(part, "sku")
    return features


def get_dof(X, X_active):
    """Share of each sku's sold quantity per day of the week.

    Fixes relative to the earlier version:
      * ``X_active`` is now really used for the second result (it was pivoted
        from ``X`` and then overwritten by the first result);
      * the denominator is the sum of the seven weekday columns only (the
        ``sku`` column used to be added in as well);
      * the pivot lists all seven weekdays explicitly, so ``day_i`` always
        refers to the same weekday even if a weekday never occurs in the data.

    Args:
        X: Spark DataFrame with ``sku``, ``date`` and ``sold_quantity``.
        X_active: Same schema, restricted to active rows by the caller.

    Returns:
        Tuple ``(dof_X, dof_X_active)``; each has ``sku`` and ``day_0`` ..
        ``day_6`` where ``day_i`` is Spark ``dayofweek == i + 1``
        (1 = Sunday). Skus without any sales get 0 in every column.
    """
    weekdays = list(range(1, 8))

    def _weekday_shares(frame):
        # One row per sku, one column per weekday holding the summed quantity.
        pivoted = (frame.withColumn("dayofweek", F.dayofweek("date"))
                   .groupBy("sku")
                   .pivot("dayofweek", weekdays)
                   .sum("sold_quantity")
                   .na.fill(0))
        total = sum(F.col(f"`{day}`") for day in weekdays)
        shares = [(F.col(f"`{day}`") / total).alias(f"day_{i}") for i, day in enumerate(weekdays)]
        # Division by a zero total yields null; report those shares as 0.
        return pivoted.select("sku", *shares).na.fill(0)

    return _weekday_shares(X), _weekday_shares(X_active)


def get_days_active(X):
    """Per-sku fraction of rows flagged active.

    Returns a frame with ``sku`` and ``proportion_active`` =
    sum(``is_active``) / count(``sku``).
    """
    grouped = X.groupBy("sku")
    active = grouped.agg(F.sum("is_active").alias("active_rows"))
    observed = grouped.agg(F.count("sku").alias("observed_rows"))
    ratio = F.col("active_rows") / F.col("observed_rows")
    return active.join(observed, "sku").select("sku", ratio.alias("proportion_active"))

In [None]:
# Local Spark session using every core, with a 15g driver heap.
spark = (
    SparkSession.builder
    .master('local[*]')
    .config("spark.driver.memory", "15g")
    .appName('meli-app')
    .getOrCreate()
)

# Use Arrow for Spark <-> pandas conversions.
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")

In [None]:
data_path = "./DATA"

# Raw inputs
train = spark.read.parquet(f"{data_path}/train_data.parquet")
meta = spark.read.json(f"{data_path}/items_static_metadata_full.jl")
test = spark.read.csv(f"{data_path}/test_data.csv", header = True)
train0 = train # keep track of original data

# Strip everything up to the last "-" from the domain id.
meta = meta.withColumn(
    "item_domain_id",
    F.regexp_replace(F.col("item_domain_id"), "^.*-", ""),
)
meta.filter(F.col("sku") == 454273).show() # some with nulls
meta = meta.na.fill("")

# Attach domain/site metadata to the test rows; "monid" remembers the order in
# which the rows were read so it can be restored after the join.
test = (
    test
    .withColumn("monid", F.monotonically_increasing_id())
    .join(meta.select("sku", "item_domain_id", "site_id"), "sku", how = "left")
    .orderBy("monid")
)

In [None]:
# Spot check: metadata row(s) of one SKU.
meta.filter(F.col("sku") == 35253).show() # ad-hoc inspection of a single SKU

In [None]:
# Spot check: training rows of the same SKU.
train.filter(F.col("sku") == 35253).show() # ad-hoc inspection of a single SKU

In [None]:
# Distribution of series length: for every possible number of rows per SKU
# ("number_of_items"), how many SKUs have exactly that many rows ("count").
counts_total = train.groupBy("sku").count()
counts_total = counts_total.groupBy(F.col("count").alias("number_of_items")).count().sort(F.asc("number_of_items")).toPandas()
counts_total["proportion"] = 100 * counts_total["count"] / counts_total["count"].sum()
# Share of SKUs with more than 30 rows: the threshold applies to the series
# length (number_of_items), not to the number of SKUs in each bucket.
expr = counts_total[counts_total["number_of_items"] > 30].proportion.sum()

In [None]:
# x / y are passed by keyword: seaborn >= 0.12 no longer accepts them positionally.
sns.scatterplot(x = "number_of_items", y = "proportion", data = counts_total)
print(f"Counts > 30: {expr}")

In [None]:
# Number of leading rows (by date) with zero minutes active, per SKU.
# collect_list gives no ordering guarantee after a shuffle, so collect
# (date, minutes_active) pairs and sort them by date before counting.
tt = train.groupBy("sku").agg(F.sort_array(F.collect_list(F.struct("date", "minutes_active"))).alias("by_date"))
tt = tt.withColumn("vec", F.col("by_date.minutes_active"))
tt = tt.withColumn("to_remove", count_zeros(F.col("vec"))).drop("vec", "by_date")
tt = tt.toPandas()
plt.hist(tt.to_remove.astype("float")[tt.to_remove > 0], bins = range(0, 60, 1))
plt.show()

In [None]:
# How many items have x ts values with minutes active > 0?
# "counts" = number of active rows of a SKU, "count" = number of SKUs with that many.
counts = train.filter(F.col("minutes_active") > 0).groupBy("sku").agg({"sku":"count"}).withColumnRenamed("count(sku)", "counts")
counts = counts.groupBy("counts").count()
counts = counts.toPandas()
# The share must be taken over the number of SKUs ("count"), not over the
# bucket value itself ("counts").
counts["proportion"] = 100 * counts["count"] / counts["count"].sum()
counts = counts.sort_values("counts")
expr = counts[counts["counts"] > 30].proportion.sum()
# x / y are passed by keyword: seaborn >= 0.12 no longer accepts them positionally.
sns.scatterplot(x = "counts", y = "proportion", data = counts)
print(f"Counts > 30: {expr}")

In [None]:
# For every SKU, flag whether its dates form a gap-free daily sequence and
# list the SKUs for which they do not.
not_consecutives = (
    train
    .groupBy("sku")
    .agg(are_consecutive_dates(F.collect_list(F.col("date"))).alias("consecutive"))
)
not_consecutives.filter(F.col("consecutive") == False).show()

In [None]:
# Do the series start on different dates? (distinct first dates across SKUs)
mindate = (
    train
    .groupBy("sku")
    .agg(F.min("date"))
)
mindate.select("min(date)").distinct().show()

# Do the series end on different dates? (distinct last dates across SKUs)
maxdate = (
    train
    .groupBy("sku")
    .agg(F.max("date"))
)
maxdate.select("max(date)").distinct().show()

In [None]:
# Per-row features: units sold per active minute (0 when inactive) and an activity flag.
train = train.withColumn('selling_rate', F.when(F.col("minutes_active") > 0, F.col("sold_quantity")/F.col("minutes_active")).otherwise(F.lit(0)))
train = train.withColumn('is_active', F.when(F.col("minutes_active") > 0, F.lit(1)).otherwise(F.lit(0)))
train_active = train.filter(F.col('is_active') == 1)

window = create_window('sku', 'date')
window2 = create_window('sku_split', 'date')
windowval = create_window("sku_split", 'date', [Window.unboundedPreceding, 0])


def _split_and_accumulate(frame):
    """Tag the first 30 rows per sku as train and add cumulative sales columns.

    Adds ``is_train``, ``sku_split`` ("<sku>_<is_train>"), ``cumsum`` (running
    sold quantity within the split), ``max`` (final cumsum of the split + 1),
    ``cumsum_pc`` (cumsum / max) and ``id`` (row number within the split).
    """
    frame = frame.withColumn('id', F.row_number().over(window))
    frame = frame.withColumn("is_train", F.when(F.col("id") <= 30, F.lit(1)).otherwise(F.lit(0)))
    frame = frame.withColumn("sku_split", F.concat(F.col("sku"), F.lit("_"), F.col("is_train").cast(T.StringType()))).drop("id")
    # Compute cumsum for items sold quantities
    frame = frame.withColumn('cumsum', F.sum('sold_quantity').over(windowval))
    # The normaliser has to be the maximum over the WHOLE split. Over the
    # running window it merely equals the running cumsum, which made
    # cumsum_pc = cumsum / (cumsum + 1). The +1 avoids division by zero.
    frame = frame.withColumn("max", F.max("cumsum").over(Window.partitionBy("sku_split")) + 1)
    frame = frame.withColumn('cumsum_pc', F.col("cumsum") / F.col("max"))
    return frame.withColumn('id', F.row_number().over(window2))


train = _split_and_accumulate(train)
train_active = _split_and_accumulate(train_active)

In [None]:
# Rows tagged is_train == 0 form the validation split; the rest stay in train.
# The validation frames are derived first because `train` / `train_active`
# are overwritten right after.
validation = train.where(F.col("is_train") == 0)
train = train.where(F.col("is_train") == 1)

validation_active = train_active.where(F.col("is_train") == 0)
train_active = train_active.where(F.col("is_train") == 1)

In [None]:
# Fraction of active rows per SKU, for each split.
days_activity_train, days_activity_validation = (
    get_days_active(split) for split in (train, validation)
)

In [None]:
# Day-of-week sales shares for each split (all rows / active rows only).
dof_train, dof_train_active = get_dof(X = train, X_active = train_active)
dof_validation, dof_validation_active = get_dof(X = validation, X_active = validation_active)

In [None]:
rolling_windows = [1, 2, 3, 4, 5]

# For each cumulative measure and look-back, add the average over the rows
# whose id lies in [id - interval, id] of the same SKU.
for cumtype in ("cumsum", "cumsum_pc"):
    for interval in rolling_windows:
        feature = f"rolling_{cumtype}_{interval}"
        rolling_avg = F.avg(cumtype).over(create_window('sku', 'id', [-interval, 0]))
        train = train.withColumn(feature, rolling_avg)
        train_active = train_active.withColumn(feature, rolling_avg)
        validation = validation.withColumn(feature, rolling_avg)
        validation_active = validation_active.withColumn(feature, rolling_avg)

In [None]:
# Draw up to 200 distinct SKUs for the diagnostic plots. A fixed seed makes the
# sample reproducible, and sampling without replacement avoids duplicate SKUs.
skus = train.select("sku").distinct().rdd.flatMap(lambda x: x).collect()
skus = np.random.default_rng(42).choice(skus, size = min(200, len(skus)), replace = False).tolist()

In [None]:
# Pull the sampled SKUs into pandas. Each frame is filtered with its OWN sku
# column; filtering `validation` through `train.sku` only worked by accident of
# the shared lineage of the two frames.
q0 = train.filter(train.sku.isin(skus)).toPandas()
q1 = validation.filter(validation.sku.isin(skus)).toPandas()

frames = q0.sku.unique()
plot_frames_train_val(train = q0, validation = q1, frames = frames, main = "Train vs validation")
plot_frames_train_val(train = q0[q0.minutes_active > 0], validation = q1[q1.minutes_active > 0], frames = frames, main = "Train vs validation for both minutes active > 0")
plot_frames_train_val(train = q0, validation = q0[q0.minutes_active > 0], frames = frames, ax1_lab = "train", ax2_lab= "Train minutes active > 0", 
                      main = "Train vs train minutes active > 0") 
plot_frames_train_val(train = q1, validation = q1[q1.minutes_active > 0], frames = frames, ax1_lab = "validation", ax2_lab= "Validation minutes active > 0",
                      main = "Validation vs validation minutes active > 0") 

In [None]:
# Same comparison on the active-only frames; each frame is filtered with its
# own sku column rather than with a column borrowed from `train_active`.
q0 = train_active.filter(train_active.sku.isin(skus)).toPandas()
q1 = validation_active.filter(validation_active.sku.isin(skus)).toPandas()

# `frames` is the SKU list computed in the previous plotting cell.
plot_frames_train_val(train = q0, validation = q1, frames = frames, main = "Train active vs validation")

In [None]:
# Per-SKU categorical shares and averages for every split.
features_train = create_proportion_columns(train)
features_train_active = create_proportion_columns(train_active)
features_validation = create_proportion_columns(validation)
# Was built from `validation` (copy-paste slip); use the active-only frame.
features_validation_active = create_proportion_columns(validation_active)

In [None]:
# Build a complete (sku, id) grid with id = 1..30 and left-join every split
# onto it, so each SKU of `train` ends up with exactly 30 rows (missing ones
# are all-null until filled later).
# NOTE: `id` shadows the Python builtin of the same name from here on.
id = spark.range(1, 31)
sku = train.select("sku").distinct()
dates_sku = id.crossJoin(sku)

train = dates_sku.join(train, on = ["sku", "id"], how = "left")
train_active = dates_sku.join(train_active, on = ["sku", "id"], how = "left")

validation = dates_sku.join(validation, on = ["sku", "id"], how = "left")
validation_active = dates_sku.join(validation_active, on = ["sku", "id"], how = "left")

In [None]:
# Attach the item domain and site of every SKU (inner join on sku).
train = train.join(meta.select("sku", "item_domain_id", "site_id"), on = "sku")
train_active = train_active.join(meta.select("sku", "item_domain_id", "site_id"), on = "sku")
validation = validation.join(meta.select("sku", "item_domain_id", "site_id"), on = "sku")
validation_active = validation_active.join(meta.select("sku", "item_domain_id", "site_id"), on = "sku")

In [None]:
# Helper columns that are no longer needed once the split is done.
to_drop = ["date", "is_train", "sku_split"]
train, validation = train.drop(*to_drop), validation.drop(*to_drop)
train_active, validation_active = train_active.drop(*to_drop), validation_active.drop(*to_drop)

In [None]:
windowval = create_window('sku', 'id', [Window.unboundedPreceding, 0])

# Per-SKU attributes: carry the last non-null value forward along id.
for item in ["item_domain_id", "site_id", "currency", "max"]:
    train = forward_fill(window = windowval, data = train, column = item)
    train_active = forward_fill(window = windowval, data = train_active, column = item)
    validation = forward_fill(window = windowval, data = validation, column = item)
    validation_active = forward_fill(window = windowval, data = validation_active, column = item)


# Remaining numeric columns: mark rows that had no observation with -1.
to_fill = ["sold_quantity", "minutes_active", "is_active",
           "current_price", "cumsum", "selling_rate"]
to_fill += [f"rolling_cumsum_{interval}" for interval in rolling_windows]
to_fill += [f"rolling_cumsum_pc_{interval}" for interval in rolling_windows]

to_fill = {column: -1 for column in to_fill}

train = train.na.fill(to_fill)
validation = validation.na.fill(to_fill)
# The active-only frames drop the activity flag after filling.
train_active = train_active.na.fill(to_fill).drop("is_active")
validation_active = validation_active.na.fill(to_fill).drop("is_active")

In [None]:
# Slim versions of the active frames: keys, one rolling feature and metadata.
train_active_min = train_active.select(
    "sku", "id", "rolling_cumsum_1", 'item_domain_id', "site_id",
)
validation_active_min = validation_active.select(
    "sku", "id", "rolling_cumsum_1", 'item_domain_id', "site_id",
)

In [None]:
# Print the column names and types of the final train frame.
train.printSchema()

In [None]:
# Print the column names and types of the final validation frame.
validation.printSchema()

In [None]:
# Print the column names and types of the test frame.
test.printSchema()

In [None]:
# Persist every prepared frame as parquet, in a fixed order.
for frame, target in [
    (train, "train.parquet"),
    (validation, "validation.parquet"),
    (train_active, "train_active.parquet"),
    (validation_active, "validation_active.parquet"),
    (test, "test.parquet"),
    (train_active_min, "train_active_min.parquet"),
    (validation_active_min, "validation_active_min.parquet"),
    (features_train, "features_train.parquet"),
    (features_train_active, "features_train_active.parquet"),
    (features_validation, "features_validation.parquet"),
    (features_validation_active, "features_validation_active.parquet"),
    (dof_train, "dof_train.parquet"),
    (dof_train_active, "dof_train_active.parquet"),
    (dof_validation, "dof_validation.parquet"),
    (dof_validation_active, "dof_validation_active.parquet"),
    (days_activity_train, "days_activity_train.parquet"),
    (days_activity_validation, "days_activity_validation.parquet"),
]:
    write_data(frame, target)