In [None]:
import numpy as np
import pandas as pd
import random
import gym
import matplotlib.pyplot as plt
from collections import deque
import numpy as np
import pprint
import ray
#ray.shutdown()
ray.init()
from ray.rllib.agents.sac import SACTrainer 
from ray.rllib.agents.ppo import PPOTrainer 
from ray.rllib.agents.ddpg import TD3Trainer 
from ray.tune.registry import register_env

import random
# One seed for both random sources used in this notebook (the stdlib `random`
# module and NumPy's global generator), so the driver-side draws are repeatable.
SEED = 3333
random.seed(SEED)
np.random.seed(SEED)

In [None]:

# --- UE hardware / radio parameters ---------------------------------------
# A UE has one power budget that it splits between local processing and
# computation offloading.
PMAX = 2         # maximum total power of a UE, in watts
KAPPA = 1e-27    # power-to-frequency coefficient: f = (P / KAPPA) ** (1/3)
NOISE = 1e-9     # noise power used by the F-AP when computing transmission rates
BETA = 500       # conversion factor between bits and CPU cycles

# --- Utility weights: utility = WD * processed_bits - WE * energy ----------
WD = 2
WE = 1e6

# --- Timing ----------------------------------------------------------------
T = 1e-3             # slot duration
T_off = 0.4 * T      # offloading duration, a fixed fraction of the slot
t = 0                # module-level slot index; nothing in the visible cells advances it
EPISODE_LENGTH = 200     # steps per episode; no specific reason for this value
class UE:
    """User equipment (UE) that splits its power between local computing and offloading.

    Each per-slot quantity lives in a list that grows by one entry per decision
    epoch. Every method reads the newest entry (index ``-1``), i.e. the values
    of the slot currently being processed, so the object does not rely on any
    external slot counter.

    Parameters
    ----------
    ID : str
        Identifier of the UE; also used as the key in the F-AP dictionaries.
    mode : int
        Accepted for backward compatibility only; the per-slot ``mode`` history
        always starts empty.
    Asso_AP_ID : str
        Identifier of the F-AP this UE is associated with.
    P : float
        Total power available to the UE.
    kappa : float
        Coefficient linking power and CPU frequency, ``f = (P / kappa) ** (1/3)``.
    beta : float
        Conversion factor between bits and CPU cycles.
    wd, we : float
        Utility weights for processed bits and consumed energy.
    """

    def __init__(self, ID='0x00', mode=0, Asso_AP_ID='1x00', P=PMAX, kappa=KAPPA, beta=BETA, wd=WD, we=WE):
        self.ID = ID  # string id of a UE
        self.mode = []  # per-slot offloading mode (only filled by create_alpha)
        self.alpha = []  # per-slot fraction of power spent on offloading bits
        self.Asso_AP_ID = Asso_AP_ID
        self.P = P
        self.kappa = kappa
        self.lambda_ = 5*(1e6)*T  # mean of the Poisson task arrivals per slot
        self.task_buffer = 0
        # Histories of the system state, one entry per decision epoch.
        self.f = []  # local computing cycles/s
        self.offered_offloading = []
        self.tx_rate = []  # filled by the F-AP (calculate_tx_rate)
        self.drop = []  # filled by the F-AP (get_offloading_request)
        self.E_l = []
        self.E_o = []
        self.D_l = []
        self.D_o = []
        self.U_l = []
        self.U_o = []
        self.U = []
        self.beta = beta
        # Use the constructor arguments; the module-level WD/WE are only defaults.
        self.wd = wd
        self.we = we
        self.F = (self.P/self.kappa)**(1/3)  # CPU frequency when all power goes to local computing

    def generate_task(self):
        """Add the Poisson-distributed task arrivals of one slot to the buffer."""
        # The arrival rate is set high so the UE effectively always has work to do.
        self.task_buffer += np.random.poisson(self.lambda_)

    def create_alpha(self):
        """Draw a random power split and mode for the current slot (trial helper).

        In the RL setting the split comes from the policy; this method only
        exists for stand-alone experiments.
        """
        self.alpha.append(np.random.rand())
        self.mode.append(np.random.randint(0, 2))
        self.f.append(((1 - self.alpha[-1])*self.P/self.kappa)**(1/3))

    def send_offloading_tasks(self):
        """Record and return this slot's offloading request.

        Returns
        -------
        tuple
            ``(ID, offered_cycles)`` where the offered amount is expressed in
            CPU cycles, not bits.
        """
        # Uses the UE's own beta so a non-default conversion factor is honoured.
        self.offered_offloading.append(self.tx_rate[-1]*T_off*self.beta)
        return (self.ID, self.offered_offloading[-1])

    def local_execution(self):
        """Append energy, processed bits and utility of local computing; return the utility."""
        self.E_l.append(self.P*(1 - self.alpha[-1])*T)  # local processing energy consumption
        self.D_l.append(T*self.f[-1]/self.beta)  # number of locally processed bits
        self.U_l.append(self.wd*self.D_l[-1] - self.we*self.E_l[-1])
        return self.U_l[-1]

    def offloading_execution(self):
        """Append energy, delivered bits and utility of offloading; return the utility.

        The transmission rate and the drop flag of the current slot must already
        have been appended by the F-AP.
        """
        # A dropped task (drop flag True) contributes zero offloaded bits.
        self.D_o.append(self.tx_rate[-1]*T_off*(1 - self.drop[-1]))
        self.E_o.append(self.alpha[-1]*self.P*T_off)  # energy consumed in offloading bits
        self.U_o.append(self.wd*self.D_o[-1] - self.we*self.E_o[-1])
        return self.U_o[-1]

    def calculate_utility(self):
        """Compute the total utility of the current slot and drain the task buffer.

        Returns
        -------
        The sum of the local and offloading utilities of the current slot.
        """
        self.U.append(self.local_execution() + self.offloading_execution())
        # Remove the bits processed locally and at the edge; the buffer cannot go negative.
        self.task_buffer = max(0, self.task_buffer - self.D_l[-1] - self.D_o[-1])
        return self.U[-1]


class FAP:
    """Fog access point (F-AP) serving a set of UEs.

    The F-AP keeps one history entry per slot for channel gains, transmission
    rates, offloading requests and drop decisions. All methods operate on the
    newest entry, so no external slot counter is needed.

    Parameters
    ----------
    x_pos, y_pos : float
        Position of the F-AP (stored, not used by any method here).
    ID : str
        Identifier of the F-AP.
    queue_len : int
        Maximum length of the task queue.
    """

    def __init__(self, x_pos=250, y_pos=0, ID='1x00', queue_len=100):
        self.cpu = 1.2*1e9  # available computational capacity of the MEC server
        self.ID = ID  # only one F-AP is used for now, hence mostly irrelevant
        self.B = 1e6  # bandwidth, 1 MHz
        self.v = 1.3  # communication overhead
        self.UEs_asso = []  # UEs associated with this F-AP
        # Six possible channel gains (dB values converted to linear scale), taken from
        # https://ieeexplore.ieee.org/document/8771176
        self.possible_channel_gains = dict(zip(range(6), 10**(np.array([-11.23, -9.37, -7.8, -6.3, -4.68, -2.08])/10)))
        # Gain value -> channel number (kept for callers that want an exact lookup).
        self.reverse_channel_gain_dict = dict((v, k) for k, v in self.possible_channel_gains.items())
        self.noise = NOISE  # noise power
        self.tx_rate = []  # per-slot dict: UE id -> offered transmission rate
        self.capacity = queue_len  # how many task bits can be stored at most
        self.queue = deque([], maxlen=self.capacity)
        self.dropped_task = []  # per-slot list of dropped (size, UE id) records
        self.drop = []  # per-slot dict: UE id -> True if its task was dropped
        self.queue_delay = 0  # not used
        self.x = x_pos  # position values are not used
        self.y = y_pos
        self.tasks = []  # per-slot dict: UE id -> requested CPU cycles
        self.channelGains = []  # per-slot dict: UE id -> channel gain
        # Random row-stochastic matrix giving the probability of moving from one
        # channel state to another between consecutive slots.
        N = len(self.possible_channel_gains)
        transition_matrix = np.random.rand(N, N)
        transition_matrix /= transition_matrix.sum(axis=1, keepdims=1)
        self.transition_matrix = pd.DataFrame(transition_matrix, columns=self.possible_channel_gains.keys(), index=self.possible_channel_gains.keys())

    def get_offloading_request(self, *args):
        """Admit or drop the offloading requests of the current slot.

        Parameters
        ----------
        *args : tuple
            ``(UE id, requested CPU cycles)`` pairs, as returned by
            ``UE.send_offloading_tasks()``.

        While the total requested cycles cannot be executed within the slot,
        the smallest remaining request is dropped. The decision for every
        associated UE is appended to that UE's ``drop`` list.
        """
        requests = dict(args)
        decisions = dict.fromkeys(requests, False)
        dropped = []
        self.tasks.append(requests)
        self.drop.append(decisions)
        self.dropped_task.append(dropped)
        # NOTE(review): the original code had a fast path testing against 0.4*T while
        # this loop tests against T; the effective threshold has always been T and is
        # kept here - confirm which bound is intended.
        while requests and sum(requests.values())/self.cpu > T:
            smallest = min(zip(requests.values(), requests.keys()))
            dropped.append([smallest])
            del requests[smallest[1]]  # drop the task with minimum size
            decisions[smallest[1]] = True

        for ue in self.UEs_asso:  # propagate the decision to each UE
            ue.drop.append(decisions[ue.ID])

    def create_first_channel_gains(self):
        """Start the channel-gain history with one uniformly drawn gain per UE."""
        levels = np.array(list(self.possible_channel_gains.values()))
        ue_ids = [ue.ID for ue in self.UEs_asso]
        self.channelGains = [dict(zip(ue_ids, np.random.choice(levels, size=len(self.UEs_asso))))]

    def _channel_index(self, gain):
        """Return the channel number whose gain is closest to ``gain``.

        ``gain`` may be a scalar or a size-1 array; nearest-level matching keeps
        the lookup working even if the stored value was wrapped or cast.
        """
        value = float(np.asarray(gain, dtype=float).reshape(-1)[0])
        keys = list(self.possible_channel_gains.keys())
        levels = np.array([self.possible_channel_gains[k] for k in keys], dtype=float)
        return keys[int(np.argmin(np.abs(levels - value)))]

    def generate_channel_gains(self):
        """Append the channel gains of the next slot.

        The first call draws the initial gains; every later call moves each UE's
        channel according to the transition matrix, starting from its most
        recent gain.
        """
        if not getattr(self, 'channelGains', None):
            self.create_first_channel_gains()
            return
        next_gains = {}
        for ue_id, gain in self.channelGains[-1].items():
            state = self._channel_index(gain)
            next_state = np.random.choice(self.transition_matrix.columns, p=self.transition_matrix.loc[state])
            next_gains[ue_id] = self.possible_channel_gains[next_state]
        self.channelGains.append(next_gains)

    def calculate_tx_rate(self):
        """Compute each UE's transmission rate for the current slot.

        The rate depends on the UE's channel gain, its offloading power
        ``alpha * P`` and the interference from the other associated UEs. The
        result is appended to the F-AP history and to each UE's ``tx_rate``.
        """
        gains = self.channelGains[-1]
        rates = {}
        for ue in self.UEs_asso:
            interference = sum(gains[other.ID]*other.alpha[-1]*other.P for other in self.UEs_asso if other.ID != ue.ID)
            signal = gains[ue.ID]*ue.alpha[-1]*ue.P
            rates[ue.ID] = self.B/self.v*np.log2(1 + signal/(self.noise + interference))
            ue.tx_rate.append(rates[ue.ID])  # also copy the rate into the UE's own history

        self.tx_rate.append(rates)


In [None]:
class Environment(ray.rllib.env.multi_agent_env.MultiAgentEnv):
    """Multi-agent offloading environment with three UEs ('a', 'b', 'c') and one F-AP.

    Observation (per agent): the UE's channel gain, as a float32 array of shape (1,).
    Action (per agent): the fraction ``alpha`` of the UE's power spent on offloading.
    Reward (per agent): the UE's utility for the slot, as a Python float.

    Only the F-AP mode and local processing are modelled, so an action is just
    the power split. The ``config`` argument is accepted because RLlib passes
    one to every environment; it is not read.
    """

    def __init__(self, config):
        super().__init__()
        self._agent_ids = ['a', 'b', 'c']
        self._build_network()
        self.action_space = gym.spaces.Box(low=np.array([0]), high=np.array([1]), shape=(1,), dtype=np.float32)
        self.observation_space = gym.spaces.Box(low=np.array([0]), high=np.array([10]), shape=(1,), dtype=np.float32)
        self.time_step_limit = EPISODE_LENGTH
        self.timesteps = 0  # defined here so step() cannot hit a missing attribute
        self.done = False

    def _build_network(self):
        """Create fresh UEs and a fresh F-AP and associate them."""
        self.UE1 = UE(ID='a')
        self.UE2 = UE(ID='b')
        self.UE3 = UE(ID='c')
        self.AP = FAP(ID='AP')
        self.AP.UEs_asso = [self.UE1, self.UE2, self.UE3]

    def _current_observation(self):
        """Return the newest channel gains as a new dict of float32 arrays of shape (1,).

        A new dict is built so the F-AP's own channel-gain history is left untouched.
        """
        latest = self.AP.channelGains[-1]
        return {ue_id: np.asarray(gain, dtype=np.float32).reshape(1) for ue_id, gain in latest.items()}

    def observation_space_sample(self):
        """Advance the channel by one slot and return the resulting observation."""
        self.AP.generate_channel_gains()
        return self._current_observation()

    def action_space_sample(self):
        """Return one uniformly random power split per agent."""
        return {agent_id: np.array(np.random.rand()) for agent_id in ('a', 'b', 'c')}

    def step(self, action=None):
        """Run one slot.

        Parameters
        ----------
        action : dict, optional
            Maps every UE id to its power split ``alpha`` (scalar or size-1
            array). Values are clipped to [0, 1]. When omitted, a fresh random
            action is sampled for this call.

        Returns
        -------
        tuple
            ``(observations, rewards, dones, infos)``, each a dict keyed by UE
            id; ``dones`` additionally carries the ``"__all__"`` key required
            by RLlib's multi-agent API.
        """
        if action is None:
            # A default evaluated in the signature would be drawn once and reused forever.
            action = self.action_space_sample()

        self.timesteps += 1
        for ue in self.AP.UEs_asso:
            ue.generate_task()

        # Store each UE's power split and the matching local CPU frequency.
        for ue in self.AP.UEs_asso:
            raw_alpha = np.asarray(action[ue.ID], dtype=float).reshape(-1)[0]
            # Clipping keeps (1 - alpha) non-negative, so the cube root stays real.
            alpha = float(np.clip(raw_alpha, 0.0, 1.0))
            ue.alpha.append(alpha)
            ue.f.append(((1 - alpha)*ue.P/ue.kappa)**(1/3))

        self.AP.calculate_tx_rate()
        self.AP.get_offloading_request(*(ue.send_offloading_tasks() for ue in self.AP.UEs_asso))
        self.AP.generate_channel_gains()  # channel gains of the next slot form the next state

        reward = {}
        for ue in self.AP.UEs_asso:
            ue.calculate_utility()
            reward[ue.ID] = float(np.asarray(ue.U[-1]).reshape(-1)[0])

        self.done = self.timesteps >= self.time_step_limit

        dones = {ue.ID: self.done for ue in self.AP.UEs_asso}
        dones["__all__"] = self.done  # extra key required in multi-agent RL environments
        infos = {ue.ID: {} for ue in self.AP.UEs_asso}

        # The state of a UE is its channel gain at the start of the coming slot.
        return self._current_observation(), reward, dones, infos

    def reset(self):
        """Start a new episode with fresh UEs and F-AP and return the first observation."""
        self._build_network()
        self.timesteps = 0
        # Without clearing the flag, every episode after the first would end after one step.
        self.done = False
        self.AP.generate_channel_gains()
        return self._current_observation()

    def render(self):
        """Print the transmission rates and dropped tasks of the most recent slot."""
        if not self.AP.tx_rate:
            print("No step has been taken yet.")
            return
        print("_"*100)
        print(f"UE Tx rates = {self.AP.tx_rate[-1]}")
        print(f"UEs task drops = {self.AP.dropped_task[-1]}")
        print("_"*100)
        
# The environment takes no options, but RLlib's environment API still passes a
# config dictionary to the constructor.
config = {}
env = Environment(config)


def env_creator(env_config):
    """Build a new Environment for RLlib from the config RLlib hands over.

    Using the argument (rather than the module-level ``config``) keeps the
    creator independent of whatever ``config`` is rebound to in later cells.
    """
    return Environment(env_config)


env_name = "OffloadingEnv"
register_env(env_name, env_creator)


In [None]:
# Trainer configuration passed to the RLlib trainers in the cells below.
config = {
    "env": env_name,
    # !PyTorch users!
    "framework": "torch",  # for users who installed torch instead of tf
    "create_env_on_driver": True,
    "disable_env_checking": True,
    "horizon": EPISODE_LENGTH,
    # No explicit policies / mapping function are given.
    # NOTE(review): presumably RLlib then uses one shared default policy for
    # all agents - confirm against the installed RLlib version.
    "multiagent": {
        "policies": None,
        "policy_mapping_fn": None,
    },
}

In [None]:
# Train SAC for 20 iterations and keep the mean episode reward of each one.
env.reset()
rllib_trainer = SACTrainer(config=config)

mean_reward_SAC = []
for _ in range(20):
    results = rllib_trainer.train()
    mean_reward_SAC.append(results['episode_reward_mean'])
    print(f"Iteration={rllib_trainer.iteration}: R(\"return\")={results['episode_reward_mean']}")

In [None]:
# Train TD3 on the same configuration for 20 iterations and keep the mean
# episode reward of each one.
env.reset()
rllib_trainer_TD3 = TD3Trainer(config=config)

mean_reward_TD3 = []
for _ in range(20):
    results = rllib_trainer_TD3.train()
    mean_reward_TD3.append(results['episode_reward_mean'])
    print(f"Iteration={rllib_trainer_TD3.iteration}: R(\"return\")={results['episode_reward_mean']}")

In [None]:
# Learning curves of both trainers. The x axis is derived from the recorded
# histories, so the plot stays valid if the number of iterations changes.
plt.plot(np.arange(1, len(mean_reward_SAC) + 1), mean_reward_SAC, '-m*', label='SAC')
plt.plot(np.arange(1, len(mean_reward_TD3) + 1), mean_reward_TD3, '-kd', label='TD3')
plt.xlabel('Training iteration')
plt.ylabel('Mean episode reward')
plt.legend()
plt.show()

In [None]:
import time
from ipywidgets import Output
from IPython import display
import time

# Roll out one episode with the trained TD3 policy and show each slot's
# transmission rates and task drops in a single, continuously refreshed widget.
out = Output()
display.display(out)

with out:
    env = Environment(config)
    obs = env.reset()
    while True:
        # One policy call yields the actions of all agents for this observation.
        actions = rllib_trainer_TD3.compute_actions(obs)
        a1, a2, a3 = actions['a'], actions['b'], actions['c']
        obs, rewards, dones, _ = env.step({"a": a1, "b": a2, 'c': a3})
        out.clear_output(wait=True)
        time.sleep(0.08)
        env.render()
        if dones['__all__']:
            break