In [1]:
import xml.etree.ElementTree as ET
import glob, json
import os, time
from pathlib import Path
import boto3

In [2]:
def transcribe_file(job_name, file_uri, output_uri, output_url, transcribe_client, s3,
                    max_tries=60, poll_interval=10):
    """Start a transcription job, wait for it to finish and load its JSON result.

    Args:
        job_name: Name given to the transcription job. The result is downloaded
            from the key ``<job_name>.json``.
        file_uri: URI of the media file, passed as ``MediaFileUri``.
        output_uri: Name of the bucket passed as ``OutputBucketName`` and used
            as the bucket of the download.
        output_url: Local directory into which the JSON result is downloaded.
        transcribe_client: Client exposing ``start_transcription_job`` and
            ``get_transcription_job``.
        s3: Client exposing ``download_file(bucket, key, filename)``.
        max_tries: Maximum number of status checks before giving up.
        poll_interval: Seconds to sleep between two status checks.

    Returns:
        The parsed JSON transcription when the job reaches ``COMPLETED``;
        ``None`` when the job reaches ``FAILED`` or when ``max_tries`` status
        checks pass without the job finishing.
    """
    transcribe_client.start_transcription_job(
        TranscriptionJobName=job_name,
        Media={'MediaFileUri': file_uri},
        MediaFormat='wav',
        LanguageCode='en-US',
        OutputBucketName=output_uri
    )

    result_key = job_name + ".json"
    # os.path.join works whether or not output_url ends with a path separator.
    local_path = os.path.join(output_url, result_key)

    for _ in range(max_tries):
        job = transcribe_client.get_transcription_job(TranscriptionJobName=job_name)
        job_status = job['TranscriptionJob']['TranscriptionJobStatus']
        if job_status in ('COMPLETED', 'FAILED'):
            print(f"Job {job_name} is {job_status}.")
            if job_status != 'COMPLETED':
                return None
            # Download the JSON result, then read it back from disk.
            s3.download_file(output_uri, result_key, local_path)
            # Close the file deterministically and do not depend on the
            # platform's default text encoding.
            with open(local_path, encoding='utf-8') as handle:
                return json.load(handle)
        print(f"Waiting for {job_name}. Current status is {job_status}.")
        time.sleep(poll_interval)

    print(f"Gave up waiting for {job_name} after {max_tries} status checks.")
    return None


In [3]:
from re import findall


def _seconds_to_ms(seconds):
    """Convert a seconds value (string or number) to whole milliseconds."""
    # round() instead of truncation: float("1.001") * 1000 is 1000.9999999999999,
    # which int() alone would turn into 1000.
    return int(round(float(seconds) * 1000))


def add_transcripts(path, file_, output, transcriptions):
    """Insert transcribed words into the 'english' tier of an .eaf file.

    Segment ``k`` (k = 1, 2, ...) is taken to start at time slot ``ts<2k-1>``,
    end at time slot ``ts<2k>`` and correspond to annotation ``a<k>`` of the
    tier. Every non-punctuation item of ``transcriptions['results']['items']``
    whose start and end (in milliseconds) lie inside a segment is appended,
    space-separated, to the ``ANNOTATION_VALUE`` of that segment's annotation.
    Items are consumed in order; an item that ends after the current segment is
    kept for the following segments rather than discarded.

    Args:
        path: Directory containing the input file; the result is written there too.
        file_: Name of the input .eaf file inside ``path``.
        output: Name of the output file; only its base name is used.
        transcriptions: Dict with the items under ``['results']['items']``.
            Each non-punctuation item provides ``start_time``, ``end_time``
            (seconds) and ``alternatives[0]['content']``.
    """
    # Which tier?
    tier_name = 'english'
    tree = ET.parse(os.path.join(path, file_))
    root = tree.getroot()

    # One pass over the document to index the slot times ...
    slot_times = {
        slot.get('TIME_SLOT_ID'): slot.get('TIME_VALUE')
        for slot in root.iter('TIME_SLOT')
    }

    # ... and the value elements of every annotation in the target tier.
    targets = {}
    for tier in root.iter('TIER'):
        if tier.get('TIER_ID') != tier_name:
            continue
        for alignable in tier.iter('ALIGNABLE_ANNOTATION'):
            targets.setdefault(alignable.get('ANNOTATION_ID'), []).extend(
                alignable.iter('ANNOTATION_VALUE'))

    words = [item for item in transcriptions['results']['items']
             if item['type'] != 'punctuation']
    next_word = 0

    # Match transcription timestamps with tier segments. There are at most as
    # many segments as time slots; pairs that do not exist are skipped.
    for segment in range(1, len(slot_times) + 1):
        if next_word >= len(words):
            break
        start_value = slot_times.get("ts" + str(2 * segment - 1))
        end_value = slot_times.get("ts" + str(2 * segment))
        if not start_value or not end_value:
            continue
        seg_start, seg_end = int(start_value), int(end_value)
        values = targets.get("a" + str(segment), [])

        while next_word < len(words):
            word = words[next_word]
            word_end = _seconds_to_ms(word['end_time'])
            if word_end > seg_end:
                # Ends after this segment: leave it for a later segment
                # instead of consuming (and losing) it here.
                break
            next_word += 1
            if _seconds_to_ms(word['start_time']) < seg_start:
                # Starts before the segment: belongs to no segment, skip it.
                continue
            text = str(word['alternatives'][0]['content'])
            for value in values:
                value.text = value.text + " " + text if value.text else text

    # Save the file to output dir
    tree.write(os.path.join(path, os.path.basename(output)),
               encoding='UTF-8', xml_declaration=True)



In [4]:
def main():
    """Transcribe the sample recording and write the words into the .eaf file.

    Runs a transcription job for the audio file in the ``ra-english`` bucket,
    then copies the resulting words into the 'english' tier of ``trial.eaf``
    and saves the result as ``english-trial.eaf`` in the same directory.
    Nothing is written when no transcription could be obtained.
    """
    path = r"C:\Users\loren\Dropbox\Documentos\Trabajo\RA-ship\speech-to-text-python\trial-data\\"
    # Named eaf_name rather than `input` so the builtin is not shadowed.
    eaf_name = "trial.eaf"
    output = "english-" + eaf_name

    # First we get the English transcriptions.
    transcribe_client = boto3.client('transcribe')
    s3 = boto3.client('s3')
    file_uri = 's3://ra-english/cut.wav'
    output_uri = 'ra-english'
    output_url = r"C:\Users\loren\Dropbox\Documentos\Trabajo\RA-ship\speech-to-text-python\\"
    # NOTE(review): the job name is fixed; the service presumably rejects a
    # second job with the same name, so re-running may need a new name or the
    # old job deleted first - confirm against the Transcribe documentation.
    transcriptions = transcribe_file('audio', file_uri, output_uri, output_url, transcribe_client, s3)
    if transcriptions is None:
        # transcribe_file returns None when the job failed or never finished.
        print("No transcription available; no output file was written.")
        return

    # Now we add them to the eaf file.
    add_transcripts(path, eaf_name, output, transcriptions)

if __name__ == "__main__":
    main()


Waiting for audio. Current status is IN_PROGRESS.
Waiting for audio. Current status is IN_PROGRESS.
Waiting for audio. Current status is IN_PROGRESS.
Waiting for audio. Current status is IN_PROGRESS.
Job audio is COMPLETED.
done
done
done
done
done
done
done
done
done
done
done
done
done
done
done
done
done
done
done
done
done
done
done
done
done
done
done
done
done
done
done
done
