In [None]:
%pip install openai pydantic nest-asyncio  upgrade

In [None]:
import nest_asyncio

# Patch asyncio so an event loop can be re-entered (the notebook kernel
# typically already runs one). Merely referencing the module does nothing;
# apply() is what installs the patch.
nest_asyncio.apply()

In [11]:
import asyncio
from typing import List
from pydantic import BaseModel, Field
from openai import AsyncOpenAI

In [4]:
import getpass
import os

def _set_env(var: str):
    """Ensure environment variable ``var`` is populated.

    If ``var`` is already set to a non-empty value it is left untouched;
    otherwise the user is prompted for it with echo disabled (getpass) and
    the answer is stored in ``os.environ``.
    """
    if os.environ.get(var):
        return
    os.environ[var] = getpass.getpass(f"{var}: ")

# Prompt for the OpenAI API key (input hidden via getpass) only when
# OPENAI_API_KEY is not already set to a non-empty value.
_set_env("OPENAI_API_KEY")

In [7]:
# Chat model used by the orchestrator, worker and aggregator calls by default.
MODEL = "gpt-4o-mini"
# Shared async OpenAI client; presumably picks up OPENAI_API_KEY from the
# environment (set by the previous cell) — confirm against the SDK docs.
client = AsyncOpenAI()

In [54]:
# One unit of work from the orchestrator's plan; each one is sent to its own
# worker call (see orchestrator_worker_flow).
class SubTask(BaseModel):
    # Short title of the subtask (required).
    name: str = Field(default=..., description="The name of the subtask")
    # Detailed instructions for the worker (required).
    description: str = Field(default=..., description="The description of the subtask")

# The orchestrator's plan: a one-line objective plus the subtasks to solve it.
class OrchestratorOutput(BaseModel):
    # Summary of the overall coding task (required).
    objective: str = Field(default=..., description="Summary of the coding task")
    # Ordered subtasks; each is handed to a separate worker (required).
    subtasks: list[SubTask] = Field(default=..., description="List of subtasks to solve the coding task")


In [55]:
# Prompt templates for the three stages of the orchestrator-worker flow.
# Each is filled with str.format(), so any literal brace added to these
# strings must be doubled ("{{" / "}}").

# Orchestrator: turns the problem statement into an objective + subtasks.
# Placeholder: {problem}
ORCHESTRATOR_PROMPT = """
You are a skilled software engineer.
Read the coding problem and break it down into subtasks in JSON format.

Problem:
{problem}

1. Summarize the objective of the task.
2. List the subtasks needed to solve the task.
3. Provide your answer in JSON format with fields:
    - objective: str
    - subtasks: List[SubTask], where each subtask includes:
        - name: str (a short title for the subtask)
        - description: str (a detailed description of the subtask)
"""

# Worker: writes code for a single subtask.
# Placeholders: {name}, {description}
WORKER_PROMPT = """
You are a skilled software engineer.
Read the subtask and generate code to solve the subtask.
Subtask name : {name}
Subtask description : {description}

Return only the code. Do not include any other text or comments.
Make sure that the code is valid Python code.
"""

# Aggregator: merges the workers' snippets into one program.
# Placeholder: {subtasks_code}
AGGREGATOR_PROMPT = """
You are an experienced integrator of code.
We have code snippets from different sub-tasks.
Your job is to combine the code snippets into a complete solution.
Subtasks Code:
{subtasks_code}

Return only the complete code. Do not include any other text or comments.
Do not wrap the code in Markdown code fences.
"""

In [69]:
import json

async def call_orchestrator(problem: str, model: str = MODEL) -> OrchestratorOutput:
    """Ask the model to split ``problem`` into an objective and a list of subtasks.

    Uses structured outputs (``response_format=OrchestratorOutput``) so the SDK
    parses and validates the reply against the Pydantic model, instead of
    hand-stripping Markdown fences and calling ``json.loads`` on free text.

    Args:
        problem: Natural-language description of the coding task.
        model: Chat model name; defaults to ``MODEL``.

    Returns:
        The validated ``OrchestratorOutput``.

    Raises:
        ValueError: if the model refuses or no parsed output is returned.
    """
    prompt = ORCHESTRATOR_PROMPT.format(problem=problem)

    response = await client.beta.chat.completions.parse(
        model=model,
        messages=[{"role": "user", "content": prompt}],
        response_format=OrchestratorOutput,
    )

    message = response.choices[0].message
    if message.parsed is None:
        # A refusal (or an empty reply) leaves ``parsed`` unset; report what we got.
        raise ValueError(
            f"Orchestrator returned no structured output. Refusal: {message.refusal!r}\n"
            f"Content:\n{message.content}"
        )
    return message.parsed

def _strip_code_fences(text) -> str:
    """Return ``text`` without a surrounding Markdown code fence.

    Models frequently answer as ```python ... ``` even when told to return
    only code. ``None`` or empty input (the API may return no content)
    becomes ``""``.
    """
    if not text:
        return ""
    body = text.strip()
    if not body.startswith("```"):
        return body
    # Drop the opening fence line, including any language tag such as "python".
    newline_index = body.find("\n")
    body = body[newline_index + 1:] if newline_index != -1 else ""
    body = body.rstrip()
    if body.endswith("```"):
        body = body[: -len("```")]
    return body.strip()


async def call_worker(name: str, description: str, model: str = MODEL) -> str:
    """Generate code for a single subtask.

    Args:
        name: Short title of the subtask.
        description: Detailed description of what the code must do.
        model: Chat model name; defaults to ``MODEL``.

    Returns:
        The model's code with any Markdown fence removed ("" if the reply is empty).
    """
    prompt = WORKER_PROMPT.format(name=name, description=description)
    response = await client.chat.completions.create(
        model=model,
        messages=[{"role": "user", "content": prompt}],
    )
    return _strip_code_fences(response.choices[0].message.content)


async def call_aggregator(subtasks_code: str, model: str = MODEL) -> str:
    """Merge the workers' code snippets into one complete program.

    Args:
        subtasks_code: All subtask snippets combined into a single string.
        model: Chat model name; defaults to ``MODEL``.

    Returns:
        The integrated code with any Markdown fence removed ("" if the reply is empty).
    """
    prompt = AGGREGATOR_PROMPT.format(subtasks_code=subtasks_code)
    response = await client.chat.completions.create(
        model=model,
        messages=[{"role": "user", "content": prompt}],
    )
    return _strip_code_fences(response.choices[0].message.content)

In [64]:
async def orchestrator_worker_flow(problem: str, model: str = MODEL) -> str:
    """Run the orchestrator -> parallel workers -> aggregator pipeline.

    1. The orchestrator splits ``problem`` into an objective and subtasks.
    2. One worker call per subtask runs concurrently via ``asyncio.gather``.
    3. The aggregator merges the labelled snippets into a single program.

    Args:
        problem: Natural-language description of the coding task.
        model: Chat model name used for every stage; defaults to ``MODEL``.

    Returns:
        The aggregator's final code.
    """
    orchestrator_output = await call_orchestrator(problem, model)
    subtasks = orchestrator_output.subtasks

    # gather() returns results in input order, so snippets line up with subtasks.
    snippets = await asyncio.gather(
        *(call_worker(subtask.name, subtask.description, model) for subtask in subtasks)
    )

    # call_aggregator expects a single string. Passing the list directly would
    # embed its repr (quoted strings with escaped "\n") into the prompt.
    sections = [f"# Objective: {orchestrator_output.objective}"]
    sections.extend(
        f"# Subtask {index}: {subtask.name}\n{snippet or ''}"
        for index, (subtask, snippet) in enumerate(zip(subtasks, snippets), start=1)
    )
    subtasks_code = "\n\n".join(sections)

    return await call_aggregator(subtasks_code, model)

In [65]:
# Example task used when main() is called without an explicit problem.
EXAMPLE_PROBLEM = (
    "Create a Python script that reads a CSV file, processes the data and outputs "
    "a summary in JSON format. This solution should handle missing values "
    "gracefully and highlight data anomalies."
)


async def main(problem: str = EXAMPLE_PROBLEM) -> str:
    """Run the orchestrator-worker flow on ``problem``, print the code, and return it.

    Args:
        problem: Coding task to solve; defaults to ``EXAMPLE_PROBLEM``.

    Returns:
        The final generated code (also printed).
    """
    final_code = await orchestrator_worker_flow(problem)
    print(final_code)
    return final_code

In [70]:
import asyncio
from concurrent.futures import ThreadPoolExecutor


def run_main():
    """Run ``main()`` to completion on a fresh event loop via ``asyncio.run``."""
    asyncio.run(main())


if __name__ == "__main__":
    # asyncio.run() refuses to start while another event loop is running in the
    # same thread (as is typical inside a notebook kernel), so run it on a
    # worker thread. Unlike a bare threading.Thread, Future.result() re-raises
    # any exception from main() here, so a failed run fails this cell instead
    # of only printing a traceback from the background thread.
    with ThreadPoolExecutor(max_workers=1) as executor:
        executor.submit(run_main).result()

```python
import csv
import json
import os
import numpy as np
import pandas as pd
import unittest

def read_csv_file(file_path, encoding=None):
    """Read a CSV file into a list of rows.

    Args:
        file_path: Path of the CSV file to read.
        encoding: Text encoding passed to ``open``; ``None`` keeps the
            platform default (the previous behavior).

    Returns:
        list[list[str]]: every row produced by ``csv.reader``, header included.
    """
    # newline='' is required by the csv module: quoted fields that contain
    # line breaks (and \r\n line endings) are otherwise mangled.
    with open(file_path, mode='r', newline='', encoding=encoding) as file:
        return list(csv.reader(file))

def handle_missing_values(df, strategy='fill', fill_value=0):
    """Return a copy of ``df`` with missing values handled by ``strategy``.

    Strategies:
        'fill'        -> ``df.fillna(fill_value)``
        'remove'      -> ``df.dropna()``
        'interpolate' -> ``df.interpolate()``

    Raises:
        ValueError: for any other strategy.
    """
    if strategy not in ('fill', 'remove', 'interpolate'):
        raise ValueError("Invalid strategy. Use 'fill', 'remove', or 'interpolate'.")
    if strategy == 'remove':
        return df.dropna()
    if strategy == 'interpolate':
        return df.interpolate()
    return df.fillna(fill_value)

def identify_anomalies(data, column, threshold=1.5):
    """Return the rows of ``data`` whose ``column`` value is an IQR outlier.

    A row is flagged when its value is strictly below
    ``Q1 - threshold * IQR`` or strictly above ``Q3 + threshold * IQR``,
    where ``IQR = Q3 - Q1``. Rows with a missing value are never flagged,
    because comparisons against NaN evaluate to False.
    """
    values = data[column]
    first_quartile = values.quantile(0.25)
    third_quartile = values.quantile(0.75)
    margin = threshold * (third_quartile - first_quartile)
    is_outlier = (values < first_quartile - margin) | (values > third_quartile + margin)
    return data[is_outlier]

def summarize_