# Advanced Parquet Operations in Python: Schema Evolution, Metadata, and Performance Tuning

This advanced tutorial covers sophisticated Parquet operations including schema management, metadata handling, performance optimization, and integration with big data frameworks.

## Table of Contents
1. Advanced Schema Management
2. Working with Metadata
3. Schema Evolution Strategies
4. Advanced Filtering and Projection Pushdown
5. Memory-Efficient Operations
6. Integration with Dask for Distributed Processing
7. Performance Optimization Techniques
8. Data Validation and Quality Checks

In [None]:
# Import required libraries
import pandas as pd
import numpy as np
import pyarrow as pa
import pyarrow.parquet as pq
import pyarrow.compute as pc
import os
import json
from datetime import datetime, timedelta
import time

## 1. Advanced Schema Management

Understanding and managing Parquet schemas is crucial for data consistency and performance.

In [None]:
# Create a complex dataset with various data types
np.random.seed(42)
n_records = 10000

# Complex data including nested structures
complex_data = {
    'id': range(n_records),
    'timestamp': pd.date_range('2023-01-01', periods=n_records, freq='5min'),
    'user_id': np.random.randint(1000, 5000, n_records),
    'event_type': np.random.choice(['click', 'view', 'purchase', 'share'], n_records),
    'value': np.random.uniform(0.1, 1000, n_records),
    'metadata': [{'source': 'web', 'version': f'v{i%3+1}'} for i in range(n_records)],
    'tags': [['tag1', 'tag2'] if i % 2 == 0 else ['tag3'] for i in range(n_records)],
    'is_valid': np.random.choice([True, False], n_records, p=[0.95, 0.05])
}

df_complex = pd.DataFrame(complex_data)
print("Complex dataset created:")
print(df_complex.info())
print("\nFirst 3 rows:")
df_complex.head(3)

In [None]:
# Define custom schema with specific types and metadata
custom_schema = pa.schema([
    pa.field('id', pa.int64(), nullable=False),
    pa.field('timestamp', pa.timestamp('ns'), nullable=False),
    pa.field('user_id', pa.int32()),  # Downcast from int64 to int32
    pa.field('event_type', pa.dictionary(pa.int8(), pa.string())),  # Dictionary encoding
    pa.field('value', pa.float32()),  # Downcast from float64 to float32
    pa.field('metadata', pa.map_(pa.string(), pa.string())),  # Map type for key-value pairs
    pa.field('tags', pa.list_(pa.string())),  # List type
    pa.field('is_valid', pa.bool_())
], metadata={'created_by': 'Advanced Parquet Tutorial', 'version': '1.0'})

# Convert DataFrame to Table with custom schema
# Note: the map type expects each value as a list of (key, value) pairs,
# so the dicts in the metadata column are converted first
df_for_arrow = df_complex.copy()
df_for_arrow['metadata'] = df_for_arrow['metadata'].apply(lambda x: list(x.items()))

table = pa.Table.from_pandas(df_for_arrow, schema=custom_schema, preserve_index=False)

# Write with custom schema
pq.write_table(table, 'advanced_schema.parquet')
print("✅ Parquet file written with custom schema")
print(f"\nSchema:\n{table.schema}")

## 2. Working with Metadata

Parquet files can store custom metadata at both file and column levels.

In [None]:
# Add metadata to table
metadata = {
    'department': 'Data Engineering',
    'created_date': datetime.now().isoformat(),
    'data_source': 'Event Stream',
    'processing_version': '2.1.0',
    'row_count': str(len(df_complex))
}

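# Create table with metadata
# Note: replace_schema_metadata replaces ALL existing schema-level metadata,
# including the keys that were set on custom_schema
table_with_metadata = table.replace_schema_metadata(metadata)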

# Write table with metadata
pq.write_table(table_with_metadata, 'data_with_metadata.parquet')

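# Read and display metadata
parquet_file = pq.ParquetFile('data_with_metadata.parquet')
print("📋 File Metadata:")
# Arrow returns metadata keys and values as bytes, which json.dumps cannot serialize
raw_metadata = parquet_file.schema_arrow.metadata or {}
file_metadata = {k.decode(): v.decode() for k, v in raw_metadata.items()}
print(json.dumps(file_metadata, indent=2))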

# Column-level metadata
print("\n📊 Column Metadata:")
for field in parquet_file.schema_arrow:
    if field.metadata:
        print(f"{field.name}: {field.metadata}")

In [None]:
# Read file statistics and metadata
metadata = parquet_file.metadata
print(f"📈 File Statistics:")
print(f"Number of rows: {metadata.num_rows:,}")
print(f"Number of row groups: {metadata.num_row_groups}")
print(f"Format version: {metadata.format_version}")
print(f"Created by: {metadata.created_by}")

# Row group statistics
for i in range(metadata.num_row_groups):
    rg = metadata.row_group(i)
    print(f"\n📦 Row Group {i}:")
    print(f"  Rows: {rg.num_rows:,}")
    print(f"  Total byte size: {rg.total_byte_size:,}")
    print(f"  Columns: {rg.num_columns}")

## 3. Schema Evolution Strategies

Handle schema changes over time without breaking existing data pipelines.

In [None]:
# Original schema (v1)
data_v1 = {
    'id': range(1000),
    'name': [f'User_{i}' for i in range(1000)],
    'score': np.random.randint(0, 100, 1000)
}
df_v1 = pd.DataFrame(data_v1)
df_v1.to_parquet('schema_v1.parquet')
print("Schema v1:")
print(df_v1.dtypes)

# Evolved schema (v2) - Added new columns
data_v2 = {
    'id': range(1000, 2000),
    'name': [f'User_{i}' for i in range(1000, 2000)],
    'score': np.random.randint(0, 100, 1000),
    'created_at': pd.date_range('2024-01-01', periods=1000, freq='H'),  # New column
    'category': np.random.choice(['A', 'B', 'C'], 1000)  # New column
}
df_v2 = pd.DataFrame(data_v2)
df_v2.to_parquet('schema_v2.parquet')
print("\nSchema v2 (with new columns):")
print(df_v2.dtypes)

In [None]:
# Handle schema evolution when reading
def read_with_schema_evolution(file_paths, fill_missing=True):
    """
    Read multiple Parquet files with potentially different schemas
    """
    dataframes = []
    
    for path in file_paths:
        df = pd.read_parquet(path)
        dataframes.append(df)
    
    # Combine with schema alignment
    if fill_missing:
        # Get all columns across all dataframes
        all_columns = set()
        for df in dataframes:
            all_columns.update(df.columns)
        
        # Add missing columns with default values
        aligned_dfs = []
        for df in dataframes:
            for col in all_columns:
                if col not in df.columns:
                    # Add default values based on expected type
                    if col == 'created_at':
                        df[col] = pd.NaT
                    elif col == 'category':
                        df[col] = 'Unknown'
                    else:
                        df[col] = None
            aligned_dfs.append(df[sorted(all_columns)])
        
        return pd.concat(aligned_dfs, ignore_index=True)
    else:
        return pd.concat(dataframes, ignore_index=True, sort=True)

# Read files with different schemas
combined_df = read_with_schema_evolution(['schema_v1.parquet', 'schema_v2.parquet'])
print(f"Combined DataFrame shape: {combined_df.shape}")
print(f"\nColumns: {combined_df.columns.tolist()}")
print(f"\nSample of combined data:")
combined_df.sample(5)

## 4. Advanced Filtering and Projection Pushdown

Leverage Parquet's columnar format for efficient data access.

In [None]:
# Create a large dataset for performance testing
n_large = 1000000
large_data = {
    'id': range(n_large),
    'timestamp': pd.date_range('2023-01-01', periods=n_large, freq='1min'),
    'sensor_id': np.random.randint(1, 100, n_large),
    'temperature': np.random.normal(25, 5, n_large),
    'humidity': np.random.normal(60, 10, n_large),
    'pressure': np.random.normal(1013, 20, n_large),
    'location': np.random.choice(['North', 'South', 'East', 'West'], n_large),
    'status': np.random.choice(['OK', 'WARNING', 'ERROR'], n_large, p=[0.9, 0.08, 0.02])
}
df_large = pd.DataFrame(large_data)

# Write with row groups for better filtering
df_large.to_parquet('sensor_data.parquet', 
                   row_group_size=50000,  # 50k rows per group
                   engine='pyarrow')

print(f"✅ Large dataset created: {len(df_large):,} rows")

In [None]:
# Filter and projection pushdown at read time

# Method 1: Push row filters and column selection down to the Parquet reader
# (the tuples in the list are combined with AND)
start_time = time.time()
filters = [
    ('status', '==', 'ERROR'),
    ('temperature', '>', 30)
]
filtered_table = pq.read_table('sensor_data.parquet', 
                              filters=filters,
                              columns=['id', 'timestamp', 'sensor_id', 'temperature', 'status'])
filter_time = time.time() - start_time

print(f"⚡ Filtered read: {len(filtered_table):,} rows in {filter_time:.3f} seconds")

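# Method 2: Read all columns and rows, then filter in memory (for comparison)
# Note: this also loads the columns Method 1 skips, so the gap reflects projection as well as filtering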
start_time = time.time()
df_all = pd.read_parquet('sensor_data.parquet')
df_filtered_pandas = df_all[(df_all['status'] == 'ERROR') & (df_all['temperature'] > 30)]
pandas_time = time.time() - start_time

print(f"🐼 Pandas filter: {len(df_filtered_pandas):,} rows in {pandas_time:.3f} seconds")
print(f"\n🚀 Pushdown read vs. read-all-then-filter: {pandas_time/filter_time:.1f}x speedup")

In [None]:
# Complex filtering with PyArrow compute
table = pq.read_table('sensor_data.parquet')

# Create complex filter expressions (pc was imported in the first cell)

# Filter: (temperature > 30 OR humidity < 50) AND status != 'OK'
temp_filter = pc.greater(table['temperature'], 30)
humidity_filter = pc.less(table['humidity'], 50)
status_filter = pc.not_equal(table['status'], 'OK')

# Combine filters
complex_filter = pc.and_(pc.or_(temp_filter, humidity_filter), status_filter)

# Apply filter
filtered_table = table.filter(complex_filter)
print(f"Complex filter result: {len(filtered_table):,} rows")

# Convert to pandas for analysis
df_complex_filtered = filtered_table.to_pandas()
print(f"\nFiltered data statistics:")
print(df_complex_filtered[['temperature', 'humidity', 'status']].describe())

## 5. Memory-Efficient Operations

Handle large datasets that don't fit in memory.

In [None]:
# Process large file in batches
def process_parquet_in_batches(file_path, batch_size=10000, process_func=None):
    """
    Process large Parquet file in batches to manage memory usage
    """
    parquet_file = pq.ParquetFile(file_path)
    
    results = []
    total_rows = 0
    
    # Process each batch
    for batch in parquet_file.iter_batches(batch_size=batch_size):
        df_batch = batch.to_pandas()
        total_rows += len(df_batch)
        
        if process_func:
            result = process_func(df_batch)
            results.append(result)
        
        # Show progress
        if total_rows % 100000 == 0:
            print(f"Processed {total_rows:,} rows...")
    
    return results, total_rows

# Example: Calculate statistics per batch
def calculate_batch_stats(df):
    return {
        'mean_temp': df['temperature'].mean(),
        'error_count': (df['status'] == 'ERROR').sum(),
        'row_count': len(df)
    }

# Process in batches
batch_results, total = process_parquet_in_batches(
    'sensor_data.parquet',
    batch_size=50000,
    process_func=calculate_batch_stats
)

print(f"\n✅ Processed {total:,} rows in {len(batch_results)} batches")

# Aggregate results
total_errors = sum(r['error_count'] for r in batch_results)
avg_temp = np.average([r['mean_temp'] for r in batch_results], 
                     weights=[r['row_count'] for r in batch_results])

print(f"\n📊 Aggregate Statistics:")
print(f"Average temperature: {avg_temp:.2f}°C")
print(f"Total errors: {total_errors:,}")

In [None]:
# Selective reads using row-group statistics
# This reads only the row groups that may contain matching values instead of the whole file

# Get specific row groups based on condition
def get_row_groups_by_stats(file_path, column, min_value=None, max_value=None):
    """
    Get row groups that might contain values in specified range
    using Parquet statistics
    """
    parquet_file = pq.ParquetFile(file_path)
    relevant_row_groups = []
    
    for i in range(parquet_file.num_row_groups):
        # Get row group metadata
        rg_meta = parquet_file.metadata.row_group(i)
        
        # Find column index
        col_idx = None
        for j in range(rg_meta.num_columns):
            col = rg_meta.column(j)
            if col.path_in_schema == column:
                col_idx = j
                break
        
        if col_idx is not None:
            stats = rg_meta.column(col_idx).statistics
            if stats is None or not stats.has_min_max:
                # Without min/max statistics the row group cannot be ruled out
                relevant_row_groups.append(i)
            elif ((min_value is None or stats.max >= min_value) and
                  (max_value is None or stats.min <= max_value)):
                # The row group's [min, max] range overlaps the requested range
                relevant_row_groups.append(i)
    
    return relevant_row_groups

# Find row groups with high temperatures
hot_row_groups = get_row_groups_by_stats('sensor_data.parquet', 
                                         'temperature', 
                                         min_value=35)

print(f"Row groups potentially containing temperature > 35°C: {hot_row_groups}")

# Read only specific row groups
if hot_row_groups:
    parquet_file = pq.ParquetFile('sensor_data.parquet')
    hot_data = parquet_file.read_row_groups(hot_row_groups)
    df_hot = hot_data.to_pandas()
    actual_hot = df_hot[df_hot['temperature'] > 35]
    print(f"Actual rows with temperature > 35°C: {len(actual_hot):,}")

## 6. Integration with Dask for Distributed Processing

Dask enables parallel processing of large Parquet datasets.

In [None]:
# Note: Install dask with: pip install dask[complete]
try:
    import dask.dataframe as dd
    
    # Read Parquet with Dask
    ddf = dd.read_parquet('sensor_data.parquet', 
                         engine='pyarrow',
                         index=False)
    
    print(f"Dask DataFrame created with {ddf.npartitions} partitions")
    
    # Perform operations lazily
    result = ddf.groupby('location')['temperature'].agg(['mean', 'min', 'max', 'count'])
    
    # Compute results
    computed_result = result.compute()
    print("\nTemperature statistics by location:")
    print(computed_result)
    
except ImportError:
    print("Dask not installed. Install with: pip install dask[complete]")
    print("Skipping Dask examples...")

## 7. Performance Optimization Techniques

In [None]:
# Compare different optimization strategies
optimization_results = []

# 1. Default write
start = time.time()
df_large.to_parquet('test_default.parquet')
default_time = time.time() - start
default_size = os.path.getsize('test_default.parquet') / (1024**2)
optimization_results.append({
    'method': 'Default',
    'write_time': default_time,
    'file_size_mb': default_size
})

# 2. Categorical dtype for low-cardinality columns
# (PyArrow already dictionary-encodes Parquet columns by default, so the size gain may be small)
df_optimized = df_large.copy()
df_optimized['location'] = df_optimized['location'].astype('category')
df_optimized['status'] = df_optimized['status'].astype('category')

start = time.time()
df_optimized.to_parquet('test_dictionary.parquet')
dict_time = time.time() - start
dict_size = os.path.getsize('test_dictionary.parquet') / (1024**2)
optimization_results.append({
    'method': 'Dictionary Encoding',
    'write_time': dict_time,
    'file_size_mb': dict_size
})

# 3. Type optimization
df_type_optimized = df_large.copy()
df_type_optimized['sensor_id'] = df_type_optimized['sensor_id'].astype('int16')  # was int64
df_type_optimized['temperature'] = df_type_optimized['temperature'].astype('float32')  # was float64
df_type_optimized['humidity'] = df_type_optimized['humidity'].astype('float32')
df_type_optimized['pressure'] = df_type_optimized['pressure'].astype('float32')

start = time.time()
df_type_optimized.to_parquet('test_optimized_types.parquet')
type_time = time.time() - start
type_size = os.path.getsize('test_optimized_types.parquet') / (1024**2)
optimization_results.append({
    'method': 'Optimized Types',
    'write_time': type_time,
    'file_size_mb': type_size
})

# Results
opt_df = pd.DataFrame(optimization_results)
opt_df['size_reduction'] = (1 - opt_df['file_size_mb'] / default_size) * 100
print("Optimization Results:")
opt_df

## 8. Data Validation and Quality Checks

In [None]:
def validate_parquet_file(file_path):
    """
    Comprehensive validation of Parquet file
    """
    results = {'file': file_path, 'checks': {}}
    
    try:
        # Basic file checks
        parquet_file = pq.ParquetFile(file_path)
        results['checks']['file_readable'] = True
        results['checks']['num_rows'] = parquet_file.metadata.num_rows
        results['checks']['num_row_groups'] = parquet_file.num_row_groups
        
        # Schema validation
        schema = parquet_file.schema_arrow
        results['checks']['num_columns'] = len(schema)
        results['checks']['column_names'] = schema.names
        
        # Check for corrupted data
        corrupted_row_groups = []
        for i in range(parquet_file.num_row_groups):
            try:
                _ = parquet_file.read_row_group(i)
            except Exception as e:
                corrupted_row_groups.append(i)
        
        results['checks']['corrupted_row_groups'] = corrupted_row_groups
        results['checks']['data_integrity'] = len(corrupted_row_groups) == 0
        
        # Statistics availability (inspect the first row group, if there is one)
        stats_available = []
        if parquet_file.num_row_groups > 0:
            first_rg = parquet_file.metadata.row_group(0)
            for i in range(first_rg.num_columns):
                col = first_rg.column(i)
                if col.statistics:
                    stats_available.append(col.path_in_schema)
        
        results['checks']['statistics_available'] = stats_available
        
        # Compression info
        compressions = set()
        for i in range(parquet_file.num_row_groups):
            rg = parquet_file.metadata.row_group(i)
            for j in range(rg.num_columns):
                col = rg.column(j)
                compressions.add(str(col.compression))
        
        results['checks']['compression_types'] = list(compressions)
        
    except Exception as e:
        results['checks']['file_readable'] = False
        results['checks']['error'] = str(e)
    
    return results

# Validate our files
validation_results = validate_parquet_file('sensor_data.parquet')
print("Parquet File Validation Results:")
print(json.dumps(validation_results, indent=2))

## Clean Up

In [None]:
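# Remove only the files created in this tutorial
# (a '*.parquet' glob would also delete unrelated files in the working directory)
files_to_remove = [
    'advanced_schema.parquet', 'data_with_metadata.parquet',
    'schema_v1.parquet', 'schema_v2.parquet', 'sensor_data.parquet',
    'test_default.parquet', 'test_dictionary.parquet',
    'test_optimized_types.parquet',
]

removed = 0
for file in files_to_remove:
    if os.path.exists(file):
        os.remove(file)
        removed += 1

print(f"✅ Removed {removed} temporary files")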

## Summary and Best Practices

### Key Takeaways:

1. **Schema Management**
   - Define explicit schemas for consistency
   - Use dictionary encoding for categorical data
   - Plan for schema evolution from the start

2. **Performance Optimization**
   - Use appropriate data types (int32 vs int64, float32 vs float64)
   - Keep column statistics enabled (the PyArrow default) so readers can skip row groups
   - Partition large datasets by commonly filtered columns

3. **Memory Efficiency**
   - Process large files in batches
   - Use row group statistics for selective reading
   - Leverage PyArrow's zero-copy reads

4. **Data Quality**
   - Validate files after writing
   - Store metadata for data lineage
   - Implement schema validation in pipelines

### When to Use These Techniques:

- **Schema Evolution**: When your data model changes over time
- **Batch Processing**: For files larger than available RAM
- **Row Group Filtering**: When you need specific data ranges
- **Dictionary Encoding**: For columns with repeated values
- **Type Optimization**: To reduce storage and improve performance

### Next Steps:
- Explore Apache Arrow Dataset API for multi-file datasets
- Integrate with Apache Spark for distributed processing
- Implement automated data quality pipelines
- Learn about Delta Lake and Apache Iceberg for ACID transactions