# DEVELHOPE PYSPARK ASSIGNMENT
## EU SUPERSTORE DATASET ANALYSIS IN PYSPARK

In [1]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import functions as f
from pyspark.sql.functions import col

In [2]:
spark = SparkSession.builder.master("local[*]").appName("Datamanipulation").getOrCreate()

In [3]:
spark

In [4]:
# read our data - lives in a csv file

df = spark.read.option("header","true").csv("C:/Users/zakria/Desktop/Data_PySpark_BigData_Exercises/Sample - EU Superstore.csv")

In [5]:
df.printSchema()

root
 |-- Row ID: string (nullable = true)
 |-- Order ID: string (nullable = true)
 |-- Order Date: string (nullable = true)
 |-- Ship Date: string (nullable = true)
 |-- Ship Mode: string (nullable = true)
 |-- Customer ID: string (nullable = true)
 |-- Customer Name: string (nullable = true)
 |-- Segment: string (nullable = true)
 |-- City: string (nullable = true)
 |-- State: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- Region: string (nullable = true)
 |-- Product ID: string (nullable = true)
 |-- Category: string (nullable = true)
 |-- Sub-Category: string (nullable = true)
 |-- Product Name: string (nullable = true)
 |-- Sales: string (nullable = true)
 |-- Quantity: string (nullable = true)
 |-- Discount: string (nullable = true)
 |-- Profit: string (nullable = true)



In [6]:
# how many rows of the EU Superstore dataset have the country being France
df.filter("Country = 'France'").count()

2827

In [7]:
# of those, how many are profitable?
df.filter((df.Country == 'France') & (df.Profit > 0)).count()


2277

In [8]:
# how any different discount brackets exist? what are they?
df.select('Discount').distinct().show() # shwoing discount brackets
df.select('Discount').distinct().count() # Counting all distinct discount brackets

+--------+
|Discount|
+--------+
|     0.3|
|     0.7|
|       0|
|     0.2|
|    0.15|
|    0.35|
|     0.8|
|    0.45|
|     0.5|
|    0.65|
|     0.6|
|     0.1|
|    0.85|
|     0.4|
+--------+



14

In [9]:
# let's see the totl profit by discount bracket, make sure they are ordered by 
df.groupBy('Discount').agg({'Profit':'sum'}).orderBy("sum(Profit)",ascending=False).show()

+--------+-------------------+
|Discount|        sum(Profit)|
+--------+-------------------+
|       0| 383806.53000000026|
|     0.1|  126884.0309999999|
|    0.15| 24677.563499999975|
|     0.2| 2189.5499999999984|
|     0.8|           -460.284|
|     0.3| -758.4209999999999|
|    0.45|         -1103.1915|
|    0.85|          -3068.658|
|     0.7|          -5496.765|
|    0.65| -6221.965499999999|
|    0.35|          -9122.649|
|     0.6|-20517.456000000002|
|     0.4|-21346.427999999996|
|     0.5|         -96632.115|
+--------+-------------------+



In [10]:
# what is the value after which we should stop offering discount?
profit_discount=df.groupBy('Discount').agg({'Profit':'sum'}).orderBy("sum(Profit)",ascending=False)
profit_discount.filter(f.col("sum(Profit)").cast("float") >= 0).show()

+--------+------------------+
|Discount|       sum(Profit)|
+--------+------------------+
|       0|383806.53000000026|
|     0.1| 126884.0309999999|
|    0.15|24677.563499999975|
|     0.2|2189.5499999999984|
+--------+------------------+



In [11]:
# who are the top 5 most profitable customers
top_cus=df.groupBy('Customer Name').agg({"Profit":'sum'}).orderBy("sum(Profit)",ascending=False).\
    withColumnRenamed("sum(Profit)","Total_Profit").withColumn("Total_Profit",f.round("Total_Profit",2))
top_cus.show(5)


+-----------------+------------+
|    Customer Name|Total_Profit|
+-----------------+------------+
|     Susan Pistek|     4974.51|
|    Patrick Jones|      3986.0|
|Patrick O'Donnell|      3778.2|
|    Ellis Ballard|     3459.66|
|  Mike Gockenbach|     3144.44|
+-----------------+------------+
only showing top 5 rows



In [12]:
top_cus_collected=top_cus.collect()[0:5]
top_5_customers=[row[0] for row in top_cus_collected]
top_5_customers

['Susan Pistek',
 'Patrick Jones',
 "Patrick O'Donnell",
 'Ellis Ballard',
 'Mike Gockenbach']

In [13]:
# get all the rows belonging to those 5 customer names: hint, you may need the collect method - how many rows are they?
top_cus_collected=top_cus.collect()[0:5]
top_5_customers=[row[0] for row in top_cus_collected]
df.filter(df['Customer Name'].isin(top_5_customers)).count()


76

In [15]:
# create a new column which is the value of the sale were there not discount applied. Hint: orginal = sales/(1-d)
sales_wo_discount = df.withColumn("SaleWithoutDiscount", col("Sales") / (1 - col("Discount")))

In [16]:
sales_wo_discount.select("Sales","Discount","SaleWithoutDiscount").show()

+--------+--------+-------------------+
|   Sales|Discount|SaleWithoutDiscount|
+--------+--------+-------------------+
|    79.2|       0|               79.2|
|  388.92|       0|             388.92|
|   35.19|       0|              35.19|
|   50.94|       0|              50.94|
|  307.44|       0|             307.44|
|   122.4|       0|              122.4|
|  413.82|       0|             413.82|
|  428.22|       0|             428.22|
| 3979.29|       0|            3979.29|
|   43.56|       0|              43.56|
|   25.26|       0|              25.26|
|2443.905|     0.1| 2715.4500000000003|
|   12.21|       0|              12.21|
|2167.296|    0.15| 2549.7599999999998|
| 138.105|     0.1|             153.45|
| 128.385|     0.1| 142.64999999999998|
|  690.12|       0|             690.12|
|    8.16|       0|               8.16|
|  347.88|       0|             347.88|
| 575.505|     0.1|  639.4499999999999|
+--------+--------+-------------------+
only showing top 20 rows



In [19]:
# calculate the difference between sales and discount value
from pyspark.sql.functions import col,round
df = df.withColumn("Difference", round(col("Sales") - (col("Sales")-col("Discount")*col("Sales")),2))

In [20]:
df.select("Sales","Discount","Difference").show()

+--------+--------+----------+
|   Sales|Discount|Difference|
+--------+--------+----------+
|    79.2|       0|       0.0|
|  388.92|       0|       0.0|
|   35.19|       0|       0.0|
|   50.94|       0|       0.0|
|  307.44|       0|       0.0|
|   122.4|       0|       0.0|
|  413.82|       0|       0.0|
|  428.22|       0|       0.0|
| 3979.29|       0|       0.0|
|   43.56|       0|       0.0|
|   25.26|       0|       0.0|
|2443.905|     0.1|    244.39|
|   12.21|       0|       0.0|
|2167.296|    0.15|    325.09|
| 138.105|     0.1|     13.81|
| 128.385|     0.1|     12.84|
|  690.12|       0|       0.0|
|    8.16|       0|       0.0|
|  347.88|       0|       0.0|
| 575.505|     0.1|     57.55|
+--------+--------+----------+
only showing top 20 rows



In [24]:
# how much money did we not gain due to the discounts - per discount bracket?
from pyspark.sql.functions import sum

discounted_sales = df.withColumn("DiscountedSales", col("Sales") * col("Discount"))
total_discounted_sales = discounted_sales.groupBy("Discount").agg(sum("DiscountedSales").alias("TotalDiscountedSales"))
total_sales = df.agg(sum("Sales").alias("TotalSales"))
missed_sales = total_sales.first()[0] - total_discounted_sales.groupBy().sum("TotalDiscountedSales").first()[0]

print(f"Total missed sales due to discounts:", missed_sales)
total_discounted_sales.show()


Total missed sales due to discounts: 2649532.9683750146
+--------+--------------------+
|Discount|TotalDiscountedSales|
+--------+--------------------+
|     0.3|  1841.1687000000002|
|     0.7|  2560.2254999999996|
|       0|                 0.0|
|     0.2|   8522.496000000001|
|    0.15|   38448.20002499998|
|    0.35|  18956.014349999998|
|     0.8|            127.1328|
|    0.45|  1145.8928250000001|
|     0.5|   91867.13250000023|
|    0.65|         4276.879425|
|     0.6|  15857.618399999994|
|     0.1|   76241.20410000002|
|    0.85|            677.3157|
|     0.4|  28034.812799999996|
+--------+--------------------+



In [30]:
# find the discount bracket which made us not gain the most (dynamically)
total_discounted_sales.orderBy("TotalDiscountedSales",ascending=False).show(1)


+--------+--------------------+
|Discount|TotalDiscountedSales|
+--------+--------------------+
|     0.5|   91867.13250000023|
+--------+--------------------+
only showing top 1 row



In [36]:
# what would have been the total profit if we removed all orders from that discount group? 
from pyspark.sql.functions import when
total_profit_wo_dis_group = df.filter(col("Discount") != 0.5) \
                 .agg(sum(col("Sales")).alias("TotalProfit")) \
                 .first()["TotalProfit"]

In [37]:
total_profit_wo_dis_group

2754354.796500009

In [40]:
#how much more (or less) profit is that?
total_profit = df.agg(sum("Profit")).collect()[0][0]
total_profit

372829.7415000005

In [22]:
df.createOrReplaceTempView("superstore")
result_sql_table = spark.sql("SELECT * FROM superstore LIMIT 10")
result_sql_table.show()

+------+---------------+----------+----------+--------------+-----------+--------------+---------+-------------+-------+--------------+------+---------------+---------------+------------+--------------------+-------+--------+--------+-------+----------+
|Row ID|       Order ID|Order Date| Ship Date|     Ship Mode|Customer ID| Customer Name|  Segment|         City|  State|       Country|Region|     Product ID|       Category|Sub-Category|        Product Name|  Sales|Quantity|Discount| Profit|Difference|
+------+---------------+----------+----------+--------------+-----------+--------------+---------+-------------+-------+--------------+------+---------------+---------------+------------+--------------------+-------+--------+--------+-------+----------+
|     1|ES-2017-1311038|07/02/2017|11/02/2017|Standard Class|   AS-10045|Aaron Smayling|Corporate|        Leeds|England|United Kingdom| North|OFF-ST-10000988|Office Supplies|     Storage|Fellowes Folders,...|   79.2|       3|       0|   3

In [23]:
# use an SQL query to count the number of rows
spark.sql("SELECT COUNT(*) FROM superstore").show()

+--------+
|count(1)|
+--------+
|   10000|
+--------+



In [27]:
# Use an SQL query to calculate the profit ratio for each country: hint, ratio is sum(profit)/sum(sales)
profit_ratio_per_country = spark.sql("""
    SELECT country, round(SUM(Profit) / SUM(Sales),2) AS profit_ratio
    FROM superstore
    GROUP BY country
""")
profit_ratio_per_country.show()

+--------------+------------+
|       country|profit_ratio|
+--------------+------------+
|        Sweden|       -0.57|
|       Germany|        0.17|
|        France|        0.13|
|       Belgium|        0.24|
|       Finland|        0.19|
|         Italy|        0.07|
|        Norway|        0.25|
|         Spain|        0.19|
|       Denmark|        -0.5|
|       Ireland|       -0.44|
|   Switzerland|        0.29|
|      Portugal|       -0.58|
|       Austria|        0.26|
|United Kingdom|        0.21|
|   Netherlands|       -0.53|
+--------------+------------+



In [33]:
# is the country with the largest profit ratio, the country with the largest profit?
profit_ratio_per_country.orderBy("profit_ratio",ascending=False).show(1)

+-----------+------------+
|    country|profit_ratio|
+-----------+------------+
|Switzerland|        0.29|
+-----------+------------+
only showing top 1 row



In [31]:
largest_profit_ratio=spark.sql("""
    SELECT country, SUM(profit) AS total_profit
    FROM superstore
    GROUP BY country
    ORDER BY total_profit DESC
    LIMIT 1
""")
largest_profit_ratio.show()

+--------------+------------------+
|       country|      total_profit|
+--------------+------------------+
|United Kingdom|111900.15000000001|
+--------------+------------------+



# Sweden has the largest profit ratio while United Kingdom have the largest total profit earned