<a href="https://colab.research.google.com/github/pia-fml1/DeepFake-Voice-Cloning/blob/main/Wav2lip/Wav2Lip.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
import os
import sys

# Step 1 writes installed.txt when it finishes, so its presence means setup
# already ran from this working directory.
already_installed = os.path.exists('installed.txt')
if already_installed:
  sys.exit('Step 1 has already been run on this instance!')

# Stop early when the runtime has no CUDA device attached.
print('checking for GPU')
import torch
gpu_available = torch.cuda.is_available()
if not gpu_available:
  sys.exit('No GPU in runtime. Please go to the "Runtime" menu, "Change runtime type" and select "GPU".')

print('requesting Google Drive access')
#prompt to mount google drive
# Mounting is best-effort: nothing later in this cell reads from
# /content/drive, so a failure is reported and setup carries on.
# Only Exception is caught (not a bare except) so that KeyboardInterrupt and
# SystemExit still stop the cell instead of being silently swallowed.
try:
  from google.colab import drive
  drive.mount('/content/drive')
except Exception as drive_error:
  print("google drive not linked")
  # show why, so an auth or import problem can be told apart
  print(f"  reason: {drive_error!r}")

#start timer
# start_time is read at the end of the cell to report the setup duration
import time
start_time = time.time()

#clone git
# Clone the project, then move into its Wav2lip sub-folder; every later
# relative path (checkpoints, temp, installed.txt) is resolved from there.
giturl = 'https://github.com/pia-fml1/DeepFake-Voice-Cloning.git'
gitbranch = 'main'
!git clone -b {gitbranch} {giturl}
import re
# capture the last path segment of the URL that precedes ".git" (the repo name)
regex = r'([^\/]+)(?=\.git)'
match = re.search(regex, giturl)
# project_dir is relative; later code prefixes it with '/content/'
project_dir = match.group(1)+'/Wav2lip'
%cd '{project_dir}'
# NOTE(review): plain mkdir prints an error if either folder already exists - confirm whether -p is wanted
!mkdir 'face_alignment' 'temp'

#get face_alignment folder
# Only the inner face_alignment package directory is kept; the rest of the
# cloned face-alignment repository is deleted afterwards.
!git clone https://github.com/1adrianb/face-alignment.git
!mv face-alignment/face_alignment/* face_alignment/
!rm -rf face-alignment

#install prerequisites
# NOTE(review): the 'installing batch_face' message is printed before the
# pinned torch/torchvision install, so it covers both pip commands below.
# NOTE(review): torch was already imported earlier in this cell for the GPU
# check; a different version installed here is presumably not picked up by
# the running kernel until it restarts - confirm.
print('installing batch_face')
!pip install torch==2.1.0+cu121 torchvision==0.16.0+cu121 -f https://download.pytorch.org/whl/torch_stable.html
!pip install batch_face --quiet
print('installing gfpgan')
!pip install gfpgan --quiet

#import functions
from easy_functions import (format_time,
                            get_input_length,
                            get_video_details,
                            load_file_from_url,
                            load_model,
                            load_predictor,
                            show_video)
import contextlib
import face_alignment
import shutil
import subprocess
import warnings
from enhance import load_sr
from IPython.display import Audio, Image, clear_output, display
from moviepy.video.io.ffmpeg_tools import ffmpeg_extract_subclip

#download and initialize both wav2lip models
print('downloading wav2lip essentials')
# (download url, checkpoint file name, message printed once it has loaded)
wav2lip_checkpoints = (
  ('https://github.com/anothermartz/Easy-Wav2Lip/releases/download/Prerequesits/Wav2Lip_GAN.pth',
   'Wav2Lip_GAN.pth',
   'Wav2Lip_GAN loaded'),
  ('https://github.com/anothermartz/Easy-Wav2Lip/releases/download/Prerequesits/Wav2Lip.pth',
   'Wav2Lip.pth',
   'wav2lip loaded'),
)
# Each checkpoint is fetched into ./checkpoints and then loaded once; `model`
# ends up bound to the last one loaded (Wav2Lip.pth).
for ckpt_url, ckpt_name, loaded_message in wav2lip_checkpoints:
  load_file_from_url(
    url=ckpt_url,
    model_dir='checkpoints', progress=True, file_name=ckpt_name)
  model = load_model("/content/"+project_dir+"/checkpoints/"+ckpt_name)
  print(loaded_message)

#download gfpgan files
print("downloading gfpgan essentials")
gfpgan_url = 'https://github.com/TencentARC/GFPGAN/releases/download/v1.3.0/GFPGANv1.4.pth'
load_file_from_url(
  url=gfpgan_url,
  model_dir='checkpoints', progress=True, file_name='GFPGANv1.4.pth')
load_sr()

#load face detectors
print('initializing face detectors')
load_predictor()

# Drop the marker file that both steps check to tell whether setup has run.
marker_text = 'Easy-Wav2Lip v7 has been installed.'
with open('installed.txt', 'w') as marker_file:
    marker_file.write(marker_text)

# Wipe the install log from the cell output before the final messages.
clear_output()
print("Installation complete, move to Step 2!")

# Report how long setup took, measured from start_time set earlier in the cell.
elapsed_time = time.time() - start_time
print(f"Execution time: {format_time(elapsed_time)}")

checking for GPU
requesting Google Drive access
Mounted at /content/drive
Cloning into 'DeepFake-Voice-Cloning'...
remote: Enumerating objects: 105, done.[K
remote: Counting objects: 100% (105/105), done.[K
remote: Compressing objects: 100% (87/87), done.[K
remote: Total 105 (delta 39), reused 57 (delta 14), pack-reused 0 (from 0)[K
Receiving objects: 100% (105/105), 8.07 MiB | 12.83 MiB/s, done.
Resolving deltas: 100% (39/39), done.
/content/DeepFake-Voice-Cloning
Cloning into 'face-alignment'...
remote: Enumerating objects: 1122, done.[K
remote: Counting objects: 100% (153/153), done.[K
remote: Compressing objects: 100% (90/90), done.[K
remote: Total 1122 (delta 79), reused 108 (delta 56), pack-reused 969 (from 1)[K
Receiving objects: 100% (1122/1122), 6.28 MiB | 11.24 MiB/s, done.
Resolving deltas: 100% (680/680), done.
installing batch_face
Looking in links: https://download.pytorch.org/whl/torch_stable.html
Collecting torch==2.1.0+cu121
  Downloading https://download.pytor

[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m30.6/30.6 MB[0m [31m19.4 MB/s[0m eta [36m0:00:00[0m
[?25hinstalling gfpgan
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m172.5/172.5 kB[0m [31m5.5 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m46.8/46.8 kB[0m [31m4.1 MB/s[0m eta [36m0:00:00[0m
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m178.0/178.0 kB[0m [31m12.1 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m52.2/52.2 kB[0m [31m4.3 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m59.6/59.6 kB[0m [31m6.2 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m297.8/297.8 kB[0m [31m27.2 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━

ModuleNotFoundError: No module named 'easy_functions'

In [None]:
import os
import sys
# Step 1 leaves installed.txt behind when it completes; refuse to run without it.
setup_done = os.path.exists('installed.txt')
if not setup_done:
  sys.exit('Step 1 has not been run in this instance! Please run step 1 each time you disconnect from a runtime.')
# reference point for the execution time reported after processing
start_time = time.time()
############################## user inputs #####################################
# Paths to the video to lip sync and the audio to sync it to.
# An empty vocal_file means the video's own audio is used (handled further down).
video_file = ""
vocal_file = ""

video_found = os.path.exists(video_file)
if not video_found:
  sys.exit('Video file not found!')

quality = "Enhanced" #["Fast", "Improved", "Enhanced", "Experimental"]
#preview_quality = False #@param {type:"boolean"} - coming soon!
output_height = "full resolution"
# (no_sr, better_mask) for each quality level. A level that is not listed here
# (e.g. "Experimental") leaves both names unassigned.
quality_presets = {
  "Fast": (True, False),
  "Improved": (True, True),
  "Enhanced": (False, True),
}
if quality in quality_presets:
  no_sr, better_mask = quality_presets[quality]

# When True, the cached face-detection result is discarded before processing.
delete_previous_track = False
# Which of the two checkpoints downloaded in step 1 to run inference with.
wav2lip_version = "Wav2Lip"
checkpoint_path = (
  '/content/'+project_dir+'/checkpoints/Wav2Lip_GAN.pth'
  if wav2lip_version=="Wav2Lip_GAN"
  else '/content/'+project_dir+'/checkpoints/Wav2Lip.pth'
)

nosmooth = True
# Padding amounts (up, down, left, right); scaled and passed to inference.py as --pads.
U = D = L = R = 0

# Mask options, forwarded to inference.py further down.
size = 2.5
feathering = 2
mouth_tracking = False
debug_mask = False

# Translate the user-facing feathering level into the value actually passed
# on: 3 -> 5 and 2 -> 3; any other level is used unchanged.
feathering = {3: 5, 2: 3}.get(feathering, feathering)

# The two named heights map to a divisor of the input height. Anything else is
# treated as an explicit pixel height (res_custom) and uses a scale of 3.
named_scales = {'half resolution': 2, 'full resolution': 1}
res_custom = output_height not in named_scales
resolution_scale = named_scales.get(output_height, 3)

in_width, in_height, in_fps, in_length = get_video_details(video_file)
out_height = round(in_height / resolution_scale)

# a custom height replaces the scaled value computed above
if res_custom:
  out_height = int(output_height)
fps_for_static_image = 30
# When True, the loop further down continues with the next numbered file(s).
batch_process = False
# Text appended to the input name to form the output file name.
output_suffix = "_EZWav2Lip"
include_settings_in_suffix = False

# With neither a suffix nor settings tags the output name would equal the input name.
if not (output_suffix != '' or include_settings_in_suffix):
  sys.exit('Current suffix settings will overwrite your input video! Please add a suffix or tick include_settings_in_suffix')

preview_input = False
preview_settings = False
# The frame number is given 1-based; store it 0-based and never below zero.
requested_preview_frame = 100
frame_to_preview = max(0, requested_preview_frame - 1)

# Optionally tag the output name with every setting that shaped the result, so
# runs with different settings produce different file names.
if include_settings_in_suffix:
  if wav2lip_version=="Wav2Lip_GAN":
    output_suffix = f'{output_suffix}_GAN'
  output_suffix = f'{output_suffix}_{quality}'
  if output_height != 'full resolution':
    output_suffix = f'{output_suffix}_{out_height}'
  output_suffix = f'{output_suffix}_nosmooth{1 if nosmooth else 0}'
  # only non-zero pads are listed, e.g. "_pads-U10R5"
  pad_tags = ''.join(
    f'{side}{amount}'
    for side, amount in (('U', U), ('D', D), ('L', L), ('R', R))
    if amount != 0
  )
  if pad_tags:
    output_suffix = f'{output_suffix}_pads-{pad_tags}'
  # Quality values are capitalised ("Fast", "Improved", ...). The previous
  # comparison against lowercase 'fast' was always true, so Fast runs were
  # tagged with mask settings as well.
  if quality != 'Fast':
    output_suffix = f'{output_suffix}_mask-S{size}F{feathering}'
    if mouth_tracking:
      output_suffix = f'{output_suffix}_mt'
    if debug_mask:
      output_suffix = f'{output_suffix}_debug'
# previews are tagged regardless of include_settings_in_suffix
if preview_settings:
  output_suffix = f'{output_suffix}_preview'


# Integer division: "1" at full resolution, "0" for any larger scale.
rescaleFactor = str(round(1 // resolution_scale))
# Pads are multiplied by the resolution scale and stringified for the
# inference.py command line.
pad_up, pad_down, pad_left, pad_right = (
  str(round(pad * resolution_scale)) for pad in (U, D, L, R)
)
################################################################################


######################### reconstruct input paths ##############################
# Split the video and audio paths into folder / name / trailing number /
# extension so the batch loop can rebuild paths with an incremented number.
#
# os.path is used for the folder / name / extension split instead of regex
# matching: the regex version raised AttributeError (None.group) for a bare
# file name with no directory part or for a name without a "\w+" extension.

# check video_file exists
if not os.path.exists(video_file):
  sys.exit(f'Could not find file: {video_file}')
# extract each part of the path
# join(..., '') keeps the trailing separator the loop relies on when it
# concatenates folder + name; it yields '' when there is no directory part
folder = os.path.join(os.path.dirname(video_file), '')
filename, file_type = os.path.splitext(os.path.basename(video_file))
filenumber_match = re.search(r"\d+$", filename)
if filenumber_match: # if there is a filenumber - extract it
  filenumber = str(filenumber_match.group())
  filenamenonumber = re.sub(r"\d+$", "", filename)
else: # if there is no filenumber - make it blank
  filenumber = ""
  filenamenonumber = filename

# if vocal_file is blank - use the video as audio
if vocal_file == "":
  vocal_file = video_file
# if not, check that the vocal_file file exists
elif not os.path.exists(vocal_file):
  sys.exit(f'Could not find file: {vocal_file}')
# extract each part of the path:
audio_folder = os.path.join(os.path.dirname(vocal_file), '')
audio_filename, audio_file_type = os.path.splitext(os.path.basename(vocal_file))
audio_filenumber_match = re.search(r"\d+$", audio_filename)
if audio_filenumber_match: #if there is a filenumber - extract it
  audio_filenumber = str(audio_filenumber_match.group())
  audio_filenamenonumber = re.sub(r"\d+$", "", audio_filename)
else: # if there is no filenumber - make it blank
  audio_filenumber = ""
  audio_filenamenonumber = audio_filename
################################################################################

# set process_failed to False so that it may be set to True if one or more processings fail
process_failed = False
temp_output = '/content/'+project_dir+'/temp/output.mp4'
temp_folder = '/content/'+project_dir+'/temp/'
last_input_video = None
last_input_audio = None

#--------------------------Batch processing loop-------------------------------!
while True:

  # construct input_video

  input_video = folder + filenamenonumber + str(filenumber) + file_type
  input_videofile = re.search(r"[^\/]+$", input_video).group()
  # construct input_audio
  input_audio = audio_folder + audio_filenamenonumber + str(audio_filenumber) + audio_file_type
  input_audiofile = re.search(r"[^\/]+$", input_audio).group()
  # see if filenames are different:
  if filenamenonumber + str(filenumber) != audio_filenamenonumber + str(audio_filenumber):
    output_filename = filenamenonumber + str(filenumber) + "_" + audio_filenamenonumber + str(audio_filenumber)
  else:
    output_filename = filenamenonumber + str(filenumber)
  # construct output_video
  output_video = folder + output_filename + output_suffix + '.mp4'
  output_videofile = re.search(r"[^\/]+$", output_video).group()

  # remove last outputs
  !rm -rf temp
  !mkdir 'temp'

  # preview inputs (if enabled)
  if preview_input:
    print("input video:")
    show_video(input_video)
    if vocal_file != "":
      print("input audio:")
      display(Audio(input_audio))
    else:
      print("using", input_videofile, "for audio")
    print("You may want to check now that they're the correct files!")

  last_input_video = input_video
  last_input_audio = input_audio
  shutil.copy(input_video, temp_folder)
  shutil.copy(input_audio, temp_folder)

  #rename temp file to include padding or else changing padding does nothing
  temp_input_video = temp_folder + input_videofile
  renamed_temp_input_video = temp_folder + str(U)+str(D)+str(L)+str(R) + input_videofile
  shutil.copy(temp_input_video, renamed_temp_input_video)
  temp_input_video = renamed_temp_input_video
  temp_input_videofile = re.search(r"[^\/]+$", temp_input_video).group()
  temp_input_audio = temp_folder + input_audiofile

    #trim video if it's longer than the audio
  video_length = get_input_length(temp_input_video)
  audio_length = get_input_length(temp_input_audio)

  if preview_settings:
    batch_process = False

    preview_length_seconds = 1
    converted_preview_frame = frame_to_preview/in_fps
    preview_start_time = min(converted_preview_frame, video_length-preview_length_seconds)

    preview_video_path = "temp/preview_" +str(preview_start_time)+'_' + str(U)+str(D)+str(L)+str(R) + input_videofile
    preview_audio_path = "temp/preview_" + input_audiofile

    if os.path.isfile(preview_video_path):
      os.remove(preview_video_path)

    subprocess.call(['ffmpeg', '-i', temp_input_video, '-ss', str(preview_start_time), '-to', str(preview_start_time+preview_length_seconds), '-c', 'copy', preview_video_path])
    subprocess.call(['ffmpeg', '-i', temp_input_audio, '-ss', str(preview_start_time), '-to', str(preview_start_time+1), '-c', 'copy', preview_audio_path])
    temp_input_video = preview_video_path
    temp_input_audio = preview_audio_path

  if video_length > audio_length:

    trimmed_video_path = "temp/trimmed_" + temp_input_videofile
    if os.path.isfile(trimmed_video_path):
      os.remove(trimmed_video_path)
    with open(os.devnull, 'w') as devnull:
      with contextlib.redirect_stdout(devnull), contextlib.redirect_stderr(devnull):
        ffmpeg_extract_subclip(temp_input_video, 0, audio_length, targetname=trimmed_video_path)
    temp_input_video = trimmed_video_path

  #check if face detection has already happened on this clip
  last_detected_face = '/content/'+project_dir+'/face_alignment/last_detected_face.pkl'
  if os.path.isfile('last_file.txt'):
    with open('last_file.txt', 'r') as file:
      last_file = file.readline()
    if last_file != temp_input_video or delete_previous_track:
        if os.path.isfile(last_detected_face):
          os.remove(last_detected_face)

  if os.path.isfile(temp_output):
    os.remove(temp_output)

  #----------------------------Process the inputs!-----------------------------!
  print(f"Processing{' preview:' if preview_settings else ''} {input_videofile} using {input_audiofile} for audio")
  #start processing timer
  #start_time = time.time()


  #execute Wav2Lip & upscaler
  !python 'inference.py' \
  --face "{temp_input_video}" \
  --audio "{temp_input_audio}" \
  --outfile "{temp_output}" \
  --pads {pad_up} {pad_down} {pad_left} {pad_right} \
  --checkpoint_path {checkpoint_path} \
  --out_height {out_height} \
  --fullres {resolution_scale} \
  --quality '{quality}' \
  --mask_dilation '{size}' \
  --mask_feathering '{feathering}' \
  --nosmooth '{nosmooth}' \
  --debug_mask '{debug_mask}' \
  --preview_settings '{preview_settings}' \
  --mouth_tracking '{mouth_tracking}'

  #end processing timer and format the time it took
  end_time = time.time()
  elapsed_time = end_time - start_time
  process_time = int(elapsed_time)
  formatted_process_time = format_time(elapsed_time)

  if preview_settings:
    if os.path.isfile('temp/preview.jpg'):
      clear_output()
      display(Image('temp/preview.jpg'))
      with open('last_file.txt', 'w') as f:
       f.write(temp_input_video)
      break
    else:
      print(f"Processing failed! :( see line above 👆")
      sys.exit("Processing failed")


  #rename temp file and move to correct directory
  if os.path.isfile(temp_output):
    if os.path.isfile(output_video):
      os.remove(output_video)
    !cp "{temp_output}" "{output_video}"
    #show output video
    with open('last_file.txt', 'w') as f:
      f.write(temp_input_video)
    clear_output()
    print(f"{output_filename} successfully lip synced! Find it in the same folder as your input file(s).")
    end_time = time.time()
    elapsed_time = end_time - start_time
    formatted_setup_time = format_time(elapsed_time)
    print(f"Execution time: {formatted_setup_time}")
    print(f"Loading video preview for {output_videofile}...")
    show_video(temp_output)
    #display(Image(filename='results/p.jpg'))
  else:
      print(f"Processing failed! :( see line above 👆")
      process_failed = True

  if batch_process == False:
    #print("Batch Processing disabled")
    if process_failed:
        sys.exit("Processing failed")
    else:
      break

  elif filenumber == "" and audio_filenumber == "":
    print('Files not set for batch processing')
    break

  #-----------------------------Batch Processing!------------------------------!
  if filenumber != "": # if video has a filenumber
    match = re.search(r'\d+', filenumber)
    # add 1 to video filenumber
    filenumber = f"{filenumber[:match.start()]}{int(match.group())+1:0{len(match.group())}d}"

  if audio_filenumber != "": # if audio has a filenumber
    match = re.search(r'\d+', audio_filenumber)
    # add 1 to audio filenumber
    audio_filenumber = f"{audio_filenumber[:match.start()]}{int(match.group())+1:0{len(match.group())}d}"

  # construct input_video
  input_video = folder + filenamenonumber + str(filenumber) + file_type
  input_videofile = re.search(r"[^\/]+$", input_video).group()
  # construct input_audio
  input_audio = audio_folder + audio_filenamenonumber + str(audio_filenumber) + audio_file_type
  input_audiofile = re.search(r"[^\/]+$", input_audio).group()

  # now check which input files exist and what to do for each scenario

  # both +1 files exist - continue processing
  if os.path.exists(input_video) and os.path.exists(input_audio):
    continue

  # video +1 only - continue with last audio file
  if os.path.exists(input_video) and input_video != last_input_video:
    if audio_filenumber != "": # if audio has a filenumber
        match = re.search(r'\d+', audio_filenumber)
        # take 1 from audio filenumber
        audio_filenumber = f"{audio_filenumber[:match.start()]}{int(match.group())-1:0{len(match.group())}d}"
    continue

  # audio +1 only - continue with last video file
  if os.path.exists(input_audio) and input_audio != last_input_audio:
    if filenumber != "": # if video has a filenumber
      match = re.search(r'\d+', filenumber)
      # take 1 from video filenumber
      filenumber = f"{filenumber[:match.start()]}{int(match.group())-1:0{len(match.group())}d}"
    continue

  # neither +1 files exist or current files already processed - finish processing
  print("Finished all sequentially numbered files")
  if process_failed:
     sys.exit("Processing failed on at least one video")
  else:
    break