In [17]:
import gymnasium as gym
from gymnasium import spaces
import numpy as np
import random
from typing import Dict, Tuple, List

class KnapsackEnv(gym.Env):
    """
    A Gym environment for solving 0-1 Knapsack Problem instances using Reinforcement Learning.
    This version handles one problem instance at a time.

    Each action is the index of an item. Picking an item that fits adds it to
    the knapsack; picking an item that does not fit (or was already consumed)
    yields a negative reward. An episode ends when every item has been
    consumed, when the knapsack is exactly full, or when an out-of-range
    action is submitted.

    Note:
        `reset` returns only the observation and `step` returns the 4-tuple
        ``(obs, reward, done, info)``. This legacy shape is kept on purpose so
        existing callers keep working; it is not the 5-tuple Gymnasium API.
    """
    # 'render_modes' is the key Gymnasium reads; the legacy 'render.modes'
    # key is kept so code that still looks it up does not break.
    metadata = {'render_modes': ['human'], 'render.modes': ['human']}

    def __init__(self, problem_instance: Dict):
        """
        Initialize the Knapsack environment with a single problem instance.

        Args:
            problem_instance (Dict): A dictionary with:
                - 'values': List[float] of item values
                - 'weights': List[float] of item weights
                - 'capacity': float indicating the knapsack capacity
        """
        super().__init__()

        # Store the current problem instance
        self.problem_instance = problem_instance

        # The number of items fixes the size of both spaces.
        self.n_items = len(problem_instance['values'])
        self._configure_spaces()

        # Internal state variables (properly initialized in reset)
        self.remaining_capacity = 0.0
        self.selected_items = []
        self.available_items = []
        self.current_value = 0.0
        self.current_weight = 0.0
        self.done = False

        # Best total value reached at the end of any episode so far
        # (can be used for logging or analysis); survives reset().
        self.best_value = 0.0

        # Initialize environment for the given problem instance
        self.reset()

    def _configure_spaces(self) -> None:
        """
        (Re)build the action and observation spaces for the current `n_items`.

        Action space: an item index in [0, n_items - 1].
        Observation space (2*N + 4 features):
            1. Number of items (1)
            2. Normalized values for all items (N)
            3. Normalized weights for all items (N)
            4. Capacity (1)
            5. Total value of all items (1)
            6. Total weight of all items (1)
        """
        self.action_space = spaces.Discrete(self.n_items)
        self.observation_space = spaces.Box(
            low=-np.inf,
            high=np.inf,
            shape=(2 * self.n_items + 4,),
            dtype=np.float32
        )

    def set_problem_instance(self, new_instance: Dict) -> None:
        """
        Replace the current problem instance with a new one.

        The spaces are rebuilt for the new item count and the episode state is
        reset immediately, so the environment is never left holding per-item
        data from the previous instance. Calling `env.reset()` afterwards is
        still allowed and simply starts the episode again.

        Args:
            new_instance (Dict): Dictionary with the same keys as before:
                - 'values': List[float]
                - 'weights': List[float]
                - 'capacity': float
        """
        self.problem_instance = new_instance
        self.n_items = len(new_instance['values'])

        # Update the action and observation spaces to match the new size
        self._configure_spaces()

        # Recompute normalized features / availability for the new instance.
        self.reset()

    def reset(self, *, seed=None, options=None) -> np.ndarray:
        """
        Reset the environment state to start a new episode with the current problem instance.

        Args:
            seed: Optional seed forwarded to `gym.Env.reset` to seed the
                environment's random generator.
            options: Accepted for compatibility with the Gymnasium signature;
                not used.

        Returns:
            state (np.ndarray): The initial observation (2*N + 4 features).
        """
        super().reset(seed=seed)

        # Extract the current problem instance
        values = self.problem_instance['values']
        weights = self.problem_instance['weights']
        capacity = self.problem_instance['capacity']

        # Reset internal state (best_value is deliberately kept)
        self.remaining_capacity = capacity
        self.selected_items = []
        self.available_items = list(range(self.n_items))
        self.current_value = 0.0
        self.current_weight = 0.0
        self.done = False

        # Normalize values and weights (as in the referenced Equations (4) and (5))
        self.normalized_values = self._normalize_values(values, weights, capacity)
        self.normalized_weights = self._normalize_weights(weights, capacity)

        # Compute total value and total weight
        self.total_value = sum(values)
        self.total_weight = sum(weights)

        # Return the initial state representation
        return self._get_state()

    def _normalize_values(self, values: List[float], weights: List[float], capacity: float) -> List[float]:
        """
        Normalize values according to Equation (4): vr_i = v_i / (w_i * W_P).

        When the denominator would be zero (w_i == 0 or W_P == 0) the
        normalized value falls back to 0.0 instead of raising.
        """
        return [
            0.0 if (w == 0 or capacity == 0) else v / (w * capacity)
            for v, w in zip(values, weights)
        ]

    def _normalize_weights(self, weights: List[float], capacity: float) -> List[float]:
        """
        Normalize weights according to Equation (5): wr_i = w_i / W_P.

        With a zero capacity the ratio is undefined; 0.0 is used for every
        item (the same fallback as `_normalize_values`) instead of raising.
        """
        if capacity == 0:
            return [0.0 for _ in weights]
        return [w / capacity for w in weights]

    def _get_state(self) -> np.ndarray:
        """
        Construct the state vector with (2*N + 4) features.
        1. Number of items (always total number, N)
        2. Normalized values for items (N features; 0 for removed items)
        3. Normalized weights for items (N features; 0 for removed items)
        4. Capacity (1)
        5. Total value (1)
        6. Total weight (1)
        """
        n = self.n_items
        state = np.zeros(2 * n + 4, dtype=np.float32)

        # 1) Number of items (total items, not just available)
        state[0] = n

        # 2) + 3) Per-item features. The vector starts zeroed, so only the
        # still-available items need to be written; one fancy-indexed copy
        # avoids a list-membership test per item.
        if self.available_items:
            idx = np.asarray(self.available_items, dtype=np.intp)
            state[1 + idx] = np.asarray(self.normalized_values, dtype=np.float32)[idx]
            state[1 + n + idx] = np.asarray(self.normalized_weights, dtype=np.float32)[idx]

        # 4) Capacity
        state[1 + 2 * n] = self.problem_instance['capacity']
        # 5) Total value of all items
        state[2 + 2 * n] = self.total_value
        # 6) Total weight of all items
        state[3 + 2 * n] = self.total_weight

        return state

    def step(self, action: int) -> Tuple[np.ndarray, float, bool, dict]:
        """
        Take an action (select an item index) and return:
        (new_state, reward, done, info)

        Rewards:
            - out-of-range index: -capacity, and the episode ends;
            - item already removed: -normalized weight of that item;
            - item fits: +normalized value, item is added to the knapsack;
            - item does not fit: -normalized weight, and the item is removed
              from the available items so that its features become 0 in the
              state vector.
        """
        values = self.problem_instance['values']
        weights = self.problem_instance['weights']
        capacity = self.problem_instance['capacity']
        reward = 0.0

        # action_space.sample() yields NumPy integers; store plain ints so
        # selected_items / info stay simple to print and serialize.
        action = int(action)

        # Check if action is out of bounds.
        if action < 0 or action >= self.n_items:
            reward = -capacity
            self.done = True

        # Check if the item is already removed (not available)
        elif action not in self.available_items:
            reward = -self.normalized_weights[action]

        else:
            # The item leaves the available set whether or not it fits.
            self.available_items.remove(action)
            item_value = values[action]
            item_weight = weights[action]

            if item_weight <= self.remaining_capacity:
                # Item fits in the knapsack: add it.
                self.current_value += item_value
                self.current_weight += item_weight
                self.remaining_capacity -= item_weight
                self.selected_items.append(action)
                reward = self.normalized_values[action]
            else:
                # Item does not fit: apply negative reward.
                reward = -self.normalized_weights[action]

        # Check if the episode should end.
        if not self.available_items or self.remaining_capacity == 0:
            self.done = True

        # Record the best value on every kind of termination, including the
        # out-of-bounds one above.
        if self.done and self.current_value > self.best_value:
            self.best_value = self.current_value

        # Construct the new state.
        new_state = self._get_state()

        # Prepare the info dictionary. selected_items is copied so that an
        # info dict kept by the caller is not mutated by later steps.
        info = {
            'current_value': self.current_value,
            'current_weight': self.current_weight,
            'remaining_capacity': self.remaining_capacity,
            'selected_items': list(self.selected_items),
            'best_value': self.best_value
        }

        return new_state, reward, self.done, info

    def render(self, mode='human'):
        """
        Print out the environment's current state.

        Only the 'human' mode prints anything; any other mode is a no-op.
        """
        if mode == 'human':
            print("Knapsack Environment")
            print(f"Capacity: {self.problem_instance['capacity']}, Remaining: {self.remaining_capacity}")
            print(f"Selected items: {self.selected_items}")
            print(f"Current value: {self.current_value}, Current weight: {self.current_weight}")
            print(f"Available items: {self.available_items}")
            print(f"Best value so far: {self.best_value}")
            print(f"Done: {self.done}")
            print("-" * 50)


In [None]:
def run_episode(env: "KnapsackEnv", policy=None, render=False, max_ite=None):
    """
    Run a single episode in a Gym environment.

    The environment is expected to follow the interface used here:
    `reset()` returns the initial state and `step(action)` returns
    `(state, reward, done, info)`.

    Args:
        env: An instance of a Gym environment.
        policy: Optional function that takes the current state and returns an action.
                If None, actions are chosen randomly.
        render: Boolean flag to indicate whether to render the environment at each step.
        max_ite: Optional upper bound on the number of steps. The episode is
                cut off once this many steps have been taken, even if the
                environment has not reported `done`. None means no limit.

    Returns:
        total_reward (float): The total accumulated reward from the episode.
        episode_info (dict): The final info dictionary returned by the environment
                (an empty dict if no step was taken, i.e. `max_ite` is 0).
    """
    state = env.reset()
    total_reward = 0.0
    done = False
    ite = 0
    # Pre-bound so the return below is valid even when zero steps are taken.
    info = {}

    # Stop on termination, or once the step budget (if any) is used up.
    while not done and (max_ite is None or ite < max_ite):
        if render:
            env.render()
        # Choose an action: use the policy if provided, otherwise sample randomly.
        action = policy(state) if policy is not None else env.action_space.sample()

        state, reward, done, info = env.step(action)
        total_reward += reward
        ite += 1

    # Show the final state as well.
    if render:
        env.render()

    return total_reward, info


# Example usage:
if __name__ == "__main__":
    # Toy instance: three items and a knapsack that can hold a weight of 50.
    problem_instance = dict(
        values=[60, 100, 120],
        weights=[10, 20, 40],
        capacity=50,
    )

    # Build the environment around that instance (KnapsackEnv is defined above).
    env = KnapsackEnv(problem_instance)

    # Play one rendered episode; with no policy the actions are sampled at random.
    total_reward, final_info = run_episode(env, policy=None, render=True)

    # Report the outcome of the episode.
    print("Episode finished with total reward:", total_reward)
    print("Final info:", final_info)


Knapsack Environment
Capacity: 50, Remaining: 50
Selected items: []
Current value: 0.0, Current weight: 0.0
Available items: [0, 1, 2]
Best value so far: 0.0
Done: False
--------------------------------------------------
Knapsack Environment
Capacity: 50, Remaining: 40
Selected items: [np.int64(0)]
Current value: 60.0, Current weight: 10.0
Available items: [1, 2]
Best value so far: 0.0
Done: False
--------------------------------------------------
Knapsack Environment
Capacity: 50, Remaining: 0
Selected items: [np.int64(0), np.int64(2)]
Current value: 180.0, Current weight: 50.0
Available items: [1]
Best value so far: 180.0
Done: True
--------------------------------------------------
Episode finished with total reward: 0.18
Final info: {'current_value': 180.0, 'current_weight': 50.0, 'remaining_capacity': 0, 'selected_items': [np.int64(0), np.int64(2)], 'best_value': 180.0}
