# Task 2.2: Data Analysis using Big Data tools

## Loading data into PySpark

In [29]:
# Make the Spark installation importable from this kernel.
# NOTE(review): hardcoded local install path -- consider reading it from SPARK_HOME.
import findspark
findspark.init('/usr/local/spark')
import pyspark
# NOTE(review): HiveContext is imported but not referenced in the visible cells.
from pyspark.sql import HiveContext
# NOTE(review): these wildcard imports shadow Python builtins such as sum, min,
# max and abs with Spark column functions. Later cells rely on names they
# provide (e.g. lit), so replacing them needs an explicit import list.
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.sql import Row
from pyspark.sql import functions as F
# The explicit imports below are already covered by the wildcards above; they
# document which helpers the profiling cells actually use.
from pyspark.sql.functions import countDistinct
from pyspark.sql.functions import col,isnan, when, count
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

In [2]:
from pyspark.sql import SparkSession

# Create (or reuse) the Spark session that every later cell talks to.
# NOTE(review): Hive support is not enabled here even though HiveContext is
# imported; tables written with saveAsTable go to whichever catalog the Spark
# configuration selects -- confirm this is intended.
spark = (
    SparkSession.builder
    .appName("capstone_checkpoint_two")
    .getOrCreate()
)

In [3]:
# Load the raw HR extract from HDFS. The first row supplies column names and
# Spark infers column types (the resulting schema is printed below).
HR_RAW_CSV_PATH = 'hdfs://localhost:54310/capstone1/HR_raw.csv'
data = (
    spark.read
    .option("header", "true")
    .option("inferSchema", "true")
    .csv(HR_RAW_CSV_PATH)
)
data.printSchema()

root
 |-- Sno: integer (nullable = true)
 |-- Candidate_Ref: integer (nullable = true)
 |-- DOJ_Extended: string (nullable = true)
 |-- Duration_to_accept_offer: integer (nullable = true)
 |-- Notice_Period: integer (nullable = true)
 |-- Offered_Band: string (nullable = true)
 |-- Percent_hike_expected_in_CTC: double (nullable = true)
 |-- Percent_hike_offered_in_CTC: double (nullable = true)
 |-- Percent_difference_CTC: double (nullable = true)
 |-- Joining_Bonus: string (nullable = true)
 |-- Candidate_relocate_actual: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Candidate_Source: string (nullable = true)
 |-- Rex_in_Yrs: integer (nullable = true)
 |-- Location_ID: string (nullable = true)
 |-- Postal_Code: integer (nullable = true)
 |-- Age: integer (nullable = true)
 |-- LOB_Id: integer (nullable = true)
 |-- Domicile_Id: string (nullable = true)



In [4]:
# Tables visible before anything is written (the output below is empty).
tables_listing = spark.sql('show tables')
tables_listing.show()

+--------+---------+-----------+
|database|tableName|isTemporary|
+--------+---------+-----------+
+--------+---------+-----------+



In [5]:
# Create the working database; IF NOT EXISTS makes re-running this cell safe.
create_db_statement = "CREATE DATABASE IF NOT EXISTS db1"
spark.sql(create_db_statement)

# List databases to confirm db1 now exists alongside default.
db = spark.sql("show databases")
db.show()

+------------+
|databaseName|
+------------+
|         db1|
|     default|
+------------+



In [6]:
# Persist the loaded frame as db1.table1, replacing any previous version,
# then read it back through SQL to check the round trip.
writer = data.write.mode("overwrite")
writer.saveAsTable("db1.table1")

table = spark.sql('select * from db1.table1')
table.show()

+---+-------------+------------+------------------------+-------------+------------+----------------------------+---------------------------+----------------------+-------------+-------------------------+------+-----------------+----------+-----------+-----------+---+------+-----------+
|Sno|Candidate_Ref|DOJ_Extended|Duration_to_accept_offer|Notice_Period|Offered_Band|Percent_hike_expected_in_CTC|Percent_hike_offered_in_CTC|Percent_difference_CTC|Joining_Bonus|Candidate_relocate_actual|Gender| Candidate_Source|Rex_in_Yrs|Location_ID|Postal_Code|Age|LOB_Id|Domicile_Id|
+---+-------------+------------+------------------------+-------------+------------+----------------------------+---------------------------+----------------------+-------------+-------------------------+------+-----------------+----------+-----------+-----------+---+------+-----------+
|  1|      2110407|         Yes|                      14|           30|          E2|                      -20.79|                      1

In [7]:
# Check that the schema survived the round trip through db1.table1
# (compare with the schema printed for `data` after the CSV load).
table.printSchema()

root
 |-- Sno: integer (nullable = true)
 |-- Candidate_Ref: integer (nullable = true)
 |-- DOJ_Extended: string (nullable = true)
 |-- Duration_to_accept_offer: integer (nullable = true)
 |-- Notice_Period: integer (nullable = true)
 |-- Offered_Band: string (nullable = true)
 |-- Percent_hike_expected_in_CTC: double (nullable = true)
 |-- Percent_hike_offered_in_CTC: double (nullable = true)
 |-- Percent_difference_CTC: double (nullable = true)
 |-- Joining_Bonus: string (nullable = true)
 |-- Candidate_relocate_actual: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Candidate_Source: string (nullable = true)
 |-- Rex_in_Yrs: integer (nullable = true)
 |-- Location_ID: string (nullable = true)
 |-- Postal_Code: integer (nullable = true)
 |-- Age: integer (nullable = true)
 |-- LOB_Id: integer (nullable = true)
 |-- Domicile_Id: string (nullable = true)



In [8]:
# Working DataFrame for the profiling cells below, read back from the saved table.
source_query = 'select * from db1.table1'
df = spark.sql(source_query)

In [9]:
# Per-column number of rows whose value is NaN or null.
null_or_nan_counts = [
    count(when(isnan(col_name) | col(col_name).isNull(), col_name)).alias(col_name)
    for col_name in df.columns
]
df.select(null_or_nan_counts).show()

+---+-------------+------------+------------------------+-------------+------------+----------------------------+---------------------------+----------------------+-------------+-------------------------+------+----------------+----------+-----------+-----------+---+------+-----------+
|Sno|Candidate_Ref|DOJ_Extended|Duration_to_accept_offer|Notice_Period|Offered_Band|Percent_hike_expected_in_CTC|Percent_hike_offered_in_CTC|Percent_difference_CTC|Joining_Bonus|Candidate_relocate_actual|Gender|Candidate_Source|Rex_in_Yrs|Location_ID|Postal_Code|Age|LOB_Id|Domicile_Id|
+---+-------------+------------+------------------------+-------------+------------+----------------------------+---------------------------+----------------------+-------------+-------------------------+------+----------------+----------+-----------+-----------+---+------+-----------+
|  0|            0|           0|                       0|            0|          13|                           0|                          

In [10]:
# Stricter missing-value profile: in addition to null/NaN, count empty strings
# and values containing the placeholders 'None' or 'NULL'.
# NOTE(review): contains() is a substring match, so a value such as 'Nonexistent'
# would also be counted -- confirm whether exact matches were intended.
missing_value_exprs = []
for col_name in df.columns:
    looks_missing = (
        col(col_name).contains('None')
        | col(col_name).contains('NULL')
        | (col(col_name) == '')
        | col(col_name).isNull()
        | isnan(col_name)
    )
    missing_value_exprs.append(count(when(looks_missing, col_name)).alias(col_name))

df2 = df.select(missing_value_exprs)
df2.show()

+---+-------------+------------+------------------------+-------------+------------+----------------------------+---------------------------+----------------------+-------------+-------------------------+------+----------------+----------+-----------+-----------+---+------+-----------+
|Sno|Candidate_Ref|DOJ_Extended|Duration_to_accept_offer|Notice_Period|Offered_Band|Percent_hike_expected_in_CTC|Percent_hike_offered_in_CTC|Percent_difference_CTC|Joining_Bonus|Candidate_relocate_actual|Gender|Candidate_Source|Rex_in_Yrs|Location_ID|Postal_Code|Age|LOB_Id|Domicile_Id|
+---+-------------+------------+------------------------+-------------+------------+----------------------------+---------------------------+----------------------+-------------+-------------------------+------+----------------+----------+-----------+-----------+---+------+-----------+
|  0|            0|           0|                       0|            0|          13|                           0|                          

In [22]:
# Candidates whose Age or Offered_Band is missing.
# show() only prints and returns None, so keep the DataFrame in df3 and display
# it separately. (The original `df3 = ....show()` left df3 set to None.)
df3 = spark.sql(
    "SELECT Candidate_Ref FROM db1.table1 WHERE Age IS NULL OR Offered_Band IS NULL"
)
df3.show()

+-------------+
|Candidate_Ref|
+-------------+
|      2207823|
|      2286381|
|      2365129|
|      2452418|
|      2569520|
|      2780669|
|      2845929|
|      3211382|
|      3300563|
|      3379843|
|      3644811|
|      2116859|
|      2117861|
|      2118480|
|      2119541|
|      2121112|
|      2128251|
|      2143765|
|      2151137|
|      2160044|
+-------------+
only showing top 20 rows



In [14]:
# Summary statistics for every column (one output column per field, so the
# printed table is very wide).
summary_stats = df.describe()
summary_stats.show()

+-------+------------------+-----------------+------------+------------------------+------------------+------------+----------------------------+---------------------------+----------------------+-------------+-------------------------+------+-----------------+------------------+-----------+------------------+------------------+------------------+-----------+
|summary|               Sno|    Candidate_Ref|DOJ_Extended|Duration_to_accept_offer|     Notice_Period|Offered_Band|Percent_hike_expected_in_CTC|Percent_hike_offered_in_CTC|Percent_difference_CTC|Joining_Bonus|Candidate_relocate_actual|Gender| Candidate_Source|        Rex_in_Yrs|Location_ID|       Postal_Code|               Age|            LOB_Id|Domicile_Id|
+-------+------------------+-----------------+------------+------------------------+------------------+------------+----------------------------+---------------------------+----------------------+-------------+-------------------------+------+-----------------+---------------

In [16]:
# Total number of rows in the working DataFrame.
row_count = df.count()
row_count

8995

In [21]:
# Distinct candidate references. This matches the row count, i.e. each row is a
# distinct candidate in this extract.
df.agg(countDistinct("Candidate_Ref")).show()

+-----------------------------+
|count(DISTINCT Candidate_Ref)|
+-----------------------------+
|                         8995|
+-----------------------------+



In [17]:
# Number of distinct offered bands (nulls are not counted by countDistinct).
df.agg(countDistinct("Offered_Band")).show()

+----------------------------+
|count(DISTINCT Offered_Band)|
+----------------------------+
|                           4|
+----------------------------+



In [18]:
# Number of distinct locations.
df.agg(countDistinct("Location_ID")).show()

+---------------------------+
|count(DISTINCT Location_ID)|
+---------------------------+
|                         11|
+---------------------------+



In [19]:
# Number of distinct lines of business.
# Use the exact schema name `LOB_Id`. The previous `LOB_ID` only resolved because
# column matching is case-insensitive by default, and it would fail with
# spark.sql.caseSensitive=true.
df.select(countDistinct("LOB_Id")).show()

+----------------------+
|count(DISTINCT LOB_ID)|
+----------------------+
|                     9|
+----------------------+



In [20]:
# Number of distinct domiciles.
# Use the exact schema name `Domicile_Id`. The previous `Domicile_ID` only resolved
# because column matching is case-insensitive by default.
df.select(countDistinct("Domicile_Id")).show()

+---------------------------+
|count(DISTINCT Domicile_ID)|
+---------------------------+
|                         24|
+---------------------------+



In [25]:
# Second load of the same CSV, this time asking Spark to drop malformed rows.
# NOTE(review): neither a schema nor inferSchema is supplied, so every column is
# read as a string and DROPMALFORMED cannot reject rows for type mismatches --
# confirm whether passing `schema=data.schema` was intended.
df_1 = spark.read.csv(
    "hdfs://localhost:54310/capstone1/HR_raw.csv",
    mode='DROPMALFORMED',
    header=True,
)

In [27]:
# Row count of the DROPMALFORMED load. It equals the first load's count, so no
# rows were dropped here.
raw_row_count = df_1.count()
raw_row_count

8995

In [39]:
# Percentage of rows per column whose value is NaN or null.
total_rows = count(lit(1))
pct_missing_exprs = [
    (count(when(isnan(col_name) | col(col_name).isNull(), col_name)) * 100 / total_rows).alias(col_name)
    for col_name in df_1.columns
]
amount_missing_df = df_1.select(pct_missing_exprs)
amount_missing_df.show()

+---+-------------+------------+------------------------+-------------+-------------------+----------------------------+---------------------------+----------------------+-------------+-------------------------+------+----------------+----------+-----------+-----------+-------------------+------+-----------+
|Sno|Candidate_Ref|DOJ_Extended|Duration_to_accept_offer|Notice_Period|       Offered_Band|Percent_hike_expected_in_CTC|Percent_hike_offered_in_CTC|Percent_difference_CTC|Joining_Bonus|Candidate_relocate_actual|Gender|Candidate_Source|Rex_in_Yrs|Location_ID|Postal_Code|                Age|LOB_Id|Domicile_Id|
+---+-------------+------------+------------------------+-------------+-------------------+----------------------------+---------------------------+----------------------+-------------+-------------------------+------+----------------+----------+-----------+-----------+-------------------+------+-----------+
|0.0|          0.0|         0.0|                     0.0|          0.0

In [32]:
# Keep only rows with no null in any column (dropna() is the same operation as na.drop()).
nonulldf = df_1.dropna()

In [34]:
# Re-run the strict missing-value profile on the null-free frame; every count
# should now be zero.
# NOTE(review): contains() is a substring match, so values merely containing
# 'None'/'NULL' are counted as well.
post_drop_exprs = []
for col_name in nonulldf.columns:
    looks_missing = (
        col(col_name).contains('None')
        | col(col_name).contains('NULL')
        | (col(col_name) == '')
        | col(col_name).isNull()
        | isnan(col_name)
    )
    post_drop_exprs.append(count(when(looks_missing, col_name)).alias(col_name))

nonulldf.select(post_drop_exprs).show()


+---+-------------+------------+------------------------+-------------+------------+----------------------------+---------------------------+----------------------+-------------+-------------------------+------+----------------+----------+-----------+-----------+---+------+-----------+
|Sno|Candidate_Ref|DOJ_Extended|Duration_to_accept_offer|Notice_Period|Offered_Band|Percent_hike_expected_in_CTC|Percent_hike_offered_in_CTC|Percent_difference_CTC|Joining_Bonus|Candidate_relocate_actual|Gender|Candidate_Source|Rex_in_Yrs|Location_ID|Postal_Code|Age|LOB_Id|Domicile_Id|
+---+-------------+------------+------------------------+-------------+------------+----------------------------+---------------------------+----------------------+-------------+-------------------------+------+----------------+----------+-----------+-----------+---+------+-----------+
|  0|            0|           0|                       0|            0|           0|                           0|                          

In [35]:
# Rows remaining after dropping incomplete records (8995 -> 8968, i.e. 27 removed).
rows_after_dropna = nonulldf.count()
rows_after_dropna

8968