In [8]:
import numpy as np
import scipy as sp
import torch as th
import import_ipynb
import gym
from stable_baselines3.common.env_checker import check_env
from collections import deque
from SV_channel import SV_channel
from HybridMassiveMIMO import HybridMassiveMIMO

In [9]:
class RL_model(gym.Env):
    """
    Hybrid Massive MIMO system wrapped as a Gym environment for Deep Reinforcement Learning.

    The agent tunes the analog precoder ``F_rf`` one element at a time. The reward of a step
    is the change of the capacity value returned by ``HybridMassiveMIMO.compute_C`` with
    respect to the previous step (the baseline is 0 right after ``reset``).

    In this version: channel is assumed to be stationary (block-fading) and it is estimated on Rx-side;
    noise power is estimated on Rx-side; channel capacity is calculated by signal constellation.

    Action encoding (``Discrete(2 * N_rf * N_tx + 1)``):
        * ``action == 2 * N_rf * N_tx`` -- no-op, the precoder is left untouched;
        * otherwise ``action // (2 * N_rf)`` is the row and ``action % (2 * N_rf) // 2`` the
          column of the ``F_rf`` element to rotate; an even action multiplies it by
          ``exp(+1j * phase_shift)``, an odd one by ``exp(-1j * phase_shift)``.

    Observation: the real parts of the effective channel matrix, flattened, followed by its
    flattened imaginary parts (declared shape ``(2 * N_rx * N_rf,)``).

    An episode ends when ``max_interactions`` steps have been taken or when the standard
    deviation of the last ``reward_param`` rewards drops below ``REWARD_STD_THRESHOLD``.
    """
    # main system parameters
    N_s: int                              # number of data streams
    N_rf: int                             # number of RF chains
    N_tx: int                             # total number of Tx antennas (product of the UPA dimensions)
    N_rx: int                             # total number of Rx antennas (product of the UPA dimensions)

    # OFDM parameters (forwarded to HybridMassiveMIMO, not stored on the environment)
    N_ofdm: int                           # number of OFDM symbols
    N_ifft: int                           # number of points for fft
    N_c: int                              # number of sub-carriers
    N_gi: int                             # guard interval length
    mapping: dict[str, np.complex128]     # map for 1st step modulation

    # environment parameters
    N_p: int                              # number of possible phases for UPA elements
    SNR: tuple[int, int]                  # parameters for SNR uniform distribution [dB]
    static_chan: bool                     # is channel matrix constant through all episodes
    cl: int                               # number of clusters
    rays: int                             # number of rays in each cluster
    reward_param: int                     # max memory for reward storage

    # an episode stops once the std of the remembered rewards falls below this value
    REWARD_STD_THRESHOLD: float = 0.02

    def __init__(self, N_s: int = 4,
                     N_rf: int = 4, 
                     N_tx: tuple[int, int] = (4, 4), 
                     N_rx: tuple[int, int] = (2, 2),
                     N_ofdm: int = 2,
                     N_ifft: int = 512,
                     N_c: int = 450,
                     N_gi: int = 64,
                     mapping: dict[str, np.complex128] = {"00": np.complex128(1 + 0j), "01": np.complex128(0 + 1j), \
                                                         "10": np.complex128(0 - 1j), "11": np.complex128(-1 + 0j)},
                     N_p: int = 1,
                     SNR: tuple[int, int] = (-5, 30),
                     static_chan: bool = False,
                     cl: int = 3,
                     rays: int = 2,
                     reward_param: int = 5):
        """
        Build the MIMO system model, the channel model and the Gym spaces.

        Parameters
        ----------
        N_s : number of data streams.
        N_rf : number of RF chains.
        N_tx, N_rx : UPA dimensions of the Tx / Rx arrays; the environment itself stores
            their products in ``self.N_tx`` / ``self.N_rx``.
        N_ofdm, N_ifft, N_c, N_gi, mapping : OFDM / modulation settings, forwarded unchanged
            to ``HybridMassiveMIMO``.
        N_p : exponent defining the ``2 ** N_p`` candidate phases; must be >= 1, because the
            phase step is ``pi / (2 ** N_p - 1)``.
        SNR : ``(low, high)`` bounds in dB of the uniform distribution the per-episode SNR
            is drawn from.
        static_chan : if True a single channel matrix is drawn here and reused by every
            episode; otherwise a new one is drawn on each ``reset``.
        cl, rays : number of clusters and rays per cluster, forwarded to ``SV_channel``.
        reward_param : number of most recent rewards kept for the termination criterion.
        """
        super().__init__()
        self.N_s = N_s
        self.N_rf = N_rf
        self.N_tx = N_tx[0] * N_tx[1]
        self.N_rx = N_rx[0] * N_rx[1]
        self.N_p = N_p
        self.SNR = SNR
        self.static_chan = static_chan
        self.reward_param = reward_param

        # model of full MIMO system
        # (the mapping is copied so the shared default dict can never be mutated downstream)
        self.MIMO_system = HybridMassiveMIMO(N_s = N_s,
                     N_rf = N_rf,
                     N_tx = self.N_tx,
                     N_rx = self.N_rx,
                     N_ofdm = N_ofdm,
                     N_ifft = N_ifft,
                     N_c = N_c,
                     N_gi = N_gi,
                     mapping = dict(mapping))

        # hard cap on the number of steps in one episode
        self.max_interactions = self.N_tx * self.N_rf * (2 ** self.N_p)

        # candidate initial phases and the spacing between two neighbouring candidates
        self.phase_shift = np.pi / (2 ** self.N_p - 1)
        self.phases = np.linspace(-np.pi / 2, np.pi / 2, 2 ** self.N_p)
        self.actions_n = 2 * self.N_rf * self.N_tx           # all possible actions for antenna elements adjustment

        # channel model (receives the UPA dimension tuples, not the antenna totals)
        self.SV_chan = SV_channel(cl = cl, rays = rays, d_phi = 5, d_thetta = 5, a_r = N_rx, a_t = N_tx)

        if self.static_chan:
            self.H = self._draw_channel()

        # convert effective channel matrix to vector for MLP realisation
        # NOTE(review): no dtype is given, so gym's Box default applies (float32 per the gym
        # docs), while the observations built below come from complex128 data (float64);
        # confirm the installed gym / check_env accept this.
        self.observation_space = gym.spaces.Box(low = -np.inf, high = np.inf, shape = (2 * self.N_rx * self.N_rf,))
        # the extra action (index ``actions_n``) is the no-op
        self.action_space = gym.spaces.Discrete(self.actions_n + 1)


    def _draw_channel(self):
        """Draw channel realisations from ``SV_chan`` until ``det(H^H H)`` is non-zero."""
        H = self.SV_chan.compute_channel()
        while np.linalg.det(np.matmul(H.conj().T, H)) == 0:
            H = self.SV_chan.compute_channel()
        return H


    @staticmethod
    def _to_observation(H_eff) -> np.ndarray:
        """Flatten a complex matrix into one real vector: all real parts, then all imaginary parts."""
        return np.hstack((np.ravel(np.real(H_eff)), np.ravel(np.imag(H_eff))))


    def reset(self) -> np.ndarray:
        """
        Start a new episode.

        Clears the step counter, the capacity bookkeeping and the reward memory, draws a new
        channel unless ``static_chan`` is set, draws a random precoder ``F_rf`` from
        ``self.phases`` and a random episode SNR from ``self.SNR``.

        Returns
        -------
        The observation built from the estimated channel multiplied by ``F_rf``.
        """
        self.interactions_n = 0
        self.C_new = self.C_old = self.C = 0
        self.reward_mem = deque([0,], maxlen = self.reward_param)
        self.done = False

        # `not`, rather than bitwise `~`: ~True == -2 and ~False == -1 are both truthy, which
        # made the channel be re-drawn on every reset even for a static channel.
        if not self.static_chan:
            self.H = self._draw_channel()

        # random unit-modulus precoder, one phase per (Tx antenna, RF chain) pair
        self.F_rf = np.exp(1j * np.random.choice(self.phases, (self.N_tx, self.N_rf)))

        # estimate channel matrix with noise effect
        self.SNR_episode = np.random.uniform(*self.SNR)
        H_est = self.MIMO_system.compute_channel_estimation(self.H, self.SNR_episode)
        # presumably axis 1 indexes repeated estimates that are averaged out -- TODO confirm
        # against HybridMassiveMIMO.compute_channel_estimation
        H_est = np.mean(H_est, axis = 1)

        H_eff = np.matmul(H_est, self.F_rf)
        return self._to_observation(H_eff)


    def step(self, action: int) -> tuple[np.ndarray, float, bool, dict]:
        """
        Apply one precoder adjustment and evaluate the resulting capacity.

        Parameters
        ----------
        action : index in ``[0, actions_n]``; see the class docstring for the encoding.

        Returns
        -------
        observation : flattened real / imaginary parts of the effective channel returned by
            ``compute_C``.
        reward : capacity change with respect to the previous step.
        done : True once the episode has terminated.
        info : ``{"SNR": episode SNR, "C": accumulated capacity change}`` -- debugging only.
        """
        if action != self.actions_n:
            # decode which precoder element to rotate and in which direction
            row = action // (self.N_rf * 2)
            col = action % (self.N_rf * 2) // 2
            direction = 1j if action % 2 == 0 else -1j
            self.F_rf[row, col] *= np.exp(direction * self.phase_shift)

        self.C_new, H_eff = self.MIMO_system.compute_C(self.F_rf, self.H, self.SNR_episode)
        dC = self.C_new - self.C_old
        self.C_old = self.C_new

        self.C += dC
        reward = dC

        self.interactions_n += 1
        self.reward_mem.append(dC)

        # stop on the step budget or when the recent rewards have flattened out
        if self.interactions_n >= self.max_interactions or np.std(self.reward_mem) < self.REWARD_STD_THRESHOLD:
            self.done = True

        observation = self._to_observation(H_eff)

        # Only for debugging and collection statistical data
        info = {"SNR": self.SNR_episode, "C": self.C}

        return observation, reward, self.done, info

In [10]:
# Environment checking: instantiate the environment with its default parameters and
# run Stable-Baselines3's `check_env` validation on it.
env = RL_model()
check_env(env)