In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lit
from pyspark.sql.types import LongType, StringType

# 1. Set Parameters
# Namespace prefix used to qualify both the target and the source table names.
schema_path = "workspace.default"
tgt_table = "dbo_run_tripdata"
tgt_table_path = ".".join((schema_path, tgt_table))
# Each entry is [taxi_type_id, source table name]; the id selects the
# column mapping applied to that source further down.
src_table = [
    [1, "stg_yellow_tripdata"],
    [2, "stg_green_tripdata"],
    [3, "stg_fhv_tripdata"],
    [4, "stg_fhvhv_tripdata"],
]

# 2. Data mapping
# Rebuilds the target table from scratch: it is truncated once, then every
# source table is reshaped to the common column layout and appended.
spark = SparkSession.builder.getOrCreate()
spark.sql(f"TRUNCATE TABLE {tgt_table_path}")

# Final column order written to the target; identical for every source,
# so it is defined once outside the loop.
sorted_columns = ["taxi_type_id", "license_id", "VendorID", "pickup_datetime", "dropoff_datetime",
                  "passenger_count", "RatecodeID", "payment_type", "total_amount"]

for src in src_table:
    src_table_path = f"{schema_path}.{src[1]}"
    df_src = spark.table(src_table_path)

    # 3. Select columns to transfer
    # Columns a source does not provide are added as typed NULLs so that all
    # sources end up with the same set of columns.
    if src[0] == 1:
        tgt_columns = [
            col("VendorID").cast(LongType()).alias("VendorID"),
            df_src["tpep_pickup_datetime"].alias("pickup_datetime"),
            df_src["tpep_dropoff_datetime"].alias("dropoff_datetime"),
            "passenger_count",
            "RatecodeID",
            "payment_type",
            "total_amount"
        ]
        df_src = df_src.select(*tgt_columns)
        df_src = df_src.withColumn("license_id", lit(None).cast(StringType()))

    elif src[0] == 2:
        tgt_columns = [
            col("VendorID").cast(LongType()).alias("VendorID"),
            df_src["lpep_pickup_datetime"].alias("pickup_datetime"),
            df_src["lpep_dropoff_datetime"].alias("dropoff_datetime"),
            "passenger_count",
            "RatecodeID",
            "payment_type",
            "total_amount"
        ]
        df_src = df_src.select(*tgt_columns)
        df_src = df_src.withColumn("license_id", lit(None).cast(StringType()))

    elif src[0] == 3:
        tgt_columns = [
            "pickup_datetime",
            df_src["dropOff_datetime"].alias("dropoff_datetime")
        ]
        df_src = df_src.select(*tgt_columns)
        # NOTE: the loop variable must not be called `col`; that would rebind
        # the imported pyspark `col` function and break any later
        # `col(...)` call (e.g. if the order of `src_table` changes).
        for null_col, dtype in {"VendorID": "long", "passenger_count": "long", "RatecodeID": "long",
                                "payment_type": "long", "total_amount": "double",
                                "license_id": "string"}.items():
            df_src = df_src.withColumn(null_col, lit(None).cast(dtype))

    elif src[0] == 4:
        tgt_columns = [
            df_src["hvfhs_license_num"].alias("license_id"),
            "pickup_datetime",
            "dropoff_datetime"
        ]
        df_src = df_src.select(*tgt_columns)
        # Same naming caveat as above: keep the imported `col` intact.
        for null_col, dtype in {"VendorID": "long", "passenger_count": "long", "RatecodeID": "long",
                                "payment_type": "long", "total_amount": "double"}.items():
            df_src = df_src.withColumn(null_col, lit(None).cast(dtype))

    else:
        # Unknown source id: no mapping is defined, so nothing is loaded.
        continue

    # 4. Sort columns
    # Tag every row with the id of the source it came from, then project the
    # columns into the fixed target order.
    df_src = df_src.withColumn("taxi_type_id", lit(src[0]).cast(LongType()))
    df_src = df_src.select(*sorted_columns)

    # 5. Insert / Append Data
    (df_src.write
        .format("delta")
        .mode("append")
        .saveAsTable(tgt_table_path)
    )

