In [None]:
!pip install librosa

In [41]:
import glob
import math
import os
import random
import shutil
import sys
import warnings

import librosa
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.nn.functional as F
from sklearn.tree import DecisionTreeClassifier
from tqdm import tqdm

warnings.filterwarnings(action='ignore')

In [2]:
# Global settings shared by the feature-extraction and modelling cells.
CFG = dict(
    SR=16000,    # sampling rate passed to librosa.load
    N_MFCC=32,   # number of MFCC coefficients requested per clip (n_mfcc)
    SEED=42,     # seed handed to seed_everything and to the classifier
)

In [29]:
# Load the competition metadata tables (paths are relative to the notebook's
# working directory). The stray double slash in the train path was removed.
train_df = pd.read_csv('./open/train.csv')
test_df = pd.read_csv('./open/test.csv')

In [30]:
# Folder locations
# Every path keeps its trailing "/" because later cells build file names by
# plain string concatenation (e.g. TRAIN_LABEL_SEP + filename).
dataset = "./open/"
TRAIN_WAV = dataset + "train/"
TEST_WAV = dataset + "test/"
PREPROCESSED = dataset + "preprocessed_data/"
TRAIN_LABEL_SEP = PREPROCESSED + "train_label_sep/"
WAV_TRAIN_LABEL_SEP = PREPROCESSED + "wav_train_label_sep/"

# makedirs(exist_ok=True) also creates missing parent directories and is safe
# to re-run, unlike the exists()/mkdir() pairs it replaces.
for _output_dir in (PREPROCESSED, TRAIN_LABEL_SEP, WAV_TRAIN_LABEL_SEP):
    os.makedirs(_output_dir, exist_ok=True)

In [31]:
# Directories that are searched for audio files, keyed by a short alias.
wav_file_dict = {
    "train_wav": TRAIN_WAV,
    "test_wav": TEST_WAV,
    "wav_sep": WAV_TRAIN_LABEL_SEP,
}

# alias -> list of *.wav paths found in that directory at the time this cell runs
wav_file_locations = {
    alias: glob.glob(folder + "*.wav")
    for alias, folder in wav_file_dict.items()
}

# Directories that are searched for CSV files, keyed by a short alias.
csv_file_dict = {
    "train_label_sep": TRAIN_LABEL_SEP,
    "wav_train_label_sep": WAV_TRAIN_LABEL_SEP,
}

# alias -> list of *.csv paths found in that directory at the time this cell runs
csv_file_location = {
    alias: glob.glob(folder + "*.csv")
    for alias, folder in csv_file_dict.items()
}

In [32]:
# Local definitions - 1

def clean_name(column_name):
    '''
    Re-root a "./"-relative path under the "./open" data directory.

    "./train/TRAIN_0000.wav" becomes "./open/train/TRAIN_0000.wav".

    The previous implementation removed every "." from the path and then
    re-inserted one in front of every "wav" substring. That corrupted file
    names containing extra dots or the letters "wav", and prefixed "./open"
    again each time it was applied. This version only swaps the leading "./"
    and is idempotent.

    Typical use:
    df["path"] = df["path"].apply(clean_name)

    Parameters
    ----------
    column_name : str
        A path string taken from a DataFrame column.

    Returns
    -------
    str
        The path prefixed with "./open/" when it started with "./"; paths
        already under "./open/" and paths not starting with "./" are
        returned unchanged.
    '''
    prefix = "./open/"
    if column_name.startswith(prefix):
        # Already re-rooted: do not prefix a second time.
        return column_name
    if column_name.startswith("./"):
        return prefix + column_name[len("./"):]
    return column_name

In [33]:
# Transformer

class TransformerModel(nn.Module):
    """Transformer-encoder classifier over a sequence of feature vectors.

    Pipeline: linear embedding -> positional encoding -> stack of
    ``nn.TransformerEncoderLayer`` -> mean over the sequence axis -> linear
    head -> softmax.

    Args:
        input_dim: size of each input feature vector.
        output_dim: number of output classes.
        num_heads: attention heads per encoder layer.
        num_layers: number of stacked encoder layers.
        hidden_dim: embedding width; also used as ``dim_feedforward``.
        dropout: dropout rate inside the encoder layers.
    """

    def __init__(self, input_dim, output_dim, num_heads, num_layers, hidden_dim, dropout):
        super(TransformerModel, self).__init__()

        self.embedding = nn.Linear(input_dim, hidden_dim)
        self.pos_encoding = PositionalEncoding(hidden_dim)

        self.transformer_encoder = nn.TransformerEncoder(
            nn.TransformerEncoderLayer(hidden_dim, num_heads, dim_feedforward=hidden_dim, dropout=dropout),
            num_layers
        )

        self.fc = nn.Linear(hidden_dim, output_dim)

    def forward(self, x):
        """Classify a batch of sequences.

        Args:
            x: tensor laid out as (batch_size, seq_len, input_dim).

        Returns:
            Tensor of shape (batch_size, output_dim) holding class
            probabilities (softmax over dim 1), not raw logits.
        """
        x = self.embedding(x)

        # Switch to (seq_len, batch_size, hidden_dim) BEFORE adding positions:
        # the positional encoding indexes its table by x.size(0), so dim 0 must
        # be the sequence axis. Encoding first would tie the encoding to the
        # sample's index in the batch instead of the time step.
        x = x.permute(1, 0, 2)
        x = self.pos_encoding(x)
        x = self.transformer_encoder(x)

        x = x.permute(1, 0, 2)  # back to (batch_size, seq_len, hidden_dim)
        x = torch.mean(x, dim=1)  # average pooling over the sequence axis

        x = self.fc(x)
        return F.softmax(x, dim=1)

# positional encoding

class PositionalEncoding(nn.Module):
    """Adds fixed sinusoidal position signals to a sequence-first tensor.

    A table ``pe`` of shape (max_len, 1, hidden_dim) is built once and stored
    as a buffer: even feature columns hold sines, odd columns hold cosines.

    Args:
        hidden_dim: feature width of the tensors to be encoded (odd widths
            are supported; the last sine column then has no cosine partner).
        max_len: number of positions precomputed in the table.
    """

    def __init__(self, hidden_dim, max_len=5000):
        super(PositionalEncoding, self).__init__()

        pe = torch.zeros(max_len, hidden_dim)
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, hidden_dim, 2).float() * (-math.log(10000.0) / hidden_dim))
        angles = position * div_term
        pe[:, 0::2] = torch.sin(angles)
        # With an odd hidden_dim there is one fewer odd column than even
        # column, so trim the cosine block to the columns that exist.
        pe[:, 1::2] = torch.cos(angles[:, : hidden_dim // 2])
        pe = pe.unsqueeze(1)  # (max_len, 1, hidden_dim): broadcasts over dim 1

        self.register_buffer('pe', pe)

    def forward(self, x):
        """Return ``x`` plus the encodings of its first ``x.size(0)`` positions.

        ``x`` is indexed along dim 0 as the position axis, i.e. it is expected
        as (seq_len, batch_size, hidden_dim).
        """
        x = x + self.pe[:x.size(0), :]
        return x

In [34]:
def seed_everything(seed):
    """Seed every random number generator this notebook relies on.

    Seeds Python's ``random``, NumPy's global generator and PyTorch, and sets
    ``PYTHONHASHSEED`` in the process environment.

    Args:
        seed: integer seed shared by all generators.
    """
    random.seed(seed)
    # Exported for child processes; the hash seed of the already-running
    # interpreter is fixed at start-up.
    os.environ['PYTHONHASHSEED'] = str(seed)
    np.random.seed(seed)
    # torch was previously left unseeded, so model initialisation and dropout
    # were not reproducible.
    torch.manual_seed(seed)

# Seed the random number generators with the configured seed.
seed_everything(CFG['SEED'])

In [35]:
def get_mfcc_feature(df):
    """Build a table of per-clip mean MFCC values.

    Each path in ``df['path']`` is loaded with librosa at ``CFG['SR']`` and
    ``CFG['N_MFCC']`` MFCCs are computed for it. Every row of the resulting
    MFCC array is reduced to its mean, giving one feature row per clip.

    Args:
        df: DataFrame with a ``path`` column of audio file paths.

    Returns:
        DataFrame with one row per path and columns ``mfcc_1`` ..
        ``mfcc_<N_MFCC>``.
    """
    column_names = ['mfcc_' + str(i) for i in range(1, CFG['N_MFCC'] + 1)]

    rows = []
    for wav_path in tqdm(df['path']):
        # Load the wav file with librosa.
        signal, rate = librosa.load(wav_path, sr=CFG['SR'])
        # Extract the MFCCs with librosa.
        coefficients = librosa.feature.mfcc(y=signal, sr=rate, n_mfcc=CFG['N_MFCC'])
        # Use the mean of each extracted MFCC row as the feature.
        rows.append([np.mean(track) for track in coefficients])

    return pd.DataFrame(rows, columns=column_names)


In [36]:
# Display the training metadata as loaded (columns: id, path, label).
train_df

Unnamed: 0,id,path,label
0,TRAIN_0000,./train/TRAIN_0000.wav,1
1,TRAIN_0001,./train/TRAIN_0001.wav,2
2,TRAIN_0002,./train/TRAIN_0002.wav,4
3,TRAIN_0003,./train/TRAIN_0003.wav,5
4,TRAIN_0004,./train/TRAIN_0004.wav,4
...,...,...,...
4996,TRAIN_4996,./train/TRAIN_4996.wav,5
4997,TRAIN_4997,./train/TRAIN_4997.wav,0
4998,TRAIN_4998,./train/TRAIN_4998.wav,1
4999,TRAIN_4999,./train/TRAIN_4999.wav,1


In [37]:
# Rewrite the wav paths of both metadata tables with clean_name, then show
# the two tables.
for _frame in (train_df, test_df):
    _frame['path'] = _frame['path'].apply(clean_name)
train_df, test_df

(              id                         path  label
 0     TRAIN_0000  ./open/train/TRAIN_0000.wav      1
 1     TRAIN_0001  ./open/train/TRAIN_0001.wav      2
 2     TRAIN_0002  ./open/train/TRAIN_0002.wav      4
 3     TRAIN_0003  ./open/train/TRAIN_0003.wav      5
 4     TRAIN_0004  ./open/train/TRAIN_0004.wav      4
 ...          ...                          ...    ...
 4996  TRAIN_4996  ./open/train/TRAIN_4996.wav      5
 4997  TRAIN_4997  ./open/train/TRAIN_4997.wav      0
 4998  TRAIN_4998  ./open/train/TRAIN_4998.wav      1
 4999  TRAIN_4999  ./open/train/TRAIN_4999.wav      1
 5000  TRAIN_5000  ./open/train/TRAIN_5000.wav      4
 
 [5001 rows x 3 columns],
              id                       path
 0     TEST_0000  ./open/test/TEST_0000.wav
 1     TEST_0001  ./open/test/TEST_0001.wav
 2     TEST_0002  ./open/test/TEST_0002.wav
 3     TEST_0003  ./open/test/TEST_0003.wav
 4     TEST_0004  ./open/test/TEST_0004.wav
 ...         ...                        ...
 1876  TEST_1876

In [38]:
# Write one CSV per class label into TRAIN_LABEL_SEP as label_<label>.csv.
#
# train_df['path'] was already passed through clean_name in an earlier cell,
# so the rows are written as they are. The former second pass re-read the files
# listed in csv_file_location["train_label_sep"] and applied clean_name again.
# That list is globbed before these files exist (empty on a fresh run), and on
# a re-run clean_name would prefix "./open" a second time.
for label in train_df['label'].unique():
    rows_for_label = train_df[train_df['label'] == label]
    rows_for_label.to_csv(os.path.join(TRAIN_LABEL_SEP, f"label_{label}.csv"), index=False)
    

In [None]:
# For every class label, compute the flattened MFCC matrix of each of its
# training clips and save the resulting table as
# WAV_TRAIN_LABEL_SEP/label_<label>.csv (index = clip name, last value of each
# row = label).
parent_folder = TRAIN_WAV

for label in tqdm(train_df['label'].unique(), desc='Processing folders'):
    filtered_csv = pd.read_csv(os.path.join(TRAIN_LABEL_SEP, f"label_{label}.csv"))

    # Collect the one-row frames and concatenate once per label; growing the
    # frame with pd.concat inside the loop re-copies all earlier rows each time.
    rows = []
    for path in filtered_csv['path']:
        wav_filename = os.path.basename(path)
        # NOTE(review): no sr= / n_mfcc= is passed here, so librosa's defaults
        # are used rather than CFG['SR'] / CFG['N_MFCC'] as in
        # get_mfcc_feature - confirm this is intended.
        audio, sr = librosa.load(os.path.join(parent_folder, wav_filename))
        mfcc = librosa.feature.mfcc(y=audio, sr=sr)
        # NOTE(review): the flattened length presumably depends on the clip
        # duration, in which case rows differ in width and the trailing label
        # lands in different columns - verify before using these CSVs.
        rows.append(
            pd.DataFrame(
                [list(mfcc.flatten()) + [label]],
                index=[wav_filename.split(".")[0]],
            )
        )

    # pd.concat rejects an empty list, so fall back to an empty frame.
    features_df = pd.concat(rows) if rows else pd.DataFrame()
    features_df = features_df.sort_index()
    features_df.to_csv(WAV_TRAIN_LABEL_SEP + f"label_{label}.csv")

In [39]:
# NOTE: this cell redefines get_mfcc_feature, which is already defined in an
# earlier cell of this notebook.
def get_mfcc_feature(df):
    """Build a table of per-clip mean MFCC values.

    Each path in ``df['path']`` is loaded with librosa at ``CFG['SR']`` and
    ``CFG['N_MFCC']`` MFCCs are computed for it. Every row of the resulting
    MFCC array is reduced to its mean, giving one feature row per clip.

    Args:
        df: DataFrame with a ``path`` column of audio file paths.

    Returns:
        DataFrame with one row per path and columns ``mfcc_1`` ..
        ``mfcc_<N_MFCC>``.
    """
    column_names = ['mfcc_' + str(i) for i in range(1, CFG['N_MFCC'] + 1)]

    rows = []
    for wav_path in tqdm(df['path']):
        # Load the wav file with librosa.
        signal, rate = librosa.load(wav_path, sr=CFG['SR'])
        # Extract the MFCCs with librosa.
        coefficients = librosa.feature.mfcc(y=signal, sr=rate, n_mfcc=CFG['N_MFCC'])
        # Use the mean of each extracted MFCC row as the feature.
        rows.append([np.mean(track) for track in coefficients])

    return pd.DataFrame(rows, columns=column_names)

In [40]:
# Extract the mean-MFCC feature tables for the training and test clips.
train_x = get_mfcc_feature(train_df)
test_x = get_mfcc_feature(test_df)

100%|██████████████████████████████████████| 5001/5001 [00:40<00:00, 123.23it/s]
100%|██████████████████████████████████████| 1881/1881 [00:15<00:00, 118.93it/s]


# Inspect the per-label MFCC tables stored in WAV_TRAIN_LABEL_SEP.
# The directory is globbed here instead of reading csv_file_location, which is
# filled before those CSVs are written and can therefore be empty or stale.
label_feature_files = sorted(glob.glob(WAV_TRAIN_LABEL_SEP + "*.csv"))

# Column dtypes and non-null counts of every table.
for file in label_feature_files:
    df = pd.read_csv(file)
    df.info()
    print("*"*10)

# Summary statistics of every table.
for file in label_feature_files:
    df = pd.read_csv(file)
    print(df.describe())
    print("*"*10)

In [42]:
# Training target: the label column of the training metadata.
train_y = train_df['label']

In [45]:
# Baseline classifier: a decision tree with random_state taken from
# CFG['SEED'], fitted on the mean-MFCC features.
model = DecisionTreeClassifier(random_state=CFG['SEED'])
model.fit(train_x, train_y)

In [46]:
# Predict a label for every row of the test feature table.
preds = model.predict(test_x)

In [48]:
# Put the predictions into the sample submission's label column and save the
# result in the dataset folder.
submission_template = os.path.join(dataset, 'sample_submission.csv')
submission = pd.read_csv(submission_template)
submission['label'] = preds
submission.to_csv(os.path.join(dataset, "baseline_submission.csv"), index=False)