# Transformations in Spark

In [1]:
import os
import sys
os.environ["SPARK_HOME"] = "/usr/hdp/current/spark2-client"
os.environ["PYLIB"] = os.environ["SPARK_HOME"] + "/python/lib"
sys.path.insert(0, os.environ["PYLIB"] + "/py4j-0.10.4-src.zip")
sys.path.insert(0, os.environ["PYLIB"] + "/pyspark.zip")

In [3]:
## Create SparkContext, SparkSession
from os.path import expanduser, join, abspath

from pyspark.sql import SparkSession
from pyspark.sql import Row

# warehouse_location points to the default location for managed databases and tables
warehouse_location = 'hdfs:///apps/hive/warehouse/'

spark = SparkSession \
    .builder \
    .appName("Python Spark SQL Hive integration example") \
    .config("spark.sql.warehouse.dir", warehouse_location) \
    .enableHiveSupport() \
    .getOrCreate()

FileNotFoundError: [WinError 2] The system cannot find the file specified

In [3]:
# Verify Spark Driver
spark

Now there are 6 tables available in HIVE, in the database *insofe_empdb**

Access these tables and process the data

#### Verify HIVE the Data in Hive

In [4]:
# insofe_empdb
spark.sql("use insofe_empdb")

DataFrame[]

#### Verify/List all the tables in the above database

In [5]:
spark.sql("show tables").show()

+------------+------------+-----------+
|    database|   tableName|isTemporary|
+------------+------------+-----------+
|insofe_empdb| departments|      false|
|insofe_empdb|    dept_emp|      false|
|insofe_empdb|dept_manager|      false|
|insofe_empdb|   employees|      false|
|insofe_empdb|    salaries|      false|
|insofe_empdb|      titles|      false|
+------------+------------+-----------+



#### Create DataFrame for departments data from departments table in HIVE

In [69]:
deptDF = spark.sql("SELECT * FROM departments")

#### Verify the departments DataFrame

In [71]:
deptDF.show(10)

+-------+------------------+-------------------+
|dept_no|         dept_name|      last_modified|
+-------+------------------+-------------------+
|   d001|         Marketing|2013-01-28 23:59:59|
|   d002|           Finance|2013-01-28 23:59:59|
|   d003|   Human Resources|2013-01-28 23:59:59|
|   d004|        Production|2013-01-28 23:59:59|
|   d005|       Development|2013-01-28 23:59:59|
|   d006|Quality Management|2013-01-28 23:59:59|
|   d007|             Sales|2013-01-28 23:59:59|
|   d008|          Research|2013-01-28 23:59:59|
|   d009|  Customer Service|2013-01-28 23:59:59|
|   d010|         Analytics|2018-07-05 09:02:09|
+-------+------------------+-------------------+



#### Verify the Schema of deparaments DataFrame

In [72]:
deptDF.printSchema()

root
 |-- dept_no: string (nullable = true)
 |-- dept_name: string (nullable = true)
 |-- last_modified: timestamp (nullable = true)



#### Create DataFrame for departments&employees from dept_emp table in HIVE
<br>For each employee his/her respective department is available in dept_emp table
<br>An employee might move/work in different times in his/her tenure
<br>So, there will be multiple records for an employee (with different departments in different time periods)
<br>At any time an employee must be active, only in one department and this can be identified with to_date of '9999-01-01'

In [75]:
dept_empDF = spark.sql("select * from dept_emp ")

#### Verify the departments&employees DataFrame

In [76]:
dept_empDF.show(4)

+------+------+-------+----------+----------+-------------------+
|seq_no|emp_no|dept_no| from_date|   to_date|      last_modified|
+------+------+-------+----------+----------+-------------------+
|     1|     1|   d001|1986-01-01|9999-01-01|2013-01-28 23:59:59|
|     2|     2|   d002|1986-01-01|9999-01-01|2013-01-28 23:59:59|
|     3|     3|   d003|1986-01-01|9999-01-01|2013-01-28 23:59:59|
|     4|     4|   d004|1986-01-01|9999-01-01|2013-01-28 23:59:59|
+------+------+-------+----------+----------+-------------------+
only showing top 4 rows



#### Verify the schema for departments&employees DataFrame

In [77]:
dept_empDF.printSchema()

root
 |-- seq_no: integer (nullable = true)
 |-- emp_no: integer (nullable = true)
 |-- dept_no: string (nullable = true)
 |-- from_date: string (nullable = true)
 |-- to_date: string (nullable = true)
 |-- last_modified: timestamp (nullable = true)



#### Total Count in departments&employees DataFrame

In [78]:
dept_empDF.count()

341603

#### Filter only the active records from the above DataFrame
Active employees are whose to_date is '9999-01-01'

In [79]:
active_dept_empDF = dept_empDF[dept_empDF.to_date == '9999-01-01']

#### Active employees count

In [80]:
active_dept_empDF.count()

250124

In [81]:
active_dept_empDF.dtypes

[('seq_no', 'int'),
 ('emp_no', 'int'),
 ('dept_no', 'string'),
 ('from_date', 'string'),
 ('to_date', 'string'),
 ('last_modified', 'timestamp')]

#### Remove unnecessary columns

In [82]:
from pyspark.sql.functions import col, expr, column
active_dept_empDF = active_dept_empDF.select('emp_no', 'dept_no', 'from_date', 'to_date')
active_dept_empDF.dtypes

[('emp_no', 'int'),
 ('dept_no', 'string'),
 ('from_date', 'string'),
 ('to_date', 'string')]

#### Make the DataFrame available in in-memory

In [83]:
active_dept_empDF.cache()

DataFrame[emp_no: int, dept_no: string, from_date: string, to_date: string]

#### Create DataFrame for departments&managers data from dept_manager table in HIVE
<br>For each department respective manager's employee number is available in dept_manager table
<br>A department may have multiple manager's 
<br>So, there will be multiple records for a department (with different employee number's for different time periods)
<br>At any time an there will be only one active manager for each department and is can be identified by to_date value '9999-01-01'

In [84]:
dept_managerDF = spark.sql("select * from dept_manager")

#### Verify the departments&managers DataFrame

In [85]:
dept_managerDF.show(4)

+------+-------+------+----------+----------+-------------------+
|seq_no|dept_no|emp_no| from_date|   to_date|      last_modified|
+------+-------+------+----------+----------+-------------------+
|     1|   d001|     1|1986-01-01|1992-10-01|2013-01-28 23:59:59|
|     2|   d002|     2|1986-01-01|1990-12-17|2013-01-28 23:59:59|
|     3|   d003|     3|1986-01-01|1993-03-21|2013-01-28 23:59:59|
|     4|   d004|     4|1986-01-01|1989-09-09|2013-01-28 23:59:59|
+------+-------+------+----------+----------+-------------------+
only showing top 4 rows



#### Verify the schema for departments&managers DataFrame

In [86]:
dept_managerDF.printSchema()

root
 |-- seq_no: integer (nullable = true)
 |-- dept_no: string (nullable = true)
 |-- emp_no: integer (nullable = true)
 |-- from_date: string (nullable = true)
 |-- to_date: string (nullable = true)
 |-- last_modified: timestamp (nullable = true)



#### Total Count in departments&managers DataFrame

In [87]:
dept_managerDF.count()

25

#### Filter only the active records from above DataFrame
<br>Though there are only total 10 departments, but there are 25 records (manager records) exists, 
<br>remove the inactive records
<br>Active records are whose to_date is '9999-01-01'

In [88]:
active_dept_managerDF = dept_managerDF[dept_managerDF.to_date=='9999-01-01']

#### Verify the records count from the above DataFrame

In [89]:
active_dept_managerDF.count()

10

#### Verify for the unwanted columns

In [90]:
active_dept_managerDF.dtypes

[('seq_no', 'int'),
 ('dept_no', 'string'),
 ('emp_no', 'int'),
 ('from_date', 'string'),
 ('to_date', 'string'),
 ('last_modified', 'timestamp')]

#### Remove unwanted columns and rename the columns as necessary

In [92]:
active_dept_managerDF = active_dept_managerDF.select('dept_no', expr('emp_no AS mgr_emp_no'), expr('from_date AS mgr_from_date'))

#### Verify the above DataFrame

In [93]:
active_dept_managerDF.show(4)

+-------+----------+-------------+
|dept_no|mgr_emp_no|mgr_from_date|
+-------+----------+-------------+
|   d002|        10|   1990-12-17|
|   d003|     19827|   1993-03-21|
|   d001|     45502|   1993-10-01|
|   d005|     64439|   1995-04-25|
+-------+----------+-------------+
only showing top 4 rows



#### Create DataFrame for employees data from employees table in HIVE

In [94]:
employeesDF = spark.sql("select * from employees")

#### Verify employees DataFrame

In [96]:
employeesDF.show(4)

+------+----------+----------+------------+------+----------+-------------------+
|emp_no|birth_date|first_name|   last_name|gender| hire_date|      last_modified|
+------+----------+----------+------------+------+----------+-------------------+
|     1|1958-09-12| Margareta|  Markovitch|     M|1986-01-01|2013-01-28 23:59:59|
|     2|1961-10-28|      Ebru|       Alpin|     M|1986-01-01|2013-01-28 23:59:59|
|     3|1955-06-24|   Shirish|Ossenbruggen|     F|1986-01-01|2013-01-28 23:59:59|
|     4|1958-06-08| Krassimir|     Wegerle|     F|1986-01-01|2013-01-28 23:59:59|
+------+----------+----------+------------+------+----------+-------------------+
only showing top 4 rows



#### Verify schema of employees DataFrame

In [97]:
employeesDF.printSchema()

root
 |-- emp_no: integer (nullable = true)
 |-- birth_date: string (nullable = true)
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- hire_date: string (nullable = true)
 |-- last_modified: timestamp (nullable = true)



#### Remove unwanted columns

In [98]:
employeesDF = employeesDF.drop('last_modified')

#### Verify above DataFrame

In [99]:
employeesDF.show(4)

+------+----------+----------+------------+------+----------+
|emp_no|birth_date|first_name|   last_name|gender| hire_date|
+------+----------+----------+------------+------+----------+
|     1|1958-09-12| Margareta|  Markovitch|     M|1986-01-01|
|     2|1961-10-28|      Ebru|       Alpin|     M|1986-01-01|
|     3|1955-06-24|   Shirish|Ossenbruggen|     F|1986-01-01|
|     4|1958-06-08| Krassimir|     Wegerle|     F|1986-01-01|
+------+----------+----------+------------+------+----------+
only showing top 4 rows



#### Create DataFrame for salaries data from salaries table in HIVE

In [100]:
salariesDF = spark.sql("select * from salaries")

#### Verify salaries DataFrame

In [101]:
salariesDF.show(4)

+------+------+------+----------+----------+-------------------+
|seq_no|emp_no|salary| from_date|   to_date|      last_modified|
+------+------+------+----------+----------+-------------------+
|     1|     1| 70166|1986-01-01|1987-01-01|2013-01-28 23:59:59|
|     2|     1| 70820|1987-01-01|1988-01-01|2013-01-28 23:59:59|
|     3|     1| 71970|1988-01-01|1989-01-01|2013-01-28 23:59:59|
|     4|     1| 75211|1989-01-01|1989-12-31|2013-01-28 23:59:59|
+------+------+------+----------+----------+-------------------+
only showing top 4 rows



#### Verify schema of salaries DataFrame

In [102]:
salariesDF.printSchema()

root
 |-- seq_no: integer (nullable = true)
 |-- emp_no: integer (nullable = true)
 |-- salary: integer (nullable = true)
 |-- from_date: string (nullable = true)
 |-- to_date: string (nullable = true)
 |-- last_modified: timestamp (nullable = true)



#### Verify count of records in salaries DataFrame

In [103]:
salariesDF.count()

2854047

#### Filter only the active records from above DataFrame

In [104]:
active_salariesDF = salariesDF[salariesDF.to_date=='9999-01-01']

#### Verify the record count

In [105]:
active_salariesDF.count()

250124

In [49]:
active_salariesDF.dtypes

[('seq_no', 'int'),
 ('emp_no', 'int'),
 ('salary', 'int'),
 ('from_date', 'string'),
 ('to_date', 'string'),
 ('last_modified', 'timestamp')]

#### Remove and rename unnecessary columns

In [106]:
from pyspark.sql.functions import col, expr, column
active_salariesDF = active_salariesDF.select("emp_no", "salary", expr("from_date as sal_from_date"))

In [107]:
active_salariesDF.dtypes

[('emp_no', 'int'), ('salary', 'int'), ('sal_from_date', 'string')]

#### Create titles DataFrame for the titles table in HIVE

In [108]:
titlesDF = spark.sql("select * from titles")

#### Verify titles DataFrame

In [110]:
titlesDF.show(4)

+------+------+------------+----------+----------+-------------------+
|seq_no|emp_no|       title| from_date|   to_date|      last_modified|
+------+------+------------+----------+----------+-------------------+
|     1|     1|     Manager|1986-01-01|1992-10-01|2013-01-28 23:59:59|
|     2|     1|Senior Staff|1992-10-01|9999-01-01|2013-01-28 23:59:59|
|     3|     2|     Manager|1986-01-01|1990-12-17|2013-01-28 23:59:59|
|     4|     2|Senior Staff|1990-12-17|9999-01-01|2013-01-28 23:59:59|
+------+------+------------+----------+----------+-------------------+
only showing top 4 rows



#### Verify titles DataFrame schema

In [111]:
titlesDF.printSchema()

root
 |-- seq_no: integer (nullable = true)
 |-- emp_no: integer (nullable = true)
 |-- title: string (nullable = true)
 |-- from_date: string (nullable = true)
 |-- to_date: string (nullable = true)
 |-- last_modified: timestamp (nullable = true)



#### Verify records count in the above DataFrame

In [112]:
titlesDF.count()

453308

#### Filter active records from the above DataFrame

In [113]:
active_titlesDF = titlesDF[titlesDF.to_date=='9999-01-01']

In [114]:
active_titlesDF.dtypes

[('seq_no', 'int'),
 ('emp_no', 'int'),
 ('title', 'string'),
 ('from_date', 'string'),
 ('to_date', 'string'),
 ('last_modified', 'timestamp')]

#### Remove and rename the columns as necessary

In [115]:
from pyspark.sql.functions import col, expr, column
active_titlesDF = active_titlesDF.select('emp_no', 'title', expr('from_date AS title_from_date'))

In [116]:
active_titlesDF.dtypes

[('emp_no', 'int'), ('title', 'string'), ('title_from_date', 'string')]

#### Join department and departments_manager s DataFrames
Result will have each department and corresponding manager's employee no

In [117]:
dept_curr_mgrDF = deptDF.join(active_dept_managerDF, 'dept_no', 'inner')

In [118]:
dept_curr_mgrDF.show(50)

+-------+------------------+-------------------+----------+-------------+
|dept_no|         dept_name|      last_modified|mgr_emp_no|mgr_from_date|
+-------+------------------+-------------------+----------+-------------+
|   d002|           Finance|2013-01-28 23:59:59|        10|   1990-12-17|
|   d003|   Human Resources|2013-01-28 23:59:59|     19827|   1993-03-21|
|   d001|         Marketing|2013-01-28 23:59:59|     45502|   1993-10-01|
|   d005|       Development|2013-01-28 23:59:59|     64439|   1995-04-25|
|   d007|             Sales|2013-01-28 23:59:59|     71341|   1994-03-07|
|   d008|          Research|2013-01-28 23:59:59|    107706|   1996-04-08|
|   d006|Quality Management|2013-01-28 23:59:59|    149081|   2000-06-28|
|   d009|  Customer Service|2013-01-28 23:59:59|    151543|   2003-01-03|
|   d004|        Production|2013-01-28 23:59:59|    215054|   2005-08-30|
|   d010|         Analytics|2018-07-05 09:02:09|    300030|   2013-01-29|
+-------+------------------+----------

#### Find the manager's details by joining above DataFrame with employee's details using emp_no

In [133]:
dept_curr_mgrDF.dtypes

[('dept_no', 'string'),
 ('dept_name', 'string'),
 ('last_modified', 'timestamp'),
 ('mgr_emp_no', 'int'),
 ('mgr_from_date', 'string')]

In [120]:
employeesDF.dtypes

[('emp_no', 'int'),
 ('birth_date', 'string'),
 ('first_name', 'string'),
 ('last_name', 'string'),
 ('gender', 'string'),
 ('hire_date', 'string')]

In [134]:
join_expr = dept_curr_mgrDF["mgr_emp_no"] == employeesDF["emp_no"]

In [135]:
dept_curr_mgr_detailsDF = dept_curr_mgrDF.join(employeesDF,join_expr,'inner')

In [136]:
dept_curr_mgr_detailsDF.dtypes

[('dept_no', 'string'),
 ('dept_name', 'string'),
 ('last_modified', 'timestamp'),
 ('mgr_emp_no', 'int'),
 ('mgr_from_date', 'string'),
 ('emp_no', 'int'),
 ('birth_date', 'string'),
 ('first_name', 'string'),
 ('last_name', 'string'),
 ('gender', 'string'),
 ('hire_date', 'string')]

In [137]:
dept_curr_mgr_detailsDF.show(4)

+-------+---------------+-------------------+----------+-------------+------+----------+----------+----------+------+----------+
|dept_no|      dept_name|      last_modified|mgr_emp_no|mgr_from_date|emp_no|birth_date|first_name| last_name|gender| hire_date|
+-------+---------------+-------------------+----------+-------------+------+----------+----------+----------+------+----------+
|   d002|        Finance|2013-01-28 23:59:59|        10|   1990-12-17|    10|1959-03-28|     Isamu|Legleitner|     F|1986-01-14|
|   d003|Human Resources|2013-01-28 23:59:59|     19827|   1993-03-21| 19827|1960-12-02|   Karsten|   Sigstam|     F|1986-08-04|
|   d001|      Marketing|2013-01-28 23:59:59|     45502|   1993-10-01| 45502|1967-06-21|  Vishwani|  Minakawa|     M|1988-04-12|
|   d005|    Development|2013-01-28 23:59:59|     64439|   1995-04-25| 64439|1970-04-25|      Leon|  DasSarma|     F|1989-10-21|
+-------+---------------+-------------------+----------+-------------+------+----------+---------

#### Rename columns as necessary

In [139]:
from pyspark.sql.functions import col

replacements = {'birth_date' : 'mgr_birth_date', 
                'first_name' : 'mgr_first_name',
                'last_name' : 'mgr_last_name',
                'gender' : 'mgr_gender',
                'hire_date' : 'mgr_hire_date'
               }

dept_curr_mgr_detailsDF = dept_curr_mgr_detailsDF.select([col(c).alias(replacements.get(c, c)) for c in dept_curr_mgr_detailsDF.columns])

#### Verify above DataFrame

In [140]:
dept_curr_mgr_detailsDF.show(4)

+-------+---------------+-------------------+----------+-------------+------+--------------+--------------+-------------+----------+-------------+
|dept_no|      dept_name|      last_modified|mgr_emp_no|mgr_from_date|emp_no|mgr_birth_date|mgr_first_name|mgr_last_name|mgr_gender|mgr_hire_date|
+-------+---------------+-------------------+----------+-------------+------+--------------+--------------+-------------+----------+-------------+
|   d002|        Finance|2013-01-28 23:59:59|        10|   1990-12-17|    10|    1959-03-28|         Isamu|   Legleitner|         F|   1986-01-14|
|   d003|Human Resources|2013-01-28 23:59:59|     19827|   1993-03-21| 19827|    1960-12-02|       Karsten|      Sigstam|         F|   1986-08-04|
|   d001|      Marketing|2013-01-28 23:59:59|     45502|   1993-10-01| 45502|    1967-06-21|      Vishwani|     Minakawa|         M|   1988-04-12|
|   d005|    Development|2013-01-28 23:59:59|     64439|   1995-04-25| 64439|    1970-04-25|          Leon|     DasSar

#### Remove unwanted columns

In [142]:
dept_curr_mgr_detailsDF = dept_curr_mgr_detailsDF.drop('last_modified', 'emp_no')

In [143]:
dept_curr_mgr_detailsDF.dtypes

[('dept_no', 'string'),
 ('dept_name', 'string'),
 ('mgr_emp_no', 'int'),
 ('mgr_from_date', 'string'),
 ('mgr_birth_date', 'string'),
 ('mgr_first_name', 'string'),
 ('mgr_last_name', 'string'),
 ('mgr_gender', 'string'),
 ('mgr_hire_date', 'string')]

In [144]:
dept_curr_mgr_detailsDF.show(4)

+-------+---------------+----------+-------------+--------------+--------------+-------------+----------+-------------+
|dept_no|      dept_name|mgr_emp_no|mgr_from_date|mgr_birth_date|mgr_first_name|mgr_last_name|mgr_gender|mgr_hire_date|
+-------+---------------+----------+-------------+--------------+--------------+-------------+----------+-------------+
|   d002|        Finance|        10|   1990-12-17|    1959-03-28|         Isamu|   Legleitner|         F|   1986-01-14|
|   d003|Human Resources|     19827|   1993-03-21|    1960-12-02|       Karsten|      Sigstam|         F|   1986-08-04|
|   d001|      Marketing|     45502|   1993-10-01|    1967-06-21|      Vishwani|     Minakawa|         M|   1988-04-12|
|   d005|    Development|     64439|   1995-04-25|    1970-04-25|          Leon|     DasSarma|         F|   1989-10-21|
+-------+---------------+----------+-------------+--------------+--------------+-------------+----------+-------------+
only showing top 4 rows



#### Join employees DataFrame with departments&employee DataFrame
<br>Join employeesDF and active_dept_empDF based on emp_no
<br>result DataFrame of the above is employee and his/her corresponding department

In [146]:
employeesDF.dtypes

[('emp_no', 'int'),
 ('birth_date', 'string'),
 ('first_name', 'string'),
 ('last_name', 'string'),
 ('gender', 'string'),
 ('hire_date', 'string')]

In [147]:
active_dept_empDF.dtypes

[('emp_no', 'int'),
 ('dept_no', 'string'),
 ('from_date', 'string'),
 ('to_date', 'string')]

In [148]:
emp_deptDF = active_dept_empDF.join(employeesDF,'emp_no','inner')

#### Verify the above result DataFrame

In [149]:
emp_deptDF.show(4)

+------+-------+----------+----------+----------+----------+---------+------+----------+
|emp_no|dept_no| from_date|   to_date|birth_date|first_name|last_name|gender| hire_date|
+------+-------+----------+----------+----------+----------+---------+------+----------+
|   148|   d005|1986-02-03|9999-01-01|1960-03-11|    Feipei| Nollmann|     M|1986-02-03|
|   463|   d008|1986-02-06|9999-01-01|1955-04-15|Dharmaraja| Sadowsky|     M|1986-02-06|
|   496|   d006|2001-09-02|9999-01-01|1964-03-29|      Mari|    Rotem|     M|1986-02-06|
|   833|   d005|1994-03-25|9999-01-01|1961-09-14|      Huan|  Preusig|     M|1986-02-09|
+------+-------+----------+----------+----------+----------+---------+------+----------+
only showing top 4 rows



#### Rename the columns

In [150]:
from pyspark.sql.functions import col

replacements = {
    'from_date' : 'dept_from_date',
    'birth_date' : 'emp_birth_date',
    'first_name' : 'emp_first_name',
    'last_name' : 'emp_last_name',
    'gender' : 'emp_gender',
    'hire_date' : 'emp_hire_date'
}

emp_deptDF = emp_deptDF.select([col(c).alias(replacements.get(c, c)) for c in emp_deptDF.columns])

#### Verify DataFrame

In [151]:
emp_deptDF.show(4)

+------+-------+--------------+----------+--------------+--------------+-------------+----------+-------------+
|emp_no|dept_no|dept_from_date|   to_date|emp_birth_date|emp_first_name|emp_last_name|emp_gender|emp_hire_date|
+------+-------+--------------+----------+--------------+--------------+-------------+----------+-------------+
|     1|   d001|    1986-01-01|9999-01-01|    1958-09-12|     Margareta|   Markovitch|         M|   1986-01-01|
|     2|   d002|    1986-01-01|9999-01-01|    1961-10-28|          Ebru|        Alpin|         M|   1986-01-01|
|     3|   d003|    1986-01-01|9999-01-01|    1955-06-24|       Shirish| Ossenbruggen|         F|   1986-01-01|
|     4|   d004|    1986-01-01|9999-01-01|    1958-06-08|     Krassimir|      Wegerle|         F|   1986-01-01|
+------+-------+--------------+----------+--------------+--------------+-------------+----------+-------------+
only showing top 4 rows



#### Verify active records count

In [152]:
emp_deptDF.count()

250124

#### Create a DataFrame with employees and respective manager's details

<br>DataFrame **emp_deptDF** contains all the active employees along with the their department
<br>DataFrame **dept_curr_mgr_detailsDF** contains all the departments its manager''s details
<br>Join these two DataFrames based on the dept_no, to result a DataFrame with employee''s along with the manager''s details.

In [154]:
emp_deptDF.dtypes

[('emp_no', 'int'),
 ('dept_no', 'string'),
 ('dept_from_date', 'string'),
 ('to_date', 'string'),
 ('emp_birth_date', 'string'),
 ('emp_first_name', 'string'),
 ('emp_last_name', 'string'),
 ('emp_gender', 'string'),
 ('emp_hire_date', 'string')]

In [155]:
emp_deptDF.count()

250124

In [156]:
dept_curr_mgr_detailsDF.dtypes

[('dept_no', 'string'),
 ('dept_name', 'string'),
 ('mgr_emp_no', 'int'),
 ('mgr_from_date', 'string'),
 ('mgr_birth_date', 'string'),
 ('mgr_first_name', 'string'),
 ('mgr_last_name', 'string'),
 ('mgr_gender', 'string'),
 ('mgr_hire_date', 'string')]

In [157]:
dept_curr_mgr_detailsDF.count()

10

#### Join by broadcasting the smaller table - efficient join

In [159]:
from pyspark.sql.functions import broadcast
active_emp_dept_mgrDF = emp_deptDF.join(broadcast(dept_curr_mgr_detailsDF), 'dept_no', 'inner')

#### Verify the DataFrame

In [160]:
active_emp_dept_mgrDF.show(4)

+-------+------+--------------+----------+--------------+--------------+-------------+----------+-------------+---------------+----------+-------------+--------------+--------------+-------------+----------+-------------+
|dept_no|emp_no|dept_from_date|   to_date|emp_birth_date|emp_first_name|emp_last_name|emp_gender|emp_hire_date|      dept_name|mgr_emp_no|mgr_from_date|mgr_birth_date|mgr_first_name|mgr_last_name|mgr_gender|mgr_hire_date|
+-------+------+--------------+----------+--------------+--------------+-------------+----------+-------------+---------------+----------+-------------+--------------+--------------+-------------+----------+-------------+
|   d001|     1|    1986-01-01|9999-01-01|    1958-09-12|     Margareta|   Markovitch|         M|   1986-01-01|      Marketing|     45502|   1993-10-01|    1967-06-21|      Vishwani|     Minakawa|         M|   1988-04-12|
|   d002|     2|    1986-01-01|9999-01-01|    1961-10-28|          Ebru|        Alpin|         M|   1986-01-01| 

#### Verify the counts

In [161]:
active_emp_dept_mgrDF.count()

250124

#### Make the DataFrame available in in-memory

In [162]:
active_emp_dept_mgrDF.cache()

DataFrame[dept_no: string, emp_no: int, dept_from_date: string, to_date: string, emp_birth_date: string, emp_first_name: string, emp_last_name: string, emp_gender: string, emp_hire_date: string, dept_name: string, mgr_emp_no: int, mgr_from_date: string, mgr_birth_date: string, mgr_first_name: string, mgr_last_name: string, mgr_gender: string, mgr_hire_date: string]

#### Join Salaries and Titles DataFrames
<br>**active_salariesDF** DataFrame contains the current salaries details of active employees
<br>**active_titlesDF** DataFrame contains the current titles/designation details of active employees
<br>join these two DataFrames based on the emp_no 
<br>result is DataFrame consists of all active employees along with their salaries and titles details

In [163]:
active_salariesDF.dtypes

[('emp_no', 'int'), ('salary', 'int'), ('sal_from_date', 'string')]

In [166]:
active_titlesDF.dtypes

[('emp_no', 'int'), ('title', 'string'), ('title_from_date', 'string')]

In [170]:
active_salariesDF.cache()
active_titlesDF.cache()

DataFrame[emp_no: int, title: string, title_from_date: string]

In [168]:
emp_sal_titlesDF = active_salariesDF.join(active_titlesDF, 'emp_no', 'inner')

#### Verify the DataFrame

In [169]:
emp_sal_titlesDF.show(4)

+------+------+-------------+---------------+---------------+
|emp_no|salary|sal_from_date|          title|title_from_date|
+------+------+-------------+---------------+---------------+
|   148|121640|   2003-01-30|Senior Engineer|     1993-02-04|
|   463| 63130|   2003-02-02|   Senior Staff|     1986-02-06|
|   496| 50281|   2002-08-16|       Engineer|     2000-08-17|
|   833| 52747|   2003-03-23|Senior Engineer|     1994-03-25|
+------+------+-------------+---------------+---------------+
only showing top 4 rows



#### Verify record count

In [171]:
emp_sal_titlesDF.count()

250124

In [172]:
emp_sal_titlesDF.cache()

DataFrame[emp_no: int, salary: int, sal_from_date: string, title: string, title_from_date: string]

#### Final Join 
<br>By now there are 2 DataFrames
<br>**active_emp_dept_mgrDF** consists of all active employees along with the manager''s details.
<br>**emp_sal_titlesDF** consists of the current salary and titles details for all the active employees.
<br>join these two DataFrames to result a DataFrame with all the details for all active employees

In [173]:
active_emp_dept_mgrDF.count()

250124

In [174]:
active_emp_dept_mgrDF.dtypes

[('dept_no', 'string'),
 ('emp_no', 'int'),
 ('dept_from_date', 'string'),
 ('to_date', 'string'),
 ('emp_birth_date', 'string'),
 ('emp_first_name', 'string'),
 ('emp_last_name', 'string'),
 ('emp_gender', 'string'),
 ('emp_hire_date', 'string'),
 ('dept_name', 'string'),
 ('mgr_emp_no', 'int'),
 ('mgr_from_date', 'string'),
 ('mgr_birth_date', 'string'),
 ('mgr_first_name', 'string'),
 ('mgr_last_name', 'string'),
 ('mgr_gender', 'string'),
 ('mgr_hire_date', 'string')]

In [175]:
emp_sal_titlesDF.count()

250124

In [176]:
emp_sal_titlesDF.dtypes

[('emp_no', 'int'),
 ('salary', 'int'),
 ('sal_from_date', 'string'),
 ('title', 'string'),
 ('title_from_date', 'string')]

In [177]:
emp_sal_titlesDF.cache()
active_emp_dept_mgrDF.cache()

DataFrame[dept_no: string, emp_no: int, dept_from_date: string, to_date: string, emp_birth_date: string, emp_first_name: string, emp_last_name: string, emp_gender: string, emp_hire_date: string, dept_name: string, mgr_emp_no: int, mgr_from_date: string, mgr_birth_date: string, mgr_first_name: string, mgr_last_name: string, mgr_gender: string, mgr_hire_date: string]

In [178]:
active_emp_detailsDF = active_emp_dept_mgrDF.join(emp_sal_titlesDF, 'emp_no', 'inner')

In [179]:
active_emp_detailsDF.cache()

DataFrame[emp_no: int, dept_no: string, dept_from_date: string, to_date: string, emp_birth_date: string, emp_first_name: string, emp_last_name: string, emp_gender: string, emp_hire_date: string, dept_name: string, mgr_emp_no: int, mgr_from_date: string, mgr_birth_date: string, mgr_first_name: string, mgr_last_name: string, mgr_gender: string, mgr_hire_date: string, salary: int, sal_from_date: string, title: string, title_from_date: string]

#### Verify DataFrame

In [180]:
active_emp_detailsDF.show(4)

+------+-------+--------------+----------+--------------+--------------+-------------+----------+-------------+------------------+----------+-------------+--------------+--------------+-------------+----------+-------------+------+-------------+---------------+---------------+
|emp_no|dept_no|dept_from_date|   to_date|emp_birth_date|emp_first_name|emp_last_name|emp_gender|emp_hire_date|         dept_name|mgr_emp_no|mgr_from_date|mgr_birth_date|mgr_first_name|mgr_last_name|mgr_gender|mgr_hire_date|salary|sal_from_date|          title|title_from_date|
+------+-------+--------------+----------+--------------+--------------+-------------+----------+-------------+------------------+----------+-------------+--------------+--------------+-------------+----------+-------------+------+-------------+---------------+---------------+
|   148|   d005|    1986-02-03|9999-01-01|    1960-03-11|        Feipei|     Nollmann|         M|   1986-02-03|       Development|     64439|   1995-04-25|    1970-04

#### Derive additional columns such as
-  emp_age = current_date - emp_birth_date
-  emp_tenure = current_date - emp_hire_date
-  mgr_age = current_date - mgr_birth_date
-  mgr_tenure = current_date - mgr_hire_date
-  salary_since = current_date - sal_from_date
-  role_since = current_date - title_from_date
-  emp_dept_tenure = current_date - dept_from_date
-  mgr_dept_tenure = current_date - mgr_from_date

#### Create a temporary table/view to perform sql queries

In [209]:
active_emp_detailsDF.registerTempTable("active_emp_details_sqlTBL")

In [211]:
active_emp_detailsDF.dtypes

[('emp_no', 'int'),
 ('dept_no', 'string'),
 ('dept_from_date', 'string'),
 ('to_date', 'string'),
 ('emp_birth_date', 'string'),
 ('emp_first_name', 'string'),
 ('emp_last_name', 'string'),
 ('emp_gender', 'string'),
 ('emp_hire_date', 'string'),
 ('dept_name', 'string'),
 ('mgr_emp_no', 'int'),
 ('mgr_from_date', 'string'),
 ('mgr_birth_date', 'string'),
 ('mgr_first_name', 'string'),
 ('mgr_last_name', 'string'),
 ('mgr_gender', 'string'),
 ('mgr_hire_date', 'string'),
 ('salary', 'int'),
 ('sal_from_date', 'string'),
 ('title', 'string'),
 ('title_from_date', 'string')]

#### Change the order of the columns with the select and derive the columns as necessary

In [215]:
active_employees_data  = spark.sql("""
SELECT emp_no, emp_first_name, emp_last_name, emp_gender, emp_birth_date, emp_hire_date,
       round(datediff(current_date,to_date(emp_birth_date))/365) as emp_age,
       round(datediff(current_date,to_date(emp_hire_date))/365) as emp_tenure,
       salary, sal_from_date, 
       round(datediff(current_date,to_date(sal_from_date))/365) as salary_since,
       title, title_from_date,
       round(datediff(current_date,to_date(title_from_date))/365) as role_since,
       dept_no, dept_name, dept_from_date,
       round(datediff(current_date,to_date(dept_from_date))/365) as emp_dept_tenure,
       mgr_emp_no, mgr_first_name, mgr_last_name, mgr_gender, mgr_birth_date, mgr_hire_date, mgr_from_date,
       round(datediff(current_date,to_date(mgr_birth_date))/365) as mgr_age,
       round(datediff(current_date,to_date(mgr_hire_date))/365) as mgr_tenure,
       round(datediff(current_date,to_date(mgr_from_date))/365) as mgr_dept_tenure
FROM active_emp_details_sqlTBL""")

#### Verify the DataFrame

In [216]:
active_employees_data.show(4)

+------+--------------+-------------+----------+--------------+-------------+-------+----------+------+-------------+------------+---------------+---------------+----------+-------+------------------+--------------+---------------+----------+--------------+-------------+----------+--------------+-------------+-------------+-------+----------+---------------+
|emp_no|emp_first_name|emp_last_name|emp_gender|emp_birth_date|emp_hire_date|emp_age|emp_tenure|salary|sal_from_date|salary_since|          title|title_from_date|role_since|dept_no|         dept_name|dept_from_date|emp_dept_tenure|mgr_emp_no|mgr_first_name|mgr_last_name|mgr_gender|mgr_birth_date|mgr_hire_date|mgr_from_date|mgr_age|mgr_tenure|mgr_dept_tenure|
+------+--------------+-------------+----------+--------------+-------------+-------+----------+------+-------------+------------+---------------+---------------+----------+-------+------------------+--------------+---------------+----------+--------------+-------------+-------

#### Write the DataFrame to the persistent storage - HDFS

In [217]:
active_employees_data.repartition(1).write.option("header", "false").csv("/user/manasm/datasets/employeesdb/results/active_employees_data")

In [218]:
active_employees_data.cache()

DataFrame[emp_no: int, emp_first_name: string, emp_last_name: string, emp_gender: string, emp_birth_date: string, emp_hire_date: string, emp_age: double, emp_tenure: double, salary: int, sal_from_date: string, salary_since: double, title: string, title_from_date: string, role_since: double, dept_no: string, dept_name: string, dept_from_date: string, emp_dept_tenure: double, mgr_emp_no: int, mgr_first_name: string, mgr_last_name: string, mgr_gender: string, mgr_birth_date: string, mgr_hire_date: string, mgr_from_date: string, mgr_age: double, mgr_tenure: double, mgr_dept_tenure: double]

In [219]:
active_employees_data.count()

250124

### Aggregated Data
#### Create the Aggregations
-  Based on Department
-  Based on Department and Gender

#### Aggregate based on department

In [221]:
from pyspark.sql import functions as F

dept_aggrDF = active_employees_data.groupBy('dept_no').agg(
    F.min('salary').alias('Min_Salary'),
    F.max('salary').alias('Max_Salary'),
    F.mean('salary').alias('Mean_Salary'),
    F.count('salary').alias('Total_Employees'),
    F.stddev('salary').alias('StdDev_Salary'),
    F.sum('salary').alias('Total_salary'),
    F.min('emp_age').alias('Min_Age'),
    F.max('emp_age').alias('Max_Age'),
    F.mean('emp_age').alias('Mean_Age'),
    F.min('emp_tenure').alias('Min_Tenure'),
    F.max('emp_tenure').alias('Max_Tenure'),
    F.mean('emp_tenure').alias('Mean_Tenure'),
    F.mean('salary_since').alias('Mean_Salary_Since'),
    F.mean('role_since').alias('Mean_Role_Since')
)

In [224]:
dept_aggrDF.cache()

DataFrame[dept_no: string, Min_Salary: int, Max_Salary: int, Mean_Salary: double, Total_Employees: bigint, StdDev_Salary: double, Total_salary: bigint, Min_Age: double, Max_Age: double, Mean_Age: double, Min_Tenure: double, Max_Tenure: double, Mean_Tenure: double, Mean_Salary_Since: double, Mean_Role_Since: double]

In [225]:
dept_aggrDF.show()

+-------+----------+----------+------------------+---------------+------------------+------------+-------+-------+------------------+----------+----------+------------------+-----------------+------------------+
|dept_no|Min_Salary|Max_Salary|       Mean_Salary|Total_Employees|     StdDev_Salary|Total_salary|Min_Age|Max_Age|          Mean_Age|Min_Tenure|Max_Tenure|       Mean_Tenure|Mean_Salary_Since|   Mean_Role_Since|
+-------+----------+----------+------------------+---------------+------------------+------------+-------+-------+------------------+----------+----------+------------------+-----------------+------------------+
|   d005|     12582|    142434|61388.935342615165|          62344|16393.484361748524|  3827231785|   24.0|   66.0| 46.82923777749262|       1.0|      33.0| 21.67711407673553| 9.85997048633389| 15.65260490183498|
|   d009|     12505|    142950| 61567.09564047363|          18580|19054.432158113068|  1143916637|   24.0|   66.0| 46.40398277717976|       1.0|      33

#### Aggregation based on Department and Gender

In [226]:
dept_gender_aggrDF = active_employees_data.groupBy('dept_no', 'emp_gender').agg(
    F.min('salary').alias('Min_Salary'),
    F.max('salary').alias('Max_Salary'),
    F.mean('salary').alias('Mean_Salary'),
    F.count('salary').alias('Total_Employees'),
    F.stddev('salary').alias('StdDev_Salary'),
    F.sum('salary').alias('Total_salary'),
    F.min('emp_age').alias('Min_Age'),
    F.max('emp_age').alias('Max_Age'),
    F.mean('emp_age').alias('Mean_Age'),
    F.min('emp_tenure').alias('Min_Tenure'),
    F.max('emp_tenure').alias('Max_Tenure'),
    F.mean('emp_tenure').alias('Mean_Tenure'),
    F.mean('salary_since').alias('Mean_Salary_Since'),
    F.mean('role_since').alias('Mean_Role_Since')
)

In [227]:
dept_gender_aggrDF.show()

+-------+----------+----------+----------+------------------+---------------+------------------+------------+-------+-------+------------------+----------+----------+------------------+-----------------+------------------+
|dept_no|emp_gender|Min_Salary|Max_Salary|       Mean_Salary|Total_Employees|     StdDev_Salary|Total_salary|Min_Age|Max_Age|          Mean_Age|Min_Tenure|Max_Tenure|       Mean_Tenure|Mean_Salary_Since|   Mean_Role_Since|
+-------+----------+----------+----------+------------------+---------------+------------------+------------+-------+-------+------------------+----------+----------+------------------+-----------------+------------------+
|   d006|         M|     12596|    137308| 59807.84364820847|           9210|17371.741914549362|   550830240|   24.0|   66.0|  46.3742671009772|       1.0|      32.0| 20.86243213897937| 9.57100977198697|15.130618892508144|
|   d006|         F|     12776|    137294| 60621.83104034627|           6469|18632.929352115545|   392162625

#### Write above DataFrames to storage

In [228]:
dept_aggrDF.repartition(1).write.option("header", "false").csv("/user/manasm/datasets/employeesdb/results/aggr_dept/")
dept_gender_aggrDF.repartition(1).write.option("header", "false").csv("/user/manasm/datasets/employeesdb/results/aggr_dept_gender/")