### Code to extract ground truth segmentation for TIMIT

In [18]:
import os
from pathlib import Path

def read_timit_phone_alignment(phn_file, frame_rate=0.02, sr=16e3):
    """Read a TIMIT ``.PHN`` file and expand it to one phone label per frame.

    Each non-blank line of the file must hold three whitespace-separated
    fields: start sample, end sample and phone label.

    Args:
        phn_file: Path of the ``.PHN`` alignment file.
        frame_rate: Frame shift in seconds (despite the name); multiplied by
            ``sr`` to get the number of samples per frame.
        sr: Sampling rate in Hz.

    Returns:
        A list of phone labels. A segment contributes
        ``end // hop - start // hop`` copies of its label, where ``hop`` is
        the number of samples per frame.

    Raises:
        ValueError: If a non-blank line does not have exactly three fields
            or its boundaries are not integers.
    """
    # Samples per frame; loop-invariant, so compute it once.
    hop = int(frame_rate * sr)
    labels = []
    with open(phn_file, 'r') as f:
        for line in f:
            fields = line.split()
            if not fields:
                # Tolerate blank/trailing lines instead of failing to unpack.
                continue
            start, end, label = fields
            # Both boundaries are floored to frame indices independently, so
            # consecutive segments tile the frame axis without gaps.
            n_frames = int(end) // hop - int(start) // hop
            labels.extend([label] * n_frames)
    return labels

def extract_timit_phone_alignments(manifest_dir, align_dir, out_dir, split):
    """Write frame-level phone labels for every utterance of a manifest split.

    Reads ``{split}.tsv`` (first line is a header, then one tab-separated row
    per utterance whose first column is the audio path) and ``{split}.lengths``
    (one frame count per utterance) from ``manifest_dir``. For each utterance
    the matching ``.PHN`` file is looked up under ``align_dir`` by the id
    ``<parent directory name>_<file stem>`` and expanded to frame labels with
    ``read_timit_phone_alignment``. The label sequence is padded with ``h#``
    or truncated so that it has exactly the length given in ``.lengths``.

    Outputs, one line per utterance:
        ``out_dir/{split}.src``      space-separated integer phone ids
        ``out_dir/{split}.src.txt``  space-separated phone labels

    Integer ids are assigned in order of first appearance; ``h#`` is id 0.
    Progress and the final vocabulary are printed to stdout.

    Args:
        manifest_dir: Directory holding ``{split}.tsv`` and ``{split}.lengths``.
        align_dir: Root directory that is walked recursively for ``.PHN`` files.
        out_dir: Output directory; created if missing.
        split: Manifest split name.

    Raises:
        ValueError: If the manifest and the lengths file list a different
            number of utterances.
        KeyError: If no ``.PHN`` file matches an utterance id.
    """
    manifest_dir = Path(manifest_dir)
    out_dir = Path(out_dir)
    # exist_ok makes a separate exists() check unnecessary.
    out_dir.mkdir(parents=True, exist_ok=True)

    # Index every alignment file by "<parent dir>_<stem>".
    align_dict = {}
    for root, _, file_list in os.walk(align_dir):
        root = Path(root)
        for fn in file_list:
            if fn.endswith('.PHN'):
                phn_path = root / fn
                utt_name = phn_path.name.split(".")[0]
                align_dict[f'{phn_path.parent.name}_{utt_name}'] = phn_path

    with open(manifest_dir / f'{split}.tsv', 'r') as f_tsv:
        # Drop the header line of the manifest.
        lines = f_tsv.read().strip().split('\n')[1:]
    with open(manifest_dir / f'{split}.lengths', 'r') as f_len:
        sizes = [int(x) for x in f_len.read().split()]
    if len(lines) != len(sizes):
        # zip() would silently drop the surplus and misalign the outputs.
        raise ValueError(
            f'{split}.tsv lists {len(lines)} utterances '
            f'but {split}.lengths has {len(sizes)} entries'
        )

    vocab = ['h#']
    # label -> id, kept in sync with `vocab` for O(1) lookups.
    vocab_index = {'h#': 0}
    with open(out_dir / f'{split}.src', 'w') as f_src,\
        open(out_dir / f'{split}.src.txt', 'w') as f_src_txt:
        for l, s in zip(lines, sizes):
            filepath = l.split('\t')[0]
            file_id = Path(filepath).name.split(".")[0]
            print(file_id)  # XXX
            try:
                ali_file = align_dict[file_id]
            except KeyError:
                raise KeyError(
                    f'no .PHN alignment found for {file_id} under {align_dir}'
                ) from None
            labels = read_timit_phone_alignment(ali_file)
            print(f'True length: {s}, alignment length: {len(labels)}')

            # Register unseen phones before truncation so the vocabulary
            # covers every label present in the alignment.
            for y in labels:
                if y not in vocab_index:
                    vocab_index[y] = len(vocab)
                    vocab.append(y)
            int_labels = [vocab_index[y] for y in labels]

            if s > len(labels):
                # Alignment is shorter than the features: pad with silence.
                gap = s - len(labels)
                labels.extend(['h#'] * gap)
                int_labels.extend([0] * gap)
            elif s < len(labels):
                labels = labels[:s]
                int_labels = int_labels[:s]
            assert len(int_labels) == s
            f_src_txt.write(" ".join(labels) + "\n")
            f_src.write(" ".join(map(str, int_labels)) + "\n")
    print(f'Number of phonemes: {len(vocab)}') 
    print(vocab)

In [19]:
# Driver cell: extract frame-level phone labels for one manifest split.
# The manifest split is 'valid', but the .PHN alignments are read from the
# TIMIT 'TEST' directory (align_split is hard-coded instead of split.upper()).
split = 'valid'
align_split = 'TEST' #split.upper()
# Directory containing {split}.tsv and {split}.lengths.
manifest_dir = '/home/lwang114/workplace/fall2022/UnsupSpeech2Sign/manifest/timit_phn/matched/feat'
# Root that is searched recursively for .PHN alignment files.
align_dir = f'/home/hertin/data/timit/TIMIT/{align_split}'
# Destination of {split}.src and {split}.src.txt.
out_dir = '/home/lwang114/workplace/fall2022/UnsupSpeech2Sign/manifest/timit_phn/matched/feat/CLUS60'

extract_timit_phone_alignments(manifest_dir, align_dir, out_dir, split)

FADG0_SI1279
True length: 90, alignment length: 90
FADG0_SI1909
True length: 139, alignment length: 139
FADG0_SI649
True length: 230, alignment length: 230
FADG0_SX109
True length: 152, alignment length: 152
FADG0_SX199
True length: 188, alignment length: 188
FADG0_SX19
True length: 135, alignment length: 135
FADG0_SX289
True length: 145, alignment length: 145
FADG0_SX379
True length: 133, alignment length: 133
FAKS0_SI1573
True length: 248, alignment length: 248
FAKS0_SI2203
True length: 175, alignment length: 175
FAKS0_SI943
True length: 187, alignment length: 187
FAKS0_SX133
True length: 165, alignment length: 165
FAKS0_SX223
True length: 154, alignment length: 154
FAKS0_SX313
True length: 176, alignment length: 176
FAKS0_SX403
True length: 167, alignment length: 167
FAKS0_SX43
True length: 122, alignment length: 122
FCAL1_SI1403
True length: 241, alignment length: 241
FCAL1_SI2033
True length: 166, alignment length: 166
FCAL1_SI773
True length: 145, alignment length: 145
FCAL1_SX14

### Extract forced alignment for ASL LibriSpeech

In [None]:
import json
import os
from pathlib import Path

from textgrids import TextGrids

def is_inside(x1, x2):
    """Tell whether two intervals overlap.

    Despite the name this is an overlap test, not a containment test.

    Args:
        x1: Indexable ``(start, end)`` pair.
        x2: Indexable ``(start, end)`` pair.

    Returns:
        ``False`` when ``x1`` starts after ``x2`` ends or ends before ``x2``
        starts, ``True`` otherwise. Intervals that merely touch at an
        endpoint count as overlapping.
    """
    # NOTE(review): touching endpoints count as overlap, so a segment lying
    # exactly on a shared boundary matches both neighbours -- confirm intended.
    disjoint = x1[0] > x2[1] or x1[1] < x2[0]
    return not disjoint

def read_librispeech_phone_alignment(phn_file):
    """Read the ``phones`` tier of a TextGrid forced-alignment file.

    Args:
        phn_file: Path of the TextGrid file; passed to ``TextGrids``.

    Returns:
        A tuple ``(starts, ends, labels)`` of three parallel lists with one
        entry per interval of the ``phones`` tier, holding the interval's
        ``xmin``, ``xmax`` and ``text``.
        NOTE(review): boundaries are presumably in seconds, as in Praat
        TextGrids -- confirm against the aligner output.
    """
    starts = []
    ends = []
    labels = []
    tg = TextGrids(phn_file)
    for phn in tg["phones"]:
        starts.append(phn.xmin)
        ends.append(phn.xmax)
        # append, not extend: extending with a string adds one entry per
        # character, which desynchronises labels from starts/ends.
        labels.append(phn.text)
    return starts, ends, labels

def map_phones_to_words(
        phn_starts, phn_ends, phn_labels,
        wrd_starts, wrd_ends, wrd_labels,
    ):
    """Group phone segments under every word segment they overlap.

    Args:
        phn_starts, phn_ends, phn_labels: Parallel per-phone sequences.
        wrd_starts, wrd_ends, wrd_labels: Parallel per-word sequences.

    Returns:
        ``(mapped_starts, mapped_ends, mapped_labels)``: three lists with one
        inner list per entry of ``wrd_starts``. Inner lists hold, in phone
        order, the starts / ends / labels of the phones for which
        ``is_inside`` reports an overlap with that word. A phone overlapping
        several words is listed under each of them.

    The word inputs and the mapping result are printed to stdout.
    """
    # One (starts, ends, labels) bucket per word.
    buckets = [([], [], []) for _ in range(len(wrd_starts))]
    for phone in zip(phn_starts, phn_ends, phn_labels):
        phone_span = (phone[0], phone[1])
        words = zip(wrd_starts, wrd_ends, wrd_labels)
        for bucket, (w_s, w_e, _) in zip(buckets, words):
            if not is_inside(phone_span, (w_s, w_e)):
                continue
            # Append start, end and label to their respective lists.
            for column, value in zip(bucket, phone):
                column.append(value)
    mapped_starts = [bucket[0] for bucket in buckets]
    mapped_ends = [bucket[1] for bucket in buckets]
    mapped_labels = [bucket[2] for bucket in buckets]
    print(wrd_starts, wrd_ends, wrd_labels)
    print(mapped_starts, mapped_ends, mapped_labels)
    return mapped_starts, mapped_ends, mapped_labels

def extract_librispeech_phone_alignments(
        manifest_dir, 
        align_dir, 
        out_dir, 
        split, 
        frame_rate=0.02, 
        sr=16e3,
    ):
    """Write phone transcripts and frame-level phone labels for a split.

    Inputs read from ``manifest_dir``:
        ``{split}.jsonlines``     one JSON object per utterance with keys
                                  ``utterance_id``, ``begins``, ``ends`` and
                                  ``labels`` (word-level alignment)
        ``feat/{split}.lengths``  one frame count per utterance

    For utterance id ``a_b_c`` the phone alignment is read from
    ``align_dir/a/b/a_b_c.TextGrid``. Phones are attached to the words they
    overlap via ``map_phones_to_words``, then laid out on the frame axis.
    Frames not covered by any mapped phone are labelled ``sil`` (id 0), and
    the sequence is padded with ``sil`` or truncated to the frame count from
    the lengths file.

    Outputs, one line per utterance:
        ``manifest_dir/{split}.phn``  phone-level transcript
        ``out_dir/{split}.src``       frame-level integer phone ids
        ``out_dir/{split}.src.txt``   frame-level phone labels

    Integer ids are assigned in order of first appearance; ``sil`` is id 0.

    Args:
        manifest_dir: Directory with the word alignment and lengths files.
        align_dir: Root directory of the TextGrid files.
        out_dir: Output directory; created if missing.
        split: Split name.
        frame_rate: Frame shift in seconds (despite the name).
        sr: Sampling rate in Hz.

    Raises:
        ValueError: If ``frame_rate * sr`` is below one sample, or if the
            word alignment and the lengths file disagree on the number of
            utterances.

    NOTE(review): phone boundaries are assumed to be numeric times in
    seconds -- confirm against the TextGrid reader in use.
    """
    manifest_dir = Path(manifest_dir)
    align_dir = Path(align_dir)
    out_dir = Path(out_dir)
    out_dir.mkdir(parents=True, exist_ok=True)

    # Samples per frame.
    hop = int(frame_rate * sr)
    if hop < 1:
        raise ValueError(
            f"frame_rate * sr must be at least one sample, got {frame_rate * sr}"
        )

    # Read and validate the inputs before any output file is truncated.
    with open(manifest_dir / f"{split}.jsonlines", "r") as f_wrd_ali:
        utt_lines = [line for line in f_wrd_ali if line.strip()]
    with open(manifest_dir / f"feat/{split}.lengths", "r") as f_len:
        sizes = [int(x) for x in f_len.read().split()]
    if len(utt_lines) != len(sizes):
        # zip() would silently drop the surplus and misalign the outputs.
        raise ValueError(
            f"{split}.jsonlines has {len(utt_lines)} utterances "
            f"but feat/{split}.lengths has {len(sizes)} entries"
        )

    vocab = ['sil']
    # label -> id, kept in sync with `vocab` for O(1) lookups.
    vocab_index = {'sil': 0}
    with open(manifest_dir / f"{split}.phn", "w") as f_phn,\
        open(out_dir / f"{split}.src", "w") as f_src,\
        open(out_dir / f"{split}.src.txt", "w") as f_src_txt:
        for line, size in zip(utt_lines, sizes):
            utt = json.loads(line)
            utt_id = utt["utterance_id"]
            align_subdir = "/".join(utt_id.split("_")[:-1])
            align_path = align_dir / align_subdir / f"{utt_id}.TextGrid"

            phn_starts, phn_ends, phn_labels = read_librispeech_phone_alignment(align_path)

            mapped_starts, mapped_ends, mapped_labels = map_phones_to_words(
                phn_starts, phn_ends, phn_labels,
                utt["begins"], utt["ends"], utt["labels"],
            )

            # Create phone-level transcript and frame-level phone sequence
            mapped_sequence = []
            mapped_int_sequence = []
            mapped_frame_sequence = []
            for starts, ends, labels in zip(mapped_starts, mapped_ends, mapped_labels):
                for start, end, phn in zip(starts, ends, labels):
                    if phn not in vocab_index:
                        vocab_index[phn] = len(vocab)
                        vocab.append(phn)
                    mapped_sequence.append(phn)

                    # Seconds -> sample index -> frame index. Rounding to the
                    # nearest sample absorbs float error such as
                    # 0.06 * 16000 == 960.0000000000001.
                    s = int(round(start * sr)) // hop
                    e = int(round(end * sr)) // hop

                    n_done = len(mapped_frame_sequence)
                    if s > n_done:
                        # Frames not covered by any mapped phone (e.g. pauses
                        # between words) are silence, which keeps every label
                        # at its true frame position.
                        pad = s - n_done
                        mapped_frame_sequence.extend(['sil'] * pad)
                        mapped_int_sequence.extend(['0'] * pad)
                        n_done = s
                    # Count frames from n_done so a phone that was attached to
                    # two adjacent words is not emitted twice.
                    n_new = e - n_done
                    if n_new > 0:
                        mapped_frame_sequence.extend([phn] * n_new)
                        mapped_int_sequence.extend([str(vocab_index[phn])] * n_new)

            if size > len(mapped_int_sequence):
                gap = size - len(mapped_int_sequence)
                # Ids are kept as strings so that " ".join works below.
                mapped_int_sequence.extend(['0'] * gap)
                mapped_frame_sequence.extend(['sil'] * gap)
            elif size < len(mapped_int_sequence):
                mapped_int_sequence = mapped_int_sequence[:size]
                mapped_frame_sequence = mapped_frame_sequence[:size]
            assert len(mapped_int_sequence) == size

            f_src_txt.write(" ".join(mapped_frame_sequence) + "\n")
            f_src.write(" ".join(mapped_int_sequence) + "\n")
            f_phn.write(" ".join(mapped_sequence) + "\n")