In [None]:
import yaml
# Parse the run configuration from the YAML file next to the notebook.
with open('./config.yml', 'rb') as config_file:
    config = yaml.safe_load(config_file)

In [None]:
# Repository / checkpoint identifier handed to `from_pretrained`.
repo_name = config['repo_name']
# Test-set identifier; one of 'atr503', 'eval1', 'eval2', 'eval3', 'jdrt', 'fw07'.
name = 'fw07'

# Target token name: selects the reference column and is part of the file names.
target = config['target']
# Presumably the expected sampling rate -- TODO confirm.
sr = config['sr']
# CSV manifest of the selected test set.
test_csv = ''.join(['./datasets/test_', name, '_', target, '.csv'])
# Directory that receives the metric summary and per-utterance results.
result_path = './results'

In [None]:
# https://note.mjunya.com/posts/2021-12-13-multi-gpu-order/
import os
# os.environ["CUDA_DEVICE_ORDER"] = "PCI_BUS_ID"
os.environ["CUDA_VISIBLE_DEVICES"]=config['CUDA_VISIBLE_DEVICES']
!echo ${CUDA_VISIBLE_DEVICES}

import torch
for i in range(torch.cuda.device_count()):
    info = torch.cuda.get_device_properties(i)
    print(f"CUDA:{i} {info.name}, {info.total_memory / 1024 ** 2}MB")

print("------------------------------")
print(f"version: {torch.__version__}")
print(f"available: {torch.cuda.is_available()}")
print(f"count: {torch.cuda.device_count()}")
for i in range(0,torch.cuda.device_count()):
    print(f"GPU {i}: {torch.cuda.get_device_name(i)}")
    print(f"GPU {i}: {torch.cuda.get_device_capability(i)}")
print(f"default: {torch.cuda.current_device()}")

In [None]:
import torch.nn as nn
import torchaudio, datasets, warnings
from datasets import load_dataset, load_metric, Audio
import pandas as pd
import numpy as np
warnings.filterwarnings('ignore')

In [None]:
# Load the test manifest as a "test" split, reading only the audio path and
# the target column from the CSV.
test = datasets.load_dataset(
    "csv",
    data_files={"test": [test_csv]},
    usecols=['path', target],
    num_proc=config['num_proc'],
)

In [None]:
from transformers import Wav2Vec2ForCTC, Wav2Vec2Processor

# The processor and the CTC model are both restored from the same repository.
processor = Wav2Vec2Processor.from_pretrained(repo_name)
model = Wav2Vec2ForCTC.from_pretrained(repo_name)

In [None]:
import random
from IPython.display import display, HTML

def show_random(dataset, num_examples=10):
    """Display `num_examples` distinct, randomly chosen rows of `dataset` as an HTML table.

    Args:
        dataset: Sized object that can be indexed with a list of integer
            positions; the result is passed to `pd.DataFrame`.
        num_examples: Number of distinct rows to show.

    Raises:
        AssertionError: If `num_examples` exceeds `len(dataset)`.
    """
    assert num_examples <= len(dataset), "Can't pick more elements than there are in the dataset."
    # random.sample draws without replacement, which replaces the manual
    # "retry until unseen" loop and its repeated list-membership scans.
    picks = random.sample(range(len(dataset)), num_examples)

    df = pd.DataFrame(dataset[picks])
    display(HTML(df.to_html()))
    
# Sanity check: preview two random rows of the freshly loaded test split.
show_random(test['test'], num_examples=2)

In [None]:
def path2array(batch):
    """Load the audio file named by ``batch['path']`` and attach it to the example.

    Args:
        batch: Mapping with a 'path' entry pointing at an audio file.

    Returns:
        The same mapping with two added entries: "audio_array" (the first
        value returned by `torchaudio.load`) and "sampling_rate" (the second).
    """
    # The path is passed positionally because the keyword name of
    # torchaudio.load's first parameter differs between releases
    # (`filepath` in older versions, `uri` in newer ones).
    array, rate = torchaudio.load(batch['path'], format=config['format'])
    batch["audio_array"] = array
    batch["sampling_rate"] = rate
    return batch

In [None]:
%%time
test=test.map(path2array,num_proc=config['num_proc'])

In [None]:
# Do not rename the output columns "input_values" and "labels".

def prepare_dataset(batch):
    """Add "input_values" (from the audio) and "labels" (from the target column) to one example."""
    audio_features = processor(
        batch["audio_array"],
        sampling_rate=batch["sampling_rate"],
    )
    batch["input_values"] = audio_features.input_values[0]

    # The target text is encoded while the processor is in target-processor mode.
    with processor.as_target_processor():
        encoded_target = processor(batch[target])
    batch["labels"] = encoded_target.input_ids
    return batch

In [None]:
%%time
test=test.map(prepare_dataset,num_proc=config['num_proc'])

In [None]:
from evaluate import load

# Load the metrics through `evaluate.load`, which this cell imports; the
# previous code ignored that import and called the deprecated
# `datasets.load_metric` instead.
wer_metric = load("wer")
cer_metric = load('cer')

In [None]:
def map2result(batch):
    """Run the model on one example and store the decoded text under "hypothesis".

    Args:
        batch: Mapping with "audio_array" and "sampling_rate" entries.

    Returns:
        The same mapping with an added "hypothesis" entry holding the first
        string returned by `processor.batch_decode`.
    """
    # Use the GPU when one is usable, otherwise fall back to the CPU instead
    # of failing on a hard-coded "cuda" device.
    device = "cuda" if torch.cuda.is_available() else "cpu"
    model.to(device)
    input_values = processor(
        batch["audio_array"],
        sampling_rate=batch["sampling_rate"],
        return_tensors="pt",
    ).input_values.to(device)

    # Inference only: no gradients are needed.
    with torch.no_grad():
        logits = model(input_values).logits

    # Greedy decoding: take the argmax over the last axis of the logits.
    pred_ids = torch.argmax(logits, dim=-1)
    batch["hypothesis"] = processor.batch_decode(pred_ids)[0]

    return batch

In [None]:
def result2csv(batch):
    """Replace ``batch['path']`` with the file's base name, minus its last extension.

    The mapping is modified in place and also returned.
    """
    file_name = os.path.basename(batch['path'])
    stem, _extension = os.path.splitext(file_name)
    batch['path'] = stem
    return batch

In [None]:
%%time
result=test['test'].map(map2result)

In [None]:
# Reduce each 'path' value to the bare file name without its extension.
result = result.map(
    result2csv,
    num_proc=config['num_proc'],
)

In [None]:
# Preview five random results without the bulky array / feature columns.
hidden_columns = ['audio_array', 'labels', 'input_values', 'sampling_rate']
show_random(result.remove_columns(hidden_columns), 5)

In [None]:
# WER & CER over the whole test set, appended to a per-target summary CSV.
result_wer = wer_metric.compute(predictions=result["hypothesis"], references=result[target])
result_cer = cer_metric.compute(predictions=result["hypothesis"], references=result[target])
print("Test WER: {:.3f}".format(result_wer))
print("Test CER: {:.3f}".format(result_cer))

# Create the output directory on first use so the write below cannot fail
# just because './results' has not been created yet.
os.makedirs(result_path, exist_ok=True)
path = result_path + '/wer_' + target + '.csv'
# A missing or empty file still needs the header line.
needs_header = not os.path.isfile(path) or os.path.getsize(path) == 0

list1 = [name, target, result_wer, result_cer]
# Open once in append mode: earlier runs' rows are kept and the header and
# the new row are written through the same handle.
with open(path, mode='a') as f:
    if needs_header:
        f.write('dataset,target,wer,cer\n')
    f.write(','.join(map(str, list1)) + '\n')

In [None]:
# Export the per-utterance results as CSV with the columns "ID", "reference", "hypothesis".
for old_name, new_name in (('path', 'ID'), (target, 'reference')):
    result = result.rename_column(old_name, new_name)
result_csv = ''.join([result_path, '/result_', name, '_', target, '.csv'])
result.to_csv(result_csv, columns=['ID', 'reference', 'hypothesis'])

In [None]:
# Inspect a single test example: compare its reference with the model's
# greedy output, both decoded and as the raw per-step token sequence.
i = 1
# Use the GPU when one is usable, otherwise fall back to the CPU.
device = "cuda" if torch.cuda.is_available() else "cpu"
model.to(device)
# Fetch the row once instead of re-indexing the dataset for every field.
sample = test["test"][i]
# Read the reference from the configured target column; the previous
# hard-coded "phone" key only worked when `target` happened to be "phone".
a = sample[target]
input_values = processor(
    sample["audio_array"],
    sampling_rate=sample["sampling_rate"],
    return_tensors="pt",
).input_values.to(device)
with torch.no_grad():
    logits = model(input_values).logits
pred_ids = torch.argmax(logits, dim=-1)
# `decoded` is the processor's decoded string; `converted` is the undecoded
# token for every predicted id.
decoded = processor.decode(pred_ids[0])
converted = processor.tokenizer.convert_ids_to_tokens(pred_ids[0].tolist())
joined = " ".join(converted)

print(os.path.basename(sample["path"]))
print(f"target: {a}")
print(f"token_decode: {decoded}")
print(f"token_list: {converted}")
print(f"token_str: {joined}")
print(f"token_size: {len(converted)}")