## Summary
- Catalog: Coleção de namespaces e tabelas, responsável pelo gerenciamento de metadados.
- Namespace: Organização lógica dentro de um catálogo, similar a esquemas ou bancos de dados.
- Database: Termo equivalente a namespace no contexto do Spark e frequentemente utilizado de forma intercambiável.

In [1]:
import os

from pyspark.sql import SparkSession

# Build (or reuse) the notebook's Spark session: Iceberg runtime plus S3A
# access to a MinIO object store.
#
# The MinIO endpoint and credentials can be overridden through the
# MINIO_ENDPOINT / MINIO_ACCESS_KEY / MINIO_SECRET_KEY environment variables so
# real secrets do not have to live in the notebook. The fallbacks are the
# local demo values.
minio_endpoint = os.environ.get("MINIO_ENDPOINT", "http://minio:9000")
minio_access_key = os.environ.get("MINIO_ACCESS_KEY", "myuserserviceaccount")
minio_secret_key = os.environ.get("MINIO_SECRET_KEY", "myuserserviceaccountpassword")

spark = (
    SparkSession.builder
    .appName('Iceberg - minio - SCD 2')
    # Driver / executor sizing.
    .config('spark.executor.memory', '2G')
    .config('spark.driver.memory', '2G')
    .config('spark.driver.maxResultSize', '1G')
    # AWS SDK, hadoop-aws and the Iceberg Spark runtime.
    .config("spark.jars.packages", "com.amazonaws:aws-java-sdk-s3:1.12.765,org.apache.hadoop:hadoop-aws:3.4.0,org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.6.0")
    # S3A connection to MinIO (path-style addressing).
    .config("spark.hadoop.fs.s3a.endpoint", minio_endpoint)
    .config("spark.hadoop.fs.s3a.access.key", minio_access_key)
    .config("spark.hadoop.fs.s3a.secret.key", minio_secret_key)
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
    .config("spark.hadoop.fs.s3a.path.style.access", "true")
    # S3A committer settings.
    .config("spark.hadoop.fs.s3a.committer.name", "directory")
    .config("spark.hadoop.fs.s3a.committer.staging.conflict-mode", "replace")
    .config("spark.hadoop.fs.s3a.committer.staging.tmp.path", "/tmp/staging")
    # Iceberg SQL extensions and a Hadoop-type catalog registered as
    # `spark_catalog`, with its warehouse inside the bucket.
    .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
    .config("spark.sql.catalog.spark_catalog", "org.apache.iceberg.spark.SparkCatalog")
    .config("spark.sql.catalog.spark_catalog.type", "hadoop")
    .config("spark.sql.catalog.spark_catalog.warehouse", "s3a://my-bucket/iceberg")
    # Arrow-based conversion for the toPandas() calls used throughout.
    .config("spark.sql.execution.arrow.pyspark.enabled", "true")
    .getOrCreate()
)

# Keep the notebook output quiet: only log errors.
spark.sparkContext.setLogLevel("ERROR")
spark

## Create namespace / database

In [2]:
# List the catalogs known to this session.
spark.catalog.listCatalogs()

                                                                                

[CatalogMetadata(name='spark_catalog', description=None)]

In [3]:
# Show which catalog unqualified names currently resolve against.
spark.catalog.currentCatalog()

'spark_catalog'

In [4]:
# Explicitly select `spark_catalog` as the current catalog.
spark.catalog.setCurrentCatalog("spark_catalog")

In [3]:
# Create the `mydb` namespace in `spark_catalog`; IF NOT EXISTS keeps the cell
# safe to re-run.
spark.sql("CREATE NAMESPACE IF NOT EXISTS spark_catalog.mydb")

# namespace = database
# location = "s3://my-bucket/another-database"
# # Create the namespace with the specified location
# spark.sql(f"CREATE NAMESPACE IF NOT EXISTS spark_catalog.{namespace} LOCATION '{location}'")

DataFrame[]

In [6]:
# List the databases (namespaces) of the current catalog.
spark.catalog.listDatabases()

[Database(name='mydb', catalog='spark_catalog', description=None, locationUri='s3a://my-bucket/iceberg/mydb')]

In [7]:
# Show the namespace metadata (catalog, name, location) as a pandas frame.
spark.sql("describe database mydb").toPandas()

Unnamed: 0,info_name,info_value
0,Catalog Name,spark_catalog
1,Namespace Name,mydb
2,Location,s3a://my-bucket/iceberg/mydb


In [8]:
# Check that the `mydb` namespace exists.
spark.catalog.databaseExists("mydb")

True

In [13]:
# List the tables registered under `mydb` through the catalog API.
spark.catalog.listTables("mydb")

[Table(name='user_group_tracker', catalog='spark_catalog', namespace=['mydb'], description=None, tableType='MANAGED', isTemporary=False)]

In [12]:
# Same listing through SQL, rendered as a pandas frame.
spark.sql("show tables in mydb").toPandas()

Unnamed: 0,namespace,tableName,isTemporary
0,mydb,user_group_tracker,False


## Create Iceberg Table

In [3]:
# Create the SCD2 tracking table: one row per (user, group) membership period,
# with `end_date` left NULL while the membership is still current.
# No explicit LOCATION clause is given; the table is stored under the catalog
# warehouse (s3a://my-bucket/iceberg/mydb/user_group_tracker).
spark.sql(
    """
CREATE TABLE IF NOT EXISTS spark_catalog.mydb.user_group_tracker (
    user_id int,
    group string,
    start_date date,
    end_date date
    )
USING iceberg
"""
)

DataFrame[]

In [4]:
# Read the whole table through SQL and display it as a pandas frame.
spark.sql("select * from mydb.user_group_tracker").toPandas()

Unnamed: 0,user_id,group,start_date,end_date


In [5]:
# Load the table as a DataFrame through the session-level `spark.table` shortcut.
table = spark.table("mydb.user_group_tracker")

# Show the Python type of the returned object.
print(type(table))

table.toPandas()

<class 'pyspark.sql.dataframe.DataFrame'>


Unnamed: 0,user_id,group,start_date,end_date


In [6]:
# Same load through the DataFrameReader API (`spark.read.table`).
table = spark.read.table("mydb.user_group_tracker")

# Show the Python type of the returned object.
print(type(table))

table.toPandas()

<class 'pyspark.sql.dataframe.DataFrame'>


Unnamed: 0,user_id,group,start_date,end_date


In [None]:
# Read the same table by its storage path instead of its catalog name.
df = (
    spark.read
    .format("iceberg")
    .load("s3a://my-bucket/iceberg/mydb/user_group_tracker")
)

df.toPandas()

In [186]:
# Inspect the table's `history` metadata table (when each snapshot became current).
spark.sql("SELECT * FROM mydb.user_group_tracker.history").toPandas()

Unnamed: 0,made_current_at,snapshot_id,parent_id,is_current_ancestor
0,2024-07-31 02:36:26.515,2083599421527003655,,True
1,2024-07-31 02:59:32.086,7085544283171923468,2.083599e+18,True
2,2024-07-31 03:02:00.681,4962479228511825851,7.085544e+18,True
3,2024-07-31 03:05:05.773,1374337079072212611,4.962479e+18,True


In [187]:
# Inspect the table's `snapshots` metadata table (operation and summary per commit).
spark.sql("SELECT * FROM mydb.user_group_tracker.snapshots").toPandas()

Unnamed: 0,committed_at,snapshot_id,parent_id,operation,manifest_list,summary
0,2024-07-31 02:36:26.515,2083599421527003655,,append,s3a://my-bucket/iceberg/mydb/user_group_tracke...,"{'engine-version': '3.5.1', 'added-data-files'..."
1,2024-07-31 02:59:32.086,7085544283171923468,2.083599e+18,overwrite,s3a://my-bucket/iceberg/mydb/user_group_tracke...,"{'engine-version': '3.5.1', 'added-data-files'..."
2,2024-07-31 03:02:00.681,4962479228511825851,7.085544e+18,overwrite,s3a://my-bucket/iceberg/mydb/user_group_tracke...,"{'engine-version': '3.5.1', 'added-data-files'..."
3,2024-07-31 03:05:05.773,1374337079072212611,4.962479e+18,overwrite,s3a://my-bucket/iceberg/mydb/user_group_tracke...,"{'engine-version': '3.5.1', 'added-data-files'..."


In [8]:
# Keep the table's schema so incoming data can be built with the same columns and types.
iceberg_table_schema = table.schema
iceberg_table_schema

StructType([StructField('user_id', IntegerType(), True), StructField('group', StringType(), True), StructField('start_date', DateType(), True), StructField('end_date', DateType(), True)])

## Dataframe creation

In [9]:
from datetime import datetime

from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DateType
from pyspark.sql.functions import lit, col, when, current_date

In [10]:
# Incoming membership records. `end_date` is not supplied here, so it comes
# out as NULL once the table schema is applied.
data = [
    {"user_id": 1, "group": "teen titans", "start_date": datetime(2024, 10, 7)},
    {"user_id": 2, "group": "x-men", "start_date": datetime(2021, 3, 11)},
]

# Alternative rows for user 1 (presumably swapped in between runs to exercise
# a group change):
# {'user_id': 1, 'group': 'avengers', "start_date": datetime(2021, 1, 1)}
# {'user_id': 1, 'group': 'justice league', "start_date": datetime(2024, 5, 1)}

# Hand-written schema with the same four nullable fields; the schema read from
# the table is used instead.
# schema = StructType([
#         StructField('user_id', IntegerType(), nullable=True),
#         StructField('group', StringType(), nullable=True),
#         StructField('start_date', DateType(), nullable=True),
#         StructField('end_date', DateType(), nullable=True)
#     ])

new_data = spark.createDataFrame(
    data,
    schema=iceberg_table_schema,
)
new_data.toPandas()

                                                                                

Unnamed: 0,user_id,group,start_date,end_date
0,1,teen titans,2024-10-07,
1,2,x-men,2021-03-11,


## upsert SCD tipo 2

In [11]:
# Incoming rows always describe a current membership, so force `end_date` to a
# typed NULL date. (`start_date` needs no cast: `new_data` was built with the
# table's schema.)
new_data = new_data.withColumn("end_date", lit(None).cast(DateType()))

# Current contents of the Iceberg table.
existing_df = spark.read.table("mydb.user_group_tracker")

# First load: there is nothing to compare against, so append the incoming rows
# as-is. isEmpty() only needs to find one row, whereas count() == 0 would
# count the whole table just to test for emptiness.
if existing_df.isEmpty():
    new_data.writeTo("spark_catalog.mydb.user_group_tracker").append()

existing_df.toPandas()

                                                                                

Unnamed: 0,user_id,group,start_date,end_date
0,1,teen titans,2024-10-07,
1,2,x-men,2021-03-11,


In [12]:
def add_prefix(df, prefix: str):
    """Return a copy of *df* with *prefix* prepended to every column name.

    Renaming goes through ``DataFrame.toDF``, which replaces names
    positionally. Column names containing dots, or duplicated names, are
    therefore renamed without being re-resolved as column expressions.

    Args:
        df: DataFrame whose columns should be renamed.
        prefix: Text placed in front of each existing column name.

    Returns:
        A new DataFrame with the same data and the prefixed column names.
    """
    return df.toDF(*(prefix + col_name for col_name in df.columns))

In [13]:
# Prefix both sides so every column stays unambiguous after the join.
# NOTE: both names are rebound here, so re-running this cell prefixes the
# columns a second time.
existing_df = add_prefix(existing_df, "existing_")
new_data = add_prefix(new_data, "new_")

# Full outer join on the user id: keeps existing rows with no incoming
# counterpart as well as incoming rows with no existing counterpart.
joined_df = existing_df.join(
    new_data,
    col("existing_user_id") == col("new_user_id"),
    "outer",
)

joined_df.toPandas()

                                                                                

Unnamed: 0,existing_user_id,existing_group,existing_start_date,existing_end_date,new_user_id,new_group,new_start_date,new_end_date
0,1,teen titans,2024-10-07,,1,teen titans,2024-10-07,
1,2,x-men,2021-03-11,,2,x-men,2021-03-11,


In [14]:
# A joined row needs an SCD2 update when an incoming record exists, the
# existing record has a group, the two groups differ, and the existing record
# is still open (no end_date yet).
group_changed = (
    (col("new_user_id").isNotNull())
    & (col("existing_group").isNotNull())
    & (col("existing_group") != col("new_group"))
    & col("existing_end_date").isNull()
)

# Materialise the condition as a strict True/False flag column.
updates_df = joined_df.withColumn(
    "update_required",
    when(group_changed, True).otherwise(False),
)
updates_df.toPandas()

Unnamed: 0,existing_user_id,existing_group,existing_start_date,existing_end_date,new_user_id,new_group,new_start_date,new_end_date,update_required
0,1,teen titans,2024-10-07,,1,teen titans,2024-10-07,,False
1,2,x-men,2021-03-11,,2,x-men,2021-03-11,,False


In [38]:
# Close the superseded records: keep their identity and start date, and stamp
# today's date as their end_date.
updated_existing_df = updates_df.where(col("update_required")).select(
    "existing_user_id",
    "existing_group",
    "existing_start_date",
    current_date().alias("end_date"),
)

updated_existing_df.toPandas()

Unnamed: 0,existing_user_id,existing_group,existing_start_date,end_date


In [179]:
# Open a fresh current record (end_date NULL) for:
#   * every user whose group changed (update_required), and
#   * every incoming user that has no row in the table yet. These rows come
#     out of the outer join with all existing_* columns NULL and would
#     otherwise never be inserted.
is_brand_new = col("existing_user_id").isNull() & col("new_user_id").isNotNull()

new_records_df = updates_df.filter(col("update_required") | is_brand_new).select(
    col("new_user_id"),
    col("new_group"),
    # A changed group starts today, matching the date used to close the old
    # record. A brand-new user keeps the supplied start date when there is one.
    when(
        is_brand_new & col("new_start_date").isNotNull(),
        col("new_start_date"),
    ).otherwise(current_date()).alias("start_date"),
    lit(None).cast(DateType()).alias("end_date"),
)

new_records_df.toPandas()

Unnamed: 0,new_user_id,new_group,start_date,end_date
0,1,teen titans,2024-07-31,


In [180]:
# Carry over, unchanged, every existing record that is not being closed.
# Rows that exist only on the incoming side of the outer join have all
# existing_* columns NULL; selecting those columns would emit an all-NULL
# record, so such rows are excluded here.
incoming_only = col("existing_user_id").isNull() & col("new_user_id").isNotNull()

no_update_records_df = updates_df.filter(
    ~col("update_required") & ~incoming_only
).select(
    col("existing_user_id"),
    col("existing_group"),
    col("existing_start_date"),
    col("existing_end_date"),
)

no_update_records_df.toPandas()

Unnamed: 0,existing_user_id,existing_group,existing_start_date,existing_end_date
0,1,avengers,2021-01-01,2024-07-31
1,2,x-men,2021-03-11,


In [181]:
# Stack the three result sets: closed records, newly opened records and
# untouched records. union() matches columns by position, so the result keeps
# the column names of the first frame (`updated_existing_df`); the three
# selects must list user id, group, start date and end date in that order.
final_df = updated_existing_df.union(new_records_df).union(no_update_records_df)

final_df.toPandas()

Unnamed: 0,existing_user_id,existing_group,existing_start_date,end_date
0,1,justice league,2024-07-31,2024-07-31
1,1,teen titans,2024-07-31,
2,1,avengers,2021-01-01,2024-07-31
3,2,x-men,2021-03-11,


In [182]:
# Strip the `existing_` prefix so the column names match the target table
# (`end_date` already has its final name).
final_df = (
    final_df
    .withColumnRenamed("existing_user_id", "user_id")
    .withColumnRenamed("existing_group", "group")
    .withColumnRenamed("existing_start_date", "start_date")
)

final_df.toPandas()

Unnamed: 0,user_id,group,start_date,end_date
0,1,justice league,2024-07-31,2024-07-31
1,1,teen titans,2024-07-31,
2,1,avengers,2021-01-01,2024-07-31
3,2,x-men,2021-03-11,


In [183]:
# Write the merged result back to the Iceberg table with overwritePartitions().
# NOTE(review): `final_df` is derived from a read of this same table, and the
# table is created without a partition spec — confirm that replacing its
# contents this way is the intended behaviour.
final_df.writeTo("spark_catalog.mydb.user_group_tracker").using("iceberg").overwritePartitions()

                                                                                

In [184]:
# Re-read the table to check what was written.
spark.table("mydb.user_group_tracker").toPandas()

Unnamed: 0,user_id,group,start_date,end_date
0,1,justice league,2024-07-31,2024-07-31
1,1,teen titans,2024-07-31,
2,1,avengers,2021-01-01,2024-07-31
3,2,x-men,2021-03-11,
