In [None]:
from google.colab import drive

# Mount Google Drive inside the Colab VM so that saved artifacts persist
# after the runtime is recycled.
DRIVE_MOUNT_POINT = '/content/drive'
drive.mount(DRIVE_MOUNT_POINT)

# Directory (on the mounted Drive) that receives every artifact written by
# main(): the training plot, the accessibility heat map and the model weights.
SAVE_FOLDER = DRIVE_MOUNT_POINT + '/MyDrive/Projects/modal/epistemic_01/'

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
import matplotlib.pyplot as plt
import os

# Temperature of the log-sum-exp relaxation used by MLNN.op_box (soft-min)
# and MLNN.op_diamond (soft-max): operands are divided by TEMP before
# torch.logsumexp and the result is multiplied back by TEMP.
# Set a moderate temperature for soft-min/max to show fuzzy logic
TEMP = 0.1

class MLNN(nn.Module):
    """
    Implements the core logic of the MLNN paper, demonstrating
    Epistemic, Temporal, and Composite operators.

    The model lives on a grid of "spacetime" states, one per (agent, timestep)
    pair, laid out timestep-major: ``state = t * num_agents + agent``.
    Propositions are kept in ``self.propositions`` (name -> tensor) as
    ``(num_states, 2)`` tensors holding [Lower, Upper] truth bounds.

    Args:
        num_agents: number of agents per timestep (must be >= 1).
        num_steps: number of timesteps (must be >= 1).
        temp: optional positive temperature for the soft-min/soft-max inside
            the modal operators. With the default ``None`` the module-level
            ``TEMP`` constant is read each time an operator is evaluated.

    Raises:
        ValueError: if ``num_agents`` or ``num_steps`` is smaller than 1, or
            if ``temp`` is given but is not a positive number.
    """
    def __init__(self, num_agents=2, num_steps=3, temp=None):
        super().__init__()

        if num_agents < 1 or num_steps < 1:
            raise ValueError("num_agents and num_steps must both be >= 1")
        # `not temp > 0` (rather than `temp <= 0`) also rejects NaN.
        if temp is not None and not temp > 0:
            raise ValueError("temp must be a positive number")

        self.num_agents = num_agents
        self.num_steps = num_steps
        self.num_states = num_agents * num_steps # Total "Spacetime" worlds
        self.temp = temp

        # --- Define Accessibility Matrices ---

        # 1. Temporal Accessibility (A_temporal)
        # This is FIXED. It defines the "flow of time".
        # A state (w, t_i) can "see" all states (w', t_j) where j >= i.
        # time_of_state[s] is the timestep of state s, e.g. [0, 0, 1, 1, 2, 2].
        time_of_state = torch.arange(num_steps).repeat_interleave(num_agents)
        # Entry [i, j] is 1 exactly when state j is not earlier than state i.
        A_temporal = (
            time_of_state.unsqueeze(0) >= time_of_state.unsqueeze(1)
        ).to(torch.get_default_dtype())

        # We register it as a non-learnable buffer
        self.register_buffer('A_temporal', A_temporal)

        # 2. Epistemic Accessibility (A_epistemic as A_theta)
        # This is LEARNABLE. It defines "who knows what".
        # We initialize it as "siloed" (agents only see themselves at the same time).
        # E.g., (A, t0) only sees (A, t0). (B, t1) only sees (B, t1).
        initial_epistemic_logits = torch.full(
            (self.num_states, self.num_states), -10.0
        )
        initial_epistemic_logits.fill_diagonal_(10.0) # Strongly see self

        self.A_epistemic_logits = nn.Parameter(initial_epistemic_logits)

        # --- Proposition Storage ---
        # We'll store propositions as [Lower, Upper] bounds
        self.propositions = {}

    def _temperature(self):
        """Return the instance temperature, or the module-level TEMP if unset."""
        return TEMP if self.temp is None else self.temp

    def get_A_epistemic(self):
        """Get the learnable accessibility matrix (A_theta)"""
        return torch.sigmoid(self.A_epistemic_logits)

    # --- Core Modal Operators (from paper) ---

    def op_box(self, prop_bounds, A_matrix):
        """Differentiable Box (Necessity) operator: □φ
           L(□φ) = soft-min_j ( (1 - A_ij) + L(φ_j) )
           U(□φ) = soft-min_j ( (1 - A_ij) + U(φ_j) )

           prop_bounds is (S, 2) and A_matrix is (S, S); the result is (S, 2),
           clamped to [0, 1].
        """
        temp = self._temperature()

        # prop_bounds is (S, 2) -> (1, S, 2)
        # A_matrix is (S, S) -> (S, S, 1)
        prop_exp = prop_bounds.unsqueeze(0)
        A_exp = A_matrix.unsqueeze(-1)

        # Differentiable Implication (1 - A) + P
        # (S, S, 2)
        implication = (1.0 - A_exp) + prop_exp

        # Soft-min over dimension 1 (the 'j' worlds):
        #   soft-min(x) = -temp * logsumexp(-x / temp)
        # torch.logsumexp is already numerically stable, so no epsilon is
        # added to its argument (a constant offset would only bias the result).
        bounds = -temp * torch.logsumexp(-implication / temp, dim=1)

        # Clamp to ensure valid logic bounds [0, 1]
        return torch.clamp(bounds, 0, 1)

    def op_diamond(self, prop_bounds, A_matrix):
        """Differentiable Diamond (Possibility) operator: ◇φ
           L(◇φ) = soft-max_j ( A_ij * L(φ_j) )
           U(◇φ) = soft-max_j ( A_ij * U(φ_j) )

           prop_bounds is (S, 2) and A_matrix is (S, S); the result is (S, 2),
           clamped to [0, 1].
        """
        temp = self._temperature()

        # prop_bounds is (S, 2) -> (1, S, 2)
        # A_matrix is (S, S) -> (S, S, 1)
        prop_exp = prop_bounds.unsqueeze(0)
        A_exp = A_matrix.unsqueeze(-1)

        # Differentiable Conjunction (A * P)
        # (S, S, 2)
        conjunction = A_exp * prop_exp

        # Soft-max over dimension 1 (the 'j' worlds):
        #   soft-max(x) = temp * logsumexp(x / temp)
        # As in op_box, logsumexp needs no epsilon for stability.
        bounds = temp * torch.logsumexp(conjunction / temp, dim=1)

        return torch.clamp(bounds, 0, 1)

    # --- Specific Operator Implementations ---

    def K(self, prop_name):
        """Epistemic 'Knows' (K_a): Box over Epistemic relation"""
        prop_bounds = self.propositions[prop_name]
        return self.op_box(prop_bounds, self.get_A_epistemic())

    def F_epistemic(self, prop_name):
        """Epistemic 'Considers possible': Diamond over Epistemic relation"""
        prop_bounds = self.propositions[prop_name]
        return self.op_diamond(prop_bounds, self.get_A_epistemic())

    def G(self, prop_name):
        """Temporal 'Globally' (G): Box over Temporal relation"""
        prop_bounds = self.propositions[prop_name]
        return self.op_box(prop_bounds, self.A_temporal)

    def F(self, prop_name):
        """Temporal 'Eventually' (F): Diamond over Temporal relation"""
        prop_bounds = self.propositions[prop_name]
        return self.op_diamond(prop_bounds, self.A_temporal)

    # --- Composite Operator Function ---
    def K_G(self, prop_name):
        """Composite Operator: K(G(φ))"""
        # 1. Inner formula: G(φ)
        # This computes G(φ) for *all* states at once
        g_phi_bounds = self.G(prop_name)

        # 2. Outer formula: K(...)
        # We feed the *result* of G(φ) into K.
        # This is the "nested aggregation".
        # We detach to treat the inner result as a fixed proposition
        # for the outer operator, which is a standard LNN step.
        return self.op_box(g_phi_bounds.detach(), self.get_A_epistemic())

def main():
    """Run the MLNN demo end to end.

    Steps, in order:
      1. Build an ``MLNN`` with its default sizes and register the
         ``isOnline`` proposition with crisp [Lower, Upper] bounds.
      2. Train with Adam, letting gradients reach only the epistemic logit
         A_theta[0, 1], so that the lower bound of the epistemic diamond of
         ``isOnline`` at s0 is pushed towards 1.
      3. Save a training-progress plot and a heat map of the learned A_theta.
      4. Print a series of operator checks (epistemic, temporal, composite
         and a generalization check on s2).
      5. Save the model's state dict.

    All artifacts are written under the module-level ``SAVE_FOLDER``, which
    is created if it does not exist. Returns None.
    """

    # --- Setup Save Folder ---
    os.makedirs(SAVE_FOLDER, exist_ok=True)
    print(f"Artifacts will be saved to: {os.path.abspath(SAVE_FOLDER)}")

    print("=" * 60)
    print("  MLNN Demo: Epistemic, Temporal, and Composite Operators")
    print("=" * 60)
    print("Scenario: 2 Agents (A, B), 3 Timesteps (t0, t1, t2)")
    print("States: [s0=(A,t0), s1=(B,t0), s2=(A,t1), s3=(B,t1), s4=(A,t2), s5=(B,t2)]")

    model = MLNN()

    # --- Define Ground Truth ---
    # isOnline = [0, 1, 1, 0, 1, 1]
    isOnline_truth = torch.tensor([0, 1, 1, 0, 1, 1], dtype=torch.float32)
    # Store as [Lower, Upper] bounds. [0,0] for False, [1,1] for True
    model.propositions['isOnline'] = torch.stack([isOnline_truth, isOnline_truth], dim=1)

    print(f"\nGround Truth 'isOnline': {isOnline_truth.numpy()}")

    # --- (1) PROGRESS OF TRAINING ---
    print("\n" + "-" * 60)
    print("(1) TRAINING: Forcing the MLNN to resolve a contradiction")
    print("-" * 60)

    # Ground Truth `isOnline`: [0, 1, 1, 0, 1, 1]
    # Axiom: "Agent A at t0 (s0) considers it POSSIBLE that the system is online."
    # Axiom: F_epistemic(isOnline)[s0] -> 1.0
    # `F_epistemic` is a Diamond over A_epistemic. It is kept as a local
    # helper instead of being attached to the nn.Module instance at runtime.
    def epistemic_possibility(prop_name):
        """Diamond of `prop_name` over the model's current epistemic relation."""
        return model.op_diamond(
            model.propositions[prop_name], model.get_A_epistemic()
        )

    print("Training Goal: Resolve a contradiction.")
    print("  - Axiom: Agent A at t0 (s0) MUST consider it POSSIBLE 'isOnline'.")
    print("  - Formula: F_epistemic(isOnline)[s0] (Lower Bound) -> 1.0")
    print("  - Initial State: Agent A (s0) only sees itself (s0), where 'isOnline' is 0.")
    print("  - Contradiction: F_epistemic(isOnline)[s0] evaluates to ~0. LOSS ≈ 1.0")
    print("  - Hypothesis: Model will learn A_theta[0, 1] ≈ 1 (A,t0 -> B,t0).\n")

    # Use a standard optimizer
    optimizer = optim.Adam([model.A_epistemic_logits], lr=0.5) # Only optimize A_theta

    # --- Lists to store history for plotting ---
    epoch_history = []
    loss_history = []
    a_theta_0_1_history = [] # The weight we expect to change
    a_theta_0_0_history = [] # A control weight we expect to stay high

    # Only the logit behind A[0, 1] may receive gradient. The mask does not
    # change between epochs, so it is built once.
    trainable_mask = torch.zeros_like(model.A_epistemic_logits, dtype=torch.bool)
    trainable_mask[0, 1] = True

    model.train()
    num_epochs = 32
    for epoch in range(num_epochs):
        # Same values as the raw logits everywhere, but every entry except
        # [0, 1] comes from a detached copy and therefore gets zero gradient.
        logits = model.A_epistemic_logits
        masked_logits = torch.where(trainable_mask, logits, logits.detach())

        # Recalculate A_epistemic using the partially detached logits
        A_epistemic_for_loss = torch.sigmoid(masked_logits)

        # Recalculate F_epistemic using this specific A matrix
        truth_bounds = model.op_diamond(model.propositions['isOnline'], A_epistemic_for_loss)
        axiom_truth_lower_bound = truth_bounds[0, 0] # Get L(F_e(isOnline)) at s0

        # Contradiction Loss: We want the lower bound to be 1.0
        loss = (1.0 - axiom_truth_lower_bound)**2

        # Backward pass computes gradients ONLY for A_epistemic_logits[0, 1]
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        # --- Store history ---
        current_A = model.get_A_epistemic().detach()
        epoch_history.append(epoch)
        loss_history.append(loss.item())
        a_theta_0_1_history.append(current_A[0, 1].item())
        a_theta_0_0_history.append(current_A[0, 0].item())

        if epoch % 5 == 0:
            print(f"Epoch {epoch:2}: Loss = {loss.item():.6f} | "
                  f"A_theta[s0->s0] = {current_A[0,0].item():.3f} | "
                  f"A_theta[s0->s1] = {current_A[0,1].item():.3f} | "
                  f"A_theta[s0->s2] = {current_A[0,2].item():.3f}") # Check another value

    print("\nTraining complete. A_theta[s0->s1] is now high.")
    print("Other A_theta[0,j] values should remain low due to targeted gradient updates.")

    # --- (2) GENERATE AND SAVE PLOT ---
    print("\n" + "-" * 60)
    print("(2) Generating and saving training plot...")

    fig, ax1 = plt.subplots(figsize=(6, 2))

    # Plot Loss on left Y-axis
    ax1.set_xlabel('Epoch')
    ax1.set_ylabel('Contradiction Loss\n($L_{contradiction}$)', color='g')
    ax1.plot(epoch_history, loss_history, 'g-', label='$L_{contradiction}$')
    ax1.tick_params(axis='y', labelcolor='g')

    # Create a second Y-axis for the weights
    ax2 = ax1.twinx()
    ax2.set_ylabel('Accessibility Weight ($A_\\theta$)', color='b')
    ax2.plot(epoch_history, a_theta_0_1_history, 'b-', label='$A_\\theta[0,1]$ (A,t0 $\\rightarrow$ B,t0)')
    ax2.plot(epoch_history, a_theta_0_0_history, 'r--', label='$A_\\theta[0,0]$ (A,t0 $\\rightarrow$ A,t0)')
    ax2.tick_params(axis='y', labelcolor='b')
    ax2.set_ylim(-0.05, 1.05) # Set Y-limit for weights

    # Add legend and title through the explicit figure/axes handles so the
    # result does not depend on pyplot's notion of the "current" axes.
    fig.legend(loc='upper left', bbox_to_anchor=(0.15, 0.78))
    ax1.set_title('MLNN Training: $L_{contradiction}$ and $A_\\theta$ vs. Epoch')
    fig.tight_layout()

    # Save plot
    plot_path = os.path.join(SAVE_FOLDER, 'training_progress.pdf')
    fig.savefig(plot_path)
    print(f"   Training plot saved to: {plot_path}")
    plt.close(fig) # Close figure to free memory

    # --- (3) RESULTS & (4) EVALUATION ---
    print("\n" + "-" * 60)
    print("(3) RESULTS: Final Accessibility Matrix (A_theta)")
    print("-" * 60)

    model.eval()
    A_E = model.get_A_epistemic().detach()
    print("Learned Epistemic Relation (A_theta). Rows=From, Cols=To")
    state_labels = ["s0(A,t0)", "s1(B,t0)", "s2(A,t1)", "s3(B,t1)", "s4(A,t2)", "s5(B,t2)"]
    print("      " + " ".join([f"{s:^8}" for s in state_labels]))
    for i, row in enumerate(A_E.numpy()):
        print(f"{state_labels[i]:<8}" + " ".join([f"{x:^8.2f}" for x in row]))

    # --- Save Matrix Plot ---
    fig_mat, ax_mat = plt.subplots()
    cax = ax_mat.matshow(A_E.numpy(), cmap='viridis')
    fig_mat.colorbar(cax, label='Accessibility Weight')
    ax_mat.set_xticks(range(len(state_labels)))
    ax_mat.set_yticks(range(len(state_labels)))
    ax_mat.set_xticklabels(state_labels, rotation=90)
    ax_mat.set_yticklabels(state_labels)
    ax_mat.set_xlabel("To State")
    ax_mat.set_ylabel("From State")
    ax_mat.set_title("Final Learned Epistemic Accessibility ($A_\\theta$)")
    fig_mat.tight_layout()

    matrix_plot_path = os.path.join(SAVE_FOLDER, 'final_A_theta.png')
    fig_mat.savefig(matrix_plot_path)
    print(f"\n   Matrix plot saved to: {matrix_plot_path}")
    plt.close(fig_mat)

    print("\n" + "-" * 60)
    print("(4) EVALUATION: Checking all operator claims")
    print("-" * 60)

    # Helper to format output
    def check(formula_name, bounds_tensor, state_idx, expected_val):
        """Print the bounds of a formula at one state and whether they match.

        The bounds are bucketed into "True" (lower > 0.8), "False"
        (upper < 0.2) or "Fuzzy/Unknown", then compared with `expected_val`.
        """
        lower, upper = bounds_tensor[state_idx, 0].item(), bounds_tensor[state_idx, 1].item()
        # Thresholds are deliberately loose: the soft-min/soft-max results
        # are fuzzy and do not land exactly on 0 or 1.
        if lower > 0.8:
            val = "True"           # More likely True
        elif upper < 0.2:
            val = "False"          # More likely False
        else:
            val = "Fuzzy/Unknown"  # Truly fuzzy/uncertain
        status = "✅" if val == expected_val else "❌"
        print(f" {status} {formula_name:<18} at s{state_idx}: "
              f"Bounds=[{lower:.2f}, {upper:.2f}] -> {val} (Expected: {expected_val})")

    # --- Check 1: Epistemic Logic (The trained axiom) ---
    print("--- 1. Epistemic Logic (using *Learned* A_theta) ---")
    F_e_isOnline = epistemic_possibility('isOnline')
    check("F_epistemic(isOnline)", F_e_isOnline, 0, "True") # Softmax should yield high value now

    K_isOnline = model.K('isOnline')
    check("K(isOnline)", K_isOnline, 0, "False") # Softmin should still yield low value
    print("    -> CORRECT: Agent A (s0) now sees s0 (False=0) and s1 (True=1),")
    print("       so it's POSSIBLE (True) but not KNOWN (False). Logic holds.")

    # --- Check 2: Temporal Logic ---
    print("\n--- 2. Temporal Logic (using *Fixed* A_temporal) ---")
    G_isOnline = model.G('isOnline')
    check("G(isOnline)", G_isOnline, 0, "False") # Softmin is low because L[s0]=0, L[s3]=0
    print("    -> CORRECT: 'isOnline' is False at s0, s3. Not Globally True.")

    F_isOnline = model.F('isOnline')
    check("F(isOnline)", F_isOnline, 0, "True") # Softmax is high because L[s1]=1
    print("    -> CORRECT: 'isOnline' is True at s1 (which s0 can see temporally).")

    check("G(isOnline)", G_isOnline, 4, "True")
    print("    -> CORRECT: From s4=(A,t2), 'isOnline' is True=1 at s4 and s5.")
    print("       The only visible future states are [s4, s5], where it is [1, 1].")

    # --- Check 3: Composite Operators ---
    print("\n--- 3. Composite Logic ---")
    KG_isOnline = model.K_G('isOnline')
    check("K(G(isOnline))", KG_isOnline, 0, "False") # Should still be False
    print("    -> ANALYSIS:")
    g_truth_vector = G_isOnline.detach()[:,0].numpy().round(2)
    print(f"       1. Inner G(isOnline) truth vector ≈ {g_truth_vector}") # Note fuzzy values
    print("       2. Outer K(...) uses learned A_theta (sees s0, s1 mostly).")
    print("       3. K(G)[s0] ≈ soft-min( (1-A[0,0])+G[s0], (1-A[0,1])+G[s1], ... )")
    print(f"          ≈ soft-min( (1-{A_E[0,0]:.2f})+{g_truth_vector[0]:.2f}, (1-{A_E[0,1]:.2f})+{g_truth_vector[1]:.2f}, ... )")
    # Calculation: soft-min( (1-1.0)+0.0, (1-0.99)+0.0 ) = soft-min(0.0, 0.01) ≈ 0.0
    print(f"          ≈ soft-min( {1-A_E[0,0].item()+g_truth_vector[0]:.2f}, {1-A_E[0,1].item()+g_truth_vector[1]:.2f}, ... ) ≈ 0.0 (False)")
    print("    -> CONCLUSION: Agent A correctly deduces it does NOT know 'G(isOnline)'.")

    # --- Check 4: Generalization Test ---
    print("\n--- 4. Generalization Check ---")
    print("Checking if learning A[0,1] affected unrelated states (e.g., s2).")

    # K(isOnline) at s2 = (A, t1)
    # Ground Truth: isOnline = [0, 1, 1, 0, 1, 1] -> L[s2]=1
    # Initially A[2,2]≈1, A[2,3]≈0.
    # K(isOnline)[s2] ≈ soft-min((1-A[2,2])+L[s2], (1-A[2,3])+L[s3], ...)
    #                 ≈ soft-min((1-1.0)+1.0, (1-0.0)+0.0, ...)
    #                 ≈ soft-min(1.0, 1.0, ...) ≈ 1.0 (True)
    check("K(isOnline)", K_isOnline, 2, "True")
    print(f"    -> State s2 (A,t1) sees states with A_theta weights:")
    print(f"       s2: {A_E[2,2]:.2f}, s3: {A_E[2,3]:.2f}")
    # Check that A[2,2] is high and A[2,3] is low (isolation maintained)
    if A_E[2, 2] > 0.9 and A_E[2, 3] < 0.1:
        print("    -> ✅ CORRECT: Training for s0 did not negatively impact s2's isolation.")
    else:
        print("    -> ❌ FAILED: Training for s0 caused unintended changes for s2.")

    # --- (5) SAVE MODEL ---
    print("\n" + "-" * 60)
    print("(5) Saving model...")
    model_path = os.path.join(SAVE_FOLDER, 'mlnn_model.pth')
    torch.save(model.state_dict(), model_path)
    print(f"   Model state dict saved to: {model_path}")

# Entry point: run the full demo only when this code is executed directly
# (its __name__ is "__main__"), not when it is imported as a module.
if __name__ == "__main__":
    main()

Artifacts will be saved to: /content/drive/MyDrive/Projects/modal/epistemic_01
  MLNN Demo: Epistemic, Temporal, and Composite Operators
Scenario: 2 Agents (A, B), 3 Timesteps (t0, t1, t2)
States: [s0=(A,t0), s1=(B,t0), s2=(A,t1), s3=(B,t1), s4=(A,t2), s5=(B,t2)]

Ground Truth 'isOnline': [0. 1. 1. 0. 1. 1.]

------------------------------------------------------------
(1) TRAINING: Forcing the MLNN to resolve a contradiction
------------------------------------------------------------
Training Goal: Resolve a contradiction.
  - Axiom: Agent A at t0 (s0) MUST consider it POSSIBLE 'isOnline'.
  - Formula: F_epistemic(isOnline)[s0] (Lower Bound) -> 1.0
  - Initial State: Agent A (s0) only sees itself (s0), where 'isOnline' is 0.
  - Contradiction: F_epistemic(isOnline)[s0] evaluates to ~0. LOSS ≈ 1.0
  - Hypothesis: Model will learn A_theta[0, 1] ≈ 1 (A,t0 -> B,t0).

Epoch  0: Loss = 0.673702 | A_theta[s0->s0] = 1.000 | A_theta[s0->s1] = 0.000 | A_theta[s0->s2] = 0.000
Epoch  5: Loss = 0