In [16]:
import os
import librosa
import librosa.display
import matplotlib.pyplot as plt
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from sklearn.model_selection import train_test_split


In [18]:
def create_spectrogram(audio_path, save_path):
    y, sr = librosa.load(audio_path, sr=None)
    plt.figure(figsize=(4, 4))
    S = librosa.feature.melspectrogram(y=y, sr=sr)
    S_dB = librosa.power_to_db(S, ref=np.max)
    librosa.display.specshow(S_dB, sr=sr, x_axis='time', y_axis='mel')
    plt.axis('off')
    plt.savefig(save_path, bbox_inches='tight', pad_inches=0)
    plt.close()

# Directories
data_dir = "./dataset"  # Your root folder
scream_dir = os.path.join(data_dir, "scream")
non_scream_dir = os.path.join(data_dir, "non_scream")
spectrogram_dir = "./spectrograms"

# Create spectrogram folders
os.makedirs(os.path.join(spectrogram_dir, "scream"), exist_ok=True)
os.makedirs(os.path.join(spectrogram_dir, "non_scream"), exist_ok=True)

# Generate and save spectrograms
for folder, label in [(scream_dir, "scream"), (non_scream_dir, "non_scream")]:
    for file_name in os.listdir(folder):
        file_path = os.path.join(folder, file_name)
        save_path = os.path.join(spectrogram_dir, label, file_name.replace(".wav", ".png"))
        create_spectrogram(file_path, save_path)


In [19]:
from PIL import Image
from torchvision import transforms

class SpectrogramDataset(Dataset):
    def __init__(self, spectrogram_dir, transform=None):
        self.spectrogram_dir = spectrogram_dir
        self.transform = transform
        self.data = []
        self.labels = []

        for label, folder in enumerate(["non_scream", "scream"]):
            folder_path = os.path.join(spectrogram_dir, folder)
            for file_name in os.listdir(folder_path):
                self.data.append(os.path.join(folder_path, file_name))
                self.labels.append(label)

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        img_path = self.data[idx]
        img = Image.open(img_path).convert("RGB")
        if self.transform:
            img = self.transform(img)
        label = self.labels[idx]
        return img, label

# Transformations
transform = transforms.Compose([
    transforms.Resize((128, 128)),
    transforms.ToTensor(),
])

# Create Dataset
dataset = SpectrogramDataset(spectrogram_dir, transform=transform)

# Train-test split
train_data, test_data = train_test_split(dataset, test_size=0.2, random_state=42)

# DataLoader
train_loader = DataLoader(train_data, batch_size=32, shuffle=True)
test_loader = DataLoader(test_data, batch_size=32, shuffle=False)


In [22]:
class CNNModel(nn.Module):
    def __init__(self):
        super(CNNModel, self).__init__()
        self.conv1 = nn.Conv2d(3, 32, kernel_size=3, stride=1, padding=1)  # 3 input channels (RGB), 32 filters
        self.conv2 = nn.Conv2d(32, 64, kernel_size=3, stride=1, padding=1)  # 64 filters
        self.pool = nn.MaxPool2d(kernel_size=2, stride=2)  # Pooling layer
        self.relu = nn.ReLU()

        # Placeholder for the flattened size
        self.fc1 = None
        self.fc2 = nn.Linear(128, 2)  # 128 hidden units, 2 output classes

    def forward(self, x):
        x = self.relu(self.conv1(x))
        x = self.pool(self.relu(self.conv2(x)))
        # Flatten
        x = x.view(x.size(0), -1)
        
        # Dynamically initialize the fc1 layer based on input size
        if self.fc1 is None:
            self.fc1 = nn.Linear(x.size(1), 128).to(x.device)
        
        x = self.relu(self.fc1(x))
        x = self.fc2(x)
        return x


# Initialize model
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = CNNModel().to(device)


In [36]:
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

# Training loop
epochs = 25
for epoch in range(epochs):
    model.train()
    running_loss = 0.0
    correct = 0
    total = 0
    for inputs, labels in train_loader:
        inputs, labels = inputs.to(device), labels.to(device)
        
        optimizer.zero_grad()
        outputs = model(inputs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        # Accuracy
        _, predicted = torch.max(outputs, 1)
        total += labels.size(0)
        correct += (predicted == labels).sum().item()
        running_loss += loss.item()
    
    print(f"Epoch [{epoch+1}/{epochs}], Loss: {running_loss/len(train_loader):.4f}, Accuracy: {100 * correct/total:.2f}%")

MODEL_PATH = "scream_detection_model.pth"
torch.save(model.state_dict(), MODEL_PATH)
print(f"Model saved to {MODEL_PATH}")


Epoch [1/25], Loss: 0.9222, Accuracy: 92.85%
Epoch [2/25], Loss: 0.0575, Accuracy: 98.04%
Epoch [3/25], Loss: 0.0346, Accuracy: 98.84%
Epoch [4/25], Loss: 0.0229, Accuracy: 99.32%
Epoch [5/25], Loss: 0.0088, Accuracy: 99.56%
Epoch [6/25], Loss: 0.0087, Accuracy: 99.72%
Epoch [7/25], Loss: 0.0161, Accuracy: 99.44%
Epoch [8/25], Loss: 0.0078, Accuracy: 99.68%
Epoch [9/25], Loss: 0.0057, Accuracy: 99.80%
Epoch [10/25], Loss: 0.0043, Accuracy: 99.84%
Epoch [11/25], Loss: 0.0055, Accuracy: 99.84%
Epoch [12/25], Loss: 0.0578, Accuracy: 98.20%
Epoch [13/25], Loss: 0.0468, Accuracy: 98.28%
Epoch [14/25], Loss: 0.0172, Accuracy: 99.28%
Epoch [15/25], Loss: 0.0201, Accuracy: 99.44%
Epoch [16/25], Loss: 0.0037, Accuracy: 99.96%
Epoch [17/25], Loss: 0.0007, Accuracy: 100.00%
Epoch [18/25], Loss: 0.0004, Accuracy: 100.00%
Epoch [19/25], Loss: 0.0003, Accuracy: 100.00%
Epoch [20/25], Loss: 0.0002, Accuracy: 100.00%
Epoch [21/25], Loss: 0.0002, Accuracy: 100.00%
Epoch [22/25], Loss: 0.0001, Accuracy:

In [None]:
import pyaudio
import wave

def predict_live_audio(model, device):
    CHUNK = 1024
    FORMAT = pyaudio.paInt16
    CHANNELS = 1
    RATE = 44100
    RECORD_SECONDS = 10
    WAVE_OUTPUT_FILENAME = "live_audio.wav"

    # Record audio
    p = pyaudio.PyAudio()
    stream = p.open(format=FORMAT, channels=CHANNELS, rate=RATE, input=True, frames_per_buffer=CHUNK)
    print("Recording...")
    frames = []
    for _ in range(0, int(RATE / CHUNK * RECORD_SECONDS)):
        data = stream.read(CHUNK)
        frames.append(data)
    print("Recording finished.")
    stream.stop_stream()
    stream.close()
    p.terminate()

    # Save audio file
    wf = wave.open(WAVE_OUTPUT_FILENAME, "wb")
    wf.setnchannels(CHANNELS)
    wf.setsampwidth(p.get_sample_size(FORMAT))
    wf.setframerate(RATE)
    wf.writeframes(b"".join(frames))
    wf.close()

    # Generate spectrogram
    live_spec_path = "live_spectrogram.png"
    create_spectrogram(WAVE_OUTPUT_FILENAME, live_spec_path)

    # Predict
    img = Image.open(live_spec_path).convert("RGB")
    img = transform(img).unsqueeze(0).to(device)
    model.eval()
    with torch.no_grad():
        outputs = model(img)
        _, predicted = torch.max(outputs, 1)
        return "Scream" if predicted.item() == 1 else "Not Scream"

# Test
print(predict_live_audio(model, device))



Recording...
Recording finished.
Scream
