In [43]:
#install pyspark
!pip install pyspark


Defaulting to user installation because normal site-packages is not writeable

[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m23.0.1[0m[39;49m -> [0m[32;49m23.1.2[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49m/Library/Developer/CommandLineTools/usr/bin/python3 -m pip install --upgrade pip[0m


In [44]:
#install pyspark and SQL
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType


In [45]:
spark=SparkSession.builder.master("local").appName("hdfs_test").getOrCreate()

In [46]:
salesdata=spark.read.csv("hdfs://127.0.0.1:9000/sales/Sales_April_2019.csv",inferSchema=True, header=True)


In [47]:
salesdata.show(5)

+--------+--------------------+----------------+----------+--------------+--------------------+
|Order ID|             Product|Quantity Ordered|Price Each|    Order Date|    Purchase Address|
+--------+--------------------+----------------+----------+--------------+--------------------+
|  176558|USB-C Charging Cable|               2|     11.95|04/19/19 08:46|917 1st St, Dalla...|
|    null|                null|            null|      null|          null|                null|
|  176559|Bose SoundSport H...|               1|     99.99|04/07/19 22:30|682 Chestnut St, ...|
|  176560|        Google Phone|               1|     600.0|04/12/19 14:38|669 Spruce St, Lo...|
|  176560|    Wired Headphones|               1|     11.99|04/12/19 14:38|669 Spruce St, Lo...|
+--------+--------------------+----------------+----------+--------------+--------------------+
only showing top 5 rows



In [48]:
#view schema
salesdata.printSchema()

root
 |-- Order ID: integer (nullable = true)
 |-- Product: string (nullable = true)
 |-- Quantity Ordered: integer (nullable = true)
 |-- Price Each: double (nullable = true)
 |-- Order Date: string (nullable = true)
 |-- Purchase Address: string (nullable = true)



In [49]:
#View statistical details of data like count, mean
salesdata.describe().show()

[Stage 29:>                                                         (0 + 1) / 1]

+-------+------------------+------------+-------------------+------------------+--------------+--------------------+
|summary|          Order ID|     Product|   Quantity Ordered|        Price Each|    Order Date|    Purchase Address|
+-------+------------------+------------+-------------------+------------------+--------------+--------------------+
|  count|             18289|       18324|              18289|             18289|         18324|               18324|
|   mean|185328.81672043304|        null| 1.1246104215648751|184.43102630000277|          null|                null|
| stddev| 5061.520829296985|        null|0.43640973695741925| 330.9133771769665|          null|                null|
|    min|            176558|20in Monitor|                  1|              2.99|04/01/19 03:09|1 14th St, New Yo...|
|    max|            194094|      iPhone|                  7|            1700.0|    Order Date|    Purchase Address|
+-------+------------------+------------+-------------------+---

                                                                                

In [50]:
#check the columns
salesdata.columns

['Order ID',
 'Product',
 'Quantity Ordered',
 'Price Each',
 'Order Date',
 'Purchase Address']

In [51]:
salesdata.show()

+--------+--------------------+----------------+----------+--------------+--------------------+
|Order ID|             Product|Quantity Ordered|Price Each|    Order Date|    Purchase Address|
+--------+--------------------+----------------+----------+--------------+--------------------+
|  176558|USB-C Charging Cable|               2|     11.95|04/19/19 08:46|917 1st St, Dalla...|
|    null|                null|            null|      null|          null|                null|
|  176559|Bose SoundSport H...|               1|     99.99|04/07/19 22:30|682 Chestnut St, ...|
|  176560|        Google Phone|               1|     600.0|04/12/19 14:38|669 Spruce St, Lo...|
|  176560|    Wired Headphones|               1|     11.99|04/12/19 14:38|669 Spruce St, Lo...|
|  176561|    Wired Headphones|               1|     11.99|04/30/19 09:27|333 8th St, Los A...|
|  176562|USB-C Charging Cable|               1|     11.95|04/29/19 13:03|381 Wilson St, Sa...|
|  176563|Bose SoundSport H...|         

In [52]:
from pyspark.sql.functions import col,isnan, when, count
salesdata.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in salesdata.columns]
   ).show()

+--------+-------+----------------+----------+----------+----------------+
|Order ID|Product|Quantity Ordered|Price Each|Order Date|Purchase Address|
+--------+-------+----------------+----------+----------+----------------+
|      94|     59|              94|        94|        59|              59|
+--------+-------+----------------+----------+----------+----------------+



# Question 1: What was the most productive month in terms of sales?


In [61]:
#Rename the column

from pyspark.sql.functions import month, sum

columns = salesdata.columns
new_columns = [col.replace(" ", "_").replace("#", "") for col in columns]
df = salesdata.toDF(*new_columns)
df.columns
df.show(5)


+--------+--------------------+----------------+----------+--------------+--------------------+
|Order_ID|             Product|Quantity_Ordered|Price_Each|    Order_Date|    Purchase_Address|
+--------+--------------------+----------------+----------+--------------+--------------------+
|  176558|USB-C Charging Cable|               2|     11.95|04/19/19 08:46|917 1st St, Dalla...|
|  176559|Bose SoundSport H...|               1|     99.99|04/07/19 22:30|682 Chestnut St, ...|
|  176560|        Google Phone|               1|     600.0|04/12/19 14:38|669 Spruce St, Lo...|
|  176560|    Wired Headphones|               1|     11.99|04/12/19 14:38|669 Spruce St, Lo...|
|  176561|    Wired Headphones|               1|     11.99|04/30/19 09:27|333 8th St, Los A...|
+--------+--------------------+----------------+----------+--------------+--------------------+
only showing top 5 rows



In [63]:
#drop all null values
salesdata = salesdata.dropna()
salesdata.show(5)

+--------+--------------------+----------------+----------+--------------+--------------------+
|Order ID|             Product|Quantity Ordered|Price Each|    Order Date|    Purchase Address|
+--------+--------------------+----------------+----------+--------------+--------------------+
|  176558|USB-C Charging Cable|               2|     11.95|04/19/19 08:46|917 1st St, Dalla...|
|  176559|Bose SoundSport H...|               1|     99.99|04/07/19 22:30|682 Chestnut St, ...|
|  176560|        Google Phone|               1|     600.0|04/12/19 14:38|669 Spruce St, Lo...|
|  176560|    Wired Headphones|               1|     11.99|04/12/19 14:38|669 Spruce St, Lo...|
|  176561|    Wired Headphones|               1|     11.99|04/30/19 09:27|333 8th St, Los A...|
+--------+--------------------+----------------+----------+--------------+--------------------+
only showing top 5 rows



In [64]:
#Update the data type of columns

from pyspark.sql.functions import month, sum, col, to_timestamp
df = df.withColumn("Quantity_Ordered", df["Quantity_Ordered"].cast("int"))
df = df.withColumn("Price_Each", df["Price_Each"].cast("double"))
df = df.withColumn("Order_Date", to_timestamp(df["Order_Date"], "MM/dd/yy HH:mm"))

df.show(5)

+--------+--------------------+----------------+----------+-------------------+--------------------+
|Order_ID|             Product|Quantity_Ordered|Price_Each|         Order_Date|    Purchase_Address|
+--------+--------------------+----------------+----------+-------------------+--------------------+
|  176558|USB-C Charging Cable|               2|     11.95|2019-04-19 08:46:00|917 1st St, Dalla...|
|  176559|Bose SoundSport H...|               1|     99.99|2019-04-07 22:30:00|682 Chestnut St, ...|
|  176560|        Google Phone|               1|     600.0|2019-04-12 14:38:00|669 Spruce St, Lo...|
|  176560|    Wired Headphones|               1|     11.99|2019-04-12 14:38:00|669 Spruce St, Lo...|
|  176561|    Wired Headphones|               1|     11.99|2019-04-30 09:27:00|333 8th St, Los A...|
+--------+--------------------+----------------+----------+-------------------+--------------------+
only showing top 5 rows



In [66]:
# Add a new column for the total sales
df = df.withColumn("Total_Sales", df["Quantity_Ordered"] * df["Price_Each"])

# Extract the month from the Order_Date
df = df.withColumn("Month", month(df["Order_Date"]))
df.show(5)



+--------+--------------------+----------------+----------+-------------------+--------------------+-----------+-----+
|Order_ID|             Product|Quantity_Ordered|Price_Each|         Order_Date|    Purchase_Address|Total_Sales|Month|
+--------+--------------------+----------------+----------+-------------------+--------------------+-----------+-----+
|  176558|USB-C Charging Cable|               2|     11.95|2019-04-19 08:46:00|917 1st St, Dalla...|       23.9|    4|
|  176559|Bose SoundSport H...|               1|     99.99|2019-04-07 22:30:00|682 Chestnut St, ...|      99.99|    4|
|  176560|        Google Phone|               1|     600.0|2019-04-12 14:38:00|669 Spruce St, Lo...|      600.0|    4|
|  176560|    Wired Headphones|               1|     11.99|2019-04-12 14:38:00|669 Spruce St, Lo...|      11.99|    4|
|  176561|    Wired Headphones|               1|     11.99|2019-04-30 09:27:00|333 8th St, Los A...|      11.99|    4|
+--------+--------------------+----------------+

In [69]:
# Group by Month and calculate the sum of Total_Sales
monthly_sales = df.groupBy("Month").agg(sum("Total_Sales").alias("Total_Sales"))

# Find the month with the highest sales
most_productive_month = monthly_sales.orderBy("Total_Sales", ascending=False).first()


# Extract the month number and total sales value

month_number = most_productive_month["Month"]
total_sales = most_productive_month["Total_Sales"]

# Print the result
print("The most productive month in terms of sales is Month:", month_number)
print("Total Sales: $", total_sales)


The most productive month in terms of sales is Month: 4
Total Sales: $ 3385499.8200007016


# What City had the highest number of sales?


In [73]:
# Extract the city from the Purchase Address column
from pyspark.sql.functions import split, col, sum
df = df.withColumn("City", split(df["Purchase_Address"], ",")[1])
df.show(5)

+--------+--------------------+----------------+----------+-------------------+--------------------+-----------+-----+------------+
|Order_ID|             Product|Quantity_Ordered|Price_Each|         Order_Date|    Purchase_Address|Total_Sales|Month|        City|
+--------+--------------------+----------------+----------+-------------------+--------------------+-----------+-----+------------+
|  176558|USB-C Charging Cable|               2|     11.95|2019-04-19 08:46:00|917 1st St, Dalla...|       23.9|    4|      Dallas|
|  176559|Bose SoundSport H...|               1|     99.99|2019-04-07 22:30:00|682 Chestnut St, ...|      99.99|    4|      Boston|
|  176560|        Google Phone|               1|     600.0|2019-04-12 14:38:00|669 Spruce St, Lo...|      600.0|    4| Los Angeles|
|  176560|    Wired Headphones|               1|     11.99|2019-04-12 14:38:00|669 Spruce St, Lo...|      11.99|    4| Los Angeles|
|  176561|    Wired Headphones|               1|     11.99|2019-04-30 09:27:

In [74]:
# Group by City and calculate the sum of Total_Sales
city_sales = df.groupBy("City").agg(sum("Total_Sales").alias("Total_Sales"))

# Find the city with the highest sales
most_productive_city = city_sales.orderBy("Total_Sales", ascending=False).first()

if most_productive_city:
    city_name = most_productive_city["City"]
    total_sales = most_productive_city["Total_Sales"]

    # Print the result
    print("The city with the highest number of sales is:", city_name)
    print("Total Sales: $", total_sales)
else:
    print("No data available for analysis.")

The city with the highest number of sales is:  San Francisco
Total Sales: $ 817074.7699999722


# What time should we display adverstisement to maximize likelihood of customer's buying product?

In [76]:
from pyspark.sql.functions import hour, count

# Extract the hour from the "Order_Date" column
df = df.withColumn("Order_Hour", hour(df["Order_Date"]))

# Group by hour and calculate the count of orders
hourly_orders = df.groupBy("Order_Hour").agg(count("*").alias("Order_Count"))

# Find the hour with the highest number of orders
most_active_hour = hourly_orders.orderBy("Order_Count", ascending=False).first()

if most_active_hour:
    hour_of_day = most_active_hour["Order_Hour"]
    order_count = most_active_hour["Order_Count"]

    # Print the result
    print("The optimal time to display advertisements is:", hour_of_day, "o'clock")
    print("Order Count: ", order_count)
else:
    print("No data available for analysis.")

The optimal time to display advertisements is: 19 o'clock
Order Count:  1286


# What product sold the most? Why do you think it sold the most?


In [79]:
#Group by product and calculate the total sales of particular product
product_sales=df.groupBy("Product").agg(sum("Quantity_Ordered").alias("Quantity_Ordered"))

#Find the product with the highest quantity sold
most_sold_product= product_sales.orderBy(("Quantity_Ordered"), ascending=False).first()

product_name=most_sold_product["Product"]
Quantity_sold=most_sold_product["Quantity_Ordered"]

# Print the result

print("product sold the most :  ",product_name )
print("Total Quantity Sold:  ", Quantity_sold)

product sold the most :   AAA Batteries (4-pack)
Total Quantity Sold:   2936


In [80]:
df.show()

+--------+--------------------+----------------+----------+-------------------+--------------------+-----------+-----+--------------+----------+
|Order_ID|             Product|Quantity_Ordered|Price_Each|         Order_Date|    Purchase_Address|Total_Sales|Month|          City|Order_Hour|
+--------+--------------------+----------------+----------+-------------------+--------------------+-----------+-----+--------------+----------+
|  176558|USB-C Charging Cable|               2|     11.95|2019-04-19 08:46:00|917 1st St, Dalla...|       23.9|    4|        Dallas|         8|
|  176559|Bose SoundSport H...|               1|     99.99|2019-04-07 22:30:00|682 Chestnut St, ...|      99.99|    4|        Boston|        22|
|  176560|        Google Phone|               1|     600.0|2019-04-12 14:38:00|669 Spruce St, Lo...|      600.0|    4|   Los Angeles|        14|
|  176560|    Wired Headphones|               1|     11.99|2019-04-12 14:38:00|669 Spruce St, Lo...|      11.99|    4|   Los Angel

In [None]:
# Save the processed DataFrame to HDFS
df.write.format("csv").mode("overwrite").save("hdfs://127.0.0.1:9000/sales/result.csv")