---
title: Assignment 04
author:
  - name: Dakota Alder
    affiliations:
      - id: bu
        name: Boston University
        city: Boston
        state: MA
number-sections: true
date: '2025-10-05'
date-modified: today
date-format: long
format:
  html:
    theme: cerulean
    toc: true
    toc-depth: 2

execute:
  echo: false
  eval: false
  freeze: auto
---

In [1]:
from pyspark.sql import SparkSession
import pandas as pd
import plotly.express as px
import plotly.io as pio
import numpy as np

np.random.seed(42)

pio.renderers.default = "notebook+notebook_connected+vscode"

# Initialize Spark Session
spark = SparkSession.builder.appName("LightcastData").getOrCreate()

# Load Data
df = spark.read.option("header", "true").option("inferSchema", "true").option("multiLine","true").option("escape", "\"").csv("data/lightcast_job_postings.csv")

Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/10/08 07:59:42 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
                                                                                

In [2]:
df_grouped = df.select("EMPLOYMENT_TYPE_NAME").distinct().show(10, truncate=False)



+------------------------+
|EMPLOYMENT_TYPE_NAME    |
+------------------------+
|Part-time / full-time   |
|Part-time (â‰¤ 32 hours)|
|Full-time (> 32 hours)  |
|NULL                    |
+------------------------+



                                                                                

In [7]:
# Missing Value Treatment
#| eval: true
#| echo: false
#| fig-align: center

df_cleaned = df.select(
    "SALARY", "DURATION", "MIN_YEARS_EXPERIENCE", "EMPLOYMENT_TYPE_NAME",
    "REMOTE_TYPE_NAME", "IS_INTERNSHIP"
)

#Cleaning the Remote Type Column
from pyspark.sql.functions import when, col
df_cleaned = df_cleaned.withColumn(
    "REMOTE_TYPE_NAME",
    when(col("REMOTE_TYPE_NAME") == "Remote", "Remote")
    .when(col("REMOTE_TYPE_NAME") == "Hybrid Remote", "Hybrid")
    .when(col("REMOTE_TYPE_NAME") == "[None]", "On-site")
    .when(col("REMOTE_TYPE_NAME").isNull(), "On-site")
    .when(col("REMOTE_TYPE_NAME") == "Not Remote", "On-site")
    .otherwise(col("REMOTE_TYPE_NAME"))
)



#Cleaning the Employment Type Column
df_cleaned = df_cleaned.withColumn(
    "EMPLOYMENT_TYPE_NAME",
    when(col("EMPLOYMENT_TYPE_NAME") == "Part-time / full-time", "Flexible")
    .when(col("EMPLOYMENT_TYPE_NAME").isNull(), "Full-Time")
    .when(col("EMPLOYMENT_TYPE_NAME") == "Part-time (â‰¤ 32 hours)", "Part-Time")
    .when(col("EMPLOYMENT_TYPE_NAME") == "Full-time (> 32 hours)", "Full-Time")
    .otherwise(col("EMPLOYMENT_TYPE_NAME")) 
)

#Cleaning Duration (Adding Median to Nulls)
median_duration = df_cleaned.approxQuantile("DURATION", [0.5], 0.01)[0]
df_cleaned = df_cleaned.withColumn(
    "DURATION",
    when(col("DURATION").isNull(), median_duration).otherwise(col("DURATION"))
)
df_cleaned.show(5, truncate=False)

                                                                                

+------+--------+--------------------+--------------------+----------------+-------------+
|SALARY|DURATION|MIN_YEARS_EXPERIENCE|EMPLOYMENT_TYPE_NAME|REMOTE_TYPE_NAME|IS_INTERNSHIP|
+------+--------+--------------------+--------------------+----------------+-------------+
|NULL  |6.0     |2                   |Full-Time           |On-site         |false        |
|NULL  |18.0    |3                   |Full-Time           |Remote          |false        |
|NULL  |35.0    |5                   |Full-Time           |On-site         |false        |
|NULL  |48.0    |3                   |Full-Time           |On-site         |false        |
|92500 |15.0    |NULL                |Flexible            |On-site         |false        |
+------+--------+--------------------+--------------------+----------------+-------------+
only showing top 5 rows


In [8]:
#| eval: true
#| echo: false
#| fig-align: center

from pyspark.sql.functions import col, pow
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml import Pipeline
from pyspark.sql.types import BooleanType, StringType, IntegerType

# Data Cleaning

regression_df = df_cleaned.dropna(subset=[
    "SALARY", "MIN_YEARS_EXPERIENCE",
    "EMPLOYMENT_TYPE_NAME", "REMOTE_TYPE_NAME", "DURATION", "IS_INTERNSHIP"
]).select("SALARY", "MIN_YEARS_EXPERIENCE",
    "EMPLOYMENT_TYPE_NAME", "REMOTE_TYPE_NAME", "DURATION", "IS_INTERNSHIP")

#Categorical Encoding
categorical_columns = ["EMPLOYMENT_TYPE_NAME", "REMOTE_TYPE_NAME", "IS_INTERNSHIP"]

#Convert Boolean to Integer
regression_df = regression_df.withColumn("IS_INTERNSHIP", col("IS_INTERNSHIP").cast(IntegerType()))

regression_df.show(5, truncate=False)


+------+--------------------+--------------------+----------------+--------+-------------+
|SALARY|MIN_YEARS_EXPERIENCE|EMPLOYMENT_TYPE_NAME|REMOTE_TYPE_NAME|DURATION|IS_INTERNSHIP|
+------+--------------------+--------------------+----------------+--------+-------------+
|92962 |2                   |Full-Time           |On-site         |18.0    |0            |
|107645|10                  |Full-Time           |On-site         |18.0    |0            |
|192800|6                   |Full-Time           |On-site         |55.0    |0            |
|125900|12                  |Full-Time           |On-site         |18.0    |0            |
|170000|6                   |Full-Time           |On-site         |18.0    |0            |
+------+--------------------+--------------------+----------------+--------+-------------+
only showing top 5 rows


In [12]:


#Indexing and One-Hot Encoding

indexers = [StringIndexer(inputCol=col, outputCol=f"{col}_idx", handleInvalid='skip') for col in categorical_columns]
encoders = [OneHotEncoder(inputCol=f"{col}_idx", outputCol=f"{col}_vec") for col in categorical_columns]

#Assemble Features (For Linear Regression and Random Forest)
assembler = VectorAssembler(
    inputCols=[
        "MIN_YEARS_EXPERIENCE", "DURATION",
        "IS_INTERNSHIP"
        ] + [f"{col}_vec" for col in categorical_columns],
    outputCol="features"
)

pipeline = Pipeline(stages=indexers + encoders + [assembler])
regression_data = pipeline.fit(regression_df).transform(regression_df)

#Squared Min Years Experience Column

regression_data = regression_data.withColumn("MIN_YEARS_EXPERIENCE_SQ", pow(col("MIN_YEARS_EXPERIENCE"), 2))

#Assemble Features (For Polynomial Regression)

assembler_poly = VectorAssembler(
    inputCols=[
        "MIN_YEARS_EXPERIENCE", "MIN_YEARS_EXPERIENCE_SQ", "DURATION",
        "IS_INTERNSHIP"
    ] + [f"{col}_vec" for col in categorical_columns],
    outputCol="poly_features"
)
regression_data = assembler_poly.transform(regression_data)

regression_data.select("SALARY", "features", "poly_features").show(5, truncate=False)





                                                                                

+------+-----------------------------------+-----------------------------------------+
|SALARY|features                           |poly_features                            |
+------+-----------------------------------+-----------------------------------------+
|92962 |[2.0,18.0,0.0,1.0,0.0,1.0,0.0,1.0] |[2.0,4.0,18.0,0.0,1.0,0.0,1.0,0.0,1.0]   |
|107645|[10.0,18.0,0.0,1.0,0.0,1.0,0.0,1.0]|[10.0,100.0,18.0,0.0,1.0,0.0,1.0,0.0,1.0]|
|192800|[6.0,55.0,0.0,1.0,0.0,1.0,0.0,1.0] |[6.0,36.0,55.0,0.0,1.0,0.0,1.0,0.0,1.0]  |
|125900|[12.0,18.0,0.0,1.0,0.0,1.0,0.0,1.0]|[12.0,144.0,18.0,0.0,1.0,0.0,1.0,0.0,1.0]|
|170000|[6.0,18.0,0.0,1.0,0.0,1.0,0.0,1.0] |[6.0,36.0,18.0,0.0,1.0,0.0,1.0,0.0,1.0]  |
+------+-----------------------------------+-----------------------------------------+
only showing top 5 rows


In [10]:
#| eval: true
#| echo: false
#| fig-align: center

regression_train_data, regression_test_data = regression_data.randomSplit([0.8, 0.2], seed=42)
print((regression_data.count(), len(regression_data.columns)))
print((regression_train_data.count(), len(regression_train_data.columns)))
print((regression_test_data.count(), len(regression_test_data.columns)))

                                                                                

(23697, 13)


                                                                                

(18966, 13)


[Stage 36:>                                                         (0 + 1) / 1]

(4731, 13)


                                                                                