In [1]:
#****************************************************************************
# (C) Cloudera, Inc. 2020-2023
#  All rights reserved.
#
#  Applicable Open Source License: GNU Affero General Public License v3.0
#
#  NOTE: Cloudera open source products are modular software products
#  made up of hundreds of individual components, each of which was
#  individually copyrighted.  Each Cloudera open source product is a
#  collective work under U.S. Copyright Law. Your license to use the
#  collective work is as provided in your written agreement with
#  Cloudera.  Used apart from the collective work, this file is
#  licensed for your use pursuant to the open source license
#  identified above.
#
#  This code is provided to you pursuant a written agreement with
#  (i) Cloudera, Inc. or (ii) a third-party authorized to distribute
#  this code. If you do not have a written agreement with Cloudera nor
#  with an authorized and properly licensed third party, you do not
#  have any rights to access nor to use this code.
#
#  Absent a written agreement with Cloudera, Inc. (“Cloudera”) to the
#  contrary, A) CLOUDERA PROVIDES THIS CODE TO YOU WITHOUT WARRANTIES OF ANY
#  KIND; (B) CLOUDERA DISCLAIMS ANY AND ALL EXPRESS AND IMPLIED
#  WARRANTIES WITH RESPECT TO THIS CODE, INCLUDING BUT NOT LIMITED TO
#  IMPLIED WARRANTIES OF TITLE, NON-INFRINGEMENT, MERCHANTABILITY AND
#  FITNESS FOR A PARTICULAR PURPOSE; (C) CLOUDERA IS NOT LIABLE TO YOU,
#  AND WILL NOT DEFEND, INDEMNIFY, NOR HOLD YOU HARMLESS FOR ANY CLAIMS
#  ARISING FROM OR RELATED TO THE CODE; AND (D)WITH RESPECT TO YOUR EXERCISE
#  OF ANY RIGHTS GRANTED TO YOU FOR THE CODE, CLOUDERA IS NOT LIABLE FOR ANY
#  DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, PUNITIVE OR
#  CONSEQUENTIAL DAMAGES INCLUDING, BUT NOT LIMITED TO, DAMAGES
#  RELATED TO LOST REVENUE, LOST PROFITS, LOSS OF INCOME, LOSS OF
#  BUSINESS ADVANTAGE OR UNAVAILABILITY, OR LOSS OR CORRUPTION OF
#  DATA.
#
# #  Author(s): Paul de Fusco
#***************************************************************************/

![Spark Connect slide](img/spark-connect-slide.png)

In [4]:
from os.path import exists
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.types import *

from cde import CDESparkConnectSession
# Obtain the CDE Spark Connect session named 'paul-hol-session'; `spark` is the handle used by the cells below.
sessionBuilder = CDESparkConnectSession.builder.sessionName('paul-hol-session')
spark = sessionBuilder.get()

In [5]:
# Lab participant id; it is substituted into the storage paths and into the database/table names below.
username = "user015"
# Base cloud-storage URI under which the car-sales JSON files are read.
storageLocation = "s3a://vw-hol-buk-3c3eeffe/data/cde_hol/manufacturing/20250717"

In [6]:
### LOAD HISTORICAL TRANSACTIONS FILE FROM CLOUD STORAGE
# No schema is passed to the reader here, so the column types are whatever Spark infers from the JSON.
rawSalesPath = "{0}/carsales/{1}/rawcarsales".format(storageLocation, username)
transactionsDf = spark.read.json(rawSalesPath)

# Print the resulting schema so the inferred types can be checked.
transactionsDf.printSchema()

root
 |-- bank_account_number: string (nullable = true)
 |-- car_brand: string (nullable = true)
 |-- event_ts: string (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)
 |-- transaction_amount: long (nullable = true)
 |-- transaction_currency: string (nullable = true)
 |-- transaction_type: string (nullable = true)
 |-- vehicle_model_name: string (nullable = true)
 |-- vehicle_type: string (nullable = true)



In [7]:
# Create the per-user database if it is not there yet.
createDbSql = """CREATE DATABASE 
                IF NOT EXISTS 
                SPARK_CATALOG.CAR_SALES_{}""".format(username)
spark.sql(createDbSql)

# Drop (and purge) the per-user sales table if it exists, so the table is rebuilt from scratch below.
dropTableSql = """DROP TABLE 
                IF EXISTS 
                spark_catalog.CAR_SALES_{0}.HIST_SALES_{0} 
                PURGE""".format(username)
spark.sql(dropTableSql)

DataFrame[]

In [8]:
# Write the historical transactions as an Iceberg table (Parquet as the default write format),
# creating the table or replacing it if it already exists.
histSalesTable = "SPARK_CATALOG.CAR_SALES_{0}.HIST_SALES_{0}".format(username)
(
    transactionsDf
    .writeTo(histSalesTable)
    .using("iceberg")
    .tableProperty("write.format.default", "parquet")
    .createOrReplace()
)

In [14]:
# Explicit schema for the first sales batch; note that event_ts is declared as a timestamp here.
trxBatchSchema = """bank_account_number string, 
                car_brand string, 
                event_ts timestamp, 
                latitude double, 
                longitude double, 
                transaction_amount long, 
                transaction_currency string, 
                transaction_type string,
                vehicle_model_name string,
                vehicle_type string
                """
trxBatchPath = "{0}/carsales/{1}/sales_batch_1".format(storageLocation, username)
trxBatchDf = spark.read.schema(trxBatchSchema).json(trxBatchPath)

# Register the batch as the temp view "trx_batch" so it can be referenced from SQL.
trxBatchDf.createOrReplaceTempView("trx_batch")

In [16]:
# PRE-MERGE COUNTS BY TRANSACTION TYPE
# Baseline row count per transaction type, taken before the merge modifies the table.
preMergeCountsSql = """SELECT TRANSACTION_TYPE, 
                COUNT(*) 
                FROM spark_catalog.CAR_SALES_{0}.HIST_SALES_{0} 
                GROUP BY TRANSACTION_TYPE""".format(username)
preMergeCounts = spark.sql(preMergeCountsSql)
preMergeCounts.show()

+----------------+--------+
|TRANSACTION_TYPE|count(1)|
+----------------+--------+
|        car loan|    1796|
|  whole purchase|     204|
+----------------+--------+



In [19]:
# MERGE OPERATION
# Rows of the trx_batch view are matched to the table on bank_account_number:
#   - matched table rows with transaction_amount > 80000 and currency USD get transaction_type "invalid";
#   - batch rows with no match are inserted as-is.
mergeSql = """MERGE INTO spark_catalog.CAR_SALES_{0}.HIST_SALES_{0} t
                USING (SELECT * FROM trx_batch) s
                ON t.bank_account_number = s.bank_account_number
                WHEN MATCHED AND t.transaction_amount > 80000 AND t.transaction_currency == "USD" THEN UPDATE SET t.transaction_type = "invalid"
                WHEN NOT MATCHED THEN INSERT *""".format(username)
spark.sql(mergeSql)

DataFrame[]

In [20]:
# POST-MERGE COUNT
# Same per-type count as before the merge, to show what the merge changed.
postMergeCountsSql = """SELECT TRANSACTION_TYPE, 
                    COUNT(*) 
                    FROM spark_catalog.CAR_SALES_{0}.HIST_SALES_{0} 
                    GROUP BY TRANSACTION_TYPE""".format(username)
postMergeCounts = spark.sql(postMergeCountsSql)
postMergeCounts.show()

+----------------+--------+
|TRANSACTION_TYPE|count(1)|
+----------------+--------+
|        car loan|    1741|
|  whole purchase|     198|
|         invalid|      61|
+----------------+--------+



In [21]:
# ICEBERG TABLE HISTORY
# Query the table's `.history` metadata table (made_current_at, snapshot_id, parent_id, is_current_ancestor).
historySql = """SELECT * 
            FROM spark_catalog.CAR_SALES_{0}.HIST_SALES_{0}.history""".format(username)
tableHistory = spark.sql(historySql)
tableHistory.show()

+--------------------+-------------------+-------------------+-------------------+
|     made_current_at|        snapshot_id|          parent_id|is_current_ancestor|
+--------------------+-------------------+-------------------+-------------------+
|2025-07-28 17:09:...|8743840576473463578|               NULL|               true|
|2025-07-28 17:12:...|6426686791019454457|8743840576473463578|               true|
|2025-07-28 17:12:...|7205700765140511535|6426686791019454457|               true|
+--------------------+-------------------+-------------------+-------------------+



In [22]:
# ICEBERG TABLE SNAPSHOTS
# Query the table's `.snapshots` metadata table; its snapshot ids feed the incremental read further down.
snapshotsListingSql = """SELECT * 
            FROM spark_catalog.CAR_SALES_{0}.HIST_SALES_{0}.snapshots""".format(username)
tableSnapshots = spark.sql(snapshotsListingSql)
tableSnapshots.show()

+--------------------+-------------------+-------------------+---------+--------------------+--------------------+
|        committed_at|        snapshot_id|          parent_id|operation|       manifest_list|             summary|
+--------------------+-------------------+-------------------+---------+--------------------+--------------------+
|2025-07-28 17:09:...|8743840576473463578|               NULL|   append|s3a://vw-hol-buk-...|{spark.app.id -> ...|
|2025-07-28 17:12:...|6426686791019454457|8743840576473463578|overwrite|s3a://vw-hol-buk-...|{spark.app.id -> ...|
|2025-07-28 17:12:...|7205700765140511535|6426686791019454457|overwrite|s3a://vw-hol-buk-...|{spark.app.id -> ...|
+--------------------+-------------------+-------------------+---------+--------------------+--------------------+



In [24]:
# APPEND SECOND DATA BATCH
# Read sales_batch_2 with an explicit schema; this rebinds trxBatchDf to the second batch.
secondBatchPath = "{0}/carsales/{1}/sales_batch_2".format(storageLocation, username)
trxBatchDf = (
    spark.read
    .schema("""bank_account_number string, car_brand string, event_ts timestamp, latitude double, longitude double, transaction_amount long, transaction_currency string, transaction_type string, vehicle_model_name string,
                vehicle_type string""")
    .json(secondBatchPath)
)

# Append the batch rows to the existing Iceberg table.
histSalesTable = "spark_catalog.CAR_SALES_{0}.HIST_SALES_{0}".format(username)
(
    trxBatchDf
    .writeTo(histSalesTable)
    .using("iceberg")
    .append()
)

In [25]:
# Load the table's snapshots metadata into a DataFrame; the next cell picks snapshot ids out of it.
snapshotsSql = """SELECT * 
                            FROM spark_catalog.CAR_SALES_{0}.HIST_SALES_{0}.snapshots;""".format(username)
snapshots_df = spark.sql(snapshotsSql)

In [26]:
# Pick the snapshot ids that bound the incremental read.
# The ids are collected in a single Spark action and explicitly ordered by commit time,
# so "second" and "last" do not depend on the order in which the metadata table
# happens to return its rows.
snapshot_ids = [
    row["snapshot_id"]
    for row in snapshots_df
                .orderBy("committed_at")
                .select("snapshot_id")
                .collect()
]

# Two snapshots are the minimum needed to define a start/end range.
if len(snapshot_ids) < 2:
    raise IndexError(
        "Expected at least 2 snapshots to build an incremental range, found {}".format(len(snapshot_ids))
    )

# Most recent snapshot: end of the incremental range.
last_snapshot = snapshot_ids[-1]

# Second-oldest snapshot: start of the incremental range.
second_snapshot = snapshot_ids[1]

In [27]:
# Incremental read: load only the data added between the two snapshot ids picked above
# (Iceberg documents the start id as exclusive and the end id as inclusive).
histSalesTable = "spark_catalog.CAR_SALES_{0}.HIST_SALES_{0}".format(username)
incReadDf = (
    spark.read
    .format("iceberg")
    .option("start-snapshot-id", second_snapshot)
    .option("end-snapshot-id", last_snapshot)
    .load(histSalesTable)
)

In [28]:
# Show the rows returned by the incremental read under a heading.
reportHeading = "Incremental Report:"
print(reportHeading)
incReadDf.show()

Incremental Report:
+-------------------+--------------------+-------------------+--------+---------+------------------+--------------------+----------------+------------------+------------------+
|bank_account_number|           car_brand|           event_ts|latitude|longitude|transaction_amount|transaction_currency|transaction_type|vehicle_model_name|      vehicle_type|
+-------------------+--------------------+-------------------+--------+---------+------------------+--------------------+----------------+------------------+------------------+
|   3674567891192000|     American Wagons|2023-07-05 13:33:00| 36.3963|    -77.0|            168324|                 USD|        car loan|         Crestline|     SUV/Crossover|
|   3674567891192001|             Gravari|2023-06-18 04:01:00| 38.3963|    -92.0|            109220|                 USD|        car loan|             Qavac|                EV|
|   3674567891192002|Aetherion Automotive|2023-01-12 13:03:00| 39.3963|   -109.0|            18