ETL Pipeline Overview
Extract: Read raw data from Azure Data Lake Storage (ADLS Gen2).

Transform: Process data using Databricks and Delta Lake.

Load: Store the transformed data in Azure Synapse Analytics.

Prerequisites
Azure Data Lake Storage (ADLS) Gen2: Stores raw data.

Databricks Cluster: Runs PySpark jobs.

Delta Lake: Manages ACID transactions.

Azure Synapse Analytics: Stores the final data.

Azure Service Principal: Securely connects Databricks to ADLS and Synapse.

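Step 1: Install Required Libraries

Ensure your Databricks cluster has the following libraries installed:

Azure Storage SDK (azure-storage)

JDBC Driver for Synapse (com.microsoft.sqlserver:mssql-jdbc:9.4.0.jre8)

Recent Databricks Runtime versions typically bundle the ABFS driver and the Azure Synapse connector, so check what your runtime already provides before installing anything.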

Step 2: Configure Access to ADLS and Synapse



In [None]:
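# Databricks configuration: OAuth (service principal) access to ADLS Gen2.
# Replace the <...> placeholders; avoid hardcoding the client secret in the notebook
# (on Databricks it can be read from a secret scope with dbutils.secrets.get).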
spark.conf.set("fs.azure.account.auth.type.<storage-account>.dfs.core.windows.net", "OAuth")
spark.conf.set("fs.azure.account.oauth.provider.type.<storage-account>.dfs.core.windows.net",
               "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider")
spark.conf.set("fs.azure.account.oauth2.client.id.<storage-account>.dfs.core.windows.net", "<service-principal-client-id>")
spark.conf.set("fs.azure.account.oauth2.client.secret.<storage-account>.dfs.core.windows.net", "<service-principal-secret>")
spark.conf.set("fs.azure.account.oauth2.client.endpoint.<storage-account>.dfs.core.windows.net",
               "https://login.microsoftonline.com/<tenant-id>/oauth2/token")
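
The five settings above differ only in the storage account, the service principal credentials, and the tenant. As a minimal sketch, they could be generated by a small helper; the names `adls_oauth_conf` and `apply_conf` are hypothetical and not part of any Databricks API.

```python
def adls_oauth_conf(storage_account, client_id, client_secret, tenant_id):
    """Return the Spark settings for OAuth access to one ADLS Gen2 account."""
    host = f"{storage_account}.dfs.core.windows.net"
    return {
        f"fs.azure.account.auth.type.{host}": "OAuth",
        f"fs.azure.account.oauth.provider.type.{host}":
            "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
        f"fs.azure.account.oauth2.client.id.{host}": client_id,
        f"fs.azure.account.oauth2.client.secret.{host}": client_secret,
        f"fs.azure.account.oauth2.client.endpoint.{host}":
            f"https://login.microsoftonline.com/{tenant_id}/oauth2/token",
    }


def apply_conf(spark, settings):
    """Apply each key/value pair to the given Spark session (hypothetical helper)."""
    for key, value in settings.items():
        spark.conf.set(key, value)
```

In a notebook this would be called as `apply_conf(spark, adls_oauth_conf(...))`, with the secret ideally read from a Databricks secret scope rather than typed into the cell.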


Step 3: Extract Data from Azure Data Lake Storage

In [None]:
# Read raw data from Azure Data Lake Storage
input_path = "abfss://<container>@<storage-account>.dfs.core.windows.net/raw/data.json"

df = spark.read.format("json").load(input_path)

df.show(5)  # Display sample records
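
The same `abfss://` URI pattern is used for the raw input, the Delta output, and the Synapse staging directory. One way to avoid retyping it is a small path builder; this is a sketch only, and `abfss_path` is a hypothetical helper name.

```python
def abfss_path(container, storage_account, *parts):
    """Build an abfss:// URI for a path inside an ADLS Gen2 container."""
    base = f"abfss://{container}@{storage_account}.dfs.core.windows.net"
    # Strip stray slashes so callers can pass "raw/", "/raw" or "raw" alike.
    cleaned = [p.strip("/") for p in parts if p.strip("/")]
    return "/".join([base, *cleaned])


# Reproduces the input path used in this step (placeholders left as-is).
input_path = abfss_path("<container>", "<storage-account>", "raw", "data.json")
```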


Step 4: Data Transformation Using Delta Lake

In [None]:
from pyspark.sql.functions import col, to_date

# Clean and transform data
df_transformed = (df
    .withColumn("cleaned_date", to_date(col("timestamp_column")))  # Convert to date format
    .filter(col("status") == "active")  # Keep only active records
)

# Write to Delta Table
delta_table_path = "abfss://<container>@<storage-account>.dfs.core.windows.net/delta/processed_data"

df_transformed.write.format("delta").mode("overwrite").save(delta_table_path)
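
To make the effect of the two transformations concrete, here is a plain-Python illustration on made-up records. It is not PySpark and does not reproduce every detail of `to_date` (for example, how Spark handles unparseable values); the sample rows and the `transform` function are assumptions for illustration only.

```python
from datetime import datetime

# Hypothetical sample records shaped like the columns used above.
records = [
    {"id": 1, "timestamp_column": "2024-01-15 10:30:00", "status": "active"},
    {"id": 2, "timestamp_column": "2024-02-01 08:00:00", "status": "inactive"},
    {"id": 3, "timestamp_column": "2024-03-20 23:59:59", "status": "active"},
]


def transform(rows):
    """Mimic the Step 4 logic: keep active rows and add a cleaned_date field."""
    out = []
    for row in rows:
        if row["status"] != "active":
            continue  # same intent as .filter(col("status") == "active")
        parsed = datetime.fromisoformat(row["timestamp_column"])
        out.append({**row, "cleaned_date": parsed.date()})
    return out


result = transform(records)
```

Here `result` holds the records with ids 1 and 3, each with a `cleaned_date` that drops the time of day.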


Step 5: Load Transformed Data into Azure Synapse

In [None]:
synapse_jdbc_url = "jdbc:sqlserver://<synapse-server>.database.windows.net:1433;database=<synapse-database>;user=<synapse-user>;password=<synapse-password>;encrypt=true;trustServerCertificate=false;hostNameInCertificate=*.database.windows.net;loginTimeout=30;"

synapse_table = "dbo.ProcessedData"

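# The connector option is camelCase: forwardSparkAzureStorageCredentials.
# Credential forwarding is documented for storage account access keys; with the
# service principal setup from Step 2, the connector's service principal
# options may be needed instead (check the connector documentation).
df_transformed.write \
    .format("com.databricks.spark.sqldw") \
    .option("url", synapse_jdbc_url) \
    .option("forwardSparkAzureStorageCredentials", "true") \
    .option("dbtable", synapse_table) \
    .option("tempdir", "abfss://<container>@<storage-account>.dfs.core.windows.net/tempdir/") \
    .mode("overwrite") \
    .save()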
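
The JDBC URL above is long and embeds the password directly. A minimal sketch of assembling it from parts, so the credentials can come from variables instead of a literal string, might look like this. The function name `build_synapse_jdbc_url` is hypothetical, and the sketch does not escape special characters in the values.

```python
def build_synapse_jdbc_url(server, database, user, password, login_timeout=30):
    """Assemble a SQL Server JDBC URL with the same properties as in this step."""
    props = {
        "database": database,
        "user": user,
        "password": password,
        "encrypt": "true",
        "trustServerCertificate": "false",
        "hostNameInCertificate": "*.database.windows.net",
        "loginTimeout": str(login_timeout),
    }
    # Each property is written as key=value; and appended after the host and port.
    prop_string = "".join(f"{key}={value};" for key, value in props.items())
    return f"jdbc:sqlserver://{server}.database.windows.net:1433;{prop_string}"


# With the placeholders from this step, the result matches the literal URL above.
example_url = build_synapse_jdbc_url(
    "<synapse-server>", "<synapse-database>", "<synapse-user>", "<synapse-password>"
)
```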


Final Summary
Extract: Reads raw JSON data from ADLS Gen2.

Transform: Cleans and processes the data using Delta Lake.

Load: Writes the processed data to Azure Synapse Analytics.