## Configure Azure Key Vault and OpenAI Credentials

Retrieve the OpenAI API key from Azure Key Vault and expose it to the rest of the notebook as an environment variable.
This keeps the credential out of the notebook source.
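When the notebook is re-run often, or runs somewhere the key is already injected, the vault lookup can be skipped if the variable is already set. Below is a minimal sketch. `resolve_secret` is a hypothetical helper, and `fetch` is any zero-argument callable that returns the secret, for example a wrapper around `SecretClient.get_secret`.

```python
import os
from typing import Callable


def resolve_secret(env_var: str, fetch: Callable[[], str]) -> str:
    """Return the secret stored in `env_var`; otherwise fetch it once and cache it there.

    `fetch` stands in for the Key Vault lookup, e.g.
    lambda: secret_client.get_secret("alon-thesis-openai-key").value
    """
    value = os.environ.get(env_var)
    if value:
        return value  # already set (by an earlier cell or the environment): no vault call
    value = fetch()
    os.environ[env_var] = value  # later cells and the OpenAI client read it from here
    return value
```

With this pattern, Key Vault is only contacted the first time the variable is missing in the current process.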

In [16]:
from azure.identity import DefaultAzureCredential
from azure.keyvault.secrets import SecretClient
import os

def get_openai_key():
    """Retrieve OpenAI API key from Azure Key Vault"""
    try:
        # Initialize the Azure credentials
        credential = DefaultAzureCredential()
        
        # Create a secret client for the thesis Key Vault
        vault_url = "https://kvrunithesis.vault.azure.net/"
        secret_client = SecretClient(vault_url=vault_url, credential=credential)
        
        # Get the secret
        secret = secret_client.get_secret("alon-thesis-openai-key")
        
        # Expose the key under both names read later in the notebook
        os.environ["OPENAI_API_KEY"] = secret.value
        os.environ["THESIS_ALON_OPENAI_API_KEY"] = secret.value
        
        print("Successfully retrieved OpenAI API key from Azure Key Vault")
    except Exception as e:
        print(f"Error retrieving secret from Key Vault: {str(e)}")
        raise

# Retrieve and set the OpenAI API key
get_openai_key()

# Now you can initialize the OpenAI client which will automatically use the environment variable

INFO:azure.identity._credentials.environment:No environment configuration found.
INFO:azure.identity._credentials.managed_identity:ManagedIdentityCredential will use IMDS
INFO:azure.core.pipeline.policies.http_logging_policy:Request URL: 'http://169.254.169.254/metadata/identity/oauth2/token?api-version=REDACTED&resource=REDACTED'
Request method: 'GET'
Request headers:
    'User-Agent': 'azsdk-python-identity/1.20.0 Python/3.11.8 (Windows-10-10.0.22631-SP0)'
No body was attached to the request
INFO:azure.identity._credentials.chained:DefaultAzureCredential acquired a token from AzureCliCredential
INFO:azure.core.pipeline.policies.http_logging_policy:Request URL: 'https://kvrunithesis.vault.azure.net/secrets/alon-thesis-openai-key/?api-version=REDACTED'
Request method: 'GET'
Request headers:
    'Accept': 'application/json'
    'x-ms-client-request-id': '385a5536-f1a5-11ef-b41c-a0806955d9fc'
    'User-Agent': 'azsdk-python-keyvault-secrets/4.9.0 Python/3.11.8 (Windows-10-10.0.22631-SP0)

Successfully retrieved OpenAI API key from Azure Key Vault
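The INFO lines above are request dumps from the Azure SDK's HTTP logging policy, emitted because the root logger is at INFO. Azure SDK loggers live under the `azure` namespace (`azure.identity...`, `azure.core.pipeline.policies.http_logging_policy`), so a sketch that relies only on the standard `logging` hierarchy can hide them while keeping warnings and errors:

```python
import logging

# Child loggers such as "azure.identity" or "azure.core.pipeline.policies.http_logging_policy"
# inherit this level unless they set their own, so their INFO output is suppressed.
logging.getLogger("azure").setLevel(logging.WARNING)
```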


In [4]:
import pandas as pd

# Load the test data
csv_file = r'C:\Users\orgrd\workspace\data\patentmatch_test\patentmatch_test_no_claims.csv'
df = pd.read_csv(csv_file)

In [5]:
df.head()

| Unnamed: 0.1 | Unnamed: 0 | index | claim_id | patent_application_id | cited_document_id | text | text_b | label | date |
|---|---|---|---|---|---|---|---|---|---|
| 0 | 5113165 | 5113165 | 111187_0 | EP3157302A1 | EP2903333 | A network of handling a paging procedure in a ... | FIG.16 is a diagram illustrating an example of... | 0 | 20170419 |
| 1 | 5658863 | 5658863 | 209068_1 | EP3202314A1 | EP2229880 | A sensor information processing program for ca... | In a first step the fundamental movement frequ... | 1 | 20170809 |
| 2 | 5584990 | 5584990 | 171472_0 | EP3196007A1 | EP2939828 | A moulded trim part for a vehicle according to... | It was found that the thermoplastic polyuretha... | 0 | 20170726 |
| 3 | 5137320 | 5137320 | 87572_0 | EP3160147A1 | EP1670252 | A method for fast channel change characterized... | As to the issue of delivery modes the strategy... | 0 | 20170426 |
| 4 | 5800528 | 5800528 | 204115_0 | EP3217403A1 | EP1855216 | An audio asset information storage system comp... | Further it is assumed in the above circumstanc... | 0 | 20170913 |


## Prepare JSONL Files for OpenAI Processing

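This section prepares the data for batch processing with OpenAI's Batch API. Here's what we're doing:

1. **Setup**: Import the required libraries, configure logging, and connect to MongoDB
2. **Data Model**: Define a Pydantic model `NegationResponse` describing the expected answer; its JSON schema is sent as the request's `response_format`
3. **Batch Processing**: 
   - Split the data into batches of 1,000 rows each
   - Turn each row into one request line in the Batch API input format
   - Each line contains:
     - A `custom_id` for matching results back to rows
     - The HTTP method and API endpoint (`/v1/chat/completions`)
     - The request body with messages and response format
4. **Output**: Save each batch as a separate JSONL file in the `output_jsonl` directory, store a copy in the MongoDB collection `jsonl_batches`, and record the file paths in `openai_files_disk_dump.json`

The Batch API takes its input as a JSONL file with one request per line.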
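The request format and batching described above can be sketched with the standard library alone. The helper names `make_request`, `chunk`, and `to_jsonl` are hypothetical; the duplicate check reflects the Batch API requirement that every `custom_id` be unique within an input file. The `response_format` field is left out to keep the envelope readable.

```python
import json
from typing import Dict, Iterator, List, Sequence


def make_request(custom_id: str, text: str, model: str) -> Dict:
    """Build one Batch API input line: custom_id, method, url and the request body."""
    return {
        "custom_id": custom_id,  # echoed back in the results, so it must identify the row
        "method": "POST",
        "url": "/v1/chat/completions",
        "body": {
            "model": model,
            "messages": [
                {"role": "system", "content": "Analyze the text for negations and identify their types."},
                {"role": "user", "content": f"Analyze the following text: {text}"},
            ],
            "max_tokens": 500,
        },
    }


def chunk(items: Sequence, size: int) -> Iterator[Sequence]:
    """Yield consecutive slices of at most `size` items, with no empty trailing slice."""
    for start in range(0, len(items), size):
        yield items[start:start + size]


def to_jsonl(requests: List[Dict]) -> str:
    """Serialize one batch as JSONL (one JSON object per line), rejecting duplicate custom_ids."""
    ids = [r["custom_id"] for r in requests]
    if len(ids) != len(set(ids)):
        raise ValueError("custom_id values must be unique within a batch file")
    return "".join(json.dumps(r, ensure_ascii=False) + "\n" for r in requests)


# Usage with three rows from df.head(): (patent_application_id, index, truncated text)
rows = [
    ("EP3157302A1", 5113165, "A network of handling a paging procedure in a ..."),
    ("EP3202314A1", 5658863, "A sensor information processing program for ca..."),
    ("EP3196007A1", 5584990, "A moulded trim part for a vehicle according to..."),
]
requests = [make_request(f"request_text_{app}_{idx}", text, model="gpt-4-turbo-preview")
            for app, idx, text in rows]
batches = [to_jsonl(list(b)) for b in chunk(requests, 2)]  # size 2 instead of 1000 to keep it small
```

Slicing by `range(0, len(items), size)` never produces an empty trailing batch, which is the same concern as computing `num_batches` with ceiling division.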

In [13]:
import json
import logging
from pathlib import Path
from typing import List, Optional
from pydantic import BaseModel
from pymongo import MongoClient  # PyMongo is used here instead of Redis

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Define the NegationResponse model
class NegationResponse(BaseModel):
    negation_present: bool
    negation_types: Optional[List[str]]
    short_explanation: str

# Create output directory
output_dir = Path('output_jsonl')
output_dir.mkdir(exist_ok=True)

# Connect to MongoDB. The URI carries the username, password, and authentication
# database ("admin"); the batches themselves are written to test_negationdb.
client = MongoClient("mongodb://user:pass@localhost:27017/admin")
db = client.test_negationdb
collection = db.jsonl_batches  # Use a collection named "jsonl_batches"

# Process in batches of 1,000 rows
batch_size = 1000
# Ceiling division avoids an empty trailing batch when len(df) is an exact multiple of batch_size
num_batches = (len(df) + batch_size - 1) // batch_size

def create_jsonl_line(row, column):
    text = row[column]
    messages = [
        {"role": "system", "content": "Analyze the text for negations and identify their types."},
        {"role": "user", "content": f"Analyze the following text: {text}"}
    ]
    
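    body = {
        # Note: "json_schema" response formats (Structured Outputs) need a model that supports them,
        # such as the gpt-4o family; gpt-4-turbo-preview only accepts {"type": "json_object"}.
        "model": "gpt-4-turbo-preview",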
        "messages": messages,
        "response_format": {
            "type": "json_schema",
            "json_schema": {
                "name": "negation_response",
                "schema": NegationResponse.model_json_schema()
            }
        },
        "max_tokens": 500
    }
    
    return {
        "custom_id": f'request_{column}_{row["patent_application_id"]}_{row["index"]}',
        "method": "POST",
        "url": "/v1/chat/completions",
        "body": body
    }

# Process batches and store each batch both on disk and in MongoDB
openai_files_disk_dump = []
for i in range(num_batches):
    start_idx = i * batch_size
    end_idx = min((i + 1) * batch_size, len(df))
    batch_df = df.iloc[start_idx:end_idx]
    
    logger.info(f"Processing batch {i + 1}/{num_batches}")
    lines = batch_df.apply(create_jsonl_line, axis=1, args=('text',))
    
    # Write batch to disk as a JSONL file
    batch_file_path = output_dir / f"batch_{i}.jsonl"
    with open(batch_file_path, "w", encoding='utf-8') as f:
        for line in lines:
            f.write(json.dumps(line, ensure_ascii=False) + "\n")
    
    # Prepare data for MongoDB storage
    batch_data = [json.loads(json.dumps(line)) for line in lines]  # ensure serializable
    batch_document = {
        "batch_key": f"pmtest:jsonls:batch_{i}",
        "batch_index": i,
        "data": batch_data
    }
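    # Upsert on batch_key so re-running the cell replaces a batch instead of duplicating it
    collection.replace_one({"batch_key": batch_document["batch_key"]}, batch_document, upsert=True)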
    logger.info(f"Stored batch {i} in MongoDB with key 'pmtest:jsonls:batch_{i}'")
    
    # Record disk dump info
    openai_files_disk_dump.append({
        "json_path": str(batch_file_path),
        "batch_index": i
    })

logger.info("Stored all batches in MongoDB.")

# Write the disk dump info to disk
openai_files_disk_dump_path = output_dir / "openai_files_disk_dump.json"
with open(openai_files_disk_dump_path, "w", encoding='utf-8') as f:
    f.write(json.dumps(openai_files_disk_dump))
    
logger.info("Created OpenAI files and stored batch data in MongoDB!")

# Show sample output from the first batch
logger.info("\nSample output from first batch:")
with open(output_dir / "batch_0.jsonl", "r", encoding='utf-8') as f:
    print(f.readline())


INFO:__main__:Processing batch 1/373
INFO:__main__:Stored batch 0 in MongoDB with key 'pmtest:jsonls:batch_0'
INFO:__main__:Processing batch 2/373
INFO:__main__:Stored batch 1 in MongoDB with key 'pmtest:jsonls:batch_1'
INFO:__main__:Processing batch 3/373
INFO:__main__:Stored batch 2 in MongoDB with key 'pmtest:jsonls:batch_2'
INFO:__main__:Processing batch 4/373
INFO:__main__:Stored batch 3 in MongoDB with key 'pmtest:jsonls:batch_3'
INFO:__main__:Processing batch 5/373
INFO:__main__:Stored batch 4 in MongoDB with key 'pmtest:jsonls:batch_4'
INFO:__main__:Processing batch 6/373
INFO:__main__:Stored batch 5 in MongoDB with key 'pmtest:jsonls:batch_5'
INFO:__main__:Processing batch 7/373
INFO:__main__:Stored batch 6 in MongoDB with key 'pmtest:jsonls:batch_6'
INFO:__main__:Processing batch 8/373
INFO:__main__:Stored batch 7 in MongoDB with key 'pmtest:jsonls:batch_7'
INFO:__main__:Processing batch 9/373
INFO:__main__:Stored batch 8 in MongoDB with key 'pmtest:jsonls:batch_8'

{"custom_id": "request_text_EP3157302A1_5113165", "method": "POST", "url": "/v1/chat/completions", "body": {"model": "gpt-4-turbo-preview", "messages": [{"role": "system", "content": "Analyze the text for negations and identify their types."}, {"role": "user", "content": "Analyze the following text: A network of handling a paging procedure in a wireless communication system the network comprising a storage unit 210 for storing instructions and a processing circuit 200 coupled to the storage unit 210 wherein the storage unit 210 stores and the processing circuit 200 is configured to execute the instructions of receiving a first Non Access Stratum NAS message comprising a first time interval for monitoring a paging occasion and a second time interval for monitoring the paging occasion from a communication device wherein the second time interval is larger than the first time interval transmitting a second NAS message comprising a third time interval for monitoring the paging occasion to t
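The request above sends `NegationResponse.model_json_schema()` without `"strict": true`, so adherence to the schema is not guaranteed. OpenAI's Structured Outputs strict mode (on a model that supports it) additionally requires every object to list all of its properties under `required` and to set `"additionalProperties": false`, with optional values written as a union with `null`. Pydantic's generated schema omits `additionalProperties` unless the model is configured with `extra="forbid"`. The sketch below writes an equivalent schema by hand and checks only those two object rules; the helper names are hypothetical, and the checker does not cover every strict-mode restriction.

```python
from typing import Any, Dict

# Hand-written equivalent of NegationResponse, shaped for strict Structured Outputs
NEGATION_SCHEMA: Dict[str, Any] = {
    "type": "object",
    "properties": {
        "negation_present": {"type": "boolean"},
        # Optional[List[str]] is expressed as "array or null"; the key itself stays required
        "negation_types": {"type": ["array", "null"], "items": {"type": "string"}},
        "short_explanation": {"type": "string"},
    },
    "required": ["negation_present", "negation_types", "short_explanation"],
    "additionalProperties": False,
}


def strict_response_format(name: str, schema: Dict[str, Any]) -> Dict[str, Any]:
    """Wrap a schema as a chat-completions response_format with strict mode enabled."""
    return {"type": "json_schema", "json_schema": {"name": name, "strict": True, "schema": schema}}


def is_strict_compatible(schema: Dict[str, Any]) -> bool:
    """Check two strict-mode object rules: all properties required, no extra keys allowed."""
    if schema.get("type") != "object":
        items = schema.get("items")
        return is_strict_compatible(items) if isinstance(items, dict) else True
    props = schema.get("properties", {})
    return (
        schema.get("additionalProperties") is False
        and set(schema.get("required", [])) == set(props)
        and all(is_strict_compatible(p) for p in props.values())
    )
```

`strict_response_format("negation_response", NEGATION_SCHEMA)` could then take the place of the `response_format` value in `create_jsonl_line`.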

In [None]:
import os
import json
import asyncio
import logging
from pathlib import Path
from datetime import datetime
import openai
from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception, after_log
import aio_pika
from motor.motor_asyncio import AsyncIOMotorClient

# --- Logging Setup ---
logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO)

# --- Global Concurrency Limit ---
CONCURRENCY_LIMIT = 20
semaphore = asyncio.Semaphore(CONCURRENCY_LIMIT)

# --- Global Variables ---
GLOBAL_EXCHANGE = None
ASYNC_CLIENT = None  # Async OpenAI client
MONGO_CLIENT = None
DB = None
FILES_COLLECTION = None  # For files API responses
BATCHES_COLLECTION = None  # For batch API responses

# --- OpenAI API Key and Async Client ---
openai_api_key = os.getenv("THESIS_ALON_OPENAI_API_KEY")
if not openai_api_key:
    raise ValueError("Please set the THESIS_ALON_OPENAI_API_KEY environment variable.")
ASYNC_CLIENT = openai.AsyncOpenAI(api_key=openai_api_key)

# --- RabbitMQ Connection Details ---
RABBITMQ_URL = "amqp://admin:password@localhost:5672/"

# --- MongoDB Connection Details ---
MONGO_URI = "mongodb://localhost:27017"
MONGO_DB_NAME = "test_negationdb"

# --- Directory with Files ---
batch_files_dir = Path(r'C:\Users\orgrd\workspace\repos\runi-thesis-project\notebooks\output_jsonl')

# --- Exchange & Queue Names ---
EXCHANGE_NAME = "thesis"
DLX_NAME = "thesis_dl"
UPLOAD_QUEUE = "upload_queue"
BATCH_QUEUE = "batch_queue"
COMPLETED_QUEUE = "completed_queue"

# --- Retry Helper ---
def is_rate_limit_error(exception):
    err_str = str(exception)
    return ("rate_limit_exceeded" in err_str or 
            "10054" in err_str or 
            "502" in err_str or 
            "Bad Gateway" in err_str)

@retry(
    stop=stop_after_attempt(10),
    wait=wait_exponential(multiplier=3, min=5, max=120),
    retry=retry_if_exception(is_rate_limit_error),
    after=after_log(logger, logging.WARNING)
)
async def upload_file(file_path: Path):
    """
    Upload a JSONL file to OpenAI using the async client and return metadata.
    """
    logger.info(f"Creating OpenAI file for {file_path}...")
    with open(file_path, "rb") as f:
        result = await ASYNC_CLIENT.files.create(file=f, purpose="batch")
    metadata = {
        "file_id": result.id,
        "original_filename": file_path.name,
        "status": result.status,
        "created_at": result.created_at,
        "upload_timestamp": datetime.now().isoformat(),
        "bytes": result.bytes,
        "purpose": result.purpose
    }
    return metadata

@retry(
    stop=stop_after_attempt(10),
    wait=wait_exponential(multiplier=3, min=5, max=120),
    retry=retry_if_exception(is_rate_limit_error),
    after=after_log(logger, logging.WARNING)
)
async def submit_batch_job(file_id: str):
    """
    Create a batch job for an uploaded file using the async OpenAI client.
    Targets /v1/chat/completions with a 24h completion window and stores the source filename in the batch metadata.
    """
    # Retrieve the file's original filename from MongoDB
    file_record = await FILES_COLLECTION.find_one({"file_id": file_id})
    original_filename = file_record.get("original_filename", "") if file_record else ""
    logger.info(f"Creating batch for file {file_id} with original filename '{original_filename}'...")
    batch = await ASYNC_CLIENT.batches.create(
        input_file_id=file_id,
        endpoint="/v1/chat/completions",
        completion_window="24h",
        metadata={"input_file": original_filename},
    )
    return batch.id

# --- Consumer Callbacks ---

async def upload_consumer(message: aio_pika.IncomingMessage):
    """
    Upload consumer: reads the file path from the message.
    If a file record already exists in MongoDB (matching original_filename), skip re-upload.
    Otherwise, upload the file to OpenAI, save metadata to MongoDB, and publish a minimal message with file id.
    """
    async with semaphore:
        async with message.process():
            try:
                payload = json.loads(message.body.decode())
                file_path = Path(payload["file_path"])
                # Check if file already exists (by original_filename)
                existing = await FILES_COLLECTION.find_one({"original_filename": file_path.name})
                if existing:
                    logger.info(f"File {file_path.name} already uploaded with file_id {existing['file_id']}. Skipping upload.")
                    minimal_payload = {"file_id": existing["file_id"]}
                else:
                    logger.info(f"Uploading file: {file_path}")
                    metadata = await upload_file(file_path)
                    logger.info(f"Uploaded {file_path.name}, file_id: {metadata['file_id']}")
                    await FILES_COLLECTION.insert_one(metadata)
                    minimal_payload = {"file_id": metadata["file_id"]}
                await GLOBAL_EXCHANGE.publish(
                    aio_pika.Message(body=json.dumps(minimal_payload).encode()),
                    routing_key="batch"
                )
            except Exception as e:
                logger.error(f"Upload failed: {e}")
                raise e

async def batch_consumer(message: aio_pika.IncomingMessage):
    """
    Batch consumer: receives a message with the file id.
    If the file record already has a batch_id, skip creating a new batch.
    Otherwise, create a batch using the async OpenAI client, update MongoDB, and publish a completion message.
    """
    async with semaphore:
        async with message.process():
            payload = {}  # defined before the try so the error handler below can always use it
            try:
                payload = json.loads(message.body.decode())
                file_id = payload["file_id"]
                logger.info(f"Processing batch for file_id: {file_id}")
                # Check if a batch has already been created
                file_record = await FILES_COLLECTION.find_one({"file_id": file_id})
                if file_record and file_record.get("batch_id"):
                    batch_id = file_record["batch_id"]
                    logger.info(f"Batch already exists for file {file_id}: batch_id {batch_id}")
                else:
                    batch_id = await submit_batch_job(file_id)
                    logger.info(f"Batch created for file {file_id}, batch_id: {batch_id}")
                    await FILES_COLLECTION.update_one({"file_id": file_id}, {"$set": {"batch_id": batch_id, "status": "batch_created", "batch_timestamp": datetime.now().isoformat()}})
                    # Also, store batch record in a separate collection
                    await BATCHES_COLLECTION.insert_one({
                        "file_id": file_id,
                        "batch_id": batch_id,
                        "created_at": datetime.now().isoformat(),
                        "status": "batch_created"
                    })
                completed_payload = {"file_id": file_id, "batch_id": batch_id}
                await GLOBAL_EXCHANGE.publish(
                    aio_pika.Message(body=json.dumps(completed_payload).encode()),
                    routing_key="completed"
                )
            except Exception as e:
                logger.error(f"Batch creation failed for file {payload.get('file_id')}: {e}")
                raise e

async def completed_consumer(message: aio_pika.IncomingMessage):
    """
    Completed consumer: logs the completion and updates the file record in MongoDB.
    """
    async with semaphore:
        async with message.process():
            payload = json.loads(message.body.decode())
            file_id = payload["file_id"]
            batch_id = payload["batch_id"]
            logger.info(f"Completed processing: file_id {file_id}, batch_id {batch_id}")
            await FILES_COLLECTION.update_one({"file_id": file_id}, {"$set": {"status": "completed", "completed_timestamp": datetime.now().isoformat()}})

# --- Main Function: Setup Exchanges, Queues, Consumers, and Producers ---
async def main():
    global GLOBAL_EXCHANGE, MONGO_CLIENT, DB, FILES_COLLECTION, BATCHES_COLLECTION, ASYNC_CLIENT
    # Connect to RabbitMQ
    connection = await aio_pika.connect_robust(RABBITMQ_URL)
    # Connect to MongoDB
    MONGO_CLIENT = AsyncIOMotorClient(MONGO_URI)
    DB = MONGO_CLIENT[MONGO_DB_NAME]
    FILES_COLLECTION = DB.pmtest_files
    BATCHES_COLLECTION = DB.pmtest_batches

    async with connection:
        channel = await connection.channel()
        GLOBAL_EXCHANGE = await channel.declare_exchange(EXCHANGE_NAME, aio_pika.ExchangeType.DIRECT)
        dlx = await channel.declare_exchange(DLX_NAME, aio_pika.ExchangeType.FANOUT)
        upload_queue = await channel.declare_queue(UPLOAD_QUEUE, durable=True, arguments={"x-dead-letter-exchange": DLX_NAME})
        batch_queue = await channel.declare_queue(BATCH_QUEUE, durable=True, arguments={"x-dead-letter-exchange": DLX_NAME})
        completed_queue = await channel.declare_queue(COMPLETED_QUEUE, durable=True, arguments={"x-dead-letter-exchange": DLX_NAME})
        dead_letter_queue = await channel.declare_queue("dead_letter_queue", durable=True)
        await dead_letter_queue.bind(dlx, routing_key="")
        await upload_queue.bind(GLOBAL_EXCHANGE, routing_key="upload")
        await batch_queue.bind(GLOBAL_EXCHANGE, routing_key="batch")
        await completed_queue.bind(GLOBAL_EXCHANGE, routing_key="completed")
        await upload_queue.consume(upload_consumer)
        await batch_queue.consume(batch_consumer)
        await completed_queue.consume(completed_consumer)
        # Enqueue upload tasks for all JSONL files in the directory
        for file in sorted(batch_files_dir.glob("batch_*.jsonl")):
            payload = {"file_path": str(file)}
            await GLOBAL_EXCHANGE.publish(
                aio_pika.Message(body=json.dumps(payload).encode()),
                routing_key="upload"
            )
            logger.info(f"Enqueued upload task for {file.name}")
        logger.info("Pipeline is running. Press CTRL+C to exit.")
        await asyncio.Future()

# Run the pipeline in the current event loop.
await main()


INFO:httpx:HTTP Request: POST https://api.openai.com/v1/files "HTTP/1.1 200 OK"
INFO:httpx:HTTP Request: POST https://api.openai.com/v1/files "HTTP/1.1 200 OK"
INFO:httpx:HTTP Request: POST https://api.openai.com/v1/files "HTTP/1.1 200 OK"
INFO:httpx:HTTP Request: POST https://api.openai.com/v1/files "HTTP/1.1 200 OK"
INFO:httpx:HTTP Request: POST https://api.openai.com/v1/files "HTTP/1.1 200 OK"
INFO:httpx:HTTP Request: POST https://api.openai.com/v1/files "HTTP/1.1 200 OK"
INFO:httpx:HTTP Request: POST https://api.openai.com/v1/files "HTTP/1.1 200 OK"
INFO:httpx:HTTP Request: POST https://api.openai.com/v1/files "HTTP/1.1 200 OK"
INFO:httpx:HTTP Request: POST https://api.openai.com/v1/files "HTTP/1.1 200 OK"
INFO:httpx:HTTP Request: POST https://api.openai.com/v1/files "HTTP/1.1 200 OK"
INFO:httpx:HTTP Request: POST https://api.openai.com/v1/files "HTTP/1.1 200 OK"
INFO:httpx:HTTP Request: POST https://api.openai.com/v1/files "HTTP/1.1 200 OK"
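The pipeline above chains three queues (upload, batch, completed) on a direct exchange, shares one semaphore across all consumers, and lets failed messages fall through to the dead-letter exchange. The same message flow can be exercised without RabbitMQ, OpenAI, or MongoDB by swapping the broker for in-process `asyncio.Queue` objects. This is a hypothetical, non-durable sketch for reasoning about the flow, not a replacement for the broker; `run_stage`, `run_pipeline`, `fake_upload`, and `fake_submit` are illustrative names.

```python
import asyncio
from typing import Awaitable, Callable, List, Tuple

Handler = Callable[[dict], Awaitable[dict]]


async def run_stage(inbox: asyncio.Queue, outbox: asyncio.Queue, dead: List[dict],
                    handler: Handler, sem: asyncio.Semaphore) -> None:
    """One consumer: forward each handled message, dead-letter it if the handler raises."""
    while True:
        msg = await inbox.get()
        try:
            async with sem:  # shared limit on in-flight calls, like CONCURRENCY_LIMIT
                result = await handler(msg)
            await outbox.put(result)
        except Exception as exc:  # like message.process(): reject without requeue -> DLX
            dead.append({"message": msg, "error": str(exc)})
        finally:
            inbox.task_done()


async def run_pipeline(paths: List[str], upload: Handler, submit: Handler,
                       limit: int = 20, workers_per_stage: int = 4) -> Tuple[List[dict], List[dict]]:
    """upload queue -> batch queue -> completed queue, with failures collected in `dead`."""
    upload_q: asyncio.Queue = asyncio.Queue()
    batch_q: asyncio.Queue = asyncio.Queue()
    done_q: asyncio.Queue = asyncio.Queue()
    dead: List[dict] = []
    sem = asyncio.Semaphore(limit)
    workers = [asyncio.create_task(run_stage(upload_q, batch_q, dead, upload, sem))
               for _ in range(workers_per_stage)]
    workers += [asyncio.create_task(run_stage(batch_q, done_q, dead, submit, sem))
                for _ in range(workers_per_stage)]
    for path in paths:
        upload_q.put_nowait({"file_path": path})
    await upload_q.join()  # every upload message was forwarded or dead-lettered
    await batch_q.join()   # ...and so was every batch message
    for w in workers:
        w.cancel()
    await asyncio.gather(*workers, return_exceptions=True)
    completed = [done_q.get_nowait() for _ in range(done_q.qsize())]
    return completed, dead


# Hypothetical stand-ins for upload_file / submit_batch_job
async def fake_upload(msg: dict) -> dict:
    if msg["file_path"] == "bad.jsonl":
        raise RuntimeError("upload failed")
    return {"file_id": "file-" + msg["file_path"].split(".")[0]}


async def fake_submit(msg: dict) -> dict:
    return {"file_id": msg["file_id"], "batch_id": "batch-" + msg["file_id"]}


async def demo() -> Tuple[List[dict], List[dict]]:
    return await run_pipeline(["batch_0.jsonl", "batch_1.jsonl", "bad.jsonl"], fake_upload, fake_submit)
```

Run it with `asyncio.run(demo())` in a script, or `await demo()` inside a notebook cell. Unlike the real pipeline, which waits forever on `asyncio.Future()`, the sketch returns once both queues are drained.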

## Process OpenAI Batch Requests

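This section submits the uploaded files as batch jobs and collects the results:
1. Upload each `batch_*.jsonl` file and submit it as a batch job through the RabbitMQ pipeline
2. Retry rate-limited requests with exponential backoff
3. Route messages that still fail to a dead-letter queue
4. Check the status of every batch listed in the tracking directory
5. Download the results of completed batches as they become available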
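The retry decorators in the pipeline cells use `wait_exponential(multiplier=3, min=5, max=120)` with `stop_after_attempt(10)`. Assuming tenacity's documented behaviour (after attempt `n`, wait `multiplier * 2**(n - 1)` seconds, clamped to `[min, max]`, with no jitter), the resulting schedule can be computed directly. `exponential_waits` is a hypothetical helper mirroring that formula; it is not part of tenacity.

```python
from typing import List


def exponential_waits(attempts: int, multiplier: float, minimum: float,
                      maximum: float, base: float = 2) -> List[float]:
    """Waits between `attempts` tries: multiplier * base**(n - 1), clamped to [minimum, maximum]."""
    waits = []
    for n in range(1, attempts):  # no wait after the final attempt
        raw = multiplier * base ** (n - 1)
        waits.append(max(minimum, min(raw, maximum)))
    return waits


schedule = exponential_waits(10, multiplier=3, minimum=5, maximum=120)
print(schedule)       # [5, 6, 12, 24, 48, 96, 120, 120, 120]
print(sum(schedule))  # 551
```

So a request that keeps failing with a retryable error is abandoned after nine waits totalling 551 seconds (a little over nine minutes), and the last three waits sit at the 120-second cap.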

In [19]:
import os
import json
import asyncio
import aiohttp
import logging
from pathlib import Path
from datetime import datetime
from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception, after_log
import aio_pika

# --- Logging Setup ---
logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.WARN)

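# --- RabbitMQ Connection Details ---
# Docker Compose settings: username=admin, password=password. The service hostname "rabbitmq"
# resolves only inside the Compose network, which is why the run below failed with
# "getaddrinfo failed"; from the host machine, connect to the published port on localhost.
RABBITMQ_URL = "amqp://admin:password@localhost:5672/"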

# --- OpenAI API Key ---
api_key = os.getenv('OPENAI_API_KEY')
if not api_key:
    raise ValueError("Please set the OPENAI_API_KEY environment variable.")

# --- Directory with Files to Process ---
output_dir = Path('output_jsonl')  # This folder should contain files matching "batch_*.jsonl"

# --- Define Exchange and Queue Names ---
EXCHANGE_NAME = "thesis"         # main exchange
DLX_NAME = "thesis_dl"           # dead letter exchange
UPLOAD_QUEUE = "upload_queue"
BATCH_QUEUE = "batch_queue"
COMPLETED_QUEUE = "completed_queue"

# --- Retry Helpers for Rate Limits ---
def is_rate_limit_error(exception):
    return "rate_limit_exceeded" in str(exception)

@retry(
    after=after_log(logger, logging.WARN),
    stop=stop_after_attempt(10),
    wait=wait_exponential(multiplier=3, min=5, max=120),
    retry=retry_if_exception(is_rate_limit_error)
)
async def submit_batch_job(session, file_id):
    """
    Submit the uploaded file for batch processing.
    Returns the batch_id on success.
    """
    url = 'https://api.openai.com/v1/batches'
    headers = {
        'Authorization': f"Bearer {api_key}",
        'Content-Type': 'application/json'
    }
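    # The Batches API expects the uploaded file as input_file_id, plus the endpoint the
    # requests target and the completion window (it has no file_id/purpose fields)
    data = {
        'input_file_id': file_id,
        'endpoint': '/v1/chat/completions',
        'completion_window': '24h'
    }
    async with session.post(url, headers=headers, json=data) as response:
        response_json = await response.json()
        if response.status != 200:
            error = response_json.get("error") or {}
            # Include the error code so is_rate_limit_error() can match "rate_limit_exceeded"
            raise Exception(f"Error submitting batch job for file {file_id}: "
                            f"[{error.get('code')}] {error.get('message', 'Unknown error')}")
        batch_id = response_json.get('id')
        if not batch_id:
            raise Exception(f"Received null batch_id for file {file_id}")
        return batch_id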

@retry(
    after=after_log(logger, logging.WARN),
    stop=stop_after_attempt(10),
    wait=wait_exponential(multiplier=3, min=5, max=120),
    retry=retry_if_exception(is_rate_limit_error)
)
async def upload_file(session, file_path: Path):
    """
    Upload a file to OpenAI and return the file_id along with metadata.
    """
    url = 'https://api.openai.com/v1/files'
    headers = {
        'Authorization': f"Bearer {api_key}",
    }
    # Keep the file open only for the duration of the upload request
    with file_path.open('rb') as fh:
        data = aiohttp.FormData()
        data.add_field('purpose', 'batch')
        data.add_field('file', fh, filename=file_path.name, content_type='application/jsonl')
        async with session.post(url, headers=headers, data=data) as response:
            response_json = await response.json()
            if response.status != 200:
                error = response_json.get("error") or {}
                # Include the error code so is_rate_limit_error() can match "rate_limit_exceeded"
                raise Exception(f"Error uploading {file_path.name}: "
                                f"[{error.get('code')}] {error.get('message', 'Unknown error')}")
            file_id = response_json['id']
            metadata = {
                "file_id": file_id,
                "original_filename": file_path.name,
                "status": response_json['status'],
                "created_at": response_json['created_at'],
                "upload_timestamp": datetime.now().isoformat(),
                "bytes": response_json['bytes'],
                "purpose": response_json['purpose']
            }
            return metadata

# --- Exchange handle used by the consumers (set in main) ---
# IncomingMessage.channel is the low-level aiormq channel, which has no get_exchange(),
# so the consumers publish through the exchange object declared in main() instead.
EXCHANGE = None

# --- Consumer Callbacks ---
async def on_upload_message(message: aio_pika.IncomingMessage):
    """Process messages from the 'upload' queue: upload file and then forward for batch submission."""
    async with message.process():
        try:
            payload = json.loads(message.body)
            file_path_str = payload.get("file_path")
            file_path = Path(file_path_str)
            async with aiohttp.ClientSession() as session:
                metadata = await upload_file(session, file_path)
                logger.info(f"Uploaded {file_path.name}: file_id {metadata['file_id']}")
                # Publish to the batch queue
                batch_payload = {
                    "file_id": metadata["file_id"],
                    "original_filename": metadata["original_filename"]
                }
                await EXCHANGE.publish(
                    aio_pika.Message(body=json.dumps(batch_payload).encode()),
                    routing_key="batch"
                )
        except Exception as e:
            logger.error(f"Upload processing failed: {e}")
            raise  # process() rejects without requeue, so the message goes to the dead-letter exchange

async def on_batch_message(message: aio_pika.IncomingMessage):
    """Process messages from the 'batch' queue: submit batch job."""
    async with message.process():
        try:
            payload = json.loads(message.body)
            file_id = payload.get("file_id")
            async with aiohttp.ClientSession() as session:
                batch_id = await submit_batch_job(session, file_id)
                logger.info(f"Batch submitted for file {file_id}: batch_id {batch_id}")
                # Publish to the completed queue
                completed_payload = {
                    "file_id": file_id,
                    "batch_id": batch_id,
                    "timestamp": datetime.now().isoformat()
                }
                await EXCHANGE.publish(
                    aio_pika.Message(body=json.dumps(completed_payload).encode()),
                    routing_key="completed"
                )
        except Exception as e:
            logger.error(f"Batch processing failed: {e}")
            raise

# --- Main Function: Setup Exchanges, Queues, Producers, and Consumers ---
async def main():
    global EXCHANGE
    # Connect to RabbitMQ
    connection = await aio_pika.connect_robust(RABBITMQ_URL)
    async with connection:
        channel = await connection.channel()

        # Declare main exchange (direct) and dead-letter exchange (fanout)
        exchange = await channel.declare_exchange(EXCHANGE_NAME, aio_pika.ExchangeType.DIRECT)
        EXCHANGE = exchange  # shared with the consumer callbacks above
        dlx = await channel.declare_exchange(DLX_NAME, aio_pika.ExchangeType.FANOUT)

        # Declare queues with dead-letter exchange arguments
        upload_queue = await channel.declare_queue(
            UPLOAD_QUEUE,
            durable=True,
            arguments={"x-dead-letter-exchange": DLX_NAME}
        )
        batch_queue = await channel.declare_queue(
            BATCH_QUEUE,
            durable=True,
            arguments={"x-dead-letter-exchange": DLX_NAME}
        )
        completed_queue = await channel.declare_queue(
            COMPLETED_QUEUE,
            durable=True,
            arguments={"x-dead-letter-exchange": DLX_NAME}
        )
        # Declare a dead-letter queue for messages that fail repeatedly
        dead_letter_queue = await channel.declare_queue("dead_letter_queue", durable=True)
        await dead_letter_queue.bind(dlx, routing_key="")

        # Bind queues to the exchange using routing keys
        await upload_queue.bind(exchange, routing_key="upload")
        await batch_queue.bind(exchange, routing_key="batch")
        await completed_queue.bind(exchange, routing_key="completed")

        # Start consumers for the upload and batch queues
        await upload_queue.consume(on_upload_message)
        await batch_queue.consume(on_batch_message)

        # --- Producer: Enqueue Upload Tasks ---
        # Publish an upload task for every file matching "batch_*.jsonl" in output_dir
        for file in sorted(output_dir.glob("batch_*.jsonl")):
            payload = {"file_path": str(file)}
            await exchange.publish(
                aio_pika.Message(body=json.dumps(payload).encode()),
                routing_key="upload"
            )
            logger.info(f"Enqueued upload task for {file.name}")

        logger.info("Waiting for messages. To exit press CTRL+C")
        # Run indefinitely
        await asyncio.Future()

# Run the main function
await main()


INFO:aio_pika.robust_connection:Connection to amqp://admin:******@rabbitmq:5672/ closed. Reconnecting after 5 seconds.


AMQPConnectionError: [Errno 11001] getaddrinfo failed
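`[Errno 11001] getaddrinfo failed` is the Windows "host not found" error: the hostname in `RABBITMQ_URL` could not be resolved. `rabbitmq` is the Docker Compose service name, which resolves only for containers on the Compose network, while this notebook runs on the Windows host. A quick pre-flight check can catch this before `connect_robust` enters its reconnect loop. Minimal sketch; `can_resolve` is a hypothetical helper built on the standard `socket.getaddrinfo`.

```python
import socket


def can_resolve(host: str, port: int = 5672) -> bool:
    """True if the local resolver can turn `host` into an address (no connection is made)."""
    try:
        socket.getaddrinfo(host, port)
        return True
    except socket.gaierror:  # raised for unresolvable names (Errno 11001 on Windows)
        return False
```

For example, `"rabbitmq" if can_resolve("rabbitmq") else "localhost"` picks the Compose name inside a container and falls back to localhost on the host, assuming the Compose file publishes port 5672.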

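The next cell checks each batch once and records its status. To block until a batch finishes, the status check can be wrapped in a polling loop. Minimal sketch: `wait_for_batch` is hypothetical, `fetch` is any coroutine function that returns the batch object as a dict (for instance `lambda bid: fetch_batch_status(session, bid)`), and the defaults (60 s interval, 1,440 polls, roughly the 24h completion window) are arbitrary choices. The terminal statuses follow the Batch API lifecycle, in which `validating`, `in_progress`, `finalizing`, and `cancelling` are intermediate.

```python
import asyncio
from typing import Awaitable, Callable, Dict

# Statuses after which a batch no longer changes
TERMINAL_STATUSES = {"completed", "failed", "expired", "cancelled"}


async def wait_for_batch(batch_id: str,
                         fetch: Callable[[str], Awaitable[Dict]],
                         interval: float = 60.0,
                         max_polls: int = 1440) -> Dict:
    """Poll fetch(batch_id) until the batch reaches a terminal status; return the batch object."""
    status = "unknown"
    for _ in range(max_polls):
        batch = await fetch(batch_id)
        status = batch["status"]
        if status in TERMINAL_STATUSES:
            return batch
        await asyncio.sleep(interval)
    raise TimeoutError(f"Batch {batch_id} still '{status}' after {max_polls} polls")
```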
In [18]:
import os
import json
import asyncio
import aiohttp
from pathlib import Path
from datetime import datetime, timedelta
import openai
from tenacity import retry, stop_after_attempt, wait_exponential

# Set your OpenAI API key
openai.api_key = os.getenv('OPENAI_API_KEY')
if not openai.api_key:
    raise ValueError("Please set the OPENAI_API_KEY environment variable.")

# Define directories
tracking_dir = Path('output_jsonl/tracking')
results_dir = tracking_dir / "results"
results_dir.mkdir(parents=True, exist_ok=True)

# Retry configuration: 3 attempts with exponential backoff
@retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=4, max=60))
async def fetch_batch_status(session, batch_id):
    """Fetch the status of a batch job."""
    url = f'https://api.openai.com/v1/batches/{batch_id}'
    headers = {
        'Authorization': f"Bearer {openai.api_key}",
    }
    async with session.get(url, headers=headers) as response:
        if response.status == 200:
            return await response.json()
        else:
            error_text = await response.text()
            raise Exception(f"Error fetching status for batch {batch_id}: {error_text}")

@retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=4, max=60))
async def download_file(session, file_id, destination):
    """Download a file from OpenAI and save it to the destination."""
    url = f'https://api.openai.com/v1/files/{file_id}/content'
    headers = {
        'Authorization': f"Bearer {openai.api_key}",
    }
    async with session.get(url, headers=headers) as response:
        if response.status == 200:
            with open(destination, 'wb') as f:
                f.write(await response.read())
        else:
            error_text = await response.text()
            raise Exception(f"Error downloading file {file_id}: {error_text}")

async def process_batch(session, metadata_path):
    """Process a single batch based on its metadata file."""
    with open(metadata_path, 'r') as f:
        metadata = json.load(f)

    batch_id = metadata.get('batch_id')
    if not batch_id:
        print(f"No batch_id found in metadata {metadata_path.name}. Skipping.")
        return

    # Check batch status
    try:
        print(f"Checking status for batch {batch_id}...")
        batch_status = await fetch_batch_status(session, batch_id)
        metadata['batch_status'] = batch_status['status']

        # Save updated metadata
        with open(metadata_path, 'w') as f:
            json.dump(metadata, f, indent=2)

        if batch_status['status'] == 'completed':
            output_file_id = batch_status.get('output_file_id')
            if output_file_id:
                output_path = results_dir / f"{batch_id}_results.jsonl"
                await download_file(session, output_file_id, output_path)
                print(f"Results for batch {batch_id} saved to {output_path}")
            else:
                print(f"Batch {batch_id} completed, but no output file found.")
        elif batch_status['status'] in ['failed', 'expired', 'cancelled']:
            print(f"Batch {batch_id} ended with status '{batch_status['status']}'.")

    except Exception as e:
        print(f"Error processing batch {batch_id}: {e}")

async def main_check_batches():
    """Main function to check the status of all batch jobs."""
    async with aiohttp.ClientSession() as session:
        tasks = []
        for metadata_file in tracking_dir.glob("*_metadata.json"):
            tasks.append(process_batch(session, metadata_file))

        await asyncio.gather(*tasks)

# Run the batch checking function
await main_check_batches()


No batch_id found in metadata file-14kUprdXHVfy4c9QFkGbrF_metadata.json. Skipping.
No batch_id found in metadata file-14wPTMV99qr2uNevJawPRd_metadata.json. Skipping.
No batch_id found in metadata file-1aDJ7SoQhDh12BAGzFNbgm_metadata.json. Skipping.
No batch_id found in metadata file-1ip9b9hkBn5hn6H8aFjRWy_metadata.json. Skipping.
No batch_id found in metadata file-1Kd8cmQieKgDQCnXmd5La5_metadata.json. Skipping.
No batch_id found in metadata file-1tHtrGc4YUDvVikckkYSJh_metadata.json. Skipping.
No batch_id found in metadata file-1trT6Y1JFP6y9tPP3ZvMjo_metadata.json. Skipping.
No batch_id found in metadata file-1xSYGqcq6bgi3nrGpi89Lw_metadata.json. Skipping.
No batch_id found in metadata file-2Bx2AnihwVWttqq4hnZk2T_metadata.json. Skipping.
No batch_id found in metadata file-2C7cEJYYYmT9q2DyNtQ8RM_metadata.json. Skipping.
No batch_id found in metadata file-2TB2XdH27JgaSA6qWwu8ih_metadata.json. Skipping.
No batch_id found in metadata file-2uV4Hyee78BvXhRgiSPsoh_metadata.json. Skipping.
No b
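Once a `*_results.jsonl` file has been downloaded, each line can be matched back to its request. As we understand the Batch API output format, every line is a JSON object with the original `custom_id`, a `response` holding `status_code` and the chat-completion `body`, and an `error` field. The sketch below uses only the standard library and checks the three `NegationResponse` fields; `parse_results` is hypothetical, and in the notebook `NegationResponse.model_validate_json(content)` could replace the field check.

```python
import json
from typing import Dict, Iterable, List, Tuple

# Fields of NegationResponse that every parsed answer must carry
REQUIRED_FIELDS = {"negation_present", "negation_types", "short_explanation"}


def parse_results(lines: Iterable[str]) -> Tuple[Dict[str, dict], List[Tuple[str, str]]]:
    """Return ({custom_id: answer}, [(custom_id, reason), ...]) for one results file."""
    parsed: Dict[str, dict] = {}
    failures: List[Tuple[str, str]] = []
    for line in lines:
        if not line.strip():
            continue  # tolerate a trailing newline or blank lines
        record = json.loads(line)
        custom_id = record.get("custom_id", "<missing>")
        response = record.get("response") or {}
        if record.get("error") or response.get("status_code") != 200:
            failures.append((custom_id, "request error"))
            continue
        content = response["body"]["choices"][0]["message"]["content"]
        try:
            answer = json.loads(content)
        except (TypeError, json.JSONDecodeError):  # None content or non-JSON text
            failures.append((custom_id, "content is not JSON"))
            continue
        if not isinstance(answer, dict) or not REQUIRED_FIELDS.issubset(answer.keys()):
            failures.append((custom_id, "missing fields"))
            continue
        parsed[custom_id] = answer
    return parsed, failures
```

A results file can be passed straight in, e.g. `with open(results_dir / f"{batch_id}_results.jsonl", encoding="utf-8") as f: parsed, failures = parse_results(f)`.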