In [1]:
import numpy as np
import random
from scipy.special import erfcinv

In [3]:
# Edge Computing Environment
class EdgeComputingEnvironment:
    def __init__(self, M=15, area_size=100, D_m=1354, eta_m_range=(100, 300), F_max_ue=1.5, P_max=23, B=5, T_max=10, F_max_es=30, S_max_es=60, epsilon=10**-7, E_max=3, theta=10**-26, L=8, phi=0.02, N0=-174, f_es_dev=0.02, f_ue_dev=0.02):
        """
        Initialize the edge computing environment with given parameters.
        """
        self.M = M  # Number of users
        self.area_size = area_size  # Size of the area in which users are distributed
        self.D_m = D_m  # Task data size
        self.eta_m_range = eta_m_range  # Range of Task complexity
        self.F_max_ue = F_max_ue * 10**9  # Maximum frequency of user equipment
        self.P_max = 10 ** ((P_max - 30) / 10)  # Convert maximum transmission power from dBm to Watts
        self.B = B * 10**6  # Bandwidth
        self.T_max = T_max * 10**-3  # Maximum tolerable delay
        self.F_max_es = F_max_es * 10**9  # Maximum frequency of edge server
        self.S_max_es = S_max_es * 10**3  # Maximum cache size of edge server
        self.epsilon = epsilon  # Error tolerance for rate calculation
        self.E_max = E_max * 10**-3  # Maximum energy consumption
        self.theta = theta  # Energy coefficient
        self.L = L  # Number of antennas
        self.phi = phi * 10**-3  # Transmission time interval
        self.R_min = 10**6  # Minimum data rate
        self.N0 = N0  # Noise power in dBm
        self.N0 = 10 ** ((N0 - 30) / 10)  # Convert noise power from dBm/Hz to Watts/Hz
        self.PL_d = lambda d: -35.3 - (37.6 * np.log10(d))  # Path loss model
        self.f_es_dev = f_es_dev  #The deviation between the estimated value and the actual value of the processing rate of the ES
        self.f_ue_dev = f_ue_dev  #The deviation between the estimated value and the actual value of the processing rate of the UE
        self.is_training = True
        self.tasks = []
        self.current_task = {}
        self.Task_processed = 0
        self.penalty = 10

        self.user_device_params = []  # List to store parameters for each user device
        self.initialize_user_device_params()  # Initialize user device parameters

        self.cache = []  # Cache to store tasks
        self.current_cache_size = 0  # Current size of the cache
        self.transmitting_tasks = []  # List to store transmitting tasks
        self.processing_tasks = []  # List to store processing tasks
        self.current_time = 0.0  # Current simulation time

        # Initialize bandwidth and computation attributes
        self.total_bandwidth = 0 # Initialize total bandwidth
        self.total_computation = 0 # Initialize total computation

    def initialize_user_device_params(self):
        """
        Initialize parameters for each user device.
        Randomly generates user-specific parameters such as path loss.
        """
        for device_id in range(self.M):
            d = np.random.uniform(1, self.area_size / 2)  # Distance to server
            PL_dB = self.PL_d(d)
            g_m = 10 ** (PL_dB / 10)  # Convert path loss from dB to linear scale
            h_bar = np.random.randn(1, self.L) + 1j * np.random.randn(1, self.L)  # Channel gain

            self.user_device_params.append({
                'device_id': device_id,  # Assign a unique ID to each device
                'd': d,
                'g_m': g_m,
                'h_bar': h_bar,
            })

    def create_task(self):
        """
        create task randomly
        """
        task_distribution = np.random.choice(range(self.M), self.M, replace=True)
        self.tasks = []
        for user_id in task_distribution:
            eta_m =  np.round(np.random.choice(np.linspace(self.eta_m_range[0], self.eta_m_range[1], 50)))  # Task complexity
            T_max_task = self.T_max  # Static
            order = np.round(random.uniform(1,5))

            task_info = {
            'eta_m': eta_m,
            'T_max': T_max_task,
            'D_m': 1354,  # Task data size
            'user_id' : user_id,
            'order' : order
            }

            self.tasks.append(task_info)

    def get_state(self):
        """
        get state and task.
        get task and check that task is in the cache or not.
        """
        self.tasks = sorted(self.tasks, key=lambda x: x['order'], reverse=True)

        task = self.tasks.pop()

        self.current_task = task

        cache_hit = 1 if any(task == task_info[0] for task_info in self.cache) else 0  # check the cache

        state = [
            task['eta_m'],
            self.total_bandwidth,
            self.total_computation,
            cache_hit,
            self.user_device_params[task['user_id']]['d']
        ]

        return state

    def calculate_gamma_m(self, b_m, p_m, user_id):
        """
        Calculate the signal-to-noise ratio (SNR) for a given user.

        Parameters:
        - b_m (float): Bandwidth allocation
        - p_m (float): Transmission power
        - user_id (int): ID of the user

        Returns:
        - gamma_m (array): SNR values for the user's communication channel
        """
        h_m = np.sqrt(self.user_device_params[user_id]['g_m']) * self.user_device_params[user_id]['h_bar']  # Channel gain
        gamma_m = (p_m * np.linalg.norm(h_m, axis=1) ** 2) / (b_m * self.B * self.N0)  # SNR
        
        return gamma_m

    def calculate_uplink_rate(self, b_m, p_m, user_id):
        """
        Calculate the uplink data rate for a given user.

        Parameters:
        - b_m (float): Bandwidth allocation
        - p_m (float): Transmission power
        - user_id (int): ID of the user

        Returns:
        - R_m (float): Uplink data rate in bits/second
        """
        gamma_m = self.calculate_gamma_m(b_m, p_m, user_id)  # Calculate the SINR for the m-th user
        V_m = 1 - (1 / (1 + gamma_m) ** 2)  # Intermediate variable for rate calculation
        Q_inv = np.sqrt(2) * erfcinv(2 * self.epsilon)  # Calculate the inverse of the Q-function for the outage probability
        R_m = (self.B / np.log(2)) * ((b_m * np.log(1 + gamma_m)) - ((np.sqrt((b_m * V_m) / (self.phi * self.B))) * Q_inv))  # Uplink data rate

        return R_m

    def calculate_delay(self, alpha_m, cache_hit, R_m, D_m, f_ue_m, f_es_m, f_ue_est, f_es_est, eta_m):
        """
        Calculate the end-to-end delay for a given task.

        Parameters:
        - alpha_m (float): Offloading decision
        - cache_hit (int): Split factor (0 or 1)
        - R_m (float): Uplink data rate in bits/second
        - D_m (int): Data size
        - f_ue_m (float): Computation capability of the user device
        - f_es_m (float): Computation capability of the edge server
        - f_ue_est (float): Estimation error for the user device's computation capability
        - f_es_est (float): Estimation error for the edge server's computation capability
        - eta_m (float): Computational intensity

        Returns:
        - T_e2e (float): End-to-end delay in seconds
        """
        actual_f_ue_m = f_ue_m - f_ue_est  # Actual processing rate of the user device

        if cache_hit == 1:
            T_es = self.calculate_server_processing_delay(alpha_m, cache_hit, D_m, f_es_m, f_es_est, eta_m)  # Only edge server processing delay
            T_e2e = T_es

        else:
            T_ue = (alpha_m * eta_m * D_m) / actual_f_ue_m  # User device processing delay
            T_tr = self.calculate_transmission_delay(alpha_m, R_m, D_m)  # Transmission delay
            T_es = self.calculate_server_processing_delay(alpha_m, cache_hit, D_m, f_es_m, f_es_est, eta_m)  # Edge server processing delay
            T_e2e = T_ue + T_tr + T_es  # Total end-to-end delay

        return T_e2e

    def calculate_transmission_delay(self, alpha_m, R_m, D_m):
        """
        Calculate the transmission delay for a given task.

        Parameters:
        - alpha_m (float): Offloading decision
        - R_m (float): Uplink data rate in bits/second
        - D_m (int): Data size
        - user_id (int): ID of the user

        Returns:
        - T_co (float): Transmission delay in seconds
        """
        T_co =  ((1 - alpha_m) * (D_m * 8)) / R_m   # Transmission delay calculation based on task size and uplink rate

        return T_co

    def calculate_server_processing_delay(self, alpha_m, cache_hit, D_m, f_es_m, f_es_est, eta_m):
        """
        Calculate the processing delay at the edge server for a given task.

        Parameters:
        - alpha_m (float): Offloading decision
        - D_m (int): Data size
        - cache_hit (0,1): 1 = Exist in cache and 0 not exist in cache
        - f_es_m (float): Computation capability of the edge server
        - f_es_est (float): Estimation error for the edge server's computation capability
        - eta_m (float): Computational intensity

        Returns:
        - T_es (float): Processing delay at the edge server in seconds
        """

        actual_f_es_m = f_es_m - f_es_est  # Actual processing rate of the Edge server

        if cache_hit == 0:
            T_es = ((1 - alpha_m) * eta_m * D_m) / actual_f_es_m  # Processing delay at the edge server

        else:
            T_es = (eta_m * D_m) / actual_f_es_m
        return T_es

    def calculate_energy_consumption(self, s_m, R_m, alpha_m, p_m, D_m, f_ue_m, f_ue_est, eta_m):
        """
        Calculate the energy consumption for a given task.

        Parameters:
        - alpha_m (float): Offloading decision
        - R_m (float): Uplink data rate in bits/second
        - s_m (int): Split factor (0 or 1)
        - f_ue_m (float): Computation capability of the user device
        - p_m (float): Transmission power
        - f_ue_est (float): Estimation error for the user device's computation capability
        - eta_m (float): Computational intensity

        Returns:
        - E_total (float): Total energy consumption in Joules
        """

        actual_f_ue_m = f_ue_m - f_ue_est  # Calculate the actual processing rate of the UE

        E_ue = alpha_m * (self.theta / 2) * eta_m * D_m * (actual_f_ue_m ** 2)  # Energy consumption at the user device
        E_tx = ((D_m * 8) * p_m) / R_m  # Transmission energy

        if s_m == 1:  # Task is in cache
            E_total = 0  # No energy consumed when task is in cache
        else:
            E_total = E_ue + E_tx  # Total energy consumption

        return E_total

    def manage_cache(self, task_info, task_delay, cache_hit_model):
        """
        Manage the cache for storing and retrieving tasks.

        Parameters:
        - task_info (tuple): Task parameters to identify the task
        - task_delay (float): Delay of the task

        Returns:
        - bool: True if the task is found in the cache, False otherwise
        """
        # Check for cache hit first
        cache_hit = any(task_info == task[0] for task in self.cache)
        
        if task_delay == 0:
            return cache_hit  # Return True if found, False otherwise
        
        task_size = task_info['D_m'] * 8  # Task size
        Server_Max_Capacity = self.S_max_es  # Server maximum capacity
        if Server_Max_Capacity == 0 :
            return
        
        if self.is_training :
            # During training
            if cache_hit:
                self.current_cache_size -= task_info['D_m'] * 8
                self.cache = [task for task in self.cache if task[0] != task_info]

            # During training, always update cache based on task delay
            self.cache.append((task_info, task_delay))  # Add task to cache
            self.current_cache_size += task_size  # Update cache size
        
            if self.current_cache_size >= Server_Max_Capacity:
                sorted_cache = sorted(self.cache, key=lambda x: x[1], reverse=True)  # Sort tasks by delay in descending order

                while (self.current_cache_size) > Server_Max_Capacity:
                    if not sorted_cache:
                        break  # Exit loop if sorted_cache is empty
                    last_task = sorted_cache.pop()  # Remove the last task from sorted_cache
                    self.cache.remove(last_task)  # Remove the task from the cache
                    self.current_cache_size -= last_task[0]['D_m'] * 8  # Update current cache size

        if self.is_training is False:
            # During testing, follow the model's prediction
            if cache_hit_model == 0 :
                if cache_hit:
                    self.current_cache_size -= task_info['D_m'] * 8
                    self.cache = [task for task in self.cache if task[0] != task_info]
            else:
                if cache_hit:
                    self.current_cache_size -= task_info['D_m'] * 8
                    self.cache = [task for task in self.cache if task[0] != task_info]

                if (task_size + self.current_cache_size) <= Server_Max_Capacity:
                    self.cache.append((task_info, task_delay))  # Add task to cache
                    self.current_cache_size += task_size  # Update cache size

                else:
                    sorted_cache = sorted(self.cache, key=lambda x: x[1], reverse=True)  # Sort tasks by delay in descending order

                    while (task_size + self.current_cache_size) > Server_Max_Capacity:
                        if not sorted_cache:
                            break  # Exit loop if sorted_cache is empty
                        last_task = sorted_cache.pop()  # Remove the last task from sorted_cache
                        self.cache.remove(last_task)  # Remove the task from the cache
                        self.current_cache_size -= last_task[0]['D_m'] * 8  # Update current cache size

                    self.cache.append((task_info, task_delay))  # Add task to cache
                    self.current_cache_size += task_size  # Update cache size


    def step(self, action):
        """
        Perform a simulation step for the given action.

        Parameters:
        - action (array): Array of action for each user
        - tasks (array): Array of task for each user

        Returns:
        - tuple: (task_rewards, next_state, done)
        """
        done = False

        alpha_m = action[0]
        b_m = action[1]
        p_m = action[2]
        f_ue_m = action[3]
        f_es_m = action[4]
        cache_hit_model = action[5]

        task = self.current_task

        user_id = task.pop('user_id',0)
        task.pop('order')
        
        # Determine if the task is a cache hit or miss
        cache_hit = 1 if self.manage_cache(task, 0, cache_hit_model) else 0

        f_ue_est = f_ue_m * self.f_ue_dev  
        f_es_est = f_es_m * self.f_es_dev  

        # Calculate the uplink data rate for the user
        R_m = self.calculate_uplink_rate(b_m, p_m, user_id)

        # Calculate the end-to-end delay for the task
        delay = self.calculate_delay(
            alpha_m, cache_hit, R_m,
            task['D_m'], f_ue_m, f_es_m, f_ue_est,
            f_es_est, task['eta_m']
        )

        delay = np.round(delay[0],6) if isinstance(delay, np.ndarray) else np.round(delay,6)

        # Calculate the energy consumption for the task
        energy = self.calculate_energy_consumption(
            cache_hit, R_m, alpha_m, p_m, task['D_m'], f_ue_m,
            f_es_est, task['eta_m']
        )

        energy = np.round(energy[0],6) if isinstance(energy, np.ndarray) else np.round(energy,6)

        # Manage task transmission and processing times
        if cache_hit == 0:
            transmission_end_time = self.current_time + self.calculate_transmission_delay(alpha_m, R_m, task['D_m'])
            processing_end_time = transmission_end_time + self.calculate_server_processing_delay(alpha_m, cache_hit, task['D_m'], f_es_m, f_es_est, task['eta_m'])

            self.transmitting_tasks.append((self.current_time, transmission_end_time, b_m))
            process = f_es_m * (1 - alpha_m)
            self.processing_tasks.append((transmission_end_time, processing_end_time, process))

        else:
            # For cache hit, only processing delay is considered
            processing_end_time = self.current_time + self.calculate_server_processing_delay(alpha_m, cache_hit, task['D_m'], f_es_m, f_es_est, task['eta_m'])
            self.processing_tasks.append((self.current_time, processing_end_time, f_es_m))

        # Update cache with the task if it becomes eligible
        self.manage_cache(task, delay, cache_hit_model)
            
        # Calculate total bandwidth and computation resource usage at current time
        self.total_bandwidth = sum(b for _, end_time, b in self.transmitting_tasks if end_time > self.current_time)
        self.total_computation = sum(f for _, end_time, f in self.processing_tasks if end_time > self.current_time)

        # Free resources for tasks that have completed transmission or processing
        self.transmitting_tasks = [(start_time, end_time, b) for start_time, end_time, b in self.transmitting_tasks if end_time > self.current_time]
        self.processing_tasks = [(start_time, end_time, f) for start_time, end_time, f in self.processing_tasks if end_time > self.current_time]

        # Check The Cache hit of model is right or not 
        cache_hit = 1 if self.manage_cache(task, 0, cache_hit_model) else 0
        cache_hit_right = 1 if cache_hit == cache_hit_model else 0

        # Calculate reward
        reward  = (-energy - delay)*1e4  # *1e4 for convert to a large number 
        
        # Apply penalties for exceeding resource limits
        if delay > task['T_max']:
            reward -= self.penalty
            done = True
        if cache_hit_right == 0:
            reward -= self.penalty
        if R_m < self.R_min:
            reward -= self.penalty*2
            done = True
        if energy > self.E_max:
            reward -= self.penalty
            done = True
        if self.total_bandwidth > 1:
            reward -= self.penalty
            done = True
        if self.total_computation > self.F_max_es:
            reward -= self.penalty
            done = True

        state_info = [
            delay,
            energy,
            task['eta_m'],
            self.total_bandwidth,
            self.total_computation,
            cache_hit_right,
            self.user_device_params[user_id]['d']
        ]

        self.Task_processed += 1

        return reward, state_info, done

    # Increment current simulation time
    def increase_time(self):
        self.current_time += self.T_max
        
    def reset(self):
        """
        Reset the environment to its initial state.
        """
        self.cache = [] 
        self.current_cache_size = 0
        self.Task_processed = 0  
        self.transmitting_tasks = [] 
        self.processing_tasks = [] 
        self.current_time = 0.0  
        self.user_device_params = []
        self.initialize_user_device_params()
        self.total_bandwidth = 0  
        self.total_computation = 0  

    def render(self):
        print(f"Number of Users: {self.M}")
        print(f"Number of Task Processed: {self.Task_processed}")
        print(f"Total Bandwidth Used: {self.total_bandwidth}")
        print(f"Total Computation Used: {self.total_computation}")
        print(f"Current Cache Size: {np.round(self.current_cache_size/1000,2)} Kb")