# A-MCTS Algorithm Implementation

This notebook explores the implementation of the Adaptive Monte Carlo Tree Search (A-MCTS) algorithm for temporal path discovery. We'll cover:

1. Key components of the algorithm
2. Search process walkthrough
3. Example run on a synthetic graph
4. Analysis of the algorithm's behavior

In [1]:
# Import required modules
import sys
import os
import numpy as np
import matplotlib.pyplot as plt
import time
import random
from typing import Dict, Tuple, List, Optional

# Add the project root to sys.path
sys.path.append(os.path.abspath('..'))

# Import project modules
from src.graph import AttributedDynamicGraph, TemporalPath
from src.embedding import EdgeEmbedding
from src.memory import ReplayMemory
from src.amcts import TreeNode, AMCTS

## Key Components of A-MCTS

The A-MCTS algorithm consists of several key components:

1. **TreeNode**: Represents a node in the search tree, containing a temporal path and statistics
2. **AMCTS**: The main algorithm that orchestrates the search process
3. **EdgeEmbedding**: Embeds edges in a low-dimensional space to capture structural similarities
4. **ReplayMemory**: Stores and retrieves past search experiences to improve efficiency

Let's examine each component in more detail.

### TreeNode

The TreeNode class represents a node in the search tree. Each node contains:
- A temporal path
- Visit count and reward statistics
- Children nodes corresponding to different actions

Key methods:
- `is_fully_expanded()`: Checks if all possible actions have been tried
- `is_terminal()`: Checks if the node has no available actions
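
To make the two predicates concrete, here is a minimal sketch of a node with the same bookkeeping. `SketchNode` and its `actions` field are hypothetical stand-ins, not the project's `TreeNode`, which derives its actions from the temporal path it holds.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Tuple

# Edge tuple layout used throughout this notebook:
# (source, dest, length, dep_time, arr_time)
Edge = Tuple[int, int, int, int, int]


@dataclass(eq=False)  # compare nodes by identity, not by field values
class SketchNode:
    """Hypothetical stand-in for TreeNode (assumption, not the project class)."""
    actions: List[Edge]                       # actions available from this node
    parent: Optional["SketchNode"] = None
    children: Dict[Edge, "SketchNode"] = field(default_factory=dict)
    visit_count: int = 0
    avg_reward: float = 0.0

    def is_terminal(self) -> bool:
        # A node with no available actions cannot be extended.
        return not self.actions

    def is_fully_expanded(self) -> bool:
        # Every available action already has a child node.
        return all(action in self.children for action in self.actions)


sketch_root = SketchNode(actions=[(1, 2, 2, 0, 2), (1, 3, 3, 0, 3)])
print(sketch_root.is_terminal(), sketch_root.is_fully_expanded())  # False False
```

In this sketch a terminal node also counts as fully expanded, because it has no actions left to try; whether `TreeNode` behaves the same way is not shown in this notebook.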

In [2]:
# Create a sample graph
def create_example_graph() -> AttributedDynamicGraph:
    """Create an example attributed dynamic graph for demonstration."""
    graph = AttributedDynamicGraph()
    
    # Add edges with attributes (cost and time)
    # Format: (source, dest, length, dep_time, arr_time, attributes)
    graph.add_edge(1, 2, 2, 0, 2, {"cost": 2, "time": 2})
    graph.add_edge(1, 3, 3, 0, 3, {"cost": 3, "time": 3})
    graph.add_edge(2, 4, 2, 3, 5, {"cost": 2, "time": 2})
    graph.add_edge(3, 4, 2, 4, 6, {"cost": 2, "time": 2})
    graph.add_edge(2, 3, 1, 3, 4, {"cost": 1, "time": 1})
    graph.add_edge(4, 5, 3, 6, 9, {"cost": 3, "time": 3})
    graph.add_edge(4, 6, 4, 7, 11, {"cost": 4, "time": 4})
    graph.add_edge(5, 6, 2, 10, 12, {"cost": 2, "time": 2})
    
    return graph

# Create a graph
graph = create_example_graph()

# Create a root node with an empty path
root_path = TemporalPath(graph)
root_path.source = 1  # Start at node 1
root = TreeNode(root_path)

# Print information about the root node
print("Root node information:")
print(f"Path: {root.path}")
print(f"Is terminal: {root.is_terminal()}")
print(f"Is fully expanded: {root.is_fully_expanded()}")
print(f"Available actions: {len(root.path.get_actions())}")
print(f"Actions: {root.path.get_actions()}")

Root node information:
Path: Empty path from node 1
Is terminal: False
Is fully expanded: False
Available actions: 2
Actions: [(1, 2, 2, 0, 2), (1, 3, 3, 0, 3)]
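
The two actions listed above are the edges that leave node 1. One plausible way to compute them, sketched below, is to keep every edge that starts at the current node and departs no earlier than the current time. The helper `available_actions` is hypothetical, and the exact rule used by `TemporalPath.get_actions()` is an assumption here.

```python
from typing import List, Tuple

Edge = Tuple[int, int, int, int, int]  # (source, dest, length, dep_time, arr_time)

# Same edges as create_example_graph(), without the attribute dictionaries.
EDGES: List[Edge] = [
    (1, 2, 2, 0, 2), (1, 3, 3, 0, 3), (2, 4, 2, 3, 5), (3, 4, 2, 4, 6),
    (2, 3, 1, 3, 4), (4, 5, 3, 6, 9), (4, 6, 4, 7, 11), (5, 6, 2, 10, 12),
]


def available_actions(edges: List[Edge], current_node: int, current_time: int) -> List[Edge]:
    """Edges leaving current_node that depart at or after current_time (assumed rule)."""
    return [e for e in edges if e[0] == current_node and e[3] >= current_time]


print(available_actions(EDGES, current_node=1, current_time=0))
# [(1, 2, 2, 0, 2), (1, 3, 3, 0, 3)]
```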


### AMCTS Algorithm

The AMCTS class implements the Adaptive Monte Carlo Tree Search algorithm. It follows the standard MCTS procedure with adaptations:

1. **Selection**: Uses UCT formula with adaptive weights
2. **Expansion**: Expands nodes by trying new actions
3. **Simulation**: Runs random simulations until reaching a terminal state
4. **Backpropagation**: Updates node statistics based on simulation results
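
For reference, the selection step in standard MCTS scores each child with the UCT rule, `avg_reward + c * sqrt(ln(parent_visits) / child_visits)`. The sketch below implements only that textbook rule; how `AMCTS` adapts the weights is not shown in this notebook, and treating `exploration_weight` as the constant `c` is an assumption.

```python
import math
from typing import Dict, Hashable, Tuple


def uct_score(avg_reward: float, visits: int, parent_visits: int,
              exploration_weight: float = 0.45) -> float:
    """Textbook UCT score; unvisited children are always tried first."""
    if visits == 0:
        return math.inf
    return avg_reward + exploration_weight * math.sqrt(math.log(parent_visits) / visits)


def select_child(children: Dict[Hashable, Tuple[float, int]], parent_visits: int,
                 exploration_weight: float = 0.45) -> Hashable:
    """Return the action whose (avg_reward, visits) pair has the highest UCT score."""
    return max(
        children,
        key=lambda a: uct_score(children[a][0], children[a][1],
                                parent_visits, exploration_weight),
    )


# A rarely visited child can outrank a better-looking but well-explored one.
stats = {"a": (0.5, 10), "b": (0.4, 1)}
print(select_child(stats, parent_visits=11))  # b
```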

Let's create an instance of the algorithm and see it in action:

In [3]:
# Create an AMCTS instance
amcts = AMCTS(
    graph, 
    embedding_dim=8, 
    max_iterations=1000,
    replay_memory_size=500,
    max_sample_num=100,
    edge_limit=2,
    discount_positive=0.95,
    discount_negative=0.9,
    exploration_weight=0.45,
    priority_weight=0.3
)

# Define source, destination, constraints and time interval
source = 1
destination = 6
constraints = {"cost": 15, "time": 15}
time_interval = (0, 15)

print(f"Finding path from {source} to {destination} with constraints: {constraints}")
print(f"Time interval: {time_interval}")

# Measure search time with a monotonic clock
start_time = time.perf_counter()

# Run the search
path = amcts.find_path(source, destination, constraints, time_interval)

# Calculate search time
search_time = time.perf_counter() - start_time
print(f"Search completed in {search_time:.2f} seconds")

# Print results
if path is None:
    print("No feasible path found.")
else:
    print("\nFound path:")
    print(path)
    print(f"Path length: {path.length}")
    print(f"Path cost: {path.calculate_attribute_value('cost')}")
    print(f"Path time: {path.calculate_attribute_value('time')}")
    print(f"Arrival time: {path.arrival_time}")
    
    print("\nPath edges:")
    for i, edge in enumerate(path.edges):
        src, dst, length, dep, arr = edge
        cost = graph.get_edge_attribute(edge, "cost")
        # Named edge_time so the imported `time` module is not shadowed
        edge_time = graph.get_edge_attribute(edge, "time")
        print(f"Edge {i+1}: {src} -> {dst}, Length: {length}, Time: {dep}->{arr}, Cost: {cost}, Time: {edge_time}")

Finding path from 1 to 6 with constraints: {'cost': 15, 'time': 15}
Time interval: (0, 15)
Search completed in 0.59 seconds

Found path:
Path from 1 to 6: (1 -> 2, t=0->2) (2 -> 4, t=3->5) (4 -> 6, t=7->11) 
Path length: 8
Path cost: 8
Path time: 8
Arrival time: 11

Path edges:
Edge 1: 1 -> 2, Length: 2, Time: 0->2, Cost: 2, Time: 2
Edge 2: 2 -> 4, Length: 2, Time: 3->5, Cost: 2, Time: 2
Edge 3: 4 -> 6, Length: 4, Time: 7->11, Cost: 4, Time: 4
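
The reported totals can be checked by hand: the cost is 2 + 2 + 4 = 8, and each edge departs after the previous one arrives (2 ≤ 3 and 5 ≤ 7). The sketch below repeats that check in code. `check_path` is a hypothetical helper, and it assumes each constraint is an upper bound on the summed attribute, which matches the totals printed above.

```python
from typing import Dict, List, Tuple

Edge = Tuple[int, int, int, int, int]  # (source, dest, length, dep_time, arr_time)

PATH: List[Edge] = [(1, 2, 2, 0, 2), (2, 4, 2, 3, 5), (4, 6, 4, 7, 11)]
ATTRS: Dict[Edge, Dict[str, int]] = {
    (1, 2, 2, 0, 2): {"cost": 2, "time": 2},
    (2, 4, 2, 3, 5): {"cost": 2, "time": 2},
    (4, 6, 4, 7, 11): {"cost": 4, "time": 4},
}


def check_path(edges: List[Edge], attrs: Dict[Edge, Dict[str, int]],
               constraints: Dict[str, int], time_interval: Tuple[int, int]) -> bool:
    """Check connectivity, temporal order, the time window, and attribute bounds."""
    # Consecutive edges must connect and must not depart before the previous arrival.
    for prev, nxt in zip(edges, edges[1:]):
        if prev[1] != nxt[0] or prev[4] > nxt[3]:
            return False
    # The whole path must fit inside the allowed time interval.
    start, end = time_interval
    if edges and (edges[0][3] < start or edges[-1][4] > end):
        return False
    # Each summed attribute must stay within its upper bound.
    return all(sum(attrs[e][name] for e in edges) <= bound
               for name, bound in constraints.items())


print(check_path(PATH, ATTRS, {"cost": 15, "time": 15}, (0, 15)))  # True
```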


## Search Process Walkthrough

Let's walk through a simplified version of the search process to understand how A-MCTS works. The functions below are demonstration stand-ins for the four MCTS steps; they leave out the adaptive weighting and replay memory so that each step is easy to trace:

In [4]:
# For demonstration, let's manually perform one iteration of the algorithm

# 1. Selection
def select_node(root):
    """Simplified version of selection for demonstration."""
    print("Selection step:")
    node = root
    path = [node]
    
    print(f"Starting at root: {node.path}")
    
    # Descend while the current node is non-terminal and fully expanded
    while not node.is_terminal() and node.is_fully_expanded():
        # For demonstration, just take the first child
        action = list(node.children.keys())[0]
        node = node.children[action]
        path.append(node)
        print(f"Selected node: {node.path}")
    
    return node, path

# 2. Expansion
def expand_node(node):
    """Simplified version of expansion for demonstration."""
    print("\nExpansion step:")
    
    if node.is_terminal():
        print("Node is terminal, no expansion possible.")
        return node
    
    if node.is_fully_expanded():
        print("Node is fully expanded, no expansion needed.")
        return node
    
    # Get unexpanded actions
    actions = node.path.get_actions()
    unexpanded = [a for a in actions if a not in node.children]
    
    if not unexpanded:
        print("No unexpanded actions available.")
        return node
    
    # Choose first unexpanded action
    action = unexpanded[0]
    
    # Create new path by adding the action
    new_path = node.path.copy()
    new_path.add_edge(action)
    
    # Create new node
    child = TreeNode(new_path, parent=node)
    
    # Add child to parent
    node.children[action] = child
    
    print(f"Expanded with action: {action}")
    print(f"New node: {child.path}")
    
    return child

# 3. Simulation
def simulate(node, destination, constraints, time_interval):
    """Simplified version of simulation for demonstration."""
    print("\nSimulation step:")
    
    current_path = node.path.copy()
    print(f"Starting simulation from: {current_path}")
    
    # Check if already at destination
    if current_path.current_node == destination:
        if current_path.is_feasible(constraints, time_interval):
            print("Already at destination. Path is feasible.")
            return 1.0
        else:
            print("Already at destination but path is not feasible.")
            return 0.0
    
    # Run random simulation
    steps = 0
    max_steps = 10  # Limit steps for demonstration
    
    while steps < max_steps:
        actions = current_path.get_actions()
        
        if not actions:
            print("No actions available. Simulation ends.")
            return 0.0
        
        # Choose random action
        action = random.choice(actions)
        current_path.add_edge(action)
        
        print(f"Step {steps+1}: Took action to {action[1]}")
        
        # Check if reached destination
        if current_path.current_node == destination:
            feasible = current_path.is_feasible(constraints, time_interval)
            print(f"Reached destination. Path is {'feasible' if feasible else 'not feasible'}.")
            return 1.0 if feasible else 0.0
        
        # Check constraints
        for attr, upper_bound in constraints.items():
            if current_path.calculate_attribute_value(attr) > upper_bound:
                print(f"Constraint '{attr}' violated. Simulation ends.")
                return 0.0
        
        steps += 1
    
    print("Reached maximum steps without conclusion.")
    return 0.0

# 4. Backpropagation
def backpropagate(node, path, reward):
    """Simplified version of backpropagation for demonstration."""
    print("\nBackpropagation step:")
    
    for i, n in enumerate(path):
        # Update visit count
        n.visit_count += 1
        
        # Update average reward
        old_avg = n.avg_reward
        n.avg_reward = (old_avg * (n.visit_count - 1) + reward) / n.visit_count
        
        print(f"Updated node {i}: Visit count = {n.visit_count}, Avg reward: {old_avg:.2f} -> {n.avg_reward:.2f}")

In [5]:
# Create a small tree for demonstration
root = TreeNode(root_path)

# Add some children manually
action1 = root.path.get_actions()[0]  # First action from root
child_path1 = root_path.copy()
child_path1.add_edge(action1)
child1 = TreeNode(child_path1, parent=root)
root.children[action1] = child1

# Perform one iteration manually
print("==== Manual MCTS Iteration ====")
selected_node, selected_path = select_node(root)
expanded_node = expand_node(selected_node)
reward = simulate(expanded_node, destination, constraints, time_interval)
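# Append the expanded node to the path only if expansion actually created a new node
backprop_path = selected_path if expanded_node is selected_node else selected_path + [expanded_node]
backpropagate(expanded_node, backprop_path, reward)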

==== Manual MCTS Iteration ====
Selection step:
Starting at root: Empty path from node 1

Expansion step:
Expanded with action: (1, 3, 3, 0, 3)
New node: Path from 1 to 3: (1 -> 3, t=0->3) 

Simulation step:
Starting simulation from: Path from 1 to 3: (1 -> 3, t=0->3) 
Step 1: Took action to 4
Step 2: Took action to 5
Step 3: Took action to 6
Reached destination. Path is feasible.

Backpropagation step:
Updated node 0: Visit count = 1, Avg reward: 0.00 -> 1.00
Updated node 1: Visit count = 1, Avg reward: 0.00 -> 1.00
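
The average-reward update in `backpropagate` recomputes the mean from the old mean and the visit count. An equivalent incremental form, sketched below, adds the scaled difference between the new reward and the current mean. `update_running_mean` is a hypothetical helper for illustration and is not part of the project code.

```python
from typing import List, Tuple


def update_running_mean(avg: float, count: int, reward: float) -> Tuple[float, int]:
    """Fold one reward into a running mean and return (new_avg, new_count)."""
    count += 1
    avg += (reward - avg) / count  # same result as (avg * (count - 1) + reward) / count
    return avg, count


rewards: List[float] = [1.0, 0.0, 1.0, 1.0]
avg, count = 0.0, 0
for r in rewards:
    avg, count = update_running_mean(avg, count, r)

# The running mean agrees with the batch mean up to floating-point rounding.
print(abs(avg - sum(rewards) / len(rewards)) < 1e-12)  # True
```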


## Understanding the Adaptive Components

What makes A-MCTS "adaptive" is its ability to learn from past experiences and adjust its search strategy accordingly. This is achieved through:

1. **Replay Memory**: Stores past search experiences for efficient lookup
2. **Edge Embeddings**: Captures structural similarities between edges
3. **Adaptive Exploration**: Adjusts exploration weights based on node statistics
4. **Priority Mechanism**: Prioritizes nodes based on their potential
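
As a rough intuition for how the first two components could work together, the sketch below stores (embedding, reward) pairs in a bounded buffer and answers a query with the reward of the most similar stored embedding, using cosine similarity. `SketchMemory` is hypothetical; the project's `ReplayMemory` takes more arguments and its matching rule is not shown in this notebook.

```python
import math
from collections import deque
from typing import Deque, List, Optional, Sequence, Tuple


def cosine_similarity(a: Sequence[float], b: Sequence[float]) -> float:
    """Cosine of the angle between two vectors; 0.0 if either has zero length."""
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return dot / norm if norm > 0 else 0.0


class SketchMemory:
    """Hypothetical bounded memory; the oldest entry is dropped when full."""

    def __init__(self, max_size: int) -> None:
        self.entries: Deque[Tuple[List[float], float]] = deque(maxlen=max_size)

    def store(self, embedding: Sequence[float], reward: float) -> None:
        self.entries.append((list(embedding), reward))

    def query(self, embedding: Sequence[float]) -> Optional[float]:
        """Return the reward of the most similar stored embedding, or None if empty."""
        if not self.entries:
            return None
        _, reward = max(self.entries,
                        key=lambda entry: cosine_similarity(embedding, entry[0]))
        return reward


sketch_memory = SketchMemory(max_size=2)
sketch_memory.store([1.0, 0.0], reward=0.8)
sketch_memory.store([0.0, 1.0], reward=0.6)
print(sketch_memory.query([0.9, 0.1]))  # 0.8
```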

Let's explore these adaptive components:

In [6]:
# Explore edge embeddings
edge_embedder = EdgeEmbedding(graph, embedding_dim=4)
edge_embedder.train(num_walks=5, walk_length=5)

# Print embeddings for a few edges
print("Edge Embeddings:")
for i, edge in enumerate(graph.edges[:3]):
    embedding = edge_embedder.get_embedding(edge)
    print(f"Edge {i+1}: {edge[0]} -> {edge[1]}, Embedding: {embedding}")

# Explore replay memory
memory = ReplayMemory(max_size=10, max_sample_num=5, edge_limit=2)

# Create a few sample paths
path1 = TemporalPath(graph)
path1.source = 1
path1.add_edge((1, 2, 2, 0, 2))
path1.add_edge((2, 4, 2, 3, 5))

path2 = TemporalPath(graph)
path2.source = 1
path2.add_edge((1, 3, 3, 0, 3))
path2.add_edge((3, 4, 2, 4, 6))

# Create attribute embeddings
attr_emb1 = path1.get_attribute_embedding(constraints)
attr_emb2 = path2.get_attribute_embedding(constraints)

# Create trajectory embeddings (simplified for demo)
traj_emb1 = [edge_embedder.get_embedding(edge) for edge in path1.edges]
traj_emb2 = [edge_embedder.get_embedding(edge) for edge in path2.edges]

# Store in memory
memory.store(path1, attr_emb1, traj_emb1, 0.8, 5, 0.7)
memory.store(path2, attr_emb2, traj_emb2, 0.6, 3, 0.5)

# Query memory
query_path = TemporalPath(graph)
query_path.source = 1
query_path.add_edge((1, 2, 2, 0, 2))

query_attr = query_path.get_attribute_embedding(constraints)
query_traj = [edge_embedder.get_embedding(edge) for edge in query_path.edges]

result = memory.query(query_path, query_attr, query_traj, constraints)
print(f"\nReplay Memory Query Result: {result}")

Edge Embeddings:
Edge 1: 1 -> 2, Embedding: [-0.82621756  0.14677582 -0.09000504 -0.53639584]
Edge 2: 1 -> 3, Embedding: [-0.02427223 -0.15227087  0.84636426 -0.50979601]
Edge 3: 2 -> 4, Embedding: [ 0.36760748 -0.35627048 -0.47096339  0.71842159]

Replay Memory Query Result: 0.6


## Summary

In this notebook, we've explored the implementation of the A-MCTS algorithm for temporal path discovery. Key points:

1. **Core Components**: TreeNode and AMCTS classes implement the search algorithm
2. **Search Process**: Selection, expansion, simulation, and backpropagation steps
3. **Adaptive Mechanisms**: Replay memory, edge embeddings, and priority-based exploration

The A-MCTS algorithm combines the strengths of Monte Carlo Tree Search with adaptive mechanisms to efficiently find paths in temporal networks with multiple constraints.

In the next notebook, we'll run experiments to evaluate the algorithm's performance on different graph types and constraint configurations.