# Pipeline for submitting images to GPT to query whether they contain a high-quality view of Monarda flowers

In [1]:
import pandas as pd
import numpy as np
from openai import OpenAI
import openai
import time
import base64
from PIL import Image
from io import BytesIO
import json
import os
import csv

# load gpt client
client = OpenAI(
    api_key='your-key-here' # fill in api key here
)

### Image processing function

In [2]:
# function to process an image from a local file
def process_image(idx, file_path):
    retries = 3
    for attempt in range(retries):
        try:
            # open image from disk
            image = Image.open(file_path)
            original_width, original_height = image.size

            # set new dimensions but keep ratio
            if original_width >= original_height:
                new_width = 512
                new_height = int((new_width / original_width) * original_height)
            else:
                new_height = 512
                new_width = int((new_height / original_height) * original_width)

            # resize
            image = image.resize((new_width, new_height), Image.Resampling.LANCZOS)

            # save resized image into a jpeg bytes buffer
            buffered = BytesIO()
            image.save(buffered, format="JPEG")
            resized_image_data = base64.b64encode(buffered.getvalue()).decode("utf-8")

            # build query for gpt api
            return {
                "custom_id": f"task-{idx}",
                "method": "POST",
                "url": "/v1/chat/completions",
                "body": {
                    "model": "gpt-4o",
                    "messages": [
                        {
                            "role": "user",
                            "content": [
                                {"type": "text", "text": query},
                                {"type": "image_url", "image_url": {"url": f"data:image/jpeg;base64,{resized_image_data}"}}
                            ],
                        }
                    ],
                    "max_tokens": 300
                }
            }
        except Exception as e:
            print(f"Error processing image {file_path} at index {idx}, attempt {attempt + 1}: {e}")
            if attempt < retries - 1:
                time.sleep(1)  # wait a sec before retry
            else:
                return None

### Set up query

In [3]:
# specify the query
query = ("Answer with one word, either: YES, if this photo is of close-up \
beebalm flowers (usually lavender in color), or NO, if the photo is not close up, not of a flower, \
or low quality somehow. If it is just a bare seed head or leaves, your answer should be NO.")

Maximum json file size is 200MB, and maximum query number for one batch submission is 50,000

# Loop method to perform async in parallel:

In [37]:
# overall settings
start_overall = 20000 # use these to specify which idxs to run
stop_overall = 41000
step = 1000

# file name to store meta
job_metadata_file = "job_metadata.json"

# dict to store job meta, keys= job id
jobs = {}

# batch submit loop: submit all batches in async
for starter_val in range(start_overall, stop_overall, step):
    # make a list of file paths to images for curr batch
    file_names = [f"{i}.jpg" for i in range(starter_val, starter_val + step)]
    file_paths = [os.path.join('/Volumes/My Passport/monarda_fistulosa_segmentation/image_dataset/images/', fname)
                  for fname in file_names]

    # write out local batch submission file( unique file name per batch)
    batch_task_filename = f"batch_tasks_{starter_val}_{starter_val+step}.jsonl"
    with open(batch_task_filename, 'w') as file:
        for idx in range(len(file_paths)):
            task = process_image(idx, file_paths[idx])
            if task:
                file.write(json.dumps(task) + '\n')
    
    # submit the batch file to api
    batch_file = client.files.create(
        file=open(batch_task_filename, "rb"),
        purpose="batch"
    )
    batch_job = client.batches.create(
        input_file_id=batch_file.id,
        endpoint="/v1/chat/completions",
        completion_window="24h"
    )
    print(f"Batch job submitted for images {starter_val}:{starter_val+step} with job ID {batch_job.id}")
    
    # save metadata for later polling/processing
    jobs[batch_job.id] = {
        "starter_val": starter_val,
        "step": step,
        "batch_task_filename": batch_task_filename
    }
    
    # might as well space them out a bit
    time.sleep(1)
    
# write job meta to a file for persistence,
# ie can reload later
with open(job_metadata_file, 'w') as f:
    json.dump(jobs, f, indent=4)
print(f"Job metadata saved to {job_metadata_file}.")

Error processing image /Volumes/My Passport/monarda_fistulosa_segmentation/image_dataset/images/20130.jpg at index 130, attempt 1: cannot write mode RGBA as JPEG
Error processing image /Volumes/My Passport/monarda_fistulosa_segmentation/image_dataset/images/20130.jpg at index 130, attempt 2: cannot write mode RGBA as JPEG
Error processing image /Volumes/My Passport/monarda_fistulosa_segmentation/image_dataset/images/20130.jpg at index 130, attempt 3: cannot write mode RGBA as JPEG
Batch job submitted for images 20000:21000 with job ID batch_67b7363248b08190b38721ade7062af7
Batch job submitted for images 21000:22000 with job ID batch_67b73693caec8190b2aa49cc717197cc
Batch job submitted for images 22000:23000 with job ID batch_67b736f4a1388190ade998575955d512
Batch job submitted for images 23000:24000 with job ID batch_67b73752753881909cd8da24ade97e0f
Error processing image /Volumes/My Passport/monarda_fistulosa_segmentation/image_dataset/images/24783.jpg at index 783, attempt 1: cannot 

In [39]:
# reload job metadata for polling status
with open(job_metadata_file, 'r') as f:
    jobs = json.load(f)
print("Job metadata reloaded.")

Job metadata reloaded.


In [40]:
# polling... check all submitted jobs until all are complete (or failed)
completed_jobs = {}
while len(completed_jobs) < len(jobs):
    for job_id in jobs:
        if job_id in completed_jobs:
            continue  # skip jobs that are done
        job = client.batches.retrieve(job_id)
        meta = jobs[job_id]
        if job.failed_at:
            print(f"Job {job_id} (images {meta['starter_val']}:{meta['starter_val']+meta['step']}) failed!")
            completed_jobs[job_id] = job  # record as finished (failed)
        elif job.completed_at:
            print(f"Job {job_id} (images {meta['starter_val']}:{meta['starter_val']+meta['step']}) completed.")
            completed_jobs[job_id] = job
        # other= job is still running
    # wait and then poll again
    time.sleep(30)

print("All jobs are now complete. Processing results...")

Job batch_67b7363248b08190b38721ade7062af7 (images 20000:21000) completed.
Job batch_67b73693caec8190b2aa49cc717197cc (images 21000:22000) completed.
Job batch_67b736f4a1388190ade998575955d512 (images 22000:23000) completed.
Job batch_67b73752753881909cd8da24ade97e0f (images 23000:24000) completed.
Job batch_67b737b1a0888190ab5994c489be36b5 (images 24000:25000) completed.
Job batch_67b7380f40788190a47a269e3983128b (images 25000:26000) completed.
Job batch_67b73871df408190b6d607cd77621322 (images 26000:27000) completed.
Job batch_67b738d7d60081908897af172d81d523 (images 27000:28000) completed.
Job batch_67b73937e9e08190aa7b52b6c0647ef3 (images 28000:29000) completed.
Job batch_67b7399603b88190a40d440a69943754 (images 29000:30000) completed.
Job batch_67b739f008a88190998053547948ef1f (images 30000:31000) completed.
Job batch_67b73a4a4a6c8190b7e3a714cf9a1569 (images 31000:32000) completed.
Job batch_67b73aabd6bc81908cb016812623bbf2 (images 32000:33000) completed.
Job batch_67b73b0b9cac819

In [41]:
# process results for each finished job
# for each job........
for job_id, job in completed_jobs.items():
    meta = jobs[job_id]
    starter_val = meta["starter_val"]
    step = meta["step"]
    
    # get the output file id and contents
    result_file_id = job.output_file_id
    result_content = client.files.content(result_file_id).content

    # write raw results to a jsonl file
    result_file_name = f"./{starter_val}_to_{starter_val+step}_batch.jsonl"
    with open(result_file_name, 'wb') as file:
        file.write(result_content)
    
    # parse the jsonl file into results
    results = []
    with open(result_file_name, 'r') as file:
        for line in file:
            json_object = json.loads(line.strip())
            results.append(json_object)
    
    # extract indices corresponding answers from the results
    idxs = [int(item['custom_id'].split('-')[1]) for item in results]
    answers = [item['response']['body']['choices'][0]['message']['content'] for item in results]
    
    # sort em
    sorted_idx_answers = sorted(zip(idxs, answers))
    if sorted_idx_answers:
        sorted_idxs, sorted_answers = zip(*sorted_idx_answers)
    else:
        sorted_idxs, sorted_answers = ([], [])
    
    # get a a full list of answers for this batch (mark missing idxs "FAILED")
    complete_answers = []
    for i in range(step):
        if i in sorted_idxs:
            answer = sorted_answers[sorted_idxs.index(i)]
            complete_answers.append(answer)
        else:
            complete_answers.append("FAILED")
    
    # write results to a csv file
    output_csv = f"./{starter_val}_to_{starter_val+step}.csv"
    with open(output_csv, mode='w', newline='') as file:
        writer = csv.writer(file)
        writer.writerow(["flower_present"])  # Optional header row
        for ans in complete_answers:
            writer.writerow([ans])
    
    print(f"Results for images {starter_val}:{starter_val+step} saved to {output_csv}")

print("All batches have been processed.")

Results for images 20000:21000 saved to ./20000_to_21000.csv
Results for images 21000:22000 saved to ./21000_to_22000.csv
Results for images 22000:23000 saved to ./22000_to_23000.csv
Results for images 23000:24000 saved to ./23000_to_24000.csv
Results for images 24000:25000 saved to ./24000_to_25000.csv
Results for images 25000:26000 saved to ./25000_to_26000.csv
Results for images 26000:27000 saved to ./26000_to_27000.csv
Results for images 27000:28000 saved to ./27000_to_28000.csv
Results for images 28000:29000 saved to ./28000_to_29000.csv
Results for images 29000:30000 saved to ./29000_to_30000.csv
Results for images 30000:31000 saved to ./30000_to_31000.csv
Results for images 31000:32000 saved to ./31000_to_32000.csv
Results for images 32000:33000 saved to ./32000_to_33000.csv
Results for images 33000:34000 saved to ./33000_to_34000.csv
Results for images 34000:35000 saved to ./34000_to_35000.csv
Results for images 35000:36000 saved to ./35000_to_36000.csv
Results for images 36000

# Finish up the last few.

In [50]:
start_overall = 41000
stop_overall = 41069
# make a list of file paths to images
file_names = [str(i)+'.jpg' for i in range(start_overall,stop_overall)]
file_paths = [os.path.join('/Volumes/My Passport/monarda_fistulosa_segmentation/image_dataset/images/',i) for i in file_names]

# write out local batch submission file
start = 0
stop = len(file_paths)  # ensure we don't exceed the number of images
file_name = f"batch_tasks_loop.jsonl"

with open(file_name, 'w') as file:
    for idx in range(start, stop):
        task = process_image(idx, file_paths[idx])
        if task:
            file.write(json.dumps(task) + '\n')

In [51]:
# submit the batch file to api
batch_file = client.files.create(
    file=open(file_name, "rb"),
    purpose="batch"
)

batch_job = client.batches.create(
    input_file_id=batch_file.id,
    endpoint="/v1/chat/completions",
    completion_window="24h"
)

In [53]:
# check for results
batch_job = client.batches.retrieve(batch_job.id)

while not client.batches.retrieve(batch_job.id).completed_at:
    time.sleep(60)
    if client.batches.retrieve(batch_job.id).failed_at:
        print('job failed!')
        break
    time.sleep(5)

batch_job = client.batches.retrieve(batch_job.id)

# get results
result_file_id = batch_job.output_file_id
result = client.files.content(result_file_id).content

# write results to file
result_file_name = f"./{start_overall}_to_{stop_overall}_batch.jsonl"

with open(result_file_name, 'wb') as file:
    file.write(result)

# load data from file
results = []
with open(result_file_name, 'r') as file:
    for line in file:
        # parse the json to a dict and append to results list
        json_object = json.loads(line.strip())
        results.append(json_object)
idxs = [int(i['custom_id'].split('-')[1]) for i in results]
answers = [i['response']['body']['choices'][0]['message']['content'] for i in results]

#handle failed queries and sort
# first zip and sort
sorted_idx_answers = sorted(zip(idxs, answers))

# separate sorted pairs
sorted_idxs, sorted_answers = zip(*sorted_idx_answers)

# init the full list of answers
complete_answers = []

# iter through the expected range of idxs
for i in range(start,stop):
    if i in sorted_idxs:
        # get corresponding answer
        answer = sorted_answers[sorted_idxs.index(i)]
        complete_answers.append(answer)
    else:
        # fill in missing idxs = "FAILED"
        complete_answers.append("FAILED")

# write to csv
# define output csv name
output_file = f"./{start_overall}_to_{stop_overall}.csv"

# write list to single column csv file
with open(output_file, mode='w', newline='') as file:
    writer = csv.writer(file)
    writer.writerow(["flower_present"])
    for item in complete_answers:
        writer.writerow([item])