## OpenAI batch POC/Setup

### OpenAI Files and Batch Playground

In [1]:
# fetch files from open ai

import openai
import json
import os
from datetime import datetime
from dotenv import load_dotenv

load_dotenv(override=True, verbose=True)

OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")

In [5]:
client = openai.Client(api_key=OPENAI_API_KEY)

# read the latest upload
files = client.files.list()
latest_file = max(files, key=lambda file: file.created_at)
latest_file_id = latest_file.id
print(f'ID: {latest_file_id}')
print(f'Filename: {latest_file.filename}')
created_at = datetime.fromtimestamp(latest_file.created_at).strftime('%Y-%m-%d %H:%M:%S')
print(f'Created At: {created_at}')
print(f'Bytes: {str(latest_file.bytes)}')
print(f'URL: {latest_file}')
print('')

# get first batch for the latest upload file
batches = client.batches.list()
batch_id = None
for batch in batches:
    if batch.input_file_id == latest_file_id:
        # get id, status, created at, file id
        print(f'ID: {batch.id}')
        print(f'Status: {batch.status}')
        created_at = datetime.fromtimestamp(batch.created_at).strftime('%Y-%m-%d %H:%M:%S')
        print(f'Created At: {created_at}')
        print(f'File ID: {batch.input_file_id}')
        print('')
        batch_id = batch.id
        break

if batch_id:
    # check batch results
    batch = client.batches.retrieve(batch_id)
    # read status and output file id
    print(f'ID: {batch.id}')
    print(f'Status: {batch.status}')
    print(f'Output File ID: {batch.output_file_id}')
    print('')
    # read output file content
    if batch.output_file_id:
        output_file = client.files.content(batch.output_file_id)
        for line in output_file.iter_lines():
            print(json.loads(line)['response']['body'])

            break

ID: file-YcBdfvWdhZasZB1Bq45sqF
Filename: batch.jsonl
Created At: 2024-12-02 19:27:20
Bytes: 562
URL: FileObject(id='file-YcBdfvWdhZasZB1Bq45sqF', bytes=562, created_at=1733167640, filename='batch.jsonl', object='file', purpose='batch', status='processed', status_details=None)

ID: batch_674e0a1daee08190ac1e13b30379b84b
Status: in_progress
Created At: 2024-12-02 19:27:25
File ID: file-YcBdfvWdhZasZB1Bq45sqF

ID: batch_674e0a1daee08190ac1e13b30379b84b
Status: in_progress
Output File ID: None



### Async Batch

In [None]:
import json
from nest_asyncio import apply
import asyncio
import aiohttp
from aiohttp import ClientSession
from io import IOBase


def data_file():
    return iter(['this is a random data string which is going to be written to a file', 'this is a second line of data', 'this is a third line of data', 'this is a fourth line of data', 'this is a fifth line of data', 'this is a sixth line of data', 'this is a seventh line of data', 'this is a eighth line of data', 'this is a ninth line of data', 'this is a tenth line of data', 'this is an eleventh line of data', 'this is a twelfth line of data', 'this is a thirteenth line of data', 'this is a fourteenth line of data', 'this is a fifteenth line of data', 'this is a sixteenth line of data', 'this is a seventeenth line of data', 'this is a eighteenth line of data', 'this is a nineteenth line of data', 'this is a twentieth line of data', 'this is a twenty-first line of data', 'this is a twenty-second line of data', 'this is a twenty-third line of data', 'this is a twenty-fourth line of data', 'this is a twenty-fifth line of data', 'this is a twenty-sixth line of data', 'this is a twenty-seventh line of data', 'this is a twenty-eighth line of data', 'this is a twenty-ninth line of data', 'this is a thirtieth line of data', 'this is a thirty-first line of data', 'this is a thirty-second line of data', 'this is a thirty-third line of data', 'this is a thirty-fourth line of data', 'this is a thirty-fifth line of data', 'this is a thirty-sixth line of data', 'this is a thirty-seventh line of data', 'this is a thirty-eighth line of data', 'this is a thirty-ninth line of data', 'this is a fortieth line of data', 'this is a forty-first line of data', 'this is a forty-second line of data', 'this is a forty-third line of data', 'this is a forty-fourth line of data', 'this is a forty-fifth line of data', 'this is a forty-sixth line of data', 'this is a forty-seventh line of data', 'this is a forty-eighth line of data', 'this is a forty-ninth line of data', 'this is a fiftieth line of data'])


# to run asyncio in jupyter notebook
apply()
running = True


class AsyncFileLikeObject(IOBase):
    def __init__(self, iterable):
        self.chunk = b""
        self.offset = 0
        self.i = iterable

    async def load(self, size):

        while size:
            if self.offset == len(self.chunk):
                try:
                    self.chunk = await anext(self.i)
                except StopAsyncIteration:
                    break
                else:
                    self.offset = 0
            to_yield = min(size, len(self.chunk) - self.offset)
            self.offset += to_yield
            size -= to_yield
            part = self.chunk[self.offset - to_yield:self.offset]
            yield part

    async def read(self, size=-1):
        print('Reading file...')
        chunks = []
        async for part in self.load(float("inf") if size is None or size < 0 else size):
            chunks.append(part)
        yield b"".join(chunks)


async def create_req_object(queries, queue):
    for idx, query in enumerate(queries):
        request = {
            "custom_id": f"request-{idx}",
            "method": "POST",
            "url": "/v1/chat/completions",
            "body": {
                "content": query[0]
            },
        }
        print('Creating request object...')
        await queue.put((idx, query))
        yield (json.dumps(request) + '\n').encode()


async def upload_to_openai(file_like_obj, queue):
    url = 'https://api.openai.com/v1/files'
    headers = {
        'Authorization': 'Bearer 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'
    }

    async with ClientSession() as session:
        data = aiohttp.FormData()
        data.add_field('purpose', 'batch')
        # aggregate data from file_like_obj here - asyncgenerator for logs
        data.add_field('file', file_like_obj, filename='batch.jsonl')
        async with session.post(url, headers=headers, data=data) as response:
            await queue.put(str(response))
            running = False
            return await response.json()


async def flush_queue(queue):
    while running:
        item = await queue.get()
        print(f"Flushed 1 items from the queue")
        print(item)


async def create_batch(data, queue):
    objects = create_req_object(data, queue)
    file = AsyncFileLikeObject(objects)
    file_data = file.read()
    response = await upload_to_openai(file_data, queue)
    return response


print('Starting execution...')
queue = asyncio.Queue(10)
data = data_file()

# coroutine will yield query + idx to the
asyncio.create_task(flush_queue(queue))

await create_batch(data, queue)  # type: ignore

Task was destroyed but it is pending!
task: <Task pending name='Task-21' coro=<flush_queue() running at /tmp/ipykernel_2218574/1030680135.py:83> wait_for=<Future pending cb=[Task.__wakeup()]>>


Starting execution...
Reading file...
Creating request object...
Creating request object...
Creating request object...
Creating request object...
Creating request object...
Creating request object...
Creating request object...
Creating request object...
Creating request object...
Creating request object...
Creating request object...
Flushed 1 items from the queue
[(0, 'this is a random data string which is going to be written to a file')]
Flushed 1 items from the queue
[(1, 'this is a second line of data')]
Flushed 1 items from the queue
[(2, 'this is a third line of data')]
Flushed 1 items from the queue
[(3, 'this is a fourth line of data')]
Flushed 1 items from the queue
[(4, 'this is a fifth line of data')]
Flushed 1 items from the queue
[(5, 'this is a sixth line of data')]
Flushed 1 items from the queue
[(6, 'this is a seventh line of data')]
Flushed 1 items from the queue
[(7, 'this is a eighth line of data')]
Flushed 1 items from the queue
[(8, 'this is a ninth line of data')]


{'object': 'file',
 'id': 'file-rr1qiD3l8kv4XRrwTAcchiM2',
 'purpose': 'batch',
 'filename': 'batch.jsonl',
 'bytes': 5140,
 'created_at': 1732205787,
 'status': 'processed',
 'status_details': None}