In [0]:
from delta.tables import DeltaTable
from pyspark.sql.functions import current_timestamp, lit

# Handle to the existing silver-layer airline dimension that the merge writes into.
dim_airline = DeltaTable.forName(spark, "flight_silver.dim_airline")

# Candidate airline rows from bronze.
# - Rows with a NULL IATA_CODE are excluded: `NULL = NULL` is never true in the
#   merge condition below, so such rows would always count as "not matched"
#   and be inserted again on every run of this cell.
# - Exactly one row per IATA_CODE is kept: an insert-only merge inserts every
#   unmatched source row, so two bronze rows sharing a code (e.g. differing
#   only in AIRLINE spelling) would otherwise both land in the dimension with
#   is_current = true. Which of the conflicting rows survives is arbitrary.
sourceDF = spark.sql("""
    SELECT DISTINCT IATA_CODE, AIRLINE
    FROM flight_bronze.bronze_airlines
    WHERE IATA_CODE IS NOT NULL
""").dropDuplicates(["IATA_CODE"])

# Insert-only merge keyed on IATA_CODE: codes already present in the target are
# left untouched; new codes are inserted as open-ended current rows
# (effective_from = now, effective_to = NULL, is_current = true).
(
    dim_airline.alias("target")
    .merge(
        sourceDF.alias("source"),
        "target.IATA_CODE = source.IATA_CODE"
    )
    .whenNotMatchedInsert(values={
        "IATA_CODE": "source.IATA_CODE",
        "AIRLINE": "source.AIRLINE",
        "effective_from": current_timestamp(),
        "effective_to": lit(None).cast("timestamp"),
        "is_current": lit(True)
    })
    .execute()
)

In [0]:
from delta.tables import DeltaTable
from pyspark.sql.functions import current_timestamp, lit, col

# Handle to the existing silver-layer airport dimension that the merge writes into.
dim_airport = DeltaTable.forName(spark, "flight_silver.dim_airport")

# Candidate airport rows from bronze.
# - Rows with a NULL IATA_CODE are excluded: `NULL = NULL` is never true in the
#   merge condition below, so such rows would always count as "not matched"
#   and be inserted again on every run of this cell.
# - Exactly one row per IATA_CODE is kept: DISTINCT spans all seven columns, so
#   two bronze rows for the same airport that differ in any attribute (e.g.
#   one has LATITUDE populated and one does not) would otherwise both be
#   inserted with is_current = true. Which of the conflicting rows survives
#   is arbitrary.
sourceDF = spark.sql("""
    SELECT DISTINCT
        IATA_CODE,
        AIRPORT,
        CITY,
        STATE,
        COUNTRY,
        LATITUDE,
        LONGITUDE
    FROM flight_bronze.bronze_airports
    WHERE IATA_CODE IS NOT NULL
""").dropDuplicates(["IATA_CODE"])

# Insert-only merge keyed on IATA_CODE: codes already present in the target are
# left untouched; new codes are inserted as open-ended current rows
# (effective_from = now, effective_to = NULL, is_current = true).
(
    dim_airport.alias("target")
    .merge(
        sourceDF.alias("source"),
        "target.IATA_CODE = source.IATA_CODE"
    )
    .whenNotMatchedInsert(values={
        "IATA_CODE": "source.IATA_CODE",
        "AIRPORT": "source.AIRPORT",
        "CITY": "source.CITY",
        "STATE": "source.STATE",
        "COUNTRY": "source.COUNTRY",
        "LATITUDE": "source.LATITUDE",
        "LONGITUDE": "source.LONGITUDE",
        "effective_from": current_timestamp(),
        "effective_to": lit(None).cast("timestamp"),
        "is_current": lit(True)
    })
    .execute()
)

In [0]:
from delta.tables import DeltaTable
from pyspark.sql.functions import col, make_date

# Calendar rows derived from the flight records in bronze.
# - date_value is assembled from the YEAR / MONTH / DAY columns.
# - Rows whose date_value is NULL (any component missing) are dropped:
#   `NULL = NULL` is never true in the merge condition below, so a NULL date
#   would count as "not matched" and be inserted again on every run.
# - One row per date_value is kept so a single merge cannot insert the same
#   calendar date twice if bronze carries inconsistent DAY_OF_WEEK values.
sourceDF = (
    spark.table("flight_bronze.bronze_flights")
    .select(
        make_date(col("YEAR"), col("MONTH"), col("DAY")).alias("date_value"),
        col("YEAR").alias("year"),
        col("MONTH").alias("month"),
        col("DAY").alias("day"),
        col("DAY_OF_WEEK").alias("day_of_week")
    )
    .filter(col("date_value").isNotNull())
    .dropDuplicates(["date_value"])
)

# Handle to the existing silver-layer date dimension.
dim_date = DeltaTable.forName(spark, "flight_silver.dim_date")

# Insert-only merge keyed on date_value: dates already present are left
# untouched, unseen dates are appended.
dim_date.alias("target").merge(
    sourceDF.alias("source"),
    "target.date_value = source.date_value"
).whenNotMatchedInsert(
    values={
        "date_value": col("source.date_value"),
        "year": col("source.year"),
        "month": col("source.month"),
        "day": col("source.day"),
        "day_of_week": col("source.day_of_week")
    }
).execute()

In [0]:
from delta.tables import DeltaTable
from pyspark.sql.functions import current_timestamp, col

# Flight-level rows from bronze, restricted to records that have a YEAR and
# stamped with the time this load ran (ingestion_ts).
sourceDF = (
    spark.table("flight_bronze.bronze_flights")
    .filter(col("YEAR").isNotNull())
    .select(
        "YEAR",
        "MONTH",
        "DAY",
        "DAY_OF_WEEK",
        "AIRLINE",
        "FLIGHT_NUMBER",
        "TAIL_NUMBER",
        "ORIGIN_AIRPORT",
        "DESTINATION_AIRPORT",
        "SCHEDULED_DEPARTURE",
        "DEPARTURE_TIME",
        "DEPARTURE_DELAY",
        "TAXI_OUT",
        "WHEELS_OFF",
        "SCHEDULED_TIME",
        "ELAPSED_TIME",
        "AIR_TIME",
        "DISTANCE",
        "WHEELS_ON",
        "TAXI_IN",
        "SCHEDULED_ARRIVAL",
        "ARRIVAL_TIME",
        "ARRIVAL_DELAY",
        "DIVERTED",
        "CANCELLED",
        "CANCELLATION_REASON",
        "AIR_SYSTEM_DELAY",
        "SECURITY_DELAY",
        "AIRLINE_DELAY",
        "LATE_AIRCRAFT_DELAY",
        "WEATHER_DELAY"
    )
    .withColumn("ingestion_ts", current_timestamp())
)

# Handle to the existing silver-layer flights fact table.
fact_flights = DeltaTable.forName(spark, "flight_silver.fact_flights")

# Insert-only merge: source rows whose key is already in the target are
# skipped, all others are inserted with every source column.
#
# The key columns are compared with the null-safe operator `<=>` rather than
# `=`. With plain `=`, a source row holding NULL in any key column (for
# example a missing TAIL_NUMBER) can never match an existing target row, so it
# would be appended again on every re-run of this cell.
#
# NOTE(review): the key is (FLIGHT_NUMBER, TAIL_NUMBER, YEAR, MONTH, DAY).
# If one aircraft can operate the same flight number more than once per day,
# this key is coarser than one row per flight and later legs would be skipped
# once the first is loaded; confirm the intended grain.
fact_flights.alias("target").merge(
    sourceDF.alias("source"),
    """
    target.FLIGHT_NUMBER <=> source.FLIGHT_NUMBER AND
    target.TAIL_NUMBER <=> source.TAIL_NUMBER AND
    target.YEAR <=> source.YEAR AND
    target.MONTH <=> source.MONTH AND
    target.DAY <=> source.DAY
    """
).whenNotMatchedInsertAll().execute()