In [None]:
# Mount Google Drive at /content/drive so the dataset and checkpoint paths
# used later in this notebook (all under /content/drive/MyDrive) resolve.
from google.colab import drive
drive.mount('/content/drive')

In [None]:
pip install python_speech_features

In [None]:
import os
import numpy as np
import scipy.io.wavfile as wav
from python_speech_features import mfcc
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder
import joblib

# Run on the GPU when PyTorch reports a usable CUDA device, otherwise on the CPU
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")

# Attention pooling: collapse a sequence into one weighted-sum vector
class Attention(nn.Module):
    """Score every step of a sequence and return the score-weighted sum.

    Each step is scored with ``V(tanh(W(step)))``; the scores are normalised
    with a softmax over axis 1 and used to average the input along that axis.
    """

    def __init__(self, input_dim):
        super().__init__()
        # Square projection applied before the tanh non-linearity
        self.W = nn.Linear(input_dim, input_dim)
        # Bias-free scoring layer: one scalar per step
        self.V = nn.Linear(input_dim, 1, bias=False)

    def forward(self, x):
        """Pool ``x`` along axis 1.

        ``x`` is expected to be ``(batch, seq_len, input_dim)``, which is what
        the batch-first LSTM in ``CRNNWithAttention`` hands over; the result
        then has shape ``(batch, input_dim)``.
        """
        projected = torch.tanh(self.W(x))
        step_scores = self.V(projected)
        # Normalise across the sequence axis so the weights of one sample sum to 1
        step_weights = torch.softmax(step_scores, dim=1)
        return (step_weights * x).sum(dim=1)

# CRNN classifier: strided 1-D convolutions -> bidirectional LSTM -> attention pooling -> linear head
class CRNNWithAttention(nn.Module):
    """Convolutional-recurrent sequence classifier with attention pooling.

    Args:
        input_size: Number of features per time step (channels of the first convolution).
        num_filters: Output channels of the first convolution; the second uses twice as many.
        rnn_hidden_size: Hidden size of each LSTM direction.
        output_size: Number of output logits.
    """

    def __init__(self, input_size, num_filters, rnn_hidden_size, output_size):
        super().__init__()
        # Both convolutions use stride 2, so each one roughly halves the sequence length
        conv_options = dict(kernel_size=3, stride=2, padding=1)
        self.conv1 = nn.Conv1d(in_channels=input_size, out_channels=num_filters, **conv_options)
        self.conv2 = nn.Conv1d(in_channels=num_filters, out_channels=num_filters * 2, **conv_options)
        self.rnn = nn.LSTM(
            input_size=num_filters * 2,
            hidden_size=rnn_hidden_size,
            batch_first=True,
            bidirectional=True,
        )
        # The bidirectional LSTM concatenates both directions, hence 2 * hidden size
        self.attention = Attention(input_dim=rnn_hidden_size * 2)
        self.fc = nn.Linear(rnn_hidden_size * 2, output_size)
        self.dropout = nn.Dropout(0.5)

    def forward(self, x):
        """Return class logits for a batch shaped ``(batch, seq_len, input_size)``."""
        # Conv1d convolves over the last axis: go to (batch, features, seq_len)
        conv_in = x.permute(0, 2, 1)
        conv_out = torch.relu(self.conv1(conv_in))
        conv_out = torch.relu(self.conv2(conv_out))

        # Back to (batch, seq_len, channels) for the batch-first LSTM
        rnn_in = conv_out.permute(0, 2, 1)
        rnn_out, _ = self.rnn(rnn_in)

        # Attention pooling removes the sequence axis
        pooled = self.attention(rnn_out)

        return self.fc(self.dropout(pooled))

# Load the audio dataset
def loadDataset(directory, max_folders):
    """Read WAV files grouped in class sub-folders and extract MFCC features.

    Only sub-directories of ``directory`` are treated as classes; stray files
    at the top level (for example ``.DS_Store``) are ignored instead of
    crashing the directory walk or using up the ``max_folders`` budget.
    Folders and files are visited in sorted order so that the selected
    folders and the resulting sample order are reproducible.

    Args:
        directory: Root folder containing one sub-folder per class label.
        max_folders: Maximum number of class sub-folders to read.

    Returns:
        A list of ``(mfcc_features, folder_name)`` tuples, one per file that
        could be read. Files rejected with ``ValueError`` are reported on
        stdout and skipped.
    """
    dataset = []
    class_folders = sorted(
        entry
        for entry in os.listdir(directory)
        if os.path.isdir(os.path.join(directory, entry))
    )
    # max(..., 0) keeps a negative limit meaning "no folders", as before
    for folder in class_folders[:max(max_folders, 0)]:
        folder_path = os.path.join(directory, folder)
        for file in sorted(os.listdir(folder_path)):
            file_path = os.path.join(folder_path, file)
            if file == ".DS_Store" or os.path.isdir(file_path):
                continue
            try:
                rate, sig = wav.read(file_path)
                mfcc_feat = mfcc(sig, rate, winlen=0.020, appendEnergy=False)
            except ValueError as e:
                print(f"Skipping file '{file_path}': {str(e)}")
            else:
                dataset.append((mfcc_feat, folder))
    return dataset

# Read the dataset from the mounted Drive; each sub-folder name becomes a label
directory = "/content/drive/MyDrive/Deep Learning/genres_original"
dataset = loadDataset(directory, max_folders=100)

# Longest MFCC sequence (number of frames) among all loaded clips
max_length = max(features.shape[0] for features, _label in dataset)

# Force a feature matrix to have exactly max_length rows
def pad_or_truncate(feature, max_length):
    """Return ``feature`` cut or zero-padded along axis 0 to ``max_length`` rows.

    A matrix that already has ``max_length`` rows is returned unchanged (the
    same object). Longer matrices are sliced; shorter ones are copied into a
    new zero-filled array created with ``np.zeros`` defaults.
    """
    n_rows = feature.shape[0]
    if n_rows == max_length:
        return feature
    if n_rows > max_length:
        return feature[:max_length]
    # Too short: place the rows at the top of a zero matrix
    out = np.zeros((max_length, feature.shape[1]))
    out[:n_rows] = feature
    return out

# Bring every clip to max_length frames and collect the matching labels
X = np.array([pad_or_truncate(features, max_length) for features, _ in dataset])
y = np.array([label for _, label in dataset])

# Standardise each clip on its own: the scaler is re-fitted on every clip's frames
# (zero padding included), so after this line it only holds the last clip's statistics.
scaler = StandardScaler()
X_scaled = np.array(list(map(scaler.fit_transform, X)))

# Map the folder-name labels to integer class ids
label_encoder = LabelEncoder()
y_encoded = label_encoder.fit_transform(y)

# Convert the data to PyTorch tensors.
# The tensors deliberately stay on the CPU: the DataLoaders below use worker
# processes, and worker processes cannot safely index CUDA tensors. Each batch
# is moved to `device` inside the training and evaluation loops instead.
X_tensor = torch.tensor(X_scaled, dtype=torch.float32)
y_tensor = torch.tensor(y_encoded, dtype=torch.long)

# Split the dataset into training and testing sets
X_train, X_test, y_train, y_test = train_test_split(X_tensor, y_tensor, test_size=0.33, random_state=1)

# Create DataLoaders for batch processing; pinned host memory speeds up the
# host-to-GPU copies and is pointless on a CPU-only runtime.
batch_size = 32
pin_batches = device.type == "cuda"
train_dataset = TensorDataset(X_train, y_train)
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True, num_workers=4, pin_memory=pin_batches)

test_dataset = TensorDataset(X_test, y_test)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False, num_workers=4, pin_memory=pin_batches)

# Define the neural network hyperparameters with simplified architecture
input_size = X_scaled[0].shape[1]  # Number of features per frame
num_filters = 32  # Reduced filters for faster training
rnn_hidden_size = 64  # Reduced hidden size
output_size = len(np.unique(y_encoded))

# Create the CRNN model with attention
model = CRNNWithAttention(input_size, num_filters, rnn_hidden_size, output_size).to(device)

# Define the loss function and optimizer with L2 regularization (weight decay)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001, weight_decay=0.0001)

# Early stopping and learning rate scheduler.
# `verbose=True` is not passed: it is deprecated since PyTorch 2.2 and may be
# rejected by newer releases. Learning-rate changes are logged explicitly below.
# NOTE(review): both the scheduler and early stopping monitor the *training*
# loss; a held-out validation loss would be the usual signal.
early_stopping_patience = 10
scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer, patience=3, factor=0.5)

# Gradient clipping to avoid exploding gradients
max_norm = 5.0

# Train the CRNN model with early stopping
num_epochs = 50
best_loss = float('inf')
patience_counter = 0

for epoch in range(num_epochs):
    model.train()
    running_loss = 0.0
    for inputs, labels in train_loader:
        # Move only the current batch to the compute device
        inputs = inputs.to(device, non_blocking=True)
        labels = labels.to(device, non_blocking=True)

        # Forward pass
        outputs = model(inputs)
        loss = criterion(outputs, labels)

        # Backward and optimize
        optimizer.zero_grad()
        loss.backward()

        # Gradient clipping
        torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm)
        optimizer.step()

        running_loss += loss.item()

    # Print average loss for the epoch
    avg_loss = running_loss / len(train_loader)
    print(f"Epoch [{epoch+1}/{num_epochs}], Loss: {avg_loss:.4f}")

    # Scheduler step, reporting any learning-rate reduction it performs
    lr_before = optimizer.param_groups[0]["lr"]
    scheduler.step(avg_loss)
    lr_after = optimizer.param_groups[0]["lr"]
    if lr_after < lr_before:
        print(f"Reducing learning rate from {lr_before:.6g} to {lr_after:.6g}")

    # Early stopping condition
    if avg_loss < best_loss:
        best_loss = avg_loss
        patience_counter = 0
        # Save model checkpoint
        torch.save(model.state_dict(), "/content/drive/MyDrive/Deep Learning/crnn_model_best.pth")
    else:
        patience_counter += 1

    if patience_counter >= early_stopping_patience:
        print("Early stopping triggered.")
        break

# Evaluate the CRNN model (weights of the last epoch run, not the saved best checkpoint)
model.eval()
with torch.no_grad():
    correct = 0
    total = 0
    for inputs, labels in test_loader:
        inputs = inputs.to(device, non_blocking=True)
        labels = labels.to(device, non_blocking=True)
        outputs = model(inputs)
        predicted = outputs.argmax(dim=1)
        total += labels.size(0)
        correct += (predicted == labels).sum().item()

    accuracy = correct / total
    print("Accuracy:", accuracy)

# Persist the fitted label encoder on Drive so predictions can be mapped back to label names later
label_encoder_save_path = "/content/drive/MyDrive/Deep Learning/label_encoder.pkl"
joblib.dump(value=label_encoder, filename=label_encoder_save_path)
print(f"Label encoder saved to {label_encoder_save_path}")


In [None]:
import torch
import numpy as np
import scipy.io.wavfile as wav
from python_speech_features import mfcc
from sklearn.preprocessing import StandardScaler, LabelEncoder
from torch.utils.data import DataLoader, TensorDataset
import torch.nn as nn
import joblib

# Make the label encoder available to this cell.
# If an encoder already exists in this session (the training cell has run),
# persist it exactly as before. In a fresh session there is no such variable,
# so restore the previously saved encoder instead of failing with NameError.
label_encoder_save_path = "/content/drive/MyDrive/Deep Learning/label_encoder.pkl"
if "label_encoder" in globals():
    joblib.dump(label_encoder, label_encoder_save_path)
    print(f"Label encoder saved to {label_encoder_save_path}")
else:
    # NOTE: joblib.load unpickles the file; only load encoders you saved yourself.
    label_encoder = joblib.load(label_encoder_save_path)
    print(f"Label encoder loaded from {label_encoder_save_path}")

# Attention pooling layer used by the inference model
class Attention(nn.Module):
    """Weight each sequence step by a learned score and sum over the sequence.

    The parameter names ``W`` and ``v`` are part of the state-dict keys and
    must match the checkpoint that is loaded into the model.
    """

    def __init__(self, hidden_size):
        super().__init__()
        self.W = nn.Linear(hidden_size, hidden_size)
        # Bias-free layer producing one score per step
        self.v = nn.Linear(hidden_size, 1, bias=False)

    def forward(self, x):
        """Map ``(batch_size, seq_len, hidden_size)`` to ``(batch_size, hidden_size)``."""
        energy = torch.tanh(self.W(x))            # (batch_size, seq_len, hidden_size)
        alpha = torch.softmax(self.v(energy), 1)  # (batch_size, seq_len, 1), sums to 1 over seq_len
        return (alpha * x).sum(dim=1)             # (batch_size, hidden_size)

# CRNN used for inference: conv + max-pool (x2) -> bidirectional LSTM -> attention pooling -> linear head
class CRNN(nn.Module):
    """Convolutional-recurrent classifier with attention pooling.

    Args:
        input_size: Number of features per time step (channels of the first convolution).
        num_filters: Output channels of the first convolution; the second uses twice as many.
        rnn_hidden_size: Hidden size of each LSTM direction.
        output_size: Number of output logits.
    """

    def __init__(self, input_size, num_filters, rnn_hidden_size, output_size):
        super().__init__()
        # kernel_size=3 with padding=1 keeps the sequence length; only the pooling shortens it
        self.conv1 = nn.Conv1d(input_size, num_filters, kernel_size=3, padding=1)
        self.conv2 = nn.Conv1d(num_filters, num_filters * 2, kernel_size=3, padding=1)
        self.pool = nn.MaxPool1d(kernel_size=2)
        self.rnn = nn.LSTM(
            input_size=num_filters * 2,
            hidden_size=rnn_hidden_size,
            batch_first=True,
            bidirectional=True,
        )
        # Both LSTM directions are concatenated, hence 2 * hidden size
        self.attention = Attention(hidden_size=rnn_hidden_size * 2)
        self.fc = nn.Linear(rnn_hidden_size * 2, output_size)
        self.dropout = nn.Dropout(0.5)

    def forward(self, x):
        """Return class logits for a batch shaped ``(batch, seq_len, input_size)``."""
        # Conv1d convolves over the last axis: go to (batch, features, seq_len)
        h = x.permute(0, 2, 1)
        h = self.pool(torch.relu(self.conv1(h)))
        h = self.pool(torch.relu(self.conv2(h)))

        # Back to (batch, seq_len, channels) for the batch-first LSTM
        h = h.permute(0, 2, 1)
        rnn_out, _ = self.rnn(h)
        pooled = self.attention(rnn_out)  # Sequence axis is pooled away here

        return self.fc(self.dropout(pooled))

# Load the CRNN model
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
input_size = 13  # Number of MFCC features
num_filters = 32
rnn_hidden_size = 64
output_size = len(label_encoder.classes_)  # Number of classes
model = CRNN(input_size, num_filters, rnn_hidden_size, output_size).to(device)

# NOTE(review): the training cell in this notebook saves "crnn_model_best.pth"
# from `CRNNWithAttention` (strided convolutions, attention parameter named `V`),
# whereas this cell loads "crnn_model.pth" into `CRNN` (max-pooling, attention
# parameter named `v`). Confirm that the checkpoint below was produced by this
# `CRNN` architecture; otherwise `load_state_dict` will report key mismatches.
model_load_path = "/content/drive/MyDrive/Deep Learning/crnn_model.pth"
# map_location remaps the stored tensors to the current device, so a checkpoint
# saved on a GPU can still be restored on a CPU-only runtime.
model.load_state_dict(torch.load(model_load_path, map_location=device))
model.eval()  # Inference mode: disables dropout

# Audio preprocessing: WAV file -> fixed-length MFCC matrix
def preprocess_audio(file_path, max_length=1000):
    """Read a WAV file and return its MFCC matrix with exactly ``max_length`` rows.

    The MFCC settings (``winlen=0.020``, ``appendEnergy=False``) are the same
    ones ``loadDataset`` uses for the training data.

    NOTE(review): training pads to the longest clip in the dataset, while the
    default here is a fixed 1000 frames - confirm this is intended.
    """
    sample_rate, samples = wav.read(file_path)
    features = mfcc(samples, sample_rate, winlen=0.020, appendEnergy=False)
    return pad_or_truncate(features, max_length)

def pad_or_truncate(feature, max_length):
    """Return ``feature`` with exactly ``max_length`` rows.

    Extra rows are dropped from the end, missing rows are filled with zeros,
    and an input that already has the right length is returned as is.
    """
    current = feature.shape[0]
    missing = max_length - current
    if missing > 0:
        # Too short: copy into the top of a zero-filled matrix
        canvas = np.zeros((max_length, feature.shape[1]))
        canvas[:current] = feature
        return canvas
    return feature[:max_length] if missing < 0 else feature

# Extract the MFCC features of one clip and standardise them per clip,
# the same way the training data was normalised
audio_path = "/content/drive/MyDrive/Deep Learning/genres_original/pop/pop.00047.wav"
mfcc_features = StandardScaler().fit_transform(preprocess_audio(audio_path))

# Add a leading batch axis of size 1 and move the input to the model's device
mfcc_features_tensor = torch.tensor(mfcc_features, dtype=torch.float32)
mfcc_features_tensor = mfcc_features_tensor.unsqueeze(0).to(device)

# Run the model without tracking gradients and take the highest-scoring class
with torch.no_grad():
    output = model(mfcc_features_tensor)
    predicted_class = torch.max(output, 1).indices

# Translate the class index back to its label name
predicted_label = label_encoder.inverse_transform([predicted_class.item()])[0]
print(f"Predicted label: {predicted_label}")
