In [None]:
# Load the sparksql_magic IPython extension (presumably provides a %%sparksql cell magic).
# NOTE(review): no visible cell in this notebook uses that magic — drop this line if unneeded.
%load_ext sparksql_magic

In [None]:
# setup libs
# findspark.init() must run BEFORE any pyspark / py4j import: it locates the Spark
# installation and adds pyspark and py4j to sys.path. Calling it after the imports
# (as before) fails when pyspark is not already importable, or silently uses a
# different pyspark than the one findspark would select.
import os
import findspark
findspark.init()

from pyspark.sql import *
from pyspark import SparkConf, SparkContext
from py4j.java_gateway import java_import

In [None]:
# Load configuration from config.py.
# The config directory defaults to the original author's checkout; set the
# GLUE_DEMO_DIR environment variable to run this notebook from another location.
import os
import sys

project_dir = os.path.expanduser(
    os.environ.get('GLUE_DEMO_DIR', '/Users/tarunvihartumati/iceberg-projects/glue-demo')
)
# Put the config directory first on sys.path exactly once, so re-running this
# cell does not keep growing sys.path with duplicate entries.
sys.path[:] = [entry for entry in sys.path if entry != project_dir]
sys.path.insert(0, project_dir)
from config import *  # provides AWS_PROFILE, AWS_REGION, print_config, ... (see config.py)

# Select the AWS profile and region via environment variables, for AWS SDK / CLI
# clients started from this kernel. No credentials are set here.
os.environ['AWS_PROFILE'] = AWS_PROFILE
os.environ['AWS_DEFAULT_REGION'] = AWS_REGION

# Print configuration
print_config()

In [None]:
# The AWS profile and region are set in the config cell above via os.environ.
# Shell `!export ...` commands run in a throwaway subshell, so they would not persist in the kernel.

In [None]:
# Echo the S3 / catalog settings that config.py provides
# (S3_BUCKET_NAME, S3_BUCKET_PREFIX, DATABASE_NAME, S3_WAREHOUSE_PATH), one per line.
print("\n".join(
    f"{label}: {value}"
    for label, value in (
        ("Bucket", S3_BUCKET_NAME),
        ("Prefix", S3_BUCKET_PREFIX),
        ("Database", DATABASE_NAME),
        ("Warehouse", S3_WAREHOUSE_PATH),
    )
))


In [None]:
from pyspark.sql import SparkSession
import os

# Work around "connection refused" errors by binding Spark to the loopback address.
# NOTE(review): this presumably only affects a Spark JVM launched after this point,
# so it may have no effect when the cell is re-run in the same kernel.
os.environ['SPARK_LOCAL_IP'] = '127.0.0.1'

# Stop any session left over from an earlier run, so the builder below starts a
# fresh session with the configuration from config.py.
try:
    spark.stop()
    print("🛑 Stopped existing Spark session")
except NameError:
    # First run in this kernel: no `spark` variable exists yet, nothing to stop.
    pass
except Exception as exc:
    # Best-effort cleanup: report the problem but still try to build a new session.
    print(f"⚠️ Could not stop existing Spark session: {exc}")

# JAR paths from config.py.
# NOTE(review): `jars` and `driver_classpath` are computed but never passed to the
# builder below. Presumably get_spark_config() already sets spark.jars /
# spark.driver.extraClassPath. Confirm, then either wire these in or remove them.
jars_dir = os.path.expanduser(JARS_DIR)
jars = [f"{jars_dir}/{jar}" for jar in JAR_FILES]
driver_classpath = ":".join(jars)

# Apply every key/value pair from config.py's Spark settings to the builder.
spark_config = get_spark_config()
builder = SparkSession.builder
for key, value in spark_config.items():
    builder = builder.config(key, value)

spark = builder.getOrCreate()

print("✅ Spark session created successfully with Glue catalog")
print(f"✅ Using warehouse: {S3_WAREHOUSE_PATH}")
print(f"✅ Using region: {AWS_REGION}")

In [None]:
import time
from pyspark.sql import Row

# Wall-clock timestamp taken when the sample data is built (not read elsewhere in the visible notebook).
ut = time.time()

# Seed records for the Iceberg table: product id, display name and price.
product = [
    dict(id='00001', name='Heater', price=99900.76),
    dict(id='00002', name='Thermostat', price=881500.00),
    dict(id='00003', name='Television', price=1400.89),
    dict(id='00004', name='Blender', price=300.99),
    dict(id='00005', name='USB charger', price=400.00),
]

# One Row per record; the resulting columns are id, name, price.
df_products = spark.createDataFrame([Row(**record) for record in product])

In [None]:

# Expose the sample products to Spark SQL as the session-scoped view `tmp_product`.
df_products.createOrReplaceTempView(name="tmp_product")

In [None]:
# Sanity-check the temp view: the first two products ordered by the first column (id).
(
    spark.sql("SELECT *  FROM tmp_product order by 1 LIMIT 2")
    .show()
)

In [None]:
# Ensure the target database (namespace) exists inside the configured catalog.
spark.sql("CREATE DATABASE IF NOT EXISTS {}.{}".format(CATALOG_NAME, DATABASE_NAME))

In [None]:
# Start from a clean slate: remove the table from the catalog if an earlier run created it.
# NOTE(review): without PURGE, Iceberg may leave the old files at TABLE_LOCATION. Confirm this is intended.
spark.sql("DROP TABLE IF EXISTS {}".format(FULL_TABLE_NAME))

In [None]:
# DDL for the Iceberg table. Table name, location and Iceberg properties all come from config.py.
sql_stmnt = f"""
CREATE TABLE IF NOT EXISTS {FULL_TABLE_NAME} (
    id string,
    name string,
    price decimal(10,2)
)
USING iceberg
TBLPROPERTIES (
'table_type'='{ICEBERG_TABLE_TYPE}', 
'format-version'='{ICEBERG_FORMAT_VERSION}'
)
LOCATION '{TABLE_LOCATION}'
"""
# Echo the exact statement (wrapped in backticks) before executing it.
print("Executing SparkSQL:\n`" + sql_stmnt + "`")
spark.sql(sql_stmnt).show()

In [None]:
# List every object stored under the table's S3 location (TABLE_LOCATION from config.py),
# using the AWS profile from config.py. IPython substitutes the {…} values before running the shell.
# NOTE(review): the substituted values are not shell-escaped. That is fine for trusted config values only.
!aws s3 ls '{TABLE_LOCATION}' --recursive --profile {AWS_PROFILE}

In [None]:
# Show the table's columns plus extended table metadata, without truncating long values.
(
    spark.sql("DESC EXTENDED {}".format(FULL_TABLE_NAME))
    .show(truncate=False)
)

In [None]:
# Copy every row of the tmp_product view into the Iceberg table.
# INSERT INTO ... SELECT * matches columns by position, so the view must stay in (id, name, price) order.
spark.sql(sqlQuery=f"""
INSERT INTO {FULL_TABLE_NAME} 
SELECT * FROM tmp_product
""").show()

In [None]:
# Current contents of the table after the initial load.
spark.table(FULL_TABLE_NAME).show()

In [None]:
# Iceberg `history` metadata table for this table.
spark.sql("SELECT * FROM {}.history".format(FULL_TABLE_NAME)).show()

In [None]:
# Iceberg `snapshots` metadata table for this table.
spark.sql("SELECT * FROM {}.snapshots".format(FULL_TABLE_NAME)).show()

In [None]:
# Join history with snapshots: for each snapshot, when it was made current, the
# operation that produced it, and the Spark application id stored in its summary.
spark.sql(sqlQuery=f"""
SELECT h.made_current_at, s.operation, 
h.snapshot_id, h.is_current_ancestor, 
s.summary["spark.app.id"] 
FROM {FULL_TABLE_NAME}.history h 
JOIN {FULL_TABLE_NAME}.snapshots s  
ON h.snapshot_id = s.snapshot_id 
ORDER BY made_current_at
""").show()

In [None]:
# Row-level delete: remove the Blender product from the table.
spark.sql('DELETE FROM {} WHERE name = "Blender"'.format(FULL_TABLE_NAME)).show()

In [None]:
# Table contents after the delete.
spark.table(FULL_TABLE_NAME).show()

In [None]:
# Wall-clock timestamp taken when the update batch is built (not read elsewhere in the visible notebook).
ut = time.time()

# Upsert batch: id 00001 already exists in the table (price change), id 00006 is new (insert).
product_updates = [
    dict(id='00001', name='Heater', price=400),  # Update
    dict(id='00006', name='Chair', price=500),  # Insert
]
df_product_updates = spark.createDataFrame([Row(**update) for update in product_updates])

In [None]:

# Expose the update batch to SQL. The (misspelled) view name is what the MERGE cell reads.
df_product_updates.createOrReplaceTempView(name="tmp_prodct_updates")

In [None]:
# Inspect the staged update rows.
spark.table("tmp_prodct_updates").show()

In [None]:
# Upsert the staged updates into the Iceberg table:
#   - rows whose id already exists get the new price,
#   - rows with an unseen id are inserted with all columns.
# Target the configured table (FULL_TABLE_NAME), the same one every other cell
# reads and writes, instead of a hardcoded catalog/database/table name.
query = f"""
MERGE INTO {FULL_TABLE_NAME} AS t
USING (SELECT * FROM tmp_prodct_updates) AS u
ON t.id = u.id
WHEN MATCHED THEN UPDATE SET t.price = u.price
WHEN NOT MATCHED THEN INSERT *
"""
spark.sql(query)

In [None]:
# Table contents after the MERGE.
spark.table(FULL_TABLE_NAME).show()

In [None]:
# All snapshots, most recent commit first.
spark.sql("SELECT * FROM {}.snapshots ORDER BY committed_at DESC".format(FULL_TABLE_NAME)).show()

In [None]:
# Time travel, step 1: list the snapshots oldest-first, then read the table
# as it was at the very first snapshot.
print("=== Available Snapshots ===")
snapshots_df = spark.sql(
    "SELECT committed_at, snapshot_id, operation FROM {}.snapshots ORDER BY committed_at".format(FULL_TABLE_NAME)
)
snapshots_df.show(truncate=False)

# Bring the snapshot list to the driver so the ids can be spliced into SQL.
snapshots = snapshots_df.collect()

if not snapshots:
    print("No snapshots found in the table")
else:
    first_snapshot_id = snapshots[0]['snapshot_id']
    print(f"\n=== Querying first snapshot (ID: {first_snapshot_id}) ===")
    spark.sql(f"SELECT * FROM {FULL_TABLE_NAME} VERSION AS OF {first_snapshot_id}").show()

In [None]:
# Time travel, step 2: read the table as of the second and third snapshots
# (when they exist), then show the current state for comparison.
snapshots_df = spark.sql(f"SELECT committed_at, snapshot_id, operation FROM {FULL_TABLE_NAME}.snapshots ORDER BY committed_at")
snapshots = snapshots_df.collect()

# (index into the oldest-first snapshot list, label used in the heading).
# One loop replaces the previously copy-pasted per-snapshot branches.
for position, label in ((1, "second"), (2, "third")):
    if position >= len(snapshots):
        break  # fewer snapshots than this position, so later ones cannot exist either
    snapshot_id = snapshots[position]['snapshot_id']
    # Preserve the original layout: no blank line before the first heading printed here.
    heading_prefix = "\n" if position > 1 else ""
    print(f"{heading_prefix}=== Querying {label} snapshot (ID: {snapshot_id}) ===")
    spark.sql(f"SELECT * FROM {FULL_TABLE_NAME} VERSION AS OF {snapshot_id}").show()

if len(snapshots) < 2:
    # Previously this case printed nothing, which made it look like the cell did no work.
    print(f"Only {len(snapshots)} snapshot(s) found; no later snapshots to query.")

# Show comparison
print("\n=== Current table state ===")
spark.sql(f"SELECT * FROM {FULL_TABLE_NAME}").show()