In [None]:
import math
from typing import Tuple

import torch
from torch import nn, Tensor
import torch.nn.functional as F
from torch.nn import TransformerEncoder, TransformerEncoderLayer
from torch.utils.data import dataset, TensorDataset, DataLoader

class TransformerModel(nn.Module):
    """Positional encoding -> stack of Transformer encoder layers -> linear output head.

    Args:
        ntoken: input width of the ``encoder`` linear projection (built but not used in ``forward``).
        out_size: number of features produced by the final linear layer.
        d_model: feature width used by the positional encoding and the encoder layers.
        nhead: number of attention heads in each encoder layer.
        d_hid: feed-forward width inside each encoder layer.
        nlayers: number of stacked encoder layers.
        dropout: dropout probability passed to the positional encoding and the encoder layers.
    """

    def __init__(self, ntoken: int, out_size: int, d_model: int, nhead: int, d_hid: int,
                 nlayers: int, dropout: float = 0.5):
        super().__init__()
        self.model_type = 'Transformer'
        self.d_model = d_model

        # Sub-modules are created in this order on purpose: it fixes both the
        # parameter registration order and the order in which the RNG is consumed.
        self.pos_encoder = PositionalEncoding(d_model, dropout)
        self.transformer_encoder = TransformerEncoder(
            TransformerEncoderLayer(d_model, nhead, d_hid, dropout), nlayers)
        self.encoder = nn.Linear(ntoken, d_model)  # Embedding layer converted into linear layer
        self.decoder = nn.Linear(d_model, out_size)

        self.init_weights()

    def init_weights(self) -> None:
        """Re-initialise the input projection and the output head uniformly in [-0.1, 0.1]."""
        bound = 0.1
        nn.init.uniform_(self.encoder.weight, -bound, bound)
        nn.init.zeros_(self.decoder.bias)
        nn.init.uniform_(self.decoder.weight, -bound, bound)

    def forward(self, src: Tensor) -> Tensor:
        """
        Args:
            src: Tensor whose last dimension is d_model. It goes straight into the
                positional encoding; the ``self.encoder`` projection is NOT applied.
                # assumes [seq_len, batch_size, d_model] or unbatched [seq_len, d_model] - TODO confirm

        Returns:
            Tensor shaped like ``src`` except that the last dimension is out_size.
        """
        return self.decoder(self.transformer_encoder(self.pos_encoder(src)))


def generate_square_subsequent_mask(sz: int) -> Tensor:
    """Return an (sz, sz) float mask that is -inf strictly above the diagonal and 0 elsewhere."""
    all_neg_inf = torch.full((sz, sz), float('-inf'))
    # triu(diagonal=1) keeps only the entries above the main diagonal and zeroes the rest.
    return all_neg_inf.triu(diagonal=1)

In [None]:
class PositionalEncoding(nn.Module):
    """Adds a fixed sinusoidal position table to its input, then applies dropout.

    The table is stored in the non-trainable buffer ``pe`` of shape [max_len, d_model];
    even columns hold sines and odd columns hold cosines of the scaled position.

    Args:
        d_model: size of the last (feature) dimension of the inputs.
        dropout: dropout probability applied after the addition.
        max_len: number of positions precomputed in the table.
    """

    def __init__(self, d_model: int, dropout: float = 0.1, max_len: int = 5000):
        super().__init__()
        self.dropout = nn.Dropout(p=dropout)

        position = torch.arange(max_len).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2) * (-math.log(10000.0) / d_model))
        angles = position * div_term  # [max_len, ceil(d_model / 2)]
        pe = torch.zeros(max_len, d_model)
        pe[:, 0::2] = torch.sin(angles)
        # With an odd d_model there is one cosine column fewer than sine columns.
        pe[:, 1::2] = torch.cos(angles[:, :d_model // 2])
        self.register_buffer('pe', pe)

    def forward(self, x: Tensor) -> Tensor:
        """
        Args:
            x: Tensor, shape [seq_len, batch_size, embedding_dim], or unbatched
                [seq_len, embedding_dim]; seq_len must not exceed max_len.

        Returns:
            Tensor of the same shape as ``x``.
        """
        pe = self.pe[:x.size(0)]
        if x.dim() == 3:
            # Insert a batch axis so that row i of the table is added to every
            # batch element at position i, instead of being broadcast over dim 0.
            pe = pe.unsqueeze(1)
        x = x + pe
        return self.dropout(x)

In [None]:
import os

# Change working directory to labels
work_dir = "C:/file_lists_with_labels_ff_estimator"
os.chdir(work_dir)


def _read_labels(path):
    """Parse a whitespace-separated ``filename value`` listing into {filename: float(value)}.

    Blank lines are skipped. Any other line that does not contain exactly two
    whitespace-separated fields, or whose second field is not a number, raises ValueError.
    """
    labels = {}
    with open(path) as f:
        for line in f:
            if not line.strip():
                continue  # tolerate empty / trailing blank lines
            key, val = line.split()
            labels[key] = float(val)
    return labels


# Reads labels into dict of (filename: ff_value)
training_data_labels = _read_labels("training.txt")
test_data_labels = _read_labels("test.txt")

In [None]:
import numpy as np
import pandas as pd
from tqdm import tqdm

# Change directory to data
os.chdir("C:/rf_without_tgc")


def _load_signals(labels_by_file):
    """Collect signals and labels for every file in the current directory that has a label.

    Each labelled CSV is read without a header and transposed, so every CSV column
    becomes one float32 row; each such row is paired with the file's label.

    Returns:
        (signals, labels): two parallel lists, one entry per row.
    """
    signals, labels = [], []
    for file in tqdm(os.listdir()):
        if file not in labels_by_file:
            continue

        rows = pd.read_csv(file, header=None).T.to_numpy().astype(np.float32)
        label = float(labels_by_file[file])

        signals.extend(rows)
        labels.extend([label] * len(rows))
    return signals, labels


# Iterate thru data to create lists of tensors
training_data, training_labels = _load_signals(training_data_labels)
test_data, test_labels = _load_signals(test_data_labels)

In [None]:
# Stack the per-signal rows collected above into 2-D arrays (one row per signal)
training_array = np.array(training_data)
test_array = np.array(test_data)

print(training_array.shape)
print(test_array.shape)

# Create data tensors. torch.tensor with an explicit dtype replaces the legacy
# torch.Tensor(...) constructor, so the result is float32 regardless of the
# process-wide default dtype. No normalisation or axis swapping is applied.
training_data = torch.tensor(training_array, dtype=torch.float32)
training_labels = torch.tensor(np.array(training_labels), dtype=torch.float32)

test_data = torch.tensor(test_array, dtype=torch.float32)
test_labels = torch.tensor(np.array(test_labels), dtype=torch.float32)

# TensorDataset pairs row i of the data tensor with entry i of the label tensor
train_dataset = TensorDataset(training_data, training_labels)
test_dataset = TensorDataset(test_data, test_labels)

In [None]:
# Training batches: 32 shuffled samples each, loaded in the main process
train_loader_params = dict(batch_size=32, shuffle=True, num_workers=0)

# Create DataLoader for training data
train_loader = DataLoader(train_dataset, **train_loader_params)

In [None]:
# Use the GPU when CUDA is available, otherwise fall back to the CPU
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
device

In [None]:
# Hyper-parameters of the ff-estimation model
ntokens = 1024  # passed to TransformerModel as `ntoken`
emsize = 1024   # embedding dimension (d_model)
d_hid = 100     # dimension of the feedforward network model in nn.TransformerEncoder
nlayers = 2     # number of nn.TransformerEncoderLayer in nn.TransformerEncoder
nhead = 4       # number of heads in nn.MultiheadAttention
dropout = 0.1   # dropout probability

# out_size=1: the model emits a single value per position
model = TransformerModel(
    ntoken=ntokens,
    out_size=1,
    d_model=emsize,
    nhead=nhead,
    d_hid=d_hid,
    nlayers=nlayers,
    dropout=dropout,
).to(device)

In [None]:
import copy
import time

# Loss, optimiser and learning-rate schedule read by train() as module-level globals
lr = 3  # learning rate
# NOTE(review): 3 is far above Adam's documented default of 1e-3 - confirm this is intended.
criterion = nn.MSELoss()
optimizer = torch.optim.Adam(params=model.parameters(), lr=lr)
# The learning rate is multiplied by 0.95 on every scheduler.step() call
scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=1.0, gamma=0.95)

def train(model: nn.Module) -> None:
    """Train ``model`` for one pass over the module-level ``train_loader``.

    Reads the module-level ``train_loader``, ``device``, ``criterion``, ``optimizer``
    and ``scheduler``; the progress line additionally reads the module-level ``epoch``.
    Gradients are clipped to a total norm of 0.5 before every optimizer step.
    Every ``log_interval`` batches a progress line is printed with the mean loss
    over exactly the batches seen since the previous line.

    Args:
        model: the network to optimise; it is switched to training mode.
    """
    model.train()  # turn on train mode
    total_loss = 0.
    log_interval = 5000
    start_time = time.time()

    # Taken from the loader itself so it stays correct when the data or batch size change.
    num_batches = len(train_loader)
    # 1-based counter: when the log fires, exactly `log_interval` losses have been summed.
    for batch, (signal, target) in enumerate(train_loader, start=1):
        signal, target = signal.to(device), target.to(device)
        # Drop only the trailing size-1 output axis; a bare squeeze() would also
        # collapse the batch axis of a single-sample batch.
        output = model(signal).squeeze(-1)
        loss = criterion(output, target)

        optimizer.zero_grad()
        loss.backward()
        torch.nn.utils.clip_grad_norm_(model.parameters(), 0.5)
        optimizer.step()

        total_loss += loss.item()
        if batch % log_interval == 0:
            lr = scheduler.get_last_lr()[0]
            ms_per_batch = (time.time() - start_time) * 1000 / log_interval
            cur_loss = total_loss / log_interval
            print(f'| epoch {epoch:3d} | {batch:5d}/{num_batches:5d} batches | '
                  f'lr {lr:02.2f} | ms/batch {ms_per_batch:5.2f} | '
                  f'loss {cur_loss:5.2f}')
            total_loss = 0
            start_time = time.time()

def evaluate(model: nn.Module, eval_data: DataLoader) -> float:
    """Return the mean per-sample loss of ``model`` over ``eval_data``.

    Mirrors the forward pass of ``train``: each batch is moved to the module-level
    ``device``, the trailing size-1 output axis is dropped, and the module-level
    ``criterion`` is applied. No gradients are computed.

    Args:
        model: the network to evaluate; it is switched to evaluation mode.
        eval_data: iterable of ``(signal, target)`` batches, e.g. a DataLoader
            over a TensorDataset.

    Returns:
        Sum of (batch loss * batch size) divided by the total number of samples.

    Raises:
        ValueError: if ``eval_data`` yields no samples.
    """
    model.eval()  # turn on evaluation mode
    total_loss = 0.
    total_samples = 0
    with torch.no_grad():
        for signal, target in eval_data:
            signal, target = signal.to(device), target.to(device)
            output = model(signal).squeeze(-1)
            batch_size = target.size(0)
            # criterion returns a batch mean; weight it so a smaller last batch counts correctly.
            total_loss += batch_size * criterion(output, target).item()
            total_samples += batch_size
    if total_samples == 0:
        raise ValueError("eval_data yielded no samples")
    return total_loss / total_samples

In [None]:
best_val_loss, best_model = float('inf'), None
epochs = 50

begin_time = time.time()

# One train() pass plus one scheduler step per epoch. `epoch` is deliberately a
# module-level loop variable because train() reads it for its progress line.
# Validation and best-model tracking are not run here, so best_val_loss and
# best_model keep their initial values.
for epoch in range(1, epochs + 1):
    epoch_start_time = time.time()
    train(model)
    scheduler.step()

# Total wall-clock training time in seconds
total_elapsed = time.time() - begin_time
print(total_elapsed)

In [None]:
from scipy import stats

# Test batches: one signal at a time, in dataset order, loaded in the main process
loader_params = dict(batch_size=1, shuffle=False, num_workers=0)

# Create test DataLoader
test_loader = DataLoader(test_dataset, **loader_params)

def test():
    """Score the module-level ``model`` on ``test_loader`` and return the Pearson r.

    Every test signal is run through the model on the module-level ``device``.
    Predictions are then grouped by their label VALUE (signals that share a label
    value are pooled), averaged per group, and correlated with those label values.
    The raw per-signal predictions and labels are printed.

    Returns:
        The Pearson correlation coefficient between the per-label mean predictions
        and the labels.
    """
    model.eval()

    # Plain lists: appending is O(1), whereas np.append reallocates the whole array each call.
    predictions = []
    labels = []

    with torch.no_grad():
        for signal, label in tqdm(test_loader):
            # Send input to device and add a leading axis in front of the loader's batch axis
            signal = torch.unsqueeze(signal.to(device), 0)

            # Get output of net, append to lists
            output = model(signal).cpu()
            predictions.extend(output[0][0].reshape(-1).tolist())
            labels.extend(label.reshape(-1).tolist())

    predictions = np.array(predictions, dtype=np.float64)
    labels = np.array(labels, dtype=np.float64)

    # Group predictions by label value; dict order follows first occurrence.
    grouped = {}
    for label_value, prediction in zip(labels, predictions):
        grouped.setdefault(label_value, []).append(prediction)

    ordered_labels = list(grouped)
    averaged_predictions = [np.mean(grouped[label_value]) for label_value in ordered_labels]

    print(predictions)
    print(labels)

    r = stats.pearsonr(averaged_predictions, ordered_labels)

    print('Pearson r of the model is %.2f' % r[0])
    return r[0]
# Evaluate the current module-level `model` on the test set; prints and returns the Pearson r.
test()

In [None]:
ntokens = 1024  # For ff estimation
dropout = 0.2  # dropout probability
epochs  = 60
emsize = 1024

data_arr = []
models_tested = 0

# Every (d_hid, nlayers, nhead) combination, with d_hid varying slowest
search_grid = [
    (hid, layers, heads)
    for hid in range(50, 500, 150)
    for layers in range(1, 7, 3)
    for heads in range(1, 7, 3)
]

# model / criterion / optimizer / scheduler / epoch are rebound at module level
# because train() and test() read them as globals.
for d_hid, nlayers, nhead in search_grid:
    r = 0.0
    try:
        model = TransformerModel(ntokens, 1, emsize, nhead, d_hid, nlayers, dropout).to(device)

        criterion = nn.MSELoss()
        lr = 2.  # learning rate
        optimizer = torch.optim.SGD(model.parameters(), lr=lr)
        scheduler = torch.optim.lr_scheduler.StepLR(optimizer, 1.0, gamma=0.98)

        for epoch in range(1, epochs + 1):
            train(model)
            scheduler.step()

        r = test()
    except Exception as e:
        # Best effort: report the failing configuration and carry on with the next one.
        print(e)
        print("Model %d:   emsize: %d   d_hid: %d   nlayers: %d   nhead: %d   FAILED" % (models_tested, emsize, d_hid, nlayers, nhead))
    else:
        data_arr.append((emsize, d_hid, nlayers, nhead, r))
        print("Model %d:   emsize: %d   d_hid: %d   nlayers: %d   nhead: %d   r:%.3f" % (models_tested, emsize, d_hid, nlayers, nhead, r))

    models_tested += 1

# One row per successful configuration: (emsize, d_hid, nlayers, nhead, r)
data_save = np.array(data_arr)
np.save("results.npy", data_save)