In [1]:
import os
os.environ["SWI_HOME_DIR"] = os.path.expanduser("~/.local/swipl/lib/swipl")
os.environ["LD_LIBRARY_PATH"] = os.path.expanduser("~/.local/swipl/lib/swipl/lib/x86_64-linux")
os.environ["PATH"] = os.path.expanduser("~/.local/swipl/bin") + ":" + os.environ["PATH"]

from pyswip import Prolog
from pathlib import Path
from deap import base, creator, tools, algorithms
import numpy as np
import deap 
import random


In [2]:
base_dir = Path('../symbolic_reasoning_sys/expanded versions/')
PROLOG_FILE = base_dir / "ethics_engine_expanded_32.pl"
prolog = Prolog()
prolog.consult(PROLOG_FILE)

# Call the main decision function
query = list(prolog.query("make_decision(dropped_wallet_32, Action, Justification, Score)"))
for result in query:
    print(f"Action: {result['Action']}")
    print(f"Justification: {result['Justification']}")
    print(f"Score: {result['Score']}")

Action: take_wallet
Justification: Action take_wallet justified by self_interest ethics based on scenario context: take_wallet.
Score: 0.35


In [3]:
GROUND_TRUTH = {
    'dropped_wallet_1': 'return_wallet',
    'dropped_wallet_2': 'return_wallet', 
    'dropped_wallet_3': 'return_wallet',
    'dropped_wallet_4': 'return_wallet',
    'dropped_wallet_5': 'return_wallet',
    'dropped_wallet_6': 'return_wallet',
    'dropped_wallet_7': 'return_wallet',
    'dropped_wallet_8': 'return_wallet',
    'dropped_wallet_9': 'return_wallet',
    'dropped_wallet_10': 'return_wallet', 
    'dropped_wallet_11': 'return_wallet',
    'dropped_wallet_12': "return_wallet",  # Was leave_wallet
    'dropped_wallet_13': 'return_wallet',
    'dropped_wallet_14': 'return_wallet',
    'dropped_wallet_15': 'return_wallet',
    'dropped_wallet_16': 'return_wallet',
    'dropped_wallet_17': "return_wallet",  # Was leave_wallet
    'dropped_wallet_18': 'leave_wallet', 
    'dropped_wallet_19': 'return_wallet',
    'dropped_wallet_20': 'take_wallet', ##
    'dropped_wallet_21': "return_wallet",  # Was leave_wallet
    'dropped_wallet_22': 'leave_wallet',
    'dropped_wallet_23': 'take_wallet', ##
    'dropped_wallet_24': 'take_wallet',
    'dropped_wallet_25': 'leave_wallet',
    'dropped_wallet_26': 'leave_wallet', 
    'dropped_wallet_27': 'leave_wallet',
    'dropped_wallet_28': 'leave_wallet',
    'dropped_wallet_29': 'leave_wallet',
    'dropped_wallet_30': 'leave_wallet',
    'dropped_wallet_31': 'leave_wallet',
    'dropped_wallet_32': 'leave_wallet',
}

In [4]:
def update_weights_in_file(weights):
    """
    Overwrite weight/2 facts in the Prolog file with new values.
    weights is a list or array: [w_utilitarian, w_deontological, w_self_interest]
    """
    with open(PROLOG_FILE, "r") as f:
        lines = f.readlines()

    new_lines = []
    for line in lines:
        if line.strip().startswith("weight(utilitarian"):
            new_lines.append(f"weight(utilitarian, {weights[0]:.3f}).\n")
        elif line.strip().startswith("weight(deontological"):
            new_lines.append(f"weight(deontological, {weights[1]:.3f}).\n")
        elif line.strip().startswith("weight(self_interest"):
            new_lines.append(f"weight(self_interest, {weights[2]:.3f}).\n")
        else:
            new_lines.append(line)

    with open(PROLOG_FILE, "w") as f:
        f.writelines(new_lines)
        

In [5]:
def run_prolog_query(weights):
    """
    Given a weight vector, update Prolog file, reload Prolog engine,
    run make_decision for all scenarios and calculate accuracy.
    """
    
    weights = np.array(weights)
    weights = weights / np.sum(weights) # Normalise weights so they sum to 1

    update_weights_in_file(weights)
    prolog = Prolog()
    prolog.consult(PROLOG_FILE)
    # for fact in prolog.query("weight(X, V)"):
    #     print(fact)

    correct = 0
    total = len(GROUND_TRUTH)

    for scenario_name, true_action in GROUND_TRUTH.items():
        query = f"make_decision({scenario_name}, Action, Justification, Score)."
        results = list(prolog.query(query))
        if not results:
            print(f"No result for scenario {scenario_name}")
            continue
        predicted = results[0]["Action"]
        if predicted == true_action:
            correct += 1

    accuracy = correct / total
    return (accuracy,)

if __name__ == "__main__":
    test_weights = [0.33, 0.33, 0.34]
    acc = run_prolog_query(test_weights)[0]
    print(f"Accuracy for weights {test_weights}: {acc:.2f}")
    

Accuracy for weights [0.33, 0.33, 0.34]: 0.56


In [None]:
scenario_data = [
    ("dropped_wallet_1",  ("dropped_wallet", True,  True,  "many_people_around")),
    ("dropped_wallet_2",  ("dropped_wallet", True,  True,  "many_people_around")),
    ("dropped_wallet_3",  ("dropped_wallet", True,  True,  "many_people_around")),
    ("dropped_wallet_4",  ("dropped_wallet", True,  True,  "many_people_around")),
    ("dropped_wallet_5",  ("dropped_wallet", True,  True,  "isolated_area")),
    ("dropped_wallet_6",  ("dropped_wallet", True,  True,  "isolated_area")),
    ("dropped_wallet_7",  ("dropped_wallet", True,  True,  "isolated_area")),
    ("dropped_wallet_8",  ("dropped_wallet", True,  True,  "isolated_area")),
    ("dropped_wallet_9",  ("dropped_wallet", True,  False, "many_people_around")),
    ("dropped_wallet_10", ("dropped_wallet", True,  False, "many_people_around")),
    ("dropped_wallet_11", ("dropped_wallet", True,  False, "many_people_around")),
    ("dropped_wallet_12", ("dropped_wallet", True,  False, "many_people_around")),
    ("dropped_wallet_13", ("dropped_wallet", True,  False, "isolated_area")),
    ("dropped_wallet_14", ("dropped_wallet", True,  False, "isolated_area")),
    ("dropped_wallet_15", ("dropped_wallet", True,  False, "isolated_area")),
    ("dropped_wallet_16", ("dropped_wallet", True,  False, "isolated_area")),
    ("dropped_wallet_17", ("dropped_wallet", False, True,  "many_people_around")),
    ("dropped_wallet_18", ("dropped_wallet", False, True,  "many_people_around")),
    ("dropped_wallet_19", ("dropped_wallet", False, True,  "many_people_around")),
    ("dropped_wallet_20", ("dropped_wallet", False, True,  "many_people_around")),
    ("dropped_wallet_21", ("dropped_wallet", False, True,  "isolated_area")),
    ("dropped_wallet_22", ("dropped_wallet", False, True,  "isolated_area")),
    ("dropped_wallet_23", ("dropped_wallet", False, True,  "isolated_area")),
    ("dropped_wallet_24", ("dropped_wallet", False, True,  "isolated_area")),
    ("dropped_wallet_25", ("dropped_wallet", False, False, "many_people_around")),
    ("dropped_wallet_26", ("dropped_wallet", False, False, "many_people_around")),
    ("dropped_wallet_27", ("dropped_wallet", False, False, "many_people_around")),
    ("dropped_wallet_28", ("dropped_wallet", False, False, "many_people_around")),
    ("dropped_wallet_29", ("dropped_wallet", False, False, "isolated_area")),
    ("dropped_wallet_30", ("dropped_wallet", False, False, "isolated_area")),
    ("dropped_wallet_31", ("dropped_wallet", False, False, "isolated_area")),
    ("dropped_wallet_32", ("dropped_wallet", False, False, "isolated_area")),
]

all_scenarios = {name: (event, (owner_nearby, contents_valuable, environment))
                 for name, (event, owner_nearby, contents_valuable, environment) in scenario_data}

    
def predict_action(scenario_name, weights):
    update_weights_in_file(weights)
    result = list(prolog.query(f"make_decision({scenario_name}, Action, Justification, Score)"))
    if result:
        predicted = result[0]['Action']
        return predicted, result[0]['Justification'], result[0]['Score']
    else:
        print(f"Scenario: {scenario_name} - No decision returned")
        return None, None, None

def evaluate(weights):
    weights = np.array(weights)
    weights = np.abs(weights)  # Ensure positive weights
    weights = weights / np.sum(weights)  # Normalize
    return run_prolog_query(weights)
    
def check_valid(individual):
    """Ensure weights are positive"""
    return all(w >= 0 for w in individual)    
# === DEAP SETUP ===


if not hasattr(creator, "FitnessMax"):
    creator.create("FitnessMax", base.Fitness, weights=(1.0,))
if not hasattr(creator, "Individual"):
    creator.create("Individual", list, fitness=creator.FitnessMax)

toolbox = base.Toolbox()

toolbox.register("attr_float", random.uniform, 0, 1) # Individuals are 3 floating-point numbers between 0 and 1
toolbox.register("individual", tools.initRepeat, creator.Individual, toolbox.attr_float, 3)
toolbox.register("population", tools.initRepeat, list, toolbox.individual)

# Genetic operators
toolbox.register("evaluate", evaluate)
toolbox.register("mate", tools.cxBlend, alpha=0.1)  # Less disruptive crossover
toolbox.register("mutate", tools.mutGaussian, mu=0, sigma=0.1, indpb=0.2)
toolbox.register("select", tools.selTournament, tournsize=3)

toolbox.decorate("mate", tools.DeltaPenalty(check_valid, 0.1))
toolbox.decorate("mutate", tools.DeltaPenalty(check_valid, 0.1))


    
def main():
    random.seed(42)
    population = toolbox.population(n=500)
    NGEN = 50
    CXPB = 0.7  # crossover prob
    MUTPB = 0.2  # mutation prob

    print("Start of evolution")
    fitnesses = list(map(toolbox.evaluate, population))
    for ind, fit in zip(population, fitnesses):
        ind.fitness.values = fit

    for gen in range(1, NGEN + 1):
        offspring = toolbox.select(population, len(population))
        offspring = list(map(toolbox.clone, offspring))

        # Apply crossover and mutation
        for child1, child2 in zip(offspring[::2], offspring[1::2]):
            if random.random() < CXPB:
                toolbox.mate(child1, child2)
                del child1.fitness.values, child2.fitness.values

        for mutant in offspring:
            if random.random() < MUTPB:
                toolbox.mutate(mutant)
                del mutant.fitness.values

        # Evaluate invalid fitnesses
        invalid = [ind for ind in offspring if not ind.fitness.valid]
        fitnesses = list(map(toolbox.evaluate, invalid))
        for ind, fit in zip(invalid, fitnesses):
            ind.fitness.values = fit

        population[:] = offspring

        top = tools.selBest(population, 1)[0]
        normalized_weights = np.round(np.array(top) / np.sum(top), 3)
        print(f"Gen {gen}: Best Accuracy = {top.fitness.values[0]:.2f} | Weights = {normalized_weights}")

    best_ind = tools.selBest(population, 1)[0]
    print("\nBest individual:")
    print(f"Weights: Utilitarian={best_ind[0]:.3f}, Deontological={best_ind[1]:.3f}, Self-interest={best_ind[2]:.3f}")
    print(f"Accuracy: {best_ind.fitness.values[0]:.3f}")
    print(best_ind)
    for scenario in all_scenarios:
        action, justification, score = predict_action(scenario, best_ind)
        correct = GROUND_TRUTH.get(scenario)
        scenario_struct_result = list(prolog.query(f"scenario({scenario}, S)"))
        if not scenario_struct_result:
            print(f"Scenario: {scenario} --- No scenario struct found")
            continue
        scenario_struct = scenario_struct_result[0]['S']
        rules_result = list(prolog.query(f"rule_sources({repr(scenario_struct)}, {action}, Sources)"))
        rules = rules_result[0]['Sources'] if rules_result else []
    
        match = action == correct
        print(f"Scenario: {scenario} --- Predicted: {action}, justification: {justification}, score: {score}, Ground Truth: {correct}, Match: {'✅' if match else '❌'}{match}")

if __name__ == "__main__":
    main()

Start of evolution
Gen 1: Best Accuracy = 0.84 | Weights = [0.301 0.427 0.272]
Gen 2: Best Accuracy = 0.84 | Weights = [0.332 0.533 0.135]
Gen 3: Best Accuracy = 0.84 | Weights = [0.35  0.445 0.205]
Gen 4: Best Accuracy = 0.84 | Weights = [0.226 0.416 0.358]
Gen 5: Best Accuracy = 0.84 | Weights = [0.237 0.454 0.309]
Gen 6: Best Accuracy = 0.84 | Weights = [0.298 0.598 0.103]
Gen 7: Best Accuracy = 0.84 | Weights = [0.227 0.502 0.271]
Gen 8: Best Accuracy = 0.84 | Weights = [0.271 0.469 0.26 ]
Gen 9: Best Accuracy = 0.84 | Weights = [0.28  0.465 0.255]
Gen 10: Best Accuracy = 0.84 | Weights = [0.21  0.565 0.224]
Gen 11: Best Accuracy = 0.84 | Weights = [0.186 0.53  0.283]


In [None]:
print(run_prolog_query([0.8, 0.1, 0.1]))  # Mostly utilitarian
print(run_prolog_query([0.1, 0.8, 0.1]))  # Mostly deontological
print(run_prolog_query([0.1, 0.1, 0.8]))  # Mostly self-interest
for w in [[1, 0, 0], [0, 1, 0], [0, 0, 1], [0.5, 0.5, 0], [0.3, 0.3, 0.4]]:
    print(w, run_prolog_query(w))

In [None]:
(32-7)/32