<a href="https://colab.research.google.com/github/kuswri/Amazon-clone/blob/main/Ai_Powered_Drone_Swarm.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# First, install dependencies in the correct order
%%capture
!pip install --upgrade pip
!pip install pandas numpy torch torchvision
!pip install gymnasium
!pip install stable-baselines3
!pip install opencv-python

# Restart runtime after installation
import os
os.kill(os.getpid(), 9)

In [None]:
import numpy as np
import torch
import cv2
import gymnasium as gym
from dataclasses import dataclass
from typing import List, Tuple, Dict, Optional
import logging
import threading
import queue
import time
from stable_baselines3 import PPO
from stable_baselines3.common.vec_env import DummyVecEnv

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Verify imports are working
print("Imports successful!")
print(f"Torch version: {torch.__version__}")
print(f"Gym version: {gym.__version__}")
print(f"OpenCV version: {cv2.__version__}")

Imports successful!
Torch version: 2.5.1+cu124
Gym version: 1.0.0
OpenCV version: 4.11.0


In [None]:
# Data structures for the drone swarm system
@dataclass
class SensorData:
    """Data structure for drone sensor readings"""
    timestamp: float
    position: Tuple[float, float, float]
    velocity: Tuple[float, float, float]
    orientation: Tuple[float, float, float]
    battery_level: float
    camera_data: Optional[np.ndarray] = None

@dataclass
class DroneConfig:
    """Configuration parameters for individual drones"""
    drone_id: str
    initial_position: Tuple[float, float, float]
    max_velocity: float = 5.0
    max_altitude: float = 100.0
    sensor_update_rate: float = 0.1  # seconds

class DroneEnvironment(gym.Env):
    """Custom environment for drone reinforcement learning"""
    def __init__(self):
        super().__init__()
        # Define action space: [dx, dy, dz] for movement in 3D space
        self.action_space = gym.spaces.Box(
            low=-1.0,
            high=1.0,
            shape=(3,),
            dtype=np.float32
        )

        # Define observation space: [position(3), velocity(3), orientation(3)]
        self.observation_space = gym.spaces.Box(
            low=-np.inf,
            high=np.inf,
            shape=(9,),
            dtype=np.float32
        )

        self.state = np.zeros(9, dtype=np.float32)
        self.target_position = np.random.uniform(-10, 10, 3)

    def reset(self, seed=None):
        super().reset(seed=seed)
        self.state = np.zeros(9, dtype=np.float32)
        self.target_position = np.random.uniform(-10, 10, 3)
        return self.state, {}

    def step(self, action):
        # Update position based on action
        self.state[:3] += action

        # Simple physics simulation (velocity decay)
        self.state[3:6] = action * 0.8  # Current velocity

        # Calculate reward based on distance to target
        distance = np.linalg.norm(self.state[:3] - self.target_position)
        reward = -distance  # Negative distance as reward

        # Check if done
        done = distance < 0.1

        return self.state, reward, done, False, {}

# Test the environment
def test_environment():
    env = DroneEnvironment()
    obs, _ = env.reset()

    print("Initial observation:", obs)
    action = env.action_space.sample()
    print("Random action:", action)

    next_obs, reward, done, _, info = env.step(action)
    print("Next observation:", next_obs)
    print("Reward:", reward)
    print("Done:", done)

if __name__ == "__main__":
    print("Testing DroneEnvironment...")
    test_environment()

Testing DroneEnvironment...
Initial observation: [0. 0. 0. 0. 0. 0. 0. 0. 0.]
Random action: [-0.5321114  0.6148465  0.2523371]
Next observation: [-0.5321114   0.6148465   0.2523371  -0.42568913  0.49187723  0.20186968
  0.          0.          0.        ]
Reward: -8.331953934503957
Done: False


In [None]:
class DroneController:
    """Controller for individual drone units"""
    def __init__(self, config: DroneConfig):
        self.config = config
        self.position = np.array(config.initial_position, dtype=np.float32)
        self.velocity = np.zeros(3, dtype=np.float32)
        self.orientation = np.zeros(3, dtype=np.float32)
        self.status = "initialized"

        # Initialize RL environment and agent
        self.env = DroneEnvironment()
        self.model = PPO("MlpPolicy", self.env, verbose=0)

        # Control loop setup
        self.running = False
        self.control_thread = None
        self._setup_control_loop()

    def _setup_control_loop(self):
        """Setup the control loop thread"""
        self.control_thread = threading.Thread(
            target=self._control_loop,
            daemon=True
        )

    def _control_loop(self):
        """Main control loop for drone operations"""
        while self.running:
            try:
                # Get current state
                state = np.concatenate([
                    self.position,
                    self.velocity,
                    self.orientation
                ])

                # Get action from RL model
                action, _ = self.model.predict(state, deterministic=True)

                # Update drone state
                self.position += action * self.config.sensor_update_rate
                self.velocity = action

                # Enforce constraints
                self.position[2] = np.clip(
                    self.position[2],
                    0,
                    self.config.max_altitude
                )

                time.sleep(self.config.sensor_update_rate)

            except Exception as e:
                logger.error(f"Control loop error: {str(e)}")
                self.stop()

    def start(self):
        """Start drone operations"""
        if not self.running:
            self.running = True
            self.control_thread.start()
            logger.info(f"Drone {self.config.drone_id} started")

    def stop(self):
        """Stop drone operations"""
        self.running = False
        if self.control_thread and self.control_thread.is_alive():
            self.control_thread.join()
        logger.info(f"Drone {self.config.drone_id} stopped")

    def get_state(self) -> SensorData:
        """Get current drone state"""
        return SensorData(
            timestamp=time.time(),
            position=tuple(self.position),
            velocity=tuple(self.velocity),
            orientation=tuple(self.orientation),
            battery_level=100.0  # Simplified battery simulation
        )

# Test the drone controller
def test_controller():
    config = DroneConfig(
        drone_id="Test_Drone_1",
        initial_position=(0.0, 0.0, 0.0)
    )

    controller = DroneController(config)
    print("Initial state:", controller.get_state())

    controller.start()
    time.sleep(1)  # Let it run for a second
    print("State after 1 second:", controller.get_state())

    controller.stop()

if __name__ == "__main__":
    print("Testing DroneController...")
    test_controller()

Testing DroneController...
Initial state: SensorData(timestamp=1739003239.9565852, position=(0.0, 0.0, 0.0), velocity=(0.0, 0.0, 0.0), orientation=(0.0, 0.0, 0.0), battery_level=100.0, camera_data=None)
State after 1 second: SensorData(timestamp=1739003240.9606838, position=(0.0, 0.0, 0.0), velocity=(0.0, 0.0, 0.0), orientation=(0.0, 0.0, 0.0), battery_level=100.0, camera_data=None)


In [None]:
class SwarmManager:
    """Manager for coordinating multiple drones"""
    def __init__(self, num_drones: int):
        self.drones = {}
        self.mission_status = "standby"

        # Initialize drone fleet
        for i in range(num_drones):
            config = DroneConfig(
                drone_id=f"Drone_{i}",
                initial_position=(
                    np.random.uniform(-10, 10),
                    np.random.uniform(-10, 10),
                    np.random.uniform(0, 10)
                )
            )
            self.drones[config.drone_id] = DroneController(config)

    def start_mission(self, duration: int = 60):
        """Start a coordinated mission"""
        logger.info("Starting swarm mission...")
        self.mission_status = "active"

        try:
            # Start all drones
            for drone in self.drones.values():
                drone.start()

            # Mission monitoring loop
            start_time = time.time()
            while time.time() - start_time < duration:
                if self.mission_status != "active":
                    break

                # Monitor drone states
                for drone_id, drone in self.drones.items():
                    state = drone.get_state()
                    logger.info(f"Drone {drone_id} - Position: {state.position}")

                time.sleep(1)

        except Exception as e:
            logger.error(f"Mission error: {str(e)}")
            self.mission_status = "error"

        finally:
            self.stop_mission()

    def stop_mission(self):
        """Stop all drones and end mission"""
        self.mission_status = "completed"
        for drone in self.drones.values():
            drone.stop()
        logger.info("Mission completed")

# Test the swarm manager
def test_swarm():
    manager = SwarmManager(num_drones=3)
    print("Starting test mission (10 seconds)...")
    manager.start_mission(duration=10)

if __name__ == "__main__":
    print("Testing SwarmManager...")
    test_swarm()

Testing SwarmManager...
Starting test mission (10 seconds)...


In [None]:
# Complete system test
manager = SwarmManager(num_drones=3)
manager.start_mission(duration=30)  # Run for 30 seconds