In [8]:
# ---------------------------------------------------------
# Initialize Spark Session
# ---------------------------------------------------------
# Purpose:
# - Create a Spark entry point for DataFrame operations
# - Optimize for local Docker execution by reducing shuffle partitions
# ---------------------------------------------------------

from pyspark.sql import SparkSession

# Build the notebook's session (getOrCreate hands back an existing one when
# present). Shuffle partitions are pinned to 4, per the cell header above.
spark = (
    SparkSession.builder
    .appName("NYC_Jobs_Data_Exploration")
    .config("spark.sql.shuffle.partitions", "4")
    .getOrCreate()
)

# Bare expression so the notebook renders the session as the cell output.
spark


In [9]:
# ---------------------------------------------------------
# Load Raw NYC Jobs Dataset
# ---------------------------------------------------------
# Purpose:
# - Read the source CSV as provided
# - This represents the raw (bronze) layer
# ---------------------------------------------------------

# Read the source CSV untouched: first line is the header and column types
# are inferred from the data rather than declared.
df_raw = spark.read.csv(
    "../data/nyc-jobs.csv",
    header=True,
    inferSchema=True,
)

# Show the inferred schema so type-inference surprises are visible early.
df_raw.printSchema()


root
 |-- Job ID: integer (nullable = true)
 |-- Agency: string (nullable = true)
 |-- Posting Type: string (nullable = true)
 |-- # Of Positions: integer (nullable = true)
 |-- Business Title: string (nullable = true)
 |-- Civil Service Title: string (nullable = true)
 |-- Title Code No: string (nullable = true)
 |-- Level: string (nullable = true)
 |-- Job Category: string (nullable = true)
 |-- Full-Time/Part-Time indicator: string (nullable = true)
 |-- Salary Range From: double (nullable = true)
 |-- Salary Range To: double (nullable = true)
 |-- Salary Frequency: string (nullable = true)
 |-- Work Location: string (nullable = true)
 |-- Division/Work Unit: string (nullable = true)
 |-- Job Description: string (nullable = true)
 |-- Minimum Qual Requirements: string (nullable = true)
 |-- Preferred Skills: string (nullable = true)
 |-- Additional Information: string (nullable = true)
 |-- To Apply: string (nullable = true)
 |-- Hours/Shift: string (nullable = true)
 |-- Work Locat

In [10]:
# ---------------------------------------------------------
# Function: standardize_column_names
# ---------------------------------------------------------
# Purpose:
# - Convert space-separated column names to snake_case
# - Improve readability and downstream maintainability
# - Reduce the need for quoting/backticks in Spark SQL
#   (only spaces are replaced; names that keep '#', '/' or '-' still need backticks)
# ---------------------------------------------------------

def standardize_column_names(df):
    """Return ``df`` with normalized column names.

    Each name is stripped of surrounding whitespace, lower-cased, and has its
    spaces replaced by underscores (e.g. ``"Salary Range From"`` becomes
    ``"salary_range_from"``).

    Only the space character is rewritten. Other characters such as ``#``,
    ``/`` or ``-`` are kept as-is, so names containing them still need
    backticks when referenced from Spark SQL.

    Args:
        df: DataFrame whose columns should be renamed.

    Returns:
        A new DataFrame with the same columns, in the same order, under the
        normalized names.
    """
    normalized = [name.strip().lower().replace(" ", "_") for name in df.columns]
    # One positional rename for the whole frame instead of one
    # withColumnRenamed call (and one extra projection) per column.
    return df.toDF(*normalized)



In [11]:
# Derive the working frame from the raw frame (df_raw itself is left as loaded)
# and print the schema to confirm the renamed columns.
df = standardize_column_names(df_raw)
df.printSchema()

root
 |-- job_id: integer (nullable = true)
 |-- agency: string (nullable = true)
 |-- posting_type: string (nullable = true)
 |-- #_of_positions: integer (nullable = true)
 |-- business_title: string (nullable = true)
 |-- civil_service_title: string (nullable = true)
 |-- title_code_no: string (nullable = true)
 |-- level: string (nullable = true)
 |-- job_category: string (nullable = true)
 |-- full-time/part-time_indicator: string (nullable = true)
 |-- salary_range_from: double (nullable = true)
 |-- salary_range_to: double (nullable = true)
 |-- salary_frequency: string (nullable = true)
 |-- work_location: string (nullable = true)
 |-- division/work_unit: string (nullable = true)
 |-- job_description: string (nullable = true)
 |-- minimum_qual_requirements: string (nullable = true)
 |-- preferred_skills: string (nullable = true)
 |-- additional_information: string (nullable = true)
 |-- to_apply: string (nullable = true)
 |-- hours/shift: string (nullable = true)
 |-- work_locat

In [12]:
# ---------------------------------------------------------
# Function: clean_salary_columns
# ---------------------------------------------------------
# Purpose:
# - Remove records with missing salary information
# - Salary fields are mandatory for KPI analysis
# ---------------------------------------------------------

from pyspark.sql.functions import col

def clean_salary_columns(df):
    """Return only the rows of ``df`` where both salary bounds are present.

    A row is kept when ``salary_range_from`` and ``salary_range_to`` are both
    non-null; every other row is filtered out.
    """
    has_lower_bound = col("salary_range_from").isNotNull()
    has_upper_bound = col("salary_range_to").isNotNull()
    return df.filter(has_lower_bound & has_upper_bound)
# Rebind df to the salary-complete subset and preview the first 4 rows.
df = clean_salary_columns(df)
df.show(4)

+------+--------------------+------------+--------------+--------------------+--------------------+-------------+-----+--------------------+-----------------------------+-----------------+---------------+----------------+--------------------+--------------------+--------------------+-------------------------+--------------------+----------------------+--------------------+--------------------+--------------------+--------------------+---------------------+--------------------+--------------------+--------------------+--------------------+
|job_id|              agency|posting_type|#_of_positions|      business_title| civil_service_title|title_code_no|level|        job_category|full-time/part-time_indicator|salary_range_from|salary_range_to|salary_frequency|       work_location|  division/work_unit|     job_description|minimum_qual_requirements|    preferred_skills|additional_information|            to_apply|         hours/shift|     work_location_1| recruitment_contact|residency_require

In [14]:
# ---------------------------------------------------------
# Data Wrangling: Salary Sanity Validation
# ---------------------------------------------------------
# Purpose:
# - Remove logically invalid salary records
# - Ensure salary_range_from <= salary_range_to
# ---------------------------------------------------------

def validate_salary_ranges(df):
    """Return the rows of ``df`` whose salary range is logically ordered.

    A row is kept when ``salary_range_from`` is less than or equal to
    ``salary_range_to``.
    """
    range_is_ordered = col("salary_range_from") <= col("salary_range_to")
    return df.filter(range_is_ordered)

# Rebind df to the rows with an ordered salary range and preview the first 4 rows.
df = validate_salary_ranges(df)
df.show(4)

+------+--------------------+------------+--------------+--------------------+--------------------+-------------+-----+--------------------+-----------------------------+-----------------+---------------+----------------+--------------------+--------------------+--------------------+-------------------------+--------------------+----------------------+--------------------+--------------------+--------------------+--------------------+---------------------+--------------------+--------------------+--------------------+--------------------+
|job_id|              agency|posting_type|#_of_positions|      business_title| civil_service_title|title_code_no|level|        job_category|full-time/part-time_indicator|salary_range_from|salary_range_to|salary_frequency|       work_location|  division/work_unit|     job_description|minimum_qual_requirements|    preferred_skills|additional_information|            to_apply|         hours/shift|     work_location_1| recruitment_contact|residency_require

In [16]:
# ---------------------------------------------------------
# Transformation: Text Normalization
# ---------------------------------------------------------
# Purpose:
# - Normalize qualification text for consistent pattern matching
# - Avoid case-sensitivity issues
# ---------------------------------------------------------

from pyspark.sql.functions import lower, trim

# Overwrite the qualification text in place with its trimmed, lower-cased form
# so later pattern matching does not have to deal with casing or padding.
qual_text_normalized = lower(trim(col("minimum_qual_requirements")))
df = df.withColumn("minimum_qual_requirements", qual_text_normalized)

# Preview the first 4 rows.
df.show(4)

+------+--------------------+------------+--------------+--------------------+--------------------+-------------+-----+--------------------+-----------------------------+-----------------+---------------+----------------+--------------------+--------------------+--------------------+-------------------------+--------------------+----------------------+--------------------+--------------------+--------------------+--------------------+---------------------+--------------------+--------------------+--------------------+--------------------+
|job_id|              agency|posting_type|#_of_positions|      business_title| civil_service_title|title_code_no|level|        job_category|full-time/part-time_indicator|salary_range_from|salary_range_to|salary_frequency|       work_location|  division/work_unit|     job_description|minimum_qual_requirements|    preferred_skills|additional_information|            to_apply|         hours/shift|     work_location_1| recruitment_contact|residency_require

In [17]:
# ---------------------------------------------------------
# Feature 1: avg_salary
# ---------------------------------------------------------
# Purpose:
# - Convert salary range into a single comparable metric
# - Used across all salary-related KPIs
# ---------------------------------------------------------

from pyspark.sql.functions import round

# avg_salary is the midpoint of the posted range, rounded to 2 decimals.
# `round` is the Spark column function imported in this cell, not the builtin.
# NOTE(review): salary_frequency is not consulted here; if the dataset mixes
# pay frequencies, the resulting values are not directly comparable - confirm.
salary_midpoint = (col("salary_range_from") + col("salary_range_to")) / 2
df = df.withColumn("avg_salary", round(salary_midpoint, 2))

# Preview the first 4 rows.
df.show(4)

+------+--------------------+------------+--------------+--------------------+--------------------+-------------+-----+--------------------+-----------------------------+-----------------+---------------+----------------+--------------------+--------------------+--------------------+-------------------------+--------------------+----------------------+--------------------+--------------------+--------------------+--------------------+---------------------+--------------------+--------------------+--------------------+--------------------+----------+
|job_id|              agency|posting_type|#_of_positions|      business_title| civil_service_title|title_code_no|level|        job_category|full-time/part-time_indicator|salary_range_from|salary_range_to|salary_frequency|       work_location|  division/work_unit|     job_description|minimum_qual_requirements|    preferred_skills|additional_information|            to_apply|         hours/shift|     work_location_1| recruitment_contact|reside

In [18]:
# ---------------------------------------------------------
# Feature 2: posting_date & posting_year
# ---------------------------------------------------------
# Purpose:
# - Convert posting_date from string to DateType
# - Extract posting_year for time-based analysis
# ---------------------------------------------------------

from pyspark.sql.functions import to_date, year

# Step 1 replaces posting_date with its parsed date value; step 2 reads that
# parsed column, so the order of the two withColumn calls matters.
# NOTE(review): confirm the raw posting_date values really follow MM/dd/yyyy;
# values that do not match the pattern are expected to come back as null
# rather than raise - verify against the loaded data.
df = (
    df
    .withColumn("posting_date", to_date(col("posting_date"), "MM/dd/yyyy"))
    .withColumn("posting_year", year(col("posting_date")))
)

# Preview the first 4 rows.
df.show(4)

+------+--------------------+------------+--------------+--------------------+--------------------+-------------+-----+--------------------+-----------------------------+-----------------+---------------+----------------+--------------------+--------------------+--------------------+-------------------------+--------------------+----------------------+--------------------+--------------------+--------------------+--------------------+---------------------+------------+--------------------+--------------------+--------------------+----------+------------+
|job_id|              agency|posting_type|#_of_positions|      business_title| civil_service_title|title_code_no|level|        job_category|full-time/part-time_indicator|salary_range_from|salary_range_to|salary_frequency|       work_location|  division/work_unit|     job_description|minimum_qual_requirements|    preferred_skills|additional_information|            to_apply|         hours/shift|     work_location_1| recruitment_contact|r

In [19]:
# ---------------------------------------------------------
# Feature 3: requires_degree
# ---------------------------------------------------------
# Purpose:
# - Identify whether a job requires higher education
# - Derived from minimum qualification text
# ---------------------------------------------------------

# Flag postings whose minimum qualification text mentions a degree.
#
# The match must be case-insensitive: the text normalization step earlier in
# this notebook lower-cases minimum_qual_requirements, so a case-sensitive
# pattern with capitalised keywords ("Bachelor", "Degree", ...) can never match
# and would leave requires_degree false for every non-null row.
# The leading (?i) flag makes the whole alternation case-insensitive, so the
# flag is also correct when the column has not been lower-cased.
#
# Rows where minimum_qual_requirements is null yield null (unknown), not False.
df = df.withColumn(
    "requires_degree",
    col("minimum_qual_requirements").rlike(
        "(?i)(bachelor|master|phd|degree)"
    )
)

# Preview the first 4 rows.
df.show(4)

+------+--------------------+------------+--------------+--------------------+--------------------+-------------+-----+--------------------+-----------------------------+-----------------+---------------+----------------+--------------------+--------------------+--------------------+-------------------------+--------------------+----------------------+--------------------+--------------------+--------------------+--------------------+---------------------+------------+--------------------+--------------------+--------------------+----------+------------+---------------+
|job_id|              agency|posting_type|#_of_positions|      business_title| civil_service_title|title_code_no|level|        job_category|full-time/part-time_indicator|salary_range_from|salary_range_to|salary_frequency|       work_location|  division/work_unit|     job_description|minimum_qual_requirements|    preferred_skills|additional_information|            to_apply|         hours/shift|     work_location_1| recru

In [20]:
# ---------------------------------------------------------
# Function: drop_unused_columns
# ---------------------------------------------------------
# Purpose:
# - Remove high-cardinality and free-text columns
# - Reduce storage footprint
# - Improve analytical performance
# ---------------------------------------------------------

def drop_unused_columns(df):
    """Return ``df`` without the long free-text columns not needed downstream.

    Drops ``job_description``, ``preferred_skills`` and
    ``additional_information``; every other column is passed through.
    """
    unused_columns = (
        "job_description",
        "preferred_skills",
        "additional_information",
    )
    return df.drop(*unused_columns)


In [22]:
# Rebind df to the slimmed-down frame and preview the first 4 rows.
df = drop_unused_columns(df)
df.show(4)

+------+--------------------+------------+--------------+--------------------+--------------------+-------------+-----+--------------------+-----------------------------+-----------------+---------------+----------------+--------------------+--------------------+-------------------------+--------------------+--------------------+--------------------+--------------------+---------------------+------------+--------------------+--------------------+--------------------+----------+------------+---------------+
|job_id|              agency|posting_type|#_of_positions|      business_title| civil_service_title|title_code_no|level|        job_category|full-time/part-time_indicator|salary_range_from|salary_range_to|salary_frequency|       work_location|  division/work_unit|minimum_qual_requirements|            to_apply|         hours/shift|     work_location_1| recruitment_contact|residency_requirement|posting_date|          post_until|     posting_updated|        process_date|avg_salary|postin

In [23]:
# ---------------------------------------------------------
# Final Schema Check
# ---------------------------------------------------------
# Purpose:
# - Validate curated schema before persistence
# ---------------------------------------------------------

# Print the curated schema, then show 5 rows with truncate=False so full
# cell values are displayed instead of shortened ones.
df.printSchema()
df.show(5, truncate=False)


root
 |-- job_id: integer (nullable = true)
 |-- agency: string (nullable = true)
 |-- posting_type: string (nullable = true)
 |-- #_of_positions: integer (nullable = true)
 |-- business_title: string (nullable = true)
 |-- civil_service_title: string (nullable = true)
 |-- title_code_no: string (nullable = true)
 |-- level: string (nullable = true)
 |-- job_category: string (nullable = true)
 |-- full-time/part-time_indicator: string (nullable = true)
 |-- salary_range_from: double (nullable = true)
 |-- salary_range_to: double (nullable = true)
 |-- salary_frequency: string (nullable = true)
 |-- work_location: string (nullable = true)
 |-- division/work_unit: string (nullable = true)
 |-- minimum_qual_requirements: string (nullable = true)
 |-- to_apply: string (nullable = true)
 |-- hours/shift: string (nullable = true)
 |-- work_location_1: string (nullable = true)
 |-- recruitment_contact: string (nullable = true)
 |-- residency_requirement: string (nullable = true)
 |-- posting_

In [24]:
# ---------------------------------------------------------
# Persist Processed Dataset
# ---------------------------------------------------------
# Purpose:
# - Store cleaned and feature-engineered data
# - Use Parquet for efficient analytics and reuse
# ---------------------------------------------------------

# Persist the curated frame as Parquet; "overwrite" replaces whatever is
# already stored at the target path, so re-running the cell is safe.
df.write.parquet(
    "../data/processed_nyc_jobs.parquet",
    mode="overwrite",
)
