In [1]:
from pyspark.sql import SparkSession
import pandas as pd
import plotly.express as px
import plotly.io as pio
import numpy as np

np.random.seed(42)

pio.renderers.default = "notebook+notebook_connected+vscode"

# Initialize Spark Session
spark = SparkSession.builder.appName("LightcastData").getOrCreate()

# Load Data
df = spark.read.option("header", "true").option("inferSchema", "true").option("multiLine","true").option("escape", "\"").csv("./data/lightcast_job_postings.csv")

# Show Schema and Sample Data
print("---This is Diagnostic check, No need to print it in the final doc---")

# df.printSchema() # comment this line when rendering the submission
df.show(5)
print((df.count(), len(df.columns)))


Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/10/02 23:48:38 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
                                                                                

---This is Diagnostic check, No need to print it in the final doc---


25/10/02 23:48:56 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
                                                                                

+--------------------+-----------------+----------------------+----------+--------+---------+--------+--------------------+--------------------+--------------------+-----------+-------------------+--------------------+--------------------+---------------+----------------+--------+--------------------+-----------+-------------------+----------------+---------------------+-------------+-------------------+-------------+------------------+---------------+--------------------+--------------------+--------------------+-------------+------+-----------+----------------+-------------------+---------+-----------+--------------------+--------------------+-------------+------+--------------+-----+--------------------+-----+----------+---------------+--------------------+---------------+--------------------+------------+--------------------+------------+--------------------+------+--------------------+------+--------------------+------+--------------------+------+--------------------+------+------

[Stage 3:>                                                          (0 + 1) / 1]

(72498, 131)


                                                                                

# Feature Engineering

In [None]:
#|eval: true
#|echo: false
#| fig-align: center


from pyspark.sql.functions import col, pow, when
from pyspark.ml.feature import StringIndexer, VectorAssembler, OneHotEncoder
from pyspark.ml import Pipeline
from pyspark.sql.types import IntegerType, BooleanType, StringType



# Drop rows with NA values in relevant columns
df_feature_eng = df.dropna(subset=["SALARY", "MIN_YEARS_EXPERIENCE", "STATE_NAME", "EMPLOYMENT_TYPE_NAME", "REMOTE_TYPE_NAME", "MIN_EDULEVELS_NAME", "DURATION", "IS_INTERNSHIP", "COMPANY_IS_STAFFING"])




eda_cols = ["SALARY", "MIN_YEARS_EXPERIENCE", "DURATION", "COMPANY_IS_STAFFING", "IS_INTERNSHIP", "STATE_NAME", "REMOTE_TYPE_NAME", "EMPLOYMENT_TYPE_NAME", "MIN_EDULEVELS_NAME"]


df_feature_eng = df_feature_eng.select(eda_cols)


#Clean categorical variable REMOTE_TYPE_NAME
df_feature_eng = df_feature_eng.withColumn(
    "REMOTE_TYPE_NAME",
    when(col("REMOTE_TYPE_NAME") == "Remote", "Remote")
    .when(col("REMOTE_TYPE_NAME") == "[None]", "Undefined")
    .when(col("REMOTE_TYPE_NAME") == "Not Remote", "On Premise")
    .when(col("REMOTE_TYPE_NAME") == "Hybrid Remote", "Hybrid")
    .when(col("REMOTE_TYPE_NAME").isNull(), "On Premise")
    .otherwise(col("REMOTE_TYPE_NAME"))
)



#Clean categorical variable EMPLOYMENT_TYPE_NAME
df_feature_eng = df_feature_eng.withColumn(
    "EMPLOYMENT_TYPE_NAME",
    when(col("EMPLOYMENT_TYPE_NAME") == "Part-time / full-time", "Flexible")
    .when(col("EMPLOYMENT_TYPE_NAME") == "Part-time (â‰¤ 32 hours)", "Parttime")
    .when(col("EMPLOYMENT_TYPE_NAME") == "Full-time (> 32 hours)", "Fulltime")
    .when(col("EMPLOYMENT_TYPE_NAME").isNull(), "Fulltime")
    .otherwise(col("EMPLOYMENT_TYPE_NAME"))
)


# Categorical columns
categorical_cols = ["EMPLOYMENT_TYPE_NAME", "REMOTE_TYPE_NAME"]

# continuous columns
continuous_cols = ["MIN_YEARS_EXPERIENCE", "DURATION", "IS_INTERNSHIP", "COMPANY_IS_STAFFING"]

# Setup pipeline stages for indexing and encoding categorical columns
indexers = [StringIndexer(inputCol=col, outputCol=f"{col}_Idx", handleInvalid="skip") for col in categorical_cols]
encoders = [OneHotEncoder(inputCol=f"{col}_Idx", outputCol=f"{col}_vec") for col in categorical_cols]



----EMPLOYMENT_TYPE_NAME----


25/10/03 00:10:55 ERROR CodeGenerator: Failed to compile the generated Java code.
org.codehaus.commons.compiler.InternalCompilerException: Compiling "GeneratedClass" in File 'generated.java', Line 1, Column 1: File 'generated.java', Line 414, Column 14: Compiling "hashAgg_doAggregateWithKeys_0()"
	at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:402)
	at org.codehaus.janino.UnitCompiler.access$000(UnitCompiler.java:236)
	at org.codehaus.janino.UnitCompiler$2.visitCompilationUnit(UnitCompiler.java:363)
	at org.codehaus.janino.UnitCompiler$2.visitCompilationUnit(UnitCompiler.java:361)
	at org.codehaus.janino.Java$CompilationUnit.accept(Java.java:371)
	at org.codehaus.janino.UnitCompiler.compileUnit(UnitCompiler.java:361)
	at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:264)
	at org.codehaus.janino.ClassBodyEvaluator.cook(ClassBodyEvaluator.java:294)
	at org.codehaus.janino.ClassBodyEvaluator.cook(ClassBodyEvaluator.java:288)
	at org.codehaus.janino.ClassBodyE

+--------------------+
|EMPLOYMENT_TYPE_NAME|
+--------------------+
|Flexible            |
|Fulltime            |
|Parttime            |
+--------------------+


----REMOTE_TYPE_NAME----


25/10/03 00:11:03 ERROR CodeGenerator: Failed to compile the generated Java code.
org.codehaus.commons.compiler.InternalCompilerException: Compiling "GeneratedClass" in File 'generated.java', Line 1, Column 1: File 'generated.java', Line 495, Column 14: Compiling "hashAgg_doAggregateWithKeys_0()"
	at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:402)
	at org.codehaus.janino.UnitCompiler.access$000(UnitCompiler.java:236)
	at org.codehaus.janino.UnitCompiler$2.visitCompilationUnit(UnitCompiler.java:363)
	at org.codehaus.janino.UnitCompiler$2.visitCompilationUnit(UnitCompiler.java:361)
	at org.codehaus.janino.Java$CompilationUnit.accept(Java.java:371)
	at org.codehaus.janino.UnitCompiler.compileUnit(UnitCompiler.java:361)
	at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:264)
	at org.codehaus.janino.ClassBodyEvaluator.cook(ClassBodyEvaluator.java:294)
	at org.codehaus.janino.ClassBodyEvaluator.cook(ClassBodyEvaluator.java:288)
	at org.codehaus.janino.ClassBodyE

+----------------+
|REMOTE_TYPE_NAME|
+----------------+
|Remote          |
|On Premise      |
|Hybrid          |
|Undefined       |
+----------------+



                                                                                

In [30]:
assembler = VectorAssembler(
inputCols=["MIN_YEARS_EXPERIENCE", "DURATION", "IS_INTERNSHIP", "COMPANY_IS_STAFFING"] + [f"{col}_vec" for col in categorical_cols], outputCol="features"
)


# Build Pipeline and Transform
pipeline = Pipeline(stages=indexers + encoders + [assembler])
data = pipeline.fit(df_feature_eng).transform(df_feature_eng)


# Create squared term for Polynomial Regression
data = data.withColumn("MIN_YEARS_EXPERIENCE_SQ", pow(col("MIN_YEARS_EXPERIENCE"), 2))

# Assemble Polynomial Features
assembler_poly = VectorAssembler(
inputCols=[ "MIN_YEARS_EXPERIENCE", "MIN_YEARS_EXPERIENCE_SQ", "DURATION", "IS_INTERNSHIP", "COMPANY_IS_STAFFING"] + [f"{col}_vec" for col in categorical_cols], outputCol="features_poly"
)
data = assembler_poly.transform(data)


# Split Data
regression_train, regression_test = data.randomSplit([0.8, 0.2], seed=42)


# Show final structure
data.select("SALARY", "features", "features_poly").show(5, truncate=False)

25/10/03 00:45:31 ERROR CodeGenerator: Failed to compile the generated Java code.
org.codehaus.commons.compiler.InternalCompilerException: Compiling "GeneratedClass" in File 'generated.java', Line 1, Column 1: File 'generated.java', Line 430, Column 14: Compiling "hashAgg_doAggregateWithKeys_0()"
	at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:402)
	at org.codehaus.janino.UnitCompiler.access$000(UnitCompiler.java:236)
	at org.codehaus.janino.UnitCompiler$2.visitCompilationUnit(UnitCompiler.java:363)
	at org.codehaus.janino.UnitCompiler$2.visitCompilationUnit(UnitCompiler.java:361)
	at org.codehaus.janino.Java$CompilationUnit.accept(Java.java:371)
	at org.codehaus.janino.UnitCompiler.compileUnit(UnitCompiler.java:361)
	at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:264)
	at org.codehaus.janino.ClassBodyEvaluator.cook(ClassBodyEvaluator.java:294)
	at org.codehaus.janino.ClassBodyEvaluator.cook(ClassBodyEvaluator.java:288)
	at org.codehaus.janino.ClassBodyE

+------+--------------------------------------+-------------------------------------------+
|SALARY|features                              |features_poly                              |
+------+--------------------------------------+-------------------------------------------+
|192800|(9,[0,1,4,6],[6.0,55.0,1.0,1.0])      |(10,[0,1,2,5,7],[6.0,36.0,55.0,1.0,1.0])   |
|125900|(9,[0,1,4,6],[12.0,18.0,1.0,1.0])     |(10,[0,1,2,5,7],[12.0,144.0,18.0,1.0,1.0]) |
|118560|[5.0,20.0,0.0,1.0,1.0,0.0,0.0,1.0,0.0]|[5.0,25.0,20.0,0.0,1.0,1.0,0.0,0.0,1.0,0.0]|
|192800|(9,[0,1,4,6],[6.0,55.0,1.0,1.0])      |(10,[0,1,2,5,7],[6.0,36.0,55.0,1.0,1.0])   |
|116500|(9,[0,1,4,6],[12.0,16.0,1.0,1.0])     |(10,[0,1,2,5,7],[12.0,144.0,16.0,1.0,1.0]) |
+------+--------------------------------------+-------------------------------------------+
only showing top 5 rows


# Missing Value Treatment

1. Replace the missing values in Salary by Median of Salary based on the EMPLOYMENT_TYPE or EMPLOYMENT_TYPE_NAME. If both are missing, then use the overall median of Salary to replace it.



In [60]:
# Missing Value Treatment
from pyspark.sql.functions import col, when, isnan, count

# 1. Replace the missing values in Salary by Median of Salary based on the EMPLOYMENT_TYPE or EMPLOYMENT_TYPE_NAME. If both are missing, then use the overall median of Salary to replace it.

from pyspark.sql import Window
from pyspark.sql.functions import col, when, isnan, count, expr, median
from pyspark.sql import functions as F

# Calculate overall median salary
overall_median_salarly = df.approxQuantile("SALARY", [0.5], 0.01)[0]

median_by_employment_type = df.groupBy("EMPLOYMENT_TYPE").agg(expr("percentile_approx(SALARY, 0.5)").alias("median_salary_emp_type"))
median_by_employment_type_name = df.groupBy("EMPLOYMENT_TYPE_NAME").agg(expr("percentile_approx(SALARY, 0.5)").alias("median_salary_emp_type_name"))

# Join median values back to the original dataframe
df_salary_imputed = df.join(median_by_employment_type, on="EMPLOYMENT_TYPE", how = "left").join(median_by_employment_type_name, on="EMPLOYMENT_TYPE_NAME", how = "left")


# Replace missing SALARY values
df_salary_imputed=df_salary_imputed.withColumn("SALARY", when(col("SALARY").isNull(), 
                                when (col("median_salary_emp_type").isNotNull(), col("median_salary_emp_type"))
                                .when(col("median_salary_emp_type_name").isNotNull(), col("median_salary_emp_type_name"))
                                .otherwise(overall_median_salarly)
).otherwise(col("SALARY"))) 


df_salary_imputed.show(5)

                                                                                

+--------------------+---------------+--------------------+-----------------+----------------------+----------+--------+---------+--------+--------------------+--------------------+--------------------+-----------+-------------------+--------------------+--------------------+---------------+----------------+--------+--------------------+-----------+-------------------+----------------+---------------------+-------------+-------------------+-------------+------------------+--------------------+--------------------+-------------+--------+-----------+----------------+-------------------+---------+-----------+--------------------+--------------------+-------------+------+--------------+-----+--------------------+-----+----------+---------------+--------------------+---------------+--------------------+------------+--------------------+------------+--------------------+------+--------------------+------+--------------------+------+--------------------+------+--------------------+------+----

In [61]:
#|eval: true
#|echo: false
#| fig-align: center

from pyspark.sql.functions import col, pow
from pyspark.ml.feature import StringIndexer, VectorAssembler, OneHotEncoder
from pyspark.ml import Pipeline
from pyspark.sql.types import IntegerType, BooleanType, StringType

# Drop Rows from Relevant Columns
regression_df = df_salary_imputed.dropna(subset=["SALARY", "MIN_YEARS_EXPERIENCE", "MAX_YEARS_EXPERIENCE", "EDUCATION_LEVELS_NAME", "EMPLOYMENT_TYPE_NAME", "REMOTE_TYPE_NAME", "DURATION", "IS_INTERNSHIP", "COMPANY_IS_STAFFING",  "median_salary_emp_type", "median_salary_emp_type_name"
    ]).select("SALARY", "MIN_YEARS_EXPERIENCE", "MAX_YEARS_EXPERIENCE", "EDUCATION_LEVELS_NAME", "EMPLOYMENT_TYPE_NAME", "REMOTE_TYPE_NAME", "DURATION", "IS_INTERNSHIP", "COMPANY_IS_STAFFING", "median_salary_emp_type", "median_salary_emp_type_name")

# Categorical Columns
categorical_cols = ["EDUCATION_LEVELS_NAME", "EMPLOYMENT_TYPE_NAME", "REMOTE_TYPE_NAME", "DURATION", "IS_INTERNSHIP", "COMPANY_IS_STAFFING"  
    
    
]

regression_df = regression_df.withColumn("IS_INTERNSHIP", col("IS_INTERNSHIP").cast(IntegerType()))
regression_df = regression_df.withColumn("COMPANY_IS_STAFFING", col("COMPANY_IS_STAFFING").cast(IntegerType()))
regression_df.show(5, truncate=False)



[Stage 731:>                (0 + 1) / 1][Stage 732:>                (0 + 1) / 1]

+--------+--------------------+--------------------+-----------------------------+----------------------+----------------+--------+-------------+-------------------+----------------------+---------------------------+
|SALARY  |MIN_YEARS_EXPERIENCE|MAX_YEARS_EXPERIENCE|EDUCATION_LEVELS_NAME        |EMPLOYMENT_TYPE_NAME  |REMOTE_TYPE_NAME|DURATION|IS_INTERNSHIP|COMPANY_IS_STAFFING|median_salary_emp_type|median_salary_emp_type_name|
+--------+--------------------+--------------------+-----------------------------+----------------------+----------------+--------+-------------+-------------------+----------------------+---------------------------+
|116500.0|2                   |2                   |[\n  "Bachelor's degree"\n]  |Full-time (> 32 hours)|[None]          |6       |0            |0                  |116500                |116500                     |
|116500.0|7                   |7                   |[\n  "No Education Listed"\n]|Full-time (> 32 hours)|[None]          |18      |0

                                                                                

In [62]:
# Clean Education Levels by cleaning \n and array brackets
from pyspark.sql.functions import regexp_replace, trim
regression_df = regression_df.withColumn("EDUCATION_LEVELS_NAME", trim(regexp_replace(col("EDUCATION_LEVELS_NAME"), r"[\[\]\n']", "")))


# Index and One-Hot Encode
indexers = [StringIndexer(inputCol=col, outputCol=f"{col}_idx", handleInvalid="skip") for col in categorical_cols]
encoders = [OneHotEncoder(inputCol=f"{col}_idx", outputCol=f"{col}_vec") for col in categorical_cols]

#Assemble base features
assembler = VectorAssembler(inputCols=["MIN_YEARS_EXPERIENCE", "MAX_YEARS_EXPERIENCE", "DURATION", "IS_INTERNSHIP", "COMPANY_IS_STAFFING"] + [f"{col}_vec" for col in categorical_cols], outputCol="features")


pipeline = Pipeline(stages=indexers + encoders + [assembler])
regression_data =  pipeline.fit(regression_df).transform(regression_df)
regression_data.select("SALARY","features").show(5, truncate=False)
# regression_df.show(5, truncate=False)

                                                                                

+--------+--------------------------------------------------------------------+
|SALARY  |features                                                            |
+--------+--------------------------------------------------------------------+
|116500.0|(89,[0,1,2,5,23,25,40,87,88],[2.0,2.0,6.0,1.0,1.0,1.0,1.0,1.0,1.0]) |
|116500.0|(89,[0,1,2,4,6,23,25,52,87],[7.0,7.0,18.0,1.0,1.0,1.0,1.0,1.0,1.0]) |
|116500.0|(89,[0,1,2,4,6,23,25,31,87],[1.0,1.0,8.0,1.0,1.0,1.0,1.0,1.0,1.0])  |
|116500.0|(89,[0,1,2,5,23,25,39,87,88],[1.0,1.0,32.0,1.0,1.0,1.0,1.0,1.0,1.0])|
|131100.0|(89,[0,1,2,5,23,25,28,87,88],[2.0,2.0,11.0,1.0,1.0,1.0,1.0,1.0,1.0])|
+--------+--------------------------------------------------------------------+
only showing top 5 rows


Train and Split

In [63]:
#| eval: true
#| echo: false
#| fig-align: center

# Split Data
regression_train, regression_test = regression_data.randomSplit([0.8, 0.2], seed=42)
print((regression_data.count(), len(regression_data.columns)))
print((regression_train.count(), len(regression_test.columns)))
print((regression_test.count(), len(regression_test.columns)))


                                                                                

(5039, 24)


                                                                                

(4070, 24)


[Stage 853:>                                                        (0 + 1) / 1]

(969, 24)


                                                                                