In [1]:
from pyspark.sql import SparkSession
import pandas as pd
import plotly.express as px
import plotly.io as pio
import numpy as np

# Fix NumPy's global RNG seed so NumPy-based randomness is repeatable.
np.random.seed(42)

# Plotly renderer chain: classic notebook, connected notebook, VS Code.
pio.renderers.default = "notebook+notebook_connected+vscode"

# Start (or reuse) the Spark session used by every DataFrame in this notebook.
spark = SparkSession.builder.appName("LightcastData").getOrCreate()

# Read the job-postings CSV: the first row is the header, column types are
# inferred, records may span several lines, and the escape character is the
# double quote.
df = spark.read.csv(
    "data/lightcast_job_postings.csv",
    header=True,
    inferSchema=True,
    multiLine=True,
    escape='"',
)

# Diagnostic peek at the raw data (not meant for the final rendered document).
print("---This is Diagnostic check, No need to print it in the final doc---")

# df.printSchema()  # enable while exploring; keep commented out when rendering the submission
df.show(5)
print(df.count())
# Optional: list every column name as a pandas table.
# pd.set_option("display.max_rows", None)
# pd.DataFrame(df.columns, columns=["Column Names"])


Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/10/05 14:50:24 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
25/10/05 14:50:25 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
25/10/05 14:50:25 WARN Utils: Service 'SparkUI' could not bind on port 4041. Attempting port 4042.
                                                                                

---This is Diagnostic check, No need to print it in the final doc---


25/10/05 14:50:39 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
                                                                                

+--------------------+-----------------+----------------------+----------+--------+---------+--------+--------------------+--------------------+--------------------+-----------+-------------------+--------------------+--------------------+---------------+----------------+--------+--------------------+-----------+-------------------+----------------+---------------------+-------------+-------------------+-------------+------------------+---------------+--------------------+--------------------+--------------------+-------------+------+-----------+----------------+-------------------+---------+-----------+--------------------+--------------------+-------------+------+--------------+-----+--------------------+-----+----------+---------------+--------------------+---------------+--------------------+------------+--------------------+------------+--------------------+------+--------------------+------+--------------------+------+--------------------+------+--------------------+------+------

[Stage 3:>                                                          (0 + 1) / 1]

72498


                                                                                

# Feature Engineering

Feature Engineering is a crucial step in preparing your data for machine learning. In this lab, we will focus on the following tasks:

1. Drop rows with missing values in the target variable and key features.
2. By now you are already familiar with the code and the data. Based on your understanding, choose your variables as follows (my code output uses 10 variables):
   1. Three continuous variables plus `MIN_YEARS_EXPERIENCE` (4 in total; use your best judgment!).
   2. Two categorical variables.
   3. Your dependent variable (y) is `SALARY`.

3. Convert categorical variables into numerical representations using StringIndexer and OneHotEncoder.
4. Assemble features into a single vector using VectorAssembler.
5. Split the data into training and testing sets.
6. You can use a `Pipeline` to perform the above steps in one go.
7. Create a new column `MIN_YEARS_EXPERIENCE_SQ` by squaring the `MIN_YEARS_EXPERIENCE` column.
8. Assemble the polynomial features into a new vector column `features_poly` using VectorAssembler.
9. Show the final structure of the DataFrame with the new features.

In [2]:
#| eval: true
#| echo: false
#| fig-align: center

from pyspark.sql.functions import col, pow
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml import Pipeline

# Columns carried into the exploratory frame, grouped by role.
eda_cols = (
    ["SALARY"]  # regression label
    + ["MIN_YEARS_EXPERIENCE", "MAX_YEARS_EXPERIENCE", "DURATION"]  # numeric
    + ["COMPANY_IS_STAFFING", "IS_INTERNSHIP"]  # true/false flags
    + ["STATE_NAME", "REMOTE_TYPE_NAME", "EMPLOYMENT_TYPE_NAME"]  # categorical
    + ["MIN_EDULEVELS_NAME", "MAX_EDULEVELS_NAME"]  # education-level labels
)

# Narrow the raw frame to those columns and preview the first rows untruncated.
df_eda = df.select(*eda_cols)
df_eda.show(n=5, truncate=False)


+------+--------------------+--------------------+--------+-------------------+-------------+----------+----------------+----------------------+-------------------+------------------+
|SALARY|MIN_YEARS_EXPERIENCE|MAX_YEARS_EXPERIENCE|DURATION|COMPANY_IS_STAFFING|IS_INTERNSHIP|STATE_NAME|REMOTE_TYPE_NAME|EMPLOYMENT_TYPE_NAME  |MIN_EDULEVELS_NAME |MAX_EDULEVELS_NAME|
+------+--------------------+--------------------+--------+-------------------+-------------+----------+----------------+----------------------+-------------------+------------------+
|NULL  |2                   |2                   |6       |false              |false        |Arkansas  |[None]          |Full-time (> 32 hours)|Bachelor's degree  |NULL              |
|NULL  |3                   |3                   |NULL    |true               |false        |Maine     |Remote          |Full-time (> 32 hours)|No Education Listed|NULL              |
|NULL  |5                   |NULL                |35      |false              |f

In [3]:
from pyspark.sql.functions import col, sum as spark_sum, when, trim, length
import hvplot.pandas  # enables hvplot on pandas

def _is_blank(name):
    """Return a Column that is true when `name` is NULL or trims to an empty string."""
    field = col(name)
    return field.isNull() | (length(trim(field)) == 0)


# One aggregate per column: add 1 for every blank cell, 0 otherwise.
blank_counts = [
    spark_sum(when(_is_blank(name), 1).otherwise(0)).alias(name)
    for name in df_eda.columns
]
missing_df = df_eda.select(blank_counts)

# Bring the single-row result to pandas and transpose it so that each source
# column becomes one row, then name the two resulting columns.
missing_pd = missing_df.toPandas().T.reset_index()
missing_pd.columns = ["column", "missing_count"]

# Express the counts as a percentage of all rows.
total_rows = df_eda.count()
missing_pd["missing_pct"] = 100 * missing_pd["missing_count"] / total_rows

# Bar chart ordered from the most to the least incomplete column.
# The plot must remain the last expression of the cell so that it renders.
missing_ranked = missing_pd.sort_values("missing_pct", ascending=False)
missing_ranked.hvplot.bar(
    x="column",
    y="missing_pct",
    rot=90,
    title="Percentage of Missing Values by Column",
    height=600,
    width=900,
    ylabel="Missing Percentage (%)",
    xlabel="Features",
).opts(xrotation=45)


                                                                                

In [4]:
# Normalise REMOTE_TYPE_NAME to four labels:
#   "Remote" -> "Remote", "[None]" -> "Undefined", "Not Remote" -> "On-Premise",
#   "Hybrid Remote" -> "Hybrid", NULL -> "On-Premise".
# Any other value is passed through unchanged.
remote_field = col("REMOTE_TYPE_NAME")
remote_labels = {
    "Remote": "Remote",
    "[None]": "Undefined",
    "Not Remote": "On-Premise",
    "Hybrid Remote": "Hybrid",
}

# The branches are mutually exclusive, so the chain can be built in any order.
remote_expr = when(remote_field.isNull(), "On-Premise")
for raw_value, label in remote_labels.items():
    remote_expr = remote_expr.when(remote_field == raw_value, label)

df_eda = df_eda.withColumn("REMOTE_TYPE_NAME", remote_expr.otherwise(remote_field))

# Sanity check: list the distinct labels left after recoding.
categorical_cols = ["REMOTE_TYPE_NAME"]

for colname in categorical_cols:
    print(f"\n---- {colname} ----")
    distinct_values = df_eda.select(colname).distinct()
    distinct_values.show(10, truncate=False)



---- REMOTE_TYPE_NAME ----


[Stage 13:>                                                         (0 + 1) / 1]

+----------------+
|REMOTE_TYPE_NAME|
+----------------+
|Remote          |
|On-Premise      |
|Hybrid          |
|Undefined       |
+----------------+



                                                                                

In [5]:
# Collapse EMPLOYMENT_TYPE_NAME to three short labels.
#
# Raw values observed before recoding:
#   "Part-time / full-time"    -> "Flexible"
#   "Part-time (<= 32 hours)"  -> "Parttime"   (the less-than-or-equal sign
#                                               shows up mis-encoded in the data)
#   "Full-time (> 32 hours)"   -> "Fulltime"
#   NULL                       -> "Fulltime"
# Any other value is passed through unchanged.
employment = col("EMPLOYMENT_TYPE_NAME")

df_eda = df_eda.withColumn(
    "EMPLOYMENT_TYPE_NAME",
    when(employment == "Part-time / full-time", "Flexible")
    # Match the part-time label on its ASCII prefix instead of the full text:
    # the full label contains a non-ASCII sign whose byte representation depends
    # on how the CSV (and this notebook file) were encoded, so an exact
    # comparison silently stops matching if either is re-encoded.
    # "Part-time / full-time" does not start with "Part-time (", so the two
    # part-time-like labels cannot be confused.
    .when(employment.startswith("Part-time ("), "Parttime")
    .when(employment == "Full-time (> 32 hours)", "Fulltime")
    .when(employment.isNull(), "Fulltime")
    .otherwise(employment)
)

# Sanity check: list the distinct labels left after recoding.
categorical_cols = [
    "EMPLOYMENT_TYPE_NAME"
]

for colname in categorical_cols:
    print(f"\n---- {colname} ----")
    df_eda.select(colname).distinct().show(10, truncate=False)



---- EMPLOYMENT_TYPE_NAME ----


[Stage 16:>                                                         (0 + 1) / 1]

+--------------------+
|EMPLOYMENT_TYPE_NAME|
+--------------------+
|Flexible            |
|Fulltime            |
|Parttime            |
+--------------------+



                                                                                

In [6]:
# Treat a missing COMPANY_IS_STAFFING or IS_INTERNSHIP flag as False;
# values that are present are kept as they are.
categorical_cols = ["COMPANY_IS_STAFFING", "IS_INTERNSHIP"]

for flag_name in categorical_cols:
    flag = col(flag_name)
    df_eda = df_eda.withColumn(
        flag_name,
        when(flag.isNull(), False).otherwise(flag),
    )

# Sanity check: list the distinct values left in each flag column.
for colname in categorical_cols:
    print(f"\n---- {colname} ----")
    distinct_values = df_eda.select(colname).distinct()
    distinct_values.show(10, truncate=False)



---- COMPANY_IS_STAFFING ----


                                                                                

+-------------------+
|COMPANY_IS_STAFFING|
+-------------------+
|true               |
|false              |
+-------------------+


---- IS_INTERNSHIP ----


[Stage 22:>                                                         (0 + 1) / 1]

+-------------+
|IS_INTERNSHIP|
+-------------+
|true         |
|false        |
+-------------+



                                                                                

In [7]:
import pandas as pd

# Pull a seeded ~1% sample of the Spark frame into pandas
# (the full frame has 72498 rows; this sample came back with 790).
df_sample = df_eda.sample(fraction=0.01, seed=42).toPandas()

# Same shape as the sample: True where a cell is missing, False otherwise.
missing_mask = df_sample.isnull()

# Long form with three columns (index, column, is_missing);
# the flag is stored as 0/1 instead of a boolean.
missing_long = (
    missing_mask.reset_index()
    .melt(id_vars="index", var_name="column", value_name="is_missing")
    .assign(is_missing=lambda frame: frame["is_missing"].astype(int))
)

print(missing_long)

# Heatmap: one row per sampled record, one column per feature.
# The plot must remain the last expression of the cell so that it renders.
missing_long.hvplot.heatmap(
    x="column",
    y="index",
    C="is_missing",
    cmap="Reds",
    colorbar=False,
    width=900,
    height=700,
    title="Heatmap of Missing Values (Sample)",
).opts(xrotation=45)


                                                                                

      index              column  is_missing
0         0              SALARY           1
1         1              SALARY           0
2         2              SALARY           1
3         3              SALARY           1
4         4              SALARY           1
...     ...                 ...         ...
8685    785  MAX_EDULEVELS_NAME           0
8686    786  MAX_EDULEVELS_NAME           0
8687    787  MAX_EDULEVELS_NAME           1
8688    788  MAX_EDULEVELS_NAME           1
8689    789  MAX_EDULEVELS_NAME           1

[8690 rows x 3 columns]


In [8]:
from pyspark.sql.functions import countDistinct

# One countDistinct aggregate per column, each aliased "<column>_nunique",
# evaluated together and printed as a single untruncated row.
nunique_exprs = []
for name in df_eda.columns:
    nunique_exprs.append(countDistinct(name).alias(name + "_nunique"))

df_eda.select(nunique_exprs).show(truncate=False)


[Stage 26:>                                                         (0 + 1) / 1]

+--------------+----------------------------+----------------------------+----------------+---------------------------+---------------------+------------------+------------------------+----------------------------+--------------------------+--------------------------+
|SALARY_nunique|MIN_YEARS_EXPERIENCE_nunique|MAX_YEARS_EXPERIENCE_nunique|DURATION_nunique|COMPANY_IS_STAFFING_nunique|IS_INTERNSHIP_nunique|STATE_NAME_nunique|REMOTE_TYPE_NAME_nunique|EMPLOYMENT_TYPE_NAME_nunique|MIN_EDULEVELS_NAME_nunique|MAX_EDULEVELS_NAME_nunique|
+--------------+----------------------------+----------------------------+----------------+---------------------------+---------------------+------------------+------------------------+----------------------------+--------------------------+--------------------------+
|6052          |16                          |15                          |60              |2                          |2                    |51                |4                       |3       

                                                                                

In [9]:
# Columns whose distinct values are listed below (at most 10 rows shown each).
categorical_cols = [
    "STATE_NAME",
    "REMOTE_TYPE_NAME",
    "EMPLOYMENT_TYPE_NAME",
    "MIN_EDULEVELS_NAME",
    "COMPANY_IS_STAFFING",
    "IS_INTERNSHIP",
]

for colname in categorical_cols:
    print(f"\n---- {colname} ----")
    distinct_values = df_eda.select(colname).distinct()
    distinct_values.show(10, truncate=False)



---- STATE_NAME ----


                                                                                

+------------+
|STATE_NAME  |
+------------+
|Utah        |
|Hawaii      |
|Minnesota   |
|Ohio        |
|Arkansas    |
|Oregon      |
|Texas       |
|North Dakota|
|Pennsylvania|
|Connecticut |
+------------+
only showing top 10 rows

---- REMOTE_TYPE_NAME ----


                                                                                

+----------------+
|REMOTE_TYPE_NAME|
+----------------+
|Remote          |
|On-Premise      |
|Hybrid          |
|Undefined       |
+----------------+


---- EMPLOYMENT_TYPE_NAME ----


                                                                                

+--------------------+
|EMPLOYMENT_TYPE_NAME|
+--------------------+
|Flexible            |
|Fulltime            |
|Parttime            |
+--------------------+


---- MIN_EDULEVELS_NAME ----


                                                                                

+----------------------------+
|MIN_EDULEVELS_NAME          |
+----------------------------+
|Bachelor's degree           |
|Ph.D. or professional degree|
|High school or GED          |
|Master's degree             |
|No Education Listed         |
|Associate degree            |
|NULL                        |
+----------------------------+


---- COMPANY_IS_STAFFING ----


                                                                                

+-------------------+
|COMPANY_IS_STAFFING|
+-------------------+
|true               |
|false              |
+-------------------+


---- IS_INTERNSHIP ----


[Stage 47:>                                                         (0 + 1) / 1]

+-------------+
|IS_INTERNSHIP|
+-------------+
|true         |
|false        |
+-------------+



                                                                                

In [10]:
# Approximate median of DURATION: the 0.5 quantile with a relative error of 0.01.
# approxQuantile returns one value per requested probability, hence the [0].
duration_quantiles = df_eda.approxQuantile("DURATION", [0.5], 0.01)
median_duration = duration_quantiles[0]

# Keep DURATION where it is present and fall back to the computed median
# where it is NULL. The fill value comes from the data, not from a fixed
# assumption.
duration = col("DURATION")
df_eda = df_eda.withColumn(
    "DURATION",
    when(duration.isNotNull(), duration).otherwise(median_duration),
)


                                                                                

In [11]:
import pandas as pd

# Pull a seeded ~10% sample of the cleaned Spark frame into pandas.
df_sample = df_eda.sample(fraction=0.10, seed=42).toPandas()

# Same shape as the sample: True where a cell is missing, False otherwise.
missing_mask = df_sample.isnull()

# Long form (index, column, is_missing) with the flag stored as 0/1.
missing_long = (
    missing_mask.reset_index()
    .melt(id_vars="index", var_name="column", value_name="is_missing")
    .assign(is_missing=lambda frame: frame["is_missing"].astype(int))
)

# Heatmap: one row per sampled record, one column per feature.
# The plot must remain the last expression of the cell so that it renders.
missing_long.hvplot.heatmap(
    x="column",
    y="index",
    C="is_missing",
    cmap="Reds",
    colorbar=False,
    width=900,
    height=700,
    title="Heatmap of Missing Values (Sample)",
).opts(xrotation=45)


                                                                                

In [12]:
# Preview the first five rows of the cleaned frame without truncating cell text.
df_eda.show(n=5, truncate=False)

+------+--------------------+--------------------+--------+-------------------+-------------+----------+----------------+--------------------+-------------------+------------------+
|SALARY|MIN_YEARS_EXPERIENCE|MAX_YEARS_EXPERIENCE|DURATION|COMPANY_IS_STAFFING|IS_INTERNSHIP|STATE_NAME|REMOTE_TYPE_NAME|EMPLOYMENT_TYPE_NAME|MIN_EDULEVELS_NAME |MAX_EDULEVELS_NAME|
+------+--------------------+--------------------+--------+-------------------+-------------+----------+----------------+--------------------+-------------------+------------------+
|NULL  |2                   |2                   |6.0     |false              |false        |Arkansas  |Undefined       |Fulltime            |Bachelor's degree  |NULL              |
|NULL  |3                   |3                   |18.0    |true               |false        |Maine     |Remote          |Fulltime            |No Education Listed|NULL              |
|NULL  |5                   |NULL                |35.0    |false              |false      

In [20]:
# Keep only rows where the label and every listed column is non-null.
# NOTE(review): MAX_YEARS_EXPERIENCE is required here, but the assemblers later
# in this notebook do not take it as an input -- confirm that dropping rows on
# it is intended.
required_cols = [
    "SALARY", "MIN_YEARS_EXPERIENCE", "MAX_YEARS_EXPERIENCE", "STATE_NAME",
    "EMPLOYMENT_TYPE_NAME", "REMOTE_TYPE_NAME", "MIN_EDULEVELS_NAME",
    "DURATION", "IS_INTERNSHIP", "COMPANY_IS_STAFFING",
]
df_feature_engg = df_eda.dropna(subset=required_cols)

# String-valued columns to index and one-hot encode.
categorical_cols = ["STATE_NAME", "MIN_EDULEVELS_NAME", "EMPLOYMENT_TYPE_NAME", "REMOTE_TYPE_NAME"]

# Per categorical column: a StringIndexer writing "<name>_idx" and a
# OneHotEncoder turning that index into "<name>_vec".
indexers = [
    StringIndexer(inputCol=cat_name, outputCol=f"{cat_name}_idx", handleInvalid='skip')
    for cat_name in categorical_cols
]
encoders = [
    OneHotEncoder(inputCol=f"{cat_name}_idx", outputCol=f"{cat_name}_vec")
    for cat_name in categorical_cols
]

# Preview 1: indexers only, showing two of the label -> index mappings.
pipeline = Pipeline(stages=indexers)
indexer_model = pipeline.fit(df_feature_engg)
indexed_df = indexer_model.transform(df_feature_engg)
indexed_df.select(
    "EMPLOYMENT_TYPE_NAME", "EMPLOYMENT_TYPE_NAME_idx",
    "REMOTE_TYPE_NAME", "REMOTE_TYPE_NAME_idx",
).show()

# Preview 2: indexers followed by encoders.
pipeline = Pipeline(stages=indexers + encoders)
encoder_model = pipeline.fit(df_feature_engg)
encoded_df = encoder_model.transform(df_feature_engg)
encoded_df.show()



                                                                                

+--------------------+------------------------+----------------+--------------------+
|EMPLOYMENT_TYPE_NAME|EMPLOYMENT_TYPE_NAME_idx|REMOTE_TYPE_NAME|REMOTE_TYPE_NAME_idx|
+--------------------+------------------------+----------------+--------------------+
|            Fulltime|                     0.0|       Undefined|                 0.0|
|            Fulltime|                     0.0|       Undefined|                 0.0|
|            Fulltime|                     0.0|          Remote|                 1.0|
|            Fulltime|                     0.0|       Undefined|                 0.0|
|            Fulltime|                     0.0|          Remote|                 1.0|
|            Fulltime|                     0.0|       Undefined|                 0.0|
|            Fulltime|                     0.0|          Remote|                 1.0|
|            Fulltime|                     0.0|          Remote|                 1.0|
|            Fulltime|                     0.0|       

                                                                                

+------+--------------------+--------------------+--------+-------------------+-------------+--------------+----------------+--------------------+------------------+--------------------+--------------+----------------------+------------------------+--------------------+---------------+----------------------+------------------------+--------------------+
|SALARY|MIN_YEARS_EXPERIENCE|MAX_YEARS_EXPERIENCE|DURATION|COMPANY_IS_STAFFING|IS_INTERNSHIP|    STATE_NAME|REMOTE_TYPE_NAME|EMPLOYMENT_TYPE_NAME|MIN_EDULEVELS_NAME|  MAX_EDULEVELS_NAME|STATE_NAME_idx|MIN_EDULEVELS_NAME_idx|EMPLOYMENT_TYPE_NAME_idx|REMOTE_TYPE_NAME_idx| STATE_NAME_vec|MIN_EDULEVELS_NAME_vec|EMPLOYMENT_TYPE_NAME_vec|REMOTE_TYPE_NAME_vec|
+------+--------------------+--------------------+--------+-------------------+-------------+--------------+----------------+--------------------+------------------+--------------------+--------------+----------------------+------------------------+--------------------+---------------+--

In [23]:


# Inputs shared by both feature vectors: the two boolean flags and the
# one-hot vectors produced for each categorical column.
flag_cols = ["IS_INTERNSHIP", "COMPANY_IS_STAFFING"]
encoded_vec_cols = [f"{cat_name}_vec" for cat_name in categorical_cols]

# Base feature vector ("features").
base_inputs = ["MIN_YEARS_EXPERIENCE", "DURATION"] + flag_cols + encoded_vec_cols
assembler = VectorAssembler(inputCols=base_inputs, outputCol="features")

# Index, encode and assemble in a single pipeline, fitted on and applied to
# the null-free frame.
pipeline = Pipeline(stages=indexers + encoders + [assembler])
fitted_pipeline = pipeline.fit(df_feature_engg)
data = fitted_pipeline.transform(df_feature_engg)

data.show(5, truncate=False)

# Squared experience term for the polynomial variant.
data = data.withColumn("MIN_YEARS_EXPERIENCE_SQ", pow(col("MIN_YEARS_EXPERIENCE"), 2))

# Polynomial feature vector ("features_poly"): the base inputs plus the squared term.
poly_inputs = (
    ["MIN_YEARS_EXPERIENCE", "MIN_YEARS_EXPERIENCE_SQ", "DURATION"]
    + flag_cols
    + encoded_vec_cols
)
assembler_poly = VectorAssembler(inputCols=poly_inputs, outputCol="features_poly")
data = assembler_poly.transform(data)

# Final structure: the label next to both assembled vectors.
data.select("SALARY", "features", "features_poly").show(5, truncate=False)







                                                                                

+------+--------------------+--------------------+--------+-------------------+-------------+-----------+----------------+--------------------+------------------+----------------------------+--------------+----------------------+------------------------+--------------------+---------------+----------------------+------------------------+--------------------+-------------------------------------------------------+
|SALARY|MIN_YEARS_EXPERIENCE|MAX_YEARS_EXPERIENCE|DURATION|COMPANY_IS_STAFFING|IS_INTERNSHIP|STATE_NAME |REMOTE_TYPE_NAME|EMPLOYMENT_TYPE_NAME|MIN_EDULEVELS_NAME|MAX_EDULEVELS_NAME          |STATE_NAME_idx|MIN_EDULEVELS_NAME_idx|EMPLOYMENT_TYPE_NAME_idx|REMOTE_TYPE_NAME_idx|STATE_NAME_vec |MIN_EDULEVELS_NAME_vec|EMPLOYMENT_TYPE_NAME_vec|REMOTE_TYPE_NAME_vec|features                                               |
+------+--------------------+--------------------+--------+-------------------+-------------+-----------+----------------+--------------------+------------------+----

In [24]:
#| eval: true
#| echo: false
#| fig-align: center

# Seeded 80/20 random split into training and test frames.
regression_train, regression_test = data.randomSplit([0.8, 0.2], seed=42)

# Report (row count, column count) for the full, training and test frames.
for frame in (data, regression_train, regression_test):
    print((frame.count(), len(frame.columns)))


                                                                                

(3756, 22)


                                                                                

(3060, 22)


[Stage 407:>                                                        (0 + 1) / 1]

(696, 22)


                                                                                

In [None]:
from pyspark.ml.regression import LinearRegression

# Linear regression of SALARY on the base feature vector.
# To fit the polynomial variant instead, swap in the commented line below,
# which reads the "features_poly" vector.
mr = LinearRegression(featuresCol="features", labelCol="SALARY")
# mr = LinearRegression(featuresCol="features_poly", labelCol="SALARY")

# Fit on the training split, then score the held-out test split.
mr_model = mr.fit(regression_train)
test_results = mr_model.evaluate(regression_test)

# Report the test-set error and goodness of fit.
for label, value in (
    ("RMSE:", test_results.rootMeanSquaredError),
    ("R2:", test_results.r2),
):
    print(label, value)

25/10/05 16:06:07 WARN Instrumentation: [2af9de3e] regParam is zero, which might cause numerical instability and overfitting.
[Stage 418:>                                                        (0 + 1) / 1]

RMSE: 29114.307979774672
R2: 0.34792446541830324


                                                                                

In [None]:
# Pull the fitted intercept and coefficient vector off the model and print them.
intercept, coeffs = mr_model.intercept, mr_model.coefficients

print("Intercept:", intercept)
print("Coefficients:", coeffs)


Intercept: 131328.3004796009
Coefficients: [8523.062910159928,-98.93546907813675,2782.413073078266,-1078.1337256635686,12347.650329505026,12099.991428639481,5508.201281626465,4103.394476179377,6827.789398867635,1859.9349418510367,7751.353854768509,9315.672167780182,-755.6703653785958,9973.18649877864,1314.7076955620487,-1299.0660250961937,2093.9361992914087,7193.052647920884,-5124.810223960465,4945.904023459982,-293.75547075283794,434.7029433494427,4697.278843397262,7021.701277240336,7659.209522281594,-11920.218306321509,-2098.935017913266,28.958169601522584,10422.439688057706,-1534.9827239260164,3298.1875476982027,2561.1538943029723,8352.834248147985,-3844.4745256701326,-1105.821356233869,1840.7175686222201,487.1782311291216,5803.378236307139,139.85835159896615,2939.1994362592077,-3041.9102995939397,10591.372903022557,-577.2862695106961,-3619.269866311597,5432.8888205860085,311.95208019053894,3281.416667399771,-520.5179852949578,3209.602981992066,-3999.8839871360237,-3502.285240485341