# Transformations in Spark

## HIVE Metastore
Verify the Hive metastore database and table(s).
<br>Verify the previously created Hive database **employees_db_hive**.

In [1]:
# employees_db_hive
spark.sql("USE employees_db_hive")

SparkSession available as 'spark'.
DataFrame[]

List all the tables in the above database.

In [2]:
spark.sql("SHOW TABLES").show()

+-----------------+--------------------+-----------+
|         database|           tableName|isTemporary|
+-----------------+--------------------+-----------+
|employees_db_hive|active_employees_...|      false|
|employees_db_hive|         departments|      false|
|employees_db_hive|           dept_aggr|      false|
|employees_db_hive|            dept_emp|      false|
|employees_db_hive|    dept_gender_aggr|      false|
|employees_db_hive|        dept_manager|      false|
|employees_db_hive|           employees|      false|
|employees_db_hive|            salaries|      false|
|employees_db_hive|              titles|      false|
+-----------------+--------------------+-----------+

The database **employees_db_hive** now contains 9 tables. Six of them (departments, dept_emp, dept_manager, employees, salaries and titles) hold the source data.
Access these tables and process the data by creating DataFrames.

## Transformations
1. Create a DataFrame for each of the underlying tables.
2. Remove unnecessary rows (retain only the active rows where necessary) and columns.
3. Cache DataFrames as necessary.

### Load and Filter DataFrames
#### Departments Table
Create a DataFrame for the departments data from the departments table in HIVE.

In [3]:
deptDF = spark.sql("SELECT * FROM departments")

Verify the above DataFrame

In [4]:
deptDF.show()

+-------+------------------+-------------------+
|dept_no|         dept_name|      last_modified|
+-------+------------------+-------------------+
|   d001|         Marketing|2018-01-28 23:59:59|
|   d002|           Finance|2018-01-28 23:59:59|
|   d003|   Human Resources|2018-01-28 23:59:59|
|   d004|        Production|2018-01-28 23:59:59|
|   d005|       Development|2018-01-28 23:59:59|
|   d006|Quality Management|2018-01-28 23:59:59|
|   d007|             Sales|2018-01-28 23:59:59|
|   d008|          Research|2018-01-28 23:59:59|
|   d009|  Customer Service|2018-01-28 23:59:59|
+-------+------------------+-------------------+

Verify the schema for the above DataFrame

In [5]:
deptDF.printSchema()

root
 |-- dept_no: string (nullable = true)
 |-- dept_name: string (nullable = true)
 |-- last_modified: timestamp (nullable = true)

#### Departments and Employees Table

Create a DataFrame for departments and employees from the dept_emp table in HIVE.
- An employee might work in different departments during their tenure.
- At any given time, an employee is active in only one department.
- So there can be multiple records for an employee, one per department.
- The department an employee is currently working in can be identified by the to_date value '9999-01-01'.
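A quick way to confirm the one-active-department rule before relying on it is to count active rows per employee. Below is a minimal plain-Python sketch of that check, assuming each record is a dict with `emp_no` and `to_date` keys; the helper `employees_with_multiple_active_rows` and the sample rows are hypothetical, and the PySpark analogue in the docstring is not executed here.

```python
from collections import Counter

ACTIVE_TO_DATE = "9999-01-01"  # sentinel to_date marking the current (active) row


def employees_with_multiple_active_rows(rows):
    """Return the sorted emp_no values that have more than one active row.

    rows: iterable of dicts with at least 'emp_no' and 'to_date' keys.
    A PySpark analogue (not run here) would be:
        (dept_empDF.filter(col("to_date") == "9999-01-01")
                   .groupBy("emp_no").count().filter(col("count") > 1))
    """
    # Count only the rows flagged as active, per employee
    active_counts = Counter(r["emp_no"] for r in rows if r["to_date"] == ACTIVE_TO_DATE)
    return sorted(emp for emp, n in active_counts.items() if n > 1)


# Hypothetical sample: employee 10 moved from d001 to d002 (valid);
# employee 11 has two active rows (violates the rule).
sample = [
    {"emp_no": 10, "dept_no": "d001", "to_date": "1990-01-01"},
    {"emp_no": 10, "dept_no": "d002", "to_date": "9999-01-01"},
    {"emp_no": 11, "dept_no": "d007", "to_date": "9999-01-01"},
    {"emp_no": 11, "dept_no": "d005", "to_date": "9999-01-01"},
]
print(employees_with_multiple_active_rows(sample))
```

An empty result means every employee has at most one active department row, so filtering on to_date yields one row per active employee.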

In [6]:
dept_empDF = spark.sql("SELECT * FROM dept_emp")

Verify the above DataFrame

In [7]:
dept_empDF.show()

+------+------+-------+----------+----------+-------------------+
|seq_no|emp_no|dept_no| from_date|   to_date|      last_modified|
+------+------+-------+----------+----------+-------------------+
|     1|     1|   d001|1986-01-01|9999-01-01|2018-01-28 23:59:59|
|     2|     2|   d002|1986-01-01|9999-01-01|2018-01-28 23:59:59|
|     3|     3|   d003|1986-01-01|9999-01-01|2018-01-28 23:59:59|
|     4|     4|   d004|1986-01-01|9999-01-01|2018-01-28 23:59:59|
|     5|     5|   d005|1986-01-01|9999-01-01|2018-01-28 23:59:59|
|     6|     6|   d006|1986-01-01|9999-01-01|2018-01-28 23:59:59|
|     7|     7|   d007|1986-01-01|9999-01-01|2018-01-28 23:59:59|
|     8|     8|   d008|1986-01-01|9999-01-01|2018-01-28 23:59:59|
|     9|     9|   d009|1986-01-01|9999-01-01|2018-01-28 23:59:59|
|    10|    10|   d002|1986-01-14|9999-01-01|2018-01-28 23:59:59|
|    11|    11|   d007|1996-03-08|9999-01-01|2018-01-28 23:59:59|
|    12|    12|   d006|1994-08-13|9999-01-01|2018-01-28 23:59:59|
|    13|  

Verify the schema for the above DataFrame

In [8]:
dept_empDF.printSchema()

root
 |-- seq_no: integer (nullable = true)
 |-- emp_no: integer (nullable = true)
 |-- dept_no: string (nullable = true)
 |-- from_date: string (nullable = true)
 |-- to_date: string (nullable = true)
 |-- last_modified: timestamp (nullable = true)

Verify the record counts in the above DataFrame

In [9]:
dept_empDF.count()

331603

Filter only the active records from the above DataFrame; active employees can be identified by to_date == '9999-01-01'.

In [10]:
# from pyspark.sql.functions import col
# active_dept_empDF = dept_empDF.filter(col("to_date") == '9999-01-01')
active_dept_empDF = dept_empDF[dept_empDF.to_date == '9999-01-01']

Verify the active employees count from the above DataFrame

In [11]:
active_dept_empDF.count()

240124

Verify the data types of the above DataFrame

In [12]:
active_dept_empDF.dtypes

[('seq_no', 'int'), ('emp_no', 'int'), ('dept_no', 'string'), ('from_date', 'string'), ('to_date', 'string'), ('last_modified', 'timestamp')]

In [13]:
active_dept_empDF.count()

240124

Remove unnecessary columns; the last_modified column is not needed.

In [14]:
active_dept_empDF = active_dept_empDF.select('seq_no', 'emp_no', 'dept_no', 'from_date', 'to_date')

Verify the above DataFrame

In [15]:
active_dept_empDF.show()

+------+------+-------+----------+----------+
|seq_no|emp_no|dept_no| from_date|   to_date|
+------+------+-------+----------+----------+
|     1|     1|   d001|1986-01-01|9999-01-01|
|     2|     2|   d002|1986-01-01|9999-01-01|
|     3|     3|   d003|1986-01-01|9999-01-01|
|     4|     4|   d004|1986-01-01|9999-01-01|
|     5|     5|   d005|1986-01-01|9999-01-01|
|     6|     6|   d006|1986-01-01|9999-01-01|
|     7|     7|   d007|1986-01-01|9999-01-01|
|     8|     8|   d008|1986-01-01|9999-01-01|
|     9|     9|   d009|1986-01-01|9999-01-01|
|    10|    10|   d002|1986-01-14|9999-01-01|
|    11|    11|   d007|1996-03-08|9999-01-01|
|    12|    12|   d006|1994-08-13|9999-01-01|
|    13|    13|   d005|1987-04-05|9999-01-01|
|    14|    14|   d009|1986-02-01|9999-01-01|
|    15|    15|   d005|1996-03-23|9999-01-01|
|    16|    16|   d007|1986-02-01|9999-01-01|
|    17|    17|   d004|1991-12-16|9999-01-01|
|    19|    19|   d007|1986-02-01|9999-01-01|
|    20|    20|   d004|1986-02-01|

Cache the DataFrame in memory (cache() is lazy; the data is materialized by the next action, such as count())

In [16]:
active_dept_empDF.cache()

DataFrame[seq_no: int, emp_no: int, dept_no: string, from_date: string, to_date: string]

#### Departments and Managers Table
Create a DataFrame for the departments and managers data from the dept_manager table in HIVE.
- For each department, the manager's employee number is available in the dept_manager table.
- A department may have had multiple managers over time.
- So there can be multiple records for a department (with different employee numbers for different time periods).
- At any given time there is only one active manager for each department, identified by the to_date value '9999-01-01'.

In [17]:
dept_managerDF = spark.sql("SELECT * FROM dept_manager")

Verify the departments and managers DataFrame

In [18]:
dept_managerDF.show()

+------+-------+------+----------+----------+-------------------+
|seq_no|dept_no|emp_no| from_date|   to_date|      last_modified|
+------+-------+------+----------+----------+-------------------+
|     1|   d001|     1|1986-01-01|1992-10-01|2018-01-28 23:59:59|
|     2|   d002|     2|1986-01-01|1990-12-17|2018-01-28 23:59:59|
|     3|   d003|     3|1986-01-01|1993-03-21|2018-01-28 23:59:59|
|     4|   d004|     4|1986-01-01|1989-09-09|2018-01-28 23:59:59|
|     5|   d005|     5|1986-01-01|1993-04-25|2018-01-28 23:59:59|
|     6|   d006|     6|1986-01-01|1990-05-06|2018-01-28 23:59:59|
|     7|   d007|     7|1986-01-01|1992-03-07|2018-01-28 23:59:59|
|     8|   d008|     8|1986-01-01|1992-04-08|2018-01-28 23:59:59|
|     9|   d009|     9|1986-01-01|1989-10-17|2018-01-28 23:59:59|
|    10|   d002|    10|1990-12-17|9999-01-01|2018-01-28 23:59:59|
|    11|   d003| 19827|1993-03-21|9999-01-01|2018-01-28 23:59:59|
|    12|   d004| 31345|1990-09-09|1994-08-02|2018-01-28 23:59:59|
|    13|  

Verify the schema of the departments and managers DataFrame

In [19]:
dept_managerDF.printSchema()

root
 |-- seq_no: integer (nullable = true)
 |-- dept_no: string (nullable = true)
 |-- emp_no: integer (nullable = true)
 |-- from_date: string (nullable = true)
 |-- to_date: string (nullable = true)
 |-- last_modified: timestamp (nullable = true)

Verify the total count of the departments and managers DataFrame

In [20]:
dept_managerDF.count()

24

Filter only the active records from the above DataFrame.
<br>Although there are only 9 departments, there are 24 manager records,
<br>so remove the inactive records. Active records are those whose to_date is '9999-01-01'.

In [21]:
# from pyspark.sql.functions import col
# active_dept_managerDF = dept_managerDF.filter(col("to_date") == '9999-01-01')
active_dept_managerDF = dept_managerDF[dept_managerDF.to_date == '9999-01-01']
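To make the filter concrete, this plain-Python sketch applies the same to_date == '9999-01-01' predicate to the first 12 dept_manager rows displayed above. Only dept_no, emp_no and to_date are kept, and the list-of-tuples representation is an assumption made for illustration.

```python
ACTIVE_TO_DATE = "9999-01-01"

# (dept_no, emp_no, to_date) for the first 12 dept_manager rows displayed above
manager_rows = [
    ("d001", 1, "1992-10-01"), ("d002", 2, "1990-12-17"),
    ("d003", 3, "1993-03-21"), ("d004", 4, "1989-09-09"),
    ("d005", 5, "1993-04-25"), ("d006", 6, "1990-05-06"),
    ("d007", 7, "1992-03-07"), ("d008", 8, "1992-04-08"),
    ("d009", 9, "1989-10-17"), ("d002", 10, "9999-01-01"),
    ("d003", 19827, "9999-01-01"), ("d004", 31345, "1994-08-02"),
]

# Same predicate as dept_managerDF[dept_managerDF.to_date == '9999-01-01'].
# Keying a dict by dept_no assumes at most one active manager per department.
active = {dept: emp for dept, emp, to_date in manager_rows if to_date == ACTIVE_TO_DATE}
print(active)
```

The two surviving entries (d002 with manager 10, d003 with manager 19827) correspond to the d002 and d003 rows of active_dept_managerDF; the remaining active managers come from rows not shown in the truncated output.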

Verify the records count from the above DataFrame

In [22]:
active_dept_managerDF.count()

9

Verify the columns

In [23]:
active_dept_managerDF.dtypes

[('seq_no', 'int'), ('dept_no', 'string'), ('emp_no', 'int'), ('from_date', 'string'), ('to_date', 'string'), ('last_modified', 'timestamp')]

Remove unwanted columns and rename the remaining columns as necessary; seq_no, last_modified and to_date are not needed.

In [24]:
from pyspark.sql.functions import expr
active_dept_managerDF = active_dept_managerDF.select('dept_no', expr('emp_no AS mgr_emp_no'), expr('from_date AS mgr_from_date'))

Verify the above DataFrame

In [25]:
active_dept_managerDF.show()

+-------+----------+-------------+
|dept_no|mgr_emp_no|mgr_from_date|
+-------+----------+-------------+
|   d002|        10|   1990-12-17|
|   d003|     19827|   1993-03-21|
|   d001|     45502|   1993-10-01|
|   d005|     64439|   1995-04-25|
|   d007|     71341|   1994-03-07|
|   d008|    107706|   1996-04-08|
|   d006|    149081|   2000-06-28|
|   d009|    151543|   2003-01-03|
|   d004|    215054|   2005-08-30|
+-------+----------+-------------+

#### Employees Table
Create a DataFrame for the employees data from the employees table in HIVE.

In [26]:
employeesDF = spark.sql("SELECT * FROM employees")

Verify employees DataFrame

In [27]:
employeesDF.show(4)

+------+----------+----------+------------+------+----------+-------------------+
|emp_no|birth_date|first_name|   last_name|gender| hire_date|      last_modified|
+------+----------+----------+------------+------+----------+-------------------+
|     1|1958-09-12| Margareta|  Markovitch|     M|1986-01-01|2018-01-28 23:59:59|
|     2|1961-10-28|      Ebru|       Alpin|     M|1986-01-01|2018-01-28 23:59:59|
|     3|1955-06-24|   Shirish|Ossenbruggen|     F|1986-01-01|2018-01-28 23:59:59|
|     4|1958-06-08| Krassimir|     Wegerle|     F|1986-01-01|2018-01-28 23:59:59|
+------+----------+----------+------------+------+----------+-------------------+
only showing top 4 rows

Verify schema of employees DataFrame

In [28]:
employeesDF.printSchema()

root
 |-- emp_no: integer (nullable = true)
 |-- birth_date: string (nullable = true)
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- hire_date: string (nullable = true)
 |-- last_modified: timestamp (nullable = true)

Remove unwanted columns

In [29]:
employeesDF = employeesDF.drop('last_modified')

Verify the above DataFrame

In [30]:
employeesDF.show(4)

+------+----------+----------+------------+------+----------+
|emp_no|birth_date|first_name|   last_name|gender| hire_date|
+------+----------+----------+------------+------+----------+
|     1|1958-09-12| Margareta|  Markovitch|     M|1986-01-01|
|     2|1961-10-28|      Ebru|       Alpin|     M|1986-01-01|
|     3|1955-06-24|   Shirish|Ossenbruggen|     F|1986-01-01|
|     4|1958-06-08| Krassimir|     Wegerle|     F|1986-01-01|
+------+----------+----------+------------+------+----------+
only showing top 4 rows

#### Salaries Table
Create a DataFrame for the salaries data from the salaries table in HIVE.

In [31]:
salariesDF = spark.sql("SELECT * FROM salaries")

Verify salaries DataFrame

In [32]:
salariesDF.show(4)

+------+------+------+----------+----------+-------------------+
|seq_no|emp_no|salary| from_date|   to_date|      last_modified|
+------+------+------+----------+----------+-------------------+
|     1|     1| 70166|1986-01-01|1987-01-01|2018-01-28 23:59:59|
|     2|     1| 70820|1987-01-01|1988-01-01|2018-01-28 23:59:59|
|     3|     1| 71970|1988-01-01|1989-01-01|2018-01-28 23:59:59|
|     4|     1| 75211|1989-01-01|1989-12-31|2018-01-28 23:59:59|
+------+------+------+----------+----------+-------------------+
only showing top 4 rows

Verify schema of salaries DataFrame

In [33]:
salariesDF.printSchema()

root
 |-- seq_no: integer (nullable = true)
 |-- emp_no: integer (nullable = true)
 |-- salary: integer (nullable = true)
 |-- from_date: string (nullable = true)
 |-- to_date: string (nullable = true)
 |-- last_modified: timestamp (nullable = true)

Verify count of records in salaries DataFrame

In [34]:
salariesDF.count()

2844047

Filter only the active records from the above DataFrame

In [35]:
active_salariesDF = salariesDF[salariesDF.to_date=='9999-01-01']

Verify the record count

In [36]:
active_salariesDF.count()

240124

In [37]:
active_salariesDF.dtypes

[('seq_no', 'int'), ('emp_no', 'int'), ('salary', 'int'), ('from_date', 'string'), ('to_date', 'string'), ('last_modified', 'timestamp')]

Remove unnecessary columns and rename the rest as necessary

In [38]:
from pyspark.sql.functions import expr
active_salariesDF = active_salariesDF.select("emp_no", "salary", expr("from_date as sal_from_date"))

In [39]:
active_salariesDF.dtypes

[('emp_no', 'int'), ('salary', 'int'), ('sal_from_date', 'string')]

#### Titles Table
Create a titles DataFrame from the titles table in HIVE.

In [40]:
titlesDF = spark.sql("SELECT * FROM titles")

Verify titles DataFrame

In [41]:
titlesDF.show(4)

+------+------+------------+----------+----------+-------------------+
|seq_no|emp_no|       title| from_date|   to_date|      last_modified|
+------+------+------------+----------+----------+-------------------+
|     1|     1|     Manager|1986-01-01|1992-10-01|2018-01-28 23:59:59|
|     2|     1|Senior Staff|1992-10-01|9999-01-01|2018-01-28 23:59:59|
|     3|     2|     Manager|1986-01-01|1990-12-17|2018-01-28 23:59:59|
|     4|     2|Senior Staff|1990-12-17|9999-01-01|2018-01-28 23:59:59|
+------+------+------------+----------+----------+-------------------+
only showing top 4 rows

Verify titles DataFrame schema

In [42]:
titlesDF.printSchema()

root
 |-- seq_no: integer (nullable = true)
 |-- emp_no: integer (nullable = true)
 |-- title: string (nullable = true)
 |-- from_date: string (nullable = true)
 |-- to_date: string (nullable = true)
 |-- last_modified: timestamp (nullable = true)

Verify the record count of the above DataFrame

In [43]:
titlesDF.count()

443308

Filter active records from the above DataFrame

In [44]:
active_titlesDF = titlesDF[titlesDF.to_date=='9999-01-01']

In [45]:
active_titlesDF.dtypes

[('seq_no', 'int'), ('emp_no', 'int'), ('title', 'string'), ('from_date', 'string'), ('to_date', 'string'), ('last_modified', 'timestamp')]

Remove and rename the columns as necessary

In [46]:
from pyspark.sql.functions import expr
active_titlesDF = active_titlesDF.select('emp_no', 'title', expr('from_date AS title_from_date'))

In [47]:
active_titlesDF.dtypes

[('emp_no', 'int'), ('title', 'string'), ('title_from_date', 'string')]

Join the departments and active department-managers DataFrames
<br>The result has each department and its current manager's employee number.

In [48]:
dept_curr_mgrDF = deptDF.join(active_dept_managerDF, 'dept_no', 'inner')
dept_curr_mgrDF.show(50)

+-------+------------------+-------------------+----------+-------------+
|dept_no|         dept_name|      last_modified|mgr_emp_no|mgr_from_date|
+-------+------------------+-------------------+----------+-------------+
|   d002|           Finance|2018-01-28 23:59:59|        10|   1990-12-17|
|   d003|   Human Resources|2018-01-28 23:59:59|     19827|   1993-03-21|
|   d001|         Marketing|2018-01-28 23:59:59|     45502|   1993-10-01|
|   d005|       Development|2018-01-28 23:59:59|     64439|   1995-04-25|
|   d007|             Sales|2018-01-28 23:59:59|     71341|   1994-03-07|
|   d008|          Research|2018-01-28 23:59:59|    107706|   1996-04-08|
|   d006|Quality Management|2018-01-28 23:59:59|    149081|   2000-06-28|
|   d009|  Customer Service|2018-01-28 23:59:59|    151543|   2003-01-03|
|   d004|        Production|2018-01-28 23:59:59|    215054|   2005-08-30|
+-------+------------------+-------------------+----------+-------------+

Find each manager's details by joining the above DataFrame with the employees DataFrame on mgr_emp_no = emp_no

In [49]:
dept_curr_mgrDF.dtypes

[('dept_no', 'string'), ('dept_name', 'string'), ('last_modified', 'timestamp'), ('mgr_emp_no', 'int'), ('mgr_from_date', 'string')]

In [50]:
employeesDF.dtypes

[('emp_no', 'int'), ('birth_date', 'string'), ('first_name', 'string'), ('last_name', 'string'), ('gender', 'string'), ('hire_date', 'string')]

In [51]:
join_expr = dept_curr_mgrDF["mgr_emp_no"] == employeesDF["emp_no"]

In [52]:
dept_curr_mgr_detailsDF = dept_curr_mgrDF.join(employeesDF,join_expr,'inner')

In [53]:
dept_curr_mgr_detailsDF.dtypes

[('dept_no', 'string'), ('dept_name', 'string'), ('last_modified', 'timestamp'), ('mgr_emp_no', 'int'), ('mgr_from_date', 'string'), ('emp_no', 'int'), ('birth_date', 'string'), ('first_name', 'string'), ('last_name', 'string'), ('gender', 'string'), ('hire_date', 'string')]

In [54]:
dept_curr_mgr_detailsDF.show(4)

+-------+---------------+-------------------+----------+-------------+------+----------+----------+----------+------+----------+
|dept_no|      dept_name|      last_modified|mgr_emp_no|mgr_from_date|emp_no|birth_date|first_name| last_name|gender| hire_date|
+-------+---------------+-------------------+----------+-------------+------+----------+----------+----------+------+----------+
|   d002|        Finance|2018-01-28 23:59:59|        10|   1990-12-17|    10|1959-03-28|     Isamu|Legleitner|     F|1986-01-14|
|   d003|Human Resources|2018-01-28 23:59:59|     19827|   1993-03-21| 19827|1960-12-02|   Karsten|   Sigstam|     F|1986-08-04|
|   d001|      Marketing|2018-01-28 23:59:59|     45502|   1993-10-01| 45502|1967-06-21|  Vishwani|  Minakawa|     M|1988-04-12|
|   d005|    Development|2018-01-28 23:59:59|     64439|   1995-04-25| 64439|1970-04-25|      Leon|  DasSarma|     F|1989-10-21|
+-------+---------------+-------------------+----------+-------------+------+----------+---------

Rename columns as necessary

In [55]:
from pyspark.sql.functions import col

replacements = {'birth_date' : 'mgr_birth_date', 
                'first_name' : 'mgr_first_name',
                'last_name' : 'mgr_last_name',
                'gender' : 'mgr_gender',
                'hire_date' : 'mgr_hire_date'
               }

dept_curr_mgr_detailsDF = dept_curr_mgr_detailsDF.select([col(c).alias(replacements.get(c, c)) for c in dept_curr_mgr_detailsDF.columns])
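The rename relies on `dict.get(c, c)`, which returns the new name when a column appears in `replacements` and the original name otherwise. This plain-Python sketch applies that expression to the column list reported by `dept_curr_mgr_detailsDF.dtypes` before renaming, without Spark, so the resulting header can be checked against the output that follows.

```python
# Column order as reported by dept_curr_mgr_detailsDF.dtypes before renaming
columns = ['dept_no', 'dept_name', 'last_modified', 'mgr_emp_no', 'mgr_from_date',
           'emp_no', 'birth_date', 'first_name', 'last_name', 'gender', 'hire_date']

replacements = {'birth_date': 'mgr_birth_date',
                'first_name': 'mgr_first_name',
                'last_name': 'mgr_last_name',
                'gender': 'mgr_gender',
                'hire_date': 'mgr_hire_date'}

# Same expression used inside select([...]): unmapped columns keep their name
renamed = [replacements.get(c, c) for c in columns]
print(renamed)
```

Chaining `withColumnRenamed(old, new)` once per pair would also work; the `select`/`alias` form performs all the renames in a single projection and leaves column order unchanged.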

Verify the above DataFrame

In [56]:
dept_curr_mgr_detailsDF.show(4)

+-------+---------------+-------------------+----------+-------------+------+--------------+--------------+-------------+----------+-------------+
|dept_no|      dept_name|      last_modified|mgr_emp_no|mgr_from_date|emp_no|mgr_birth_date|mgr_first_name|mgr_last_name|mgr_gender|mgr_hire_date|
+-------+---------------+-------------------+----------+-------------+------+--------------+--------------+-------------+----------+-------------+
|   d002|        Finance|2018-01-28 23:59:59|        10|   1990-12-17|    10|    1959-03-28|         Isamu|   Legleitner|         F|   1986-01-14|
|   d003|Human Resources|2018-01-28 23:59:59|     19827|   1993-03-21| 19827|    1960-12-02|       Karsten|      Sigstam|         F|   1986-08-04|
|   d001|      Marketing|2018-01-28 23:59:59|     45502|   1993-10-01| 45502|    1967-06-21|      Vishwani|     Minakawa|         M|   1988-04-12|
|   d005|    Development|2018-01-28 23:59:59|     64439|   1995-04-25| 64439|    1970-04-25|          Leon|     DasSar

Remove unwanted columns

In [57]:
dept_curr_mgr_detailsDF = dept_curr_mgr_detailsDF.drop('last_modified', 'emp_no')

In [58]:
dept_curr_mgr_detailsDF.dtypes

[('dept_no', 'string'), ('dept_name', 'string'), ('mgr_emp_no', 'int'), ('mgr_from_date', 'string'), ('mgr_birth_date', 'string'), ('mgr_first_name', 'string'), ('mgr_last_name', 'string'), ('mgr_gender', 'string'), ('mgr_hire_date', 'string')]

In [59]:
dept_curr_mgr_detailsDF.show(4)

+-------+---------------+----------+-------------+--------------+--------------+-------------+----------+-------------+
|dept_no|      dept_name|mgr_emp_no|mgr_from_date|mgr_birth_date|mgr_first_name|mgr_last_name|mgr_gender|mgr_hire_date|
+-------+---------------+----------+-------------+--------------+--------------+-------------+----------+-------------+
|   d002|        Finance|        10|   1990-12-17|    1959-03-28|         Isamu|   Legleitner|         F|   1986-01-14|
|   d003|Human Resources|     19827|   1993-03-21|    1960-12-02|       Karsten|      Sigstam|         F|   1986-08-04|
|   d001|      Marketing|     45502|   1993-10-01|    1967-06-21|      Vishwani|     Minakawa|         M|   1988-04-12|
|   d005|    Development|     64439|   1995-04-25|    1970-04-25|          Leon|     DasSarma|         F|   1989-10-21|
+-------+---------------+----------+-------------+--------------+--------------+-------------+----------+-------------+
only showing top 4 rows

Join the employees DataFrame with the departments & employees DataFrame
<br>Join employeesDF and active_dept_empDF on emp_no.
<br>The resulting DataFrame contains each employee and his/her corresponding department.

In [60]:
employeesDF.dtypes

[('emp_no', 'int'), ('birth_date', 'string'), ('first_name', 'string'), ('last_name', 'string'), ('gender', 'string'), ('hire_date', 'string')]

In [61]:
active_dept_empDF.dtypes

[('seq_no', 'int'), ('emp_no', 'int'), ('dept_no', 'string'), ('from_date', 'string'), ('to_date', 'string')]

In [62]:
emp_deptDF = active_dept_empDF.join(employeesDF,'emp_no','inner')

Verify the above result DataFrame

In [63]:
emp_deptDF.show(4)

+------+------+-------+----------+----------+----------+----------+---------+------+----------+
|emp_no|seq_no|dept_no| from_date|   to_date|birth_date|first_name|last_name|gender| hire_date|
+------+------+-------+----------+----------+----------+----------+---------+------+----------+
|   148|   156|   d005|1986-02-03|9999-01-01|1960-03-11|    Feipei| Nollmann|     M|1986-02-03|
|   463|   507|   d008|1986-02-06|9999-01-01|1955-04-15|Dharmaraja| Sadowsky|     M|1986-02-06|
|   496|   543|   d006|2001-09-02|9999-01-01|1964-03-29|      Mari|    Rotem|     M|1986-02-06|
|   833|   914|   d005|1994-03-25|9999-01-01|1961-09-14|      Huan|  Preusig|     M|1986-02-09|
+------+------+-------+----------+----------+----------+----------+---------+------+----------+
only showing top 4 rows

Rename the columns

In [64]:
from pyspark.sql.functions import col

replacements = {
    'from_date' : 'dept_from_date',
    'birth_date' : 'emp_birth_date',
    'first_name' : 'emp_first_name',
    'last_name' : 'emp_last_name',
    'gender' : 'emp_gender',
    'hire_date' : 'emp_hire_date'
}

emp_deptDF = emp_deptDF.select([col(c).alias(replacements.get(c, c)) for c in emp_deptDF.columns])

Verify DataFrame

In [65]:
emp_deptDF.show(4)

+------+------+-------+--------------+----------+--------------+--------------+-------------+----------+-------------+
|emp_no|seq_no|dept_no|dept_from_date|   to_date|emp_birth_date|emp_first_name|emp_last_name|emp_gender|emp_hire_date|
+------+------+-------+--------------+----------+--------------+--------------+-------------+----------+-------------+
|   148|   156|   d005|    1986-02-03|9999-01-01|    1960-03-11|        Feipei|     Nollmann|         M|   1986-02-03|
|   463|   507|   d008|    1986-02-06|9999-01-01|    1955-04-15|    Dharmaraja|     Sadowsky|         M|   1986-02-06|
|   496|   543|   d006|    2001-09-02|9999-01-01|    1964-03-29|          Mari|        Rotem|         M|   1986-02-06|
|   833|   914|   d005|    1994-03-25|9999-01-01|    1961-09-14|          Huan|      Preusig|         M|   1986-02-09|
+------+------+-------+--------------+----------+--------------+--------------+-------------+----------+-------------+
only showing top 4 rows

Verify active records count

In [66]:
emp_deptDF.count()

240124

Create a DataFrame with employees and their respective managers' details

<br>DataFrame emp_deptDF contains all the active employees along with their department.
<br>DataFrame dept_curr_mgr_detailsDF contains every department along with its manager's details.
<br>Join these two DataFrames on dept_no to produce a DataFrame of employees along with their managers' details.

In [67]:
emp_deptDF.dtypes

[('emp_no', 'int'), ('seq_no', 'int'), ('dept_no', 'string'), ('dept_from_date', 'string'), ('to_date', 'string'), ('emp_birth_date', 'string'), ('emp_first_name', 'string'), ('emp_last_name', 'string'), ('emp_gender', 'string'), ('emp_hire_date', 'string')]

In [68]:
emp_deptDF.count()

240124

In [69]:
dept_curr_mgr_detailsDF.dtypes

[('dept_no', 'string'), ('dept_name', 'string'), ('mgr_emp_no', 'int'), ('mgr_from_date', 'string'), ('mgr_birth_date', 'string'), ('mgr_first_name', 'string'), ('mgr_last_name', 'string'), ('mgr_gender', 'string'), ('mgr_hire_date', 'string')]

In [70]:
dept_curr_mgr_detailsDF.count()

9

Join by broadcasting the smaller DataFrame (9 rows) to every executor, which avoids shuffling the larger DataFrame

In [71]:
from pyspark.sql.functions import broadcast
active_emp_dept_mgrDF = emp_deptDF.join(broadcast(dept_curr_mgr_detailsDF), 'dept_no', 'inner')
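Conceptually, a broadcast hash join builds an in-memory hash table from the small side, keyed on the join column, and streams the large side through it. The plain-Python sketch below mimics that idea on a single machine; it illustrates the technique and is not Spark's implementation. The helper `broadcast_hash_join` is hypothetical, and the rows are reduced to a few columns (the d999 row is invented to show an unmatched key).

```python
def broadcast_hash_join(large_rows, small_rows, key):
    """Inner-join two lists of dicts on `key` by hashing the small side.

    Mirrors the idea behind broadcast(dept_curr_mgr_detailsDF): the small
    table becomes a lookup dict and each large-side row probes it once.
    """
    # Build phase: index the small side by the join key (a key may map to several rows)
    lookup = {}
    for row in small_rows:
        lookup.setdefault(row[key], []).append(row)

    # Probe phase: stream the large side; rows without a match are dropped (inner join)
    joined = []
    for row in large_rows:
        for match in lookup.get(row[key], []):
            joined.append({**row, **match})
    return joined


employees = [
    {"emp_no": 148, "dept_no": "d005"},
    {"emp_no": 463, "dept_no": "d008"},
    {"emp_no": 999, "dept_no": "d999"},  # hypothetical row with no matching department
]
managers = [
    {"dept_no": "d005", "mgr_emp_no": 64439},
    {"dept_no": "d008", "mgr_emp_no": 107706},
]
for r in broadcast_hash_join(employees, managers, "dept_no"):
    print(r)
```

Spark also broadcasts a side automatically when its estimated size is below `spark.sql.autoBroadcastJoinThreshold` (10 MB by default); the explicit `broadcast()` hint requests a broadcast join without relying on that estimate.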

Verify the DataFrame

In [72]:
active_emp_dept_mgrDF.show(4)

+-------+------+------+--------------+----------+--------------+--------------+-------------+----------+-------------+------------------+----------+-------------+--------------+--------------+-------------+----------+-------------+
|dept_no|emp_no|seq_no|dept_from_date|   to_date|emp_birth_date|emp_first_name|emp_last_name|emp_gender|emp_hire_date|         dept_name|mgr_emp_no|mgr_from_date|mgr_birth_date|mgr_first_name|mgr_last_name|mgr_gender|mgr_hire_date|
+-------+------+------+--------------+----------+--------------+--------------+-------------+----------+-------------+------------------+----------+-------------+--------------+--------------+-------------+----------+-------------+
|   d005|   148|   156|    1986-02-03|9999-01-01|    1960-03-11|        Feipei|     Nollmann|         M|   1986-02-03|       Development|     64439|   1995-04-25|    1970-04-25|          Leon|     DasSarma|         F|   1989-10-21|
|   d008|   463|   507|    1986-02-06|9999-01-01|    1955-04-15|    Dhar

Verify the counts

In [73]:
active_emp_dept_mgrDF.count()

240124

Cache the DataFrame in memory (cache() is lazy; the data is materialized by the next action, such as count())

In [74]:
active_emp_dept_mgrDF.cache()

DataFrame[dept_no: string, emp_no: int, seq_no: int, dept_from_date: string, to_date: string, emp_birth_date: string, emp_first_name: string, emp_last_name: string, emp_gender: string, emp_hire_date: string, dept_name: string, mgr_emp_no: int, mgr_from_date: string, mgr_birth_date: string, mgr_first_name: string, mgr_last_name: string, mgr_gender: string, mgr_hire_date: string]

Join Salaries and Titles DataFrames

<br>active_salariesDF contains the current salary details of active employees.
<br>active_titlesDF contains the current title/designation details of active employees.
<br>Join these two DataFrames on emp_no.
<br>The result is a DataFrame of all active employees along with their salary and title details.

In [75]:
active_salariesDF.dtypes

[('emp_no', 'int'), ('salary', 'int'), ('sal_from_date', 'string')]

In [76]:
active_titlesDF.dtypes

[('emp_no', 'int'), ('title', 'string'), ('title_from_date', 'string')]

In [77]:
active_salariesDF.cache()
active_titlesDF.cache()

DataFrame[emp_no: int, title: string, title_from_date: string]

In [78]:
emp_sal_titlesDF = active_salariesDF.join(active_titlesDF, 'emp_no', 'inner')

Verify the DataFrame

In [79]:
emp_sal_titlesDF.show(4)

+------+------+-------------+---------------+---------------+
|emp_no|salary|sal_from_date|          title|title_from_date|
+------+------+-------------+---------------+---------------+
|   148|121640|   2003-01-30|Senior Engineer|     1993-02-04|
|   463| 63130|   2003-02-02|   Senior Staff|     1986-02-06|
|   496| 50281|   2002-08-16|       Engineer|     2000-08-17|
|   833| 52747|   2003-03-23|Senior Engineer|     1994-03-25|
+------+------+-------------+---------------+---------------+
only showing top 4 rows

Verify record count

In [80]:
emp_sal_titlesDF.count()

240124
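Comparing the record count after a join with the input counts is a cheap guard against duplicated keys: an inner equi-join emits, for every key, the product of the matching row counts on each side. The plain-Python sketch below computes that output size from the key values alone; the helper `inner_join_row_count` and the key lists are hypothetical.

```python
from collections import Counter


def inner_join_row_count(left_keys, right_keys):
    """Number of rows an inner equi-join produces, given the join-key values of each side."""
    left, right = Counter(left_keys), Counter(right_keys)
    # Each key contributes (#left rows) * (#right rows); keys missing on either side add 0
    return sum(n * right[k] for k, n in left.items())


# Unique keys on both sides: the output never exceeds the smaller input
print(inner_join_row_count([148, 463, 496, 833], [148, 463, 496, 833]))
# A duplicated key on one side (e.g. two "active" title rows for 463) inflates the count
print(inner_join_row_count([148, 463, 496, 833], [148, 463, 463, 496, 833]))
```

With unique emp_no values on both sides, a post-join count equal to the input counts indicates that no rows were dropped or duplicated.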

In [81]:
emp_sal_titlesDF.cache()

DataFrame[emp_no: int, salary: int, sal_from_date: string, title: string, title_from_date: string]

Final Join

<br>By now there are 2 DataFrames:
<br>active_emp_dept_mgrDF contains all active employees along with their managers' details.
<br>emp_sal_titlesDF contains the current salary and title details for all active employees.
<br>Join these two DataFrames to produce a DataFrame with all the details for every active employee.

In [82]:
active_emp_dept_mgrDF.count()

240124

In [83]:
active_emp_dept_mgrDF.dtypes

[('dept_no', 'string'), ('emp_no', 'int'), ('seq_no', 'int'), ('dept_from_date', 'string'), ('to_date', 'string'), ('emp_birth_date', 'string'), ('emp_first_name', 'string'), ('emp_last_name', 'string'), ('emp_gender', 'string'), ('emp_hire_date', 'string'), ('dept_name', 'string'), ('mgr_emp_no', 'int'), ('mgr_from_date', 'string'), ('mgr_birth_date', 'string'), ('mgr_first_name', 'string'), ('mgr_last_name', 'string'), ('mgr_gender', 'string'), ('mgr_hire_date', 'string')]

In [84]:
emp_sal_titlesDF.count()

240124

In [85]:
emp_sal_titlesDF.dtypes

[('emp_no', 'int'), ('salary', 'int'), ('sal_from_date', 'string'), ('title', 'string'), ('title_from_date', 'string')]

In [86]:
emp_sal_titlesDF.cache()
active_emp_dept_mgrDF.cache()

DataFrame[dept_no: string, emp_no: int, seq_no: int, dept_from_date: string, to_date: string, emp_birth_date: string, emp_first_name: string, emp_last_name: string, emp_gender: string, emp_hire_date: string, dept_name: string, mgr_emp_no: int, mgr_from_date: string, mgr_birth_date: string, mgr_first_name: string, mgr_last_name: string, mgr_gender: string, mgr_hire_date: string]

In [87]:
active_emp_detailsDF = active_emp_dept_mgrDF.join(emp_sal_titlesDF, 'emp_no', 'inner')

In [88]:
active_emp_detailsDF.cache()

DataFrame[emp_no: int, dept_no: string, seq_no: int, dept_from_date: string, to_date: string, emp_birth_date: string, emp_first_name: string, emp_last_name: string, emp_gender: string, emp_hire_date: string, dept_name: string, mgr_emp_no: int, mgr_from_date: string, mgr_birth_date: string, mgr_first_name: string, mgr_last_name: string, mgr_gender: string, mgr_hire_date: string, salary: int, sal_from_date: string, title: string, title_from_date: string]

Verify DataFrame

In [89]:
active_emp_detailsDF.show(4)

+------+-------+------+--------------+----------+--------------+--------------+-------------+----------+-------------+------------------+----------+-------------+--------------+--------------+-------------+----------+-------------+------+-------------+---------------+---------------+
|emp_no|dept_no|seq_no|dept_from_date|   to_date|emp_birth_date|emp_first_name|emp_last_name|emp_gender|emp_hire_date|         dept_name|mgr_emp_no|mgr_from_date|mgr_birth_date|mgr_first_name|mgr_last_name|mgr_gender|mgr_hire_date|salary|sal_from_date|          title|title_from_date|
+------+-------+------+--------------+----------+--------------+--------------+-------------+----------+-------------+------------------+----------+-------------+--------------+--------------+-------------+----------+-------------+------+-------------+---------------+---------------+
|   148|   d005|   156|    1986-02-03|9999-01-01|    1960-03-11|        Feipei|     Nollmann|         M|   1986-02-03|       Development|     644

Derive the following additional columns, each a difference in whole years (days between the two dates divided by 365, then rounded):

- emp_age = current_date - emp_birth_date
- emp_tenure = current_date - emp_hire_date
- mgr_age = current_date - mgr_birth_date
- mgr_tenure = current_date - mgr_hire_date
- salary_since = current_date - sal_from_date
- role_since = current_date - title_from_date
- emp_dept_tenure = current_date - dept_from_date
- mgr_dept_tenure = current_date - mgr_from_date
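Every derived column in the list above uses the same formula on a different pair of dates. The following is a minimal pure-Python sketch of that formula, with an exact completed-years count for comparison. The function names `rounded_years` and `completed_years` are hypothetical helpers, not part of the notebook.

```python
from datetime import date


def rounded_years(start: str, end: date) -> float:
    """Mirror of round(datediff(end, to_date(start)) / 365) in the SQL cell:
    whole days between the dates, divided by 365, rounded to the nearest year."""
    days = (end - date.fromisoformat(start)).days
    # An integer day count divided by 365 is never exactly x.5, so Python's
    # round() agrees with Spark's half-up rounding for this formula.
    return float(round(days / 365))


def completed_years(start: str, end: date) -> int:
    """Full years elapsed (age-style), which also accounts for leap days."""
    s = date.fromisoformat(start)
    return end.year - s.year - ((end.month, end.day) < (s.month, s.day))


print(rounded_years("2000-01-01", date(2000, 7, 2)))
print(completed_years("1960-03-11", date(2019, 8, 10)))
```

Because the SQL version rounds, an employee 33.6 years into their tenure is reported as 34, while `completed_years` would report 33. Dividing by 365 also ignores leap days, so the two can diverge slightly over long spans.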

Create a temporary view so the DataFrame can be queried with SQL

In [90]:
active_emp_detailsDF.createOrReplaceTempView("active_emp_details_sqlTBL")

In [91]:
active_emp_detailsDF.dtypes

[('emp_no', 'int'), ('dept_no', 'string'), ('seq_no', 'int'), ('dept_from_date', 'string'), ('to_date', 'string'), ('emp_birth_date', 'string'), ('emp_first_name', 'string'), ('emp_last_name', 'string'), ('emp_gender', 'string'), ('emp_hire_date', 'string'), ('dept_name', 'string'), ('mgr_emp_no', 'int'), ('mgr_from_date', 'string'), ('mgr_birth_date', 'string'), ('mgr_first_name', 'string'), ('mgr_last_name', 'string'), ('mgr_gender', 'string'), ('mgr_hire_date', 'string'), ('salary', 'int'), ('sal_from_date', 'string'), ('title', 'string'), ('title_from_date', 'string')]

Reorder the columns with a SELECT and derive the new columns

In [92]:
active_employees_data  = spark.sql("""
SELECT emp_no, emp_first_name, emp_last_name, emp_gender, emp_birth_date, emp_hire_date,
       round(datediff(current_date,to_date(emp_birth_date))/365) as emp_age,
       round(datediff(current_date,to_date(emp_hire_date))/365) as emp_tenure,
       salary, sal_from_date, 
       round(datediff(current_date,to_date(sal_from_date))/365) as salary_since,
       title, title_from_date,
       round(datediff(current_date,to_date(title_from_date))/365) as role_since,
       dept_no, dept_name, dept_from_date,
       round(datediff(current_date,to_date(dept_from_date))/365) as emp_dept_tenure,
       mgr_emp_no, mgr_first_name, mgr_last_name, mgr_gender, mgr_birth_date, mgr_hire_date, mgr_from_date,
       round(datediff(current_date,to_date(mgr_birth_date))/365) as mgr_age,
       round(datediff(current_date,to_date(mgr_hire_date))/365) as mgr_tenure,
       round(datediff(current_date,to_date(mgr_from_date))/365) as mgr_dept_tenure
FROM active_emp_details_sqlTBL""")

Verify the DataFrame

In [93]:
active_employees_data.show(4)

+------+--------------+-------------+----------+--------------+-------------+-------+----------+------+-------------+------------+---------------+---------------+----------+-------+------------------+--------------+---------------+----------+--------------+-------------+----------+--------------+-------------+-------------+-------+----------+---------------+
|emp_no|emp_first_name|emp_last_name|emp_gender|emp_birth_date|emp_hire_date|emp_age|emp_tenure|salary|sal_from_date|salary_since|          title|title_from_date|role_since|dept_no|         dept_name|dept_from_date|emp_dept_tenure|mgr_emp_no|mgr_first_name|mgr_last_name|mgr_gender|mgr_birth_date|mgr_hire_date|mgr_from_date|mgr_age|mgr_tenure|mgr_dept_tenure|
+------+--------------+-------------+----------+--------------+-------------+-------+----------+------+-------------+------------+---------------+---------------+----------+-------+------------------+--------------+---------------+----------+--------------+-------------+-------

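Write the DataFrame to persistent storage (HDFS) as CSV without a header row. `coalesce(1)` merges the data into a single partition so only one part file is written, and the default save mode fails if the output path already exists.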

In [94]:
active_employees_data \
    .coalesce(1) \
    .write \
    .option("header", "false") \
    .csv("/tutorials/employees_data/results/active_employees_data")

In [95]:
active_employees_data.cache()

DataFrame[emp_no: int, emp_first_name: string, emp_last_name: string, emp_gender: string, emp_birth_date: string, emp_hire_date: string, emp_age: double, emp_tenure: double, salary: int, sal_from_date: string, salary_since: double, title: string, title_from_date: string, role_since: double, dept_no: string, dept_name: string, dept_from_date: string, emp_dept_tenure: double, mgr_emp_no: int, mgr_first_name: string, mgr_last_name: string, mgr_gender: string, mgr_birth_date: string, mgr_hire_date: string, mgr_from_date: string, mgr_age: double, mgr_tenure: double, mgr_dept_tenure: double]

In [96]:
active_employees_data.count()

240124

#### Aggregated Data

Create the aggregations:
- Based on department
- Based on department and gender

Aggregation based on Department

In [97]:
from pyspark.sql import functions as F

dept_aggrDF = active_employees_data.groupBy('dept_no').agg(
    F.min('salary').alias('Min_Salary'),
    F.max('salary').alias('Max_Salary'),
    F.mean('salary').alias('Mean_Salary'),
    F.count('salary').alias('Total_Employees'),
    F.stddev('salary').alias('StdDev_Salary'),
    F.sum('salary').alias('Total_salary'),
    F.min('emp_age').alias('Min_Age'),
    F.max('emp_age').alias('Max_Age'),
    F.mean('emp_age').alias('Mean_Age'),
    F.min('emp_tenure').alias('Min_Tenure'),
    F.max('emp_tenure').alias('Max_Tenure'),
    F.mean('emp_tenure').alias('Mean_Tenure'),
    F.mean('salary_since').alias('Mean_Salary_Since'),
    F.mean('role_since').alias('Mean_Role_Since')
)

In [98]:
dept_aggrDF.cache()

DataFrame[dept_no: string, Min_Salary: int, Max_Salary: int, Mean_Salary: double, Total_Employees: bigint, StdDev_Salary: double, Total_salary: bigint, Min_Age: double, Max_Age: double, Mean_Age: double, Min_Tenure: double, Max_Tenure: double, Mean_Tenure: double, Mean_Salary_Since: double, Mean_Role_Since: double]

In [99]:
dept_aggrDF.show()

+-------+----------+----------+------------------+---------------+------------------+------------+-------+-------+------------------+----------+----------+------------------+------------------+------------------+
|dept_no|Min_Salary|Max_Salary|       Mean_Salary|Total_Employees|     StdDev_Salary|Total_salary|Min_Age|Max_Age|          Mean_Age|Min_Tenure|Max_Tenure|       Mean_Tenure| Mean_Salary_Since|   Mean_Role_Since|
+-------+----------+----------+------------------+---------------+------------------+------------+-------+-------+------------------+----------+----------+------------------+------------------+------------------+
|   d005|     27036|    142434| 61181.55686964455|          61386|15810.299554517727|  3755691050|   31.0|   67.0| 48.09220343400776|       7.0|      34.0|23.058026260059297| 11.05574560974815| 16.93850389339589|
|   d009|     26878|    142950| 60803.75712903409|          17569|17354.510735479293|  1068261209|   31.0|   67.0| 48.08657294097558|       7.0|    

Aggregation based on Department and Gender

In [100]:
dept_gender_aggrDF = active_employees_data.groupBy('dept_no', 'emp_gender').agg(
    F.min('salary').alias('Min_Salary'),
    F.max('salary').alias('Max_Salary'),
    F.mean('salary').alias('Mean_Salary'),
    F.count('salary').alias('Total_Employees'),
    F.stddev('salary').alias('StdDev_Salary'),
    F.sum('salary').alias('Total_salary'),
    F.min('emp_age').alias('Min_Age'),
    F.max('emp_age').alias('Max_Age'),
    F.mean('emp_age').alias('Mean_Age'),
    F.min('emp_tenure').alias('Min_Tenure'),
    F.max('emp_tenure').alias('Max_Tenure'),
    F.mean('emp_tenure').alias('Mean_Tenure'),
    F.mean('salary_since').alias('Mean_Salary_Since'),
    F.mean('role_since').alias('Mean_Role_Since')
)
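The two aggregation cells repeat the same list of aggregates and differ only in the grouping keys. To make the meaning of each output column concrete, here is a pandas analogue run on a tiny hypothetical dataset; it defines the aggregation spec once and reuses it for both groupings. `AGG_SPEC` and `aggregate` are illustrative names, only a subset of the Spark aggregates is reproduced, and pandas `std` (sample standard deviation, `ddof=1`) is used to match `F.stddev`.

```python
import pandas as pd

# One aggregation spec reused for every grouping (subset of the Spark cells)
AGG_SPEC = {
    "Min_Salary": ("salary", "min"),
    "Max_Salary": ("salary", "max"),
    "Mean_Salary": ("salary", "mean"),
    "Total_Employees": ("salary", "count"),
    "StdDev_Salary": ("salary", "std"),  # sample std (ddof=1), like F.stddev
    "Total_salary": ("salary", "sum"),
    "Mean_Age": ("emp_age", "mean"),
}


def aggregate(df: pd.DataFrame, keys) -> pd.DataFrame:
    """Group df by the given key columns and compute every aggregate in AGG_SPEC."""
    return df.groupby(keys).agg(**AGG_SPEC).reset_index()


# Hypothetical rows, not taken from the employees data
toy = pd.DataFrame({
    "dept_no": ["d001", "d001", "d002"],
    "emp_gender": ["M", "F", "M"],
    "salary": [50000, 70000, 40000],
    "emp_age": [30.0, 50.0, 40.0],
})

by_dept = aggregate(toy, ["dept_no"])
by_dept_gender = aggregate(toy, ["dept_no", "emp_gender"])
print(by_dept)
print(by_dept_gender)
```

Keeping the spec in one place means a new aggregate is added once and appears in both result sets; the same idea applies to the Spark cells by building the list of `F.*` expressions once and passing it to `agg(*exprs)`.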

In [101]:
dept_gender_aggrDF.show()

+-------+----------+----------+----------+------------------+---------------+------------------+------------+-------+-------+------------------+----------+----------+------------------+------------------+------------------+
|dept_no|emp_gender|Min_Salary|Max_Salary|       Mean_Salary|Total_Employees|     StdDev_Salary|Total_salary|Min_Age|Max_Age|          Mean_Age|Min_Tenure|Max_Tenure|       Mean_Tenure| Mean_Salary_Since|   Mean_Role_Since|
+-------+----------+----------+----------+------------------+---------------+------------------+------------+-------+-------+------------------+----------+----------+------------------+------------------+------------------+
|   d006|         M|     27725|    128103| 58869.74002766889|           8674|15038.046513113391|   510636125|   31.0|   67.0| 48.11424948120821|       8.0|      34.0|23.034240258243024|11.039082314964261|16.942125893474753|
|   d006|         F|     27571|    119965| 59060.13470708447|           5872|14972.763774839172|   34680

Write the above DataFrames to HDFS as CSV

In [102]:
dept_aggrDF \
    .coalesce(1) \
    .write \
    .option("header", "false") \
    .csv("/tutorials/employees_data/results/aggr_dept/")

In [103]:
dept_gender_aggrDF \
    .coalesce(1) \
    .write \
    .option("header", "false") \
    .csv("/tutorials/employees_data/results/aggr_dept_gender/")

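#### DB Store

Save the DataFrames as managed tables in the current Hive database (`employees_db_hive`); `mode("overwrite")` replaces any existing table with the same name.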

In [104]:
active_employees_data.write.mode("overwrite").saveAsTable("active_employees_data")

In [105]:
dept_aggrDF.write.mode("overwrite").saveAsTable("dept_aggr")

In [106]:
dept_gender_aggrDF.write.mode("overwrite").saveAsTable("dept_gender_aggr")

## Data Analysis using Result Sets

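Convert the Spark DataFrames to pandas DataFrames. `toPandas()` collects every row to the driver, so it is only suitable for results that fit in driver memory (here, about 240K rows).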

In [107]:
active_employees_DF = active_employees_data.toPandas()
dept_aggr_all_DF = dept_aggrDF.toPandas()
dept_aggr_gender_DF = dept_gender_aggrDF.toPandas()

Verify dataframes

In [108]:
active_employees_DF.head()

   emp_no emp_first_name emp_last_name emp_gender emp_birth_date  \
0     148         Feipei      Nollmann          M     1960-03-11   
1     463     Dharmaraja      Sadowsky          M     1955-04-15   
2     496           Mari         Rotem          M     1964-03-29   
3     833           Huan       Preusig          M     1961-09-14   
4    1088           Mohd       Lanteri          M     1961-07-01   

  emp_hire_date  emp_age  emp_tenure  salary sal_from_date       ...        \
0    1986-02-03     59.0        34.0  121640    2003-01-30       ...         
1    1986-02-06     64.0        34.0   63130    2003-02-02       ...         
2    1986-02-06     55.0        34.0   50281    2002-08-16       ...         
3    1986-02-09     58.0        34.0   52747    2003-03-23       ...         
4    1986-02-12     58.0        34.0   90090    2003-04-20       ...         

   mgr_emp_no mgr_first_name mgr_last_name  mgr_gender mgr_birth_date  \
0       64439           Leon      DasSarma       

In [109]:
dept_aggr_all_DF.head()

  dept_no  Min_Salary  Max_Salary   Mean_Salary  Total_Employees  \
0    d005       27036      142434  61181.556870            61386   
1    d009       26878      142950  60803.757129            17569   
2    d003       26936      140953  57418.410916            12898   
3    d001       27871      144128  73555.412613            14842   
4    d007       28392      157220  82365.130951            37701   

   StdDev_Salary  Total_salary  Min_Age  Max_Age   Mean_Age  Min_Tenure  \
0   15810.299555    3755691050     31.0     67.0  48.092203         7.0   
1   17354.510735    1068261209     31.0     67.0  48.086573         7.0   
2   14634.501201     740582664     31.0     67.0  48.065281         7.0   
3   18741.773908    1091709434     31.0     67.0  48.025873         7.0   
4   19036.502369    3105247802     31.0     67.0  48.085382         7.0   

   Max_Tenure  Mean_Tenure  Mean_Salary_Since  Mean_Role_Since  
0        34.0    23.058026          11.055746        16.938504  
1        3

In [110]:
dept_aggr_gender_DF.head()

  dept_no emp_gender  Min_Salary  Max_Salary   Mean_Salary  Total_Employees  \
0    d006          M       27725      128103  58869.740028             8674   
1    d006          F       27571      119965  59060.134707             5872   
2    d003          M       27611      140953  57270.070572             7751   
3    d001          F       27871      137842  73145.201228             5864   
4    d005          F       27638      142434  61085.771288            24533   

   StdDev_Salary  Total_salary  Min_Age  Max_Age   Mean_Age  Min_Tenure  \
0   15038.046513     510636125     31.0     67.0  48.114249         8.0   
1   14972.763775     346801111     31.0     68.0  48.071185         8.0   
2   14616.956593     443900317     31.0     67.0  48.093794         7.0   
3   18541.000716     428923460     31.0     67.0  47.901262         7.0   
4   15744.103761    1498617227     31.0     67.0  48.074960         7.0   

   Max_Tenure  Mean_Tenure  Mean_Salary_Since  Mean_Role_Since  
0        

In [111]:
active_employees_DF.dtypes

emp_no               int32
emp_first_name      object
emp_last_name       object
emp_gender          object
emp_birth_date      object
emp_hire_date       object
emp_age            float64
emp_tenure         float64
salary               int32
sal_from_date       object
salary_since       float64
title               object
title_from_date     object
role_since         float64
dept_no             object
dept_name           object
dept_from_date      object
emp_dept_tenure    float64
mgr_emp_no           int32
mgr_first_name      object
mgr_last_name       object
mgr_gender          object
mgr_birth_date      object
mgr_hire_date       object
mgr_from_date       object
mgr_age            float64
mgr_tenure         float64
mgr_dept_tenure    float64
dtype: object

In [112]:
### Import all the required libraries
import os
import sys

import pandas as pd
import numpy as np

from matplotlib import pyplot as plt

In [113]:
### Draw a scatter plot to identify the relationship between employee age and tenure
X = active_employees_DF.emp_age
Y = active_employees_DF.emp_tenure
plt.xlabel("Age")
plt.ylabel("Tenure")
plt.title("Age vs Tenure")
plt.scatter(X,Y)
plt.show()

In [114]:
### Draw a scatter plot to identify the relationship between employee age and salary
X = active_employees_DF.emp_age
Y = active_employees_DF.salary
plt.xlabel("Age")
plt.ylabel("Salary")
plt.title("Age vs Salary")
plt.scatter(X,Y)
plt.show()

In [115]:
### Draw a scatter plot to identify the relationship between employee tenure and salary
X = active_employees_DF.emp_tenure
Y = active_employees_DF.salary
plt.xlabel("Tenure")
plt.ylabel("Salary")
plt.title("Tenure vs Salary")
plt.scatter(X,Y)
plt.show()
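The scatter plots show each relationship visually; a Pearson correlation coefficient puts a number on it. A minimal sketch using pandas `DataFrame.corr` on hypothetical rows with the same column names as `active_employees_DF` (the helper `pairwise_correlation` is illustrative):

```python
import pandas as pd


def pairwise_correlation(df: pd.DataFrame, columns) -> pd.DataFrame:
    """Pearson correlation matrix for the given numeric columns."""
    return df[list(columns)].corr(method="pearson")


# Hypothetical rows, not taken from the employees data
sample = pd.DataFrame({
    "emp_age": [30.0, 40.0, 50.0, 60.0],
    "emp_tenure": [5.0, 15.0, 25.0, 35.0],  # tenure = age - 25, a perfect linear relation
    "salary": [60000, 55000, 58000, 52000],
})

corr = pairwise_correlation(sample, ["emp_age", "emp_tenure", "salary"])
print(corr.round(2))
```

On the real data the same call would be `pairwise_correlation(active_employees_DF, ["emp_age", "emp_tenure", "salary"])`. Values near 0 indicate no linear relationship, which a scatter plot can make hard to judge with 240K overlapping points.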

In [116]:
### Draw a histogram of employee counts by age
plt.xlabel("Age")
plt.ylabel("Count")
plt.title("Age and Employees Count")
plt.hist(active_employees_DF.emp_age)
plt.show()

In [117]:
### Draw a histogram of employee counts by tenure
plt.xlabel("Tenure")
plt.ylabel("Count")
plt.title("Tenure and Employees Count")
plt.hist(active_employees_DF.emp_tenure)
plt.show()

In [118]:
### Draw a histogram of employee counts by salary
plt.xlabel("Salary")
plt.ylabel("Count")
plt.title("Salary and Employees Count")
plt.hist(active_employees_DF.salary)
plt.show()

In [119]:
dept_aggr_gender_DF["emp_gender"].value_counts()

F    9
M    9
Name: emp_gender, dtype: int64

In [120]:
### Extract male and female employee counts for each department
df = dept_aggr_gender_DF.pivot_table(index='dept_no', columns='emp_gender', values='Total_Employees').reset_index()
df

emp_gender dept_no      F      M
0             d001   5864   8978
1             d002   5014   7423
2             d003   5147   7751
3             d004  21393  31911
4             d005  24533  36853
5             d006   5872   8674
6             d007  14999  22702
7             d008   6181   9260
8             d009   7007  10562
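The pivot turns the long `(dept_no, emp_gender)` rows into one row per department with an `F` and an `M` column. A minimal sketch of the same reshaping on the d001 and d002 counts from the output above, with an illustrative female-share column added (not part of the original analysis). `aggfunc="sum"` is stated explicitly; since each department and gender pair occurs once, it gives the same result as the default mean.

```python
import pandas as pd

# Long-format rows; counts copied from the d001 and d002 rows of the pivot output
long_df = pd.DataFrame({
    "dept_no": ["d001", "d001", "d002", "d002"],
    "emp_gender": ["F", "M", "F", "M"],
    "Total_Employees": [5864, 8978, 5014, 7423],
})

# One row per department, one column per gender
wide = long_df.pivot_table(index="dept_no", columns="emp_gender",
                           values="Total_Employees", aggfunc="sum").reset_index()

# Illustrative extra column: share of female employees in each department
wide["F_share"] = wide["F"] / (wide["F"] + wide["M"])
print(wide)
```

As a consistency check, `F + M` for d001 is 14842, which matches `Total_Employees` for d001 in the department-level aggregate.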

In [121]:
# Positions and width for the grouped bars: one group per department
pos = list(range(len(df['dept_no'])))
width = 0.25

fig, ax = plt.subplots(figsize=(10, 5))

# Female counts, drawn at each group position
ax.bar(pos,
       df['F'],
       width,
       alpha=0.5,
       color='#EE3224',
       label='F')

# Male counts, shifted right by one bar width
ax.bar([p + width for p in pos],
       df['M'],
       width,
       alpha=0.5,
       color='#F78F1E',
       label='M')

# Axis label and chart title
ax.set_ylabel('Employee Counts')
ax.set_title('Employees Count in each Department by Gender')

# Center each x tick between its pair of bars
ax.set_xticks([p + 0.5 * width for p in pos])
ax.set_xticklabels(df['dept_no'])

# Axis limits; the y limit leaves headroom above the bars for the legend
ax.set_xlim(min(pos) - width, max(pos) + width * 2)
ax.set_ylim([0, (df['F'] + df['M']).max()])

# The legend uses the label given to each bar series
ax.legend(loc='upper left')
ax.grid()
plt.show()

In [None]:
plt.plot(h, pdf)
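The last cell plots `h` against `pdf`, but neither name is defined in this section. A minimal sketch, assuming the intent is to overlay a normal density fitted to the salaries on their histogram; `normal_pdf_curve` is a hypothetical helper, and the normal fit is an assumption about the data, not a finding.

```python
import numpy as np


def normal_pdf_curve(values):
    """Return (h, pdf): the values sorted ascending, and the density of a
    normal distribution fitted to them (mean and population std) at each value."""
    h = np.sort(np.asarray(values, dtype=float))
    mu = h.mean()
    sigma = h.std()  # ddof=0, the maximum-likelihood estimate
    pdf = np.exp(-0.5 * ((h - mu) / sigma) ** 2) / (sigma * np.sqrt(2 * np.pi))
    return h, pdf


h, pdf = normal_pdf_curve([3, 1, 2])
print(h, pdf)
```

On the notebook data this would be `h, pdf = normal_pdf_curve(active_employees_DF.salary)`, followed by `plt.hist(h, bins=50, density=True)` so the histogram is on the same density scale, then `plt.plot(h, pdf)` and `plt.show()`.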