# sitq Interactive Tutorial

Welcome to the interactive tutorial for sitq! This notebook will guide you through the core concepts and features of the Simple Task Queue library.

## What is sitq?

sitq is a simple, reliable task queue for Python that makes it easy to:
- Execute tasks asynchronously
- Handle background processing
- Manage task dependencies
- Scale processing with multiple workers
- Handle errors gracefully

Let's get started!

## 1. Basic Setup

First, let's import sitq and set up a basic task queue.

In [None]:
# Import sitq
import sitq
import time
from datetime import datetime

# Create a task queue with in-memory SQLite backend
# In production, you'd use a file-based backend like SQLiteBackend("tasks.db")
queue = sitq.TaskQueue(backend=sitq.SQLiteBackend(":memory:"))

print("‚úÖ Task queue created successfully!")
print(f"Queue backend: {type(queue.backend).__name__}")

## 2. Creating and Enqueuing Tasks

Tasks are the fundamental unit of work in sitq. Let's create our first task!

In [None]:
# Define a simple function to be executed as a task
def greet(name):
    """Simple greeting function."""
    time.sleep(1)  # Simulate some work
    return f"Hello, {name}! Welcome to sitq!"

# Create a task
task = sitq.Task(
    function=greet,
    args=["Alice"],
    metadata={"created_by": "tutorial", "priority": "high"}
)

print("üìù Task created:")
print(f"  Function: {task.function.__name__}")
print(f"  Args: {task.args}")
print(f"  Metadata: {task.metadata}")

In [None]:
# Enqueue the task
task_id = queue.enqueue(task)

print(f"üöÄ Task enqueued with ID: {task_id}")
print(f"Task status: {queue.get_task_status(task_id)}")

## 3. Processing Tasks with Workers

Workers execute tasks from the queue. Let's create a worker and process our task.

In [None]:
# Create a worker
worker = sitq.Worker(queue)

print("üë∑ Worker created")
print(f"Worker ID: {worker.worker_id}")

In [None]:
# Process the task
print("‚è≥ Processing task...")
start_time = time.time()

result = worker.process_task(task_id)

end_time = time.time()
duration = end_time - start_time

print(f"‚úÖ Task completed in {duration:.2f} seconds")
print(f"Result: {result.value}")
print(f"Success: {result.is_success}")
print(f"Final task status: {queue.get_task_status(task_id)}")

## 4. Working with Different Task Types

Let's explore different types of tasks and their use cases.

In [None]:
# Mathematical computation task
def calculate_fibonacci(n):
    """Calculate nth Fibonacci number."""
    if n <= 1:
        return n
    a, b = 0, 1
    for _ in range(2, n + 1):
        a, b = b, a + b
    return b

# Data processing task
def process_data(data):
    """Process a list of numbers."""
    return {
        "count": len(data),
        "sum": sum(data),
        "average": sum(data) / len(data) if data else 0,
        "max": max(data) if data else None,
        "min": min(data) if data else None
    }

# I/O simulation task
def simulate_api_call(endpoint, delay=0.5):
    """Simulate an API call."""
    time.sleep(delay)
    return {"endpoint": endpoint, "status": 200, "data": "Success"}

print("üîß Task functions defined")

In [None]:
# Create and process different tasks
tasks = [
    ("Fibonacci", sitq.Task(function=calculate_fibonacci, args=[20])),
    ("Data Processing", sitq.Task(function=process_data, args=[[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]])),
    ("API Call", sitq.Task(function=simulate_api_call, args=["/api/users", 0.3]))
]

print("üìã Processing different task types:")
print("-" * 50)

for task_name, task in tasks:
    task_id = queue.enqueue(task)
    result = worker.process_task(task_id)
    
    print(f"{task_name}:")
    print(f"  Result: {result.value}")
    print(f"  Success: {result.is_success}")
    print()

## 5. Error Handling

What happens when tasks fail? Let's explore error handling.

In [None]:
# Define a function that will fail
def failing_task(should_fail=True):
    """A task that always fails."""
    if should_fail:
        raise ValueError("This task is designed to fail!")
    return "Success!"

# Define a function with a potential error
def risky_division(a, b):
    """Division that might fail."""
    return a / b

print("‚ö†Ô∏è Testing error handling")

In [None]:
# Process failing task
failing_task_obj = sitq.Task(function=failing_task, args=[True])
task_id = queue.enqueue(failing_task_obj)
result = worker.process_task(task_id)

print("‚ùå Failing task result:")
print(f"  Is Error: {result.is_error}")
print(f"  Error Message: {result.error}")
print(f"  Error Type: {type(result.error).__name__}")
print()

In [None]:
# Process risky division task
risky_task = sitq.Task(function=risky_division, args=[10, 0])  # Division by zero
task_id = queue.enqueue(risky_task)
result = worker.process_task(task_id)

print("üö® Risky task result:")
print(f"  Is Error: {result.is_error}")
print(f"  Error Message: {result.error}")
print(f"  Error Type: {type(result.error).__name__}")

## 6. Task Retry Logic

sitq provides built-in retry mechanisms. Let's see how they work.

In [None]:
# Create a flaky function that fails a few times before succeeding
attempt_count = 0

def flaky_task():
    """A task that fails a few times before succeeding."""
    global attempt_count
    attempt_count += 1
    
    if attempt_count < 3:
        raise ValueError(f"Attempt {attempt_count} failed")
    
    return f"Success on attempt {attempt_count}!"

# Reset attempt counter
attempt_count = 0

# Create task with retry configuration
retry_task = sitq.Task(
    function=flaky_task,
    max_retries=3,
    retry_delay=0.1  # 100ms between retries
)

print("üîÑ Testing retry logic")
print(f"Max retries: {retry_task.max_retries}")
print(f"Retry delay: {retry_task.retry_delay}s")

In [None]:
# Process the flaky task
task_id = queue.enqueue(retry_task)
result = worker.process_task(task_id)

print(f"‚úÖ Final result: {result.value}")
print(f"Success: {result.is_success}")
print(f"Total attempts made: {attempt_count}")

## 7. Batch Processing

Let's explore how to process multiple tasks efficiently.

In [None]:
# Create multiple tasks
def square_number(x):
    """Square a number."""
    time.sleep(0.1)  # Simulate work
    return x ** 2

# Create a batch of tasks
batch_size = 10
tasks = []

for i in range(batch_size):
    task = sitq.Task(
        function=square_number,
        args=[i],
        priority=i  # Lower numbers = higher priority
    )
    tasks.append(task)

print(f"üì¶ Created {len(tasks)} tasks for batch processing")

In [None]:
# Enqueue all tasks at once
start_time = time.time()
task_ids = queue.enqueue_batch(tasks)
enqueue_time = time.time() - start_time

print(f"‚ö° Enqueued {len(task_ids)} tasks in {enqueue_time:.3f} seconds")
print(f"Enqueue rate: {len(task_ids)/enqueue_time:.1f} tasks/sec")

In [None]:
# Process all tasks
start_time = time.time()
results = []

for task_id in task_ids:
    result = worker.process_task(task_id)
    results.append(result)

processing_time = time.time() - start_time

print(f"‚ö° Processed {len(results)} tasks in {processing_time:.3f} seconds")
print(f"Processing rate: {len(results)/processing_time:.1f} tasks/sec")
print()

# Show some results
print("Sample results:")
for i, result in enumerate(results[:5]):
    print(f"  Task {i}: {result.value} (Success: {result.is_success})")

## 8. Task Priorities

Tasks can have priorities to control execution order. Let's see this in action.

In [None]:
# Create tasks with different priorities
def create_task_with_priority(name, priority):
    """Create a task that returns its name and priority."""
    def task_func():
        time.sleep(0.1)
        return {"name": name, "priority": priority}
    
    return sitq.Task(
        function=task_func,
        priority=priority
    )

# Create tasks with mixed priorities
priority_tasks = [
    create_task_with_priority("Low Priority 1", 10),
    create_task_with_priority("High Priority 1", 1),
    create_task_with_priority("Medium Priority", 5),
    create_task_with_priority("Low Priority 2", 10),
    create_task_with_priority("High Priority 2", 1),
]

print("üéØ Creating tasks with different priorities")
for task in priority_tasks:
    task_id = queue.enqueue(task)
    print(f"  Enqueued task with priority {task.priority}")

In [None]:
# Process tasks in priority order
print("üìã Processing tasks (should be in priority order):")
print("-" * 50)

processed_order = []

for _ in range(len(priority_tasks)):
    # Get next task (should respect priority)
    task = queue.dequeue()
    if task:
        result = worker.process_task(task.id)
        processed_order.append(result.value)
        print(f"  {result.value['name']} (Priority: {result.value['priority']})")

print("\n‚úÖ All tasks processed!")

## 9. Queue Statistics and Monitoring

Let's explore how to monitor queue health and performance.

In [None]:
# Get queue statistics
stats = queue.get_stats()

print("üìä Queue Statistics:")
print(f"  Total tasks: {stats.total_tasks}")
print(f"  Queued tasks: {stats.queued_tasks}")
print(f"  Running tasks: {stats.running_tasks}")
print(f"  Completed tasks: {stats.completed_tasks}")
print(f"  Failed tasks: {stats.failed_tasks}")

In [None]:
# Get worker statistics
worker_stats = worker.get_stats()

print("üë∑ Worker Statistics:")
print(f"  Worker ID: {worker_stats.worker_id}")
print(f"  Tasks processed: {worker_stats.tasks_processed}")
print(f"  Tasks failed: {worker_stats.tasks_failed}")
print(f"  Success rate: {worker_stats.success_rate:.1%}")
print(f"  Average task duration: {worker_stats.avg_task_duration:.3f}s")
print(f"  Is running: {worker_stats.is_running}")

## 10. Continuous Worker Operation

Workers can run continuously, processing tasks as they arrive.

In [None]:
# Create a new worker for continuous operation
continuous_worker = sitq.Worker(queue, worker_id="continuous_worker")

# Enqueue some tasks
def quick_task(name):
    """Quick task for demonstration."""
    time.sleep(0.2)
    return f"Completed {name}"

print("üîÑ Enqueuing tasks for continuous processing...")

for i in range(5):
    task = sitq.Task(function=quick_task, args=[f"Task {i}"])
    task_id = queue.enqueue(task)
    print(f"  Enqueued: {task_id}")

print("\n‚è≥ Starting continuous worker for 3 seconds...")

In [None]:
# Run worker for a short time
import threading

def run_worker_for_duration(worker, duration):
    """Run worker for specified duration."""
    worker.run(duration=duration)

# Start worker in background thread
worker_thread = threading.Thread(
    target=run_worker_for_duration,
    args=(continuous_worker, 3.0)
)
worker_thread.start()

# Add more tasks while worker is running
time.sleep(1)
print("‚ûï Adding more tasks while worker is running...")

for i in range(5, 8):
    task = sitq.Task(function=quick_task, args=[f"Late Task {i}"])
    task_id = queue.enqueue(task)
    print(f"  Enqueued: {task_id}")

# Wait for worker to finish
worker_thread.join()
print("\n‚úÖ Continuous worker finished!")

## 11. Using the Synchronous Wrapper

For simple use cases, sitq provides a synchronous wrapper that simplifies task execution.

In [None]:
# Use the synchronous wrapper
with sitq.SyncTaskQueue() as sync_queue:
    print("üîÑ Using synchronous wrapper")
    
    # Execute a task directly
    result = sync_queue.execute(lambda x: x * 2, 21)
    print(f"Direct execution result: {result}")
    
    # Create and execute a task
    task = sitq.Task(function=lambda name: f"Hello {name}!", args=["Sync"])
    task_id = sync_queue.enqueue(task)
    result = sync_queue.get_result(task_id)
    print(f"Task result: {result.value}")

print("‚úÖ Synchronous wrapper demo completed")

## 12. Real-world Example: Data Processing Pipeline

Let's create a realistic data processing pipeline using sitq.

In [None]:
# Define pipeline stages
def extract_data(source):
    """Extract data from source."""
    time.sleep(0.5)  # Simulate data extraction
    return {
        "source": source,
        "data": [i for i in range(1, 101)],  # Numbers 1-100
        "extracted_at": datetime.now().isoformat()
    }

def transform_data(data_dict):
    """Transform the data."""
    time.sleep(0.3)  # Simulate transformation
    data = data_dict["data"]
    return {
        **data_dict,
        "transformed_data": [x * 2 for x in data],
        "transformed_at": datetime.now().isoformat()
    }

def load_data(data_dict):
    """Load data to destination."""
    time.sleep(0.2)  # Simulate loading
    transformed_data = data_dict["transformed_data"]
    return {
        "status": "loaded",
        "record_count": len(transformed_data),
        "sum": sum(transformed_data),
        "loaded_at": datetime.now().isoformat()
    }

print("üè≠ Data pipeline functions defined")

In [None]:
# Execute the pipeline
print("üöÄ Starting data processing pipeline...")
pipeline_start = time.time()

# Stage 1: Extract
extract_task = sitq.Task(function=extract_data, args=["database"])
extract_id = queue.enqueue(extract_task)
extract_result = worker.process_task(extract_id)
print(f"‚úÖ Extract completed: {len(extract_result.value['data'])} records")

# Stage 2: Transform
transform_task = sitq.Task(function=transform_data, args=[extract_result.value])
transform_id = queue.enqueue(transform_task)
transform_result = worker.process_task(transform_id)
print(f"‚úÖ Transform completed: {len(transform_result.value['transformed_data'])} records")

# Stage 3: Load
load_task = sitq.Task(function=load_data, args=[transform_result.value])
load_id = queue.enqueue(load_task)
load_result = worker.process_task(load_id)
print(f"‚úÖ Load completed: {load_result.value['record_count']} records loaded")

pipeline_end = time.time()
pipeline_duration = pipeline_end - pipeline_start

print(f"\nüéâ Pipeline completed in {pipeline_duration:.2f} seconds")
print(f"Final result: {load_result.value}")

## 13. Performance Comparison

Let's compare the performance of different approaches.

In [None]:
# Compare synchronous vs asynchronous processing
def cpu_intensive_task(n):
    """CPU-intensive task."""
    return sum(range(n))

# Synchronous processing
print("üîÑ Synchronous processing:")
sync_start = time.time()

for i in range(5):
    result = cpu_intensive_task(50000)
    print(f"  Task {i}: {result:,}")

sync_time = time.time() - sync_start
print(f"Synchronous time: {sync_time:.3f}s")
print()

# Asynchronous processing with sitq
print("‚ö° Asynchronous processing with sitq:")
async_start = time.time()

# Create tasks
tasks = []
for i in range(5):
    task = sitq.Task(function=cpu_intensive_task, args=[50000])
    task_id = queue.enqueue(task)
    tasks.append(task_id)

# Process tasks
for i, task_id in enumerate(tasks):
    result = worker.process_task(task_id)
    print(f"  Task {i}: {result.value:,}")

async_time = time.time() - async_start
print(f"Asynchronous time: {async_time:.3f}s")
print()

print(f"üìä Performance comparison:")
print(f"  Synchronous: {sync_time:.3f}s")
print(f"  Asynchronous: {async_time:.3f}s")
print(f"  Speedup: {sync_time/async_time:.2f}x")

## 14. Best Practices Summary

Based on what we've learned, here are some best practices for using sitq:

### ‚úÖ Best Practices

1. **Choose the Right Backend**
   - Use `:memory:` for testing and development
   - Use file-based SQLite for production
   - Configure connection pooling for better performance

2. **Handle Errors Gracefully**
   - Always check `result.is_error` after processing
   - Use retry logic for transient failures
   - Log errors for debugging

3. **Use Appropriate Task Granularity**
   - Tasks should be small enough to fail fast
   - But large enough to justify overhead
   - Batch similar operations when possible

4. **Monitor Performance**
   - Track queue statistics regularly
   - Monitor worker health and throughput
   - Set up alerts for error rates

5. **Use Priorities Wisely**
   - Lower numbers = higher priority
   - Use for critical tasks that must run first
   - Avoid over-reliance on priorities for flow control

6. **Configure Workers Appropriately**
   - Use multiple workers for CPU-bound tasks
   - Set reasonable timeouts
   - Configure retry policies based on task characteristics

### ‚ùå Common Pitfalls to Avoid

1. **Blocking Operations in Tasks**
   - Avoid long-running synchronous operations
   - Break large tasks into smaller chunks

2. **Ignoring Resource Limits**
   - Monitor memory usage
   - Don't create too many workers
   - Handle resource exhaustion gracefully

3. **Poor Error Handling**
   - Don't ignore task failures
   - Log errors with context
   - Implement proper retry logic

4. **Inefficient Task Design**
   - Avoid tasks that are too small (high overhead)
   - Avoid tasks that are too large (poor failure isolation)
   - Don't pass large objects in task arguments

## 15. Next Steps

Congratulations! You've completed the sitq interactive tutorial. Here's what you can do next:

### üìö Learn More
- Read the [sitq documentation](https://sitq.readthedocs.io/)
- Explore the [API reference](../reference/api/)
- Check out the [examples](../how-to/examples/)

### üõ†Ô∏è Build Something
- Create a web application with background tasks
- Build a data processing pipeline
- Implement a microservices architecture

### ü§ù Contribute
- Report bugs on GitHub
- Suggest new features
- Submit pull requests

### üéØ Advanced Topics
- Custom backends
- Advanced serialization
- Performance optimization
- Production deployment

## üéâ Thank You!

Thank you for trying out sitq! We hope this tutorial has been helpful. If you have any questions or feedback, please don't hesitate to reach out.

**Happy task queuing! üöÄ**