# LASAGNA Big Data Architecture Showcase

This notebook demonstrates and tests each component of the LASAGNA big data architecture.

## Architecture Components Tested:
- **MinIO**: S3-compatible object storage
- **PostgreSQL**: Metadata database
- **Hive Metastore**: Centralized catalog service
- **Spark Cluster**: Distributed processing engine
- **Trino**: Fast analytical query engine
- **JupyterLab**: Interactive development environment

## Table Formats Demonstrated:
- **Hive Tables**: Traditional format
- **Delta Lake**: ACID transactions and time travel
- **Apache Iceberg**: Schema evolution and advanced partitioning

## What We'll Test:
1. Connection to all services
2. Sample data creation
3. Table operations across different formats
4. Cross-engine querying with Trino
5. Performance comparisons
6. Advanced features (ACID, time travel, schema evolution)


## 1. Environment Setup and Connection Tests


In [1]:
# Import required libraries
import pandas as pd
import numpy as np
from datetime import datetime, date, timedelta
import random
import time
import requests
from pyspark.sql import SparkSession
# NOTE: this star import shadows Python builtins such as round(), sum(), and max()
from pyspark.sql.functions import *
from pyspark.sql.types import *
import trino
import json

print("✅ All libraries imported successfully")


✅ All libraries imported successfully


In [2]:
# Test connections to all services
def test_service_connections():
    # When running inside workspace container, use container names instead of localhost
    services = {
        "MinIO Console": "http://minio:9090",
        "Spark Master": "http://spark-master:5050",  # Spark Master UI
        "Spark Worker A": "http://spark-worker-a:5051",  # Spark Worker UI
        "Spark Worker B": "http://spark-worker-b:5052",  # Spark Worker UI
        "Trino": "http://trino:8080"
    }
    
    print("🔍 Testing service connections...")
    for service, url in services.items():
        try:
            response = requests.get(url, timeout=5)
            if response.status_code == 200:
                print(f"✅ {service}: Connected")
            else:
                print(f"⚠️  {service}: Responding but status {response.status_code}")
        except Exception as e:
            print(f"❌ {service}: Connection failed - {str(e)}")

test_service_connections()


🔍 Testing service connections...
✅ MinIO Console: Connected
✅ Spark Master: Connected
✅ Spark Worker A: Connected
❌ Spark Worker B: Connection failed - HTTPConnectionPool(host='spark-worker-b', port=5052): Max retries exceeded with url: / (Caused by NameResolutionError("<urllib3.connection.HTTPConnection object at 0xffff46f395a0>: Failed to resolve 'spark-worker-b' ([Errno -2] Name or service not known)"))
✅ Trino: Connected
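
The Spark Worker B failure above is a name-resolution error, not an HTTP error, which suggests the hostname is not defined on the container network rather than the service being slow to answer. Here is a minimal sketch, using only the standard library, that separates the two cases. `diagnose_endpoint` is a hypothetical helper and not part of the notebook.

```python
import socket

def diagnose_endpoint(host, port, timeout=3.0):
    """Classify a TCP endpoint as 'dns_failure', 'unreachable', or 'ok'."""
    try:
        # Step 1: can the hostname be resolved at all?
        addr_info = socket.getaddrinfo(host, port, type=socket.SOCK_STREAM)
    except socket.gaierror:
        return "dns_failure"
    # Step 2: does anything accept TCP connections at a resolved address?
    for family, socktype, proto, _canon, sockaddr in addr_info:
        try:
            with socket.socket(family, socktype, proto) as sock:
                sock.settimeout(timeout)
                sock.connect(sockaddr)
                return "ok"
        except OSError:
            continue
    return "unreachable"

# Example (hostname and port taken from the services dict above):
# diagnose_endpoint("spark-worker-b", 5052)
```

A `dns_failure` result points at the container or network definition, while `unreachable` points at the service itself or its port mapping.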


## 2. Sample Data Generation

Let's create realistic sample datasets to test our architecture:


In [3]:
# Generate sample employee data (using pure Python to avoid Spark worker numpy issues)
def generate_employee_data(num_records=10000):
    random.seed(42)
    
    departments = ['Engineering', 'Sales', 'Marketing', 'HR', 'Finance', 'Operations']
    positions = ['Manager', 'Senior', 'Mid-level', 'Junior', 'Intern']
    locations = ['New York', 'San Francisco', 'London', 'Tokyo', 'Berlin']
    
    data = []
    base_date = date(2020, 1, 1)
    
    for i in range(num_records):
        hire_date = base_date + timedelta(days=random.randint(0, 1460))  # 4 years range
        data.append({
            'employee_id': f'EMP_{i+1:06d}',
            'first_name': f'Employee{i+1}',
            'last_name': f'LastName{i+1}',
            'email': f'employee{i+1}@company.com',
            'department': random.choice(departments),
            'position': random.choice(positions),
            'location': random.choice(locations),
            'salary': random.randint(40000, 200000),
            'hire_date': hire_date,  # datetime.date object, not a string
            'is_active': random.choice([True, False])
        })
    
    return pd.DataFrame(data)

# Generate sample sales data (using pure Python to avoid Spark worker numpy issues)
def generate_sales_data(num_records=50000):
    import builtins  # Import builtins module to access round function
    
    random.seed(42)
    
    products = ['Laptop', 'Phone', 'Tablet', 'Monitor', 'Keyboard', 'Mouse', 'Headphones']
    regions = ['North America', 'Europe', 'Asia', 'South America', 'Africa']
    
    data = []
    base_date = datetime(2023, 1, 1)
    
    for i in range(num_records):
        sale_date = base_date + timedelta(days=random.randint(0, 365))
        quantity = random.randint(1, 10)
        unit_price = random.uniform(50, 2000)
        
        # Use built-in round function to avoid PySpark namespace collision
        unit_price_rounded = builtins.round(unit_price, 2)
        total_amount_rounded = builtins.round(quantity * unit_price, 2)
        
        data.append({
            'sale_id': f'SALE_{i+1:08d}',
            'product': random.choice(products),
            'quantity': quantity,
            'unit_price': unit_price_rounded,
            'total_amount': total_amount_rounded,
            'region': random.choice(regions),
            'sale_date': sale_date.strftime('%Y-%m-%d'),
            'customer_id': f'CUST_{random.randint(1, 5000):06d}'
        })
    
    return pd.DataFrame(data)

print("📊 Sample data generation functions created")


📊 Sample data generation functions created
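
Both generators call `random.seed(42)`, which resets the module-level generator shared by everything else in the kernel. A minimal sketch of an alternative, assuming reproducibility is the only goal, is to draw from a private `random.Random` instance. The function name and the reduced field set are illustrative only.

```python
import random
from datetime import date, timedelta

def generate_employee_rows(num_records, seed=42):
    """Return employee dicts drawn from a private, seeded generator."""
    rng = random.Random(seed)  # independent of the global random state
    departments = ['Engineering', 'Sales', 'Marketing', 'HR', 'Finance', 'Operations']
    base_date = date(2020, 1, 1)
    rows = []
    for i in range(num_records):
        rows.append({
            'employee_id': f'EMP_{i+1:06d}',
            'department': rng.choice(departments),
            'salary': rng.randint(40000, 200000),
            'hire_date': base_date + timedelta(days=rng.randint(0, 1460)),
        })
    return rows

first = generate_employee_rows(5)
second = generate_employee_rows(5)
print(first == second)  # True: same seed, same rows
```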


In [4]:
# Generate the sample datasets
print("🔄 Generating sample datasets...")

employees_df = generate_employee_data(10000)
sales_df = generate_sales_data(50000)

print(f"✅ Generated {len(employees_df):,} employee records")
print(f"✅ Generated {len(sales_df):,} sales records")

# Display sample data
print("\n📋 Sample Employee Data:")
print(employees_df.head())

print("\n📋 Sample Sales Data:")
print(sales_df.head())


🔄 Generating sample datasets...
✅ Generated 10,000 employee records
✅ Generated 50,000 sales records

📋 Sample Employee Data:
  employee_id first_name  last_name                  email   department  \
0  EMP_000001  Employee1  LastName1  employee1@company.com  Engineering   
1  EMP_000002  Employee2  LastName2  employee2@company.com   Operations   
2  EMP_000003  Employee3  LastName3  employee3@company.com  Engineering   
3  EMP_000004  Employee4  LastName4  employee4@company.com        Sales   
4  EMP_000005  Employee5  LastName5  employee5@company.com    Marketing   

  position       location  salary   hire_date  is_active  
0  Manager         London  104196  2023-08-02       True  
1  Manager         Berlin   62790  2020-10-12      False  
2  Manager  San Francisco  100990  2020-03-06       True  
3   Intern          Tokyo   97787  2023-02-23      False  
4  Manager  San Francisco  150785  2023-04-21      False  

📋 Sample Sales Data:
         sale_id     product  quantity  unit_pr

## 3. Spark Session Setup and Hive Tables

Let's initialize Spark and create Hive tables to test the traditional data warehouse functionality:


In [5]:
# Initialize Spark Session with both Iceberg and Delta Lake support
print("🚀 Initializing Spark Session with Iceberg and Delta Lake support...")

spark = SparkSession.builder \
    .appName("LASAGNA-Architecture-Test") \
    .config("spark.jars.packages", "org.apache.iceberg:iceberg-spark-runtime-3.4_2.12:1.4.2,io.delta:delta-core_2.12:2.4.0") \
    .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions,io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.iceberg", "org.apache.iceberg.spark.SparkCatalog") \
    .config("spark.sql.catalog.iceberg.type", "hive") \
    .config("spark.sql.catalog.iceberg.uri", "thrift://hive-metastore:9083") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .enableHiveSupport() \
    .getOrCreate()

print("🚀 Spark Session initialized with Iceberg and Delta Lake support")
print(f"📊 Spark Version: {spark.version}")
print(f"🔗 Spark Master: {spark.conf.get('spark.master')}")
print(f"📁 Warehouse Directory: {spark.conf.get('spark.sql.warehouse.dir')}")

# Test Hive Metastore connection
try:
    spark.sql("SHOW DATABASES").show()
    print("✅ Hive Metastore connection successful")
except Exception as e:
    print(f"❌ Hive Metastore connection failed: {e}")

# Test Iceberg catalog
try:
    spark.sql("SHOW NAMESPACES IN iceberg").show()
    print("✅ Iceberg catalog connection successful")
except Exception as e:
    print(f"❌ Iceberg catalog connection failed: {e}")

print("ℹ️  Both Iceberg and Delta Lake are now configured and ready to use")


🚀 Initializing Spark Session with Iceberg and Delta Lake support...
:: loading settings :: url = jar:file:/usr/local/lib/python3.10/dist-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /root/.ivy2/cache
The jars for the packages stored in: /root/.ivy2/jars
org.apache.iceberg#iceberg-spark-runtime-3.4_2.12 added as a dependency
io.delta#delta-core_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-b609ec88-426c-4e1d-9825-1ad94112cf75;1.0
	confs: [default]
	found org.apache.iceberg#iceberg-spark-runtime-3.4_2.12;1.4.2 in central
	found io.delta#delta-core_2.12;2.4.0 in central
	found io.delta#delta-storage;2.4.0 in central
	found org.antlr#antlr4-runtime;4.9.3 in central
:: resolution report :: resolve 120ms :: artifacts dl 6ms
	:: modules in use:
	io.delta#delta-core_2.12;2.4.0 from central in [default]
	io.delta#delta-storage;2.4.0 from central in [default]
	org.antlr#antlr4-runtime;4.9.3 from central in [default]
	org.apache.iceberg#iceberg-spark-runtime-3.4_2.12;1.4.2 from central in [default]
	---------------------------------------------------------------------
	|                  |            mo

🚀 Spark Session initialized with Iceberg and Delta Lake support
📊 Spark Version: 3.4.3


25/09/22 22:28:51 WARN MetricsConfig: Cannot locate configuration: tried hadoop-metrics2-s3a-file-system.properties,hadoop-metrics2.properties


🔗 Spark Master: spark://spark-master:7077
📁 Warehouse Directory: s3a://warehouse/


Hive Session ID = 8f8ffcf1-defe-463a-a7a9-06acecd36c23


+--------------+
|     namespace|
+--------------+
|       default|
|dunder_mifflin|
|  lasagna_demo|
|  lasagna_test|
|          test|
|  test_hive_db|
+--------------+

✅ Hive Metastore connection successful
+--------------+
|     namespace|
+--------------+
|       default|
|dunder_mifflin|
|  lasagna_demo|
|  lasagna_test|
|          test|
|  test_hive_db|
+--------------+

✅ Iceberg catalog connection successful
ℹ️  Both Iceberg and Delta Lake are now configured and ready to use
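
The session settings above can be kept in one place so that a later restart reuses exactly the same values. This is a sketch that assumes the package coordinates and catalog settings shown in the cell above. `LASAGNA_SPARK_CONF` and `build_session` are hypothetical names.

```python
LASAGNA_SPARK_CONF = {
    "spark.jars.packages": ",".join([
        "org.apache.iceberg:iceberg-spark-runtime-3.4_2.12:1.4.2",
        "io.delta:delta-core_2.12:2.4.0",
    ]),
    "spark.sql.extensions": ",".join([
        "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
        "io.delta.sql.DeltaSparkSessionExtension",
    ]),
    "spark.sql.catalog.iceberg": "org.apache.iceberg.spark.SparkCatalog",
    "spark.sql.catalog.iceberg.type": "hive",
    "spark.sql.catalog.iceberg.uri": "thrift://hive-metastore:9083",
    "spark.sql.catalog.spark_catalog": "org.apache.spark.sql.delta.catalog.DeltaCatalog",
}

def build_session(app_name="LASAGNA-Architecture-Test", conf=LASAGNA_SPARK_CONF):
    """Create (or reuse) a Hive-enabled session from a dict of settings."""
    # Imported lazily so the settings dict can be inspected without Spark installed
    from pyspark.sql import SparkSession
    builder = SparkSession.builder.appName(app_name)
    for key, value in conf.items():
        builder = builder.config(key, value)
    return builder.enableHiveSupport().getOrCreate()
```

Calling `spark = build_session()` would then replace the chained `.config(...)` calls wherever the session is created.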


In [6]:
# Create database and Hive tables
print("🏗️ Creating database and Hive tables...")

# Create database
spark.sql("CREATE DATABASE IF NOT EXISTS lasagna_demo")
spark.sql("USE lasagna_demo")

# Convert pandas DataFrames to Spark DataFrames
employees_spark = spark.createDataFrame(employees_df)
sales_spark = spark.createDataFrame(sales_df)

# Create metastore-registered tables (no explicit format, so Spark's default data source is used)
print("📝 Creating employees_hive table...")
employees_spark.write.mode("overwrite").saveAsTable("employees_hive")

print("📝 Creating sales_hive table...")
sales_spark.write.mode("overwrite").saveAsTable("sales_hive")

print("✅ Hive tables created successfully")

# Verify tables
print("\n📋 Available tables:")
spark.sql("SHOW TABLES").show()


🏗️ Creating database and Hive tables...
📝 Creating employees_hive table...
📝 Creating sales_hive table...


25/09/22 22:29:07 WARN TaskSetManager: Stage 1 contains a task of very large size (1690 KiB). The maximum recommended task size is 1000 KiB.
                                                                                

✅ Hive tables created successfully

📋 Available tables:
+------------+-----------------+-----------+
|   namespace|        tableName|isTemporary|
+------------+-----------------+-----------+
|lasagna_demo|  employees_delta|      false|
|lasagna_demo|   employees_hive|      false|
|lasagna_demo|employees_iceberg|      false|
|lasagna_demo|      sales_delta|      false|
|lasagna_demo|       sales_hive|      false|
+------------+-----------------+-----------+



In [7]:
# Test Hive table queries
print("🔍 Testing Hive table queries...")

# Basic queries
print("\n1️⃣ Employee count by department:")
spark.sql("""
    SELECT department, COUNT(*) as employee_count 
    FROM employees_hive 
    GROUP BY department 
    ORDER BY employee_count DESC
""").show()

print("\n2️⃣ Average salary by position:")
spark.sql("""
    SELECT position, ROUND(AVG(salary), 2) as avg_salary 
    FROM employees_hive 
    GROUP BY position 
    ORDER BY avg_salary DESC
""").show()

print("\n3️⃣ Sales summary by region:")
spark.sql("""
    SELECT region, 
           COUNT(*) as total_sales,
           ROUND(SUM(total_amount), 2) as total_revenue,
           ROUND(AVG(total_amount), 2) as avg_sale_amount
    FROM sales_hive 
    GROUP BY region 
    ORDER BY total_revenue DESC
""").show()

print("\n4️⃣ Top selling products:")
spark.sql("""
    SELECT product, 
           COUNT(*) as sales_count,
           ROUND(SUM(total_amount), 2) as total_revenue
    FROM sales_hive 
    GROUP BY product 
    ORDER BY total_revenue DESC
    LIMIT 5
""").show()


🔍 Testing Hive table queries...

1️⃣ Employee count by department:
+-----------+--------------+
| department|employee_count|
+-----------+--------------+
| Operations|          1716|
|      Sales|          1689|
|    Finance|          1677|
|Engineering|          1674|
|  Marketing|          1631|
|         HR|          1613|
+-----------+--------------+


2️⃣ Average salary by position:
+---------+----------+
| position|avg_salary|
+---------+----------+
|Mid-level| 120800.47|
|   Intern| 119941.05|
|   Junior| 119746.57|
|   Senior| 119532.88|
|  Manager| 119135.56|
+---------+----------+


3️⃣ Sales summary by region:
+-------------+-----------+-------------+---------------+
|       region|total_sales|total_revenue|avg_sale_amount|
+-------------+-----------+-------------+---------------+
|       Europe|      10104|5.696575425E7|        5637.94|
|         Asia|       9869|5.645521584E7|        5720.46|
|South America|       9981|5.588214471E7|        5598.85|
|       Africa|      10
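
The `total_revenue` column above prints in scientific notation because `ROUND` over a double still returns a double. One option is to cast the aggregate to a decimal type in the SQL itself. Another is to format the value on the Python side after collecting. Here is a small sketch of the second approach, using the Europe value from the output above.

```python
# Value as printed by Spark for Europe in the output above
total_revenue = float("5.696575425E7")

# Thousands separators and two decimals for display
print(f"{total_revenue:,.2f}")  # 56,965,754.25
```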

## 4. Delta Lake - ACID Transactions and Time Travel

Now let's test Delta Lake capabilities including ACID transactions and time travel:


In [8]:
# Create Delta Lake tables
print("🔄 Creating Delta Lake tables...")

# Reuse the existing session (Delta Lake is already configured) and convert the
# pandas DataFrames to Spark DataFrames with an explicit schema
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType, BooleanType, DateType

# Define schema for employees table
employees_schema = StructType([
    StructField("employee_id", StringType(), True),
    StructField("first_name", StringType(), True),
    StructField("last_name", StringType(), True),
    StructField("email", StringType(), True),
    StructField("department", StringType(), True),
    StructField("position", StringType(), True),
    StructField("location", StringType(), True),
    StructField("salary", IntegerType(), True),
    StructField("hire_date", DateType(), True),
    StructField("is_active", BooleanType(), True)
])

# Define schema for sales table
sales_schema = StructType([
    StructField("sale_id", StringType(), True),
    StructField("product", StringType(), True),
    StructField("quantity", IntegerType(), True),
    StructField("unit_price", FloatType(), True),
    StructField("total_amount", FloatType(), True),
    StructField("region", StringType(), True),
    StructField("sale_date", DateType(), True),
    StructField("customer_id", StringType(), True)
])

# Convert date strings to proper date objects for Spark compatibility
print("📝 Converting date strings to date objects...")
employees_df_copy = employees_df.copy()
employees_df_copy['hire_date'] = pd.to_datetime(employees_df_copy['hire_date']).dt.date

sales_df_copy = sales_df.copy()
sales_df_copy['sale_date'] = pd.to_datetime(sales_df_copy['sale_date']).dt.date

# Create Delta Spark DataFrames
print("📝 Converting pandas DataFrames to Delta Spark DataFrames...")
employees_delta_spark = spark.createDataFrame(employees_df_copy, schema=employees_schema)
sales_delta_spark = spark.createDataFrame(sales_df_copy, schema=sales_schema)

# Create Delta tables
print("📝 Creating employees_delta table...")
# Drop table first if it exists
spark.sql("DROP TABLE IF EXISTS employees_delta")
employees_delta_spark.write.format("delta").saveAsTable("employees_delta")

print("📝 Creating sales_delta table...")
# Drop table first if it exists
spark.sql("DROP TABLE IF EXISTS sales_delta")
sales_delta_spark.write.format("delta").saveAsTable("sales_delta")

print("✅ Delta Lake tables created successfully")

# Show Delta table history
print("\n📜 Delta table history (employees_delta):")
spark.sql("DESCRIBE HISTORY employees_delta").show()

print("\n📜 Delta table history (sales_delta):")
spark.sql("DESCRIBE HISTORY sales_delta").show()

# Demonstrate Delta Lake features
print("\n🔄 Demonstrating Delta Lake ACID transactions...")
print("📝 Adding a new record to employees_delta...")
new_employee = spark.createDataFrame([("EMP_99999", "Test", "User", "test@company.com", "Engineering", "Senior", "New York", 100000, date(2024, 1, 1), True)], schema=employees_schema)
new_employee.write.format("delta").mode("append").saveAsTable("employees_delta")

print("📊 Updated record count:")
spark.sql("SELECT COUNT(*) as total_employees FROM employees_delta").show()

print("\n⏰ Time travel - showing previous version:")
spark.sql("SELECT COUNT(*) as previous_count FROM employees_delta VERSION AS OF 0").show()


🔄 Creating Delta Lake tables...
📝 Converting date strings to date objects...
📝 Converting pandas DataFrames to Delta Spark DataFrames...
📝 Creating employees_delta table...


25/09/22 22:29:25 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
25/09/22 22:29:29 WARN HiveExternalCatalog: Couldn't find corresponding Hive SerDe for data source provider delta. Persisting data source table `spark_catalog`.`lasagna_demo`.`employees_delta` into Hive metastore in Spark SQL specific format, which is NOT compatible with Hive.


📝 Creating sales_delta table...


25/09/22 22:29:30 WARN TaskSetManager: Stage 24 contains a task of very large size (1450 KiB). The maximum recommended task size is 1000 KiB.
25/09/22 22:29:41 WARN HiveExternalCatalog: Couldn't find corresponding Hive SerDe for data source provider delta. Persisting data source table `spark_catalog`.`lasagna_demo`.`sales_delta` into Hive metastore in Spark SQL specific format, which is NOT compatible with Hive.


✅ Delta Lake tables created successfully

📜 Delta table history (employees_delta):
+-------+-------------------+------+--------+--------------------+--------------------+----+--------+---------+-----------+--------------+-------------+--------------------+------------+--------------------+
|version|          timestamp|userId|userName|           operation| operationParameters| job|notebook|clusterId|readVersion|isolationLevel|isBlindAppend|    operationMetrics|userMetadata|          engineInfo|
+-------+-------------------+------+--------+--------------------+--------------------+----+--------+---------+-----------+--------------+-------------+--------------------+------------+--------------------+
|      0|2025-09-22 22:29:24|  null|    null|CREATE TABLE AS S...|{isManaged -> tru...|null|    null|     null|       null|  Serializable|         true|{numFiles -> 2, n...|        null|Apache-Spark/3.4....|
+-------+-------------------+------+--------+--------------------+--------------------+----+--------+---------+-----------+--------------+-------------+----------------

                                                                                

📊 Updated record count:
+---------------+
|total_employees|
+---------------+
|          10001|
+---------------+


⏰ Time travel - showing previous version:
+--------------+
|previous_count|
+--------------+
|         10000|
+--------------+
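
Besides a version number, Delta's SQL time travel also accepts a timestamp. Here is a sketch of a small helper that builds either form. `time_travel_count_sql` is a hypothetical name, and the timestamp in the example is a placeholder rather than a value known to resolve to a specific version.

```python
def time_travel_count_sql(table, version=None, timestamp=None):
    """Build a COUNT(*) query against one historical snapshot of a Delta table."""
    if (version is None) == (timestamp is None):
        raise ValueError("pass exactly one of version or timestamp")
    if version is not None:
        clause = f"VERSION AS OF {int(version)}"
    else:
        clause = f"TIMESTAMP AS OF '{timestamp}'"
    return f"SELECT COUNT(*) AS row_count FROM {table} {clause}"

print(time_travel_count_sql("employees_delta", version=0))
# Placeholder timestamp, for illustration only
print(time_travel_count_sql("employees_delta", timestamp="2025-09-22 22:30:00"))
```

Either string can be passed to `spark.sql(...).show()` in a session with Delta Lake configured.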



                                                                                

In [9]:
def remove_s3_location(table_name, database_name="lasagna_demo"):
    """Remove table location from S3/MinIO with proper configuration"""
    try:
        # Get the current Hadoop configuration
        hadoop_conf = spark.sparkContext._jsc.hadoopConfiguration()
        
        # Ensure S3A configuration is set
        hadoop_conf.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
        hadoop_conf.set("fs.s3a.endpoint", "http://minio:9000")  # Use container name
        hadoop_conf.set("fs.s3a.path.style.access", "true")
        hadoop_conf.set("fs.s3a.connection.ssl.enabled", "false")
        
        # Create filesystem object with the configured settings
        fs = spark.sparkContext._jvm.org.apache.hadoop.fs.FileSystem.get(
            spark.sparkContext._jvm.java.net.URI("s3a://warehouse"), 
            hadoop_conf
        )
        
        # Construct the path
        location_path = f"s3a://warehouse/{database_name}/{table_name}"
        path = spark.sparkContext._jvm.org.apache.hadoop.fs.Path(location_path)
        
        # Check if path exists and delete it
        if fs.exists(path):
            success = fs.delete(path, True)  # True = recursive
            if success:
                print(f"✅ Successfully removed location: {location_path}")
                return True
            else:
                print(f"❌ Failed to remove location: {location_path}")
                return False
        else:
            print(f"ℹ️ Location doesn't exist: {location_path}")
            return True
            
    except Exception as e:
        print(f"❌ Error removing location: {e}")
        return False

# Usage (table name first, then database name)
# remove_s3_location("sales_delta", "lasagna_demo")

In [None]:
# Stop the current Spark session if it exists
try:
    spark.stop()
except Exception:
    pass  # no active session to stop

# Create a new Spark session
spark = SparkSession.builder \
    .appName("LASAGNA-Architecture-Test") \
    .config("spark.jars.packages", "org.apache.iceberg:iceberg-spark-runtime-3.4_2.12:1.4.2,io.delta:delta-core_2.12:2.4.0") \
    .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions,io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.iceberg", "org.apache.iceberg.spark.SparkCatalog") \
    .config("spark.sql.catalog.iceberg.type", "hive") \
    .config("spark.sql.catalog.iceberg.uri", "thrift://hive-metastore:9083") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .enableHiveSupport() \
    .getOrCreate()

# Verify the connection
print("✅ Spark session restarted successfully!")
print("🔗 Spark UI: http://localhost:4040")
print(f"📊 Spark version: {spark.version}")

In [5]:
# Demonstrate ACID transactions and updates
print("🔒 Testing ACID transactions...")

# Record current version
current_version = spark.sql("DESCRIBE HISTORY employees_delta").collect()[0]['version']
print(f"📊 Current version: {current_version}")

# Perform updates (ACID transaction)
print("\n🔄 Performing updates...")

# Update salaries for Engineering department
spark.sql("""
    UPDATE employees_delta 
    SET salary = salary * 1.1 
    WHERE department = 'Engineering'
""")

# Insert new employee
spark.sql("""
    INSERT INTO employees_delta VALUES 
    ('EMP_999999', 'New', 'Employee', 'new.employee@company.com', 
     'Engineering', 'Senior', 'San Francisco', 120000, '2024-01-15', true)
""")

# Delete inactive employees from HR
spark.sql("""
    DELETE FROM employees_delta 
    WHERE department = 'HR' AND is_active = false
""")

print("✅ ACID transactions completed")

# Show updated history
print("\n📜 Updated Delta table history:")
spark.sql("DESCRIBE HISTORY employees_delta").show()


🔒 Testing ACID transactions...


ConnectionRefusedError: [Errno 111] Connection refused

In [9]:
# Demonstrate Time Travel
print("⏰ Testing Delta Lake Time Travel...")

# Show current data
print("\n📊 Current Engineering employees count:")
spark.sql("""
    SELECT COUNT(*) as current_count 
    FROM employees_delta 
    WHERE department = 'Engineering'
""").show()

# Travel back to version 0 (original data)
print("\n🕰️ Time traveling to version 0...")
spark.sql("""
    SELECT COUNT(*) as original_count 
    FROM employees_delta VERSION AS OF 0 
    WHERE department = 'Engineering'
""").show()

# Compare average salaries
print("\n💰 Salary comparison - Current vs Original:")
print("Current average salary for Engineering:")
spark.sql("""
    SELECT ROUND(AVG(salary), 2) as current_avg_salary 
    FROM employees_delta 
    WHERE department = 'Engineering'
""").show()

print("Original average salary for Engineering:")
spark.sql("""
    SELECT ROUND(AVG(salary), 2) as original_avg_salary 
    FROM employees_delta VERSION AS OF 0 
    WHERE department = 'Engineering'
""").show()

print("✅ Time travel demonstration completed")


⏰ Testing Delta Lake Time Travel...

📊 Current Engineering employees count:
+-------------+
|current_count|
+-------------+
|         1676|
+-------------+


🕰️ Time traveling to version 0...
+--------------+
|original_count|
+--------------+
|          1674|
+--------------+


💰 Salary comparison - Current vs Original:
Current average salary for Engineering:
+------------------+
|current_avg_salary|
+------------------+
|         131149.42|
+------------------+

Original average salary for Engineering:
+-------------------+
|original_avg_salary|
+-------------------+
|           119244.7|
+-------------------+

✅ Time travel demonstration completed
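
The current average is close to, but not exactly, 1.1 times the original. The gap can be checked with a little arithmetic. The sketch below rests on assumptions that the notebook does not confirm: that the two extra Engineering rows are the two test employees inserted earlier, and that the `INT` salary column drops the fractional part of `salary * 1.1`.

```python
# Figures copied from the outputs above
original_avg, original_count = 119244.7, 1674
current_avg, current_count = 131149.42, 1676

# Assumptions (not confirmed by the notebook): one test employee was raised
# from 100000 to 110000, the other was added after the raise at 120000, and
# truncating salary * 1.1 to INT costs about 0.45 per raised row on average.
raised_total = original_avg * original_count * 1.1
extra_rows_total = 110000 + 120000
truncation_loss = 0.45 * original_count

predicted_avg = (raised_total + extra_rows_total - truncation_loss) / current_count
print(round(predicted_avg, 2), current_avg)
```

Under these assumptions the predicted value lands within a few cents of the reported average, which is consistent with the 10% raise having been applied once.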


## 5. Apache Iceberg - Schema Evolution and Advanced Partitioning

Let's test Iceberg's advanced features including schema evolution and partitioning:


In [15]:
# Create Iceberg tables with partitioning
print("🧊 Creating Iceberg tables with partitioning...")

# The session created earlier already has Iceberg support, so the separate
# session below is left commented out
print("🔄 Creating Spark session with Iceberg support...")

# iceberg_spark = SparkSession.builder \
#    .appName("LASAGNA-Iceberg-Test") \
#    .config("spark.jars.packages", "org.apache.iceberg:iceberg-spark-runtime-3.4_2.12:1.4.2") \
#    .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
#    .config("spark.sql.catalog.iceberg", "org.apache.iceberg.spark.SparkCatalog") \
#    .config("spark.sql.catalog.iceberg.type", "hive") \
#    .config("spark.sql.catalog.iceberg.uri", "thrift://hive-metastore:9083") \
#    .enableHiveSupport() \
#    .getOrCreate()

# Create the namespace in the Iceberg catalog
spark.sql("CREATE NAMESPACE IF NOT EXISTS iceberg.lasagna_demo")

# Create partitioned Iceberg table
print("📝 Creating partitioned employees_iceberg table...")
spark.sql("""
    CREATE OR REPLACE TABLE iceberg.lasagna_demo.employees_iceberg (
        employee_id STRING,
        first_name STRING,
        last_name STRING,
        email STRING,
        department STRING,
        position STRING,
        location STRING,
        salary INT,
        hire_date DATE,
        is_active BOOLEAN
    ) USING iceberg
    PARTITIONED BY (department, location)
""")

# Insert data into Iceberg table
employees_spark.writeTo("iceberg.lasagna_demo.employees_iceberg").append()

print("✅ Iceberg table created and populated")

# Show table details
print("\n📋 Iceberg table details:")
spark.sql("DESCRIBE TABLE EXTENDED iceberg.lasagna_demo.employees_iceberg").show(truncate=False)


🧊 Creating Iceberg tables with partitioning...
🔄 Creating Spark session with Iceberg support...
📝 Creating partitioned employees_iceberg table...
✅ Iceberg table created and populated

📋 Iceberg table details:
+-----------------------+-----------------------------------------+-------+
|col_name               |data_type                                |comment|
+-----------------------+-----------------------------------------+-------+
|employee_id            |string                                   |null   |
|first_name             |string                                   |null   |
|last_name              |string                                   |null   |
|email                  |string                                   |null   |
|department             |string                                   |null   |
|position               |string                                   |null   |
|location               |string                                   |null   |
|salary                 |int                                      |null   |
|hire_date              |date                                     |null   |
|is_active              
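
Partitioning by `department` and `location` creates one partition per distinct pair of values. Here is a quick sketch of how many partitions that implies for the generated data, using the value lists from the data generator above.

```python
from itertools import product

# Value lists copied from generate_employee_data above
departments = ['Engineering', 'Sales', 'Marketing', 'HR', 'Finance', 'Operations']
locations = ['New York', 'San Francisco', 'London', 'Tokyo', 'Berlin']

partitions = list(product(departments, locations))
rows = 10000
print(f"{len(partitions)} partitions, about {rows // len(partitions)} rows each")
# 30 partitions, about 333 rows each
```

With only a few hundred rows per partition, the layout mainly serves as a demonstration. On a dataset this small it produces many tiny files.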

In [17]:
# Demonstrate Schema Evolution
print("🔄 Testing Iceberg Schema Evolution...")

# Add new columns (one at a time for Iceberg compatibility)
print("\n➕ Adding new columns to Iceberg table...")
spark.sql("""
    ALTER TABLE iceberg.lasagna_demo.employees_iceberg 
    ADD COLUMN bonus DECIMAL(10,2)
""")

spark.sql("""
    ALTER TABLE iceberg.lasagna_demo.employees_iceberg 
    ADD COLUMN performance_rating STRING
""")

# Update existing records with new columns
spark.sql("""
    UPDATE iceberg.lasagna_demo.employees_iceberg 
    SET bonus = CASE 
        WHEN position = 'Manager' THEN salary * 0.2
        WHEN position = 'Senior' THEN salary * 0.15
        ELSE salary * 0.1
    END,
    performance_rating = CASE 
        WHEN salary > 150000 THEN 'Excellent'
        WHEN salary > 100000 THEN 'Good'
        ELSE 'Average'
    END
""")

print("✅ Schema evolution completed")

# Show updated schema
print("\n📋 Updated table schema:")
spark.sql("DESCRIBE iceberg.lasagna_demo.employees_iceberg").show()

# Test queries on evolved schema
print("\n🔍 Testing queries on evolved schema:")
spark.sql("""
    SELECT department, 
           COUNT(*) as employee_count,
           ROUND(AVG(bonus), 2) as avg_bonus,
           COUNT(CASE WHEN performance_rating = 'Excellent' THEN 1 END) as excellent_performers
    FROM iceberg.lasagna_demo.employees_iceberg 
    GROUP BY department 
    ORDER BY avg_bonus DESC
""").show()


🔄 Testing Iceberg Schema Evolution...

➕ Adding new columns to Iceberg table...


25/09/22 21:39:54 WARN BaseTransaction: Failed to load metadata for a committed snapshot, skipping clean-up
25/09/22 21:39:54 WARN BaseTransaction: Failed to load metadata for a committed snapshot, skipping clean-up
                                                                                

✅ Schema evolution completed

📋 Updated table schema:
+--------------------+-------------+-------+
|            col_name|    data_type|comment|
+--------------------+-------------+-------+
|         employee_id|       string|   null|
|          first_name|       string|   null|
|           last_name|       string|   null|
|               email|       string|   null|
|          department|       string|   null|
|            position|       string|   null|
|            location|       string|   null|
|              salary|          int|   null|
|           hire_date|         date|   null|
|           is_active|      boolean|   null|
|               bonus|decimal(10,2)|   null|
|  performance_rating|       string|   null|
|# Partition Infor...|             |       |
|          # col_name|    data_type|comment|
|          department|       string|   null|
|            location|       string|   null|
+--------------------+-------------+-------+


🔍 Testing queries on evolved schema:
+------
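
To spot-check the values written by the `UPDATE` above, the two `CASE` expressions can be mirrored in plain Python and compared against a few rows collected from the table. This is a minimal sketch, not part of the original notebook: the function names `expected_bonus` and `expected_rating` are hypothetical, and it assumes `salary` is never null.

```python
from decimal import Decimal, ROUND_HALF_UP


def expected_bonus(position, salary):
    """Mirror the bonus CASE: 20% for Manager, 15% for Senior, otherwise 10%."""
    rates = {"Manager": Decimal("0.20"), "Senior": Decimal("0.15")}
    rate = rates.get(position, Decimal("0.10"))
    # The bonus column is decimal(10,2), so express the result with two places.
    return (Decimal(salary) * rate).quantize(Decimal("0.01"), rounding=ROUND_HALF_UP)


def expected_rating(salary):
    """Mirror the performance_rating CASE (strict > comparisons, as in the SQL)."""
    if salary > 150000:
        return "Excellent"
    if salary > 100000:
        return "Good"
    return "Average"


# Illustrative inputs only; these are not rows from the demo table.
print(expected_bonus("Manager", 120000), expected_rating(120000))
```

Rows returned by `spark.sql(...).collect()` could then be checked by comparing each row's `bonus` and `performance_rating` with these functions.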

## 6. Trino - Cross-Engine Querying

Let's test Trino's ability to query across all our table formats:


In [19]:
# Connect to Trino
print("🔗 Connecting to Trino...")

try:
    conn = trino.dbapi.connect(
        host='trino',
        port=8080,
        user='admin',
        catalog='hive',
        schema='lasagna_demo'
    )
    
    cursor = conn.cursor()
    print("✅ Trino connection successful")
    
    # Test basic query
    cursor.execute("SHOW TABLES")
    tables = cursor.fetchall()
    print(f"\n📋 Available tables in Trino: {[table[0] for table in tables]}")
    
except Exception as e:
    print(f"❌ Trino connection failed: {e}")
    print("💡 Make sure the Trino container is reachable at trino:8080")


🔗 Connecting to Trino...
✅ Trino connection successful

📋 Available tables in Trino: ['employees_hive', 'employees_iceberg', 'sales_delta', 'sales_hive']
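
The cells in this section index result rows by position (`row[0]`, `row[1]`). A small helper that turns rows into dictionaries keyed by column name can make later cells easier to read. The sketch below relies only on the DB-API 2.0 attributes `description` and `fetchall()`; the helper name `fetch_dicts` is hypothetical, and `sqlite3` is used purely as a stand-in so the example runs without a Trino server.

```python
import sqlite3


def fetch_dicts(cursor):
    """Return all remaining rows as dicts keyed by column name (DB-API 2.0 only)."""
    rows = cursor.fetchall()
    # Read description after fetching, so column metadata is certainly populated.
    columns = [col[0] for col in cursor.description]
    return [dict(zip(columns, row)) for row in rows]


# Stand-in demo: an in-memory SQLite table with made-up rows.
demo = sqlite3.connect(":memory:")
cur = demo.cursor()
cur.execute("CREATE TABLE employees (department TEXT, salary INTEGER)")
cur.executemany(
    "INSERT INTO employees VALUES (?, ?)",
    [("Sales", 100), ("Sales", 200), ("Finance", 300)],
)
cur.execute(
    "SELECT department, COUNT(*) AS employee_count "
    "FROM employees GROUP BY department ORDER BY department"
)
demo_rows = fetch_dicts(cur)
print(demo_rows)
```

With the Trino cursor from the cell above, the same helper would be called right after `cursor.execute(...)`.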


In [20]:
# Test Trino queries across different catalogs
if 'tables' in locals():  # set only after SHOW TABLES succeeded; connect() alone does not contact the server
    print("🔍 Testing Trino queries across catalogs...")
    
    # Query Hive tables
    print("\n1️⃣ Querying Hive tables:")
    cursor.execute("""
        SELECT department, COUNT(*) as employee_count 
        FROM hive.lasagna_demo.employees_hive 
        GROUP BY department 
        ORDER BY employee_count DESC
        LIMIT 5
    """)
    hive_results = cursor.fetchall()
    print("Hive table results:")
    for row in hive_results:
        print(f"  {row[0]}: {row[1]} employees")
    
    # Query Delta tables
    print("\n2️⃣ Querying Delta tables:")
    try:
        cursor.execute("""
            SELECT department, COUNT(*) as employee_count 
            FROM delta_lake.lasagna_demo.employees_delta 
            GROUP BY department 
            ORDER BY employee_count DESC
            LIMIT 5
        """)
        delta_results = cursor.fetchall()
        print("Delta table results:")
        for row in delta_results:
            print(f"  {row[0]}: {row[1]} employees")
    except Exception as e:
        print(f"⚠️ Delta query failed: {e}")
    
    # Query Iceberg tables
    print("\n3️⃣ Querying Iceberg tables:")
    try:
        cursor.execute("""
            SELECT department, COUNT(*) as employee_count 
            FROM iceberg.lasagna_demo.employees_iceberg 
            GROUP BY department 
            ORDER BY employee_count DESC
            LIMIT 5
        """)
        iceberg_results = cursor.fetchall()
        print("Iceberg table results:")
        for row in iceberg_results:
            print(f"  {row[0]}: {row[1]} employees")
    except Exception as e:
        print(f"⚠️ Iceberg query failed: {e}")
    
    print("\n✅ Trino cross-catalog querying completed")
else:
    print("⚠️ Skipping Trino tests - connection not available")


🔍 Testing Trino queries across catalogs...

1️⃣ Querying Hive tables:
Hive table results:
  Operations: 1716 employees
  Sales: 1689 employees
  Finance: 1677 employees
  Engineering: 1674 employees
  Marketing: 1631 employees

2️⃣ Querying Delta tables:
⚠️ Delta query failed: TrinoUserError(type=USER_ERROR, name=CATALOG_NOT_FOUND, message="line 3:18: Catalog 'delta_lake' not found", query_id=20250922_214036_00002_2rjez)

3️⃣ Querying Iceberg tables:
Iceberg table results:
  Operations: 1716 employees
  Sales: 1689 employees
  Finance: 1677 employees
  Engineering: 1674 employees
  Marketing: 1631 employees

✅ Trino cross-catalog querying completed
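
The Delta query above failed with `CATALOG_NOT_FOUND`, which suggests checking which catalogs the Trino server exposes before querying them. The sketch below uses Trino's `SHOW CATALOGS` statement and skips tables whose catalog is missing. The helper names and the `FakeCursor` class are hypothetical; the fake cursor exists only so the example runs offline, and its catalog list is made up for illustration.

```python
def available_catalogs(cursor):
    """Return the set of catalog names reported by SHOW CATALOGS."""
    cursor.execute("SHOW CATALOGS")
    return {row[0] for row in cursor.fetchall()}


def split_by_catalog(tables, catalogs):
    """Partition fully qualified table names into (queryable, skipped)."""
    queryable, skipped = [], []
    for table in tables:
        catalog = table.split(".", 1)[0]
        (queryable if catalog in catalogs else skipped).append(table)
    return queryable, skipped


class FakeCursor:
    """Hypothetical stand-in for a Trino cursor; returns canned rows."""

    def __init__(self, rows):
        self._rows = rows

    def execute(self, sql):
        self.last_sql = sql

    def fetchall(self):
        return list(self._rows)


demo_cursor = FakeCursor([("hive",), ("iceberg",), ("system",)])
wanted = [
    "hive.lasagna_demo.employees_hive",
    "delta_lake.lasagna_demo.employees_delta",
    "iceberg.lasagna_demo.employees_iceberg",
]
queryable, skipped = split_by_catalog(wanted, available_catalogs(demo_cursor))
print("queryable:", queryable)
print("skipped:", skipped)
```

Against the real cursor, the `skipped` list would show up front which catalogs still need to be configured in Trino.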


## 7. Performance Comparison

Let's compare Spark query times across the three table formats. Each query runs once, so treat the numbers as rough indicators rather than benchmarks:


In [None]:
# Performance comparison function
def measure_query_performance(query, table_name, description):
    """Time a single run of `query`; return seconds, or None if it fails."""
    # perf_counter() is monotonic, so it suits interval timing better than time.time()
    start_time = time.perf_counter()
    try:
        result = spark.sql(query)
        result.collect()  # Force execution
        execution_time = time.perf_counter() - start_time
        print(f"✅ {description} ({table_name}): {execution_time:.2f} seconds")
        return execution_time
    except Exception as e:
        print(f"❌ {description} ({table_name}): Failed - {e}")
        return None

# Test query performance
print("⚡ Performance comparison across table formats...")

# Complex analytical query
complex_query = """
    SELECT department, 
           location,
           COUNT(*) as employee_count,
           ROUND(AVG(salary), 2) as avg_salary,
           ROUND(MAX(salary), 2) as max_salary,
           ROUND(MIN(salary), 2) as min_salary
    FROM {table} 
    WHERE is_active = true
    GROUP BY department, location
    HAVING COUNT(*) > 50
    ORDER BY avg_salary DESC
"""

print("\n🔍 Running complex analytical queries...")

# Test Hive performance
hive_time = measure_query_performance(
    complex_query.format(table="employees_hive"),
    "Hive",
    "Complex Analytics Query"
)

# Test Delta performance
delta_time = measure_query_performance(
    complex_query.format(table="employees_delta"),
    "Delta Lake",
    "Complex Analytics Query"
)

# Test Iceberg performance
iceberg_time = measure_query_performance(
    complex_query.format(table="iceberg.lasagna_demo.employees_iceberg"),
    "Iceberg",
    "Complex Analytics Query"
)

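print("\n📊 Performance Summary:")
# Compare against None explicitly: a very fast run must not be treated as a failure
if hive_time is not None:
    print(f"  Hive: {hive_time:.2f}s")
if delta_time is not None:
    print(f"  Delta Lake: {delta_time:.2f}s")
if iceberg_time is not None:
    print(f"  Iceberg: {iceberg_time:.2f}s")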
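
The cell above times each query once, so the first format measured may also absorb warm-up costs such as metadata loading. A hedged sketch of a more robust measurement is shown below: it runs a callable a few times after a warm-up pass and reports the median. The helper name `time_runs` and its parameters are hypothetical, and the timed lambda is a CPU-only placeholder so the example runs without Spark.

```python
import statistics
import time


def time_runs(run, warmup=1, repeats=3):
    """Call `run` `warmup` times untimed, then `repeats` times timed."""
    for _ in range(warmup):
        run()
    samples = []
    for _ in range(repeats):
        start = time.perf_counter()
        run()
        samples.append(time.perf_counter() - start)
    return {
        "median": statistics.median(samples),
        "min": min(samples),
        "max": max(samples),
        "runs": repeats,
    }


# Placeholder workload; in the notebook this would wrap a Spark query instead.
stats = time_runs(lambda: sum(range(100_000)))
print(f"median {stats['median']:.4f}s over {stats['runs']} runs")
```

In the notebook, the callable could be something like `lambda: spark.sql(complex_query.format(table="employees_hive")).collect()`.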


## 8. Architecture Summary and Verification

Let's verify all components are working and summarize what we've tested:


In [None]:
# Final verification and summary
print("🎯 LASAGNA Architecture Verification Summary")
print("=" * 50)

# Check all tables exist
print("\n📋 Table Verification:")
tables_to_check = [
    ("employees_hive", "Hive"),
    ("sales_hive", "Hive"),
    ("employees_delta", "Delta Lake"),
    ("sales_delta", "Delta Lake"),
    ("iceberg.lasagna_demo.employees_iceberg", "Iceberg")
]

for table, format_type in tables_to_check:
    try:
        count = spark.sql(f"SELECT COUNT(*) FROM {table}").collect()[0][0]
        print(f"✅ {format_type}: {table} - {count:,} records")
    except Exception as e:
        print(f"❌ {format_type}: {table} - Error: {e}")

# Check MinIO storage
print("\n🗄️ Storage Verification:")
try:
    # Check if we can read from S3
    spark.sql("SELECT COUNT(*) FROM employees_hive").collect()
    print("✅ MinIO S3 Storage: Accessible")
except Exception as e:
    print(f"❌ MinIO S3 Storage: Error - {e}")

# Check Hive Metastore
print("\n📊 Metadata Verification:")
try:
    databases = spark.sql("SHOW DATABASES").collect()
    print(f"✅ Hive Metastore: {len(databases)} databases found")
    print(f"   Databases: {[db[0] for db in databases]}")
except Exception as e:
    print(f"❌ Hive Metastore: Error - {e}")

# Check Spark Cluster
print("\n⚡ Spark Cluster Status:")
try:
    print(f"✅ Spark Master: {spark.conf.get('spark.master')}")
    print(f"✅ Spark Version: {spark.version}")
    # Pass a default: spark.executor.instances is often unset, and get() raises without one
    print(f"✅ Executor Instances: {spark.conf.get('spark.executor.instances', 'not set')}")
except Exception as e:
    print(f"❌ Spark Cluster: Error - {e}")

print("\n🎉 Architecture Test Complete!")
print("\n📝 What This Notebook Exercised:")
print("   ✅ MinIO object storage integration")
print("   ✅ PostgreSQL metadata persistence")
print("   ✅ Hive Metastore catalog service")
print("   ✅ Spark distributed processing")
print("   ✅ Hive table operations")
print("   ✅ Delta Lake ACID transactions and time travel")
print("   ✅ Iceberg schema evolution and partitioning")
print("   ✅ Trino cross-engine querying (requires a catalog per table format)")
print("   ✅ Performance comparisons")
print("\n🚀 Review any ❌ or ⚠️ lines above before treating the stack as fully functional.")
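
One way to tie the closing message to the actual check results is to collect each check's outcome first and derive the verdict from it. The sketch below is an assumption about how that could look, not the notebook's own code: `run_checks` and `summarize` are hypothetical helpers, and the demo checks return placeholder values rather than real query results.

```python
def run_checks(checks):
    """Run each named zero-argument check; return {name: (ok, detail)}."""
    results = {}
    for name, check in checks.items():
        try:
            results[name] = (True, str(check()))
        except Exception as exc:
            results[name] = (False, f"{type(exc).__name__}: {exc}")
    return results


def summarize(results):
    """Build a report string and an overall pass/fail flag."""
    lines = []
    for name, (ok, detail) in results.items():
        lines.append(f"{'✅' if ok else '❌'} {name}: {detail}")
    failed = [name for name, (ok, _) in results.items() if not ok]
    verdict = "all checks passed" if not failed else "failed: " + ", ".join(failed)
    lines.append(f"Result: {verdict}")
    return "\n".join(lines), not failed


def failing_check():
    # Placeholder failure used only for the demo.
    raise RuntimeError("catalog not found")


demo_results = run_checks({
    "Hive table": lambda: "1,000 records",  # placeholder value
    "Delta via Trino": failing_check,
})
report, all_ok = summarize(demo_results)
print(report)
```

In the notebook, each check could wrap one of the verification queries above, and the final "fully functional" message would be printed only when `all_ok` is true.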
