In [1]:
# ----------------------------------------------------------
# Forward Reasoning in First-Order Logic (Robert is Criminal)
# ----------------------------------------------------------

# --- FACTS ---
# Ground atoms are tuples: (predicate, arg1, arg2, ...).
facts = {
    ("American", "Robert"),
    ("Enemy", "A", "America"),
    ("Missile", "T1"),
    ("Owns", "A", "T1"),
}

# --- RULES (Horn Clauses) ---
# Each rule is a dict: "if" holds the list of premises, "then" the conclusion.
# Lowercase arguments (p, q, r, x) are variables; everything else is a constant.
rules = [
    # 1. Law: American(p) ∧ Weapon(q) ∧ Sells(p, q, r) ∧ Hostile(r) → Criminal(p)
    {
        "if": [
            ("American", "p"),
            ("Weapon", "q"),
            ("Sells", "p", "q", "r"),
            ("Hostile", "r"),
        ],
        "then": ("Criminal", "p"),
    },
    # 2. Missiles are weapons: Missile(x) → Weapon(x)
    {"if": [("Missile", "x")], "then": ("Weapon", "x")},
    # 3. Enemy implies hostile: Enemy(x, America) → Hostile(x)
    {"if": [("Enemy", "x", "America")], "then": ("Hostile", "x")},
    # 4. All missiles owned by A are sold to A by Robert:
    #    Missile(x) ∧ Owns(A, x) → Sells(Robert, x, A)
    {
        "if": [("Missile", "x"), ("Owns", "A", "x")],
        "then": ("Sells", "Robert", "x", "A"),
    },
]


# ---------------------------
# Forward Chaining Functions
# ---------------------------
def match_fact(pattern, fact):
    """Match a rule pattern against a ground fact.

    Both arguments are tuples of the form (predicate, arg1, arg2, ...).
    A pattern argument is a variable when ``str.islower()`` is true for it;
    any other argument is a constant that must equal the fact's argument.

    Returns a dict mapping each variable in ``pattern`` to the constant it
    was bound to, or None when the pattern does not match the fact.
    """
    # An empty tuple has no predicate and cannot match anything.
    if not pattern or not fact:
        return None
    # Predicate name AND arity must agree; zip() below would otherwise
    # silently drop the extra arguments of the longer tuple.
    if pattern[0] != fact[0] or len(pattern) != len(fact):
        return None
    substitution = {}
    for p, f in zip(pattern[1:], fact[1:]):
        if p.islower():  # variable
            # setdefault binds a fresh variable, or hands back the earlier
            # binding so that a repeated variable (e.g. P(x, x)) must see the
            # same constant in every position.
            if substitution.setdefault(p, f) != f:
                return None
        elif p != f:
            return None
    return substitution


def substitute(pattern, subst):
    """Return a copy of ``pattern`` with every element that is a key of
    ``subst`` replaced by its mapped value; other elements are kept as-is.

    The lookup is applied to every element of the tuple, including the
    first one (the predicate name).
    """
    resolved = []
    for term in pattern:
        resolved.append(subst.get(term, term))
    return tuple(resolved)


def forward_chain(facts, rules, goal):
    """Forward-chain over ``rules`` until ``goal`` is derived or nothing new
    can be inferred.

    facts: set of ground atoms (tuples). It is extended IN PLACE with every
           inferred fact.
    rules: list of dicts with keys "if" (list of premise patterns) and
           "then" (conclusion pattern).
    goal:  ground atom to prove.

    Prints each inferred fact as it is added. Returns True as soon as the
    goal is known, False if a full pass adds no new fact before that.
    """
    # A goal that is already a known fact needs no inference at all.
    if goal in facts:
        print("\n✅ Goal reached:", goal)
        return True

    new_fact = True
    while new_fact:
        new_fact = False
        for rule in rules:
            conds = rule["if"]
            then = rule["then"]

            # Join the premises left to right; each entry of `matches` is a
            # substitution that satisfies every premise processed so far.
            matches = [{}]
            for cond in conds:
                new_matches = []
                for m in matches:
                    for fact in facts:
                        subst = match_fact(cond, fact)
                        if subst is None:
                            continue
                        # Keep the candidate only if it agrees with every
                        # binding made by earlier premises. Merging blindly
                        # would let a later premise overwrite a variable and
                        # fire the rule on facts that do not fit together.
                        if all(m.get(var, val) == val for var, val in subst.items()):
                            new_matches.append({**m, **subst})
                matches = new_matches
                if not matches:
                    # No substitution survives; later premises cannot help.
                    break

            # Apply rule with all matches
            for m in matches:
                inferred = substitute(then, m)
                if inferred not in facts:
                    facts.add(inferred)
                    new_fact = True
                    print("Inferred:", inferred)
                    if inferred == goal:
                        print("\n✅ Goal reached:", inferred)
                        return True
    return False


# ---------------------------
# Run the reasoning
# ---------------------------
# Target of the proof: Criminal(Robert).
goal = ("Criminal", "Robert")

print("=== Forward Reasoning Trace ===")
# forward_chain prints every inferred fact and the success message itself.
result = forward_chain(facts=facts, rules=rules, goal=goal)

# Only the failure case has to be reported here.
if not result:
    print("\n❌ Could not prove the goal.")


=== Forward Reasoning Trace ===
Inferred: ('Weapon', 'T1')
Inferred: ('Hostile', 'A')
Inferred: ('Sells', 'Robert', 'T1', 'A')
Inferred: ('Criminal', 'Robert')

✅ Goal reached: ('Criminal', 'Robert')


In [2]:
# Simple forward chaining engine with detailed trace
from itertools import product
from copy import deepcopy

def is_variable(tok):
    """Return True when ``tok`` is a variable token, False otherwise.

    Variables are strings that start with an uppercase letter (X, Y, Z).
    The result is always a bool: empty strings and non-string tokens are
    simply not variables.

    NOTE(review): constants in this notebook's KB such as 'Robert' or
    'CountryA' also start with an uppercase letter and are therefore
    classified as variables by this test -- confirm that is intended.
    """
    # tok[:1] is "" for an empty string, and "".isupper() is False.
    return isinstance(tok, str) and tok[:1].isupper()

def atom_pred_name(atom):
    """Return the predicate name of an atom, i.e. its first element."""
    return atom[0]

def atom_arity(atom):
    """Return the number of arguments of an atom: its length minus one for the predicate name."""
    return len(atom) - 1

def unify_atom_with_fact(atom, fact, theta):
    """
    Try to extend substitution theta to make atom match fact.

    atom and fact are tuples like ('Sells', 'X', 'Y', 'Z') or
    ('Sells', 'Robert', 'm1', 'CountryA').
    theta is a dict mapping variables to the tokens they are bound to; it is
    never modified.

    Returns: a new dict extending theta, or None if the predicate names or
    arities differ or if any argument clashes.
    """
    if atom_pred_name(atom) != atom_pred_name(fact) or atom_arity(atom) != atom_arity(fact):
        return None
    # This function only ever stores the fact's tokens as values and never
    # mutates them, so a shallow copy is enough to leave the caller's theta
    # untouched. It runs for every (atom, fact) pair, where deepcopy is
    # needless work.
    new_theta = dict(theta)
    for a_tok, f_tok in zip(atom[1:], fact[1:]):
        if is_variable(a_tok):
            # setdefault binds an unbound variable, or returns the existing
            # binding, which must then agree with the fact's token.
            if new_theta.setdefault(a_tok, f_tok) != f_tok:
                return None
        elif a_tok != f_tok:
            # atom token is a constant: it must equal the fact token.
            return None
    return new_theta

def substitute_in_atom(atom, theta):
    """Return ``atom`` with each variable argument that is bound in ``theta``
    replaced by its binding.

    The predicate name (first element) is never substituted. Arguments that
    are constants, or variables without a binding, are copied unchanged.
    """
    resolved_args = tuple(
        theta[term] if is_variable(term) and term in theta else term
        for term in atom[1:]
    )
    return (atom[0],) + resolved_args

def format_atom(atom):
    """Render an atom tuple as text, e.g. ('Missile', 'm1') -> 'Missile(m1)'."""
    predicate = atom[0]
    argument_list = ", ".join(atom[1:])
    return "{}({})".format(predicate, argument_list)

def forward_chain(kb_facts, kb_rules, query=None, verbose=True):
    """
    Forward-chain over the rules until no new fact appears or the query is met.

    kb_facts: set of ground atoms (tuples); copied, the caller's set is not
              modified
    kb_rules: list of rules, each rule is (head_atom, [body_atoms...])
             atoms are tuples: ('Predicate', 'arg1', 'arg2', ...)
             variables: tokens starting with uppercase letter
    query: optional atom to stop when found; it is checked at the end of
           every pass that derived at least one new fact
    verbose: when true, print a detailed trace of matching and rule firing

    Returns the set of all facts known when chaining stopped.

    NOTE: this definition replaces the forward_chain(facts, rules, goal)
    defined in the earlier cell; once this cell has run, the earlier
    signature is no longer available under this name.
    """
    facts = set(kb_facts)  # ground atoms
    inferred = set()       # facts derived during the current pass only
    step = 0
    new_fact_added = False

    def backtrack(head, body, index, theta):
        """Depth-first search for substitutions satisfying body[index:]."""
        nonlocal new_fact_added, step
        if index == len(body):
            # All body atoms matched, produce head with substitution
            instantiated_head = substitute_in_atom(head, theta)
            if instantiated_head not in facts and instantiated_head not in inferred:
                step += 1
                if verbose:
                    print(f"Step {step}: Rule fired:")
                    print("    Rule: {} :- {}".format(format_atom(head), ", ".join(format_atom(b) for b in body)))
                    print("    Substitution:", theta)
                    print("    Derived:", format_atom(instantiated_head))
                    print()
                # Buffered in `inferred` so that `facts` is not modified
                # while the loop below is iterating over it.
                inferred.add(instantiated_head)
                new_fact_added = True
            return

        atom = body[index]
        # Try to match atom against any fact
        matched_any = False
        for f in facts:
            theta_prime = unify_atom_with_fact(atom, f, theta)
            if theta_prime is not None:
                matched_any = True
                if verbose:
                    print(f"  Matching body atom {format_atom(atom)} with fact {format_atom(f)} -> theta now {theta_prime}")
                backtrack(head, body, index + 1, theta_prime)
        # if no fact matched this body atom, this branch fails
        if not matched_any and verbose:
            print(f"  No match for body atom {format_atom(atom)} under theta {theta}")

    if verbose:
        print("Initial facts:")
        for f in facts:
            print("  ", format_atom(f))
        print("\nStarting forward chaining...\n")

    while True:
        new_fact_added = False
        # Try each rule against the facts known at the start of this pass
        for head, body in kb_rules:
            if verbose:
                print("Trying rule:", format_atom(head), ":-", ", ".join(format_atom(b) for b in body))
            backtrack(head, body, 0, {})
            if verbose:
                print("-" * 50)

        # Promote this pass's derivations to known facts
        facts.update(inferred)
        inferred.clear()

        if not new_fact_added:
            if verbose:
                print("No new facts derived in this pass. Forward chaining halts.\n")
            break

        # optional early termination
        # The query goes through the same unifier as rule bodies, so a
        # variable repeated in the query (e.g. Likes(X, X)) must bind
        # consistently.
        # NOTE(review): is_variable treats every capitalised token as a
        # variable, so a query such as ('Criminal', 'Robert') is satisfied by
        # ANY Criminal fact -- confirm the intended naming convention.
        if query and any(unify_atom_with_fact(query, f, {}) is not None for f in facts):
            if verbose:
                print("Query found in facts; halting early.\n")
            break

    return facts

# ---- Define KB ----
# Facts (ground atoms)
kb_facts = {
    ("American", "Robert"),
    ("Missile", "m1"),
    ("Hostile", "CountryA"),
    ("Sells", "Robert", "m1", "CountryA"),
}

# Rules are (head, [body atoms...]) pairs; X, Y, Z are variables.
# Criminal(X) :- American(X), Sells(X,Y,Z), Missile(Y), Hostile(Z).
kb_rules = [
    (
        ("Criminal", "X"),
        [
            ("American", "X"),
            ("Sells", "X", "Y", "Z"),
            ("Missile", "Y"),
            ("Hostile", "Z"),
        ],
    ),
]

# Run forward chaining and trace
derived_facts = forward_chain(kb_facts=kb_facts, kb_rules=kb_rules, query=("Criminal", "Robert"))

# Sorted so the final listing does not depend on set iteration order.
print("Final facts:")
for f in sorted(derived_facts):
    print("  ", format_atom(f))



Initial facts:
   Hostile(CountryA)
   Sells(Robert, m1, CountryA)
   Missile(m1)
   American(Robert)

Starting forward chaining...

Trying rule: Criminal(X) :- American(X), Sells(X, Y, Z), Missile(Y), Hostile(Z)
  Matching body atom American(X) with fact American(Robert) -> theta now {'X': 'Robert'}
  Matching body atom Sells(X, Y, Z) with fact Sells(Robert, m1, CountryA) -> theta now {'X': 'Robert', 'Y': 'm1', 'Z': 'CountryA'}
  Matching body atom Missile(Y) with fact Missile(m1) -> theta now {'X': 'Robert', 'Y': 'm1', 'Z': 'CountryA'}
  Matching body atom Hostile(Z) with fact Hostile(CountryA) -> theta now {'X': 'Robert', 'Y': 'm1', 'Z': 'CountryA'}
Step 1: Rule fired:
    Rule: Criminal(X) :- American(X), Sells(X, Y, Z), Missile(Y), Hostile(Z)
    Substitution: {'X': 'Robert', 'Y': 'm1', 'Z': 'CountryA'}
    Derived: Criminal(Robert)

--------------------------------------------------
Query found in facts; halting early.

Final facts:
   American(Robert)
   Criminal(Robert)
   Host