In [1]:
import random
from faker import Faker

In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, year, month, dayofmonth

### Spark Setup

In [3]:
spark_jar_packages = ",".join([
    "org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.7.1",
    "org.apache.hive:hive-metastore:3.1.3",
    "org.apache.hive:hive-exec:3.1.3",
    "org.apache.hadoop:hadoop-aws:3.3.4",
    "com.amazonaws:aws-java-sdk-bundle:1.12.262",
])

In [4]:
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("iceberg-hive-playground")
    .config("spark.jars.packages", spark_jar_packages)

    # Delta-Hive Integration
    .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
    .config("spark.sql.catalog.spark_catalog", "org.apache.iceberg.spark.SparkCatalog")
    .config("spark.sql.catalog.spark_catalog.type", "hive")
    .config("spark.sql.catalogImplementation", "hive")
    .config("hive.metastore.uris", "thrift://localhost:9083")

    # S3 (MinIO Integration)
    .config("spark.hadoop.fs.s3a.endpoint", "http://localhost:9000")
    .config("spark.hadoop.fs.s3a.access.key", "minioadmin")
    .config("spark.hadoop.fs.s3a.secret.key", "minioadmin")
    .config("spark.hadoop.fs.s3a.path.style.access", "true")
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
    .config("spark.hadoop.fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider")
    .config("spark.hadoop.fs.s3a.region", "us-east-1")

    .getOrCreate()
)

25/01/06 02:36:45 WARN Utils: Your hostname, Brunos-Macbook-Pro-16-M1-Max.local resolves to a loopback address: 127.0.0.1; using 192.168.15.86 instead (on interface en0)
25/01/06 02:36:45 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


:: loading settings :: url = jar:file:/Users/iobruno/.sdkman/candidates/spark/3.5.3/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /Users/iobruno/.ivy2/cache
The jars for the packages stored in: /Users/iobruno/.ivy2/jars
org.apache.iceberg#iceberg-spark-runtime-3.5_2.12 added as a dependency
org.apache.hive#hive-metastore added as a dependency
org.apache.hive#hive-exec added as a dependency
org.apache.hadoop#hadoop-aws added as a dependency
com.amazonaws#aws-java-sdk-bundle added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-31535f4c-cf78-4074-afdb-4f6be5e55ba7;1.0
	confs: [default]
	found org.apache.iceberg#iceberg-spark-runtime-3.5_2.12;1.7.1 in central
	found org.apache.hive#hive-metastore;3.1.3 in central
	found org.apache.hive#hive-serde;3.1.3 in central
	found org.apache.hive#hive-common;3.1.3 in central
	found org.apache.hive#hive-classification;3.1.3 in central
	found org.slf4j#slf4j-api;1.7.10 in central
	found org.apache.hive#hive-upgrade-acid;3.1.3 in central
	found org.apache.hive#hive-shims;3.1.3 in central
	found org.apache.hive.shims#hive

### Dataset Generation

In [5]:
def generate_entry(faker: Faker, country_codes: list):
    return {
        "id": faker.unique.uuid4(),
        "name":  faker.name(),
        "email": faker.email(),
        "passport": faker.passport_number(),
        "country_code": random.choice(country_codes),
        "iban": faker.iban(),
        "swift": faker.swift11(),
        "created_at": faker.past_date(start_date='-90d').strftime('%Y-%m-%d')
    }

In [6]:
def generate_dataset(num: int, seed: int):
    country_codes = ['US', 'CA', 'JP', 'KR', 'FR', 'GE', 'UK', 'BR', 'AR']
    Faker.seed(seed)
    faker = Faker()
    return [generate_entry(faker, country_codes) for _ in range(num)]

In [7]:
dataset = generate_dataset(num=1_000, seed=739)

In [8]:
df = spark.createDataFrame(dataset)\
        .withColumn("year", year(col("created_at")))\
        .withColumn("month", month(col("created_at")))\
        .withColumn("day", dayofmonth(col("created_at")))

### Iceberg-Hive Integration

In [9]:
spark.sql("""
    CREATE DATABASE IF NOT EXISTS iceberg_raw
    LOCATION 's3a://lakehouse-raw/iceberg/'
""")

DataFrame[]

In [10]:
df.write.format("iceberg") \
    .mode("overwrite") \
    .option("overwriteSchema", "true") \
    .option("write.delete.mode", "merge-on-read") \
    .option("write.update.mode", "merge-on-read") \
    .option("changelog.enabled", "true") \
    .partitionBy("year", "month") \
    .saveAsTable("iceberg_raw.accounts")

25/01/06 02:36:54 WARN MetricsConfig: Cannot locate configuration: tried hadoop-metrics2-s3a-file-system.properties,hadoop-metrics2.properties
                                                                                

### Upsert Dataset

In [11]:
entries = [
    # Existing entries
    dataset[2], 
    dataset[4], 
    dataset[7],
    dataset[11],
    # New entries
    *generate_dataset(4, seed=1037)
]

In [12]:
for entry in entries:
    username = entry['name'].lower().replace(" ", ".")
    entry['email'] = f"{username}@domain.com"

In [13]:
upsert_df = spark.createDataFrame(entries)\
        .withColumn("year", year(col("created_at")))\
        .withColumn("month", month(col("created_at")))\
        .withColumn("day", dayofmonth(col("created_at")))

In [14]:
upsert_df.show(8, truncate=False)

+------------+----------+---------------------------+----------------------+------------------------------------+----------------+---------+-----------+----+-----+---+
|country_code|created_at|email                      |iban                  |id                                  |name            |passport |swift      |year|month|day|
+------------+----------+---------------------------+----------------------+------------------------------------+----------------+---------+-----------+----+-----+---+
|KR          |2024-12-08|ann.cruz@domain.com        |GB55LFTZ50027083194346|b7e33adb-9bfe-465f-a533-1d57f8d9c9f6|Ann Cruz        |T22953641|HMAQGBCSXE8|2024|12   |8  |
|AR          |2024-12-02|cassidy.jones.md@domain.com|GB14AYNQ55188150393152|0daad7bc-25b6-4469-8a2f-2ba767f86791|Cassidy Jones MD|595954695|VTHYGBZMNOI|2024|12   |2  |
|GE          |2024-12-03|kara.thomas@domain.com     |GB02LAAF80272115976869|4cbbf121-caae-42aa-8508-3fd99bb2f762|Kara Thomas     |661814813|DULPGBWLTDU|2024|12 

In [15]:
upsert_df.createOrReplaceTempView("upsert_data")

### Upsert Strategy

In [16]:
spark.sql("""
    MERGE INTO iceberg_raw.accounts AS target
    USING upsert_data AS source ON 
        target.id = source.id
    WHEN MATCHED THEN 
        UPDATE SET
            target.country_code = source.country_code,
            target.email = source.email,
            target.name = source.name,
            target.iban = source.iban,
            target.swift = source.swift,
            target.passport = source.passport
    WHEN NOT MATCHED THEN 
        INSERT *
""")

                                                                                

DataFrame[]

### Iceberg Metadata

In [17]:
spark.sql("""
    select 
        *
    from
        iceberg_raw.accounts.history
""").show(truncate=False)

+-----------------------+-------------------+-------------------+-------------------+
|made_current_at        |snapshot_id        |parent_id          |is_current_ancestor|
+-----------------------+-------------------+-------------------+-------------------+
|2025-01-06 02:23:23.78 |1492144004614084996|NULL               |false              |
|2025-01-06 02:23:32.194|6494564091938804125|1492144004614084996|false              |
|2025-01-06 02:26:21.989|6619905204543243203|NULL               |false              |
|2025-01-06 02:26:31.134|6059993689479572453|6619905204543243203|false              |
|2025-01-06 02:34:57.729|4025798766118407026|NULL               |false              |
|2025-01-06 02:35:02.256|6466590935935897141|4025798766118407026|false              |
|2025-01-06 02:36:57.932|3977949520508409   |NULL               |true               |
|2025-01-06 02:37:02.456|8354676595401335017|3977949520508409   |true               |
+-----------------------+-------------------+---------

In [18]:
before_changes_df = spark.read.format("iceberg") \
    .option("snapshot-id", "1492144004614084996") \
    .load("iceberg_raw.accounts")

In [19]:
after_changes_df = spark.read.format("iceberg") \
    .option("snapshot-id", "6466590935935897141") \
    .load("iceberg_raw.accounts")