In [1]:
import os
import pyspark
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

In [2]:
# Spark configuration: pulls the Iceberg Spark runtime and registers two Iceberg catalogs.
#   - spark_catalog: the session catalog, replaced by an Iceberg SparkCatalog of type 'hive'.
#   - local: an Iceberg Hadoop (filesystem) catalog whose tables live under /warehouse/.
# NOTE(review): the runtime artifact targets Spark 3.2 / Scala 2.12
# (iceberg-spark-runtime-3.2_2.12) — confirm it matches the installed pyspark version.
# NOTE(review): Iceberg documents SparkSessionCatalog for wrapping spark_catalog; SparkCatalog
# here makes the session catalog Iceberg-only. Unused by this notebook — confirm intent.
# spark.jars.packages is only read when the session's JVM starts, so restart the kernel
# after changing it.
conf = (
    pyspark.SparkConf()
        .setAppName('app_name')
        # Arrow-accelerated toPandas(). `spark.sql.execution.pyarrow.enabled` is not a
        # Spark setting and would be silently ignored.
        .set("spark.sql.execution.arrow.pyspark.enabled", "true")
        .set("spark.jars.packages", "org.apache.iceberg:iceberg-spark-runtime-3.2_2.12:1.4.2")
        .set('spark.sql.catalog.spark_catalog', 'org.apache.iceberg.spark.SparkCatalog')
        .set('spark.sql.catalog.spark_catalog.type', 'hive')
        .set('spark.sql.catalog.local', 'org.apache.iceberg.spark.SparkCatalog')
        .set('spark.sql.catalog.local.type', 'hadoop')
        .set('spark.sql.catalog.local.warehouse', '/warehouse/')
        # Make `local` the session's default catalog. The former key
        # `spark.sql.catalog.defaultCatalog` instead declares a catalog *named*
        # "defaultCatalog" (with 'local' as its class), which is never used.
        .set("spark.sql.defaultCatalog", 'local')
        .set("spark.sql.extensions","org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
)

In [3]:
# Start the session from `conf`. getOrCreate() hands back an already-running session when
# one exists, so launch-time settings (e.g. spark.jars.packages) need a kernel restart.
spark = (
    SparkSession.builder
    .config(conf=conf)
    .getOrCreate()
)

:: loading settings :: url = jar:file:/usr/local/lib/python3.10/dist-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /root/.ivy2/cache
The jars for the packages stored in: /root/.ivy2/jars
org.apache.iceberg#iceberg-spark-runtime-3.2_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-036495d9-6b2b-4fa9-bfcb-c066fe49677f;1.0
	confs: [default]
	found org.apache.iceberg#iceberg-spark-runtime-3.2_2.12;1.4.2 in central
:: resolution report :: resolve 106ms :: artifacts dl 7ms
	:: modules in use:
	org.apache.iceberg#iceberg-spark-runtime-3.2_2.12;1.4.2 from central in [default]
	---------------------------------------------------------------------
	|                  |            modules            ||   artifacts   |
	|       conf       | number| search|dwnlded|evicted|| number|dwnlded|
	---------------------------------------------------------------------
	|      default     |   1   |   0   |   0   |   0   ||   1   |   0   |
	---------------------------------------------------------------------
:: retrieving :: org.apache.spark#spark-subm

In [4]:
# Render the active SparkSession as this cell's output — a quick check that the session is up.
spark

# Prepare: select the Iceberg catalog and create the target table

In [5]:
# Make the Iceberg `local` catalog current so unqualified names such as `taxis.trips`
# resolve there; the trailing `;` suppresses the (empty) result display.
spark.sql(sqlQuery="USE local;");

In [6]:
# Create the Iceberg target table (schema only — rows arrive batch by batch below).
# IF NOT EXISTS makes the statement a no-op when the table is already there.
create_trips_sql = """
CREATE TABLE IF NOT EXISTS taxis.trips(
    vendor_id INT,
    pickup_datetime TIMESTAMP,
    dropoff_datetime TIMESTAMP,
    passenger_count INT,
    pickup_location_id INT,
    dropoff_location_id INT,
    fare_amount FLOAT
)
USING iceberg;
"""
spark.sql(create_trips_sql).toPandas()

# Ingest first batch 

In [7]:
# Expose the first batch CSV as the temp view `trips_table`. Only `header` is set (no
# schema inference), so columns come in as strings — hence the CASTs on insert.
batch1_view_sql = """
CREATE OR REPLACE TEMPORARY VIEW trips_table USING csv
OPTIONS (path "/data/yellow_tripdata_sample_2019_01.csv", header true);
"""
spark.sql(batch1_view_sql).toPandas()

In [8]:
# Peek at the first five rows of the raw batch.
preview_sql = """
SELECT * FROM trips_table LIMIT 5;
"""
spark.sql(preview_sql).toPandas()

Unnamed: 0,vendor_id,pickup_datetime,dropoff_datetime,passenger_count,pickup_location_id,dropoff_location_id,fare_amount
0,1,2019-01-15 03:36:12,2019-01-15 03:42:19,1,230,48,6.5
1,1,2019-01-25 18:20:32,2019-01-25 18:26:55,1,112,112,6.0
2,1,2019-01-05 06:47:31,2019-01-05 06:52:19,1,107,4,6.0
3,1,2019-01-09 15:08:02,2019-01-09 15:20:17,1,143,158,11.0
4,1,2019-01-25 18:49:51,2019-01-25 18:56:44,1,246,90,6.5


In [9]:
# Append the whole first batch, casting the string CSV columns to the table's types.
# The insert is positional: the SELECT list follows the column order of taxis.trips.
# Plain INSERT INTO is not idempotent — re-running this cell appends the batch again.
insert_batch1_sql = """
INSERT INTO taxis.trips
SELECT 
    CAST(vendor_id AS INT),
    CAST(pickup_datetime AS TIMESTAMP),
    CAST(dropoff_datetime AS TIMESTAMP),
    CAST(passenger_count AS INT),
    CAST(pickup_location_id AS INT),
    CAST(dropoff_location_id AS INT),
    CAST(fare_amount AS FLOAT)
FROM trips_table;
"""
spark.sql(insert_batch1_sql).toPandas()

                                                                                

In [10]:
# Row count of taxis.trips after the first batch.
(
    spark.sql("SELECT COUNT(*) FROM taxis.trips;")
    .toPandas()
)

Unnamed: 0,count(1)
0,15


# Prepare 2nd batch 

In [11]:
# Re-point the temp view `trips_table` at the second batch CSV (string columns, header row).
batch2_view_sql = """
CREATE OR REPLACE TEMPORARY VIEW trips_table USING csv
OPTIONS (path "/data/yellow_tripdata_sample_2019_02.csv", header true);
"""
spark.sql(batch2_view_sql).toPandas()

# Merge / Append 

In [12]:
# Append-only merge of the 2nd batch: insert trips not already present in taxis.trips.
# A trip is matched on vendor, pickup/dropoff time and pickup/dropoff location;
# passenger_count and fare_amount are not part of the match.
# `<=>` (null-safe equality) lets a row with a NULL key column match its existing copy;
# with `=` such rows never match and would be re-inserted on every run of this merge.
# Duplicate rows inside the batch itself are not collapsed.
spark.sql("""
WITH changes AS (
    SELECT
        CAST(vendor_id AS INT)              AS vendor_id,
        CAST(pickup_datetime AS TIMESTAMP)  AS pickup_datetime,
        CAST(dropoff_datetime AS TIMESTAMP) AS dropoff_datetime,
        CAST(passenger_count AS INT)        AS passenger_count,
        CAST(pickup_location_id AS INT)     AS pickup_location_id,
        CAST(dropoff_location_id AS INT)    AS dropoff_location_id,
        CAST(fare_amount AS FLOAT)          AS fare_amount
    FROM trips_table
)

MERGE INTO taxis.trips t
USING changes s
ON
    t.vendor_id           <=> s.vendor_id AND
    t.pickup_datetime     <=> s.pickup_datetime AND
    t.dropoff_datetime    <=> s.dropoff_datetime AND
    t.pickup_location_id  <=> s.pickup_location_id AND
    t.dropoff_location_id <=> s.dropoff_location_id
WHEN NOT MATCHED THEN INSERT *
""").toPandas()

In [13]:
# Row count of taxis.trips after merging the second batch.
(
    spark.sql("SELECT COUNT(*) FROM taxis.trips;")
    .toPandas()
)

Unnamed: 0,count(1)
0,20


In [14]:
# Load the current contents of the Iceberg table as a DataFrame.
df_main = spark.read.table("taxis.trips")

In [17]:
# Columns left out of the row key: passenger_count and fare_amount (not part of a trip's
# identity, matching the earlier MERGE condition) and salted_id itself. If taxis.trips
# already has a salted_id column (e.g. these cells are re-run after the overwrite below),
# hashing it would feed the old key into the new one and break the batch-side hash.
all_cols = set(df_main.columns)
skip_cols = {"passenger_count", "fare_amount", "salted_id"}

In [18]:
# Key columns in a fixed order. Iterating a set of strings depends on the per-process hash
# seed, so list(set) can reorder between kernel restarts — changing every salted_id that
# is persisted in the table. Sorting makes the key reproducible across sessions.
salt_cols = sorted(all_cols - skip_cols)

In [19]:
# Add salted_id: the SHA-512 hex digest of the key columns joined with '_'.
# NOTE(review): concat_ws skips NULL inputs, so rows that differ only in which key column
# is NULL can hash to the same salted_id.
df_main = df_main.withColumn(
    'salted_id',
    F.sha2(F.concat_ws('_', *salt_cols), 512),
)

In [21]:
# Persist salted_id by atomically replacing taxis.trips with df_main (new schema + data).
# Uses the DataFrameWriterV2 API, which the Iceberg docs recommend over the v1
# `write.saveAsTable(..., mode="overwrite")` path.
df_main.writeTo("taxis.trips").createOrReplace()

# Prepare 3rd batch 

In [22]:
# Re-point the temp view `trips_table` at the updates CSV (string columns, header row).
updates_view_sql = """
CREATE OR REPLACE TEMPORARY VIEW trips_table USING csv
OPTIONS (path "/data/yellow_tripdata_sample_2019_02-updates.csv", header true);
"""
spark.sql(updates_view_sql).toPandas()

In [24]:
# Read the updates batch and key it with the same salted_id recipe used for taxis.trips.
# The stored salted_id was hashed from the table's typed (INT / TIMESTAMP) columns, while
# this view holds raw CSV text. Casting each key column to the target table's type before
# hashing makes both sides render values identically, so equal trips get equal keys even
# when the CSV spells a value differently. Only the hash input is cast; the view's
# columns themselves are left as strings.
target_types = dict(spark.table("taxis.trips").dtypes)
df_changes = (
    spark.table("trips_table")
    .withColumn(
        'salted_id',
        F.sha2(
            F.concat_ws('_', *[F.col(c).cast(target_types[c]) for c in salt_cols]),
            512,
        ),
    )
)

In [27]:
# Replace the temp view `trips_table` with the keyed batch so the SQL MERGE can use salted_id.
df_changes.createOrReplaceTempView(name="trips_table")

# Merge / Update 

In [41]:
# Upsert the updates batch into taxis.trips, keyed on salted_id:
#   - a batch row whose salted_id already exists overwrites that trip (UPDATE SET *);
#   - a batch row with an unseen salted_id is inserted (INSERT *).
# The source must be the batch alone. Joining taxis.trips into it (FULL OUTER JOIN) would
# make every existing row "matched", and UPDATE SET * would then NULL out trips that are
# absent from the batch.
# NOTE(review): Iceberg rejects a MERGE in which one target row matches several source
# rows — dedupe the batch first if the updates file can repeat a trip.
spark.sql("""
WITH changes AS (
    SELECT
        salted_id,
        CAST(vendor_id AS INT)              AS vendor_id,
        CAST(pickup_datetime AS TIMESTAMP)  AS pickup_datetime,
        CAST(dropoff_datetime AS TIMESTAMP) AS dropoff_datetime,
        CAST(passenger_count AS INT)        AS passenger_count,
        CAST(pickup_location_id AS INT)     AS pickup_location_id,
        CAST(dropoff_location_id AS INT)    AS dropoff_location_id,
        CAST(fare_amount AS FLOAT)          AS fare_amount
    FROM trips_table
)
MERGE INTO taxis.trips t
USING changes s
ON
    t.salted_id = s.salted_id
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *
""").toPandas()

In [43]:
# Row count of taxis.trips after the update merge.
(
    spark.sql("SELECT COUNT(*) FROM taxis.trips;")
    .toPandas()
)

Unnamed: 0,count(1)
0,20
