# VoxCommunis data processing pipeline

This script runs the Montreal Forced Aligner (MFA) on recordings from the Common Voice corpus. 

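To run this pipeline, you need:

1. The Python packages epitran, praatio, pandas, and numpy (re, subprocess, shutil, and os are part of the Python standard library and need no download)
2. The XPF corpus data (only if you use XPF as the G2P engine)
3. MFA, installed in a conda environment (called `aligner` in this notebook)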
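A quick check that the third-party packages are importable can save a failed run later. This is a minimal sketch, not part of the pipeline: `missing_packages` is a hypothetical helper, and the MFA command-line tool is not checked by it.

```python
import importlib.util

def missing_packages(names):
    """Return the names in `names` that cannot be found by the import system."""
    return [name for name in names if importlib.util.find_spec(name) is None]

# Third-party packages listed above; re, subprocess, shutil, and os ship with Python
required = ['epitran', 'praatio', 'pandas', 'numpy']
missing = missing_packages(required)
if missing:
    print('Install these packages first:', ', '.join(missing))
else:
    print('All required packages are installed.')
```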

The pipeline takes these steps to process data:

1. [Step 0: Setup](#step-01-modules)
2. [Step 1: Remap speakers](#step-1-speaker-remapping)
3. [Step 2: Generate the word list](#step-2-word-list)
4. [Step 3: G2P (grapheme-to-phoneme)](#step-3-g2p)
5. [Step 4: Create TextGrid files](#step-4-textgrid-files)
6. [Step 5: Run MFA (validate, train, and align)](#step-5-mfa)
7. [Finale](#finale)

This script was created by Miao Zhang (miao.zhang@uzh.ch), 22.12.2023

This script was modified by Miao Zhang, 07.02.2024 (Revalidation added)

Modified on 16.02.2024: added automatic log.

## Step 0.1. Modules
Import the required packages and the VoxCommunis processing modules.

In [1]:
# Import modules
import os, subprocess, shutil, re, multiprocessing, zipfile
import pandas as pd
# Turn Copy-On-Write on
pd.options.mode.copy_on_write = True

# For creating textgrids
from praatio import textgrid

# For moving files concurrently
from concurrent.futures import ThreadPoolExecutor

# Import Path
from pathlib import Path

# Import processing functions
import vxc_processing as vxcproc
import vxc_setup as vxcstp

In [None]:
# This reloads the processing modules if you have changed them.
# Run this only if you edited vxc_processing.py, vxc_setup.py, or vxc_processing_cjk.py,
# and only after Step 0.2, which defines is_cjk.
import importlib
importlib.reload(vxcproc)
importlib.reload(vxcstp)
if is_cjk:
    importlib.reload(vxccjkproc)

## Step 0.2. Paths

Set the paths and directories of data and scripts to use.

_IMPORTANT_: the folder of the corpus data you downloaded from Common Voice should be named {lang_code}_v{version_number}.
- For example, the folder for version 16 of Dhivehi should be named dv_v16.
- Likewise, the folder for version 15 of Upper Sorbian should be named hsb_v15.
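The naming convention can be checked programmatically before running anything. A minimal sketch; `cv_folder_name` and `parse_cv_folder_name` are hypothetical helpers, not functions of `vxc_setup`:

```python
import re

# Hypothetical helpers for the {lang_code}_v{version_number} convention
FOLDER_PATTERN = re.compile(r'^(?P<lang_code>.+)_v(?P<version>\d+)$')

def cv_folder_name(lang_code, version):
    """Build the expected folder name, e.g. ('dv', 16) -> 'dv_v16'."""
    return f'{lang_code}_v{version}'

def parse_cv_folder_name(folder):
    """Split a folder name into (lang_code, version); raise ValueError if it does not match."""
    match = FOLDER_PATTERN.match(folder)
    if match is None:
        raise ValueError(f'{folder!r} does not follow the {{lang_code}}_v{{version_number}} pattern')
    return match['lang_code'], match['version']
```

For example, `os.path.join(common_voice_dir, cv_folder_name(lang_code, cv_align_version))` gives the folder this notebook presumably expects for the corpus being aligned.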

In [None]:
###################################### Directories ################################################

# This is the directory where your data downloaded from Common Voice should be saved. This is the root directory where data from each language should be saved in individual folders.
common_voice_dir = '/Users/miaozhang/Research/CorpusPhon/CorpusData/CommonVoice' 

# To use XPF as the G2P, you will need to download the XPF data from: https://github.com/CohenPr-XPF/XPF/tree/master/Data.
# Specify the directory where your XPF data is saved.
xpf_dir = '/Users/miaozhang/Research/CorpusPhon/Scripts/XPF' 

######################### Language name/code and Common Voice version ##############################

# The language code used in Common Voice:
lang_code = 'rw' 
# Check whether the language is Chinese, Japanese, or Korean (CJK)
is_cjk = vxcstp.detect_cjk(lang_code)
# Import functions from vxc_processing_cjk.py to process CJK texts.
if is_cjk:
    import vxc_processing_cjk as vxccjkproc

# The version of the data in Common Voice. Use only numbers.
cv_mod_version = '17' # the Common Voice version the acoustic model is trained on
cv_align_version = '17' # the Common Voice version to be forced-aligned

######################### G2P ######################################################################

# Specify the G2P engine. Only these keywords are accepted:
# 'xpf' for XPF
# 'epi' for Epitran
# 'chr' for Charsiu
# 'mfa' for MFA
# 'vxc' for a self-defined lexicon
g2p = 'epi'

# Get processing info
lang_row = vxcstp.get_codes(lang_code)
lang_cv_name = lang_row['name_cv']

# Specify G2P details
if g2p == 'epi':
    # If you are using Epitran, specify the Epitran code of the language.
    # Refer to VoxCommunis_Info.csv for the processing code of the language in Epitran.
    epi_code = 'kin-Latn'
elif g2p == 'mfa':
    # Specify the path of the MFA G2P model
    mfa_g2p_path = ''
elif g2p == 'xpf':
    # If you are using XPF, get the name of the language in XPF corpus
    lang_xpf_name = lang_row['name_xpf'].replace(' ', '')
    code_xpf = lang_row['code_xpf']
elif g2p == 'chr':
    # If you are using Charsiu, get the processing code for the language in Charsiu.
    code_chr = lang_row['code_chr']
    chr_g2p_script = 'chr_g2p.py'


# MFA paths
# This is where the acoustic model will be saved after MFA training is done:
mfa_mod_folder = '/Users/miaozhang/Documents/MFA/pretrained_models/acoustic'

# This is where files that will be uploaded to the OSF repo will be saved after the processing is finished:
osf_path = '/Users/miaozhang/Research/CorpusPhon/CorpusData/VoxCommunis_OSF'

######################### What writing system is the language using? ###############################

# Specify if the language uses Cyrillic letters
if_cyrl = 0

######################### Using existing model? ###############################

# Are you using a pre-trained model or training your own model?
# If you are training your own model, set it to 0
if_self_mod = 0
# If you set it to 1, specify the path of the pre-trained model below
if if_self_mod == 1:
    # Specify the path of the model
    acs_mod_path = '/Users/miaozhang/Documents/MFA/pretrained_models/acoustic/korean_mfa.zip'

######################### Using existing lexicon? ###############################

# Do you have your own prepared lexicon?
# If not, set the value to 0
if_self_lex = 0
# If you set it to 1, specify the path of the lexicon in the 'For step 5: running MFA' part below (in this code block)

######################################################################################

# Specify whether a subversion of the corpus is used. The default is 0
if_subversion = 0 
# If if_subversion == 1, specify the suffix to use.
# Ignore this part if you don't have a subversion of the corpus you are using.
if if_subversion == 1:
    subversion = '_' + 'sub3'

################################################################################################### 

# Get paths
language_dir, clip_info_path, validated_log, validated_recs_path = vxcstp.find_lang_dir(lang_code, cv_align_version, common_voice_dir)

# Get file names.    
naming_schema = pd.read_csv('vxc_naming_schema.csv', usecols = ['Python_code'])['Python_code'].tolist()
naming_schema = [eval(name) for name in naming_schema]
acs_mod_name = naming_schema[0]
textgrid_folder_name = naming_schema[1]
word_file_name = naming_schema[2]
dict_file_name = naming_schema[3]
spkr_file_name = naming_schema[4]
# Get the paths
textgrid_folder_path = os.path.join(language_dir, textgrid_folder_name)
word_file_path = os.path.join(language_dir, word_file_name)
dict_file_path = os.path.join(language_dir, dict_file_name)
spkr_file_path = os.path.join(language_dir, spkr_file_name)

###################################################################################################################

# For step 3: G2P
if g2p == 'xpf':
    xpf_translater_path = 'xpf_translate04.py'
    rule_file_path = os.path.join(xpf_dir, code_xpf + '_' + lang_xpf_name, code_xpf + '.rules')
    if lang_code == 'ug':
        verify_file_path = os.path.join(xpf_dir, code_xpf + '_' + lang_xpf_name, code_xpf + '-arabic.verify.csv')
    else:
        verify_file_path = os.path.join(xpf_dir, code_xpf + '_' + lang_xpf_name, code_xpf + '.verify.csv')
elif g2p == 'epi':
    epitran_translater_path = 'epi_run.py'

###################################################################################################################

# For step 5: running MFA
# Paths of the acoustic model and the alignment output
if if_self_mod != 1:
    if if_subversion == 0:
        acs_mod_path = os.path.join(mfa_mod_folder, acs_mod_name)
    else:
        # Insert the subversion suffix before the .zip extension (the dot is escaped so it matches literally)
        acs_mod_name = re.sub(r'\.zip$', subversion + '.zip', acs_mod_name)
        acs_mod_path = os.path.join(mfa_mod_folder, acs_mod_name)
output_path = os.path.join(language_dir, 'output')

# If you are using your own lexicon:
if if_self_lex == 1:
    # Specify the path of the lexicon
    dict_file_path = '/Users/miaozhang/Documents/MFA/pretrained_models/dictionary/french_mfa.dict'  

###################################################################################################################

vxcstp.show_files(language_dir, acs_mod_path, dict_file_path, spkr_file_path, textgrid_folder_path)

## Step 1. Speaker remapping
Get speaker IDs to put on TextGrids for speaker adaptation.
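`vxcproc.remap_spkr` is not shown here. As an illustration only, one way to remap speakers is to give each distinct Common Voice `client_id` (a long hash) a short sequential label, assuming, as in MFA's TextGrid corpus format, that the tier name serves as the speaker label. The function name and the `spkr` prefix are hypothetical:

```python
def remap_speakers(client_ids, prefix='spkr'):
    """Map each distinct client_id to a short label, numbered in order of first appearance.

    Returns the per-recording labels and the client_id -> label mapping.
    """
    mapping = {}
    for cid in client_ids:
        if cid not in mapping:
            mapping[cid] = f'{prefix}{len(mapping) + 1}'
    return [mapping[cid] for cid in client_ids], mapping
```

Numbering by first appearance keeps the labels stable as long as the input order is stable; sorting the IDs first would make them independent of row order.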

In [None]:
# Remap the speakers and, with output=True, save the validated recordings to the speaker file
if_output = True
if not is_cjk:
    # Process non-CJK
    valid = vxcproc.remap_spkr(language_dir, spkr_file_path, lang_code, output=if_output)
else:
    # Process CJK
    valid = vxccjkproc.remap_cjk_spkr(language_dir, spkr_file_path, lang_code, output=if_output)

print(f'There are {len(valid)} validated recordings in total for {lang_cv_name}.', '\n')
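print('Some sentence examples:\n')

# CJK sentences are printed in their tokenized form
sentence_col = 'sentence_tok' if is_cjk else 'sentence'
# Sample at most 20 sentences so that small corpora don't raise an error
for line in valid[sentence_col].sample(min(20, len(valid))).tolist():
    print('\t', line)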

## Step 2. Word list
Generate the word list from Common Voice transcripts.
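A rough sketch of what the word-list step involves, assuming that punctuation is any character in a Unicode punctuation category (apostrophes kept, since some orthographies use them inside words) and that text is lowercased. `extract_words` is hypothetical; whether `vxcproc.process_words` lowercases or keeps apostrophes is not shown here.

```python
import unicodedata

def extract_words(sentences, keep="'"):
    """Return the sorted unique word types across `sentences`.

    Characters whose Unicode category starts with 'P' (punctuation) become spaces,
    except those in `keep`; text is lowercased before splitting on whitespace.
    """
    words = set()
    for sentence in sentences:
        cleaned = ''.join(
            ' ' if unicodedata.category(ch).startswith('P') and ch not in keep else ch
            for ch in sentence.lower()
        )
        for token in cleaned.split():
            token = token.strip(keep)  # drop apostrophes used as quote marks at word edges
            if token:
                words.add(token)
    return sorted(words)
```

Testing Unicode categories instead of a `[^\w\s]` regex matters for scripts with combining vowel signs (e.g. Devanagari), which Python's `\w` may not match and would otherwise split words apart.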

In [None]:
# Remove punctuation and get the unique word types
words = vxcproc.process_words(valid, lang_code)

# Remove other unwanted words (e.g., Cyrillic words in Latin-script text, other scripts in CJK languages)
words = vxcproc.remove_unwanted_words(words, lang_code, is_cjk, if_cyrl)

# Save the word list as a .txt file
if os.path.exists(word_file_path):
    os.remove(word_file_path)

with open(word_file_path, 'w', encoding='utf-8') as word_file:
    for word in words:
        word_file.write(word + "\n")

print("Word list saved to:", word_file_path)

## Step 3. G2P
If you use XPF, you need three files:
1. A G2P rule file
2. A verification file
3. The translator Python script
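Every G2P option below writes its lexicon to `dict_file_path`, presumably one word per line followed by its space-separated phones. A toy sketch of that idea: a greedy longest-match grapheme-to-phone lookup plus a writer for `word<TAB>phones` lines, a layout MFA accepts for pronunciation dictionaries. The mapping and both function names are made up for illustration; this is not Epitran's or XPF's rule set.

```python
def greedy_g2p(word, mapping):
    """Convert `word` to phones by greedy longest-match lookup in `mapping` (must be non-empty).

    Characters with no entry in `mapping` are skipped.
    """
    max_len = max(len(g) for g in mapping)
    phones, i = [], 0
    while i < len(word):
        for size in range(min(max_len, len(word) - i), 0, -1):
            grapheme = word[i:i + size]
            if grapheme in mapping:
                phones.append(mapping[grapheme])
                i += size
                break
        else:
            i += 1  # unknown character: skip it
    return phones

def write_lexicon(words, mapping, path):
    """Write one 'word<TAB>phone phone ...' line per word that yields at least one phone."""
    with open(path, 'w', encoding='utf-8') as lex:
        for word in words:
            phones = greedy_g2p(word, mapping)
            if phones:
                lex.write(word + '\t' + ' '.join(phones) + '\n')
```

With a toy mapping such as `{'sh': 'ʃ', 's': 's', 'a': 'a', 'i': 'i', 'n': 'n'}`, `shina` becomes `ʃ i n a`: the two-letter `sh` wins over `s` because longer graphemes are tried first.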

In [None]:
if os.path.exists(dict_file_path):
    os.remove(dict_file_path)
   
# XPF
if g2p == 'xpf':
    vxcproc.xpf_g2p(xpf_translater_path, rule_file_path, verify_file_path, word_file_path, dict_file_path)

# Epitran
elif g2p == 'epi':
    if not is_cjk:
        vxcproc.epi_g2p(words, epi_code, dict_file_path)
    else:
        vxccjkproc.epi_cjk_g2p(words, epi_code, dict_file_path)

# Charsiu
elif g2p == 'chr':
    chr_args = [chr_g2p_script, word_file_path, code_chr, dict_file_path]
    # check=True stops the notebook if the Charsiu script fails
    subprocess.run(['python'] + chr_args, check=True)

# MFA
elif g2p == 'mfa':
    cmd_mfa_g2p = f'mfa g2p {word_file_path} {mfa_g2p_path} {dict_file_path}' 
    print('To g2p, copy and run:\n\t', cmd_mfa_g2p, '\n')

# Other source
elif g2p == 'vxc':
    if lang_code == 'zh-CN':
        vxccjkproc.cmn_g2p(words, dict_file_path)
    elif lang_code == 'nan-tw':
        vxccjkproc.nan_g2p(words, dict_file_path)

print(f'Check the lexicon file: {dict_file_path}')

## Step 4. TextGrid files

All validated clips that are longer than 1 s and have a non-empty sentence are moved to a subfolder called 'validated', and a TextGrid file is created for each of them.

Clips that are not validated stay in the 'clips' folder.
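`vxcproc.move_and_create_tg` is not shown here. The sketch below illustrates the filtering rule above and the kind of TextGrid such a step might write: Praat's long text format with one interval tier, named after the speaker, whose single interval spans the clip and holds the sentence. The helper names and the `duration_s` input are assumptions; the notebook itself uses praatio for this.

```python
def keep_clip(duration_s, sentence, min_duration_s=1.0):
    """Keep clips longer than min_duration_s seconds whose sentence is not empty."""
    return duration_s > min_duration_s and bool(sentence and sentence.strip())

def _quote(text):
    # Praat escapes a double quote inside a string by doubling it
    return '"' + text.replace('"', '""') + '"'

def textgrid_text(duration_s, sentence, speaker):
    """Return a long-format TextGrid with one interval tier named after the speaker."""
    lines = [
        'File type = "ooTextFile"',
        'Object class = "TextGrid"',
        '',
        'xmin = 0',
        f'xmax = {duration_s}',
        'tiers? <exists>',
        'size = 1',
        'item []:',
        '    item [1]:',
        '        class = "IntervalTier"',
        f'        name = {_quote(speaker)}',
        '        xmin = 0',
        f'        xmax = {duration_s}',
        '        intervals: size = 1',
        '        intervals [1]:',
        '            xmin = 0',
        f'            xmax = {duration_s}',
        f'            text = {_quote(sentence)}',
    ]
    return '\n'.join(lines) + '\n'
```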

In [None]:
# Make the folder for validated clips
if not os.path.exists(validated_recs_path):
    os.makedirs(validated_recs_path)

# Set up file chunks to batch the clip moving and TextGrid creation
n_clips = len(valid)
n_workers = 10
chunksize = max(1, -(-n_clips // n_workers))  # ceiling division, at least 1

# Move the clips and create TextGrid files
with ThreadPoolExecutor(n_workers) as exe:
    for i in range(0, n_clips, chunksize):
        # iloc slices by position and excludes the end, so chunks neither overlap nor skip rows
        chunk_data = valid.iloc[i:i + chunksize]
        if not is_cjk:
            _ = exe.submit(vxcproc.move_and_create_tg, chunk_data)
        else:
            _ = exe.submit(vxccjkproc.move_and_create_cjk_tg, chunk_data)

# Uncomment the next line if you want to delete the invalidated recordings
# shutil.rmtree(os.path.join(language_dir, "clips"))
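The cell above splits `valid` into chunks for the worker threads. Two details matter there: `round(n / n_workers)` is 0 for 5 or fewer rows, and `range()` rejects a step of 0; and `DataFrame.loc` label slices include their end label, so `loc[i:i+chunksize]` chunks share a boundary row. A small stdlib sketch of non-overlapping chunking with ceiling division (`chunk_bounds` is a hypothetical helper):

```python
def chunk_bounds(n_rows, n_workers):
    """Split range(n_rows) into at most n_workers contiguous, non-overlapping (start, stop) pairs."""
    if n_rows <= 0:
        return []
    chunksize = -(-n_rows // n_workers)  # ceiling division, so no rows are left over
    return [(start, min(start + chunksize, n_rows)) for start in range(0, n_rows, chunksize)]
```

Each `(start, stop)` pair maps to `valid.iloc[start:stop]`, whose end is exclusive.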

## Step 5. MFA

### Step 5.1. Validating

First, activate the MFA environment in the terminal.
1. Press Ctrl+` to open the terminal in VS Code.
2. Run `conda activate aligner`; '(aligner)' should then appear at the beginning of the prompt.
3. When you have finished using MFA (both training and aligning), run `conda deactivate` to leave the MFA environment.

In [None]:
# Create the MFA folder in your Documents folder
# You DON'T need to run this if you already have an MFA folder in your Documents folder
# Uncomment the command below to run:
#!mfa model download acoustic english.zip

To validate the corpus, run this line in the terminal: 

        mfa validate {wherever your validated recordings are} {wherever your lexicon file is} --ignore_acoustics --clean

You can copy the command printed by the cell below.
MFA commands cannot be run from the notebook; run them in the terminal.

In [None]:
cmd_validate = f'mfa validate {validated_recs_path} {dict_file_path} --ignore_acoustics --clean'
print('To validate, copy:\t\n\t' + cmd_validate)
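The printed commands are plain f-strings, so a path containing spaces would break them when pasted into the terminal. A minimal sketch that quotes each argument with `shlex.join` (Python 3.8+); `mfa_command` is a hypothetical helper, and the MFA arguments are the ones used in this notebook:

```python
import shlex

def mfa_command(*parts):
    """Join 'mfa' and the given parts into one shell-safe command line."""
    return shlex.join(['mfa', *(str(p) for p in parts)])
```

For example, `print(mfa_command('validate', validated_recs_path, dict_file_path, '--ignore_acoustics', '--clean'))` prints the same command as above, with quotes added only where a path needs them.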

### Step 5.2. Train the model

To train your own acoustic model, run:

        mfa train --clean {where your validated recordings are} {where your lexicon file is} {where your model will be saved}

Copy the command printed by the cell below and run it in the terminal.

In [None]:
# Train your own model
cmd_train = f'mfa train --clean {validated_recs_path} {dict_file_path} {acs_mod_path}'
print('To train, copy:\t\n\t' + cmd_train)

### Step 5.3. Forced-alignment

When the model is trained, align the corpus:

        mfa align --clean {where your validated recordings are} {where your lexicon file is} {where your acoustic model is} {where your output will be saved}

MFA alignment has been observed to stop after generating 32,609 TextGrid files. Therefore, if the corpus has more than 32,000 recordings, the mp3 and TextGrid files are moved into subfolders of up to 32,000 recordings each.

The cell then prints the MFA commands to align the data in (each subfolder of) the validated folder.
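The split reads each recording's subfolder from `valid['subfolder']`, which this notebook does not compute itself. A sketch of one way such an assignment could be made, assuming hypothetical `split_NN` names: sort the clip IDs and start a new subfolder every 32,000 clips. `assign_subfolders` is hypothetical.

```python
def assign_subfolders(clip_ids, max_per_folder=32000, prefix='split'):
    """Map each clip ID to a subfolder name so that no subfolder gets more than max_per_folder clips."""
    # Sorting makes the assignment reproducible across runs
    ordered = sorted(set(clip_ids))
    return {clip_id: f'{prefix}_{idx // max_per_folder + 1:02d}' for idx, clip_id in enumerate(ordered)}
```

Assigning by clip ID (the file stem) rather than by file keeps `<id>.mp3` and `<id>.TextGrid` in the same subfolder, which MFA needs to pair them.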

In [None]:
#### The splitting below only runs if there are more than 32,000 recordings in the folder; the MFA commands are printed either way. ####

# Get all mp3 files in the validated folder
all_mp3 = [item for item in os.listdir(validated_recs_path) if os.path.splitext(item)[1] == '.mp3']
n_clips = len(all_mp3)
print(f"There are {n_clips} clips in the validated folder.")
n_valid = len(valid)

if n_clips > 32000:
    # Create subfolders
    subfolders = valid['subfolder'].unique()
    for subfolder in subfolders:
        subfolder_path = os.path.join(validated_recs_path, subfolder)
        if not os.path.exists(subfolder_path):
            os.makedirs(subfolder_path)

    # Create the paths in the subfolders for each recording according to their grouping
    splits = valid[valid['path'].isin(all_mp3)]
    splits.to_csv(os.path.join(language_dir, 'all_splits.csv'), index = False)

    # Move the files into subfolders using multiple threads
    n_workers = 10
    chunksize = max(1, -(-len(splits) // n_workers))  # ceiling division, at least 1
    with ThreadPoolExecutor(n_workers) as exe:
        for i in range(0, len(splits), chunksize):
            # iloc slices by position, so the filtered (non-contiguous) index of splits is covered fully
            chunk_data = splits.iloc[i:i + chunksize]
            _ = exe.submit(vxcproc.split_recs, chunk_data)
    
    # If there are still files left in the root directory, move them into their subfolders
    rest_mp3 = [item for item in os.listdir(validated_recs_path) if os.path.splitext(item)[1] == '.mp3']
    rest_move = valid[valid['path'].isin(rest_mp3)]
    vxcproc.split_recs(rest_move)
    del rest_move, rest_mp3
    
    # Check if there are still mp3 or TextGrid files in the root directory
    files_left = any(
        os.path.isfile(os.path.join(validated_recs_path, item)) and
        item.lower().endswith(('.mp3', '.textgrid'))
        for item in os.listdir(validated_recs_path)
    )
    if files_left:
        print("The validated folder still contains mp3 or TextGrid files.", "\n")
    else:
        print("\n", "All mp3 and TextGrid files have been moved to subfolders.", "\n")

    # Check if there are overlapping file names across the subfolders
    overlap_dict = vxcproc.check_file_overlaps(validated_recs_path)
    if len(overlap_dict) == 0:
        print("\n", "There are no overlapping file names across the subfolders.", "\n") 
    else:
        print(overlap_dict)

# Print the MFA commands for alignment
all_items = os.listdir(validated_recs_path)
all_items = [file for file in all_items if '.DS_Store' not in file]
all_items.sort()
any_file = any(os.path.isfile(os.path.join(validated_recs_path, item)) for item in all_items)
if not any_file:
    mfa_align_script_path = '/Users/miaozhang/Research/CorpusPhon/Scripts/vxc_pipeline/mfa_align.sh'
    n_subfolders = sum(os.path.isdir(os.path.join(validated_recs_path, item)) for item in all_items)
    print(f'There are {n_subfolders} subfolders for {lang_cv_name}.', '\n')
    # Use a bash script to automatically align the data in all subfolders. Remember to activate the MFA virtual environment: conda activate aligner
    # print(f'Copy and run this in the terminal to grant execution permission to the script:\tchmod +x {mfa_align_script_path}', '\n')
    print('Copy and run this in the terminal to align the data in all subfolders:\t')
    print(f'\tbash {mfa_align_script_path} {validated_recs_path} {dict_file_path} {acs_mod_path} {output_path}', '\n')
else:
    cmd_align = f'mfa align --clean {validated_recs_path} {dict_file_path} {acs_mod_path} {output_path}'
    print('To align, copy:')
    print('\t' + cmd_align)

### Step 5.4. Put back (optional)

When the alignment is done, and if the recordings were split into subfolders, move them back into the single validated folder.

Then check that all files are back in the root of the validated folder.
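`vxcproc.merge_recs` and `vxcproc.move_files_to_root` are not shown here. A single-threaded stdlib sketch of the same idea, assuming the subfolders are one level deep: move every file back into the root, refuse to overwrite, and remove subfolders that end up empty. `merge_subfolders` is hypothetical.

```python
import shutil
from pathlib import Path

def merge_subfolders(root):
    """Move every file from root's immediate subfolders into root, then remove emptied subfolders.

    Raises FileExistsError instead of overwriting a file that already exists in root.
    """
    root = Path(root)
    for sub in sorted(p for p in root.iterdir() if p.is_dir()):
        for item in sorted(sub.iterdir()):
            if item.is_file():
                target = root / item.name
                if target.exists():
                    raise FileExistsError(target)
                shutil.move(str(item), str(target))
        if not any(sub.iterdir()):
            sub.rmdir()
```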

In [None]:
if n_valid > 32000:
    # Put the files back into the validated root folder
    n_workers = 10
    chunksize = max(1, -(-len(valid) // n_workers))  # ceiling division, at least 1
    with ThreadPoolExecutor(n_workers) as exe:
        for i in range(0, len(valid), chunksize):
            # iloc slices by position and excludes the end, so chunks neither overlap nor skip rows
            chunk_data = valid.iloc[i:i + chunksize]
            _ = exe.submit(vxcproc.merge_recs, chunk_data)

    # Use os.scandir() for better performance
    with os.scandir(validated_recs_path) as entries:
        subfolders = [entry.name for entry in entries if entry.is_dir()]
        subfolders.sort()

    # List to store subfolders that could not be deleted
    undeleted_subfolders = []

    # Batch deletion of empty subfolders
    for subfolder in subfolders:
        subfolder_path = os.path.join(validated_recs_path, subfolder)
        with os.scandir(subfolder_path) as sub_entries:
            if not any(entry.is_file() for entry in sub_entries):
                # If the subfolder does not contain any files, delete it
                shutil.rmtree(subfolder_path)
                print("\n", f"Subfolder '{subfolder}' deleted because it contains no files.", "\n")
            else:
                undeleted_subfolders.append(subfolder)

    print("\n", "Subfolders checked and processed.", "\n")

    # List undeleted subfolders
    if len(undeleted_subfolders) > 0:
        print("Undeleted subfolders:")
        for subfolder in undeleted_subfolders:
            print(subfolder)
            # Move the files in the uncleared subfolder back to the validated folder if there is still any
            vxcproc.move_files_to_root(validated_recs_path, os.path.join(validated_recs_path, subfolder))
        
    # Check if all the subfolders are deleted
    with os.scandir(validated_recs_path) as entries:
        contains_subdir = any(entry.is_dir() for entry in entries)
        if contains_subdir:
            print("\nThe validated folder still contains subfolders.")
        else:
            print("\nThe validated folder does not contain any subfolders now.")

# Check if the output and input files match
result = vxcproc.compare_inout(output_path, validated_recs_path)
print(result)
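What `vxcproc.compare_inout` reports is not shown here. A minimal sketch of such a check, assuming MFA writes one `.TextGrid` per input clip with the same file stem: list the clips in the validated folder that have no output TextGrid. `unaligned_clips` is hypothetical.

```python
from pathlib import Path

def unaligned_clips(validated_dir, output_dir, audio_ext='.mp3'):
    """Return the sorted clip stems in validated_dir that have no matching .TextGrid in output_dir.

    Both folders are searched recursively, so split subfolders are covered too.
    """
    inputs = {p.stem for p in Path(validated_dir).rglob('*' + audio_ext)}
    outputs = {p.stem for p in Path(output_dir).rglob('*.TextGrid')}
    return sorted(inputs - outputs)
```

Only audio files are read from the validated folder, so the input TextGrids created in Step 4 do not count as aligned output.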


## Finale

Then, copy the output files (the speaker file, the lexicon, and the acoustic model) and a zip archive of the aligned TextGrids to the OSF folder so they are ready to upload.
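The cell below zips the output TextGrids with a thread pool and a lock (via `vxcproc.add_file`, not shown). If the lock is held for each whole write, the writes happen one at a time anyway, so a plain loop is a simpler alternative. A sketch; `zip_textgrids` is hypothetical, and only files ending in `.TextGrid` are included, which also skips `.DS_Store`:

```python
import zipfile
from pathlib import Path

def zip_textgrids(src_dir, zip_path):
    """Write every .TextGrid under src_dir into zip_path, storing paths relative to src_dir.

    Returns the number of files written.
    """
    src_dir = Path(src_dir)
    files = sorted(src_dir.rglob('*.TextGrid'))
    with zipfile.ZipFile(zip_path, 'w', compression=zipfile.ZIP_DEFLATED) as archive:
        for path in files:
            # arcname drops the absolute prefix so the archive unpacks cleanly anywhere
            archive.write(path, arcname=path.relative_to(src_dir).as_posix())
    return len(files)
```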

In [None]:
# Zip output textgrids
txtgrds_path = os.path.join(osf_path, 'textgrids', textgrid_folder_name)
from threading import Lock
if os.path.exists(output_path):
    # list all files to add to the zip
    tgfiles = [os.path.join(output_path, filename) for filename in os.listdir(output_path) if '.DS_Store' not in filename]
    # create lock for adding files to the zip
    lock = Lock()
    # open the zip file
    with zipfile.ZipFile(txtgrds_path, 'w', compression=zipfile.ZIP_DEFLATED) as handle:
        # create the thread pool
        with ThreadPoolExecutor(10) as exe:
            # add all files to the zip archive
            _ = [exe.submit(vxcproc.add_file, lock, handle, tg, output_path) for tg in tgfiles]

# Copy the acoustic model
shutil.copy(acs_mod_path, os.path.join(osf_path, 'acoustic_models'))

# Copy the lexicon
shutil.copy(dict_file_path, os.path.join(osf_path, 'lexicons'))

# Copy the speaker file
shutil.copy(spkr_file_path, os.path.join(osf_path, 'spkr_files'))

# Check if the model was trained
model_trained = os.path.exists(os.path.join(osf_path, 'acoustic_models', acs_mod_name))
# Check if the textgrid archive was created
aligned = os.path.exists(os.path.join(osf_path, 'textgrids', textgrid_folder_name))

Finally, update the tracking info in `VoxCommunis_Info.csv`. 

Make sure the file you update is not the one in the lang_code_processing folder. Once updated, push the updated .csv to GitHub.

In [None]:
# Paste the name of the outputs into the tracking file
cv_track = pd.read_csv('VoxCommunis_Info.csv')
cv_track = cv_track.astype('string')
if model_trained:
    print(f'The acoustic model for {lang_cv_name} has been trained.')
    cv_track.loc[cv_track['code_cv'] == lang_code, 'acoustic_model'] = acs_mod_name
else:
    print(f'Couldn\'t find the acoustic model.')
    cv_track.loc[cv_track['code_cv'] == lang_code, 'acoustic_model'] = ''

if aligned:
    print(f'The forced alignment for {lang_cv_name} has been created.')
    cv_track.loc[cv_track['code_cv'] == lang_code, 'textgrids'] = textgrid_folder_name
else:
    print(f'Couldn\'t find the alignment.')
    cv_track.loc[cv_track['code_cv'] == lang_code, 'textgrids'] = ''

cv_track.loc[cv_track['code_cv'] == lang_code, 'spkr_file'] = spkr_file_name
cv_track.loc[cv_track['code_cv'] == lang_code, 'lexicon'] = dict_file_name

# Update the tracking file
cv_track.to_csv('VoxCommunis_Info.csv', index = False)

# Sample some files to check the alignment
# How many sentences do you want to check?
n_sentences_to_check = 20

post_check = valid['path'].sample(n_sentences_to_check, random_state=42).tolist()
snd_path = [os.path.join(language_dir, 'validated', snd) for snd in post_check]
tg_path = [os.path.join(language_dir, 'output', Path(snd).with_suffix('.TextGrid')) for snd in post_check]
post_check = pd.DataFrame(
    {'sound': snd_path,
    'textgrid': tg_path}
)
post_check.to_csv(os.path.join(language_dir, 'post_check.csv'), index = False)