In [276]:
#%pip install openai
#%pip install python-dotenv
%pip install pyjson5
import os
import json
import shutil
import asyncio
import logging
from pprint import pprint, PrettyPrinter
from typing import Dict, Any, Optional, List
from openai import AsyncOpenAI
from dotenv import load_dotenv
from copy import deepcopy
from pathlib import Path
pp = PrettyPrinter(
    indent=2,
    width=1,        # Force maximum line breaks
    depth=None,     # No depth limit
    compact=False,  # Items on separate lines
    sort_dicts=False # Preserve dictionary order
)
load_dotenv()
client = AsyncOpenAI(
  organization=os.getenv("OPENAI_ORGANIZATION"),
  project=os.getenv("OPENAI_PROJECT")
)

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

#current_directory = os.getcwd('../')
#plan_name = "-".join( (parts := current_directory.split('/'))[parts.index("plans"):] )


Note: you may need to restart the kernel to use updated packages.


In [277]:
def build_nested_structure(root_path: str, parent_id: str = None, is_root: bool = True) -> dict:
    root = Path(root_path)
    base_name = root.name
    
    # For root node, just use base_name
    # For first level children, just use their base_name
    # For deeper children, use the full lineage starting from first child
    if is_root:
        current_id = base_name
    elif parent_id is None:
        current_id = base_name
    else:
        current_id = f"{parent_id}-{base_name}"
    
    structure = {"id": current_id}
    
    index_path = root / "index.json"
    if index_path.exists():
        try:
            with open(index_path) as f:
                index_data = json.load(f)
                structure.update(index_data)
        except (json.JSONDecodeError, Exception) as e:
            print(f"Warning: Error with index.json in {root_path}: {str(e)}")
    
    subdirs = [d for d in root.iterdir() if d.is_dir()]
    if subdirs:
        structure["pods"] = {"items": []}
        sub_structures = []
        for subdir in subdirs:
            # If this is root, don't pass parent_id
            # If this is first level, pass just the base_name as parent_id
            # Otherwise, pass the full current_id
            next_parent_id = None if is_root else current_id
            sub_structure = build_nested_structure(
                str(subdir), 
                parent_id=next_parent_id, 
                is_root=False
            )
            sub_structures.append(sub_structure)
        
        sorted_structures = sorted(sub_structures, key=lambda x: x.get('order', 999999))
        structure["pods"]["items"] = sorted_structures
            
    return structure

In [278]:
class PodPad:
    def __init__(self, root_path: str):
        self.root_path = Path(root_path)
        self.structure = self._load_structure()

    def _load_structure(self) -> dict:
        """Reuse existing build_nested_structure function"""
        return build_nested_structure(str(self.root_path))

    async def get_pod(self, id: str) -> Optional[dict]:
        def find_pod(structure: dict, target_id: str) -> Optional[dict]:
            if structure['id'] == target_id:
                return deepcopy(structure)
            
            if 'pods' in structure and 'items' in structure['pods']:
                for item in structure['pods']['items']:
                    result = find_pod(item, target_id)
                    if result:
                        return result
            return None

        return find_pod(self.structure, id)

    async def update_pod(self, id: str, data: dict) -> dict:
        """
        Updates or creates a pod with given id and data.
        Creates all necessary parent directories and their index.json files.
        """
        # Ensure the pod path exists
        await self._ensure_pod_path(id)
        
        # Add id to the data being written
        pod_data = {"id": id, **data}
        
        # Update the index.json file
        pod_path = self._get_pod_path(id)
        index_path = pod_path / 'index.json'
        
        await self._write_json(index_path, pod_data)
        
        # Reload structure and return updated branch
        self.structure = self._load_structure()
        return await self.get_pod(id)
    
    async def delete_pod(self, id: str) -> bool:
        pod_path = self._get_pod_path(id)
        if pod_path.exists():
            shutil.rmtree(pod_path)
            self.structure = self._load_structure()
            return True
        return False

    async def reload(self) -> dict:
        self.structure = self._load_structure()
        return deepcopy(self.structure)

    def _get_pod_path(self, id: str) -> Path:
        """Convert pod ID to filesystem path"""
        parts = id.split('-')
        return self.root_path.joinpath(*parts)

    async def _ensure_pod_path(self, id: str) -> None:
        """Create directories and empty index.json files for pod path"""
        parts = id.split('-')
        current_path = self.root_path
        
        for part in parts:
            current_path = current_path / part
            current_path.mkdir(exist_ok=True)
            
            index_path = current_path / 'index.json'
            if not index_path.exists():
                await self._write_json(index_path, {})

    async def _write_json(self, path: Path, data: dict) -> None:
        """Write JSON data to file asynchronously"""
        def _write():
            with open(path, 'w') as f:
                json.dump(data, f, indent=2)
        
        await asyncio.get_event_loop().run_in_executor(None, _write)




# Example usage:
# async def main():
#     pod_pad = PodPad("podpad")
    
#     # Get a pod
#     pod = await pod_pad.get_pod("schema-plan-section")
    
#     # Update a pod
#     updated = await pod_pad.update_pod("schema-plan-section-new", {
#         "desc": "New Section",
#         "order": 1
#     })
    
#     # Delete a pod
#     deleted = await pod_pad.delete_pod("schema-plan-section-old")
    
#     # Reload structure
#     structure = await pod_pad.reload()

In [279]:
async def initialize_openai_agents(client: AsyncOpenAI, payload: Dict[str, Any]) -> Dict[str, Any]:
    """
    Initialize OpenAI assistants based on provided configuration payload.
    
    Args:
        client: AsyncOpenAI client instance
        payload (dict): Configuration payload containing assistant definitions
        
    Returns:
        dict: Original payload with updated assistant information
    """
    # List existing assistants
    existing_assistants = {
        assistant.name: assistant 
        for assistant in (await client.beta.assistants.list()).data
    }
    
    # Process each assistant configuration
    for item in payload["pods"]["items"]:
        # Extract model and description from tags
        model = next(
            (tag["value"] for tag in item["tags"] if tag["key"] == "model"),
            "gpt-4"  # Default model if not specified
        )
        description = next(
            (tag["value"] for tag in item["tags"] if tag["key"] == "description"),
            ""  # Default empty description if not specified
        )
        
        # Build assistant configuration
        assistant_config = {
            "name": item["id"],
            "instructions": item["string"],
            "model": model,
            "description": description,
            "temperature": item["amount"],
        }
        
        # Add response format if specified
        if "meta" in item and "response_format" in item["meta"]:
            assistant_config["response_format"] = item["meta"]["response_format"]
            
        # Check if assistant exists
        if item["id"] in existing_assistants:
            existing_assistant = existing_assistants[item["id"]]
            
            # Compare existing assistant with new configuration
            needs_update = (
                existing_assistant.instructions != assistant_config["instructions"] or
                existing_assistant.model != assistant_config["model"] or
                existing_assistant.description != assistant_config["description"] or
                abs(existing_assistant.temperature - assistant_config["temperature"]) > 0.001
            )
            
            if "response_format" in assistant_config:
                needs_update = needs_update or (
                    getattr(existing_assistant, "response_format", None) != 
                    assistant_config["response_format"]
                )
            
            if needs_update:
                assistant = await client.beta.assistants.update(
                    existing_assistant.id,
                    **assistant_config
                )
            else:
                assistant = existing_assistant
        else:
            # Create new assistant
            assistant = await client.beta.assistants.create(**assistant_config)
        
        # Update the item in the payload with the assistant ID
        item["desc"] = assistant.id
        
        # Ensure tags contain updated information
        existing_tags = {tag["key"]: tag for tag in item["tags"]}
        
        if "model" not in existing_tags:
            item["tags"].append({"key": "model", "value": assistant.model})
        else:
            existing_tags["model"]["value"] = assistant.model
            
        if "description" not in existing_tags:
            item["tags"].append({"key": "description", "value": assistant.description})
        else:
            existing_tags["description"]["value"] = assistant.description
    
    return payload


# Example usage:
async def main():
    client = OpenAI()
    payload = {
        "id": "schema-agent-editor",
        "pods": {
            "items": [
                {
                    "id": "schema-agent-editor-researcher",
                    "string": "...",
                    "order": 1,
                    "amount": 0.7,
                    "tags": [
                        {"key": "description", "value": "Researcher"},
                        {"key": "model", "value": "gpt-4o-mini"}
                    ],
                    "meta": {"response_format": {"type": "json_object"}}
                }
            ]
        }
    }
    
    updated_payload = await initialize_openai_agents(client, payload)
    print("Assistants initialized and payload updated")

In [280]:
async def manage_thread(client, pod):
    """
    Manages OpenAI threads based on the provided pod configuration.
    
    Args:
        client: OpenAI client object
        pod (dict): Dictionary containing pod configuration with at least 'id' field
                   and optionally a 'desc' field containing thread ID
    
    Returns:
        dict: Modified pod dictionary with thread ID stored in 'desc' field
    """
    # If desc is not present, create a new thread
    if 'desc' not in pod:
        # Create a new thread using the OpenAI client
        thread = await client.beta.threads.create()
        
        # Store the thread ID in the desc field
        pod['desc'] = thread.id
    
    return pod

# Example usage
from openai import OpenAI
import asyncio

client = OpenAI()
pod = {'id': 'pad-banana'}

# Example usage in an async context
async def main():
    updated_pod = await manage_thread(client, pod)
    print(updated_pod)  # Will show the pod with the thread ID in desc

# Run the async function
asyncio.run(main())

In [281]:
if __name__ == "__main__":
    result = build_nested_structure("podpad")
    print(json.dumps(result, indent=2))

{
  "id": "podpad",
  "pods": {
    "items": [
      {
        "id": "pad",
        "pods": {
          "items": [
            {
              "id": "pad-banana",
              "desc": "thread_def42nqJp4oaR5qs4WXZmL3e",
              "pods": {
                "items": [
                  {
                    "id": "pad-banana-intention",
                    "tags": [
                      {
                        "key": "context-location-geographic_position",
                        "value": "Property located in El Zamorano, Honduras, near Escuela Agricola Zamorano, situated in a valley with favorable agricultural climate and average annual temperature of 21\u00b0C",
                        "keywords": [
                          "company_overview",
                          "operations_plan"
                        ]
                      },
                      {
                        "key": "context-location-market_access",
                        "value": "Strategic position w

In [282]:
pod_pad = PodPad("podpad")
structure = await pod_pad.reload()

In [283]:
pod_agents = await pod_pad.get_pod("schema-agent-editor")

In [284]:
print (json.dumps(pod, indent=1))

{
 "id": "schema-agent-editor",
 "pods": {
  "items": [
   {
    "id": "schema-agent-editor-researcher",
    "string": "The Researcher agent is responsible for gathering accurate quantitative and qualitative data from reputable academic databases, peer-reviewed journals, scientific publications, technical documentation, and established news sources, while providing clear attribution and maintaining data provenance; they leverage their model training to identify relevant information and patterns but must explicitly distinguish between factual data and model-derived insights; safety guardrails include: verifying source credibility through cross-referencing, avoiding potential misinformation/disinformation sources, flagging data quality issues, maintaining version control of gathered information, respecting copyright and data privacy regulations, declining requests for harmful/illegal information, and alerting human oversight when encountering ethically ambiguous queries; source priority 

In [285]:
updated_payload = await initialize_openai_agents(client, pod_agents)
print("Assistants initialized and payload updated", json.dumps(updated_payload, indent=1))

INFO:httpx:HTTP Request: GET https://api.openai.com/v1/assistants "HTTP/1.1 500 Internal Server Error"
INFO:openai._base_client:Retrying request to /assistants in 0.417677 seconds
INFO:httpx:HTTP Request: GET https://api.openai.com/v1/assistants "HTTP/1.1 200 OK"
INFO:httpx:HTTP Request: POST https://api.openai.com/v1/assistants/asst_sVHUg54CtuUHp0mqOhxY5VOm "HTTP/1.1 200 OK"
INFO:httpx:HTTP Request: POST https://api.openai.com/v1/assistants/asst_RtNakCFeDqbA9szPBb7Ju4Rq "HTTP/1.1 200 OK"


Assistants initialized and payload updated {
 "id": "schema-agent-editor",
 "pods": {
  "items": [
   {
    "id": "schema-agent-editor-researcher",
    "string": "The Researcher agent is responsible for gathering accurate quantitative and qualitative data from reputable academic databases, peer-reviewed journals, scientific publications, technical documentation, and established news sources, while providing clear attribution and maintaining data provenance; they leverage their model training to identify relevant information and patterns but must explicitly distinguish between factual data and model-derived insights; safety guardrails include: verifying source credibility through cross-referencing, avoiding potential misinformation/disinformation sources, flagging data quality issues, maintaining version control of gathered information, respecting copyright and data privacy regulations, declining requests for harmful/illegal information, and alerting human oversight when encountering et

In [286]:
for updated_pod in updated_payload["pods"]["items"]:
    print (updated_pod)
    updated = await pod_pad.update_pod(updated_pod["id"], updated_pod)
    

{'id': 'schema-agent-editor-researcher', 'string': 'The Researcher agent is responsible for gathering accurate quantitative and qualitative data from reputable academic databases, peer-reviewed journals, scientific publications, technical documentation, and established news sources, while providing clear attribution and maintaining data provenance; they leverage their model training to identify relevant information and patterns but must explicitly distinguish between factual data and model-derived insights; safety guardrails include: verifying source credibility through cross-referencing, avoiding potential misinformation/disinformation sources, flagging data quality issues, maintaining version control of gathered information, respecting copyright and data privacy regulations, declining requests for harmful/illegal information, and alerting human oversight when encountering ethically ambiguous queries; source priority order: peer-reviewed research, government/institutional databases, t

In [287]:
structure = await pod_pad.reload()
pod_thread = await pod_pad.get_pod("pad-banana")
print(pod_thread)

{'id': 'pad-banana', 'desc': 'thread_def42nqJp4oaR5qs4WXZmL3e', 'pods': {'items': [{'id': 'pad-banana-intention', 'tags': [{'key': 'context-location-geographic_position', 'value': 'Property located in El Zamorano, Honduras, near Escuela Agricola Zamorano, situated in a valley with favorable agricultural climate and average annual temperature of 21°C', 'keywords': ['company_overview', 'operations_plan']}, {'key': 'context-location-market_access', 'value': 'Strategic position with access to Danli (45 minute drive) and Tegucigalpa (1 hour drive) markets, connected via well-maintained highway infrastructure', 'keywords': ['market_analysis', 'operations_plan']}, {'key': 'context-property-land_distribution', 'value': 'Total land area of 12.6 hectares, with 0.5 hectares of arable land identified for agricultural development, featuring gentle slope and good drainage characteristics', 'keywords': ['company_overview', 'operations_plan']}, {'key': 'context-property-forest_assets', 'value': 'Subst

In [288]:
structure = await pod_pad.reload()
updated_pod = await manage_thread(client, pod_thread)
print(updated_pod)  # Will show the pod with the thread ID in desc
updated_file = await pod_pad.update_pod(updated_pod["id"], updated_pod)
structure = await pod_pad.reload()

{'id': 'pad-banana', 'desc': 'thread_def42nqJp4oaR5qs4WXZmL3e', 'pods': {'items': [{'id': 'pad-banana-intention', 'tags': [{'key': 'context-location-geographic_position', 'value': 'Property located in El Zamorano, Honduras, near Escuela Agricola Zamorano, situated in a valley with favorable agricultural climate and average annual temperature of 21°C', 'keywords': ['company_overview', 'operations_plan']}, {'key': 'context-location-market_access', 'value': 'Strategic position with access to Danli (45 minute drive) and Tegucigalpa (1 hour drive) markets, connected via well-maintained highway infrastructure', 'keywords': ['market_analysis', 'operations_plan']}, {'key': 'context-property-land_distribution', 'value': 'Total land area of 12.6 hectares, with 0.5 hectares of arable land identified for agricultural development, featuring gentle slope and good drainage characteristics', 'keywords': ['company_overview', 'operations_plan']}, {'key': 'context-property-forest_assets', 'value': 'Subst

In [289]:
structure = await pod_pad.reload()
#print (json.dumps(await pod_pad.get_pod("pad-banana-intention"), indent=1))

In [378]:
async def check_thread_status(client: Any, thread_id: str) -> Dict[str, Any]:
    runs = await client.beta.threads.runs.list(thread_id=thread_id)
    return runs.data

async def poll_run(client: Any, thread_id: str, run_id: str) -> Dict[str, Any]:
    while True:
        steps = await client.beta.threads.runs.steps.list(
            thread_id=thread_id,
            run_id=run_id
        )
        last_step = steps.data[0] if steps.data else None
        
        if not last_step:
            logger.info(f"No steps found for run {run_id}")
            return None
            
        status = last_step.status
        logger.info(f"Run {run_id} status: {status}")
        
        if status in ["completed", "failed", "cancelled"]:
            return last_step
            
        await asyncio.sleep(1)

async def run_assistant(
    client: Any,
    pod_pad: Any,
    pod_banana: Dict[str, Any],
    schema: Dict[str, Any],
    agent_schema: Dict[str, Any]
) -> None:
    try:
        thread_id = pod_banana.get('desc')
        if not thread_id:
            raise ValueError("Thread ID not found in pod_banana desc")
            
        logger.info(f"Checking thread status for {thread_id}")
        existing_runs = await check_thread_status(client, thread_id)
        
        sections = schema.get('pods', {}).get('items', [])
        active_sections = [s for s in sections if s.get('status') == 'ACTIVE']
        logger.info(f"Found {len(active_sections)} active sections")

        if not active_sections:
            logger.warning("No active sections found")
            return

        current_number = 1
        plan_base_id = f"{pod_banana.get('id')}-plan-{current_number}"

        writer_agent = next(
            (a for a in agent_schema.get('pods', {}).get('items', [])
             if a.get('status') == 'ACTIVE'),
            None
        )

        if not writer_agent:
            raise ValueError("Writer agent not found")

        for section in active_sections:
            section_name = section.get('id', '').split('-')[-1]
            section_pod_id = f"{plan_base_id}-{section_name}"
            
            existing_pod = await pod_pad.get_pod(section_pod_id)
            if existing_pod and existing_pod.get('desc'):
                logger.info(f"Section {section_name} already has run {existing_pod['desc']}")
                continue

            run_payload = await assemble_run_payload(section, writer_agent, pod_banana)
            print("run_payload", run_payload)
            # Create and get run data
            run_response = await client.beta.threads.runs.create(
                thread_id=thread_id,
                **run_payload
            )
            
            # Get run ID from response directly
            logger.info(f"Run response type: {type(run_response)}")
            logger.info(f"Run response: {run_response}")
            
            # For now, return early to debug response structure
            return

            step = await poll_run(client, thread_id, run_id)
            if step and step.status == "completed":
                logger.info(f"Run {run.id} completed successfully")
                logger.info(f"Output: {json.dumps(step.step_details, indent=2)}")
            else:
                logger.error(f"Run {run.id} failed or was cancelled")

    except Exception as e:
        logger.error(f"Error: {str(e)}")
        raise

async def assemble_run_payload(
    section: Dict[str, Any],
    agent: Dict[str, Any],
    pod_banana: Dict[str, Any]
) -> Dict[str, Any]:
    try:
        intention_tags = []
        for pod_item in pod_banana.get('pods', {}).get('items', []):
            if pod_item.get('id', '').endswith('intention'):
                intention_tags = pod_item.get('tags', [])
                break

        instructions = [
            f"You are creating a section of a business plan: {section.get('string', '')}",
            f"{section.get('desc', '')}: {json.dumps(intention_tags)}",
            "Respond in markdown format without wrapper. Use the section name as the first heading starting with ## and use the full range of expression provided by the markdown specification."
        ]
        additional_instructions = "\n".join(instructions)

        model = next((tag['value'] for tag in agent.get('tags', []) if tag['key'] == 'model'), None)
        if not model:
            raise ValueError("Model not found in agent tags")
        
        return {
            "assistant_id": agent.get('desc'),
            "model": model,
            "additional_instructions": additional_instructions,
            "stream": True,
            "response_format": "auto"
        }

    except Exception as e:
        logger.error(f"Error assembling payload: {str(e)}")
        raise

In [379]:
structure = await pod_pad.reload()
# print (json.dumps(await pod_pad.get_pod("pad-banana"), indent=1))
run_pod = await pod_pad.get_pod("pad-banana")
run_agents = await pod_pad.get_pod("schema-agent-editor")
run_sections = await pod_pad.get_pod("schema-plan-section")
a = await run_assistant(client, pod_pad, run_pod, run_sections, run_agents)

INFO:__main__:Checking thread status for thread_def42nqJp4oaR5qs4WXZmL3e
INFO:httpx:HTTP Request: GET https://api.openai.com/v1/threads/thread_def42nqJp4oaR5qs4WXZmL3e/runs "HTTP/1.1 200 OK"
INFO:__main__:Found 2 active sections
INFO:httpx:HTTP Request: POST https://api.openai.com/v1/threads/thread_def42nqJp4oaR5qs4WXZmL3e/runs "HTTP/1.1 200 OK"
INFO:__main__:Run response type: <class 'openai.AsyncStream'>
INFO:__main__:Run response: <openai.AsyncStream object at 0x10c4a9e50>


In [333]:
# updated = await pod_pad.update_pod("schema-plan-section-new", {
#         "desc": "New Section",
#         "order": 1
#     })

In [None]:
# deleted = await pod_pad.delete_pod("schema-plan-section-new")

In [87]:
def initialize_thread(client: OpenAI, name: Optional[str] = None) -> str:
    try:
        # Create new thread with metadata
        thread = client.beta.threads.create(
            metadata={
                "name": name
            } if name else None
        )
        return thread.id
        
    except Exception as e:
        raise Exception(f"Failed to initialize thread: {str(e)}")

In [None]:
thread_id = initialize_thread(client, plan_name)

NameError: name 'plan_name' is not defined

In [20]:
print(thread_id)

thread_ItiqszTHSeEGmsFtlB3zRycf


In [10]:
stream = client.chat.completions.create(
    model="gpt-4o-mini",
    messages=[{"role": "user", "content": "Say this is a test"}],
    stream=True,
)
for chunk in stream:
    if chunk.choices[0].delta.content is not None:
        print(chunk.choices[0].delta.content, end="")

This is a test. How can I assist you today?