<a href="https://colab.research.google.com/github/Linaqruf/sd-notebook-collection/blob/main/booru-i2i-scraper.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>


![](https://visitor-badge.glitch.me/badge?page_id=linaqruf.i2i-scraper) [![GitHub](https://badgen.net/badge/icon/github?icon=github&label)](https://github.com/Linaqruf/sd-notebook-collection/blob/main/booru-i2i-scraper.ipynb) 

# **Booru I2I Scraper**
Find Danbooru images that look similar to an input image and download them in bulk. Not perfect, but usable.<br>

<details>
  <summary><big>Support Us!</big></summary>
    <ul>
      <li>
        <a href="https://ko-fi.com/linaqruf">
          <img src="https://img.shields.io/badge/Support%20me%20on%20Ko--fi-F16061?logo=ko-fi&logoColor=white&style=flat" alt="Ko-fi badge">
        </a>
      </li>
      <li>
        <a href="https://saweria.co/linaqruf">
          <img src="https://img.shields.io/badge/Saweria-7B3F00?style=flat&logo=ko-fi&logoColor=white" alt="Saweria badge">
        </a>
      </li>
    </ul>
</details>

In [None]:
#@title Install Dependencies
import os
from IPython.utils import capture

# Install the Python packages this notebook needs, then kill the current
# process (the final message announces a restart).
print("[1;32mInstalling...")
print("[1;32mPlease wait...\n")
# capture_output() swallows everything the pip commands print inside the block.
with capture.capture_output() as cap:
  !pip -q install opencv-python tensorflow faiss-gpu huggingface_hub "gradio==3.16.2" gallery-dl
  # Remove the installed Pillow and install a release >= 9.1.0 in its place.
  !pip -q uninstall -y Pillow
  !pip -q install "pillow>=9.1.0"
  # The captured output is never inspected, so drop the reference.
  del cap
print("[1;32mDone! Restarting...")

# Send signal 9 to this very process. Presumably Colab then restarts the
# runtime so the freshly installed packages are picked up — TODO confirm.
os.kill(os.getpid(), 9)

In [None]:
#@title Launch
import argparse
import functools
import json
import os 
import zipfile
import shutil
from IPython.utils import capture
from tqdm import tqdm
from pathlib import Path
from tensorflow.keras.models import load_model
import faiss
import PIL.Image
import gradio as gr
import numpy as np

import requests
import tensorflow as tf
from huggingface_hub import hf_hub_download

# --- Local working directories -------------------------------------------
root_dir = "/content"
# NOTE: `dir` shadows the builtin of the same name; the name is kept because
# other parts of the notebook read it.
dir = os.path.join(root_dir, "danbooru2022_image_similarity")
utils_dir = os.path.join(dir, "Utils")
index_dir = os.path.join(dir, "index")
deps_dir = os.path.join(root_dir, "deps")
image_dir = os.path.join(root_dir, "images")

# --- Remote files of the Hugging Face Space ------------------------------
# URLs are joined with an explicit "/" rather than os.path.join, which would
# insert the operating system's path separator.
repo_url = "https://huggingface.co/spaces/SmilingWolf/danbooru2022_image_similarity"
app_py = f"{repo_url}/resolve/main/app.py"
index_dir_files = [
    f"{repo_url}/resolve/main/index/{index_file}"
    for index_file in ("cosine_ids.npy", "cosine_infos.json", "cosine_knn.index")
]
utils = f"{repo_url}/resolve/main/Utils/dbimutils.py"

# --- Text shown at the top of the Gradio page ----------------------------
TITLE = "## Danbooru Explorer"
DESCRIPTION = """
Image similarity-based retrieval tool using:
- [SmilingWolf/wd-v1-4-convnext-tagger-v2](https://huggingface.co/SmilingWolf/wd-v1-4-convnext-tagger-v2) as feature extractor
- [Faiss](https://github.com/facebookresearch/faiss) and [autofaiss](https://github.com/criteo/autofaiss) for indexing
"""
# Name of the model layer whose output is used as the image embedding.
CONV_FEXT_LAYER = "predictions_norm"

# --- Tagger model files to download --------------------------------------
files = ["keras_metadata.pb", "saved_model.pb", "selected_tags.csv"]
sub_dir = "variables"
sub_dir_files = ["variables.data-00000-of-00001", "variables.index"]
csv_file = files[-1]
model_repo = "SmilingWolf/wd-v1-4-convnext-tagger-v2"
model_dir = "/content/wd14_tagger"

# Make sure every working directory exists before anything is written to it.
for directory in (dir, utils_dir, index_dir, deps_dir, image_dir):
  Path(directory).mkdir(parents=True, exist_ok=True)

def ubuntu_deps(url, name, dst):
  """Download a zip archive, unpack it into `dst`, install its contents with dpkg, then clean up.

  Args:
    url: Address of the zip archive; it is handed to wget as-is.
    name: File name the archive is opened (and later deleted) under, relative
      to the current working directory. Presumably this must equal the file
      name wget derives from `url`, since wget is given no output name —
      TODO confirm.
    dst: Directory the archive is extracted into. It is created if missing
      and removed again, with everything in it, before returning.
  """
  os.makedirs(dst, exist_ok=True)
  !wget -q --show-progress {url}
  with zipfile.ZipFile(name, 'r') as deps:
    deps.extractall(dst)
  # Install every extracted file in one dpkg invocation.
  !dpkg -i {dst}/*
  # Remove both the downloaded archive and the extraction directory.
  os.remove(name)
  shutil.rmtree(dst)

print("[1;32mInstalling...")
with capture.capture_output() as cap:
  !apt -y update -qq
  ubuntu_deps("https://huggingface.co/Linaqruf/fast-repo/resolve/main/ram_patch.zip", "ram_patch.zip", deps_dir)
  %env LD_PRELOAD=libtcmalloc.so
  ubuntu_deps("https://huggingface.co/Linaqruf/fast-repo/resolve/main/deb-libs.zip", "deb-libs.zip", deps_dir)
  del cap

print("[1;32mDownloading Danbooru Explorer...")
with capture.capture_output() as cap:
  for file in index_dir_files:
    index_basename = os.path.basename(file)
    !aria2c --console-log-level=error --summary-interval=10 -c -x 16 -k 1M -s 16 -d {index_dir} -o {index_basename} {file}
  !aria2c --console-log-level=error --summary-interval=10 -c -x 16 -k 1M -s 16 -d {dir} -o "app.py" {app_py}
  !aria2c --console-log-level=error --summary-interval=10 -c -x 16 -k 1M -s 16 -d {utils_dir} -o "dbimutils.py" {utils}
  del cap

init = """"""
with open(os.path.join(utils_dir, "__init__.py"), 'w') as f:
    f.write(init)

setup = """from setuptools import setup, find_packages
setup(name = "Utils", packages = find_packages())"""
with open(os.path.join(dir, "setup.py"), 'w') as f:
    f.write(setup)

os.chdir(dir)
print("[1;32mInstall setup.py...")
with capture.capture_output() as cap:
  !pip -q install .
  del cap

import Utils.dbimutils as dbimutils

# Download the tagger model only when model_dir does not exist yet; an existing
# directory is assumed to be complete (its contents are not checked).
if not os.path.exists(model_dir):
  print(f"[1;32mDownloading WD 14 Tagger model from {model_repo}...")
  with capture.capture_output() as cap:
    # Top-level model files: cache_dir is model_dir and force_filename is the
    # plain file name. Presumably this makes the files land directly in
    # model_dir — verify against the installed huggingface_hub version.
    for file in files:
      hf_hub_download(model_repo, file, cache_dir=model_dir, force_download=True, force_filename=file)
    # Files from the repo's "variables" subfolder use model_dir/variables as cache_dir.
    for file in sub_dir_files:
      hf_hub_download(model_repo, file, subfolder=sub_dir, cache_dir=os.path.join(
          model_dir, sub_dir), force_download=True, force_filename=file)
    del cap
    
def load_model(model_path, feature_extraction_layer):
    """Build a feature extractor from a saved Keras model.

    The model stored at ``model_path`` is loaded in full, and a new Keras
    model is returned that shares its inputs but ends at the output of the
    layer named ``feature_extraction_layer``.
    """
    loaded = tf.keras.models.load_model(model_path)
    model_inputs = loaded.inputs
    feature_output = loaded.get_layer(feature_extraction_layer).output
    return tf.keras.models.Model(model_inputs, feature_output)

def danbooru_id_to_url(image_id, selected_ratings, api_username="", api_key=""):
    """Resolve a Danbooru post id to the URL of its large image file.

    Args:
        image_id: Numeric id of the Danbooru post.
        selected_ratings: Iterable of rating names to accept; each must be one
            of "General", "Sensitive", "Questionable" or "Explicit".
        api_username: Danbooru login. Credentials are only sent when both
            this and ``api_key`` are non-empty.
        api_key: Danbooru API key.

    Returns:
        The post's ``large_file_url``, or ``None`` when the request fails,
        the response is not valid JSON, the post has no such field, or the
        post's rating is not among ``selected_ratings``.

    Raises:
        KeyError: if ``selected_ratings`` contains an unknown rating name.
    """
    headers = {"User-Agent": "image_similarity_tool"}
    ratings_to_letters = {
        "General": "g",
        "Sensitive": "s",
        "Questionable": "q",
        "Explicit": "e",
    }

    acceptable_ratings = {ratings_to_letters[x] for x in selected_ratings}

    # Credentials go through `params` so that requests URL-encodes them
    # instead of them being pasted verbatim into the query string.
    params = {}
    if api_username != "" and api_key != "":
        params = {"api_key": api_key, "login": api_username}

    try:
        r = requests.get(
            f"https://danbooru.donmai.us/posts/{image_id}.json",
            params=params,
            headers=headers,
            # Without a timeout a stalled connection would block forever.
            timeout=30,
        )
    except requests.RequestException:
        # A single unreachable post is treated like any other unavailable
        # post rather than aborting the caller's whole batch.
        return None

    if r.status_code != 200:
        return None

    try:
        content = json.loads(r.text)
    except ValueError:
        return None

    if content.get("rating") not in acceptable_ratings:
        return None
    return content.get("large_file_url")

class SimilaritySearcher:
    """Find Danbooru posts that look similar to a query image and download them.

    Attributes are set in ``__init__``:
      - ``model``: callable feature extractor; ``model.inputs[0].shape`` is
        read to get the input size and ``model(image).numpy()`` yields the
        embedding.
      - ``images_ids``: array mapping a Faiss result position to a post id.
      - ``knn_index`` / ``knn_metric``: the currently loaded Faiss index and
        the metric name it was loaded for (both ``None`` until first use).
    """

    def __init__(self, model, images_ids):
        self.knn_index = None
        self.knn_metric = None

        self.model = model
        self.images_ids = images_ids

    def change_index(self, knn_metric):
        """Load the Faiss index for ``knn_metric`` unless it is already active.

        The index and its parameter file are read from the ``index``
        directory relative to the current working directory.

        Raises:
            ValueError: if ``knn_metric`` is neither ``"ip"`` nor ``"cosine"``.
        """
        if knn_metric == self.knn_metric:
            return

        if knn_metric not in ("ip", "cosine"):
            raise ValueError(f"Unsupported kNN metric: {knn_metric!r}")

        knn_index = faiss.read_index(f"index/{knn_metric}_knn.index")
        with open(f"index/{knn_metric}_infos.json") as infos_file:
            config = json.load(infos_file)["index_param"]

        faiss.ParameterSpace().set_index_parameters(knn_index, config)

        # Only publish the new index once it is fully configured, so a
        # failure above cannot leave index and metric out of sync.
        self.knn_index = knn_index
        self.knn_metric = knn_metric

    def predict(
        self, image, selected_ratings, knn_metric, api_username, api_key, n_neighbours
    ):
        """Search for similar posts, download their images and return gallery items.

        Args:
            image: PIL image to search with; ``None`` yields an empty result.
            selected_ratings: Rating names passed on to ``danbooru_id_to_url``.
            knn_metric: Metric name passed to ``change_index``.
            api_username: Danbooru login passed on to ``danbooru_id_to_url``.
            api_key: Danbooru API key passed on to ``danbooru_id_to_url``.
            n_neighbours: Number of nearest neighbours to request.

        Returns:
            A list of ``(image_url, caption)`` tuples, one per neighbour whose
            URL could be resolved; the caption is ``"<post id>/<distance>"``.

        Side effects:
            Writes the resolved URLs to ``scrape_this.txt`` under ``root_dir``
            and, if there are any, downloads them into ``image_dir`` with
            aria2c.
        """
        if image is None:
            # Nothing was supplied, so there is nothing to search for.
            return []

        _, height, _, _ = self.model.inputs[0].shape

        self.change_index(knn_metric)

        # Alpha to white
        image = image.convert("RGBA")
        new_image = PIL.Image.new("RGBA", image.size, "WHITE")
        new_image.paste(image, mask=image)
        image = new_image.convert("RGB")
        image = np.asarray(image)

        # PIL RGB to OpenCV BGR
        image = image[:, :, ::-1]

        image = dbimutils.make_square(image, height)
        image = dbimutils.smart_resize(image, height)
        image = image.astype(np.float32)
        image = np.expand_dims(image, 0)
        target = self.model(image).numpy()

        if self.knn_metric == "cosine":
            faiss.normalize_L2(target)

        # The UI may deliver the count as a float; Faiss needs an integer k.
        dists, indexes = self.knn_index.search(target, k=int(n_neighbours))

        # URLs and captions are collected together so that a neighbour that
        # is filtered out drops its caption too and the pairs stay aligned.
        image_urls = []
        captions = []
        for index, dist in zip(indexes[0], dists[0]):
            if index < 0:
                # Faiss pads the result with -1 when it finds fewer than k
                # neighbours; such entries do not refer to a real post.
                continue
            image_id = int(self.images_ids[index])
            current_url = danbooru_id_to_url(
                image_id, selected_ratings, api_username, api_key
            )
            if current_url is None:
                continue
            image_urls.append(current_url)
            captions.append(f"{image_id}/{dist:.2f}")

        scraper_text = os.path.join(root_dir, "scrape_this.txt")
        with open(scraper_text, 'w') as f:
            f.write('\n'.join(image_urls))

        if image_urls:
            # Skip the downloader when there is nothing to fetch.
            !aria2c --console-log-level=error --summary-interval=10 -c -x 16 -k 1M -s 16 -d {image_dir} -i {scraper_text}

        return list(zip(image_urls, captions))


def main():
    """Set up TensorFlow, build the searcher and serve the Gradio interface.

    Uses the first GPU (with memory growth enabled) when one is present and
    prints a notice otherwise. The feature model and the post-id array are
    loaded, wrapped in a ``SimilaritySearcher``, and wired to a Gradio Blocks
    page that is launched with a public share link.
    """
    detected_gpus = tf.config.list_physical_devices('GPU')
    if not detected_gpus:
        print("No GPU found, using CPU instead.")
    else:
        first_gpu = detected_gpus[0]
        tf.config.set_visible_devices(first_gpu, 'GPU')
        tf.config.experimental.set_memory_growth(first_gpu, True)

    feature_model = load_model(model_dir, CONV_FEXT_LAYER)
    post_ids = np.load("index/cosine_ids.npy")
    searcher = SimilaritySearcher(model=feature_model, images_ids=post_ids)

    rating_names = ["General", "Sensitive", "Questionable", "Explicit"]

    with gr.Blocks() as app:
        # Page header.
        gr.Markdown(TITLE)
        gr.Markdown(DESCRIPTION)

        # Query image on the left, search options on the right.
        with gr.Row():
            query_image = gr.Image(type="pil", label="Input")
            with gr.Column():
                with gr.Row():
                    username_box = gr.Textbox(label="Danbooru API Username")
                    key_box = gr.Textbox(label="Danbooru API Key")
                ratings_group = gr.CheckboxGroup(
                    choices=rating_names,
                    value=rating_names[:2],
                    label="Ratings",
                )
                with gr.Row():
                    # Only one metric is offered, so the selector stays hidden.
                    metric_radio = gr.Radio(
                        choices=["cosine"],
                        value="cosine",
                        label="Metric selection",
                        visible=False,
                    )
                    count_slider = gr.Slider(
                        minimum=1, maximum=500, value=5, step=1, label="# of images"
                    )
                search_button = gr.Button("Find similar images")

        # Result gallery below the controls.
        gallery = gr.Gallery(label="Similar images")
        gallery.style(grid=5)

        # The order of these components matches predict()'s parameters.
        predict_inputs = [
            query_image,
            ratings_group,
            metric_radio,
            username_box,
            key_box,
            count_slider,
        ]
        search_button.click(
            fn=searcher.predict,
            inputs=predict_inputs,
            outputs=[gallery],
        )

    app.queue()
    app.launch(share=True, debug=True)

if __name__ == "__main__":
    main()

In [None]:
#@title  ## 📝 Download Images
#@markdown Download file manually from files tab or save to Google Drive
# Work from /content so the archive stores paths relative to it ("images/...").
%cd /content/

# Recursively pack the images directory into /content/images.zip.
!zip -r /content/images.zip images

from pydrive.auth import GoogleAuth
from google.colab import drive
from pydrive.drive import GoogleDrive
from google.colab import auth
from oauth2client.client import GoogleCredentials
def create_folder(folder_name):
    """Return the id of the Drive folder titled ``folder_name``, creating it if needed.

    Relies on the module-level ``drive`` being a ``GoogleDrive`` client by
    the time this is called (the upload code binds it before calling).

    Args:
        folder_name: Title of the folder to look up or create.

    Returns:
        The id of the first non-trashed folder with that title, or the id of
        the newly created folder when none exists.
    """
    # Drive query values are delimited by single quotes, so quotes and
    # backslashes inside the title must be backslash-escaped; otherwise a
    # name such as "Ayame's pics" breaks or alters the query.
    escaped_name = folder_name.replace("\\", "\\\\").replace("'", "\\'")
    query = "title='{}' and mimeType='application/vnd.google-apps.folder' and trashed=false".format(escaped_name)

    file_list = drive.ListFile({'q': query}).GetList()
    if file_list:
        # Folder exists
        print('Debug: Folder exists')
        return file_list[0]['id']

    print('Debug: Creating folder')
    file = drive.CreateFile({'title': folder_name, 'mimeType': 'application/vnd.google-apps.folder'})
    file.Upload()
    return file.attr['metadata']['id']
# Upload file to Google Drive
def upload_file(file_name, folder_id, save_as):
    """Upload a local file into a Drive folder and make it readable by anyone.

    Relies on the module-level ``drive`` being a ``GoogleDrive`` client by
    the time this is called (the upload code binds it before calling).

    Args:
        file_name: Path of the local file to upload.
        folder_id: Id of the Drive folder that becomes the file's parent.
        save_as: Title for the uploaded file. If a non-trashed file with this
            title already exists anywhere in the Drive, " (1)" is inserted
            before the extension (e.g. "a.zip" becomes "a (1).zip").

    Returns:
        The id of the uploaded file.
    """
    import os  # local import: this cell does not import os itself

    # Drive query values are delimited by single quotes, so quotes and
    # backslashes inside the title must be backslash-escaped.
    escaped_title = save_as.replace("\\", "\\\\").replace("'", "\\'")
    file_list = drive.ListFile({'q': "title='{}' and trashed=false".format(escaped_title)}).GetList()
    if file_list:
        print('Debug: File already exists')
        # Change file name to avoid overwriting; keep the extension last so
        # the upload is still recognised as the same file type.
        stem, extension = os.path.splitext(save_as)
        save_as = stem + ' (1)' + extension

    file = drive.CreateFile({'title': save_as, 'parents': [{'id': folder_id}]})
    file.SetContentFile(file_name)
    # Upload and set permission to public
    file.Upload()
    file.InsertPermission({'type': 'anyone', 'value': 'anyone', 'role': 'reader'})
    # return file id
    return file.attr['metadata']['id']

# Colab form fields: whether to upload, the Drive folder title, and the title
# the archive is saved under.
use_drive = True #@param {type:"boolean"}
folder_name = "scraper" #@param {type: "string"}
save_as = "nakiriayame.zip" #@param {type: "string"}

if use_drive:
  # Authenticate the Colab user and hand the default application credentials to PyDrive.
  auth.authenticate_user()
  gauth = GoogleAuth()
  gauth.credentials = GoogleCredentials.get_application_default()
  # NOTE: this rebinds `drive` (imported above from google.colab) to the
  # PyDrive client that create_folder() and upload_file() read as a global.
  drive = GoogleDrive(gauth)
  file_id = upload_file('/content/images.zip', create_folder(folder_name), save_as)
  print("Your sharing link: https://drive.google.com/file/d/" + file_id + "/view?usp=sharing")  