In [1]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

# import pyspark.sql.functions

In [2]:
spark = SparkSession.builder.master("local[*]").appName("Datamanipulation").getOrCreate()

In [3]:
spark

In [59]:
# read our data - lives in a csv file
path='./Sample_EU_Superstore.csv'
df=spark.read.csv(path,header=True, inferSchema=True)

In [None]:
df.columns

In [None]:
df.show()

In [None]:
df.printSchema()

In [5]:
# how many rows of the EU Superstore dataset have the country being France
df.filter('Country =="France"').count()

2827

In [6]:
# of those, how many are profitable?
df.where((F.col('Profit') > 0) & (F.col('Country') == "France")).count()


2329

In [7]:
# how any different discount brackets exist? what are they?
df.select('Discount').distinct().show()

+--------+
|Discount|
+--------+
|     0.0|
|     0.2|
|     0.7|
|     0.1|
|    0.45|
|     0.6|
|     0.8|
|    0.35|
|     0.5|
|     0.4|
|    0.85|
|    0.15|
|    0.65|
|     0.3|
+--------+



In [12]:
# let's see the totl profit by discount bracket, make sure they are ordered by 
df.groupBy('Discount').agg(F.round(F.sum('Profit'),2).alias('Total Profit by Discount Bracket')).sort('Discount').show()

+--------+--------------------------------+
|Discount|Total Profit by Discount Bracket|
+--------+--------------------------------+
|     0.0|                       383806.53|
|     0.1|                       126884.03|
|    0.15|                        24677.56|
|     0.2|                         2189.55|
|     0.3|                         -758.42|
|    0.35|                        -9122.65|
|     0.4|                       -21346.43|
|    0.45|                        -1103.19|
|     0.5|                       -96632.12|
|     0.6|                       -20517.46|
|    0.65|                        -6221.97|
|     0.7|                        -5496.77|
|     0.8|                         -460.28|
|    0.85|                        -3068.66|
+--------+--------------------------------+



In [18]:
# what is the value after which we should stop offering discount?
df.groupBy('Discount').agg(F.sum('Profit').alias('Total Profit by Discount Bracket')).sort('Discount')\
    .where(F.col('Total Profit by Discount Bracket')>0).collect()[-1][0]



0.2

In [19]:
# who are the top 5 most profitable customers

df.groupBy('Customer Name').agg(F.sum('Profit').alias('Total Profit per Customer')).sort(F.col('Total Profit per Customer').desc()).limit(5).show()

+-----------------+-------------------------+
|    Customer Name|Total Profit per Customer|
+-----------------+-------------------------+
|     Susan Pistek|        4974.512999999999|
|    Patrick Jones|       3986.0039999999995|
|Patrick O'Donnell|                 3778.197|
|    Ellis Ballard|                  3459.66|
|  Mike Gockenbach|       3144.4439999999995|
+-----------------+-------------------------+



In [41]:
# get all the rows belonging to those 5 customer names: hint, you may need the collect method - how many rows are they?

filter= list(df.groupBy('Customer Name').agg(F.sum('Profit').alias('Total Profit per Customer')).sort(F.col('Total Profit per Customer').desc()).limit(5).select('Customer Name'))

df.withColumn('Customer Name', F.col('Customer Name').isin(filter)).show()

+------+---------------+----------+----------+--------------+-----------+-------------+-----------+-------------+--------------------+--------------+-------+---------------+---------------+------------+--------------------+--------+--------+--------+-------+
|Row ID|       Order ID|Order Date| Ship Date|     Ship Mode|Customer ID|Customer Name|    Segment|         City|               State|       Country| Region|     Product ID|       Category|Sub-Category|        Product Name|   Sales|Quantity|Discount| Profit|
+------+---------------+----------+----------+--------------+-----------+-------------+-----------+-------------+--------------------+--------------+-------+---------------+---------------+------------+--------------------+--------+--------+--------+-------+
|     1|ES-2017-1311038|07/02/2017|11/02/2017|Standard Class|   AS-10045|         true|  Corporate|        Leeds|             England|United Kingdom|  North|OFF-ST-10000988|Office Supplies|     Storage|Fellowes Folders,...|

In [42]:
# create a new column which is the value of the sale were there not discount applied. Hint: orginal = sales/(1-d)

df=df.withColumn('Sales w/out Discount',F.round(F.col('Sales')/(1-F.col('Discount')),2))

df.show()

+------+---------------+----------+----------+--------------+-----------+----------------+-----------+-------------+--------------------+--------------+-------+---------------+---------------+------------+--------------------+--------+--------+--------+-------+--------------------+
|Row ID|       Order ID|Order Date| Ship Date|     Ship Mode|Customer ID|   Customer Name|    Segment|         City|               State|       Country| Region|     Product ID|       Category|Sub-Category|        Product Name|   Sales|Quantity|Discount| Profit|Sales w/out Discount|
+------+---------------+----------+----------+--------------+-----------+----------------+-----------+-------------+--------------------+--------------+-------+---------------+---------------+------------+--------------------+--------+--------+--------+-------+--------------------+
|     1|ES-2017-1311038|07/02/2017|11/02/2017|Standard Class|   AS-10045|  Aaron Smayling|  Corporate|        Leeds|             England|United Kingdom

In [43]:
# calculate the difference between sales and discount value

df=df.withColumn('Discount Amount', F.round(F.col('Sales w/out Discount')-F.col('Sales'),2))

In [44]:
# how much money did we not gain due to the discounts - per discount bracket?
df.groupBy('Discount').agg(F.round(F.sum('Discount Amount'),2).alias("Total Discount Loss")).show()

+--------+-------------------+
|Discount|Total Discount Loss|
+--------+-------------------+
|     0.0|                0.0|
|     0.2|           10653.11|
|     0.7|             8534.1|
|     0.1|           84712.86|
|    0.45|            2083.44|
|     0.6|           39644.07|
|     0.8|             635.67|
|    0.35|           29163.12|
|     0.5|          183735.71|
|     0.4|           46724.64|
|    0.85|            4515.44|
|    0.15|           45233.16|
|    0.65|           12219.66|
|     0.3|            2630.23|
+--------+-------------------+



In [51]:
# find the discount bracket which made us not gain the most (dynamically)
maxx=df.groupBy('Discount').agg(F.round(F.sum('Discount Amount'),2).alias("Total_Discount_Loss")).agg(F.max('Total_Discount_Loss')).collect()[0][0]
discount=df.groupBy('Discount').agg(F.round(F.sum('Discount Amount'),2).alias("Total_Discount_Loss")).where(F.col('Total_Discount_Loss') == maxx).collect()[0][0]

discount

0.5

In [53]:
# what would have been the total profit if we removed all orders from that discount group? 
profit_wo=df.where(F.col('Discount') != discount).select('Profit').agg(F.round(F.sum('Profit'),2).alias('Total Sales w/o max discount loss')).collect()[0][0]

profit_wo

469461.86

In [54]:
#how much more (or less) profit is that?
profit_w= df.select('Profit').agg(F.round(F.sum('Profit'),2)).collect()[0][0]

profit_wo-profit_w


96632.12

In [55]:
# create a temporary table for our superstore table in sql
df.createOrReplaceTempView('temp_table')

In [56]:
# use an SQL query to count the number of rows
spark.sql("SELECT COUNT('Row_ID') from temp_table").show()

+-------------+
|count(Row_ID)|
+-------------+
|        10000|
+-------------+



In [57]:
# Use an SQL query to calculate the profit ratio for each country: hint, ratio is sum(profit)/sum(sales)
spark.sql("""SELECT Country, SUM(Profit)/SUM(Sales) AS Profit_Ratiot FROM temp_table GROUP BY Country ORDER BY Country""").show()

+--------------+--------------------+
|       Country|       Profit_Ratiot|
+--------------+--------------------+
|       Austria|  0.2641908775042505|
|       Belgium| 0.23508766583987942|
|       Denmark| -0.4957190005664471|
|       Finland| 0.18864296633316185|
|        France| 0.12693568221933804|
|       Germany| 0.17066792076621765|
|       Ireland|-0.44426677493909256|
|         Italy| 0.06844355185424991|
|   Netherlands| -0.5298342790541865|
|        Norway|  0.2517747548521659|
|      Portugal| -0.5761662270806188|
|         Spain| 0.18941580658358978|
|        Sweden| -0.5745674280714466|
|   Switzerland|  0.2909201193350232|
|United Kingdom| 0.21170103540397134|
+--------------+--------------------+



In [58]:
# is the country with the largest profit ratio, the country with the largest profit?
df_temp=spark.sql("""SELECT Country, SUM(Profit) AS Total_Profit, SUM(Profit)/SUM(Sales) AS Profit_Ratio FROM temp_table 
        GROUP BY Country 
        ORDER BY Country
""")

max_profit=df_temp.agg(F.max('Total_Profit')).collect()[0][0]
max_ratio=df_temp.agg(F.max('Profit_Ratio')).collect()[0][0]

df_temp.select(['Country']).where(F.col('Total_Profit') == max_profit).collect()[0][0]\
        ==df_temp.select(['Country']).where(F.col('Profit_Ratio') == max_ratio).collect()[0][0]


False