# R2 Clickstream Data to Iceberg

This notebook reads clickstream data from R2 for a specific day and hour, then appends it to an Iceberg table.

In [None]:
# Import required libraries
import os
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
import pyarrow.dataset as ds
import sys
import json
from datetime import datetime
# Install Boto3 if not already installed
!{sys.executable} -m pip install boto3
import boto3
# Install duckdb if not already installed
!{sys.executable} -m pip install duckdb
import duckdb
# Install PyIceberg if not already installed
!{sys.executable} -m pip install pyiceberg
from pyiceberg.catalog.rest import RestCatalog
from pyiceberg.exceptions import NamespaceAlreadyExistsError, TableAlreadyExistsError

## Configure Environment

Read the R2 and Iceberg catalog connection settings from environment variables.

In [None]:
# R2 connection settings (credentials are read from the environment, never hard-coded)
r2_access_key = os.environ.get("R2_ACCESS_KEY_ID")
r2_secret_key = os.environ.get("R2_SECRET_ACCESS_KEY")
r2_endpoint = os.environ.get("R2_ENDPOINT", "https://account-id.r2.cloudflarestorage.com")
r2_bucket = os.environ.get("R2_CLICKSTREAM_BUCKET", "analytics-pipeline")

# Iceberg catalog settings
warehouse = os.environ.get("WAREHOUSE")
token = os.environ.get("TOKEN")
catalog_uri = os.environ.get("CATALOG_URI")

# Show connection information (without secrets)
print(f"R2 Bucket: {r2_bucket}")
print(f"Warehouse: {warehouse}")
print(f"Catalog URI: {catalog_uri}")

# Collect the names of required settings that are unset or empty.
# The reported names are the exact environment variables read above, so the
# warning tells the reader which variable to export.
missing_r2 = [
    name
    for name, value in (
        ("R2_ACCESS_KEY_ID", r2_access_key),
        ("R2_SECRET_ACCESS_KEY", r2_secret_key),
    )
    if not value
]

missing_iceberg = [
    name
    for name, value in (
        ("WAREHOUSE", warehouse),
        ("TOKEN", token),
        ("CATALOG_URI", catalog_uri),
    )
    if not value
]

if missing_r2:
    print(f"Warning: Missing required R2 credentials: {', '.join(missing_r2)}")
if missing_iceberg:
    print(f"Warning: Missing required Iceberg credentials: {', '.join(missing_iceberg)}")
if not os.environ.get("R2_ENDPOINT"):
    # The fallback endpoint contains the literal text "account-id" and is only a placeholder
    print(f"Warning: R2_ENDPOINT is not set; using placeholder endpoint {r2_endpoint}")

## Connect to R2 and Iceberg

Establish connections to R2 Storage and Iceberg catalog.

In [None]:
# --- R2 client -------------------------------------------------------------
# s3_client stays None whenever credentials are missing or client creation fails,
# so later cells can test it before touching R2.
s3_client = None
if r2_access_key and r2_secret_key:
    try:
        s3_client = boto3.client(
            's3',
            aws_access_key_id=r2_access_key,
            aws_secret_access_key=r2_secret_key,
            endpoint_url=r2_endpoint
        )
        print("Connected to R2 successfully!")
    except Exception as r2_error:
        print(f"R2 connection failed: {str(r2_error)}")
        s3_client = None
else:
    print("Cannot connect to R2 - missing required credentials")

# --- Iceberg REST catalog --------------------------------------------------
# Same convention: catalog is None unless the catalog object was built successfully.
catalog = None
if warehouse and token and catalog_uri:
    try:
        catalog = RestCatalog(
            name="my_catalog",
            warehouse=warehouse,
            uri=catalog_uri,
            token=token,
        )
        print("Connected to Iceberg catalog successfully!")
    except Exception as catalog_error:
        print(f"Iceberg connection failed: {str(catalog_error)}")
        catalog = None
else:
    print("Cannot connect to Iceberg catalog - missing required credentials")

## Setup Parameters

Configure which day and hour of data to process.

In [None]:
# Target Iceberg table (namespace + table name)
iceberg_namespace = "default"
iceberg_table = "clickstream_events"

# Slice of clickstream data to ingest: one calendar day and one hour of that day
event_date = "2025-05-11"  # Format: YYYY-MM-DD
event_hour = "01"          # Format: HH (24-hour)

# Objects are looked up under key=value path segments: event_date=<day>/hr=<hour>/
s3_prefix = f"event_date={event_date}/hr={event_hour}/"
print(f"Will process data in: {s3_prefix}")

## Create Namespace (if needed)

Ensure the target namespace exists in the Iceberg catalog.

In [None]:
# Make sure the target namespace exists; an already-existing namespace is not an error
if catalog is not None:
    try:
        catalog.create_namespace(iceberg_namespace)
    except NamespaceAlreadyExistsError:
        print(f"'{iceberg_namespace}' namespace already exists")
    except Exception as namespace_error:
        print(f"Error creating namespace: {str(namespace_error)}")
    else:
        # Only reached when create_namespace returned without raising
        print(f"Created '{iceberg_namespace}' namespace")

## List Available S3 Objects

Check what data files are available for the selected day and hour.

In [None]:
# List objects in the specified S3 path
def list_s3_objects(bucket, prefix, client=None):
    """Return every object key stored under ``prefix`` in ``bucket``.

    A single ``list_objects_v2`` call returns at most one page of keys, so the
    listing is driven through a paginator to avoid silently dropping objects
    in large partitions.

    Args:
        bucket: Name of the bucket to list.
        prefix: Key prefix to restrict the listing to.
        client: Optional S3-compatible client. When omitted, the notebook-level
            ``s3_client`` is used.

    Returns:
        A list of key strings (empty when nothing matches) on success.
        The string "Not connected to R2" when no client is available.
        A string starting with "Error listing objects:" when the listing fails.
    """
    active_client = client if client is not None else s3_client
    if active_client is None:
        return "Not connected to R2"

    try:
        paginator = active_client.get_paginator('list_objects_v2')
        keys = []
        for page in paginator.paginate(Bucket=bucket, Prefix=prefix):
            # Pages without matches carry no 'Contents' entry
            keys.extend(obj['Key'] for obj in page.get('Contents', []))
        return keys
    except Exception as e:
        return f"Error listing objects: {str(e)}"

# Fetch the key listing for the selected day/hour and print a short summary
s3_objects = list_s3_objects(r2_bucket, s3_prefix)

if not isinstance(s3_objects, list):
    # Anything that is not a list is a status/error message from list_s3_objects
    print(s3_objects)
else:
    print(f"Found {len(s3_objects)} objects in the specified path")
    if not s3_objects:
        print("No data files found for the specified date and hour")
    else:
        # Preview only the first few keys to keep the cell output short
        preview_limit = 5
        for key in s3_objects[:preview_limit]:
            print(f"- {key}")
        remaining = len(s3_objects) - preview_limit
        if remaining > 0:
            print(f"... and {remaining} more")

## Load Data from R2

Load the clickstream data from R2 with DuckDB and combine it into a single pandas DataFrame.

In [None]:
# Open (creating it if necessary) the on-disk DuckDB database used to read the R2 files
db_file = "/home/jovyan/duckdb_data/clickstream.db"
db_dir = os.path.dirname(db_file)
os.makedirs(db_dir, exist_ok=True)

con = duckdb.connect(db_file)

# Optional tuning: where DuckDB may spill to disk, and its memory ceiling
for pragma in (
    "PRAGMA temp_directory='/home/jovyan/duckdb_data/temp'",
    "PRAGMA memory_limit='100GB'",
):
    con.execute(pragma)

# Function to read S3 files with DuckDB
def read_s3_with_duckdb(bucket, key):
    """Read one R2 object into a pandas DataFrame using the notebook's DuckDB connection.

    The reader is chosen from the key's extension: JSON (optionally gzipped),
    Parquet, or delimited text. Every value interpolated into SQL is rendered
    as an escaped string literal so keys or secrets containing a single quote
    cannot break (or alter) the statement.

    Args:
        bucket: Bucket that holds the object.
        key: Object key; its extension selects the DuckDB reader.

    Returns:
        A ``(dataframe, None)`` tuple on success, or ``(None, message)`` when
        the file type is unsupported or DuckDB raises.
    """
    def _sql_literal(value):
        """Render ``value`` as a single-quoted SQL string literal, doubling embedded quotes."""
        return "'" + str(value).replace("'", "''") + "'"

    try:
        # The endpoint is passed to DuckDB without its URL scheme or a trailing slash
        endpoint_host = r2_endpoint.split('://', 1)[-1].rstrip('/')

        # Set up AWS S3 credentials for DuckDB
        con.execute(f"""
            SET s3_region='auto';
            SET s3_endpoint={_sql_literal(endpoint_host)};
            SET s3_access_key_id={_sql_literal(r2_access_key)};
            SET s3_secret_access_key={_sql_literal(r2_secret_key)};
        """)

        # Create S3 URL
        s3_url = f"s3://{bucket}/{key}"
        print(f"Reading {s3_url}")
        url_literal = _sql_literal(s3_url)

        # Use DuckDB to read the file based on extension
        json_suffixes = ('.json', '.jsonl', '.ndjson', '.json.gz', '.jsonl.gz', '.ndjson.gz')
        if key.endswith(json_suffixes):
            # For JSON files, plain or gzip-compressed
            query = f"""
                SELECT * FROM read_json_auto({url_literal}, 
                    filename=true, 
                    maximum_object_size=3000000000,
                    ignore_errors=true)
            """
        elif key.endswith('.parquet'):
            # For Parquet files
            query = f"SELECT * FROM read_parquet({url_literal})"
        elif key.endswith(('.csv', '.tsv', '.txt')):
            # For CSV/TSV files
            query = f"SELECT * FROM read_csv_auto({url_literal})"
        else:
            return None, f"Unsupported file type: {key}"

        # Execute the query and convert result to pandas DataFrame
        result = con.execute(query).fetchdf()
        return result, None
    except Exception as e:
        return None, f"Error reading file with DuckDB: {str(e)}"

# Read every listed object and stack the successful reads into one DataFrame
all_data = []
error_count = 0
valid_count = 0

if not (isinstance(s3_objects, list) and s3_objects):
    # Listing failed (status string) or the partition is empty
    print("No data files to process")
    clickstream_data = None
else:
    for obj_key in s3_objects:
        print(f"Processing {obj_key} with DuckDB...")

        frame, load_error = read_s3_with_duckdb(r2_bucket, obj_key)

        if frame is None:
            print(f"Failed to load {obj_key}: {load_error}")
            error_count += 1
            continue

        row_count = len(frame)
        print(f"Successfully loaded {row_count} rows from {obj_key}")
        all_data.append(frame)
        valid_count += 1

    if not all_data:
        print("No data was loaded successfully")
        clickstream_data = None
    else:
        # One concat at the end instead of growing a frame inside the loop
        clickstream_data = pd.concat(all_data, ignore_index=True)
        print(f"Loaded {len(clickstream_data)} total rows from {valid_count} files")
        print(f"Failed to load {error_count} files")

        # Display sample data
        print(clickstream_data.head())

## Process Data for Iceberg

Clean and prepare the data for Iceberg storage.

In [None]:
# Data processing function
def _parse_json_value(value):
    """Best-effort decode of a single cell into a Python object.

    Returns:
        ``(parsed, ok)``. ``ok`` is False only when ``value`` is a string that
        is neither valid JSON nor a Python literal; the string is then
        returned unchanged.
    """
    import ast  # local import: only needed for the Python-literal fallback

    # Already-structured values need no parsing
    if isinstance(value, (dict, list)):
        return value, True
    if value is None:
        return None, True
    if not isinstance(value, str):
        try:
            if pd.isna(value):
                return None, True
        except (TypeError, ValueError):
            # Array-like cells have no single null-ness; leave them untouched
            pass
        return value, True

    # Strict JSON first, so apostrophes inside valid JSON strings survive intact
    try:
        return json.loads(value), True
    except ValueError:
        pass

    # Fall back to Python literal notation (single-quoted dicts, True/False/None)
    try:
        return ast.literal_eval(value), True
    except (ValueError, SyntaxError, TypeError, MemoryError, RecursionError):
        return value, False


def process_clickstream_data(df):
    """Return a cleaned copy of ``df`` ready for conversion to Arrow.

    The JSON-bearing columns that are present ('session_data', 'device_info',
    'event_data', 'raw_event') are decoded value by value, so one malformed
    row no longer prevents the rest of the column from being parsed. A
    text-typed 'timestamp' column is converted with ``pd.to_datetime``.

    Args:
        df: Input DataFrame, or None.

    Returns:
        A new DataFrame (the input is not modified), or None when ``df`` is
        None or has no rows.
    """
    if df is None or len(df) == 0:
        return None

    # Make a copy to avoid modifying the original
    processed_df = df.copy()

    # Handle string columns with JSON content
    json_cols = ['session_data', 'device_info', 'event_data', 'raw_event']

    for col in json_cols:
        if col not in processed_df.columns:
            continue

        unparsed = []

        def _convert(value, _failures=unparsed):
            parsed, ok = _parse_json_value(value)
            if not ok:
                _failures.append(value)
            return parsed

        processed_df[col] = processed_df[col].apply(_convert)
        if unparsed:
            print(
                f"Warning: Could not parse JSON in column {col}: "
                f"{len(unparsed)} value(s) left as strings"
            )

    # Convert timestamp to datetime if needed (object or pandas string dtype)
    if 'timestamp' in processed_df.columns:
        timestamps = processed_df['timestamp']
        if pd.api.types.is_object_dtype(timestamps) or pd.api.types.is_string_dtype(timestamps):
            processed_df['timestamp'] = pd.to_datetime(timestamps)

    return processed_df

# Process the data if available
# processed_data is always defined (None when there is nothing to process)
processed_data = None
if clickstream_data is not None:
    processed_data = process_clickstream_data(clickstream_data)
    if processed_data is not None:
        print(f"Data processed successfully: {len(processed_data)} rows")
        # Jupyter only renders a cell's last top-level expression; a bare
        # .head() nested in an if-block shows nothing, so display it explicitly
        display(processed_data.head())
    else:
        print("Data processing failed")

## Create Iceberg Table

Create a new Iceberg table if it doesn't exist yet.

In [None]:
# Convert pandas DataFrame to PyArrow Table
def df_to_pyarrow(df):
    """Convert a pandas DataFrame into a PyArrow Table.

    Args:
        df: DataFrame to convert, or None.

    Returns:
        The resulting ``pa.Table``; None when ``df`` is None or when the
        conversion raises (the error is printed in that case).
    """
    if df is not None:
        try:
            return pa.Table.from_pandas(df)
        except Exception as conversion_error:
            print(f"Error converting to PyArrow: {str(conversion_error)}")
    return None

# Create Iceberg table if needed
def create_iceberg_table(catalog, namespace, table_name, schema, partition_columns=("event_date", "hr")):
    """Create the target Iceberg table, identity-partitioned, unless it already exists.

    ``create_table`` expects a PartitionSpec object rather than a list of
    column names, so the table is created unpartitioned first and the identity
    partition fields are then added through ``update_spec``. Requested
    partition columns that are not in the table schema are skipped and reported.

    Args:
        catalog: Iceberg catalog, or None when not connected.
        namespace: Namespace that holds the table.
        table_name: Name of the table.
        schema: Schema passed to ``catalog.create_table``.
        partition_columns: Column names to add as identity partition fields.

    Returns:
        A status string: table created, table already exists, not connected,
        or an error message. If adding the partition fields fails, the error
        string is returned even though the table has already been created.
    """
    if catalog is None:
        return "Not connected to Iceberg catalog"

    try:
        table_identifier = (namespace, table_name)
        if catalog.table_exists(table_identifier):
            # Load once so an unreadable table is reported as an error
            catalog.load_table(table_identifier)
            return f"Table already exists: {table_name}"

        table = catalog.create_table(table_identifier, schema=schema)

        # Only partition by columns that actually exist in the created table
        available = {field.name for field in table.schema().fields}
        usable = [column for column in partition_columns if column in available]
        skipped = [column for column in partition_columns if column not in available]
        if skipped:
            print(f"Warning: partition column(s) not found in schema, skipped: {', '.join(skipped)}")

        if usable:
            with table.update_spec() as update:
                for column in usable:
                    update.add_identity(column)

        return f"Created table: {table_name}"
    except Exception as e:
        return f"Error creating/loading table: {str(e)}"

# Convert processed data to PyArrow Table
# arrow_table is defined up front so later cells can test it even when there
# was no processed data (otherwise the append cell fails with a NameError)
arrow_table = None
if processed_data is not None:
    arrow_table = df_to_pyarrow(processed_data)
    if arrow_table is not None:
        print("Data converted to PyArrow Table successfully")
        print(f"Schema: {arrow_table.schema}")

        # Create or load the Iceberg table
        table_result = create_iceberg_table(catalog, iceberg_namespace, iceberg_table, arrow_table.schema)
        print(table_result)
    else:
        print("Failed to convert data to PyArrow Table")
else:
    print("No processed data available")

## Write Data to Iceberg Table

Write the processed clickstream data to the Iceberg table.

In [None]:
# Function to append data to the Iceberg table
def append_to_iceberg_table(catalog, namespace, table_name, data):
    """Append ``data`` to an existing Iceberg table and report the outcome.

    Args:
        catalog: Iceberg catalog, or None when not connected.
        namespace: Namespace that holds the table.
        table_name: Name of the table to append to.
        data: Rows to append (passed straight to ``table.append``), or None.

    Returns:
        A status string describing success, a missing connection, missing
        data, a missing table, or the error that was raised.
    """
    if catalog is None:
        return "Not connected to Iceberg catalog"
    if data is None:
        return "No data to append"

    try:
        identifier = (namespace, table_name)
        if not catalog.table_exists(identifier):
            return f"Table does not exist: {table_name}"

        # Load the table and append in one step
        catalog.load_table(identifier).append(data)
        return f"Data appended to table: {table_name}"
    except Exception as append_error:
        return f"Error appending data: {str(append_error)}"

# Write the converted Arrow data to the target table and print the status message
if arrow_table is not None:
    print(append_to_iceberg_table(catalog, iceberg_namespace, iceberg_table, arrow_table))

## Verify Data in Iceberg Table

Query the Iceberg table to verify the data was written successfully.

In [None]:
# Function to query and display table data
def query_iceberg_table(catalog, namespace, table_name):
    """Scan a whole Iceberg table and return its rows as a pandas DataFrame.

    Args:
        catalog: Iceberg catalog, or None when not connected.
        namespace: Namespace that holds the table.
        table_name: Name of the table to scan.

    Returns:
        The scanned rows converted with ``to_pandas()`` when the table has
        data; otherwise a status string (not connected, table missing, table
        empty, or the error that was raised).
    """
    if catalog is None:
        return "Not connected to Iceberg catalog"

    try:
        identifier = (namespace, table_name)
        if not catalog.table_exists(identifier):
            return f"Table does not exist: {table_name}"

        # Unfiltered scan of the table, materialised as Arrow
        arrow_rows = catalog.load_table(identifier).scan().to_arrow()
        if len(arrow_rows) == 0:
            return "Table exists but has no data"
        return arrow_rows.to_pandas()
    except Exception as query_error:
        return f"Error querying table: {str(query_error)}"

# Query to verify data was written
result = query_iceberg_table(catalog, iceberg_namespace, iceberg_table)
if isinstance(result, pd.DataFrame):
    print(f"Retrieved {len(result)} rows from Iceberg table")
    # A bare .head() nested in an if-block is never rendered by Jupyter,
    # so the preview is displayed explicitly
    display(result.head())
else:
    # Anything that is not a DataFrame is a status/error message
    print(result)

## Filter Data by Partition

Query the Iceberg table with partition filters to verify partitioning works correctly.

In [None]:
# Function to query table with partition filters
def query_with_filters(catalog, namespace, table_name, filters):
    """Scan an Iceberg table restricted by a list of column predicates.

    Each filter is a ``(column, operator, value)`` tuple. Predicates are built
    from ``pyiceberg.expressions`` and chained with ``scan.filter``, so all of
    them must hold for a row to be returned.

    Supported operator names: "equals", "not_equals", "greater_than",
    "greater_than_or_equal", "less_than", "less_than_or_equal".

    Args:
        catalog: Iceberg catalog, or None when not connected.
        namespace: Namespace that holds the table.
        table_name: Name of the table to scan.
        filters: Iterable of ``(column, operator, value)`` tuples.

    Returns:
        The matching rows converted with ``to_pandas()`` when any exist;
        otherwise a status string (not connected, table missing, no matches,
        or the error that was raised, including an unsupported operator).
    """
    if catalog is None:
        return "Not connected to Iceberg catalog"

    try:
        table_identifier = (namespace, table_name)
        if not catalog.table_exists(table_identifier):
            return f"Table does not exist: {table_name}"

        # Imported here so the predicate classes are in scope without
        # touching the notebook's import cell
        from pyiceberg.expressions import (
            EqualTo,
            GreaterThan,
            GreaterThanOrEqual,
            LessThan,
            LessThanOrEqual,
            NotEqualTo,
        )

        operators = {
            "equals": EqualTo,
            "not_equals": NotEqualTo,
            "greater_than": GreaterThan,
            "greater_than_or_equal": GreaterThanOrEqual,
            "less_than": LessThan,
            "less_than_or_equal": LessThanOrEqual,
        }

        table = catalog.load_table(table_identifier)

        # Create a filtered scan; each filter() call narrows the previous one
        scan = table.scan()
        for col, op, val in filters:
            if op not in operators:
                raise ValueError(f"Unsupported filter operator: {op}")
            scan = scan.filter(operators[op](col, val))

        # Execute the scan
        results = scan.to_arrow()
        if len(results) > 0:
            return results.to_pandas()
        return "No data matching filters"
    except Exception as e:
        return f"Error querying table: {str(e)}"

# Query with the partition filters for our specific day and hour
filters = [
    ("event_date", "equals", event_date),
    ("hr", "equals", event_hour)
]

filtered_results = query_with_filters(catalog, iceberg_namespace, iceberg_table, filters)
if isinstance(filtered_results, pd.DataFrame):
    print(f"Retrieved {len(filtered_results)} rows matching filters")
    # A bare .head() nested in an if-block is never rendered by Jupyter,
    # so the preview is displayed explicitly
    display(filtered_results.head())
else:
    # Anything that is not a DataFrame is a status/error message
    print(filtered_results)