In [None]:
# This Python 3 environment comes with many helpful analytics libraries installed
# It is defined by the kaggle/python Docker image: https://github.com/kaggle/docker-python
# For example, here's several helpful packages to load

import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)

# Input data files are available in the read-only "../input/" directory
# For example, running this (by clicking run or pressing Shift+Enter) will list all files under the input directory

import os
# Walk the read-only input tree and print the full path of every file in it.
input_files = (
    os.path.join(dirname, filename)
    for dirname, _, filenames in os.walk('/kaggle/input')
    for filename in filenames
)
for path in input_files:
    print(path)

# You can write up to 20GB to the current directory (/kaggle/working/) that gets preserved as output when you create a version using "Save & Run All"
# You can also write temporary files to /kaggle/temp/, but they won't be saved outside of the current session

/kaggle/input/indian-music-raga/malkauns04.wav
/kaggle/input/indian-music-raga/yaman02.wav
/kaggle/input/indian-music-raga/malkauns26.wav
/kaggle/input/indian-music-raga/asavari25.wav
/kaggle/input/indian-music-raga/bageshree04.wav
/kaggle/input/indian-music-raga/sarang04.wav
/kaggle/input/indian-music-raga/sarang16.wav
/kaggle/input/indian-music-raga/bageshree29.wav
/kaggle/input/indian-music-raga/bhairavi29.wav
/kaggle/input/indian-music-raga/bhoop03.wav
/kaggle/input/indian-music-raga/asavari26.wav
/kaggle/input/indian-music-raga/Bhairavi01.wav
/kaggle/input/indian-music-raga/yaman01.wav
/kaggle/input/indian-music-raga/bhairavi30.wav
/kaggle/input/indian-music-raga/yaman03.wav
/kaggle/input/indian-music-raga/bhoop02.wav
/kaggle/input/indian-music-raga/yaman27.wav
/kaggle/input/indian-music-raga/bhairavi27.wav
/kaggle/input/indian-music-raga/sarang05.wav
/kaggle/input/indian-music-raga/yaman24.wav
/kaggle/input/indian-music-raga/bhoopali24.wav
/kaggle/input/indian-music-raga/DKanada0

In [None]:
import torch
from torch.utils.data import Dataset
import torchaudio
import re
from sklearn import preprocessing
from sklearn.preprocessing import LabelEncoder
from torch import nn
from torch.utils.data import DataLoader
from torch.utils.data.dataloader import default_collate
from sklearn.metrics import accuracy_score ,classification_report


First, a dataframe containing the filenames, the labels and the encoded labels is created.

In [None]:
def create(root_dir, output_path='metadata.csv'):
    """Build a filename/label table for every ``.wav`` file in ``root_dir``.

    The label is the first run of ASCII letters in the file name without its
    extension (``yaman02.wav`` -> ``yaman``). Labels are lower-cased so that
    differently capitalised names such as ``Bhairavi01.wav`` and
    ``bhairavi29.wav`` fall into the same class.

    Args:
        root_dir: Directory that holds the audio files.
        output_path: Destination of the CSV file. Defaults to ``metadata.csv``
            in the current working directory.

    Returns:
        pandas.DataFrame with the columns ``filename`` and ``label``; the same
        rows that are written to ``output_path``.
    """
    # os.listdir gives no ordering guarantee; sort so the CSV is reproducible.
    file_list = sorted(file for file in os.listdir(root_dir) if file.endswith('.wav'))

    records = []
    for file in file_list:
        # Search the stem only, so the "wav" extension can never become a label.
        stem = os.path.splitext(file)[0]
        match = re.search(r'([a-zA-Z]+)', stem)
        if match is None:
            # No letters in the name: there is nothing to derive a label from.
            continue
        records.append({'filename': file, 'label': match.group(0).lower()})

    df = pd.DataFrame(records, columns=['filename', 'label'])
    df.to_csv(output_path, index=False)
    return df

# Scan the dataset folder and write the raw filename/label table to metadata.csv.
create('/kaggle/input/indian-music-raga')

# Read the table back and attach an encoded class id for every label string.
csv_file_path = '/kaggle/working/metadata.csv'
df = pd.read_csv(csv_file_path)
label_encoder = LabelEncoder()
label_encoder.fit(df['label'])
df['encoded_label'] = label_encoder.transform(df['label'])
print(df)

           filename      label  encoded_label
0    malkauns04.wav   malkauns              8
1       yaman02.wav      yaman             10
2    malkauns26.wav   malkauns              8
3     asavari25.wav    asavari              2
4   bageshree04.wav  bageshree              3
..              ...        ...            ...
77  bageshree03.wav  bageshree              3
78    DKanada02.wav    DKanada              1
79    DKanada03.wav    DKanada              1
80      yaman26.wav      yaman             10
81    darbari29.wav    darbari              7

[82 rows x 3 columns]


In [None]:
# Persist the table including the encoded labels. Write to the absolute path
# that is later used as ANNOTATIONS_FILE instead of relying on the current
# working directory happening to be /kaggle/working.
csv_updated = '/kaggle/working/updated_metadata.csv'

df.to_csv(csv_updated, index=False)

Next, data preprocessing is done. Preprocessing includes cutting, resampling, mixing down, padding and noise reduction as necessary.

In [None]:
class Raga(Dataset):
    """Audio dataset that yields fixed-length clips and their transformed features.

    Every item is a dict with the keys ``file`` (full path), ``audio`` (the
    preprocessed waveform), ``mel`` (output of ``transformation`` applied to
    that waveform), ``gt`` (the value in the third column of the annotations
    table) and ``duration_seconds`` (length of the file as loaded, before any
    preprocessing).
    """

    def __init__(self,
                 annotations_file,
                 audio_dir,
                 transformation,
                 target_sample_rate,
                 num_samples):
        """Store the configuration and load the annotations table.

        Args:
            annotations_file: CSV whose first column is the file name and whose
                third column is the label returned as ``gt``.
            audio_dir: Directory the file names are relative to.
            transformation: Callable applied to the preprocessed waveform.
            target_sample_rate: Sample rate every clip is resampled to.
            num_samples: Clip length in samples at ``target_sample_rate``.
        """
        self.annotations = pd.read_csv(annotations_file)
        self.audio_dir = audio_dir
        self.transformation = transformation
        self.target_sample_rate = target_sample_rate
        self.num_samples = num_samples

    def __len__(self):
        """Return the number of rows in the annotations table."""
        return len(self.annotations)

    def __getitem__(self, index):
        """Load, preprocess and transform the clip at ``index``."""
        filename = os.path.join(self.audio_dir, self.annotations.iloc[index, 0])

        label = self.getlabel(index)
        # torchaudio.load returns a (tensor, sample_rate) pair or raises, so no
        # separate "signal is None" case is needed here.
        signal, sr = torchaudio.load(filename)

        duration = signal.shape[1] / sr
        signal = self._preprocess(signal, sr)
        mel_spec = self.transformation(signal)

        return {
            'file': filename,
            'audio': signal,
            'mel': mel_spec,
            'gt': label,
            'duration_seconds': duration
        }

    def _preprocess(self, signal, sr):
        """Turn a raw waveform into a mono clip of ``num_samples`` samples."""
        # Resample first: cutting/padding to num_samples is only meaningful
        # once the waveform is at the target rate. Doing it the other way
        # round changes the clip length whenever sr != target_sample_rate.
        signal = self.resample(signal, sr)
        signal = self.mix_down(signal)
        signal = self.cut(signal)
        signal = self.padding(signal)
        return self.noise_reduction(signal)

    def cut(self, signal):
        """Drop everything after the first ``num_samples`` samples."""
        if signal.shape[1] > self.num_samples:
            signal = signal[:, :self.num_samples]
        return signal

    def padding(self, signal):
        """Right-pad with zeros up to ``num_samples`` samples."""
        length = signal.shape[1]
        if length < self.num_samples:
            num_missing_samples = self.num_samples - length
            last_dim_padding = (0, num_missing_samples)
            signal = torch.nn.functional.pad(signal, last_dim_padding)
        return signal

    def resample(self, signal, sr):
        """Resample to ``target_sample_rate`` when ``sr`` differs from it."""
        if sr != self.target_sample_rate:
            resampled = torchaudio.transforms.Resample(sr, self.target_sample_rate)
            signal = resampled(signal)
        return signal

    def mix_down(self, signal):
        """Average all channels into one; single-channel input is returned as is."""
        if signal.shape[0] > 1:
            signal = torch.mean(signal, dim=0, keepdim=True)
        return signal

    def noise_reduction(self, signal, noise_level=0.002):
        """Apply spectral subtraction and return a waveform of the same length.

        The noise estimate is the mean magnitude over (at most) the first 100
        STFT frames; ``noise_level`` scales how much of it is subtracted.
        """
        n_fft, hop_length = 2048, 512
        # Build the window once, on the signal's device and dtype.
        window = torch.hann_window(n_fft, device=signal.device, dtype=signal.dtype)

        stft = torch.stft(signal, n_fft=n_fft, hop_length=hop_length,
                          window=window, return_complex=True)
        magnitude = torch.abs(stft)
        phase = torch.angle(stft)

        # Estimate noise magnitude
        noise_magnitude = torch.mean(magnitude[..., :100], dim=-1, keepdim=True)

        # Apply spectral subtraction; magnitudes cannot go below zero
        clean_magnitude = torch.clamp(magnitude - noise_level * noise_magnitude, min=0.0)

        # Reconstruct the cleaned signal from magnitude and the original phase
        stft_cleaned = torch.polar(clean_magnitude, phase)
        # Passing length keeps the output as long as the input; without it the
        # inverse transform returns hop_length * (frames - 1) samples.
        signal_cleaned = torch.istft(stft_cleaned, n_fft=n_fft, hop_length=hop_length,
                                     window=window, length=signal.shape[-1])

        return signal_cleaned

    def getlabel(self, index):
        """Return the third-column value of row ``index``, or None if out of range."""
        if index < len(self.annotations):
            return self.annotations.iloc[index, 2]
        return None




Two convolutional layers are used for feature extraction. Each applies a convolution, a ReLU activation, max-pooling for downsampling, and dropout.
After convolutional layers, the output is flattened using nn.Flatten() to prepare it for the fully connected layers.
A fully connected layer is used for classification.

The forward method specifies the forward pass through the network. It involves passing the input through the convolutional layers, flattening the output, applying dropout, and passing through the fully connected layer to obtain the logits.







In [None]:

class CNN(nn.Module):
    """Two-block convolutional classifier for single-channel 2-D inputs.

    Each block is Conv2d(kernel 3, padding 2) -> ReLU -> MaxPool2d(2) ->
    Dropout(0.3). The flattened features pass through one more dropout and a
    single linear layer that produces the class logits.

    Args:
        num_classes: Number of logits produced per sample. Defaults to 11.
        flattened_features: Number of values per sample after the second block
            is flattened. The default 10368 (= 32 * 9 * 36) matches inputs of
            shape (1, 32, 138); other input sizes need a different value.
    """

    def __init__(self, num_classes=11, flattened_features=10368):
        super().__init__()
        self.conv1 = nn.Sequential(
            nn.Conv2d(
                in_channels=1,
                out_channels=16,
                kernel_size=3,
                stride=1,
                padding=2
            ),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2),
            nn.Dropout(0.3)
        )
        self.conv2 = nn.Sequential(
            nn.Conv2d(
                in_channels=16,
                out_channels=32,
                kernel_size=3,
                stride=1,
                padding=2
            ),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2),
            nn.Dropout(0.3)
        )

        self.flatten = nn.Flatten()
        # Attribute names are kept so previously saved state dicts still load.
        self.linear = nn.Linear(flattened_features, num_classes)
        self.dropout = nn.Dropout(0.3)

    def forward(self, input_data):
        """Return the logits for a batch shaped (batch, 1, height, width)."""
        x = self.conv1(input_data)
        x = self.conv2(x)
        x = self.flatten(x)
        x = self.dropout(x)
        logits = self.linear(x)
        return logits






Functions are defined for creating a data loader, training a single epoch, and training the model.
The main script checks for GPU availability, creates a dataset object, constructs the model, initializes the loss function and optimizer, and then trains the model for the specified number of epochs. In the main function, the audio data is converted to mel spectrograms. Mel spectrograms provide a frequency representation of the audio data, which is crucial for raga classification. They capture the distribution of energy in different frequency bands over time, providing a concise representation of the audio content. Mel spectrograms, by capturing the non-linear characteristics of pitch perception, can enhance the model's ability to discriminate between different pitches and musical nuances.

In [None]:

# Training hyperparameters.
BATCH_SIZE = 1  # samples per optimisation step
EPOCHS = 30  # number of full passes over the dataset
LEARNING_RATE = 0.001  # learning rate handed to the Adam optimiser

# Label table with encoded labels, and the directory holding the audio files.
ANNOTATIONS_FILE = '/kaggle/working/updated_metadata.csv'
AUDIO_DIR = '/kaggle/input/indian-music-raga'
SAMPLE_RATE = 22050  # Hz; used both as resampling target and by the mel transform
NUM_SAMPLES = 22050  # clip length in samples (one second at SAMPLE_RATE)

def create_dataloader(train_data, batch_size):
    """Wrap ``train_data`` in a shuffling ``DataLoader``.

    Args:
        train_data: Map-style dataset to draw samples from.
        batch_size: Number of samples collated into each batch.

    Returns:
        A ``DataLoader`` that shuffles on every pass and uses the default
        collate function.
    """
    loader_options = dict(
        batch_size=batch_size,
        shuffle=True,
        collate_fn=default_collate,
    )
    return DataLoader(train_data, **loader_options)


def train_single_epoch(model, data_loader, loss_fn, optimiser, device):
    """Run one optimisation pass over ``data_loader`` and report the mean loss.

    Args:
        model: Network to train; switched to training mode so dropout is active.
        data_loader: Iterable of batches, each a dict with the keys ``'mel'``
            (inputs) and ``'gt'`` (integer class targets).
        loss_fn: Callable ``loss_fn(prediction, target)`` returning a scalar tensor.
        optimiser: Optimiser that updates the parameters of ``model``.
        device: Device the batches are moved to before the forward pass.

    Returns:
        The sample-weighted mean loss of the epoch as a float, or ``None`` when
        ``data_loader`` yielded no batches.
    """
    # A preceding evaluation may have left the model in eval mode.
    model.train()

    total_loss = 0.0
    total_samples = 0
    for batch in data_loader:
        inputs = batch['mel'].to(device)
        targets = batch['gt'].to(device).long()

        prediction = model(inputs)
        loss = loss_fn(prediction, targets)

        # backpropagate error and update weights
        optimiser.zero_grad()
        loss.backward()
        optimiser.step()

        batch_size = targets.size(0)
        total_loss += loss.item() * batch_size
        total_samples += batch_size

    if total_samples == 0:
        # Nothing was processed, so there is no loss value to report.
        print("loss: n/a (no batches)")
        return None

    # The mean over the epoch is far less noisy than the last batch's loss.
    mean_loss = total_loss / total_samples
    print(f"loss: {mean_loss}")
    return mean_loss


def train(model, data_loader, loss_fn, optimiser, device, epochs):
    """Train ``model`` for ``epochs`` passes over ``data_loader``.

    Prints an ``Epoch N`` header before every pass, a separator line after it,
    and a final message once all passes are done. Each pass is delegated to
    ``train_single_epoch`` with the arguments given here.
    """
    separator = "---------------------------"
    for i in range(epochs):
        print(f"Epoch {i+1}")
        train_single_epoch(
            model=model,
            data_loader=data_loader,
            loss_fn=loss_fn,
            optimiser=optimiser,
            device=device,
        )
        print(separator)
    print("Finished training")


if __name__ == "__main__":
    # Use the GPU when PyTorch can see one, otherwise fall back to the CPU.
    device = "cuda" if torch.cuda.is_available() else "cpu"
    print(f"Using {device}")

    # Transform that turns each preprocessed clip into a mel spectrogram.
    mel_spectrogram = torchaudio.transforms.MelSpectrogram(
        sample_rate=SAMPLE_RATE, n_fft=400, hop_length=160, n_mels=32
    )

    # Dataset over the annotated audio files, wrapped in a shuffling loader.
    dataset = Raga(
        annotations_file=ANNOTATIONS_FILE,
        audio_dir=AUDIO_DIR,
        transformation=mel_spectrogram,
        target_sample_rate=SAMPLE_RATE,
        num_samples=NUM_SAMPLES,
    )
    train_dataloader = create_dataloader(dataset, BATCH_SIZE)

    # Build the network on the selected device and show its layout.
    cnn = CNN().to(device)
    print(cnn)

    # Cross-entropy loss on the logits, optimised with Adam.
    loss_fn = nn.CrossEntropyLoss()
    optimizer = torch.optim.Adam(cnn.parameters(), lr=LEARNING_RATE)

    # Train, then store the learned weights in the working directory.
    train(cnn, train_dataloader, loss_fn, optimizer, device, EPOCHS)
    torch.save(cnn.state_dict(), "raga_mel_spec.pth")



Using cpu
CNN(
  (conv1): Sequential(
    (0): Conv2d(1, 16, kernel_size=(3, 3), stride=(1, 1), padding=(2, 2))
    (1): ReLU()
    (2): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
    (3): Dropout(p=0.3, inplace=False)
  )
  (conv2): Sequential(
    (0): Conv2d(16, 32, kernel_size=(3, 3), stride=(1, 1), padding=(2, 2))
    (1): ReLU()
    (2): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
    (3): Dropout(p=0.3, inplace=False)
  )
  (flatten): Flatten(start_dim=1, end_dim=-1)
  (linear): Linear(in_features=10368, out_features=14, bias=True)
  (dropout): Dropout(p=0.3, inplace=False)
)
Epoch 1
loss: 2.3638193607330322
---------------------------
Epoch 2
loss: 2.437779188156128
---------------------------
Epoch 3
loss: 1.6375066041946411
---------------------------
Epoch 4
loss: 18.65873146057129
---------------------------
Epoch 5
loss: 2.1578731536865234
---------------------------
Epoch 6
loss: 9.156569480895996
------------

In [None]:
# Settings for the evaluation cells; paths and audio settings repeat the training values.
BATCH_SIZE = 64  # NOTE(review): not referenced by the evaluation code visible in this notebook — confirm it is needed
EPOCHS = 30
LEARNING_RATE = 0.001

# Label table with encoded labels, and the directory holding the audio files.
ANNOTATIONS_FILE = '/kaggle/working/updated_metadata.csv'
AUDIO_DIR =  '/kaggle/input/indian-music-raga'
SAMPLE_RATE = 22050  # Hz
NUM_SAMPLES = 22050  # clip length in samples (one second at SAMPLE_RATE)

def predict(model, input, target):
    """Classify a single-sample batch and pair the result with its target.

    Args:
        model: Network producing one row of logits per sample; it is put into
            evaluation mode before the forward pass.
        input: Batch containing exactly one sample.
        target: Expected class, passed through unchanged.

    Returns:
        Tuple ``(expected_index, predicted_index)`` where ``predicted_index``
        is the position of the largest logit as a Python int.
    """
    model.eval()
    with torch.no_grad():
        logits = model(input)
    predicted_index = torch.argmax(logits, dim=1).item()
    return target, predicted_index






Here we evaluate the model's predictions and print its accuracy.

In [None]:

# Restore the trained weights. map_location lets a checkpoint saved from a GPU
# run be loaded in a CPU-only session as well.
cnn = CNN()
state_dict = torch.load("raga_mel_spec.pth", map_location="cpu")
cnn.load_state_dict(state_dict)

# Same mel-spectrogram settings as used for training.
mel_spectrogram = torchaudio.transforms.MelSpectrogram(
    sample_rate=SAMPLE_RATE,
    n_fft=400,
    hop_length=160,
    n_mels=32
)
predicted_labels = []
expected_labels = []

# NOTE(review): this reads the same ANNOTATIONS_FILE the model was trained on,
# so the figures below describe fit to the training data, not held-out data.
testing = Raga(ANNOTATIONS_FILE,
               AUDIO_DIR,
               mel_spectrogram,
               SAMPLE_RATE,
               NUM_SAMPLES)

# Score every clip exactly once. Drawing random indices with replacement would
# count some clips twice, skip others, and change the score from run to run.
for index in range(len(testing)):
    data_point = testing[index]
    mel, label = data_point['mel'], data_point['gt']
    # Add the batch dimension the network expects, without mutating the item.
    mel = mel.unsqueeze(0)
    expected_index, predicted_index = predict(cnn, mel, label)
    predicted_labels.append(predicted_index)
    expected_labels.append(int(expected_index))

accuracy = accuracy_score(expected_labels, predicted_labels)
print(f"Accuracy: {accuracy}")

print("Classification Report:")
print(classification_report(expected_labels, predicted_labels))

# Side-by-side table of predicted and expected class ids.
df = pd.DataFrame({'predicted': predicted_labels, 'expected': expected_labels})

print(df)




Accuracy: 0.8888888888888888
Classification Report:
              precision    recall  f1-score   support

           0       1.00      1.00      1.00         2
           1       1.00      1.00      1.00         3
           2       0.65      1.00      0.79        11
           3       0.87      0.93      0.90        14
           4       1.00      1.00      1.00         6
           5       1.00      0.67      0.80         9
           6       1.00      1.00      1.00         1
           7       1.00      1.00      1.00         6
           8       1.00      0.67      0.80         9
           9       1.00      0.75      0.86         8
          10       0.92      1.00      0.96        12

    accuracy                           0.89        81
   macro avg       0.95      0.91      0.92        81
weighted avg       0.92      0.89      0.89        81

    predicted  expected
0           4         4
1           3         9
2           3         3
3           6         6
4           9  

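We get an accuracy of 88.89%, as shown above. Note that the evaluated clips come from the same files the model was trained on, so this figure is not a held-out test score.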