# Notebook for the actual ingestion of Parquet data from Azure Data Lake (RAW) into Delta Lake (Data Hub)

## Reading the ADF pipeline parameters and locating the Azure Data Lake RAW file

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, TimestampType
from pyspark.sql.functions import col, sha2
import os
from io import StringIO
from pyspark.sql.functions import lit

# Parameters passed in by the ADF pipeline (as notebook widgets).
# container/folder/file locate the RAW input; odate/timestamp are later added
# as columns and used as partition keys of the Delta output.
dbutils.widgets.text("container", "")
dbutils.widgets.text("folder", "")
dbutils.widgets.text("file", "")
dbutils.widgets.text("odate", "")
dbutils.widgets.text("timestamp", "")
container = dbutils.widgets.get("container")
folder = dbutils.widgets.get("folder")
file = dbutils.widgets.get("file")
odate = dbutils.widgets.get("odate")
timestamp = dbutils.widgets.get("timestamp")

# Fail fast with a clear message instead of letting Spark fail later on a
# malformed URI (e.g. "abfss://@...") or writing empty partition values.
# `folder` may legitimately be empty (file at the container root).
_missing_params = [
    name
    for name, value in (
        ("container", container),
        ("file", file),
        ("odate", odate),
        ("timestamp", timestamp),
    )
    if not value.strip()
]
if _missing_params:
    raise ValueError(
        f"Missing required ADF pipeline parameter(s): {', '.join(_missing_params)}"
    )

# Storage account that hosts the RAW container.
STORAGE_ACCOUNT = "datalakecontoso"

# ADLS route of the file. `folder` can arrive with leading/trailing slashes
# (e.g. "/file/spain/..."), so segments are stripped and re-joined with a
# single "/" to avoid "//" in the URI; empty segments are skipped.
_raw_relative_path = "/".join(
    segment.strip("/") for segment in (folder, file) if segment.strip("/")
)
input_path = (
    f"abfss://{container}@{STORAGE_ACCOUNT}.dfs.core.windows.net/{_raw_relative_path}"
)


Wrote 333 bytes.


True

## Creating the Spark DataFrame

In [0]:
# Read the RAW file as a Spark DataFrame. Parquet files carry their own
# schema, so no header / schema-inference options are needed here.
df = spark.read.parquet(input_path)
# Stamp every row with the ADF run's odate and timestamp (as string literals);
# both are used as partition columns when the Delta table is written.
df = (
    df.withColumn("odate", lit(odate))
      .withColumn("timestamp", lit(timestamp))
)
df.show()

+--------------------+----------+----------------+------+--------+--------+--------------+
|      account_number| full_name|transaction_type|amount|currency|   odate|     timestamp|
+--------------------+----------+----------------+------+--------+--------+--------------+
|ES912100041845020...|  John Doe|         deposit|1200.5|     EUR|20250622|20250623155157|
|ES912100041845020...|  John Doe|      withdrawal| 300.0|     USD|20250622|20250623155157|
|ES912100041845020...|  John Doe|         deposit|1200.5|     EUR|20250622|20250623155157|
|ES762077002400310...|Jane Smith|        transfer|500.75|     EUR|20250622|20250623155157|
|ES762077002400310...|Jane Smith|         deposit| 800.0|     EUR|20250622|20250623155157|
+--------------------+----------+----------------+------+--------+--------+--------------+



%md
## Transformations (duplicate removal and anonymization of one column, applied as examples)

In [0]:
# Remove rows that are exact duplicates across every column, such as the
# repeated John Doe 1200.5 EUR deposit in the sample data.
df_no_dup = df.distinct()
df_no_dup.show()

+--------------------+----------+----------------+------+--------+--------+--------------+
|      account_number| full_name|transaction_type|amount|currency|   odate|     timestamp|
+--------------------+----------+----------------+------+--------+--------+--------------+
|ES912100041845020...|  John Doe|      withdrawal| 300.0|     USD|20250622|20250623155157|
|ES762077002400310...|Jane Smith|        transfer|500.75|     EUR|20250622|20250623155157|
|ES762077002400310...|Jane Smith|         deposit| 800.0|     EUR|20250622|20250623155157|
|ES912100041845020...|  John Doe|         deposit|1200.5|     EUR|20250622|20250623155157|
+--------------------+----------+----------------+------+--------+--------+--------------+



In [0]:
# Replace account_number with its SHA-256 hex digest.
# NOTE(review): an unsalted hash of an account number is pseudonymization
# rather than true anonymization (the input space can often be enumerated),
# and full_name is still kept in clear text. Consider a secret salt/pepper
# before using this beyond a demo.
account_hash = sha2(col("account_number"), 256)
df_anon = df_no_dup.withColumn("account_number", account_hash)
df_anon.show()

+--------------------+----------+----------------+------+--------+--------+--------------+
|      account_number| full_name|transaction_type|amount|currency|   odate|     timestamp|
+--------------------+----------+----------------+------+--------+--------+--------------+
|b179add486f3c7e1a...|  John Doe|      withdrawal| 300.0|     USD|20250622|20250623155157|
|bbb011054ca051c10...|Jane Smith|        transfer|500.75|     EUR|20250622|20250623155157|
|bbb011054ca051c10...|Jane Smith|         deposit| 800.0|     EUR|20250622|20250623155157|
|b179add486f3c7e1a...|  John Doe|         deposit|1200.5|     EUR|20250622|20250623155157|
+--------------------+----------+----------------+------+--------+--------+--------------+



## Saving the transformed DataFrame to Delta Lake

In [0]:
# Save the transformed DataFrame as Delta under the Data Hub root, mirroring
# the RAW folder/file layout. Segments are stripped and joined with exactly
# one "/", so the path is correct whether or not `folder` starts with "/"
# (plain concatenation produced "dbfs:/datahubfile/..." in that case).
DATAHUB_ROOT = "dbfs:/datahub"
_datahub_relative_path = "/".join(
    segment.strip("/") for segment in (folder, file) if segment.strip("/")
)
delta_path = f"{DATAHUB_ROOT}/{_datahub_relative_path}"
# mode("overwrite") overwrites any existing data at delta_path.
(
    df_anon.write.format("delta")
    .partitionBy("odate", "timestamp")
    .mode("overwrite")
    .save(delta_path)
)
df_anon.show(truncate=False)

+----------------------------------------------------------------+----------+----------------+------+--------+--------+--------------+
|account_number                                                  |full_name |transaction_type|amount|currency|odate   |timestamp     |
+----------------------------------------------------------------+----------+----------------+------+--------+--------+--------------+
|b179add486f3c7e1a11795d0948ce1744fbd1981d9c97ab48bb943099ab83982|John Doe  |withdrawal      |300.0 |USD     |20250622|20250623155157|
|bbb011054ca051c10be5d18345af9e3e0741769a5d1c2915c31b070fe105dece|Jane Smith|transfer        |500.75|EUR     |20250622|20250623155157|
|bbb011054ca051c10be5d18345af9e3e0741769a5d1c2915c31b070fe105dece|Jane Smith|deposit         |800.0 |EUR     |20250622|20250623155157|
|b179add486f3c7e1a11795d0948ce1744fbd1981d9c97ab48bb943099ab83982|John Doe  |deposit         |1200.5|EUR     |20250622|20250623155157|
+------------------------------------------------------

## Test queries on the Delta table

In [0]:
%sql
--Register the Delta output as a table in the Hive Metastore so it can be queried
--with SQL (Unity Catalog is preferred where available).
USE CATALOG hive_metastore;
--Create the `spain` schema on the first run; IF NOT EXISTS makes later runs a no-op.
CREATE DATABASE IF NOT EXISTS spain;
--Make `spain` the current schema so the unqualified `accounts` name below resolves to it.
USE spain;

--Table over the Delta files written by the save step. No column list is given,
--so presumably the schema is taken from the existing Delta data at LOCATION.
--NOTE(review): LOCATION is hardcoded to a single run's output path
--(odate 20250622, timestamp 20250623155157), and IF NOT EXISTS means an existing
--table keeps its old location. Presumably this should follow the notebook's
--folder/file parameters; confirm before reusing for other runs.
CREATE TABLE IF NOT EXISTS accounts
USING DELTA
LOCATION 'dbfs:/datahub/file/spain/20250622/20250623155157/accounts_demo.parquet';

In [0]:
%sql
-- Smoke test: read every row back from the registered Delta table.
SELECT *
FROM hive_metastore.spain.accounts;

account_number,full_name,transaction_type,amount,currency,odate,timestamp
bbb011054ca051c10be5d18345af9e3e0741769a5d1c2915c31b070fe105dece,Jane Smith,transfer,500.75,EUR,20250622,20250623155157
b179add486f3c7e1a11795d0948ce1744fbd1981d9c97ab48bb943099ab83982,John Doe,withdrawal,300.0,,20250622,20250623155157
bbb011054ca051c10be5d18345af9e3e0741769a5d1c2915c31b070fe105dece,Jane Smith,deposit,800.0,EUR,20250622,20250623155157
b179add486f3c7e1a11795d0948ce1744fbd1981d9c97ab48bb943099ab83982,John Doe,deposit,1200.5,EUR,20250622,20250623155157


In [0]:
%sql
-- Smoke test: list the (odate, timestamp) partitions present in the table.
SHOW PARTITIONS hive_metastore.spain.accounts;

odate,timestamp
20250622,20250623155157
