In [2]:
class Point3D:
    def __init__(self, x: float, y: float, z: float):
        self.x = x
        self.y = y
        self.z = z

class Object3D:
    def __init__(self, position: Point3D, dimensions: Point3D = None, orientation: Point3D = None):
        """
        position: A Point3D object representing the position of the object in space.
        dimensions: A Point3D object representing the object's width, height, depth.
        orientation: A Point3D object representing the orientation in terms of pitch, yaw, and roll (rotation around x, y, and z axes).
        """
        self.position = position
        self.dimensions = dimensions if dimensions else Point3D(1, 1, 1)  # Default dimensions as 1x1x1
        self.orientation = orientation if orientation else Point3D(0, 0, 0)  # Default orientation as no rotation

    def move(self, new_position: Point3D):
        """Updates the object's position in 3D space."""
        self.position = new_position

    def rotate(self, new_orientation: Point3D):
        """Updates the object's orientation in space."""
        self.orientation = new_orientation

    def scale(self, new_dimensions: Point3D):
        """Resizes the object by updating its dimensions."""
        self.dimensions = new_dimensions

    def __repr__(self):
        return f"Object3D(Position: {self.position}, Dimensions: {self.dimensions}, Orientation: {self.orientation})"


# Example usage:
cube = Object3D(position=Point3D(0, 0, 0), dimensions=Point3D(1, 1, 1))
print('Dimensions',cube.dimensions.x, cube.dimensions.y, cube.dimensions.z ,
      'Orientation', cube.orientation.x, cube.orientation.y, cube.orientation.z,
      'Position', cube.position.x, cube.position.y, cube.position.z)

# Moving the object
cube.move(Point3D(5, 5, 5))
print('Dimensions',cube.dimensions.x, cube.dimensions.y, cube.dimensions.z ,
      'Orientation', cube.orientation.x, cube.orientation.y, cube.orientation.z,
      'Position', cube.position.x, cube.position.y, cube.position.z)

# Rotating the object
cube.rotate(Point3D(90, 0, 45))  # 90 degrees around x, 0 around y, and 45 degrees around z
print('Dimensions',cube.dimensions.x, cube.dimensions.y, cube.dimensions.z ,
      'Orientation', cube.orientation.x, cube.orientation.y, cube.orientation.z,
      'Position', cube.position.x, cube.position.y, cube.position.z)

# Scaling the object
cube.scale(Point3D(2, 2, 2))  # New dimensions
print('Dimensions',cube.dimensions.x, cube.dimensions.y, cube.dimensions.z ,
      'Orientation', cube.orientation.x, cube.orientation.y, cube.orientation.z,
      'Position', cube.position.x, cube.position.y, cube.position.z)


Dimensions 1 1 1 Orientation 0 0 0 Position 0 0 0
Dimensions 1 1 1 Orientation 0 0 0 Position 5 5 5
Dimensions 1 1 1 Orientation 90 0 45 Position 5 5 5
Dimensions 2 2 2 Orientation 90 0 45 Position 5 5 5


In [3]:
import numpy as np
import tensorflow as tf
import matplotlib.pyplot as plt
from mpl_toolkits.mplot3d import Axes3D
from tf_agents.environments import py_environment
from tf_agents.environments import tf_py_environment
from tf_agents.specs import array_spec
from tf_agents.trajectories import time_step as ts
from tf_agents.policies import random_tf_policy
from tf_agents.agents.dqn import dqn_agent
from tf_agents.networks import q_network
from tf_agents.replay_buffers import tf_uniform_replay_buffer
from tf_agents.trajectories import trajectory
from tf_agents.drivers import dynamic_step_driver
from tf_agents.utils import common
from gym.spaces import Box
from tf_agents.agents.ddpg import critic_network
from tf_agents.networks.actor_distribution_network import ActorDistributionNetwork
from tf_agents.utils import common
from tf_agents.agents.ddpg import ddpg_agent

2024-10-12 22:41:45.405615: I external/local_tsl/tsl/cuda/cudart_stub.cc:31] Could not find cuda drivers on your machine, GPU will not be used.
2024-10-12 22:41:45.620343: E external/local_xla/xla/stream_executor/cuda/cuda_dnn.cc:9261] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been registered
2024-10-12 22:41:45.620400: E external/local_xla/xla/stream_executor/cuda/cuda_fft.cc:607] Unable to register cuFFT factory: Attempting to register factory for plugin cuFFT when one has already been registered
2024-10-12 22:41:45.622038: E external/local_xla/xla/stream_executor/cuda/cuda_blas.cc:1515] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered
2024-10-12 22:41:45.655444: I external/local_tsl/tsl/cuda/cudart_stub.cc:31] Could not find cuda drivers on your machine, GPU will not be used.


In [5]:
# Define Point3D and Object3D classes
class Point3D:
    def __init__(self, x: float, y: float, z: float):
        self.x = x
        self.y = y
        self.z = z

    def as_array(self):
        return np.array([self.x, self.y, self.z])

class Object3D:
    def __init__(self, position: Point3D, dimensions: Point3D = None, orientation: Point3D = None):
        self.position = position.as_array()
        self.dimensions = dimensions.as_array() if dimensions else np.array([1, 1, 1])
        self.orientation = orientation.as_array() if orientation else np.array([0, 0, 0])

class ObjectPlacementEnv(py_environment.PyEnvironment):
    def __init__(self, space_size, object_list):
        self._space_size = np.array(space_size)
        self._object_list = object_list
        self._placed_objects = []
        self._action_spec = array_spec.BoundedArraySpec(
            shape=(), dtype=np.int32, minimum=0, maximum=5, name='action')  # Discretized actions
        self._observation_spec = array_spec.ArraySpec(
            shape=(len(object_list), 9), dtype=np.float32, name='observation')
        self._reset()

    def action_spec(self):
        return self._action_spec

    def observation_spec(self):
        return self._observation_spec

    def _reset(self):
        self._placed_objects = []
        return ts.restart(self._get_observation())

    def _get_observation(self):
        obs = []
        for obj in self._object_list:
            obj_state = np.concatenate([obj.position, obj.dimensions, obj.orientation])
            obs.append(obj_state)
        return np.array(obs, dtype=np.float32)

    def visualize(self):
        """Visualizes the current placements of objects in the environment."""
        fig = plt.figure()
        ax = fig.add_subplot(111, projection='3d')

        # Set limits based on the space size
        ax.set_xlim(0, self.space_size[0])
        ax.set_ylim(0, self.space_size[1])
        ax.set_zlim(0, self.space_size[2])

        # Plot each placed object
        for obj in self._placed_objects:
            x, y, z = obj[:3]  # Assuming the first three elements are position coordinates
            ax.scatter(x, y, z, marker='o')  # Plot the object's position

        ax.set_xlabel('X axis')
        ax.set_ylabel('Y axis')
        ax.set_zlabel('Z axis')
        ax.set_title('Object Placements in 3D Space')
        plt.show()

    def _step(self, action):
        position, orientation = self._map_action_to_position_orientation(action)

        if len(self._placed_objects) < len(self._object_list):
            object_dimensions = self._object_list[len(self._placed_objects)].dimensions

            if self._is_placement_valid(position):
                self._placed_objects.append(np.concatenate([position, orientation]))
                reward = 1.0
            else:
                reward = -1.0
        else:
            # If all objects are placed, we can consider this a terminal state
            return ts.termination(self._get_observation(), reward=0)

        if len(self._placed_objects) == len(self._object_list):
            return ts.termination(self._get_observation(), reward)

        return ts.transition(self._get_observation(), reward)

    def _map_action_to_position_orientation(self, action):
        """
        Maps a discrete action to a position and orientation in the space.
        We discretize the position and orientation to 6 possible actions for simplicity.
        """
        # Define some fixed positions and orientations based on the action
        # Example: If action == 0, position is (1, 1, 1) and orientation is (0, 0, 0)
        if action == 0:
            position = np.array([1, 1, 1])
            orientation = np.array([0, 0, 0])
        elif action == 1:
            position = np.array([2, 2, 2])
            orientation = np.array([0, 0, 1])
        elif action == 2:
            position = np.array([3, 3, 3])
            orientation = np.array([1, 0, 0])
        elif action == 3:
            position = np.array([4, 4, 4])
            orientation = np.array([0, 1, 0])
        elif action == 4:
            position = np.array([5, 5, 5])
            orientation = np.array([1, 1, 0])
        elif action == 5:
            position = np.array([6, 6, 6])
            orientation = np.array([1, 1, 1])
        return position, orientation

    def _is_placement_valid(self, position):
        object_dimensions = self._object_list[len(self._placed_objects)].dimensions
        if not self._fits_in_space(position, object_dimensions):
            return False
        if self._overlaps_with_existing(position, object_dimensions):
            return False
        return True

    def _fits_in_space(self, position, dimensions):
        return np.all(position >= 0) and np.all(position + dimensions <= self._space_size)

    def _overlaps_with_existing(self, position, dimensions):
        for placed_object in self._placed_objects:
            placed_position = placed_object[:3]
            placed_dimensions = self._object_list[len(self._placed_objects)].dimensions
            if np.all(placed_position + placed_dimensions > position) and np.all(placed_position < position + dimensions):
                return True
        return False





In [6]:
# Example usage
space_size = [10, 10, 10]
object_list = [
    Object3D(Point3D(0, 0, 0), Point3D(1, 1, 1)),
    Object3D(Point3D(0, 0, 0), Point3D(2, 2, 2)),
    Object3D(Point3D(0, 0, 0), Point3D(3, 3, 3))
]

train_py_env = ObjectPlacementEnv(space_size, object_list)
eval_py_env = ObjectPlacementEnv(space_size, object_list)
train_env = tf_py_environment.TFPyEnvironment(train_py_env)
eval_env = tf_py_environment.TFPyEnvironment(eval_py_env)

# Q-network and DQN agent setup
q_net = q_network.QNetwork(train_env.observation_spec(), train_env.action_spec(), fc_layer_params=(100,))
optimizer = tf.compat.v1.train.AdamOptimizer(learning_rate=1e-3)
train_step_counter = tf.Variable(0)

agent = dqn_agent.DqnAgent(
    train_env.time_step_spec(),
    train_env.action_spec(),
    q_network=q_net,
    optimizer=optimizer,
    td_errors_loss_fn=common.element_wise_squared_loss,
    train_step_counter=train_step_counter
)
agent.initialize()

# Replay buffer setup
replay_buffer = tf_uniform_replay_buffer.TFUniformReplayBuffer(
    data_spec=agent.collect_data_spec,
    batch_size=train_env.batch_size,
    max_length=10000
)

random_policy = random_tf_policy.RandomTFPolicy(train_env.time_step_spec(), train_env.action_spec())

# Initial data collection
def collect_initial_data(env, policy, buffer, steps):
    driver = dynamic_step_driver.DynamicStepDriver(
        env,
        policy,
        observers=[buffer.add_batch],
        num_steps=steps
    )
    final_time_step, _ = driver.run()

# Collect 1000 steps of initial data using the random policy
collect_initial_data(train_env, random_policy, replay_buffer, steps=1000)

# Dataset and training loop
dataset = replay_buffer.as_dataset(
    num_parallel_calls=3,
    sample_batch_size=64,
    num_steps=2).prefetch(3)

iterator = iter(dataset)

num_iterations = 10

Instructions for updating:
Use `as_dataset(..., single_deterministic_pass=False) instead.


In [7]:
for step in range(num_iterations):
    # Collect data with the agent's policy
    collect_driver = dynamic_step_driver.DynamicStepDriver(
        train_env,
        agent.collect_policy,
        observers=[replay_buffer.add_batch],
        num_steps=1
    )
    collect_driver.run()

    # Sample a batch of data and train the agent
    experience, _ = next(iterator)
    train_loss = agent.train(experience).loss

    if step % 100 == 0:
        print(f"Step {_}, Loss: {train_loss}")


Instructions for updating:
back_prop=False is deprecated. Consider using tf.stop_gradient instead.
Instead of:
results = tf.foldr(fn, elems, back_prop=False)
Use:
results = tf.nest.map_structure(tf.stop_gradient, tf.foldr(fn, elems))
Step BufferInfo(ids=<tf.Tensor: shape=(64, 2), dtype=int64, numpy=
array([[951, 952],
       [364, 365],
       [522, 523],
       [426, 427],
       [ 82,  83],
       [247, 248],
       [933, 934],
       [  5,   6],
       [321, 322],
       [135, 136],
       [ 65,  66],
       [388, 389],
       [565, 566],
       [994, 995],
       [555, 556],
       [530, 531],
       [855, 856],
       [470, 471],
       [625, 626],
       [204, 205],
       [387, 388],
       [474, 475],
       [775, 776],
       [913, 914],
       [142, 143],
       [ 27,  28],
       [984, 985],
       [785, 786],
       [128, 129],
       [933, 934],
       [ 60,  61],
       [102, 103],
       [484, 485],
       [786, 787],
       [848, 849],
       [179, 180],
       [449, 45

In [None]:
# Create a new environment for testing
test_env = ObjectPlacementEnv(space_size, object_list)
test_tf_env = tf_py_environment.TFPyEnvironment(test_env)

# Reset the environment to start
time_step = test_tf_env.reset()
total_reward = 0

# Number of episodes you want to run for testing
num_test_episodes = 5

for episode in range(num_test_episodes):
    time_step = test_tf_env.reset()
    episode_reward = 0
    done = False

    while not done:
        # Get the action from the agent's policy
        action = agent.policy.action(time_step)

        # Step the environment
        time_step = test_tf_env.step(action)

        # Accumulate the total reward
        episode_reward += time_step.reward.numpy().sum()

        # Check if the episode has ended
        done = time_step.is_last()

    print(f"Episode {episode + 1}, Total Reward: {episode_reward}")

    # Visualize the result after each episode
    test_env.visualize()
