In [1]:
#****************************************************************************
# (C) Cloudera, Inc. 2020-2023
#  All rights reserved.
#
#  Applicable Open Source License: GNU Affero General Public License v3.0
#
#  NOTE: Cloudera open source products are modular software products
#  made up of hundreds of individual components, each of which was
#  individually copyrighted.  Each Cloudera open source product is a
#  collective work under U.S. Copyright Law. Your license to use the
#  collective work is as provided in your written agreement with
#  Cloudera.  Used apart from the collective work, this file is
#  licensed for your use pursuant to the open source license
#  identified above.
#
#  This code is provided to you pursuant a written agreement with
#  (i) Cloudera, Inc. or (ii) a third-party authorized to distribute
#  this code. If you do not have a written agreement with Cloudera nor
#  with an authorized and properly licensed third party, you do not
#  have any rights to access nor to use this code.
#
#  Absent a written agreement with Cloudera, Inc. (“Cloudera”) to the
#  contrary, A) CLOUDERA PROVIDES THIS CODE TO YOU WITHOUT WARRANTIES OF ANY
#  KIND; (B) CLOUDERA DISCLAIMS ANY AND ALL EXPRESS AND IMPLIED
#  WARRANTIES WITH RESPECT TO THIS CODE, INCLUDING BUT NOT LIMITED TO
#  IMPLIED WARRANTIES OF TITLE, NON-INFRINGEMENT, MERCHANTABILITY AND
#  FITNESS FOR A PARTICULAR PURPOSE; (C) CLOUDERA IS NOT LIABLE TO YOU,
#  AND WILL NOT DEFEND, INDEMNIFY, NOR HOLD YOU HARMLESS FOR ANY CLAIMS
#  ARISING FROM OR RELATED TO THE CODE; AND (D)WITH RESPECT TO YOUR EXERCISE
#  OF ANY RIGHTS GRANTED TO YOU FOR THE CODE, CLOUDERA IS NOT LIABLE FOR ANY
#  DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, PUNITIVE OR
#  CONSEQUENTIAL DAMAGES INCLUDING, BUT NOT LIMITED TO, DAMAGES
#  RELATED TO LOST REVENUE, LOST PROFITS, LOSS OF INCOME, LOSS OF
#  BUSINESS ADVANTAGE OR UNAVAILABILITY, OR LOSS OR CORRUPTION OF
#  DATA.
#
# #  Author(s): Paul de Fusco
#***************************************************************************/

In [2]:
from os.path import exists
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.types import *

from cde import CDESparkConnectSession
spark = CDESparkConnectSession.builder.sessionName('spark-connect-session-res').get()

In [3]:
USERNAME = "pauldefusco"

df  = spark.read.csv("/app/mount/cell_towers_1.csv", header=True, inferSchema=True)
df.writeTo("CELL_TOWERS_{}".format(USERNAME)).using("iceberg").tableProperty("write.format.default", "parquet").createOrReplace()

In [4]:
# SET TABLE BRANCH AS ACTIVE - Skip: Not supported
spark.sql("SET spark.wap.branch = 'ingestion_branch';")

DataFrame[key: string, value: string]

In [5]:
# LOAD NEW TRANSACTION BATCH
batchDf = spark.read.csv("/app/mount/cell_towers_2.csv", header=True, inferSchema=True)
batchDf.printSchema()
batchDf.createOrReplaceTempView("BATCH_TEMP_VIEW".format(USERNAME))

root
 |-- id: integer (nullable = true)
 |-- device_id: string (nullable = true)
 |-- manufacturer: string (nullable = true)
 |-- event_type: string (nullable = true)
 |-- longitude: double (nullable = true)
 |-- latitude: double (nullable = true)
 |-- iot_signal_1: integer (nullable = true)
 |-- iot_signal_3: integer (nullable = true)
 |-- iot_signal_4: integer (nullable = true)
 |-- cell_tower_failure: integer (nullable = true)



In [6]:
# CREATE TABLE BRANCH - Supported
spark.sql("ALTER TABLE CELL_TOWERS_{} CREATE BRANCH ingestion_branch".format(USERNAME))
# WRITE DATA OPERATION ON TABLE BRANCH - Supported
batchDf.write.format("iceberg").option("branch", "ingestion_branch").mode("append").save("CELL_TOWERS_{}".format(USERNAME))

In [7]:
spark.sql("SELECT * FROM CELL_TOWERS_{};".format(USERNAME)).show()

+---+---------------+------------+------------------+----------+---------+------------+------------+------------+------------------+
| id|      device_id|manufacturer|        event_type| longitude| latitude|iot_signal_1|iot_signal_3|iot_signal_4|cell_tower_failure|
+---+---------------+------------+------------------+----------+---------+------------+------------+------------+------------------+
|  0|0x1000000000005|TelecomWorld|system malfunction| -83.61318|51.656384|           3|          51|         104|                 1|
|  1|0x100000000001d|TelecomWorld|       battery 10%| -83.04828|51.610226|           9|          52|         103|                 0|
|  2|0x1000000000008|TelecomWorld|       battery 10%| -83.60245|51.892113|           6|          54|         103|                 0|
|  3|0x100000000001b|     NewComm|       battery 10%| -82.80548|51.913082|           2|          53|         105|                 0|
|  4|0x1000000000014|TelecomWorld|       battery 10%| -83.44709|51.97

In [8]:
spark.sql("SELECT * FROM CELL_TOWERS_{} VERSION AS OF 'ingestion_branch';".format(USERNAME)).show()

+---+---------------+------------+------------------+----------+---------+------------+------------+------------+------------------+
| id|      device_id|manufacturer|        event_type| longitude| latitude|iot_signal_1|iot_signal_3|iot_signal_4|cell_tower_failure|
+---+---------------+------------+------------------+----------+---------+------------+------------+------------+------------------+
|  0|0x1000000000005|TelecomWorld|system malfunction| -83.61318|51.656384|           3|          51|         104|                 1|
|  1|0x100000000001d|TelecomWorld|       battery 10%| -83.04828|51.610226|           9|          52|         103|                 0|
|  2|0x1000000000008|TelecomWorld|       battery 10%| -83.60245|51.892113|           6|          54|         103|                 0|
|  3|0x100000000001b|     NewComm|       battery 10%| -82.80548|51.913082|           2|          53|         105|                 0|
|  4|0x1000000000014|TelecomWorld|       battery 10%| -83.44709|51.97

In [9]:
spark.sql("SELECT COUNT(*) FROM CELL_TOWERS_{};".format(USERNAME)).show()

+--------+
|count(1)|
+--------+
|    1440|
+--------+



In [10]:
# SHOW PAST BRANCH SNAPSHOT ID'S
spark.sql("SELECT * FROM SPARK_CATALOG.DEFAULT.CELL_TOWERS_{}.refs;".format(USERNAME)).show()

+----------------+------+-------------------+-----------------------+---------------------+----------------------+
|            name|  type|        snapshot_id|max_reference_age_in_ms|min_snapshots_to_keep|max_snapshot_age_in_ms|
+----------------+------+-------------------+-----------------------+---------------------+----------------------+
|            main|BRANCH|6289792391879563079|                   NULL|                 NULL|                  NULL|
|ingestion_branch|BRANCH|9113098974453803790|                   NULL|                 NULL|                  NULL|
+----------------+------+-------------------+-----------------------+---------------------+----------------------+



In [11]:
# SAVE THE SNAPSHOT ID CORRESPONDING TO THE CREATED BRANCH
branchSnapshotId = spark.sql("SELECT snapshot_id FROM SPARK_CATALOG.DEFAULT.CELL_TOWERS_{}.refs WHERE NAME == 'ingestion_branch';".format(USERNAME)).collect()[0][0]

In [12]:
# USE THE PROCEDURE TO CHERRY-PICK THE SNAPSHOT
# THIS IMPLICITLY SETS THE CURRENT TABLE STATE TO THE STATE DEFINED BY THE CHOSEN PRIOR SNAPSHOT ID
spark.sql("CALL spark_catalog.system.cherrypick_snapshot('SPARK_CATALOG.DEFAULT.CELL_TOWERS_{0}',{1})".format(USERNAME, branchSnapshotId))

DataFrame[source_snapshot_id: bigint, current_snapshot_id: bigint]

In [13]:
# VALIDATE THE CHANGES
# THE TABLE ROW COUNT IN THE CURRENT TABLE STATE REFLECTS THE APPEND OPERATION - IT PREVIOSULY ONLY DID BY SELECTING THE BRANCH
spark.sql("SELECT COUNT(*) FROM CELL_TOWERS_{};".format(USERNAME)).show()

+--------+
|count(1)|
+--------+
|    2880|
+--------+



In [15]:
# QUERY ICEBERG METADATA HISTORY TABLE
spark.sql("SELECT * FROM SPARK_CATALOG.DEFAULT.CELL_TOWERS_{}.snapshots".format(USERNAME)).show(20, False)

+-----------------------+-------------------+-------------------+---------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|committed_at           |snapshot_id        |parent_id          |operation|manifest_list                                                                                                                                                                                                 |summary                                                                                                                