In [0]:
spark

In [0]:
#Show all scopes in Databricks

scopes = dbutils.secrets.listScopes()
display(scopes)

name
lms-scope


In [0]:
#Show all secrets in scope

secrets = dbutils.secrets.list("lms-scope")
display(secrets)

key
encryptionkey
GitaccessToken
GitKV
lms-appid
lms-secretid
lms-tenant
SqlKV
StorageKV


In [0]:
#Reading secrets from scope

appid = dbutils.secrets.get(scope="lms-scope",key="lms-appid")
service_credential = dbutils.secrets.get(scope="lms-scope",key="lms-secretid")
directoryid = dbutils.secrets.get(scope="lms-scope",key="lms-tenant")

display(appid,service_credential,directoryid)

'[REDACTED]'

'[REDACTED]'

'[REDACTED]'


######Mounting ADB with ADLS Gen2 (For reading the data)

In [0]:
# %python
# configs = {"fs.azure.account.auth.type": "OAuth",
#           "fs.azure.account.oauth.provider.type": "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
#           "fs.azure.account.oauth2.client.id": appid,
#           "fs.azure.account.oauth2.client.secret": service_credential,
#           "fs.azure.account.oauth2.client.endpoint": f"https://login.microsoftonline.com/{directoryid}/oauth2/token"}

# # Optionally, you can add <directory-name> to the source URI of your mount point.
# dbutils.fs.mount(
#   source = "abfss://bronze@lmsstorageaccount24.dfs.core.windows.net/",
#   mount_point = "/mnt/bronze",
#   extra_configs = configs)

In [0]:
#Show all data in silver

display(dbutils.fs.ls("/mnt/bronze"))

path,name,size,modificationTime
dbfs:/mnt/bronze/book_copies/,book_copies/,0,1740393650000
dbfs:/mnt/bronze/books/,books/,0,1740393669000
dbfs:/mnt/bronze/students/,students/,0,1740393665000
dbfs:/mnt/bronze/transactions/,transactions/,0,1740393746000


In [0]:
#Email encryption library installation
%pip install pycryptodome

Collecting pycryptodome
  Using cached pycryptodome-3.21.0-cp36-abi3-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (3.4 kB)
Using cached pycryptodome-3.21.0-cp36-abi3-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (2.3 MB)
Installing collected packages: pycryptodome
Successfully installed pycryptodome-3.21.0
[43mNote: you may need to restart the kernel using %restart_python or dbutils.library.restartPython() to use updated packages.[0m


In [0]:
#Loading all the required libraries
from pyspark.sql.functions import *

#Email encryption functions
from pyspark.sql.functions import udf
from Crypto.Cipher import AES
import base64
import os

#Library for Delta Table
from pyspark.sql.utils import AnalysisException

#For reading incrementally data of transaction table
import os


####Books Table

In [0]:
#Reading the data

books = spark.read.parquet("/mnt/bronze/books/")

display(books)

book_id,ISBN,book_title,author,publisher,book_price,department,subject_year,subject_number,edition_number
BK001,978-1-08-191030-3,CSE Y1 S1 E1,George Glover,Bush-Gilbert,657,CSE,Y1,S1,E1
BK002,978-1-80963-694-2,CSE Y1 S1 E2,Blake Guzman,"Ramos, Matthews and Ross",670,CSE,Y1,S1,E2
BK003,978-0-7051-2742-4,CSE Y1 S1 E3,Andrew Day,"Perez, Nelson and Crawford",684,CSE,Y1,S1,E3
BK004,978-0-89443-174-6,CSE Y1 S1 E4,David Ellis,"Page, Carter and Castillo",550,CSE,Y1,S1,E4
BK005,978-1-4994-4124-6,CSE Y1 S2 E1,Ryan Mitchell,"Byrd, Marquez and Moore",643,CSE,Y1,S2,E1
BK006,978-0-7418-2099-0,CSE Y1 S2 E2,Debbie Smith,Martin-Beck,964,CSE,Y1,S2,E2
BK007,978-0-7367-6463-6,CSE Y1 S2 E3,Dr. Lisa Hernandez,Vega-Jones,704,CSE,Y1,S2,E3
BK008,978-1-4388-1429-2,CSE Y1 S2 E4,Heather Payne,Anderson and Sons,851,CSE,Y1,S2,E4
BK009,978-1-04-560198-8,CSE Y1 S3 E1,Chelsea Farmer,Garcia LLC,899,CSE,Y1,S3,E1
BK010,978-0-615-46905-8,CSE Y1 S3 E2,Taylor Anderson,"Miller, Frye and Kelly",616,CSE,Y1,S3,E2


In [0]:
#Data-type of each column

books.printSchema()

root
 |-- book_id: string (nullable = true)
 |-- ISBN: string (nullable = true)
 |-- book_title: string (nullable = true)
 |-- author: string (nullable = true)
 |-- publisher: string (nullable = true)
 |-- book_price: string (nullable = true)
 |-- department: string (nullable = true)
 |-- subject_year: string (nullable = true)
 |-- subject_number: string (nullable = true)
 |-- edition_number: string (nullable = true)



In [0]:
#How many records in books dataset

books.count()

993

In [0]:
#Convert book_price column to double

books = books.withColumn('book_price', round(col('book_price')))

In [0]:
#Descriptive stats of book_price

books.select('book_price').describe().show() 

+-------+------------------+
|summary|        book_price|
+-------+------------------+
|  count|               993|
|   mean| 761.3041289023162|
| stddev|146.21964171084497|
|    min|             501.0|
|    max|            1000.0|
+-------+------------------+



In [0]:
#Renaming book_id to BK 

# books = books.withColumnRenamed('book_id','BK')
# books.display(10)

In [0]:
#Checking for Null values in each column

books.select([sum(col(c).isNull().cast("int")).alias(c) for c in books.columns]).display()

book_id,ISBN,book_title,author,publisher,book_price,department,subject_year,subject_number,edition_number
0,0,0,0,0,0,0,0,0,0


In [0]:
#Checking how many duplicate rows we have

books.count() - books.distinct().count()  ##total rows - unique rows

33

In [0]:
#Checking for duplicate rows for book_id unique column

#Find duplicate book_id values
duplicate_book_ids = books.groupBy("book_id").count().filter(col("count") > 1).select("book_id")

#Join back to original DataFrame to get all rows with duplicate book_id
duplicate_rows = books.join(duplicate_book_ids, on="book_id", how="inner")

#Show duplicate rows
duplicate_rows.display()

book_id,ISBN,book_title,author,publisher,book_price,department,subject_year,subject_number,edition_number
BK023,978-0-506-66342-8,CSE Y1 S6 E3,Lisa Ross,Garcia-Parker,880.0,CSE,Y1,S6,E3
BK040,978-0-490-41501-0,CSE Y1 S10 E4,Jacob Hood,"Boyer, Kelly and Kelley",743.0,CSE,Y1,S10,E4
BK050,978-0-16-183565-9,CSE Y2 S3 E2,Jean Rivas,"Acosta, Bailey and Nelson",981.0,CSE,Y2,S3,E2
BK056,978-1-4767-6023-0,CSE Y2 S4 E4,Dr. Justin Williams Jr.,"Nichols, Robles and Weiss",946.0,CSE,Y2,S4,E4
BK072,978-0-7154-2100-0,CSE Y2 S8 E4,Scott Nelson,"Sellers, Singleton and Terry",768.0,CSE,Y2,S8,E4
BK089,978-0-268-06463-1,CSE Y3 S3 E1,Katelyn Gonzales,"Green, Riley and Smith",1000.0,CSE,Y3,S3,E1
BK102,978-1-68676-392-2,CSE Y3 S6 E2,Michelle Garrison,Edwards Ltd,666.0,CSE,Y3,S6,E2
BK103,978-0-259-50983-7,CSE Y3 S6 E3,Michael Goodman,Morgan LLC,546.0,CSE,Y3,S6,E3
BK112,978-1-78127-192-6,CSE Y3 S8 E4,Michael Sanders,Harrison LLC,533.0,CSE,Y3,S8,E4
BK114,978-0-7648-2943-7,CSE Y3 S9 E2,Holly Bradshaw,"Mullins, Gomez and Kennedy",539.0,CSE,Y3,S9,E2


In [0]:
#Verifying the duplicate records by considering one value 

books.filter(col("book_id") == "BK023").display()

book_id,ISBN,book_title,author,publisher,book_price,department,subject_year,subject_number,edition_number
BK023,978-0-506-66342-8,CSE Y1 S6 E3,Lisa Ross,Garcia-Parker,880.0,CSE,Y1,S6,E3
BK023,978-0-506-66342-8,CSE Y1 S6 E3,Lisa Ross,Garcia-Parker,880.0,CSE,Y1,S6,E3
BK023,978-0-506-66342-8,CSE Y1 S6 E3,Lisa Ross,Garcia-Parker,880.0,CSE,Y1,S6,E3


In [0]:
#Removing the duplicates records from book_id column
books = books.dropDuplicates(["book_id"])

#Verifying whether the duplicate rows removed or not
books.filter(col("book_id") == "BK023").display()

book_id,ISBN,book_title,author,publisher,book_price,department,subject_year,subject_number,edition_number
BK023,978-0-506-66342-8,CSE Y1 S6 E3,Lisa Ross,Garcia-Parker,880.0,CSE,Y1,S6,E3


In [0]:
books.count()

#32 duplicate rows have been removed

960

In [0]:
#Converting author column values to lowercase

books = books.withColumn('author', lower(col('author')))
books = books.withColumn('publisher', lower(col('publisher')))

books.display() 

book_id,ISBN,book_title,author,publisher,book_price,department,subject_year,subject_number,edition_number
BK001,978-1-08-191030-3,CSE Y1 S1 E1,george glover,bush-gilbert,657.0,CSE,Y1,S1,E1
BK002,978-1-80963-694-2,CSE Y1 S1 E2,blake guzman,"ramos, matthews and ross",670.0,CSE,Y1,S1,E2
BK003,978-0-7051-2742-4,CSE Y1 S1 E3,andrew day,"perez, nelson and crawford",684.0,CSE,Y1,S1,E3
BK004,978-0-89443-174-6,CSE Y1 S1 E4,david ellis,"page, carter and castillo",550.0,CSE,Y1,S1,E4
BK005,978-1-4994-4124-6,CSE Y1 S2 E1,ryan mitchell,"byrd, marquez and moore",643.0,CSE,Y1,S2,E1
BK006,978-0-7418-2099-0,CSE Y1 S2 E2,debbie smith,martin-beck,964.0,CSE,Y1,S2,E2
BK007,978-0-7367-6463-6,CSE Y1 S2 E3,dr. lisa hernandez,vega-jones,704.0,CSE,Y1,S2,E3
BK008,978-1-4388-1429-2,CSE Y1 S2 E4,heather payne,anderson and sons,851.0,CSE,Y1,S2,E4
BK009,978-1-04-560198-8,CSE Y1 S3 E1,chelsea farmer,garcia llc,899.0,CSE,Y1,S3,E1
BK010,978-0-615-46905-8,CSE Y1 S3 E2,taylor anderson,"miller, frye and kelly",616.0,CSE,Y1,S3,E2



####Saving books dataset into Delta format under Silver 

In [0]:
#Saving the data in delta format under silver location

books.write.format("delta").mode("overwrite").save("abfss://silver@lmsstorageaccount24.dfs.core.windows.net/books")

In [0]:
%sql
--Creating external table for the silver location in Unity Catalog
DROP TABLE IF EXISTS `lms-catalog`.silver.books;

CREATE TABLE `lms-catalog`.silver.books
USING DELTA
LOCATION 'abfss://silver@lmsstorageaccount24.dfs.core.windows.net/books';


####Books Copies Table

In [0]:
display(dbutils.fs.ls('/mnt/bronze/'))

path,name,size,modificationTime
dbfs:/mnt/bronze/book_copies/,book_copies/,0,1740393650000
dbfs:/mnt/bronze/books/,books/,0,1740393669000
dbfs:/mnt/bronze/students/,students/,0,1740393665000
dbfs:/mnt/bronze/transactions/,transactions/,0,1740393746000


In [0]:
#Reading the book copies dataset

books_copies = spark.read.parquet('/mnt/bronze/book_copies', header=True, inferSchema=True)

books_copies.display()

copy_id,book_id,copy_number,location,rack,shelf,status
CP00001,BK001,copy1,L1,L1R01,L1R01S1,Available
CP00002,BK001,copy2,L1,L1R01,L1R01S1,Available
CP00003,BK001,copy3,L1,L1R01,L1R01S1,Available
CP00004,BK001,copy4,L1,L1R01,L1R01S1,Available
CP00005,BK001,copy5,L1,L1R01,L1R01S1,Available
CP00006,BK001,copy6,L1,L1R01,L1R01S1,Available
CP00007,BK001,copy7,L1,L1R01,L1R01S1,Available
CP00008,BK001,copy8,L1,L1R01,L1R01S1,Available
CP00009,BK001,copy9,L1,L1R01,L1R01S1,Available
CP00010,BK001,copy10,L1,L1R01,L1R01S1,Available


In [0]:
#Data-type of each column

books_copies.printSchema()

root
 |-- copy_id: string (nullable = true)
 |-- book_id: string (nullable = true)
 |-- copy_number: string (nullable = true)
 |-- location: string (nullable = true)
 |-- rack: string (nullable = true)
 |-- shelf: string (nullable = true)
 |-- status: string (nullable = true)



In [0]:
#Total records

books_copies.count()

28814

In [0]:
#Checking for missing values

books_copies.select([sum(col(c).isNull().cast("int")).alias(c) for c in books_copies.columns]).show()

+-------+-------+-----------+--------+----+-----+------+
|copy_id|book_id|copy_number|location|rack|shelf|status|
+-------+-------+-----------+--------+----+-----+------+
|      0|      0|          0|       0|   0|    0|     0|
+-------+-------+-----------+--------+----+-----+------+



In [0]:
#Total duplicate rows we have in our dataset 

books_copies.count() - books_copies.distinct().count()  #total rows - unique rows

14

In [0]:
#Checking for duplicate rows for copy_id unique column

#Find duplicate copy_id values
duplicate_copy_ids = books_copies.groupBy("copy_id").count().filter(col("count") > 1).select("copy_id")

#Join back to original DataFrame to get all rows with duplicate book_id
duplicate_rows_copy = books_copies.join(duplicate_copy_ids, on="copy_id", how="inner")

#Show duplicate rows
duplicate_rows_copy.display()

copy_id,book_id,copy_number,location,rack,shelf,status
CP01417,BK048,copy7,L1,L1R08,L1R08S6,Available
CP01417,BK048,copy7,L1,L1R08,L1R08S6,Available
CP00381,BK013,copy21,L1,L1R03,L1R03S1,Available
CP00381,BK013,copy21,L1,L1R03,L1R03S1,Available
CP01416,BK048,copy6,L1,L1R08,L1R08S6,Available
CP01416,BK048,copy6,L1,L1R08,L1R08S6,Available
CP01408,BK047,copy28,L1,L1R08,L1R08S5,Available
CP01408,BK047,copy28,L1,L1R08,L1R08S5,Available
CP00774,BK026,copy24,L1,L1R05,L1R05S2,Available
CP00774,BK026,copy24,L1,L1R05,L1R05S2,Available


In [0]:
#Verifying the duplicate rows in copy_id column 

duplicate_rows_copy.filter(col("copy_id") == "CP00281").display()

copy_id,book_id,copy_number,location,rack,shelf,status
CP00281,BK010,copy11,L1,L1R02,L1R02S4,Available
CP00281,BK010,copy11,L1,L1R02,L1R02S4,Available


In [0]:
#Removing the duplicates observation from copy_id column
books_copies = books_copies.dropDuplicates(["copy_id"])

#Verifying whether the duplicate rows removed or not
books_copies.filter(col("copy_id") == "CP00281").display()

copy_id,book_id,copy_number,location,rack,shelf,status
CP00281,BK010,copy11,L1,L1R02,L1R02S4,Available


In [0]:
books_copies.count() #14 duplicates rows removed from copy_id column

28800

In [0]:
books_copies.show(5)

+-------+-------+-----------+--------+-----+-------+---------+
|copy_id|book_id|copy_number|location| rack|  shelf|   status|
+-------+-------+-----------+--------+-----+-------+---------+
|CP00001|  BK001|      copy1|      L1|L1R01|L1R01S1|Available|
|CP00002|  BK001|      copy2|      L1|L1R01|L1R01S1|Available|
|CP00003|  BK001|      copy3|      L1|L1R01|L1R01S1|Available|
|CP00004|  BK001|      copy4|      L1|L1R01|L1R01S1|Available|
|CP00005|  BK001|      copy5|      L1|L1R01|L1R01S1|Available|
+-------+-------+-----------+--------+-----+-------+---------+
only showing top 5 rows


In [0]:
#Converting the status column values into lower case

books_copies = books_copies.withColumn("status", lower(col("status")))
books_copies.show(5)

+-------+-------+-----------+--------+-----+-------+---------+
|copy_id|book_id|copy_number|location| rack|  shelf|   status|
+-------+-------+-----------+--------+-----+-------+---------+
|CP00001|  BK001|      copy1|      L1|L1R01|L1R01S1|available|
|CP00002|  BK001|      copy2|      L1|L1R01|L1R01S1|available|
|CP00003|  BK001|      copy3|      L1|L1R01|L1R01S1|available|
|CP00004|  BK001|      copy4|      L1|L1R01|L1R01S1|available|
|CP00005|  BK001|      copy5|      L1|L1R01|L1R01S1|available|
+-------+-------+-----------+--------+-----+-------+---------+
only showing top 5 rows



####Saving books copies dataset into Delta format under Silver 

In [0]:
#Saving the data in delta format under silver location

books_copies.write.format("delta").mode("overwrite").save("abfss://silver@lmsstorageaccount24.dfs.core.windows.net/books_copies")

In [0]:
%sql
--Creating external table for the silver location in Unity Catalog
DROP TABLE IF EXISTS `lms-catalog`.silver.books_copies;

CREATE TABLE `lms-catalog`.silver.books_copies
USING DELTA
LOCATION 'abfss://silver@lmsstorageaccount24.dfs.core.windows.net/books_copies';


####Students Table

In [0]:
display(dbutils.fs.ls('/mnt/bronze/'))

path,name,size,modificationTime
dbfs:/mnt/bronze/book_copies/,book_copies/,0,1740393650000
dbfs:/mnt/bronze/books/,books/,0,1740393669000
dbfs:/mnt/bronze/students/,students/,0,1740393665000
dbfs:/mnt/bronze/transactions/,transactions/,0,1740393746000


In [0]:
#Reading the students dataset

students = spark.read.parquet('/mnt/bronze/students')

students.display()

student_id,first_name,last_name,department,student_year,section,email
S20CV001,Sai,Joshi,CIVIL,1,A,sai.joshi@example.com
S20CV002,Rudra,Singh,CIVIL,1,A,rudra.singh@example.com
S20CV003,Sai,Kumar,CIVIL,1,A,sai.kumar@example.com
S20CV004,Nisha,Verma,CIVIL,1,A,nisha.verma@example.com
S20CV005,Ira,Joshi,CIVIL,1,A,ira.joshi@example.com
S20CV006,Aarav,Choudhary,CIVIL,1,A,aarav.choudhary@example.com
S20CV007,Reyansh,Kapoor,CIVIL,1,A,reyansh.kapoor@example.com
S20CV008,Riya,Sharma,CIVIL,1,A,riya.sharma@example.com
S20CV009,Nisha,Desai,CIVIL,1,A,nisha.desai@example.com
S20CV010,Pooja,Patel,CIVIL,1,A,pooja.patel@example.com


In [0]:
#Data-types of students columns

students.printSchema()

root
 |-- student_id: string (nullable = true)
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- department: string (nullable = true)
 |-- student_year: string (nullable = true)
 |-- section: string (nullable = true)
 |-- email: string (nullable = true)



In [0]:
#Convert student_year column to int

students = students.withColumn("student_year", col("student_year").cast("int"))

In [0]:
#Total rows

students.count()

2014

In [0]:
#Checking for missing values in students table

students.select([sum(col(c).isNull().cast("int")).alias(c) for c in students.columns]).show()

+----------+----------+---------+----------+------------+-------+-----+
|student_id|first_name|last_name|department|student_year|section|email|
+----------+----------+---------+----------+------------+-------+-----+
|         0|         0|        0|         0|           0|      0|    0|
+----------+----------+---------+----------+------------+-------+-----+



In [0]:
#Checking how many duplicate rows we have

students.count() - students.distinct().count()  #total rows - unique rows

14

In [0]:
#Checking for duplicate rows for students_id unique column

#Find duplicate student_id values
duplicate_students_ids = students.groupBy("student_id").count().filter(col("count") > 1).select("student_id")

#Join back to original DataFrame to get all rows with duplicate student_id
duplicate_rows_students = students.join(duplicate_students_ids, on="student_id", how="inner")

#Show duplicate rows
display(duplicate_rows_students)

student_id,first_name,last_name,department,student_year,section,email
S17CS157,Ira,Gupta,CSE,4,B,ira.gupta@example.com
S17CS157,Ira,Gupta,CSE,4,B,ira.gupta@example.com
S17CS158,Aditya,Desai,CSE,4,B,aditya.desai@example.com
S17CS158,Aditya,Desai,CSE,4,B,aditya.desai@example.com
S17CS154,Anaya,Sharma,CSE,4,B,anaya.sharma@example.com
S17CS154,Anaya,Sharma,CSE,4,B,anaya.sharma@example.com
S17CS155,Reyansh,Singh,CSE,4,B,reyansh.singh@example.com
S17CS155,Reyansh,Singh,CSE,4,B,reyansh.singh@example.com
S17ME012,Aadhya,Desai,MECH,4,A,aadhya.desai@example.com
S17ME012,Aadhya,Desai,MECH,4,A,aadhya.desai@example.com


In [0]:
#Verifying the duplicate rows in students_id column 

duplicate_rows_students.filter(col("student_id") == "S20EC012").display()

student_id,first_name,last_name,department,student_year,section,email
S20EC012,Reyansh,Sethi,ECE,1,A,reyansh.sethi@example.com
S20EC012,Reyansh,Sethi,ECE,1,A,reyansh.sethi@example.com


In [0]:
#Removing the duplicates observation from students_id column
students = students.dropDuplicates(["student_id"])

#Verifying whether the duplicate rows removed or not
students.filter(col("student_id") == "S20EC012").display()

student_id,first_name,last_name,department,student_year,section,email
S20EC012,Reyansh,Sethi,ECE,1,A,reyansh.sethi@example.com


In [0]:
students.count() #14 duplicates rows removed from student_id column

2000

In [0]:
#Converting first_name and last_name columns values to lower

students = students.withColumn('first_name', lower(col('first_name')))
students = students.withColumn('last_name', lower(col('last_name')))

students.select('first_name','last_name').show(5) 

+----------+---------+
|first_name|last_name|
+----------+---------+
|     ayaan|    menon|
|      diya|    reddy|
|   reyansh|   kapoor|
|    aadhya|    mehta|
|   reyansh|choudhary|
+----------+---------+
only showing top 5 rows


In [0]:
#unique (distinct) all values of section column

students.select('section').distinct().display()

section
B#
B
A
A#


In [0]:
#Update section column values
students = students.withColumn(
                "section",when(col("section") == "A#", "A")\
               .when(col("section") == "B#", "B")\
               .otherwise(col("section"))
)

#Show updated DataFrame
students.select('section').distinct().display()

#Now only 2 values are there, A and B. No more values like A# and B# etc.

section
B
A



#####Encrypting the email column

In [0]:
#Creating AES Encryption Function
def encrypt_email(email: str, key: str) -> str:
    if email is None:
        return None
    key = key.ljust(32)[:32].encode("utf-8")  #Ensure key is 32 bytes
    cipher = AES.new(key, AES.MODE_EAX)
    ciphertext, tag = cipher.encrypt_and_digest(email.encode("utf-8"))
    return base64.b64encode(cipher.nonce + tag + ciphertext).decode("utf-8")

#Define the encryption key securely (Using Databricks Secrets instead of hardcoding)
encryption_key = dbutils.secrets.get(scope="lms-scope", key="encryptionkey")

#Register UDF
encrypt_udf = udf(lambda email: encrypt_email(email, encryption_key))

#Apply Encryption to the DataFrame
students = students.withColumn("email_encrypted", encrypt_udf(students.email))

#Drop original email column for security
students = students.drop("email")

#Show Encrypted Emails
students.show(5)

+----------+----------+---------+----------+------------+-------+--------------------+
|student_id|first_name|last_name|department|student_year|section|     email_encrypted|
+----------+----------+---------+----------+------------+-------+--------------------+
|  S17CS001|     ayaan|    menon|       CSE|           4|      A|cfviiv17ooYhZQji8...|
|  S17CS002|      diya|    reddy|       CSE|           4|      A|BSPZoKl/+ka9uyFxH...|
|  S17CS003|   reyansh|   kapoor|       CSE|           4|      A|Sfi8Joi0mxr5TBq3z...|
|  S17CS004|    aadhya|    mehta|       CSE|           4|      A|D3N1WUcMTrTshTGrm...|
|  S17CS005|   reyansh|choudhary|       CSE|           4|      A|6BM+bFreD1BqXjde+...|
+----------+----------+---------+----------+------------+-------+--------------------+
only showing top 5 rows


In [0]:
#Defining the Decryption Function

def decrypt_email(enc_email: str, key: str) -> str:
    if enc_email is None:
        return None
    key = key.ljust(32)[:32].encode("utf-8")
    enc_bytes = base64.b64decode(enc_email)
    nonce, tag, ciphertext = enc_bytes[:16], enc_bytes[16:32], enc_bytes[32:]
    cipher = AES.new(key, AES.MODE_EAX, nonce=nonce)
    return cipher.decrypt_and_verify(ciphertext, tag).decode("utf-8")

#Register UDF
decrypt_udf = udf(lambda enc_email: decrypt_email(enc_email, encryption_key)) #decrypting using the encryption_key we saved in Key Vault.

#Apply Decryption to the DataFrame
students = students.withColumn("email_decrypted", decrypt_udf(students.email_encrypted))

#Show Decrypted Emails
students.select("email_encrypted", "email_decrypted").show(5)

+--------------------+--------------------+
|     email_encrypted|     email_decrypted|
+--------------------+--------------------+
|gRIS/s7Lkfa+/t8Id...|ayaan.menon@examp...|
|Vumhj/QHY9jTEqxGX...|diya.reddy@exampl...|
|Rx3MFpyVSCXZZgDiF...|reyansh.kapoor@ex...|
|88k42j6ihsipjVIBB...|aadhya.mehta@exam...|
|GTdvNgF7Fd73bCC6g...|reyansh.choudhary...|
+--------------------+--------------------+
only showing top 5 rows


In [0]:
#Dropping the decrypted email column

students = students.drop("email_decrypted")
students.show(5)

+----------+----------+---------+----------+------------+-------+--------------------+
|student_id|first_name|last_name|department|student_year|section|     email_encrypted|
+----------+----------+---------+----------+------------+-------+--------------------+
|  S17CS001|     ayaan|    menon|       CSE|           4|      A|DVoORffBz3sXtkXhF...|
|  S17CS002|      diya|    reddy|       CSE|           4|      A|EdpjZEJE3Hs2R1QrF...|
|  S17CS003|   reyansh|   kapoor|       CSE|           4|      A|kMOp/DieaqVSak7TH...|
|  S17CS004|    aadhya|    mehta|       CSE|           4|      A|d/57KUmp4aZgBhr7g...|
|  S17CS005|   reyansh|choudhary|       CSE|           4|      A|rlxzCxfqX+ho6ay3f...|
+----------+----------+---------+----------+------------+-------+--------------------+
only showing top 5 rows



####Saving students dataset into Delta format under Silver 

In [0]:
#Saving the data in delta format under silver location

students.write.format("delta").mode("overwrite").save("abfss://silver@lmsstorageaccount24.dfs.core.windows.net/students")

In [0]:
%sql
--Creating external table for the silver location in Unity Catalog
DROP TABLE IF EXISTS `lms-catalog`.silver.students;

CREATE TABLE `lms-catalog`.silver.students
USING DELTA
LOCATION 'abfss://silver@lmsstorageaccount24.dfs.core.windows.net/students';

#Transactions Table

In [0]:
display(dbutils.fs.ls('/mnt/bronze/transactions'))

path,name,size,modificationTime
dbfs:/mnt/bronze/transactions/checkpoint.txt,checkpoint.txt,1,1740396633000
dbfs:/mnt/bronze/transactions/transactions_2020_Q2.parquet,transactions_2020_Q2.parquet,177508,1740393746000
dbfs:/mnt/bronze/transactions/transactions_2020_Q3.parquet,transactions_2020_Q3.parquet,551688,1740395072000


In [0]:
#Define the mounted directory path
source_path = "dbfs:/mnt/bronze/transactions/"

#Ordered list of Parquet files to be processed
files = [
    "transactions_2020_Q2.parquet",
    "transactions_2020_Q3.parquet",
    "transactions_2020_Q4.parquet",
    "transactions_2021_Q1.parquet",
    "transactions_2021_Q2.parquet"
]

#Define a checkpoint file to track progress
checkpoint_file = "/dbfs/mnt/bronze/transactions/checkpoint.txt"

#Read the last processed index from the checkpoint file (if exists)
try:
    with open(checkpoint_file, "r") as f:
        last_processed_index = int(f.read().strip())
except FileNotFoundError:
    last_processed_index = -1  #No files processed yet

#Identify the next file to process
next_index = last_processed_index + 1

if next_index < len(files):
    next_file = files[next_index]
    file_path = os.path.join(source_path, next_file)

    #Read the next Parquet file
    transactions = spark.read.parquet(file_path)

    #Add a column for tracking the source file
    transactions = transactions.withColumn("source_file", lit(next_file))

    #Show data (for verification)
    print(f"Processing file: {next_file}")
    transactions.display(5)

    #Remove previous DataFrame from memory
    spark.catalog.clearCache()

    #Update the checkpoint file
    with open(checkpoint_file, "w") as f:
        f.write(str(next_index))

else:
    print("No new files to process. All files are already loaded.")

print("Notebook execution completed!")


[0;31m---------------------------------------------------------------------------[0m
[0;31mAnalysisException[0m                         Traceback (most recent call last)
File [0;32m<command-273642984284371>, line 33[0m
[1;32m     30[0m file_path [38;5;241m=[39m os[38;5;241m.[39mpath[38;5;241m.[39mjoin(source_path, next_file)
[1;32m     32[0m [38;5;66;03m# Read the next Parquet file[39;00m
[0;32m---> 33[0m transactions [38;5;241m=[39m spark[38;5;241m.[39mread[38;5;241m.[39mparquet(file_path)
[1;32m     35[0m [38;5;66;03m# Add a column for tracking the source file[39;00m
[1;32m     36[0m transactions [38;5;241m=[39m transactions[38;5;241m.[39mwithColumn([38;5;124m"[39m[38;5;124msource_file[39m[38;5;124m"[39m, lit(next_file))

File [0;32m/databricks/spark/python/pyspark/instrumentation_utils.py:47[0m, in [0;36m_wrap_function.<locals>.wrapper[0;34m(*args, **kwargs)[0m
[1;32m     45[0m start [38;5;241m=[39m time[38;5;241m.[39mperf_counter

In [0]:
# parquet_files = dbutils.fs.ls('/mnt/bronze/transactions/')

# #Filter for parquet files and sort by name to get the latest file
# latest_file = sorted([f.path for f in parquet_files if f.path.endswith('.parquet')])[-1]


# #Read the latest parquet file
# transactions = spark.read.parquet(latest_file)
# transactions.display()



In [0]:
#Data-type of each column

transactions.printSchema()



In [0]:
#Converting columns data-type

transactions = transactions.withColumn("year", col("year").cast("int")) \
                   .withColumn("quarter", col("quarter").cast("int")) \
                   .withColumn("fine_amount", col("fine_amount").cast("double"))

#Converting date columns to date data-type
transactions = transactions.withColumn("issue_date", to_date(col("issue_date"), "dd-MM-yyyy")) \
                   .withColumn("return_date", to_date(col("return_date"), "dd-MM-yyyy")) \
                   .withColumn("due_date", to_date(col("due_date"), "dd-MM-yyyy"))
        
transactions.printSchema()



In [0]:
#Total rows 

transactions.count()



In [0]:

#Descriptive stats on fine_amount column

transactions.describe("fine_amount").show()



In [0]:
#Checking for missing values

transactions.select([sum(col(c).isNull().cast("int")).alias(c) for c in transactions.columns]).display()

#return_date reflects how many students did not returned the book taken from library,
#payment_date reflects how many students did not paid the fine amount



In [0]:
#Checking how many duplicate rows we have

transactions.count() - transactions.distinct().count()  #total rows - unique rows



In [0]:
#Checking for duplicate rows for transaction_id unique column

#Find duplicate transaction_id values
duplicate_transaction_ids = transactions.groupBy("transaction_id").count().filter(col("count") > 1).select("transaction_id")

#Join back to original DataFrame to get all rows with duplicate transaction_id
duplicate_rows_transaction_ids = transactions.join(duplicate_transaction_ids, on="transaction_id", how="inner")

#Show duplicate rows
display(duplicate_rows_transaction_ids)

#NO DUPLICATES RECORDS.



In [0]:
transactions.display()

#Need to remove _ values from transaction_id column



In [0]:
#Using Regular Expression to replace underscore with empty string

#Remove underscores from transaction_id column
transactions = transactions.withColumn("transaction_id", regexp_replace(col("transaction_id"), "_", ""))

#Show updated DataFrame
transactions.display()



In [0]:
#Check for dates starting from DD- instead of YYYY-

transactions.select('issue_date','due_date','return_date','payment_date').show(5)

#ALL COLUMN FORMAT IS NOT IN CORRECT FORMAT -> (YYYY-MM-DD)



In [0]:
#Convert payment_date (which has time) into proper date format (YYYY-MM-DD)

transactions = transactions.withColumn("payment_date",to_date(to_timestamp(col("payment_date"), "dd-MM-yyyy HH:mm")))

#Converting all other date colunns to proper date format YYYY-MM-DD
transactions = transactions.withColumn("issue_date", to_date(col("issue_date"), "yyyy-MM-dd")) \
                   .withColumn("return_date", to_date(col("return_date"), "yyyy-MM-dd")) \
                   .withColumn("due_date", to_date(col("due_date"), "yyyy-MM-dd"))

#Show updated values
transactions.select('issue_date', 'due_date','return_date', 'payment_date').show(5)



In [0]:
#Regular expression to match dates starting with DD-MM-YYYY (01-31 at the start)
date_pattern = r"^[0-3][0-9]-[0-1][0-9]-[1-2][0-9]{3}$"

#*Check if any rows have DD-MM-YYYY format instead of YYYY-MM-DD*
incorrect_dates = transactions.filter(
    col("issue_date").rlike(date_pattern) | 
    col("due_date").rlike(date_pattern) | 
    col("return_date").rlike(date_pattern) |
    col("payment_date").rlike(date_pattern))

#Show incorrect format dates
incorrect_dates.select("issue_date", "due_date", "return_date", "payment_date").display(truncate=False)



In [0]:
#Filter rows where book_id starts with 'bk' (lowercase) only

bk_lowercase_check = transactions.filter(
    col("book_id").rlike("^bk[0-9]+$")  #Matches bk001, bk105, etc.
    ).filter(~col("book_id").rlike("^BK[0-9]+$"))  #Excludes uppercase BK

#Show results
bk_lowercase_check.display()



In [0]:
#Filter rows where copy_id starts with 'cp' instead of 'CP'
cp_lowercase_check = transactions.filter(col("copy_id").rlike("^cp[0-9]+$"))

#Show results
cp_lowercase_check.select('copy_id').display()



In [0]:
#Convert book_id to uppercase
transactions = transactions.withColumn("book_id", upper(col("book_id")))

#Convert copy_id to uppercase
transactions = transactions.withColumn("copy_id", upper(col("copy_id")))



In [0]:
#Filter rows where book_id starts with 'bk' (lowercase) only

bk_lowercase_check = transactions.filter(
    col("book_id").rlike("^bk[0-9]+$")  #Matches bk001, bk105, etc.
    ).filter(~col("book_id").rlike("^BK[0-9]+$"))  #Excludes uppercase BK

#Show results
bk_lowercase_check.display()

#NO LOWER CASE VALUES IS PRESENT NOW IN book_id COLUMN.



In [0]:
#Filter rows where copy_id starts with 'cp' instead of 'CP'
cp_lowercase_check = transactions.filter(col("copy_id").rlike("^cp[0-9]+$"))

#Show results
cp_lowercase_check.select('copy_id').display() 

#NO LOWER CASE VALUES IS PRESENT NOW IN copy_id COLUMN.



In [0]:
#Upper cases values are present we want all values to be lower case

transactions.select('initial_status').distinct().show()
transactions.select('final_status').distinct().show()
transactions.select('payment_status').distinct().show()



In [0]:
#Convert initial_status and final_status to lowercase
transactions = transactions.withColumn("initial_status", lower(col("initial_status"))) \
                                           .withColumn("final_status", lower(col("final_status")))\
                                            .withColumn("payment_status", lower(col("payment_status")))



In [0]:
transactions.select('initial_status').distinct().show()
transactions.select('final_status').distinct().show()
transactions.select('payment_status').distinct().show()




#####Saving the data in Silver container

######*This code will save the transaction data into Silver Container in ADLS Gen2 in 1st run of ADF Pipeline, then append the new data in 2nd run of ADF Pipeline.*

In [0]:
#Define the path to the Delta table in the silver container
silver_table_path = 'abfss://silver@lmsstorageaccount24.dfs.core.windows.net/transactions'

#Check if the DataFrame is empty
if transactions.isEmpty():
    print("No new data to append to the Delta table.")
else:
    #Check if the Delta table already exists
    try:
        #Try to read the existing Delta table
        existing_df = spark.read.format("delta").load(silver_table_path)
        
        #If it exists, append the new data
        transactions.write.format("delta").mode("append").save(silver_table_path)
        print(f"Appended new data to the existing Delta table at: {silver_table_path}")
    except AnalysisException:
        #If the table does not exist, create it
        transactions.write.format("delta").mode("overwrite").save(silver_table_path)
        print(f"Created new Delta table at: {silver_table_path}")  




#####Creating or Appending the Delta Table in Silver Schema according to the requirement

In [0]:
#Define the Delta table name
silver_table_name = "`lms-catalog`.silver.transactions"

#Check if the Delta table already exists
try:
    #Try to read the existing Delta table
    existing_df = spark.read.format("delta").table(silver_table_name)
    
    #If it exists, append the new data
    transactions.write.format("delta").mode("append").saveAsTable(silver_table_name)
    print(f"Appended new data to the existing Delta table: {silver_table_name}")
except AnalysisException:
    #If the table does not exist, create it
    transactions.write.format("delta").mode("overwrite").saveAsTable(silver_table_name)
    print(f"Created new Delta table: {silver_table_name}")




#####Verifying if the data is loaded in Silver Schema

In [0]:
%sql
select count(*) from `lms-catalog`.silver.transactions




####Transactions_2020_Q2 Table

In [0]:
# #Verifying if the files is present in container

# dbutils.fs.ls("mnt/silver/")



In [0]:
# #Reading the data from silver location

# transactions_2020_q2 = spark.read.csv('/mnt/silver/transactions_2020_Q2_raw.csv', header=True, inferSchema=True)

# transactions_2020_q2.display()



In [0]:
# #Data-type of each column

# transactions_2020_q2.printSchema()



In [0]:
# #Total records

# transactions_2020_q2.count()



In [0]:
# #Descriptive stats on fine_amount column before cleaning the data

# transactions_2020_q2.describe("fine_amount").display()



In [0]:
# #Checking for missing values

# transactions_2020_q2.select([sum(col(c).isNull().cast("int")).alias(c) for c in transactions_2020_q2.columns]).display()

# #return_date: 284 records are missing and payment_date: 4950 records are missing

# #return_date 284 reflects how many students did not returned the book taken from library,
# #payment_date 4950 reflects how many students did not paid the fine amount



In [0]:
# #Checking how many duplicate rows we have

# transactions_2020_q2.count() - transactions_2020_q2.distinct().count()  #total rows - unique rows



In [0]:
# #Checking for duplicate rows for transaction_id unique column

# #Find duplicate transaction_id values
# duplicate_transaction_ids_q2_2020 = transactions_2020_q2.groupBy("transaction_id").count().filter(col("count") > 1).select("transaction_id")

# #Join back to original DataFrame to get all rows with duplicate transaction_id
# duplicate_rows_transaction_ids_q2_2020 = transactions_2020_q2.join(duplicate_transaction_ids_q2_2020, on="transaction_id", how="inner")

# #Show duplicate rows
# display(duplicate_rows_transaction_ids_q2_2020)

# #NO DUPLICATES RECORDS.



In [0]:
# transactions_2020_q2.display()

# #Need to remove _ values from transaction_id column



In [0]:
# #Using Regular Expression to replace underscore with empty string
# from pyspark.sql.functions import regexp_replace

# #Remove underscores from transaction_id column
# transactions_2020_q2 = transactions_2020_q2.withColumn("transaction_id", regexp_replace(col("transaction_id"), "_", ""))

# #Show updated DataFrame
# transactions_2020_q2.display(7)



In [0]:
# #Check for dates starting from DD- instead of YYYY-

# transactions_2020_q2.select('issue_date','due_date','return_date','payment_date').display()

# #payment_date COLUMN FORMAT IS NOT IN CORRECT FORMAT (YYYY-MM-DD)



In [0]:
# #Convert payment_date (which has time) into proper date format (YYY-MM-DD)
# from pyspark.sql.functions import to_date, to_timestamp

# transactions_2020_q2 = transactions_2020_q2.withColumn(
#     "payment_date",
#     to_date(to_timestamp(col("payment_date"), "dd-MM-yyyy HH:mm"))
#     )

# #Show updated values
# transactions_2020_q2.select("payment_date").display(50, truncate=False)



In [0]:
# #Regular expression to match dates starting with DD-MM-YYYY (01-31 at the start)
# date_pattern = r"^[0-3][0-9]-[0-1][0-9]-[1-2][0-9]{3}$"

# #Check if any rows have DD-MM-YYYY format instead of YYYY-MM-DD
# incorrect_dates = transactions_2020_q2.filter(
#     col("issue_date").rlike(date_pattern) | 
#     col("due_date").rlike(date_pattern) | 
#     col("return_date").rlike(date_pattern) |
#     col("payment_date").rlike(date_pattern)
# )

# #Show incorrect format dates
# incorrect_dates.select("issue_date", "due_date", "return_date", "payment_date").display(truncate=False)



In [0]:
# #Filter rows where book_id starts with 'bk' (lowercase) only

# bk_lowercase_check = transactions_2020_q2.filter(
#     col("book_id").rlike("^bk[0-9]+$")  #Matches bk001, bk105, etc.
#     ).filter(~col("book_id").rlike("^BK[0-9]+$"))  #Excludes uppercase BK

# #Show results
# bk_lowercase_check.display()



In [0]:
# #Filter rows where copy_id starts with 'cp' instead of 'CP'
# cp_lowercase_check = transactions_2020_q2.filter(col("copy_id").rlike("^cp[0-9]+$"))

# #Show results
# cp_lowercase_check.select('copy_id').display()



In [0]:
# from pyspark.sql.functions import upper

# #Convert book_id to uppercase
# transactions_2020_q2 = transactions_2020_q2.withColumn("book_id", upper(col("book_id")))

# #Show updated DataFrame
# transactions_2020_q2.display()



In [0]:
# #Filter rows where book_id starts with 'bk' (lowercase) only

# bk_lowercase_check = transactions_2020_q2.filter(
#     col("book_id").rlike("^bk[0-9]+$")  #Matches bk001, bk105, etc.
#     ).filter(~col("book_id").rlike("^BK[0-9]+$"))  #Excludes uppercase BK

# #Show results
# bk_lowercase_check.display()

# #NO LOWER CASE VALUES IS PRESENT NOW IN book_id COLUMN.



In [0]:
# #Convert copy_id to uppercase
# transactions_2020_q2 = transactions_2020_q2.withColumn("copy_id", upper(col("copy_id")))

# #Show updated DataFrame
# transactions_2020_q2.display()



In [0]:
# #Filter rows where copy_id starts with 'cp' instead of 'CP'
# cp_lowercase_check = transactions_2020_q2.filter(col("copy_id").rlike("^cp[0-9]+$"))

# #Show results
# cp_lowercase_check.select('copy_id').display() 

# #NO LOWER CASE VALUES IS PRESENT NOW IN copy_id COLUMN.



In [0]:
# #Upper cases values are present we want all values to be lower case

# transactions_2020_q2.select('initial_status','final_status').distinct().display()



In [0]:
# #Convert initial_status and final_status to lowercase
# transactions_2020_q2 = transactions_2020_q2.withColumn("initial_status", lower(col("initial_status"))) \
#                                            .withColumn("final_status", lower(col("final_status")))



In [0]:
# #Show transformed DataFrame
# transactions_2020_q2.select("initial_status").distinct().display()

# #Only one class in initial_status column




####Saving transactions_2020_q2 dataset into Delta format under Silver 

In [0]:
# #Saving the data in delta format under silver location

# transactions_2020_q2.write.format("delta").mode("overwrite").save("abfss://silver@lmsstorageaccount2025.dfs.core.windows.net/transactions_2020_Q2")



In [0]:
# %sql
# --Creating external table for the silver location in Unity Catalog
# DROP TABLE IF EXISTS `lms-catalog`.silver.transactions_2020_q2;

# CREATE TABLE `lms-catalog`.silver.transactions_2020_q2
# USING DELTA
# LOCATION 'abfss://silver@lmsstorageaccount2025.dfs.core.windows.net/transactions_2020_Q2';



####Transactions_2020_Q3 Table

In [0]:
# #Verifying if the files is present in container

# dbutils.fs.ls("mnt/silver/")



In [0]:
# #Reading the data from silver location

# transactions_2020_q3 = spark.read.csv('/mnt/silver/transactions_2020_Q3_raw.csv', header=True, inferSchema=True)

# transactions_2020_q3.display()



In [0]:
# #Data-type of each column

# transactions_2020_q3.printSchema()



In [0]:
# #Total records

# transactions_2020_q3.count()



In [0]:
# #Descriptive stats on fine_amount column before cleaning the data

# transactions_2020_q3.describe("fine_amount").display()



In [0]:
# #Checking for missing values

# transactions_2020_q3.select([sum(col(c).isNull().cast("int")).alias(c) for c in transactions_2020_q3.columns]).display()

# #return_date: 861 records are missing and payment_date: 14982 records are missing

# #return_date 861 reflects how many students did not returned the book taken from library,
# #payment_date 14982 reflects how many students did not paid the fine amount



In [0]:
# #Checking how many duplicate rows we have

# transactions_2020_q3.count() - transactions_2020_q3.distinct().count()  #total rows - unique rows



In [0]:
# #Checking for duplicate rows for transaction_id unique column

# #Find duplicate transaction_id values
# duplicate_transaction_ids_q3_2020 = transactions_2020_q3.groupBy("transaction_id").count().filter(col("count") > 1).select("transaction_id")

# #Join back to original DataFrame to get all rows with duplicate transaction_id
# duplicate_rows_transaction_ids_q3_2020 = transactions_2020_q3.join(duplicate_transaction_ids_q3_2020, on="transaction_id", how="inner")

# #Show duplicate rows
# display(duplicate_rows_transaction_ids_q3_2020)

# #NO DUPLICATES RECORDS.



In [0]:
# transactions_2020_q3.display()

# #Need to remove _ values from transaction_id column



In [0]:
# #Using Regular Expression to replace underscore with empty string
# from pyspark.sql.functions import regexp_replace

# #Remove underscores from transaction_id column
# transactions_2020_q3 = transactions_2020_q3.withColumn("transaction_id", regexp_replace(col("transaction_id"), "_", ""))

# #Show updated DataFrame
# transactions_2020_q3.display(7)



In [0]:
# #Check for dates starting from DD- instead of YYYY-

# transactions_2020_q3.select('issue_date','due_date','return_date','payment_date').display()

# #payment_date COLUMN FORMAT IS NOT IN CORRECT FORMAT (YYYY-MM-DD)



In [0]:
# #Convert payment_date (which has time) into proper date format (YYY-MM-DD)
# from pyspark.sql.functions import to_date, to_timestamp

# transactions_2020_q3 = transactions_2020_q3.withColumn(
#     "payment_date",
#     to_date(to_timestamp(col("payment_date"), "dd-MM-yyyy HH:mm"))
#     )

# #Show updated values
# transactions_2020_q3.select("payment_date").display(50)



In [0]:
# #Regular expression to match dates starting with DD-MM-YYYY (01-31 at the start)
# date_pattern = r"^[0-3][0-9]-[0-1][0-9]-[1-2][0-9]{3}$"

# #Check if any rows have DD-MM-YYYY format instead of YYYY-MM-DD
# incorrect_dates = transactions_2020_q2.filter(
#     col("issue_date").rlike(date_pattern) | 
#     col("due_date").rlike(date_pattern) | 
#     col("return_date").rlike(date_pattern) |
#     col("payment_date").rlike(date_pattern)
# )

# #Show incorrect format dates
# incorrect_dates.select("issue_date", "due_date", "return_date", "payment_date").display(truncate=False)



In [0]:
# #Filter rows where book_id starts with 'bk' (lowercase) only

# bk_lowercase_check = transactions_2020_q3.filter(
#     col("book_id").rlike("^bk[0-9]+$")  #Matches bk001, bk105, etc.
#     ).filter(~col("book_id").rlike("^BK[0-9]+$"))  #Excludes uppercase BK

# #Show results
# bk_lowercase_check.display()



In [0]:
# #Filter rows where copy_id starts with 'cp' instead of 'CP'
# cp_lowercase_check = transactions_2020_q3.filter(col("copy_id").rlike("^cp[0-9]+$"))

# #Show results
# cp_lowercase_check.select('copy_id').display()



In [0]:
# #Convert book_id to uppercase
# transactions_2020_q3 = transactions_2020_q3.withColumn("book_id", upper(col("book_id")))

# #Show updated DataFrame
# transactions_2020_q3.display()



In [0]:
# #Filter rows where book_id starts with 'bk' (lowercase) only

# bk_lowercase_check = transactions_2020_q3.filter(
#     col("book_id").rlike("^bk[0-9]+$")  #Matches bk001, bk105, etc.
#     ).filter(~col("book_id").rlike("^BK[0-9]+$"))  #Excludes uppercase BK

# #Show results
# bk_lowercase_check.display()

# #NO LOWER CASE VALUES IS PRESENT NOW IN book_id COLUMN.



In [0]:
# #Convert copy_id to uppercase
# transactions_2020_q3 = transactions_2020_q3.withColumn("copy_id", upper(col("copy_id")))

# #Show updated DataFrame
# transactions_2020_q3.display()



In [0]:
# #Filter rows where copy_id starts with 'cp' instead of 'CP'
# cp_lowercase_check = transactions_2020_q3.filter(col("copy_id").rlike("^cp[0-9]+$"))

# #Show results
# cp_lowercase_check.select('copy_id').display() 

# #NO LOWER CASE VALUES IS PRESENT NOW IN copy_id COLUMN.



In [0]:
# #Upper cases values are present we want all values to be lower case

# transactions_2020_q3.select('initial_status','final_status').distinct().display()



In [0]:
# #Convert initial_status and final_status to lowercase
# transactions_2020_q3 = transactions_2020_q3.withColumn("initial_status", lower(col("initial_status"))) \
#                                            .withColumn("final_status", lower(col("final_status")))



In [0]:

# #Show transformed DataFrame
# transactions_2020_q3.select("initial_status").distinct().display()

# #Only one class in initial_status column




####Saving transactions_2020_q3 dataset into Delta format under Silver 

In [0]:
# #Saving the data in delta format under silver location

# transactions_2020_q3.write.format("delta").mode("overwrite").save("abfss://silver@lmsstorageaccount2025.dfs.core.windows.net/transactions_2020_Q3")



In [0]:
# %sql
# --Creating external table for the silver location in Unity Catalog
# DROP TABLE IF EXISTS `lms-catalog`.silver.transactions_2020_q3;

# CREATE TABLE `lms-catalog`.silver.transactions_2020_q3
# USING DELTA
# LOCATION 'abfss://silver@lmsstorageaccount2025.dfs.core.windows.net/transactions_2020_Q3';



####Transactions_2020_Q4 Table

In [0]:
# #Verifying if the files is present in container

# dbutils.fs.ls("mnt/silver/")



In [0]:
# #Reading the data from silver location

# transactions_2020_q4 = spark.read.csv('/mnt/silver/transactions_2020_Q4_raw.csv', header=True, inferSchema=True)

# transactions_2020_q4.display()



In [0]:
# #Checking data-type of each column

# transactions_2020_q4.printSchema()



In [0]:
# #Total observation

# transactions_2020_q4.count()



In [0]:
# #Descriptive stats on fine_amount column before cleaning the data

# transactions_2020_q4.describe("fine_amount").display()



In [0]:
# #Checking for missing values
# transactions_2020_q4.select([sum(col(c).isNull().cast("int")).alias(c) for c in transactions_2020_q4.columns]).display()

# #return_date: 860 records are missing and payment_date: 15062 records are missing

# #return_date 861 reflects how many students did not returned the book taken from library,
# #payment_date 14982 reflects how many students did not paid the fine amount



In [0]:
# #Checking how many duplicate rows we have

# transactions_2020_q4.count() - transactions_2020_q4.distinct().count()  #total rows - unique rows



In [0]:
# #Checking for duplicate rows for transaction_id unique column

# #Find duplicate transaction_id values
# duplicate_transaction_ids_q4_2020 = transactions_2020_q4.groupBy("transaction_id").count().filter(col("count") > 1).select("transaction_id")

# #Join back to original DataFrame to get all rows with duplicate transaction_id
# duplicate_rows_transaction_ids_q4_2020 = transactions_2020_q4.join(duplicate_transaction_ids_q4_2020, on="transaction_id", how="inner")

# #Show duplicate rows
# display(duplicate_rows_transaction_ids_q4_2020)

# #NO DUPLICATES RECORDS.



In [0]:
# transactions_2020_q4.display()

# #Need to remove _ values from transaction_id column



In [0]:
# #Using Regular Expression to replace underscore with empty string
# from pyspark.sql.functions import regexp_replace

# #Remove underscores from transaction_id column
# transactions_2020_q4 = transactions_2020_q4.withColumn("transaction_id", regexp_replace(col("transaction_id"), "_", ""))

# #Show updated DataFrame
# transactions_2020_q4.display(7)



In [0]:
# #Check for dates starting from DD- instead of YYYY-

# transactions_2020_q4.select('issue_date','due_date','return_date','payment_date').display()

# #payment_date COLUMN FORMAT IS NOT IN CORRECT FORMAT (YYYY-MM-DD)



In [0]:
# #Convert payment_date (which has time) into proper date format (YYY-MM-DD)

# transactions_2020_q4 = transactions_2020_q4.withColumn(
#     "payment_date",
#     to_date(to_timestamp(col("payment_date"), "dd-MM-yyyy HH:mm"))
#     )

# #Show updated values
# transactions_2020_q4.select("payment_date").display(50, truncate=False)



In [0]:
# #Regular expression to match dates starting with DD-MM-YYYY (01-31 at the start)
# date_pattern = r"^[0-3][0-9]-[0-1][0-9]-[1-2][0-9]{3}$"

# #Check if any rows have DD-MM-YYYY format instead of YYYY-MM-DD
# incorrect_dates = transactions_2020_q4.filter(
#     col("issue_date").rlike(date_pattern) | 
#     col("due_date").rlike(date_pattern) | 
#     col("return_date").rlike(date_pattern) |
#     col("payment_date").rlike(date_pattern)
# )

# #Show incorrect format dates
# incorrect_dates.select("issue_date", "due_date", "return_date", "payment_date").display(truncate=False)



In [0]:
# #Filter rows where book_id starts with 'bk' (lowercase) only

# bk_lowercase_check = transactions_2020_q4.filter(
#     col("book_id").rlike("^bk[0-9]+$")  #Matches bk001, bk105, etc.
#     ).filter(~col("book_id").rlike("^BK[0-9]+$"))  #Excludes uppercase BK

# #Show results
# bk_lowercase_check.display()



In [0]:
# #Filter rows where copy_id starts with 'cp' instead of 'CP'
# cp_lowercase_check = transactions_2020_q4.filter(col("copy_id").rlike("^cp[0-9]+$"))

# #Show results
# cp_lowercase_check.select('copy_id').display()



In [0]:
# #Convert book_id to uppercase
# transactions_2020_q4 = transactions_2020_q4.withColumn("book_id", upper(col("book_id")))



In [0]:
# #Filter rows where book_id starts with 'bk' (lowercase) only

# bk_lowercase_check = transactions_2020_q4.filter(
#     col("book_id").rlike("^bk[0-9]+$")  #Matches bk001, bk105, etc.
#     ).filter(~col("book_id").rlike("^BK[0-9]+$"))  #Excludes uppercase BK

# #Show results
# bk_lowercase_check.display()

# #NO LOWER CASE VALUES IS PRESENT NOW IN book_id COLUMN.



In [0]:
# #Convert copy_id to uppercase
# transactions_2020_q4 = transactions_2020_q4.withColumn("copy_id", upper(col("copy_id")))



In [0]:
# #Filter rows where copy_id starts with 'cp' instead of 'CP'
# cp_lowercase_check = transactions_2020_q4.filter(col("copy_id").rlike("^cp[0-9]+$"))

# #Show results
# cp_lowercase_check.select('copy_id').display() 

# #NO LOWER CASE VALUES IS PRESENT NOW IN copy_id COLUMN.



In [0]:
# #Upper cases values are present we want all values to be lower case

# transactions_2020_q4.select('initial_status','final_status').distinct().display()



In [0]:
# #Convert initial_status and final_status to lowercase
# transactions_2020_q4 = transactions_2020_q4.withColumn("initial_status", lower(col("initial_status"))) \
#                                            .withColumn("final_status", lower(col("final_status")))

# #Show transformed DataFrame
# transactions_2020_q4.select("initial_status").distinct().display()

# #Only one class in initial_status column




####Saving transactions_2020_q4 dataset into Delta format under Silver 

In [0]:
# #Saving the data in delta format under silver location

# transactions_2020_q4.write.format("delta").mode("overwrite").save("abfss://silver@lmsstorageaccount2025.dfs.core.windows.net/transactions_2020_Q4")



In [0]:
# %sql
# --Creating external table for the silver location in Unity Catalog
# DROP TABLE IF EXISTS `lms-catalog`.silver.transactions_2020_q4;

# CREATE TABLE `lms-catalog`.silver.transactions_2020_q4
# USING DELTA
# LOCATION 'abfss://silver@lmsstorageaccount2025.dfs.core.windows.net/transactions_2020_Q4';



####Transactions_2021_Q1 Table

In [0]:
# #Checking if file is present in ADLS Gen2 or not

# dbutils.fs.ls('/mnt/silver')



In [0]:
# #Reading the data from silver location

# transactions_2021_q1 = spark.read.csv('/mnt/silver/transactions_2021_Q1_raw.csv', header=True, inferSchema=True)

# transactions_2021_q1.display()



In [0]:
# #Data-type of each column

# transactions_2021_q1.printSchema()



In [0]:
# #Total records

# transactions_2021_q1.count()



In [0]:
# #Descriptive stats on fine_amount column before cleaning the data

# transactions_2021_q1.describe("fine_amount").display()



In [0]:
# #Checking for missing values

# transactions_2021_q1.select([sum(col(c).isNull().cast("int")).alias(c) for c in transactions_2021_q1.columns]).display()

# #return_date: 847 records are missing and payment_date: 14594 records are missing

# #return_date 847 reflects how many students did not returned the book taken from library,
# #payment_date 14594 reflects how many students did not paid the fine amount



In [0]:
# #Checking how many duplicate rows we have

# transactions_2021_q1.count() - transactions_2021_q1.distinct().count()  #total rows - unique rows



In [0]:
# #Using Regular Expression to replace underscore with empty string in transaction_id column
# from pyspark.sql.functions import regexp_replace

# #Remove underscores from transaction_id column
# transactions_2021_q1 = transactions_2021_q1.withColumn("transaction_id", regexp_replace(col("transaction_id"), "_", ""))

# #Show updated DataFrame
# transactions_2021_q1.display(7)



In [0]:
# #Convert payment_date (which has time) into proper date format (YYY-MM-DD)
# from pyspark.sql.functions import to_date, to_timestamp

# transactions_2021_q1 = transactions_2021_q1.withColumn(
#     "payment_date",
#     to_date(to_timestamp(col("payment_date"), "dd-MM-yyyy HH:mm"))
#     )

# #Show updated values
# transactions_2021_q1.select("payment_date").display(50)



In [0]:
# #Regular expression to match dates starting with DD-MM-YYYY (01-31 at the start)
# date_pattern = r"^[0-3][0-9]-[0-1][0-9]-[1-2][0-9]{3}$"

# #Check if any rows have DD-MM-YYYY format instead of YYYY-MM-DD
# incorrect_dates = transactions_2021_q1.filter(
#     col("issue_date").rlike(date_pattern) | 
#     col("due_date").rlike(date_pattern) | 
#     col("return_date").rlike(date_pattern) |
#     col("payment_date").rlike(date_pattern)
# )

# #Show incorrect format dates
# incorrect_dates.select("issue_date", "due_date", "return_date", "payment_date").display(truncate=False)



In [0]:
# #Filter rows where book_id starts with 'bk' (lowercase) only

# bk_lowercase_check = transactions_2021_q1.filter(
#     col("book_id").rlike("^bk[0-9]+$")  #Matches bk001, bk105, etc.
#     ).filter(~col("book_id").rlike("^BK[0-9]+$"))  #Excludes uppercase BK

# #Show results
# bk_lowercase_check.display()



In [0]:
# #Filter rows where copy_id starts with 'cp' instead of 'CP'
# cp_lowercase_check = transactions_2021_q1.filter(col("copy_id").rlike("^cp[0-9]+$"))

# #Show results
# cp_lowercase_check.select('copy_id').display()



In [0]:
# #Convert book_id to uppercase
# transactions_2021_q1 = transactions_2021_q1.withColumn("book_id", upper(col("book_id")))



In [0]:
# #Convert copy_id to uppercase
# transactions_2021_q1 = transactions_2021_q1.withColumn("copy_id", upper(col("copy_id")))



In [0]:
# #Filter rows where book_id starts with 'bk' (lowercase) only

# bk_lowercase_check = transactions_2021_q1.filter(
#     col("book_id").rlike("^bk[0-9]+$")  #Matches bk001, bk105, etc.
#     ).filter(~col("book_id").rlike("^BK[0-9]+$"))  #Excludes uppercase BK

# #Show results
# bk_lowercase_check.display()

# #NO LOWER CASE VALUES IS PRESENT NOW IN book_id COLUMN.



In [0]:
# #Filter rows where copy_id starts with 'cp' instead of 'CP'
# cp_lowercase_check = transactions_2021_q1.filter(col("copy_id").rlike("^cp[0-9]+$"))

# #Show results
# cp_lowercase_check.select('copy_id').display() 

# #NO LOWER CASE VALUES IS PRESENT NOW IN copy_id COLUMN.



In [0]:
# #Upper cases values are present we want all values to be lower case

# transactions_2021_q1.select('initial_status','final_status').distinct().display()



In [0]:
# #Convert initial_status and final_status values to lowercase
# transactions_2021_q1 = transactions_2021_q1.withColumn("initial_status", lower(col("initial_status"))) \
#                                            .withColumn("final_status", lower(col("final_status")))

# #Show transformed DataFrame
# transactions_2021_q1.select("initial_status").distinct().display()

# #Only one class in initial_status column




####Saving transactions_2021_q1 dataset into Delta format under Silver 

In [0]:
# #Saving the data in delta format under silver location

# transactions_2021_q1.write.format("delta").mode("overwrite").save("abfss://silver@lmsstorageaccount2025.dfs.core.windows.net/transactions_2021_Q1")



In [0]:
# %sql
# --Creating external table for the silver location in Unity Catalog
# DROP TABLE IF EXISTS `lms-catalog`.silver.transactions_2021_q1;

# CREATE TABLE `lms-catalog`.silver.transactions_2021_q1
# USING DELTA
# LOCATION 'abfss://silver@lmsstorageaccount2025.dfs.core.windows.net/transactions_2021_Q1';



####Transactions_2021_Q2 Table

In [0]:
# #Verifying the file location

# display(dbutils.fs.ls('mnt/silver'))



In [0]:
# #Reading the data from silver location

# transactions_2021_q2 = spark.read.csv('/mnt/silver/transactions_2021_Q2_raw.csv', header=True, inferSchema=True)

# transactions_2021_q2.display()



In [0]:
# #Data-type of each column

# transactions_2021_q2.printSchema()



In [0]:
# #Total records

# transactions_2021_q2.count()



In [0]:
# #Descriptive stats on fine_amount column before cleaning the data

# transactions_2021_q2.describe("fine_amount").display()



In [0]:
# #Checking for missing values

# transactions_2021_q2.select([sum(col(c).isNull().cast("int")).alias(c) for c in transactions_2021_q2.columns]).display()

# #return_date: 559 records are missing and payment_date: 14982 records are missing

# #return_date 559 reflects how many students did not returned the book taken from library,
# #payment_date 9809 reflects how many students did not paid the fine amount



In [0]:
# #Checking how many duplicate rows we have

# transactions_2021_q2.count() - transactions_2021_q2.distinct().count()  #total rows - unique rows



In [0]:
# #Using Regular Expression to replace underscore with empty string

# #Remove underscores from transaction_id column
# transactions_2021_q2 = transactions_2021_q2.withColumn("transaction_id", regexp_replace(col("transaction_id"), "_", ""))

# #Show updated DataFrame
# transactions_2021_q2.display(7)



In [0]:
# #Convert payment_date (which has time) into proper date format (YYY-MM-DD)

# transactions_2021_q2= transactions_2021_q2.withColumn(
#     "payment_date",
#     to_date(to_timestamp(col("payment_date"), "dd-MM-yyyy HH:mm"))
#     )

# #Show updated values
# transactions_2021_q2.select("payment_date").display(50)



In [0]:
# #Regular expression to match dates starting with DD-MM-YYYY (01-31 at the start)
# date_pattern = r"^[0-3][0-9]-[0-1][0-9]-[1-2][0-9]{3}$"

# #Check if any rows have DD-MM-YYYY format instead of YYYY-MM-DD
# incorrect_dates = transactions_2021_q2.filter(
#     col("issue_date").rlike(date_pattern) | 
#     col("due_date").rlike(date_pattern) | 
#     col("return_date").rlike(date_pattern) |
#     col("payment_date").rlike(date_pattern)
# )

# #Show incorrect format dates
# incorrect_dates.select("issue_date", "due_date", "return_date", "payment_date").display(truncate=False)



In [0]:
# #Filter rows where book_id starts with 'bk' (lowercase) only

# bk_lowercase_check = transactions_2021_q2.filter(
#     col("book_id").rlike("^bk[0-9]+$")  #Matches bk001, bk105, etc.
#     ).filter(~col("book_id").rlike("^BK[0-9]+$"))  #Excludes uppercase BK

# #Show results
# bk_lowercase_check.display()



In [0]:
# #Filter rows where copy_id starts with 'cp' instead of 'CP'
# cp_lowercase_check = transactions_2021_q2.filter(col("copy_id").rlike("^cp[0-9]+$"))

# #Show results
# cp_lowercase_check.select('copy_id').display()



In [0]:
# #Convert book_id to uppercase
# transactions_2021_q2 = transactions_2021_q2.withColumn("book_id", upper(col("book_id")))

# #Convert copy_id to uppercase
# transactions_2021_q2 = transactions_2021_q2.withColumn("copy_id", upper(col("copy_id")))



In [0]:
# #Filter rows where book_id starts with 'bk' (lowercase) only

# bk_lowercase_check = transactions_2021_q2.filter(
#     col("book_id").rlike("^bk[0-9]+$")  #Matches bk001, bk105, etc.
#     ).filter(~col("book_id").rlike("^BK[0-9]+$"))  #Excludes uppercase BK

# #Show results
# bk_lowercase_check.display()

# #NO LOWER CASE VALUES IS PRESENT NOW IN book_id COLUMN.



In [0]:
# #Filter rows where copy_id starts with 'cp' instead of 'CP'
# cp_lowercase_check = transactions_2021_q2.filter(col("copy_id").rlike("^cp[0-9]+$"))

# #Show results
# cp_lowercase_check.select('copy_id').display()

# #NO LOWER CASE VALUES IS PRESENT NOW IN copy_id COLUMN.



In [0]:
# #Upper cases values are present we want all values to be lower case

# transactions_2021_q2.select('initial_status','final_status').distinct().display()



In [0]:
# #Convert initial_status and final_status to lowercase
# transactions_2021_q2 = transactions_2021_q2.withColumn("initial_status", lower(col("initial_status"))) \
#                                            .withColumn("final_status", lower(col("final_status")))



In [0]:

# #Show transformed DataFrame
# transactions_2021_q2.select("initial_status").distinct().display()

# #Only one class in initial_status column




####Saving transactions_2021_q2 dataset into Delta format under Silver 

In [0]:
# #Saving the data in delta format under silver location

# transactions_2021_q2.write.format("delta").mode("overwrite").save("abfss://silver@lmsstorageaccount2025.dfs.core.windows.net/transactions_2021_Q2")



In [0]:
# %sql
# --Creating external table for the silver location in Unity Catalog
# DROP TABLE IF EXISTS `lms-catalog`.silver.transactions_2021_q2;

# CREATE TABLE `lms-catalog`.silver.transactions_2021_q2
# USING DELTA
# LOCATION 'abfss://silver@lmsstorageaccount2025.dfs.core.windows.net/transactions_2021_Q2';




####Merge All Transactions Table's

In [0]:
# from functools import reduce
# from pyspark.sql import DataFrame

# #List of all quarterly DataFrames
# transactions_list = [transactions_2020_q2, transactions_2020_q3, transactions_2020_q4, 
#                      transactions_2021_q1, transactions_2021_q2]

# #.Merge all transactions using union
# transactions = reduce(DataFrame.union, transactions_list)

# #Show merged table
# transactions.show(10, truncate=False)



In [0]:
# transactions.count()



In [0]:
# transactions.display()

