In [None]:
from utils.common import iter_posts, r, download_multiple_images, load_data, save_data
from config import DATA_PATH, IMG_PATH, SAVE_EVERY
from itertools import chain
from utils.align import FaceAligner
from insightface.app import FaceAnalysis
from utils.regexes import extract_ranking
import cv2


In [None]:
# Subreddit to scrape; the trailing note records another subreddit name.
subreddit = "lineups"  # ranked_girls

# Per-subreddit storage: one JSON file of records and one image folder.
data_path = DATA_PATH / f"{subreddit}.json"
img_path = IMG_PATH / subreddit
img_path.mkdir(parents=True, exist_ok=True)
data = load_data(data_path)

# Face analysis restricted to the detection module, plus the aligner that
# turns detections into cropped faces.
app = FaceAnalysis(name="buffalo_m", allowed_modules=["detection"])
app.prepare(ctx_id=0, det_size=(640, 640))
aligner = FaceAligner()


In [None]:
def extract_multiple_faces(imgs):
    """Detect and align exactly one face in each image of a gallery.

    Args:
        imgs: iterable of ``(image, url)`` pairs.

    Returns:
        A tuple ``(aligned, detections, urls)`` of three parallel lists, one
        entry per input image. If any image does not yield exactly one
        detection, the whole gallery is rejected and three empty lists are
        returned instead.
    """
    collected = []

    for image, source_url in imgs:
        found = app.get(image)
        # All-or-nothing: a single ambiguous image invalidates the gallery.
        if len(found) != 1:
            return [], [], []
        face_crop = aligner.from_insight_face(image, found)[0]
        collected.append((face_crop, found[0], source_url))

    if not collected:
        return [], [], []

    aligned, detections, urls = (list(column) for column in zip(*collected))
    return aligned, detections, urls


In [None]:
# Posts to process from the selected subreddit.
# NOTE(review): 5_000 is presumably a cap on how many posts are fetched —
# confirm against iter_posts.
posts = iter_posts(subreddit, 5_000)


In [None]:
# Scrape loop: for every usable post, collect the rankings found in its title
# and comments, extract the faces from its image(s), store a record in `data`
# and write the aligned crops to `img_path / post.id`.
#
# Checkpointing counts posts that were actually stored (not every post seen),
# so `data` is written to disk once per SAVE_EVERY new records, plus one final
# time after the loop for any remainder.
num_added = 0

for post in posts:
    # Skip posts without comments, locked/removed posts, and posts already stored.
    if (
        post.num_comments == 0
        or post.locked
        or post.removed_by_category
        or post.id in data
    ):
        continue

    ratings = []

    # The title is only recorded when a ranking could be extracted from it.
    if author_rating := extract_ranking(post.title):
        ratings.append(
            dict(
                values=author_rating,
                author=getattr(post.author, "name", None),
                text=post.title,
            )
        )

    # Every comment is recorded, whether or not a ranking was found in it.
    for comment in post.comments:
        ratings.append(
            dict(
                values=extract_ranking(comment.body),
                author=getattr(comment.author, "name", None),
                text=comment.body,
            )
        )

    # Largest value across all extracted rankings; used below as the number
    # of faces the post is expected to contain. Empty rankings are ignored.
    num_faces = max(
        chain.from_iterable(filter(None, map(lambda x: x["values"], ratings))),
        default=0,
    )

    if not num_faces:
        continue

    # Best effort: a post whose images cannot be downloaded is skipped.
    try:
        imgs = download_multiple_images(post)
    except Exception:
        continue

    if not imgs:
        continue

    if len(imgs) == 1:
        # Single image: keep at most `num_faces` detections, ordered by the
        # first bbox coordinate (presumably left-to-right — confirm).
        img, url = imgs[0]
        detections = app.get(img)[:num_faces]
        if not detections:
            # No face found: skip, matching the multi-image branch, instead
            # of storing a record that has no faces or crops.
            continue
        detections.sort(key=lambda x: x["bbox"][0])
        aligned = aligner.from_insight_face(img, detections)
        urls = [url]
    else:
        # Gallery: one image per ranked face is required.
        if len(imgs) != num_faces:
            continue

        aligned, detections, urls = extract_multiple_faces(imgs)

        if not aligned:
            continue

    data[post.id] = dict(
        post_id=post.id,
        post_author=getattr(post.author, "name", None),
        ratings=ratings,
        urls=urls,
        detections=detections,
    )

    dir_path = img_path / post.id
    dir_path.mkdir(exist_ok=True)

    for j, img in enumerate(aligned):
        # cv2.imwrite expects BGR channel order; the crops are treated as RGB here.
        img = cv2.cvtColor(img, cv2.COLOR_RGB2BGR)
        cv2.imwrite(str(dir_path / f"{j}.jpg"), img)

    # Checkpoint once every SAVE_EVERY stored posts. The original condition
    # (`if i % SAVE_EVERY:`) was inverted and saved on almost every iteration.
    num_added += 1
    if num_added % SAVE_EVERY == 0:
        save_data(data_path, data)

# Persist records added since the last checkpoint so the tail is not lost.
if num_added % SAVE_EVERY:
    save_data(data_path, data)
