# In [None]:
# Combined and Optimized RL Parking Allocation Code

import numpy as np
import gymnasium as gym
from gymnasium import spaces
import matplotlib.pyplot as plt
from collections import deque
import torch
import torch.nn as nn
import torch.optim as optim
import random

# -------------------------
# ENVIRONMENT DEFINITION
# -------------------------
class ParkingEnv(gym.Env):
    """Parking-lot allocation environment on a 15-minute time grid.

    On every step the agent either assigns the vehicle at the head of the
    queue to a spot (actions ``0 .. num_spots - 1``) or waits (action
    ``num_spots``). Parked vehicles stay for a random 30-120 minutes, new
    vehicles arrive as a Poisson draw whose rate is higher during peak
    hours, and the queue length is capped at ``max_queue``.

    Observations are dicts with keys ``'spots'`` (0/1 occupancy array),
    ``'queue'`` (current queue length) and ``'hour'`` (hour of day, 0-23).

    Args:
        num_spots: Number of parking spots.
        max_queue: Maximum number of vehicles that can wait in the queue.
        peak_hours: Iterable of ``(start, end)`` hour pairs; an hour ``h`` is
            peak when ``start <= h < end`` for any pair.
        max_steps: Episode length in steps.
    """

    START_HOUR = 8      # hour of day at which every episode begins
    STEP_MINUTES = 15   # simulated minutes that elapse per call to step()

    def __init__(self, num_spots=10, max_queue=5, peak_hours=((8, 10), (16, 18)), max_steps=96):
        super().__init__()
        self.num_spots = num_spots
        self.max_queue = max_queue
        # Copy into a fresh list so instances never share (or mutate) the default.
        self.peak_hours = list(peak_hours)
        self.max_steps = max_steps

        self.observation_space = spaces.Dict({
            'spots': spaces.Box(low=0, high=1, shape=(num_spots,), dtype=np.int32),
            'queue': spaces.Discrete(max_queue + 1),
            'hour': spaces.Discrete(24)
        })
        # One action per spot plus a final "wait" action.
        self.action_space = spaces.Discrete(num_spots + 1)

        self.reset()

    def reset(self, seed=None, options=None):
        """Start a new episode and return ``(observation, info)``.

        Args:
            seed: Optional seed forwarded to ``gym.Env.reset``; it seeds
                ``self.np_random``, which drives all randomness in this env.
            options: Accepted for Gymnasium API compatibility; unused.
        """
        super().reset(seed=seed)
        self.spots = np.zeros(self.num_spots, dtype=np.int32)
        self.durations = np.zeros(self.num_spots, dtype=np.int32)
        self.queue = 0
        self.time = self.START_HOUR
        self.steps = 0
        self.total_wait = 0
        self.illegal_parking = 0
        return self._get_obs(), {}

    def _get_obs(self):
        """Return the current observation dict (spots are copied)."""
        return {
            'spots': self.spots.copy(),
            'queue': self.queue,
            'hour': self.time
        }

    def _is_peak(self):
        """Return True when the current hour lies in any peak interval."""
        return any(start <= self.time < end for start, end in self.peak_hours)

    def _generate_vehicles(self):
        """Add Poisson-distributed arrivals to the queue, capped at max_queue."""
        rate = 2 if self._is_peak() else 0.5
        # Use the env's seeded generator so reset(seed=...) is reproducible.
        arrivals = int(self.np_random.poisson(rate))
        self.queue = min(self.max_queue, self.queue + arrivals)

    def _update_spots(self):
        """Age every parked vehicle by one step and free expired spots."""
        occupied = self.spots == 1
        self.durations[occupied] -= self.STEP_MINUTES
        self.spots[occupied & (self.durations <= 0)] = 0

    def _calculate_reward(self, action):
        """Apply ``action`` to the lot and return its reward.

        Note that this method mutates state (spots, durations, queue and the
        illegal-parking counter), not just computes a number.

        Rewards:
            * wait action: ``-0.2 * queue``
            * occupied spot chosen: ``-5`` (counted as illegal parking)
            * free spot with a vehicle waiting: ``+2`` plus up to ``+1`` for
              spots closer to index ``num_spots // 2``
            * free spot with an empty queue: ``-1``
        """
        if action == self.num_spots:
            return -0.2 * self.queue

        if self.spots[action] == 1:
            self.illegal_parking += 1
            return -5

        if self.queue <= 0:
            return -1

        # Guard the normaliser: num_spots // 2 is 0 for a single-spot lot.
        half = max(1, self.num_spots // 2)
        center_dist = abs(action - self.num_spots // 2) / half
        reward = 2 + (1 - center_dist)

        self.spots[action] = 1
        # Stay length in minutes, inclusive range 30..120.
        self.durations[action] = self.np_random.integers(30, 121)
        self.queue -= 1
        return reward

    def step(self, action):
        """Advance the simulation by one 15-minute step.

        Returns:
            ``(observation, reward, terminated, truncated, info)``.
            ``terminated`` becomes True once ``max_steps`` steps have run;
            ``truncated`` is always False. ``info`` carries ``'avg_wait'``
            (accumulated queue-minutes divided by steps taken),
            ``'illegal'`` (illegal-parking count) and ``'utilization'``
            (fraction of occupied spots).
        """
        reward = self._calculate_reward(action)
        self._update_spots()
        self._generate_vehicles()

        self.steps += 1
        # Derive the hour from the step count. Adding 0.25 to an integer hour
        # and truncating would leave the clock stuck at the start hour.
        elapsed_minutes = self.steps * self.STEP_MINUTES
        self.time = (self.START_HOUR + elapsed_minutes // 60) % 24

        self.total_wait += self.queue * self.STEP_MINUTES
        done = self.steps >= self.max_steps
        info = {
            'avg_wait': self.total_wait / self.steps,
            'illegal': self.illegal_parking,
            'utilization': float(np.mean(self.spots))
        }
        return self._get_obs(), reward, done, False, info

# -------------------------
# DQN NETWORK AND AGENT
# -------------------------
class DQN(nn.Module):
    """Three-layer MLP mapping a parking observation to per-action Q-values.

    The input feature vector has ``num_spots + 2`` entries: the spot
    occupancy flags, the raw queue length, and the hour scaled by ``1/24``.
    The output has ``num_spots + 1`` entries, one per action.

    Args:
        num_spots: Number of parking spots in the environment.
    """

    def __init__(self, num_spots):
        super().__init__()
        self.fc = nn.Sequential(
            nn.Linear(num_spots + 2, 128),
            nn.ReLU(),
            nn.Linear(128, 128),
            nn.ReLU(),
            nn.Linear(128, num_spots + 1)
        )

    def _encode(self, obs):
        """Flatten an observation dict into a 1-D float32 feature tensor."""
        # Build inputs on the same device as the weights so the module keeps
        # working after ``.to(device)``.
        device = self.fc[0].weight.device
        spots = torch.as_tensor(obs['spots'], dtype=torch.float32, device=device).reshape(-1)
        extras = torch.tensor(
            [float(obs['queue']), float(obs['hour']) / 24.0],
            dtype=torch.float32,
            device=device,
        )
        return torch.cat([spots, extras])

    def forward(self, obs):
        """Return Q-values for ``obs``.

        Args:
            obs: Either an observation dict with keys ``'spots'``,
                ``'queue'`` and ``'hour'``, or an already-encoded feature
                tensor whose last dimension is ``num_spots + 2`` (leading
                batch dimensions are allowed).

        Returns:
            Tensor of Q-values whose last dimension is ``num_spots + 1``.
        """
        if isinstance(obs, torch.Tensor):
            weight = self.fc[0].weight
            return self.fc(obs.to(device=weight.device, dtype=weight.dtype))
        return self.fc(self._encode(obs))

class DQNAgent:
    """Epsilon-greedy DQN agent with replay memory and a target network.

    Args:
        env: Environment exposing ``num_spots``, ``action_space.sample()``,
            ``reset()`` and a five-value ``step()``.
    """

    def __init__(self, env):
        self.env = env
        self.policy = DQN(env.num_spots)
        self.target = DQN(env.num_spots)
        self.target.load_state_dict(self.policy.state_dict())
        self.memory = deque(maxlen=10000)   # replay buffer of transitions
        self.optimizer = optim.Adam(self.policy.parameters(), lr=1e-4)
        self.epsilon = 1.0                  # current exploration probability
        self.eps_end = 0.1                  # floor for epsilon decay
        self.gamma = 0.99                   # discount factor
        self.batch = 64                     # minibatch size for learn()

    def act(self, state):
        """Pick an action: random with probability epsilon, else greedy."""
        if random.random() < self.epsilon:
            return self.env.action_space.sample()
        with torch.no_grad():
            return torch.argmax(self.policy(state)).item()

    def remember(self, s, a, r, s2, done):
        """Store one transition ``(s, a, r, s2, done)`` in replay memory."""
        self.memory.append((s, a, r, s2, done))

    def learn(self):
        """Run one minibatch TD update on the policy network.

        Does nothing until the replay memory holds at least ``self.batch``
        transitions. Otherwise samples a minibatch, regresses the Q-value of
        each taken action onto ``r + gamma * max_a' Q_target(s2, a')``
        (without the bootstrap term when the transition is marked done), and
        applies a single optimizer step for the whole batch.
        """
        if len(self.memory) < self.batch:
            return

        transitions = random.sample(self.memory, self.batch)
        states, actions, rewards, next_states, dones = zip(*transitions)

        # Q(s, .) for every sampled state, then select the action taken.
        q_all = torch.stack([self.policy(s) for s in states])
        device = q_all.device
        action_idx = torch.as_tensor(
            [int(a) for a in actions], dtype=torch.int64, device=device
        ).unsqueeze(1)
        q_taken = q_all.gather(1, action_idx).squeeze(1)

        # Targets are computed without gradients so only q_taken is trained.
        with torch.no_grad():
            next_q = torch.stack([self.target(s2) for s2 in next_states])
            next_max = next_q.max(dim=1).values
            reward_t = torch.as_tensor(
                [float(r) for r in rewards], dtype=torch.float32, device=device
            )
            not_done = 1.0 - torch.as_tensor(
                [float(d) for d in dones], dtype=torch.float32, device=device
            )
            targets = reward_t + self.gamma * next_max * not_done

        loss = nn.functional.mse_loss(q_taken, targets)
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()

    def train(self, episodes=500):
        """Train for ``episodes`` episodes and return the episode rewards.

        After every environment step the transition is stored and
        ``learn()`` is called. The target network is synced every 10
        episodes and epsilon decays by a factor of 0.995 per episode down to
        ``self.eps_end``. A summary line is printed per episode.

        Returns:
            List with the total reward of each episode, in order.
        """
        history = []
        for ep in range(episodes):
            s, _ = self.env.reset()
            done = False
            total = 0
            while not done:
                a = self.act(s)
                s2, r, terminated, truncated, info = self.env.step(a)
                done = terminated or truncated
                # Store only true termination so truncated episodes still
                # bootstrap from the next state.
                self.remember(s, a, r, s2, terminated)
                self.learn()
                s = s2
                total += r
            if ep % 10 == 0:
                self.target.load_state_dict(self.policy.state_dict())
            self.epsilon = max(self.eps_end, self.epsilon * 0.995)
            history.append(total)
            print(f"Ep {ep}, Reward: {total:.2f}, Eps: {self.epsilon:.2f}, Illegal: {info['illegal']}, Wait: {info['avg_wait']:.2f}, Util: {info['utilization']:.2f}")
        return history
