---
title: Assignment 04
author:
  - name: Wei Wang
    affiliations:
      - id: bu
        name: Boston University
        city: Boston
        state: MA
number-sections: true
date: '2025-10-04'
date-modified: today
date-format: long
format:
  html:
    theme: cerulean
    toc: true
    toc-depth: 2

execute:
  echo: false
  eval: false
  freeze: auto
---

# Load the Dataset

In [22]:
from pyspark.sql import SparkSession
import pandas as pd
import plotly.express as px
import plotly.io as pio
import numpy as np

# Seed NumPy's global random generator with a fixed value.
np.random.seed(42)

# Default Plotly renderer(s) used for figures in this document.
pio.renderers.default = "notebook+notebook_connected+vscode"

# Start (or reuse) the Spark session for this analysis.
spark = SparkSession.builder.appName("LightcastData").getOrCreate()

# CSV reader settings: header row, inferred column types, multi-line
# records, and '"' as the escape character.
csv_reader_options = {
    "header": "true",
    "inferSchema": "true",
    "multiLine": "true",
    "escape": "\"",
}
df = spark.read.options(**csv_reader_options).csv("data/lightcast_job_postings.csv")

# Diagnostics: preview five rows, then report (row count, column count).
# Schema printing (df.printSchema()) stays disabled for the rendered submission.
df.show(5)
n_rows = df.count()
n_cols = len(df.columns)
print((n_rows, n_cols))

                                                                                

+--------------------+-----------------+----------------------+----------+--------+---------+--------+--------------------+--------------------+--------------------+-----------+-------------------+--------------------+--------------------+---------------+----------------+--------+--------------------+-----------+-------------------+----------------+---------------------+-------------+-------------------+-------------+------------------+---------------+--------------------+--------------------+--------------------+-------------+------+-----------+----------------+-------------------+---------+-----------+--------------------+--------------------+-------------+------+--------------+-----+--------------------+-----+----------+---------------+--------------------+---------------+--------------------+------------+--------------------+------------+--------------------+------+--------------------+------+--------------------+------+--------------------+------+--------------------+------+------

[Stage 117:>                                                        (0 + 1) / 1]

(72498, 131)


                                                                                

# Missing Value Treatment
Missing values in `SALARY` are replaced with the median salary of the posting's `REMOTE_TYPE_NAME` group; if no group median is available, the overall median salary is used instead.

In [None]:

from pyspark.sql import Window
from pyspark.sql.functions import col, when, isnan, count, expr, median
from pyspark.sql import functions as F

# Global fallback: approximate median of SALARY across all postings
# (approxQuantile with relative error 0.01).
overall_median_salary = df.approxQuantile("SALARY", [0.5], 0.01)[0]

# One approximate median salary per REMOTE_TYPE_NAME group.
group_median_expr = expr("percentile_approx(SALARY, 0.5)").alias("remote_type_median_salary")
remote_type_median = df.groupBy("REMOTE_TYPE_NAME").agg(group_median_expr)

# Attach each posting's group median as an extra column.
df_with_group_median = df.join(remote_type_median, on="REMOTE_TYPE_NAME", how="left")

# Fill SALARY in priority order: the posting's own value, then its group
# median, then the overall median.
df_salary_imputed = df_with_group_median.withColumn(
    "SALARY",
    F.coalesce(
        col("SALARY"),
        col("remote_type_median_salary"),
        F.lit(overall_median_salary),
    ),
)

# Preview the result and report (row count, column count).
df_salary_imputed.show(5, truncate=False)
imputed_shape = (df_salary_imputed.count(), len(df_salary_imputed.columns))
print(imputed_shape)

                                                                                

+----------------+----------------------------------------+-----------------+-----------------------+----------+--------+---------+--------+-------------------+---------------------------------------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----------+-------------------+-----------------------------------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

[Stage 128:>                                                        (0 + 1) / 1]

(72498, 132)


                                                                                

# Feature Engineering

In [27]:
#| eval: true
#| echo: false
#| fig-align: center

from pyspark.sql.functions import col, pow, when
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml import Pipeline
from pyspark.sql.types import BooleanType, StringType, IntegerType

# Collapse REMOTE_TYPE_NAME into three labels: "Remote", "Hybrid" (from
# "Hybrid Remote"), and "Onsite" for every other case.
remote_type_label = (
    when(col("REMOTE_TYPE_NAME") == "Remote", "Remote")
    .when(col("REMOTE_TYPE_NAME") == "Hybrid Remote", "Hybrid")
    .otherwise("Onsite")
)
df_final = df_salary_imputed.withColumn("REMOTE_TYPE", remote_type_label)

# Columns kept for modelling; a row with a null in any of them is dropped.
# NOTE(review): remote_type_median_salary must be non-null here but is not in
# either feature list below — confirm this filter is intended.
model_columns = [
    "SALARY", "MIN_YEARS_EXPERIENCE", "DURATION",
    "COMPANY_IS_STAFFING", "IS_INTERNSHIP",
    "STATE_NAME", "REMOTE_TYPE", "remote_type_median_salary",
]
regression_df = df_final.dropna(subset=model_columns).select(*model_columns)

# Store the two flag columns as integers.
for flag_column in ("IS_INTERNSHIP", "COMPANY_IS_STAFFING"):
    regression_df = regression_df.withColumn(
        flag_column, col(flag_column).cast(IntegerType())
    )

# Squared experience term used by the polynomial feature set.
regression_df = regression_df.withColumn(
    "MIN_YEARS_EXPERIENCE_SQ", pow(col("MIN_YEARS_EXPERIENCE"), 2)
)

# Categorical inputs: each gets a StringIndexer stage (<name>_idx) followed by
# a OneHotEncoder stage (<name>_vec).
categorical_cols = ["STATE_NAME", "REMOTE_TYPE"]
indexers = []
encoders = []
for cat_col in categorical_cols:
    indexers.append(
        StringIndexer(inputCol=cat_col, outputCol=f"{cat_col}_idx", handleInvalid='skip')
    )
    encoders.append(
        OneHotEncoder(inputCol=f"{cat_col}_idx", outputCol=f"{cat_col}_vec")
    )

# Base feature set: numeric columns plus the encoded categoricals.
base_features = [
    "MIN_YEARS_EXPERIENCE", "DURATION", "COMPANY_IS_STAFFING",
    "IS_INTERNSHIP", "STATE_NAME_vec", "REMOTE_TYPE_vec"
]

# Polynomial feature set: the base set plus the squared experience column.
poly_features = base_features + ["MIN_YEARS_EXPERIENCE_SQ"]

# One assembler per feature set, writing to separate vector columns.
assembler_base = VectorAssembler(inputCols=base_features, outputCol="features")
assembler_poly = VectorAssembler(inputCols=poly_features, outputCol="features_poly")

# Stage order: all indexers, then all encoders, then both assemblers.
pipeline = Pipeline(stages=indexers + encoders + [assembler_base, assembler_poly])

# Fit the pipeline on regression_df and apply it to the same data.
fitted_pipeline = pipeline.fit(regression_df)
regression_data = fitted_pipeline.transform(regression_df)

# Preview the target with both feature vectors, then report (rows, columns).
regression_data.select("SALARY", "features", "features_poly").show(5, truncate=False)
print((regression_data.count(), len(regression_data.columns)))

                                                                                

+--------+-----------------------------------------+-------------------------------------------------+
|SALARY  |features                                 |features_poly                                    |
+--------+-----------------------------------------+-------------------------------------------------+
|118560.0|(56,[0,1,2,27,55],[5.0,20.0,1.0,1.0,1.0])|(57,[0,1,2,27,55,56],[5.0,20.0,1.0,1.0,1.0,25.0])|
|112800.0|(56,[0,1,7,55],[1.0,30.0,1.0,1.0])       |(57,[0,1,7,55,56],[1.0,30.0,1.0,1.0,1.0])        |
|156038.0|(56,[0,1,34,55],[2.0,15.0,1.0,1.0])      |(57,[0,1,34,55,56],[2.0,15.0,1.0,1.0,4.0])       |
|141600.0|(56,[0,1,30,55],[7.0,54.0,1.0,1.0])      |(57,[0,1,30,55,56],[7.0,54.0,1.0,1.0,49.0])      |
|130000.0|(56,[0,1,16,55],[5.0,5.0,1.0,1.0])       |(57,[0,1,16,55,56],[5.0,5.0,1.0,1.0,25.0])       |
+--------+-----------------------------------------+-------------------------------------------------+
only showing top 5 rows


                                                                                

(30018, 15)


# Train/Test Split
I split the data into 80% training and 20% testing, a common and balanced choice for regression tasks. The split uses a fixed seed so it is reproducible. With over 24,000 training rows, the model has ample data to learn complex patterns, which reduces the risk of overfitting, while the roughly 6,000 held-out rows are enough to evaluate performance on unseen data reliably.

In [28]:
#| eval: true
#| echo: false
#| fig-align: center

# Random 80/20 split with a fixed seed.
regression_train, regression_test = regression_data.randomSplit([0.8, 0.2], seed=42)

# Report (rows, columns) for the full data, then the train and test parts.
for split_frame in (regression_data, regression_train, regression_test):
    print((split_frame.count(), len(split_frame.columns)))

                                                                                

(30018, 15)


                                                                                

(24046, 15)


                                                                                

(5972, 15)
