# Assignment 04

Caroline O’Sullivan (Boston University)  
September 27, 2025

In [1]:
from pyspark.sql import SparkSession
import pandas as pd
import plotly.express as px
import plotly.io as pio
import numpy as np

np.random.seed(45)

pio.renderers.default = "notebook+notebook_connected+vscode"

# Initialize Spark Session
spark = SparkSession.builder.appName("LightcastData").getOrCreate()

# Load Data
df = spark.read.option("header", "true").option("inferSchema", "true").option("multiLine","true").option("escape", "\"").csv("./data/lightcast_job_postings.csv")

# Show Schema and Sample Data
#print("---This is Diagnostic check, No need to print it in the final doc---")

# df.printSchema() # comment this line when rendering the submission
df.show(5)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/10/08 23:17:25 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
25/10/08 23:17:37 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
                                                                                

+--------------------+-----------------+----------------------+----------+--------+---------+--------+--------------------+--------------------+--------------------+-----------+-------------------+--------------------+--------------------+---------------+----------------+--------+--------------------+-----------+-------------------+----------------+---------------------+-------------+-------------------+-------------+------------------+---------------+--------------------+--------------------+--------------------+-------------+------+-----------+----------------+-------------------+---------+-----------+--------------------+--------------------+-------------+------+--------------+-----+--------------------+-----+----------+---------------+--------------------+---------------+--------------------+------------+--------------------+------------+--------------------+------+--------------------+------+--------------------+------+--------------------+------+--------------------+------+------

##Feature Engineering
Feature Engineering is a crucial step in preparing your data for machine learning. In this lab, we will focus on the following tasks:

1. Drop rows with missing values in the target variable and key features.
2. By now you are already familiar with the code and the data. Based on your understanding please choose any 3 (my code output has 10) variables as:
three continuous variables and, MIN_YEARS_EXPERIENCE (total 4, use your best judgment!)
two categorical .
Your dependent variable (y) is SALARY.
3. Convert categorical variables into numerical representations using StringIndexer and OneHotEncoder.
4. Assemble features into a single vector using VectorAssembler.
5. Split the data into training and testing sets.
6. You can use pipeline to do the above steps in one go.
7. Create a new column MIN_YEARS_EXPERIENCE_SQ by squaring the MIN_YEARS_EXPERIENCE column.
8. Assemble the polynomial features into a new vector column features_poly using VectorAssembler.
9. Show the final structure of the DataFrame with the new features.
#Missing Value Treatment
1. Replace missing values in Salary by Median of Salary based on the Employment Type, if missing then replace with the overall median of Salary


In [2]:
#Missing Value Treatment
#1. Replace missing values in Salary by Median of Salary based on the Employment Type, if missing then replace with the overall median of Salary

from pyspark.sql import Window
from pyspark.sql.functions import col, when, isnan, count, lit, expr, avg, median, pow
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler

#select subset of columns
eda_cols = ["SALARY","MIN_YEARS_EXPERIENCE","EMPLOYMENT_TYPE_NAME","REMOTE_TYPE_NAME","STATE_NAME","MIN_EDULEVELS_NAME","COMPANY_IS_STAFFING","IS_INTERNSHIP"]
df_subset = df.select(eda_cols)

#Fix Remote Type Name incorrect labels to Remote, Hybrid, Onsite.  None and NULL are Onsite.
df_subset = df_subset.withColumn(
    "REMOTE_TYPE_NAME",
    when(col("REMOTE_TYPE_NAME") == "Remote", "Remote")
    .when(col("REMOTE_TYPE_NAME") == "Not Remote", "On Site")
    .when(col("REMOTE_TYPE_NAME") == "Hybrid Remote", "Hybrid")
    .when((col("REMOTE_TYPE_NAME").isNull()) | (col("REMOTE_TYPE_NAME") == "[None]"), "On Site")
    .otherwise("On Site")
)

#Clean Employment Type
df_subset = df_subset.withColumn(
    "EMPLOYMENT_TYPE_NAME",
    when(col("EMPLOYMENT_TYPE_NAME").like("Part-time (â‰¤ 32%"), "Part Time")
    .when(col("EMPLOYMENT_TYPE_NAME").like("Part-time / full-%"), "Flexible")
    .when(col("EMPLOYMENT_TYPE_NAME").like("Full-time (> 32%"), "Full Time")
    .when(col("EMPLOYMENT_TYPE_NAME").isNull(), "Full Time")
    .otherwise("Full Time")  # fallback for anything else
)
#Calculate the median of MIN_YEARS_EXPERIENCE column based on Employment Type
median_by_emp_type_min_exp = df.groupBy("EMPLOYMENT_TYPE").agg(expr('percentile_approx(MIN_YEARS_EXPERIENCE, 0.5)').alias('median_min_years_experience_by_emp_type'))

#Median Salary
median_salary = df_subset.approxQuantile("SALARY", [0.5], 0.25)[0]
median_by_emp_type = (
    df_subset.groupBy("EMPLOYMENT_TYPE_NAME")
    .agg(expr("percentile_approx(SALARY, 0.5)").alias("median_salary_by_emp_type"))
)

df_subset = df_subset.join(median_by_emp_type, on="EMPLOYMENT_TYPE_NAME", how="left")
df_subset = df_subset.withColumn(
    "SALARY",
    when(
        col("SALARY").isNull(),
        when(col("median_salary_by_emp_type").isNotNull(), col("median_salary_by_emp_type"))
        .otherwise(lit(median_salary))
    ).otherwise(col("SALARY"))
).drop("median_salary_by_emp_type")

#Median MIN_YEARS_EXPERIENCE
median_by_emp_type_min_exp = df.groupBy("EMPLOYMENT_TYPE_NAME").agg(
    expr('percentile_approx(MIN_YEARS_EXPERIENCE, 0.5)').alias('median_min_years_experience_by_emp_type')
)
df_subset = df_subset.join(median_by_emp_type_min_exp, on="EMPLOYMENT_TYPE_NAME", how="left")
df_subset = df_subset.withColumn(
    "MIN_YEARS_EXPERIENCE",
    when(
        col("MIN_YEARS_EXPERIENCE").isNull(),
        when(col("median_min_years_experience_by_emp_type").isNotNull(), col("median_min_years_experience_by_emp_type"))
        .otherwise(lit(0))  # fallback if needed
    ).otherwise(col("MIN_YEARS_EXPERIENCE"))
).drop("median_min_years_experience_by_emp_type")

#False for IS_INTERNSHIP and COMPANY_IS_STAFFING if NULL
df_subset = df_subset.withColumn(
    "IS_INTERNSHIP",
    when(col("IS_INTERNSHIP").isNull(), lit(False)).otherwise(col("IS_INTERNSHIP"))
).withColumn(
    "COMPANY_IS_STAFFING",
    when(col("COMPANY_IS_STAFFING").isNull(), lit(False)).otherwise(col("COMPANY_IS_STAFFING"))
)

df_subset = df_subset.withColumn("IS_INTERNSHIP_num", col("IS_INTERNSHIP").cast("int"))
df_subset = df_subset.withColumn("COMPANY_IS_STAFFING_num", col("COMPANY_IS_STAFFING").cast("int"))

df_subset = df_subset.dropna()

                                                                                

Linear Regression Model

In [3]:
#Feature Engineering
#String Indexing and One Hot Encoding for Categorical Variables
categorical_cols = ["EMPLOYMENT_TYPE_NAME","REMOTE_TYPE_NAME","MIN_EDULEVELS_NAME","STATE_NAME"]
indexers = [StringIndexer(inputCol=col, outputCol=f"{col}_idx", handleInvalid='skip') for col in categorical_cols]
encoders = [OneHotEncoder(inputCol=f"{col}_idx", outputCol=f"{col}_vec") for col in categorical_cols]

assembler = VectorAssembler(
    inputCols=[
        "MIN_YEARS_EXPERIENCE",
        "COMPANY_IS_STAFFING",
        "IS_INTERNSHIP"
    ] + [f"{col}_vec" for col in categorical_cols],
    outputCol="features"
)

pipeline = Pipeline(stages=indexers + encoders + [assembler])
data = pipeline.fit(df_subset).transform(df_subset)

                                                                                

In [4]:
#Normalize Features
from pyspark.ml.feature import Normalizer

normalizer = Normalizer(inputCol="features",outputCol="normFeatures",p=2.0)

# Apply normalization
data_normalized = normalizer.transform(data)

# Optional: view results
data_normalized.select("features", "normFeatures", "SALARY").show(5, truncate=False)


[Stage 48:>                                                         (0 + 1) / 1]

+---------------------------------------+------------------------------------------------------------------------------------------------------------------+--------+
|features                               |normFeatures                                                                                                      |SALARY  |
+---------------------------------------+------------------------------------------------------------------------------------------------------------------+--------+
|(62,[0,4,5,7,23],[3.0,1.0,1.0,1.0,1.0])|(62,[0,4,5,7,23],[0.8320502943378437,0.2773500981126146,0.2773500981126146,0.2773500981126146,0.2773500981126146])|103573.0|
|(62,[4,5,8,16],[1.0,1.0,1.0,1.0])      |(62,[4,5,8,16],[0.5,0.5,0.5,0.5])                                                                                 |86390.0 |
|(62,[0,4,5,7,22],[3.0,1.0,1.0,1.0,1.0])|(62,[0,4,5,7,22],[0.8320502943378437,0.2773500981126146,0.2773500981126146,0.2773500981126146,0.2773500981126146])|86390.0 |
|(62

                                                                                

In [4]:
#Test Train Split
train_data, test_data = data.randomSplit([0.8, 0.2], seed=45)

In [5]:
from pyspark.ml.regression import GeneralizedLinearRegression
from pyspark.sql import Row

feature_names = assembler.getInputCols()

glr = GeneralizedLinearRegression(
    featuresCol="features",
    labelCol="SALARY",
    family="gaussian",
    link="identity",
    maxIter=10,
    regParam=0.3,
)

glr_model = glr.fit(train_data)
glr_summary = glr_model.summary


                                                                                

In [6]:
#Model Summary

print("Intercept: {:.4f}".format(glr_model.intercept))
print("Coefficients:")
for i, coef in enumerate(glr_model.coefficients):
    print(f" Feature {i + 1}: {coef:.4f}")

Intercept: 102278.9407
Coefficients:
 Feature 1: 2378.7010
 Feature 2: -2835.2513
 Feature 3: -3435.8114
 Feature 4: 12006.3027
 Feature 5: -8290.8537
 Feature 6: 1538.9051
 Feature 7: 1088.4958
 Feature 8: -5950.0787
 Feature 9: -3615.2379
 Feature 10: -22564.1717
 Feature 11: -11979.7180
 Feature 12: 6353.9330
 Feature 13: -1553.1378
 Feature 14: 2688.0705
 Feature 15: -2274.1549
 Feature 16: 490.1882
 Feature 17: 576.8265
 Feature 18: -164.8164
 Feature 19: -1131.9933
 Feature 20: -1786.4970
 Feature 21: -2290.7826
 Feature 22: 1624.7187
 Feature 23: -2157.5368
 Feature 24: 2.7998
 Feature 25: -687.9366
 Feature 26: -2652.3943
 Feature 27: 3176.2818
 Feature 28: -1391.5348
 Feature 29: -3838.0329
 Feature 30: -2622.0994
 Feature 31: -1952.8745
 Feature 32: -1542.7176
 Feature 33: -641.1122
 Feature 34: -3499.4323
 Feature 35: -930.2304
 Feature 36: -996.0715
 Feature 37: 2871.9190
 Feature 38: -2375.4212
 Feature 39: -2173.8021
 Feature 40: -1138.4543
 Feature 41: -4351.5362
 Featur

In [7]:
#Summary Statistics
print("Coefficient Standard Errors:", [f"{val:.4f}" for val in glr_summary.coefficientStandardErrors])
print("T Values:", [f"{val:.4f}" for val in glr_summary.tValues])
print("P Values:", [f"{val:.4f}" for val in glr_summary.pValues])

[Stage 64:>                                                         (0 + 1) / 1]

Coefficient Standard Errors: ['32.1098', '345.5202', '758.5322', '998.1229', '1182.6848', '667.5953', '711.0274', '2935.8109', '2938.5952', '2974.3190', '2986.2789', '3017.8056', '3103.7287', '3105.9301', '3125.4587', '3126.0381', '3126.9004', '3129.9410', '3139.8876', '3141.4249', '3143.4861', '3143.4611', '3153.0125', '3157.2838', '3167.5916', '3177.9271', '3177.1376', '3188.2625', '3189.0496', '3192.7196', '3202.2389', '3205.5528', '3208.6622', '3227.4155', '3228.2393', '3244.4764', '3257.6197', '3281.2210', '3307.3549', '3317.3929', '3315.0877', '3323.9190', '3323.6527', '3334.4591', '3339.7964', '3331.9678', '3364.9221', '3402.6602', '3400.3004', '3395.5481', '3402.1817', '3423.3624', '3502.2973', '3560.7282', '3596.8296', '3635.7330', '3648.5880', '3711.9215', '3718.6007', '3826.0438', '3879.8121', '4034.6588', '4397.4497']
T Values: ['74.0801', '-8.2057', '-4.5296', '12.0289', '-7.0102', '2.3051', '1.5309', '-2.0267', '-1.2303', '-7.5863', '-4.0116', '2.1055', '-0.5004', '0.8655

                                                                                

In [9]:
#Dispersion
print(f"Null Deviance: {glr_summary.nullDeviance:.4f}")
print(f"Residual DF Null: {glr_summary.residualDegreeOfFreedomNull}")
print(f"Residual DF: {glr_summary.residualDegreeOfFreedom}")
print(f"AIC: {glr_summary.aic:.4f}")
print(f"Deviance: {glr_summary.deviance:.4f}")

Null Deviance: 51607738345410.0703
Residual DF Null: 57956
Residual DF: 57894


[Stage 81:>                                                         (0 + 1) / 1]

AIC: 1350095.6573
Deviance: 44306505655818.4141


                                                                                

In [11]:
feature_names = glr_summary._call_java("featureNames")
features = ["Intercept"] + feature_names
coefs = [glr_model.intercept] + list(glr_model.coefficients)
se = list(glr_summary.coefficientStandardErrors)
t_values = list(glr_summary.tValues)
p_values = list(glr_summary.pValues)

print("Length of features:", len(features))
print("Length of coefficients:", len(coefs))
print("Length of standard errors:", len(se))
print("Length of t-values:", len(t_values))
print("Length of p-values:", len(p_values))

Length of features: 63
Length of coefficients: 63
Length of standard errors: 63
Length of t-values: 63
Length of p-values: 63


In [17]:
import pandas as pd
from tabulate import tabulate
from IPython.display import HTML

summary_df = pd.DataFrame({
    "Feature": features,
    "Estimate": [f"{v:.4f}" if v is not None else None for v in coefs],
    "Std. Error": [f"{v:.4f}" if v is not None else None for v in se],
    "t-value": [f"{v:.4f}" if v is not None else None for v in t_values],
    "p-value": [f"{v:.4f}" if v is not None else None for v in p_values]
})
summary_df.to_csv("output/glr_model_summary.csv", index=False)
summary_df

Unnamed: 0,Feature,Estimate,Std. Error,t-value,p-value
0,Intercept,102278.9407,32.1098,74.0801,0.0000
1,MIN_YEARS_EXPERIENCE,2378.7010,345.5202,-8.2057,0.0000
2,COMPANY_IS_STAFFING,-2835.2513,758.5322,-4.5296,0.0000
3,IS_INTERNSHIP,-3435.8114,998.1229,12.0289,0.0000
4,EMPLOYMENT_TYPE_NAME_vec_Full Time,12006.3027,1182.6848,-7.0102,0.0000
...,...,...,...,...,...
58,STATE_NAME_vec_Alaska,-7546.0152,3718.6007,0.8721,0.3831
59,STATE_NAME_vec_Vermont,3243.1689,3826.0438,0.2121,0.8320
60,STATE_NAME_vec_Montana,811.4712,3879.8121,-1.9168,0.0553
61,STATE_NAME_vec_West Virginia,-7436.7514,4034.6588,-2.7238,0.0065


# Random Forest