In [2]:
from narrative_llm_agent.crews.job_crew import JobCrew
from langchain_openai import ChatOpenAI, OpenAI
import openai
import os
from crewai import Crew, Agent, Task
from narrative_llm_agent.agents.kbase_agent import KBaseAgent
from langchain.tools import tool
import json
from pydantic import BaseModel
from narrative_llm_agent.agents.metadata import MetadataAgent
from narrative_llm_agent.agents.analyst import AnalystAgent
import re

In [3]:
from pydantic import BaseModel
from typing import List

# Define a model for each analysis step
class AnalysisStep(BaseModel):
    # One entry of the analysis plan produced by the analyst agent.
    # Field names mirror the keys requested in the prompt's JSON example.
    Step: int  # integer number indicating the step
    Name: str  # name of the step
    Description: str  # free-text description of the step
    expect_new_object: bool  # whether this step creates a new data object
    app_id: str  # id of the KBase app to run for this step

# Define a model for the complete workflow
class AnalysisPipeline(BaseModel):
    # Container for the whole plan; passed as `output_json` to the analyst Task.
    steps_to_run: List[AnalysisStep]  # plan steps, one AnalysisStep each

In [4]:
# Shared chat model used by every agent in this notebook.
# Requests go to the CBorg endpoint below; the key is read from the
# CBORG_API_KEY environment variable (None is passed through if it is unset).
used_llm = ChatOpenAI(
    model="openai/gpt-4o",
    temperature=0,
    api_key=os.environ.get('CBORG_API_KEY'),
    base_url="https://api.cborg.lbl.gov"  # For LBL-Net, use "https://api-local.cborg.lbl.gov"
)

In [5]:
def extract_json_from_string(string_data):
    """Extract the first decodable JSON array embedded in a block of text.

    The text is scanned for ``[`` characters and, at each one, a JSON value is
    decoded in place. The first position that decodes successfully wins, so
    bracketed prose before or after the real payload (e.g. ``[note]``) no
    longer prevents the array from being found.

    Args:
        string_data: Text that may contain a JSON array, such as raw LLM output.

    Returns:
        The decoded Python object (a list), or ``None`` when the input is not a
        string, contains no bracketed span, or no bracketed span is valid JSON.
        A short diagnostic is printed in the ``None`` cases.
    """
    # Same "is there anything bracketed at all" test as before, so the
    # "no JSON" message is printed in exactly the same situations.
    if not isinstance(string_data, str) or re.search(r'\[.*\]', string_data, re.DOTALL) is None:
        print("No JSON data found in the string.")
        return None

    decoder = json.JSONDecoder()
    last_error = None
    start = string_data.find("[")
    while start != -1:
        try:
            # raw_decode parses one JSON value starting at `start` and ignores
            # whatever text follows it.
            json_data, _ = decoder.raw_decode(string_data, start)
            return json_data
        except json.JSONDecodeError as e:
            last_error = e
        start = string_data.find("[", start + 1)

    print(f"Error decoding JSON: {last_error}")
    return None

In [6]:
class AppRunInputs(BaseModel):
    # Argument schema for the `do_app_run` tool defined in WorkflowRunner.
    narrative_id: int  # id of the narrative the app is run in
    app_id: str  # KBase app id, formalized as module_name/app_name
    input_object_upa: str  # UPA of the input data object

class WorkflowRunner(KBaseAgent):
    """Agent wrapper that runs KBase apps by delegating to a JobCrew.

    Construction builds a CrewAI ``Agent`` (stored on ``self.agent``) whose
    only tool is ``do_app_run``, a closure that forwards to ``run_app_crew``.
    """
    job_crew: JobCrew
    # The three strings below are passed to the Agent as its role, goal and backstory.
    role: str = "You are a workflow runner, your role is to efficiently run KBase workflows."
    goal: str = "Your goal is to create and run elegant and scientifically meaningful computational biology workflows."
    backstory: str = "You are a dedicated and effective computational biologist. You have deep knowledge of how to run workflows in the DOE KBase system and have years of experience using this to produce high quality scientific knowledge."
    
    def __init__(self, llm, token: str = None):
        """Create the job crew, the ``do_app_run`` tool and the agent.

        Args:
            llm: Language model handed to both the JobCrew and the Agent.
            token: Stored on ``self._token``; not otherwise used in this class.
        """
        # NOTE(review): KBaseAgent.__init__ is not called here — confirm the
        # base class needs no setup beyond the attributes assigned below.
        self.job_crew = JobCrew(llm)
        self._llm = llm
        self._token = token

        # Defined inside __init__ so the tool can close over `self`.
        # NOTE(review): the docstring below is presumably what the `tool`
        # decorator exposes to the LLM as the tool description — treat it as a
        # runtime string and edit with care.
        @tool(args_schema=AppRunInputs)
        def do_app_run(narrative_id: int, app_id: str, input_object_upa: str):
            """
            This invokes a CrewAI crew to run a new KBase app from start to finish and
            returns the results. It takes in the narrative_id, app_id (formalized as module_name/app_name), and
            UPA of the input object.
            """
            return self.run_app_crew(narrative_id, app_id, input_object_upa)
            
        self.agent = Agent(
            role=self.role,
            goal=self.goal,
            backstory=self.backstory,
            verbose=True,
            tools=[
                do_app_run
            ],  # + human_tools,
            llm=self._llm,
            allow_delegation=False,
            memory=True,
        )
    
    def run_app_crew(self, narrative_id: int, app_id: str, input_object_upa: str):
        """Run one app through the job crew and return whatever it returns.

        Args:
            narrative_id: Id of the narrative to run the app in.
            app_id: KBase app id (module_name/app_name).
            input_object_upa: UPA of the input data object.
        """
        # app_id is passed both as the first positional argument and as the
        # `app_id` keyword. NOTE(review): presumably the first positional
        # parameter of JobCrew.start_job is an app *name* — confirm against
        # its signature.
        return self.job_crew.start_job(app_id, input_object_upa, narrative_id, app_id=app_id)

# Module-level runner instance.
# NOTE(review): workflow_runner_node constructs its own WorkflowRunner, so this
# instance appears unused in the visible cells — confirm before removing.
wf_runner = WorkflowRunner(used_llm)

In [7]:
from langgraph.graph import StateGraph, END
from typing import TypedDict, List, Dict, Any, Optional

# Define the state schema for the unified workflow
class GenomeAnalysisState(TypedDict):
    # State shared by every node of the genome-analysis graph.
    # NOTE(review): narrative_id is `str` here but `int` in WorkflowState and
    # AppRunInputs; the example call passes a string — confirm the intended type.
    narrative_id: str
    reads_id: str  # reads object the workflow starts from (an UPA in the example call)
    description: str  # Full description as you provide it currently
    analysis_plan: Optional[List[Dict[str, Any]]]  # parsed plan; set by analyst_node
    steps_to_run: Optional[List[Dict[str, Any]]]  # analyst_node stores the same list here
    # Set by workflow_runner_node to the crew's kickoff() result, or by the
    # handle_error node to an "Error: ..." string.
    results: Optional[str]
    error: Optional[str]  # error message; router branches on its truthiness

# Create the analyst node function
def analyst_node(state: GenomeAnalysisState):
    """Graph node that asks the analyst agent for an analysis plan.

    Builds a single-task CrewAI crew around an ``AnalystAgent``, runs it on
    ``state["description"]`` and extracts the JSON list of steps from the raw
    crew output with ``extract_json_from_string``.

    Args:
        state: Current graph state; only ``description`` is read.

    Returns:
        A copy of ``state`` with ``analysis_plan`` and ``steps_to_run`` set to
        the parsed list and ``error`` set to ``None`` on success. On any
        failure -- including output from which no non-empty plan could be
        parsed -- both plan fields are ``None`` and ``error`` holds the
        message, so ``router`` sends the graph to ``handle_error``.
    """
    try:
        # Get the existing description from the state
        description = state["description"]

        # Initialize the analyst agent; a missing KB_AUTH_TOKEN raises KeyError,
        # which is reported through the error branch below.
        analyst_expert = AnalystAgent(used_llm, cborg_api_key =os.environ.get('CBORG_API_KEY'), token=os.environ["KB_AUTH_TOKEN"])

        # Create the analysis task
        analysis_agent_task = Task(
            description=description,
            expected_output="a json of the analysis workflow",
            output_json=AnalysisPipeline,
            agent=analyst_expert.agent
        )

        # Create and run the crew
        crew = Crew(
            agents=[analyst_expert.agent],
            tasks=[analysis_agent_task],
            verbose=True,
        )

        output = crew.kickoff()

        # Extract the JSON list of steps from the raw output
        analysis_plan = extract_json_from_string(output.raw)

        # The extractor returns None when nothing could be parsed. Without this
        # check the state would carry error=None and the workflow would be
        # started with no steps at all.
        if not analysis_plan:
            raise ValueError("Analyst output did not contain a parseable, non-empty analysis plan.")

        # Return updated state with analysis plan
        return {
            **state,
            "analysis_plan": analysis_plan,
            "steps_to_run": analysis_plan,
            "error": None
        }
    except Exception as e:
        # Report the failure through the state instead of raising, so the
        # graph can branch on it.
        return {
            **state,
            "analysis_plan": None,
            "steps_to_run": None,
            "error": str(e)
        }

# Function to determine the next node based on the state
def router(state: GenomeAnalysisState):
    """Choose the node that follows the analyst.

    Returns "handle_error" when the state carries a truthy ``error`` value and
    "run_workflow" otherwise.
    """
    return "handle_error" if state["error"] else "run_workflow"

# Build the complete graph with both analyst and workflow nodes
def build_genome_analysis_graph():
    """Assemble and compile the analyst -> workflow state graph.

    The graph starts at the "analyst" node, then branches via ``router`` to
    either "run_workflow" or "handle_error"; both of those lead to END.

    Returns:
        The compiled graph.
    """
    def report_error(state):
        # Copy the recorded error message into the results field.
        return {**state, "results": f"Error: {state['error']}"}

    builder = StateGraph(GenomeAnalysisState)

    # Register the nodes.
    for node_name, action in (
        ("analyst", analyst_node),
        ("run_workflow", workflow_runner_node),
        ("handle_error", report_error),
    ):
        builder.add_node(node_name, action)

    # router's return value is mapped one-to-one onto the node of the same name.
    branch_targets = {name: name for name in ("run_workflow", "handle_error")}
    builder.add_conditional_edges("analyst", router, branch_targets)

    # Both branches terminate the graph.
    for terminal in ("run_workflow", "handle_error"):
        builder.add_edge(terminal, END)

    builder.set_entry_point("analyst")
    return builder.compile()

# Example usage of the complete graph
def run_genome_analysis(narrative_id, reads_id, description):
    """Build the genome-analysis graph and run it once.

    Args:
        narrative_id: Stored in the state under "narrative_id".
        reads_id: Stored in the state under "reads_id".
        description: Prompt text stored under "description" for the analyst node.

    Returns:
        The final state returned by the graph's ``invoke``.
    """
    # Fields the nodes fill in later all start out as None.
    pending = dict.fromkeys(("analysis_plan", "steps_to_run", "results", "error"))
    seed_state = {
        "narrative_id": narrative_id,
        "reads_id": reads_id,
        "description": description,
        **pending,
    }
    return build_genome_analysis_graph().invoke(seed_state)

In [8]:
from langgraph.graph import StateGraph, END
from typing import TypedDict, List, Dict, Any, Optional
from pydantic import BaseModel, Field
import json
from typing_extensions import Annotated

# Define the state schema
class WorkflowState(TypedDict):
    # Subset of the graph state that workflow_runner_node reads and writes.
    steps_to_run: List[Dict[str, Any]]  # plan steps, serialized into the task prompt
    # NOTE(review): declared `int` here but `str` in GenomeAnalysisState — confirm.
    narrative_id: int
    reads_id: str  # reads object the first step starts from
    results: Optional[str]  # set to the crew's kickoff() result on success
    error: Optional[str]  # exception message on failure, else None

# Create the workflow node
def workflow_runner_node(state: WorkflowState):
    """Graph node that runs the planned apps through a WorkflowRunner agent.

    Serializes ``state["steps_to_run"]`` into a task prompt, runs a
    single-agent CrewAI crew on it and stores the outcome in the state.

    Args:
        state: Graph state; ``steps_to_run``, ``narrative_id`` and ``reads_id``
            are read.

    Returns:
        A copy of ``state`` with ``results`` set to the value returned by
        ``crew.kickoff()`` and ``error`` set to ``None`` on success. On any
        failure -- including a missing or empty ``steps_to_run`` -- ``results``
        is ``None`` and ``error`` holds the message.
    """
    try:
        steps_to_run = state["steps_to_run"]
        narrative_id = state["narrative_id"]
        reads_id = state["reads_id"]

        # Refuse to start with no plan: otherwise the agent would be prompted
        # with "null" or "[]" as its task list.
        if not steps_to_run:
            raise ValueError("No analysis steps to run.")

        # Initialize the workflow runner (local name avoids shadowing the
        # module-level `wf_runner`).
        runner = WorkflowRunner(used_llm)

        # Create the task
        run_apps_task = Task(
            description=f"""
            This task involves running multiple apps where the output of one (if any) is fed into the next as input. 
            Here are the tasks in JSON format: {json.dumps(steps_to_run)}.
            If any task has "expect_new_object" set to True, then that should receive a new data object in its output as a "created_object". That object should be used as input for the next task.
            If a task has "expect_new_object" set to False, then that should not receive a new object to use in the next task. In that case, use the same input object from the previous step for the next one.
            These steps must be run sequentially. 
            These must be run in the narrative with id {narrative_id} and start with using the paired-end reads object {reads_id}.
            If any step ends with an error, immediately stop the task and end with an error.
            In the end, return a brief summary of steps taken and resulting output objects.
            """,
            expected_output="A summary of task completion, the number of apps run, and the upa of any output objects.",
            agent=runner.agent
        )

        # Create and run the crew
        crew = Crew(
            agents=[runner.agent],
            tasks=[run_apps_task],
            verbose=True,
        )

        result = crew.kickoff()

        # Return updated state with results. The kickoff() return value is
        # stored as-is, exactly as before.
        return {
            **state,
            "results": result,
            "error": None
        }
    except Exception as e:
        # Report the failure through the state instead of raising.
        return {
            **state,
            "results": None,
            "error": str(e)
        }

In [9]:
# Sample metadata; these values are interpolated into `sample_description`.
sequencing_technology="Illumina sequencing"
organism = "Bacillus subtilis sp. strain UAMC"
genome_type = "isolate"

In [10]:

# Prompt handed to the analyst agent as the task description. Doubled braces
# keep the JSON example literal inside the f-string.
# NOTE(review): the JSON example lists an "App" key that the AnalysisStep model
# does not declare — confirm which of the two is intended.
sample_description = f"""The user has uploaded paired-end sequencing reads into the narrative. Here is the metadata for the reads:
sequencing_technology : {sequencing_technology}
organism: {organism}
genome type : {genome_type}
I want you to generate an analysis plan for annotating the uploaded pair-end reads obtained from {sequencing_technology} for a {genome_type} genome using KBase apps.
The goal is to have a complete annotated genome and classify the microbe
This analysis is for a Microbiology Resource Announcements (MRA) paper so these need to be a part of analysis. Always keep in mind the following:
- The analysis steps should begin with read quality assessment. 
- Make sure you select appropriate KBase apps based on genome type.
-Relevant statistics for the assembly (e.g., number of contigs and N50 values).
-Estimates of genome completeness, where applicable.
-Classify the microbe for taxonomy, where relevant.
Based on the metadata, devise a detailed step-by-step analysis workflow, the apps and app_ids should be from the app graph.
The analysis plan should be a json with schema as: 
```json
{{"Step": "Integer number indicating the step",
 "Name": "Name of the step",
 "Description": "Describe the step",
 "App": "Name of the app",
 "expect_new_object": boolean indicating if this step creates a new data object,
 "app_id": "Id of the KBase app"}}
```
Ensure that app_ids are obtained from the app graph and are correct.
Make sure that the analysis plan is included in the final response.
"""


# Run the full graph: arguments are the narrative id, the reads object id and
# the prompt above. As the last expression of the cell, the returned final
# state is displayed.
# NOTE(review): the saved output's task text orders the prompt differently from
# the string above, so the output looks stale — confirm with a re-run.
run_genome_analysis("210493", "210493/2/1", sample_description)

[1m[95m# Agent:[00m [1m[92mKBase Analyst and Information Provider[00m
[95m## Task:[00m [92mThe user has uploaded paired-end sequencing reads into the narrative. Here is the metadata for the reads:
sequencing_technology : Illumina sequencing
organism: Bacillus subtilis sp. strain UAMC
genome type : isolate
I want you to generate an analysis plan for annotating the uploaded pair-end reads obtained from Illumina sequencing for a isolate genome using KBase apps. 
Always keep in mind the following:
- The analysis steps should begin with read quality assessment. 
- The goal is to have a complete annotated genome and classify the microbe 
- Make sure you select appropriate KBase apps based on genome type.
This analysis is for a Microbiology Resource Announcements (MRA) paper so these need to be a part of analysis:
-Relevant statistics for the assembly (e.g., number of contigs and N50 values).
-Estimates of genome completeness, where applicable.
-Classify the microbe for taxonomy, whe

Overriding of current TracerProvider is not allowed




[1m[95m# Agent:[00m [1m[92mKBase Analyst and Information Provider[00m
[95m## Final Answer:[00m [92m
{
  "steps_to_run": [
    {
      "Step": 1,
      "Name": "Read Quality Assessment",
      "Description": "Assess the quality of the paired-end reads using FastQC to ensure data quality before proceeding with assembly.",
      "expect_new_object": false,
      "app_id": "kb_fastqc/runFastQC"
    },
    {
      "Step": 2,
      "Name": "Genome Assembly",
      "Description": "Assemble the high-quality reads into contigs using the SPAdes assembler.",
      "expect_new_object": true,
      "app_id": "kb_SPAdes/run_SPAdes"
    },
    {
      "Step": 3,
      "Name": "Genome Annotation",
      "Description": "Annotate the assembled genome using the Prokka annotation pipeline to identify genes and other features.",
      "expect_new_object": true,
      "app_id": "ProkkaAnnotation/annotate_metagenome"
    },
    {
      "Step": 4,
      "Name": "Microbial Classification",
      "De

Overriding of current TracerProvider is not allowed


[1m[95m# Agent:[00m [1m[92mProject coordinator[00m
[95m## Task:[00m [92m
            From the given KBase app id, kb_fastqc/runFastQC, fetch the list of parameters needed to run it. Use the App and Job manager agent
            for assistance. Using the data object with UPA "210493/2/1", populate a dictionary
            with the parameters where the keys are parameter ids, and values are the proper parameter values, or their
            default values if no value can be found or calculated.
            Any input object parameter must be the input object UPA.
            Be sure to make sure there is a non-null value for any parameter that is not optional.
            Any parameter that has a true value for "is_output_object" must have a valid name for the new object. The new object name should be based on
            the input object name, not its UPA. If the input object name is not available, the Workspace Manager can assist.
            If the parameter type is 'dropdown',

Overriding of current TracerProvider is not allowed


[1m[95m# Agent:[00m [1m[92mProject coordinator[00m
[95m## Task:[00m [92m
            From the given KBase app id, kb_SPAdes/run_SPAdes, fetch the list of parameters needed to run it. Use the App and Job manager agent
            for assistance. Using the data object with UPA "210493/2/1", populate a dictionary
            with the parameters where the keys are parameter ids, and values are the proper parameter values, or their
            default values if no value can be found or calculated.
            Any input object parameter must be the input object UPA.
            Be sure to make sure there is a non-null value for any parameter that is not optional.
            Any parameter that has a true value for "is_output_object" must have a valid name for the new object. The new object name should be based on
            the input object name, not its UPA. If the input object name is not available, the Workspace Manager can assist.
            If the parameter type is 'dropdown'

Overriding of current TracerProvider is not allowed


[1m[95m# Agent:[00m [1m[92mProject coordinator[00m
[95m## Task:[00m [92m
            From the given KBase app id, ProkkaAnnotation/annotate_metagenome, fetch the list of parameters needed to run it. Use the App and Job manager agent
            for assistance. Using the data object with UPA "210493/12/1", populate a dictionary
            with the parameters where the keys are parameter ids, and values are the proper parameter values, or their
            default values if no value can be found or calculated.
            Any input object parameter must be the input object UPA.
            Be sure to make sure there is a non-null value for any parameter that is not optional.
            Any parameter that has a true value for "is_output_object" must have a valid name for the new object. The new object name should be based on
            the input object name, not its UPA. If the input object name is not available, the Workspace Manager can assist.
            If the parameter t

Overriding of current TracerProvider is not allowed


[1m[95m# Agent:[00m [1m[92mProject coordinator[00m
[95m## Task:[00m [92m
            From the given KBase app id, kb_gtdbtk/run_kb_gtdbtk_classify_wf, fetch the list of parameters needed to run it. Use the App and Job manager agent
            for assistance. Using the data object with UPA "210493/14/1", populate a dictionary
            with the parameters where the keys are parameter ids, and values are the proper parameter values, or their
            default values if no value can be found or calculated.
            Any input object parameter must be the input object UPA.
            Be sure to make sure there is a non-null value for any parameter that is not optional.
            Any parameter that has a true value for "is_output_object" must have a valid name for the new object. The new object name should be based on
            the input object name, not its UPA. If the input object name is not available, the Workspace Manager can assist.
            If the parameter ty

{'narrative_id': '210493',
 'reads_id': '210493/2/1',
 'description': 'The user has uploaded paired-end sequencing reads into the narrative. Here is the metadata for the reads:\nsequencing_technology : Illumina sequencing\norganism: Bacillus subtilis sp. strain UAMC\ngenome type : isolate\nI want you to generate an analysis plan for annotating the uploaded pair-end reads obtained from Illumina sequencing for a isolate genome using KBase apps. \nAlways keep in mind the following:\n- The analysis steps should begin with read quality assessment. \n- The goal is to have a complete annotated genome and classify the microbe \n- Make sure you select appropriate KBase apps based on genome type.\nThis analysis is for a Microbiology Resource Announcements (MRA) paper so these need to be a part of analysis:\n-Relevant statistics for the assembly (e.g., number of contigs and N50 values).\n-Estimates of genome completeness, where applicable.\n-Classify the microbe for taxonomy, where relevant.\nBas