In [88]:
%load_ext autoreload
%autoreload 2

import os, sys, glob
import json
from operator import itemgetter
import re
import numpy as np
import pandas as pd
import shutil
from praatio import textgrid as tgio

sys.path.append('../utils/')

from config import *
from preproc_utils import gentle_fill_missing_words, create_word_prediction_df, clean_hyphenated_words, clean_named_entities, dataframe_to_textgrid


The autoreload extension is already loaded. To reload it, use:
  %reload_ext autoreload


In [2]:
task = 'wheretheressmoke'
overwrite = False

### Set directories

In [3]:
# set directories
stim_dir = os.path.join(BASE_DIR, 'stimuli')
gentle_dir = os.path.join(stim_dir, 'gentle')
preproc_dir = os.path.join(stim_dir,'preprocessed')
task_out_dir = os.path.join(preproc_dir, task)
backup_dir = os.path.join(task_out_dir, 'src')

audio_fn = glob.glob(os.path.join(stim_dir, 'audio', f'*{task}*.wav'))[0]

### Load adjusted file

Currently we only mapped the word tier from gentle to praat -- need to map the phoneme tier as well

In [26]:
def gentle_to_textgrid(alignment_fn):
	"""
	Take a filename and its associated transcription and fill in all the gaps
	"""
    
	rearranged_words = []
	file_ons = 0
	
	# load the alignment file
	with open(alignment_fn, encoding='utf-8') as f:
		content = json.load(f)
	all_ons = content['words'][0]['start']
	
	for ix, word in enumerate(content['words']):
		# if the word was successfully aligned
		if word['case'] == 'success' or word['case'] == 'assumed':
			word_ons = np.round(word['start'], 3)
			word_off = np.round(word['end'], 3)
			target = word['word']
			rearranged_words.append((word_ons, word_off, target))
		else:
			# search forwards and backwards to find the previous and next word
			# use the end and start times to get word times 
			target = content['words'][ix]['word']
			prev_end, next_start = align_missing_word(content, ix)
			
			rearranged_words.append((prev_end, next_start, target))
	
	# adjust for overlap in times
	for ix, word_times in enumerate(rearranged_words):
		if ix != 0:
			prev_start, prev_end, prev_word = rearranged_words[ix-1]
			curr_start, curr_end, curr_word = word_times

			# if the current start time is before the previous end --> adjust
			# to be the previous end time
			if curr_start < prev_end:
				rearranged_words[ix] = (prev_end, curr_end, curr_word)
				curr_start, curr_end, curr_word = rearranged_words[ix]

			# if the current end time is after the current start time
			# set to be the next start time
			if curr_end < curr_start and (ix+1 != len(rearranged_words)):
				next_start, next_end, next_word = rearranged_words[ix+1]
				rearranged_words[ix] = (curr_start, next_start, curr_word)
				curr_start, curr_end, curr_word = rearranged_words[ix]

			# final catch is adding a tiny bit of padding to the end word to adjust
			if curr_end == curr_start:
				rearranged_words[ix] = (curr_start, curr_end+0.0001, curr_word)
	
	tg = tgio.Textgrid()
	tg.addTier(tgio.IntervalTier('word', rearranged_words))
	return content, tg

def gentle_fill_missing_words(alignment_fn):
	'''
	A simple way to fill missing aligned words
	'''
	
	# load the alignment file
	with open(alignment_fn, encoding='utf-8') as f:
		content = json.load(f)
		
	for ix, word in enumerate(content['words']):
		if word['case'] != 'success':
			prev_end, next_start = align_missing_word(content, ix)
			content['words'][ix].update({'start': prev_end, 'end': next_start, 'case': 'assumed'})
			
	return content

def align_missing_word(content, ix):
	'''
	Searches from a word in both directions and then distributes time evenly
	'''
	# keep track of how many are missing
	forward_ix = ix
	forward_missing = 0
	
	# search forward
	while True:
		# move one forward
		forward_ix += 1
		if content['words'][forward_ix]['case'] == 'success':
			next_start = np.round(content['words'][forward_ix]['start'], 3)
			break
		else:
			forward_missing += 1
	
	# keep track of how many are missing
	back_ix = ix
	back_missing = 0
	
	while True:
		# move one backwards
		back_ix -= 1
		
		if content['words'][back_ix]['case'] == 'success':
			prev_end = np.round(content['words'][back_ix]['end'], 3)
			break
		else:
			back_missing += 1
	
	# space evenly between the number of missing items
	total_missing = back_missing + forward_missing + 1 # add one to include current item
	x_vals = np.linspace(prev_end, next_start, total_missing + 2)[1:-1] # add 2 to pad the points on either side
	
	# if there is anything missing
	# normalize indices to 0
	missing_ixs = np.arange(ix-back_missing,ix+forward_missing+1)
	
	# index of the value in the interpolated array
	arr_ix = np.argwhere(ix == missing_ixs)
	
	# then extract value from that array and round
	next_start = x_vals[arr_ix].squeeze()
	next_start = np.round(next_start, 3)
	
	# have to adjust prev end to be the interpolated value
	if len(missing_ixs) > 1 and arr_ix:
		prev_end = x_vals[np.argwhere(ix == missing_ixs)-1].squeeze()
		prev_end = np.round(prev_end, 3)
	
	return prev_end, next_start


In [5]:
def scale_phonemes_to_word(word_start, word_end, phonemes):
    """
    Scale phoneme timings to match the word boundaries.
    """
    word_duration = word_end - word_start
    phoneme_duration = sum(p['duration'] for p in phonemes)
    scale_factor = word_duration / phoneme_duration

    scaled_phonemes = []
    current_time = word_start
    for phone in phonemes:
        scaled_duration = phone['duration'] * scale_factor
        phone_end = current_time + scaled_duration
        scaled_phonemes.append({
            'start': current_time,
            'end': phone_end,
            'phone': phone['phone']
        })
        current_time = phone_end

    # Adjust the last phoneme to exactly match the word end time
    if scaled_phonemes:
        scaled_phonemes[-1]['end'] = word_end

    return scaled_phonemes

def gentle_to_textgrid_phoneme(alignment_fn, word_textgrid):
    """
    Take a filename of a Gentle alignment JSON and a word TextGrid, and return a Praat TextGrid for the phoneme tier,
    including CMU phoneme categories, with phonemes scaled to match word boundaries.
    """
    
    # load the alignment file
    with open(align_fn, encoding='utf-8') as f:
        content = json.load(f)
    
    word_tier = word_textgrid.getTier('word')
    rearranged_phones = []
    
    word_index = 0
    gentle_index = 0
    while gentle_index < len(content['words']) and word_index < len(word_tier):
        gentle_word = content['words'][gentle_index]
        word_interval = word_tier.entries[word_index]
                
        # Check if we need to combine hyphenated words
        if '-' in word_interval.label.lower() and gentle_word['word'] != word_interval.label.lower():

            combined_word = gentle_word['word']
            combined_phones = gentle_word['phones'] if 'phones' in gentle_word else []
            next_gentle_index = gentle_index + 1
            
            while next_gentle_index < len(content['words']):
                next_word = content['words'][next_gentle_index]
                combined_word += next_word['word']
                if 'phones' in next_word:
                    combined_phones.extend(next_word['phones'])
                
                if combined_word.lower() == word_interval.label.lower().replace('-', ''):
                    # We've found a match for the hyphenated word
                    gentle_word = {
                        'word': word_interval.label.lower(),
                        'phones': combined_phones,
                        'case': 'success' if all(w['case'] == 'success' for w in content['words'][gentle_index:next_gentle_index+1]) else 'partial'
                    }
                    gentle_index = next_gentle_index
                    break
                next_gentle_index += 1
        
        if gentle_word['case'] == 'success' and 'phones' in gentle_word and gentle_word['word'].lower() == word_interval.label.lower():
            word_start, word_end = word_interval.start, word_interval.end
            
            # Scale phonemes to match the word boundaries
            scaled_phonemes = scale_phonemes_to_word(word_start, word_end, gentle_word['phones'])
            
            for phone in scaled_phonemes:
                phone_start = np.round(phone['start'], 3)
                phone_end = np.round(phone['end'], 3)
                phone_label = phone['phone']
                
                # only get the first phoneme --> this maps to CMU phoneme dictionary
                phone_label = phone_label.split('_')[0].upper()
                
                rearranged_phones.append((phone_start, phone_end, phone_label))
            
            word_index += 1
        else:
            # If we can't find a match, move to the next word in both Gentle and TextGrid
            word_index += 1
        
        gentle_index += 1
    
    # Sort phones by start time (in case they're not already in order)
    rearranged_phones.sort(key=lambda x: x[0])
    
    # Fill gaps with silence
    final_phones = []
    for ix in range(len(rearranged_phones)):
        curr_start, curr_end, curr_phone = rearranged_phones[ix]
        if ix > 0:
            prev_start, prev_end, prev_phone = final_phones[-1]
            if curr_start > prev_end:
                # Insert silence
                final_phones.append((prev_end, curr_start, ""))
        final_phones.append((curr_start, curr_end, curr_phone))
    
    # tg = tgio.Textgrid()
    word_textgrid.addTier(tgio.IntervalTier('phone', final_phones))
    return word_textgrid

In [64]:
task = 'wheretheressmoke'

praat_fn = os.path.join(preproc_dir, task, f'{task}_transcript-praat.TextGrid')
align_fn = os.path.join(gentle_dir, task, 'align.json')

word_textgrid = tgio.openTextgrid(praat_fn, False)
phone_textgrid = gentle_to_textgrid(align_fn) #, word_textgrid)

# tg_phone.getTier('phone')

In [198]:
tg = tgio.Textgrid()

for tier_name in ['phone', 'word']:
    tier = phone_textgrid.getTier(tier_name)
    tg.addTier(tier)

praat_phone_fn = os.path.join(preproc_dir, task, f'{task}_transcript-praat_phone.TextGrid')
tg.save(praat_phone_fn, 'long_textgrid', True)

### Convert preprocessed CSV to textgrid

In [92]:
task = 'howtodraw'

# Use the preprocessed dataframe to make a textgrid
preproc_fn = os.path.join(preproc_dir, task, f'{task}_transcript-preprocessed.csv')
audio_fn = glob.glob(os.path.join(stim_dir, 'audio', f'*{task}*.wav'))[0]

df_preproc = pd.read_csv(preproc_fn)
tg = dataframe_to_textgrid(df_preproc, audio_fn)

tg_fn = os.path.join(preproc_dir, task, f'{task}_transcript-praat.TextGrid')
tg.save(tg_fn, 'long_textgrid', True)

### Set up file structure for AudioTextDataset

In [167]:
import prosody_utils as prosody

window_size = 25

df_preproc = pd.read_csv(os.path.join(BASE_DIR, 'stimuli/preprocessed/', task, f'{task}_transcript-preprocessed.csv'))
# df_preproc = df_preproc.rename(columns={'Word_Written': 'word', 'Punctuation': 'punctuation'})

###########################################
#### Create a dataset for processing  #####
###########################################

# create a list of indices that we will iterate through to sample the transcript
segments = prosody.get_segment_indices(n_words=len(df_preproc), window_size=window_size)[:-1]
# inputs = [prosody.transcript_to_input(df_preproc, segment, add_punctuation=True) for segment in segments]
# inputs, labels = zip(*inputs)

In [122]:
inputs, df = prosody.transcript_to_input(df_preproc, segments[1], add_punctuation=True)

In [138]:
# Process every segment
for i, segment in enumerate(segments):
    # Crop dataframe to the current set of indices
    df_segment = df_preproc.iloc[segment]

    sys.exit(0)

SystemExit: 0

In [171]:
from preproc_utils import cut_audio_segments

In [131]:
# Candidate segments are one index before the candidate index (e.g., predict the upcoming word)
candidate_idxs = np.where(df_preproc['NWP_Candidate'].to_numpy())[0]

for idx in candidate_idxs[-1:]:

    candidate_segment = segments[idx-1]
    sys.exit(0)

SystemExit: 0

In [169]:
# Create sequential pairs
candidate_idxs = np.where(df_preproc['NWP_Candidate'].to_numpy())[0] # First get indices
candidate_idxs = np.concatenate([[0], candidate_idxs], axis=0) # Add the first item for the first cut
segments = np.vstack((candidate_idxs[:-1], candidate_idxs[1:]-1)).T # Stack and make pairs

# Convert to a list of lists (if needed)
segment_indices = segments.tolist()

In [205]:
# Get all segments except the first (which can't be cut) and the last (which isn't a word in the transcript)
segments = prosody.get_segment_indices(n_words=len(df_preproc), window_size=window_size)[1:-1]
segment_idxs = [[min(segment), max(segment)]for segment in segments]

# cut_audio_segments(df_preproc, task, audio_fn, audio_out_dir, segment_idxs)

In [215]:
# Cut the audio
audio_fns, df_segments = cut_audio_segments(df_preproc, task, audio_fn, audio_out_dir, segment_idxs[:2])

In [235]:
tg

<praatio.data_classes.textgrid.Textgrid at 0x14665af7db20>

In [239]:
df.loc[:, ['Onset', 'Offset']] = df.loc[:, ['Onset', 'Offset']] - df.iloc[0]['Onset']

In [241]:
df_transcript = df_preproc.rename(columns={'Word_Written': 'word', 'Punctuation': 'punctuation'})

for segment, audio_fn in zip(segments, audio_fns):

    # For some reason I made a stupid naming convention conversion somewhere so need to flip
    # back and forth
    inputs, df = prosody.transcript_to_input(df_transcript, segment, add_punctuation=True)
    df = df.rename(columns={'word': 'Word_Written'})

    # Normalize to the start of the clip (e.g., make the first onset here 0s)
    df.loc[:, ['Onset', 'Offset']] = df.loc[:, ['Onset', 'Offset']] - df.iloc[0]['Onset']

    # Make a textgrid file
    textgrid_fn = audio_fn.replace('audio', 'textgrids').replace('.wav', '.TextGrid')
    tg = dataframe_to_textgrid(df, audio_fn)

    tg.save(textgrid_fn, 'long_textgrid', True)
    sys.exit(0)

SystemExit: 0

In [227]:
tg = dataframe_to_textgrid(df, audio_fn)

In [232]:
textgrid_fn = audio_fn.replace('audio', 'textgrids').replace('.wav', 'TextGrid')

'/dartfs/rc/lab/F/FinnLab/datasets/nlp-datasets/pfka-moth-stories/textgrids/howtodraw_segment-00001TextGrid'

In [190]:
candidate_segments = [segments[idx-1] for idx in candidate_idxs]

In [104]:
inputs = [prosody.transcript_to_input(df_preproc, segment, add_punctuation=True) for segment in segments]