<a href="https://colab.research.google.com/github/omkhairate/ThinkerAGI/blob/main/Thinker.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
import os
from collections import deque
from typing import Dict, List, Optional, Any

from langchain import LLMChain, OpenAI, PromptTemplate
from langchain.embeddings import OpenAIEmbeddings
from langchain.llms import BaseLLM
from langchain.vectorstores.base import VectorStore
from pydantic import BaseModel, Field
from langchain.chains.base import Chain


In [2]:
from langchain.vectorstores import FAISS
from langchain.docstore import InMemoryDocstore

In [3]:
# Keep a key that is already exported in the environment. The "sk-" placeholder
# is only a fallback: supply a real key from outside the notebook (shell export,
# Colab secret, ...) rather than pasting it into this cell.
os.environ.setdefault('OPENAI_API_KEY', "sk-")

In [4]:
import faiss

# Embedding client; its `embed_query` method is handed to the vector store below.
embeddings_model = OpenAIEmbeddings()

# Vector width used to size the flat L2 index.
# NOTE(review): presumably the output size of the default OpenAI embedding
# model -- confirm if the embedding model is ever changed.
embedding_size = 1536
index = faiss.IndexFlatL2(embedding_size)

# Start from an empty in-memory docstore.
# NOTE(review): the trailing `{}` is presumably the (empty) index-to-docstore-id
# mapping -- confirm against the FAISS wrapper's constructor.
empty_docstore = InMemoryDocstore({})
vectorstore = FAISS(embeddings_model.embed_query, index, empty_docstore, {})

In [5]:
class thequestionChain(LLMChain):
  """Chain that asks the LLM for new questions about the given objective."""

  @classmethod
  def from_llm(cls, llm: BaseLLM, verbose: bool = True) -> LLMChain:
    """Build the chain with its question-creation prompt.

    Args:
      llm: Language model that will receive the prompt.
      verbose: Forwarded unchanged to the chain constructor.

    Returns:
      A chain whose prompt expects the variables ``result``,
      ``task_description``, ``unanswered_questions`` and ``objective``.
    """
    # Adjacent string literals are joined verbatim, so every fragment after the
    # first starts with a space; otherwise words fuse ("answeredby the AI ...").
    question_template = (
        "You are an question creation AI that uses the result of a doubt agent"
        " to create new questions in order to reflect on the following objective: {objective},"
        " The last answered question has the result: {result},"
        " This result was based on this task description: {task_description},"
        " These are unanswered questions: {unanswered_questions},"
        " Based on the result, create new questions to be answered"
        " by the AI system that do not overlap with questions that are already made."
        " Return the questions as an array"
    )
    prompt = PromptTemplate(
        template=question_template,
        input_variables=["result", "task_description", "unanswered_questions", "objective"]
    )
    return cls(prompt=prompt, llm=llm, verbose=verbose)


In [6]:
class QuestionPrioritization(LLMChain):
  """Chain to prioritize the questions to be answered."""

  @classmethod
  def from_llm(cls, llm:BaseLLM, verbose: bool=True) -> LLMChain:
    """Build the chain with its prioritization prompt.

    Args:
      llm: Language model that will receive the prompt.
      verbose: Forwarded unchanged to the chain constructor.

    Returns:
      A chain whose prompt expects the variables ``questions``,
      ``next_question_id`` and ``objective``.
    """
    # Adjacent string literals are joined verbatim, so separators must be
    # written out. The list example is put on separate lines because the reply
    # is later parsed one line per question.
    question_prioritization_template = (
        "You are a question prioritization AI tasked with cleaning and formatting of and reprioritizing"
        " the following questions to be answered: {questions},"
        " Consider the ultimate objective to reflect on the topic: {objective}."
        " Do not remove any questions. Return the result as a numbered list, like:"
        "\n#. First question"
        "\n#. Second question"
        "\nStart the question list with the number {next_question_id}."
    )
    prompt = PromptTemplate(
        template=question_prioritization_template,
        input_variables=["questions", "next_question_id", "objective"],
    )
    return cls(prompt=prompt, llm=llm, verbose=verbose)

In [7]:
class AnswerChain(LLMChain):
  """Chain to answer questions and find solutions"""
  @classmethod
  def from_llm(cls, llm:BaseLLM, verbose: bool = True) -> LLMChain:
    """Build the chain with its answering prompt.

    Args:
      llm: Language model that will receive the prompt.
      verbose: Forwarded unchanged to the chain constructor.

    Returns:
      A chain whose prompt expects the variables ``objective``, ``context``
      and ``question``.
    """
    # Adjacent string literals are joined verbatim; without explicit separators
    # the question text would run straight into "Response:".
    answer_template = (
        "You are an AI who answers questions based on reflecting upon the following objective: {objective}."
        " Take in to account these previously answered questions: {context}."
        " Justify your answer."
        " Your question: {question}"
        "\nResponse:"
    )
    prompt = PromptTemplate(
        template=answer_template,
        input_variables=["objective","context","question"],
    )
    return cls(prompt=prompt, llm=llm, verbose=verbose)

In [8]:
class CritiscismChain(LLMChain):
  """Chain to criticize answers"""
  @classmethod
  def from_llm(cls, llm:BaseLLM, verbose: bool = True) -> LLMChain:
    """Build the chain with its criticism prompt.

    Args:
      llm: Language model that will receive the prompt.
      verbose: Forwarded unchanged to the chain constructor.

    Returns:
      A chain whose prompt expects the variables ``objective`` and ``context``.
    """
    # Adjacent string literals are joined verbatim; without explicit separators
    # the last sentence would fuse with "Response:".
    criticize_template = (
        "You are an AI who criticizes the answers given by the answer chain based on reflecting upon the following objective: {objective}."
        " Take in to account these previously answered questions: {context}."
        " Your response must be a criticism of the previously produced answers and must provide a viable solution to the criticism"
        "\nResponse:"
    )
    prompt = PromptTemplate(
        template=criticize_template,
        input_variables=["objective","context"],
    )
    return cls(prompt=prompt, llm=llm, verbose=verbose)

In [9]:
def get_next_question(question_chain: LLMChain, result: Dict, task_description: str, question_list: List[str], objective: str) -> List[Dict]:
  """Ask the question chain for follow-up questions and parse its reply.

  Args:
    question_chain: Object whose ``run(**kwargs)`` returns the model's reply text.
    result: Answer produced for the most recently answered question.
    task_description: Text of the question that produced ``result``.
    question_list: Names of the questions that are still unanswered.
    objective: Overall objective the questions should reflect on.

  Returns:
    One ``{"question_name": ...}`` dict per non-blank line of the reply, with
    surrounding whitespace removed from every name.
  """
  unanswered_questions = ",".join(question_list)
  response = question_chain.run(result=result, task_description=task_description, unanswered_questions=unanswered_questions, objective=objective)
  # Strip each line before storing it so names do not carry indentation,
  # trailing blanks or a stray "\r" from the model's reply.
  stripped_lines = (line.strip() for line in response.split('\n'))
  return [{"question_name": question_name} for question_name in stripped_lines if question_name]
  

In [10]:
def prioritize_question(question_prioritization_chain:LLMChain, this_question_id: int, question_list: List[Dict], objective:str) -> List[Dict]:
  """Have the prioritization chain reorder the pending questions.

  Args:
    question_prioritization_chain: Object whose ``run(**kwargs)`` returns the
      model's reply text.
    this_question_id: Id of the question that was just answered; the chain is
      asked to start numbering at this value plus one.
    question_list: Pending questions, each a dict with a ``"question_name"`` key.
    objective: Overall objective the questions should reflect on.

  Returns:
    A list of ``{"question_id": str, "question_name": str}`` dicts, one per
    reply line of the form ``<number>. <text>``, in the order of the reply.
    Lines without a numeric prefix are ignored.
  """
  questions = [t["question_name"] for t in question_list]
  next_question_id = int(this_question_id) + 1
  response = question_prioritization_chain.run(questions=questions, next_question_id=next_question_id, objective=objective)
  prioritized_question_list = []
  for question_string in response.split('\n'):
    question_id, separator, question_name = question_string.strip().partition(".")
    question_id = question_id.strip()
    # Callers convert the id with int(), so a prose line that merely contains a
    # period (e.g. "Here is the list. ...") must not be taken for a question.
    if not separator or not question_id.isdecimal():
      continue
    prioritized_question_list.append({"question_id": question_id, "question_name": question_name.strip()})
  return prioritized_question_list

In [11]:
def _get_top_questions(vectorstore, query: str, k: int) -> List[str]:
  """Look up to k stored entries for the query and return their question texts.

  The entries come back ordered by descending score; each text is the
  ``'question'`` value of the entry's metadata, converted with ``str``.
  An empty search result yields an empty list.
  """
  scored_hits = vectorstore.similarity_search_with_score(query, k=k)
  if not scored_hits:
    return []
  # NOTE(review): descending order assumes a larger score means a closer match;
  # for distance-style scores the order would be the other way round -- confirm
  # for the vector store in use.
  ranked_hits = sorted(scored_hits, key=lambda hit: hit[1], reverse=True)
  return [str(document.metadata['question']) for document, _score in ranked_hits]

def answer_question(vectorstore, answer_chain:LLMChain, objective: str, question: str, k: int = 5) -> str:
  """Answer one question, passing earlier questions to the chain as context.

  The context is looked up with the objective (not the question) as the query
  and handed to ``answer_chain.run`` as a list, together with the objective
  and the question. The chain's return value is returned unchanged.
  """
  earlier_questions = _get_top_questions(vectorstore, query=objective, k=k)
  answer = answer_chain.run(objective=objective, context=earlier_questions, question=question)
  return answer

In [12]:
class ThinkerAGI(Chain, BaseModel):
  """Controller chain that works through a queue of questions about an objective.

  Every iteration takes the next question off the queue, answers it with
  ``answer_chain``, stores the answer in ``vectorstore``, asks
  ``question_chain`` for follow-up questions and has
  ``question_prioritization_chain`` reorder the queue.
  """

  # Pending questions; entries are dicts with "question_id" and "question_name".
  question_list: deque = Field(default_factory=deque)
  question_chain: thequestionChain = Field(...)
  question_prioritization_chain: QuestionPrioritization = Field(...)
  answer_chain: AnswerChain = Field(...)
  # Highest id handed out so far to a newly created question.
  question_id_counter: int = Field(1)
  vectorstore: VectorStore = Field(init=False)
  # Maximum number of questions answered per run; with None the run ends only
  # when the queue is empty.
  max_iterations: Optional[int]=None

  class Config:
    arbitrary_types_allowed = True

  def add_question(self, question: Dict):
    """Append a question dict to the end of the queue."""
    self.question_list.append(question)

  def print_question_list(self):
    """Print every queued question as ``<id>: <name>`` under a banner."""
    print("\033[95m\033[1m" + "\n*****QUESTION LIST*****\n" +"\033[0m\033[0m")
    for t in self.question_list:
      print(str(t["question_id"]) + ": " + t["question_name"])

  def print_next_question(self, question: Dict):
    """Print the question that is about to be answered under a banner."""
    print("\033[95m\033[1m" + "\n*****NEXT QUESTION*****\n" +"\033[0m\033[0m")
    print(str(question["question_id"]) + ": " + question["question_name"])

  def print_question_result(self, result: str):
    """Print the answer to the current question under a banner."""
    print("\033[93m\033[1m" + "\n*****QUESTION ANSWER*****\n" + "\033[0m\033[0m")
    print(result)

  @property
  def input_keys(self) -> List[str]:
    """Names of the inputs the chain requires."""
    return ["objective"]

  @property
  def output_keys(self) -> List[str]:
    """The chain produces no named outputs."""
    return []

  def _call(self, inputs: Dict[str, Any]) -> Dict[str, Any]:
    """Run the agent.

    Args:
      inputs: Must contain ``"objective"``; may contain ``"first_question"``
        to replace the default seed question.

    Returns:
      An empty dict; all results are printed and written to the vector store.
    """
    objective = inputs['objective']
    first_question = inputs.get("first_question", "Questions that one must ask")
    self.add_question({"question_id": 1, "question_name": first_question})
    num_iters = 0
    # End the run when the queue is empty: nothing can refill it from here, so
    # looping on would spin forever when no iteration limit is set.
    while self.question_list:
      # ">=" (not "==") so that a limit of 0 is honoured as well.
      if self.max_iterations is not None and num_iters >= self.max_iterations:
        break
      self.print_question_list()

      # Step 1: Pull the first question
      question = self.question_list.popleft()
      self.print_next_question(question)

      # Step 2: Answer the question
      result = answer_question(
          self.vectorstore, self.answer_chain, objective, question["question_name"]
      )
      this_question_id = int(question["question_id"])
      self.print_question_result(result)

      # Step 3: Store the result in the vector store
      result_id = f"result_{question['question_id']}"
      self.vectorstore.add_texts(
          texts=[result],
          metadatas=[{"question": question["question_name"]}],
          ids=[result_id],
      )

      # Step 4: Create new questions and reprioritize the question list
      new_questions = get_next_question(
          self.question_chain, result, question["question_name"], [t["question_name"] for t in self.question_list], objective
      )
      for new_question in new_questions:
        self.question_id_counter += 1
        new_question.update({"question_id": self.question_id_counter})
        self.add_question(new_question)
      prioritized_questions = prioritize_question(
          self.question_prioritization_chain, this_question_id, list(self.question_list), objective
      )
      # If the reply could not be parsed into any question, keep the current
      # queue instead of discarding every pending question.
      if prioritized_questions:
        self.question_list = deque(prioritized_questions)
      num_iters += 1
    print("\033[91m\033[1m" + "\n*****ENDING*****\n" + "\033[0m\033[0m")
    return {}

  @classmethod
  def from_llm(
      cls,
      llm: BaseLLM,
      vectorstore: VectorStore,
      verbose: bool = False,
      **kwargs
  ) -> "ThinkerAGI":
    """Initialize the ThinkerAGI Controller.

    Builds the question, prioritization and answer chains from the same
    ``llm`` and passes any extra keyword arguments (for example
    ``max_iterations``) on to the constructor.
    """
    question_chain = thequestionChain.from_llm(
        llm, verbose=verbose
    )
    question_prioritization_chain = QuestionPrioritization.from_llm(
        llm, verbose=verbose
    )
    answer_chain = AnswerChain.from_llm(llm, verbose=verbose)
    return cls(
        question_chain=question_chain,
        question_prioritization_chain=question_prioritization_chain,
        answer_chain=answer_chain,
        vectorstore=vectorstore,
        **kwargs
    )

In [13]:
# Topic the agent reflects on; passed to the chain as its "objective" input.
OBJECTIVE = "Feminism in India"

In [14]:
# Language model handed to ThinkerAGI.from_llm, which builds every sub-chain from it.
llm = OpenAI(temperature=0)

In [15]:
# Forwarded to each sub-chain that ThinkerAGI.from_llm builds.
verbose = False

# Iteration limit for this run; None would leave the loop without a limit.
max_iterations: Optional[int] = 3
# Assemble the controller around the shared LLM and the vector store created earlier.
thinker_agi = ThinkerAGI.from_llm(
    llm=llm,
    vectorstore=vectorstore,
    verbose=verbose,
    max_iterations=max_iterations
)

In [16]:
# Start the question/answer loop on OBJECTIVE; ThinkerAGI prints its progress as it runs.
thinker_agi({"objective": OBJECTIVE})

[95m[1m
*****QUESTION LIST*****
[0m[0m
1: Questions that one must ask
[95m[1m
*****NEXT QUESTION*****
[0m[0m
1: Questions that one must ask
[93m[1m
*****QUESTION ANSWER*****
[0m[0m


Questions that one must ask about feminism in India include: What are the main challenges facing women in India? How has the feminist movement evolved in India over time? What are the successes and failures of the feminist movement in India? What are the current laws and policies in India that support gender equality? How can we ensure that women in India have access to equal opportunities and rights?
[95m[1m
*****QUESTION LIST*****
[0m[0m
2: What are the cultural and social norms that impede gender equality in India?
3: How can we ensure that women in India have access to education and employment opportunities?
4: What are the most effective strategies for promoting gender equality in India?
5: How can we ensure that women in India have access to healthcare and other basic services?
6: Wha

{'objective': 'Feminism in India'}