# Video translation with speaker diarization and voice cloning

In [1]:
!pip install kaleido
!pip install openai
!pip install tiktoken
!pip install cohere

!pip install -q -U gradio
!pip install git+https://github.com/m-bain/whisperx.git
!pip install -q -U moviepy
!pip install -q -U deepl
!pip install -q -U librosa
!pip install -q -U TTS

Collecting kaleido
  Downloading kaleido-0.2.1-py2.py3-none-manylinux1_x86_64.whl (79.9 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m79.9/79.9 MB[0m [31m11.8 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: kaleido
[31mERROR: pip's dependency resolver does not currently take into account all the packages that are installed. This behaviour is the source of the following dependency conflicts.
lida 0.0.10 requires fastapi, which is not installed.
lida 0.0.10 requires python-multipart, which is not installed.
lida 0.0.10 requires uvicorn, which is not installed.[0m[31m
[0mSuccessfully installed kaleido-0.2.1
Collecting openai
  Downloading openai-1.2.1-py3-none-any.whl (220 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m220.2/220.2 kB[0m [31m3.9 MB/s[0m eta [36m0:00:00[0m
Collecting httpx<1,>=0.23.0 (from openai)
  Downloading httpx-0.25.1-py3-none-any.whl (75 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━

[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m899.8/899.8 kB[0m [31m7.7 MB/s[0m eta [36m0:00:00[0m
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.9/1.9 MB[0m [31m14.3 MB/s[0m eta [36m0:00:00[0m
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m10.8/10.8 MB[0m [31m47.2 MB/s[0m eta [36m0:00:00[0m
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m78.5/78.5 kB[0m [31m10.8 MB/s[0m eta [36m0:00:00[0m
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m289.9/289.9 kB[0m [31m35.4 MB/s[0m eta [36m0:00:00[0m
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m48.9/48.9 kB[0m [31m7.6 MB/s[0m eta [36m0:00:00[0m
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m71.1/71.1 kB[0m [31m11.4 MB/s[0m eta [36m0:00:00[0m
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m90.8/90.8 kB[0m [31m13.9 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setu

In [2]:
import os
import gradio as gr
import whisperx
import moviepy.editor as mp
import deepl
import torch
import librosa
from TTS.api import TTS

  torchaudio.set_audio_backend("soundfile")
  torchaudio.set_audio_backend("soundfile")


In [1]:
HF_TOKEN = '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'

from google.colab import drive
drive.mount('/content/drive')



# Extract audio from video
def extract_audio(video_path):
  clip = mp.VideoFileClip(video_path)
  audio_path = os.path.splitext(video_path)[0] + ".wav"
  clip.audio.write_audiofile(audio_path)
  return audio_path



# Perform speech diarization
def speech_diarization(audio_path, hf_token):
  device = "cuda"
  batch_size = 16
  compute_type = "float16"
  model = whisperx.load_model("large-v2", device, compute_type=compute_type)

  audio = whisperx.load_audio(audio_path)
  result = model.transcribe(audio, batch_size=batch_size)
  print(result["segments"])

  # delete model if low on GPU resources
  import gc; gc.collect(); torch.cuda.empty_cache(); del model

  # 2. Align whisper output
  model_a, metadata = whisperx.load_align_model(language_code=result["language"], device=device)
  result = whisperx.align(result["segments"], model_a, metadata, audio, device, return_char_alignments=False)

  print(result["segments"]) # after alignment

  # delete model if low on GPU resources
  import gc; gc.collect(); torch.cuda.empty_cache(); del model_a

  # 3. Assign speaker labels
  diarize_model = whisperx.DiarizationPipeline(model_name='pyannote/speaker-diarization@2.1', use_auth_token=hf_token, device=device)

  # add min/max number of speakers if known
  diarize_segments = diarize_model(audio)
  # diarize_model(audio, min_speakers=min_speakers, max_speakers=max_speakers)

  result = whisperx.assign_word_speakers(diarize_segments, result)
  print(diarize_segments)
  print(result["segments"])

  return result["segments"]



# Create per speaker voice clips for tts voice cloning
def speaker_voice_clips(transcription, audio_path):
  # Create 3 uninterrupted per speaker timecodes
  snippets_timecodes = {}
  for segment in transcription:
    speaker = segment['speaker']

    if speaker not in snippets_timecodes:
      snippets_timecodes[speaker] = []

    if len(snippets_timecodes[speaker]) < 3:
      snippet = {
          'start': segment['start'],
          'end': segment['end']
      }
      snippets_timecodes[speaker].append(snippet)

  # Cut voice clips and stitch them together
  original_audio = mp.AudioFileClip(audio_path)
  audio_file_directory = os.path.dirname(audio_path)

  voice_clip_names = []
  for speaker, speaker_snippets in snippets_timecodes.items():
    subclips = []
    for snippet in speaker_snippets:
      start, end = snippet['start'], snippet['end']
      subclip = original_audio.subclip(start, end)
      subclips.append(subclip)

    concatenated_clip = mp.concatenate_audioclips(subclips)

    output_filename = os.path.join(audio_file_directory, f"{speaker}_voice_clips.wav")
    concatenated_clip.write_audiofile(output_filename)
    voice_clip_names.append(output_filename)

  return voice_clip_names



# Perform text translation
def translate_transcript(transcript, target_language):
  translated_transcript = []
  for segment in transcript:
    text_to_translate = segment['text']
    translated_text = deepl.translate(text_to_translate, target_language)

    translated_segment = {
        'start': segment['start'],
        'end': segment['end'],
        'text': translated_text,
        'speaker': segment['speaker']
    }

    translated_transcript.append(translated_segment)

  return translated_transcript



# Perform voice cloning
def voice_cloning_translation(translated_transcription, speakers_voice_clips, target_language, speaker_model):
  device = "cuda"
  vits_language_map = {
      'ru':'rus',
      'uk':'ukr'
  }
  models = {
    'xtts': 'tts_models/multilingual/multi-dataset/xtts_v2',
    'vits': f'tts_models/{target_language}/fairseq/vits',
  }

  selected_model = None

  # Select model
  for key in models.keys():
    if key in speaker_model:
      if key is 'vits':
        target_language == vits_language_map[target_language]
      selected_model = models[key]

  tts = TTS(selected_model).to(device)

  speaker_tracks = {}

  for speaker, _ in speakers_voice_clips.items():
    clips = []

    for item in translated_transcription:
      if item['speaker'] == speaker:
        # Generate audio using TTS
        audio = tts.tts_with_vc(text=item['text'], speaker_wav=speakers_voice_clips[speaker], language=target_language)

        # Create an AudioFileClip
        audio_clip = AudioFileClip(audio)  # Assuming audio is a path to the generated audio file

        # Add start time as start of the AudioClip
        audio_clip = audio_clip.set_start(item['start'])

        # Append to the list of clips
        clips.append(audio_clip)

    # Fill gaps with silence and concatenate clips
    full_clip = concatenate_audioclips(clips)
    speaker_tracks[speaker] = full_clip

  return audio



# Adjust voice pace
def adjust_voice_pace(audio_path, target_duration):
    y, sr = librosa.load(audio_path, sr=None)
    duration = librosa.get_duration(y=y, sr=sr)
    tempo = duration / target_duration
    y_stretched = librosa.effects.time_stretch(y, tempo)
    librosa.output.write_wav(audio_path, y_stretched, sr)
    return audio_path



# Perform video translation
def video_translation(video_path, target_language, hf_token, speaker_model):

  audio_path = extract_audio(video_path)

  transcription = speech_diarization(audio_path, hf_token)

  translated_transcription = translate_transcript(transcription, target_language)
  speakers_voice_clips = speaker_voice_clips(transcription, audio_path)

  cloned_audio_path = voice_cloning_translation(translated_transcription, speakers_voice_clips, target_language, speaker_model)

  # target_duration = mp.VideoFileClip(video_path).duration
  # for i, audio_path in enumerate(cloned_audio_paths):
  #   cloned_audio_paths[i] = adjust_voice_pace(audio_path, target_duration)

  video = mp.VideoFileClip(video_path)
  audio_clips = [mp.AudioFileClip(audio_path) for audio_path in cloned_audio_paths]
  audio = mp.concatenate_audioclips(audio_clips)
  video_with_new_audio = video.set_audio(audio)

  video_with_new_audio_path = os.path.splitext(video_path)[0] + "_" + target_language + ".mp4"
  video_with_new_audio.write_videofile(video_with_new_audio_path)

  return video_with_new_audio_path



video_translation('/content/drive/MyDrive/Data/fridman-harris-demo-6min.mp4', 'ru', HF_TOKEN)

IndentationError: ignored

In [None]:


def translate_video(video_path, target_language, speaker_model_path):
    try:
      video_with_new_audio_path = video_translation(video_path, target_language, speaker_model_path)
    except Exception as e:
      print(f"An error occurred: {e}")
    return gr.components.Video(video_with_new_audio_path)


inputs = [
    gr.Video(label="Select a video file"),
    gr.Dropdown(["uk", "ru"], label="Select target language"),
    gr.Dropdown(["XTTS (16 languages)", "VITs (over 1100 languages)"], label="Select text-to-speech generation model")
]

outputs = gr.Video(label="Translated video")
gr.Interface(fn=translate_video, inputs=inputs, outputs=outputs, title="AI Video Translation").launch()

