In [None]:
import numpy as np
from queue import PriorityQueue

In [None]:
class PartialModel:
    """Tabular model of one environment's dynamics with a running quality score.

    The model keeps, for every (action, state) pair, an estimated next-state
    distribution ``T``, an estimated reward ``R`` and a visit counter ``N``
    that is capped at ``M``.  ``quality`` is an exponentially smoothed score
    of how small the model's corrections were on recent transitions.

    NOTE(review): the quality formulas resemble the RL-CD model-quality
    update; confirm the normalisation terms against the intended reference.
    """

    # Added to the tracked reward range so the e_R division is never by zero.
    _RANGE_EPS = 1e-5

    # Assumes the number of actions is the same for all states, which is true
    # for the machine replacement problem.
    def __init__(self, num_states: int, num_actions: int, M: int,
                 quality_interpolation_factor: float,
                 quality_adjustment_coeff: float) -> None:
        """Create a model with uniform transitions, zero rewards and zero counts.

        Args:
            num_states: Number of discrete states.
            num_actions: Number of discrete actions (shared by all states).
            M: Cap on the per-pair visit counter; also the divisor of the
                confidence term in ``update_quality``.  Must be positive.
            quality_interpolation_factor: Weight of the reward term versus
                the transition term in the instantaneous quality.
            quality_adjustment_coeff: Step size of the running quality average.

        Raises:
            ValueError: If ``M`` is not positive (the confidence ``N / M``
                would otherwise become NaN/inf and poison ``quality``).
        """
        if M <= 0:
            raise ValueError(f"M must be positive, got {M!r}")
        self.num_states = num_states
        self.num_actions = num_actions
        # T[a, s, :] is the estimated next-state distribution; starts uniform.
        self.T = np.ones((num_actions, num_states, num_states)) / num_states
        # R[a, s] is the estimated reward, N[a, s] the capped visit count.
        self.R = np.zeros((num_actions, num_states))
        self.N = np.zeros((num_actions, num_states))
        self.M = M
        self.quality = 0  # Initial quality
        self.quality_interpolation_factor = quality_interpolation_factor
        self.quality_adjustment_coeff = quality_adjustment_coeff
        self.R_max = 0
        self.R_min = 0

    def _reward_delta(self, state: int, action: int, reward: float) -> float:
        """Return the correction that one observation applies to ``R[action, state]``."""
        return (reward - self.R[action, state]) / (self.N[action, state] + 1)

    def _transition_delta(self, state: int, action: int, next_state: int) -> np.ndarray:
        """Return the per-next-state correction for ``T[action, state, :]``.

        Raises:
            ValueError: If ``next_state`` is not one of ``0 .. num_states - 1``.
                Without this check every entry of the row would be shrunk and
                the row would silently stop summing to one.
        """
        observed = np.arange(self.num_states) == next_state
        if not observed.any():
            raise ValueError(
                f"next_state {next_state!r} is outside 0..{self.num_states - 1}"
            )
        # The observed state moves towards 1, every other state towards 0.
        return (observed - self.T[action, state]) / (self.N[action, state] + 1)

    def update_quality(self, state: int, action: int, reward: float, next_state: int) -> None:
        """Fold one observed transition into the running quality score.

        Reads ``T``, ``R`` and ``N`` without modifying them; updates
        ``R_max``, ``R_min`` and ``quality``.  The corrections are measured
        against the current estimates, so call this before ``update_model``
        if the score should reflect the model as it was before learning
        from the transition.

        Raises:
            ValueError: If ``next_state`` is out of range.
        """
        visits = self.N[action, state]
        # Validate and compute the transition correction before touching state.
        delta_T = self._transition_delta(state, action, next_state)
        delta_R = self._reward_delta(state, action, reward)

        # Confidence grows linearly with the capped visit count.
        confidence = visits / self.M

        # NOTE(review): the range is tracked from the stored estimate, not
        # from the observed reward -- confirm this is intended.
        self.R_max = max(self.R_max, self.R[action, state])
        self.R_min = min(self.R_min, self.R[action, state])
        e_R = 1 - 2 * delta_R * delta_R / (self.R_max - self.R_min + self._RANGE_EPS)

        # One minus the squared transition corrections scaled by visits^2.
        e_T = 1 - np.dot(delta_T, delta_T) * visits * visits

        omega = self.quality_interpolation_factor
        e = confidence * (omega * e_R + (1 - omega) * e_T)

        rho = self.quality_adjustment_coeff
        self.quality = (1 - rho) * self.quality + rho * e

    def update_model(self, state: int, action: int, reward: float, next_state: int) -> None:
        """Move ``R`` and ``T`` towards the observed transition and count the visit.

        The step size is ``1 / (N + 1)``; because ``N`` is capped at ``M``
        the step never drops below ``1 / (M + 1)``.

        Raises:
            ValueError: If ``next_state`` is out of range (nothing is modified).
        """
        # Compute both corrections first so a bad next_state leaves the model intact.
        delta_T = self._transition_delta(state, action, next_state)
        delta_R = self._reward_delta(state, action, reward)

        self.R[action, state] += delta_R
        self.T[action, state] += delta_T
        self.N[action, state] = min(self.N[action, state] + 1, self.M)

    def get_quality(self) -> float:
        """Return the current running quality score."""
        return self.quality

    def get_reward(self, state: int, action: int) -> float:
        """Return the estimated reward for taking ``action`` in ``state``."""
        return self.R[action, state]

    def get_next_state(self, state: int, action: int) -> int:
        """Sample a next state from the estimated distribution ``T[action, state]``."""
        return np.random.choice(self.num_states, p=self.T[action, state])

In [None]:
def prioritized_sweep(queue: PriorityQueue, model: PartialModel, Q: np.ndarray, theta: float = 0.01):
    while not queue.empty() and queue.queue[0][0] > theta:
        state, action = queue.get()
        # Perform the prioritized sweeping update
        reward = model.get_reward(state, action)
        Q[action, state] = reward + discount_factor * np.sum(model.T[action, state, :] * np.min(Q, axis=0))