# Solving Inventory control with Exact MDP 

## Introduction

The inventory control problem is a fundamental challenge in supply chain management and operations research. It revolves around the optimization of inventory levels to strike a balance between minimizing costs, such as holding excess inventory, and meeting customer demand without incurring excessive shortages. By formulating the problem as a Markov Decision Process (MDP), various reinforcement learning techniques, such as Q-learning and value iteration, can be employed to derive optimal policies that guide decision-making in restocking and order placement, ultimately enhancing the efficiency and profitability of inventory management systems.

In the context of inventory control, the state transition probabilities for a Markov Decision Process (MDP) describe the probabilities of moving from one inventory level to another based on the actions taken (order quantities) and the random demand. 


Let's define the following variables:

- $S_t$ :  Current inventory level at time $t$  
- $S_{t+1}$  :  Next inventory level at time   $t+1$ 
- $A_t$  :  Action (order quantity) taken at time   $t$  
- $D_t$  :  Random demand at time   $t$  
- $C$ :  Capacity of the inventory (maximum inventory level) 
- $H$: is the holding cost per unit of inventory 
- $L$: is the shortage cost per unit of demand 



The state transition probability formula for the inventory control problem can be represented as:

$$
P(S_{t+1} | S_t, A_t, D_t) = \begin{cases}
                            1, & \text{if } S_{t+1} = \max(0, \min(C, S_t + A_t - D_t)) \\
                            0, & \text{otherwise}
\end{cases}
$$

This formula indicates that the transition probability is 1 if the next state $(S_{t+1})$ is the result of applying the action $(A_t)$ and accounting for the random demand (D_t), while ensuring that the inventory level does not exceed the capacity $(C)$. Otherwise, the transition probability is 0.

This formula assumes deterministic transitions. However, it can be adjusted to handle problems that involve uncertain transitions or probabilistic demand.

 

The reward $(R)$ for transitioning from state $(S_t)$ to $(S_{t+1})$ based on action $(A_t)$ and demand $(D_t)$ can be formulated as follows:

$$ R(S_t, A_t, D_t) =  - \big[ H \cdot \max(0, S_t - D_t)  +   L \cdot \max(0, D_t - S_t) \big] $$


In [1]:
%load_ext autoreload
%autoreload 2

In [2]:
%matplotlib inline

In [3]:
import sys

In [4]:
sys.path.append('../')

In [5]:
import gym

In [56]:
import random
import numpy as np
import pandas as pd
import attr

In [72]:
from mdp.ValueIteration import ValueIteration


In [73]:
from mdp.utils.env import make_env

In [74]:
import matplotlib.pyplot as plt 

## Inventory control environment setup 

In [68]:


@attr.s(auto_attribs=True)
class InventoryControlEnv(gym.Env):
    capacity: int = 20
    init_inventory: int = 10
    holding_cost: int = 1
    shortage_cost: int = 10
    max_demand: int = 5
    episode_length : int = 500
    def __attrs_post_init__(self):
        self.action_space = gym.spaces.Discrete(self.capacity + 1)  # Actions represent orders
        self.observation_space = gym.spaces.Discrete(self.capacity + 1)  # States represent inventory levels
        self.nS = self.capacity + 1
        self.P = {}
        self._compute_transition_probabilities()

        self.state = self.init_inventory
        self.current_step = 0

    def step(self, action: int) -> tuple:
        """
        Perform one step in the environment.

        Args:
            action (int): The action representing the order quantity.

        Returns:
            tuple: Tuple containing next state, reward, done flag, and additional info.
        """
        assert self.action_space.contains(action)

        demand = np.random.randint(0, self.max_demand + 1)
        reward = -self.holding_cost * max(0, self.state - demand) - self.shortage_cost * max(0, demand - self.state)

        next_state = max(0, min(self.capacity, self.state + action - demand))
        self.state = next_state
        self.current_step += 1

        done = self.current_step >= self.episode_length

        return next_state, reward, done, {}

    def reset(self) -> int:
        """
        Reset the environment to the initial state.

        Returns:
            int: The initial state (inventory level).
        """
        self.state = self.init_inventory
        self.current_step = 0
        return self.state

    def _compute_transition_probabilities(self):
        """
        Compute transition probabilities for the environment.
        """
        for s in range(self.capacity + 1):
            self.P[s] = {}
            for a in range(self.capacity + 1):
                self.P[s][a] = []
                for ns in range(self.capacity + 1):
                    next_state = max(0, min(self.capacity, s + a - self.max_demand))
                    reward = -self.holding_cost * max(0, next_state - self.max_demand) - self.shortage_cost * max(0, self.max_demand - next_state)
                    if ns == next_state:
                        self.P[s][a].append((1.0, next_state, reward, False))
                    else:
                        self.P[s][a].append((0.0, next_state, reward, False))


In [69]:
inventory_env = InventoryControlEnv( capacity=20, 
                                    init_inventory=10, 
                                    holding_cost=1, 
                                    shortage_cost=10, 
                                    max_demand=5)

In [70]:
print("Number of discrete states:", inventory_env.observation_space.n)

Number of discrete states: 21


## Solving the inventory control problem 

### Value Iteration 

#### Synchronous

In [42]:
state_sweep_value_iteration = ValueIteration(env=inventory_env, 
                                 gamma = 0.9,
                                 threshold = 1e-6,
                                 max_iterations = 10000,
                                 inplace=False)
state_sweep_value_iteration.fit()

Value-iteration (state sweep) converged at iteration #3.


In [43]:
state_sweep_optimal_state_values = state_sweep_value_iteration.optimal_state_values


In [44]:
state_sweep_optimal_state_values

array([  0. ,   0. ,   0. ,   0. ,   0. ,   0. ,   0. ,   0. ,   0. ,
         0. ,   0. ,  -1. ,  -2. ,  -3. ,  -4. ,  -5. ,  -6.9,  -8.8,
       -10.7, -12.6, -14.5])

In [45]:
state_sweep_optimal_policy = state_sweep_value_iteration.optimal_policy

In [46]:
state_sweep_optimal_policy

array([10,  9,  8,  7,  6,  5,  4,  3,  2,  1,  0,  0,  0,  0,  0,  0,  0,
        0,  0,  0,  0])

In [47]:
state_sweep_optimal_policy

array([10,  9,  8,  7,  6,  5,  4,  3,  2,  1,  0,  0,  0,  0,  0,  0,  0,
        0,  0,  0,  0])

#### Asynchronous 

In [48]:

inplace_value_iteration = ValueIteration(env=inventory_env, 
                                 gamma = 0.9,
                                 threshold = 1e-6,
                                 max_iterations = 10000,
                                 inplace=True)
inplace_value_iteration.fit()


Value-iteration (in-place) converged at iteration #2.


In [49]:
inplace_value_iteration.optimal_policy

array([10,  9,  8,  7,  6,  5,  4,  3,  2,  1,  0,  0,  0,  0,  0,  0,  0,
        0,  0,  0,  0])

#### Asycnhronous

In [None]:
policy_iterator = PolicyIterator(env=inventory_env, synchronous_evaluation=True)
asynch_final_policy = policy_iterator.policy_iteration()

In [None]:
asynch_final_policy

### Q-Learning

In [62]:
@attr.s(auto_attribs=True)
class QLearningAgent:
    env: gym.Env
    learning_rate: float = 0.01
    discount_factor: float = 0.99
    exploration_prob: float = 0.5
    q_table: np.ndarray = attr.ib(init=False)

    def __attrs_post_init__(self):
        self.q_table = np.zeros((self.env.observation_space.n, self.env.action_space.n))

    def choose_action(self, state: int) -> int:
        """Choose an action based on exploration or exploitation.

        Args:
            state (int): Current state.

        Returns:
            int: Chosen action.
        """
        if np.random.uniform(0, 1) < self.exploration_prob:
            return self.env.action_space.sample()  # Explore randomly
        else:
            return np.argmax(self.q_table[state, :])  # Exploit learned Q-values

    def update_q_table(self, state: int, action: int, reward: float, next_state: int) -> None:
        """Update Q-values based on the Bellman equation.

        Args:
            state (int): Current state.
            action (int): Chosen action.
            reward (float): Reward received.
            next_state (int): Next state.
        """
        best_next_action = np.argmax(self.q_table[next_state, :])
        self.q_table[state, action] += self.learning_rate * (
            reward + self.discount_factor * self.q_table[next_state, best_next_action] - self.q_table[state, action]
        )

    def train(self, num_episodes: int) -> None:
        """Train the Q-learning agent.

        Args:
            num_episodes (int): Number of episodes for training.
        """
        for episode in range(num_episodes):
            state = self.env.reset()
            done = False

            while not done:
                action = self.choose_action(state)
                next_state, reward, done, _ = self.env.step(action)
                self.update_q_table(state, action, reward, next_state)
                state = next_state

    def get_optimal_policy(self) -> np.ndarray:
        """Get the optimal policy based on learned Q-values.

        Returns:
            np.ndarray: Optimal policy.
        """
        return np.argmax(self.q_table, axis=1)



In [63]:
agent = QLearningAgent(env=inventory_env,learning_rate  = 0.01,discount_factor  = 0.99,exploration_prob  = 0.5)

In [64]:

agent.train(num_episodes=100000)
optimal_policy = agent.get_optimal_policy()
print("Optimal Policy:", optimal_policy)



Optimal Policy: [9 6 5 6 5 3 2 1 1 0 0 0 0 0 0 0 0 0 0 0 0]


In [75]:
def get_optimal_policy_description(optimal_policy):
    action_descriptions = ["Order {}".format(i) for i in range(len(optimal_policy))]
    return [action_descriptions[action] for action in optimal_policy]
optimal_policy_descriptions = get_optimal_policy_description(optimal_policy)
print("Optimal Policy Descriptions:", optimal_policy_descriptions)


Optimal Policy Descriptions: ['Order 9', 'Order 6', 'Order 5', 'Order 6', 'Order 5', 'Order 3', 'Order 2', 'Order 1', 'Order 1', 'Order 0', 'Order 0', 'Order 0', 'Order 0', 'Order 0', 'Order 0', 'Order 0', 'Order 0', 'Order 0', 'Order 0', 'Order 0', 'Order 0']
