In [None]:
import os
import json
import re

from crewai import Agent, Task, Crew
from crewai.tools import BaseTool
from textwrap import dedent
from typing import List, Dict, Any, Optional, Type, Union
from pydantic import BaseModel, Field

from datetime import datetime
from dotenv import load_dotenv

# Default directory for exported .cls files; IRISClassWriter uses it as the
# default value of its ``output_dir`` argument.
OUTPUT_DIR = "/home/irisowner/dev/output"

## Get the LLM provider

In [None]:
# Load settings from a .env file into the process environment before
# get_facilis_llm() reads them with os.getenv().
load_dotenv()

def get_facilis_llm():
    """Build the LLM handle selected by the ``AI_ENGINE`` environment variable.

    Reads ``AI_ENGINE``, ``API_KEY`` and ``LLM_MODEL_NAME`` from the environment
    (plus ``AZURE_ENDPOINT`` and ``AZURE_DEPLOYMENT_NAME`` for Azure), exports the
    API key under the provider-specific variable name and returns the handle.
    The engine name is matched case-insensitively.

    Returns:
        A ``crewai.LLM`` created with ``temperature=0`` for the ``openai``,
        ``anthropic``/``claude``, ``gemini`` and ``ollama`` engines; the Azure
        deployment name (``str``) for ``azureopenai``/``azure_openai``; ``None``
        when ``AI_ENGINE`` is unset or not recognised.

    Raises:
        ValueError: If the selected engine requires ``API_KEY``,
            ``LLM_MODEL_NAME`` or the Azure settings and they are missing.
    """
    ai_engine = (os.getenv("AI_ENGINE") or "").strip().lower()
    api_key = os.getenv("API_KEY")
    model_name = os.getenv("LLM_MODEL_NAME")

    # Engines that need API_KEY re-exported under a provider-specific name.
    api_key_env_vars = {
        "openai": "OPENAI_API_KEY",
        "anthropic": "ANTHROPIC_API_KEY",
        "claude": "ANTHROPIC_API_KEY",
        "gemini": "GOOGLE_API_KEY",
    }

    def _export_api_key(env_var):
        # os.environ only accepts strings; fail with a clear message instead of
        # the bare TypeError that assigning None would raise.
        if not api_key:
            raise ValueError(f"API_KEY must be set in .env when AI_ENGINE is '{ai_engine}'")
        os.environ[env_var] = api_key

    def _build_llm():
        if not model_name:
            raise ValueError(f"LLM_MODEL_NAME must be set in .env when AI_ENGINE is '{ai_engine}'")
        # Imported here so code paths that never build an LLM do not need it.
        from crewai import LLM
        return LLM(model=model_name, temperature=0)

    if ai_engine in ("azureopenai", "azure_openai"):
        azure_endpoint = os.getenv("AZURE_ENDPOINT")
        azure_deployment_name = os.getenv("AZURE_DEPLOYMENT_NAME")
        if not azure_endpoint or not azure_deployment_name:
            raise ValueError("Azure OpenAI requires AZURE_ENDPOINT and AZURE_DEPLOYMENT_NAME in .env")
        _export_api_key("AZURE_OPENAI_API_KEY")
        os.environ["AZURE_OPENAI_ENDPOINT"] = azure_endpoint
        return azure_deployment_name  # Azure callers receive the deployment name, not an LLM object

    if ai_engine in api_key_env_vars:
        _export_api_key(api_key_env_vars[ai_engine])
        return _build_llm()

    if ai_engine == "ollama":
        # No API key is exported for ollama.
        return _build_llm()

    return None


## Prepare the Crew

### Tools

In [None]:
class ProductionData:
    """In-memory registry of productions and the endpoints recorded for each."""

    def __init__(self):
        # production name -> {'namespace', 'created_at', 'endpoints'}
        self.productions = {}

    def add_production(self, name: str, namespace: str):
        """Register ``name`` with its namespace; a name already present is left untouched."""
        if name in self.productions:
            return
        record = {}
        record['namespace'] = namespace
        record['created_at'] = datetime.now().isoformat()
        record['endpoints'] = []
        self.productions[name] = record

    def add_endpoint(self, production_name: str, endpoint_data: Dict):
        """Append ``endpoint_data`` to a known production; unknown names are ignored."""
        if production_name not in self.productions:
            return
        endpoints = self.productions[production_name]['endpoints']
        endpoints.append(endpoint_data)

    def production_exists(self, name: str) -> bool:
        """Return True when ``name`` has been registered."""
        return name in self.productions

class OpenAPITransformer:
    """Static helpers that turn collected endpoint data into OpenAPI 3.0 fragments."""

    @staticmethod
    def create_openapi_base(info: Dict) -> Dict:
        """Return an OpenAPI 3.0.0 skeleton with empty paths and components.

        ``info`` supplies ``production_name`` (used as the title, falling back
        to "API Documentation") and ``namespace`` (used in the description).
        """
        description = f"API documentation for {info.get('production_name')} in {info.get('namespace')}"
        info_section = {
            "title": info.get("production_name", "API Documentation"),
            "version": "1.0.0",
            "description": description
        }
        components = {"schemas": {}, "securitySchemes": {}}
        return {
            "openapi": "3.0.0",
            "info": info_section,
            "paths": {},
            "components": components
        }

    @staticmethod
    def create_path_item(endpoint_spec: Dict) -> Dict:
        """Build a single-method path item from an endpoint specification.

        Every entry of ``params`` becomes an optional string query parameter;
        ``json_model`` becomes the required JSON request body for POST, PUT
        and PATCH.
        """
        method = endpoint_spec["HTTP_Method"].lower()

        query_parameters = [
            {
                "name": param,
                "in": "query",
                "required": False,
                "schema": {"type": "string"}
            }
            for param in (endpoint_spec.get("params") or [])
        ]

        operation = {
            "summary": endpoint_spec.get("description", ""),
            "parameters": query_parameters,
            "responses": {"200": {"description": "Successful response"}}
        }

        accepts_body = method in ("post", "put", "patch")
        if accepts_body and endpoint_spec.get("json_model"):
            json_content = {"application/json": {"schema": endpoint_spec["json_model"]}}
            operation["requestBody"] = {"required": True, "content": json_content}

        return {method: operation}


def extract_json_from_markdown(markdown_text: str) -> str:
    """Extract a JSON document from text that may wrap it in markdown.

    The input is converted with ``str()`` and these strategies are tried in order:

    1. The whole text is valid JSON: return it unchanged.
    2. A ```json fenced block is present: return its stripped content
       (returned as-is, without validation).
    3. A plain ``` fenced block is present and its content is valid JSON:
       return that content.
    4. The span from the first ``{`` to the last ``}`` is valid JSON: return it.
    5. No brace-delimited span exists: return ``{"response": <text>}``.

    Args:
        markdown_text: Text (or any object convertible with ``str()``) to scan.

    Returns:
        str: The extracted JSON text, or a JSON object with ``error`` and
        ``raw_response`` keys when a brace-delimited span exists but is not
        valid JSON (or any other unexpected failure occurs).
    """
    # Bound before the try so the error handler can always reference it.
    text = ""
    try:
        text = str(markdown_text)

        # 1. Already valid JSON.
        try:
            json.loads(text)
            return text
        except json.JSONDecodeError:
            pass

        # 2. Explicit ```json fence: take everything up to the closing fence.
        if '```json' in text:
            after_marker = text.split('```json')[1]
            return after_marker.split('```')[0].strip()

        # 3. Generic ``` fence: only accept it when the content parses.
        if '```' in text:
            potential_json = text.split('```')[1].strip()
            try:
                json.loads(potential_json)
                return potential_json
            except json.JSONDecodeError:
                pass

        # 4. Outermost brace-delimited span. rfind() returns -1 when there is
        # no '}', whereas rindex() would raise and skip the fallback below.
        start = text.find('{')
        end = text.rfind('}') + 1
        if start != -1 and end > start:
            json_str = text[start:end]
            json.loads(json_str)  # invalid content is reported by the handler below
            return json_str

        # 5. Nothing JSON-like found: wrap the raw text.
        return json.dumps({"response": text})

    except Exception:
        return json.dumps({"error": "Failed to parse response", "raw_response": text})

class IRISClassWriter:
    """Singleton that builds InterSystems IRIS interoperability class sources.

    Every ``IRISClassWriter()`` call returns the same instance. Generated
    sources are kept in ``generated_classes`` (class name -> source text) until
    ``export_classes`` writes them to ``output_dir`` as ``.cls`` files.
    """
    # NOTE(review): GenerateBusinessServiceTool in this file calls
    # write_business_service(), which this class does not define. TODO: add it.
    _instance = None
    _initialized = False

    def __new__(cls, *args, **kwargs):
        # Create the shared instance on first use, then keep returning it.
        if cls._instance is None:
            cls._instance = super().__new__(cls)
        return cls._instance

    def __init__(self, output_dir=None):
        """Initialise the shared instance the first time it is constructed.

        Args:
            output_dir (Optional[str]): Export directory. ``None`` (the
                default) means the module-level ``OUTPUT_DIR``, looked up at
                call time. Ignored once the singleton has been initialised.

        Raises:
            RuntimeError: If the directory cannot be created or written to.
        """
        if not IRISClassWriter._initialized:
            if output_dir is None:
                output_dir = OUTPUT_DIR
            self.output_dir = os.path.abspath(output_dir)
            self.generated_classes = {}
            self._ensure_output_directory()
            IRISClassWriter._initialized = True

    def validate_classes(self, class_names: Optional[List[str]] = None) -> Dict[str, Any]:
        """
        Validate the generated IRIS classes

        Args:
            class_names (Optional[List[str]]): Specific class names to validate.
                ``None`` or an empty list validates every generated class.

        Returns:
            dict: ``{class_name: {"valid": bool, "issues": [str, ...]}}``.
            A requested name that was never generated is reported as invalid.
        """
        validation_results = {}

        classes_to_validate = class_names if class_names else list(self.generated_classes.keys())

        for class_name in classes_to_validate:
            if class_name not in self.generated_classes:
                validation_results[class_name] = {
                    "valid": False,
                    "issues": ["Class not found in generated classes"]
                }
                continue

            class_content = self.generated_classes[class_name]
            issues = []

            # Basic structural checks; extend here as needed.
            if "Class " not in class_content:
                issues.append("Missing Class declaration")

            if "Extends " not in class_content:
                issues.append("Missing Extends keyword")

            validation_results[class_name] = {
                "valid": len(issues) == 0,
                "issues": issues
            }

        return validation_results

    def _ensure_output_directory(self):
        """Create ``output_dir`` if needed and confirm it is writable.

        Raises:
            RuntimeError: If the directory cannot be created or the write probe
                fails (the probe's PermissionError is wrapped by the outer handler).
        """
        try:
            os.makedirs(self.output_dir, exist_ok=True)
            # Probe writability by creating and removing a scratch file.
            test_file = os.path.join(self.output_dir, '.write_test')
            try:
                with open(test_file, 'w') as f:
                    f.write('test')
                os.remove(test_file)
            except (IOError, OSError) as e:
                raise PermissionError(f"Output directory {self.output_dir} is not writable: {str(e)}")
        except Exception as e:
            raise RuntimeError(f"Failed to create output directory {self.output_dir}: {str(e)}")

    def write_production_class(self, production_name, components):
        """
        Generate an IRIS Production class

        Args:
            production_name (str): Name of the production
            components (list): Dicts with ``type``, ``name`` and ``class`` keys;
                entries missing any of the three are skipped. ``None`` is
                treated as an empty list.

        Returns:
            str: The generated class content (also stored in ``generated_classes``)
        """
        class_name = self._sanitize_class_name(production_name)

        class_content = f"""Class {class_name} Extends Ens.Production
        {{

        XData ProductionDefinition
        {{
        <Production Name="{class_name}" LogGeneralTraceEvents="false">
        """

        # One element per fully specified component.
        for component in (components or []):
            comp_type = component.get("type", "")
            comp_name = component.get("name", "")
            comp_class = component.get("class", "")

            if comp_type and comp_name and comp_class:
                class_content += f"""  <{comp_type} Name="{comp_name}" Class="{comp_class}">
        </{comp_type}>
        """

        # Close the XData block and the class exactly once, after all
        # components. This is a plain string, so each brace is written singly.
        class_content += """</Production>
        }

        }
        """

        self.generated_classes[class_name] = class_content
        return class_content

    def write_business_operation(self, operation_name, endpoint_info):
        """
        Generate an IRIS Business Operation class

        Args:
            operation_name (str): Name of the business operation
            endpoint_info (dict): Endpoint details; ``method`` (default "GET")
                and ``path`` (default "/") are read.

        Returns:
            str: The generated class content (also stored in ``generated_classes``)
        """
        class_name = self._sanitize_class_name(f"bo{operation_name}")

        method = endpoint_info.get("method", "GET")
        path = endpoint_info.get("path", "/")

        class_content = f"""Class {class_name} Extends Ens.BusinessOperation
        {{

        Parameter ADAPTER = "EnsLib.HTTP.OutboundAdapter";

        Property Adapter As EnsLib.HTTP.OutboundAdapter;

        Parameter INVOCATION = "Queue";

        Method {operation_name}(pRequest As ms{operation_name}, Output pResponse As Ens.Response) As %Status
        {{
            Set tSC = $$$OK
            Try {{
                // Prepare HTTP request
                Set tHttpRequest = ##class(%Net.HttpRequest).%New()
                Set tHttpRequest.ContentType = "application/json"
                
                // Set request path and method
                Set tPath = "{path}"
                Set tMethod = "{method}"
                
                // Convert request message to JSON
                // [Additional logic for request preparation]
                
                // Send the HTTP request
                Set tSC = ..Adapter.SendFormDataArray(.tHttpResponse, tMethod, tPath, tHttpRequest)
                
                // Process response
                If $$$ISOK(tSC) {{
                    // Create response object
                    Set pResponse = ##class(Ens.Response).%New()
                    // Process HTTP response
                }}
            }}
            Catch ex {{
                Set tSC = ex.AsStatus()
            }}
            
            Return tSC
        }}

        XData MessageMap
        {{
        <MapItems>
        <MapItem MessageType="ms{operation_name}">
            <Method>{operation_name}</Method>
        </MapItem>
        </MapItems>
        }}

        }}
        """

        self.generated_classes[class_name] = class_content
        return class_content

    def write_message_class(self, message_name, schema_info):
        """
        Generate an IRIS Message class

        Args:
            message_name (str): Name of the message class
            schema_info (dict): Schema; each entry of its ``properties`` mapping
                becomes one Property declaration.

        Returns:
            str: The generated class content (also stored in ``generated_classes``)
        """
        class_name = self._sanitize_class_name(f"ms{message_name}")

        class_content = f"""Class {class_name} Extends Ens.Request
        {{

        """

        # Add one property per schema entry.
        if isinstance(schema_info, dict) and "properties" in schema_info:
            for prop_name, prop_info in schema_info["properties"].items():
                prop_type = self._map_schema_type_to_iris(prop_info.get("type", "string"))
                class_content += f"Property {prop_name} As {prop_type};\n\n"

        class_content += "}\n"

        self.generated_classes[class_name] = class_content
        return class_content

    def export_classes(self):
        """
        Export all generated classes to .cls files

        Returns:
            dict: A warning dict when nothing was generated; otherwise one entry
            per class with ``status`` ("success" or "error"), ``path`` and
            either ``size`` or ``error``.
        """
        results = {}

        if not self.generated_classes:
            return {"status": "warning", "message": "No classes to export"}

        for class_name, class_content in self.generated_classes.items():
            file_path = os.path.join(self.output_dir, f"{class_name}.cls")

            try:
                # Ensure the directory exists (including package directories)
                os.makedirs(os.path.dirname(file_path), exist_ok=True)

                with open(file_path, 'w', encoding='utf-8') as f:
                    f.write(class_content)

                results[class_name] = {
                    "status": "success",
                    "path": file_path,
                    "size": os.path.getsize(file_path)
                }
            except Exception as e:
                # A failure for one class is recorded; the rest are still exported.
                results[class_name] = {
                    "status": "error",
                    "error": str(e),
                    "path": file_path
                }

        return results

    def _sanitize_class_name(self, name):
        """Return ``name`` restricted to ``[a-zA-Z0-9_]`` and starting with a letter.

        Raises:
            ValueError: If ``name`` is empty or longer than 255 characters.
        """
        if not name:
            raise ValueError("Class name cannot be empty")
        if len(name) > 255:  # Example max length
            raise ValueError("Class name too long")

        # NOTE(review): confirm that underscores are acceptable in IRIS class names.
        name = re.sub(r'[^a-zA-Z0-9_]', '_', name)

        # Ensure the first character is a letter.
        if not name[0].isalpha():
            name = "X" + name

        return name

    def _map_schema_type_to_iris(self, schema_type):
        """Map an OpenAPI schema type to an IRIS type; unknown types become %String."""
        type_mapping = {
            "string": "%String",
            "integer": "%Integer",
            "number": "%Float",
            "boolean": "%Boolean",
            "array": "%Library.ListOfDataTypes",
            "object": "%DynamicObject"
        }

        return type_mapping.get(schema_type, "%String")

def sanitize_filename(name):
    """
    Make a string safe to use as a filename.

    Each of the characters ``< > : " / \\ | ? *`` is replaced with an
    underscore, and a result that begins with a space or a period is
    prefixed with ``x``.

    Args:
        name (str): The input string

    Returns:
        str: A sanitized filename
    """
    replacements = str.maketrans({char: '_' for char in '<>:"/\\|?*'})
    name = name.translate(replacements)

    # A leading space or period gets an 'x' in front of it.
    return 'x' + name if name.startswith((' ', '.')) else name

class OpenAPIParser:
    """Reads an OpenAPI v3 document and summarises its info, endpoints and schemas."""

    def analyze(self, openapi_spec):
        """
        Summarise an OpenAPI specification.

        Args:
            openapi_spec (Union[dict, str]): The specification as a dictionary
                or as a JSON string.

        Returns:
            dict: ``info``, ``endpoints`` and ``schemas`` on success; otherwise
            a single ``error`` entry describing what went wrong.
        """
        spec = openapi_spec
        if isinstance(spec, str):
            try:
                spec = json.loads(spec)
            except json.JSONDecodeError as exc:
                return {"error": f"Invalid JSON string provided: {str(exc)}"}
        elif not isinstance(spec, dict):
            return {"error": "Input must be either a JSON string or a dictionary"}

        try:
            return {
                "info": self._extract_info(spec),
                "endpoints": self._extract_endpoints(spec),
                "schemas": self._extract_schemas(spec)
            }
        except Exception as exc:
            # Any structural surprise in the document is reported, not raised.
            return {"error": f"Analysis failed: {str(exc)}"}

    def _extract_info(self, spec):
        """Return title, version and description from ``info``, with defaults."""
        info = spec.get("info", {})
        title = info.get("title", "Unknown API")
        version = info.get("version", "1.0.0")
        description = info.get("description", "")
        return {"title": title, "version": version, "description": description}

    def _extract_endpoints(self, spec):
        """Return one entry per GET/POST/PUT/DELETE/PATCH operation under ``paths``."""
        supported_methods = ("get", "post", "put", "delete", "patch")
        endpoints = []

        for path, path_item in spec.get("paths", {}).items():
            for method, operation in path_item.items():
                if method not in supported_methods:
                    continue
                # Used when the operation does not declare an operationId.
                fallback_id = f"{method}_{path.replace('/', '_')}"
                endpoints.append({
                    "path": path,
                    "method": method.upper(),
                    "operationId": operation.get("operationId", fallback_id),
                    "summary": operation.get("summary", ""),
                    "description": operation.get("description", ""),
                    "parameters": operation.get("parameters", []),
                    "requestBody": operation.get("requestBody", None),
                    "responses": operation.get("responses", {})
                })

        return endpoints

    def _extract_schemas(self, spec):
        """Return a shallow copy of ``components.schemas``."""
        schemas = spec.get("components", {}).get("schemas", {})
        return dict(schemas.items())


# Argument model referenced by AnalyzeOpenAPITool: a single ``openapi_spec``
# value that may be a JSON string or an already-parsed dictionary.
class AnalyzeOpenAPIToolInput(BaseModel):
    openapi_spec: Union[str, Dict[str, Any]] = Field(
        description="OpenAPI specification as JSON string or dictionary"
    )

class AnalyzeOpenAPITool(BaseTool):
    name: str = "analyze_openapi"
    description: str = "Analyzes an OpenAPI specification and returns structured information"
    # CrewAI's BaseTool takes its argument model from ``args_schema``;
    # ``input_schema`` is kept so existing references to it keep working.
    args_schema: Type[BaseModel] = AnalyzeOpenAPIToolInput
    input_schema: Type[BaseModel] = AnalyzeOpenAPIToolInput

    def _run(self, openapi_spec: Union[str, Dict[str, Any]]) -> str:
        """
        Analyzes an OpenAPI specification and returns structured information

        Args:
            openapi_spec: The OpenAPI specification as a JSON string or dictionary

        Returns:
            str: JSON string with the analysis, or with a single ``error``
            entry when the input is not valid JSON or the analysis fails.
        """
        parser = OpenAPIParser()

        try:
            if isinstance(openapi_spec, str):
                # Parse here so malformed JSON gets this tool's own error message.
                try:
                    spec_dict = json.loads(openapi_spec)
                except json.JSONDecodeError:
                    return json.dumps({"error": "Invalid JSON string provided"})
            else:
                # Already a dictionary: use it directly.
                spec_dict = openapi_spec

            result = parser.analyze(spec_dict)
            return json.dumps(result, indent=2)
        except Exception as e:
            return json.dumps({"error": f"Analysis failed: {str(e)}"})

class GenerateProductionClassTool(BaseTool):
    # NOTE: a second class with this same name is defined later in this file
    # and rebinds the name, so this earlier definition is not the one that
    # gets instantiated as ``generate_production_class_tool``.
    name: str = "generate_production_class"
    description: str = "Generate an IRIS Production class"

    def _run(self, production_name: str, components: Union[str, List[Dict[str, Any]]]) -> str:
        """Generate a Production class from a name and its components.

        Args:
            production_name: Name of the production.
            components: Components as a JSON string or as an already-parsed
                list of dictionaries.

        Returns:
            str: The generated class content, or an error message when
            ``components`` is a string that is not valid JSON.
        """
        writer = IRISClassWriter()
        if isinstance(components, str):
            try:
                components_list = json.loads(components)
            except json.JSONDecodeError:
                return "Error: Invalid JSON format for components"
        else:
            # json.loads() would raise TypeError on a list, so pass it through.
            components_list = components
        return writer.write_production_class(production_name, components_list)

# Argument model referenced by CollectGeneratedFilesTool: the directory that
# is searched for generated .cls files.
class CollectGeneratedFilesToolInput(BaseModel):
    directory: str = Field(
        description="Directory containing the generated .cls files"
    )

class CollectGeneratedFilesTool(BaseTool):
    name: str = "collect_generated_files"
    description: str = "Collects all generated .cls files and returns them as a JSON collection"
    # CrewAI's BaseTool takes its argument model from ``args_schema``;
    # ``input_schema`` is kept so existing references to it keep working.
    args_schema: Type[BaseModel] = CollectGeneratedFilesToolInput
    input_schema: Type[BaseModel] = CollectGeneratedFilesToolInput

    def _run(self, directory: str) -> str:
        """
        Collects all generated .cls files and returns them as a JSON collection

        Args:
            directory (str): Directory that is walked recursively

        Returns:
            str: JSON object keyed by file name, each value holding the file's
            ``path`` relative to ``directory`` and its ``content``; or a JSON
            object with ``status`` "error" when reading fails.
        """
        try:
            files_collection = {}
            for root, _, filenames in os.walk(directory):
                for filename in filenames:
                    if not filename.endswith('.cls'):
                        continue
                    file_path = os.path.join(root, filename)
                    with open(file_path, 'r', encoding='utf-8') as f:
                        # Keyed by bare file name: a same-named file in another
                        # sub-directory replaces an earlier entry.
                        files_collection[filename] = {
                            'path': os.path.relpath(file_path, directory),
                            'content': f.read()
                        }
            return json.dumps(files_collection, indent=2)
        except Exception as e:
            return json.dumps({
                "status": "error",
                "error": str(e)
            })

# Argument model referenced by GenerateProductionClassTool: the production
# name plus its components, given as a JSON string or a list of dictionaries.
class GenerateProductionClassToolInput(BaseModel):
    production_name: str = Field(description="Name of the production")
    components: Union[str, List[Dict[str, Any]]] = Field(description="Components as JSON string or list of dictionaries")

# Argument model referenced by GenerateBusinessServiceTool: the service name
# plus endpoint information, given as a JSON string or a dictionary.
class GenerateBusinessServiceToolInput(BaseModel):
    service_name: str = Field(description="Name of the business service")
    endpoint_info: Union[str, Dict[str, Any]] = Field(description="Endpoint information as JSON string or dictionary")

# Argument model referenced by GenerateBusinessOperationTool: the operation
# name plus endpoint information, given as a JSON string or a dictionary.
class GenerateBusinessOperationToolInput(BaseModel):
    operation_name: str = Field(description="Name of the business operation")
    endpoint_info: Union[str, Dict[str, Any]] = Field(description="Endpoint information as JSON string or dictionary")

# Argument model referenced by GenerateMessageClassTool: the message class
# name plus its schema, given as a JSON string or a dictionary.
class GenerateMessageClassToolInput(BaseModel):
    message_name: str = Field(description="Name of the message class")
    schema_info: Union[str, Dict[str, Any]] = Field(description="Schema information as JSON string or dictionary")

class GenerateBusinessServiceTool(BaseTool):
    name: str = "generate_business_service"
    description: str = "Generate an IRIS Business Service class"
    # CrewAI's BaseTool takes its argument model from ``args_schema``;
    # ``input_schema`` is kept so existing references to it keep working.
    args_schema: Type[BaseModel] = GenerateBusinessServiceToolInput
    input_schema: Type[BaseModel] = GenerateBusinessServiceToolInput

    def _run(self, service_name: str, endpoint_info: Union[str, Dict[str, Any]]) -> str:
        """Generate a Business Service class for one endpoint.

        Args:
            service_name: Name of the business service.
            endpoint_info: Endpoint information as a JSON string or dictionary.

        Returns:
            str: The generated class content, or an error message when the
            JSON is invalid or generation fails.
        """
        writer = IRISClassWriter()
        try:
            if isinstance(endpoint_info, str):
                try:
                    endpoint_dict = json.loads(endpoint_info)
                except json.JSONDecodeError:
                    return "Error: Invalid JSON format for endpoint info"
            else:
                endpoint_dict = endpoint_info

            # NOTE(review): IRISClassWriter as defined in this file has no
            # write_business_service method, so this call raises AttributeError
            # and the handler below returns the error message.
            # TODO: implement the writer method.
            class_content = writer.write_business_service(service_name, endpoint_dict)
            # Register the source under the "BS.<name>" key.
            writer.generated_classes[f"BS.{service_name}"] = class_content
            return class_content
        except Exception as e:
            return f"Error generating business service: {str(e)}"

class GenerateBusinessOperationTool(BaseTool):
    name: str = "generate_business_operation"
    description: str = "Generate an IRIS Business Operation class"
    # CrewAI's BaseTool takes its argument model from ``args_schema``;
    # ``input_schema`` is kept so existing references to it keep working.
    args_schema: Type[BaseModel] = GenerateBusinessOperationToolInput
    input_schema: Type[BaseModel] = GenerateBusinessOperationToolInput

    def _run(self, operation_name: str, endpoint_info: Union[str, Dict[str, Any]]) -> str:
        """Generate a Business Operation class for one endpoint.

        Args:
            operation_name: Name of the business operation.
            endpoint_info: Endpoint information as a JSON string or dictionary.

        Returns:
            str: The generated class content, or an error message when the
            JSON is invalid or generation fails.
        """
        writer = IRISClassWriter()
        try:
            if isinstance(endpoint_info, str):
                try:
                    endpoint_dict = json.loads(endpoint_info)
                except json.JSONDecodeError:
                    return "Error: Invalid JSON format for endpoint info"
            else:
                endpoint_dict = endpoint_info

            class_content = writer.write_business_operation(operation_name, endpoint_dict)
            # Also register the source under the "BO.<name>" key; the writer
            # has already stored it under its sanitized class name.
            writer.generated_classes[f"BO.{operation_name}"] = class_content
            return class_content
        except Exception as e:
            return f"Error generating business operation: {str(e)}"

class GenerateMessageClassTool(BaseTool):
    name: str = "generate_message_class"
    description: str = "Generate an IRIS Message class"
    # CrewAI's BaseTool takes its argument model from ``args_schema``;
    # ``input_schema`` is kept so existing references to it keep working.
    args_schema: Type[BaseModel] = GenerateMessageClassToolInput
    input_schema: Type[BaseModel] = GenerateMessageClassToolInput

    def _run(self, message_name: str, schema_info: Union[str, Dict[str, Any]]) -> str:
        """Generate a Message class from a schema.

        Args:
            message_name: Name of the message class.
            schema_info: Schema information as a JSON string or dictionary.

        Returns:
            str: The generated class content, or an error message when the
            JSON is invalid or generation fails.
        """
        writer = IRISClassWriter()
        try:
            if isinstance(schema_info, str):
                try:
                    schema_dict = json.loads(schema_info)
                except json.JSONDecodeError:
                    return "Error: Invalid JSON format for schema info"
            else:
                schema_dict = schema_info

            class_content = writer.write_message_class(message_name, schema_dict)
            # Also register the source under the "MSG.<name>" key; the writer
            # has already stored it under its sanitized class name.
            writer.generated_classes[f"MSG.{message_name}"] = class_content
            return class_content
        except Exception as e:
            return f"Error generating message class: {str(e)}"

class GenerateProductionClassTool(BaseTool):
    name: str = "generate_production_class"
    description: str = "Generate an IRIS Production class"
    # CrewAI's BaseTool takes its argument model from ``args_schema``;
    # ``input_schema`` is kept so existing references to it keep working.
    args_schema: Type[BaseModel] = GenerateProductionClassToolInput
    input_schema: Type[BaseModel] = GenerateProductionClassToolInput

    def _run(self, production_name: str, components: Union[str, List[Dict[str, Any]]]) -> str:
        """Generate a Production class from a name and its components.

        Args:
            production_name: Name of the production.
            components: Components as a JSON string or a list of dictionaries.

        Returns:
            str: The generated class content, or an error message when the
            JSON is invalid or generation fails.
        """
        writer = IRISClassWriter()
        try:
            if isinstance(components, str):
                try:
                    components_list = json.loads(components)
                except json.JSONDecodeError:
                    return "Error: Invalid JSON format for components"
            else:
                components_list = components

            class_content = writer.write_production_class(production_name, components_list)
            # Also register the source under the "Production.<name>" key; the
            # writer has already stored it under its sanitized class name.
            writer.generated_classes[f"Production.{production_name}"] = class_content
            return class_content
        except Exception as e:
            return f"Error generating production class: {str(e)}"

# Argument model referenced by ExportIRISClassesTool: an optional output
# directory that defaults to None.
class ExportIRISClassesToolInput(BaseModel):
    output_dir: Optional[str] = Field(
        default=None,
        description="Optional output directory path. If not provided, will use default directory"
    )

class ExportIRISClassesTool(BaseTool):
    name: str = "export_iris_classes"
    description: str = "Export all generated classes to .cls files"
    # CrewAI's BaseTool takes its argument model from ``args_schema``;
    # ``input_schema`` is kept so existing references to it keep working.
    args_schema: Type[BaseModel] = ExportIRISClassesToolInput
    input_schema: Type[BaseModel] = ExportIRISClassesToolInput

    def _run(self, output_dir: Optional[str] = None) -> str:
        """Write every generated class to disk and report the outcome.

        Args:
            output_dir: Optional directory to export into. When given, it
                replaces the shared writer's ``output_dir``.

        Returns:
            str: JSON object with ``status`` ("success", "warning" when there
            is nothing to export, or "error"), a ``message`` and per-class
            ``details``.
        """
        writer = IRISClassWriter()
        try:
            if not writer.generated_classes:
                return json.dumps({
                    "status": "warning",
                    "message": "No classes to export",
                    "details": "The generated_classes dictionary is empty. Make sure classes were generated successfully before exporting."
                })

            if output_dir:
                writer.output_dir = os.path.abspath(output_dir)
                writer._ensure_output_directory()

            results = writer.export_classes()

            # export_classes() records per-class failures instead of raising,
            # so inspect them rather than reporting success unconditionally.
            failed = [
                class_name for class_name, outcome in results.items()
                if not (isinstance(outcome, dict) and outcome.get("status") == "success")
            ]
            if failed:
                status = "error"
                message = (
                    f"Exported {len(results) - len(failed)} of {len(results)} classes; "
                    f"failed: {', '.join(failed)}"
                )
            else:
                status = "success"
                message = f"Exported {len(writer.generated_classes)} classes"

            return json.dumps({
                "status": status,
                "message": message,
                "details": results
            }, indent=2)
        except Exception as e:
            return json.dumps({
                "status": "error",
                "error": str(e)
            })

# Argument model referenced by ValidateIRISClassesTool: an optional list of
# class names that defaults to None.
class ValidateIRISClassesToolInput(BaseModel):
    class_names: Optional[List[str]] = Field(
        default=None,
        description="Optional list of specific class names to validate. If not provided, validates all classes"
    )

class ValidateIRISClassesTool(BaseTool):
    name: str = "validate_iris_classes"
    description: str = "Validate the generated IRIS classes"
    # CrewAI's BaseTool takes its argument model from ``args_schema``;
    # ``input_schema`` is kept so existing references to it keep working.
    args_schema: Type[BaseModel] = ValidateIRISClassesToolInput
    input_schema: Type[BaseModel] = ValidateIRISClassesToolInput

    def _run(self, class_names: Optional[List[str]] = None) -> str:
        """
        Validate the generated IRIS classes

        Args:
            class_names (Optional[List[str]]): Optional list of specific class names to validate

        Returns:
            str: JSON string with the validation results, or a JSON object
            with ``status`` "error" when validation itself raises.
        """
        writer = IRISClassWriter()
        try:
            results = writer.validate_classes(class_names)
            return json.dumps(results, indent=2)
        except Exception as e:
            return json.dumps({
                "status": "error",
                "error": str(e)
            })

# Create one module-level instance of each tool class defined above.
# GenerateProductionClassTool is defined twice in this file; the name resolves
# to the later definition at this point.
analyze_openapi_tool = AnalyzeOpenAPITool()
generate_production_class_tool = GenerateProductionClassTool()
generate_business_service_tool = GenerateBusinessServiceTool()
generate_business_operation_tool = GenerateBusinessOperationTool()
generate_message_class_tool = GenerateMessageClassTool()
export_iris_classes_tool = ExportIRISClassesTool()
validate_iris_classes_tool = ValidateIRISClassesTool()
collect_generated_files_tool = CollectGeneratedFilesTool()



### Agents

In [None]:

class APIAgents:
    """Factory for the CrewAI agents used in this notebook.

    Every `create_*` method returns a new `Agent` bound to the LLM given at
    construction time. Agents that need tools receive the module-level tool
    instances.
    """

    def __init__(self, llm):
        """Store the LLM that every created agent will use."""
        self.llm = llm

    def create_production_agent(self) -> Agent:
        """Build the agent that gathers production name and namespace details."""
        return Agent(
            role='Production Manager',
            goal='Manage production environments and namespaces',
            backstory=dedent("""
                You are responsible for managing production environments and their namespaces.
                You interact with users to gather production details and validate their existence.
            """),
            allow_delegation=False,
            llm=self.llm,
            verbose=True
        )

    def create_interaction_agent(self) -> Agent:
        """Build the agent that asks for missing API specification fields."""
        return Agent(
            role='User Interaction Specialist',
            goal='Interact with users to obtain missing API specification fields',
            backstory=dedent("""
                You are a specialist in user interaction, responsible for identifying
                and requesting missing information in API specifications.
            """),
            allow_delegation=False,
            llm=self.llm,
            verbose=True
        )

    def create_validation_agent(self) -> Agent:
        """Build the agent that validates extracted API specifications."""
        return Agent(
            role='API Validator',
            goal='Validate API specifications for correctness and consistency',
            backstory=dedent("""
                You are an expert in API validation, ensuring all specifications
                meet the required standards and format.
            """),
            allow_delegation=False,
            llm=self.llm,
            verbose=True
        )

    def create_extraction_agent(self) -> Agent:
        """Build the agent that extracts API specifications from free text (delegation allowed)."""
        return Agent(
            role='API Specification Extractor',
            goal='Extract API specifications from natural language descriptions',
            backstory=dedent("""
                You are specialized in interpreting natural language descriptions
                and extracting structured API specifications.
            """),
            allow_delegation=True,
            llm=self.llm,
            verbose=True
        )

    def create_transformation_agent(self) -> Agent:
        """Build the agent that converts validated specifications into OpenAPI documentation."""
        return Agent(
            role='OpenAPI Transformation Specialist',
            goal='Convert API specifications into OpenAPI documentation',
            backstory=dedent("""
                You are an expert in OpenAPI specifications and documentation.
                Your role is to transform validated API details into accurate
                and comprehensive OpenAPI 3.0 documentation.
            """),
            allow_delegation=False,
            llm=self.llm,
            verbose=True
        )

    def create_reviewer_agent(self) -> Agent:
        """Build the agent that reviews the generated OpenAPI documentation (delegation allowed)."""
        return Agent(
            role='OpenAPI Documentation Reviewer',
            goal='Ensure OpenAPI documentation compliance and quality',
            backstory=dedent("""
                You are the final authority on OpenAPI documentation quality and compliance.
                With extensive experience in OpenAPI 3.0 specifications, you meticulously
                review documentation for accuracy, completeness, and adherence to standards.
            """),
            allow_delegation=True,
            llm=self.llm,
            verbose=True
        )

    def create_analyzer_agent(self) -> Agent:
        """Build the agent that analyzes an OpenAPI spec; it is given `analyze_openapi_tool`."""
        return Agent(
            role="OpenAPI Specification Analyzer",
            goal="Thoroughly analyze OpenAPI specifications and plan IRIS Interoperability components",
            backstory="""You are an expert in both OpenAPI specifications and InterSystems IRIS Interoperability. 
            Your job is to analyze OpenAPI documents and create a detailed plan for how they should be 
            implemented as IRIS Interoperability components.""",
            verbose=True,
            allow_delegation=False,
            tools=[analyze_openapi_tool],
            llm=self.llm
        )

    def create_bs_generator_agent(self) -> Agent:
        """Build the agent that generates Production and Business Service classes."""
        return Agent(
            role="IRIS Production and Business Service Generator",
            goal="Generate properly formatted IRIS Production and Business Service classes from OpenAPI specifications",
            backstory="""You are an experienced InterSystems IRIS developer specializing in Interoperability Productions.
            Your expertise is in creating Business Services and Productions that can receive and process incoming requests based on
            API specifications.""",
            verbose=True,
            allow_delegation=True,
            tools=[generate_production_class_tool, generate_business_service_tool],
            llm=self.llm
        )

    def create_bo_generator_agent(self) -> Agent:
        """Build the agent that generates Business Operation and message classes."""
        return Agent(
            role="IRIS Business Operation Generator",
            goal="Generate properly formatted IRIS Business Operation classes from OpenAPI specifications",
            backstory="""You are an experienced InterSystems IRIS developer specializing in Interoperability Productions.
            Your expertise is in creating Business Operations that can send requests to external systems
            based on API specifications.""",
            verbose=True,
            allow_delegation=True,
            tools=[generate_business_operation_tool, generate_message_class_tool],
            llm=self.llm
        )

    def create_exporter_agent(self) -> Agent:
        """Build the agent that exports and validates generated classes."""
        return Agent(
            role="IRIS Class Exporter",
            goal="Export and validate IRIS class definitions to proper .cls files",
            backstory="""You are an InterSystems IRIS deployment specialist. Your job is to ensure 
            that generated IRIS class definitions are properly exported as valid .cls files that 
            can be directly imported into an IRIS environment.""",
            verbose=True,
            allow_delegation=False,
            tools=[export_iris_classes_tool, validate_iris_classes_tool],
            llm=self.llm
        )

    def create_collector_agent(self) -> Agent:
        """Build the agent that collects the generated class files."""
        return Agent(
            role="IRIS Class Collector",
            goal="Collect all generated IRIS class files into a JSON collection",
            backstory="""You are a file system specialist responsible for gathering and 
            organizing generated IRIS class files into a structured collection.""",
            verbose=True,
            allow_delegation=False,
            # Use the shared module-level instance, as the other agents do,
            # instead of constructing a second CollectGeneratedFilesTool.
            tools=[collect_generated_files_tool],
            llm=self.llm
        )

### Agent Tasks

In [None]:

class APISpecificationCrew:
    """Holds the agents for the API integration flow and builds its CrewAI tasks.

    The first group of factory methods (`get_production_details` through
    `review_openapi_spec`) embed their input data directly in the task
    description. The second group (`analysis_task` through `collection_task`)
    builds the IRIS class generation pipeline; each of those remembers the
    task it last built so that later stages can list it as `context`.
    """

    def __init__(self, llm, production_data: ProductionData):
        """Create one agent of each kind and keep the shared production data.

        Args:
            llm: LLM handed to every agent.
            production_data: Store of productions and their endpoints.
        """
        self.current_agent = None
        api_agents = APIAgents(llm)
        self.openapi_parser = OpenAPIParser()
        self.iris_class_writer = IRISClassWriter(output_dir=OUTPUT_DIR)
        self.production_agent = api_agents.create_production_agent()
        self.interaction_agent = api_agents.create_interaction_agent()
        self.validation_agent = api_agents.create_validation_agent()
        self.extraction_agent = api_agents.create_extraction_agent()
        self.transformation_agent = api_agents.create_transformation_agent()
        self.reviewer_agent = api_agents.create_reviewer_agent()
        self.production_data = production_data
        self.analyzer_agent = api_agents.create_analyzer_agent()
        self.bs_generator_agent = api_agents.create_bs_generator_agent()
        self.bo_generator_agent = api_agents.create_bo_generator_agent()
        self.exporter_agent = api_agents.create_exporter_agent()
        self.collector_agent = api_agents.create_collector_agent()
        # Most recently built generation-pipeline task per stage name.
        self._pipeline_tasks: Dict[str, Task] = {}

    def _remember(self, stage: str, task: Task) -> Task:
        """Record `task` as the latest task for `stage` and return it."""
        self._pipeline_tasks[stage] = task
        return task

    def _context_for(self, *stages: str) -> Dict[str, Any]:
        """Return `Task` keyword arguments linking to already-built upstream stages.

        `context` must contain Task objects, so only stages that were actually
        built on this instance are included. When none exist the `context`
        keyword is omitted altogether and the Task default applies.
        """
        upstream = [self._pipeline_tasks[s] for s in stages if s in self._pipeline_tasks]
        return {"context": upstream} if upstream else {}

    def get_production_details(self) -> Task:
        """Build the task that asks for production name and namespace."""
        return Task(
            description=dedent("""
                Interact with the user to obtain:
                1. Production name
                2. Namespace
                
                Check if the production exists in the system.
                If it doesn't exist, confirm if a new production should be created.
                
                Return results in JSON format:
                {
                    "production_name": string,
                    "namespace": string,
                    "exists": boolean,
                    "create_new": boolean
                }
            """),
            expected_output="""A JSON object containing production details including name, namespace, existence status, and creation flag""",
            agent=self.production_agent
        )

    def handle_missing_fields(self, missing_fields: List[str], endpoint_info: Dict) -> Task:
        """Build the task that collects values for the fields marked as missing.

        Args:
            missing_fields: Names of the fields that still need a value.
            endpoint_info: Current endpoint specification; serialized into the prompt.
        """
        # TODO: Implement the callback logic here
        return Task(
            description= dedent(f"""
            The following fields are missing for endpoint {endpoint_info.get('endpoint', 'unknown')}:
            {', '.join(missing_fields)}
            
            Current endpoint info: {json.dumps(endpoint_info, indent=2)}
            
            For each missing field:
            1. If HTTP_Method: Must be one of GET, POST, PUT, DELETE, PATCH
            2. If json_model: Required for POST/PUT/PATCH methods
            3. If params: List of parameter names
            4. If production_name: Name of the production environment
            5. If namespace: Namespace for the production
            
            Return the collected information in JSON format.
        """),
            expected_output="A JSON object containing the updated field values",
            agent=self.interaction_agent
        )

    def validate_api_spec(self, extracted_data: Dict) -> Task:
        """Build the task that validates one extracted endpoint specification."""
        return Task(
            description=dedent(f"""
                Validate the following API specification:
                {json.dumps(extracted_data, indent=2)}
                
                Check for:
                1. Valid host format
                2. Endpoint starts with '/'
                3. Valid HTTP method (GET, POST, PUT, DELETE, PATCH)
                4. Valid port number (if provided)
                5. JSON model presence for POST/PUT/PATCH/DELETE methods
                
                Return validation results in JSON format.
            """),
            expected_output="""A JSON object containing validation results with any errors or confirmation of validity""",
            agent=self.validation_agent
        )

    def extract_api_specs(self, descriptions: List[str]) -> Task:
        """Build the task that extracts structured specs from text descriptions."""
        return Task(
            description=dedent(f"""
                Extract API specifications from the following descriptions:
                {json.dumps(descriptions, indent=2)}
                
                For each description, extract:
                - host (required)
                - endpoint (required)
                - HTTP_Method (required)
                - params (optional)
                - port (if available)
                - json_model (for POST/PUT/PATCH/DELETE)
                - authentication (if applicable)
                
                Mark any missing required fields as 'missing'.
                Return results in JSON format as an array of specifications.
            """),
            expected_output="""A JSON array containing extracted API specifications with all required and optional fields""",
            agent=self.extraction_agent
        )

    def transform_to_openapi(self, validated_endpoints: List[Dict], production_info: Dict) -> Task:
        """Build the task that turns validated endpoints into an OpenAPI 3.0 document."""
        return Task(
            description=dedent(f"""
                Transform the following validated API specifications into OpenAPI 3.0 documentation:
                
                Production Information:
                {json.dumps(production_info, indent=2)}
                
                Validated Endpoints:
                {json.dumps(validated_endpoints, indent=2)}
                
                Requirements:
                1. Generate complete OpenAPI 3.0 specification
                2. Include proper request/response schemas
                3. Document all parameters and request bodies
                4. Include authentication if specified
                5. Ensure proper path formatting
                
                Return the OpenAPI specification in both JSON and YAML formats.
            """),
            expected_output="""A JSON object containing the complete OpenAPI 3.0 specification with all endpoints and schemas""",
            agent=self.transformation_agent
        )

    def review_openapi_spec(self, openapi_spec: Dict) -> Task:
        """Build the task that reviews an OpenAPI document and returns a verdict."""
        return Task(
            description=dedent(f"""
                Review the following OpenAPI specification for compliance and quality:
                
                {json.dumps(openapi_spec, indent=2)}
                
                Review Checklist:
                1. OpenAPI 3.0 Compliance
                - Verify correct version specification
                - Check required root elements
                - Validate schema structure
                
                2. Completeness
                - All endpoints properly documented
                - Parameters fully specified
                - Request/response schemas defined
                - Security schemes properly configured
                
                3. Quality Checks
                - Consistent naming conventions
                - Clear descriptions
                - Proper use of data types
                - Meaningful response codes
                
                4. Best Practices
                - Proper tag usage
                - Consistent parameter naming
                - Appropriate security definitions
                
                You must return a JSON object with the following structure:
                {{
                    "is_valid": boolean,
                    "approved_spec": object (the reviewed and possibly corrected OpenAPI spec),
                    "issues": [array of strings describing any issues found],
                    "recommendations": [array of improvement suggestions]
                }}
            """),
            expected_output="""A JSON object containing: is_valid (boolean), approved_spec (object), issues (array), and recommendations (array)""",
            agent=self.reviewer_agent
        )

    def analysis_task(self, openapi_spec: Dict, production_name: Optional[str] = None) -> Task:
        """Build the task that analyzes the OpenAPI spec and plans IRIS components.

        The spec (and the production name, when given) is written into the
        task description, the same way the other factories in this class hand
        data to their agents.

        Args:
            openapi_spec: OpenAPI document to analyze.
            production_name: Optional production name to mention in the prompt.
        """
        sections = [
            "Analyze the OpenAPI specification and plan the necessary IRIS Interoperability components.",
            "Include a list of all components that should be in the Production class.",
        ]
        if production_name:
            sections.append(f"Production name: {production_name}")
        sections.append("OpenAPI specification:\n" + json.dumps(openapi_spec, indent=2))
        return self._remember("analysis", Task(
            description="\n".join(sections),
            agent=self.analyzer_agent,
            expected_output="A detailed analysis of OpenAPI spec and plan for IRIS components, including Production components list",
        ))

    def bs_generation_task(self) -> Task:
        """Build the Business Service generation task (context: analysis task, if built)."""
        return self._remember("bs_generation", Task(
            description="Generate Business Service classes based on the OpenAPI endpoints",
            agent=self.bs_generator_agent,
            expected_output="IRIS Business Service class definitions",
            **self._context_for("analysis"),
        ))

    def bo_generation_task(self) -> Task:
        """Build the Business Operation generation task (context: analysis task, if built)."""
        return self._remember("bo_generation", Task(
            description="Generate Business Operation classes based on the OpenAPI endpoints",
            agent=self.bo_generator_agent,
            expected_output="IRIS Business Operation class definitions",
            **self._context_for("analysis"),
        ))

    def export_task(self) -> Task:
        """Build the export task (context: BS and BO generation tasks, if built).

        The output directory is stated in the description so the agent can see it.
        """
        return self._remember("export", Task(
            description=(
                "Export all generated IRIS classes as valid .cls files\n"
                f"Output directory: {OUTPUT_DIR}"
            ),
            agent=self.exporter_agent,
            expected_output="Valid IRIS .cls files saved to output directory",
            **self._context_for("bs_generation", "bo_generation"),
        ))

    def validate_task(self) -> Task:
        """Build the validation task for all generated classes (context: export task, if built)."""
        return self._remember("validate", Task(
            description="Validate all generated IRIS classes",
            agent=self.exporter_agent,
            expected_output="Validation results for all generated classes",
            **self._context_for("export"),
        ))

    def production_generation_task(self, production_name: Optional[str] = None) -> Task:
        """Build the Production class generation task.

        Uses the BS generator agent because that agent carries the production
        class tool. Context is the BS and BO generation tasks, if built.

        Args:
            production_name: Optional production name to mention in the prompt.
        """
        description = "Generate the Production class that includes all generated components"
        if production_name:
            description += f"\nProduction name: {production_name}"
        return self._remember("production_generation", Task(
            description=description,
            agent=self.bs_generator_agent,
            expected_output="IRIS Production class definition",
            **self._context_for("bs_generation", "bo_generation"),
        ))

    def collection_task(self) -> Task:
        """Build the file collection task (context: export and validate tasks, if built).

        The directory to scan is stated in the description so the agent can see it.
        """
        return self._remember("collection", Task(
            description=(
                "Collect all generated IRIS class files into a JSON collection\n"
                f"Directory: {OUTPUT_DIR}"
            ),
            agent=self.collector_agent,
            expected_output="JSON collection of all generated .cls files",
            **self._context_for("export", "validate"),
        ))


# Process API Integration

In [None]:
# Read the endpoint descriptions from the user; the processing cell treats
# each line of this text as one endpoint description.
user_input = input()

In [None]:
# End-to-end flow: production details -> extraction -> per-endpoint completion
# and validation -> OpenAPI transformation -> review.
production_data = ProductionData()

try:
    llm = get_facilis_llm()
    if llm is None:
        raise ValueError("Invalid AI_ENGINE selection")

    crew = APISpecificationCrew(llm, production_data)

    # Process endpoints: one description per line, ignoring blank lines so
    # that empty strings are never sent for extraction.
    endpoints = [line.strip() for line in user_input.splitlines() if line.strip()]
    if not endpoints:
        raise ValueError("No endpoint descriptions provided")

    api_crew = Crew(
        agents=[
            crew.production_agent,
            crew.extraction_agent,
            crew.validation_agent,
            crew.interaction_agent,
            crew.transformation_agent,
            crew.reviewer_agent,
            crew.analyzer_agent,
            crew.bs_generator_agent,
            crew.bo_generator_agent,
            crew.exporter_agent,
            crew.collector_agent
        ],
        tasks=[crew.get_production_details()],
        verbose=True
    )

    production_result = api_crew.kickoff()
    production_info = json.loads(extract_json_from_markdown(production_result))

    if not isinstance(production_info, dict):
        raise ValueError("Invalid production info format")

    # Ask only for the value that is actually missing, so a value the agent
    # already obtained is not overwritten.
    if not production_info.get('production_name'):
        production_info["production_name"] = input("Enter production name: ")
    if not production_info.get('namespace'):
        production_info["namespace"] = input("Enter namespace: ")

    # Add production to production data if it doesn't exist
    if not production_data.production_exists(production_info['production_name']):
        production_data.add_production(
            production_info['production_name'],
            production_info['namespace']
        )

    api_crew = Crew(
        agents=[crew.extraction_agent],
        tasks=[crew.extract_api_specs(endpoints)],
        verbose=True
    )

    extracted_json = extract_json_from_markdown(api_crew.kickoff())
    extracted_results = json.loads(extracted_json)

    # Normalize a single object (or an empty result) to a list.
    if not isinstance(extracted_results, list):
        extracted_results = [extracted_results] if extracted_results else []

    final_endpoints = []
    for endpoint_spec in extracted_results:
        if not isinstance(endpoint_spec, dict):
            continue

        # The extraction prompt marks absent required fields with the literal 'missing'.
        missing_fields = [k for k, v in endpoint_spec.items() if v == 'missing']
        if missing_fields:
            api_crew = Crew(
                agents=[crew.interaction_agent],
                tasks=[crew.handle_missing_fields(missing_fields, endpoint_spec)],
                verbose=True
            )

            updated_json = extract_json_from_markdown(api_crew.kickoff())
            try:
                updated_spec = json.loads(updated_json)
                if isinstance(updated_spec, dict):
                    endpoint_spec.update(updated_spec)
            except (json.JSONDecodeError, AttributeError, TypeError) as e:
                print(f"Failed to update endpoint spec: {e}")

        api_crew = Crew(
            agents=[crew.validation_agent],
            tasks=[crew.validate_api_spec(endpoint_spec)],
            verbose=True
        )

        validation_json = extract_json_from_markdown(api_crew.kickoff())
        try:
            validation_result = json.loads(validation_json)
        except (json.JSONDecodeError, TypeError) as e:
            # One unreadable validation reply should only drop this endpoint,
            # not abort the remaining ones.
            print(f"Endpoint validation failed: could not parse validation result ({e})")
            continue

        if isinstance(validation_result, dict) and 'error' not in validation_result:
            final_endpoints.append(endpoint_spec)
            crew.production_data.add_endpoint(production_info['production_name'], endpoint_spec)
        else:
            print(f"Endpoint validation failed: {validation_result}")

    # Without at least one validated endpoint there is nothing to document.
    if not final_endpoints:
        raise ValueError("No valid endpoints to transform into an OpenAPI specification")

    api_crew = Crew(
        agents=[crew.transformation_agent],
        tasks=[crew.transform_to_openapi(final_endpoints, production_info)],
        verbose=True
    )
    openapi_result = json.loads(extract_json_from_markdown(api_crew.kickoff()))

    api_crew = Crew(
        agents=[crew.reviewer_agent],
        tasks=[crew.review_openapi_spec(openapi_result)],
        verbose=True
    )
    review_json = extract_json_from_markdown(api_crew.kickoff())
    review_result = json.loads(review_json)

    # Add validation and default values
    if not isinstance(review_result, dict):
        review_result = {
            "is_valid": False,
            "approved_spec": openapi_result,
            "issues": ["Invalid review result format"],
            "recommendations": []
        }

    # Ensure required keys exist
    review_result.setdefault("is_valid", False)
    review_result.setdefault("approved_spec", openapi_result)
    review_result.setdefault("issues", [])
    review_result.setdefault("recommendations", [])
    if review_result["is_valid"]:

        # TODO: Implement Iris integration
        iris_payload = {
            "production_name": production_info['production_name'],
            "namespace": production_info['namespace'],
            "openapi_spec": review_result["approved_spec"]
        }

    else:
        print( {
            "success": False,
            "message": "OpenAPI specification not approved for integration",
            "timestamp": datetime.now().isoformat(),
            "issues": review_result.get("issues", [])
        })

except Exception as e:
    # Report the failure, then re-raise so the notebook cell still fails visibly.
    print(f"Error in process_api_integration: {str(e)}")
    raise
