In [None]:
# Import other modules not related to PySpark
from datetime import *
import matplotlib.pyplot as plt
import numpy as np
import sweetviz as sv
from IPython.core.interactiveshell import InteractiveShell
import statistics as stats
# Display the result of every top-level expression in a cell (not just the last one),
# so values show up without explicitly calling 'print'.
InteractiveShell.ast_node_interactivity = "all" 

In [None]:
import pyspark
import pyspark.pandas as ps
from pyspark.rdd import RDD
from pyspark.sql import Row
from pyspark.sql import DataFrame
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext
from pyspark.sql import functions
from pyspark.sql import Window
from pyspark.sql.functions import lit, desc, col, size, array_contains\
, isnan, udf, hour, array_min, array_max, countDistinct, to_date, to_timestamp
from pyspark.sql.types import *

In [None]:
# Memory budget used for both the executor and the driver.
MAX_MEMORY = '15G'

# Spark settings, kept as (key, value) pairs so they can be read at a glance.
_SPARK_SETTINGS = [
    ('spark.executor.heartbeatInterval', 10000),
    ('spark.network.timeout', 10000),
    ("spark.core.connection.ack.wait.timeout", "3600"),
    ("spark.executor.memory", MAX_MEMORY),
    ("spark.driver.memory", MAX_MEMORY),
    # NOTE(review): presumably required so the 'E MMM dd HH:mm:ss zzz yyyy'
    # pattern used when parsing TransactionTime is accepted -- confirm.
    ("spark.sql.legacy.timeParserPolicy", "LEGACY"),
]

# Configuration for a local Spark master using every available core.
conf = pyspark.SparkConf().setMaster("local[*]").setAll(_SPARK_SETTINGS)
        
def init_spark():
    """Return the SparkSession for the "transaction_data" application.

    The session is built from the module-level ``conf``; ``getOrCreate``
    returns an existing session when one is already running.
    """
    builder = SparkSession.builder.appName("transaction_data").config(conf=conf)
    return builder.getOrCreate()

In [None]:
# Start (or reuse) the Spark session.
spark = init_spark()

# Read the raw CSV: the first line holds the column names and the column
# types are inferred from the data.
filename_data = './transaction_data.csv'
df = (
    spark.read
    .options(header=True, inferSchema=True)
    .csv(filename_data)
)

print('Data overview')
df.printSchema()

In [None]:
# Summary statistics of the loaded data, converted to pandas for display.
# The bare expression is rendered because ast_node_interactivity is set to "all".
print('Data frame describe (string and numeric columns only)')
df.describe().toPandas()

# Total row count, followed by a preview of the first five rows.
print(f'Total {df.count()} rows')
df.limit(5).toPandas()

#### 1. Clean data and filter outliers (explain how outliers are identified and how they are treated)

In [None]:
def _log_row_count(stage_label, frame):
    """Print ``stage_label`` followed by the number of rows in ``frame``."""
    print(stage_label, frame.count())


# Treat the value -1 as missing by replacing it with null.
df = df.na.replace(to_replace={-1: None})
_log_row_count("df length before cleaning: ", df)

# Remove rows that are exact duplicates of another row.
df = df.dropDuplicates()
_log_row_count("df length after drop duplicates: ", df)

# Remove rows that contain a null in any column.
df = df.dropna()
_log_row_count("df length after drop na: ", df)

# Keep only rows with a strictly positive purchased quantity.
df = df.where(col("NumberOfItemsPurchased") > 0)
_log_row_count("df length after drop non-positive NumberOfItemsPurchased: ", df)

In [None]:
print("Same TransactionId and ItemCode but different CostPerItem")

# Count the (TransactionId, ItemCode) pairs that appear with more than one
# distinct CostPerItem. The bare expression is displayed as the cell output.
(
    df.select('TransactionId', 'ItemCode', 'CostPerItem')
    .distinct()
    .groupBy('TransactionId', 'ItemCode')
    .count()
    .where(col('count') > 1)
    .count()
)

In [None]:
# UserId, TransactionId and ItemCode are categorical, so a Z-score is only
# computed for the two numeric columns below.
Z_SCORE_COLUMNS = ['CostPerItem', 'NumberOfItemsPurchased']

# Mean and standard deviation of every Z-score column, gathered in a single
# pass over the data instead of one Spark job per column.
stat_exprs = []
for column in Z_SCORE_COLUMNS:
    stat_exprs.append(functions.mean(col(column)).alias(f'{column}_mean'))
    stat_exprs.append(functions.stddev(col(column)).alias(f'{column}_stddev'))

# Bound to `column_stats` (not `stats`) so the `statistics` module imported
# as `stats` at the top of the notebook is not overwritten.
column_stats = df.select(*stat_exprs).first()

# Z-score = (value - mean) / stddev, added as '<column>_z_score'.
for column in Z_SCORE_COLUMNS:
    df = df.withColumn(
        f'{column}_z_score',
        (col(column) - column_stats[f'{column}_mean']) / column_stats[f'{column}_stddev'],
    )

# Rows whose |Z-score| exceeds 3 on BOTH columns (chained filters are AND-ed).
# NOTE(review): if a row should be a candidate when EITHER column is extreme,
# the two conditions need to be combined with OR instead -- confirm intent.
outlier_candidate_df = df.filter('abs(CostPerItem_z_score) > 3').filter('abs(NumberOfItemsPurchased_z_score) > 3')

# Some rows have a Z-score > 3, but they look legitimate because price and
# quantity can take arbitrary values, so no rows are removed here.

# The helper Z-score columns are not needed downstream.
df = df.drop('CostPerItem_z_score', 'NumberOfItemsPurchased_z_score')

#### 2. Calculate the number of Items purchased and prices in each month

In [None]:
# Order rows by the raw TransactionTime value, ascending.
df = df.orderBy('TransactionTime')

# Parse TransactionTime down to its calendar date and store it as a timestamp.
parsed_date = to_date(col('TransactionTime'), 'E MMM dd HH:mm:ss zzz yyyy')
df = df.withColumn('date', parsed_date.cast(DateType()).cast('timestamp'))

# Some dates fall in 2028, which cannot be valid, so only rows dated before
# today are kept.
df = df.where(col('date') < functions.current_date())

# Cost of one line item = quantity * unit price.
df = df.withColumn('TotalItemCost', col('NumberOfItemsPurchased') * col('CostPerItem'))

# Group by calendar month and year.
month_grouped_df = df.groupBy(
    functions.month('date').alias('month'),
    functions.year('date').alias('year'),
)

# Total items and total cost per (year, month), in chronological order.
monthly_aggregates = [
    functions.sum('NumberOfItemsPurchased').alias('NumberOfItemsPurchased_by_month'),
    functions.sum('TotalItemCost').alias('TotalItemCost_by_month'),
]
result_df = month_grouped_df.agg(*monthly_aggregates).orderBy('year', 'month')

result_df.show()

#### 3. For each UserId and each day, calculate the number of items purchased in the trailing 30 days (added as a new column)

In [None]:
def days(i):
    """Return the number of seconds in ``i`` days."""
    return i * 86400


# One row per (UserId, date) holding the total items purchased that day.
grouped_df = df.groupBy('UserId', 'date').agg(functions.sum('NumberOfItemsPurchased').alias('NumberOfItemsPurchased'))

# Per-user range window ordered by the date cast to a long, reaching back
# 30 days' worth of seconds up to the current row. `col` is the name this
# notebook imports from pyspark.sql.functions; no `F` alias is imported.
# NOTE(review): both range bounds are inclusive, so a row dated exactly 30
# days earlier is also summed (up to 31 calendar days) -- use days(29) if a
# strict 30-day window including the current day is intended.
w = Window.partitionBy('UserId').orderBy(col("date").cast('long')).rangeBetween(-days(30), 0)

# New column: rolling sum of purchased items over that window.
result_df = grouped_df.withColumn('NumberOfItemsPurchased_30days', functions.sum("NumberOfItemsPurchased").over(w))

result_df.sort('UserId', 'date').show()

In [None]:
# Convert the cleaned Spark DataFrame to pandas and write it to a single CSV
# file, without the pandas index column.
# NOTE(review): toPandas() loads every row into driver memory -- confirm the
# cleaned data is small enough for that.
df.toPandas().to_csv('./clean_data.csv', index=False)