In [14]:
# Location of the JSONL file this notebook reads its records from.
file_path = (
    "aime-eval-runs/"
    "test_qwen-instruct_t0.6_k64_s0_e30_agenticorg.jsonl"
)

In [15]:
import json

def read_jsonl_basic(file_path):
    """Read a JSON Lines file and return its records as a list.

    Each non-blank line is parsed with ``json.loads``; lines that are empty
    or contain only whitespace are skipped.

    Args:
        file_path: Path of the JSONL file to read.

    Returns:
        list: The parsed objects, in file order.

    Raises:
        OSError: If the file cannot be opened.
        json.JSONDecodeError: If a non-blank line is not valid JSON.
    """
    # Decode as UTF-8 explicitly so the result does not depend on the
    # platform's default locale encoding.
    with open(file_path, 'r', encoding='utf-8') as handle:
        return [json.loads(line) for line in handle if line.strip()]


In [16]:
# Load every record from the JSONL file at `file_path` into a list; the
# generation loop later in the notebook reads each record's 'question' key.
data = read_jsonl_basic(file_path)

In [None]:
from dotenv import load_dotenv
import os
import time
import json
import math
import concurrent.futures
from together import Together

# Load environment variables from .env file
load_dotenv()

# Access the API key and fail early with a clear message when it is missing.
# Without this check, assigning None into os.environ raises an opaque
# TypeError that does not mention the key at all.
api_key = os.getenv('TOGETHER_API_KEY')
if not api_key:
    raise RuntimeError(
        "TOGETHER_API_KEY is not set; define it in the environment or in a .env file."
    )
os.environ["TOGETHER_API_KEY"] = api_key

# Together() is constructed without arguments; presumably it picks the key up
# from the environment variable set above -- confirm against the SDK docs.
client = Together()

def fetch_response(question):
    """
    Makes a single API call to fetch a response for the given question using streaming.

    The question is appended to a fixed instruction asking the model to reason
    step by step and box its final answer. The streamed chunks are then
    concatenated into one string.

    Args:
        question (str): The question text to send to the model.

    Returns:
        str: The final concatenated response.
    """
    response = client.chat.completions.create(
        model="deepseek-ai/DeepSeek-R1-Distill-Qwen-1.5B",
        messages=[
            {
                "role": "user",
                "content": "Please reason step by step, and put your final answer within \\boxed{} " + question
            },
        ],
        max_tokens=None,
        temperature=0.6,
        top_p=0.95,
        top_k=None,
        repetition_penalty=1,
        stop=["<｜end of sentence｜>"],
        stream=True,
    )
    pieces = []
    for chunk in response:
        # A chunk may lack `choices` or carry it as None; treat both as empty.
        for choice in getattr(chunk, 'choices', None) or ():
            delta = getattr(choice, 'delta', None)
            text = getattr(delta, 'content', None)
            # Skip fragments without text: a None here would make the final
            # "".join raise TypeError and discard the whole response.
            if text:
                pieces.append(text)
    return "".join(pieces)

def get_responses_and_save(question, num_responses=64, output_file="output.jsonl",
                           batch_size=60, batch_interval=60):
    """
    Fetches responses concurrently in batches and appends each one to a JSONL file as it completes.

    Each batch submits up to `batch_size` calls to `fetch_response` on a thread
    pool. After a batch finishes, the function sleeps until `batch_interval`
    seconds have passed since that batch started. The wait is applied after
    the last batch as well, so back-to-back calls of this function (one per
    question) remain spaced out.

    A call that raises is not retried: its record is still written, with
    "generated_response" set to "Error: <message>".

    Args:
        question (str): The question to ask.
        num_responses (int): Total number of responses to generate.
        output_file (str): Path to the JSONL file where responses will be saved
            (opened in append mode).
        batch_size (int): Maximum number of concurrent requests per batch.
        batch_interval (float): Minimum number of seconds between the starts
            of consecutive batches.

    Raises:
        ValueError: If `batch_size` is smaller than 1 or `batch_interval` is negative.
    """
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")
    if batch_interval < 0:
        raise ValueError("batch_interval must not be negative")

    total_batches = math.ceil(num_responses / batch_size)
    global_count = 0

    with open(output_file, "a", encoding="utf-8") as f:
        for batch in range(total_batches):
            # Monotonic clock: elapsed time must not be skewed by wall-clock
            # adjustments while the batch is running.
            start_batch = time.monotonic()
            # Determine the number of requests in this batch
            current_batch_size = min(batch_size, num_responses - batch * batch_size)
            print(f"Starting batch {batch + 1} with {current_batch_size} requests.")

            # Use ThreadPoolExecutor to fire off concurrent API calls
            with concurrent.futures.ThreadPoolExecutor(max_workers=current_batch_size) as executor:
                futures = [
                    executor.submit(fetch_response, question)
                    for _ in range(current_batch_size)
                ]
                # Write results in completion order so finished responses are
                # persisted even if a later call hangs or the run is interrupted.
                for future in concurrent.futures.as_completed(futures):
                    try:
                        final_response = future.result()
                    except Exception as e:
                        # Best effort: record the failure in place of the
                        # response instead of aborting the whole batch.
                        final_response = f"Error: {e}"
                    global_count += 1
                    result = {
                        "question": question,
                        "generated_response": final_response
                    }
                    f.write(json.dumps(result) + "\n")
                    f.flush()
                    print(f"Saved response {global_count}/{num_responses}")

            # Enforce waiting until the interval has passed since the batch started
            elapsed = time.monotonic() - start_batch
            if elapsed < batch_interval:
                wait_time = batch_interval - elapsed
                print(f"Batch {batch + 1} complete. Waiting {wait_time:.2f} seconds for rate limit.")
                time.sleep(wait_time)
            else:
                print(f"Batch {batch + 1} took {elapsed:.2f} seconds, no wait needed.")

# Driver: request the same number of samples for every question in `data`,
# appending all results to one shared JSONL file.
num_responses = 64  # Samples generated per question

for question_data in data:
    question = question_data['question']
    get_responses_and_save(
        question,
        num_responses=num_responses,
        output_file="together_run_output_full_run_parallel.jsonl",
    )


Starting batch 1 with 60 requests.
Saved response 1/64
Saved response 2/64
Saved response 3/64
Saved response 4/64
Saved response 5/64
Saved response 6/64
Saved response 7/64
Saved response 8/64
Saved response 9/64
Saved response 10/64
Saved response 11/64
Saved response 12/64
Saved response 13/64
Saved response 14/64
Saved response 15/64
Saved response 16/64
Saved response 17/64
Saved response 18/64
Saved response 19/64
Saved response 20/64
Saved response 21/64
Saved response 22/64
Saved response 23/64
Saved response 24/64
Saved response 25/64
Saved response 26/64
Saved response 27/64
Saved response 28/64
Saved response 29/64
Saved response 30/64
Saved response 31/64
Saved response 32/64
Saved response 33/64
Saved response 34/64
Saved response 35/64
Saved response 36/64
Saved response 37/64
Saved response 38/64
Saved response 39/64
Saved response 40/64
Saved response 41/64
Saved response 42/64
Saved response 43/64
Saved response 44/64
Saved response 45/64
Saved response 46/64
Saved re