# Train model with noisy envelope - filter version

Starting from `RNN-Morse-envelope` we now focus on denoising the envelope. The model is now a filter with a one-dimensional output instead of producing all the desired features. It is also built around the encoder-decoder concept. Not so surprisingly, the optimal number of hidden nodes is found to be 5, which corresponds to the number of essential Morse keying features: dits, dahs, silence between dits and dahs, silence between characters and silence between words.

We also train the model with much noisier signals than before and for more epochs. It seems that one should train the model at the SNR level of the signals that we want to clean. There is however a limit (-23 dB as calculated) below which the system will just learn how to self-oscillate.

In [None]:
!pip install sounddevice torchinfo

## Generate annotated raw signal

Generates the envelope after audio preprocessing. The resulting decimation factor is 128, thus we take 1 out of every 128 samples from the original signal modulated at 8 kHz sample rate. This uses a modified version of `encode_df` (`encode_df_decim`) of `MorseGen` so that the original ratio in samples per dit is respected. This effectively takes a floating point ratio (shown in the output below) for the samples per dit after decimation (about 5.77 for the nominal values of 8 kHz sampling rate and 13 WPM Morse code speed).

In [None]:
import MorseGen
import matplotlib.pyplot as plt 
import numpy as np

def get_new_data(SNR_dB=-23, nchars=132, nwords=27, phrase=None):
    """
    Build a decimated Morse envelope, a noisy version of it and the matching labels.

    Parameters
    ----------
    SNR_dB : float
        Signal to noise ratio in dB before the FFT factor (256) is applied.
    nchars, nwords : int
        Forwarded to ``MorseGen.get_morse_str`` when no phrase is supplied.
    phrase : str, optional
        Text to encode. A falsy value (``None`` or ``""``) makes the function
        ask ``MorseGen.get_morse_str`` for one.

    Returns
    -------
    envelope : numpy.ndarray
        The ``env`` column of the label frame.
    signal : numpy.ndarray
        ``(envelope + noise)**2`` with every value capped at 1.0.
    label_df
        Label frame returned by ``encode_df_decim`` without the ``ele``,
        ``dit`` and ``dah`` columns (presumably a pandas DataFrame - confirm
        against ``MorseGen``).
    n_prev : int
        Look-back length expressed in decimated samples.
    """
    if not phrase:
        phrase = MorseGen.get_morse_str(nchars=nchars, nwords=nwords)
    print(len(phrase), phrase)
    Fs = 8000
    decim = 128  # decimation factor applied to the 8 kHz signal
    morse_gen = MorseGen.Morse()
    samples_per_dit = morse_gen.nb_samples_per_dit(Fs, 13)
    # number of samples to look back is slightly more than a dit-dah and a word space (2+3+7=12)
    n_prev = int((samples_per_dit/decim)*12) + 1
    print(f'Samples per dit at {Fs} Hz is {samples_per_dit}. Decimation is {samples_per_dit/128:.2f}. Look back is {n_prev}.')
    label_df = morse_gen.encode_df_decim(phrase, samples_per_dit, decim)
    # extract the envelope
    envelope = label_df['env'].to_numpy()
    # remove undesired labels
    label_df.drop(columns=['ele', 'dit','dah'], inplace=True)
    SNR_linear = 10.0**(SNR_dB/10.0)
    SNR_linear *= 256 # Apply original FFT
    print(f'Resulting SNR for original {SNR_dB} dB is {(10.0 * np.log10(SNR_linear)):.2f} dB')
    # Scale white Gaussian noise so that mean signal power / noise power equals SNR_linear
    power = np.sum(envelope**2)/len(envelope)
    noise_power = power/SNR_linear
    noise = np.sqrt(noise_power)*np.random.normal(0, 1, len(envelope))
    # NOTE: in the original setup from audio the noise is also low-pass filtered;
    # butter_lowpass_filter(raw_noise, 0.9, 3) would simulate this empirically. Not applied here.
    signal = (envelope + noise)**2
    # Crude limiter: cap the squared signal at 1.0 (in place)
    np.minimum(signal, 1.0, out=signal)
    return envelope, signal, label_df, n_prev

Try it ...

In [None]:
# Generate one sample at -17 dB SNR and inspect it
envelope, signal, label_df, n_prev = get_new_data(-17)

# Show
print(n_prev)
for item in (signal, label_df):
    print(type(item), item.shape)

x0, x1 = 0, 1500
shown = label_df[x0:x1]

plt.figure(figsize=(50,3))
# Noisy signal and clean envelope at the bottom of the figure
plt.plot(signal[x0:x1]*0.5, label="sig")
plt.plot(envelope[x0:x1]*0.9, label='env')
# Label traces, shifted up by 2.0 to keep them apart from the signal
for col, colour in (("env", "orange"), ("chr", "green"), ("wrd", "red")):
    plt.plot(shown[col]*0.9 + 2.0, label=col, color=colour)
plt.title("signal and labels")
plt.legend()
plt.grid()

## Create data loader

### Define dataset

In [None]:
import torch

class MorsekeyingDataset(torch.utils.data.Dataset):
    """
    Sliding-window dataset over a noisy Morse envelope.

    Item ``i`` is the pair ``(X[i:i+seq_len], y[i+seq_len])``: a window of the
    noisy signal and the label row that immediately follows the window.
    """
    def __init__(self, device, SNR_dB=-23, nchars=132, nwords=27, phrase=None):
        """
        Generate the data with ``get_new_data`` and move the tensors to ``device``.

        The fourth value returned by ``get_new_data`` (look-back length) is
        used as the window length ``seq_len``.
        """
        self.envelope, self.signal, self.label_df, self.seq_len = get_new_data(SNR_dB, nchars, nwords, phrase)
        self.X = torch.FloatTensor(self.signal).to(device)
        self.y = torch.FloatTensor(self.label_df.values).to(device)

    def __len__(self):
        """Number of complete windows; never negative even for very short signals."""
        return max(0, len(self.X) - self.seq_len)

    def __getitem__(self, index):
        """Return the window starting at ``index`` and the label row right after it."""
        window_end = index + self.seq_len
        return (self.X[index:window_end], self.y[window_end])

    def get_envelope(self):
        """Return the clean envelope as produced by ``get_new_data``."""
        return self.envelope

    def get_signal(self):
        """Return the noisy signal as produced by ``get_new_data``."""
        return self.signal

    def get_labels(self):
        """Return the label frame as produced by ``get_new_data``."""
        return self.label_df

    def get_seq_len(self):
        """Return the window length (``seq_len`` is a plain value, not a callable)."""
        return self.seq_len

### Define data loader

The SNR must be calculated in the FFT bin bandwidth. In the original `RNN-Morse-pytorch` notebook the bandwidth is 4 kHz / 256 = 15.625 Hz and the SNR is 3 dB. Theoretically you would apply the FFT ratio to the original SNR but this does not work in practice. You have to take a much lower SNR to obtain a similar envelope.

In [None]:
# Run on the GPU when one is available, otherwise on the CPU
if torch.cuda.is_available():
    device = torch.device('cuda')
else:
    device = torch.device('cpu')
# Training set at -20 dB SNR, 8 times the default number of characters and words
train_dataset = MorsekeyingDataset(device, SNR_dB=-20, nchars=132*8, nwords=27*8)
train_loader = torch.utils.data.DataLoader(dataset=train_dataset, batch_size=1, shuffle=False) # Batch size must be 1

In [None]:
# Pull the raw arrays back out of the training dataset for display
signal = train_dataset.get_signal()
envelope = train_dataset.get_envelope()
label_df = train_dataset.get_labels()

for item in (signal, label_df):
    print(type(item), item.shape)

x0, x1 = 0, 1500
shown = label_df[x0:x1]

plt.figure(figsize=(50,6))
plt.plot(signal[x0:x1]*0.8, label="sig")
plt.plot(envelope[x0:x1]*0.9, label='env')
# Label traces, shifted up by 1.0 to keep them apart from the signal
for col, colour in (("env", "orange"), ("chr", "green"), ("wrd", "red")):
    plt.plot(shown[col]*0.9 + 1.0, label=col, color=colour)
plt.title("signal and labels")
plt.legend(loc=2)
plt.grid()

## Create model

Let's create the model now so we have an idea of its inputs and outputs

In [None]:
import torch
import torch.nn as nn

class MorseEnvLSTM(nn.Module):
    """
    Initial implementation: one LSTM layer followed by a linear read-out.

    The LSTM state is stored on the instance (``hidden_cell``) and is carried
    from one ``forward`` call to the next until ``zero_hidden_cell`` is called.
    """
    def __init__(self, device, input_size=1, hidden_layer_size=8, output_size=6):
        super().__init__()
        self.device = device # This is the only way to get things work properly with device
        self.hidden_layer_size = hidden_layer_size
        self.lstm = nn.LSTM(input_size=input_size, hidden_size=hidden_layer_size)
        self.linear = nn.Linear(hidden_layer_size, output_size)
        self.zero_hidden_cell()

    def forward(self, input_seq):
        """
        Run the sequence through the LSTM and return the read-out of the last step.

        ``input_seq`` is viewed as ``(len(input_seq), 1, -1)``, i.e. its first
        dimension is the time axis and the batch size is 1.
        """
        seq_len = len(input_seq)
        lstm_out, self.hidden_cell = self.lstm(input_seq.view(seq_len, 1, -1), self.hidden_cell)
        predictions = self.linear(lstm_out.view(seq_len, -1))
        return predictions[-1]

    def zero_hidden_cell(self):
        """Reset the (h, c) state to zeros on the device given at construction."""
        # Use the instance's device: a module-level ``device`` may be missing or different
        self.hidden_cell = (
            torch.zeros(1, 1, self.hidden_layer_size).to(self.device),
            torch.zeros(1, 1, self.hidden_layer_size).to(self.device)
        )
    
class MorseEnvBatchedLSTM(nn.Module):
    """
    Initial implementation - dataset compatible.

    One LSTM layer followed by a linear read-out. The LSTM state is stored on
    the instance (``hidden_cell``) and carried between ``forward`` calls until
    ``zero_hidden_cell`` is called.
    """
    def __init__(self, device, input_size=1, hidden_layer_size=8, output_size=6):
        super().__init__()
        self.device = device # This is the only way to get things work properly with device
        self.hidden_layer_size = hidden_layer_size
        self.output_size = output_size
        self.lstm = nn.LSTM(input_size=input_size, hidden_size=hidden_layer_size)
        self.linear = nn.Linear(hidden_layer_size, output_size)
        self.zero_hidden_cell()
        self.m = nn.Softmax(dim=-1)  # not used by forward() in this class

    def _minmax(self, x):
        """Rescale ``x`` in place: subtract the minimum, then divide by the maximum (along dim 0)."""
        x -= x.min(0)[0]
        x /= x.max(0)[0]

    def _hardmax(self, x):
        """Divide ``x`` in place by the sum of its elements."""
        x /= x.sum()

    def _sqmax(self, x):
        """
        Square-and-normalise helper.

        NOTE(review): ``x = x**2`` rebinds the local name to a new tensor, so the
        in-place division below never reaches the caller's tensor and nothing is
        returned: for the caller this is a no-op. Left unchanged on purpose since
        making it effective would change the model outputs - confirm the intent.
        """
        x = x**2
        x /= x.sum()

    def forward(self, input_seq):
        """
        Run the sequence through the LSTM and return the read-out of the last step.

        ``input_seq`` is viewed as ``(-1, 1, 1)``: every element is one time step
        with batch size 1 and a single input feature.
        """
        lstm_out, self.hidden_cell = self.lstm(input_seq.view(-1, 1, 1), self.hidden_cell)
        predictions = self.linear(lstm_out.view(len(input_seq), -1))
        if self.output_size > 1:
            self._sqmax(predictions[-1])
        return predictions[-1]

    def zero_hidden_cell(self):
        """Reset the (h, c) state to zeros on the device given at construction."""
        # Use the instance's device: a module-level ``device`` may be missing or different
        self.hidden_cell = (
            torch.zeros(1, 1, self.hidden_layer_size).to(self.device),
            torch.zeros(1, 1, self.hidden_layer_size).to(self.device)
        )
    
class MorseEnvBatchedLSTML2(nn.Module):
    """
    Two layers - dataset compatible.

    One LSTM layer followed by two stacked linear layers. The LSTM state is
    stored on the instance (``hidden_cell``) and carried between ``forward``
    calls until ``zero_hidden_cell`` is called.
    """
    def __init__(self, device, input_size=1, hidden_layer_size=12, output_size_l1=8, output_size=6):
        super().__init__()
        self.device = device # This is the only way to get things work properly with device
        self.hidden_layer_size = hidden_layer_size
        self.output_size = output_size
        self.lstm = nn.LSTM(input_size=input_size, hidden_size=hidden_layer_size)
        self.linear1 = nn.Linear(hidden_layer_size, output_size_l1)
        self.linear2 = nn.Linear(output_size_l1, output_size)
        self.zero_hidden_cell()
        self.m = nn.Softmax(dim=-1)  # not used by forward() in this class

    def _minmax(self, x):
        """Rescale ``x`` in place: subtract the minimum, then divide by the maximum (along dim 0)."""
        x -= x.min(0)[0]
        x /= x.max(0)[0]

    def _hardmax(self, x):
        """Divide ``x`` in place by the sum of its elements."""
        x /= x.sum()

    def _sqmax(self, x):
        """
        Square-and-normalise helper.

        NOTE(review): ``x = x**2`` rebinds the local name to a new tensor, so the
        in-place division below never reaches the caller's tensor and nothing is
        returned: for the caller this is a no-op. Left unchanged on purpose since
        making it effective would change the model outputs - confirm the intent.
        """
        x = x**2
        x /= x.sum()

    def forward(self, input_seq):
        """
        Run the sequence through the LSTM and both linear layers; return the last step.

        ``input_seq`` is viewed as ``(-1, 1, 1)``: every element is one time step
        with batch size 1 and a single input feature.
        """
        lstm_out, self.hidden_cell = self.lstm(input_seq.view(-1, 1, 1), self.hidden_cell)
        out1 = self.linear1(lstm_out.view(len(input_seq), -1))
        predictions = self.linear2(out1)
        if self.output_size > 1:
            self._sqmax(predictions[-1])
        return predictions[-1]

    def zero_hidden_cell(self):
        """Reset the (h, c) state to zeros on the device given at construction."""
        # Use the instance's device: a module-level ``device`` may be missing or different
        self.hidden_cell = (
            torch.zeros(1, 1, self.hidden_layer_size).to(self.device),
            torch.zeros(1, 1, self.hidden_layer_size).to(self.device)
        )
    
class MorseEnvLSTM2(nn.Module):
    """
    LSTM stack: two LSTM layers (with dropout between them) and a linear read-out.

    The LSTM state is stored on the instance (``hidden_cell``) and carried
    between ``forward`` calls until ``zero_hidden_cell`` is called.
    """
    def __init__(self, device, input_size=1, hidden_layer_size=8, output_size=6, dropout=0.2):
        super().__init__()
        self.device = device # This is the only way to get things work properly with device
        self.hidden_layer_size = hidden_layer_size
        self.lstm = nn.LSTM(input_size, hidden_layer_size, num_layers=2, dropout=dropout)
        self.linear = nn.Linear(hidden_layer_size, output_size)
        self.zero_hidden_cell()

    def forward(self, input_seq):
        """
        Run the sequence through the LSTM stack and return the read-out of the last step.

        ``input_seq`` is viewed as ``(len(input_seq), 1, -1)``, i.e. its first
        dimension is the time axis and the batch size is 1.
        """
        seq_len = len(input_seq)
        lstm_out, self.hidden_cell = self.lstm(input_seq.view(seq_len, 1, -1), self.hidden_cell)
        predictions = self.linear(lstm_out.view(seq_len, -1))
        return predictions[-1]

    def zero_hidden_cell(self):
        """Reset the (h, c) state (one slice per LSTM layer) to zeros on the instance's device."""
        # Use the instance's device: a module-level ``device`` may be missing or different
        self.hidden_cell = (
            torch.zeros(2, 1, self.hidden_layer_size).to(self.device),
            torch.zeros(2, 1, self.hidden_layer_size).to(self.device)
        )
        
class MorseEnvBatchedLSTM2(nn.Module):
    """
    LSTM stack - dataset compatible.

    Two LSTM layers (with dropout between them) and a linear read-out; a
    softmax is applied to the output when there is more than one output.
    The LSTM state is stored on the instance (``hidden_cell``) and carried
    between ``forward`` calls until ``zero_hidden_cell`` is called.
    """
    def __init__(self, device, input_size=1, hidden_layer_size=8, output_size=6, dropout=0.2):
        super().__init__()
        self.device = device # This is the only way to get things work properly with device
        self.hidden_layer_size = hidden_layer_size
        self.output_size = output_size
        self.lstm = nn.LSTM(input_size, hidden_layer_size, num_layers=2, dropout=dropout)
        self.linear = nn.Linear(hidden_layer_size, output_size)
        self.zero_hidden_cell()
        self.m = nn.Softmax(dim=-1)

    def forward(self, input_seq):
        """
        Run the sequence through the LSTM stack and return the read-out of the last step.

        ``input_seq`` is viewed as ``(-1, 1, 1)``: every element is one time step
        with batch size 1 and a single input feature. With a single output the
        raw linear value is returned, otherwise its softmax.
        """
        lstm_out, self.hidden_cell = self.lstm(input_seq.view(-1, 1, 1), self.hidden_cell)
        predictions = self.linear(lstm_out.view(len(input_seq), -1))
        return predictions[-1] if self.output_size == 1 else self.m(predictions[-1])

    def zero_hidden_cell(self):
        """Reset the (h, c) state (one slice per LSTM layer) to zeros on the instance's device."""
        # Use the instance's device: a module-level ``device`` may be missing or different
        self.hidden_cell = (
            torch.zeros(2, 1, self.hidden_layer_size).to(self.device),
            torch.zeros(2, 1, self.hidden_layer_size).to(self.device)
        )
        
class MorseEnvNoHLSTM(nn.Module):
    """
    Stateless variant: the hidden cell is not kept, every forward pass
    starts from an all-zero LSTM state.
    """
    def __init__(self, device, input_size=1, hidden_layer_size=8, output_size=6):
        super().__init__()
        self.device = device # This is the only way to get things work properly with device
        self.hidden_layer_size = hidden_layer_size
        self.lstm = nn.LSTM(input_size=input_size, hidden_size=hidden_layer_size)
        self.linear = nn.Linear(in_features=hidden_layer_size, out_features=output_size)

    def forward(self, input_seq):
        """Return the linear read-out of the last time step of ``input_seq``."""
        steps = len(input_seq)
        # Fresh zero (h, c) pair for this call only
        initial_state = tuple(
            torch.zeros(1, 1, self.hidden_layer_size).to(self.device) for _ in range(2)
        )
        lstm_out, _ = self.lstm(input_seq.view(steps, 1, -1), initial_state)
        return self.linear(lstm_out.view(steps, -1))[-1]
    
class MorseEnvBiLSTM(nn.Module):
    """
    Attempt Bidirectional LSTM: does not work

    NOTE(review): the LSTM is created with ``batch_first=True`` while ``forward``
    views the input as ``(len(x), 1, -1)``, so the LSTM receives ``len(x)``
    batch items of sequence length 1 rather than one sequence of ``len(x)``
    steps. This may be why the attempt does not work - layout left unchanged.
    """
    def __init__(self, device, input_size=1, hidden_size=12, num_layers=1, num_classes=6):
        super(MorseEnvBiLSTM, self).__init__()
        self.device = device # This is the only way to get things work properly with device
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.lstm = nn.LSTM(input_size, hidden_size, num_layers, batch_first=True, bidirectional=True)
        self.fc = nn.Linear(hidden_size*2, num_classes)  # 2 for bidirection

    def forward(self, x):
        """Return the read-out for the last row of ``x`` (tensor of ``num_classes`` values)."""
        # Set initial states on the instance's device (a module-level ``device``
        # may be missing or different); 2 for bidirection
        state_shape = (self.num_layers*2, x.size(0), self.hidden_size)
        h0 = torch.zeros(*state_shape).to(self.device)
        c0 = torch.zeros(*state_shape).to(self.device)

        # Forward propagate LSTM
        out, _ = self.lstm(x.view(len(x), 1, -1), (h0, c0))  # out: tensor of shape (batch_size, seq_length, hidden_size*2)
        # Decode the hidden state of the last time step
        out = self.fc(out[:, -1, :])
        return out[-1]

Create the model instance and print the details

In [None]:
# Observations for different numbers of hidden nodes:
# 4: good at reconstructing signal, some post-processing necessary for dit/dah, word silence is weak and undistinguishable from character silence
# 5: fairly good at reconstructing signal, all signals distinguishable with some post-processing for dit/dah
# 6: more contrast on all signals but a spike appears in the character space in predicted envelope
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')
# The device is handed to the constructor AND the module is moved to it:
# this is the only way to get things work properly with device
morse_env_model = MorseEnvBatchedLSTM(device, hidden_layer_size=14, output_size=3)
morse_env_model = morse_env_model.to(device)
morse_env_loss_function = nn.MSELoss()
morse_env_optimizer = torch.optim.Adam(params=morse_env_model.parameters(), lr=0.001)

print(morse_env_model, morse_env_model.device, sep="\n")

In [None]:
# Smoke test of the forward pass. It was written to chase the error
# "Input and hidden tensors are not at the same device, found input tensor at cuda:0 and hidden tensor at cpu"
for param in morse_env_model.parameters():
    print(param.shape, param.device)
# Alternative fixed input:
#X_t = torch.tensor([-0.9648, -0.9385, -0.8769, -0.8901, -0.9253, -0.8637, -0.8066, -0.8066, -0.8593, -0.9341, -1.0000, -0.9385])
# Move the input to the selected device instead of calling .cuda(), which fails on CPU-only hosts
X_t = torch.rand(n_prev).to(device)
print(X_t)
morse_env_model(X_t)

In [None]:
import torchinfo
# Last expression of the cell: the notebook displays the torchinfo summary of the model
torchinfo.summary(morse_env_model)

## Train model

In [None]:
# Peek at the first two samples delivered by the training loader
it = iter(train_loader)
X, y = next(it)
# reshape(-1, 1) instead of a hard-coded length so this keeps working when the look-back length changes
print(X.reshape(-1,1).shape, X[0].shape, y[0].shape)
print(X[0], y[0])
X, y = next(it)
print(X[0], y[0])

In [None]:
%%time
from tqdm.notebook import tqdm

epochs = 8
morse_env_model.train()
# Models that carry their LSTM state between calls expose zero_hidden_cell();
# look it up once instead of matching against a hard-coded list of class names
reset_hidden_cell = getattr(morse_env_model, "zero_hidden_cell", None)

for i in range(epochs):
    train_losses = []
    loop = tqdm(enumerate(train_loader), total=len(train_loader), leave=True)
    for j, train in loop:
        # Batch size is 1: take the single window and its label row out of the batch
        X_train = train[0][0]
        y_train = train[1][0]
        morse_env_optimizer.zero_grad()
        if reset_hidden_cell is not None:
            reset_hidden_cell() # this model needs to reset the hidden cell
        y_pred = morse_env_model(X_train)
        single_loss = morse_env_loss_function(y_pred, y_train)
        single_loss.backward()
        morse_env_optimizer.step()
        train_losses.append(single_loss.item())
        # update progress bar
        if j % 1000 == 0:
            loop.set_description(f"Epoch [{i+1}/{epochs}]")
            loop.set_postfix(loss=np.mean(train_losses))

# Mean loss over the last epoch
print(f'final: {i+1:3} epochs loss: {np.mean(train_losses):6.4f}')

In [None]:
# Persist the trained weights (state dict only)
model_weights = morse_env_model.state_dict()
torch.save(model_weights, 'models/morse_env_model')

### Predict (test)

In [None]:
# Fixed test phrase so that runs are comparable
new_phrase = "VVV DE F4EXB VVV DE F4EXB VVV DE F4EXB VVV DE F4EXB VVV DE F4EXB VVV DE F4EXB VVV DE F4EXB VVV DE F4EXB VVV DE F4EXB VVV DE F4EXB VVV DE F4EXB VVV DE F4EXB"
test_dataset = MorsekeyingDataset(device, SNR_dB=-20, nchars=132, nwords=27, phrase=new_phrase)
test_loader = torch.utils.data.DataLoader(dataset=test_dataset, batch_size=1, shuffle=False) # Batch size must be 1

In [None]:
# Pull the raw arrays back out of the test dataset for display
signal = test_dataset.get_signal()
label_df = test_dataset.get_labels()

for item in (signal, label_df):
    print(type(item), item.shape)

x0, x1 = 0, 3000
shown = label_df[x0:x1]

plt.figure(figsize=(50,3))
plt.plot(signal[x0:x1]*0.8, label="sig")
plt.plot(shown.env*0.9, label='env', color="orange")
# Character and word labels, shifted up by 1.0
for col, colour in (("chr", "green"), ("wrd", "red")):
    plt.plot(shown[col]*0.9 + 1.0, label=col, color=colour)
plt.title("signal and labels")
plt.legend(loc=2)
plt.grid()

In [None]:
%%time
# Collect one prediction per test window, then stack them into a single
# (n_windows, n_outputs) tensor. This avoids the uninitialised placeholder row,
# the repeated torch.cat copies and the hard-coded output width.
predictions = []
morse_env_model.eval()

loop = tqdm(enumerate(test_loader), total=len(test_loader))
with torch.no_grad():
    for j, test in loop:
        X_test = test[0]
        # Batch size is 1: feed the single window of the batch
        predictions.append(morse_env_model(X_test[0]))

p_test = torch.stack(predictions)
print(p_test.shape)

In [None]:
# One row per predicted output, moved to the CPU for plotting
p_test_t = p_test.transpose(0, 1).cpu()
p_test_t.shape

In [None]:
# Reference envelope aligned with the predictions: the first prediction is for
# the sample right after the first window. Use the test dataset's own window
# length rather than the n_prev global left over from an earlier cell.
y_env = test_dataset.get_envelope()[test_dataset.seq_len:]
plt.figure(figsize=(50,6))
plt.plot(p_test_t[0,x0:x1]*0.8, label="env")
plt.plot(y_env[x0:x1]*0.9, label='y', color="orange", alpha=0.7)
# Character and word outputs, shifted up by 1.0
plt.plot(p_test_t[1,x0:x1]*0.9 + 1.0, label="chr", color="green")
plt.plot(p_test_t[2,x0:x1]*0.9 + 1.0, label="wrd", color="red")
plt.title("Predictions")
plt.legend(loc=2)
plt.grid()
plt.savefig('img/pred.png')

In [None]:
import scipy as sp
import scipy.special
from scipy.io import wavfile

Fcode = 600
Fs = 8000
noverlap = 128
decim = 128
eenv = p_test_t[0].numpy()
echr = p_test_t[1].numpy()
ewrd = p_test_t[2].numpy()
# Gate the predicted envelope with the predicted character and word outputs
emod = eenv * (1.0 - echr - ewrd)
emod /= emod.max()
# Hold every value for noverlap samples (vectorised, same result as nested lists + flatten)
remod = np.repeat(emod, noverlap)
# Tone at Fcode Hz sampled at Fs Hz, modulated by the reconstructed envelope
wt = (Fcode / Fs)*2*np.pi
tone = np.sin(np.arange(len(remod))*wt)
wavfile.write('audio/re.wav', Fs, tone*remod)
# Reference envelope held for decim samples per value, for visual comparison
ref_mod = np.repeat(y_env, decim)
plt.figure(figsize=(100,5))
plt.plot((tone*remod)[:100000])
plt.plot(ref_mod[:100000]*1.2, label='mor')
plt.title("reconstructed signal")
plt.grid()

In [None]:
# Noisy test signal aligned with the predictions; use the test dataset's own
# window length rather than the n_prev global left over from an earlier cell
omod = signal[test_dataset.seq_len:]
print(emod.shape, omod.shape)
# Hold every value for decim samples (vectorised), then normalise to a peak of 1
orig_mod = np.repeat(omod, decim)
orig_mod /= orig_mod.max()
wavfile.write('audio/or.wav', Fs, tone*orig_mod)
plt.figure(figsize=(100,5))
plt.plot((tone*orig_mod)[:100000])
plt.plot(ref_mod[:100000]*1.2, label='mor')
plt.title("original filtered signal")
plt.grid()

In [None]:
import scipy as sp

# Sigmoid shaping curve sampled at 121 points on [0, 1]
sx = np.linspace(0, 1, num=121)
shaped_arg = 8*(1.1*sx-0.6)
sy = sp.special.expit(shaped_arg)
plt.plot(sx, sy)
plt.xlabel('x')
plt.title('expit(x)')
plt.grid()
plt.show()