In [1]:
import pyspark
from pyspark.sql import SparkSession

In [2]:
spark = SparkSession.builder.master("local[*]").appName("Datamanipulation").getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


23/01/19 21:45:19 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
23/01/19 21:45:20 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
23/01/19 21:45:20 WARN Utils: Service 'SparkUI' could not bind on port 4041. Attempting port 4042.
23/01/19 21:45:20 WARN Utils: Service 'SparkUI' could not bind on port 4042. Attempting port 4043.
23/01/19 21:45:20 WARN Utils: Service 'SparkUI' could not bind on port 4043. Attempting port 4044.


In [3]:
spark

In [6]:
# read our data - lives in a csv file

df = spark.read.option("header","true").csv("/Users/Yasminbarree/Sample - EU Superstore.csv")

                                                                                

In [7]:
# how many rows of the EU Superstore dataset have the country being France
#2277
df.filter(df['Country'] == "France").count()

                                                                                

2827

In [8]:
# of those, how many are profitable?

df.filter((df['Country'] == "France") & (df['Profit'] > 0)).count()


2277

In [10]:
# how any different discount brackets exist? what are they? # the 14 different discount brackets

df.select("Discount").distinct().count()

df.select("Discount").distinct().show()


+--------+
|Discount|
+--------+
|     0.3|
|     0.7|
|       0|
|     0.2|
|    0.15|
|    0.35|
|     0.8|
|    0.45|
|     0.5|
|    0.65|
|     0.6|
|     0.1|
|    0.85|
|     0.4|
+--------+



In [12]:
# let's see the total profit by discount bracket, make sure they are ordered by 
from pyspark.sql.functions import col, sum, mean
df.groupBy("Discount").agg(sum("Profit")).orderBy("Discount").show()


+--------+-------------------+
|Discount|        sum(Profit)|
+--------+-------------------+
|       0| 383806.53000000026|
|     0.1|  126884.0309999999|
|    0.15| 24677.563499999975|
|     0.2| 2189.5499999999984|
|     0.3| -758.4209999999999|
|    0.35|          -9122.649|
|     0.4|-21346.427999999996|
|    0.45|         -1103.1915|
|     0.5|         -96632.115|
|     0.6|-20517.456000000002|
|    0.65| -6221.965499999999|
|     0.7|          -5496.765|
|     0.8|           -460.284|
|    0.85|          -3068.658|
+--------+-------------------+



In [13]:
# what is the value after which we should stop offering discount?
# the discount at 30% or higher shows loss

df.groupBy("Discount").agg(sum("Profit")).orderBy("Discount").show()


+--------+-------------------+
|Discount|        sum(Profit)|
+--------+-------------------+
|       0| 383806.53000000026|
|     0.1|  126884.0309999999|
|    0.15| 24677.563499999975|
|     0.2| 2189.5499999999984|
|     0.3| -758.4209999999999|
|    0.35|          -9122.649|
|     0.4|-21346.427999999996|
|    0.45|         -1103.1915|
|     0.5|         -96632.115|
|     0.6|-20517.456000000002|
|    0.65| -6221.965499999999|
|     0.7|          -5496.765|
|     0.8|           -460.284|
|    0.85|          -3068.658|
+--------+-------------------+



In [14]:
# who are the top 5 most profitable customers

df.groupBy("Customer ID").agg(sum("Profit")).orderBy(sum("Profit"), ascending = False).show(5)


+-----------+------------------+
|Customer ID|       sum(Profit)|
+-----------+------------------+
|   SP-20920| 4974.512999999999|
|   PJ-18835|3986.0039999999995|
|   PO-18865|          3778.197|
|   EB-13840|           3459.66|
|   MG-18145|3144.4439999999995|
+-----------+------------------+
only showing top 5 rows



In [15]:
# get all the rows belonging to those 5 customer names: hint, you may need the collect method - how many rows are they?

df.groupBy("Customer ID").agg(sum("Profit")).orderBy(sum("Profit"), ascending = False).show(5)

+-----------+------------------+
|Customer ID|       sum(Profit)|
+-----------+------------------+
|   SP-20920| 4974.512999999999|
|   PJ-18835|3986.0039999999995|
|   PO-18865|          3778.197|
|   EB-13840|           3459.66|
|   MG-18145|3144.4439999999995|
+-----------+------------------+
only showing top 5 rows



In [16]:
# create a new column which is the value of the sale were there not discount applied. Hint: orginal = sales/(1-d)

df = df.withColumn("Original Sale", df["Sales"]/(1- df["Discount"]))



In [17]:
# calculate the difference between sales and discount value
df = df.withColumn("Net sales", df['Sales'] - df['Discount'])

In [19]:
# how much money did we not gain due to the discounts - per discount bracket?

df = df.withColumn("Discounted Sale",  df['Sales'] * df['Discount'])
df = df.withColumn("Money not gained", df['Sales'] - df['Discounted Sale'])
df.groupBy("Discount").agg(sum("Money not gained")).show()

+--------+---------------------+
|Discount|sum(Money not gained)|
+--------+---------------------+
|     0.3|            4296.0603|
|     0.7|   1097.2395000000001|
|       0|   1522455.8100000003|
|     0.2|   34089.984000000004|
|    0.15|        217873.133475|
|    0.35|          35204.02665|
|     0.8|    31.78319999999999|
|    0.45|   1400.5356749999999|
|     0.5|    91867.13250000023|
|    0.65|          2302.935075|
|     0.6|   10571.745599999998|
|     0.1|    686170.8368999984|
|    0.85|   119.52630000000005|
|     0.4|    42052.21919999996|
+--------+---------------------+



In [None]:
# find the discount bracket which made us not gain the most (dynamically)
from pyspark.sql.functions import sum
df = df.withColumn("Discounted Sale",  df['Sales'] * df['Discount'])
df = df.withColumn("Money not gained", df['Sales'] - df['Discounted Sale'])
df.groupBy("Discount").agg(sum("Money not gained")).orderBy("sum(Money not gained)" , ascending = False).limit(1).show()

In [20]:
# what would have been the total profit if we removed all orders from that discount group? 

df.removed_value = df.filter(col("Discount") != "0")
df_removed_value = df.withColumn("Discounted Sale",  df['Sales'] * df['Discount'])
df_removed_value = df.withColumn("Money not gained", df['Sales'] - df['Discounted Sale'])
df_removed_value.agg(sum("Profit")).show()




+-----------------+
|      sum(Profit)|
+-----------------+
|372829.7415000005|
+-----------------+



In [21]:
#how much more (or less) profit is that?

# It means that removing that discount bracket will result is loss of profit estimated 10976 .
from pyspark.sql.functions import sum
df.removed_value = df.filter(col("Discount") != "0")


df.removed_value.agg(sum("Profit")).show()
df.agg(sum("Profit")).show()

+-------------------+
|        sum(Profit)|
+-------------------+
|-10976.788500000022|
+-------------------+

+-----------------+
|      sum(Profit)|
+-----------------+
|372829.7415000005|
+-----------------+



In [22]:
# create a temporary table for our superstore table in sql


df.createOrReplaceTempView("df")

In [24]:
# use an SQL query to count the number of rows
spark.sql("SELECT COUNT(*) FROM df").show()

+--------+
|count(1)|
+--------+
|   10000|
+--------+



In [None]:
# Use an SQL query to calculate the profit ratio for each country: hint, ratio is sum(profit)/sum(sales)

spark.sql("SELECT Country, sum(Profit)/sum(Sales) as ratio FROM df group by Country ").show()

In [25]:
# is the country with the largest profit ratio, the country with the largest profit?
# the result of the largest profit ratio is Switzerland and largest profit is Germany.

spark.sql("SELECT Country, sum(Profit)/sum(Sales) as ratio FROM df group by Country order by ratio desc ").show()
spark.sql("SELECT Country, Profit FROM df order by Profit desc ").show()


+--------------+--------------------+
|       Country|               ratio|
+--------------+--------------------+
|   Switzerland|  0.2909201193350232|
|       Austria|  0.2641908775042505|
|        Norway|  0.2517747548521659|
|       Belgium| 0.23508766583987942|
|United Kingdom| 0.21170103540397134|
|         Spain| 0.18941580658358978|
|       Finland| 0.18864296633316185|
|       Germany| 0.17066792076621765|
|        France| 0.12693568221933804|
|         Italy| 0.06844355185424991|
|       Ireland|-0.44426677493909256|
|       Denmark| -0.4957190005664471|
|   Netherlands| -0.5298342790541865|
|        Sweden| -0.5745674280714466|
|      Portugal| -0.5761662270806188|
+--------------+--------------------+

+--------------+------+
|       Country|Profit|
+--------------+------+
|       Germany|99.696|
|        France|99.672|
|        France|99.558|
|       Germany|99.558|
|        France| 99.48|
|         Spain|99.441|
|       Germany|99.372|
|       Germany| 99.36|
|        Fran