In [2]:
import sys
sys.path.append('/home/iceberg/notebooks/PyCon_LT_Workshop')

from helpers.utils import get_spark_session, get_yellow_taxi_data, get_dim_data
spark = get_spark_session("getting_started")

from minio import Minio
client = Minio(
        "minio:9000",
        access_key="admin",
        secret_key="password",
        secure=False  # Set to True if your Minio server uses TLS/SSL
    )

def print_objects_in_warehouse(bucket_name):
    objects = client.list_objects(bucket_name, recursive=True)
    for obj in objects:
        print(obj.object_name, str(obj._size//1024) + "KB")
print_objects_in_warehouse("warehouse")

staging/aggregated_taxi_per_vendor_date/data/00000-11-146c816e-61d7-46da-a5d1-03a145d3fa47-0-00001.parquet 2KB
staging/aggregated_taxi_per_vendor_date/data/00000-6-b17d2ddd-2a29-4028-a76f-7e2c7da36fc5-0-00001.parquet 2KB
staging/aggregated_taxi_per_vendor_date/metadata/00000-b129cac9-ca9f-4f77-ba35-210754c0eddb.metadata.json 2KB
staging/aggregated_taxi_per_vendor_date/metadata/00001-4b6805a8-9ab9-4874-b2df-81a97e5fd3f5.metadata.json 2KB
staging/aggregated_taxi_per_vendor_date/metadata/00002-f3247c59-464a-4456-bd4b-61c7238ee394.metadata.json 2KB
staging/aggregated_taxi_per_vendor_date/metadata/00003-e7c6e6ea-92ed-4f00-952d-d35af8047289.metadata.json 3KB
staging/aggregated_taxi_per_vendor_date/metadata/00004-1825cd1d-addf-42e4-84c6-430ee85caf14.metadata.json 3KB
staging/aggregated_taxi_per_vendor_date/metadata/1a4bcaf0-6855-4316-9a69-42b4963917ef-m0.avro 6KB
staging/aggregated_taxi_per_vendor_date/metadata/1a4bcaf0-6855-4316-9a69-42b4963917ef-m1.avro 6KB
staging/aggregated_taxi_per_vendo

In [3]:
dim_taxi_zones, dim_rates, dim_payments, dim_vendor = get_dim_data(spark)

In [4]:
dim_taxi_zones.write.saveAsTable("staging.dim_taxi_zones")
dim_rates.write.saveAsTable("staging.dim_rates")
dim_payments.write.saveAsTable("staging.dim_payments")
dim_vendor.write.saveAsTable("staging.dim_vendor")

                                                                                

Table Types:

✅ Copy-on-Write (CoW):

- With CoW, any records that are changed in a file, results in rewriting of the file as a new version
- has a higher write amplification but less cost on read-side & hence good for read-heavy workloads
- larger the [Parquet](https://www.linkedin.com/feed/hashtag/?keywords=parquet&highlightedUpdateUrns=urn%3Ali%3Aactivity%3A7176027113655844864) file, more time it takes to ingest data

✅ Merge-on-Read (MoR):

- With MoR, any changes to existing records, gets written to a separate log file
- A compaction process (configured as inline or asynchronous) will compact the log file to the base file later as a new version
- enables ingesting data quickly onto row-based data format such as Avro (log file)

In [12]:
from pyspark.sql.functions import when, col
from pyspark.sql import Row

new_row = spark.createDataFrame([Row(rate_code_id=99, rate_name="Unknown")])


updated_dim_rates = dim_rates.withColumn("rate_name", 
                                         when(col("rate_code_id") == 4, "New Rate Name")
                                         .otherwise(col("rate_name"))).filter("rate_code_id != 1").union(new_row)

In [13]:
updated_dim_rates.show()

+------------+---------------+
|rate_code_id|      rate_name|
+------------+---------------+
|           2|            JFK|
|           3|         Newark|
|           4|  New Rate Name|
|           5|Negotiated fare|
|           6|     Group ride|
|          99|        Unknown|
+------------+---------------+



In [14]:
updated_dim_rates.write.saveAsTable("staging.updated_dim_rates")

In [20]:
%%sql
MERGE INTO staging.dim_rates t
USING (SELECT rate_code_id, rate_name from staging.updated_dim_rates) s          
ON t.rate_code_id = s.rate_code_id
WHEN MATCHED and s.rate_name!=t.rate_name THEN UPDATE SET t.rate_name = s.rate_name
WHEN NOT MATCHED BY TARGET THEN INSERT *
WHEN NOT MATCHED BY SOURCE THEN DELETE

In [21]:
%%sql
select * from staging.dim_rates

rate_code_id,rate_name
2,JFK
3,Newark
4,New Rate Name
5,Negotiated fare
6,Group ride
99,Unknown


In [22]:
print_objects_in_warehouse("warehouse")

staging/aggregated_taxi_per_vendor_date/data/00000-11-146c816e-61d7-46da-a5d1-03a145d3fa47-0-00001.parquet 2KB
staging/aggregated_taxi_per_vendor_date/data/00000-6-b17d2ddd-2a29-4028-a76f-7e2c7da36fc5-0-00001.parquet 2KB
staging/aggregated_taxi_per_vendor_date/metadata/00000-b129cac9-ca9f-4f77-ba35-210754c0eddb.metadata.json 2KB
staging/aggregated_taxi_per_vendor_date/metadata/00001-4b6805a8-9ab9-4874-b2df-81a97e5fd3f5.metadata.json 2KB
staging/aggregated_taxi_per_vendor_date/metadata/00002-f3247c59-464a-4456-bd4b-61c7238ee394.metadata.json 2KB
staging/aggregated_taxi_per_vendor_date/metadata/00003-e7c6e6ea-92ed-4f00-952d-d35af8047289.metadata.json 3KB
staging/aggregated_taxi_per_vendor_date/metadata/00004-1825cd1d-addf-42e4-84c6-430ee85caf14.metadata.json 3KB
staging/aggregated_taxi_per_vendor_date/metadata/1a4bcaf0-6855-4316-9a69-42b4963917ef-m0.avro 6KB
staging/aggregated_taxi_per_vendor_date/metadata/1a4bcaf0-6855-4316-9a69-42b4963917ef-m1.avro 6KB
staging/aggregated_taxi_per_vendo

In [23]:
%%sql

select * from staging.dim_rates.snapshots

committed_at,snapshot_id,parent_id,operation,manifest_list,summary
2024-03-29 16:29:17.832000,583487472890318615,,append,s3://warehouse/staging/dim_rates/metadata/snap-583487472890318615-1-046a43a1-bc96-46d9-a58e-0c901b028409.avro,"{'spark.app.id': 'local-1711729589378', 'changed-partition-count': '1', 'added-data-files': '1', 'total-equality-deletes': '0', 'added-records': '6', 'total-position-deletes': '0', 'added-files-size': '793', 'total-delete-files': '0', 'total-files-size': '793', 'total-records': '6', 'total-data-files': '1'}"
2024-03-29 16:41:02.850000,8545696951471959109,5.834874728903186e+17,overwrite,s3://warehouse/staging/dim_rates/metadata/snap-8545696951471959109-1-e18d0e0d-f825-4a8d-a896-64aa981a5c73.avro,"{'added-data-files': '1', 'total-equality-deletes': '0', 'added-records': '7', 'deleted-data-files': '1', 'deleted-records': '6', 'total-records': '7', 'spark.app.id': 'local-1711729589378', 'removed-files-size': '793', 'changed-partition-count': '1', 'total-position-deletes': '0', 'added-files-size': '791', 'total-delete-files': '0', 'total-files-size': '791', 'total-data-files': '1'}"
2024-03-29 16:41:35.888000,2261648511033844294,8.545696951471958e+18,overwrite,s3://warehouse/staging/dim_rates/metadata/snap-2261648511033844294-1-c5dcd486-ec45-41bf-b113-86c4ccc81e63.avro,"{'added-data-files': '1', 'total-equality-deletes': '0', 'added-records': '6', 'deleted-data-files': '1', 'deleted-records': '7', 'total-records': '6', 'spark.app.id': 'local-1711729589378', 'removed-files-size': '791', 'changed-partition-count': '1', 'total-position-deletes': '0', 'added-files-size': '775', 'total-delete-files': '0', 'total-files-size': '775', 'total-data-files': '1'}"


In [37]:
%%sql
CREATE TABLE staging.dim_rates_mor (
    rate_code_id string,
    rate_name string
) TBLPROPERTIES (
    'write.delete.mode'='merge-on-read',
    'write.update.mode'='merge-on-read',
    'write.merge.mode'='merge-on-read'
)
/*
Also an option:
ALTER TABLE catalog.db.students SET TBLPROPERTIES (
    'write.delete.mode'='merge-on-read',
    'write.update.mode'='copy-on-write',
    'write.merge.mode'='copy-on-write'
);
*/

AnalysisException: [TABLE_OR_VIEW_ALREADY_EXISTS] Cannot create table or view `staging`.`dim_rates_mor` because it already exists.
Choose a different name, drop or replace the existing object, or add the IF NOT EXISTS clause to tolerate pre-existing objects.

In [36]:
%%sql
insert into staging.dim_rates_mor (rate_code_id, rate_name) select rate_code_id, rate_name from staging.dim_rates

In [None]:
print_objects_in_warehouse("warehouse")

In [38]:
%%sql
MERGE INTO staging.dim_rates_mor t
USING (SELECT rate_code_id, rate_name from staging.updated_dim_rates) s          
ON t.rate_code_id = s.rate_code_id
WHEN MATCHED and s.rate_name!=t.rate_name THEN UPDATE SET t.rate_name = s.rate_name
WHEN NOT MATCHED BY TARGET THEN INSERT *
WHEN NOT MATCHED BY SOURCE THEN DELETE

In [39]:
%%sql
select * from staging.dim_rates_mor

rate_code_id,rate_name
2,JFK
3,Newark
4,New Rate Name
5,Negotiated fare
6,Group ride
99,Unknown


In [40]:
print_objects_in_warehouse("warehouse")

staging/aggregated_taxi_per_vendor_date/data/00000-11-146c816e-61d7-46da-a5d1-03a145d3fa47-0-00001.parquet 2KB
staging/aggregated_taxi_per_vendor_date/data/00000-6-b17d2ddd-2a29-4028-a76f-7e2c7da36fc5-0-00001.parquet 2KB
staging/aggregated_taxi_per_vendor_date/metadata/00000-b129cac9-ca9f-4f77-ba35-210754c0eddb.metadata.json 2KB
staging/aggregated_taxi_per_vendor_date/metadata/00001-4b6805a8-9ab9-4874-b2df-81a97e5fd3f5.metadata.json 2KB
staging/aggregated_taxi_per_vendor_date/metadata/00002-f3247c59-464a-4456-bd4b-61c7238ee394.metadata.json 2KB
staging/aggregated_taxi_per_vendor_date/metadata/00003-e7c6e6ea-92ed-4f00-952d-d35af8047289.metadata.json 3KB
staging/aggregated_taxi_per_vendor_date/metadata/00004-1825cd1d-addf-42e4-84c6-430ee85caf14.metadata.json 3KB
staging/aggregated_taxi_per_vendor_date/metadata/1a4bcaf0-6855-4316-9a69-42b4963917ef-m0.avro 6KB
staging/aggregated_taxi_per_vendor_date/metadata/1a4bcaf0-6855-4316-9a69-42b4963917ef-m1.avro 6KB
staging/aggregated_taxi_per_vendo

In [43]:
%%sql
CREATE TABLE staging.dim_rates_cow (
    rate_code_id string,
    rate_name string
) TBLPROPERTIES (
    'write.delete.mode'='copy-on-write',
    'write.update.mode'='copy-on-write',
    'write.merge.mode'='copy-on-write'
)
/*
Also an option:
ALTER TABLE catalog.db.students SET TBLPROPERTIES (
    'write.delete.mode'='merge-on-read',
    'write.update.mode'='copy-on-write',
    'write.merge.mode'='copy-on-write'
);
*/


In [44]:
%sql insert into staging.dim_rates_cow (rate_code_id, rate_name) select rate_code_id, rate_name from staging.dim_rates

In [45]:
print_objects_in_warehouse("warehouse")

staging/aggregated_taxi_per_vendor_date/data/00000-11-146c816e-61d7-46da-a5d1-03a145d3fa47-0-00001.parquet 2KB
staging/aggregated_taxi_per_vendor_date/data/00000-6-b17d2ddd-2a29-4028-a76f-7e2c7da36fc5-0-00001.parquet 2KB
staging/aggregated_taxi_per_vendor_date/metadata/00000-b129cac9-ca9f-4f77-ba35-210754c0eddb.metadata.json 2KB
staging/aggregated_taxi_per_vendor_date/metadata/00001-4b6805a8-9ab9-4874-b2df-81a97e5fd3f5.metadata.json 2KB
staging/aggregated_taxi_per_vendor_date/metadata/00002-f3247c59-464a-4456-bd4b-61c7238ee394.metadata.json 2KB
staging/aggregated_taxi_per_vendor_date/metadata/00003-e7c6e6ea-92ed-4f00-952d-d35af8047289.metadata.json 3KB
staging/aggregated_taxi_per_vendor_date/metadata/00004-1825cd1d-addf-42e4-84c6-430ee85caf14.metadata.json 3KB
staging/aggregated_taxi_per_vendor_date/metadata/1a4bcaf0-6855-4316-9a69-42b4963917ef-m0.avro 6KB
staging/aggregated_taxi_per_vendor_date/metadata/1a4bcaf0-6855-4316-9a69-42b4963917ef-m1.avro 6KB
staging/aggregated_taxi_per_vendo

In [46]:
%%sql
MERGE INTO staging.dim_rates_cow t
USING (SELECT rate_code_id, rate_name from staging.updated_dim_rates) s          
ON t.rate_code_id = s.rate_code_id
WHEN MATCHED and s.rate_name!=t.rate_name THEN UPDATE SET t.rate_name = s.rate_name
WHEN NOT MATCHED BY TARGET THEN INSERT *
WHEN NOT MATCHED BY SOURCE THEN DELETE

In [47]:
print_objects_in_warehouse("warehouse")

staging/aggregated_taxi_per_vendor_date/data/00000-11-146c816e-61d7-46da-a5d1-03a145d3fa47-0-00001.parquet 2KB
staging/aggregated_taxi_per_vendor_date/data/00000-6-b17d2ddd-2a29-4028-a76f-7e2c7da36fc5-0-00001.parquet 2KB
staging/aggregated_taxi_per_vendor_date/metadata/00000-b129cac9-ca9f-4f77-ba35-210754c0eddb.metadata.json 2KB
staging/aggregated_taxi_per_vendor_date/metadata/00001-4b6805a8-9ab9-4874-b2df-81a97e5fd3f5.metadata.json 2KB
staging/aggregated_taxi_per_vendor_date/metadata/00002-f3247c59-464a-4456-bd4b-61c7238ee394.metadata.json 2KB
staging/aggregated_taxi_per_vendor_date/metadata/00003-e7c6e6ea-92ed-4f00-952d-d35af8047289.metadata.json 3KB
staging/aggregated_taxi_per_vendor_date/metadata/00004-1825cd1d-addf-42e4-84c6-430ee85caf14.metadata.json 3KB
staging/aggregated_taxi_per_vendor_date/metadata/1a4bcaf0-6855-4316-9a69-42b4963917ef-m0.avro 6KB
staging/aggregated_taxi_per_vendor_date/metadata/1a4bcaf0-6855-4316-9a69-42b4963917ef-m1.avro 6KB
staging/aggregated_taxi_per_vendo