# Transformations in Spark

In [1]:
import os
import sys
os.environ["SPARK_HOME"] = "/usr/hdp/current/spark2-client"
os.environ["PYLIB"] = os.environ["SPARK_HOME"] + "/python/lib"
sys.path.insert(0, os.environ["PYLIB"] + "/py4j-0.10.4-src.zip")
sys.path.insert(0, os.environ["PYLIB"] + "/pyspark.zip")

In [2]:
## Create SparkContext, SparkSession
from os.path import expanduser, join, abspath

from pyspark.sql import SparkSession
from pyspark.sql import Row

# warehouse_location points to the default location for managed databases and tables
warehouse_location = 'hdfs:///apps/hive/warehouse/'

spark = SparkSession \
    .builder \
    .appName("Python Spark SQL Hive integration example") \
    .config("spark.sql.warehouse.dir", warehouse_location) \
    .enableHiveSupport() \
    .getOrCreate()

In [3]:
# Verify Spark Driver
spark

Now there are 6 tables available in HIVE, in the database *insofe_empdb**

Access these tables and process the data

#### Verify HIVE the Data in Hive

In [9]:
# insofe_empdb_akc2320
spark.sql('use insofe_empdb_akc2320')

DataFrame[]

#### Verify/List all the tables in the above database

In [11]:
spark.sql('show tables').show()

+--------------------+------------+-----------+
|            database|   tableName|isTemporary|
+--------------------+------------+-----------+
|insofe_empdb_akc2320| departments|      false|
|insofe_empdb_akc2320|    dept_emp|      false|
|insofe_empdb_akc2320|dept_manager|      false|
|insofe_empdb_akc2320|   employees|      false|
|insofe_empdb_akc2320|    salaries|      false|
|insofe_empdb_akc2320|      titles|      false|
+--------------------+------------+-----------+



#### Create DataFrame for departments data from departments table in HIVE

In [12]:
df_dept=spark.sql('select * from departments')
df_dept.show(2)

+-------+---------+-------------------+
|dept_no|dept_name|      last_modified|
+-------+---------+-------------------+
|   d001|Marketing|2013-01-28 23:59:59|
|   d002|  Finance|2013-01-28 23:59:59|
+-------+---------+-------------------+
only showing top 2 rows



#### Verify the departments DataFrame

In [13]:
df_dept.show()

+-------+------------------+-------------------+
|dept_no|         dept_name|      last_modified|
+-------+------------------+-------------------+
|   d001|         Marketing|2013-01-28 23:59:59|
|   d002|           Finance|2013-01-28 23:59:59|
|   d003|   Human Resources|2013-01-28 23:59:59|
|   d004|        Production|2013-01-28 23:59:59|
|   d005|       Development|2013-01-28 23:59:59|
|   d006|Quality Management|2013-01-28 23:59:59|
|   d007|             Sales|2013-01-28 23:59:59|
|   d008|          Research|2013-01-28 23:59:59|
|   d009|  Customer Service|2013-01-28 23:59:59|
|   d010|         Analytics|2019-01-05 15:37:59|
+-------+------------------+-------------------+



#### Verify the Schema of deparaments DataFrame

In [18]:
df_dept.printSchema()

root
 |-- dept_no: string (nullable = true)
 |-- dept_name: string (nullable = true)
 |-- last_modified: timestamp (nullable = true)



#### Create DataFrame for departments&employees from dept_emp table in HIVE
<br>For each employee his/her respective department is available in dept_emp table
<br>An employee might move/work in different times in his/her tenure
<br>So, there will be multiple records for an employee (with different departments in different time periods)
<br>At any time an employee must be active, only in one department and this can be identified with to_date of '9999-01-01'

In [19]:
df_dept_emp = spark.sql('select * from dept_emp')
df_dept_emp.show(3)

+------+------+-------+----------+----------+-------------------+
|seq_no|emp_no|dept_no| from_date|   to_date|      last_modified|
+------+------+-------+----------+----------+-------------------+
|     1|     1|   d001|1986-01-01|9999-01-01|2013-01-28 23:59:59|
|    10|    10|   d002|1986-01-14|9999-01-01|2013-01-28 23:59:59|
|   100|    94|   d004|1994-05-26|1999-04-29|2013-01-28 23:59:59|
+------+------+-------+----------+----------+-------------------+
only showing top 3 rows



#### Verify the departments&employees DataFrame

In [20]:
df_dept_emp.show(4)

+------+------+-------+----------+----------+-------------------+
|seq_no|emp_no|dept_no| from_date|   to_date|      last_modified|
+------+------+-------+----------+----------+-------------------+
|     1|     1|   d001|1986-01-01|9999-01-01|2013-01-28 23:59:59|
|    10|    10|   d002|1986-01-14|9999-01-01|2013-01-28 23:59:59|
|   100|    94|   d004|1994-05-26|1999-04-29|2013-01-28 23:59:59|
|  1000|   909|   d009|1988-09-12|9999-01-01|2013-01-28 23:59:59|
+------+------+-------+----------+----------+-------------------+
only showing top 4 rows



#### Verify the schema for departments&employees DataFrame

In [21]:
df_dept_emp.printSchema()

root
 |-- seq_no: integer (nullable = true)
 |-- emp_no: integer (nullable = true)
 |-- dept_no: string (nullable = true)
 |-- from_date: string (nullable = true)
 |-- to_date: string (nullable = true)
 |-- last_modified: timestamp (nullable = true)



#### Total Count in departments&employees DataFrame

In [22]:
df_dept_emp.count()

341603

#### Filter only the active records from the above DataFrame


In [23]:
df_emp_active = df_dept_emp.where(df_dept_emp.to_date == '9999-01-01')

#### Active employees count

In [24]:
df_emp_active.count()

250124

In [27]:
df_emp_active.printSchema()

root
 |-- seq_no: integer (nullable = true)
 |-- emp_no: integer (nullable = true)
 |-- dept_no: string (nullable = true)
 |-- from_date: string (nullable = true)
 |-- to_date: string (nullable = true)
 |-- last_modified: timestamp (nullable = true)



#### Remove unnecessary columns

In [29]:
df_emp_active=df_emp_active.drop('seq_no','last_modified')
df_emp_active.dtypes

[('emp_no', 'int'),
 ('dept_no', 'string'),
 ('from_date', 'string'),
 ('to_date', 'string')]

#### Make the DataFrame available in in-memory

In [31]:
df_emp_active.cache()

DataFrame[emp_no: int, dept_no: string, from_date: string, to_date: string]

#### Create DataFrame for departments&managers data from dept_manager table in HIVE
<br>For each department respective manager's employee number is available in dept_manager table
<br>A department may have multiple manager's 
<br>So, there will be multiple records for a department (with different employee number's for different time periods)
<br>At any time an there will be only one active manager for each department and is can be identified by to_date value '9999-01-01'

In [70]:
df_dept_manager = spark.sql('select * from dept_manager')

#### Verify the departments&managers DataFrame

In [71]:
df_dept_manager.show(4)

+------+-------+------+----------+----------+-------------------+
|seq_no|dept_no|emp_no| from_date|   to_date|      last_modified|
+------+-------+------+----------+----------+-------------------+
|     1|   d001|     1|1986-01-01|1992-10-01|2013-01-28 23:59:59|
|    10|   d002|    10|1990-12-17|9999-01-01|2013-01-28 23:59:59|
|    11|   d003| 19827|1993-03-21|9999-01-01|2013-01-28 23:59:59|
|    12|   d004| 31345|1990-09-09|1994-08-02|2013-01-28 23:59:59|
+------+-------+------+----------+----------+-------------------+
only showing top 4 rows



#### Verify the schema for departments&managers DataFrame

In [72]:
df_dept_manager.printSchema()

root
 |-- seq_no: integer (nullable = true)
 |-- dept_no: string (nullable = true)
 |-- emp_no: integer (nullable = true)
 |-- from_date: string (nullable = true)
 |-- to_date: string (nullable = true)
 |-- last_modified: timestamp (nullable = true)



#### Total Count in departments&managers DataFrame

In [73]:
df_dept_manager.count()

25

#### Filter only the active records from above DataFrame
<br>Though there are only total 10 departments, but there are 25 records (manager records) exists, 
<br>remove the inactive records


In [74]:
df_dept_manager_active = df_dept_manager.where(df_dept_manager.to_date =='9999-01-01')

#### Verify the records count from the above DataFrame

In [75]:
df_dept_manager_active.count()

10

In [76]:
df_dept_manager_active.show(4)

+------+-------+------+----------+----------+-------------------+
|seq_no|dept_no|emp_no| from_date|   to_date|      last_modified|
+------+-------+------+----------+----------+-------------------+
|    10|   d002|    10|1990-12-17|9999-01-01|2013-01-28 23:59:59|
|    11|   d003| 19827|1993-03-21|9999-01-01|2013-01-28 23:59:59|
|    13|   d001| 45502|1993-10-01|9999-01-01|2013-01-28 23:59:59|
|    15|   d005| 64439|1995-04-25|9999-01-01|2013-01-28 23:59:59|
+------+-------+------+----------+----------+-------------------+
only showing top 4 rows



#### Verify for  columns

In [77]:
df_dept_manager_active.dtypes

[('seq_no', 'int'),
 ('dept_no', 'string'),
 ('emp_no', 'int'),
 ('from_date', 'string'),
 ('to_date', 'string'),
 ('last_modified', 'timestamp')]

#### rename the columns as necessary

In [79]:

df_dept_manager_active = df_dept_manager_active.selectExpr("dept_no as dept_no","emp_no as mgr_emp_no", "from_date as mgr_from_date")

#### Verify the above DataFrame

In [80]:
df_dept_manager_active.show(4)

+-------+----------+-------------+
|dept_no|mgr_emp_no|mgr_from_date|
+-------+----------+-------------+
|   d002|        10|   1990-12-17|
|   d003|     19827|   1993-03-21|
|   d001|     45502|   1993-10-01|
|   d005|     64439|   1995-04-25|
+-------+----------+-------------+
only showing top 4 rows



#### Create DataFrame for employees data from employees table in HIVE

In [81]:
df_employees = spark.sql('select * from employees')

#### Verify employees DataFrame

In [82]:
df_employees.show(4)

+------+----------+----------+------------+------+----------+-------------------+
|emp_no|birth_date|first_name|   last_name|gender| hire_date|      last_modified|
+------+----------+----------+------------+------+----------+-------------------+
|     1|1958-09-12| Margareta|  Markovitch|     M|1986-01-01|2013-01-28 23:59:59|
|     2|1961-10-28|      Ebru|       Alpin|     M|1986-01-01|2013-01-28 23:59:59|
|     3|1955-06-24|   Shirish|Ossenbruggen|     F|1986-01-01|2013-01-28 23:59:59|
|     4|1958-06-08| Krassimir|     Wegerle|     F|1986-01-01|2013-01-28 23:59:59|
+------+----------+----------+------------+------+----------+-------------------+
only showing top 4 rows



#### Verify schema of employees DataFrame

In [84]:
df_employees.printSchema()

root
 |-- emp_no: integer (nullable = true)
 |-- birth_date: string (nullable = true)
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- hire_date: string (nullable = true)
 |-- last_modified: timestamp (nullable = true)



#### Remove unwanted columns - last_modified

In [85]:
df_employees = df_employees.drop('last_modified')

#### Verify above DataFrame

In [86]:
df_employees.show(4)

+------+----------+----------+------------+------+----------+
|emp_no|birth_date|first_name|   last_name|gender| hire_date|
+------+----------+----------+------------+------+----------+
|     1|1958-09-12| Margareta|  Markovitch|     M|1986-01-01|
|     2|1961-10-28|      Ebru|       Alpin|     M|1986-01-01|
|     3|1955-06-24|   Shirish|Ossenbruggen|     F|1986-01-01|
|     4|1958-06-08| Krassimir|     Wegerle|     F|1986-01-01|
+------+----------+----------+------------+------+----------+
only showing top 4 rows



#### Create DataFrame for salaries data from salaries table in HIVE

In [87]:
df_salaries = spark.sql('select * from salaries')

#### Verify salaries DataFrame

In [88]:
df_salaries.show(4)

+------+------+------+----------+----------+-------------------+
|seq_no|emp_no|salary| from_date|   to_date|      last_modified|
+------+------+------+----------+----------+-------------------+
|     1|     1| 70166|1986-01-01|1987-01-01|2013-01-28 23:59:59|
|    10|     1| 91165|1994-12-30|1995-12-30|2013-01-28 23:59:59|
|   100|     6| 84203|1994-12-30|1995-12-30|2013-01-28 23:59:59|
|  1000|    73| 39000|1997-11-25|1998-11-25|2013-01-28 23:59:59|
+------+------+------+----------+----------+-------------------+
only showing top 4 rows



#### Verify schema of salaries DataFrame

In [90]:
df_salaries.printSchema()

root
 |-- seq_no: integer (nullable = true)
 |-- emp_no: integer (nullable = true)
 |-- salary: integer (nullable = true)
 |-- from_date: string (nullable = true)
 |-- to_date: string (nullable = true)
 |-- last_modified: timestamp (nullable = true)



#### Verify count of records in salaries DataFrame

In [91]:
df_salaries.count()

2854047

#### Filter only the active records from above DataFrame

In [92]:
df_salaries_active = df_salaries.where(df_salaries.to_date == '9999-01-01')

#### Verify the record count

In [93]:
df_salaries_active.count()

250124

In [94]:
df_salaries_active.dtypes

[('seq_no', 'int'),
 ('emp_no', 'int'),
 ('salary', 'int'),
 ('from_date', 'string'),
 ('to_date', 'string'),
 ('last_modified', 'timestamp')]

#### Remove and rename unnecessary columns

In [95]:
df_salaries_active = df_salaries_active.selectExpr('emp_no as emp_no','salary as salary', 'from_date as sal_from_date' )

In [98]:
df_salaries_active.dtypes

[('emp_no', 'int'), ('salary', 'int'), ('sal_from_date', 'string')]

#### Create titles DataFrame for the titles table in HIVE

In [99]:
df_titles = spark.sql('select * from titles')

#### Verify titles DataFrame

In [100]:
df_titles.show(4)

+------+------+----------------+----------+----------+-------------------+
|seq_no|emp_no|           title| from_date|   to_date|      last_modified|
+------+------+----------------+----------+----------+-------------------+
|     1|     1|         Manager|1986-01-01|1992-10-01|2013-01-28 23:59:59|
|    10|     5|Technique Leader|1993-04-25|9999-01-01|2013-01-28 23:59:59|
|   100|    60|           Staff|1997-11-02|9999-01-01|2013-01-28 23:59:59|
|  1000|   620|        Engineer|1996-09-15|2003-03-20|2013-01-28 23:59:59|
+------+------+----------------+----------+----------+-------------------+
only showing top 4 rows



#### Verify titles DataFrame schema

In [102]:
df_titles.printSchema()

root
 |-- seq_no: integer (nullable = true)
 |-- emp_no: integer (nullable = true)
 |-- title: string (nullable = true)
 |-- from_date: string (nullable = true)
 |-- to_date: string (nullable = true)
 |-- last_modified: timestamp (nullable = true)



#### Verify records count in the above DataFrame

In [103]:
df_titles.count()

453308

#### Filter active records from the above DataFrame

In [104]:
df_titles_active =  df_titles.where(df_titles.to_date == '9999-01-01')

In [105]:
df_titles_active.dtypes

[('seq_no', 'int'),
 ('emp_no', 'int'),
 ('title', 'string'),
 ('from_date', 'string'),
 ('to_date', 'string'),
 ('last_modified', 'timestamp')]

#### Remove and rename the columns as necessary

In [106]:
df_titles_active = df_titles_active.selectExpr('emp_no as emp_no','title as title', 'from_date as title_from_date' )

In [107]:
df_titles_active.dtypes

[('emp_no', 'int'), ('title', 'string'), ('title_from_date', 'string')]

#### Join department and departments_manager s DataFrames
Result will have each department and corresponding manager's employee no

In [110]:
df_dept_manager_active.show(1)
df_dept.show(1)

+-------+----------+-------------+
|dept_no|mgr_emp_no|mgr_from_date|
+-------+----------+-------------+
|   d002|        10|   1990-12-17|
+-------+----------+-------------+
only showing top 1 row

+-------+---------+-------------------+
|dept_no|dept_name|      last_modified|
+-------+---------+-------------------+
|   d001|Marketing|2013-01-28 23:59:59|
+-------+---------+-------------------+
only showing top 1 row



In [116]:

jn_dept_deptmgr = df_dept.join(df_dept_manager_active, on='dept_no')

In [117]:
jn_dept_deptmgr.show()

+-------+------------------+-------------------+----------+-------------+
|dept_no|         dept_name|      last_modified|mgr_emp_no|mgr_from_date|
+-------+------------------+-------------------+----------+-------------+
|   d002|           Finance|2013-01-28 23:59:59|        10|   1990-12-17|
|   d003|   Human Resources|2013-01-28 23:59:59|     19827|   1993-03-21|
|   d001|         Marketing|2013-01-28 23:59:59|     45502|   1993-10-01|
|   d005|       Development|2013-01-28 23:59:59|     64439|   1995-04-25|
|   d007|             Sales|2013-01-28 23:59:59|     71341|   1994-03-07|
|   d008|          Research|2013-01-28 23:59:59|    107706|   1996-04-08|
|   d006|Quality Management|2013-01-28 23:59:59|    149081|   2000-06-28|
|   d009|  Customer Service|2013-01-28 23:59:59|    151543|   2003-01-03|
|   d004|        Production|2013-01-28 23:59:59|    215054|   2005-08-30|
|   d010|         Analytics|2019-01-05 15:37:59|    300030|   2013-01-29|
+-------+------------------+----------

#### Find the manager's details by joining above DataFrame with employee's details using emp_no

In [118]:
jn_dept_deptmgr.dtypes

[('dept_no', 'string'),
 ('dept_name', 'string'),
 ('last_modified', 'timestamp'),
 ('mgr_emp_no', 'int'),
 ('mgr_from_date', 'string')]

In [121]:
df_employees.dtypes

[('emp_no', 'int'),
 ('birth_date', 'string'),
 ('first_name', 'string'),
 ('last_name', 'string'),
 ('gender', 'string'),
 ('hire_date', 'string')]

In [124]:
joinExpression = jn_dept_deptmgr["mgr_emp_no"] == df_employees['emp_no']
df_mgr_details = jn_dept_deptmgr.join(df_employees, joinExpression)

In [125]:
df_mgr_details.dtypes

[('dept_no', 'string'),
 ('dept_name', 'string'),
 ('last_modified', 'timestamp'),
 ('mgr_emp_no', 'int'),
 ('mgr_from_date', 'string'),
 ('emp_no', 'int'),
 ('birth_date', 'string'),
 ('first_name', 'string'),
 ('last_name', 'string'),
 ('gender', 'string'),
 ('hire_date', 'string')]

In [127]:
df_mgr_details.show(4)
df_mgr_details.count()

+-------+---------------+-------------------+----------+-------------+------+----------+----------+----------+------+----------+
|dept_no|      dept_name|      last_modified|mgr_emp_no|mgr_from_date|emp_no|birth_date|first_name| last_name|gender| hire_date|
+-------+---------------+-------------------+----------+-------------+------+----------+----------+----------+------+----------+
|   d002|        Finance|2013-01-28 23:59:59|        10|   1990-12-17|    10|1959-03-28|     Isamu|Legleitner|     F|1986-01-14|
|   d003|Human Resources|2013-01-28 23:59:59|     19827|   1993-03-21| 19827|1960-12-02|   Karsten|   Sigstam|     F|1986-08-04|
|   d001|      Marketing|2013-01-28 23:59:59|     45502|   1993-10-01| 45502|1967-06-21|  Vishwani|  Minakawa|     M|1988-04-12|
|   d005|    Development|2013-01-28 23:59:59|     64439|   1995-04-25| 64439|1970-04-25|      Leon|  DasSarma|     F|1989-10-21|
+-------+---------------+-------------------+----------+-------------+------+----------+---------

10

#### Rename columns as necessary

In [130]:
from pyspark.sql.functions import col

replacements = {'birth_date' : 'mgr_birth_date', 
                'first_name' : 'mgr_first_name',
                'last_name' : 'mgr_last_name',
                'gender' : 'mgr_gender',
                'hire_date' : 'mgr_hire_date'
               }
df_mgr_details = df_mgr_details.select([col(c).alias(replacements.get(c, c)) for c in df_mgr_details.columns])

#### Verify above DataFrame

In [131]:
df_mgr_details.show(4)

+-------+---------------+-------------------+----------+-------------+------+--------------+--------------+-------------+----------+-------------+
|dept_no|      dept_name|      last_modified|mgr_emp_no|mgr_from_date|emp_no|mgr_birth_date|mgr_first_name|mgr_last_name|mgr_gender|mgr_hire_date|
+-------+---------------+-------------------+----------+-------------+------+--------------+--------------+-------------+----------+-------------+
|   d002|        Finance|2013-01-28 23:59:59|        10|   1990-12-17|    10|    1959-03-28|         Isamu|   Legleitner|         F|   1986-01-14|
|   d003|Human Resources|2013-01-28 23:59:59|     19827|   1993-03-21| 19827|    1960-12-02|       Karsten|      Sigstam|         F|   1986-08-04|
|   d001|      Marketing|2013-01-28 23:59:59|     45502|   1993-10-01| 45502|    1967-06-21|      Vishwani|     Minakawa|         M|   1988-04-12|
|   d005|    Development|2013-01-28 23:59:59|     64439|   1995-04-25| 64439|    1970-04-25|          Leon|     DasSar

In [58]:
df_mgr_details

+-------+---------------+-------------------+----------+-------------+------+--------------+--------------+-------------+----------+-------------+
|dept_no|      dept_name|      last_modified|mgr_emp_no|mgr_from_date|emp_no|mgr_birth_date|mgr_first_name|mgr_last_name|mgr_gender|mgr_hire_date|
+-------+---------------+-------------------+----------+-------------+------+--------------+--------------+-------------+----------+-------------+
|   d002|        Finance|2013-01-28 23:59:59|        10|   1990-12-17|    10|    1959-03-28|         Isamu|   Legleitner|         F|   1986-01-14|
|   d003|Human Resources|2013-01-28 23:59:59|     19827|   1993-03-21| 19827|    1960-12-02|       Karsten|      Sigstam|         F|   1986-08-04|
|   d001|      Marketing|2013-01-28 23:59:59|     45502|   1993-10-01| 45502|    1967-06-21|      Vishwani|     Minakawa|         M|   1988-04-12|
|   d005|    Development|2013-01-28 23:59:59|     64439|   1995-04-25| 64439|    1970-04-25|          Leon|     DasSar

In [133]:
df_mgr_details.dtypes

[('dept_no', 'string'),
 ('dept_name', 'string'),
 ('last_modified', 'timestamp'),
 ('mgr_emp_no', 'int'),
 ('mgr_from_date', 'string'),
 ('emp_no', 'int'),
 ('mgr_birth_date', 'string'),
 ('mgr_first_name', 'string'),
 ('mgr_last_name', 'string'),
 ('mgr_gender', 'string'),
 ('mgr_hire_date', 'string')]

#### Remove unwanted columns

In [59]:
'last_modified', 'emp_no'

In [134]:
df_mgr_details= df_mgr_details.drop('last_modified', 'emp_no')
df_mgr_details.dtypes

[('dept_no', 'string'),
 ('dept_name', 'string'),
 ('mgr_emp_no', 'int'),
 ('mgr_from_date', 'string'),
 ('mgr_birth_date', 'string'),
 ('mgr_first_name', 'string'),
 ('mgr_last_name', 'string'),
 ('mgr_gender', 'string'),
 ('mgr_hire_date', 'string')]

In [135]:
df_mgr_details.show(4)

+-------+---------------+----------+-------------+--------------+--------------+-------------+----------+-------------+
|dept_no|      dept_name|mgr_emp_no|mgr_from_date|mgr_birth_date|mgr_first_name|mgr_last_name|mgr_gender|mgr_hire_date|
+-------+---------------+----------+-------------+--------------+--------------+-------------+----------+-------------+
|   d002|        Finance|        10|   1990-12-17|    1959-03-28|         Isamu|   Legleitner|         F|   1986-01-14|
|   d003|Human Resources|     19827|   1993-03-21|    1960-12-02|       Karsten|      Sigstam|         F|   1986-08-04|
|   d001|      Marketing|     45502|   1993-10-01|    1967-06-21|      Vishwani|     Minakawa|         M|   1988-04-12|
|   d005|    Development|     64439|   1995-04-25|    1970-04-25|          Leon|     DasSarma|         F|   1989-10-21|
+-------+---------------+----------+-------------+--------------+--------------+-------------+----------+-------------+
only showing top 4 rows



#### Join employees DataFrame with departments&employee DataFrame
<br>Join employeesDF and active_dept_empDF based on emp_no
<br>result DataFrame of the above is employee and his/her corresponding department

In [139]:
df_employees.dtypes

[('emp_no', 'int'),
 ('birth_date', 'string'),
 ('first_name', 'string'),
 ('last_name', 'string'),
 ('gender', 'string'),
 ('hire_date', 'string')]

In [138]:

df_emp_active.dtypes

[('emp_no', 'int'),
 ('dept_no', 'string'),
 ('from_date', 'string'),
 ('to_date', 'string')]

In [140]:
jn_emp_empactive = df_emp_active.join(df_employees, on='emp_no')

#### Verify the above result DataFrame

In [141]:
jn_emp_empactive.show(4)

+------+-------+----------+----------+----------+----------+---------+------+----------+
|emp_no|dept_no| from_date|   to_date|birth_date|first_name|last_name|gender| hire_date|
+------+-------+----------+----------+----------+----------+---------+------+----------+
|   148|   d005|1986-02-03|9999-01-01|1960-03-11|    Feipei| Nollmann|     M|1986-02-03|
|   463|   d008|1986-02-06|9999-01-01|1955-04-15|Dharmaraja| Sadowsky|     M|1986-02-06|
|   496|   d006|2001-09-02|9999-01-01|1964-03-29|      Mari|    Rotem|     M|1986-02-06|
|   833|   d005|1994-03-25|9999-01-01|1961-09-14|      Huan|  Preusig|     M|1986-02-09|
+------+-------+----------+----------+----------+----------+---------+------+----------+
only showing top 4 rows



In [65]:
jn_emp_empactive.show

+------+-------+----------+----------+----------+----------+---------+------+----------+
|emp_no|dept_no| from_date|   to_date|birth_date|first_name|last_name|gender| hire_date|
+------+-------+----------+----------+----------+----------+---------+------+----------+
|   148|   d005|1986-02-03|9999-01-01|1960-03-11|    Feipei| Nollmann|     M|1986-02-03|
|   463|   d008|1986-02-06|9999-01-01|1955-04-15|Dharmaraja| Sadowsky|     M|1986-02-06|
|   496|   d006|2001-09-02|9999-01-01|1964-03-29|      Mari|    Rotem|     M|1986-02-06|
|   833|   d005|1994-03-25|9999-01-01|1961-09-14|      Huan|  Preusig|     M|1986-02-09|
+------+-------+----------+----------+----------+----------+---------+------+----------+
only showing top 4 rows



#### Rename the columns

In [142]:
from pyspark.sql.functions import col

replacements = {
    'from_date' : 'dept_from_date',
    'birth_date' : 'emp_birth_date',
    'first_name' : 'emp_first_name',
    'last_name' : 'emp_last_name',
    'gender' : 'emp_gender',
    'hire_date' : 'emp_hire_date'
}

jn_emp_empactive = jn_emp_empactive.select([col(c).alias(replacements.get(c, c)) for c in jn_emp_empactive.columns])

#### Verify DataFrame

In [143]:
jn_emp_empactive.show()

+------+-------+--------------+----------+--------------+--------------+-------------+----------+-------------+
|emp_no|dept_no|dept_from_date|   to_date|emp_birth_date|emp_first_name|emp_last_name|emp_gender|emp_hire_date|
+------+-------+--------------+----------+--------------+--------------+-------------+----------+-------------+
|     1|   d001|    1986-01-01|9999-01-01|    1958-09-12|     Margareta|   Markovitch|         M|   1986-01-01|
|     2|   d002|    1986-01-01|9999-01-01|    1961-10-28|          Ebru|        Alpin|         M|   1986-01-01|
|     3|   d003|    1986-01-01|9999-01-01|    1955-06-24|       Shirish| Ossenbruggen|         F|   1986-01-01|
|     4|   d004|    1986-01-01|9999-01-01|    1958-06-08|     Krassimir|      Wegerle|         F|   1986-01-01|
+------+-------+--------------+----------+--------------+--------------+-------------+----------+-------------+
only showing top 4 rows



#### Verify active records count

In [144]:
jn_emp_empactive.count()

250124

#### Create a DataFrame with employees and respective manager's details

<br>DataFrame **emp_deptDF** contains all the active employees along with the their department
<br>DataFrame **dept_curr_mgr_detailsDF** contains all the departments its manager''s details
<br>Join these two DataFrames based on the dept_no, to result a DataFrame with employee''s along with the manager''s details.

In [145]:
jn_emp_empactive.dtypes

[('emp_no', 'int'),
 ('dept_no', 'string'),
 ('dept_from_date', 'string'),
 ('to_date', 'string'),
 ('emp_birth_date', 'string'),
 ('emp_first_name', 'string'),
 ('emp_last_name', 'string'),
 ('emp_gender', 'string'),
 ('emp_hire_date', 'string')]

In [147]:
jn_emp_empactive.count()

250124

In [148]:
df_mgr_details.dtypes

[('dept_no', 'string'),
 ('dept_name', 'string'),
 ('mgr_emp_no', 'int'),
 ('mgr_from_date', 'string'),
 ('mgr_birth_date', 'string'),
 ('mgr_first_name', 'string'),
 ('mgr_last_name', 'string'),
 ('mgr_gender', 'string'),
 ('mgr_hire_date', 'string')]

In [149]:
df_mgr_details.count()

10

#### Join by broadcasting the smaller table - efficient join

In [150]:
from pyspark.sql.functions import broadcast
active_emp_dept_mgrDF = jn_emp_empactive.join(broadcast(df_mgr_details),'dept_no', 'inner')

#### Verify the DataFrame

In [152]:
active_emp_dept_mgrDF.show(4)

+-------+------+--------------+----------+--------------+--------------+-------------+----------+-------------+---------------+----------+-------------+--------------+--------------+-------------+----------+-------------+
|dept_no|emp_no|dept_from_date|   to_date|emp_birth_date|emp_first_name|emp_last_name|emp_gender|emp_hire_date|      dept_name|mgr_emp_no|mgr_from_date|mgr_birth_date|mgr_first_name|mgr_last_name|mgr_gender|mgr_hire_date|
+-------+------+--------------+----------+--------------+--------------+-------------+----------+-------------+---------------+----------+-------------+--------------+--------------+-------------+----------+-------------+
|   d001|     1|    1986-01-01|9999-01-01|    1958-09-12|     Margareta|   Markovitch|         M|   1986-01-01|      Marketing|     45502|   1993-10-01|    1967-06-21|      Vishwani|     Minakawa|         M|   1988-04-12|
|   d002|     2|    1986-01-01|9999-01-01|    1961-10-28|          Ebru|        Alpin|         M|   1986-01-01| 

#### Verify the counts

In [155]:
active_emp_dept_mgrDF.count()

250124

#### Make the DataFrame available in in-memory

In [157]:
active_emp_dept_mgrDF.cache()

DataFrame[dept_no: string, emp_no: int, dept_from_date: string, to_date: string, emp_birth_date: string, emp_first_name: string, emp_last_name: string, emp_gender: string, emp_hire_date: string, dept_name: string, mgr_emp_no: int, mgr_from_date: string, mgr_birth_date: string, mgr_first_name: string, mgr_last_name: string, mgr_gender: string, mgr_hire_date: string]

#### Join Salaries and Titles DataFrames
<br>**active_salariesDF** DataFrame contains the current salaries details of active employees
<br>**active_titlesDF** DataFrame contains the current titles/designation details of active employees
<br>join these two DataFrames based on the emp_no 
<br>result is DataFrame consists of all active employees along with their salaries and titles details

In [158]:
df_salaries_active.dtypes

[('emp_no', 'int'), ('salary', 'int'), ('sal_from_date', 'string')]

In [159]:
df_titles_active.dtypes

[('emp_no', 'int'), ('title', 'string'), ('title_from_date', 'string')]

In [160]:
df_salaries_active.cache
df_titles_active.cache

<bound method DataFrame.cache of DataFrame[emp_no: int, title: string, title_from_date: string]>

In [161]:
jn_sal_titles = df_salaries_active.join(df_titles_active, on='emp_no')

#### Verify the DataFrame

In [162]:
jn_sal_titles.show(4)

+------+------+-------------+---------------+---------------+
|emp_no|salary|sal_from_date|          title|title_from_date|
+------+------+-------------+---------------+---------------+
|   148|121640|   2003-01-30|Senior Engineer|     1993-02-04|
|   463| 63130|   2003-02-02|   Senior Staff|     1986-02-06|
|   496| 50281|   2002-08-16|       Engineer|     2000-08-17|
|   833| 52747|   2003-03-23|Senior Engineer|     1994-03-25|
+------+------+-------------+---------------+---------------+
only showing top 4 rows



#### Verify record count

In [163]:
jn_sal_titles.count()

250124

In [164]:
jn_sal_titles.cache()

DataFrame[emp_no: int, salary: int, sal_from_date: string, title: string, title_from_date: string]

#### Final Join 
<br>By now there are 2 DataFrames
<br>**active_emp_dept_mgrDF** consists of all active employees along with the manager''s details.
<br>**emp_sal_titlesDF** consists of the current salary and titles details for all the active employees.
<br>join these two DataFrames to result a DataFrame with all the details for all active employees

In [84]:
active_emp_dept_mgrDF.count()

250124

In [165]:
active_emp_dept_mgrDF.dtypes

[('dept_no', 'string'),
 ('emp_no', 'int'),
 ('dept_from_date', 'string'),
 ('to_date', 'string'),
 ('emp_birth_date', 'string'),
 ('emp_first_name', 'string'),
 ('emp_last_name', 'string'),
 ('emp_gender', 'string'),
 ('emp_hire_date', 'string'),
 ('dept_name', 'string'),
 ('mgr_emp_no', 'int'),
 ('mgr_from_date', 'string'),
 ('mgr_birth_date', 'string'),
 ('mgr_first_name', 'string'),
 ('mgr_last_name', 'string'),
 ('mgr_gender', 'string'),
 ('mgr_hire_date', 'string')]

In [166]:
jn_sal_titles.count()

250124

In [167]:
jn_sal_titles.dtypes

[('emp_no', 'int'),
 ('salary', 'int'),
 ('sal_from_date', 'string'),
 ('title', 'string'),
 ('title_from_date', 'string')]

In [169]:
jn_sal_titles.cache()
active_emp_dept_mgrDF.cache()

DataFrame[dept_no: string, emp_no: int, dept_from_date: string, to_date: string, emp_birth_date: string, emp_first_name: string, emp_last_name: string, emp_gender: string, emp_hire_date: string, dept_name: string, mgr_emp_no: int, mgr_from_date: string, mgr_birth_date: string, mgr_first_name: string, mgr_last_name: string, mgr_gender: string, mgr_hire_date: string]

In [173]:
active_emp_detailsDF = active_emp_dept_mgrDF.join(jn_sal_titles, on='emp_no')

In [174]:
### Cache the employee details fro the above steps
active_emp_detailsDF.cache()

DataFrame[emp_no: int, dept_no: string, dept_from_date: string, to_date: string, emp_birth_date: string, emp_first_name: string, emp_last_name: string, emp_gender: string, emp_hire_date: string, dept_name: string, mgr_emp_no: int, mgr_from_date: string, mgr_birth_date: string, mgr_first_name: string, mgr_last_name: string, mgr_gender: string, mgr_hire_date: string, salary: int, sal_from_date: string, title: string, title_from_date: string]

#### Verify DataFrame

In [175]:
active_emp_detailsDF.show(4)

+------+-------+--------------+----------+--------------+--------------+-------------+----------+-------------+------------------+----------+-------------+--------------+--------------+-------------+----------+-------------+------+-------------+---------------+---------------+
|emp_no|dept_no|dept_from_date|   to_date|emp_birth_date|emp_first_name|emp_last_name|emp_gender|emp_hire_date|         dept_name|mgr_emp_no|mgr_from_date|mgr_birth_date|mgr_first_name|mgr_last_name|mgr_gender|mgr_hire_date|salary|sal_from_date|          title|title_from_date|
+------+-------+--------------+----------+--------------+--------------+-------------+----------+-------------+------------------+----------+-------------+--------------+--------------+-------------+----------+-------------+------+-------------+---------------+---------------+
|   148|   d005|    1986-02-03|9999-01-01|    1960-03-11|        Feipei|     Nollmann|         M|   1986-02-03|       Development|     64439|   1995-04-25|    1970-04

#### Derive additional columns such as
-  emp_age = current_date - emp_birth_date
-  emp_tenure = current_date - emp_hire_date
-  mgr_age = current_date - mgr_birth_date
-  mgr_tenure = current_date - mgr_hire_date
-  salary_since = current_date - sal_from_date
-  role_since = current_date - title_from_date
-  emp_dept_tenure = current_date - dept_from_date
-  mgr_dept_tenure = current_date - mgr_from_date

#### Create a temporary table/view to perform sql queries

In [176]:
active_emp_detailsDF.registerTempTable("active_emp_details_sqlTBL")


In [177]:
active_emp_detailsDF.dtypes

[('emp_no', 'int'),
 ('dept_no', 'string'),
 ('dept_from_date', 'string'),
 ('to_date', 'string'),
 ('emp_birth_date', 'string'),
 ('emp_first_name', 'string'),
 ('emp_last_name', 'string'),
 ('emp_gender', 'string'),
 ('emp_hire_date', 'string'),
 ('dept_name', 'string'),
 ('mgr_emp_no', 'int'),
 ('mgr_from_date', 'string'),
 ('mgr_birth_date', 'string'),
 ('mgr_first_name', 'string'),
 ('mgr_last_name', 'string'),
 ('mgr_gender', 'string'),
 ('mgr_hire_date', 'string'),
 ('salary', 'int'),
 ('sal_from_date', 'string'),
 ('title', 'string'),
 ('title_from_date', 'string')]

#### Change the order of the columns with the select and derive the columns as necessary

In [178]:
active_employees_data  = spark.sql("""
SELECT emp_no, emp_first_name, emp_last_name, emp_gender, emp_birth_date, emp_hire_date,
       round(datediff(current_date,to_date(emp_birth_date))/365) as emp_age,
       round(datediff(current_date,to_date(emp_hire_date))/365) as emp_tenure,
       salary, sal_from_date, 
       round(datediff(current_date,to_date(sal_from_date))/365) as salary_since,
       title, title_from_date,
       round(datediff(current_date,to_date(title_from_date))/365) as role_since,
       dept_no, dept_name, dept_from_date,
       round(datediff(current_date,to_date(dept_from_date))/365) as emp_dept_tenure,
       mgr_emp_no, mgr_first_name, mgr_last_name, mgr_gender, mgr_birth_date, mgr_hire_date, mgr_from_date,
       round(datediff(current_date,to_date(mgr_birth_date))/365) as mgr_age,
       round(datediff(current_date,to_date(mgr_hire_date))/365) as mgr_tenure,
       round(datediff(current_date,to_date(mgr_from_date))/365) as mgr_dept_tenure
FROM active_emp_details_sqlTBL""")

#### Verify the DataFrame

In [179]:
active_employees_data.show(4)

+------+--------------+-------------+----------+--------------+-------------+-------+----------+------+-------------+------------+---------------+---------------+----------+-------+------------------+--------------+---------------+----------+--------------+-------------+----------+--------------+-------------+-------------+-------+----------+---------------+
|emp_no|emp_first_name|emp_last_name|emp_gender|emp_birth_date|emp_hire_date|emp_age|emp_tenure|salary|sal_from_date|salary_since|          title|title_from_date|role_since|dept_no|         dept_name|dept_from_date|emp_dept_tenure|mgr_emp_no|mgr_first_name|mgr_last_name|mgr_gender|mgr_birth_date|mgr_hire_date|mgr_from_date|mgr_age|mgr_tenure|mgr_dept_tenure|
+------+--------------+-------------+----------+--------------+-------------+-------+----------+------+-------------+------------+---------------+---------------+----------+-------+------------------+--------------+---------------+----------+--------------+-------------+-------

#### Write the DataFrame to the persistent storage - HDFS

In [181]:
active_employees_data.repartition(1).write.option("header", "false").csv("/user/2320B48/datasets/employeesdb/results/active_employees_data")

In [182]:
active_employees_data.cache()

DataFrame[emp_no: int, emp_first_name: string, emp_last_name: string, emp_gender: string, emp_birth_date: string, emp_hire_date: string, emp_age: double, emp_tenure: double, salary: int, sal_from_date: string, salary_since: double, title: string, title_from_date: string, role_since: double, dept_no: string, dept_name: string, dept_from_date: string, emp_dept_tenure: double, mgr_emp_no: int, mgr_first_name: string, mgr_last_name: string, mgr_gender: string, mgr_birth_date: string, mgr_hire_date: string, mgr_from_date: string, mgr_age: double, mgr_tenure: double, mgr_dept_tenure: double]

In [183]:
active_employees_data.count()

250124

### Aggregated Data
#### Create the Aggregations
-  Based on Department
-  Based on Department and Gender

#### Aggregate based on department

In [184]:
from pyspark.sql import functions as F

dept_aggrDF = active_employees_data.groupBy('dept_no').agg(
    F.min('salary').alias('Min_Salary'),
    F.max('salary').alias('Max_Salary'),
    F.mean('salary').alias('Mean_Salary'),
    F.count('salary').alias('Total_Employees'),
    F.stddev('salary').alias('StdDev_Salary'),
    F.sum('salary').alias('Total_salary'),
    F.min('emp_age').alias('Min_Age'),
    F.max('emp_age').alias('Max_Age'),
    F.mean('emp_age').alias('Mean_Age'),
    F.min('emp_tenure').alias('Min_Tenure'),
    F.max('emp_tenure').alias('Max_Tenure'),
    F.mean('emp_tenure').alias('Mean_Tenure'),
    F.mean('salary_since').alias('Mean_Salary_Since'),
    F.mean('role_since').alias('Mean_Role_Since')
)


In [185]:
dept_aggrDF.cache()

DataFrame[dept_no: string, Min_Salary: int, Max_Salary: int, Mean_Salary: double, Total_Employees: bigint, StdDev_Salary: double, Total_salary: bigint, Min_Age: double, Max_Age: double, Mean_Age: double, Min_Tenure: double, Max_Tenure: double, Mean_Tenure: double, Mean_Salary_Since: double, Mean_Role_Since: double]

In [187]:
dept_aggrDF.show(4)

+-------+----------+----------+------------------+---------------+------------------+------------+-------+-------+------------------+----------+----------+------------------+------------------+------------------+
|dept_no|Min_Salary|Max_Salary|       Mean_Salary|Total_Employees|     StdDev_Salary|Total_salary|Min_Age|Max_Age|          Mean_Age|Min_Tenure|Max_Tenure|       Mean_Tenure| Mean_Salary_Since|   Mean_Role_Since|
+-------+----------+----------+------------------+---------------+------------------+------------+-------+-------+------------------+----------+----------+------------------+------------------+------------------+
|   d005|     12582|    142434|61388.935342615165|          62344|16393.484361748524|  3827231785|   24.0|   67.0|47.333167586295396|       2.0|      33.0|22.196618760426023|10.361590529962788|16.154305145643526|
|   d009|     12505|    142950| 61567.09564047363|          18580|19054.432158113068|  1143916637|   24.0|   67.0|46.909149623250805|       2.0|    

#### Aggregation based on Department and Gender

In [188]:
dept_gender_aggrDF = active_employees_data.groupBy('dept_no', 'emp_gender').agg(
    F.min('salary').alias('Min_Salary'),
    F.max('salary').alias('Max_Salary'),
    F.mean('salary').alias('Mean_Salary'),
    F.count('salary').alias('Total_Employees'),
    F.stddev('salary').alias('StdDev_Salary'),
    F.sum('salary').alias('Total_salary'),
    F.min('emp_age').alias('Min_Age'),
    F.max('emp_age').alias('Max_Age'),
    F.mean('emp_age').alias('Mean_Age'),
    F.min('emp_tenure').alias('Min_Tenure'),
    F.max('emp_tenure').alias('Max_Tenure'),
    F.mean('emp_tenure').alias('Mean_Tenure'),
    F.mean('salary_since').alias('Mean_Salary_Since'),
    F.mean('role_since').alias('Mean_Role_Since')
)

In [191]:
dept_gender_aggrDF.show(4)


+-------+----------+----------+----------+-----------------+---------------+------------------+------------+-------+-------+------------------+----------+----------+------------------+------------------+------------------+
|dept_no|emp_gender|Min_Salary|Max_Salary|      Mean_Salary|Total_Employees|     StdDev_Salary|Total_salary|Min_Age|Max_Age|          Mean_Age|Min_Tenure|Max_Tenure|       Mean_Tenure| Mean_Salary_Since|   Mean_Role_Since|
+-------+----------+----------+----------+-----------------+---------------+------------------+------------+-------+-------+------------------+----------+----------+------------------+------------------+------------------+
|   d006|         M|     12596|    137308|59807.84364820847|           9210|17371.741914549362|   550830240|   24.0|   67.0| 46.87752442996743|       2.0|      33.0|21.377307274701412|10.071769815418024|15.632356134636264|
|   d006|         F|     12776|    137294|60621.83104034627|           6469|18632.929352115545|   392162625|

#### Write above DataFrames to storage

In [192]:
dept_aggrDF.repartition(1).write.option("header", "false").csv("/user/2320B48/datasets/employeesdb/results/aggr_dept/")
dept_gender_aggrDF.repartition(1).write.option("header", "false").csv("/user/2320B48/datasets/employeesdb/results/aggr_dept_gender/")

In [193]:
active_employees_data.printSchema()

root
 |-- emp_no: integer (nullable = true)
 |-- emp_first_name: string (nullable = true)
 |-- emp_last_name: string (nullable = true)
 |-- emp_gender: string (nullable = true)
 |-- emp_birth_date: string (nullable = true)
 |-- emp_hire_date: string (nullable = true)
 |-- emp_age: double (nullable = true)
 |-- emp_tenure: double (nullable = true)
 |-- salary: integer (nullable = true)
 |-- sal_from_date: string (nullable = true)
 |-- salary_since: double (nullable = true)
 |-- title: string (nullable = true)
 |-- title_from_date: string (nullable = true)
 |-- role_since: double (nullable = true)
 |-- dept_no: string (nullable = true)
 |-- dept_name: string (nullable = true)
 |-- dept_from_date: string (nullable = true)
 |-- emp_dept_tenure: double (nullable = true)
 |-- mgr_emp_no: integer (nullable = true)
 |-- mgr_first_name: string (nullable = true)
 |-- mgr_last_name: string (nullable = true)
 |-- mgr_gender: string (nullable = true)
 |-- mgr_birth_date: string (nullable = true)
 |