CORRECT WASTED UNIT UPDATE


In [None]:
class PlateletInventory:
    """Age-bucketed platelet inventory with a five-day shelf life.

    Stock is tracked per age (1 = received in the latest update, 5 = last
    usable day). Each call to ``update_inventory`` advances every bucket by
    one day; whatever was sitting in the age-5 bucket falls off the end and
    is reported as that update's waste.
    """

    def __init__(self):
        """
        Initialize inventory dictionary with:
        - Keys as platelet ages (1 to 5)
        - Values as the quantity of platelets at each age
        """
        self.inventory = {i: 0 for i in range(1, 6)}
        self.wasted_units = 0  # waste recorded by the most recent update
        self.total_units = 0   # units on hand after the most recent update

    def update_inventory(self, incoming_stock):
        """
        Update inventory for a new day:
        1. Record waste (units already at age 5, which now exceed shelf life)
        2. Age existing stock by one day
        3. Add new incoming stock at age 1

        Args:
            incoming_stock: Number of new units received today.

        Returns:
            dict: ``current_inventory`` (copy of the age -> quantity map),
            ``wasted_units`` (units discarded by this update) and
            ``total_units`` (units on hand after the update).
        """
        # Units that were already at age 5 expire with this update.
        self.wasted_units = self.inventory[5]

        # Shift from the oldest bucket down so no value is overwritten
        # before it has been moved; this also replaces the expired age-5
        # units with the former age-4 units.
        for age in range(5, 1, -1):
            self.inventory[age] = self.inventory[age - 1]

        # Add incoming stock as new stock with age 1
        self.inventory[1] = incoming_stock

        # NOTE: the age-5 bucket must NOT be cleared here. It now holds the
        # former age-4 units, which are still within shelf life; zeroing it
        # would drop them without ever counting them as waste.

        # Calculate total units in inventory
        self.total_units = sum(self.inventory.values())

        # Return updated status
        return {
            'current_inventory': self.inventory.copy(),
            'wasted_units': self.wasted_units,
            'total_units': self.total_units
        }

    def get_inventory_status(self):
        """
        Return the current status of the inventory, including:
        - Inventory by age
        - Total units in inventory
        - Units wasted by the most recent update (not a running total)
        """
        return {
            'inventory_by_age': self.inventory.copy(),
            'total_units': self.total_units,
            'wasted_units': self.wasted_units
        }


Age-indexed inventory dictionary with demand and supply

In [1]:
import numpy as np

class PlateletInventory:
    """Platelet stock bucketed by age (1-5) that serves demand oldest-first."""

    def __init__(self):
        """
        Create five empty age buckets (keys 1 through 5) and zero the
        bookkeeping counters: waste, units on hand and unmet demand.
        """
        self.inventory = dict.fromkeys(range(1, 6), 0)
        self.wasted_units = 0
        self.total_units = 0
        self.unfulfilled_demand = 0

    def fulfill_demand(self, demand):
        """
        Serve demand from stock, draining the oldest buckets first.

        The unmet remainder is stored on ``self.unfulfilled_demand``.

        Args:
            demand (int): The demand for the day.
        Returns:
            int: The number of units actually issued.
        """
        outstanding = demand
        issued = 0

        # Walk the buckets from the highest age key to the lowest.
        for bucket in sorted(self.inventory, reverse=True):
            if outstanding <= 0:
                break

            taken = min(self.inventory[bucket], outstanding)
            self.inventory[bucket] -= taken
            outstanding -= taken
            issued += taken

        self.unfulfilled_demand = outstanding
        return issued

    def update_inventory(self, incoming_stock, demand):
        """
        Advance the inventory by one day.

        Order of operations: serve demand from existing stock, record the
        leftover age-5 units as waste, age every bucket by one day, then
        place the incoming units in the age-1 bucket.

        Args:
            incoming_stock (int): Number of new units received.
            demand (int): Demand for the day.
        Returns:
            dict: Inventory snapshot plus waste, total, fulfilled and
            unfulfilled figures for this day.
        """
        issued = self.fulfill_demand(demand)

        # Whatever is still in the oldest bucket after serving demand expires.
        self.wasted_units = self.inventory[5]

        # Snapshot ages 1-4 first, then write them into ages 2-5.
        younger = [self.inventory[age] for age in range(1, 5)]
        for target_age, quantity in zip(range(2, 6), younger):
            self.inventory[target_age] = quantity

        # Fresh deliveries start at age 1.
        self.inventory[1] = incoming_stock

        self.total_units = sum(self.inventory.values())

        snapshot = {}
        snapshot['current_inventory'] = self.inventory.copy()
        snapshot['wasted_units'] = self.wasted_units
        snapshot['total_units'] = self.total_units
        snapshot['demand_fulfilled'] = issued
        snapshot['unfulfilled_demand'] = self.unfulfilled_demand
        return snapshot

def simulate_with_demand(days=10, incoming_min=5, incoming_max=25):
    """
    Run a day-by-day platelet inventory simulation with random supply and demand.

    Each day draws an integer delivery uniformly from
    [incoming_min, incoming_max] and a demand equal to the truncated absolute
    value of a Normal(30, 15) sample, then feeds both to a fresh
    PlateletInventory.

    Args:
        days (int): Number of days to simulate.
        incoming_min (int): Minimum daily incoming stock.
        incoming_max (int): Maximum daily incoming stock (inclusive).
    Returns:
        list: One dict per day with the draws and the resulting inventory status.
    """
    stock = PlateletInventory()
    history = []

    for day_number in range(1, days + 1):
        # Draw order matters for reproducibility: delivery first, then demand.
        delivered = np.random.randint(incoming_min, incoming_max + 1)
        requested = int(np.abs(np.random.normal(loc=30, scale=15)))  # |N(30, 15)|
        outcome = stock.update_inventory(delivered, requested)

        record = {'Day': day_number, 'Incoming': delivered, 'Demand': requested}
        record['Fulfilled'] = outcome['demand_fulfilled']
        record['Unfulfilled'] = outcome['unfulfilled_demand']
        record['Wasted'] = outcome['wasted_units']
        record['Total'] = outcome['total_units']
        record['Inventory'] = outcome['current_inventory']
        history.append(record)

    return history

# Seed the global NumPy RNG so the five-day demo run is reproducible
np.random.seed(42)
results = simulate_with_demand(days=5)

# Emit a four-line report per simulated day, followed by a blank line
for day in results:
    report_lines = (
        f"Day {day['Day']}: Incoming: {day['Incoming']}, Demand: {day['Demand']}",
        f"Inventory by age: {day['Inventory']}",
        f"Fulfilled: {day['Fulfilled']}, Unfulfilled: {day['Unfulfilled']}",
        f"Wasted: {day['Wasted']}, Total: {day['Total']}\n",
    )
    print(*report_lines, sep="\n")


Day 1: Incoming: 11, Demand: 21
Inventory by age: {1: 11, 2: 0, 3: 0, 4: 0, 5: 0}
Fulfilled: 0, Unfulfilled: 21
Wasted: 0, Total: 11

Day 2: Incoming: 12, Demand: 37
Inventory by age: {1: 12, 2: 0, 3: 0, 4: 0, 5: 0}
Fulfilled: 11, Unfulfilled: 26
Wasted: 0, Total: 12

Day 3: Incoming: 25, Demand: 26
Inventory by age: {1: 25, 2: 0, 3: 0, 4: 0, 5: 0}
Fulfilled: 12, Unfulfilled: 14
Wasted: 0, Total: 25

Day 4: Incoming: 15, Demand: 26
Inventory by age: {1: 15, 2: 0, 3: 0, 4: 0, 5: 0}
Fulfilled: 25, Unfulfilled: 1
Wasted: 0, Total: 15

Day 5: Incoming: 15, Demand: 34
Inventory by age: {1: 15, 2: 0, 3: 0, 4: 0, 5: 0}
Fulfilled: 15, Unfulfilled: 19
Wasted: 0, Total: 15



(s, Q) policy

In [5]:
import numpy as np
import pandas as pd
from scipy.optimize import dual_annealing, differential_evolution
from datetime import datetime

class SQPolicyOptimization:
    """Simulation-based tuning of an (s, Q) reorder policy.

    Under the policy simulated here, a fixed quantity ``Q`` is ordered on
    every period in which the on-hand stock (summed over all ages, measured
    before that period's demand) is at or below the reorder point ``s``.
    """

    def __init__(self, fixed_cost=150, variable_cost=5, holding_cost=5,
                 wastage_cost=50, shortage_cost=100):
        """Store the cost coefficients.

        Args:
            fixed_cost: Charged once per period in which an order is placed.
            variable_cost: Charged per unit ordered.
            holding_cost: Charged per unit on hand after the period's update.
            wastage_cost: Charged per unit reported as wasted.
            shortage_cost: Charged per unit of unfulfilled demand.
        """
        self.fixed_cost = fixed_cost
        self.variable_cost = variable_cost
        self.holding_cost = holding_cost
        self.wastage_cost = wastage_cost
        self.shortage_cost = shortage_cost

    def simulate_episode(self, s, Q, demands):
        """Simulate one episode of the (s, Q) policy on a fresh inventory.

        Args:
            s: Reorder point; an order is placed when stock <= s.
            Q: Order quantity used whenever an order is placed.
            demands: Iterable of per-period demands.

        Returns:
            dict: Maps metric name to a list with one entry per period
            (demands, fulfilled/unfulfilled demand, inventory level, order
            size, wasted units, and each cost component plus the total).
        """
        inventory = PlateletInventory()
        metrics = {
            'demands': [], 'unfulfilled_demands': [], 'fulfilled_demands': [],
            'inventory_levels': [], 'orders': [], 'total_costs': [],
            'wasted_units': [], 'fixed_costs': [], 'variable_costs': [],
            'holding_costs': [], 'wastage_costs': [], 'shortage_costs': []
        }

        for demand in demands:
            # The ordering decision uses the stock level before today's demand.
            total_inventory = sum(inventory.inventory.values())
            order = Q if total_inventory <= s else 0

            # Update inventory with current order and demand
            status = inventory.update_inventory(order, demand)

            # Calculate costs
            holding_cost = self.holding_cost * status['total_units']
            wastage_cost = self.wastage_cost * status['wasted_units']
            shortage_cost = self.shortage_cost * status['unfulfilled_demand']
            fixed_cost = self.fixed_cost if order > 0 else 0
            variable_cost = self.variable_cost * order
            total_cost = holding_cost + wastage_cost + shortage_cost + fixed_cost + variable_cost

            # Store metrics
            metrics['demands'].append(demand)
            metrics['unfulfilled_demands'].append(status['unfulfilled_demand'])
            metrics['fulfilled_demands'].append(status['demand_fulfilled'])
            metrics['inventory_levels'].append(status['total_units'])
            metrics['orders'].append(order)
            metrics['total_costs'].append(total_cost)
            metrics['wasted_units'].append(status['wasted_units'])
            metrics['fixed_costs'].append(fixed_cost)
            metrics['variable_costs'].append(variable_cost)
            metrics['holding_costs'].append(holding_cost)
            metrics['wastage_costs'].append(wastage_cost)
            metrics['shortage_costs'].append(shortage_cost)

        return metrics

    def objective_function(self, params, demands_list):
        """Return the mean total episode cost of policy ``params``.

        Args:
            params: Pair ``(s, Q)``.
            demands_list: Non-empty sequence of demand episodes.

        Returns:
            Total cost summed within each episode, averaged over episodes.
        """
        s, Q = params
        total_cost = 0

        for demands in demands_list:
            metrics = self.simulate_episode(s, Q, demands)
            total_cost += sum(metrics['total_costs'])

        return total_cost / len(demands_list)

    def optimize(self, training_demands, method='dual_annealing'):
        """Search for the (s, Q) pair minimising ``objective_function``.

        Args:
            training_demands: Demand episodes passed to the objective.
            method: ``'dual_annealing'`` selects SciPy's dual annealing; any
                other value selects differential evolution.

        Returns:
            The optimiser's best point ``result.x`` as ``[s, Q]``.
        """
        bounds = [(0, 200), (0, 300)]  # bounds for s and Q

        if method == 'dual_annealing':
            result = dual_annealing(self.objective_function, bounds=bounds, args=(training_demands,))
        else:
            result = differential_evolution(self.objective_function, bounds=bounds, args=(training_demands,))

        return result.x

    def calculate_performance_metrics(self, demands, s, Q):
        """Simulate one episode and return its per-period metrics as a table.

        Two episode-level figures are repeated on every row:
        ``fill_rates`` (fulfilled demand / total demand) and
        ``service_cycles`` (share of periods with no unfulfilled demand).
        Both are reported as 0 when their denominator is zero, i.e. for an
        episode with no demand or no periods.

        Args:
            demands: Sequence of per-period demands.
            s: Reorder point.
            Q: Order quantity.

        Returns:
            pandas.DataFrame: One row per period.
        """
        metrics = self.simulate_episode(s, Q, demands)
        periods = len(demands)

        # Guard both ratios so zero-demand or empty episodes do not divide by zero.
        total_demand = sum(metrics['demands'])
        fill_rate = sum(metrics['fulfilled_demands']) / total_demand if total_demand > 0 else 0
        stockout_free = sum(1 for u in metrics['unfulfilled_demands'] if u == 0)
        service_cycle = stockout_free / periods if periods > 0 else 0

        metrics['fill_rates'] = [fill_rate] * periods
        metrics['service_cycles'] = [service_cycle] * periods

        return pd.DataFrame(metrics)

def generate_training_data(episodes=36, timesteps=30):
    """Draw synthetic demand episodes from the global NumPy RNG.

    Args:
        episodes: Number of episodes to generate.
        timesteps: Number of demand values per episode.

    Returns:
        list: ``episodes`` arrays of length ``timesteps``, each holding the
        absolute values of Normal(30, 15) samples.
    """
    dataset = []
    for _ in range(episodes):
        draws = np.random.normal(loc=30, scale=15, size=timesteps)
        dataset.append(np.abs(draws))
    return dataset

def main():
    """Tune the (s, Q) policy and export per-period training and testing results.

    Steps: generate seeded training demand, optimise with both dual annealing
    and differential evolution, keep whichever result has the lower training
    cost, then write per-period metrics for the training episodes and for six
    freshly generated testing episodes to Excel workbooks.
    """
    # Generate training data
    np.random.seed(42)
    training_demands = generate_training_data()

    # Initialize optimizer
    optimizer = SQPolicyOptimization()

    # Find optimal s and Q using both methods
    s_da, Q_da = optimizer.optimize(training_demands, method='dual_annealing')
    s_de, Q_de = optimizer.optimize(training_demands, method='differential_evolution')

    # Use the better solution (differential evolution wins ties)
    cost_da = optimizer.objective_function([s_da, Q_da], training_demands)
    cost_de = optimizer.objective_function([s_de, Q_de], training_demands)
    s_opt, Q_opt = (s_da, Q_da) if cost_da < cost_de else (s_de, Q_de)
    print(s_opt, Q_opt)

    # Analyze training data; collect frames and concatenate once at the end
    training_frames = []
    for episode, demands in enumerate(training_demands, 1):
        episode_results = optimizer.calculate_performance_metrics(demands, s_opt, Q_opt)
        episode_results['episode'] = episode
        episode_results['timestep'] = range(1, len(demands) + 1)
        training_frames.append(episode_results)
    training_results = pd.concat(training_frames) if training_frames else pd.DataFrame()

    # Save training results
    training_results.to_excel('sq_training_results.xlsx', index=False)

    # Analyze testing data (drawn from the same RNG stream, after training)
    testing_demands = generate_training_data(episodes=6, timesteps=30)
    testing_frames = []
    for episode, demands in enumerate(testing_demands, 1):
        episode_results = optimizer.calculate_performance_metrics(demands, s_opt, Q_opt)
        episode_results['episode'] = episode
        episode_results['timestep'] = range(1, len(demands) + 1)
        # Accumulate into the TESTING results; previously these frames were
        # assigned to training_results, leaving the testing export empty.
        testing_frames.append(episode_results)
    testing_results = pd.concat(testing_frames) if testing_frames else pd.DataFrame()

    # Save testing results
    testing_results.to_excel('sq_testing_results.xlsx', index=False)
    print(f"Optimal s: {s_opt:.2f}, Optimal Q: {Q_opt:.2f}")

if __name__ == "__main__":
    main()

69.7846656378531 56.37013562559986
Optimal s: 69.78, Optimal Q: 56.37


(R, S) policy

In [8]:
import numpy as np
from scipy.optimize import dual_annealing, differential_evolution
import pandas as pd
from tqdm import tqdm

class RSPolicyOptimizer:
    """Simulation-based tuning of an (R, S) order-up-to policy.

    Under the policy simulated here, whenever on-hand stock (summed over all
    ages, measured before the period's demand) is at or below ``R``, an order
    of ``S - stock`` units (never negative) is placed.
    """

    def __init__(self, platelet_inventory):
        """Store the inventory object and the cost coefficients.

        Args:
            platelet_inventory: Initial inventory object. Note that
                ``simulate_RS_policy`` replaces it with a fresh
                ``PlateletInventory`` at the start of every simulation.
        """
        self.inventory = platelet_inventory
        self.fixed_order_cost = 150  # Fixed cost per order
        self.variable_cost = 5      # Cost per unit
        self.holding_cost = 5       # Cost per unit per day
        self.shortage_cost = 100     # Cost per unit short
        self.wastage_cost = 50      # Cost per unit wasted

    def simulate_RS_policy(self, params, demands):
        """Simulate the (R, S) policy over one demand series.

        Args:
            params: Pair ``(R, S)``.
            demands: Iterable of per-period demands.

        Returns:
            dict: ``total_cost`` (summed over all periods) plus per-period
            lists ``inventory_levels``, ``orders``, ``wasted_units``,
            ``unfulfilled`` and ``fulfilled``.
        """
        R, S = params
        total_cost = 0
        inventory_levels = []
        orders = []
        wasted_units = []
        unfulfilled = []
        fulfilled = []

        # Reset inventory so every simulation starts from empty stock
        self.inventory = PlateletInventory()

        for demand in demands:
            # The ordering decision uses the stock level before today's demand.
            current_inventory = sum(self.inventory.inventory.values())
            order_quantity = max(0, S - current_inventory) if current_inventory <= R else 0

            # Update inventory with new order and demand
            status = self.inventory.update_inventory(order_quantity, demand)

            # Calculate costs
            ordering_cost = self.fixed_order_cost if order_quantity > 0 else 0
            variable_cost = order_quantity * self.variable_cost
            holding_cost = sum(self.inventory.inventory.values()) * self.holding_cost
            shortage_cost = status['unfulfilled_demand'] * self.shortage_cost
            wastage_cost = status['wasted_units'] * self.wastage_cost

            total_cost += ordering_cost + variable_cost + holding_cost + shortage_cost + wastage_cost

            # Store metrics
            inventory_levels.append(status['total_units'])
            orders.append(order_quantity)
            wasted_units.append(status['wasted_units'])
            unfulfilled.append(status['unfulfilled_demand'])
            fulfilled.append(status['demand_fulfilled'])

        return {
            'total_cost': total_cost,
            'inventory_levels': inventory_levels,
            'orders': orders,
            'wasted_units': wasted_units,
            'unfulfilled': unfulfilled,
            'fulfilled': fulfilled
        }

    def objective_function(self, params, demands):
        """Return the total simulated cost of policy ``params`` on ``demands``."""
        results = self.simulate_RS_policy(params, demands)
        return results['total_cost']

    def optimize(self, training_demands, method='dual_annealing'):
        """Search for the (R, S) pair minimising ``objective_function``.

        Args:
            training_demands: Demand series passed to the objective.
            method: ``'dual_annealing'`` selects SciPy's dual annealing; any
                other value selects differential evolution.

        Returns:
            The optimiser's best point ``result.x`` as ``[R, S]``.
        """
        bounds = [(0, 100), (0, 300)]  # Bounds for R and S

        if method == 'dual_annealing':
            result = dual_annealing(self.objective_function, bounds=bounds, args=(training_demands,))
        else:
            result = differential_evolution(self.objective_function, bounds=bounds, args=(training_demands,))

        return result.x

    def calculate_metrics(self, demands, R, S):
        """Simulate the policy and return a per-period metrics table.

        ``RS_Fill_Rate`` (fulfilled / total demand) and ``RS_Service_Cycle``
        (share of periods with no unfulfilled demand) are series-level values
        repeated on every row; each is 0 when its denominator is zero.

        Args:
            demands: Sequence of per-period demands.
            R: Reorder threshold.
            S: Order-up-to level.

        Returns:
            pandas.DataFrame: One row per period with demand, inventory,
            order, waste, fulfilment and cost columns.
        """
        results = self.simulate_RS_policy([R, S], demands)
        periods = len(demands)

        total_demand = sum(demands)
        fill_rate = sum(results['fulfilled']) / total_demand if total_demand > 0 else 0
        # Guard the period count as well, so an empty series yields an empty
        # table instead of a ZeroDivisionError.
        stockout_free = sum(1 for u in results['unfulfilled'] if u == 0)
        service_cycle = stockout_free / periods if periods > 0 else 0

        metrics = pd.DataFrame({
            'Timestep': range(1, periods + 1),
            'Demand': demands,
            'RS_Inventory_Level': results['inventory_levels'],
            'RS_Orders': results['orders'],
            'RS_Wasted_Units': results['wasted_units'],
            'RS_Unfulfilled_Demand': results['unfulfilled'],
            'RS_Fulfilled_Demand': results['fulfilled']
        })

        metrics['RS_Fixed_Cost'] = metrics['RS_Orders'].apply(lambda x: self.fixed_order_cost if x > 0 else 0)
        metrics['RS_Variable_Cost'] = metrics['RS_Orders'] * self.variable_cost
        metrics['RS_Holding_Cost'] = metrics['RS_Inventory_Level'] * self.holding_cost
        metrics['RS_Wastage_Cost'] = metrics['RS_Wasted_Units'] * self.wastage_cost
        metrics['RS_Shortage_Cost'] = metrics['RS_Unfulfilled_Demand'] * self.shortage_cost
        metrics['RS_Total_Cost'] = (metrics['RS_Fixed_Cost'] + metrics['RS_Variable_Cost'] +
                                  metrics['RS_Holding_Cost'] + metrics['RS_Wastage_Cost'] +
                                  metrics['RS_Shortage_Cost'])
        metrics['RS_Fill_Rate'] = fill_rate
        metrics['RS_Service_Cycle'] = service_cycle

        return metrics


# Generate training data
np.random.seed(42)
num_episodes = 36
timesteps_per_episode = 30
training_demands = []

# Draw each episode as |Normal(30, 15)| and append it, truncated to integers,
# to one flat demand series covering all episodes back to back.
for _ in range(num_episodes):
    episode_demands = np.abs(np.random.normal(loc=30, scale=15, size=timesteps_per_episode))
    training_demands.extend(episode_demands.astype(int))  # Convert demands to integers

# Optimize RS policy
optimizer = RSPolicyOptimizer(PlateletInventory())
optimal_R, optimal_S = optimizer.optimize(training_demands)

# Calculate and save training results
training_metrics = optimizer.calculate_metrics(training_demands, optimal_R, optimal_S)
training_metrics.to_excel('rs_training_results.xlsx', index=False)
print(f"Optimal R: {optimal_R:.2f}")
print(f"Optimal S: {optimal_S:.2f}")

# Testing-data evaluation is currently disabled. It is kept as comments rather
# than inside a bare string literal, which is still an evaluated expression
# and gets echoed as cell output in a notebook.
# Load and process testing data
# testing_data = pd.read_excel('testing_data.xlsx')  # Assuming column name is 'Demand'
# testing_demands = testing_data['Demand'].values

# Calculate and save testing results
# testing_metrics = optimizer.calculate_metrics(testing_demands, optimal_R, optimal_S)
# testing_metrics.to_excel('rs_testing_results.xlsx', index=False)



Optimal R: 73.00
Optimal S: 106.00


"testing_metrics = optimizer.calculate_metrics(testing_demands, optimal_R, optimal_S)\ntesting_metrics.to_excel('rs_testing_results.xlsx', index=False)"