## [실습 환경 Setting]
### 1. CodeFormer 설치
- Download CodeFormer pjt :
    ```
    git clone https://github.com/sczhou/CodeFormer.git
    <python_home>/Lib/site-packages 위치에 CodeFormer directory 통 copy
    cd <python_home>/Lib/site-packages/CodeFormer
    pip install -r requirements.txt
    python basicsr/setup.py develop
    ```
### 2. CodeFormer 용 AI Model 설치
- Download CodeFormer model(Face Restoring model):
    https://github.com/sczhou/CodeFormer/releases/download/v0.1.0/codeformer.pth
    
    저장 위치 : python code 에 model 위치 명시
- Download Face relevant models :
    https://github.com/sczhou/CodeFormer/releases/download/v0.1.0/detection_Resnet50_Final.pth
    https://github.com/sczhou/CodeFormer/releases/download/v0.1.0/parsing_parsenet.pth
    
    저장 위치 : <python_home>/Lib/site-packages/CodeFormer/weights/facelib

### 3. insightface 설치
- pip install insightface
- pip install onnxruntime #for CPU-only
- pip install onnxruntime-gpu #For GPU
### 4. OpenCV 재설치
- insightface 설치 시 opencv-python(또는 opencv-python-headless)가 자동 설치됨. uninstall 후 opencv-contrib-python 설치
- pip uninstall opencv-python
- pip uninstall opencv-python-headless
- pip install opencv-contrib-python
### 5. insightface 용 AI Model 설치
#### 5-1. Facial Anaysis 모델 설치
- buffalo_l download from : 
    https://github.com/deepinsight/insightface/releases

- unzip buffalo_l.zip on `C:\Users\<user>\.insightface\models\buffalo_l`
- Facial Analysis 모델 : 
    - 얼굴 감지: 먼저 이미지에서 얼굴을 찾아내고, 얼굴의 위치 감지. 여러 얼굴이 있는 경우 각 얼굴의 위치를 정확하게 추출.
    - 얼굴 특징 추출: 감지된 얼굴에서 고유한 특징(임베딩)을 추출하여 이를 벡터 형식으로 표현. 이 특징은 각 얼굴을 고유하게 나타내며, 다른 얼굴과 비교할 때 사용될 수 있음.
- `buffalo_l` 모델이 insightface에서 사용할 수 있는 모델 중 정확도가 가장(그나마) 높음
```
Recognition Accuracy:

+-------+-------+--------+-------+--------+--------+------+----+------+-------+
| Name  | MR-ALL| African| Cauca | South  | East   | LFW  | CF | AgeD | IJB-C |
|       |       |        | sian  | Asian  | Asian  |      | P- | B-30 | (E4)  |
|       |       |        |       |        |        |      | FP |      |       |
+=======+=======+========+=======+========+========+======+====+======+=======+
| buffa | 91.25 | 90.29  | 94.70 | 93.16  | 74.96  | 99.83| 99 | 98.23| 97.25 |
| lo_l  |       |        |       |        |        |      | .33|      |       |
```
#### 5-2. Face Swap 모델 설치
- Download `inswapper_128.onnx`
- 저장 위치 : python code 에 model 위치 명시

## [실습]
### 1. Face Detection

In [None]:
# 얼굴 인식을 위해 InsightFace를 사용하는 샘플 코드

import cv2
from insightface.app import FaceAnalysis

# FaceAnalysis 객체 초기화 (사전 학습된 모델 사용)
app = FaceAnalysis(name='buffalo_l')  # 'buffalo_l'는 사전 학습된 모델 이름입니다.
app.prepare(ctx_id=-1)  # ctx_id=0은 GPU 사용, ctx_id=-1은 CPU 사용

# NMS 임계값 설정
# - 낮출수록 더 많은 얼굴이 검출될 수 있지만 오탐률이 증가할 수 있음
app.det_model.nms_thresh = 0.6

# 이미지 파일 읽기
img = cv2.imread("./faces/group_image.jpg")  # 처리할 이미지 파일의 경로로 변경하세요.
if img is None:
    raise FileNotFoundError(f"이미지를 불러올 수 없습니다. 경로를 확인하세요")

# 얼굴 검출 및 임베딩 추출
faces = app.get(img)

# 검출된 얼굴 처리
for idx, face in enumerate(faces):
    # 얼굴 영역 표시
    bbox = face.bbox.astype(int)
    cv2.rectangle(img, (bbox[0], bbox[1]), (bbox[2], bbox[3]), (0, 255, 0), 2)
    # 얼굴 임베딩 출력
    #print(f"얼굴 {idx+1} 임베딩 벡터:\n{face.embedding}")

# 결과 이미지 표시
cv2.imshow('Detection Result', img)
cv2.waitKey(0)
cv2.destroyAllWindows()

### 2. Face Recognition

In [None]:
import cv2
import numpy as np
from insightface.app import FaceAnalysis
from sklearn.metrics.pairwise import cosine_similarity

# FaceAnalysis 객체 초기화 (사전 학습된 모델 사용)
app = FaceAnalysis(name='buffalo_l')  # 'buffalo_l'는 사전 학습된 모델 이름입니다.
app.prepare(ctx_id=-1)  # ctx_id=0은 GPU 사용, ctx_id=-1은 CPU 사용

# 비교할 기준 얼굴 이미지 로드 및 임베딩 추출
ref_img_path = "./faces/old_man.jpg"  # 기준 얼굴 이미지 경로로 변경하세요.
ref_img = cv2.imread(ref_img_path)
if ref_img is None:
    raise FileNotFoundError(f"기준 이미지를 불러올 수 없습니다. 경로를 확인하세요: {ref_img_path}")

ref_faces = app.get(ref_img)
if len(ref_faces) == 0:
    raise ValueError("기준 이미지에서 얼굴을 검출하지 못했습니다.")

# 기준 얼굴의 임베딩 추출 (첫 번째 얼굴 사용)
ref_embedding = ref_faces[0].embedding

# 비교할 대상 이미지 로드 및 얼굴 임베딩 추출
target_img_path = "./faces/group_image.jpg"  # 대상 이미지 경로로 변경하세요.
target_img = cv2.imread(target_img_path)
if target_img is None:
    raise FileNotFoundError(f"대상 이미지를 불러올 수 없습니다. 경로를 확인하세요: {target_img_path}")

target_faces = app.get(target_img)

# target_faces를 x축 기준으로 정렬 (좌에서 우로)
target_faces.sort(key=lambda face: face.bbox[0])

# 검출된 얼굴들에 대해 유사도 계산 및 표시
for idx, face in enumerate(target_faces):
    # 대상 얼굴의 임베딩 추출
    target_embedding = face.embedding

    # 코사인 유사도 계산
    similarity = cosine_similarity([ref_embedding], [target_embedding])[0][0]

    # 유사도 출력
    print(f"얼굴 {idx} 유사도: {similarity:.4f}")

    # 얼굴 영역 표시
    bbox = face.bbox.astype(int)
    cv2.rectangle(target_img, (bbox[0], bbox[1]), (bbox[2], bbox[3]), (0, 255, 0), 2)
    # 유사도 텍스트 표시
    cv2.putText(target_img, f"{idx} : {similarity:.2f}", (bbox[0], bbox[1]-10),
                cv2.FONT_HERSHEY_SIMPLEX, 0.9, (0, 255, 0), 2)

    # 이미지 너무 커서 1/4로 줄임
    resized_targer_img = cv2.resize(target_img, (int(target_img.shape[1] / 2), int(target_img.shape[0] / 2)))

# 결과 이미지 표시
cv2.imshow('Similarity Result', resized_targer_img)
cv2.waitKey(0)
cv2.destroyAllWindows()

### 3. Face Swapping
#### 3-1. Face Swap 모델
- `inswapper_128.onnx`
- Face Swap 모델은 얼굴을 교체하는 데 중점
- 128x128 해상도의 얼굴 이미지를 사용, 원본 얼굴 이미지의 세부 사항이 손실
- Face Restoration 기술을 사용해 손실된 디테일을 복원하고, 저화질의 이미지를 향상 필요
#### 3-2. Class FaceSwapper
- `ndarray` Type image를 받아서 Face Detection 후 Face Swap 를 수행하는 Class

In [None]:
import cv2
import insightface
from insightface.app import FaceAnalysis
from insightface.data import get_image as ins_get_image
import numpy as np

assert insightface.__version__ >= '0.7'

#SWAPPER_MODLE = 'C:\\Users\\tanmi\\stable-diffusion-webui\\models\\insightface\\inswapper_128.onnx'
SWAPPER_MODLE = 'C:\\pypjt\\env\\inswapper_128.onnx'

class FaceSwapper:
    def __init__(self, model_name='buffalo_l', ctx_id=0, det_size=(640, 640)):
        # 얼굴 분석 모델 초기화
        self.app = FaceAnalysis(name=model_name)
        self.app.prepare(ctx_id=ctx_id, det_size=det_size)
        # 얼굴 교체 모델 로드
        self.swapper = insightface.model_zoo.get_model(
            SWAPPER_MODLE, download=True, download_zip=True
        )
        # 소스 얼굴 초기화
        self.source_face = None
        self.enhanced=False

    def set_source_face(self, img, face_index=0, enhanced=False):
        """
        이미지에서 소스 얼굴을 설정합니다.
        img: 이미지 파일 경로나 numpy.ndarray 이미지
        face_index: 선택할 얼굴의 인덱스 (기본값: 0)
        """
        # 이미지 로드 (파일 경로 또는 ndarray 처리)
        if isinstance(img, str):
            img = cv2.imread(img)
            if img is None:
                print(f"이미지를 로드할 수 없습니다: {img}")
                return False
        elif not isinstance(img, np.ndarray):
            print("유효한 이미지 또는 이미지 경로를 입력해 주세요.")
            return False

        # 얼굴 검출
        faces = self.app.get(img)
        if len(faces) == 0:
            print("소스 이미지에서 얼굴을 감지하지 못했습니다.")
            return False
        # 얼굴을 x 좌표 기준으로 정렬
        faces = sorted(faces, key=lambda x: x.bbox[0])
        if face_index >= len(faces):
            print(f"소스 얼굴 인덱스가 범위를 벗어났습니다. 총 감지된 얼굴 수: {len(faces)}")
            return False
        # 소스 얼굴 설정
        self.source_face = faces[face_index]
        print(f"소스 얼굴이 설정되었습니다. 인덱스: {face_index}")

        self.enhanced=enhanced
        
        return True

    def enhance_image(self, img):
        # 샤프닝
        kernel = np.array([[-1,-1,-1], [-1,9,-1], [-1,-1,-1]])
        sharpened = cv2.filter2D(img, -1, kernel)
        
        # 노이즈 제거
        denoised = cv2.fastNlMeansDenoisingColored(sharpened, None, 10, 10, 7, 21)
        
        # 대비 향상
        lab = cv2.cvtColor(denoised, cv2.COLOR_BGR2LAB)
        l, a, b = cv2.split(lab)
        clahe = cv2.createCLAHE(clipLimit=3.0, tileGridSize=(8,8))
        cl = clahe.apply(l)
        enhanced_lab = cv2.merge((cl,a,b))
        enhanced = cv2.cvtColor(enhanced_lab, cv2.COLOR_LAB2BGR)
        
        return enhanced
    
    def swap_faces_in_image(self, img):
        """
        ndarray 이미지를 입력으로 받아 얼굴 교체를 수행하고, 결과 이미지를 반환합니다.
        """
        if self.source_face is None:
            print("소스 얼굴이 설정되지 않았습니다. 먼저 set_source_face 메서드를 호출하여 소스 얼굴을 설정하세요.")
            return None
        # 얼굴 검출
        faces = self.app.get(img)
        if len(faces) == 0:
            print("대상 이미지에서 얼굴이 감지되지 않았습니다.")
            return None
        # 얼굴을 x 좌표 기준으로 정렬
        faces = sorted(faces, key=lambda x: x.bbox[0])
        # 얼굴 교체 수행
        res = img.copy()
        for face in faces:
            res = self.swapper.get(res, face, self.source_face, paste_back=True)
        
        # 이미지 품질 향상
        if self.enhanced:
            res = self.enhance_image(res)

        return res

    def extract_and_swap_faces_in_image(self, img):
        """
        ndarray 이미지를 입력으로 받아 개별 얼굴을 교체한 이미지를 반환합니다.
        """
        if self.source_face is None:
            print("소스 얼굴이 설정되지 않았습니다. 먼저 set_source_face 메서드를 호출하여 소스 얼굴을 설정하세요.")
            return None
        # 얼굴 검출
        faces = self.app.get(img)
        if len(faces) == 0:
            print("대상 이미지에서 얼굴이 감지되지 않았습니다.")
            return None
        # 얼굴을 x 좌표 기준으로 정렬
        faces = sorted(faces, key=lambda x: x.bbox[0])
        # 개별 얼굴 교체 및 추출
        res = []
        for face in faces:
            _img, _ = self.swapper.get(img, face, self.source_face, paste_back=False)
            res.append(_img)
        if len(res) == 0:
            print("교체된 얼굴이 없습니다.")
            return None
        res = np.concatenate(res, axis=1)
        return res

#### 3-3. Face Swapping example
- 전체 얼굴 모두 Swap

In [None]:
face_swapper = FaceSwapper(det_size=(320, 320))

# 소스 얼굴 이미지 로드
source_img = cv2.imread("./faces/asian_girl.jpg")
    
# 소스 얼굴 설정 (face_index는 선택 사항)
success = face_swapper.set_source_face(source_img, face_index=0)
if not success:
    print("소스 얼굴 설정에 실패했습니다.")
    exit()
    
# 대상 이미지 로드
target_img = cv2.imread('./faces/group_image.jpg')
    
# 얼굴 교체 수행 (ndarray 이미지를 입력으로 받아 결과를 ndarray로 반환)
swapped_img = face_swapper.swap_faces_in_image(target_img)

if swapped_img is not None:
    # 결과 이미지 표시
    cv2.imshow('Result', swapped_img)
    cv2.waitKey(0)
    cv2.destroyAllWindows()    
else:
    print("얼굴 교체에 실패했습니다.")

- 특정인 얼굴을 식별해서 Swap

In [None]:
import cv2
import numpy as np
from insightface.app import FaceAnalysis
from sklearn.metrics.pairwise import cosine_similarity

# FaceAnalysis 객체 초기화 (사전 학습된 모델 사용)
app = FaceAnalysis(name='buffalo_l')  # 'buffalo_l'는 사전 학습된 모델 이름입니다.
app.prepare(ctx_id=-1)  # ctx_id=0은 GPU 사용, ctx_id=-1은 CPU 사용

# 비교할 기준 얼굴 이미지 로드 및 임베딩 추출
ref_img = cv2.imread("./faces/old_man.jpg")
if ref_img is None:
    raise FileNotFoundError(f"기준 이미지를 불러올 수 없습니다. 경로를 확인하세요: {ref_img_path}")

ref_faces = app.get(ref_img)
if len(ref_faces) == 0:
    raise ValueError("기준 이미지에서 얼굴을 검출하지 못했습니다.")

# 기준 얼굴의 임베딩 추출 (첫 번째 얼굴 사용)
ref_embedding = ref_faces[0].embedding

# 소스 얼굴 이미지 로드
source_img = cv2.imread("./faces/asian_girl.jpg")
    
# 소스 얼굴 설정 (face_index는 선택 사항)
face_swapper = FaceSwapper(det_size=(320, 320))
success = face_swapper.set_source_face(source_img, face_index=0)
if not success:
    print("소스 얼굴 설정에 실패했습니다.")
    exit()
    
# 대상 이미지 로드
target_img = cv2.imread('./faces/group_image.jpg')

target_faces = app.get(target_img)

# 검출된 얼굴들에 대해 유사도 계산 및 표시
for face in target_faces:
    # 대상 얼굴의 임베딩 추출
    target_embedding = face.embedding

    # 코사인 유사도 계산
    similarity = cosine_similarity([ref_embedding], [target_embedding])[0][0]

    # 얼굴 영역 표시
    bbox = face.bbox.astype(int)

    if similarity > 0.3:
        # Extract face region
        face_region = target_img[bbox[1]:bbox[3], bbox[0]:bbox[2]]
        swapped_img = face_swapper.swap_faces_in_image(face_region)
        target_img[bbox[1]:bbox[3], bbox[0]:bbox[2]] = swapped_img

    cv2.rectangle(target_img, (bbox[0], bbox[1]), (bbox[2], bbox[3]), (0, 255, 0), 2)
    # 유사도 텍스트 표시
    cv2.putText(target_img, f"{similarity:.2f}", (bbox[0], bbox[1]-10),
                cv2.FONT_HERSHEY_SIMPLEX, 0.9, (0, 255, 0), 2)

# 결과 이미지 표시
cv2.imshow('Swap Result', target_img)
cv2.waitKey(0)
cv2.destroyAllWindows()

### 4. Face Restoration
#### 4-1. Face Restoration 모델
- `codeformer.pth`
- Face Swap 후에 디테일을 복구하여 최종 이미지를 더 매끄럽고 고품질로 만드는 것이 목적
#### 4-2. Function `restore_face`
- image에서 얼굴을 찾아 복구(restoration) 함
- `ndarray` type image를 받아서 `ndarray` type image로 return

In [3]:
import cv2
import torch
from torchvision.transforms.functional import normalize
from basicsr.utils import img2tensor, tensor2img
from facelib.utils.face_restoration_helper import FaceRestoreHelper
from basicsr.utils.registry import ARCH_REGISTRY

# 모델 경로 설정
CODEFORMER_MODEL = "C:\\pypjt\\env\\codeformer.pth"

def restore_face(input_image, use_gpu=False):
    """
    얼굴 복원 함수
    
    :param input_image: ndarray 타입의 입력 이미지
    :param model_path: CodeFormer 모델 파일 경로
    :param use_gpu: GPU 사용 여부 (기본값: False)
    :return: ndarray 타입의 복원된 이미지
    """
    # 모델 로드
    device = torch.device('cuda' if use_gpu and torch.cuda.is_available() else 'cpu')
    model = ARCH_REGISTRY.get('CodeFormer')(dim_embd=512, codebook_size=1024, n_head=8, n_layers=9, connect_list=['32', '64', '128', '256']).to(device)
    
    checkpoint = torch.load(CODEFORMER_MODEL, weights_only=True, map_location=device)['params_ema']
    model.load_state_dict(checkpoint)
    model.eval()

    # 이미지 크기 저장
    h, w, _ = input_image.shape

    # 얼굴 검출 및 정렬
    face_helper = FaceRestoreHelper(
        upscale_factor=1,
        face_size=512,
        crop_ratio=(1, 1),
        det_model='retinaface_resnet50',
        save_ext='png',
        use_parse=True,
        device=device
    )
    face_helper.read_image(input_image)
    face_helper.get_face_landmarks_5(only_center_face=False, resize=640, eye_dist_threshold=5)
    face_helper.align_warp_face()

    # 얼굴 복원
    for idx, cropped_face in enumerate(face_helper.cropped_faces):
        cropped_face_t = img2tensor(cropped_face / 255., bgr2rgb=True, float32=True)
        normalize(cropped_face_t, (0.5, 0.5, 0.5), (0.5, 0.5, 0.5), inplace=True)
        cropped_face_t = cropped_face_t.unsqueeze(0).to(device)

        try:
            with torch.no_grad():
                output = model(cropped_face_t, w=0.5, adain=True)[0]
                restored_face = tensor2img(output, rgb2bgr=True, min_max=(-1, 1))
            del output
            if use_gpu:
                torch.cuda.empty_cache()
        except RuntimeError as error:
            print(f'Error: {error}')
            print('If you encounter CUDA out of memory, try to set --tile with a smaller number.')
        else:
            restored_face = restored_face.astype('uint8')
            face_helper.add_restored_face(restored_face)

    # 결과 생성
    face_helper.get_inverse_affine(None)
    restored_img = face_helper.paste_faces_to_input_image()
    
    # 최종 이미지 크기 조정 (원본 크기로)
    restored_img = cv2.resize(restored_img, (w, h))

    return restored_img

#### 4-3. Face Restoring Example

In [None]:
import cv2
import numpy as np

# 입력 이미지 로드
input_img = cv2.imread("./faces/asian_girl_low_quality.jpg")

# 얼굴 복원 함수 호출
restored_img = restore_face(input_img)

if restored_img is not None:
    # 텍스트를 추가할 이미지 복사
    input_with_text = input_img.copy()
    restored_with_text = restored_img.copy()

    # 폰트 설정
    font = cv2.FONT_HERSHEY_SIMPLEX
    font_scale = 1
    color = (255, 255, 255)  # 흰색
    thickness = 2

    # 텍스트 추가 (변경 전)
    cv2.putText(input_with_text, "Before", (10, 30), font, font_scale, color, thickness)

    # 텍스트 추가 (변경 후)
    cv2.putText(restored_with_text, "After", (10, 30), font, font_scale, color, thickness)

    # 입력 이미지와 복원된 이미지를 좌우로 붙이기
    combined_img = np.hstack((input_with_text, restored_with_text))

    # 결과 이미지 표시
    cv2.imshow('Result', combined_img)
    cv2.waitKey(0)
    cv2.destroyAllWindows()    
else:
    print("얼굴 복구에 실패했습니다.")


#### 4-4. Face Swap & Restoring Example

In [None]:
import cv2
import numpy as np

# FaceSwapper 클래스 초기화
face_swapper = FaceSwapper(det_size=(320, 320))

# 소스 얼굴 이미지 로드
source_img = cv2.imread("./faces/old_man.jpg")

# 소스 얼굴 설정
success = face_swapper.set_source_face(source_img, face_index=0)
if not success:
    print("소스 얼굴 설정에 실패했습니다.")
    exit()

# 대상 이미지 로드
target_img = cv2.imread("./faces/asian_girl.jpg")

# 얼굴 교체 수행
swapped_img = face_swapper.swap_faces_in_image(target_img)
restored_swapped_img = restore_face(swapped_img)

if restored_swapped_img is not None:
    # 텍스트 추가를 위한 폰트 설정
    font = cv2.FONT_HERSHEY_SIMPLEX
    font_scale = 2
    color = (221, 226, 255)  # 
    thickness = 4

    # 원본 이미지에 텍스트 추가 (Original Image)
    target_with_text = target_img.copy()
    cv2.putText(target_with_text, "Original Image", (50, 50), font, font_scale, color, thickness)

    # 교체된 이미지에 텍스트 추가 (Swapped Image)
    swapped_with_text = swapped_img.copy()
    cv2.putText(swapped_with_text, "Swapped Image", (50, 50), font, font_scale, color, thickness)

    # 복원된 이미지에 텍스트 추가 (Restored Image)
    restored_with_text = restored_swapped_img.copy()
    cv2.putText(restored_with_text, "Restored Image", (50, 50), font, font_scale, color, thickness)

    # 세 이미지를 가로로 결합
    combined_img = np.hstack((target_with_text, swapped_with_text, restored_with_text))

    # 결합된 이미지 너무 커서 1/4로 줄임
    resized_combined_img = cv2.resize(combined_img, (int(combined_img.shape[1] / 2), int(combined_img.shape[0] / 2)))
    # 결과 이미지 표시
    cv2.imshow('Result', resized_combined_img)
    cv2.waitKey(0)
    cv2.destroyAllWindows()
else:
    print("얼굴 교체에 실패했습니다.")

### 5. [응용] Video Face Swapping
### 5-1. Class `VideoFaceProcessor`
- Video상의 특정 인물의 Face에 대해 여러가지 조작을 수행하는 Class
### 5-2. Class `VideoFaceProcessor` 적용된 주요 기술
- OpenCV (cv2):
    - OpenCV는 컴퓨터 비전 라이브러리로, 동영상이나 이미지 처리에 자주 사용. 이 코드에서는 동영상을 불러오고 처리하며, 얼굴 인식 영역을 그리기 위해 사용.
- Face Detection & Recognition (InsightFace):
    - 얼굴 탐지와 얼굴 임베딩 추출을 위해 사용. 얼굴 임베딩은 각 얼굴을 고유한 숫자 벡터로 표현하는 방식으로, 얼굴의 유사성을 측정하기 위해 활용.
- Cosine Similarity (코사인 유사도):
    - 두 얼굴 임베딩 벡터 간의 유사도를 계산. 유사도가 높을수록 얼굴이 더 비슷하다는 의미. 이를 통해 대상 얼굴이 참조 이미지와 얼마나 일치하는지 판단 가능.
- Tracker (추적기):
    - 특정 인물의 얼굴을 비디오에서 지속적으로 추적하는 데 사용. 여기서는 KCF 추적기를 사용하여 얼굴의 위치를 추적하며, 추적 결과에 따라 얼굴 swap을 적용.
- video_swap method:
    - 동영상을 프레임 단위로 처리하는 데코레이터. 사용자는 이 데코레이터를 이용해 프레임별로 얼굴에 추가 작업(예: Face Swap)을 수행. func 인자에 전달된 함수가 프레임 내 얼굴 영역에 대해 호출되며, 결과를 처리할 수 있음.

In [None]:
import cv2
import os
from insightface.app import FaceAnalysis
from sklearn.metrics.pairwise import cosine_similarity

class VideoFaceProcessor:
    def __init__(self, 
            base_image, 
            target_video, 
            tolerance=0.35, 
            output_video=None, 
            display_video=True, 
            display_rectangle=True, 
            segments=None,
            ctx_id=-1,
            ):

        # Initialize FaceAnalysis object
        self.app = FaceAnalysis(name='buffalo_l')
        self.app.prepare(ctx_id=ctx_id)  # 0 Use GPU (set ctx_id=-1 to use CPU)

        # Load the reference face image and extract embedding
        ref_img = cv2.imread(base_image)
        if ref_img is None:
            raise FileNotFoundError(f"Unable to load the reference image: {base_image}")

        ref_faces = self.app.get(ref_img)
        if len(ref_faces) == 0:
            raise ValueError("No faces detected in the reference image.")

        # Extract embedding of the reference face (use the first face)
        self.known_face_embedding = ref_faces[0].embedding
        self.target_video = target_video
        self.output_video = output_video
        self.display_video = display_video
        self.display_rectangle = display_rectangle
        self.tolerance = tolerance
        self.specific_person_present = False  # Flag to indicate if Specific Person is present

        # Segments to process
        self.segments = self._prepare_segments(segments)
        
        # If output_video is None, do not use video saving feature
        self.fourcc = self._get_video_codec(output_video)

        self.trackers = []
        self.face_names = []
        self.face_similarities = []

    def _get_video_codec(self, output_video):
        if output_video is None:
            return None
        _, ext = os.path.splitext(output_video.lower())
        return cv2.VideoWriter_fourcc(*'VP90') if ext == '.webm' else cv2.VideoWriter_fourcc(*'mp4v')
        
    def _convert_to_frame_range(self, start_time_str, duration, fps):
        start_seconds = self._time_str_to_seconds(start_time_str)
        end_seconds = start_seconds + duration
        start_frame = int(start_seconds * fps)
        end_frame = int(end_seconds * fps)
        return start_frame, end_frame
        
    def _prepare_segments(self, segments):
        if segments is None:
            return None
        segment_frames = []
        fps = self.get_video_fps()
        for start_time_str, duration in segments:
            start_frame, end_frame = self._convert_to_frame_range(start_time_str, duration, fps)
            segment_frames.append((start_frame, end_frame))
        return segment_frames

    def _get_video_properties(self, video_capture):
        fps = video_capture.get(cv2.CAP_PROP_FPS)
        width = int(video_capture.get(cv2.CAP_PROP_FRAME_WIDTH))
        height = int(video_capture.get(cv2.CAP_PROP_FRAME_HEIGHT))
        total_frames = int(video_capture.get(cv2.CAP_PROP_FRAME_COUNT))
        return fps, width, height, total_frames

    def _initialize_video_writer(self, fps, width, height):
        if self.output_video:
            return cv2.VideoWriter(self.output_video, self.fourcc, fps, (width, height))
        return None

    def _calculate_similarities(self, faces):
        return [cosine_similarity([self.known_face_embedding], [face.embedding])[0][0] for face in faces]

    def _create_tracker(self, frame, bbox):
        tracker = cv2.legacy.TrackerKCF_create()
        x1, y1, x2, y2 = bbox
        tracker_bbox = (x1, y1, x2 - x1, y2 - y1)
        tracker.init(frame, tracker_bbox)
        return tracker

    def video_swap(self, func):
        def wrapper(*args, **kwargs):
            # Video capture
            video_capture = cv2.VideoCapture(self.target_video)

            # Get video properties
            fps, width, height, total_frames = self._get_video_properties(video_capture)

            video_writer = self._initialize_video_writer(fps, width, height)

            # Initialize variables
            self.trackers = []
            self.face_names = []
            self.face_similarities = []
            frame_skip = 24
            frame_count = 0

            # Convert segments to list of (start_frame, end_frame)
            segment_frames = self.segments if self.segments else [(0, total_frames)]

            # Process each segment
            for start_frame, end_frame in segment_frames:

                if start_frame >= total_frames:
                    print(f"Start frame {start_frame} exceeds total frames {total_frames}. Skipping segment.")
                    break

                # Adjust end_frame if it exceeds total_frames
                if end_frame > total_frames:
                    end_frame = total_frames

                # Set video capture to the start frame
                video_capture.set(cv2.CAP_PROP_POS_FRAMES, start_frame)
                frame_count = start_frame

                while frame_count < end_frame:
                    ret, frame = video_capture.read()
                    if not ret:
                        break

                    frame_count += 1

                    if self.specific_person_present and frame_count % frame_skip > 0:
                        # Update trackers
                        self._update_trackers(frame, func)
                    else:
                        # Create new tracers
                        if self._process_first_tracker(frame, func) < 0:
                            continue

                    # Display current frame number / total frames at the top-left corner
                    cv2.putText(frame, f"Frame: {frame_count}/{total_frames}", (10, 30),
                                cv2.FONT_HERSHEY_SIMPLEX, 0.8, (255, 255, 255), 2)

                    # Output or display video
                    if self.display_video:
                        cv2.imshow('Video', frame)
                        if cv2.waitKey(1) & 0xFF == ord('q'):
                            break

                    if video_writer:
                        video_writer.write(frame)

            # Cleanup
            video_capture.release()
            if video_writer:
                video_writer.release()
            if self.display_video:
                cv2.destroyAllWindows()

        return wrapper

    def get_video_fps(self):
        video_capture = cv2.VideoCapture(self.target_video)
        return video_capture.get(cv2.CAP_PROP_FPS)

    def _update_trackers(self, frame, func):

        new_trackers = []
        new_face_names = []
        new_face_similarities = []

        for tracker, name, similarity in zip(self.trackers, self.face_names, self.face_similarities):
            success, tracker_bbox = tracker.update(frame)
            if success:
                tracker_bbox = tuple(map(int, tracker_bbox))
                new_trackers.append(tracker)
                new_face_names.append(name)
                new_face_similarities.append(similarity)

                # Annotate frame and apply face swap if needed
                self._annotate_frame(tracker_bbox, frame, name, similarity, func)

            elif name == "Specific Person":
                self.specific_person_present = False

        # Update trackers and face info
        self.trackers = new_trackers
        self.face_names = new_face_names
        self.face_similarities = new_face_similarities

    def _process_first_tracker(self, frame, func)-> int:

        # Attempt to detect Specific Person in every frame
        self.trackers = []
        self.face_names = []
        self.face_similarities = []

        # Detect faces and extract embeddings
        faces = self.app.get(frame)                        
        if not faces:
            return -1  # No faces detected, skip to next frame

        similarities = self._calculate_similarities(faces)
        max_similarity = max(similarities, default=0)

        # Determine if the Specific Person is detected
        specific_person_index = similarities.index(max_similarity) if max_similarity > self.tolerance else None
        self.specific_person_present = specific_person_index is not None

        if self.specific_person_present:
            # Initialize trackers and annotate frame for Specific Person
            self._initialize_trackers(faces, frame, similarities, specific_person_index, func)
        else:
            
            for idx_face, face in enumerate(faces):
                bbox = face.bbox.astype(int)

                name = "Candidate" if similarities[idx_face] > self.tolerance else "Unknown"

                # Since we are not tracking, we do not initialize trackers
                # Annotate frame without applying face swap
                x1, y1, x2, y2 = bbox
                tracker_bbox = (x1, y1, x2 - x1, y2 - y1)
                                
                # Pass func=None to indicate no face swap should be applied
                self._annotate_frame(tracker_bbox, frame, name, similarities[idx_face], func=None)

        return 1

    def _initialize_trackers(self, faces, frame, similarities, specific_person_index, func):

        for idx, face in enumerate(faces):

            bbox = face.bbox.astype(int)

            if similarities[idx] > self.tolerance:                
                name = "Specific Person" if idx == specific_person_index else "Candidate"
            else:
                name = "Unknown"

            # Initialize tracker
            tracker = self._create_tracker(frame, bbox)
                                
            self.trackers.append(tracker)
            self.face_names.append(name)
            self.face_similarities.append(similarities[idx])

            # Annotate frame and apply face swap if needed
            x1, y1, x2, y2 = bbox
            tracker_bbox = (x1, y1, x2 - x1, y2 - y1)
            
            self._annotate_frame(tracker_bbox, frame, name, similarities[idx], func)

    def _annotate_frame(self, bbox, frame, name, similarity, func):

        left, top, width, height = map(int, bbox)
        expand_ratio = 0.3
        expand_width = int(width * expand_ratio)
        expand_height = int(height * expand_ratio)

        expanded_left = max(0, left - expand_width)
        expanded_top = max(0, top - expand_height)

        frame_height, frame_width, _ = frame.shape
        expanded_right = min(frame_width, left + width + expand_width)
        expanded_bottom = min(frame_height, top + height + expand_height)

        # Extract face region
        face_region = frame[expanded_top:expanded_bottom, expanded_left:expanded_right]

        if name == "Specific Person" and func:
            # Apply face swap
            swap_image = func(face_region)
            # Replace the face region with the swapped image
            if swap_image is not None:
                resized_swap_image = cv2.resize(swap_image, (expanded_right - expanded_left, expanded_bottom - expanded_top))
                frame[expanded_top:expanded_bottom, expanded_left:expanded_right] = resized_swap_image
            color = (0, 0, 255)  # Red
        elif name == "Candidate":
            color = (255, 0, 0)  # Blue
        else:
            color = (0, 255, 0)  # Green

        if self.display_rectangle:
            # Draw rectangle and annotations
            cv2.rectangle(frame, (expanded_left, expanded_top), (expanded_right, expanded_bottom), color, 2)
            cv2.putText(frame, name, (expanded_left, expanded_bottom + 20), cv2.FONT_HERSHEY_SIMPLEX, 0.6, color, 2)
            cv2.putText(frame, f"Similarity: {similarity:.2f}", (expanded_left, expanded_bottom + 45), cv2.FONT_HERSHEY_SIMPLEX, 0.6, color, 2)

    def _time_str_to_seconds(self, time_str):
        # Convert "mm:ss" format to total seconds
        minutes, seconds = map(int, time_str.split(':'))
        total_seconds = minutes * 60 + seconds
        return total_seconds

### 5-3. Face Recognition Example

In [None]:
# Set the base image and target video
base_image = "test_hanni2.jpg" # 뉴진스 하니 얼굴 사진 확보 필요
target_video = "hanni.mp4" # 추출 : https://www.youtube.com/watch?v=ek05M8eCk7M 

# Define the segments to process (list of tuples with start time and duration in seconds)
segments = [("00:00", 10), ("01:00", 15)]  # Process from 0:00 for 10 seconds, and from 1:00 for 15 seconds

# Create an instance of VideoFaceSwapper with the segments
bypass = VideoFaceProcessor(
            base_image, 
            target_video,
            display_video=True, 
            display_rectangle=True, 
            segments=segments,
            )

@bypass.video_swap
def recognize_faces(face_region):
    return None

# Start the face swap process
recognize_faces()

### 5-4. Face Swap Example

In [None]:
# Set the base image and target video
base_image = "test_hanni2.jpg" # 뉴진스 하니 얼굴 사진 확보 필요
target_video = "hanni.mp4" # 추출 : https://www.youtube.com/watch?v=ek05M8eCk7M 

# Define the segments to process (list of tuples with start time and duration in seconds)
segments = [("00:00", 10), ("01:00", 15)]  # Process from 0:00 for 10 seconds, and from 1:00 for 15 seconds

# Create an instance of VideoFaceSwapper with the segments
# If output_video ends with '.webm', it will be saved in WebM format.
swapper = VideoFaceProcessor(
            base_image, 
            target_video, 
            output_video="swapped_output.mp4", 
            display_video=True, 
            display_rectangle=True, 
            segments=segments,
            )

# Create an instance of FaceSwapper
face_swapper = FaceSwapper(det_size=(320, 320))

# Set the source face
source_image = "./faces/old_man.jpg"
success = face_swapper.set_source_face(source_image)
if not success:
    print("Failed to set source face.")
    exit()

@swapper.video_swap
def swap(face_region):
    swap_image = face_swapper.swap_faces_in_image(face_region)
    return swap_image if swap_image is not None else face_region

# Start the face swap&restore process
swap()

### 5-5. Face Swap & Face Restoring Example

In [None]:
# Set the base image and target video
base_image = "test_hanni2.jpg" # 뉴진스 하니 얼굴 사진 확보 필요
target_video = "hanni.mp4" # 추출 : https://www.youtube.com/watch?v=ek05M8eCk7M 

# Define the segments to process (list of tuples with start time and duration in seconds)
segments = [("00:00", 10), ("01:00", 15)]  # Process from 0:00 for 10 seconds, and from 1:00 for 15 seconds

# Create an instance of VideoFaceSwapper with the segments
# If output_video ends with '.webm', it will be saved in WebM format.
swapper = VideoFaceProcessor(
            base_image, 
            target_video, 
            output_video="restored_output.mp4", 
            display_video=True, 
            display_rectangle=True, 
            segments=segments,
            )

# Create an instance of FaceSwapper
face_swapper = FaceSwapper(det_size=(320, 320))

# Set the source face
source_image = "./faces/old_man.jpg"
success = face_swapper.set_source_face(source_image)
if not success:
    print("Failed to set source face.")
    exit()

@swapper.video_swap
def swap_n_restore(face_region):
    swap_image = face_swapper.swap_faces_in_image(face_region)
    if swap_image is not None:
        new_face = restore_face(swap_image)
        if new_face is None:
            new_face = swap_image
    else:
        new_face = face_region
    return new_face

# Start the face swap&restore process
swap_n_restore()