In [1]:
import pyspark
from pyspark.sql import SparkSession

import pyspark.sql.functions as func
from pyspark.sql.functions import sum

In [2]:
spark = SparkSession.builder.master("local[*]").appName("kayamuiSWorking").getOrCreate()

In [3]:
spark

In [4]:
# read our data - lives in a csv file

df = spark.read.format("csv").option("header","true").load("C:/Users/mkaya/OneDrive/Masaüstü/exercises_develhope/Sample - EU Superstore.csv")

In [5]:
df.show(5)

+------+---------------+----------+----------+--------------+-----------+--------------+---------+-----+-------+--------------+------+---------------+---------------+------------+--------------------+------+--------+--------+------+
|Row ID|       Order ID|Order Date| Ship Date|     Ship Mode|Customer ID| Customer Name|  Segment| City|  State|       Country|Region|     Product ID|       Category|Sub-Category|        Product Name| Sales|Quantity|Discount|Profit|
+------+---------------+----------+----------+--------------+-----------+--------------+---------+-----+-------+--------------+------+---------------+---------------+------------+--------------------+------+--------+--------+------+
|     1|ES-2017-1311038|07/02/2017|11/02/2017|Standard Class|   AS-10045|Aaron Smayling|Corporate|Leeds|England|United Kingdom| North|OFF-ST-10000988|Office Supplies|     Storage|Fellowes Folders,...|  79.2|       3|       0|  39.6|
|     2|ES-2017-1311038|07/02/2017|11/02/2017|Standard Class|   AS-1

In [7]:
df.columns

['Row ID',
 'Order ID',
 'Order Date',
 'Ship Date',
 'Ship Mode',
 'Customer ID',
 'Customer Name',
 'Segment',
 'City',
 'State',
 'Country',
 'Region',
 'Product ID',
 'Category',
 'Sub-Category',
 'Product Name',
 'Sales',
 'Quantity',
 'Discount',
 'Profit']

In [8]:
df.printSchema()

root
 |-- Row ID: string (nullable = true)
 |-- Order ID: string (nullable = true)
 |-- Order Date: string (nullable = true)
 |-- Ship Date: string (nullable = true)
 |-- Ship Mode: string (nullable = true)
 |-- Customer ID: string (nullable = true)
 |-- Customer Name: string (nullable = true)
 |-- Segment: string (nullable = true)
 |-- City: string (nullable = true)
 |-- State: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- Region: string (nullable = true)
 |-- Product ID: string (nullable = true)
 |-- Category: string (nullable = true)
 |-- Sub-Category: string (nullable = true)
 |-- Product Name: string (nullable = true)
 |-- Sales: string (nullable = true)
 |-- Quantity: string (nullable = true)
 |-- Discount: string (nullable = true)
 |-- Profit: string (nullable = true)



In [9]:
# how many rows of the EU Superstore dataset have the country being France
df.select("Country").filter(df["Country"].isin("France")).count()

2827

In [11]:
# of those, how many are profitable?

df.filter((df["Country"].isin("France")) & (df["Profit"] > 0.0)).count()

2329

In [19]:
# how any different discount brackets exist? what are they?
df.select("Discount").distinct().count()

14

In [20]:
df.select("Discount").distinct().show()

+--------+
|Discount|
+--------+
|     0.3|
|     0.7|
|       0|
|     0.2|
|    0.15|
|    0.35|
|     0.8|
|    0.45|
|     0.5|
|    0.65|
|     0.6|
|     0.1|
|    0.85|
|     0.4|
+--------+



In [47]:
# let's see the totl profit by discount bracket, make sure they are ordered by 
profit_by_discount = df.groupby("Discount").agg({"Profit":"sum"})

profit_by_discount = profit_by_discount.withColumn("sum(Profit)", func.round(profit_by_discount["sum(Profit)"], 2)).\
orderBy("sum(Profit)", ascending=False).withColumnRenamed("sum(Profit)", "totalProfit")

profit_by_discount.show()

+--------+-----------+
|Discount|totalProfit|
+--------+-----------+
|       0|  383806.53|
|     0.1|  126884.03|
|    0.15|   24677.56|
|     0.2|    2189.55|
|     0.8|    -460.28|
|     0.3|    -758.42|
|    0.45|   -1103.19|
|    0.85|   -3068.66|
|     0.7|   -5496.77|
|    0.65|   -6221.97|
|    0.35|   -9122.65|
|     0.6|  -20517.46|
|     0.4|  -21346.43|
|     0.5|  -96632.12|
+--------+-----------+



In [56]:
# what is the value after which we should stop offering discount?

profit_by_discount.filter(profit_by_discount["totalProfit"] > 0).orderBy("totalProfit", ascending=True).select("Discount").collect()[0][0]

'0.2'

In [197]:
# who are the top 5 most profitable customers

def get_top_names():
    top_5 = df.groupBy("Customer ID", "Customer Name").agg({"Profit":"sum"})

    top_5 = top_5.withColumn("Profit", func.round(top_5["sum(Profit)"], 2)).\
    drop("sum(Profit)").\
    orderBy("Profit", ascending=False).\
    drop("Profit").limit(5)

    names = []
    for i in top_5.collect():
        names.append(i[1])
    return names
get_top_names()

['Susan Pistek',
 'Patrick Jones',
 "Patrick O'Donnell",
 'Ellis Ballard',
 'Mike Gockenbach']

In [198]:
# get all the rows belonging to those 5 customer names: hint, you may need the collect method - how many rows are they?

df.filter(df["Customer Name"].isin(get_top_names())).count()

76

In [23]:
# create a new column which is the value of the sale were there not discount applied. Hint: orginal = sales/(1-d)
df = df.withColumn("original", func.round(df["Sales"] / (1-df["Discount"]), 2))

In [28]:
df.createOrReplaceTempView("df")

In [40]:
spark.sql("SELECT Sales, Original, Discount FROM df WHERE Discount > 0.0 ORDER BY Original desc").show()

+---------+--------+--------+
|    Sales|Original|Discount|
+---------+--------+--------+
|  3399.66| 6799.32|     0.5|
| 5729.346| 6365.94|     0.1|
|  5726.16|  6362.4|     0.1|
| 5276.988| 5863.32|     0.1|
|  2875.77| 5751.54|     0.5|
| 4876.875|  5737.5|    0.15|
| 2830.425| 5660.85|     0.5|
| 4748.436| 5276.04|     0.1|
| 4448.832| 5233.92|    0.15|
| 1824.144| 5211.84|    0.65|
| 2570.865| 5141.73|     0.5|
| 4627.368| 5141.52|     0.1|
| 4618.215| 5131.35|     0.1|
|   4544.1|  5049.0|     0.1|
| 1950.372| 4875.93|     0.6|
| 3062.631| 4711.74|    0.35|
| 1363.905| 4546.35|     0.7|
|   2683.8|  4473.0|     0.4|
| 2671.452| 4452.42|     0.4|
|2794.2525| 4298.85|    0.35|
+---------+--------+--------+
only showing top 20 rows



In [47]:
# calculate the difference between sales and discount value

df = df.withColumn("Discounted Value", func.round(df["Sales"] - (df["Sales"] * df["Discount"]), 2))

df.\
select("Sales", "Discount", "Discounted Value").\
filter(df["Discount"] > 0.0).\
orderBy("Sales", ascending=False).\
show()

+--------+--------+----------------+
|   Sales|Discount|Discounted Value|
+--------+--------+----------------+
|  999.09|    0.15|          849.23|
|  993.66|     0.5|          496.83|
| 993.615|     0.5|          496.81|
| 993.531|    0.15|           844.5|
|992.9955|    0.15|          844.05|
| 990.468|     0.1|          891.42|
|  99.792|     0.1|           89.81|
|  99.792|     0.4|           59.88|
|  99.765|     0.5|           49.88|
|   99.54|     0.5|           49.77|
|  99.252|     0.1|           89.33|
|  99.252|     0.1|           89.33|
|  99.099|     0.3|           69.37|
|  989.04|     0.5|          494.52|
|  989.04|     0.5|          494.52|
| 987.957|     0.1|          889.16|
| 986.904|     0.6|          394.76|
| 985.824|     0.6|          394.33|
| 985.284|     0.1|          886.76|
|  983.88|     0.1|          885.49|
+--------+--------+----------------+
only showing top 20 rows



In [111]:
# how much money did we not gain due to the discounts - per discount bracket?

df = df.withColumn("Money Lost", func.round(df["Sales"] - df["Discounted Value"],2))

df.\
select("Discount", "Money Lost").\
filter(df["Discount"] > 0.0).\
groupBy("Discount").sum().\
withColumn("Money Lost", func.round("sum(Money Lost)")).\
drop("sum(Money Lost)").\
orderBy(func.round("Money Lost",2), ascending=False).\
show()

+--------+----------+
|Discount|Money Lost|
+--------+----------+
|     0.5|   91866.0|
|     0.1|   76241.0|
|    0.15|   38448.0|
|     0.4|   28035.0|
|    0.35|   18956.0|
|     0.6|   15858.0|
|     0.2|    8522.0|
|    0.65|    4277.0|
|     0.7|    2560.0|
|     0.3|    1841.0|
|    0.45|    1146.0|
|    0.85|     677.0|
|     0.8|     127.0|
+--------+----------+



In [145]:
# find the discount bracket which made us not gain the most (dynamically)

df = df.withColumn("Profit", df["Profit"].cast("float"))

less_profitable_discount = df.groupBy("Discount").sum().orderBy("sum(Profit)").\
select("Discount").\
take(1)[0][0]

less_profitable_discount

'0.5'

In [157]:
# what would have been the total profit if we removed all orders from that discount group? 

profit_without_less = df.filter(df["Discount"] != less_profitable_discount).select(sum(df["Profit"])).collect()[0][0]

profit_without_less

469461.85667362204

In [167]:
#how much more (or less) profit is that?

def is_more():
    if profit_without_less > df.select(sum(df["Profit"])).collect()[0][0]:
        print("Total Profit without less profitable discount is more : ")
        return profit_without_less - df.select(sum(df["Profit"])).collect()[0][0]
    else:
        print("Total Profit with less profitable discount is more : ")
        return df.select(sum(df["Profit"])).collect()[0][0] - profit_without_less
is_more()

Total Profit without less profitable discount is more : 


96632.11525454745

In [6]:
# create a temporary table for our superstore table in sql
df.createOrReplaceTempView("df")

In [173]:
# use an SQL query to count the number of rows
spark.sql("SELECT COUNT(df.Sales) FROM df").show()

+------------+
|count(Sales)|
+------------+
|       10000|
+------------+



In [175]:
# Use an SQL query to calculate the profit ratio for each country: hint, ratio is sum(profit)/sum(sales)
spark.sql("SELECT df.country, sum(df.profit) / sum(df.sales) as ratio FROM df GROUP BY country ORDER BY ratio desc").show()

+--------------+--------------------+
|       country|               ratio|
+--------------+--------------------+
|   Switzerland| 0.29092012306058596|
|       Austria|   0.264190876869252|
|        Norway| 0.25177475561207147|
|       Belgium| 0.23508766512254156|
|United Kingdom| 0.21170103566633977|
|         Spain| 0.18941580588813087|
|       Finland| 0.18864296597998498|
|       Germany|  0.1706679208396213|
|        France| 0.12693568220672438|
|         Italy| 0.06844355267097506|
|       Ireland| -0.4442667763849829|
|       Denmark|-0.49571900283557735|
|   Netherlands| -0.5298342813704527|
|        Sweden| -0.5745674291524092|
|      Portugal| -0.5761662303775278|
+--------------+--------------------+



In [11]:
# is the country with the largest profit ratio, the country with the largest profit?

highest_ratio = spark.sql("SELECT country, sum(profit) / sum(sales) as ratio FROM df GROUP BY country ORDER BY ratio desc").\
take(1)[0][0]

highest_profit = spark.sql("SELECT country, sum(df.Profit) as profit FROM df GROUP BY country ORDER BY profit desc").\
take(1)[0][0]

print(f"The country has highest ratio is {highest_ratio} and largest profit is {highest_profit}")

highest_ratio == highest_profit

The country has highest ratio is Switzerland and largest profit is United Kingdom


False