# In [1]:
"""
AETHER: Autonomous Edge for Transient Hazard Evasion and Routing
A Symbiotic Fleet Cognitive Firewall Demo

Run in Google Colab with: !pip install gradio plotly pandas numpy
"""

import gradio as gr
import plotly.graph_objects as go
import plotly.express as px
from plotly.subplots import make_subplots
import pandas as pd
import numpy as np
import random
from datetime import datetime, timedelta
import json

# ============================================================================
# SIMULATION CLASSES
# ============================================================================

class Vehicle:
    """One simulated fleet member on the 100 x 100 map.

    All state is held in plain public attributes: identity (``id``,
    ``type``), position (``x``, ``y``), ``speed``, ``battery``, a randomly
    seeded ``stress_level``, accumulated ``emissions``, the
    ``hazards_detected`` list and the ``in_slipstream`` flag.
    """

    def __init__(self, vid, vtype, x, y, speed, battery=100):
        self.id, self.type = vid, vtype
        self.x, self.y = x, y
        self.speed = speed
        self.battery = battery
        # Every vehicle starts with a stress level drawn from [0.2, 0.4].
        self.stress_level = random.uniform(0.2, 0.4)
        self.emissions = 0
        self.hazards_detected = []
        self.in_slipstream = False

    def update_position(self, dt=1):
        """Take one step in a random direction, staying inside [0, 100]."""
        heading = random.uniform(0, 2 * np.pi)
        dx = self.speed * np.cos(heading) * dt * 0.1
        dy = self.speed * np.sin(heading) * dt * 0.1
        self.x = np.clip(self.x + dx, 0, 100)
        self.y = np.clip(self.y + dy, 0, 100)

    def calculate_emissions(self):
        """Return the emissions rate: 0.15 x speed, cut by 25% in slipstream."""
        rate = self.speed * 0.15
        return rate * 0.75 if self.in_slipstream else rate

class Pothole:
    """A fixed road hazard: where it is, how bad it is, how often it was seen."""

    def __init__(self, pid, x, y, severity):
        self.id = pid
        self.x, self.y = x, y
        self.severity = severity
        # No vehicle has reported this hazard yet.
        self.detected_count = 0
        # The impact estimate is simply ten times the severity.
        self.avg_impact = 10 * severity

class FleetNetwork:
    """Holds the simulated vehicles and potholes plus fleet-wide counters.

    Attributes:
        vehicles: vehicles added through ``add_vehicle``.
        potholes: hazards added through ``add_pothole``.
        mesh_connections: ``(id_a, id_b, distance)`` tuples rebuilt by
            ``detect_mesh_connections``.
        total_emissions_saved: running sum of (baseline - actual) emissions.
        hazards_neutralized: running count of hazard detections.
        safety_interventions: running count of stress interventions.
    """

    def __init__(self):
        self.vehicles = []
        self.potholes = []
        self.mesh_connections = []
        self.total_emissions_saved = 0
        self.hazards_neutralized = 0
        self.safety_interventions = 0

    def add_vehicle(self, vehicle):
        """Register ``vehicle`` with the fleet."""
        self.vehicles.append(vehicle)

    def add_pothole(self, pothole):
        """Register ``pothole`` as a hazard on the map."""
        self.potholes.append(pothole)

    def detect_mesh_connections(self, radius=15):
        """Rebuild ``mesh_connections`` from the current vehicle positions.

        Every unordered pair of vehicles closer than ``radius`` yields one
        ``(id_a, id_b, distance)`` tuple.
        """
        self.mesh_connections = []
        for i, v1 in enumerate(self.vehicles):
            for v2 in self.vehicles[i + 1:]:
                dist = np.hypot(v1.x - v2.x, v1.y - v2.y)
                if dist < radius:
                    self.mesh_connections.append((v1.id, v2.id, dist))

    def detect_hazards(self, detection_radius=8):
        """Refresh each vehicle's ``hazards_detected`` list.

        Each (vehicle, pothole) pair within ``detection_radius`` bumps the
        pothole's ``detected_count`` and ``hazards_neutralized`` by one on
        every call, so a pothole that stays in range is counted again on
        each subsequent step.
        """
        for vehicle in self.vehicles:
            vehicle.hazards_detected = []
            for pothole in self.potholes:
                dist = np.hypot(vehicle.x - pothole.x, vehicle.y - pothole.y)
                if dist < detection_radius:
                    vehicle.hazards_detected.append(pothole)
                    pothole.detected_count += 1
                    self.hazards_neutralized += 1

    def apply_slipstream_optimization(self):
        """Set ``in_slipstream`` on every vehicle.

        Only a "Van" can be in a slipstream, and only when some "Truck" is
        closer than 10 units with a vertical offset below 5 units. Every
        other vehicle gets ``False``.
        """
        trucks = [v for v in self.vehicles if v.type == "Truck"]
        for vehicle in self.vehicles:
            vehicle.in_slipstream = vehicle.type == "Van" and any(
                np.hypot(vehicle.x - truck.x, vehicle.y - truck.y) < 10
                and abs(vehicle.y - truck.y) < 5
                for truck in trucks
            )

    def calculate_stress_interventions(self):
        """Randomly drift each driver's stress and damp the high ones.

        Stress moves by a uniform amount in [-0.05, 0.08] and is clipped to
        [0, 1]. Any vehicle ending above 0.7 has its stress scaled by 0.8
        and counts as one intervention.

        Returns:
            The number of interventions performed in this call.
        """
        interventions = 0
        for vehicle in self.vehicles:
            vehicle.stress_level += random.uniform(-0.05, 0.08)
            vehicle.stress_level = np.clip(vehicle.stress_level, 0, 1)
            if vehicle.stress_level > 0.7:
                vehicle.stress_level *= 0.8
                interventions += 1
        return interventions

    def update_fleet(self, dt=1):
        """Advance the simulation by one step of length ``dt``.

        Vehicles are moved first so that the mesh links, hazard detections
        and slipstream flags describe the same positions a caller sees after
        the step. Emissions are scaled by ``dt`` so they are comparable with
        the baseline, which is also proportional to ``dt``.

        Returns:
            Total emissions produced by the fleet during this step.
        """
        for vehicle in self.vehicles:
            vehicle.update_position(dt)

        self.detect_mesh_connections()
        self.detect_hazards()
        self.apply_slipstream_optimization()
        self.safety_interventions += self.calculate_stress_interventions()

        total_emissions = 0
        for vehicle in self.vehicles:
            emissions = vehicle.calculate_emissions() * dt
            vehicle.emissions += emissions
            total_emissions += emissions

        # Baseline assumes a flat 25 units per vehicle per unit of time.
        baseline_emissions = len(self.vehicles) * 25 * dt
        self.total_emissions_saved += baseline_emissions - total_emissions

        return total_emissions

# ============================================================================
# VISUALIZATION FUNCTIONS
# ============================================================================

def create_fleet_map(fleet):
    """Build the Plotly map of potholes, mesh links and vehicles.

    Args:
        fleet: object exposing ``potholes``, ``vehicles`` and
            ``mesh_connections`` (``(id_a, id_b, distance)`` tuples).

    Returns:
        A ``plotly.graph_objects.Figure`` on a fixed 0-100 x 0-100 canvas.
    """
    fig = go.Figure()

    # Plot potholes
    if fleet.potholes:
        pothole_text = [
            f"Pothole {p.id}<br>Severity: {p.severity:.1f}<br>Detections: {p.detected_count}"
            for p in fleet.potholes
        ]
        fig.add_trace(go.Scatter(
            x=[p.x for p in fleet.potholes],
            y=[p.y for p in fleet.potholes],
            mode='markers',
            marker=dict(
                size=[p.severity * 15 for p in fleet.potholes],
                color='red',
                symbol='x',
                line=dict(width=2)
            ),
            name='Potholes',
            text=pothole_text,
            hovertemplate='%{text}<extra></extra>'
        ))

    # Plot mesh connections as ONE trace; a None entry breaks the line between
    # consecutive segments, which avoids creating a trace per connection.
    vehicles_by_id = {v.id: v for v in fleet.vehicles}
    mesh_x, mesh_y = [], []
    for id_a, id_b, _dist in fleet.mesh_connections:
        end_a = vehicles_by_id.get(id_a)
        end_b = vehicles_by_id.get(id_b)
        if end_a is None or end_b is None:
            # A link that references a vehicle no longer in the fleet is skipped.
            continue
        mesh_x.extend([end_a.x, end_b.x, None])
        mesh_y.extend([end_a.y, end_b.y, None])
    if mesh_x:
        fig.add_trace(go.Scatter(
            x=mesh_x, y=mesh_y,
            mode='lines',
            line=dict(color='rgba(100,200,255,0.3)', width=1),
            showlegend=False,
            hoverinfo='skip'
        ))

    # Plot vehicles
    vehicle_colors = {'Car': 'blue', 'Van': 'green', 'Truck': 'orange', 'Emergency': 'red'}
    vehicle_symbols = {'Car': 'circle', 'Van': 'square', 'Truck': 'diamond', 'Emergency': 'star'}

    # Sorted so the legend order is stable from one render to the next.
    for vtype in sorted({v.type for v in fleet.vehicles}):
        vehicles_of_type = [v for v in fleet.vehicles if v.type == vtype]
        details = [
            f"{v.type} {v.id}<br>Speed: {v.speed:.1f}<br>Stress: {v.stress_level:.2f}<br>Slipstream: {v.in_slipstream}"
            for v in vehicles_of_type
        ]

        fig.add_trace(go.Scatter(
            x=[v.x for v in vehicles_of_type],
            y=[v.y for v in vehicles_of_type],
            mode='markers+text',
            marker=dict(
                size=12,
                # Unknown vehicle types fall back to a neutral marker.
                color=vehicle_colors.get(vtype, 'gray'),
                symbol=vehicle_symbols.get(vtype, 'circle'),
                line=dict(width=1, color='white')
            ),
            text=[f"V{v.id}" for v in vehicles_of_type],
            textposition="top center",
            textfont=dict(size=8),
            name=vtype,
            # `text` is the short on-map label; the hover box shows the
            # detailed per-vehicle string carried in `customdata`.
            customdata=details,
            hovertemplate='%{customdata}<extra></extra>'
        ))

    fig.update_layout(
        title="AETHER Fleet Network - Real-Time Mesh Topology",
        xaxis_title="X Position (km)",
        yaxis_title="Y Position (km)",
        height=600,
        hovermode='closest',
        plot_bgcolor='#1a1a2e',
        paper_bgcolor='#16213e',
        font=dict(color='white'),
        xaxis=dict(range=[0, 100], gridcolor='#2a2a3e'),
        yaxis=dict(range=[0, 100], gridcolor='#2a2a3e')
    )

    return fig

def create_metrics_dashboard(fleet, time_elapsed):
    """Build the 2 x 2 metrics dashboard for the current fleet state.

    Panels: CO2-saved gauge, hazard bar chart, per-vehicle stress markers
    (first five vehicles only) and a mesh-density gauge.

    Args:
        fleet: object exposing ``total_emissions_saved``,
            ``hazards_neutralized``, ``vehicles`` and ``mesh_connections``.
        time_elapsed: x-coordinate used for the stress markers.

    Returns:
        A ``plotly.graph_objects.Figure``.
    """
    fig = make_subplots(
        rows=2, cols=2,
        subplot_titles=("Fleet Emissions (kg CO2)", "Hazard Detection Timeline",
                       "Driver Stress Levels", "Network Connectivity"),
        specs=[[{"type": "indicator"}, {"type": "bar"}],
               [{"type": "scatter"}, {"type": "indicator"}]]
    )

    # Emissions Saved
    fig.add_trace(go.Indicator(
        mode="gauge+number+delta",
        value=fleet.total_emissions_saved,
        title={'text': "CO2 Saved"},
        delta={'reference': fleet.total_emissions_saved * 0.7},
        gauge={'axis': {'range': [None, 500]},
               'bar': {'color': "darkgreen"},
               'steps': [
                   {'range': [0, 200], 'color': "lightgray"},
                   {'range': [200, 400], 'color': "gray"}],
               'threshold': {'line': {'color': "red", 'width': 4}, 'thickness': 0.75, 'value': 450}}
    ), row=1, col=1)

    # Hazard Neutralization
    # Only the "Potholes" bar comes from the simulation; the other two bars
    # are random placeholder values regenerated on every call.
    hazard_types = ["Potholes", "Road Damage", "Obstacles"]
    hazard_counts = [fleet.hazards_neutralized, random.randint(5, 15), random.randint(2, 8)]
    fig.add_trace(go.Bar(
        x=hazard_types,
        y=hazard_counts,
        marker_color=['#e74c3c', '#f39c12', '#3498db'],
        text=hazard_counts,
        textposition='auto',
    ), row=1, col=2)

    # Stress Levels (one point per vehicle at the current time)
    for vehicle in fleet.vehicles[:5]:
        fig.add_trace(go.Scatter(
            x=[time_elapsed],
            y=[vehicle.stress_level],
            mode='lines+markers',
            name=f"V{vehicle.id}",
            line=dict(width=2)
        ), row=2, col=1)

    # Network Connectivity
    # Links per vehicle, as a percentage. A dense fleet can have more links
    # than vehicles, so the value is capped at the gauge's 100% axis limit.
    links_per_vehicle = len(fleet.mesh_connections) / max(1, len(fleet.vehicles))
    connectivity = min(100.0, links_per_vehicle * 100)
    fig.add_trace(go.Indicator(
        mode="gauge+number",
        value=connectivity,
        title={'text': "Mesh Density (%)"},
        gauge={'axis': {'range': [0, 100]},
               'bar': {'color': "lightblue"},
               'steps': [
                   {'range': [0, 50], 'color': "rgba(255,100,100,0.3)"},
                   {'range': [50, 80], 'color': "rgba(255,255,100,0.3)"},
                   {'range': [80, 100], 'color': "rgba(100,255,100,0.3)"}]}
    ), row=2, col=2)

    fig.update_layout(
        height=700,
        showlegend=True,
        plot_bgcolor='#1a1a2e',
        paper_bgcolor='#16213e',
        font=dict(color='white')
    )

    return fig

def create_slipstream_diagram():
    """Return a static illustration of a van drafting behind a truck.

    The figure contains the two vehicle markers, five dashed airflow lines
    that converge on y=50 between x=35 and x=50, and an annotation marking
    the emissions-reduction zone.
    """
    # (x position, marker size, colour, symbol, on-plot label, legend name)
    actors = (
        (30, 40, 'orange', 'diamond', "Truck", "Lead Vehicle"),
        (42, 30, 'green', 'square', "Van (Slipstream)", "Following Vehicle"),
    )
    airflow_x = [10, 25, 35, 50, 70]
    airflow_style = dict(color='rgba(100,200,255,0.5)', width=2, dash='dash')
    dark_theme = dict(
        plot_bgcolor='#1a1a2e',
        paper_bgcolor='#16213e',
        font=dict(color='white'),
    )

    fig = go.Figure()

    # Lead truck first, then the van following in its slipstream.
    for x_pos, size, colour, symbol, label, legend_name in actors:
        fig.add_trace(go.Scatter(
            x=[x_pos], y=[50],
            mode='markers+text',
            marker=dict(size=size, color=colour, symbol=symbol),
            text=[label],
            textposition="top center",
            name=legend_name
        ))

    # Airflow lines start at y = 48..52, squeeze to y = 50, then fan back out.
    for lane_y in range(48, 53):
        fig.add_trace(go.Scatter(
            x=airflow_x,
            y=[lane_y, lane_y, 50, 50, lane_y],
            mode='lines',
            line=airflow_style,
            showlegend=False
        ))

    fig.add_annotation(
        x=36, y=55,
        text="25% Emissions<br>Reduction Zone",
        showarrow=True,
        arrowhead=2,
        arrowcolor="yellow",
        font=dict(color="yellow", size=12)
    )

    fig.update_layout(
        title="Cooperative Aerodynamics - Slipstream Optimization",
        xaxis_title="Distance (m)",
        yaxis_title="Lateral Position (m)",
        height=400,
        xaxis=dict(range=[0, 80]),
        yaxis=dict(range=[40, 60]),
        **dark_theme
    )

    return fig

# ============================================================================
# GLOBAL STATE & MAIN SIMULATION
# ============================================================================

# Module-level simulation state, rebound by initialize_simulation() and
# advanced by run_simulation_step().
# NOTE(review): being module globals, these are presumably shared by every
# browser session connected to the app -- confirm whether per-session state
# (e.g. gr.State) is wanted.
fleet = FleetNetwork()  # empty until "Initialize Fleet" is clicked
simulation_time = 0  # number of simulation steps run since the last init

def initialize_simulation(num_vehicles, num_potholes):
    """Replace the global fleet with a freshly randomised one.

    Args:
        num_vehicles: number of vehicles to create. Accepts any value
            ``int()`` can convert, because Gradio sliders are documented to
            pass floats and ``range()`` rejects them.
        num_potholes: number of potholes to create; same conversion applies.

    Returns:
        A short status message for the UI.

    Raises:
        ValueError / TypeError: if a count cannot be converted to ``int``.
            The conversion happens before the globals are touched, so the
            existing simulation is left intact in that case.
    """
    global fleet, simulation_time

    vehicle_count = int(num_vehicles)
    pothole_count = int(num_potholes)

    fleet = FleetNetwork()
    simulation_time = 0

    vehicle_types = ['Car', 'Van', 'Truck', 'Emergency']
    for i in range(vehicle_count):
        # Vehicles start away from the map border, at 30-60 speed units.
        fleet.add_vehicle(Vehicle(
            vid=i,
            vtype=random.choice(vehicle_types),
            x=random.uniform(10, 90),
            y=random.uniform(10, 90),
            speed=random.uniform(30, 60)
        ))

    for i in range(pothole_count):
        fleet.add_pothole(Pothole(
            pid=i,
            x=random.uniform(10, 90),
            y=random.uniform(10, 90),
            severity=random.uniform(0.5, 1.0)
        ))

    return "Simulation initialized!"

def run_simulation_step():
    """Advance the global simulation by one step and render its outputs.

    Returns:
        A 4-tuple ``(stats_markdown, map_fig, metrics_fig, slipstream_fig)``.
        When no fleet has been initialised, the first element is a prompt to
        initialise and the three figures are ``None``.
    """
    global simulation_time

    if not fleet.vehicles:
        return "Please initialize simulation first!", None, None, None

    fleet.update_fleet()
    simulation_time += 1

    map_fig = create_fleet_map(fleet)
    metrics_fig = create_metrics_dashboard(fleet, simulation_time)
    slipstream_fig = create_slipstream_diagram()

    vehicle_count = len(fleet.vehicles)
    link_count = len(fleet.mesh_connections)
    # Links per vehicle as a percentage. A dense fleet can have more links
    # than vehicles, so the figure is capped at 100%.
    mesh_density = min(100.0, link_count / max(1, vehicle_count) * 100)

    stats = f"""
    📊 **AETHER System Status** (T+{simulation_time}s)

    🚗 **Fleet Statistics:**
    - Active Vehicles: {vehicle_count}
    - Mesh Connections: {link_count}
    - Vehicles in Slipstream: {sum(1 for v in fleet.vehicles if v.in_slipstream)}

    ♻️ **Environmental Impact:**
    - Total CO2 Saved: {fleet.total_emissions_saved:.2f} kg
    - Emissions Reduction: {(fleet.total_emissions_saved/(simulation_time*vehicle_count*25)*100):.1f}%

    🛡️ **Safety Metrics:**
    - Hazards Neutralized: {fleet.hazards_neutralized}
    - Safety Interventions: {fleet.safety_interventions}
    - Avg. Driver Stress: {np.mean([v.stress_level for v in fleet.vehicles]):.2f}

    🌐 **Network Health:**
    - Mesh Density: {mesh_density:.1f}%
    - Data Packets: {link_count * simulation_time}
    """

    return stats, map_fig, metrics_fig, slipstream_fig

# ============================================================================
# GRADIO INTERFACE
# ============================================================================

# Declarative UI: components appear on the page in the order they are created
# inside the nested `with` contexts, so statement order here is the layout.
with gr.Blocks(theme=gr.themes.Soft(), title="AETHER Fleet Demo") as demo:
    # Page header.
    gr.Markdown("""
    # 🚀 AETHER: Autonomous Edge for Transient Hazard Evasion and Routing
    ## Symbiotic Fleet Cognitive Firewall - Live Demo

    **Drive. Connect. Innovate.** - A revolutionary Edge AI system that transforms individual vehicles into a collective intelligence.
    """)

    # Top row: controls + status text on the left, fleet map on the right.
    with gr.Row():
        with gr.Column(scale=1):
            gr.Markdown("### 🎛️ Simulation Controls")
            # Slider values are passed to initialize_simulation().
            num_vehicles = gr.Slider(5, 20, value=12, step=1, label="Number of Vehicles")
            num_potholes = gr.Slider(3, 15, value=8, step=1, label="Number of Hazards")

            init_btn = gr.Button("🔄 Initialize Fleet", variant="primary")
            step_btn = gr.Button("▶️ Run Simulation Step", variant="secondary")

            gr.Markdown("---")
            # Receives the status message / statistics returned by the handlers.
            stats_output = gr.Markdown("Waiting for initialization...")

        with gr.Column(scale=2):
            fleet_map = gr.Plot(label="Fleet Network Map")

    # Second row: metrics dashboard.
    with gr.Row():
        metrics_plot = gr.Plot(label="System Metrics Dashboard")

    # Third row: static slipstream illustration.
    with gr.Row():
        slipstream_plot = gr.Plot(label="Slipstream Optimization Visualization")

    # Footer with feature summary and usage instructions.
    gr.Markdown("""
    ---
    ### 🔑 Key Innovations:

    1. **Predictive Pothole Neutralization**: Vehicles share hazard data and calculate optimal traversal paths
    2. **Dynamic Emissions-Aero Routing**: Fleet-wide aerodynamic optimization through slipstream coordination
    3. **Biometric Wellness Corridor**: Proactive driver stress monitoring with adaptive safety zones
    4. **Swarm Intersection Intelligence**: Decentralized priority negotiation for zero-stop traffic flow

    ### 📊 How to Use:
    1. Set fleet parameters and click "Initialize Fleet"
    2. Click "Run Simulation Step" repeatedly to watch the system evolve
    3. Observe mesh connections (blue lines), hazards (red X), and vehicle coordination

    **Tech Stack**: NVIDIA Jetson Edge AI • C-V2X Mesh • AWS IoT Core • TensorFlow Lite
    """)

    # Initialising only updates the status text; the three plots keep whatever
    # they showed before until the next simulation step is run.
    init_btn.click(
        fn=initialize_simulation,
        inputs=[num_vehicles, num_potholes],
        outputs=[stats_output]
    )

    # A step takes no inputs and refreshes the status text and all three plots,
    # matching the 4-tuple returned by run_simulation_step().
    step_btn.click(
        fn=run_simulation_step,
        outputs=[stats_output, fleet_map, metrics_plot, slipstream_plot]
    )

# Launch
# Start the Gradio app only when this file is executed directly (script or
# notebook cell), not when it is imported as a module.
if __name__ == "__main__":
    demo.launch()

# --- Captured notebook cell output (not part of the program) ---
# It looks like you are running Gradio on a hosted Jupyter notebook, which requires `share=True`. Automatically setting `share=True` (you can turn this off by setting `share=False` in `launch()` explicitly).
#
# Colab notebook detected. To show errors in colab notebook, set debug=True in launch()
# * Running on public URL: https://317960e2b469e4479e.gradio.live
#
# This share link expires in 1 week. For free permanent hosting and GPU upgrades, run `gradio deploy` from the terminal in the working directory to deploy to Hugging Face Spaces (https://huggingface.co/spaces)
