## Import modules

In [1]:
import torch
import librosa
import pandas as pd
import numpy as np

## Read data

In [2]:
def read_files(csv_path, audio_folder_path):
    """Read the track table and load the audio of every track it lists.

    Args:
        csv_path: Path of a CSV file that has a ``track`` column holding
            audio file names.
        audio_folder_path: Folder the file names in ``track`` are relative to.

    Returns:
        A ``(track_df, audio_df)`` tuple. ``track_df`` is the CSV as read by
        pandas. ``audio_df`` has one row per track with the columns
        ``track`` (file name), ``y`` and ``sr`` (the two values returned by
        ``librosa.load`` for that file).
    """
    track_df = pd.read_csv(csv_path)

    names = []
    waveforms = []
    sample_rates = []

    for name in track_df["track"]:
        print(f"loading {name}")
        waveform, rate = librosa.load(f"{audio_folder_path}/{name}")
        names.append(name)
        waveforms.append(waveform)
        sample_rates.append(rate)

    audio_df = pd.DataFrame({
        "track": names,
        "y": waveforms,
        "sr": sample_rates,
    })
    return track_df, audio_df


In [3]:
# Root folder of the dataset; reused by later cells for the test CSV and the submission file.
data_folder_path = "../data"

# Training table and the folder holding the audio clips it refers to.
train_csv_path = f"{data_folder_path}/train.csv"
clips_folder_path = f"{data_folder_path}/audios/clips"

track_df, audio_df = read_files(train_csv_path, clips_folder_path)


loading normalize_5s_intro_thc1MtNagC8.wav
loading normalize_5s_intro_Wo2qUD1g7xM.wav
loading normalize_5s_intro_3ObVN3QQiZ8.wav
loading normalize_5s_intro_S-zQJFRX5Fg.wav
loading normalize_5s_intro_SyZOAgXiPMw.wav
loading normalize_5s_intro_GQT8ejgV2_A.wav
loading normalize_5s_intro_PQAIxeSIQU4.wav
loading normalize_5s_intro_E-8pyVBvCPQ.wav
loading normalize_5s_intro_Qr8eZSVaw10.wav
loading normalize_5s_intro_p7j-tz1Cn4o.wav
loading normalize_5s_intro_nISI4qF55F4.wav
loading normalize_5s_intro_RoeRU5zxkak.wav
loading normalize_5s_intro_EygNk739nnY.wav
loading normalize_5s_intro_w1G3rqVil1s.wav
loading normalize_5s_intro_KKc_RMln5UY.wav
loading normalize_5s_intro_Ng2JdroNfC0.wav
loading normalize_5s_intro_xc0sWhVhmkw.wav
loading normalize_5s_intro_VVRszjvg3_U.wav
loading normalize_5s_intro_C7u6rtswjCU.wav
loading normalize_5s_intro_HiPkwl5p1GY.wav
loading normalize_5s_intro_mYa_9d2Daas.wav
loading normalize_5s_intro_6MSYrN4YfKY.wav
loading normalize_5s_intro_O2q_9lBDM7I.wav
loading nor

## Preprocess data

In [4]:
def preprocess_data(track_df, audio_df):
    """Stack the waveforms in ``audio_df["y"]`` into one model-input array.

    Each waveform is wrapped in two singleton axes, so equal-length
    waveforms of ``L`` samples yield an array of shape ``(N, 1, 1, L)``.

    Args:
        track_df: Track table; accepted for interface symmetry but not used.
        audio_df: DataFrame with a ``y`` column of waveforms.

    Returns:
        A numpy array built from the nested waveforms.
    """
    wrapped = []
    for waveform in audio_df["y"].values:
        # Two extra list levels become the two singleton axes of the result.
        wrapped.append([[waveform]])
    return np.array(wrapped)


In [5]:
# Model inputs: one (1, 1, samples) waveform block per track.
x = preprocess_data(track_df, audio_df)

# Targets: the score column reshaped into a single-column array, one row per track.
y = np.array(track_df['score'].values).reshape(-1, 1)

print("x shape =", x.shape)
print("y shape =", y.shape)


x shape = (220, 1, 1, 110250)
y shape = (220, 1)


## Prepare dataset

In [6]:
from torch.utils.data import Dataset, DataLoader
class MyDataset(Dataset):
    """In-memory dataset pairing each input sample with its target.

    Both inputs are copied into ``float32`` tensors at construction time, so
    they may be numpy arrays, nested Python lists, or anything else
    ``torch.tensor`` accepts.

    Args:
        x: Input samples; the first axis indexes samples.
        y: Targets; the first axis indexes samples and must have the same
            length as ``x``.

    Raises:
        ValueError: If ``x`` or ``y`` has no sample axis, or if they do not
            contain the same number of samples.
    """

    def __init__(self, x, y):
        self.x = torch.tensor(x, dtype=torch.float32)
        self.y = torch.tensor(y, dtype=torch.float32)

        if self.x.dim() == 0 or self.y.dim() == 0:
            raise ValueError("x and y must each have a leading sample dimension")
        if self.x.shape[0] != self.y.shape[0]:
            raise ValueError(
                f"x and y must have the same number of samples, "
                f"got {self.x.shape[0]} and {self.y.shape[0]}"
            )

        # Read the length from the converted tensor so inputs without a
        # ``.shape`` attribute (e.g. plain lists) are supported too.
        self.len = self.x.shape[0]

    def __getitem__(self, index):
        """Return the ``(input, target)`` pair stored at ``index``."""
        return self.x[index], self.y[index]

    def __len__(self):
        """Return the number of samples."""
        return self.len

In [7]:
dataset = MyDataset(x=x, y=y)

# Fraction of the samples used for training; the remainder is held out for testing.
train_test_ratio = 0.8
# Fixed seed so the train/test partition is the same on every run of the notebook.
split_seed = 42

train_size = int(len(dataset) * train_test_ratio)
test_size = len(dataset) - train_size
train_set, test_set = torch.utils.data.random_split(
    dataset,
    [train_size, test_size],
    generator=torch.Generator().manual_seed(split_seed),
)


## Build Model

In [28]:
import torch.nn as nn
import torch.nn.functional as F


class Model(nn.Module):
    """Small convolutional regressor over waveforms shaped ``(batch, 1, 1, samples)``.

    Two ``(1, 4)`` convolutions and a ``(1, 2)`` max-pool run along the sample
    axis; the result is flattened and passed through two fully connected
    layers. The final sigmoid keeps every output strictly between 0 and 1.

    Args:
        input_length: Number of samples per waveform. The default, 110250,
            matches the clip length used in this notebook and reproduces the
            previously hard-coded ``fc1`` input size of 1763904.

    Raises:
        ValueError: If ``input_length`` is too short for the convolution and
            pooling layers to produce any output.
    """

    # Layer geometry shared by the layer definitions and the size computation.
    _KERNEL_WIDTH = 4
    _POOL_WIDTH = 2
    _POOL_STRIDE = 2
    _CONV1_CHANNELS = 16
    _CONV2_CHANNELS = 32

    def __init__(self, input_length=110250):
        super().__init__()
        self.conv1 = nn.Conv2d(1, self._CONV1_CHANNELS, (1, self._KERNEL_WIDTH), 1)
        self.conv2 = nn.Conv2d(
            self._CONV1_CHANNELS, self._CONV2_CHANNELS, (1, self._KERNEL_WIDTH), 1
        )
        self.pool = nn.MaxPool2d((1, self._POOL_WIDTH), self._POOL_STRIDE)
        self.dropout1 = nn.Dropout(0.25)
        self.fc1 = nn.Linear(self._flattened_size(input_length), 128)
        self.dropout2 = nn.Dropout(0.5)
        self.fc2 = nn.Linear(128, 1)
        self.sigmoid = nn.Sigmoid()

    @staticmethod
    def _flattened_size(input_length):
        """Return the number of features entering ``fc1`` for a given waveform length.

        Assumes the input height is 1, so only the width (sample axis) shrinks.
        """
        # Each unpadded, stride-1 convolution trims (kernel width - 1) samples.
        conv_width = input_length - 2 * (Model._KERNEL_WIDTH - 1)
        if conv_width < Model._POOL_WIDTH:
            raise ValueError(
                f"input_length={input_length} is too short for the conv/pool stack"
            )
        pooled_width = (conv_width - Model._POOL_WIDTH) // Model._POOL_STRIDE + 1
        return Model._CONV2_CHANNELS * pooled_width

    def forward(self, x):
        """Map a ``(batch, 1, 1, input_length)`` tensor to ``(batch, 1)`` scores in (0, 1)."""
        x = F.relu(self.conv1(x))
        x = F.relu(self.conv2(x))
        x = self.pool(x)
        x = self.dropout1(x)
        # Keep the batch axis and merge channels, height and width into one feature axis.
        x = torch.flatten(x, 1)
        x = F.relu(self.fc1(x))
        x = self.dropout2(x)
        x = self.fc2(x)
        return self.sigmoid(x)


In [29]:
def train(model, device, train_loader, criterion, optimizer, epoch, log_interval=10):
    """Run one training pass over ``train_loader``.

    Args:
        model: Module to optimise; switched to training mode first.
        device: Device each batch is moved to before the forward pass.
        train_loader: Iterable of ``(data, target)`` batches that also exposes
            ``len()`` and a ``dataset`` attribute (used for progress logging).
        criterion: Callable computing the loss from ``(output, target)``.
        optimizer: Optimizer stepped once per batch.
        epoch: Epoch number, used only in the progress message.
        log_interval: A progress line is printed every ``log_interval`` batches.
    """
    model.train()
    for batch_idx, (inputs, targets) in enumerate(train_loader):
        inputs = inputs.to(device)
        targets = targets.to(device)

        optimizer.zero_grad()
        predictions = model(inputs)
        loss = criterion(predictions, targets)
        loss.backward()
        optimizer.step()

        # Only report on every ``log_interval``-th batch.
        if batch_idx % log_interval != 0:
            continue

        samples_seen = batch_idx * len(inputs)
        total_samples = len(train_loader.dataset)
        percent_done = 100. * batch_idx / len(train_loader)
        print('Train Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}'.format(
            epoch, samples_seen, total_samples, percent_done, loss.item()))


def test(model, device,criterion, test_loader):
    model.eval()
    test_loss = 0
    with torch.no_grad():
        for data, target in test_loader:
            data, target = data.to(device), target.to(device)
            output = model(data)
            loss = criterion(output, target)
            test_loss += loss  # sum up batch loss

    test_loss /= len(test_loader.dataset)

    print('\nTest set: Average loss: {:.4f}\n'.format(test_loss))


## Training

In [30]:
import torch.optim as optim
from torch.optim.lr_scheduler import StepLR

# Hyperparameters.
learning_rate = 0.001
gamma = 0.1
epochs = 10

# Default DataLoader settings: one sample per batch, no shuffling.
train_loader = torch.utils.data.DataLoader(train_set)
test_loader = torch.utils.data.DataLoader(test_set)

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = Model().to(device)
print(model)

# The model emits one sigmoid-squashed value per sample and the targets are
# float scores of shape (N, 1), so this is a regression problem.
# nn.CrossEntropyLoss is a multi-class classification loss and gives no useful
# training signal with a single output unit, so mean squared error is used.
criterion = nn.MSELoss()
optimizer = optim.SGD(model.parameters(), lr=learning_rate, momentum=0.9)
# NOTE(review): step_size=1 with gamma=0.1 divides the learning rate by 10
# after every epoch; confirm this aggressive decay is intended.
scheduler = StepLR(optimizer, step_size=1, gamma=gamma)

Model(
  (conv1): Conv2d(1, 16, kernel_size=(1, 4), stride=(1, 1))
  (conv2): Conv2d(16, 32, kernel_size=(1, 4), stride=(1, 1))
  (pool): MaxPool2d(kernel_size=(1, 2), stride=2, padding=0, dilation=1, ceil_mode=False)
  (dropout1): Dropout(p=0.25, inplace=False)
  (fc1): Linear(in_features=1763904, out_features=128, bias=True)
  (dropout2): Dropout(p=0.5, inplace=False)
  (fc2): Linear(in_features=128, out_features=1, bias=True)
  (sigmoid): Sigmoid()
)


In [32]:
# One training pass and one evaluation pass per epoch, then advance the
# learning-rate schedule.
for epoch in range(1, epochs + 1):
    train(
        model=model,
        device=device,
        train_loader=train_loader,
        criterion=criterion,
        optimizer=optimizer,
        epoch=epoch,
        log_interval=100,
    )
    test(
        model=model,
        device=device,
        criterion=criterion,
        test_loader=test_loader,
    )
    scheduler.step()



Test set: Average loss: 0.0324


Test set: Average loss: 0.0301


Test set: Average loss: 0.0300


Test set: Average loss: 0.0300


Test set: Average loss: 0.0300


Test set: Average loss: 0.0300


Test set: Average loss: 0.0300


Test set: Average loss: 0.0300


Test set: Average loss: 0.0300


Test set: Average loss: 0.0300



## Save Model

In [33]:
# Folder for saved models; also used by the prediction cell to locate the model to load.
model_folder_path = "../model"
save_model_name = "model1.pt"

# Serialise the whole model object (not just its state dict) to disk.
save_model_path = f"{model_folder_path}/{save_model_name}"
torch.save(model, save_model_path)

## Predict

In [35]:
load_model_name = "success_model1.pt"

# NOTE(review): torch.load unpickles a full model object, which can execute
# arbitrary code; only load files from a trusted source. Newer PyTorch
# releases may also need weights_only=False here; confirm against the
# installed version.
# map_location places the loaded parameters on the same device the inputs are
# sent to, regardless of the device the model was saved from.
model = torch.load(f"{model_folder_path}/{load_model_name}", map_location=device)
model.eval()

test_track_df, test_audio_df = read_files(f"{data_folder_path}/test.csv", f"{data_folder_path}/audios/clips")
test_x = preprocess_data(test_track_df, test_audio_df)

output_dict = {
    "track": [],
    "score": []
}

# Inference only: skip building the autograd graph.
with torch.no_grad():
    for track, features in zip(test_track_df['track'], test_x):
        # Add the batch axis the model expects in front of a single sample.
        features = np.array([features])
        features = torch.tensor(features, dtype=torch.float32).to(device)
        score = model(features)
        output_dict["track"].append(track)
        # .item() stores a plain Python float, so the score column is numeric
        # rather than an object column of 0-d arrays.
        output_dict["score"].append(score[0][0].item())

output_df = pd.DataFrame(output_dict)
output_df.to_csv(f"{data_folder_path}/submission.csv", index=False)


loading normalize_5s_intro_0EVVKs6DQLo.wav
loading normalize_5s_intro_d7to9URtLZ4.wav
loading normalize_5s_intro_TzhhbYS9EO4.wav
loading normalize_5s_intro_nn5nypm7GG8.wav
loading normalize_5s_intro_hed6HkYNA7g.wav
loading normalize_5s_intro_rWznOAwxM1g.wav
loading normalize_5s_intro_zyQkFh-E4Ak.wav
loading normalize_5s_intro_agKkcRXN2iE.wav
loading normalize_5s_intro_SZaZU_qi6Xc.wav
loading normalize_5s_intro_ZpDQJnI4OhU.wav
loading normalize_5s_intro_D4nWzd63jV4.wav
loading normalize_5s_intro_9odM1BRqop4.wav
loading normalize_5s_intro_F64yFFnZfkI.wav
loading normalize_5s_intro_Js2JQH_kt0I.wav
loading normalize_5s_intro_Skt_NKI4d6U.wav
