Simplified 5G/6G mobility + MEC/offloading environment
for a multi-agent decision system.

The goal is not to be 3GPP-accurate, but to provide a consistent,
controllable world where different decision policies can be tested.

Main entry point: NetworkSimulation

- reset(service_type, ...) -> initial context
- get_context() -> current observable state
- step(decision) -> (next_context, info)
- run_episode(num_steps, controller_fn) -> trace list

In [1]:
from __future__ import annotations

import math
import random
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Tuple, Any

import numpy as np

# Utility functions

In [2]:
def dbm_to_watts(dbm: float) -> float: 
    return 10 ** ((dbm - 30) / 10.0)

In [3]:
def watts_to_dbm(watts: float) -> float:
    if watts <= 0:
        return -200.0
    return 10 * math.log10(watts) + 30

In [4]:
def path_loss_db(distance_m: float,
                 pl0_db: float = 30.0,
                 path_loss_exp: float = 3.5) -> float:
    """
    PL(d) = PL0 + 10 * n * log10(d / d0)
    """
    d = max(distance_m, 1.0)
    return pl0_db + 10 * path_loss_exp * math.log10(d)

In [5]:
def shannon_capacity_hz(bandwidth_hz: float, sinr_linear: float) -> float:
    """C = B * log2(1 + SINR)"""
    return bandwidth_hz * math.log2(1.0 + sinr_linear)

# Data models

In [6]:
@dataclass
class BaseStation:
    id: int
    x: float
    y: float
    tx_power_dbm: float = 46.0  # ~40W
    noise_dbm: float = -100.0   # thermal noise + receiver noise
    bandwidth_mhz: float = 20.0

    def distance_to(self, x: float, y: float) -> float:
        return math.hypot(self.x - x, self.y - y)

In [7]:
@dataclass
class UserEquipment:
    x: float
    y: float
    speed_mps: float
    direction_rad: float
    local_cpu_ghz: float = 2.0
    battery_joules: float = 1000.0

    def step(self, dt_s: float, area_size: Tuple[float, float]) -> None:
        """Move UE with simple constant velocity model and reflecting borders."""
        dx = self.speed_mps * math.cos(self.direction_rad) * dt_s
        dy = self.speed_mps * math.sin(self.direction_rad) * dt_s

        new_x = self.x + dx
        new_y = self.y + dy
        max_x, max_y = area_size

        # Reflect from borders
        if new_x < 0 or new_x > max_x:
            self.direction_rad = math.pi - self.direction_rad
            new_x = max(0, min(max_x, new_x))
        if new_y < 0 or new_y > max_y:
            self.direction_rad = -self.direction_rad
            new_y = max(0, min(max_y, new_y))

        self.x = new_x
        self.y = new_y



In [8]:
@dataclass
class Task:
    id: int
    arrival_time_s: float
    data_size_bits: float
    cpu_cycles: float
    deadline_s: float
    service_type: str

In [9]:
@dataclass
class ServiceProfile:
    name: str
    latency_budget_s: float
    energy_weight: float
    latency_weight: float
    task_data_bits_mean: float
    task_cpu_cycles_mean: float
    task_interarrival_s: float

In [None]:
SERVICE_PROFILES: Dict[str, ServiceProfile] = {
    "VR": ServiceProfile(
        name="VR",  # models eMBB (Enhanced Mobile Broadband) like (VR, AR, cloud gaming, video)
        latency_budget_s=0.05,  
        energy_weight=0.3,
        latency_weight=0.7,
        task_data_bits_mean=5e6,   # 5 Mbits
        task_cpu_cycles_mean=5e9, 
        task_interarrival_s=0.2    # task every ~200 ms
    ),
    "EV": ServiceProfile(
        name="EV",  # models URLLC (Ultra Reliable Low Latency Communication) like (EV control, robotics, remote surgery)
        latency_budget_s=0.1,  
        energy_weight=0.4,
        latency_weight=0.6,
        task_data_bits_mean=1e6,
        task_cpu_cycles_mean=2e9,
        task_interarrival_s=0.5
    ),
    "IoT": ServiceProfile(
        name="IoT",  # models mMTC (Massive Machine Type Communications) like (IoT sensors, smart meters)
        latency_budget_s=1.0,    
        energy_weight=0.75,
        latency_weight=0.3,
        task_data_bits_mean=1e5,
        task_cpu_cycles_mean=5e8,
        task_interarrival_s=5.0
    ),
}

In [11]:
@dataclass
class MecServer:
    id: int
    attached_bs: int
    cpu_ghz: float = 10.0

# Core simulation
Simplified joint radio + MEC environment.

    This class is intentionally generic and easy to control from
    rule-based or learning-based agents.

    Time is discrete: each step advances the simulation by dt_s seconds.
    

In [None]:
class NetworkSimulation:

    def __init__(
        self,
        num_cells: int = 3,
        area_size: Tuple[float, float] = (1000.0, 1000.0),
        dt_s: float = 0.1,
        seed: Optional[int] = None,
    ) -> None:
        self.num_cells = num_cells
        self.area_size = area_size
        self.dt_s = dt_s
        self.rng = random.Random(seed)
        #np.random.seed(seed if seed is not None else 907)

        self.base_stations: List[BaseStation] = []
        self.mec_servers: Dict[int, MecServer] = {}
        self.ue: Optional[UserEquipment] = None
        self.current_time_s: float = 0.0
        self.service_profile: ServiceProfile = SERVICE_PROFILES["VR"]
        self.serving_cell_id: int = 0
        self.task_counter: int = 0
        self.time_until_next_task_s: float = 0.0
        self.trace: List[Dict[str, Any]] = []

        self._init_cells_and_mec()

    # ------------------------ initialization -----------------------------

    def _init_cells_and_mec(self) -> None:
        """Place base stations in a simple line for now, each with a MEC server."""
        max_x, max_y = self.area_size
        spacing = max_x / (self.num_cells + 1)

        self.base_stations.clear()
        self.mec_servers.clear()

        for i in range(self.num_cells):
            bs_x = spacing * (i + 1)
            bs_y = max_y / 2.0
            bs = BaseStation(id=i, x=bs_x, y=bs_y)
            self.base_stations.append(bs)
            self.mec_servers[i] = MecServer(id=i, attached_bs=i)

    # ------------------------ public API ---------------------------------

    def reset(
        self,
        service_type: str = "VR",
        random_start: bool = True,
        seed: Optional[int] = None,
    ) -> Dict[str, Any]:
        """Reset simulation state and return initial context."""
        if seed is not None:
            self.rng.seed(seed)
            np.random.seed(seed)

        if service_type not in SERVICE_PROFILES:
            raise ValueError(f"Unknown service_type '{service_type}'")

        self.service_profile = SERVICE_PROFILES[service_type]
        self.current_time_s = 0.0
        self.task_counter = 0
        self.time_until_next_task_s = self._sample_interarrival()
        self.trace = []

        max_x, max_y = self.area_size

        if random_start:
            x = self.rng.uniform(0.1 * max_x, 0.9 * max_x)
            y = self.rng.uniform(0.1 * max_y, 0.9 * max_y)
            direction = self.rng.uniform(0, 2 * math.pi)
        else:
            x, y = max_x * 0.1, max_y / 2.0
            direction = 0.0

        speed_mps = 10.0  # ~36 km/h, can be tuned
        self.ue = UserEquipment(
            x=x,
            y=y,
            speed_mps=speed_mps,
            direction_rad=direction,
        )

        # Initial serving cell: max RSRP
        radio_state = self._compute_radio_state()
        self.serving_cell_id = int(np.argmax(radio_state["rsrp_dbm"]))

        return self.get_context()

    # ------------------------ radio model --------------------------------

    def _compute_radio_state(self) -> Dict[str, Any]:
        """Compute distances, RSRP, SINR and throughput per cell."""
        if self.ue is None:
            raise RuntimeError("Simulation not reset. Call reset() first.")

        rsrp_dbm_list: List[float] = []
        sinr_db_list: List[float] = []
        throughput_bps_list: List[float] = []

        ue_x, ue_y = self.ue.x, self.ue.y

        for bs in self.base_stations:
            d = bs.distance_to(ue_x, ue_y)
            pl_db = path_loss_db(d)
            rx_power_dbm = bs.tx_power_dbm - pl_db
            rsrp_dbm_list.append(rx_power_dbm)

        # For simplicity, treat interference as coming from all non-serving cells when computing SINR for each candidate cell
        for i, bs in enumerate(self.base_stations):
            signal_w = dbm_to_watts(rsrp_dbm_list[i])
            interference_w = 0.0
            for j, _ in enumerate(self.base_stations):
                if j == i:
                    continue
                interference_w += dbm_to_watts(rsrp_dbm_list[j])

            noise_w = dbm_to_watts(bs.noise_dbm)
            sinr_linear = signal_w / (interference_w + noise_w + 1e-15)
            sinr_db = 10 * math.log10(max(sinr_linear, 1e-15))
            sinr_db_list.append(sinr_db)

            bandwidth_hz = bs.bandwidth_mhz * 1e6
            capacity_bps = shannon_capacity_hz(bandwidth_hz, sinr_linear)
            throughput_bps_list.append(capacity_bps)

        return {
            "rsrp_dbm": rsrp_dbm_list,
            "sinr_db": sinr_db_list,
            "throughput_bps": throughput_bps_list,
        }

    # ------------------------ task model ---------------------------------

    def _sample_interarrival(self) -> float:
        """Sample time until next task using exponential distribution."""
        lam = 1.0 / self.service_profile.task_interarrival_s
        # Avoid extremely small intervals
        return max(np.random.exponential(1.0 / lam), 0.01)

    def _maybe_generate_task(self) -> Optional[Task]:
        if self.time_until_next_task_s > 0:
            return None

        self.task_counter += 1

        data_size = max(
            np.random.normal(self.service_profile.task_data_bits_mean,
                             0.3 * self.service_profile.task_data_bits_mean),
            1e4,
        )
        cpu_cycles = max(
            np.random.normal(self.service_profile.task_cpu_cycles_mean,
                             0.3 * self.service_profile.task_cpu_cycles_mean),
            1e7,
        )

        deadline = self.current_time_s + self.service_profile.latency_budget_s

        task = Task(
            id=self.task_counter,
            arrival_time_s=self.current_time_s,
            data_size_bits=data_size,
            cpu_cycles=cpu_cycles,
            deadline_s=deadline,
            service_type=self.service_profile.name,
        )

        # Schedule next task
        self.time_until_next_task_s = self._sample_interarrival()
        return task

    # ------------------------ context & decisions ------------------------

    def get_context(self) -> Dict[str, Any]:
        """Return the observable context for decision-making agents."""
        if self.ue is None:
            raise RuntimeError("Simulation not reset. Call reset() first.")

        radio_state = self._compute_radio_state()
        serving_idx = self.serving_cell_id
        rsrp_serving = radio_state["rsrp_dbm"][serving_idx]
        sinr_serving = radio_state["sinr_db"][serving_idx]
        throughput_serving = radio_state["throughput_bps"][serving_idx]

        context = {
            "time_s": self.current_time_s,
            "service_type": self.service_profile.name,
            "latency_budget_s": self.service_profile.latency_budget_s,
            "ue_position": (self.ue.x, self.ue.y),
            "ue_speed_mps": self.ue.speed_mps,
            "serving_cell_id": self.serving_cell_id,
            "rsrp_dbm": radio_state["rsrp_dbm"],
            "sinr_db": radio_state["sinr_db"],
            "throughput_bps": radio_state["throughput_bps"],
            "serving_rsrp_dbm": rsrp_serving,
            "serving_sinr_db": sinr_serving,
            "serving_throughput_bps": throughput_serving,
            "user_pref": {
                "latency_weight": self.service_profile.latency_weight,
                "energy_weight": self.service_profile.energy_weight,
            },
        }
        return context

    # ------------------------ task evaluation ----------------------------

    def _evaluate_task_decision(
        self,
        task: Task,
        offload_target: str,
        serving_throughput_bps: float,
    ) -> Dict[str, Any]:
        """Evaluate latency / energy for a single task given offload target."""
        ue = self.ue
        if ue is None:
            raise RuntimeError("Simulation not reset. Call reset() first.")

        # Parameters for a toy energy model
        tx_power_w = 1.0    # 1 Watt transmit
        cpu_power_w = 2.0   # 2 Watt while computing

        # Local processing
        if offload_target == "local":
            exec_time_s = task.cpu_cycles / (ue.local_cpu_ghz * 1e9)
            total_latency_s = exec_time_s
            energy_j = exec_time_s * cpu_power_w

        # Edge processing via serving cell
        elif offload_target == "edge":
            tx_rate_bps = max(serving_throughput_bps, 1e3)
            tx_time_s = task.data_size_bits / tx_rate_bps
            mec = self.mec_servers[self.serving_cell_id]
            exec_time_s = task.cpu_cycles / (mec.cpu_ghz * 1e9)
            total_latency_s = tx_time_s + exec_time_s
            energy_j = tx_time_s * tx_power_w

        # "Cloud" with extra backhaul latency
        elif offload_target == "cloud":
            tx_rate_bps = max(serving_throughput_bps, 1e3)
            tx_time_s = task.data_size_bits / tx_rate_bps
            backhaul_s = 0.02  # 20 ms backbone delay (toy)
            cloud_cpu_ghz = 50.0
            exec_time_s = task.cpu_cycles / (cloud_cpu_ghz * 1e9)
            total_latency_s = tx_time_s + backhaul_s + exec_time_s
            energy_j = tx_time_s * tx_power_w

        else:
            raise ValueError(f"Unknown offload_target '{offload_target}'")

        deadline_met = total_latency_s <= (task.deadline_s - task.arrival_time_s)

        # Update UE battery
        ue.battery_joules -= energy_j

        return {
            "task_id": task.id,
            "offload_target": offload_target,
            "latency_s": total_latency_s,
            "deadline_s": task.deadline_s - task.arrival_time_s,
            "deadline_met": deadline_met,
            "energy_j": energy_j,
        }

    # ------------------------ baseline controller ------------------------

    def baseline_controller(self, context: Dict[str, Any]) -> Dict[str, Any]:
        """Simple rule-based controller for both HO and offloading.

        - Handover to the cell with highest RSRP if it's better than the
          current one by > hysteresis_db.
        - Offload locally for small tasks / bad channel, edge otherwise.
        - Cloud is unused in this simple baseline.
        """
        rsrp_list = context["rsrp_dbm"]
        serving_idx = context["serving_cell_id"] 
        serving_rsrp = rsrp_list[serving_idx]
        best_idx = int(np.argmax(rsrp_list))
        best_rsrp = rsrp_list[best_idx]

        hysteresis_db = 5.0 #at least 5 dB better to trigger HO
        target_cell = serving_idx
        if best_idx != serving_idx and (best_rsrp - serving_rsrp) > hysteresis_db: 
            target_cell = best_idx 

        # Offloading rule: here we simply look at serving throughput and latency budget.
        thr = context["serving_throughput_bps"]
        latency_budget_s = context["latency_budget_s"]

        if latency_budget_s < 0.07 and thr > 5e6: 
            offload_target = "edge"
        elif latency_budget_s < 0.2 and thr > 2e6: 
            offload_target = "edge"
        else:
            offload_target = "local"

        return {
            "handover_target": target_cell,
            "offload_target": offload_target,
        }

    # ------------------------ main stepping logic ------------------------

    def step(self, decision: Optional[Dict[str, Any]] = None) -> Tuple[Dict[str, Any], Dict[str, Any]]:
        """Advance the simulation by one time step.

        decision:
            {
              "handover_target": Optional[int],  # target cell index
              "offload_target": Optional[str],   # "local" | "edge" | "cloud" | None
            }
        If decision is None, use the baseline_controller.

        Returns:
            next_context, info

        info contains any generated task metrics and misc diagnostics.
        """
        if self.ue is None:
            raise RuntimeError("Simulation not reset. Call reset() first.")

        # Use baseline controller if none provided
        if decision is None:
            decision = self.baseline_controller(self.get_context())

        # 1) Apply handover decision (if any)
        ho_target = decision.get("handover_target", self.serving_cell_id)
        if not isinstance(ho_target, int) or ho_target < 0 or ho_target >= self.num_cells:
            ho_target = self.serving_cell_id
        self.serving_cell_id = ho_target

        # 2) Move UE
        self.ue.step(self.dt_s, self.area_size)

        # 3) Time & task arrival
        self.current_time_s += self.dt_s
        self.time_until_next_task_s -= self.dt_s

        new_task = self._maybe_generate_task()

        # 4) Compute new radio state
        radio_state = self._compute_radio_state()
        serving_thr = radio_state["throughput_bps"][self.serving_cell_id]

        # 5) Evaluate offloading decision if there's a task and a target
        task_info: Optional[Dict[str, Any]] = None
        if new_task is not None:
            offload_target = decision.get("offload_target", "local")
            if offload_target is None:
                offload_target = "local"
            task_info = self._evaluate_task_decision(new_task, offload_target, serving_thr)

        # 6) Build context & log
        context = self.get_context()
        info: Dict[str, Any] = {
            "decision": decision,
            "new_task": new_task is not None,
            "task_info": task_info,
        }

        log_entry = {
            "time_s": self.current_time_s,
            "context": context,
            "decision": decision,
            "task_info": task_info,
        }
        self.trace.append(log_entry)

        return context, info

    # ------------------------ rollouts -----------------------------------

    def run_episode(
        self,
        num_steps: int,
        controller_fn: Optional[Any] = None,
    ) -> List[Dict[str, Any]]:
        """Run a full episode and return the collected trace.

        controller_fn: callable taking context and returning a decision dict.
                       If None, uses baseline_controller.
        """
        self.trace = []
        for _ in range(num_steps):
            ctx = self.get_context()
            if controller_fn is None:
                decision = None
            else:
                decision = controller_fn(ctx)
            self.step(decision)
        return self.trace

# Minimal smoke test

In [14]:
sim = NetworkSimulation(num_cells=3, area_size=(1000.0, 1000.0), dt_s=0.01, seed=907)
ctx = sim.reset(service_type="VR")
print("Initial context:")
print({k: v for k, v in ctx.items() if k not in ("rsrp_dbm", "sinr_db", "throughput_bps")})

trace = sim.run_episode(num_steps=5000)
num_tasks = sum(1 for t in trace if t["task_info"] is not None)
print(f"Ran 50 steps, tasks generated: {num_tasks}")

Initial context:
{'time_s': 0.0, 'service_type': 'VR', 'latency_budget_s': 0.05, 'ue_position': (661.2577104736738, 143.01620664673152), 'ue_speed_mps': 10.0, 'serving_cell_id': 2, 'serving_rsrp_dbm': -73.79842045548558, 'serving_sinr_db': -0.24570621668202625, 'serving_throughput_bps': 19195324.658412267, 'user_pref': {'latency_weight': 0.7, 'energy_weight': 0.3}}
Ran 50 steps, tasks generated: 237


In [16]:
trace

[{'time_s': 0.01,
  'context': {'time_s': 0.01,
   'service_type': 'VR',
   'latency_budget_s': 0.05,
   'ue_position': (661.3144926777114, 143.09852178084813),
   'ue_speed_mps': 10.0,
   'serving_cell_id': 2,
   'rsrp_dbm': [-79.76192587227486, -74.75199485889658, -73.7945530055611],
   'sinr_db': [-8.531110687957602, -1.94561813027782, -0.2434322676171385],
   'throughput_bps': [3786889.4996662717,
    14254693.81814482,
    19202665.914297745],
   'serving_rsrp_dbm': -73.7945530055611,
   'serving_sinr_db': -0.2434322676171385,
   'serving_throughput_bps': 19202665.914297745,
   'user_pref': {'latency_weight': 0.7, 'energy_weight': 0.3}},
  'decision': {'handover_target': 2, 'offload_target': 'edge'},
  'task_info': None},
 {'time_s': 0.02,
  'context': {'time_s': 0.02,
   'service_type': 'VR',
   'latency_budget_s': 0.05,
   'ue_position': (661.3712748817491, 143.18083691496474),
   'ue_speed_mps': 10.0,
   'serving_cell_id': 2,
   'rsrp_dbm': [-79.76161740657318, -74.749991655760

In [1]:
!jupyter nbconvert --to python simulation.ipynbon --output simulation.py

This application is used to convert notebook files (*.ipynb)
        to various other formats.


Options
The options below are convenience aliases to configurable class-options,
as listed in the "Equivalent to" description-line of the aliases.
To see all configurable class-options for some <cmd>, use:
    <cmd> --help-all

--debug
    set log level to logging.DEBUG (maximize logging output)
    Equivalent to: [--Application.log_level=10]
--show-config
    Show the application's configuration (human-readable format)
    Equivalent to: [--Application.show_config=True]
--show-config-json
    Show the application's configuration (json format)
    Equivalent to: [--Application.show_config_json=True]
--generate-config
    generate default config file
    Equivalent to: [--JupyterApp.generate_config=True]
-y
    Answer yes to any questions instead of prompting.
    Equivalent to: [--JupyterApp.answer_yes=True]
--execute
    Execute the notebook prior to export.
    Equivalent to: [--ExecutePr

