### Install requirements

In [None]:
!pip uninstall numpy
!pip install --upgrade numpy==1.23
!pip install pydub
!pip install pyannote.audio
!pip install tts
!pip install azure-cognitiveservices-speech azure-ai-textanalytics
!pip install noisereduce soundfile
!pip install speech_recognition
!pip install googletrans
!pip install gtts
!pip install ffmpeg

### Import necessary libraries

In [None]:
import os
import subprocess
import time

import azure.cognitiveservices.speech as speechsdk
import ffmpeg
import librosa
import moviepy.editor as mp
import numpy as np
import soundfile as sf
from azure.ai.textanalytics import TextAnalyticsClient
from azure.core.credentials import AzureKeyCredential
from googletrans import Translator
from gtts import gTTS
from pydub import AudioSegment
from scipy.io.wavfile import write
from transformers import pipeline
from TTS.api import TTS
#from pyannote.audio import Pipeline
#import noisereduce as nr
#import speech_recognition as sr

#### Global variables

In [None]:
# Azure credentials.
# Secrets are read from the environment so they do not have to be committed with the
# notebook; the original placeholder values remain as fallbacks, so existing setups
# that edit the literals in place keep working.
speech_key = os.environ.get("AZURE_SPEECH_KEY", "****")
speech_region = os.environ.get("AZURE_SPEECH_REGION", "eastus")
translator_key = os.environ.get("AZURE_TRANSLATOR_KEY", "****")
translator_endpoint = os.environ.get(
    "AZURE_TRANSLATOR_ENDPOINT", "https://api.cognitive.microsofttranslator.com/"
)

# Shared text-to-speech model instance used by text_to_speech(); runs on CPU (gpu=False).
tts = TTS("tts_models/multilingual/multi-dataset/xtts_v2", gpu=False)

## Utils

#### Reduce Noise

In [None]:
def remove_noise(input_audio_path, output_audio_path, noise_reduction_level=0.1):
    """
    Remove background noise from audio and save the result as 16-bit PCM WAV.

    Args:
        input_audio_path (str): Path to the input audio file.
        output_audio_path (str): Path to save the noise-reduced audio file.
        noise_reduction_level (float): The amount of noise reduction (0 to 1),
            passed to noisereduce as ``prop_decrease``.
    """
    # librosa has no ``effects.reduce_noise``; spectral gating is provided by the
    # noisereduce package. Imported here because the module-level import is
    # commented out in this notebook.
    import noisereduce as nr

    # sr=None keeps the file's native sample rate instead of resampling.
    audio_data, sample_rate = librosa.load(input_audio_path, sr=None)

    # Perform noise reduction using spectral gating
    reduced_noise = nr.reduce_noise(
        y=audio_data,
        sr=sample_rate,
        prop_decrease=noise_reduction_level,
    )

    # Clip before scaling so samples outside [-1, 1] saturate instead of
    # wrapping around when cast to int16.
    pcm_samples = (np.clip(reduced_noise, -1.0, 1.0) * 32767).astype(np.int16)

    # Save the noise-reduced audio file
    write(output_audio_path, sample_rate, pcm_samples)
    print(f"Noise-reduced audio saved at {output_audio_path}")

#### Video to audio

In [None]:
def extract_audio_from_video(video_file, output_audio_file):
    """
    Write the audio track of a video to a separate audio file.

    Args:
        video_file (str): Path to the input video file.
        output_audio_file (str): Path where the extracted audio is written.

    Raises:
        ValueError: If the video has no audio track.
    """
    print("Extracting audio from video...")
    video = mp.VideoFileClip(video_file)
    try:
        if video.audio is None:
            # Without this guard the failure is an opaque AttributeError on None.
            raise ValueError(f"No audio track found in {video_file}")
        video.audio.write_audiofile(output_audio_file)
    finally:
        # Release the reader resources held by the clip even if writing fails.
        video.close()
    print(f"Audio saved to {output_audio_file}")

#### Speech to text

In [None]:
def transcribe_audio(audio_file_path, source_language):
    """
    Transcribe a whole audio file with Azure Speech.

    Continuous recognition is used because ``recognize_once()`` returns after the
    first utterance, which would truncate anything longer than one sentence.

    Args:
        audio_file_path (str): Path to the audio file to transcribe.
        source_language (str): Locale of the spoken language (e.g., "fr-FR" for French).

    Returns:
        str | None: The recognized utterances joined by spaces, or None if
        recognition failed or produced no text.
    """
    import threading

    speech_config = speechsdk.SpeechConfig(subscription=speech_key, region=speech_region)
    speech_config.speech_recognition_language = source_language
    audio_config = speechsdk.AudioConfig(filename=audio_file_path)

    recognizer = speechsdk.SpeechRecognizer(speech_config=speech_config, audio_config=audio_config)

    recognized_parts = []
    errors = []
    finished = threading.Event()

    def on_recognized(evt):
        # Fired once per finalized utterance.
        result = evt.result
        if result.reason == speechsdk.ResultReason.RecognizedSpeech and result.text:
            recognized_parts.append(result.text)

    def on_canceled(evt):
        # Reaching the end of the file is also reported as a cancellation;
        # only a reason of Error is treated as a failure.
        details = evt.cancellation_details
        if details is not None and details.reason == speechsdk.CancellationReason.Error:
            errors.append(details.error_details)
        finished.set()

    def on_session_stopped(evt):
        finished.set()

    recognizer.recognized.connect(on_recognized)
    recognizer.canceled.connect(on_canceled)
    recognizer.session_stopped.connect(on_session_stopped)

    print(f"Transcribing audio file in {source_language}...")
    recognizer.start_continuous_recognition()
    try:
        # Callbacks run on SDK threads; block here until the session ends.
        finished.wait()
    finally:
        recognizer.stop_continuous_recognition()

    if errors:
        print(f"Error transcribing audio: {errors[0]}")
        return None

    if not recognized_parts:
        print(f"Error transcribing audio: {speechsdk.ResultReason.NoMatch}")
        return None

    transcription = " ".join(recognized_parts)
    print(f"Transcription: {transcription}")
    return transcription

#### Translate

In [None]:
def translate_text(transcription, target_language):
    """
    Translate text with the Azure Translator REST API (v3.0).

    Args:
        transcription (str): Text to translate.
        target_language (str): Language code to translate into.

    Returns:
        str: The translated text.

    Raises:
        requests.HTTPError: If the service answers with an error status.
        requests.RequestException: On network failure or timeout.
    """
    import requests
    import uuid

    # Strip the endpoint's trailing slash so the URL does not contain "//translate".
    constructed_url = translator_endpoint.rstrip('/') + '/translate'
    # Passing the query as a dict lets requests URL-encode the values.
    params = {'api-version': '3.0', 'to': target_language}

    headers = {
        'Ocp-Apim-Subscription-Key': translator_key,
        'Ocp-Apim-Subscription-Region': speech_region,
        'Content-type': 'application/json',
        'X-ClientTraceId': str(uuid.uuid4())
    }

    body = [{'text': transcription}]

    # A timeout keeps a stalled connection from hanging the notebook forever.
    response = requests.post(constructed_url, params=params, headers=headers, json=body, timeout=30)
    # Error responses are a JSON object, not a list; fail with the HTTP error
    # instead of an unrelated KeyError when indexing below.
    response.raise_for_status()

    translated_text = response.json()[0]["translations"][0]["text"]
    print(f"Translated Text: {translated_text}")
    return translated_text

#### Text to Speech

In [None]:
def text_to_speech(text, output_file, original_audio, target_language):
    """
    Synthesize speech for ``text`` with the module-level ``tts`` model.

    Args:
        text (str): Text to speak.
        output_file (str): Path the synthesized audio is written to.
        original_audio (str): Reference recording passed to the model as ``speaker_wav``.
        target_language (str): Language code passed to the model as ``language``.
    """
    synthesis_options = {
        "text": text,
        "file_path": output_file,
        "speaker_wav": original_audio,
        "language": target_language,
    }
    tts.tts_to_file(**synthesis_options)

#### Replace original audio with dubbed audio in Video


In [None]:
def mute_original_audio(input_video_path, output_muted_video_path):
    """
    Write a copy of the video with its audio removed.

    Args:
        input_video_path (str): Path to the input video file.
        output_muted_video_path (str): Path to save the muted output video.

    Raises:
        subprocess.CalledProcessError: If ffmpeg exits with a non-zero status.
    """
    # "-c copy" copies the streams without re-encoding; "-an" drops the audio.
    ffmpeg_command = ['ffmpeg', '-i', input_video_path]
    ffmpeg_command += ['-c', 'copy', '-an', output_muted_video_path]

    subprocess.run(ffmpeg_command, check=True)
    print(f"Muted video saved at: {output_muted_video_path}")

def replace_audio(input_video_path, input_audio_path, output_video_path):
    """
    Replace the original audio in a video with new audio.

    The video stream is copied unchanged and the new audio is encoded as AAC.

    Args:
        input_video_path (str): Path to the input video file (muted or not).
        input_audio_path (str): Path to the new audio file.
        output_video_path (str): Path to save the final video with replaced audio.

    Raises:
        subprocess.CalledProcessError: If ffmpeg exits with a non-zero status.
    """
    subprocess.run([
        'ffmpeg', '-i', input_video_path, '-i', input_audio_path,
        # Select streams explicitly: video from the first input, audio from the
        # second. With automatic selection ffmpeg may keep an audio track that
        # is still present in the video instead of the new one.
        '-map', '0:v:0', '-map', '1:a:0',
        '-c:v', 'copy', '-c:a', 'aac', '-strict', 'experimental', output_video_path
    ], check=True)
    print(f"Final video with replaced audio saved at: {output_video_path}")

#### Change audio Speed

In [None]:
def change_audio_speed(audio_path, output_path, speed_factor):
    """
    Change the playback speed of an audio file and export the result as WAV.

    The speed change also shifts the pitch, because the samples are simply
    reinterpreted at a different frame rate.

    :param audio_path: Path to the input audio file.
    :param output_path: Path to save the output file.
    :param speed_factor: Speed multiplier (e.g., 1.5 plays 1.5x faster, 0.5 plays at half speed).
    :return: The speed-adjusted AudioSegment.
    """
    source = AudioSegment.from_file(audio_path)
    original_rate = source.frame_rate

    # Same raw samples, relabelled with a scaled frame rate.
    retimed = source._spawn(
        source.raw_data,
        overrides={'frame_rate': int(original_rate * speed_factor)},
    )

    # Convert back to the original frame rate before exporting.
    new_audio = retimed.set_frame_rate(original_rate)
    new_audio.export(output_path, format="wav")
    print(f"Audio saved at: {output_path}")

    return new_audio

#### Segregate speaker's voice

In [None]:
def segregate_speakers(audio_path, output_dir="output_segments"):
    """
    Split an audio file into per-speaker segments using speaker diarization.

    Each diarized segment is exported as its own WAV file in ``output_dir``.

    :param audio_path: Path to the input WAV audio file.
    :param output_dir: Directory where speaker audio segments will be saved.
    :return: Dictionary mapping each speaker label to a list of dictionaries with
             'start_time' and 'end_time' (seconds) and 'audio_path' (segment file).
    """
    # Imported here because the module-level import is commented out in this
    # notebook, which left ``Pipeline`` undefined.
    from pyannote.audio import Pipeline

    # Initialize pre-trained speaker diarization pipeline
    diarization_pipeline = Pipeline.from_pretrained("pyannote/speaker-diarization")

    # Perform diarization on the input audio file
    diarization = diarization_pipeline(audio_path)

    # Load the original audio using pydub
    audio = AudioSegment.from_wav(audio_path)

    # exist_ok avoids the check-then-create race of a separate exists() test.
    os.makedirs(output_dir, exist_ok=True)

    speaker_data = {}

    # Iterate over diarization results and save each segment
    for i, (segment, speaker) in enumerate(diarization.itertracks(yield_label=True)):
        # pydub slices in milliseconds; diarization timestamps are in seconds.
        start_time = segment.start * 1000
        end_time = segment.end * 1000

        # Extract the audio segment for this speaker
        audio_segment = audio[start_time:end_time]

        # Save the audio segment to a file
        segment_filename = os.path.join(output_dir, f"speaker_{speaker}_segment_{i}.wav")
        audio_segment.export(segment_filename, format="wav")

        # Group the segment under its speaker label
        speaker_data.setdefault(speaker, []).append({
            'start_time': segment.start,
            'end_time': segment.end,
            'audio_path': segment_filename
        })

    return speaker_data

# Dub (main function)

In [None]:
# Main function to run the process
def DubVideo(input_video_file, target_language, source_language="en-US"):
    """
    Dub a video into another language.

    Pipeline: extract audio -> transcribe -> translate -> synthesize speech ->
    reduce noise -> swap the audio track of the video.

    Files are written next to the input: ``<name>.wav`` (extracted audio),
    ``<name>_dubbed.wav`` (synthesized audio) and ``<name>_dubbed<ext>`` (final
    video). A ``<name>_dubbed<ext>`` left over from a previous run is replaced.

    Args:
        input_video_file (str): Path to the input video file.
        target_language (str): Language code to translate and synthesize into.
        source_language (str): Locale spoken in the video (default "en-US").

    Returns:
        str | None: Path of the dubbed video, or None if transcription failed.
    """
    # splitext handles any extension and dots elsewhere in the path, unlike
    # splitting on the literal ".mp4".
    base_path, video_ext = os.path.splitext(input_video_file)
    video_ext = video_ext or ".mp4"

    # Step 1: Extract the original audio from the video
    original_audio_file = base_path + ".wav"
    extract_audio_from_video(input_video_file, original_audio_file)

    # Step 2: Transcribe the original audio in the specified source language
    transcription = transcribe_audio(original_audio_file, source_language)
    if not transcription:
        print("Transcription failed.")
        return None

    # Step 3: Translate the transcription into the target language
    translated_text = translate_text(transcription, target_language)

    # Step 4: Convert the translated text back to speech
    output_audio_file = base_path + "_dubbed.wav"
    text_to_speech(translated_text, output_audio_file, original_audio_file, target_language)

    # Step 5: Reduce noise in the synthesized audio (in place)
    remove_noise(output_audio_file, output_audio_file)

    # Step 6: Replace the audio in the original video
    muted_video_file = base_path + "_muted" + video_ext
    output_video_file = base_path + "_dubbed" + video_ext

    # The ffmpeg helpers do not pass an overwrite flag, so clear files left by
    # an earlier run to keep the pipeline re-runnable.
    for stale_file in (muted_video_file, output_video_file):
        if os.path.exists(stale_file):
            os.remove(stale_file)

    # replace_audio documents a muted video as its input, so strip the
    # original track first.
    mute_original_audio(input_video_file, muted_video_file)
    try:
        replace_audio(muted_video_file, output_audio_file, output_video_file)
    finally:
        # The muted copy is only an intermediate artifact.
        if os.path.exists(muted_video_file):
            os.remove(muted_video_file)

    return output_video_file