In [None]:
import numpy as np
from collections import defaultdict
import random
from tqdm import trange
import copy
import networkx as nx
import matplotlib.pyplot as plt
import sys
import igraph
from matplotlib import cm, colors
random.seed(42)
import seaborn as sns
from testing_environments import ContinuousTMaze, GridEnv,GridEnvRightDownNoCue, GridEnvRightDownNoSelf, GridEnvDivergingMultipleReward, GridEnvDivergingSingleReward
from util import *

In [None]:
def stationary_distribution_eig(P):
    """
    Computes the stationary distribution of transition matrix P
    by solving P^T * v = v, and normalizing v so sum(v)=1.

    Parameters
    ----------
    P : 2D array-like, shape (N, N)
        Transition matrix of a Markov chain (rows sum to 1).

    Returns
    -------
    z : 1D np.ndarray, shape (N,)
        The stationary distribution (row vector) such that zP = z.

    Raises
    ------
    ValueError
        If the eigenvector selected for the eigenvalue closest to 1 has no
        positive mass left after sign correction and clipping.
    """
    P = np.asarray(P, dtype=float)

    # Eigen-decomposition of P^T: its right eigenvectors are the left
    # eigenvectors of P.
    eigenvalues, eigenvectors = np.linalg.eig(P.T)

    # Pick the eigenvalue closest to 1 (exact equality is not reliable in
    # floating point).
    idx = np.argmin(np.abs(eigenvalues - 1.0))

    # The corresponding eigenvector; drop any zero imaginary part that appears
    # when other eigenvalues of P are complex.
    v = np.real(eigenvectors[:, idx])

    # Eigenvectors are only defined up to a scalar factor, so the solver may
    # hand back the all-negative orientation. Flip it before clipping;
    # otherwise the clip below would zero out a perfectly valid vector and
    # trigger the ValueError spuriously.
    if np.sum(v) < 0:
        v = -v

    # Small numerical errors can introduce tiny negatives; clip them to 0.
    v = np.where(v < 0, 0, v)
    total = np.sum(v)
    if total == 0:
        raise ValueError("No non-negative eigenvector found corresponding to eigenvalue 1.")

    # Normalize so that the entries sum to 1.
    return v / total

def stationary_distribution_power(P, max_iter=1000, tol=1e-12):
    """
    Computes the stationary distribution of transition matrix P
    by repeated multiplication (power iteration).

    Parameters
    ----------
    P : 2D np.ndarray, shape (N, N)
        Transition matrix of a Markov chain (rows sum to 1).
    max_iter : int
        Maximum number of iterations.
    tol : float
        Convergence tolerance on the L1 norm of the difference between
        two consecutive iterates.

    Returns
    -------
    z : 1D np.ndarray, shape (N,)
        The stationary distribution (row vector). If the iteration did not
        converge within max_iter steps, the last iterate is returned and a
        RuntimeWarning is emitted.
    """
    import warnings  # local import: only needed for the non-convergence notice

    N = P.shape[0]
    # Start from a uniform distribution
    z = np.ones(N) / N

    for _ in range(max_iter):
        z_next = z @ P  # matrix multiplication from the left
        delta = np.linalg.norm(z_next - z, 1)
        # Always keep the newest iterate so that, on convergence, the
        # converged vector (not the one before it) is returned.
        z = z_next
        if delta < tol:
            break
    else:
        # Loop ran out without hitting the tolerance; the result may not be
        # a stationary distribution.
        warnings.warn(
            "stationary_distribution_power did not converge within "
            f"{max_iter} iterations (tol={tol}).",
            RuntimeWarning,
            stacklevel=2,
        )

    # Normalize (just in case of tiny drift)
    return z / np.sum(z)
def row_normalize(matrix):
    """
    Scale each row of 'matrix' so that it sums to 1 and return the result
    as a new float array; the input is left untouched.

    Rows whose sum is zero are returned unchanged (all zeros) instead of
    producing a division by zero.
    """
    # Work on a float copy so integer inputs divide correctly.
    as_float = matrix.astype(float)

    # Per-row totals, kept as a column so they broadcast across each row.
    totals = as_float.sum(axis=1, keepdims=True)

    # Swap zero totals for 1.0: dividing an all-zero row by 1 keeps it zero.
    safe_totals = np.where(totals == 0, 1.0, totals)

    return as_float / safe_totals

In [None]:
# --- Environment configuration -------------------------------------------
# Square grid with `size` cells per side.
size = 4
env_size = (size, size)

# The rewarded terminal state id equals the total number of grid cells.
rewarded_terminal = env_size[0] * env_size[1]
cue_states = [6]

# Alternative environment kept for reference:
# env = GridEnvRightDownNoSelf(env_size=env_size,
#                              rewarded_terminal=[rewarded_terminal],
#                              cue_states=cue_states)
env = GridEnv(
    env_size=env_size,
    rewarded_terminal=[rewarded_terminal],
    cue_states=cue_states,
)

# --- Dataset generation ----------------------------------------------------
n_episodes = 1000
max_steps_per_episode = 100

dataset = generate_dataset(env, n_episodes, max_steps_per_episode)

In [None]:
# Build the transition matrix from the sampled episodes.
# transition_matrix is not defined in this notebook (presumably provided by
# `from util import *`); whether it returns counts or probabilities is not
# visible here — TODO confirm.
P = transition_matrix(dataset)

# P = P[1:, 1:]

# Rescale every row to sum to 1; row_normalize leaves all-zero rows as zeros.
P = row_normalize(P)


In [None]:
# Visualise the row-normalised transition matrix P.
sns.heatmap(P)

In [None]:
# P = np.array([
#     [0.9, 0.1, 0.0],
#     [0.9, 0.0, 0.1],
#     [1.0, 0.0, 0.0]
# ])

# Stationary distribution of the full matrix P via the eigenvector method.
# Note: a later cell rebinds `z` to the power-iteration result computed on
# P[1:,1:], which has one entry fewer than this vector.
z = stationary_distribution_eig(P)
print("Stationary distribution (via eigenvector):", z)

In [None]:
# if __name__ == "__main__":
# P = np.array([
#     [0.9, 0.1, 0.0],
#     [0.9, 0.0, 0.1],
#     [1.0, 0.0, 0.0]
# ])





# Stationary distribution via power iteration on P with row 0 and column 0
# removed. This rebinds `z`; it is this shorter vector that the later
# retrospective_transition_matrix call receives.
z = stationary_distribution_power(P[1:,1:])
print("Stationary distribution (via power iteration):", z)

In [None]:
# P = np.array([
#     [0.9, 0.1, 0.0],
#     [0.9, 0.0, 0.1],
#     [1.0, 0.0, 0.0]
# ])

# z = stationary_distribution_eig(P)
# print("Stationary distribution (via eigenvector):", z)

In [None]:
# [[P[0,0]*z[0]/z[0], P[0,1]*z[0]/z[1], P[0,2]*z[0]/z[2]], 
#  [P[1,0]*z[1]/z[0], P[1,1]*z[1]/z[1], P[1,2]*z[1]/z[2]], 
#  [P[2,0]*z[2]/z[0], P[2,1]*z[2]/z[1], P[2,2]*z[2]/z[2]]]

In [None]:
# Retrospective (time-reversed) transition matrix for P with row/column 0
# removed. `z` must be the power-iteration result computed on that same
# sub-matrix; the `z` from the eigenvector cell covers the full P and is one
# entry longer, so run the power-iteration cell before this one.
# retrospective_transition_matrix is not defined in this notebook
# (presumably from util) — TODO confirm its exact formula.
P_r = retrospective_transition_matrix(P[1:,1:], z)


In [None]:
# Visualise the retrospective transition matrix P_r.
sns.heatmap(P_r)

In [None]:
# import numpy as np
def successor_representation(P, gamma=0.9):
    """
    Successor Representation (SR) of a forward transition matrix.

    The SR is the matrix inverse

        SR = (I - gamma * P)^{-1}

    Parameters
    ----------
    P : np.ndarray of shape (n, n)
        Forward (prospective) transition matrix; P[i, j] is the probability
        of moving from state i to state j. Rows typically sum to 1.
    gamma : float
        Discount factor, normally in [0, 1). Defaults to 0.9.

    Returns
    -------
    SR : np.ndarray of shape (n, n)
        SR[i, j] can be read as the expected discounted time spent in
        state j when starting from state i.
    """
    identity = np.eye(P.shape[0])
    # np.linalg.inv raises LinAlgError when (I - gamma * P) is singular.
    return np.linalg.inv(identity - gamma * P)

def predecessor_representation(P_r, gamma=0.9):
    """
    Predecessor Representation (PR) of a time-reversed transition matrix.

    The PR is the matrix inverse

        PR = (I - gamma * P_r)^{-1}

    Parameters
    ----------
    P_r : 2D np.ndarray, shape (n, n)
        The time-reversed (retrospective) transition matrix.
    gamma : float
        Discount factor. Defaults to 0.9.

    Returns
    -------
    PR : 2D np.ndarray, shape (n, n)
        The predecessor representation matrix.
    """
    dim = P_r.shape[0]
    discounted = gamma * P_r
    # np.linalg.inv raises LinAlgError when (I - gamma * P_r) is singular.
    return np.linalg.inv(np.identity(dim) - discounted)

# -----------------------------------------------------------
# Example usage
# -----------------------------------------------------------
# if __name__ == "__main__":
    # Suppose we have the retrospective transition matrix (approx from your example)
    # This is the "time-reversed" version of the original P_s, with z ~ [0.9009, 0.0901, 0.009].
    # NOTE: The exact numbers may vary slightly based on rounding.
# P_r = np.array([
#     [0.9,   1.0,  0.0],
#     [0.09,  0.0,  1.0],
#     [0.01,  0.0,  1.0]
# ])

# Discount factor; the later successor_representation call reuses this value.
gamma = 0.9
# Predecessor representation of the retrospective matrix P_r computed in an
# earlier cell.
PR = predecessor_representation(P_r, gamma)

print("Retrospective transition matrix, P_r:")
print(P_r, "\n")

print(f"Predecessor Representation (gamma={gamma}):")
# Rounded for readability
print(PR.round(2))


In [None]:
# Successor representation of the full forward matrix P (index 0 included).
# NOTE(review): P_r/PR were derived from P[1:,1:], so SR presumably has one
# more row/column than PR; the plotting cells slice SR[1:,1:] — confirm that
# keeping index 0 here is intended.
SR = successor_representation(P, gamma)

In [None]:
# Plot the predecessor representation with 1-based state ids on both axes.
# Labels are derived from PR's own size so the plot stays correct if the
# grid size configured earlier is changed (previously hard-coded to 1..16).
pr_state_labels = range(1, PR.shape[0] + 1)
sns.heatmap(PR, xticklabels=pr_state_labels, yticklabels=pr_state_labels)

In [None]:

# Plot the successor representation without row/column 0, labelling the
# remaining states 1..N-1. Labels are derived from SR's size instead of the
# previously hard-coded 1..16, so the plot follows the configured grid size.
sr_state_labels = range(1, SR.shape[0])
sns.heatmap(SR[1:, 1:], xticklabels=sr_state_labels, yticklabels=sr_state_labels)

In [None]:
# import numpy as np

def predecessor_representation_contingency(Mp):
    """
    Compute the 'contingency' version of the Predecessor Representation (PR).

    Each entry has the mean of its own row subtracted, which is the same as
    Mp - (Mp @ E) / n with E the (n, n) all-ones matrix.

    Parameters
    ----------
    Mp : array-like of shape (n, n)
        The Predecessor Representation matrix.

    Returns
    -------
    Mp_cont : np.ndarray of shape (n, n)
        The PR contingency matrix, Mp - (Mp*E)/n. Every row sums to zero
        (up to floating-point error).
    """
    Mp = np.asarray(Mp, dtype=float)
    # (Mp @ E) / n just repeats each row's mean across the columns; taking
    # the row mean directly avoids building E and an O(n^3) matrix product.
    row_means = Mp.mean(axis=1, keepdims=True)
    return Mp - row_means

def successor_representation_contingency(Ms):
    """
    Compute the 'contingency' version of the Successor Representation (SR).

    Each entry has the mean of its own row subtracted, which is the same as
    Ms - (Ms @ E) / n with E the (n, n) all-ones matrix.

    Parameters
    ----------
    Ms : array-like of shape (n, n)
        The Successor Representation matrix.

    Returns
    -------
    Ms_cont : np.ndarray of shape (n, n)
        The SR contingency matrix, Ms - (Ms*E)/n. Every row sums to zero
        (up to floating-point error).
    """
    Ms = np.asarray(Ms, dtype=float)
    # (Ms @ E) / n just repeats each row's mean across the columns; taking
    # the row mean directly avoids building E and an O(n^3) matrix product.
    row_means = Ms.mean(axis=1, keepdims=True)
    return Ms - row_means




In [None]:
# --------------------------------------------------------------------
# Example Usage
# --------------------------------------------------------------------
# if __name__ == "__main__":
    # Example: Suppose we have a 3x3 PR matrix from earlier
# Mp = np.array([
#     [ 9.01,  9.11,  8.20],
#     [ 0.90,  0.81,  1.73],
#     [ 0.09,  0.08,  0.07]
# ])
# Use the predecessor representation computed earlier in the notebook.
Mp = PR
# And an example SR matrix (made-up values)
# Ms = np.array([
#     [ 1.2,  1.0,  0.8],
#     [ 0.1,  1.6,  2.5],
#     [ 0.3,  0.9,  3.1]
# ])
# Use the successor representation computed earlier in the notebook.
# NOTE(review): SR was built from the full P while PR came from P[1:,1:], so
# Ms presumably has one more row/column than Mp and its row averages include
# column 0 — confirm this is intended.
Ms = SR

# Row-mean-centred ("contingency") versions of both representations.
Mp_cont = predecessor_representation_contingency(Mp)
Ms_cont = successor_representation_contingency(Ms)

print("Predecessor Representation (Mp):")
print(Mp.round(2), "\n")

print("Mp contingency = Mp - (Mp*E)/n :")
print(Mp_cont.round(2), "\n")

print("Successor Representation (Ms):")
print(Ms.round(2), "\n")

print("Ms contingency = Ms - (Ms*E)/n :")
print(Ms_cont.round(2))


In [None]:
# Plot the PR contingency matrix on a diverging colour map centred at 0.
# Axis labels are 1-based state ids derived from Mp_cont's size rather than
# the previously hard-coded 1..16, so the plot follows the grid size.
mp_state_labels = range(1, Mp_cont.shape[0] + 1)
sns.heatmap(Mp_cont, xticklabels=mp_state_labels, yticklabels=mp_state_labels,
            cmap='RdBu_r', center=0)
plt.title('Predecessor representation contingency')

In [None]:
# Take the last column of the PR contingency matrix and lay it out on the
# grid. The shape comes from env_size instead of a literal (4, 4) so the cell
# follows the grid size configured earlier.
# NOTE(review): assumes env_size is ordered (rows, cols); for the square grid
# used here the order makes no difference.
PRC_MAP = np.reshape(Mp_cont[:, -1], env_size)
num_rows, num_cols = PRC_MAP.shape

# Annotate every cell with its 1-based state id, in the same row-major order
# used by the reshape above.
annotations = np.arange(1, num_rows * num_cols + 1).reshape(num_rows, num_cols)

sns.heatmap(
    PRC_MAP,
    xticklabels=range(1, num_cols + 1),
    yticklabels=range(1, num_rows + 1),
    cmap='RdBu_r',
    annot=annotations,      # show state ids rather than the cell values
    fmt='d',                # integer formatting for the annotations
    center=0,
    annot_kws={"size": 20},
)

In [None]:
# Plot the SR contingency matrix without row/column 0, labelling the
# remaining states 1..N-1. Labels are derived from Ms_cont's size instead of
# the previously hard-coded 1..16, so the plot follows the grid size.
ms_state_labels = range(1, Ms_cont.shape[0])
sns.heatmap(Ms_cont[1:, 1:], xticklabels=ms_state_labels, yticklabels=ms_state_labels)
plt.title('Successor representation contingency')

In [None]:
# P_r_ = np.reshape(P_r, (env_size[0],env_size[1]))
# P_r_ = np.transpose(P_r_)
# sns.heatmap(P_r_)