# Gemini Batch API Test Notebook

This notebook demonstrates how to use the Gemini Batch API for processing large volumes of rug analysis requests asynchronously at 50% reduced cost. The Batch API is ideal for non-urgent, large-scale tasks with a target turnaround time of 24 hours.

## Features Covered:
- File-based batch processing with JSONL uploads
- Inline batch requests for smaller datasets  
- Job status monitoring and result retrieval
- Error handling and job management
- Integration with our Next.js rug processing application

## 1. Setup and Import Libraries

Import all necessary libraries for working with the Gemini Batch API, including authentication, file handling, and JSON processing.

In [None]:
# Install required packages if not already installed
# (`from google import genai` is provided by the google-genai package)
# !pip install google-genai python-dotenv requests

import json
import time
import os
from pathlib import Path
from typing import Dict, List, Any, Optional
from dotenv import load_dotenv

# Load environment variables
load_dotenv()

# Import Gemini SDK
try:
    from google import genai
    from google.genai import types
    print("✅ Google GenAI SDK imported successfully")
except ImportError as e:
    print("❌ Error importing Google GenAI SDK:", e)
    print("💡 Install with: pip install google-genai")

# Standard libraries for API calls and file handling
import requests
from urllib.parse import urljoin

print("📦 All libraries imported successfully")

## 2. Configure Gemini API Client

Set up the Gemini API client with authentication and initialize for batch operations.

In [None]:
# Configure API credentials
GEMINI_API_KEY = os.getenv('GEMINI_API_KEY') or os.getenv('GOOGLE_GENERATIVE_AI_API_KEY')

if not GEMINI_API_KEY:
    print("❌ No API key found!")
    print("💡 Set GEMINI_API_KEY or GOOGLE_GENERATIVE_AI_API_KEY environment variable")
    GEMINI_API_KEY = input("Enter your Gemini API key: ")

# Initialize Gemini client
try:
    client = genai.Client(api_key=GEMINI_API_KEY)
    print("✅ Gemini client initialized successfully")
    
    # Test API connection with a simple request
    print("🔍 Testing API connection...")
    # Note: You would add a simple API test here if needed
    
except Exception as e:
    print(f"❌ Error initializing client: {e}")

# Configuration for our rug processing batch jobs
BATCH_CONFIG = {
    'model': 'gemini-2.5-flash',
    'base_url': 'https://generativelanguage.googleapis.com/v1beta',
    'default_display_name': 'Rug Analysis Batch Job',
    'max_file_size': 2 * 1024 * 1024 * 1024,  # 2GB limit
    'poll_interval': 30  # seconds
}

print(f"📋 Batch configuration loaded:")
print(f"   Model: {BATCH_CONFIG['model']}")
print(f"   Max file size: {BATCH_CONFIG['max_file_size'] / (1024**3):.1f}GB")

## 3. Create JSONL File for Batch Requests

Generate a JSON Lines file containing multiple rug analysis requests. Each line contains a complete `GenerateContentRequest` object with a unique key for result mapping.
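Because one malformed line can affect a whole file-based job, it can help to check the file locally before uploading. The following is a minimal sketch, not part of the Gemini SDK: `validate_jsonl_lines` is a hypothetical helper, and the checks (a unique string `key` and a `request` object containing `contents`) are assumptions drawn from the request shape used in this notebook.

```python
import json

def validate_jsonl_lines(lines):
    """Return a list of (line_number, problem) tuples; an empty list means every line passed."""
    problems = []
    seen_keys = set()
    for number, line in enumerate(lines, start=1):
        if not line.strip():
            continue  # blank lines are ignored
        try:
            record = json.loads(line)
        except json.JSONDecodeError as exc:
            problems.append((number, f"invalid JSON: {exc.msg}"))
            continue
        if not isinstance(record, dict):
            problems.append((number, "line is not a JSON object"))
            continue
        # Each line needs a unique key so results can be mapped back
        key = record.get("key")
        if not isinstance(key, str) or not key:
            problems.append((number, "missing or empty 'key'"))
        elif key in seen_keys:
            problems.append((number, f"duplicate key '{key}'"))
        else:
            seen_keys.add(key)
        # Each line also needs a request body with 'contents'
        request = record.get("request")
        if not isinstance(request, dict) or "contents" not in request:
            problems.append((number, "missing 'request.contents'"))
    return problems
```

With the file written in the cell below, it could be called as `validate_jsonl_lines(open("rug-batch-requests.jsonl", encoding="utf-8"))`.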

In [None]:
# Sample rug data for testing (simulating CSV data from our Next.js app)
sample_rugs = [
    {
        "sku": "26171",
        "title": "3'2\"x13'10\" Antique Persian Northwest Boteh Design Runner Handmade Rug 26171",
        "size": "3'2\" x 13'10\"",
        "material": "Wool",
        "origin": "IRAN (Islamic Republic of Iran)",
        "style": "Traditional",
        "prompt": "Photo-realistic hallway in Traditional Decor, featuring an indoor rug, with natural daylight. Place a runner area rug (3'2\" x 13'10\") centered under a coffee table. Rug collection: Persian; secondary: Persian,Antique; style: Traditional; origin: IRAN (Islamic Republic of Iran). Pile: Wool; foundation: Wool; material: Wool; weave type: Hand-Knotted; dominant colors: Blue, Ivory, Navy Blue, Blue, Pink, Gold, Forest Green, Purple, Black, Denim Blue. Preserve the rug's real physical proportions exactly as shown in the product image. Maintain the correct length-to-width ratio with no distortion, stretching, compression, or reshaping. Render the rug in the scene at a realistic scale relative to the room and surrounding objects. Hardwood floor, soft shadows, realistic perspective from eye level (~1.2m), 35mm lens, high detail."
    },
    {
        "sku": "28392",
        "title": "8'x10' Modern Contemporary Abstract Area Rug",
        "size": "8' x 10'",
        "material": "Polypropylene",
        "origin": "Turkey",
        "style": "Modern",
        "prompt": "Photo-realistic living room in Modern Decor, featuring an indoor rug, with natural daylight. Place a rectangle area rug (8' x 10') centered under a coffee table. Rug collection: Modern & Contemporary; secondary: Abstract; style: Modern; origin: Turkey. Pile: Low pile; foundation: Jute; material: Polypropylene; weave type: Machine-made; dominant colors: Grey, White, Black, Silver. Preserve the rug's real physical proportions exactly as shown in the product image. Maintain the correct length-to-width ratio with no distortion, stretching, compression, or reshaping. Render the rug in the scene at a realistic scale relative to the room and surrounding objects. Hardwood floor, soft shadows, realistic perspective from eye level (~1.2m), 35mm lens, high detail."
    },
    {
        "sku": "31045",
        "title": "5'x8' Vintage Distressed Oriental Area Rug",
        "size": "5' x 8'",
        "material": "Wool",
        "origin": "India",
        "style": "Transitional",
        "prompt": "Photo-realistic living room, parlor or library in Transitional Decor, featuring an indoor rug, with natural daylight. Place a rectangle area rug (5' x 8') centered under a coffee table. Rug collection: Vintage; secondary: Distressed; style: Transitional; origin: India. Pile: Medium; foundation: Cotton; material: Wool; weave type: Hand-tufted; dominant colors: Beige, Brown, Rust, Cream. Preserve the rug's real physical proportions exactly as shown in the product image. Maintain the correct length-to-width ratio with no distortion, stretching, compression, or reshaping. Render the rug in the scene at a realistic scale relative to the room and surrounding objects. Hardwood floor, soft shadows, realistic perspective from eye level (~1.2m), 35mm lens, high detail."
    }
]

def create_batch_requests(rugs_data: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """
    Create batch requests in the format expected by Gemini Batch API
    """
    batch_requests = []
    
    for rug in rugs_data:
        request = {
            "key": f"rug-{rug['sku']}",
            "request": {
                "contents": [{
                    "parts": [{
                        "text": f"""Analyze this rug and generate a detailed product description based on the following prompt: {rug['prompt']}

Please provide:
1. A detailed visual description of the rug
2. Suggested room placement and styling tips  
3. Key features and benefits
4. Care instructions

Product Details:
- SKU: {rug['sku']}
- Title: {rug['title']}
- Size: {rug['size']}
- Material: {rug['material']}
- Origin: {rug['origin']}
- Style: {rug['style']}"""
                    }]
                }],
                "generation_config": {
                    "temperature": 0.7,
                    "max_output_tokens": 1000
                }
            }
        }
        batch_requests.append(request)
    
    return batch_requests

# Generate batch requests
print("📝 Creating batch requests...")
batch_requests = create_batch_requests(sample_rugs)
print(f"✅ Created {len(batch_requests)} batch requests")

# Create JSONL file
jsonl_filename = "rug-batch-requests.jsonl"
jsonl_path = Path(jsonl_filename)

print(f"💾 Writing JSONL file: {jsonl_filename}")
with open(jsonl_path, 'w', encoding='utf-8') as f:
    for request in batch_requests:
        f.write(json.dumps(request) + '\n')

file_size = jsonl_path.stat().st_size
print(f"✅ JSONL file created successfully")
print(f"   File size: {file_size:,} bytes ({file_size / 1024:.1f} KB)")
print(f"   Requests: {len(batch_requests)}")

# Display first request as example (truncated to 500 characters)
print(f"\n📋 Sample request (first item):")
sample_json = json.dumps(batch_requests[0], indent=2)
print(sample_json[:500] + "..." if len(sample_json) > 500 else sample_json)

## 4. Upload File Using File API

Upload the JSONL file to Google's File API to prepare for batch job creation. Files are automatically deleted after 2 days.
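Since an upload that exceeds the size limit only fails after the transfer has started, a local size check is a cheap guard. This is a minimal sketch under stated assumptions: `check_upload_size` and `MAX_BATCH_FILE_BYTES` are hypothetical names, and the 2 GB figure simply mirrors the `max_file_size` value in `BATCH_CONFIG`.

```python
from pathlib import Path

# Assumed limit, mirroring BATCH_CONFIG['max_file_size'] in this notebook
MAX_BATCH_FILE_BYTES = 2 * 1024**3

def check_upload_size(path, max_bytes=MAX_BATCH_FILE_BYTES):
    """Return the file size in bytes, or raise ValueError if the file is empty or too large."""
    size = Path(path).stat().st_size
    if size == 0:
        raise ValueError(f"{path} is empty")
    if size > max_bytes:
        raise ValueError(f"{path} is {size:,} bytes; the limit is {max_bytes:,}")
    return size
```

Calling `check_upload_size(jsonl_path)` just before the upload keeps the failure local and immediate.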

In [None]:
# Upload JSONL file using Gemini File API
def upload_file_to_gemini(file_path: Path, display_name: str) -> Optional[Dict[str, Any]]:
    """
    Upload a file to Gemini File API
    """
    try:
        print(f"📤 Uploading file: {file_path}")
        
        # Use the Gemini client to upload
        uploaded_file = client.files.upload(
            file=str(file_path),
            config=types.UploadFileConfig(
                display_name=display_name,
                mime_type='jsonl'  # assumption: short form as in the official batch example
            )
        )
        
        print(f"✅ File uploaded successfully!")
        print(f"   File name: {uploaded_file.name}")
        print(f"   Display name: {uploaded_file.display_name}")
        print(f"   Size: {uploaded_file.size_bytes:,} bytes")
        print(f"   State: {uploaded_file.state}")
        print(f"   Expires: {uploaded_file.expiration_time}")
        
        return {
            'name': uploaded_file.name,
            'display_name': uploaded_file.display_name,
            'size_bytes': uploaded_file.size_bytes,
            'state': uploaded_file.state,
            'uri': uploaded_file.uri,
            'expiration_time': uploaded_file.expiration_time
        }
        
    except Exception as e:
        print(f"❌ Error uploading file: {e}")
        return None

# Upload the JSONL file
display_name = f"Rug Batch Requests {time.strftime('%Y-%m-%d %H:%M')}"
uploaded_file_info = upload_file_to_gemini(jsonl_path, display_name)

if uploaded_file_info:
    print(f"\n🎉 File upload successful!")
    print(f"📁 File reference: {uploaded_file_info['name']}")
else:
    print("❌ File upload failed. Cannot proceed with batch job creation.")

## 5. Submit Batch Job

Create a batch job from the uploaded file. This starts asynchronous processing at a 50% cost reduction.

In [None]:
# Create batch job using the uploaded file
def create_batch_job(file_name: str, display_name: str) -> Optional[Dict[str, Any]]:
    """
    Create a batch job using an uploaded file
    """
    try:
        print(f"🚀 Creating batch job...")
        
        batch_job = client.batches.create(
            model=BATCH_CONFIG['model'],
            src=file_name,  # Use the file name from upload
            config={
                'display_name': display_name
            }
        )
        
        print(f"✅ Batch job created successfully!")
        print(f"   Job name: {batch_job.name}")
        print(f"   Display name: {batch_job.display_name}")
        print(f"   State: {batch_job.state}")
        print(f"   Model: {batch_job.model}")
        print(f"   Created: {batch_job.create_time}")
        
        return {
            'name': batch_job.name,
            'display_name': batch_job.display_name,
            'state': batch_job.state,
            'model': batch_job.model,
            'create_time': batch_job.create_time,
            'full_job': batch_job
        }
        
    except Exception as e:
        print(f"❌ Error creating batch job: {e}")
        return None

# Only proceed if file upload was successful
batch_job_info = None
if uploaded_file_info:
    job_display_name = f"Rug Analysis Batch - {time.strftime('%Y-%m-%d %H:%M')}"
    batch_job_info = create_batch_job(uploaded_file_info['name'], job_display_name)
    
    if batch_job_info:
        print(f"\n🎯 Batch job ready for monitoring!")
        print(f"📋 Job ID: {batch_job_info['name']}")
        
        # Store job name for monitoring
        current_job_name = batch_job_info['name']
    else:
        print("❌ Failed to create batch job")
else:
    print("⚠️  Skipping batch job creation - no uploaded file")

## 6. Monitor Job Status

Implement polling logic to monitor batch job progress through its states: PENDING → RUNNING → SUCCEEDED/FAILED/CANCELLED/EXPIRED.
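The core of the polling logic can be isolated from the SDK so that it is easy to test. The sketch below is illustrative only: `wait_for_terminal_state` is a hypothetical helper that takes any zero-argument callable returning the current state string, and the injectable `sleep` parameter exists purely so the loop can be exercised without waiting.

```python
import time

# Terminal states, using the JOB_STATE_* names checked elsewhere in this notebook
TERMINAL_STATES = {
    "JOB_STATE_SUCCEEDED",
    "JOB_STATE_FAILED",
    "JOB_STATE_CANCELLED",
    "JOB_STATE_EXPIRED",
}

def wait_for_terminal_state(get_state, max_polls=120, poll_interval=30.0, sleep=time.sleep):
    """Poll get_state() until it returns a terminal state; return None if polls run out."""
    for attempt in range(1, max_polls + 1):
        state = get_state()
        if state in TERMINAL_STATES:
            return state
        # Do not sleep after the final attempt
        if attempt < max_polls:
            sleep(poll_interval)
    return None
```

Assuming the job object exposes its state as an enum with a `name` attribute, a call might look like `wait_for_terminal_state(lambda: client.batches.get(name=job_name).state.name)`.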

In [None]:
# Job status monitoring
def monitor_batch_job(job_name: str, max_polls: int = 120, poll_interval: int = 30) -> Optional[Dict[str, Any]]:
    """
    Monitor batch job status until completion or timeout
    
    Args:
        job_name: The batch job name to monitor
        max_polls: Maximum number of polling attempts (default: 120 = 1 hour)
        poll_interval: Seconds between polls (default: 30)
    """
    completed_states = {
        'JOB_STATE_SUCCEEDED',
        'JOB_STATE_FAILED', 
        'JOB_STATE_CANCELLED',
        'JOB_STATE_EXPIRED'
    }
    
    print(f"👀 Monitoring batch job: {job_name}")
    print(f"⏱️  Polling every {poll_interval} seconds (max {max_polls} attempts)")
    
    for attempt in range(1, max_polls + 1):
        try:
            # Get current job status
            batch_job = client.batches.get(name=job_name)
            
            # Display current status
            print(f"\n📊 Poll #{attempt} - Status: {batch_job.state}")
            
            if hasattr(batch_job, 'request_count'):
                print(f"   📝 Total requests: {batch_job.request_count}")
                
            if hasattr(batch_job, 'batch_stats') and batch_job.batch_stats:
                completed = getattr(batch_job.batch_stats, 'completed_request_count', 0)
                failed = getattr(batch_job.batch_stats, 'failed_request_count', 0)
                print(f"   ✅ Completed: {completed}")
                print(f"   ❌ Failed: {failed}")
                
            # Check if job is complete
            if batch_job.state in completed_states:
                print(f"\n🏁 Job finished with state: {batch_job.state}")
                
                if batch_job.state == 'JOB_STATE_SUCCEEDED':
                    print("🎉 Job completed successfully!")
                elif batch_job.state == 'JOB_STATE_FAILED':
                    print(f"💥 Job failed: {getattr(batch_job, 'error', 'Unknown error')}")
                elif batch_job.state == 'JOB_STATE_CANCELLED':
                    print("🛑 Job was cancelled")
                elif batch_job.state == 'JOB_STATE_EXPIRED':
                    print("⏰ Job expired (ran longer than 48 hours)")
                
                return {
                    'job': batch_job,
                    'final_state': batch_job.state,
                    'completed': batch_job.state == 'JOB_STATE_SUCCEEDED'
                }
                
            # Wait before next poll
            if attempt < max_polls:
                print(f"⏳ Waiting {poll_interval} seconds for next poll...")
                time.sleep(poll_interval)
                
        except Exception as e:
            print(f"❌ Error polling job status: {e}")
            return None
    
    print(f"⏰ Timeout reached after {max_polls} attempts")
    return None

# Monitor the job if we created one
if 'current_job_name' in locals() and current_job_name:
    print(f"🔄 Starting job monitoring for: {current_job_name}")
    
    # For demo purposes, let's do just a few quick polls to show the concept
    # In practice, you'd want to poll until completion
    demo_polls = 3
    poll_interval = 10  # Shorter interval for demo
    
    print(f"📋 Demo monitoring (first {demo_polls} polls, {poll_interval}s intervals)")
    
    for i in range(1, demo_polls + 1):
        try:
            batch_job = client.batches.get(name=current_job_name)
            print(f"\n📊 Poll #{i}:")
            print(f"   State: {batch_job.state}")
            print(f"   Model: {batch_job.model}")
            print(f"   Created: {batch_job.create_time}")
            
            if i < demo_polls:
                print(f"   ⏳ Next poll in {poll_interval}s...")
                time.sleep(poll_interval)
                
        except Exception as e:
            print(f"❌ Error in demo poll #{i}: {e}")
            break
    
    print(f"\n💡 For full monitoring until completion, use:")
    print(f"   final_result = monitor_batch_job('{current_job_name}')")
    
else:
    print("⚠️  No active job to monitor")

## 7. Retrieve and Process Results

Download and process batch job results once the job completes successfully. For file-based jobs, results are returned as a JSONL file.
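Each line of the results file pairs a request `key` with either a response or an error. The sketch below shows one way to pull the generated text out of a single line. It assumes the response follows the usual `candidates[0].content.parts[*].text` layout of a generate-content response; `extract_text` is a hypothetical helper, not an SDK function.

```python
import json

def extract_text(result_line):
    """Return (key, text, error) for one JSONL result line."""
    record = json.loads(result_line)
    key = record.get("key")
    response = record.get("response")
    if not response:
        # No response present: surface the error payload if there is one
        return key, None, record.get("error", "no response")
    candidates = response.get("candidates") or []
    parts = candidates[0].get("content", {}).get("parts", []) if candidates else []
    # Concatenate all text parts of the first candidate
    text = "".join(part.get("text", "") for part in parts)
    return key, (text or None), None
```

Because every tuple starts with the key, results can be joined back to the original rugs, for example via the `rug-<sku>` keys created earlier.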

In [None]:
# Results retrieval and processing
def retrieve_batch_results(job_name: str) -> Optional[List[Dict[str, Any]]]:
    """
    Retrieve and parse results from a completed batch job
    """
    try:
        # Get the completed job
        batch_job = client.batches.get(name=job_name)
        
        if batch_job.state != 'JOB_STATE_SUCCEEDED':
            print(f"❌ Job not successful. Current state: {batch_job.state}")
            return None
            
        print(f"✅ Job completed successfully!")
        print(f"📊 Job details:")
        print(f"   Display name: {batch_job.display_name}")
        print(f"   Model: {batch_job.model}")
        print(f"   State: {batch_job.state}")
        
        results = []
        
        # Check for file-based results (the attribute can exist but be None, so test its value)
        if batch_job.dest and getattr(batch_job.dest, 'file_name', None):
            result_file_name = batch_job.dest.file_name
            print(f"📁 Results available in file: {result_file_name}")
            
            # Download the results file
            print(f"⬇️  Downloading results file...")
            file_content = client.files.download(file=result_file_name)
            
            # Parse JSONL content: split on real newlines, one JSON object per line
            content_str = file_content.decode('utf-8') if isinstance(file_content, bytes) else str(file_content)
            lines = content_str.strip().split('\n')
            
            print(f"📋 Processing {len(lines)} result lines...")
            
            for i, line in enumerate(lines):
                if line.strip():
                    try:
                        result = json.loads(line)
                        results.append({
                            'line_number': i + 1,
                            'key': result.get('key', f'unknown-{i+1}'),
                            'success': 'response' in result,
                            'error': result.get('error'),
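                            # Assumption: raw JSONL responses carry text under
                            # candidates[0].content.parts, not a top-level 'text'
                            'response_text': ''.join(
                                part.get('text', '')
                                for candidate in (result.get('response') or {}).get('candidates', [])[:1]
                                for part in candidate.get('content', {}).get('parts', [])
                            ) or None,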
                            'raw_result': result
                        })
                    except json.JSONDecodeError as e:
                        print(f"⚠️  Error parsing line {i+1}: {e}")
                        results.append({
                            'line_number': i + 1,
                            'key': f'parse-error-{i+1}',
                            'success': False,
                            'error': f'JSON parse error: {e}',
                            'response_text': None,
                            'raw_result': line
                        })
        
        # Check for inline results  
        elif batch_job.dest and getattr(batch_job.dest, 'inlined_responses', None):
            print(f"📄 Results available inline")
            inline_responses = batch_job.dest.inlined_responses
            
            for i, response in enumerate(inline_responses):
                results.append({
                    'line_number': i + 1,
                    'key': f'inline-{i+1}',
                    'success': hasattr(response, 'response') and response.response,
                    'error': getattr(response, 'error', None),
                    'response_text': getattr(response.response, 'text', None) if hasattr(response, 'response') and response.response else None,
                    'raw_result': response
                })
        
        else:
            print("❌ No results found in job response")
            return None
            
        # Summary
        successful = len([r for r in results if r['success']])
        failed = len([r for r in results if not r['success']])
        
        print(f"\n📈 Results Summary:")
        print(f"   ✅ Successful: {successful}")
        print(f"   ❌ Failed: {failed}")
        print(f"   📊 Total: {len(results)}")
        
        return results
        
    except Exception as e:
        print(f"❌ Error retrieving results: {e}")
        return None

def display_results_sample(results: List[Dict[str, Any]], max_samples: int = 2):
    """
    Display a sample of results for review
    """
    if not results:
        print("📭 No results to display")
        return
        
    print(f"\n🔍 Sample Results (showing up to {max_samples}):")
    
    successful_results = [r for r in results if r['success']]
    failed_results = [r for r in results if not r['success']]
    
    # Show successful results
    for i, result in enumerate(successful_results[:max_samples]):
        print(f"\n✅ Successful Result #{i+1}:")
        print(f"   Key: {result['key']}")
        if result['response_text']:
            text_preview = result['response_text'][:200] + "..." if len(result['response_text']) > 200 else result['response_text']
            print(f"   Response: {text_preview}")
    
    # Show failed results if any
    if failed_results and max_samples > len(successful_results):
        remaining_slots = max_samples - len(successful_results[:max_samples])
        for i, result in enumerate(failed_results[:remaining_slots]):
            print(f"\n❌ Failed Result #{i+1}:")
            print(f"   Key: {result['key']}")
            print(f"   Error: {result['error']}")

# Example usage (commented out since job may still be running)
print("💡 To retrieve results after job completion:")
print("   results = retrieve_batch_results(current_job_name)")
print("   display_results_sample(results)")
print("")
print("🔧 For testing with a specific job name:")
print("   # Replace 'your-job-name' with actual job name")
print("   # results = retrieve_batch_results('batches/your-job-name')")

## 8. Handle Inline Requests

For smaller batches (< 20MB), you can submit requests directly inline without uploading files. This is more convenient for testing and smaller datasets.
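The choice between inline and file-based submission comes down to the serialized size of the requests. Here is a minimal sketch of that decision, assuming the 20 MB threshold stated above; `choose_submission_mode` and `INLINE_LIMIT_BYTES` are hypothetical names, and the size is an estimate based on one JSON object per line.

```python
import json

# Assumed threshold, taken from the 20 MB figure quoted in this section
INLINE_LIMIT_BYTES = 20 * 1024 * 1024

def choose_submission_mode(requests, limit=INLINE_LIMIT_BYTES):
    """Return ('inline' or 'file', estimated_size_in_bytes) for a list of request dicts."""
    # Size of each request as UTF-8 JSON, plus one byte for the trailing newline
    size = sum(len(json.dumps(r).encode("utf-8")) + 1 for r in requests)
    return ("inline" if size < limit else "file"), size
```

For the three sample rugs in this notebook, `choose_submission_mode(batch_requests)` would select the inline path, since the payload is only a few kilobytes.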

In [None]:
# Inline batch requests (for smaller datasets)
def create_inline_batch_job(requests: List[Dict[str, Any]], display_name: str) -> Optional[Dict[str, Any]]:
    """
    Create a batch job with inline requests (no file upload needed)
    Suitable for smaller datasets under 20MB total size
    """
    try:
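        # Convert our JSONL format to inline format
        inline_requests = []
        for req in requests:
            # Drop the JSONL-only 'key' wrapper and copy the request body
            request_data = dict(req['request'])
            # Assumption: the SDK's inline request type expects generation
            # settings under 'config' rather than the REST-style 'generation_config'
            if 'generation_config' in request_data:
                request_data['config'] = request_data.pop('generation_config')
            inline_requests.append(request_data)
        
        print(f"🚀 Creating inline batch job with {len(inline_requests)} requests...")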
        
        # Create batch job with inline requests
        batch_job = client.batches.create(
            model=BATCH_CONFIG['model'],
            src=inline_requests,  # Pass requests directly
            config={
                'display_name': display_name
            }
        )
        
        print(f"✅ Inline batch job created successfully!")
        print(f"   Job name: {batch_job.name}")
        print(f"   Display name: {batch_job.display_name}")
        print(f"   State: {batch_job.state}")
        print(f"   Requests: {len(inline_requests)}")
        
        return {
            'name': batch_job.name,
            'display_name': batch_job.display_name,
            'state': batch_job.state,
            'model': batch_job.model,
            'request_count': len(inline_requests),
            'full_job': batch_job
        }
        
    except Exception as e:
        print(f"❌ Error creating inline batch job: {e}")
        return None

# Create a smaller sample for inline testing
inline_test_requests = batch_requests[:2]  # Just first 2 requests for testing

print(f"📝 Testing inline batch with {len(inline_test_requests)} requests")

# Calculate approximate size
sample_size = len(json.dumps(inline_test_requests).encode('utf-8'))
print(f"📏 Estimated size: {sample_size:,} bytes ({sample_size / 1024:.1f} KB)")

if sample_size < 20 * 1024 * 1024:  # 20MB limit
    print("✅ Size is within inline batch limits")
    
    # Create inline batch job
    inline_job_name = f"Inline Rug Test - {time.strftime('%Y-%m-%d %H:%M')}"
    inline_job_info = create_inline_batch_job(inline_test_requests, inline_job_name)
    
    if inline_job_info:
        print(f"\n🎯 Inline batch job created!")
        print(f"📋 Job ID: {inline_job_info['name']}")
        
        # Quick status check
        try:
            current_job = client.batches.get(name=inline_job_info['name'])
            print(f"📊 Current status: {current_job.state}")
        except Exception as e:
            print(f"⚠️  Could not check status: {e}")
    else:
        print("❌ Failed to create inline batch job")
else:
    print("⚠️  Requests too large for inline batch (>20MB)")

print(f"\n💡 Inline batches are ideal for:")
print(f"   • Testing and prototyping")
print(f"   • Small datasets (< 20MB)")
print(f"   • Quick turnaround scenarios")
print(f"   • Results returned directly in response")

## 9. Implement Error Handling

Comprehensive error handling for batch operations, including job management, cancellation, and failure recovery.
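Transient failures such as network errors are usually best handled by retrying with an increasing delay. The sketch below shows exponential backoff in its simplest form. `call_with_retries` is a hypothetical helper, and which exceptions count as retryable is left to the caller through the `retry_on` parameter, since that depends on the SDK version in use.

```python
import time

def call_with_retries(func, max_attempts=4, base_delay=1.0, retry_on=(Exception,), sleep=time.sleep):
    """Call func(); on a retryable exception wait base_delay * 2**attempt, then try again."""
    for attempt in range(max_attempts):
        try:
            return func()
        except retry_on:
            # Out of attempts: re-raise the last error to the caller
            if attempt == max_attempts - 1:
                raise
            sleep(base_delay * (2 ** attempt))
```

A status check could then be wrapped as `call_with_retries(lambda: client.batches.get(name=job_name))`, so that a single failed poll does not abort monitoring.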

In [None]:
# Error handling and job management utilities

def cancel_batch_job(job_name: str) -> bool:
    """
    Cancel a running batch job
    """
    try:
        print(f"🛑 Cancelling batch job: {job_name}")
        client.batches.cancel(name=job_name)
        
        # Verify cancellation
        batch_job = client.batches.get(name=job_name)
        if batch_job.state == 'JOB_STATE_CANCELLED':
            print("✅ Job cancelled successfully")
            return True
        else:
            print(f"⚠️  Job state after cancellation: {batch_job.state}")
            return False
            
    except Exception as e:
        print(f"❌ Error cancelling job: {e}")
        return False

def delete_batch_job(job_name: str) -> bool:
    """
    Delete a batch job (removes it completely)
    """
    try:
        print(f"🗑️  Deleting batch job: {job_name}")
        client.batches.delete(name=job_name)
        print("✅ Job deleted successfully")
        return True
        
    except Exception as e:
        print(f"❌ Error deleting job: {e}")
        return False

def list_batch_jobs(limit: int = 10) -> List[Dict[str, Any]]:
    """
    List recent batch jobs for monitoring and management
    """
    try:
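        print(f"📋 Listing recent batch jobs (limit: {limit})...")
        
        # Assumption: the SDK takes a config with page_size rather than a
        # `limit` keyword; adjust if your SDK version differs
        jobs = client.batches.list(config={'page_size': limit})
        
        job_list = []
        for job in jobs:
            # The pager can continue past the first page, so stop at `limit`
            if len(job_list) >= limit:
                break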
            job_info = {
                'name': job.name,
                'display_name': getattr(job, 'display_name', 'N/A'),
                'state': job.state,
                'model': getattr(job, 'model', 'N/A'),
                'create_time': getattr(job, 'create_time', 'N/A')
            }
            job_list.append(job_info)
            
        return job_list
        
    except Exception as e:
        print(f"❌ Error listing jobs: {e}")
        return []

def handle_batch_errors(results: List[Dict[str, Any]]) -> Dict[str, Any]:
    """
    Analyze and categorize batch processing errors
    """
    if not results:
        return {'error': 'No results provided'}
    
    error_summary = {
        'total_requests': len(results),
        'successful': 0,
        'failed': 0,
        'errors_by_type': {},
        'failed_keys': []
    }
    
    for result in results:
        if result['success']:
            error_summary['successful'] += 1
        else:
            error_summary['failed'] += 1
            error_summary['failed_keys'].append(result['key'])
            
            # Categorize error types
            error_msg = str(result.get('error', 'Unknown error'))
            error_type = 'unknown'
            
            if 'timeout' in error_msg.lower():
                error_type = 'timeout'
            elif 'quota' in error_msg.lower() or 'limit' in error_msg.lower():
                error_type = 'quota_limit'
            elif 'invalid' in error_msg.lower() or 'malformed' in error_msg.lower():
                error_type = 'invalid_request'
            elif 'permission' in error_msg.lower() or 'auth' in error_msg.lower():
                error_type = 'auth_error'
            
            error_summary['errors_by_type'][error_type] = error_summary['errors_by_type'].get(error_type, 0) + 1
    
    # Calculate success rate
    error_summary['success_rate'] = (error_summary['successful'] / error_summary['total_requests']) * 100
    
    return error_summary

# Utility functions for robust batch processing
class BatchJobManager:
    """
    A helper class for managing batch jobs with error handling
    """
    
    def __init__(self, client, config):
        self.client = client
        self.config = config
        self.active_jobs = {}
    
    def submit_job(self, requests_or_file, display_name, job_type='file'):
        """
        Submit a batch job with error handling
        """
        try:
            # File-based and inline jobs both go through `src`: pass an uploaded
            # file name for job_type='file' or a list of requests for inline
            job = self.client.batches.create(
                model=self.config['model'],
                src=requests_or_file,
                config={'display_name': display_name}
            )
            
            self.active_jobs[job.name] = {
                'job': job,
                'display_name': display_name,
                'submit_time': time.time(),
                'type': job_type
            }
            
            return job.name
            
        except Exception as e:
            print(f"❌ Failed to submit job: {e}")
            return None
    
    def monitor_job(self, job_name, callback=None):
        """
        Monitor a job with optional progress callback
        """
        if job_name not in self.active_jobs:
            print(f"⚠️  Job {job_name} not found in active jobs")
            return None
        
        try:
            job = self.client.batches.get(name=job_name)
            
            if callback:
                callback(job)
            
            return job.state
            
        except Exception as e:
            print(f"❌ Error monitoring job {job_name}: {e}")
            return None
    
    def cleanup_jobs(self, older_than_hours=24):
        """
        Clean up old jobs
        """
        current_time = time.time()
        cutoff_time = current_time - (older_than_hours * 3600)
        
        jobs_to_remove = []
        for job_name, job_info in self.active_jobs.items():
            if job_info['submit_time'] < cutoff_time:
                try:
                    self.client.batches.delete(name=job_name)
                    jobs_to_remove.append(job_name)
                    print(f"🗑️  Cleaned up old job: {job_name}")
                except Exception as e:
                    print(f"⚠️  Could not delete old job {job_name}: {e}")
        
        for job_name in jobs_to_remove:
            del self.active_jobs[job_name]

# Example usage of error handling
print("🛠️  Error Handling Tools Available:")
print("   • cancel_batch_job(job_name)")
print("   • delete_batch_job(job_name)")
print("   • handle_batch_errors(results)")
print("   • BatchJobManager class for advanced management")

print("\\nüí° Best Practices for Error Handling:")
print("   ‚Ä¢ Always check job status before retrieving results")
print("   ‚Ä¢ Handle timeout scenarios gracefully")
print("   ‚Ä¢ Implement retry logic for transient failures")
print("   ‚Ä¢ Monitor quota usage to avoid limits")
print("   ‚Ä¢ Clean up old jobs regularly")

# Demonstrate error analysis on sample data
sample_error_results = [
    {'key': 'rug-1', 'success': True, 'error': None},
    {'key': 'rug-2', 'success': False, 'error': 'Request timeout'},
    {'key': 'rug-3', 'success': True, 'error': None},
    {'key': 'rug-4', 'success': False, 'error': 'Quota limit exceeded'},
]

print("\n📊 Sample Error Analysis:")
error_analysis = handle_batch_errors(sample_error_results)
print(f"   Success rate: {error_analysis['success_rate']:.1f}%")
print(f"   Total requests: {error_analysis['total_requests']}")
print(f"   Failed requests: {error_analysis['failed']}")
print(f"   Error types: {error_analysis['errors_by_type']}")
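
The "implement retry logic for transient failures" recommendation can be sketched against the same result shape used in `sample_error_results`. This is a minimal illustration, not part of the Batch API: the helper name `select_retryable` and the marker strings in `TRANSIENT_MARKERS` are assumptions, and the markers should be adjusted to the error text you actually observe.

```python
from typing import Any, Dict, List

# Hypothetical substrings that suggest a failure is transient and worth resubmitting.
TRANSIENT_MARKERS = ("timeout", "unavailable", "rate limit")


def select_retryable(results: List[Dict[str, Any]]) -> List[str]:
    """Return the keys of failed results whose error text looks transient."""
    retry_keys = []
    for result in results:
        if result.get("success"):
            continue  # nothing to retry for successful requests
        error_text = (result.get("error") or "").lower()
        if any(marker in error_text for marker in TRANSIENT_MARKERS):
            retry_keys.append(result["key"])
    return retry_keys
```

The returned keys could then be used to rebuild a smaller batch containing only those requests. Quota errors are deliberately left out of the markers here, since resubmitting immediately is unlikely to help until the quota resets.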

## Summary and Next Steps

This notebook demonstrated the complete Gemini Batch API workflow for processing rug analysis requests at scale with 50% cost savings.

### Key Benefits of Batch API:
- **Cost Effective**: 50% reduction in processing costs
- **Scalable**: Handle large volumes (up to 2GB per file)
- **Asynchronous**: Non-blocking processing with a target turnaround time of 24 hours
- **Reliable**: Job states and per-request errors can be inspected, so failed requests can be identified and retried

### Integration with Next.js App:
The techniques demonstrated here are already integrated into our rug processing application:

1. **File Upload**: `src/app/api/submit-batch/route.ts`
2. **Job Monitoring**: `src/app/api/batch-status/route.ts`  
3. **Result Retrieval**: `src/app/api/download-results/route.ts`
4. **Request Generation**: `src/lib/gemini-service.ts`

### Production Considerations:
- Monitor job quotas and limits
- Implement proper error recovery
- Set up job cleanup procedures
- Use appropriate polling intervals
- Handle timeout scenarios
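
As one way to approach the polling-interval and timeout items above, the following minimal sketch polls a job with exponential backoff and gives up after a deadline. The function name `wait_for_job`, its parameters, and the default delays are assumptions made for illustration, and the state names in `TERMINAL_STATES` are assumed to match the job states reported by the SDK.

```python
import time
from typing import Callable, Optional

# Assumed set of states after which a batch job no longer changes.
TERMINAL_STATES = {
    "JOB_STATE_SUCCEEDED",
    "JOB_STATE_FAILED",
    "JOB_STATE_CANCELLED",
    "JOB_STATE_EXPIRED",
}


def wait_for_job(
    get_state: Callable[[], str],
    initial_delay: float = 30.0,
    max_delay: float = 600.0,
    timeout: float = 24 * 3600,
    sleep: Callable[[float], None] = time.sleep,
) -> Optional[str]:
    """Poll get_state() with exponential backoff.

    Returns the terminal state name, or None if the timeout elapses first.
    """
    delay = initial_delay
    waited = 0.0
    while True:
        state = get_state()
        if state in TERMINAL_STATES:
            return state
        if waited >= timeout:
            return None
        # Never sleep past the deadline.
        step = min(delay, timeout - waited)
        sleep(step)
        waited += step
        # Double the interval up to the configured ceiling.
        delay = min(delay * 2, max_delay)
```

With the client from this notebook, a call might look like `wait_for_job(lambda: client.batches.get(name=job_name).state.name)`, assuming the job's `state` exposes a `.name` attribute. Passing `sleep` as a parameter keeps the helper easy to test without real waiting.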

### Testing Your Implementation:
1. Upload a CSV file through the Next.js web interface
2. Generate batch requests with your rug data
3. Submit the batch job and monitor progress
4. Download and analyze results when complete

The Batch API is ideal for large-scale rug inventory processing, product description generation, and bulk AI analysis tasks.

## 10. Cleanup and Reset

Use this section to clean up any problematic batch jobs, reset variables, and handle error states.

In [None]:
# Cleanup and Reset Tools

def cleanup_notebook_state():
    """
    Clean up notebook variables and reset state
    """
    print("🧹 Cleaning up notebook state...")
    
    # Clear global variables
    globals_to_clear = [
        'current_job_name', 
        'batch_job_info', 
        'uploaded_file_info',
        'batch_requests',
        'inline_job_info'
    ]
    
    cleared_count = 0
    for var_name in globals_to_clear:
        if var_name in globals():
            del globals()[var_name]
            cleared_count += 1
            print(f"   ✅ Cleared {var_name}")
    
    print(f"🎉 Cleanup complete! Cleared {cleared_count} variables")
    return True

def safe_job_status_check(job_name: Optional[str] = None):
    """
    Safely check job status with error handling
    """
    if not job_name:
        if 'current_job_name' in globals():
            job_name = current_job_name
        else:
            print("❌ No job name provided and no current_job_name found")
            return None
    
    try:
        print(f"🔍 Checking status for job: {job_name}")
        batch_job = client.batches.get(name=job_name)
        
        print("📊 Job Status:")
        print(f"   Name: {getattr(batch_job, 'name', 'N/A')}")
        print(f"   State: {getattr(batch_job, 'state', 'UNKNOWN')}")
        print(f"   Display Name: {getattr(batch_job, 'display_name', 'N/A')}")
        print(f"   Model: {getattr(batch_job, 'model', 'N/A')}")
        
        return batch_job
        
    except Exception as e:
        print(f"❌ Error checking job status: {e}")
        print("💡 This might be normal if the job was cancelled or doesn't exist")
        return None

def force_cancel_all_jobs():
    """
    Attempt to cancel any running batch jobs (use with caution)
    """
    print("⚠️  WARNING: This will attempt to cancel ALL your batch jobs!")
    response = input("Type 'YES' to confirm: ")
    
    if response != 'YES':
        print("❌ Cancelled by user")
        return False
    
    try:
        # This is a conceptual implementation - actual implementation may vary
        print("🛑 Attempting to list and cancel jobs...")
        # Note: You would need to implement actual job listing and cancellation
        print("💡 Manual cancellation required - check your Google AI Studio dashboard")
        return True
        
    except Exception as e:
        print(f"❌ Error during bulk cancellation: {e}")
        return False

def reset_client_connection():
    """
    Reset the Gemini API client connection
    """
    global client
    try:
        print("🔄 Resetting client connection...")
        
        # Re-initialize the client
        if 'GEMINI_API_KEY' in globals() and GEMINI_API_KEY:
            client = genai.Client(api_key=GEMINI_API_KEY)
            print("✅ Client reconnected successfully")
            return True
        else:
            print("❌ No API key available for reconnection")
            return False
            
    except Exception as e:
        print(f"❌ Error resetting client: {e}")
        return False

# Quick cleanup options
print("🛠️  Cleanup Tools Available:")
print("   • cleanup_notebook_state() - Clear all notebook variables")
print("   • safe_job_status_check() - Check job status safely")
print("   • reset_client_connection() - Reset API client")
print("   • force_cancel_all_jobs() - Cancel running jobs (use carefully)")

print("\n🚨 If you're seeing 'undefined batch state' errors:")
print("   1. Run: cleanup_notebook_state()")
print("   2. Run: reset_client_connection()")
print("   3. Restart from cell 1 if needed")

print("\n💡 Quick Reset:")
print("   cleanup_notebook_state()")
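
`force_cancel_all_jobs()` above is only conceptual and leaves the actual listing and cancellation to be done manually. A minimal sketch of that step is shown here, assuming the client exposes `batches.list()` and `batches.cancel(name=...)` as used elsewhere in this notebook's SDK, and that each job has a `name` and a `state`. The helper name `cancel_active_jobs`, the `dry_run` flag, and the `CANCELLABLE_STATES` set are assumptions made for illustration.

```python
from typing import Any, List

# Assumed states in which a batch job can still be cancelled.
CANCELLABLE_STATES = {"JOB_STATE_PENDING", "JOB_STATE_RUNNING"}


def cancel_active_jobs(client: Any, dry_run: bool = True) -> List[str]:
    """Cancel pending or running jobs and return their names.

    With dry_run=True (the default) nothing is cancelled; the names that
    would be cancelled are only collected and returned.
    """
    targeted = []
    for job in client.batches.list():
        # The state may be an enum with a .name attribute or a plain string.
        state = getattr(job.state, "name", str(job.state))
        if state not in CANCELLABLE_STATES:
            continue
        if not dry_run:
            client.batches.cancel(name=job.name)
        targeted.append(job.name)
    return targeted
```

Running `cancel_active_jobs(client)` first and reviewing the returned names before calling it again with `dry_run=False` keeps the operation from cancelling jobs unintentionally.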