# Foothub_Coding_Test

In [None]:
!pip install pyspark

Collecting pyspark
[?25l  Downloading https://files.pythonhosted.org/packages/89/db/e18cfd78e408de957821ec5ca56de1250645b05f8523d169803d8df35a64/pyspark-3.1.2.tar.gz (212.4MB)
[K     |████████████████████████████████| 212.4MB 69kB/s 
[?25hCollecting py4j==0.10.9
[?25l  Downloading https://files.pythonhosted.org/packages/9e/b6/6a4fb90cd235dc8e265a6a2067f2a2c99f0d91787f06aca4bcf7c23f3f80/py4j-0.10.9-py2.py3-none-any.whl (198kB)
[K     |████████████████████████████████| 204kB 18.5MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.1.2-py2.py3-none-any.whl size=212880768 sha256=6c4a92ad307ec53ba3a8e4b002e94b4c8d48381ac15464d54894b5b888a8de84
  Stored in directory: /root/.cache/pip/wheels/40/1b/2c/30f43be2627857ab80062bef1527c0128f7b4070b6b2d02139
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9 pyspark-3.1.2


In [None]:
import pyspark

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.types import DateType, DoubleType, IntegerType
from pyspark.sql.functions import *

In [None]:
# Create the Spark session used by every DataFrame below, or reuse one that
# is already running in this kernel.
spark = (
    SparkSession.builder
    .appName('Test')
    .getOrCreate()
)

In [None]:
# Echo the session object as the cell output to confirm it was created.
spark

In [None]:
# Load the customer CSV: the first row is the header and column types are inferred.
customer_data = (
    spark.read
    .options(header='true', inferSchema=True)
    .csv('customer_test.csv')
)

In [None]:
# Print the customer schema to check the data types that inferSchema picked.
customer_data.printSchema()

root
 |-- id: integer (nullable = true)
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- email: string (nullable = true)



In [None]:
# Show the first 3 customer rows (returned as a list of Row objects).
customer_data.head(3)

[Row(id=1, first_name='Sara', last_name='Ramirez', email='samantha67@yahoo.com'),
 Row(id=2, first_name='Joshua', last_name='Jimenez', email='richardtimothy@hotmail.com'),
 Row(id=3, first_name='Nicole', last_name='Navarro', email='nicholsonwilliam@valdez.net')]

In [None]:
# Load the orders CSV: the first row is the header and column types are inferred.
orders_data = (
    spark.read
    .options(header='true', inferSchema=True)
    .csv('orders_test.csv')
)

In [None]:
# Print the orders schema to check the data types that inferSchema picked.
orders_data.printSchema()

root
 |-- id: integer (nullable = true)
 |-- total: double (nullable = true)
 |-- customer_id: integer (nullable = true)
 |-- store_id: integer (nullable = true)
 |-- order_date: string (nullable = true)



In [None]:
# The order_date column was read as a string; convert it to a real date type
# so the date functions used later can operate on it.
order_date_as_date = orders_data["order_date"].cast(DateType())
orders_data = orders_data.withColumn("order_date", order_date_as_date)

In [None]:
# Re-print the schema to confirm order_date is now a date column.
orders_data.printSchema()

root
 |-- id: integer (nullable = true)
 |-- total: double (nullable = true)
 |-- customer_id: integer (nullable = true)
 |-- store_id: integer (nullable = true)
 |-- order_date: date (nullable = true)



In [None]:
# View the first 3 order rows (returned as a list of Row objects).
orders_data.head(3)

[Row(id=1, total=19.36, customer_id=21, store_id=1, order_date=datetime.date(2020, 3, 3)),
 Row(id=2, total=8.85, customer_id=88, store_id=8, order_date=datetime.date(2020, 4, 2)),
 Row(id=3, total=5.53, customer_id=41, store_id=3, order_date=datetime.date(2020, 3, 3))]

In [None]:
# Load the store CSV. The multiline option lets a quoted field span several
# lines; the address values contain embedded newlines.
store_data = (
    spark.read
    .option('header', True)
    .option('multiline', True)
    .csv('store_test.csv')
)

In [None]:
# Print the store schema to check the column types. No schema inference was
# requested for this file, so every column is read as a string.
store_data.printSchema()

root
 |-- id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- address: string (nullable = true)



In [None]:
# Cast the store 'id' from string to integer so it matches the integer
# store_id column on the orders side of the join.
store_id_as_int = store_data["id"].cast(IntegerType())
store_data = store_data.withColumn("id", store_id_as_int)

In [None]:
# Look at the first 3 store rows (returned as a list of Row objects).
store_data.head(3)

[Row(id=1, name='Valdez Inc', address='18321 Joseph Lodge\nChristineland, NH 69026'),
 Row(id=2, name='Stevens-Barr', address='Unit 0902 Box 4445\nDPO AE 19637'),
 Row(id=3, name='Taylor Ltd', address='3433 Hill Forest\nCharlesmouth, KY 16037')]

In [None]:
# Split order_date into its calendar parts, adding one new column each for
# the year, the month number and the day of the month.
order_date_col = orders_data.order_date
orders_data = (
    orders_data
    .withColumn('Year', year(order_date_col))
    .withColumn('Month', month(order_date_col))
    .withColumn('day', dayofmonth(order_date_col))
)


In [None]:
# Check that the Year, Month and day columns were added.
orders_data.head(3)

[Row(id=1, total=19.36, customer_id=21, store_id=1, order_date=datetime.date(2020, 3, 3), Year=2020, Month=3, day=3),
 Row(id=2, total=8.85, customer_id=88, store_id=8, order_date=datetime.date(2020, 4, 2), Year=2020, Month=4, day=2),
 Row(id=3, total=5.53, customer_id=41, store_id=3, order_date=datetime.date(2020, 3, 3), Year=2020, Month=3, day=3)]

In [None]:
# Replace month number with Month name.
# The name is derived straight from order_date with the 'MMMM' (full month
# name) pattern instead of a nested when/otherwise chain. This keeps a missing
# order_date as null rather than falling through to 'December', and makes the
# cell safe to re-run because it no longer reads the Month column it overwrites.
orders_data = orders_data.withColumn(
    'Month',
    date_format(orders_data['order_date'], 'MMMM'),
)

In [None]:
# Check that Month now holds the month name instead of the month number.
orders_data.head(3)

[Row(id=1, total=19.36, customer_id=21, store_id=1, order_date=datetime.date(2020, 3, 3), Year=2020, Month='March', day=3),
 Row(id=2, total=8.85, customer_id=88, store_id=8, order_date=datetime.date(2020, 4, 2), Year=2020, Month='April', day=2),
 Row(id=3, total=5.53, customer_id=41, store_id=3, order_date=datetime.date(2020, 3, 3), Year=2020, Month='March', day=3)]

In [None]:
# Join all three data frames to get the desired output.
# Step 1: attach customer details to each order. A left join keeps every
# order even when no customer row matches.
orders_to_customers = orders_data.customer_id == customer_data.id
final_df = orders_data.join(customer_data, on=orders_to_customers, how='left')

In [None]:
# Step 2: attach store details to each order, again with a left join so that
# orders without a matching store row are kept.
orders_to_stores = final_df.store_id == store_data.id
final_df = final_df.join(store_data, on=orders_to_stores, how='left')

In [None]:
# Let's have a look at the joined data set (show() prints a preview table
# and truncates long cell values).
final_df.show()

+---+-----+-----------+--------+----------+----+--------+---+---+-----------+---------+--------------------+---+--------------------+--------------------+
| id|total|customer_id|store_id|order_date|Year|   Month|day| id| first_name|last_name|               email| id|                name|             address|
+---+-----+-----------+--------+----------+----+--------+---+---+-----------+---------+--------------------+---+--------------------+--------------------+
|  1|19.36|         21|       1|2020-03-03|2020|   March|  3| 21|     Alexis| Ferguson|hayesdavid@yahoo.com|  1|          Valdez Inc|18321 Joseph Lodg...|
|  2| 8.85|         88|       8|2020-04-02|2020|   April|  2| 88|Christopher|     Diaz|christinemcguire@...|  8|Martinez, Marsh a...|1434 Shelby Stree...|
|  3| 5.53|         41|       3|2020-03-03|2020|   March|  3| 41|    Rebecca|   Little|cynthiaoliver@gma...|  3|          Taylor Ltd|3433 Hill Forest
...|
|  4| 12.9|         96|       8|2020-03-15|2020|   March| 15| 96|     

In [None]:
# Drop redundant columns.
# drop() returns a new DataFrame, so the result must be assigned back;
# otherwise final_df keeps all three 'id' columns (order, customer, store),
# and any later reference to 'id' would be ambiguous. Dropping by name removes
# every column called 'id'; the keys customer_id and store_id remain.
final_df = final_df.drop('id')

DataFrame[total: double, customer_id: int, store_id: int, order_date: date, Year: int, Month: string, day: int, first_name: string, last_name: string, email: string, name: string, address: string]

In [None]:
# Give the store's 'name' column the heading required in the output.
final_df = final_df.withColumnRenamed(existing='name', new='Store Name')

In [None]:
# Answer for Question 1
# Revenue and order count per store for each year/month.
# Note: sum and count here are the Spark column functions brought in by the
# star import, not the Python built-ins.
revenue_groups = final_df.groupBy('Year', 'Month', 'Store Name')
Aggregate = revenue_groups.agg(
    sum('total').alias('Total Revenue'),
    count('Month').alias('Number of orders'),
)

In [None]:
# Collect the aggregate to pandas and write it as a single CSV file.
# index=False keeps the meaningless pandas row index out of the file, so the
# CSV contains only the aggregate's own columns.
Aggregate.toPandas().to_csv('Aggregate.csv', index=False)

In [None]:
# Count the orders placed by each customer, identified by name and email.
customer_keys = ['first_name', 'last_name', 'email']
orders_per_user = final_df.groupBy(*customer_keys).count()

In [None]:
# Preview the per-customer order counts.
orders_per_user.show()

+----------+---------+--------------------+-----+
|first_name|last_name|               email|count|
+----------+---------+--------------------+-----+
|   Douglas|   Finley|summersdawn@yahoo...|   12|
|     Wayne|     Long|  willie18@yahoo.com|   13|
|   Timothy| Caldwell|   tina37@barker.com|   12|
|      Ryan|  Terrell|carsondaniel@yaho...|    6|
|    Ronald|     Ross|allenvalerie@sala...|   12|
|    Hannah|   Taylor|  andrea08@gmail.com|   11|
| Katherine|    Jones|claudiabond@hotma...|   13|
|   Beverly|   Barnes|mitchellkyle@yaho...|    5|
|    Regina|  Sanchez|michael60@hamilto...|   10|
|    Stacey|    Beard|paulday@brooks-pa...|   17|
|    Nicole|  Navarro|nicholsonwilliam@...|   11|
|     David|  Collins|terri55@cooper-tu...|    8|
|    Joanna|    Terry|aprilwilliams@yah...|    7|
|    Johnny| Campbell|tracywells@brown.com|    8|
|    Pamela|Nicholson|richard37@hotmail...|   11|
|    Travis|   Phelps|  andrew38@yahoo.com|    7|
|  Nicholas| Hamilton|christopher49@nel...|   11|


In [None]:
# Give the generated 'count' column a descriptive name.
orders_per_user = orders_per_user.withColumnRenamed(
    existing='count',
    new='Orders_Placed_by_user',
)

In [None]:
# Keep only the customers who placed fewer than 10 orders.
has_few_orders = orders_per_user['Orders_Placed_by_user'] < 10
Few_Orders = orders_per_user.where(has_few_orders)

In [None]:
# Preview the customers with fewer than 10 orders.
Few_Orders.show()

+----------+---------+--------------------+---------------------+
|first_name|last_name|               email|Orders_Placed_by_user|
+----------+---------+--------------------+---------------------+
|      Ryan|  Terrell|carsondaniel@yaho...|                    6|
|   Beverly|   Barnes|mitchellkyle@yaho...|                    5|
|     David|  Collins|terri55@cooper-tu...|                    8|
|    Joanna|    Terry|aprilwilliams@yah...|                    7|
|    Johnny| Campbell|tracywells@brown.com|                    8|
|    Travis|   Phelps|  andrew38@yahoo.com|                    7|
|     Brian|    Riley|  lauren30@gmail.com|                    8|
|     Tanya|   Holmes|    hlucas@boyer.com|                    6|
|   Francis|    Smith|jasonsmith@smith.com|                    9|
|Alexandria| Alvarado|sjohnston@young-b...|                    4|
|  Michelle|  Bridges|carlsonanthony@pe...|                    7|
|   Richard| Peterson| nancy53@hotmail.com|                    8|
|    Shelb

In [None]:
import hashlib

In [None]:
# Collect the filtered Spark DataFrame into a pandas DataFrame so the hashed
# email column can be added with plain Python.
# NOTE(review): this rebinds Few_Orders from a Spark DataFrame to a pandas one,
# so re-running this cell on its own will fail (pandas has no toPandas()).
Few_Orders = Few_Orders.toPandas()

In [None]:
# Answer for Question 3
# Add an MD5 hex digest of each email (UTF-8 encoded) as 'email_encrypted'.
# Non-string values are passed through as None instead of raising on
# .encode(): the left join onto customers can leave email null for orders
# that have no matching customer row.
# NOTE(review): MD5 is an unsalted one-way hash, not encryption; confirm it
# meets the anonymisation requirement.
Few_Orders['email_encrypted'] = [
    hashlib.md5(val.encode('UTF-8')).hexdigest() if isinstance(val, str) else None
    for val in Few_Orders['email']
]

In [None]:
# Check the first 5 rows, including the new email_encrypted column.
Few_Orders.head(5)

Unnamed: 0,first_name,last_name,email,Orders_Placed_by_user,email_encrypted
0,Ryan,Terrell,carsondaniel@yahoo.com,6,f7b1672d53d0ae4b07021aea9cf9f143
1,Beverly,Barnes,mitchellkyle@yahoo.com,5,79a0ea032f92e7993ea7431f2a582b5e
2,David,Collins,terri55@cooper-turner.com,8,050ec448b62eb2f80f05d84e477fa1d8
3,Joanna,Terry,aprilwilliams@yahoo.com,7,f9477c84df3fa1addf8fb94a5bd41b6c
4,Johnny,Campbell,tracywells@brown.com,8,0cc70fe75f583592ebd844bf22660859


In [None]:
# Answer for Question 2
# Write the customers with fewer than 10 orders to CSV. index=False keeps the
# pandas row index out of the file, so it contains only the data columns.
Few_Orders.to_csv('Users_with_less_than_Ten_orders.csv', index=False)