## Task 1: Spark SQL (15m)

In [0]:
from pyspark.sql import SparkSession

# Get the active Spark session, or create one with master "local[*]"
# if none exists yet.
spark = (
    SparkSession.builder
    .master("local[*]")
    .getOrCreate()
)

In [0]:
sales_file_location = "/FileStore/tables/Sales_table.csv"
products_file_location = "/FileStore/tables/Products_table.csv"
sellers_file_location = "/FileStore/tables/Sellers_table.csv"
file_type = "csv"

# CSV options
infer_schema = "true"
first_row_is_header = "true"
delimiter = ","


def _read_table(location):
    """Load the file at ``location`` into a DataFrame with the shared reader settings.

    All three tables use the same format, schema inference, header handling and
    separator, so the reader chain lives here once instead of being repeated
    per table.

    Args:
        location: Path of the file to load.

    Returns:
        The DataFrame produced by ``spark.read ... .load(location)``.
    """
    # The applied options are for CSV files. For other file types, these will be ignored.
    return (
        spark.read.format(file_type)
        .option("inferSchema", infer_schema)
        .option("header", first_row_is_header)
        .option("sep", delimiter)
        .load(location)
    )


products_table = _read_table(products_file_location)
sales_table = _read_table(sales_file_location)
sellers_table = _read_table(sellers_file_location)

In [0]:
# Register each DataFrame as a temp view under the short name the SQL queries use.
for view_name, frame in (
    ("product", products_table),
    ("sale", sales_table),
    ("sell", sellers_table),
):
    frame.createOrReplaceTempView(view_name)

In [0]:
# (a) Output the top 3 most popular products sold among all sellers [2m]
# Your table should have 1 column(s): [product_name]

# T1: total num_of_items_sold per product_id, keeping only the 3 largest totals.
# Outer query: join T1 back to product to get the names, highest total first.

q1a_output = spark.sql(
    "With T1 AS "
    "(SELECT product_id, sum(num_of_items_sold) as total_sold "
    "FROM sale "
    "GROUP BY product_id "
    "ORDER BY total_sold DESC "
    "LIMIT 3) "
    "SELECT product_name "
    "FROM product, T1 "
    "WHERE product.product_id = T1.product_id "
    "ORDER BY T1.total_sold DESC"
)

q1a_output.show()

+-------------+
| product_name|
+-------------+
|product_51270|
|product_18759|
|product_59652|
+-------------+



In [0]:
# (b) Find out the total sales of the products sold by sellers 1 to 10 and output the top most sold product [2m]
# Your table should have 1 column(s): [product_name]

# T1: total num_of_items_sold per product_id, restricted to rows whose
#     seller_id is greater than 0 and less than 11; only the single largest total is kept.
# Outer query: join T1 to product to look up that product's name.

q1b_output = spark.sql(
    "With T1 AS "
    "(SELECT product_id , sum(num_of_items_sold) as total_sold "
    "FROM sale "
    "WHERE seller_id > 0 AND seller_id < 11 "
    "GROUP BY product_id "
    "ORDER BY total_sold DESC LIMIT 1) "
    "SELECT product_name "
    "FROM product, T1 "
    "WHERE product.product_id = T1.product_id "
)
q1b_output.show()

+-------------+
| product_name|
+-------------+
|product_36658|
+-------------+



In [0]:
# (c) Compute the combined revenue earned from sellers where seller_id ranges from 1 to 500 inclusive. [3m]
# Your table should have 1 column(s): [total_revenue]

# T1: total num_of_items_sold per product_id for rows with seller_id > 0 and < 501.
# T2: T1 joined to product, giving product_id, total sold and the unit price.
# T3: per-product revenue = total sold * unit price.
# Final query: sum of the per-product revenues.
q1c_output = spark.sql(
    "With T1 AS "
    "(SELECT sale.product_id, sum(num_of_items_sold) as total_sold "
    "FROM sale "
    "WHERE sale.seller_id>0 AND sale.seller_id < 501 "
    "GROUP BY sale.product_id), "
    "T2 AS "
    "(SELECT T1.product_id, T1.total_sold, product.price "
    "FROM T1, product "
    "WHERE T1.product_id = product.product_id), "
    "T3 AS "
    "(SELECT T2.total_sold*T2.price as product_revenue "
    "FROM T2) "
    "SELECT sum(product_revenue) as total_revenue "
    "FROM T3"
)
q1c_output.show()

+-------------+
|total_revenue|
+-------------+
|    160916699|
+-------------+



In [0]:
# (d) Among sellers with rating >= 4 who have achieved a combined number of products sold >= 3000, find out the top 10 most expensive product sold by any of the sellers. (If there are multiple products at the same price, please sort them in ascending order of product_id) [8m]
# Your table should have 1 column(s): [product_name]
# To get the full mark, your query should not run for more than 1 min

# T1: seller ids with rating >= 4 whose summed num_of_items_sold is >= 3000.
# T2: distinct (product_id, product_name, price) of products sold by the sellers in T1.
# Final query: names of the 10 highest-priced products in T2; price ties are
#              ordered by ascending product_id.
q1d_output = spark.sql(
    " "
    "With T1 AS "
    "(SELECT sell.seller_id as id "
    "FROM sell, sale "
    "WHERE sell.seller_id = sale.seller_id AND sell.rating >= 4 "
    "GROUP BY sell.seller_id "
    "HAVING SUM(sale.num_of_items_sold) >= 3000), "
    "T2 AS "
    "(SELECT DISTINCT product.product_id, product.product_name, product.price "
    "FROM T1, sale, product "
    "WHERE T1.id = sale.seller_id AND sale.product_id = product.product_id) "
    "SELECT product_name "
    "FROM T2 "
    "ORDER BY price DESC, product_id "
    "LIMIT 10 "
)
q1d_output.show()

+------------+
|product_name|
+------------+
| product_106|
| product_117|
| product_363|
| product_712|
| product_843|
| product_897|
| product_923|
|product_1466|
|product_1507|
|product_1514|
+------------+



## Task 2: Spark ML (10m)

In [0]:
from pyspark.sql import SparkSession

# getOrCreate() hands back the already-running session when there is one,
# otherwise it starts a new one with master "local[*]".
spark = (
    SparkSession.builder
    .master("local[*]")
    .getOrCreate()
)

In [0]:
bank_train_location = "/FileStore/tables/bank_train.csv"
bank_test_location = "/FileStore/tables/bank_test.csv"
file_type = "csv"

# CSV options
infer_schema = "true"
first_row_is_header = "true"
delimiter = ","

# Read the training and test splits with identical reader settings.
# The applied options are for CSV files. For other file types, these will be ignored.
bank_train = (
    spark.read.format(file_type)
    .option("inferSchema", infer_schema)
    .option("header", first_row_is_header)
    .option("sep", delimiter)
    .load(bank_train_location)
)

bank_test = (
    spark.read.format(file_type)
    .option("inferSchema", infer_schema)
    .option("header", first_row_is_header)
    .option("sep", delimiter)
    .load(bank_test_location)
)

Build an ML model to predict whether a customer will subscribe to the bank's deposit service. Train the model on the training set and evaluate its performance (e.g. accuracy) on the test set.
* You can explore different methods to pre-process the data and select proper features
* You can utilize different machine learning models and tune model hyperparameters
* Present the final testing accuracy.

In [0]:
# data preparation (4m)
#Import Packages
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml.classification import GBTClassifier
from pyspark.ml import Pipeline

# Step 1 - index every string-typed column so it becomes a numeric "<col>_index" column.
col_to_index = [name for name, dtype in bank_train.dtypes if dtype == 'string']
indexed_cols = [f"{name}_index" for name in col_to_index]
stringIndexer = StringIndexer(inputCols=col_to_index, outputCols=indexed_cols)
fitted_indexer = stringIndexer.fit(bank_train)

# The one-hot encoder is fitted on indexed columns, so index the training data first.
indexed_bank = fitted_indexer.transform(bank_train)

# Step 2 - one-hot encode every indexed column except the three listed below,
# which the author treats as binary and passes through as plain indices.
binary_indexes = ['default_index', 'housing_index', 'loan_index']
col_to_one_hot = [col for col in indexed_cols if col not in binary_indexes]
one_hot_output = [f"{col}_hot" for col in col_to_one_hot]
encoder = OneHotEncoder(
    inputCols=col_to_one_hot,
    outputCols=one_hot_output,
)
fitted_encoder = encoder.fit(indexed_bank)

# Step 3 - assemble the feature vector from: the non-string columns (minus 'label'),
# the binary index columns, and the one-hot encoded columns, in that order.
non_string_features = [
    name
    for name, _ in bank_train.dtypes
    if name not in col_to_index and name != 'label'
]
to_assemble = non_string_features + binary_indexes + one_hot_output
assembler = VectorAssembler(inputCols=to_assemble, outputCol="features")




In [0]:
# model building (4m)
# Pipeline order: fitted string indexer -> fitted one-hot encoder ->
# feature assembler -> gradient-boosted tree classifier (maxIter=16).
gbt = GBTClassifier(maxIter=16)
pipeline = Pipeline(
    stages=[
        fitted_indexer,
        fitted_encoder,
        assembler,
        gbt,
    ]
)
# Fit the whole pipeline on the training split.
model = pipeline.fit(bank_train)

In [0]:
# model evaluation (2m)
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Run the fitted pipeline on the test split and keep only the columns the evaluator needs.
pred_test = model.transform(bank_test)
predictionAndLabels = pred_test.select("prediction", "label")

# Accuracy of the "prediction" column against the "label" column.
evaluator = MulticlassClassificationEvaluator(metricName="accuracy")
test_accuracy = evaluator.evaluate(predictionAndLabels)
print(f"Test set accuracy = {test_accuracy}")

Test set accuracy = 0.8428123600537394
