In [1]:
import gym
import numpy as np
from gym import spaces
from typing import Tuple

class BallCatchingEnv(gym.Env):
    """Grid environment in which an agent on the bottom row catches falling balls.

    Grid cell encoding: 0 = empty, 1 = red ball, 2 = blue ball, 255 = agent.
    Catching a red ball adds 10 to the score, a blue ball adds 20.
    Each ball is tracked as a mutable ``[row, col, ball_type]`` list.
    """
    metadata = {'render.modes': ['human']}

    def __init__(self, grid_size: Tuple[int, int]=(5, 12), max_balls: int=7,
                 catch_target: int=2):
        """Create the environment and put it in its initial state.

        Args:
            grid_size: ``(height, width)`` of the grid.
            max_balls: inclusive upper bound on the balls spawned per step.
            catch_target: number of caught balls that ends an episode.
        """
        super().__init__()
        self.grid_height, self.grid_width = grid_size
        self.max_balls = max_balls
        self.catch_target = catch_target

        # Actions: 0 = Stay, 1 = Left, 2 = Right
        self.action_space = spaces.Discrete(3)

        # Observation: the full grid, using the cell encoding described above
        self.observation_space = spaces.Box(low=0, high=255,
                                            shape=(self.grid_height, self.grid_width),
                                            dtype=np.uint8)

        self.agent_pos = self.grid_width // 2  # Start the agent in the middle of the grid
        self.score = 0
        self.caught_balls = 0
        self.balls = []
        self.grid = None
        self.reset()

    def _redraw_grid(self):
        """Rebuild ``self.grid`` from the tracked balls and the agent position."""
        self.grid = np.zeros((self.grid_height, self.grid_width), dtype=np.uint8)
        for row, col, ball_type in self.balls:
            self.grid[row, col] = ball_type
        # The agent is drawn last and always lives on the bottom row
        self.grid[self.grid_height - 1, self.agent_pos] = 255

    def reset(self):
        """Start a new episode and return a copy of the initial grid."""
        self.agent_pos = self.grid_width // 2
        self.balls = []
        self.score = 0
        self.caught_balls = 0
        self._redraw_grid()
        return self.grid.copy()

    def step(self, action):
        """Advance the environment by one timestep.

        Args:
            action: 0 = stay, 1 = move left, 2 = move right.

        Returns:
            ``(observation, score, done, info)`` where ``observation`` is a copy
            of the grid, ``score`` is the score accumulated so far in the
            episode, ``done`` is True once ``catch_target`` balls were caught
            and ``info`` is an empty dict.
        """
        # Ensure action is valid
        assert self.action_space.contains(action), "%r (%s) invalid" % (action, type(action))

        # Move the agent; moves past either edge are ignored
        if action == 1 and self.agent_pos > 0:
            self.agent_pos -= 1
        elif action == 2 and self.agent_pos < self.grid_width - 1:
            self.agent_pos += 1

        # Spawn between 1 and max_balls new balls in random columns of row 0
        num_new_balls = np.random.randint(1, self.max_balls + 1)
        for _ in range(num_new_balls):
            ball_col = np.random.randint(0, self.grid_width)
            ball_type = np.random.choice([1, 2])  # 1 for red, 2 for blue
            self.balls.append([0, ball_col, ball_type])

        # Drop every ball (including the ones just spawned) by one row.  Balls
        # that reach the bottom row are scored if they land on the agent and
        # are discarded either way; the survivors are collected in one pass
        # instead of calling list.remove() per ball.
        bottom_row = self.grid_height - 1
        still_falling = []
        for ball in self.balls:
            ball[0] += 1
            if ball[0] != bottom_row:
                still_falling.append(ball)
            elif ball[1] == self.agent_pos:
                self.caught_balls += 1
                self.score += 20 if ball[2] == 2 else 10
        self.balls = still_falling

        self._redraw_grid()

        done = self.caught_balls >= self.catch_target

        # info dict kept for debugging or additional info
        info = {}

        # NOTE(review): the second element is the cumulative score, not the
        # reward earned in this step — confirm this is what learners expect.
        return self.grid.copy(), self.score, done, info

    def render(self, mode='human'):
        """Print the grid followed by the agent position and the score."""
        if mode == 'human':
            for row in self.grid:
                print(' '.join(str(cell).rjust(3) for cell in row))
            print(f"Agent Position: {self.agent_pos} | Score: {self.score}\n")


class BallCatchingAgent:
    """Baseline agent that ignores the observation and acts at random."""

    def __init__(self, env):
        # Keep a handle on the environment so its action space can be sampled
        self.env = env

    def act(self, state):
        """Return an action sampled from the environment's action space.

        ``state`` is accepted for interface compatibility but is not used.
        """
        action_space = self.env.action_space
        return action_space.sample()



# Example usage: play one episode, rendering the grid after every step
env = BallCatchingEnv(grid_size=(6, 12), max_balls=5)
agent = BallCatchingAgent(env)

state = env.reset()
done = False

while True:
    action = agent.act(state)
    state, reward, done, _ = env.step(action)
    env.render()
    if done:
        # The environment reported that the episode is over
        break

        



  0   0   0   0   0   0   0   0   0   0   0   0
  0   2   0   2   1   0   0   0   2   0   0   0
  0   0   0   0   0   0   0   0   0   0   0   0
  0   0   0   0   0   0   0   0   0   0   0   0
  0   0   0   0   0   0   0   0   0   0   0   0
  0   0   0   0   0 255   0   0   0   0   0   0
Agent Position: 5 | Score: 0

  0   0   0   0   0   0   0   0   0   0   0   0
  0   0   0   0   0   2   0   0   0   0   2   0
  0   2   0   2   1   0   0   0   2   0   0   0
  0   0   0   0   0   0   0   0   0   0   0   0
  0   0   0   0   0   0   0   0   0   0   0   0
  0   0   0   0   0 255   0   0   0   0   0   0
Agent Position: 5 | Score: 0

  0   0   0   0   0   0   0   0   0   0   0   0
  0   0   2   0   0   0   0   0   0   0   0   0
  0   0   0   0   0   2   0   0   0   0   2   0
  0   2   0   2   1   0   0   0   2   0   0   0
  0   0   0   0   0   0   0   0   0   0   0   0
  0   0   0   0 255   0   0   0   0   0   0   0
Agent Position: 4 | Score: 0

  0   0   0   0   0   0   0   0   0   0   0   

**What should be the initial setup of the environment?**

- grid_size specifies the dimensions of the grid that represents the observation space. It's a 2D array where balls will "fall" from the top.
- max_balls is the maximum number of balls that can be introduced in one step. It adds a level of variability to the game.
- action_space is discrete, with three actions: stay in place, move left, or move right.
- observation_space is a 2D array (Box space) representing the game grid. The values within this grid represent the absence of balls (0), the presence of balls (1 for a red ball and 2 for a blue ball), and the agent (255 for clear differentiation).
- agent_pos tracks the horizontal position of the agent, which moves only along the bottom row of the grid.
- score and caught_balls track the game's progress and performance of the agent.
- grid is the representation of the current state of the environment, including the positions of the agent and balls.

<br/>

**What will the environment dynamics be?**

1. Ball Generation:
    -  Balls (red and blue) are generated randomly at the top of the grid.
    -  Logic to initialize balls at the top row with random column positions.
    - The number of balls introduced at each step can vary from 1 to max_balls.
2. Scoring:
    - Catching a red ball awards 10 points.
    - Catching a blue ball awards 20 points.
3. Agent Movement:
    - Action = 0: The agent stays in its current position.
    - Action = 1: The agent moves left by one column (if not at the left edge).
    - Action = 2: The agent moves right by one column (if not at the right edge).
4. Ball Movement:
    - Balls fall down by one row at each timestep.
    - If a ball reaches the bottom row, check if it's in the same column as the agent. If so, the ball is considered caught, and points are awarded based on the ball's color.
5. Edge Cases for Agent Movement:
    -  If the agent is at the leftmost or rightmost position, actions to move further left or right should have no effect.
6. Ball Tracking:
    - Keep track of all balls currently falling. This can be done with a list of tuples or objects that store each ball's position and type.
    - Red ball is represented by 1 and Blue ball is represented by 2. No ball is represented by 0. 
    - When balls move or are caught/missed, this list needs to be updated accordingly.
7. Grid Representation Update:
    - After each action and ball movement, the grid representation needs to be updated to reflect the current state, including the agent's new position and the positions of all balls.
8. Episode Termination:
    - An episode ends when the agent catches at least 5 balls, after which a new episode starts.



In [29]:
import gym
import numpy as np
from gym import spaces
from typing import Tuple

class BallCatchingEnv(gym.Env):
    """Grid environment in which an agent on the bottom row catches falling balls.

    Grid cell encoding: 0 = empty, 1 = red ball, 2 = blue ball, 255 = agent.
    Catching a red ball adds 10 to the score, a blue ball adds 20.
    Each ball is tracked as a mutable ``[row, col, ball_type]`` list.
    """
    metadata = {'render.modes': ['human']}

    # Number of vertical strips the grid is divided into for snapshots
    NUM_PARTS = 3

    def __init__(self, grid_size: Tuple[int, int]=(5, 12), max_balls: int=7,
                 catch_target: int=2):
        """Create the environment and put it in its initial state.

        Args:
            grid_size: ``(height, width)`` of the grid.
            max_balls: inclusive upper bound on the balls spawned per step.
            catch_target: number of caught balls that ends an episode.
        """
        super().__init__()
        self.grid_height, self.grid_width = grid_size
        self.max_balls = max_balls
        self.catch_target = catch_target

        # Actions: 0 = Stay, 1 = Left, 2 = Right
        self.action_space = spaces.Discrete(3)

        # Observation: the full grid, using the cell encoding described above
        self.observation_space = spaces.Box(low=0, high=255,
                                            shape=(self.grid_height, self.grid_width),
                                            dtype=np.uint8)

        self.agent_pos = self.grid_width // 2  # Start the agent in the middle of the grid
        self.score = 0
        self.caught_balls = 0
        self.balls = []
        self.grid = None
        self.reset()

    def _redraw_grid(self):
        """Rebuild ``self.grid`` from the tracked balls and the agent position."""
        self.grid = np.zeros((self.grid_height, self.grid_width), dtype=np.uint8)
        for row, col, ball_type in self.balls:
            self.grid[row, col] = ball_type
        # The agent is drawn last and always lives on the bottom row
        self.grid[self.grid_height - 1, self.agent_pos] = 255

    def reset(self):
        """Start a new episode and return a copy of the initial grid."""
        self.agent_pos = self.grid_width // 2
        self.balls = []
        self.score = 0
        self.caught_balls = 0
        self._redraw_grid()
        return self.grid.copy()

    def step(self, action):
        """Advance the environment by one timestep.

        Args:
            action: 0 = stay, 1 = move left, 2 = move right.

        Returns:
            ``(observation, score, done, info)`` where ``observation`` is a copy
            of the grid, ``score`` is the score accumulated so far in the
            episode, ``done`` is True once ``catch_target`` balls were caught
            and ``info`` is an empty dict.
        """
        assert self.action_space.contains(action), "%r (%s) invalid" % (action, type(action))

        # Move the agent; moves past either edge are ignored
        if action == 1 and self.agent_pos > 0:
            self.agent_pos -= 1
        elif action == 2 and self.agent_pos < self.grid_width - 1:
            self.agent_pos += 1

        # Spawn between 1 and max_balls new balls in random columns of row 0
        num_new_balls = np.random.randint(1, self.max_balls + 1)
        for _ in range(num_new_balls):
            ball_col = np.random.randint(0, self.grid_width)
            ball_type = np.random.choice([1, 2])  # 1 for red, 2 for blue
            self.balls.append([0, ball_col, ball_type])

        # Drop every ball (including the ones just spawned) by one row.  Balls
        # that reach the bottom row are scored if they land on the agent and
        # are discarded either way; the survivors are collected in one pass
        # instead of calling list.remove() per ball.
        bottom_row = self.grid_height - 1
        still_falling = []
        for ball in self.balls:
            ball[0] += 1
            if ball[0] != bottom_row:
                still_falling.append(ball)
            elif ball[1] == self.agent_pos:
                self.caught_balls += 1
                self.score += 20 if ball[2] == 2 else 10
        self.balls = still_falling

        self._redraw_grid()

        done = self.caught_balls >= self.catch_target

        info = {}

        # NOTE(review): the second element is the cumulative score, not the
        # reward earned in this step — confirm this is what learners expect.
        return self.grid.copy(), self.score, done, info

    def get_current_part_snapshot(self):
        """Return the vertical strip of the grid that contains the agent.

        The grid is split into ``NUM_PARTS`` strips of ``grid_width // 3``
        columns.  When the width is not a multiple of three the last strip
        also takes the leftover columns, so the part index never exceeds 2.

        Returns:
            ``(snapshot, part_index)``: a copy of the strip (all rows) and the
            zero-based index of the strip.
        """
        last_part = self.NUM_PARTS - 1
        # max(1, ...) keeps grids narrower than three columns from dividing by zero
        part_width = max(1, self.grid_width // self.NUM_PARTS)

        current_part = min(self.agent_pos // part_width, last_part)

        start_col = current_part * part_width
        if current_part == last_part:
            end_col = self.grid_width
        else:
            end_col = start_col + part_width

        current_snapshot = self.grid[:, start_col:end_col]
        return current_snapshot.copy(), current_part

    def render(self, mode='human'):
        """Print the grid followed by the agent position and the score."""
        if mode == 'human':
            for row in self.grid:
                print(' '.join(str(cell).rjust(3) for cell in row))
            print(f"Agent Position: {self.agent_pos} | Score: {self.score}\n")


In [27]:
class BallCatchingAgent:
    """Heuristic agent that follows the highest-scoring path inside its grid part."""

    def __init__(self, env):
        self.env = env

    def calculate_steps_to_next_grid(self, agent_col, current_part_index):
        """Return how many moves it takes to leave the current third of the grid.

        Args:
            agent_col: agent column in full-grid coordinates (the part offset
                is subtracted from it below).
            current_part_index: 0, 1 or 2.

        Returns:
            ``(steps, direction)`` for the outer parts (0 goes right, 2 goes
            left); for the middle part
            ``(steps_left, 'L', steps_right, 'R')``.

        Raises:
            ValueError: if ``current_part_index`` is not 0, 1 or 2.
        """
        part_width = self.env.grid_width // 3
        steps_to_left_border = agent_col - (current_part_index * part_width)
        steps_to_right_border = ((current_part_index + 1) * part_width - 1) - agent_col

        # One extra move is needed to step from the border cell into the neighbouring part
        if current_part_index == 0:
            return steps_to_right_border + 1, 'R'
        if current_part_index == 1:
            return steps_to_left_border + 1, 'L', steps_to_right_border + 1, 'R'
        if current_part_index == 2:
            return steps_to_left_border + 1, 'L'
        raise ValueError("Invalid grid part index")

    def find_best_path_and_score(self, grid, start_row, start_col):
        """Find the upward path from the start cell that collects the most points.

        Every move goes up one row and either keeps the column ('S') or shifts
        it by one ('L' / 'R').  Cell values 1 and 2 are worth 10 and 20 points;
        everything else is worth 0.  A path stops as soon as no continuation
        adds points, and ties are resolved in the order S, L, R.

        The search is a row-by-row dynamic programme, so its cost is
        proportional to the number of cells rather than exponential in the
        number of rows.

        Returns:
            ``(best_path, total_score)`` where ``best_path`` is a list of
            'S' / 'L' / 'R' actions.
        """
        cols = len(grid[0])
        if start_row < 0 or start_col < 0 or start_col >= cols:
            return [], 0

        moves = ((0, 'S'), (-1, 'L'), (1, 'R'))
        point_values = {1: 10, 2: 20, 255: 0}

        gains_above = []  # best score obtainable from each cell of the previous (upper) row
        choices = []      # choices[row][col] = (action, next_col), or None when the path stops
        for row in range(start_row + 1):
            row_gains = []
            row_choices = []
            for col in range(cols):
                best_extra = 0
                best_move = None
                if row > 0:
                    for d_col, action in moves:
                        next_col = col + d_col
                        # Strict '>' keeps the first best move in S, L, R order
                        if 0 <= next_col < cols and gains_above[next_col] > best_extra:
                            best_extra = gains_above[next_col]
                            best_move = (action, next_col)
                row_gains.append(point_values.get(grid[row][col], 0) + best_extra)
                row_choices.append(best_move)
            gains_above = row_gains
            choices.append(row_choices)

        # Walk the recorded choices from the start cell upwards
        best_path = []
        row, col = start_row, start_col
        while choices[row][col] is not None:
            action, col = choices[row][col]
            best_path.append(action)
            row -= 1
        return best_path, gains_above[start_col]

    def find_agent_position(self, snapshot):
        """Return ``(row, col)`` of the agent marker (255) on the bottom row, or None."""
        for col in range(len(snapshot[0])):
            if snapshot[-1][col] == 255:
                return len(snapshot) - 1, col
        return None

    def calculate_points(self, grid, start_row, start_col, steps, direction):
        """Placeholder for the gained/lost points estimate; not implemented yet.

        Currently returns None.
        """
        # TODO: implement the gained/lost points computation.
        return None

    def act(self, state):
        """Choose the next action (0 = stay, 1 = left, 2 = right).

        The action is the first move of the best path found in the grid part
        the agent currently occupies.  A random action is returned when no
        path scores any points or when the agent cannot be located in the
        snapshot.  ``state`` is not used; the snapshot is read from the
        environment directly.
        """
        # Get the current grid's snapshot
        snapshot, current_part_index = self.env.get_current_part_snapshot()

        position = self.find_agent_position(snapshot)
        if position is None:
            return self.env.action_space.sample()
        agent_row, agent_col = position

        # Distance to the neighbouring part(s).  agent_col is relative to the
        # snapshot, while calculate_steps_to_next_grid needs the column in the
        # full grid, so the environment's own position is passed.  The result
        # is not used in the decision yet.
        steps_info = self.calculate_steps_to_next_grid(self.env.agent_pos, current_part_index)

        best_path, total_score = self.find_best_path_and_score(snapshot, agent_row, agent_col)

        print("========================================================================")
        print("------------------ Snapshot---------------------------")
        print(snapshot)
        print("-------------------Best Path --------------------------")
        print(best_path)
        print("-------------------Score of this path---------------------------")
        print(total_score)
        print("=============================== X ======================================")

        # Decide the next action based on the first step in the best path
        if best_path:
            return {'S': 0, 'L': 1, 'R': 2}.get(best_path[0])
        # No scoring path in sight: take a random action
        return self.env.action_space.sample()

    

In [None]:
# Example usage: play one episode, rendering the grid after every step
env = BallCatchingEnv(grid_size=(6, 12), max_balls=5)
agent = BallCatchingAgent(env)

state = env.reset()
done = False

while True:
    action = agent.act(state)
    state, reward, done, _ = env.step(action)
    env.render()
    if done:
        # The environment reported that the episode is over
        break

---------------------------------------------------------------------------

**Function for finding the best path TRIAL**


In [19]:
def find_best_path_and_score(grid, start_row, start_col):
    """Find the upward path from the start cell that collects the most points.

    Every move goes up one row and either keeps the column ('S') or shifts it
    by one ('L' / 'R').  Cell values 1 and 2 are worth 10 and 20 points;
    everything else (including the agent marker 255) is worth 0.  A path
    stops as soon as no continuation adds points, and ties are resolved in
    the order S, L, R.

    The search is a row-by-row dynamic programme, so its cost is proportional
    to the number of cells rather than exponential in the number of rows.

    Returns:
        ``(best_path, total_score)`` where ``best_path`` is a list of
        'S' / 'L' / 'R' actions.
    """
    cols = len(grid[0])
    if start_row < 0 or start_col < 0 or start_col >= cols:
        return [], 0

    # Column change and action label of each move; every move goes up one row
    moves = ((0, 'S'), (-1, 'L'), (1, 'R'))

    # Map grid values to their scores; the agent's own cell adds nothing
    point_values = {1: 10, 2: 20, 255: 0}

    gains_above = []  # best score obtainable from each cell of the previous (upper) row
    choices = []      # choices[row][col] = (action, next_col), or None when the path stops
    for row in range(start_row + 1):
        row_gains = []
        row_choices = []
        for col in range(cols):
            best_extra = 0
            best_move = None
            if row > 0:
                for d_col, action in moves:
                    next_col = col + d_col
                    # Strict '>' keeps the first best move in S, L, R order
                    if 0 <= next_col < cols and gains_above[next_col] > best_extra:
                        best_extra = gains_above[next_col]
                        best_move = (action, next_col)
            row_gains.append(point_values.get(grid[row][col], 0) + best_extra)
            row_choices.append(best_move)
        gains_above = row_gains
        choices.append(row_choices)

    # Walk the recorded choices from the start cell upwards
    best_path = []
    row, col = start_row, start_col
    while choices[row][col] is not None:
        action, col = choices[row][col]
        best_path.append(action)
        row -= 1
    return best_path, gains_above[start_col]

# Sample snapshot: 1 = red ball, 2 = blue ball, 255 = agent on the bottom row
grid = [
    [0, 0, 0, 0],
    [1, 0, 1, 2],
    [0, 1, 1, 0],
    [0, 1, 0, 0],
    [0, 1, 2, 0],
    [0, 0, 255, 0],
]

# Cell holding the agent marker
start_row = 5
start_col = 2

best_path, total_score = find_best_path_and_score(grid, start_row, start_col)

print("Best path:", best_path)
print("Total score:", total_score)


Best path: ['S', 'L', 'R', 'R']
Total score: 60


-----------------------------------------

**Neural Network Play**

Feature Vector: The feature vector for training the neural network online should include:
- Total potential score from the best path in the current grid.
- Number of steps to the next grid.
- Guaranteed points from moving towards the next grid.
- Guaranteed losses from moving towards the next grid.
- Current part index to provide context about the spatial location within the environment.

Reward Function: The reward function should reflect the agent's performance in terms of both immediate and future rewards. Consider including penalties for guaranteed losses and rewards for guaranteed points in the reward calculation to encourage strategic decision-making.

In [None]:
class BallCatchingAgent:
    """Sketch of an agent that lets a neural network decide when to change grid part.

    The helper methods used by ``act`` (``find_agent_position``,
    ``find_best_path_and_score``, ``calculate_steps_to_next_grid``,
    ``decide_movement_to_next_grid``, ``decide_action_from_best_path``) are
    not defined in this class and still need to be provided.
    """

    def __init__(self, env, neural_network):
        self.env = env
        self.neural_network = neural_network  # Assuming the NN is already trained and ready

    def act(self, state=None):
        """Return the next action.

        ``state`` is optional and unused; it is accepted so the agent can be
        driven with ``agent.act(state)`` as well as ``agent.act()``.
        """
        snapshot, current_part_index = self.env.get_current_part_snapshot()
        agent_row, agent_col = self.find_agent_position(snapshot)
        best_path, potential_points = self.find_best_path_and_score(snapshot, agent_row, agent_col)

        # Argument order is (agent_col, current_part_index).
        # NOTE(review): agent_col is relative to the snapshot here; confirm
        # whether the helper expects the column in the full grid instead.
        steps_to_next_grid = self.calculate_steps_to_next_grid(agent_col, current_part_index)

        # NN input might include potential_points, steps_to_next_grid, current_part_index, etc.
        nn_input = [potential_points, steps_to_next_grid, current_part_index]
        move_to_next_grid = self.neural_network.predict(nn_input)  # Assuming a method to get NN decision

        if move_to_next_grid:
            # Decide direction based on current_part_index and potentially best_path
            next_action = self.decide_movement_to_next_grid(current_part_index, best_path)
        else:
            # Follow the best immediate path
            next_action = self.decide_action_from_best_path(best_path)

        return next_action

    # Methods like calculate_steps_to_next_grid, decide_movement_to_next_grid, decide_action_from_best_path would need to be implemented


In [82]:
def calculate_points(grid, start_row, start_col, steps, direction):
    """Estimate the points gained and forgone by moving diagonally for ``steps`` moves.

    On every step the agent is taken to move up one row and one column in
    ``direction``.  The cell it lands on counts towards the gained points.
    The cells it could have reached instead, straight above its previous
    column and one column further away from the movement direction, are the
    alternatives; the best of them counts towards the lost points.

    Args:
        grid: 2D sequence of cell values (1 is worth 10 points, 2 is worth 20).
        start_row: row of the agent.
        start_col: column of the agent.
        steps: number of diagonal moves to evaluate.
        direction: 'L' for left or 'R' for right.

    Returns:
        ``(gained_points, lost_points)``.

    Raises:
        ValueError: if ``direction`` is neither 'L' nor 'R'.
    """
    if direction == 'L':
        col_sign = -1
    elif direction == 'R':
        col_sign = 1
    else:
        raise ValueError("Direction must be 'L' for left or 'R' for right")

    point_values = {1: 10, 2: 20}
    cols = len(grid[0])
    gained_points = 0
    lost_points = 0

    for step in range(1, steps + 1):
        row = start_row - step
        if row < 0:
            # Ran off the top of the grid; nothing further to gain or lose
            break

        # Cell reached by following the diagonal path
        path_col = start_col + col_sign * step
        if 0 <= path_col < cols:
            gained_points += point_values.get(grid[row][path_col], 0)

        # Column occupied before this move, and its neighbour on the side
        # opposite to the movement direction
        prev_col = path_col - col_sign
        alternatives = [
            point_values.get(grid[row][col], 0)
            for col in (prev_col, prev_col - col_sign)
            if 0 <= col < cols
        ]
        if alternatives:
            lost_points += max(alternatives)

    return gained_points, lost_points

# Example grid: 1 = red ball, 2 = blue ball, 255 = agent on the bottom row
grid = [
    [0, 0, 0, 0],
    [1, 0, 1, 2],
    [1, 1, 1, 0],
    [0, 2, 0, 0],
    [0, 1, 2, 0],
    [0, 0, 255, 0],
]

# Starting cell of the agent
start_row = 5
start_col = 2

# The agent takes two steps to the left
steps = 2
direction = 'L'

gained_points, lost_points = calculate_points(grid, start_row, start_col, steps, direction)

print(f"Guaranteed points gained: {gained_points}")
print(f"Guaranteed points lost: {lost_points}")


Guaranteed points gained: 10
Guaranteed points lost: 40


In [74]:
def calculate_points_directional(grid, start_row, start_col, steps, direction):
    """Tally gained and lost points for a diagonal walk, printing each cell inspected.

    For every step the walk moves up one row and one column in ``direction``
    ('L' or 'R').  The cell reached adds to the gained points when its column
    is inside the grid.  Two cells in the same row, starting at an anchor
    column and extending one column in the movement direction, are compared,
    and the points of the larger cell value add to the lost points.

    Returns:
        ``(gained_points, lost_points)``.

    Raises:
        ValueError: if ``direction`` is neither 'L' nor 'R' (raised on the
        first step, after its "Step" line has been printed).
    """
    score_of = {1: 10, 2: 20}
    n_cols = len(grid[0])
    total_gain = 0
    total_loss = 0

    for step in range(1, steps + 1):
        print("Step = ",step)
        if direction not in ('L', 'R'):
            raise ValueError("Direction must be 'L' for left or 'R' for right")
        sign = -1 if direction == 'L' else 1

        row = start_row - step
        target_col = start_col + sign * step

        # Gained points: the cell the walk lands on
        print("GAINS :- Printing (row,col,points)")
        if 0 <= target_col < n_cols:
            landed = grid[row][target_col]
            print(row, target_col, landed)
            total_gain += score_of.get(landed, 0)

        # Lost points: anchor column is the start column on the first step,
        # afterwards the column one step behind the landing cell
        print("LOSS :- Printing (row, col, points)")
        anchor = start_col if step == 1 else target_col - sign
        candidates = []
        for offset in (0, 1):
            col = anchor + sign * offset
            if row >= 0 and 0 <= col < n_cols:
                seen = grid[row][col]
                print(row,col,seen)
                candidates.append(seen)

        if candidates:
            total_loss += score_of.get(max(candidates), 0)

    return total_gain, total_loss

# Example usage: 1 = red ball, 2 = blue ball, 255 = agent marker
grid = [
    [0, 0, 0, 0],
    [1, 0, 1, 2],
    [1, 1, 1, 0],
    [0, 0, 0, 0],
    [0, 1, 2, 0],
    [0, 0, 255, 0],
]

# Starting cell for the walk
start_row = 5
start_col = 1

steps = 2
direction = 'L'  # 'L' for left, 'R' for right

gained_points, lost_points = calculate_points_directional(grid, start_row, start_col, steps, direction)

print(f"Direction: {direction}")
print(f"Guaranteed points gained: {gained_points}")
print(f"Guaranteed points lost: {lost_points}")


Step =  1
GAINS :- Printing (row,col,points)
4 0 0
LOSS :- Printing (row, col, points)
4 1 1
4 0 0
Step =  2
GAINS :- Printing (row,col,points)
LOSS :- Printing (row, col, points)
3 0 0
Direction: L
Guaranteed points gained: 0
Guaranteed points lost: 10
