In [None]:
import librosa 
import librosa.display as dsp
from IPython.display import Audio

In [2]:
import pandas as pd
import numpy as np
from tqdm import tqdm
import os
import json

In [3]:
import torch

# Select the GPU when CUDA is available; otherwise fall back to the CPU.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

In [4]:
# Show which device was selected above.
print(device)

cpu


In [5]:
import random

def seed_everything(seed):
    """Seed every random number generator this notebook relies on.

    Seeds Python's ``random`` module, NumPy's legacy global generator and
    PyTorch (CPU and every CUDA device), and configures cuDNN for
    reproducible kernel selection.

    Args:
        seed: Integer seed applied to all generators.
    """
    random.seed(seed)
    # Only affects interpreters started after this point (e.g. worker
    # subprocesses); the hash seed of the running kernel is already fixed.
    os.environ['PYTHONHASHSEED'] = str(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    # Seed all GPUs, not just the current one; a no-op when CUDA is unavailable.
    torch.cuda.manual_seed_all(seed)
    torch.backends.cudnn.deterministic = True
    # The cuDNN autotuner may pick a different algorithm on each run, which
    # defeats the deterministic flag above, so it must stay disabled.
    torch.backends.cudnn.benchmark = False

seed_everything(929)

In [6]:
# Category name -> integer class id. Ids are assigned in listing order,
# starting at 1.
label_dict = {
    category: class_id
    for class_id, category in enumerate(
        (
            "강제추행(성범죄)",  # indecent assault (sex crime)
            "강도범죄",          # robbery
            "절도범죄",          # theft
            "폭력범죄",          # violent crime
            "도움요청",          # request for help
            "일상상황",          # everyday situation
        ),
        start=1,
    )
}

In [7]:
def train_dataset(folder="D:\\위험상황데이터셋\\dataset\\train"):
    """Load every training clip in ``folder`` together with its class label.

    Each ``*.wav`` file is decoded with librosa as 16 kHz mono audio. The label
    comes from the JSON file whose path is the wav path with ``"_label.wav"``
    replaced by ``".json"``: the ``category_02`` value of the first annotation
    is mapped to an integer through ``label_dict``.

    Args:
        folder: Directory holding the wav/json pairs. Defaults to the
            original hard-coded dataset location.

    Returns:
        pandas.DataFrame with columns ``data`` (waveform array) and ``label``
        (integer class id), one row per wav file in directory-listing order.

    Raises:
        KeyError: If a category is missing from ``label_dict`` or the JSON
            lacks the expected keys.
    """
    dataset = []
    for file in tqdm(os.listdir(folder), colour='green'):
        # Match on the extension: a plain substring test would also select any
        # non-audio file that merely has "wav" somewhere in its name.
        if not file.endswith('.wav'):
            continue

        abs_file_path = os.path.join(folder, file)
        data, _ = librosa.load(abs_file_path, sr=16000, mono=True)

        json_path = abs_file_path.replace("_label.wav", ".json")
        with open(json_path, encoding='utf-8') as json_file:
            json_data = json.load(json_file)

        category = json_data["annotations"][0]["categories"]["category_02"]
        dataset.append([data, int(label_dict[category])])

    print("Dataset 생성 완료")
    return pd.DataFrame(dataset, columns=['data', 'label'])

In [8]:
def test_dataset(folder="D:\\위험상황데이터셋\\dataset\\test"):
    """Load every test clip in ``folder`` together with its file name.

    Each ``*.wav`` file is decoded with librosa as 16 kHz mono audio.

    Args:
        folder: Directory holding the test wav files. Defaults to the
            original hard-coded dataset location.

    Returns:
        pandas.DataFrame with columns ``data`` (waveform array) and
        ``file_name`` (name of the wav file inside ``folder``), one row per
        wav file in directory-listing order.
    """
    dataset = []
    for file in tqdm(os.listdir(folder), colour='green'):
        # Match on the extension: a plain substring test would also select any
        # non-audio file that merely has "wav" somewhere in its name.
        if not file.endswith('.wav'):
            continue

        abs_file_path = os.path.join(folder, file)
        data, _ = librosa.load(abs_file_path, sr=16000, mono=True)
        dataset.append([data, file])

    print("Dataset 생성 완료")
    return pd.DataFrame(dataset, columns=['data', 'file_name'])

In [10]:
# Decode all training clips (with labels) and all test clips into DataFrames.
# This reads every wav file from disk, so it is the slowest cell of the notebook.
train_wav = train_dataset()
test_wav = test_dataset()

100%|[32m████████████████████████████████████████████████████████████████████████████[0m| 22000/22000 [11:40<00:00, 31.40it/s][0m


Dataset 생성 완료


100%|[32m██████████████████████████████████████████████████████████████████████████████[0m| 5000/5000 [02:08<00:00, 38.78it/s][0m

Dataset 생성 완료





In [39]:
# Preview the first rows of the training DataFrame (waveform + label).
train_wav.head()

Unnamed: 0,data,label
0,"[-9.62628e-05, -0.00020544285, -0.00011758789,...",1
1,"[1.169485e-05, 2.5358913e-05, 3.8124912e-05, 4...",1
2,"[0.00013293176, 0.0001595899, 1.5221034e-05, -...",1
3,"[-4.965741e-05, -4.61519e-05, 2.123904e-05, 5....",1
4,"[-0.00029593214, -0.0006093591, -0.0005188418,...",1


In [40]:
# Count how many training samples belong to each class label.
counts = train_wav['label'].value_counts()

# Print the result
print(counts)

2    2207
5    2200
3    2200
4    2200
1    2193
Name: label, dtype: int64


In [41]:
# Pull the waveform column out of each DataFrame as a NumPy array
# (one element per clip) and confirm the container type.
train_x = np.array(train_wav.data)
test_x = np.array(test_wav.data)
data_type = type(train_x)
print(data_type)

<class 'numpy.ndarray'>


In [42]:
# Number of samples in every clip, per split.
train_length = [len(sample) for sample in train_x]
test_length = [len(sample) for sample in test_x]

# Print a 15-bin histogram of clip lengths for each split: bin edges first,
# then the number of clips per bin.
bins = 15
for title, lengths in (
    ('train_x length Histogram', train_length),
    ('test_x length Histogram', test_length),
):
    hist, bin_edges = np.histogram(lengths, bins=bins)
    print(title)
    print(bin_edges)
    print(hist)


train_x length Histogram
[  10240.    92864.2  175488.4  258112.6  340736.8  423361.   505985.2
  588609.4  671233.6  753857.8  836482.   919106.2 1001730.4 1084354.6
 1166978.8 1249603. ]
[9417 1079  385  102    0    0    1    0    7    3    0    0    0    0
    6]
test_x length Histogram
[   7680.           90453.53333333  173227.06666667  256000.6
  338774.13333333  421547.66666667  504321.2         587094.73333333
  669868.26666667  752641.8         835415.33333333  918188.86666667
 1000962.4        1083735.93333333 1166509.46666667 1249283.        ]
[2022  267  150   51    0    0    1    0    4    2    0    0    0    0
    3]


In [43]:
def remove_top_n_percent(data, n=15):
    """Drop the longest samples from ``data``.

    The cutoff is the length found at position ``int(len(data) * n / 100)``
    of the lengths sorted in descending order; every sample whose length is
    less than or equal to that cutoff is kept, in its original order.

    Args:
        data: Iterable of sized samples (e.g. 1-D waveform arrays).
        n: Percentage of the longest samples to discard, ``0 <= n < 100``.

    Returns:
        numpy.ndarray of the kept samples. When the kept samples differ in
        length this is a 1-D ``object`` array with one sample per element;
        when they all share one length NumPy stacks them into a regular
        array, as before. Empty input yields an empty array.

    Raises:
        ValueError: If ``n`` is outside ``[0, 100)``.
    """
    if not 0 <= n < 100:
        raise ValueError(f"n must satisfy 0 <= n < 100, got {n!r}")

    samples = list(data)
    if not samples:
        return np.array([])

    lengths = [len(sample) for sample in samples]
    cutoff = sorted(lengths, reverse=True)[int(len(lengths) * n / 100)]
    kept = [sample for sample, length in zip(samples, lengths) if length <= cutoff]

    if len({len(sample) for sample in kept}) == 1:
        # Uniform lengths: a plain conversion is well defined.
        return np.array(kept)

    # Ragged lengths: np.array(kept) is deprecated and raises on recent NumPy,
    # so fill an object array element by element instead.
    ragged = np.empty(len(kept), dtype=object)
    for position, sample in enumerate(kept):
        ragged[position] = sample
    return ragged

In [34]:
# Drop the longest 15% of clips from each split.
# The filter input is rebuilt from the DataFrames instead of from the previous
# train_x / test_x, so re-running this cell does not remove a further 15%.
data_type = type(train_x)
print(data_type)
train_x = remove_top_n_percent(np.array(train_wav.data))
data_type = type(train_x)
print(data_type)
train_lengths = [len(d) for d in train_x]
# NOTE(review): this also discards test clips, so test_x no longer lines up
# with test_wav.file_name row for row — confirm that is intended.
test_x = remove_top_n_percent(np.array(test_wav.data))
test_lengths = [len(d) for d in test_x]

<class 'numpy.ndarray'>
<class 'numpy.ndarray'>


  return np.array(filtered_data)


In [35]:
# Report the longest remaining clip, the number of remaining clips and the
# share that survived the length filter. The share is measured against the
# number of clips actually loaded rather than a hard-coded total.
print('Filtered Train:')
print(max(train_lengths))
print(len(train_lengths))
print(len(train_lengths)/len(train_wav)*100,'%')
print('\nFiltered Test:')
print(max(test_lengths))
print(len(test_lengths))
print(len(test_lengths)/len(test_wav)*100,'%')

Filtered Train:
89424
9352
85.01818181818182 %

Filtered Test:
111136
2135
85.39999999999999 %


In [55]:
def get_avr_length(data):
    """Return the mean length of the items in ``data``, truncated to an int.

    Raises:
        ZeroDivisionError: If ``data`` is empty.
    """
    lengths = [len(item) for item in data]
    return int(sum(lengths) / len(lengths))
    

def set_length(data, avr_length):
    """Force every sample in ``data`` to exactly ``avr_length`` elements.

    Samples that are too short are extended at the end with ``np.pad`` in
    ``'wrap'`` mode (the signal repeats from its start); samples that are too
    long are truncated.

    Samples stay NumPy arrays throughout. Converting padded rows to Python
    lists was slow and made the result dtype depend on whether any row had
    been padded; the returned array now always uses the samples' own dtype.

    Args:
        data: Iterable of 1-D samples.
        avr_length: Target number of elements per sample.

    Returns:
        numpy.ndarray of shape ``(len(data), avr_length)``.
    """
    result = []
    for sample in tqdm(data, colour='green'):
        sample = np.asarray(sample)
        if len(sample) <= avr_length:
            result.append(np.pad(sample, (0, avr_length - len(sample)), 'wrap'))
        else:
            result.append(sample[:avr_length])

    result = np.array(result)
    print('데이터 세팅 완료~!')
    return result

In [56]:
# Sanity check: show the container type of train_x before resizing.
data_type = type(train_x)
print(data_type)

<class 'numpy.ndarray'>


In [57]:
# Pad/truncate both splits to the mean clip length of the training split;
# the test split deliberately reuses the length computed from train_x.
avr_length=get_avr_length(train_x)
train_x = set_length(train_x,avr_length)
test_x = set_length(test_x,avr_length)

100%|[32m██████████████████████████████████████████████████████████████████████████████[0m| 9352/9352 [02:45<00:00, 56.55it/s][0m


데이터 세팅 완료~!


100%|[32m█████████████████████████████████████████████████████████████████████████████[0m| 2135/2135 [00:07<00:00, 296.47it/s][0m


데이터 세팅 완료~!


In [58]:
# Both splits should now share the same second dimension (avr_length).
print('train :', train_x.shape)
print('test :', test_x.shape)

train : (9352, 50667)
test : (2135, 50667)


In [59]:
# Compute 40 MFCCs for the first clip of each split to inspect the
# resulting feature shape.
extracted_features = librosa.feature.mfcc(y=train_x[0], sr=16000, n_mfcc=40)
print(extracted_features.shape)
extracted_features = librosa.feature.mfcc(y=test_x[0], sr=16000, n_mfcc=40)
print(extracted_features.shape)

(40, 99)
(40, 99)


In [60]:
def preprocess_dataset(data):
    """Compute MFCC features for every waveform in ``data``.

    Each waveform is passed to ``librosa.feature.mfcc`` with a 16 kHz sample
    rate and 40 coefficients.

    Returns:
        list with one MFCC result per waveform, in input order.
    """
    return [
        librosa.feature.mfcc(y=waveform, sr=16000, n_mfcc=40)
        for waveform in data
    ]

In [61]:
# Extract MFCCs for every training clip, stack them into one array and
# append a trailing axis of size 1.
# NOTE(review): the trailing axis gives a channels-last layout, while
# torch.nn.Conv2d expects (N, C, H, W) — confirm the model accounts for this.
train_mfccs = preprocess_dataset(train_x)
train_mfccs = np.array(train_mfccs)
train_mfccs = train_mfccs.reshape(-1, train_mfccs.shape[1], train_mfccs.shape[2], 1)
# NOTE(review): the disabled line below would reshape the raw test waveforms,
# not test MFCCs; no test features are computed in this cell.
#test_x = test_x.reshape(-1, test_x.shape[1], test_x.shape[2], 1)

In [62]:
# Display the final shape of the training feature array.
np.array(train_mfccs).shape

(9352, 40, 99, 1)

Custom Dataset

In [65]:
import torchvision.datasets as datasets # 데이터셋 집합체
import torchvision.transforms as transforms # 변환 툴

from torch.utils.data import DataLoader # 학습 및 배치로 모델에 넣어주기 위한 툴
from torch.utils.data import DataLoader, Dataset

class CustomDataset(Dataset):
    """Map-style dataset over a feature container and optional labels.

    Args:
        X: Indexable container of samples; ``X[index]`` yields one sample.
        y: Labels aligned with ``X`` by position. Only read when
            ``train_mode`` is true, so it may be ``None`` otherwise.
        train_mode: When true ``__getitem__`` returns ``(sample, label)``;
            when false it returns only the sample.
        transforms: Optional callable applied to each sample before it is
            returned.
    """

    def __init__(self, X, y, train_mode=True, transforms=None):
        # Store the inputs as given; nothing is copied or converted here.
        self.X = X
        self.y = y
        self.train_mode = train_mode
        self.transforms = transforms

    def __getitem__(self, index):
        """Return the ``index``-th sample, plus its label in train mode."""
        X = self.X[index]

        if self.transforms is not None:
            X = self.transforms(X)

        if not self.train_mode:
            return X

        # Always look labels up by position. Plain ``series[index]`` on a
        # pandas Series is label-based, so a Series whose index is not
        # 0..n-1 (e.g. a slice that was not reset) would raise KeyError or
        # silently return a different row.
        y = self.y.iloc[index] if hasattr(self.y, 'iloc') else self.y[index]
        return X, y

    def __len__(self):
        """Return the number of samples."""
        return len(self.X)

In [66]:
# Positional hold-out split: the first 8000 feature rows are used for
# training and the remaining rows for validation.
# NOTE(review): nothing is shuffled before this split; if the rows are
# grouped by class the validation part may not cover every class — confirm.
train_X = train_mfccs[:8000]
vali_X = train_mfccs[8000:]

In [67]:
# train_mfccs only contains the clips that survived the length filter
# (remove_top_n_percent with its default n=15), whereas train_wav still holds
# every clip. Slicing train_wav.label directly would pair features with the
# labels of different clips, so the same length cutoff is re-derived here and
# applied to the labels before splitting.
_clip_lengths = train_wav['data'].map(len)
_length_cutoff = sorted(_clip_lengths, reverse=True)[int(len(_clip_lengths) * 15 / 100)]
_kept_labels = train_wav['label'][_clip_lengths <= _length_cutoff].reset_index(drop=True)

# Fail loudly if the filter parameters ever diverge from the feature pipeline.
assert len(_kept_labels) == len(train_mfccs), (
    f"label/feature mismatch: {len(_kept_labels)} labels vs {len(train_mfccs)} feature rows"
)

train_y = _kept_labels[:8000]
vali_y = _kept_labels[8000:].reset_index(drop = True)

In [68]:
# 에포크 설정
num_epochs = 100

# 배치 사이즈 설정
batch_size = 10

#만든 train dataset를 DataLoader에 넣어 batch 만들기
train_dataset = CustomDataset(X=train_X, y=train_y)
train_loader = DataLoader(train_dataset, batch_size = batch_size, shuffle=True)

vali_dataset = CustomDataset(X=vali_X, y=vali_y)
vali_loader = DataLoader(vali_dataset, batch_size = batch_size, shuffle=False)

In [69]:
# Number of batches each loader yields per epoch.
train_batches = len(train_loader)
vali_batches = len(vali_loader)

print('/ total train batches :', train_batches)
print('/ total valid batches :', vali_batches)

/ total train batches : 800
/ total valid batches : 136
