In [4]:
import numpy as np
import math
import random
from scipy.stats import norm
import matplotlib.pyplot as plt
from mpl_toolkits.mplot3d import Axes3D

In [5]:
class KinematicModel:

    def __init__(self, L, y, theta, Lmax, l, d, p1, sigmaC, p2):
        self.L = L
        self.y = y
        self.theta = theta
        self.Lmax = Lmax
        self.l = l
        self.d = d
        self.p1 = p1
        self.sigmaC = sigmaC
        self.p2 = p2

    def input(self, action):
        noise = norm.rvs(loc=0, scale=action[0] / 4)
        self.y = self.y + action[0] * math.sin(math.radians(action[1] + noise + self.theta))
        nextVehicleEvent = random.uniform(0, 1)
        if nextVehicleEvent < self.p2:
            self.d = self.d + (self.l - action[0] * math.cos(math.radians(action[1] + noise + self.theta)))
        else:
            self.d = 40
        self.theta = self.theta + (action[0] / self.L) * math.tan(math.radians(action[1] + noise))
        curveEvent = random.uniform(0, 1)
        if curveEvent < self.p1:
            curveAngle = norm.rvs(loc=0, scale=self.sigmaC)
            self.theta = self.theta + curveAngle
            
            
    def __str__(self):
        return f"y = {self.y} / theta = {self.theta} / d = {self.d}"

In [6]:
# Probability of going out of a straight road given an action 
def P1(action, environment, sigma, show = True):
    if (environment.Lmax - environment.y) / action[0] > 1 or (environment.Lmax - environment.y) / action[0] < -1:
        term1 = 0
    # Term 1 is the probability that y > Lmax
    else:
        term1 = math.degrees(math.asin((environment.Lmax - environment.y) / action[0]))
        term1 = 1 - norm.cdf((1 / sigma) * (term1 - action[1] - environment.theta))
    #print(f"Term 1: {term1}")
    # Term 2 is the probability that y < -Lmax
    if (- environment.Lmax - environment.y) / action[0] > 1 or (- environment.Lmax - environment.y) / action[0] < -1:
        term2 = 0
    else:
        term2 = math.degrees(math.asin((- environment.Lmax - environment.y) / action[0]))
        term2 = norm.cdf((1 / sigma) * (term2 - action[1] - environment.theta))
    #print(f"Term 2: {term2}")
    value = (1 - environment.p1) * (term1 + term2)
    if show:
        print(f"Value: {value}")
    return value, term1, term2
    
# Probability of going out of a curved road given an action
def P2(action, environment, sigma, show = True):
    # Term 1 is the probability that y > Lmax
    if (environment.Lmax - environment.y) / action[0] > 1 or (environment.Lmax - environment.y) / action[0] < -1:
        term1 = 0
    else:
        term1 = math.degrees(math.asin((environment.Lmax - environment.y) / action[0]))
        term1 = 1 - norm.cdf((1 / sigma) * (term1 - action[1] - environment.theta))
    #print(f"Term 1: {term1}")
    # Term 2 is the probability that y < -Lmax
    if (- environment.Lmax - environment.y) / action[0] > 1 or (- environment.Lmax - environment.y) / action[0] < -1:
        term2 = 0
    else:
        term2 = math.degrees(math.asin((- environment.Lmax - environment.y) / action[0]))
        term2 = norm.cdf((1 / sigma) * (term2 - action[1] - environment.theta))
    #print(f"Term 2: {term2}")
    value = environment.p1 * (term1 + term2)
    if show:
        print(f"Value: {value}")
    return value, term1, term2

# Probability of risking a crash in a straight road given an action 
def P3(action, environment, sigma, show = True):
    if (1 / action[0]) * (environment.l + ( ((action[0] * 3.6) / -2) + environment.d)) > 1:
        return 0
    value = math.degrees(math.acos((1 / action[0]) * (environment.l + (((action[0] * 3.6) / -2) + environment.d))))
    value = norm.cdf((1 / sigma) * (value - math.fabs(action[1]) - math.fabs(environment.theta)))
    value = environment.p2 * (1 - environment.p1) * value
    if show:
        print(f"Value: {value}")
    return value

# Probability of risking a crash in a curved road given an action
def P4(action, environment, sigma, show = True):
    if (1 / action[0]) * (environment.l + ( ((action[0] * 3.6) / -2) + environment.d)) > 1:
        return 0
    value = math.degrees(math.acos((1 / action[0]) * (environment.l + (((action[0] * 3.6) / -2) + environment.d))))
    value = norm.cdf((1 / sigma) * (value - math.fabs(action[1]) - math.fabs(environment.theta)))
    value = environment.p2 * environment.p1 * value
    if show:
        print(f"Value: {value}")
    return value

In [7]:
# Transition probabilities

def anyToG(action, environment):
    P1_val, _, _ = P1(action, environment, action[0]/4, False)
    P2_val, _, _ = P2(action, environment, math.sqrt(action[0]/4 + environment.sigmaC), False)
    # Term 1 is the probability of NOT going out of the road
    term1 = 1 - (P1_val + P2_val)
    # Term 2 is the probability of NOT risking a crash
    term2 = 1 - (P3(action, environment, action[0]/4, False) + P4(action, environment, math.sqrt(action[0]/4 + environment.sigmaC), False))
    return term1 * term2
    
def anyToX(action, environment):
    P1_val, _, _ = P1(action, environment, action[0]/4, False)
    P2_val, _, _ = P2(action, environment, math.sqrt(action[0]/4 + environment.sigmaC), False)
    # Term 1 is the probability of going out of the road
    term1 = P1_val + P2_val
    # Term 2 is the probability of NOT risking a crash
    term2 = 1 - (P3(action, environment, action[0]/4, False) + P4(action, environment, math.sqrt(action[0]/4 + environment.sigmaC), False))
    return term1 * term2

def anyToI(action, environment):
    P1_val, _, _ = P1(action, environment, action[0]/4, False)
    P2_val, _, _ = P2(action, environment, math.sqrt(action[0]/4 + environment.sigmaC), False)
    # Term 1 is the probability of NOT going out of the road
    term1 = 1 - (P1_val + P2_val)
    # Term 2 is the probability of risking a crash
    term2 = P3(action, environment, action[0]/4, False) + P4(action, environment, math.sqrt(action[0]/4 + environment.sigmaC), False)
    return term1 * term2

def anyToXI(action, environment):
    P1_val, _, _ = P1(action, environment, action[0]/4, False)
    P2_val, _, _ = P2(action, environment, math.sqrt(action[0]/4 + environment.sigmaC), False)
    # Term 1 is the probability of going out of the road
    term1 = P1_val + P2_val
    # Term 2 is the probability of risking a crash
    term2 = P3(action, environment, action[0]/4, False) + P4(action, environment, math.sqrt(action[0]/4 + environment.sigmaC), False)
    return term1 * term2

def transitionProbabilitiesMatrix(actions, environment):
    transitions = [anyToG, anyToX, anyToI, anyToXI]
    transitionMatrix = np.array([[transition(action, kinematicModel) for transition in transitions] for action in actions])
    return transitionMatrix

In [8]:
# Reward for staying at the center of the road
def rewardCenterProbability(action, environment, ratio, show = True):
    # Term 1 is the probability that y < Lmax * ratio
    if ((environment.Lmax * ratio) - environment.y) / action[0] > 1 or ((environment.Lmax * ratio) - environment.y) / action[0] < -1:
        term1 = 0
        term1C = 0
    else:
        term1 = math.degrees(math.asin(((environment.Lmax * ratio) - environment.y) / action[0]))
        term1C = norm.cdf((1 / (math.sqrt(action[0]/4 + environment.sigmaC))) * (term1 - action[1] - environment.theta))
        term1 = norm.cdf((1 / (action[0]/4)) * (term1 - action[1] - environment.theta))
    #print(f"Term 1: {term1}")
    # Term 2 is the probability that y > -Lmax * ratio
    if ((-environment.Lmax * ratio) - environment.y) / action[0] > 1 or ((-environment.Lmax * ratio) - environment.y) / action[0] < -1:
        term2 = 0
        term2C = 0
    else:
        term2 = math.degrees(math.asin(((-environment.Lmax * ratio) - environment.y) / action[0]))
        term2C = norm.cdf((1 / (math.sqrt(action[0]/4 + environment.sigmaC))) * (term2 - action[1] - environment.theta))
        term2 = norm.cdf((1 / (action[0]/4)) * (term2 - action[1] - environment.theta))
    #print(f"Term 2: {term2}")
    value = ((1 - environment.p1) * (term1 - term2)) + (environment.p1 * (term1C - term2C))
    if show:
        print(f"Value: {value}")
    return value

# Reward for going at the correct speed
def rewardSpeed(action, environment):
    return -0.5 * (math.fabs(environment.l - action[0]))

# Reward for staying at the correct distance
def rewardDistanceProbability(action, environment):
    # Probability of risking a crash in a straight road given an action 
    term1 = P3(action, environment, action[0] / 4, False)
    #Probability of risking a crash in a curved road given an action
    term2 = P4(action, environment, math.sqrt(action[0]/4 + environment.sigmaC), False)
    return term1 + term2

def rewardOutOfRoadProbability(action, environment):
    # Probability of going out of a straight road given an action
    term1, _, _ = P1(action, environment, action[0]/4, False)
    # Probability of going out of a curved road given an action
    term2, _, _ = P2(action, environment, action[0]/4, False)
    return term1 + term2

# Expected reward given an action
def expectedReward(action, environment, r1, r2, r3, r4, show = True):
    termR1 = r1 * rewardCenterProbability(action, environment, 0.5, False)
    #print(f"Term r1: {termR1}")
    termR2 = r2 * rewardCenterProbability(action, environment, 0.25, False)
    #print(f"Term r2: {termR2}")
    rSpeed = rewardSpeed(action, environment)
    if show:
        print(f"Reward for action [{action[0]}, {action[1]}]: {round(termR1 + termR2 + rSpeed, 2)}")
    return termR1 + termR2 + rSpeed

def expectedRewardMatrix(action, environment, r1, r2, r3, r4):
    rewardMatrix = np.array([expectedReward(action, kinematicModel, r1, r2, r3, r4, False)for action in actions])
    return rewardMatrix

In [9]:
# Environment starts:
# - At center of the road y = 0 m
# - Aligned with the road theta = 0°
# - Road width 4 m Lmax = 2 m
# - Road limit 90 km/h l = 25 m/s
# - Distance from next vehicle d = 40 m
# - Probability of a road curve p1 = 5%
# - Variance of the curve sigmaC = 5
# - Probability of a vehicle in front p2 = 30%
kinematicModel = KinematicModel(1, 0, -5, 2, 25, 40, 0.05, 5, 0.3)

# State space
# 0 = S_G: good state
# 1 = S_X: out of road state
# 2 = S_I: risk of crash state
# 3 = S_XI: out of road and risk of crash state
states = ['G', 'X', 'I', 'XI']

# Action space
anglesDeg = [-5, -4, -3, -2, -1, 0, 1, 2, 3, 4, 5]
velocities = [20, 25, 30]
actions = [(v, delta) for v in velocities for delta in anglesDeg]
#print(f"{actions}")

# State-action transition probability matrix
#T = transitionProbabilitiesMatrix(actions, kinematicModel)
#print(f"{T}")

# State-action expected reward matrix
r1 = 50
r2 = 100
r3 = -1
r4 = 0
#R = expectedRewardMatrix(actions, kinematicModel, r1, r2, r3, r4)
#print(f"{R}")

print(f"{actions[0]}")

# Transition function
def transition_func(s, a, T):
    action_index = actions.index(a)
    # Seleziona la riga corrispondente nella matrice di transizione
    probabilities = T[action_index]
    # Crea la lista di tuple (stato, probabilità) usando lo zip tra 'states' e le probabilità
    transitions = [(state, prob) for state, prob in zip(states, probabilities)]
    return transitions

#print(f"{transition_func('G', (20, -30), T)}")

# Reward function (NON SAFE)
def reward_func(s, a, s_prime, R):
    action_index = actions.index(a)
    return R[action_index]
    
# Reward function (SAFE)
def reward_func_SAFE(s, a, s_prime, R):
    cost = 0
    action_index = actions.index(a)
    if s_prime == 'X' or s_prime == 'XI':
        cost -= 50
    elif s_prime == 'I' or s_prime == 'XI':
        cost -= 50
    return R[action_index] + cost

#print(f"{reward_func('G', (20, -30), 'G', R)}")

(20, -5)


In [12]:
# --- Value Iteration ---
def value_iteration(states, actions, transition_func, reward_func, environment, gamma=0.9, theta=1e-4):
    """
    Perform value iteration on a fixed Constrained Markov Decision Process (CMDP).

    Parameters:
    - states: List of states in the CMDP.
    - actions: List of available actions for each state.
    - transition_func: Function that returns transition probabilities for a given state-action pair.
    - reward_func: Function that calculates the reward for a state-action-next state combination.
    - environment: The CMDP environment with precomputed transition and reward matrices.
    - gamma: Discount factor for future rewards (default 0.9).
    - theta: Convergence threshold for stopping iteration (default 1e-4).

    Returns:
    - V: Final value function for each state.
    - policy: Optimal policy indicating the best action for each state.
    """

    # Initialize V(s) = 0 for all states and empty policy
    V = {s: 0 for s in states}
    policy = {s: None for s in states}

    # Use fixed transition and reward matrices computed from the environment
    T = transitionProbabilitiesMatrix(actions, environment)
    # Reward components for different conditions in the environment
    r1, r2, r3, r4 = 5, 10, 0, -1

    R = expectedRewardMatrix(actions, environment, r1, r2, r3, r4)

    iteration = 0
    while True:
        delta = 0
        print(f"--- Iteration {iteration} ---")
        iteration += 1

        # iterate over each state to update its value and determine the best action.
        for s in states:
            v_old = V[s] # Store the current value for convergence check (change in value)
            action_values = {} # Store the expected value for each action in this state
            
            # Calculate the expected value for each action in the current state
            for a in actions: 
                expected_value = 0

                # Iterate over all possible next states s' and their transition probabilities p.
                for s_prime, p in transition_func(s, a, T):
                    expected_value += p * (reward_func(s, a, s_prime, R) + gamma * V[s_prime]) # Bellman optimally equation
                    # sums the reward for transitioning to s' from s with action a and the discounted value of s' (future value).
                action_values[a] = expected_value # Store the expected value for this action

            # Select the best action and update the value function
            best_action_value = max(action_values.values())
            # The action that provides the maximum expected value
            best_action = max(action_values, key=action_values.get)
            # Update the value function and policy with the best action
            V[s] = best_action_value
            policy[s] = best_action
            print(f"State {s}: Best action {best_action} with value {best_action_value:.2f}")

            # Update delta to be the maximum change across states for convergence check
            delta = max(delta, abs(v_old - V[s]))
            
         # Convergence condition: If the maximum change is below the threshold theta
        if delta < theta:
            print("Convergence reached.")
            break
    return V, policy

# Run value iteration on the fixed model
V, policy = value_iteration(states, actions, transition_func, reward_func, kinematicModel, gamma=0.9, theta=1e-4)

print("\nFinal Value Function:")
for state, value in V.items():
    print(f"{state}: {value:.2f}")

print("\nFinal Policy:")
for state, action in policy.items():
    print(f"{state}: {action}")

--- Iteration 0 ---
State G: Best action (25, 5) with value 3.00
State X: Best action (20, 5) with value 4.08
State I: Best action (25, 5) with value 5.20
State XI: Best action (25, 5) with value 5.97
--- Iteration 1 ---
State G: Best action (25, 5) with value 6.70
State X: Best action (25, 5) with value 7.98
State I: Best action (25, 5) with value 9.08
State XI: Best action (25, 5) with value 9.66
--- Iteration 2 ---
State G: Best action (25, 5) with value 10.10
State X: Best action (25, 5) with value 11.29
State I: Best action (25, 5) with value 12.22
State XI: Best action (25, 5) with value 12.69
--- Iteration 3 ---
State G: Best action (25, 5) with value 13.06
State X: Best action (25, 5) with value 14.09
State I: Best action (25, 5) with value 14.88
State XI: Best action (25, 5) with value 15.27
--- Iteration 4 ---
State G: Best action (25, 5) with value 15.58
State X: Best action (25, 5) with value 16.46
State I: Best action (25, 5) with value 17.13
State XI: Best action (25, 5) 

In [13]:
# Run value iteration on the fixed model
V, policy = value_iteration(states, actions, transition_func, reward_func_SAFE, kinematicModel, gamma=0.9, theta=1e-4)

print("\nFinal Value Function:")
for state, value in V.items():
    print(f"{state}: {value:.2f}")

print("\nFinal Policy:")
for state, action in policy.items():
    print(f"{state}: {action}")

--- Iteration 0 ---
State G: Best action (20, 5) with value -10.06
State X: Best action (20, 5) with value -16.93
State I: Best action (20, 5) with value -20.62
State XI: Best action (20, 5) with value -20.62
--- Iteration 1 ---
State G: Best action (20, 5) with value -20.62
State X: Best action (20, 5) with value -27.81
State I: Best action (20, 5) with value -30.18
State XI: Best action (20, 5) with value -30.18
--- Iteration 2 ---
State G: Best action (20, 5) with value -30.18
State X: Best action (20, 5) with value -36.71
State I: Best action (20, 5) with value -38.65
State XI: Best action (20, 5) with value -38.65
--- Iteration 3 ---
State G: Best action (20, 5) with value -38.65
State X: Best action (20, 5) with value -44.43
State I: Best action (20, 5) with value -46.11
State XI: Best action (20, 5) with value -46.11
--- Iteration 4 ---
State G: Best action (20, 5) with value -46.11
State X: Best action (20, 5) with value -51.19
State I: Best action (20, 5) with value -52.67
Sta