In [1]:
from pyspark.sql import SparkSession
from pyspark import SparkConf

In [2]:
sp_conf = SparkConf() 
sp_conf.set("spark.sql.catalog.glue_catalog", "org.apache.iceberg.spark.SparkCatalog")
sp_conf.set("spark.sql.catalog.glue_catalog.warehouse", "s3://iceberg-wh-east/")
sp_conf.set("spark.sql.catalog.glue_catalog.catalog-impl", "org.apache.iceberg.aws.glue.GlueCatalog")
sp_conf.set("spark.sql.catalog.glue_catalog.io-impl", "org.apache.iceberg.aws.s3.S3FileIO")
sp_conf.set("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
sp_conf.set("spark.hadoop.fs.s3a.aws.credentials.provider","com.amazonaws.auth.DefaultAWSCredentialsProviderChain")
spark.sparkContext._jsc.hadoopConfiguration().set("fs.s3a.aws.credentials.provider", "com.amazonaws.auth.DefaultAWSCredentialsProviderChain")

In [3]:
spark = SparkSession.builder \
    .appName("Glue-Iceberg-Integration") \
    .config(conf=sp_conf) \
    .getOrCreate()

25/08/01 14:51:42 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


In [4]:
spark.sql("""
    CREATE DATABASE IF NOT EXISTS glue_catalog.berg 
""")

DataFrame[]

In [5]:
spark.sql("""
    CREATE TABLE IF NOT EXISTS glue_catalog.berg.icetable1 (
        id INT,
        name STRING
    )
    USING iceberg
""")

DataFrame[]

In [6]:
spark.sql("select count(*) from glue_catalog.berg.icetable1").show()

+--------+
|count(1)|
+--------+
|     400|
+--------+



In [7]:
import random
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

write_to_db = "berg"
write_to_table = "icetable1"

data = [(i, f"name_{random.randint(1000, 9999)}") for i in range(100)]

# Step 2: Create DataFrame with schema id(int), name(string)
schema = StructType([
    StructField("id", IntegerType(), False),
    StructField("name", StringType(), False)
])
df = spark.createDataFrame(data, schema)

df.createOrReplaceTempView("temp_table1")

df.show()

spark.sql(f"""
    INSERT INTO glue_catalog.{write_to_db}.{write_to_table}
    SELECT id, name FROM temp_table1
""")

                                                                                

+---+---------+
| id|     name|
+---+---------+
|  0|name_9147|
|  1|name_6658|
|  2|name_4274|
|  3|name_8309|
|  4|name_4360|
|  5|name_8612|
|  6|name_1282|
|  7|name_7379|
|  8|name_8910|
|  9|name_4706|
| 10|name_6110|
| 11|name_7558|
| 12|name_4104|
| 13|name_3289|
| 14|name_7611|
| 15|name_2002|
| 16|name_5139|
| 17|name_6580|
| 18|name_4037|
| 19|name_2816|
+---+---------+
only showing top 20 rows



                                                                                

DataFrame[]

In [13]:
spark.sql("select count(*) from glue_catalog.berg.icetable1").show()

+--------+
|count(1)|
+--------+
|     400|
+--------+



In [7]:
spark.sql("""
  CALL glue_catalog.system.rewrite_table_path(
    table => 'berg.icetable1',
    source_prefix => 's3://iceberg-wh-east',
    target_prefix => 's3://iceberg-wh-west',
    staging_location => 's3a://iceberg-wh-east/berg.db/icetable1/staging_west_metadata'
  )
""")

25/08/01 14:52:11 WARN MetricsConfig: Cannot locate configuration: tried hadoop-metrics2-s3a-file-system.properties,hadoop-metrics2.properties
25/08/01 14:52:14 WARN AbstractS3ACommitterFactory: Using standard FileOutputCommitter to commit work. This is slow and potentially unsafe.
25/08/01 14:52:15 WARN AbstractS3ACommitterFactory: Using standard FileOutputCommitter to commit work. This is slow and potentially unsafe.
                                                                                

DataFrame[latest_version: string, file_list_location: string]

In [11]:
import boto3
def active_iceberg_table_metadata(active_database_name, active_table_name):
    glue = boto3.client("glue", region_name = 'us-east-1')
    table = glue.get_table(DatabaseName=active_database_name, Name=active_table_name)
    parameters = table["Table"]["Parameters"]
    full_path_metadata_location = parameters["metadata_location"]
    return full_path_metadata_location.split('/')[-1]

print(active_iceberg_table_metadata("berg", "icetable1"))   

00004-bafdff20-352d-4b17-89ae-a75eda17bd3c.metadata.json


In [12]:
def update_iceberg_table_metadata(active_database_name, active_table_name, metadata):
    glue = boto3.client("glue", region_name = 'us-east-1')
    table = glue.get_table(DatabaseName=active_database_name, Name=active_table_name)
    table_input = table["Table"]
    table_input["Parameters"]["metadata_location"] = f"s3://iceberg-wh-east/berg.db/icetable1/metadata/{metadata}"
    
    keys_to_remove = ['CreateTime', 'UpdateTime', 'IsRegisteredWithLakeFormation', 'CatalogId', 'DatabaseName', 'CreatedBy', 'VersionId', 'IsMultiDialectView']
    
    for key in keys_to_remove:
        if key in table_input: del table_input[key]

    print(table_input)
    glue.update_table(
        DatabaseName=active_database_name,
        TableInput=table_input
    )
    return
# update_iceberg_table_metadata("berg", "icetable1", "00004-bafdff20-352d-4b17-89ae-a75eda17bd3c.metadata.json")   

{'Name': 'icetable1', 'Retention': 0, 'StorageDescriptor': {'Columns': [{'Name': 'id', 'Type': 'int', 'Parameters': {'iceberg.field.current': 'true', 'iceberg.field.id': '1', 'iceberg.field.optional': 'true'}}, {'Name': 'name', 'Type': 'string', 'Parameters': {'iceberg.field.current': 'true', 'iceberg.field.id': '2', 'iceberg.field.optional': 'true'}}], 'Location': 's3://iceberg-wh-east/berg.db/icetable1', 'AdditionalLocations': [], 'Compressed': False, 'NumberOfBuckets': 0, 'SortColumns': [], 'StoredAsSubDirectories': False}, 'TableType': 'EXTERNAL_TABLE', 'Parameters': {'metadata_location': 's3://iceberg-wh-east/berg.db/icetable1/metadata/00004-bafdff20-352d-4b17-89ae-a75eda17bd3c.metadata.json', 'previous_metadata_location': 's3://iceberg-wh-east/berg.db/icetable1/metadata/00003-19cbcc94-a339-4934-949d-888a57eeb46f.metadata.json', 'table_type': 'ICEBERG'}}
