In [1]:
import sys, os
sys.path.append(os.path.abspath(".."))  # go up one folder

import requests, json
from IPython.display import display, Markdown, clear_output
from ollama.prompt import answer_this_prompt, generate_chat
from bn_helpers.get_structures_print_tools import get_nets, printNet, get_BN_structure, get_BN_node_states
from bn_helpers.bn_helpers import AnswerStructure, BnHelper
from bn_helpers.utils import get_path

MODEL_QUIZ = "qwen2.5:7b"
MODEL_TOOLS = "gpt-oss:latest"
# MODEL_TOOLS = MODEL_QUIZ
# print(generate_chat("Print [A, C] with no additional text", model="qwen2.5:3b", num_predict=5))
# print(answer_this_prompt("Print [A, C] with no additional text", model="qwen2.5:7b", format=AnswerStructure.model_json_schema()))

Loading Netica


In [2]:
# tool_enforced_ollama.py
import requests, json, inspect, typing

OLLAMA_URL = "http://localhost:11434/api/chat"

pre_pre_query = "If the return value is grammatically correct, return exactly as that return value, otherwise fix the grammar and return the fixed return value."
pre_pre_query += "Do not check the return value's correctness. Just check the return value's grammar, fix only the grammar if necessary, then return the value."

# ---------- Python type -> JSON Schema ----------
def _pytype_to_schema(t):
    origin = typing.get_origin(t)
    args = typing.get_args(t)
    if t in (int,): return {"type": "integer"}
    if t in (float,): return {"type": "number"}
    if t in (bool,): return {"type": "boolean"}
    if t in (str,): return {"type": "string"}
    if t in (dict, typing.Dict, typing.Mapping): return {"type": "object"}
    if t in (list, typing.List, typing.Sequence): return {"type": "array"}
    if origin is typing.Union and len(args) == 2 and type(None) in args:
        other = args[0] if args[1] is type(None) else args[1]
        sch = _pytype_to_schema(other)
        if "type" in sch and isinstance(sch["type"], str):
            sch["type"] = [sch["type"], "null"]
        return sch
    if origin in (list, typing.List, typing.Sequence) and args:
        return {"type": "array", "items": _pytype_to_schema(args[0])}
    return {"type": "string"}

def function_to_tool_schema(fn, *, name=None, description=None):
    sig = inspect.signature(fn)
    props, required = {}, []
    for pname, p in sig.parameters.items():
        if p.kind in (inspect.Parameter.VAR_POSITIONAL, inspect.Parameter.VAR_KEYWORD):
            continue
        props[pname] = _pytype_to_schema(p.annotation) if p.annotation is not inspect._empty else {"type":"string"}
        if p.default is inspect._empty:
            required.append(pname)
    return {
        "type":"function",
        "function":{
            "name": name or fn.__name__,
            "description": description or (fn.__doc__.strip() if fn.__doc__ else f"Function {fn.__name__}"),
            "parameters":{"type":"object","properties":props,"required":required}
        }
    }

def _coerce_arg(value, param: inspect.Parameter):
    if param.annotation in (int, float, bool, str):
        try: return param.annotation(value)
        except Exception: return value
    return value

def _bind_and_call(fn, kwargs: dict):
    sig = inspect.signature(fn)
    bound = {}
    for pname, p in sig.parameters.items():
        if p.kind in (inspect.Parameter.VAR_POSITIONAL, inspect.Parameter.VAR_KEYWORD):
            continue
        if pname in kwargs:
            bound[pname] = _coerce_arg(kwargs[pname], p)
        elif p.default is not inspect._empty:
            bound[pname] = p.default
        else:
            raise TypeError(f"Missing required argument: {pname}")
    return fn(**bound)

def chat_with_tools(
    prompt: str,
    fns: dict,
    model: str = MODEL_TOOLS,
    temperature: float = 0.0,
    num_predict: int = 200,
    max_rounds: int = 4,
    require_tool: bool = True,
):
    tools = [function_to_tool_schema(fn, name=name) for name, fn in fns.items()]

    system_prompt = (
        "You are a tool-using assistant. "
        "Do NOT perform calculations or external actions yourself. "
        "Always call a tool when any tool could plausibly answer the user. "
        "After receiving tool results, if the return value is grammatically correct, return exactly as that return value, otherwise fix the grammar and return the fixed return value. "
        "Do not check the return value's correctness. "
        "Just check the return value's grammar, fix only the grammar if necessary, then return the value."
    )

    messages = [{"role":"system","content":system_prompt}, {"role":"user","content":prompt}]
    retries_left = max_rounds

    while retries_left > 0:
        r = requests.post(
            OLLAMA_URL,
            json={
                "model": model,
                "messages": messages,
                "tools": tools,
                "options": {"temperature": float(temperature), "num_predict": int(num_predict)},
            },
            stream=True,
        )
        if r.status_code != 200:
            raise RuntimeError(f"Ollama error {r.status_code}: {r.text}")

        assistant_msg = {"role":"assistant","content":"", "tool_calls":[]}
        for line in r.iter_lines(decode_unicode=True):
            if not line: continue
            try: chunk = json.loads(line)
            except json.JSONDecodeError: continue
            if "message" in chunk:
                m = chunk["message"]
                if m.get("content"): assistant_msg["content"] += m["content"]
                if "tool_calls" in m and m["tool_calls"]:
                    assistant_msg["tool_calls"] = m["tool_calls"]
            if chunk.get("done"): break

        messages.append(assistant_msg)

        if assistant_msg["tool_calls"]:
            # Run every tool the model asked for
            tool_msgs = []
            for i, call in enumerate(assistant_msg["tool_calls"], 1):
                fn_name = call["function"]["name"]
                args = call["function"].get("arguments", {})
                print(f"[TOOL AGENT] tool_call #{i}: {fn_name}({args})")
                if fn_name not in fns:
                    payload = {"error": f"Tool '{fn_name}' not registered"}
                else:
                    try:
                        out = _bind_and_call(fns[fn_name], args)
                        try:
                            json.dumps(out)  # ensure JSON-serializable
                            payload = {"result": out}
                        except TypeError:
                            payload = {"result": repr(out)}
                    except Exception as e:
                        payload = {"error": f"{type(e).__name__}: {e}"}
                print(f"[DEBUG] tool_result #{i}: {payload}")
                tool_msgs.append({"role":"tool","tool_name": fn_name,"content": json.dumps(payload)})

            messages.extend(tool_msgs)

            # let the model integrate tool outputs
            messages.append({
                "role": "user",
                "content": "Use the tool results above to answer the question succinctly. Do not call any tools again unless strictly necessary."
            })
            # Let the model integrate results into a final answer in the next turn
            retries_left -= 1
            continue

        # No tool calls returned
        if assistant_msg["content"].strip():
            return assistant_msg["content"].strip()

        # If we just asked it to finalize (the last message is our finalize prompt),
        # but it still returned empty content, fall back to tool outputs.
        if messages and any(m.get("role") == "tool" for m in messages[-3:]):
            # find the most recent tool message(s)
            for m in reversed(messages):
                if m.get("role") == "tool":
                    return m["content"]  # raw JSON string you sent; print or post-process as you like

        # Otherwise, nudge it to use tools (original behavior)
        if require_tool:
            print("[DEBUG] Model answered without tools; nudging it to use tools...")
            messages.append({
                "role":"user",
                "content":"Reminder: you must use the available tools. Do not answer directly."
            })
            retries_left -= 1
            continue

        return assistant_msg["content"].strip()

    # After loops, return last assistant text (if any)
    return messages[-1]["content"].strip()


In [3]:
nets = get_nets()
myNet = nets[0]

printNet(myNet)


A -> ['B', 'C']
B -> ['D']
C -> ['D', 'E']
D -> []
E -> []


In [11]:
def get_explain_XY_dconnected(net, node1, node2):
    open_path = get_path(net, node1, node2)  # must exist!
    return (f"Yes, {node1} is d-connected to {node2}, "
            f"which means that entering evidence for {node1} would "
            f"change the probability of {node2} and vice versa. They d-connected through the following path: {open_path}")

def get_explain_XY_dseparated(net, node1, node2):
    import random
    bn_helper = BnHelper()
    blocked_nodes = bn_helper.get_common_effect(net, node1, node2)
    random_blocked_node = random.choice(list(blocked_nodes)) if blocked_nodes else None

    base = (f"No, {node1} is not d-connected to {node2}, so evidence on {node1} "
            f"would not change the probability of {node2}.")
    if random_blocked_node:
        return base + f" They are blocked at {random_blocked_node} due to a common effect."
    return base + f" There is no open path between {node1} and {node2}."

def get_ground_truth(net, node1, node2):
    bn_helper = BnHelper()
    is_conn = bn_helper.is_XY_connected(net, node1, node2)
    return (get_explain_XY_dconnected(net, node1, node2)
            if is_conn else
            get_explain_XY_dseparated(net, node1, node2))

# ---- Tool wrapper (hides net) ----
def make_explain_d_connected_tool(net):
    def check_d_connected(from_node: str, to_node: str) -> dict:
        """Explain whether two nodes are d-connected and why.
        Returns: { "d_connected": boolean, "explanation": string }
        """
        try:
            bn_helper = BnHelper()
            is_conn = bn_helper.is_XY_connected(net, from_node, to_node)
            if is_conn:
                explanation = get_explain_XY_dconnected(net, from_node, to_node)
            else:
                explanation = get_explain_XY_dseparated(net, from_node, to_node)
            # return {"d_connected": bool(is_conn), "explanation": explanation}
            return explanation
        except Exception as e:
            # Surface errors to the model in a structured, visible way
            return {"d_connected": None, "error": f"{type(e).__name__}: {e}"}
    return check_d_connected

def make_explain_common_cause_tool(net):
    def check_common_cause(node1, node2):
        """Check if there is a common cause between two nodes."""
        bn_helper = BnHelper()
        ans = bn_helper.get_common_cause(net, node1, node2)
        if ans:
            template = f"The common cause(s) of {node1} and {node2} is/are: {', '.join(ans)}."
        else:
            template = f"There is no common cause between {node1} and {node2}."
        return template
    return check_common_cause

def make_explain_common_effect_tool(net):
    def check_common_effect(node1, node2):
        """Check if there is a common effect between two nodes."""
        bn_helper = BnHelper()
        ans = bn_helper.get_common_effect(net, node1, node2)
        if ans:
            template = f"The common effect(s) of {node1} and {node2} is/are: {', '.join(ans)}."
        else:
            template = f"There is no common effect between {node1} and {node2}."
        return template
    return check_common_effect

tools_map = {
    "check_d_connected": make_explain_d_connected_tool(myNet),
    "check_common_cause": make_explain_common_cause_tool(myNet),
    "check_common_effect": make_explain_common_effect_tool(myNet),
}

# Strongly worded test prompt to force tool usage
pre_query = """If user ask anything related to d-connected (d-connected means that entering evidence for one node will change the probability of the other node), use this function name: check_d_connected. 
If user ask anything related to common cause, use this function name: check_common_cause.
If user ask anything related to common effect, use this function name: check_common_effect.
User query: """

question = (
    # "Is A d-connected to B?"
    # "Is changing the evidence of A going to change the probability of B?"
    "What is the common effect of C and B?"
)

answer = chat_with_tools(
    prompt=pre_query+question,
    fns=tools_map,
    model=MODEL_TOOLS,
    require_tool=True,    
    temperature=0.0,
    num_predict=300,
    max_rounds=4,
)

print("\n=== FINAL ANSWER ===\n", answer)

[TOOL AGENT] tool_call #1: check_common_effect({'node1': 'C', 'node2': 'B'})
[DEBUG] tool_result #1: {'result': 'The common effect(s) of C and B is/are: D.'}

=== FINAL ANSWER ===
 The common effect of C and B is D.


In [17]:
import requests, json
from pydantic import BaseModel
from IPython.display import display, Markdown, clear_output

MODEL = "gpt-oss-bn-json"
def answer_this_prompt(prompt, stream=False, model=MODEL, temperature=0, format=None):
    payload = {
        "prompt": prompt,
        "model": model,
        "temperature": temperature,
        "max_new_tokens": 50, # only when stream = False work
        "format": format
    }
    headers = {
        'Content-Type': 'application/json'
    }
    endpoint = "http://localhost:11434/api/generate"

    # Send the POST request with streaming enabled
    with requests.post(endpoint, headers=headers, json=payload, stream=True) as response:
        if response.status_code == 200:
            try:
                # Process the response incrementally
                full_response = ""
                for line in response.iter_lines(decode_unicode=True):
                    if line.strip():  # Skip empty lines
                        response_json = json.loads(line)
                        chunk = response_json.get("response", "")
                        full_response += chunk
                        
                        # Render the response as Markdown
                        if stream:
                            clear_output(wait=True)
                            display(Markdown(full_response))
                        
                return full_response
            except json.JSONDecodeError as e:
                return "Failed to parse JSON: " + str(e)
        else:
            return "Failed to retrieve response: " + str(response.status_code)

class BnHelpers(BaseModel):
    fnName: str

def add(a=5, b=6):
    print('Go to function successfully')
    return a + b

output = answer_this_prompt('output this function name: add', stream=True, format=BnHelpers.model_json_schema())

bnHelpers = BnHelpers.model_validate_json(output)
if bnHelpers.fnName == 'add':
    print(add())

{"fnName":"add"}



Go to function successfully
11


In [16]:
bn_path = "./nets/collection/"
from bni_netica.bni_netica import *
from bni_netica.bni_netica import Net

CancerNeapolitanNet = Net(bn_path+"Cancer Neapolitan.neta")
ChestClinicNet = Net(bn_path+"ChestClinic.neta")
ClassifierNet = Net(bn_path+"Classifier.neta")
CoronaryRiskNet = Net(bn_path+"Coronary Risk.neta")
FireNet = Net(bn_path+"Fire.neta")
MendelGeneticsNet = Net(bn_path+"Mendel Genetics.neta")
RatsNet = Net(bn_path+"Rats.neta")
WetGrassNet = Net(bn_path+"Wet Grass.neta")
RatsNoisyOr = Net(bn_path+"Rats_NoisyOr.dne")
Derm = Net(bn_path+"Derm 7.9 A.dne")

BN = ""
for node in FireNet.nodes():
    BN += f"{node.name()} -> {[child.name() for child in node.children()]}\n"

def isConnected(net, fromNode, toNode):
  relatedNodes = net.node(fromNode).getRelated("d_connected")
  for node in relatedNodes:
    if node.name() == toNode:
      return True
  return False


BN = ""
for node in FireNet.nodes():
    BN += f"{node.name()} -> {[child.name() for child in node.children()]}\n"

PROMPT = "Within {BN}, is {fromNode} an ancestor of {toNode}?"
fromNode = 'Alarm'
toNode = 'Fire'

PROMPT = PROMPT.format(BN=BN, fromNode=fromNode, toNode=toNode)
inputPrompt = PROMPT + 'if user ask anything related to are these two nodes connected to each other, output this function name: isConnected'
output2 = answer_this_prompt(inputPrompt, stream=True, format=BnHelpers.model_json_schema())

{"fnName":"isConnected"}


In [None]:
questions = [
    """In this Bayesian Networks: {BN}, is {fromNode} connected to {toNode}?""",
    """In this Bayesian Networks: {BN}, is {fromNode} connected to {toNode}? What are the two nodes mentioned?""",
    "Within the Bayesian Network {BN}, does a path exist from {fromNode} to {toNode}?",
    "In the graph {BN}, can information flow from {fromNode} to {toNode}?", # top perform 
    "Are {fromNode} and {toNode} dependent in the Bayesian Network {BN}?",
    "In {BN}, is there any direct or indirect connection between {fromNode} and {toNode}?",
    "Can {fromNode} influence {toNode} in the Bayesian Network {BN}?",
    "Is {toNode} reachable from {fromNode} in the structure of {BN}?",
    "Does {BN} contain a path that links {fromNode} to {toNode}?",
    "Are there any edges—direct or through other nodes—connecting {fromNode} and {toNode} in {BN}?",
    "Is {toNode} conditionally dependent on {fromNode} in the Bayesian Network {BN}?",
    "Within {BN}, is {fromNode} an ancestor of {toNode}?"
]

In [None]:
listOfNets = [CancerNeapolitanNet, ChestClinicNet, ClassifierNet, CoronaryRiskNet, FireNet, MendelGeneticsNet, RatsNet, WetGrassNet, RatsNoisyOr, Derm]

for question in questions:
  total = 0
  correct = 0
  print(f"Question: {question.format(BN=net.name(), fromNode=fromNode, toNode=toNode)}")
  for net in listOfNets:
      for _ in range(5):
        total += 1
        fromNode, toNode = pickTwoRandomNodes(net)
        if fromNode and toNode:
            
            correctIdentified, queryFromNode, queryToNode = correctIdentification(question, net, fromNode, toNode)
            if correctIdentified:
              correct += 1
            else:
              print(f"Incorrect identification for {net.name()}")
              printNet(net)
              print()
              print("Expected:", fromNode, "->", toNode)
              print("Reality:", queryFromNode, "->", queryToNode)
              print("----------------------------------------------------")

  print(f"Total: {total}, Correct: {correct}, Accuracy: {correct/total:.2%}")
  print("<------------------------------------------------------------------------->")

In [None]:
from bni_netica.support_tools import get_nets, printNet, get_BN_structure, get_BN_node_states
from bni_netica.bn_helpers import BnHelper, QueryTwoNodes, ParamExtractor
from ollama.prompt import answer_this_prompt
from bni_netica.scripts import HELLO_SCRIPT, MENU_SCRIPT, GET_FN_SCRIPT

# PROMPT = """Consider this question: '{question}'. 
# What are the two nodes in this question? 
# Make sure to correctly output the names of nodes exactly as mentioned in the network and in the order as the question mentioned. 
# For example, if the question mentioned "A and B" then the two nodes are fromNode: A, toNode: B; or if the question mentioned "Smoking and Cancer" then the two nodes are fromNode: Smoking, toNode: Cancer. 
# Answer in JSON format."""

def query_menu(BN_string, net):
    """Input: BN: string, net: object"""
    pre_query = f"""In this Bayesian Network: 
{BN_string}
"""
    user_query = input("Enter your query here: ")
    get_fn_prompt = pre_query + "\n" + user_query + GET_FN_SCRIPT

    get_fn = answer_this_prompt(get_fn_prompt, format=BnHelper.model_json_schema())
    print("\nBayMin:")
    print(get_fn)

    get_fn = BnHelper.model_validate_json(get_fn)
    fn = get_fn.function_name

    bn_helper = BnHelper(function_name=fn)
    param_extractor = ParamExtractor()
    
    if fn == "is_XY_connected":
        
        get_params = param_extractor.extract_two_nodes_from_query(pre_query, user_query)
        print(get_params)

        ans = bn_helper.is_XY_connected(net, get_params.from_node, get_params.to_node)

        if ans:
            template = f"Yes, {get_params.from_node} is d-connected to {get_params.to_node}, which means that entering evidence for {get_params.from_node} would change the probability of {get_params.to_node} and vice versa."
        else:
            template = f"No, {get_params.from_node} is not d-connected to {get_params.to_node}, which means that entering evidence for {get_params.from_node} would not change the probability of {get_params.to_node}."
        
        explain_prompt = f"""User asked: In this '{BN_string}', '{user_query}'. We use {fn} function and the output is: '{ans}'. Follow this exact template to provide the answer: '{template}'."""
        print(answer_this_prompt(explain_prompt))

    print()
    
    print(MENU_SCRIPT)
    choice = int(input("Enter your choice: "))
    print()

    if choice == 1:
        input("Enter your query here: ")
        print('This is a sample answer.\n')
    elif choice == 2:
        input("Enter your query here: ")
        print('This is a sample answer.\n')
    elif choice == 3:
        print("Not yet implemented\n")
        return 
    elif choice == 4:
        print("Goodbye!\n")
        return    

def main():
    print(HELLO_SCRIPT)
    nets = get_nets()

    
    for i, net in enumerate(nets):
        print(f"{i}: {net.name()}")

    print()
    choice = int(input("Enter the number of the network you want to use: "))
    print()
    if choice < 0 or choice >= len(nets):
        print("Invalid choice. Exiting.")
        return
    
    net = nets[choice]
    print(f"You chose: {net.name()}")
    printNet(net)
    print('\nBN states:\n')
    print(get_BN_node_states(net))

    BN_string = get_BN_structure(net)
    query_menu(BN_string=BN_string, net=net)


if __name__ == "__main__":
    main()
