In [1]:
import json
import re
import os
import time
from typing import Dict, Any, List, Tuple
from tqdm import tqdm
from pydantic import BaseModel
import heapq

from openai import OpenAI
from datasets import load_dataset
from transformers import AutoModelForCausalLM, AutoTokenizer
from concurrent.futures import ThreadPoolExecutor, as_completed

In [2]:
dataset = list(load_dataset("vashistht/11763_datasets", "graph_dev")["dev_test"])
print(dataset)

The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


README.md: 0.00B [00:00, ?B/s]

graph_dev/dev_test-00000-of-00001.parque(â€¦):   0%|          | 0.00/88.7k [00:00<?, ?B/s]

graph_dev/dev-00000-of-00001.parquet:   0%|          | 0.00/687k [00:00<?, ?B/s]

Generating dev_test split:   0%|          | 0/100 [00:00<?, ? examples/s]

Generating dev split:   0%|          | 0/900 [00:00<?, ? examples/s]

[{'id': 979, 'graph_params': {'N': 16, 'M': 10, 'W': 437, 'P': 2}, 'edges': [[0, 1, 369], [0, 2, 280], [0, 3, 80], [0, 4, 366], [0, 5, 328], [0, 6, 385], [0, 7, 10], [0, 8, 151], [0, 9, 347], [0, 10, 263], [1, 2, 163], [1, 3, 433], [1, 4, 404], [1, 5, 339], [1, 6, 354], [1, 7, 156], [1, 8, 279], [1, 9, 433], [1, 10, 43], [1, 11, 72], [2, 3, 360], [2, 4, 71], [2, 5, 114], [2, 6, 388], [2, 7, 86], [2, 8, 425], [2, 9, 66], [2, 10, 324], [2, 11, 39], [2, 12, 404], [3, 4, 49], [3, 5, 50], [3, 6, 239], [3, 7, 172], [3, 8, 330], [3, 9, 256], [3, 10, 236], [3, 11, 404], [3, 12, 208], [3, 13, 114], [4, 5, 309], [4, 6, 314], [4, 7, 418], [4, 8, 383], [4, 9, 327], [4, 10, 229], [4, 11, 130], [4, 12, 218], [4, 13, 139], [4, 14, 422], [5, 6, 352], [5, 7, 378], [5, 8, 175], [5, 9, 162], [5, 10, 382], [5, 11, 266], [5, 12, 249], [5, 13, 3], [5, 14, 127], [5, 15, 368], [6, 7, 64], [6, 8, 360], [6, 9, 413], [6, 10, 13], [6, 11, 365], [6, 12, 356], [6, 13, 225], [6, 14, 109], [6, 15, 237], [6, 0, 159], 

In [3]:
def generate_problem_prompt(edges: list, params: dict) -> str:

    N, P = params["N"], params["P"]
    prompt = f"""You are given a directed graph with {N} nodes (numbered 0 to {N-1}) and the following edges:

    Edges (source -> target, weight):
    """
    for src, dst, weight in edges:
        prompt += f"{src} -> {dst}, weight: {weight}\n"

    prompt += f"""
    Find the top {P} shortest path{'s' if P > 1 else ''} from node 0 to node {N-1}."""

    return prompt

prompt_list = []
for i in range(len(dataset)):
    prompt_list.append(generate_problem_prompt(dataset[i]["edges"], dataset[i]["graph_params"]))

In [5]:
class PathInfo(BaseModel):
    """Information about a single path"""
    path: List[int]
    weight: int


class GraphPathSolution(BaseModel):
    """Solution containing top-P paths with their weights"""
    paths: List[PathInfo]


def find_top_p_paths(edges: List[Tuple[int, int, int]], N: int, P: int) -> GraphPathSolution:
    """
    Find the top P shortest paths from node 0 to node N-1 using dynamic programming.

    Args:
        edges: List of (source, target, weight) tuples
        N: Number of nodes
        P: Number of paths to find

    Returns:
        GraphPathSolution containing the top P shortest paths
    """
    # Build adjacency list
    graph = {i: [] for i in range(N)}
    for src, dst, weight in edges:
        graph[src].append((dst, weight))

    # Use modified Dijkstra's algorithm to find top P paths
    # Each state is (cost, path)
    pq = [(0, [0])]  # (cost, path)
    paths_found = []
    visited_states = set()

    while pq and len(paths_found) < P:
        cost, path = heapq.heappop(pq)
        current_node = path[-1]

        # Create a state key to avoid revisiting the same (node, path_length) combination
        state_key = (current_node, len(path))
        if state_key in visited_states:
            continue
        visited_states.add(state_key)

        # If we reached the target node, add this path to results
        if current_node == N - 1:
            paths_found.append(PathInfo(path=path, weight=cost))
            continue

        # Explore neighbors
        for neighbor, edge_weight in graph[current_node]:
            if neighbor not in path:  # Avoid cycles
                new_cost = cost + edge_weight
                new_path = path + [neighbor]
                heapq.heappush(pq, (new_cost, new_path))

    return GraphPathSolution(paths=paths_found)

In [6]:
TOOLS = [
    {
        "type": "function",
        "function": {
            "name": "find_top_p_paths",
            "description": "Find the top P shortest paths from node 0 to node N-1 in a directed graph",
            "parameters": {
                "type": "object",
                "properties": {
                    "edges": {
                        "type": "array",
                        "items": {
                            "type": "array",
                            "items": {"type": "integer"},
                            "minItems": 3,
                            "maxItems": 3
                        },
                        "description": "List of edges as [source, target, weight] arrays"
                    },
                    "N": {
                        "type": "integer",
                        "description": "Number of nodes in the graph"
                    },
                    "P": {
                        "type": "integer",
                        "description": "Number of shortest paths to find"
                    }
                },
                "required": ["edges", "N", "P"]
            }
        }
    }
]

def execute_tool_call(tool_name: str, arguments: dict):
    if tool_name == "find_top_p_paths":
        edges = [tuple(edge) for edge in arguments["edges"]]
        return find_top_p_paths(edges, arguments["N"], arguments["P"])

In [8]:
from transformers import AutoModelForCausalLM, AutoTokenizer

model_name = "Qwen/Qwen3-4B-Instruct-2507"

tokenizer = AutoTokenizer.from_pretrained(model_name)
model = AutoModelForCausalLM.from_pretrained(
    model_name,
    torch_dtype="auto",
    device_map="auto"
)

tokenizer_config.json: 0.00B [00:00, ?B/s]

vocab.json: 0.00B [00:00, ?B/s]

merges.txt: 0.00B [00:00, ?B/s]

tokenizer.json:   0%|          | 0.00/11.4M [00:00<?, ?B/s]

config.json:   0%|          | 0.00/727 [00:00<?, ?B/s]

`torch_dtype` is deprecated! Use `dtype` instead!


model.safetensors.index.json: 0.00B [00:00, ?B/s]

Fetching 3 files:   0%|          | 0/3 [00:00<?, ?it/s]

model-00003-of-00003.safetensors:   0%|          | 0.00/99.6M [00:00<?, ?B/s]

model-00002-of-00003.safetensors:   0%|          | 0.00/3.99G [00:00<?, ?B/s]

model-00001-of-00003.safetensors:   0%|          | 0.00/3.96G [00:00<?, ?B/s]

Loading checkpoint shards:   0%|          | 0/3 [00:00<?, ?it/s]

generation_config.json:   0%|          | 0.00/238 [00:00<?, ?B/s]

In [9]:
def parse_tool_call_from_response(response: str) -> tuple:
    """
    Parse tool call from model response.
    Returns (tool_name, arguments) or (None, None) if no tool call found.
    """
    try:
        # Check <tool_call> in response
        if "<tool_call>" in response and "</tool_call>" in response:
            tool_call_start = response.find("<tool_call>") + len("<tool_call>")
            tool_call_end = response.find("</tool_call>")
            tool_call_json = response[tool_call_start:tool_call_end].strip()

            tool_call = json.loads(tool_call_json)
            return tool_call.get("name"), tool_call.get("arguments")

        # Check function_name format
        func_call_pattern = r'find_top_p_paths\s*\(\s*edges\s*=\s*(\[.*?\])\s*,\s*N\s*=\s*(\d+)\s*,\s*P\s*=\s*(\d+)\s*\)'
        match = re.search(func_call_pattern, response, re.DOTALL)
        if match:
            edges_str, n_str, p_str = match.groups()
            edges = json.loads(edges_str)
            return "find_top_p_paths", {"edges": edges, "N": int(n_str), "P": int(p_str)}

        # finally check json structure
        json_pattern = r'\{[^{}]*"edges"[^{}]*\}'
        json_matches = re.findall(json_pattern, response, re.DOTALL)
        for json_str in json_matches:
            try:
                args = json.loads(json_str)
                if all(key in args for key in ["edges", "N", "P"]):
                    return "find_top_p_paths", args
            except:
                continue

        return None, None

    except Exception as e:
        print(f"Error parsing tool call: {e}")
        return None, None

In [None]:
system_message = """You are a helpful assistant that can solve graph problems. You have access to a tool called 'find_top_p_paths' that can find the shortest paths in a directed graph.

When given a graph problem, you should:
1. Parse the graph edges from the problem description
2. Identify the number of nodes (N) and paths requested (P)
3. Call the find_top_p_paths function with the appropriate parameters
4. Return the results in the requested format

You can call the function using this format:
<tool_call>
{"name": "find_top_p_paths", "arguments": {"edges": [[0,1,10], [1,2,5]], "N": 3, "P": 2}}
</tool_call>

The edges should be provided as a list of [source, target, weight] arrays."""


def generate_qwen_response_batch_with_tools(prompts: List[str], tokenizer, model, batch_size):
    all_responses = []
    all_tool_results = []

    for i in tqdm(range(0, len(prompts), batch_size)):
        batch_prompts = prompts[i:i + batch_size]

        # Prepare batch messages with tool support
        batch_messages = []
        for prompt in batch_prompts:
            messages = [
                {"role": "system", "content": system_message},
                {"role": "user", "content": prompt}
            ]
            text = tokenizer.apply_chat_template(
                messages,
                tools=TOOLS,
                tokenize=False,
                add_generation_prompt=True
            )
            batch_messages.append(text)

        # Tokenize the entire batch
        model_inputs = tokenizer(
            batch_messages,
            return_tensors="pt",
            padding=True,
        ).to(model.device)

        generated_ids = model.generate(
            **model_inputs,
            max_new_tokens=128,
            do_sample = True,
            # num_beams = 3
            temperature = 0.1,
            # typical_p=0.9
        )

        # Process each response in the batch
        batch_responses = []
        batch_tool_results = []

        for j, generated in enumerate(generated_ids):
            # Get the length of the input for this specific item
            input_length = len(model_inputs.input_ids[j])
            output_ids = generated[input_length:].tolist()

            # Decode the response
            content = tokenizer.decode(output_ids, skip_special_tokens=True).strip()
            batch_responses.append(content)

            # Try to execute tool call if present
            tool_name, arguments = parse_tool_call_from_response(content)
            if tool_name and arguments:
                try:
                    tool_result = execute_tool_call(tool_name, arguments)
                    batch_tool_results.append(tool_result)
                except Exception as e:
                    print(f"Error executing tool call for batch item {i+j}: {e}")
                    batch_tool_results.append(None)
            else:
                batch_tool_results.append(None)

        all_responses.extend(batch_responses)
        all_tool_results.extend(batch_tool_results)

    return all_responses, all_tool_results

In [58]:
responses, tool_responses = generate_qwen_response_batch_with_tools(prompt_list, tokenizer, model, batch_size=20)

  0%|          | 0/5 [00:00<?, ?it/s]A decoder-only architecture is being used, but right-padding was detected! For correct generation results, please set `padding_side='left'` when initializing the tokenizer.
 20%|â–ˆâ–ˆ        | 1/5 [00:17<01:10, 17.62s/it]A decoder-only architecture is being used, but right-padding was detected! For correct generation results, please set `padding_side='left'` when initializing the tokenizer.
 40%|â–ˆâ–ˆâ–ˆâ–ˆ      | 2/5 [00:36<00:54, 18.21s/it]A decoder-only architecture is being used, but right-padding was detected! For correct generation results, please set `padding_side='left'` when initializing the tokenizer.
 60%|â–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆ    | 3/5 [00:53<00:35, 17.61s/it]A decoder-only architecture is being used, but right-padding was detected! For correct generation results, please set `padding_side='left'` when initializing the tokenizer.
 80%|â–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆ  | 4/5 [01:11<00:17, 17.77s/it]A decoder-only architecture is being used, but righ

In [60]:
def evaluate_solution(correct_solution: GraphPathSolution, predicted_solution: GraphPathSolution, P: int) -> float:
    """
    Evaluate the predicted solution against the correct solution.

    Args:
        correct_solution: The correct GraphPathSolution
        predicted_solution: The predicted GraphPathSolution
        P: Number of paths that should be found

    Returns:
        Score between 0.0 and 1.0 (number of correct paths / P)
    """
    if predicted_solution is None:
        return 0.0
    if not correct_solution.paths or not predicted_solution.paths:
        return 0.0

    # Create sets of (path_tuple, weight) for comparison
    correct_paths = {(tuple(path.path), path.weight) for path in correct_solution.paths}
    predicted_paths = {(tuple(path.path), path.weight) for path in predicted_solution.paths}

    # Count how many predicted paths match correct paths
    matches = len(correct_paths.intersection(predicted_paths))

    return matches / P

In [61]:
def convert_dataset_solution(dataset_solution) -> GraphPathSolution:
    path_infos = []

    for path_dict in dataset_solution['paths']:
        path_info = PathInfo(
            path=path_dict['path'],
            weight=path_dict['weight']
        )
        path_infos.append(path_info)

    return GraphPathSolution(paths=path_infos)

true_solution = []
for i in range(100):
    true_solution.append(convert_dataset_solution(dataset[i]["solution"]))

print(true_solution)

[GraphPathSolution(paths=[PathInfo(path=[0, 7, 15], weight=123), PathInfo(path=[0, 3, 5, 13, 1, 10, 15], weight=252)]), GraphPathSolution(paths=[PathInfo(path=[0, 3], weight=31), PathInfo(path=[0, 1, 3], weight=499)]), GraphPathSolution(paths=[PathInfo(path=[0, 3, 7], weight=301)]), GraphPathSolution(paths=[PathInfo(path=[0, 6, 5, 8], weight=44), PathInfo(path=[0, 1, 6, 5, 8], weight=50)]), GraphPathSolution(paths=[PathInfo(path=[0, 1, 4], weight=580)]), GraphPathSolution(paths=[PathInfo(path=[0, 10], weight=352)]), GraphPathSolution(paths=[PathInfo(path=[0, 2, 5, 8, 11, 13, 15, 16], weight=739)]), GraphPathSolution(paths=[PathInfo(path=[0, 2, 7], weight=66)]), GraphPathSolution(paths=[PathInfo(path=[0, 4, 13], weight=366)]), GraphPathSolution(paths=[PathInfo(path=[0, 2, 5], weight=345)]), GraphPathSolution(paths=[PathInfo(path=[0, 1, 3, 6], weight=321)]), GraphPathSolution(paths=[PathInfo(path=[0, 5, 6, 8], weight=177), PathInfo(path=[0, 5, 6, 7, 8], weight=210), PathInfo(path=[0, 8],

In [62]:
scores = []
for i in range(100):
    scores.append(evaluate_solution(true_solution[i], tool_responses[i], dataset[i]["graph_params"]["P"]))

print(f"Final Score - {sum(scores)/len(scores)}")