**Importing dependencies**

In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *

**Reading delta file**

In [0]:
# df = spark.read.format('delta')\
#           .option('header',True)\
#           .option('inferSchema',True)\
#           .load("/Volumes/workspace/bronze/bronzevolume/bookings/data/")

                

In [0]:
# display(df)

booking_id,passenger_id,flight_id,airport_id,amount,booking_date,_rescued_data
B00001,P0048,F0014,A048,850.72,2025-05-29,
B00002,P0011,F0052,A003,376.63,2025-06-09,
B00003,P0079,F0023,A012,534.02,2025-06-03,
B00004,P0068,F0001,A039,1333.7,2025-06-16,
B00005,P0189,F0019,A008,1334.96,2025-06-17,
B00006,P0070,F0003,A004,296.13,2025-05-18,
B00007,P0194,F0068,A016,460.14,2025-04-05,
B00008,P0181,F0048,A017,1402.02,2025-06-04,
B00009,P0096,F0081,A016,1444.51,2025-05-16,
B00010,P0131,F0089,A034,292.39,2025-05-16,


**File schema**

In [0]:
# df.printSchema()

root
 |-- booking_id: string (nullable = true)
 |-- passenger_id: string (nullable = true)
 |-- flight_id: string (nullable = true)
 |-- airport_id: string (nullable = true)
 |-- amount: string (nullable = true)
 |-- booking_date: string (nullable = true)
 |-- _rescued_data: string (nullable = true)



In [0]:
import dlt

**Creating Streaming table**

In [0]:
@dlt.table(name="stagging_bookings")
def stagging():
    """Define the `stagging_bookings` DLT table.

    Returns:
        A streaming DataFrame read in Delta format from the bookings data
        directory of the bronze volume.
    """
    bookings_path = "/Volumes/workspace/bronze/bronzevolume/bookings/data/"
    return spark.readStream.format("delta").load(bookings_path)

**Creating Streaming view**

In [0]:
@dlt.view(name="transform_bookings")
def transform():
    """Define the `transform_bookings` DLT view on top of `stagging_bookings`.

    The streamed rows are reshaped in three steps:
      1. the `_rescued_data` column is dropped,
      2. `amount` is cast to double,
      3. a `modifiedDate` column is added from `current_timestamp()`.

    Returns:
        The transformed streaming DataFrame.
    """
    staged_bookings = spark.readStream.table("stagging_bookings")
    return (
        staged_bookings.drop("_rescued_data")
        .withColumn("amount", col("amount").cast("double"))
        .withColumn("modifiedDate", current_timestamp())
    )

**Expectation rules**

In [0]:
# Expectation rules keyed "rule1".."rule3": each of the three identifier
# columns must be non-null. The dict is passed to `dlt.expect_all` on the
# silver bookings table.
expect = {
    f"rule{position}": f"{key_column} is NOT NULL"
    for position, key_column in enumerate(
        ("booking_id", "passenger_id", "flight_id"), start=1
    )
}

In [0]:
@dlt.table(name="silver_booking")
@dlt.expect_all(expect)
def silver():
    """Define the `silver_booking` DLT table from the `transform_bookings` view.

    Every rule in the module-level `expect` dict is attached to the table
    through `dlt.expect_all`.

    Returns:
        A streaming DataFrame read from `transform_bookings`, unchanged.
    """
    # NOTE(review): the verification query further down this notebook selects
    # from `workspace.silver.silver_bookings` (plural), while this table is
    # registered as `silver_booking` - confirm which name is intended.
    return spark.readStream.table("transform_bookings")

In [0]:
# Flights data: batch-read the flights Delta directory and show it.
df = (
    spark.read
    .format("delta")
    .load("/Volumes/workspace/bronze/bronzevolume/flights/data/")
)
display(df)

flight_id,airline,origin,destination,flight_date,_rescued_data
F0001,Delta,Kellyfort,South Kathleen,2025-05-04,
F0002,Qatar Airways,Lake Stephen,New Vincent,2025-04-29,
F0003,Lufthansa,East Patrickborough,North Mary,2025-05-11,
F0004,Delta,Maddenshire,Johnchester,2025-05-16,
F0005,Qatar Airways,Bennettside,New Mistyhaven,2025-06-13,
F0006,Air Canada,New Richardside,South Jamesborough,2025-05-16,
F0007,Delta,Berryport,Miguelburgh,2025-05-24,
F0008,Lufthansa,Briannachester,Cervantesland,2025-05-26,
F0009,Delta,Alexandraborough,North Alexishaven,2025-06-10,
F0010,Emirates,Kruegerchester,Martintown,2025-05-20,


In [0]:
# Remove the `_rescued_data` column, add `modifiedDate` from
# current_timestamp(), then show the result.
df = (
    df.drop("_rescued_data")
    .withColumn("modifiedDate", current_timestamp())
)
display(df)

flight_id,airline,origin,destination,flight_date,modifiedDate
F0001,Delta,Kellyfort,South Kathleen,2025-05-04,2025-08-13T12:22:30.439Z
F0002,Qatar Airways,Lake Stephen,New Vincent,2025-04-29,2025-08-13T12:22:30.439Z
F0003,Lufthansa,East Patrickborough,North Mary,2025-05-11,2025-08-13T12:22:30.439Z
F0004,Delta,Maddenshire,Johnchester,2025-05-16,2025-08-13T12:22:30.439Z
F0005,Qatar Airways,Bennettside,New Mistyhaven,2025-06-13,2025-08-13T12:22:30.439Z
F0006,Air Canada,New Richardside,South Jamesborough,2025-05-16,2025-08-13T12:22:30.439Z
F0007,Delta,Berryport,Miguelburgh,2025-05-24,2025-08-13T12:22:30.439Z
F0008,Lufthansa,Briannachester,Cervantesland,2025-05-26,2025-08-13T12:22:30.439Z
F0009,Delta,Alexandraborough,North Alexishaven,2025-06-10,2025-08-13T12:22:30.439Z
F0010,Emirates,Kruegerchester,Martintown,2025-05-20,2025-08-13T12:22:30.439Z


In [0]:
# Passengers data: batch-read the passengers Delta directory and show it.
df = (
    spark.read
    .format("delta")
    .load("/Volumes/workspace/bronze/bronzevolume/passengers/data/")
)
display(df)


passenger_id,name,gender,nationality,_rescued_data
P0001,Kevin Ferguson,Male,Reunion,
P0002,Kathleen Martinez DVM,Female,Burkina Faso,
P0003,Cynthia Frazier,Male,Marshall Islands,
P0004,Ryan Ramsey,Male,Niger,
P0005,Mike Kim,Male,Taiwan,
P0006,Diana Adams,Male,Mayotte,
P0007,Sharon Moon,Male,Madagascar,
P0008,Cheryl Glenn,Male,Maldives,
P0009,Allen Lowery,Male,Rwanda,
P0010,Maria Medina,Male,Denmark,


In [0]:
# Remove the `_rescued_data` column and add `modifiedDate` from
# current_timestamp().
df = (
    df.drop("_rescued_data")
    .withColumn("modifiedDate", current_timestamp())
)


In [0]:
# Show the passengers frame after `_rescued_data` was dropped and
# `modifiedDate` was added (relies on the preceding cells having run).
display(df)

passenger_id,name,gender,nationality,modifiedDate
P0001,Kevin Ferguson,Male,Reunion,2025-08-15T05:51:18.730Z
P0002,Kathleen Martinez DVM,Female,Burkina Faso,2025-08-15T05:51:18.730Z
P0003,Cynthia Frazier,Male,Marshall Islands,2025-08-15T05:51:18.730Z
P0004,Ryan Ramsey,Male,Niger,2025-08-15T05:51:18.730Z
P0005,Mike Kim,Male,Taiwan,2025-08-15T05:51:18.730Z
P0006,Diana Adams,Male,Mayotte,2025-08-15T05:51:18.730Z
P0007,Sharon Moon,Male,Madagascar,2025-08-15T05:51:18.730Z
P0008,Cheryl Glenn,Male,Maldives,2025-08-15T05:51:18.730Z
P0009,Allen Lowery,Male,Rwanda,2025-08-15T05:51:18.730Z
P0010,Maria Medina,Male,Denmark,2025-08-15T05:51:18.730Z


In [0]:
# Airports data: batch-read the airports Delta directory and show it.
df = (
    spark.read
    .format("delta")
    .load("/Volumes/workspace/bronze/bronzevolume/airports/data/")
)
display(df)

airport_id,airport_name,city,country,_rescued_data
A001,Gregoryland International Airport,Kristenmouth,South Georgia and the South Sandwich Islands,
A002,East Kristin International Airport,North Michaelview,Bosnia and Herzegovina,
A003,Brownland International Airport,Samuelville,Costa Rica,
A004,Meghanton International Airport,Andrewsmouth,Macedonia,
A005,East Aaron International Airport,Davishaven,Monaco,
A006,Michaelburgh International Airport,East Blake,Iceland,
A007,West Jennifer International Airport,Jillianstad,Libyan Arab Jamahiriya,
A008,Port Craig International Airport,New Lisa,French Southern Territories,
A009,New Joshuafurt International Airport,Port Jamiehaven,Pitcairn Islands,
A010,Thompsontown International Airport,Murraychester,Ireland,


**Querying the silver tables after the DLT pipeline run**

In [0]:
%sql
-- Inspect the bookings table in the workspace.silver schema.
-- NOTE(review): the DLT table defined earlier in this notebook is named
-- "silver_booking" (singular); confirm which name the pipeline publishes.
SELECT * FROM workspace.silver.silver_bookings

booking_id,passenger_id,flight_id,airport_id,amount,booking_date,Booking_Insert_Date
B01001,P0207,F0047,A004,912.64,2025-07-10,2025-08-15T06:58:13.048Z
B01002,P0124,F0088,A051,597.04,2025-07-01,2025-08-15T06:58:13.048Z
B01003,P0103,F0096,A036,724.88,2025-07-10,2025-08-15T06:58:13.048Z
B01004,P0049,F0069,A001,1370.99,2025-07-12,2025-08-15T06:58:13.048Z
B01005,P0210,F0084,A053,1476.15,2025-07-01,2025-08-15T06:58:13.048Z
B01006,P0126,F0084,A042,800.39,2025-06-26,2025-08-15T06:58:13.048Z
B01007,P0077,F0013,A055,1187.22,2025-06-28,2025-08-15T06:58:13.048Z
B01008,P0118,F0005,A033,913.43,2025-07-18,2025-08-15T06:58:13.048Z
B01009,P0090,F0102,A034,260.75,2025-07-08,2025-08-15T06:58:13.048Z
B01010,P0012,F0080,A045,245.61,2025-07-21,2025-08-15T06:58:13.048Z


In [0]:
%sql
-- Inspect the flights table in the workspace.silver schema.
SELECT * FROM workspace.silver.silver_flights

flight_id,airline,origin,destination,flight_date,Flight_Insert_Date
F0001,Delta,Kellyfort,South Kathleen,2025-05-04,2025-08-15T06:58:04.121Z
F0002,Qatar Airways,Lake Stephen,New Vincent,2025-04-29,2025-08-15T06:58:04.121Z
F0003,Jet Airways,East Patrickborough,North Mary,2025-07-03,2025-08-15T06:58:04.121Z
F0004,Delta,Maddenshire,Johnchester,2025-05-16,2025-08-15T06:58:04.121Z
F0005,IndiGo,Bennettside,New Mistyhaven,2025-06-24,2025-08-15T06:58:04.121Z
F0006,Air Canada,New Richardside,South Jamesborough,2025-05-16,2025-08-15T06:58:04.121Z
F0007,Delta,Berryport,Miguelburgh,2025-05-24,2025-08-15T06:58:04.121Z
F0008,Lufthansa,Briannachester,Cervantesland,2025-05-26,2025-08-15T06:58:04.121Z
F0009,Delta,Alexandraborough,North Alexishaven,2025-06-10,2025-08-15T06:58:04.121Z
F0010,Emirates,Kruegerchester,Martintown,2025-05-20,2025-08-15T06:58:04.121Z


In [0]:
%sql
-- Inspect the passengers table in the workspace.silver schema.
SELECT * FROM workspace.silver.silver_passengers

passenger_id,name,gender,nationality,Passenger_Insert_Date
P0001,Kevin Ferguson,Male,Reunion,2025-08-15T06:58:03.127Z
P0002,Kathleen Martinez DVM,Female,Burkina Faso,2025-08-15T06:58:03.127Z
P0003,Cynthia Frazier,Male,Marshall Islands,2025-08-15T06:58:03.127Z
P0004,Ryan Ramsey,Male,Niger,2025-08-15T06:58:03.127Z
P0005,Mike Kim,Male,Taiwan,2025-08-15T06:58:03.127Z
P0006,Diana Adams,Male,Mayotte,2025-08-15T06:58:03.127Z
P0007,Sharon Moon,Male,Madagascar,2025-08-15T06:58:03.127Z
P0008,Cheryl Glenn,Male,Maldives,2025-08-15T06:58:03.127Z
P0009,Allen Lowery,Male,Rwanda,2025-08-15T06:58:03.127Z
P0010,Maria Medina,Male,Denmark,2025-08-15T06:58:03.127Z


In [0]:
%sql
-- Inspect the airports table in the workspace.silver schema.
SELECT * FROM workspace.silver.silver_airports

airport_id,airport_name,city,country,Airport_Insert_Date
A001,Gregoryland International Airport,Kristenmouth,South Georgia and the South Sandwich Islands,2025-08-15T06:58:05.092Z
A002,East Kristin International Airport,North Michaelview,Bosnia and Herzegovina,2025-08-15T06:58:05.092Z
A003,Brownland International Airport,Samuelville,Costa Rica,2025-08-15T06:58:05.092Z
A004,Meghanton International Airport,Andrewsmouth,Macedonia,2025-08-15T06:58:05.092Z
A005,East Aaron International Airport,Davishaven,Monaco,2025-08-15T06:58:05.092Z
A006,Michaelburgh International Airport,East Blake,Iceland,2025-08-15T06:58:05.092Z
A007,West Jennifer International Airport,Jillianstad,Libyan Arab Jamahiriya,2025-08-15T06:58:05.092Z
A008,Port Craig International Airport,New Lisa,French Southern Territories,2025-08-15T06:58:05.092Z
A009,New Joshuafurt International Airport,Port Jamiehaven,Pitcairn Islands,2025-08-15T06:58:05.092Z
A010,Thompsontown International Airport,Murraychester,Ireland,2025-08-15T06:58:05.092Z


In [0]:
%sql
-- Inspect the silver_business table. Its output carries booking, flight,
-- passenger and airport columns side by side.
-- NOTE(review): presumably a join of the four silver tables defined outside
-- the cells shown here; confirm against the pipeline source.
SELECT * FROM workspace.silver.silver_business

airport_id,passenger_id,flight_id,booking_id,amount,booking_date,Booking_Insert_Date,airline,origin,destination,flight_date,Flight_Insert_Date,name,gender,nationality,Passenger_Insert_Date,airport_name,city,country,Airport_Insert_Date
A052,P0085,F0046,B01257,1129.17,2025-06-27,2025-08-15T06:58:13.048Z,Qatar Airways,West David,North Becky,2025-04-30,2025-08-15T06:58:04.121Z,Frederick Robertson,Female,Germany,2025-08-15T06:58:03.127Z,East Cynthia International Airport,Gibbsfurt,Ecuador,2025-08-15T06:58:05.092Z
A052,P0168,F0071,B01120,175.84,2025-07-01,2025-08-15T06:58:13.048Z,Delta,Jefferyberg,Wendyshire,2025-06-17,2025-08-15T06:58:04.121Z,Norman Jones,Female,Faroe Islands,2025-08-15T06:58:03.127Z,East Cynthia International Airport,Gibbsfurt,Ecuador,2025-08-15T06:58:05.092Z
A052,P0011,F0077,B01255,725.24,2025-07-14,2025-08-15T06:58:13.048Z,Qatar Airways,Samuelfort,Port Ashley,2025-06-12,2025-08-15T06:58:04.121Z,Michael Anderson MD,Female,Czech Republic,2025-08-15T06:58:03.127Z,East Cynthia International Airport,Gibbsfurt,Ecuador,2025-08-15T06:58:05.092Z
A052,P0096,F0098,B01218,1473.27,2025-07-09,2025-08-15T06:58:13.048Z,Air Canada,Wallerburgh,Lake Karen,2025-05-01,2025-08-15T06:58:04.121Z,Erika Hall,Male,Kenya,2025-08-15T06:58:03.127Z,East Cynthia International Airport,Gibbsfurt,Ecuador,2025-08-15T06:58:05.092Z
A052,P0146,F0055,B01055,573.54,2025-06-28,2025-08-15T06:58:13.048Z,Air Canada,Spencerborough,Randyville,2025-06-09,2025-08-15T06:58:04.121Z,Phillip Myers,Female,Slovenia,2025-08-15T06:58:03.127Z,East Cynthia International Airport,Gibbsfurt,Ecuador,2025-08-15T06:58:05.092Z
A053,P0185,F0060,B01292,1175.36,2025-07-06,2025-08-15T06:58:13.048Z,Air Canada,Port Angelicaborough,East Daniel,2025-05-21,2025-08-15T06:58:04.121Z,Karen Maynard,Male,Romania,2025-08-15T06:58:03.127Z,South Corey International Airport,Johnsonfort,Switzerland,2025-08-15T06:58:05.092Z
A053,P0002,F0057,B01125,144.35,2025-07-21,2025-08-15T06:58:13.048Z,Air Canada,Samanthashire,Fisherstad,2025-06-09,2025-08-15T06:58:04.121Z,Kathleen Martinez DVM,Female,Burkina Faso,2025-08-15T06:58:03.127Z,South Corey International Airport,Johnsonfort,Switzerland,2025-08-15T06:58:05.092Z
A053,P0180,F0017,B01105,1132.14,2025-07-13,2025-08-15T06:58:13.048Z,Delta,Wallermouth,New Rogerberg,2025-05-04,2025-08-15T06:58:04.121Z,Chelsea Copeland,Female,Bolivia,2025-08-15T06:58:03.127Z,South Corey International Airport,Johnsonfort,Switzerland,2025-08-15T06:58:05.092Z
A053,P0210,F0084,B01005,1476.15,2025-07-01,2025-08-15T06:58:13.048Z,Delta,Franklinmouth,Lake Joseph,2025-05-30,2025-08-15T06:58:04.121Z,Robert Davis,Male,Puerto Rico,2025-08-15T06:58:03.127Z,South Corey International Airport,Johnsonfort,Switzerland,2025-08-15T06:58:05.092Z
A053,P0108,F0070,B01032,1443.89,2025-07-10,2025-08-15T06:58:13.048Z,Air Canada,Glennhaven,Port Darrenchester,2025-05-04,2025-08-15T06:58:04.121Z,Pamela Byrd,Female,Morocco,2025-08-15T06:58:03.127Z,South Corey International Airport,Johnsonfort,Switzerland,2025-08-15T06:58:05.092Z
