In [39]:
import findspark
findspark.init()

In [40]:
from pyspark.sql import SparkSession, DataFrame
from collections import Counter
from pyspark.sql.functions import col, when, year, current_date, datediff, to_date, year, current_date
import datetime

spark = SparkSession.builder. \
    appName("pyspark-1"). \
    getOrCreate()

### Read data

In [41]:
df = spark.read.csv("/dataset/nyc-jobs.csv", header=True, inferSchema=True)
df.printSchema()

root
 |-- Job ID: integer (nullable = true)
 |-- Agency: string (nullable = true)
 |-- Posting Type: string (nullable = true)
 |-- # Of Positions: integer (nullable = true)
 |-- Business Title: string (nullable = true)
 |-- Civil Service Title: string (nullable = true)
 |-- Title Code No: string (nullable = true)
 |-- Level: string (nullable = true)
 |-- Job Category: string (nullable = true)
 |-- Full-Time/Part-Time indicator: string (nullable = true)
 |-- Salary Range From: double (nullable = true)
 |-- Salary Range To: double (nullable = true)
 |-- Salary Frequency: string (nullable = true)
 |-- Work Location: string (nullable = true)
 |-- Division/Work Unit: string (nullable = true)
 |-- Job Description: string (nullable = true)
 |-- Minimum Qual Requirements: string (nullable = true)
 |-- Preferred Skills: string (nullable = true)
 |-- Additional Information: string (nullable = true)
 |-- To Apply: string (nullable = true)
 |-- Hours/Shift: string (nullable = true)
 |-- Work Locat

In [42]:
all_columns = df.columns

# Count the occurrences of each column name
column_counts = Counter(all_columns)

# Filter for names that appear more than once
duplicate_columns = [col_name for col_name, count in column_counts.items() if count > 1]

print(duplicate_columns)

[]


### Step 1: Utility & Normalizing Functions

In [43]:
from pyspark.sql import DataFrame
from pyspark.sql.functions import col, when, year, current_date, datediff, count, when

# 1. Clean salary columns
def normalize_salary(df: DataFrame) -> DataFrame:
    """
    Normalize salary to annual salary.
    Assumes 2080 working hours/year for hourly postings.
    """
    return df.withColumn(
        "annual_salary",
        when(col("Salary Frequency")=="Hourly", col("Salary Range To")*2080)
        .otherwise(col("Salary Range To"))
    )

# 2. Create average salary column
def add_avg_salary(df: DataFrame) -> DataFrame:
    return df.withColumn("avg_salary", (col("Salary Range From")+col("Salary Range To"))/2)

# 3. Flag degree requirement
def add_degree_flag(df: DataFrame) -> DataFrame:
    return df.withColumn(
        "requires_degree",
        when(col("Minimum Qual Requirements").rlike("Master|PhD"), 1).otherwise(0)
    )

# 4. Add posting recency
def add_posting_recency(df: DataFrame) -> DataFrame:
    return df.withColumn("days_since_posting", datediff(current_date(), col("Posting Date")))

### Step 2: KPI Functions

In [44]:
# KPI 1: Top 10 job categories
def top_job_categories(df: DataFrame) -> DataFrame:
    return df.groupBy("Job Category").count().orderBy(col("count").desc()).limit(10)

# KPI 2: Salary distribution per category
def salary_distribution(df: DataFrame) -> DataFrame:
    return df.groupBy("Job Category").agg({"avg_salary":"avg"})

# KPI 3: Correlation between degree and salary
def degree_salary_correlation(df: DataFrame) -> float:
    return df.stat.corr("requires_degree", "annual_salary")

# KPI 4: Highest salary per agency
def highest_salary_per_agency(df: DataFrame) -> DataFrame:
    return df.groupBy("Agency").agg({"annual_salary":"max"})

# KPI 5: Average salary per agency (last 2 years)
def avg_salary_last2yrs(df: DataFrame) -> DataFrame:
    
    ## converting posting date column to date type first
    df = df.withColumn("Posting Date", to_date(col("Posting Date"), "MM/dd/yyyy"))
    
    ## applying logic to get average salary per agency for last 2 years
    df_recent = df.withColumn("year", year(col("Posting Date"))).filter(col("year") >= datetime.now().year - 1)
    return df_recent.groupBy("Agency").agg({"avg_salary":"avg"})

### Step 3: Visualization functions

In [45]:
import seaborn as sns
import matplotlib.pyplot as plt

def plot_top_categories(df: DataFrame):
    top10 = df.groupBy("Job Category").count().orderBy("count", ascending=False).limit(10)
    pdf = top10.toPandas()
    sns.barplot(x="Job Category", y="count", data=pdf)
    plt.xticks(rotation=90)
    plt.show()

def plot_salary_distribution(df: DataFrame):
    pdf = df.toPandas()
    sns.boxplot(x="Job Category", y="avg_salary", data=pdf)
    plt.xticks(rotation=90, ha="right")
    plt.show()


### Step 4: Feature Removal

In [46]:
# As we have added average salary and annual salary in the dataframe, we can now remove Salary From, Salary To and Salary Frequency
def drop_salary_features(df: DataFrame) -> DataFrame:
    cols_to_drop = ["Salary From", "Salary To", "Salary Frequency"]
    df = df.drop(*cols_to_drop)
    return df

# Drop sparse columns like those columns with more than 70 or 80 percent null values
def drop_sparse_columns(df: DataFrame) -> DataFrame:
    threshold = 0.7
    total_rows = df.count()
    null_counts = df.select([
        (count(when(col(c).isNull(), c)) / total_rows).alias(c)
        for c in df.columns
    ]).collect()[0].asDict()

    sparse_columns = [
        col_name for col_name, null_ratio in null_counts.items()
        if null_ratio > threshold
    ]

#     df = df.drop(*sparse_columns)
    print(sparse_columns)
    return df

### Step 5: Test Cases

In [47]:
def test_salary_normalization(df: DataFrame):
    assert df.filter(df["Salary Frequency"]=="Hourly").select("annual_salary").first()[0] > 0

def test_top_categories(df: DataFrame):
    top10 = top_job_categories(df).count()

### Step 6: Putting It All Together

In [48]:
def process_data(df: DataFrame) -> DataFrame:
    df = normalize_salary(df)
    df = add_avg_salary(df)
    df = add_degree_flag(df)
    df = add_posting_recency(df)
    df = drop_salary_features(df)
    df = drop_sparse_columns(df)
    return df

### executing the pipeline using above functions

In [49]:
df = process_data(df)

top_job_categories(df).show()
salary_distribution(df).show()
print("Correlation:", degree_salary_correlation(df))
highest_salary_per_agency(df).show()
avg_salary_last2yrs(df).show()

NameError: name 'total_rows' is not defined

### Visualizations

In [None]:
plot_top_categories(df)
plot_salary_distribution(df)

### Sample function

In [None]:
def get_salary_frequency(df) -> list:
    row_list = df.select('Salary Frequency').distinct().collect()
    return [row['Salary Frequency'] for row in row_list]

### Example of test function

In [None]:
mock_data = [('A', 'Annual'), ('B', 'Daily')]
expected_result = ['Annual', 'Daily']

In [None]:
def test_get_salary_frequency(mock_data: list, 
                              expected_result: list,
                              schema: list = ['id', 'Salary Frequency']):  
    mock_df = spark.createDataFrame(data = mock_data, schema = schema)
    assert get_salary_frequency(mock_df) == expected_result

In [None]:
print(get_salary_frequency(df))