In [None]:
AZURE_OPENAI_MODEL = "" #deployment name
AZURE_OPENAI_ENDPOINT = ""
AZURE_OPENAI_KEY = ""
AZURE_OPENAI_VERSION = ""

# https://azure.microsoft.com/en-us/pricing/details/cognitive-services/openai-service/ #FOR FRANCE CENTRAL
AZURE_OPENAI_COST_IN = 0.010 # Euros[€] per 1000 tokens
AZURE_OPENAI_COST_OUT = 0.028 # Euros[€] per 1000 tokens

# Transcribing, Translating, and Adding English Subtitle to a Dutch Video

1. Video format check/conversion (.webm to .mp4) "what the fuck is .wedm?" (library ffmpeg)
2. Video-to-audio (.webm to .mp3) (library moviepy)
3. Audio-to-text (.str output) (library whisper)
4. Text translation (.str output) (Azure OpenAI gpt4)
5. MANUALLY checking the result by playing it using VLC (and manually fixing mistakes)
6. Improving the subtitle text style (converting from .srt to .ass) (library pysubs2)
7. Adding the translated subtitle to the video (library ffmpeg)

# Logging function

In [None]:
import os
import time

In [None]:
def log_and_time(func):
    def wrapper(*args, **kwargs):
        start_time = time.time()
        print(f"Starting '{func.__name__}'...")
        result = func(*args, **kwargs)
        end_time = time.time()
        print(f"Finished '{func.__name__}' in {end_time - start_time:.2f} seconds.")
        return result
    return wrapper

# 1. Get video duration

In [None]:
import ffmpeg

In [None]:
def get_video_duration(video_file):
    try:
        probe = ffmpeg.probe(video_file)
        duration_seconds = float(probe['format']['duration'])
        duration_minutes = duration_seconds / 60
        print(f"The input video has {duration_seconds:.1f} seconds ({duration_seconds/60:.1f} minutes).\nWe expect the process to be finished in less than {duration_seconds * 2.5:.1f} seconds ({duration_seconds * 2.5/60:.1f} minutes)!\nBE PATIENT!!!!\n")
    except ffmpeg.Error as e:
        print(f"An error occurred: {e.stderr.decode('utf8')}")
        return None

In [None]:
# input_video = ''
# get_video_duration(input_video)

# 2. Extract audio (.mp3) from video

In [None]:
import os
from moviepy.editor import VideoFileClip

In [None]:
@log_and_time
def extract_audio(input_file):
    try:
        audio_path = os.path.splitext(input_file)[0] + '.mp3'
        video = VideoFileClip(input_file)
        video.audio.write_audiofile(audio_path, verbose=False, logger=None)
        return audio_path
    except Exception as e:
        print(f"An error occurred during audio extraction: {str(e)}")
        return None

In [None]:
# input_file = ''
# audio_path = extract_audio(input_file)
# audio_path

# 3. speech-to-text to .str

In [None]:
# !pip install git+https://github.com/openai/whisper.git
import whisper
import os

In [None]:
def format_timestamp(seconds):
    ms = int((seconds % 1) * 1000)
    seconds = int(seconds)
    hrs = seconds // 3600
    mins = (seconds % 3600) // 60
    secs = seconds % 60
    return f"{hrs:02}:{mins:02}:{secs:02},{ms:03}"

@log_and_time   
def transcribe_dutch_audio(audio_path):
    try:
        model = whisper.load_model("base")
        result = model.transcribe(audio_path, language='nl')
        
        # Generate the transcription file path
        transcription_path = os.path.splitext(audio_path)[0] + '_NL.txt'
        
        # Save the transcription to the file with segments
        with open(transcription_path, 'w', encoding='utf-8') as f:
            for segment in result['segments']:
                start = format_timestamp(segment['start'])
                end = format_timestamp(segment['end'])
                text = segment['text'].strip()
                f.write(f"{start} --> {end}\n{text}\n\n")
        
        # Return the transcription file path
        return transcription_path
    except Exception as e:
        print(f"An error occurred during transcription: {str(e)}")
        return None

In [None]:
# audio_path = ''
# transcription_path = transcribe_dutch_audio(audio_path)
# transcription_path

# 4. Transcription to .str

In [None]:
@log_and_time 
def transcription_to_srt(transcription_path):
    try:
        str_path = os.path.splitext(transcription_path)[0] + '.srt'
        
        with open(transcription_path, 'r', encoding='utf-8') as f:
            segments = f.read().strip().split('\n\n')
        
        with open(str_path, "w", encoding="utf-8") as srt_file:
            for i, segment in enumerate(segments):
                lines = segment.split('\n')
                if len(lines) >= 2:
                    times, dutch_text = lines[0], lines[1]
                    start_time, end_time = times.split(' --> ')
                    
                    srt_file.write(f"{i + 1}\n")
                    srt_file.write(f"{start_time} --> {end_time}\n")
                    srt_file.write(f"{dutch_text}\n\n")
        return str_path
    except Exception as e:
        print(f"An error occurred during SRT file creation: {str(e)}")
        return None

In [None]:
# transcription_path = '' #.txt
# str_path = transcription_to_srt(transcription_path)
# str_path

# 5. .srt from Dutch to English translation

In [None]:
# !pip install openai
from openai import AzureOpenAI

In [None]:
@log_and_time
def azureopenai_translate_dutch_file(AZURE_OPENAI_ENDPOINT, AZURE_OPENAI_KEY, AZURE_OPENAI_VERSION, AZURE_OPENAI_MODEL, str_path):
    try:
        client = AzureOpenAI(azure_endpoint=AZURE_OPENAI_ENDPOINT, 
                             api_key=AZURE_OPENAI_KEY, 
                             api_version=AZURE_OPENAI_VERSION)
        
        with open(str_path, 'r', encoding='utf-8') as file:
            file_content = file.read()

        system_prompt = "Translate the following Dutch subtitles to English:"
       
        response = client.chat.completions.create(
            model=AZURE_OPENAI_MODEL,
            messages=[{"role": "system", "content": system_prompt}, 
                      {"role": "user", "content": file_content}]
        )
        
        translated_content = response.choices[0].message.content
   
        # Calculate and print the cost if the cost variables are set      
        if AZURE_OPENAI_COST_IN is not None and AZURE_OPENAI_COST_OUT is not None:
            input_tokens = response.usage.prompt_tokens
            output_tokens = response.usage.completion_tokens
            total_cost = (input_tokens * AZURE_OPENAI_COST_IN/1000) + (output_tokens * AZURE_OPENAI_COST_OUT/1000)
            print(f" - Cost of translation: €{total_cost:.6f} (input tokens: {input_tokens}, output tokens: {output_tokens})")
        else:
            print(" - Costs not calculated!")

        english_str_path = os.path.splitext(str_path)[0] + '_EN.srt'
        with open(english_str_path, 'w', encoding='utf-8') as file:
            file.write(translated_content)
        return english_str_path
    except Exception as e:
        print(f"An error occurred during translation: {str(e)}")
        return None

In [None]:
# str_path = ''
# english_str_path = azureopenai_translate_dutch_file(AZURE_OPENAI_ENDPOINT, AZURE_OPENAI_KEY, AZURE_OPENAI_VERSION, AZURE_OPENAI_MODEL, str_path)
# english_str_path

## 6. .srt to .ass

In [None]:
# !pip install pysubs2
import pysubs2

In [None]:
# Helper function to convert hex color to RGB
def hex_to_rgb(hex_color):
    hex_color = hex_color.lstrip('&H').lstrip('#')
    return tuple(int(hex_color[i:i+2], 16) for i in (0, 2, 4))

@log_and_time
def srt_to_ass(srt_path):
    font_name="Roboto"
    font_size=24
    
    primary_color="&H00FFFFFF"
    outline_color="&H00000000" 
    back_color="&H64000000"
    
    try:
        subs = pysubs2.load(srt_path, encoding="utf-8")
        style = pysubs2.SSAStyle()
        style.fontname = font_name
        style.fontsize = font_size
        style.primarycolor = pysubs2.Color(*hex_to_rgb(primary_color))
        style.outlinecolor = pysubs2.Color(*hex_to_rgb(outline_color))
        style.backcolor = pysubs2.Color(*hex_to_rgb(back_color))
        style.bold = -1
        style.shadow = 2
        style.outline = 1
        style.alignment = pysubs2.Alignment.BOTTOM_CENTER
        style.marginl = 10
        style.marginr = 10
        style.marginv = 10
        subs.styles["Default"] = style

        ass_path = os.path.splitext(srt_path)[0] + '.ass' 
        subs.save(ass_path)
        return ass_path
        
    except Exception as e:
        print(f"An error occurred during SRT to ASS conversion: {str(e)}")
        return None

In [None]:
# srt_path = ''
# ass_path = srt_to_ass(srt_path)
# ass_path

# 7. Convert original video to .mp4 (if needed)

In [None]:
import os
import ffmpeg

In [None]:
@log_and_time
def convert_to_mp4(input_file):
    try:
        if input_file.lower().endswith('.mp4'):
            return input_file
        video_mp4 = os.path.splitext(input_file)[0] + '.mp4'
        ffmpeg.input(input_file).output(video_mp4, vcodec='libx264', acodec='aac').run(overwrite_output=True, quiet=True)
        return video_mp4
    except ffmpeg.Error as e:
        print(f"An error occurred during conversion: {e.stderr.decode('utf8')}")
        return None


In [None]:
# input_file = ''
# video_mp4 = convert_to_mp4(input_file)
# video_mp4

# 8. Add subtitle to video

In [None]:
import ffmpeg
import os
import re

In [None]:
def generate_sequential_output_path(video_path):
    base_dir, base_name = os.path.split(video_path)
    if not base_dir:
        base_dir = '.'  # Use current directory if no directory is provided
    name, ext = os.path.splitext(base_name)
    base_pattern = re.sub(r'(\d+)$', '', name)
    
    try:
        existing_files = [f for f in os.listdir(base_dir) if f.startswith(base_pattern) and f.endswith(ext)]
    except FileNotFoundError:
        print(f"The directory {base_dir} does not exist.")
        return None

    highest_number = 1
    for file in existing_files:
        match = re.search(r'(\d+)(?=\.[^.]+$)', file)
        if match:
            number = int(match.group(1))
            if number > highest_number:
                highest_number = number
    
    new_number = highest_number + 1
    new_base_name = f"{base_pattern}{new_number}{ext}"
    return os.path.join(base_dir, new_base_name)


@log_and_time
def add_subtitle(video_path, subtitle_path):
    try:
        # Generate the output path based on the video path
        output_path = generate_sequential_output_path(video_path)
        if output_path is None:
            return None
        
        # Add subtitles to the video using ffmpeg
        ffmpeg.input(video_path).output(output_path, vf=f"subtitles={subtitle_path}").run(overwrite_output=True, quiet=True)
        return output_path
    except Exception as e:
        print(f"An error occurred during adding subtitles: {str(e)}")
        return None

In [None]:
# video_path = ''
# ass_path = ''

# final_video = add_subtitle(video_path, ass_path)
# final_video

# 9. Clean paths

In [None]:
def clean_up(paths):
    for path in paths:
        try:
            os.remove(path)
        except OSError as e:
            print(f"Error deleting file {path}: {e}")

In [None]:
# clean_up([audio_path, transcription_path, srt_path, ass_path])

# CALLING EVERYTHING!!!!

In [None]:
# input_video = ''

# #1 get video duration
# get_video_duration(input_video)

# #2 extract audio
# audio_path = extract_audio(input_video)

# #3 Transcript the audio (in Dutch)
# transcription_path = transcribe_dutch_audio(audio_path)

# #4 Making the str from the transcription
# str_path = transcription_to_srt(transcription_path)

# #5 Translating the Dutch .str to English using Azure OpenAI
# english_str_path = azureopenai_translate_dutch_file(AZURE_OPENAI_ENDPOINT, AZURE_OPENAI_KEY, AZURE_OPENAI_VERSION, AZURE_OPENAI_MODEL, str_path)

# #6 Making the .ass file
# ass_path = srt_to_ass(english_str_path)

# #7 making sure the original video is .
# video_mp4 = convert_to_mp4(input_video)

# #8 Adding subtitle (.ass) to a video
# final_video = add_subtitle(video_mp4, ass_path)

# #9 Clean-up folder
# os.remove(audio_path)
# os.remove(transcription_path)
# os.remove(str_path)
# os.remove(ass_path)



In [None]:
import tkinter as tk
from tkinter import filedialog, scrolledtext
import sys
import threading
import time
import os

In [None]:
stop_processing = threading.Event()

def run_process(video_path):
    stop_processing.clear()
    go_button.config(state='disabled')
    
    # Step 1: Get video duration
    get_video_duration(video_path)

    # Step 2: Extract audio
    audio_path = extract_audio(video_path)
    if not audio_path:
        return

    # Step 3: Transcribe the audio (in Dutch)
    transcription_path = transcribe_dutch_audio(audio_path)
    if not transcription_path:
        return

    # Step 4: Convert transcription to SRT
    srt_path = transcription_to_srt(transcription_path)

    # Step 5: Translate the Dutch SRT to English using Azure OpenAI
    english_srt_path = azureopenai_translate_dutch_file(AZURE_OPENAI_ENDPOINT, AZURE_OPENAI_KEY, AZURE_OPENAI_VERSION, AZURE_OPENAI_MODEL, srt_path)

    # Step 6: Convert SRT to ASS
    ass_path = srt_to_ass(english_srt_path)

    # Step 7: Ensure the original video is in MP4 format
    video_mp4 = convert_to_mp4(video_path)

    # Step 8: Add subtitle (.ass) to the video
    final_video = add_subtitle(video_mp4, ass_path)
    if not final_video:
        return

    print(f"\nFinal video saved to: {final_video}")
    
    # Step 9: Clean-up folder
    clean_up([audio_path, transcription_path, srt_path, ass_path])

    for _ in range(10):
        if stop_processing.is_set():
            break
        time.sleep(1)

    if open_video_var.get() and not stop_processing.is_set():
        # Open the final video after processing if the checkbox is selected
        os.system(f'open "{final_video}"')
    
    print("\nProcess completed.")
    go_button.config(state='normal')
    stop_processing.set()  # Ensure the timer stops

In [None]:
class TextRedirector:
    def __init__(self, widget, tag="stdout"):
        self.widget = widget
        self.tag = tag

    def write(self, str):
        if self.widget.winfo_exists():
            self.widget.configure(state='normal')
            self.widget.insert("end", str, (self.tag,))
            self.widget.configure(state='disabled')
            self.widget.see("end")

    def flush(self):
        pass

In [None]:
def select_video():
    global selected_video_path
    selected_video_path = filedialog.askopenfilename(filetypes=[("Video files", "*.mp4 *.webm")])
    if selected_video_path:
        selected_file_label.config(text=f"Selected file: {os.path.basename(selected_video_path)}")
        go_button.pack()
        stop_button.pack()

# Function to update the timer
def update_timer(start_time):
    if not stop_processing.is_set():
        elapsed_time = int(time.time() - start_time)
        timer_label.config(text=f"Elapsed Time: {elapsed_time}s")
        timer_label.after(1000, update_timer, start_time)

# Function to start processing and the timer
def start_processing():
    start_time = time.time()
    update_timer(start_time)
    threading.Thread(target=run_process, args=(selected_video_path,)).start()


In [None]:
import tkinter as tk
from tkinter import filedialog, scrolledtext
import os
import sys
import threading
import time

root = tk.Tk()
root.title("Video Processing App")

frame = tk.Frame(root, padx=10, pady=10)
frame.pack(padx=10, pady=10)

select_button = tk.Button(frame, text="Select Video", command=select_video)
select_button.pack()

# Add a Label widget for displaying the selected file name
selected_file_label = tk.Label(frame, text="")
selected_file_label.pack()

# Initialize the Go button but don't pack it yet
go_button = tk.Button(frame, text="Go!", command=start_processing)

# Add a Stop button to stop the process, positioned below the log box
stop_button = tk.Button(root, text="Stop", command=lambda: stop_processing.set())

# Add a Label widget for displaying the elapsed time
timer_label = tk.Label(root, text="Elapsed Time: 0s")
timer_label.pack(side=tk.BOTTOM, anchor=tk.SE, padx=10, pady=10)

# Add a Text widget for displaying logs
log_text = scrolledtext.ScrolledText(root, wrap=tk.WORD, state='disabled', width=80, height=20)
log_text.pack(padx=10, pady=10)

# Add a checkbox to open the video after finishing
open_video_var = tk.BooleanVar(value=True)
open_video_checkbox = tk.Checkbutton(root, text="Open video after finishing", variable=open_video_var)
open_video_checkbox.pack(pady=10)

# Redirect stdout to the text box
sys.stdout = TextRedirector(log_text)

stop_button.pack(pady=10)

root.mainloop()