---
title: Assignment 04
author:
  - name: Ava Godsy
    affiliations:
      - id: bu
        name: Boston University
        city: Boston
        state: MA
number-sections: true
date: today
date-modified: today
date-format: long
format:
  html:
    theme: cerulean
    toc: true
    toc-depth: 2
  docx: default
  pdf: default
execute:
  echo: false
  eval: false
  freeze: auto
---

In [None]:
from pyspark.sql import SparkSession
import pandas as pd
import plotly.express as px
import plotly.io as pio
import numpy as np

# Seed NumPy's global random generator with a fixed value.
np.random.seed(42)

# Enable the notebook, notebook_connected and vscode Plotly renderers.
pio.renderers.default = "notebook+notebook_connected+vscode"

# Initialize Spark Session
spark = (
    SparkSession.builder
    .appName("LightcastData")
    .getOrCreate()
)

# Load Data: header row present, column types inferred, multi-line records
# allowed, and '"' used as the escape character.
df = (
    spark.read
    .option("header", "true")
    .option("inferSchema", "true")
    .option("multiLine", "true")
    .option("escape", "\"")
    .csv("lightcast_job_postings.csv")
)

# Diagnostic checks, kept disabled so they are not part of the rendered submission:
# print("---This is Diagnostic check, No need to print it in the final doc---")
# df.printSchema()
# df.show(5)

                                                                                

---This is Diagnostic check, No need to print it in the final doc---


25/10/07 02:25:48 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
                                                                                

+--------------------+-----------------+----------------------+----------+--------+---------+--------+--------------------+--------------------+--------------------+-----------+-------------------+--------------------+--------------------+---------------+----------------+--------+--------------------+-----------+-------------------+----------------+---------------------+-------------+-------------------+-------------+------------------+---------------+--------------------+--------------------+--------------------+-------------+------+-----------+----------------+-------------------+---------+-----------+--------------------+--------------------+-------------+------+--------------+-----+--------------------+-----+----------+---------------+--------------------+---------------+--------------------+------------+--------------------+------------+--------------------+------+--------------------+------+--------------------+------+--------------------+------+--------------------+------+------

In [11]:
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.sql.functions import col, pow as spark_pow
from pyspark.ml.regression import LinearRegression

# Initialize Spark Session (getOrCreate() returns the existing session if one
# is already running, otherwise it builds a new one)
spark = SparkSession.builder.appName("SalaryPrediction").getOrCreate()

# Assuming df is your existing DataFrame
# Step 1: Normalise REMOTE_TYPE_NAME before cleaning
#   null, "[None]", "Not Remote", "Onsite" -> "Onsite"
#   "Hybrid Remote"                         -> "Hybrid"
#   anything else                           -> kept, with surrounding whitespace removed
from pyspark.sql.functions import when, trim

# Trim once and use the trimmed value in every branch, so a label padded with
# whitespace cannot slip through as its own category.
remote_type = trim(col('REMOTE_TYPE_NAME'))
onsite_labels = ['[None]', 'Not Remote', 'Onsite']

df_processed = df.withColumn(
    'REMOTE_TYPE_NAME',
    when(remote_type.isNull() | remote_type.isin(onsite_labels), 'Onsite')
    .when(remote_type == 'Hybrid Remote', 'Hybrid')
    .otherwise(remote_type),
)

print("=== REMOTE_TYPE_NAME VALUE COUNTS AFTER COMBINING ===")
df_processed.groupBy('REMOTE_TYPE_NAME').count().orderBy('count', ascending=False).show()

# Step 2: Drop rows with missing values in target and key features
selected_columns = [
    "MIN_YEARS_EXPERIENCE",
    "MAX_YEARS_EXPERIENCE",
    "SALARY_FROM",
    "MSA_NAME",
    "REMOTE_TYPE_NAME",
    "SALARY",
]

# Keep only the modelling columns, then discard every row that has a missing
# value in any of them.
df_clean = df_processed.select(*selected_columns).na.drop()

# Report how many rows survive the cleaning, followed by the reduced schema.
print("Original DataFrame count:", df.count())
print("Cleaned DataFrame count:", df_clean.count())
print("\nCleaned DataFrame Schema:")
df_clean.printSchema()

# Step 2b: Add the square of MIN_YEARS_EXPERIENCE as an extra (polynomial) feature
df_clean = df_clean.withColumn(
    "MIN_YEARS_EXPERIENCE_SQ",
    spark_pow(col("MIN_YEARS_EXPERIENCE"), 2),
)

# Preview the first rows including the new column.
print("\nDataFrame with squared feature:")
df_clean.show(5)

# Step 3: Create Pipeline for encoding and feature assembly

# --- Categorical columns: string -> numeric label index ---
msa_indexer = StringIndexer(
    inputCol="MSA_NAME",
    outputCol="MSA_NAME_INDEX",
    handleInvalid="keep",
)
remote_indexer = StringIndexer(
    inputCol="REMOTE_TYPE_NAME",
    outputCol="REMOTE_TYPE_NAME_INDEX",
    handleInvalid="keep",
)

# --- Label index -> one-hot vector (dropLast=True on both encoders) ---
msa_encoder = OneHotEncoder(
    inputCol="MSA_NAME_INDEX",
    outputCol="MSA_NAME_VEC",
    dropLast=True,
)
remote_encoder = OneHotEncoder(
    inputCol="REMOTE_TYPE_NAME_INDEX",
    outputCol="REMOTE_TYPE_NAME_VEC",
    dropLast=True,
)

# --- Feature lists ---
# Basic model inputs.
feature_cols = [
    "MIN_YEARS_EXPERIENCE",
    "MAX_YEARS_EXPERIENCE",
    "SALARY_FROM",
    "MSA_NAME_VEC",
    "REMOTE_TYPE_NAME_VEC",
]
# Polynomial model inputs: the basic list with the squared experience term
# inserted directly after MIN_YEARS_EXPERIENCE.
poly_feature_cols = [
    feature_cols[0],
    "MIN_YEARS_EXPERIENCE_SQ",
    *feature_cols[1:],
]

# --- Assemblers: one output vector per feature list ---
assembler = VectorAssembler(
    inputCols=feature_cols,
    outputCol="features",
    handleInvalid="keep",
)
poly_assembler = VectorAssembler(
    inputCols=poly_feature_cols,
    outputCol="features_poly",
    handleInvalid="keep",
)

# Create Pipeline: indexers first, then encoders, then both assemblers.
pipeline = Pipeline(
    stages=[
        msa_indexer,
        remote_indexer,
        msa_encoder,
        remote_encoder,
        assembler,
        poly_assembler,
    ]
)

# Fit and transform the data
pipeline_model = pipeline.fit(df_clean)
df_transformed = pipeline_model.transform(df_clean)

# Show the raw inputs next to both assembled vectors, untruncated.
print("\nTransformed DataFrame with all features:")
df_transformed.select(
    "MIN_YEARS_EXPERIENCE",
    "MIN_YEARS_EXPERIENCE_SQ",
    "MAX_YEARS_EXPERIENCE",
    "SALARY_FROM",
    "MSA_NAME",
    "REMOTE_TYPE_NAME",
    "SALARY",
    "features",
    "features_poly",
).show(5, truncate=False)

# Step 4: Split data into training and testing sets
# Using 70-30 split for better evaluation capability
# Justification:
# - 70% training: Provides sufficient data for model to learn patterns
# - 30% testing: Larger test set gives more reliable performance metrics
# - Good balance for datasets with moderate size (thousands of records)
# - Seed=42 ensures reproducibility across runs
train_data, test_data = df_transformed.randomSplit([0.7, 0.3], seed=42)

# Every count() launches a Spark job, so evaluate each count exactly once and
# reuse the numbers below instead of re-running the jobs inside the f-strings.
train_count = train_data.count()
test_count = test_data.count()
total_count = df_transformed.count()

# Guard the percentage against an empty DataFrame (avoids ZeroDivisionError).
train_pct = train_count / total_count * 100 if total_count else 0.0
test_pct = test_count / total_count * 100 if total_count else 0.0

print("\n=== DATA SPLIT SUMMARY ===")
print(f"Training set count: {train_count} ({train_pct:.1f}%)")
print(f"Testing set count: {test_count} ({test_pct:.1f}%)")
print("Random seed: 42 (for reproducibility)")
print("\nSplit Justification:")
print("• 70-30 split provides robust model evaluation")
print("• Larger test set (30%) improves confidence in performance metrics")
print("• Balanced approach for moderate-sized datasets")
print("• Alternative splits: 80-20 for large datasets, 60-40 for small datasets")

# Step 5: Show final structure
print("\n=== FINAL DATAFRAME STRUCTURE ===")
df_transformed.printSchema()

# Numeric columns shared by the sample preview and the summary statistics.
numeric_cols = [
    "MIN_YEARS_EXPERIENCE",
    "MIN_YEARS_EXPERIENCE_SQ",
    "MAX_YEARS_EXPERIENCE",
    "SALARY_FROM",
    "SALARY",
]

# First ten rows: numeric columns plus the assembled polynomial feature vector.
print("\n=== SAMPLE OF FINAL DATA ===")
df_transformed.select(*numeric_cols, "features_poly").show(10)

# Display feature statistics
print("\n=== FEATURE STATISTICS ===")
df_transformed.select(*numeric_cols).describe().show()

# Optional: Show unique values in categorical columns
print("\n=== CATEGORICAL VARIABLE COUNTS ===")
for categorical_col in ("MSA_NAME", "REMOTE_TYPE_NAME"):
    n_distinct = df_clean.select(categorical_col).distinct().count()
    print(f"{categorical_col} unique values:", n_distinct)

# Save transformed data for future use (optional)
# df_transformed.write.parquet("transformed_salary_data.parquet", mode='overwrite')

print("\n✓ Data preprocessing pipeline completed successfully!")
print("✓ Ready for model training with 'features_poly' as input and 'SALARY' as target")

=== REMOTE_TYPE_NAME VALUE COUNTS AFTER COMBINING ===


                                                                                

+----------------+-----+
|REMOTE_TYPE_NAME|count|
+----------------+-----+
|          Onsite|57741|
|          Remote|12497|
|          Hybrid| 2260|
+----------------+-----+



                                                                                

Original DataFrame count: 72498


                                                                                

Cleaned DataFrame count: 3596

Cleaned DataFrame Schema:
root
 |-- MIN_YEARS_EXPERIENCE: integer (nullable = true)
 |-- MAX_YEARS_EXPERIENCE: integer (nullable = true)
 |-- SALARY_FROM: integer (nullable = true)
 |-- MSA_NAME: string (nullable = true)
 |-- REMOTE_TYPE_NAME: string (nullable = true)
 |-- SALARY: integer (nullable = true)


DataFrame with squared feature:
+--------------------+--------------------+-----------+--------------------+----------------+------+-----------------------+
|MIN_YEARS_EXPERIENCE|MAX_YEARS_EXPERIENCE|SALARY_FROM|            MSA_NAME|REMOTE_TYPE_NAME|SALARY|MIN_YEARS_EXPERIENCE_SQ|
+--------------------+--------------------+-----------+--------------------+----------------+------+-----------------------+
|                   2|                   2|      79500|New York-Newark-J...|          Onsite| 92962|                    4.0|
|                   2|                   2|      75026|         Jackson, MS|          Onsite| 75026|                    4.0|
| 

                                                                                


Transformed DataFrame with all features:
+--------------------+-----------------------+--------------------+-----------+-------------------------------------+----------------+------+-----------------------------------------------+-----------------------------------------------------+
|MIN_YEARS_EXPERIENCE|MIN_YEARS_EXPERIENCE_SQ|MAX_YEARS_EXPERIENCE|SALARY_FROM|MSA_NAME                             |REMOTE_TYPE_NAME|SALARY|features                                       |features_poly                                        |
+--------------------+-----------------------+--------------------+-----------+-------------------------------------+----------------+------+-----------------------------------------------+-----------------------------------------------------+
|2                   |4.0                    |2                   |79500      |New York-Newark-Jersey City, NY-NJ-PA|Onsite          |92962 |(217,[0,1,2,3,214],[2.0,2.0,79500.0,1.0,1.0])  |(218,[0,1,2,3,4,215],[2.0,4.0,2.0,795

                                                                                

Training set count: 2574 (71.6%)


                                                                                

Testing set count: 1022 (28.4%)
Random seed: 42 (for reproducibility)

Split Justification:
• 70-30 split provides robust model evaluation
• Larger test set (30%) improves confidence in performance metrics
• Balanced approach for moderate-sized datasets
• Alternative splits: 80-20 for large datasets, 60-40 for small datasets

=== FINAL DATAFRAME STRUCTURE ===
root
 |-- MIN_YEARS_EXPERIENCE: integer (nullable = true)
 |-- MAX_YEARS_EXPERIENCE: integer (nullable = true)
 |-- SALARY_FROM: integer (nullable = true)
 |-- MSA_NAME: string (nullable = true)
 |-- REMOTE_TYPE_NAME: string (nullable = true)
 |-- SALARY: integer (nullable = true)
 |-- MIN_YEARS_EXPERIENCE_SQ: double (nullable = true)
 |-- MSA_NAME_INDEX: double (nullable = false)
 |-- REMOTE_TYPE_NAME_INDEX: double (nullable = false)
 |-- MSA_NAME_VEC: vector (nullable = true)
 |-- REMOTE_TYPE_NAME_VEC: vector (nullable = true)
 |-- features: vector (nullable = true)
 |-- features_poly: vector (nullable = true)


=== SAMPLE OF FI

                                                                                

+-------+--------------------+-----------------------+--------------------+------------------+------------------+
|summary|MIN_YEARS_EXPERIENCE|MIN_YEARS_EXPERIENCE_SQ|MAX_YEARS_EXPERIENCE|       SALARY_FROM|            SALARY|
+-------+--------------------+-----------------------+--------------------+------------------+------------------+
|  count|                3596|                   3596|                3596|              3596|              3596|
|   mean|  3.6384872080088986|       18.9972191323693|  3.6384872080088986| 91714.32619577309| 107798.5881535039|
| stddev|   2.400048294044211|     24.190746240439758|   2.400048294044211|32683.349277662266|36636.119374840724|
|    min|                   0|                    0.0|                   0|             14000|             31640|
|    max|                  12|                  144.0|                  12|            324000|            338750|
+-------+--------------------+-----------------------+--------------------+-------------

                                                                                

MSA_NAME unique values: 211


[Stage 160:>                                                        (0 + 1) / 1]

REMOTE_TYPE_NAME unique values: 3

✓ Data preprocessing pipeline completed successfully!
✓ Ready for model training with 'features_poly' as input and 'SALARY' as target


                                                                                