## Tutorial (dev): Make components
- Define a Node class
- Define a Transition class
- Define a Reward class
- Generation with the classes defined above
- Customize generator

In [None]:
# Imports

import sys
repo_root = "../../"
if repo_root not in sys.path:
    sys.path.insert(0, repo_root)

# Define a Node class
A node object also stores its parent node, the transition probability from the parent (, and optionally the corresponding action).

In [None]:
from typing import Self
from node import Node

class PointNode(Node):
    """Node class that represents points in x-y plane."""
    def __init__(self, x: float, y: float, parent=None, last_prob=1.0, last_action=None):
        self.x = x
        self.y = y
        super().__init__(parent=parent, last_prob=last_prob, last_action=last_action)
        
    # Define the reward condition
    def has_reward(self) -> bool:
        return True # all PointNode instances have reward
    
    # Return the key string. Keys of generated nodes will be recorded.
    def key(self) -> str:
        return f"{self.x}, {self.y}"
    
    # (Optional) Create a Node instance from a key. Only needed for yaml-based root specifications and chain generations.
    @classmethod
    def node_from_key(cls, key: str, parent=None, last_action: int=None, last_prob=1.0) -> Self:
        parts = [v.strip() for v in key.split(", ")]
        x = float(parts[0])
        y = float(parts[1])
        return PointNode(x=x, y=y, parent=parent, last_prob=last_prob, last_action=last_action)

# Define a Transition class
A transition class must implement `next_nodes()`, which returns the next nodes and their transition probabilities for a given node.

In [None]:
from transition import Transition

class PointTransition(Transition):
    """Transition class for NumberNode that randomly prepends a number to the beginning of the string."""
        
    def next_nodes(self, node: PointNode) -> list[PointNode]:
        """Note: If the node is terminal, an empty list [] should be returned."""
        x = node.x
        y = node.y
        
        if node.parent is None: # From the root node
            dx, dy = 10, 10
        elif node.parent.x is None:
            raise ValueError("Set 'discard_unneeded_states' in MCTS to False.")
        else:
            dx = (x - node.parent.x) / 2
            dy = (y - node.parent.y) / 2
        
        return [PointNode(x+dx, y, parent=node),
                PointNode(x-dx, y, parent=node),
                PointNode(x, y+dy, parent=node),
                PointNode(x, y-dy, parent=node),
                PointNode(x+dx, y+dy, parent=node),
                PointNode(x+dx, y-dy, parent=node),
                PointNode(x-dx, y+dy, parent=node),
                PointNode(x-dx, y-dy, parent=node)]

# Define a Reward class
A reward class maps nodes to real values, representing the objective to be optimized. 

We also support multi-objective settings: although the objectives must ultimately be folded into a single scalar value for optimization, our method allows tracking of individual objective values throughout the process.

In [None]:
from reward import Reward, SingleReward

# Multi-objective example
class MOConvexReward(Reward):
    def __init__(self, goal_x: float, goal_y: float):
        self.goal_x = goal_x
        self.goal_y = goal_y
    
    def objective_functions(self):
        def x_distance(node: Node) -> float:
            return abs(self.goal_x - node.x)
        
        def y_distance(node: Node) -> float:
            return abs(self.goal_y - node.y)
        
        return [x_distance, y_distance]
    
    def reward_from_objective_values(self, objective_values):
        x_distance = objective_values[0]
        y_distance = objective_values[1]
        return 1 / (1 + (x_distance + y_distance)) # Scale the reward to (0, 1]
    
# Single-objective example
class ConvexReward(SingleReward):
    def __init__(self, goal_x: float, goal_y: float):
        self.goal_x = goal_x
        self.goal_y = goal_y
    
    def reward(self, node):
        return 1 / (1 + (abs(self.goal_x - node.x) + abs(self.goal_y - node.y)))

# Generation with classes defined above

In [None]:
# Run the MCTS generation using the components defined above

import math
from generator import MCTS
from policy import UCT

# Define components
root = PointNode.node_from_key("0, 0") # root node
transition = PointTransition()
# reward = ConvexReward(goal_x = math.pi, goal_y = math.pi)
reward = MOConvexReward(goal_x = math.pi, goal_y = math.pi)
policy = UCT(c=0.2, best_rate=0.8)

# Define generator
generator = MCTS(root=root, transition=transition, reward=reward, policy=policy, filters=None, info_interval=200,
                 avoid_duplicates=True, # If set to True, nodes with the same key won't be added to the search tree. Can be false to increase the performance if the tree structure of the transiton graph is guranteed.
                 discard_unneeded_states=False # If set to True, states in non-leaf nodes (x, y in this case) will be discarded, making it impossible to access node.parent.x in PointTransition.next_nodes(). This behavior can be also adjusted by overriding Node.discard_unneeded_states() in the implemented Node class.
                )

# Run generator (generation results will be saved in the output directory)
generator.generate(max_generations=2000)

generator.analyze()
generator.plot(moving_average_window=0.02, reward_top_ps=[0.1, 0.5]) # This also save figs to the output directory

# Customize generator
MCTS class has `on_generation()` and `should_finish()` methods for users to customize its behavior. 

- `on_generation()` is called on generation. It receives the generated node instance, calculated objective values and reward as args.
- `should_finish()` is checked along with time_limit and max_generations.

It is also possible to inherit Generator base class and make a generator from scratch, which is not restricted to tree search. Refer to `generator.heapq_generator.py` for an example

In [None]:
class CustomGenerator(MCTS):
    def __init__(self, *args, **kwargs):    
        self._should_finish = False
        super().__init__(*args, **kwargs)

    # override
    def on_generation(self, node: PointNode, objective_values, reward):
        # Backtrack nodes if best reward is updated and reward > 0.9
        # To backtrack actions as well, include actions in Transition.next_nodes()
        if reward == self.best_reward and reward > 0.9:
            current_node = node
            while(current_node.parent is not None):
                current_node = current_node.parent
                self.logger.info(f"<- {current_node.key()}")

        # Record if 3.14 < x < 3.15 or 3.14 < y < 3.15 is achieved
        if 3.14 < node.x < 3.15 or 3.14 < node.y < 3.15:
            self._should_finish = True

    # finish generation if 3.14 < x < 3.15 or 3.14 < y < 3.15 is already achieved
    # override
    def should_finish(self):
        return self._should_finish

root = PointNode.node_from_key("0, 0") # Root node needs to be initialized when starting the generation from scratch
generator = CustomGenerator(root=root, transition=transition, reward=reward, policy=policy, filters=None, info_interval=200, avoid_duplicates=True, 
                discard_unneeded_states=False # This is required for backtracking as well
            )
generator.generate(max_generations=2000)