In [1]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, TimestampType
import random
from datetime import datetime, timedelta
import pyspark

In [2]:
import os
from getpass import getpass

# AWS settings picked up by the AWS SDK (Glue catalog / S3FileIO) and reused when
# the Spark session is built.
# Values already present in the environment are respected; nothing is overwritten
# with placeholder text, and no secret is stored in the notebook source.
os.environ.setdefault('AWS_REGION', 'us-west-2')  # Default region; export AWS_REGION to override

for _credential_var in ('AWS_ACCESS_KEY_ID', 'AWS_SECRET_ACCESS_KEY'):
    # Prompt only when the variable is missing or empty; getpass hides the typed
    # value so it never ends up in the saved cell output.
    if not os.environ.get(_credential_var):
        os.environ[_credential_var] = getpass(f"{_credential_var}: ")

In [3]:
# Initialize Spark session with Glue catalog configuration.
#
# - `glue_catalog` is an Iceberg SparkCatalog backed by AWS Glue and is made the
#   default catalog, so unqualified namespaces resolve against Glue.
# - Table data/metadata are stored under the configured S3 warehouse via S3FileIO.
# - The s3a credentials are read from the environment variables set in the
#   previous cell instead of literal placeholder strings, so there is a single
#   source of truth for credentials. A missing variable raises KeyError here
#   rather than failing later with an opaque authentication error.
spark = (
    SparkSession.builder
    .appName("IcebergAssetMaintenance")
    .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
    .config(
        "spark.jars.packages",
        "org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.5.2,"
        "org.apache.iceberg:iceberg-aws:1.5.2,"
        "software.amazon.awssdk:bundle:2.17.89,"
        "software.amazon.awssdk:url-connection-client:2.17.89",
    )
    .config("spark.sql.defaultCatalog", "glue_catalog")
    .config("spark.sql.catalog.glue_catalog", "org.apache.iceberg.spark.SparkCatalog")
    .config("spark.sql.catalog.glue_catalog.warehouse", "s3://dealership-demo/blame_demo")
    .config("spark.sql.catalog.glue_catalog.catalog-impl", "org.apache.iceberg.aws.glue.GlueCatalog")
    .config("spark.sql.catalog.glue_catalog.io-impl", "org.apache.iceberg.aws.s3.S3FileIO")
    .config("spark.hadoop.fs.s3a.access.key", os.environ["AWS_ACCESS_KEY_ID"])
    .config("spark.hadoop.fs.s3a.secret.key", os.environ["AWS_SECRET_ACCESS_KEY"])
    .config("spark.hadoop.fs.s3a.endpoint", "s3.amazonaws.com")
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
    .getOrCreate()
)

In [4]:
# Read the Glue catalog settings back from the live session to confirm they were applied.
# ("spark.sql.catalog.glue_catalog.type" is not queried: the session builder sets
# catalog-impl rather than type.)
for conf_key in (
    "spark.sql.catalog.glue_catalog",
    "spark.sql.catalog.glue_catalog.warehouse",
    "spark.sql.catalog.glue_catalog.catalog-impl",
    "spark.sql.catalog.glue_catalog.io-impl",
):
    print(spark.conf.get(conf_key))
print(spark)

org.apache.iceberg.spark.SparkCatalog
s3://dealership-demo/blame_demo
org.apache.iceberg.aws.glue.GlueCatalog
org.apache.iceberg.aws.s3.S3FileIO
<pyspark.sql.session.SparkSession object at 0xffff899c4490>


In [5]:
# Show the Maven coordinates requested for this session; the Iceberg version is part of them
iceberg_packages = spark.conf.get("spark.jars.packages")
print("Iceberg Package:", iceberg_packages)

Iceberg Package: org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.5.2,org.apache.iceberg:iceberg-aws:1.5.2,software.amazon.awssdk:bundle:2.17.89,software.amazon.awssdk:url-connection-client:2.17.89


In [6]:
# List the catalogs registered with this Spark session (SHOW CATALOGS returns
# catalogs, not databases). Any failure is reported instead of raised so the
# remaining cells can still be run.
try:
    databases = spark.sql("SHOW CATALOGS")
    databases.show()
except Exception as err:
    print(f"Error: {err}")

+-------------+
|      catalog|
+-------------+
| glue_catalog|
|spark_catalog|
+-------------+



In [7]:
# List the namespaces (databases) of the session's default catalog.
# Any failure is reported instead of raised.
try:
    databases = spark.sql("SHOW DATABASES")
    databases.show()
except Exception as err:
    print(f"Error: {err}")

+----------+
| namespace|
+----------+
|blame_demo|
+----------+



In [40]:
# List the tables in the glue_catalog.blame_demo namespace.
# Any failure is reported instead of raised.
try:
    tables = spark.sql("SHOW TABLES IN glue_catalog.blame_demo")
    tables.show()
except Exception as err:
    print(f"Error listing tables: {err}")

+----------+-----------------+-----------+
| namespace|        tableName|isTemporary|
+----------+-----------------+-----------+
|blame_demo|asset_maintenance|      false|
|blame_demo|           assets|      false|
|blame_demo| real_estate_data|      false|
+----------+-----------------+-----------+



In [51]:
# Create the Iceberg-format table `assets` (single string column `name`) in the
# glue_catalog.blame_demo namespace if it doesn't already exist.
# Any failure is reported instead of raised; the message names the table that
# the statement actually targets ('assets').
try:
    spark.sql("CREATE TABLE IF NOT EXISTS glue_catalog.blame_demo.assets (name string) USING iceberg;")
except Exception as e:
    print(f"Error in creating 'assets' table: {e}")

In [44]:
# Confirm the table creation worked by listing the namespace's tables again.
# Any failure is reported instead of raised.
try:
    tables = spark.sql("SHOW TABLES IN glue_catalog.blame_demo")
    tables.show()
except Exception as err:
    print(f"Error checking for table existence: {err}")

+----------+-----------------+-----------+
| namespace|        tableName|isTemporary|
+----------+-----------------+-----------+
|blame_demo|asset_maintenance|      false|
|blame_demo|           assets|      false|
|blame_demo| real_estate_data|      false|
+----------+-----------------+-----------+



In [45]:
# Spark schema for the generated maintenance records.
# Column order matches the keys produced by Asset.to_dict(); every column is nullable.
schema = StructType([
    StructField(column_name, column_type, True)
    for column_name, column_type in (
        ("asset_id", StringType()),
        ("asset_type", StringType()),
        ("maintenance_date", TimestampType()),
        ("invoice_date", TimestampType()),
        ("issue", IntegerType()),
        ("cost", IntegerType()),
    )
])

# Class for Asset data generation

In [46]:
class Asset:
    """One randomly generated maintenance record for a single asset.

    All random draws use the module-level ``random`` generator, so results are
    reproducible only if the caller seeds it beforehand.
    """

    # Issue label -> integer code stored in the ``issue`` field.
    issues_map = {"Inspection": 1, "Repair": 2, "Replacement": 3}

    def __init__(self, asset_type, start_date, zero_cost_percentage):
        """Draw the record's fields.

        Args:
            asset_type: Asset category name; its first three characters,
                upper-cased, prefix the generated ``asset_id``.
            start_date: ``datetime`` marking the earliest possible maintenance
                date; the maintenance date stays within that calendar month.
            zero_cost_percentage: Probability (compared against
                ``random.random()``) that the record's cost is 0.
        """
        id_prefix = asset_type[:3].upper()
        self.asset_id = f"{id_prefix}_{random.randint(1000, 9999)}"
        self.asset_type = asset_type

        # Length of start_date's month: step to the 1st of the following month
        # and go back one day. Only the day-of-month of that result is used.
        first_of_next_month = start_date.replace(month=start_date.month % 12 + 1, day=1)
        last_day_of_month = (first_of_next_month - timedelta(days=1)).day
        days_left_in_month = last_day_of_month - start_date.day
        self.maintenance_date = start_date + timedelta(days=random.randint(0, days_left_in_month))

        # The invoice follows the maintenance by 0 to 21 days (inclusive).
        invoice_lag_days = random.randint(0, 21)
        self.invoice_date = self.maintenance_date + timedelta(days=invoice_lag_days)

        # Pick one issue label uniformly and store its integer code.
        issue_label = random.choice(list(self.issues_map.keys()))
        self.issue = self.issues_map[issue_label]

        # With probability zero_cost_percentage the cost is 0; otherwise it is
        # a random integer from 100 to 1000 (inclusive).
        self.cost = 0 if random.random() < zero_cost_percentage else random.randint(100, 1000)

    def to_dict(self):
        """Return the record as a plain dict keyed by field name."""
        field_names = (
            "asset_id",
            "asset_type",
            "maintenance_date",
            "invoice_date",
            "issue",
            "cost",
        )
        return {field: getattr(self, field) for field in field_names}

# Simulate a file with good data being loaded to Iceberg

In [29]:
# Generate sample data
# Seed the module-level RNG used by Asset so the synthetic rows are reproducible
# when the notebook is run top to bottom on a fresh kernel.
random.seed(42)

asset_types = ["HVAC", "Vehicle", "Refrigeration", "POS"]
start_date = datetime(2024, 1, 1)  # batch month: January 2024
zero_cost_percentage = 0.1  # probability that a generated record has cost 0

data = []

In [31]:
# Build 250 records per asset type. The list is rebuilt in one expression, so
# re-running this cell regenerates `data` instead of appending a second batch.
data = [
    Asset(asset_type, start_date, zero_cost_percentage).to_dict()
    for asset_type in asset_types
    for _ in range(250)
]



In [32]:
# Turn the generated record dicts into a Spark DataFrame with the explicit schema
df = spark.createDataFrame(data=data, schema=schema)

In [33]:
# Preview the first 10 generated rows
df.show(10)

+--------+----------+-------------------+-------------------+-----+----+
|asset_id|asset_type|   maintenance_date|       invoice_date|issue|cost|
+--------+----------+-------------------+-------------------+-----+----+
|HVA_9093|      HVAC|2024-01-03 00:00:00|2024-01-03 00:00:00|    3|   0|
|HVA_4159|      HVAC|2024-01-26 00:00:00|2024-01-31 00:00:00|    3| 510|
|HVA_3081|      HVAC|2024-01-21 00:00:00|2024-01-29 00:00:00|    2| 687|
|HVA_2811|      HVAC|2024-01-09 00:00:00|2024-01-14 00:00:00|    1| 518|
|HVA_9942|      HVAC|2024-01-14 00:00:00|2024-02-03 00:00:00|    2| 894|
|HVA_7672|      HVAC|2024-01-17 00:00:00|2024-01-20 00:00:00|    2| 739|
|HVA_4783|      HVAC|2024-01-23 00:00:00|2024-02-02 00:00:00|    2| 378|
|HVA_3768|      HVAC|2024-01-23 00:00:00|2024-02-03 00:00:00|    2|   0|
|HVA_5687|      HVAC|2024-01-01 00:00:00|2024-01-20 00:00:00|    3|   0|
|HVA_2257|      HVAC|2024-01-13 00:00:00|2024-01-18 00:00:00|    2|   0|
+--------+----------+-------------------+----------

In [17]:

# Create (or replace, if it already exists) the Iceberg table that receives the
# generated records. Columns mirror `schema`; the table is partitioned by
# asset_type and by the month of invoice_date.
spark.sql("""
CREATE OR REPLACE TABLE glue_catalog.blame_demo.asset_maintenance (
    asset_id STRING,
    asset_type STRING,
    maintenance_date TIMESTAMP,
    invoice_date TIMESTAMP,
    issue INT,
    cost INT
)
USING iceberg
PARTITIONED BY (asset_type, months(invoice_date))
""")

DataFrame[]

In [25]:
# Load the first (good) batch into the Iceberg table using overwrite mode
(
    df.write
    .format("iceberg")
    .mode("overwrite")
    .save("glue_catalog.blame_demo.asset_maintenance")
)

# Simulate a file with bad data being loaded  

In [34]:
# Parameters for the "bad" February batch: half of the records get a zero cost
asset_types = [
    "HVAC",
    "Vehicle",
    "Refrigeration",
    "POS",
]
start_date = datetime(year=2024, month=2, day=1)
zero_cost_percentage = 0.5

# Start from an empty record list for this batch
data = list()

In [35]:
# Build 250 records per asset type. The list is rebuilt in one expression, so
# re-running this cell regenerates `data` instead of appending a second batch.
data = [
    Asset(asset_type, start_date, zero_cost_percentage).to_dict()
    for asset_type in asset_types
    for _ in range(250)
]



In [36]:
# Turn the generated record dicts into a Spark DataFrame with the explicit schema
df = spark.createDataFrame(data=data, schema=schema)

In [37]:
# Add this batch to the Iceberg table without touching the rows already there
(
    df.write
    .format("iceberg")
    .mode("append")
    .save("glue_catalog.blame_demo.asset_maintenance")
)

In [38]:
# Peek at up to 10 rows of the table. The query has no ORDER BY, so which rows
# come back is not fixed. Any failure is reported instead of raised.
try:
    sample_spark_df = spark.sql("SELECT * FROM glue_catalog.blame_demo.asset_maintenance LIMIT 10")
    sample_spark_df.show()
except Exception as err:
    print(f"Error: {err}")

+--------+----------+-------------------+-------------------+-----+----+
|asset_id|asset_type|   maintenance_date|       invoice_date|issue|cost|
+--------+----------+-------------------+-------------------+-----+----+
|VEH_4466|   Vehicle|2024-02-14 00:00:00|2024-03-02 00:00:00|    2|   0|
|VEH_9868|   Vehicle|2024-02-29 00:00:00|2024-03-16 00:00:00|    2| 148|
|VEH_1653|   Vehicle|2024-02-10 00:00:00|2024-03-01 00:00:00|    2|   0|
|VEH_1338|   Vehicle|2024-02-18 00:00:00|2024-03-06 00:00:00|    1| 446|
|VEH_3325|   Vehicle|2024-02-29 00:00:00|2024-03-15 00:00:00|    3| 868|
|VEH_4410|   Vehicle|2024-02-20 00:00:00|2024-03-02 00:00:00|    3|   0|
|VEH_3234|   Vehicle|2024-02-21 00:00:00|2024-03-04 00:00:00|    3|   0|
|VEH_5695|   Vehicle|2024-02-26 00:00:00|2024-03-10 00:00:00|    1| 533|
|VEH_8597|   Vehicle|2024-02-25 00:00:00|2024-03-07 00:00:00|    3| 567|
|VEH_4878|   Vehicle|2024-02-28 00:00:00|2024-03-03 00:00:00|    2| 783|
+--------+----------+-------------------+----------

In [39]:
# Show every row whose invoice_date equals the latest invoice_date in the table
# (several rows can share that timestamp). Any failure is reported instead of raised.
max_invoice_sql = """
        SELECT * 
        FROM glue_catalog.blame_demo.asset_maintenance 
        WHERE invoice_date = (SELECT MAX(invoice_date) FROM glue_catalog.blame_demo.asset_maintenance);
    """
try:
    max_timestamp = spark.sql(max_invoice_sql)
    max_timestamp.show()
except Exception as err:
    print(f"Error: {err}")


+--------+-------------+-------------------+-------------------+-----+----+
|asset_id|   asset_type|   maintenance_date|       invoice_date|issue|cost|
+--------+-------------+-------------------+-------------------+-----+----+
|VEH_4019|      Vehicle|2024-02-29 00:00:00|2024-03-21 00:00:00|    3|   0|
|REF_5086|Refrigeration|2024-02-29 00:00:00|2024-03-21 00:00:00|    1| 171|
|REF_5089|Refrigeration|2024-02-29 00:00:00|2024-03-21 00:00:00|    3|   0|
|POS_6446|          POS|2024-02-29 00:00:00|2024-03-21 00:00:00|    3| 175|
+--------+-------------+-------------------+-------------------+-----+----+



# Generate another good sample file

In [47]:
# Parameters for the good March batch: back to a 10% zero-cost probability
asset_types = [
    "HVAC",
    "Vehicle",
    "Refrigeration",
    "POS",
]
start_date = datetime(year=2024, month=3, day=1)
zero_cost_percentage = 0.1

# Start from an empty record list for this batch
data = list()

In [48]:
# Build 250 records per asset type. The list is rebuilt in one expression, so
# re-running this cell regenerates `data` instead of appending a second batch.
data = [
    Asset(asset_type, start_date, zero_cost_percentage).to_dict()
    for asset_type in asset_types
    for _ in range(250)
]



In [49]:
# Turn the generated record dicts into a Spark DataFrame with the explicit schema
df = spark.createDataFrame(data=data, schema=schema)

In [50]:
# Add this batch to the Iceberg table without touching the rows already there
(
    df.write
    .format("iceberg")
    .mode("append")
    .save("glue_catalog.blame_demo.asset_maintenance")
)

In [52]:
# Read the table's `snapshots` metadata table and print it with untruncated columns
snapshots_df = spark.sql(
    "SELECT * FROM glue_catalog.blame_demo.asset_maintenance.snapshots"
)
snapshots_df.show(truncate=False)


+-----------------------+-------------------+-------------------+---------+-------------------------------------------------------------------------------------------------------------------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|committed_at           |snapshot_id        |parent_id          |operation|manifest_list                                                                                                                  |summary                                                                                                                                                                                                                                                                                            

In [53]:
# Snapshot to inspect.
# NOTE(review): this id is hard-coded from an earlier run; snapshot ids will
# presumably differ once the table is recreated, so take the value from the
# snapshots listing when re-running.
snapshot_id = '6982677708530941242'

# Time-travel read: query the table as of that snapshot
time_travel_sql = f"""
    SELECT * FROM glue_catalog.blame_demo.asset_maintenance
    FOR SYSTEM_VERSION AS OF {snapshot_id}
"""
data_at_bad_snapshot = spark.sql(time_travel_sql)

In [58]:
# Collect the time-travel result into a pandas DataFrame for local inspection
p_df = data_at_bad_snapshot.toPandas()

In [59]:
# Display the first rows of the collected snapshot data
p_df.head()

Unnamed: 0,asset_id,asset_type,maintenance_date,invoice_date,issue,cost
0,VEH_4466,Vehicle,2024-02-14,2024-03-02,2,0
1,VEH_9868,Vehicle,2024-02-29,2024-03-16,2,148
2,VEH_1653,Vehicle,2024-02-10,2024-03-01,2,0
3,VEH_1338,Vehicle,2024-02-18,2024-03-06,1,446
4,VEH_3325,Vehicle,2024-02-29,2024-03-15,3,868


In [64]:
# Share of rows in the inspected snapshot whose cost is exactly 0
zero_cost_rows = (p_df['cost'] == 0).sum()
non_null_cost_rows = p_df['cost'].count()
print(zero_cost_rows/non_null_cost_rows)

0.306


In [None]:
# Rollback to previous snapshot

In [65]:
# Make `snapshot_id` the table's current snapshot again via Iceberg's rollback procedure.
# NOTE(review): this is the same snapshot that was loaded into `data_at_bad_snapshot`
# above; confirm it (and not its parent) is the intended rollback target.
rollback_call = f"""
    CALL system.rollback_to_snapshot(
        'glue_catalog.blame_demo.asset_maintenance',
        {snapshot_id}
    )
"""
spark.sql(rollback_call)

DataFrame[previous_snapshot_id: bigint, current_snapshot_id: bigint]

In [75]:
# Validate the rollback by reading the table's `history` metadata table
current_snapshot = spark.sql("SELECT * FROM glue_catalog.blame_demo.asset_maintenance.history")
current_snapshot.show(truncate=False)

# In the output, snapshot '6982677708530941242' is first made current, then becomes the
# parent of the next snapshot, and finally appears again as the newest history entry,
# signaling a rollback.

+-----------------------+-------------------+-------------------+-------------------+
|made_current_at        |snapshot_id        |parent_id          |is_current_ancestor|
+-----------------------+-------------------+-------------------+-------------------+
|2024-07-01 06:02:21.178|1291093834306420945|NULL               |false              |
|2024-07-01 06:40:46.55 |7046654947850004437|NULL               |false              |
|2024-07-01 06:57:53.009|2047182808623583161|NULL               |true               |
|2024-07-01 07:07:11.404|6982677708530941242|2047182808623583161|true               |
|2024-07-01 07:14:45.574|1182465130147489411|6982677708530941242|false              |
|2024-07-01 11:13:58.457|6982677708530941242|2047182808623583161|true               |
+-----------------------+-------------------+-------------------+-------------------+

