In [None]:
# Download the inswapper_128 ONNX model into ~/.insightface/models/ (the
# directory is created first if missing). `wget -c` resumes a partial download.
# NOTE(review): the Engine class later loads 'inswapper_128.onnx' by name;
# presumably insightface resolves it from this directory - confirm.
!mkdir -p ~/.insightface/models/
!wget -c https://github.com/facefusion/facefusion-assets/releases/download/models/inswapper_128.onnx -O ~/.insightface/models/inswapper_128.onnx

In [None]:
import datetime
import numpy as np
import sys
import os
import os.path as osp
import glob
import cv2
import numpy as np
from PIL import Image
from tqdm import tqdm

from IPython.display import Video

import insightface
from insightface.app import FaceAnalysis


sys.path.insert(0,'../src')
from display_utils import display_html, html_table, html_text, html_image

In [None]:
class Engine:
    """Holds the two InsightFace models this notebook works with.

    Attributes:
        app: the ``FaceAnalysis`` pipeline; the notebook calls ``app.get(img)``
            on it to obtain the faces found in a frame.
        swapper: the model returned by ``insightface.model_zoo.get_model``; the
            notebook calls ``swapper.get(...)`` on it to paste a new face.
    """

    def __init__(self, analysis_name='buffalo_l', ctx_id=0, det_size=(640, 640),
                 swapper_model='inswapper_128.onnx'):
        """Load the analysis pipeline and the face-swap model.

        All parameters default to the values that used to be hard-coded, so
        ``Engine()`` behaves exactly as before.

        Args:
            analysis_name: forwarded as ``name`` to ``FaceAnalysis``.
            ctx_id: forwarded to ``FaceAnalysis.prepare``.
                NOTE(review): presumably the device index (negative = CPU) -
                confirm against the installed insightface version.
            det_size: forwarded to ``FaceAnalysis.prepare``.
            swapper_model: model name/path forwarded to
                ``insightface.model_zoo.get_model``.
        """
        self.app = FaceAnalysis(name=analysis_name)
        self.app.prepare(ctx_id=ctx_id, det_size=det_size)
        self.swapper = insightface.model_zoo.get_model(
            swapper_model, download=True, download_zip=True
        )


def read_video(video, skip=1):
    """Yield frames from a video, keeping one frame out of every ``skip``.

    Args:
        video: whatever ``cv2.VideoCapture`` accepts (the notebook passes a
            file path).
        skip: number of frames read for each frame yielded; only the last
            frame of each group is yielded. Must be at least 1.

    Yields:
        The frame object returned by ``VideoCapture.read()``.

    Raises:
        ValueError: if ``skip`` is smaller than 1. Raised on the first
            ``next()`` call, because this is a generator.
    """
    # With skip < 1 the inner loop would never run and there would be no
    # read result to test, so reject it explicitly.
    if skip < 1:
        raise ValueError("skip must be >= 1, got %r" % (skip,))

    video_stream = cv2.VideoCapture(video)
    try:
        while True:
            for _ in range(skip):
                still_reading, frame = video_stream.read()
            if not still_reading:
                return
            yield frame
    finally:
        # Runs on normal exhaustion, on errors, and when the consumer stops
        # early (generator closed), so the capture handle is never leaked.
        video_stream.release()


def bbox_iou(boxA, boxB):
    """Intersection-over-union of two boxes.

    Each box is indexed as ``[0]``/``[1]`` for one corner and ``[2]``/``[3]``
    for the opposite corner (x then y).

    Returns:
        ``0`` when the boxes do not overlap (or merely touch), otherwise the
        overlap area divided by the area of the union.
    """
    # Corners of the overlapping rectangle.
    left, top = max(boxA[0], boxB[0]), max(boxA[1], boxB[1])
    right, bottom = min(boxA[2], boxB[2]), min(boxA[3], boxB[3])

    # Negative extents mean "no overlap" along that axis and are clamped to 0.
    overlap_w = max(right - left, 0)
    overlap_h = max(bottom - top, 0)
    overlap = abs(overlap_w * overlap_h)
    if overlap == 0:
        return 0

    area_a = abs((boxA[2] - boxA[0]) * (boxA[3] - boxA[1]))
    area_b = abs((boxB[2] - boxB[0]) * (boxB[3] - boxB[1]))

    # Union = sum of both areas minus the part counted twice.
    union = float(area_a + area_b - overlap)
    return overlap / union


class FaceLib:
    """In-memory registry of the distinct faces seen so far.

    ``faces`` maps an integer id (handed out in order of first appearance) to
    a dict with the keys ``"img"`` (PIL crop made when the face was first
    registered), ``"embedding"`` (list of embeddings collected for that face)
    and ``"prev_bbox"`` (bounding box of the most recent match).
    ``nums`` is the id the next new face will receive.
    """

    def __init__(self):
        self.nums = 0
        self.faces = {}

    def simi(self, a, b):
        """Return the largest dot product between ``a`` and the vectors in ``b``."""
        stacked = np.array(b)
        scores = np.dot(a, stacked.T)
        return scores.max()

    def norm(self, x):
        """Return ``x`` divided by ``np.linalg.norm(x)``."""
        length = np.linalg.norm(x)
        return x / length

    def put(self, faces, img):
        """Match every detected face against the registry, adding unknown ones.

        Entries are tried in insertion order and the first one that passes a
        threshold wins. A similarity above 0.9 only refreshes ``prev_bbox``.
        Weaker matches also store the new embedding: similarity above 0.8, or
        above 0.7 / 0.6 / 0.5 when the IoU with ``prev_bbox`` is above
        0.5 / 0.6 / 0.7 respectively. A face matching no entry is registered
        under a new id.

        Args:
            faces: iterable of detected faces exposing ``normed_embedding``,
                ``bbox`` and ``["bbox"]``.
            img: the frame the faces were detected in; used for the thumbnail.
        """
        for candidate in faces:
            matched = False
            for entry in self.faces.values():
                score = self.simi(candidate.normed_embedding, entry["embedding"])
                overlap = bbox_iou(candidate.bbox, entry["prev_bbox"])

                if score > 0.9:
                    # Confident match: the stored embeddings already cover it.
                    keep_embedding = False
                elif (
                    score > 0.8
                    or (score > 0.7 and overlap > 0.5)
                    or (score > 0.6 and overlap > 0.6)
                    or (score > 0.5 and overlap > 0.7)
                ):
                    # Weaker match: remember this appearance as well.
                    keep_embedding = True
                else:
                    continue

                entry["prev_bbox"] = candidate.bbox
                if keep_embedding:
                    entry["embedding"].append(candidate.normed_embedding)
                matched = True
                break

            if not matched:
                self._register(candidate, img)

    def _register(self, face, img):
        """Store ``face`` under a new id together with a padded thumbnail."""
        x0, y0, x1, y1 = face["bbox"].round().astype(np.int32)
        img_h, img_w = img.shape[:2]

        # Grow the box by half its size on every side, clipped to the image.
        pad_y = (y1 - y0) // 2
        pad_x = (x1 - x0) // 2
        left = max(x0 - pad_x, 0)
        top = max(y0 - pad_y, 0)
        right = min(x1 + pad_x, img_w)
        bottom = min(y1 + pad_y, img_h)

        crop_rgb = cv2.cvtColor(img[top:bottom, left:right], cv2.COLOR_BGR2RGB)
        self.faces[self.nums] = {
            "img": Image.fromarray(crop_rgb),
            "embedding": [face.normed_embedding],
            "prev_bbox": face.bbox,
        }
        self.nums += 1

    def most_simi(self, face):
        """Return ``(id, similarity)`` of the best-matching registered face.

        Returns ``(-1, -1)`` when the registry is empty.
        """
        best_id, best_score = -1, -1
        for face_id, entry in self.faces.items():
            score = self.simi(face.normed_embedding, entry["embedding"])
            if score > best_score:
                best_id, best_score = face_id, score
        return best_id, best_score

In [None]:
# Load the models once and start with an empty face registry.
engine = Engine()
facelib0 = FaceLib()

display(Video("test_video.mp4"))

# Detect the faces in every frame and register each new identity in facelib0.
# (The frame index was never used, so the loop iterates the frames directly.)
for img in tqdm(read_video("test_video.mp4")):
    faces = engine.app.get(img)
    facelib0.put(faces, img)
print("total faces", len(facelib0.faces))

# Show the registered ids next to their thumbnails; these ids are the keys to
# use when choosing which faces to replace.
# NOTE(review): html_table presumably renders each inner list as one row -
# confirm against display_utils.
display_html(
    html_table([
        list(facelib0.faces.keys()),
        [html_image(v["img"]) for v in facelib0.faces.values()],
    ])
)

In [None]:
# Face id in facelib0 -> image file whose face should replace it.
# The ids are picked by hand from the table of registered faces.
swap_paths = {
    0: "faces/xinzhilei.webp",
    4: "faces/xinzhilei.webp",
    11: "faces/xinzhilei.webp",

    1: "faces/tangyan.webp",
    10: "faces/tangyan.webp",

    3: "faces/huge.webp",

    7: "faces/musk.jpeg",

    8: "faces/ronaldo.jpg",
}

# Fail early with a readable message if an id does not exist in the registry,
# instead of a bare KeyError while building the preview table.
unknown_ids = [k for k in swap_paths if k not in facelib0.faces]
if unknown_ids:
    raise KeyError("swap ids not present in facelib0.faces: %s" % unknown_ids)

# Several ids share one replacement image, so decode and analyse each file
# only once (dict.fromkeys keeps first-seen order while dropping duplicates).
replacements = {}
for path in dict.fromkeys(swap_paths.values()):
    img = cv2.imread(path)
    if img is None:
        # cv2.imread signals an unreadable/missing file by returning None.
        raise FileNotFoundError("could not read replacement image: %s" % path)

    detected = engine.app.get(img)
    if len(detected) == 0:
        raise ValueError("no face detected in replacement image: %s" % path)
    face = detected[0]

    # Thumbnail: the face box grown by half its size on every side, clipped
    # to the image borders.
    x0, y0, x1, y1 = face["bbox"].round().astype(np.int32)
    img_h, img_w = img.shape[:2]
    h = (y1 - y0) // 2
    w = (x1 - x0) // 2
    x0, y0, x1, y1 = max(x0 - w, 0), max(y0 - h, 0), min(x1 + w, img_w), min(y1 + h, img_h)
    replacements[path] = {
        "img": Image.fromarray(cv2.cvtColor(img[y0:y1, x0:x1], cv2.COLOR_BGR2RGB)),
        "face": face,
    }

# Same shape as before: id -> {"img": thumbnail, "face": detected face}.
swap_map = {k: replacements[path] for k, path in swap_paths.items()}

# Preview each registered face ("source") beside its replacement ("target").
display_html(
    html_table([
        [""] + list(swap_map.keys()),
        ["source"] + [html_image(facelib0.faces[k]["img"]) for k in swap_map],
        ["target"] + [html_image(v["img"]) for v in swap_map.values()],
    ])
)

In [None]:
fourcc = cv2.VideoWriter_fourcc(*"mp4v")
!rm temp.mp4

video_writer = None

for idx, img in enumerate(tqdm(read_video("test_video.mp4"))):
    faces = engine.app.get(img)
    if video_writer is None:
        video_writer = cv2.VideoWriter("temp.mp4", fourcc, 23.98, img.shape[1::-1])
    for face in faces:
        sim_index, sim_value = facelib0.most_simi(face)
        if sim_value > 0.6 and sim_index in swap_map:
            img = engine.swapper.get(img, face, swap_map[sim_index]["face"], paste_back=True)
    video_writer.write(img)

video_writer.release()

!ffmpeg -i temp.mp4 -i test_video.mp4 -vcodec h264 -y out.mp4
display(Video("out.mp4"))