In [1]:
#   ____                            _____     _____                   __          __        _        _                 
#  / __ \                     /\   |_   _|   / ____|                  \ \        / /       | |      | |                
# | |  | |_ __   ___ _ __    /  \    | |    | |  __ _   _ _ __ ___     \ \  /\  / /__  _ __| | _____| |__   ___  _ __  
# | |  | | '_ \ / _ \ '_ \  / /\ \   | |    | | |_ | | | | '_ ` _ \     \ \/  \/ / _ \| '__| |/ / __| '_ \ / _ \| '_ \ 
# | |__| | |_) |  __/ | | |/ ____ \ _| |_   | |__| | |_| | | | | | |     \  /\  / (_) | |  |   <\__ \ | | | (_) | |_) |
#  \____/| .__/ \___|_| |_/_/    \_\_____|   \_____|\__, |_| |_| |_|      \/  \/ \___/|_|  |_|\_\___/_| |_|\___/| .__/ 
#        | |                                         __/ |                                                      | |    
#        |_|                                        |___/                                                       |_|    

In [2]:
!pip install gym
#!pip install torch==1.5.0+cpu -f https://download.pytorch.org/whl/torch_stable.html



In [3]:
import json
from itertools import count

import gym
from gym import spaces

import torch
from torch import Tensor

import random
from random import randint

from typing import Dict, List, Optional, Callable, Any, Tuple, Union, Type

import pathlib
import collections

![alt text](https://www.taitradioacademy.com/wp-content/uploads/2014/10/Image-25-800x450.png)

In [4]:
# <START> ------ CODE BLOCK 1
# <END> -------- CODE BLOCK 1

class TDMAGym(gym.Env):
  NO_ACTION = 0  # no action on sub-frame
  UPLINK_ACTION = 1  # use sub-frame as up-link role (user [Tx] --> base-station [Rx])
  DOWNLINK_ACTION = 2  # use sub-frame as down-link role (base-station [Tx] --> user [Rx])

  def __init__(self, n_freqs: int, n_subframes: int, horizon: int):
    super(TDMAGym, self).__init__()  # initialize gym.Env base class
    if n_subframes % 2 != 0:  # since a link is separated in up and down link we need even number of subframe
      raise ValueError('n_subframes must be an even number')

    # action space dict includes frequency and sub-frames action
    self.action_space = spaces.Dict(
      {
        'freq': spaces.Discrete(n_freqs),  # frequency used
        'subframes': spaces.MultiDiscrete([3] * n_subframes)  # sub-frames role actions
      }
    )
    # observation space dict includes the time-step, last action and its sub-frames successes
    self.observation_space = spaces.Dict(
      {
        'time_step': spaces.Discrete(horizon + 1),  # include terminal state
        'last_a': self.action_space,  # last action done by the agent
        'last_success': spaces.MultiDiscrete([2] * n_subframes)  # what sub-frames received an acknowledgement
      }
    )
    self.reward_range = 0.0, float(n_subframes)  # action-reward range between 0 and the number of sub-frames

    self.freq_sf: Dict[int, Tuple[int, Tensor]] = dict()  # frequency and sub-frames role assignation w.r.t. to time-step
    for t in range(horizon):  # do not include terminal state
      freq = randint(0, n_freqs - 1)  # choose random frequency to do Tx
      ul_subframes = random.sample(range(n_subframes), k=n_subframes // 2)  # choose which sub-frames will be used as up-link
      dl_subframes = [
        sf for sf in range(n_subframes) if sf not in ul_subframes  # choose the rest of sub-frames as down-link
      ]
      sf = torch.zeros(n_subframes, dtype=torch.int)  # initialize sub-frames roles
      sf.fill_(self.NO_ACTION)  # set all sub-frames to no-action
      sf[ul_subframes] = self.UPLINK_ACTION  # set all up-link sub-frames to uplink-action value
      sf[dl_subframes] = self.DOWNLINK_ACTION  # set all down-link sub-frames to downlink-action value
      self.freq_sf[t] = freq, sf  # set sub-frames role w.r.t. time-step

    self.hor = horizon  # gym horizon to know when we are DONE
    self.t = 0  # initial time-step / observation

  def step(self, action: Dict) -> Tuple[  # step function to interact with the gym
    Dict, float, bool, Dict[str, Any]
  ]:
    if self.t >= self.hor:  # terminal state reached
      return collections.OrderedDict(), 0.0, True, {}  # return empty interaction

    assert self.action_space.contains(action)  # action is within gym action-space assertion

    freq, sf = self.freq_sf[self.t]  # get frequency and sub-frames role for current time-step
    a_freq, a_sf = action['freq'], action['subframes']  # fetch frequency and sub-frames role from the action

    if freq == a_freq:  # gym and action frequency are the same
      sf_success = [
        int(gym_sf == act_sf) for gym_sf, act_sf in zip(sf, a_sf)  # action sub-frames role is correct w.r.t. the gym's sub-frames
      ]
    else:
      sf_success = [0] * len(self.action_space['subframes'].nvec)  # wrong frequency, no sub-frames successes

    self.t += 1  # increment our time-step / observation
    o = collections.OrderedDict(
      time_step=self.t, last_a=action,   # observation that we will return, time-step, last action
      last_success=[int(sf_s) for sf_s in sf_success]  # and successes from last action
    )
    r = float(sum(sf_success))  # sum successes to give the reward
    return o, r, False, {}   # gyms always returns <obs, reward, if terminal obs reached, debug/info dictionary>

  def reset(self) -> Dict:  # reset our gym for a new episode
    self.t = 0  # initial time-step
    obs = collections.OrderedDict(time_step=self.t)  # initial observation
    return obs

  def render(self, mode='human'):  # gym visual rendering (e.g. text, image, plot, 3D frame, etc.)
    if self.t < self.hor:  # terminal gym state not reached
      # show current time-step, frequency and sub-frames role
      print(
        'rendering={{time_step={time_step}, freq={freq}, sf={sf}}}'
          .format(
          time_step=self.t, freq=self.freq_sf[self.t][0],
          sf=str(self.freq_sf[self.t][1].tolist()).replace(' ', '')
        )
      )
    else:  # terminal gym state reached and out of horizon
      print('rendering={terminal state reached, out of horizon}')

In [5]:
def build_epsilon_fn(steps: int) -> Callable[[int], float]:  # epsilon function builder
  log_epsilon_space = torch.logspace(1.0, -1.0, steps=steps, base=10.0) / 10.0  # epsilon log-space {1.0, ..., 0.0}

  def epsilon_fn(episode_i: int):  # epsilon function
    if episode_i >= steps:  # max step reached
      return 0.0
    return log_epsilon_space[episode_i].item()  # return epsilon value

  return epsilon_fn  # return our function as an callable object

In [6]:
class TDMAQLearningAgent:

  def __init__(
      self, lr: float, gamma: float, act_space: spaces.Dict,
      obs_space: spaces.Dict, eps_fn_steps: int
  ):
    self.act_space = act_space  # action-space from the gym environment
    self.obs_space = obs_space  # observation-space from the gym environment

    self.lr = lr  # learning rate
    self.gamma = gamma  # gamma parameter

    # <START> ------ CODE BLOCK 2.A
    self.qtable: Dict[str, Tensor] = dict()  # Q-table with observation encoded as text string as keys for a 3D Q-table
    self.qtable_dims: Tuple[int, int, int] = (
      act_space['freq'].n, act_space['subframes'].nvec.size,  # 3D Q-table dimensions (n_freqs X n_subframes x n_roles)
      act_space['subframes'].nvec.max()
    )
    # <END> -------- CODE BLOCK 2.A

    self.eps_fn_steps = eps_fn_steps  # number of steps (episodes) in epsilon log-space
    self.eps_fn: Callable[[int], float] = build_epsilon_fn(eps_fn_steps)  # build our epsilon function
    self.episode_i: Optional[int] = None  # keep track of the episode number (e.g. for epsilon)
    self.eps: Optional[float] = None  # epsilon value as a property of our agent

  def act(self, o: Dict, eps: Optional[float] = None) -> Dict:  # choose action
    if eps is None:
      eps = self.eps  # not epsilon as argument, use agent's current epsilon value

    o_enc = self.encode_obs(o)  # encode obs to a string "{time_step}:{freq}:{subframes}:{successes}"

    # <START> ------ CODE BLOCK 2.B 
    if o_enc not in self.qtable:
      self.qtable[o_enc] = torch.zeros(self.qtable_dims)  # add new Q-table for new obs, Q-values are zero

    if random.uniform(1.0, 0.0) < eps:  # epsilon-greedy condition for exploration
      return self.explore(o)  # exploring random action

    max_sf_values, max_sf_indices = self.qtable[o_enc].max(dim=2)  # get values and index of best actions for all sub-frame Q-values
    max_freq = max_sf_values.sum(dim=1).argmax()  # get frequency with the best Q-values for all sub-frames

    if max_sf_indices.ndimension() > 1:  # multiple equal maximums
      max_sf_indices = max_sf_indices[max_freq]  # if Q-value equality occurs, take random one

    # ------- same algorithm but using python loops instead of pytorch -------
    # max_sf_values2 = []
    # max_sf_indices2 = []
    # for i in range(self.qtable[o_enc].shape[0]):
    #     j_vals = []
    #     j_inds = []
    #     for j in range(self.qtable[o_enc].shape[1]):
    #         max_k = float(max(self.qtable[o_enc][i, j]))
    #         for k in range(self.qtable[o_enc].shape[2]):
    #             if self.qtable[o_enc][i, j, k] == max_k:
    #                 j_inds.append(k)
    #                 break
    #         j_vals.append(max_k)
    #     max_sf_values2.append(j_vals)
    #     max_sf_indices2.append(j_inds)

    # max_freq_values2 = []
    # for i in range(self.qtable[o_enc].shape[0]):
    #     max_freq_values2.append(sum(max_sf_values2[i]))

    # max_freq2 = float('-inf')
    # for i in range(self.qtable[o_enc].shape[0]):
    #     if max_freq_values2[i] == max(max_freq_values2):
    #         max_freq2 = i
    #         break

    # assert torch.all(max_sf_values == torch.tensor(max_sf_values2))
    # assert torch.all(max_sf_indices == torch.tensor(max_sf_indices2))
    # assert torch.all(max_freq == torch.tensor(max_freq2))
    # -----------------------------------------------------------------------

    a = collections.OrderedDict(
      freq=int(max_freq), subframes=max_sf_indices.tolist()  # create and return action with best Q-values
    )
    # <END> -------- CODE BLOCK 2.B

    assert (
      isinstance(a, collections.OrderedDict) and
      self.act_space.contains(a)
    )
    return a

  def explore(self, o: Dict) -> Dict:  # action-space exploration
    # <START> ------ CODE BLOCK 2.C
    a = collections.OrderedDict(
      freq=randint(0, self.act_space['freq'].n - 1),  # random frequency
      subframes=[
        randint(0, n - 1) for n in self.act_space['subframes'].nvec  # random role action for each sub-frame
      ]
    )
    # <END> ------ CODE BLOCK 2.C
    assert (
      isinstance(a, collections.OrderedDict) and
      self.act_space.contains(a)
    )
    return a  # random action

  def update(
      self, o: Dict, a: Dict, r: float, o_prime: Dict  # agent update function (e.g. Q-learning update)
  ) -> float:
    o_enc = self.encode_obs(o)  # encode obs to a string "{time_step}:{freq}:{subframes}:{successes}"
    o_prime_enc = self.encode_obs(o_prime)  # encode obs to a string "{time_step}:{freq}:{subframes}:{successes}"

    # <START> ------ CODE BLOCK 2.D
    
    if o_enc not in self.qtable:
      self.qtable[o_enc] = torch.zeros(self.qtable_dims)  # add new Q-table for new obs, Q-values are zero

    if o_prime_enc not in self.qtable:
      self.qtable[o_prime_enc] = torch.zeros(self.qtable_dims)  # add new Q-table for new obs, Q-values are zero

    a_inds = a['freq'], range(len(a['subframes'])), a['subframes']  # get Q-table indices of action
    a_vals = self.qtable[o_enc][a_inds]  # TD-source (e.i. TD-error = [TD-target - TD-source]), also old Q-values to compute delta update

    max_prime_sf_vals, _ = self.qtable[o_prime_enc].max(dim=2)  # get values and index of best actions for all sub-frame Q-values
    a_prime_val = max_prime_sf_vals[max_prime_sf_vals.sum(dim=1).argmax()]  # get frequency with the best Q-values for all sub-frames

    # ------- same algorithm but using python loops instead of pytorch -------
    # a_vals2 = []
    # for i in range(len(a['subframes'])):
    #     a_vals2.append(
    #         self.qtable[o_enc][a['freq'], i, a['subframes'][i]]
    #     )
    # assert torch.all(a_vals == torch.tensor(a_vals2))

    # max_prime_sf_values2 = []
    # for i in range(self.qtable[o_prime_enc].shape[0]):
    #     j_vals = []
    #     for j in range(self.qtable[o_prime_enc].shape[1]):
    #         max_k = float(max(aa[i, j]))
    #         j_vals.append(max_k)
    #     max_prime_sf_values2.append(j_vals)

    # max_prime_freq2 = max([sum(sf) for sf in max_prime_sf_values2])
    # for i in range(self.qtable[o_prime_enc].shape[0]):
    #     if sum(max_prime_sf_values2[i]) == max_prime_freq2:
    #         max_prime_freq2 = i
    #         break

    # a_prime_val2 = max_prime_sf_values2[max_prime_freq2]
    # assert torch.all(a_prime_val == torch.tensor(a_prime_val2))
    # -----------------------------------------------------------------------

    last_success = torch.tensor(o_prime['last_success'])  # last success for credit assignment of the update

    td_target = (r + self.gamma * a_prime_val) * last_success  # compute Temporal-Difference target (e.i [r + gamma * Q(o', a_max)])
    self.qtable[o_enc][a_inds] += (
        self.lr * (td_target - a_vals) * last_success  # update Q-table (e.i. Q(o, s) <- Q(o, s) + lr * [TD-target - Q(o, a)])
    )

    delta_update = float(sum(self.qtable[o_enc][a_inds] - a_vals))  # compute delta update
    # <END> -------- CODE BLOCK 2.D
    assert isinstance(delta_update, float)
    return delta_update  # return delta update to training loop

  def episode_reset(self, o: Dict, episode_i: int) -> Dict:  # reset agent for a new episode
    self.episode_i = episode_i  # current episode number
    self.eps = self.eps_fn(episode_i)  # epsilon w.r.t. episode number
    return self.act(o, self.eps)  # act upon initial observation

  def save_to_file(self, path: str, overwrite: bool):  # save agent state to file
    file = pathlib.Path(path)  # file path
    if not overwrite and file.exists():  # exception if file exist and overwrite is false
      raise FileExistsError()

    # agent state as a python dictionary
    agent_state: Dict[str, Any] = {
      'gamma': self.gamma,
      'act_space': {
        'n_freq': self.act_space['freq'].n,
        'subframes_nvec': [
          int(n) for n in self.act_space['subframes'].nvec
        ]
      },
      'obs_space': {
        'n_time_step': self.obs_space['time_step'].n
      },
      'qtable': {k: v.tolist() for k, v in self.qtable.items()}
    }

    with file.open('w') as f:  # open file to write in it
      json.dump(agent_state, f)  # write agent state in file

  def load_from_file(self, path: str):  # load agent state from compatible saved state
    file = pathlib.Path(path)  # file path
    if not file.exists():
      return  # file does not exists

    with file.open('r') as f:  # open file to read it
      agent_state = json.load(f)  # load agent state from file

    self.gamma = agent_state['gamma']  # set gamma to loaded value
    self.qtable = {
      k: torch.tensor(v) for k, v in agent_state['qtable'].items()  # set number of actions from loaded value
    }

    assert self.act_space['freq'].n == agent_state['act_space']['n_freq']  # assert compatiblity
    assert [int(n) for n in self.act_space['subframes'].nvec] == (
      agent_state['act_space']['subframes_nvec']    # assert compatiblity
    )
    assert self.obs_space['time_step'].n == (
      agent_state['obs_space']['n_time_step']    # assert compatiblity
    )

  @staticmethod
  def encode_obs(o: Dict) -> str:  # encoding observation to a string to use it as Q-table key
    encoded = '{time_step}:{last_a}:{last_success}'.format(
      time_step=o['time_step'] if 'time_step' in o else '',
      last_a=(
        TDMAQLearningAgent.encode_act(o['last_a'])
        if 'last_a' in o else ''
      ),
      last_success=o['last_success'] if 'last_success' in o else ''
    ).replace(' ', '')
    return encoded  # return encoded observation as string

  @staticmethod
  def encode_act(a: Dict) -> str:  # encoding action to a string to use it as part of Q-table key
    encoded = '{freq}:{sf}'.format(
      freq=a['freq'], sf=a['subframes']
    ).replace(' ', '')
    return encoded  # return encoded action as string

In [7]:


random.seed(1234)  # python random number generator seed
torch.manual_seed(1234)  # pytorch random number generator seed

n_freqs = 4  # number of frequencies
n_subframes = 4  # number of sub-frames within a time-step frame
horizon = 15  # horizon of our gym (episodes)

gym_env = TDMAGym(n_freqs, n_subframes, horizon)  # our gym environment

lr = 0.55  # learning rate
gamma = 0.0  # gamma parameter
eps_fn_steps = 1500  # number of steps (episodes) in epsilon log-space

# initialize our Rx agent with learning parameters and gym parameters
agent = TDMAQLearningAgent(
  lr, gamma, gym_env.action_space, gym_env.observation_space, eps_fn_steps
)
#agent.load_from_file('tdma_agent_1.json')    # load previously saved agent

show_verbose = False  # print information for debugging
render_gym = False  # render gym

# print simulation parameters
print(
  'model: {{n_freqs={n_freqs}, n_subframes:{n_subframes},'
  ' horizon={horizon}}}, '
  'agent: {{lr={lr}, gamma={gamma}, eps_fn_steps={eps_fn_steps}}}'.format(
    n_freqs=n_freqs, n_subframes=n_subframes, horizon=horizon, lr=lr,
    gamma=gamma, eps_fn_steps=eps_fn_steps
  )
)

running_len = 5  # length of our running training data
running_delta = []  # running delta (e.g. the last running_len delta update)
running_acc = []  # running accuracy (e.g. the last running_len accuracy)

for episode_i in count():  # training loop
  if show_verbose:
    print('starting episode {episode_i}...'.format(episode_i=episode_i))

  delta_update: List[float] = []  # delta update of our Q-table
  n_successes: int = 0  # number of optimal actions (actions with maximum reward)
  cumul_r: float = 0.0  # cumulative reward

  # <START> ------ CODE BLOCK 3.A
  o = gym_env.reset()  # reset gym for new episode
  a = agent.episode_reset(o, episode_i)  # reset agent with initial gym observation and episode number
  # <END> -------- CODE BLOCK 3.A

  print(a)

  for t in count():  # episode loop
    if render_gym:
      gym_env.render()  # show gym rendering
    o_prime, r, done, _ = gym_env.step(a)  # interact with the gym, get environment transition
    if done:
      break  # terminal gym observation reached, out of horizon

    # <START> ------ CODE BLOCK 3.B
    delta_update.append(agent.update(o, a, r, o_prime))  # update agent with transition, get delta update
    cumul_r += r  # add reward to cumulative reward
    n_successes += int(r == 1.0 * n_subframes)  # success if optimal action-reward of 1.0 * number of sub-frames
    # <END> -------- CODE BLOCK 3.B

    if show_verbose:  # show transition of our model (e.i. <o, a, r, o'>)
      print(
        'transition={{o={o}, a={a}, r={r}, o_prime={o_prime}}},'
        ' delta_update={delta}'.format(
          o=agent.encode_obs(o), a=agent.encode_act(a),
          r=r, o_prime=agent.encode_obs(o_prime),
          delta=delta_update[-1]
        )
      )
    # <START> ------ CODE BLOCK 3.C
    o = o_prime  # increment to next observation (e.g. next observation)
    a = agent.act(o)  #  act upon next observation
    # <END> -------- CODE BLOCK 3.C

  # strip running data because we reached running length
  if len(running_acc) >= running_len or len(running_delta) >= running_len:
    running_acc.pop(0)  # delete oldest running accuracy
    running_delta.pop(0)  # delete oldest running update delta

  # <START> ------ CODE BLOCK 3.D
  running_acc.append(n_successes / horizon)  # add latest accuracy to running data
  running_delta.append(sum(delta_update))  # add latest update delta to running data
  # <END> -------- CODE BLOCK 3.D

  # show episode results
  print(
    'episode {episode_i}: cumul_reward={cumul_r}, accuracy:{acc:0.5}, '
    'cumul_delta={cumul_delta:0.5}, eps={eps:0.5}'.format(
      episode_i=episode_i, cumul_r=cumul_r, acc=running_acc[-1],
      cumul_delta=running_delta[-1], eps=agent.eps
    )
  )

  # training stop conditions
  if (
      all([acc == 1.0 for acc in running_acc])  # all running accuracy are maximized
      and all([delta < 0.0001 for delta in running_delta])  # all running delta update a lower than 0.0001
      and episode_i >= running_len  # running data have reached running length
  ) or episode_i >= eps_fn_steps + running_len:  # epsilon was 0 for all running data (nothing will change)
    break  # exit training loop

agent.save_to_file('tdma_agent_1.json', overwrite=True)  # save agent state to file
gym_env.close()  # close gym environment

model: {n_freqs=4, n_subframes:4, horizon=15}, agent: {lr=0.55, gamma=0.0, eps_fn_steps=1500}
OrderedDict([('freq', 3), ('subframes', [2, 1, 0, 0])])
episode 0: cumul_reward=0.0, accuracy:0.0, cumul_delta=0.0, eps=1.0
OrderedDict([('freq', 3), ('subframes', [0, 2, 0, 1])])
episode 1: cumul_reward=5.0, accuracy:0.0, cumul_delta=4.95, eps=0.99693
OrderedDict([('freq', 3), ('subframes', [0, 0, 2, 1])])
episode 2: cumul_reward=8.0, accuracy:0.0, cumul_delta=9.295, eps=0.99387
OrderedDict([('freq', 0), ('subframes', [0, 1, 0, 1])])
episode 3: cumul_reward=2.0, accuracy:0.0, cumul_delta=2.2, eps=0.99083
OrderedDict([('freq', 1), ('subframes', [1, 0, 2, 0])])
episode 4: cumul_reward=3.0, accuracy:0.0, cumul_delta=2.75, eps=0.98779
OrderedDict([('freq', 0), ('subframes', [2, 1, 0, 2])])
episode 5: cumul_reward=5.0, accuracy:0.0, cumul_delta=7.15, eps=0.98476
OrderedDict([('freq', 0), ('subframes', [1, 2, 0, 1])])
episode 6: cumul_reward=6.0, accuracy:0.0, cumul_delta=4.4, eps=0.98174
OrderedDi