# Spark Config

In [57]:
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession, SQLContext
from pyspark.sql.functions import sum,avg,max, min
from pyspark.sql.functions import mean as _mean, stddev as _stddev

from pyspark.sql.window import Window

from pyspark.sql.dataframe import DataFrame
import pyspark.sql.functions as f
from pyspark.sql.functions import *
from pyspark.sql.functions import row_number
from pyspark.sql.functions import rank
from pyspark.sql.functions import dense_rank
from pyspark.sql.functions import col
from pyspark.sql.functions import when
from pyspark.sql.types import *

import findspark
findspark.init()

from functools import reduce

from pyspark.sql.functions import udf
import sys

import warnings
warnings.filterwarnings('ignore')

spark = SparkSession.builder.master('local').appName('foo').getOrCreate()
spark.sparkContext.setLogLevel('WARN')

df = spark.read.csv("transaction_data.csv", header = 'true')

In [58]:
def add_col(df, col, func):
    df_new = df.withColumn(col, func)
    return df_new

def rename_col(df, col_mapping):
    for c in col_mapping:
        df = df.withColumnRenamed(existing=c[0], new=c[1])
    return df

def checkNull(df):
    '''
    prints total rows in dataframe, number and percentage of nulls in each column 
    '''
    total_rows = df.count()
    cols = df.columns 
    abs_null = df.select([f.count(f.when(f.col(c).isNull(), c)).alias(c) for c in cols])
    perc_null = abs_null
    for c in cols:
        perc_null = perc_null.withColumn('{}'.format(c), f.round(f.col(c)/total_rows*100,2))
    print ('Total Rows: {}'.format(total_rows))
    print ('Absolute Nulls')
    abs_null.show(truncate=False, vertical=True)
    print ('% Percentage Nulls')
    perc_null.show(truncate=False, vertical=True)

# Clean data and remove abnormal values
- Drop null values
- Drop duplicate rows
- Drop rows where 'ItemCode' values are < 0
- Drop rows where 'NumberOfItemsPurchased' <= 0
- Drop rows where 'CostPerItem' <= 0
- There are some UserId where their values = "-1". In this case, we replace these values by the corresponding TransactionId values.
- Add column 'Totalcost' = 'CostPerItem' * 'NumberOfItemsPurchased'

In [59]:
def clean_data(df):
    df = df.withColumn("constant", lit(0))
    filtered_df = df.filter((f.col('NumberOfItemsPurchased') > f.col('constant')) 
                            & (f.col('CostPerItem') > f.col('constant'))
                            & (f.col('ItemCode') >= f.col('constant')) 
                           ).na.drop(how='any')\
                            .distinct() # Drop duplicate rows
    
    # If UserId == -1, then assign UserId := TransactionId
    filtered_df = filtered_df.withColumn('UserId_new',
                                         when((f.col('UserId') <=  f.col('constant')), f.col('TransactionId')).\
                                         otherwise(f.col('UserId'))
                                        ).drop('constant')
    
    # Change type
    filtered_df = filtered_df.withColumn('Totalcost', 
                                         format_number(f.col('CostPerItem')*f.col('NumberOfItemsPurchased'), 2)
                                        )
    filtered_df = filtered_df.withColumn('NumberOfItemsPurchased', f.col('NumberOfItemsPurchased').cast(IntegerType()))
    filtered_df = filtered_df.withColumn('Totalcost', f.col('Totalcost').cast(FloatType()))
    filtered_df = filtered_df.withColumn('UserId_new', f.col('UserId_new').cast(IntegerType()))
    filtered_df = filtered_df.withColumn('ItemCode', f.col('ItemCode').cast(IntegerType()))
    return filtered_df

# Filter Outliers 
(i.e find out the values which are too large or too small when comparing which others)

To find out the values which are too large or too small, we can check whether a value is in the interval

(Mean(column_name) - 3 x StDev(column_name); Mean(column_name) + 3 x StDev(column_name))

In [60]:
def mean_and_stdev(df, column_name):
    # Find mean and standard deviation
    df_stats = df.select(_mean(col(column_name)).alias('mean'),
                         _stddev(col(column_name)).alias('std')
                        ).collect()
    mean = df_stats[0]['mean']
    std = df_stats[0]['std']
    return [mean, std]

def find_outliers(df, column_name):
    df2 = clean_data(df)
    mean = mean_and_stdev(df2, column_name)[0]
    std = mean = mean_and_stdev(df2, column_name)[1]
    
    # Find outliers
    df2 = df2.withColumn('LowerBound' + '_' + column_name, 
                         lit(mean - 3*std)
                        )
    df2 = df2.withColumn('UpperBound' + '_' + column_name, 
                         lit(mean + 3*std)
                        )
    outlier = df2.filter((f.col(column_name) <=  f.col('LowerBound' + '_' + column_name))
                          | (f.col(column_name) >=  f.col('UpperBound' + '_' + column_name))
                          ).drop('LowerBound' + '_' + column_name, 'UpperBound' + '_' + column_name)
    return outlier

In [61]:
find_outliers(df, 'NumberOfItemsPurchased').show() 
#find_outliers(df, 'NumberOfItemsPurchased')/clean_data(df).count() = 0.0004974569656840463

+------+-------------+--------------------+--------+--------------------+----------------------+-----------+--------------+----------+---------+
|UserId|TransactionId|     TransactionTime|ItemCode|     ItemDescription|NumberOfItemsPurchased|CostPerItem|       Country|UserId_new|Totalcost|
+------+-------------+--------------------+--------+--------------------+----------------------+-----------+--------------+----------+---------+
|376929|      6396049|Tue Feb 19 14:46:...|  466137|      POPCORN HOLDER|                  3600|        1.0|United Kingdom|    376929|     null|
|307566|      6392925|Mon Feb 18 11:16:...|  484764|  RABBIT NIGHT LIGHT|                  4320|       2.48|   Netherlands|    307566|     null|
|270921|      6240399|Sat Dec 01 09:39:...|  457527|RAIN PONCHO RETRO...|                  5364|       1.18|United Kingdom|    270921|     null|
|336609|      6271067|Wed Dec 19 08:05:...|  467733|FELTCRAFT DOLL MOLLY|                  2160|       3.18|United Kingdom|    336

# Calculate the number of Items purchased and prices in each month

In [62]:
def add_time(df):
    df_clean = clean_data(df)
    df_clean = add_col(df_clean, 'day', substring('TransactionTime',9,2))

    df_clean = add_col(df_clean, 'month_string', substring('TransactionTime',5,3))
    df_clean = add_col(df_clean, 'month', when((col('month_string') == 'Jan'), '01').\
                                          when((col('month_string') >= 'Feb'), '02').\
                                          when((col('month_string') >= 'Mar'), '03').\
                                          when((col('month_string') >= 'Apr'), '04').\
                                          when((col('month_string') >= 'May'), '05').\
                                          when((col('month_string') >= 'Jun'), '06').\
                                          when((col('month_string') >= 'Jul'), '07').\
                                          when((col('month_string') >= 'Aug'), '08').\
                                          when((col('month_string') >= 'Sep'), '09').\
                                          when((col('month_string') >= 'Oct'), '10').\
                                          when((col('month_string') >= 'Nov'), '11').\
                                          otherwise('12')
                      ).drop('month_string')

    df_clean = add_col(df_clean, 'year', substring('TransactionTime',25,4))

    df_clean = df_clean.withColumn("date",to_date(concat_ws("-",col("year"),col("month"),col("day")),"yyyy-MM-dd"))

    return df_clean

def report_timeframe(df, timeframe):
    if timeframe == 'year':
        return df.groupBy(['year']).sum('NumberOfItemsPurchased', 'Totalcost')
    elif timeframe == 'month':
        return df.groupBy(['year', 'month']).sum('NumberOfItemsPurchased', 'Totalcost')
    elif timeframe == 'day':
        return df.groupBy(['year', 'month', 'day']).sum('NumberOfItemsPurchased', 'Totalcost')

df_clean = add_time(df)

In [63]:
report_timeframe(df_clean, 'month').show()

+----+-----+---------------------------+--------------------+
|year|month|sum(NumberOfItemsPurchased)|      sum(Totalcost)|
+----+-----+---------------------------+--------------------+
|2018|   04|                    3108891|   7571801.326817036|
|2019|   01|                    1608345|    4161341.62011528|
|2028|   02|                      71193|  187719.00020122528|
|2018|   02|                    6150372|1.6527933875994205E7|
|2019|   02|                    1349277|   3200872.478633404|
+----+-----+---------------------------+--------------------+



# Calculate the number of items purchased for each userID in 30 days for each day

In [64]:
#last_date = df_clean.select(max('date').alias("last_date")).collect()[0]["last_date"]
def last_from_x_to_y_days(df, x, y):
    last_date = df.select(max('date').alias("last_date")).collect()[0]["last_date"]
    df = df.withColumn("last_date", lit(last_date))
    last_from_x_to_y_days_df = df.filter((f.col('date') <= f.date_add(f.col('last_date'), -x)) 
    & (f.col('date') >= f.date_add(f.col('last_date'), -y)))
    return last_from_x_to_y_days_df

# Because of the cleaning process in Question 1, we use 'UserId_new' instead of 'UserId'
df_30days = last_from_x_to_y_days(df_clean, 0, 30)\
            .drop('UserId', 'last_date')\
            .groupBy(['UserId_new', 'date']).sum('NumberOfItemsPurchased')\
            .withColumnRenamed('sum(NumberOfItemsPurchased)', 'NumberOfItemsPurchased_30days')

df_clean_30days = last_from_x_to_y_days(df_clean, 0, 30)\
                    .join(df_30days, 
                          (df_30days['UserId_new'] == last_from_x_to_y_days(df_clean, 0, 30)['UserId_new']) 
                          & (df_30days['date'] == last_from_x_to_y_days(df_clean, 0, 30)['date']),
                          'outer'
                         )\
                    .drop('last_date', 'day', 'month', 'year')

In [65]:
df_clean_30days.show()

+------+-------------+--------------------+--------+--------------------+----------------------+-----------+-------+----------+---------+----------+----------+----------+-----------------------------+
|UserId|TransactionId|     TransactionTime|ItemCode|     ItemDescription|NumberOfItemsPurchased|CostPerItem|Country|UserId_new|Totalcost|      date|UserId_new|      date|NumberOfItemsPurchased_30days|
+------+-------------+--------------------+--------+--------------------+----------------------+-----------+-------+----------+---------+----------+----------+----------+-----------------------------+
|259287|      5913886|Fri Feb 18 10:57:...|  472437|SET OF 2 TINS VIN...|                    12|       5.87|Iceland|    259287|    70.44|2028-02-18|    259287|2028-02-18|                          849|
|259287|      5913886|Fri Feb 18 10:57:...| 1787436|BLACK CANDELABRA ...|                    36|        2.9|Iceland|    259287|    104.4|2028-02-18|    259287|2028-02-18|                          

# A Machine Learning Algorithm for a Recommendation System

We will use the dataframe 'last_from_x_to_y_days(df_clean, 0, 30)'.

- Step 1. Create a 'Rating' column: For each UserId, we calculate the ratio of 'NumberOfItemsPurchased' of each ItemDescription to the total number of items purchased.

- Step 2. To the best of my knowledge, Collaborative Filtering (CF) is the only Machine Learning algorithm for Recommendation Systems in Spark ML (https://spark.apache.org/docs/latest/ml-collaborative-filtering.html). We will use this one to write a Machine Learning algorithm as requested.

# Step 1

In [66]:
def Rating(df):
    df_MLAlgorithm1 = df.groupBy(['UserId_new']).sum('NumberOfItemsPurchased')\
                                                .withColumnRenamed('UserId_new', 'UserId_new1')\
                                               
    df_MLAlgorithm = df.groupBy(['UserId_new', 'ItemCode', 'ItemDescription'])\
                    .sum('NumberOfItemsPurchased')\
                    .withColumnRenamed('sum(NumberOfItemsPurchased)', 'NumberOfItems')
    df_MLAlgorithm = df_MLAlgorithm.join(df_MLAlgorithm1, 
                                         df_MLAlgorithm1['UserId_new1'] == df_MLAlgorithm['UserId_new']
                                        ).drop('UserId_new1')
    df_MLAlgorithm = df_MLAlgorithm.withColumn('Rating',
                                               format_number(f.col('NumberOfItems')/f.col('sum(NumberOfItemsPurchased)'),2)
                                              )
    # Change type of 'Rating' to be numeric
    df_MLAlgorithm = df_MLAlgorithm.withColumn('Rating', f.col('Rating').cast(FloatType()))
    return df_MLAlgorithm

df_MLAlgorithm = Rating(last_from_x_to_y_days(df_clean, 0, 30))

# Step 2

In [67]:
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS

# Build a model
(training, test) = df_MLAlgorithm.randomSplit([0.8, 0.2])

als = ALS(maxIter=5, regParam=0.01, userCol="UserId_new", itemCol="ItemCode", ratingCol="Rating", coldStartStrategy="drop")

model = als.fit(training)

# Evaluate the model
predictions = model.transform(test)
evaluator = RegressionEvaluator(metricName="rmse", labelCol="Rating", predictionCol="prediction")
rmse = evaluator.evaluate(predictions)

In [68]:
print("Root-mean-square error = " + str(rmse))

Root-mean-square error = 0.050983873353492866


In [69]:
# Top 5 recommendations for each user
userRecs = model.recommendForAllUsers(5)
userRecs.collect()

[Row(UserId_new=366450, recommendations=[Row(ItemCode=1665741, rating=0.7155270576477051), Row(ItemCode=471849, rating=0.4873247444629669), Row(ItemCode=471870, rating=0.43804696202278137), Row(ItemCode=469308, rating=0.3004089891910553), Row(ItemCode=475293, rating=0.2958352863788605)]),
 Row(UserId_new=318780, recommendations=[Row(ItemCode=1665741, rating=0.3454848825931549), Row(ItemCode=469308, rating=0.2990444004535675), Row(ItemCode=1527561, rating=0.2150140255689621), Row(ItemCode=1781556, rating=0.18689508736133575), Row(ItemCode=471849, rating=0.17037218809127808)]),
 Row(UserId_new=291480, recommendations=[Row(ItemCode=481845, rating=0.232746884226799), Row(ItemCode=1783887, rating=0.23154011368751526), Row(ItemCode=1783866, rating=0.20996926724910736), Row(ItemCode=478632, rating=0.19477348029613495), Row(ItemCode=478611, rating=0.19245746731758118)]),
 Row(UserId_new=366870, recommendations=[Row(ItemCode=1780023, rating=0.24612247943878174), Row(ItemCode=440643, rating=0.12