<a href="https://colab.research.google.com/github/whkwls2653/Emotion-Recognition/blob/main/wav2vec.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
from google.colab import drive
drive.mount('/content/gdrive')

Mounted at /content/gdrive


from sklearn.model_selection import train_test_split
                                                         
dataset_train, dataset_test = train_test_split(allfile_datalist, test_size=0.25, random_state=0)
print(len(dataset_train))
print(len(dataset_test))

In [None]:
!pip install transformers==4.11.3
!pip install torchaudio
!pip install librosa
!pip install jiwer
!pip install dataset

In [3]:
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional, Union

import torch
import ast
from transformers import Wav2Vec2Processor
from torch.nn.utils.rnn import pad_sequence

@dataclass
class DataCollatorCTCWithPadding:
    processor: Wav2Vec2Processor.from_pretrained("hyyoka/wav2vec2-xlsr-korean-senior", pad_token_id=49)
    padding: Union[bool, str] = True
    max_length: Optional[int] = None
    max_length_labels: Optional[int] = None
    pad_to_multiple_of: Optional[int] = None
    pad_to_multiple_of_labels: Optional[int] = None

    def mapping(self,data):
        with self.processor.as_target_processor():
            ret = self.processor("".join([i if i!='\x1b' else '|' for i in ast.literal_eval(data)])).input_ids
            ret_torch = torch.tensor([int(0 if value is None else value) for value in ret])
        return ret_torch

    def __call__(self, features: List[Dict[str, Union[List[int], torch.Tensor]]]) -> Dict[str, torch.Tensor]:
        # split inputs and labels since they have to be of different lenghts and need
        # different padding methods
        input_features = [{"input_values": feature['input_values']} for feature in features]
        # e.g. feature['label'] = "„Öá„Öè„Ñ¥„Ñ¥„Öï„Öá„Öé„Öè„ÖÖ„Öî„Öá„Öõ"
        label_features = [{"input_ids": self.mapping(feature["label"])} for feature in features]

        batch = self.processor.pad(
            input_features,
            padding=self.padding,
            max_length=self.max_length,
            pad_to_multiple_of=self.pad_to_multiple_of,
            return_tensors="pt",
        )
        with self.processor.as_target_processor():
            labels_batch = self.processor.pad(
                label_features,
                padding=self.padding,
                max_length=self.max_length_labels,
                pad_to_multiple_of=self.pad_to_multiple_of_labels,
                return_tensors="pt",
            )

        # replace padding with -100 to ignore loss correctly
        labels = labels_batch["input_ids"].masked_fill(labels_batch.attention_mask.ne(1), -100)
        batch["labels"] = labels

        return batch


# ÏûêÏÜå Îã®ÏúÑÎ°ú ÎÇòÎàÑÏñ¥ÏßÄÏßÄ ÏïäÏùÄ Í≤ΩÏö∞ ÏÇ¨Ïö© -> ÌïúÎ≤à ÌïòÎ©¥ Ï†ÄÏû•Ìï¥ÎÜìÍ∏∞
class DataProc:
    def __init__(self, model_name="hyyoka/wav2vec2-xlsr-korean-senior"):
        self.processor = Wav2Vec2Processor.from_pretrained(model_name, pad_token_id=49)

    def to_jaso(self, sentence):
        NO_JONGSUNG = ''
        CHOSUNGS = ['„Ñ±', '„Ñ≤', '„Ñ¥', '„Ñ∑', '„Ñ∏', '„Ñπ', '„ÖÅ', '„ÖÇ', '„ÖÉ', '„ÖÖ', '„ÖÜ', '„Öá', '„Öà', '„Öâ', '„Öä', '„Öã', '„Öå', '„Öç', '„Öé']
        JOONGSUNGS = ['„Öè', '„Öê', '„Öë', '„Öí', '„Öì', '„Öî', '„Öï', '„Öñ', '„Öó', '„Öò', '„Öô', '„Öö', '„Öõ', '„Öú', '„Öù', '„Öû', '„Öü', '„Ö†', '„Ö°', '„Ö¢', '„Ö£']
        JONGSUNGS = [NO_JONGSUNG,  '„Ñ±', '„Ñ≤', '„Ñ≥', '„Ñ¥', '„Ñµ', '„Ñ∂', '„Ñ∑', '„Ñπ', '„Ñ∫', '„Ñª', '„Ñº', '„ÑΩ', '„Ñæ', '„Ñø', '„ÖÄ', '„ÖÅ', '„ÖÇ', '„ÖÑ', '„ÖÖ', '„ÖÜ', '„Öá', '„Öà', '„Öä', '„Öã', '„Öå', '„Öç', '„Öé']

        N_CHOSUNGS, N_JOONGSUNGS, N_JONGSUNGS = 19, 21, 28
        FIRST_HANGUL, LAST_HANGUL = 0xAC00, 0xD7A3 #'Í∞Ä', 'Ìû£'    
     
        result = []
        for char in sentence:
            if ord(char) < FIRST_HANGUL or ord(char) > LAST_HANGUL: 
                result.append('|')
            else:          
                code = ord(char) - FIRST_HANGUL
                jongsung_index = code % N_JONGSUNGS
                code //= N_JONGSUNGS
                joongsung_index = code % N_JOONGSUNGS
                code //= N_JOONGSUNGS
                chosung_index = code
                result.append(CHOSUNGS[chosung_index])
                result.append(JOONGSUNGS[joongsung_index])
                if jongsung_index!=0:
                    result.append(JONGSUNGS[jongsung_index])
                
        with self.processor.as_target_processor():
            ret = self.processor("".join(result))
        
        return ret.input_ids

    def prepare_dataset(self, df):
        """
        df.cols = ['audio', 'sentence', 'path']
        """
        df['label'] = df['sentence'].apply(self.to_jaso)
        return df

Downloading:   0%|          | 0.00/215 [00:00<?, ?B/s]

Downloading:   0%|          | 0.00/542 [00:00<?, ?B/s]

Downloading:   0%|          | 0.00/309 [00:00<?, ?B/s]

Downloading:   0%|          | 0.00/36.0 [00:00<?, ?B/s]

Downloading:   0%|          | 0.00/407 [00:00<?, ?B/s]

In [10]:
!pip install datasets
# import datasets
from datasets import load_metric, load_from_disk
# from data_proc import  get_senior_data
# from data_collator import DataCollatorCTCWithPadding
from transformers import Wav2Vec2ForCTC, Wav2Vec2Processor
from transformers import TrainingArguments, Trainer
from tqdm import tqdm
import torch
import ast

import numpy as np
# import os
# os.environ['CUDA_LAUNCH_BLOCKING']='1'
# os.environ['CUDA_VISIBLE_DEVICES'] = "2,3"

repo_name = 'wav2vec2-xlsr-korean-senior'

processor = Wav2Vec2Processor.from_pretrained("hyyoka/wav2vec2-xlsr-korean-senior")
wer_metric = load_metric("wer")
data_collator = DataCollatorCTCWithPadding(processor=processor, padding=True)

def compute_metrics(pred):
    pred_logits = pred.predictions
    pred_ids = np.argmax(pred_logits, axis=-1)
    pred.label_ids[pred.label_ids == -100] = 49 # as fleek model has 2 pad tokens
    pred_str = processor.batch_decode(pred_ids)
    label_str = processor.batch_decode(pred.label_ids, group_tokens=False)

    wer = wer_metric.compute(predictions=pred_str, references=label_str)

    return {"wer": wer}

def get_model(model_name="hyyoka/wav2vec2-xlsr-korean-senior"): 
    model = Wav2Vec2ForCTC.from_pretrained(
        model_name, 
        attention_dropout=0.1,
        hidden_dropout=0.1,
        feat_proj_dropout=0.0,
        mask_time_prob=0.05,
        layerdrop=0.1,
        ctc_loss_reduction="mean", 
        pad_token_id=49,
        vocab_size=50,
        ignore_mismatched_sizes=True
    )

    model.freeze_feature_extractor()
    model.gradient_checkpointing_enable()
    return model

def train_model(train, test, model):
    training_args = TrainingArguments(
        output_dir=repo_name,
        group_by_length=True,
        per_device_train_batch_size=32,
        per_device_eval_batch_size=32,
        gradient_accumulation_steps=4,
        evaluation_strategy="steps",
        num_train_epochs=30,
        fp16=True,
        save_steps=300,
        eval_steps=300,
        logging_steps=50,
        learning_rate=3e-4,
        warmup_steps=300,
        save_total_limit=1,
        load_best_model_at_end=True,
        metric_for_best_model='eval_loss',
        dataloader_num_workers=6
        )

    trainer = Trainer(
        model=model,
        args=training_args,
        compute_metrics=compute_metrics,
        train_dataset=train,
        eval_dataset=test,
        tokenizer=processor.feature_extractor,
        data_collator=data_collator
    )
    print(f"build trainer on device {training_args.device} with {training_args.n_gpu} gpus")
    trainer.train()

if __name__ == "__main__":
    
    # torch.cuda.empty_cache()
    
    dataset = load_from_disk('./elders_dataset')
    dataset = dataset.remove_columns(['wav', 'text', 'labels'])

    train = dataset['train'] #.select([i for i in range(0,5000)])
    test =  dataset['valid'] #.select([i for i in range(0,5000)])

    model = get_model()
    train_model(train, test, model)

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


  wer_metric = load_metric("wer")


Downloading builder script:   0%|          | 0.00/1.90k [00:00<?, ?B/s]

FileNotFoundError: ignored

In [None]:
annotation_root= '/content/gdrive/MyDrive/·ÑÄ·Ö°·Ü∑·Ñå·Ö•·Üº·Ñã·Öµ·Ü´·Ñâ·Öµ·Ü® ·ÑÉ·Ö¢·Ñí·Ö¨/·ÑÉ·Ö¶·Ñã·Öµ·Ñê·Ö•·Ñâ·Ö¶·Ü∫/KEMDy19/annotation'
txt_label_zip_toteval_root='/content/gdrive/MyDrive/·ÑÄ·Ö°·Ü∑·Ñå·Ö•·Üº·Ñã·Öµ·Ü´·Ñâ·Öµ·Ü® ·ÑÉ·Ö¢·Ñí·Ö¨/·ÑÉ·Ö¶·Ñã·Öµ·Ñê·Ö•·Ñâ·Ö¶·Ü∫/2019txt_label_zip_toteval.pkl'

ambig_annotation=0
no_file=0
if os.path.isfile(txt_label_zip_toteval_root):
  with open(txt_label_zip_toteval_root,'rb') as f:
    allfile_datalist=pickle.load(f)
else:
  allfile_datalist=[]
  for i, annotation in tqdm(enumerate(os.listdir(annotation_root))):
    all_txts=[]
    # print('annotaion file : (%d / %d)'%(i,len(os.listdir(annotation_root))))
    session_file = pd.read_csv(os.path.join(annotation_root,annotation)) 
    # print(session_file) 
    segments=session_file['Segment ID']
    # print(segments)
    
    for j in range(1,len(segments)):

      #index starts from 1
      f_name=segments[j]
      
      # print("f_name:",f_name)
      
      sess_num=f_name.split('_')[0][-2:]
      sc_pro_num=f_name.split('_')[0]+'_'+f_name.split('_')[1]
      # print(sess_num,sc_pro_num)
      
    #  /content/gdrive/MyDrive/·ÑÄ·Ö°·Ü∑·Ñå·Ö•·Üº·Ñã·Öµ·Ü´·Ñâ·Öµ·Ü® ·ÑÉ·Ö¢·Ñí·Ö¨/·ÑÉ·Ö¶·Ñã·Öµ·Ñê·Ö•·Ñâ·Ö¶·Ü∫/KEMDy19/wav/Session01/Sess01_script01/Sess01_script01_F001.txt
      file_loc='/content/gdrive/MyDrive/·ÑÄ·Ö°·Ü∑·Ñå·Ö•·Üº·Ñã·Öµ·Ü´·Ñâ·Öµ·Ü® ·ÑÉ·Ö¢·Ñí·Ö¨/·ÑÉ·Ö¶·Ñã·Öµ·Ñê·Ö•·Ñâ·Ö¶·Ü∫/KEMDy19/wav/Session%s/%s/%s.txt'%(sess_num,sc_pro_num,f_name)
      # print(file_loc)
      if not os.path.isfile(file_loc):
        print("no file location :",file_loc)
        no_file+=1
        continue
      with open(file_loc) as f:
        lines = f.readlines()
        all_txts.append(lines)



    session_file.loc[(session_file['Total Evaluation'] == "fear"), 'Total Evaluation'] = 0  #Í≥µÌè¨ => 0
    session_file.loc[(session_file['Total Evaluation'] == "surprise"), 'Total Evaluation'] = 1  #ÎÜÄÎûå => 1
    session_file.loc[(session_file['Total Evaluation'] == "angry"), 'Total Evaluation'] = 2  #Î∂ÑÎÖ∏ => 2
    session_file.loc[(session_file['Total Evaluation'] == "sad"), 'Total Evaluation'] = 3  #Ïä¨Ìîî => 3
    session_file.loc[(session_file['Total Evaluation'] == "neutral"), 'Total Evaluation'] = 4  #Ï§ëÎ¶Ω => 4
    session_file.loc[(session_file['Total Evaluation'] == "happy"), 'Total Evaluation'] = 5  #ÌñâÎ≥µ => 5
    session_file.loc[(session_file['Total Evaluation'] == "disgust"), 'Total Evaluation'] = 6  #ÌòêÏò§ => 6



    
    # print(len(all_txts))
    ## zip txts and labels,  session_file start from 1
    for q, label in zip(all_txts, session_file['Total Evaluation'][1:])  :
      if type(label) is not int :
        ambig_annotation+=1
        continue
      data = []
      data.append(q[0])
      # print('Q',type(q),q[0])
      # print('label',label)
      data.append(str(label))
      allfile_datalist.append(data)
    # print(data_list)
    # allfile_datalist.append(data_list)



  with open(txt_label_zip_toteval_root,'wb') as f:
    pickle.dump(allfile_datalist,f)
# print("data_list length :",len(data_list))
# print("sample : ",data_list[0])
  
# print(len(all_txts))