In [33]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, udf
from pyspark.sql.types import StringType
from pyspark.sql import functions as F

In [None]:
spark = SparkSession.builder \
    .appName("MyApp") \
    .getOrCreate()

df = spark.read \
    .format('csv') \
    .option('inferSchema', 'true') \
    .option('header', 'true') \
    .option("quote", "\"") \
    .option("escape", "\"") \
    .load('/Users/macos/Documents/DS/MachineLearning/Projects/PySpark/welmart.csv')
df.show()

Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
25/06/23 16:28:33 WARN Utils: Your hostname, Sushant-MBP.local, resolves to a loopback address: 127.0.0.1; using 192.168.68.62 instead (on interface en0)
25/06/23 16:28:33 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/06/23 16:28:34 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


+------+--------------+----------+----------+--------------+-----------+------------------+-----------+-------------+---------------+--------------+-----------+-------+---------------+---------------+------------+--------------------+--------+--------+--------+--------+
|Row ID|      Order ID|Order Date| Ship Date|     Ship Mode|Customer ID|     Customer Name|    Segment|      Country|           City|         State|Postal Code| Region|     Product ID|       Category|Sub-Category|        Product Name|   Sales|Quantity|Discount|  Profit|
+------+--------------+----------+----------+--------------+-----------+------------------+-----------+-------------+---------------+--------------+-----------+-------+---------------+---------------+------------+--------------------+--------+--------+--------+--------+
|     1|CA-2016-152156| 11/8/2016|11/11/2016|  Second Class|   CG-12520|       Claire Gute|   Consumer|United States|      Henderson|      Kentucky|      42420|  South|FUR-BO-10001798|   

In [2]:
df.printSchema()

root
 |-- Row ID: integer (nullable = true)
 |-- Order ID: string (nullable = true)
 |-- Order Date: string (nullable = true)
 |-- Ship Date: string (nullable = true)
 |-- Ship Mode: string (nullable = true)
 |-- Customer ID: string (nullable = true)
 |-- Customer Name: string (nullable = true)
 |-- Segment: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- City: string (nullable = true)
 |-- State: string (nullable = true)
 |-- Postal Code: integer (nullable = true)
 |-- Region: string (nullable = true)
 |-- Product ID: string (nullable = true)
 |-- Category: string (nullable = true)
 |-- Sub-Category: string (nullable = true)
 |-- Product Name: string (nullable = true)
 |-- Sales: double (nullable = true)
 |-- Quantity: integer (nullable = true)
 |-- Discount: double (nullable = true)
 |-- Profit: double (nullable = true)



In [3]:
df.count()

9994

## Tasks: 

1. Determine the best-selling product sub-category.
2. Identify the product category generating the highest revenue.
3. Compile a top 10 list of the most valuable customers.
4. Determine the state responsible for the highest number of orders.
5. Find the year with the highest revenue generation.

In [None]:
# Task 1 : Determine the best-selling product sub-category. 
# i.e. product sub-category which has the highest sales volume (most Quantity)
df_subcat_sums = df.groupBy('Sub-Category').sum('Quantity')
df_subcat_sums.sort(col('sum(Quantity)'), ascending=False).show(1)

+------------+-------------+
|Sub-Category|sum(Quantity)|
+------------+-------------+
|     Binders|         5974|
+------------+-------------+
only showing top 1 row


In [28]:
# Task 2: Identify the product category generating the highest revenue.
# i.e. product category with highest sales
df.groupBy('Category').agg(F.round(F.sum('Sales'), 2).alias('CategorySales')).orderBy('CategorySales', ascending=False).show(1)

+----------+-------------+
|  Category|CategorySales|
+----------+-------------+
|Technology|    836154.03|
+----------+-------------+
only showing top 1 row


In [26]:
# Compile a top 10 list of the most valuable customers.
# i.e. customers whos purchases amount the most
df.groupBy('Customer ID').agg(F.round(F.sum(df.Sales * df.Quantity), 2).alias('TotalSales')).orderBy('TotalSales', ascending=False).show(10)

+-----------+----------+
|Customer ID|TotalSales|
+-----------+----------+
|   SM-20320| 146749.77|
|   AB-10105| 143857.71|
|   TC-20980|  92602.57|
|   BM-11140|  88175.27|
|   JH-15985|  80520.04|
|   KL-16645|  74912.31|
|   GT-14710|  73016.05|
|   ME-17320|   71594.3|
|   BS-11365|  71586.33|
|   PO-18850|  69297.41|
+-----------+----------+
only showing top 10 rows


In [31]:
# Determine the state responsible for the highest number of orders.
df.groupBy('State').count().orderBy('count', ascending=False).show(1)

+----------+-----+
|     State|count|
+----------+-----+
|California| 2001|
+----------+-----+
only showing top 1 row


In [43]:
# Find the year with the highest revenue generation.
# Date format : 8/27/2014
from datetime import datetime
def str_to_date(s):
    return datetime.strptime(s, "%m/%d/%Y").year
str2date = udf(lambda x:str_to_date(x))
df.withColumn("ShipYear", str2date(col("Ship Date"))).groupBy('ShipYear').agg(F.sum('Profit').alias('YearlyProfit')).orderBy('YearlyProfit', ascending=False).show(1)

+--------+-----------------+
|ShipYear|     YearlyProfit|
+--------+-----------------+
|    2017|92346.87519999997|
+--------+-----------------+
only showing top 1 row
