In [1]:
import os
import pandas as pd
import xml.etree.ElementTree as ET
from typing import List
from glob import iglob

In [2]:
# Output prefix for the generated .lab/.wav files. The driver loop concatenates
# it directly with the file name, so the trailing slash is significant.
# NOTE(review): all four paths are absolute and machine-specific.
PATH_OUT = '/Users/jen/Dev/Dissertation/dissertation/Data/SWBD/MFA_SPLIT/'
# Directory the source .wav recordings are read from.
PATH_WAV = '/Users/jen/Dev/Dissertation/dissertation/Data/SWBD/wavs'
# Directory that is globbed for terminals XML files.
PATH_TERMS = '/Users/jen/Dev/Dissertation/dissertation/Data/SWBD/terminals'
# CSV file that provides the 'homograph' column read below.
PATH_WHD = '/Users/jen/Dev/Dissertation/dissertation/Data/WikipediaHomographData.csv'
#IN = os.path.join(PATH, 'sw04603.wav')
whd_df = pd.read_csv(PATH_WHD)
# De-duplicated values of the 'homograph' column as a plain Python list; used
# later to decide which sentences are exported.
homographs = whd_df['homograph'].unique().tolist()

In [3]:
# Authored by Andrew Jiang
# BCG digital Ventures
import os, wave, math, collections

# Lightweight record types shared by the wave helpers below.
# metatuple: the six WAV header parameters, in the order wave.getparams() returns them.
# datatuple: a header (meta) paired with the frame payload (data).
_META_FIELDS = ('nchannels', 'sampwidth', 'framerate', 'nframes', 'comptype', 'compname')
metatuple = collections.namedtuple('metatuple', _META_FIELDS)
datatuple = collections.namedtuple('datatuple', ('meta', 'data'))

def readwave(src):
    """Read a whole WAV file into memory.

    Args:
        src: Path or file-like object accepted by ``wave.open``.

    Returns:
        datatuple: ``meta`` is a ``metatuple`` built from the file's header
        parameters; ``data`` is a one-element list containing every frame of
        the file as a single ``bytes`` object.
    """
    # The context manager closes the reader even if reading the frames fails.
    with wave.open(src, 'rb') as reader:
        # getparams() yields the six header values in metatuple field order.
        meta = metatuple(*reader.getparams())
        frames = reader.readframes(meta.nframes)
    return datatuple(meta, [frames])

def writewave(dest, data):
    """Write audio data to the WAV file at ``dest``.

    Args:
        dest: Path of the output file; its parent directory is created if needed.
        data: A ``datatuple`` whose ``data`` field is a list of frame chunks.

    Returns:
        list: ``dest`` repeated once per chunk written.

    Note:
        Every chunk is written to the same ``dest`` path, so when ``data``
        holds more than one chunk each write replaces the previous one and
        only the last chunk remains on disk.
    """
    files = []
    for track in separate(data):
        makedir(dest)  # make sure dir exists
        # The wave module packs the frame count into the header as an integer,
        # so coerce a float count (as produced by true division upstream).
        meta = track.meta._replace(nframes=int(track.meta.nframes))
        # The context manager closes the file even if a write fails.
        with wave.open(dest, 'wb') as writer:
            writer.setparams(meta)
            writer.writeframes(track.data)
        files.append(dest)
    return files

def makedir(dest):
    """Create the parent directory of ``dest`` if it does not exist yet.

    Args:
        dest: Path of a file that is about to be written.

    A bare file name has no directory component (it refers to the current
    working directory), so there is nothing to create in that case.
    """
    parent = os.path.dirname(dest)
    if parent:
        # exist_ok avoids the race between checking for and creating the directory.
        os.makedirs(parent, exist_ok=True)

def slicewave(data, start, end):
    """Cut the span between two sample offsets out of ``data``.

    Args:
        data: A ``datatuple``; several chunks are merged into one first.
        start: Offset of the first sample to keep.
        end: Offset one past the last sample to keep.

    Returns:
        datatuple: header with ``nframes`` recomputed for the slice, and a
        one-element list holding the sliced bytes.

    Note:
        Offsets are scaled by ``sampwidth`` only; the channel count is not
        taken into account. NOTE(review): the driver loop in this file doubles
        its times before slicing, presumably to compensate for two-channel
        audio - confirm before changing either side.
    """
    if len(data.data) > 1:
        data = merge(data)  # insurance: work on a single contiguous chunk
    meta = data.meta
    # Convert sample offsets into byte offsets.
    first_byte = start * meta.sampwidth
    last_byte = end * meta.sampwidth
    spliced = data.data[0][first_byte:last_byte]
    # Floor division keeps nframes an int; the wave header cannot be packed
    # from a float frame count.
    meta = meta._replace(nframes=len(spliced) // meta.sampwidth)
    return datatuple(meta, [spliced])

def slicewave_s(data, start, end):
    """Slice ``data`` between two points given in seconds.

    ``start`` and ``end`` are multiplied by the frame rate, truncated to whole
    offsets and handed to ``slicewave``, whose result is returned unchanged.
    """
    rate = float(data.meta.framerate)
    first = int(float(start) * rate)
    last = int(float(end) * rate)
    return slicewave(data, first, last)

def split(data, interval=None, overlap=None):
    """Cut ``data`` into consecutive windows measured in frames.

    Args:
        data: A ``datatuple`` to split.
        interval: Distance between window starts; defaults to the frame rate
            (one second).
        overlap: Length of each window (window ``i`` covers
            ``[i * interval, i * interval + overlap)``); defaults to ``interval``.

    Returns:
        The windows combined into one multi-chunk ``datatuple`` by ``combine``.

    Raises:
        ValueError: If ``interval`` or ``overlap`` is smaller than 1.
    """
    if interval is None:
        interval = data.meta.framerate  # =1s
    if overlap is None:
        overlap = interval
    if interval < 1 or overlap < 1:
        raise ValueError('cannot iterate')
    iterations = int(math.ceil(1.0 * data.meta.nframes / interval))
    canned = [
        slicewave(data, i * interval, i * interval + overlap)
        for i in range(iterations)
    ]
    return combine(canned)

def split_s(data, interval=None, overlap=None):
    """Cut ``data`` into consecutive windows measured in seconds.

    ``interval`` and ``overlap`` are converted to frame counts using the frame
    rate (values left as ``None`` are passed through so ``split`` applies its
    own defaults), then ``split`` does the work and its result is returned.
    """
    rate = float(data.meta.framerate)
    if interval is not None:
        interval = int(float(interval) * rate)
    if overlap is not None:
        overlap = int(float(overlap) * rate)
    return split(data, interval, overlap)

def separate(data):
    """Split a multi-chunk ``datatuple`` into one ``datatuple`` per chunk.

    Args:
        data: A ``datatuple`` whose ``data`` field is a list of frame chunks.

    Returns:
        list: One ``datatuple`` per chunk. Each carries a copy of the header
        with ``nframes`` recomputed from the chunk length, and the chunk
        itself (not wrapped in a list) as its ``data``.
    """
    tracks = []
    for chunk in data.data:
        # Floor division keeps nframes an int, as the wave header requires.
        frame_count = len(chunk) // data.meta.sampwidth
        meta = data.meta._replace(nframes=frame_count)
        tracks.append(datatuple(meta, chunk))
    return tracks

def combine(data):
    """Gather several ``datatuple`` objects into one multi-chunk ``datatuple``.

    Args:
        data: Non-empty sequence of ``datatuple`` objects, each with a list of
            chunks in its ``data`` field.

    Returns:
        datatuple: the header of the first element with ``nframes`` recomputed
        from the total payload size, and the concatenated list of all chunks.

    Raises:
        IndexError: If ``data`` is empty.
    """
    meta = data[0].meta
    chunks = []
    for track in data:
        chunks += track.data
    # Sum the chunk lengths instead of joining: the chunks are bytes, which
    # ''.join() rejects on Python 3, and no copy of the audio is needed.
    total_bytes = sum(len(chunk) for chunk in chunks)
    # Floor division keeps nframes an int, as the wave header requires.
    meta = meta._replace(nframes=total_bytes // meta.sampwidth)
    return datatuple(meta, chunks)

def merge(data):
    """Concatenate all chunks of ``data`` into a single chunk.

    Args:
        data: A ``datatuple`` whose ``data`` field is a list of ``bytes`` chunks.

    Returns:
        datatuple: the unchanged header and a one-element list holding the
        concatenated bytes.
    """
    # Frames read by the wave module are bytes, so the separator must be bytes
    # too (a str separator raises TypeError on Python 3).
    return datatuple(data.meta, [b''.join(data.data)])

In [4]:
#Test
# data = readwave(IN)
# A1 = slicewave_s(data, 1.7*2, 24.4*2)
# A2 = slicewave_s(data, 25.6*2, 50.3*2)
# writewave(OUT, A1)
# writewave(OUT_1, A2)

In [5]:
# Attribute names written as "{namespace-uri}local-name", the form ElementTree
# uses for namespaced attributes. They are looked up with Element.get() below.
START = "{http://nite.sourceforge.net/}start"
END = "{http://nite.sourceforge.net/}end"
ID = "{http://nite.sourceforge.net/}id"
# NOTE(review): PW is not referenced by any code visible in this notebook;
# presumably the tag name intended for get_orths()' item_type - confirm.
PW = 'phonword'
# Name of the (un-namespaced) attribute read to obtain a token's text.
ORTH = 'orth'

In [6]:
def get_orths(FILE: str, item_type: str, start_time: float, end_time: float) -> List:
    """Collect the ``orth`` values of items that fall inside a time window.

    Args:
        FILE: Path of the XML file to parse.
        item_type: Tag name passed to ``findall`` on the root element, so only
            direct children of the root are considered.
        start_time: Lower bound; an item's start must be >= this value.
        end_time: Upper bound; it is rounded up to the next whole number and
            an item's end must be <= that value.

    Returns:
        List of ``orth`` attribute values, in document order. Items whose
        start or end is missing or not numeric (for example ``'n/a'``) are
        skipped instead of aborting the whole scan.
    """
    def _as_time(raw):
        # None for an absent attribute (TypeError) or non-numeric text (ValueError).
        try:
            return float(raw)
        except (TypeError, ValueError):
            return None

    orths = []
    root = ET.parse(FILE).getroot()
    upper = math.ceil(end_time)  # loop-invariant
    for item in root.findall(item_type):
        start = _as_time(item.get(START))
        end = _as_time(item.get(END))
        if start is None or end is None:
            continue
        if start >= start_time and end <= upper:
            orths.append(item.get(ORTH))
    return orths

In [23]:
def get_IDS(terminals: str):
    """Return the ID attribute of every element in the file that has one.

    The XML file at ``terminals`` is parsed and walked in document order;
    elements without the attribute are left out.
    """
    document = ET.parse(terminals)
    return [node.get(ID) for node in document.iter() if node.get(ID) is not None]

def get_unique_prefixes(IDS):
    """Return the set of ID prefixes (the text before the first ``_``).

    IDs that contain a ``-`` are ignored. An ID without any ``_`` contributes
    itself as the prefix.
    """
    return {identifier.split('_')[0] for identifier in IDS if '-' not in identifier}
    
def get_sentence_ids(uniques, IDS):
    """Group IDs by sentence prefix.

    Args:
        uniques: Iterable of prefixes; results follow its iteration order.
        IDS: All IDs, in document order.

    Returns:
        list of dict: one entry per prefix that has at least one ID starting
        with ``prefix + "_"``, with keys ``'unique'`` (the prefix), ``'seq'``
        (matching IDs in order), ``'start_id'`` (first match) and ``'end_id'``
        (last match). Prefixes without any matching ID are skipped rather
        than raising ``IndexError``.
    """
    sentences = []
    for u in uniques:
        marker = u + "_"
        members = [i for i in IDS if i.startswith(marker)]
        if not members:
            # e.g. an ID with no "_" is its own prefix and matches nothing here
            continue
        sentences.append({
            'unique': u,
            'seq': members,
            'start_id': members[0],
            'end_id': members[-1],
        })
    return sentences


def go_fwd(ID):
    """Return the ID that follows ``ID``: ``'<prefix>_<n>'`` -> ``'<prefix>_<n + 1>'``.

    ``ID`` must contain exactly one ``_`` followed by an integer; anything
    else raises ``ValueError``.
    """
    stem, index = ID.split('_')
    return "{}_{}".format(stem, int(index) + 1)

def recurs_fwd(sentence_start_id, sentence, root):
    """Search forward from ``sentence_start_id`` for an element with a usable start.

    The ID after ``sentence_start_id`` is looked up in the tree. If that
    element's start attribute is present and not ``'n/a'``, it is recorded in
    ``sentence`` under ``'start_id'`` and ``'start'``; otherwise the search
    recurses from that ID. ``sentence`` is modified in place.

    Returns:
        tuple: (last ID examined, ``sentence``, ``root``).
    """
    candidate = go_fwd(sentence_start_id)
    for node in root.iter():
        if node.get(ID) != candidate:
            continue
        onset = node.get(START)
        if onset is None or onset == 'n/a':
            # No timing on this element: keep walking forward from it.
            candidate, sentence, root = recurs_fwd(candidate, sentence, root)
        else:
            sentence['start_id'] = candidate
            sentence['start'] = onset
    return candidate, sentence, root

def go_back(ID):
    """Return the ID that precedes ``ID``: ``'<prefix>_<n>'`` -> ``'<prefix>_<n - 1>'``.

    ``ID`` must contain exactly one ``_`` followed by an integer; anything
    else raises ``ValueError``.
    """
    stem, index = ID.split('_')
    return "{}_{}".format(stem, int(index) - 1)

def recurs_back(sentence_end_id, sentence, root):
    """Search backward from ``sentence_end_id`` for an element with a usable end.

    The ID before ``sentence_end_id`` is looked up in the tree. If that
    element's end attribute is present and not ``'n/a'``, it is recorded in
    ``sentence`` under ``'end_id'`` and ``'end'``; otherwise the search
    recurses from that ID. ``sentence`` is modified in place.

    ``'n/a'`` is treated as missing, matching ``recurs_fwd``; storing it would
    only fail later when the value is converted to a float.

    Returns:
        tuple: (last ID examined, ``sentence``, ``root``).
    """
    previous = go_back(sentence_end_id)
    for e in root.iter():
        if e.get(ID) != previous:
            continue
        end = e.get(END)
        if end is not None and end != 'n/a':
            sentence['end_id'] = previous
            sentence['end'] = end
        else:
            # No timing on this element: keep walking backward from it.
            previous, sentence, root = recurs_back(previous, sentence, root)
    return previous, sentence, root

def get_start_end(setence_ids, terminals):
    """Attach the words and the start/end times to every sentence record.

    Args:
        setence_ids: List of sentence dicts as built by ``get_sentence_ids``
            (keys ``'seq'``, ``'start_id'``, ``'end_id'``). The misspelled
            parameter name is kept so existing callers keep working.
        terminals: Path of the terminals XML file the IDs came from.

    Returns:
        The same list; each dict is updated in place with ``'sentence'`` (the
        ``orth`` values of its elements, in document order) and, when they
        can be found, ``'start'`` and ``'end'``. When the first/last element
        carries no usable time (missing or ``'n/a'``), the nearest following/
        preceding element that has one is used and ``'start_id'``/``'end_id'``
        are moved to it.
    """
    # Use the argument: the body previously read a global of the correctly
    # spelled name and only worked when the notebook happened to define it.
    sentence_ids = setence_ids
    root = ET.parse(terminals).getroot()
    for sentence in sentence_ids:
        members = set(sentence['seq'])  # O(1) membership per element
        orths = []
        for e in root.iter():
            _id = e.get(ID)
            if _id in members and e.get(ORTH) is not None:
                orths.append(e.get(ORTH))
            if _id == sentence['start_id']:
                start = e.get(START)
                if start is None or start == 'n/a':
                    # Look forward for the first element with a start time.
                    recurs_fwd(sentence['start_id'], sentence, root)
                else:
                    sentence['start'] = start
            # Deliberately not ``elif``: one element can be both the start and
            # the end of a sentence, and the end must still be resolved.
            if _id == sentence['end_id']:
                end = e.get(END)
                if end is None or end == 'n/a':
                    # Look back for the end of the sentence.
                    recurs_back(sentence['end_id'], sentence, root)
                else:
                    sentence['end'] = end
        sentence['sentence'] = orths
    return sentence_ids

# Script

In [24]:
# #Test
# terminals = os.path.join(PATH,"sw4603.A.terminals.xml")
# IDS = get_IDS(terminals)
# uniques = get_unique_prefixes(IDS)
# sentence_ids = get_sentence_ids(uniques, IDS)    
# sentence_data = get_start_end(sentence_ids, terminals)

In [25]:
# for e in sentence_data: 
#     for orth in e['sentence']:
#         if orth in homographs: #filter out non-homograph data
#             #Create lab file
#             with open('{}/A1/sw04603_{}.lab'.format(PATH, e['start_id'] + "-" + e['end_id']), 'w') as f: 
#                 f.write(' '.join(e['sentence']))
#             #Create wav file
#             data = readwave('{}/sw04603.wav'.format(PATH))
#             start_time = float("{:.1f}".format(float(e['start'])))*2
#             end_time = float("{:.1f}".format(float(e['end'])))*2
#             content = slicewave_s(data,  start_time, end_time)
#             OUT = '{}/A1/sw04603_{}.wav'.format(PATH, e['start_id'] + "-" + e['end_id'])
#             writewave(OUT, content)

In [27]:
# All terminals files: export every sentence that contains a homograph as a
# transcript (.lab) plus the matching audio slice (.wav).
homograph_set = set(homographs)  # constant-time membership checks
for term in sorted(iglob(os.path.join(PATH_TERMS, '*'))):
    # get corresponding lab, wav file names: the first six characters of the
    # terminals file name with a "0" inserted after the first two
    file_name = os.path.basename(term)[:6]
    basename = file_name[:2] + "0" + file_name[2:]
    wav = basename + ".wav"
    # get the sentence
    IDS = get_IDS(term)
    uniques = get_unique_prefixes(IDS)
    sentence_ids = get_sentence_ids(uniques, IDS)
    sentence_data = get_start_end(sentence_ids, term)
    data = None  # source recording; read lazily, at most once per terminals file
    for e in sentence_data:
        # filter out non-homograph data; one export per sentence is enough even
        # when several of its words are homographs
        if not any(orth in homograph_set for orth in e['sentence']):
            continue
        span = e['start_id'] + "-" + e['end_id']
        #Create lab file
        lab_path = '{}{}_{}.lab'.format(PATH_OUT, basename, span)
        makedir(lab_path)  # the output directory may not exist yet
        with open(lab_path, 'w') as f:
            f.write(' '.join(e['sentence']))
        #Create wav file
        try:
            if data is None:
                data = readwave('{}/{}'.format(PATH_WAV, wav))
            # Times are rounded to 0.1 s and then doubled.
            # NOTE(review): the doubling presumably compensates for
            # slicewave() ignoring the channel count on two-channel audio -
            # confirm before changing either side.
            start_time = float("{:.1f}".format(float(e['start'])))*2
            end_time = float("{:.1f}".format(float(e['end'])))*2
            content = slicewave_s(data,  start_time, end_time)
            OUT = '{}{}_{}.wav'.format(PATH_OUT, basename, span)
            writewave(OUT, content)
        except Exception as err:
            # Best effort: keep going, but report the failure instead of hiding it.
            print('Skipping audio for {} {}: {!r}'.format(basename, span, err))
            continue
