In [None]:
import pyspark
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
import pandas as pd
from pyspark.sql.functions import col,max,count,sum,mean,stddev_pop,hour,countDistinct,expr,stddev,window,column
from pyspark.ml.feature import RFormula
from pyspark.ml.classification import LogisticRegression
from pyspark.sql import Row
import math

In [2]:
# Spark start-up: build the configuration, then obtain the context and session.
# Everything below uses getOrCreate, so re-running this cell reuses the live
# context instead of failing; no manual sc.stop() is needed beforehand.

# Application name and master URL ('local' = run Spark locally).
conf = SparkConf().setAppName('myapp').setMaster('local')

# getOrCreate is called on the class, not on an instance: SparkContext() would
# first start a context with default settings, and getOrCreate would then hand
# back that already-running context and ignore `conf`.
sc = SparkContext.getOrCreate(conf=conf)
sc.setLogLevel("OFF")  # turn off Spark's log output in the notebook

# The builder reuses the active SparkContext created above.
spark = SparkSession.builder.getOrCreate()

### How many instances of each product were sold in each country?

In [31]:
# Total quantity of product 22728 sold in France, as a pivot with one column
# per StockCode. The filter is given as a SQL string; the column-expression
# form of the same condition is
#   (col("StockCode") == 22728) & (col("Country") == "France")
(
    dataframe
    .filter("StockCode = 22728 AND Country = 'France'")
    .groupBy("Country")
    .pivot("StockCode")
    .sum("Quantity")
    .toPandas()
)

Unnamed: 0,Country,22728
0,France,24


In [32]:
# One row per StockCode, one column per Country; each cell holds the summed
# Quantity (empty where that product had no sales in that country).
(
    dataframe
    .groupBy("StockCode")
    .pivot("Country")
    .sum("Quantity")
    .toPandas()
)

Unnamed: 0,StockCode,Australia,EIRE,France,Germany,Netherlands,Norway,United Kingdom
0,22728,,,24.0,,,,13.0
1,21259,,,,,,,8.0
2,21889,,24.0,,,,,28.0
3,22121,,,,,,,14.0
4,90022,,,,,,,1.0
...,...,...,...,...,...,...,...,...
1346,84906,,,,,,,1.0
1347,22492,,36.0,36.0,,,,36.0
1348,17164B,,,,,,,2.0
1349,22311,,,,,,,8.0


### Which employees could convince customers to order products?

In [33]:
# Read the three CSV inputs (first row is the header, column types inferred)
# and register each one as a temp view so it can be queried with spark.sql.
# createOrReplaceTempView is used instead of createTempView because the latter
# raises when the view name already exists, which breaks re-running this cell.
employees = spark.read.csv('employees.csv', header=True, inferSchema=True)
employees.createOrReplaceTempView('employees')

customers = spark.read.csv('customers.csv', header=True, inferSchema=True)
customers.createOrReplaceTempView('customers')

orders = spark.read.csv('orders.csv', header=True, inferSchema=True)
orders.createOrReplaceTempView('orders')

In [34]:
# Employees that appear as salesman on at least one order. A LEFT SEMI JOIN
# keeps only the employee columns and returns each matching employee once.
# toPandas() is used purely for nicer notebook rendering; use show() where a
# plain Spark answer is expected (e.g. in exams).
# DataFrame-API form of the same query:
#   employees.join(orders, expr("employee_id = salesman_id"), "left_semi")
(
    spark
    .sql("SELECT * FROM employees e LEFT SEMI JOIN orders o ON e.employee_id = o.salesman_id")
    .toPandas()
)

Unnamed: 0,employee_id,Name,email,phone,hire_date,manager_id,job_title
0,70,Samson Ortiz,consectetuer.cursus@utodio.net,0845 46 41,OctOct-1616-21212121,57,Lavinia Melendez
1,71,Stephen Dejesus,quis@magnaLoremipsum.com,(01269) 774419,JunJun-2727-22222222,55,Stacy Reilly
2,72,Alvin Weiss,sociis.natoque@natoquepenatibuset.co.uk,0381 994 1063,SepSep-1313-20202020,50,Inga Snow
3,73,Benjamin Reyes,Nullam.scelerisque@turpisvitae.net,076 8045 4732,FebFeb-1818-21212121,52,Libby Weeks
4,74,Amal Gross,aliquam.eu@semconsequat.net,055 1465 7860,NovNov-0707-21212121,56,Marah Dean
5,75,Kato Hoover,sed.sem.egestas@blanditat.com,(01740) 44119,JulJul-0707-22222222,59,Macey Slater
6,76,Yuli Warren,nisi.nibh.lacinia@malesuada.ca,(010256) 78732,FebFeb-1515-21212121,56,Hanna Pugh
7,77,Wade Nichols,urna.justo.faucibus@sapiencursusin.net,(023) 5115 1644,AugAug-2222-20202020,56,Gwendolyn Ramirez
8,78,Drake Morales,quam.elementum@ProinultricesDuis.org,07495 111025,JulJul-2828-20202020,55,Jeanette Phillips
9,79,Silas Barber,vitae@bibendumullamcorperDuis.com,055 5328 8347,AprApr-2222-22222222,52,Yolanda Ramirez


### Which employees could convince the most customers to order products?

In [35]:
# Count orders per salesman and rank employees by that count, highest first.
# Two equivalent ways of writing the aggregate are shown. Both are built
# lazily and only one is materialised with toPandas(), so Spark does not run
# a second job whose result would be discarded.
orders_by_employee = employees.join(
    orders, expr("employee_id = salesman_id"), "inner"
).groupBy("employee_id")

# Variant 1: the aggregate written as a SQL snippet via expr().
sales_via_expr = orders_by_employee.agg(expr("count(*) AS sales"))

# Variant 2: the same aggregate written with DataFrame functions.
sales_via_functions = orders_by_employee.agg(count('*').alias('sales'))

# Replace sales_via_functions with sales_via_expr to run the other variant.
sales_via_functions.orderBy("sales", ascending=False).toPandas()

Unnamed: 0,employee_id,sales
0,83,9
1,84,8
2,76,7
3,79,7
4,72,6
5,87,6
6,82,6
7,80,6
8,89,6
9,86,5


In [36]:
# Spark SQL version of the sales-per-employee ranking: join employees to the
# orders they sold, count rows per employee, and sort by that count descending.
(
    spark
    .sql(
        "SELECT e.employee_id, count(*) as sales "
        "FROM employees e "
        "INNER JOIN orders o ON e.employee_id = o.salesman_id "
        "GROUP BY e.employee_id "
        "ORDER BY sales DESC"
    )
    .toPandas()
)

Unnamed: 0,employee_id,sales
0,83,9
1,84,8
2,76,7
3,79,7
4,72,6
5,87,6
6,82,6
7,80,6
8,89,6
9,86,5
