In [1]:
import math
import numpy as np
import scipy.special

inf  = float("inf")

def soft_bellman_operation(env, reward):
    
    # Input:    env    :  environment object
    #           reward :  SxA dimensional vector 
    #           horizon:  finite horizon
        
    discount = env.discount
    horizon = env.horizon
    
    if horizon is None or math.isinf(horizon):
        raise ValueError("Only finite horizon environments are supported")
    
    n_states  = env.n_states
    n_actions = env.n_actions
    
#     T = env.transition_matrix
    
    V = np.zeros(shape = (horizon, n_states))
    Q = np.zeros(shape = (horizon, n_states,n_actions))
        
    broad_R = reward

    # Base case: final timestep
    # final Q(s,a) is just reward
    Q[horizon - 1, :, :] = broad_R
    # V(s) is always normalising constant
    V[horizon - 1, :] = scipy.special.logsumexp(Q[horizon - 1, :, :], axis=1)

    # Recursive case
    for t in reversed(range(horizon - 1)):
#         next_values_s_a = T @ V[t + 1, :]
#         next_values_s_a = next_values_s_a.reshape(n_states,n_actions)
        for a in range(n_actions):
            Ta = env.transition_probability[:,a,:]
            next_values_s_a = Ta@V[t + 1, :]
            Q[t, :, a] = broad_R[:,a] + discount * next_values_s_a
            
        V[t, :] = scipy.special.logsumexp(Q[t, :, :], axis=1)

    pi = np.exp(Q - V[:, :, None])

    return V, Q, pi


def create_Pi(pi):
    horizon, n_states, n_actions = pi.shape
    
    Pi = np.zeros(((horizon - 1)*n_states*n_actions,1))
    
    for t in range(horizon-1):
        curr_pi = pi[t].flatten('F')[:,None]
#         print(curr_pi.shape)
        next_pi = pi[t+1].flatten('F')[:,None]
        Pi[t*n_states*n_actions:(t+1)*n_states*n_actions] = np.log(curr_pi) - np.log(next_pi)
    return Pi

In [39]:
class BlockingGridworld(object):
    """
    Gridworld MDP.
    """

    def __init__(self, grid_size, wind, discount,horizon,start_state, feature_dim, p1, p2):
        """
        grid_size: Grid size. int.
        wind: Chance of moving randomly. float.
        discount: MDP discount. float.
        -> Gridworld
        """

        self.actions = ((1, 0), (0, 1), (-1, 0), (0, -1))
        self.n_actions = len(self.actions)
        self.n_states = grid_size**2
        self.grid_size = grid_size
        self.wind = wind
        self.wind_buffer = wind
        
        self.discount = discount
        self.horizon = horizon
        self.feature_dim = feature_dim
        self.p1 = p1
        self.p2 = p2
        
        self.blocked_states = [6,7,8,9]
        
        # Preconstruct the transition probability array.
        self.transition_probability = np.array(
            [[[self._transition_probability(i, j, k)
               for k in range(self.n_states)]
              for j in range(self.n_actions)]
             for i in range(self.n_states)])
        
        
        
        self.normalize_transition_matrices()
        
#         self.make_state_determinstic(1)
#         self.make_state_determinstic(2)
#         self.make_state_determinstic(3)
#         self.make_state_determinstic(4)
        
        
        
        self.start_state = start_state
#         self.make_state_determinstic(12)
        self.reward_v = np.array([[self.reward(s) for s in range(self.n_states)] for a in range(self.n_actions)]).T
    def __str__(self):
        return "Gridworld({}, {}, {})".format(self.grid_size, self.wind,
                                              self.discount)

    def int_to_point(self, i):
        """
        Convert a state int into the corresponding coordinate.

        i: State int.
        -> (x, y) int tuple.
        """

        return (i % self.grid_size, i // self.grid_size)

    def point_to_int(self, p):
        """
        Convert a coordinate into the corresponding state int.

        p: (x, y) tuple.
        -> State int.
        """

        return p[0] + p[1]*self.grid_size

    def neighbouring(self, i, k):
        """
        Get whether two points neighbour each other. Also returns true if they
        are the same point.

        i: (x, y) int tuple.
        k: (x, y) int tuple.
        -> bool.
        """

        return abs(i[0] - k[0]) + abs(i[1] - k[1]) <= 1

    def _transition_probability(self, i, j, k):
        """
        Get the probability of transitioning from state i to state k given
        action j.

        i: State int.
        j: Action int.
        k: State int.
        -> p(s_k | s_i, a_j)
        """

        xi, yi = self.int_to_point(i)
        xj, yj = self.actions[j]
        xk, yk = self.int_to_point(k)
            
        
        # is this state blocked in the MDP?
        left_column = (i == 1) or (i == 2) or (i == 3) or (i == 4)
        lc = [1,2,3,4]
        
        # disallow self transitions for the left column
#         if i == k and left_column:
#             return 0.0
        
        #disallow transition from the leftmost column to the one next to it
        if k in self.blocked_states and not i in self.blocked_states:
            return 0.0
        
        #disallow transition from the 2nd column to the left most column 
#         if k in lc and i in self.blocked_states:
#             return 0.0
        
        if not self.neighbouring((xi, yi), (xk, yk)):
            return 0.0

        # Is k the intended state to move to?
        if (xi + xj, yi + yj) == (xk, yk):
            return 1 - self.wind + self.wind/self.n_actions

        # If these are not the same point, then we can move there by wind.
        if (xi, yi) != (xk, yk):
            return self.wind/self.n_actions

        # If these are the same point, we can only move here by either moving
        # off the grid or being blown off the grid. Are we on a corner or not?
        if (xi, yi) in {(0, 0), (self.grid_size-1, self.grid_size-1),
                        (0, self.grid_size-1), (self.grid_size-1, 0)}:
            # Corner.
            # Can move off the edge in two directions.
            # Did we intend to move off the grid?
            if not (0 <= xi + xj < self.grid_size and
                    0 <= yi + yj < self.grid_size):
                # We intended to move off the grid, so we have the regular
                # success chance of staying here plus an extra chance of blowing
                # onto the *other* off-grid square.
                return 1 - self.wind + 2*self.wind/self.n_actions
            else:
                # We can blow off the grid in either direction only by wind.
                return 2*self.wind/self.n_actions
        else:
            # Not a corner. Is it an edge?
            if (xi not in {0, self.grid_size-1} and
                yi not in {0, self.grid_size-1}):
                # Not an edge.
                return 0.0

            # Edge.
            # Can only move off the edge in one direction.
            # Did we intend to move off the grid?
            if not (0 <= xi + xj < self.grid_size and
                    0 <= yi + yj < self.grid_size):
                # We intended to move off the grid, so we have the regular
                # success chance of staying here.
                return 1 - self.wind + self.wind/self.n_actions
            else:
                # We can blow off the grid only by wind.
                return self.wind/self.n_actions
            
    def normalize_transition_matrices(self):
        for a in range(self.n_actions):
            P = self.transition_probability[:,a,:]
            sum_P = P.sum(axis = 1)
            normalized_P = P/sum_P[:,None]
            self.transition_probability[:,a,:] = normalized_P
            
    def reward(self, state_int):
        """
        Reward for being in state state_int.

        state_int: State integer. int.
        -> Reward.
        """

        if state_int == self.n_states - 1:
            return 1
        return 0
    
    def make_state_determinstic(self, s):
        
        for s_prime in range(self.n_states):
            for a in range(self.n_actions):
                self.transition_probability[s,a,s_prime] = 0.0
                
        for a in range(self.n_actions):
            self.transition_probability[s,a,s-1] = 1.0
         
            
    def generate_trajectories(self):
        # Memoization dictionary to store already computed results
        horizon = self.horizon
        memo_dict = {}

        def generate_recursive(current_time, current_state):
            # Check if the result is already computed
            if (current_time, current_state) in memo_dict:
                return memo_dict[(current_time, current_state)]

            # Base case: when we reach the horizon, return an empty trajectory
            if current_time == horizon:
                return [[]]

            trajectories = [] 

            # Recursive case: try all actions and next states
            for action in range(self.n_actions):
                next_state_probs = self.transition_probability[current_state,action,:] if current_state is not None else self.start_distribution

                for next_state in range(self.n_states):
                    # Recursive call
                    if next_state_probs[next_state] != 0:
                        next_trajectories = generate_recursive(current_time + 1, next_state)

                        # Extend the current trajectory with the current action and next state
                        if current_state == None:

                            trajectories.extend([(next_state, action )] + traj for traj in next_trajectories)
                        else:
                            trajectories.extend([(current_state, action, next_state)] + traj for traj in next_trajectories)

            # Memoize the result before returning
            memo_dict[(current_time, current_state)] = trajectories
            
            print("Length is:", len(trajectories))
            return trajectories

        # Start the recursion with time = 0 and no initial state
        # For a unique starting state
        return generate_recursive(0, self.start_state)
    
    def get_trajectory_matrix(self):
        
        trajs = self.generate_trajectories()
        
        M = np.zeros((len(trajs), self.n_states*self.n_actions ))
        
        
        for i in range(len(trajs)):
            for t,time_step in enumerate(trajs[i]):
                
                s = time_step[0]
                a = time_step[1]
                
                pos_ = a*self.n_states + s
                
                M[i][pos_] += self.discount**t
                
        return M
    
   
    def manhatten_distance(self, i,k):
        
        xi, yi = self.int_to_point(i)
        xk, yk = self.int_to_point(k)
        
        return np.abs(xi-xk) + np.abs(yi- yk) 
    
    def feature_vector(self,s,a):
        
        f = np.zeros((self.feature_dim,1))
        
        f[0] = self.manhatten_distance(s, self.p1)
        
        f[1] = self.manhatten_distance(s, self.p2)
        
        
        
        xs, ys  = self.int_to_point(s)
        xa, ya = self.actions[a]
        
        des_ = (xs+xa,ys+ya)
        des_s = self.point_to_int(des_)
        
        if self.manhatten_distance(des_s , self.p1) <= f[0]:
            f[0] += 1
        else: 
            f[0] -= 1
        
        if self.manhatten_distance(des_s , self.p2) <= f[1]:
            f[1] += 1
        else: 
            f[1] -= 1
        
        return f
    
    def F_matrix(self):
        F = np.zeros((self.n_states*self.n_actions, 2))
        
        for s in range(self.n_states):
            for a in range(self.n_actions):
                F[s + a*self.n_states] = self.feature_vector(s,a).T
                
        return F
    

In [40]:
grid_size = 5
wind = 0.1
discount = 0.9
horizon = 10
start_state = 6
feature_dim = 2
p1 = 0
p2 = 24
theta = 5*np.ones((2,1))


gw = BlockingGridworld(grid_size,wind,discount,horizon,start_state, feature_dim, p1,p2)

# assert that the transition matrices are markovian
for a in range(gw.n_actions):
    assert np.linalg.norm(gw.transition_probability[:,a,:].sum(axis = 1)[:,None] -np.ones((25,1))) <= 1e-5


reward = gw.reward_v
F = gw.F_matrix()
ones_ = np.ones((gw.n_states*gw.n_actions,1))

# reward[-1][-1] = 1
V,Q,pi = soft_bellman_operation(gw,reward)

v = np.linalg.lstsq(F,ones_, rcond = None)

# assert that ran(1) \in \ran(F)
# assert np.linalg.norm(F@v[0] - ones_) <= 1e-5


# Construct Psi and Pi
I = np.eye(gw.n_states)
E = I
P = gw.transition_probability[:,0,:]

for a in range(1, gw.n_actions):
    E = np.vstack((E,I))
    P = np.vstack((P, gw.transition_probability[:,a,:]))
    
gamma = gw.discount
psi_rows = (horizon - 1)*gw.n_states*gw.n_actions
psi_cols = horizon*gw.n_states

Pi = create_Pi(pi)

Psi = np.zeros((psi_rows, psi_cols))

for i in range(horizon-1):
    Psi[i*gw.n_states*gw.n_actions:(i+1)*gw.n_states*gw.n_actions, i*gw.n_states:(i+1)*gw.n_states] = E
    Psi[i*gw.n_states*gw.n_actions:(i+1)*gw.n_states*gw.n_actions, (i+1)*gw.n_states:(i+2)*gw.n_states] = -discount*P

# vector that is in kernel of Psi, sanity check for Psi
v = np.ones((horizon*gw.n_states,1))
for i in range(horizon):
    v[i*gw.n_states:(i+1)*gw.n_states] = discount**(horizon-1 -i)
    
assert np.linalg.norm(Psi@v) <= 1e-5

print("The rank of P is  : ", np.linalg.matrix_rank(P))
print("The rank of Psi is: ", np.linalg.matrix_rank(Psi))

# find the  solution
v = np.linalg.pinv(Psi) @ Pi
print("The solution of mu (V_{T-1}) is: ", v[-grid_size**2:])
print("The dim of kernel of Psi is: ", scipy.linalg.null_space(Psi).shape[1])

if scipy.linalg.null_space(Psi).shape[1] <= 1:
    print("The min of kernel projection is:", min(scipy.linalg.null_space(Psi)[-grid_size**2:]))
    print("The max of kernel projection is:", max(scipy.linalg.null_space(Psi)[-grid_size**2:]))

The rank of P is  :  25
The rank of Psi is:  249
The solution of mu (V_{T-1}) is:  [[ 0.07475761]
 [ 0.07475761]
 [ 0.07475761]
 [ 0.07475761]
 [ 0.07475761]
 [ 0.07475761]
 [ 0.07475761]
 [ 0.07475761]
 [ 0.07475761]
 [ 0.07475761]
 [ 0.07475761]
 [ 0.07475761]
 [ 0.07475761]
 [ 0.07475761]
 [ 0.07475761]
 [ 0.07475761]
 [ 0.07475761]
 [ 0.07475761]
 [ 0.07475761]
 [ 0.07475761]
 [ 0.07475761]
 [ 0.07475761]
 [ 0.07475761]
 [ 0.07475761]
 [-0.92524239]]
The dim of kernel of Psi is:  1
The min of kernel projection is: [-0.0930154]
The max of kernel projection is: [-0.0930154]


In [23]:
K = scipy.linalg.null_space(Psi)

for i in range(K.shape[1]):
    
    print("v(%d): "%i,np.round(K[:,i][-grid_size**2:],decimals=3))

    print("\n")

v(0):  [ 0.     0.     0.     0.    -0.    -0.    -0.912 -0.411  0.004  0.
  0.    -0.     0.     0.     0.     0.     0.     0.     0.    -0.
 -0.    -0.     0.     0.     0.   ]


v(1):  [-0.095 -0.095 -0.095 -0.095 -0.095 -0.095  0.     0.    -0.    -0.
 -0.095 -0.095 -0.095 -0.095 -0.095 -0.095 -0.095 -0.095 -0.095 -0.095
 -0.095 -0.095 -0.095 -0.095 -0.095]


v(2):  [-0.     0.     0.     0.    -0.     0.    -0.05   0.112  0.029  0.992
  0.     0.     0.     0.    -0.     0.    -0.    -0.    -0.    -0.
  0.     0.    -0.    -0.    -0.   ]


v(3):  [-0.    -0.    -0.     0.    -0.    -0.     0.189 -0.429 -0.879  0.084
 -0.    -0.     0.    -0.    -0.    -0.    -0.     0.    -0.     0.
 -0.    -0.    -0.     0.     0.   ]


v(4):  [-0.    -0.    -0.     0.    -0.     0.    -0.361  0.797 -0.476 -0.094
 -0.     0.     0.    -0.     0.     0.    -0.     0.     0.    -0.
  0.     0.     0.     0.    -0.   ]




In [20]:
def get_state_action(sa,gw):
    n_actions = gw.n_actions
    n_states = gw.n_states
    assert sa < n_states*n_actions
    a = sa // gw.n_states
    s = sa - a*gw.n_states
    
    return (s,a)


def feasible_traj(trajectory, gw):
    # check first state
    T = len(trajectory)
    i = 0
    
    (s,a) = get_state_action(trajectory[0],gw)
    
    if not s == gw.start_state:
        print("Wrong Start State")
        return False
    
    while i < T-1:
        s_prime,a_prime = get_state_action(trajectory[i+1],gw)
        
        if gw.transition_probability[s][a][s_prime] <= 1e-5:
            return False
        
        i+=1
        s = s_prime
        a = a_prime
        
    return True

def get_trajectory_row(trajs,gw):
        
#         trajs = self.generate_trajectories()
        
        M = np.zeros((1, gw.n_states*gw.n_actions ))
      
        
#         for i in range(len(trajs)):
        for t,time_step in enumerate(trajs):
            s,a = get_state_action(time_step,gw)

            pos_ = a*gw.n_states + s

            M[0][pos_] += gw.discount**t
                
        return M
    
    
K = scipy.linalg.null_space(Psi)
K_eta = K[-gw.n_states:,:]

A = E@K_eta
B = F

# Example usage:
# horizon = 3  # Replace this with the desired length of the password
# num_state_action_pairs = 5  # Replace this with the desired range of numbers (0 to num_digits-1)
horizon = gw.horizon
num_state_action_pairs = gw.n_states*gw.n_actions

traj = [0] * horizon
traj[0] = 4
traj[1] = 3
traj[2] = 2
traj[3] = 1


cntr_ = 0

r = get_trajectory_row(traj, gw)
l1 = r@A[:,0]
l2 = r@A[:,1]
l3 = r@A[:,2]
l4 = r@A[:,3]

print("r@k1 = %.4f , r@k2 = %.4f , r@k3 = %.4f, r@k4 = %.4f"%(l1,l2,l3,l4))

n_feasible = 0
wi = True

while True:
    # Print current password
    cntr_ += 1
    
    if feasible_traj(traj, gw):
        n_feasible +=1
        r = get_trajectory_row(traj, gw)
        l1_new = r@A[:,0]
        l2_new = r@A[:,1]
        l3_new = r@A[:,2]
        l4_new = r@A[:,3]
        
        if not np.abs(l1 - l1_new) <= 1e-6 and np.abs(l2 - l2_new) <= 1e-6 \
                 and np.abs(l2 - l2_new) <= 1e-6 and np.abs(l2 - l2_new) <= 1e-6:
            print("Not Weakly Identifiable")
            wi = False
            
            break
#         print("r@k1 = %.4f , r@k2 = %.4f , r@k3 = %.4f, r@k4 = %.4f"%(l1,l2,l3,l4))
#         time.sleep(1)
        l1 = l1_new
        l2 = l2_new
        l3 = l3_new
        l4 = l4_new
        
        if np.random.rand() > 0.999:
            print(n_feasible)

    
    
#     print(traj)
    
    # Increment the last digit
    traj[-1] += 1

    # Check if we need to carry over to the previous digit
    for i in range(horizon - 1, 0, -1):
        if traj[i] == num_state_action_pairs:
            traj[i] = 0
            traj[i - 1] += 1
    
    # Break the loop if the first digit exceeds the range
    if traj[0] == num_state_action_pairs:
        print("Weakly Identifiable!")
        break

r@k1 = 0.0000 , r@k2 = -0.6166 , r@k3 = 0.0000, r@k4 = -0.0000
365
541
1657


KeyboardInterrupt: 

In [35]:
traj = [6,7,6]
feasible_traj(traj,gw)

False

In [41]:
gw.transition_probability[:,0,:]

array([[0.05      , 0.925     , 0.        , 0.        , 0.        ,
        0.025     , 0.        , 0.        , 0.        , 0.        ,
        0.        , 0.        , 0.        , 0.        , 0.        ,
        0.        , 0.        , 0.        , 0.        , 0.        ,
        0.        , 0.        , 0.        , 0.        , 0.        ],
       [0.02564103, 0.02564103, 0.94871795, 0.        , 0.        ,
        0.        , 0.        , 0.        , 0.        , 0.        ,
        0.        , 0.        , 0.        , 0.        , 0.        ,
        0.        , 0.        , 0.        , 0.        , 0.        ,
        0.        , 0.        , 0.        , 0.        , 0.        ],
       [0.        , 0.02564103, 0.02564103, 0.94871795, 0.        ,
        0.        , 0.        , 0.        , 0.        , 0.        ,
        0.        , 0.        , 0.        , 0.        , 0.        ,
        0.        , 0.        , 0.        , 0.        , 0.        ,
        0.        , 0.        , 0.        , 0.