# Objectives {.unnumbered}

By the end of this lab, you will:
1. Load and analyze the **Lightcast dataset** in **Spark DataFrame**.
2. Create **five easy and three medium-complexity visualizations** using **Plotly**.
3. Explore **salary distributions, employment trends, and job postings**.
4. Analyze **skills in relation to NAICS/SOC/ONET codes and salaries**.
5. Customize **colors, fonts, and styles** in all visualizations (**default themes result in a 2.5-point deduction**).
6. Follow **best practices for reporting on data communication**.

# Step 1: Load the Dataset {.unnumbered}


In [1]:

import pandas as pd
import plotly.express as px
import plotly.io as pio
pio.renderers.default = "vscode"
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count, avg, stddev, min, max, to_date, explode, from_json, regexp_replace, split, expr, percentile_approx



# Initialize Spark Session
spark = SparkSession.builder.appName("LightcastData").getOrCreate()


# Load Data
df = spark.read.option("header", "true").option("inferSchema", "true").option("multiLine","true").option("escape", "\"").csv("lightcast_job_postings.csv")

# Show Schema and Sample Data
df.printSchema()
df.show(5)


25/03/23 13:35:00 WARN Utils: Your hostname, CASSIEdeMacBook-Air.local resolves to a loopback address: 127.0.0.1; using 10.0.0.178 instead (on interface en0)
25/03/23 13:35:00 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/03/23 13:35:01 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
                                                                                

root
 |-- id: string (nullable = true)
 |-- last_updated_date: string (nullable = true)
 |-- last_updated_timestamp: string (nullable = true)
 |-- duplicates: integer (nullable = true)
 |-- posted: string (nullable = true)
 |-- expired: string (nullable = true)
 |-- duration: integer (nullable = true)
 |-- title_raw: string (nullable = true)
 |-- body: string (nullable = true)
 |-- modeled_expired: boolean (nullable = true)
 |-- modeled_duration: integer (nullable = true)
 |-- company: integer (nullable = true)
 |-- company_name: string (nullable = true)
 |-- company_raw: string (nullable = true)
 |-- company_is_staffing: boolean (nullable = true)
 |-- education_levels: string (nullable = true)
 |-- education_levels_name: string (nullable = true)
 |-- min_edu_levels: integer (nullable = true)
 |-- min_edu_levels_name: string (nullable = true)
 |-- max_edu_levels: integer (nullable = true)
 |-- max_edu_levels_name: string (nullable = true)
 |-- employment_type: integer (nullable = true)

25/03/23 13:35:05 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


+--------------------+-----------------+----------------------+----------+--------+---------+--------+--------------------+--------------------+---------------+----------------+--------+--------------------+-----------+-------------------+----------------+---------------------+--------------+-------------------+--------------+-------------------+---------------+--------------------+--------------------+--------------------+-------------+------+-----------+----------------+-------------------+---------+-----------+--------------------+--------------------+-------------+------+--------------+-----+--------------------+-----+----------+---------------+--------------------+---------------+--------------------+------------+--------------------+------------+--------------------+------+--------------------+------+--------------------+------+--------------------+------+--------------------+------+--------------------+------------------+-------------------+--------------------+-----------------

# Salary Distribution by Employment Type
- Identify salary trends across different employment types.
- **Filter the dataset**
  - Remove records where **salary is missing or zero**.
- **Aggregate Data**
  - Group by **employment type** and compute salary distribution.
- **Visualize results**
  - Create a **box plot** where:
    - **X-axis** = `EMPLOYMENT_TYPE_NAME`
    - **Y-axis** = `SALARY_FROM`
  - Customize **colors, fonts, and styles** to avoid a **2.5-point deduction**.
- **Explanation:** Write two sentences about what the graph reveals.



In [2]:

pdf = df.select("EMPLOYMENT_TYPE_NAME", "SALARY").toPandas()
fig = px.box(pdf, x="EMPLOYMENT_TYPE_NAME", y="SALARY", title="Salary Distribution by Employment Type", color_discrete_sequence=["#636EFA"])
fig.update_layout(font_family="Arial", title_font_size=16)
fig.show()

fig.write_image("_output/salary_distribution_byEmploymetTypes.svg")

                                                                                

# Salary Distribution by Industry
- Compare salary variations across industries.
- **Filter the dataset**
  - Keep records where **salary is greater than zero**.
- **Aggregate Data**
  - Group by **NAICS industry codes**.
- **Visualize results**
  - Create a **box plot** where:
    - **X-axis** = `NAICS2_NAME`
    - **Y-axis** = `SALARY_FROM`
  - Customize colors, fonts, and styles.
- **Explanation:** Write two sentences about what the graph reveals.

In [4]:


df_filtered_1 = df.dropna(subset=["NAICS2_NAME", "SALARY_FROM"])

summary_df = df_filtered_1.filter(col("SALARY_FROM") > 0) \
    .groupBy("NAICS2_NAME") \
    .agg(
        count("*").alias("count"),
        avg("SALARY_FROM").alias("avg_salary"),
        stddev("SALARY_FROM").alias("std_salary"),
        min("SALARY_FROM").alias("min_salary"),
        max("SALARY_FROM").alias("max_salary")
    )

summary_df = summary_df.filter(col("count") > 50)
summary_pd = summary_df.toPandas()

fig = px.bar(
    summary_pd.sort_values("avg_salary", ascending=False),
    x="NAICS2_NAME",
    y="avg_salary",
    title="Average Salary by Industry (SALARY_FROM > 0)",
    labels={"avg_salary": "Average Salary", "NAICS2_NAME": "Industry"},
)

fig.update_layout(
    xaxis_title="Industry",
    yaxis_title="Starting Salary (SALARY_FROM)",
    font=dict(family="Arial", size=12),
    showlegend=False,
    boxmode="group",
    yaxis=dict(
                tickvals=[50000, 65000, 80000, 95000, 110000, 125000, 140000, 155000],
                ticktext=["50K", "65K", "80K", "95K", "110K", "125K", "140K", "155K"],
                title="Starting Salary (SALARY_FROM)",
                range=[50000, 155000]
)

)


fig.show()

fig.write_image("_output/salary_distribution_ByIndustry.svg")


                                                                                

# Job Posting Trends Over Time
- Analyze how job postings fluctuate over time.
- **Aggregate Data**
  - Count job postings per **posted date (`POSTED`)**.
- **Visualize results**
  - Create a **line chart** where:
    - **X-axis** = `POSTED`
    - **Y-axis** = `Number of Job Postings`
  - Apply custom colors and font styles.
- **Explanation:** Write two sentences about what the graph reveals.

In [5]:


df_posted = df.dropna(subset=["POSTED"])

trend_df = df_posted.groupBy("POSTED").agg(count("*").alias("job_postings"))
trend_df = trend_df.orderBy("POSTED")

trend_pd = trend_df.toPandas()


fig = px.line(
    trend_pd,
    x="POSTED",
    y="job_postings",
    title="Job Posting Trends Over Time",
    labels={"POSTED": "Date", "job_postings": "Number of Job Postings"},
    line_shape="spline"  # Smooth line
)


fig.update_layout(
    font=dict(family="Arial", size=14),
    xaxis_title="Date",
    yaxis_title="Number of Job Postings",
    plot_bgcolor="white",
    hovermode="x unified",
    margin=dict(t=60, b=60, l=60, r=20)
)

fig.show()

fig.write_image("_output/Job_PostingTrend_OverTime.svg")

                                                                                

CodeCache: size=131072Kb used=27198Kb max_used=27198Kb free=103873Kb
 bounds [0x00000001089f8000, 0x000000010a4b8000, 0x00000001109f8000]
 total_blobs=10148 nmethods=9191 adapters=868
 compilation: disabled (not enough contiguous free space left)


# Top 10 Job Titles by Count
- Identify the most frequently posted job titles.
- **Aggregate Data**
  - Count the occurrences of each **job title (`TITLE_NAME`)**.
  - Select the **top 10 most frequent titles**.
- **Visualize results**
  - Create a **bar chart** where:
    - **X-axis** = `TITLE_NAME`
    - **Y-axis** = `Job Count`
  - Apply custom colors and font styles.
- **Explanation:** Write two sentences about what the graph reveals.

In [6]:

df_titles = df.dropna(subset=["TITLE_NAME"])


title_counts = df_titles.groupBy("TITLE_NAME") \
    .agg(count("*").alias("job_count"))


top_titles_df = title_counts.orderBy(col("job_count").desc()).limit(10)

top_titles_pd = top_titles_df.toPandas()

top_titles_pd = top_titles_pd.sort_values("job_count", ascending=True)

import plotly.express as px

fig = px.bar(
    top_titles_pd,
    x="job_count",
    y="TITLE_NAME",
    orientation='h',
    title="Top 10 Job Titles by Count",
    labels={"job_count": "Number of Job Postings", "TITLE_NAME": "Job Title"},
    text="job_count",
    color="job_count",
    color_continuous_scale="blues"
)

fig.update_layout(
    font=dict(family="Arial", size=14),
    plot_bgcolor="white",
    xaxis_title="Job Count",
    yaxis_title="Title Name",
    margin=dict(t=60, b=60, l=150, r=20)
)

fig.update_traces(textposition="outside")

fig.show()

fig.write_image("_output/Top10_JobTitles_By_Count.svg")



                                                                                

# Remote vs On-Site Job Postings
- Compare the proportion of remote and on-site job postings.
- **Aggregate Data**
  - Count job postings by **remote type (`REMOTE_TYPE_NAME`)**.
- **Visualize results**
  - Create a **pie chart** where:
    - **Labels** = `REMOTE_TYPE_NAME`
    - **Values** = `Job Count`
  - Apply custom colors and font styles.
- **Explanation:** Write two sentences about what the graph reveals.

In [7]:

df_remote = df.dropna(subset=["REMOTE_TYPE_NAME"])


remote_counts = df_remote.groupBy("REMOTE_TYPE_NAME") \
    .agg(count("*").alias("job_count"))

remote_counts_pd = remote_counts.toPandas()

fig = px.pie(
    remote_counts_pd,
    names="REMOTE_TYPE_NAME",
    values="job_count",
    title="Remote vs On-Site Job Postings",
    color_discrete_sequence=px.colors.sequential.Blues
)

fig.update_traces(
    textposition="inside",
    textinfo="percent+label",
    pull=[0.05]*len(remote_counts_pd) 
)

fig.update_layout(
    font=dict(family="Arial", size=18),
    showlegend=True
)

fig.show()

fig.write_image("_output/Remote_VS_OnsiteJob_Postings.svg")

                                                                                

# Skill Demand Analysis by Industry (Stacked Bar Chart)
- Identify which skills are most in demand in various industries.
- **Aggregate Data**
  - Extract **skills** from job postings.
  - Count occurrences of skills grouped by **NAICS industry codes**.
- **Visualize results**
  - Create a **stacked bar chart** where:
    - **X-axis** = `Industry`
    - **Y-axis** = `Skill Count`
    - **Color** = `Skill`
  - Apply custom colors and font styles.
- **Explanation:** Write two sentences about what the graph reveals.

In [8]:
df_cleaned = df.withColumn(
    "cert_str",
    regexp_replace(col("certifications_name"), r'^\[|\]$', '')
)

# Step 1: Remove extra quotes (") from the string
df_cleaned = df_cleaned.withColumn(
    "cert_str",
    regexp_replace(col("cert_str"), r'"', '')
)

# Step 2: Split the string into an array
df_cleaned = df_cleaned.withColumn(
    "cert_array",
    split(col("cert_str"), ",\\s*")
)

# Step 3: Explode the array into separate rows
df_exploded = df_cleaned.dropna(subset=["NAICS2_NAME", "cert_array"]) \
    .withColumn("certifications_name", explode(col("cert_array")))

cert_counts = df_exploded.groupBy("NAICS2_NAME", "certifications_name") \
    .count().withColumnRenamed("count", "cert_count")

cert_counts_pd = cert_counts.toPandas()


fig = px.bar(
    cert_counts_pd,
    x="NAICS2_NAME",
    y="cert_count",
    color="certifications_name",
    title="Top Certifications by Industry",
    labels={"NAICS2_NAME": "Industry", "cert_count": "Certification Count"},
)

fig.update_layout(
    barmode="stack",
    xaxis_tickangle=-45,
    font=dict(family="Arial", size=10),
    plot_bgcolor="white",
    margin=dict(t=60, b=120, l=60, r=20)
)

fig.show()

fig.write_image("_output/Skill_Demand_Analysis_By_Industry.svg")




                                                                                


# Salary Analysis by ONET Occupation Type (Bubble Chart)
- Analyze how salaries differ across ONET occupation types.
- **Aggregate Data**
  - Compute **median salary** for each occupation in the **ONET taxonomy**.
- **Visualize results**
  - Create a **bubble chart** where:
    - **X-axis** = `ONET_NAME`
    - **Y-axis** = `Median Salary`
    - **Size** = Number of job postings
  - Apply custom colors and font styles.
- **Explanation:** Write two sentences about what the graph reveals.

In [11]:
df_salary = df.dropna(subset=["onet_name", "salary"])


onet_salary_stats = df_salary.groupBy("onet_name").agg(
    percentile_approx("salary", 0.5).alias("median_salary"),
    count("*").alias("job_count")
)

onet_salary_pd = onet_salary_stats.toPandas()

fig = px.scatter(
    onet_salary_pd,
    x="onet_name",
    y="median_salary",
    size="job_count",
    color="median_salary",
    title="Median Salary by ONET Occupation",
    labels={
        "onet_name": "ONET Occupation",
        "median_salary": "Median Salary",
        "job_count": "Job Postings"
    },
    size_max=60
)

fig.show()

fig.write_image("_output/SalaryAnalysis_By_ONET_Occupation.svg")



                                                                                

# Career Pathway Trends (Sankey Diagram)
- Visualize job transitions between different occupation levels.
- **Aggregate Data**
  - Identify career transitions between **SOC job classifications**.
- **Visualize results**
  - Create a **Sankey diagram** where:
    - **Source** = `SOC_2021_2_NAME`
    - **Target** = `SOC_2021_3_NAME`
    - **Value** = Number of transitions
  - Apply custom colors and font styles.
- **Explanation:** Write two sentences about what the graph reveals.

In [12]:
import plotly.graph_objects as go

df_titles = df.filter(
    col("soc_2021_2_name").isNotNull() &
    col("soc_2021_3_name").isNotNull()
)

transitions = df_titles.groupBy(
    "soc_2021_2_name", "soc_2021_3_name"
).agg(count("*").alias("count"))

transitions_pd = transitions.toPandas()

all_labels = list(set(transitions_pd["soc_2021_2_name"]) | set(transitions_pd["soc_2021_3_name"]))

# Map labels to indices
label_to_index = {label: i for i, label in enumerate(all_labels)}

# Create Sankey source/
transitions_pd["source"] = transitions_pd["soc_2021_2_name"].map(label_to_index)
transitions_pd["target"] = transitions_pd["soc_2021_3_name"].map(label_to_index)

fig = go.Figure(data=[go.Sankey(
    node=dict(
        pad=15,
        thickness=20,
        line=dict(color="black", width=0.5),
        label=all_labels,
        color="cornflowerblue"
    ),
    link=dict(
        source=transitions_pd["source"],
        target=transitions_pd["target"],
        value=transitions_pd["count"]
    )
)])

fig.update_layout(
    title_text="Career Pathway Trends by SOC",
    font=dict(size=12)
)

fig.show()

fig.write_image("_output/Career_Pathway_Trends_By_SOC.svg")

                                                                                