In [10]:
import numpy as np
import plotly.graph_objects as go
import random
from abc import ABC, abstractmethod
from queue import PriorityQueue
from scipy.stats import dirichlet
from typing import Callable, Any
import scipy as sp

# Set random seeds for reproducibility
seed_value = 20
random.seed(seed_value)
np.random.seed(seed_value)

# Set the size of the terrain
size = 200  # Size of the grid
scale = 1  # Scale for general hill variations

# Create a 2D grid of coordinates
x = np.linspace(-size / 2, size / 2, size)
y = np.linspace(-size / 2, size / 2, size)
x, y = np.meshgrid(x, y)

# Initialize the terrain with random small hills
z = np.zeros((size, size))
num_hills = 60  # Number of random hills
min_hill_height = 0.5
max_hill_height = 1
min_hill_width = 5
max_hill_width = 15
prominent_hill_height = 5
prominent_hill_width = random.uniform(8, 12)

for _ in range(num_hills):
    # Randomize hill parameters
    hill_height = random.uniform(min_hill_height, max_hill_height)  # Heights between 1 and 2
    hill_x = random.uniform(-size / 2, size / 2)
    hill_y = random.uniform(-size / 2, size / 2)
    hill_width = random.uniform(min_hill_width, max_hill_width)  # Controls hill spread
    
    # Add Gaussian hill to the terrain
    z += hill_height * np.exp(-((x - hill_x) ** 2 + (y - hill_y) ** 2) / (2 * hill_width ** 2))

# Add one prominent hill at height close to 10
prominent_hill_x = random.uniform(-size / 2, size / 2)
prominent_hill_y = random.uniform(-size / 2, size / 2)

z += prominent_hill_height * np.exp(-((x - prominent_hill_x) ** 2 + (y - prominent_hill_y) ** 2) / (2 * prominent_hill_width ** 2))

# Create a 3D surface plot
fig = go.Figure(data=[go.Surface(z=z, x=x, y=y, colorscale="Viridis")])

# Set plot title and axis labels
fig.update_layout(
    title="Randomized Hill Terrain with One Prominent Hill",
    scene=dict(
        xaxis_title="X Axis",
        yaxis_title="Y Axis",
        zaxis_title="Height",
        zaxis=dict(range=[0, 10])
    ),
    autosize=False,
    width=800,
    height=800,
    margin=dict(l=65, r=50, b=65, t=90),
)

# Show the plot
fig.show()


In [11]:
def discretize_terrain(x, y, z, xy_resolution=5, num_z_bins=20):
    """
    Discretizes a continuous terrain into a 3D grid state space.
    
    Parameters:
    - x: 2D array of x coordinates.
    - y: 2D array of y coordinates.
    - z: 2D array of elevation values (same shape as x and y).
    - xy_resolution: Resolution for discretizing x and y (units per grid cell).
    - num_z_bins: Number of discrete bins for z (elevation) values.

    Returns:
    - state_map: 2D array where each entry is a tuple (x_index, y_index, z_index).
    - z_bins: The elevation bin edges used for discretization.
    """

    # Determine the grid size based on the xy resolution
    x_min, x_max = x.min(), x.max()
    y_min, y_max = y.min(), y.max()

    # Create discrete indices for x and y based on the specified resolution
    x_indices = ((x - x_min) / xy_resolution).astype(int)
    y_indices = ((y - y_min) / xy_resolution).astype(int)

    # Discretize the z (elevation) values into bins
    z_flat = z.flatten()
    z_bins = np.linspace(z_flat.min(), z_flat.max(), num_z_bins + 1)
    z_indices = np.digitize(z.flatten(), bins=z_bins) - 1
    z_indices = z_indices.reshape(z.shape)

    # Combine x, y, and z indices into a state map
    state_map = np.stack([x_indices, y_indices, z_indices], axis=-1)

    return state_map, z_bins

In [12]:
class MDP():
    """
    Data structure for a Markov Decision Process. In mathematical terms,
    MDPs are sometimes defined in terms of a tuple consisting of the various
    components of the MDP, written (S, A, T, R, gamma):

    gamma: discount factor
    S: state space
    A: action space
    T: transition function
    R: reward function
    TR: sample transition and reward. We will us `TR` later to sample the next
        state and reward given the current state and action: s_prime, r = TR(s, a)
    """
    def __init__(self,
                 gamma: float,
                 S: list[Any],
                 A: list[Any],
                 T: Callable[[Any, Any, Any], float] | np.ndarray,
                 R: Callable[[Any, Any], float] | np.ndarray,
                 TR: Callable[[Any, Any], tuple[Any, float]] = None):
        self.gamma = gamma  # discount factor
        self.S = S          # state space
        self.A = A          # action space

        # reward function R(s, a)
        if type(R) == np.ndarray:
            self.R = lambda s, a: R[s, a]
        else:
            self.R = R

        # transition function T(s, a, s')
        # sample next state and reward given current state and action: s', r = TR(s, a)
        if type(T) == np.ndarray:
            self.T = lambda s, a, s_prime: T[s, a, s_prime]
            self.TR = lambda s, a: (np.random.choice(len(self.S), p=T[s, a]), self.R(s, a)) if not np.all(T[s, a] == 0) else (np.random.choice(len(self.S)), self.R(s, a))
        else:
            self.T = T
            self.TR = TR

    def lookahead(self, U: Callable[[Any], float] | np.ndarray, s: Any, a: Any) -> float:
        if callable(U):
            return self.R(s, a) + self.gamma * np.sum([self.T(s, a, s_prime) * U(s_prime) for s_prime in self.S])
        return self.R(s, a) + self.gamma * np.sum([self.T(s, a, s_prime) * U[i] for i, s_prime in enumerate(self.S)])

    def iterative_policy_evaluation(self, policy: Callable[[Any], Any], k_max: int) -> np.ndarray:
        U = np.zeros(len(self.S))
        for _ in range(k_max):
            U = np.array([self.lookahead(U, s, policy(s)) for s in self.S])
        return U

    def policy_evaluation(self, policy: Callable[[Any], Any]) -> np.ndarray:
        R_prime = np.array([self.R(s, policy(s)) for s in self.S])
        T_prime = np.array([[self.T(s, policy(s), s_prime) for s_prime in self.S] for s in self.S])
        I = np.eye(len(self.S))
        return np.linalg.solve(I - self.gamma * T_prime, R_prime)

    def greedy(self, U: Callable[[Any], float] | np.ndarray, s: Any) -> tuple[float, Any]:
        expected_rewards = [self.lookahead(U, s, a) for a in self.A]
        idx = np.argmax(expected_rewards)
        return self.A[idx], expected_rewards[idx]

    def backup(self, U: Callable[[Any], float] | np.ndarray, s: Any) -> float:
        return np.max([self.lookahead(U, s, a) for a in self.A])

    def randstep(self, s: Any, a: Any) -> tuple[Any, float]:
        return self.TR(s, a)

    def simulate(self, s: Any, policy: Callable[[Any], Any], d: int) -> list[tuple[Any, Any, float]]:  # TODO - Create test
        trajectory = []
        for _ in range(d):
            a = policy(s)
            s_prime, r = self.TR(s, a)
            trajectory.append((s, a, r))
            s = s_prime
        return trajectory
    
    def random_policy(self):
        return lambda s, A=self.A: random.choices(A)[0]

In [13]:
class ValueFunctionPolicy():
    def __init__(self, P: MDP, U: Callable[[Any], float] | np.ndarray):
        self.P = P  # problem
        self.U = U  # utility function

    def __call__(self, s: Any) -> Any:
        return self.P.greedy(self.U, s)[0]


class MDPSolutionMethod(ABC):
    pass


class OfflinePlanningMethod(MDPSolutionMethod):
    @abstractmethod
    def solve(self, P: MDP) -> Callable[[Any], Any]:
        pass


class ExactSolutionMethod(OfflinePlanningMethod):
    pass


class PolicyIteration(ExactSolutionMethod):
    def __init__(self, initial_policy: Callable[[Any], Any], k_max: int):
        self.initial_policy = initial_policy
        self.k_max = k_max

    def solve(self, P: MDP) -> Callable[[Any], Any]:
        policy = self.initial_policy
        for _ in range(self.k_max):
            U = P.policy_evaluation(policy)
            policy_prime = ValueFunctionPolicy(P, U)
            if all([policy(s) == policy_prime(s) for s in P.S]):
                break
            policy = policy_prime
        return policy


class ValueIteration(ExactSolutionMethod):
    def __init__(self, k_max: int):
        self.k_max = k_max

    def solve(self, P: MDP) -> Callable[[Any], Any]:
        U = np.zeros(len(P.S))
        for _ in range(self.k_max):
            U = np.array([P.backup(U, s) for s in P.S])
        return ValueFunctionPolicy(P, U)


class GaussSeidelValueIteration(ExactSolutionMethod):
    def __init__(self, k_max: int):
        self.k_max = k_max

    def solve(self, P: MDP) -> Callable[[Any], Any]:
        U = np.zeros(len(P.S))
        for _ in range(self.k_max):
            for i, s in enumerate(P.S):
                U[i] = P.backup(U, s)
        return ValueFunctionPolicy(P, U)


class LinearProgramFormulation(ExactSolutionMethod):
    def solve(self, P: MDP) -> Callable[[Any], Any]:
        S, A, R, T = self.numpyform(P)
        U = cp.Variable(len(S))
        objective = cp.Minimize(cp.sum(U))
        constraints = [U[s] >= R[s, a] + P.gamma * (T[s, a] @ U) for s in S for a in A]
        problem = cp.Problem(objective, constraints)
        problem.solve()
        return ValueFunctionPolicy(P, U.value)

    @staticmethod
    def numpyform(P: MDP) -> tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]:
        S_prime = np.arange(len(P.S))
        A_prime = np.arange(len(P.A))
        R_prime = np.array([[P.R(s, a) for a in P.A] for s in P.S])
        T_prime = np.array([[[P.T(s, a, s_prime) for s_prime in S_prime] for a in P.A] for s in P.S])
        return S_prime, A_prime, R_prime, T_prime


In [14]:
# Define the grid resolution for x, y, and the number of elevation bins for z
xy_resolution = 5
num_z_bins = 20

# Call the discretize_terrain function with your terrain data (x, y, z)
state_map, z_bins = discretize_terrain(x, y, z, xy_resolution=xy_resolution, num_z_bins=num_z_bins)

# Print the shape of the state_map and the bin edges for verification
print(f"State map shape: {state_map.shape}")
print(f"Z bin edges: {z_bins}")


State map shape: (200, 200, 3)
Z bin edges: [2.49965458e-03 2.52253807e-01 5.02007960e-01 7.51762113e-01
 1.00151627e+00 1.25127042e+00 1.50102457e+00 1.75077872e+00
 2.00053288e+00 2.25028703e+00 2.50004118e+00 2.74979533e+00
 2.99954949e+00 3.24930364e+00 3.49905779e+00 3.74881195e+00
 3.99856610e+00 4.24832025e+00 4.49807440e+00 4.74782856e+00
 4.99758271e+00]


In [15]:
# Set the discount factor
gamma = 0.9

# Define the state space (S) using the discretized terrain (state_map)
S = [(i, j) for i in range(state_map.shape[0]) for j in range(state_map.shape[1])]

# Define the action space (A) as discrete steps in x and y directions (with max step size of 5)
max_step = 5
A = [(dx, dy) for dx in range(-max_step, max_step + 1) for dy in range(-max_step, max_step + 1) if dx != 0 or dy != 0]

# Placeholder for the transition function (T) - will be populated later
T = lambda s, a, s_prime: 0

# Placeholder for the reward function (R) - will be populated later
R = lambda s, a: -1

# Instantiate the MDP using the MDP class you already have
hill_climb_mdp = MDP(
    gamma=gamma,
    S=S,
    A=A,
    T=T,
    R=R
)

# Print the number of states and actions to verify the MDP instantiation
print(f"Number of states: {len(S)}")
print(f"Number of actions: {len(A)}")


Number of states: 40000
Number of actions: 120


In [18]:
from collections import defaultdict
from scipy.stats import dirichlet, norm
import pandas as pd


# Initialize parameters
alpha = 1  # Dirichlet prior parameter (pseudocount)
reward_std = 1  # Standard deviation for reward sampling
num_episodes = 100  # Number of episodes to run
max_steps = 200  # Maximum steps per episode
goal_reward = 100  # Large positive reward for reaching the goal

# Define the goal region (e.g., near the prominent hill's peak)
goal_x, goal_y = prominent_hill_x, prominent_hill_y
goal_radius = 5  # Radius around the goal peak to consider as the goal region

# Sparse dictionary for transition counts and rewards
transition_counts = defaultdict(lambda: defaultdict(lambda: defaultdict(int)))
reward_sums = defaultdict(lambda: defaultdict(float))
reward_counts = defaultdict(lambda: defaultdict(int))

# Data collection list
data = []

def get_reward(s, s_prime):
    x, y = s
    x_prime, y_prime = s_prime

    # Elevation change
    current_z_idx = state_map[x, y, 2]
    next_z_idx = state_map[x_prime, y_prime, 2]
    elevation_change = next_z_idx - current_z_idx

    # Distance traveled in the x-y plane
    distance_penalty = np.sqrt((x_prime - x)**2 + (y_prime - y)**2)

    # Compute distance to the goal for current and next states
    current_distance_to_goal = np.sqrt((x - goal_x)**2 + (y - goal_y)**2)
    next_distance_to_goal = np.sqrt((x_prime - goal_x)**2 + (y_prime - goal_y)**2)

    # Check if the next state is within the goal region
    if next_distance_to_goal <= goal_radius:
        return goal_reward

    # Potential-based shaping: reward is based on moving closer to the goal
    potential_difference = current_distance_to_goal - next_distance_to_goal
    reward = potential_difference - distance_penalty

    # Optionally, add a small incentive for uphill movement only when moving towards the goal
    if next_distance_to_goal < current_distance_to_goal:
        reward += 0.1 * elevation_change

    return reward


def deterministic_transition(s, a):
    """
    Determines the next state based on a deterministic action model with boundary handling.
    """
    x, y = s
    dx, dy = a
    new_x, new_y = x + dx, y + dy

    # Check if the new state is out of bounds
    if not (0 <= new_x < state_map.shape[0] and 0 <= new_y < state_map.shape[1]):
        return None  # Indicate an invalid transition
    return (new_x, new_y)

def posterior_sample_action(state):
    """
    Samples an action based on the posterior transition model using Thompson Sampling,
    and ensures the action does not lead out of bounds.
    """
    valid_action_samples = []
    for action in A:
        action_idx = A.index(action)

        # Check if the action would result in a valid next state
        next_state = deterministic_transition(state, action)
        if next_state is None:
            continue  # Skip invalid actions

        # Estimate expected reward for the action
        if reward_counts[state][action_idx] > 0:
            expected_reward = reward_sums[state][action_idx] / reward_counts[state][action_idx]
        else:
            expected_reward = -1  # Default penalty for unexplored actions

        valid_action_samples.append((action, expected_reward))

    # Choose the best valid action based on the highest expected reward
    if valid_action_samples:
        best_action = max(valid_action_samples, key=lambda x: x[1])[0]
        return best_action

    # Fallback to a random valid action if no valid actions are found
    return random.choice(A)

def simulate_episode(initial_state):
    """
    Simulates one episode using deterministic transitions with boundary handling.
    """
    state = initial_state
    trajectory = []
    for _ in range(max_steps):
        action = posterior_sample_action(state)
        action_idx = A.index(action)

        # Determine the next state using the deterministic transition model
        next_state = deterministic_transition(state, action)

        # If the transition was invalid, sample a new action
        if next_state is None:
            continue

        # Compute the reward for this transition
        reward = get_reward(state, next_state)

        # Store the transition data
        trajectory.append((state, action, reward, next_state))
        data.append((state, action, reward, next_state))

        # Update counts for posterior
        transition_counts[state][action_idx][next_state] += 1
        reward_sums[state][action_idx] += reward
        reward_counts[state][action_idx] += 1

        # Move to the next state
        state = next_state

    return trajectory


# Run simulations to collect exploration data
for episode in range(num_episodes):
    initial_state = random.choice(S)
    simulate_episode(initial_state)

# Flatten the data for CSV export
flattened_data = [
    [s[0], s[1], a[0], a[1], r, s_prime[0], s_prime[1]]
    for (s, a, r, s_prime) in data
]

# Create a DataFrame with explicit column names
df = pd.DataFrame(flattened_data, columns=["s_x", "s_y", "a_dx", "a_dy", "reward", "s'_x", "s'_y"])

# Save the DataFrame to CSV without quotes
df.to_csv("exploration_data_flat.csv", index=False)
print("Exploration data saved to exploration_data_flat.csv")



Exploration data saved to exploration_data_flat.csv
