In [None]:
# # One time run to mount the Azure storage account to Databricks file system

# # Getting credentials from Azure Key Vault through Databricks secret scope
# client_id = dbutils.secrets.get(scope="adzuna-project-db-secret-scope",key="adzuna-project-app-client-id")
# client_secret = dbutils.secrets.get(scope="adzuna-project-db-secret-scope",key="adzuna-project-app-client-secret")
# tenant_id = dbutils.secrets.get(scope="adzuna-project-db-secret-scope",key="adzuna-project-app-tenant-id")
# client_endpoint = f"https://login.microsoftonline.com/{tenant_id}/oauth2/token"

# # Mounting the storage account
# configs = {"fs.azure.account.auth.type": "OAuth",
# "fs.azure.account.oauth.provider.type": "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
# "fs.azure.account.oauth2.client.id": client_id,
# "fs.azure.account.oauth2.client.secret": client_secret,
# "fs.azure.account.oauth2.client.endpoint": client_endpoint}

# dbutils.fs.mount(
#     source = "abfss://raw-data@saadzunaproject.dfs.core.windows.net", # container@storageaccount
#     mount_point = "/mnt/adzuna_data",
#     extra_configs = configs)

In [None]:
# ETL pipeline code

from pyspark.sql import SparkSession
from delta.tables import DeltaTable
from pyspark.sql.functions import col, explode, current_timestamp, to_date
from pyspark.sql import functions as F

# Obtain the Spark session for this pipeline (getOrCreate returns the
# already-active session when there is one) under the app name below.
session_builder = SparkSession.builder.appName("adzuna-project")
spark = session_builder.getOrCreate()

# Storage locations used throughout the pipeline:
#   raw_data_path - directory the raw JSON files are read from
#   staging_path  - staging Delta table
#   final_path    - final (mart) Delta table
raw_data_path = "/mnt/adzuna_data/json/"
staging_path = "/mnt/delta/tables/staging/adzuna_jobs"
final_path = "/mnt/delta/tables/final/adzuna_jobs"

# ============================================================
# PART 1: Ingest Raw Data and Merge into Staging with elt_updated (timestamp)
# ============================================================

# Load every raw JSON document found under raw_data_path.
adzuna_jobs_raw = spark.read.format("json").load(raw_data_path)

# Nested field inside each exploded "item" -> flat column name in the batch.
item_field_aliases = [
    ("item.id", "id"),
    ("item.title", "title"),
    ("item.location.display_name", "location_display_name"),
    ("item.company.display_name", "company_display_name"),
    ("item.category.label", "category_label"),
    ("item.description", "description"),
    ("item.redirect_url", "redirect_url"),
    ("item.created", "created"),
]

# Flatten the raw documents into one row per job:
#   1. explode the "items" array so each element becomes its own row
#   2. project the nested fields onto the flat column names above
#   3. keep a single row per "id"
#   4. stamp every row with current_timestamp() in elt_updated
exploded_items = adzuna_jobs_raw.withColumn("item", explode("items"))
projected_jobs = exploded_items.select(
    *[col(field_path).alias(flat_name) for field_path, flat_name in item_field_aliases]
)
unique_jobs = projected_jobs.dropDuplicates(["id"])
adzuna_jobs_batch = unique_jobs.withColumn("elt_updated", current_timestamp())

# Merge (upsert) the new batch into the staging Delta table.
#
# elt_updated is the change marker the final-table load filters on, so it must
# only move forward when a row is inserted or when one of its payload columns
# actually changed. The raw directory is loaded in full on every run; without
# the change check below, every previously seen job would be re-stamped with
# the current time and therefore re-selected by the incremental filter.
staging_payload_columns = [
    "title",
    "location_display_name",
    "company_display_name",
    "category_label",
    "description",
    "redirect_url",
    "created",
]

if DeltaTable.isDeltaTable(spark, staging_path):
    # Target column -> source expression for every payload column.
    staging_source_values = {name: f"s.{name}" for name in staging_payload_columns}

    # True when at least one payload column differs between target (t) and
    # source (s). "<=>" is Spark SQL's null-safe equality: NULL <=> NULL is
    # true and NULL <=> value is false, so the condition never evaluates to NULL.
    staging_row_changed = " OR ".join(
        f"NOT (t.{name} <=> s.{name})" for name in staging_payload_columns
    )

    stagingDeltaTable = DeltaTable.forPath(spark, staging_path)
    stagingDeltaTable.alias("t").merge(
        adzuna_jobs_batch.alias("s"),
        "t.id = s.id"
    ).whenMatchedUpdate(
        # Leave unchanged rows (and their elt_updated) untouched.
        condition=staging_row_changed,
        set={**staging_source_values, "elt_updated": "current_timestamp()"}
    ).whenNotMatchedInsert(
        values={
            "id": "s.id",
            **staging_source_values,
            "elt_updated": "current_timestamp()",
        }
    ).execute()
else:
    # If the staging table does not exist yet, create it from the batch.
    # The batch already carries elt_updated, so every row is stamped.
    adzuna_jobs_batch.write.format("delta") \
        .mode("overwrite") \
        .save(staging_path)

# ============================================================
# PART 2: Incrementally Update the Final Table from Staging
#         (Process only records where elt_updated > max(elt_updated) in final)
# ============================================================

# Determine the high-water mark: the maximum elt_updated already present in
# the final table. The existence check is explicit so that genuine failures
# (permissions, storage outage, missing column) surface instead of being
# silently treated as "no final table yet" and triggering a full reprocess.
if DeltaTable.isDeltaTable(spark, final_path):
    final_df = spark.read.format("delta").load(final_path)
    # agg() without groupBy always yields exactly one row; max_elt is None
    # when the final table has no rows.
    max_elt_final = final_df.agg(F.max("elt_updated").alias("max_elt")).collect()[0]["max_elt"]
else:
    max_elt_final = None

# Read the entire staging Delta table
staging_df = spark.read.format("delta").load(staging_path)

# Filter staging records:
#   - If there is no high-water mark (final table missing or empty), process all records.
#   - Otherwise, process only records where elt_updated > max_elt_final.
if max_elt_final is not None:
    staging_incremental = staging_df.filter(col("elt_updated") > F.lit(max_elt_final))
else:
    staging_incremental = staging_df

# Transform the incremental staging records to the final (mart) schema.

# Staging column name -> final (mart) column name. Columns not listed here
# (elt_updated) keep their name and value.
staging_to_final_names = {
    "id": "job_id",
    "title": "job_title",
    "location_display_name": "job_location",
    "company_display_name": "company_name",
    "category_label": "job_category",
    "description": "job_description",
    "redirect_url": "job_url",
    "created": "job_created",
}

# Apply the renames one by one; withColumnRenamed keeps column positions.
final_incremental = staging_incremental
for staging_name, final_name in staging_to_final_names.items():
    final_incremental = final_incremental.withColumnRenamed(staging_name, final_name)

# Cast job_created to a timestamp, derive the date column used for
# partitioning the final table, and guarantee one row per job_id so the
# downstream merge never sees two source rows for the same key.
final_incremental = (
    final_incremental
    .withColumn("job_created", col("job_created").cast("timestamp"))
    .withColumn("job_created_date", to_date("job_created"))
    .dropDuplicates(["job_id"])
)

# Upsert the incremental rows into the final Delta table, keyed on job_id.

# Every non-key column is copied from the source row (alias "s") as-is.
final_value_columns = [
    "job_title",
    "job_location",
    "company_name",
    "job_category",
    "job_description",
    "job_url",
    "job_created",
    "job_created_date",
    "elt_updated",
]
final_update_exprs = {name: f"s.{name}" for name in final_value_columns}
# Inserts additionally carry the key column.
final_insert_exprs = {"job_id": "s.job_id", **final_update_exprs}

if not DeltaTable.isDeltaTable(spark, final_path):
    # No final table yet: create it from the incremental rows,
    # partitioned by job_created_date.
    final_writer = final_incremental.write.format("delta").mode("overwrite")
    final_writer.partitionBy("job_created_date").save(final_path)
else:
    # Final table exists: update matching job_ids, insert the rest.
    finalDeltaTable = DeltaTable.forPath(spark, final_path)
    final_merge = finalDeltaTable.alias("t").merge(
        final_incremental.alias("s"),
        "t.job_id = s.job_id"
    )
    final_merge = final_merge.whenMatchedUpdate(set=final_update_exprs)
    final_merge = final_merge.whenNotMatchedInsert(values=final_insert_exprs)
    final_merge.execute()


In [None]:
# Creating stg_adzuna_jobs and adzuna_jobs SQL tables
# Execute only once!

# %sql
# CREATE TABLE IF NOT EXISTS stg_adzuna_jobs
# USING DELTA
# LOCATION '/mnt/delta/tables/staging/adzuna_jobs';

# CREATE TABLE IF NOT EXISTS adzuna_jobs
# USING DELTA
# LOCATION '/mnt/delta/tables/final/adzuna_jobs';

In [None]:
# Some useful functions for debugging

# dbutils.fs.rm('/mnt/delta/tables/staging/adzuna_jobs', True)
# dbutils.fs.rm('/mnt/delta/tables/final/adzuna_jobs', True)

#%fs ls '/mnt/delta/tables/final/adzuna_jobs'

# %sql
# select * from stg_adzuna_jobs limit 5