### Task 3.1 - Data Analysis using Big Data Tools 


Develop a PySpark application to load data Spark DataFrames and save it into Hive tables on a Hadoop cluster in an optimized format

In [2]:
import findspark
findspark.init('/usr/local/spark')
import pyspark

In [3]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Data Analysis using big Data Tools") \
        .config("spark.sql.warehouse.dir", "hdfs://localhost:54310/user/hive/warehouse") \
        .enableHiveSupport() \
        .getOrCreate()

In [4]:
from pyspark.sql.functions import *

In [5]:
sc = spark.sparkContext

Loading the data into pyspark dataframes

In [6]:
DF1=spark.read.load('/home/hduser/Downloads/sharedfolder/hr_Hiring_details.csv', format='csv', sep=',',inferSchema='true',header='true')

In [7]:
DF1.printSchema()

root
 |-- Candidate Ref: integer (nullable = true)
 |-- DOJ Extended: string (nullable = true)
 |-- Duration to accept offer: integer (nullable = true)
 |-- Notice period: integer (nullable = true)
 |-- Offered band: string (nullable = true)
 |-- Pecent hike expected in CTC: double (nullable = true)
 |-- Percent hike offered in CTC: double (nullable = true)
 |-- Percent difference CTC: double (nullable = true)
 |-- Joining Bonus: string (nullable = true)
 |-- Candidate relocate actual: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Candidate Source: string (nullable = true)
 |-- Rex in Yrs: integer (nullable = true)
 |-- Location: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- LOB_id: integer (nullable = true)



Renaming the columns for storing in the tables.

In [8]:
hr_hiring = DF1.withColumnRenamed("Candidate Ref","Candidate_Ref") \
        .withColumnRenamed("DOJ Extended","DOJ_Extended") \
        .withColumnRenamed("Duration to accept offer","Duration_to_accept_offer") \
        .withColumnRenamed("Notice period","Notice_period") \
        .withColumnRenamed("Offered band","Offered_band") \
        .withColumnRenamed("Pecent hike expected in CTC","Percent_hike_offered_in_CTC") \
        .withColumnRenamed("Percent hike offered in CTC","Pecent_hike_expected_in_CTC") \
        .withColumnRenamed("Percent difference CTC","Percent_difference_CTC") \
        .withColumnRenamed("Joining Bonus","Joining_Bonus")\
        .withColumnRenamed("Candidate relocate actual","Candidate_relocate_actual")\
        .withColumnRenamed("Candidate Source","Candidate_Source")\
        .withColumnRenamed("Rex in Yrs","Rex_in_Yrs")\
.withColumnRenamed("LOB_id","LOB_id")


In [9]:
hr_hiring.printSchema()

root
 |-- Candidate_Ref: integer (nullable = true)
 |-- DOJ_Extended: string (nullable = true)
 |-- Duration_to_accept_offer: integer (nullable = true)
 |-- Notice_period: integer (nullable = true)
 |-- Offered_band: string (nullable = true)
 |-- Percent_hike_offered_in_CTC: double (nullable = true)
 |-- Pecent_hike_expected_in_CTC: double (nullable = true)
 |-- Percent_difference_CTC: double (nullable = true)
 |-- Joining_Bonus: string (nullable = true)
 |-- Candidate_relocate_actual: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Candidate_Source: string (nullable = true)
 |-- Rex_in_Yrs: integer (nullable = true)
 |-- Location: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- LOB_id: integer (nullable = true)



In [10]:
DF2=spark.read.load('/home/hduser/Downloads/sharedfolder/Employee_joining_status.csv', format='csv', sep=',',inferSchema='true',header='true')

In [11]:
DF2.printSchema()

root
 |-- Candidate Ref: integer (nullable = true)
 |-- Status: string (nullable = true)



In [12]:
joining_status = DF2.withColumnRenamed("Candidate Ref","Candidate_Ref")

In [13]:
joining_status.printSchema()

root
 |-- Candidate_Ref: integer (nullable = true)
 |-- Status: string (nullable = true)



In [14]:
lob_mapping=spark.read.load('/home/hduser/Downloads/sharedfolder/LOB_mapping_P4.csv', format='csv', sep=',',inferSchema='true',header='true')

In [15]:
lob_mapping.printSchema()

root
 |-- LOB_id: integer (nullable = true)
 |-- LOB: string (nullable = true)



Executing Hive Queries in Pyspark.   
Creation of new database : capstone4

In [65]:
spark.sql("CREATE DATABASE IF NOT EXISTS capstone4")

DataFrame[]

Loading the data into the respective tables from the pyspark dataframes.

In [66]:
hr_hiring.coalesce(1).write.saveAsTable("capstone4.hr_hiring_details")

In [68]:
joining_status.coalesce(1).write.saveAsTable("capstone4.employee_joining_status")

In [69]:
lob_mapping.coalesce(1).write.saveAsTable("capstone4.lob_mapping_p4")

Three tables are created:  
    1) hr_hiring_details     
    2) employee_joining_status   
    3) lob_mapping_p4  

In [16]:
spark.sql('use capstone4')

DataFrame[]

In [17]:
spark.sql('show tables').show()

+---------+--------------------+-----------+
| database|           tableName|isTemporary|
+---------+--------------------+-----------+
|capstone4|employee_joining_...|      false|
|capstone4|   hr_hiring_details|      false|
|capstone4|hr_hiring_details...|      false|
|capstone4|      lob_mapping_p4|      false|
+---------+--------------------+-----------+



Perform profiling of the data through PySpark and ensure that it is migrated correctly whereever the source is an RDBMS

In [18]:
spark.sql('select * from hr_hiring_details limit 10').show()

+-------------+------------+------------------------+-------------+------------+---------------------------+---------------------------+----------------------+-------------+-------------------------+------+-----------------+----------+--------+---+------+
|Candidate_Ref|DOJ_Extended|Duration_to_accept_offer|Notice_period|Offered_band|Percent_hike_offered_in_CTC|Pecent_hike_expected_in_CTC|Percent_difference_CTC|Joining_Bonus|Candidate_relocate_actual|Gender| Candidate_Source|Rex_in_Yrs|Location|Age|LOB_id|
+-------------+------------+------------------------+-------------+------------+---------------------------+---------------------------+----------------------+-------------+-------------------------+------+-----------------+----------+--------+---+------+
|      2110407|         Yes|                      14|           30|          E2|                     -20.79|                      13.16|                 42.86|           No|                       No|Female|           Agency|        

In [19]:
spark.sql('select * from employee_joining_status limit 10').show()

+-------------+------+
|Candidate_Ref|Status|
+-------------+------+
|      2110407|Joined|
|      2112635|Joined|
|      2112838|Joined|
|      2115021|Joined|
|      2115125|Joined|
|      2117167|Joined|
|      2119124|Joined|
|      2127572|Joined|
|      2138169|Joined|
|      2143362|Joined|
+-------------+------+



In [20]:
spark.sql('select * from lob_mapping_p4 limit 10').show()

+------+----------+
|LOB_id|       LOB|
+------+----------+
|     1|       ERS|
|     2|     INFRA|
|     3|Healthcare|
|     4|      BFSI|
|     5|      CSMP|
|     6|       ETS|
|     7|      AXON|
|     8|       EAS|
|     9|       MMS|
+------+----------+



In [23]:
spark.sql('desc hr_hiring_details').show()

+--------------------+---------+-------+
|            col_name|data_type|comment|
+--------------------+---------+-------+
|       Candidate_Ref|      int|   null|
|        DOJ_Extended|   string|   null|
|Duration_to_accep...|      int|   null|
|       Notice_period|      int|   null|
|        Offered_band|   string|   null|
|Percent_hike_offe...|   double|   null|
|Pecent_hike_expec...|   double|   null|
|Percent_differenc...|   double|   null|
|       Joining_Bonus|   string|   null|
|Candidate_relocat...|   string|   null|
|              Gender|   string|   null|
|    Candidate_Source|   string|   null|
|          Rex_in_Yrs|      int|   null|
|            Location|   string|   null|
|                 Age|      int|   null|
|              LOB_id|      int|   null|
+--------------------+---------+-------+



In [25]:
spark.sql('desc employee_joining_status').show()

+-------------+---------+-------+
|     col_name|data_type|comment|
+-------------+---------+-------+
|Candidate_Ref|      int|   null|
|       Status|   string|   null|
+-------------+---------+-------+



In [26]:
spark.sql('desc lob_mapping_p4').show()

+--------+---------+-------+
|col_name|data_type|comment|
+--------+---------+-------+
|  LOB_id|      int|   null|
|     LOB|   string|   null|
+--------+---------+-------+



Spark Dataframes' describe method is used for summary statistics. The summary statistics includes min, max, count, mean and standard deviations.

In [79]:
#Checking Statistics
resultdf=hr_hiring.describe('Candidate_Ref','DOJ_Extended','Duration_to_accept_offer','Notice_period','Offered_band','Pecent_hike_expected_in_CTC','Percent_hike_offered_in_CTC','Percent_difference_CTC','Joining_Bonus','Candidate_relocate_actual','Gender','Candidate_Source','Rex_in_Yrs','Location','Age','LOB_id')
resultdf.show()

+-------+-----------------+------------+------------------------+------------------+------------+---------------------------+---------------------------+----------------------+-------------+-------------------------+------+-----------------+------------------+---------+------------------+------------------+
|summary|    Candidate_Ref|DOJ_Extended|Duration_to_accept_offer|     Notice_period|Offered_band|Pecent_hike_expected_in_CTC|Percent_hike_offered_in_CTC|Percent_difference_CTC|Joining_Bonus|Candidate_relocate_actual|Gender| Candidate_Source|        Rex_in_Yrs| Location|               Age|            LOB_id|
+-------+-----------------+------------+------------------------+------------------+------------+---------------------------+---------------------------+----------------------+-------------+-------------------------+------+-----------------+------------------+---------+------------------+------------------+
|  count|             8995|        8995|                    8995|        

Checking the null values

In [29]:
from pyspark.sql.functions import col,isnan, when, count
hr_hiring.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in hr_hiring.columns]).show()

+-------------+------------+------------------------+-------------+------------+---------------------------+---------------------------+----------------------+-------------+-------------------------+------+----------------+----------+--------+---+------+
|Candidate_Ref|DOJ_Extended|Duration_to_accept_offer|Notice_period|Offered_band|Percent_hike_offered_in_CTC|Pecent_hike_expected_in_CTC|Percent_difference_CTC|Joining_Bonus|Candidate_relocate_actual|Gender|Candidate_Source|Rex_in_Yrs|Location|Age|LOB_id|
+-------------+------------+------------------------+-------------+------------+---------------------------+---------------------------+----------------------+-------------+-------------------------+------+----------------+----------+--------+---+------+
|            0|           0|                       0|            0|          13|                          0|                          0|                     0|            0|                        0|     0|               0|         0| 

Checking the median for the column Age.

In [92]:
df1.approxQuantile("Age",[0.5],0.25)


Treating the null values.

In [30]:
df1=hr_hiring.na.fill({'Offered_band':'E1'}) #Filling null values for offered Band with E1(Mode)
df1=df1.na.fill({'Age':27}) #Filling null values for Age with the median i.e. 27

In [31]:
#Checking the Null values
df1.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in df1.columns]).show()

+-------------+------------+------------------------+-------------+------------+---------------------------+---------------------------+----------------------+-------------+-------------------------+------+----------------+----------+--------+---+------+
|Candidate_Ref|DOJ_Extended|Duration_to_accept_offer|Notice_period|Offered_band|Percent_hike_offered_in_CTC|Pecent_hike_expected_in_CTC|Percent_difference_CTC|Joining_Bonus|Candidate_relocate_actual|Gender|Candidate_Source|Rex_in_Yrs|Location|Age|LOB_id|
+-------------+------------+------------------------+-------------+------------+---------------------------+---------------------------+----------------------+-------------+-------------------------+------+----------------+----------+--------+---+------+
|            0|           0|                       0|            0|           0|                          0|                          0|                     0|            0|                        0|     0|               0|         0| 

In [95]:
from pyspark.sql.functions import col,isnan, when, count
joining_status.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in joining_status.columns]).show()

+-------------+------+
|Candidate_Ref|Status|
+-------------+------+
|            0|     0|
+-------------+------+



In [96]:
from pyspark.sql.functions import col,isnan, when, count
lob_mapping.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in lob_mapping.columns]).show()

+------+---+
|LOB_id|LOB|
+------+---+
|     0|  0|
+------+---+



No null values are observed in the given datasets

In [98]:
df1.coalesce(1).write.saveAsTable("capstone4.hr_hiring_details_final")

In [101]:
query1=spark.sql("select status,count(status) from employee_joining_status group by status")
query1.show()

+----------+-------------+
|    status|count(status)|
+----------+-------------+
|Not Joined|         1682|
|    Joined|         7313|
+----------+-------------+

