In [1]:
import gymnasium as gym 
from gymnasium import spaces
import numpy as np 
import random 

In [2]:
import plotly.graph_objects as go
from plotly.subplots import make_subplots 

In [3]:
class DroneEnvironment(gym.Env):
    """
    A 3D grid environment for a drone, compatible with Gymnasium.

    The drone moves one cell per step through a cubic grid of side
    ``grid_size``. An episode ends when the drone reaches the goal, leaves the
    grid, enters a building or a prohibited zone, or exceeds the step budget.

    Observations are a single integer encoding both the drone position and
    the goal position (see ``position_to_state``).

    Buildings and prohibited zones are dicts with a ``'pos'`` entry (corner of
    the box) and a ``'size'`` entry (extent along each axis), each holding
    three numbers.
    """

    def __init__(self, grid_size=10, start_position=None, buildings=None, prohibited_zones=None, max_episode_steps=200):
        """
        Args:
            grid_size: Number of cells along each axis.
            start_position: Initial ``[x, y, z]`` of the drone; defaults to
                ``[0, 0, 0]`` when omitted or empty.
            buildings: List of box dicts the drone must not enter.
            prohibited_zones: List of box dicts the drone must not enter.
            max_episode_steps: Number of ``step`` calls allowed before the
                episode is truncated.

        Raises:
            AssertionError: If the start position lies inside a building or a
                prohibited zone.
        """
        super().__init__()  # Initialize the parent Gymnasium environment

        self.grid_size = grid_size
        # Positions are stored as lists because step() compares them against
        # list-valued positions; a tuple would never compare equal to a list.
        if start_position is not None and len(start_position):
            self.start_position = list(start_position)
        else:
            self.start_position = [0, 0, 0]
        self.goal_position = [0, 0, 0]  # Placeholder; replaced in reset() / set_goal()
        self.buildings = buildings if buildings else []
        self.prohibited_zones = prohibited_zones if prohibited_zones else []
        self.max_episode_steps = max_episode_steps
        self.steps_taken = 0
        self.path = []

        # Ensure the start position is valid
        assert not self._is_invalid_position(self.start_position), \
            f"Start position {self.start_position} is invalid (in building or prohibited zone)."

        # Possible actions (6 directions: +X, -X, +Y, -Y, +Z, -Z)
        self.actions = [(1, 0, 0), (-1, 0, 0), (0, 1, 0), (0, -1, 0), (0, 0, 1), (0, 0, -1)]
        self.action_space = spaces.Discrete(len(self.actions))

        # The observation combines drone position and goal position. Each has
        # grid_size^3 possible values, so the combined space has grid_size^6.
        self.observation_space = spaces.Discrete(self.grid_size**6)

        self.current_state = self.position_to_state(self.start_position)  # Initial state
        self.fig = make_subplots(rows=1, cols=1, specs=[[{'type': 'scatter3d'}]])  # Plotly figure for rendering

    def position_to_state(self, position):
        """Encode ``position`` together with the current goal as one integer.

        The drone position occupies the high-order part of the result and the
        goal position the low-order part, each as a base-``grid_size`` number.
        """
        x1, y1, z1 = position
        x2, y2, z2 = self.goal_position

        # Encode current position
        state1 = x1 * self.grid_size**2 + y1 * self.grid_size + z1
        # Encode goal position
        state2 = x2 * self.grid_size**2 + y2 * self.grid_size + z2

        # Combine both into a single state
        return state1 * (self.grid_size**3) + state2

    def state_to_position(self, state):
        """Decode the drone's ``[x, y, z]`` from a combined integer state.

        The goal part of the state is discarded.
        """
        # Drop the goal part to recover the drone's position code
        state1 = state // (self.grid_size**3)

        x1 = state1 // (self.grid_size**2)
        y1 = (state1 % (self.grid_size**2)) // self.grid_size
        z1 = state1 % self.grid_size
        return [x1, y1, z1]

    def _is_in_box(self, position, box):
        """Return True if ``position`` lies inside ``box``.

        Both ends of each axis range are inclusive, so a box with size ``s``
        along an axis covers ``s + 1`` integer coordinates on that axis.
        """
        px, py, pz = position
        bx, by, bz = box['pos']
        sx, sy, sz = box['size']
        return bx <= px <= bx + sx and \
               by <= py <= by + sy and \
               bz <= pz <= bz + sz

    def _is_invalid_position(self, position):
        """Return True if ``position`` is inside any building or prohibited zone."""
        if any(self._is_in_box(position, b) for b in self.buildings):
            return True
        if any(self._is_in_box(position, z) for z in self.prohibited_zones):
            return True
        return False

    def step(self, action):
        """
        Executes one step in the environment given an action.

        Rewards: ``-1`` for an ordinary move, ``1000`` for reaching the goal,
        ``-50`` for entering a building or prohibited zone, ``-100`` for
        leaving the grid or exceeding ``max_episode_steps``. On every failure
        case the drone stays where it was and the previous state is returned.

        Args:
            action: Index into ``self.actions``.

        Returns:
            observation (int): The state after the action.
            reward (int): The reward obtained from the action.
            terminated (bool): True when the goal was reached, the drone left
                the grid, or it entered a building / prohibited zone.
            truncated (bool): True when the step budget was exceeded.
            info (dict): ``{"info": <reason>}`` for special outcomes, else ``{}``.
        """
        self.steps_taken += 1
        terminated = False
        truncated = False
        reward = -1  # Default penalty for each step

        x, y, z = self.state_to_position(self.current_state)
        dx, dy, dz = self.actions[action]
        next_position = [x + dx, y + dy, z + dz]

        # Truncation: the call that exceeds the budget does not move the drone
        if self.steps_taken > self.max_episode_steps:
            reward = -100
            truncated = True
            info = {"info": "Max steps exceeded"}
            return self.current_state, reward, terminated, truncated, info

        # Out of bounds terminates the episode
        if not all(0 <= c < self.grid_size for c in next_position):
            reward = -100
            terminated = True
            info = {"info": "Out of bounds"}
            return self.current_state, reward, terminated, truncated, info

        # Collision with a building terminates the episode
        if any(self._is_in_box(next_position, b) for b in self.buildings):
            reward = -50
            terminated = True
            info = {"info": "Hit building"}
            return self.current_state, reward, terminated, truncated, info

        # Entering a prohibited zone terminates the episode
        if any(self._is_in_box(next_position, z) for z in self.prohibited_zones):
            reward = -50
            terminated = True
            info = {"info": "In prohibited zone"}
            return self.current_state, reward, terminated, truncated, info

        # Reaching the goal terminates the episode successfully
        if next_position == self.goal_position:
            self.path.append(next_position)
            self.current_state = self.position_to_state(next_position)  # Update state before returning
            reward = 1000
            terminated = True
            info = {"info": "Goal reached"}
            return self.current_state, reward, terminated, truncated, info

        # Otherwise it is a normal move
        self.current_state = self.position_to_state(next_position)
        self.path.append(next_position)
        info = {}  # No special info for normal steps
        return self.current_state, reward, terminated, truncated, info

    def reset(self, *, seed=None, options=None):
        """
        Resets the environment to the start position and draws a new random goal.

        The goal is sampled from ``self.np_random`` (the generator seeded by
        ``super().reset(seed=...)``), so passing the same ``seed`` yields the
        same goal.

        Note: the sampled goal may coincide with the start position, and the
        sampling loop does not terminate if every cell is invalid.

        Args:
            seed: Optional seed forwarded to the Gymnasium base class.
            options: Unused.

        Returns:
            observation (int): The initial state (start position + new goal).
            info (dict): Always an empty dict.
        """
        super().reset(seed=seed)  # Seeds self.np_random when a seed is given

        self.steps_taken = 0
        self.path = [self.start_position]

        # Rejection-sample a goal that is not in a building or prohibited zone.
        # int() keeps positions (and therefore states) plain Python ints.
        while True:
            candidate = [int(c) for c in self.np_random.integers(0, self.grid_size, size=3)]
            if not self._is_invalid_position(candidate):
                self.goal_position = candidate
                break

        # Encode the state only after the goal is known, since the goal is part of it
        self.current_state = self.position_to_state(self.start_position)

        return self.current_state, {}

    def set_goal(self, goal_position):
        """Set a specific goal position and refresh the encoded state.

        Args:
            goal_position: ``[x, y, z]`` sequence; stored as a list so that it
                compares equal to the list positions produced in ``step``.

        Raises:
            AssertionError: If the goal lies inside a building or prohibited zone.
        """
        goal_position = list(goal_position)
        assert not self._is_invalid_position(goal_position), \
            f"Goal position {goal_position} is invalid (in building or prohibited zone)."
        self.goal_position = goal_position
        # Re-encode the current drone position with the new goal
        self.current_state = self.position_to_state(self.state_to_position(self.current_state))

    def _draw_point(self, position, color, name, size=6):
        """Add a single marker at ``position`` to the Plotly figure."""
        self.fig.add_trace(go.Scatter3d(
            x=[position[0]], y=[position[1]], z=[position[2]],
            mode='markers', marker=dict(size=size, color=color),
            name=name, showlegend=False
        ))

    def _draw_box(self, position, size, color):
        """Add the outline of a rectangular box to the Plotly figure.

        Each of the six faces is drawn as its own line trace through the
        face's four corners.
        """
        x, y, z = position
        dx, dy, dz = size
        corners = np.array([
            [x, y, z],
            [x + dx, y, z],
            [x + dx, y + dy, z],
            [x, y + dy, z],
            [x, y, z + dz],
            [x + dx, y, z + dz],
            [x + dx, y + dy, z + dz],
            [x, y + dy, z + dz]
        ])

        # Faces given as indices into `corners`
        faces = [
            [0, 1, 2, 3],  # Bottom face
            [4, 5, 6, 7],  # Top face
            [0, 1, 5, 4],  # Front face
            [2, 3, 7, 6],  # Back face
            [1, 2, 6, 5],  # Right face
            [0, 3, 7, 4]   # Left face
        ]

        for face_indices in faces:
            f = corners[face_indices]
            self.fig.add_trace(go.Scatter3d(
                x=f[:, 0], y=f[:, 1], z=f[:, 2],
                mode='lines',
                line=dict(color=color, width=4),
                showlegend=False
            ))

    def render(self):
        """Redraw the whole scene (obstacles, start, goal, path, drone) and show it."""
        self.fig.data = []  # Clear any previous plot data

        # Axis ranges cover the full grid
        self.fig.update_layout(
            scene=dict(
                xaxis=dict(range=[0, self.grid_size], title='X'),
                yaxis=dict(range=[0, self.grid_size], title='Y'),
                zaxis=dict(range=[0, self.grid_size], title='Z')
            ),
            title="Drone Environment"
        )

        # Buildings
        for box in self.buildings:
            self._draw_box(box['pos'], box['size'], color='yellow')

        # Prohibited zones
        for zone in self.prohibited_zones:
            self._draw_box(zone['pos'], zone['size'], color='red')

        # Start and goal points
        self._draw_point(self.start_position, 'green', 'Start', size=10)
        self._draw_point(self.goal_position, 'blue', 'Goal', size=10)

        # Path flown so far
        if self.path:
            xs, ys, zs = zip(*self.path)
            self.fig.add_trace(go.Scatter3d(
                x=xs, y=ys, z=zs,
                mode='lines+markers',
                line=dict(color='black', width=2),
                marker=dict(size=2, color='red'),
                name='Drone Path', showlegend=False
            ))

        # Current drone position
        current_drone_pos = self.state_to_position(self.current_state)
        self._draw_point(current_drone_pos, 'red', 'Drone', size=5)

        self.fig.show()

    def close(self):
        """Clear the Plotly figure's traces."""
        self.fig.data = []

In [4]:


class DroneEnvironment_v1(gym.Env):
    """
    3D drone grid environment in which invalid moves are penalised but do not
    end the episode.

    The drone moves one cell per step through a cubic grid of side
    ``grid_size``. Moves that would leave the grid or enter a building or
    prohibited zone are rejected with a penalty and the drone stays in place.
    The episode ends when the goal is reached (terminated) or the step budget
    is exceeded (truncated).

    Observations are a single integer encoding both the drone position and
    the goal position (see ``position_to_state``). Buildings and prohibited
    zones are dicts with ``'pos'`` and ``'size'`` entries of three numbers each.
    """

    def __init__(self, grid_size=10, start_position=None, buildings=None, prohibited_zones=None, max_episode_steps=200):
        """
        Args:
            grid_size: Number of cells along each axis.
            start_position: Initial ``[x, y, z]`` of the drone; defaults to
                ``[0, 0, 0]`` when omitted or empty.
            buildings: List of box dicts the drone must avoid.
            prohibited_zones: List of box dicts the drone must avoid.
            max_episode_steps: Number of ``step`` calls allowed before the
                episode is truncated.

        Raises:
            AssertionError: If the start position is inside a building or a
                prohibited zone.
        """
        super(DroneEnvironment_v1, self).__init__()

        # Side length of the 3D grid
        self.grid_size = grid_size

        # Buildings the drone must avoid
        self.buildings = buildings if buildings else []

        # Prohibited zones the drone must avoid
        self.prohibited_zones = prohibited_zones if prohibited_zones else []

        # Maximum number of steps per episode
        self.max_episode_steps = max_episode_steps

        # Allowed moves (six axis-aligned directions)
        self.actions = [(1, 0, 0), (-1, 0, 0),
                        (0, 1, 0), (0, -1, 0),
                        (0, 0, 1), (0, 0, -1)]

        # Action space
        self.action_space = spaces.Discrete(len(self.actions))

        # Observation space (drone position and goal position combined)
        self.observation_space = spaces.Discrete(grid_size**6)

        # Start position, stored as a list because step() compares positions
        # as lists and a tuple would never compare equal to a list
        if start_position is not None and len(start_position):
            self.start_position = list(start_position)
        else:
            self.start_position = [0, 0, 0]

        # Goal position placeholder (replaced in reset() / set_goal())
        self.goal_position = [0, 0, 0]

        # Path flown by the drone
        self.path = []

        # Number of steps taken in the current episode
        self.steps_taken = 0

        # Plotly figure used by render()
        self.fig = make_subplots(rows=1, cols=1, specs=[[{'type': 'scatter3d'}]])

        # The start position must not be inside an obstacle
        assert not self._is_invalid_position(self.start_position), "Start position is invalid"

        # Initial state
        self.current_state = self.position_to_state(self.start_position)

    def position_to_state(self, position):
        """Encode ``position`` together with the current goal as one integer."""
        x1, y1, z1 = position
        x2, y2, z2 = self.goal_position
        state1 = x1 * self.grid_size**2 + y1 * self.grid_size + z1
        state2 = x2 * self.grid_size**2 + y2 * self.grid_size + z2
        return state1 * (self.grid_size**3) + state2

    def state_to_position(self, state):
        """Decode the drone's ``[x, y, z]`` from a combined state (goal part is dropped)."""
        state1 = state // (self.grid_size**3)
        x1 = state1 // (self.grid_size**2)
        y1 = (state1 % (self.grid_size**2)) // self.grid_size
        z1 = state1 % self.grid_size
        return [x1, y1, z1]

    def _is_in_box(self, position, box):
        """Return True if ``position`` lies inside ``box`` (both ends of each axis inclusive)."""
        px, py, pz = position
        bx, by, bz = box['pos']
        sx, sy, sz = box['size']
        return bx <= px <= bx + sx and by <= py <= by + sy and bz <= pz <= bz + sz

    def _is_invalid_position(self, position):
        """Return True if ``position`` is inside any building or prohibited zone."""
        return any(self._is_in_box(position, b) for b in self.buildings + self.prohibited_zones)

    def step(self, action):
        """
        Execute one action and return the Gymnasium 5-tuple.

        Rewards: ``-1`` for an ordinary move, ``1000`` for reaching the goal,
        ``-50`` for a move into a building or prohibited zone, ``-100`` for a
        move out of the grid or for exceeding ``max_episode_steps``. Rejected
        moves leave the drone in place and do not end the episode.

        Args:
            action: Index into ``self.actions``.

        Returns:
            observation (int): The state after the action.
            reward (int): The reward obtained from the action.
            terminated (bool): True only when the goal was reached.
            truncated (bool): True only when the step budget was exceeded.
            info (dict): ``{"info": <reason>}`` for special outcomes, else ``{}``.
        """
        # Count this step
        self.steps_taken += 1

        # Step budget exceeded: this is a time limit, so report it as truncation
        if self.steps_taken > self.max_episode_steps:
            return self.current_state, -100, False, True, {"info": "Max steps exceeded"}

        # Decode the current position and apply the move
        x, y, z = self.state_to_position(self.current_state)
        dx, dy, dz = self.actions[action]
        next_position = [x + dx, y + dy, z + dz]

        # Leaving the grid
        if not all(0 <= c < self.grid_size for c in next_position):
            return self.current_state, -100, False, False, {"info": "Out of bounds"}

        # Colliding with a building
        if any(self._is_in_box(next_position, b) for b in self.buildings):
            return self.current_state, -50, False, False, {"info": "Hit building"}

        # Entering a prohibited zone
        if any(self._is_in_box(next_position, z) for z in self.prohibited_zones):
            return self.current_state, -50, False, False, {"info": "In prohibited zone"}

        # Reaching the goal; store the new state so render() shows the drone on the goal
        if next_position == self.goal_position:
            self.path.append(next_position)
            self.current_state = self.position_to_state(next_position)
            return self.current_state, 1000, True, False, {"info": "Goal reached"}

        # Ordinary move
        self.current_state = self.position_to_state(next_position)
        self.path.append(next_position)
        return self.current_state, -1, False, False, {}

    def reset(self, seed=None, return_info=False, options=None):
        """
        Reset to the start position and draw a new random goal.

        The goal is sampled from ``self.np_random`` (seeded through
        ``super().reset(seed=...)``), so the same ``seed`` yields the same goal.

        Note: the sampled goal may coincide with the start position, and the
        sampling loop does not terminate if every cell is invalid.

        Args:
            seed: Optional seed forwarded to the Gymnasium base class.
            return_info: Accepted for backward compatibility and ignored; the
                ``(observation, info)`` pair is always returned.
            options: Unused.

        Returns:
            observation (int): The initial state (start position + new goal).
            info (dict): Always an empty dict.
        """
        super().reset(seed=seed)
        self.steps_taken = 0
        self.path = [self.start_position]

        # Rejection-sample a goal that does not overlap any obstacle.
        # int() keeps positions (and therefore states) plain Python ints.
        while True:
            pos = [int(c) for c in self.np_random.integers(0, self.grid_size, size=3)]
            if not self._is_invalid_position(pos):
                self.goal_position = pos
                break

        # Encode the state only after the goal is known, since the goal is part of it
        self.current_state = self.position_to_state(self.start_position)

        return self.current_state, {}

    def set_goal(self, goal_position):
        """Set a specific goal position and refresh the encoded state.

        Args:
            goal_position: ``[x, y, z]`` sequence; stored as a list so that it
                compares equal to the list positions produced in ``step``.

        Raises:
            AssertionError: If the goal lies inside a building or prohibited zone.
        """
        goal_position = list(goal_position)
        assert not self._is_invalid_position(goal_position), "Invalid goal position"
        self.goal_position = goal_position
        # Re-encode the current drone position with the new goal
        self.current_state = self.position_to_state(self.state_to_position(self.current_state))

    def render(self):
        """Redraw the whole scene (obstacles, start, goal, path, drone) and show it."""
        # Remove traces from any previous render
        self.fig.data = []

        # Axis ranges cover the full grid
        self.fig.update_layout(
            scene=dict(
                xaxis=dict(range=[0, self.grid_size], title='X'),
                yaxis=dict(range=[0, self.grid_size], title='Y'),
                zaxis=dict(range=[0, self.grid_size], title='Z')
            ),
            title="Drone Environment"
        )

        # Buildings
        for box in self.buildings:
            self._draw_box(box['pos'], box['size'], color='yellow')

        # Prohibited zones
        for zone in self.prohibited_zones:
            self._draw_box(zone['pos'], zone['size'], color='red')

        # Start and goal
        self._draw_point(self.start_position, 'green', 'Start')
        self._draw_point(self.goal_position, 'blue', 'Goal')

        # Path flown so far
        if self.path:
            xs, ys, zs = zip(*self.path)
            self.fig.add_trace(go.Scatter3d(
                x=xs, y=ys, z=zs,
                mode='lines+markers',
                line=dict(color='black', width=2),
                marker=dict(size=2, color='red'),
                name='Drone Path', showlegend=False
            ))

        # Current drone position
        x, y, z = self.state_to_position(self.current_state)
        self._draw_point([x, y, z], 'red', 'Drone')

        # Display the figure
        self.fig.show()

    def _draw_point(self, position, color, name):
        """Add a single marker (e.g. start, goal, drone) to the Plotly figure."""
        self.fig.add_trace(go.Scatter3d(
            x=[position[0]], y=[position[1]], z=[position[2]],
            mode='markers', marker=dict(size=6, color=color),
            name=name, showlegend=False
        ))

    def _draw_box(self, position, size, color):
        """Add the outline of a rectangular box (building or prohibited zone).

        Each of the six faces is drawn as its own line trace through the
        face's four corners.
        """
        x, y, z = position
        dx, dy, dz = size
        corners = np.array([
            [x, y, z],
            [x + dx, y, z],
            [x + dx, y + dy, z],
            [x, y + dy, z],
            [x, y, z + dz],
            [x + dx, y, z + dz],
            [x + dx, y + dy, z + dz],
            [x, y + dy, z + dz]
        ])

        # Faces given as indices into `corners`
        faces = [
            [0, 1, 2, 3],
            [4, 5, 6, 7],
            [0, 1, 5, 4],
            [2, 3, 7, 6],
            [1, 2, 6, 5],
            [0, 3, 7, 4]
        ]

        for face in faces:
            f = corners[face]
            self.fig.add_trace(go.Scatter3d(
                x=f[:, 0], y=f[:, 1], z=f[:, 2],
                mode='lines',
                line=dict(color=color, width=4),
                showlegend=False
            ))

    def close(self):
        """Clear the Plotly figure's traces."""
        self.fig.data = []

In [6]:
# Register both environment classes so they can be created with gym.make()
gym.register(id="Drone-v0", entry_point=DroneEnvironment)
gym.register(id="Drone-v1", entry_point=DroneEnvironment_v1)

In [7]:
# Example scenario for the "Drone-v0" environment

# Buildings
buildings = [
    {'pos': [3, 3, 0], 'size': [5, 5, 5]},
    {'pos': [8, 8, 0], 'size': [1, 1, 1]},
]

# Prohibited zones
prohibited_zones = [
    {'pos': [0, 2, 0], 'size': [2, 4, 4]},
    {'pos': [8, 1, 0], 'size': [2, 2, 5]},
]

# Start and goal positions
start_position = [1, 9, 0]
goal_position = [9, 0, 8]

# Create the environment.
# NOTE(review): gymnasium.make appears to have its own `max_episode_steps`
# parameter, so this value may configure the TimeLimit wrapper rather than
# reach the environment's constructor — confirm against the Gymnasium docs.
env = gym.make(
    "Drone-v0",
    grid_size=10,
    start_position=start_position,
    buildings=buildings,
    prohibited_zones=prohibited_zones,
    max_episode_steps=40,
)

# reset() draws a random goal; replace it with the fixed goal above.
# set_goal is defined on the environment class, so call it on the unwrapped instance.
env.reset()
raw_env = env.unwrapped
raw_env.set_goal(goal_position)

# Show the scene
env.render()

  logger.deprecation(


In [None]:
# Example scenario for the "Drone-v1" environment

# Buildings
buildings = [
    {'pos': [3, 3, 0], 'size': [5, 5, 5]},
    {'pos': [8, 8, 0], 'size': [1, 1, 1]},
]

# Prohibited zones
prohibited_zones = [
    {'pos': [0, 2, 0], 'size': [2, 4, 4]},
    {'pos': [8, 1, 0], 'size': [2, 2, 5]},
]

# Start and goal positions
start_position = [1, 9, 0]
goal_position = [9, 0, 8]

# Create the environment.
# NOTE(review): gymnasium.make appears to have its own `max_episode_steps`
# parameter, so this value may configure the TimeLimit wrapper rather than
# reach the environment's constructor — confirm against the Gymnasium docs.
env = gym.make(
    "Drone-v1",
    grid_size=10,
    start_position=start_position,
    buildings=buildings,
    prohibited_zones=prohibited_zones,
    max_episode_steps=40,
)

# reset() draws a random goal; replace it with the fixed goal above.
# set_goal is defined on the environment class, so call it on the unwrapped instance.
env.reset()
raw_env = env.unwrapped
raw_env.set_goal(goal_position)

# Show the scene
env.render()

In [16]:
import time
from IPython.display import clear_output, display

# Start from an empty output area
clear_output()

# Begin a new episode and pin the goal to the fixed position
env.reset()
raw_env = env.unwrapped
raw_env.set_goal(goal_position)

# Roll out a random policy, redrawing the scene after every step,
# until the episode is terminated or truncated.
done = False
while not done:
    action = env.action_space.sample()
    state, reward, terminated, truncated, info = env.step(action)

    # wait=True defers clearing until the next figure is ready, which avoids
    # the output flickering between frames
    clear_output(wait=True)
    env.render()

    # time.sleep(1)  # uncomment to slow the animation down
    done = terminated or truncated
    


In [17]:
# Close the environment; the Drone environments' close() clears the Plotly figure's traces.
env.close()