In [1]:
# adding required packages
import findspark
import pprint
import matplotlib.pyplot as plt

In [2]:
#To make pyspark importable as a regular library
findspark.init()

In [14]:
#importing pyspark related package
from pyspark.sql import SparkSession
from pyspark.conf import SparkConf
from pyspark.sql.functions import col, when, avg, round, rank, isnan
from pyspark.sql.window import Window

In [4]:
%run /notebook/dataproduct/assesment_nyc_job_posting/lib/data_profiling_transform.ipynb

Data profile Function Imported


In [6]:
%run /notebook/dataproduct/assesment_nyc_job_posting/lib/kpi_transform.ipynb

KPI Transformations Imported


In [7]:
%run /notebook/dataproduct/assesment_nyc_job_posting/utils/spark_session.ipynb

Spark Session Imported


In [8]:
# Local System Configuration
# Total Memory = 16GB
# Total Cores = 10

def get_spark_conf():
    # Configure Spark settings
    spark_conf = SparkConf()
    spark_conf.set("spark.executor.instances", "4") # 4 instance per node

    # Set the number of executor cores
    spark_conf.set("spark.executor.cores", "1")  # Use 1 cores per executor

    # Set the executor memory
    spark_conf.set("spark.executor.memory", "1g")  # Use 1GB memory per executor

    # Set the driver memory
    spark_conf.set("spark.driver.memory", "2g")    # Use 2GB memory for the driver
    
    return spark_conf

In [9]:
# # Listing the columns based on its type
# def get_col_type_dict(df):
#     col_type_dict = {}

#     for col_name, col_type in df.dtypes:
#         if col_type in col_type_dict.keys():
#             col_type_dict[col_type].append(col_name)
#         else:
#             col_type_dict[col_type] = [col_name]

#     return col_type_dict

In [10]:
def data_profile(df):
    
    print("Starting Data Profiling")
    print("Schema of the dataset")
    
    #getting the schema
    df.printSchema()
    
    ## Display the first few rows of the DataFrame
    display(df.limit(10))
    
    # Getting counts
    total_count = df.count()
    print(f"Total Records: {total_count}")
    
    print("Getting missing value counts")
    missing_value_counts = calculate_missing_value_counts(df)
    display(missing_value_counts)
    
    print("Getting numerical status")
    numerical_columns = [col_name for col_name, col_type in df.dtypes if col_type in ["int", "double", "float"]]
    summary_stats = calculate_summary_stats(df, numerical_columns)
    display(summary_stats)
    
    print("Getting categorical columns") 
    # Most of the time categorical column should be string type but for this analysis -
    # assuming that categorical columns can be any type.
    distinct_threshold=10
    for col_name in df.columns:
        is_categorical_cond, distinct_values, top_values = profile_categorical_column(df, col_name)
        if is_categorical_cond:
            print(f"Column: {col_name}")
            print(f"Distinct Values: {distinct_values}")
            print("Top Values:")
            top_values.show(truncate=False)



In [11]:
def show_kpi(df):        
    # Salary range is linked with salary frequency. 
    # To get any metrics which depends on salary we need to have salary in same frequency.    
    print("Preparing Data for KPI")
    kpi_pre_df = df.withColumn("Annual Salary From",
                       when(col("Salary Frequency") == "Hourly", col("Salary Range From") * 2080)  # Assuming 2080 work hours per year
                       .when(col("Salary Frequency") == "Weekly", col("Salary Range From") * 52)
                       .when(col("Salary Frequency") == "Daily", col("Salary Range From") * 260)  # Assuming 5 workdays per week
                       .otherwise(col("Salary Range From")))
    
    kpi_df = kpi_pre_df.withColumn("Annual Salary To",
                       when(col("Salary Frequency") == "Hourly", col("Salary Range To") * 2080)
                       .when(col("Salary Frequency") == "Weekly", col("Salary Range To") * 52)
                       .when(col("Salary Frequency") == "Daily", col("Salary Range To") * 260)
                       .otherwise(col("Salary Range To")))
    
    
    print("Getting KPIs")
    print("Top 10 jobs posting per category")
    category_counts = get_top10_job_posting_per_cat(kpi_df)
    category_counts.show(truncate=False)
    
    print("The salary distribution per job category")
    salary_distribution = get_sal_dist_per_cat(kpi_df)
    salary_distribution.show(truncate=False)
    
    print("The job posting having the highest salary per agency")
    max_sal_per_agency_df = get_highes_sal_per_cat(kpi_df)
    max_sal_per_agency_df.show(truncate=False)
    
    #KPI5: Whats the job positings average salary per agency for the last 2 years
    print("The average salary per agency for the last 2 years")
    last_n_year = 2
    avg_salary_df = get_avg_sal_per_agency_last_n_year(kpi_df,last_n_year)
    avg_salary_df.show(truncate=False)

    # KPI6 6: What are the highest paid skills in the US market?

In [12]:
def main():
    
    job_name = 'nyc_assesment'
    
    #setting spark conf before creating spark session
    spark_conf = get_spark_conf()
    
    # Create a SparkSession with the configured settings
    spark = get_spark_session(spark_conf, job_name)
    
    # Listing all the spark conf
    spark.sparkContext.getConf().getAll()
    
    # setting spark conf for analysis
    spark.conf.set('spark.sql.repl.eagerEval.enabled',True)
    
    #reading dataset
    # adding escape charater after data profiling the data 
    df = spark.read.csv("/dataset/nyc-jobs.csv", header=True, inferSchema=True, escape='"')
    
    # reducing the shuffle partition to 4 
    # reason 1 data size is very less
    # reason 2 to use all the availble cores
    spark.conf.set('spark.sql.shuffle.partitions',4)
    
    # Creating data profile
    data_profile(df)
    
    # Prepreparing data for KPI
    # Need to implement
    
    # Writing prepared and cleaned data 
    # Needt to implement
    
    # Showing KPIs
    #show_kpi(df)

In [15]:
main()

Starting Data Profiling
Schema of the dataset
root
 |-- Job ID: integer (nullable = true)
 |-- Agency: string (nullable = true)
 |-- Posting Type: string (nullable = true)
 |-- # Of Positions: integer (nullable = true)
 |-- Business Title: string (nullable = true)
 |-- Civil Service Title: string (nullable = true)
 |-- Title Code No: string (nullable = true)
 |-- Level: string (nullable = true)
 |-- Job Category: string (nullable = true)
 |-- Full-Time/Part-Time indicator: string (nullable = true)
 |-- Salary Range From: double (nullable = true)
 |-- Salary Range To: double (nullable = true)
 |-- Salary Frequency: string (nullable = true)
 |-- Work Location: string (nullable = true)
 |-- Division/Work Unit: string (nullable = true)
 |-- Job Description: string (nullable = true)
 |-- Minimum Qual Requirements: string (nullable = true)
 |-- Preferred Skills: string (nullable = true)
 |-- Additional Information: string (nullable = true)
 |-- To Apply: string (nullable = true)
 |-- Hours/S

Job ID,Agency,Posting Type,# Of Positions,Business Title,Civil Service Title,Title Code No,Level,Job Category,Full-Time/Part-Time indicator,Salary Range From,Salary Range To,Salary Frequency,Work Location,Division/Work Unit,Job Description,Minimum Qual Requirements,Preferred Skills,Additional Information,To Apply,Hours/Shift,Work Location 1,Recruitment Contact,Residency Requirement,Posting Date,Post Until,Posting Updated,Process Date
381492,NYC HOUSING AUTHO...,Internal,2,Community Associate,COMMUNITY ASSOCIATE,56057,0,"Policy, Research ...",,52000.0,61936.0,Annual,Visual Assessment...,Lead Hazard Contr...,The New York City...,Qualification Req...,1. Experience wi...,NYCHA employees a...,"Click the ""Apply ...",,,,NYCHA has no resi...,2019-05-10 00:00:00,,2019-07-05 00:00:00,2019-12-17 00:00:00
381492,NYC HOUSING AUTHO...,External,2,Community Associate,COMMUNITY ASSOCIATE,56057,0,"Policy, Research ...",,52000.0,61936.0,Annual,Visual Assessment...,Lead Hazard Contr...,The New York City...,Qualification Req...,1. Experience wi...,NYCHA employees a...,"Click the ""Apply ...",,,,NYCHA has no resi...,2019-05-10 00:00:00,,2019-07-05 00:00:00,2019-12-17 00:00:00
381752,DEPT OF ENVIRONME...,Internal,1,Associate Public ...,ASSOCIATE PUBLIC ...,31220,1,Health Public Saf...,F,58677.0,91199.0,Annual,96-05 Horace Hard...,Environmental Hea...,The NYC Departmen...,1. A baccalaureat...,â€¢ Excellent int...,Appointments are ...,"Click ""Apply Now""...",40 hours per week...,Owls Head Wastewa...,,New York City res...,2019-01-29 00:00:00,,2019-10-07 00:00:00,2019-12-17 00:00:00
381752,DEPT OF ENVIRONME...,External,1,Associate Public ...,ASSOCIATE PUBLIC ...,31220,1,Health Public Saf...,F,58677.0,91199.0,Annual,96-05 Horace Hard...,Environmental Hea...,The NYC Departmen...,1. A baccalaureat...,â€¢ Excellent int...,Appointments are ...,"Click ""Apply Now""...",40 hours per week...,Owls Head Wastewa...,,New York City res...,2019-01-29 00:00:00,,2019-10-07 00:00:00,2019-12-17 00:00:00
381848,NYC HOUSING AUTHO...,Internal,1,"Director, Lead Ha...",DIRECTOR OF CONTR...,80287,M4,"Public Safety, In...",,78574.0,202744.0,Annual,Lead Hazard - Off...,Lead Hazard Contr...,Please read this ...,A Baccalaureate d...,â€¢ Integrity â€“...,NYCHA employees a...,"Click the ""Apply ...",,,,NYCHA has no resi...,2019-01-29 00:00:00,,2019-04-24 00:00:00,2019-12-17 00:00:00
381848,NYC HOUSING AUTHO...,External,1,"Director, Lead Ha...",DIRECTOR OF CONTR...,80287,M4,"Public Safety, In...",,78574.0,202744.0,Annual,Lead Hazard - Off...,Lead Hazard Contr...,Please read this ...,A Baccalaureate d...,â€¢ Integrity â€“...,NYCHA employees a...,"Click the ""Apply ...",,,,NYCHA has no resi...,2019-01-29 00:00:00,,2019-04-24 00:00:00,2019-12-17 00:00:00
381982,ADMIN FOR CHILDRE...,Internal,1,Director of Security,DIRECTOR OF SECUR...,70822,M1,"Public Safety, In...",F,56990.0,115000.0,Annual,150 William Stree...,Facilities (Admin),The New York City...,1. A baccalaureat...,The preferred can...,Section 424-A of ...,"Click on the ""App...",,,,New York City res...,2019-02-05 00:00:00,,2019-12-10 00:00:00,2019-12-17 00:00:00
381982,ADMIN FOR CHILDRE...,External,1,Director of Security,DIRECTOR OF SECUR...,70822,M1,"Public Safety, In...",F,56990.0,115000.0,Annual,150 William Stree...,Facilities (Admin),The New York City...,1. A baccalaureat...,The preferred can...,Section 424-A of ...,"Click on the ""App...",,,,New York City res...,2019-02-05 00:00:00,,2019-12-10 00:00:00,2019-12-17 00:00:00
382050,DEPT OF ENVIRONME...,Internal,10,City Seasonal Aide,CITY SEASONAL AIDE,91406,0,Administration & ...,F,15.0,18.77,Hourly,59-17 Junction Bl...,Dept of Environme...,The NYC Departmen...,While there are n...,1.	Office Automat...,Appointments are ...,To apply click th...,35 hour week,59-17 Junction Bl...,,New York City res...,2019-02-08 00:00:00,,2019-12-11 00:00:00,2019-12-17 00:00:00
382050,DEPT OF ENVIRONME...,External,10,City Seasonal Aide,CITY SEASONAL AIDE,91406,0,Administration & ...,F,15.0,18.77,Hourly,59-17 Junction Bl...,Dept of Environme...,The NYC Departmen...,While there are n...,1.	Office Automat...,Appointments are ...,To apply click th...,35 hour week,59-17 Junction Bl...,,New York City res...,2019-02-08 00:00:00,,2019-12-11 00:00:00,2019-12-17 00:00:00


Total Records: 2946
Getting missing value counts


AnalysisException: "cannot resolve 'isnan(`Posting Date`)' due to data type mismatch: argument 1 requires (double or float) type, however, '`Posting Date`' is of timestamp type.;;\n'Aggregate [count(CASE WHEN ((((Contains(cast(Job ID#335 as string), None) || Contains(cast(Job ID#335 as string), NULL)) || (Job ID#335 = cast( as int))) || isnull(Job ID#335)) || isnan(cast(Job ID#335 as double))) THEN Job ID END) AS Job ID_missing#651L, count(CASE WHEN ((((Contains(Agency#336, None) || Contains(Agency#336, NULL)) || (Agency#336 = )) || isnull(Agency#336)) || isnan(cast(Agency#336 as double))) THEN Agency END) AS Agency_missing#653L, count(CASE WHEN ((((Contains(Posting Type#337, None) || Contains(Posting Type#337, NULL)) || (Posting Type#337 = )) || isnull(Posting Type#337)) || isnan(cast(Posting Type#337 as double))) THEN Posting Type END) AS Posting Type_missing#655L, count(CASE WHEN ((((Contains(cast(# Of Positions#338 as string), None) || Contains(cast(# Of Positions#338 as string), NULL)) || (# Of Positions#338 = cast( as int))) || isnull(# Of Positions#338)) || isnan(cast(# Of Positions#338 as double))) THEN # Of Positions END) AS # Of Positions_missing#657L, count(CASE WHEN ((((Contains(Business Title#339, None) || Contains(Business Title#339, NULL)) || (Business Title#339 = )) || isnull(Business Title#339)) || isnan(cast(Business Title#339 as double))) THEN Business Title END) AS Business Title_missing#659L, count(CASE WHEN ((((Contains(Civil Service Title#340, None) || Contains(Civil Service Title#340, NULL)) || (Civil Service Title#340 = )) || isnull(Civil Service Title#340)) || isnan(cast(Civil Service Title#340 as double))) THEN Civil Service Title END) AS Civil Service Title_missing#661L, count(CASE WHEN ((((Contains(Title Code No#341, None) || Contains(Title Code No#341, NULL)) || (Title Code No#341 = )) || isnull(Title Code No#341)) || isnan(cast(Title Code No#341 as double))) THEN Title Code No END) AS Title Code No_missing#663L, count(CASE WHEN ((((Contains(Level#342, None) || Contains(Level#342, NULL)) || (Level#342 = )) || isnull(Level#342)) || isnan(cast(Level#342 as double))) THEN Level END) AS Level_missing#665L, count(CASE WHEN ((((Contains(Job Category#343, None) || Contains(Job Category#343, NULL)) || (Job Category#343 = )) || isnull(Job Category#343)) || isnan(cast(Job Category#343 as double))) THEN Job Category END) AS Job Category_missing#667L, count(CASE WHEN ((((Contains(Full-Time/Part-Time indicator#344, None) || Contains(Full-Time/Part-Time indicator#344, NULL)) || (Full-Time/Part-Time indicator#344 = )) || isnull(Full-Time/Part-Time indicator#344)) || isnan(cast(Full-Time/Part-Time indicator#344 as double))) THEN Full-Time/Part-Time indicator END) AS Full-Time/Part-Time indicator_missing#669L, count(CASE WHEN ((((Contains(cast(Salary Range From#345 as string), None) || Contains(cast(Salary Range From#345 as string), NULL)) || (Salary Range From#345 = cast( as double))) || isnull(Salary Range From#345)) || isnan(Salary Range From#345)) THEN Salary Range From END) AS Salary Range From_missing#671L, count(CASE WHEN ((((Contains(cast(Salary Range To#346 as string), None) || Contains(cast(Salary Range To#346 as string), NULL)) || (Salary Range To#346 = cast( as double))) || isnull(Salary Range To#346)) || isnan(Salary Range To#346)) THEN Salary Range To END) AS Salary Range To_missing#673L, count(CASE WHEN ((((Contains(Salary Frequency#347, None) || Contains(Salary Frequency#347, NULL)) || (Salary Frequency#347 = )) || isnull(Salary Frequency#347)) || isnan(cast(Salary Frequency#347 as double))) THEN Salary Frequency END) AS Salary Frequency_missing#675L, count(CASE WHEN ((((Contains(Work Location#348, None) || Contains(Work Location#348, NULL)) || (Work Location#348 = )) || isnull(Work Location#348)) || isnan(cast(Work Location#348 as double))) THEN Work Location END) AS Work Location_missing#677L, count(CASE WHEN ((((Contains(Division/Work Unit#349, None) || Contains(Division/Work Unit#349, NULL)) || (Division/Work Unit#349 = )) || isnull(Division/Work Unit#349)) || isnan(cast(Division/Work Unit#349 as double))) THEN Division/Work Unit END) AS Division/Work Unit_missing#679L, count(CASE WHEN ((((Contains(Job Description#350, None) || Contains(Job Description#350, NULL)) || (Job Description#350 = )) || isnull(Job Description#350)) || isnan(cast(Job Description#350 as double))) THEN Job Description END) AS Job Description_missing#681L, count(CASE WHEN ((((Contains(Minimum Qual Requirements#351, None) || Contains(Minimum Qual Requirements#351, NULL)) || (Minimum Qual Requirements#351 = )) || isnull(Minimum Qual Requirements#351)) || isnan(cast(Minimum Qual Requirements#351 as double))) THEN Minimum Qual Requirements END) AS Minimum Qual Requirements_missing#683L, count(CASE WHEN ((((Contains(Preferred Skills#352, None) || Contains(Preferred Skills#352, NULL)) || (Preferred Skills#352 = )) || isnull(Preferred Skills#352)) || isnan(cast(Preferred Skills#352 as double))) THEN Preferred Skills END) AS Preferred Skills_missing#685L, count(CASE WHEN ((((Contains(Additional Information#353, None) || Contains(Additional Information#353, NULL)) || (Additional Information#353 = )) || isnull(Additional Information#353)) || isnan(cast(Additional Information#353 as double))) THEN Additional Information END) AS Additional Information_missing#687L, count(CASE WHEN ((((Contains(To Apply#354, None) || Contains(To Apply#354, NULL)) || (To Apply#354 = )) || isnull(To Apply#354)) || isnan(cast(To Apply#354 as double))) THEN To Apply END) AS To Apply_missing#689L, count(CASE WHEN ((((Contains(Hours/Shift#355, None) || Contains(Hours/Shift#355, NULL)) || (Hours/Shift#355 = )) || isnull(Hours/Shift#355)) || isnan(cast(Hours/Shift#355 as double))) THEN Hours/Shift END) AS Hours/Shift_missing#691L, count(CASE WHEN ((((Contains(Work Location 1#356, None) || Contains(Work Location 1#356, NULL)) || (Work Location 1#356 = )) || isnull(Work Location 1#356)) || isnan(cast(Work Location 1#356 as double))) THEN Work Location 1 END) AS Work Location 1_missing#693L, count(CASE WHEN ((((Contains(Recruitment Contact#357, None) || Contains(Recruitment Contact#357, NULL)) || (Recruitment Contact#357 = )) || isnull(Recruitment Contact#357)) || isnan(cast(Recruitment Contact#357 as double))) THEN Recruitment Contact END) AS Recruitment Contact_missing#695L, count(CASE WHEN ((((Contains(Residency Requirement#358, None) || Contains(Residency Requirement#358, NULL)) || (Residency Requirement#358 = )) || isnull(Residency Requirement#358)) || isnan(cast(Residency Requirement#358 as double))) THEN Residency Requirement END) AS Residency Requirement_missing#697L, ... 4 more fields]\n+- Relation[Job ID#335,Agency#336,Posting Type#337,# Of Positions#338,Business Title#339,Civil Service Title#340,Title Code No#341,Level#342,Job Category#343,Full-Time/Part-Time indicator#344,Salary Range From#345,Salary Range To#346,Salary Frequency#347,Work Location#348,Division/Work Unit#349,Job Description#350,Minimum Qual Requirements#351,Preferred Skills#352,Additional Information#353,To Apply#354,Hours/Shift#355,Work Location 1#356,Recruitment Contact#357,Residency Requirement#358,... 4 more fields] csv\n"