In [1]:
from torch import nn
from bound_propagation import BoundModelFactory, HyperRectangle
from bound_propagation.polynomial import Pow
import torch
import numpy as np
import cvxpy as cp
import gymnasium as gym

(CVXPY) Jan 17 12:15:30 AM: Encountered unexpected exception importing solver SCS:
ImportError("dlopen(/Users/koentuin/Documents/Studie/Minor Engineering wit AI/CAI/venv/lib/python3.11/site-packages/_scs_direct.cpython-311-darwin.so, 0x0002): tried: '/Users/koentuin/Documents/Studie/Minor Engineering wit AI/CAI/venv/lib/python3.11/site-packages/_scs_direct.cpython-311-darwin.so' (mach-o file, but is an incompatible architecture (have 'x86_64', need 'arm64')), '/System/Volumes/Preboot/Cryptexes/OS/Users/koentuin/Documents/Studie/Minor Engineering wit AI/CAI/venv/lib/python3.11/site-packages/_scs_direct.cpython-311-darwin.so' (no such file), '/Users/koentuin/Documents/Studie/Minor Engineering wit AI/CAI/venv/lib/python3.11/site-packages/_scs_direct.cpython-311-darwin.so' (mach-o file, but is an incompatible architecture (have 'x86_64', need 'arm64'))")
(CVXPY) Jan 17 12:15:30 AM: Encountered unexpected exception importing solver OSQP:
ImportError("dlopen(/Users/koentuin/Documents/Studie/

In [2]:
def create_action_partitions(env, partitions):
    """
    Splits the action space of an environment into a regular grid of hyperrectangles.

    Every action dimension is cut into 'partitions' equal-width intervals; the Cartesian product of those intervals gives 'partitions^n' cells for an action space with 'n' dimensions.

    The interval edges of each dimension are computed once and shared by neighbouring cells, so adjacent cells meet exactly and the outermost edges coincide exactly with the action-space limits (no floating-point drift from repeatedly adding the step size).

    Parameters:
    - env (Environment): Object exposing an 'action_space' attribute with 'shape', 'low' and 'high' (e.g. a Gymnasium environment with a Box action space).
    - partitions (int): The number of intervals to create along each dimension of the action space. Must be at least 1.

    Returns:
    - A list of HyperRectangle objects, each built from a NumPy lower-bound vector and a NumPy upper-bound vector. Cells are ordered with the first action dimension varying slowest and the last varying fastest.

    Raises:
    - ValueError: If 'partitions' is smaller than 1.
    """
    # Local import: itertools is only needed here and is not imported at the top of the notebook.
    from itertools import product

    if partitions < 1:
        raise ValueError(f"partitions must be a positive integer, got {partitions!r}")

    action_space = env.action_space
    num_actions = action_space.shape[0]
    action_low = np.asarray(action_space.low)
    action_high = np.asarray(action_space.high)

    # Keep the floating dtype of the action space (float32 stays float32),
    # but promote integer bounds to a float type so the edges are not truncated.
    edge_dtype = np.result_type(action_low.dtype, action_high.dtype, np.float32)

    # edges[dim][k] is the k-th grid line along dimension `dim`.
    edges = [
        np.linspace(action_low[dim], action_high[dim], partitions + 1, dtype=edge_dtype)
        for dim in range(num_actions)
    ]

    res = []
    # product(...) iterates with the last index fastest, matching a nested loop
    # whose outermost loop runs over the first dimension.
    for cell in product(range(partitions), repeat=num_actions):
        lower = np.array([edges[dim][idx] for dim, idx in enumerate(cell)])
        upper = np.array([edges[dim][idx + 1] for dim, idx in enumerate(cell)])
        res.append(HyperRectangle(lower, upper))

    return res

In [3]:
def get_lower_bound(model, state, action, epsilon):
    """
    Runs CROWN on an epsilon-box around a state-action pair and returns the first component of the resulting lower bound.

    The state and action are concatenated along dim 1 and flattened to a single row. HyperRectangle.from_eps builds the input region around that row, and 'model.crown' is evaluated on the region.

    Parameters:
    - model: Bound model exposing a 'crown(input_bounds)' method.
    - state (Tensor): The current state; must be 2-D because it is concatenated with 'action' along dim 1.
    - action (Tensor): The action to evaluate, 2-D with the same number of rows as 'state'.
    - epsilon (float): Radius passed to HyperRectangle.from_eps for the concatenated state-action input.

    Returns:
    - A tensor with one row per output and (state_dim + action_dim) columns, obtained by reshaping 'crown(...).lower[0]'.

    Notes:
    - NOTE(review): only element 0 of the '.lower' result is used; presumably this is the coefficient matrix of a linear lower bound and the remaining element(s) (e.g. a bias term) are dropped. Confirm against the bound_propagation API.
    - The same epsilon is applied to the state entries and the action entries.
    """
    joint_input = torch.cat((state, action), dim=1).view(1, -1)
    num_inputs = joint_input.shape[1]

    region = HyperRectangle.from_eps(joint_input, epsilon)
    lower_terms = model.crown(region).lower

    # Keep only the first component and lay it out as (rows, num_inputs).
    first_term = lower_terms[0].unsqueeze(0)
    return first_term.view(-1, num_inputs)

In [4]:
def create_bound_matrices(partitions, state, env, boundnet, epsilon=0.1):
    """
    Computes bound matrices for action partitions in a given state using a bound network.

    For every partition, get_lower_bound is evaluated at the partition's centre. The returned rows are split column-wise into a state part and an action part; the state part is multiplied with the current state so that only the action-dependent part remains symbolic.

    Parameters:
    - partitions (Iterable): A collection of partitions of the action space; each must expose a 'center' attribute.
    - state (Tensor): The current state of the system, shaped (1, state_dim).
    - env: The environment object; only 'env.action_space.shape[0]' is read.
    - boundnet (NN): Bound network passed on to get_lower_bound.
    - epsilon (float): Radius of the input box handed to get_lower_bound. Defaults to 0.1, the value that used to be hard-coded.

    Returns:
    - A list of tuples. Each tuple contains:
        1. The action partition,
        2. A numpy array with the action-dependent columns of the bound (one row per output),
        3. A 1-D numpy array holding the state columns of the bound multiplied with the state.

    Notes:
    - NOTE(review): 'epsilon' is independent of the partition width, so the box around the centre does not necessarily cover the whole partition. Confirm this is intended before relying on the bound across the full cell.
    """
    num_actions = env.action_space.shape[0]

    res = []
    for action_partition in partitions:
        # as_tensor accepts both numpy arrays and tensors; matching the state's
        # dtype avoids a float32/float64 mismatch when the two are concatenated.
        action = torch.as_tensor(action_partition.center, dtype=state.dtype).reshape(1, -1)
        bounds = get_lower_bound(boundnet, state, action, epsilon)

        # The last num_actions columns belong to the action, the rest to the state.
        h_action_dependent = bounds[:, -num_actions:]
        state_bounds = bounds[:, :-num_actions]

        # Fold the known state into a constant vector (one entry per output).
        h_state_vec = state_bounds @ state.to(state_bounds.dtype).t()

        res.append((
            action_partition,
            h_action_dependent.detach().numpy(),
            h_state_vec.view(-1).detach().numpy(),
        ))

    return res

In [5]:
class InfeasibilityError(Exception):
    """Signals that no action satisfies the safety criteria.

    The text passed to the constructor is used as the exception message and is
    also kept on the instance as ``message``.
    """

    def __init__(self, message="No safe action to take"):
        super().__init__(message)
        self.message = message

In [6]:
def continuous_cbf(bound_matrices, nominal_action, h_current, alpha):
    """
    Selects a safe action from bound matrices using continuous control barrier functions (CBF).

    For every partition a small convex program is solved: minimise the Euclidean distance to the nominal action subject to the partition's box limits and the linear safety constraint 'h_state_vec + h_action_dependent @ action >= alpha * h_current'. The solution with the smallest distance over all partitions is returned.

    Parameters:
    - bound_matrices (Iterable): An iterable of tuples, each containing the action partition (with 'lower' and 'upper' attributes), the action-dependent component of the bound of h, and the state vector component of the bound of h.
    - nominal_action (Tensor or array-like): The preferred action in the current state, typically derived from an unconstrained policy. Tensors are converted to NumPy before being handed to cvxpy.
    - h_current (Tensor or array-like): The current value of the barrier function.
    - alpha (float): A scaling factor used in the safety criterion. It scales the current barrier function value to set a threshold for the bound on the next barrier function value.

    Returns:
    - The selected safe action as a numpy array. If several partitions admit a safe action, the one closest to the nominal action is returned (the first one encountered wins ties).

    Raises:
    - InfeasibilityError: If no partition yields a solved problem, indicating that the current state is infeasible under the given safety constraints.
    """
    def _to_numpy(value):
        # cvxpy operates on NumPy data; convert tensors explicitly instead of
        # relying on implicit conversion inside cvxpy expressions.
        if torch.is_tensor(value):
            value = value.detach().cpu().numpy()
        return np.asarray(value, dtype=float)

    # Everything that does not depend on the partition is computed once.
    nominal = _to_numpy(nominal_action).reshape(-1)
    num_actions = nominal.shape[0]
    h_threshold = alpha * _to_numpy(h_current)
    solved_statuses = (cp.OPTIMAL, cp.OPTIMAL_INACCURATE)

    best_safe_action = None
    best_distance = float("inf")

    for action_partition, h_action_dependent, h_state_vec in bound_matrices:
        action = cp.Variable(num_actions)
        action_lower_bound = action_partition.lower.reshape((-1,))
        action_upper_bound = action_partition.upper.reshape((-1,))

        constraints = [
            action_lower_bound <= action,
            action <= action_upper_bound,
            h_state_vec + h_action_dependent @ action >= h_threshold,
        ]
        objective = cp.Minimize(cp.norm(action - nominal, 2))
        problem = cp.Problem(objective, constraints)
        problem.solve()

        # Only accept partitions for which the solver reports a solution; infeasible,
        # unbounded and other non-converged outcomes provide no usable action.
        if problem.status not in solved_statuses or action.value is None:
            continue

        distance = objective.value
        # Strict comparison keeps the earliest partition on ties.
        if distance < best_distance:
            best_safe_action = action.value
            best_distance = distance

    if best_safe_action is None:
        raise InfeasibilityError()
    return best_safe_action

In [7]:
class NNDM(nn.Sequential):
    """
    Two-layer tanh network with a residual connection onto the state part of its input.

    The input is a batch of rows of length 'state_dim + action_dim' whose first 'state_dim' columns are the state. The network output (length 'state_dim') is added to those state columns.

    NOTE(review): the name presumably stands for "neural network dynamics model", i.e. the output would be the predicted next state. Confirm with the training code.

    Parameters:
    - state_dim (int): Number of leading input columns that form the state; also the output width. Defaults to 24.
    - action_dim (int): Number of trailing input columns that form the action. Defaults to 4.
    - hidden_dim (int): Width of the hidden layer. Defaults to 64.

    The defaults reproduce the previously hard-coded 28 -> 64 -> 24 architecture.
    """
    def __init__(self, state_dim=24, action_dim=4, hidden_dim=64):
        super().__init__(
            nn.Linear(state_dim + action_dim, hidden_dim),
            nn.Tanh(),
            nn.Linear(hidden_dim, state_dim),
        )
        # Needed by forward() to slice the state columns off the input.
        self.state_dim = state_dim

    def forward(self, x):
        """Returns the sequential output plus the first 'state_dim' columns of 'x'."""
        out = super().forward(x)
        return out + x[:, :self.state_dim]

    
class HHead(nn.Sequential):
    """
    Placeholder head for the barrier function h: 'Pow(2)' followed by a linear layer.

    TODO: implement h function (the linear layer is randomly initialised and only stands in for a real barrier function).

    Parameters:
    - state_dim (int): Width of the input fed to the head. Defaults to 24.
    - num_outputs (int): Number of barrier-function values produced. Defaults to 2.

    The defaults reproduce the previously hard-coded 24 -> 2 architecture.
    """
    def __init__(self, state_dim=24, num_outputs=2):
        super().__init__(
            # Pow comes from bound_propagation.polynomial; presumably an elementwise power - TODO confirm.
            Pow(2),
            nn.Linear(state_dim, num_outputs),
        )

class CombinedModel(nn.Sequential):
    """
    Chains a dynamics model and a barrier-function head into one sequential network.

    The sub-modules are registered under the names 'nndm' and 'hhead' (in that order), so they are reachable as attributes and the forward pass evaluates 'hhead(nndm(x))'.

    Parameters:
    - nndm (nn.Module, optional): Dynamics model to use. A fresh 'NNDM()' is created when omitted.
    - hhead (nn.Module, optional): Barrier-function head to use. A fresh 'HHead()' is created when omitted.

    Passing existing modules lets the combined network share weights with modules that are also used on their own.
    """
    def __init__(self, nndm=None, hhead=None):
        super().__init__()
        self.add_module('nndm', NNDM() if nndm is None else nndm)
        self.add_module('hhead', HHead() if hhead is None else hhead)

In [9]:
# Demo configuration
SEED = 0                 # fixes the random weights, state and nominal action below
ACTION_PARTITIONS = 4    # intervals per action dimension
ALPHA = 0                # scaling of the current barrier value in the safety constraint

torch.manual_seed(SEED)

# Initialise the environment
env = gym.make("BipedalWalker-v3")
state_dim = env.observation_space.shape[0]
num_actions = env.action_space.shape[0]

# Initialise the NNDM with the barrier function on top to compute linear bounds.
# The barrier head is taken from the combined model so that h_current is evaluated
# with the same weights the bounds are computed for (a separately constructed
# HHead() would have its own random initialisation).
net = CombinedModel()
hnet = net.hhead
factory = BoundModelFactory()
boundnet = factory.build(net)

# Generate random state, h values, and nominal action for demonstration
state = torch.rand(1, state_dim)
h_current = hnet(state).view(-1).detach().numpy()
nominal_action = torch.rand(num_actions)

# Apply the CBF to the nominal action
partitions = create_action_partitions(env, ACTION_PARTITIONS)
bound_matrices = create_bound_matrices(partitions, state, env, boundnet)
safe_action = continuous_cbf(bound_matrices, nominal_action, h_current, ALPHA)
print(safe_action)

# Release the environment's resources once the demo is done
env.close()

[0.30970567 0.17388272 0.36180925 0.68025029]
