# Batch Processing

This tutorial demonstrates how to use Kluster.ai's batch API to process multiple AI tasks efficiently. 

You'll learn how to **create, submit, monitor**, and **retrieve** results from batch jobs, all with straightforward Python code.

## 1. Setup and Prerequisites

First, let's set up our environment with the necessary libraries:

In [None]:
# Install required libraries
%pip install openai pandas tqdm jupyterlab ipywidgets

In [None]:
# Import libraries
import os
import json
import time
import pandas as pd
import numpy as np
from getpass import getpass
from openai import OpenAI
from tqdm.notebook import tqdm
from IPython.display import display, Markdown, clear_output

### 1.1 Configuration Parameters

Set the model and batch size parameters below to configure your batch job:

In [92]:
# User-configurable parameters

# Choose your model - options include:
# - google/gemma-3-27b-it (fast)
# - meta-llama/Llama-4-Maverick-17B-128E-Instruct-FP8 (good quality, slower)
# - meta-llama/Llama-4-Scout-17B-16E-Instruct (fastest)
# - klusterai/Qwen2.5-7B-Instruct-Turbo (fast)
# - deepseek-ai/DeepSeek-V3-0324 (fastest, good quality)
# - deepseek-ai/DeepSeek-R1 (good quality, slower)

PRIMARY_MODEL = "meta-llama/Llama-4-Maverick-17B-128E-Instruct-FP8"  # Good quality but slower; pick one of the "fast" options above for quicker test runs

# For certain tasks we might want a separate model (optional)
SECONDARY_MODEL = PRIMARY_MODEL  # Default: use same model for all tasks

# Number of quotes to process (15 covers the full sample list below; try 5 for a quick test)
NUM_QUOTES = 15

In [31]:
# Set up Kluster.ai client
api_key = getpass("Enter your kluster.ai API key: ")

# Initialize OpenAI client pointing to kluster.ai API
client = OpenAI(
    base_url="https://api.kluster.ai/v1",
    api_key=api_key,
)
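
For non-interactive runs (scripts, scheduled jobs), prompting with `getpass` is inconvenient. The following is a minimal sketch, assuming the key is stored in an environment variable named `KLUSTER_API_KEY`. That name is a hypothetical choice made here for illustration, not something the SDK reads on its own. The helper falls back to the interactive prompt only when the variable is unset.

```python
import os
from getpass import getpass


def resolve_api_key(env=None, prompt=getpass, var_name="KLUSTER_API_KEY"):
    """Return the API key from the environment, prompting only if it is missing.

    `var_name` is a hypothetical variable name used for illustration.
    """
    env = os.environ if env is None else env
    key = env.get(var_name, "").strip()
    if not key:
        # Fall back to an interactive prompt (hidden input with getpass)
        key = prompt("Enter your kluster.ai API key: ").strip()
    if not key:
        raise ValueError("No API key provided")
    return key
```

With this helper, the client setup above could become `OpenAI(base_url="https://api.kluster.ai/v1", api_key=resolve_api_key())`.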

## 2. Creating a Fun Batch Processing Job

Let's create a batch job that analyzes movie quotes across multiple dimensions:

In [82]:
# Sample movie quotes for analysis
movie_quotes = [
    "May the Force be with you.",
    "I'm going to make him an offer he can't refuse.",
    "You talking to me?",
    "E.T. phone home.",
    "Here's looking at you, kid.",
    "Go ahead, make my day.",
    "Show me the money!",
    "Houston, we have a problem.",
    "I'll be back.",
    "Life is like a box of chocolates, you never know what you're gonna get.",
    "There's no place like home.",
    "I see dead people.",
    "My precious...",
    "To infinity and beyond!",
    "You can't handle the truth!"
]

# Limit quotes to the configured number
movie_quotes = movie_quotes[:NUM_QUOTES]

# Create a DataFrame for better visualization
quotes_df = pd.DataFrame({"quote": movie_quotes})
display(quotes_df)

Unnamed: 0,quote
0,May the Force be with you.
1,I'm going to make him an offer he can't refuse.
2,You talking to me?
3,E.T. phone home.
4,"Here's looking at you, kid."
5,"Go ahead, make my day."
6,Show me the money!
7,"Houston, we have a problem."
8,I'll be back.
9,"Life is like a box of chocolates, you never know what you're gonna get."


Now, let's create different tasks for our batch job:

In [93]:
# Define multiple tasks for each quote
batch_requests = []

for idx, quote in enumerate(movie_quotes):
    # Task 1: Sentiment analysis
    sentiment_request = {
        "custom_id": f"sentiment-{idx}",
        "method": "POST",
        "url": "/v1/chat/completions",
        "body": {
            "model": PRIMARY_MODEL,
            "messages": [
                {"role": "system", "content": "You are an expert sentiment analyst. Respond with only a single word: POSITIVE, NEGATIVE, or NEUTRAL."},
                {"role": "user", "content": f"Analyze the sentiment of this quote: '{quote}'"}
            ],
            "max_completion_tokens": 100,
        }
    }
    batch_requests.append(sentiment_request)
    
    # Task 2: Identify movie
    movie_request = {
        "custom_id": f"movie-{idx}",
        "method": "POST",
        "url": "/v1/chat/completions",
        "body": {
            "model": PRIMARY_MODEL,
            "messages": [
                {"role": "system", "content": "You are a movie expert. Name only the movie this quote is from. Just the title, nothing else."},
                {"role": "user", "content": f"What movie is this quote from: '{quote}'"}
            ],
            "max_completion_tokens": 100,
        }
    }
    batch_requests.append(movie_request)
    
    # Task 3: Generate a follow-up line
    followup_request = {
        "custom_id": f"followup-{idx}",
        "method": "POST",
        "url": "/v1/chat/completions",
        "body": {
            "model": SECONDARY_MODEL,  # Using the secondary model (could be the same)
            "messages": [
                {"role": "system", "content": "You are a creative scriptwriter. Write a creative and witty follow-up line to this movie quote."},
                {"role": "user", "content": f"Write a creative follow-up line to this movie quote: '{quote}'"}
            ],
            "max_completion_tokens": 200,
        }
    }
    batch_requests.append(followup_request)

print(f"Created {len(batch_requests)} batch requests for processing ({len(movie_quotes)} quotes × 3 task types)")

Created 45 batch requests for processing (15 quotes × 3 task types)
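
The three request dictionaries above differ only in their `custom_id`, prompts, and token limit. One possible way to cut the repetition is a small task table plus a builder function. This is a sketch, not part of the original notebook: `build_request`, `TASKS`, and `build_all_requests` are hypothetical names, and for simplicity it uses a single model for every task rather than a separate `SECONDARY_MODEL`.

```python
def build_request(custom_id, model, system_prompt, user_prompt, max_tokens=100):
    """Build one batch request in the same shape as the dictionaries above."""
    return {
        "custom_id": custom_id,
        "method": "POST",
        "url": "/v1/chat/completions",
        "body": {
            "model": model,
            "messages": [
                {"role": "system", "content": system_prompt},
                {"role": "user", "content": user_prompt},
            ],
            "max_completion_tokens": max_tokens,
        },
    }


# Task name -> (system prompt, user prompt template, token limit)
TASKS = {
    "sentiment": (
        "You are an expert sentiment analyst. Respond with only a single word: POSITIVE, NEGATIVE, or NEUTRAL.",
        "Analyze the sentiment of this quote: '{quote}'",
        100,
    ),
    "movie": (
        "You are a movie expert. Name only the movie this quote is from. Just the title, nothing else.",
        "What movie is this quote from: '{quote}'",
        100,
    ),
    "followup": (
        "You are a creative scriptwriter. Write a creative and witty follow-up line to this movie quote.",
        "Write a creative follow-up line to this movie quote: '{quote}'",
        200,
    ),
}


def build_all_requests(quotes, model):
    """Create one request per (quote, task) pair, with IDs like 'sentiment-0'."""
    return [
        build_request(f"{task}-{idx}", model, system, template.format(quote=quote), limit)
        for idx, quote in enumerate(quotes)
        for task, (system, template, limit) in TASKS.items()
    ]
```

Adding a fourth task type then only requires a new entry in `TASKS`; the `custom_id` scheme stays compatible with the result parsing later in this tutorial.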


## 3. Submitting the Batch Job

Now let's save our requests to a JSONL file and submit the batch job:

In [94]:
# Save tasks to a JSONL file
file_name = "movie_quotes_analysis.jsonl"
with open(file_name, "w") as file:
    for request in batch_requests:
        file.write(json.dumps(request) + "\n")

# Upload batch job file (the context manager closes the file handle after the upload)
print("Uploading batch file...")
with open(file_name, "rb") as batch_file:
    batch_input_file = client.files.create(
        file=batch_file,
        purpose="batch"
    )
print(f"File uploaded with ID: {batch_input_file.id}")

# Submit batch job
print("Submitting batch job...")
batch_request = client.batches.create(
    input_file_id=batch_input_file.id,
    endpoint="/v1/chat/completions",
    completion_window="24h",
)
print(f"Batch job submitted with ID: {batch_request.id}")

Uploading batch file...
File uploaded with ID: 68221787adafd73172ea7369
Submitting batch job...
Batch job submitted with ID: 68221787f3098dc792a547f9
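
Because a malformed line or a repeated `custom_id` makes results hard to map back to their inputs, it can help to check the JSONL content before uploading it. The following is a minimal sketch of such a check. `validate_batch_lines` is a hypothetical helper, and the set of required keys is taken from the request dictionaries built earlier in this tutorial, not from an official schema.

```python
import json


def validate_batch_lines(lines):
    """Return a list of problems found in JSONL batch lines (empty if none)."""
    problems = []
    seen_ids = set()
    # Keys every request in this tutorial carries (assumed, not an official schema)
    required = {"custom_id", "method", "url", "body"}
    for number, line in enumerate(lines, start=1):
        if not line.strip():
            continue  # ignore blank lines
        try:
            request = json.loads(line)
        except json.JSONDecodeError as exc:
            problems.append(f"line {number}: invalid JSON ({exc.msg})")
            continue
        if not isinstance(request, dict):
            problems.append(f"line {number}: expected a JSON object")
            continue
        missing = required - request.keys()
        if missing:
            problems.append(f"line {number}: missing keys {sorted(missing)}")
        custom_id = request.get("custom_id")
        if custom_id is not None:
            if custom_id in seen_ids:
                problems.append(f"line {number}: duplicate custom_id {custom_id!r}")
            seen_ids.add(custom_id)
    return problems
```

A typical call would be `validate_batch_lines(open(file_name).read().splitlines())`, uploading only when the returned list is empty.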


### 3.1 Monitoring Batch Job Progress

Let's poll the job status and show a progress bar. The loop tolerates a few consecutive API errors and gives up after a maximum wait time:

In [95]:
# Poll the batch status with error handling and a progress display
# Note: this import replaces the notebook widget tqdm imported earlier with the plain-text bar
from tqdm import tqdm

print("Monitoring batch job progress...")
max_retries = 3  # Maximum number of consecutive API errors we'll tolerate
retry_count = 0
max_duration = 600  # Maximum seconds to wait (10 minutes)
start_time = time.time()

with tqdm(total=len(batch_requests)) as pbar:
    completed_tasks = 0
    
    while True:
        try:
            # Check if we've exceeded the maximum duration
            if time.time() - start_time > max_duration:
                print(f"⚠️ Exceeded maximum wait time of {max_duration} seconds. Check job status manually.")
                break
                
            batch_status = client.batches.retrieve(batch_request.id)
            retry_count = 0  # Reset retry counter on successful API call
            
            # Update progress bar
            new_completed = batch_status.request_counts.completed - completed_tasks
            if new_completed > 0:
                pbar.update(new_completed)
                completed_tasks = batch_status.request_counts.completed
            
            # Display current status with color indicators
            status_color = {
                "validating": "🟡",
                "in_progress": "🔵",
                "finalizing": "🟠",
                "completed": "🟢",
                "failed": "🔴",
                "cancelled": "⚫"
            }.get(batch_status.status.lower(), "⚪")
            
            # Add failed count to the description if any
            status_desc = f"{status_color} Status: {batch_status.status}"
            if batch_status.request_counts.failed > 0:
                status_desc += f" (Failed: {batch_status.request_counts.failed})"
                
            pbar.set_description(status_desc)
            
            # Check if the job has reached a terminal state
            # ("expired" is assumed from the OpenAI Batch API status list)
            if batch_status.status.lower() in ["completed", "failed", "cancelled", "expired"]:
                if batch_status.status.lower() == "failed":
                    print("\n⚠️ Batch job failed. Please check the job status manually.")
                break
                
            # Wait before checking again - don't poll too frequently
            time.sleep(5)
            
        except Exception as e:
            retry_count += 1
            if retry_count >= max_retries:
                print(f"\n❌ Error monitoring batch job after {max_retries} retries: {str(e)}")
                print("Please check the status manually using the batch job ID.")
                break
            time.sleep(2)  # Brief pause before retry

Monitoring batch job progress...


🟢 Status: completed: 100%|██████████| 45/45 [00:05<00:00,  8.32it/s]  
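
The loop above polls every 5 seconds. For longer jobs, a growing delay reduces the number of API calls. Here is a minimal sketch of polling with exponential backoff, written so that the status lookup is passed in as a function. `wait_for_batch` and its parameters are hypothetical names, and the set of terminal statuses is assumed to mirror the OpenAI Batch API status names.

```python
import time

# Assumed terminal statuses, mirroring the OpenAI Batch API status names
TERMINAL_STATUSES = {"completed", "failed", "cancelled", "expired"}


def wait_for_batch(fetch_status, timeout=600.0, initial_delay=2.0, max_delay=30.0,
                   sleep=time.sleep, clock=time.monotonic):
    """Poll fetch_status() until it returns a terminal status.

    The delay between polls doubles after each attempt, capped at max_delay.
    Raises TimeoutError if the next wait would pass the deadline.
    """
    deadline = clock() + timeout
    delay = initial_delay
    while True:
        status = fetch_status()
        if status in TERMINAL_STATUSES:
            return status
        if clock() + delay > deadline:
            raise TimeoutError(f"Batch still {status!r} after {timeout} seconds")
        sleep(delay)
        delay = min(delay * 2, max_delay)
```

With the client from this tutorial, a call could look like `wait_for_batch(lambda: client.batches.retrieve(batch_request.id).status)`. Injecting `sleep` and `clock` keeps the function easy to test without real waiting.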


## 4. Retrieving and Processing Results

Once our batch job completes, let's retrieve and process the results:

In [96]:
# Retrieve and process batch results
try:
    # Get the latest job status
    batch_status = client.batches.retrieve(batch_request.id)
    
    if batch_status.status.lower() == "completed":
        # Retrieve and parse results
        results = []
        result_content = client.files.content(batch_status.output_file_id).content
        for line in result_content.decode().strip().split("\n"):
            results.append(json.loads(line))
        
        print(f"Retrieved {len(results)} results successfully")
        
        # Organize results by quote index and task type
        organized_data = {idx: {"quote": quote, "sentiment": "", "movie": "", "followup": ""} 
                         for idx, quote in enumerate(movie_quotes)}
        
        # Extract content from each result and store in the organized structure
        for result in results:
            custom_id = result.get("custom_id", "")
            if "-" in custom_id:
                task_type, idx_str = custom_id.split("-", 1)
                idx = int(idx_str)
                
                # Extract content from the nested response structure
                body = result.get("response", {}).get("body", {})
                choices = body.get("choices", [])
                if choices and len(choices) > 0:
                    content = choices[0].get("message", {}).get("content", "")
                    if idx < len(movie_quotes):
                        organized_data[idx][task_type] = content
        
        # Create a DataFrame for display
        analysis_results = []
        for idx, data in organized_data.items():
            analysis_results.append({
                "Quote": data["quote"],
                "Movie": data["movie"],
                "Sentiment": data["sentiment"],
                "Follow-up Line": data["followup"]
            })
        
        # Display results as a nice DataFrame
        pd.set_option('display.max_colwidth', None)  # Show full cell contents
        results_df = pd.DataFrame(analysis_results)
        display(results_df)
        
        # Show sentiment distribution
        sentiment_counts = results_df["Sentiment"].value_counts()
        print("\nSentiment Distribution:")
        for sentiment, count in sentiment_counts.items():
            print(f"{sentiment}: {count}")
    elif batch_status.status.lower() == "failed":
        print(f"❌ Batch job failed. Check your API key, quota, and model availability.")
    else:
        print(f"⚠️ Batch job has status: {batch_status.status}")
except Exception as e:
    print(f"❌ Error retrieving batch results: {str(e)}")

Retrieved 45 results successfully


Unnamed: 0,Quote,Movie,Sentiment,Follow-up Line
0,May the Force be with you.,Star Wars: Episode IV - A New Hope,POSITIVE,'And may your wifi be strong enough to stream the Empire's demise.'
1,I'm going to make him an offer he can't refuse.,The Godfather,NEGATIVE,"'And if that doesn't work, I'll send Vito over to discuss the fine print... over a nice plate of pasta.'"
2,You talking to me?,Taxi Driver,NEUTRAL,"'Only if you're talking back, because I'm buying.'"
3,E.T. phone home.,E.T. the Extra-Terrestrial,NEUTRAL,"'E.T. pay his Verizon bill, he's getting some serious overage charges!'"
4,"Here's looking at you, kid.",Casablanca,POSITIVE,"""And here's hoping you're still looking back, because I'm about to make a scene that'll be etched in your memory forever."""
5,"Go ahead, make my day.",Sudden Impact,NEGATIVE,"""But first, make it a cappuccino day, I've got a caffeine craving that's more deadly than your aim."""
6,Show me the money!,Jerry Maguire,POSITIVE,"""I'm not just looking for a payday, I'm looking for a lifetime supply of Benjamins – and a 401k to match!"""
7,"Houston, we have a problem.",Apollo 13,NEGATIVE,"""And by 'problem,' I mean a guy in a tin can who's running out of oxygen and hasn't showered in weeks – literally, a space-astrophe!"""
8,I'll be back.,The Terminator,NEGATIVE,"""And when I am, you'll be begging for a refund on that 'Hasta La Vista, Baby' t-shirt."""
9,"Life is like a box of chocolates, you never know what you're gonna get.",Forrest Gump,POSITIVE,"'But at least with a box of chocolates, you can lick your way out of a bad decision.'"



Sentiment Distribution:
POSITIVE: 7
NEGATIVE: 6
NEUTRAL: 2
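
The parsing code above silently leaves a field empty when an individual request fails. A variation that reports failures separately might look like the sketch below. It assumes each output line follows the OpenAI batch output layout, with an optional `error` field and a `response.status_code`. That layout is an assumption here and should be checked against actual output. `split_results` is a hypothetical helper name.

```python
import json


def split_results(jsonl_text):
    """Split batch output into ({custom_id: content}, {custom_id: failure reason})."""
    contents, failures = {}, {}
    for line in jsonl_text.splitlines():
        if not line.strip():
            continue  # ignore blank lines
        record = json.loads(line)
        custom_id = record.get("custom_id", "")
        response = record.get("response") or {}
        choices = (response.get("body") or {}).get("choices") or []
        if record.get("error"):
            # Assumed: a non-empty "error" field marks a failed request
            failures[custom_id] = str(record["error"])
        elif response.get("status_code", 200) != 200:
            failures[custom_id] = f"HTTP {response['status_code']}"
        elif not choices:
            failures[custom_id] = "no choices in response body"
        else:
            contents[custom_id] = (choices[0].get("message") or {}).get("content") or ""
    return contents, failures
```

Calling `split_results(result_content.decode())` would return the successful contents keyed by `custom_id`, plus a dictionary of failures that can be printed or resubmitted.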


## 5. Advanced: Listing and Managing Batch Jobs

Let's add functionality to list and manage our batch jobs:

In [97]:
# List all batch jobs
def list_batch_jobs():
    try:
        response = client.batches.list(limit=10)
        
        # Display as a DataFrame
        jobs_data = []
        for job in response.data:
            jobs_data.append({
                "ID": job.id,
                "Status": job.status,
                "Created": pd.to_datetime(job.created_at, unit='s'),
                "Completed": pd.to_datetime(job.completed_at, unit='s') if job.completed_at else "N/A",
                "Total Requests": job.request_counts.total,
                "Completed Requests": job.request_counts.completed
            })
            
        return pd.DataFrame(jobs_data)
    except Exception as e:
        print(f"Error listing batch jobs: {e}")
        return pd.DataFrame()

# Display recent batch jobs
recent_jobs = list_batch_jobs()
display(Markdown("## Recent Batch Jobs"))
display(recent_jobs)

## Recent Batch Jobs

Unnamed: 0,ID,Status,Created,Completed,Total Requests,Completed Requests
0,68221787f3098dc792a547f9,completed,2025-05-12 15:45:11,2025-05-12 15:45:14,45,45
1,68221745e5f4b4fb13176a22,completed,2025-05-12 15:44:05,2025-05-12 15:44:14,45,45
2,68221701e5f4b4fb13176714,completed,2025-05-12 15:42:57,2025-05-12 15:43:06,45,45
3,682216156c35fc54cc07705d,completed,2025-05-12 15:39:01,2025-05-12 15:39:04,45,45
4,682215d5cd770dcad800627c,completed,2025-05-12 15:37:57,2025-05-12 15:38:00,45,45
5,68221598e5f4b4fb131755dd,completed,2025-05-12 15:36:56,2025-05-12 15:36:59,45,45
6,68221550f3098dc792a52978,completed,2025-05-12 15:35:44,2025-05-12 15:35:47,45,45
7,6822145badafd73172ea4e4f,completed,2025-05-12 15:31:39,2025-05-12 15:31:42,45,45
8,68221427e5f4b4fb13174539,completed,2025-05-12 15:30:47,2025-05-12 15:30:53,45,45
9,682212cf4e1240bf3c8c59b0,completed,2025-05-12 15:25:03,2025-05-12 15:25:07,45,45


In [None]:
# Function to cancel a batch job
def cancel_batch_job(batch_id):
    try:
        response = client.batches.cancel(batch_id)
        print(f"Job {batch_id} cancellation requested. New status: {response.status}")
        return response
    except Exception as e:
        print(f"Error cancelling batch job: {e}")
        return None
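
To combine the two functions above, one option is to pick out the jobs that are still running and cancel only those. The sketch below works on plain dictionaries with the `ID` and `Status` keys used by `list_batch_jobs`. `select_active_job_ids` is a hypothetical helper, and the set of active statuses is an assumption based on the statuses shown in the monitoring loop.

```python
# Statuses treated as "still running" (assumed from the monitoring loop above)
ACTIVE_STATUSES = {"validating", "in_progress", "finalizing"}


def select_active_job_ids(jobs):
    """Return the IDs of jobs whose status indicates they are still running."""
    return [
        job["ID"]
        for job in jobs
        if str(job["Status"]).lower() in ACTIVE_STATUSES
    ]
```

For example, `select_active_job_ids(list_batch_jobs().to_dict("records"))` yields the IDs that could then be passed to `cancel_batch_job` one by one.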

## Practical Use Cases

The batch processing capabilities you've just learned can be applied to many real-world scenarios:

1. **Content Moderation**: Process thousands of user comments/posts to detect inappropriate content
2. **Customer Support**: Analyze large volumes of support tickets for sentiment, urgency, and categorization
3. **Market Research**: Process survey responses or product reviews at scale
4. **Document Processing**: Extract key information from legal documents, contracts, or reports
5. **Educational Assessment**: Grade and provide feedback on student essays or assignments
6. **Media Analysis**: Analyze news articles, social media posts, or transcripts for trends and insights
7. **Data Enrichment**: Add AI-generated metadata to your existing datasets

With Kluster.ai's batch API, these tasks can be performed efficiently at scale without building complex infrastructure.
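
For workloads much larger than the 45 requests used here, it may be practical to split the requests across several JSONL files and submit each as its own batch job. The following is a sketch under stated assumptions: `write_batch_chunks` is a hypothetical helper, and the default of 1000 requests per file is an arbitrary placeholder, not a documented limit.

```python
import json
from pathlib import Path


def write_batch_chunks(requests, directory, max_per_file=1000, prefix="batch"):
    """Write requests to numbered JSONL files with at most max_per_file lines each.

    Returns the list of file paths in the order they were written.
    """
    if max_per_file < 1:
        raise ValueError("max_per_file must be at least 1")
    directory = Path(directory)
    directory.mkdir(parents=True, exist_ok=True)
    paths = []
    for start in range(0, len(requests), max_per_file):
        path = directory / f"{prefix}_{start // max_per_file:04d}.jsonl"
        with open(path, "w", encoding="utf-8") as handle:
            for request in requests[start:start + max_per_file]:
                handle.write(json.dumps(request) + "\n")
        paths.append(path)
    return paths
```

Each returned path can then be uploaded and submitted with the same `client.files.create` and `client.batches.create` calls shown in section 3.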

## Conclusion

### Benefits of Using Kluster.ai's Batch API

- **Efficient**: Process thousands of tasks in parallel
- **Cost-effective**: Optimize for throughput rather than latency
- **Flexible**: Use different models and prompts within the same batch
- **Scalable**: Handle datasets of any size
- **Easy to use**: Simple integration with the OpenAI SDK

Now you can apply these batch processing techniques to your own use cases!