# Assignment 03

Julio Vargas (Boston University)  
September 21, 2025

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import col, split, explode, regexp_replace, transform, when
from pyspark.sql.functions import col, monotonically_increasing_id
from pyspark.sql.types import StructType  # to/from JSON

import json
import re
import numpy as np
import pandas as pd

import plotly.express as px
import plotly.io as pio
import plotly.graph_objects as go


np.random.seed(30)  # set a fixed seed for reproducibility
pio.renderers.default = "notebook"
# Initialize Spark Session
spark = SparkSession.builder.appName("JobPostingsAnalysis").getOrCreate()
# Load schema from JSON file
with open("data/schema_lightcast.json") as f:
    schema = StructType.fromJson(json.load(f))

# Load Data
df = (spark.read
      .option("header", "true")
      .option("inferSchema", "false")
      .schema(schema)              # saved schema
      .option("multiLine", "true")
      .option("escape", "\"")
      .csv("data/lightcast_job_postings.csv"))
# Show Schema and Sample Data
#df.printSchema()  
#df.show(5)

Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/09/21 23:31:51 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [2]:
# Histogram of SALARY distribution
salary_df = df.filter(col("SALARY").isNotNull() & (col("SALARY") > 0))
fig = px.histogram(
    salary_df.toPandas(),
    x="SALARY",
    nbins=50,
    title="Salary Distribution"
)

fig.update_layout(bargap=0.1)
fig

25/09/21 23:31:59 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
25/09/21 23:32:12 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)/ 1]
java.lang.OutOfMemoryError: Java heap space
	at java.base/java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:64)
	at java.base/java.nio.ByteBuffer.allocate(ByteBuffer.java:363)
	at org.apache.spark.serializer.SerializerHelper$.$anonfun$serializeToChunkedBuffer$1(SerializerHelper.scala:40)
	at org.apache.spark.serializer.SerializerHelper$.$anonfun$serializeToChunkedBuffer$1$adapted(SerializerHelper.scala:40)
	at org.apache.spark.serializer.SerializerHelper$$$Lambda$3196/0x00007240ccf204b8.apply(Unknown Source)
	at org.apache.spark.util.io.ChunkedByteBufferOutputStream.allocateNewChunkIfNeeded(ChunkedByteBufferOutputStream.scala:87)
	at org.apache.spark.util.io.ChunkedByteBufferOutputStream.write(ChunkedByteBufferOutputStr

ConnectionRefusedError: [Errno 111] Connection refused

# 1. Data Preparation

In [None]:
# Step 1: Casting salary and experience columns
from pyspark.sql.functions import col

df = df.withColumn("SALARY", col("SALARY").cast("float")) \
       .withColumn("SALARY_FROM", col("SALARY_FROM").cast("float")) \
       .withColumn("SALARY_TO", col("SALARY_TO").cast("float")) \
       .withColumn("MIN_YEARS_EXPERIENCE", col("MIN_YEARS_EXPERIENCE").cast("float"))\
       .withColumn("MAX_YEARS_EXPERIENCE", col("MAX_YEARS_EXPERIENCE").cast("float"))

# Step 2: Computing medians for salary columns
def compute_median(sdf, col_name):
    q = sdf.approxQuantile(col_name, [0.5], 0.01)
    return q[0] if q else None

median_from = compute_median(df, "SALARY_FROM")
median_to = compute_median(df, "SALARY_TO")
median_salary = compute_median(df, "SALARY")

print("Medians:", median_from, median_to)

# Step 3: Imputing missing salaries, but not experience
df = df.fillna({
    "SALARY_FROM": median_from,
    "SALARY_TO": median_to
    "SALARY": median_salary
})

# Step 5: Computing average salary
df = df.withColumn("Average_Salary",
                   (col("SALARY_FROM") + col("SALARY_TO")) / 2)

# Step 6: Selecting required columns
export_cols = [
    "EDUCATION_LEVELS_NAME",
    "REMOTE_TYPE_NAME",
    "MAX_YEARS_EXPERIENCE",
    "Average_Salary",
    "SALARY",
    "LOT_V6_SPECIALIZED_OCCUPATION_NAME"
]
df_selected = df.select(*export_cols)

# Step 7: Saving to CSV
pdf = df_selected.toPandas()
pdf.to_csv("data/lightcast_cleaned.csv", index=False)

print("Data cleaning complete. Rows retained:", len(pdf))

## 1.1 Salary Distribution by Industry and Employment Type

-   Compare salary variations across industries.

**Filter the dataset** - Remove records where **salary is missing or
zero**.

**Aggregate Data** - Group by **NAICS industry codes** (e.g.,
`NAICS2_NAME`). - Group by **employment type** (`EMPLOYMENT_TYPE_NAME`)
and compute salary distribution. - Calculate **salary percentiles**
(25th, 50th, 75th) for each group.

**Visualize results** - Create a **box plot** where: - **X-axis** =
`NAICS2_NAME` - **Y-axis** = `SALARY_FROM`, or `SALARY_TO`, or
`SALARY` - **Group/color** = `EMPLOYMENT_TYPE_NAME` - Customize colors,
fonts, and styles.

**Explanation:** Write two sentences about what the graph reveals (e.g.,
median differences across industries and dispersion by employment type).