# Streaming API for Large Datasets

This notebook demonstrates how to use uubed's streaming API to efficiently process large datasets without loading everything into memory.

In [None]:
import os
import tempfile
import time

import numpy as np
from uubed import StreamingEncoder, batch_encode, encode, encode_file_stream, encode_stream

## 1. Streaming from Generators

Process embeddings from a generator without loading all data into memory:

In [None]:
def embedding_generator(n_embeddings=10000, dimensions=768):
    """Lazily yield random uint8 embedding vectors, one per iteration.

    Args:
        n_embeddings: Number of vectors to produce.
        dimensions: Length of each vector.

    Yields:
        A 1-D ``np.uint8`` array of length ``dimensions`` with values in [0, 256).
    """
    for index in range(n_embeddings):
        # Stand-in for fetching a single embedding from a database or API
        vector = np.random.randint(0, 256, dimensions, dtype=np.uint8)
        yield vector

        # Progress is reported once the consumer resumes the generator
        at_milestone = index % 1000 == 0
        if at_milestone:
            print(f"Generated {index} embeddings...", end="\r")

# Encode the generated embeddings one at a time, never holding them all at once
print("Processing embeddings with streaming API...")

# Stays 0 if the stream turns out to be empty
encoded_count = 0
encoded_stream = encode_stream(embedding_generator(1000, 768), method="shq64")
for encoded_count, encoded in enumerate(encoded_stream, start=1):
    if encoded_count % 100 == 0:
        print(f"Encoded {encoded_count} embeddings", end="\r")

print(f"\nTotal encoded: {encoded_count} embeddings")

## 2. Streaming from Files

Process embeddings directly from a binary file of fixed-size records (here, 768 bytes per embedding):

In [None]:
# Build a temporary binary fixture: 100 embeddings of 768 uint8 values each,
# written back-to-back. delete=False keeps the file on disk after the handle closes.
with tempfile.NamedTemporaryFile(delete=False, suffix=".bin") as tmp:
    temp_filename = tmp.name
    tmp.writelines(
        np.random.randint(0, 256, 768, dtype=np.uint8).tobytes()
        for _ in range(100)
    )

file_size = os.path.getsize(temp_filename)
print(f"Created test file: {temp_filename}")
print(f"File size: {file_size:,} bytes")

In [None]:
# Process file with streaming API
output_file = "encoded_embeddings.txt"

try:
    # Drain the stream completely so every yielded encoding is collected.
    # output_file is handed to encode_file_stream as its second argument.
    encoded_list = list(
        encode_file_stream(
            temp_filename,
            output_file,
            method="eq64",
            embedding_size=768,
        )
    )
finally:
    # Clean up the temporary input even when encoding raises. The existence
    # check keeps a re-run of this cell from failing on an already-removed file.
    if os.path.exists(temp_filename):
        os.unlink(temp_filename)

print(f"Processed {len(encoded_list)} embeddings")
print(f"Output written to: {output_file}")
# Guard the preview so an empty result does not raise IndexError
if encoded_list:
    print(f"First encoding: {encoded_list[0][:50]}...")

## 3. StreamingEncoder Context Manager

Use the context manager for clean resource handling:

In [None]:
# Encode 50 random 512-dimensional embeddings through the context manager,
# which is opened on streaming_output.txt
with StreamingEncoder("streaming_output.txt", method="zoq64") as encoder:
    for step in range(50):
        vector = np.random.randint(0, 256, 512, dtype=np.uint8)
        encoder.encode(vector)

        # Report every tenth iteration
        if step % 10 == 0:
            print(f"Processed {encoder.count} embeddings")

print(f"\nTotal processed: {encoder.count} embeddings")
print("Output automatically saved to streaming_output.txt")

# Verify the output by counting the lines in the file
with open("streaming_output.txt", "r") as f:
    lines = f.readlines()
print(f"File contains {len(lines)} encoded embeddings")

## 4. Batch Processing

Process multiple embeddings at once for better performance:

In [None]:
# Generate a batch of embeddings as raw bytes (384 uint8 values each)
batch_size = 1000
embeddings = [
    np.random.randint(0, 256, 384, dtype=np.uint8).tobytes()
    for _ in range(batch_size)
]

# Time different approaches.
# time.perf_counter() is a monotonic, high-resolution clock, so the measured
# intervals are not distorted by system clock adjustments the way time.time() can be.

# Sequential processing: one encode() call per embedding
start = time.perf_counter()
sequential_results = [encode(emb, method="t8q64", k=16) for emb in embeddings]
sequential_time = time.perf_counter() - start

# Batch processing: a single batch_encode() call for the whole list
start = time.perf_counter()
batch_results = batch_encode(embeddings, method="t8q64", k=16)
batch_time = time.perf_counter() - start

print(f"Sequential processing: {sequential_time:.3f}s")
print(f"Batch processing: {batch_time:.3f}s")
print(f"Results match: {sequential_results == batch_results}")

## 5. Memory-Efficient Processing Pipeline

Build a complete pipeline for processing large embedding datasets:

In [None]:
class EmbeddingPipeline:
    """Example pipeline that encodes embeddings in batches and tracks size statistics.

    Embeddings are pulled lazily from any iterable, grouped into batches of
    ``batch_size``, encoded with ``batch_encode``, and yielded one at a time.
    """

    def __init__(self, method="auto", batch_size=100):
        """Store the configuration and start with empty statistics.

        Args:
            method: Encoding method name forwarded to ``batch_encode``.
            batch_size: Number of embeddings buffered before a batch is encoded.
        """
        self.method = method
        self.batch_size = batch_size
        # Running totals updated by _process_batch
        self.stats = {
            "processed": 0,
            "total_bytes": 0,
            "encoding_lengths": []
        }

    def process_stream(self, embedding_source):
        """Encode embeddings from any iterable source, yielding results lazily.

        Args:
            embedding_source: Iterable of embeddings; consumed one item at a time.

        Yields:
            Encoded embeddings in the same order as the input.
        """
        batch = []

        for embedding in embedding_source:
            batch.append(embedding)

            if len(batch) >= self.batch_size:
                yield from self._process_batch(batch)
                # Start a fresh list rather than clearing the one just encoded
                batch = []

        # Flush the final, possibly smaller, batch
        if batch:
            yield from self._process_batch(batch)

    def _process_batch(self, batch):
        """Encode one batch and update the running statistics per item.

        Args:
            batch: List of embeddings to pass to ``batch_encode``.

        Yields:
            Each encoded embedding of the batch, in order.
        """
        encoded_batch = batch_encode(batch, method=self.method)

        for emb, encoded in zip(batch, encoded_batch):
            self.stats["processed"] += 1
            # len(emb) equals the byte count for bytes objects and 1-D uint8 arrays.
            # NOTE(review): wider dtypes would be undercounted here - confirm inputs.
            self.stats["total_bytes"] += len(emb)
            self.stats["encoding_lengths"].append(len(encoded))
            yield encoded

    def get_stats(self):
        """Return a snapshot of the statistics plus derived metrics.

        The result always contains the same keys, so callers can format it
        without first checking whether anything was processed.

        Returns:
            A new dict with ``processed``, ``total_bytes``, ``encoding_lengths``
            (a copy), ``avg_encoding_length`` and ``compression_ratio``. The two
            derived values are 0.0 when they cannot be computed (nothing
            processed, or a total encoded length of zero).
        """
        lengths = self.stats["encoding_lengths"]
        total_encoded_len = sum(lengths)

        avg_encoding_len = np.mean(lengths) if lengths else 0.0
        # Guard the division: all-empty encodings would otherwise divide by zero
        if total_encoded_len:
            compression_ratio = self.stats["total_bytes"] / total_encoded_len
        else:
            compression_ratio = 0.0

        return {
            **self.stats,
            # Copy the list so callers cannot mutate the pipeline's internal state
            "encoding_lengths": list(lengths),
            "avg_encoding_length": avg_encoding_len,
            "compression_ratio": compression_ratio
        }

# Build the pipeline and drain the encoded stream into a list
pipeline = EmbeddingPipeline(method="shq64", batch_size=50)

print("Processing with pipeline...")
results = list(pipeline.process_stream(embedding_generator(500, 256)))

# Summarize what the pipeline recorded while encoding
stats = pipeline.get_stats()
summary_lines = [
    "\nPipeline Statistics:",
    f"  Processed: {stats['processed']} embeddings",
    f"  Total input bytes: {stats['total_bytes']:,}",
    f"  Average encoding length: {stats['avg_encoding_length']:.1f} chars",
    f"  Compression ratio: {stats['compression_ratio']:.2f}x",
]
print("\n".join(summary_lines))

## 6. Streaming with Progress Tracking

Add progress tracking for long-running operations:

In [None]:
# tqdm is an optional dependency. Guard the import so a missing package does not
# crash the cell before the install hint below can be shown. tqdm.auto selects
# the notebook widget when it is usable and falls back to a text bar otherwise.
try:
    from tqdm.auto import tqdm
except ImportError:
    tqdm = None

def process_with_progress(n_embeddings=10000):
    """Encode generated embeddings while displaying a progress bar.

    Args:
        n_embeddings: Number of 512-dimensional embeddings to generate and encode.

    Returns:
        List of the encoded values yielded by ``encode_stream``.

    Raises:
        ImportError: If tqdm is not installed.
    """
    if tqdm is None:
        raise ImportError("tqdm is required for progress tracking")

    # Create embedding source
    embeddings = embedding_generator(n_embeddings, 512)

    # Process with progress tracking: one bar tick per encoded embedding
    encoded_embeddings = []

    with tqdm(total=n_embeddings, desc="Encoding embeddings") as pbar:
        for encoded in encode_stream(embeddings, method="eq64", batch_size=100):
            encoded_embeddings.append(encoded)
            pbar.update(1)

    return encoded_embeddings

# Note: If tqdm is not installed, you can install it with: pip install tqdm
try:
    encoded = process_with_progress(1000)
    print(f"\nEncoded {len(encoded)} embeddings successfully!")
except ImportError:
    print("Install tqdm for progress bars: pip install tqdm")

## Clean Up

In [None]:
# Remove the output files written by the earlier cells, skipping any that are absent
generated_files = ("encoded_embeddings.txt", "streaming_output.txt")
for filename in generated_files:
    if not os.path.exists(filename):
        continue
    os.unlink(filename)
    print(f"Removed {filename}")

## Summary

In this notebook, we covered:
1. Streaming from generators for memory-efficient processing
2. Processing embeddings directly from files
3. Using the StreamingEncoder context manager
4. Batch processing for better performance
5. Building complete processing pipelines
6. Adding progress tracking

Key benefits of the streaming API:
- **Memory efficient**: Process datasets larger than RAM
- **Flexible**: Works with any iterable source
- **Fast**: Batch processing improves throughput
- **Clean**: Context managers handle resources automatically

Next, check out the LangChain integration notebook!