# NYC Jobs Data Pipeline
Structured according to the SOLID and KISS principles.

## 1. Setup

In [None]:
import findspark
findspark.init()

import logging
import os
import re
import matplotlib.pyplot as plt
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, udf, year
from pyspark.sql.types import StringType

# Configure the root logger to emit records at INFO level and above.
logging.basicConfig(level=logging.INFO)
# Module-level logger named after this module.
logger = logging.getLogger(__name__)


## 2. Extraction

In [None]:
class SparkFactory:
    """Builds the SparkSession used by the pipeline."""

    @staticmethod
    def create(app_name="NYC Jobs Analysis"):
        """Return a SparkSession for ``app_name``.

        The session is obtained through ``getOrCreate()`` after setting
        ``spark.sql.adaptive.enabled`` to "true" and selecting the Kryo
        serializer.
        """
        settings = {
            "spark.sql.adaptive.enabled": "true",
            "spark.serializer": "org.apache.spark.serializer.KryoSerializer",
        }
        builder = SparkSession.builder.appName(app_name)
        for key, value in settings.items():
            builder = builder.config(key, value)
        return builder.getOrCreate()

class JobDataExtractor:
    """Reads the CSV file at ``input_path`` through the given Spark session."""

    def __init__(self, spark, input_path):
        # Session used to perform the read.
        self.spark = spark
        # Path of the CSV file to load.
        self.input_path = input_path

    def read(self):
        """Load the CSV into a DataFrame.

        Returns:
            The DataFrame produced by the CSV reader, or ``None`` (after
            logging a warning) when the input file is zero bytes long.
        """
        if os.path.getsize(self.input_path) == 0:
            logger.warning("Input file is empty.")
            return None

        # Header row, inferred schema, multi-line quoted fields (a doubled
        # quote escapes a quote), permissive parsing, Latin-1 decoding.
        csv_options = {
            "header": "true",
            "inferSchema": "true",
            "multiLine": "true",
            "quote": "\"",
            "escape": "\"",
            "mode": "PERMISSIVE",
            "encoding": "ISO-8859-1",
        }
        reader = self.spark.read
        for key, value in csv_options.items():
            reader = reader.option(key, value)

        df = reader.csv(self.input_path)
        logger.info("Raw data loaded successfully.")
        return df


## 3. Wrangling

In [None]:
class JobDataWrangler:
    """Holds a DataFrame and normalises its column names to lower camelCase."""

    def __init__(self, df):
        # DataFrame being wrangled; replaced by sanitize_column_names().
        self.df = df

    @staticmethod
    def _sanitize_name(name):
        """Convert one column name to lower camelCase.

        Every non-word character is treated as a separator, the first word is
        lower-cased and each following word is capitalised
        (``"Job ID"`` -> ``"jobId"``). A name that contains no word
        characters at all is returned unchanged rather than as whitespace.
        """
        parts = re.sub(r"[^\w]", " ", name).split()
        if not parts:
            return name
        head, *tail = parts
        return head.lower() + "".join(part.capitalize() for part in tail)

    def sanitize_column_names(self):
        """Rename every column with ``_sanitize_name`` and return the new DataFrame.

        All columns are renamed positionally in one ``toDF`` call. Renaming
        one column at a time by name could rename a column twice when a
        freshly sanitized name equals the original name of a later column.

        Returns:
            The renamed DataFrame, which is also stored on ``self.df``.
        """
        new_names = [self._sanitize_name(old_name) for old_name in self.df.columns]
        self.df = self.df.toDF(*new_names)
        return self.df


## 4. Profiling

In [None]:
def profile_data(df):
    """Print a quick profile of ``df``.

    Three sections are written in order, each under its own heading: the
    schema, the first five rows, and the ``describe()`` summary.
    """
    sections = (
        ("Schema:", lambda: df.printSchema()),
        ("Sample rows:", lambda: df.show(5)),
        ("Column Summary:", lambda: df.describe().show()),
    )
    for heading, render in sections:
        print(heading)
        render()


## 5. Transforming

In [None]:
# Ordered from highest degree to lowest: the first pattern that matches wins.
# Spelled-out terms match case-insensitively on word boundaries. Two-letter
# abbreviations (MA, MS, BA, BS, AA, AS, HS) only match in upper case, with
# optional dots, so that ordinary words and word fragments ("as", the "ma" in
# "diploma", the "ba" in "basic") are not mistaken for degrees.
# Known limitation: upper-case tokens with another meaning (e.g. "MS Office",
# or "AS" in all-caps text) still match.
_DEGREE_PATTERNS = tuple(
    (re.compile(pattern), label)
    for pattern, label in (
        (r"(?i:\b(?:ph\.?\s?d|doctorate|doctoral)\b)", "PhD"),
        (
            r"(?i:\bmaster(?:['\u2019]?s|\s+(?:of|degree))\b|\bm\.?b\.?a\b)"
            r"|\bM\.?[AS]\b",
            "Masters",
        ),
        (
            r"(?i:\bbachelor(?:['\u2019]?s|\s+(?:of|degree))\b|\bbaccalaureate\b)"
            r"|\bB\.?[AS]\b",
            "Bachelors",
        ),
        (
            r"(?i:\bassociate(?:['\u2019]?s|\s+(?:of|degree))\b)"
            r"|\bA\.?[AS]\b",
            "Associate",
        ),
        (
            r"(?i:\bhigh\s+school\b|\bdiploma\b|\bged\b)"
            r"|\bH\.?S\b",
            "High School",
        ),
    )
)


def classify_degree(text):
    """Return the highest degree level mentioned in ``text``.

    Args:
        text: Free-text qualification requirements, or ``None``.

    Returns:
        One of "PhD", "Masters", "Bachelors", "Associate", "High School",
        or "Other" when ``text`` is empty/``None`` or mentions no known degree.
    """
    if not text:
        return "Other"
    for pattern, label in _DEGREE_PATTERNS:
        if pattern.search(text):
            return label
    return "Other"

# Spark UDF wrapping classify_degree, declared with a StringType return type.
degree_udf = udf(classify_degree, StringType())

def transform_data(df):
    """Return ``df`` with three derived columns added.

    * ``avgSalary``: midpoint of ``salaryRangeFrom`` and ``salaryRangeTo``.
    * ``postingYear``: year extracted from ``postingDate``.
    * ``degreeLevel``: ``degree_udf`` applied to ``minimumQualRequirements``.
    """
    salary_midpoint = (col("salaryRangeFrom") + col("salaryRangeTo")) / 2
    return (
        df.withColumn("avgSalary", salary_midpoint)
        .withColumn("postingYear", year(col("postingDate")))
        .withColumn("degreeLevel", degree_udf(col("minimumQualRequirements")))
    )


## 6. Validation

In [None]:
def validate_data(df):
    """Check the transformed DataFrame and log success.

    The checks raise explicitly instead of using ``assert`` statements, which
    are removed when Python runs with ``-O`` and would then let invalid data
    through silently. The exception type and messages are unchanged.

    Raises:
        AssertionError: if any row has a null ``avgSalary``, or if the
            ``degreeLevel`` column is absent.
    """
    null_salary_rows = df.filter(col("avgSalary").isNull()).count()
    if null_salary_rows != 0:
        raise AssertionError("Missing avgSalary")
    if "degreeLevel" not in df.columns:
        raise AssertionError("degreeLevel not created")
    logger.info("Data validation passed.")


## 7. Loading

In [None]:
def save_data(df, path="output/jobs_cleaned.parquet"):
    """Write ``df`` to ``path`` as Parquet in overwrite mode, then log the path."""
    writer = df.write.mode("overwrite")
    writer.parquet(path)
    logger.info(f"Data saved to {path}")


## 8. Visualising

In [None]:
def visualize_data(df):
    """Show a bar chart of the mean ``avgSalary`` for each ``postingYear``.

    Only the two needed columns are converted to pandas; the averaging and
    plotting are done with pandas and matplotlib.
    """
    salaries = df.select("postingYear", "avgSalary").toPandas()
    yearly_mean = salaries.groupby("postingYear").mean()
    yearly_mean.plot(kind="bar", figsize=(8,5), legend=False)

    plt.title("Average Salary by Posting Year")
    plt.ylabel("Avg Salary")
    plt.xlabel("Year")
    plt.grid(True)
    plt.tight_layout()
    plt.show()


## 9. Unit Tests

In [None]:
import unittest

class TestDegreeClassifier(unittest.TestCase):
    """Checks classify_degree against representative qualification strings."""

    def test_classify_degree(self):
        # Each (input, expected label) pair is checked in order; the first
        # mismatch fails the test.
        expectations = [
            ("PhD in Computer Science", "PhD"),
            ("Master's in Business", "Masters"),
            ("Bachelor of Arts", "Bachelors"),
            ("High School Diploma", "High School"),
            (None, "Other"),
            ("some random text", "Other"),
        ]
        for text, expected_label in expectations:
            self.assertEqual(classify_degree(text), expected_label)

# Run the tests when executed as the main module. argv=[''] supplies a
# placeholder argument list instead of the process's own sys.argv, and
# exit=False stops unittest from calling sys.exit() afterwards — presumably so
# the tests can run inside a notebook kernel without terminating it.
if __name__ == "__main__":
    unittest.main(argv=[''], verbosity=2, exit=False)
