In [None]:
import json
import os
import shutil
from tqdm.notebook import tqdm
from pydub import AudioSegment
from textgrid import TextGrid

import epitran
from phonemizer import phonemize
from ipapy.arpabetmapper import ARPABETMapper
from arpabetandipaconvertor.phoneticarphabet2arpabet import PhoneticAlphabet2ARPAbetConvertor

## Prepare parallel files

In [None]:
# Input locations for the sentence-level run. Each split contributes an audio
# directory and a candidate JSON file below `in_base_dir`.
in_base_dir = '/path/to/spider/my'

dev_audio_dir, dev_json = (
    os.path.join(in_base_dir, 'dev', leaf)
    for leaf in ('speech_wav', 'dev_rewriter(full).json')
)
train_audio_dir, train_json = (
    os.path.join(in_base_dir, 'train', leaf)
    for leaf in ('speech_wav', 'train_rewriter.json')
)

# Directory that receives the parallel .lab/.wav files.
align_data_dir = '/path/to/Prosodylab-Aligner/data/spider/'

In [None]:
# Load the dev split. The context manager closes the file handle as soon as
# parsing finishes instead of leaving it open until garbage collection.
with open(dev_json, 'r') as f:
    dev_set = json.load(f)
len(dev_set)

In [None]:
# Sentence-level parallel files for the dev split: for candidate j of sample i
# write the upper-cased transcript to `dev-{i}.{j}.lab` and copy the sample's
# recording next to it as `dev-{i}.{j}.wav`.

# Create the output directory up front so the first write cannot fail when the
# directory does not exist yet.
os.makedirs(align_data_dir, exist_ok=True)

for i, s in tqdm(enumerate(dev_set), total=len(dev_set)):
    # Every candidate of sample i is paired with the same source recording.
    _src_wav_path = os.path.join(dev_audio_dir, f'{i}.wav')

    for j, c in enumerate(s):
        # sp[-1] == 0 --> sp is (0, 0), t is punct
        _tokens = [t.upper() for t, sp in zip(c['question_toks'], c['span_ranges']) if sp[-1] != 0]
        # Tokens that start with a digit are wrapped in '*'. Slicing with [:1]
        # instead of indexing [0] keeps an empty token from raising IndexError.
        _tokens = ['*' + t + '*' if t[:1].isnumeric() else t for t in _tokens]

        _align_txt_path = os.path.join(align_data_dir, f'dev-{i}.{j}.lab')
        with open(_align_txt_path, 'w') as f:
            f.write(' '.join(_tokens))

        _align_wav_path = os.path.join(align_data_dir, f'dev-{i}.{j}.wav')
        shutil.copyfile(_src_wav_path, _align_wav_path)

In [None]:
# Drop the reference to the dev split before the train split is loaded.
del dev_set

In [None]:
# Load the train split. The context manager closes the file handle as soon as
# parsing finishes instead of leaving it open until garbage collection.
with open(train_json, 'r') as f:
    train_set = json.load(f)
len(train_set)

In [None]:
# Sentence-level parallel files for the train split: for candidate j of sample
# i write the upper-cased transcript to `train-{i}.{j}.lab` and copy the
# sample's recording next to it as `train-{i}.{j}.wav`.

# Create the output directory up front so the first write cannot fail when the
# directory does not exist yet.
os.makedirs(align_data_dir, exist_ok=True)

for i, s in tqdm(enumerate(train_set), total=len(train_set)):
    # Every candidate of sample i is paired with the same source recording.
    _src_wav_path = os.path.join(train_audio_dir, f'{i}.wav')

    for j, c in enumerate(s):
        # sp[-1] == 0 --> sp is (0, 0), t is punct
        _tokens = [t.upper() for t, sp in zip(c['question_toks'], c['span_ranges']) if sp[-1] != 0]
        # Tokens that start with a digit are wrapped in '*'. Slicing with [:1]
        # instead of indexing [0] keeps an empty token from raising IndexError.
        _tokens = ['*' + t + '*' if t[:1].isnumeric() else t for t in _tokens]

        _align_txt_path = os.path.join(align_data_dir, f'train-{i}.{j}.lab')
        with open(_align_txt_path, 'w') as f:
            f.write(' '.join(_tokens))

        _align_wav_path = os.path.join(align_data_dir, f'train-{i}.{j}.wav')
        shutil.copyfile(_src_wav_path, _align_wav_path)

In [None]:
# Drop the reference to the train split once its files have been written.
del train_set

### Token-level parallel files

In [None]:
# Paths for the token-level run. Each split contributes an audio directory and
# a candidate JSON file below `in_base_dir`.
in_base_dir = '/Users/mac/Desktop/syt/Deep-Learning/Dataset/spider/my'

dev_audio_dir, dev_json = (
    os.path.join(in_base_dir, 'dev', leaf)
    for leaf in ('speech_wav', 'dev_rewriter(full).json')
)
train_audio_dir, train_json = (
    os.path.join(in_base_dir, 'train', leaf)
    for leaf in ('speech_wav', 'train_rewriter.json')
)

# Output directory for the per-token .lab/.wav pairs; created if missing.
align_data_dir = '/Users/mac/Desktop/syt/Deep-Learning/Repos/Prosodylab-Aligner/data/spider-tokens/'
os.makedirs(align_data_dir, exist_ok=True)

# Pronunciation dictionary shipped with the aligner checkout.
proso_dict_path = '/Users/mac/Desktop/syt/Deep-Learning/Repos/Prosodylab-Aligner/eng.dict'

In [None]:
# Collect the words that have an entry in the aligner dictionary.
# Only the first space-delimited field of each line is kept as the word.
proso_word_set = set()
with open(proso_dict_path, 'r') as f:
    for line in f:
        line = line.strip()
        if not line:
            # A blank line has no fields; the previous tuple unpacking raised
            # ValueError here.
            continue
        # split(...)[0] also tolerates a line that holds a word with no
        # pronunciation after it.
        proso_word_set.add(line.split(' ', 1)[0])

len(proso_word_set)

In [None]:
# Load the dev split. The context manager closes the file handle as soon as
# parsing finishes instead of leaving it open until garbage collection.
with open(dev_json, 'r') as f:
    dev_set = json.load(f)
len(dev_set)

In [None]:
# Cut every dev recording into one clip per token and store each clip as
# `dev-{i}/{j}.{k}.wav` with the upper-cased token in `dev-{i}/{j}.{k}.lab`.
# k is the token's position among the tokens that have a non-empty span.
for i, s in tqdm(enumerate(dev_set), total=len(dev_set)):
    _speech = AudioSegment.from_wav(os.path.join(dev_audio_dir, f'{i}.wav'))

    os.makedirs(os.path.join(align_data_dir, f'dev-{i}'), exist_ok=True)

    for j, c in enumerate(s):
        # A span ending in 0 is (0, 0), i.e. punctuation: leave it out.
        _spoken = [
            (_tok.upper(), _span)
            for _tok, _span in zip(c['question_toks'], c['span_ranges'])
            if _span[-1] != 0
        ]

        for k, (_word, _span) in enumerate(_spoken):
            # Only words present in the aligner dictionary are exported.
            if _word in proso_word_set:
                # Span bounds are scaled by 1000 before slicing the audio
                # (presumably seconds -> milliseconds; confirm against the data).
                _start_ms = int(float(_span[0]) * 1000)
                _end_ms = int(float(_span[1]) * 1000)
                _clip = _speech[_start_ms:_end_ms]

                with open(os.path.join(align_data_dir, f'dev-{i}/{j}.{k}.lab'), 'w') as f:
                    f.write(_word)

                _clip.export(os.path.join(align_data_dir, f'dev-{i}/{j}.{k}.wav'), format='wav')

In [None]:
# Load the train split. The context manager closes the file handle as soon as
# parsing finishes instead of leaving it open until garbage collection.
with open(train_json, 'r') as f:
    train_set = json.load(f)
len(train_set)

In [None]:
# Cut every train recording into one clip per token and store each clip as
# `train-{i}/{j}.{k}.wav` with the upper-cased token in `train-{i}/{j}.{k}.lab`.
# k is the token's position among the tokens that have a non-empty span.
for i, s in tqdm(enumerate(train_set), total=len(train_set)):
    _speech = AudioSegment.from_wav(os.path.join(train_audio_dir, f'{i}.wav'))

    os.makedirs(os.path.join(align_data_dir, f'train-{i}'), exist_ok=True)

    for j, c in enumerate(s):
        # A span ending in 0 is (0, 0), i.e. punctuation: leave it out.
        _spoken = [
            (_tok.upper(), _span)
            for _tok, _span in zip(c['question_toks'], c['span_ranges'])
            if _span[-1] != 0
        ]

        for k, (_word, _span) in enumerate(_spoken):
            # Only words present in the aligner dictionary are exported.
            if _word in proso_word_set:
                # Span bounds are scaled by 1000 before slicing the audio
                # (presumably seconds -> milliseconds; confirm against the data).
                _start_ms = int(float(_span[0]) * 1000)
                _end_ms = int(float(_span[1]) * 1000)
                _clip = _speech[_start_ms:_end_ms]

                with open(os.path.join(align_data_dir, f'train-{i}/{j}.{k}.lab'), 'w') as f:
                    f.write(_word)

                _clip.export(os.path.join(align_data_dir, f'train-{i}/{j}.{k}.wav'), format='wav')



In [None]:
# Drop the reference to the train split once its token clips have been written.
del train_set

### Schema tokens

In [None]:
# Output directory for the schema-token .lab/.wav pairs; created if missing.
align_data_dir = '/Users/mac/Desktop/syt/Deep-Learning/Repos/Prosodylab-Aligner/data/spider-db-tokens/'
os.makedirs(align_data_dir, exist_ok=True)

# Inputs: the schema vocabulary list and the directory holding one recording
# per vocabulary word.
db_vocab_path = '/Users/mac/Desktop/syt/Deep-Learning/Dataset/spider/my/db/schema_vocab.txt'
db_audio_dir = '/Users/mac/Desktop/syt/Deep-Learning/Dataset/spider/my/db/speech_wav'

In [None]:
proso_dict_path = '/Users/mac/Desktop/syt/Deep-Learning/Repos/Prosodylab-Aligner/eng.dict'

# Collect the words that have an entry in the aligner dictionary.
# Only the first space-delimited field of each line is kept as the word.
proso_word_set = set()
with open(proso_dict_path, 'r') as f:
    for line in f:
        line = line.strip()
        if not line:
            # A blank line has no fields; the previous tuple unpacking raised
            # ValueError here.
            continue
        # split(...)[0] also tolerates a line that holds a word with no
        # pronunciation after it.
        proso_word_set.add(line.split(' ', 1)[0])

len(proso_word_set)

In [None]:
# Schema vocabulary, one word per line. Empty entries (such as the one left by
# a trailing newline) are dropped so the count below reflects real words only
# and no file lookup is ever attempted for the empty string.
with open(db_vocab_path, 'r') as f:
    db_vocab = [w for w in f.read().split('\n') if w]
len(db_vocab)

In [None]:
# For every schema word found in the aligner dictionary, write `{w}.lab` with
# the upper-cased word and copy its recording to `{w}.wav` in the same folder.
for w in db_vocab:
    _label = w.upper()
    if _label in proso_word_set:
        with open(os.path.join(align_data_dir, f'{w}.lab'), 'w') as f:
            f.write(_label)

        shutil.copyfile(
            os.path.join(db_audio_dir, f'{w}.wav'),
            os.path.join(align_data_dir, f'{w}.wav'),
        )

## Add phoneme alignments to data file

### Utterances

In [None]:
# Paths for merging the phoneme alignments back into the data files. Each
# split has an audio directory, an input JSON and an output JSON.
in_base_dir = '/Users/mac/Desktop/syt/Deep-Learning/Dataset/spider/my'

dev_audio_dir, dev_json, dev_out_json = (
    os.path.join(in_base_dir, 'dev', leaf)
    for leaf in ('speech_wav', 'dev_rewriter(full).json', 'dev_rewriter(full)+phonemes.json')
)
train_audio_dir, train_json, train_out_json = (
    os.path.join(in_base_dir, 'train', leaf)
    for leaf in ('speech_wav', 'train_rewriter.json', 'train_rewriter+phonemes.json')
)

# Root of the aligner checkout; the TextGrid files are read from below it.
proso_base_dir = '/Users/mac/Desktop/syt/Deep-Learning/Repos/Prosodylab-Aligner/'

In [None]:
# Load a single token alignment (dev-0, file 0.0) to inspect the TextGrid layout.
_sample_tg_path = os.path.join(proso_base_dir, 'data/spider-tokens/dev-0/0.0.TextGrid')
tg = TextGrid.fromFile(_sample_tg_path)

In [None]:
# Show the first tier, its first interval, and that interval's start time,
# end time and label, one per line.
_first_tier = tg[0]
_first_interval = _first_tier[0]
for _item in (
    _first_tier,
    _first_interval,
    _first_interval.minTime,
    _first_interval.maxTime,
    _first_interval.mark,
):
    print(_item)

In [None]:
# Load the dev split. The context manager closes the file handle as soon as
# parsing finishes instead of leaving it open until garbage collection.
with open(dev_json, 'r') as f:
    dev_set = json.load(f)
len(dev_set)

In [None]:
# Attach per-token phoneme alignments to every dev candidate. Each candidate
# gains two lists parallel to `question_toks`:
#   token_phonemes       - phoneme labels per token, or None if unavailable
#   token_phoneme_spans  - (minTime, maxTime) per phoneme, or None
#
# The token clips were exported as `dev-{i}/{j}.{k}`, where k counts only the
# tokens with a non-empty span (punctuation is filtered out before
# enumerating). Using the raw position in `question_toks` as k would read the
# wrong TextGrid for every token that follows a punctuation mark, so a
# separate counter over span-bearing tokens builds the file name.
for i, s in tqdm(enumerate(dev_set), total=len(dev_set)):
    for j, c in enumerate(s):
        ## c: cand
        token_phonemes = []
        token_phoneme_spans = []

        k = 0  # position among tokens with a non-empty span (matches file names)
        for t, sp in zip(c['question_toks'], c['span_ranges']):
            if sp[-1] == 0:
                # sp is (0, 0): punctuation, never exported, nothing to look up
                token_phonemes.append(None)
                token_phoneme_spans.append(None)
                continue

            phonemes_align_path = os.path.join(proso_base_dir, f'data/spider-tokens/dev-{i}/{j}.{k}.TextGrid')
            k += 1
            try:
                phonemes_tg = TextGrid.fromFile(phonemes_align_path) # tg[0] is phonemes, [1] is words
            except FileNotFoundError:
                ## skipped token (e.g. not in the aligner dictionary)
                token_phonemes.append(None)
                token_phoneme_spans.append(None)
                continue

            _phs = [_intv.mark for _intv in phonemes_tg[0]]
            _ph_spans = [(_intv.minTime, _intv.maxTime) for _intv in phonemes_tg[0]]
            token_phonemes.append(_phs)
            token_phoneme_spans.append(_ph_spans)

        c['token_phonemes'] = token_phonemes
        c['token_phoneme_spans'] = token_phoneme_spans

In [None]:
# Save the dev split, now carrying the phoneme fields, as indented JSON.
with open(dev_out_json, 'w') as out_file:
    json.dump(obj=dev_set, fp=out_file, indent=2)

In [None]:
# Load the train split. The context manager closes the file handle as soon as
# parsing finishes instead of leaving it open until garbage collection.
with open(train_json, 'r') as f:
    train_set = json.load(f)
len(train_set)

In [None]:
# Attach per-token phoneme alignments to every train candidate. Each candidate
# gains two lists parallel to `question_toks`:
#   token_phonemes       - phoneme labels per token, or None if unavailable
#   token_phoneme_spans  - (minTime, maxTime) per phoneme, or None
#
# The token clips were exported as `train-{i}/{j}.{k}`, where k counts only the
# tokens with a non-empty span (punctuation is filtered out before
# enumerating). Using the raw position in `question_toks` as k would read the
# wrong TextGrid for every token that follows a punctuation mark, so a
# separate counter over span-bearing tokens builds the file name.
for i, s in tqdm(enumerate(train_set), total=len(train_set)):
    for j, c in enumerate(s):
        ## c: cand
        token_phonemes = []
        token_phoneme_spans = []

        k = 0  # position among tokens with a non-empty span (matches file names)
        for t, sp in zip(c['question_toks'], c['span_ranges']):
            if sp[-1] == 0:
                # sp is (0, 0): punctuation, never exported, nothing to look up
                token_phonemes.append(None)
                token_phoneme_spans.append(None)
                continue

            phonemes_align_path = os.path.join(proso_base_dir, f'data/spider-tokens/train-{i}/{j}.{k}.TextGrid')
            k += 1
            try:
                phonemes_tg = TextGrid.fromFile(phonemes_align_path) # tg[0] is phonemes, [1] is words
            except FileNotFoundError:
                ## skipped token (e.g. not in the aligner dictionary)
                token_phonemes.append(None)
                token_phoneme_spans.append(None)
                continue

            _phs = [_intv.mark for _intv in phonemes_tg[0]]
            _ph_spans = [(_intv.minTime, _intv.maxTime) for _intv in phonemes_tg[0]]
            token_phonemes.append(_phs)
            token_phoneme_spans.append(_ph_spans)

        c['token_phonemes'] = token_phonemes
        c['token_phoneme_spans'] = token_phoneme_spans

In [None]:
# Save the train split, now carrying the phoneme fields, as indented JSON.
with open(train_out_json, 'w') as out_file:
    json.dump(obj=train_set, fp=out_file, indent=2)

### Schema

In [None]:
# Root of the aligner checkout; schema TextGrids are read from below it.
proso_base_dir = '/Users/mac/Desktop/syt/Deep-Learning/Repos/Prosodylab-Aligner/'

# Schema data: recordings directory and the JSON file that will receive the
# per-word phoneme alignments.
db_base_dir = '/Users/mac/Desktop/syt/Deep-Learning/Dataset/spider/my/db/'
db_audio_dir, db_out_json = (
    os.path.join(db_base_dir, leaf)
    for leaf in ('speech_wav', 'schema_phonemes.json')
)

In [None]:
# Map every schema word that has a TextGrid to its phoneme labels and
# (minTime, maxTime) spans. Words without a TextGrid are left out.
# NOTE: relies on `db_vocab` loaded in the "Schema tokens" section above.
db_ph_dict = dict()

for w in db_vocab:
    phonemes_align_path = os.path.join(proso_base_dir, f'data/spider-db-tokens/{w}.TextGrid')
    try:
        phonemes_tg = TextGrid.fromFile(phonemes_align_path) # tg[0] is phonemes, [1] is words
    except FileNotFoundError:
        ## skipped token
        continue

    _phs = [_intv.mark for _intv in phonemes_tg[0]]
    _ph_spans = [(_intv.minTime, _intv.maxTime) for _intv in phonemes_tg[0]]
    db_ph_dict[w] = {
        'phonemes': _phs,
        'phoneme_spans': _ph_spans,
    }

# Spot-check one entry; .get() shows None instead of raising KeyError (and
# aborting a full run) when 'x' happens to have no alignment.
len(db_ph_dict), db_ph_dict.get('x')

In [None]:
# Save the schema word -> phoneme alignment mapping as indented JSON.
with open(db_out_json, 'w') as out_file:
    json.dump(obj=db_ph_dict, fp=out_file, indent=2)

## Experiment: token-to-phonemes

In [None]:
# Epitran instance for the 'eng-Latn' code; in this section it is only
# referenced by the commented-out transliterate call.
epi = epitran.Epitran('eng-Latn')

In [None]:
# Converter applied to `ipa_phonemes` at the end of this section
# (presumably IPA -> ARPAbet, judging by the class name; confirm in its docs).
a_converter = PhoneticAlphabet2ARPAbetConvertor()

In [None]:
# Sample word fed through the phonemization experiment.
word = 'swimming'

In [None]:
# ipa_phonemes = epi.transliterate(word)
# ipa_phonemes

In [None]:
# Phonemize `word` with the 'en-us' language setting, then swap the length
# mark 'ː' for a plain ':' in the result.
# Options tried earlier and left disabled: backend='festival', a custom
# Separator(phone=None, word='', syllable=''), with_stress=True, njobs=4.
_raw_phonemes = phonemize(
    word,
    language='en-us',
    strip=True,
    preserve_punctuation=True,
)
ipa_phonemes = _raw_phonemes.replace('ː', ':')
ipa_phonemes

In [None]:
# amapper = ARPABETMapper()
# s_a = amapper.map_unicode_string(ipa_phonemes, ignore=True, return_as_list=True)
# s_a

In [None]:
# Run the phonemized string through the converter and display the result.
a_converter.convert(ipa_phonemes)