In [1]:
!pip install pyspark



In [1]:
import pyspark
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.window import *
from pyspark.sql.types import ArrayType, FloatType, LongType, IntegerType
import numpy as np
import pandas as pd
import os
import shutil

In [2]:
%%time
# Reuse an existing Spark session if one is active, otherwise start one named "ottoDB".
spark = SparkSession.builder.appName("ottoDB").getOrCreate()

CPU times: user 14.9 ms, sys: 18.3 ms, total: 33.2 ms
Wall time: 5.63 s


In [3]:
# import os
# path = "~/otto_kaggle/otto_obj_recommendation_sys/data_engineering/"
# os.getcwd()
# #os.chdir(path)

In [3]:
%%time
# Load the raw train/test dumps (newline-delimited JSON).
# Alternative inputs previously used here:
#   spark.read.json("../../allData/validationData/out_7day_test/train_sessions.jsonl",lineSep='\n')
#   spark.read.json("../../allData/validationData/out_7day_test/test_sessions.jsonl",lineSep='\n')
trainDf = spark.read.json(
    "../../allData/rawFull/train.jsonl",
    lineSep='\n',
)
testDf = spark.read.json(
    "../../allData/rawFull/test.jsonl",
    lineSep='\n',
)

CPU times: user 5.76 ms, sys: 2.86 ms, total: 8.62 ms
Wall time: 27.3 s


In [4]:
## Print the schema Spark inferred for the raw train data, for reference
trainDf.printSchema()

root
 |-- events: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- aid: long (nullable = true)
 |    |    |-- ts: long (nullable = true)
 |    |    |-- type: string (nullable = true)
 |-- session: long (nullable = true)



In [8]:
## some utils
def convert_type_num(arr):
    """Encode a sequence of event-type strings as integer codes.

    Mapping: "clicks" -> 0, "carts" -> 1, any other value -> 2.

    Args:
        arr: iterable of event-type strings, or None. A Spark UDF receives
            None when the source array column is null.

    Returns:
        A list with one int code per input element, in input order,
        or None when ``arr`` is None.
    """
    # A null array would otherwise raise TypeError on iteration and abort
    # the whole Spark job; propagate the null instead.
    if arr is None:
        return None
    return [0 if i == "clicks" else (1 if i == "carts" else 2) for i in arr]
# Spark UDF wrapper around convert_type_num; the result is declared as array<int>.
convert_type_num_udf = udf(lambda row: convert_type_num(row), ArrayType(IntegerType()))

def convert_ts_second_num(arr):
    """Divide every timestamp in ``arr`` by 1000, keeping whole units only.

    Presumably converts millisecond timestamps to seconds (per the function
    name). Integer floor division is used so the result stays exact over the
    whole 64-bit range; going through a float, as ``int(i / 1000)`` does,
    misrounds values above 2**53. Note that floor division rounds negative
    inputs down rather than toward zero.

    Args:
        arr: iterable of int timestamps, or None. A Spark UDF receives None
            when the source array column is null. Individual elements may
            also be None.

    Returns:
        A list of ints (None elements are passed through unchanged),
        or None when ``arr`` is None.
    """
    # Propagate nulls instead of raising TypeError inside the Spark job.
    if arr is None:
        return None
    return [None if ts is None else ts // 1000 for ts in arr]
# Spark UDF wrapper around convert_ts_second_num; the result is declared as array<long>.
convert_ts_seconds_udf = udf(lambda row:  convert_ts_second_num(row), ArrayType(LongType()))

def data_preprocessing(df_spark):
    """Flatten the per-session ``events`` array into separate columns.

    Derived columns:
        total_action        number of entries in ``events``
        aids                ``events.aid`` as an array
        ts_seconds          ``events.ts`` passed through ``convert_ts_seconds_udf``
        action_types        ``events.type`` passed through ``convert_type_num_udf``
        session_start_time  first element of ``ts_seconds``
        session_end_time    last element of ``ts_seconds``

    Args:
        df_spark: Spark DataFrame with ``session`` and ``events`` columns.

    Returns:
        Spark DataFrame with the columns session, total_action,
        session_start_time, session_end_time, aids, ts_seconds and
        action_types, in that order.
    """
    seconds = col("ts_seconds")

    enriched = df_spark.withColumn("total_action", size(col("events")))
    enriched = enriched.withColumn("aids", col("events.aid"))
    enriched = enriched.withColumn("ts_seconds", convert_ts_seconds_udf(col("events.ts")))
    enriched = enriched.withColumn("action_types", convert_type_num_udf(col("events.type")))
    # Index 0 is the first timestamp; element_at(..., -1) picks the last one.
    enriched = enriched.withColumn("session_start_time", seconds.getItem(0))
    enriched = enriched.withColumn("session_end_time", element_at(seconds, -1))

    output_columns = [
        "session",
        "total_action",
        "session_start_time",
        "session_end_time",
        "aids",
        "ts_seconds",
        "action_types",
    ]
    return enriched.select(*output_columns)


def save_csv_meta_info(df_spark, filename, path="../../allData/submission_phase_data/replicate_otto_fast_pipeline_source_data/"):
    """Export the per-session meta columns of a Spark DataFrame to one CSV file.

    The columns session, total_action, session_start_time and
    session_end_time are written by Spark to a temporary parquet directory
    under ``path``, read back with pandas, and saved as ``filename`` inside
    ``path`` without the index column.

    Args:
        df_spark: Spark DataFrame containing the four meta columns.
        filename: name of the CSV file to create inside ``path``.
        path: output directory; also hosts the temporary parquet directory.
    """
    temp_path = os.path.join(path, "temp_file.parquet")
    try:
        # "overwrite": a temp directory left behind by an interrupted run would
        # otherwise make the default save mode fail on every later call.
        df_spark.select("session", "total_action", "session_start_time", "session_end_time") \
                .write.mode("overwrite").parquet(temp_path)
        df_core_pd = pd.read_parquet(temp_path, engine='pyarrow')
        df_core_pd.to_csv(os.path.join(path, filename), index=False)
    finally:
        ## delete the temp file from disc, even when one of the steps above failed
        if os.path.exists(temp_path):
            shutil.rmtree(temp_path)
        else:
            print("Warnings: temp file removal error.")
        

def save_npz_core_info(df_spark, filename, path="../../allData/submission_phase_data/replicate_otto_fast_pipeline_source_data/"):
    """Export the exploded aids / timestamps / action types as one .npz file.

    The three array columns are zipped and exploded into one row per event,
    written by Spark to a temporary parquet directory under ``path``, read
    back with pandas, and stored with ``np.savez`` under the keys ``aids``,
    ``ts`` and ``ops``.

    Args:
        df_spark: Spark DataFrame with the array columns aids, ts_seconds
            and action_types.
        filename: name of the .npz file to create inside ``path``.
        path: output directory; also hosts the temporary parquet directory.
    """
    temp_path = os.path.join(path, "temp_file.parquet")
    ## Step I: convert to exploded pyspark dfs (one row per event)
    df_parquet = df_spark.select(explode(arrays_zip("aids", "ts_seconds", "action_types"))) \
                            .select("col.aids", "col.ts_seconds", "col.action_types")
    try:
        ## Step II: save temp parquet file to disc
        # "overwrite": a temp directory left behind by an interrupted run would
        # otherwise make the default save mode fail on every later call.
        df_parquet.write.mode("overwrite").parquet(temp_path)
        ## Step III: read the temp parquet file from disc
        df_core_pd = pd.read_parquet(temp_path, engine='pyarrow')
        np_aids = np.array(df_core_pd["aids"])
        np_ts = np.array(df_core_pd["ts_seconds"])
        np_ops = np.array(df_core_pd["action_types"])
        ## Step IV: save the np arrays as .npz file
        np.savez(os.path.join(path, filename), aids=np_aids, ts=np_ts, ops=np_ops)
    finally:
        ## delete the temp file from disc, even when one of the steps above failed
        if os.path.exists(temp_path):
            shutil.rmtree(temp_path)
        else:
            print("Warnings: temp file removal error.")

In [9]:
## Apply data_preprocessing to the raw train and test frames
trainDf_transformed = data_preprocessing(trainDf)
testDf_transformed = data_preprocessing(testDf)

In [11]:
%%time
## Save the per-session meta columns as CSV (written to save_csv_meta_info's default output path)
save_csv_meta_info(trainDf_transformed, "train_meta_data.csv")
save_csv_meta_info(testDf_transformed, "test_meta_data.csv")

CPU times: user 19.9 s, sys: 1.35 s, total: 21.2 s
Wall time: 1min 16s


In [12]:
%%time
## Save the exploded aids / ts / ops arrays as .npz (written to save_npz_core_info's default output path)
save_npz_core_info(trainDf_transformed, "train_core_data.npz")
save_npz_core_info(testDf_transformed, "test_core_data.npz")

CPU times: user 12.2 s, sys: 13.6 s, total: 25.8 s
Wall time: 2min 6s


In [16]:
# Preview the first 5 rows of the transformed train data.
trainDf_transformed.show(5)

+-------+------------+------------------+----------------+--------------------+--------------------+--------------------+
|session|total_action|session_start_time|session_end_time|                aids|          ts_seconds|        action_types|
+-------+------------+------------------+----------------+--------------------+--------------------+--------------------+
|      0|         147|        1659304800|      1661103727|[1517085, 1563459...|[1659304800, 1659...|[0, 0, 0, 0, 0, 0...|
|      1|          27|        1659304800|      1660857067|[424964, 1492293,...|[1659304800, 1659...|[1, 0, 1, 0, 1, 0...|
|      2|          13|        1659304800|      1660577379|[763743, 137492, ...|[1659304800, 1659...|[0, 0, 0, 0, 0, 0...|
|      3|         226|        1659304800|      1661109666|[1425967, 1425967...|[1659304800, 1659...|[1, 0, 0, 0, 1, 0...|
|      4|           3|        1659304800|      1659304900|[613619, 298827, ...|[1659304800, 1659...|           [0, 0, 2]|
+-------+------------+--