In [None]:
!pip install import-ipynb

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
from PIL import Image
#from transformers import DPTFeatureExtractor, DPTForDepthEstimation
#from ultralytics import YOLO
import gym
from gym import spaces
from collections import namedtuple, deque
import random

In [None]:
import import_ipynb
import objDet
import dpt_map

In [None]:
# Define the DQN model
class DQN(nn.Module):
    """Fully connected network with two ReLU hidden layers (128 and 64 units).

    Args:
        input_size: Length of the flat input vector.
        output_size: Number of values produced by the final linear layer.
    """

    def __init__(self, input_size, output_size):
        super().__init__()
        first_width, second_width = 128, 64
        # Attribute names are kept as fc1/fc2/fc3 so state_dict keys stay stable.
        self.fc1 = nn.Linear(input_size, first_width)
        self.fc2 = nn.Linear(first_width, second_width)
        self.fc3 = nn.Linear(second_width, output_size)

    def forward(self, x):
        """Return the raw (un-activated) output of the last layer for ``x``."""
        hidden = x
        for layer in (self.fc1, self.fc2):
            hidden = layer(hidden).relu()
        return self.fc3(hidden)

# Define the ReplayMemory
# One stored experience; field order is the positional order expected by push().
Transition = namedtuple(
    'Transition',
    ['state', 'action', 'next_state', 'reward', 'done'],
)


class ReplayMemory:
    """Fixed-capacity buffer of ``Transition`` records.

    Once ``capacity`` records are stored, each new record evicts the oldest one.
    """

    def __init__(self, capacity):
        self.memory = deque(maxlen=capacity)

    def push(self, *args):
        """Store one transition; ``args`` are the ``Transition`` fields in order."""
        record = Transition(*args)
        self.memory.append(record)

    def sample(self, batch_size):
        """Return ``batch_size`` distinct stored transitions chosen at random."""
        return random.sample(self.memory, k=batch_size)

    def __len__(self):
        """Return the number of transitions currently stored."""
        return len(self.memory)

# Environment class
class AutonomousVehicleEnv(gym.Env):
    """Placeholder driving environment that emits random observations and rewards.

    Follows the classic Gym calling convention used by the rest of this file:
    ``reset()`` returns an observation and ``step()`` returns
    ``(observation, reward, done, info)``.

    Args:
        max_episode_steps: Number of ``step()`` calls after which ``done`` is
            reported as ``True``. Pass ``None`` to never terminate (the
            behaviour before this limit existed). Without a limit, a
            ``while not done`` training loop never finishes an episode.

    Raises:
        ValueError: If ``max_episode_steps`` is not ``None`` and is not positive.
    """

    def __init__(self, max_episode_steps=200):
        super().__init__()
        if max_episode_steps is not None and max_episode_steps <= 0:
            raise ValueError("max_episode_steps must be a positive integer or None")
        self.action_space = spaces.Discrete(4)  # stop, right, left, straight
        self.observation_space = spaces.Box(low=0, high=255, shape=(224, 640, 3), dtype=np.uint8)
        self.max_episode_steps = max_episode_steps
        self._elapsed_steps = 0  # steps taken since the last reset()

    def step(self, action):
        """Advance one step; ``action`` is currently ignored.

        Returns:
            ``(observation, reward, done, info)`` where the observation is a
            random sample of the observation space, the reward is drawn
            uniformly from [-1, 1), ``done`` becomes ``True`` once the step
            limit is reached, and ``info`` is an empty dict.
        """
        # Implement environment dynamics
        # For simplicity, we'll just return a random observation and reward
        observation = self.observation_space.sample()
        reward = np.random.uniform(-1, 1)
        self._elapsed_steps += 1
        done = (
            self.max_episode_steps is not None
            and self._elapsed_steps >= self.max_episode_steps
        )
        info = {}
        return observation, reward, done, info

    def reset(self):
        """Start a new episode and return a random initial observation."""
        self._elapsed_steps = 0
        return self.observation_space.sample()

# DQN Agent
class DQNAgent:
    """Epsilon-greedy DQN agent with a replay buffer and a target network.

    Args:
        state_size: Length of the flat state vector fed to the networks.
        action_size: Number of discrete actions (network output size).
    """

    def __init__(self, state_size, action_size):
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        self.state_size = state_size
        self.action_size = action_size
        self.memory = ReplayMemory(10000)
        self.gamma = 0.99  # discount factor applied to the bootstrapped target
        self.epsilon = 1.0  # current exploration probability
        self.epsilon_min = 0.01
        self.epsilon_decay = 0.995  # multiplicative decay applied per replay() update
        self.learning_rate = 0.001
        self.batch_size = 64

        self.policy_net = DQN(state_size, action_size).to(self.device)
        self.target_net = DQN(state_size, action_size).to(self.device)
        self.target_net.load_state_dict(self.policy_net.state_dict())
        self.target_net.eval()

        self.optimizer = optim.Adam(self.policy_net.parameters(), lr=self.learning_rate)

    def _to_float_tensor(self, values):
        """Stack ``values`` into one float32 tensor on the agent's device.

        Converting through a single NumPy array avoids PyTorch's slow
        element-by-element path for sequences of ndarrays.
        """
        return torch.as_tensor(np.asarray(values, dtype=np.float32), device=self.device)

    def act(self, state):
        """Choose an action for ``state`` using an epsilon-greedy policy.

        Returns:
            A random action index with probability ``epsilon``; otherwise the
            index of the largest policy-network output.
        """
        if random.random() <= self.epsilon:
            return random.randrange(self.action_size)
        with torch.no_grad():
            # Explicit dtype/device conversion works for any numeric array dtype
            # (observations in this file are uint8).
            state_tensor = torch.as_tensor(
                state, dtype=torch.float32, device=self.device
            ).unsqueeze(0)
            q_values = self.policy_net(state_tensor)
            return q_values.argmax().item()

    def remember(self, state, action, reward, next_state, done):
        """Store one transition in replay memory."""
        # Transition's field order is (state, action, next_state, reward, done),
        # which differs from this method's parameter order.
        self.memory.push(state, action, next_state, reward, done)

    def replay(self):
        """Run one optimisation step on a random minibatch, then decay epsilon.

        Does nothing until the memory holds at least ``batch_size`` transitions.
        """
        if len(self.memory) < self.batch_size:
            return

        transitions = self.memory.sample(self.batch_size)
        batch = Transition(*zip(*transitions))

        state_batch = self._to_float_tensor(batch.state)
        next_state_batch = self._to_float_tensor(batch.next_state)
        reward_batch = self._to_float_tensor(batch.reward)
        done_batch = self._to_float_tensor(batch.done)
        action_batch = torch.as_tensor(
            batch.action, dtype=torch.int64, device=self.device
        ).unsqueeze(1)

        # Q(s, a) for the actions that were actually taken.
        q_values = self.policy_net(state_batch).gather(1, action_batch)
        # Bootstrapped target from the frozen network; no gradient is needed.
        with torch.no_grad():
            next_q_values = self.target_net(next_state_batch).max(1)[0]
        # (1 - done) removes the bootstrap term for terminal transitions.
        expected_q_values = reward_batch + self.gamma * next_q_values * (1 - done_batch)

        loss = nn.MSELoss()(q_values, expected_q_values.unsqueeze(1))
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()

        if self.epsilon > self.epsilon_min:
            self.epsilon *= self.epsilon_decay

    def update_target_network(self):
        """Copy the policy network's weights into the target network."""
        self.target_net.load_state_dict(self.policy_net.state_dict())

# Main function
def main(num_episodes=10, max_steps_per_episode=200, target_update_interval=10):
    """Train a DQN agent on ``AutonomousVehicleEnv`` with a shaped reward.

    Each step, the new observation is converted to an image, passed to
    ``dpt_map.findDepth`` and ``detect_objects``, and the environment reward is
    adjusted from their results before the transition is stored and replayed.

    Args:
        num_episodes: Number of training episodes to run.
        max_steps_per_episode: Upper bound on steps per episode, so the loop
            ends even if the environment never reports ``done``. ``None``
            disables the bound.
        target_update_interval: The target network is synchronised after every
            episode whose index is a multiple of this value.
    """
    # Load models
    object_detection_model = "runs/detect/train/weights/best.pt"
    # NOTE(review): `yolo` is not used below; the construction is kept because
    # its side effects (if any) cannot be determined from this file.
    yolo = objDet.YOLOTrainerDetector()

    # Create environment
    env = AutonomousVehicleEnv()

    # Create agent; sizes come from the env spaces so they cannot drift apart.
    obs_shape = env.observation_space.shape
    state_size = int(np.prod(obs_shape))  # Flattened image size
    action_size = env.action_space.n
    agent = DQNAgent(state_size, action_size)

    # Training loop
    for episode in range(num_episodes):
        state = env.reset().flatten()  # Flatten the state
        total_reward = 0
        done = False
        steps = 0

        while not done and (max_steps_per_episode is None or steps < max_steps_per_episode):
            action = agent.act(state)
            observation, reward, done, _ = env.step(action)
            next_state = observation.flatten()  # Flatten the next state
            steps += 1

            # Convert observation to image (already image-shaped; no reshape needed)
            image = Image.fromarray(observation.astype('uint8'))

            # Predict depth
            depth_map = dpt_map.findDepth(image)

            # Detect objects
            # NOTE(review): `detect_objects` is neither defined nor imported in
            # the visible part of this file; it must be provided elsewhere or
            # this line raises NameError.
            detected_objects = detect_objects(image, object_detection_model)

            # Use depth_map and detected_objects to modify the reward
            # This is a simple example and should be replaced with more sophisticated logic
            if np.min(depth_map) < 10:  # If very close to an obstacle
                reward -= 1
            if len(detected_objects) > 0:  # If objects detected
                reward += 5

            agent.remember(state, action, reward, next_state, done)
            agent.replay()

            state = next_state
            total_reward += reward

        if episode % target_update_interval == 0:
            agent.update_target_network()

        print(f"Episode {episode + 1}, Total Reward: {total_reward}, Epsilon: {agent.epsilon:.2f}")

# Start training only when this file is executed directly, not when it is imported.
if __name__ == "__main__":
    main()
