In [None]:
import os

# --- Run configuration -------------------------------------------------------
# When True the source video is trimmed to [video_start_time, video_end_time]
# before processing; when False it is copied unchanged.
trim_required = True
video_start_time = "00:00:32.00"
# NOTE(review): presumably an empty end time means "until the end of the video"
# -- confirm against trim_video_file.
video_end_time = ""
# Path of the source video; must be filled in before running the notebook.
video_file_path = ""

# Output locations. "~" is expanded here because os.path.join, open() and
# os.makedirs treat it as a literal directory name, not as the user's home.
# expanduser() only rewrites the leading "~", so the trailing separators that
# later cells may rely on are preserved.
video_output_directory = os.path.expanduser("~/training_output/videos/")
transcript_output_directory = os.path.expanduser("~/training_output/transcripts/")
audio_output_directory = os.path.expanduser("~/training_output/audio/")
speaker_diarization_output_directory = os.path.expanduser("~/training_output/speaker_diarization/")
audio_subclip_output_directory = os.path.expanduser("~/training_output/subclips/")
json_output_directory = os.path.expanduser("~/training_output/full-run-json/")

In [None]:
from pathlib import Path
from transcriptai.video import trim_video_file, convert_mp4_to_mp3
import os
import shutil


In [None]:
# Resolve the source video. expanduser() is applied first because
# Path.absolute() would otherwise treat a leading "~" as a literal directory
# below the current working directory.
video_path_posix = Path(video_file_path).expanduser()
if not video_path_posix.is_file():
    # Fail here with a clear message rather than deep inside the trim/copy
    # step; an empty video_file_path resolves to the current directory.
    raise FileNotFoundError(
        "video_file_path does not point to an existing file: {!r}".format(video_file_path)
    )

# File name without its extension; later cells use it to name derived outputs.
original_video_file_name = video_path_posix.stem
video_file_name = video_path_posix.name
original_video_path = str(video_path_posix.absolute())
# The working copy of the video keeps the original file name.
video_to_process_file_path = os.path.join(video_output_directory, video_file_name)

## Trim video file if required
Some videos start with content we don't care about, such as ads shown before the game starts.

In [None]:
# Both branches produce video_to_process_file_path, which lives inside
# video_output_directory. Create that directory first so the step also works
# when the output tree does not exist yet (shutil.copyfile does not create
# parent directories).
os.makedirs(video_output_directory, exist_ok=True)

if trim_required:
    # Presumably writes the [start, end] portion of the source video to the
    # target path -- confirm against trim_video_file.
    trim_video_file(
        original_video_path, video_to_process_file_path,
        video_start_time, video_end_time
    )
else:
    # No trimming requested: work on an unchanged copy of the source video.
    shutil.copyfile(original_video_path, video_to_process_file_path)

## Convert video to audio file
Convert the video to an audio file so that the file we work with is smaller, since we only care about the audio.

In [None]:
from moviepy.editor import AudioFileClip

# Audio track of the working video. The clip is intentionally left open:
# the subclip cell further down cuts its pieces from this same object.
audio_file_clip = AudioFileClip(video_to_process_file_path)

audio_file_name = original_video_file_name + ".mp3"
# Make sure the target directory exists before writing into it.
os.makedirs(audio_output_directory, exist_ok=True)
# os.path.join instead of string concatenation, so the result is correct
# whether or not audio_output_directory ends with a path separator.
converted_audio_path = os.path.join(audio_output_directory, audio_file_name)
audio_file_clip.write_audiofile(converted_audio_path)


## Apply speaker diarization to audio file

In [None]:
from transcriptai.audio import apply_speaker_diarization_to_audio_file, transcribe_audio_file

# The Hugging Face token comes from the environment; it is None when the
# variable is not set.
hf_access_token = os.environ.get("HUGGING_FACE_WRITE_ACCESS_TOKEN")

# Positional arguments: audio to analyse, output directory, access token.
diarization_args = (
    converted_audio_path,
    speaker_diarization_output_directory,
    hf_access_token,
)
# The returned value is used by a later cell as the path of the RTTM result.
diarization_result_path = apply_speaker_diarization_to_audio_file(*diarization_args)

## Get annotations from speaker diarization results

In [None]:
import malaya_speech

# Load the diarization result written by the previous step.
annotations = malaya_speech.extra.rttm.load(diarization_result_path)

# Only the first entry of the loaded mapping is used.
sample_name = [*annotations.keys()][0]
sample = annotations[sample_name]

# Left empty here.
speaker_map = {}

## Group segments into chunks of less than 90 seconds
We group the segments into chunks of less than 90 seconds so that the audio can be split into matching subclips. OpenAI's Whisper model starts giving weird results when the audio is too long, and 90 seconds turned out to be a sweet spot.

In [None]:
# Upper bound (in seconds) on the summed duration of the segments in one chunk.
# NOTE(review): the limit is applied to the sum of segment durations, not to
# the span between the first segment's start and the last segment's end, so
# gaps between segments are not counted -- confirm this is intended.
MAX_CHUNK_SECONDS = 90

current_run_time = 0
rttm_all_segments = []
current_segment_section = {
    "segment_chunks": []
}

for segment, track, label in sample.itertracks():
    segment_track = {
        "track": track,
        "segment_start_time": segment.start,
        "segment_end_time": segment.end,
        "speaker": label,
    }
    current_seg_run_time = segment.end - segment.start

    # Close the current chunk when adding this segment would reach the limit.
    # The emptiness check prevents emitting an empty chunk when the very first
    # segment is itself at or above the limit; an empty chunk would break the
    # later cell that reads rttm_chunk['segment_chunks'][0].
    chunk_is_full = current_seg_run_time + current_run_time >= MAX_CHUNK_SECONDS
    if current_segment_section["segment_chunks"] and chunk_is_full:
        rttm_all_segments.append(current_segment_section)
        current_segment_section = {
            "segment_chunks": []
        }
        current_run_time = 0

    current_segment_section["segment_chunks"].append(segment_track)
    current_run_time += current_seg_run_time

# Flush the chunk still being built when the loop ends; without this the
# final segments never reach rttm_all_segments and are neither clipped nor
# transcribed.
if current_segment_section["segment_chunks"]:
    rttm_all_segments.append(current_segment_section)


## Add start and end time to full segment chunk
Now that we have split up the segments into chunks, we want to get the start time and end time for the whole segment chunk.

In [None]:
import math

for rttm_chunk in rttm_all_segments:
    chunk_segments = rttm_chunk['segment_chunks']
    first_chunk = chunk_segments[0]
    # Index -1 also covers single-segment chunks, where first and last coincide.
    last_chunk = chunk_segments[-1]

    # Start time: a chunk whose first segment has track 0 is pinned to 0;
    # otherwise the clip start is rounded down to a whole second.
    starts_at_zero = first_chunk['track'] == 0
    rttm_chunk['segment_start_time'] = 0 if starts_at_zero else first_chunk['segment_start_time']
    rttm_chunk['clip_start_time'] = 0 if starts_at_zero else math.floor(first_chunk['segment_start_time'])

    # End time: taken from the last segment, clip end rounded up to a whole second.
    rttm_chunk['segment_end_time'] = last_chunk['segment_end_time']
    rttm_chunk['clip_end_time'] = math.ceil(last_chunk['segment_end_time'])

## Create directory to store all subclips for current video file

In [None]:
# One subclip directory per source video, named after the video file stem.
new_subclip_directory = os.path.join(audio_subclip_output_directory, original_video_file_name)
# exist_ok=True keeps this cell re-runnable: without it a second run for the
# same video raises FileExistsError.
# NOTE: files left over from an earlier run stay in place and will be picked
# up by the later os.listdir-based transcription step.
os.makedirs(new_subclip_directory, exist_ok=True)

## Create subclips for each full segment chunk
Now that we know the length for each segment, we want to create an audio subclip for each segment so that we can analyze it with OpenAI's transcribe endpoint.

In [None]:
for rttm_chunk in rttm_all_segments:
    # Clip bounds stored on the chunk by the start/end-time cell.
    subclip_start_time, subclip_end_time = (
        rttm_chunk['clip_start_time'],
        rttm_chunk['clip_end_time'],
    )
    # File name pattern: "<start>-<end><audio file name>".
    subclip_file_name = "{}-{}{}".format(subclip_start_time, subclip_end_time, audio_file_name)
    subclip_file_path = os.path.join(new_subclip_directory, subclip_file_name)

    # Cut the piece out of the full audio clip and write it to disk.
    audio_subclip = audio_file_clip.subclip(subclip_start_time, subclip_end_time)
    audio_subclip.write_audiofile(subclip_file_path)

## Transcribe all audio files in new subclip directory

In [None]:
# One transcript directory per source video, named after the video file stem.
current_transcript_directory = os.path.join(transcript_output_directory, original_video_file_name)
# exist_ok=True keeps this cell re-runnable: without it a second run for the
# same video raises FileExistsError.
os.makedirs(current_transcript_directory, exist_ok=True)

In [None]:
from transcriptai.audio import transcribe_audio_file

# Every entry of the subclip directory is transcribed, in lexicographic order
# of the file names.
all_subclips = sorted(os.listdir(new_subclip_directory))

for subclip_file in all_subclips:
    current_subclip_file_path = os.path.join(new_subclip_directory, subclip_file)
    transcript = transcribe_audio_file(current_subclip_file_path)
    raw_transcript_text = transcript['text']

    # The transcript file is named "<subclip file name>.txt".
    transcript_text_file_name = subclip_file + ".txt"
    transcript_file_path = os.path.join(
        current_transcript_directory, transcript_text_file_name
    )
    # Explicit UTF-8: with the platform default encoding, writing transcript
    # text that contains non-ASCII characters can fail or be mis-encoded.
    with open(transcript_file_path, "w", encoding="utf-8") as f:
        f.write(raw_transcript_text)


## Save run JSON 

In [None]:
import json

# Everything collected for this run: the grouped chunks with their segments
# and the start/end times added to each chunk.
json_output = {
    "output": rttm_all_segments
}
json_file_name = original_video_file_name + ".json"
# Create the target directory if needed, matching how the subclip and
# transcript directories are created before they are written to.
os.makedirs(json_output_directory, exist_ok=True)
json_file_path = os.path.join(json_output_directory, json_file_name)
with open(json_file_path, "w") as outfile:
    json.dump(json_output, outfile)