# Training a Regression Network on Radar Dataset (Ouachita Mountains)

## Importing the required modules

In [1]:
import torch
import torchvision
import torchvision.transforms as transforms
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import torch.backends.cudnn as cudnn
import matplotlib.pyplot as plt
import numpy as np
np.set_printoptions(suppress=True)
import scipy.io as sio
import pandas as pd
import os
import re
import math
import itertools
import time
from sklearn.metrics import roc_curve, auc
from sklearn.model_selection import KFold
from itertools import product
from sklearn.utils.class_weight import compute_class_weight
torch.cuda.empty_cache()

# Reading the labels and creating train and test data

In [2]:
def index(string):
    s = re.findall("[0-9]", string)
    return int(''.join(s))

scenario_idx = 62 # Ouachita Mountains
names = os.listdir(f'data/EXAMPLES/num{scenario_idx}_NAMF_DATA_25k/')
names = sorted(names, key = index)
print(len(names))
y = pd.read_csv(f'data/EXAMPLES/num{scenario_idx}_Ground_Truth_25k.csv')
col_names = y.columns[4:7]
y = y[col_names].to_numpy()

y_train = y[:int(0.8*len(names))]
y_test = y[(int(0.8*len(names))+1):]
training_names = names[:int(0.8*len(names))]
test_names = names[(int(0.8*len(names))+1):]

print('Training labels: ')
print(y_train)

# Tensor Corners
##################################################################################
# num29: [10851, 215, -5.45], num60: [11073, 215, -5.3], num62: [11471, 215, -5.6]
# num76: [11388, 215, -6.15], num35: [11381, 215, -0.95]
##################################################################################

# Training dataset global constants
coord_tr = [11471, 215, -5.6] # Tensor corner
rng_res_tr = 59.9585/2         # Range resolution
az_step_tr = 0.4               # Azimuth step size
el_step_tr = 0.01              # Elevation step size

# Test dataset global constants
coord_ts = [11471, 215, -5.6] # Tensor corner
rng_res_ts = 59.9585/2         # Range resolution
az_step_ts = 0.4               # Azimuth step size
el_step_ts = 0.01              # Elevation step size


def Drawing_Batch(names, label, bs, ind, normalize = True):
    x = []
    labels = []
    
    for j in range(ind*bs, (ind+1)*bs):
        try: temp = sio.loadmat(f'data/EXAMPLES/num{scenario_idx}_NAMF_DATA_25k/'+names[j])['P']
        except: break
        if normalize:
            Anorm = temp - np.min(temp.flatten())
            temp = np.divide(Anorm, np.max(Anorm.flatten()))
        x.append(temp)
        labels.append(label[j,:])
        
    x = torch.FloatTensor(np.array(x))
    labels = torch.FloatTensor(np.array(labels))
    return x,labels

25000
Training labels: 
[[ 0.19278 19.175    5.3732 ]
 [11.244   23.269   24.522  ]
 [-0.2566  20.21     9.9473 ]
 ...
 [ 7.7966  22.974   29.859  ]
 [15.579   24.133   17.291  ]
 [ 6.6286  23.169   29.05   ]]


In [3]:
print(len(y_train))
print(len(y_test))

20000
4999


## Define a Regression CNN and instantiating it

In [4]:
# class Net(nn.Module):
#     def __init__(self):
#         super(Net, self).__init__()
#         self.conv1 = nn.Conv1d(21, 32, 3, 1)
#         self.conv2 = nn.Conv1d(32, 64, 3, 1)
#         self.batchnorm1 = nn.BatchNorm1d(32)
#         self.batchnorm2 = nn.BatchNorm1d(64)
#         self.fc1 = nn.Linear(64 * 5, 20)  # Adjust input size based on the output of conv layers and max pooling
#         self.fc2_reg = nn.Linear(20, 2)  # Adjusted output size

#     def forward(self, x):
#         x = self.conv1(x)
#         x = F.relu(self.batchnorm1(x))
#         x = F.max_pool1d(x, 2)
        
#         x = self.conv2(x)
#         x = F.relu(self.batchnorm2(x))
#         x = F.max_pool1d(x, 2)
        
#         x = torch.flatten(x, 1)
#         x = self.fc1(x)
#         x = F.relu(x)
#         output_reg = self.fc2_reg(x)  # (bs, 2)
        
#         return output_reg
    
# from torchsummary import summary
# device = 'cuda:0' if torch.cuda.is_available() else 'cpu'
# model = Net()
# model = model.to(device)
# if device == 'cuda:0':
#     model = torch.nn.DataParallel(model)
#     cudnn.benchmark = True
# print(summary(model,(21,26)))

## Define a CART (Convolutional Adaptive Radar Transformer) and instantiating it

In [5]:
class Net(nn.Module):
    def __init__(
        self,
        in_features: int = 21,
        seq_len: int = 26,
        d_model: int = 64,
        nhead: int = 8,
        num_layers: int = 4,
        dim_feedforward: int = 256,
        dropout: float = 0.1,
        num_outputs: int = 2
    ):
        super(Net, self).__init__()
        self.seq_len = seq_len
        self.d_model = d_model

        # 1) Patch embedding: each of the seq_len time‑steps is a token of dim in_features
        self.patch_embed = nn.Linear(in_features, d_model)

        # 2) CLS token
        self.cls_token = nn.Parameter(torch.zeros(1, 1, d_model))

        # 3) Positional embeddings for (seq_len + 1) tokens
        self.pos_embed = nn.Parameter(torch.zeros(1, seq_len + 1, d_model))

        # 4) Transformer Encoder
        encoder_layer = nn.TransformerEncoderLayer(
            d_model=d_model,
            nhead=nhead,
            dim_feedforward=dim_feedforward,
            dropout=dropout,
            batch_first=True  # so input shape is (batch, seq, d_model)
        )
        self.transformer = nn.TransformerEncoder(encoder_layer, num_layers=num_layers)

        # 5) Classification / regression head
        self.mlp_head = nn.Sequential(
            nn.LayerNorm(d_model),
            nn.Linear(d_model, d_model // 2),
            nn.GELU(),
            nn.Linear(d_model // 2, num_outputs)
        )

        # initialize
        nn.init.trunc_normal_(self.pos_embed, std=0.02)
        nn.init.trunc_normal_(self.cls_token, std=0.02)
        self.apply(self._init_weights)

    def _init_weights(self, m):
        if isinstance(m, nn.Linear):
            nn.init.xavier_uniform_(m.weight)
            if m.bias is not None:
                nn.init.zeros_(m.bias)
        elif isinstance(m, nn.LayerNorm):
            nn.init.ones_(m.weight)
            nn.init.zeros_(m.bias)

    def forward(self, x: torch.Tensor):
        """
        x: (batch_size, in_features=21, seq_len=26)
        returns: (batch_size, num_outputs=2)
        """
        bs = x.size(0)
        # reshape to (batch, seq_len, in_features)
        x = x.permute(0, 2, 1)

        # patch embedding -> (batch, seq_len, d_model)
        x = self.patch_embed(x)

        # prepend cls token -> (batch, seq_len+1, d_model)
        cls_tokens = self.cls_token.expand(bs, -1, -1)
        x = torch.cat((cls_tokens, x), dim=1)

        # add positional embedding
        x = x + self.pos_embed

        # Transformer encoding
        x = self.transformer(x)

        # take CLS representation
        cls_rep = x[:, 0]  # (batch, d_model)

        # head to outputs
        out = self.mlp_head(cls_rep)
        return out

## Global Definitions

In [6]:
bs = 128  # batch_size
num_epoch = 50  # number of epochs
PATH = './ckpt_model.pth'   # forsaving the model
criterion = nn.MSELoss()
device = 'cuda:0' if torch.cuda.is_available() else 'cpu'

# Define a Loss function and optimizer; Using GPU or CPU
model = Net()
optimizer = optim.Adam(model.parameters(), lr = 0.001)
model = model.to(device)
if device == 'cuda:0':
    model = torch.nn.DataParallel(model)
    cudnn.benchmark = True
    
def Spher2Cart_1D(spherical):
    cartesian = np.zeros(3)
    hypotenuse = np.cos(np.radians(spherical[2]))*spherical[0]
    cartesian[0] = np.cos(np.radians(spherical[1]))*hypotenuse
    cartesian[1] = -np.sin(np.radians(spherical[1]))*hypotenuse
    cartesian[2] = np.sin(np.radians(spherical[2]))*spherical[0]
    return cartesian

## Train and evaluate the network

In [7]:
def main(training_names, test_names, bs, num_epoch, y_train, y_test, normalize=True):
    best_error = 1e+20      # a dummy and very large number for saving the best discovered model
    for epoch in range(num_epoch):
        print('Epoch {}/{}'.format(epoch, num_epoch))
        print('-'*10)
        running_loss_train = 0
        running_loss_test = 0

        model.train()
        for i in range(0, len(training_names)//bs):
            x_train, labels = Drawing_Batch(training_names, y_train, bs, i, normalize)
            x_train = x_train.to(device)
            labels = labels.to(device)
            optimizer.zero_grad()
            out = model(x_train)
            loss = criterion(out, labels[:,0:2])
            loss.backward()
            optimizer.step()
            running_loss_train += loss.item()
            
        out = torch.cat((out, torch.unsqueeze(labels[:,2], dim=1)), dim=1)
        
        true_train = Spher2Cart_1D(np.multiply(labels.cpu().data.numpy()[1,], [rng_res_tr,az_step_tr,el_step_tr]) + coord_tr)
        pred_train = Spher2Cart_1D(np.multiply(out.cpu().data.numpy()[1,], [rng_res_tr,az_step_tr,el_step_tr]) + coord_tr)
        print(true_train)
        print(pred_train)

        model.eval()
        with torch.no_grad():
            for i in range(0, len(test_names)//bs):
                x_test, labels_test = Drawing_Batch(test_names, y_test, bs, i, normalize)
                x_test = x_test.to(device)
                labels_test = labels_test.to(device)
                out_test = model(x_test)
                loss_test = criterion(out_test, labels_test[:,0:2])
                running_loss_test += loss_test.item()

        out_test = torch.cat((out_test, torch.unsqueeze(labels_test[:,2], dim=1)), dim=1)
        
        true_test = Spher2Cart_1D(np.multiply(labels_test.cpu().data.numpy()[1,], [rng_res_ts,az_step_ts,el_step_ts]) + coord_ts)
        pred_test = Spher2Cart_1D(np.multiply(out_test.cpu().data.numpy()[1,], [rng_res_ts,az_step_ts,el_step_ts]) + coord_ts)
        print(true_test)
        print(pred_test)
        
        epoch_loss_train = running_loss_train*x_train.size()[0]/len(training_names)
        epoch_loss_test = running_loss_test*x_test.size()[0]/len(test_names)

        print('Train Loss: {:.6f} ---- Test Loss: {:.6f}'.format(epoch_loss_train, epoch_loss_test))
        if epoch%5==0:
            if epoch_loss_test < best_error:
                torch.save(model.state_dict(), PATH)
                best_error = epoch_loss_test

start = time.time()
main(training_names, test_names, bs, num_epoch, y_train, y_test, normalize=True)
end = time.time()
print(end - start)

Epoch 0/50
----------
[-8263.79447029  7958.09617352 -1065.12529474]
[-8329.13596781  7857.47363627 -1063.07111602]
[-8252.27557442  7945.33781512 -1064.67476422]
[-8317.04118405  7836.74176966 -1062.07233825]
Train Loss: 49.810909 ---- Test Loss: 7.793518
Epoch 1/50
----------
[-8263.79447029  7958.09617352 -1065.12529474]
[-8230.65418693  7957.79446704 -1062.89176568]
[-8252.27557442  7945.33781512 -1064.67476422]
[-8219.84888649  7949.78776706 -1062.79327137]
Train Loss: 3.833410 ---- Test Loss: 6.830496
Epoch 2/50
----------
[-8263.79447029  7958.09617352 -1065.12529474]
[-8287.28283574  7924.6965201  -1064.55170967]
[-8252.27557442  7945.33781512 -1064.67476422]
[-8240.96115946  7944.07969453 -1063.83633809]
Train Loss: 3.555828 ---- Test Loss: 5.585723
Epoch 3/50
----------
[-8263.79447029  7958.09617352 -1065.12529474]
[-8288.161396    7919.89397565 -1064.30257848]
[-8252.27557442  7945.33781512 -1064.67476422]
[-8246.47472167  7943.6981056  -1064.18072071]
Train Loss: 3.511878 

In [9]:
def Spher2Cart_2D(spherical):
    cartesian = np.zeros((len(spherical),3))
    hypotenuse = np.multiply(np.cos(np.radians(spherical[:,2])), spherical[:,0])
    cartesian[:,0] = np.multiply(np.cos(np.radians(spherical[:,1])), hypotenuse)
    cartesian[:,1] = np.multiply(-np.sin(np.radians(spherical[:,1])), hypotenuse)
    cartesian[:,2] = np.multiply(np.sin(np.radians(spherical[:,2])), spherical[:,0])
    return cartesian

# Testing: (range,az,el)
model.eval()
out_test_reg = np.zeros((len(y_test),3))
labels_test_reg = np.zeros((len(y_test),3))

for i in range(0, len(y_test)//bs):  
    x_test, labels_test = Drawing_Batch(test_names, y_test, bs, i, True)
    labels_test = labels_test.cpu().data.numpy()
    labels_test_reg[bs*i : bs*i + bs] = (labels_test[:,0:3])

    cur_test_reg = model(x_test)
    out_test_reg[bs*i : bs*i + bs, 0:2] = cur_test_reg.cpu().data.numpy()
    out_test_reg[bs*i : bs*i + bs, 2] = labels_test_reg[bs*i : bs*i + bs, 2]
    
azim_tot = 0
for i in np.arange(0,len(out_test_reg),1):
    azim_tot += np.linalg.norm(out_test_reg[i,1] - labels_test_reg[i,1])
    
azim_tot = azim_tot / len(out_test_reg)
print(f'Azimuth Estimation Error (deg) = {azim_tot}')

new_data = Spher2Cart_2D(np.multiply(out_test_reg, [rng_res_ts,az_step_ts,el_step_ts]) + coord_ts)
new_labels_data = Spher2Cart_2D(np.multiply(labels_test_reg, [rng_res_ts,az_step_ts,el_step_ts]) + coord_ts)

sum_tot = 0
for i in np.arange(0,len(new_data) - (len(new_data) % bs),1):
    sum_tot += np.linalg.norm(new_data[i,:] - new_labels_data[i,:])
    
# def reject_outliers(data, m=2):
#     return data[abs(data - np.mean(data)) < m * np.std(data)]
# 
# azims = (np.multiply(out_test_reg, [rng_res_ts,az_step_ts,el_step_ts]) - np.multiply(labels_test_reg, [rng_res_ts,az_step_ts,el_step_ts]))[:,1]
# azims = reject_outliers(azims)
# mse = np.mean(azims**2)
# bias_sq = np.mean(azims)**2
# var = np.var(azims)
# print(mse, bias_sq, var)

sum_tot = sum_tot / (len(new_data) - (len(new_data) % bs))
print(f'Localization Error (m) = {sum_tot}')

Azimuth Estimation Error (deg) = 0.296981071176029
Localization Error (m) = 57.865952131498965
