In [1]:
import pandas as pd
import numpy as np
import findspark
findspark.init('C:\spark\spark-3.1.2-bin-hadoop3.2')
import pyspark
import pyspark.sql.functions as f
from pyspark.sql import SparkSession
from pyspark.context import SparkContext
from pyspark import SparkConf

from pyspark.sql.functions import *
sc = SparkContext.getOrCreate()
spark = SparkSession(sc)


# Three Tables

In [2]:
orders = spark.read.option("inferschema","true").option("header","true").csv("orders.csv")
customers = spark.read.option("inferschema","true").option("header","true").csv("customers.csv")
employees = spark.read.option("inferschema","true").option("header","true").csv("employees.csv")
orders.createTempView("orders") 
customers.createTempView("customers")
employees.createTempView("employees")

In [3]:
orders.show(3)
customers.show(3)
employees.show(3)

+--------+-----------+----------+-----------+----------+
|order_id|customer_id|    status|salesman_id|order_date|
+--------+-----------+----------+-----------+----------+
|       1|          8|in process|          2|26-04-2010|
|       2|          9|   pending|          3|26-04-2011|
|       3|          9|   pending|          2|26-04-2012|
+--------+-----------+----------+-----------+----------+
only showing top 3 rows

+-----------+----+-------+--------+------------+
|customer_id|name|address| website|credit_limit|
+-----------+----+-------+--------+------------+
|          1|Mike|    Usa|Mike.com|       77144|
|          2|John|    Usa|John.com|       39943|
|          3|Bray|    Usa|Bray.com|       64925|
+-----------+----+-------+--------+------------+
only showing top 3 rows

+-----------+----------+---------+-------------+-------+----------+----------+---------+
|employee_id|first_name|last_name|        email|  phone| hire_date|manager_id|job_title|
+-----------+----------+------

In [4]:
#joining 3 tables
orders.join(customers,orders.order_id==customers.customer_id).join(employees,customers.customer_id==employees.employee_id).show()

+--------+-----------+----------+-----------+----------+-----------+------+-------+----------+------------+-----------+----------+---------+-------------+-------+----------+----------+---------+
|order_id|customer_id|    status|salesman_id|order_date|customer_id|  name|address|   website|credit_limit|employee_id|first_name|last_name|        email|  phone| hire_date|manager_id|job_title|
+--------+-----------+----------+-----------+----------+-----------+------+-------+----------+------------+-----------+----------+---------+-------------+-------+----------+----------+---------+
|       1|          8|in process|          2|26-04-2010|          1|  Mike|    Usa|  Mike.com|       77144|          1|     James|    Smith|  Smith@gmail|4628843|22-08-2005|         1|Developer|
|       2|          9|   pending|          3|26-04-2011|          2|  John|    Usa|  John.com|       39943|          2|   Michael|     Rose|   Rose@gmail|  67853|22-08-2006|         2| Designer|
|       3|          9|   

In [8]:
orders.join(customers,orders.order_id==customers.customer_id,'left_outer').join(employees,employees.employee_id==customers.customer_id,'left_anti').explain()

== Physical Plan ==
*(3) BroadcastHashJoin [customer_id#42], [employee_id#68], LeftAnti, BuildRight, false
:- *(3) BroadcastHashJoin [order_id#16], [customer_id#42], LeftOuter, BuildRight, false
:  :- FileScan csv [order_id#16,customer_id#17,status#18,salesman_id#19,order_date#20] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex[file:/C:/Users/HI/big data/orders.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<order_id:int,customer_id:int,status:string,salesman_id:int,order_date:string>
:  +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)),false), [id=#360]
:     +- *(1) Filter isnotnull(customer_id#42)
:        +- FileScan csv [customer_id#42,name#43,address#44,website#45,credit_limit#46] Batched: false, DataFilters: [isnotnull(customer_id#42)], Format: CSV, Location: InMemoryFileIndex[file:/C:/Users/HI/big data/customers.csv], PartitionFilters: [], PushedFilters: [IsNotNull(customer_id)], ReadSchema: s

In [9]:
spark.sql('SELECT * FROM orders LEFT OUTER JOIN customers ON orders.order_id==customers.customer_id LEFT ANTI JOIN employees ON customers.customer_id==employees.employee_id').explain()

== Physical Plan ==
*(3) BroadcastHashJoin [customer_id#42], [employee_id#68], LeftAnti, BuildRight, false
:- *(3) BroadcastHashJoin [order_id#16], [customer_id#42], LeftOuter, BuildRight, false
:  :- FileScan csv [order_id#16,customer_id#17,status#18,salesman_id#19,order_date#20] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex[file:/C:/Users/HI/big data/orders.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<order_id:int,customer_id:int,status:string,salesman_id:int,order_date:string>
:  +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)),false), [id=#408]
:     +- *(1) Filter isnotnull(customer_id#42)
:        +- FileScan csv [customer_id#42,name#43,address#44,website#45,credit_limit#46] Batched: false, DataFilters: [isnotnull(customer_id#42)], Format: CSV, Location: InMemoryFileIndex[file:/C:/Users/HI/big data/customers.csv], PartitionFilters: [], PushedFilters: [IsNotNull(customer_id)], ReadSchema: s

In [3]:
#assignment 4
#Which employees could convince customers to order products?
#Not all employees are salseman
#find where employee id = salesman id

spark.sql('SELECT * FROM employees e LEFT SEMI JOIN orders o ON e.employee_id = o.salesman_id')
employees.join(orders, f.expr("employee_id = salesman_id"), "left_semi")

#wrong
#spark.sql('SELECT * FROM employees e LEFT OUTER JOIN orders o ON e.employee_id = o.salesman_id').show()
#spark.sql('SELECT * FROM employees e LEFT ANTI JOIN orders o ON e.employee_id = o.salesman_id').show()
#employees.join(orders, f.expr("employee_id = salesman_id"), "left_outer").show()
#employees.join(orders, f.expr("employee_id = salesman_id"), "left_anti").show()
#orders.join(employees, f.expr("employee_id = salesman_id"), "left_semi").show()
#orders.join(employees, f.expr("salesman_id = employee_id"), "left_anti").show()
#spark.sql('SELECT * FROM orders o LEFT SEMI JOIN employees e ON e.employee_id = o.salesman_id').show()
#spark.sql('SELECT * FROM orders o LEFT ANTI JOIN employees e ON o.salesman_id = e.employee_id').show()

DataFrame[employee_id: int, first_name: string, last_name: string, email: string, phone: int, hire_date: string, manager_id: int, job_title: string]

In [4]:
#which employees could convince most customers?
#only employess which are salesman should show

employees.join(orders, f.expr("employee_id = salesman_id"),"inner").groupBy("employee_id").agg(f.expr("count(*) as sales")).orderBy("sales",ascending=False)
spark.sql('SELECT e.employee_id, count(*) as sales FROM employees e INNER JOIN orders o ON e.employee_id = o.salesman_id GROUP BY e.employee_id ORDER BY sales DESC')

#wrong
#spark.sql('SELECT e.employee_id, count(*) as sales FROM employees e FULL JOIN orders o ON e.employee_id = o.salesman_id GROUP BY e.employee_id ORDER BY sales DESC')
#spark.sql('SELECT e.employee_id, count(*) as sales FROM employees e INNER JOIN orders o ON e.employee_id = o.salesman_id GROUP BY o.salesman_id ORDER BY sales DESC')
#spark.sql('SELECT e.employee_id, count(*) as sales FROM employees e INNER JOIN orders o ON e.employee_id = o.salesman_id GROUP BY e.employee_id ORDER BY sales DESC')
#spark.sql('SELECT o.salesman_id, count(*) as sales FROM orders o LEFT ANTI JOIN employees e ON o.salesman_id = e.employee_id GROUP BY o.salesman_id ORDER BY sales')
#spark.sql('SELECT e.employee_id, count(*) as sales FROM employees e SEMI JOIN orders o ON e.employee_id = o.salesman_id GROUP BY e.employee_id ORDER BY sales DESC')
#employees.join(orders, f.expr("employee_id = salesman_id"),"full").groupBy("employee_id").agg(f.expr("count(*) as sales")).orderBy("sales",ascending=False)
#employees.join(orders, f.expr("employee_id = salesman_id"),"left_semi").groupBy("employee_id").agg(f.expr("count(*) as sales")).orderBy("sales",ascending=False)
#orders.join(employees, f.expr("salesman_id = employee_id"),"left_anti").groupBy("salesman_id").agg(f.expr("count(*) as sales")).orderBy("sales",ascending=True).show()
#orders.join(employees, f.expr("salesman_id = employee_id"),"left_semi").groupBy("employee_id").agg(f.expr("count(*) as sales")).orderBy("sales",ascending=False)

DataFrame[employee_id: int, sales: bigint]

In [5]:
#assignment7
order_items = spark.read.option("inferschema","true").option("header","true").csv("order_items.csv")
orders2 = spark.read.option("inferschema","true").option("header","true").csv("orders2.csv")
order_items.createTempView("order_items") 
orders2.createTempView("orders2") 

#physical plan
spark.sql('SELECT year(order_date) as year, sum(quantity * unit_price) as revenue FROM orders2 NATURAL JOIN order_items GROUP BY year(order_date) ORDER BY year').explain()

#wrong
#spark.sql('SELECT order_date, sum(quantity * unit_price) as revenue FROM orders NATURAL JOIN order_items GROUP BY order_date ORDER BY order_date')
#spark.sql('SELECT year(order_date) as year, sum(quantity * unit_price) as revenue FROM orders NATURAL JOIN order_items GROUP BY year(order_date) ORDER BY year Desc')
#spark.sql('SELECT year(order_date) as year, sum(quantity) as revenue FROM orders NATURAL JOIN order_items GROUP BY year(order_date) ORDER BY year')
#spark.sql('SELECT year(order_date) as year, sum(quantity * unit_price) as revenue FROM orders Full JOIN order_items GROUP BY year(order_date) ORDER BY year')
#spark.sql('SELECT year(order_date) as year, sum(quantity + unit_price) as revenue FROM orders NATURAL JOIN order_items GROUP BY year(order_date) ORDER BY year')

== Physical Plan ==
*(4) Sort [year#208 ASC NULLS FIRST], true, 0
+- Exchange rangepartitioning(year#208 ASC NULLS FIRST, 200), ENSURE_REQUIREMENTS, [id=#174]
   +- *(3) HashAggregate(keys=[year(cast(order_date#200 as date))#213], functions=[sum(cast((quantity#201 * unit_price#175) as bigint))])
      +- Exchange hashpartitioning(year(cast(order_date#200 as date))#213, 200), ENSURE_REQUIREMENTS, [id=#170]
         +- *(2) HashAggregate(keys=[year(cast(order_date#200 as date)) AS year(cast(order_date#200 as date))#213], functions=[partial_sum(cast((quantity#201 * unit_price#175) as bigint))])
            +- *(2) Project [order_date#200, quantity#201, unit_price#175]
               +- *(2) BroadcastHashJoin [order_id#196], [order_id#172], Inner, BuildRight, false
                  :- *(2) Filter isnotnull(order_id#196)
                  :  +- FileScan csv [order_id#196,order_date#200,quantity#201] Batched: false, DataFilters: [isnotnull(order_id#196)], Format: CSV, Location: InMemoryFile

In [6]:
#rdd
#Use the RDD API to identify how many orders each customer is waiting for (this means, status = "Pending"). 
#The customers are identified only by customer id. 
#Sort the resulting RDD by the number of pending orders in descending order.

orders.rdd.filter(lambda row: row["status"] == "pending").map(lambda row: (row["customer_id"], 1)).reduceByKey(lambda x,y: x+y).sortBy(lambda x:x[1], ascending= False).collect()
orders.rdd.filter(lambda y: y["status"] == "pending").map(lambda y: (y["customer_id"], 1)).reduceByKey(lambda x,y: x + y).sortBy(lambda x:x[1], ascending= False).collect()

#orders.rdd.filter(lambda row: row["status"] == "pending").map(lambda row: (row["customer_id"], 1)).reduceByKey(lambda x,y: x * y).sortBy(lambda x:x[1], ascending= False)
#orders.filter(lambda x: x["status"] == "pending").map(lambda x: (x["customer_id"], 2)).reduceByKey(lambda x,y: x+y).sortBy(lambda x:x[1], ascending= False)
#orders.rdd.filter(lambda y: y["status == pending"]).map(lambda y: (y["customer_id"], 1)).reduceByKey(lambda x,y: x + y).sortBy(lambda x:x[1], ascending= False)
#orders.filter(lambda row: row["status"] == "pending").map(lambda row: (row["customer_id"], 1)).reduceByKey(lambda x,y: x + y).sortBy(lambda x:x[1], ascending= False)
#orders.rdd.filter(lambda y: y["status"] == "pending").map(lambda y: (y["customer_id"], 1)).reduceByKey(lambda x,y: x + y).sortBy(lambda x:x[2], ascending= False)


[(9, 2), (11, 1)]

In [7]:
employees.show()

+-----------+----------+---------+-------------+-------+----------+----------+---------+
|employee_id|first_name|last_name|        email|  phone| hire_date|manager_id|job_title|
+-----------+----------+---------+-------------+-------+----------+----------+---------+
|          1|     James|    Smith|  Smith@gmail|4628843|22-08-2005|         1|Developer|
|          2|   Michael|     Rose|   Rose@gmail|  67853|22-08-2006|         2| Designer|
|          3|    Robert|  William|William@gmail| 664829|22-08-2007|         3|   Editor|
|          4|     Maria|    Jones|  Jones@gmail|  67583|22-08-2008|         4|  Manager|
|          5|       Jen|   Browns| Browns@gmail|6278482|22-08-2009|         5|Team Lead|
+-----------+----------+---------+-------------+-------+----------+----------+---------+



In [8]:
orders.join(employees, f.expr("employee_id = manager_id"),"inner").explain()

== Physical Plan ==
BroadcastNestedLoopJoin BuildLeft, Inner
:- BroadcastExchange IdentityBroadcastMode, [id=#209]
:  +- FileScan csv [order_id#16,customer_id#17,status#18,salesman_id#19,order_date#20] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex[file:/C:/Users/HI/orders.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<order_id:int,customer_id:int,status:string,salesman_id:int,order_date:string>
+- *(1) Filter ((isnotnull(employee_id#68) AND isnotnull(manager_id#74)) AND (employee_id#68 = manager_id#74))
   +- FileScan csv [employee_id#68,first_name#69,last_name#70,email#71,phone#72,hire_date#73,manager_id#74,job_title#75] Batched: false, DataFilters: [isnotnull(employee_id#68), isnotnull(manager_id#74), (employee_id#68 = manager_id#74)], Format: CSV, Location: InMemoryFileIndex[file:/C:/Users/HI/employees.csv], PartitionFilters: [], PushedFilters: [IsNotNull(employee_id), IsNotNull(manager_id)], ReadSchema: struct<employee_id:int,first_nam

In [13]:
spark.sql('SELECT * FROM orders o INNER JOIN employees e ON e.employee_id = e.manager_id ORDER BY employee_id' ).explain()

== Physical Plan ==
*(2) Sort [employee_id#68 ASC NULLS FIRST], true, 0
+- Exchange rangepartitioning(employee_id#68 ASC NULLS FIRST, 200), ENSURE_REQUIREMENTS, [id=#335]
   +- BroadcastNestedLoopJoin BuildLeft, Inner
      :- BroadcastExchange IdentityBroadcastMode, [id=#326]
      :  +- FileScan csv [order_id#16,customer_id#17,status#18,salesman_id#19,order_date#20] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex[file:/C:/Users/HI/orders.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<order_id:int,customer_id:int,status:string,salesman_id:int,order_date:string>
      +- *(1) Filter ((isnotnull(employee_id#68) AND isnotnull(manager_id#74)) AND (employee_id#68 = manager_id#74))
         +- FileScan csv [employee_id#68,first_name#69,last_name#70,email#71,phone#72,hire_date#73,manager_id#74,job_title#75] Batched: false, DataFilters: [isnotnull(employee_id#68), isnotnull(manager_id#74), (employee_id#68 = manager_id#74)], Format: CSV, Location: I

In [10]:
orders.select(col('order_id'),col('salesman_id')).join(employees, f.expr("employee_id = salesman_id"),"left_outer").groupBy("employee_id").count()

DataFrame[employee_id: int, count: bigint]

In [11]:
spark.sql('SELECT year(order_date) as year, sum(quantity * unit_price) as revenue FROM orders2 NATURAL JOIN order_items GROUP BY year(order_date) ORDER BY year').explain()

== Physical Plan ==
*(4) Sort [year#337 ASC NULLS FIRST], true, 0
+- Exchange rangepartitioning(year#337 ASC NULLS FIRST, 200), ENSURE_REQUIREMENTS, [id=#301]
   +- *(3) HashAggregate(keys=[year(cast(order_date#200 as date))#342], functions=[sum(cast((quantity#201 * unit_price#175) as bigint))])
      +- Exchange hashpartitioning(year(cast(order_date#200 as date))#342, 200), ENSURE_REQUIREMENTS, [id=#297]
         +- *(2) HashAggregate(keys=[year(cast(order_date#200 as date)) AS year(cast(order_date#200 as date))#342], functions=[partial_sum(cast((quantity#201 * unit_price#175) as bigint))])
            +- *(2) Project [order_date#200, quantity#201, unit_price#175]
               +- *(2) BroadcastHashJoin [order_id#196], [order_id#172], Inner, BuildRight, false
                  :- *(2) Filter isnotnull(order_id#196)
                  :  +- FileScan csv [order_id#196,order_date#200,quantity#201] Batched: false, DataFilters: [isnotnull(order_id#196)], Format: CSV, Location: InMemoryFile

In [None]:
spark.sql('SELECT customer_id , title, first_name, last_name, email FROM Poll INNER JOIN Organizer ON organizer == organizer_id ORDER BY title ASC')