## Installing PySpark in Jupyter Notebook and other environments.

In [None]:


# Install findspark, pyspark and py4j into the kernel's Python environment
# (%pip, unlike !pip, targets the running kernel).
# NOTE(review): versions are unpinned, so re-running this cell later may install
# different releases — consider pinning, e.g. `%pip install pyspark==<version>`.
%pip install findspark
%pip install pyspark
%pip install py4j



### Import all dependencies for the Spark job

In [None]:

# Importing necessary libraries

import findspark
import logging
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, sum, avg, count, max, min, collect_list
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
from pyspark.sql import Row
from pyspark.sql import DataFrame
from pyspark.sql.functions import regexp_extract, col
from pyspark.sql.functions import year
from pyspark.sql.functions import col
from datetime import datetime




In [None]:
# NOTE(review): pyspark is already imported in the previous cell, so this call runs
# too late to influence which pyspark gets imported. It presumably only matters when
# pyspark comes from a local Spark install rather than from %pip — confirm whether
# it is needed at all, and if so move it above the pyspark imports.
findspark.init()

In [None]:
# Route INFO-level (and higher) log records to the notebook output.
logging.basicConfig(level=logging.INFO)

# Create the SparkSession, or reuse one that already exists. Hive support is
# enabled because the final cell persists results with saveAsTable.
spark = (
    SparkSession.builder
    .appName("data_engineering_task")
    .enableHiveSupport()
    .getOrCreate()
)

# Read the data and print the schema

In [None]:
# Load the raw NYC jobs CSV: the first row is the header, and Spark infers column types.
DATA_PATH = "/workspaces/data_engineering_task/dataset/nyc-jobs.csv"
data_file = (
    spark.read.format("csv")
    .option("header", "true")
    .option("inferSchema", "true")
    .option("delimiter", ",")
    .load(DATA_PATH)
)

# Report how many rows would remain after dropping exact duplicates.
# `data_file` itself is deliberately NOT de-duplicated here, so the KPI cells
# below still run on the raw data.
raw_row_count = data_file.count()
dedup_row_count = data_file.dropDuplicates().count()
print("count before remove duplicate", raw_row_count)
print("count after removed duplicate", dedup_row_count)

# Print the schema to understand the inferred column types.
data_file.printSchema()

In [None]:

# Summary statistics for the numeric columns.
data_file.describe(
    ["# Of Positions", "Salary Range From", "Salary Range To"]
).show()

# For each categorical column, show up to 10 of its distinct values.
categorical_columns = [
    "Agency",
    "Posting Type",
    "Title Code No",
    "Level",
    "Job Category",
    "Full-Time/Part-Time indicator",
    "Salary Frequency",
]
for column in categorical_columns:
    print(f"Distinct values in {column}:")
    distinct_values = data_file.select(column).distinct()
    distinct_values.show(10, truncate=False)

# Per-column null counts: cast each is-null flag to 0/1 and add the flags up.
null_count_exprs = [
    sum(col(c).isNull().cast("int")).alias(c)
    for c in data_file.columns
]
data_file.select(null_count_exprs).show()

# KPIs to be computed

In [None]:
# 1. Number of rows (job postings) per agency, busiest agencies first; top 10 shown.
agency_counts = (
    data_file.groupBy("Agency")
    .count()
    .orderBy(col("count").desc())
)
agency_counts.show(10, truncate=False)

In [None]:
# 2. Number of job postings per category (rows without a category are excluded); top 10 shown.
job_category_counts = (
    data_file.where(col("Job Category").isNotNull())
    .groupBy("Job Category")
    .count()
    .orderBy(col("count").desc())
)
job_category_counts.show(10, truncate=False)

In [None]:
# 3. Salary totals per job category, ordered by the summed lower salary bound.
# NOTE(review): these are sums over all postings, so they grow with the number of
# postings rather than describing the salary spread. avg/min/max per category may be
# what "distribution" intends. Rows may also mix different "Salary Frequency" values — confirm.
salary_distribution = (
    data_file.groupBy("Job Category")
    .agg(
        sum("Salary Range From").alias("Total Salary Range From"),
        sum("Salary Range To").alias("Total Salary Range To"),
        sum("# Of Positions").alias("Total Positions"),
    )
    .orderBy(col("Total Salary Range From").desc())
)
salary_distribution.show(10, truncate=False)




In [None]:
# 4. Relationship between the degree required and the salary offered.
# "Degree Type" is the word immediately before "degree" in "Minimum Qual Requirements"
# (first match only). [\w']+ keeps apostrophes, so "master's degree" yields "master's"
# rather than just "s".
data_with_degree = data_file.withColumn(
    "Degree Type",
    regexp_extract(col("Minimum Qual Requirements"), r"([\w']+)\s+degree", 1),
)

# Show the extracted values
data_with_degree.select("Degree Type").show(10, truncate=False)

# Compare salaries across degree types. regexp_extract returns "" when nothing
# matches, so those rows are excluded instead of forming an unlabeled group.
degree_salary = (
    data_with_degree
    .filter(col("Degree Type") != "")
    .groupBy("Degree Type")
    .agg(
        count("*").alias("Postings"),
        avg("Salary Range From").alias("Avg Salary Range From"),
        avg("Salary Range To").alias("Avg Salary Range To"),
    )
    .orderBy("Avg Salary Range To", ascending=False)
)
degree_salary.show(10, truncate=False)

In [None]:
# 5. Job posting(s) with the highest salary per agency.
# Find each agency's highest "Salary Range To", then join back to the postings so the
# result identifies the posting(s) offering it. Taking independent max() values per
# column could combine a "From" and a "To" from different postings. Ties keep every
# matching posting.
max_salary_per_agency = data_file.groupBy("Agency").agg(
    max("Salary Range To").alias("Max Salary Range To")
)
highest_salary_per_agency = (
    data_file.join(max_salary_per_agency, on="Agency", how="inner")
    .filter(col("Salary Range To") == col("Max Salary Range To"))
    .select(
        "Agency",
        "Title Code No",
        "Job Category",
        "Level",
        "Salary Range From",
        "Salary Range To",
        "Salary Frequency",
    )
    .distinct()  # the raw data is not de-duplicated at this point
    .orderBy("Salary Range To", ascending=False)
)
highest_salary_per_agency.show(10, truncate=False)

In [None]:
# 6. Average salary per agency for job postings from the last 2 years.
# "Last 2 years" is anchored on the most recent posting year present in the data,
# not today's date. With a latest year Y, postings from Y and Y - 1 are kept.
LOOKBACK_YEARS = 2

# A global aggregation returns exactly one row. The max is null only when no row
# has a usable "Posting Date".
current_year = data_file.agg(
    max(year(col("Posting Date"))).alias("Year")
).first()["Year"]
if current_year is None:
    raise ValueError(
        f"No non-null 'Posting Date' values found; cannot select the last {LOOKBACK_YEARS} years."
    )

# Keep postings dated in years (current_year - LOOKBACK_YEARS + 1) .. current_year.
filtered_data = data_file.filter(
    col("Posting Date").isNotNull()
    & (year(col("Posting Date")) > (current_year - LOOKBACK_YEARS))
)

# Average salary bounds per agency: one row per agency.
average_salary_per_agency = (
    filtered_data.groupBy("Agency")
    .agg({"Salary Range From": "avg", "Salary Range To": "avg"})
    .orderBy("avg(Salary Range To)", ascending=False)
)
average_salary_per_agency.show(10, truncate=False)

In [None]:
# 7. Highest-paid "skills" in the NYC job postings

def find_highest_paid_skills(df: DataFrame) -> DataFrame:
    """Rank keyword-derived skills by average salary.

    A "skill" is the single word immediately preceding "skill", "experience" or
    "knowledge" in the "Minimum Qual Requirements" text (first match per row only).
    This is a rough heuristic; generic words such as "of" in "years of experience"
    will also be captured.

    Args:
        df: Jobs DataFrame with "Minimum Qual Requirements", "Salary Range From"
            and "Salary Range To" columns.

    Returns:
        DataFrame with columns "Skill", "Avg Salary Range From" and
        "Avg Salary Range To", sorted by "Avg Salary Range To" descending.
    """
    df_with_skills = df.withColumn(
        "Skill",
        regexp_extract(col("Minimum Qual Requirements"), r"(\w+)\s(skill|experience|knowledge)", 1),
    )

    # regexp_extract returns "" (not null) when the pattern does not match. An
    # isNotNull() check alone would therefore keep every non-matching row under an
    # empty "Skill" group.
    df_with_skills = df_with_skills.filter(
        col("Skill").isNotNull() & (col("Skill") != "")
    )

    # Average salary bounds per extracted skill, highest upper bound first.
    skill_salary = df_with_skills.groupBy("Skill").agg(
        avg("Salary Range From").alias("Avg Salary Range From"),
        avg("Salary Range To").alias("Avg Salary Range To"),
    ).orderBy("Avg Salary Range To", ascending=False)

    return skill_salary

# Example usage
highest_paid_skills = find_highest_paid_skills(data_file)
highest_paid_skills.show(10, truncate=False)


In [None]:
def clean_dataset(df: DataFrame) -> DataFrame:
    """Drop exact duplicate rows, printing the row count before and after.

    Args:
        df: Raw jobs DataFrame.

    Returns:
        A new DataFrame with duplicate rows removed.
    """
    print(f"Count before removing duplicates: {df.count()}")
    df = df.dropDuplicates()
    print(f"Count after removing duplicates: {df.count()}")
    return df


def preprocess_columns(df: DataFrame) -> DataFrame:
    """Add derived columns "Year" (posting year) and "Degree Type".

    "Degree Type" is the word immediately before "degree" in
    "Minimum Qual Requirements", or "" when there is no match.
    """
    return (
        df.withColumn("Year", year(col("Posting Date")))
        .withColumn(
            "Degree Type",
            # [\w']+ so that "master's degree" yields "master's", not just "s".
            regexp_extract(col("Minimum Qual Requirements"), r"([\w']+)\s+degree", 1),
        )
    )


def wrangle_data(df: DataFrame, current_year: int, years: int = 2) -> DataFrame:
    """Keep postings from the last `years` posting years, ending at `current_year`.

    With the default years=2, rows dated in current_year - 1 or current_year are kept.
    Rows with a null "Posting Date" are dropped.

    Args:
        df: Jobs DataFrame with a "Posting Date" column.
        current_year: Most recent posting year to anchor the window on.
        years: Number of posting years to keep (inclusive of current_year).

    Returns:
        The filtered DataFrame.
    """
    return df.filter(
        col("Posting Date").isNotNull()
        & (year(col("Posting Date")) > (current_year - years))
    )


def transform_data(df: DataFrame) -> DataFrame:
    """Total salary bounds and positions per agency.

    Returns one row per agency with "Total Salary Range From",
    "Total Salary Range To" and "Total Positions", sorted by
    "Total Salary Range From" descending.
    """
    return df.groupBy("Agency").agg(
        sum("Salary Range From").alias("Total Salary Range From"),
        sum("Salary Range To").alias("Total Salary Range To"),
        sum("# Of Positions").alias("Total Positions"),
    ).orderBy("Total Salary Range From", ascending=False)


try:
    # Clean the dataset
    data_file = clean_dataset(data_file)

    # Preprocess columns (adds "Year" and "Degree Type")
    data_file = preprocess_columns(data_file)

    # Most recent posting year in the data. A global agg always returns one row;
    # the value is null only when no row has a usable posting date.
    current_year = data_file.agg(max("Year").alias("Year")).first()["Year"]
    if current_year is None:
        raise ValueError("No non-null 'Posting Date' values found; cannot determine the current year.")

    # Wrangle the data: keep the last 2 posting years
    filtered_data = wrangle_data(data_file, current_year)

    # Transform the data: per-agency totals
    final_data = transform_data(filtered_data)

    # Save the final transformed data as CSV (Spark writes a directory at this path).
    final_data.write.format("csv").mode("overwrite").option("header", "true").save("/workspaces/data_engineering_task/dataset/final_transformed_data.csv")

    # Write the final data to a Hive table, creating the database if needed.
    spark.sql("CREATE DATABASE IF NOT EXISTS sample_db")
    final_data.write.format("orc").mode("overwrite").saveAsTable("sample_db.final_transformed_data")

except Exception as e:
    # Log with the traceback, then re-raise so a failed pipeline does not look successful.
    logging.exception("An error occurred: %s", e)
    raise
finally:
    # Stop the Spark session
    spark.stop()

