# Apache Iceberg Hands-on Lab: Cafe Chain Data Management

Software Engineering, Shenkar - BDE Course L6

In this lab, you'll work with Apache Iceberg to manage data for GlobalCafe, a multinational coffee chain. You'll learn how to:
- Set up an Iceberg environment
- Create and manage tables
- Handle schema evolution
- Perform time travel queries
- Implement data maintenance tasks

## Prerequisites
- Docker and Docker Compose installed
- Basic understanding of SQL and PySpark
- Git installed

## Part 1: Environment Setup

First, let's set up our environment. Run these commands in your terminal:

In [None]:
# Run these commands in your terminal, not in this notebook:
#   git clone https://github.com/your-repo/cafe-iceberg-lab
#   cd cafe-iceberg-lab
#   docker-compose up -d

Now let's initialize our PySpark session with Iceberg support:

In [1]:
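from pyspark.sql import SparkSession

# Spark session with Iceberg support:
# - spark.jars.packages pulls the Iceberg runtime for Spark 3.4 / Scala 2.12
# - IcebergSparkSessionExtensions enables Iceberg SQL such as CALL procedures
# - spark_catalog wraps Spark's built-in catalog with Iceberg's SparkSessionCatalog
# - "local" is a separate Hadoop-type Iceberg catalog rooted at /warehouse/cafe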
spark = SparkSession.builder \
    .appName("CafeIcebergLab") \
    .config("spark.jars.packages", "org.apache.iceberg:iceberg-spark-runtime-3.4_2.12:1.4.2") \
    .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.iceberg.spark.SparkSessionCatalog") \
    .config("spark.sql.catalog.local", "org.apache.iceberg.spark.SparkCatalog") \
    .config("spark.sql.catalog.local.type", "hadoop") \
    .config("spark.sql.catalog.local.warehouse", "/warehouse/cafe") \
    .getOrCreate()

25/05/06 08:50:46 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.
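The warning above means `getOrCreate()` returned a session that was already running, so static settings in this builder such as `spark.jars.packages` and `spark.sql.extensions` were not applied by this cell (the Docker image may already set them). Also note that `spark_catalog` is given no `type`: when neither `type` nor `catalog-impl` is set, Iceberg defaults to a Hive catalog, which needs a working Hive Metastore. The sketch below uses a hypothetical helper, `iceberg_catalog_conf` (not part of PySpark or Iceberg), to produce the config keys for one catalog with its type spelled out. Which type `spark_catalog` should have depends on your Docker setup, so treat the values as assumptions.

```python
from typing import Dict, Optional

SESSION_CATALOG = "org.apache.iceberg.spark.SparkSessionCatalog"
STANDALONE_CATALOG = "org.apache.iceberg.spark.SparkCatalog"


def iceberg_catalog_conf(name: str, catalog_type: str,
                         warehouse: Optional[str] = None) -> Dict[str, str]:
    """Spark config entries for one Iceberg catalog with an explicit type (hypothetical helper)."""
    # Mirror this lab's setup: SparkSessionCatalog for spark_catalog, SparkCatalog otherwise
    impl = SESSION_CATALOG if name == "spark_catalog" else STANDALONE_CATALOG
    prefix = f"spark.sql.catalog.{name}"
    conf = {prefix: impl, f"{prefix}.type": catalog_type}
    if warehouse is not None:
        conf[f"{prefix}.warehouse"] = warehouse
    return conf


local_conf = iceberg_catalog_conf("local", "hadoop", "/warehouse/cafe")

# Applying it needs a fresh session (restart the kernel so settings are read at startup):
# builder = SparkSession.builder.appName("CafeIcebergLab")
# for key, value in local_conf.items():
#     builder = builder.config(key, value)
# spark = builder.getOrCreate()
```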


## Part 2: Creating Tables

Let's create tables for our cafe chain data model:

In [2]:
# Create database
spark.sql("CREATE DATABASE IF NOT EXISTS cafe_ops")
spark.sql("USE cafe_ops")

# Create sales transactions table
spark.sql("""
CREATE TABLE IF NOT EXISTS sales_transactions (
    transaction_id BIGINT,
    store_id BIGINT,
    cashier_id BIGINT,
    transaction_timestamp TIMESTAMP,
    total_amount DECIMAL(10,2),
    payment_method STRING,
    items ARRAY<STRUCT<
        item_id: BIGINT,
        quantity: INT,
        unit_price: DECIMAL(10,2)
    >>
) USING iceberg
PARTITIONED BY (days(transaction_timestamp))
""")

# Create store locations table
spark.sql("""
CREATE TABLE IF NOT EXISTS store_locations (
    store_id BIGINT,
    store_name STRING,
    city STRING,
    country STRING,
    opening_date DATE,
    store_type STRING,
    square_footage INT
) USING iceberg
""")

# Create menu items table
spark.sql("""
CREATE TABLE IF NOT EXISTS menu_items (
    item_id BIGINT,
    item_name STRING,
    category STRING,
    price DECIMAL(10,2),
    is_seasonal BOOLEAN,
    available_from DATE,
    available_until DATE,
    calories INT
) USING iceberg
""")

DataFrame[]
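`PARTITIONED BY (days(transaction_timestamp))` uses Iceberg hidden partitioning: the partition value is derived from the timestamp, so queries filter on `transaction_timestamp` directly and no extra date column has to be maintained. Per the Iceberg spec, the day transform maps a timestamp to the number of days since 1970-01-01. A minimal pure-Python sketch of that mapping, for intuition only (Iceberg computes it internally; `day_partition` is a hypothetical name):

```python
from datetime import date, datetime, timezone

EPOCH = date(1970, 1, 1)


def day_partition(ts: datetime) -> int:
    """Days since 1970-01-01 for a timestamp, like Iceberg's day transform."""
    # Aware timestamps are converted to UTC first; naive ones are assumed to be UTC already
    if ts.tzinfo is not None:
        ts = ts.astimezone(timezone.utc)
    return (ts.date() - EPOCH).days


print(day_partition(datetime(2024, 1, 1, 10, 0)))
```

Because every row of one UTC day shares a partition value, a filter such as `WHERE transaction_timestamp >= '2024-01-01'` can let Iceberg skip whole day partitions.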

## Exercise 1: Insert Sample Data

Insert the following sample data into your tables. Use both SQL and DataFrame APIs.

In [3]:
from pyspark.sql.types import (
    DateType, IntegerType, LongType, StringType, StructField, StructType,
)
from datetime import date

# Define the schema explicitly
schema = StructType([
    StructField("store_id", LongType(), False),
    StructField("store_name", StringType(), False),
    StructField("city", StringType(), False),
    StructField("country", StringType(), False),
    StructField("opening_date", DateType(), False),
    StructField("store_type", StringType(), False),
    StructField("square_footage", IntegerType(), False)
])

# Use Python date objects for the DATE column (opening_date)
stores_data = [
    (1, "Downtown Plaza", "New York", "USA", date(2020, 1, 15), "flagship", 2500),
    (2, "Airport Terminal 3", "London", "UK", date(2021, 3, 20), "kiosk", 800),
    (3, "Shopping Mall", "Tokyo", "Japan", date(2019, 11, 30), "standard", 1500)
]

# Create DataFrame with schema
stores_df = spark.createDataFrame(stores_data, schema=schema)

# Now append to the table
stores_df.writeTo("cafe_ops.store_locations").append()
stores_df.show()


+--------+------------------+--------+-------+------------+----------+--------------+
|store_id|        store_name|    city|country|opening_date|store_type|square_footage|
+--------+------------------+--------+-------+------------+----------+--------------+
|       1|    Downtown Plaza|New York|    USA|  2020-01-15|  flagship|          2500|
|       2|Airport Terminal 3|  London|     UK|  2021-03-20|     kiosk|           800|
|       3|     Shopping Mall|   Tokyo|  Japan|  2019-11-30|  standard|          1500|
+--------+------------------+--------+-------+------------+----------+--------------+
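Exercise 1 also covers `sales_transactions`, whose `items` column is an `ARRAY<STRUCT<...>>`. A hedged sketch of preparing its rows in Python: `Decimal` for the `DECIMAL(10,2)` columns, tuples for the nested structs, and `total_amount` computed from the line items so the two cannot disagree. `make_transaction` is a hypothetical helper, and the sample row is made up for illustration (it reuses the menu prices above).

```python
from datetime import datetime
from decimal import Decimal
from typing import List, Tuple

# (item_id, quantity, unit_price) matches the STRUCT field order of `items`
LineItem = Tuple[int, int, Decimal]


def make_transaction(transaction_id: int, store_id: int, cashier_id: int,
                     ts: datetime, payment_method: str,
                     items: List[LineItem]) -> tuple:
    """Build one row in sales_transactions column order (hypothetical helper)."""
    total = sum((qty * price for _, qty, price in items), Decimal("0.00"))
    return (transaction_id, store_id, cashier_id, ts,
            total.quantize(Decimal("0.01")), payment_method, items)


sample_rows = [
    make_transaction(1, 1, 101, datetime(2024, 1, 1, 10, 0), "card",
                     [(102, 2, Decimal("3.49")), (201, 1, Decimal("2.49"))]),
]

# With a live session, reuse the table's own schema so the types line up:
# target = spark.table("cafe_ops.sales_transactions")
# spark.createDataFrame(sample_rows, schema=target.schema) \
#     .writeTo("cafe_ops.sales_transactions").append()
```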



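## Explore MinIO

Open the MinIO console at http://localhost:9001/browser/warehouse/cafe_ops/ to browse the files Iceberg wrote for the `cafe_ops` tables: the data files and the metadata files that track each table's schema and snapshots.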

In [8]:
# TODO: Exercise 1.1
# Insert menu items using SQL
# Hint: Use spark.sql() with INSERT INTO statement

# Your code here
spark.sql("""
INSERT INTO cafe_ops.menu_items VALUES
(101, 'Espresso', 'Beverage', 2.99, false, DATE('2020-01-01'), DATE('2030-01-01'), 5),
(102, 'Latte', 'Beverage', 3.49, false, DATE('2020-01-01'), DATE('2030-01-01'), 150),
(103, 'Pumpkin Spice Latte', 'Beverage', 4.29, true, DATE('2024-10-01'), DATE('2024-12-31'), 210),
(201, 'Blueberry Muffin', 'Pastry', 2.49, false, DATE('2020-01-01'), DATE('2030-01-01'), 380)
""")


spark.sql("SELECT * FROM cafe_ops.menu_items").show(truncate=False)




+-------+-------------------+--------+-----+-----------+--------------+---------------+--------+-----------+-------------+---------+
|item_id|item_name          |category|price|is_seasonal|available_from|available_until|calories|sugar_grams|protein_grams|fat_grams|
+-------+-------------------+--------+-----+-----------+--------------+---------------+--------+-----------+-------------+---------+
|102    |Latte              |Beverage|3.49 |false      |2020-01-01    |2030-01-01     |150     |14         |6            |4        |
|101    |Espresso           |Beverage|2.99 |false      |2020-01-01    |2030-01-01     |5       |0          |1            |0        |
|201    |Blueberry Muffin   |Pastry  |2.49 |false      |2020-01-01    |2030-01-01     |380     |10         |2            |4        |
|103    |Pumpkin Spice Latte|Beverage|4.29 |true       |2024-10-01    |2024-12-31     |210     |2          |5            |8        |
+-------+-------------------+--------+-----+-----------+--------------+---------------+--------+-----------+-------------+---------+

## Exercise 2: Schema Evolution

Add nutritional information to the menu items table and update existing records.

In [7]:
# TODO: Exercise 2.1
# Add new columns for nutritional info
# Update existing records with the new information

# Your code here

spark.sql("""
ALTER TABLE cafe_ops.menu_items ADD COLUMNS (
    sugar_grams INT,
    protein_grams INT,
    fat_grams INT
)
""")

spark.sql("""
UPDATE cafe_ops.menu_items
SET sugar_grams = 0, protein_grams = 1, fat_grams = 0
WHERE item_id = 101
""")

spark.sql("""
UPDATE cafe_ops.menu_items
SET sugar_grams = 14, protein_grams = 6, fat_grams = 4
WHERE item_id = 102
""")

spark.sql("""
UPDATE cafe_ops.menu_items
SET sugar_grams = 2, protein_grams = 5, fat_grams = 8
WHERE item_id = 103
""")

spark.sql("""
UPDATE cafe_ops.menu_items
SET sugar_grams = 10, protein_grams = 2, fat_grams = 4
WHERE item_id = 201
""")





DataFrame[]
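`ADD COLUMNS` is a metadata-only change in Iceberg: existing data files are not rewritten, and rows written before the change read the new columns as `NULL` until they are updated. Each `UPDATE` above commits its own snapshot, which matches the four `overwrite` snapshots listed in Exercise 4. A hedged alternative is a single `MERGE INTO` that applies all values in one commit. `build_nutrition_merge` below is a hypothetical helper that only generates the SQL text; it assumes integer-only values, so it does no quoting or escaping.

```python
from typing import Dict, Tuple

# item_id -> (sugar_grams, protein_grams, fat_grams), the values used above
NUTRITION: Dict[int, Tuple[int, int, int]] = {
    101: (0, 1, 0),
    102: (14, 6, 4),
    103: (2, 5, 8),
    201: (10, 2, 4),
}


def build_nutrition_merge(table: str, values: Dict[int, Tuple[int, int, int]]) -> str:
    """Return one MERGE INTO statement that sets all nutrition values in one commit."""
    # int() keeps only integer literals in the generated SQL
    rows = ",\n        ".join(
        f"({int(item_id)}, {int(sugar)}, {int(protein)}, {int(fat)})"
        for item_id, (sugar, protein, fat) in sorted(values.items())
    )
    return (
        f"MERGE INTO {table} t\n"
        "USING (\n"
        "    SELECT * FROM VALUES\n"
        f"        {rows}\n"
        "    AS v(item_id, sugar_grams, protein_grams, fat_grams)\n"
        ") s\n"
        "ON t.item_id = s.item_id\n"
        "WHEN MATCHED THEN UPDATE SET\n"
        "    t.sugar_grams = s.sugar_grams,\n"
        "    t.protein_grams = s.protein_grams,\n"
        "    t.fat_grams = s.fat_grams"
    )


merge_sql = build_nutrition_merge("cafe_ops.menu_items", NUTRITION)
print(merge_sql)
# spark.sql(merge_sql)  # run with a live session
```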

## Exercise 3: Time Travel Queries

Perform time travel queries to explore different versions of your data.

In [10]:
# TODO: Exercise 3.1
# 1. Query the current state of menu_items
# 2. Make some changes to the data
# 3. Query a previous version using timestamp
# 4. Query using snapshot ID

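# Your code here

# Steps 1-2 (already run; uncomment to repeat them):
# spark.sql("SELECT * FROM cafe_ops.menu_items").show(truncate=False)
# spark.sql("DELETE FROM cafe_ops.menu_items WHERE item_id = 101")
# spark.sql("SELECT * FROM cafe_ops.menu_items.snapshots").show(truncate=False)

# Steps 3-4: the timestamp and snapshot ID below come from this run's
# snapshots table; substitute values from your own table.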
spark.sql("""
SELECT * FROM cafe_ops.menu_items TIMESTAMP AS OF '2025-05-06 12:03:59'
""").show(truncate=False)

spark.sql("""
SELECT * FROM cafe_ops.menu_items VERSION AS OF 7874844710241166134
""").show(truncate=False)

spark.sql("""
SELECT snapshot_id, committed_at, operation
FROM cafe_ops.menu_items.snapshots
ORDER BY committed_at DESC
""").show(truncate=False)



+-------+-------------------+--------+-----+-----------+--------------+---------------+--------+-----------+-------------+---------+
|item_id|item_name          |category|price|is_seasonal|available_from|available_until|calories|sugar_grams|protein_grams|fat_grams|
+-------+-------------------+--------+-----+-----------+--------------+---------------+--------+-----------+-------------+---------+
|103    |Pumpkin Spice Latte|Beverage|4.29 |true       |2024-10-01    |2024-12-31     |210     |2          |5            |8        |
|102    |Latte              |Beverage|3.49 |false      |2020-01-01    |2030-01-01     |150     |14         |6            |4        |
|201    |Blueberry Muffin   |Pastry  |2.49 |false      |2020-01-01    |2030-01-01     |380     |10         |2            |4        |
+-------+-------------------+--------+-----+-----------+--------------+---------------+--------+-----------+-------------+---------+

+-------+-------------------+--------+-----+-----------+------------
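Hard-coding a timestamp or snapshot ID ties the cell to one run. A hedged sketch of picking the snapshot programmatically: for a table with linear history, `TIMESTAMP AS OF t` reads the snapshot that was current at `t`, i.e. the latest snapshot committed at or before `t`. `snapshot_as_of` is a hypothetical helper that applies that rule to `(snapshot_id, committed_at)` pairs; the sample pairs are copied from the `snapshots` output shown in Exercise 4.

```python
from datetime import datetime
from typing import Iterable, Tuple


def snapshot_as_of(snapshots: Iterable[Tuple[int, datetime]], as_of: datetime) -> int:
    """Return the id of the latest snapshot committed at or before `as_of`."""
    candidates = [(ts, sid) for sid, ts in snapshots if ts <= as_of]
    if not candidates:
        raise ValueError(f"no snapshot committed at or before {as_of}")
    return max(candidates)[1]


# (snapshot_id, committed_at) pairs from the menu_items snapshots table in Exercise 4
history = [
    (6278888271366296432, datetime(2025, 5, 6, 9, 0, 24, 857000)),
    (8711783651375595802, datetime(2025, 5, 6, 9, 3, 13, 922000)),
    (4444677615713026846, datetime(2025, 5, 6, 9, 3, 14, 568000)),
    (2068038017345985171, datetime(2025, 5, 6, 9, 3, 15, 175000)),
    (7874844710241166134, datetime(2025, 5, 6, 9, 3, 15, 810000)),
    (8732803290891266204, datetime(2025, 5, 6, 9, 5, 16, 238000)),
]

# With a live session the pairs would come from the metadata table:
# history = [(r.snapshot_id, r.committed_at) for r in spark.sql(
#     "SELECT snapshot_id, committed_at FROM cafe_ops.menu_items.snapshots").collect()]
# sid = snapshot_as_of(history, datetime(2025, 5, 6, 9, 4))
# spark.sql(f"SELECT * FROM cafe_ops.menu_items VERSION AS OF {sid}").show()
```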

## Exercise 4: Data Maintenance

Implement data maintenance tasks including snapshot expiration and file compaction.

In [21]:
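# TODO: Exercise 4.1
# 1. Expire old snapshots
# 2. Rewrite data files (compaction)

# Your code here

# Run procedures in the catalog that owns the tables. This assumes it is the
# session's current catalog, since the tables are referenced unqualified above.
# spark_catalog would route through SparkSessionCatalog, which has no catalog
# type configured here and therefore tries to reach a Hive Metastore.
catalog = spark.catalog.currentCatalog()

spark.sql("""
SELECT snapshot_id, committed_at, operation
FROM cafe_ops.menu_items.snapshots
ORDER BY committed_at DESC
""").show(truncate=False)

# 1. Expire snapshots older than the cutoff (the current snapshot is always kept)
spark.sql(f"""
CALL {catalog}.system.expire_snapshots(
    table => 'cafe_ops.menu_items',
    older_than => TIMESTAMP '2025-05-06 09:05:16.238'
)
""").show(truncate=False)

# 2. Compact small data files into larger ones
spark.sql(f"""
CALL {catalog}.system.rewrite_data_files(table => 'cafe_ops.menu_items')
""").show(truncate=False)

spark.sql("""
SELECT snapshot_id, committed_at, operation
FROM cafe_ops.menu_items.snapshots
ORDER BY committed_at DESC
""").show(truncate=False)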


+-------------------+-----------------------+---------+
|snapshot_id        |committed_at           |operation|
+-------------------+-----------------------+---------+
|8732803290891266204|2025-05-06 09:05:16.238|delete   |
|7874844710241166134|2025-05-06 09:03:15.81 |overwrite|
|2068038017345985171|2025-05-06 09:03:15.175|overwrite|
|4444677615713026846|2025-05-06 09:03:14.568|overwrite|
|8711783651375595802|2025-05-06 09:03:13.922|overwrite|
|6278888271366296432|2025-05-06 09:00:24.857|append   |
+-------------------+-----------------------+---------+



Py4JJavaError: An error occurred while calling o36.sql.
: org.apache.iceberg.hive.RuntimeMetaException: Failed to connect to Hive Metastore
	at org.apache.iceberg.hive.HiveClientPool.newClient(HiveClientPool.java:85)
	...
	at org.apache.iceberg.spark.SparkSessionCatalog.loadTable(SparkSessionCatalog.java:146)
	...
	at org.apache.iceberg.spark.procedures.ExpireSnapshotsProcedure.call(ExpireSnapshotsProcedure.java:113)
	...
Caused by: java.lang.RuntimeException: Unable to instantiate org.apache.hadoop.hive.metastore.HiveMetaStoreClient
	...
Caused by: MetaException(message:Version information not found in metastore. )
	...


In [23]:
print("Available tables:")
spark.sql("SHOW TABLES FROM cafe_ops").show()
spark.sql("SELECT * FROM cafe_ops.sales_transactions").show(truncate=False)
spark.sql("SELECT * FROM cafe_ops.menu_items").show(truncate=False)
spark.sql("SELECT * FROM cafe_ops.store_locations").show(truncate=False)


Available tables:
+---------+------------------+-----------+
|namespace|         tableName|isTemporary|
+---------+------------------+-----------+
| cafe_ops|        menu_items|      false|
| cafe_ops|sales_transactions|      false|
| cafe_ops|   store_locations|      false|
+---------+------------------+-----------+

+--------------+--------+----------+---------------------+------------+--------------+-----+
|transaction_id|store_id|cashier_id|transaction_timestamp|total_amount|payment_method|items|
+--------------+--------+----------+---------------------+------------+--------------+-----+
+--------------+--------+----------+---------------------+------------+--------------+-----+

+-------+-------------------+--------+-----+-----------+--------------+---------------+--------+-----------+-------------+---------+
|item_id|item_name          |category|price|is_seasonal|available_from|available_until|calories|sugar_grams|protein_grams|fat_grams|
+-------+-------------------+--------+---
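Exercise 4 hard-codes the `older_than` cutoff. A retention policy usually derives it from a window instead, for example "keep 7 days of history, but never fewer than N snapshots". `expire_snapshots` accepts both an `older_than` timestamp and a `retain_last` count. The sketch below is a hypothetical helper that builds that `CALL` statement; the catalog name `my_catalog`, the 7-day window, and the `retain_last` value are assumptions to adjust.

```python
from datetime import datetime, timedelta


def expire_snapshots_sql(catalog: str, table: str, now: datetime,
                         retention: timedelta, retain_last: int = 1) -> str:
    """Build a CALL that expires snapshots older than `now - retention`."""
    # Iceberg requires keeping at least one snapshot
    if retain_last < 1:
        raise ValueError("retain_last must be at least 1")
    cutoff = (now - retention).strftime("%Y-%m-%d %H:%M:%S")
    return (
        f"CALL {catalog}.system.expire_snapshots("
        f"table => '{table}', "
        f"older_than => TIMESTAMP '{cutoff}', "
        f"retain_last => {retain_last})"
    )


# Example: keep one week of history and at least 5 snapshots
retention_sql = expire_snapshots_sql(
    "my_catalog",  # hypothetical; use the catalog that owns cafe_ops
    "cafe_ops.menu_items",
    now=datetime(2025, 5, 13, 9, 0, 0),
    retention=timedelta(days=7),
    retain_last=5,
)
print(retention_sql)
# spark.sql(retention_sql).show()  # run with a live session
```

Expired snapshots can no longer be read with `TIMESTAMP AS OF` or `VERSION AS OF`, so the window also sets how far back time travel reaches. When `older_than` and `retain_last` are omitted, `expire_snapshots` falls back to the table properties `history.expire.max-snapshot-age-ms` and `history.expire.min-snapshots-to-keep`.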

## Bonus Exercise: Data Quality and Analytics

Implement data quality checks and write analytical queries.

In [27]:
# TODO: Bonus 1
# Implement data quality checks:
# 1. Check for negative prices
# 2. Check for future dates
# 3. Validate store_ids

# Your code here


from pyspark.sql.functions import col, current_date

# Load the menu_items table
menu_items_df = spark.table("cafe_ops.menu_items")

negative_prices_df = menu_items_df.filter(col("price") < 0).select("item_id", "price")
negative_prices_df.show()



future_dates_df = menu_items_df.filter(
    (col("available_from") > current_date()) |
    (col("available_until") > current_date())
).select("item_id", "available_from", "available_until")
future_dates_df.show()



# Load the sales_transactions table
sales_transactions_df = spark.table("cafe_ops.sales_transactions")

# Load the store_locations table and collect the known store IDs
store_locations_df = spark.table("cafe_ops.store_locations")
valid_store_ids_df = store_locations_df.select("store_id").distinct()

# left_anti keeps transactions whose store_id has no match in store_locations
invalid_store_ids_df = sales_transactions_df.join(
    valid_store_ids_df,
    on="store_id",
    how="left_anti"
).select("store_id", "transaction_id")
invalid_store_ids_df.show()




+-------+-----+
|item_id|price|
+-------+-----+
+-------+-----+

+-------+--------------+---------------+
|item_id|available_from|available_until|
+-------+--------------+---------------+
|    102|    2020-01-01|     2030-01-01|
|    201|    2020-01-01|     2030-01-01|
+-------+--------------+---------------+

+--------+--------------+
|store_id|transaction_id|
+--------+--------------+
+--------+--------------+
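The future-dates check above flags any item whose `available_until` is after today, which includes every item that is simply still on the menu (102 and 201 in the output). A hedged alternative separates the cases: an item that is not yet available (`available_from` in the future) and an inverted window (`available_until` before `available_from`), which is more likely a data error. `date_window_issues` is a hypothetical pure-Python version of those rules for checking the logic on a few rows; item 301 is a made-up bad row.

```python
from datetime import date
from typing import Dict, List, Optional, Tuple

# (item_id, available_from, available_until)
MenuWindow = Tuple[int, date, Optional[date]]


def date_window_issues(rows: List[MenuWindow], today: date) -> Dict[int, List[str]]:
    """Map item_id -> list of date problems found in its availability window."""
    issues: Dict[int, List[str]] = {}
    for item_id, available_from, available_until in rows:
        problems = []
        if available_from > today:
            problems.append("not yet available")
        if available_until is not None and available_until < available_from:
            problems.append("window ends before it starts")
        if problems:
            issues[item_id] = problems
    return issues


rows = [
    (102, date(2020, 1, 1), date(2030, 1, 1)),    # active item: no issue
    (103, date(2024, 10, 1), date(2024, 12, 31)),  # past seasonal window: no issue
    (301, date(2026, 3, 1), date(2026, 2, 1)),    # hypothetical bad row
]
found = date_window_issues(rows, today=date(2025, 5, 6))
print(found)

# Spark equivalent (sketch):
# menu_items_df.filter((col("available_from") > current_date()) |
#                      (col("available_until") < col("available_from")))
```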



In [32]:
# TODO: Bonus 2
# Write analytics queries:
# 1. Top selling items by revenue
# 2. Sales patterns by store type
# 3. Seasonal item performance

# Your code here

from pyspark.sql import Row
from pyspark.sql.functions import explode, col, sum as _sum

# sales_transactions has no rows yet, so build one sample transaction in memory
sample_sales = [
    Row(transaction_id=1, store_id=1, cashier_id=101, transaction_timestamp="2024-01-01 10:00:00", total_amount=10.98, payment_method="card", items=[
        Row(item_id=102, quantity=2, unit_price=3.49),
        Row(item_id=201, quantity=1, unit_price=4.00)
    ])
]

test_sales_df = spark.createDataFrame(sample_sales)
exploded_test_sales_df = test_sales_df.withColumn("item", explode("items"))
exploded_test_sales_df = exploded_test_sales_df \
    .withColumn("item_id", col("item.item_id")) \
    .withColumn("quantity", col("item.quantity")) \
    .withColumn("unit_price", col("item.unit_price"))

menu_items_df = spark.table("cafe_ops.menu_items")

revenue_df = exploded_test_sales_df.join(menu_items_df.select("item_id", "item_name"), on="item_id", how="left") \
    .withColumn("revenue", col("quantity") * col("unit_price")) \
    .groupBy("item_id", "item_name") \
    .agg(_sum("revenue").alias("total_revenue")) \
    .orderBy(col("total_revenue").desc())

revenue_df.show()







+-------+----------------+-------------+
|item_id|       item_name|total_revenue|
+-------+----------------+-------------+
|    102|           Latte|         6.98|
|    201|Blueberry Muffin|          4.0|
+-------+----------------+-------------+
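Bonus 2 also asks for sales patterns by store type. That follows the same explode, join, aggregate pattern as the revenue query, joined to `store_locations` on `store_id` instead of to `menu_items`. A pure-Python sketch of the logic, useful for checking expected numbers by hand; `revenue_by_store_type` is a hypothetical name, and the inputs mirror the sample transaction and store rows above.

```python
from collections import defaultdict
from decimal import Decimal
from typing import Dict, List, Tuple

# (store_id, [(item_id, quantity, unit_price), ...]) per transaction
Transaction = Tuple[int, List[Tuple[int, int, Decimal]]]


def revenue_by_store_type(transactions: List[Transaction],
                          store_types: Dict[int, str]) -> Dict[str, Decimal]:
    """Sum quantity * unit_price per store_type, skipping unknown store_ids."""
    totals: Dict[str, Decimal] = defaultdict(Decimal)
    for store_id, items in transactions:
        store_type = store_types.get(store_id)
        if store_type is None:
            continue  # the left_anti check in Bonus 1 would report these
        for _item_id, quantity, unit_price in items:
            totals[store_type] += quantity * unit_price
    return dict(totals)


stores = {1: "flagship", 2: "kiosk", 3: "standard"}
sales = [(1, [(102, 2, Decimal("3.49")), (201, 1, Decimal("4.00"))])]
by_type = revenue_by_store_type(sales, stores)
print(by_type)

# Spark sketch with the DataFrames defined above:
# exploded_test_sales_df.join(store_locations_df.select("store_id", "store_type"), "store_id") \
#     .groupBy("store_type").agg(_sum(col("quantity") * col("unit_price")).alias("revenue")) \
#     .orderBy(col("revenue").desc()).show()
```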




## Submission Questions

Please answer the following questions:

1. How does Iceberg handle schema evolution differently from traditional databases?
2. What are the benefits of time travel in this cafe chain context?
3. How would you implement a data retention policy using Iceberg features?