---
title: Assignment 04
subtitle: "Regression Analysis in Pyspark"
author:
  - name: Tracy Anyasi
    affiliations:
      - id: bu
        name: Boston University
        city: Boston
        state: MA
number-sections: true
date: '2025-10-08'
date-modified: today
date-format: long
format:
  html:
    theme: cerulean
    toc: true
    toc-depth: 2

execute:
  echo: false
  eval: false
  freeze: auto
---

In [61]:
from pyspark.sql import SparkSession
import pandas as pd
import plotly.express as px
import plotly.io as pio
import numpy as np

np.random.seed(42)

pio.renderers.default = "notebook+notebook_connected+vscode"

# Initialize Spark Session
spark = SparkSession.builder.appName("LightcastData").getOrCreate()

# Load Data
df = spark.read.option("header", "true").option("inferSchema", "true").option("multiLine","true").option("escape", "\"").csv("./data/lightcast_job_postings.csv")

# Show Schema and Sample Data
print("---This is Diagnostic check, No need to print it in the final doc---")

# df.printSchema() # comment this line when rendering the submission
df.show(5)

                                                                                

---This is Diagnostic check, No need to print it in the final doc---
+--------------------+-----------------+----------------------+----------+--------+---------+--------+--------------------+--------------------+--------------------+-----------+-------------------+--------------------+--------------------+---------------+----------------+--------+--------------------+-----------+-------------------+----------------+---------------------+-------------+-------------------+-------------+------------------+---------------+--------------------+--------------------+--------------------+-------------+------+-----------+----------------+-------------------+---------+-----------+--------------------+--------------------+-------------+------+--------------+-----+--------------------+-----+----------+---------------+--------------------+---------------+--------------------+------------+--------------------+------------+--------------------+------+--------------------+------+--------------------+

## Feature Engineering

Remove incomplete data, keep relevant variables, and iron out complicated string values
Encoder turns categorical columns (remote, hybrid, onsite) to numeric ones (1 or 0) based on input

In [83]:
#| eval: true
#| echo: falase
#| fig-align: center

from pyspark.sql.functions import col, pow, when
from pyspark.ml.feature import StringIndexer, VectorAssembler, OneHotEncoder
from pyspark.ml import Pipeline

#remove rows with NAs
df_cleaned = df.dropna(subset=[
    "SALARY", "MIN_YEARS_EXPERIENCE", "STATE_NAME", "EMPLOYMENT_TYPE_NAME",
    "REMOTE_TYPE_NAME", "MIN_EDULEVELS_NAME", "DURATION", 
    "IS_INTERNSHIP", "COMPANY_IS_STAFFING"
])

eda_cols = [
    "SALARY", "MIN_YEARS_EXPERIENCE", "DURATION", "COMPANY_IS_STAFFING",
    "IS_INTERNSHIP", "STATE_NAME", "REMOTE_TYPE_NAME",
    "EMPLOYMENT_TYPE_NAME", "MIN_EDULEVELS_NAME"
]
df_cleaned = df_cleaned.select(eda_cols)

#clean up REMOTE_TYPE_NAME and reduce the different inputs
df_cleaned = df_cleaned.withColumn(
    "REMOTE_TYPE_NAME",
    when(col("REMOTE_TYPE_NAME") == "Remote", "Remote")
    .when(col("REMOTE_TYPE_NAME") == "[None]", "Undefined")
    .when(col("REMOTE_TYPE_NAME") == "Not Remote", "On Premise")
    .when(col("REMOTE_TYPE_NAME") == "Hybrid Remote", "Hybrid")
    .when(col("REMOTE_TYPE_NAME").isNull(), "On Premise")
    .otherwise(col("REMOTE_TYPE_NAME"))
)

#clean EMPLOYMENT_TYPE_NAME
df_cleaned = df_cleaned.withColumn(
    "EMPLOYMENT_TYPE_NAME",
    when(col("EMPLOYMENT_TYPE_NAME") == "Part-time / full-time", "Flexible")
    .when(col("EMPLOYMENT_TYPE_NAME") == "Part-time (â‰¤ 32 hours)", "Parttime")
    .when(col("EMPLOYMENT_TYPE_NAME") == "Full-time (> 32 hours)", "Fulltime")
    .when(col("EMPLOYMENT_TYPE_NAME").isNull(), "Fulltime")
    .otherwise(col("EMPLOYMENT_TYPE_NAME"))
)

#df_cleaned = df_cleaned.filter(col("REMOTE_TYPE_NAME") != "Undefined") -- initially wnated to remove the undefined but it dropped the
#percentage of the regression model

# Categorical and numeric columns
categorical_cols = ["EMPLOYMENT_TYPE_NAME", "REMOTE_TYPE_NAME"]
continuous_cols = ["MIN_YEARS_EXPERIENCE", "DURATION", "IS_INTERNSHIP", "COMPANY_IS_STAFFING"]

# Index and One-Hot Encode
indexers = [StringIndexer(inputCol=col, outputCol=f"{col}_Idx", handleInvalid="skip") for col in categorical_cols]
encoders = [OneHotEncoder(inputCol=f"{col}_Idx", outputCol=f"{col}_vec") for col in categorical_cols]


In [84]:
# combines all categorical and numeric columns into one vector coulmn
assembler = VectorAssembler(
    inputCols=continuous_cols + [f"{col}_vec" for col in categorical_cols],
    outputCol="features"
)

#create pipeline for sequential transformation
pipeline = Pipeline(stages=indexers + encoders + [assembler])
data = pipeline.fit(df_cleaned).transform(df_cleaned)

# polynomial regression for MIN_YEARS_EXPERIENCE
data = data.withColumn("MIN_YEARS_EXPERIENCE_SQ", pow(col("MIN_YEARS_EXPERIENCE"), 2))

#assemble og features plus poly feature into new feature vector
assembler_poly = VectorAssembler(
    inputCols=["features", "MIN_YEARS_EXPERIENCE_SQ"],
    outputCol="features_poly"
)
data = assembler_poly.transform(data)

#split data into training and testing sets for model evaluation
reg_train, reg_test = data.randomSplit([0.8, 0.2], seed=51)
#This split reserves 80% of the data for training and 20% for testing which provides enough data for the model to learn while keeping a reliable holdout set for evaluation.

#displays final structure with salary as target
data.select("SALARY", "features", "features_poly").show(5, truncate=False)

                                                                                

+------+--------------------------------------+-------------------------------------------+
|SALARY|features                              |features_poly                              |
+------+--------------------------------------+-------------------------------------------+
|192800|(9,[0,1,4,6],[6.0,55.0,1.0,1.0])      |(10,[0,1,4,6,9],[6.0,55.0,1.0,1.0,36.0])   |
|125900|(9,[0,1,4,6],[12.0,18.0,1.0,1.0])     |(10,[0,1,4,6,9],[12.0,18.0,1.0,1.0,144.0]) |
|118560|[5.0,20.0,0.0,1.0,1.0,0.0,0.0,1.0,0.0]|[5.0,20.0,0.0,1.0,1.0,0.0,0.0,1.0,0.0,25.0]|
|192800|(9,[0,1,4,6],[6.0,55.0,1.0,1.0])      |(10,[0,1,4,6,9],[6.0,55.0,1.0,1.0,36.0])   |
|116500|(9,[0,1,4,6],[12.0,16.0,1.0,1.0])     |(10,[0,1,4,6,9],[12.0,16.0,1.0,1.0,144.0]) |
+------+--------------------------------------+-------------------------------------------+
only showing top 5 rows


In [85]:
for col in categorical_cols:
    indexer = StringIndexer(inputCol=col, outputCol=f"{col}_Idx", handleInvalid="skip")
    model = indexer.fit(df_cleaned)
    print(f"\nMapping for {col}:")
    for i, category in enumerate(model.labels):
        print(f"  Index {i} -> {category}")


                                                                                


Mapping for EMPLOYMENT_TYPE_NAME:
  Index 0 -> Fulltime
  Index 1 -> Parttime
  Index 2 -> Flexible


[Stage 340:>                                                        (0 + 1) / 1]


Mapping for REMOTE_TYPE_NAME:
  Index 0 -> Undefined
  Index 1 -> Remote
  Index 2 -> Hybrid
  Index 3 -> On Premise


                                                                                

## Linear Regression

In [86]:
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator
import pandas as pd
from scipy.stats import t

lin_reg = LinearRegression(featuresCol="features", labelCol="SALARY")

lin_reg_fit = lin_reg.fit(reg_train) #fit model to training data

salary_pred = lin_reg_fit.transform(reg_test) #predicting on test data

#define the evaluators
r2_eval = RegressionEvaluator(labelCol="SALARY", predictionCol="prediction", metricName="r2")
rmse_eval = RegressionEvaluator(labelCol="SALARY", predictionCol="prediction", metricName="rmse")
mae_eval = RegressionEvaluator(labelCol="SALARY", predictionCol="prediction", metricName="mae")

#generate metrics
r2_score = r2_eval.evaluate(salary_pred)
rmse_val = rmse_eval.evaluate(salary_pred)
mae_val = mae_eval.evaluate(salary_pred)

print(f"R² Score: {r2_score:.4f}")
print(f"RMSE: {rmse_val:.2f}")
print(f"MAE: {mae_val:.2f}")


25/10/05 19:25:01 WARN Instrumentation: [b8b5abf9] regParam is zero, which might cause numerical instability and overfitting.
[Stage 350:>                                                        (0 + 1) / 1]

R² Score: 0.2840
RMSE: 35315.94
MAE: 27676.61


                                                                                

In [87]:
#getting coefs and stats
model_summary = lin_reg_fit.summary

coefs = lin_reg_fit.coefficients
intercept_val = lin_reg_fit.intercept
stderr = model_summary.coefficientStandardErrors
t_vals = model_summary.tValues
p_vals = model_summary.pValues

#compute degrees of freedom for t-distribution
n_obs = model_summary.numInstances
num_features = len(coefs)
dfree = n_obs - num_features - 1

#critical t-value for 95% CI
t_crit = t.ppf(0.975, dfree)

feature_labels = [
    "MIN_YEARS_EXPERIENCE",
    "DURATION",
    "IS_INTERNSHIP",
    "COMPANY_IS_STAFFING",
    "EMPLOYMENT_TYPE_NAME_A",
    "EMPLOYMENT_TYPE_NAME_B",
    "REMOTE_TYPE_NAME_X",
    "REMOTE_TYPE_NAME_Y",
    "REMOTE_TYPE_NAME_Z"
]

coef_table = pd.DataFrame({
    "Feature": ["Intercept"] + feature_labels,
    "Estimate": [intercept_val] + coefs.tolist(),
    "Std Error": stderr,
    "t-Stat": t_vals,
    "p-Value": p_vals
})

#add confidence intervals
coef_table["95% CI Lower"] = coef_table["Estimate"] - t_crit * coef_table["Std Error"]
coef_table["95% CI Upper"] = coef_table["Estimate"] + t_crit * coef_table["Std Error"]

print("\nCoefficient Summary:")
print(coef_table)


Coefficient Summary:
                  Feature      Estimate    Std Error     t-Stat       p-Value  \
0               Intercept  76735.577948   102.356000  66.277489  0.000000e+00   
1    MIN_YEARS_EXPERIENCE   6783.898664    23.632630  -1.831651  6.702934e-02   
2                DURATION    -43.286721  6866.446762  -1.025459  3.051684e-01   
3           IS_INTERNSHIP  -7041.257613  1063.265929  -0.595172  5.517402e-01   
4     COMPANY_IS_STAFFING   -632.826152  3011.540238  -0.421858  6.731367e-01   
5  EMPLOYMENT_TYPE_NAME_A  -1270.441524  3605.833255  -1.077711  2.811853e-01   
6  EMPLOYMENT_TYPE_NAME_B  -3886.046755  2462.498142   3.306373  9.480161e-04   
7      REMOTE_TYPE_NAME_X   8141.937756  2529.947825   3.635882  2.782402e-04   
8      REMOTE_TYPE_NAME_Y   9198.592476  3172.252546   7.636575  2.398082e-14   
9      REMOTE_TYPE_NAME_Z  24225.144004  3579.670705  21.436491  0.000000e+00   

   95% CI Lower  95% CI Upper  
0  76534.942765  76936.213130  
1   6737.574686   6830

The linear regression model explains approximately 28% of the variance in salaries, showing that while job attributes like as experience and remote status influence pay, substantial variation remains unexplained. This is likely due to qualitative factors like role seniority, company size, or negotiation effects. Undefined roles were initally excluded but that reduced the model's reliabilty and was subsequentially added as a baseline for remote roles.

Some statistically significant predictors include remote and hybrid roles (Remote Type X & Y) and Flexible employment type, all of which show clear positive or negative salary impacts. Compared to the baseline groups (Fulltime employment and Undefined remote), Flexible roles pay significantly less than Fulltime roles, whereas Parttime roles do not show a significant difference. For remote types, Remote, Hybrid, and On Premise roles show meaningful salary increases relative to Undefined roles, with On Premise roles exhibiting the largest premium of approximately $24K. 

Non-significant coefficients, such as Parttime or certain remote categories, suggest that observed differences may be due to random variation rather than a true effect, while the significant predictors highlight areas where job structure meaningfully affects compensation.

## Polynominal Linear Regression

In [88]:
from pyspark.ml.feature import PolynomialExpansion
from pyspark.ml.regression import LinearRegression

reg_train = reg_train.drop("features_poly")
reg_test = reg_test.drop("features_poly")

# Assuming your original features column is "features"
polyExpansion = PolynomialExpansion(degree=2, inputCol="features", outputCol="features_poly")

train_poly = polyExpansion.transform(reg_train)
test_poly = polyExpansion.transform(reg_test)


poly_lr = LinearRegression(featuresCol="features_poly", labelCol="SALARY")
poly_model = poly_lr.fit(train_poly)


predictions_poly = poly_model.transform(test_poly)

#define the evaluators
r2_eval = RegressionEvaluator(labelCol="SALARY", predictionCol="prediction", metricName="r2")
rmse_eval = RegressionEvaluator(labelCol="SALARY", predictionCol="prediction", metricName="rmse")
mae_eval = RegressionEvaluator(labelCol="SALARY", predictionCol="prediction", metricName="mae")

#generate metrics
r2 = r2_eval.evaluate(predictions_poly)
rmse = rmse_eval.evaluate(predictions_poly)
mae = mae_eval.evaluate(predictions_poly)

print(f"Polynomial Regression R²: {r2}")
print(f"Polynomial Regression RMSE: {rmse}")
print(f"Polynomial Regression MAE: {mae}")

25/10/05 19:26:06 WARN Instrumentation: [6e0fc769] regParam is zero, which might cause numerical instability and overfitting.
25/10/05 19:26:10 WARN Instrumentation: [6e0fc769] Cholesky solver failed due to singular covariance matrix. Retrying with Quasi-Newton solver.
[Stage 356:>                                                        (0 + 1) / 1]

Polynomial Regression R²: 0.3114361363313357
Polynomial Regression RMSE: 34631.812165496616
Polynomial Regression MAE: 26847.381847776545


                                                                                

In [89]:
from scipy.stats import t

summary_poly = poly_model.summary
coefficients = poly_model.coefficients
intercept = poly_model.intercept
std_errors = summary_poly.coefficientStandardErrors
t_values = summary_poly.tValues
p_values = summary_poly.pValues

#degrees of freedom
n = summary_poly.numInstances
p = len(coefficients)
dof = n - p - 1

#critical t-value for 95% CI
critical_value = t.ppf(0.975, dof)

feature_labels = [
    "MIN_YEARS_EXPERIENCE",
    "DURATION",
    "IS_INTERNSHIP",
    "COMPANY_IS_STAFFING",
    "EMPLOYMENT_TYPE_NAME_A",
    "EMPLOYMENT_TYPE_NAME_B",
    "REMOTE_TYPE_NAME_X",
    "REMOTE_TYPE_NAME_Y",
    "REMOTE_TYPE_NAME_Z"
]

#create df
coef_poly_df = pd.DataFrame({
    "Feature": ["Intercept"] + [f"PolyFeature_{i}" for i in range(p)],
    "Coefficient": [intercept] + coefficients.tolist(),
    "Std Error": std_errors,
    "t-value": t_values,
    "p-value": p_values
})

#add confidence intervals
coef_poly_df["95% CI Lower"] = coef_poly_df["Coefficient"] - critical_value * coef_poly_df["Std Error"]
coef_poly_df["95% CI Upper"] = coef_poly_df["Coefficient"] + critical_value * coef_poly_df["Std Error"]

print(coef_poly_df)

UnsupportedOperationException: No Std. Error of coefficients available for this LinearRegressionModel