---
title: Assignment 05
author:
  - name: Zhengyu Zhou
    affiliations:
      - id: bu
        name: Boston University
        city: Boston
        state: MA
number-sections: true
date: today
format:
  docx:
    fontsize: 12pt
date-modified: today
date-format: long
jupyter: python3
execute:
  echo: false
  eval: true
  freeze: auto
---

In [None]:
#1
from pyspark.sql import SparkSession
import pandas as pd
import plotly.express as px
import plotly.io as pio
import numpy as np

np.random.seed(42)

pio.renderers.default = "notebook+notebook_connected+vscode"

# Initialize Spark Session
spark = SparkSession.builder.appName("LightcastData").getOrCreate()

# Load Data
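df = (
    spark.read
    .option("header", "true")
    .option("inferSchema", "true")
    .option("multiLine", "true")   # quoted fields may span several lines
    .option("escape", "\"")        # a doubled quote inside a quoted field is a literal quote
    .csv("data/lightcast_job_postings.csv")
)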

# Show Schema and Sample Data
print("--- Diagnostic check: not needed in the final document ---")

# df.printSchema()  # keep this line commented out when rendering the submission
df.show(5)

--- Diagnostic check: not needed in the final document ---

+--------------------+-----------------+----------------------+----------+--------+---------+--------+--------------------+--------------------+--------------------+-----------+-------------------+--------------------+--------------------+---------------+----------------+--------+--------------------+-----------+-------------------+----------------+---------------------+-------------+-------------------+-------------+------------------+---------------+--------------------+--------------------+--------------------+-------------+------+-----------+----------------+-------------------+---------+-----------+--------------------+--------------------+-------------+------+--------------+-----+--------------------+-----+----------+---------------+--------------------+---------------+--------------------+------------+--------------------+------------+--------------------+------+--------------------+------+--------------------+------+--------------------+------+--------------------+------+------
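The multiLine and escape options matter because some fields in this file span several lines (the EDUCATION_LEVELS_NAME values shown below start with a bracket followed by an embedded newline). As an analogy only, the sketch below uses Python's standard csv module, whose default dialect follows the convention these options assume: a quoted field may contain newlines, and a quote inside a quoted field is escaped by doubling it. The two-record CSV string is hypothetical, and this is not how Spark parses the file internally.

```python
import csv
import io

# Hypothetical CSV text (not taken from the Lightcast file): the TITLE field of
# record 1 is quoted, contains doubled quotes (""), and spans two lines.
raw = 'ID,TITLE\n1,"Data ""Scientist""\nRemote"\n2,Analyst\n'

# csv.reader's default dialect reads "" inside a quoted field as one literal quote
# and keeps a newline that appears inside quotes as part of the field value.
rows = list(csv.reader(io.StringIO(raw)))

for row in rows:
    print(row)
```

Without the equivalent of multiLine, a line-by-line reader would split record 1 into two broken records at the embedded newline.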

In [None]:
#2
from pyspark.sql.functions import col
from pyspark.sql.types import DoubleType

# target
target = "SALARY"

# continuous_features
continuous_features = [
    "MIN_YEARS_EXPERIENCE",
    "MAX_YEARS_EXPERIENCE",
    "DURATION",
    "SALARY_FROM"
]

# categorical_features
categorical_features = [
    "STATE_NAME",
    "EDUCATION_LEVELS_NAME"
]

# key columns
key_cols = [target] + continuous_features + categorical_features

# only select the key columns
df_model = df.select(*key_cols)

print("Row count before dropna:", df_model.count())

# drop rows with nulls in key columns
df_model = df_model.dropna(subset=key_cols)

print("Row count after dropna :", df_model.count())
df_model.show(5)


Row count before dropna: 72498


Row count after dropna : 2243
+------+--------------------+--------------------+--------+-----------+----------+---------------------+
|SALARY|MIN_YEARS_EXPERIENCE|MAX_YEARS_EXPERIENCE|DURATION|SALARY_FROM|STATE_NAME|EDUCATION_LEVELS_NAME|
+------+--------------------+--------------------+--------+-----------+----------+---------------------+
|131100|                   2|                   2|      11|     113400|   Arizona| [\n  "Bachelor's ...|
|136950|                   3|                   3|      28|     115300| Minnesota| [\n  "Bachelor's ...|
|136950|                   3|                   3|      28|     115300|   Georgia| [\n  "Bachelor's ...|
|104000|                   3|                   3|       8|     104000|     Texas| [\n  "Bachelor's ...|
| 80000|                   3|                   3|      37|      60000|  Oklahoma| [\n  "Bachelor's ...|
+------+--------------------+--------------------+--------+-----------+----------+---------------------+
only showing top 5 rows
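Only 2,243 of the 72,498 postings (about 3.1%) survive this step, because a row is discarded as soon as any one of the seven key columns is missing. The pure-Python sketch below mimics that rule, dropna(subset=...) with its default how="any", treating both None and float NaN as missing; the helper names and the rows are hypothetical.

```python
import math

def is_missing(value):
    """Treat None and float NaN as missing, as DataFrame.dropna does by default."""
    return value is None or (isinstance(value, float) and math.isnan(value))

def dropna_subset(rows, subset):
    """Keep a row only if none of the `subset` columns is missing (how="any")."""
    return [r for r in rows if not any(is_missing(r.get(c)) for c in subset)]

# Hypothetical rows, not real postings
rows = [
    {"SALARY": 131100, "DURATION": 11, "STATE_NAME": "Arizona"},
    {"SALARY": None, "DURATION": 28, "STATE_NAME": "Texas"},
    {"SALARY": 80000, "DURATION": float("nan"), "STATE_NAME": "Oklahoma"},
    {"SALARY": 104000, "DURATION": 8, "STATE_NAME": None},
]
kept = dropna_subset(rows, ["SALARY", "DURATION", "STATE_NAME"])

# Share of postings that survived the real dropna step in the output above
retained_share = 2243 / 72498
print(f"kept {len(kept)} toy row(s); real retention = {retained_share:.1%}")
```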



In [None]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml import Pipeline

# for each categorical column, create a StringIndexer
indexers = [
    StringIndexer(
        inputCol=c,
        outputCol=f"{c}_idx",
        handleInvalid="keep"   # avoid errors for unseen labels
    )
    for c in categorical_features
]

# for each indexed categorical column, create a OneHotEncoder
encoders = [
    OneHotEncoder(
        inputCol=f"{c}_idx",
        outputCol=f"{c}_oh"
    )
    for c in categorical_features
]

# assemble all features into a single vector column
feature_cols = continuous_features + [f"{c}_oh" for c in categorical_features]

assembler = VectorAssembler(
    inputCols=feature_cols,
    outputCol="features"
)

# create the pipeline
pipeline = Pipeline(stages=indexers + encoders + [assembler])

# fit and transform the data
fe_model = pipeline.fit(df_model)
df_fe = fe_model.transform(df_model)

df_fe.select(target, *continuous_features, *[f"{c}_oh" for c in categorical_features], "features") \
     .show(5, truncate=False)


+------+--------------------+--------------------+--------+-----------+---------------+------------------------+----------------------------------------------------+
|SALARY|MIN_YEARS_EXPERIENCE|MAX_YEARS_EXPERIENCE|DURATION|SALARY_FROM|STATE_NAME_oh  |EDUCATION_LEVELS_NAME_oh|features                                            |
+------+--------------------+--------------------+--------+-----------+---------------+------------------------+----------------------------------------------------+
|131100|2                   |2                   |11      |113400     |(51,[14],[1.0])|(18,[0],[1.0])          |(73,[0,1,2,3,18,55],[2.0,2.0,11.0,113400.0,1.0,1.0])|
|136950|3                   |3                   |28      |115300     |(51,[21],[1.0])|(18,[4],[1.0])          |(73,[0,1,2,3,25,59],[3.0,3.0,28.0,115300.0,1.0,1.0])|
|136950|3                   |3                   |28      |115300     |(51,[15],[1.0])|(18,[4],[1.0])          |(73,[0,1,2,3,19,59],[3.0,3.0,28.0,115300.0,1.0,1.0])|
|104
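The sparse vectors above follow a fixed layout: VectorAssembler concatenates its inputs in inputCols order, so the four continuous features occupy positions 0 to 3, the 51-slot STATE_NAME_oh block starts at position 4, and the 18-slot EDUCATION_LEVELS_NAME_oh block starts at position 55, for a total size of 4 + 51 + 18 = 73. The pure-Python sketch below reproduces the first row shown, together with the indexing and encoding steps that precede it. It assumes StringIndexer's default frequencyDesc ordering (ties broken alphabetically), handleInvalid="keep" adding one extra index for unseen labels, and OneHotEncoder's default dropLast=True; all helper names are hypothetical, and this is an illustration rather than Spark's implementation.

```python
from collections import Counter

def fit_string_indexer(labels):
    """Map each distinct label to an index by descending frequency, ties broken
    alphabetically (assumed to match StringIndexer's default frequencyDesc order)."""
    counts = Counter(labels)
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: i for i, label in enumerate(ordered)}

def index_label(mapping, label):
    """handleInvalid="keep": a label unseen during fit gets the extra index len(mapping)."""
    return mapping.get(label, len(mapping))

def one_hot(index, num_indices, drop_last=True):
    """Return (size, indices, values) like Spark's SparseVector repr.
    With drop_last=True the last index is encoded as the all-zeros vector."""
    size = num_indices - 1 if drop_last else num_indices
    if index < size:
        return (size, [index], [1.0])
    return (size, [], [])

def assemble(continuous, *sparse_blocks):
    """Concatenate continuous values and sparse blocks in order, shifting each
    block's indices by the number of slots before it; zero values are omitted."""
    indices = [i for i, v in enumerate(continuous) if v != 0]
    values = [float(v) for v in continuous if v != 0]
    offset = len(continuous)
    for size, block_indices, block_values in sparse_blocks:
        indices.extend(offset + i for i in block_indices)
        values.extend(block_values)
        offset += size
    return (offset, indices, values)

# Toy indexer on hypothetical labels: Texas (3) > Georgia (2) > Arizona (1)
states = ["Texas", "Texas", "Georgia", "Arizona", "Georgia", "Texas"]
state_map = fit_string_indexer(states)
num_state_indices = len(state_map) + 1  # +1 for the "keep" slot
print(state_map, index_label(state_map, "Ohio"))
print(one_hot(index_label(state_map, "Ohio"), num_state_indices))

# First row of the output above: state index 14 in a 51-slot block,
# education index 0 in an 18-slot block
row_features = assemble([2, 2, 11, 113400], (51, [14], [1.0]), (18, [0], [1.0]))
print(row_features)
```

Because the "keep" slot is the last index and dropLast removes it, an unseen label becomes an all-zeros block rather than an error.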

In [None]:
# preliminary train-test split on df_fe; superseded by the split on df_poly in step #3 below
train_df, test_df = df_fe.randomSplit([0.8, 0.2], seed=688)
print(f"Train rows: {train_df.count()}, Test rows: {test_df.count()}")

In [6]:
# create polynomial feature for MIN_YEARS_EXPERIENCE
# alias Spark's pow so it does not shadow Python's built-in pow()
from pyspark.sql.functions import col, pow as spark_pow

df_poly = df_fe.withColumn(
    "MIN_YEARS_EXPERIENCE_SQ",
    spark_pow(col("MIN_YEARS_EXPERIENCE"), 2)
)

In [7]:
# VectorAssembler for the polynomial feature
poly_cols = ["MIN_YEARS_EXPERIENCE", "MIN_YEARS_EXPERIENCE_SQ"]

poly_assembler = VectorAssembler(
    inputCols=poly_cols,
    outputCol="features_poly"
)

df_poly = poly_assembler.transform(df_poly)

In [8]:
# show the new polynomial feature
df_poly.select(
    target,
    "MIN_YEARS_EXPERIENCE",
    "MIN_YEARS_EXPERIENCE_SQ",
    "features",
    "features_poly"
).printSchema()

# inspect what the first few rows actually look like
df_poly.select(
    target,
    "MIN_YEARS_EXPERIENCE",
    "MIN_YEARS_EXPERIENCE_SQ",
    "features",
    "features_poly"
).show(5, truncate=False)

root
 |-- SALARY: integer (nullable = true)
 |-- MIN_YEARS_EXPERIENCE: integer (nullable = true)
 |-- MIN_YEARS_EXPERIENCE_SQ: double (nullable = true)
 |-- features: vector (nullable = true)
 |-- features_poly: vector (nullable = true)

+------+--------------------+-----------------------+----------------------------------------------------+-------------+
|SALARY|MIN_YEARS_EXPERIENCE|MIN_YEARS_EXPERIENCE_SQ|features                                            |features_poly|
+------+--------------------+-----------------------+----------------------------------------------------+-------------+
|131100|2                   |4.0                    |(73,[0,1,2,3,18,55],[2.0,2.0,11.0,113400.0,1.0,1.0])|[2.0,4.0]    |
|136950|3                   |9.0                    |(73,[0,1,2,3,25,59],[3.0,3.0,28.0,115300.0,1.0,1.0])|[3.0,9.0]    |
|136950|3                   |9.0                    |(73,[0,1,2,3,19,59],[3.0,3.0,28.0,115300.0,1.0,1.0])|[3.0,9.0]    |
|104000|3                   |9.0    

We created a new column, MIN_YEARS_EXPERIENCE_SQ, by squaring MIN_YEARS_EXPERIENCE, and then assembled [MIN_YEARS_EXPERIENCE, MIN_YEARS_EXPERIENCE_SQ] into a separate vector column, features_poly; the original features vector is left unchanged.
Adding the squared term lets a model that is linear in its coefficients capture a non-linear relationship between years of experience and salary instead of assuming a strictly linear effect. For example, if the fitted coefficient on the squared term is negative, salary rises quickly at low experience levels and flattens out (diminishing returns) at higher ones.
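Because the squared term enters as just another column, a regression on [x, x²] is still ordinary linear least squares. The NumPy sketch below fits such a model on hypothetical, noise-free data generated from a concave curve (not the Lightcast postings), recovers the coefficients, and shows that the marginal effect b1 + 2*b2*x shrinks as experience grows, while a straight-line fit on experience alone leaves a much larger residual error.

```python
import numpy as np

# Hypothetical, noise-free data (NOT the Lightcast postings):
# salary = 60000 + 12000*x - 500*x**2 for x = 0..10 years of experience
x = np.arange(0, 11, dtype=float)
salary = 60000 + 12000 * x - 500 * x**2

# Design matrix: intercept plus the two columns that make up features_poly
X_quad = np.column_stack([np.ones_like(x), x, x**2])
coef_quad, *_ = np.linalg.lstsq(X_quad, salary, rcond=None)
b0, b1, b2 = coef_quad

# Marginal effect of one more year: d(salary)/dx = b1 + 2*b2*x
marginal = b1 + 2 * b2 * x

# Straight-line baseline using experience alone
X_lin = np.column_stack([np.ones_like(x), x])
coef_lin, *_ = np.linalg.lstsq(X_lin, salary, rcond=None)

# Sum of squared errors for each fit
sse_quad = float(np.sum((X_quad @ coef_quad - salary) ** 2))
sse_lin = float(np.sum((X_lin @ coef_lin - salary) ** 2))
print("quadratic coefficients:", np.round(coef_quad, 2))
print("marginal effect at 0 and 10 years:", marginal[0], marginal[-1])
print("SSE quadratic vs linear:", sse_quad, sse_lin)
```

With real postings the fit will not be exact, and the sign of b2 has to be read from the fitted model rather than assumed.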

In [9]:
from pyspark.sql.types import DoubleType
from pyspark.sql.functions import col

# cast the integer SALARY label to double before model training
df_poly = df_poly.withColumn("SALARY", col("SALARY").cast(DoubleType()))

In [11]:
#3 Train/Test Split

train_df, test_df = df_poly.randomSplit([0.8, 0.2], seed=688)

print("Train rows:", train_df.count())
print("Test rows :", test_df.count())


Train rows: 1821

Test rows : 422
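randomSplit([0.8, 0.2], seed=688) assigns rows to splits at random rather than cutting the data at exactly 80%, so the split sizes are only approximately proportional to the weights: here 1,821 / 2,243 ≈ 81.2% and 422 / 2,243 ≈ 18.8%. The sketch below illustrates the idea with independent per-row assignment in pure Python; random_split is a hypothetical helper, it is a simplification, and it will not reproduce Spark's exact row assignment. Fixing the seed makes a split repeatable, which in Spark also assumes the input data and its partitioning do not change between runs.

```python
import random

def random_split(rows, weights, seed):
    """Hypothetical helper: send each row to split i with probability weights[i] / sum(weights).

    Simplified illustration of why randomSplit sizes are approximate; it does not
    reproduce Spark's actual per-partition sampling.
    """
    total = float(sum(weights))
    # Cumulative upper bounds, e.g. [0.8, 1.0] for weights [0.8, 0.2]
    bounds = []
    running = 0.0
    for w in weights:
        running += w / total
        bounds.append(running)

    rng = random.Random(seed)  # fixed seed -> same assignment on every run
    splits = [[] for _ in weights]
    for row in rows:
        u = rng.random()
        # first split whose cumulative bound exceeds u; fall back to the last split
        # in case floating-point rounding leaves the final bound slightly below 1.0
        idx = next((i for i, b in enumerate(bounds) if u < b), len(bounds) - 1)
        splits[idx].append(row)
    return splits

train, test = random_split(list(range(2243)), [0.8, 0.2], seed=688)
print(len(train), len(test))
```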


                                                                                

In [None]:
#4
