# An Environment for Service-rate Control in Tandem Queueing Systems 
<div>
<img src="https://raw.githubusercontent.com/majidraeis/Figs/master/queue.png" width="600"/>
</div>

## Goal:
Consider a tandem multi-server queueing system with $N$ stages. We want to learn a service-rate control policy that gurantees an upperbound ($d_{ub}$) on the end-to-end delay.

## State ($\bar{s}$):
$\bar{s}=(s_1, s_2, \cdots, s_N)$

i.e., the vector of queue lengths at the begining of each time slot
## Actions ($a$):
Service rate, i.e., $a=(\mu_1, \mu_2, \cdots, \mu_N)$
## Reward ( $R(s,a)$ ):


In [None]:
import copy
import random
import operator
import functools
import numpy as np
import matplotlib.pyplot as plt

import gym
from gym  import spaces
from tqdm import tqdm

In [None]:
class TandemEnv(gym.Env):

    def __init__(self, N_s, Mu_s, lambda_rate, d_ub, step_len):
       """
       N_s = array of number of servers at each stage
       Mu_s = array of service rates 
       lambda_rate = arrival rate
       d_ub = delay upperbound
       step_len = length of each time slot (step)
       """

       self.N_s = np.array(N_s)
       self.Mu_s   = Mu_s
       self.rho    = rho
       self.d_ub   = d_ub
          
       self.job_index  = 1
       self.t_arr  = 0
       self.cnt    = 1
       self.cost   = 0
       self.T = step_len
       episode_len = 2000.0
       self.MAX_STEPS = episode_len/self.T
       self.delay_max = 40.0 #80.0
       self.dep_vec = np.zeros((1,3))

       self.mu_min = 1.1*self.lambda_rate/self.N_s
       self.mu_max = 2
       self.action_space = spaces.Box(
            low= self.mu_min,
            high= self.mu_max, shape=(len(N_s),),
            dtype=np.float32
        )
       self.B_max = 10
       self.observation_space = gym.spaces.MultiBinary(n=self.B_max*len(N_s))

       self.qls = np.zeros(len(N_s),dtype=int)
       self.qls_ave = np.zeros(len(N_s),dtype=float)
       self.tandem = Tandem(N_s, Mu_s)
       self.tandem_job_dict = {}
       self.t_slot = 0
       

    def step(self, action):

        for n_s in range(len(self.N_s)):
          self.tandem.queue[n_s].ql_ave = 0
        self._take_action(action)
        s_prime = self.qls
        delay_vec_dep = []
        observed_delay_vec = []
        for index in self.arr_inSlot:
          if index in self.dep_vec[:-1,0]: 
            #---------if the arrival departs in the same timeslot-----------------
            observed_delay = self.tandem.queue[-1].job_dict[index]['Td'] - self.tandem.queue[0].job_dict[index]['Ta'] 
            observed_delay_vec.append(observed_delay)  
          elif self.t_slot - self.tandem.queue[0].job_dict[index]['Ta'] > self.d_ub: 
            #--if the elapsed time spent in this slot is already larger than d_ub-
            observed_delay = self.t_slot - self.tandem.queue[0].job_dict[index]['Ta']
            observed_delay_vec.append(observed_delay)  
              
        for index in self.dep_vec[:-1,0]: 
          delay = self.tandem.queue[-1].job_dict[index]['Td'] - self.tandem.queue[0].job_dict[index]['Ta']
          for n_s in range(len(self.N_s)):
            del(self.tandem.queue[n_s].job_dict[index])
          delay_vec_dep.append(delay)  

        reward = self._get_reward(np.array(observed_delay_vec), action) 
        done = False
        if np.mean(delay_vec_dep) > self.delay_max or self.cnt > self.MAX_STEPS:
          done = True
        self.cnt += 1    
        s_prime_bin = []
        for j in range(len(N_s)):
          s_prime_bin = np.append(s_prime_bin, np.array(list(np.binary_repr(int(s_prime[j])).zfill(self.B_max))).astype(np.int8)[:self.B_max])      
        return s_prime_bin, reward, done, delay_vec_dep

    def _take_action(self, action):
      self.dep_vec = []
      self.arr_inSlot = []
      info_vec = np.zeros((2,3))
      ql_init = self.qls

      for n_s in range(len(self.N_s)):
        self.tandem.queue[n_s].mu_s = action[n_s] 

      while(True):
        self.arr_inSlot.append(self.job_index)
        info_vec[0] = [self.job_index, self.t_arr, 1]
        self.job_index += 1
        self.t_arr = self.t_arr + self._inter_arr_gen()
        

        if  self.t_arr > self.t_slot + self.T:
          t_slot_old = self.t_slot
          self.t_slot += self.T 
          info_vec[1] = [self.job_index, self.t_slot, 0]
          self.qls, dep_vec = self.tandem._step(info_vec)
          self.dep_vec = np.append(self.dep_vec, dep_vec).reshape(-1, 3)
          break

        info_vec[1] = [self.job_index, self.t_arr, 0]
        self.qls, dep_vec = self.tandem._step(info_vec)
        self.dep_vec = np.append(self.dep_vec, dep_vec[dep_vec[:, 2]==1]).reshape(-1, 3)

      for i in range(len(self.N_s)):

        coeff = np.append(np.ones(len(self.tandem.arrival_vec_q[i])), -np.ones(len(self.tandem.departure_vec_q[i])))
        arr_dep = np.append(self.tandem.arrival_vec_q[i], self.tandem.departure_vec_q[i])
        ind_sorted = np.argsort(arr_dep)
        assert(not np.sum(arr_dep[ind_sorted]>self.t_slot)), 'arr_dep error'
        arr_dep = np.append(arr_dep[ind_sorted],[self.t_slot])
        coeff = coeff[ind_sorted]
        arr_dep_diff = np.append([arr_dep[0]-(self.t_slot-self.T)], np.diff(arr_dep))
        ql_diff = np.zeros(len(coeff)+1)
        ql_diff[0] = ql_init[i]

        for j in range(1,len(coeff)+1):
          ql_diff[j] = max(0, ql_diff[j-1]+coeff[j-1])
        self.qls_ave[i] = np.sum(ql_diff*arr_dep_diff)/self.T
        self.tandem.arrival_vec_q[i]=[]
        self.tandem.departure_vec_q[i]=[]

    def _get_reward(self, delay_vec, action):
        # ------- Define reward here--------
        return r

    def _inter_arr_gen(self):
        c_a2  = 0.7 #SCV^2
        mean  = 1/self.lambda_rate
        k     = 1/c_a2
        theta = mean/k
        interTa = np.random.gamma(k, theta)
        return interTa
    
    
    def reset(self):

        self.t_arr = 0
        self.job_index = 1
        self.cnt = 1
        self.qls = np.zeros(len(self.N_s), dtype=int)
        self.dep_vec = []
        self.tandem_job_dict = {}
        self.tandem = Tandem(self.N_s, self.Mu_s)
        self.cost = 0
        self.qls_ave = np.zeros(len(self.N_s),dtype=float)
        self.t_slot = 0
        self.t_arr_vec = []
        qls_bin = np.zeros(self.B_max*len(self.N_s),dtype=np.int8)  
        return qls_bin

"----------Defining constituent queueing elements of the network---------------"  


class Tandem():
    def __init__(self, N_s, Mu_s):

        self.N_s = N_s
        self.Mu_s = Mu_s
        self.queue = []
        self.ql = np.zeros(len(N_s), dtype=int)
        self.arrival_vec_q = {}
        self.departure_vec_q = {}
        for i,n_s in enumerate(self.N_s):
          self.queue.append(Queue(n_s, self.Mu_s[i]))
          self.arrival_vec_q[i]=[]
          self.departure_vec_q[i]=[]

    def _step(self, info_vec):
        info_vec_new = np.copy(info_vec)
        for i in range(len(self.N_s)):
          isArr = (info_vec_new[:,2]==1)
          self.arrival_vec_q[i] = np.append(self.arrival_vec_q[i], info_vec_new[isArr,1])
          self.ql[i], departure_vec = self.queue[i]._progress(info_vec_new) 
          if np.shape(departure_vec)[0]>0:
            self.departure_vec_q[i] = np.append(self.departure_vec_q[i],departure_vec[:,1].tolist())
          if np.shape(departure_vec)[0]>1:
            ind_sorted = np.argsort(departure_vec[:,1])
            departure_vec = departure_vec[ind_sorted] 
          info_vec_new = np.append(departure_vec,info_vec[-1]).reshape(-1, 3)
        return self.ql, info_vec_new

class Fork():
    def __init__(self, N_s, Mu_s, P_s):

        self.N_s = N_s
        self.P_s = P_s
        self.queue = []
        self.ql = np.zeros(len(N_s), dtype=int)
        for i,n_s in enumerate(self.N_s):
          self.queue.append(Queue(n_s, Mu_s[i]))

    def _step(self, info_vec):
        info_vec_perFork = {}
        arrival_cnt = info_vec.shape[0]-1
        info_vec_new = np.copy(info_vec)
        picked_queue = np.random.multinomial(1, self.P_s, size=arrival_cnt)
        picked_queue = np.argmax(picked_queue, axis=1)
        for i in range(len(self.N_s)):
          
          info_vec_perFork[i] = np.append(info_vec_new[:-1][picked_queue == i], info_vec_new[-1]).reshape(-1, 3)
          self.ql[i], departure_vec = self.queue[i]._progress(info_vec_perFork[i]) 
          if np.shape(departure_vec)[0]>1:
            ind_sorted = np.argsort(departure_vec[:,1])
            departure_vec = departure_vec[ind_sorted] 
          info_vec_perFork[i] = np.append(departure_vec,info_vec[-1]).reshape(-1, 3)
        return self.ql, info_vec_perFork        

class Join():
    def __init__(self, n_s, mu_s):

        self.ql = 0
        self.queue = Queue(n_s, mu_s)

    def _step(self, info_vec):
        info_vec_new = []
        self.N_branch = len(list(info_vec.keys()))
        for i in range(self.N_branch):
          info_vec_new = np.append(info_vec_new, info_vec[i][:-1]).reshape(-1, 3)
        if np.shape(info_vec_new)[0]>1:
          ind_sorted = np.argsort(info_vec_new[:,1])
          info_vec_new = info_vec_new[ind_sorted] 
        info_vec_new = np.append(info_vec_new,info_vec[i][-1]).reshape(-1, 3)
        self.ql, departure_vec = self.queue._progress(info_vec_new)
        info_vec_new = np.append(departure_vec,info_vec[i][-1]).reshape(-1, 3)
        return self.ql, departure_vec 

class Queue():
    def __init__(self, n_s, mu_s):

        self.n_servers = n_s
        self.n_jobs = 0
        self.ql_vec = [0]
        self.ql_ave = 0
        self.empty_servers = np.arange(n_s)
        self.assigned_servers = []
        self.t_fin = []
        self.ind_fin = []
        self.job_dict = {}
        self.job_dict[0] = {'Tw': 0.0, 'Ts':0.0}
        self.mu_s = mu_s
    def _progress(self, info_vec):
        # -----Queue length before taking the action (upon job arrival)---------
        departure_vec = []
        assert(np.shape(info_vec)[0]>=1), 'error'
        for j in range(np.shape(info_vec)[0]-1):
          job_index = int(info_vec[j][0])
          time = info_vec[j][1]
          isArrival = info_vec[j][2]
          self.ql = max(self.n_jobs - self.n_servers, 0) # ---before arrival----

          if isArrival:
              if self.n_jobs < self.n_servers:
                  t_ent = time
                  self.empty_servers = [x for x in range(self.n_servers) if x not in self.assigned_servers]
                  self.assigned_servers = np.append(self.assigned_servers, random.choice(self.empty_servers))

              else:
                  # -------finding the time that each server gets empty---------
                  t_available = [np.max(self.t_fin[self.assigned_servers == i]) for i in range(self.n_servers)]
                  # --------------pick the earliest server available------------
                  picked_server = np.argmin(t_available)
                  t_ent = max(time, t_available[picked_server])
                  self.assigned_servers = np.append(self.assigned_servers, picked_server)

              t_s = self._service_gen()
              self.t_fin = np.append(self.t_fin, t_ent + t_s)
              self.ind_fin = np.append(self.ind_fin, job_index)
              self.n_jobs += 1
              self.job_dict[job_index] = {'Ta': time, 'Td': t_ent + t_s, 'Ts': t_s, 'Tw': t_ent- time,
                                              'Ba': self.ql}

          next_time = info_vec[j+1][1]
          self.n_jobs -= np.sum(np.array(self.t_fin) < next_time)
          served_jobs = np.arange(len(self.t_fin))[np.array(self.t_fin) < next_time]
          for i in served_jobs:
            departure_vec.append([int(self.ind_fin[i]), self.t_fin[i], 1])
          self.t_fin = np.delete(self.t_fin, served_jobs)
          self.ind_fin = np.delete(self.ind_fin, served_jobs)
          self.assigned_servers = np.delete(self.assigned_servers, served_jobs)
          

        if np.shape(info_vec)[0]==1:
          next_time = info_vec[0][1]
          self.n_jobs -= np.sum(np.array(self.t_fin) < next_time)
          served_jobs = np.arange(len(self.t_fin))[np.array(self.t_fin) < next_time]
          for i in served_jobs:
            departure_vec.append([int(self.ind_fin[i]), self.t_fin[i], 1])
          self.t_fin = np.delete(self.t_fin, served_jobs)
          self.ind_fin = np.delete(self.ind_fin, served_jobs)
          self.assigned_servers = np.delete(self.assigned_servers, served_jobs)

        # queue length of this stage before the next arrival to the first stage
        QL = max(self.n_jobs - self.n_servers, 0) 
        return QL, np.array(departure_vec)

    def _service_gen(self):
        c_s2 = 0.8 #SCV^2
        mean = 1/self.mu_s
        k = 1/c_s2
        theta = mean/k
        Ts = np.random.gamma(k, theta)
        return Ts