In [123]:
import findspark
findspark.init()

### Create SparkSession

In [2]:
# Create SparkSession

from pyspark.sql import SparkSession

spark = SparkSession.builder. \
    appName("pyspark-assessment"). \
    getOrCreate()

### Import Libs

In [79]:
# Spark & Python utilities

from pyspark.sql.functions import *
from pyspark.sql import DataFrame
from pyspark.sql.types import StringType, NumericType
from pyspark.sql.window import Window
import re
import pandas as pd

# Pandas display settings (for notebook readability)
pd.set_option('display.max_colwidth', 200)
pd.set_option("display.max_rows", None)
pd.set_option("display.max_columns", None)

### Read Source 

In [148]:
def read_source_data(spark: SparkSession, path: str) -> DataFrame:
    
#     Reads the NYC Jobs CSV file with proper CSV handling options.
#     Returns a Spark DataFrame.
    
    return (
        spark.read
        .option("header", "true")
        .option("delimiter", ",")
        .option("inferSchema", "true")
        .option("quote", '"')
        .option("escape", '"')
        .option("multiLine", "true")
        .option("ignoreLeadingWhiteSpace", "true")
        .option("ignoreTrailingWhiteSpace", "true")
        .option("mode", "PERMISSIVE")
        .csv(path)
    )



# Read data
df = read_source_data(spark, "/dataset/nyc-jobs.csv")
# df.limit(10).toPandas()


### Source Data Analysis

#### Column Datatypes

In [127]:

def get_column_profile(df: DataFrame) -> DataFrame:

    # Returns column metadata: name, datatype, nullable.
    
    profile = [(f.name, f.dataType.simpleString(), f.nullable) for f in df.schema.fields]
    return spark.createDataFrame(profile, ["column", "data_type", "nullable"])


# Column profiling
get_column_profile(df).toPandas()


Unnamed: 0,column,data_type,nullable
0,Job ID,int,True
1,Agency,string,True
2,Posting Type,string,True
3,# Of Positions,int,True
4,Business Title,string,True
5,Civil Service Title,string,True
6,Title Code No,string,True
7,Level,string,True
8,Job Category,string,True
9,Full-Time/Part-Time indicator,string,True


#### Null Percentage

In [128]:
def get_null_percentage(df: DataFrame) -> DataFrame:

#     Calculates null percentage for each column.

    total_rows = df.count()
    exprs = [
        ((count(when(col(c).isNull(), c)) / total_rows) * 100).alias(c)
        for c in df.columns
    ]
    return df.select(exprs)


# Null percentage
get_null_percentage(df).toPandas()


Unnamed: 0,Job ID,Agency,Posting Type,# Of Positions,Business Title,Civil Service Title,Title Code No,Level,Job Category,Full-Time/Part-Time indicator,Salary Range From,Salary Range To,Salary Frequency,Work Location,Division/Work Unit,Job Description,Minimum Qual Requirements,Preferred Skills,Additional Information,To Apply,Hours/Shift,Work Location 1,Recruitment Contact,Residency Requirement,Posting Date,Post Until,Posting Updated,Process Date
0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.067889,6.619145,0.0,0.0,0.0,0.0,0.0,0.0,0.678887,13.340122,37.06721,0.033944,69.993211,53.903598,100.0,0.135777,0.135777,70.434487,0.135777,0.135777


#### Category Column Analysis

In [130]:

def get_categorical_distribution(df: DataFrame, column: str) -> DataFrame:
    
#     Returns frequency distribution for a categorical column.
    
    return df.groupBy(column).count().orderBy(col("count").desc())


# Categorical analysis

categorical_cols = [
    "Agency", "Posting Type", "Job Category",
    "Full-Time/Part-Time indicator", "Salary Frequency", "Work Location"
]

for c in categorical_cols:
    df_show = get_categorical_distribution(df, c)
    df_show.show(truncate=False)



+------------------------------+-----+
|Agency                        |count|
+------------------------------+-----+
|DEPT OF ENVIRONMENT PROTECTION|655  |
|NYC HOUSING AUTHORITY         |231  |
|DEPT OF HEALTH/MENTAL HYGIENE |188  |
|DEPARTMENT OF TRANSPORTATION  |183  |
|DEPT OF DESIGN & CONSTRUCTION |142  |
|TAXI & LIMOUSINE COMMISSION   |134  |
|ADMIN FOR CHILDREN'S SVCS     |108  |
|DEPT OF INFO TECH & TELECOMM  |107  |
|LAW DEPARTMENT                |95   |
|HOUSING PRESERVATION & DVLPMNT|86   |
|OFFICE OF THE COMPTROLLER     |64   |
|POLICE DEPARTMENT             |64   |
|OFFICE OF MANAGEMENT & BUDGET |58   |
|NYC EMPLOYEES RETIREMENT SYS  |54   |
|DEPARTMENT OF INVESTIGATION   |53   |
|DEPARTMENT OF BUSINESS SERV.  |52   |
|DEPARTMENT OF CORRECTION      |51   |
|DEPT OF PARKS & RECREATION    |48   |
|DEPARTMENT OF CITY PLANNING   |45   |
|DEPT OF CITYWIDE ADMIN SVCS   |42   |
+------------------------------+-----+
only showing top 20 rows

+------------+-----+
|Posting Type|cou

### Data Cleaning

#### Rename the column names

In [131]:
def clean_column_names(df: DataFrame) -> DataFrame:
    
#     Standardizes column names: lowercase, underscores, remove special chars.
    
    def clean_col(c):
        c = c.lower()
        c = c.replace("#", "num").replace("/", "_").replace("-", "_").replace(" ", "_")
        c = re.sub(r"[^a-z0-9_]", "", c)
        c = re.sub(r"_+", "_", c)
        return c.strip("_")

    return df.select([col(c).alias(clean_col(c)) for c in df.columns])


df_clean = clean_column_names(df)
df_clean.printSchema()


root
 |-- job_id: integer (nullable = true)
 |-- agency: string (nullable = true)
 |-- posting_type: string (nullable = true)
 |-- num_of_positions: integer (nullable = true)
 |-- business_title: string (nullable = true)
 |-- civil_service_title: string (nullable = true)
 |-- title_code_no: string (nullable = true)
 |-- level: string (nullable = true)
 |-- job_category: string (nullable = true)
 |-- full_time_part_time_indicator: string (nullable = true)
 |-- salary_range_from: double (nullable = true)
 |-- salary_range_to: double (nullable = true)
 |-- salary_frequency: string (nullable = true)
 |-- work_location: string (nullable = true)
 |-- division_work_unit: string (nullable = true)
 |-- job_description: string (nullable = true)
 |-- minimum_qual_requirements: string (nullable = true)
 |-- preferred_skills: string (nullable = true)
 |-- additional_information: string (nullable = true)
 |-- to_apply: string (nullable = true)
 |-- hours_shift: string (nullable = true)
 |-- work_loc

#### Features Addition

In [149]:
def add_features(df: DataFrame) -> DataFrame:

#     Adds:
#     - salary_midpoint
#     - posting_age_days
#     - education_level

    return (
        df
        .withColumn("salary_midpoint", (col("salary_range_to") + col("salary_range_from")) / 2)
        .withColumn("posting_age_days", datediff(current_date(), col("posting_date")))
        .withColumn(
            "education_level",
            when(lower(col("minimum_qual_requirements")).rlike("master"), "Masters")
            .when(lower(col("minimum_qual_requirements")).rlike("baccalaureate|bachelor"), "Bachelors")
            .when(lower(col("minimum_qual_requirements")).rlike("high school"), "High School")
            .otherwise("NA")
        )
    )


# Feature engineering
df_features = add_features(df_clean)
df_features.limit(2).toPandas()


Unnamed: 0,job_id,agency,posting_type,num_of_positions,business_title,civil_service_title,title_code_no,level,job_category,full_time_part_time_indicator,salary_range_from,salary_range_to,salary_frequency,work_location,division_work_unit,job_description,minimum_qual_requirements,preferred_skills,additional_information,to_apply,hours_shift,work_location_1,recruitment_contact,residency_requirement,posting_date,post_until,posting_updated,process_date,salary_midpoint,posting_age_days,education_level
0,87990,DEPARTMENT OF BUSINESS SERV.,Internal,1,Account Manager,CONTRACT REVIEWER (OFFICE OF L,40563,1,,,42405.0,65485.0,Annual,110 William St. N Y,Strategy & Analytics,Division of Economic & Financial Opportunity (DEFO) Mayor Michael R. Bloomberg and SBS are committed to encouraging a competitive and diverse New York City business environment by promoting the...,1.\tA baccalaureate degree from an accredited college and two years of experience in community work or community centered activities in an area related to the duties described above; or 2.\tHigh ...,â€¢\tExcellent interpersonal and organizational skills. â€¢\tExcellent analytic and operational skills. â€¢\tExcellent writing and editing skills. â€¢\tKnowledge of government procurement proce...,"Salary range for this position is: $42,405 - $45,000 per year",,,,,"New York City residency is generally required within 90 days of appointment. However, City Employees in certain titles who have worked for the City for 2 continuous years may also be eligible to r...",2011-06-24,,2011-06-24,2019-12-17,53945.0,5341,Bachelors
1,97899,DEPARTMENT OF BUSINESS SERV.,Internal,1,"EXECUTIVE DIRECTOR, BUSINESS DEVELOPMENT",ADMINISTRATIVE BUSINESS PROMOT,10009,M3,,F,60740.0,162014.0,Annual,110 William St. N Y,Tech Talent Pipeline,"The New York City Department of Small Business Services (SBS) is a vibrant, client-centered agency whose mission is to serve New Yorkâ€™s small businesses, jobseekers and commercial districts. SB...","1. A baccalaureate degree from an accredited college or university and five years of full-time paid experience acquired within the last fifteen years, of supervisory or administrative experience i...",,,"In addition to applying through this website, also email your resume and cover letter including the following subject line: Executive Director â€“ Business Development to: careers@sbs.nyc.gov...",,,,"New York City residency is generally required within 90 days of appointment. However, City Employees in certain titles who have worked for the City for 2 continuous years may also be eligible to r...",2012-01-26,,2012-01-26,2019-12-17,111377.0,5125,Bachelors


#### Features Removal

In [151]:
def remove_unused_features(df: DataFrame) -> DataFrame:

#     Drops free-text / non-required columns.

    return df.drop(
        "job_description",
        "additional_information",
        "to_apply",
        "process_date"
    )

# Remove features
df_final = remove_unused_features(df_features)


#### Top 10 Job Postings per Category

In [133]:
def get_top_jobs_per_category(df: DataFrame) -> DataFrame:
    return df.groupBy("job_category").count().orderBy(col("count").desc()).limit(10)


get_top_jobs_per_category(df_final).toPandas()

Unnamed: 0,job_category,count
0,"Engineering, Architecture, & Planning",504
1,"Technology, Data & Innovation",313
2,Legal Affairs,226
3,"Public Safety, Inspections, & Enforcement",182
4,Building Operations & Maintenance,181
5,"Finance, Accounting, & Procurement",169
6,Administration & Human Resources,134
7,Constituent Services & Community Programs,129
8,Health,125
9,"Policy, Research & Analysis",124


#### Salary Distribution per Job Category

In [134]:
def get_salary_distribution(df: DataFrame) -> DataFrame:
    return (
        df.groupBy("job_category")
          .agg(
              avg("salary_range_from").alias("avg_salary_min"),
              avg("salary_range_to").alias("avg_salary_max")
          )
    )

get_salary_distribution(df_final).toPandas()

Unnamed: 0,job_category,avg_salary_min,avg_salary_max
0,"Administration & Human Resources Communications & Intergovernmental Affairs Engineering, Architecture, & Planning Policy, Research & Analysis",90000.0,100000.0
1,"Health Policy, Research & Analysis Public Safety, Inspections, & Enforcement",113504.0,143885.0
2,"Administration & Human Resources Building Operations & Maintenance Policy, Research & Analysis",54100.0,83981.0
3,Information Technology & Telecommunications Policy & Analysis Social Services,68239.0,85644.0
4,"Finance, Accounting, & Procurement Public Safety, Inspections, & Enforcement",55659.0,70390.0
5,"Engineering, Architecture, & Planning Building Operations & Maintenance Public Safety, Inspections, & Enforcement",52307.03,74392.53
6,"Legal Affairs Policy, Research & Analysis Public Safety, Inspections, & Enforcement",68615.666667,109181.666667
7,"Administration & Human Resources Finance, Accounting, & Procurement Building Operations & Maintenance Policy, Research & Analysis",45491.0,60660.0
8,Constituent Services & Community Programs,50116.325578,65638.830029
9,Building Operations & Maintenance,30188.100766,45006.693915


#### Correlation Between Degree & Salary

In [139]:
def get_avg_salary_by_education_level(df: DataFrame) -> DataFrame:

#     Calculates the average salary per education level.

    return (
        df.groupBy("education_level")
          .agg(avg("salary_range_to").alias("avg_salary"))
          .orderBy(col("avg_salary").desc())
    )


avg_salary_by_edu_df = get_avg_salary_by_education_level(df_final)

avg_salary_by_edu_df.toPandas()

Unnamed: 0,education_level,avg_salary
0,Masters,101324.468675
1,Bachelors,86227.938491
2,,81951.870803
3,High School,43680.853911


#### Highest Salary Job per Agency

In [135]:

def get_highest_salary_per_agency(df: DataFrame) -> DataFrame:
    window_spec = Window.partitionBy("agency").orderBy(col("salary_range_to").desc())
    return (
        df.withColumn("rn", row_number().over(window_spec))
          .filter(col("rn") == 1)
          .select("agency", "business_title", "salary_range_to")
    )

get_highest_salary_per_agency(df_final).toPandas()


Unnamed: 0,agency,business_title,salary_range_to
0,LANDMARKS PRESERVATION COMM,"LANDMARKS PRESERVATIONIST, PRESERVATION DEPT",64297.0
1,OFFICE OF COLLECTIVE BARGAININ,COLLEGE AIDE - CLERICAL,10.36
2,FIRE DEPARTMENT,Senior Enterprise Applications Integration Developer,144929.0
3,ADMIN FOR CHILDREN'S SVCS,Director of Technical Support,156829.0
4,MANHATTAN COMMUNITY BOARD #8,Community Assistant,19.0
5,TAX COMMISSION,CITY ASSESSOR,90177.0
6,HRA/DEPT OF SOCIAL SERVICES,"EXECUTIVE DIRECTOR, SERVER INFRASTRUCTURE - BUILD",153017.0
7,TAXI & LIMOUSINE COMMISSION,"Executive Director, Technology Strategy",160000.0
8,EQUAL EMPLOY PRACTICES COMM,Director of Learning and Development,72712.0
9,DEPARTMENT OF BUSINESS SERV.,"EXECUTIVE DIRECTOR, BUSINESS DEVELOPMENT",162014.0


#### Average Salary per Agency (Last 2 Years)

In [136]:
def get_avg_salary_last_2_years(df: DataFrame) -> DataFrame:
    return (
        df.filter(col("posting_date") >= date_sub(current_date(), 730))
          .groupBy("agency")
          .agg(avg("salary_midpoint").alias("avg_salary"))
    )

get_avg_salary_last_2_years(df_final).toPandas()

Unnamed: 0,agency,avg_salary


#### Highest Paid Skills

In [137]:

# Highest paid jobs as per the business title


def get_highest_paid_skills(df: DataFrame) -> DataFrame:
    return (
        df.groupBy("business_title")
          .agg(avg("salary_midpoint").alias("avg_salary"))
          .orderBy(col("avg_salary").desc())
          .limit(10)
    )

get_highest_paid_skills(df_final).toPandas()

Unnamed: 0,business_title,avg_salary
0,"Deputy Commissioner, Bureau of Customer Services",218587.0
1,"Deputy Commissioner, Public Information, M-VII",217201.0
2,"Deputy Commissioner, Water and Sewer Operations",209585.0
3,"Deputy Commissioner, Wastewater Treatment",198518.0
4,Co-Chief Information Officer,191913.0
5,"Assistant Commissioner, Capital Planning & Delivery",182500.0
6,Vice-President for Support Services,180000.0
7,ADMINISTRATIVE MANAGEMENT AUDITOR,177374.5
8,"Physician, Bureau of Sexually Transmitted Infections",170133.84
9,executive Vice President for Operations,169011.0


### TestCases

In [143]:
def test_clean_column_names():
    mock_data = [(1,)]
    schema = ["Salary Range From"]

    df = spark.createDataFrame(mock_data, schema)
    df_clean = clean_column_names(df)

    assert "salary_range_from" in df_clean.columns
    print("test_clean_column_names passed")

test_clean_column_names()

test_clean_column_names passed


In [147]:
from pyspark.sql.functions import to_date

def test_salary_midpoint():
    # Mock data must include ALL columns used inside add_features()
    mock_data = [
        (50000, 70000, "2020-01-01", "Bachelor degree required")
    ]
    schema = [
        "salary_range_from",
        "salary_range_to",
        "posting_date",
        "minimum_qual_requirements"
    ]

    df = spark.createDataFrame(mock_data, schema)

    df = df.withColumn("posting_date", to_date("posting_date"))
    df = add_features(df)

    result = df.select("salary_midpoint").collect()[0][0]
    assert result == 60000

    print("test_salary_midpoint passed")


test_salary_midpoint()


test_salary_midpoint passed


In [145]:
from pyspark.sql.functions import to_date

def test_education_level():
    mock_data = [
        ("Master degree required", 100000, 80000, "2020-01-01")
    ]
    schema = [
        "minimum_qual_requirements",
        "salary_range_to",
        "salary_range_from",
        "posting_date"
    ]

    df = spark.createDataFrame(mock_data, schema)
    df = df.withColumn("posting_date", to_date("posting_date"))

    df = add_features(df)

    assert df.select("education_level").collect()[0][0] == "Masters"


    print("test_education_level passed")

test_education_level()   


test_education_level passed


#### Write the final output

In [138]:
# Writing the final dataframe in parquet format

def write_output(df: DataFrame, path: str):
    """
    Writes final DataFrame as Parquet.
    """
    df.write.mode("overwrite").parquet(path)

    
write_output(df_final, "/output/processed_nyc_jobs")
