In [None]:
import json
import datetime
import logging
from typing import Generator, Dict, List

In [None]:
SEGMENT_FILE = "../assets/sample/audio_diarized.json"


def load_segments(segments_file):
    """
    Load segments from a JSON file.

    Args:
        segments_file: Path to a JSON file whose top-level object has a
            "segments" key.

    Returns:
        The value stored under the "segments" key of the parsed JSON.

    Raises:
        OSError: If the file cannot be opened (e.g. it does not exist).
        json.JSONDecodeError: If the file is not valid JSON.
        KeyError: If the parsed object has no "segments" key.
    """
    try:
        # JSON is UTF-8 by specification; do not rely on the locale default.
        with open(segments_file, "r", encoding="utf-8") as file:
            return json.load(file)["segments"]
    except Exception as e:
        # Log at ERROR level (INFO is hidden by the default logging config) and
        # re-raise the real cause instead of falling through to an unbound
        # local variable.
        logging.error(f"Error loading segments from {segments_file}: {e}")
        raise

We care mostly about sentence segments for now.


In [None]:
# Load the segments from the sample file and show how many there are.
segments = load_segments(SEGMENT_FILE)
len(segments)

In [None]:
# Render the first segment's start as HH:MM:SS.ffffff, similar to SRT cue times.
# The conversion is done in UTC: without an explicit tz, fromtimestamp() uses the
# local timezone and shifts the hours by the machine's UTC offset.
# (Assumes "start" is an offset in seconds from the beginning of the audio.)
datetime.datetime.fromtimestamp(
    segments[0]["start"], tz=datetime.timezone.utc
).strftime("%H:%M:%S.%f")

Speaker names may not always be known; speakers without a mapped name keep their diarization label.


In [None]:
# Known speakers, keyed by diarization label; extend as more are identified.
speaker_names = {"SPEAKER_07": "Alice"}

In [None]:
# Format the first segment like a subtitle cue. Timestamps are converted in UTC:
# without an explicit tz, fromtimestamp() uses the local timezone and shifts the
# hours by the machine's UTC offset.
# (Assumes "start"/"end" are offsets in seconds from the beginning of the audio.)
start_timestamp = datetime.datetime.fromtimestamp(
    segments[0]["start"], tz=datetime.timezone.utc
).strftime("%H:%M:%S.%f")
end_timestamp = datetime.datetime.fromtimestamp(
    segments[0]["end"], tz=datetime.timezone.utc
).strftime("%H:%M:%S.%f")

phrase = segments[0]["text"]
speaker = segments[0]["speaker"]
# Fall back to the raw diarization label when no name is mapped.
alt_speaker_name = speaker_names.get(speaker, speaker)

print(
    f'{start_timestamp} --> {end_timestamp}\n{alt_speaker_name}: "{phrase.strip()}"\n'
)

Get full segments with timestamps and speaker labels.


In [None]:
def labeled_segment(segment, speaker_names: dict = {}) -> dict:
    """
    Convert a segment to a labeled string with timestamps and speaker names.
    """
    if len(segment["words"]) <= 2:
        return

    start_timestamp = segment["start"]
    end_timestamp = segment["end"]
    speaker = segment["speaker"]
    alt_speaker_name = speaker_names.get(speaker, speaker)
    phrase = segment["text"]

    return {
        "start": start_timestamp,
        "end": end_timestamp,
        "speaker": alt_speaker_name,
        "phrase": phrase.strip(),
    }

In [None]:
# Preview the labeled form of the first segment (None if it has two words or fewer).
labeled_segment(segments[0], speaker_names)

We could also skip short segments (based on words spoken).


In [None]:
def get_segment_iterator(
    segments, speaker_names: dict | None = None
) -> Generator[dict | None, None, None]:
    """
    Lazily label each segment with `labeled_segment`.

    Args:
        segments: Iterable of segments accepted by `labeled_segment`.
        speaker_names: Optional mapping from speaker label to display name.

    Returns:
        A generator yielding one item per input segment, in order: whatever
        `labeled_segment` returns for it, which is None for segments it skips.
    """
    # Avoid a shared mutable default; treat "not given" as "no known names".
    names = {} if speaker_names is None else speaker_names
    return (labeled_segment(segment, names) for segment in segments)

In [None]:
# Displays the generator object itself; segments are only labeled once it is iterated.
get_segment_iterator(segments, speaker_names)

In [None]:
def group_speakers(
    segments, speaker_names: dict | None = None
) -> List[Dict[str, str | float]]:
    """
    Merge consecutive segments spoken by the same speaker into single turns.

    Args:
        segments: Iterable of segments accepted by `get_segment_iterator`.
        speaker_names: Optional mapping from speaker label to display name.

    Returns:
        A list of labeled segments in order. Each run of consecutive segments
        with the same speaker is collapsed into one entry whose "phrase" is
        the phrases joined by single spaces and whose "end" is the end of the
        last segment in the run. Items yielded as None by the iterator are
        ignored and do not break a run.
    """
    # Avoid a shared mutable default; treat "not given" as "no known names".
    names = {} if speaker_names is None else speaker_names

    speaker_phrases = []
    # The turn currently being extended. None (rather than a dummy dict with an
    # empty speaker) so a first segment whose speaker is "" is not merged into
    # a sentinel that never reaches the result.
    current = None
    for p in get_segment_iterator(segments, names):
        if p is None:
            continue  # skip segments with too few words
        if current is not None and p["speaker"] == current["speaker"]:
            # join phrases of the same speaker and update end time
            current["phrase"] += " " + p["phrase"]
            current["end"] = p["end"]
        else:
            speaker_phrases.append(p)
            current = p
    return speaker_phrases

In [None]:
# Merge consecutive segments of the same speaker and display the resulting turns.
group_speakers(segments, speaker_names)