In [46]:
import pyspark

In [47]:
from pyspark.sql import functions as Func
from pyspark.sql.functions import when
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

In [48]:
spark=SparkSession.builder.appName("EU_Superstore").getOrCreate()

In [49]:
spark

In [50]:
df=spark.read.option("header","true").csv("EU_Superstore.csv")

In [51]:
df.show()

+------+---------------+----------+----------+--------------+-----------+----------------+-----------+-------------+--------------------+--------------+-------+---------------+---------------+------------+--------------------+--------+--------+--------+-------+
|Row ID|       Order ID|Order Date| Ship Date|     Ship Mode|Customer ID|   Customer Name|    Segment|         City|               State|       Country| Region|     Product ID|       Category|Sub-Category|        Product Name|   Sales|Quantity|Discount| Profit|
+------+---------------+----------+----------+--------------+-----------+----------------+-----------+-------------+--------------------+--------------+-------+---------------+---------------+------------+--------------------+--------+--------+--------+-------+
|     1|ES-2017-1311038|07/02/2017|11/02/2017|Standard Class|   AS-10045|  Aaron Smayling|  Corporate|        Leeds|             England|United Kingdom|  North|OFF-ST-10000988|Office Supplies|     Storage|Fellowes 

In [52]:
df.columns

['Row ID',
 'Order ID',
 'Order Date',
 'Ship Date',
 'Ship Mode',
 'Customer ID',
 'Customer Name',
 'Segment',
 'City',
 'State',
 'Country',
 'Region',
 'Product ID',
 'Category',
 'Sub-Category',
 'Product Name',
 'Sales',
 'Quantity',
 'Discount',
 'Profit']

In [53]:
df.printSchema()

root
 |-- Row ID: string (nullable = true)
 |-- Order ID: string (nullable = true)
 |-- Order Date: string (nullable = true)
 |-- Ship Date: string (nullable = true)
 |-- Ship Mode: string (nullable = true)
 |-- Customer ID: string (nullable = true)
 |-- Customer Name: string (nullable = true)
 |-- Segment: string (nullable = true)
 |-- City: string (nullable = true)
 |-- State: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- Region: string (nullable = true)
 |-- Product ID: string (nullable = true)
 |-- Category: string (nullable = true)
 |-- Sub-Category: string (nullable = true)
 |-- Product Name: string (nullable = true)
 |-- Sales: string (nullable = true)
 |-- Quantity: string (nullable = true)
 |-- Discount: string (nullable = true)
 |-- Profit: string (nullable = true)



In [54]:
df=df.withColumn("Row ID",df['Row ID'].cast('int'))

In [55]:
df=df.withColumn("Sales",df['Sales'].cast('float'))

In [56]:
df=df.withColumn("Quantity",df['Quantity'].cast('int'))

In [57]:
df=df.withColumn("Profit",df['Profit'].cast('float'))

In [58]:
df=df.withColumn("Discount",df['Discount'].cast('float'))

In [59]:
df.show()

+------+---------------+----------+----------+--------------+-----------+----------------+-----------+-------------+--------------------+--------------+-------+---------------+---------------+------------+--------------------+--------+--------+--------+-------+
|Row ID|       Order ID|Order Date| Ship Date|     Ship Mode|Customer ID|   Customer Name|    Segment|         City|               State|       Country| Region|     Product ID|       Category|Sub-Category|        Product Name|   Sales|Quantity|Discount| Profit|
+------+---------------+----------+----------+--------------+-----------+----------------+-----------+-------------+--------------------+--------------+-------+---------------+---------------+------------+--------------------+--------+--------+--------+-------+
|     1|ES-2017-1311038|07/02/2017|11/02/2017|Standard Class|   AS-10045|  Aaron Smayling|  Corporate|        Leeds|             England|United Kingdom|  North|OFF-ST-10000988|Office Supplies|     Storage|Fellowes 

# how many rows of the EU Superstore dataset have the country being France

In [60]:
France_df=df.filter(df['Country']=='France')
France_total=France_df.count()

# of those, how many are profitable?

In [61]:
profitable_count_france=France_df.filter(France_df['Profit']>0).count()
profitable_count_france

2329

In [63]:
print("among all orders from France",int(profitable_count_france/France_total*100),"% are profitable")

among all orders from France 82 % are profitable


# how many different discount brackets exist? what are they?

In [64]:
France_df.select(France_df['Discount']).distinct().show()

+--------+
|Discount|
+--------+
|    0.15|
|     0.1|
|    0.35|
|     0.6|
|     0.5|
|    0.65|
|     0.0|
+--------+



In [65]:
df.groupBy('Discount').sum('sales').sort('Discount').show()

+--------+------------------+
|Discount|        sum(sales)|
+--------+------------------+
|     0.0| 1522455.807906866|
|     0.1|  762412.041636467|
|    0.15|256321.33266067505|
|     0.2|42612.479701042175|
|     0.3| 6137.228960037231|
|    0.35|54160.040939331055|
|     0.4|  70087.0320777893|
|    0.45|2546.4285278320312|
|     0.5|183734.26430511475|
|     0.6| 26429.36386156082|
|    0.65| 6579.814552307129|
|     0.7|3657.4650268554688|
|     0.8| 158.9159984588623|
|    0.85| 796.8419952392578|
+--------+------------------+



In [66]:
df.groupby('Discount').count().sort('Discount').show()

+--------+-----+
|Discount|count|
+--------+-----+
|     0.0| 6134|
|     0.1| 1737|
|    0.15|  407|
|     0.2|  125|
|     0.3|   51|
|    0.35|   45|
|     0.4|  274|
|    0.45|    2|
|     0.5| 1080|
|     0.6|  116|
|    0.65|   17|
|     0.7|    6|
|     0.8|    4|
|    0.85|    2|
+--------+-----+



In [67]:
df =df.withColumn('Cost',df['Sales']-df['Profit'])
df = df.withColumn('Profitable_Flag',
                  when(df['Profit']>0,"Profitable").
                  when(df['Profit']<0,"Not Profitable").
                  when(df['Profit']==0,"zero").
                  otherwise('error'))

# let's see the total profit by discount bracket, make sure they are ordered by 

In [68]:
pivot= df.groupby('Discount').sum('Profit').sort('Discount')

In [69]:
df.groupby(['Discount','Profitable_Flag']).count().sort('Discount').show()

+--------+---------------+-----+
|Discount|Profitable_Flag|count|
+--------+---------------+-----+
|     0.0|     Profitable| 5989|
|     0.0|           zero|  145|
|     0.1|     Profitable| 1329|
|     0.1| Not Profitable|  407|
|     0.1|           zero|    1|
|    0.15| Not Profitable|  159|
|    0.15|     Profitable|  248|
|     0.2|     Profitable|   76|
|     0.2| Not Profitable|   49|
|     0.3| Not Profitable|   33|
|     0.3|     Profitable|   18|
|    0.35|     Profitable|   11|
|    0.35| Not Profitable|   34|
|     0.4| Not Profitable|  221|
|     0.4|     Profitable|   53|
|    0.45| Not Profitable|    2|
|     0.5|           zero|   12|
|     0.5| Not Profitable| 1068|
|     0.6| Not Profitable|  116|
|    0.65| Not Profitable|   17|
+--------+---------------+-----+
only showing top 20 rows



In [70]:
df.groupby(['Discount','Profitable_Flag']).count().sort('Discount').show()

+--------+---------------+-----+
|Discount|Profitable_Flag|count|
+--------+---------------+-----+
|     0.0|           zero|  145|
|     0.0|     Profitable| 5989|
|     0.1|     Profitable| 1329|
|     0.1|           zero|    1|
|     0.1| Not Profitable|  407|
|    0.15| Not Profitable|  159|
|    0.15|     Profitable|  248|
|     0.2| Not Profitable|   49|
|     0.2|     Profitable|   76|
|     0.3|     Profitable|   18|
|     0.3| Not Profitable|   33|
|    0.35|     Profitable|   11|
|    0.35| Not Profitable|   34|
|     0.4|     Profitable|   53|
|     0.4| Not Profitable|  221|
|    0.45| Not Profitable|    2|
|     0.5| Not Profitable| 1068|
|     0.5|           zero|   12|
|     0.6| Not Profitable|  116|
|    0.65| Not Profitable|   17|
+--------+---------------+-----+
only showing top 20 rows



In [71]:
profit_and_discount=df.groupby(['Discount','Profitable_Flag']).sum('Profit').sort('Discount')
res=profit_and_discount.groupby('Profitable_Flag').sum('sum(Profit)')
res.show()

+---------------+------------------+
|Profitable_Flag|  sum(sum(Profit))|
+---------------+------------------+
| Not Profitable|-186188.9957401813|
|           zero|               0.0|
|     Profitable| 559018.7371592559|
+---------------+------------------+



In [72]:
profit_and_discount.show()

+--------+---------------+-------------------+
|Discount|Profitable_Flag|        sum(Profit)|
+--------+---------------+-------------------+
|     0.0|     Profitable|  383806.5302368067|
|     0.0|           zero|                0.0|
|     0.1|     Profitable| 135214.93478811532|
|     0.1|           zero|                0.0|
|     0.1| Not Profitable| -8330.904032681603|
|    0.15| Not Profitable| -7195.565995974466|
|    0.15|     Profitable|  31873.12961292267|
|     0.2|     Profitable|  4808.184008717537|
|     0.2| Not Profitable| -2618.634010396898|
|     0.3| Not Profitable| -943.4909943044186|
|     0.3|     Profitable|  185.0699982047081|
|    0.35| Not Profitable|-11128.135562896729|
|    0.35|     Profitable| 2005.4865083694458|
|     0.4| Not Profitable|-22471.829946905375|
|     0.4|     Profitable| 1125.4020061194897|
|    0.45| Not Profitable|-1103.1914978027344|
|     0.5| Not Profitable| -96632.11525454745|
|     0.5|           zero|                0.0|
|     0.6| No

# what is the value after which we should stop offering discount?

In [73]:
without_discount=profit_and_discount.filter(profit_and_discount['Discount']==0)
with_discount=profit_and_discount.filter(profit_and_discount['Discount']!=0).groupBy('Profitable_Flag').sum()

without_discount.show()

+--------+---------------+-----------------+
|Discount|Profitable_Flag|      sum(Profit)|
+--------+---------------+-----------------+
|     0.0|     Profitable|383806.5302368067|
|     0.0|           zero|              0.0|
+--------+---------------+-----------------+



In [74]:
with_discount.show()

+---------------+------------------+------------------+
|Profitable_Flag|     sum(Discount)|  sum(sum(Profit))|
+---------------+------------------+------------------+
| Not Profitable| 6.050000034272671|-186188.9957401813|
|           zero|0.6000000014901161|               0.0|
|     Profitable|1.5000000223517418|175212.20692244917|
+---------------+------------------+------------------+



In [75]:
discount_profit=df.groupby(['Discount']).sum('Profit').sort('Discount',ascending=False)
profitable_discount=discount_profit.filter(discount_profit['sum(Profit)']>0)
profitable_max_discount=profitable_discount.select('Discount').collect()
print("profitable maximum discount rate is: ",profitable_max_discount[0][0]*100,"percent")

profitable maximum discount rate is:  20.000000298023224 percent


# who are the top 5 most profitable customers

In [76]:
customer_profit=df.select(['Customer ID', 'Customer Name','Profit','Sales','Discount','Segment'])
customer_profit=customer_profit.groupBy(['Customer ID','Customer Name','Segment']).sum('Profit').orderBy('sum(Profit)',ascending=False)
customer_profit.show(5)

+-----------+-----------------+---------+------------------+
|Customer ID|    Customer Name|  Segment|       sum(Profit)|
+-----------+-----------------+---------+------------------+
|   SP-20920|     Susan Pistek| Consumer| 4974.513064682484|
|   PJ-18835|    Patrick Jones|Corporate|3986.0040826797485|
|   PO-18865|Patrick O'Donnell| Consumer| 3778.197093129158|
|   EB-13840|    Ellis Ballard|Corporate| 3459.660038240254|
|   MG-18145|  Mike Gockenbach| Consumer|3144.4440593719482|
+-----------+-----------------+---------+------------------+
only showing top 5 rows



In [86]:
top10_list=customer_profit.collect()

for i in range(5):
    print(f"the number {i+1}",top10_list[i][1])

the number 1 Susan Pistek
the number 2 Patrick Jones
the number 3 Patrick O'Donnell
the number 4 Ellis Ballard
the number 5 Mike Gockenbach


# get all the rows belonging to those 5 customer names: hint, you may need the collect method - how many rows are they?

In [87]:
top_5cust=[]

for i in range(5):
    top_5cust.append(top10_list[i][1])

In [88]:
top5_df=df.withColumn("is top 5", when(df['Customer Name'].isin(top_5cust),True).otherwise(False))

In [89]:
top5_df=top5_df.filter(top5_df["is top 5"]=="True")
top5_df.count()

76

# create a new column which is the value of the sale were there not discount applied

In [90]:
df=df.withColumn("Org_Price",(df.Sales)/(1-df.Discount))

# calculate the difference between sales and discount value

In [91]:
df=df.withColumn("Discount_amount",df.Org_Price-df.Sales)

# how much money did we not gain due to the discounts - per discount bracket?

one way

In [92]:
df.filter(df['Profit']<0).groupby(['Discount']).sum('Profit').sort('Discount').show()


+--------+-------------------+
|Discount|        sum(Profit)|
+--------+-------------------+
|     0.1| -8330.904032681603|
|    0.15| -7195.565995974466|
|     0.2| -2618.634010396898|
|     0.3| -943.4909943044186|
|    0.35|-11128.135562896729|
|     0.4|-22471.829946905375|
|    0.45|-1103.1914978027344|
|     0.5| -96632.11525454745|
|     0.6| -20517.45601272583|
|    0.65| -6221.965492248535|
|     0.7| -5496.764984130859|
|     0.8|-460.28399658203125|
|    0.85| -3068.657958984375|
+--------+-------------------+



other way

In [94]:
discounted_total_table=df.groupBy('Discount').pivot('Segment').sum('Discount_amount')
discounted_total_table=discounted_total_table.withColumn("Total",discounted_total_table['Consumer']+discounted_total_table['Corporate']+discounted_total_table['Home Office']).orderBy('Total',ascending=False)
discounted_total_table.show()

+--------+------------------+------------------+------------------+------------------+
|Discount|          Consumer|         Corporate|       Home Office|             Total|
+--------+------------------+------------------+------------------+------------------+
|     0.5|106548.71956014633|  49934.7448720932|27250.799872875214|183734.26430511475|
|     0.1| 43699.39762236991|26694.730102942744|14318.343786541722| 84712.47151185437|
|     0.4| 21287.27796990879|19281.790047467635| 6155.615392802582| 46724.68341017901|
|    0.15|21885.300325112476|14535.726252363505| 8812.141316031895| 45233.16789350788|
|     0.6|17485.309708010063|15545.809386591982| 6612.930636021532| 39644.04973062358|
|    0.35|14897.349054331104|10405.039073088758| 3860.713896186916|29163.102023606778|
|    0.65| 7633.605819701509|2023.1638090569754| 2562.884687771139|12219.654316529623|
|     0.2| 7212.185403869611|1962.7978490890587|1478.1358785825992|10653.119131541269|
|     0.7| 4147.961759978145|1203.677930532

# find the discount bracket which made us not gain the most (dynamically)

In [96]:
loss_discount=df.filter(df['Profit']<0).groupby(['Discount']).sum('Profit').sort('sum(Profit)')
most_loss_discount=loss_discount.collect()
print("the discount bracket which resulted the most loss is: ", most_loss_discount[0][0]*100, "percent discount")

the discount bracket which resulted the most loss is:  50.0 percent discount


# what would have been the total profit if we removed all orders from that discount group? 

In [97]:
with_50=df.groupBy('Segment').sum('Profit').sort('sum(Profit)',ascending=False).collect()

In [98]:
without_50=df.filter(df['Discount']!=0.5).groupBy('Segment').sum('Profit').sort('sum(Profit)',ascending=False).collect()

In [99]:
total_without_50=0
for i in range(len(without_50)):
    total_without_50+=without_50[i][1]

In [101]:
int(total_without_50)

469461

# how much more (or less) profit is that?

In [102]:
total_with_50=0
for i in range(len(with_50)):
    total_with_50+=with_50[i][1]

In [104]:
print("we gain",int(total_without_50-total_with_50),"euros less in", most_loss_discount[0][0]*100,"percent discount bracket")

we gain 96632 euros less in 50.0 percent discount bracket


# create a temporary table for our superstore table in sql

In [105]:
df.createOrReplaceTempView('df')

# use an SQL query to count the number of rows

In [108]:
spark.sql("SELECT COUNT(*) from df").show()

+--------+
|count(1)|
+--------+
|   10000|
+--------+



In [109]:
df.count()

10000

# Use an SQL query to calculate the profit ratio for each country: hint, ratio is sum(profit)/sum(sales)

### with SQL

In [187]:
profit_ratio_table=spark.sql("SELECT SUM(Profit)/SUM(Sales) AS Profit_Ratio, Country as C from df as df GROUP BY Country ORDER BY Profit_Ratio DESC ")
profit_ratio_list=profit_ratio_table.collect()
country_with_highest_profit_ratio=profit_ratio_list[0][1]
country_with_highest_profit_ratio

'Switzerland'

### with pyspark

In [174]:
df3=df.groupBy('Country').agg({'Profit':'sum','Sales':'sum'})
df3=df3.withColumn('profit_ratio',df3['sum(Profit)']/df3['sum(Sales)']).orderBy('profit_ratio',ascending=False)

In [150]:
highest_profit_ratio=df3.orderBy('profit_ratio',ascending=False).show(1)


+-----------+-----------------+------------------+-------------------+
|    Country|       sum(Sales)|       sum(Profit)|       profit_ratio|
+-----------+-----------------+------------------+-------------------+
|Switzerland|24877.85997390747|7237.4700926840305|0.29092012336571044|
+-----------+-----------------+------------------+-------------------+
only showing top 1 row



# is the country with the largest profit ratio, the country with the largest profit?

In [175]:
most_profitted_country=df3.orderBy('sum(Profit)',ascending=False).show(1)

+--------------+-----------------+------------------+-------------------+
|       Country|       sum(Sales)|       sum(Profit)|       profit_ratio|
+--------------+-----------------+------------------+-------------------+
|United Kingdom|528576.2984557152|111900.15013868175|0.21170103628484374|
+--------------+-----------------+------------------+-------------------+
only showing top 1 row



In [184]:
TOTAL_PROFIT_TABLE=spark.sql("SELECT SUM(Profit) AS Total_profit, Country as C from df as df GROUP BY Country ORDER BY Total_profit DESC ")
TOTAL_PROFIT_list=TOTAL_PROFIT_TABLE.collect()
the_country_with_largest_profit=TOTAL_PROFIT_list[0][1]
the_country_with_largest_profit

'United Kingdom'

In [191]:
result=country_with_highest_profit_ratio == the_country_with_largest_profit

In [195]:
print(f"the most profitable country is: {the_country_with_largest_profit}, and the country with the highest profit ratio is {country_with_highest_profit_ratio}.\n are they same? \n {result}")

the most profitable country is: United Kingdom, and the country with the highest profit ratio is Switzerland.
 are they same? 
 False
