In [None]:
# findspark helps locate the local Spark installation.
import findspark

# Initialise it so that PySpark can be imported and used in this notebook.
findspark.init()

## Introduction


**NYC Jobs Dataset Analysis.** This notebook demonstrates a production-grade ETL pipeline using PySpark. We will:
- Profile source data 
- Clean & preprocess 
- Apply feature engineering 
- Remove irrelevant features 
- Store processed data 
- Compute KPIs
- Visualize results 
- Run test cases

### Set up Spark

In [None]:
from pyspark.sql import SparkSession

# Reuse the active session if there is one; otherwise start a new one
# under the application name "NYCJobsAnalysis".
spark = (
    SparkSession.builder
    .appName("NYCJobsAnalysis")
    .getOrCreate()
)

### Read data

In [None]:
# Load the raw CSV. Fields are double-quoted, a double quote also serves as the
# escape character, and multiLine lets a quoted field span several lines.
df_raw = spark.read.csv(
    "/dataset/nyc-jobs.csv",
    header=True,
    sep=",",
    quote='"',
    escape='"',
    multiLine=True,
)
df_raw.printSchema()
df_raw.show(5, truncate=False)

### Data Exploration

In [None]:
# Profiling: count nulls per column.
# All counts come from a single aggregation (one Spark job). The previous
# filter(...).count() per column launched one job per column, each re-reading
# the uncached CSV.
from pyspark.sql import functions as F

null_counts = df_raw.select(
    [
        # count() only counts non-null values, so this counts the rows where
        # the column is null; it yields 0 (not null) when there are none.
        F.count(F.when(df_raw[col_name].isNull(), 1)).alias(col_name)
        for col_name in df_raw.columns
    ]
).first()

# Same "<column> <null count>" output, one line per column, in schema order.
for col_name, n_nulls in zip(df_raw.columns, null_counts):
    print(col_name, n_nulls)

### Functions (Preprocessing + Feature Engineering)

In [None]:
from pyspark.sql.functions import col, when, year, to_timestamp, explode, split, trim, avg, max, min, count, dense_rank, row_number, current_date
from pyspark.sql.window import Window

# Preprocessing
def cast_columns(df):
    """Add numeric salary bounds to `df`.

    Creates `Salary_From` and `Salary_To` by casting "Salary Range From" and
    "Salary Range To" to double. The source columns are kept.
    """
    salary_from = col("Salary Range From").cast("double")
    salary_to = col("Salary Range To").cast("double")
    with_lower_bound = df.withColumn("Salary_From", salary_from)
    return with_lower_bound.withColumn("Salary_To", salary_to)

def normalize_salary(df):
    """Add `Salary_Normalized`: `Salary_To` scaled according to "Salary Frequency".

    "Annual" values are taken as-is; "Daily", "Weekly" and "Hourly" values are
    multiplied by 260, 52 and 2080 respectively. Any other frequency gives null.
    """
    frequency = col("Salary Frequency")
    upper_bound = col("Salary_To")

    normalized = when(frequency == "Annual", upper_bound)
    for label, periods in (("Daily", 260), ("Weekly", 52), ("Hourly", 2080)):
        normalized = normalized.when(frequency == label, upper_bound * periods)

    return df.withColumn("Salary_Normalized", normalized.otherwise(None))

def categorize_degree(df):
    """Add `Degree_Level`, derived from the "Minimum Qual Requirements" text.

    The regex patterns are tried in the order below and the first match
    decides the label; text matching none of them is labelled "Other".
    """
    requirements = col("Minimum Qual Requirements")
    remaining_rules = (
        ("Master", "Master"),
        ("Doctor|PhD", "Doctorate"),
        ("High school|equivalent", "High School"),
    )

    degree_level = when(requirements.rlike("baccalaureate|Bachelor"), "Bachelor")
    for pattern, label in remaining_rules:
        degree_level = degree_level.when(requirements.rlike(pattern), label)

    return df.withColumn("Degree_Level", degree_level.otherwise("Other"))

def extract_skills(df):
    """Explode "Preferred Skills" into one row per skill, in a trimmed `Skill` column.

    The text is split on bullets, commas and semicolons. The bullet is matched
    both as the real character (U+2022) and as "â€¢", which is how its UTF-8
    bytes appear after being mis-decoded as Windows-1252. Previously only the
    mis-decoded form was matched, so correctly decoded text was never split on
    bullets.

    Notes:
        - The result has one row per (input row, skill), so it must not be used
          for per-posting counts or averages.
        - `explode` produces no row for an input whose "Preferred Skills" is null.
        - Leading or adjacent delimiters leave empty-string skills after trimming.
    """
    # \u2022 is written as an escape so the pattern survives encoding mishaps
    # in this file itself.
    skill_delimiters = "â€¢|\u2022|,|;"
    # explode() cannot be nested inside another expression, hence two steps.
    exploded = df.withColumn(
        "Skill", explode(split(col("Preferred Skills"), skill_delimiters))
    )
    return exploded.withColumn("Skill", trim(col("Skill")))

def add_posting_year(df):
    """Add `Posting_Year`: the year of "Posting Date" parsed as yyyy-MM-dd'T'HH:mm:ss.SSS."""
    posting_ts = to_timestamp(col("Posting Date"), "yyyy-MM-dd'T'HH:mm:ss.SSS")
    return df.withColumn("Posting_Year", year(posting_ts))

def remove_unnecessary_features(df):
    """Return `df` without the columns listed below."""
    columns_to_drop = [
        "Recruitment Contact",
        "Additional Information",
        "To Apply",
        "Process Date",
        "Post Until",
        "Posting Updated",
        "Work Location 1",
        "Division/Work Unit",
        "Hours/Shift",
    ]
    return df.drop(*columns_to_drop)




### Functions (KPI)

In [None]:
# KPI Functions
def jobs_per_category(df, top_n=10):
    """Count rows per "Job Category" and keep the categories ranked within `top_n`.

    Ranking uses `dense_rank` over the descending count, so categories with
    equal counts share a rank and more than `top_n` rows can be returned.

    Returns:
        DataFrame with columns "Job Category", "Job_Count" and "rank",
        ordered by rank.
    """
    counts = df.groupBy("Job Category").agg(count("*").alias("Job_Count"))
    by_count_desc = Window.orderBy(col("Job_Count").desc())
    ranked = counts.withColumn("rank", dense_rank().over(by_count_desc))
    within_top = ranked.filter(col("rank") <= top_n)
    return within_top.orderBy(col("rank"))

def salary_distribution(df):
    """Average, minimum and maximum `Salary_Normalized` per "Job Category".

    Returns:
        DataFrame with "Job Category", "Avg_Salary", "Min_Salary" and
        "Max_Salary", ordered by average salary, highest first.
    """
    salary = "Salary_Normalized"
    # avg/min/max are the pyspark.sql.functions aggregates, not the builtins.
    stats = [
        avg(salary).alias("Avg_Salary"),
        min(salary).alias("Min_Salary"),
        max(salary).alias("Max_Salary"),
    ]
    per_category = df.groupBy("Job Category").agg(*stats)
    return per_category.orderBy(col("Avg_Salary").desc())

def degree_salary_correlation(df):
    """Average `Salary_Normalized` per `Degree_Level`, highest average first."""
    mean_salary = avg("Salary_Normalized").alias("Avg_Salary")
    per_degree = df.groupBy("Degree_Level").agg(mean_salary)
    return per_degree.orderBy(col("Avg_Salary").desc())

def highest_salary_per_agency(df):
    """Return, for each "Agency", the single row with the highest normalised salary.

    `Salary_To` and `Salary_Normalized` are re-derived from "Salary Range To"
    and "Salary Frequency", so the function also works on a frame that has not
    been through the preprocessing steps. The conversion is delegated to
    `normalize_salary` so the frequency multipliers are defined in one place
    instead of being duplicated here.

    When several rows of an agency share the top salary, `row_number` keeps
    one of them; which one is not specified.

    Returns:
        DataFrame with "Agency", "Business Title", "Job Category",
        "Salary Frequency", "Salary Range From", "Salary Range To" and
        "Salary_Normalized" -- one row per agency.
    """
    output_columns = [
        "Agency",
        "Business Title",
        "Job Category",
        "Salary Frequency",
        "Salary Range From",
        "Salary Range To",
        "Salary_Normalized",
    ]

    with_salary_to = df.withColumn("Salary_To", col("Salary Range To").cast("double"))
    df_norm = normalize_salary(with_salary_to).select(*output_columns)

    windowSpec = Window.partitionBy("Agency").orderBy(col("Salary_Normalized").desc())
    return (df_norm.withColumn("rank", row_number().over(windowSpec))
                   .filter(col("rank") == 1)
                   .select(*output_columns))

def avg_salary_last_2_years(df, reference_year=None, years=2):
    """Average `Salary_Normalized` per "Agency" over the most recent posting years.

    Args:
        df: frame with "Agency", "Posting_Year" and "Salary_Normalized".
        reference_year: last year of the window. Defaults to the latest
            `Posting_Year` present in `df`, so the result for a fixed dataset
            does not drift (or become empty) as the wall-clock date moves on.
            Pass the current calendar year to get the previous clock-based
            behaviour.
        years: how far back the window reaches; rows with
            `Posting_Year >= reference_year - years` are kept (the same bound
            as before, with `years` previously fixed at 2).

    Returns:
        DataFrame with "Agency" and "Avg_Salary", highest average first.
        Empty when `df` contains no non-null `Posting_Year`.

    Note:
        When `reference_year` is omitted, a small aggregation job runs
        immediately to look up the latest posting year.
    """
    if reference_year is None:
        # `max` is pyspark.sql.functions.max here (it shadows the builtin).
        # agg() without groupBy always returns exactly one row.
        reference_year = df.agg(max("Posting_Year")).first()[0]

    if reference_year is None:
        # No usable posting year at all: keep the schema, return no rows.
        recent_jobs = df.limit(0)
    else:
        recent_jobs = df.filter(col("Posting_Year") >= (reference_year - years))

    return (recent_jobs.groupBy("Agency")
                       .agg(avg("Salary_Normalized").alias("Avg_Salary"))
                       .orderBy(col("Avg_Salary").desc()))

def highest_paid_skills(df, top_n=10):
    """Return the `top_n` skills with the highest average `Salary_Normalized`.

    Expects one row per (posting, skill) with a `Skill` column. Rows whose
    skill is null or empty are ignored: splitting free text on delimiters
    leaves empty tokens (for example when the text starts with a bullet), and
    an unnamed "skill" must not take a place in the ranking.

    Returns:
        DataFrame with "Skill", "Avg_Salary" and "Max_Salary", ordered by
        average salary (highest first) and limited to `top_n` rows.
    """
    named_skills = df.filter(col("Skill").isNotNull() & (col("Skill") != ""))
    per_skill = named_skills.groupBy("Skill").agg(
        avg("Salary_Normalized").alias("Avg_Salary"),
        max("Salary_Normalized").alias("Max_Salary")
    )
    return per_skill.orderBy(col("Avg_Salary").desc()).limit(top_n)

### Apply Pipeline

In [None]:
df = cast_columns(df_raw)
df = normalize_salary(df)
df = categorize_degree(df)
df = extract_skills(df)
df = add_posting_year(df)
df = remove_unnecessary_features(df)
df.show(5, truncate=False)


### Store Processed Data

In [None]:
df.write.parquet("/dataset/nyc-jobs-processed.parquet", mode="overwrite")

### Run KPIs

In [None]:
jobs_per_category(df).show(truncate=False)
salary_distribution(df).show(truncate=False)
degree_salary_correlation(df).show(truncate=False)
highest_salary_per_agency(df).show(truncate=False)
avg_salary_last_2_years(df).show(truncate=False)
highest_paid_skills(df).show(truncate=False)

### Visualizations

In [None]:
import seaborn as sns
import matplotlib.pyplot as plt

# Top 10 job categories
top_categories = jobs_per_category(df).toPandas()
sns.barplot(x="Job_Count", y="Job Category", data=top_categories)
plt.title("Top 10 Job Categories by Posting Count")
plt.show()

# Average salary by degree level
salary_by_degree = degree_salary_correlation(df).toPandas()
sns.barplot(x="Degree_Level", y="Avg_Salary", data=salary_by_degree)
plt.title("Average Salary by Degree Level")
plt.show()


### Test Cases

In [None]:
# Salary normalization test
salary_case = spark.createDataFrame([("Hourly", 40.0)], ["Salary Frequency", "Salary_To"])
salary_row = normalize_salary(salary_case).collect()[0]
assert salary_row["Salary_Normalized"] == 40 * 2080, salary_row

# Degree categorization test
# Each row must be a tuple. ("text") is just a parenthesised string, for which
# createDataFrame cannot infer a schema; the trailing comma makes it a 1-tuple.
degree_case = spark.createDataFrame(
    [("A baccalaureate degree required",)], ["Minimum Qual Requirements"]
)
degree_row = categorize_degree(degree_case).collect()[0]
assert degree_row["Degree_Level"] == "Bachelor", degree_row

# Skill extraction test
skills_case = spark.createDataFrame(
    [("Excellent writing; Foreign language",)], ["Preferred Skills"]
)
skills = [row["Skill"] for row in extract_skills(skills_case).collect()]
assert "Excellent writing" in skills and "Foreign language" in skills, skills


## Conclusion

### Insights
- Most postings are concentrated in a few job categories.
- Salary distributions vary widely across categories.
- Higher degrees (Master/Doctorate) generally correlate with higher salaries.
- Certain agencies consistently post the highest-paying jobs.
- Skills like data analysis, project management, and specialized technical expertise are linked to higher salaries.
