In [1]:
import pyspark
from pyspark.sql import SparkSession

In [2]:
spark = SparkSession.builder.master("local[*]").appName("Datamanipulation").getOrCreate()

In [3]:
spark

In [4]:
# read our data - lives in a csv file

db = spark.read.csv("Sample - EU Superstore.csv",header=True,inferSchema=True)
db.limit(5).show()

+------+---------------+----------+----------+--------------+-----------+--------------+---------+-----+-------+--------------+------+---------------+---------------+------------+--------------------+------+--------+--------+------+
|Row ID|       Order ID|Order Date| Ship Date|     Ship Mode|Customer ID| Customer Name|  Segment| City|  State|       Country|Region|     Product ID|       Category|Sub-Category|        Product Name| Sales|Quantity|Discount|Profit|
+------+---------------+----------+----------+--------------+-----------+--------------+---------+-----+-------+--------------+------+---------------+---------------+------------+--------------------+------+--------+--------+------+
|     1|ES-2017-1311038|07/02/2017|11/02/2017|Standard Class|   AS-10045|Aaron Smayling|Corporate|Leeds|England|United Kingdom| North|OFF-ST-10000988|Office Supplies|     Storage|Fellowes Folders,...|  79.2|       3|     0.0|  39.6|
|     2|ES-2017-1311038|07/02/2017|11/02/2017|Standard Class|   AS-1

In [5]:
# how many rows of the EU Superstore dataset have the country being France

db.filter(db["Country"]=="France").count()


2827

In [6]:
# of those, how many are profitable?
db.filter((db["Country"]=="France")&(db["Profit"]>0)).count()



2329

In [7]:
# how any different discount brackets exist? what are they?
db.select("Discount").distinct().count()


14

In [8]:
# let's see the totl profit by discount bracket, make sure they are ordered by 
db.select("Profit","Discount").groupBy("Discount").agg({"Profit":"sum"}).orderBy("Discount").show()


+--------+-------------------+
|Discount|        sum(Profit)|
+--------+-------------------+
|     0.0| 383806.53000000026|
|     0.1|  126884.0309999999|
|    0.15| 24677.563499999975|
|     0.2| 2189.5499999999984|
|     0.3| -758.4209999999999|
|    0.35|          -9122.649|
|     0.4|-21346.427999999996|
|    0.45|         -1103.1915|
|     0.5|         -96632.115|
|     0.6|-20517.456000000002|
|    0.65| -6221.965499999999|
|     0.7|          -5496.765|
|     0.8|           -460.284|
|    0.85|          -3068.658|
+--------+-------------------+



In [9]:
# what is the value after which we should stop offering discount?
db1=db.select("Profit","Discount").groupBy("Discount").agg({"Profit":"sum"}).orderBy("Discount")
db1.filter(db1["sum(Profit)"]>0).collect()[3][0]



0.2

In [10]:
# who are the top 5 most profitable customers
# db.show()
db.select("Customer Name","Customer ID","Profit").groupBy("Customer Name").agg({"Profit":"sum"}).orderBy("sum(Profit)",ascending=False).limit(5).show()



+-----------------+------------------+
|    Customer Name|       sum(Profit)|
+-----------------+------------------+
|     Susan Pistek| 4974.512999999999|
|    Patrick Jones|3986.0039999999995|
|Patrick O'Donnell|          3778.197|
|    Ellis Ballard|           3459.66|
|  Mike Gockenbach|3144.4439999999995|
+-----------------+------------------+



In [11]:
# get all the rows belonging to those 5 customer names: hint, you may need the collect method - how many rows are they?
l=db.select("Customer Name","Customer ID","Profit").groupBy("Customer Name").agg({"Profit":"sum"}).orderBy("sum(Profit)",ascending=False).limit(5).collect()
l1=[]
for x in l:
    l1.append(x[0])
print(l1)
db.filter(db["Customer Name"].isin(l1)).count()



['Susan Pistek', 'Patrick Jones', "Patrick O'Donnell", 'Ellis Ballard', 'Mike Gockenbach']


76

In [12]:
# create a new column which is the value of the sale were there not discount applied. Hint: orginal = sales/(1-d)
db=db.withColumn("salenodiscount", db["Sales"]/(1-db["Discount"]))
db.limit(5).show()

+------+---------------+----------+----------+--------------+-----------+--------------+---------+-----+-------+--------------+------+---------------+---------------+------------+--------------------+------+--------+--------+------+--------------+
|Row ID|       Order ID|Order Date| Ship Date|     Ship Mode|Customer ID| Customer Name|  Segment| City|  State|       Country|Region|     Product ID|       Category|Sub-Category|        Product Name| Sales|Quantity|Discount|Profit|salenodiscount|
+------+---------------+----------+----------+--------------+-----------+--------------+---------+-----+-------+--------------+------+---------------+---------------+------------+--------------------+------+--------+--------+------+--------------+
|     1|ES-2017-1311038|07/02/2017|11/02/2017|Standard Class|   AS-10045|Aaron Smayling|Corporate|Leeds|England|United Kingdom| North|OFF-ST-10000988|Office Supplies|     Storage|Fellowes Folders,...|  79.2|       3|     0.0|  39.6|          79.2|
|     2|

In [13]:
# calculate the difference between sales and discount value
db=db.withColumn("diff",db["salenodiscount"]-db["Sales"])
db.show()


+------+---------------+----------+----------+--------------+-----------+----------------+-----------+-------------+--------------------+--------------+-------+---------------+---------------+------------+--------------------+--------+--------+--------+-------+------------------+------------------+
|Row ID|       Order ID|Order Date| Ship Date|     Ship Mode|Customer ID|   Customer Name|    Segment|         City|               State|       Country| Region|     Product ID|       Category|Sub-Category|        Product Name|   Sales|Quantity|Discount| Profit|    salenodiscount|              diff|
+------+---------------+----------+----------+--------------+-----------+----------------+-----------+-------------+--------------------+--------------+-------+---------------+---------------+------------+--------------------+--------+--------+--------+-------+------------------+------------------+
|     1|ES-2017-1311038|07/02/2017|11/02/2017|Standard Class|   AS-10045|  Aaron Smayling|  Corporat

In [14]:
# how much money did we not gain due to the discounts - per discount bracket?
db.select("Discount","diff").groupBy("Discount").agg({"diff":"sum"}).orderBy("Discount").show()


+--------+------------------+
|Discount|         sum(diff)|
+--------+------------------+
|     0.0|               0.0|
|     0.1| 84712.44899999995|
|    0.15| 45233.17650000002|
|     0.2|10653.119999999997|
|     0.3|2630.2410000000004|
|    0.35|29163.099000000002|
|     0.4| 46724.68800000002|
|    0.45|2083.4414999999995|
|     0.5|183734.26500000045|
|     0.6| 39644.04599999997|
|    0.65|12219.655499999999|
|     0.7|          8534.085|
|     0.8| 635.6640000000002|
|    0.85|          4515.438|
+--------+------------------+



In [15]:
# find the discount bracket which made us not gain the most (dynamically)
max=db.select("Discount","diff").groupBy("Discount").agg({"diff":"sum"}).orderBy("sum(diff)",ascending=False).collect()[0][0]
max



0.5

In [25]:
# what would have been the total profit if we removed all orders from that discount group? 
from pyspark.sql import functions
db1=db.filter(db["Discount"]!=0.5)
db1.select(functions.sum("Profit")).show()
db.select(functions.sum("Profit")).show()



+-----------------+
|      sum(Profit)|
+-----------------+
|469461.8565000003|
+-----------------+

+-----------------+
|      sum(Profit)|
+-----------------+
|372829.7415000005|
+-----------------+



In [31]:
#how much more (or less) profit is that?
diff=db1.select(functions.sum("Profit")).collect()[0][0]-db.select(functions.sum("Profit")).collect()[0][0]
diff



96632.11499999976

In [32]:
# create a temporary table for our superstore table in sql
db.createOrReplaceTempView("table")


In [33]:
# use an SQL query to count the number of rows
spark.sql("select count(*) from table").show()


+--------+
|count(1)|
+--------+
|   10000|
+--------+



In [42]:
# Use an SQL query to calculate the profit ratio for each country: hint, ratio is sum(profit)/sum(sales)

spark.sql("select Country, sum(Profit)/sum(Sales) as profit_ratio from table group by Country").orderBy("profit_ratio",ascending=False).show()



+--------------+--------------------+
|       Country|        profit_ratio|
+--------------+--------------------+
|   Switzerland|  0.2909201193350232|
|       Austria|  0.2641908775042505|
|        Norway|  0.2517747548521659|
|       Belgium| 0.23508766583987942|
|United Kingdom| 0.21170103540397134|
|         Spain| 0.18941580658358978|
|       Finland| 0.18864296633316185|
|       Germany| 0.17066792076621765|
|        France| 0.12693568221933804|
|         Italy| 0.06844355185424991|
|       Ireland|-0.44426677493909256|
|       Denmark| -0.4957190005664471|
|   Netherlands| -0.5298342790541865|
|        Sweden| -0.5745674280714466|
|      Portugal| -0.5761662270806188|
+--------------+--------------------+



In [44]:
# is the country with the largest profit ratio, the country with the largest profit? answer: no.

db.select("Country","Profit").groupBy("Country").agg({"Profit":"sum"}).orderBy("sum(Profit)",ascending=False).show()



+--------------+-------------------+
|       Country|        sum(Profit)|
+--------------+-------------------+
|United Kingdom| 111900.15000000001|
|        France| 109029.00299999975|
|       Germany| 107322.82049999991|
|         Spain|  54390.11999999999|
|       Austria|           21442.26|
|         Italy| 19828.757999999965|
|       Belgium|           11572.59|
|   Switzerland|  7237.470000000001|
|        Norway|            5167.77|
|       Finland|            3905.73|
|       Denmark|-4282.0470000000005|
|       Ireland| -7392.381000000003|
|      Portugal| -8703.059999999998|
|        Sweden|-17519.366999999987|
|   Netherlands| -41070.07499999996|
+--------------+-------------------+

