In [None]:
import findspark
# NOTE(review): this call is presumably what makes `pyspark` importable for the
# import cell that follows — confirm it is still required in this environment.
findspark.init()

In [None]:
import re

import matplotlib.pyplot as plt
import pandas as pd
import seaborn as sns
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import col, when, count
from pyspark.sql.functions import regexp_replace
from pyspark.sql.functions import to_date
from pyspark.sql.functions import avg, lower
from pyspark.sql.functions import row_number, desc
from pyspark.sql.functions import current_date, add_months
from pyspark.sql.window import Window



# Build (or fetch, if one is already running) the Spark session for this
# notebook, registered under the application name "pyspark-1".
session_builder = SparkSession.builder.appName("pyspark-1")
spark = session_builder.getOrCreate()

ModuleNotFoundError: No module named 'pyspark'

### Read data

In [None]:
# Location of the job-postings CSV; edit here to point at another copy.
DATA_PATH = "/dataset/nyc-jobs.csv"

# Several columns are free text (Job Description, Minimum Qual Requirements,
# Preferred Skills, ...). With the default CSV options a quoted field that
# contains a doubled quote or a line break is split into extra columns/rows.
# NOTE(review): assumes the export uses RFC 4180 quoting ("" inside quoted
# fields) — confirm against the actual file.
df = spark.read.csv(
    DATA_PATH,
    header=True,
    quote='"',
    escape='"',
    multiLine=True,
)

# Every column is read as a string; numeric and date typing is applied by the
# cleaning functions defined further down.
df.printSchema()

root
 |-- Job ID: string (nullable = true)
 |-- Agency: string (nullable = true)
 |-- Posting Type: string (nullable = true)
 |-- # Of Positions: string (nullable = true)
 |-- Business Title: string (nullable = true)
 |-- Civil Service Title: string (nullable = true)
 |-- Title Code No: string (nullable = true)
 |-- Level: string (nullable = true)
 |-- Job Category: string (nullable = true)
 |-- Full-Time/Part-Time indicator: string (nullable = true)
 |-- Salary Range From: string (nullable = true)
 |-- Salary Range To: string (nullable = true)
 |-- Salary Frequency: string (nullable = true)
 |-- Work Location: string (nullable = true)
 |-- Division/Work Unit: string (nullable = true)
 |-- Job Description: string (nullable = true)
 |-- Minimum Qual Requirements: string (nullable = true)
 |-- Preferred Skills: string (nullable = true)
 |-- Additional Information: string (nullable = true)
 |-- To Apply: string (nullable = true)
 |-- Hours/Shift: string (nullable = true)
 |-- Work Locatio

### Check total records

In [5]:
# Only the last expression of a cell is displayed, so both numbers are gathered
# into one mapping. A gap between the two values means some Job IDs appear on
# more than one row.
total_rows = df.count()
distinct_job_ids = df.select("Job ID").distinct().count()

{"total_rows": total_rows, "distinct_job_ids": distinct_job_ids}


NameError: name 'df' is not defined

### Check Null % Per Column
# -----------------------------------------------------------
# Function: get_null_percentages
# Calculates percentage of null values for each column
# Helps identify columns needing cleaning or removal
# -----------------------------------------------------------

In [None]:

def get_null_percentages(df, total_rows=None):
    """Return a one-row DataFrame holding the percentage of NULLs per column.

    Args:
        df: Spark DataFrame to profile.
        total_rows: Row count of ``df``. Pass it when it is already known to
            avoid a second full count; when omitted it is computed here.

    Returns:
        A DataFrame with the same column names as ``df`` and a single row whose
        values are ``null_count / total_rows * 100``.
    """
    if total_rows is None:
        total_rows = df.count()

    # when(...) without otherwise() yields NULL for non-matching rows and
    # count() skips NULLs, so each expression counts only the NULL cells.
    return df.select([
        (count(when(col(c).isNull(), c)) / total_rows * 100).alias(c)
        for c in df.columns
    ])


total = df.count()

null_df = get_null_percentages(df, total)

# One "column: value" line per field is easier to read than a single very
# wide row.
null_df.show(vertical=True, truncate=False)


### Clean Salary Columns
# -----------------------------------------------------------
# Function: clean_salary
# Removes special characters from salary columns
# Converts them to numeric (double) for aggregation
# Necessary before performing statistical analysis
# -----------------------------------------------------------



def clean_salary(df):
    """Add numeric ``salary_from`` / ``salary_to`` columns to ``df``.

    Each source column has every character other than a digit or a decimal
    point removed, and the remaining text is cast to ``double``.

    Args:
        df: Spark DataFrame containing "Salary Range From" and
            "Salary Range To".

    Returns:
        A new DataFrame with the two extra columns; the source columns are
        left in place.
    """
    non_numeric = "[^0-9.]"
    column_pairs = (
        ("salary_from", "Salary Range From"),
        ("salary_to", "Salary Range To"),
    )

    cleaned = df
    for target, source in column_pairs:
        numeric_text = regexp_replace(col(source), non_numeric, "")
        cleaned = cleaned.withColumn(target, numeric_text.cast("double"))
    return cleaned


### Date Conversion

# -----------------------------------------------------------
# Function: convert_dates
# Converts posting date string into Spark date type
# Enables time-based filtering and analysis
# -----------------------------------------------------------



def convert_dates(df, date_format=None):
    """Add a ``posting_date`` column of Spark date type parsed from "Posting Date".

    Args:
        df: Spark DataFrame containing a "Posting Date" string column.
        date_format: Optional datetime pattern (for example ``"MM/dd/yyyy"``)
            describing how the source strings are written. When omitted the
            call is identical to ``to_date(col("Posting Date"))``.

    Returns:
        A new DataFrame with the extra ``posting_date`` column.
    """
    return df.withColumn(
        "posting_date",
        to_date(col("Posting Date"), date_format)
    )


### Feature Engineering – Average Salary
# -----------------------------------------------------------
# Function: add_avg_salary
# Creates a derived column representing midpoint salary
# Used for more realistic salary analytics
# -----------------------------------------------------------

In [None]:
def add_avg_salary(df):
    """Add ``avg_salary``, the midpoint of ``salary_from`` and ``salary_to``.

    Args:
        df: Spark DataFrame that already has ``salary_from`` and ``salary_to``.

    Returns:
        A new DataFrame with the extra ``avg_salary`` column.
    """
    lower_bound = col("salary_from")
    upper_bound = col("salary_to")
    midpoint = (lower_bound + upper_bound) / 2
    return df.withColumn("avg_salary", midpoint)


### Degree Encoding

# -----------------------------------------------------------
# Function: encode_degree
# Encodes education level into numeric scale:
# 0 = No degree mentioned
# 1 = Bachelor
# 2 = Master
# 3 = PhD
# Enables correlation analysis with salary
# -----------------------------------------------------------

In [None]:


def encode_degree(df):
    """Add ``degree_encoded``: the highest degree named in the requirements text.

    Encoding: 3 = PhD / doctorate, 2 = Master, 1 = Bachelor / baccalaureate,
    0 = none of these keywords found (including a NULL requirements field).
    Matching is a case-insensitive substring search, and the highest matching
    level wins because the branches are evaluated top-down.

    Args:
        df: Spark DataFrame containing "Minimum Qual Requirements".

    Returns:
        A new DataFrame with the extra integer ``degree_encoded`` column.
    """
    requirements = lower(col("Minimum Qual Requirements"))

    def mentions(*keywords):
        # OR together one substring test per spelling of the same degree level.
        condition = requirements.contains(keywords[0])
        for keyword in keywords[1:]:
            condition = condition | requirements.contains(keyword)
        return condition

    return df.withColumn(
        "degree_encoded",
        # Common alternative spellings are included so that "Ph.D." or
        # "baccalaureate degree" are not encoded as 0.
        when(mentions("phd", "ph.d", "doctorate", "doctoral"), 3)
        .when(mentions("master"), 2)
        .when(mentions("bachelor", "baccalaureate"), 1)
        .otherwise(0)
    )


### Extract Skills
# -----------------------------------------------------------
# Function: extract_skills
# Creates binary flags for selected technical skills
# Skill presence is detected via keyword matching in job description
# Enables salary comparison by skill demand
# -----------------------------------------------------------

In [None]:
def extract_skills(df, skills=None):
    """Add one 0/1 flag column ``skill_<name>`` per skill keyword.

    A skill is flagged when it occurs in the lower-cased "Job Description" as a
    stand-alone token, i.e. not directly preceded or followed by a letter or
    digit. Plain substring matching would flag "aws" inside "laws" or "draws".
    The trade-off is that compound words such as "mysql" or "pyspark" no longer
    count as "sql" / "spark"; pass them explicitly through ``skills`` if they
    should be tracked.

    Args:
        df: Spark DataFrame containing "Job Description".
        skills: Optional iterable of skill keywords. Defaults to
            python, sql, aws, spark, azure and hadoop.

    Returns:
        A new DataFrame with one extra integer column per skill.
    """
    if skills is None:
        skills = ["python", "sql", "aws", "spark", "azure", "hadoop"]

    # Lower-case once; every pattern below is matched against this expression.
    description = lower(col("Job Description"))

    for skill in skills:
        # re.escape keeps keywords containing regex metacharacters literal.
        pattern = rf"(?<![a-z0-9]){re.escape(skill.lower())}(?![a-z0-9])"
        df = df.withColumn(
            f"skill_{skill}",
            when(description.rlike(pattern), 1).otherwise(0)
        )
    return df


### Feature Removal

In [None]:
# The KPI and test cells below read salary_from, salary_to, avg_salary,
# posting_date, degree_encoded and the skill_* flags, so the feature functions
# defined above have to be applied before the raw salary columns are removed.
# The guard keeps this cell safe to re-run: once the raw columns are gone the
# derived ones are already present.
if "Salary Range From" in df.columns:
    for transform in (
        clean_salary,
        convert_dates,
        add_avg_salary,   # needs salary_from / salary_to from clean_salary
        encode_degree,
        extract_skills,
    ):
        df = transform(df)

# The raw text salary columns are superseded by salary_from / salary_to.
df = df.drop("Salary Range From", "Salary Range To")


### KPI 1 – Top 10 Job Categories
# -----------------------------------------------------------
# KPI 1: Top 10 Job Categories
# Groups dataset by job category and counts postings
# Sorted in descending order to identify highest demand categories
# -----------------------------------------------------------

In [None]:
# Count postings per category, then keep the ten largest groups.
category_counts = df.groupBy("Job Category").count()
top_10 = category_counts.orderBy(desc("count")).limit(10)

# At most ten rows, so collecting to pandas for plotting is cheap.
top_10_pd = top_10.toPandas()



In [None]:
# Horizontal bars: one bar per category, length = number of postings.
fig, ax = plt.subplots(figsize=(12, 6))
sns.barplot(data=top_10_pd, x="count", y="Job Category", ax=ax)

ax.set_title("Top 10 Job Categories by Number of Postings")
ax.set_xlabel("Number of Postings")
ax.set_ylabel("Job Category")
fig.tight_layout()
plt.show()


### KPI 2 – Salary Distribution per Category
# -----------------------------------------------------------
# Purpose:
# Analyze how salary varies across different job categories.
# Compute statistical metrics including:
#   - Mean salary
#   - Minimum salary
#   - Maximum salary
#   - Standard deviation
#
# Uses the engineered "avg_salary" feature for consistency.
# Spark aggregation ensures scalability on large datasets.
# -----------------------------------------------------------


In [None]:
top_categories = top_10_pd["Job Category"].tolist()

# Restrict to the ten most frequent categories once; both results below reuse it.
top_category_rows = df.filter(col("Job Category").isin(top_categories))

# Summary statistics per category, aggregated in Spark so that only one row
# per category reaches the driver.
salary_stats = (
    top_category_rows.groupBy("Job Category")
    .agg(
        F.mean("avg_salary").alias("mean_salary"),
        F.min("avg_salary").alias("min_salary"),
        F.max("avg_salary").alias("max_salary"),
        F.stddev("avg_salary").alias("stddev_salary"),
    )
    .orderBy(desc("mean_salary"))
)
salary_stats.show(truncate=False)

# The box plot needs the row-level values, so they are still collected.
salary_sample = (
    top_category_rows.select("Job Category", "avg_salary").toPandas()
)


In [None]:
# One box per category showing the spread of avg_salary.
fig, ax = plt.subplots(figsize=(12, 6))
sns.boxplot(data=salary_sample, x="Job Category", y="avg_salary", ax=ax)

# Category names are long; tilt them so they do not overlap.
for tick_label in ax.get_xticklabels():
    tick_label.set_rotation(45)

ax.set_title("Salary Distribution per Top Job Categories")
fig.tight_layout()
plt.show()


### KPI 3 – Correlation Degree vs Salary
# -----------------------------------------------------------
# Purpose:
# Determine whether higher educational qualifications
# are associated with higher salary ranges.
#
# Degree encoding scale:
#   0 = No degree mentioned
#   1 = Bachelor
#   2 = Master
#   3 = PhD
#
# Uses Spark's built-in correlation method for distributed computation.
# Result interpretation:
#   > 0  → Positive relationship
#   = 0  → No linear relationship
#   < 0  → Negative relationship
# -----------------------------------------------------------

In [None]:
# Pearson correlation between the encoded degree level and the midpoint
# salary, computed in Spark. Rows missing either value are dropped first so
# they cannot influence the coefficient.
degree_salary_corr = (
    df.dropna(subset=["degree_encoded", "avg_salary"])
      .stat.corr("degree_encoded", "avg_salary")
)
print(f"Pearson correlation (degree_encoded vs avg_salary): {degree_salary_corr:.3f}")

# Mean salary per degree level for the bar chart; sorted so that the bars
# appear in 0..3 order regardless of how Spark returns the groups.
degree_salary = (
    df.groupBy("degree_encoded")
      .agg(avg("avg_salary").alias("mean_salary"))
      .orderBy("degree_encoded")
)

degree_salary_pd = degree_salary.toPandas()


In [None]:
# One bar per encoded degree level, height = mean of avg_salary.
fig, ax = plt.subplots(figsize=(8, 5))
sns.barplot(data=degree_salary_pd, x="degree_encoded", y="mean_salary", ax=ax)

ax.set_title("Average Salary by Degree Level")
ax.set_xlabel("Degree Level (0=None, 1=Bachelor, 2=Master, 3=PhD)")
ax.set_ylabel("Average Salary")
plt.show()


### KPI 4 – Highest Salary per Agency
# -----------------------------------------------------------
# KPI 4: Highest Salary per Agency
# Uses window function to rank salaries within each agency
# Selects highest paying job per agency
# -----------------------------------------------------------


In [None]:

# Within each agency, order postings from the highest to the lowest upper
# salary bound.
window = Window.partitionBy("Agency").orderBy(desc("salary_to"))

# row_number() gives exactly one rank-1 row per agency, even when several
# postings share the same salary_to.
ranked_postings = df.withColumn("rank", row_number().over(window))
highest_salary = ranked_postings.where(col("rank") == 1)


In [None]:
# `highest_salary` (one top-paying posting per agency) comes from the previous
# cell. The pandas frame gets its own name so that `top_10_pd`, which the
# KPI 1 / KPI 2 cells read, is not overwritten.
top_10_highest = highest_salary.orderBy(desc("salary_to")).limit(10)
top_10_highest_pd = top_10_highest.toPandas()

fig, ax = plt.subplots(figsize=(12, 6))
sns.barplot(
    data=top_10_highest_pd,
    x="salary_to",
    y="Agency",
    ax=ax,
)

ax.set_title("Top 10 Highest Paying Job Postings per Agency")
ax.set_xlabel("Maximum Salary")
ax.set_ylabel("Agency")
fig.tight_layout()
plt.show()


### KPI 5 – Avg Salary per Agency (Last 2 Years)
# -----------------------------------------------------------
# KPI 5: Average Salary per Agency (Last 2 Years)
# Filters dataset dynamically using current date
# Aggregates average salary per agency
# -----------------------------------------------------------

In [None]:

# Size of the look-back window, in months, counted back from today.
LOOKBACK_MONTHS = 24

# The cutoff is evaluated by Spark at query time, so the window moves with the
# current date.
# NOTE(review): if the dataset has no postings this recent, `last_2_years` is
# empty and the chart below will have no bars — confirm the date range.
cutoff_date = add_months(current_date(), -LOOKBACK_MONTHS)

last_2_years = df.filter(col("posting_date") >= cutoff_date)

agency_avg = (
    last_2_years.groupBy("Agency")
                .agg(avg("avg_salary").alias("avg_salary_last_2_years"))
)


In [None]:
# `agency_avg` comes from the previous cell. Sort before limiting: without an
# explicit order, limit(10) returns ten arbitrary agencies rather than the ten
# best-paying ones.
top_10_agency_avg = (
    agency_avg.orderBy(desc("avg_salary_last_2_years")).limit(10)
)
agency_avg_pd = top_10_agency_avg.toPandas()

fig, ax = plt.subplots(figsize=(12, 6))
sns.barplot(
    data=agency_avg_pd,
    x="avg_salary_last_2_years",
    y="Agency",
    ax=ax,
)

ax.set_title("Top 10 Agencies by Avg Salary (Last 2 Years)")
ax.set_xlabel("Average Salary")
ax.set_ylabel("Agency")
fig.tight_layout()
plt.show()


### KPI 6 – Highest Paid Skills
# -----------------------------------------------------------
# KPI 6: Highest Paid Skills
# Calculates average salary for postings containing each skill
# Results used for skill ranking visualization
# -----------------------------------------------------------

In [None]:
# Take the skill list from the flag columns actually present on `df`, so this
# KPI always covers every skill that extract_skills produced (the previous
# hard-coded list left out "hadoop").
skills = [c[len("skill_"):] for c in df.columns if c.startswith("skill_")]

# One Spark job for all skills: for each flag, average avg_salary over the
# rows where the flag is 1. Rows where the flag is 0 become NULL and are
# ignored by avg().
skill_avg_row = df.agg(*[
    avg(when(col(f"skill_{skill}") == 1, col("avg_salary"))).alias(skill)
    for skill in skills
]).first()

# (skill, average salary) pairs; the average is None when no posting mentions
# the skill.
skill_salary = [(skill, skill_avg_row[skill]) for skill in skills]

spark.createDataFrame(skill_salary, ["Skill","Avg Salary"]) \
     .orderBy(col("Avg Salary").desc()) \
     .show()


In [None]:
# `skill_salary` is the list of (skill, average salary) tuples built in the
# previous cell. It is already tiny and on the driver, so the plotting frame
# is built directly in pandas, highest-paid skill first.
skill_salary_pd = (
    pd.DataFrame(skill_salary, columns=["Skill", "Avg Salary"])
      .sort_values("Avg Salary", ascending=False)
)

fig, ax = plt.subplots(figsize=(10, 6))
sns.barplot(
    data=skill_salary_pd,
    x="Avg Salary",
    y="Skill",
    ax=ax,
)

ax.set_title("Highest Paid Skills in NYC Job Market")
ax.set_xlabel("Average Salary")
ax.set_ylabel("Skill")
fig.tight_layout()
plt.show()


### Test Cases


In [None]:
def test_no_negative_salary(df):
    """Assert that no row of ``df`` has an ``avg_salary`` below zero."""
    negative_rows = df.filter(col("avg_salary") < 0)
    negative_count = negative_rows.count()
    assert negative_count == 0

def test_salary_columns_numeric(df):
    assert dict(df.dtypes)["salary_from"] == "double"

def test_no_duplicate_job_ids(df):
    total = df.count()
    distinct = df.select("Job ID").distinct().count()
    assert total == distinct

def test_degree_encoded_exists(df):
    assert "degree_encoded" in df.columns

def test_date_conversion(df):
    """Assert that ``posting_date`` is stored as a Spark ``date`` column."""
    actual = dict(df.dtypes)["posting_date"]
    assert actual == "date", f"posting_date should be date, found {actual!r}"


def run_data_quality_checks(df):
    """Run every check defined in this cell against ``df`` and report the outcome.

    A failed assertion is reported and recorded rather than raised, so one
    failing check does not hide the results of the others. Any other exception
    (for example a missing column) still propagates.

    Returns:
        dict mapping each check's function name to True (passed) or False.
    """
    checks = (
        test_no_negative_salary,
        test_salary_columns_numeric,
        test_no_duplicate_job_ids,
        test_degree_encoded_exists,
        test_date_conversion,
    )
    results = {}
    for check in checks:
        try:
            check(df)
        except AssertionError as failure:
            results[check.__name__] = False
            print(f"FAIL {check.__name__}: {failure}")
        else:
            results[check.__name__] = True
            print(f"PASS {check.__name__}")
    return results


# The checks above were defined but never executed; run them on the
# feature-engineered frame.
run_data_quality_checks(df)
