# Improving Video Translation Quality with Large Language Models: A Look at Speech-to-Text Output Refinement

In [None]:
### Solution Overview

<img src="./images/Arch_Diagram.jpg" alt="drawing" width="800"/>

# Setup

### Install Latest SDKs and import the libraries

In [1]:
!pip install openai --quiet

In [2]:
import io
import uuid
import botocore
import boto3
import re
import time
import pprint
import subprocess
import json
import sagemaker
from sagemaker import get_execution_role
from datetime import datetime, timezone
from pytube import YouTube
from moviepy.editor import VideoFileClip
import os
import pysrt
import openai

ALSA lib confmisc.c:767:(parse_card) cannot find card '0'
ALSA lib conf.c:4554:(_snd_config_evaluate) function snd_func_card_driver returned error: No such file or directory
ALSA lib confmisc.c:392:(snd_func_concat) error evaluating strings
ALSA lib conf.c:4554:(_snd_config_evaluate) function snd_func_concat returned error: No such file or directory
ALSA lib confmisc.c:1246:(snd_func_refer) error evaluating name
ALSA lib conf.c:4554:(_snd_config_evaluate) function snd_func_refer returned error: No such file or directory
ALSA lib conf.c:5033:(snd_config_expand) Evaluate error: No such file or directory
ALSA lib pcm.c:2501:(snd_pcm_open_noupdate) Unknown PCM default
ALSA lib confmisc.c:767:(parse_card) cannot find card '0'
ALSA lib conf.c:4554:(_snd_config_evaluate) function snd_func_card_driver returned error: No such file or directory
ALSA lib confmisc.c:392:(snd_func_concat) error evaluating strings
ALSA lib conf.c:4554:(_snd_config_evaluate) function snd_func_concat returned error: N

### Download Video and upload to S3
Make sure all your resources are stored in the same region. You'll be using the same bucket for this entire walkthrough.

In [3]:
# Language of the source audio; embedded in the generated file name and parsed back out of it later.
original_language_code = 'en-US'
# Target language of the translation; also embedded in the generated file name.
translated_language_code = 'hi'   
def rename_file(old_name, new_name):
    """Rename ``old_name`` to ``new_name`` on disk.

    Returns:
        bool: True when the rename succeeded; False when ``os.rename``
        raised, in which case the error is printed instead of propagated.
    """
    try:
        os.rename(old_name, new_name)
    except Exception as rename_error:
        print(f"Error renaming file: {rename_error}")
        return False
    else:
        return True
    
# Unique local file name of the form <uuid>__<source language>__<target language>.mp4.
youtube_filename = f"{uuid.uuid4()}__{original_language_code}__{translated_language_code}.mp4"

def download_video(url='https://www.youtube.com/watch?v=GhTC-pMgOjc&t=23s', filename=None):
    """Download a YouTube video as a progressive MP4 unless it is already on disk.

    Args:
        url (str): YouTube watch URL; defaults to the demo video.
        filename (str): local target file name; defaults to the
            module-level ``youtube_filename`` when None.

    Returns:
        str: local file name of the (existing or freshly downloaded) video.

    Raises:
        RuntimeError: if the video exposes no progressive MP4 stream.
    """
    if filename is None:
        filename = youtube_filename
    # Skip the download when a previous run already fetched the file.
    if os.path.isfile(filename):
        return filename

    progressive_mp4 = YouTube(url).streams.filter(progressive=True, file_extension='mp4')
    # Highest resolution first; first() returns None when nothing matched.
    best_stream = progressive_mp4.order_by('resolution').desc().first()
    if best_stream is None:
        raise RuntimeError(f"No progressive mp4 stream available for {url}")
    best_stream.download(filename=filename)
    return filename

# Fetch the demo video (no-op when the file is already present).
video_filename = download_video()

BUCKET = 'sunil-video-translation' #Add your bucket name here

# An empty name falls back to the SageMaker session's default bucket.
if not BUCKET:
    BUCKET = sagemaker.Session().default_bucket()
print(f'The S3 bucket used in this demo will be: {BUCKET}')

# S3 location reserved for Transcribe results.
OUTPUT_PATH_TRANSCRIBE = f's3://{BUCKET}/transcribe-results'

The S3 bucket used in this demo will be: sunil-video-translation


In [4]:
# Display the local file name returned by download_video().
video_filename

'b7b6b377-59a2-4e4c-b804-5104f905dd98__en-US__hi.mp4'

<img src="./images/Arch_Diagram_Step-1.jpg" alt="drawing" width="800"/>

### STEP-1 Separate Audio from Video

In [5]:
# Extract the audio track of the downloaded video into a WAV file with the same base name.
audio_file_name = video_filename.replace(".mp4",".wav")
videoclip = VideoFileClip(video_filename)
try:
    # Skip the export when a previous run already produced the WAV.
    if not os.path.isfile(audio_file_name):
        # '-ac 1' is passed through to ffmpeg to write a single audio channel.
        videoclip.audio.write_audiofile(audio_file_name,ffmpeg_params=['-ac','1'])
finally:
    # Release the reader resources held by the clip even if the export fails.
    videoclip.close()

MoviePy - Writing audio in b7b6b377-59a2-4e4c-b804-5104f905dd98__en-US__hi.wav


                                                                      

MoviePy - Done.




In [6]:
# SageMaker session, execution role and the region of the current boto3 session.
sess = sagemaker.session.Session()
role = sagemaker.get_execution_role()
region = boto3.session.Session().region_name

# Amazon S3 (S3) client, plus a check that the bucket lives in the same region as this notebook.
s3 = boto3.client('s3', region_name=region)
bucket_response_headers = s3.head_bucket(Bucket=BUCKET)['ResponseMetadata']['HTTPHeaders']
bucket_region = bucket_response_headers['x-amz-bucket-region']
assert bucket_region == region, "Your S3 bucket {} and this notebook need to be in the same region.".format(BUCKET)

# Amazon SageMaker client
sagemaker_client = boto3.client('sagemaker')

# Amazon Transcribe client
transcribe_client = boto3.client("transcribe")

This is the execution role that will be used to call Amazon S3, Transcribe, Translate and Polly. 

In [7]:
from sagemaker import get_execution_role

# Execution role of this notebook; displayed so it can be looked up in IAM.
ROLE = get_execution_role()
display(ROLE)

'arn:aws:iam::770763726637:role/service-role/AmazonSageMaker-ExecutionRole-20230808T082672'

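Add the following policies to this role in IAM:
* AmazonAugmentedAIFullAccess
* AmazonTranscribeFullAccess
* TranslateFullAccess (for the Amazon Translate calls in STEP-4)
* AmazonPollyFullAccess (for the Amazon Polly calls in STEP-5)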



In [9]:
# Display the local name of the extracted WAV file.
audio_file_name


'b7b6b377-59a2-4e4c-b804-5104f905dd98__en-US__hi.wav'

In [10]:
%%bash -s "$BUCKET"
# Clear leftovers from earlier runs; -f keeps rm quiet when nothing matches.
rm -f /tmp/*.json
rm -f /tmp/*.wav
rm -f /tmp/*.srt
# -p lets the cell be re-run when the folder already exists.
mkdir -p audio_files
mv *.wav audio_files
# $1 is the bucket name passed in through the "%%bash -s" argument above.
aws s3 cp ./audio_files "s3://$1/es_conf_transcribe_demo/" --recursive


rm: cannot remove ‘/tmp/*.wav’: No such file or directory


upload: audio_files/b7b6b377-59a2-4e4c-b804-5104f905dd98__en-US__hi.wav to s3://sunil-video-translation/es_conf_transcribe_demo/b7b6b377-59a2-4e4c-b804-5104f905dd98__en-US__hi.wav


### STEP-2 Transcribe the Audio

<img src="./images/Arch_Diagram_Step-2.jpg" alt="drawing" width="800"/>

In [11]:
# Here is a transcribe function
def transcribe(job_name, job_uri, out_bucket, format="wav", vocab_name=None, language_code="en-US"):
    """Start an Amazon Transcribe job that also produces SRT subtitles.

    Args:
        job_name (str): the name of the job that you specify;
                        the output json will be job_name.json
        job_uri (str): input path (in s3) to the file being transcribed
        out_bucket (str): s3 bucket name that you want the output json
                          to be placed in
        format (str): mp3, mp4, wav or flac for input file format;
                      defaults to wav
        vocab_name (str): name of custom vocabulary used;
                          optional, defaults to None
        language_code (str): language of the audio; defaults to en-US

    Returns:
        None. The job description fetched right after submission is printed.
        An unsupported ``format`` prints "Invalid format"; any error raised
        by the Transcribe client is printed rather than propagated.
    """

    if format not in ['mp3','mp4','wav','flac']:
        print("Invalid format")
        return

    job_args = {
        'TranscriptionJobName': job_name,
        'Media': {"MediaFileUri": job_uri},
        'MediaFormat': format,
        'LanguageCode': language_code,
        'OutputBucketName': out_bucket,
        'Subtitles': {
            'Formats': [
                'srt'
            ],
            'OutputStartIndex': 1
        },
    }
    # Only send Settings when a custom vocabulary was actually requested.
    if vocab_name is not None:
        job_args['Settings'] = {'VocabularyName': vocab_name}

    try:
        transcribe_client.start_transcription_job(**job_args)
        # Short pause before fetching the job description that is printed below.
        time.sleep(2)
        print(transcribe_client.get_transcription_job(TranscriptionJobName=job_name))

    except Exception as e:
        # Best-effort: report the problem and let the notebook continue.
        print(e)

In [12]:
# Path to folder

# S3 prefix that the bash cell uploaded the WAV file to.
folder_path = f"s3://{BUCKET}/es_conf_transcribe_demo/"
# The transcription job is named after the audio file, minus its extension.
job_name = audio_file_name.rsplit('.',1)[0]
job_name
time.sleep(2)
transcribe(job_name, folder_path+audio_file_name, BUCKET)

{'TranscriptionJob': {'TranscriptionJobName': 'b7b6b377-59a2-4e4c-b804-5104f905dd98__en-US__hi', 'TranscriptionJobStatus': 'IN_PROGRESS', 'LanguageCode': 'en-US', 'MediaFormat': 'wav', 'Media': {'MediaFileUri': 's3://sunil-video-translation/es_conf_transcribe_demo/b7b6b377-59a2-4e4c-b804-5104f905dd98__en-US__hi.wav'}, 'Transcript': {}, 'StartTime': datetime.datetime(2023, 9, 12, 20, 11, 25, 210000, tzinfo=tzlocal()), 'CreationTime': datetime.datetime(2023, 9, 12, 20, 11, 25, 179000, tzinfo=tzlocal()), 'Settings': {'ChannelIdentification': False, 'ShowAlternatives': False}, 'Subtitles': {'Formats': ['srt'], 'SubtitleFileUris': []}}, 'ResponseMetadata': {'RequestId': 'ca193fff-4287-4cb4-bbdf-56c816e83bd7', 'HTTPStatusCode': 200, 'HTTPHeaders': {'x-amzn-requestid': 'ca193fff-4287-4cb4-bbdf-56c816e83bd7', 'content-type': 'application/x-amz-json-1.1', 'content-length': '507', 'date': 'Tue, 12 Sep 2023 20:11:26 GMT'}, 'RetryAttempts': 0}}


In [13]:
# Wait for the status of the transcription job to finish
while True:
    response = transcribe_client.get_transcription_job(
        TranscriptionJobName=job_name
    )
    transcription_job = response['TranscriptionJob']
    status = transcription_job['TranscriptionJobStatus']
    if status in ['COMPLETED', 'FAILED']:
        print(status)
        if status == 'FAILED':
            # Surface why the job failed instead of letting later cells fail on a missing transcript.
            print("Failure reason:", transcription_job.get('FailureReason'))
        break
    print("Not ready yet...")
    # Poll every 5 seconds.
    time.sleep(5)

Not ready yet...
Not ready yet...
Not ready yet...
Not ready yet...
Not ready yet...
Not ready yet...
Not ready yet...
Not ready yet...
Not ready yet...
Not ready yet...
Not ready yet...
Not ready yet...
Not ready yet...
COMPLETED


In [14]:
# create the two S3 buckets below
bucket_name_original_srt = "sunil-video-translation-output-original-srt"
bucket_name_translated_srt = "sunil-video-translation-output-translated-srt"

# Pause between two words, in seconds, above which a new subtitle is started. 0.2 is 200 milliseconds
time_duration_threshold = 0.2

from datetime import datetime, timedelta

# Read the OpenAI credentials from the environment so they are never saved in the notebook.
# Falls back to the empty string, as before, when a variable is not set.
openai.organization = os.environ.get("OPENAI_ORGANIZATION", "")
openai.api_key = os.environ.get("OPENAI_API_KEY", "")

### STEP-3 Text Correction via LLM API call

<img src="./images/Arch_Diagram_Step-3.jpg" alt="drawing" width="800"/>

In [15]:
def call_api(prompt):
    """Send a chat prompt to the gpt-4 model and return the text of the first reply.

    Args:
        prompt (list): chat messages, passed through as ``messages``.

    Returns:
        str: content of the first choice, stripped of surrounding whitespace.
    """
    completion = openai.ChatCompletion.create(model='gpt-4', messages=prompt)
    first_choice = completion.choices[0]
    return first_choice.message.content.strip()

def fix_text(text):
    """Ask the LLM to correct one transcribed sentence.

    Builds a two-message chat prompt (fixed system instruction plus ``text``
    as the user message) and returns whatever ``call_api`` returns for it.
    """
    system_message = {'role':'system', 'content':'I will provide a sentence about AWS Well-Architected. Correct the sentence. If the sentence is correct or incomplete, just return the same sentence'}
    user_message = {'role':'user', 'content': text}
    return call_api([system_message, user_message])

### STEP-4 Translate Text

<img src="./images/Arch_Diagram_Step-4.jpg" alt="drawing" width="800"/>

In [24]:

# Amazon Translate client
translate = boto3.client('translate')

def format_time(seconds):
    """Format a duration in seconds as an SRT timestamp (HH:MM:SS,mmm).

    Example: 73.045 -> '00:01:13,045'. Fractions below one millisecond are
    truncated. Hours are not wrapped at 24, so long durations stay correct.

    Args:
        seconds (float): non-negative offset in seconds.

    Returns:
        str: the timestamp in SRT format.

    Raises:
        ValueError: if ``seconds`` is negative.
    """
    if seconds < 0:
        raise ValueError(f"seconds must be non-negative, got {seconds}")
    # Floor division by one millisecond yields whole milliseconds as an int.
    total_milliseconds = timedelta(seconds=seconds) // timedelta(milliseconds=1)
    hours, remainder = divmod(total_milliseconds, 3600000)
    minutes, remainder = divmod(remainder, 60000)
    whole_seconds, milliseconds = divmod(remainder, 1000)
    return f"{hours:02d}:{minutes:02d}:{whole_seconds:02d},{milliseconds:03d}"

def translate_text(text,source_language_code,target_language_code):
    """Translate ``text`` with Amazon Translate.

    Args:
        text (str): text to translate.
        source_language_code (str): language code of ``text``.
        target_language_code (str): language code to translate into.

    Returns:
        str: the translated text. ``text`` is returned unchanged when both
        language codes are equal or when there is nothing to translate.
    """
    if source_language_code == target_language_code:
        return text
    # Amazon Translate requires non-empty Text, so skip the call for blank input.
    if not text or not text.strip():
        return text
    result = translate.translate_text(
        Text=text,
        SourceLanguageCode=source_language_code,
        TargetLanguageCode=target_language_code,
    )
    return result.get('TranslatedText')

def _group_transcript_items(items, gap_threshold):
    """Merge word-level transcript items into [text, start, end] subtitle segments."""
    segments = []
    for item in items:
        content = str(item['alternatives'][0]['content'])

        if item['type'] == 'punctuation':
            # Punctuation is glued onto the previous segment. A transcript
            # that starts with punctuation has nothing to attach to, so skip it.
            if segments:
                segments[-1][0] += content
            continue

        start_time = float(item['start_time'])
        end_time = float(item['end_time'])

        if segments:
            previous = segments[-1]
            long_pause = (start_time - previous[2]) > gap_threshold
            sentence_ended = previous[0].endswith('.')
            if not (long_pause or sentence_ended):
                # Same subtitle: extend the text and move the end time forward.
                previous[0] = previous[0] + ' ' + content
                previous[2] = end_time
                continue

        segments.append([content, start_time, end_time])
    return segments


def save_original_and_translated(json_file_name,source_language_code,target_language_code):
    """Turn a Transcribe result JSON into corrected and translated SRT files.

    Words are grouped into subtitles (a new one starts after a pause longer
    than ``time_duration_threshold`` or after a segment ending in '.'), each
    subtitle is corrected with ``fix_text`` and translated with
    ``translate_text``. Two files are written next to the JSON,
    ``<name>_original.srt`` and ``<name>_translated.srt``, and uploaded to
    ``bucket_name_original_srt`` and ``bucket_name_translated_srt``.

    Args:
        json_file_name (str): local path of the Transcribe result JSON.
        source_language_code (str): language code of the transcript.
        target_language_code (str): language code to translate into.
    """
    with open(json_file_name, encoding='utf-8') as f:
        data = json.load(f)

    segments = _group_transcript_items(data['results']['items'], time_duration_threshold)

    # splitext only touches the extension, unlike str.replace on the whole path.
    base_path, _ = os.path.splitext(json_file_name)
    srt_filename_original = base_path + "_original.srt"
    srt_filename_translated = base_path + "_translated.srt"

    # Write UTF-8 explicitly: the translated text is non-ASCII and the SRT
    # files are read back as UTF-8 later in the notebook.
    with open(srt_filename_original, "w", encoding='utf-8') as f1, \
            open(srt_filename_translated, "w", encoding='utf-8') as f2:
        for index, (original_text, start, end) in enumerate(segments, start=1):
            fixed_text = fix_text(original_text)
            translated_text = translate_text(fixed_text,source_language_code,target_language_code)
            timing = format_time(start) + ' --> ' + format_time(end)

            # Both files share the index and timing; only the text differs.
            f1.write(f"{index}\n{timing}\n{fixed_text}\n\n")
            f2.write(f"{index}\n{timing}\n{translated_text}\n\n")

            print(original_text)
            print(fixed_text)
            print(translated_text)

    basefilename_original = os.path.basename(srt_filename_original)
    s3.upload_file(srt_filename_original, bucket_name_original_srt, basefilename_original)

    print("uploaded original: ",basefilename_original)

    basefilename_translated = os.path.basename(srt_filename_translated)
    s3.upload_file(srt_filename_translated, bucket_name_translated_srt, basefilename_translated)

    print("uploaded translated: ",basefilename_translated)

In [25]:
# Download the Transcribe result JSON (named after the job) into /tmp and list that folder.
json_file_name = f"{job_name}.json"
output_file_path = f"/tmp/{json_file_name}"
print ("Filename ",json_file_name)
s3.download_file(BUCKET, json_file_name, output_file_path)
os.listdir('/tmp/')

Filename  b7b6b377-59a2-4e4c-b804-5104f905dd98__en-US__hi.json


['.ICE-unix',
 '.font-unix',
 '.X11-unix',
 '.Test-unix',
 '.XIM-unix',
 'b7b6b377-59a2-4e4c-b804-5104f905dd98__en-US__hi.json',
 'b7b6b377-59a2-4e4c-b804-5104f905dd98__en-US__hi_original.srt',
 'b7b6b377-59a2-4e4c-b804-5104f905dd98__en-US__hi_translated.srt',
 'systemd-private-cac83c6dc8e74a988e2e2613351323cf-chronyd.service-MiVOkD',
 'hsperfdata_role-agent',
 'ffmpeg.linux64',
 '.java_pid3683']

In [26]:
# The file name has the form <uuid>__<source locale>__<target language>.json,
# so the language codes are the last two "__"-separated parts of the base name.
base_filename_json = json_file_name.rsplit('.', 1)[0]
language_string = base_filename_json.rsplit("__", 2)
# Keep only the language part of the source locale (e.g. 'en-US' -> 'en').
source_language_code = language_string[-2].partition('-')[0].lower()
target_language_code = language_string[-1].lower()

print("source_language_code: ",source_language_code)
print("target_language_code: ",target_language_code)

source_language_code:  en
target_language_code:  hi


In [27]:
# Convert transcribed json to a subtitle (srt) file.
# Writes an original-language and a translated SRT next to the JSON and uploads both to S3.
save_original_and_translated(output_file_path,source_language_code,target_language_code)


Hello and welcome to this five minute overview of Aws.
Hello and welcome to this five minute overview of AWS Well-Architected.
नमस्कार और AWS Well-Architected के इस पांच मिनट के अवलोकन में आपका स्वागत है।
Well, Architected,
Well-Architected.
अच्छी तरह से तैयार किया गया।
I am Mark Isaacs senior startup solution architect at Aws.
I am Mark Isaacs, Senior Startup Solutions Architect at AWS.
मैं मार्क इसाक, एडब्ल्यूएस में सीनियर स्टार्टअप सॉल्यूशंस आर्किटेक्ट हूं।
Today, I will be giving you an overview of Aws well architected, what it is and how it can help you.
Today, I will be giving you an overview of AWS Well-Architected, what it is, and how it can help you.
आज, मैं आपको AWS Well-Architected का अवलोकन दूंगा कि यह क्या है, और यह आपकी मदद कैसे कर सकता है।
So why does Aws Well, architecture exist?
So why does AWS Well-Architected exist?
तो AWS वेल-आर्किटेक्टेड क्यों मौजूद है?
Aws solution architects have years of experience working with customers,
AWS solution architects have years of ex

In [None]:
import stat
from pysrt import srtitem
from pyssml.AmazonSpeech import AmazonSpeech
import shutil

In [None]:
polly = boto3.client("polly")
l_tmp_dir = '/tmp' # Lambda function can use this directory.
# ffmpeg is stored with this script.
# When executing ffmpeg, execute permission is required.
# But the Lambda source directory does not have permission to change it.
# So move the ffmpeg binary to `/tmp` and add permission.
print ("Files before ",os.listdir(l_tmp_dir))

In [None]:
# Copy the bundled ffmpeg binary into the temp directory and expose it through IMAGEIO_FFMPEG_EXE.
ffmpeg_bin = f"{l_tmp_dir}/ffmpeg.linux64"
shutil.copyfile('ffmpeg.linux64', ffmpeg_bin)
os.environ['IMAGEIO_FFMPEG_EXE'] = ffmpeg_bin
# Add the owner-execute bit on top of the copy's current mode.
current_mode = os.stat(ffmpeg_bin).st_mode
os.chmod(ffmpeg_bin, current_mode | stat.S_IEXEC)

print ("Files after ",os.listdir(l_tmp_dir))

In [None]:
from moviepy.config import change_settings
# Point moviepy's FFMPEG_BINARY setting at the ffmpeg binary in ffmpeg_bin.
change_settings({"FFMPEG_BINARY": ffmpeg_bin})


from moviepy.audio.io.AudioFileClip import AudioFileClip

### STEP-5 Generate Audio from translated text

<img src="./images/Arch_Diagram_Step-5.jpg" alt="drawing" width="800"/>

In [None]:
  # Voices https://docs.aws.amazon.com/polly/latest/dg/voicelist.html
# Maps a target language code to the Amazon Polly VoiceId used to speak it.
# VoiceId values must match the names in the Polly voice list exactly.
voiceid_list = {

    "en" : "Aditi",
    "fr" : "Mathieu",
    "es" : "Miguel",
    "ru" : "Maxim",
    "zh" : "Zhiyu",
    "ja" : "Mizuki",
    "pt" : "Ricardo",
    "de" : "Marlene",
    "it" : "Carla",
    "tr" : "Filiz",
    "hi" : "Aditi"
}

from pydub import AudioSegment
# Point pydub at the same ffmpeg binary that moviepy was configured with.
AudioSegment.converter = ffmpeg_bin

def get_milli_seconds(timecode):
    """Return the total milliseconds of a timecode.

    ``timecode`` must expose ``hours``, ``minutes``, ``seconds`` and
    ``milliseconds`` attributes.
    """
    total_minutes = timecode.hours * 60 + timecode.minutes
    total_seconds = total_minutes * 60 + timecode.seconds
    return total_seconds * 1000 + timecode.milliseconds

def write_mp3_files(filename_translated,voice_id):
    """Synthesize one WAV file per subtitle of a translated SRT file.

    Each subtitle is sent to Amazon Polly as SSML with a maximum duration of
    the subtitle's own duration plus 250 ms. The returned MP3 is converted
    to ``<srt name>__<index>.wav`` and the intermediate MP3 is removed.

    Args:
        filename_translated (str): path of the translated .srt file.
        voice_id (str): Amazon Polly VoiceId to speak with.
    """
    # Standard library; imported here so the function is self-contained.
    from xml.sax.saxutils import escape

    translated_subs = pysrt.open(filename_translated,encoding='utf-8')

    for subtitle_index, each in enumerate(translated_subs, start=1):
        content = each.text
        print (subtitle_index)
        print (content)

        # Add some buffer to time duration just to accommodate beginning and end
        time_duration_milliseconds = get_milli_seconds(each.duration) + 250
        time_duration_string = str(time_duration_milliseconds)+"ms"
        # Escape &, < and > so subtitle text cannot break the SSML document.
        ssml = ("<speak><prosody amazon:max-duration='" + time_duration_string + "'>"
                + escape(content) + "</prosody></speak>")

        response = polly.synthesize_speech(
            OutputFormat='mp3',
            Text=ssml,
            TextType ='ssml',
            VoiceId=voice_id,
        )

        body = response['AudioStream'].read()
        append_string = '__'+str(subtitle_index)+'.mp3'
        mp3filename = filename_translated.replace('.srt',append_string)
        wavfilename = mp3filename.replace('.mp3','.wav')

        with open(mp3filename,'wb') as file:
            file.write(body)

        translated_clip = AudioFileClip(mp3filename)
        try:
            translated_clip.write_audiofile(wavfilename,ffmpeg_params=['-ac','1'])
        finally:
            # Release the reader on the MP3 before the file is deleted.
            translated_clip.close()

        os.remove(mp3filename)


def save_voice_muted_audio_file(filename_translated,original_audio):
    """Build a copy of the original audio with every subtitle interval attenuated.

    Audio between subtitles is kept as is; audio inside each subtitle's
    start/end interval has 80 subtracted from it (``segment - 80``).

    Args:
        filename_translated (str): path of the translated .srt file.
        original_audio: the original audio; must support millisecond
            slicing, ``+`` concatenation and ``- 80``.

    Returns:
        The assembled audio, or ``original_audio`` unchanged when the SRT
        file contains no subtitles.
    """
    translated_subs = pysrt.open(filename_translated,encoding='utf-8')
    if len(translated_subs) == 0:
        # Nothing to mute; avoids indexing into an empty subtitle list.
        return original_audio

    voice_muted_original_audio = None
    previous_end = 0
    for each in translated_subs:
        current_start = get_milli_seconds(each.start)
        current_end = get_milli_seconds(each.end)

        untouched_gap = original_audio[previous_end:current_start]
        muted_speech = original_audio[current_start:current_end]-80
        if voice_muted_original_audio is None:
            voice_muted_original_audio = untouched_gap + muted_speech
        else:
            voice_muted_original_audio = voice_muted_original_audio + untouched_gap + muted_speech
        previous_end = current_end

    # Keep whatever follows the last subtitle.
    return voice_muted_original_audio + original_audio[previous_end:]

def save_final_audio(filename_translated,voice_muted_original_audio,final_audio_name):
    """Overlay the per-subtitle translated WAV files onto the muted audio and export it.

    For every subtitle, ``<srt name>__<index>.wav`` is overlaid starting
    100 ms before the subtitle's start (never before 0). The durations of
    the original interval and of the translated clip are printed, and the
    number of subtitles differing by more than 300 ms is reported at the end.

    Args:
        filename_translated (str): path of the translated .srt file.
        voice_muted_original_audio: audio to overlay onto; must support
            ``overlay`` and ``export``.
        final_audio_name (str): path of the WAV file to write.
    """
    lead_in_ms = 100               # start the translated clip slightly early
    mismatch_threshold_ms = 300    # duration difference worth reporting
    mismatch_count = 0

    translated_subs = pysrt.open(filename_translated,encoding='utf-8')
    for subtitle_index, each in enumerate(translated_subs, start=1):
        content = each.text
        current_start = get_milli_seconds(each.start)
        current_end = get_milli_seconds(each.end)
        append_string = '__'+str(subtitle_index)+'.wav'
        wavfilename = filename_translated.replace('.srt',append_string)
        temp = AudioSegment.from_wav(wavfilename)

        start_timecode = max(current_start - lead_in_ms, 0)
        voice_muted_original_audio = voice_muted_original_audio.overlay(temp,position=start_timecode)

        orig_duration = current_end-current_start
        translated_duration = len(temp)
        difference = abs(translated_duration-orig_duration)
        if difference > mismatch_threshold_ms:
            mismatch_count += 1
        print (content)
        print ("original duration in millisecs: ",orig_duration, 'Translated duration: ',translated_duration,"difference: ",difference, "index ",subtitle_index)

    # Report the tally that was previously computed and then discarded.
    print ("Subtitles with a duration difference above", mismatch_threshold_ms, "ms:", mismatch_count)
    voice_muted_original_audio.export(final_audio_name, format="wav")


In [None]:

# Name and local path of the translated SRT file for this job.
# NOTE: this rebinds output_file_path, which earlier pointed at the JSON transcript.
srt_file_name = f"{job_name}_translated.srt"
output_file_path = f"/tmp/{srt_file_name}"
print ("Filename ",srt_file_name)


In [None]:
# Reuse the bucket the translated SRT was uploaded to instead of repeating its name.
bucket_name = bucket_name_translated_srt
s3.download_file(bucket_name,srt_file_name, output_file_path)

# The file name has the form <uuid>__<source locale>__<target language>_translated.srt.
base_filename_json = srt_file_name.rsplit('.',1)[0]
language_string = base_filename_json.rsplit("__",2)
source_language_code = language_string[-2].split('-')[0].lower()
# Drop the "_translated" suffix that follows the target language code.
target_language_code= language_string[-1].lower().split('_')[0]
if target_language_code not in voiceid_list:
    raise KeyError(f"No Polly voice configured for target language '{target_language_code}'")
voice_id = voiceid_list[target_language_code]
voice_id

In [None]:
# Synthesize one WAV file per translated subtitle with Amazon Polly.
write_mp3_files(output_file_path,voice_id)
print (" Finished - write_mp3_files")

In [None]:
# List /tmp to inspect the generated files.
os.listdir('/tmp/')

In [None]:
# Load the original WAV from the audio_files folder and report its length in seconds.
audio_output_path = f"./audio_files/{job_name}.wav"
original_audio = AudioSegment.from_wav(audio_output_path)
audio_length = original_audio.duration_seconds
print("audio_length:",audio_length)

# Attenuate the original audio during every subtitle interval.
voice_muted_original_audio = save_voice_muted_audio_file(output_file_path,original_audio)
print ("Finished - save_voice_muted_audio_file")

In [None]:
# Overlay the translated clips onto the muted original and write <name>_final.wav.
final_audio_path =  audio_output_path.replace(".wav","_final.wav")
save_final_audio(output_file_path,voice_muted_original_audio,final_audio_path)
# What was just written is the final mix, not the original audio.
print ("Saved final audio")

### STEP-6 Attach Audio to Video

<img src="./images/Arch_Diagram_Step-6.jpg" alt="drawing" width="800"/>

In [None]:
# Attach translated speech to original file
from moviepy.audio.io.AudioFileClip import AudioFileClip

videoclip = VideoFileClip(youtube_filename)
translated_audio = AudioFileClip(final_audio_path)
try:
    # set_audio returns a new clip; the original video clip is left untouched.
    new_clip = videoclip.set_audio(translated_audio)

    print("Attaching translated_audio:", translated_audio)
    print("Using Video file:", youtube_filename)

    translated_video = youtube_filename.replace(".mp4","_translated.mp4")
    new_clip.write_videofile(translated_video)
finally:
    # Release the readers held by both clips, even if encoding fails.
    translated_audio.close()
    videoclip.close()