## 1. Downloading Videos from YouTube

Here, we download videos from YouTube. In the `group_urls_folder`, create a `.txt` file with the URLs of YouTube videos you want downloaded. Each URL should be on a new line. The saved YouTube videos in WEBM format are saved into the `download_folder`.

In [1]:
import os
import re
import shutil
import yt_dlp

def download_video(url, output_path):
    ydl_opts = {
        'format': 'bestvideo[ext=webm]+bestaudio[ext=webm]/best',
        'outtmpl': os.path.join(output_path, '%(title)s.%(ext)s'),
    }
    with yt_dlp.YoutubeDL(ydl_opts) as ydl:
        ydl.download([url])

def sanitize_file_name(name):
    """Remove all characters except a-z, A-Z, and periods from a file name"""
    return re.sub(r'[^a-zA-Z\.]', '', name)

group_urls_folder = "group_urls"
download_folder = "downloaded_videos"

# Iterate through text files in the group_urls folder
for text_file in os.listdir(group_urls_folder):
    if text_file.endswith(".txt"):
        group_name = os.path.splitext(text_file)[0]
        group_download_folder = os.path.join(download_folder, group_name)

        # Create the group download folder if it doesn't exist
        if not os.path.exists(group_download_folder):
            os.makedirs(group_download_folder)

        with open(os.path.join(group_urls_folder, text_file), "r") as f:
            video_urls = f.readlines()

        # Download the videos
        for i, video_url in enumerate(video_urls):
            video_url = video_url.strip()

            # Download the video
            download_video(video_url, group_download_folder)

        # Sanitize the file names
        for file_name in os.listdir(group_download_folder):
            if file_name.endswith(".webm"):
                sanitized_file_name = sanitize_file_name(file_name)
                if sanitized_file_name != file_name:
                    os.rename(os.path.join(group_download_folder, file_name), os.path.join(group_download_folder, sanitized_file_name))


[youtube] Extracting URL: https://www.youtube.com/watch?v=cU0JrSAyy7o
[youtube] cU0JrSAyy7o: Downloading webpage
[youtube] cU0JrSAyy7o: Downloading android player API JSON
[info] cU0JrSAyy7o: Downloading 1 format(s): 315+251
[dashsegments] Total fragments: 62
[download] Destination: downloaded_videos\choom\[BE ORIGINAL] IVE(아이브) 'I AM' (4K).f315.webm
[download] 100% of  618.43MiB in 00:00:41 at 14.99MiB/s                  
[dashsegments] Total fragments: 1
[download] Destination: downloaded_videos\choom\[BE ORIGINAL] IVE(아이브) 'I AM' (4K).f251.webm
[download] 100% of    3.33MiB in 00:00:00 at 13.85MiB/s              
[Merger] Merging formats into "downloaded_videos\choom\[BE ORIGINAL] IVE(아이브) 'I AM' (4K).webm"
Deleting original file downloaded_videos\choom\[BE ORIGINAL] IVE(아이브) 'I AM' (4K).f251.webm (pass -k to keep)
Deleting original file downloaded_videos\choom\[BE ORIGINAL] IVE(아이브) 'I AM' (4K).f315.webm (pass -k to keep)
[youtube] Extracting URL: https://www.youtube.com/watch?v=ql

## 2. File Sanitization

To ensure smooth processing and to avoid any interference from weird symbols, we sanitize the file names in this section.

In [2]:
import os
import re

def is_valid_char(c):
    return c.isalnum() and c.isascii() or c == '.'

def sanitize_filename(filename):
    return ''.join([c if is_valid_char(c) else '' for c in filename])

input_dir = "downloaded_videos"

for filename in os.listdir(input_dir):
    if filename.endswith(".webm"):
        old_path = os.path.join(input_dir, filename)
        new_filename = sanitize_filename(filename)
        new_path = os.path.join(input_dir, new_filename)
        os.rename(old_path, new_path)
        print(f"Renamed {old_path} to {new_path}")


Renamed downloaded_videos/choom\BEORIGINALaespaGirlsK.webm to downloaded_videos/choom\BEORIGINALaespaGirlsK.webm
Renamed downloaded_videos/choom\BEORIGINALaespaNextLevelK.webm to downloaded_videos/choom\BEORIGINALaespaNextLevelK.webm
Renamed downloaded_videos/choom\BEORIGINALBilllieRINGmaBellK.webm to downloaded_videos/choom\BEORIGINALBilllieRINGmaBellK.webm
Renamed downloaded_videos/choom\BEORIGINALEVERGLOWDUNDUNK.webm to downloaded_videos/choom\BEORIGINALEVERGLOWDUNDUNK.webm
Renamed downloaded_videos/choom\BEORIGINALEVERGLOWLADIDAK.webm to downloaded_videos/choom\BEORIGINALEVERGLOWLADIDAK.webm
Renamed downloaded_videos/choom\BEORIGINALGFRIENDMAGOK.webm to downloaded_videos/choom\BEORIGINALGFRIENDMAGOK.webm
Renamed downloaded_videos/choom\BEORIGINALGIDLEOhmygodK.webm to downloaded_videos/choom\BEORIGINALGIDLEOhmygodK.webm
Renamed downloaded_videos/choom\BEORIGINALGIDLETOMBOYK.webm to downloaded_videos/choom\BEORIGINALGIDLETOMBOYK.webm
Renamed downloaded_videos/choom\BEORIGINALITZY...I

## 3. Face Detection and Upscaling

This section of the code navigates through each directory in the `downloaded-videos` folder. For each `.WEBM` file, it creates a new directory storing the detected faces in the video.

### Requirements:

To run this code, you need two models in your working directory:
- `shape_predictor_5_face_landmarks.dat`: This model enables the face detection capability.
- `FSRCNN_x2.pb`: This model enables the 2x upscaling of detected faces, allowing for faster processing and good quality images.

### Variables for Modification:

Here are a few variables that you may want to modify:

- `every_n_frames`: Originally set to `1`, this means that face detection will be run on every frame of the video.
- `max_workers`: Originally set to `4` as my CPU has 4 cores. This code can run in parallel, so increase or decrease this variable according to your system's capabilities.
- `padding_factor`: Originally set to `1.3` which is a reasonable value, adjust this to modify the size of the captured image. Decrease to zoom into the face more, increase to capture a larger facial image outside of the face.

In [3]:
import cv2
import dlib
import math
import numpy as np
from pathlib import Path
import random
import string
from concurrent.futures import ThreadPoolExecutor, as_completed
import os
from tqdm import tqdm


def filter_images(image):
    detector = dlib.get_frontal_face_detector()
    if image is not None and image.shape[0] > 511 and image.shape[1] > 511:
        gray_image = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
        faces = detector(gray_image, upsample_num_times=0)
        return len(faces) > 0, gray_image
    return False, None


def face_score(gray_image):
    detector = dlib.get_frontal_face_detector()
    faces, scores, _ = detector.run(gray_image, upsample_num_times=0)
    return faces, scores


def find_eyes(image, face):
    predictor = dlib.shape_predictor('shape_predictor_5_face_landmarks.dat')

    gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
    shape = predictor(gray, face)
    left_eye = ((shape.part(3).x + shape.part(2).x) // 2, (shape.part(3).y + shape.part(2).y) // 2)
    right_eye = ((shape.part(1).x + shape.part(0).x) // 2, (shape.part(1).y + shape.part(0).y) // 2)
    return left_eye, right_eye


def upscale_image_fsrcnn(image, model_path):
    sr = cv2.dnn_superres.DnnSuperResImpl_create()
    sr.readModel(model_path)
    sr.setModel("fsrcnn", 2)
    upscaled_image = sr.upsample(image)
    return upscaled_image


def center_eyes(img, model_path="FSRCNN_x2.pb"):
    detector = dlib.get_frontal_face_detector()
    
    gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
    faces = detector(gray, upsample_num_times=0)
    if len(faces) == 0:
        return None
    face = faces[0]
    left_eye, right_eye = find_eyes(img, face)
    # rest of the function remains unchanged

    center = ((left_eye[0] + right_eye[0]) // 2, (left_eye[1] + right_eye[1]) // 2)
    dy = right_eye[1] - left_eye[1]
    dx = right_eye[0] - left_eye[0]
    angle = math.degrees(math.atan2(dy, dx))
    height, width, _ = img.shape
    eye_distance = int(math.sqrt((right_eye[0] - left_eye[0]) ** 2 + (right_eye[1] - left_eye[1]) ** 2))


    M_rotate = cv2.getRotationMatrix2D(center, angle, 1)
    rotated_img = cv2.warpAffine(img, M_rotate, (width, height))

    rotated_center = tuple(map(int, M_rotate.dot(np.array([center[0], center[1], 1]))))

    dx_translation = width // 2 - rotated_center[0]
    dy_translation = height // 2 - rotated_center[1]
    M_translate = np.float32([[1, 0, dx_translation], [0, 1, dy_translation]])
    translated_img = cv2.warpAffine(rotated_img, M_translate, (width, height), flags=cv2.INTER_LANCZOS4)
    reference_eye_distance = 260
    scaling_factor = reference_eye_distance / eye_distance

    scaled_height = int(height * scaling_factor)
    scaled_width = int(width * scaling_factor)
    
    if scaling_factor > 1:
        translated_img = upscale_image_fsrcnn(translated_img, model_path)

    scaled_img = cv2.resize(translated_img, (scaled_width, scaled_height), interpolation=cv2.INTER_LANCZOS4)  # Use cv2.INTER_LANCZOS4 interpolation


    center_y = scaled_height // 2
    center_x = scaled_width // 2
    cropped_img = scaled_img[center_y - 512:center_y + 512, center_x - 512:center_x + 512]
    return cropped_img



def crop_face(image, face):
    left_eye, right_eye = find_eyes(image, face)
    return center_eyes(image, left_eye, right_eye)


def convert_to_png(image, faces, output_path):
    for face in faces:
        cropped_face = crop_face(image, face)
        if cropped_face.shape[0] < 512 or cropped_face.shape[1] < 512:
            continue
        face_area = (face.bottom() - face.top()) * (face.right() - face.left())
        random_suffix = ''.join(random.choices(string.ascii_letters, k=4))
        file_name = f'{face_area}_{random_suffix}.png'
        output_image_path = output_path / file_name
        cv2.imwrite(str(output_image_path), cropped_face)


def process_single_image(input_image_path, output_dir):
    detector = dlib.get_frontal_face_detector()

    image = cv2.imread(str(input_image_path))
    filtered, gray_image = filter_images(image)
    if filtered:
        relative_path = input_image_path.parent.relative_to(input_dir)
        output_subdir = output_dir / relative_path
        output_subdir.mkdir(parents=True, exist_ok=True)
        faces = detector(gray_image, upsample_num_times=0)
        convert_to_png(image, faces, output_subdir)

def process_video(video_path, output_path, every_n_frames=1,
                  score_threshold=-2, model_path="FSRCNN_x2.pb", padding_factor=1.3):
    detector = dlib.get_frontal_face_detector()
    cap = cv2.VideoCapture(str(video_path))
    if not cap.isOpened():
        print("Error opening the video file.")
        return
    frame_count = 0
    while True:
        ret, frame = cap.read()
        if not ret:
            break
        if frame_count % every_n_frames == 0:
            gray_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
            faces, scores, _ = detector.run(gray_frame, upsample_num_times=0)
            if len(faces) > 0:
                for face in faces:
                    x1, y1, x2, y2 = face.left(), face.top(), face.right(), face.bottom()
                    width, height = x2 - x1, y2 - y1

                    if width > 512 and height > 512:  # Updated condition
                        pad_x = int(width * padding_factor)
                        pad_y = int(height * padding_factor)
                        x1 = max(0, x1 - pad_x)
                        y1 = max(0, y1 - pad_y)
                        x2 = min(frame.shape[1], x2 + pad_x)
                        y2 = min(frame.shape[0], y2 + pad_y)
                        face_img = frame[y1:y2, x1:x2]

                        centered_face_img = center_eyes(face_img, model_path)
                        random_suffix = ''.join(random.choices(string.ascii_letters, k=4))
                        face_area = (y2 - y1) * (x2 - x1)
                        face_output_path = output_path / f"{face_area}_frame{frame_count}_{random_suffix}.png"
                        if centered_face_img is not None:
                            cv2.imwrite(str(face_output_path), centered_face_img)


        frame_count += 1
    cap.release()



def process_videos_in_parallel(input_dir, output_dir, max_workers=2):
    input_dir = Path(input_dir)
    output_dir = Path(output_dir)

    video_paths = list(input_dir.glob("**/*.webm"))
    video_paths.extend(input_dir.glob("**/*.mp4"))
    video_paths.extend(input_dir.glob("**/*.avi"))
    video_paths.extend(input_dir.glob("**/*.mkv"))

    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = {}
        for video_path in video_paths:
            relative_path = video_path.parent.relative_to(input_dir)
            output_subdir = output_dir / relative_path
            video_name = video_path.stem
            output_path = output_subdir / video_name

            # Create output directories if they don't exist
            output_path.mkdir(parents=True, exist_ok=True)

            future = executor.submit(process_video, video_path, output_path)
            futures[future] = video_path

        for future in tqdm(as_completed(futures), total=len(futures), desc="Processing videos", unit="video"):
            video_path = futures[future]
            try:
                future.result()
            except Exception as e:
                print(f"Error processing video {video_path}: {e}")


input_dir = "downloaded_videos"
output_dir = "kpop-faces"
process_videos_in_parallel(input_dir, output_dir, max_workers=2)

Processing videos:  20%|██        | 6/30 [20:40:59<49:31:11, 7427.99s/video]  