# Using fsspec for Cloud Storage with OmniQ

This notebook demonstrates how to use `fsspec` with OmniQ's `FileTaskQueue` and `FileResultStorage` to connect to various storage backends including local files, in-memory storage, and cloud storage providers.

## Overview

OmniQ leverages the `fsspec` library to provide a unified interface for different storage backends. This allows you to seamlessly switch between local development and cloud production environments without changing your code structure.
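
To make the unified interface concrete, here is a minimal sketch that uses `fsspec` directly, without OmniQ. It assumes only that `fsspec` is installed; the `roundtrip` helper and the file name `hello.txt` are illustrative and not part of OmniQ.

```python
import tempfile

from fsspec.core import url_to_fs


def roundtrip(base_url: str, payload: bytes) -> bytes:
    """Write payload under base_url and read it back through fsspec."""
    # url_to_fs picks the filesystem implementation from the URL protocol
    fs, root = url_to_fs(base_url)
    fs.makedirs(root, exist_ok=True)
    path = f"{root.rstrip('/')}/hello.txt"
    with fs.open(path, "wb") as f:
        f.write(payload)
    return fs.cat_file(path)


# The same helper works for an in-memory and a local backend
memory_copy = roundtrip("memory://fsspec_overview_demo", b"hello")
with tempfile.TemporaryDirectory() as tmp:
    local_copy = roundtrip(tmp, b"hello")

print(memory_copy == local_copy)
```

Only the URL changes between the two calls, which is the property OmniQ's file-based components build on.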

## Setup and Imports

First, let's import the necessary modules and set up our environment.

In [None]:
import asyncio
import time
import tempfile
from pathlib import Path

# Import OmniQ components
from omniq import OmniQ, AsyncOmniQ
from omniq.queue import FileTaskQueue
from omniq.results import FileResultStorage

print("✅ Imports successful!")

## Define Sample Tasks

Let's define some sample tasks to demonstrate the different storage backends.

In [None]:
def sample_task(data: str, multiplier: int = 1) -> str:
    """Sample synchronous task function."""
    time.sleep(0.1)  # Simulate some work
    return f"Processed: {data} (x{multiplier})"

async def async_sample_task(data: str, delay: float = 0.1) -> str:
    """Sample asynchronous task function."""
    await asyncio.sleep(delay)
    return f"Async processed: {data}"

def data_processing_task(numbers: list) -> dict:
    """More complex task that processes a list of numbers."""
    return {
        "sum": sum(numbers),
        "average": sum(numbers) / len(numbers) if numbers else 0,
        "count": len(numbers),
        "max": max(numbers) if numbers else None,
        "min": min(numbers) if numbers else None
    }

print("✅ Sample tasks defined!")

## 1. Local Filesystem Storage

Let's start with the most basic storage backend: the local filesystem.

In [None]:
# Create a temporary directory for this demo
temp_dir = tempfile.mkdtemp()
base_path = Path(temp_dir) / "omniq_local_demo"

print(f"Using temporary directory: {base_path}")

# Create FileTaskQueue and FileResultStorage with local filesystem
local_queue = FileTaskQueue(
    project_name="local_demo",
    base_dir=str(base_path / "tasks"),
    queues=["high", "medium", "low"]
)

local_result_store = FileResultStorage(
    project_name="local_demo",
    base_dir=str(base_path / "results")
)

print("✅ Local filesystem storage components created!")

In [None]:
# Create OmniQ instance with local storage
local_oq = OmniQ(
    project_name="local_demo",
    task_queue=local_queue,
    result_store=local_result_store
)

print("Starting local filesystem demonstration...")

with local_oq:
    # Enqueue tasks to different queues
    task_ids = []
    
    task_ids.append(local_oq.enqueue(
        sample_task,
        func_args={"data": "Local Task 1", "multiplier": 2},
        queue_name="high"
    ))
    
    task_ids.append(local_oq.enqueue(
        data_processing_task,
        func_args={"numbers": [1, 2, 3, 4, 5]},
        queue_name="medium"
    ))
    
    task_ids.append(local_oq.enqueue(
        sample_task,
        func_args={"data": "Local Task 3", "multiplier": 1},
        queue_name="low"
    ))
    
    print(f"✅ Enqueued {len(task_ids)} tasks to local filesystem")
    
    # Wait for tasks to complete
    time.sleep(2)
    
    # Get and display results
    print("\n📋 Results from local filesystem:")
    for i, task_id in enumerate(task_ids, 1):
        result = local_oq.get_result(task_id)
        print(f"  Task {i}: {result}")

In [None]:
# Show the directory structure that was created
print("📁 Directory structure created:")
for path in sorted(base_path.rglob("*")):
    if path.is_file():
        relative_path = path.relative_to(base_path)
        print(f"  {relative_path}")

## 2. Memory Filesystem Storage

Now let's demonstrate in-memory storage, which suits testing and development because nothing is written to disk. Data in the memory filesystem is lost when the Python process exits.

In [None]:
# Create FileTaskQueue and FileResultStorage with memory filesystem
memory_queue = FileTaskQueue(
    project_name="memory_demo",
    base_dir="memory://memory_demo/tasks",
    queues=["high", "medium", "low"]
)

memory_result_store = FileResultStorage(
    project_name="memory_demo",
    base_dir="memory://memory_demo/results"
)

print("✅ Memory filesystem storage components created!")

In [None]:
# Create OmniQ instance with memory storage
memory_oq = OmniQ(
    project_name="memory_demo",
    task_queue=memory_queue,
    result_store=memory_result_store
)

print("Starting memory filesystem demonstration...")
print("(No files will be written to disk - everything stays in memory)")

with memory_oq:
    # Enqueue tasks
    task_ids = []
    
    task_ids.append(memory_oq.enqueue(
        sample_task,
        func_args={"data": "Memory Task 1", "multiplier": 5},
        queue_name="high"
    ))
    
    task_ids.append(memory_oq.enqueue(
        data_processing_task,
        func_args={"numbers": [10, 20, 30, 40, 50]},
        queue_name="medium"
    ))
    
    task_ids.append(memory_oq.enqueue(
        sample_task,
        func_args={"data": "Memory Task 3", "multiplier": 3},
        queue_name="low"
    ))
    
    print(f"✅ Enqueued {len(task_ids)} tasks to memory storage")
    
    # Wait for tasks to complete
    time.sleep(2)
    
    # Get and display results
    print("\n📋 Results from memory storage:")
    for i, task_id in enumerate(task_ids, 1):
        result = memory_oq.get_result(task_id)
        print(f"  Task {i}: {result}")
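
One caveat for tests: fsspec's memory filesystem keeps a single store per Python process, so two queues that use the same `memory://` path see each other's files. A minimal sketch of one way to isolate test runs is to generate a unique base path per test. `unique_memory_dirs` is a hypothetical helper, and the `tasks`/`results` layout simply mirrors the paths used above.

```python
import uuid


def unique_memory_dirs(prefix: str = "omniq_test") -> dict:
    """Return memory:// base directories under a fresh, collision-free root."""
    root = f"memory://{prefix}_{uuid.uuid4().hex}"
    return {"tasks": f"{root}/tasks", "results": f"{root}/results"}


first = unique_memory_dirs()
second = unique_memory_dirs()
print(first["tasks"])
print(first["tasks"] != second["tasks"])
```

The returned strings can be passed as `base_dir` to `FileTaskQueue` and `FileResultStorage`.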

## 3. Async Usage with Memory Storage

Let's demonstrate using `AsyncOmniQ` with memory storage for asynchronous task processing.

In [None]:
async def async_demo():
    """Demonstrate async usage with memory storage."""
    
    # Create async components
    async_queue = FileTaskQueue(
        project_name="async_memory_demo",
        base_dir="memory://async_demo/tasks",
        queues=["high", "medium", "low"]
    )
    
    async_result_store = FileResultStorage(
        project_name="async_memory_demo",
        base_dir="memory://async_demo/results"
    )
    
    # Create AsyncOmniQ instance
    async_oq = AsyncOmniQ(
        project_name="async_memory_demo",
        task_queue=async_queue,
        result_store=async_result_store
    )
    
    print("Starting async demonstration with memory storage...")
    
    async with async_oq:
        # Enqueue async tasks
        task_ids = []
        
        task_ids.append(await async_oq.enqueue(
            async_sample_task,
            func_args={"data": "Async Task 1", "delay": 0.2},
            queue_name="high"
        ))
        
        task_ids.append(await async_oq.enqueue(
            async_sample_task,
            func_args={"data": "Async Task 2", "delay": 0.1},
            queue_name="medium"
        ))
        
        task_ids.append(await async_oq.enqueue(
            sample_task,  # Mix sync and async tasks
            func_args={"data": "Mixed Task", "multiplier": 4},
            queue_name="low"
        ))
        
        print(f"✅ Enqueued {len(task_ids)} async tasks")
        
        # Wait for tasks to complete
        await asyncio.sleep(1)
        
        # Get and display results
        print("\n📋 Results from async processing:")
        for i, task_id in enumerate(task_ids, 1):
            result = await async_oq.get_result(task_id)
            print(f"  Async Task {i}: {result}")

# Run the async demo
await async_demo()
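
The loop above awaits results one at a time. When each lookup involves I/O, `asyncio.gather` can fetch them concurrently. The sketch below uses a stand-in coroutine, `fake_get_result`, in place of `async_oq.get_result`, so it makes no claim about how OmniQ behaves under concurrent calls. In a notebook, where an event loop is already running, replace `asyncio.run(...)` with a plain `await`.

```python
import asyncio


async def fake_get_result(task_id: str) -> str:
    """Stand-in for an awaitable result lookup with some I/O latency."""
    await asyncio.sleep(0.01)
    return f"result for {task_id}"


async def fetch_all(task_ids: list) -> list:
    # gather runs the lookups concurrently and preserves input order
    return await asyncio.gather(*(fake_get_result(t) for t in task_ids))


gathered = asyncio.run(fetch_all(["a", "b", "c"]))
print(gathered)
```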

## 4. Cloud Storage Examples (Commented)

The following cells show how to configure OmniQ for various cloud storage providers. These examples are commented out because they require proper credentials and cloud resources.

### Amazon S3 Configuration

In [None]:
# AMAZON S3 EXAMPLE
# Requires: pip install s3fs
# 
# s3_queue = FileTaskQueue(
#     project_name="s3_demo",
#     base_dir="s3://my-bucket/omniq/tasks",
#     queues=["high", "medium", "low"],
#     storage_options={
#         "key": "your_access_key_id",
#         "secret": "your_secret_access_key",
#         # Optional; s3fs takes the region via client_kwargs
#         # (assumes storage_options is passed to s3fs unchanged)
#         "client_kwargs": {"region_name": "us-east-1"}
#     }
# )
# 
# s3_results = FileResultStorage(
#     project_name="s3_demo",
#     base_dir="s3://my-bucket/omniq/results",
#     storage_options={
#         "key": "your_access_key_id",
#         "secret": "your_secret_access_key",
#         "client_kwargs": {"region_name": "us-east-1"}
#     }
# )
# 
# # Create OmniQ with S3 storage
# s3_oq = OmniQ(
#     project_name="s3_demo",
#     task_queue=s3_queue,
#     result_store=s3_results
# )
# 
# # Usage is identical to local storage
# with s3_oq:
#     task_id = s3_oq.enqueue(
#         sample_task,
#         func_args={"data": "S3 Task", "multiplier": 2}
#     )
#     time.sleep(2)  # Wait for the task to complete, as in the local example
#     result = s3_oq.get_result(task_id)
#     print(f"S3 Result: {result}")

print("📝 S3 configuration example shown above (commented out)")

### Azure Blob Storage Configuration

In [None]:
# AZURE BLOB STORAGE EXAMPLE
# Requires: pip install adlfs
#
# azure_queue = FileTaskQueue(
#     project_name="azure_demo",
#     base_dir="abfs://my-container/omniq/tasks",
#     queues=["high", "medium", "low"],
#     storage_options={
#         "account_name": "mystorageaccount",
#         "account_key": "your_account_key"
#         # Alternative: "sas_token": "your_sas_token"
#     }
# )
# 
# azure_results = FileResultStorage(
#     project_name="azure_demo",
#     base_dir="abfs://my-container/omniq/results",
#     storage_options={
#         "account_name": "mystorageaccount",
#         "account_key": "your_account_key"
#     }
# )
# 
# # Create OmniQ with Azure storage
# azure_oq = OmniQ(
#     project_name="azure_demo",
#     task_queue=azure_queue,
#     result_store=azure_results
# )
# 
# # Usage is identical to local storage
# with azure_oq:
#     task_id = azure_oq.enqueue(
#         sample_task,
#         func_args={"data": "Azure Task", "multiplier": 3}
#     )
#     time.sleep(2)  # Wait for the task to complete, as in the local example
#     result = azure_oq.get_result(task_id)
#     print(f"Azure Result: {result}")

print("📝 Azure Blob Storage configuration example shown above (commented out)")

### Google Cloud Storage Configuration

In [None]:
# GOOGLE CLOUD STORAGE EXAMPLE
# Requires: pip install gcsfs
#
# gcs_queue = FileTaskQueue(
#     project_name="gcs_demo",
#     base_dir="gs://my-bucket/omniq/tasks",
#     queues=["high", "medium", "low"],
#     storage_options={
#         "token": "/path/to/service-account.json"
#         # Alternative: "token": "google_default" for application default credentials,
#         # or "token": "cloud" to use the metadata server on Google Compute Engine
#     }
# )
# 
# gcs_results = FileResultStorage(
#     project_name="gcs_demo",
#     base_dir="gs://my-bucket/omniq/results",
#     storage_options={
#         "token": "/path/to/service-account.json"
#     }
# )
# 
# # Create OmniQ with GCS storage
# gcs_oq = OmniQ(
#     project_name="gcs_demo",
#     task_queue=gcs_queue,
#     result_store=gcs_results
# )
# 
# # Usage is identical to local storage
# with gcs_oq:
#     task_id = gcs_oq.enqueue(
#         sample_task,
#         func_args={"data": "GCS Task", "multiplier": 4}
#     )
#     time.sleep(2)  # Wait for the task to complete, as in the local example
#     result = gcs_oq.get_result(task_id)
#     print(f"GCS Result: {result}")

print("📝 Google Cloud Storage configuration example shown above (commented out)")
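
Each URL scheme used above depends on a separate fsspec backend package. As a small pre-flight check, the sketch below maps the scheme of a `base_dir` to the package named in the corresponding example and reports whether it can be imported. `required_package` and `backend_available` are hypothetical helpers, and the mapping covers only the schemes shown in this notebook.

```python
import importlib.util
from typing import Optional
from urllib.parse import urlsplit

# Scheme -> backend package, as listed in the examples above
BACKEND_PACKAGES = {"s3": "s3fs", "abfs": "adlfs", "gs": "gcsfs"}


def required_package(base_dir: str) -> Optional[str]:
    """Return the extra package a base_dir needs, or None if none is listed."""
    scheme = urlsplit(base_dir).scheme
    return BACKEND_PACKAGES.get(scheme)


def backend_available(base_dir: str) -> bool:
    """True if no extra package is needed or the needed one can be imported."""
    package = required_package(base_dir)
    return package is None or importlib.util.find_spec(package) is not None


for url in ["s3://my-bucket/omniq/tasks", "memory://memory_demo/tasks", "/tmp/omniq"]:
    print(url, "->", required_package(url), "| available:", backend_available(url))
```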

## 5. Environment Variable Configuration

You can also supply cloud credentials through environment variables. This keeps secrets out of the notebook and lets the same code run unchanged in different environments.

In [None]:
import os

print("Environment Variable Configuration Examples:")
print("="*50)

print("\n🔐 For Amazon S3:")
print("export AWS_ACCESS_KEY_ID='your_access_key'")
print("export AWS_SECRET_ACCESS_KEY='your_secret_key'")
print("export AWS_DEFAULT_REGION='us-east-1'")

print("\n🔐 For Azure Blob Storage:")
print("export AZURE_STORAGE_ACCOUNT_NAME='mystorageaccount'")
print("export AZURE_STORAGE_ACCOUNT_KEY='your_account_key'")

print("\n🔐 For Google Cloud Storage:")
print("export GOOGLE_APPLICATION_CREDENTIALS='/path/to/service-account.json'")

print("\n📝 With environment variables set, you can create storage without explicit credentials:")
print("""
# No storage_options needed when using environment variables
cloud_queue = FileTaskQueue(
    project_name="cloud_demo",
    base_dir="s3://my-bucket/omniq",  # or abfs:// or gs://
    queues=["high", "medium", "low"]
)
""")

# Show current environment variables (safely)
cloud_env_vars = [
    "AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY", "AWS_DEFAULT_REGION",
    "AZURE_STORAGE_ACCOUNT_NAME", "AZURE_STORAGE_ACCOUNT_KEY",
    "GOOGLE_APPLICATION_CREDENTIALS"
]

print("\n🔍 Current environment variables:")
for var in cloud_env_vars:
    value = os.environ.get(var)
    if value:
        # Mask sensitive values
        if "KEY" in var or "SECRET" in var:
            display_value = value[:4] + "*" * (len(value) - 8) + value[-4:] if len(value) > 8 else "***"
        else:
            display_value = value
        print(f"  ✅ {var} = {display_value}")
    else:
        print(f"  ❌ {var} = (not set)")
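
Before constructing a cloud-backed queue, it can help to fail fast when credentials are absent. The sketch below checks the variables listed in this section for one provider. `missing_env_vars` and the provider labels are hypothetical, and real deployments may authenticate in other ways (instance roles, credential files) that this check does not cover.

```python
import os
from typing import Mapping, Optional

# Variable names taken from the export commands above
PROVIDER_ENV_VARS = {
    "s3": ["AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY"],
    "azure": ["AZURE_STORAGE_ACCOUNT_NAME", "AZURE_STORAGE_ACCOUNT_KEY"],
    "gcs": ["GOOGLE_APPLICATION_CREDENTIALS"],
}


def missing_env_vars(provider: str, env: Optional[Mapping[str, str]] = None) -> list:
    """Return the required variables that are unset or empty for a provider."""
    env = os.environ if env is None else env
    return [name for name in PROVIDER_ENV_VARS[provider] if not env.get(name)]


# Example with an explicit mapping instead of the real environment
example_env = {"AWS_ACCESS_KEY_ID": "placeholder-id"}
print(missing_env_vars("s3", example_env))
```

Passing the environment as a mapping keeps the helper easy to test without touching `os.environ`.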

## 6. Performance Comparison

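Let's compare the memory and local backends with a simple benchmark. Note that the total time includes a fixed one-second wait for the workers, so the enqueue time is the more meaningful number.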

In [None]:
import time

def benchmark_storage(name: str, oq_instance, num_tasks: int = 5):
    """Benchmark a storage backend."""
    print(f"\n🏃 Benchmarking {name} with {num_tasks} tasks...")
    
    start_time = time.time()
    
    with oq_instance:
        # Enqueue tasks
        task_ids = []
        for i in range(num_tasks):
            task_id = oq_instance.enqueue(
                sample_task,
                func_args={"data": f"{name} Task {i+1}", "multiplier": i+1}
            )
            task_ids.append(task_id)
        
        enqueue_time = time.time()
        
        # Wait for completion
        time.sleep(1)
        
        # Get results
        results = []
        for task_id in task_ids:
            result = oq_instance.get_result(task_id)
            results.append(result)
        
        end_time = time.time()
    
    enqueue_duration = enqueue_time - start_time
    total_duration = end_time - start_time
    
    print(f"  📊 Enqueue time: {enqueue_duration:.3f}s")
    print(f"  📊 Total time (includes the fixed 1s wait): {total_duration:.3f}s")
    print(f"  ✅ Retrieved {len(results)} results")
    
    return {
        "name": name,
        "enqueue_time": enqueue_duration,
        "total_time": total_duration,
        "tasks": num_tasks
    }

# Run benchmarks
print("🏁 Storage Backend Performance Comparison")
print("="*50)

benchmarks = []

# Benchmark memory storage
benchmarks.append(benchmark_storage("Memory", memory_oq))

# Benchmark local storage
benchmarks.append(benchmark_storage("Local", local_oq))

# Summary
print("\n📈 Performance Summary:")
print("-" * 50)
for bench in benchmarks:
    print(f"{bench['name']:10} | Enqueue: {bench['enqueue_time']:.3f}s | Total: {bench['total_time']:.3f}s")

fastest = min(benchmarks, key=lambda x: x['total_time'])
print(f"\n🏆 Fastest: {fastest['name']} storage ({fastest['total_time']:.3f}s total)")
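
Because the benchmark above includes a fixed one-second sleep, its total time says little about the backend itself. A minimal sketch of a more focused measurement times a single operation several times with `time.perf_counter` and reports the median. `time_call` is a hypothetical helper, and the timed lambda is a placeholder for an enqueue call.

```python
import statistics
import time
from typing import Callable


def time_call(fn: Callable[[], object], repeats: int = 5) -> float:
    """Return the median wall-clock duration of fn() over several runs."""
    durations = []
    for _ in range(repeats):
        start = time.perf_counter()
        fn()
        durations.append(time.perf_counter() - start)
    return statistics.median(durations)


# Placeholder workload; swap in e.g. a lambda that enqueues one task
median_seconds = time_call(lambda: sum(range(10_000)), repeats=5)
print(f"median: {median_seconds:.6f}s")
```

The median is less sensitive than the mean to a single slow run, such as the first call that warms up a connection.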

## 7. Best Practices and Tips

Here are some best practices when using fsspec with OmniQ:

In [None]:
print("💡 Best Practices for fsspec with OmniQ")
print("="*50)

practices = [
    "🔐 Use environment variables for credentials instead of hardcoding",
    "🌍 Choose storage regions close to your compute resources",
    "📁 Use descriptive project names and base directories",
    "🔄 Test with memory storage first, then local, then cloud",
    "📊 Monitor storage costs and usage patterns",
    "🔒 Configure appropriate access controls and IAM policies",
    "💾 Implement backup strategies for important data",
    "🚀 Use cloud storage for distributed and high-volume workloads",
    "🧪 Use memory storage for unit tests and development",
    "📈 Profile performance with different storage backends"
]

for practice in practices:
    print(f"  {practice}")

print("\n🔧 Troubleshooting Tips:")
print("-" * 30)
troubleshooting = [
    "Check credentials and permissions for authentication errors",
    "Verify network connectivity and firewall rules",
    "Ensure bucket/container exists and has proper permissions",
    "Verify region configuration matches your resources",
    "Install required packages: s3fs, adlfs, gcsfs",
    "Enable debug logging for detailed fsspec operations"
]

for tip in troubleshooting:
    print(f"  • {tip}")
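
For the last tip, the standard `logging` module is enough. The sketch below raises the level of the `fsspec` logger and of the loggers used by the backend packages. The backend logger names are assumed to match the package names, which is worth verifying against the versions you have installed.

```python
import logging

# Send log records to stderr with timestamps
logging.basicConfig(
    level=logging.WARNING,
    format="%(asctime)s %(name)s %(levelname)s %(message)s",
)

# Logger names assumed to match the package names
for name in ("fsspec", "s3fs", "adlfs", "gcsfs"):
    logging.getLogger(name).setLevel(logging.DEBUG)

print(logging.getLogger("fsspec").getEffectiveLevel() == logging.DEBUG)
```

Setting the level per logger keeps unrelated libraries at `WARNING`, so the output stays readable.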

## Summary

In this notebook, we've demonstrated:

✅ **Local Filesystem Storage** - Standard file system operations  
✅ **Memory Filesystem Storage** - In-memory storage for testing  
✅ **Async Usage** - Asynchronous task processing with memory storage  
📝 **Cloud Storage Examples** - S3, Azure, and GCS configurations (commented)  
🔐 **Environment Variables** - Secure credential management  
🏁 **Performance Comparison** - Benchmarking different storage backends  
💡 **Best Practices** - Tips for production usage  

### Key Takeaways

1. **Unified Interface**: The same OmniQ API works across all storage backends
2. **Easy Migration**: Switch from local to cloud storage with minimal code changes
3. **Development Workflow**: Use memory → local → cloud progression
4. **Performance**: Memory storage is typically fastest, local storage is a good fit for development, and cloud storage suits distributed workloads; benchmark your own setup to confirm
5. **Security**: Always use environment variables or credential files for cloud access
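
The memory → local → cloud progression is easiest when the backend is chosen by configuration rather than code. As a sketch, the helper below reads a base directory from an environment variable and falls back to in-memory storage. The variable name `OMNIQ_BASE_DIR` and the function `resolve_base_dirs` are illustrative assumptions, not OmniQ settings.

```python
import os
from typing import Mapping, Optional


def resolve_base_dirs(env: Optional[Mapping[str, str]] = None) -> dict:
    """Pick task and result directories from the environment, defaulting to memory."""
    env = os.environ if env is None else env
    # OMNIQ_BASE_DIR is a hypothetical variable name used for illustration
    root = env.get("OMNIQ_BASE_DIR", "memory://omniq_dev").rstrip("/")
    return {"tasks": f"{root}/tasks", "results": f"{root}/results"}


dev_dirs = resolve_base_dirs({})
prod_dirs = resolve_base_dirs({"OMNIQ_BASE_DIR": "s3://my-bucket/omniq/"})
print(dev_dirs)
print(prod_dirs)
```

The two values can then be passed as `base_dir` to `FileTaskQueue` and `FileResultStorage`, so moving between environments only requires changing the variable.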

### Next Steps

To use cloud storage in production:

1. Install required packages: `pip install s3fs adlfs gcsfs`
2. Set up cloud credentials and permissions
3. Uncomment and modify the cloud storage examples
4. Test with small workloads before scaling up
5. Monitor costs and performance

For more information, see the [README.md](README.md) file in this directory.