In [57]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator
import pandas as pd
import plotly.express as px
import plotly.io as pio
from scipy.stats import t
pio.renderers.default = "notebook"

In [2]:
spark = SparkSession.builder.appName("LightcastData").getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/04/04 16:59:30 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [10]:
df = spark.read.option("header", "true").option("inferSchema", "true").option("multiLine","true").option("escape", "\"").csv("data/lightcast_job_postings.csv")

                                                                                

In [7]:
df.printSchema()

root
 |-- ID: string (nullable = true)
 |-- LAST_UPDATED_DATE: date (nullable = true)
 |-- LAST_UPDATED_TIMESTAMP: timestamp (nullable = true)
 |-- DUPLICATES: integer (nullable = true)
 |-- POSTED: date (nullable = true)
 |-- EXPIRED: date (nullable = true)
 |-- DURATION: integer (nullable = true)
 |-- SOURCE_TYPES: string (nullable = true)
 |-- SOURCES: string (nullable = true)
 |-- URL: string (nullable = true)
 |-- ACTIVE_URLS: string (nullable = true)
 |-- ACTIVE_SOURCES_INFO: string (nullable = true)
 |-- TITLE_RAW: string (nullable = true)
 |-- BODY: string (nullable = true)
 |-- MODELED_EXPIRED: date (nullable = true)
 |-- MODELED_DURATION: integer (nullable = true)
 |-- COMPANY: integer (nullable = true)
 |-- COMPANY_NAME: string (nullable = true)
 |-- COMPANY_RAW: string (nullable = true)
 |-- COMPANY_IS_STAFFING: boolean (nullable = true)
 |-- EDUCATION_LEVELS: string (nullable = true)
 |-- EDUCATION_LEVELS_NAME: string (nullable = true)
 |-- MIN_EDULEVELS: integer (nullable

In [8]:
df.show(5)

25/04/04 17:01:26 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
                                                                                

+--------------------+-----------------+----------------------+----------+----------+----------+--------+--------------------+--------------------+--------------------+-----------+-------------------+--------------------+--------------------+---------------+----------------+--------+--------------------+-----------+-------------------+----------------+---------------------+-------------+-------------------+-------------+------------------+---------------+--------------------+--------------------+--------------------+-------------+------+-----------+----------------+-------------------+---------+-----------+--------------------+--------------------+-------------+------+--------------+-----+--------------------+-----+----------+---------------+--------------------+---------------+--------------------+------------+--------------------+------------+--------------------+------+--------------------+------+--------------------+------+--------------------+------+--------------------+------+---

In [18]:
df_feature = df.select("SALARY", "MIN_YEARS_EXPERIENCE", "MIN_EDULEVELS", "EMPLOYMENT_TYPE_NAME")

In [19]:
df_feature = df_feature.dropna(subset=["SALARY", "MIN_YEARS_EXPERIENCE", "MIN_EDULEVELS", "EMPLOYMENT_TYPE_NAME"])

In [20]:
df_feature = df_feature.withColumn("MIN_YEARS_EXPERIENCE", df_feature["MIN_YEARS_EXPERIENCE"].cast("double"))
df_feature = df_feature.withColumn("MIN_EDULEVELS", df_feature["MIN_EDULEVELS"].cast("double"))

In [21]:
indexer = StringIndexer(inputCol="EMPLOYMENT_TYPE_NAME", outputCol="EMPLOYMENT_TYPE_INDEX")
df_feature = indexer.fit(df_feature).transform(df_feature)

                                                                                

In [22]:
encoder = OneHotEncoder(inputCol="EMPLOYMENT_TYPE_INDEX", outputCol="EMPLOYMENT_TYPE_VEC")
df_feature = encoder.fit(df_feature).transform(df_feature)

In [24]:
assembler = VectorAssembler(
    inputCols=["MIN_YEARS_EXPERIENCE", "MIN_EDULEVELS", "EMPLOYMENT_TYPE_VEC"],
    outputCol="features"
)
df_assemble = assembler.transform(df_feature)

In [29]:
df_assemble.show(5)

+------+--------------------+-------------+--------------------+---------------------+-------------------+------------------+
|SALARY|MIN_YEARS_EXPERIENCE|MIN_EDULEVELS|EMPLOYMENT_TYPE_NAME|EMPLOYMENT_TYPE_INDEX|EMPLOYMENT_TYPE_VEC|          features|
+------+--------------------+-------------+--------------------+---------------------+-------------------+------------------+
| 92962|                 2.0|          2.0|Full-time (> 32 h...|                  0.0|      (2,[0],[1.0])| [2.0,2.0,1.0,0.0]|
|107645|                10.0|          0.0|Full-time (> 32 h...|                  0.0|      (2,[0],[1.0])|[10.0,0.0,1.0,0.0]|
|192800|                 6.0|          2.0|Full-time (> 32 h...|                  0.0|      (2,[0],[1.0])| [6.0,2.0,1.0,0.0]|
|125900|                12.0|          1.0|Full-time (> 32 h...|                  0.0|      (2,[0],[1.0])|[12.0,1.0,1.0,0.0]|
|170000|                 6.0|         99.0|Full-time (> 32 h...|                  0.0|      (2,[0],[1.0])|[6.0,99.0,1.

In [25]:
train_df, test_df = df_assemble.randomSplit([0.8, 0.2], seed=42)

In [28]:
lr = LinearRegression(featuresCol="features", labelCol="SALARY")
model = lr.fit(train_df)

25/04/04 17:32:12 WARN Instrumentation: [30844730] regParam is zero, which might cause numerical instability and overfitting.
                                                                                

In [31]:
predictions = model.transform(test_df)

In [33]:
evaluator_r2 = RegressionEvaluator(labelCol="SALARY", predictionCol="prediction", metricName="r2")
r2 = evaluator_r2.evaluate(predictions)

                                                                                

In [34]:
evaluator_rmse = RegressionEvaluator(labelCol="SALARY", predictionCol="prediction", metricName="rmse")
rmse = evaluator_rmse.evaluate(predictions)

                                                                                

In [35]:
evaluator_mae = RegressionEvaluator(labelCol="SALARY", predictionCol="prediction", metricName="mae")
mae = evaluator_mae.evaluate(predictions)

                                                                                

In [36]:
print(f"R²: {r2:.4f}, RMSE: {rmse:.2f}, MAE: {mae:.2f}")

R²: 0.2700, RMSE: 37936.17, MAE: 29024.54


In [58]:
summary = model.summary
coefficients = [model.intercept] + list(model.coefficients.toArray())
intercept = model.intercept
std_errors = summary.coefficientStandardErrors
t_values = [coef / se for coef, se in zip(coefficients, std_errors)]

In [59]:
n = train_df.count()
k = len(model.coefficients)  
degree_freedom = n - k - 1

                                                                                

In [63]:
p_values = [2 * (1 - t.cdf(abs(t_val), degree_freedom)) for t_val in t_values]

In [64]:
num_oh_features = len(model.coefficients) - 2 
feature_names = ["Intercept", "MIN_YEARS_EXPERIENCE", "MIN_EDULEVELS"] + \
    [f"EMPLOYMENT_TYPE_{i}" for i in range(num_oh_features)]

In [65]:
coef_df = pd.DataFrame({
    "Feature": feature_names,
    "Coefficient": coefficients,
    "Std Error": std_errors,
    "t Value": t_values,
    "p Value": p_values
})

In [66]:
coef_df["CI Lower"] = coef_df["Coefficient"] - 1.96 * coef_df["Std Error"]
coef_df["CI Upper"] = coef_df["Coefficient"] + 1.96 * coef_df["Std Error"]

In [67]:
coef_df

Unnamed: 0,Feature,Coefficient,Std Error,t Value,p Value,CI Lower,CI Upper
0,Intercept,83023.309584,80.759288,1028.034198,0.0,82865.021381,83181.597788
1,MIN_YEARS_EXPERIENCE,6577.499356,7.766182,846.941218,0.0,6562.27764,6592.721072
2,MIN_EDULEVELS,90.316192,2033.491187,0.044414,0.964575,-3895.326536,4075.958919
3,EMPLOYMENT_TYPE_0,1314.666485,2598.516714,0.50593,0.612912,-3778.426275,6407.759245
4,EMPLOYMENT_TYPE_1,-6887.181247,2043.371276,-3.370499,0.000752,-10892.188948,-2882.173546


In [68]:
print(f"R²: {r2:.4f}, RMSE: {rmse:.2f}, MAE: {mae:.2f}")

R²: 0.2700, RMSE: 37936.17, MAE: 29024.54


The linear regression model reveals that years of experience and certain employment types significantly influence salary, with each additional year of experience increasing salary by $6,577.50 and part-time employment reducing it by $6,887.18, while education level has no significant effect. However, the model’s R² of 0.2700 and RMSE of $37,936.17 indicate it captures only 27% of the salary variance with substantial prediction errors, underscoring the need for additional features like location or industry to improve performance.