In [1]:
import torch
import torch.nn as nn
import torchaudio
from torchaudio.datasets import SPEECHCOMMANDS
import os
import random
import numpy as np
from torch.utils.data import DataLoader, random_split
import torch.nn.functional as F
from tqdm.notebook import tqdm

# Check if GPU is available
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f'Using device: {device}')


Using device: cuda


In [2]:
# Define a subclass to access the speech commands dataset
class SubsetSC(SPEECHCOMMANDS):
    def __init__(self, subset: str = None):
        super().__init__("./", download=True)

        def load_list(filename):
            filepath = os.path.join(self._path, filename)
            with open(filepath) as fileobj:
                return [os.path.join(self._path, line.strip()) for line in fileobj]

        if subset == "validation":
            self._walker = load_list("validation_list.txt")
        elif subset == "testing":
            self._walker = load_list("testing_list.txt")
        elif subset == "training":
            excludes = load_list("validation_list.txt") + load_list("testing_list.txt")
            excludes = set(excludes)
            self._walker = [w for w in self._walker if w not in excludes]

# Load the dataset
train_set = SubsetSC("training")
val_set = SubsetSC("validation")
test_set = SubsetSC("testing")

# Define keywords including the new category
keywords = ['on', 'off', 'silence_unknown']

# Define a function to filter the dataset and assign labels
def filter_and_label(data):
    waveform, sample_rate, label, *_ = data
    if label in ['on', 'off']:
        return waveform, keywords.index(label)
    else:
        return waveform, len(keywords) - 1  # Assign the last index for "silence_unknown"

# Filter and label datasets
train_set = [filter_and_label(data) for data in train_set]
val_set = [filter_and_label(data) for data in val_set]
test_set = [filter_and_label(data) for data in test_set]

print(f'Train set size: {len(train_set)}, Validation set size: {len(val_set)}, Test set size: {len(test_set)}')


100%|██████████| 2.26G/2.26G [03:32<00:00, 11.4MB/s]


Train set size: 105829, Validation set size: 9981, Test set size: 11005


In [3]:
# Define the data loader
batch_size = 32

def collate_fn(batch):
    tensors, targets = [], []

    for waveform, label in batch:
        # Ensure waveform is exactly 16000 samples long
        if waveform.size(1) > 16000:
            waveform = waveform[:, :16000]
        elif waveform.size(1) < 16000:
            waveform = torch.nn.functional.pad(waveform, (0, 16000 - waveform.size(1)))

        tensors.append(waveform)
        targets.append(torch.tensor(label))

    tensors = torch.stack(tensors)
    targets = torch.stack(targets)
    return tensors, targets

train_loader = DataLoader(train_set, batch_size=batch_size, shuffle=True, collate_fn=collate_fn)
val_loader = DataLoader(val_set, batch_size=batch_size, shuffle=False, collate_fn=collate_fn)
test_loader = DataLoader(test_set, batch_size=batch_size, shuffle=False, collate_fn=collate_fn)

# Define the model
class SimpleCNN(nn.Module):
    def __init__(self):
        super(SimpleCNN, self).__init__()
        self.conv1 = nn.Conv1d(1, 8, kernel_size=5, stride=1, padding=2)
        self.conv2 = nn.Conv1d(8, 16, kernel_size=5, stride=1, padding=2)
        self.pool = nn.MaxPool1d(4)
        
        # Compute the output size after the convolution and pooling layers
        dummy_input = torch.zeros(1, 1, 16000)  # Assuming input size of (batch, channels, length)
        dummy_output = self._forward_conv(dummy_input)
        conv_output_size = dummy_output.view(-1).shape[0]
        
        self.fc1 = nn.Linear(conv_output_size, 64)
        self.fc2 = nn.Linear(64, len(keywords))  # Update the output size

    def _forward_conv(self, x):
        x = self.pool(F.relu(self.conv1(x)))
        x = self.pool(F.relu(self.conv2(x)))
        return x

    def forward(self, x):
        x = self._forward_conv(x)
        x = x.view(x.size(0), -1)  # Flatten the tensor
        x = F.relu(self.fc1(x))
        x = self.fc2(x)
        return x

# Instantiate the model
model = SimpleCNN().to(device)

# Define loss and optimizer
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

# Training function
def train(model, device, train_loader, optimizer, criterion, epoch):
    model.train()
    for batch_idx, (data, target) in enumerate(train_loader):
        data, target = data.to(device), target.to(device)
        optimizer.zero_grad()
        output = model(data)
        loss = criterion(output, target)
        loss.backward()
        optimizer.step()
        if batch_idx % 10 == 0:
            print(f'Train Epoch: {epoch} [{batch_idx * len(data)}/{len(train_loader.dataset)} '
                  f'({100. * batch_idx / len(train_loader):.0f}%)]\tLoss: {loss.item():.6f}')

# Validation function
def validate(model, device, val_loader, criterion):
    model.eval()
    val_loss = 0
    correct = 0
    with torch.no_grad():
        for data, target in val_loader:
            data, target = data.to(device), target.to(device)
            output = model(data)
            val_loss += criterion(output, target).item()  # sum up batch loss
            pred = output.argmax(dim=1, keepdim=True)  # get the index of the max log-probability
            correct += pred.eq(target.view_as(pred)).sum().item()

    val_loss /= len(val_loader.dataset)
    print(f'\nValidation set: Average loss: {val_loss:.4f}, Accuracy: {correct}/{len(val_loader.dataset)} '
          f'({100. * correct / len(val_loader.dataset):.0f}%)\n')

# Training loop
num_epochs = 10
for epoch in range(1, num_epochs + 1):
    train(model, device, train_loader, optimizer, criterion, epoch)
    validate(model, device, val_loader, criterion)

# Save the trained model
torch.save(model.state_dict(), 'model.pth')


Validation set: Average loss: 0.0065, Accuracy: 9257/9981 (93%)


Validation set: Average loss: 0.0052, Accuracy: 9324/9981 (93%)


Validation set: Average loss: 0.0039, Accuracy: 9467/9981 (95%)


Validation set: Average loss: 0.0033, Accuracy: 9575/9981 (96%)


Validation set: Average loss: 0.0022, Accuracy: 9748/9981 (98%)


Validation set: Average loss: 0.0014, Accuracy: 9836/9981 (99%)


Validation set: Average loss: 0.0012, Accuracy: 9880/9981 (99%)


Validation set: Average loss: 0.0011, Accuracy: 9868/9981 (99%)


Validation set: Average loss: 0.0008, Accuracy: 9902/9981 (99%)


Validation set: Average loss: 0.0004, Accuracy: 9943/9981 (100%)



In [4]:
# Testing function
def test(model, device, test_loader):
    model.eval()
    test_loss = 0
    correct = 0
    with torch.no_grad():
        for data, target in test_loader:
            # Move data to CPU explicitly
            data, target = data.cpu(), target.cpu()
            output = model(data)
            test_loss += criterion(output, target).item()  # sum up batch loss
            pred = output.argmax(dim=1, keepdim=True)  # get the index of the max log-probability
            correct += pred.eq(target.view_as(pred)).sum().item()

    test_loss /= len(test_loader.dataset)
    print(f'Test set: Average loss: {test_loss:.4f}, Accuracy: {correct}/{len(test_loader.dataset)} '
          f'({100. * correct / len(test_loader.dataset):.0f}%)')


In [7]:
import tkinter as tk
from tkinter import font as tkFont
import pyaudio
import wave
import torchaudio
import torch
import torch.nn as nn
import torch.nn.functional as F
import numpy as np

# Define keywords and their corresponding labels
keywords = ['on', 'off', 'silence_unknown']

# Define the model architecture
class SimpleCNN(nn.Module):
    def __init__(self):
        super(SimpleCNN, self).__init__()
        self.conv1 = nn.Conv1d(1, 8, kernel_size=5, stride=1, padding=2)
        self.conv2 = nn.Conv1d(8, 16, kernel_size=5, stride=1, padding=2)
        self.pool = nn.MaxPool1d(4)
        
        # Compute the output size after the convolution and pooling layers
        dummy_input = torch.zeros(1, 1, 16000)  # Assuming input size of (batch, channels, length)
        dummy_output = self._forward_conv(dummy_input)
        conv_output_size = dummy_output.view(-1).shape[0]
        
        self.fc1 = nn.Linear(conv_output_size, 64)
        self.fc2 = nn.Linear(64, len(keywords))

    def _forward_conv(self, x):
        x = self.pool(F.relu(self.conv1(x)))
        x = self.pool(F.relu(self.conv2(x)))
        return x

    def forward(self, x):
        x = self._forward_conv(x)
        x = x.view(x.size(0), -1)  # Flatten the tensor
        x = F.relu(self.fc1(x))
        x = self.fc2(x)
        return x

# Load the trained model onto the CPU
model = SimpleCNN()
model.load_state_dict(torch.load("model.pth", map_location='cpu'))
model.eval()

# Function to record audio
def record_audio(filename="input.wav", duration=3, fs=16000):
    CHUNK = 1024
    FORMAT = pyaudio.paInt16
    CHANNELS = 1
    RATE = fs
    RECORD_SECONDS = duration
    
    p = pyaudio.PyAudio()
    
    stream = p.open(format=FORMAT,
                    channels=CHANNELS,
                    rate=RATE,
                    input=True,
                    frames_per_buffer=CHUNK)
    
    frames = []
    for _ in range(0, int(RATE / CHUNK * RECORD_SECONDS)):
        data = stream.read(CHUNK)
        frames.append(data)
    
    stream.stop_stream()
    stream.close()
    p.terminate()
    
    wf = wave.open(filename, 'wb')
    wf.setnchannels(CHANNELS)
    wf.setsampwidth(p.get_sample_size(FORMAT))
    wf.setframerate(RATE)
    wf.writeframes(b''.join(frames))
    wf.close()

# Function to preprocess audio
def preprocess_audio(filename):
    waveform, sample_rate = torchaudio.load(filename)
    waveform = torchaudio.transforms.Resample(orig_freq=sample_rate, new_freq=16000)(waveform)
    waveform = waveform.mean(dim=0, keepdim=True)  # Convert stereo to mono
    waveform = F.pad(waveform, (0, 16000 - waveform.shape[1]))  # Pad or truncate to fixed length
    waveform = waveform.unsqueeze(0)  # Add batch dimension
    return waveform

# Function to predict using the model
def predict(waveform):
    with torch.no_grad():
        output = model(waveform)
        predicted_label = torch.argmax(output, dim=1)
    return keywords[predicted_label.item()]

# Function to handle button click event
def on_record_click():
    record_audio("input.wav")
    waveform = preprocess_audio("input.wav")
    prediction = predict(waveform)
    result_label.config(text=f"Predicted keyword: {prediction}")
    handle_prediction(prediction)

# Function to toggle the bulb
def toggle_bulb():
    global bulb_state
    if bulb_state == "on":
        bulb_state = "off"
        bulb_label.config(text="Bulb State: Off", bg="black")
    else:
        bulb_state = "on"
        bulb_label.config(text="Bulb State: On", bg="yellow")

# Function to handle keyword prediction
def handle_prediction(prediction):
    if prediction == "on":
        toggle_bulb()
    elif prediction == "off":
        toggle_bulb()

# Create the GUI
root = tk.Tk()
root.title("Keyword Detection")
root.geometry("1200x800")
root.configure(bg="#2e3f4f")

# Custom Fonts
title_font = tkFont.Font(family="Helvetica", size=36, weight="bold")
button_font = tkFont.Font(family="Helvetica", size=24, weight="bold")
label_font = tkFont.Font(family="Helvetica", size=24)

# Title Label
title_label = tk.Label(root, text="Voice-Controlled Bulb", font=title_font, bg="#2e3f4f", fg="white")
title_label.pack(pady=40)

# Record Button
record_button = tk.Button(root, text="Record", command=on_record_click, font=button_font, bg="#ff5722", fg="white", activebackground="#ff784e", activeforeground="white", width=15, height=2)
record_button.pack(pady=40)

# Result Label
result_label = tk.Label(root, text="Predicted keyword: ", font=label_font, bg="#2e3f4f", fg="white")
result_label.pack(pady=40)

# Global variable to track bulb state
bulb_state = "off"

# Bulb State Label
bulb_label = tk.Label(root, text="Bulb State: Off", bg="black", fg="white", font=("Arial", 24), width=30, height=10)
bulb_label.pack(pady=40)

root.mainloop()


KeyboardInterrupt: 