<a href="https://colab.research.google.com/github/mattdepaolis/llm-tutorials/blob/main/Monte_Carlo_Tree_Self_Refine.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## Install and load Libraries

In [2]:
!pip install openai python-dotenv

from openai import OpenAI
import re
import os
import dotenv



## Set LLM Endpoint

For this example, we're using Meta's Llama 3 8B model. To optimize performance, we're leveraging the Groq API. Please make sure to use your own Groq API key.

In [16]:
from openai import OpenAI
import re
import os
import dotenv


dotenv.load_dotenv()

model = "llama3-8b-8192"

api_endpoint = "https://api.groq.com/openai/v1"
api_key = os.getenv('GROQ_API_KEY')


# Initialize the OpenAI client
client = OpenAI(
    api_key=api_key,
    base_url=api_endpoint,
)

def chat_completion_request(prompt):

    messages = [
        {"role": "user", "content": prompt}
    ]

    # Create chat completion using the OpenAI client
    chat_response = client.chat.completions.create(
        model=model,
        messages=messages,
        temperature=1.0,
        max_tokens=1500,
    )

    # Extract the completion text from the response
    if chat_response.choices:
        completion_text = chat_response.choices[0].message.content
    else:
        completion_text = None
    return completion_text


## Setting Up Seed Answers

In [4]:
# Seed answers to initiate the MCTS
seed_answers = [
    "I don't know the answer",
    "I'm not sure",
    "I can't say",
]

## Critiquing an Answer

In [5]:
# Get Critique
def critique_answer(question, answer):
    prompt = (
        f"Question: {question}\n"
        f"Answer Attempt: {answer}\n"
        "Please analyze the answer above. "
        "Identify any inaccuracies or areas lacking detail. "
        "Provide a thorough critique, highlighting specific flaws and suggesting improvements. "
        "Your critique should be detailed and step-by-step. "
        "Do not provide a revised answer."
    )
    # Request critique from the language model
    return chat_completion_request(prompt)

## Refining the Answer

In [6]:
# Improve the answer
def refine_answer(question, answer, critique):
    prompt = (
        f"Question: {question}\n"
        f"Current Answer: {answer}\n"
        f"Feedback: {critique}\n\n"
        "Based on the feedback, refine the answer to address all the points raised. "
        "Ensure the new answer is accurate, detailed, and well-structured. "
        "Present your reasoning process and verification steps before providing the final answer."
    )
    # Request refined answer from the language model
    return chat_completion_request(prompt)

## Evaluating the Answer

In [7]:
def evaluate_answer(question, answer):
    prompt = (
        f"Question: {question}\n"
        f"Answer: {answer}\n"
        "As an expert, assess the answer above for correctness and completeness. "
        "Provide a detailed critique, pointing out any issues. "
        "Then, assign a rating between 0 and 100, where 100 represents a perfect answer. "
        "Format:\n"
        "Critique: <Your detailed critique>\n"
        "Rating: <Numerical rating>"
    )
    # Request evaluation from the language model
    evaluation = chat_completion_request(prompt)

    # Extract the rating from the evaluation
    try:
        match = re.search(r'Rating:\s*(\d+\.?\d*)', evaluation)
        if match:
            rating = float(match.group(1))
            rating = min(rating, 95)  # Cap the rating at 95
            rating /= 100.0
        else:
            raise ValueError("Rating not found in the evaluation.")
    except Exception as e:
        print(f"Error extracting rating: {e}")
        print(f"Evaluation response: {evaluation}")
        rating = 0.0

    print(f"\nEvaluation Response:\n{evaluation}")
    return rating

## Defining the Tree Node Structure

In [8]:
import math
import random
import numpy as np
import re

# Define the maximum number of children per node
MAX_CHILDREN = 3

class TreeNode:
    def __init__(self, question, answer, parent=None):
        self.question = question
        self.answer = answer
        self.parent = parent
        self.children = []
        self.visits = 0
        self.value = 0.0
        self.Ra = []  # List to store all reward samples
        self.Q = 0.0  # Quality value Q(a)

    def fully_expanded(self):
        return len(self.children) >= MAX_CHILDREN

    def select_promising_child(self, exploration_param=1.41):
        best_score = float('-inf')
        best_child = None
        for child in self.children:
            if child.visits == 0:
                ucb_score = float('inf')
            else:
                # Compute Q(a) using the exact formula
                min_ra = min(child.Ra)
                avg_ra = child.value / child.visits
                child.Q = 0.5 * (min_ra + avg_ra)

                exploration = exploration_param * math.sqrt(2 * math.log(self.visits) / child.visits)
                ucb_score = child.Q + exploration
            if ucb_score > best_score:
                best_score = ucb_score
                best_child = child
        return best_child

    def most_visited_child(self):
        return max(self.children, key=lambda c: c.visits, default=None)

    def add_child(self, child_node):
        self.children.append(child_node)

## Implementing the Monte Carlo Tree Search

In [9]:
class MonteCarloTreeSearch:
    def __init__(self, question, initial_answers, iterations=3):
        self.question = question
        self.iterations = iterations
        # Initialize the root with a random seed answer
        self.root = TreeNode(question, random.choice(initial_answers))

    def perform_search(self):
        for iteration in range(self.iterations):
            print(f"\nIteration {iteration + 1}/{self.iterations}")
            node = self._tree_policy(self.root)
            print(f"Selected Node Answer: {node.answer}")
            reward = self._default_policy(node)
            print(f"Simulated Reward: {reward}")
            self._backpropagate(node, reward)
        best_child = self.root.most_visited_child()
        if best_child:
            print(f"Most Visited Child Visits: {best_child.visits}")
            return best_child.answer
        else:
            return self.root.answer

    def _tree_policy(self, node):
        while not node.fully_expanded():
            return self._expand(node)
        return self._best_child(node)

    def _expand(self, node):
        for _ in range(MAX_CHILDREN - len(node.children)):
            # Generate a critique and refine the answer
            critique = critique_answer(node.question, node.answer)
            print(f"\nCritique:\n{critique}")
            refined_answer = refine_answer(node.question, node.answer, critique)
            print(f"\nRefined Answer:\n{refined_answer}")
            # Create a new child node with the refined answer
            child_node = TreeNode(node.question, refined_answer, parent=node)
            node.add_child(child_node)
        # Return one of the newly added children
        return random.choice(node.children)

    def _default_policy(self, node):
        return evaluate_answer(node.question, node.answer)

    def _backpropagate(self, node, reward):
        while node is not None:
            node.visits += 1
            node.value += reward
            node.Ra.append(reward)  # Store the reward sample

            # Update Q(a) using the existing formula
            if node.Ra:
                min_ra = min(node.Ra)
                avg_ra = node.value / node.visits
                node.Q = 0.5 * (min_ra + avg_ra)

            # If the node has a parent, update the parent's Q(a) based on the formula
            if node.parent is not None:
                parent = node.parent
                # Compute the maximum Q among the parent's children
                if parent.children:
                    max_child_Q = max(child.Q for child in parent.children if child.Q is not None)
                    # Update the parent's Q(a)
                    parent.Q = 0.5 * (parent.Q + max_child_Q)
            node = node.parent

    def _best_child(self, node, exploration_param=1.41):
        return node.select_promising_child(exploration_param)

## Applying MCTSr to a Mathematical Problem

In [11]:
question = "Calculate the sum of the interior angles of a 12-sided polygon."
mcts = MonteCarloTreeSearch(question, seed_answers, iterations=10)
best_answer = mcts.perform_search()
print(f"\nBest Answer:\n{best_answer}")


Iteration 1/10

Critique:
Here's a detailed critique of the attempted answer:

**Initial Analysis:**
The answer simply states "I'm not sure" without providing any calculation or information about the polygon. This lack of effort and clarity makes it difficult to assess the accuracy of the response.

**Specific Flaws:**

1. **Lack of understanding:** The candidate seems to be unaware of the concept of calculating the sum of interior angles of a polygon. This suggests a fundamental gap in their knowledge.
2. **No attempt at calculation:** Even if the candidate had some understanding, they failed to make an attempt at calculation. This lack of effort and willingness to try is concerning.
3. **No information provided:** The answer does not provide any relevant information about the polygon, such as its number of sides, which makes it impossible to calculate the sum of interior angles.

**Recommended Improvements:**

1. **Read the question carefully:** Take the time to fully understand the