In [None]:
# !pip install pygame
# install requirements.txt
# !conda install pytorch torchvision cudatoolkit

# !pip install -r requirements.txt

In [None]:
!nvcc --version

In [None]:
import gym
from gym import spaces
import numpy as np
import pygame

In [None]:
class CooperativeGameEnv(gym.Env):
    def __init__(self, grid_size=(5, 5)):
        super(CooperativeGameEnv, self).__init__()
        self.grid_size = grid_size
        self.blue_target = None
        self.red_target = None
        self.blue_player = None
        self.red_player = None

        self.action_space = spaces.Discrete(2)  # Up, Down, Left, Right
                #observation_space and the distance between the player and the target
        self.observation_space = spaces.Box(low=0, high=1, shape=(8,), dtype=np.float32)
        self.window_size = (300, 300)
        self.cell_size = (self.window_size[0] // self.grid_size[0], self.window_size[1] // self.grid_size[1])

        pygame.init()
        self.screen = pygame.display.set_mode(self.window_size)
        pygame.display.set_caption("Cooperative Game")
        self.clock = pygame.time.Clock()
    # Function to randomly reset the target position
    def transformPos(self,x,y):
        canvas_width, canvas_height = self.window_size
        # target_size = self.cell_size[0]
        # target_x = np.random.randint(0, canvas_width - target_size + 1)
        # target_y = np.random.randint(0, canvas_height - target_size + 1)
        # return target_x, target_y
        return x * canvas_width, y * canvas_height
    def reset(self):
        self.blue_target = np.random.rand(2)* 4
        self.red_target = np.random.rand(2)* 4
        self.blue_player = np.random.rand(2)* 4
        self.red_player = np.random.rand(2) * 4
        # Generate the red target while it is too close to the blue target
        while True:
            self.red_target = np.random.rand(2)
            red_target_grid = self.transformPos(self.red_target[0],self.red_target[1])
            d=np.linalg.norm(red_target_grid - self.blue_target)
            # multplayer the position with his grid size to get the position in the grid
            if d > .5:
                break
        return self._get_observation()

    def step(self, actions):
        self._update_player_positions(actions)
        done = self._check_collision()
        reward = self._calculate_reward()

        return self._get_observation(), reward, done, {}

    def _get_observation(self):
        # return angle between the player and the target
        # and the distance between the player and the target
        return np.array([self.blue_player[0], self.blue_player[1], self.red_player[0], self.red_player[1], self.blue_target[0], self.blue_target[1], self.red_target[0], self.red_target[1]])

    def _update_player_positions(self, actions):
        blue_action, red_action = actions

        if blue_action == 0:  # Up
            self.blue_player[1] = min(self.blue_player[1] + 1, self.grid_size[1] - 1)
        elif blue_action == 1:  # Down
            self.blue_player[1] = max(self.blue_player[1] - 1, 0)
        elif blue_action == 2:  # Left
            self.blue_player[0] = max(self.blue_player[0] - 1, 0)
        elif blue_action == 3:  # Right
            self.blue_player[0] = min(self.blue_player[0] + 1, self.grid_size[0] - 1)

        if red_action == 0:  # Up
            self.red_player[1] = min(self.red_player[1] + 1, self.grid_size[1] - 1)
        elif red_action == 1:  # Down
            self.red_player[1] = max(self.red_player[1] - 1, 0)
        elif red_action == 2:  # Left
            self.red_player[0] = max(self.red_player[0] - 1, 0)
        elif red_action == 3:  # Right
            self.red_player[0] = min(self.red_player[0] + 1, self.grid_size[0] - 1)

    def _check_collision(self):
        blue_distance = np.linalg.norm(self.blue_player - self.blue_target)
        red_distance = np.linalg.norm(self.red_player - self.red_target)
        return blue_distance == 0 and red_distance == 0

    def _calculate_reward(self):
        blue_distance = np.linalg.norm(self.blue_player - self.blue_target)
        red_distance = np.linalg.norm(self.red_player - self.red_target)
        mean_distance = np.sqrt((blue_distance**2 + red_distance**2) / 2)
        return -mean_distance

    def win(self):
        # take the player position and the target position
        self.blue_player = self.blue_target
        self.red_player = self.red_target
    
    def render(self):
        # grid = np.zeros(self.grid_size)
        # grid[int(self.blue_player[0]), int(self.blue_player[1])] = 1
        # grid[int(self.red_player[0]), int(self.red_player[1])] = 2
        # grid[int(self.blue_target[0]), int(self.blue_target[1])] = 3
        # grid[int(self.red_target[0]), int(self.red_target[1])] = 4
        # print(grid)
        self.screen.fill((255, 255, 255))

        for x in range(self.grid_size[0]):
            for y in range(self.grid_size[1]):
                rect = pygame.Rect(x * self.cell_size[0], y * self.cell_size[1], self.cell_size[0], self.cell_size[1])
                pygame.draw.rect(self.screen, (200, 200, 200), rect)

        blue_player_rect = pygame.Rect(self.blue_player[0] * self.cell_size[0], self.blue_player[1] * self.cell_size[1], self.cell_size[0], self.cell_size[1])
        pygame.draw.rect(self.screen, (0, 0, 255), blue_player_rect)

        red_player_rect = pygame.Rect(self.red_player[0] * self.cell_size[0], self.red_player[1] * self.cell_size[1], self.cell_size[0], self.cell_size[1])
        pygame.draw.rect(self.screen, (255, 0, 0), red_player_rect)

        blue_target_rect = pygame.Rect(self.blue_target[0] * self.cell_size[0], self.blue_target[1] * self.cell_size[1], self.cell_size[0], self.cell_size[1])
        pygame.draw.rect(self.screen, (0, 0, 255), blue_target_rect)

        red_target_rect = pygame.Rect(self.red_target[0] * self.cell_size[0], self.red_target[1] * self.cell_size[1], self.cell_size[0], self.cell_size[1])
        pygame.draw.rect(self.screen, (255, 0, 0), red_target_rect)

        pygame.display.flip()
        self.clock.tick(30)



In [None]:
# !pip install matplotlib
# !pip install opencv-python

In [None]:
import matplotlib.pyplot as plt
from matplotlib.backends.backend_agg import FigureCanvasAgg as FigureCanvas

import cv2

In [None]:
env = CooperativeGameEnv()


In [29]:
# # Usage example
env = CooperativeGameEnv()
obs = env.reset()
done = False
rewards = []
fig=plt.figure()
steps=0
while not done:
    env.render()
    blue_action = np.random.randint(0, 4)
    red_action = np.random.randint(0, 4)
    obs, reward, done, _ = env.step([blue_action, red_action])
    rewards.append(reward)
    rr=rewards[-100:]
    plt.plot( rr)
    canvas= FigureCanvas(fig)
    canvas.draw()
    image=  np.fromstring(canvas.tostring_rgb(), dtype='uint8')
    image=image.reshape(fig.canvas.get_width_height()[::-1] + (3,))
    cv2.imshow('image',image)
    cv2.waitKey(1)
    plt.clf()
    if steps>120:
        env.win()
    steps+=1
env.render()



<Figure size 640x480 with 0 Axes>

In [30]:
import random
import torch
import torch.nn as nn
import torch.optim as optim

In [31]:
torch.cuda.is_available()

True

In [32]:
env.action_space.sample()

0

In [33]:
# Define the Deep Q-Network (DQN) model
class DQN(nn.Module):
    def __init__(self, state_size, action_size):
        super(DQN, self).__init__()
        self.fc1 = nn.Linear(state_size, 64)
        self.fc2 = nn.Linear(64, 64)
        self.fc3 = nn.Linear(64, action_size)

    def forward(self, x):
        x = torch.relu(self.fc1(x))
        x = torch.relu(self.fc2(x))
        x = self.fc3(x)
        return x

# Define the replay memory buffer
class ReplayMemory:
    def __init__(self, capacity):
        self.capacity = capacity
        self.memory = []

    def push(self, transition):
        if len(self.memory) >= self.capacity:
            self.memory.pop(0)
        self.memory.append(transition)

    def sample(self, batch_size):
        return random.sample(self.memory, batch_size)

# Define the Deep Q-Network Agent
class DQNAgent:
    def __init__(self, state_size, action_size, lr, gamma, epsilon, epsilon_decay):
        self.state_size = state_size
        self.action_size = action_size
        self.lr = lr
        self.gamma = gamma
        self.epsilon = epsilon
        self.epsilon_decay = epsilon_decay

        self.device = torch.device( "cpu")
        # self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        self.policy_net = DQN(state_size, action_size).to(self.device)
        self.target_net = DQN(state_size, action_size).to(self.device)
        self.target_net.load_state_dict(self.policy_net.state_dict())
        self.target_net.eval()

        self.optimizer = optim.Adam(self.policy_net.parameters(), lr=lr)
        self.criterion = nn.MSELoss()

        self.memory = ReplayMemory(capacity=10000)

    def select_action(self, state):
        if np.random.rand() < self.epsilon:
            blue_action = np.random.randint(0, 4)
            red_action = np.random.randint(0, 4)
            actions=[blue_action, red_action]
            return actions
        with torch.no_grad():
            state = torch.tensor(state, dtype=torch.float32).unsqueeze(0).to(self.device)
            q_values = self.policy_net(state)
            return q_values.argmax().item()

    def train(self, batch_size):
        if len(self.memory.memory) < batch_size:
            return

        transitions = self.memory.sample(batch_size)
        batch = list(zip(*transitions))
        state_batch = torch.tensor(batch[0], dtype=torch.float32).to(self.device)
        action_batch = torch.tensor(batch[1], dtype=torch.int64).to(self.device)
        reward_batch = torch.tensor(batch[2], dtype=torch.float32).unsqueeze(1).to(self.device)
        next_state_batch = torch.tensor(batch[3], dtype=torch.float32).to(self.device)
        done_batch = torch.tensor(batch[4], dtype=torch.float32).unsqueeze(1).to(self.device)

        current_q_values = self.policy_net(state_batch).gather(1, action_batch)
        next_q_values = self.target_net(next_state_batch).max(1)[0].unsqueeze(1)
        target_q_values = reward_batch + (1 - done_batch) * self.gamma * next_q_values

        loss = self.criterion(current_q_values, target_q_values)

        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()

        self.epsilon *= self.epsilon_decay

    def update_target_network(self):        
        self.target_net.load_state_dict(self.policy_net.state_dict())

# Define the main training loop
def train_dqn(env, agent, num_episodes, batch_size):
    for episode in range(num_episodes):
        state = env.reset()
        done = False
        total_reward = 0

        while not done:
            action = agent.select_action(state)
            next_state, reward, done, _ = env.step(action)
            agent.memory.push((state, action, reward, next_state, done))
            state = next_state
            total_reward += reward

            agent.train(batch_size)
            agent.update_target_network()

        print(f"Episode: {episode+1}, Total Reward: {total_reward}")




In [34]:
obs = env.reset()
state_size = env.observation_space.shape[0]
action_size = env.action_space.n
lr = 0.001
gamma = 0.99
epsilon = 1.0
epsilon_decay = 0.999
batch_size = 64
num_episodes = 1000


In [35]:
agent = DQNAgent(state_size, action_size, lr, gamma, epsilon, epsilon_decay)

In [36]:
agent.device

device(type='cpu')

In [37]:
for episode in range(num_episodes):
    state = env.reset()
    done = False
    total_reward = 0

    while not done:
        action = agent.select_action(state)
        next_state, reward, done, _ = env.step(action)
        agent.memory.push((state, action, reward, next_state, done))
        state = next_state
        total_reward += reward

        agent.train(batch_size)
        agent.update_target_network()

    print(f"Episode: {episode+1}, Total Reward: {total_reward}")

RuntimeError: index 2 is out of bounds for dimension 1 with size 2

In [None]:
# !pip freeze > requirements.txt
