<a href="https://colab.research.google.com/github/nosadchiy/public/blob/main/RustMDP.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [33]:
#!pip install ipdb

import numpy as np
from scipy.optimize import minimize
#import ipdb

##############################
# 1. Define the Model
##############################

class ReplacementMDP:
    """
    Toy machine-replacement decision problem.

    - States: s in {0, 1, ..., S-1}, read as the "wear level" of a
      machine/bus engine.
    - Actions: 0 = 'Continue', 1 = 'Replace'.
    - Dynamics are deterministic:
        Continue -> wear rises by one level, capped at the top state S-1.
        Replace  -> wear resets to state 0.
    - Per-period payoff is a negative cost: cost_usage * s when continuing,
      cost_replace when replacing.
    """

    def __init__(self, S, beta, cost_replace, cost_usage):
        # S            : number of wear states
        # beta         : discount factor (could be estimated)
        # cost_replace : replacement cost (to be estimated)
        # cost_usage   : usage cost per wear level (to be estimated)
        self.S = S
        self.beta = beta
        self.cost_replace = cost_replace
        self.cost_usage = cost_usage

    def rewards(self, s, a):
        """
        Per-period reward r(s, a; theta), expressed as a negative cost.

        a == 0 (Continue): -cost_usage * s
        otherwise (Replace): -cost_replace
        """
        return -self.cost_usage * s if a == 0 else -self.cost_replace

    def transition_probs(self, s, a):
        """
        Distribution over next states P(s' | s, a) as a length-S vector.

        The vector is one-hot because the dynamics are deterministic:
          - a == 0 (Continue): all mass on min(s + 1, S - 1)
          - otherwise (Replace): all mass on state 0
        """
        landing_state = min(s + 1, self.S - 1) if a == 0 else 0
        probs = np.zeros(self.S)
        probs[landing_state] = 1.0
        return probs

##############################
# 2. Solve the Dynamic Program
##############################

def solve_value_function(mdp, tol=1e-8, max_iter=10000):
    """
    Value iteration to solve for V(s).

    Parameters
    ----------
    mdp : object exposing `S`, `beta`, `rewards(s, a)` and
        `transition_probs(s, a)` (a length-S probability vector),
        with actions a in {0, 1}.
    tol : stop once the sup-norm change in V between sweeps is below this.
    max_iter : maximum number of sweeps. With max_iter=0 no sweep is run and
        the function returns V = 0 together with the Q implied by V = 0.

    Returns
    -------
    V : array of shape (S,), the last value-function iterate.
    Q : array of shape (S, 2), choice-specific values computed from the
        iterate preceding V, so that V == max_a Q[:, a].
    """
    S = mdp.S
    beta = mdp.beta
    actions = (0, 1)

    # Rewards and transition vectors do not depend on V, so tabulate them
    # once instead of re-querying the model on every sweep.
    R = np.empty((S, len(actions)))
    P = np.empty((S, len(actions), S))
    for s in range(S):
        for a in actions:
            R[s, a] = mdp.rewards(s, a)
            P[s, a] = mdp.transition_probs(s, a)

    # Initialize value function; Q is defined up front so that the return
    # value is well-formed even when the loop body never runs.
    V = np.zeros(S)
    Q = R + beta * (P @ V)

    for _ in range(max_iter):
        V_old = V

        # Choice-specific values: Q(s,a) = r(s,a) + beta * E[V_old(s') | s,a]
        Q = R + beta * (P @ V_old)

        # Update the value function with max over actions
        V = Q.max(axis=1)

        # Check convergence
        if np.max(np.abs(V - V_old)) < tol:
            break

    return V, Q


##############################
# 3. Choice Probabilities
##############################

def choice_probabilities(Q, mu=1.0):
    """
    Logit (softmax) choice probabilities implied by choice-specific values:
      P(a|s) = exp( Q[s,a]/mu ) / sum_{a'} exp( Q[s,a']/mu ).
    mu is the "scale" of the Type I Extreme Value error.

    Q is an array-like of shape (S, A); the result has the same shape and
    row s holds [P(a=0|s), P(a=1|s), ...], summing to 1.

    The row maximum is subtracted before exponentiating. This leaves the
    probabilities mathematically unchanged but keeps exp() from overflowing
    to inf (or every term underflowing to 0) when |Q|/mu is large, which
    would otherwise turn the row into NaNs.
    """
    scaled = np.asarray(Q, dtype=float) / mu
    # Largest exponent in each row becomes exp(0) = 1, so the denominator
    # is always at least 1 and never inf.
    shifted = scaled - np.max(scaled, axis=1, keepdims=True)
    weights = np.exp(shifted)
    return weights / np.sum(weights, axis=1, keepdims=True)

##############################
# 4. Log-Likelihood Function
##############################

def log_likelihood(theta, data, S, beta=0.95, mu=1.0):
    """
    Negative log-likelihood of observed decisions, for use with a minimizer.

    theta: pair of parameters [cost_replace, cost_usage].
    data:  iterable of (s, a) observations (state index, chosen action).
    S, beta, mu: number of states, discount factor and logit scale, all
                 held fixed during evaluation.

    Pipeline:
        1) Build the MDP implied by theta.
        2) Solve it for the choice-specific values Q(s,a).
        3) Turn Q into logit choice probabilities P(a|s).
        4) Sum log P(a_obs|s_obs) over the sample and flip the sign.

    A parameter vector with cost_replace <= 0 or cost_usage < 0 is not
    evaluated; a large constant penalty (1e8) is returned instead.
    """
    cost_replace, cost_usage = theta

    # Reject economically meaningless parameters with a flat penalty
    out_of_range = cost_replace <= 0 or cost_usage < 0
    if out_of_range:
        return 1e8

    # Model -> Q values -> choice probabilities
    model = ReplacementMDP(S=S, beta=beta, cost_replace=cost_replace, cost_usage=cost_usage)
    Q = solve_value_function(model)[1]
    probs = choice_probabilities(Q, mu=mu)

    # Accumulate log-probabilities in observation order; the 1e-12 offset
    # guards against log(0)
    total = sum((np.log(probs[s_obs, a_obs] + 1e-12) for (s_obs, a_obs) in data), 0.0)

    return -total

##############################
# 5. Example "Estimation"
##############################

def simulate_data(mdp, n=1000, seed=42, mu=1.0):
    """
    Generate synthetic data from the MDP under logit choice.

    Starting from state 0, each period the agent is in some state s,
    chooses a with probability P(a|s), and then moves to a next state drawn
    from mdp.transition_probs(s, a).

    Parameters
    ----------
    mdp  : model to simulate (solved internally by value iteration).
    n    : number of periods to simulate.
    seed : seed for the random draws.
    mu   : logit scale used for the choice probabilities.

    Returns
    -------
    list of n (s, a) tuples in time order.

    Randomness comes from a private np.random.RandomState(seed) rather than
    from np.random.seed(seed), so calling this function no longer resets
    the process-wide NumPy generator as a side effect. RandomState is the
    same legacy generator that np.random.seed configures, so the simulated
    sample for a given seed is unchanged.
    """
    rng = np.random.RandomState(seed)

    # Solve for Q and turn it into choice probabilities
    _, Q = solve_value_function(mdp)
    P = choice_probabilities(Q, mu=mu)

    # Simulate states and actions
    data = []
    s = 0  # start from state 0 for simplicity
    for _ in range(n):
        # draw action from P(a|s)
        a = rng.choice([0, 1], p=P[s, :])
        data.append((s, a))

        # transition to the next state
        s = rng.choice(mdp.S, p=mdp.transition_probs(s, a))

    return data

def estimate_parameters(data, S, beta, mu=1.0):
    """
    Maximum-likelihood estimate of [cost_replace, cost_usage].

    Minimizes the negative log-likelihood of `data` with SciPy's L-BFGS-B,
    holding S, beta and mu fixed. The search starts from
    [cost_replace=5, cost_usage=1] and is box-constrained to
    cost_replace >= 1e-3 and cost_usage >= 0.

    Returns the optimizer's result object as produced by `minimize`.
    """
    start = np.array([5.0, 1.0])  # [cost_replace, cost_usage]
    box = [(1e-3, None), (0, None)]  # keep cost_replace > 0, cost_usage >= 0

    return minimize(
        lambda theta: log_likelihood(theta, data, S, beta=beta, mu=mu),
        start,
        method='L-BFGS-B',
        bounds=box,
    )

if __name__ == "__main__":
    # --- Ground truth used to generate the synthetic sample ---
    true_S = 10  # number of wear states (states 0..9)
    true_beta = 0.95
    true_cost_usage = 2.0
    true_cost_replace = 30.0
    mu = 1  # scale parameter for logit errors (not estimated in this example)

    # --- Model under the true parameters ---
    true_mdp = ReplacementMDP(
        S=true_S,
        beta=true_beta,
        cost_replace=true_cost_replace,
        cost_usage=true_cost_usage,
    )

    # --- Simulate decisions, then recover the cost parameters from them ---
    data = simulate_data(true_mdp, n=2000, seed=123, mu=mu)
    est_result = estimate_parameters(data, S=true_S, beta=true_beta, mu=mu)

    # --- Report ---
    print("Estimation Results:")
    print("  Success:", est_result.success)
    print("  Estimated parameters:", est_result.x)
    print("  Negative log-likelihood:", est_result.fun)
    print("  Message:", est_result.message)


Estimation Results:
  Success: True
  Estimated parameters: [26.2685504   1.72883523]
  Negative log-likelihood: 415.2625654922745
  Message: CONVERGENCE: REL_REDUCTION_OF_F_<=_FACTR*EPSMCH
