## File 01 - User-level Preprocessed Data Output
##### Group 12:

##### Hannah Schmuckler, mmc4cv

##### Rob Schwartz, res7cd

In this file, we aggregate event-level data into preprocessed user-level data with a few features.

Outputs:
- The preprocessed user-level data is output to `processed_data/preprocessed_01.parquet`.
- The raw month 1 data, filtered to the user ids present in the preprocessed data, is output to `processed_data/month_01_filtered.parquet`.

### Set up Spark session and data schema

We can specify more options in the SparkSession builder, but currently all options other than the application name are left at their default settings.

In [1]:
%%time
from pyspark.sql import SparkSession
from pyspark.sql import types as T
from pyspark.sql.functions import col

spark = SparkSession.builder \
        .appName("project") \
        .getOrCreate()

sc = spark.sparkContext

schema = "`event_time` TIMESTAMP,`event_type` STRING,`product_id` INT,`category_id` BIGINT,`category_code` STRING,`brand` STRING,`price` FLOAT,`user_id` INT,`user_session` STRING"
#ddl_schema = T._parse_datatype_string(schema)

CPU times: user 216 ms, sys: 152 ms, total: 368 ms
Wall time: 5.18 s


### Read in dataframes

In [2]:
%%time
df1 = spark.read.schema(schema).csv("/project/ds5559/group12/raw_data/2020-01.csv")
df2 = spark.read.schema(schema).csv("/project/ds5559/group12/raw_data/2020-02.csv")

CPU times: user 2.78 ms, sys: 2.84 ms, total: 5.61 ms
Wall time: 1.81 s


### Limit number of records in dataframes

We can limit each dataframe to a smaller subset for faster experimentation. Note that the data is ordered by time, so a subset taken with `limit` will tend to contain only the earliest events of each month and is biased accordingly.

In [3]:
# df1=df1.limit(100000)
df1.createOrReplaceTempView("m1")

# df2=df2.limit(100000)
df2.createOrReplaceTempView("m2")
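
To illustrate the bias that comes from taking the first rows of time-ordered data, here is a minimal pure-Python sketch. The hourly event list is a hypothetical stand-in for the real data, not the project's dataframe. It compares how many days a "first n rows" subset covers against a random subset of the same size.

```python
import random
from datetime import datetime, timedelta

# Hypothetical stand-in for a time-ordered event log:
# one event per hour across January 2020.
start = datetime(2020, 1, 1)
events = [start + timedelta(hours=h) for h in range(31 * 24)]

n = 100

# "limit"-style subset: the first n rows of time-ordered data.
head_subset = events[:n]

# Random subset of the same size (fixed seed for reproducibility).
rng = random.Random(0)
random_subset = rng.sample(events, n)

def day_span(subset):
    """Whole days between the earliest and latest event in the subset."""
    return (max(subset) - min(subset)).days

head_days = day_span(head_subset)
random_days = day_span(random_subset)
print(head_days, random_days)
```

If a representative subset is needed, PySpark's `DataFrame.sample(fraction=..., seed=...)` is one option to consider in place of `limit`; it has not been tried on this data here.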

### Create a transformed table containing elements of interest for our model

We propose a basic table format (see https://docs.google.com/document/d/1NG4KGticBXn0D3PL5_zMxLV2Pr7A8PQtLcasxCOd1nA/edit).

Every row is a `user_id` that exists in month 1 (M1) and may or may not exist in month 2 (M2).
Columns include:
- `user_id` (ID)
- `T_total_spend` (response variable: sum among all purchase events in month 2. Will be 0 if the user does not exist in month 2 or makes no purchase events)
- `total_spend` (sum among all purchase events in month 1)
- `total_events` (count of all events in month 1, of any type)
- `total_sessions` (count of distinct user sessions/browsing sessions in month 1)
- `purchase_events` (count of purchase events in month 1; currently commented out in the query)




In [4]:
%%time
df = spark.sql("""SELECT

               /* ID */
               m1.user_id AS user_id,
               
               /* Output: Total spend in month 2; will be 0 if the user does not exist in month 2 or makes no purchase events */
               IFNULL(SUM(m2.price),0) AS T_total_spend,
               
               
               /* Total spend in month 1: we sum the price of any 'purchase' events */
               SUM(CAST(m1.event_type=='purchase' AS INT) * m1.price) AS total_spend,
               
               /* Total events in month 1: includes all event types (multiple per session) */
               COUNT(m1.event_type) AS total_events,
               
               /* Total purchase events in month 1: we sum the occurrences of any 'purchase' events */
               /* SUM(CAST(m1.event_type=='purchase' AS INT)) AS purchase_events, */
               
               /* Total user sessions in month 1: we count all distinct user sessions */
               COUNT(DISTINCT m1.user_session) AS total_sessions
               
               FROM m1
            
            /* Note: This is a left join, so purchasers in month 2 must be in month 1 to be included in the output */
            LEFT JOIN 
            (
               SELECT * FROM m2
               WHERE event_type='purchase'
            ) m2
            
            ON m1.user_id=m2.user_id
            
            /* Prevent adding bad data where user_id is null */
            WHERE m1.user_id IS NOT NULL
            
            GROUP BY m1.user_id ORDER BY total_events DESC""")
df.show(5)

+---------+--------------------+--------------------+------------+--------------+
|  user_id|       T_total_spend|         total_spend|total_events|total_sessions|
+---------+--------------------+--------------------+------------+--------------+
|568782581|2.3885857005200386E8| 5.821735228239441E7|      926856|           532|
|582826305|1.9694101198085403E8|1.7104022296691895E7|      605784|           123|
|563599039|2.3802253637968063E8| 6.886928014480591E7|      442758|           108|
|568805468|  2.09597173788414E8|  3391851.5849990845|      425169|          4636|
|592727922|1.1673367678547668E8|   8990452.824829102|      423648|            78|
+---------+--------------------+--------------------+------------+--------------+
only showing top 5 rows

CPU times: user 5.73 ms, sys: 4.63 ms, total: 10.4 ms
Wall time: 53.4 s
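
One thing worth checking in a query of this shape: it joins event-level rows from month 1 to event-level purchase rows from month 2 before grouping. Each month 1 event is therefore repeated once per month 2 purchase of the same user (and vice versa), which multiplies the sums and the event count; `COUNT(DISTINCT ...)` is unaffected. The sketch below reproduces the effect on hypothetical toy data and shows one possible alternative, aggregating each month to one row per user before joining. It uses Python's built-in `sqlite3` as a stand-in engine so that it runs without a cluster. The same query shape should carry over to Spark SQL, but it has not been run against the project data.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE m1 (user_id INTEGER, event_type TEXT, price REAL, user_session TEXT);
    CREATE TABLE m2 (user_id INTEGER, event_type TEXT, price REAL, user_session TEXT);

    /* Hypothetical toy data: user 1 has 3 events in month 1 (one purchase of 10.0)
       and 2 purchases in month 2 (20.0 and 30.0), plus one month 2 view. */
    INSERT INTO m1 VALUES (1, 'view', 10.0, 'a'), (1, 'cart', 10.0, 'a'), (1, 'purchase', 10.0, 'b');
    INSERT INTO m2 VALUES (1, 'purchase', 20.0, 'c'), (1, 'purchase', 30.0, 'd'), (1, 'view', 5.0, 'd');
""")

# Same shape as the query above: join event-level rows, then group.
joined_first = conn.execute("""
    SELECT m1.user_id,
           IFNULL(SUM(m2.price), 0) AS T_total_spend,
           SUM(CASE WHEN m1.event_type = 'purchase' THEN m1.price ELSE 0 END) AS total_spend,
           COUNT(m1.event_type) AS total_events
    FROM m1
    LEFT JOIN (SELECT * FROM m2 WHERE event_type = 'purchase') m2
      ON m1.user_id = m2.user_id
    WHERE m1.user_id IS NOT NULL
    GROUP BY m1.user_id
""").fetchall()

# Alternative: reduce each month to one row per user, then join.
aggregated_first = conn.execute("""
    SELECT a.user_id,
           IFNULL(b.T_total_spend, 0) AS T_total_spend,
           a.total_spend,
           a.total_events
    FROM (
        SELECT user_id,
               SUM(CASE WHEN event_type = 'purchase' THEN price ELSE 0 END) AS total_spend,
               COUNT(event_type) AS total_events
        FROM m1
        WHERE user_id IS NOT NULL
        GROUP BY user_id
    ) a
    LEFT JOIN (
        SELECT user_id, SUM(price) AS T_total_spend
        FROM m2
        WHERE event_type = 'purchase'
        GROUP BY user_id
    ) b ON a.user_id = b.user_id
""").fetchall()

print(joined_first)      # sums and counts multiplied by the join
print(aggregated_first)  # one row per user on each side of the join
```

On the toy data, the join-then-group form reports a month 2 spend of 150.0, a month 1 spend of 20.0 and 6 events, while the aggregate-then-join form reports 50.0, 10.0 and 3, which match the inserted rows.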


#### Remove customers who did not make a purchase in month 1

In [5]:
df = df.filter(col('total_spend') > 0)

In [7]:
df.write.mode("overwrite").parquet("./processed_data/preprocessed_01.parquet")
print(df.count())
df.show(5)

359105
+---------+--------------------+--------------------+------------+--------------+
|  user_id|       T_total_spend|         total_spend|total_events|total_sessions|
+---------+--------------------+--------------------+------------+--------------+
|568782581|2.3885857005200386E8| 5.821735228239441E7|      926856|           532|
|582826305|1.9694101198085403E8|1.7104022296691895E7|      605784|           123|
|563599039|2.3802253637968063E8| 6.886928014480591E7|      442758|           108|
|568805468|  2.09597173788414E8|  3391851.5849990845|      425169|          4636|
|592727922|1.1673367678547668E8|   8990452.824829102|      423648|            78|
+---------+--------------------+--------------------+------------+--------------+
only showing top 5 rows



#### Write to CSV file

In [8]:
%%time
# Not necessary at this time, but this CSV can be written if desired
# df.coalesce(1).write.option("header", "true").csv("./processed_data/temp_preprocessed_01.csv")

CPU times: user 1e+03 ns, sys: 2 µs, total: 3 µs
Wall time: 6.44 µs


#### Write the raw data, filtered on the appropriate user-ids, to files.

In [9]:
%%time
# Left semi join: keep only the month 1 events whose user_id appears in df,
# without adding any of df's columns to the result
month_01_filtered = df1.join(df,'user_id','leftsemi')
print(month_01_filtered.count())
#month_01_filtered.show(5)

15923973
+---------+-------------------+----------+----------+-------------------+--------------------+-------+-----+--------------------+
|  user_id|         event_time|event_type|product_id|        category_id|       category_code|  brand|price|        user_session|
+---------+-------------------+----------+----------+-------------------+--------------------+-------+-----+--------------------+
|512372691|2020-01-13 05:56:41|      view|   2502059|2232732092565815652|appliances.kitche...|  artel|54.93|de15fe24-bf76-4db...|
|512372691|2020-01-13 05:57:47|      view|   2502051|2232732092565815652|appliances.kitche...|  artel|54.72|de15fe24-bf76-4db...|
|512372691|2020-01-13 05:58:18|      view|   2500289|2232732092565815652|appliances.kitche...|  bosch|330.8|de15fe24-bf76-4db...|
|512372691|2020-01-13 06:02:02|      view|  16400016|2053013551957672134|                null|rondell|72.05|de15fe24-bf76-4db...|
|512372691|2020-01-13 06:02:11|      cart|  16400016|2053013551957672134|        

CPU times: user 1 µs, sys: 2 µs, total: 3 µs
Wall time: 6.2 µs
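
For readers unfamiliar with the `leftsemi` join type used in this cell, the following pure-Python sketch shows the semantics on hypothetical toy data. The lists `events` and `users` stand in for `df1` and `df` and are not the project's data. A left semi join keeps each left-hand row that has at least one match on the key and returns only the left-hand columns.

```python
# Hypothetical toy data standing in for df1 (event-level) and df (user-level).
events = [
    {"user_id": 1, "event_type": "view"},
    {"user_id": 1, "event_type": "purchase"},
    {"user_id": 2, "event_type": "view"},
    {"user_id": 3, "event_type": "cart"},
]
users = [
    {"user_id": 1, "total_spend": 10.0},
    {"user_id": 3, "total_spend": 5.0},
]

# Collect the keys present on the right-hand side.
kept_ids = {u["user_id"] for u in users}

# Semi join: filter the left-hand rows by key membership.
# No right-hand columns (such as total_spend) are carried over.
semi_joined = [e for e in events if e["user_id"] in kept_ids]

for row in semi_joined:
    print(row)
```

User 2 has no row in `users`, so its event is dropped, while both events of user 1 are kept unchanged.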


In [11]:
%%time
month_01_filtered.write.mode("overwrite").parquet("./processed_data/month_01_filtered.parquet")

CPU times: user 23.9 ms, sys: 22.2 ms, total: 46 ms
Wall time: 5min 12s
