# Advanced Upsert Operations with Apache Iceberg and Spark

This notebook is a tutorial on upsert (MERGE INTO) operations against Apache Iceberg tables using Apache Spark. It covers basic and conditional merge patterns, bulk merges, deletes within MERGE, and practices for data synchronization.

## Learning Objectives
- Understand Iceberg's MERGE INTO capabilities
- Learn different upsert patterns and strategies
- Explore conditional merge logic
- Practice performance optimization techniques
- Handle complex merge scenarios

## Prerequisites
- Previous notebooks (1-4) completed
- Understanding of Spark SQL and DataFrames
- Basic knowledge of data synchronization concepts

## 1. Environment Setup and Initialization

First, let's establish our Spark session and verify the environment configuration.

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, LongType, StringType, BooleanType, IntegerType, TimestampType
from pyspark.sql.functions import col
from datetime import datetime
import time

In [2]:
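# Create (or reuse) a Spark session with adaptive query execution enabled.
# Note: if a session already exists, getOrCreate() returns it and only runtime SQL configs take effect.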
spark = SparkSession.builder \
    .appName("Advanced Iceberg Upsert Tutorial") \
    .config("spark.sql.adaptive.enabled", "true") \
    .config("spark.sql.adaptive.coalescePartitions.enabled", "true") \
    .getOrCreate()

print(f"Spark Version: {spark.version}")
print(f"Default Catalog: {spark.conf.get('spark.sql.defaultCatalog')}")
print(f"Adaptive Query Execution: {spark.conf.get('spark.sql.adaptive.enabled')}")

# Set catalog context
spark.sql("USE rest.`play-iceberg`")
print("\nCatalog context set to: rest.play-iceberg")

25/06/27 06:52:32 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


Spark Version: 3.5.5
Default Catalog: rest
Adaptive Query Execution: true

Catalog context set to: rest.play-iceberg


## 2. Current Table State Analysis

Before performing merge operations, let's analyze our current table state and understand the data distribution.

In [3]:
# Analyze current table state
current_data = spark.sql("""
    SELECT 
        COUNT(*) as total_records,
        COUNT(CASE WHEN is_active = true THEN 1 END) as active_users,
        COUNT(CASE WHEN is_active = false THEN 1 END) as inactive_users,
        MIN(user_id) as min_user_id,
        MAX(user_id) as max_user_id
    FROM users
""")

print("Current Table Statistics:")
current_data.show()

# Display current records
print("\nCurrent Records:")
spark.sql("SELECT * FROM users ORDER BY user_id").show()

Current Table Statistics:
+-------------+------------+--------------+-----------+-----------+
|total_records|active_users|inactive_users|min_user_id|max_user_id|
+-------------+------------+--------------+-----------+-----------+
|            5|           4|             1|          1|          5|
+-------------+------------+--------------+-----------+-----------+


Current Records:
+-------+-------------+--------------------+---------+------------+-------------+-----------+--------------------+
|user_id|     username|               email|is_active|created_year|created_month|created_day|          updated_at|
+-------+-------------+--------------------+---------+------------+-------------+-----------+--------------------+
|      1|     john_doe|john.doe@example.com|     true|        2025|            6|         27|2025-06-27 06:37:...|
|      2|   jane_smith|jane.smith@exampl...|     true|        2025|            6|         27|2025-06-27 06:37:...|
|      3| alice_wonder|alice.wonder@exam...|    false|        2025

## 3. Basic Upsert Operation

Let's start with a basic MERGE INTO operation that demonstrates the fundamental upsert pattern.
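
As a mental model for the upsert pattern, the sketch below mimics the two MERGE branches with plain Python dictionaries. It is an illustration only, not Iceberg code: the `upsert` function, the `UPDATE_COLS` tuple, and the placeholder values `"t0"`/`"t1"` are hypothetical. It shows that a matched row only has the listed columns overwritten, while an unmatched row is inserted whole.

```python
# Pure-Python model of upsert semantics (illustrative; does not touch Spark or Iceberg).
UPDATE_COLS = ("username", "email", "is_active", "updated_at")

def upsert(target, source_rows, key="user_id", update_cols=UPDATE_COLS):
    """Return a new dict keyed by `key` after applying MERGE-like upsert rules."""
    result = {k: dict(row) for k, row in target.items()}
    for row in source_rows:
        k = row[key]
        if k in result:
            # WHEN MATCHED: overwrite only the columns named in UPDATE SET
            for c in update_cols:
                result[k][c] = row[c]
        else:
            # WHEN NOT MATCHED: insert the full source row
            result[k] = dict(row)
    return result

target = {
    1: {"user_id": 1, "username": "john_doe", "email": "john.doe@example.com",
        "is_active": True, "created_day": 27, "updated_at": "t0"},
}
source = [
    {"user_id": 1, "username": "john_doe_updated", "email": "john.doe.updated@example.com",
     "is_active": False, "created_day": 28, "updated_at": "t1"},
    {"user_id": 6, "username": "marketing_user", "email": "marketing@company.com",
     "is_active": True, "created_day": 28, "updated_at": "t1"},
]

merged = upsert(target, source)
print(merged[1]["username"], merged[1]["created_day"])  # john_doe_updated 27
print(sorted(merged))                                   # [1, 6]
```

Note that `created_day` for user 1 stays at 27: columns left out of the update list keep their existing values.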

In [4]:
# Create sample data for basic upsert
# This will update existing users and insert new ones
current_timestamp = datetime.now()
current_year = current_timestamp.year
current_month = current_timestamp.month
current_day = current_timestamp.day

# Define changes:
# - User 1: Update username and email, and deactivate the account
# - User 3: Update username and email, and activate the account
# - User 6: New user insertion
# - User 7: Another new user
basic_upsert_data = [
    {
        'user_id': 1, 
        'username': 'john_doe_updated', 
        'email': 'john.doe.updated@example.com', 
        'is_active': False, 
        'created_year': current_year, 
        'created_month': current_month, 
        'created_day': current_day, 
        'updated_at': current_timestamp
    },
    {
        'user_id': 3, 
        'username': 'alice_wonder_active', 
        'email': 'alice.wonder.active@example.com', 
        'is_active': True, 
        'created_year': current_year, 
        'created_month': current_month, 
        'created_day': current_day, 
        'updated_at': current_timestamp
    },
    {
        'user_id': 6, 
        'username': 'marketing_user', 
        'email': 'marketing@company.com', 
        'is_active': True, 
        'created_year': current_year, 
        'created_month': current_month, 
        'created_day': current_day, 
        'updated_at': current_timestamp
    },
    {
        'user_id': 7, 
        'username': 'support_user', 
        'email': 'support@company.com', 
        'is_active': True, 
        'created_year': current_year, 
        'created_month': current_month, 
        'created_day': current_day, 
        'updated_at': current_timestamp
    }
]

# Create DataFrame and temporary view
updates_df = spark.createDataFrame(basic_upsert_data, spark.table("users").schema)
updates_df.createOrReplaceTempView("user_updates")

print("Data to be upserted:")
updates_df.show(truncate=False)

Data to be upserted:
+-------+-------------------+-------------------------------+---------+------------+-------------+-----------+--------------------------+
|user_id|username           |email                          |is_active|created_year|created_month|created_day|updated_at                |
+-------+-------------------+-------------------------------+---------+------------+-------------+-----------+--------------------------+
|1      |john_doe_updated   |john.doe.updated@example.com   |false    |2025        |6            |27         |2025-06-27 06:53:17.799054|
|3      |alice_wonder_active|alice.wonder.active@example.com|true     |2025        |6            |27         |2025-06-27 06:53:17.799054|
|6      |marketing_user     |marketing@company.com          |true     |2025        |6            |27         |2025-06-27 06:53:17.799054|
|7      |support_user       |support@company.com            |true     |2025        |6            |27         |2025-06-27 06:53:17.799054|
+-------+-------------------+-----

In [5]:
# Execute basic MERGE INTO operation
merge_start_time = time.time()

spark.sql("""
    MERGE INTO users AS target
    USING user_updates AS source
    ON target.user_id = source.user_id
    WHEN MATCHED THEN
        UPDATE SET
            target.username = source.username,
            target.email = source.email,
            target.is_active = source.is_active,
            target.updated_at = source.updated_at
    WHEN NOT MATCHED THEN
        INSERT (
            user_id, username, email, is_active, 
            created_year, created_month, created_day, updated_at
        )
        VALUES (
            source.user_id, source.username, source.email, source.is_active,
            source.created_year, source.created_month, source.created_day, source.updated_at
        )
""")

merge_duration = time.time() - merge_start_time
print(f"Basic merge operation completed in {merge_duration:.3f} seconds")

                                                                                

Basic merge operation completed in 13.237 seconds
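
The same MERGE shape recurs throughout this notebook with only the source view changing. One possible way to avoid retyping the column lists is to generate the statement from Python. The helper below, `build_merge_sql`, is a hypothetical convenience function (not part of Spark or Iceberg) and only builds a string; identifiers are interpolated without escaping, so it should only be used with trusted table and column names.

```python
def build_merge_sql(target, source, key, update_cols, insert_cols):
    """Build an upsert-style MERGE INTO statement as a string (hypothetical helper)."""
    set_clause = ",\n        ".join(f"target.{c} = source.{c}" for c in update_cols)
    insert_list = ", ".join(insert_cols)
    values_list = ", ".join(f"source.{c}" for c in insert_cols)
    return (
        f"MERGE INTO {target} AS target\n"
        f"USING {source} AS source\n"
        f"ON target.{key} = source.{key}\n"
        f"WHEN MATCHED THEN\n"
        f"    UPDATE SET\n"
        f"        {set_clause}\n"
        f"WHEN NOT MATCHED THEN\n"
        f"    INSERT ({insert_list})\n"
        f"    VALUES ({values_list})"
    )

all_cols = ["user_id", "username", "email", "is_active",
            "created_year", "created_month", "created_day", "updated_at"]
update_cols = ["username", "email", "is_active", "updated_at"]

sql = build_merge_sql("users", "user_updates", "user_id", update_cols, all_cols)
print(sql)
# In the notebook's Spark session this string could then be run with: spark.sql(sql)
```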


In [6]:
# Verify the merge results
print("Table state after basic merge:")
result_df = spark.sql("SELECT * FROM users ORDER BY user_id")
result_df.show(truncate=False)

# Show statistics after merge
print("\nPost-merge statistics:")
spark.sql("""
    SELECT 
        COUNT(*) as total_records,
        COUNT(CASE WHEN is_active = true THEN 1 END) as active_users,
        COUNT(CASE WHEN is_active = false THEN 1 END) as inactive_users
    FROM users
""").show()

Table state after basic merge:
+-------+-------------------+-------------------------------+---------+------------+-------------+-----------+--------------------------+
|user_id|username           |email                          |is_active|created_year|created_month|created_day|updated_at                |
+-------+-------------------+-------------------------------+---------+------------+-------------+-----------+--------------------------+
|1      |john_doe_updated   |john.doe.updated@example.com   |false    |2025        |6            |27         |2025-06-27 06:53:17.799054|
|2      |jane_smith         |jane.smith@example.com         |true     |2025        |6            |27         |2025-06-27 06:37:58.488064|
|3      |alice_wonder_active|alice.wonder.active@example.com|true     |2025        |6            |27         |2025-06-27 06:53:17.799054|
|4      |bob_builder        |bob.builder@example.com        |true     |2025        |6            |27         |2025-06-27 06:37:58.488064|
|5 

## 4. Conditional Merge Operations

Now let's explore more sophisticated merge patterns with conditional logic.
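
With several `WHEN MATCHED` clauses, only the first clause whose condition holds is applied to a given row. The sketch below models that ordering in plain Python for the login-age rule used in this section. It is an illustration under stated assumptions: `conditional_merge`, `TARGET_COLS`, and the placeholder timestamps are hypothetical, and the `None` branch reflects standard SQL NULL comparison semantics rather than anything shown in this notebook's output.

```python
TARGET_COLS = ("user_id", "username", "email", "is_active", "updated_at")

def conditional_merge(target, source_rows):
    """Apply the WHEN clauses in order; at most one clause runs per source row."""
    result = {uid: dict(row) for uid, row in target.items()}
    for src in source_rows:
        uid = src["user_id"]
        if uid not in result:
            # WHEN NOT MATCHED: insert only the columns the target table has
            result[uid] = {c: src[c] for c in TARGET_COLS}
            continue
        days = src["last_login_days_ago"]
        if days is None:
            # In SQL both comparisons evaluate to NULL, so no MATCHED clause applies
            continue
        if days < 30:
            # First MATCHED clause: full profile update
            for c in ("username", "email", "is_active", "updated_at"):
                result[uid][c] = src[c]
        else:
            # Second MATCHED clause: deactivate only, keep username and email
            result[uid]["is_active"] = False
            result[uid]["updated_at"] = src["updated_at"]
    return result

target = {
    2: {"user_id": 2, "username": "jane_smith", "email": "jane.smith@example.com",
        "is_active": True, "updated_at": "t0"},
    4: {"user_id": 4, "username": "bob_builder", "email": "bob.builder@example.com",
        "is_active": True, "updated_at": "t0"},
}
source = [
    {"user_id": 2, "username": "jane_smith_premium", "email": "jane.smith.premium@example.com",
     "is_active": True, "updated_at": "t1", "last_login_days_ago": 5},
    {"user_id": 4, "username": "bob_builder_inactive", "email": "bob.builder@example.com",
     "is_active": False, "updated_at": "t1", "last_login_days_ago": 45},
    {"user_id": 8, "username": "new_premium_user", "email": "premium@company.com",
     "is_active": True, "updated_at": "t1", "last_login_days_ago": 0},
]

merged = conditional_merge(target, source)
print(merged[2]["username"])                          # jane_smith_premium
print(merged[4]["username"], merged[4]["is_active"])  # bob_builder False
print("last_login_days_ago" in merged[8])             # False
```

User 4 keeps the username `bob_builder` because the second clause never touches `username`, even though the source row carries `bob_builder_inactive`.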

In [7]:
# Create data for conditional merge example
# This demonstrates selective updates based on conditions
conditional_data = [
    {
        'user_id': 2,
        'username': 'jane_smith_premium',
        'email': 'jane.smith.premium@example.com',
        'is_active': True,
        'created_year': 2025,
        'created_month': 6,
        'created_day': 27,
        'updated_at': datetime.now(),
        'last_login_days_ago': 5  # Additional field for conditional logic
    },
    {
        'user_id': 4,
        'username': 'bob_builder_inactive',
        'email': 'bob.builder@example.com',
        'is_active': False,
        'created_year': 2025,
        'created_month': 6,
        'created_day': 27,
        'updated_at': datetime.now(),
        'last_login_days_ago': 45  # Inactive due to long absence
    },
    {
        'user_id': 8,
        'username': 'new_premium_user',
        'email': 'premium@company.com',
        'is_active': True,
        'created_year': 2025,
        'created_month': 6,
        'created_day': 27,
        'updated_at': datetime.now(),
        'last_login_days_ago': 0
    }
]

# Create schema that includes the additional field
extended_schema = StructType([
    StructField("user_id", LongType(), False),
    StructField("username", StringType(), False),
    StructField("email", StringType(), False),
    StructField("is_active", BooleanType(), False),
    StructField("created_year", IntegerType(), False),
    StructField("created_month", IntegerType(), False),
    StructField("created_day", IntegerType(), False),
    StructField("updated_at", TimestampType(), False),
    StructField("last_login_days_ago", IntegerType(), True)
])

conditional_df = spark.createDataFrame(conditional_data, extended_schema)
conditional_df.createOrReplaceTempView("conditional_updates")

print("Conditional update data:")
conditional_df.show()

Conditional update data:
+-------+--------------------+--------------------+---------+------------+-------------+-----------+--------------------+-------------------+
|user_id|            username|               email|is_active|created_year|created_month|created_day|          updated_at|last_login_days_ago|
+-------+--------------------+--------------------+---------+------------+-------------+-----------+--------------------+-------------------+
|      2|  jane_smith_premium|jane.smith.premiu...|     true|        2025|            6|         27|2025-06-27 06:54:...|                  5|
|      4|bob_builder_inactive|bob.builder@examp...|    false|        2025|            6|         27|2025-06-27 06:54:...|                 45|
|      8|    new_premium_user| premium@company.com|     true|        2025|            6|         27|2025-06-27 06:54:...|                  0|
+-------+--------------------+--------------------+---------+------------+-------------+-----------+--------------------+-------------------+



In [8]:
# Execute conditional merge with multiple WHEN clauses
spark.sql("""
    MERGE INTO users AS target
    USING conditional_updates AS source
    ON target.user_id = source.user_id
    WHEN MATCHED AND source.last_login_days_ago < 30 THEN
        UPDATE SET
            target.username = source.username,
            target.email = source.email,
            target.is_active = source.is_active,
            target.updated_at = source.updated_at
    WHEN MATCHED AND source.last_login_days_ago >= 30 THEN
        UPDATE SET
            target.is_active = false,
            target.updated_at = source.updated_at
    WHEN NOT MATCHED THEN
        INSERT (
            user_id, username, email, is_active,
            created_year, created_month, created_day, updated_at
        )
        VALUES (
            source.user_id, source.username, source.email, source.is_active,
            source.created_year, source.created_month, source.created_day, source.updated_at
        )
""")

print("Conditional merge completed")

                                                                                

Conditional merge completed


In [9]:
# Verify conditional merge results
print("Results after conditional merge:")
spark.sql("""
    SELECT 
        user_id, username, email, is_active, updated_at,
        CASE 
            WHEN user_id IN (2, 4, 8) THEN 'Modified by conditional merge'
            ELSE 'Unchanged'
        END as merge_status
    FROM users 
    ORDER BY user_id
""").show(truncate=False)

print("\nNote: User 4 should be deactivated due to last_login_days_ago >= 30")

Results after conditional merge:
+-------+-------------------+-------------------------------+---------+--------------------------+-----------------------------+
|user_id|username           |email                          |is_active|updated_at                |merge_status                 |
+-------+-------------------+-------------------------------+---------+--------------------------+-----------------------------+
|1      |john_doe_updated   |john.doe.updated@example.com   |false    |2025-06-27 06:53:17.799054|Unchanged                    |
|2      |jane_smith_premium |jane.smith.premium@example.com |true     |2025-06-27 06:54:07.253577|Modified by conditional merge|
|3      |alice_wonder_active|alice.wonder.active@example.com|true     |2025-06-27 06:53:17.799054|Unchanged                    |
|4      |bob_builder        |bob.builder@example.com        |false    |2025-06-27 06:54:07.253598|Modified by conditional merge|
|5      |charlie_brown      |charlie.brown@example.com      |tru

## 5. Bulk Upsert Performance Optimization

Let's demonstrate handling larger datasets and optimization techniques.

In [10]:
# Generate bulk data for performance testing
import random
from datetime import datetime, timedelta

def generate_bulk_users(start_id, count):
    """Generate bulk user data for testing."""
    bulk_data = []
    base_time = datetime.now()
    
    for i in range(count):
        user_id = start_id + i
        bulk_data.append({
            'user_id': user_id,
            'username': f'bulk_user_{user_id}',
            'email': f'bulk.user.{user_id}@example.com',
            'is_active': random.choice([True, False]),
            'created_year': 2025,
            'created_month': 6,
            'created_day': 27,
            'updated_at': base_time + timedelta(seconds=i)
        })
    
    return bulk_data

# Generate 1000 new users and 7 updates to existing users
new_users = generate_bulk_users(100, 1000)
existing_updates = []

# Update existing users 1-7 (range(1, 8) excludes 8, so user 8 is left unchanged)
for user_id in range(1, 8):
    existing_updates.append({
        'user_id': user_id,
        'username': f'updated_user_{user_id}',
        'email': f'updated.user.{user_id}@example.com',
        'is_active': True,
        'created_year': 2025,
        'created_month': 6,
        'created_day': 27,
        'updated_at': datetime.now()
    })

# Combine all data
bulk_upsert_data = new_users + existing_updates
print(f"Generated {len(bulk_upsert_data)} records for bulk upsert")
print(f"- New users: {len(new_users)}")
print(f"- Updated existing users: {len(existing_updates)}")

Generated 1007 records for bulk upsert
- New users: 1000
- Updated existing users: 7


In [11]:
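# Create DataFrame using the target table's schema
bulk_df = spark.createDataFrame(bulk_upsert_data, spark.table("users").schema)

# Repartition the source by the date columns and cache it for the merge.
# Note: every generated row shares the same (created_year, created_month),
# so all rows hash to a single partition even though 200 partitions are reported.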
bulk_df_optimized = bulk_df \
    .repartition(col("created_year"), col("created_month")) \
    .cache()

bulk_df_optimized.createOrReplaceTempView("bulk_updates")

print("Bulk DataFrame created and optimized")
print(f"Partitions: {bulk_df_optimized.rdd.getNumPartitions()}")

# Show sample of bulk data
print("\nSample of bulk update data:")
bulk_df_optimized.limit(10).show()

Bulk DataFrame created and optimized
Partitions: 200

Sample of bulk update data:
+-------+-------------+--------------------+---------+------------+-------------+-----------+--------------------+
|user_id|     username|               email|is_active|created_year|created_month|created_day|          updated_at|
+-------+-------------+--------------------+---------+------------+-------------+-----------+--------------------+
|    100|bulk_user_100|bulk.user.100@exa...|     true|        2025|            6|         27|2025-06-27 06:54:...|
|    101|bulk_user_101|bulk.user.101@exa...|    false|        2025|            6|         27|2025-06-27 06:54:...|
|    102|bulk_user_102|bulk.user.102@exa...|    false|        2025|            6|         27|2025-06-27 06:54:...|
|    103|bulk_user_103|bulk.user.103@exa...|    false|        2025|            6|         27|2025-06-27 06:54:...|
|    104|bulk_user_104|bulk.user.104@exa...|     true|        2025|            6|         27|2025-06-27 06:54:...|
|    105|bulk_user_105|bulk.user.105@exa...|    false|        2025|            6

In [12]:
# Execute bulk merge with performance monitoring
bulk_start_time = time.time()

# Ensure adaptive query execution (AQE) is enabled (already requested when the session was built)
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")

merge_result = spark.sql("""
    MERGE INTO users AS target
    USING bulk_updates AS source
    ON target.user_id = source.user_id
    WHEN MATCHED THEN
        UPDATE SET
            target.username = source.username,
            target.email = source.email,
            target.is_active = source.is_active,
            target.updated_at = source.updated_at
    WHEN NOT MATCHED THEN
        INSERT (
            user_id, username, email, is_active,
            created_year, created_month, created_day, updated_at
        )
        VALUES (
            source.user_id, source.username, source.email, source.is_active,
            source.created_year, source.created_month, source.created_day, source.updated_at
        )
""")

bulk_duration = time.time() - bulk_start_time
print(f"Bulk merge operation completed in {bulk_duration:.3f} seconds")
print(f"Throughput: {len(bulk_upsert_data) / bulk_duration:.0f} records/second")

                                                                                

Bulk merge operation completed in 4.492 seconds
Throughput: 224 records/second
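
The bulk source above was repartitioned by `(created_year, created_month)` and reported 200 partitions, yet every generated row carries the same year and month. Under hash partitioning, identical keys always land in the same partition, so the row distribution is worth checking. The sketch below illustrates the effect in plain Python. It uses Python's built-in `hash` as a stand-in for Spark's hash function, so the partition numbers differ from Spark's, but the skew does not depend on the hash used. The `partition_sizes` helper is hypothetical.

```python
from collections import Counter

def partition_sizes(rows, key_fn, num_partitions):
    """Count rows per partition under hash partitioning (Python hash as a stand-in)."""
    return Counter(hash(key_fn(r)) % num_partitions for r in rows)

# Same shape as the generated bulk data: 1000 users, one (year, month) value
rows = [{"user_id": 100 + i, "created_year": 2025, "created_month": 6}
        for i in range(1000)]

by_month = partition_sizes(rows, lambda r: (r["created_year"], r["created_month"]), 200)
by_user = partition_sizes(rows, lambda r: r["user_id"], 200)

print(len(by_month), max(by_month.values()))  # 1 1000
print(len(by_user), max(by_user.values()))    # 200 5
```

This is not a recommendation to repartition by `user_id`. Whether a different distribution helps depends on the table's partition spec and on how the write is distributed. The point is only that a partition count of 200 says nothing about how evenly the rows are spread.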


In [13]:
# Verify bulk merge results
final_stats = spark.sql("""
    SELECT 
        COUNT(*) as total_records,
        COUNT(CASE WHEN is_active = true THEN 1 END) as active_users,
        COUNT(CASE WHEN is_active = false THEN 1 END) as inactive_users,
        MIN(user_id) as min_user_id,
        MAX(user_id) as max_user_id,
        COUNT(CASE WHEN user_id >= 100 THEN 1 END) as bulk_inserted_users
    FROM users
""")

print("Final table statistics:")
final_stats.show()

# Show sample of recently added users
print("\nSample of bulk-inserted users:")
spark.sql("""
    SELECT user_id, username, email, is_active 
    FROM users 
    WHERE user_id BETWEEN 100 AND 109 
    ORDER BY user_id
""").show()

Final table statistics:
+-------------+------------+--------------+-----------+-----------+-------------------+
|total_records|active_users|inactive_users|min_user_id|max_user_id|bulk_inserted_users|
+-------------+------------+--------------+-----------+-----------+-------------------+
|         1008|         518|           490|          1|       1099|               1000|
+-------------+------------+--------------+-----------+-----------+-------------------+


Sample of bulk-inserted users:
+-------+-------------+--------------------+---------+
|user_id|     username|               email|is_active|
+-------+-------------+--------------------+---------+
|    100|bulk_user_100|bulk.user.100@exa...|     true|
|    101|bulk_user_101|bulk.user.101@exa...|    false|
|    102|bulk_user_102|bulk.user.102@exa...|    false|
|    103|bulk_user_103|bulk.user.103@exa...|    false|
|    104|bulk_user_104|bulk.user.104@exa...|     true|
|    105|bulk_user_105|bulk.user.105@exa...|    false|
|    106

## 6. Delete Operations with MERGE

Iceberg also supports DELETE operations within MERGE statements for data cleanup.

In [14]:
# Create a cleanup dataset - users to be deleted or deactivated
cleanup_data = [
    {'user_id': 105, 'action': 'delete'},
    {'user_id': 110, 'action': 'delete'},
    {'user_id': 115, 'action': 'deactivate'},
    {'user_id': 120, 'action': 'deactivate'}
]

cleanup_schema = StructType([
    StructField("user_id", LongType(), False),
    StructField("action", StringType(), False)
])

cleanup_df = spark.createDataFrame(cleanup_data, cleanup_schema)
cleanup_df.createOrReplaceTempView("cleanup_actions")

print("Cleanup actions to be performed:")
cleanup_df.show()

# Check current state of these users
print("\nCurrent state of affected users:")
spark.sql("""
    SELECT user_id, username, email, is_active 
    FROM users 
    WHERE user_id IN (105, 110, 115, 120)
    ORDER BY user_id
""").show()

Cleanup actions to be performed:
+-------+----------+
|user_id|    action|
+-------+----------+
|    105|    delete|
|    110|    delete|
|    115|deactivate|
|    120|deactivate|
+-------+----------+


Current state of affected users:
+-------+-------------+--------------------+---------+
|user_id|     username|               email|is_active|
+-------+-------------+--------------------+---------+
|    105|bulk_user_105|bulk.user.105@exa...|    false|
|    110|bulk_user_110|bulk.user.110@exa...|     true|
|    115|bulk_user_115|bulk.user.115@exa...|    false|
|    120|bulk_user_120|bulk.user.120@exa...|     true|
+-------+-------------+--------------------+---------+



In [15]:
# Execute conditional delete/update merge
spark.sql("""
    MERGE INTO users AS target
    USING cleanup_actions AS source
    ON target.user_id = source.user_id
    WHEN MATCHED AND source.action = 'delete' THEN
        DELETE
    WHEN MATCHED AND source.action = 'deactivate' THEN
        UPDATE SET
            target.is_active = false,
            target.updated_at = current_timestamp()
""")

print("Cleanup merge operation completed")

Cleanup merge operation completed


In [16]:
# Verify cleanup results
print("Verification of cleanup actions:")

# Check for deleted users (should return no results)
deleted_check = spark.sql("""
    SELECT user_id, username 
    FROM users 
    WHERE user_id IN (105, 110)
""")

deleted_count = deleted_check.count()  # count once to avoid running the query twice
print(f"Deleted users found: {deleted_count}")
if deleted_count > 0:
    deleted_check.show()

# Check deactivated users
print("\nDeactivated users:")
spark.sql("""
    SELECT user_id, username, is_active, updated_at 
    FROM users 
    WHERE user_id IN (115, 120)
    ORDER BY user_id
""").show()

# Final count
print("\nFinal record count:")
spark.sql("SELECT COUNT(*) as total_users FROM users").show()

Verification of cleanup actions:
Deleted users found: 0

Deactivated users:
+-------+-------------+---------+--------------------+
|user_id|     username|is_active|          updated_at|
+-------+-------------+---------+--------------------+
|    115|bulk_user_115|    false|2025-06-27 06:55:...|
|    120|bulk_user_120|    false|2025-06-27 06:55:...|
+-------+-------------+---------+--------------------+


Final record count:
+-----------+
|total_users|
+-----------+
|       1006|
+-----------+
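
The record counts reported so far can be reconciled by tracking only the set of `user_id` values each merge touches. The sketch below replays the notebook's steps in plain Python. Updates and deactivations do not change the set, inserts add to it, and deletes remove from it. The `steps` list is a hand-written summary of the cells above, not something read from the table.

```python
# (label, operation, user_ids present in the merge source)
steps = [
    ("initial table (section 2)", "add", set(range(1, 6))),
    ("basic merge (section 3)", "add", {1, 3, 6, 7}),
    ("conditional merge (section 4)", "add", {2, 4, 8}),
    ("bulk merge (section 5)", "add", set(range(100, 1100)) | set(range(1, 8))),
    ("cleanup merge, deletes only (section 6)", "remove", {105, 110}),
]

ids = set()
counts = []
for label, op, keys in steps:
    # Adding an id that already exists models an update: the row count is unchanged
    ids = ids | keys if op == "add" else ids - keys
    counts.append(len(ids))
    print(f"{label}: {len(ids)} rows")

bulk_rows = sum(1 for i in ids if i >= 100)
print(counts)     # [5, 7, 8, 1008, 1006]
print(bulk_rows)  # 998
```

The intermediate values 1008 and 1006 match the totals shown by the verification queries in sections 5 and 6.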



## 7. Best Practices and Performance Optimization

Let's explore key best practices for efficient upsert operations.

In [17]:
# Demonstrate table maintenance for optimal performance
print("Table maintenance operations:")

# Show current table metadata
print("\n1. Current table files:")
files_info = spark.sql("SELECT COUNT(*) as file_count FROM rest.`play-iceberg`.users.files")
files_info.show()

# Show table partitions
print("\n2. Partition information:")
partition_info = spark.sql("""
    SELECT 
        created_year, created_month, 
        COUNT(*) as record_count
    FROM users 
    GROUP BY created_year, created_month 
    ORDER BY created_year, created_month
""")
partition_info.show()

# Print maintenance recommendations (compaction itself is not executed here)
print("\n3. Table optimization recommendations:")
print("- Use MERGE operations during low-traffic periods")
print("- Partition source data to match target partitioning")
print("- Use appropriate cluster sizes for bulk operations")
print("- Monitor file counts and perform compaction when needed")
print("- Consider Iceberg's rewrite_data_files procedure for compaction")

Table maintenance operations:

1. Current table files:
+----------+
|file_count|
+----------+
|         1|
+----------+


2. Partition information:
+------------+-------------+------------+
|created_year|created_month|record_count|
+------------+-------------+------------+
|        2025|            6|        1006|
+------------+-------------+------------+


3. Table optimization recommendations:
- Use MERGE operations during low-traffic periods
- Partition source data to match target partitioning
- Use appropriate cluster sizes for bulk operations
- Monitor file counts and perform compaction when needed
- Consider Iceberg's rewrite_data_files procedure for compaction
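
For Iceberg tables, compaction is usually run through the `rewrite_data_files` Spark procedure, called as `CALL <catalog>.system.rewrite_data_files(...)`. The sketch below only builds such a statement as a string. The helper name `rewrite_data_files_sql` is hypothetical, and the backtick quoting of the hyphenated `play-iceberg` namespace inside the `table` argument is an assumption that may need adjusting for your catalog.

```python
def rewrite_data_files_sql(catalog, table, target_file_size_bytes=None):
    """Build a CALL statement for Iceberg's rewrite_data_files procedure (string only)."""
    args = [f"table => '{table}'"]
    if target_file_size_bytes is not None:
        # 'target-file-size-bytes' is an option of the rewrite action
        size = int(target_file_size_bytes)
        args.append(f"options => map('target-file-size-bytes', '{size}')")
    arg_list = ", ".join(args)
    return f"CALL {catalog}.system.rewrite_data_files({arg_list})"

# Assumption: catalog 'rest' and namespace 'play-iceberg' as used in this notebook
sql = rewrite_data_files_sql("rest", "`play-iceberg`.users", 128 * 1024 * 1024)
print(sql)
# In the notebook's Spark session this could be run with: spark.sql(sql).show()
```

With a single data file, as reported above, compaction would have little to do. It becomes relevant once repeated merges have produced many small files.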


## 8. Advanced Merge Patterns

This section validates source data before merging, so that malformed records are filtered out before they reach the target table.

In [18]:
# Demonstrate merge with data validation
# Create source data with some invalid records
validation_data = [
    {
        'user_id': 1001,
        'username': 'valid_user',
        'email': 'valid@example.com',
        'is_active': True,
        'created_year': 2025,
        'created_month': 6,
        'created_day': 27,
        'updated_at': datetime.now()
    },
    {
        'user_id': 1002,
        'username': '',  # Invalid: empty username
        'email': 'invalid@example.com',
        'is_active': True,
        'created_year': 2025,
        'created_month': 6,
        'created_day': 27,
        'updated_at': datetime.now()
    },
    {
        'user_id': 1003,
        'username': 'another_valid_user',
        'email': 'not-an-email',  # Invalid: bad email format
        'is_active': True,
        'created_year': 2025,
        'created_month': 6,
        'created_day': 27,
        'updated_at': datetime.now()
    }
]

validation_df = spark.createDataFrame(validation_data, spark.table("users").schema)

# Add validation logic
validated_df = validation_df.filter(
    (col("username") != "") & 
    (col("username").isNotNull()) &
    (col("email").contains("@")) &
    (col("email").contains("."))
)

validated_df.createOrReplaceTempView("validated_updates")

original_count = validation_df.count()
valid_count = validated_df.count()
print(f"Original records: {original_count}")
print(f"Valid records: {valid_count}")
print(f"Invalid records filtered out: {original_count - valid_count}")

print("\nValid records to be merged:")
validated_df.show()

Original records: 3
Valid records: 1
Invalid records filtered out: 2

Valid records to be merged:
+-------+----------+-----------------+---------+------------+-------------+-----------+--------------------+
|user_id|  username|            email|is_active|created_year|created_month|created_day|          updated_at|
+-------+----------+-----------------+---------+------------+-------------+-----------+--------------------+
|   1001|valid_user|valid@example.com|     true|        2025|            6|         27|2025-06-27 06:56:...|
+-------+----------+-----------------+---------+------------+-------------+-----------+--------------------+
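
The email filter above only checks that the string contains `@` and `.` somewhere, which also accepts values such as `a.b@`. A slightly stricter pattern is sketched below in plain Python. The regular expression is one illustrative choice, not a full email validator, and the names `EMAIL_RE`, `loose_check`, and `strict_check` are hypothetical.

```python
import re

# Requires: something before '@', something after it, and a dot inside the domain part
EMAIL_RE = re.compile(r"^[^@\s]+@[^@\s]+\.[^@\s]+$")

def loose_check(email):
    """Mirror of the notebook's filter: contains '@' and contains '.'."""
    return "@" in email and "." in email

def strict_check(email):
    """Pattern-based check that also constrains where '@' and '.' appear."""
    return bool(EMAIL_RE.match(email))

samples = ["valid@example.com", "not-an-email", "a.b@", "@."]
results = {e: (loose_check(e), strict_check(e)) for e in samples}
for email, (loose, strict) in results.items():
    print(f"{email!r}: loose={loose}, strict={strict}")
```

In PySpark, a pattern like this could be applied with `Column.rlike` in place of the two `contains` calls, assuming the same expression is acceptable to Spark's regex engine.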



In [19]:
# Execute merge with validated data
spark.sql("""
    MERGE INTO users AS target
    USING validated_updates AS source
    ON target.user_id = source.user_id
    WHEN NOT MATCHED THEN
        INSERT (
            user_id, username, email, is_active,
            created_year, created_month, created_day, updated_at
        )
        VALUES (
            source.user_id, source.username, source.email, source.is_active,
            source.created_year, source.created_month, source.created_day, source.updated_at
        )
""")

print("Validated merge completed")

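# Inspect IDs 1001-1003.
# Note: the bulk merge in section 5 already created user_ids 100-1099, so user 1001
# matches an existing row and this insert-only MERGE adds nothing. The rows shown
# below are the earlier bulk_user_* records, not the validated data.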
spark.sql("""
    SELECT user_id, username, email 
    FROM users 
    WHERE user_id IN (1001, 1002, 1003)
    ORDER BY user_id
""").show()

Validated merge completed
+-------+--------------+--------------------+
|user_id|      username|               email|
+-------+--------------+--------------------+
|   1001|bulk_user_1001|bulk.user.1001@ex...|
|   1002|bulk_user_1002|bulk.user.1002@ex...|
|   1003|bulk_user_1003|bulk.user.1003@ex...|
+-------+--------------+--------------------+
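
The output above shows `bulk_user_*` rows rather than `valid_user`: the IDs chosen for the validation example were already taken by the bulk merge, and a MERGE with only a `WHEN NOT MATCHED` clause leaves matched rows untouched. Checking for key collisions before an insert-only merge makes this visible. The sketch below does that with plain Python sets. `split_by_existing` is a hypothetical helper, and `existing_ids` is reconstructed by hand from the earlier sections rather than read from the table.

```python
def split_by_existing(source_ids, existing_ids):
    """Split source keys into those already in the target and those that would be inserted."""
    collisions = sorted(i for i in source_ids if i in existing_ids)
    to_insert = sorted(i for i in source_ids if i not in existing_ids)
    return collisions, to_insert

# Target keys after sections 2-6: users 1-8, bulk users 100-1099, minus deleted 105 and 110
existing_ids = (set(range(1, 9)) | set(range(100, 1100))) - {105, 110}

# Only user 1001 survived validation
collisions, to_insert = split_by_existing([1001], existing_ids)
print(collisions)  # [1001]
print(to_insert)   # []

# Keys outside the existing range would be inserted
print(split_by_existing([1001, 2001], existing_ids))  # ([1001], [2001])
```

In Spark, a comparable check could be done by joining the source view to `users` on `user_id` before running the merge, for example with a `left_anti` join to list the rows that would actually be inserted.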



## 9. Summary and Cleanup

Let's summarize what we've learned and clean up our session.

In [20]:
# Final table summary
print("=== UPSERT OPERATIONS TUTORIAL SUMMARY ===")
print()

final_summary = spark.sql("""
    SELECT 
        'Total Records' as metric,
        CAST(COUNT(*) as STRING) as value
    FROM users
    
    UNION ALL
    
    SELECT 
        'Active Users' as metric,
        CAST(COUNT(*) as STRING) as value
    FROM users WHERE is_active = true
    
    UNION ALL
    
    SELECT 
        'Inactive Users' as metric,
        CAST(COUNT(*) as STRING) as value
    FROM users WHERE is_active = false
    
    UNION ALL
    
    SELECT 
        'Bulk Inserted Users (>= 100)' as metric,
        CAST(COUNT(*) as STRING) as value
    FROM users WHERE user_id >= 100
    
    ORDER BY metric
""")

final_summary.show(truncate=False)

print("\nKey Concepts Covered:")
print("1. Basic MERGE INTO operations")
print("2. Conditional merge logic with multiple WHEN clauses")
print("3. Bulk upsert performance optimization")
print("4. DELETE operations within MERGE statements")
print("5. Data validation before merge")
print("6. Performance monitoring and best practices")

print("\nBest Practices Demonstrated:")
print("- Partition alignment for performance")
print("- Data validation and quality checks")
print("- Conditional merge logic for complex scenarios")
print("- Performance monitoring and optimization")
print("- Proper error handling and edge cases")

=== UPSERT OPERATIONS TUTORIAL SUMMARY ===

+----------------------------+-----+
|metric                      |value|
+----------------------------+-----+
|Active Users                |516  |
|Bulk Inserted Users (>= 100)|998  |
|Inactive Users              |490  |
|Total Records               |1006 |
+----------------------------+-----+


Key Concepts Covered:
1. Basic MERGE INTO operations
2. Conditional merge logic with multiple WHEN clauses
3. Bulk upsert performance optimization
4. DELETE operations within MERGE statements
5. Data validation before merge
6. Performance monitoring and best practices

Best Practices Demonstrated:
- Partition alignment for performance
- Data validation and quality checks
- Conditional merge logic for complex scenarios
- Performance monitoring and optimization
- Proper error handling and edge cases


In [21]:
# Clean up cached DataFrames and temporary views
try:
    spark.catalog.dropTempView("user_updates")
    spark.catalog.dropTempView("conditional_updates")
    spark.catalog.dropTempView("bulk_updates")
    spark.catalog.dropTempView("cleanup_actions")
    spark.catalog.dropTempView("validated_updates")
    
    # Unpersist cached DataFrames
    if 'bulk_df_optimized' in locals():
        bulk_df_optimized.unpersist()
    
    print("Session cleanup completed successfully")
    
except Exception as e:
    print(f"Cleanup warning: {e}")

print(f"\nSpark session active: {spark.sparkContext.getConf().get('spark.app.name')}")
print("Tutorial completed successfully!")

Session cleanup completed successfully

Spark session active: PySparkShell
Tutorial completed successfully!
