### Prepare DALI for the fine-tuning task

### Extract Vocals

In [None]:
import DALI as dali_code
import os
import torch
from lsync.voice_extractor import VoiceExtractor
from lsync.config import ORIGINAL_SR, TARGET_SR
from lsync.util import save_audio, save_audio_file
from tqdm.notebook import tqdm


# Input directory that is scanned for the song audio files.
path_audio = os.path.abspath('dataset/DALI/audio')
# Output directory that receives the extracted vocal tracks.
path_vocals = os.path.abspath("dataset/DALI/processed_audio")
# Last expression of the cell: shows whether torch can see a CUDA device.
torch.cuda.is_available()

In [None]:
# Make sure the output directory exists; os.listdir would otherwise raise
# FileNotFoundError on a first run.
os.makedirs(path_vocals, exist_ok=True)

# Base names (without extension) already present in the output directory.
# Those songs are skipped, so an interrupted run can simply be restarted.
converted = {os.path.splitext(x)[0] for x in os.listdir(path_vocals)}
ve = VoiceExtractor()

for fname in tqdm(os.listdir(path_audio)):
    audio_name, ext = os.path.splitext(fname)
    # Only .mp3 files that have not been converted yet are processed.
    if ext != '.mp3' or audio_name in converted:
        continue
    file_path = os.path.join(path_audio, fname)
    try:
        vocals = ve.extract_voice(file_path, post_process=True)
        save_audio(vocals, audio_name, sr=TARGET_SR, out_path=path_vocals)
    except Exception as e:
        # Best effort: report which file failed and carry on with the rest.
        print(f"{fname}: {e}")
        continue

### Make a Speech Recognition Dataset for Fine-tuning

#### Load DALI annotation

In [None]:
import DALI as dali_code
import os
from lsync.util import save_audio_file
import pandas as pd
import librosa

# Root folder of the DALI v1 annotation files.
dali_data_path = os.path.abspath('dataset/DALI/v1')
# Folder with the song audio files.
path_audio = os.path.abspath('dataset/DALI/audio')
# Load the annotations; later cells index the result by song id.
# NOTE(review): empty skip/keep lists presumably mean "load everything" — confirm against the DALI docs.
dali_data = dali_code.get_the_DALI_dataset(dali_data_path, skip=[], keep=[])
# Dataset info read from the gzip file inside the annotation folder.
dali_info = dali_code.get_info(dali_data_path + '/info/DALI_DATA_INFO.gz')
# Output folder for the per-line audio segments.
path_segments = os.path.abspath("dataset/DALI/segmented_audio")
# CSV file that receives the line-level table (id, segment_index, text).
csv_path = os.path.abspath('dali_lines.csv')

#### Make the lyrics line-level dataset

In [None]:
# Song ids that have an extracted vocals track: base names of the .wav files
# in path_vocals. Other directory entries (hidden files, stray folders) are
# ignored, because make_dataset loads "<id>.wav" for every id listed here.
processed_songs = [
    name
    for name, ext in map(os.path.splitext, os.listdir(path_vocals))
    if ext == '.wav'
]

def extract_audio_segment(audio, time_seg, save_path, sr = TARGET_SR):
    """Cut the ``time_seg`` window out of ``audio``, save it and return it.

    Args:
        audio: Sliceable sequence of samples (in this notebook, the array
            returned by ``librosa.load``).
        time_seg: ``(start, end)`` pair, in seconds.
        save_path: Destination passed unchanged to ``save_audio_file``.
        sr: Sample rate used to convert seconds into sample indices; it is
            also forwarded to ``save_audio_file``.

    Returns:
        The extracted slice of ``audio``.
    """
    start_t, end_t = time_seg
    # Clamp at zero: a negative index would be interpreted relative to the
    # end of the array and silently select the wrong part of the audio.
    start_idx = max(0, int(start_t * sr))
    end_idx = max(0, int(end_t * sr))
    segment = audio[start_idx:end_idx]
    save_audio_file(segment, save_path, sr=sr)
    return segment

def get_segment_fname(id: str, segment_index: int) -> str:
    """Return the base file name of a segment: ``<id>_<segment_index>``."""
    return "{}_{}".format(id, segment_index)

def make_dataset():
    """Split every processed song into per-line audio segments.

    For each id in ``processed_songs`` the vocals file ``<id>.wav`` is loaded
    from ``path_vocals`` at ``TARGET_SR``. Each entry of the song's ``lines``
    annotation is then cut out with ``extract_audio_segment`` and stored
    under ``path_segments``.

    Returns:
        A DataFrame with one row per lyrics line and the columns
        ``id``, ``segment_index`` and ``text``.
    """
    rows = []
    for song_id in tqdm(processed_songs):
        # Vocals track of the song
        vocals_file = os.path.join(path_vocals, f"{song_id}.wav")
        audio, _ = librosa.load(vocals_file, sr=TARGET_SR)
        # Line-level annotations of the song
        song_lines = dali_data[song_id].annotations['annot']['lines']
        for line_no, song_line in enumerate(song_lines):
            # Write the segment to disk, then record its transcript
            target = os.path.join(path_segments, get_segment_fname(song_id, line_no))
            extract_audio_segment(audio, song_line['time'], save_path=target)
            rows.append((song_id, line_no, song_line['text']))

    return pd.DataFrame(rows, columns=['id', 'segment_index', 'text'])

# Build the line-level table, write it to csv_path and preview the first rows.
df = make_dataset()
df.to_csv(csv_path, index=False)
df.head(3)

### Make training dataset & data cleaning

In [None]:
from num2words import num2words
import re
import numpy as np

# Characters kept verbatim by clean_lyrics: ASCII letters, space and apostrophe.
allowed_chars = set('abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ \'')

def clean_lyrics(lyrics: str) -> str:
    """Normalise one lyrics line for use as a transcript.

    Every run of digits is replaced by its ``num2words`` spelling, hyphens
    are turned into spaces, and any character that is not in
    ``allowed_chars`` is removed.

    Args:
        lyrics: Raw lyrics text.

    Returns:
        The cleaned text; it may be empty.
    """
    # Spell out each number where it occurs. Substituting match by match keeps
    # the numbers independent of each other: a global str.replace per number
    # would also rewrite digits inside other numbers (replacing "1" first
    # turns "10" into "one0").
    spelled = re.sub(r'\d+', lambda m: num2words(int(m.group())), lyrics)

    # Hyphens (including those in spelled-out numbers such as "twenty-one")
    # become spaces; every other character outside allowed_chars is dropped.
    return "".join(
        ' ' if char == '-' else char
        for char in spelled
        if char == '-' or char in allowed_chars
    )

dataset_csv_path = "dataset.csv"
def transform_dataset(csv_path, path_segments, lang='english',
                      min_duration=0.15, max_duration=3.1):
    """Turn the line-level CSV into a cleaned ``path``/``text`` training table.

    A row is discarded when
      * the ``language`` in the song's DALI metadata differs from ``lang``,
      * its text is missing or empty,
      * the duration of its audio segment, after trimming silence with
        ``top_db=30``, lies outside ``[min_duration, max_duration]``, or
      * its text is empty or whitespace-only after ``clean_lyrics``.

    Args:
        csv_path: CSV with the columns ``id``, ``segment_index`` and ``text``.
        path_segments: Folder containing the ``<id>_<segment_index>.wav`` files.
        lang: Language value a song's metadata must have to be kept.
        min_duration: Shortest accepted trimmed duration, in seconds.
        max_duration: Longest accepted trimmed duration, in seconds. The
            original note on this limit reads "CUDA out of memory".

    Returns:
        A DataFrame with the columns ``text`` and ``path``.
    """
    df = pd.read_csv(csv_path)
    # Built column-wise so that an empty table is handled as well.
    df['path'] = [
        f"{path_segments}/{song_id}_{seg_idx}.wav"
        for song_id, seg_idx in zip(df['id'], df['segment_index'])
    ]
    delete_idx = []
    for idx, row in tqdm(df.iterrows(), total=len(df)):
        song_id = row['id']
        # Get annotations
        entry = dali_data[song_id]
        meta = entry.info['metadata']
        song_lang = meta['language']
        # Skip different lang
        if song_lang != lang:
            delete_idx.append(idx)
            continue
        # Clean invalid data (missing text is read back from the CSV as NaN)
        if not isinstance(row['text'], str) or len(row['text']) == 0:
            delete_idx.append(idx)
            continue
        # Check for audio validity on the silence-trimmed signal
        y, sr = librosa.load(row['path'], sr=TARGET_SR)
        yt, _ = librosa.effects.trim(y, top_db=30)
        duration = librosa.get_duration(y=yt, sr=sr)
        if duration > max_duration or duration < min_duration:
            delete_idx.append(idx)
            continue

    df = df.drop(delete_idx)
    df = df.drop(['id', 'segment_index'], axis=1)

    # Text cleaning
    df['text'] = df['text'].map(clean_lyrics)
    # Cleaning can leave nothing but whitespace (e.g. a line of "- -");
    # such rows carry no transcript and are dropped together with empty ones.
    df = df.replace(r'^\s*$', np.nan, regex=True)
    df = df.dropna()
    return df

# Filter and clean the line-level CSV, save the result as the training CSV
# and preview the first rows.
df = transform_dataset(csv_path, path_segments)
df.to_csv(dataset_csv_path, index=False)
df.head(10)

# Finetune

### Load dataset

In [None]:
from datasets import load_dataset, Audio, Dataset

# Training CSV written by the data-cleaning step; later cells read its
# "text" and "path" columns.
dataset_csv_path = "dataset.csv"
dataset = Dataset.from_csv(dataset_csv_path)

### Sample of dataset loaded

In [None]:
import IPython.display as ipd
import numpy as np
import random
import librosa

# Random valid index into the dataset. randrange excludes the upper bound,
# whereas randint(0, len(dataset)) could return len(dataset) and make the
# lookup below fail with an IndexError.
rand_int = random.randrange(len(dataset))

def audio_processing(fp):
    """Load the audio file ``fp`` at ``TARGET_SR`` and return only its samples."""
    return librosa.load(fp, sr=TARGET_SR)[0]

# Show the transcript of the random sample and play its audio.
print(dataset[rand_int]["text"])
# The samples are loaded at TARGET_SR, so playback must use the same rate;
# a hard-coded rate plays at the wrong speed whenever TARGET_SR differs from it.
ipd.Audio(data=audio_processing(dataset[rand_int]["path"]), autoplay=True, rate=TARGET_SR)


### Data cleaning

In [None]:
import re
# Punctuation removed from transcripts. Written as a raw string so the
# backslashes reach the regex engine as typed instead of being parsed as
# (invalid) string escape sequences, which triggers a warning.
chars_to_ignore_regex = r'[\,\?\.\!\-\;\:\"]'


def remove_special_characters(batch):
    """Strip punctuation from ``batch["text"]`` and lower-case it.

    Args:
        batch: Mapping with a string under the key ``"text"``; it is
            modified in place.

    Returns:
        The same ``batch`` object, with the updated ``"text"`` value.
    """
    batch["text"] = re.sub(chars_to_ignore_regex, '', batch["text"]).lower()
    return batch

# Apply the punctuation stripping and lower-casing to the "text" of every example.
dataset = dataset.map(remove_special_characters)

In [None]:
import json

def make_vocab(dataset):
    """Build the character vocabulary of ``dataset['text']`` and save it.

    Characters are numbered in sorted order. The space character is renamed
    to ``"|"`` (keeping its index), then ``"[UNK]"`` and ``"[PAD]"`` are
    appended. The mapping is also written to ``vocab.json`` in the current
    working directory.

    Args:
        dataset: Object whose ``['text']`` item is an iterable of strings.

    Returns:
        Dict mapping each token to its integer id.
    """
    vocab = set()
    for sample in dataset['text']:
        vocab.update(sample)
    vocab_dict = {ch: idx for idx, ch in enumerate(sorted(vocab))}
    # NOTE(review): "|" is presumably the word-delimiter token expected by the
    # tokenizer that consumes vocab.json — confirm.
    if " " in vocab_dict:
        vocab_dict["|"] = vocab_dict.pop(" ")
    elif "|" not in vocab_dict:
        # No sample contains a space: still provide the delimiter token
        # instead of failing with a KeyError.
        vocab_dict["|"] = len(vocab_dict)
    vocab_dict["[UNK]"] = len(vocab_dict)
    vocab_dict["[PAD]"] = len(vocab_dict)
    with open('vocab.json', 'w', encoding='utf-8') as vocab_file:
        json.dump(vocab_dict, vocab_file)
    return vocab_dict

# Build the vocabulary; make_vocab also writes it to vocab.json.
vocab_dict = make_vocab(dataset)
# Read the file back so vocab_dict holds exactly what was saved to disk.
with open('vocab.json', 'r') as f:
    vocab_dict = json.load(f)


print(vocab_dict)