In [1]:
#****************************************************************************
# (C) Cloudera, Inc. 2020-2023
#  All rights reserved.
#
#  Applicable Open Source License: GNU Affero General Public License v3.0
#
#  NOTE: Cloudera open source products are modular software products
#  made up of hundreds of individual components, each of which was
#  individually copyrighted.  Each Cloudera open source product is a
#  collective work under U.S. Copyright Law. Your license to use the
#  collective work is as provided in your written agreement with
#  Cloudera.  Used apart from the collective work, this file is
#  licensed for your use pursuant to the open source license
#  identified above.
#
#  This code is provided to you pursuant a written agreement with
#  (i) Cloudera, Inc. or (ii) a third-party authorized to distribute
#  this code. If you do not have a written agreement with Cloudera nor
#  with an authorized and properly licensed third party, you do not
#  have any rights to access nor to use this code.
#
#  Absent a written agreement with Cloudera, Inc. (“Cloudera”) to the
#  contrary, A) CLOUDERA PROVIDES THIS CODE TO YOU WITHOUT WARRANTIES OF ANY
#  KIND; (B) CLOUDERA DISCLAIMS ANY AND ALL EXPRESS AND IMPLIED
#  WARRANTIES WITH RESPECT TO THIS CODE, INCLUDING BUT NOT LIMITED TO
#  IMPLIED WARRANTIES OF TITLE, NON-INFRINGEMENT, MERCHANTABILITY AND
#  FITNESS FOR A PARTICULAR PURPOSE; (C) CLOUDERA IS NOT LIABLE TO YOU,
#  AND WILL NOT DEFEND, INDEMNIFY, NOR HOLD YOU HARMLESS FOR ANY CLAIMS
#  ARISING FROM OR RELATED TO THE CODE; AND (D)WITH RESPECT TO YOUR EXERCISE
#  OF ANY RIGHTS GRANTED TO YOU FOR THE CODE, CLOUDERA IS NOT LIABLE FOR ANY
#  DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, PUNITIVE OR
#  CONSEQUENTIAL DAMAGES INCLUDING, BUT NOT LIMITED TO, DAMAGES
#  RELATED TO LOST REVENUE, LOST PROFITS, LOSS OF INCOME, LOSS OF
#  BUSINESS ADVANTAGE OR UNAVAILABILITY, OR LOSS OR CORRUPTION OF
#  DATA.
#
# #  Author(s): Paul de Fusco
#***************************************************************************/

![title](img/spark-connect-slide.png)

In [2]:
from os.path import exists
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.types import *

from cde import CDESparkConnectSession
# Build the Spark Connect session through the CDE builder, using the
# session name 'spark-conn-paul'; every later cell goes through `spark`.
sessionBuilder = CDESparkConnectSession.builder.sessionName('spark-conn-paul')
spark = sessionBuilder.get()

In [3]:
# Notebook configuration: the user suffix applied to database/table names and
# input paths, and the cloud storage root the input files are read from.
username = "user001"
storageLocation = "s3a://go01-demo/data/cde-123-hol"

In [4]:
### LOAD HISTORICAL TRANSACTIONS FILE FROM CLOUD STORAGE
# The JSON is read without an explicit schema, so column types are inferred.
rawTransactionsPath = "{0}/trans/{1}/rawtransactions".format(storageLocation, username)
transactionsDf = spark.read.json(rawTransactionsPath)

# Show the inferred schema for reference.
transactionsDf.printSchema()

root
 |-- credit_card_number: string (nullable = true)
 |-- credit_card_provider: string (nullable = true)
 |-- event_ts: string (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)
 |-- transaction_amount: long (nullable = true)
 |-- transaction_currency: string (nullable = true)
 |-- transaction_type: string (nullable = true)



In [5]:
# Make sure the per-user database exists before any table is created in it.
createDatabaseSql = """CREATE DATABASE 
                IF NOT EXISTS 
                SPARK_CATALOG.HOL_DB_{}""".format(username)
spark.sql(createDatabaseSql)

# Drop (and purge) the per-user transactions table if it already exists,
# so the notebook can be re-run from the top.
dropTableSql = """DROP TABLE 
                IF EXISTS 
                spark_catalog.HOL_DB_{0}.TRANSACTIONS_{0} 
                PURGE""".format(username)
spark.sql(dropTableSql)

DataFrame[]

In [6]:
# Write the historical transactions as an Iceberg table with Parquet as the
# default write format, creating the table or replacing an existing one.
transactionsTable = "SPARK_CATALOG.HOL_DB_{0}.TRANSACTIONS_{0}".format(username)
icebergWriter = transactionsDf.writeTo(transactionsTable).using("iceberg")
icebergWriter.tableProperty("write.format.default", "parquet").createOrReplace()

In [7]:
# Read the first transaction batch with an explicit DDL schema
# (event_ts is declared as a timestamp here instead of being inferred).
trxBatchSchema = """credit_card_number string, 
                credit_card_provider string, 
                event_ts timestamp, 
                latitude double, 
                longitude double, 
                transaction_amount long, 
                transaction_currency string, 
                transaction_type string"""
trxBatchPath = "{0}/trans/{1}/trx_batch_1".format(storageLocation, username)
trxBatchDf = spark.read.schema(trxBatchSchema).json(trxBatchPath)

# Register the batch as a temp view so SQL statements can refer to it as trx_batch.
trxBatchDf.createOrReplaceTempView("trx_batch")

In [8]:
# PRE-MERGE COUNTS BY TRANSACTION TYPE:
# Row count per transaction type in the Iceberg table, taken before the MERGE.
preMergeCountsSql = """SELECT TRANSACTION_TYPE, 
                COUNT(*) 
                FROM spark_catalog.HOL_DB_{0}.TRANSACTIONS_{0} 
                GROUP BY TRANSACTION_TYPE""".format(username)
spark.sql(preMergeCountsSql).show()

+----------------+--------+
|TRANSACTION_TYPE|count(1)|
+----------------+--------+
|        purchase|    1789|
|    cash_advance|     211|
+----------------+--------+



In [9]:
# MERGE OPERATION
# Merge the trx_batch temp view into the Iceberg table, matching on credit card number:
#   - matched rows whose existing amount is below 1000 and whose currency is not CHF
#     get their transaction_type set to 'invalid';
#   - batch rows with no match in the table are inserted unchanged.
# String literals use single quotes (standard SQL) so they remain literals even
# when spark.sql.ansi.doubleQuotedIdentifiers is enabled, in which case
# double-quoted text is parsed as an identifier.
spark.sql("""MERGE INTO spark_catalog.HOL_DB_{0}.TRANSACTIONS_{0} t
                USING (SELECT * FROM trx_batch) s
                ON t.credit_card_number = s.credit_card_number
                WHEN MATCHED AND t.transaction_amount < 1000 AND t.transaction_currency != 'CHF' THEN UPDATE SET t.transaction_type = 'invalid'
                WHEN NOT MATCHED THEN INSERT *""".format(username))

DataFrame[]

In [10]:
# POST-MERGE COUNT:
# Same per-type count as before the MERGE, to show what the merge changed.
postMergeCountsSql = """SELECT TRANSACTION_TYPE, 
                    COUNT(*) 
                    FROM spark_catalog.HOL_DB_{0}.TRANSACTIONS_{0} 
                    GROUP BY TRANSACTION_TYPE""".format(username)
spark.sql(postMergeCountsSql).show()

+----------------+--------+
|TRANSACTION_TYPE|count(1)|
+----------------+--------+
|        purchase|    1787|
|    cash_advance|     210|
|         invalid|       3|
+----------------+--------+



In [11]:
# ICEBERG TABLE HISTORY (SHOWS EACH SNAPSHOT AND TIMESTAMP)
# Queries the table's `history` metadata table.
historySql = """SELECT * 
            FROM spark_catalog.HOL_DB_{0}.TRANSACTIONS_{0}.history""".format(username)
spark.sql(historySql).show()

+--------------------+-------------------+-------------------+-------------------+
|     made_current_at|        snapshot_id|          parent_id|is_current_ancestor|
+--------------------+-------------------+-------------------+-------------------+
|2025-01-15 00:17:...|1803119392281374845|               NULL|               true|
|2025-01-15 00:17:...|1961025733431406081|1803119392281374845|               true|
+--------------------+-------------------+-------------------+-------------------+



In [12]:
# ICEBERG TABLE SNAPSHOTS (USEFUL FOR INCREMENTAL QUERIES AND TIME TRAVEL)
# Queries the table's `snapshots` metadata table.
snapshotsSql = """SELECT * 
            FROM spark_catalog.HOL_DB_{0}.TRANSACTIONS_{0}.snapshots""".format(username)
spark.sql(snapshotsSql).show()

+--------------------+-------------------+-------------------+---------+--------------------+--------------------+
|        committed_at|        snapshot_id|          parent_id|operation|       manifest_list|             summary|
+--------------------+-------------------+-------------------+---------+--------------------+--------------------+
|2025-01-15 00:17:...|1803119392281374845|               NULL|   append|s3a://go01-demo/w...|{spark.app.id -> ...|
|2025-01-15 00:17:...|1961025733431406081|1803119392281374845|overwrite|s3a://go01-demo/w...|{spark.app.id -> ...|
+--------------------+-------------------+-------------------+---------+--------------------+--------------------+



In [13]:
# APPEND SECOND DATA BATCH
# Read trx_batch_2 with an explicit DDL schema, then append it to the Iceberg table.
trxBatchSchema = """credit_card_number string, credit_card_provider string, event_ts timestamp, latitude double, longitude double, transaction_amount long, transaction_currency string, transaction_type string"""
trxBatchPath = "{0}/trans/{1}/trx_batch_2".format(storageLocation, username)
trxBatchDf = spark.read.schema(trxBatchSchema).json(trxBatchPath)

appendTargetTable = "spark_catalog.HOL_DB_{0}.TRANSACTIONS_{0}".format(username)
trxBatchDf.writeTo(appendTargetTable).using("iceberg").append()

In [14]:
# LOAD THE SNAPSHOTS METADATA TABLE, OLDEST COMMIT FIRST
# The snapshot ids used for the incremental read are picked from this DataFrame
# by row position, so the rows are sorted by commit time explicitly instead of
# relying on whatever order the metadata table happens to return.
snapshots_df = spark.sql("""SELECT * 
                            FROM spark_catalog.HOL_DB_{0}.TRANSACTIONS_{0}.snapshots
                            ORDER BY committed_at""".format(username))

In [15]:
# Collect the snapshot ids once, so both values below come from the same
# result set instead of from two separate Spark actions (tail + collect).
# Row order is whatever snapshots_df yields.
snapshot_ids = [row[0] for row in snapshots_df.select("snapshot_id").collect()]

# The incremental read needs two distinct snapshots; fail with a clear message
# rather than a bare IndexError when the table has fewer.
if len(snapshot_ids) < 2:
    raise ValueError(
        "Expected at least 2 snapshots, found {0}".format(len(snapshot_ids))
    )

# Last row of the snapshots listing.
last_snapshot = snapshot_ids[-1]

# Second row of the snapshots listing (index 1).
second_snapshot = snapshot_ids[1]

In [16]:
# Incremental read: load only the data between two snapshots of the table.
# Per the Iceberg Spark documentation, start-snapshot-id is exclusive and
# end-snapshot-id is inclusive.
incrementalSourceTable = "spark_catalog.HOL_DB_{0}.TRANSACTIONS_{0}".format(username)
incReadDf = (
    spark.read
    .format("iceberg")
    .option("start-snapshot-id", second_snapshot)
    .option("end-snapshot-id", last_snapshot)
    .load(incrementalSourceTable)
)

In [17]:
# Print a heading, then show the rows returned by the incremental read.
reportHeading = "Incremental Report:"
print(reportHeading)
incReadDf.show()

Incremental Report:
+------------------+--------------------+-------------------+--------+---------+------------------+--------------------+----------------+
|credit_card_number|credit_card_provider|           event_ts|latitude|longitude|transaction_amount|transaction_currency|transaction_type|
+------------------+--------------------+-------------------+--------+---------+------------------+--------------------+----------------+
|  3674567891192000|       VISA 13 digit|2023-07-05 13:33:00| 36.3963|    -77.0|             26336|                 GBP|    cash_advance|
|  3674567891192000|       VISA 16 digit|2023-03-10 09:19:00| 38.3963|   -122.0|             11223|                 MEX|    cash_advance|
|  3674567891192001|       VISA 19 digit|2023-06-18 04:01:00| 38.3963|    -92.0|             16346|                 CHF|        purchase|
|  3674567891192002|Diners Club / Car...|2023-07-01 07:22:00| 28.3963|   -116.0|             18287|                 EUR|        purchase|
|  36745678911