In [1]:
import os
import shutil

from delta.tables import *
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import col

# MinIO connection settings are read from the environment so credentials do not
# have to live in the notebook. The fallbacks are the values this notebook
# originally hardcoded (presumably a local dev MinIO) and exist only so
# Restart & Run All keeps working in that setup.
# TODO: remove the fallbacks once MINIO_* variables are set wherever this runs.
minio_endpoint = os.environ.get("MINIO_ENDPOINT", "http://minio:9000")
minio_access_key = os.environ.get("MINIO_ACCESS_KEY", "matrix")
minio_secret_key = os.environ.get("MINIO_SECRET_KEY", "matrix123")

spark = (
    SparkSession.builder
    .appName("Dynamic Schema Task")
    .master("local[*]")
    # S3A filesystem pointed at MinIO.
    .config("spark.hadoop.fs.s3a.endpoint", minio_endpoint)
    .config("spark.hadoop.fs.s3a.access.key", minio_access_key)
    .config("spark.hadoop.fs.s3a.secret.key", minio_secret_key)
    .config("spark.hadoop.fs.s3a.path.style.access", "true")
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
    # Delta Lake SQL extension and catalog.
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    .getOrCreate()
)

In [2]:
# Location of the Delta table (S3A path in the "silver" bucket) that both CSV parts are written to.
delta_path = (
    "s3a://silver/"
    "card_transactions_delta"
)

In [3]:
# Part 1 seeds the table, so the table's column types are whatever Spark infers here;
# part 2 is later cast to match them.
df_part1 = spark.read.options(header=True, inferSchema=True).csv("card_transaction_part1.csv")

In [4]:
# Initial load: replace whatever is at delta_path with part 1, partitioned by transaction_date.
(
    df_part1.write
    .format("delta")
    .mode("overwrite")
    .partitionBy("transaction_date")
    .save(delta_path)
)

In [5]:
# Part 2 is read without inference, so every column arrives as a string; its real
# types are taken from the existing Delta table further down.
df_part2_raw = spark.read.options(header=True, inferSchema=False).csv("card_transaction_part2.csv")

In [6]:
# Show part 2's raw (uncast) schema, for comparison with the cast schema printed later.
df_part2_raw.printSchema()

root
 |-- transaction_id: string (nullable = true)
 |-- card_number: string (nullable = true)
 |-- mcc: string (nullable = true)
 |-- amount: string (nullable = true)
 |-- currency: string (nullable = true)
 |-- dr_cr: string (nullable = true)
 |-- transaction_date: string (nullable = true)
 |-- merchant_name: string (nullable = true)
 |-- merchant_city: string (nullable = true)
 |-- channel: string (nullable = true)
 |-- status: string (nullable = true)



In [7]:
# Partitioning and column types come from the existing table rather than being
# hard-coded, so part 2 is conformed to whatever part 1 produced.
delta_table = DeltaTable.forPath(spark, delta_path)

# detail() describes the table; only the partitionColumns field of its first row is used.
table_details = (
    delta_table.detail()
    .select("partitionColumns")
    .collect()[0]
)
dynamic_partition_cols = table_details.partitionColumns

# Column names and types of the table as currently written.
target_schema = delta_table.toDF().schema

In [8]:
def conform_to_schema(df, schema):
    """Return ``df`` with its columns cast to the types declared in ``schema``.

    All casts happen in a single ``select``. Calling ``withColumn`` once per column
    in a loop adds a projection per call, which the PySpark docs warn can produce
    very large plans. Column names are matched case-insensitively, like Spark's
    default name resolution. Columns of ``df`` that are not in ``schema`` are kept
    unchanged and in their original position; the Delta append then applies its
    own schema enforcement to them.

    Args:
        df: Spark DataFrame to conform (here: the all-string part-2 CSV).
        schema: StructType to conform to (here: the Delta table's schema).

    Returns:
        A new DataFrame; ``df`` itself is not modified.

    Raises:
        ValueError: if a ``schema`` column is absent from ``df``, or if any cast
            would turn a non-null value into NULL. With ANSI mode disabled, Spark's
            ``cast`` returns NULL for unparseable input instead of failing, which
            would silently lose data on append.
    """
    fields_by_key = {field.name.lower(): field for field in schema}
    present_keys = {name.lower() for name in df.columns}

    missing = [field.name for key, field in fields_by_key.items() if key not in present_keys]
    if missing:
        raise ValueError(f"Input is missing columns required by the target schema: {missing}")

    # Only columns whose type actually changes can lose values in the cast.
    retyped = [
        (name, fields_by_key[name.lower()])
        for name in df.columns
        if name.lower() in fields_by_key
        and df.schema[name].dataType != fields_by_key[name.lower()].dataType
    ]
    if retyped:
        # One extra aggregation job over df: for each retyped column, count rows
        # where the input is non-null but the cast result is NULL.
        failure_counts = (
            df.select(*[
                F.count(
                    F.when(col(name).isNotNull() & col(name).cast(field.dataType).isNull(), True)
                ).alias(field.name)
                for name, field in retyped
            ])
            .first()
            .asDict()
        )
        failed = {name: n for name, n in failure_counts.items() if n}
        if failed:
            raise ValueError(f"Cast to the target type produced NULLs from non-null values: {failed}")

    projection = []
    for name in df.columns:
        field = fields_by_key.get(name.lower())
        if field is not None:
            projection.append(col(name).cast(field.dataType).alias(field.name))
        else:
            projection.append(col(name))
    return df.select(*projection)


df_part2_transformed = conform_to_schema(df_part2_raw, target_schema)

In [9]:
# Confirm part 2 now carries the Delta table's column types before appending.
df_part2_transformed.printSchema()

root
 |-- transaction_id: integer (nullable = true)
 |-- card_number: string (nullable = true)
 |-- mcc: integer (nullable = true)
 |-- amount: double (nullable = true)
 |-- currency: string (nullable = true)
 |-- dr_cr: string (nullable = true)
 |-- transaction_date: timestamp (nullable = true)
 |-- merchant_name: string (nullable = true)
 |-- merchant_city: string (nullable = true)
 |-- channel: string (nullable = true)
 |-- status: string (nullable = true)



In [10]:
# Append the conformed part 2 using the partition columns read from the table itself.
# NOTE(review): append is not idempotent. Re-running only this cell adds part 2's rows
# again; Run All is fine because the part-1 cell overwrites first. Consider a MERGE
# on a key such as transaction_id if that column is unique (TODO: confirm).
(
    df_part2_transformed.write
    .format("delta")
    .mode("append")
    .partitionBy(*dynamic_partition_cols)
    .save(delta_path)
)