# Parallel Requests Demo

This notebook demonstrates sending parallel POST requests to the inference endpoint of a containerized Hugging Face audio classification API.

**Model**: MIT/ast-finetuned-audioset-10-10-0.4593 (Audio Spectrogram Transformer)

**Endpoint**: http://localhost/api/v1/inference
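The cells below read the response as `result['filename']` and `result['results']['predictions']`, where each prediction has `label` and `score` fields. Based on that, a successful reply appears to be a JSON object of roughly that shape. The sketch below parses it into typed records. The schema is inferred from this notebook, not from API documentation, and `Prediction` and `parse_predictions` are hypothetical names.

```python
from dataclasses import dataclass
from typing import Any, Dict, List


@dataclass
class Prediction:
    label: str
    score: float


def parse_predictions(payload: Dict[str, Any]) -> List[Prediction]:
    """Extract predictions from a response dict, highest score first.

    Assumes {'filename': ..., 'results': {'predictions': [{'label': ..., 'score': ...}]}},
    a shape inferred from how this notebook reads the response.
    """
    raw = payload["results"]["predictions"]
    preds = [Prediction(label=p["label"], score=float(p["score"])) for p in raw]
    # Sort explicitly instead of relying on the server's ordering
    return sorted(preds, key=lambda p: p.score, reverse=True)


# Hypothetical payload shaped like the responses printed in this notebook
example = {
    "filename": "music_sample.wav",
    "results": {"predictions": [
        {"label": "Dial tone", "score": 0.147},
        {"label": "Sine wave", "score": 0.775},
    ]},
}
print(parse_predictions(example)[0])  # Prediction(label='Sine wave', score=0.775)
```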

In [2]:
import asyncio
import aiohttp
import requests
import time
import numpy as np
import soundfile as sf
from typing import List, Dict

# API Configuration
API_URL = 'http://localhost/api/v1/inference'
HEALTH_URL = 'http://localhost/api/v1/health'

In [3]:
# Check if API is running
try:
    response = requests.get(HEALTH_URL, timeout=5)
    if response.status_code == 200:
        print("✅ API is healthy and ready")
        api_ready = True
    else:
        print(f"⚠️ API returned status: {response.status_code}")
        api_ready = False
except Exception as e:
    print(f"❌ API not available: {e}")
    print("Make sure to run: docker compose up -d")
    api_ready = False

✅ API is healthy and ready
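The health check above runs only once, but the model can take a few minutes to load after `docker compose up -d`. A minimal polling sketch using only the standard library; `is_healthy` and `wait_until` are hypothetical helpers, and the 180 s default timeout is an assumption based on the ~2-3 minute load time mentioned at the end of this notebook.

```python
import http.client
import time
import urllib.request
from typing import Callable


def is_healthy(url: str, timeout: float = 5.0) -> bool:
    """Return True if a GET on url answers HTTP 200; False on errors or other statuses."""
    try:
        with urllib.request.urlopen(url, timeout=timeout) as resp:
            return resp.status == 200
    except (OSError, http.client.HTTPException, ValueError):
        # URLError/HTTPError, refused connections and timeouts are all OSError subclasses
        return False


def wait_until(check: Callable[[], bool], timeout_s: float = 180.0, interval_s: float = 5.0) -> bool:
    """Call check() every interval_s seconds until it returns True or timeout_s elapses."""
    deadline = time.monotonic() + timeout_s
    while True:
        if check():
            return True
        if time.monotonic() >= deadline:
            return False
        time.sleep(interval_s)
```

In the notebook this could replace the one-shot check with `api_ready = wait_until(lambda: is_healthy(HEALTH_URL))`. Keeping the polling loop separate from the HTTP call also makes `wait_until` easy to test with a fake check.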


In [4]:
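# Generate test audio files
def create_test_audio(filename: str, audio_type: str = "music"):
    """Create a 3-second synthetic test clip and save it as a 16 kHz WAV file"""
    duration = 3  # seconds
    sample_rate = 16000  # AST model expects 16 kHz input
    # endpoint=False keeps the sample spacing at exactly 1/sample_rate
    t = np.linspace(0, duration, duration * sample_rate, endpoint=False)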
    
    if audio_type == "music":
        # Musical chord (C major)
        audio = (np.sin(2 * np.pi * 261.63 * t) +  # C4
                0.5 * np.sin(2 * np.pi * 329.63 * t) +  # E4  
                0.3 * np.sin(2 * np.pi * 392.00 * t))   # G4
    elif audio_type == "speech":
        # Crude speech stand-in: 200 Hz tone amplitude-modulated at 5 Hz (no real formants)
        audio = np.sin(2 * np.pi * 200 * t) * np.sin(2 * np.pi * 5 * t)
    else:  # noise
        # Unfiltered white Gaussian noise
        audio = np.random.randn(len(t)) * 0.1
    
    # Normalize
    audio = audio / np.max(np.abs(audio)) * 0.7
    sf.write(filename, audio, sample_rate)
    return filename

# Create test files
test_files = [
    create_test_audio("music_sample.wav", "music"),
    create_test_audio("speech_sample.wav", "speech"), 
    create_test_audio("noise_sample.wav", "noise")
]

print(f"Created {len(test_files)} test audio files:")
for f in test_files:
    print(f"  📁 {f}")

Created 3 test audio files:
  📁 music_sample.wav
  📁 speech_sample.wav
  📁 noise_sample.wav
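The AST model expects 16 kHz input, so it can be worth confirming what was actually written to disk. A sketch using the standard-library `wave` module, assuming `soundfile` wrote the files with its default 16-bit PCM subtype for WAV (which `wave` can read); `describe_wav` is a hypothetical helper.

```python
import wave


def describe_wav(path: str) -> dict:
    """Return sample rate, channel count, sample width and duration of a PCM WAV file."""
    with wave.open(path, "rb") as w:
        rate = w.getframerate()
        frames = w.getnframes()
        return {
            "sample_rate": rate,
            "channels": w.getnchannels(),
            "sample_width_bytes": w.getsampwidth(),
            "seconds": frames / rate,
        }
```

For the files created above, `all(describe_wav(f)["sample_rate"] == 16000 for f in test_files)` would be expected to be `True`.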


In [5]:
# Single request example
def send_single_request(audio_file: str) -> Dict:
    """Send a single POST request to the inference endpoint"""
    with open(audio_file, 'rb') as f:
        files = {'file': (audio_file, f, 'audio/wav')}
        start_time = time.time()
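        # Without a timeout, requests can hang indefinitely on a stalled server (120 s is an arbitrary limit)
        response = requests.post(API_URL, files=files, timeout=120)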
        end_time = time.time()
    
    if response.status_code == 200:
        result = response.json()
        result['request_time'] = end_time - start_time
        return result
    else:
        return {'error': f"HTTP {response.status_code}: {response.text}"}

# Test single request
if api_ready:
    print("🎯 Testing single request...")
    result = send_single_request(test_files[0])
    
    if 'error' not in result:
        print(f"\n📊 Response for {result['filename']}:")
        predictions = sorted(result['results']['predictions'], key=lambda p: p['score'], reverse=True)[:3]  # Top 3
        
        for i, pred in enumerate(predictions):
            print(f"  {i+1}. {pred['label']}: {pred['score']:.3f}")
        
        print(f"\nRequest took: {result['request_time']:.2f}s")
    else:
        print(f"❌ Error: {result['error']}")

🎯 Testing single request...

📊 Response for music_sample.wav:
  1. Sine wave: 0.775
  2. Dial tone: 0.147
  3. Busy signal: 0.012

Request took: 3.24s
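The parallel cells below are easier to judge against a sequential baseline, where the same files are sent one at a time and each request waits for the previous response. A minimal sketch; `time_sequential` is a hypothetical helper that accepts any single-file request function, such as `send_single_request` above.

```python
import time
from typing import Callable, Dict, List, Tuple


def time_sequential(files: List[str], send: Callable[[str], Dict]) -> Tuple[List[Dict], float]:
    """Send one request per file, waiting for each response before sending the next.

    Returns the list of results and the total wall-clock time in seconds.
    """
    start = time.perf_counter()
    results = [send(f) for f in files]
    return results, time.perf_counter() - start
```

Calling `seq_results, seq_time = time_sequential(test_files, send_single_request)` would give a wall-clock time directly comparable to the parallel run.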


In [6]:
# Parallel requests implementation
async def send_async_request(session: aiohttp.ClientSession, audio_file: str) -> Dict:
    """Send an async POST request; errors are returned as a dict instead of raised"""
    start_time = time.time()
    try:
        with open(audio_file, 'rb') as f:
            data = aiohttp.FormData()
            data.add_field('file', f, filename=audio_file, content_type='audio/wav')
            
            async with session.post(API_URL, data=data) as response:
                if response.status == 200:
                    result = await response.json()
                else:
                    text = await response.text()
                    result = {'error': f"HTTP {response.status}: {text}"}
    except Exception as e:
        # Catching here keeps one failed request from making asyncio.gather() raise
        result = {'error': str(e)}
    
    # Measured after the body is read, matching send_single_request
    result['request_time'] = time.time() - start_time
    result['audio_file'] = audio_file
    return result

async def send_parallel_requests(audio_files: List[str]) -> List[Dict]:
    """Send multiple requests in parallel"""
    async with aiohttp.ClientSession() as session:
        tasks = [send_async_request(session, file) for file in audio_files]
        results = await asyncio.gather(*tasks)
        return results
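`send_parallel_requests` starts every request at once, which is fine for three files but can overload a single-model server as the list grows. A common pattern is to cap the number of in-flight requests with `asyncio.Semaphore`. A sketch in which `gather_limited` is a hypothetical helper and the default limit of 4 is an arbitrary assumption:

```python
import asyncio
from typing import Awaitable, Callable, List, TypeVar

T = TypeVar("T")


async def gather_limited(items: List[str], worker: Callable[[str], Awaitable[T]], limit: int = 4) -> List[T]:
    """Run worker(item) for every item with at most `limit` calls in flight.

    Results come back in the same order as `items`, like asyncio.gather().
    """
    semaphore = asyncio.Semaphore(limit)

    async def run_one(item: str) -> T:
        async with semaphore:  # waits here while `limit` workers are already running
            return await worker(item)

    return await asyncio.gather(*(run_one(item) for item in items))
```

Inside a session, `await gather_limited(test_files, lambda f: send_async_request(session, f), limit=4)` would replace the unbounded `asyncio.gather` call. Result order is preserved because `asyncio.gather` returns results in the order the awaitables were passed.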

In [7]:
# Demonstrate parallel requests
if api_ready:
    print("⚡ Sending parallel requests to all test files...\n")
    
    # Send requests in parallel
    start_time = time.time()
    results = await send_parallel_requests(test_files)
    total_time = time.time() - start_time
    
    print(f"📊 Parallel Results (completed in {total_time:.2f}s):")
    print("=" * 60)
    
    for result in results:
        if 'error' not in result:
            print(f"\n📁 File: {result['audio_file']}")
            print(f"⏱️  Request time: {result['request_time']:.2f}s")
            
            # Get top prediction
            predictions = result['results']['predictions']
            top_pred = max(predictions, key=lambda x: x['score'])
            print(f"🏆 Top prediction: {top_pred['label']} ({top_pred['score']:.3f})")
            
            # Show top 3
            sorted_preds = sorted(predictions, key=lambda x: x['score'], reverse=True)[:3]
            print("📈 Top 3:")
            for j, pred in enumerate(sorted_preds):
                print(f"   {j+1}. {pred['label']}: {pred['score']:.3f}")
        else:
            print(f"\n❌ Error for {result['audio_file']}: {result['error']}")
    
    print("\n" + "=" * 60)
    successful_requests = sum(1 for r in results if 'error' not in r)
    # Average over successful requests only, so failures don't skew the figure
    avg_response_time = np.mean([r['request_time'] for r in results if 'error' not in r])
    
    print(f"✅ Successful requests: {successful_requests}/{len(results)}")
    print(f"⚡ Average response time: {avg_response_time:.2f}s")
    print(f"🚀 Total time for parallel requests: {total_time:.2f}s")
else:
    print("❌ API not ready. Please start the containers first.")

⚡ Sending parallel requests to all test files...

📊 Parallel Results (completed in 5.60s):

📁 File: music_sample.wav
⏱️  Request time: 5.59s
🏆 Top prediction: Sine wave (0.775)
📈 Top 3:
   1. Sine wave: 0.775
   2. Dial tone: 0.147
   3. Busy signal: 0.012

📁 File: speech_sample.wav
⏱️  Request time: 5.30s
🏆 Top prediction: Sine wave (0.716)
📈 Top 3:
   1. Sine wave: 0.716
   2. Sound effect: 0.054
   3. Busy signal: 0.034

📁 File: noise_sample.wav
⏱️  Request time: 5.47s
🏆 Top prediction: Static (0.798)
📈 Top 3:
   1. Static: 0.798
   2. White noise: 0.093
   3. Speech: 0.015

✅ Successful requests: 3/3
⚡ Average response time: 5.45s
🚀 Total time for parallel requests: 5.60s
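The cell above uses top-level `await`, which works because Jupyter/IPython already runs an event loop for the notebook. In a plain `.py` script the coroutine has to be driven with `asyncio.run()` instead; `asyncio.run()` raises `RuntimeError` when called while a loop is already running, so it does not belong in the notebook itself. A self-contained sketch in which `fake_parallel_requests` is a hypothetical stand-in for `send_parallel_requests`, so the snippet runs without the API:

```python
import asyncio
import time
from typing import Dict, List


async def fake_parallel_requests(files: List[str]) -> List[Dict]:
    """Stand-in for send_parallel_requests: each 'request' just sleeps briefly."""
    async def one(f: str) -> Dict:
        await asyncio.sleep(0.05)
        return {"audio_file": f}
    return await asyncio.gather(*(one(f) for f in files))


def main() -> List[Dict]:
    start = time.perf_counter()
    # asyncio.run() creates an event loop, runs the coroutine to completion, then closes the loop
    results = asyncio.run(fake_parallel_requests(["a.wav", "b.wav", "c.wav"]))
    print(f"{len(results)} results in {time.perf_counter() - start:.2f}s")
    return results


if __name__ == "__main__":
    main()
```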


In [8]:
# Optional: Simple stress test
if api_ready:
    # Reuse the same files: 3 files × 4 = 12 concurrent requests
    stress_files = test_files * 4
    print(f"🔥 Running a simple stress test ({len(stress_files)} concurrent requests)...\n")
    
    start_time = time.time()
    stress_results = await send_parallel_requests(stress_files)
    stress_time = time.time() - start_time
    
    successful = sum(1 for r in stress_results if 'error' not in r)
    failed = len(stress_results) - successful
    
    print("📊 Stress Test Results:")
    print(f"   Total requests: {len(stress_results)}")
    print(f"   ✅ Successful: {successful}")
    print(f"   ❌ Failed: {failed}")
    print(f"   ⚡ Total time: {stress_time:.2f}s")
    print(f"   🚀 Requests/second: {len(stress_results)/stress_time:.1f}")
    
    if successful > 0:
        # Latency statistics over successful requests only
        response_times = [r['request_time'] for r in stress_results if 'error' not in r]
        print(f"   📈 Avg response time: {np.mean(response_times):.2f}s")
        print(f"   📊 Max response time: {max(response_times):.2f}s")

🔥 Running a simple stress test (12 concurrent requests)...

📊 Stress Test Results:
   Total requests: 12
   ✅ Successful: 12
   ❌ Failed: 0
   ⚡ Total time: 28.46s
   🚀 Requests/second: 0.4
   📈 Avg response time: 27.99s
   📊 Max response time: 28.45s
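Mean and max hide how latency is spread across requests. A sketch that adds median and 95th-percentile figures with the standard-library `statistics` module; `summarize_latencies` is a hypothetical helper. It uses `method="inclusive"` so that percentiles of a small sample stay within the observed range, and `statistics.quantiles` needs at least two data points.

```python
import statistics
from typing import Dict, List


def summarize_latencies(times: List[float], wall_time: float) -> Dict[str, float]:
    """Summarize per-request latencies (seconds) and overall throughput (requests/second)."""
    # quantiles(n=100) returns the 99 cut points p1..p99, so index 94 is p95
    pcts = statistics.quantiles(times, n=100, method="inclusive")
    return {
        "mean": statistics.mean(times),
        "p50": statistics.median(times),
        "p95": pcts[94],
        "max": max(times),
        "req_per_s": len(times) / wall_time,
    }
```

For the stress test above, it would be called as `summarize_latencies([r['request_time'] for r in stress_results if 'error' not in r], stress_time)`.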


## Summary

This notebook demonstrated:

1. **✅ Health Check**: Verified the API container is running
2. **📁 Test Data**: Generated synthetic audio files for testing
3. **🎯 Single Request**: Made a single POST request to the inference endpoint
4. **⚡ Parallel Requests**: Used `asyncio` and `aiohttp` to send multiple requests concurrently
5. **📊 Response Analysis**: Printed and analyzed the API responses
6. **🔥 Stress Testing**: Tested the API with multiple concurrent requests

**Key Benefits of Parallel Requests:**
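- Lower total wall-clock time than sending the same requests one after another, since the client does not wait for each response before sending the next
- Keeps the API server busy instead of leaving it idle between client round trips
- Shows how the containerized service behaves under load: in the runs above, per-request latency rose from 3.24 s (single request) to about 5.5 s (3 concurrent) and about 28 s (12 concurrent), which suggests the server queues requests rather than processing them all at once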
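A rough check on the wall-clock benefit, assuming a sequential run would take about one single-request latency per file (3.24 s, the only single-request timing measured above), so that $T_{\text{seq}} \approx N\,t_{\text{single}}$ and the speedup is $S_N = T_{\text{seq}} / T_{\text{par}}$:

$$
\begin{aligned}
S_3 &\approx \frac{3 \times 3.24\ \text{s}}{5.60\ \text{s}} = \frac{9.72}{5.60} \approx 1.74 \\
S_{12} &\approx \frac{12 \times 3.24\ \text{s}}{28.46\ \text{s}} = \frac{38.88}{28.46} \approx 1.37
\end{aligned}
$$

In throughput terms that is roughly $1/3.24 \approx 0.31$ requests/s sequentially versus $12/28.46 \approx 0.42$ requests/s in the stress test. The speedup shrinks as concurrency grows, so adding more simultaneous requests does not keep reducing total time in this setup; timing an actual sequential run would give a firmer baseline.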

**To run this demo:**
1. Start the containers: `docker compose up -d`
2. Wait for the model to load (~2-3 minutes)
3. Run this notebook

The API uses the **Audio Spectrogram Transformer (AST)** model fine-tuned on AudioSet, which classifies audio into 527 event classes covering music, speech, environmental sounds, and more.