# Hierarchical Multi-Agent Reinforcement Learning for Congestion-Aware Vessel Scheduling

**Supervised by Prof. Aboussalah**  
**Spring 2026 Independent Study**

This Colab notebook is an MVP (minimum viable prototype) that sketches a working pipeline for hierarchical MARL with congestion forecasting and port coordination. The goal is to be **descriptive**, **traceable**, and **executable** with simplified components so we can iterate quickly before scaling to a full simulator and MAPPO training.

## Table of contents
1. Project framing and objectives
2. Architecture and data flow
3. Configuration
4. Toy simulator (MVP environment)
5. Forecasting module (mock)
6. Agent decision stubs (hierarchical control)
7. Metrics and evaluation hooks
8. Next steps for the full project


## 1. Project framing and objectives
We model a maritime network with **heterogeneous agents**:
- **Fleet coordinator** (strategic decisions, 12–24h cadence)
- **Vessel agents** (operational speed/arrival control, 1–4h cadence)
- **Port agents** (dock allocation and service scheduling, 2–6h cadence)

**MVP objectives**:
- Validate the **information flow** between forecasting, coordinator, vessel, and port layers.
- Provide a **minimal environment** to test reward signals and coordination logic.
- Establish **hooks for metrics** that will be used in the full study.

We simulate a small system (1 coordinator, 8 vessels, 5 ports) and use a toy environment to demonstrate the **data flow** and the **control loop** that learned policies will later plug into. Nothing is trained in this MVP.
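
The three cadences listed above imply a multi-rate control loop. The following is a minimal sketch of how a simulated clock might decide which agent types act at a given hour. It assumes the lower bound of each range (12h, 1h, 2h); `CADENCE_HOURS` and `agents_due` are hypothetical names and are not used elsewhere in this notebook.

```python
from typing import Dict, List

# Assumed cadences in hours (lower bound of each range stated above).
CADENCE_HOURS: Dict[str, int] = {"coordinator": 12, "vessel": 1, "port": 2}


def agents_due(hour: int, cadence: Dict[str, int] = CADENCE_HOURS) -> List[str]:
    """Return the agent types whose decision cadence divides this hour."""
    return [name for name, every in cadence.items() if hour % every == 0]


# Print who acts at a few sample hours of a simulated day.
for hour in (0, 1, 2, 12):
    print(hour, agents_due(hour))
```

A fixed modulo schedule like this is the simplest option; the full simulator may instead need event-driven decisions (for example, a vessel re-planning on arrival).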

## 2. Architecture and data flow
```
Fleet Coordinator  → (Strategic Directives) →  Vessel Agents  → (Arrival Requests) →  Port Agents
Port Agents        → (Dock Availability)   →  Vessel Agents
```
**Forecasting usage**:
- Medium-term forecasts (3–7 days) inform the **fleet coordinator**.
- Short-term forecasts (6–24 hours) inform **vessels and ports**.

This MVP will keep policies simple but will expose the exact data each agent sees.
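
One way to make "the exact data each agent sees" explicit is to give every arrow in the diagram its own message type. The sketch below is illustrative only: the `StrategicDirective` fields mirror the keys returned by the coordinator stub in Section 6, while `ArrivalRequest`, `DockAvailability`, their fields, and `requests_per_port` are assumptions introduced here.

```python
from collections import Counter
from dataclasses import dataclass
from typing import List


@dataclass(frozen=True)
class StrategicDirective:  # Fleet Coordinator -> Vessel Agents
    dest_port: int
    departure_window_hours: int
    emission_budget: float


@dataclass(frozen=True)
class ArrivalRequest:  # Vessel Agents -> Port Agents (hypothetical fields)
    vessel_id: int
    dest_port: int


@dataclass(frozen=True)
class DockAvailability:  # Port Agents -> Vessel Agents (hypothetical fields)
    port_id: int
    free_docks: int
    queue: int


def requests_per_port(requests: List[ArrivalRequest], num_ports: int) -> List[int]:
    """Count how many arrival requests each port receives."""
    counts = Counter(r.dest_port for r in requests)
    return [counts.get(p, 0) for p in range(num_ports)]


reqs = [ArrivalRequest(0, 2), ArrivalRequest(1, 2), ArrivalRequest(2, 0)]
print(requests_per_port(reqs, num_ports=3))
```

Frozen dataclasses keep messages immutable, which makes it easier to log and replay what each agent observed.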

## 3. Configuration
We keep dependencies minimal for the MVP. In a full implementation, this section will include MARL frameworks (e.g., RLlib, MARLlib) and forecasting libraries.

In [None]:
import numpy as np
import pandas as pd
from dataclasses import dataclass
from typing import Dict, List, Tuple

SEED = 42
rng = np.random.default_rng(SEED)


## 4. Toy simulator (MVP environment)
We implement a small **time-stepped toy simulator** that tracks the key signals: queue length and dock occupancy per port, and location, speed, and fuel per vessel. Travel times are not modeled yet. The simulator is intentionally small and interpretable so we can debug coordination and forecast usage early.

In [None]:
@dataclass
class PortState:
    queue: int
    docks: int
    occupied: int

@dataclass
class VesselState:
    location: int
    speed: float
    fuel: float

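def initialize_ports(num_ports: int, docks_per_port: int = 3) -> List[PortState]:
    """Create ports with a random initial queue of 0-4 vessels and all docks free."""
    # int() converts NumPy integers to plain Python ints for cleaner display.
    return [PortState(queue=int(rng.integers(0, 5)), docks=docks_per_port, occupied=0)
            for _ in range(num_ports)]

def initialize_vessels(num_vessels: int, num_ports: int) -> List[VesselState]:
    """Place each vessel at a random port with default speed and fuel."""
    return [VesselState(location=int(rng.integers(0, num_ports)), speed=12.0, fuel=100.0)
            for _ in range(num_vessels)]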

NUM_PORTS = 5
NUM_VESSELS = 8
ports = initialize_ports(NUM_PORTS)
vessels = initialize_vessels(NUM_VESSELS, NUM_PORTS)
ports, vessels

### 4.1 Simple environment step (placeholder)
We define a minimal **step** function to evolve queues and dock occupancy. This provides a concrete hook for plugging in rewards and metrics later.

In [None]:
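def step_ports(ports: List[PortState], service_rates: List[int]) -> None:
    """Advance each port one step in place by moving queued vessels into free docks."""
    for port, rate in zip(ports, service_rates):
        free_docks = port.docks - port.occupied
        # Cap by free docks so every vessel that leaves the queue gets a dock.
        served = min(port.queue, rate, free_docks)
        port.queue -= served
        port.occupied += served
        # Placeholder: departures are not modeled, so docks are never released.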

def observe_port_metrics(ports: List[PortState]) -> Dict[str, float]:
    avg_queue = float(np.mean([p.queue for p in ports]))
    dock_util = float(np.mean([p.occupied / p.docks for p in ports]))
    return {"avg_queue": avg_queue, "dock_utilization": dock_util}
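
As an illustration of the reward hook mentioned above, the metrics dictionary could be collapsed into a scalar signal. This is a sketch under assumptions: `port_reward`, the linear form, and both weights are hypothetical placeholders, not the reward the full study will use.

```python
from typing import Dict


def port_reward(metrics: Dict[str, float],
                queue_weight: float = 1.0,
                idle_weight: float = 0.5) -> float:
    """Penalize long queues and idle docks (weights are arbitrary placeholders)."""
    idle_fraction = 1.0 - metrics["dock_utilization"]
    return -(queue_weight * metrics["avg_queue"] + idle_weight * idle_fraction)


# Same keys as the dictionary returned by observe_port_metrics.
print(port_reward({"avg_queue": 2.0, "dock_utilization": 0.5}))
```

Keeping the reward a pure function of the metrics dictionary means the same function can be reused unchanged once a fuller simulator produces those metrics.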


## 5. Forecasting module (mock)
The **medium-term forecaster** provides a 3–7 day congestion estimate for each port. The **short-term forecaster** outputs 6–24 hour predictions. Here we mock them with noisy trends to validate the data flow.

In [None]:
def medium_term_forecast(num_ports: int, horizon_days: int = 5) -> np.ndarray:
    base = rng.uniform(2, 8, size=(num_ports, 1))
    trend = np.linspace(0, 1.5, horizon_days)[None, :]
    noise = rng.normal(0, 0.3, size=(num_ports, horizon_days))
    return np.clip(base + trend + noise, 0, None)

def short_term_forecast(num_ports: int, horizon_hours: int = 12) -> np.ndarray:
    base = rng.uniform(1, 6, size=(num_ports, 1))
    noise = rng.normal(0, 0.5, size=(num_ports, horizon_hours))
    return np.clip(base + noise, 0, None)

medium_forecast = medium_term_forecast(NUM_PORTS, horizon_days=5)
short_forecast = short_term_forecast(NUM_PORTS, horizon_hours=12)
medium_forecast.shape, short_forecast.shape
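
When real forecasters replace these mocks, it helps to have an error metric and a naive reference ready. The sketch below assumes forecasts keep the `(num_ports, horizon)` layout used above; `persistence_forecast` (repeat the last observed value) and `mean_absolute_error` are hypothetical helpers, and the sample numbers are made up for illustration.

```python
import numpy as np


def persistence_forecast(last_observed: np.ndarray, horizon: int) -> np.ndarray:
    """Naive baseline: repeat each port's last observed congestion over the horizon."""
    return np.repeat(last_observed[:, None], horizon, axis=1)


def mean_absolute_error(forecast: np.ndarray, realized: np.ndarray) -> float:
    """Average absolute gap between forecast and realized congestion."""
    if forecast.shape != realized.shape:
        raise ValueError(f"shape mismatch: {forecast.shape} vs {realized.shape}")
    return float(np.mean(np.abs(forecast - realized)))


# Illustrative values only: two ports, three-step horizon.
baseline = persistence_forecast(np.array([1.0, 3.0]), horizon=3)
realized = np.array([[1.0, 2.0, 3.0], [3.0, 3.0, 3.0]])
print(baseline.shape, mean_absolute_error(baseline, realized))
```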

## 6. Agent decision stubs (hierarchical control)
We create placeholder policies that **consume forecasts** and **emit actions**. These are not learned yet; they simply show the flow of information and will be replaced with MAPPO policies later.

In [None]:
def fleet_coordinator_policy(medium_forecast: np.ndarray) -> Dict:
    # pick the least congested port (lowest mean forecast)
    port_scores = medium_forecast.mean(axis=1)
    dest_port = int(np.argmin(port_scores))
    return {
        "dest_port": dest_port,
        "departure_window_hours": 12,
        "emission_budget": 50.0
    }

def vessel_policy(vessel: VesselState, short_forecast: np.ndarray, directive: Dict) -> Dict:
    # reduce speed if short-term congestion is high
    dest_port = directive["dest_port"]
    congestion = float(short_forecast[dest_port].mean())
    speed = 10.0 if congestion > 4.0 else 14.0
    return {
        "target_speed": speed,
        "request_arrival_slot": True,
    }

def port_policy(port_state: PortState, incoming_requests: int, short_forecast_row: np.ndarray) -> Dict:
    # prioritize clearing queue when forecast predicts near-term congestion
    pressure = float(short_forecast_row.mean())
    service_rate = min(port_state.docks, port_state.occupied + 1)
    if pressure > 4.0:
        service_rate = port_state.docks  # open all docks
    return {
        "service_rate": service_rate,
        "accept_requests": incoming_requests,
    }

directive = fleet_coordinator_policy(medium_forecast)
v_actions = [vessel_policy(v, short_forecast, directive) for v in vessels]
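incoming = sum(1 for a in v_actions if a["request_arrival_slot"])
dest = directive["dest_port"]
# All vessels share one directive, so only the destination port receives requests.
p_actions = [port_policy(p, incoming if i == dest else 0, short_forecast[i])
             for i, p in enumerate(ports)]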
service_rates = [a["service_rate"] for a in p_actions]
step_ports(ports, service_rates)
metrics = observe_port_metrics(ports)
directive, v_actions[0], p_actions[0], metrics
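
The vessel stub emits a `target_speed`, but nothing in the cell above writes it back to the vessel. A minimal sketch of that missing step follows. It assumes the common rule of thumb that fuel burn per hour grows roughly with the cube of speed; `apply_vessel_action`, `hours`, and `fuel_coeff` are hypothetical, and the coefficient is an arbitrary placeholder rather than a calibrated value.

```python
from dataclasses import dataclass


@dataclass
class VesselState:  # same fields as the VesselState defined in Section 4
    location: int
    speed: float
    fuel: float


def apply_vessel_action(vessel: VesselState, target_speed: float,
                        hours: float = 1.0, fuel_coeff: float = 1e-4) -> float:
    """Set the vessel's speed and burn fuel; returns the fuel requested for this leg."""
    vessel.speed = target_speed
    burned = fuel_coeff * target_speed ** 3 * hours  # assumed cubic speed-fuel law
    vessel.fuel = max(vessel.fuel - burned, 0.0)     # fuel cannot go negative
    return burned


v = VesselState(location=0, speed=12.0, fuel=100.0)
apply_vessel_action(v, target_speed=10.0, hours=2.0)
print(v)
```

With a step like this in the loop, slow-steaming decisions show up directly in `avg_speed` and `avg_fuel`.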

## 7. Metrics and evaluation hooks
We attach simple metrics now, so that later experiments can compare independent vs reactive vs predictive policies using the same pipeline.
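
To compare policy variants on equal footing, each variant can be run over the same seeds and its metrics averaged. The harness below is a sketch: `compare_policies` and the `runners` mapping (policy name to a function that takes a seed and returns a metrics dictionary) are assumptions, and the two lambda runners are dummies standing in for real rollouts.

```python
from statistics import mean
from typing import Callable, Dict, Iterable

Metrics = Dict[str, float]


def compare_policies(runners: Dict[str, Callable[[int], Metrics]],
                     seeds: Iterable[int]) -> Dict[str, Metrics]:
    """Run every policy on the same seeds and average each metric."""
    seed_list = list(seeds)
    results: Dict[str, Metrics] = {}
    for name, run in runners.items():
        per_seed = [run(seed) for seed in seed_list]
        results[name] = {key: mean(m[key] for m in per_seed) for key in per_seed[0]}
    return results


# Dummy runners for illustration; real ones would execute a full rollout.
dummy = {
    "independent": lambda seed: {"avg_queue": float(seed)},
    "predictive": lambda seed: {"avg_queue": 0.5 * seed},
}
print(compare_policies(dummy, seeds=[1, 3]))
```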

In [None]:
def compute_vessel_metrics(vessels: List[VesselState]) -> Dict[str, float]:
    avg_speed = float(np.mean([v.speed for v in vessels]))
    avg_fuel = float(np.mean([v.fuel for v in vessels]))
    return {"avg_speed": avg_speed, "avg_fuel": avg_fuel}

# Vessel actions are not applied to VesselState yet, so vessel metrics reflect the initial state.
port_metrics = observe_port_metrics(ports)
vessel_metrics = compute_vessel_metrics(vessels)
{"port_metrics": port_metrics, "vessel_metrics": vessel_metrics}

## 8. Next steps for the full project
1. **Replace toy forecasts** with real models (RNNs, econometric baselines).
2. **Implement Gymnasium environment** with step() and reset() for MARL training.
3. **Plug in MAPPO** with a centralized critic for coordination.
4. **Run ablations** comparing: independent vs reactive vs predictive vs oracle.
5. **Evaluate** using cost, delay, emissions, and coordination metrics.

This MVP lets us check that the data flow and hierarchy are wired correctly before scaling to a production-grade simulator and full MARL training.
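
As a starting point for step 2, the skeleton below follows the Gymnasium calling convention: `reset` returns `(observation, info)` and `step` returns `(observation, reward, terminated, truncated, info)`. It is a sketch under assumptions: it deliberately does not subclass `gymnasium.Env` or define observation and action spaces, and `ToyPortEnv`, the random arrival process, and the queue-based reward are all hypothetical.

```python
import random
from typing import Dict, List, Optional, Tuple


class ToyPortEnv:
    """Gymnasium-style port queue environment (plain class, no gymnasium import)."""

    def __init__(self, num_ports: int = 5, docks_per_port: int = 3, max_steps: int = 24):
        self.num_ports = num_ports
        self.docks_per_port = docks_per_port
        self.max_steps = max_steps
        self._rng = random.Random()
        self.queues: List[int] = [0] * num_ports
        self.t = 0

    def reset(self, *, seed: Optional[int] = None,
              options: Optional[Dict] = None) -> Tuple[List[int], Dict]:
        """Start a new episode with random initial queues of 0-4 vessels."""
        self._rng = random.Random(seed)
        self.queues = [self._rng.randint(0, 4) for _ in range(self.num_ports)]
        self.t = 0
        return list(self.queues), {}

    def step(self, service_rates: List[int]) -> Tuple[List[int], float, bool, bool, Dict]:
        """Apply one service rate per port, then report the new queues."""
        for i, rate in enumerate(service_rates):
            arrivals = self._rng.randint(0, 2)  # assumed arrival process
            waiting = self.queues[i] + arrivals
            served = min(waiting, rate, self.docks_per_port)
            self.queues[i] = waiting - served
        self.t += 1
        reward = -float(sum(self.queues))       # assumed reward: total queue length
        terminated = False                      # no terminal state in this toy
        truncated = self.t >= self.max_steps    # time-limit cutoff
        return list(self.queues), reward, terminated, truncated, {}


env = ToyPortEnv(num_ports=3, max_steps=2)
obs, info = env.reset(seed=42)
done = False
while not done:
    obs, reward, terminated, truncated, info = env.step([3, 3, 3])
    done = terminated or truncated
print(obs, reward)
```

Matching the return shapes now should make it easier to port this logic to a real `gymnasium.Env` subclass, and later to a multi-agent wrapper, without changing the training loop.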