In [1]:
import pandas as pd
from openai import OpenAI
from pathlib import Path
import json
import base64
import pprint
from dotenv import load_dotenv

load_dotenv(override=True)

True

# OpenAI API batching

In [30]:
# testing api
system_instructions =  """You are an assistant that's helping me with this research project. The structure of this project is that we have had undergraduates label traits of images manually. These traits include things like ontology, substrates, font information, text, covid-relation, confidence, and more. You are now going to be doing what they have been doing. When provided with an image, you should return a json object that describes the image according to the structure given here. The structure you should return as is provided below, along with an example. Keep in mind there can be multiple substrates (up to 4), with multiple fonts for each substrate (up to 8). Many of the images will have multiple substrates, look closely in the images for any other substrates which are clear and sharp. When annotating the text (called copy) that is in the image, ensure that you use the markdown guide that is included here. If there is a code of some type (barcode, scan code) in the image, do not include that in the json. 
    
        This is the format you should return to me in, with comments denoting the purpose of the field for some:
    
        |||{return format}
        {
            "substrateCount": // the number of substrates in this image,
            "substrates": [
                {
                    "placement": // refer to placement examples in screenshots of form,
                    "additionalNotes": // not always necessary to include,
                    "thisIsntReallyASign": // set this field to true if this doesn't really fit any of the placement categories and isn't a sign, else false,
                    "notASignDescription": // use this if the previous field was true, describe the placement,
                    "typefaces": [
                        {
                            "typefaceStyle": [], // can be multiple, choose from the options provided to you below,
                            "copy": // make sure that the text is annotated according to the markdown guide in one of the screenshots provided to you,
                            "letteringOntology": [], // refer to the OC Fonts: Codebook Descriptions & Photo Examples file for examples and descriptions of what each of these are,
                            "messageFunction": // again refer to the OC Fonts: Codebook Descriptions & Photo Examples file for examples and descriptions of what each of these are,
                            "covidRelated": // is this text covid related?,
                            "additionalNotes": // any additional notes needed about this image
                        }
                    ],
                    "confidence": // overall confidence in your annotation, 0 being the lowest, 5 the highest,
                    "confidenceReasoning": // reasoning for confidence rating,
                    "additionalInfo": // any additional info about the substrate, not always necessary to include
                }
            ]
        }
        |||
    
        This is an example of the returned product you should give to me. This example only has one substrate but other images may have multiple.
    
        |||{return example}
        {
            "substrateCount": 1,
            "substrates": [
                {
                    "placement": "Window-stuck",
                    "additionalNotes": "decal stickers on a parking meter",
                    "thisIsntReallyASign": false,
                    "notASignDescription": "",
                    "typefaces": [
                        {
                            "typefaceStyle": ["Serif", "Stylized"],
                            "copy": "Please, no food or\\ndrink in the store.\\nThank You!",
                            "letteringOntology": ["Painted", "Pan-face"],
                            "messageFunction": "Operational information",
                            "covidRelated": false,
                            "additionalNotes": "Text is center aligned, \\\"OPEN\\\" is larger than \\\"5PM DAILY\\\""
                        }
                    ],
                    "confidence": 4,
                    "confidenceReasoning": "Could be painted or pan-face or both",
                    "additionalInfo": "Image heavily cut off"
                }
            ]
        }
        |||
    
        These are the options for message function, typeface style, ontology and placements:

|||    
// --- Annotated option definitions ---
{
  "typeface": [
    "Serif",        // Typefaces that have the serifs on the edges of the letters
    "Sans serif",   // Typefaces that do not have the serifs on the edges of the letters
    "Slab serif",   // Typefaces with thick, squared-off serifs that do not come to a point
    "Script",       // Text set in a cursive or handwriting-like typeface
    "Stylized",     // Fonts with a unique or decorative design that don't fit other categories
    "Quirky"        // Playful or informal fonts that convey a particular feeling (e.g., Comic Sans)
  ],

  "lettering_ontology": [
    "Printed",       // Ink letters printed onto paper, posterboard, or similar substrates
    "Decal",         // A sticker/film transferred onto the surface with adhesive or heat
    "Painted",       // Letters hand- or professionally-painted directly onto a surface
    "Pan channel",   // Individually-cut, three-dimensional letters (mounted to a fascia)
    "Pan face",      // A three-dimensional sign unit (one piece) usually mounted to a fascia
    "Handmade",      // Vernacular/DIY signs made by non-professionals (may accompany other ontologies)
    "Embossed",      // Text raised (3-D) above the surrounding surface
    "Debossed",      // Text engraved or pressed into the surface (3-D recess)
    "Pen or marker", // Text written using a pen or marker
    "Reader board",  // Sign with movable letters/characters to change messages
    "Spray paint",   // Letters spray-painted onto the surface (often graffiti or stenciled)
    "LED",           // Signs lit by colored lights/electrical LEDs (distinct from neon)
    "Other electronic", // Any electronic signage that is not LED or neon
    "Neon",          // Light-up signs formed from bent, gas-filled tubes (continuous-tube look)
    "Tile",          // Letters composed of or applied as tiles/mosaics
    "Chalk",         // Letters written in chalk (typically on a chalkboard)
    "House number",  // Address numbers made of individual shapes attached to a wall
    "Ghost sign"     // Remnant/faded trace of an old sign (weathered or stained letters)
  ],

  "placements": [
    "Window-stuck",   // Fixed to the window with adhesive (decal/sticker) and not easily moved
    "Window-placed",  // Mounted on the window but removable or easily moved (not adhesive)
    "Awning/canopy",  // Text on a cloth or canopy structure projecting outward over an entrance
    "Blade",          // Sign that protrudes perpendicular from the wall (hangs off a mount)
    "Fascia",         // Sign mounted to the building face, usually large and above eye level
    "Marquee",        // Protruding, attached sign (movie-theater style) attached to a structure
    "Hanging",        // Blade-style sign hanging from ceiling or roof overhang
    "Name-plate",     // Panel listing multiple names/businesses arranged in a table/line
    "Painted wall",   // Sign painted directly onto the wall surface (mural-like)
    "Freestanding",   // Moveable signs that stand alone (A-frames, sandwich boards, posters)
    "Parapet",        // Large fascia-style sign mounted on a low wall/railing at roof edge
    "Ground",         // Sign affixed to or at ground/floor level
    "Bench",          // Sign that is part of or attached to a bench
    "Flag",           // Sign printed on flexible material (flag) secured to a pole
    "Pole-mounted",   // Sign secured to a pole or post
    "Post and panel", // Freestanding board/panel attached to two posts (less moveable)
    "Pylon",          // Large freestanding sign > ~8 ft tall, supported by poles/structure
    "Banner",         // Sign printed on flexible cloth-like material and hung up
    "Wall-placed",    // Mounted on a wall but not permanently stuck (can be removed/moved)
    "Wall-stuck",     // Fixed/stuck to a wall (adhesive, screwed in, or otherwise permanent)
    "Other-stuck",    // Mounted to an atypical object or surface not covered above
    "Snipe",          // Small sign affixed to or overlaying another sign
    "Graffiti",       // Graffiti tag or stylized street art (often spray paint/paint pens)
    "Infrastructure", // Municipal or utility markings (manholes, construction spraypaint, etc.)
    "Memorial",       // Commemorative plaque or memorial (often on benches, plaques)
    "Sticker"         // Non-sign sticker (stylized sticker placed as vandalism or sticker art)
  ],

  "message_function": [
    "Identification",       // Identifies the name of the establishment or structure
    "Address",              // Address number or the name of the address/property
    "Joint tenant",         // Lists multiple businesses/people occupying the same building/area
    "Operational information", // Info about business operations (hours, phone, open/closed, etc.)
    "Advisement/regulation", // Regulatory or advisory signs (parking, warnings, no smoking, etc.)
    "Directory",            // Lists names plus location info (where to find each listed item)
    "Generic information",  // Explanatory or descriptive info (subheading, service description)
    "Menu of options",      // Lists options/services/prices (menus, price boards, service lists)
    "Commemoration",        // Memorial or homage (dedication plaques, historical displays)
    "Street name",          // Street sign or text naming a street
    "Advertisement",        // Commercial message intended to sell a product/service
    "Wayfinding",           // Directional information (arrows, "inside", "downstairs", etc.)
    "Infrastructure",       // Municipal/trade info for workers (IDs, construction labels, manhole tags)
    "Covid-related"         // Any sign specifically related to the pandemic/COVID guidance
  ]
}
// --- end annotated definitions ---
|||

This is the markdown guide for annotating the text(called copy) within images:
|||
~Smallcaps: {TEXT} // if some text that is part of the same typeface (thus in the same copy) is smaller 
~Italics: *text*
~Bold: **text**
~Bold italics: ***text***
~Underlined: __text__
~Illegible: [illegible]
|||
        """

client = OpenAI()

response = client.responses.create(
    model="gpt-5",
    input="Write a one-sentence bedtime story about a unicorn."
)
response


Response(id='resp_0368ca7d9c29081d006907fa03b0b481a28d008488287a0302', created_at=1762130435.0, error=None, incomplete_details=None, instructions=None, metadata={}, model='gpt-5-2025-08-07', object='response', output=[ResponseReasoningItem(id='rs_0368ca7d9c29081d006907fa04d81081a29431d7c34fc2fd3d', summary=[], type='reasoning', status=None), ResponseOutputMessage(id='msg_0368ca7d9c29081d006907fa0809f481a2afb965aae8e19022', content=[ResponseOutputText(annotations=[], text='Under a moonlit sky, a drowsy unicorn named Luma tiptoed through a meadow of whispering stars, gathering dreams on her silver horn to share with every sleeping child before dawn.', type='output_text', logprobs=[])], role='assistant', status='completed', type='message')], parallel_tool_calls=True, temperature=1.0, tool_choice='auto', tools=[], top_p=1.0, max_output_tokens=None, previous_response_id=None, reasoning=Reasoning(effort='medium', generate_summary=None, summary=None), service_tier='default', status='completed

In [None]:
import os

images_path = 'images/Garden Grove'
image_extensions = {'.JPG', '.jpg', '.jpeg'}
output_file_name = "batchGardenGrove" 
batch_size = 10

template = {
    "custom_id": "",
    "method": "POST",
    "url": "/v1/responses",
    "body": {
        "model": "gpt-5",
        "instructions": system_instructions,
        "input": [
            {
                "type": "message",  # Changed from "image_url" to "message"
                "message": {
                    "role": "user",
                    "content": [
                        {
                            "type": "input_text",
                            "text": "Use the provided system instructions to annotate this image."
                        },
                        {
                            "type": "image_url",  # image_url goes INSIDE the message content
                            "image_url": {"url": "data:image/jpeg;base64,IMAGE_DATA_HERE"}
                        }
                    ]
                }
            }
        ],
        "text": {
            "format": "text"
        }
    }
}

def encode_image(images_dir, image_name):
    '''Encodes an image into base64 so it can be stored in a jsonl file'''
    root_path = Path(images_dir)
    for file in root_path.rglob(image_name):
        with open(str(file), "rb") as image_file:
            return base64.b64encode(image_file.read()).decode("utf-8")
    print(f"Skipping an image, unable to find {image_name}")
    return None

def create_batch_object(images_path, image_extensions, output_file_name):
    # Get all files in the directory (no subfolders)
    image_files = [
        f for f in os.listdir(images_path)
        if os.path.isfile(os.path.join(images_path, f)) and 
        os.path.splitext(f)[1].lower() in image_extensions
    ]
    
    output_files = []
    print("Found", len(image_files), "images in", images_path)
    print("Processing...")
    
    name_ptr = 0
    i = 0
    
    while name_ptr < len(image_files):
        with open(f"{output_file_name}{i}.jsonl", "w") as file:
            output_files.append(f"{output_file_name}{i}.jsonl")
            
            while name_ptr < len(image_files):
                filename = image_files[name_ptr]
                
                # Create a fresh copy of the template for each image
                base64_image = encode_image(images_path, filename)
                if not base64_image:
                    name_ptr += 1
                    continue
                
                current_template = {
                    "custom_id": filename.lower(),
                    "method": "POST",
                    "url": "/v1/responses",
                    "body": {
                        "model": "gpt-5",
                        "instructions": system_instructions,
                        "input": [
                            {
                                "role": "user",
                                "content": [
                                    {
                                        "type": "input_text",
                                        "text": "Use the provided system instructions to annotate this image."
                                    },
                                    {
                                        "type": "input_image",
                                        "image_url": f"data:image/jpeg;base64,{base64_image}"
                                    }
                                ]
                            }
                        ],
                        "text": {"format": {"type": "text"}}
                    }
                }
                    
                file.write(json.dumps(current_template) + '\n')
                name_ptr += 1
                if name_ptr % batch_size == 0:  # 10 images per batch file
                    break
            
            i += 1
    
    print("Successfully processed", len(image_files), "images into", i, "separate files")
    return output_files

In [None]:
output_files = create_batch_object(images_path, image_extensions, output_file_name)

In [41]:
from concurrent.futures import ThreadPoolExecutor, as_completed
import time

def upload_single_file(filename, index, total):
    """Upload a single file to OpenAI"""
    client = OpenAI(timeout=300.0, max_retries=3)
    
    try:
        with open(filename, "rb") as f:
            file_obj = client.files.create(file=f, purpose="batch")
        print(f"✓ Uploaded file {index + 1}/{total}: {filename} -> {file_obj.id}")
        return file_obj.id
    except Exception as e:
        print(f"✗ Failed to upload file {index + 1}/{total}: {filename} - {e}")
        return None

def create_single_batch(file_id, index, total):
    """Create a single batch job"""
    client = OpenAI(timeout=60.0, max_retries=3)
    
    try:
        batch = client.batches.create(
            input_file_id=file_id,
            endpoint="/v1/responses",
            completion_window="24h",
            metadata={"description": "one of many garden grove jobs"}
        )
        print(f"✓ Created batch job {index + 1}/{total}: {batch.id}")
        return batch
    except Exception as e:
        print(f"✗ Failed to create batch job {index + 1}/{total}: {e}")
        return None

def sendToOpenAI(output_files, max_workers=5):
    """
    Upload files and create batch jobs using multithreading
    
    Args:
        output_files: List of file paths to upload
        max_workers: Maximum number of concurrent threads (default: 5)
    """
    file_ids = []
    
    print(f"Starting upload of {len(output_files)} files with {max_workers} threads...")
    start_time = time.time()
    
    # Step 1: Upload all files concurrently
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # Submit all upload tasks
        future_to_file = {
            executor.submit(upload_single_file, filename, i, len(output_files)): (filename, i)
            for i, filename in enumerate(output_files)
        }
        
        # Collect results as they complete
        for future in as_completed(future_to_file):
            filename, index = future_to_file[future]
            try:
                file_id = future.result()
                if file_id:
                    file_ids.append((index, file_id))  # Store with index to maintain order
            except Exception as e:
                print(f"✗ Exception during upload of {filename}: {e}")
    
    # Sort by original index to maintain order
    file_ids.sort(key=lambda x: x[0])
    file_ids = [fid for _, fid in file_ids]  # Extract just the IDs
    
    upload_time = time.time() - start_time
    print(f"\nCompleted uploads in {upload_time:.2f} seconds")
    print(f"Successfully uploaded {len(file_ids)}/{len(output_files)} files")
    print(f"File IDs: {file_ids}\n")
    
    # Step 2: Create batch jobs concurrently
    print(f"Starting creation of {len(file_ids)} batch jobs...")
    batch_start = time.time()
    
    batches = []
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # Submit all batch creation tasks
        future_to_id = {
            executor.submit(create_single_batch, file_id, i, len(file_ids)): (file_id, i)
            for i, file_id in enumerate(file_ids)
        }
        
        # Collect results as they complete
        for future in as_completed(future_to_id):
            file_id, index = future_to_id[future]
            try:
                batch = future.result()
                if batch:
                    batches.append((index, batch))  # Store with index to maintain order
            except Exception as e:
                print(f"✗ Exception during batch creation for {file_id}: {e}")
    
    # Sort by original index to maintain order
    batches.sort(key=lambda x: x[0])
    batches = [batch for _, batch in batches]  # Extract just the batch objects
    
    batch_time = time.time() - batch_start
    total_time = time.time() - start_time
    
    print(f"\nCompleted batch creation in {batch_time:.2f} seconds")
    print(f"Successfully created {len(batches)}/{len(file_ids)} batch jobs")
    print(f"Total time: {total_time:.2f} seconds")
    
    return batches


# Alternative: Simpler version with less verbose output
def sendToOpenAI_simple(output_files, max_workers=5):
    """Simplified multithreaded version with less output"""
    from concurrent.futures import ThreadPoolExecutor
    
    client = OpenAI(timeout=300.0, max_retries=3)
    
    def upload_file(filename):
        with open(filename, "rb") as f:
            return client.files.create(file=f, purpose="batch").id
    
    def create_batch(file_id):
        return client.batches.create(
            input_file_id=file_id,
            endpoint="/v1/responses",
            completion_window="24h",
            metadata={"description": "one of many garden grove jobs"}
        )
    
    print(f"Uploading {len(output_files)} files...")
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        file_ids = list(executor.map(upload_file, output_files))
    print(f"✓ Uploaded {len(file_ids)} files")
    
    print(f"Creating {len(file_ids)} batch jobs...")
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        batches = list(executor.map(create_batch, file_ids))
    print(f"✓ Created {len(batches)} batch jobs")
    
    return batches

In [42]:
for file in output_files:
    size_mb = os.path.getsize(file) / (1024 * 1024)
    print(f"{file}: {size_mb:.2f} MB")

batchGardenGrove0.jsonl: 34.37 MB
batchGardenGrove1.jsonl: 36.00 MB
batchGardenGrove2.jsonl: 35.93 MB
batchGardenGrove3.jsonl: 33.19 MB
batchGardenGrove4.jsonl: 33.62 MB
batchGardenGrove5.jsonl: 34.15 MB
batchGardenGrove6.jsonl: 38.93 MB
batchGardenGrove7.jsonl: 31.96 MB
batchGardenGrove8.jsonl: 41.39 MB
batchGardenGrove9.jsonl: 40.87 MB
batchGardenGrove10.jsonl: 34.95 MB
batchGardenGrove11.jsonl: 39.15 MB
batchGardenGrove12.jsonl: 31.58 MB
batchGardenGrove13.jsonl: 31.46 MB
batchGardenGrove14.jsonl: 35.75 MB
batchGardenGrove15.jsonl: 26.71 MB


In [43]:
batches = sendToOpenAI(output_files, max_workers=5)

Starting upload of 16 files with 5 threads...
✓ Uploaded file 4/16: batchGardenGrove3.jsonl -> file-YcGvAWzkFe8YkN6LdBdB96
✓ Uploaded file 3/16: batchGardenGrove2.jsonl -> file-V8GvSf7M4EwRP5EfofU7nU
✓ Uploaded file 1/16: batchGardenGrove0.jsonl -> file-YQhZZDwJ772AAkvFT93jNg
✓ Uploaded file 5/16: batchGardenGrove4.jsonl -> file-51FcUiu4HX6p7ScCjUXNEx
✓ Uploaded file 2/16: batchGardenGrove1.jsonl -> file-YSgqNSuPxHMBPTuYirdf88
✓ Uploaded file 6/16: batchGardenGrove5.jsonl -> file-XQhzY5jVSUi5CQFeSUgkYX
✓ Uploaded file 7/16: batchGardenGrove6.jsonl -> file-FLB7PFcxxdgjGhd6q8VxJ4
✓ Uploaded file 8/16: batchGardenGrove7.jsonl -> file-Xhvru5pVKdTMQc9yrm8p2i
✓ Uploaded file 10/16: batchGardenGrove9.jsonl -> file-YYUKdSL8HQhZ63W7P1ckPv
✓ Uploaded file 9/16: batchGardenGrove8.jsonl -> file-XbVAV5oEPdC7ATSokLRX7J
✗ Failed to upload file 12/16: batchGardenGrove11.jsonl - Connection error.
✗ Failed to upload file 13/16: batchGardenGrove12.jsonl - Connection error.
✓ Uploaded file 14/16: batchGar

In [36]:
import os
import json
import base64
from pathlib import Path
from openai import OpenAI
import time

# Configuration
images_path = 'images/Garden Grove'
image_extensions = {'.JPG', '.jpg', '.jpeg'}
system_instructions = """Your system instructions here..."""

def encode_image(images_dir, image_name):
    root_path = Path(images_dir)
    for file in root_path.rglob(image_name):
        with open(str(file), "rb") as image_file:
            return base64.b64encode(image_file.read()).decode("utf-8")
    print(f"Skipping an image, unable to find {image_name}")
    return None

def create_single_test_batch():
    """Create a batch file with just ONE image for testing"""
    image_files = [
        f for f in os.listdir(images_path)
        if os.path.isfile(os.path.join(images_path, f)) and 
        os.path.splitext(f)[1].lower() in image_extensions
    ]
    
    if not image_files:
        print("No images found!")
        return None
    
    # Take just the first image
    filename = image_files[0]
    print(f"Creating test batch with: {filename}")
    
    base64_image = encode_image(images_path, filename)
    if not base64_image:
        return None
    
    # CORRECTED FORMAT: input should be an array with role/content structure
    template = {
        "custom_id": filename.lower(),
        "method": "POST",
        "url": "/v1/responses",
        "body": {
            "model": "gpt-5",
            "instructions": system_instructions,
            "input": [
                {
                    "role": "user",
                    "content": [
                        {
                            "type": "input_text",
                            "text": "Use the provided system instructions to annotate this image."
                        },
                        {
                            "type": "input_image",  # Changed from "image_url" to "input_image"
                            "image_url": f"data:image/jpeg;base64,{base64_image}"
                        }
                    ]
                }
            ],
            "text": {"format": {"type": "text"}}
        }
    }
    
    output_file = "test_single_image.jsonl"
    with open(output_file, "w") as file:
        file.write(json.dumps(template) + '\n')
    
    # Check file size
    size_mb = os.path.getsize(output_file) / (1024 * 1024)
    print(f"Created {output_file}: {size_mb:.2f} MB")
    
    return output_file

def upload_with_retry(filename, max_attempts=3):
    """Upload file with exponential backoff retry"""
    client = OpenAI(
        timeout=300.0,  # 5 minute timeout
        max_retries=0   # We'll handle retries manually
    )
    
    for attempt in range(max_attempts):
        try:
            print(f"Upload attempt {attempt + 1}/{max_attempts}...")
            with open(filename, "rb") as f:
                file_obj = client.files.create(
                    file=f,
                    purpose="batch"
                )
            print(f"✓ Upload successful! File ID: {file_obj.id}")
            return file_obj.id
        except Exception as e:
            print(f"✗ Upload attempt {attempt + 1} failed: {e}")
            if attempt < max_attempts - 1:
                wait_time = (2 ** attempt)  # Exponential backoff: 1, 2, 4 seconds
                print(f"Waiting {wait_time} seconds before retry...")
                time.sleep(wait_time)
            else:
                print("All upload attempts failed!")
                raise
    return None

def create_batch_job(file_id):
    """Create the batch job"""
    client = OpenAI(timeout=60.0)
    
    try:
        batch = client.batches.create(
            input_file_id=file_id,
            endpoint="/v1/responses",
            completion_window="24h",
            metadata={"description": "test batch job"}
        )
        print(f"✓ Batch job created: {batch.id}")
        return batch
    except Exception as e:
        print(f"✗ Failed to create batch job: {e}")
        raise

# Run the test
print("=== Testing Single Image Batch Upload ===\n")

# Step 1: Create test file
test_file = create_single_test_batch()
if not test_file:
    print("Failed to create test file")
else:
    print(f"\n=== Uploading {test_file} ===\n")
    
    # Step 2: Upload with retry
    try:
        file_id = upload_with_retry(test_file)
        
        # Step 3: Create batch job
        if file_id:
            print(f"\n=== Creating batch job ===\n")
            batch = create_batch_job(file_id)
            print(f"\n✓✓✓ SUCCESS! Batch ID: {batch.id}")
    except Exception as e:
        print(f"\n✗✗✗ FAILED: {e}")
        print("\nTroubleshooting steps:")
        print("1. Check your OPENAI_API_KEY is set correctly")
        print("2. Try: pip install --upgrade openai certifi")
        print("3. Check your network/firewall settings")
        print("4. Try using a VPN if you're in a restricted network")

=== Testing Single Image Batch Upload ===

Creating test batch with: GardenGrove148.jpeg
Created test_single_image.jsonl: 3.13 MB

=== Uploading test_single_image.jsonl ===

Upload attempt 1/3...
✓ Upload successful! File ID: file-KRsdXvdaUHNW8UMHr2Us93

=== Creating batch job ===

✓ Batch job created: batch_690803417ed48190aed85cc145ab9391

✓✓✓ SUCCESS! Batch ID: batch_690803417ed48190aed85cc145ab9391


# Gemini API batching

In [7]:
# If needed:
%pip -q install -U google-genai

Note: you may need to restart the kernel to use updated packages.


In [40]:

import os, json, pathlib, base64
from google import genai
from google.genai import types

# === CONFIG ===
images_path = "images/Garden Grove"   # <-- set your folder here
image_extensions = {'.jpg', '.jpeg', '.png', '.webp', '.gif'}
output_prefix = "gemini_batch_"       # output JSONL name prefix
submit_to_gemini = True              # True = upload + batch submit
# =============

def find_images(folder, exts):
    p = pathlib.Path(folder)
    return [str(f) for f in sorted(p.iterdir()) if f.suffix.lower() in exts and f.is_file()]

def base64_encode_file(path):
    with open(path, "rb") as f:
        return base64.b64encode(f.read()).decode("ascii")

def mime_from_suffix(suffix):
    suf = suffix.lower().lstrip(".")
    return {
        "jpg": "image/jpeg", "jpeg": "image/jpeg",
        "png": "image/png", "webp": "image/webp", "gif": "image/gif"
    }.get(suf, "application/octet-stream")

def build_gemini_jsonl_from_folder(images_path, out_prefix, system_text):
    images = find_images(images_path, image_extensions)
    if not images:
        print("No images found in", images_path)
        return []
    jsonl_paths = []
    N = 40  # images per JSONL file
    for idx, img in enumerate(images):
        out_index = idx // N
        out_path = f"{out_prefix}{out_index}.jsonl"
        if not pathlib.Path(out_path).exists():
            open(out_path, "w").close()
        mime = mime_from_suffix(pathlib.Path(img).suffix)
        b64 = base64_encode_file(img)
        key = pathlib.Path(img).name
        req = {
            "key": key,
            "request": {
                "contents": [
                    {
                        "role": "user",
                        "parts": [
                            {"text": "Use the provided system instructions to annotate this image."},
                            {"inline_data": {"mime_type": mime, "data": b64}}
                        ]
                    }
                ],
                "generation_config": {"temperature": 0},
                "config": {
                    "system_instruction": {"parts": [{"text": system_text}]}
                }
            }
        }
        with open(out_path, "a") as fout:
            fout.write(json.dumps(req) + "\n")
        if out_path not in jsonl_paths:
            jsonl_paths.append(out_path)
    print("Created Gemini JSONL files:", jsonl_paths)
    return jsonl_paths

gemini_jsonls = build_gemini_jsonl_from_folder(images_path, output_prefix, system_instructions)

# Optionally upload + create batch jobs
if submit_to_gemini and gemini_jsonls:
    api_key = os.getenv("GEMINI_API_KEY") or os.getenv("GOOGLE_API_KEY")
    if not api_key:
        print("GEMINI_API_KEY/GOOGLE_API_KEY not set — skipping upload/submission.")
    else:
        client = genai.Client()
        jobs = {}
        for jsonl in gemini_jsonls:
            uploaded = client.files.upload(
                file=f"./{jsonl}",
                config=types.UploadFileConfig(display_name=pathlib.Path(jsonl).name, mime_type="jsonl")
            )
            for model in ("gemini-2.5-flash", "gemini-2.5-pro"):
                job = client.batches.create(
                    model=model,
                    src=uploaded.name,
                    config={"display_name": f"{pathlib.Path(jsonl).name} -> {model}"}
                )
                jobs.setdefault(model, []).append(job.name)
                print(f"[{model}] created batch job: {job.name}")

        print("Submitted jobs (per model):")
        for m, lst in jobs.items():
            print(m, len(lst))


Created Gemini JSONL files: ['gemini_batch_0.jsonl', 'gemini_batch_1.jsonl', 'gemini_batch_2.jsonl', 'gemini_batch_3.jsonl']


KeyError: 'file'

# Claude API

In [9]:
# === CLAUDE BATCH BUILDER (25 IMAGES PER SUBMISSION) ===
# Requirements: anthropic package, requests, ANTHROPIC_API_KEY
# %pip -q install anthropic requests

import os, json, time, base64, requests
from pathlib import Path
import anthropic

# === CONFIG ===
images_path = "images/Garden Grove"   # <-- set this
image_extensions = {'.jpg', '.jpeg', '.png', '.webp', '.gif'}
output_prefix = "claude_batch_"       # JSONL files will be written as claude_batch_0.jsonl ...
MAX_REQS_PER_BATCH = 25               # <-- limit by number of records
submit_to_claude = False              # True = upload and poll results
# =================

def find_images(folder, exts):
    p = Path(folder)
    return [str(f) for f in sorted(p.iterdir()) if f.suffix.lower() in exts and f.is_file()]

def base64_encode_file(path):
    with open(path, "rb") as f:
        return base64.b64encode(f.read()).decode("ascii")

def mime_from_suffix(suffix):
    suf = suffix.lower().lstrip(".")
    return {
        "jpg": "image/jpeg", "jpeg": "image/jpeg",
        "png": "image/png", "webp": "image/webp", "gif": "image/gif"
    }.get(suf, "application/octet-stream")

def build_claude_requests_from_folder(images_path, system_text):
    imgs = find_images(images_path, image_extensions)
    if not imgs:
        print("No images found:", images_path)
        return []
    requests_list = []
    for i, img in enumerate(imgs, start=1):
        mime = mime_from_suffix(Path(img).suffix)
        b64 = base64_encode_file(img)
        custom_id = Path(img).name
        params = {
            "model": "claude-sonnet-4-5",
            "system": system_text,
            "messages": [
                {
                    "role": "user",
                    "content": [
                        {"type": "text", "text": "Use the provided system instructions to annotate this image."},
                        {"type": "image", "source": {"type": "base64", "media_type": mime, "data": b64}}
                    ]
                }
            ],
            "max_tokens": 2048
        }
        requests_list.append({"custom_id": custom_id, "params": params})
    print(f"Prepared {len(requests_list)} Claude requests from folder")
    return requests_list

def chunk_requests(requests_list, max_items=MAX_REQS_PER_BATCH):
    """Chunk purely by number of items (no size consideration)."""
    return [requests_list[i:i + max_items] for i in range(0, len(requests_list), max_items)]

def write_jsonl_chunks(chunks, prefix):
    out_files = []
    for i, c in enumerate(chunks, start=1):
        p = f"{prefix}{i-1}.jsonl"
        with open(p, "w") as f:
            for item in c:
                f.write(json.dumps(item) + "\n")
        out_files.append(p)
    print("Wrote Claude JSONL chunk files:", out_files)
    return out_files

def submit_claude_chunks_and_wait(chunks, model_alias="claude-sonnet-4-5"):
    api_key = os.getenv("ANTHROPIC_API_KEY")
    if not api_key:
        print("ANTHROPIC_API_KEY not set — skipping submission.")
        return []
    client = anthropic.Anthropic(api_key=api_key)
    results = []
    for i, chunk in enumerate(chunks, start=1):
        batch = client.messages.batches.create(requests=chunk)
        print(f"Submitted chunk {i}: {batch.id}")
        while True:
            b = client.messages.batches.retrieve(batch.id)
            status = getattr(b, "processing_status", None)
            print(f"  chunk {i} status: {status}")
            if status == "ended" and getattr(b, "results_url", None):
                break
            if status in ("failed", "cancelled", "expired"):
                print("Chunk failed/ended with status:", status)
                break
            time.sleep(5)
        results_url = getattr(b, "results_url", None)
        if results_url:
            headers = {"x-api-key": api_key, "anthropic-version": "2023-06-01"}
            out_path = f"{model_alias}_results_part{i:03d}.jsonl"
            with requests.get(results_url, headers=headers, stream=True) as r:
                r.raise_for_status()
                with open(out_path, "wb") as f:
                    for chunk_bytes in r.iter_content(chunk_size=8192):
                        f.write(chunk_bytes)
            print("Saved results to", out_path)
            results.append(out_path)
    return results

# Run
reqs = build_claude_requests_from_folder(images_path, system_instructions)
if reqs:
    chunks = chunk_requests(reqs)
    print(f"Chunked into {len(chunks)} POST(s) ({MAX_REQS_PER_BATCH} per batch).")
    jsonl_files = write_jsonl_chunks(chunks, output_prefix)
    if submit_to_claude:
        results = submit_claude_chunks_and_wait(chunks)
        print("Result files downloaded:", results)


Prepared 158 Claude requests from folder
Chunked into 7 POST(s) (25 per batch).
Wrote Claude JSONL chunk files: ['claude_batch_0.jsonl', 'claude_batch_1.jsonl', 'claude_batch_2.jsonl', 'claude_batch_3.jsonl', 'claude_batch_4.jsonl', 'claude_batch_5.jsonl', 'claude_batch_6.jsonl']
