---
title: Assignment 04
subtitle: https://github.com/met-ad-688/assignment-04-leoliu36.git
author:
  - name: Leo Liu
    affiliations:
      - id: bu
        name: Boston University
        city: Boston
        state: MA
number-sections: true
date: '2025-09-30'
format:
  docx:
    toc: true
    number-sections: true
date-modified: today
date-format: long
execute:
  echo: false
  eval: true
  freeze: auto
---

In [1]:
# Data Loading & Setup
import pandas as pd
import plotly.express as px
import plotly.io as pio
from pyspark.sql import SparkSession
import re
import numpy as np
import plotly.graph_objects as go
from pyspark.sql.functions import col, split, explode, regexp_replace, transform, when, trim, monotonically_increasing_id, pow, length, sum as spark_sum
from pyspark.sql import functions as F
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml import Pipeline
from pyspark.ml.regression import LinearRegression

# Seed NumPy's global RNG so NumPy-based randomness is repeatable across runs.
np.random.seed(42)

# Use Plotly's "notebook" renderer for figures produced in this document.
pio.renderers.default = "notebook"

# Create the Spark session for this analysis, or reuse one that already exists.
spark = (
    SparkSession.builder
    .appName("LightcastData")
    .getOrCreate()
)

# CSV reader settings: first row is the header, column types are inferred,
# records may span several lines, and a double quote both quotes a field and
# escapes a quote inside a quoted field.
csv_reader_options = {
    "header": "true",
    "inferSchema": "true",
    "multiLine": "true",
    "quote": "\"",
    "escape": "\"",
}

# Load the job postings and expose them to Spark SQL as `job_postings`.
df = spark.read.options(**csv_reader_options).csv("../data/lightcast_job_postings.csv")
df.createOrReplaceTempView("job_postings")

# Diagnostic only -- keep disabled when rendering the submission:
# df.printSchema()

# Preview the first five rows.
df.show(5)

Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/10/01 05:29:05 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
25/10/01 05:29:19 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
                                                                                

+--------------------+-----------------+----------------------+----------+--------+---------+--------+--------------------+--------------------+--------------------+-----------+-------------------+--------------------+--------------------+---------------+----------------+--------+--------------------+-----------+-------------------+----------------+---------------------+-------------+-------------------+-------------+------------------+---------------+--------------------+--------------------+--------------------+-------------+------+-----------+----------------+-------------------+---------+-----------+--------------------+--------------------+-------------+------+--------------+-----+--------------------+-----+----------+---------------+--------------------+---------------+--------------------+------------+--------------------+------------+--------------------+------+--------------------+------+--------------------+------+--------------------+------+--------------------+------+------

In [None]:
# Drop columns that are not needed for this analysis.
#
# Every entry must be followed by a comma: Python concatenates adjacent string
# literals, so a missing comma at the end of a group silently fuses two names
# into one column name that does not exist (e.g. "ORIGINAL_PAY_PERIOD" "NAICS2"
# -> "ORIGINAL_PAY_PERIODNAICS2"). DataFrame.drop ignores unknown names, so the
# affected columns would be kept without any error.
columns_to_drop = [
    # tracking & other metadata
    "ID", "LAST_UPDATED_DATE", "LAST_UPDATED_TIMESTAMP", "DUPLICATES",
    "SOURCE_TYPES", "SOURCES", "URL", "ACTIVE_URLS", "ACTIVE_SOURCES_INFO",
    "MODELED_EXPIRED", "MODELED_DURATION", "TITLE_RAW", "ORIGINAL_PAY_PERIOD",
    # outdated NAICS and SOC codes
    "NAICS2", "NAICS2_NAME", "NAICS3", "NAICS3_NAME",
    "NAICS4", "NAICS4_NAME", "NAICS5", "NAICS5_NAME",
    "NAICS6", "NAICS6_NAME",
    "SOC_2", "SOC_2_NAME", "SOC_3", "SOC_3_NAME",
    "SOC_4", "SOC_4_NAME", "SOC_5", "SOC_5_NAME",
    "SOC_2021_2", "SOC_2021_2_NAME", "SOC_2021_3", "SOC_2021_3_NAME",
    "SOC_2021_5", "SOC_2021_5_NAME",
    "NAICS_2022_2", "NAICS_2022_2_NAME", "NAICS_2022_3", "NAICS_2022_3_NAME",
    "NAICS_2022_4", "NAICS_2022_4_NAME", "NAICS_2022_5", "NAICS_2022_5_NAME",
    # Location encodings
    "COUNTY_OUTGOING", "COUNTY_NAME_OUTGOING",
    "COUNTY_INCOMING", "COUNTY_NAME_INCOMING",
    "MSA_OUTGOING", "MSA_NAME_OUTGOING",
    "MSA_INCOMING", "MSA_NAME_INCOMING",
]

# Drop the listed columns; names not present in df are ignored by Spark.
df = df.drop(*columns_to_drop)

# Show resulting schema
df.printSchema()

In [10]:
# Columns carried into the EDA frame (df_eda):
##   dependent variable:    SALARY
##   independent variables: SALARY_FROM, SALARY_TO, MIN_YEARS_EXPERIENCE, DURATION
##   categorical variables: COMPANY_IS_STAFFING, IS_INTERNSHIP, REMOTE_TYPE_NAME,
##                          EMPLOYMENT_TYPE_NAME, MIN_EDULEVELS_NAME, STATE_NAME

from pyspark.sql.functions import col, pow
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml import Pipeline

# Numeric fields first (salary fields, then experience and posting duration).
eda_numeric_columns = [
    "SALARY",
    "SALARY_FROM",
    "SALARY_TO",
    "MIN_YEARS_EXPERIENCE",
    "DURATION",
]

# Categorical / boolean fields, in the order they should appear in df_eda.
eda_categorical_columns = [
    "COMPANY_IS_STAFFING",
    "IS_INTERNSHIP",
    "REMOTE_TYPE_NAME",
    "EMPLOYMENT_TYPE_NAME",
    "MIN_EDULEVELS_NAME",
    "STATE_NAME",
]

eda_columns = eda_numeric_columns + eda_categorical_columns

# Project df down to the EDA columns and preview five untruncated rows.
df_eda = df.select(*eda_columns)
df_eda.show(5, truncate=False)

+------+-----------+---------+--------------------+--------+-------------------+-------------+----------------+----------------------+-------------------+----------+
|SALARY|SALARY_FROM|SALARY_TO|MIN_YEARS_EXPERIENCE|DURATION|COMPANY_IS_STAFFING|IS_INTERNSHIP|REMOTE_TYPE_NAME|EMPLOYMENT_TYPE_NAME  |MIN_EDULEVELS_NAME |STATE_NAME|
+------+-----------+---------+--------------------+--------+-------------------+-------------+----------------+----------------------+-------------------+----------+
|NULL  |NULL       |NULL     |2                   |6       |false              |false        |[None]          |Full-time (> 32 hours)|Bachelor's degree  |Arkansas  |
|NULL  |NULL       |NULL     |3                   |NULL    |true               |false        |Remote          |Full-time (> 32 hours)|No Education Listed|Maine     |
|NULL  |NULL       |NULL     |5                   |35      |false              |false        |[None]          |Full-time (> 32 hours)|Bachelor's degree  |Texas     |
|NUL

In [None]:
from pyspark.sql.functions import col, split, explode, regexp_replace, transform, when, trim, monotonically_increasing_id, pow, length, sum as spark_sum
import hvplot.pandas

# Visualize the percentage of missing values for each column.
#
# A value counts as missing when it is NULL or blank after trimming.
# `.otherwise(0)` is required: without it, non-missing rows contribute NULL and
# a column with no missing values sums to NULL (NaN in pandas) instead of 0.
df_na = df_eda.select([
    spark_sum(
        when(col(c).isNull() | (length(trim(col(c))) == 0), 1).otherwise(0)
    ).alias(c)
    for c in df_eda.columns
])

# One row per column: transpose the single-row result and label the fields.
df_na_pd = df_na.toPandas().T.reset_index()
df_na_pd.columns = ["column", "missing_count"]

# Use the same frame for the denominator as for the counts above.
total_rows = df_eda.count()
df_na_pd["missing_pct"] = df_na_pd["missing_count"] / total_rows * 100


# Bar chart, highest missing percentage first.
df_na_pd.sort_values("missing_pct", ascending=False).hvplot.bar(
    x="column",
    y="missing_pct",
    title="Percentage of Missing Values by Column",
    xlabel="Column Name",
    ylabel="Percentage of Missing Values",
    rot=45,
    height=600,
    width=1000
)

In [None]:
import pandas as pd
import hvplot.pandas  # importing this enables the .hvplot accessor on pandas objects

# Pull a 5% sample (fixed seed) of the EDA frame into pandas.
df_sample = df_eda.sample(fraction=0.05, seed=42).toPandas()

# True wherever the sampled value is null.
missing_mask = df_sample.isnull()

# Reshape to long form: one row per (row index, column) pair.
missing_long = missing_mask.reset_index().melt(
    id_vars="index",
    var_name="column",
    value_name="is_missing",
)

# Normalise dtypes in one step: axis keys as strings, the flag as 0/1 integers.
missing_long = missing_long.astype(
    {"index": str, "column": str, "is_missing": int}
)

# Heatmap of the missing-value flag: columns on x, sampled rows on y.
heatmap = missing_long.hvplot.heatmap(
    x="column",
    y="index",
    C="is_missing",
    cmap="Reds",
    colorbar=False,
    width=900,
    height=700,
    title="Heatmap of Missing Values (Sample)",
)
heatmap.opts(xrotation=45)


In [20]:
from pyspark.sql.functions import countDistinct

# Build one distinct-count expression per df_eda column, each aliased to the
# column's own name, then show them as a single untruncated row.
distinct_count_exprs = [
    countDistinct(col(name)).alias(name)
    for name in df_eda.columns
]
df_eda.select(*distinct_count_exprs).show(truncate=False)

[Stage 42:>                                                         (0 + 1) / 1]

+------+-----------+---------+--------------------+--------+-------------------+-------------+----------------+--------------------+------------------+----------+
|SALARY|SALARY_FROM|SALARY_TO|MIN_YEARS_EXPERIENCE|DURATION|COMPANY_IS_STAFFING|IS_INTERNSHIP|REMOTE_TYPE_NAME|EMPLOYMENT_TYPE_NAME|MIN_EDULEVELS_NAME|STATE_NAME|
+------+-----------+---------+--------------------+--------+-------------------+-------------+----------------+--------------------+------------------+----------+
|6052  |4172       |4455     |16                  |60      |2                  |2            |4               |3                   |6                 |51        |
+------+-----------+---------+--------------------+--------+-------------------+-------------+----------------+--------------------+------------------+----------+



                                                                                

In [21]:
# Categorical fields whose distinct values should be listed.
categorical_cols = [
    "REMOTE_TYPE_NAME",
    "COMPANY_IS_STAFFING",
    "IS_INTERNSHIP",
    "EMPLOYMENT_TYPE_NAME",
    "MIN_EDULEVELS_NAME",
    "STATE_NAME",
]

# For each field: print a header, then show up to 50 distinct values untruncated.
for colname in categorical_cols:
    header = f"\n---- {colname} ----"
    print(header)
    distinct_values = df_eda.select(colname).distinct()
    distinct_values.show(50, truncate=False)


---- REMOTE_TYPE_NAME ----


                                                                                

+----------------+
|REMOTE_TYPE_NAME|
+----------------+
|Remote          |
|[None]          |
|Not Remote      |
|Hybrid Remote   |
|NULL            |
+----------------+


---- COMPANY_IS_STAFFING ----


                                                                                

+-------------------+
|COMPANY_IS_STAFFING|
+-------------------+
|true               |
|false              |
|NULL               |
+-------------------+


---- IS_INTERNSHIP ----


                                                                                

+-------------+
|IS_INTERNSHIP|
+-------------+
|true         |
|false        |
|NULL         |
+-------------+


---- EMPLOYMENT_TYPE_NAME ----


                                                                                

+------------------------+
|EMPLOYMENT_TYPE_NAME    |
+------------------------+
|Part-time / full-time   |
|Part-time (â‰¤ 32 hours)|
|Full-time (> 32 hours)  |
|NULL                    |
+------------------------+


---- MIN_EDULEVELS_NAME ----


                                                                                

+----------------------------+
|MIN_EDULEVELS_NAME          |
+----------------------------+
|Bachelor's degree           |
|Ph.D. or professional degree|
|High school or GED          |
|Master's degree             |
|No Education Listed         |
|Associate degree            |
|NULL                        |
+----------------------------+


---- STATE_NAME ----


[Stage 63:>                                                         (0 + 1) / 1]

+---------------------------------------+
|STATE_NAME                             |
+---------------------------------------+
|Utah                                   |
|Hawaii                                 |
|Minnesota                              |
|Ohio                                   |
|Arkansas                               |
|Oregon                                 |
|Texas                                  |
|North Dakota                           |
|Pennsylvania                           |
|Connecticut                            |
|Nebraska                               |
|Vermont                                |
|Nevada                                 |
|Washington                             |
|Illinois                               |
|Oklahoma                               |
|Delaware                               |
|Alaska                                 |
|New Mexico                             |
|West Virginia                          |
|Missouri                         

                                                                                

In [None]:
# Recode REMOTE_TYPE_NAME:
#   NULL, "[None]", "Not Remote" -> "On Premise"
#   "Hybrid Remote"              -> "Hybrid"
#   anything else (e.g. "Remote") is kept unchanged

from pyspark.sql.functions import col, when

remote_type = col("REMOTE_TYPE_NAME")

# NULL is OR-ed in first so a missing value also resolves to "On Premise".
is_on_premise = remote_type.isNull() | remote_type.isin("[None]", "Not Remote")

df_eda = df_eda.withColumn(
    "REMOTE_TYPE_NAME",
    when(is_on_premise, "On Premise")
    .when(remote_type == "Hybrid Remote", "Hybrid")
    .otherwise(remote_type),
)

# Register the recoded frame as a temporary view for later Spark SQL queries.
df_eda.createOrReplaceTempView("df_eda")

+------+-----------+---------+--------------------+--------+-------------------+-------------+----------------+----------------------+-------------------+----------+
|SALARY|SALARY_FROM|SALARY_TO|MIN_YEARS_EXPERIENCE|DURATION|COMPANY_IS_STAFFING|IS_INTERNSHIP|REMOTE_TYPE_NAME|EMPLOYMENT_TYPE_NAME  |MIN_EDULEVELS_NAME |STATE_NAME|
+------+-----------+---------+--------------------+--------+-------------------+-------------+----------------+----------------------+-------------------+----------+
|NULL  |NULL       |NULL     |2                   |6       |false              |false        |On Premise      |Full-time (> 32 hours)|Bachelor's degree  |Arkansas  |
|NULL  |NULL       |NULL     |3                   |NULL    |true               |false        |Remote          |Full-time (> 32 hours)|No Education Listed|Maine     |
|NULL  |NULL       |NULL     |5                   |35      |false              |false        |On Premise      |Full-time (> 32 hours)|Bachelor's degree  |Texas     |
|NUL

In [None]:
from pyspark.sql.functions import countDistinct

# Count number of unique values per column
df_eda.select([
    countDistinct(c).alias(c + "_nunique")
    for c in df_eda.columns
]).show(truncate=False)

# Categorical columns we would like to inspect. df_eda is built from
# eda_columns, which does not include MAX_EDULEVELS_NAME, and selecting a
# column that is absent raises an AnalysisException. Keep only the candidates
# that df_eda actually contains so the loop below cannot fail on a missing one.
candidate_categorical_cols = [
    "STATE_NAME", "REMOTE_TYPE_NAME", "EMPLOYMENT_TYPE_NAME",
    "MIN_EDULEVELS_NAME", "MAX_EDULEVELS_NAME",
    "COMPANY_IS_STAFFING", "IS_INTERNSHIP"
]
categorical_cols = [
    name for name in candidate_categorical_cols
    if name in df_eda.columns
]

# Show up to 50 distinct values per available categorical column.
for colname in categorical_cols:
    print(f"\n---- {colname} ----")
    df_eda.select(colname).distinct().show(50, truncate=False)

In [None]:
from pyspark.sql.functions import col, when

# Value used for DURATION when no median can be computed from the data.
DEFAULT_DURATION = 30

# Approximate median of DURATION (relative error 0.01). Guard against an empty
# result -- approxQuantile can return no values when the column has no usable
# data -- instead of failing with IndexError on [0].
duration_quantiles = df_eda.approxQuantile("DURATION", [0.5], 0.01)
median_duration = (
    duration_quantiles[0] if duration_quantiles else float(DEFAULT_DURATION)
)

# Fill missing DURATION values with the median (or the default of 30).
df_eda = df_eda.withColumn(
    "DURATION",
    when(col("DURATION").isNull(), median_duration).otherwise(col("DURATION"))
)

In [None]:
import hvplot.pandas  # importing this enables the .hvplot accessor on pandas objects
from pyspark.sql.functions import col, when, trim, length, sum as spark_sum


def _missing_count(column_name):
    """Return an aggregate counting NULL-or-blank values in `column_name`."""
    value = col(column_name)
    is_missing = value.isNull() | (length(trim(value)) == 0)
    return spark_sum(when(is_missing, 1).otherwise(0)).alias(column_name)


# One missing-value count per df_eda column, computed in a single pass.
missing_df = df_eda.select([_missing_count(name) for name in df_eda.columns])

# Move to pandas and reshape to one row per column.
missing_pd = missing_df.toPandas().T.reset_index()
missing_pd.columns = ["column", "missing_count"]

# Row count of df_eda, used as the denominator.
total_rows = df_eda.count()

# Express the counts as a percentage of all rows.
missing_pd["missing_pct"] = missing_pd["missing_count"].mul(100).div(total_rows)

# Bar chart ordered from most to least missing; .opts sets the final x-label rotation.
missing_sorted = missing_pd.sort_values("missing_pct", ascending=False)
missing_sorted.hvplot.bar(
    x="column",
    y="missing_pct",
    rot=90,
    title="Percentage of Missing Values by Column",
    height=600,
    width=900,
    ylabel="Missing Percentage (%)",
    xlabel="Features",
).opts(xrotation=45)