## Objectives:
As part of the evaluation of this course, students execute a Big Data project in Spark as a team
to reach the learning objectives of solving and presenting an end-to-end solution to a Big Data 
problem in an intercultural team, and of demonstrating expertise in key concepts, 
techniques, and trends (among others). In this project, they will apply the knowledge and 
techniques seen in class to a lifelike Big Data project. Furthermore, they will learn how to 
work in an intercultural team and how to present business insights to a business team
and technical analyses to a data science team.

## Context:
BLU is a French e-commerce player offering B2C and B2B customers a broad range of products 
across more than 100 product categories ranging from kitchen utensils to computer games.
They have been active for 5 years and achieved solid growth of 12% in 2020, 9% in 2021, and 
7% in 2022. To get more insight into how the company is performing compared to its competitors, 
the marketing team ordered a report from a consumer insights agency comparing BLU to its 
two main competitors Amazon.fr and Cdiscount.fr. This revealed that BLU is able to acquire a 
larger share of new customers compared to the other two (7.8% versus 2.8% for Amazon and 
3.4% for Cdiscount), but is underperforming in repeat business from existing customers (6.2% 
vs. 19.6% for Amazon and 14.4% for Cdiscount). As a next step, BLU’s marketing team wants 
to have a deeper understanding of its customer base and use predictive modeling to inform
its business decisions.


They want you to build a prediction model that predicts whether the review score of a given 
order will be positive (4-5 on 5) or negative (1-3 on 5) given some input features.
BLU’s data science team has given you a dataset of 6 tables of approximately 50k orders placed between 
September 2020 and June 2022. Your goal is to build a prediction model with the highest 
possible performance using the provided data, while respecting the fundamental principles 
of good data science. You can be creative and innovative in how you use the available 
information (e.g., create new variables, use unstructured content, etc.), as you would in 
practice! The team that achieves the highest accuracy on the hold-out sample gets +2 bonus 
points on their assignment, provided of course that they respect the appropriate modeling setup 
(e.g., no AUC-hacking or other methods of cheating)! Furthermore, each team that solves the 
case using a multi-class classification model (where a probability is given per label) gets +1 
bonus point. Describe your approach in the technical section of the presentation. This section 
should be concise and intended for a data science audience (e.g., describe variable creation, 
algorithms used, cross-validation approach, evaluation metric(s)).
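
The hold-out criterion and the multi-class bonus can be made concrete with a small plain-Python sketch (no Spark needed): accuracy is the share of correct predictions, and a multi-class model that outputs one probability per label is usually turned into a single prediction by taking the most probable label; summing the probabilities of scores 4 and 5 gives a probability for the positive class. The function names and toy numbers are illustrative assumptions, not part of the assignment.

```python
def accuracy(y_true, y_pred):
    """Share of predictions equal to the true label."""
    if len(y_true) != len(y_pred):
        raise ValueError("y_true and y_pred must have the same length")
    if not y_true:
        raise ValueError("at least one observation is required")
    hits = sum(1 for t, p in zip(y_true, y_pred) if t == p)
    return hits / len(y_true)


def predict_label(prob_per_label):
    """Return the label with the highest probability (first one wins on ties)."""
    return max(prob_per_label, key=prob_per_label.get)


def positive_probability(prob_per_label):
    """Collapse per-score probabilities into P(positive) = P(4) + P(5)."""
    return sum(p for label, p in prob_per_label.items() if label >= 4)


# Binary task: 1 = positive review (4-5 on 5), 0 = negative review (1-3 on 5)
print(accuracy([1, 0, 1, 1], [1, 0, 0, 1]))  # 0.75

# Multi-class task: hypothetical probabilities for review scores 1..5
probs = {1: 0.05, 2: 0.05, 3: 0.10, 4: 0.30, 5: 0.50}
print(predict_label(probs))                     # 5
print(round(positive_probability(probs), 2))    # 0.8
```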

In [0]:
file_path = '/FileStore/tables/group_data'

#### Import functions

In [0]:
from pyspark.sql.functions import *
from pyspark.sql.window import Window
from pyspark.ml.feature import Binarizer
from pyspark.ml.feature import RFormula
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import VectorAssembler
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression, DecisionTreeClassifier, RandomForestClassifier, OneVsRest
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

import pandas as pd

#### Data transformation of individual tables

#### Orders

In [0]:
orders=spark\
      .read\
      .format("csv")\
      .option("header","true")\
      .option("inferSchema","true")\
      .load("/FileStore/tables/group_data/orders.csv")

# inspect the table
orders.show(3)
orders.printSchema()

In [0]:
# get the number of observations/rows
print('Total rows:', orders.count())

# distinct rows/observations
print('Unique rows:', orders.distinct().count())

# get number of distinct order_id
print('Unique order id:', orders.select("order_id").distinct().count())

In [0]:
# Remove NA values from data
orders = orders.filter(~(orders.order_id == "NA"))

# row count after removing NA
orders.count()

In [0]:
# Drop duplicate order_id
orders = orders.dropDuplicates(["order_id"])
orders.count()

In [0]:
# rename columns
orders = orders.withColumnRenamed("order_purchase_timestamp", "order_date") \
               .withColumnRenamed("order_approved_at", "order_approved")\
               .withColumnRenamed("order_delivered_carrier_date", "delivered_to_logistics")\
               .withColumnRenamed("order_delivered_customer_date", "delivered_to_customer")\
               .withColumnRenamed("order_estimated_delivery_date", "estimated_dlvry_date")
orders.show(3)

In [0]:
#convert string formats of date to date formats
orders = orders.withColumn("order_date", to_date(col("order_date")))\
               .withColumn("order_approved", to_date(col("order_approved")))\
               .withColumn("delivered_to_logistics", to_date(col("delivered_to_logistics")))\
               .withColumn("delivered_to_customer", to_date(col("delivered_to_customer")))\
               .withColumn("estimated_dlvry_date", to_date(col("estimated_dlvry_date")))
orders.show(3)

In [0]:
# calculate time interval in days between order_dates, approval dates and delivered dates
orders = orders.withColumn("days_to_approval", datediff("order_approved",col("order_date"))) \
               .withColumn("days_to_delivery",datediff("delivered_to_customer",col("order_date"))) \
               .withColumn("delivery_response",datediff("delivered_to_customer",col("estimated_dlvry_date")))

# delivery_response: negative values indicate that delivery came earlier than estimated delivery date (eg 4 days before estimated delivery date)
#                  : positive values indicate that delivery came later than estimated delivery date (eg 4 days after estimated delivery date)

orders.show(3)
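
The three interval columns follow Spark's `datediff(end, start)` convention, i.e. end minus start in whole days. The plain-Python sketch below reproduces that arithmetic on hypothetical dates so the sign convention of `delivery_response` can be checked by hand; `None` stands in for a missing (null) date, which makes the result null as well.

```python
from datetime import date


def delivery_features(order_date, approved, delivered, estimated):
    """Mirror of the three datediff(end, start) columns, in whole days."""

    def days(end, start):
        # a null on either side gives a null result, as in Spark
        if end is None or start is None:
            return None
        return (end - start).days

    return {
        "days_to_approval": days(approved, order_date),
        "days_to_delivery": days(delivered, order_date),
        # negative = delivered earlier than estimated, positive = later
        "delivery_response": days(delivered, estimated),
    }


print(delivery_features(date(2021, 3, 1), date(2021, 3, 2),
                        date(2021, 3, 10), date(2021, 3, 14)))
# {'days_to_approval': 1, 'days_to_delivery': 9, 'delivery_response': -4}
```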

In [0]:
# extract the months in which orders were placed and delivered from the order_date and delivered_to_customer columns
orders = orders.withColumn('order_month',month(orders.order_date))
orders = orders.withColumn('delivery_month',month(orders.delivered_to_customer))
display(orders)

In [0]:
# count null values in each column
display(orders.select([count(when(col(c).isNull(), c)).alias(c) for c in orders.columns]))

In [0]:
# sort dates columns to investigate the reason for missing values  
orders = orders.sort(col("order_date").asc(),
                     col("order_approved").asc(),
                     col("delivered_to_logistics").asc(),
                     col("delivered_to_customer").asc(),
                     col("estimated_dlvry_date").asc())
display(orders)

#### Order_payments

In [0]:
orders_payment = spark\
                .read\
                .format("csv")\
                .option("header","true")\
                .option("inferSchema","true")\
                .load("/FileStore/tables/group_data/order_payments.csv")

# inspect the table
orders_payment.show(3)
orders_payment.printSchema()

In [0]:
# get the number of observations/rows
print('Total rows:', orders_payment.count())

# distinct rows/observations
print('Unique rows:', orders_payment.distinct().count())

# get number of distinct order_id
print('Unique order id:', orders_payment.select("order_id").distinct().count())

In [0]:
# Removing NA values from data
orders_payment = orders_payment.filter(~(orders_payment.order_id == "NA"))

# row count after removing NA
orders_payment.count()

In [0]:
# Drop duplicate rows, if any exist
orders_payment = orders_payment.dropDuplicates()
orders_payment.count()

In [0]:
# Inspect the table
orders_payment.describe().show()

In [0]:
# Count the number of nulls per column
orders_payment.select([count(when(col(c).isNull(), c)).alias(c) for c in orders_payment.columns]).show()

In [0]:
# get payment value for each payment method by order_id
payment_method = orders_payment\
                .groupBy("order_id")\
                .pivot("payment_type")\
                .sum("payment_value")

# replace null values by 0
payment_method = payment_method.fillna(0)
print(payment_method.count())
payment_method.show(3)
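
The pivot turns the long payment table (one row per payment) into one row per order with one column per payment type, and `fillna(0)` sets the types an order did not use to 0. A plain-Python sketch of the same reshaping, assuming (as the Binarizer below does) that the payment types are `credit_card`, `debit_card`, `mobile` and `voucher`; the input rows are made up:

```python
from collections import defaultdict

# assumption: the four payment types used as Binarizer inputs below
PAYMENT_TYPES = ("credit_card", "debit_card", "mobile", "voucher")


def pivot_payments(rows):
    """rows: iterable of (order_id, payment_type, payment_value) tuples.

    Returns {order_id: {payment_type: total value}}; unused types are 0.0,
    like groupBy("order_id").pivot("payment_type").sum("payment_value")
    followed by fillna(0)."""
    totals = defaultdict(lambda: dict.fromkeys(PAYMENT_TYPES, 0.0))
    for order_id, payment_type, value in rows:
        per_order = totals[order_id]
        # .get() keeps unexpected payment types instead of raising KeyError
        per_order[payment_type] = per_order.get(payment_type, 0.0) + value
    return dict(totals)


rows = [("o1", "credit_card", 50.0), ("o1", "voucher", 10.0),
        ("o1", "voucher", 5.0), ("o2", "debit_card", 20.0)]
print(pivot_payments(rows)["o1"])
# {'credit_card': 50.0, 'debit_card': 0.0, 'mobile': 0.0, 'voucher': 15.0}
```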

In [0]:
# Use Binarizer to create dummy variables of payment type with threshold of 0. 
# If the customer made no payment using that method, it assigns 0 else it assigns 1

convert_dummy = Binarizer(threshold=0.0, 
                          inputCols=["credit_card", "debit_card", "mobile", "voucher"], 
                          outputCols=["credit_pymt", "debit_pymt", "mobile_pymt", "voucher_pymt"])

payment_method = convert_dummy.transform(payment_method)

# no of rows/observations
print(payment_method.count())

payment_method.show(3)

In [0]:
# get count of the distinct payment methods used per order_id
payment_method = payment_method.withColumn("distinct_payment_modes", 
                                           expr("credit_pymt + debit_pymt + mobile_pymt + voucher_pymt"))

In [0]:
# Use Binarizer with threshold of 1 to see if an order had multiple payment methods. (eg credit card and voucher) 
# If the order only had one payment method, it assigns 0 else it assigns 1
multiple_pymt_mode = Binarizer(threshold=1.0, 
                          inputCol="distinct_payment_modes", 
                          outputCol="multiple_pymt_mode")

payment_method = multiple_pymt_mode.transform(payment_method)

# no of rows/observations
print(payment_method.count())

payment_method.show(6)
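
Spark's Binarizer outputs 1.0 when a value is strictly greater than the threshold and 0.0 otherwise, so the threshold of 0.0 flags every payment type with a positive amount and the threshold of 1.0 flags orders that used two or more types. A plain-Python sketch of the three steps for a single pivoted row (the input amounts are hypothetical):

```python
def binarize(value, threshold):
    """Binarizer rule: 1.0 if value > threshold, else 0.0."""
    return 1.0 if value > threshold else 0.0


# output column names used in the notebook for each pivoted payment column
DUMMY_NAMES = {"credit_card": "credit_pymt", "debit_card": "debit_pymt",
               "mobile": "mobile_pymt", "voucher": "voucher_pymt"}


def payment_mode_features(amounts):
    """amounts: {payment_type: total paid} for one order."""
    features = {DUMMY_NAMES[k]: binarize(v, 0.0) for k, v in amounts.items()}
    # number of payment types with a positive amount
    features["distinct_payment_modes"] = sum(features.values())
    # 1.0 only when more than one payment type was used
    features["multiple_pymt_mode"] = binarize(features["distinct_payment_modes"], 1.0)
    return features


row = {"credit_card": 120.0, "debit_card": 0.0, "mobile": 0.0, "voucher": 20.0}
print(payment_mode_features(row))
```

Because the comparison is strict, a voucher recorded with a payment value of exactly 0 does not count as a payment mode.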

In [0]:
# get orders that were paid in one installment
single_payment = orders_payment.filter(orders_payment.payment_installments <= 1)

# no of rows/observations
print(single_payment.count())

single_payment.show(3)

In [0]:
# get orders that were paid in more than one installment
mult_payments = orders_payment.filter(orders_payment.payment_installments > 1)

# no of rows/observations
print(mult_payments.count())

mult_payments.show(3)

In [0]:
# Some order ids had multiple payment methods with different numbers of installments. For example, an order may have been paid
# with a voucher in 1 installment and with a credit card in 8 installments. We categorize such orders as having multiple payment
# installments, because the credit card payment was split into 8 installments.
hybrid_pymt = single_payment.join(mult_payments, "order_id", "inner").select(single_payment["*"])
print(hybrid_pymt.count())
hybrid_pymt.show(3)

In [0]:
# get order ids that exist only in single_payment and not in the mult_payments or hybrid_pymt tables, i.e., orders that were paid
# in a single installment across all payment methods used
single_payment = single_payment.join(hybrid_pymt, "order_id", "leftanti")
print(single_payment.count())
single_payment.show(3)

In [0]:
# create a mult_installments column in single_payment, mult_payments and hybrid_pymt tables stating if order_id was completed using multiple payment installments
# 0 for No and 1 for Yes
single_payment = single_payment.withColumn("mult_installments", lit(0))
hybrid_pymt = hybrid_pymt.withColumn("mult_installments", lit(1))
mult_payments = mult_payments.withColumn("mult_installments", lit(1))

In [0]:
# get a union of single_payment, mult_payments and hybrid_pymt tables
orders_payment2 = single_payment.union(hybrid_pymt)
orders_payment2 = orders_payment2.union(mult_payments)
print(orders_payment2.count())
display(orders_payment2)

In [0]:
# get the number of observations/rows
print('Total rows:', orders_payment2.count())

# distinct rows/observations
print('Unique rows:', orders_payment2.distinct().count())

# get number of distinct order_id
print('Unique order id:', orders_payment2.select("order_id").distinct().count())

In [0]:
# select only the necessary columns from orders_payment2
distinct_order_pymt = orders_payment2.select("order_id","mult_installments").distinct()
print(distinct_order_pymt.count())
distinct_order_pymt.show(3)

In [0]:
# join distinct_order_pymt and payment_method
order_pymt_final = payment_method.join(distinct_order_pymt, "order_id", "inner")
print(order_pymt_final.count())
display(order_pymt_final)
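
Taken together, the single/multiple/hybrid split assigns `mult_installments = 1` to an order as soon as one of its payment rows has more than one installment, and 0 otherwise. A plain-Python sketch of that rule on made-up rows follows. In Spark the same flag could likely be computed in a single aggregation (for example `groupBy("order_id")` with `max("payment_installments")`, then `when(... > 1, 1).otherwise(0)`); one difference to keep in mind is that an order whose installment counts are all null would then receive 0 instead of being dropped.

```python
def mult_installments_flag(payment_rows):
    """payment_rows: iterable of (order_id, payment_installments) tuples.

    Returns {order_id: 1 if any row has more than 1 installment, else 0}.
    Rows with a missing installment count are skipped, as in the original
    <= 1 / > 1 filters, so an order with only missing counts gets no flag."""
    flags = {}
    for order_id, installments in payment_rows:
        if installments is None:
            continue
        row_flag = 1 if installments > 1 else 0
        flags[order_id] = max(flags.get(order_id, 0), row_flag)
    return flags


rows = [("o1", 1), ("o1", 8), ("o2", 1), ("o2", 1), ("o3", 3), ("o4", None)]
print(mult_installments_flag(rows))  # {'o1': 1, 'o2': 0, 'o3': 1}
```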

#### Order_reviews

In [0]:
order_reviews = spark\
                .read\
                .format("csv")\
                .option("header","true")\
                .option("inferSchema","true")\
                .load("/FileStore/tables/group_data/order_reviews.csv")

# inspect the table
order_reviews.show(3)
order_reviews.printSchema()

In [0]:
# inspect data
order_reviews.describe().show()

In [0]:
# get the number of obervations/rows
print('Total rows:', order_reviews.count())

# get number of distinct order_id
print('Unique order id:', order_reviews.select("order_id").distinct().count())

# get number of distinct review_id
print('Unique review id:', order_reviews.select("review_id").distinct().count())

In [0]:
# get number of reviews per order 
display(order_reviews.groupBy("order_id")\
                     .agg(count("review_id").alias("review_count")))

In [0]:
# for each review_id, number its rows from the most recent to the oldest answer and keep only the most recent one (row 1)
review = order_reviews.withColumn("unique_review_rowID",row_number()
                      .over(Window.partitionBy("review_id")
                      .orderBy(col("review_answer_timestamp")
                      .desc())))

review = review.filter(review.unique_review_rowID==1)
print('Unique review id:', review.select("review_id").distinct().count())
display(review)

In [0]:
# counting review_id for each order 
display(review.groupBy("order_id")
              .agg(count("review_id").alias("review_cnt")))

In [0]:
# for each order_id, keep only its most recent review, so that every order has exactly one review
fnl_review = review.withColumn("unique_order_rowID",row_number()
                   .over(Window.partitionBy("order_id")
                   .orderBy(col("review_answer_timestamp")
                   .desc())))

fnl_review = fnl_review.filter(fnl_review.unique_order_rowID==1)

print('Unique order id:', fnl_review.select("order_id").distinct().count())
print('Unique review id:', fnl_review.select("review_id").distinct().count())

fnl_review = fnl_review.drop(*('unique_order_rowID',
                             'unique_review_rowID'))
display(fnl_review)
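
Both window steps keep one row per group: `row_number()` over a window partitioned by the key and ordered by `review_answer_timestamp` descending, filtered to 1, retains the latest answer, first per `review_id` and then per `order_id`. A plain-Python sketch of the same selection on made-up records, assuming the timestamps are ISO-formatted strings (which sort chronologically) and that `None`, like a null under Spark's default descending order (nulls last), loses against any real timestamp:

```python
def keep_latest(records, key, ts="review_answer_timestamp"):
    """Keep one record per value of `key`: the one with the latest `ts`.

    Ties keep the first record seen (Spark's choice among ties is not guaranteed)."""
    best = {}
    for rec in records:
        k = rec[key]
        if k not in best:
            best[k] = rec
            continue
        current, candidate = best[k][ts], rec[ts]
        if candidate is not None and (current is None or candidate > current):
            best[k] = rec
    return list(best.values())


reviews = [
    {"review_id": "r1", "order_id": "o1", "review_answer_timestamp": "2021-05-01 10:00:00"},
    {"review_id": "r1", "order_id": "o1", "review_answer_timestamp": "2021-05-03 09:00:00"},
    {"review_id": "r2", "order_id": "o1", "review_answer_timestamp": "2021-05-02 12:00:00"},
    {"review_id": "r3", "order_id": "o2", "review_answer_timestamp": None},
]
per_review = keep_latest(reviews, "review_id")   # r1 (2021-05-03), r2, r3
per_order = keep_latest(per_review, "order_id")  # o1 -> r1, o2 -> r3
print([(r["order_id"], r["review_id"]) for r in per_order])
# [('o1', 'r1'), ('o2', 'r3')]
```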

In [0]:
# convert dates into date formats
fnl_review = fnl_review.withColumn("review_answer_timestamp",to_date(col("review_answer_timestamp")))

# calculate the number of days between when the review survey was created (i.e., sent) and when the customer responded to it
fnl_review = fnl_review.withColumn("review_repns_time",datediff("review_answer_timestamp",col("review_creation_date")))
display(fnl_review)

In [0]:
#Count the number of nulls per column
fnl_review.select([count(when(col(c).isNull(), c)).alias(c) for c in fnl_review.columns]).show()

In [0]:
#Inspect data
fnl_review.describe().show()

#### Order_items

In [0]:
order_items = spark\
             .read\
             .format("csv")\
             .option("header","true")\
             .option("inferSchema","true")\
             .load("/FileStore/tables/group_data/order_items.csv")

# inspect the table
order_items.show(3)
order_items.printSchema()

In [0]:
# number of observations/rows
print('Total rows:', order_items.count())

#count of unique product id
print('Unique product id:', order_items.select("product_id").distinct().count())

#count of unique order_id
print('Unique order id:', order_items.select("order_id").distinct().count())

In [0]:
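# remove exact duplicate rows and check that no order was lost
order_items = order_items.drop_duplicates() 
print('Rows after removing duplicates:', order_items.count())
print('Unique order id:', order_items.select("order_id").distinct().count())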

In [0]:
# inspect data
order_items.describe().show()

#### Products

In [0]:
products=spark\
        .read\
        .format("csv")\
        .option("header","true")\
        .option("inferSchema","true")\
        .load("/FileStore/tables/group_data/products.csv")

# inspect the table
products.show(3)
products.printSchema()

In [0]:
# number of observations/rows
print('Total rows:', products.count())

#count of unique product id
print('Unique product id:', products.select("product_id").distinct().count())

In [0]:
# merge products and order_items table based on product id
prod_order = order_items.join(products,on=["product_id"], how="left")
display(prod_order)

#### Category

In [0]:
# we created a more concise product category by grouping similar product categories together in Excel and then read it into Spark
category_df = spark\
              .read\
              .format("csv")\
              .option("header","true")\
              .option("inferSchema","true")\
              .load("/FileStore/tables/group_data/products_cat_dict.csv")

# inspect the table
category_df.show(3)
category_df.printSchema()

In [0]:
# merge prod_order and category_df
prod_order = prod_order.join(category_df,on=["product_category_name"], how="left")
display(prod_order)

In [0]:
#group prod_order by order id and aggregate other columns 
fnl_prd_order=prod_order.groupBy("order_id")\
                        .agg(countDistinct("product_id").alias("nbr_distict_item"),
                             count("product_id").alias("total_qty"), 
                             round(sum("price"),2).alias("total_price"),
                             round(sum("shipping_cost"),2).alias("total_ship_cost"),
                             round(mean("product_name_lenght"),2).alias("order_prod_avg_length"),
                             round(mean("product_description_lenght"),2).alias("order_avg_desc_length"),
                             round(mean("product_photos_qty"),2).alias("order_avg_photo_qty"),
                             round(mean("product_weight_g"),2).alias("order_avg_weight_g"),
                             round(mean("product_length_cm"),2).alias("order_avg_length_cm"),
                             round(mean("product_height_cm"),2).alias("order_avg_height_cm"),
                             round(mean("product_width_cm"),2).alias("order_avg_width_cm"),
                             countDistinct("new_categories").alias("distinct_prod_category"))
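
For a single order, the aggregation above reduces the item rows to counts and sums. A plain-Python sketch of five of those aggregates on hypothetical item rows, assuming no null product ids, prices or categories (Spark's `count` and `countDistinct` skip nulls, which this sketch does not handle):

```python
def aggregate_order(items):
    """items: list of dicts (one per item row) belonging to ONE order."""
    return {
        "nbr_distict_item": len({it["product_id"] for it in items}),
        "total_qty": len(items),
        # note: Python's round() rounds half to even, Spark's round() half up
        "total_price": round(sum(it["price"] for it in items), 2),
        "total_ship_cost": round(sum(it["shipping_cost"] for it in items), 2),
        "distinct_prod_category": len({it["new_categories"] for it in items}),
    }


items = [
    {"product_id": "p1", "price": 10.0, "shipping_cost": 2.5, "new_categories": "home"},
    {"product_id": "p1", "price": 10.0, "shipping_cost": 2.5, "new_categories": "home"},
    {"product_id": "p2", "price": 30.0, "shipping_cost": 5.0, "new_categories": "toys"},
]
print(aggregate_order(items))
# {'nbr_distict_item': 2, 'total_qty': 3, 'total_price': 50.0, 'total_ship_cost': 10.0, 'distinct_prod_category': 2}
```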

In [0]:
# approximate the order's average volume in cm³ as the product of the average length, height and width
fnl_prd_order = fnl_prd_order.withColumn("order_avg_volume_cm", 
                                           round(expr("order_avg_length_cm * order_avg_height_cm * order_avg_width_cm"),2))

# check if distinct products were ordered; 1 - Yes, 0 - No
fnl_prd_order = fnl_prd_order.withColumn("distinct_items", 
                                         when(col("nbr_distict_item")>1, 1).otherwise(0))

# check if products ordered had distinct product categories; 1 - Yes, 0 - No
fnl_prd_order = fnl_prd_order.withColumn("multiple_catgories", 
                                         when(col("distinct_prod_category")>1, 1).otherwise(0))

display(fnl_prd_order)
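
`order_avg_volume_cm` multiplies the average length, height and width of the items in an order. For orders whose items differ in size, this is generally not the same as the average item volume. The plain-Python comparison below uses two hypothetical items; if the average item volume is what is intended, one option would be to compute `product_length_cm * product_height_cm * product_width_cm` per item in `prod_order` before the `groupBy` and take its mean.

```python
from statistics import mean


def volume_two_ways(dims):
    """dims: list of (length_cm, height_cm, width_cm) tuples, one per item."""
    lengths, heights, widths = zip(*dims)
    # what order_avg_volume_cm computes: product of the average dimensions
    product_of_means = mean(lengths) * mean(heights) * mean(widths)
    # alternative: mean of the per-item volumes
    mean_of_products = mean(l * h * w for l, h, w in dims)
    return product_of_means, mean_of_products


print(volume_two_ways([(10, 10, 10), (20, 20, 20)]))
# product of means 3375 vs mean of volumes 4500
```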

In [0]:
display(fnl_prd_order.describe())

#### Basetable

In [0]:
# merge all tables to create base table
base = fnl_review.join(orders, on=["order_id"], how="left")
base = base.join(order_pymt_final,on=["order_id"], how="left")
base = base.join(fnl_prd_order,on=["order_id"], how="left")
print('Basetable rows:', base.count())

In [0]:
display(base)

In [0]:
# examine the schema
base.printSchema()

In [0]:
# number of variables (columns) in the base table
len(base.columns)

In [0]:
# calculate at what point the reviews were sent to the customers
base = base.withColumn("review_sent", when(((col("review_creation_date") <= col("order_date")) |
                                            (col("review_creation_date") < col("delivered_to_logistics"))),"Before Logisitcs")
                                     .when(((col("review_creation_date") >= col("delivered_to_logistics")) &
                                            (col("review_creation_date") < col("delivered_to_customer"))),"After Logisitcs")
                                     .when(col("review_creation_date") >= col("delivered_to_customer"),"After Delivery")
                                     .otherwise("match not found"))

# Before Logistics: review was sent before or on the same day as the order date (to account for orders that were cancelled and never made it to the logistics company)
#                   or before the order was sent to the logistics company.
# After Logistics: review was sent between the date the order was handed to the logistics company and the date it was delivered to the customer
# After Delivery: review was sent on or after the date the order was delivered to the customer

display(base)
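
The `when`/`otherwise` chain is evaluated top to bottom, and a comparison involving a null date never counts as true. A plain-Python sketch of the same classification on hypothetical dates, with `None` playing the role of a null (the labels are spelled "Logistics" here; the notebook's string values read "Logisitcs"):

```python
import operator
from datetime import date


def _holds(a, op, b):
    # a comparison involving a null is never true inside when(), so treat it as False
    return a is not None and b is not None and op(a, b)


def review_sent(created, order_date, to_logistics, to_customer):
    """Plain-Python version of the review_sent when/otherwise chain."""
    if _holds(created, operator.le, order_date) or _holds(created, operator.lt, to_logistics):
        return "Before Logistics"
    if _holds(created, operator.ge, to_logistics) and _holds(created, operator.lt, to_customer):
        return "After Logistics"
    if _holds(created, operator.ge, to_customer):
        return "After Delivery"
    return "match not found"


order, logistics, customer = date(2021, 3, 1), date(2021, 3, 3), date(2021, 3, 8)
print(review_sent(date(2021, 3, 5), order, logistics, customer))   # After Logistics
print(review_sent(date(2021, 3, 10), order, logistics, customer))  # After Delivery
print(review_sent(date(2021, 3, 1), order, None, None))            # Before Logistics
print(review_sent(date(2021, 3, 2), order, None, None))            # match not found
```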

In [0]:
# determine whether the customer is ordering for the first time or is a repeat customer
cust_type = base.groupBy("customer_id")\
                .agg(count("customer_id").alias("count"))\
                .withColumn("customer_type",
                            when(col("count")>1,"Return")\
                           .otherwise("New"))
print(cust_type.count())
cust_type.show(5)

In [0]:
# merge cust_type to base table
cust_type = cust_type.select("customer_id","customer_type")
base = base.join(cust_type, on=["customer_id"], how="left")
print(base.count())
display(base)
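
`customer_type` is derived from how many rows a `customer_id` has in the table being processed. A plain-Python sketch of the rule on made-up ids is shown below. As written, the count covers the whole table, so a customer's first order is also labeled "Return" when a later order exists, and the training and holdout tables are counted separately; it may be worth checking whether that matches the intended meaning of a repeat customer.

```python
from collections import Counter


def customer_types(customer_ids):
    """customer_ids: one entry per order. 'Return' if the id occurs more than once."""
    counts = Counter(customer_ids)
    return {cid: ("Return" if n > 1 else "New") for cid, n in counts.items()}


print(customer_types(["c1", "c2", "c1", "c3"]))
# {'c1': 'Return', 'c2': 'New', 'c3': 'New'}
```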

In [0]:
#Count the number of nulls per column
display(base.select([count(when(col(c).isNull(), c)).alias(c) for c in base.columns]))

In [0]:
display(base)

### Holdout Data

#### Holdout Orders

In [0]:
test_orders=spark\
            .read\
            .format("csv")\
            .option("header","true")\
            .option("inferSchema","true")\
            .load("/FileStore/tables/group_data/test_orders.csv")

# get the number of observations/rows
print('Total rows:', test_orders.count())

# distinct rows/observations
print('Unique rows:', test_orders.distinct().count())

# get number of distinct order_id
print('Unique order id:', test_orders.select("order_id").distinct().count())

# Remove NA values from data
test_orders = test_orders.filter(~(test_orders.order_id == "NA"))

# Drop duplicate order_id
test_orders = test_orders.dropDuplicates(["order_id"])

# rename columns
test_orders = test_orders.withColumnRenamed("order_purchase_timestamp", "order_date") \
                         .withColumnRenamed("order_approved_at", "order_approved")\
                         .withColumnRenamed("order_delivered_carrier_date", "delivered_to_logistics")\
                         .withColumnRenamed("order_delivered_customer_date", "delivered_to_customer")\
                         .withColumnRenamed("order_estimated_delivery_date", "estimated_dlvry_date")

#convert string formats of date to date formats
test_orders = test_orders.withColumn("order_date", to_date(col("order_date")))\
                         .withColumn("order_approved", to_date(col("order_approved")))\
                         .withColumn("delivered_to_logistics", to_date(col("delivered_to_logistics")))\
                         .withColumn("delivered_to_customer", to_date(col("delivered_to_customer")))\
                         .withColumn("estimated_dlvry_date", to_date(col("estimated_dlvry_date")))

# calculate time interval in days between order_dates, approval dates and delivered dates
test_orders = test_orders.withColumn("days_to_approval", datediff("order_approved",col("order_date"))) \
                         .withColumn("days_to_delivery",datediff("delivered_to_customer",col("order_date"))) \
                         .withColumn("delivery_response",datediff("delivered_to_customer",col("estimated_dlvry_date")))

# delivery_response: negative values indicate that delivery came earlier than estimated delivery date (eg 4 days before estimated delivery date)
#                  : positive values indicate that delivery came later than estimated delivery date (eg 4 days after estimated delivery date)

# extract the months in which orders were placed and delivered from the order_date and delivered_to_customer columns
test_orders = test_orders.withColumn('order_month',month(test_orders.order_date))
test_orders = test_orders.withColumn('delivery_month',month(test_orders.delivered_to_customer))

# count null values in each column
display(test_orders.select([count(when(col(c).isNull(), c)).alias(c) for c in test_orders.columns]))

# sort dates columns to investigate missing values  
test_orders = test_orders.sort(col("order_date").asc(),
                               col("order_approved").asc(),
                               col("delivered_to_logistics").asc(),
                               col("delivered_to_customer").asc(),
                               col("estimated_dlvry_date").asc())

print('Final rows:', test_orders.count())
display(test_orders)

#### Holdout Order Payments

In [0]:
test_order_pymts = spark\
                  .read\
                  .format("csv")\
                  .option("header","true")\
                  .option("inferSchema","true")\
                  .load("/FileStore/tables/group_data/test_order_payments.csv")

# get the number of observations/rows
print('Total rows:', test_order_pymts.count())

# distinct rows/observations
print('Unique rows:', test_order_pymts.distinct().count())

# get number of distinct order_id
print('Unique order id:', test_order_pymts.select("order_id").distinct().count())

# Removing NA values from data
test_order_pymts = test_order_pymts.filter(~(test_order_pymts.order_id == "NA"))

# Drop duplicate rows, if any exist
test_order_pymts = test_order_pymts.dropDuplicates()

# get payment value for each payment method by order_id
test_pymt_method = test_order_pymts\
                    .groupBy("order_id")\
                    .pivot("payment_type")\
                    .sum("payment_value")

# replace null values by 0
test_pymt_method = test_pymt_method.fillna(0)

# Use Binarizer to create dummy variables of payment type with threshold of 0. 
# If the customer made no payment using that method, it assigns 0 else it assigns 1
test_convert_dummy = Binarizer(threshold=0.0, 
                               inputCols=["credit_card", "debit_card", "mobile", "voucher"], 
                               outputCols=["credit_pymt", "debit_pymt", "mobile_pymt", "voucher_pymt"])

test_pymt_method = test_convert_dummy.transform(test_pymt_method)

# get count of the distinct payment methods used per order_id
test_pymt_method = test_pymt_method.withColumn("distinct_payment_modes", 
                                               expr("credit_pymt + debit_pymt + mobile_pymt + voucher_pymt"))

# Use Binarizer with threshold of 1 to see if an order had multiple payment methods. 
# If the order only had one payment method, it assigns 0 else it assigns 1
test_mult_pymt_mode = Binarizer(threshold=1.0, 
                                inputCol="distinct_payment_modes", 
                                outputCol="multiple_pymt_mode")

test_pymt_method = test_mult_pymt_mode.transform(test_pymt_method)

# get orders that were paid in one installment
test_single_pymt = test_order_pymts.filter(test_order_pymts.payment_installments <= 1)

# get orders that were paid in more than one installment
test_mult_pymt = test_order_pymts.filter(test_order_pymts.payment_installments > 1)

# get orders that had both single and multiple payment installments, e.g., paid once with a voucher and in multiple installments with a credit card;
# these orders have transactions in both the test_single_pymt and test_mult_pymt tables
test_hybrid_pymt = test_single_pymt.join(test_mult_pymt, "order_id", "inner").select(test_single_pymt["*"])

# get order ids that exist only in test_single_pymt and not in the test_mult_pymt or test_hybrid_pymt tables
test_single_pymt = test_single_pymt.join(test_hybrid_pymt, "order_id", "leftanti")

# create a mult_installments column in the test_single_pymt, test_mult_pymt and test_hybrid_pymt tables stating if the order was paid in multiple installments
# 0 for No and 1 for Yes
test_single_pymt = test_single_pymt.withColumn("mult_installments", lit(0))
test_hybrid_pymt = test_hybrid_pymt.withColumn("mult_installments", lit(1))
test_mult_pymt = test_mult_pymt.withColumn("mult_installments", lit(1))

# get a union of the test_single_pymt, test_hybrid_pymt and test_mult_pymt tables
test_order_pymts2 = test_single_pymt.union(test_hybrid_pymt)
test_order_pymts2 = test_order_pymts2.union(test_mult_pymt)

# select only the necessary columns from test_order_pymts2
test_distinct_order_pymt = test_order_pymts2.select("order_id","mult_installments").distinct()

test_order_pymt_final = test_pymt_method.join(test_distinct_order_pymt, "order_id", "inner")
print('Final rows:', test_order_pymt_final.count())
display(test_order_pymt_final)

#### Holdout Order Items

In [0]:
test_order_items = spark\
                  .read\
                  .format("csv")\
                  .option("header","true")\
                  .option("inferSchema","true")\
                  .load("/FileStore/tables/group_data/test_order_items.csv")

# number of observations/rows
print('Total rows:', test_order_items.count())

#count of unique product id
print('Unique product id:', test_order_items.select("product_id").distinct().count())

#count of unique order_id
print('Unique order id:', test_order_items.select("order_id").distinct().count())

#Remove duplicates
test_order_items = test_order_items.drop_duplicates() 
print('Final rows:', test_order_items.count())

display(test_order_items)

#### Holdout Products

In [0]:
test_products = spark\
                .read\
                .format("csv")\
                .option("header","true")\
                .option("inferSchema","true")\
                .load("/FileStore/tables/group_data/test_products.csv")

# number of observations/rows
print('Total rows:', test_products.count())

#count of unique product id
print('Unique product id:', test_products.select("product_id").distinct().count())

# merge the test_products and test_order_items tables based on product id
test_prod_order = test_order_items.join(test_products,on=["product_id"], how="left")

# merge test_prod_order and category_df
test_prod_order = test_prod_order.join(category_df,on=["product_category_name"], how="left")
print('Rows after merge:', test_prod_order.count())

# group test_prod_order by order id and aggregate other columns 
fnl_test_prd_order = test_prod_order.groupBy("order_id")\
                                    .agg(countDistinct("product_id").alias("nbr_distict_item"),
                                         count("product_id").alias("total_qty"), 
                                         round(sum("price"),2).alias("total_price"),
                                         round(sum("shipping_cost"),2).alias("total_ship_cost"),
                                         round(mean("product_name_lenght"),2).alias("order_prod_avg_length"),
                                         round(mean("product_description_lenght"),2).alias("order_avg_desc_length"),
                                         round(mean("product_photos_qty"),2).alias("order_avg_photo_qty"),
                                         round(mean("product_weight_g"),2).alias("order_avg_weight_g"),
                                         round(mean("product_length_cm"),2).alias("order_avg_length_cm"),
                                         round(mean("product_height_cm"),2).alias("order_avg_height_cm"),
                                         round(mean("product_width_cm"),2).alias("order_avg_width_cm"),
                                         countDistinct("new_categories").alias("distinct_prod_category"))

# approximate the order's average volume in cm³ as the product of the average length, height and width
fnl_test_prd_order = fnl_test_prd_order.withColumn("order_avg_volume_cm", 
                                                   round(expr("order_avg_length_cm * order_avg_height_cm * order_avg_width_cm"),2))

# check if distinct products were ordered; 1 - Yes, 0 - No
fnl_test_prd_order = fnl_test_prd_order.withColumn("distinct_items", 
                                                   when(col("nbr_distict_item")>1, 1).otherwise(0))

# check if products ordered had distinct product categories; 1 - Yes, 0 - No
fnl_test_prd_order = fnl_test_prd_order.withColumn("multiple_catgories", 
                                                   when(col("distinct_prod_category")>1, 1).otherwise(0))

print('Final rows:', fnl_test_prd_order.count())

display(fnl_test_prd_order)
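The aggregation above turns item-level rows into one row per order. The pure-Python sketch below reproduces the same per-order logic on a hypothetical two-item order (made-up values; the keys mirror the Spark column names). It also makes one design choice visible: `order_avg_volume_cm` is the volume of the *average* item (the product of the average dimensions), which is generally not the same as the average of the per-item volumes.

```python
from statistics import mean

# Hypothetical item-level rows for one order (illustrative values, not BLU data)
items = [
    {"product_id": "p1", "price": 10.0, "shipping_cost": 2.5,
     "product_length_cm": 10, "product_height_cm": 10, "product_width_cm": 10,
     "new_categories": "toys"},
    {"product_id": "p2", "price": 20.0, "shipping_cost": 2.5,
     "product_length_cm": 20, "product_height_cm": 20, "product_width_cm": 20,
     "new_categories": "toys"},
]


def aggregate_order(rows):
    """Mimic the per-order groupBy/agg plus the derived flag columns."""
    n_distinct = len({r["product_id"] for r in rows})
    n_categories = len({r["new_categories"] for r in rows})
    avg_l = mean(r["product_length_cm"] for r in rows)
    avg_h = mean(r["product_height_cm"] for r in rows)
    avg_w = mean(r["product_width_cm"] for r in rows)
    return {
        "nbr_distict_item": n_distinct,
        "total_qty": len(rows),
        "total_price": round(sum(r["price"] for r in rows), 2),
        "total_ship_cost": round(sum(r["shipping_cost"] for r in rows), 2),
        # volume of the "average item", like the expr(...) column in Spark
        "order_avg_volume_cm": round(avg_l * avg_h * avg_w, 2),
        "distinct_items": 1 if n_distinct > 1 else 0,
        "multiple_catgories": 1 if n_categories > 1 else 0,
    }


order_row = aggregate_order(items)
# For comparison: the average of the per-item volumes
avg_item_volume = mean(
    r["product_length_cm"] * r["product_height_cm"] * r["product_width_cm"] for r in items
)
# product of averages (3375) vs average of products (4500)
print(order_row["order_avg_volume_cm"], avg_item_volume)
```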

#### Holdout Basetable

In [0]:
# merge all tables to create base table
test_base = test_order_pymt_final.join(test_orders, on=["order_id"], how="left")
test_base = test_base.join(fnl_test_prd_order,on=["order_id"], how="left")
print('Initial basetable rows:', test_base.count())

# calculate if customer is a new or repeat customer
test_cust_type = test_base.groupBy("customer_id")\
                          .agg(count("customer_id").alias("count"))\
                          .withColumn("customer_type",
                                      when(col("count")>1,"Return")\
                                     .otherwise("New"))

# merge test_cust_type to test_base table
test_cust_type = test_cust_type.select("customer_id","customer_type")
test_base = test_base.join(test_cust_type, on=["customer_id"], how="left")

#Count the number of nulls per column
display(test_base.select([count(when(col(c).isNull(), c)).alias(c) for c in test_base.columns]))

print('Final basetable rows:', test_base.count())
display(test_base)
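The `customer_type` column labels a customer as `Return` when their `customer_id` appears on more than one order in the table, and `New` otherwise. A minimal pure-Python sketch of that rule (the IDs are hypothetical) follows. One assumption worth checking in the BLU data: this only detects repeat buyers if `customer_id` identifies a person rather than a single order, and the counts only cover orders present in this table (here, the holdout orders).

```python
from collections import Counter


def label_customer_types(order_customer_ids):
    """Map each customer_id to "Return" (more than one order) or "New" (one order)."""
    orders_per_customer = Counter(order_customer_ids)
    return {
        cid: ("Return" if n_orders > 1 else "New")
        for cid, n_orders in orders_per_customer.items()
    }


# One hypothetical customer_id per order row
customer_types = label_customer_types(["c1", "c2", "c1", "c3"])
print(customer_types)  # {'c1': 'Return', 'c2': 'New', 'c3': 'New'}
```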

#### Binary Classification

In [0]:
# drop date columns (they cannot be used directly in a machine learning model), identifier columns, and review-related columns (they are not available in our holdout table)
bin_basetable = base.drop("order_id","delivered_to_logistics","delivered_to_customer","order_date","order_status","order_approved","estimated_dlvry_date","customer_id",
                          "review_id","review_answer_timestamp","review_creation_date","review_sent","review_repns_time")

# group review scores into two categories: 1-3 -> 0 (negative), 4-5 -> 1 (positive)
bin_basetable = bin_basetable.withColumn("review_category", when(bin_basetable.review_score < 4, 0)
                                                           .when(bin_basetable.review_score >= 4, 1))

bin_basetable = bin_basetable.drop("review_score")

display(bin_basetable)
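The `when` chain above maps scores 1 to 3 to `0` (negative) and 4 to 5 to `1` (positive). A plain-Python version of the same rule is sketched below. Because there is no `otherwise`, a missing `review_score` yields a null label; if any training row could lack a score, the `fillna(-999)` in the next cell would turn that null into a third label value, `-999`, so filtering such rows out beforehand may be safer.

```python
def review_category(review_score):
    """1 for a positive review (4-5), 0 for a negative one (1-3), None if missing."""
    if review_score is None:
        return None
    return 1 if review_score >= 4 else 0


print([review_category(s) for s in [1, 2, 3, 4, 5, None]])  # [0, 0, 0, 1, 1, None]
```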

In [0]:
# fill missing values with -999 (fillna with a number only affects numeric columns); they belong to orders that were cancelled, still being processed, or shipped
bin_basetable = bin_basetable.fillna(-999)
display(bin_basetable.select([count(when(col(c).isNull(), c)).alias(c) for c in bin_basetable.columns]))

In [0]:
# combine all independent variables across each row into one vector using RFormula
bin_df = RFormula(formula="review_category ~ .")\
                .fit(bin_basetable)\
                .transform(bin_basetable)\
                .select("features","label")
bin_df.show(3)
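With the formula `review_category ~ .`, RFormula uses the label column as `label` and every other column as a feature, assembled in the DataFrame's column order; per the Spark documentation, string feature columns are one-hot encoded and a string label would be indexed. The simplified sketch below covers only the all-numeric case and is not Spark's implementation. Column order matters later on: the holdout vector built with `VectorAssembler` has to list the same features in the same positions as the training vector.

```python
def rformula_dot(rows, label_col):
    """Simplified 'label ~ .' for numeric columns only (no one-hot encoding).

    Returns the feature column order and rows of {"features": [...], "label": ...}.
    """
    feature_cols = [c for c in rows[0] if c != label_col]
    assembled = [
        {"features": [float(r[c]) for c in feature_cols], "label": float(r[label_col])}
        for r in rows
    ]
    return feature_cols, assembled


# Hypothetical basetable rows (Python dicts preserve column order)
rows = [
    {"review_category": 1, "days_to_delivery": 8, "total_qty": 1},
    {"review_category": 0, "days_to_delivery": 25, "total_qty": 3},
]
feature_cols, assembled = rformula_dot(rows, "review_category")
print(feature_cols)   # ['days_to_delivery', 'total_qty']
print(assembled[1])   # {'features': [25.0, 3.0], 'label': 0.0}
```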

In [0]:
#Create a train and test set with a 70% train, 30% test split
bin_ml_train, bin_ml_test = bin_df.randomSplit([0.7, 0.3],seed=123)

print("Basetable obs:", bin_df.count())
print("Basetable train obs:", bin_ml_train.count())
print("Basetable test obs:", bin_ml_test.count())
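`randomSplit` decides each row's split independently at random, so the counts printed above will be close to, but usually not exactly, 70/30, and the same seed reproduces the split only while the input data and its partitioning stay the same. The sketch below imitates that per-row assignment with Python's `random` module; it will not reproduce Spark's actual assignment, since Spark uses its own sampler.

```python
import random
from itertools import accumulate


def random_split(rows, weights, seed):
    """Assign every row independently to one split; sizes are only approximately proportional."""
    total = sum(weights)
    upper_bounds = list(accumulate(w / total for w in weights))
    rng = random.Random(seed)
    splits = [[] for _ in weights]
    for row in rows:
        u = rng.random()  # uniform in [0, 1)
        # first split whose cumulative upper bound exceeds u (last split as a fallback)
        idx = next((i for i, b in enumerate(upper_bounds) if u < b), len(weights) - 1)
        splits[idx].append(row)
    return splits


train, test = random_split(list(range(10_000)), [0.7, 0.3], seed=123)
print(len(train), len(test))  # roughly 7000 and 3000, not exactly
```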

##### Logistic Regression

In [0]:
logreg_model = LogisticRegression().fit(bin_ml_train)
logreg_pred = logreg_model.transform(bin_ml_test)
# We use MulticlassClassificationEvaluator rather than BinaryClassificationEvaluator so that the binary and multiclass models are compared on the same metrics.
# MulticlassClassificationEvaluator does not provide AUC.
print("Accuracy:", MulticlassClassificationEvaluator(metricName="accuracy").evaluate(logreg_pred))
print("F1 Score:", MulticlassClassificationEvaluator(metricName="f1").evaluate(logreg_pred))
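For reference, `metricName="f1"` in `MulticlassClassificationEvaluator` is the *weighted* F1: the F1 score of each class, weighted by how often that class occurs in the true labels. The pure-Python sketch below computes accuracy and weighted F1 on a made-up imbalanced example in which a model always predicts the majority class; accuracy comes out at 0.8 while weighted F1 is about 0.71, which is why reporting both is useful.

```python
from collections import Counter


def accuracy(y_true, y_pred):
    """Share of predictions equal to the true label."""
    return sum(t == p for t, p in zip(y_true, y_pred)) / len(y_true)


def weighted_f1(y_true, y_pred):
    """Per-class F1, weighted by each class's share of the true labels."""
    support = Counter(y_true)
    n = len(y_true)
    total = 0.0
    for label, count in support.items():
        tp = sum(t == label and p == label for t, p in zip(y_true, y_pred))
        fp = sum(t != label and p == label for t, p in zip(y_true, y_pred))
        fn = count - tp
        precision = tp / (tp + fp) if tp + fp else 0.0
        recall = tp / (tp + fn) if tp + fn else 0.0
        f1 = 2 * precision * recall / (precision + recall) if precision + recall else 0.0
        total += (count / n) * f1
    return total


# Made-up example: 8 positive and 2 negative reviews, model always predicts 1
y_true = [1] * 8 + [0] * 2
y_pred = [1] * 10
print(accuracy(y_true, y_pred))               # 0.8
print(round(weighted_f1(y_true, y_pred), 4))  # 0.7111
```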

##### Decision Tree Classifier

In [0]:
dt_model = DecisionTreeClassifier().fit(bin_ml_train)
dt_pred = dt_model.transform(bin_ml_test)
print("Accuracy:", MulticlassClassificationEvaluator(metricName="accuracy").evaluate(dt_pred))
print("F1 Score:", MulticlassClassificationEvaluator(metricName="f1").evaluate(dt_pred))

##### Random Forest Classifier

In [0]:
rf_model = RandomForestClassifier().fit(bin_ml_train)
rf_pred = rf_model.transform(bin_ml_test)
print("Accuracy:", MulticlassClassificationEvaluator(metricName="accuracy").evaluate(rf_pred))
print("F1 Score:", MulticlassClassificationEvaluator(metricName="f1").evaluate(rf_pred))

##### Gradient Boosted Tree Classifier

In [0]:
from pyspark.ml.classification import GBTClassifier
gbt_model = GBTClassifier().fit(bin_ml_train)
gbt_pred = gbt_model.transform(bin_ml_test)
print("Accuracy:", MulticlassClassificationEvaluator(metricName="accuracy").evaluate(gbt_pred))
print("F1 Score:", MulticlassClassificationEvaluator(metricName="f1").evaluate(gbt_pred)) 

#### Multi-Class Classification

In [0]:
# drop date columns (they cannot be used directly in a machine learning model), identifier columns, and review-related columns (they are not available in our holdout table)
mclass_basetable = base.drop("order_id","delivered_to_logistics","delivered_to_customer","order_date","order_status","order_approved","estimated_dlvry_date","customer_id",
                             "review_id","review_answer_timestamp","review_creation_date","review_sent","review_repns_time")

display(mclass_basetable)

In [0]:
# fill missing values with -999 (fillna with a number only affects numeric columns); they belong to orders that were cancelled, still being processed, or shipped
mclass_basetable = mclass_basetable.fillna(-999)
display(mclass_basetable.select([count(when(col(c).isNull(), c)).alias(c) for c in mclass_basetable.columns]))

In [0]:
# combine all independent variables across each row into one vector using RFormula
mclass_df = RFormula(formula="review_score ~ .")\
                    .fit(mclass_basetable)\
                    .transform(mclass_basetable)\
                    .select("features","label")
display(mclass_df)

In [0]:
#Create a train and test set with a 70% train, 30% test split
mclass_ml_train, mclass_ml_test = mclass_df.randomSplit([0.7, 0.3],seed=123)

print("Basetable obs:", mclass_df.count())
print("Basetable train obs:", mclass_ml_train.count())
print("Basetable test obs:", mclass_ml_test.count())

##### Logistic Regression

In [0]:
logreg_model2 = LogisticRegression(family="multinomial").fit(mclass_ml_train)
logreg_pred2 = logreg_model2.transform(mclass_ml_test)
print("Accuracy:", MulticlassClassificationEvaluator(metricName="accuracy").evaluate(logreg_pred2))
print("F1 Score:", MulticlassClassificationEvaluator(metricName="f1").evaluate(logreg_pred2))

##### One Vs Rest Classifier

In [0]:
lr = LogisticRegression(maxIter=10, tol=1E-6, fitIntercept=True)

# instantiate the One Vs Rest Classifier.
ovr_model = OneVsRest(classifier=lr).fit(mclass_ml_train)
ovr_pred = ovr_model.transform(mclass_ml_test)
print("Accuracy:", MulticlassClassificationEvaluator(metricName="accuracy").evaluate(ovr_pred))
print("F1 Score:", MulticlassClassificationEvaluator(metricName="f1").evaluate(ovr_pred))
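`OneVsRest` trains one binary classifier per class ("this review score vs all the others") and, at prediction time, returns the class whose classifier is most confident. The sketch below shows only that decision rule; the per-class scoring functions are toy stand-ins (a score that peaks at the class value), not fitted models.

```python
def one_vs_rest_predict(x, scorers):
    """Return the class whose 'this class vs the rest' scorer gives the highest score."""
    return max(scorers, key=lambda label: scorers[label](x))


# Toy scorers for classes 1, 3 and 5: higher when x is closer to the class value
scorers = {c: (lambda x, c=c: -abs(x - c)) for c in (1, 3, 5)}

print(one_vs_rest_predict(2.8, scorers))  # 3
print(one_vs_rest_predict(4.6, scorers))  # 5
```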

##### Decision Tree Classifier

In [0]:
dt_model2 = DecisionTreeClassifier().fit(mclass_ml_train)
dt_pred2 = dt_model2.transform(mclass_ml_test)
print("Accuracy:", MulticlassClassificationEvaluator(metricName="accuracy").evaluate(dt_pred2))
print("F1 Score:", MulticlassClassificationEvaluator(metricName="f1").evaluate(dt_pred2))

##### Random Forest Classifier

In [0]:
rf_model2 = RandomForestClassifier().fit(mclass_ml_train)
rf_pred2 = rf_model2.transform(mclass_ml_test)
print("Accuracy:", MulticlassClassificationEvaluator(metricName="accuracy").evaluate(rf_pred2))
print("F1 Score:", MulticlassClassificationEvaluator(metricName="f1").evaluate(rf_pred2))

#### Model Testing using Feature Selection

In [0]:
#Get the feature importances for the model with the best accuracy and f1 score across binary and multiclass predictions
def ExtractFeatureImp(featureImp, dataset, featuresCol):
    list_extract = []
    for i in dataset.schema[featuresCol].metadata["ml_attr"]["attrs"]:
        list_extract = list_extract + dataset.schema[featuresCol].metadata["ml_attr"]["attrs"][i]
    varlist = pd.DataFrame(list_extract)
    varlist['score'] = varlist['idx'].apply(lambda x: featureImp[x])
    return(varlist.sort_values('score', ascending = False))
  
ExtractFeatureImp(gbt_model.featureImportances, bin_ml_train, "features").head(10) 
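`ExtractFeatureImp` relies on the metadata that RFormula (and VectorAssembler) attach to the `features` column: under `ml_attr` → `attrs`, feature descriptors are grouped by type (for example `numeric` or `binary`), each with an `idx` (its position in the vector) and a `name`. The stdlib-only sketch below does the same flattening and sorting on a hand-written metadata dictionary; the feature names and importance values are illustrative, not output from the model above.

```python
def extract_feature_importance(importances, ml_attr):
    """Pair each feature name with its importance, highest first.

    `ml_attr` follows the layout of dataset.schema["features"].metadata["ml_attr"].
    """
    attrs = [attr for group in ml_attr["attrs"].values() for attr in group]
    scored = [(attr["name"], importances[attr["idx"]]) for attr in attrs]
    return sorted(scored, key=lambda pair: pair[1], reverse=True)


# Hand-written metadata in the same layout (illustrative names and values)
ml_attr = {
    "attrs": {
        "numeric": [{"idx": 0, "name": "days_to_delivery"},
                    {"idx": 1, "name": "total_qty"}],
        "binary": [{"idx": 2, "name": "customer_type_New"}],
    },
    "num_attrs": 3,
}
ranking = extract_feature_importance([0.5, 0.2, 0.3], ml_attr)
print(ranking)
# [('days_to_delivery', 0.5), ('customer_type_New', 0.3), ('total_qty', 0.2)]
```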

In [0]:
# Use only the top 10 most important features to check if the model can be simplified while keeping a good accuracy and F1 score
bin_basetable1 = bin_basetable.select("review_category","delivery_response","delivery_month","days_to_delivery","total_qty","order_avg_desc_length","total_ship_cost",
                                      "order_prod_avg_length","order_month","order_avg_height_cm","order_avg_length_cm")
display(bin_basetable1)

In [0]:
# combine all independent variables across each row into one vector using RFormula
bin_df1 = RFormula(formula="review_category ~ .")\
                  .fit(bin_basetable1)\
                  .transform(bin_basetable1)\
                  .select("features","label")
display(bin_df1)

In [0]:
#Create a train and test set with a 70% train, 30% test split
bin_ml_train1, bin_ml_test1 = bin_df1.randomSplit([0.7, 0.3],seed=123)

print("Basetable obs:", bin_df1.count())
print("Basetable train obs:", bin_ml_train1.count())
print("Basetable test obs:", bin_ml_test1.count())

##### Random Forest Classifier

In [0]:
rf_model3 = RandomForestClassifier().fit(bin_ml_train1)
rf_pred3 = rf_model3.transform(bin_ml_test1)
print("Accuracy:", MulticlassClassificationEvaluator(metricName="accuracy").evaluate(rf_pred3))
print("F1 Score:", MulticlassClassificationEvaluator(metricName="f1").evaluate(rf_pred3))

##### Decision Tree Classifier

In [0]:
dt_model3 = DecisionTreeClassifier().fit(bin_ml_train1)
dt_pred3 = dt_model3.transform(bin_ml_test1)
print("Accuracy:", MulticlassClassificationEvaluator(metricName="accuracy").evaluate(dt_pred3))
print("F1 Score:", MulticlassClassificationEvaluator(metricName="f1").evaluate(dt_pred3))

##### Gradient Boosted Tree Classifier

In [0]:
gbt_model2 = GBTClassifier().fit(bin_ml_train1)
gbt_pred2 = gbt_model2.transform(bin_ml_test1)
print("Accuracy:", MulticlassClassificationEvaluator(metricName="accuracy").evaluate(gbt_pred2))
print("F1 Score:", MulticlassClassificationEvaluator(metricName="f1").evaluate(gbt_pred2)) 

##### Cross-validation on the best model

In [0]:
rfc = RandomForestClassifier(seed=121)

rcv = CrossValidator(
  estimator = rfc,
  evaluator = MulticlassClassificationEvaluator(metricName="accuracy"),
  estimatorParamMaps = ParamGridBuilder().addGrid(rfc.numTrees, [50, 100, 500]).build(),
  numFolds=3
)

#Fitting the model
rmodel = rcv.fit(bin_ml_train1)
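`CrossValidator` with `numFolds=3` splits the training data into three folds; for each `numTrees` value in the grid it fits on two folds, scores the held-out fold, and averages the three scores, then refits the best setting on the full training set (the averages are available as `rmodel.avgMetrics`, in grid order). The sketch below reproduces that loop in plain Python, swapping in a toy 1-D k-nearest-neighbour classifier with `k` as the hyperparameter instead of a random forest, and assigning folds deterministically rather than at random as Spark does.

```python
from collections import Counter


def fit_knn(train, k):
    """'Fitting' k-NN just stores the labelled points and k."""
    return {"points": list(train), "k": k}


def predict_knn(model, x):
    """Majority label among the k training points closest to x."""
    nearest = sorted(model["points"], key=lambda p: abs(p[0] - x))[: model["k"]]
    return Counter(label for _, label in nearest).most_common(1)[0][0]


def cross_validate(data, param_grid, num_folds):
    """Average held-out accuracy per parameter value; refit the best one on all data."""
    folds = [data[i::num_folds] for i in range(num_folds)]
    avg_metrics = []
    for k in param_grid:
        scores = []
        for i, held_out in enumerate(folds):
            train = [row for j, fold in enumerate(folds) if j != i for row in fold]
            model = fit_knn(train, k)
            hits = sum(predict_knn(model, x) == y for x, y in held_out)
            scores.append(hits / len(held_out))
        avg_metrics.append(sum(scores) / num_folds)
    best_k = param_grid[avg_metrics.index(max(avg_metrics))]
    return fit_knn(data, best_k), avg_metrics


# Toy labelled data: (feature, label) pairs, label 1 when the feature is >= 15
data = [(x, int(x >= 15)) for x in range(30)]
best_model, avg_metrics = cross_validate(data, [1, 3, 5], num_folds=3)
print(best_model["k"], avg_metrics)
```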

In [0]:
#Predict using the best CV model
rpred = rmodel.transform(bin_ml_test1)

#Evaluate on the test set
print("Accuracy:", MulticlassClassificationEvaluator(metricName="accuracy").evaluate(rpred))
print("F1 Score:", MulticlassClassificationEvaluator(metricName="f1").evaluate(rpred)) 
print("AUC:", BinaryClassificationEvaluator().evaluate(rpred))
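Unlike accuracy and F1, `BinaryClassificationEvaluator` (default metric `areaUnderROC`) scores the model's ranking, reading the `rawPrediction` column by default rather than the hard 0/1 predictions. The AUC equals the probability that a randomly chosen positive example is scored above a randomly chosen negative one, with ties counted as half. The exact pairwise computation is sketched below on made-up scores; Spark builds the ROC curve instead and may bin it on large data, so its figure can differ slightly.

```python
def roc_auc(labels, scores):
    """Exact AUC: share of (positive, negative) pairs ranked correctly, ties count 0.5."""
    pos = [s for s, y in zip(scores, labels) if y == 1]
    neg = [s for s, y in zip(scores, labels) if y == 0]
    wins = 0.0
    for p in pos:
        for n in neg:
            if p > n:
                wins += 1.0
            elif p == n:
                wins += 0.5
    return wins / (len(pos) * len(neg))


# Made-up labels and scores: 3 of the 4 positive/negative pairs are ranked correctly
print(roc_auc([1, 1, 0, 0], [0.9, 0.4, 0.6, 0.1]))  # 0.75
```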

#### Implement the best model on the holdout data

In [0]:
# Use only the features used in the model we have chosen
test_base1 = test_base.select("delivery_response","delivery_month","days_to_delivery","total_qty","order_avg_desc_length","total_ship_cost",
                              "order_prod_avg_length","order_month","order_avg_height_cm","order_avg_length_cm")
# Replace null numeric values with -999, as done for the training basetable
test_base1 = test_base1.fillna(-999)
display(test_base1)

In [0]:
# combine all independent variables into one vector using VectorAssembler; the columns are in the same order as the training features, so vector positions line up
va = VectorAssembler(inputCols=test_base1.columns,outputCol="features")
vaDF = va.transform(test_base1)
display(vaDF)

In [0]:
# use the cross-validated model on the holdout data
holdout_pred = rmodel.transform(vaDF)

In [0]:
display(holdout_pred)

In [0]:
# get the order id
final_df1 = test_base.select('order_id')\
                     .withColumn("row_index",row_number()\
                                 .over(Window.orderBy(monotonically_increasing_id())))

# get the predicted value
final_df2 = holdout_pred.select('prediction')\
                        .withColumn("row_index",row_number()\
                                    .over(Window.orderBy(monotonically_increasing_id())))

# join both tables above on row_index
final_df = final_df1.join(final_df2, on="row_index").drop("row_index")
display(final_df)
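The `row_index` join above relies on `final_df1` and `final_df2` enumerating the rows in the same order. That holds only as long as every step between `test_base` and `holdout_pred` preserves row order and partitioning, which Spark does not guarantee in general, and a `Window` without `partitionBy` also moves all rows into a single partition. Because `VectorAssembler` and the fitted model's `transform` keep the input columns, a more robust option is to keep `order_id` among the selected columns (excluding it from `inputCols`) and read `order_id` and `prediction` from the same output row. The plain-Python sketch below illustrates that idea; the holdout rows and the scoring rule are hypothetical stand-ins for the real data and model.

```python
def predict_with_key(rows, feature_cols, predict, key="order_id"):
    """Build each feature vector and predict it without dropping the row's key."""
    return [
        {key: row[key], "prediction": predict([row[c] for c in feature_cols])}
        for row in rows
    ]


def toy_model(features):
    """Arbitrary stand-in for the fitted model (not the trained random forest)."""
    return 1.0 if features[0] <= 20 else 0.0


# Hypothetical holdout rows
holdout_rows = [
    {"order_id": "o1", "days_to_delivery": 5, "total_qty": 1},
    {"order_id": "o2", "days_to_delivery": 40, "total_qty": 2},
]
result = predict_with_key(holdout_rows, ["days_to_delivery", "total_qty"], toy_model)
print(result)
# [{'order_id': 'o1', 'prediction': 1.0}, {'order_id': 'o2', 'prediction': 0.0}]
```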

In [0]:
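# days from order to hand-off to logistics, and from logistics hand-off to delivery to the customer
order_viz = orders.withColumn("days_to_logistics", datediff("delivered_to_logistics",col("order_date"))) \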
                  .withColumn("logistics_to_customer", datediff("delivered_to_customer",col("delivered_to_logistics")))
display(order_viz)
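`datediff(end, start)` returns the number of days from `start` to `end` (time of day is ignored), so the two new columns split the order-to-customer time into an order-to-logistics leg and a logistics-to-customer leg that add up to the total. A small pure-Python check of that arithmetic on made-up dates:

```python
from datetime import date


def datediff(end, start):
    """Whole days from start to end, like Spark's datediff(end, start) on dates."""
    return (end - start).days


# Made-up dates for one order
order_date = date(2021, 3, 1)
delivered_to_logistics = date(2021, 3, 3)
delivered_to_customer = date(2021, 3, 10)

days_to_logistics = datediff(delivered_to_logistics, order_date)
logistics_to_customer = datediff(delivered_to_customer, delivered_to_logistics)
print(days_to_logistics, logistics_to_customer)  # 2 7
```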

#### Visualizations

In [0]:
base2 = base

In [0]:
# average days_to_delivery per review score: how the total duration from order date to estimated delivery affects review_score
s = base2.groupBy("review_score").agg(avg("days_to_delivery"))
s.display()

In [0]:
# average delay (delivery_response > 0) per review score: how late deliveries relate to negative review scores
dlv = base2.where(col("delivery_response")>0).groupBy("review_score").agg(avg("delivery_response"))
dlv.display()

In [0]:
# nbr of orders delivered on time vs delayed based on the estimated date of delivery and date order was actually delivered to the customer
base2 = base2.withColumn("delivery_status", when(base2.delivery_response <0,"early")
                                 .when(base2.delivery_response ==0,"ontime")
                                 .when(base2.delivery_response >0, "delayed"))

ord_cnt=base2.groupBy("delivery_status").agg(count("order_id"))
ord_cnt.display()
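The `delivery_status` column buckets orders by the sign of `delivery_response`, which, going by the comment above, compares the actual delivery date with the estimated one (negative = early, positive = late). Since the `when` chain has no `otherwise`, orders without a `delivery_response` (for example, not yet delivered) end up in a separate null group in the counts. A plain-Python sketch of the bucketing and counting, on made-up values:

```python
from collections import Counter


def delivery_status(delivery_response):
    """'early' (< 0), 'ontime' (== 0), 'delayed' (> 0); None stays None."""
    if delivery_response is None:
        return None
    if delivery_response < 0:
        return "early"
    if delivery_response == 0:
        return "ontime"
    return "delayed"


# Made-up delivery_response values in days
status_counts = Counter(delivery_status(d) for d in [-3, 0, 2, 5, None])
print(status_counts)
```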

In [0]:
# number of orders per order month
o = base2.groupBy("order_month").agg(count("order_id"))
o.display()

In [0]:
# average delivery_response and days_to_delivery per order month
d=base2.groupBy("order_month").agg(avg("delivery_response"),avg("days_to_delivery"))
d.display()

In [0]:
# number of orders from new vs returning customers
c=base2.groupBy("customer_type").agg(count("customer_type"))
c.display()

In [0]:
# number of orders spanning more than one product category (1) vs a single category (0)
diff_cat = base2.select("order_id", "multiple_catgories")
diff_cat_dist = diff_cat.groupBy("multiple_catgories").agg(count("order_id"))
display(diff_cat_dist)

In [0]:
# product categories ranked by average review score, highest first
top_cat = prod_order.join(fnl_review, "order_id", "right")
top_cat = top_cat.groupBy("new_categories").agg(count("order_id").alias("count"), round(avg("review_score"),1).alias("avg_review_score"))
top_categories = top_cat.sort(col("avg_review_score").desc())
display(top_categories)

In [0]:
# the same summary, lowest average review score first
low_cat = prod_order.join(fnl_review, "order_id", "right")
low_cat = low_cat.groupBy("new_categories").agg(count("order_id").alias("count"), round(avg("review_score"),1).alias("avg_review_score"))
low_categories = low_cat.sort(col("avg_review_score").asc())
display(low_categories)
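Both cells above compute the same per-category summary and only sort it differently. Two details of that aggregation are worth keeping in mind: the right join keeps every review, so reviews without matching product rows land in a null category; and because a review is joined to each item line of its order, an order with several items contributes its score several times to `count` and `avg_review_score`. The pure-Python sketch below makes both effects visible on hypothetical data.

```python
from collections import defaultdict


def category_review_summary(items, reviews):
    """Per-category row count and average review score (1 decimal).

    Mimics items.join(reviews, "order_id", "right") followed by a groupBy on
    new_categories: every review is kept, and one with no item rows gets category None.
    """
    categories_by_order = defaultdict(list)
    for item in items:
        categories_by_order[item["order_id"]].append(item["new_categories"])

    scores_by_category = defaultdict(list)
    for review in reviews:
        for category in categories_by_order.get(review["order_id"], [None]):
            scores_by_category[category].append(review["review_score"])

    return {
        category: {"count": len(scores), "avg_review_score": round(sum(scores) / len(scores), 1)}
        for category, scores in scores_by_category.items()
    }


# Hypothetical data: order o1 has two item lines, o3 has no item rows
items = [
    {"order_id": "o1", "new_categories": "toys"},
    {"order_id": "o1", "new_categories": "toys"},
    {"order_id": "o2", "new_categories": "books"},
]
reviews = [
    {"order_id": "o1", "review_score": 5},
    {"order_id": "o2", "review_score": 2},
    {"order_id": "o3", "review_score": 4},
]
summary = category_review_summary(items, reviews)
print(summary)
```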

In [0]:
# filter orders that were delivered late (delivery_response > 0) and get the product categories with the highest average delay
product_delivery = prod_order.join(fnl_review, "order_id", "right").join(orders,"order_id","left")
product_delivery = product_delivery.where(col("delivery_response")>0)
product_delivery = product_delivery.groupBy("new_categories").agg(avg("delivery_response").alias("avg_delivery_response"))
product_delivery = product_delivery.sort(col("avg_delivery_response").desc())
display(product_delivery)

In [0]:
# number of payment records, average number of installments and average payment value per payment type
order_viz = orders_payment.groupBy("payment_type").agg(count("payment_type").alias("count"),
                                                   round(mean("payment_installments"),1).alias("avg_installment"),
                                                   round(mean("payment_value"),1).alias("avg_payment"))
display(order_viz.select("*").sort(col("count")))