# In [None]:
# graph_from_model.py
"""
Evaluate trained DQN model ONLY and generate success probability graphs.
This file is SELF-CONTAINED (includes Env + QNetwork).
"""

import numpy as np
import torch
import torch.nn as nn
import matplotlib.pyplot as plt
import gymnasium as gym
from gymnasium import spaces

# =====================================================
# 1) ENVIRONMENT (copied exactly from your training code)
# =====================================================
class FlightEnv(gym.Env):
    """Simplified landing-approach environment for evaluating the DQN agent.

    Internal state: altitude, speed, distance to the runway, pitch angle and a
    runway condition drawn from {0.0, 0.5, 1.0} at every reset.

    Actions (``Discrete(5)``):
        0 -> speed +6
        1 -> speed -6
        2 -> altitude +35, angle +1.5
        3 -> altitude -35, angle -1.5
        4 -> no control input

    Per the section header in this file, the dynamics and reward shaping are
    copied from the training code and are left untouched here. The only
    difference is that per-episode randomness is drawn from ``self.np_random``
    so that ``reset(seed=...)`` is actually reproducible.

    Args:
        start_alt: Altitude assigned at every reset.
        start_dist: Distance to the runway assigned at every reset.
    """

    # Speed lost per step, keyed by runway condition.
    _DRAG_BY_RUNWAY = {0.0: 0.6, 0.5: 0.4, 1.0: 0.25}

    def __init__(self, start_alt=400.0, start_dist=800.0):
        super().__init__()
        # NOTE(review): these bounds are in raw units while _get_obs() returns
        # scaled values; only the shape is used by this script. Left unchanged
        # to stay aligned with the training code -- confirm before tightening.
        self.observation_space = spaces.Box(
            low=np.array([0, 0, 0, -30, 0], dtype=np.float32),
            high=np.array([5000, 300, 10000, 30, 1], dtype=np.float32),
            dtype=np.float32,
        )
        self.action_space = spaces.Discrete(5)
        self.start_alt = start_alt
        self.start_dist = start_dist
        # Populate the state attributes so the env is usable right away.
        self.reset()

    def reset(self, seed=None, options=None):
        """Start a new episode and return ``(observation, info)``.

        Args:
            seed: Forwarded to ``gym.Env.reset``; when given it reseeds
                ``self.np_random``, which drives all randomness below.
            options: Accepted for API compatibility; unused.
        """
        super().reset(seed=seed)
        rng = self.np_random  # seeded generator managed by gym.Env

        self.altitude = float(self.start_alt)
        self.speed = float(160 + rng.uniform(-10, 10))
        self.distance = float(self.start_dist)
        self.prev_distance = self.distance
        self.angle = float(rng.uniform(-2, 2))
        self.runway_condition = float(rng.choice([0.0, 0.5, 1.0]))
        self.steps = 0
        return self._get_obs(), {}

    def step(self, action):
        """Advance one step.

        Returns:
            ``(observation, reward, done, False, info)`` where ``info`` holds
            ``"success"`` (bool), ``"outcome"`` (str) and ``"runway"`` (the
            runway condition). The truncated slot is always ``False``; the
            step-limit timeout is reported through ``done``.
        """
        self.steps += 1

        # Control input; action 4 (or any other value) changes nothing.
        if action == 0:
            self.speed += 6
        elif action == 1:
            self.speed -= 6
        elif action == 2:
            self.altitude += 35
            self.angle += 1.5
        elif action == 3:
            self.altitude -= 35
            self.angle -= 1.5

        # Passive dynamics: forward progress (at least 1 per step), a constant
        # sink of 8, and runway-dependent drag.
        self.distance -= max(self.speed * 0.3, 1)
        self.altitude -= 8
        self.speed -= self._DRAG_BY_RUNWAY[self.runway_condition]

        # Keep the state inside its physical limits.
        self.angle = np.clip(self.angle, -30, 30)
        self.altitude = max(self.altitude, 0)
        self.speed = np.clip(self.speed, 0, 300)

        # Shaping reward; term order is preserved from the original.
        reward = 0
        reward += (self.prev_distance - self.distance) * 0.02
        self.prev_distance = self.distance
        reward -= 0.03
        reward -= 0.005 * abs(self.altitude - 100)
        reward -= 0.005 * abs(self.speed - 150)
        reward -= 0.01 * abs(self.angle)
        if self.distance < 400:
            reward += 0.8
        if 0 < self.altitude < 100 and 100 < self.speed < 200:
            reward += 1.5
        reward += (self.start_dist - self.distance) / self.start_dist

        done, success = False, False
        outcome = "in-flight"

        # Terminal conditions, checked in priority order.
        if self.distance <= 0:
            landed_ok = (
                0 <= self.altitude <= 50
                and 100 <= self.speed <= 200
                and abs(self.angle) < 10
            )
            if landed_ok:
                reward += 200
                success = True
                outcome = "successful landing"
            else:
                reward -= 40
                outcome = "failed landing"
            done = True
        elif self.altitude <= 0:
            reward -= 40
            done = True
            outcome = "crash before runway"
        elif self.speed <= 20 and self.altitude > 100:
            reward -= 40
            done = True
            outcome = "stall midair"
        elif self.steps >= 600:
            done = True
            outcome = "timeout"

        info = {
            "success": success,
            "outcome": outcome,
            "runway": self.runway_condition,
        }
        return self._get_obs(), reward, done, False, info

    def _get_obs(self):
        """Return the five state values scaled by their nominal ranges."""
        return np.array(
            [
                self.altitude / 5000,
                self.speed / 300,
                self.distance / 10000,
                (self.angle + 30) / 60,
                self.runway_condition,
            ],
            dtype=np.float32,
        )

# =====================================================
# 2) Q-NETWORK (copied exactly from training code)
# =====================================================
class QNetwork(nn.Module):
    """MLP with two 256-unit ReLU hidden layers producing one Q-value per action."""

    def __init__(self, obs_dim, n_actions):
        super().__init__()
        hidden = 256
        layers = [
            nn.Linear(obs_dim, hidden),
            nn.ReLU(),
            nn.Linear(hidden, hidden),
            nn.ReLU(),
            nn.Linear(hidden, n_actions),
        ]
        # The attribute name and layer order determine the state_dict keys
        # ("model.0.weight", ...), so they must stay as they are for the
        # saved checkpoint to load.
        self.model = nn.Sequential(*layers)

    def forward(self, x):
        """Return the Q-values for input ``x``."""
        q_values = self.model(x)
        return q_values

# =====================================================
# 3) LOAD MODEL
# =====================================================
# Checkpoint location and evaluation settings.
MODEL_PATH = "/kaggle/input/dqn-flight/pytorch/default/1/best_model (1).pt"
EVAL_EPISODES = 20000

# Use the GPU when one is available, otherwise fall back to the CPU.
if torch.cuda.is_available():
    DEVICE = "cuda"
else:
    DEVICE = "cpu"

# The environment supplies the network's input and output sizes.
env = FlightEnv()
obs_dim = env.observation_space.shape[0]
n_actions = env.action_space.n

# Build the network on the target device, then restore the trained weights.
model = QNetwork(obs_dim, n_actions)
model = model.to(DEVICE)
state_dict = torch.load(MODEL_PATH, map_location=DEVICE)
model.load_state_dict(state_dict)
model.eval()  # switch the module to evaluation mode

print("Loaded model from:", MODEL_PATH)

# =====================================================
# 4) EVALUATION LOOP
# =====================================================
# Episode and success tallies, keyed by runway condition value.
runway_counts = dict.fromkeys((0.0, 0.5, 1.0), 0)
runway_success = dict.fromkeys((0.0, 0.5, 1.0), 0)

# Running success rate after each episode: one series per runway condition,
# plus the mean of the three.
success_curve = {condition: [] for condition in (0.0, 0.5, 1.0)}
mean_curve = []

def get_action(obs):
    """Return the greedy action (index of the largest Q-value) for one observation.

    Uses the module-level ``model`` and ``DEVICE``. ``obs`` is given a leading
    batch dimension before being passed to the network.
    """
    batch = torch.tensor(obs, dtype=torch.float32, device=DEVICE).unsqueeze(0)
    with torch.no_grad():  # inference only; no autograd graph needed
        q_values = model(batch)
    best = torch.argmax(q_values)
    return int(best.item())

# Roll out EVAL_EPISODES greedy episodes and track running success rates.
for ep in range(1, EVAL_EPISODES + 1):
    obs, _ = env.reset()
    rwy = env.runway_condition  # set by reset(); used as the tally key
    done = False

    while not done:
        obs, _, terminated, truncated, info = env.step(get_action(obs))
        # An episode is over when either flag is set; ignoring `truncated`
        # would spin forever on an env that signals time limits through it.
        done = terminated or truncated

    runway_counts[rwy] += 1
    if info["success"]:
        runway_success[rwy] += 1

    # Append the current running success rate for every runway condition.
    # Conditions not sampled yet are recorded as 0.0 so that all curves have
    # one point per episode.
    latest_rates = []
    for r in (0.0, 0.5, 1.0):
        seen = runway_counts[r]
        rate = runway_success[r] / seen if seen > 0 else 0.0
        success_curve[r].append(rate)
        latest_rates.append(rate)

    # Unweighted mean of the three per-runway rates.
    mean_curve.append(np.mean(latest_rates))

    if ep % 5000 == 0:
        print(f"Progress: {ep}/{EVAL_EPISODES}")

# =====================================================
# 5) PLOT RESULTS
# =====================================================
# X axis: 1-based episode index, one point per evaluated episode.
episodes = np.arange(1, EVAL_EPISODES + 1)

# Figure 1: running success rate per runway condition.
plt.figure(figsize=(12,6))
plt.plot(episodes, success_curve[0.0], label="Dry (0.0)")
plt.plot(episodes, success_curve[0.5], label="Wet (0.5)")
plt.plot(episodes, success_curve[1.0], label="Icy (1.0)")
plt.xlabel("Evaluation Episodes")
plt.ylabel("Success Probability")
plt.title("Landing Success Probability vs Evaluation Episodes")
plt.grid(True)
plt.legend()
plt.show()

# Figure 2: mean of the three per-runway running rates.
plt.figure(figsize=(12,6))
plt.plot(episodes, mean_curve, color="black")
plt.xlabel("Evaluation Episodes")
plt.ylabel("Mean Success Probability")
plt.title("Overall Mean Landing Success Across Runways")
plt.grid(True)
plt.show()

print("\nFinal success probabilities:")
# A runway condition that was never sampled (possible with a very small
# EVAL_EPISODES) has no defined rate; report nan instead of dividing by zero.
final_probs = {
    k: (runway_success[k] / n if n else float("nan"))
    for k, n in runway_counts.items()
}
print(final_probs)
