1 Load the Dataset

    Load the Raw Dataset:
        Use Pyspark to the lightcast_data.csv file into a DataFrame:
        You can reuse the previous code.
        Copying code from your friend constitutes plagiarism. DO NOT DO THIS.


In [35]:
from pyspark.sql import SparkSession
import pandas as pd
import plotly.express as px
import plotly.io as pio
import numpy as np

np.random.seed(42)

pio.renderers.default = "notebook+notebook_connected+vscode"

# Initialize Spark Session
spark = SparkSession.builder.appName("LightcastData").getOrCreate()

# Load Data
df = spark.read.option("header", "true").option("inferSchema", "true").option("multiLine","true").option("escape", "\"").csv("data/lightcast_job_postings.csv")

# Show Schema and Sample Data
# print("---This is Diagnostic check, No need to print it in the final doc---")

# df.printSchema() # comment this line when rendering the submission
# df.show(5)

                                                                                


2 Feature Engineering

Feature Engineering is a crucial step in preparing your data for machine learning. In this lab, we will focus on the following tasks:

    Drop rows with missing values in the target variable and key features.
    By now you are already familiar with the code and the data. Based on your understanding please choose any 3 (my code output has 10) variables as:
        three continuous variables and, MIN_YEARS_EXPERIENCE (total 4, use your best judgment!)
        two categorical .
        Your dependent variable (y) is SALARY.
    Convert categorical variables into numerical representations using StringIndexer and OneHotEncoder.
    Assemble features into a single vector using VectorAssembler.
    Split the data into training and testing sets.
    You can use pipeline to do the above steps in one go.
    Create a new column MIN_YEARS_EXPERIENCE_SQ by squaring the MIN_YEARS_EXPERIENCE column.
    Assemble the polynomial features into a new vector column features_poly using VectorAssembler.
    Show the final structure of the Data`Frame with the new features.



In [36]:
#| eval: true
#| echo: fa;se
#| fig-align: center

from pyspark.sql.functions import col, pow
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml import Pipeline

eda_cols = [
    "Salary",
    "MIN_YEARS_EXPERIENCE", "DURATION",
    "COMPANY_IS_STAFFING", "IS_INTERNSHIP",
    "STATE_NAME", "REMOTE_TYPE_NAME", "EMPLOYMENT_TYPE_NAME",
    "MIN_EDULEVELS_NAME"
]

df = df.select(eda_cols)
df.show(5, truncate=False)

+------+--------------------+--------+-------------------+-------------+----------+----------------+----------------------+-------------------+
|Salary|MIN_YEARS_EXPERIENCE|DURATION|COMPANY_IS_STAFFING|IS_INTERNSHIP|STATE_NAME|REMOTE_TYPE_NAME|EMPLOYMENT_TYPE_NAME  |MIN_EDULEVELS_NAME |
+------+--------------------+--------+-------------------+-------------+----------+----------------+----------------------+-------------------+
|NULL  |2                   |6       |false              |false        |Arkansas  |[None]          |Full-time (> 32 hours)|Bachelor's degree  |
|NULL  |3                   |NULL    |true               |false        |Maine     |Remote          |Full-time (> 32 hours)|No Education Listed|
|NULL  |5                   |35      |false              |false        |Texas     |[None]          |Full-time (> 32 hours)|Bachelor's degree  |
|NULL  |3                   |48      |false              |false        |Arizona   |[None]          |Full-time (> 32 hours)|No Education 

In [37]:
from pyspark.sql.functions import col, sum as spark_sum, when, trim, length
import hvplot.pandas

missing_df = df.select([
    spark_sum(
        when(col(c).isNull() | (length(trim(col(c))) == 0), 1
        ).otherwise(0)).alias(c)
        for c in df.columns
])

missing_pd = missing_df.toPandas().T.reset_index()
missing_pd.columns = ["column", "missing_count"]

missing_pd = missing_df.toPandas().T.reset_index()
missing_pd.columns = ["column", "missing_count"]
total_rows = df.count()
missing_pd["missing_pct"] = 100 * missing_pd["missing_count"] / total_rows

missing_pd.sort_values("missing_pct", ascending=False).hvplot.bar(
    x="column", y="missing_pct", rot=90,
    title="Percentage of Missing Values by Column",
    height=600, width=900,
    ylabel="Missing Percentage (%)", xlabel="Features"
).opts(xrotation=45)

                                                                                

In [38]:
from pyspark.sql.functions import col, when

#Calculate Median

median_duration = df.approxQuantile("DURATION",[0.5], 0.01)[0]

# check for missing values in duration column and replace null with median

df = df.withColumn(
    "DURATION",
    when(col("DURATION").isNull(), median_duration)
    .otherwise(col("DURATION"))
)

                                                                                

In [39]:
import pandas as pd

df_sample = df.sample(fraction=0.01, seed=42).toPandas()

missing_mask = df_sample.isnull()

missing_long = (
    missing_mask.reset_index()
    .melt(id_vars="index", var_name="column", value_name="is_missing")
)

missing_long["is_missing"] = missing_long["is_missing"].astype(int)

missing_long.hvplot.heatmap(
    x="column", y="index", C="is_missing",
    cmap="Reds", colorbar=False,
    width=900, height=800,
    title="Heatmap of Missing Values (Sample)"
).opts(xrotation=45)

                                                                                

In [40]:
import hvplot.pandas

df_pd = df.sample(fraction=1.00, seed=42).toPandas()

df_pd["EMPLOYMENT_TYPE_NAME"].value_counts().hvplot.bar(
    title="Employment Type Frequency", ylabel="Count", xlabel="Employment Type"
)

                                                                                

In [41]:
from pyspark.sql.functions import countDistinct

df.select([
    countDistinct(c).alias(c+"_nunique")
    for c in df.columns
]).show(truncate=False)

[Stage 149:>                                                        (0 + 1) / 1]

+--------------+----------------------------+----------------+---------------------------+---------------------+------------------+------------------------+----------------------------+--------------------------+
|Salary_nunique|MIN_YEARS_EXPERIENCE_nunique|DURATION_nunique|COMPANY_IS_STAFFING_nunique|IS_INTERNSHIP_nunique|STATE_NAME_nunique|REMOTE_TYPE_NAME_nunique|EMPLOYMENT_TYPE_NAME_nunique|MIN_EDULEVELS_NAME_nunique|
+--------------+----------------------------+----------------+---------------------------+---------------------+------------------+------------------------+----------------------------+--------------------------+
|6052          |16                          |60              |2                          |2                    |51                |4                       |3                           |6                         |
+--------------+----------------------------+----------------+---------------------------+---------------------+------------------+-----------------

                                                                                

In [42]:
categorical_cols =[
    "STATE_NAME", "REMOTE_TYPE_NAME", "EMPLOYMENT_TYPE_NAME",
    "MIN_EDULEVELS_NAME",
    "COMPANY_IS_STAFFING", "IS_INTERNSHIP"
]

for colname in categorical_cols:
    print(f"\n----{colname} ----")
    df.select(colname).distinct().show(50, truncate=False)


----STATE_NAME ----


                                                                                

+---------------------------------------+
|STATE_NAME                             |
+---------------------------------------+
|Utah                                   |
|Hawaii                                 |
|Minnesota                              |
|Ohio                                   |
|Arkansas                               |
|Oregon                                 |
|Texas                                  |
|North Dakota                           |
|Pennsylvania                           |
|Connecticut                            |
|Nebraska                               |
|Vermont                                |
|Nevada                                 |
|Washington                             |
|Illinois                               |
|Oklahoma                               |
|Delaware                               |
|Alaska                                 |
|New Mexico                             |
|West Virginia                          |
|Missouri                         

                                                                                

+----------------+
|REMOTE_TYPE_NAME|
+----------------+
|Remote          |
|[None]          |
|Not Remote      |
|Hybrid Remote   |
|NULL            |
+----------------+


----EMPLOYMENT_TYPE_NAME ----


                                                                                

+------------------------+
|EMPLOYMENT_TYPE_NAME    |
+------------------------+
|Part-time / full-time   |
|Part-time (â‰¤ 32 hours)|
|Full-time (> 32 hours)  |
|NULL                    |
+------------------------+


----MIN_EDULEVELS_NAME ----


                                                                                

+----------------------------+
|MIN_EDULEVELS_NAME          |
+----------------------------+
|Bachelor's degree           |
|Ph.D. or professional degree|
|High school or GED          |
|Master's degree             |
|No Education Listed         |
|Associate degree            |
|NULL                        |
+----------------------------+


----COMPANY_IS_STAFFING ----


                                                                                

+-------------------+
|COMPANY_IS_STAFFING|
+-------------------+
|true               |
|false              |
|NULL               |
+-------------------+


----IS_INTERNSHIP ----


[Stage 170:>                                                        (0 + 1) / 1]

+-------------+
|IS_INTERNSHIP|
+-------------+
|true         |
|false        |
|NULL         |
+-------------+



                                                                                

In [43]:
# for remote_type_name replace remote with remote, [none] with undefined, not remote with on premise, hybrid remote with hybrid, and null with undefined

df = df.withColumn(
    "REMOTE_TYPE_NAME",
    when(col("REMOTE_TYPE_NAME") == "REMOTE", "REMOTE")
    .when(col("REMOTE_TYPE_NAME") == "[None]", "Undefined")
    .when(col("REMOTE_TYPE_NAME") == "Not Remote", "On Premise")
    .when(col("REMOTE_TYPE_NAME") == "Hybrid Remote", "Hybrid")
    .when(col("REMOTE_TYPE_NAME").isNull(), "On Premise")
    .otherwise(col("REMOTE_TYPE_NAME"))
)

categorical_cols =[
    "REMOTE_TYPE_NAME"
]

for colname in categorical_cols:
    print(f"\n----{colname} ----")
    df.select(colname).distinct().show(50, truncate=False)


----REMOTE_TYPE_NAME ----


[Stage 173:>                                                        (0 + 1) / 1]

+----------------+
|REMOTE_TYPE_NAME|
+----------------+
|Remote          |
|On Premise      |
|Hybrid          |
|Undefined       |
+----------------+



                                                                                

In [44]:
# Change EMPLOYMENT_TYPE_NAME

df = df.withColumn(
    "EMPLOYMENT_TYPE_NAME",
    when(col("EMPLOYMENT_TYPE_NAME") == "Part-time / full-time", "Flexible")
    .when(col("EMPLOYMENT_TYPE_NAME") == "Part-time (â‰¤ 32 hours)", "Part-time")
    .when(col("EMPLOYMENT_TYPE_NAME") == "Full-time (> 32 hours)", "Full-time")
    .when(col("EMPLOYMENT_TYPE_NAME").isNull(), "Full-time")
    .otherwise(col("EMPLOYMENT_TYPE_NAME"))
)
categorical_cols =[
    "EMPLOYMENT_TYPE_NAME"
]

for colname in categorical_cols:
    print(f"\n----{colname} ----")
    df.select(colname).distinct().show(50, truncate=False)


----EMPLOYMENT_TYPE_NAME ----


[Stage 176:>                                                        (0 + 1) / 1]

+--------------------+
|EMPLOYMENT_TYPE_NAME|
+--------------------+
|Part-time           |
|Flexible            |
|Full-time           |
+--------------------+



                                                                                

In [45]:
# Min EDU LEVELS - Getting rid of nul to make it none

df = df.withColumn(
    "MIN_EDULEVELS_NAME",
    when(col("MIN_EDULEVELS_NAME") == "Bachelor's degree", "Bachelor's Degree")
    .when(col("MIN_EDULEVELS_NAME") == "Ph.D. or professional degree", "Ph.D. or Professional Degree")
    .when(col("MIN_EDULEVELS_NAME") == "High School or GED", "High School or GED")
    .when(col("MIN_EDULEVELS_NAME") == "Master's Degree", "Master's Degree")
    .when(col("MIN_EDULEVELS_NAME") == "No Education Listed", "None")
    .when(col("MIN_EDULEVELS_NAME") == "Associate Degree", "Associate Degree")
    .when(col("MIN_EDULEVELS_NAME").isNull(), "None")
    .otherwise(col("MIN_EDULEVELS_NAME"))
)

categorical_cols =[
    "MIN_EDULEVELS_NAME"
]

for colname in categorical_cols:
    print(f"\n----{colname} ----")
    df.select(colname).distinct().show(50, truncate=False)


----MIN_EDULEVELS_NAME ----


[Stage 179:>                                                        (0 + 1) / 1]

+----------------------------+
|MIN_EDULEVELS_NAME          |
+----------------------------+
|None                        |
|Bachelor's Degree           |
|High school or GED          |
|Master's degree             |
|Associate degree            |
|Ph.D. or Professional Degree|
+----------------------------+



                                                                                

In [46]:
df = df.withColumn(
    "COMPANY_IS_STAFFING",
    when(col("COMPANY_IS_STAFFING").isNull(), False)
    .otherwise(col("COMPANY_IS_STAFFING"))
)

df = df.withColumn(
    "IS_INTERNSHIP",
    when(col("IS_INTERNSHIP").isNull(), False)
    .otherwise(col("IS_INTERNSHIP"))
)

categorical_cols =[
    "COMPANY_IS_STAFFING", "IS_INTERNSHIP"
]

for colname in categorical_cols:
    print(f"\n----{colname} ----")
    df.select(colname).distinct().show(50, truncate=False)



----COMPANY_IS_STAFFING ----


                                                                                

+-------------------+
|COMPANY_IS_STAFFING|
+-------------------+
|true               |
|false              |
+-------------------+


----IS_INTERNSHIP ----


[Stage 185:>                                                        (0 + 1) / 1]

+-------------+
|IS_INTERNSHIP|
+-------------+
|true         |
|false        |
+-------------+



                                                                                

In [47]:
from pyspark.sql.functions import col, pow
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml import Pipeline

# Dropping rows with NA values in relevant columns
df = df.dropna(subset=[
    "SALARY", "MIN_YEARS_EXPERIENCE","EMPLOYMENT_TYPE_NAME","REMOTE_TYPE_NAME",
    "DURATION", "IS_INTERNSHIP","COMPANY_IS_STAFFING", "STATE_NAME", "MIN_EDULEVELS_NAME"
])

#Categorical columns
categorical_cols = ["EMPLOYMENT_TYPE_NAME", "REMOTE_TYPE_NAME","STATE_NAME"]

#Index and One-Hot encode
indexers = [StringIndexer(inputCol=col, outputCol=f"{col}_idx", handleInvalid='skip') for col in categorical_cols]
encoders = [OneHotEncoder(inputCol=f"{col}_idx", outputCol=f"{col}_vec") for col in categorical_cols]

# Assemble base features (for GLR and Random Forrest)

assembler = VectorAssembler(
    inputCols=[
        "MIN_YEARS_EXPERIENCE", "DURATION",
        "IS_INTERNSHIP", "COMPANY_IS_STAFFING"
    ] + [f"{col}_vec" for col in categorical_cols],
    outputCol="features"
)

# Build pipeline and transform
pipeline = Pipeline(stages=indexers + encoders + [assembler])
data = pipeline.fit(df).transform(df)

# Create squared term for Polynomical Regression
data = data.withColumn("MIN_YEARS_EXPERIENCE_SQ", pow(col("MIN_YEARS_EXPERIENCE"),2))

#Assemble polynomial features
assembler_poly = VectorAssembler(
    inputCols=[
        "MIN_YEARS_EXPERIENCE", "DURATION",
        "IS_INTERNSHIP", "COMPANY_IS_STAFFING"
    ] + [f"{col}_vec" for col in categorical_cols],
    outputCol="features_poly"
)

data = assembler_poly.transform(data)

data.select("SALARY","features", "features_poly").show(5, truncate=False)


                                                                                

+------+-----------------------------------------+-----------------------------------------+
|SALARY|features                                 |features_poly                            |
+------+-----------------------------------------+-----------------------------------------+
|92962 |(59,[0,1,4,6,11],[2.0,18.0,1.0,1.0,1.0]) |(59,[0,1,4,6,11],[2.0,18.0,1.0,1.0,1.0]) |
|107645|(59,[0,1,4,9],[10.0,18.0,1.0,1.0])       |(59,[0,1,4,9],[10.0,18.0,1.0,1.0])       |
|192800|(59,[0,1,4,6,24],[6.0,55.0,1.0,1.0,1.0]) |(59,[0,1,4,6,24],[6.0,55.0,1.0,1.0,1.0]) |
|125900|(59,[0,1,4,6,22],[12.0,18.0,1.0,1.0,1.0])|(59,[0,1,4,6,22],[12.0,18.0,1.0,1.0,1.0])|
|170000|(59,[0,1,4,6,9],[6.0,18.0,1.0,1.0,1.0])  |(59,[0,1,4,6,9],[6.0,18.0,1.0,1.0,1.0])  |
+------+-----------------------------------------+-----------------------------------------+
only showing top 5 rows



In [51]:
pipeline = Pipeline(stages=indexers)
indexed_df = pipeline.fit(df).transform(df)
indexed_df.select("SALARY").show()

                                                                                

+------+
|SALARY|
+------+
| 92962|
|107645|
|192800|
|125900|
|170000|
|118560|
| 79000|
|192800|
| 75026|
|116500|
|166500|
|156038|
|149695|
|161200|
|110000|
|192800|
|149695|
|188600|
|192800|
|149695|
+------+
only showing top 20 rows

