In [1]:
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns

from tqdm.notebook import tqdm, trange
from numba import njit, prange

In [2]:
N = 100

a, b, m = 0.1, 0.02, 2.5 # parameters for the failure rate function
theta = 1 # degradation rate

max_degradation = 50
c = 3 # per-time holding cost
RC = 15 # replacement const
u = 10 # utility constant

def failure_rate(d):
    """
    Calculate the failure rate of the machine based on its current degradation level.
    The failure rate is modeled as a quadratic function of the degradation level.
    The parameters a, b, and m are constants that define the shape of the function.

    Args:
        d (float): Current degradation level

    Returns:
        float: Failure rate
    """
    return a + b(d - m) ** 2

def failure_probability(d, x, max_degradation=max_degradation):
    """
    Calculate the probability of failure based on the current degradation level and an increment in degradation level.

    Args:
        d (float): Current degradation level
        x (float): Increment in degradation level
        max_degradation (float): Maximum degradation level

    Returns:
        p: Probability of failure
    """
    if d + x > max_degradation:
        return 1
    return 1 - np.exp(
        -(
            a*x + b*x*(3*m**2 - 3*m*(x+2*d) + x**2 + 3*x*d + 3*d**2)/3   
        )
    )

def get_random_increment_x():
    """
    Sample a random increment x from a discrete uniform distribution.

    Returns:
        x: Random increment
    """
    return np.random.randint(N) / N

def simulate_machine():
    usage = 0
    time_until_failure = 0
    
    while True:
        x = get_random_increment_x()
        
        p = np.random.random()
        if p <= failure_probability(usage, x):
            # machine failure
            return usage, time_until_failure
        usage += x
        time_until_failure += 1


In [3]:
precision = 2

In [4]:
contexts = np.arange(0.2, 1 + 1/N, 1 / N).round(precision).tolist()
states = np.arange(0, max_degradation, 1/N).round(precision).tolist()
actions = [0, 1]

n_states = len(states)
n_contexts = len(contexts)
n_actions = len(actions)

P = np.zeros((n_states, n_contexts, n_actions, n_states, n_contexts), dtype=np.float32)
R = np.zeros((n_states, n_contexts, n_actions, n_states), dtype=np.float32)

In [5]:
failure_probability(49.80, 0.20)

0.9998774575838142

In [6]:
# initialize the transition and reward matrices

for s_ind, s in tqdm(enumerate(states)):
    for x_ind, x in enumerate(contexts):
        for a_ind, a in enumerate(actions):
            if a == 0: # do nothing
                P[s_ind, x_ind, a_ind, s_ind, :] = 1 / n_contexts
                R[s_ind, x_ind, a_ind, s_ind] = -c
            else: # replace
                ns = round(s + x, precision)
                ind_ns = round(ns * 100) if ns <= states[-1] else 0
                
                # upon failure, the machine is replaced and the degradation level is reset to 0
                p = failure_probability(s, x)
                P[s_ind, x_ind, a_ind, 0] = p / n_contexts
                P[s_ind, x_ind, a_ind, ind_ns] = (1 - p) / n_contexts
                
                R[s_ind, x_ind, a_ind, ind_ns] = u * x
                R[s_ind, x_ind, a_ind, 0] = u * x - RC
                
                # if s == 4.31 and x == .80:
                #     print(s_ind, x_ind, ns, ind_ns)
                #     print(P[s_ind, x_ind, a_ind, n_states-1])
                #     print(P[s_ind, x_ind, a_ind, ind_ns])

0it [00:00, ?it/s]

In [7]:
s, x = 14.31, .80
s_ind = round(s * 100)
x_ind = round(x*100 - 20)
ns = round(s + x, 2)
ns_ind = min(
    round(ns * 100),
    n_states-1
)

print(f"Current state: {s}\nIncrement: {x}\n")
print(f"Next state: {'Terminal' if ns_ind == n_states - 1 else states[ns_ind]}")
print(f"Success Probability: {P[s_ind, x_ind, 1, ns_ind].sum()}")
print(f"Failure Probability: {P[s_ind, x_ind, 1, 0].sum()}")
print(f"Reward, No Action: {R[s_ind, x_ind, 0, s_ind]}\nReward, No Failure: {R[s_ind, x_ind, 1, ns_ind]}\nReward, Failure: {R[s_ind, x_ind, 1, 0]}")

Current state: 14.31
Increment: 0.8

Next state: 15.11
Success Probability: 0.04132866859436035
Failure Probability: 0.9586713314056396
Reward, No Action: -3.0
Reward, No Failure: 8.0
Reward, Failure: -7.0


In [8]:
# arr = P[s_ind, x_ind, 1, :]

# nonzero_indices = np.nonzero(arr)[0]  # Get indices of nonzero elements
# nonzero_values = arr[nonzero_indices]
# result = list(zip(nonzero_indices, nonzero_values))

# print(result)

In [9]:
P.shape, R.shape

((5000, 81, 2, 5000, 81), (5000, 81, 2, 5000))

In [10]:
n_states, n_contexts, n_actions

(5000, 81, 2)

In [11]:
import torch

# Assumed shapes
# P: [n_states, n_contexts, n_actions, n_states, n_contexts]
# R: [n_states, n_contexts, n_actions, n_states]
# V_prev: [n_states, n_contexts]

device = torch.device('mps')  # or 'cuda' if available, or 'cpu'

In [12]:
P.shape

(5000, 81, 2, 5000, 81)

In [13]:
# Keep large P on CPU
P = torch.tensor(P, dtype=torch.float32, device='cpu')  # huge!
R = torch.tensor(R, dtype=torch.float32, device='cpu')  # large
V = np.zeros((n_states, n_contexts))

V_prev = torch.tensor(V, dtype=torch.float32, device=device)  # small enough to fit
n_states, n_contexts, n_actions = P.shape[:3]
Q = torch.zeros((n_states, n_contexts, n_actions), device=device)

gamma = 0.99

for s in trange(n_states):
    for x in range(n_contexts):
        for a in range(n_actions):
            # Only move one slice of P and R to GPU
            P_sxa = P[s, x, a].to(device)  # [n_states, n_contexts]
            R_sxa = R[s, x, a].to(device)  # [n_states]

            reward = (P_sxa * R_sxa[:, None]).sum()
            future = (P_sxa * V_prev).sum()

            Q[s, x, a] = reward + gamma * future

# Convert back to numpy if needed
Q = Q.cpu().numpy()

: 

In [None]:
def value_iteration(P, R, gamma, theta, max_iter):
    n_states, n_contexts, n_actions, _, _ = P.shape
    V = np.zeros((n_states, n_contexts))
    policy = np.zeros((n_states, n_contexts), dtype=np.int64)

    for _ in trange(max_iter):
        delta = 0.0
        V_prev = V.copy()
        
        # We can parallelize over s and x, but must do a 2D loop
        
        Q = np.zeros((n_states, n_contexts, n_actions))
        
        reward = np.einsum('sxasn,sxas...->sxa', P, R[..., np.newaxis])
        future = np.einsum('sxasn,sn->sxa', P, V_prev)
        Q = reward + gamma * future

        # Total Q-value
        return Q
                        
    #             best_a = np.argmax(Q_values)
    #             best_v = Q_values[best_a]
    #             policy[s, x] = best_a
    #             V[s, x] = best_v
    #             diff = abs(best_v - V_prev[s,x])
                
    #             if diff > delta:
    #                 delta = diff
    #     if delta < theta:
    #         break

    # return V, policy


In [None]:
V, policy = value_iteration(
    P, R,
    gamma=1.0, theta=1e-8, max_iter=10000
)

In [None]:
Q = value_iteration(
    P, R,
    gamma=1.0, theta=1e-8, max_iter=10000
)

In [None]:
# plot the value function as heatmap

plt.figure(figsize=(6, 4))
sns.heatmap(V, cmap="YlGnBu", cbar=True, xticklabels=contexts, yticklabels=states)
plt.title("Value Function Heatmap")
plt.xlabel("Contexts")
plt.ylabel("States")

# set x ticks to be multiples of 2
# set y ticks to be multiples of 0.1
plt.xticks(ticks=np.arange(0, len(contexts), 10), labels=contexts[::10])
plt.yticks(ticks=np.arange(0, len(states), 500), labels=states[::500])
plt.grid(False)
plt.tight_layout()
plt.show()