# Streaming Data Loading Demonstration

This notebook demonstrates the new streaming data loading capabilities added to the Pulse retrodiction training pipeline. The optimizations focus on three key areas:

1. **Streaming Data Loading**: Process data in chunks as it is read, rather than loading the entire dataset into memory at once
2. **Columnar Data Format Optimization**: Leverage Arrow/Parquet for high-performance analytical processing
3. **Advanced Caching & Prefetching**: Intelligently preload and cache data based on access patterns

In [None]:
# Import necessary libraries
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from datetime import datetime, timedelta
import time
import os
import sys

# Add parent directory to path so we can import from recursive_training
sys.path.append('..')

## Creating Sample Dataset

First, let's create a sample dataset to work with. We'll create a large time series dataset to demonstrate the benefits of streaming.

In [None]:
# Create a sample dataset
def create_sample_dataset(rows=100000, variables=5, seed=None):
    """Create a synthetic hourly time-series dataset for testing.

    Each ``variable_<i>`` column is a Gaussian random walk with a sinusoidal
    weekly component (amplitude 10, period 24 * 7 hourly samples) added on top.

    Args:
        rows: Number of hourly rows, starting at 2020-01-01 00:00.
        variables: Number of ``variable_<i>`` columns to generate.
        seed: Optional seed for reproducible output. When ``None`` (the
            default) numpy's global random state is used, exactly as before.

    Returns:
        A DataFrame with a ``timestamp`` column followed by
        ``variable_0`` ... ``variable_{variables - 1}``.
    """
    # One timestamp per hour; vectorised instead of building `rows` datetime
    # objects in a Python loop.
    timestamps = pd.date_range(start=datetime(2020, 1, 1), periods=rows, freq=timedelta(hours=1))

    # Seeded Generator when requested; otherwise the legacy global state.
    # np.random.standard_normal draws the same stream as np.random.randn.
    rng = np.random.default_rng(seed) if seed is not None else np.random

    # The weekly seasonal term is identical for every variable: compute it once.
    steps = np.arange(rows)
    weekly = 10 * np.sin(steps * 2 * np.pi / (24 * 7))  # Weekly pattern

    data = {'timestamp': timestamps}
    for i in range(variables):
        # Random walk plus the shared seasonality
        data[f'variable_{i}'] = rng.standard_normal(rows).cumsum() + weekly

    return pd.DataFrame(data)

# Build the large dataset used by the rest of the notebook
print("Creating sample dataset...")
large_df = create_sample_dataset(rows=100000, variables=5)
row_count, column_count = large_df.shape
print(f"Created dataset with {row_count} rows and {column_count} columns")
large_df.head()

## Store Data Using Different Data Stores

Now, let's store the data using our different data store implementations to compare their performance.

In [None]:
from recursive_training.data.data_store import RecursiveDataStore

# The optimized and streaming stores are optional: record whether each one
# imported so later cells can skip the missing ones.
try:
    from recursive_training.data.optimized_data_store import OptimizedDataStore
except ImportError:
    optimized_available = False
    print("OptimizedDataStore not available")
else:
    optimized_available = True

try:
    from recursive_training.data.streaming_data_store import StreamingDataStore
except ImportError:
    streaming_available = False
    print("StreamingDataStore not available")
else:
    streaming_available = True

# The baseline store is always created; each store gets its own storage_path.
base_store = RecursiveDataStore({"storage_path": "./demo_data/base"})

if optimized_available:
    optimized_store = OptimizedDataStore({"storage_path": "./demo_data/optimized"})

if streaming_available:
    # chunk_size / prefetch_chunks are passed straight through to StreamingDataStore.
    streaming_config = {
        "storage_path": "./demo_data/streaming",
        "chunk_size": 5000,
        "prefetch_chunks": 2,
    }
    streaming_store = StreamingDataStore(streaming_config)

### Store Data in Each Format

Now let's store our sample dataset using each data store implementation and measure the performance.

In [None]:
# Convert DataFrame to list of dictionaries for the data store
print("Converting dataset to records...")
data_records = large_df.to_dict('records')
print(f"Converted to {len(data_records)} records")

# Timings use time.perf_counter(): a monotonic, high-resolution clock intended
# for measuring short intervals. time.time() is a wall clock that can jump
# (NTP / manual adjustments) and is coarse on some platforms, which makes the
# speedup ratios below unreliable.

# Store using base RecursiveDataStore
print("\nStoring data using base RecursiveDataStore...")
start_time = time.perf_counter()
base_id = base_store.store_dataset("sample_data", data_records)
base_time = time.perf_counter() - start_time
print(f"Stored dataset with ID: {base_id} in {base_time:.2f} seconds")

# Store using OptimizedDataStore if available
if optimized_available:
    print("\nStoring data using OptimizedDataStore...")
    start_time = time.perf_counter()
    optimized_id = optimized_store.store_dataset_optimized("sample_data", data_records)
    optimized_time = time.perf_counter() - start_time
    print(f"Stored dataset with ID: {optimized_id} in {optimized_time:.2f} seconds")
    print(f"Speedup vs base: {base_time / optimized_time:.2f}x")
else:
    # Placeholder so the streaming comparison below still has a value
    optimized_time = float('inf')

# Store using StreamingDataStore if available
if streaming_available:
    print("\nStoring data using StreamingDataStore...")

    # Create a generator function to simulate streaming data
    def stream_generator():
        """Yield the prepared records one at a time, like a live data feed."""
        for record in data_records:
            yield record

    start_time = time.perf_counter()
    streaming_id = streaming_store.store_dataset_streaming("sample_data", stream_generator(), batch_size=5000)
    streaming_time = time.perf_counter() - start_time
    print(f"Stored dataset with ID: {streaming_id} in {streaming_time:.2f} seconds")
    print(f"Speedup vs base: {base_time / streaming_time:.2f}x")
    print(f"Speedup vs optimized: {optimized_time / streaming_time:.2f}x")
else:
    streaming_time = float('inf')

## Data Retrieval Performance Comparison

Now, let's compare the performance of retrieving data using the different data store implementations.

In [None]:
# Define a date range for filtering (ISO-8601 strings, as passed to the stores)
start_date = datetime(2020, 1, 15).isoformat()
end_date = datetime(2020, 1, 20).isoformat()

# Retrieve from base RecursiveDataStore
print("\nRetrieving data using base RecursiveDataStore...")
start_time = time.perf_counter()
base_data, base_meta = base_store.retrieve_dataset("sample_data")
# Filter manually. Parse the timestamps rather than comparing raw values with
# the ISO strings: datetime/Timestamp values raise TypeError when compared to
# a str, and strings in another layout (e.g. "2020-01-15 00:00:00") compare
# incorrectly as text. Missing timestamps become NaT, which never matches.
range_start, range_end = pd.Timestamp(start_date), pd.Timestamp(end_date)
record_times = pd.to_datetime([item.get('timestamp') for item in base_data])
in_range = (record_times >= range_start) & (record_times <= range_end)
base_data_filtered = [item for item, keep in zip(base_data, in_range) if keep]
base_time = time.perf_counter() - start_time
print(f"Retrieved {len(base_data)} records (filtered to {len(base_data_filtered)}) in {base_time:.2f} seconds")

# Retrieve from OptimizedDataStore if available
if optimized_available:
    print("\nRetrieving data using OptimizedDataStore...")
    start_time = time.perf_counter()
    optimized_df, optimized_meta = optimized_store.retrieve_dataset_optimized("sample_data", start_time=start_date, end_time=end_date)
    optimized_time = time.perf_counter() - start_time
    print(f"Retrieved {len(optimized_df)} filtered records in {optimized_time:.2f} seconds")
    print(f"Speedup vs base: {base_time / optimized_time:.2f}x")
else:
    optimized_time = float('inf')

# Retrieve from StreamingDataStore if available
if streaming_available:
    print("\nRetrieving data using StreamingDataStore streaming...")

    # Counters updated by the callback below. They live at notebook (module)
    # scope, so the callback must declare them `global`: `nonlocal` is only
    # valid inside a nested function and is a SyntaxError here.
    chunks_processed = 0
    total_records = 0

    def process_chunk(chunk):
        """Count one streamed chunk and compute its column means (simulated work)."""
        global chunks_processed, total_records
        chunks_processed += 1
        total_records += len(chunk)
        # Just do some minimal processing to simulate real-world usage
        mean_values = chunk.mean(numeric_only=True)
        return mean_values

    start_time = time.perf_counter()
    streaming_meta = streaming_store.retrieve_dataset_streaming(
        "sample_data", process_chunk, start_time=start_date, end_time=end_date
    )
    streaming_time = time.perf_counter() - start_time
    print(f"Processed {chunks_processed} chunks with {total_records} records in {streaming_time:.2f} seconds")
    print(f"Speedup vs base: {base_time / streaming_time:.2f}x")
    print(f"Speedup vs optimized: {optimized_time / streaming_time:.2f}x")

    # Also test the generator-based streaming interface
    print("\nRetrieving data using StreamingDataStore generator...")
    start_time = time.perf_counter()
    total_records = 0
    for chunk in streaming_store.stream_dataset("sample_data", start_time=start_date, end_time=end_date):
        total_records += len(chunk)
        # Just do some minimal processing to simulate real-world usage
        mean_values = chunk.mean(numeric_only=True)

    gen_streaming_time = time.perf_counter() - start_time
    print(f"Processed {total_records} records using generator in {gen_streaming_time:.2f} seconds")
else:
    streaming_time = float('inf')
    gen_streaming_time = float('inf')

## Memory Usage Comparison

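One of the main advantages of streaming data is reduced memory usage. Let's compare the memory used by each approach, measured as the growth of the process's resident set size (RSS) across each retrieval. RSS deltas are coarse: memory freed earlier may be reused, and freed memory is not always returned to the operating system. Treat the numbers as rough indicators.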

In [None]:
import psutil
import gc
import math

def get_memory_usage():
    """Return this process's resident set size (RSS) in MB."""
    return psutil.Process(os.getpid()).memory_info().rss / (1024 * 1024)

# Function to measure memory usage during data retrieval
def measure_memory_usage(retrieval_func, *args, **kwargs):
    """Call ``retrieval_func(*args, **kwargs)`` and report how much RSS grew.

    Returns:
        ``(result, memory_used_mb)``, where ``memory_used_mb`` is the RSS after
        the call minus the RSS before it. This is not a true peak: short-lived
        allocations inside the call are missed, and the value can be zero or
        negative when the allocator reuses memory freed earlier.
    """
    # Force garbage collection to get a cleaner baseline
    gc.collect()
    start_memory = get_memory_usage()

    result = retrieval_func(*args, **kwargs)

    # RSS after the call (not the peak during it)
    end_memory = get_memory_usage()
    memory_used = end_memory - start_memory

    return result, memory_used

def format_memory_ratio(reference_mb, candidate_mb):
    """Format ``reference_mb / candidate_mb`` as "N.NNx", or "n/a" if meaningless.

    RSS deltas of zero or below (allocator reuse) would raise ZeroDivisionError
    or print a nonsensical ratio, and ``inf`` is the placeholder this notebook
    uses for an unavailable store.
    """
    if (reference_mb > 0 and candidate_mb > 0
            and math.isfinite(reference_mb) and math.isfinite(candidate_mb)):
        return f"{reference_mb / candidate_mb:.2f}x"
    return "n/a"

# Compare memory usage for different retrieval methods
print("\nComparing memory usage for different retrieval methods...")

# Base data store
results, base_memory = measure_memory_usage(
    lambda: base_store.retrieve_dataset("sample_data")
)
print(f"Base DataStore memory usage: {base_memory:.2f} MB")

# Optimized data store
if optimized_available:
    results, optimized_memory = measure_memory_usage(
        lambda: optimized_store.retrieve_dataset_optimized("sample_data")
    )
    print(f"OptimizedDataStore memory usage: {optimized_memory:.2f} MB")
    print(f"Memory reduction vs base: {format_memory_ratio(base_memory, optimized_memory)}")
else:
    optimized_memory = float('inf')

# Streaming data store with callback
if streaming_available:
    # Module-level counters: the callback must use `global` (`nonlocal` is a
    # SyntaxError outside a nested function).
    chunks_processed = 0
    total_records = 0

    def process_chunk(chunk):
        """Count one streamed chunk without any further processing."""
        global chunks_processed, total_records
        chunks_processed += 1
        total_records += len(chunk)

    results, streaming_memory = measure_memory_usage(
        lambda: streaming_store.retrieve_dataset_streaming("sample_data", process_chunk)
    )
    print(f"StreamingDataStore memory usage: {streaming_memory:.2f} MB")
    print(f"Memory reduction vs base: {format_memory_ratio(base_memory, streaming_memory)}")
    print(f"Memory reduction vs optimized: {format_memory_ratio(optimized_memory, streaming_memory)}")

## Performance with PyArrow/Parquet

One of the optimizations in `StreamingDataStore` is the use of columnar data formats like Parquet with PyArrow. Let's specifically test those capabilities.

In [None]:
# Skip this cell if PyArrow isn't available
if streaming_available:
    try:
        import pyarrow as pa
        # pyarrow.compute is a submodule: `import pyarrow` alone does not
        # guarantee `pa.compute` is loaded, so import it explicitly.
        import pyarrow.compute as pc
        pyarrow_available = True
    except ImportError:
        pyarrow_available = False
        print("PyArrow not available - skipping Arrow-specific tests")
else:
    pyarrow_available = False

if streaming_available and pyarrow_available:
    print("\nTesting PyArrow-specific capabilities...")

    # Test the Arrow-specific streaming method
    start_time = time.time()
    total_records = 0
    total_batches = 0

    # Use the Arrow RecordBatch generator
    for batch in streaming_store.stream_dataset_arrow("sample_data", start_time=start_date, end_time=end_date):
        total_records += batch.num_rows
        total_batches += 1

        # Do some Arrow-specific processing:
        # mean of one numeric column using PyArrow compute
        if 'variable_0' in batch.schema.names:
            mean = pc.mean(batch.column('variable_0')).as_py()

    arrow_time = time.time() - start_time
    print(f"Processed {total_records} records in {total_batches} Arrow batches in {arrow_time:.2f} seconds")

    # Compare with Pandas-based processing from earlier (if that cell ran)
    if 'gen_streaming_time' in locals():
        print(f"Speedup vs Pandas-based streaming: {gen_streaming_time / arrow_time:.2f}x")

    # Also test creating data with the Arrow interface,
    # using a smaller dataset for this test
    small_df = create_sample_dataset(rows=10000, variables=3)

    print("\nCreating dataset using Arrow Table interface...")
    start_time = time.time()
    dataset_id, arrow_table = streaming_store.create_arrow_table("arrow_sample", small_df.to_dict('records'))
    arrow_create_time = time.time() - start_time
    print(f"Created Arrow table with {arrow_table.num_rows} rows in {arrow_create_time:.2f} seconds")

## Integration with ParallelTrainingCoordinator

Finally, let's demonstrate the integration with the ParallelTrainingCoordinator to show how these optimizations benefit the overall training process.

In [None]:
from recursive_training.parallel_trainer import ParallelTrainingCoordinator
from datetime import datetime

# Coordinator with two workers
coordinator = ParallelTrainingCoordinator(max_workers=2)

# Variable names matching the columns of the sample datasets built earlier
variables = ['variable_0', 'variable_1', 'variable_2']

# Training window inside the range covered by the sample data
start_time = datetime(2020, 1, 1)
end_time = datetime(2020, 1, 10)

# Split the window into training batches
print("\nPreparing training batches...")
batch_options = dict(
    batch_size_days=2,  # Smaller batch size for demonstration
    overlap_days=1,     # Overlap between consecutive batches, in days
    preload_data=True,  # Enable data preloading
)
batches = coordinator.prepare_training_batches(
    variables=variables, start_time=start_time, end_time=end_time, **batch_options
)
print(f"Created {len(batches)} training batches")

def progress_callback(progress_data):
    """Print a one-line progress report from the coordinator's progress dict."""
    percentage = progress_data['completed_percentage']
    done = progress_data['completed_batches']
    total = progress_data['total_batches']
    print(f"Progress: {percentage} ({done}/{total} batches)")

# Run training; the coordinator reports progress through the callback above
print("\nStarting parallel training...")
coordinator.start_training(progress_callback=progress_callback)

# Summarise the run
results = coordinator.get_results_summary()
speedup = results['performance']['speedup_factor']
print("\nTraining completed with results:")
print(f"Speedup factor: {speedup:.2f}x")

## Conclusion

This demonstration illustrates the intended benefits of the streaming data loading optimizations. The actual numbers depend on your data and hardware; check the cell outputs above.

1. **Memory Efficiency**: Streaming data processing uses significantly less memory compared to loading entire datasets at once.
2. **Performance**: The optimized columnar data formats with Arrow/Parquet provide better performance for analytical workloads.
3. **Scalability**: These optimizations make it possible to work with larger datasets that wouldn't fit in memory otherwise.
4. **Integration**: The streaming data capabilities integrate seamlessly with the parallel training framework.