In [None]:
# Launch run.py in a subprocess via IPython's `!` shell escape, swapping with
# ted.jpeg as --source and van.jpeg as --target, writing roop.jpeg, on the
# `cuda` execution provider.
# NOTE(review): the configuration cell further down selects the 'cpu' provider
# and leaves output_path as None - confirm whether that difference is intended.
!python run.py --source ted.jpeg --target van.jpeg --output roop.jpeg --execution-provider cuda

In [None]:
import roop.globals
import roop.ui as ui
import shutil
from roop.processors.frame.core import get_frame_processors_modules
from roop.utilities import normalize_output_path


In [None]:
import onnxruntime

def encode_execution_providers(execution_providers):
    """Shorten provider identifiers to lowercase short names.

    Every entry has the substring 'ExecutionProvider' removed and is then
    lowercased, e.g. 'CUDAExecutionProvider' -> 'cuda'.

    Args:
        execution_providers: iterable of provider name strings.
    Returns:
        list of shortened names, in the same order as the input.
    """
    short_names = []
    for full_name in execution_providers:
        stripped = full_name.replace('ExecutionProvider', '')
        short_names.append(stripped.lower())
    return short_names

def decode_execution_providers(execution_providers):
    """Map short provider names back to the available full provider names.

    An available provider is kept when at least one requested entry is a
    substring of its encoded (short, lowercase) name.

    Args:
        execution_providers: requested short names, e.g. ['cpu'].
    Returns:
        list of matching names from onnxruntime.get_available_providers(),
        in the order onnxruntime reports them.
    """
    full_names = onnxruntime.get_available_providers()
    short_names = encode_execution_providers(onnxruntime.get_available_providers())
    selected = []
    for full_name, short_name in zip(full_names, short_names):
        for requested in execution_providers:
            if requested in short_name:
                selected.append(full_name)
                break
    return selected

def suggest_execution_providers():
    """Return the short names of every provider onnxruntime reports as available."""
    available = onnxruntime.get_available_providers()
    return encode_execution_providers(available)

def suggest_execution_threads() -> int:
    """Suggest a thread count: 8 if 'CUDAExecutionProvider' is available, otherwise 1."""
    cuda_available = 'CUDAExecutionProvider' in onnxruntime.get_available_providers()
    return 8 if cuda_available else 1

def update_status(message: str, scope: str = 'ROOP.CORE') -> None:
    """Print a scoped status line and mirror it to the UI unless running headless.

    Args:
        message: status text.
        scope: tag printed in square brackets before the message.
    """
    print(f'[{scope}] {message}')
    if roop.globals.headless:
        # No UI to update in headless mode.
        return
    ui.update_status(message)

In [None]:
from roop.utilities import normalize_output_path
# Populate roop.globals by hand instead of going through the command-line parser.
# Default values taken from program.add_argument where provided
roop.globals.source_path = "ted.jpeg"  # same file the run.py cell passes as --source
roop.globals.target_path = "van.jpeg"  # same file the run.py cell passes as --target
roop.globals.output_path = None  # no output file set here; normalize_output_path is imported but never called
roop.globals.frame_processors = ['face_swapper']  # Default provided
roop.globals.keep_fps = False  # Default for store_true action is False
roop.globals.keep_frames = False  # Default for store_true action is False
roop.globals.skip_audio = False  # Default for store_true action is False
roop.globals.many_faces = False  # Default for store_true action is False
roop.globals.reference_face_position = 0  # Default provided
roop.globals.reference_frame_number = 0  # Default provided
roop.globals.similar_face_distance = 0.85  # Default provided
roop.globals.temp_frame_format = 'png'  # Default provided
roop.globals.temp_frame_quality = 0  # Default provided
roop.globals.output_video_encoder = 'libx264'  # Default provided
roop.globals.output_video_quality = 35  # Default provided
roop.globals.max_memory = None  # No default provided in add_argument
# NOTE(review): the run.py cell uses the cuda provider while this selects cpu - confirm which is intended.
roop.globals.execution_providers = decode_execution_providers(['cpu'])  # Default provided
roop.globals.execution_threads = suggest_execution_threads()  # Default provided dynamically
# Run without the UI: update_status() only calls ui.update_status() when headless is falsy.
# Assigned exactly once so the effective value is unambiguous.
roop.globals.headless = True


In [None]:
from typing import Dict, List, Any, Tuple
import io
import base64
import sys
sys.path.append('CodeFormer/CodeFormer')
import os
import cv2
import torch
import torch.nn.functional as F
import gradio as gr
from PIL import Image
import numpy as np
from matplotlib import pyplot as plt

from torchvision.transforms.functional import normalize

from basicsr.utils import imwrite, img2tensor, tensor2img
from basicsr.utils.download_util import load_file_from_url
from facelib.utils.face_restoration_helper import FaceRestoreHelper
from facelib.utils.misc import is_gray
from basicsr.archs.rrdbnet_arch import RRDBNet
from basicsr.utils.realesrgan_utils import RealESRGANer

from basicsr.utils.registry import ARCH_REGISTRY

In [None]:
# Download locations of the pretrained weights used by the restoration pipeline.
pretrain_model_url = {
    'codeformer': 'https://github.com/sczhou/CodeFormer/releases/download/v0.1.0/codeformer.pth',
    'detection': 'https://github.com/sczhou/CodeFormer/releases/download/v0.1.0/detection_Resnet50_Final.pth',
    'parsing': 'https://github.com/sczhou/CodeFormer/releases/download/v0.1.0/parsing_parsenet.pth',
    'realesrgan': 'https://github.com/sczhou/CodeFormer/releases/download/v0.1.0/RealESRGAN_x2plus.pth'
}
# download weights
# Each model key is paired with the weights sub-directory it is stored in.
# The existence check and the download target are built from the same
# relative directory, so the two can never point at different locations.
# The local file name is the final path component of the URL.
for _model_key, _weights_subdir in (
    ('codeformer', 'CodeFormer'),
    ('detection', 'facelib'),
    ('parsing', 'facelib'),
    ('realesrgan', 'realesrgan'),
):
    _model_dir = f'CodeFormer/CodeFormer/weights/{_weights_subdir}'
    _weight_file = os.path.basename(pretrain_model_url[_model_key])
    if not os.path.exists(f'{_model_dir}/{_weight_file}'):
        load_file_from_url(url=pretrain_model_url[_model_key], model_dir=_model_dir, progress=True, file_name=None)


def imread(img_path):
    """Read the image at ``img_path`` with OpenCV and return it converted from BGR to RGB."""
    bgr_pixels = cv2.imread(img_path)
    return cv2.cvtColor(bgr_pixels, cv2.COLOR_BGR2RGB)

# set enhancer with RealESRGAN
def set_realesrgan():
    """Construct the RealESRGAN x2 upsampler.

    Half precision is requested exactly when torch reports CUDA as available.

    Returns:
        a RealESRGANer wrapping an RRDBNet, loading its weights from
        CodeFormer/CodeFormer/weights/realesrgan/RealESRGAN_x2plus.pth.
    """
    use_half = bool(torch.cuda.is_available())
    rrdb_settings = dict(
        num_in_ch=3,
        num_out_ch=3,
        num_feat=64,
        num_block=23,
        num_grow_ch=32,
        scale=2,
    )
    backbone = RRDBNet(**rrdb_settings)
    return RealESRGANer(
        scale=2,
        model_path="CodeFormer/CodeFormer/weights/realesrgan/RealESRGAN_x2plus.pth",
        model=backbone,
        tile=400,
        tile_pad=40,
        pre_pad=0,
        half=use_half,
    )

In [None]:
# Shared RealESRGAN upsampler, used by inference() for background/face upsampling.
upsampler = set_realesrgan()
# Use the GPU when torch can see one, otherwise fall back to the CPU.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
codeformer_net = ARCH_REGISTRY.get("CodeFormer")(
    dim_embd=512,
    codebook_size=1024,
    n_head=8,
    n_layers=9,
    connect_list=["32", "64", "128", "256"],
).to(device)
ckpt_path = "CodeFormer/CodeFormer/weights/CodeFormer/codeformer.pth"
# map_location remaps the stored tensors onto `device`, so a checkpoint that
# was saved from GPU tensors still loads when the CPU fallback is in effect.
# NOTE: torch.load unpickles the file; only load checkpoints from a trusted source.
checkpoint = torch.load(ckpt_path, map_location=device)["params_ema"]
codeformer_net.load_state_dict(checkpoint)
# Inference only: switch the network to evaluation mode.
codeformer_net.eval()

#os.makedirs('output', exist_ok=True)

In [None]:
def inference(args):
    """Run a single prediction on the model

    All inputs arrive in one tuple so the function can be handed directly to
    ``executor.map``.

    Args:
        args: 6-tuple ``(image, face_align, background_enhance, face_upsample,
            upscale, codeformer_fidelity)``.
            image: RGB image array (it is converted with COLOR_RGB2BGR
                before processing).
            face_align: ``None`` means True. When falsy the input is treated
                as an already cropped/aligned face: it is resized to 512x512
                and ``upscale`` is forced to 1.
            background_enhance: ``None`` means True. When truthy the
                module-level ``upsampler`` is used on the background.
            face_upsample: ``None`` means True. When truthy the module-level
                ``upsampler`` is used on the restored faces.
            upscale: ``None`` or a value <= 0 means 2. Capped at 4, reduced
                to 2 for images larger than 1000 px and to 1 (with both
                upsampling options disabled) for images larger than 1500 px.
            codeformer_fidelity: passed to ``codeformer_net`` as ``w``.

    Returns:
        The restored image converted back from BGR to RGB, or ``None`` when
        any exception was raised while processing (the error is printed).

    Side effects:
        Writes the restored image to ``output/out.png``.
    """
    image, face_align, background_enhance, face_upsample, upscale, codeformer_fidelity = args
    try: # global try
        # take the default setting for the demo
        only_center_face = False
        draw_box = False
        detection_model = "retinaface_resnet50"

        face_align = face_align if face_align is not None else True
        background_enhance = background_enhance if background_enhance is not None else True
        face_upsample = face_upsample if face_upsample is not None else True
        upscale = upscale if (upscale is not None and upscale > 0) else 2

        has_aligned = not face_align
        upscale = 1 if has_aligned else upscale

        img = cv2.cvtColor(image, cv2.COLOR_RGB2BGR)

        upscale = int(upscale) # convert type to int
        if upscale > 4: # avoid memory exceeded due to too large upscale
            upscale = 4
        if upscale > 2 and max(img.shape[:2]) > 1000: # avoid memory exceeded due to too large img resolution
            upscale = 2
        if max(img.shape[:2]) > 1500: # avoid memory exceeded due to too large img resolution
            upscale = 1
            background_enhance = False
            face_upsample = False

        face_helper = FaceRestoreHelper(
            upscale,
            face_size=512,
            crop_ratio=(1, 1),
            det_model=detection_model,
            save_ext="png",
            use_parse=True,
            device=device,
        )
        bg_upsampler = upsampler if background_enhance else None
        face_upsampler = upsampler if face_upsample else None

        if has_aligned:
            # the input faces are already cropped and aligned
            img = cv2.resize(img, (512, 512), interpolation=cv2.INTER_LINEAR)
            face_helper.is_gray = is_gray(img, threshold=5)
            face_helper.cropped_faces = [img]
        else:
            face_helper.read_image(img)
            # get face landmarks for each face
            face_helper.get_face_landmarks_5(
                only_center_face=only_center_face, resize=640, eye_dist_threshold=5
            )
            # align and warp each face
            face_helper.align_warp_face()

        # face restoration for each cropped face
        for cropped_face in face_helper.cropped_faces:
            # prepare data
            cropped_face_t = img2tensor(
                cropped_face / 255.0, bgr2rgb=True, float32=True
            )
            normalize(cropped_face_t, (0.5, 0.5, 0.5), (0.5, 0.5, 0.5), inplace=True)
            cropped_face_t = cropped_face_t.unsqueeze(0).to(device)

            try:
                with torch.no_grad():
                    output = codeformer_net(
                        cropped_face_t, w=codeformer_fidelity, adain=True
                    )[0]
                    restored_face = tensor2img(output, rgb2bgr=True, min_max=(-1, 1))
                del output
                torch.cuda.empty_cache()
            except RuntimeError as error:
                # Best effort: keep the unrestored crop for this face, but
                # report the failure instead of hiding it.
                print(f"Failed inference for CodeFormer: {error}")
                restored_face = tensor2img(
                    cropped_face_t, rgb2bgr=True, min_max=(-1, 1)
                )

            restored_face = restored_face.astype("uint8")
            face_helper.add_restored_face(restored_face)

        # paste_back
        if not has_aligned:
            # upsample the background
            if bg_upsampler is not None:
                # Now only support RealESRGAN for upsampling background
                bg_img = bg_upsampler.enhance(img, outscale=upscale)[0]
            else:
                bg_img = None
            face_helper.get_inverse_affine(None)
            # paste each restored face to the input image
            if face_upsample and face_upsampler is not None:
                restored_img = face_helper.paste_faces_to_input_image(
                    upsample_img=bg_img,
                    draw_box=draw_box,
                    face_upsampler=face_upsampler,
                )
            else:
                restored_img = face_helper.paste_faces_to_input_image(
                    upsample_img=bg_img, draw_box=draw_box
                )
        else:
            # Aligned input holds exactly one face, restored by the loop above.
            restored_img = restored_face

        # save restored img
        # NOTE(review): the path is fixed, so concurrent calls all write the
        # same file - confirm whether the saved copy is needed at all.
        save_path = 'output/out.png'
        imwrite(restored_img, str(save_path))

        restored_img = cv2.cvtColor(restored_img, cv2.COLOR_BGR2RGB)
        return restored_img
    except Exception as error:
        print('Global exception', error)
        # A single value, matching the single image returned on success.
        return None

In [None]:
def encode_image(image: "Image.Image", img_format: str = "JPEG") -> str:
    """
    Encodes image to base64 string.
    Args:
        image: PIL image
        img_format: image format name handed to ``image.save`` (default "JPEG")
    Returns:
        base64 string
    """
    # JPEG cannot store an alpha channel or a palette; Pillow raises OSError
    # when asked to write such modes, so convert them to RGB first.
    if img_format.upper() == "JPEG" and image.mode in ("RGBA", "LA", "P", "PA"):
        image = image.convert("RGB")
    buffer = io.BytesIO()
    image.save(buffer, format=img_format)
    return base64.b64encode(buffer.getvalue()).decode("utf-8")


def decode_image(image: str) -> Image:
    """
    Turns a base64 string back into a PIL image.
    Args:
        image: base64-encoded image bytes
    Returns:
        PIL image opened from the decoded bytes
    """
    raw_bytes = base64.b64decode(image)
    return Image.open(io.BytesIO(raw_bytes))

def image_grid(imgs: List[Any], rows: int = 2, cols: int = 2):
    """
    Tiles images onto a single canvas, filling it row by row.
    The tile size is taken from the first image.
    Args:
        imgs: list of PIL images
        rows: number of rows
        cols: number of columns
    Returns:
        PIL image of size (cols * width, rows * height)
    """
    tile_w, tile_h = imgs[0].size
    canvas = Image.new("RGB", size=(cols * tile_w, rows * tile_h))

    for index, tile in enumerate(imgs):
        row, col = divmod(index, cols)
        canvas.paste(tile, box=(col * tile_w, row * tile_h))
    return canvas

In [None]:
input_folder = "inputs"
bases_folder = "bases"
# Sorted "<folder>/<name>" paths for every entry in each folder, so that
# inputs and bases can be paired by position.
input_imgs = sorted(f"{input_folder}/{img}" for img in os.listdir(input_folder))
base_imgs = sorted(f"{bases_folder}/{img}" for img in os.listdir(bases_folder))

In [None]:
def roop_api(source_image, target_image, address="http://gamma.pons.ai:1861"):
    """
    Swaps faces in source image with faces in target image.
    Both images are sent base64-encoded to the ``/roop/image`` endpoint.
    Args:
        source_image: source image (anything ``Image.open`` accepts)
        target_image: target image (anything ``Image.open`` accepts)
        address: base URL of the roop service
    Returns:
        swapped image
    Raises:
        requests.HTTPError: if the service answers with an error status
    """
    # Imported locally: no executed cell of the notebook imports `requests`.
    import requests

    # Context managers close the image files once they have been encoded.
    with Image.open(source_image) as source, Image.open(target_image) as target:
        source_base64 = encode_image(source)
        target_base64 = encode_image(target)
    payload = {
        "source_image": source_base64,
        "target_image": target_base64,
        "face_index": [0],
        "scale": 1,
        "upscale_visibility": 1,
        "face_restorer": "CodeFormer",
        "restorer_visibility": 1,
        "model": "inswapper_128.onnx",
    }
    out = requests.post(url=f"{address}/roop/image", json=payload)
    # Surface HTTP failures here instead of failing later on a missing "image" key.
    out.raise_for_status()
    gen_image = decode_image(out.json()["image"])
    return gen_image

In [None]:
# Disabled demo: the code below is wrapped in a string literal, so none of it
# executes (including its `import requests`). If enabled it would call
# roop_api() on the first four input/base pairs and plot the results as a
# two-row grid.
'''
from matplotlib import pyplot as plt
import requests
api_imgs = []
for i in range(4):
    in_img = input_imgs[i]
    base_img = base_imgs[i]
    gen_img = roop_api(in_img, base_img)
    api_imgs.append(gen_img)
grid = image_grid(api_imgs, cols=len(api_imgs)//2, rows=2)
plt.figure(figsize=(70, 10))
plt.imshow(grid)
plt.show()
#grid.save("api_grid.jpeg")
'''

In [None]:
import cv2
import concurrent.futures
# Number of (input, base) pairs to process; also used as the thread-pool size.
NUM_PAIRS = 4

# Pair the sorted input and base images by position. Slicing plus zip copes
# with folders that hold fewer than NUM_PAIRS images.
args_roop = []
for in_path, base_path in zip(input_imgs[:NUM_PAIRS], base_imgs[:NUM_PAIRS]):
    in_img = Image.open(in_path)
    base_img = Image.open(base_path)
    args_roop.append((in_img, base_img))
if not args_roop:
    raise FileNotFoundError(f"no image pairs found in '{input_folder}' and '{bases_folder}'")

frame_processors = ["face_swapper"]
#shutil.copy2(base_img, roop.globals.output_path)
# NOTE(review): process_image receives one (source, target) tuple of PIL
# images per call - confirm this matches the installed frame processor.
for frame_processor in get_frame_processors_modules(frame_processors):
    with concurrent.futures.ThreadPoolExecutor(max_workers=NUM_PAIRS) as executor:
        results = list(executor.map(frame_processor.process_image, args_roop))

imgs = [cv2.cvtColor(img, cv2.COLOR_BGR2RGB) for img in results]

# Tuple layout expected by inference():
# (image, face_align, background_enhance, face_upsample, upscale, codeformer_fidelity)
args_codeformer = [(img, True, False, False, 0, 1.0) for img in imgs]
with concurrent.futures.ThreadPoolExecutor(max_workers=NUM_PAIRS) as executor:
    results = list(executor.map(inference, args_codeformer))

# inference() signals failure with a non-array value; stop with a clear
# message rather than letting Image.fromarray fail on it.
failed = [idx for idx, img in enumerate(results) if not isinstance(img, np.ndarray)]
if failed:
    raise RuntimeError(f"CodeFormer inference failed for image index(es) {failed}")
imgs = [Image.fromarray(img) for img in results]
# Ceiling division keeps every tile inside the two-row grid for odd counts.
grid = image_grid(imgs, cols=(len(imgs) + 1) // 2, rows=2)
plt.figure(figsize=(70, 10))
plt.imshow(grid)
plt.show()