In [3]:
import tensorflow as tf
import tf_agents.environments.py_environment
import tf_agents.specs.array_spec
import tf_agents.trajectories.time_step as ts
import random
import numpy as np
class BallSortCraneEnvironment(tf_agents.environments.py_environment.PyEnvironment):
    """TF-Agents Python environment for a simple bar/ball/basket game.

    Actions are a scalar int32 in [0, 3]:
        0 = move the bar left, 1 = move the bar right,
        2 = extend the bar, 3 = shrink the bar.

    Observations are 8 float32 values, in this order: bar_x, bar_y,
    bar_height, score, ball_x, ball_y, basket_x, basket_y, each divided by
    the matching scale in `_normalize_observation`.

    An episode ends when a held ball is delivered to the basket or when the
    ball drops below the bottom of the screen.
    """

    def __init__(self):
        # Initialise the PyEnvironment base class so its internal time-step
        # bookkeeping exists before reset()/step() are called.
        super().__init__()

        # Define constants for the environment
        self.SCREEN_WIDTH = 800
        self.SCREEN_HEIGHT = 600  # Define the screen height
        self.BAR_WIDTH = 20
        self.BAR_HEIGHT = 100
        self.BALL_RADIUS = 20
        self.BASKET_WIDTH = 60
        self.BASKET_HEIGHT = 20
        self.MOVEMENT_SPEED = 5
        self.VELOCITY = 10

        # Terminal reward magnitudes. These were previously referenced as
        # undefined bare names, which would raise NameError when reached.
        self.SUCCESSFUL_DELIVERY_REWARD = 1.0
        self.BALL_FALL_PENALTY = 1.0

        # Initialize the game state variables
        self._reset_game_state()

        # Define action and observation spaces
        self._action_spec = tf_agents.specs.array_spec.BoundedArraySpec(
            shape=(), dtype=np.int32, minimum=0, maximum=3)
        self._observation_spec = tf_agents.specs.array_spec.ArraySpec(
            shape=(8,), dtype=np.float32)

    def action_spec(self):
        """Return the spec of the scalar int32 action in [0, 3]."""
        return self._action_spec

    def observation_spec(self):
        """Return the spec of the 8-element float32 observation."""
        return self._observation_spec

    def _reset_game_state(self):
        """Put bar, score, flags, ball and basket into a fresh-episode state."""
        self.bar_x = self.SCREEN_WIDTH - (self.SCREEN_WIDTH // 4)
        self.bar_y = self.SCREEN_HEIGHT - (self.SCREEN_HEIGHT // 4)
        self.bar_height = self.BAR_HEIGHT
        self.score = 0
        self.is_grabbing = False
        self.is_holding_ball = False
        self._episode_ended = False
        self.randomize_positions()
        # Seed the shaping baseline with the real starting distance so the
        # first step is not rewarded against an infinite previous distance.
        self.previous_distance_to_ball = abs(self.bar_x - self.ball_x)

    def _current_observation(self):
        """Return the normalized observation as a float32 NumPy array."""
        return np.array(
            self._normalize_observation(self.get_observation()), dtype=np.float32)

    def _reset(self):
        """Start a new episode and return its first TimeStep."""
        self._reset_game_state()
        # Python environments exchange NumPy arrays, not TF tensors.
        return ts.restart(self._current_observation())

    def _step(self, action):
        """Apply `action`, advance the game one tick and return the TimeStep.

        If the previous step ended the episode, the action is ignored and a
        new episode is started instead.
        """
        if self._episode_ended:
            return self.reset()

        if action == 0:  # Move left
            self.bar_x = max(self.bar_x - self.MOVEMENT_SPEED, 0)
        elif action == 1:  # Move right
            self.bar_x = min(self.bar_x + self.MOVEMENT_SPEED, self.SCREEN_WIDTH)
        elif action == 2:  # Extend bar
            self.bar_height = min(self.bar_height + 10, self.SCREEN_HEIGHT)
        elif action == 3:  # Shrink bar
            self.bar_height = max(self.bar_height - 10, 0)

        # Implement game logic (e.g., ball movement, collision checking)
        self.update_game_state()

        # The previous distance must NOT be refreshed here: the reward
        # function compares it with the new distance and then updates it.
        reward, done = self.calculate_reward_and_termination()

        observation = self._current_observation()
        reward = np.asarray(reward, dtype=np.float32)
        if done:
            self._episode_ended = True
            return ts.termination(observation, reward)
        return ts.transition(observation, reward)

    def update_game_state(self):
        """Advance the ball one tick and score a point on bar contact."""
        self.ball_x += 0  # Implement ball movement
        self.ball_y -= self.VELOCITY  # Simulate ball falling

        # Check if the ball collides with the bar
        half_bar = self.BAR_WIDTH / 2
        if (
            self.bar_x - half_bar < self.ball_x < self.bar_x + half_bar
            and self.bar_y - self.bar_height <= self.ball_y + self.BALL_RADIUS
        ):
            self.score += 1

    def randomize_positions(self):
        """Place the ball in the left half and the basket in the right half."""
        self.ball_x = random.randint(
            self.BALL_RADIUS, self.SCREEN_WIDTH // 2 - self.BALL_RADIUS)
        self.basket_x = random.randint(
            self.SCREEN_WIDTH // 2 + self.BASKET_WIDTH // 2,
            self.SCREEN_WIDTH - self.BASKET_WIDTH // 2
        )
        # Ensure valid y-positions for the ball and basket
        max_pillar_height = self.SCREEN_HEIGHT // 2
        self.ball_y = random.randint(
            self.BALL_RADIUS, max_pillar_height - self.BALL_RADIUS)
        self.basket_y = self.BASKET_HEIGHT // 2

    def calculate_reward_and_termination(self):
        """Return `(reward, done)` for the current game state.

        While not grabbing, the reward is 0.1 times the reduction in
        horizontal bar-to-ball distance since the previous call (never
        negative). While grabbing, delivering a held ball to the basket gives
        SUCCESSFUL_DELIVERY_REWARD and ends the episode. Independently of the
        grabbing state, a ball that left the screen costs BALL_FALL_PENALTY
        and ends the episode.
        """
        reward = 0.0
        done = False

        # Calculate the current distance to the ball
        distance_to_ball = abs(self.bar_x - self.ball_x)

        if not self.is_grabbing:
            # Reward for moving closer to the ball, scaled by a factor
            reward += max(
                0.0, (self.previous_distance_to_ball - distance_to_ball) * 0.1)
        elif self.is_holding_ball and self.ball_in_basket():
            reward += self.SUCCESSFUL_DELIVERY_REWARD
            done = True

        # Checked outside the grabbing branch so an episode always terminates
        # once the ball has dropped off the screen.
        if not done and self.ball_falls():
            reward -= self.BALL_FALL_PENALTY
            done = True

        # Update the previous distance to the ball for the next step
        self.previous_distance_to_ball = distance_to_ball

        return reward, done

    def ball_in_basket(self):
        """Return True if the ball overlaps the basket horizontally and its
        lower edge is at or below the basket's upper edge."""
        half_basket = self.BASKET_WIDTH / 2
        return (
            self.ball_x + self.BALL_RADIUS >= self.basket_x - half_basket and
            self.ball_x - self.BALL_RADIUS <= self.basket_x + half_basket and
            self.ball_y - self.BALL_RADIUS <= self.basket_y + self.BASKET_HEIGHT / 2
        )

    def ball_falls(self):
        """Return True once the ball is more than one radius below y = 0."""
        return self.ball_y < -self.BALL_RADIUS

    def get_observation(self):
        """Return the raw (unnormalized) observation as a list of 8 numbers."""
        return [
            self.bar_x, self.bar_y, self.bar_height, self.score,
            self.ball_x, self.ball_y, self.basket_x, self.basket_y
        ]

    def _normalize_observation(self, observation):
        """Divide each observation entry by its scale and return a list.

        The score is divided by infinity, so it always normalizes to 0.
        """
        max_values = [
            self.SCREEN_WIDTH, self.SCREEN_HEIGHT, self.SCREEN_HEIGHT, float('inf'),
            self.SCREEN_WIDTH, self.SCREEN_HEIGHT, self.SCREEN_WIDTH, self.SCREEN_HEIGHT
        ]
        return [obs / max_val for obs, max_val in zip(observation, max_values)]

In [None]:
import tensorflow as tf
import numpy as np
import tf_agents
from tf_agents.networks import q_network
from tf_agents.agents.dqn import dqn_agent
from tf_agents.environments import tf_py_environment
from tf_agents.utils import common
from tf_agents.environments import suite_gym
from tf_agents.environments import utils

# Training hyperparameters
num_iterations = 10 #10000
initial_collect_steps = 1000
collect_steps_per_iteration = 1
batch_size = 1
num_eval_episodes = 10

# Create the BallSortCrane environment and wrap it for use from TensorFlow
train_env = BallSortCraneEnvironment()
train_env = tf_py_environment.TFPyEnvironment(train_env)

# Define the Q-network (a single hidden layer of 100 units)
fc_layer_params = (100,)

q_net = q_network.QNetwork(
    train_env.observation_spec(),
    train_env.action_spec(),
    fc_layer_params=fc_layer_params)

# Define the DQN agent
optimizer = tf.compat.v1.train.AdamOptimizer(learning_rate=1e-3)

train_step_counter = tf.Variable(0)

agent = dqn_agent.DqnAgent(
    train_env.time_step_spec(),
    train_env.action_spec(),
    q_network=q_net,
    optimizer=optimizer,
    td_errors_loss_fn=common.element_wise_squared_loss,
    train_step_counter=train_step_counter)

agent.initialize()

# Define the replay buffer
# NOTE(review): the capacity (100) is smaller than initial_collect_steps
# (1000), so most warm-up experience is overwritten - confirm this is intended.
replay_buffer_capacity = 100
replay_buffer = tf_agents.replay_buffers.tf_uniform_replay_buffer.TFUniformReplayBuffer(
    data_spec=agent.collect_data_spec,
    batch_size=train_env.batch_size,
    max_length=replay_buffer_capacity)

# Define the data collection policy
epsilon = tf.constant(0.1, dtype=np.float32)  # Specify epsilon as a TensorFlow constant
collect_policy = tf_agents.policies.epsilon_greedy_policy.EpsilonGreedyPolicy(
    policy=agent.policy,
    epsilon=epsilon)

# Define the training metrics. They are created before the driver because
# they only accumulate values when registered as driver observers.
train_metrics = [
    tf_agents.metrics.tf_metrics.AverageReturnMetric(),
    tf_agents.metrics.tf_metrics.AverageEpisodeLengthMetric(),
]

# Define the data collection driver; every collected trajectory is stored in
# the replay buffer and also fed to the training metrics.
collect_driver = tf_agents.drivers.dynamic_step_driver.DynamicStepDriver(
    train_env,
    collect_policy,
    observers=[replay_buffer.add_batch] + train_metrics,
    num_steps=collect_steps_per_iteration)

# Warm up the replay buffer before training starts
for _ in range(initial_collect_steps):
    collect_driver.run()

# Create a dataset from the replay buffer; num_steps=2 yields pairs of
# adjacent steps, which is what the DQN loss consumes.
dataset = replay_buffer.as_dataset(
    sample_batch_size=batch_size,
    num_steps=2,
    num_parallel_calls=3).prefetch(3)

iterator = iter(dataset)

# Define the training loop
def train_agent(num_iterations):
    """Run `num_iterations` collect-then-train iterations of the DQN agent.

    Uses the module-level `collect_driver`, `iterator`, `agent` and
    `train_metrics`. Each iteration runs the collect driver once, draws one
    batch from the replay-buffer iterator and performs one training step.
    Whenever the agent's train step counter is a multiple of 100, metric
    summaries are written and a progress line is printed.

    Args:
        num_iterations: Number of iterations to run.
    """
    for iteration in range(num_iterations):
        # Collect experience using the collect_driver
        collect_driver.run()

        # Sample a batch of data from the replay buffer
        experience, _ = next(iterator)

        # Train the agent
        train_loss = agent.train(experience).loss

        # Log training metrics
        if agent.train_step_counter.numpy() % 100 == 0:
            for metric in train_metrics:
                # step_metrics expects an iterable of metric objects, not a
                # result tensor, so only the train step is passed.
                metric.tf_summaries(train_step=agent.train_step_counter)
            # Report metric values rather than the metric objects' reprs.
            metric_values = {
                metric.name: metric.result().numpy() for metric in train_metrics
            }
            print(f"Iteration {iteration}, Loss: {train_loss.numpy()}, Metrics: {metric_values}")

# Run the training loop for the configured number of iterations
train_agent(num_iterations)

# Build a separate, TF-wrapped environment instance used only for evaluation
eval_env = tf_py_environment.TFPyEnvironment(BallSortCraneEnvironment())

def compute_avg_return(environment, policy, num_episodes=10):
    """Return the mean undiscounted episode return of `policy`.

    Args:
        environment: Object providing `reset()` and `step(action)`, both
            returning a time step with `is_last()` and `reward`.
        policy: Object whose `action(time_step)` returns something with an
            `action` attribute.
        num_episodes: Number of episodes to run and average over.

    Returns:
        The sum of all episode returns divided by `num_episodes`.
    """

    def _play_one_episode():
        # Roll the policy forward from a fresh reset until the final step,
        # accumulating the reward of every step taken.
        current = environment.reset()
        collected = 0.0
        while True:
            if current.is_last():
                return collected
            chosen = policy.action(current)
            current = environment.step(chosen.action)
            collected += current.reward

    running_total = 0.0
    for _ in range(num_episodes):
        running_total += _play_one_episode()
    return running_total / num_episodes

# Evaluate the agent's policy on the evaluation environment and report the mean return
avg_return = compute_avg_return(
    environment=eval_env,
    policy=agent.policy,
    num_episodes=num_eval_episodes,
)
print("Average Return:", avg_return.numpy())


In [None]:
    def calculate_reward_and_termination(self):
        """Return `(reward, done)` for the current game state.

        Reward terms, applied in this order:
          * +0.5 when grabbing while no ball was held yet (the holding flag
            is then set);
          * -0.5 when not grabbing while a ball was held and the ball is
            outside the basket (the holding flag is cleared either way);
          * +1 and episode end when the ball is in the basket;
          * -1 and episode end when the ball is below the screen.

        NOTE(review): this looks like an alternative version of the method of
        the same name on BallSortCraneEnvironment - confirm where it belongs.
        """
        # The basket test is the same everywhere in this method and nothing
        # below moves the ball or basket, so evaluate it once.
        half_basket = self.BASKET_WIDTH / 2
        ball_is_in_basket = (
            self.ball_x + self.BALL_RADIUS >= self.basket_x - half_basket
            and self.ball_x - self.BALL_RADIUS <= self.basket_x + half_basket
            and self.ball_y - self.BALL_RADIUS <= self.basket_y + self.BASKET_HEIGHT / 2
        )
        ball_left_screen = self.ball_y < -self.BALL_RADIUS

        reward = 0
        done = False

        if self.is_grabbing:
            if not self.is_holding_ball:
                # First grab of the ball: bonus, and remember we hold it
                reward += 0.5
                self.is_holding_ball = True
        elif self.is_holding_ball:
            # The ball was just released: penalize a drop outside the basket
            if not ball_is_in_basket:
                reward -= 0.5
            self.is_holding_ball = False

        if ball_is_in_basket:
            # Ball caught by the basket ends the episode with a reward
            reward += 1
            done = True

        if ball_left_screen:
            # Ball lost below the screen ends the episode with a penalty
            reward -= 1
            done = True

        return reward, done