In [None]:
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd
import torch
import torch.nn as nn
from torch.autograd import Variable
from sklearn.preprocessing import MinMaxScaler 

import torchaudio
import sys
sys.path.append("..")
from dataset_generation.randomDataset import RandomDataset
from torch.utils.data import DataLoader
from utils.codec import CodecTransform

In [None]:
def sliding_windows(data, x_len, y_len):
    """Cut a series into overlapping (input, target) window pairs.

    Window ``i`` pairs the ``x_len`` samples starting at index ``i`` with the
    ``y_len`` samples that immediately follow them.

    Args:
        data: Sequence of samples; anything ``np.asarray`` accepts. The
            notebook passes 1-D NumPy arrays.
        x_len: Number of samples in every input window.
        y_len: Number of samples in every target window.

    Returns:
        Tuple ``(x, y)`` of tensors. For 1-D ``data`` they are shaped
        ``(n_windows, x_len, 1)`` and ``(n_windows, y_len, 1)`` with
        ``n_windows = max(len(data) - x_len - y_len + 1, 0)``. The dtype
        follows ``np.asarray(data)``. The tensors own their memory; they are
        not views of ``data``.
    """
    series = np.asarray(data)
    # The last valid start index is len - x_len - y_len, hence the "+ 1".
    # (The previous bound of "- 1" silently dropped the final two windows.)
    # Clamping at 0 makes too-short input yield empty, correctly shaped tensors.
    n_windows = max(len(series) - x_len - y_len + 1, 0)
    starts = np.arange(n_windows)[:, None]

    # Broadcasting start offsets against in-window offsets builds every window
    # with one fancy-indexing call instead of a Python loop.
    x = torch.from_numpy(series[starts + np.arange(x_len)])
    y = torch.from_numpy(series[starts + x_len + np.arange(y_len)])
    return x.unsqueeze(2), y.unsqueeze(2)

# Smoke test: window an all-zero series of 1000 samples and show the shapes.
test = np.zeros(shape=1000)
x, y = sliding_windows(data=test, x_len=10, y_len=5)
print(x.shape, y.shape)

In [None]:
class LSTM(nn.Module):
    """LSTM regressor: sequence in, ``num_classes`` values out.

    The final hidden state of the top LSTM layer is passed through a single
    linear layer.
    """

    def __init__(self, num_classes, input_size, hidden_size, num_layers):
        """Build the recurrent stack and the output projection.

        Args:
            num_classes: Size of the output vector produced per sequence.
            input_size: Number of features per time step.
            hidden_size: Width of the LSTM hidden state.
            num_layers: Number of stacked LSTM layers.
        """
        super().__init__()
        self.num_classes = num_classes
        self.num_layers = num_layers
        self.input_size = input_size
        self.hidden_size = hidden_size

        self.lstm = nn.LSTM(input_size=input_size, hidden_size=hidden_size,
                            num_layers=num_layers, batch_first=True)

        self.fc = nn.Linear(hidden_size, num_classes)

    def forward(self, x):
        """Run the sequence through the LSTM and project the last hidden state.

        Args:
            x: Input of shape ``(batch, seq_len, input_size)``; the LSTM is
                built with ``batch_first=True``.

        Returns:
            Tensor of shape ``(batch, num_classes)``.
        """
        _, (h_out, _) = self.lstm(x)

        # h_out stacks one final state per layer: (num_layers, batch, hidden).
        # Only the top layer feeds the head. Flattening all layers with
        # view(-1, hidden) would fold the layer axis into the batch axis and
        # return num_layers * batch rows whenever num_layers > 1.
        top_hidden = h_out[-1].view(-1, self.hidden_size)

        return self.fc(top_hidden)

In [None]:
# Pick the compute device: GPU when CUDA is usable, CPU otherwise.
if torch.cuda.is_available():
    device = 'cuda'
else:
    device = 'cpu'
print(device)

# Folder with the source speech clips, relative to this notebook.
src = '../dataset_generation/speeches'
# Spectrogram settings; only the disabled MFCC transform below refers to them.
nfft = 512
nmels = 60
#mfcc = torchaudio.transforms.MFCC(sample_rate=16000, n_mfcc=15, melkwargs={'n_fft':nfft, 'n_mels':nmels})
codec = CodecTransform(sample_rate=16000, bandwidth=6.0)
# NOTE(review): the meaning of the positional arguments (16000, 1000, 50, 49)
# is defined by RandomDataset, which is not visible here - confirm.
dataSet = RandomDataset(src, 16000, 1000, codec, 50, 49)

# Fixed 800 / 200 train / validation split; random_split requires the sizes
# to add up to len(dataSet).
train_data, val_data = torch.utils.data.random_split(dataset=dataSet, lengths=(800, 200))
train_dl = DataLoader(dataset=train_data, batch_size=64)
val_dl = DataLoader(dataset=val_data, batch_size=64)

In [None]:
# Build the LSTM training set from one clip of the dataset.
input_waveform = dataSet[0][0]
# NOTE(review): assumes each dataset item is an (input, target) pair and that
# the input can be indexed as [0, 0, :] to get a 1-D series - confirm against
# RandomDataset.
first_row = input_waveform[0, 0, :].numpy()

window_len = 8  # samples fed to the LSTM per training window
horizon = 4     # samples the LSTM predicts per training window
trainX, trainY = sliding_windows(first_row, window_len, horizon)
# Explicit float32 cast. The deprecated Variable(torch.Tensor(...)) wrapper
# does not guarantee a dtype that matches the model's float32 weights.
trainX = trainX.float()  # (N, window_len, 1)
trainY = trainY.float()  # (N, horizon, 1)

print(trainX.shape)
print(trainY.shape)

num_epochs = 10000
learning_rate = 0.01

input_size = 1
hidden_size = 20
num_layers = 1

# One output per predicted sample, so this must equal the target window length.
num_classes = horizon

lstm = LSTM(num_classes, input_size, hidden_size, num_layers)

criterion = torch.nn.MSELoss()    # mean-squared error for regression
optimizer = torch.optim.Adam(lstm.parameters(), lr=learning_rate)
#optimizer = torch.optim.SGD(lstm.parameters(), lr=learning_rate)


In [None]:
from torch.utils.data import Dataset
class EncodecDataset(Dataset):
    """Map-style dataset pairing each input window with its target.

    Args:
        data_X: Indexable collection of inputs.
        data_Y: Indexable collection of targets, one per entry of ``data_X``.

    Raises:
        ValueError: If ``data_X`` and ``data_Y`` differ in length.
    """

    def __init__(self, data_X, data_Y):
        super().__init__()
        if len(data_X) != len(data_Y):
            raise ValueError(
                f"data_X and data_Y must have the same length, "
                f"got {len(data_X)} and {len(data_Y)}"
            )
        self.data_X = data_X
        self.data_Y = data_Y
        self.dataset_size = len(data_X)

    def __getitem__(self, index):
        """Return the ``(input, target)`` pair stored at ``index``."""
        # Samplers may hand over tensor indices; turn them into plain ints/lists.
        if torch.is_tensor(index):
            index = index.tolist()

        # Read from the instance. The previous version indexed the notebook
        # globals data_X / data_Y, so it only worked while same-named globals
        # happened to exist and ignored the data passed to the constructor.
        return self.data_X[index], self.data_Y[index]

    def __len__(self):
        """Number of (input, target) pairs."""
        return self.dataset_size
    

In [None]:
# Load the stored Encodec data and cut it into fixed-length training windows.

device = 'cuda' if torch.cuda.is_available() else 'cpu'
print(device)

x_datapath = "../dataset_generation/encodec_data/trainX.npy"
y_datapath = "../dataset_generation/encodec_data/trainY.npy"  # not loaded in this cell
# The array gets its own name so it does not overwrite the `trainX` tensor
# that the LSTM cells train on.
# NOTE(security): allow_pickle=True can execute code stored in the file; only
# load files you trust.
encodec_X = np.load(x_datapath, allow_pickle = True)[:, 0, :]
input_dim = 10
hidden_dim = 5
output_dim = 1
# Only the array loaded here is reported; the earlier print also showed
# `trainY`, which is unrelated to this data.
print(np.expand_dims(encodec_X, axis = 1).shape)

# Draw 1000 rows at random. np.random.choice samples with replacement by
# default, so a row can appear more than once.
row_ids = np.random.choice(np.arange(encodec_X.shape[0]), 1000)
random_stuffs = encodec_X[row_ids]

# Collect the per-row windows and concatenate once; calling torch.cat inside
# the loop re-copies everything gathered so far on each iteration.
window_inputs = []
window_targets = []
for now in random_stuffs:
    # Run sliding window
    x, y = sliding_windows(now, input_dim, output_dim)
    window_inputs.append(x)
    window_targets.append(y)

# Drop the trailing singleton axis added by sliding_windows (and, for the
# targets, the length-1 window axis) without touching the batch axis:
# (N, input_dim, 1) -> (N, input_dim); (N, 1, 1) -> (N,) when output_dim == 1.
data_X = torch.cat(window_inputs).squeeze(2)
data_Y = torch.cat(window_targets).squeeze(2).squeeze(1)

# Try dataloader
data_actual = EncodecDataset(data_X, data_Y)
data_generator = DataLoader(data_actual, batch_size = 363, shuffle = True)




In [None]:
from utils.gpt4_architectures import AutoregressiveTransformer
transformer = AutoregressiveTransformer(input_dim, hidden_dim, output_dim)
transformer.to(device)

# NOTE: num_epochs, criterion, learning_rate and optimizer rebind names that
# the LSTM setup cell also defines.
num_epochs = 100
criterion = torch.nn.MSELoss()    # mean-squared error for regression
learning_rate = 0.001
optimizer = torch.optim.Adam(transformer.parameters(), lr=learning_rate)
n_batches = int(len(data_X)/363)  # not used by the loop below
i = 0  # running batch counter across all epochs

# The rollout cell switches the model to eval(); make sure a re-run of this
# cell trains in training mode again.
transformer.train()
for epoch in range(num_epochs):
    for local_batch, local_labels in data_generator:

        # Add a singleton axis after the batch axis, cast to float32 to match
        # the model weights, and transfer to the training device.
        # NOTE(review): assumes the model takes (batch, 1, input_dim) - confirm
        # against AutoregressiveTransformer.
        local_batch = local_batch.unsqueeze(dim=1).float().to(device)
        local_labels = local_labels.float().to(device)

        optimizer.zero_grad()
        outputs = transformer(local_batch)

        loss = criterion(outputs, local_labels.unsqueeze(dim=1))
        loss.backward()
        optimizer.step()
        if i % 100 == 0:
            print(f"batch {i}: {loss.item()}")
        i += 1

    # Report every 10th epoch plus the last one. The previous `% 100` check
    # could only fire for epoch 0 with 100 epochs.
    if epoch % 10 == 0 or epoch == num_epochs - 1:
        print(f"EPOCH {epoch}: {loss.item()}")


In [None]:

# Train the LSTM with full-batch gradient descent on the sliding-window data.
#
# This cell builds its own loss and optimizer. The shared names `criterion`,
# `optimizer`, `num_epochs` and `learning_rate` are rebound by the transformer
# training cell, so using them here would step the transformer's parameters
# and leave the LSTM untouched. The values mirror the LSTM setup cell.
lstm_num_epochs = 10000
lstm_learning_rate = 0.01
lstm_criterion = torch.nn.MSELoss()
lstm_optimizer = torch.optim.Adam(lstm.parameters(), lr=lstm_learning_rate)

# Flatten the targets once to (N, horizon) so they line up with the model
# output. Unlike a bare squeeze(), this keeps the batch axis and does not
# rebind the global trainY on every iteration.
lstm_targets = trainY.reshape(trainY.shape[0], -1)

lstm.train()
for epoch in range(lstm_num_epochs):
    lstm_optimizer.zero_grad()
    outputs = lstm(trainX)

    # obtain the loss function
    loss = lstm_criterion(outputs, lstm_targets)

    loss.backward()

    lstm_optimizer.step()
    if epoch % 100 == 0:
        print("Epoch: %d, loss: %1.5f" % (epoch, loss.item()))

In [None]:
# Autoregressive rollout: seed the transformer with one training window and
# keep feeding it its own most recent predictions.
print(data_X[1].shape)
startingInput = data_X[300]
total_pred_points = 1500
current = 0

transformer.eval()

# (window,) -> (1, 1, window), as float32 on the model's device.
currentInput = startingInput.float().to(device).unsqueeze(dim=0).unsqueeze(dim=1)
window = currentInput.shape[-1]
print(currentInput.shape)
predictedWaveform = currentInput

# no_grad: inference only. Without it every step would extend the autograd
# graph and memory use would grow with the length of the rollout.
with torch.no_grad():
    while current < total_pred_points:
        nextSamples = transformer(currentInput)
        # NOTE(review): assumes the model returns (1, k), so that adding a
        # leading axis gives (1, 1, k) - confirm against the architecture.
        new_block = nextSamples.unsqueeze(dim=0)
        predictedWaveform = torch.cat((predictedWaveform, new_block), dim=2)
        current += new_block.shape[2]
        # Slide the context to the LAST `window` samples. Slicing [:window]
        # would feed the unchanged seed forever and repeat one prediction.
        currentInput = predictedWaveform[:, :, -window:]

print("done")
print(predictedWaveform.shape)

In [None]:
# PREDICTION STUFF
# Seed the LSTM with a short clip and let it extend the waveform by feeding it
# its own most recent predictions.

# Seed: 8 consecutive samples (the LSTM's training window length) of the
# waveform it was trained on.
# NOTE(review): the previous seed was data_X[30:38], a 2-D batch of
# transformer windows that becomes a 4-D tensor after the two unsqueeze calls
# and cannot be fed to the LSTM. Confirm that first_row is the intended source.
startingInput = torch.as_tensor(first_row[30:38], dtype=torch.float32)
totalPredictionPoints = 2000
current = 0
window_size = startingInput.shape[0]
currentInput = startingInput.unsqueeze(0).unsqueeze(2)  # (1, window_size, 1)
predictedWaveform = startingInput.clone()
print()
lstm.eval()
# no_grad: inference only; avoids building an autograd graph across the rollout.
with torch.no_grad():
    while current < totalPredictionPoints:
        # Flatten the (1, num_classes) output to 1-D so it can be appended.
        nextSamples = lstm(currentInput).reshape(-1)
        predictedWaveform = torch.cat((predictedWaveform, nextSamples), dim=0)
        current += nextSamples.shape[0]  # total length of predicted waveform
        # Keep the context at the training window length (it was 10 before,
        # which did not match the 8-sample windows used for training).
        currentInput = predictedWaveform[-window_size:].unsqueeze(0).unsqueeze(2)

In [None]:
print(predictedWaveform.shape)
from encodec import EncodecModel
model = EncodecModel.encodec_model_24khz()
model.set_target_bandwidth(6.0)
# Preview the predictions as integer codebook indices, one per row: (T, 1).
# reshape(-1, 1) gives that layout for both the 1-D and the (1, 1, T) form of
# predictedWaveform. A 1024-entry codebook has valid indices 0..1023, so the
# upper clamp is 1023; the previous bound of 1024 let an invalid index through.
test = predictedWaveform.detach().cpu().long().reshape(-1, 1).clip(min = 0, max = 1023)
print(test)


In [None]:
# Turn the predicted values into EnCodec codes and decode them to audio.
# The decoder takes integer codes shaped (batch, n_codebooks, frames);
# reshape(1, 1, -1) produces a single-codebook (1, 1, T) tensor from either
# the 1-D or the (1, 1, T) form of predictedWaveform. Indices are clamped to
# the valid codebook range 0..1023.
p = predictedWaveform.detach().cpu().reshape(1, 1, -1).long().clip(min=0, max=1023)
print(p.shape)
with torch.no_grad():
    # Decode the clamped codes. The previous call decoded the raw, unclamped
    # waveform and never used `p`.
    reconstruction = model.decode([(p, None)])[0]

import IPython
IPython.display.Audio(reconstruction[0], rate = model.sample_rate)


In [None]:
# Show samples 300..699 of the rollout. reshape(1, 1, -1) leaves a (1, 1, T)
# waveform unchanged and also accepts the 1-D waveform from the LSTM rollout,
# which cannot be indexed with three axes directly.
print(predictedWaveform.reshape(1, 1, -1)[:, :, 300:700])