## Spark Session for Hudi on MinIO

This sets up Spark to use **Apache Hudi** with **MinIO** as the storage backend.

- Adds Hudi support via `hudi-spark3.5-bundle`
- Enables Hudi SQL features
- Connects to MinIO using S3A configs
- Increases memory to avoid Java heap errors
- Reduces shuffle partitions for local use



In [1]:
from pyspark.sql import SparkSession

packages = ",".join([
    "org.apache.hudi:hudi-spark3.5-bundle_2.12:1.0.2",   
    "org.apache.hadoop:hadoop-aws:3.3.4",
    "com.amazonaws:aws-java-sdk-bundle:1.12.530",
])
# Create SparkSession with Hudi + S3A (MinIO) support
spark = (
    SparkSession.builder
      .appName("Hudi Lakehouse on MinIO")
      # Hudi SQL support
      .config("spark.jars.packages", packages)
      .config("spark.sql.extensions", "org.apache.spark.sql.hudi.HoodieSparkSessionExtension")
      .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.hudi.catalog.HoodieCatalog")
      # Mandatory serializer for Hudi
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      # S3A configuration for MinIO access
      .config("spark.hadoop.fs.s3a.endpoint", "http://minio:9000")
      .config("spark.hadoop.fs.s3a.path.style.access", "true")
      .config("spark.hadoop.fs.s3a.connection.ssl.enabled", "false")
      .config("spark.hadoop.fs.s3a.access.key", "minioadmin")
      .config("spark.hadoop.fs.s3a.secret.key", "minioadmin")
      .config("spark.hadoop.fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider")
      # Prevent Java heap errors when writing/upserting
      .config("spark.driver.memory", "4g") 
      .config("spark.executor.memory", "4g")
      # Reduce shuffle overhead for local runs
      .config("spark.sql.shuffle.partitions", "8")
      .getOrCreate()
)
print("Spark:", spark.version)


Spark: 3.5.3
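
The cell above hardcodes the MinIO credentials. As a minimal sketch, the same S3A settings could be built from environment variables instead. The variable names `MINIO_ENDPOINT`, `MINIO_ACCESS_KEY`, and `MINIO_SECRET_KEY` and both helper functions are assumptions made for this example; neither Spark nor MinIO defines them.

```python
import os


def s3a_conf_from_env(env=os.environ):
    """Build the S3A settings used above from environment variables.

    MINIO_ENDPOINT, MINIO_ACCESS_KEY and MINIO_SECRET_KEY are hypothetical
    names chosen for this sketch.
    """
    endpoint = env.get("MINIO_ENDPOINT", "http://minio:9000")
    return {
        "spark.hadoop.fs.s3a.endpoint": endpoint,
        "spark.hadoop.fs.s3a.path.style.access": "true",
        # Enable SSL only when the endpoint is an https URL
        "spark.hadoop.fs.s3a.connection.ssl.enabled":
            str(endpoint.startswith("https://")).lower(),
        "spark.hadoop.fs.s3a.access.key": env["MINIO_ACCESS_KEY"],
        "spark.hadoop.fs.s3a.secret.key": env["MINIO_SECRET_KEY"],
        "spark.hadoop.fs.s3a.aws.credentials.provider":
            "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider",
    }


def apply_conf(builder, conf):
    """Call builder.config(key, value) for every entry and return the builder."""
    for key, value in conf.items():
        builder = builder.config(key, value)
    return builder
```

With this in place, `apply_conf(SparkSession.builder.appName("Hudi Lakehouse on MinIO"), s3a_conf_from_env())` would stand in for the six S3A `.config(...)` lines.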


## Create Hudi Table in S3 (MinIO)

This creates a Hudi table `nyc.taxis_hudi` using the **Copy-on-Write (COW)** table type.

- Partitioned by `partitionpath`
- Uses `uuid` as the primary key
- `ts` is the pre-combine field (used to deduplicate records)
- Data is stored in MinIO at `s3a://my-bucket/hudi-lakehouse/nyc/taxis`
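
The role of the pre-combine field can be illustrated in plain Python. This is only a sketch of the idea (when several incoming records share a key, the one with the largest `ts` is kept); it is not how Hudi implements it, and the `precombine` function and sample records are made up for the example.

```python
def precombine(records, key_field="uuid", precombine_field="ts"):
    """Keep one record per key: the one with the largest precombine value."""
    latest = {}
    for rec in records:
        key = rec[key_field]
        current = latest.get(key)
        # On a tie the first record seen is kept; Hudi's tie-breaking may differ.
        if current is None or rec[precombine_field] > current[precombine_field]:
            latest[key] = rec
    return list(latest.values())


batch = [
    {"uuid": "a", "ts": 1, "total_amount": 10.0},
    {"uuid": "a", "ts": 2, "total_amount": 12.5},  # newer version of key "a"
    {"uuid": "b", "ts": 1, "total_amount": 7.0},
]
deduped = precombine(batch)
print(deduped)
```

Only the `ts = 2` version of key `"a"` survives, alongside the single record for key `"b"`.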


In [2]:
spark.sql("CREATE DATABASE IF NOT EXISTS nyc")

spark.sql("""
CREATE TABLE IF NOT EXISTS nyc.taxis_hudi (
  VendorID INT,
  tpep_pickup_datetime TIMESTAMP,
  tpep_dropoff_datetime TIMESTAMP,
  passenger_count BIGINT,
  trip_distance DOUBLE,
  RatecodeID BIGINT,
  store_and_fwd_flag STRING,
  PULocationID INT,
  DOLocationID INT,
  payment_type BIGINT,
  fare_amount DOUBLE,
  extra DOUBLE,
  mta_tax DOUBLE,
  tip_amount DOUBLE,
  tolls_amount DOUBLE,
  improvement_surcharge DOUBLE,
  total_amount DOUBLE,
  congestion_surcharge DOUBLE,
  Airport_fee DOUBLE,
  cbd_congestion_fee DOUBLE,
  ts TIMESTAMP,
  uuid STRING,
  partitionpath STRING
)
USING hudi
PARTITIONED BY (partitionpath)
TBLPROPERTIES (
  type = 'cow', -- or 'mor'
  primaryKey = 'uuid',
  preCombineField = 'ts'
)
LOCATION 's3a://my-bucket/hudi-lakehouse/nyc/taxis'
""")


DataFrame[]

## Insert Data into Hudi Table

This inserts data into the `nyc.taxis_hudi` table from the May Parquet file, adding required fields:

- `uuid`: a unique ID for each row (Hudi primary key)
- `ts`: timestamp for preCombine logic
- `partitionpath`: converted from `PULocationID` to use as partition column

⚠️ This insert **initially failed** with `java.lang.OutOfMemoryError: Java heap space`.  
To fix it, memory and shuffle settings were adjusted:

```python
.config("spark.driver.memory", "4g")
.config("spark.executor.memory", "4g")
.config("spark.sql.shuffle.partitions", "8")
```

In [3]:
spark.sql("""
INSERT INTO nyc.taxis_hudi
SELECT
  *,
  -- INSERT INTO matches columns by position, so keep the DDL order: ts, uuid, partitionpath
  tpep_pickup_datetime AS ts,
  uuid() AS uuid,
  CAST(PULocationID AS STRING) AS partitionpath
FROM parquet.`s3a://my-bucket/raw-files/yellow_tripdata_2025-05.parquet`
""")


DataFrame[]

## Preview Hudi Table with Metadata Columns

This query shows a few rows from the `nyc.taxis_hudi` table, including internal Hudi metadata fields:

- `_hoodie_commit_time`: when the record was written
- `_hoodie_commit_seqno`: commit sequence number
- `_hoodie_record_key`: the primary key (`uuid`)
- `_hoodie_partition_path`: the partition used
- `_hoodie_file_name`: the file storing the row

These metadata fields are automatically managed by Hudi and useful for debugging and tracking data lineage.


In [4]:
spark.sql("SELECT _hoodie_commit_time, _hoodie_commit_seqno, _hoodie_record_key, _hoodie_partition_path, _hoodie_file_name, VendorID, total_amount FROM nyc.taxis_hudi LIMIT 10").show()

+-------------------+--------------------+--------------------+----------------------+--------------------+--------+------------+
|_hoodie_commit_time|_hoodie_commit_seqno|  _hoodie_record_key|_hoodie_partition_path|   _hoodie_file_name|VendorID|total_amount|
+-------------------+--------------------+--------------------+----------------------+--------------------+--------+------------+
|  20250810152622768|20250810152622768...|5219b7e4-1690-4c1...|     partitionpath=132|e89086e7-dfd5-403...|       2|       98.88|
|  20250810152622768|20250810152622768...|488de60d-9b90-499...|     partitionpath=132|e89086e7-dfd5-403...|       2|       54.55|
|  20250810152622768|20250810152622768...|19137f4f-c02a-40e...|     partitionpath=132|e89086e7-dfd5-403...|       2|      103.86|
|  20250810152622768|20250810152622768...|51c60e16-661b-486...|     partitionpath=132|e89086e7-dfd5-403...|       2|       83.44|
|  20250810152622768|20250810152622768...|d299710e-b490-4a2...|     partitionpath=132|e890

## List Hudi Partitions

This lists all partition values registered for `nyc.taxis_hudi` in the catalog.


In [5]:
spark.sql("SHOW PARTITIONS nyc.taxis_hudi;").show()

+-----------------+
|        partition|
+-----------------+
|  partitionpath=1|
| partitionpath=10|
|partitionpath=100|
|partitionpath=101|
|partitionpath=102|
|partitionpath=106|
|partitionpath=107|
|partitionpath=108|
|partitionpath=109|
| partitionpath=11|
|partitionpath=111|
|partitionpath=112|
|partitionpath=113|
|partitionpath=114|
|partitionpath=115|
|partitionpath=116|
|partitionpath=117|
|partitionpath=118|
|partitionpath=119|
| partitionpath=12|
+-----------------+
only showing top 20 rows
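
Because `partitionpath` is a `STRING` column, the listing above appears in string order (`1`, `10`, `100`, ...). A small sketch for sorting the Hive-style `column=value` strings numerically instead; the helper names are made up for this example, and it assumes every partition value is an integer.

```python
def partition_value(partition):
    """Extract the value from a Hive-style 'column=value' partition string."""
    _, _, value = partition.partition("=")
    return value


def sort_numeric(partitions):
    """Sort partition strings by the integer after the '='."""
    return sorted(partitions, key=lambda p: int(partition_value(p)))


listed = [
    "partitionpath=1",
    "partitionpath=10",
    "partitionpath=100",
    "partitionpath=11",
    "partitionpath=12",
]
print(sort_numeric(listed))
```

In the notebook, the input list could come from `[row["partition"] for row in spark.sql("SHOW PARTITIONS nyc.taxis_hudi").collect()]`.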



## Show Hudi Commit History

This command lists all commits made to the `nyc.taxis_hudi` table.  
Each commit represents a completed write (insert, update, or delete) and includes:

- Commit time (`commit_time`)
- Action type (`action`, e.g. `commit`)
- Bytes, files, and partitions written
- Total number of records written (`total_records_written`)

In [6]:
spark.sql("CALL show_commits(table => 'nyc.taxis_hudi')").show(truncate=False)


+-----------------+---------------------+------+-------------------+-----------------+-------------------+------------------------+---------------------+----------------------------+------------+
|commit_time      |state_transition_time|action|total_bytes_written|total_files_added|total_files_updated|total_partitions_written|total_records_written|total_update_records_written|total_errors|
+-----------------+---------------------+------+-------------------+-----------------+-------------------+------------------------+---------------------+----------------------------+------------+
|20250810152622768|20250810152718110    |commit|462193941          |269              |0                  |260                     |4591845              |0                           |0           |
+-----------------+---------------------+------+-------------------+-----------------+-------------------+------------------------+---------------------+----------------------------+------------+



## Simulate New Batch Ingestion into Hudi Table

This insert loads a small random sample (~10%) from the June dataset into the `nyc.taxis_hudi` table.

As before, it adds required fields:

- `ts`: used for preCombine logic  
- `uuid`: unique ID for the primary key  
- `partitionpath`: determines where the data will be stored


In [7]:
spark.sql("""
INSERT INTO nyc.taxis_hudi
SELECT
  *,
  tpep_pickup_datetime AS ts,
  uuid() AS uuid,
  CAST(PULocationID AS STRING) AS partitionpath
FROM parquet.`s3a://my-bucket/raw-files/yellow_tripdata_2025-06.parquet` WHERE rand() < 0.10;
""")

DataFrame[]
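
Spark SQL's `INSERT INTO` without a column list matches columns by position, so the derived columns must appear in the same order as in the table definition (`ts`, `uuid`, `partitionpath`). As a hedged sketch, a select list can be generated from the table's column order to avoid mismatches. The `select_list` helper is hypothetical, and the column list is shortened for the example.

```python
def select_list(table_columns, source_columns, derived):
    """Return SELECT expressions ordered like the target table's columns.

    table_columns: column names in DDL order.
    source_columns: names available in the source Parquet file.
    derived: mapping of column name -> SQL expression for computed columns.
    """
    exprs = []
    for name in table_columns:
        if name in derived:
            exprs.append(f"{derived[name]} AS {name}")
        elif name in source_columns:
            exprs.append(name)
        else:
            raise ValueError(f"no source for column {name!r}")
    return ",\n  ".join(exprs)


table_cols = ["VendorID", "total_amount", "ts", "uuid", "partitionpath"]
source_cols = {"VendorID", "total_amount", "PULocationID", "tpep_pickup_datetime"}
derived = {
    "ts": "tpep_pickup_datetime",
    "uuid": "uuid()",
    "partitionpath": "CAST(PULocationID AS STRING)",
}
print(select_list(table_cols, source_cols, derived))
```

The generated text can be placed between `SELECT` and `FROM` in the insert statement in place of `*` plus the hand-written expressions.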

## Verify New Commit After Insert

This command shows the updated commit history of the `nyc.taxis_hudi` table.

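A new commit should appear, reflecting the recent insert of the June sample.  
Each entry includes the commit timestamp, action type, and number of records written.
On a Copy-on-Write table, an insert into existing file groups rewrites those files, so `total_records_written` counts the rewritten rows as well as the new ones.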


In [8]:
spark.sql("CALL show_commits(table => 'nyc.taxis_hudi')").show(truncate=False)


+-----------------+---------------------+------+-------------------+-----------------+-------------------+------------------------+---------------------+----------------------------+------------+
|commit_time      |state_transition_time|action|total_bytes_written|total_files_added|total_files_updated|total_partitions_written|total_records_written|total_update_records_written|total_errors|
+-----------------+---------------------+------+-------------------+-----------------+-------------------+------------------------+---------------------+----------------------------+------------+
|20250810152835682|20250810152857714    |commit|433786524          |0                |256                |256                     |4294877              |0                           |0           |
|20250810152622768|20250810152718110    |commit|462193941          |269              |0                  |260                     |4591845              |0                           |0           |
+-----------------+-

## Time Travel in Hudi

To read the Hudi table as it was at a specific point in time, set the `as.of.instant` option to a past commit time.

Replace `commit_time` with a valid value from `show_commits`. This lets you view the table state as of that commit.
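
To pick a commit for a given wall-clock time, the instant strings from `show_commits` can be compared directly, since they are fixed-width digit strings in `yyyyMMddHHmmssSSS` form. A minimal sketch; the helper names are made up, and it assumes the `datetime` passed in uses the same time zone as the table's timeline.

```python
from datetime import datetime

INSTANT_LEN = 17  # yyyyMMddHHmmssSSS


def to_instant(when):
    """Format a datetime as a 17-digit instant string (millisecond precision)."""
    return when.strftime("%Y%m%d%H%M%S") + f"{when.microsecond // 1000:03d}"


def latest_instant_at_or_before(instants, when):
    """Return the newest commit instant that is <= `when`, or None."""
    cutoff = to_instant(when)
    # Fixed-width digit strings sort the same way as the times they encode.
    candidates = [i for i in instants if len(i) == INSTANT_LEN and i <= cutoff]
    return max(candidates, default=None)


commits = ["20250810152835682", "20250810152622768"]  # values from show_commits
print(latest_instant_at_or_before(commits, datetime(2025, 8, 10, 15, 27, 0)))
```

The returned string can be passed as the `as.of.instant` option; `None` means no commit existed yet at that time.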


In [10]:
commit_time = "20250810152622768"   
df = spark.read.format("hudi") \
    .option("as.of.instant", commit_time) \
    .load("s3a://my-bucket/hudi-lakehouse/nyc/taxis/")
df.show(5)

+-------------------+--------------------+--------------------+----------------------+--------------------+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+------------------+-------------------+--------------------+-------------+
|_hoodie_commit_time|_hoodie_commit_seqno|  _hoodie_record_key|_hoodie_partition_path|   _hoodie_file_name|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|Airport_fee|cbd_congestion_fee|                 ts|                uuid|partitionpath|
+-------------------+--------------------+--------------------+----------------------+------------

## Add and Populate New Column in Hudi Table

A new column `fare_bucket` is added to the `nyc.taxis_hudi` table to categorize trips by `total_amount`.

Steps:
1. `ADD COLUMNS` adds the new column.
2. `REFRESH TABLE` ensures Spark sees the updated schema.
3. `UPDATE` fills `fare_bucket` for every row with a non-null `total_amount`:
   - `'low'` for totals under 10  
   - `'mid'` for totals from 10 up to (but not including) 30  
   - `'high'` for totals of 30 and above

Finally, the table schema and the distribution of `fare_bucket` values are checked.
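
The bucket boundaries are easy to get wrong by one, so here is a plain-Python mirror of the `CASE` expression used in the `UPDATE`, for checking edge values. The function is illustrative only and is not part of the notebook's pipeline.

```python
def fare_bucket(total_amount):
    """Mirror of the CASE expression; None stands in for SQL NULL."""
    if total_amount is None:
        return None  # the UPDATE's WHERE clause leaves these rows untouched
    if total_amount < 10:
        return "low"
    if total_amount < 30:
        return "mid"
    return "high"


for amount in (9.99, 10, 29.99, 30, -5.0, None):
    print(amount, fare_bucket(amount))
```

Note that negative totals also land in `'low'`, because the first branch only checks `< 10`.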


In [11]:
spark.sql("ALTER TABLE nyc.taxis_hudi ADD COLUMNS (fare_bucket STRING)")

spark.sql("REFRESH TABLE nyc.taxis_hudi")

spark.sql("""
UPDATE nyc.taxis_hudi
SET fare_bucket = CASE
  WHEN total_amount < 10 THEN 'low'
  WHEN total_amount < 30 THEN 'mid'
  ELSE 'high'
END
WHERE total_amount IS NOT NULL
""")

spark.sql("DESCRIBE TABLE nyc.taxis_hudi").show(truncate=False)
spark.sql("SELECT fare_bucket, COUNT(*) FROM nyc.taxis_hudi GROUP BY fare_bucket").show()


+----------------------+---------+-------+
|col_name              |data_type|comment|
+----------------------+---------+-------+
|_hoodie_commit_time   |string   |NULL   |
|_hoodie_commit_seqno  |string   |NULL   |
|_hoodie_record_key    |string   |NULL   |
|_hoodie_partition_path|string   |NULL   |
|_hoodie_file_name     |string   |NULL   |
|VendorID              |int      |NULL   |
|tpep_pickup_datetime  |timestamp|NULL   |
|tpep_dropoff_datetime |timestamp|NULL   |
|passenger_count       |bigint   |NULL   |
|trip_distance         |double   |NULL   |
|RatecodeID            |bigint   |NULL   |
|store_and_fwd_flag    |string   |NULL   |
|PULocationID          |int      |NULL   |
|DOLocationID          |int      |NULL   |
|payment_type          |bigint   |NULL   |
|fare_amount           |double   |NULL   |
|extra                 |double   |NULL   |
|mta_tax               |double   |NULL   |
|tip_amount            |double   |NULL   |
|tolls_amount          |double   |NULL   |
+----------

### Attempt to Drop a Column from a Hudi Table

I tried to remove the `fare_bucket` column from the Hudi table using:

```python
spark.sql("ALTER TABLE nyc.taxis_hudi DROP COLUMNS (fare_bucket);")
```

However, this failed with:

AnalysisException: [UNSUPPORTED_FEATURE.TABLE_OPERATION] 
The feature is not supported: Table `spark_catalog`.`nyc`.`taxis_hudi` does not support DROP COLUMN. 
Please check the current catalog and namespace to make sure the qualified table name is expected, 
and also check the catalog implementation which is configured by "spark.sql.catalog".

## Adding a New Column to a Hudi Table on the Fly (Writer-Side Schema Evolution)

Hudi supports **schema evolution** when writing data: you can add new columns directly in your writer DataFrame, and Hudi updates the table schema automatically.

The following PySpark example adds a new nullable column `tip_rate_pct` (the tip as a percentage of `total_amount`), generates a UUID record key, sets the fields Hudi requires (record key, precombine, and partition path), and includes a `fare_bucket` column set to `NULL` so the DataFrame matches the existing table schema.

**Important:** This approach only works at write time (writer-side schema evolution).
To add a column using SQL, use `ALTER TABLE ... ADD COLUMNS (...)`.
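
The `tip_rate_pct` rule can be checked in plain Python before running the Spark job. This sketch mirrors the `when`/`otherwise` chain in the cell that follows; it assumes a NULL `tip_amount` yields NULL (as SQL arithmetic does) and uses half-up rounding to match the mode documented for Spark's `round`. The function itself is hypothetical.

```python
from decimal import Decimal, ROUND_HALF_UP


def tip_rate_pct(tip_amount, total_amount):
    """Tip as a percentage of the total, or None when it is undefined."""
    if total_amount is None or total_amount <= 0:
        return None  # matches the two .when(...) branches
    if tip_amount is None:
        return None  # NULL propagates through SQL arithmetic
    pct = tip_amount / total_amount * 100.0
    # Round half up to two decimals
    rounded = Decimal(repr(pct)).quantize(Decimal("0.01"), rounding=ROUND_HALF_UP)
    return float(rounded)


print(tip_rate_pct(2.0, 10.0))
print(tip_rate_pct(1.0, 3.0))
print(tip_rate_pct(5.0, 0.0))
```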

In [16]:
import uuid
from pyspark.sql.functions import col, when, lit, udf, round as sround
from pyspark.sql.types import StringType

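# UDF to generate UUID strings; marked nondeterministic so Spark does not
# assume repeated evaluations return the same value
gen_uuid = udf(lambda: str(uuid.uuid4()), StringType()).asNondeterministic()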

# Load original data
df = spark.read.parquet("s3a://my-bucket/raw-files/yellow_tripdata_2025-06.parquet")

# Add a NEW nullable column: tip_rate_pct (percent), plus Hudi-required fields
df_with_new = (
    df.withColumn(
        "tip_rate_pct",
        when(col("total_amount").isNull(), lit(None).cast("double"))   # ensure nullable in writer schema
        .when(col("total_amount") <= 0, lit(None).cast("double"))
        .otherwise(sround(col("tip_amount") / col("total_amount") * 100.0, 2))
    )
    .withColumn("ts", col("tpep_pickup_datetime"))
    .withColumn("uuid", gen_uuid())
    .withColumn("partitionpath", col("PULocationID").cast("string"))
)

# Cast NTZ -> timestamp for Hudi/Avro compatibility
df_fixed = (
    df_with_new
      .withColumn("ts", col("ts").cast("timestamp"))
      .withColumn("tpep_pickup_datetime", col("tpep_pickup_datetime").cast("timestamp"))
      .withColumn("tpep_dropoff_datetime", col("tpep_dropoff_datetime").cast("timestamp"))
      .withColumn("fare_bucket", lit(None).cast("string"))  # match table schema
)

# Write with schema evolution on (adds tip_rate_pct automatically)
(df_fixed.write.format("hudi")
  .option("hoodie.datasource.write.operation", "upsert")
  .option("hoodie.datasource.write.recordkey.field", "uuid")
  .option("hoodie.datasource.write.precombine.field", "ts")
  .option("hoodie.datasource.write.partitionpath.field", "partitionpath")
  .option("hoodie.schema.on.write.enable", "true")
  .option("hoodie.datasource.write.reconcile.schema", "true")
  .option("hoodie.write.set.null.for.missing.columns", "true")
  .option("hoodie.datasource.write.hive_style_partitioning", "true")
  .mode("append")
  .save("s3a://my-bucket/hudi-lakehouse/nyc/taxis"))
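
If the same write options are reused across cells, they can be collected in one dictionary. A minimal sketch: the option keys are copied verbatim from the write cell above and are not validated here, and the `hudi_write_options` helper is an assumption made for this example.

```python
def hudi_write_options(record_key, precombine, partition_field, operation="upsert"):
    """Return the Hudi writer options used above as a plain dict."""
    return {
        "hoodie.datasource.write.operation": operation,
        "hoodie.datasource.write.recordkey.field": record_key,
        "hoodie.datasource.write.precombine.field": precombine,
        "hoodie.datasource.write.partitionpath.field": partition_field,
        # Schema-evolution and partitioning flags, copied from the write cell
        "hoodie.schema.on.write.enable": "true",
        "hoodie.datasource.write.reconcile.schema": "true",
        "hoodie.write.set.null.for.missing.columns": "true",
        "hoodie.datasource.write.hive_style_partitioning": "true",
    }


opts = hudi_write_options("uuid", "ts", "partitionpath")
print(opts)
```

The dictionary can then be passed in one call, for example `df_fixed.write.format("hudi").options(**opts).mode("append").save(...)`, instead of chaining individual `.option(...)` calls.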


## Inspecting a Hudi Table via Snapshot Read

To explore the current schema and sample data in a Hudi table, you can perform a **snapshot read** by loading the table path directly into a DataFrame.  
Once loaded, you register it as a temporary view and then use standard SQL commands to:

1. **Describe the table**: view all columns, their data types, and comments.
2. **Select specific columns**: retrieve `fare_bucket`, `tip_rate_pct`, and `partitionpath` to confirm that recently added fields are present and populated (or `NULL` for older records).


In [17]:
df_path = spark.read.format("hudi").load("s3a://my-bucket/hudi-lakehouse/nyc/taxis")
df_path.createOrReplaceTempView("hudi_taxis_snapshot")

spark.sql("DESCRIBE TABLE hudi_taxis_snapshot").show(truncate=False)
spark.sql("SELECT fare_bucket, tip_rate_pct, partitionpath FROM hudi_taxis_snapshot LIMIT 10").show()

+----------------------+---------+-------+
|col_name              |data_type|comment|
+----------------------+---------+-------+
|_hoodie_commit_time   |string   |NULL   |
|_hoodie_commit_seqno  |string   |NULL   |
|_hoodie_record_key    |string   |NULL   |
|_hoodie_partition_path|string   |NULL   |
|_hoodie_file_name     |string   |NULL   |
|VendorID              |int      |NULL   |
|tpep_pickup_datetime  |timestamp|NULL   |
|tpep_dropoff_datetime |timestamp|NULL   |
|passenger_count       |bigint   |NULL   |
|trip_distance         |double   |NULL   |
|RatecodeID            |bigint   |NULL   |
|store_and_fwd_flag    |string   |NULL   |
|PULocationID          |int      |NULL   |
|DOLocationID          |int      |NULL   |
|payment_type          |bigint   |NULL   |
|fare_amount           |double   |NULL   |
|extra                 |double   |NULL   |
|mta_tax               |double   |NULL   |
|tip_amount            |double   |NULL   |
|tolls_amount          |double   |NULL   |
+----------

## Partition Evolution in Hudi

Apache Hudi **does not support partition evolution**.

Once a table is created with a specific partition column (e.g. `partitionpath`), it cannot be changed or updated later.  
Changing the partitioning requires creating a new table with the desired partition spec and rewriting the data.
