In [2]:
import sys
from basic_utils import *

Loading config.json


In [50]:
# Base path (no extension) shared by the input transcript JSON and the HTML export.
name = "transcripts/affirm_seven_samurai"
# Only the 'results' payload of the loaded document is used below.
results = load_config(name + '.json')['results']

Loading transcripts/affirm_seven_samurai.json


In [4]:
# Show which top-level sections the loaded transcript results contain.
results.keys()

dict_keys(['transcripts', 'speaker_labels', 'items'])

In [5]:
def parse_detected_entities_response(detected_entities_response, entities):
    """Merge confidently detected entities from a response into ``entities``.

    Args:
        detected_entities_response: mapping that may carry an 'ErrorList'
            and a 'ResultList'; each result holds an 'Entities' list whose
            members have 'Score', 'Type' and 'Text' keys.
        entities: dict mapping entity type -> set of texts. It is updated
            in place.

    Returns:
        The same ``entities`` object when 'ResultList' is present.
        NOTE: when 'ResultList' is absent a *new empty dict* is returned
        instead of ``entities`` (which is left unmodified).
    """
    response = detected_entities_response

    # Errors are only logged; processing of any results still continues.
    if 'ErrorList' in response and len(response['ErrorList']) > 0:
        logger.error("encountered error during batch_detect_entities")
        logger.error("error:" + json.dumps(response['ErrorList'], indent=4))

    if 'ResultList' not in response:
        return {}

    # Entity types whose text is normalised to capitalised words.
    capitalised_types = ('LOCATION', 'PERSON', 'ORGANIZATION')

    for result in response["ResultList"]:
        for candidate in result["Entities"]:
            # Skip anything below the configured confidence threshold.
            if not (float(candidate["Score"]) >= ENTITY_CONFIDENCE_THRESHOLD):
                continue

            kind = candidate["Type"]
            if kind == 'QUANTITY':
                continue

            text = candidate["Text"]
            # All-uppercase text is left untouched by the capitalisation step.
            if kind in capitalised_types and not text.isupper():
                text = string.capwords(text)

            entities.setdefault(kind, set()).add(text)

    return entities

def get_speaker_label(speaker_segments, start_time):
    """Return the speaker of the first segment that contains ``start_time``.

    A segment contains the time when
    ``segment['start_time'] <= start_time < segment['end_time']``
    (start inclusive, end exclusive). Returns ``None`` if no segment matches.
    """
    matching_speakers = (
        seg['speaker']
        for seg in speaker_segments
        if seg['start_time'] <= start_time < seg['end_time']
    )
    return next(matching_speakers, None)

def parse_speaker_segments(results):
    """Flatten ``results['speaker_labels']['segments']`` into simple dicts.

    Each returned dict has 'start_time' and 'end_time' converted to float
    and 'speaker' copied from the segment's 'speaker_label'. Order follows
    the input segments.
    """
    return [
        {
            "start_time": float(raw["start_time"]),
            "end_time": float(raw["end_time"]),
            "speaker": raw["speaker_label"],
        }
        for raw in results['speaker_labels']['segments']
    ]

In [6]:
# Speaker information is optional: note whether the transcript has it and,
# if so, pre-parse the segments for the paragraph-building step.
speaker_label_exist = 'speaker_labels' in results
speaker_segments = None
if speaker_label_exist:
    speaker_segments = parse_speaker_segments(results)

In [48]:
# Rebuild readable paragraphs and size-limited text chunks from the
# transcript's item list.
#
# Paragraph breaks:
#   * with speaker labels  -> whenever the speaker changes; the paragraph is
#     prefixed with "<speaker> (<m>m:<s>s)".
#   * without speaker labels -> after a silence of more than 2 seconds, or once
#     a paragraph has run for more than 15 seconds and the previous item ended
#     a sentence.
items = results['items']
last_speaker = None
paragraphs = []                  # finished paragraphs, in transcript order
current_paragraph = ""
comprehend_chunks = []           # finished text chunks, in transcript order
current_comprehend_chunk = ""
previous_time = 0                # end_time of the most recent timed item
last_pause = 0                   # start_time at which the current paragraph began
last_item_was_sentence_end = False
commonDict = {'i': 'I'}          # transcribed token -> corrected spelling
custom_vocabs = None             # optional mapping: transcribed phrase -> replacement
UNKNOWN_SPEAKER = "unknown speaker"  # label used when no segment covers the very first word

for item in items:
    if item["type"] == "pronunciation":
        start_time = float(item['start_time'])

        if speaker_label_exist:
            current_speaker = get_speaker_label(speaker_segments, start_time)
            if current_speaker is None:
                # The word falls outside every speaker segment. Previously this
                # crashed on `None + str`; treat it as a continuation of the
                # current speaker instead.
                current_speaker = last_speaker if last_speaker is not None else UNKNOWN_SPEAKER
            if last_speaker is None or current_speaker != last_speaker:
                # Only flush non-empty paragraphs (the old `is not None` test was
                # always true and emitted an empty first paragraph).
                if current_paragraph:
                    paragraphs.append(current_paragraph)
                # Truncate to whole seconds so the seconds field stays in 0..59
                # (rounding could previously yield e.g. "0m:60s").
                minutes, seconds = divmod(int(start_time), 60)
                current_paragraph = current_speaker + \
                    " ({}m:{}s)".format(minutes, seconds)
                last_pause = start_time
            last_speaker = current_speaker

        elif (start_time - previous_time) > 2 or (
                (start_time - last_pause) > 15 and last_item_was_sentence_end):
            last_pause = start_time
            # The old `is not None or != ""` test was always true and appended
            # empty paragraphs; flush only when there is text.
            if current_paragraph:
                paragraphs.append(current_paragraph)
            current_paragraph = ""

        phrase = item['alternatives'][0]['content']
        if custom_vocabs is not None:
            if phrase in custom_vocabs:
                phrase = custom_vocabs[phrase]
                logger.info("replaced custom vocab: " + phrase)
        if phrase in commonDict:
            phrase = commonDict[phrase]
        current_paragraph += " " + phrase

        # The same text is accumulated separately for chunking.
        current_comprehend_chunk += " " + phrase

        last_item_was_sentence_end = False

    elif item["type"] == "punctuation":
        mark = item['alternatives'][0]['content']
        # Punctuation attaches directly to the preceding word (no space).
        current_paragraph += mark
        current_comprehend_chunk += mark
        last_item_was_sentence_end = mark in (".", "!", "?")

    # Close a chunk at the first punctuation mark once it reaches 4500
    # characters, or unconditionally once it exceeds 4900.
    # NOTE(review): presumably sized for a ~5000-byte downstream limit; len()
    # counts characters, not encoded bytes — confirm for non-ASCII transcripts.
    if (item["type"] == "punctuation" and len(current_comprehend_chunk) >= 4500) \
            or len(current_comprehend_chunk) > 4900:
        comprehend_chunks.append(current_comprehend_chunk)
        current_comprehend_chunk = ""

    # Not every item carries an end_time, hence the membership check.
    if 'end_time' in item:
        previous_time = float(item['end_time'])

# Flush whatever is left after the last item.
if current_comprehend_chunk:
    comprehend_chunks.append(current_comprehend_chunk)
if current_paragraph:
    paragraphs.append(current_paragraph)

In [49]:
from html import escape

# Export the paragraphs to "<name>.html": a heading with the transcript name
# followed by the paragraphs separated by blank lines.
# The file is written as UTF-8 (and declares it) so the result does not depend
# on the platform's default encoding. Text is HTML-escaped so characters such
# as "<" or "&" in the transcript cannot break the markup. The `with` block
# closes the file, so no explicit close() is needed.
with open(name + '.html', mode='w', encoding='utf-8') as f:
    f.write('<meta charset="utf-8">')
    f.write("<h2>" + escape(name, quote=False) + "</h2>")
    f.write("<br/><br/>".join(escape(paragraph, quote=False) for paragraph in paragraphs))