In [1]:
from openai import OpenAI
# OpenAI-compatible chat endpoint on this machine. NOTE(review): "EMPTY" looks like a
# placeholder key for a local server that does not check it — confirm before pointing elsewhere.
openai_api_base = "http://localhost:8000/v1"
openai_api_key = "EMPTY"

client = OpenAI(base_url=openai_api_base, api_key=openai_api_key)

def generate(prompt: str, *, model: str = "meta-llama/Meta-Llama-3.1-70B-Instruct",
             temperature: float = 1, max_tokens: int = 8192, llm_client=None):
    """Send a single-turn chat prompt to the served model and return the reply text.

    Args:
        prompt: user message content.
        model: model name passed to the chat-completions call.
        temperature: sampling temperature (default 1, unchanged from before).
        max_tokens: completion token cap (default 8192, unchanged from before).
        llm_client: OpenAI-compatible client to use; when omitted the module-level
            ``client`` is used, so existing ``generate(prompt)`` calls behave as before.

    Returns:
        ``choices[0].message.content`` of the first completion choice.
    """
    # Only look up the global when no client was injected (keeps the function testable).
    api = client if llm_client is None else llm_client
    completion = api.chat.completions.create(
        model=model,
        messages=[{"role": "user", "content": prompt}],
        temperature=temperature,
        max_tokens=max_tokens,
    )
    return completion.choices[0].message.content


# Smoke test: one round-trip to the server before the bulk generation below.
generate(prompt="hi")

"How's it going? Is there something I can help you with or would you like to chat?"

In [2]:
import pandas as pd

# Preference pairs; the generation cell below reads their
# 'id', 'prompt', 'response_a', 'response_b' and 'winner' columns.
train_path = "wsdm/train.parquet"
train_df = pd.read_parquet(train_path)

In [3]:
import json, os, aiohttp, asyncio, re
from tqdm.asyncio import tqdm_asyncio
import pandas as pd
from collections import defaultdict

buffer_size = 500  # flush to the parquet cache once this many rows are buffered
# Finished rows awaiting the next flush, keyed by row id (filled by process_task,
# drained by save_results).
results_buffer = defaultdict(dict)

def clean_json_string(s):
    """Best-effort parse of an LLM reply into a dict (expected to carry a "rationale" key).

    Steps, in order:
    1. Narrow the text to the span from the first ``{`` to the last ``}`` if there is one.
    2. Try ``json.loads`` as-is.
    3. Repair common LLM mistakes and retry: raw newlines inside string values become
       spaces (double spaces collapsed), remaining newlines become spaces, and only
       backslashes that do not start a valid JSON escape (e.g. ``\\_``) are doubled.
       Parsing is done with ``strict=False`` so other raw control characters
       (tabs, carriage returns) inside strings are tolerated.
    4. Fall back to regex-extracting the ``"rationale"`` string value.

    Only JSON objects are accepted at steps 2-3; any other JSON value (string, list,
    number) falls through, because callers use ``.get`` on the result.

    Args:
        s: raw model output text.

    Returns:
        The parsed dict, ``{"rationale": <regex-extracted text>}``, or
        ``{"rationale": "Failed to parse response"}``.
    """
    json_match = re.search(r'\{.*\}', s, re.DOTALL)
    candidate = json_match.group(0) if json_match else s

    try:
        parsed = json.loads(candidate)
    except json.JSONDecodeError:
        parsed = None
    if isinstance(parsed, dict):
        return parsed

    # Collapse raw newlines inside "key": "value" strings (JSON forbids them unescaped).
    repaired = re.sub(
        r':\s*"([^"]*)"',
        lambda m: ': "' + m.group(1).replace('\n', ' ').replace('  ', ' ') + '"',
        candidate,
    )
    repaired = repaired.replace('\n', ' ')
    # Double only stray backslashes. Valid escapes (\" \\ \/ \b \f \n \r \t \uXXXX) are kept
    # as-is: doubling every backslash would turn an escaped quote \" into \\" and end the
    # string early.
    repaired = re.sub(
        r'\\(?:["\\/bfnrt]|u[0-9a-fA-F]{4})?',
        lambda m: m.group(0) if len(m.group(0)) > 1 else '\\\\',
        repaired,
    ).strip()

    try:
        parsed = json.loads(repaired, strict=False)
    except json.JSONDecodeError:
        parsed = None
    if isinstance(parsed, dict):
        return parsed

    # Last resort: pull the first "rationale" string value out with a regex
    # (stops at the first double quote, so embedded escaped quotes truncate it).
    rationale = re.search(r'"rationale":\s*"([^"]*)"', repaired)
    if rationale:
        return {"rationale": rationale.group(1)}
    return {"rationale": "Failed to parse response"}

async def generate_response(session, semaphore, row):
    """Ask the local chat endpoint to justify the known preference label of one row.

    The prompt shows the original prompt and both responses, states the winner
    (``row['winner'] == 'model_a'`` maps to A, any other value to B), and asks for a
    step-by-step explanation in JSON with a single "rationale" field.

    Args:
        session: open aiohttp client session used for the POST.
        semaphore: asyncio.Semaphore bounding how many calls run this body at once.
        row: pandas Series or mapping with 'id', 'prompt', 'response_a',
            'response_b' and 'winner'.

    Returns:
        The message content string on HTTP 200; otherwise None. Non-200 codes and
        request exceptions are printed; after an exception the coroutine also sleeps
        1 s (still holding the semaphore) before returning.
    """
    async with semaphore:
        endpoint = "http://localhost:8000/v1/chat/completions"
        request_headers = {"Content-Type": "application/json", "Authorization": "Bearer EMPTY"}

        if row['winner'] == 'model_a':
            winner, loser = 'A', 'B'
        else:
            winner, loser = 'B', 'A'

        prompt_lines = [
            f'Given these two responses to the prompt: "{row["prompt"]}"',
            "Response A:",
            f"{row['response_a']}",
            "Response B:",
            f"{row['response_b']}",
            f"The better response was: Response {winner}",
            f"Explain step by step why Response {winner} was better than Response {loser}.",
            'IMPORTANT: Your response must be in valid JSON format with a single field called "rationale".',
        ]
        payload = {
            "model": "meta-llama/Meta-Llama-3.1-70B-Instruct",
            "messages": [{"role": "user", "content": "\n".join(prompt_lines)}],
            "temperature": 0.7,
            "max_tokens": 2048,
        }

        try:
            async with session.post(endpoint, headers=request_headers, json=payload) as response:
                if response.status != 200:
                    print(f"Error response code {response.status} for ID {row['id']}")
                    return None
                body = await response.json()
                return body["choices"][0]["message"]["content"]
        except Exception as e:
            print(f"Request error for ID {row['id']}: {str(e)}")
            await asyncio.sleep(1)
            return None

async def process_task(i, row, session, semaphore, pbar):
    """Generate, parse and buffer the rationale for one row, then tick the progress bar.

    A truthy reply is parsed with ``clean_json_string`` and stored in
    ``results_buffer[row['id']]`` as the row's fields plus a string 'rationale'. When the
    buffer reaches ``buffer_size`` entries it is flushed to
    'wsdm/semisynthetic_train.parquet'. A falsy reply (failed request) stores nothing.
    Any exception is printed and recorded as a row whose rationale is "Error: <message>".

    Args:
        i: positional index of the row (used only in the error message).
        row: pandas Series for the row.
        session, semaphore: forwarded to ``generate_response``.
        pbar: progress bar advanced by one when the row is done.
    """
    def remember(rationale):
        # Buffer a copy of the row with its rationale attached.
        record = row.to_dict()
        record['rationale'] = rationale
        results_buffer[row['id']] = record

    try:
        reply = await generate_response(session, semaphore, row)
        if reply:
            parsed = clean_json_string(reply)
            remember(str(parsed.get('rationale', 'Failed to extract rationale')))
            if len(results_buffer) >= buffer_size:
                await save_results('wsdm/semisynthetic_train.parquet')
    except Exception as e:
        print(f"Error processing row {i} (ID {row['id']}): {str(e)}")
        remember(f"Error: {str(e)}")

    pbar.update(1)

async def save_results(cache_path):
    """Merge the buffered rows into the parquet cache at ``cache_path`` and empty the buffer.

    Cached rows whose 'id' is also in the buffer are replaced by the buffered version.
    The merged frame is written to a temporary sibling file that is then moved over
    ``cache_path`` with ``os.replace``. An interruption mid-write therefore leaves the
    previous cache intact instead of a truncated parquet file. On any error the message
    is printed and the buffer is kept, so a later call can retry.

    Declared ``async`` because callers ``await`` it, but it contains no ``await``. It runs
    to completion without yielding to the event loop, so no other task can add to
    ``results_buffer`` between the write and the ``clear()``.

    Args:
        cache_path: path of the parquet cache file.
    """
    if not results_buffer:
        return

    tmp_path = f"{cache_path}.tmp"
    try:
        if os.path.exists(cache_path):
            cached_df = pd.read_parquet(cache_path)
            cached_df = cached_df[~cached_df['id'].isin(list(results_buffer))]
        else:
            cached_df = pd.DataFrame()

        new_df = pd.DataFrame.from_records(list(results_buffer.values()))
        merged_df = pd.concat([cached_df, new_df], ignore_index=True) if not cached_df.empty else new_df

        # Keep the rationale column a uniform string type in the parquet schema.
        merged_df['rationale'] = merged_df['rationale'].astype(str)

        # Write-then-rename so the cache file is never observed half-written.
        merged_df.to_parquet(tmp_path)
        os.replace(tmp_path, cache_path)
        results_buffer.clear()
    except Exception as e:
        print(f"Error saving results: {str(e)}")

async def main(train_df, max_concurrency=500):
    """Generate rationales for every row of ``train_df`` that is not yet in the cache.

    Resumable: ids already present in 'wsdm/semisynthetic_train.parquet' are skipped.
    ``process_task`` buffers and periodically flushes results. Whatever is still
    buffered is flushed once more at the end, including when the gather is cancelled
    or raises.

    Args:
        train_df: DataFrame with at least 'id', 'prompt', 'response_a', 'response_b'
            and 'winner' columns.
        max_concurrency: semaphore size bounding in-flight requests (default 500,
            unchanged from before). NOTE(review): aiohttp's default TCPConnector also
            limits simultaneous connections (100 per its docs), which may cap the
            effective concurrency below this value — confirm.

    Returns:
        The whole cache as a DataFrame, or an empty DataFrame if nothing was ever saved.
    """
    os.makedirs('wsdm', exist_ok=True)
    cache_path = 'wsdm/semisynthetic_train.parquet'

    if os.path.exists(cache_path):
        cached_df = pd.read_parquet(cache_path)
        processed_ids = set(cached_df['id'].tolist())
        print(f"Loaded {len(processed_ids)} cached entries")
    else:
        processed_ids = set()

    semaphore = asyncio.Semaphore(max_concurrency)

    async with aiohttp.ClientSession() as session:
        pbar = tqdm_asyncio(total=len(train_df))
        try:
            tasks = []
            skipped = 0
            for i in range(len(train_df)):
                row = train_df.iloc[i]
                if row['id'] in processed_ids:
                    skipped += 1
                else:
                    tasks.append(process_task(i, row, session, semaphore, pbar))
            # Start the bar at the number of train_df rows actually skipped. The cache size
            # can differ (ids absent from train_df, or ids repeated in train_df).
            pbar.update(skipped)

            await asyncio.gather(*tasks)
        finally:
            # Flush leftovers even if the run is cancelled or fails, so completed rows are
            # not lost; save_results has no await, so it finishes without being interrupted.
            if results_buffer:
                await save_results(cache_path)
            pbar.close()

    # If nothing was ever saved (e.g. every request failed), there is no cache to read.
    return pd.read_parquet(cache_path) if os.path.exists(cache_path) else pd.DataFrame()

result_df = await main(train_df)

Loaded 41370 cached entries


100%|██████████| 48439/48439 [31:15<00:00, 25.83it/s]  


In [4]:
semisynthetic_train_df = pd.read_parquet("wsdm/semisynthetic_train.parquet")

# Drop only the pipeline's own failure markers ("Error: <msg>" from process_task,
# "Failed to parse response" / "Failed to extract rationale" from parsing). Those ids then
# disappear from the cache and are regenerated on the next run of main(). A plain substring
# search for "Error|Failed" would also discard valid rationales that merely mention words
# such as "TypeError" or "Failed".
failure_marker = r"Error: |Failed to parse response$|Failed to extract rationale$"
is_failure = semisynthetic_train_df['rationale'].str.match(failure_marker, na=False)
semisynthetic_train_df = semisynthetic_train_df[~is_failure]
semisynthetic_train_df.to_parquet("wsdm/semisynthetic_train.parquet")