In [None]:
import wave
import random
import librosa
import os
import torch
from tqdm import tqdm
import numpy as np
from torch import nn
from matplotlib import pyplot as plt
from sklearn.model_selection import train_test_split
from torch.utils.data import DataLoader, Dataset
from sklearn.preprocessing import StandardScaler

import warnings
warnings.filterwarnings('ignore')

device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

In [None]:
def extract_feature(file_name, mfcc_count=40):
        
#     Нагенерим кучу всяких фич, в своем большенстве основанных на спектрограммах
#     Вообще именно выделение фич кажется мне наиболее важным аспектом этой задачи,
#     хотя может я просто использую простоватую модель
    
    
    X, sample_rate = librosa.load(file_name)
    stft = np.abs(librosa.stft(X))
    mfccs = np.mean(librosa.feature.mfcc(y=X, sr=sample_rate, n_mfcc=mfcc_count).T,axis=0)
    chroma = np.mean(librosa.feature.chroma_stft(S=stft, sr=sample_rate).T,axis=0)
    mel = np.mean(librosa.feature.melspectrogram(X, sr=sample_rate).T,axis=0)
    contrast = np.mean(librosa.feature.spectral_contrast(S=stft, sr=sample_rate).T,axis=0)
    chroma_cens = librosa.feature.chroma_cens(y=X, sr=sample_rate, n_chroma=20).mean(axis=1)
    tempo = librosa.feature.rhythm.tempogram(y=X, sr=sample_rate, win_length=50).mean(axis=1)
    tonnetz = np.mean(librosa.feature.tonnetz(y=librosa.effects.harmonic(X), sr=sample_rate).T,axis=0)
    
    return np.concatenate((mfccs, chroma, mel, contrast, chroma_cens, tempo, tonnetz))

In [None]:
def write_wav(frames, name, sample_rate=16000):
#     Научимся записывать .wav
    f = wave.open(name, 'wb')
    f.setnchannels(1)
    f.setsampwidth(2)
    f.setframerate(sample_rate)
    f.setnframes(int(len(frames)))
    f.writeframes(frames)
    f.close()

In [None]:
def cut_wav(frames):
    
#     Это именно тот детектор расположения события, который должен повышать точность.
#     И он действительно ее повышает! Просто обрежем ту часть, где тихо.
    
    maximum = max(frames)
    
    for i in range(len(frames)):
        if frames[i] > maximum/13:
            start = i
            break
    
    for i in range(len(frames)):
        if frames[::-1][i] > maximum/13:
            stop = i
            break
            
    return start, len(frames) - stop

In [None]:
files_classes = {}
X = []
y = []

#Считаем файл мета-данных и создадим словарь с именами файлов в качестве ключей и метками
# в качестве значений

with open("data_v_7_stc/meta/meta.txt") as f:
    for line in f:
        file_name = line.split()[0]
        file_class = line.split()[-1]
        files_classes[file_name] = file_class
        
unique_classes = ['tool', 'keyboard', 'door', 'knocking_door', 'bags', 'speech', 'background', 'ring']
files_class_numbers = {i: unique_classes.index(files_classes[i]) for i in files_classes.keys()}

In [None]:
#Обрежем все файлы и сохраним их в отдельную папку, файлы с речью трогать не будем,
# им уготована иная судьба

def cut_files(input_path, output_path, check_speech=False):

    for i in tqdm(os.listdir(path=input_path)):
        
        if check_speech and files_classes[i] == "speech":
            continue

        file = wave.open(input_path + i)
        sample_rate = file.getframerate()
        frames = file.readframes(file.getnframes())
        frames_int16 = (np.frombuffer(frames, dtype=np.int16))
        start, stop = cut_wav(frames_int16)
        write_wav(frames[start * 2 : stop * 2], output_path + i, sample_rate)

In [None]:
os.makedirs("new_data")
os.makedirs("new_test_data")
cut_files("data_v_7_stc/audio/", "new_data/", check_speech=True)
cut_files("data_v_7_stc/test/", "new_test_data/")

In [None]:
# Всего у нас около 250 файлов с речью в обучающей выборке, и это при том, что файлов с
# дверьми более 3000! Удручающий дисбаланс. Воспользуемся тем, что файлы с речью длинные - 
# просто разрежем их на кусочки по 6 секунд и получим классный датасет.
# На таком и обучаться сплошное удовольствие!

speech_files = [i for i in files_classes.keys() if files_classes[i] == "speech"]
speech_data = b""

for i in tqdm(speech_files):
    if i in os.listdir("data_v_7_stc/audio/"):
        f = wave.open("data_v_7_stc/audio/"+i)
        speech_data += f.readframes(f.getnframes())
        
    
    
fps = 16000*2
speech_files_count = len(speech_data)/fps//6
speech_files_count = int(speech_files_count)
    
os.makedirs("new_speech")

for i in tqdm(range(speech_files_count)):
    write_wav(speech_data[i*fps*6:(i+1)*fps*6], "new_speech/test_speech{}.wav".format(i))

In [None]:
#Ну, собственно, генерим фичи, тысячи их! Самое долгое во всем ноутбуке.
# Минут 30 тут придется подождать - а пока можно прочитать годный пост на хабре - 
# https://habr.com/post/351924/ 
# Вообще все посты от этого товарища - годнота высшей категории, очень советую



X = []
y = []
X_file_names = []
for i in tqdm(os.listdir(path="new_data/")):
    try:
        X_file_names.append(i)
        X.append(extract_feature("new_data/"+i, mfcc_count=60))
        y.append(files_class_numbers[i])
    except:
        pass


X = [X[i] for i in range(len(X)) if y[i] is not 5]
X_file_names = [X_file_names[i] for i in range(len(X)) if y[i] is not 5]
y = [i for i in y if i is not 5]

for i in tqdm(os.listdir(path="new_speech/")):
    try:    
        X_file_names.append(i)
        X.append(extract_feature("new_speech/"+i, mfcc_count=60))
        y.append(unique_classes.index("speech"))
    except:
        pass
    
    
X_final = []
X_final_names = []

for i in tqdm(os.listdir(path="new_test_data/")):
    try:    
        X_final.append(extract_feature("new_test_data/"+i, mfcc_count=60))
        X_final_names.append(i)
    except:
        pass

In [None]:
scaler = StandardScaler()
X_scaled = scaler.fit_transform(X)
X_train, X_test, y_train, y_test = train_test_split(X_scaled, y, test_size=0.0015, shuffle = True)

In [None]:
class MyDataset(Dataset): 

    def __init__(self, X, y):
        
        self.X = torch.FloatTensor(X)
        self.y = torch.LongTensor(y)
        
    def __getitem__(self, index):
        return self.X[index], self.y[index]

    def __len__(self):
        return len(self.y)

In [None]:
train_dataset = MyDataset(X_train, y_train)
train_loader = DataLoader(train_dataset, batch_size=len(X_train), shuffle=True)

test_dataset = MyDataset(X_test, y_test)
test_loader = DataLoader(test_dataset, batch_size=len(X_test), shuffle=True)

In [None]:
#Могу поставить на то, что в каждое второе решение будет основано на RNN и будет
#заставлять краснеть меня с моей полносвязной сеточкой. Но результат, в общем-то,
#совсем не плохой, так что мне даже не пришлось лезь в реккурентные дебри, 
#писать которые, к слову, не очень приятно на pytorch'e


class Net(nn.Module):
    def __init__(self, out_count=8, in_count=len(X_scaled[0])):
        super(Net, self).__init__()
        self.linear = nn.Sequential(
                nn.Linear(in_count, 500),
                nn.BatchNorm1d(500),
                nn.ReLU(),
                nn.Dropout(0.45),
                nn.Linear(500, 1500),
                nn.BatchNorm1d(1500),
                nn.ReLU(),
                nn.Dropout(0.45),
                nn.Linear(1500, 200),
                nn.BatchNorm1d(200),
                nn.ReLU(),
                nn.Dropout(0.45),
                nn.Linear(200, out_count),
                nn.Softmax())
        
        self.loss_test = []
        self.loss_train = []

        self.acc_test = []
        self.acc_train = []

    
    def forward(self, x):
        x = self.linear(x)
        return x

In [None]:
net = Net().to(device)
loss_fn = torch.nn.CrossEntropyLoss()

In [None]:


def train_model(net, epoch_count, train_loader, test_loader, learning_rate=0.00001, plot = True):
    opt = torch.optim.Adam(net.parameters(), lr=learning_rate)
    for i in range(epoch_count):


        #test
        running_loss = []
        running_acc = []
        
        for sample, label in test_loader:
            data = sample.to(device)
            label = label.to(device)
            net.eval()
            out = net(data)
            _, predicted = torch.max(out, 1)
            loss_t = loss_fn(out, label)
            running_loss.append(loss_t.item())
            running_acc.append(sum((predicted == label).tolist())/len(predicted))

        if plot: print("---------- \n test")
        acc = np.mean(running_acc)
        loss_ = np.mean(running_loss)
        net.acc_test.append(acc)
        if plot: print(acc)
        net.loss_test.append(loss_)
        
        
        #train
        running_loss = []
        running_acc = []
        
        
        for sample, label in train_loader:
            #augmentation_mask = torch.normal(torch.ones(sample.shape), torch.ones(sample.shape)*0.05)
            #sample = sample*augmentation_mask
            data = (sample).to(device)
            label = label.to(device)
            net.train()
            net.zero_grad()
            out = net(data)
            loss = loss_fn(out, label)
            _, predicted = torch.max(out, 1)
            loss.backward()
            opt.step()  
            running_loss.append(loss.item())
            running_acc.append(sum((predicted == label).tolist())/len(predicted))
        if plot: print("---------- \n train")
        acc = np.mean(running_acc)
        loss_ = np.mean(running_loss)
        net.acc_train.append(acc)
        if plot: print(acc)
        net.loss_train.append(loss_)

        if plot and i%10 == 0:
            plt.figure(figsize=(8,7))
            plt.subplot(2,1,1)
            plt.title("Loss")
            plt.plot(net.loss_test, color="orange", label="test")
            plt.plot(net.loss_train, color="blue", label="train")
            plt.legend()
            plt.grid()
            plt.subplot(2,1,2)
            plt.title("Accuracy")
            plt.plot(net.acc_test, color="orange", label="test")
            plt.plot(net.acc_train, color="blue", label="train")
            plt.legend()
            plt.grid()
            plt.show()

In [None]:
train_model(net, 2500, train_loader, test_loader, learning_rate=0.00003)

In [None]:
tmp_net = net
tmp_net.eval()
X_final_scale = scaler.transform(X_final)
out = tmp_net((torch.FloatTensor(X_final_scale)).to(device))
_, predicted = torch.max(out, 1)
class_prob = list(map(max, out.tolist()))
ans = []
for i in range(len(predicted)):
    ans.append((X_final_names[i], str(class_prob[i])[:6], unique_classes[predicted[i].item()]))
    
for i in sorted(ans, key=lambda x: x[1]):
    print (*i, sep="\t")
    

In [None]:
# Решение открытой задачи

#Самым нормальным решением оказалось сделать следующее - обучим восемь сеток, каждая
# будет говорить, относится ли звук к конкретному классу или нет
# Потом пошаманим немного с выводом и получим большую часть unknown'ов
# Качество такого решения сильно превышает наивное "возьмем те файлы, на которых нейронка 
# дает низкую интенсивность вывода". Хотя и мое решение далеко от идеала.

In [None]:
net_array = []
loaders_array = []
labels_array_binary = []
for i in range(8):
    y_binary = [1 if j==i else 0 for j in y]
    labels_array_binary.append(y_binary)
    X_train_binary, X_test_binary, y_train_binary, y_test_binary = train_test_split(X_scaled, y_binary, test_size=0.001, shuffle = True)
    train_dataset_binary = MyDataset(X_train_binary, y_train_binary)
    train_loader_binary = DataLoader(train_dataset_binary, batch_size=len(X_train_binary), shuffle=True)
    test_dataset_binary = MyDataset(X_test_binary, y_test_binary)
    test_loader_binary = DataLoader(test_dataset_binary, batch_size=len(X_test_binary), shuffle=True)
    loaders_array.append((train_loader_binary, test_loader_binary))
    net_array.append(Net(out_count=2, in_count=len(train_dataset_binary.X[0])).to(device))

In [None]:
for i in range(8):
    train_model(net_array[i], 70, loaders_array[i][0], loaders_array[i][1], learning_rate=0.0003)
    #train_model(net_array[i], 10, loaders_array[i][0], loaders_array[i][1], learning_rate=0.00005)

In [None]:
sum_arr = []
for i in range(8):
    tmp_net = net_array[i]
    tmp_net.eval()
    X_final_scale = scaler.transform(X_final)
    out = tmp_net((torch.FloatTensor(X_final_scale)).to(device))
    _, predicted = torch.max(out, 1)
    sum_arr.append(out)
s = (sum_arr[0]+sum_arr[1]+sum_arr[2]+sum_arr[3]+sum_arr[4]+sum_arr[5]+sum_arr[6]+sum_arr[7])

In [None]:
ans_bin=[]

from operator import itemgetter

for i, j in zip(X_final_names, s):
    ans_bin.append((i,j.tolist()[0]))

for i in sorted(ans_bin, key=itemgetter(1, 0)):
    print (*i, sep="\t")

ans_bin = sorted(ans_bin, key=itemgetter(1, 0))
unknown_array = [i[0] for i in ans_bin[:7]] + [i[0] for i in ans_bin[::-1][:60]]
known_array = set(X_final_names) - set(unknown_array)

ans_bin = ([i + "\t" + "1" + "\n" for i in sorted(unknown_array)[::-1]] +
                                [i + "\t" + "0" + "\n" for i in known_array])

with open("result_open_task.txt", "w") as f:
    f.writelines(ans_bin)

In [None]:
new_ans = []
for i in ans:
    if i[0] not in unknown_array and float(i[1]) > 0.45:
        new_ans.append(i)
    else:
        new_ans.append((i[0], "0.0001", i[2]))



new_ans = ["\t".join(i)+"\n" for i in sorted(new_ans, key=itemgetter(0, 1))]

with open("result.txt", "w") as f:
    f.writelines(new_ans)

In [None]:
#Ну вот и все. Спасибо за чтение и за проведение такого интересного конкурса!