### Name : < Kwak Su Bin >

# Data Access using SparkSQL and Dataframe

## Activity : Join Queries

In this module, you will practice writing code to retrieve data using Spark SQL and the DataFrame API.

The complete list of DataFrame functions is available in the [Java API docs (1.6.1)](https://spark.apache.org/docs/1.6.1/api/java/org/apache/spark/sql/DataFrame.html), the [PySpark API docs (2.1.0)](http://spark.apache.org/docs/2.1.0/api/python/pyspark.sql.html#pyspark.sql.DataFrame.join) and the [Scala functions docs (2.2.0)](https://spark.apache.org/docs/2.2.0/api/scala/index.html#org.apache.spark.sql.functions$).


In this activity, we will use the HR schema shown below.
![hr](HR.gif)

### INITIALIZATION
The first section of this script is the initialization section.
Here we prepare the Spark environment to recognize and process SQL statements.

In [2]:
from pyspark import SparkContext, SparkConf # Spark
from pyspark.sql import SparkSession # Spark SQL
from pyspark.sql.types import *

# Additional: column functions used in the queries below (concat, col, lit, count)
from pyspark.sql.functions import *

# `master` selects where Spark runs. `local[*]` runs Spark locally with as many
# worker threads as there are logical cores on your machine; `local[k]` runs it
# with exactly k worker threads.
# `appName` is the name shown on the Spark cluster UI.
conf = SparkConf().setMaster("local[*]").setAppName("Week 2 - Join Query")

# Reuse the existing Spark context if there is one; otherwise create a new
# context from `conf`.
sc = SparkContext.getOrCreate(conf=conf)
spark = SparkSession(sparkContext=sc)


### DATA STRUCTURE DEFINITION

In this section, we define one schema per table so that the column names and types match the data files provided as data sources.

In [3]:
#COUNTRIES TABLE
scCountries = StructType([StructField("country_id",StringType()),StructField("country_name",StringType()),StructField("region_id",IntegerType())])

#DEPARTMENTS TABLE
scDepartments = StructType([StructField("department_id",IntegerType()),
StructField("department_name",StringType()),
StructField("manager_id",IntegerType()),
StructField("location_id",IntegerType())
])

#EMPLOYEES TABLE
scEmployees = StructType([
StructField("employee_id",IntegerType()),
StructField("first_name",StringType()),
StructField("last_name",StringType()),
StructField("email",StringType()),
StructField("phone_number",StringType()),
StructField("hire_date",StringType()),
StructField("job_id",StringType()),
StructField("salary",IntegerType()),
StructField("commission_pct",FloatType()),
StructField("manager_id",IntegerType()),
StructField("department_id",IntegerType())
])

#JOBS TABLE
scJobs = StructType([
StructField("job_id",StringType()),
StructField("job_title",StringType()),
StructField("min_salary",IntegerType()),
StructField("max_salary",IntegerType())
])

#JOB_HISTORY TABLE
scJob_history = StructType([
StructField("employee_id",IntegerType()),
StructField("start_date",StringType()),
StructField("end_date",StringType()),
StructField("job_id",StringType()),
StructField("department_id",IntegerType())
])

#LOCATIONS TABLE
scLocations = StructType([
StructField("location_id",IntegerType()),
StructField("street_address",StringType()),
StructField("postal_code",StringType()),
StructField("city",StringType()),
StructField("state_province",StringType()),
StructField("country_id",StringType())
])

#REGIONS TABLE
scRegions = StructType([
StructField("region_id",IntegerType()),
StructField("region_name",StringType())
])

### DATA LOADING

In [4]:
#COUNTRIES DATA
dataCountries = sc.textFile('COUNTRIES.csv')
dataCountries = dataCountries.map(lambda x: x.split(','))
dataCountries = dataCountries.map(lambda x: [x[0],x[1], int(x[2])])

#DEPARTMENTS DATA
dataDepartments = sc.textFile('DEPARTMENTS.csv')
dataDepartments = dataDepartments.map(lambda x: x.split(','))
dataDepartments = dataDepartments.map(lambda x: [int(x[0]),x[1], int(x[2]), int(x[3])])

#EMPLOYEES DATA
dataEmployees = sc.textFile('EMPLOYEES.csv')
dataEmployees = dataEmployees.map(lambda x: x.split(','))
dataEmployees = dataEmployees.map(lambda x: [int(x[0]),x[1], x[2], \
                                             x[3],x[4], x[5], x[6], \
                                             int(x[7]),float(x[8]), int(x[9]), int(x[10])\
                                            ])

#JOBS_DATA
dataJobs = sc.textFile('JOBS.csv')
dataJobs = dataJobs.map(lambda x: x.split(','))
dataJobs = dataJobs.map(lambda x: [x[0],x[1], \
                                   int(x[2]),int(x[3])\
                                   ])

#JOB_HISTORY_DATA
dataJob_history = sc.textFile('JOB_HISTORY.csv')
dataJob_history = dataJob_history.map(lambda x: x.split(','))
dataJob_history = dataJob_history.map(lambda x: [int(x[0]),x[1], \
                                   x[2],x[3],int(x[4])\
                                   ])

#LOCATION_DATA
dataLocations = sc.textFile('LOCATIONS.csv')
dataLocations = dataLocations.map(lambda x: x.split(','))
dataLocations = dataLocations.map(lambda x: [int(x[0]),x[1], \
                                   x[2],x[3],x[4],x[5]\
                                   ])
#REGIONS DATA
dataRegions = sc.textFile('REGIONS.csv')
dataRegions = dataRegions.map(lambda x: x.split(','))
dataRegions = dataRegions.map(lambda x: [int(x[0]),x[1] ])
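The loading code above splits each line on every comma and casts fields by position. As a minimal plain-Python sketch of the same idea, the hypothetical helper `parse_row` below uses the standard `csv` module instead, which keeps quoted fields that contain commas intact, and maps empty fields to `None` rather than failing on `int('')`. The sample lines are made up for illustration and are not taken from the lab's CSV files.

```python
import csv

def parse_row(line, casts):
    """Split one CSV line and cast each field; empty fields become None."""
    fields = next(csv.reader([line]))
    return [cast(value) if value != "" else None
            for cast, value in zip(casts, fields)]

# One cast per column, mirroring scRegions and scLocations.
region_casts = [int, str]
location_casts = [int, str, str, str, str, str]

# Made-up sample lines (assumption: not real rows from the data files).
region_row = parse_row("1,Europe", region_casts)
location_row = parse_row('1000,"12 Example St, Unit 4",00989,Roma,,IT',
                         location_casts)

print(region_row)
print(location_row)
```

Whether this is needed depends on the data files: if no field is quoted or empty, the simpler `split(',')` approach used in this notebook behaves the same.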


### PREPARING DATAFRAMES

In [5]:
dfCountries = spark.createDataFrame(dataCountries,schema=scCountries) 
dfCountries.createOrReplaceTempView("dataCountries")

dfDepartments = spark.createDataFrame(dataDepartments,schema=scDepartments) 
dfDepartments.createOrReplaceTempView("dataDepartments")

dfEmployees = spark.createDataFrame(dataEmployees,schema=scEmployees) 
dfEmployees.createOrReplaceTempView("dataEmployees")

dfJobs = spark.createDataFrame(dataJobs,schema=scJobs) 
dfJobs.createOrReplaceTempView("dataJobs")

dfJob_history = spark.createDataFrame(dataJob_history,schema=scJob_history) 
dfJob_history.createOrReplaceTempView("dataJob_history")

dfLocations = spark.createDataFrame(dataLocations,schema=scLocations) 
dfLocations.createOrReplaceTempView("dataLocations")

dfRegions = spark.createDataFrame(dataRegions,schema=scRegions) 
dfRegions.createOrReplaceTempView("dataRegions")


### Inner Join & Outer Join

#### Question 1
Display each employee's full name together with their department name.

Ensure your output has the same format as the expected output below.

![picture](lab3_q1.png)

In [5]:
#spark.sql()
spark.sql("select dataEmployees.first_name || ' ' || dataEmployees.last_name AS FullName, dataDepartments.department_name from dataEmployees join dataDepartments on dataEmployees.department_id = dataDepartments.department_id").show()

+-----------------+---------------+
|         FullName|department_name|
+-----------------+---------------+
|Michael Hartstein|      Marketing|
|          Pat Fay|      Marketing|
|     Susan Mavris|Human Resources|
|  Nancy Greenberg|        Finance|
|    Daniel Faviet|        Finance|
|        John Chen|        Finance|
|   Ismael Sciarra|        Finance|
|Jose Manuel Urman|        Finance|
|        Luis Popp|        Finance|
|  Jennifer Whalen| Administration|
|    Matthew Weiss|       Shipping|
|       Adam Fripp|       Shipping|
|   Payam Kaufling|       Shipping|
|   Shanta Vollman|       Shipping|
|    Kevin Mourgos|       Shipping|
|      Julia Nayer|       Shipping|
|Irene Mikkilineni|       Shipping|
|     James Landry|       Shipping|
|    Steven Markle|       Shipping|
|     Laura Bissot|       Shipping|
+-----------------+---------------+
only showing top 20 rows



In [6]:
#DataFrame functions
dfEmployees.join(dfDepartments, dfEmployees.department_id == dfDepartments.department_id).select(concat(col("first_name"), lit(" "), col("last_name")).alias('FullName'), "department_name").show()

+-----------------+---------------+
|         FullName|department_name|
+-----------------+---------------+
|Michael Hartstein|      Marketing|
|          Pat Fay|      Marketing|
|     Susan Mavris|Human Resources|
|  Nancy Greenberg|        Finance|
|    Daniel Faviet|        Finance|
|        John Chen|        Finance|
|   Ismael Sciarra|        Finance|
|Jose Manuel Urman|        Finance|
|        Luis Popp|        Finance|
|  Jennifer Whalen| Administration|
|    Matthew Weiss|       Shipping|
|       Adam Fripp|       Shipping|
|   Payam Kaufling|       Shipping|
|   Shanta Vollman|       Shipping|
|    Kevin Mourgos|       Shipping|
|      Julia Nayer|       Shipping|
|Irene Mikkilineni|       Shipping|
|     James Landry|       Shipping|
|    Steven Markle|       Shipping|
|     Laura Bissot|       Shipping|
+-----------------+---------------+
only showing top 20 rows
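To make the inner-join semantics concrete, here is a small plain-Python sketch (no Spark required) of what both queries compute: each employee is paired with the department whose `department_id` matches, and employees without a matching department are dropped. The rows and the helper name `inner_join` are made up for illustration.

```python
# Made-up sample rows: (first_name, last_name, department_id)
employees = [
    ("Ada", "Lopez", 20),
    ("Ben", "Okafor", 10),
    ("Cy", "Tran", None),   # no department assigned
]

# Made-up lookup table: department_id -> department_name
departments = {10: "Administration", 20: "Marketing", 30: "Purchasing"}

def inner_join(employees, departments):
    """Return (FullName, department_name) for employees with a matching department."""
    result = []
    for first, last, dept_id in employees:
        if dept_id in departments:  # the join condition
            result.append((first + " " + last, departments[dept_id]))
    return result

rows = inner_join(employees, departments)
print(rows)
```

The employee with no department does not appear in the result, which is the behaviour that distinguishes an inner join from the outer joins used later.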



#### Question 2
Using a self-join, display each employee's full name, job ID and manager's name.

![figure](lab3_q2.png)

In [7]:
#spark.sql
spark.sql("select e.first_name || ' ' || e.last_name as FullName, e.job_id, m.first_name || ' ' || m.last_name as ManagerName from dataEmployees e join dataEmployees m on m.employee_id = e.manager_id").show()

+-----------------+----------+----------------+
|         FullName|    job_id|     ManagerName|
+-----------------+----------+----------------+
|        Lisa Ozer|    SA_REP|Gerald Cambrault|
|   Harrison Bloom|    SA_REP|Gerald Cambrault|
|       Tayler Fox|    SA_REP|Gerald Cambrault|
|    William Smith|    SA_REP|Gerald Cambrault|
|  Elizabeth Bates|    SA_REP|Gerald Cambrault|
|    Sundita Kumar|    SA_REP|Gerald Cambrault|
|    Daniel Faviet|FI_ACCOUNT| Nancy Greenberg|
|        John Chen|FI_ACCOUNT| Nancy Greenberg|
|   Ismael Sciarra|FI_ACCOUNT| Nancy Greenberg|
|Jose Manuel Urman|FI_ACCOUNT| Nancy Greenberg|
|        Luis Popp|FI_ACCOUNT| Nancy Greenberg|
|  Nancy Greenberg|    FI_MGR|   Neena Kochhar|
|  Jennifer Whalen|   AD_ASST|   Neena Kochhar|
|     Susan Mavris|    HR_REP|   Neena Kochhar|
|     Hermann Baer|    PR_REP|   Neena Kochhar|
|  Shelley Higgins|    AC_MGR|   Neena Kochhar|
|      Bruce Ernst|   IT_PROG|Alexander Hunold|
|     David Austin|   IT_PROG|Alexander 

In [8]:
#dataframe function
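e = dfEmployees.alias('e')  # employee side of the self-join
m = dfEmployees.alias('m')  # manager side of the self-join
# Pass the join condition to join() directly instead of filtering an unconditioned join.
(e.join(m, col('e.manager_id') == col('m.employee_id'))
  .select(concat(col('e.first_name'), lit(' '), col('e.last_name')).alias('FullName'),
          'e.job_id',
          concat(col('m.first_name'), lit(' '), col('m.last_name')).alias('ManagerName'))
  .show())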

+-----------------+----------+----------------+
|         FullName|    job_id|     ManagerName|
+-----------------+----------+----------------+
|        Lisa Ozer|    SA_REP|Gerald Cambrault|
|   Harrison Bloom|    SA_REP|Gerald Cambrault|
|       Tayler Fox|    SA_REP|Gerald Cambrault|
|    William Smith|    SA_REP|Gerald Cambrault|
|  Elizabeth Bates|    SA_REP|Gerald Cambrault|
|    Sundita Kumar|    SA_REP|Gerald Cambrault|
|    Daniel Faviet|FI_ACCOUNT| Nancy Greenberg|
|        John Chen|FI_ACCOUNT| Nancy Greenberg|
|   Ismael Sciarra|FI_ACCOUNT| Nancy Greenberg|
|Jose Manuel Urman|FI_ACCOUNT| Nancy Greenberg|
|        Luis Popp|FI_ACCOUNT| Nancy Greenberg|
|  Nancy Greenberg|    FI_MGR|   Neena Kochhar|
|  Jennifer Whalen|   AD_ASST|   Neena Kochhar|
|     Susan Mavris|    HR_REP|   Neena Kochhar|
|     Hermann Baer|    PR_REP|   Neena Kochhar|
|  Shelley Higgins|    AC_MGR|   Neena Kochhar|
|      Bruce Ernst|   IT_PROG|Alexander Hunold|
|     David Austin|   IT_PROG|Alexander 
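A self-join reads the same table twice under two roles. The plain-Python sketch below illustrates the idea with a made-up `staff` dictionary: each row's `manager_id` is looked up in the same table by `employee_id`. The helper name `with_manager` and all sample values are assumptions for illustration only.

```python
# Made-up rows: employee_id -> (full_name, job_id, manager_id)
staff = {
    100: ("Pat King", "AD_PRES", None),  # top of the hierarchy, no manager
    101: ("Lee Cho", "AD_VP", 100),
    102: ("Sam Diaz", "IT_PROG", 101),
}

def with_manager(staff):
    """Pair every employee with their manager's name (inner self-join)."""
    rows = []
    for name, job_id, manager_id in staff.values():
        manager = staff.get(manager_id)  # look up the same table again
        if manager is not None:
            rows.append((name, job_id, manager[0]))
    return rows

manager_rows = with_manager(staff)
print(manager_rows)
```

As with any inner join, the employee without a manager has no match and is left out of the result.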

#### Question 3
Display all departments that have no employees.

![figure](lab3_q3.png)

In [9]:
#spark.sql
spark.sql("select department_name from dataEmployees e FULL OUTER JOIN dataDepartments d on (e.department_id=d.department_id) group by department_name having count(employee_id)=0").show()

+--------------------+
|     department_name|
+--------------------+
|       Corporate Tax|
|    Government Sales|
|             Payroll|
|          Recruiting|
|        Construction|
|                 NOC|
|            Treasury|
|Shareholder Services|
|        Retail Sales|
|         Contracting|
|          IT Support|
|  Control And Credit|
|         IT Helpdesk|
|       Manufacturing|
|          Operations|
|            Benefits|
+--------------------+



In [10]:
#dataframe function
dfEmployees.join(dfDepartments,dfEmployees.department_id==dfDepartments.department_id,'full_outer').groupBy('department_name').agg(count("employee_id").alias("merge")).select("department_name").where('merge = 0').show()

+--------------------+
|     department_name|
+--------------------+
|       Corporate Tax|
|    Government Sales|
|             Payroll|
|          Recruiting|
|        Construction|
|                 NOC|
|            Treasury|
|Shareholder Services|
|        Retail Sales|
|         Contracting|
|          IT Support|
|  Control And Credit|
|         IT Helpdesk|
|       Manufacturing|
|          Operations|
|            Benefits|
+--------------------+
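The outer join plus `count(employee_id) = 0` amounts to an anti-join: keep the departments whose `department_id` never appears among the employees. A minimal plain-Python sketch of that logic, with made-up sample values, could look like this:

```python
# Made-up lookup table: department_id -> department_name
departments = {10: "Administration", 20: "Marketing", 120: "Treasury"}

# Made-up department_id column of the employees table (None = unassigned)
employee_departments = [10, 20, 20, None]

# Department ids that are referenced by at least one employee.
used = {dept_id for dept_id in employee_departments if dept_id is not None}

# Anti-join: keep only departments with no matching employee row.
empty_departments = sorted(
    name for dept_id, name in departments.items() if dept_id not in used
)
print(empty_departments)
```

Note that the unassigned employee does not create a spurious empty department, which mirrors why grouping by `department_name` and counting `employee_id` works in the queries above.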



#### Question 4
Find employees whose current job is the same as one of their previous jobs. Display the job title as well.
![figure](lab3_q4.png)


In [11]:
#spark.sql
spark.sql("select first_name, last_name, job_title from dataEmployees join dataJobs on dataEmployees.job_id = dataJobs.job_id join dataJob_history on dataEmployees.employee_id = dataJob_history.employee_id and dataEmployees.job_id = dataJob_history.job_id").show()

+----------+---------+--------------------+
|first_name|last_name|           job_title|
+----------+---------+--------------------+
|  Jonathon|   Taylor|Sales Representative|
|  Jennifer|   Whalen|Administration As...|
+----------+---------+--------------------+



In [12]:
#dataframe function
cond = [dfEmployees.employee_id == dfJob_history.employee_id, dfEmployees.job_id == dfJob_history.job_id]
dfEmployees.join(dfJobs, dfEmployees.job_id == dfJobs.job_id).join(dfJob_history, cond).select(dfEmployees.first_name, dfEmployees.last_name, dfJobs.job_title).show()

+----------+---------+--------------------+
|first_name|last_name|           job_title|
+----------+---------+--------------------+
|  Jonathon|   Taylor|Sales Representative|
|  Jennifer|   Whalen|Administration As...|
+----------+---------+--------------------+
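The key point in this question is that the join to the job history uses two conditions at once: the same `employee_id` and the same `job_id`. In plain Python this corresponds to matching on a composite key, sketched below with made-up sample rows.

```python
# Made-up rows: (employee_id, first_name, current job_id)
employees = [
    (200, "Ana", "AD_ASST"),
    (201, "Bo", "MK_MAN"),
    (202, "Cal", "SA_REP"),
]

# Made-up rows: (employee_id, previous job_id)
job_history = [(200, "AD_ASST"), (200, "AC_ACCOUNT"), (201, "MK_REP")]

# Made-up lookup table: job_id -> job_title
job_titles = {
    "AD_ASST": "Administration Assistant",
    "MK_MAN": "Marketing Manager",
    "SA_REP": "Sales Representative",
}

# Composite key: both employee_id and job_id must match.
past_jobs = set(job_history)
same_job = [
    (first, job_titles[job_id])
    for emp_id, first, job_id in employees
    if (emp_id, job_id) in past_jobs
]
print(same_job)
```

Matching on `employee_id` alone would also return employees who changed to a different job, which is why both conditions are needed.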



#### Question 5

An employee's total salary is their base salary plus commission, where the commission is `commission_pct` multiplied by the salary.
Find all employees whose total salary exceeds the maximum salary for their job.

![figure](lab3_q5.png)

In [6]:
#spark.sql()
sqlQry = spark.sql("select B.first_name, B.job_id, B.salary, B.commission_pct"+
                   ", B.salary+(B.commission_pct*B.salary) as total, concat(A.min_salary,' - ',A.max_salary) range "+
                   "from dataJobs A full join dataEmployees B on A.job_id = B.job_id "+
                   "where B.salary+(B.commission_pct*B.salary) > A.max_salary")
sqlQry.show()

+----------+------+------+--------------+-------+------------+
|first_name|job_id|salary|commission_pct|  total|       range|
+----------+------+------+--------------+-------+------------+
|     Peter|SA_REP| 10000|           0.3|13000.0|6000 - 12008|
|   Janette|SA_REP| 10000|          0.35|13500.0|6000 - 12008|
|   Patrick|SA_REP|  9500|          0.35|12825.0|6000 - 12008|
|     Allan|SA_REP|  9000|          0.35|12150.0|6000 - 12008|
|     Clara|SA_REP| 10500|          0.25|13125.0|6000 - 12008|
|      Lisa|SA_REP| 11500|          0.25|14375.0|6000 - 12008|
|     Ellen|SA_REP| 11000|           0.3|14300.0|6000 - 12008|
+----------+------+------+--------------+-------+------------+



In [7]:
#dataframe function (equivalent of the SQL query above)
tmp = dfJobs.join(dfEmployees, dfEmployees.job_id == dfJobs.job_id)
temp = tmp.select(dfEmployees.first_name,dfEmployees.job_id,dfEmployees.salary,dfEmployees.commission_pct,\
                  (dfEmployees.salary + (dfEmployees.commission_pct*dfEmployees.salary)).alias("total"),\
                  concat(dfJobs.min_salary,lit(" - "), dfJobs.max_salary).alias("range")).where(dfEmployees.salary+(dfEmployees.commission_pct*dfEmployees.salary) > dfJobs.max_salary)
temp.show()

+----------+------+------+--------------+-------+------------+
|first_name|job_id|salary|commission_pct|  total|       range|
+----------+------+------+--------------+-------+------------+
|     Peter|SA_REP| 10000|           0.3|13000.0|6000 - 12008|
|   Janette|SA_REP| 10000|          0.35|13500.0|6000 - 12008|
|   Patrick|SA_REP|  9500|          0.35|12825.0|6000 - 12008|
|     Allan|SA_REP|  9000|          0.35|12150.0|6000 - 12008|
|     Clara|SA_REP| 10500|          0.25|13125.0|6000 - 12008|
|      Lisa|SA_REP| 11500|          0.25|14375.0|6000 - 12008|
|     Ellen|SA_REP| 11000|           0.3|14300.0|6000 - 12008|
+----------+------+------+--------------+-------+------------+
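As a quick arithmetic check of the filter, the sketch below recomputes `salary + commission_pct * salary` in plain Python. The first two rows reuse values shown in the output above; the third row and the helper name `total_salary` are made up for illustration.

```python
def total_salary(salary, commission_pct):
    """Base salary plus commission, as defined in the question."""
    return salary + commission_pct * salary

# (first_name, salary, commission_pct, max_salary for the job)
rows = [
    ("Peter", 10000, 0.30, 12008),    # from the output above
    ("Allan", 9000, 0.35, 12008),     # from the output above
    ("Example", 8000, 0.10, 12008),   # made-up row that stays under the cap
]

over_cap = [
    name for name, salary, pct, max_salary in rows
    if total_salary(salary, pct) > max_salary
]
print(over_cap)
```

Peter's total is 10000 + 0.30 * 10000 = 13000, which is above the 12008 maximum for `SA_REP`, so he is listed even though his base salary alone is within the range.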



In [21]:
spark.sql("select job_id, (max(salary)) from dataEmployees group by job_id").show()

+----------+-----------+
|    job_id|max(salary)|
+----------+-----------+
|FI_ACCOUNT|       9000|
|    MK_MAN|      13000|
|   IT_PROG|       9000|
|    FI_MGR|      12008|
|AC_ACCOUNT|       8300|
|    HR_REP|       6500|
|  PU_CLERK|       3100|
|    AC_MGR|      12008|
|    PR_REP|      10000|
|    ST_MAN|       8200|
|    MK_REP|       6000|
|    SA_REP|      11500|
|    SA_MAN|      14000|
|    PU_MAN|      11000|
|  SH_CLERK|       4200|
|   AD_PRES|      24000|
|  ST_CLERK|       3600|
|   AD_ASST|       4400|
|     AD_VP|      17000|
+----------+-----------+



#### Question 6

Using join syntax, find all employees who have not changed jobs and were hired in January.
![figure](lab3_q6.png)

In [14]:
#spark.sql
spark.sql("SELECT de.first_name, de.job_id, de.hire_date FROM dataJob_history as dh FULL JOIN dataEmployees as de "+
          "ON dh.employee_id = de.employee_id WHERE hire_date LIKE '%Jan%' "+
          "AND de.employee_id NOT IN (SELECT employee_id FROM dataJob_history)").show()

+----------+--------+---------+
|first_name|  job_id|hire_date|
+----------+--------+---------+
| Alexander| IT_PROG|03-Jan-06|
|     Karen|  SA_MAN|05-Jan-05|
|    Curtis|ST_CLERK|29-Jan-05|
|    Mattea|  SA_REP|24-Jan-08|
|   Charles|  SA_REP|04-Jan-08|
|     James|ST_CLERK|14-Jan-07|
|   Janette|  SA_REP|30-Jan-04|
|    Tayler|  SA_REP|24-Jan-06|
|     Peter|  SA_REP|30-Jan-05|
|   Douglas|SH_CLERK|13-Jan-08|
|     Eleni|  SA_MAN|29-Jan-08|
|   Nandita|SH_CLERK|27-Jan-04|
|   Winston|SH_CLERK|24-Jan-06|
+----------+--------+---------+



In [15]:
#dataframe function
# A left anti join keeps only employees with no row in the job history; joining dfJobs is not needed for these columns.
dfEmployees.join(dfJob_history, dfEmployees.employee_id == dfJob_history.employee_id, 'leftanti').where(col('hire_date').like("%Jan%")).select("first_name", "job_id", "hire_date").show()

+----------+--------+---------+
|first_name|  job_id|hire_date|
+----------+--------+---------+
| Alexander| IT_PROG|03-Jan-06|
|     Karen|  SA_MAN|05-Jan-05|
|    Curtis|ST_CLERK|29-Jan-05|
|    Mattea|  SA_REP|24-Jan-08|
|   Charles|  SA_REP|04-Jan-08|
|     James|ST_CLERK|14-Jan-07|
|   Janette|  SA_REP|30-Jan-04|
|    Tayler|  SA_REP|24-Jan-06|
|     Peter|  SA_REP|30-Jan-05|
|   Douglas|SH_CLERK|13-Jan-08|
|     Eleni|  SA_MAN|29-Jan-08|
|   Nandita|SH_CLERK|27-Jan-04|
|   Winston|SH_CLERK|24-Jan-06|
+----------+--------+---------+
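The queries above find January hires by pattern-matching the `hire_date` string. A slightly stricter alternative is to parse the date and compare the month number. The plain-Python sketch below assumes the `DD-Mon-YY` format seen in the output and English month abbreviations (the default C locale); the helper name `hired_in_month` is hypothetical.

```python
from datetime import datetime

def hired_in_month(hire_date, month):
    """Return True if a 'DD-Mon-YY' date string falls in the given month (1-12)."""
    return datetime.strptime(hire_date, "%d-%b-%y").month == month

# The first two values appear in the output above; the last one is made up.
hire_dates = ["03-Jan-06", "05-Jan-05", "17-Jun-03"]

january_hires = [d for d in hire_dates if hired_in_month(d, 1)]
print(january_hires)
```

Parsing also rejects malformed dates with an error instead of silently matching or skipping them, which can be useful when the data quality is unknown.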

