In [None]:
import dlt
from pyspark.sql.types import ArrayType, StructType, StructField, StringType, IntegerType, FloatType
from pyspark.sql.functions import to_json,from_json,col,explode,lit,to_timestamp,input_file_name,current_timestamp
from pyspark.sql import DataFrameWriter

In [None]:
@dlt.table(name='flights_bronze')
def get_flights_data():
    """Stream the raw flight parquet files into the ``flights_bronze`` table.

    The files are read with the ``cloudFiles`` streaming source, then two
    lineage columns are appended: ``filename`` (from ``input_file_name()``)
    and ``fileTimestamp`` (from ``current_timestamp()``).

    Returns:
        The streaming DataFrame including the two added columns.
    """
    reader_options = {
        "cloudFiles.format": "parquet",
        "cloudFiles.inferColumnTypes": "true",
        "recursiveFileLookup": "true",
        "cloudFiles.schemaLocation": "dbfs:/mnt/zathura/checkpoints/raw/tables/flights/",
    }
    raw_flights = (
        spark.readStream
        .format("cloudFiles")
        .options(**reader_options)
        .load("dbfs:/mnt/data/raw/Postgress/flights/")
    )

    # Tag every row with its source file and the time it was processed.
    return (
        raw_flights
        .withColumn("filename", input_file_name())
        .withColumn("fileTimestamp", current_timestamp())
    )

In [None]:
@dlt.table(name='payments_bronze')
def get_payments_data():
    """Stream the raw payment parquet files into the ``payments_bronze`` table.

    The files are read with the ``cloudFiles`` streaming source, then two
    lineage columns are appended: ``filename`` (from ``input_file_name()``)
    and ``fileTimestamp`` (from ``current_timestamp()``).

    Returns:
        The streaming DataFrame including the two added columns.
    """
    reader_options = {
        "cloudFiles.format": "parquet",
        "cloudFiles.inferColumnTypes": "true",
        "recursiveFileLookup": "true",
        "cloudFiles.schemaLocation": "dbfs:/mnt/zathura/checkpoints/raw/tables/payments/",
    }
    raw_payments = (
        spark.readStream
        .format("cloudFiles")
        .options(**reader_options)
        .load("dbfs:/mnt/data/raw/Postgress/payments/")
    )

    # Tag every row with its source file and the time it was processed.
    return (
        raw_payments
        .withColumn("filename", input_file_name())
        .withColumn("fileTimestamp", current_timestamp())
    )


In [None]:
@dlt.table(name='silver_flight_schedules')
def get_flights_data():
    """Stream flight-schedule parquet files into ``silver_flight_schedules``.

    Reads with the ``cloudFiles`` streaming source, appends ``filename`` and
    ``fileTimestamp``, and keeps only the schedule columns listed below.

    NOTE(review): this function name is also used by other table definitions
    in this notebook; the table name is supplied explicitly through the
    decorator's ``name`` argument. Consider a distinct function name.

    Returns:
        The streaming DataFrame restricted to the selected columns.
    """
    reader_options = {
        "cloudFiles.format": "parquet",
        "cloudFiles.inferColumnTypes": "true",
        "recursiveFileLookup": "true",
        "cloudFiles.schemaLocation": "dbfs:/mnt/zathura/checkpoints/raw/tables/flight_schedules/",
    }
    output_columns = [
        "name",
        "frequency",
        "source",
        "destination",
        "timings",
        "filename",
        "fileTimestamp",
    ]

    schedules = (
        spark.readStream
        .format("cloudFiles")
        .options(**reader_options)
        .load("dbfs:/mnt/data/raw/Postgress/flight_schedules/")
        # Tag every row with its source file and the time it was processed.
        .withColumn("filename", input_file_name())
        .withColumn("fileTimestamp", current_timestamp())
    )
    return schedules.select(*output_columns)

In [None]:
@dlt.table(name='silver_airplanes')
def get_flights_data():
    """Stream airplane parquet files into the ``silver_airplanes`` table.

    Reads with the ``cloudFiles`` streaming source, appends ``filename`` and
    ``fileTimestamp``, and keeps only the airplane columns listed below.

    NOTE(review): this function name is also used by other table definitions
    in this notebook; the table name is supplied explicitly through the
    decorator's ``name`` argument. Consider a distinct function name.

    Returns:
        The streaming DataFrame restricted to the selected columns.
    """
    reader_options = {
        "cloudFiles.format": "parquet",
        "cloudFiles.inferColumnTypes": "true",
        "recursiveFileLookup": "true",
        "cloudFiles.schemaLocation": "dbfs:/mnt/zathura/checkpoints/raw/tables/airplanes/",
    }
    output_columns = [
        "airplane_id",
        "name",
        "type",
        "total_seats",
        "fuel_capacity",
        "range",
        "filename",
        "fileTimestamp",
    ]

    airplanes = (
        spark.readStream
        .format("cloudFiles")
        .options(**reader_options)
        .load("dbfs:/mnt/data/raw/Postgress/airplanes/")
        # Tag every row with its source file and the time it was processed.
        .withColumn("filename", input_file_name())
        .withColumn("fileTimestamp", current_timestamp())
    )
    return airplanes.select(*output_columns)

In [None]:
@dlt.table(
    name='silver_flights',
    comment="The cleaned flight data to select specified columns and partitioned by flight_date",
    partition_cols=["flight_date"],
    table_properties={
        "myCompanyPipeline.quality": "silver",
        "pipelines.autoOptimize.managed": "true",
    },
)
@dlt.expect("valid ID", "id IS NOT NULL")
def enriching_flights():
    """Build ``silver_flights`` from the ``flights_bronze`` stream.

    Derives three columns before projecting the output:
      * ``flight_date``: ``departure_time`` cast to DATE (the partition column).
      * ``departure_timestamp``: ``departure_time`` passed through ``to_timestamp``.
      * ``arrival_timestamp``: ``arrival_time`` passed through ``to_timestamp``.

    Returns:
        The streaming DataFrame restricted to the selected columns.
    """
    output_columns = [
        "id",
        "flight_code",
        "source_airport",
        "destination_airport",
        "source_city",
        "destination_city",
        "estimated_departure_time",
        "departure_timestamp",
        "estimated_arrival_time",
        "arrival_timestamp",
        "total_passengers",
        "status",
        "filename",
        "fileTimestamp",
        "flight_date",
    ]

    flights = (
        dlt.readStream("flights_bronze")
        .withColumn("flight_date", col("departure_time").cast("DATE"))
        .withColumn("departure_timestamp", to_timestamp(col("departure_time")))
        .withColumn("arrival_timestamp", to_timestamp(col("arrival_time")))
    )
    return flights.select(*output_columns)




In [None]:

@dlt.table(
    name='silver_payments',
    comment="The cleaned flight - payment data to select specified columns and partitioned by payment_date",
    partition_cols=["payment_date"],
    table_properties={
        "myCompanyPipeline.quality": "silver",
        "pipelines.autoOptimize.managed": "true",
    },
)
@dlt.expect_all({
    "valid TransactionID": "transaction_id IS NOT NULL",
    "valid amount": "amount IS NOT NULL  AND amount > 0",
})
def enriching_payments():
    """Build ``silver_payments`` from the ``payments_bronze`` stream.

    Adds ``payment_date`` (``time_of_payment`` cast to DATE, used as the
    partition column) and projects the payment columns listed below.

    Returns:
        The streaming DataFrame restricted to the selected columns.
    """
    output_columns = [
        "flight_id",
        "first_name",
        "last_name",
        "contact_number",
        "email_id",
        "seat_number",
        "mode_of_payment",
        "time_of_payment",
        "amount",
        "transaction_id",
        "agent",
        "filename",
        "fileTimestamp",
        "payment_date",
    ]

    payments = dlt.readStream("payments_bronze").withColumn(
        "payment_date", col("time_of_payment").cast("DATE")
    )
    return payments.select(*output_columns)

In [None]:

@dlt.table(
    name = 'silver_passengers',
    # The previous comment was copied from another table: it described flight
    # data "partitioned by order_date", but this table has no partition
    # columns and no order_date column.
    comment="The cleaned passenger data extracted from flights, one row per passenger",
    table_properties={
        "myCompanyPipeline.quality": "silver",
        "pipelines.autoOptimize.managed": "true"
    }
)
@dlt.expect_all({"valid bookingID" : "bookingID IS NOT NULL", "valid seat with person" : "seat IS NOT NULL AND first_name IS NOT NULL"})
def getPassenger_fromflights():
    """Build ``silver_passengers`` by flattening ``passenger_data`` from ``flights_bronze``.

    ``passenger_data`` is parsed with ``from_json`` against an array-of-struct
    schema, exploded so each array element becomes its own row, and the struct
    fields are promoted to top-level columns next to the flight ``id`` and the
    ``filename`` / ``fileTimestamp`` lineage columns.

    NOTE(review): assumes ``passenger_data`` is a JSON string column; confirm
    against the bronze schema. ``explode`` emits no row for a null or empty
    array, so flights whose passenger data is missing or fails to parse do not
    appear in this table.

    Returns:
        The streaming DataFrame with columns ``id``, the passenger fields in
        schema order, ``filename`` and ``fileTimestamp``.
    """
    # Schema of a single passenger record inside the JSON array.
    passenger_struct = StructType([
        StructField("first_name", StringType()),
        StructField("last_name", StringType()),
        StructField("age", IntegerType()),
        StructField("gender", StringType()),
        StructField("contact_number", StringType()),
        StructField("email_id", StringType()),
        StructField("aadhar_number", StringType()),
        StructField("passport_number", StringType()),
        StructField("seating_class", StringType()),
        StructField("seat", StringType()),
        StructField("bookingID", StringType())
    ])
    passenger_array_schema = ArrayType(passenger_struct)

    passengers = (
        dlt.readStream("flights_bronze")
        .select("id", "passenger_data", "filename", "fileTimestamp")
        .withColumn("json_passenger_data", from_json("passenger_data", passenger_array_schema))
        .withColumn("exploded_passenger_data", explode("json_passenger_data"))
        .select("id", "filename", "fileTimestamp", "exploded_passenger_data.*")
    )

    # Derive the output column list from the schema so the two cannot drift;
    # the resulting order matches the previous hand-written list.
    return passengers.select(
        "id", *passenger_struct.fieldNames(), "filename", "fileTimestamp"
    )