In [64]:
import os, findspark

# Spark 4.0.1 -- the installation directory is defined once and reused below.
SPARK_HOME = "/opt/spark-4.0.1-bin-hadoop3"
os.environ["SPARK_HOME"] = SPARK_HOME

# Put Spark's bin/ directory at the front of PATH. The check keeps the cell
# idempotent: re-running it does not prepend another copy each time.
# os.pathsep is used instead of a literal ":" so the separator matches the OS.
spark_bin = os.path.join(SPARK_HOME, "bin")
current_path = os.environ.get("PATH", "")
if current_path.split(os.pathsep)[0] != spark_bin:
    os.environ["PATH"] = (
        spark_bin + os.pathsep + current_path if current_path else spark_bin
    )

# Make the pyspark package from this installation importable.
findspark.init(SPARK_HOME)

# create Spark Session (the import must come after findspark.init)
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("JobPostingsAnalysis").getOrCreate()

In [65]:
# Read the job-postings CSV into a Spark DataFrame.
# Reader settings: first line is a header, column types are inferred,
# records may span several lines, and a double quote escapes a quote.
csv_options = {
    "header": "true",
    "inferSchema": "true",
    "multiLine": "true",
    "escape": "\"",
}

reader = spark.read
for option_name, option_value in csv_options.items():
    reader = reader.option(option_name, option_value)

df = reader.csv("../data/lightcast_job_postings.csv")

                                                                                

In [66]:
from pyspark.sql import functions as F

# Keep only postings that report a salary.
df = df.filter("SALARY IS NOT NULL")

# Collapse postings that share location, cleaned title, raw company name
# and posting date into a single row.
dedup_keys = [
    "CITY_NAME", "COUNTY_NAME", "STATE_NAME",
    "TITLE_CLEAN", "COMPANY_RAW", "POSTED",
]
df = df.dropDuplicates(subset=dedup_keys)

# count() triggers the Spark job and reports the remaining number of rows.
remaining_rows = df.count()
print("Rows after cleaning:", remaining_rows)

[Stage 146:>                                                        (0 + 1) / 1]

Rows after cleaning: 30613


                                                                                

In [67]:
from pyspark.sql import functions as F

# Collapse EMPLOYMENT_TYPE_NAME into three labels. The combined
# "Part-time / full-time" pattern is tested first because the later
# "%Part-time%" pattern would otherwise capture it.
employment_type = F.col("EMPLOYMENT_TYPE_NAME")

employment_label = F.when(
    employment_type.like("%Part-time / full-time%"), "Flexible"
)
employment_label = employment_label.when(
    employment_type.like("%Full-time%"), "Full-time"
)
employment_label = employment_label.when(
    employment_type.like("%Part-time%"), "Part-time"
)
# Anything that matches none of the patterns becomes NULL.
employment_label = employment_label.otherwise(None)

df = df.withColumn("EMPLOYMENT_TYPE_NAME", employment_label)

In [68]:
from pyspark.sql import functions as F

# Step 1: strip the list punctuation (square brackets and double quotes)
# from the raw EDUCATION_LEVELS_NAME string.
# The previous pattern r"[\\[\\]\"]" did not do this: in Java regex syntax
# "\\" is a literal backslash and the inner "[" opens a nested character
# class, so only backslashes and quotes were removed and the brackets stayed.
df = df.withColumn(
    "EDUCATION_LEVELS_NAME",
    F.regexp_replace(F.col("EDUCATION_LEVELS_NAME"), r'[\[\]"]', "")
)

# Step 2: collapse the cleaned text into one category per posting.
# rlike() searches anywhere in the string and the branches are evaluated in
# order, so a posting that lists several levels gets the highest one.
# Rows matching no pattern (including NULL values) are labelled "Other".
df = df.withColumn(
    "EDUCATION_LEVELS_NAME",
    F.when(F.col("EDUCATION_LEVELS_NAME").rlike("PhD|Doctorate|Professional"), "PhD")
     .when(F.col("EDUCATION_LEVELS_NAME").rlike("Master"), "Master")
     .when(F.col("EDUCATION_LEVELS_NAME").rlike("Bachelor"), "Bachelor")
     .when(F.col("EDUCATION_LEVELS_NAME").rlike("Associate|GED|No Education Listed|High School"),
           "Associate or Lower")
     .otherwise("Other")
)


In [69]:
from pyspark.sql import functions as F

# Collapse REMOTE_TYPE_NAME into Hybrid / Remote / Onsite.
# "%Hybrid%" is tested before "%Remote%" so a label containing both words
# is classed as Hybrid.
# NOTE(review): the "%Remote%" pattern also matches a "Not Remote" label,
# which would class on-site postings as Remote. The explicit guard below
# prevents that and is a no-op if the label never occurs. Confirm the actual
# labels with df.select("REMOTE_TYPE_NAME").distinct() on the raw data.
# Everything else (including NULL) is treated as Onsite.
df = df.withColumn(
    "REMOTE_TYPE_NAME",
    F.when(F.col("REMOTE_TYPE_NAME").like("%Hybrid%"), "Hybrid")
     .when(F.col("REMOTE_TYPE_NAME").like("%Not Remote%"), "Onsite")
     .when(F.col("REMOTE_TYPE_NAME").like("%Remote%"), "Remote")
     .otherwise("Onsite")
)

In [70]:
from pyspark.sql import functions as F

# Approximate median of MIN_YEARS_EXPERIENCE: the 0.5 quantile, computed
# with a relative error of 0.01.
quantiles = df.approxQuantile("MIN_YEARS_EXPERIENCE", [0.5], 0.01)
median_value = quantiles[0]

# Replace missing experience values with that median; others stay as they are.
min_years = F.col("MIN_YEARS_EXPERIENCE")
min_years_filled = F.when(min_years.isNull(), median_value).otherwise(min_years)
df = df.withColumn("MIN_YEARS_EXPERIENCE", min_years_filled)


                                                                                

In [71]:
# Preview the first five rows; the last expression shows the column count.
df.show(n=5)
len(df.columns)

[Stage 155:>                                                        (0 + 1) / 1]

+--------------------+-----------------+----------------------+----------+--------+----------+--------+--------------------+--------------------+--------------------+-----------+-------------------+---------+--------------------+---------------+----------------+--------+------------------+--------------------+-------------------+----------------+---------------------+-------------+-------------------+-------------+------------------+---------------+--------------------+--------------------+--------------------+-------------+------+-----------+----------------+-------------------+---------+-----------+--------------------+--------------------+--------------------+------+--------------+-----+--------------------+-----+----------+---------------+--------------------+---------------+--------------------+------------+--------------------+------------+--------------------+------+--------------------+------+--------------------+------+--------------------+------+--------------------+------+--

                                                                                

131

In [72]:
# Print every column with its inferred type and nullability.
df.printSchema()

root
 |-- ID: string (nullable = true)
 |-- LAST_UPDATED_DATE: string (nullable = true)
 |-- LAST_UPDATED_TIMESTAMP: timestamp (nullable = true)
 |-- DUPLICATES: integer (nullable = true)
 |-- POSTED: string (nullable = true)
 |-- EXPIRED: string (nullable = true)
 |-- DURATION: integer (nullable = true)
 |-- SOURCE_TYPES: string (nullable = true)
 |-- SOURCES: string (nullable = true)
 |-- URL: string (nullable = true)
 |-- ACTIVE_URLS: string (nullable = true)
 |-- ACTIVE_SOURCES_INFO: string (nullable = true)
 |-- TITLE_RAW: string (nullable = true)
 |-- BODY: string (nullable = true)
 |-- MODELED_EXPIRED: string (nullable = true)
 |-- MODELED_DURATION: integer (nullable = true)
 |-- COMPANY: integer (nullable = true)
 |-- COMPANY_NAME: string (nullable = true)
 |-- COMPANY_RAW: string (nullable = true)
 |-- COMPANY_IS_STAFFING: boolean (nullable = true)
 |-- EDUCATION_LEVELS: string (nullable = true)
 |-- EDUCATION_LEVELS_NAME: string (nullable = false)
 |-- MIN_EDULEVELS: integer 

In [73]:
# Columns removed before modelling. Each name appears exactly once; the
# earlier version listed "MODELED_EXPIRED" and "EMPLOYMENT_TYPE" twice.
columns_to_drop = [
    # record identifiers and bookkeeping
    "ID", "URL", "ACTIVE_URLS", "DUPLICATES", "LAST_UPDATED_TIMESTAMP",
    # industry / occupation hierarchy columns
    "NAICS2", "NAICS3", "NAICS4", "NAICS5", "NAICS6",
    "SOC_2", "SOC_3", "SOC_4", "SOC_5",
    "NAICS_2022_2", "NAICS_2022_3", "NAICS_2022_4", "NAICS_2022_5",
    "NAICS_2022_2_NAME", "NAICS_2022_3_NAME", "NAICS_2022_4_NAME", "NAICS_2022_5_NAME",
    "SOC_5_NAME", "SOC_4_NAME", "SOC_3_NAME", "SOC_2_NAME",
    "NAICS2_NAME", "NAICS3_NAME", "NAICS4_NAME", "NAICS5_NAME",
    # date variables are not related to our topic
    "LAST_UPDATED_DATE", "EXPIRED", "DURATION",
    "MODELED_EXPIRED", "MODELED_DURATION",
    # remove numerical COMPANY
    "COMPANY", "TITLE", "SKILLS",
    "MAX_EDULEVELS", "MAX_EDULEVELS_NAME",
    # remove numerical EMPLOYMENT_TYPE and REMOTE_TYPE, keeping the text versions
    "EMPLOYMENT_TYPE", "REMOTE_TYPE",
    # LOCATION contains latitude and longitude coordinates; CITY contains
    # encoded strings instead of actual city names.
    # We keep the location names from state to county.
    "LOCATION", "CITY", "COUNTY", "MSA", "STATE",
    "MSA_NAME", "COUNTY_NAME_OUTGOING", "COUNTY_NAME_INCOMING", "MSA_NAME_OUTGOING", "MSA_NAME_INCOMING",
    "COUNTY_OUTGOING", "COUNTY_INCOMING", "MSA_OUTGOING", "MSA_INCOMING",
    # EDUCATION_LEVELS holds encoded strings (we keep EDUCATION_LEVELS_NAME);
    # MIN_EDULEVELS contains numbers
    "EDUCATION_LEVELS", "MIN_EDULEVELS",
    # encoded strings
    "SPECIALIZED_SKILLS", "CERTIFICATIONS", "COMMON_SKILLS", "SOFTWARE_SKILLS",
    "SOC_2021_2", "SOC_2021_2_NAME", "SOC_2021_3", "SOC_2021_3_NAME", "SOC_2021_5", "SOC_2021_5_NAME",
    "LOT_CAREER_AREA", "LOT_OCCUPATION", "LOT_SPECIALIZED_OCCUPATION", "LOT_OCCUPATION_GROUP",
    "LOT_V6_SPECIALIZED_OCCUPATION", "LOT_V6_CAREER_AREA", "LOT_V6_OCCUPATION_GROUP",
    # missing values >50%
    "ACTIVE_SOURCES_INFO", "MAX_YEARS_EXPERIENCE", "LIGHTCAST_SECTORS", "LIGHTCAST_SECTORS_NAME", "ORIGINAL_PAY_PERIOD",
    "SOURCE_TYPES", "SOURCES", "BODY",
    # keep COMPANY_RAW and TITLE_CLEAN (simplified version)
    "COMPANY_NAME", "TITLE_NAME",
    # numerical
    "ONET", "ONET_2019", "CIP6", "CIP4", "CIP2",
    "CIP6_NAME", "CIP4_NAME", "CIP2_NAME",
]

# Guard against a name being listed twice again.
assert len(columns_to_drop) == len(set(columns_to_drop)), "duplicate name in columns_to_drop"

df = df.drop(*columns_to_drop)

In [74]:
# Preview the first five rows after the column drop; the last expression
# shows how many columns remain.
df.show(n=5)

len(df.columns)

[Stage 158:>                                                        (0 + 1) / 1]

+---------+---------+--------------------+-------------------+---------------------+-------------------+--------------------+--------------------+-------------+------+----------------+---------+-----------+--------------------+--------------+----------+--------------------+-----------+--------------------+-----------------------+-------------------+--------------------+--------------------+--------------------+--------------------+----------+---------------+--------------------+--------------------+-------------------------------+-------------------------+----------------------------------+-----------------+----------------------+----------------------------+-----------------------+------------+--------------------+
|   POSTED|TITLE_RAW|         COMPANY_RAW|COMPANY_IS_STAFFING|EDUCATION_LEVELS_NAME| MIN_EDULEVELS_NAME|EMPLOYMENT_TYPE_NAME|MIN_YEARS_EXPERIENCE|IS_INTERNSHIP|SALARY|REMOTE_TYPE_NAME|SALARY_TO|SALARY_FROM|           CITY_NAME|   COUNTY_NAME|STATE_NAME|         NAICS6_NAME|

                                                                                

38

In [75]:
from pyspark.sql import functions as F

# Missing-value report: NULL count and NULL share for every column.
total_rows = df.count()

# One aggregate expression per column: sum of a 1/0 "is NULL" flag,
# aliased with the column's own name.
exprs = []
for column_name in df.columns:
    null_flag = F.when(F.col(column_name).isNull(), 1).otherwise(0)
    exprs.append(F.sum(null_flag).alias(column_name))

# A single aggregation yields one row holding all counts.
missing_row = df.agg(*exprs).collect()[0].asDict()

# (column, count, count / total rows) for every column.
missing_stats = []
for column_name in df.columns:
    null_count = missing_row[column_name]
    missing_stats.append((column_name, null_count, null_count / total_rows))

report_schema = ["column", "missing_count", "missing_ratio"]
missing_df = spark.createDataFrame(missing_stats, report_schema)

# Show the columns with the largest share of missing values first.
missing_df.orderBy("missing_ratio", ascending=False).show(60)

                                                                                

+--------------------+-------------+--------------------+
|              column|missing_count|       missing_ratio|
+--------------------+-------------+--------------------+
|         COMPANY_RAW|          226|0.007382484565380...|
|         TITLE_CLEAN|           51|0.001665958906346...|
|           TITLE_RAW|           23|7.513148009015778E-4|
|SPECIALIZED_SKILL...|            0|                 0.0|
|              POSTED|            0|                 0.0|
| CERTIFICATIONS_NAME|            0|                 0.0|
| COMPANY_IS_STAFFING|            0|                 0.0|
|  COMMON_SKILLS_NAME|            0|                 0.0|
|EDUCATION_LEVELS_...|            0|                 0.0|
|SOFTWARE_SKILLS_NAME|            0|                 0.0|
|  MIN_EDULEVELS_NAME|            0|                 0.0|
|           ONET_NAME|            0|                 0.0|
|EMPLOYMENT_TYPE_NAME|            0|                 0.0|
|      ONET_2019_NAME|            0|                 0.0|
|MIN_YEARS_EXP

# **1. Unsupervised Learning**

We selected the following features for clustering: industry code (NAICS_2022_6), minimum education level (MIN_EDULEVELS_NAME), minimum years of experience (MIN_YEARS_EXPERIENCE), employment type (EMPLOYMENT_TYPE_NAME), and internship status (IS_INTERNSHIP).

These features reflect the core requirements and structure of job roles, allowing the KMeans algorithm to identify meaningful clusters across the labor market. 

We intentionally excluded geographic and remote-work attributes so that the resulting clusters represent job-type patterns rather than location-driven differences.

In [77]:
# Features used for clustering.
cols = [
    "NAICS_2022_6", "MIN_EDULEVELS_NAME", "MIN_YEARS_EXPERIENCE",
    "EMPLOYMENT_TYPE_NAME", "IS_INTERNSHIP",
]

# Check the Spark type of each selected feature.
feature_frame = df[cols]
print(feature_frame.dtypes)


[('NAICS_2022_6', 'int'), ('MIN_EDULEVELS_NAME', 'string'), ('MIN_YEARS_EXPERIENCE', 'double'), ('EMPLOYMENT_TYPE_NAME', 'string'), ('IS_INTERNSHIP', 'boolean')]
