In [2]:
import gym
from gym import spaces

class BoxPackingEnv(gym.Env):
  """
  Custom Gym environment for the box packing problem.

  Actions are integers: ``0`` skips the current item (penalised) and any
  ``k >= 1`` tries to put the current item into ``available_boxes[k - 1]``.

  NOTE(review): order generation, the per-rotation fit test, the capacity
  update and the wasted-space reward were elided ("...") in the notebook and
  are still unimplemented. The fit test and the reward are exposed as hooks
  that raise ``NotImplementedError`` so the gap is explicit instead of
  surfacing as a ``NameError`` on an undefined local.
  """
  def __init__(self, predefined_boxes, max_item_rotations):
    """
    Args:
        predefined_boxes (list): Box dictionaries; the methods below read the
            ``"remaining_capacity"`` key of every box.
        max_item_rotations (int): Max allowed rotations per item.
    """
    self.predefined_boxes = predefined_boxes  # List of dictionaries (dimensions and capacity)
    self.max_item_rotations = max_item_rotations  # Max allowed rotations per item
    # Action 0 is "skip"; actions 1..len(boxes) each select one box (see step()).
    self.action_space = spaces.Discrete(len(predefined_boxes) + 1)
    self.reset()

  def reset(self):
    """Reset the environment for a new order and return the first observation."""
    self.remaining_items = []  # List of dictionaries (item dimensions and quantity)
    # TODO: populate remaining_items based on your data generation.
    # Copy every box dict: list(...) alone would share the dicts, so capacity
    # changes made during one episode would leak into predefined_boxes.
    self.available_boxes = [dict(box) for box in self.predefined_boxes]
    self.wasted_space = 0  # Accumulated wasted space for this episode
    self.current_item_idx = 0  # Index of the item being placed
    self.current_item_rotations = 0  # Number of rotations performed on current item
    return self._get_observation()

  def step(self, action):
    """
    Apply one action.

    Args:
        action (int): ``0`` to skip the current item, ``k >= 1`` to try box ``k - 1``.

    Returns:
        tuple: ``(observation, reward, done, info)``.
    """
    reward = 0
    info = {}

    if action == 0:  # Skip to next item (penalty)
      reward -= 1.0
      self._advance_to_next_item()
    else:
      box_idx = action - 1  # Subtract 1 to account for skip action
      # An index outside the box list is treated like a box that does not fit
      # (a negative index would otherwise silently wrap around).
      in_range = 0 <= box_idx < len(self.available_boxes)
      if in_range and self._box_fits_item(box_idx):
        self._place_item_in_box(box_idx)
        reward += 1.0  # Reward for placing item
        reward += self._calculate_wasted_space_reward(box_idx)  # Reward for wasted space
        self._advance_to_next_item()
      else:
        reward -= 0.5  # Penalty for trying an unfit box

    # Episode ends when every item was handled or no box has capacity left.
    done = self.current_item_idx >= len(self.remaining_items) or self._all_boxes_full()

    # Exceeding a box's capacity ends the episode with a large penalty.
    if self._any_box_overfilled():
      done = True
      reward -= 10.0

    return self._get_observation(), reward, done, info

  def _advance_to_next_item(self):
    """Move on to the next item and clear its rotation counter."""
    self.current_item_idx += 1
    self.current_item_rotations = 0

  def _get_observation(self):
    """Build the agent-facing observation through a ``State`` object."""
    # ``State`` is defined in a later notebook cell; the name is resolved when
    # this method runs, so that cell must be executed before the env is created.
    self.state = State(self.remaining_items, self.available_boxes)
    self.state.wasted_space = self.wasted_space
    return self.state.get_state_representation()

  def _box_fits_item(self, box_idx):
    """Return True if the current item fits the box in any allowed rotation."""
    return any(
      self._item_fits_with_rotation(box_idx, rotation)
      for rotation in range(self.max_item_rotations + 1)
    )

  def _item_fits_with_rotation(self, box_idx, rotation):
    """Hook: does the current item fit ``available_boxes[box_idx]`` at this rotation?"""
    raise NotImplementedError(
      "BoxPackingEnv._item_fits_with_rotation: the geometric fit test is not implemented yet")

  def _place_item_in_box(self, box_idx):
    """Update bookkeeping after the current item was placed in the chosen box."""
    self.wasted_space += self._calculate_wasted_space_in_box(box_idx)
    # TODO: update the box's remaining capacity and the remaining items.

  def _calculate_wasted_space_in_box(self, box_idx):
    """Wasted space of a box, taken to be its remaining capacity."""
    return self.available_boxes[box_idx]["remaining_capacity"]

  def _calculate_wasted_space_reward(self, box_idx):
    """Hook: reward derived from the remaining volume in the chosen box."""
    raise NotImplementedError(
      "BoxPackingEnv._calculate_wasted_space_reward: the reward shaping is not implemented yet")

  def _any_box_overfilled(self):
    """Return True if any box has exceeded its capacity."""
    return any(box["remaining_capacity"] < 0 for box in self.available_boxes)

  def _all_boxes_full(self):
    """Return True when no box has positive remaining capacity."""
    # The original fell off the end and returned None when every box was full.
    return all(box["remaining_capacity"] <= 0 for box in self.available_boxes)


In [5]:
class State:
  """Container for what is left to pack and which boxes are available."""

  def __init__(self, remaining_items, available_boxes):
    # Item dictionaries; get_total_volume reads "width", "height", "depth", "quantity".
    self.remaining_items = remaining_items
    # Box dictionaries (box dimensions and capacity remaining).
    self.available_boxes = available_boxes
    # Placeholder, needs calculation based on specific use case.
    self.wasted_space = 0

  def get_state_representation(self):
    """Convert the state into a representation for the RL agent (not implemented yet)."""
    return None

  def get_total_volume(self):
    """Return the summed volume (width * height * depth * quantity) of all remaining items."""
    return sum(
      entry["width"] * entry["height"] * entry["depth"] * entry["quantity"]
      for entry in self.remaining_items
    )


def calculate_wasted_space_in_box(box):
  """
  Return the wasted space (unused volume) of a box.

  The previous formula was ``(remaining / total) * total``, which is just the
  remaining capacity but raised ``ZeroDivisionError`` for a box whose
  ``total_capacity`` is 0 and could add floating-point rounding noise.

  Args:
      box (dict): Box dictionary with a ``"remaining_capacity"`` entry.

  Returns:
      float: The box's remaining capacity as a volume.
  """
  return float(box["remaining_capacity"])

In [6]:
import gym
import numpy as np
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, Flatten, Conv2D

class DQN(tf.keras.Model):
  """Q-network: Conv2D(32, 3x3) -> Flatten -> Dense(64, relu) -> Dense(action_size)."""

  def __init__(self, state_size, action_size):
    """
    Args:
        state_size: Indexable; its first two entries give the conv layer's input height and width.
        action_size (int): Number of output units (one Q-value per action).
    """
    super().__init__()
    rows, cols = state_size[0], state_size[1]
    self.conv1 = Conv2D(32, (3, 3), activation='relu', input_shape=(rows, cols, 1))
    self.flatten = Flatten()
    self.fc1 = Dense(64, activation='relu')
    self.fc2 = Dense(action_size)

  def call(self, x):
    """Append a trailing channel axis to ``x`` and return the output of the final dense layer."""
    with_channel = tf.expand_dims(x, axis=-1)
    hidden = self.fc1(self.flatten(self.conv1(with_channel)))
    return self.fc2(hidden)

class ReplayBuffer(object):
  """Fixed-size ring buffer of transitions backed by pre-allocated NumPy arrays."""

  def __init__(self, max_size, input_shape, action_size):
    """
    Args:
        max_size (int): Maximum number of transitions kept.
        input_shape (tuple): Shape of one stored state.
        action_size: Unused; kept so existing callers keep working.
    """
    self.buffer_size = max_size
    self.count = 0  # Total number of transitions ever added (not capped)
    self.state_buffer = np.zeros((max_size, *input_shape))
    self.action_buffer = np.zeros(max_size)
    self.reward_buffer = np.zeros(max_size)
    self.next_state_buffer = np.zeros((max_size, *input_shape))
    self.done_buffer = np.zeros(max_size)

  def add(self, state, action, reward, next_state, done):
    """Store one transition, overwriting the oldest slot once the buffer is full."""
    index = self.count % self.buffer_size
    self.state_buffer[index] = state
    self.action_buffer[index] = action
    self.reward_buffer[index] = reward
    self.next_state_buffer[index] = next_state
    self.done_buffer[index] = done
    self.count += 1

  def size(self):
    """Return the number of valid transitions currently stored."""
    # count % buffer_size dropped back to 0 whenever the buffer filled up,
    # which made callers believe a full buffer was empty.
    return min(self.count, self.buffer_size)

  def sample(self, batch_size):
    """
    Draw ``batch_size`` distinct transitions uniformly at random.

    Returns:
        tuple: ``(states, actions, rewards, next_states, dones)`` arrays.

    Raises:
        ValueError: From ``np.random.choice`` when ``batch_size`` exceeds ``size()``.
    """
    indices = np.random.choice(self.size(), size=batch_size, replace=False)
    return (
      self.state_buffer[indices],
      self.action_buffer[indices],
      self.reward_buffer[indices],
      self.next_state_buffer[indices],
      self.done_buffer[indices])

def train_dqn(env, agent, replay_buffer, num_episodes, batch_size, gamma, learning_rate):
  """
  Train ``agent`` with greedy action selection and experience replay.

  Args:
      env: Environment whose ``reset()`` returns a NumPy state and whose
          ``step(action)`` returns ``(next_state, reward, done, info)``.
      agent: Callable Keras model mapping a batch of states to Q-values.
      replay_buffer: Buffer exposing ``add(...)``, ``size()`` and ``sample(batch_size)``.
      num_episodes (int): Number of episodes to run.
      batch_size (int): Number of transitions per gradient step.
      gamma (float): Discount factor.
      learning_rate (float): Adam learning rate.

  Returns:
      list: Total reward collected in each episode (the original returned None).
  """
  optimizer = tf.keras.optimizers.Adam(learning_rate=learning_rate)
  huber = tf.losses.Huber()  # Less sensitive to outliers than MSE
  episode_rewards = []

  for episode in range(num_episodes):
    state = env.reset()
    state = np.reshape(state, (1, *state.shape))  # Add batch axis for the network
    episode_reward = 0
    done = False

    while not done:
      action = int(np.argmax(agent(state)))
      next_state, reward, done, info = env.step(action)
      next_state = np.reshape(next_state, (1, *next_state.shape))  # Add batch axis
      episode_reward += reward

      replay_buffer.add(state, action, reward, next_state, done)

      state = next_state

      if replay_buffer.size() >= batch_size:
        states, actions, rewards, next_states, dones = replay_buffer.sample(batch_size)
        states = tf.cast(states, tf.float32)
        next_states = tf.cast(next_states, tf.float32)
        rewards = tf.cast(rewards, tf.float32)
        dones = tf.cast(dones, tf.float32)
        actions = tf.cast(actions, tf.int32)

        # Bellman target, computed outside the tape so it is treated as a constant.
        # No keepdims: a (B, 1) max would broadcast against (B,) rewards into (B, B).
        next_q_values = agent(next_states)
        q_target = rewards + gamma * tf.reduce_max(next_q_values, axis=1) * (1.0 - dones)

        with tf.GradientTape() as tape:
          # The forward pass must happen inside the tape, otherwise gradients are None.
          q_values = agent(states)
          # Pick, per row, the Q-value of the action that was actually taken.
          action_mask = tf.one_hot(actions, q_values.shape[-1], dtype=q_values.dtype)
          q_value = tf.reduce_sum(q_values * action_mask, axis=1)
          loss = huber(q_target, q_value)
        grads = tape.gradient(loss, agent.trainable_variables)
        # apply_gradients was previously referenced but never called.
        optimizer.apply_gradients(zip(grads, agent.trainable_variables))

    episode_rewards.append(episode_reward)

  return episode_rewards


In [13]:
import gym
import numpy as np
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, Flatten, Conv2D


class DQN(tf.keras.Model):
  """
  Convolutional Q-value model.

  Layers, in order: one 3x3 convolution with 32 filters, a flatten step, a
  64-unit ReLU dense layer and a linear dense layer with ``action_size`` units.
  """

  def __init__(self, state_size, action_size):
    """Create the layers; ``state_size[0]`` and ``state_size[1]`` set the conv input size."""
    super().__init__()
    conv_input_shape = (state_size[0], state_size[1], 1)
    self.conv1 = Conv2D(32, (3, 3), activation='relu', input_shape=conv_input_shape)
    self.flatten = Flatten()
    self.fc1 = Dense(64, activation='relu')
    self.fc2 = Dense(action_size)

  def call(self, x):
    """Run the forward pass on ``x`` after adding a channel axis at the end."""
    features = self.conv1(tf.expand_dims(x, axis=-1))
    features = self.flatten(features)
    return self.fc2(self.fc1(features))


class ReplayBuffer(object):
  """Circular experience buffer that keeps transitions in pre-allocated NumPy arrays."""

  def __init__(self, max_size, input_shape, action_size):
    """
    Args:
        max_size (int): Capacity of the buffer in transitions.
        input_shape (tuple): Shape of a single state.
        action_size: Not used by the buffer; accepted for interface compatibility.
    """
    self.buffer_size = max_size
    self.count = 0  # Number of add() calls so far; keeps growing past buffer_size
    self.state_buffer = np.zeros((max_size, *input_shape))
    self.action_buffer = np.zeros(max_size)
    self.reward_buffer = np.zeros(max_size)
    self.next_state_buffer = np.zeros((max_size, *input_shape))
    self.done_buffer = np.zeros(max_size)

  def add(self, state, action, reward, next_state, done):
    """Write one transition into the next slot, wrapping around when full."""
    index = self.count % self.buffer_size
    self.state_buffer[index] = state
    self.action_buffer[index] = action
    self.reward_buffer[index] = reward
    self.next_state_buffer[index] = next_state
    self.done_buffer[index] = done
    self.count += 1

  def size(self):
    """Return how many stored transitions are valid (at most ``buffer_size``)."""
    # The modulo used before reported 0 every time the buffer became exactly
    # full, which stalled training loops that gate on size() >= batch_size.
    return min(self.count, self.buffer_size)

  def sample(self, batch_size):
    """
    Return ``batch_size`` distinct random transitions.

    Returns:
        tuple: ``(states, actions, rewards, next_states, dones)`` arrays.

    Raises:
        ValueError: From ``np.random.choice`` if fewer than ``batch_size`` transitions are stored.
    """
    indices = np.random.choice(self.size(), size=batch_size, replace=False)
    return (
      self.state_buffer[indices],
      self.action_buffer[indices],
      self.reward_buffer[indices],
      self.next_state_buffer[indices],
      self.done_buffer[indices])


def train_dqn(env, agent, replay_buffer, num_episodes, batch_size, gamma, learning_rate, epsilon=1.0, epsilon_min=0.01, epsilon_decay=0.995):
  """
  DQN training loop with epsilon-greedy exploration and per-episode reward tracking.

  Args:
      env: Environment with ``reset()``, ``step(action)`` and ``action_space.sample()``.
      agent: Callable Keras model mapping a batch of states to Q-values.
      replay_buffer: Buffer exposing ``add(...)``, ``size()`` and ``sample(batch_size)``.
      num_episodes (int): Number of episodes to run.
      batch_size (int): Number of transitions per gradient step.
      gamma (float): Discount factor.
      learning_rate (float): Adam learning rate.
      epsilon (float): Initial probability of taking a random action.
      epsilon_min (float): Lower bound for epsilon.
      epsilon_decay (float): Multiplicative decay applied after every environment step.

  Returns:
      list: Total reward collected in each episode.
  """
  optimizer = tf.keras.optimizers.Adam(learning_rate=learning_rate)
  huber = tf.losses.Huber()
  episode_rewards = []  # Track episode rewards for monitoring

  for episode in range(num_episodes):
    state = env.reset()
    state = np.reshape(state, (1, *state.shape))  # Add batch axis for the network
    episode_reward = 0
    done = False

    while not done:
      # Epsilon-greedy action selection
      if np.random.rand() < epsilon:
        action = env.action_space.sample()  # Explore randomly
      else:
        action = int(np.argmax(agent(state)))  # Exploit based on Q-values

      # Take action in the environment
      next_state, reward, done, info = env.step(action)
      next_state = np.reshape(next_state, (1, *next_state.shape))  # Add batch axis
      episode_reward += reward

      replay_buffer.add(state, action, reward, next_state, done)

      state = next_state

      # Train DQN using experience replay when enough samples are available
      if replay_buffer.size() >= batch_size:
        states, actions, rewards, next_states, dones = replay_buffer.sample(batch_size)
        states = tf.cast(states, tf.float32)
        next_states = tf.cast(next_states, tf.float32)
        rewards = tf.cast(rewards, tf.float32)
        dones = tf.cast(dones, tf.float32)
        actions = tf.cast(actions, tf.int32)

        # Bellman target, computed outside the tape so it is treated as a constant.
        # No keepdims: a (B, 1) max would broadcast against (B,) rewards into (B, B).
        next_q_values = agent(next_states)
        q_target = rewards + gamma * tf.reduce_max(next_q_values, axis=1) * (1.0 - dones)

        with tf.GradientTape() as tape:
          # The forward pass must be recorded by the tape to obtain gradients.
          q_values = agent(states)
          # Select, per row, the Q-value of the action that was actually taken.
          action_mask = tf.one_hot(actions, q_values.shape[-1], dtype=q_values.dtype)
          q_value = tf.reduce_sum(q_values * action_mask, axis=1)
          loss = huber(q_target, q_value)
        grads = tape.gradient(loss, agent.trainable_variables)
        # apply_gradients expects (gradient, variable) pairs, not bare gradients.
        optimizer.apply_gradients(zip(grads, agent.trainable_variables))

      # Update epsilon for epsilon-greedy exploration (gradually decrease exploration)
      epsilon = max(epsilon_min, epsilon * epsilon_decay)

    episode_rewards.append(episode_reward)
    # Print episode statistics (optional)
    if episode % 100 == 0:
      print(f"Episode: {episode}, Average Reward: {np.mean(episode_rewards[-100:])}")

  return episode_rewards



In [15]:
import gym
import numpy as np

class BoxPackingEnv(gym.Env):
  """
  Custom environment for box packing with DQN agent.

  An episode is one random order. The agent picks a box size; if every item of
  the order individually fits that box the order is packed and rewarded,
  otherwise the episode ends with a penalty.
  """
  def __init__(self, box_sizes, max_items):
    """
    Initialize the environment with available box sizes and maximum items per order.

    Args:
        box_sizes (list): List of tuples representing available box dimensions (length, width, height).
        max_items (int): Maximum number of items in an order.

    Raises:
        ValueError: If ``box_sizes`` is empty (there would be no valid action).
    """
    if len(box_sizes) == 0:
      raise ValueError("box_sizes must contain at least one box")
    self.box_sizes = box_sizes  # List of (length, width, height) tuples
    self.max_items = max_items
    # One discrete action per available box size.
    self.action_space = gym.spaces.Discrete(len(box_sizes))
    self.reset()  # Reset for initial state

  def reset(self):
    """
    Reset the environment to a new packing scenario.

    Returns:
        observation (numpy.ndarray): Initial state representation.
    """
    self.remaining_items = []  # List of dictionaries (item_id, length, width, height)
    self.used_boxes = []  # List of chosen box dimensions during packing
    self.current_box = None  # Currently selected box (length, width, height)
    self.reward = 0  # Cumulative reward for packing

    # Generate random order data
    num_items = np.random.randint(1, self.max_items + 1)
    for item_id in range(num_items):
      length, width, height = np.random.uniform(0.1, 1.0, size=3)  # Random item dimensions
      self.remaining_items.append({"item_id": item_id, "length": length, "width": width, "height": height})

    return self._get_observation()

  def step(self, action):
    """
    Take an action (box selection) and update the environment.

    Args:
        action (int): Index of the chosen box size from available options.

    Returns:
        observation (numpy.ndarray): Updated state representation.
        reward (float): Reward for the action.
        done (bool): Whether the episode is finished (all items packed or no valid boxes).
        info (dict): Additional information (optional).
    """
    if self.current_box is not None:  # If a box is already selected, rotate the item
      self.reward -= 0.1  # Penalize rotation as it's generally less space-efficient
      return self._rotate_item(action)

    # Select the chosen box size
    box_size = self.box_sizes[action]
    self.current_box = box_size

    if self.can_fit_box(box_size):
      # The reward must be computed before the items are removed: it depends on their volume.
      reward = self._calculate_reward(box_size)
      self.reward += reward
      self.used_boxes.append(box_size)
      # Removing entries while iterating the same list skipped every other
      # item; clear() removes all packed items in place.
      self.remaining_items.clear()
      self.current_box = None  # Clear current box selection
      done = len(self.remaining_items) == 0
    else:
      reward = -1  # Penalty for trying a box that doesn't fit
      self.reward += reward  # Keep the cumulative reward in sync with the step reward
      done = True  # Episode terminated due to no fitting box

    return self._get_observation(), reward, done, {}  # Empty info dictionary for now

  def _can_fit_item(self, item, box):
    """
    Checks if a single item can fit inside a given box in any axis-aligned orientation.

    Args:
        item (dict): Dictionary containing item dimensions (length, width, height).
        box (tuple): Tuple representing box dimensions (length, width, height).

    Returns:
        bool: True if the item can fit in the box, False otherwise.
    """
    # Some permutation of the item's sides fits iff the sorted sides fit
    # pairwise, which covers all 6 orientations in a single comparison.
    item_sides = sorted((item["length"], item["width"], item["height"]))
    box_sides = sorted((box[0], box[1], box[2]))
    return all(side <= limit for side, limit in zip(item_sides, box_sides))

  def can_fit_box(self, box_size):
    """
    Checks whether every remaining item individually fits inside a given box.

    NOTE(review): items are tested one at a time; whether they all fit
    together is not checked.

    Args:
        box_size (tuple): Tuple representing box dimensions (length, width, height).

    Returns:
        bool: True if each item fits in the box (also True when no items remain).
    """
    return all(self._can_fit_item(item, box_size) for item in self.remaining_items)

  def _calculate_reward(self, box_size):
    """
    Calculates the reward for using a specific box size.

    Args:
        box_size (tuple): Tuple representing box dimensions (length, width, height).

    Returns:
        float: Reward value for using the box.
    """
    # Use the argument rather than self.current_box so the method also works
    # when it is called while no box is selected.
    volume_used = np.prod(box_size)
    total_item_volume = sum(item["length"] * item["width"] * item["height"] for item in self.remaining_items)
    wasted_space = max(0, volume_used - total_item_volume)
    reward = volume_used / (total_item_volume + 1) - wasted_space * 0.1 - 0.05  # Adjust penalty coefficients

    return reward

  def _rotate_item(self, action):
    """
    Simulates rotating the currently selected item within the box (placeholder).

    Args:
        action (int): Index representing the rotation direction (e.g., 0: no rotation, 1: rotate 90 degrees).

    Returns:
        observation (numpy.ndarray): Updated state representation.
        reward (float): The cumulative ``self.reward``.
            NOTE(review): the main path of ``step`` returns a per-step reward
            instead; confirm which one callers expect here.
        done (bool): True when ``action`` is not a supported rotation.
        info (dict): Additional information (optional).
    """
    # Placeholder logic for demonstration (no actual rotation)
    possible_rotations = 3  # Replace with the number of supported rotations (e.g., 0, 90, 180 degrees)
    done = False  # Was left unbound on the "valid rotation" branch
    if action < possible_rotations:
      self.reward -= 0.02  # Small penalty for attempting rotation
    else:
      done = True  # No more rotations possible, terminate episode

    return self._get_observation(), self.reward, done, {}

  def _get_observation(self):
    """
    Returns the current state representation of the environment.

    Element 0 is the number of remaining items; element ``1 + i`` is 1 when
    ``can_fit_box(box_sizes[i])`` is true and 0 otherwise.
    """
    state = np.array([len(self.remaining_items)] + [0] * len(self.box_sizes))
    for i, box_size in enumerate(self.box_sizes):
      state[1 + i] = 1 if self.can_fit_box(box_size) else 0
    return state


In [16]:
import tensorflow as tf

class DQNAgent(tf.keras.Model):
  """
  Deep Q-Network agent for box packing.

  The network is a 3x3 convolution with 32 filters followed by a flatten step,
  a 64-unit ReLU dense layer and a linear output layer.
  """
  def __init__(self, state_size, action_size):
    """
    Build the layers of the Q-network.

    Args:
        state_size: Indexable; entries 0 and 1 are used as the height and width
            of the convolution's input.
        action_size (int): Number of available actions (box sizes).
    """
    super().__init__()
    layers = tf.keras.layers
    grid_shape = (state_size[0], state_size[1], 1)
    self.conv1 = layers.Conv2D(32, (3, 3), activation='relu', input_shape=grid_shape)
    self.flatten = layers.Flatten()
    self.fc1 = layers.Dense(64, activation='relu')
    self.fc2 = layers.Dense(action_size)

  def call(self, state):
    """
    Forward pass of the DQN model to predict Q-values.

    Args:
        state (tf.Tensor): Input state representation; a channel axis is appended here.

    Returns:
        tf.Tensor: Predicted Q-values for all actions.
    """
    with_channel = tf.expand_dims(state, axis=-1)  # Add channel dimension for CNN
    hidden = self.fc1(self.flatten(self.conv1(with_channel)))
    return self.fc2(hidden)



In [17]:
import random

class ReplayBuffer(object):
  """
  Replay buffer for storing past experiences for training.

  Experiences are kept in a list used as a ring: once ``capacity`` entries are
  stored, each new experience overwrites the oldest one.
  """
  def __init__(self, capacity):
    """
    Initialize the replay buffer with a fixed capacity.

    Args:
        capacity (int): Maximum number of experiences to store.

    Raises:
        ValueError: If ``capacity`` is not positive.
    """
    if capacity <= 0:
      raise ValueError("capacity must be a positive integer")
    self.buffer = []
    self.capacity = capacity
    self.position = 0  # Slot that the next add() writes to

  def __len__(self):
    """Return the number of experiences currently stored."""
    return len(self.buffer)

  def add(self, experience):
    """
    Add a new experience (state, action, reward, next_state, done) to the buffer.

    Args:
        experience (tuple): Tuple containing the experience data.
    """
    if len(self.buffer) < self.capacity:
      self.buffer.append(experience)
    else:
      # Buffer is full: overwrite the oldest slot. The original indexed
      # buffer[capacity] here, which is always out of range.
      self.buffer[self.position] = experience
    self.position = (self.position + 1) % self.capacity

  def sample(self, batch_size):
    """
    Sample a random batch of experiences from the replay buffer.

    Args:
        batch_size (int): Size of the desired experience batch; capped at the
            number of stored experiences.

    Returns:
        tuple: A tuple of sampled states, actions, rewards, next states, and done flags.

    Raises:
        ValueError: If the buffer is empty.
    """
    if not self.buffer:
      raise ValueError("cannot sample from an empty replay buffer")
    experiences = random.sample(self.buffer, min(batch_size, len(self.buffer)))
    states, actions, rewards, next_states, done = zip(*experiences)
    return states, actions, rewards, next_states, done


In [19]:
import tensorflow as tf
from tensorflow.keras.losses import Huber

def train_dqn(env, agent, replay_buffer, num_episodes, batch_size, gamma, learning_rate,
              epsilon=1.0, epsilon_min=0.01, epsilon_decay=0.995):
    """
    Train the DQN agent on the box packing environment.

    Args:
        env (BoxPackingEnv): Box packing environment instance.
        agent (DQNAgent): DQN agent model.
        replay_buffer (ReplayBuffer): Replay buffer for storing experiences; it
            must expose ``add(experience)``, ``sample(batch_size)`` and a
            ``buffer`` list.
        num_episodes (int): Number of training episodes.
        batch_size (int): Batch size for experience replay.
        gamma (float): Discount factor for future rewards.
        learning_rate (float): Learning rate for the optimizer.
        epsilon (float): Initial exploration probability, used only when the
            agent has no ``get_action`` method.
        epsilon_min (float): Lower bound for epsilon.
        epsilon_decay (float): Multiplicative epsilon decay applied after each step.

    NOTE(review): states are passed to the agent with a leading batch axis
    added; the agent is assumed to accept that shape - confirm against the
    environment's observation shape.
    """
    import random

    import numpy as np

    optimizer = tf.keras.optimizers.Adam(learning_rate=learning_rate)
    loss_fn = Huber()  # The imported class is Huber; HuberLoss was an undefined name

    for episode in range(num_episodes):
        state = env.reset()
        done = False

        while not done:
            # Select action based on epsilon-greedy policy
            if hasattr(agent, "get_action"):
                action = agent.get_action(state)
            else:
                # The agent has no policy helper, so do epsilon-greedy here.
                state_batch = np.asarray(state, dtype=np.float32)[None, ...]
                q_row = agent(tf.convert_to_tensor(state_batch))[0]
                if random.random() < epsilon:
                    action = random.randrange(int(q_row.shape[-1]))
                else:
                    action = int(tf.argmax(q_row).numpy())
                epsilon = max(epsilon_min, epsilon * epsilon_decay)

            # Step through the environment
            next_state, reward, done, _ = env.step(action)

            # Store experience in replay buffer
            replay_buffer.add((state, action, reward, next_state, done))

            # Sample a batch of experiences for training
            if len(replay_buffer.buffer) >= batch_size:
                states, actions, rewards, next_states, done_flags = replay_buffer.sample(batch_size)
                # sample() returns tuples; arithmetic needs tensors.
                states = tf.convert_to_tensor(np.asarray(states, dtype=np.float32))
                next_states = tf.convert_to_tensor(np.asarray(next_states, dtype=np.float32))
                actions = tf.convert_to_tensor(np.asarray(actions, dtype=np.int32))
                rewards = tf.convert_to_tensor(np.asarray(rewards, dtype=np.float32))
                done_flags = tf.convert_to_tensor(np.asarray(done_flags, dtype=np.float32))

                # Bellman targets, computed outside the tape so they act as constants.
                next_q_values = agent(next_states)
                q_value_targets = rewards + gamma * tf.math.reduce_max(next_q_values, axis=1) * (1.0 - done_flags)

                # Train the DQN agent with Huber loss
                with tf.GradientTape() as tape:
                    q_values = agent(states)
                    # Compare the target with the Q-value of the action actually
                    # taken, not with the whole (batch, actions) matrix.
                    action_mask = tf.one_hot(actions, q_values.shape[-1], dtype=q_values.dtype)
                    predicted_q_values = tf.reduce_sum(q_values * action_mask, axis=1)
                    loss = loss_fn(q_value_targets, predicted_q_values)
                grads = tape.gradient(loss, agent.trainable_variables)
                optimizer.apply_gradients(zip(grads, agent.trainable_variables))

            state = next_state

        # Print episode progress (optional)
        print(f"Episode: {episode+1}/{num_episodes}")
