### 1. Library and Configuration

In [47]:
import findspark
findspark.init("/opt/manual/spark")
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql import functions as F

In [48]:
spark = SparkSession \
.builder \
.master("local[2]") \
.appName("Homework4_1") \
.config("spark.executer.memory","2g") \
.config("spark.driver.memory","1g") \
.enableHiveSupport() \
.getOrCreate()

In [49]:
path_of_data = "file:///home/train/datasets/retail_db"

### 2. Read CSV

### 2.1. Categories.csv

In [54]:
categories = spark.read.option("inferSchema",True).csv(path_of_data+"/categories.csv", header=True)
categories.show(10)

+----------+--------------------+-------------------+
|categoryId|categoryDepartmentId|       categoryName|
+----------+--------------------+-------------------+
|         1|                   2|           Football|
|         2|                   2|             Soccer|
|         3|                   2|Baseball & Softball|
|         4|                   2|         Basketball|
|         5|                   2|           Lacrosse|
|         6|                   2|   Tennis & Racquet|
|         7|                   2|             Hockey|
|         8|                   2|        More Sports|
|         9|                   3|   Cardio Equipment|
|        10|                   3|  Strength Training|
+----------+--------------------+-------------------+
only showing top 10 rows



In [55]:
categories.printSchema()

root
 |-- categoryId: integer (nullable = true)
 |-- categoryDepartmentId: integer (nullable = true)
 |-- categoryName: string (nullable = true)



### 2.2. Customers.csv

In [56]:
customers= spark.read.option("inferschema",True).csv(path_of_data+"/customers.csv", header=True)
customers.limit(3).toPandas()

Unnamed: 0,customerId,customerFName,customerLName,customerEmail,customerPassword,customerStreet,customerCity,customerState,customerZipcode
0,1,Richard,Hernandez,XXXXXXXXX,XXXXXXXXX,6303 Heather Plaza,Brownsville,TX,78521
1,2,Mary,Barrett,XXXXXXXXX,XXXXXXXXX,9526 Noble Embers Ridge,Littleton,CO,80126
2,3,Ann,Smith,XXXXXXXXX,XXXXXXXXX,3422 Blue Pioneer Bend,Caguas,PR,725


In [57]:
customers.printSchema()

root
 |-- customerId: integer (nullable = true)
 |-- customerFName: string (nullable = true)
 |-- customerLName: string (nullable = true)
 |-- customerEmail: string (nullable = true)
 |-- customerPassword: string (nullable = true)
 |-- customerStreet: string (nullable = true)
 |-- customerCity: string (nullable = true)
 |-- customerState: string (nullable = true)
 |-- customerZipcode: integer (nullable = true)



In [58]:
### Departments.csv

In [59]:
departments = spark.read.option("inferSchema",True).csv(path_of_data+"/departments.csv", header=True)
departments.show(3)

+------------+--------------+
|departmentId|departmentName|
+------------+--------------+
|           2|       Fitness|
|           3|      Footwear|
|           4|       Apparel|
+------------+--------------+
only showing top 3 rows



In [60]:
departments.printSchema()

root
 |-- departmentId: integer (nullable = true)
 |-- departmentName: string (nullable = true)



### 2.3. Orders.csv

In [61]:
orders = spark.read.option("inferSchema",True).csv(path_of_data+"/orders.csv", header=True)
orders.show(3)

+-------+--------------------+---------------+---------------+
|orderId|           orderDate|orderCustomerId|    orderStatus|
+-------+--------------------+---------------+---------------+
|      1|2013-07-25 00:00:...|          11599|         CLOSED|
|      2|2013-07-25 00:00:...|            256|PENDING_PAYMENT|
|      3|2013-07-25 00:00:...|          12111|       COMPLETE|
+-------+--------------------+---------------+---------------+
only showing top 3 rows



In [62]:
orders.printSchema()

root
 |-- orderId: integer (nullable = true)
 |-- orderDate: string (nullable = true)
 |-- orderCustomerId: integer (nullable = true)
 |-- orderStatus: string (nullable = true)



### 2.4. Order_items.csv

In [63]:
order_items = spark.read.option("inferSchema",True).csv(path_of_data+"/order_items.csv", header=True)
order_items.show(3)

                                                                                

+-------------+----------------+------------------+-----------------+-----------------+---------------------+
|orderItemName|orderItemOrderId|orderItemProductId|orderItemQuantity|orderItemSubTotal|orderItemProductPrice|
+-------------+----------------+------------------+-----------------+-----------------+---------------------+
|            1|               1|               957|                1|           299.98|               299.98|
|            2|               2|              1073|                1|           199.99|               199.99|
|            3|               2|               502|                5|            250.0|                 50.0|
+-------------+----------------+------------------+-----------------+-----------------+---------------------+
only showing top 3 rows



In [64]:
order_items.printSchema()

root
 |-- orderItemName: integer (nullable = true)
 |-- orderItemOrderId: integer (nullable = true)
 |-- orderItemProductId: integer (nullable = true)
 |-- orderItemQuantity: integer (nullable = true)
 |-- orderItemSubTotal: double (nullable = true)
 |-- orderItemProductPrice: double (nullable = true)



In [65]:
### Products.csv

In [66]:
products = spark.read.option("inferSchema",True).csv(path_of_data+"/products.csv", header=True)
products.show(3)

+---------+-----------------+--------------------+------------------+------------+--------------------+
|productId|productCategoryId|         productName|productDescription|productPrice|        productImage|
+---------+-----------------+--------------------+------------------+------------+--------------------+
|        1|                2|Quest Q64 10 FT. ...|              null|       59.98|http://images.acm...|
|        2|                2|Under Armour Men'...|              null|      129.99|http://images.acm...|
|        3|                2|Under Armour Men'...|              null|       89.99|http://images.acm...|
+---------+-----------------+--------------------+------------------+------------+--------------------+
only showing top 3 rows



In [67]:
products.printSchema()

root
 |-- productId: integer (nullable = true)
 |-- productCategoryId: integer (nullable = true)
 |-- productName: string (nullable = true)
 |-- productDescription: string (nullable = true)
 |-- productPrice: double (nullable = true)
 |-- productImage: string (nullable = true)



# Q1.

Find how many unique orderItemOrderIds are in the order_items table?

In [14]:
order_items.select("orderItemOrderId").distinct().count()

                                                                                

57431

# Q2.

Find how many rows are in the orders and order_items tables?

In [15]:
orders.count()

                                                                                

68883

In [16]:
order_items.count()

                                                                                

172198

# Q3.

Write the most canceled (descending order) products to the local disk in parquet format in terms of total sales amount.

- Step 1. We need the "products" data to see which products. ------------------------------->"products"
* We need to use "categories" data to see which categories-------------------------->"categories"

- Step 2. We need to know "order" information. So I will use "order_items" data.-------------->"order_items"

- Step 3. We will join steps1 and steps2 and achieve "Cancelled Products".

In [68]:
categ_products = products.join(categories, products['productCategoryId'] == categories['categoryId']) \
                       .select("productId","productName", "categoryName")

categ_products.limit(5).toPandas().head()

Unnamed: 0,productId,productName,categoryName
0,1,Quest Q64 10 FT. x 10 FT. Slant Leg Instant U,Soccer
1,2,Under Armour Men's Highlight MC Football Clea,Soccer
2,3,Under Armour Men's Renegade D Mid Football Cl,Soccer
3,4,Under Armour Men's Renegade D Mid Football Cl,Soccer
4,5,Riddell Youth Revolution Speed Custom Footbal,Soccer


In [69]:
orders_and_items = order_items.join(orders, order_items['orderItemOrderId'] == orders['orderId']) \
                              .select("orderId","orderItemProductId","orderItemSubTotal","orderStatus")

orders_and_items.limit(5).toPandas().head()

Unnamed: 0,orderId,orderItemProductId,orderItemSubTotal,orderStatus
0,1,957,299.98,CLOSED
1,2,1073,199.99,PENDING_PAYMENT
2,2,502,250.0,PENDING_PAYMENT
3,2,403,129.99,PENDING_PAYMENT
4,4,897,49.98,CLOSED


In [71]:
table_of_cancelled = orders_and_items.join(categ_products, orders_and_items['orderItemProductId'] == categ_products['productId'])

table_of_cancelled.limit(25).toPandas().head(25)

                                                                                

Unnamed: 0,orderId,orderItemProductId,orderItemSubTotal,orderStatus,productId,productName,categoryName
0,57760,858,199.99,PENDING_PAYMENT,858,GolfBuddy VT3 GPS Watch,Kids' Golf Clubs
1,57847,858,199.99,COMPLETE,858,GolfBuddy VT3 GPS Watch,Kids' Golf Clubs
2,58071,858,199.99,PENDING,858,GolfBuddy VT3 GPS Watch,Kids' Golf Clubs
3,58170,858,199.99,PENDING,858,GolfBuddy VT3 GPS Watch,Kids' Golf Clubs
4,58585,858,199.99,CANCELED,858,GolfBuddy VT3 GPS Watch,Kids' Golf Clubs
5,58589,858,199.99,COMPLETE,858,GolfBuddy VT3 GPS Watch,Kids' Golf Clubs
6,58695,858,199.99,COMPLETE,858,GolfBuddy VT3 GPS Watch,Kids' Golf Clubs
7,58774,858,199.99,PENDING,858,GolfBuddy VT3 GPS Watch,Kids' Golf Clubs
8,58797,858,199.99,COMPLETE,858,GolfBuddy VT3 GPS Watch,Kids' Golf Clubs
9,58926,858,199.99,PENDING_PAYMENT,858,GolfBuddy VT3 GPS Watch,Kids' Golf Clubs


In [98]:
most_cancelled_products = table_of_cancelled.filter("orderStatus == 'CANCELED'") \
.groupBy("productName").agg(F.sum("orderItemSubTotal").alias("Totalprice")) \
.orderBy(F.desc("TotalPrice"))

most_cancelled_products.limit(10).toPandas().head(10)

                                                                                

Unnamed: 0,productName,Totalprice
0,Field & Stream Sportsman 16 Gun Fire Safe,134393.28
1,Perfect Fitness Perfect Rip Deck,85785.7
2,Nike Men's Free 5.0+ Running Shoe,80691.93
3,Diamondback Women's Serene Classic Comfort Bi,80094.66
4,Pelican Sunstream 100 Kayak,66196.69
5,Nike Men's Dri-FIT Victory Golf Polo,65750.0
6,Nike Men's CJ Elite 2 TD Football Cleat,60705.33
7,O'Brien Men's Neoprene Life Vest,58126.74
8,Under Armour Girls' Toddler Spine Surge Runni,26153.46
9,LIJA Women's Eyelet Sleeveless Golf Polo,2145.0


In [99]:
output_path = "file:///home/train/pyspark_output_data"

In [100]:

most_cancelled_products.coalesce(1) \
.write.mode("overwrite").parquet(output_path+"/most_cancelled_products_parquet")

                                                                                

# Q4
Write the most canceled (descending order) categories in terms of total sales amount to the local disk in `parquet` format.

In [102]:
most_cancelled_categ = table_of_cancelled.filter("orderStatus == 'CANCELED'") \
.groupBy("categoryName").agg(F.sum("orderItemSubTotal").alias("Totalprice")) \
.orderBy(F.desc("TotalPrice"))
most_cancelled_categ.limit(10).toPandas().head(10)

                                                                                

Unnamed: 0,categoryName,Totalprice
0,Fishing,134393.28
1,Cleats,85785.7
2,Cardio Equipment,81351.93
3,Camping & Hiking,80094.66
4,Water Sports,66196.69
5,Women's Apparel,65750.0
6,Men's Footwear,60705.33
7,Indoor/Outdoor Games,58126.74
8,Shop By Sport,27423.44
9,Electronics,5685.5


In [103]:
most_cancelled_categ.coalesce(1) \
.write.mode("overwrite").parquet(output_path+"/most_cancelled_categ_parquet")

                                                                                

# Q5. 

In which month of which year (in Turkish) was the highest total sales?

In [24]:
# Step 1. I will do join date of order_items and orders.

# Step 2. I will replace date format.

In [73]:
orders_and_items_date = order_items.join(orders, order_items['orderItemOrderId'] == orders['orderId']) \
                                   .select("orderId","orderItemProductId","orderItemSubTotal","orderStatus","orderDate")

orders_and_items_date.limit(5).toPandas().head()

Unnamed: 0,orderId,orderItemProductId,orderItemSubTotal,orderStatus,orderDate
0,1,957,299.98,CLOSED,2013-07-25 00:00:00.0
1,2,1073,199.99,PENDING_PAYMENT,2013-07-25 00:00:00.0
2,2,502,250.0,PENDING_PAYMENT,2013-07-25 00:00:00.0
3,2,403,129.99,PENDING_PAYMENT,2013-07-25 00:00:00.0
4,4,897,49.98,CLOSED,2013-07-25 00:00:00.0


In [74]:
df_year_and_month = orders_and_items_date.withColumn("orderDate", 
                                        F.to_timestamp(F.col("orderDate"), "yyyy-MM-dd HH:mm:ss.S")) \
                                         .withColumn("Year", F.year(F.col("orderDate"))) \
                                         .withColumn("Month", F.month(F.col("orderDate")))

df_year_and_month.limit(5).toPandas()

Unnamed: 0,orderId,orderItemProductId,orderItemSubTotal,orderStatus,orderDate,Year,Month
0,1,957,299.98,CLOSED,2013-07-25,2013,7
1,2,1073,199.99,PENDING_PAYMENT,2013-07-25,2013,7
2,2,502,250.0,PENDING_PAYMENT,2013-07-25,2013,7
3,2,403,129.99,PENDING_PAYMENT,2013-07-25,2013,7
4,4,897,49.98,CLOSED,2013-07-25,2013,7


In [75]:
df_year_and_month.filter("orderStatus not in ('CANCELED')") \
                 .groupBy('Year','Month').agg(F.sum("orderItemSubTotal").alias("Sum_SubTotal")) \
                 .orderBy(F.desc("Sum_SubTotal")) \
                 .limit(10).toPandas()

                                                                                

Unnamed: 0,Year,Month,Sum_SubTotal
0,2013,11,3105843.27
1,2014,1,2870834.18
2,2013,12,2869997.88
3,2013,9,2866553.33
4,2014,3,2805006.32
5,2013,8,2769236.03
6,2014,4,2758912.47
7,2014,2,2712838.58
8,2014,5,2695699.48
9,2014,6,2657013.04


In [76]:
def month_to_turkish(input_month):
    month_converter = {
        1: "Ocak",
        2: "Şubat",
        3: "Mart",
        4: "Nisan",
        5: "Mayıs",
        6: "Haziran",
        7: "Temmuz",
        8: "Ağustos",
        9: "Eylül",
        10: "Ekim",
        11: "Kasım",
        12: "Aralık"
    }
    
    return month_converter.get(input_month)

In [77]:
month_to_turkish(7)

'Temmuz'

In [78]:
month_to_turkish_udf = spark.udf.register("month_to_turkish_udf", month_to_turkish, StringType())

In [79]:
df_year_and_month.filter("orderStatus not in ('CANCELED')") \
                 .groupBy('Year','Month').agg(F.sum("orderItemSubTotal").alias("Sum_SubTotal")) \
                 .orderBy(F.desc("Sum_SubTotal")) \
                 .withColumn("Month_TR", month_to_turkish_udf(F.col("Month"))) \
                 .limit(10).toPandas()

                                                                                

Unnamed: 0,Year,Month,Sum_SubTotal,Month_TR
0,2013,11,3105843.27,Kasım
1,2014,1,2870834.18,Ocak
2,2013,12,2869997.88,Aralık
3,2013,9,2866553.33,Eylül
4,2014,3,2805006.32,Mart
5,2013,8,2769236.03,Ağustos
6,2014,4,2758912.47,Nisan
7,2014,2,2712838.58,Şubat
8,2014,5,2695699.48,Mayıs
9,2014,6,2657013.04,Haziran


# Q6 

 On which day of the week (in Turkish) was the highest total sales?

In [87]:
df_day_of_week = orders_and_items_date.withColumn("orderDate", 
                                     F.to_timestamp(F.col("orderDate"), "yyyy-MM-dd HH:mm:ss.S")) \
                                      .withColumn("Day", F.dayofweek(F.col("orderDate")))

df_day_of_week.limit(5).toPandas()

Unnamed: 0,orderId,orderItemProductId,orderItemSubTotal,orderStatus,orderDate,Day
0,1,957,299.98,CLOSED,2013-07-25,5
1,2,1073,199.99,PENDING_PAYMENT,2013-07-25,5
2,2,502,250.0,PENDING_PAYMENT,2013-07-25,5
3,2,403,129.99,PENDING_PAYMENT,2013-07-25,5
4,4,897,49.98,CLOSED,2013-07-25,5


In [88]:
df_day_of_week.filter("orderStatus not in ('CANCELED')") \
              .groupBy('Day').agg(F.sum("orderItemSubTotal").alias("Sum_SubTotal")) \
              .orderBy(F.desc("Sum_SubTotal")) \
              .limit(10).toPandas()

                                                                                

Unnamed: 0,Day,Sum_SubTotal
0,6,5065099.0
1,5,4878165.0
2,7,4862228.0
3,3,4809500.0
4,4,4805157.0
5,1,4750555.0
6,2,4455886.0


In [89]:
# convert day to TR with udf
def day_to_turkish(input_day):
    day_converter = {
        1: "Pazartesi",
        2: "Salı",
        3: "Çarşamba",
        4: "Perşembe",
        5: "Cuma",
        6: "Cumartesi",
        7: "Pazar"
    }
    
    return day_converter.get(input_day)

In [90]:
day_to_turkish(7)

'Pazar'

In [83]:
day_to_turkish_udf = spark.udf.register("day_to_turkish_udf", day_to_turkish, StringType())

In [84]:
df_day_of_week.filter("orderStatus not in ('CANCELED')") \
              .groupBy('Day').agg(F.sum("orderItemSubTotal").alias("Sum_SubTotal")) \
              .orderBy(F.desc("Sum_SubTotal")) \
              .withColumn("Day_TR",day_to_turkish_udf(F.col("Day"))) \
              .limit(10).toPandas()

                                                                                

Unnamed: 0,Day,Sum_SubTotal,Day_TR
0,6,5065099.0,Cumartesi
1,5,4878165.0,Cuma
2,7,4862228.0,Pazar
3,3,4809500.0,Çarşamba
4,4,4805157.0,Perşembe
5,1,4750555.0,Pazartesi
6,2,4455886.0,Salı


# Q7.
Create the largest possible table from all these tables and write it in a table called retail_all in the hive test1 database.

In [85]:
# Step1 We will join categories and departments table

# Step2 We will join order and orders_items table 

# Step3 We will join result of step1 and step2

In [86]:
! ls /home/train/datasets/retail_db/

categories.csv	departments.csv  orders.csv
customers.csv	order_items.csv  products.csv


In [91]:
cat_dep_prod = products.join(categories, products.productCategoryId == categories.categoryId, "left") \
                       .join(departments, categories.categoryDepartmentId == departments.departmentId, "left")


cat_dep_prod.limit(1).toPandas()

Unnamed: 0,productId,productCategoryId,productName,productDescription,productPrice,productImage,categoryId,categoryDepartmentId,categoryName,departmentId,departmentName
0,1,2,Quest Q64 10 FT. x 10 FT. Slant Leg Instant U,,59.98,http://images.acmesports.sports/Quest+Q64+10+F...,2,2,Soccer,2,Fitness


In [92]:
cat_dep_prod.count()

1345

In [93]:
orders_and_items_cust = order_items.join(orders, order_items['orderItemOrderId'] == orders['orderId']) \
                                   .join(customers, orders.orderCustomerId == customers.customerId)
 
orders_and_items_cust.limit(1).toPandas()

                                                                                

Unnamed: 0,orderItemName,orderItemOrderId,orderItemProductId,orderItemQuantity,orderItemSubTotal,orderItemProductPrice,orderId,orderDate,orderCustomerId,orderStatus,customerId,customerFName,customerLName,customerEmail,customerPassword,customerStreet,customerCity,customerState,customerZipcode
0,1,1,957,1,299.98,299.98,1,2013-07-25 00:00:00.0,11599,CLOSED,11599,Mary,Malone,XXXXXXXXX,XXXXXXXXX,8708 Indian Horse Highway,Hickory,NC,28601


In [94]:
orders_and_items_cust.count()

172198

In [95]:
all_in_one = orders_and_items_cust.join(cat_dep_prod, orders_and_items_cust.orderItemProductId == cat_dep_prod.productId)

all_in_one.limit(2).toPandas()

                                                                                

Unnamed: 0,orderItemName,orderItemOrderId,orderItemProductId,orderItemQuantity,orderItemSubTotal,orderItemProductPrice,orderId,orderDate,orderCustomerId,orderStatus,...,productCategoryId,productName,productDescription,productPrice,productImage,categoryId,categoryDepartmentId,categoryName,departmentId,departmentName
0,144513,57760,858,1,199.99,199.99,57760,2013-07-25 00:00:00.0,8330,PENDING_PAYMENT,...,38,GolfBuddy VT3 GPS Watch,,199.99,http://images.acmesports.sports/GolfBuddy+VT3+...,38,6,Kids' Golf Clubs,6,Outdoors
1,144738,57847,858,1,199.99,199.99,57847,2013-07-28 00:00:00.0,9548,COMPLETE,...,38,GolfBuddy VT3 GPS Watch,,199.99,http://images.acmesports.sports/GolfBuddy+VT3+...,38,6,Kids' Golf Clubs,6,Outdoors


In [96]:
all_in_one.count()

                                                                                

172198

In [97]:
all_in_one.write.format("orc") \
.mode("overwrite") \
.saveAsTable("test1.retail_all")

2023-04-18 03:51:32,748 WARN hdfs.DataStreamer: Caught exception (67 + 2) / 200]
java.lang.InterruptedException
	at java.base/java.lang.Object.wait(Native Method)
	at java.base/java.lang.Thread.join(Thread.java:1300)
	at java.base/java.lang.Thread.join(Thread.java:1375)
	at org.apache.hadoop.hdfs.DataStreamer.closeResponder(DataStreamer.java:986)
	at org.apache.hadoop.hdfs.DataStreamer.endBlock(DataStreamer.java:640)
	at org.apache.hadoop.hdfs.DataStreamer.run(DataStreamer.java:810)
                                                                                