In [16]:
! pip install git+https://github.com/openai/whisper.git -q  

In [1]:
!pip install moviepy gtts googletrans 



In [1]:
import whisper
import os
from googletrans import Translator
from gtts import gTTS


model = whisper.load_model("base")

# Define a list of target languages
target_languages = ['te', 'fr', 'hi', 'de']  # Add more languages as needed

def translate_audio(audio_file,output_audio_file_location,langauge):
    # Load audio file
    with io.open(audio_file, "rb") as audio_file:
        content = audio_file.read()
        audio = speech.RecognitionAudio(content=content)
    audio = whisper.load_audio(audio)
    print("audio loaded")
    audio = whisper.pad_or_trim(audio)

    # Extract features
    mel = whisper.log_mel_spectrogram(audio).to(model.device)
    print("extracted features")
    # Detect language
    _, probs = model.detect_language(mel)
    detected_language = max(probs, key=probs.get)
    print(f"Detected language for {os.path.basename(audio_file)}: {detected_language}")

    # Decode and print result
    options = whisper.DecodingOptions(fp16=False)
    result = whisper.decode(model, mel, options)
    print(f"Result for {os.path.basename(audio_file)}: {result.text}")
    print("=" * 50)

    # Translate and save audio for each target language
    
    translator = Translator()
    translated_result = translator.translate(result.text, src='en', dest=langauge)
    translated_text = translated_result.text

    # Save translated text as an audio file
    tts = gTTS(translated_text, lang=langauge)
    tts.save(output_audio_file_location)


KeyboardInterrupt: 

In [6]:
!pip install librosa flask numpy tqdm torch moviepy



In [1]:
from os import listdir, path
import numpy as np
import scipy, cv2, os, sys, argparse, audio
import json, subprocess, random, string
from tqdm import tqdm
from glob import glob
import torch, face_detection
from models import Wav2Lip
import platform

import json

args = {
    'checkpoint_path': "D:/PAGO/Wav2Lip/checkpoints/wav2lip.pth",
    'face': "D:/PAGO/Wav2Lip/media/uploaded_video/uploaded.mp4",
    'audio': "D:/PAGO/Wav2Lip/media/uploaded_audio/uploaded.wav",
    'outfile': "D:/PAGO/Wav2Lip/results/video.mp4",
    'static': False,
    'fps': 25.,
    'pads': [0, 10, 0, 0],
    'face_det_batch_size': 16,
    'wav2lip_batch_size': 128,
    'resize_factor': 1,
    'crop': [0, -1, 0, -1],
    'box': [-1, -1, -1, -1],
    'rotate': False,
    'nosmooth': False,
    'img_size': 96
}

# Save the dictionary as a JSON file
with open('args.json', 'w') as json_file:
    json.dump(args, json_file)


if os.path.isfile(args['face']) and args['face'].split('.')[1] in ['jpg', 'png', 'jpeg']:
	args['static'] = True

def get_smoothened_boxes(boxes, T):
	for i in range(len(boxes)):
		if i + T > len(boxes):
			window = boxes[len(boxes) - T:]
		else:
			window = boxes[i : i + T]
		boxes[i] = np.mean(window, axis=0)
	return boxes

def face_detect(images):
	detector = face_detection.FaceAlignment(face_detection.LandmarksType._2D, 
											flip_input=False, device=device)

	batch_size = args['face_det_batch_size']
	
	while 1:
		predictions = []
		try:
			for i in tqdm(range(0, len(images), batch_size)):
				predictions.extend(detector.get_detections_for_batch(np.array(images[i:i + batch_size])))
		except RuntimeError:
			if batch_size == 1: 
				raise RuntimeError('Image too big to run face detection on GPU. Please use the --resize_factor argument')
			batch_size //= 2
			print('Recovering from OOM error; New batch size: {}'.format(batch_size))
			continue
		break

	results = []
	pady1, pady2, padx1, padx2 = args['pads']
	for rect, image in zip(predictions, images):
		if rect is None:
			cv2.imwrite('temp/faulty_frame.jpg', image) # check this frame where the face was not detected.
			raise ValueError('Face not detected! Ensure the video contains a face in all the frames.')

		y1 = max(0, rect[1] - pady1)
		y2 = min(image.shape[0], rect[3] + pady2)
		x1 = max(0, rect[0] - padx1)
		x2 = min(image.shape[1], rect[2] + padx2)
		
		results.append([x1, y1, x2, y2])

	boxes = np.array(results)
	if not args['nosmooth']: boxes = get_smoothened_boxes(boxes, T=5)
	results = [[image[y1: y2, x1:x2], (y1, y2, x1, x2)] for image, (x1, y1, x2, y2) in zip(images, boxes)]

	del detector
	return results 

def datagen(frames, mels):
	img_batch, mel_batch, frame_batch, coords_batch = [], [], [], []

	if args.box[0] == -1:
		if not args['static']:
			face_det_results = face_detect(frames) # BGR2RGB for CNN face detection
		else:
			face_det_results = face_detect([frames[0]])
	else:
		print('Using the specified bounding box instead of face detection...')
		y1, y2, x1, x2 = args['box']
		face_det_results = [[f[y1: y2, x1:x2], (y1, y2, x1, x2)] for f in frames]

	for i, m in enumerate(mels):
		idx = 0 if args['static'] else i%len(frames)
		frame_to_save = frames[idx].copy()
		face, coords = face_det_results[idx].copy()

		face = cv2.resize(face, (args['img_size'], args['img_size']))
			
		img_batch.append(face)
		mel_batch.append(m)
		frame_batch.append(frame_to_save)
		coords_batch.append(coords)

		if len(img_batch) >= args['wav2lip_batch_size']:
			img_batch, mel_batch = np.asarray(img_batch), np.asarray(mel_batch)

			img_masked = img_batch.copy()
			img_masked[:, args['img_size']//2:] = 0

			img_batch = np.concatenate((img_masked, img_batch), axis=3) / 255.
			mel_batch = np.reshape(mel_batch, [len(mel_batch), mel_batch.shape[1], mel_batch.shape[2], 1])

			yield img_batch, mel_batch, frame_batch, coords_batch
			img_batch, mel_batch, frame_batch, coords_batch = [], [], [], []

	if len(img_batch) > 0:
		img_batch, mel_batch = np.asarray(img_batch), np.asarray(mel_batch)

		img_masked = img_batch.copy()
		img_masked[:, args['img_size']//2:] = 0

		img_batch = np.concatenate((img_masked, img_batch), axis=3) / 255.
		mel_batch = np.reshape(mel_batch, [len(mel_batch), mel_batch.shape[1], mel_batch.shape[2], 1])

		yield img_batch, mel_batch, frame_batch, coords_batch

mel_step_size = 16
device = 'cuda' if torch.cuda.is_available() else 'cpu'
print('Using {} for inference.'.format(device))

def _load(checkpoint_path):
	if device == 'cuda':
		checkpoint = torch.load(checkpoint_path)
	else:
		checkpoint = torch.load(checkpoint_path,
								map_location=lambda storage, loc: storage)
	return checkpoint

def load_model(path):
	model = Wav2Lip()
	print("Load checkpoint from: {}".format(path))
	checkpoint = _load(path)
	s = checkpoint["state_dict"]
	new_s = {}
	for k, v in s.items():
		new_s[k.replace('module.', '')] = v
	model.load_state_dict(new_s)

	model = model.to(device)
	return model.eval()

def convert(audio_file, video_file, checkpoint_path, final_output_directory):
    args['face'] = video_file
    args['audio'] = audio_file
    args['checkpoint_path'] = checkpoint_path
    print("generating video")
    if not os.path.isfile(args['face']):
        raise ValueError('--face argument must be a valid path to video/image file')

    elif args['face'].split('.')[1] in ['jpg', 'png', 'jpeg']:
        full_frames = [cv2.imread(args['face'])]
        fps = args['fps']

    else:
        video_stream = cv2.VideoCapture(args['face'])
        fps = video_stream.get(cv2.CAP_PROP_FPS)

        print('Reading video frames...')

        full_frames = []
        while 1:
            still_reading, frame = video_stream.read()
            if not still_reading:
                video_stream.release()
                break
            if args['resize_factor'] > 1:
                frame = cv2.resize(frame, (frame.shape[1] // args['resize_factor'], frame.shape[0] // args['resize_factor']))

            if args['rotate']:
                frame = cv2.rotate(frame, cv2.cv2.ROTATE_90_CLOCKWISE)

            y1, y2, x1, x2 = args['crop']
            if x2 == -1: x2 = frame.shape[1]
            if y2 == -1: y2 = frame.shape[0]

            frame = frame[y1:y2, x1:x2]

            full_frames.append(frame)

    print("Number of frames available for inference: " + str(len(full_frames)))

    if not args['audio'].endswith('.wav'):
        print('Extracting raw audio...')
        command = 'ffmpeg -y -i {} -strict -2 {}'.format(args['audio'], 'temp/temp.wav')

        subprocess.call(command, shell=True)
        args['audio'] = 'temp/temp.wav'
    print("original audio")
    wav = audio.load_wav(args['audio'], 16000)
    mel = audio.melspectrogram(wav)
    print(mel.shape)

    if np.isnan(mel.reshape(-1)).sum() > 0:
        raise ValueError('Mel contains nan! Using a TTS voice? Add a small epsilon noise to the wav file and try again')

    mel_chunks = []
    mel_idx_multiplier = 80. / fps
    i = 0
    while 1:
        start_idx = int(i * mel_idx_multiplier)
        if start_idx + mel_step_size > len(mel[0]):
            mel_chunks.append(mel[:, len(mel[0]) - mel_step_size:])
            break
        mel_chunks.append(mel[:, start_idx: start_idx + mel_step_size])
        i += 1

    print("Length of mel chunks: {}".format(len(mel_chunks)))

    full_frames = full_frames[:len(mel_chunks)]

    batch_size = args['wav2lip_batch_size']
    gen = datagen(full_frames.copy(), mel_chunks)

    try:
        print("video generating")  
        for i, (img_batch, mel_batch, frames, coords) in enumerate(tqdm(gen,
                                                                        total=int(
                                                                            np.ceil(float(len(mel_chunks)) / batch_size)))):
            print("xyz")
            if i == 0:
                model = load_model(args['checkpoint_path'])
                print("Model loaded")

                frame_h, frame_w = full_frames[0].shape[:-1]
                out = cv2.VideoWriter('temp/result.avi',
                        cv2.VideoWriter_fourcc(*'DIVX'), fps, (frame_w, frame_h))
                img_batch = torch.FloatTensor(np.transpose(img_batch, (0, 3, 1, 2))).to(device)
                mel_batch = torch.FloatTensor(np.transpose(mel_batch, (0, 3, 1, 2))).to(device)

                with torch.no_grad():
                    pred = model(mel_batch, img_batch)

                pred = pred.cpu().numpy().transpose(0, 2, 3, 1) * 255.

                for p, f, c in zip(pred, frames, coords):
                    y1, y2, x1, x2 = c
                    p = cv2.resize(p.astype(np.uint8), (x2 - x1, y2 - y1))

                    f[y1:y2, x1:x2] = p
                    out.write(f)  # frames for entire video # 4 -> 4 video frames * 8
        out.release()
        # command = 'ffmpeg -y -i {} -i {} -strict -2 -q:v 1 {}'.format(args['audio'], 'temp/result.avi', args['outfile'])
        # subprocess.call(command, shell=platform.system() != 'Windows')
    except:
        return 'error'
	
    command = 'ffmpeg -y -i {} -i {} -strict -2 -q:v 1 {}'.format(args['audio'], 'temp/result.avi', final_output_directory)
    subprocess.call(command, shell=platform.system() != 'Windows')


def get_conversion(audio_file, video_file, file_name, checkpoint_path, final_output_directory):
    convert(audio_file, video_file, checkpoint_path, final_output_directory)


# if __name__ == '__main__':
# 	main()

Using cpu for inference.


In [None]:
video_file=r"D:\PAGO\sample_data\uploaded.mp4"
audio_file=r"D:\PAGO\translated_audio\translated.wav"
final_output_directory=r"D:\PAGO\converted_videos"
checkpoint_path=r"D:\PAGO\Wav2Lip\checkpoints\wav2lip_gan.pth"
convert(audio_file, video_file, checkpoint_path, final_output_directory)

generating video
Reading video frames...
Number of frames available for inference: 36
original audio


In [None]:
import os
from flask import Flask, request, session, send_file, render_template
from werkzeug.utils import secure_filename
import audio_video_handler
import pathlib
from pathlib import Path

app = Flask(__name__, static_folder='static')
app.config['SECRET_KEY']="abcdefg"

@app.route('/')
def index():
    return render_template('index.html')

@app.route('/language', methods=['POST'])
def translate_text():
    try:
        selected_language = request.form.get('language')
        if selected_language in ['en', 'hi']:  # Add more languages as needed
            session['selected_language'] = selected_language
            return "Language selected successfully"
        else:
            return "Invalid language selected", 400
    except KeyError:
        return "No language selected", 400

@app.route('/upload_video', methods=['POST'])
def upload_video():
    try:
        media_folder = "D:/PAGO/sample_data"
        if not os.path.exists(media_folder):
            os.makedirs(media_folder)
        file = request.files['videoFile']
        filename = secure_filename(file.filename)
        extension = pathlib.Path(filename).suffix
        filename = 'uploaded' + extension
        destination = "/".join([media_folder, filename])
        file.save(destination)
        return 'Video uploaded successfully'
    except Exception as e:
        return f'Error occurred: {e}', 500


@app.route('/generate', methods=['POST'])
def generate_result():
    try:
        print("covnerting video .. ")
        selected_language = session.get('selected_language')  # Get selected language from session
        if not selected_language:
            return "Language not selected. Please select a language first.", 400
        else:
            print( "Language selected")
        # Assuming convert_video accepts the language as an argument
        audio_video_handler.convert_video(language=selected_language)
        print("conversion .. completed .. ")
        return render_template('index.html', show_download=True)
    except Exception as e:
        return f'Error occurred: {e}', 500

@app.route('/download_generated_video', methods=['POST'])
def get_generated_video():
    output_video_path ="D:/PAGO/converted_videos/output.mp4" # Replace this with the actual path of the generated video
    try:
        return send_file(output_video_path, as_attachment=True)
    except:
        return "Generated video file not found!", 404

if __name__ == '__main__':
    app.run(debug=True, port=500)

Using cpu for inference.


In [None]:
from moviepy.editor import *

# This file is dedicated for extracting audio and saving it to a particular file

def extract_audio(default_video_file, output_audio_file_location):
    clip =  VideoFileClip(default_video_file)
    clip.audio.write_audiofile(output_audio_file_location)

In [10]:
import io
language = 'hi'
def convert_video(language,video_file=DEFAULT_VIDEO_FILE, output_audio_file_location=OUTPUT_AUDIO_FILE_LOCATION, output_translated_audio_location=OUTPUT_TRANSLATED_AUDIO_LOCATION, video_file_name=DEFAULT_VIDEO_FILE_NAME , final_output_directory=FINAL_OUTPUT_DIRECTOR):

    print("started converting")
    # language_code, voice_name = find_language_code(language)
    video_file_name = video_file_name + "_" +language
    print(video_file_name)
    audio_file_id = "/" + video_file_name +".wav"
    output_audio_file_location = output_audio_file_location + audio_file_id
    print(output_audio_file_location)
    if not os.path.exists(output_audio_file_location):
        raise FileNotFoundError(f"Audio file not found: {output_audio_file_location}")

#     extract_audio_from_video.extract_audio(video_file, output_audio_file_location)
#     print("audio extracted")
    transation_completed = translate_audio(output_audio_file_location,
                                                                          output_translated_audio_location,
                                                                          language
                                                                          )
    return "success"
convert_video(language,video_file=DEFAULT_VIDEO_FILE, output_audio_file_location=OUTPUT_AUDIO_FILE_LOCATION, output_translated_audio_location=OUTPUT_TRANSLATED_AUDIO_LOCATION, video_file_name=DEFAULT_VIDEO_FILE_NAME , final_output_directory=FINAL_OUTPUT_DIRECTOR)


started converting
talk_voice_English_hi
D:/PAGO/extracted_audio/talk_voice_English_hi.wav


NameError: name 'speech' is not defined

In [11]:
audio_file="D:/PAGO/extracted_audio/talk_voice_English_hi.wav"
result = model.transcribe(audio_file, fp16=False) 
print("result")

FileNotFoundError: [WinError 2] The system cannot find the file specified

In [9]:
destination

'D:/PAGO/Wav2Lip/media/.wav'