---
title: Assignment 04
author:
  - name: Bhargavi Manyala
    affiliations:
      - id: bu
        name: Boston University
        city: Boston
        state: MA
number-sections: true
date: today
date-modified: today
date-format: long
format:
  html:
    theme: cerulean
    toc: true
    toc-depth: 2
engine: jupyter
jupyter: assignment-04-kernel
execute:
  echo: true
  eval: true
  output: true
  freeze: auto
---



 # 1. Load the Dataset

In [50]:
from pyspark.sql import SparkSession
import pandas as pd
import plotly.express as px
import plotly.io as pio
import numpy as np


np.random.seed(42)

pio.renderers.default = "notebook+notebook_connected+vscode"

# Initialize Spark Session
spark = SparkSession.builder.appName("LightcastData").getOrCreate()

# Load Data
df = (
    spark.read
    .option("header", "true")
    .option("inferSchema", "true")
    .option("multiLine", "true")
    .option("escape", "\"")  
    .csv("data/lightcast_job_postings.csv")

)

df.createOrReplaceTempView("job_postings")
#df.show(5)


                                                                                

### Missing Value Treatment

In [51]:
# Missing Value Treatment
from pyspark.sql import Window
from pyspark.sql.functions import col, when, isnan, count, expr, median
from pyspark.sql import functions as F

# Calculate overall median salary
overall_median_salarly = df.approxQuantile("SALARY", [0.5], 0.01)[0]

median_by_employment_type = df.groupBy("EMPLOYMENT_TYPE").agg(expr("percentile_approx(SALARY, 0.5)").alias("median_salary_emp_type"))
median_by_employment_type_name = df.groupBy("EMPLOYMENT_TYPE_NAME").agg(expr("percentile_approx(SALARY, 0.5)").alias("median_salary_emp_type_name"))

# Join median values back to the original dataframe
df_salary_imputed = df.join(median_by_employment_type, on="EMPLOYMENT_TYPE", how = "left").join(median_by_employment_type_name, on="EMPLOYMENT_TYPE_NAME", how = "left")


# Replace missing SALARY values
df_salary_imputed=df_salary_imputed.withColumn("SALARY", when(col("SALARY").isNull(), 
                                when (col("median_salary_emp_type").isNotNull(), col("median_salary_emp_type"))
                                .when(col("median_salary_emp_type_name").isNotNull(), col("median_salary_emp_type_name"))
                                .otherwise(overall_median_salarly)
).otherwise(col("SALARY"))) 




                                                                                

# 2. Feature Engineering

### Take columns needed for anlaysis

In [52]:

from pyspark.sql.functions import col, pow
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml import Pipeline
from pyspark.sql.types import BooleanType, StringType, IntegerType
from pyspark.sql.functions import regexp_replace, trim

# Drop rows with NA values 
regression_df = df_salary_imputed.dropna(subset=[
    "SALARY", "MIN_YEARS_EXPERIENCE", "MAX_YEARS_EXPERIENCE",
    "EDUCATION_LEVELS_NAME", "EMPLOYMENT_TYPE_NAME", "REMOTE_TYPE_NAME",
    "DURATION", "IS_INTERNSHIP", "COMPANY_IS_STAFFING","median_salary_emp_type", "median_salary_emp_type_name"
]).select(
    "SALARY", "MIN_YEARS_EXPERIENCE", "MAX_YEARS_EXPERIENCE",
    "EDUCATION_LEVELS_NAME", "EMPLOYMENT_TYPE_NAME", "REMOTE_TYPE_NAME",
    "DURATION", "IS_INTERNSHIP", "COMPANY_IS_STAFFING","median_salary_emp_type", "median_salary_emp_type_name"
   
)

# Cast Duration to integer
regression_df = regression_df.withColumn("DURATION", col("DURATION").cast(IntegerType()))



### Clean categorical columns 


In [53]:
# Categorical columns
categorical_cols = [ "EDUCATION_LEVELS_NAME", "EMPLOYMENT_TYPE_NAME", "REMOTE_TYPE_NAME", "IS_INTERNSHIP", "COMPANY_IS_STAFFING"]

# Cast boolean columns to integer
regression_df = regression_df.withColumn("IS_INTERNSHIP", col("IS_INTERNSHIP").cast(IntegerType()))
regression_df = regression_df.withColumn("COMPANY_IS_STAFFING", col("COMPANY_IS_STAFFING").cast(IntegerType()))


# Clean Remote Type Name
regression_df = regression_df.withColumn(
    "REMOTE_TYPE_NAME",
    when(col("REMOTE_TYPE_NAME") == "Remote", "Remote")
    .when(col("REMOTE_TYPE_NAME") == "[None]", "Undefined")
    .when(col("REMOTE_TYPE_NAME") == "Not Remote", "On Premise")
    .when(col("REMOTE_TYPE_NAME") == "Hybrid Remote", "Hybrid")
    .when(col("REMOTE_TYPE_NAME").isNull(), "On Premise")
    .otherwise(col("REMOTE_TYPE_NAME"))
)

# Clean Employment Type Name
regression_df = regression_df.withColumn(
    "EMPLOYMENT_TYPE_NAME",
    when(col("EMPLOYMENT_TYPE_NAME") == "Part-time / full-time", "Flexible")
    .when(col("EMPLOYMENT_TYPE_NAME") == "Part-time (â‰¤ 32 hours)", "Parttime")
    .when(col("EMPLOYMENT_TYPE_NAME") == "Full-time (> 32 hours)", "Fulltime")
    .when(col("EMPLOYMENT_TYPE_NAME").isNull(), "Fulltime")
    .otherwise(col("EMPLOYMENT_TYPE_NAME"))
)

# Clean Education Levels
regression_df = regression_df.withColumn(
    "EDUCATION_LEVELS_NAME",
    trim(regexp_replace(col("EDUCATION_LEVELS_NAME"), r"[\[\]\n]", ""))
)


regression_df.show(5, truncate=False)



                                                                                

+--------+--------------------+--------------------+---------------------+--------------------+----------------+--------+-------------+-------------------+----------------------+---------------------------+
|SALARY  |MIN_YEARS_EXPERIENCE|MAX_YEARS_EXPERIENCE|EDUCATION_LEVELS_NAME|EMPLOYMENT_TYPE_NAME|REMOTE_TYPE_NAME|DURATION|IS_INTERNSHIP|COMPANY_IS_STAFFING|median_salary_emp_type|median_salary_emp_type_name|
+--------+--------------------+--------------------+---------------------+--------------------+----------------+--------+-------------+-------------------+----------------------+---------------------------+
|116500.0|2                   |2                   |"Bachelor's degree"  |Fulltime            |Undefined       |6       |0            |0                  |116500                |116500                     |
|116500.0|7                   |7                   |"No Education Listed"|Fulltime            |Undefined       |18      |0            |1                  |116500           

### Final Features Structure

In [54]:
# Index and One-Hot Encode
indexers = [StringIndexer(inputCol=col, outputCol=f"{col}_idx", handleInvalid="skip") for col in categorical_cols]
encoders = [OneHotEncoder(inputCol=f"{col}_idx", outputCol=f"{col}_vec") for col in categorical_cols]

#Assemble base features
assembler = VectorAssembler(inputCols=["MIN_YEARS_EXPERIENCE", "MAX_YEARS_EXPERIENCE", "DURATION", "IS_INTERNSHIP", "COMPANY_IS_STAFFING"] + [f"{col}_vec" for col in categorical_cols], outputCol="features")


pipeline = Pipeline(stages=indexers + encoders + [assembler])
regression_data =  pipeline.fit(regression_df).transform(regression_df)
regression_data.select("SALARY","features").show(5, truncate=False)

                                                                                

+--------+-------------------------------------------------------------+
|SALARY  |features                                                     |
+--------+-------------------------------------------------------------+
|116500.0|(30,[0,1,2,5,23,25,28,29],[2.0,2.0,6.0,1.0,1.0,1.0,1.0,1.0]) |
|116500.0|(30,[0,1,2,4,6,23,25,28],[7.0,7.0,18.0,1.0,1.0,1.0,1.0,1.0]) |
|116500.0|(30,[0,1,2,4,6,23,25,28],[1.0,1.0,8.0,1.0,1.0,1.0,1.0,1.0])  |
|116500.0|(30,[0,1,2,5,23,25,28,29],[1.0,1.0,32.0,1.0,1.0,1.0,1.0,1.0])|
|131100.0|(30,[0,1,2,5,23,25,28,29],[2.0,2.0,11.0,1.0,1.0,1.0,1.0,1.0])|
+--------+-------------------------------------------------------------+
only showing top 5 rows



# 3. Train/Test Split

In [None]:
# Split Data
regression_train, regression_test = regression_data.randomSplit([0.8, 0.2], seed=42)

print((regression_data.count(), len(regression_data.columns)))
print((regression_train.count(), len(regression_train.columns)))
print((regression_test.count(), len(regression_test.columns)))

                                                                                

(5039, 22)


                                                                                

(4070, 22)


[Stage 912:>                (0 + 1) / 1][Stage 913:>                (0 + 1) / 1]