The code was developed on google colab in combination with google drive. If you want it to work you have to save the CTW2019 hdf5 dataset in the exact same way, or change the code. The dataset can be obtained on the following url: https://data.ieeemlc.org/Ds1Detail .

In the code below we introduce the dataset and improve it. Then we make functions that can be used to shape dataset into different categories described in the paper.

In [None]:
import pandas as pd
from keras.models import Sequential
from keras.layers import Dense
from keras.wrappers.scikit_learn import KerasRegressor
from sklearn.model_selection import cross_val_score
from sklearn.model_selection import KFold
from sklearn.preprocessing import StandardScaler
from sklearn.pipeline import Pipeline
import numpy as np
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
from sklearn.model_selection import train_test_split
import shutil
import pathlib
import os
from google.colab import drive
drive.mount('/content/drive')

from typing import Union, List
from pathlib import Path
import itertools as it
import numpy as np
import scipy as sp
import scipy.constants
import h5py
import pickle
import hashlib

#def __enter__(self):
#  return self
CTW2019_PATH = Path('.').resolve().parent
CTW_PATH = CTW2019_PATH.parent
PROJECT_PATH = CTW_PATH.parent

# Path to CTW2019 dataset
DEFAULT_DATASET_PATH = PROJECT_PATH.joinpath('/content/drive/MyDrive/', 'CTW2019_h5')
assert DEFAULT_DATASET_PATH.exists(), f'Missing dataset at `{DEFAULT_DATASET_PATH}`!'

# Assumed antenna composition
antennas = np.asarray([8, 1, 2])
# Number of all antennas
n_antennas = np.prod(antennas)
# center frequency (1.25 GHz)
fc = 1_250_000_000.0
# Speed of light in vacuum
c0 = sp.constants.speed_of_light
# Wavelength of center frequency
lambda0 = c0 / fc
# Assume antenna spacing (see: https://ieeexplore.ieee.org/stamp/stamp.jsp?arnumber=9013195)
antenna_spacing = lambda0 / 2
# Spacing between subcarriers (see arxiv:2002.09452)
ch_spacing = 20_000_000.0 / (1024 + 1)

# Offset (correction to place antennas to the center of coordinate system)
# Center is (3.5, -3.15, 1.8)
#antenna_offset_correction = np.array([3.5, -3.15, 1.8])
antenna_offset_correction = np.array([0, 0, 0], dtype=np.float32)

# Offset coordinate center at antenna #0, shift it to the center of antenna
antenna_offsets = -1 * (antennas - 1) / 2 * antenna_spacing
antenna_offsets[-1] *= -1 # invert Z axis

antenna_offsets += antenna_offset_correction

# Frequencies of subcarrier (1024 subcarriers + central, where 50 edge carriers are empty/zero)
fsub = np.concatenate([
    np.arange(-462, 0, dtype=np.float32),
    -np.arange(-462, 0, dtype=np.float32)[::-1],
])
assert fsub[0] == -462.
assert fsub[-1] == 462.
assert fsub[462] == 1.
assert fsub[461] == -1.
fsub = fc + fsub * ch_spacing 
fsub = np.expand_dims(fsub, axis=-1)
assert fsub.shape == (924, 1)

def get_tx_positions():
    nx, ny, nz = antennas
    x_offset, y_offset, z_offset = antenna_offsets
    tx_pos = [
        (dx*antenna_spacing+x_offset, dy*antenna_spacing+y_offset, -dz*antenna_spacing+z_offset)
        for dz, dy, dx in it.product(range(nz), range(ny), range(nx))
    ]
    tx_pos = np.asarray(tx_pos)
    assert tx_pos.shape == (n_antennas, 3)
    return tx_pos

def get_dataset(dataset_dir=DEFAULT_DATASET_PATH):
    h_filename = 'h_Estimated_CTW_Train.h5'
    h_path = dataset_dir.joinpath(h_filename)
    assert h_path.exists(), f'{h_filename} is missing!'
    pos_filename = 'r_Position_CTW_Train.h5'
    pos_path = dataset_dir.joinpath(pos_filename)
    assert pos_path.exists(), f'{pos_filename} is missing!'
    #snr_filename = 'SNR_Est_Train.pickel'  #snr_filename = 'SNR_CTW_Train.h5'
    #snr_path = dataset_dir.joinpath(snr_filename)
    #assert snr_path.exists(), f'{snr_filename} is missing!'
    kwargs = dict(mode='r', swmr=True)

    with h5py.File(h_path, **kwargs) as hf:
        h = hf['h_Estimated'][:].T
        h = h.astype(np.float32)

        # Fix #1: Correct the order of FFT components. In Data: (1 to 511, -512 to 0)
        h = np.fft.fftshift(h, axes=2)

        assert h.shape[1:] == (16, 924, 2)

    #with h5py.File(snr_path, **kwargs) as hf:
    #    snr = hf['SNR_Est'][:].T
    #    snr = snr.astype(np.float32)
    #    assert snr.shape[1:] == (16, )

    with h5py.File(pos_path, **kwargs) as hf:
        pos = hf['r_Position'][:].T
        pos = pos.astype(np.float32)

        # Fix #2: Correction of position data. Antenna will now be in the center.
        offset = (3.5, -3.15, 1.8)
        pos[:,0] -= offset[0]
        pos[:,1] -= offset[1]
        pos[:,2] -= offset[2]
        assert pos.shape[1:] == (3, )
    return h, pos

def get_train_test_dataset(train_split=0.9, dataset_dir=DEFAULT_DATASET_PATH, shuffle=True, random_state=None):
    """CTW2019 challenge dataset is manageable and it can fit into memory."""
    h, pos = get_dataset(dataset_dir=dataset_dir)
    
    #raise DeprecationWarning

    # Fix #1: remove measurements from time when device was placed on (or taken off) table
    idx = np.argwhere(pos[:,2] < -0.515).flatten()
    pos = pos[idx]
    h = h[idx]

    # Random generator
    rng = np.random.default_rng(seed=random_state)
    n_samples = len(pos)

    # Shuffle or not to shuffle
    idx = rng.permutation(n_samples) if shuffle else np.arange(n_samples)
    split = round(len(idx) * train_split)
    train_idx, test_idx = idx[:split], idx[split:]

    # Shuffle all the dataset the same way
    train = (h[train_idx], pos[train_idx])
    test = (h[test_idx], pos[test_idx])
    return train, test

def random_split_dataset(dataset_dir=DEFAULT_DATASET_PATH, split=0.9, shuffle=True, memory=None, verbose=True, random_state=None):
    _get_dataset = memory.cache(get_dataset) if memory else get_dataset

    h, pos = _get_dataset(dataset_dir=dataset_dir)

    # Random generator
    rng = np.random.default_rng(seed=random_state)

    # How many samples are we dealing with
    n_samples = len(pos)

    # We will operate over indexes instead with dataset(s) directly
    idx = rng.permutation(n_samples) if shuffle else np.arange(n_samples)

    # if there is only 1 split make it as array for consistency
    if isinstance(split, (float, int)):
        split = (split, )

    # define split indexes
    split = [int(s * n_samples) for s in split]

    # add zero and n_samples for consistency
    split = (0, *split, n_samples)

    # Finally construct batches
    output = []
    for i in range(1, len(split)):
        start_idx = split[i-1]
        end_idx = split[i]
        batch = idx[start_idx : end_idx]
        output.append((h[batch], pos[batch]))
    output = list(output)

    # Sanity check before exits
    assert len(output) == (len(split) - 1), f'{len(output)} != {len(split) - 1}'
    assert sum(map(lambda item: len(item[-1]), output)) == n_samples
    #if verbose: print(f'Total: {n_samples}; Train: {split/n_samples*100:.2f}%; Test: {(n_samples - split)/n_samples*100:.2f}%')
    return output
  
def short_edge_section_split_dataset(dataset_dir=DEFAULT_DATASET_PATH, shuffle=True, memory=None, verbose=True, random_state=None):
    _get_dataset = memory.cache(get_dataset) if memory else get_dataset

    h, pos = _get_dataset(dataset_dir=dataset_dir)

    # Random generator
    rng = np.random.default_rng(seed=random_state)

    # How many samples are we dealing with
    n_samples = len(pos)

    # We will operate over indexes instead with dataset(s) directly
    idx = rng.permutation(n_samples) if shuffle else np.arange(n_samples)

    # Reshuffle
    h, pos = h[idx], pos[idx]

    
    x, y = pos[...,0], pos[...,1]
    threshold = -1.0 * x + 2.0

    train_idx = np.argwhere(y > threshold).flatten()

    # train mask
    mask = np.zeros(n_samples, np.bool)
    mask[train_idx] = 1

    if verbose:
        print(f'Total: {n_samples}; Train: {sum(mask)/n_samples*100:.2f}%; Test: {sum(~mask)/n_samples*100:.2f}%')

    assert len(pos[mask]) > len(pos[~mask])

    return (
        (h[mask], pos[mask]),
        (h[~mask], pos[~mask]),
    )

def long_edge_section_split_dataset(dataset_dir=DEFAULT_DATASET_PATH, shuffle=True, memory=None, verbose=True, random_state=None):
    _get_dataset = memory.cache(get_dataset) if memory else get_dataset

    h, pos = _get_dataset(dataset_dir=dataset_dir)

    # Random generator
    rng = np.random.default_rng(seed=random_state)

    # How many samples are we dealing with
    n_samples = len(pos)

    # We will operate over indexes instead with dataset(s) directly
    idx = rng.permutation(n_samples) if shuffle else np.arange(n_samples)

    # Reshuffle
    h, pos = h[idx], pos[idx]


    x, y = pos[...,0], pos[...,1]
    threshold = 0.9 * x + 4.1

    train_idx = np.argwhere(y < threshold).flatten()

    # train mask
    mask = np.zeros(n_samples, np.bool)
    mask[train_idx] = 1

    if verbose: print(f'Total: {n_samples}; Train: {sum(mask)/n_samples*100:.2f}%; Test: {sum(~mask)/n_samples*100:.2f}%')

    assert len(pos[mask]) > len(pos[~mask])

    return (
        (h[mask], pos[mask]),
        (h[~mask], pos[~mask]),
    )


def within_area_split_dataset(dataset_dir=DEFAULT_DATASET_PATH, shuffle=True, memory=None, verbose=True, random_state=None):
    _get_dataset = memory.cache(get_dataset) if memory else get_dataset

    h, pos = _get_dataset(dataset_dir=dataset_dir)

    # Random generator
    rng = np.random.default_rng(seed=random_state)

    # How many samples are we dealing with
    n_samples = len(pos)

    # We will operate over indexes instead with dataset(s) directly
    idx = rng.permutation(n_samples) if shuffle else np.arange(n_samples)

    # Reshuffle
    h, pos = h[idx], pos[idx]


    # Follow equation:
    # r = 0.4
    # t = np.linspace(0, 1, 360)
    # x = r * np.cos(2*np.pi*t) + 0.8
    # y = r * np.sin(2*np.pi*t) + 4
    x, y = pos[...,0], pos[...,1]

    radius = 0.5
    conditions = np.sqrt((y - 4)**2 + (x - 0.8)**2) > radius

    train_idx = np.argwhere(conditions).flatten()

    # train mask
    mask = np.zeros(n_samples, np.bool)
    mask[train_idx] = 1

    if verbose: print(f'Total: {n_samples}; Train: {sum(mask)/n_samples*100:.2f}%; Test: {sum(~mask)/n_samples*100:.2f}%')

    assert len(pos[mask]) > len(pos[~mask])

    return (
        (h[mask], pos[mask]),
        (h[~mask], pos[~mask]),
    )
!nvidia-smi

We create the proposed model.

In [None]:
from keras.callbacks import EarlyStopping
from keras.layers import Dense, Conv2D,  Conv3D,MaxPool2D, Flatten, GlobalAveragePooling2D,  BatchNormalization, Layer, Add
from keras.models import Sequential
from keras.models import Model
import tensorflow as tf


class ResnetBlock(Model):
    """
    A standard resnet block.
    """

    def __init__(self, channels: int, down_sample=False):
        """
        channels: same as number of convolution kernels
        """
        super().__init__()

        self.__channels = channels
        self.__down_sample = down_sample
        self.__strides = [2, 1] if down_sample else [1, 1]

        KERNEL_SIZE = (3, 3)
        # use He initialization, instead of Xavier (a.k.a 'glorot_uniform' in Keras), as suggested in [2]
        INIT_SCHEME = "he_normal"

        self.conv_1 = Conv2D(self.__channels, strides=self.__strides[0],kernel_size=KERNEL_SIZE, padding="same", kernel_initializer=INIT_SCHEME)
        self.bn_1 = BatchNormalization()
        self.conv_2 = Conv2D(self.__channels, strides=self.__strides[1],kernel_size=KERNEL_SIZE, padding="same", kernel_initializer=INIT_SCHEME)
        self.bn_2 = BatchNormalization()
        self.merge = Add()

        if self.__down_sample:
            # perform down sampling using stride of 2, according to [1].
            self.res_conv = Conv2D(
                self.__channels, strides=2, kernel_size=(1, 1), kernel_initializer=INIT_SCHEME, padding="same")
            self.res_bn = BatchNormalization()

    def call(self, inputs):
        res = inputs

        x = self.conv_1(inputs)
        x = self.bn_1(x)
        x = tf.nn.relu(x)
        x = self.conv_2(x)
        x = self.bn_2(x)

        if self.__down_sample:
            res = self.res_conv(res)
            res = self.res_bn(res)

        # if not perform down sample, then add a shortcut directly
        x = self.merge([x, res])
        out = tf.nn.relu(x)
        return out


class PirnatEco(Model):

    def __init__(self, num_classes, **kwargs):
        """
            num_classes: number of classes in specific classification task.
        """
        super().__init__(**kwargs)
        self.conv_1 = Conv2D(32, (1, 7), strides=(1, 3), padding="same", kernel_initializer="he_normal")
        self.init_bn = BatchNormalization()
        self.pool_2 = MaxPool2D(pool_size=(1, 4))
        self.res_1_1 = ResnetBlock(32)
        self.res_1_2 = ResnetBlock(32)
        self.res_2_1 = ResnetBlock(64, down_sample=True)
        self.res_2_2 = ResnetBlock(64)
        self.res_3_1 = ResnetBlock(128, down_sample=True)
        self.res_3_2 = ResnetBlock(128)
        self.res_4_1 = ResnetBlock(256, down_sample=True)
        self.res_4_2 = ResnetBlock(256)
        self.avg_pool = GlobalAveragePooling2D()
        self.flat = Flatten()
        self.fc1 = Dense(1000, activation=tf.keras.layers.LeakyReLU(alpha=0.001))
        self.fc = Dense(3)

    def call(self, inputs):
        out = self.conv_1(inputs)
        out = self.init_bn(out)
        out = tf.nn.relu(out)
        out = self.pool_2(out)
        for res_block in [self.res_1_1, self.res_1_2, self.res_2_1, self.res_2_2, self.res_3_1, self.res_3_2, self.res_4_1, self.res_4_2]:#, self.res_5_1, self.res_5_2]:
            out = res_block(out)
        out = self.avg_pool(out)
        out = self.flat(out)
        out = self.fc1(out)
        out = self.fc(out)
        return out

Here we create x_train, x_test, y_train, y_test datasets that will later be used to train the model. We shape it into random category.

In [None]:
output = random_split_dataset(dataset_dir=DEFAULT_DATASET_PATH, split=0.9, shuffle=True, memory=None, verbose=True, random_state=None)
x_train, x_test, y_train, y_test = output[0][0],output[1][0],output[0][1],output[1][1]

The following for loops are made to delete data that was recorded while robotic vacum cleaner was being lowered on to the table.

In [None]:
for _ in range(len(y_train[:,2])):
  if not abs(y_train[:,2][_]) > 2.31:
    y_train=np.delete(y_train, _, 0)
    x_train=np.delete(x_train, _, 0)

In [None]:
for _ in range(len(y_test[:,2])):
  if not abs(y_test[:,2][_]) > 2.31:
    y_test=np.delete(y_test, _, 0)
    x_test=np.delete(x_test, _, 0)

Finnally we train the model and test it. We also draw a graph to see how the result improves through the epochs.

In [None]:
from tensorflow.keras import backend as K
K.clear_session()

tb_callback = tf.keras.callbacks.TensorBoard(log_dir="./logs")
%load_ext tensorboard 


def rmse(predictions, targets):
    return np.sqrt(((predictions - targets) ** 2).mean())
def MDE(true: tf.Tensor, pred: tf.Tensor) -> tf.Tensor:
    """Mean euclidean Distance Error. Similar to RMSE."""
    return tf.reduce_mean(tf.sqrt(tf.reduce_sum(tf.square(true - pred), axis=-1)))

model = PirnatEco(3)
model.build((len(y_train)+len(y_test),16,924,2))
model.summary()
epochs = 100
model.compile(optimizer=tf.keras.optimizers.SGD(learning_rate=0.01, momentum=0.9, name="sgd"), loss=MDE)

trainset = tf.data.Dataset.from_tensor_slices((x_train, y_train))
testset = tf.data.Dataset.from_tensor_slices((x_test, y_test))
trainset = trainset.shuffle(128).batch(32, drop_remainder=True)
testset = testset.batch(4, drop_remainder=True)
model.fit_generator(trainset, steps_per_epoch=None, epochs=epochs, verbose=1, callbacks=[tb_callback], validation_data=testset)

score = model.evaluate(x_test, y_test, verbose=0)
prediction = model.predict(x_test)
difference = np.subtract(y_test, prediction)
rmse = rmse(prediction, y_test)
print("average error: ", str(round(abs(difference).mean()*100, 2)),"cm")
print("rmse: ", str(round(abs(rmse).mean()*100, 2)),"cm")
print("Test loss (MDE) : ", str(round(abs(score)*100, 2)),"cm")
print('---------------------------------------------------------------------------------------')
difference_x=difference[:,0]
print("average error_x: ", str(round(abs(difference_x).mean()*100, 2)),"cm")
print('---------------------------------------------------------------------------------------')
difference_y=difference[:,1]
print("average error_y: ", str(round(abs(difference_y).mean()*100, 2)),"cm")
print('---------------------------------------------------------------------------------------')
difference_z=difference[:,2]
print("average error_z: ", str(round(abs(difference_z).mean()*100, 2)),"cm")

%tensorboard --logdir logs