In [None]:
!pip install torch
!pip install numpy
!pip install matplotlib

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
import matplotlib.pyplot as plt
from collections import deque
from sklearn.model_selection import train_test_split

# Mount Google Drive at /content/drive; the dataset and model paths used in the
# following cells all live under /content/drive/MyDrive.
# NOTE(review): `google.colab` looks Colab-specific — confirm before running elsewhere.
from google.colab import drive
drive.mount('/content/drive')

In [None]:
# Locations of the dataset tensors and of the saved model weights on the mounted Drive
mat_path = '/content/drive/MyDrive/data_detect/mat.pt'
labels_path = '/content/drive/MyDrive/data_detect/labels.pt'
model_path = '/content/drive/MyDrive/data_detect/trained_model.pth'

# Load the .pt files onto the CPU regardless of the device they were saved from,
# so the notebook also works on a CPU-only runtime. The train/test splits are
# moved to the compute device after they have been created.
mat_tensor = torch.load(mat_path, map_location='cpu')
labels_tensor = torch.load(labels_path, map_location='cpu')

# Verify the loaded tensors
print("Shape of mat_tensor:", mat_tensor.shape)
print("Shape of labels_tensor:", labels_tensor.shape)

In [None]:
# Network dimensions: per-time-bin feature vector (number of channels),
# LSTM hidden-state width, and a single binary output (speech detected or not).
input_size, hidden_size, output_size = 12, 50, 1

# Number of stacked LSTM layers.
num_layers = 1

# Data windowing: time bins remembered per sequence, and sequences per batch.
seq_length, batch_size = 300, 32

In [None]:
# Define the LSTM model
class SpeechLSTM(nn.Module):
    """LSTM followed by a linear read-out applied at every time step.

    The network returns raw logits (no sigmoid), which is what
    `nn.BCEWithLogitsLoss` expects during training.

    Args:
        input_size: Number of features in each input time step.
        hidden_size: Number of features in the LSTM hidden state.
        output_size: Number of logits produced per time step.
        num_layers: Number of stacked LSTM layers.
    """

    def __init__(self, input_size, hidden_size, output_size, num_layers=1):
        super().__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        # batch_first=True -> inputs/outputs are (batch, seq, feature)
        self.lstm = nn.LSTM(input_size, hidden_size, num_layers, batch_first=True)
        self.fc = nn.Linear(hidden_size, output_size)

    def forward(self, x, h0=None, c0=None):
        """Run the LSTM over `x` and project every time step to logits.

        Args:
            x: Input of shape (batch, seq, input_size).
            h0: Optional initial hidden state, (num_layers, batch, hidden_size).
            c0: Optional initial cell state, same shape as `h0`.
                When both states are omitted the LSTM starts from zeros.

        Returns:
            Tuple `(out, hn, cn)`: per-step logits of shape
            (batch, seq, output_size) and the final hidden and cell states.

        Raises:
            ValueError: If only one of `h0` / `c0` is supplied.
        """
        if (h0 is None) != (c0 is None):
            raise ValueError("h0 and c0 must be provided together or both omitted")

        if h0 is None:
            # nn.LSTM defaults the initial state to zeros when none is given.
            out, (hn, cn) = self.lstm(x)
        else:
            out, (hn, cn) = self.lstm(x, (h0, c0))

        out = self.fc(out)  # Linear layer acts on the last dim, i.e. on every time step
        return out, hn, cn

In [None]:
# Prepare the data to include sequences of `seq_length`
def create_sequences(data, labels, seq_length):
    """Cut `data` and `labels` into consecutive, non-overlapping windows.

    Windows start at 0, seq_length, 2*seq_length, ...; trailing samples that
    do not fill a complete window are dropped.

    Args:
        data: Tensor indexed by time along its first dimension.
        labels: Tensor indexed by time along its first dimension, aligned
            with `data`.
        seq_length: Number of time steps per window (positive integer).

    Returns:
        Tuple `(sequences, seq_labels)` of stacked windows, with shapes
        (num_windows, seq_length, *data.shape[1:]) and
        (num_windows, seq_length, *labels.shape[1:]).

    Raises:
        ValueError: If `seq_length` is not positive, if `data` is too short
            to form a single window, or if `labels` does not cover every
            window taken from `data`.
    """
    if seq_length <= 0:
        raise ValueError(f"seq_length must be positive, got {seq_length}")

    num_windows = len(data) // seq_length
    if num_windows == 0:
        raise ValueError(
            f"need at least {seq_length} samples to build one sequence, got {len(data)}"
        )
    if len(labels) < num_windows * seq_length:
        raise ValueError(
            f"labels has {len(labels)} entries but {num_windows * seq_length} are required"
        )

    starts = range(0, num_windows * seq_length, seq_length)
    sequences = torch.stack([data[s:s + seq_length] for s in starts])
    seq_labels = torch.stack([labels[s:s + seq_length] for s in starts])
    return sequences, seq_labels

In [None]:



# Seed the RNGs so that weight initialisation and the train/test split are
# repeatable across "Restart & Run All".
SEED = 42
torch.manual_seed(SEED)

# Create the model
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = SpeechLSTM(input_size, hidden_size, output_size, num_layers).to(device)

# Assuming mat_tensor and labels_tensor are already defined and preprocessed
# Create sequences
X_seq, y_seq = create_sequences(mat_tensor, labels_tensor, seq_length)

# Split into training and test sets (whole sequences are shuffled, not single time bins)
X_train_seq, X_test_seq, y_train_seq, y_test_seq = train_test_split(
    X_seq, y_seq, test_size=0.2, random_state=SEED
)
X_train_seq = X_train_seq.to(device)
X_test_seq = X_test_seq.to(device)
# BCEWithLogitsLoss needs floating-point targets; the cast is a no-op when the
# labels are already float32.
y_train_seq = y_train_seq.to(device=device, dtype=torch.float32)
y_test_seq = y_test_seq.to(device=device, dtype=torch.float32)

# Define loss function and optimizer
criterion = nn.BCEWithLogitsLoss()  # the model outputs raw logits
optimizer = optim.Adam(model.parameters(), lr=0.003)

# Training-loop configuration and per-epoch history
num_epochs = 60
train_losses = []
train_accuracies = []

eval_losses = []
eval_accuracies = []

# Best test accuracy seen so far; used to decide when to checkpoint
best_accuracy = 0.0

# Train for `num_epochs`, evaluating on the held-out sequences after every epoch.
for epoch in range(num_epochs):
    # ---- Training pass ----
    model.train()
    epoch_loss = 0.0
    correct = 0
    total = 0
    num_batches = 0

    for b in range(0, len(X_train_seq), batch_size):
        batch_X = X_train_seq[b:b + batch_size]
        batch_y = y_train_seq[b:b + batch_size]

        # Every window starts from a zero state. Size it from the actual batch
        # so the final, possibly smaller, batch works too.
        h0 = torch.zeros(num_layers, batch_X.size(0), hidden_size, device=device)
        c0 = torch.zeros(num_layers, batch_X.size(0), hidden_size, device=device)

        optimizer.zero_grad()

        # Run the whole window through the LSTM in one call and back-propagate
        # once. This replaces a per-time-step loop that called
        # backward(retain_graph=True) at every step and therefore re-traversed
        # the growing graph each time. The loss is the mean over
        # (batch, time), i.e. the same value the per-step average reported;
        # gradients are scaled by 1/seq_length relative to summing per-step
        # losses.
        outputs, _, _ = model(batch_X, h0, c0)
        outputs = outputs.squeeze(-1)  # (batch, seq, 1) -> (batch, seq)
        loss = criterion(outputs, batch_y)
        loss.backward()
        optimizer.step()

        epoch_loss += loss.item()
        num_batches += 1

        # Calculate accuracy over every time bin of the batch
        predicted = (torch.sigmoid(outputs) > 0.5).float()
        correct += (predicted == batch_y).sum().item()
        total += batch_y.numel()

    # Average over the number of batches actually processed (the last batch
    # may be partial, so len / batch_size would be fractional).
    epoch_loss /= max(num_batches, 1)
    train_losses.append(epoch_loss)
    accuracy = correct / max(total, 1)
    train_accuracies.append(accuracy)
    print(f'Epoch [{epoch+1}/{num_epochs}], Average Loss: {epoch_loss:.4f}, Average Accuracy: {accuracy:.4f}')

    # ---- Evaluate on test set (all test sequences as one batch) ----
    model.eval()
    with torch.no_grad():
        h0 = torch.zeros(num_layers, X_test_seq.size(0), hidden_size, device=device)
        c0 = torch.zeros(num_layers, X_test_seq.size(0), hidden_size, device=device)
        outputs, _, _ = model(X_test_seq, h0, c0)
        outputs = outputs.squeeze(-1)
        test_loss = criterion(outputs, y_test_seq).item()
        predicted = (torch.sigmoid(outputs) > 0.5).float()
        test_correct = (predicted == y_test_seq).sum().item()
        test_total = y_test_seq.numel()

    test_accuracy = test_correct / max(test_total, 1)
    eval_losses.append(test_loss)
    eval_accuracies.append(test_accuracy)
    print(f'Epoch [{epoch+1}/{num_epochs}], Test Accuracy: {test_accuracy:.4f}')

    # Save the model if the test accuracy is the best so far
    if test_accuracy > best_accuracy:
        best_accuracy = test_accuracy
        torch.save(model.state_dict(), 'best_speech_lstm_model.pth')

    print('Epoch [{}/{}], Loss: {:.4f}, Accuracy: {:.4f}'.format(epoch+1, num_epochs, epoch_loss, accuracy))

In [None]:
# Plot the training loss next to the train/test accuracy curves
fig, (ax_loss, ax_acc) = plt.subplots(1, 2, figsize=(12, 4))

ax_loss.plot(train_losses, label='Loss')
ax_loss.set_title('Training Loss')
ax_loss.set_xlabel('Epoch')
ax_loss.set_ylabel('Loss')
ax_loss.legend()

# Show both curves: the original panel drew only the test accuracy while
# labelling it as training accuracy.
ax_acc.plot(train_accuracies, label='Train Accuracy')
ax_acc.plot(eval_accuracies, label='Test Accuracy')
ax_acc.set_title('Accuracy')
ax_acc.set_xlabel('Epoch')
ax_acc.set_ylabel('Accuracy')
ax_acc.legend()

fig.tight_layout()
plt.show()

In [None]:
# Rebuild the network and load previously saved weights from `model_path`.
# map_location makes a checkpoint saved on GPU loadable on a CPU-only runtime.
# NOTE(review): the training cell checkpoints to 'best_speech_lstm_model.pth',
# whereas this loads `model_path` from Drive — confirm which weights are intended.
model = SpeechLSTM(input_size, hidden_size, output_size, num_layers).to(device)
model.load_state_dict(torch.load(model_path, map_location=device))
model.eval()

In [None]:
# Real-time speech detection class
# NOTE(review): a later cell defines another `RealTimeSpeechDetector` with an
# extended constructor, which shadows this one once that cell has run.
class RealTimeSpeechDetector:
    """Stateful detector that carries the LSTM state from call to call.

    Args:
        model: Network called as `model(x, h0, c0)` and returning
            `(logits, hn, cn)`; it must expose `num_layers` and
            `hidden_size` attributes (as `SpeechLSTM` does).
        input_size: Number of features per time step (stored for reference).
    """

    def __init__(self, model, input_size):
        self.model = model
        self.input_size = input_size
        # Take the state geometry and device from the model itself rather than
        # from notebook globals, so the state always matches the network.
        first_param = next(model.parameters(), None)
        self.device = first_param.device if first_param is not None else torch.device('cpu')
        self.h0 = torch.zeros(model.num_layers, 1, model.hidden_size, device=self.device)
        self.c0 = torch.zeros(model.num_layers, 1, model.hidden_size, device=self.device)

    def predict(self, sample):
        """Classify the newest time step of `sample` and advance the state.

        Args:
            sample: Tensor of shape (1, seq, input_size); the decision is
                taken from the last time step.

        Returns:
            1 if the predicted probability exceeds 0.5, otherwise 0.
        """
        self.model.eval()
        with torch.no_grad():
            output, self.h0, self.c0 = self.model(sample.to(self.device), self.h0, self.c0)
        # The model emits logits, so convert to a probability before applying
        # the 0.5 threshold (thresholding the raw logit would be wrong).
        probability = torch.sigmoid(output[0, -1, 0]).item()
        return 1 if probability > 0.5 else 0

In [None]:
# Report the container type and shape of the held-out inputs.
print(type(X_test_seq), X_test_seq.shape, sep='\n')

# Pick the first held-out window: its labels, and its inputs with a leading batch axis.
labels_test = y_test_seq[0]
sequence_test = X_test_seq[0][None]
print(sequence_test.shape, labels_test.shape, sep='\n')


In [None]:
import torch
import matplotlib.pyplot as plt
from collections import deque

class RealTimeSpeechDetector:
    """Feeds single samples to an LSTM model while keeping its recurrent state.

    The hidden and cell states start as zeros of shape
    (num_layers, 1, hidden_size) on `device` and are overwritten with the
    model's returned states after every `predict` call.
    """

    def __init__(self, model, input_size, hidden_size, num_layers, device):
        self.model, self.device = model, device
        self.input_size, self.hidden_size, self.num_layers = input_size, hidden_size, num_layers
        state_shape = (num_layers, 1, hidden_size)
        self.h0 = torch.zeros(state_shape).to(device)
        self.c0 = torch.zeros(state_shape).to(device)

    def predict(self, sample):
        """Return 1 when sigmoid(model output) exceeds 0.5 for `sample`, else 0."""
        self.model.eval()
        # Two leading axes are added before the model call: (1, 1, input_size)
        step_input = sample[None, None].to(self.device)
        with torch.no_grad():
            logits, next_h, next_c = self.model(step_input, self.h0, self.c0)
        self.h0, self.c0 = next_h, next_c
        return int(torch.sigmoid(logits).item() > 0.5)

# Instantiate the real-time detector (its LSTM state starts at zero)
detector = RealTimeSpeechDetector(model, input_size, hidden_size, num_layers, device)

# Stream samples one time bin at a time and collect predictions for visualization
num_time_bins = 300  # Number of time bins to stream and plot
predictions = []
targets = []

bins_per_window = X_test_seq.size(1)

# Feed *consecutive* time bins of a test window to the stateful detector.
# Taking only the last bin of a different window on every step would hand the
# LSTM a discontinuous signal, so its carried state would be meaningless.
for i in range(num_time_bins):
    window_idx, t = divmod(i, bins_per_window)
    window_idx %= len(X_test_seq)  # wrap around if more bins are requested than available

    if t == 0 and i > 0:
        # Test windows are independent (they were shuffled by the split), so
        # restart from a zero state at every window boundary, as in training.
        detector.h0 = torch.zeros_like(detector.h0)
        detector.c0 = torch.zeros_like(detector.c0)

    prediction = detector.predict(X_test_seq[window_idx, t, :])
    predictions.append(prediction)
    targets.append(y_test_seq[window_idx, t].item())

    # Plot predictions vs targets accumulated so far
    if (i + 1) % 10 == 0:  # Save plots every 10 time bins
        fig, ax = plt.subplots()
        ax.plot(range(len(predictions)), predictions, label='Predictions', marker='o')
        ax.plot(range(len(targets)), targets, label='Targets', marker='x')
        ax.set_title(f'Predictions vs Targets at Time {i}')
        ax.set_xlabel('Sample Index')
        ax.set_ylabel('Speech Detection')
        ax.legend()
        ax.grid(True)
        fig.savefig(f'plot_{i}.png')
        plt.close(fig)  # avoid accumulating open figures in the loop
