In [23]:
import os
# Configure GPU 
gpu_num = 1 # Use "" to use the CPU
os.environ["CUDA_VISIBLE_DEVICES"] = f"{gpu_num}"
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '3'

# Import Sionna
import sionna
sionna.config.xla_compat=True 

# Configure the notebook to use only a single GPU and allocate only as much memory as needed
# For more details, see https://www.tensorflow.org/guide/gpu
import tensorflow as tf
gpus = tf.config.list_physical_devices('GPU')


# Set global random seed for reproducibility
            
import numpy as np
import sionna.rt as rt
from sionna.constants import PI
from sionna.channel import cir_to_ofdm_channel
import matplotlib.pyplot as plt
import utils

if gpus:
    try:
        tf.config.experimental.set_memory_growth(gpus[0], True)
    except RuntimeError as e:
        print(e) 

tf.random.set_seed(1)

In [24]:
# set mode as f2f for reproducing Fig.3, f2s for Fig.4, s2f for Fig.5 respectively.
mode = "s2f"

In [25]:
# set scatterers
scatter_infos = []

# scatter modeling by PCA
# scatterer 0
mat_type_table = np.ones([10, 10])
for ii in range(3):
    for jj in range(ii+1):
        mat_type_table[(ii*3+1):(ii*3+3), ((jj)*3+1):(jj*3+3)] = 0
mat_type_table = tf.convert_to_tensor(mat_type_table, dtype=tf.int32)
roughness = tf.convert_to_tensor([10, 1e-3])
em_prop = tf.convert_to_tensor([tf.complex(6.31, -0.26), tf.complex(5.24, -0.34)])
roughness_table, em_prop_table = utils.Scatter_info.gen_table(mat_type_table, roughness, em_prop)
tile_length = 3.06
tile_width = 3.06
transform = tf.convert_to_tensor([[0., 0., 0.], [0., 0., 0.]])
scatter_infos.append(utils.Scatter_info(roughness_table, em_prop_table, tile_length, tile_width, transform))

#scatterer 1
mat_type_table = np.ones([5, 5])
for ii in range(5):
    for jj in range(ii+1):
        mat_type_table[ii,jj] = 0
mat_type_table = tf.convert_to_tensor(mat_type_table, dtype=tf.int32)
roughness = tf.convert_to_tensor([10, 1e-3])
em_prop = tf.convert_to_tensor([tf.complex(6.31, -0.26), tf.complex(5.24, -0.34)])
roughness_table, em_prop_table = utils.Scatter_info.gen_table(mat_type_table, roughness, em_prop)
tile_length = 3
tile_width = 3
transform = tf.convert_to_tensor([[7., 18., 9.], [0., 0., -PI/2]])
scatter_infos.append(utils.Scatter_info(roughness_table, em_prop_table, tile_length, tile_width, transform))

#scatterer 2
mat_type_table = np.ones([5, 5])
for ii in range(5):
    for jj in range(ii+1):
        mat_type_table[ii,jj] = 0
mat_type_table = tf.convert_to_tensor(mat_type_table, dtype=tf.int32)
roughness = tf.convert_to_tensor([10, 1e-3])
em_prop = tf.convert_to_tensor([tf.complex(6.31, -0.26), tf.complex(5.24, -0.34)])
roughness_table, em_prop_table = utils.Scatter_info.gen_table(mat_type_table, roughness, em_prop)
tile_length = 3
tile_width = 3
transform = tf.convert_to_tensor([[-7., -18., 9.], [0., 0., PI/2]])
scatter_infos.append(utils.Scatter_info(roughness_table, em_prop_table, tile_length, tile_width, transform))

# sequence: elm_2, elm_6, elm_8
scatter_labels = ['elm__2','elm__6','elm__8']
scatter_ids = []

scene = rt.load_scene("./scene/multiple_scatters/multiple_scatters.xml")
for ii in range(len(scatter_labels)):
    scatter_ids.append(scene.objects[scatter_labels[ii]].object_id)

id_hash = np.ones(max(scatter_ids) + 1) * -1
for ii in range(len(scatter_labels)):
    id_hash[scene.objects[scatter_labels[ii]].object_id] = ii
id_hash = tf.convert_to_tensor(id_hash, dtype=tf.int32)

em_property = utils.Em_property(scatter_infos, id_hash)

scene.frequency = 3e9
scene.radio_material_callable = utils.Radio_material(em_property=em_property)
scene.scattering_pattern_callable = utils.Scatter(em_property=em_property)
tx_antenna_num = 4
rx_antenna_num = 4

scene.tx_array = rt.PlanarArray(num_rows=tx_antenna_num, num_cols=1, 
                                vertical_spacing=0.5, 
                                horizontal_spacing=0.5, 
                                pattern="hw_dipole", 
                                polarization="V")
scene.rx_array = rt.PlanarArray(num_rows=rx_antenna_num, num_cols=1, 
                                vertical_spacing=0.5, 
                                horizontal_spacing=0.5, 
                                pattern="hw_dipole", 
                                polarization="V")

scene.add(rt.Transmitter(name="Tx", 
                         position=[-50, 0, 50], 
                         orientation=[0, PI/4, 0]))


In [26]:
BW = 150e6
num_freq = 11
frequencies = np.linspace(scene.frequency - 0.5 * BW, scene.frequency + 0.5 * BW, num_freq)
frequencies = tf.convert_to_tensor(frequencies)
frequencies = tf.cast(frequencies, tf.float32)
noise = 1e-9
SNR = 1 / noise

train_size = 2000
cube_length = 3
random_phase = True
#specify the antenna id used to calibrate roughness parameter
tx_for_train = 0
rx_for_train = 0
#specify the subcarrier id used to calibrate roughness parameter
sc_for_train = 0

In [27]:
# generate train set in Ωtrain
theta = 2 * PI / 3 * np.random.rand() + PI * 6
height_shift = 6 * np.random.rand() - 3

train_position = [9.38, 7.32, 2.40]

for keys in scene.receivers:
    scene.remove(keys)
for jj in range(train_size):
    Rx_pos = cube_length * np.random.rand(3) + np.array(train_position)
    scene.add(rt.Receiver(name=f"Rx_{jj}", position=Rx_pos, orientation=[0, -PI/4, 0]))
paths = scene.compute_paths(check_scene=False, 
                                    num_samples=1e5,
                                    los=False,
                                    reflection=False, 
                                    scattering=True, 
                                    scat_random_phases=random_phase, 
                                    scat_keep_prob=1)

h_f = cir_to_ofdm_channel(frequencies, paths.a, paths.tau)

# When calculating CCM, we only consider the correlation between one of Tx antennas/ Rx antennas/ subcarriers
# at one time, so the other two need to be specified.

if mode[0] == 'f':
    h_f = tf.squeeze(tf.squeeze(h_f)[:,rx_for_train,tx_for_train,:])
else:
    h_f = tf.squeeze(tf.squeeze(h_f)[:,:,tx_for_train,sc_for_train])

R_hf = tf.matmul(tf.transpose(h_f, conjugate=True), h_f) / train_size

In [28]:
# generate test set in Ωtest
theta = 2 * PI / 3 * np.random.rand() + PI * 6
height_shift = 6 * np.random.rand() - 3

test_position = [3.76, 11.30, 4.71]

for keys in scene.receivers:
    scene.remove(keys)
for jj in range(train_size):
    Rx_pos = cube_length * np.random.rand(3) + np.array(test_position)
    scene.add(rt.Receiver(name=f"Rx_{jj}", position=Rx_pos, orientation=[0, -PI/4, 0]))
paths = scene.compute_paths(check_scene=False, 
                                    num_samples=1e5,
                                    los=False,
                                    reflection=False, 
                                    scattering=True, 
                                    scat_random_phases=random_phase, 
                                    scat_keep_prob=1)

h_f = cir_to_ofdm_channel(frequencies, paths.a, paths.tau)
# When calculating CCM, we only consider the correlation between one of Tx antennas/ Rx antennas/ subcarriers
# at one time, so the other two need to be specified.

if mode[2] == 'f':
    h_f = tf.squeeze(tf.squeeze(h_f)[:,rx_for_train,tx_for_train,:])
else:
    h_f = tf.squeeze(tf.squeeze(h_f)[:,:,tx_for_train,sc_for_train])
R_hf_test = tf.matmul(tf.transpose(h_f, conjugate=True), h_f) / train_size

In [29]:
# cal corrmat for learning
scatter_infos = []
R_list = []
v_list = []
# scatterer 0 
scatter_length = 30.6
scatter_width = 30.6
corr_len = 4
transform = tf.convert_to_tensor([[0., 0., 0.], [0., 0., 0.]])

tile_row, tile_colomn, tile_length, tile_width = utils.cal_size(scatter_length, scatter_width, corr_len)
R0, v0 = utils.cal_R(tile_row, tile_colomn, tile_length, tile_width, corr_len)
roughness_table = 15 * tf.sigmoid(tf.reshape(tf.matmul(R0, v0, transpose_b=True), (tile_row, tile_colomn)))
em_prop_table = tf.ones_like(roughness_table, dtype=tf.complex64) * tf.complex(5.2, -0.2)
scatter_infos.append(utils.Scatter_info(roughness_table, em_prop_table, tile_length, tile_width, transform))
R_list.append(R0)
v_list.append(v0)

# scatter 1
scatter_length = 15
scatter_width = 15
corr_len = 3
transform = tf.convert_to_tensor([[7., 18., 9.], [0., 0., -PI/2]])

tile_row, tile_colomn, tile_length, tile_width = utils.cal_size(scatter_length, scatter_width, corr_len)
R1, v1 = utils.cal_R(tile_row, tile_colomn, tile_length, tile_width, corr_len)
roughness_table = 15 * tf.sigmoid(tf.reshape(tf.matmul(R1, v1, transpose_b=True), (tile_row, tile_colomn)))
em_prop_table = tf.ones_like(roughness_table, dtype=tf.complex64) * tf.complex(5.2, -0.2)
scatter_infos.append(utils.Scatter_info(roughness_table, em_prop_table, tile_length, tile_width, transform))
R_list.append(R1)
v_list.append(v1)

# scatterer 2 
scatter_length = 15
scatter_width = 15
corr_len = 3
transform = tf.convert_to_tensor([[-7., -18., 9.], [0., 0., PI/2]])

tile_row, tile_colomn, tile_length, tile_width = utils.cal_size(scatter_length, scatter_width, corr_len)
R2, v2 = utils.cal_R(tile_row, tile_colomn, tile_length, tile_width, corr_len)
roughness_table = 15 * tf.sigmoid(tf.reshape(tf.matmul(R2, v2, transpose_b=True), (tile_row, tile_colomn)))
em_prop_table = tf.ones_like(roughness_table, dtype=tf.complex64) * tf.complex(5.2, -0.2)
scatter_infos.append(utils.Scatter_info(roughness_table, em_prop_table, tile_length, tile_width, transform))
R_list.append(R2)
v_list.append(v2)

In [34]:
for ii in range(len(scatter_labels)):
    scatter_ids.append(scene.objects[scatter_labels[ii]].object_id)

id_hash = np.ones(max(scatter_ids) + 1) * -1
for ii in range(len(scatter_labels)):
    id_hash[scene.objects[scatter_labels[ii]].object_id] = ii
id_hash = tf.convert_to_tensor(id_hash, dtype=tf.int32)

em_property = utils.Em_property(scatter_infos, id_hash)

scene.radio_material_callable = utils.Radio_material(em_property=em_property)
scene.scattering_pattern_callable = utils.Scatter(em_property=em_property)

batch_size = 1000
if mode[0] == 'f':
    optimizer = tf.keras.optimizers.legacy.Adam(learning_rate=2e4)
else:
    optimizer = tf.keras.optimizers.legacy.Adam(learning_rate=2e13)
test_loss = []
train_loss = []


In [35]:
# calculate paths in advance to reduce time consumption
for keys in scene.receivers:
    scene.remove(keys)
for jj in range(batch_size):
    Rx_pos = cube_length * np.random.rand(3) + np.array(train_position)
    scene.add(rt.Receiver(name=f"Rx_{jj}", position=Rx_pos, orientation=[0, -PI/4, 0]))
traced_paths_train = scene.trace_paths(check_scene=False, 
                                        num_samples=1e5,
                                        los=False,
                                        reflection=False, 
                                        scattering=True, 
                                        scat_keep_prob=1)

for keys in scene.receivers:
    scene.remove(keys)
for jj in range(batch_size):
    Rx_pos = cube_length * np.random.rand(3) + np.array(test_position)
    scene.add(rt.Receiver(name=f"Rx_{jj}", position=Rx_pos, orientation=[0, -PI/4, 0]))
traced_paths_test = scene.trace_paths(check_scene=False, 
                                    num_samples=1e5,
                                    los=False,
                                    reflection=False, 
                                    scattering=True, 
                                    scat_keep_prob=1)

In [None]:
# train and test
if mode == "f2f":
    num_of_iter = 50
elif mode == "f2s":
    num_of_iter = 200
elif mode == "s2f":
    num_of_iter = 150
for ii in range(num_of_iter):
    
    paths = scene.compute_fields(*traced_paths_test,
                                    check_scene=False,
                                    scat_random_phases=True)
    h_f = sionna.channel.cir_to_ofdm_channel(frequencies, paths.a, paths.tau) 
    if mode[2] == 'f':
        h_f = tf.squeeze(tf.squeeze(h_f)[:,rx_for_train,tx_for_train,:])
    else:
        h_f = tf.squeeze(tf.squeeze(h_f)[:,:,tx_for_train,sc_for_train])
    R_hf_hat = tf.matmul(tf.transpose(h_f, conjugate=True), h_f) / batch_size  

    #loss function for extrapolation in frequency domain
    if mode[2] == 'f':
        I = tf.eye(R_hf_hat.shape[0], dtype=tf.complex64)
        R_y_hat = R_hf_hat + I / SNR
        R_y = R_hf_test + I / SNR
        diff = tf.linalg.inv(R_y_hat) - tf.linalg.inv(R_y)
        loss = tf.linalg.trace(R_y @ diff @ diff) / (SNR ** 2)
        loss = tf.math.real(loss)
    else:
        loss = tf.reduce_mean(tf.square(tf.abs(R_hf_hat - R_hf_test)))
    #Set loss function to be MSE for extrapolation in spatial domain
    
    test_loss.append(loss.numpy())
    print(f"test loss={loss}")
            

    with tf.GradientTape(persistent=True) as tape:
        
        roughness_table_list = []
        for ii in range(len(v_list)):
            temp = 15 * tf.sigmoid(tf.reshape(tf.matmul(R_list[ii], v_list[ii], transpose_b=True),
                                            scatter_infos[ii].roughness_table.shape))
            roughness_table_list.append(temp)

        em_property.update_roughness(roughness_table_list)

        paths = scene.compute_fields(*traced_paths_train,
                                    check_scene=False,
                                    scat_random_phases=False)
        h_f = sionna.channel.cir_to_ofdm_channel(frequencies, paths.a, paths.tau) 
        if mode[0] == 'f':
            h_f = tf.squeeze(tf.squeeze(h_f)[:,rx_for_train,tx_for_train,:])
        else:
            h_f = tf.squeeze(tf.squeeze(h_f)[:,:,tx_for_train,sc_for_train])
        
        R_hf_hat = tf.matmul(tf.transpose(h_f, conjugate=True), h_f) / batch_size

        if mode[0] == 'f':
            I = tf.eye(R_hf_hat.shape[0], dtype=tf.complex64)
            R_y_hat = R_hf_hat + I / SNR
            R_y = R_hf + I / SNR
            diff = tf.linalg.inv(R_y_hat) - tf.linalg.inv(R_y)
            loss = tf.linalg.trace(R_y @ diff @ diff) / (SNR ** 2)
            loss = tf.math.real(loss)
        else:
            loss = tf.reduce_mean(tf.square(tf.abs(R_hf_hat - R_hf)))

        train_loss.append(loss.numpy())
        print(f"                                  train loss={loss}")

        dv = tape.gradient(loss, v_list)
        optimizer.apply_gradients(grads_and_vars=zip(dv, v_list))    

In [None]:
# plot loss curve

plt.rc('font',family='Times New Roman')
fig, ax_train = plt.subplots()
ax_test = ax_train.twinx()

yticks = [float(format(x, '.3g')) for x in np.linspace(min(train_loss), max(train_loss), 8)]
ax_train.set_yticks(yticks)
yticks = [float(format(x, '.3g')) for x in np.linspace(min(test_loss), max(test_loss), 8)]
ax_test.set_yticks(yticks)

iteration = np.linspace(0,len(train_loss), len(train_loss))
line_train, = ax_train.plot(iteration, np.array(train_loss), color=(0.26,0.55,1), label="train loss")
line_test, = ax_test.plot(iteration, np.array(test_loss), color="orange", label="test loss")

lines = [line_train, line_test]
labels = [line.get_label() for line in lines]
legend = ax_train.legend(lines, labels, loc=0, bbox_to_anchor=(1, 0.95), frameon=True, facecolor='white', edgecolor='black')
legend.get_frame().set_alpha(1)


ax_train.grid(axis='both')

if mode == "f2f":
    plt.title(r"Extrapolation: Frequency Domain To Frequency Domain" + "\n\n" + \
            "train loss=$\mathscr{L}_1$                          test loss=$\mathscr{L}_1$")
elif mode == "f2s":
    plt.title(r"Extrapolation: Frequency Domain To Spatial Domain" + "\n\n" + \
            "train loss=$\mathscr{L}_1$                          test loss=$\mathscr{L}_2$")
elif mode == "s2f":
    plt.title(r"Extrapolation: Spatial Domain To Frequency Domain" + "\n\n" + \
            "train loss=$\mathscr{L}_2$                          test loss=$\mathscr{L}_1$")

ax_train.set_xlabel("Number of iteration")