<a href="https://colab.research.google.com/github/suzukimain/diffusers_in_Colab/blob/main/diffusers_in_Colab.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# README


<details>

<summary>Details</summary>

>Disclaimer:

* This project was created as a hobby and is not intended for commercial use.

* I am not responsible for any problems that may arise from the use of this project.

* Please check the [License](https://github.com/suzukimain/diffusers_in_Colab/blob/main/LICENSE) for more information.

\

---

>Project Objectives

1. To learn programming through a personal project.

2. To provide an image generation notebook based on diffusers.

\

---

>Acknowledgment

This project was built with open source resources and free tools, and it would not have been possible without them.\
I would like to take this opportunity to thank the open source community and everyone who provides free tools.

</details>


In [None]:
#@title   (option) Mount GoogleDrive { run: "auto", display-mode: "form"}
Conect_GoogleDrive = True  # @param {type:"boolean"}

conect_drive=False

import os
from google.colab import drive
if Conect_GoogleDrive:
    if not os.path.ismount('/content/drive'):
        try:
            drive.mount('/content/drive')
            conect_drive=True
        except Exception as error:
            print(f"GoogleDrive is not mounted: {error}")
            conect_drive=False
    else:
        # Already mounted by an earlier run of this cell.
        conect_drive=True
else:
    if os.path.ismount('/content/drive'):
        drive.flush_and_unmount()
        conect_drive=False
        print("GoogleDrive unmounted")




In [None]:
#@title  #Step.1 Runtime Setup {display-mode: "form"}

DEBUG = False


import os
import queue
import subprocess
import sys
import threading
import shutil
import time
import warnings
import codecs
import datetime
import difflib
import gc
import glob
import imageio
import importlib
import importlib.metadata  # `import importlib` alone does not guarantee the submodule is loaded
import inspect
import jax
import json
import logging
import numpy as np
import pickle
import random
import re
import requests
import urllib.request
from base64 import b64encode
from pathlib import Path
from urllib.parse import urlparse
from requests import HTTPError
from torch import Generator
from typing import Any, Dict, List, Literal, Optional, Union
from IPython.display import display, Markdown, HTML

import transformers
from transformers import (
    pipeline,
    AutoModelForCausalLM,
    CLIPTokenizer,
    AutoTokenizer,
    FlaxAutoModelForCausalLM
)
from transformers import logging as tf_logging

import ipywidgets as widgets
from google.colab import output
from tqdm.auto import tqdm
import torch
from PIL import Image, PngImagePlugin
from google.colab import drive
from huggingface_hub import hf_hub_download
import yaml



class runtime_func:
    def __init__(self):
        """
        NOTE:
        These helpers logically belong to `basic_config`, but `install_packages` also needs them.
        Because `install_packages` runs before `basic_config` is defined, they live in `runtime_func` and both classes inherit from it.
        """
        self.device_type = self.device_type_check()


    @staticmethod
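    def module_version(module_name):
        # Return the "major.minor.patch" part of the installed version, or None.
        try:
            version = importlib.metadata.version(module_name)
        except importlib.metadata.PackageNotFoundError:
            return None
        match = re.match(r"^\d+\.\d+\.\d+", version)
        return match.group(0) if match else None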


    @staticmethod
    def device_type_check():
        _device_type = jax.devices()[0].device_kind
        if "TPU" in _device_type:
            return "TPU"
        elif "cpu" in _device_type:
            return "cpu"
        else:
            return "cuda"


    @staticmethod
    def is_url_valid(url) -> bool:
        # A connection error counts as an invalid URL instead of propagating.
        try:
            response = requests.head(url)
            logger.debug(f"response.status_code: {response.status_code}")
            response.raise_for_status()
        except requests.RequestException:
            return False
        return True


    @staticmethod
    def custom_logger(DEBUG=False):
        format = '%(levelname)s:<cell line: %(lineno)d> <funcName: %(funcName)s>: %(message)s'
        logger = logging.getLogger(__name__)
        logger.propagate = False
        if not logger.handlers:
            formatter = logging.Formatter(format)
            handler = logging.StreamHandler()
            handler.setFormatter(formatter)
            logger.addHandler(handler)
        if DEBUG:
            logger.setLevel(logging.DEBUG)
        else:
            logger.setLevel(logging.WARNING)
        return logger


run_func = runtime_func()
device_type = run_func.device_type
logger = run_func.custom_logger(DEBUG)



class ProcessBarRun:
    """
    Example:
        if __name__ == "__main__":
            bar = ProcessBarRun(total=11)

            for s in range(4):
                bar.bar_update(3)
                time.sleep(1)

            bar.bar_update(exit=True)

    Args:
        desc: str = "Running"
        default_desc: str = "Running"
        fin_desc = ""
        pofix: str = ""
        default_pofix: str = ""
        fin_pofix = "Finish!"
        desc_dot: bool = False
        pofix_dot: bool = False

    If `download_bar` is True, `url` and `save_path` are required.
    """

    def __init__(self,
                 total: int = 0,
                 desc: str = "",
                 default_desc: str = "",
                 fin_desc="",
                 pofix: str = "",
                 default_pofix: str = "",
                 fin_pofix="Finish!",
                 desc_dot: bool = False,
                 pofix_dot: bool = False,
                 download_bar:bool = False,
                 url:str = "",
                 save_path:str = "",
                 **ex_word):

        self.total = total
        self.default_desc = default_desc
        self.default_pofix = default_pofix
        self.fin_desc = fin_desc
        self.fin_pofix = fin_pofix
        self.desc_dot = desc_dot
        self.pofix_dot = pofix_dot
        self.ex_word = ex_word
        self.queue_obj = queue.Queue()
        self._count = 0
        self._run_count = 0
        self.dot_count = 0
        self.max_dots = 5
        self.desc = desc
        self.pofix = pofix
        self.base_desc_txt = ""
        self.base_pofix_txt = ""
        self.stop_event = threading.Event()
        self.stop_dot = threading.Event()
        self.tqdm_lock = threading.Lock()
        self.tqdm_obj = tqdm(total=total, desc=desc, postfix=pofix)
        self.arg_update(desc=desc, pofix=pofix, **ex_word)
        self.dot_thread = threading.Thread(target=self.prosess_dot)
        self.dot_thread.start()


    def __del__(self):
        self.stop()


    def __enter__(self):
        return self


    def __exit__(self, exc_type, exc_val, exc_tb):
        self.stop()


    def stop(self):
        self.stop_event.set()
        self.stop_dot.set()
        self.tqdm_obj.n = self.total
        if self.fin_desc:
            setattr(self.tqdm_obj,"desc",self.fin_desc)
        if self.fin_pofix:
            setattr(self.tqdm_obj,"postfix",self.fin_pofix)
        self.tqdm_obj.refresh()
        self.tqdm_obj.close()


    def arg_update(self, *ex_word, **input_dict):
        for extra_word in ex_word:
            if isinstance(extra_word, dict):
                for _key, _value in extra_word.items():
                    setattr(self, _key, _value)
        for key, value in input_dict.items():
            if hasattr(self, key):
                setattr(self, key, value)


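    def run_downlaod_with_bar(self, url: str = "", save_path: str = ""):
        # Stream `url` into `save_path`, advancing the bar by the bytes written.
        response = requests.get(url, stream=True)
        response.raise_for_status()
        with self.tqdm_lock, open(save_path, "wb") as f:
            for chunk in response.iter_content(chunk_size=4096):
                f.write(chunk)
                self.tqdm_obj.update(len(chunk))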


    def prosess_dot(self,):
        """
        NOTE:
        set_description_str and set_postfix_str are not used here,
        because they cannot be used in the case of the download_with_bar function.
        """
        self.dot_count = 0
        while not self.stop_dot.is_set():
            for num in range(self.max_dots):
                with self.tqdm_lock:
                    dot_txt = "." * num
                    if self.desc and self.desc_dot:
                        desc_dot_txt = dot_txt
                    else:
                        desc_dot_txt = ""

                    if self.pofix and self.pofix_dot:
                        pofix_dot_txt = dot_txt
                    else:
                        pofix_dot_txt = ""

                    setattr(self.tqdm_obj, "desc", self.desc + desc_dot_txt)
                    setattr(self.tqdm_obj, "postfix", self.pofix + pofix_dot_txt)
                    self.tqdm_obj.refresh()

                    if self.stop_dot.is_set():
                        break
                    time.sleep(0.5)


    def bar_update(
            self,
            update_rate: int = 1,
            exit: bool = False,
            desc = None,
            pofix = None,
            desc_dot = None,
            pofix_dot= None):
        """
        args:
        desc : str
        pofix : str
        desc_dot : bool
        pofix_dot : bool
        """
        self.desc = desc or self.desc or self.default_desc
        self.pofix = pofix or self.pofix or self.default_pofix
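        # Test against None so that an explicit False can switch the dots off.
        self.desc_dot = self.desc_dot if desc_dot is None else desc_dot
        self.pofix_dot = self.pofix_dot if pofix_dot is None else pofix_dot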
        self.tqdm_obj.update(int(update_rate))
        self.tqdm_obj.set_description_str(self.desc)
        self.tqdm_obj.set_postfix_str(self.pofix)
        self.tqdm_obj.refresh()
        if exit:
            self.stop()



class config_check:
    base_config_json = "/tmp/diffusers_in_colab_config.json"

    def __init__(self):
        pass


    def get_json_dict(self):
        """Retrieve the JSON dictionary from the config file."""
        config_dict = {}
        if os.path.isfile(self.base_config_json):
            try:
                with open(self.base_config_json, "r") as basic_json:
                    config_dict = json.load(basic_json)
            except json.JSONDecodeError:
                pass
        return config_dict


    def update_json_dict(self, key, value):
        """Update the JSON dictionary with a new key-value pair."""
        basic_json_dict = self.get_json_dict()
        basic_json_dict[key] = value
        with open(self.base_config_json, "w") as json_file:
            json.dump(basic_json_dict, json_file, indent=4)


    def check_func_hist(self,*return_key, **kwargs):
        """
        Check and optionally update the history of a given element.

        Args:
            *return_key (str): Variable for which to get the history.
            **kwargs: Keyword arguments for additional options.
                - update (bool): Whether to update the dictionary. Default is True.
                - return_value (bool): Whether to return the element value. Default is False.
                - key (str): Specific key to look up in the dictionary.
                - value (Any): Value to be matched or updated in the dictionary.

        Returns:
            Any: The historical value if `return_value` is True, or a boolean indicating
                 if the value matches the historical value.
        """
        update = kwargs.pop("update", True)
        return_value = kwargs.pop("return_value", False)
        if kwargs:
            if "key" in kwargs:
                key = kwargs["key"]
                if "value" in kwargs:
                    value = kwargs["value"]
                else:
                    value = None
                    update = False
                    return_value = True
            else:
                key, value = next(iter(kwargs.items()))
        elif return_key:
            key, value = return_key[0], None
            update = False
            return_value = True
        else:
            raise TypeError("Missing 'key' argument.")

        basic_json_dict = self.get_json_dict()
        hist_value = basic_json_dict.get(key)
        if hist_value == value:
            value_match = True
        else:
            value_match = False

        if update:
            self.update_json_dict(key, value)

        if return_value:
            return hist_value
        else:
            return value_match



class install_packages(runtime_func):
    repo_url = "https://github.com/huggingface/diffusers.git"
    repo_dir = "diffusers"

    def __init__(self):
        super().__init__()


    def torch_install_packages(self):
        __sub_process_list =[
            (f'if [ ! -d {self.repo_dir} ]; then git clone {self.repo_url} {self.repo_dir}; fi', 'Cloning diffusers'),
            (f'cd {self.repo_dir} && pip install .[torch]', 'Installing diffusers'),
        ]
        if self.device_type == "cuda":
            cuda_ex_prosess = (f'pip install accelerate', 'Installing accelerate')
            __sub_process_list.append(cuda_ex_prosess)

        torch_install = ProcessBarRun(
            total=len(__sub_process_list),
            desc = "Installing packages",
            pofix = "Install process Start",
            pofix_dot = True,
            )

        for torch_sub_process,update_pofix in __sub_process_list:
            torch_install.arg_update(pofix=update_pofix)
            subprocess.run(torch_sub_process, shell=True, check=True)
            torch_install.bar_update()

        torch_install.bar_update(exit=True)


    def flax_install_packages(self):
        """
        'natsort' is not installed by default in the TPUv2 runtime
        """
        __sub_process_list = [
            (f'if [ ! -d {self.repo_dir} ]; then git clone {self.repo_url} {self.repo_dir}; fi', 'Cloning diffusers'),
            (f'cd {self.repo_dir} && pip install .[flax]', 'Installing diffusers'),
            ('pip uninstall -y tensorflow', 'Uninstalling tensorflow'),
            ('pip install tensorflow-cpu', 'Installing tensorflow-cpu'),
            ('pip install natsort', 'Installing other libraries')
        ]

        flax_install = ProcessBarRun(
            total=len(__sub_process_list),
            desc = "Installing packages",
            pofix = "Install process Start",
            pofix_dot = True,
            )

        for flax_sub_process,up_pofix in __sub_process_list:
            flax_install.arg_update(pofix=up_pofix)
            subprocess.run(flax_sub_process, shell=True, check=True)
            flax_install.bar_update()

        flax_install.bar_update(exit=True)


    def package_install(self):

        if self.device_type == "TPU":
            self.flax_install_packages()
        else:
            self.torch_install_packages()


Step1 = ProcessBarRun(
    total=4,
    desc = f"{device_type} runtime setting",
    pofix_dot = True)

Step1.bar_update(pofix="Installing libraries")

install_packages().package_install()

Step1.bar_update(pofix="Importing libraries")



from natsort import natsorted

import diffusers
from diffusers import (
    DiffusionPipeline,
    FlaxAutoencoderKL,
    FlaxDiffusionPipeline,
    FlaxStableDiffusionPipeline,
    StableDiffusionPipeline,
    AutoencoderKL,
    schedulers
)
from diffusers import logging as df_logging


# Device-specific imports
if device_type == "TPU":
    from flax.jax_utils import replicate
    from flax.training.common_utils import shard
else:
    from diffusers import AutoPipelineForText2Image


# Configure warnings
if DEBUG:
    df_logging.set_verbosity_warning()
    tf_logging.set_verbosity_warning()
    warnings.filterwarnings("always", category=DeprecationWarning)
else:
    df_logging.set_verbosity_error()
    tf_logging.set_verbosity_error()
    warnings.filterwarnings("ignore")



class data_config:
    Config_file="model_index.json"

    VALID_URL_PREFIXES = ["https://huggingface.co/", "huggingface.co/", "hf.co/", "https://hf.co/"]
    exts =  [".safetensors", ".ckpt",".bin"]

    model_dict = {
            "stable diffusion-v2.1" : "stabilityai/stable-diffusion-2-1",
            "waifu diffusion-v1.4": "hakurei/waifu-diffusion",
            "Anything-v3.0": "Linaqruf/anything-v3.0",
            "anything-midjourney-v-4-1": "Joeythemonster/anything-midjourney-v-4-1",
            "Anything-v4.5": "shibal1/anything-v4.5-clone",
            "AB4.5_AC0.2": "aioe/AB4.5_AC0.2",
            "basil_mix": "nuigurumi/basil_mix",
            "Waifu-Diffusers": "Nilaier/Waifu-Diffusers",
            "Double-Exposure-Diffusion": "joachimsallstrom/Double-Exposure-Diffusion",
            "openjourney-v4": "prompthero/openjourney-v4",
            "ACertainThing": "JosephusCheung/ACertainThing",
            "Counterfeit-V2.0": "gsdf/Counterfeit-V2.0",
            "Counterfeit-V2.5": "gsdf/Counterfeit-V2.5",
            "chilled_remix":"chilled_remix",
            "chilled_reversemix":"chilled_reversemix",
            "7th_Layer": "syaimu/7th_test",
            "EimisAnimeDiffusion_1.0v": "eimiss/EimisAnimeDiffusion_1.0v",
            "JWST-Deep-Space-diffusion" : "dallinmackay/JWST-Deep-Space-diffusion",
            "Riga_Collection": "natsusakiyomi/Riga_Collection",
            "sd-db-epic-space-machine" : "rabidgremlin/sd-db-epic-space-machine",
            "spacemidj" : "Falah/spacemidj",
            "anime-kawai-diffusion": "Ojimi/anime-kawai-diffusion",
            "Realistic_Vision_V2.0": "SG161222/Realistic_Vision_V2.0",
            "nasa-space-v2" : "sd-dreambooth-library/nasa-space-v2-768",
            "meinamix_meinaV10": "namvuong96/civit_meinamix_meinaV10",
            "loliDiffusion": "JosefJilek/loliDiffusion",
            }
    exclude =  ["safety_checker/model.safetensors",
                "unet/diffusion_pytorch_model.safetensors",
                "vae/diffusion_pytorch_model.safetensors",
                "text_encoder/model.safetensors",
                "unet/diffusion_pytorch_model.fp16.safetensors",
                "text_encoder/model.fp16.safetensors",
                "vae/diffusion_pytorch_model.fp16.safetensors",
                "safety_checker/model.fp16.safetensors",

                "safety_checker/model.ckpt",
                "unet/diffusion_pytorch_model.ckpt",
                "vae/diffusion_pytorch_model.ckpt",
                "text_encoder/model.ckpt",
                "text_encoder/model.fp16.ckpt",
                "safety_checker/model.fp16.ckpt",
                "unet/diffusion_pytorch_model.fp16.ckpt",
                "vae/diffusion_pytorch_model.fp16.ckpt"]

    Auto_pipe_class=[
            "AutoPipelineForText2Image",
            "AutoPipelineForImage2Image",
            "AutoPipelineForInpainting",
     ]

    Error_M1 = (
        '''
        Could not load URL.
        Format:"https://huggingface.co/<repo_name>/<model_name>/blob/main/<path_to_file>"
        EX1: "https://huggingface.co/gsdf/Counterfeit-V3.0/blob/main/Counterfeit-V3.0.safetensors"
        EX2: "https://huggingface.co/runwayml/stable-diffusion-v1-5/blob/main/v1-5-pruned.ckpt"
        '''
        )

    Error_M2= (
        '''
        Could not load the Hugging Face path.
        Format: "<repo_name>/<model_name>"
        EX1: "Linaqruf/anything-v3.0"
        EX2: "stabilityai/stable-diffusion-2-1"

        Supported models:

                "stable diffusion-v2.1"
                "waifu diffusion-v1.4"
                "Anything-v3.0"
                "anything-midjourney-v-4-1"
                "Anything-v4.5"
                "AB4.5_AC0.2"
                "basil_mix"
                "Waifu-Diffusers"
                "Double-Exposure-Diffusion"
                "openjourney-v4"
                "ACertainThing"
                "Counterfeit-V2.0"
                "Counterfeit-V2.5"
                "7th_Layer"
                "EimisAnimeDiffusion_1.0v"
                "Riga_Collection"
                "anime-kawai-diffusion"
                "Realistic_Vision_V2.0"
                "meinamix_meinaV10"
                "loliDiffusion"
                ''')

    Error_M3 = ('''
                The specified path could not be recognized. Please try the following:
                ・Check that the file exists at the given path.
                ・Check that the path contains no whitespace.
                ・Check whether the path contains special symbols such as "\\" or ".", which may not be recognized.
                ''')



class basic_config(data_config,config_check,runtime_func):
    def __init__(self):
        self.device_count = self.count_device()
        self.device_type = self.device_type_check()
        self.device = self.device_set()
        if self.device_type == "TPU":
            self.use_TPU = True
        else:
            self.use_TPU = False

        if os.path.ismount("/content/drive"):
            self.conect_gdrive = True
        else:
            self.conect_gdrive = False


    @classmethod
    def get_inherited_class(cls,class_name) -> list:
        inherited_class = inspect.getmro(class_name)
        return [cls_method.__name__ for cls_method in inherited_class]


    @staticmethod
    def device_set():
        logger.debug(f"device_type: {device_type}")
        if device_type == "TPU":
            #import torch_xla.core.xla_model as xm
            #device = xm.xla_device()
            device = device_type
        else:
            device = device_type
        return device


    def is_TPU(self):
        if self.device_type_check() == "TPU":
            return True
        else:
            return False


    def is_safetensors(self,path):
        if ".safetensors" == os.path.splitext(path)[1]:
            return True
        else:
            return False


    def count_device(self):
        return jax.device_count()


    def get_item(self,dict_obj):
        """
        Returns the value of the first item in the dictionary
        """
        return next(iter(dict_obj.items()))[1]


    def pipeline_metod_type(self,Target_class) -> str:
        """
        Args:
        Target_class : class

        Returns:
        Literal['torch','flax','onnx']
        """
        torch_list=["DiffusionPipeline",
                    "AutoPipelineForText2Image",
                    "AutoPipelineForImage2Image",
                    "AutoPipelineForInpainting",]

        flax_list = ["FlaxDiffusionPipeline",]

        if isinstance(Target_class,str):
            Target_class = getattr(diffusers, Target_class)

        cls_method= self.get_inherited_class(Target_class)

        if any(method in torch_list for method in cls_method):
            class_type= "torch"
        elif any(method in flax_list for method in cls_method):
            class_type= "flax"
        else:
            class_type= "onnx"
        return class_type


    def sort_by_version(self,sorted_list) -> list:
        """
        Returns:
        Sorted by version in order of newest to oldest
        """
        return natsorted(sorted_list,reverse = True)


    def key_check(self,keyword) -> bool:
        global key_dict
        if "key_dict" not in globals():
            key_dict = {}
        key = str(keyword)
        key_in = False
        if key in key_dict:
            if keyword == key_dict[key]:
                key_in = True
        key_dict[key] = keyword
        return key_in


    def get_call_method(
            self,
            class_name,
            method_name : str = '__call__'
            ) ->list:
        """
        Acquire the arguments of the function specified by 'method_name'
        for the class specified by 'class_name'
        """
        if isinstance(class_name,str):
            class_name = getattr(getattr(diffusers, class_name),method_name)
        parameters = inspect.signature(class_name).parameters
        arg_names = []
        for param in parameters.values():
            arg_names.append(param.name)
        return arg_names


    def get_class_elements(
            self,
            search
            ):
        return list(search.__class__.__annotations__.keys())


    def check_for_safetensors(
            self,
            path
            ):
        _ext = os.path.basename(path).split(".")[-1]
        if _ext == "safetensors":
            return True
        else:
            return False


    def pipe_class_type(
            self,
            class_name
            ):
        """
        Args:
        class_name : class

        Returns:
        Literal['txt2img','img2img','txt2video']
        """
        _txt2img_method_list = [] #else
        _img2img_method_list = ["image"]
        _img2video_method_list = ["video_length","fps"]

        call_method = self.get_call_method(class_name,method_name = '__call__')

        if any(method in call_method for method in _img2video_method_list):
            pipeline_type = "txt2video"
        elif any(method in call_method for method in _img2img_method_list):
            pipeline_type = "img2img"
        else:
            pipeline_type = "txt2img"
        return pipeline_type


    def import_on_str(
            self,
            desired_function_or_class,
            module_name = ""
            ):
        if not module_name:
            import_object = __import__(desired_function_or_class)
        else:
            import_object = getattr(__import__(module_name), desired_function_or_class)
        return import_object


    def max_temper(
            self,
            search_word,
            search_list
            ):
        return difflib.get_close_matches(search_word, search_list,cutoff=0, n=1)


    def sort_list_obj(
            self,
            list_obj,
            need_txt
            ):
        sorted_list=[]
        for module_obj in list_obj:
            if need_txt.lower() in module_obj.lower():
                sorted_list.append(module_obj)
        return sorted_list


    def checkpoint_type_get(
            self,
            checkpoint_path_or_dict ,
            config_files= None,
            original_config_file= None,
            model_type = None,
            is_upscale = False
            ):
        """
        NOTE:
        Return SD if you do not know, because an error may occur if None is used.

        About model_type:
        The model_type itself will be left for possible use at some point.
        """

        from_safetensors = False
        checkpoint = None
        if isinstance(checkpoint_path_or_dict, str):
            if os.path.isfile(checkpoint_path_or_dict):
                from_safetensors: bool = self.check_for_safetensors(checkpoint_path_or_dict)
                if from_safetensors:
                    from safetensors.torch import load_file as safe_load
                    checkpoint = safe_load(checkpoint_path_or_dict, device=self.device)
                else:
                    checkpoint = torch.load(checkpoint_path_or_dict, map_location=self.device)
            elif os.path.isdir(checkpoint_path_or_dict):
                model_index_path = os.path.join(checkpoint_path_or_dict,"model_index.json")
                if os.path.isfile(model_index_path):
                    with open(model_index_path,"r") as loaded_model_index:
                        # Parse the JSON; the file object itself cannot be indexed.
                        cls_name = json.load(loaded_model_index)["_class_name"]
                    #Fixed in due course.
                    if "XL" in cls_name:
                        return "SDXL"
                    else:
                        return "SD"

        elif isinstance(checkpoint_path_or_dict, dict):
            checkpoint = checkpoint_path_or_dict
        else:
            raise TypeError(f"checkpoint_path_or_dict: {checkpoint_path_or_dict}")

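        if checkpoint is None:
            # Nothing could be loaded from the given path; return "SD" as the NOTE above advises.
            return "SD"

        if "global_step" in checkpoint:
            global_step = checkpoint["global_step"]
        else:
            global_step = None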

        while "state_dict" in checkpoint:
            checkpoint = checkpoint["state_dict"]
        if original_config_file is None:
            key_name_v2_1 = "model.diffusion_model.input_blocks.2.1.transformer_blocks.0.attn2.to_k.weight"
            key_name_sd_xl_base = "conditioner.embedders.1.model.transformer.resblocks.9.mlp.c_proj.bias"
            key_name_sd_xl_refiner = "conditioner.embedders.0.model.transformer.resblocks.9.mlp.c_proj.bias"
            #is_upscale = pipeline_class == StableDiffusionUpscalePipeline
            config_url = None
            # model_type = "v1"
            if config_files is not None and "v1" in config_files:
                original_config_file = config_files["v1"]
            else:
                config_url = "https://raw.githubusercontent.com/CompVis/stable-diffusion/main/configs/stable-diffusion/v1-inference.yaml"

            if key_name_v2_1 in checkpoint and checkpoint[key_name_v2_1].shape[-1] == 1024:
                # model_type = "v2"
                if config_files is not None and "v2" in config_files:
                    original_config_file = config_files["v2"]
                else:
                    config_url = "https://raw.githubusercontent.com/Stability-AI/stablediffusion/main/configs/stable-diffusion/v2-inference-v.yaml"
                if global_step == 110000:
                    # v2.1 needs to upcast attention
                    upcast_attention = True

            elif key_name_sd_xl_base in checkpoint:
                # only base xl has two text embedders
                if config_files is not None and "xl" in config_files:
                    original_config_file = config_files["xl"]
                else:
                    config_url = "https://raw.githubusercontent.com/Stability-AI/generative-models/main/configs/inference/sd_xl_base.yaml"

            elif key_name_sd_xl_refiner in checkpoint:
                # only refiner xl has embedder and one text embedders
                if config_files is not None and "xl_refiner" in config_files:
                    original_config_file = config_files["xl_refiner"]
                else:
                    config_url = "https://raw.githubusercontent.com/Stability-AI/generative-models/main/configs/inference/sd_xl_refiner.yaml"

            if is_upscale:
                config_url = "https://raw.githubusercontent.com/Stability-AI/stablediffusion/main/configs/stable-diffusion/x4-upscaling.yaml"

            if config_url is not None:
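                try:
                    # yaml.safe_load accepts bytes, so no BytesIO wrapper is needed.
                    config_response = requests.get(config_url)
                    config_response.raise_for_status()
                    original_config_file = config_response.content
                except Exception: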
                    logger.error(f"Could not download the Config_file to find out the model type from the following URL: {config_url}")
                    if model_type is None:
                        logger.warning("model_type is set to None")
                    return model_type
            else:
                with open(original_config_file, "r") as f:
                    original_config_file = f.read()
        else:
            with open(original_config_file, "r") as f:
                original_config_file = f.read()

        original_config = yaml.safe_load(original_config_file)

        if (
            model_type is None
            and "cond_stage_config" in original_config["model"]["params"]
            and original_config["model"]["params"]["cond_stage_config"] is not None
        ):

            model_type = original_config["model"]["params"]["cond_stage_config"]["target"].split(".")[-1]
            return "SD"

        elif model_type is None and original_config["model"]["params"].get("network_config") is not None:
            if original_config["model"]["params"]["network_config"]["params"]["context_dim"] == 2048:
                model_type = "SDXL"
            else:
                model_type = "SDXL-Refiner"
            return "SDXL"

        else:
            if model_type is None:
                logger.warning("model_type is set to None")
            return model_type


'''
about: unexplained crash

If you get an unexplained crash in transformers or diffusers while using TPUv2, try the following:

----do----
pip uninstall tensorflow
pip install tensorflow-cpu
----------
'''


if not os.path.ismount('/content/drive'):
    conect_drive = False
    Connect_Gdrive="\033[33mNo connection\033[0m"
else:
    conect_drive = True
    Connect_Gdrive="\033[34mConnection Successful\033[0m"

#base dir
os.makedirs("/content/script",exist_ok=True)
sys.path.append("/content/script")


Step1.bar_update(exit=True, pofix="Finish!")

print("\n\033[34m___________________________________________\n")
print(f"Device: {runtime_func().device_type_check()}\n")
print(f"Googledrive: {Connect_Gdrive}")
print("\n\033[32mSetup completed successfully\033[0m")

step1_finish =True
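
The config-based model-type check near the end of Step.1 can be sketched in isolation as follows. This is a minimal illustration, not the notebook's actual function: `detect_model_type` is a hypothetical helper name, and the config is passed as an already-parsed dict instead of being loaded from YAML.

```python
from typing import Optional


def detect_model_type(original_config: dict) -> Optional[str]:
    """Hypothetical helper that mirrors the branch order of the Step.1 check."""
    params = original_config.get("model", {}).get("params", {})
    # Configs with a text-encoder entry (cond_stage_config) are reported as "SD"
    if params.get("cond_stage_config") is not None:
        return "SD"
    # Configs with a network_config entry are reported as "SDXL"
    if params.get("network_config") is not None:
        return "SDXL"
    # Nothing recognizable was found
    return None


# Hand-written stand-ins for parsed config files (assumed shapes)
sd_config = {"model": {"params": {"cond_stage_config": {"target": "a.b.FrozenCLIPEmbedder"}}}}
sdxl_config = {"model": {"params": {"network_config": {"params": {"context_dim": 2048}}}}}
empty_config = {"model": {"params": {}}}

for name, cfg in [("sd", sd_config), ("sdxl", sdxl_config), ("empty", empty_config)]:
    print(name, detect_model_type(cfg))
```

Using `.get()` at each level keeps the lookup from raising `KeyError` when a config lacks one of the sections.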

In [None]:
#@title (option) hf_login
#@markdown If you use a secret variable, store the token under the name **hf_token**
from google.colab import userdata
from huggingface_hub import login, logout

Logout = False # @param {type:"boolean"}

if not Logout:
    # Default to None so login() falls back to its interactive prompt when no secret is stored
    hf_login_token = None
    try:
        hf_login_token = userdata.get("hf_token")
    except userdata.SecretNotFoundError:
        pass
    login(hf_login_token)
else:
    logout()
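
The token lookup above follows a "try the secret store, fall back to None" pattern. A minimal sketch of that pattern outside Colab is shown below; `SecretNotFound`, `resolve_token`, and the in-memory `fake_store` are hypothetical stand-ins for illustration only and are not part of `google.colab` or `huggingface_hub`.

```python
from typing import Callable, Optional


class SecretNotFound(Exception):
    """Hypothetical stand-in for the error raised when a secret is missing."""


def resolve_token(get_secret: Callable[[str], str], name: str = "hf_token") -> Optional[str]:
    """Return the stored token, or None when the secret does not exist."""
    try:
        return get_secret(name)
    except SecretNotFound:
        return None


# In-memory secret store used only for this illustration
fake_store = {"hf_token": "dummy-token"}


def fake_get(name: str) -> str:
    if name not in fake_store:
        raise SecretNotFound(name)
    return fake_store[name]


print(resolve_token(fake_get))
print(resolve_token(fake_get, name="missing_secret"))
```

Only the "not found" error is swallowed; any other exception still propagates to the caller.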

In [None]:
#@title #Step.2 Model Selection {display-mode: "form"}

#@markdown >Selecting the model to use [#info](#model_select_help)


model_select = "stable diffusion-v2.1(basic)" # @param ["stable diffusion-v2.1(basic)", "Counterfeit-V2.5(Anime)(better)", "loliDiffusion(Anime)", "waifu diffusion-v1.4(Anime)", "Anything-v3.0(Anime)", "Anything-v4.5(Anime)", "anything-midjourney-v-4-1(Anime)", "ACertainThing(Anime)", "anime-kawai-diffusion(Anime)", "AB4.5_AC0.2(Anime)", "basil_mix(Anime)", "Counterfeit(Anime)", "Counterfeit-V2.0(Anime)", "chilled_remix(Anime)", "Double-Exposure-Diffusion(Anime)", "EimisAnimeDiffusion_1.0v(Anime)", "7th_Layer(Anime)", "Riga_Collection(Anime)", "Waifu-Diffusers(Anime)", "JWST-Deep-Space-diffusion(space)", "sd-db-epic-space-machine(space_ship)", "spacemidj(space)", "nasa-space-v2(space)", "openjourney-v4(Reality)", "Realistic_Vision_V2.0(Reality)", "meinamix_meinaV10(Reality)", "search"] {allow-input: true}
del_word_list=["(basic)","(Anime)","(Reality)","(space_ship)","(space)","(better)"]
if model_select in ["stable diffusion-v2.1(basic)", "Counterfeit-V2.5(Anime)(better)", "loliDiffusion(Anime)", "waifu diffusion-v1.4(Anime)", "Anything-v3.0(Anime)", "Anything-v4.5(Anime)", "anything-midjourney-v-4-1(Anime)", "ACertainThing(Anime)", "anime-kawai-diffusion(Anime)", "AB4.5_AC0.2(Anime)", "basil_mix(Anime)", "Counterfeit(Anime)", "Counterfeit-V2.0(Anime)", "chilled_remix(Anime)", "Double-Exposure-Diffusion(Anime)", "EimisAnimeDiffusion_1.0v(Anime)", "7th_Layer(Anime)", "Riga_Collection(Anime)", "Waifu-Diffusers(Anime)", "JWST-Deep-Space-diffusion(space)", "sd-db-epic-space-machine(space_ship)", "spacemidj(space)", "nasa-space-v2(space)", "openjourney-v4(Reality)", "Realistic_Vision_V2.0(Reality)", "meinamix_meinaV10(Reality)"]:
    for del_word in del_word_list:
        model_select=model_select.replace(del_word, "" )

#@markdown * Keywords to search for a model

#@markdown * Select from the pull-down menu

#@markdown * Enter the model name, or the path or URL where the model is stored

#@markdown * Enter **"search"** to search all directories for candidate files or folders.


#@markdown >Config [#info](#auto_help)

auto = True  # @param {type:"boolean"}
#@markdown Automatically select repository & model files (recommended: ON)



if "step1_finish" not in globals():
    raise NameError("\033[33mPlease execute Step.1 first\033[0m")



class Huggingface(basic_config):
    def __init__(self):
        super().__init__()
        self.num_prints=20
        self.model_id=""
        self.model_name=""
        self.vae_name=""
        self.model_file=""
        self.input_url=False
        self.diffuser_model=False
        self.check_choice_key = ""
        self.choice_number = -1
        self.file_path_dict={}
        self.special_file=""
        self.hf_repo_id = ""
        #self.model_select = ""


    def repo_name_or_path(self,model_name_or_path):
        pattern = r"([^/]+)/([^/]+)/(?:blob/main/)?(.+)"
        weights_name = None
        repo_id = None
        for prefix in self.VALID_URL_PREFIXES:
            model_name_or_path = model_name_or_path.replace(prefix, "")
        match = re.match(pattern, model_name_or_path)
        if not match:
            return repo_id, weights_name
        repo_id = f"{match.group(1)}/{match.group(2)}"
        weights_name = match.group(3)
        return repo_id, weights_name


    def run_hf_download(self,url_or_path):
        """
        return:
        str: local path to the downloaded model file or directory
        """
        def _hf_repo_download(path):
            model_path = DiffusionPipeline.download(path)
            return model_path

        if any(url_or_path.startswith(checked) for checked in self.VALID_URL_PREFIXES):
            if not self.is_url_valid(url_or_path):
                raise HTTPError("Invalid URL")
            hf_path, file_name =self.repo_name_or_path(url_or_path)
            logger.debug(f"url_or_path:{url_or_path}")
            logger.debug(f"hf_path: {hf_path} \nfile_name: {file_name}")
            if hf_path and file_name:
                model_file_path = hf_hub_download(hf_path, file_name)
            elif hf_path and (not file_name):
                if self.diffusers_model_check(hf_path):
                    # Pass the repo id, not the full URL, to the repo download
                    model_file_path = _hf_repo_download(hf_path)
                else:
                    raise HTTPError("Invalid hf_path")
            else:
                raise TypeError("Invalid path_or_url")

        #from hf_repo
        elif self.diffusers_model_check(url_or_path):
            logger.debug(f"url_or_path: {url_or_path}")
            model_file_path = _hf_repo_download(url_or_path)
        else:
            logger.debug(f"url_or_path:{url_or_path}")
            raise TypeError("Invalid path_or_url")
        return model_file_path


    def model_safe_check(self,model_list) ->str:
        if len(model_list)>1:
            for check_model in model_list:
                # Same separator set as list_safe_check, so "_sfw" is matched as well as "-sfw"
                match = bool(re.search(r"(?i)[-ー_＿]sfw", check_model))
                if match:
                    return check_model
        return model_list[0]


    def list_safe_check(self,model_list) -> list:
        for check_model in model_list:
            if bool(re.search(r"(?i)[-ー_＿]sfw", check_model)):
                model_list.remove(check_model)
                model_list.insert(0, check_model)
                break
        return model_list


    def diffusers_model_check(self,checked_model: str) -> bool:
        index_url=f"https://huggingface.co/{checked_model}/blob/main/model_index.json"
        return self.is_url_valid(index_url)


    def hf_model_check(self,path) -> bool:
        return self.is_url_valid(f"https://huggingface.co/{path}")


    def data_get(self,path) -> list:
        url = f"https://huggingface.co/api/models/{path}"
        data = requests.get(url).json()
        file_value_list = []
        df_model_bool=False
        # Gated repos return an error payload without "siblings"
        # ('Repo model <repo_id>/<model> is gated. You must be authenticated to access it.')
        try:
            siblings=data["siblings"]
        except KeyError:
            return []

        for item in siblings:
            file_path=item["rfilename"]
            #model_index.json outside the root directory is not recognized
            if file_path=="model_index.json":
                df_model_bool=True
            elif (any(file_path.endswith(ext) for ext in self.exts) and
                not any(file_path.endswith(ex) for ex in self.exclude)):
                file_value_list.append(file_path)
        #↓{df_model,file_value_list}
        self.file_path_dict.update({path:(df_model_bool,file_value_list)})
        return file_value_list

    def hf_model_search(self,
                        model_path,
                        limit_num):
        url = "https://huggingface.co/api/models"
        params={"search":model_path,"sort":"likes","direction":-1,"limit":limit_num}
        return requests.get(url,params=params).json()

    def hf_models(self,
                  model_name,
                  limit):
        """
        return:
        repo_model_list,with_like : list
        """
        #logger.debug(f"model_name: {model_name}")
        data=self.hf_model_search(model_name,limit)
        final_list = []
        if data:
            for item in data:
                model_id,like,private_value,tag_value = item["modelId"],item["likes"],item["private"],item["tags"]
                if  ("audio-to-audio" not in tag_value and
                    (not private_value)):
                    if self.data_get(model_id):
                        model_dict = {"model_id":model_id,
                                      "like":like,}
                        final_list.append(model_dict)
        else:
            print("No models matching your criteria were found on huggingface.")
            return []
        return final_list



    def model_name_search(self,
                          model_name: str,
                          auto_set: bool,
                          Recursive_execution:bool = False):

        def find_max_like(model_dict_list:list):
            """
            Finds the model ID with the highest "like" value in a list of dictionaries.

            Args:
                model_dict_list: A non-empty list of dictionaries with "model_id" and "like" keys.

            Returns:
                The "model_id" of the entry with the most likes (the first entry on ties,
                including when every entry has 0 likes).
            """
            max_like_dict = max(model_dict_list, key=lambda model_dict: model_dict["like"])
            return max_like_dict["model_id"]


        # auto_set: bool
        #     If True, load the model with the most likes on Hugging Face.
        if Recursive_execution:
            limit = 1000
        else:
            limit = 15



        repo_model_list = self.hf_models(model_name,limit)
        model_history = self.check_func_hist(key="hf_model_name",
                                             return_value=True)
        if not auto_set:
            print("\033[34mThe following model paths were found")
            if model_history is not None:
                print(f"Previous Choice: {model_history}")
            print("0.Search civitai")
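            for (i,(model_dict)) in enumerate(repo_model_list,1):
                # Use a separate name so the search word in model_name is kept for the retry below
                model_id = model_dict["model_id"]
                like = model_dict["like"]
                print(f"{i}.model path: {model_id}, evaluation: {like}")

            # Offer the wider search only on the first pass, when the result list is full
            if (not Recursive_execution) and len(repo_model_list) >= 15:
                print("16.Other than above")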

            while True:
                try:
                    choice = int(input("Select the model path to use: "))
                except ValueError:
                    print("\033[33mOnly natural numbers are valid.\033[34m")
                    continue
                if choice == 0:
                    return "_hf_no_model"
                elif (not Recursive_execution) and choice>=16 and choice == len(repo_model_list)+1:
                    return self.model_name_search(model_name = model_name,
                                                  auto_set = auto_set,
                                                  Recursive_execution = True)
                elif 1 <= choice <= len(repo_model_list):
                    choice_path_dict = repo_model_list[choice-1]
                    choice_path = choice_path_dict["model_id"]
                    break
                else:
                    print(f"Please enter the numbers 0~{len(repo_model_list)}")

        else:
            if repo_model_list:
                choice_path = find_max_like(repo_model_list)
            else:
                choice_path = "_hf_no_model"


        return choice_path



    def file_name_set_sub(self,model_select,file_value,model_type):
        check_key = f"{model_select}_select"
        if not file_value and (not self.diffuser_model):
            print("\033[31mNo candidates found at huggingface\033[0m")
            res = input("Search civitai instead?[y/n]: ")
            if res.lower() in ["y","yes"]:
                return "_hf_no_model"
            else:
                raise ValueError("No available files were found in the specified repository")
        elif not file_value:
            print("\033[34mOnly models in Diffusers format found")
            while True:
                result=input("Do you want to use it?[y/n]: ")
                if result.lower() in ["y","yes"]:
                    return "_DFmodel"
                elif result.lower() in ["n","no"]:
                    sec_result=input("Search civitai instead?[y/n]: ")
                    if sec_result.lower() in ["y","yes"]:
                        return "_hf_no_model"
                    elif sec_result.lower() in ["n","no"]:
                        raise ValueError("Processing was stopped because no corresponding model was found.")
                else:
                    print("Please enter only [y,n]")
        file_value=self.list_safe_check(file_value)
        if len(file_value)>=self.num_prints: # show only the first num_prints candidates
            start_number="1"
            #previous_select = self.check_func_hist(key=check_key)
            #if previous_select:
            choice_history = self.check_func_hist(key = check_key,return_value=True)
            if choice_history:
                if choice_history>self.num_prints+1:
                    choice_history = self.num_prints+1
                print(f"\033[33m＊Previous number: {choice_history}\033[0m")

            if self.diffuser_model:
                start_number="0"
                print("\033[34m0.Use Diffusers format model")
            for i in range(self.num_prints):
                print(f"\033[34m{i+1}.File name: {file_value[i]}\033[0m")
            print(f"\033[34m{self.num_prints+1}.Other than the files listed above (all candidates will be displayed)\n")
            while True:
                choice = input(f"Select the file you want to use({start_number}~{self.num_prints+1}): ")
                try:
                    choice=int(choice)
                except ValueError:
                    print("\033[33mOnly natural numbers are valid\033[34m")
                    continue
                if self.diffuser_model and choice==0:
                    old_num=None
                    self.input_url=False
                    self.choice_number = -1
                    print("\033[0m",end="")
                    choice_history_update = self.check_func_hist(key=check_key,value=choice,update=True)
                    return "_DFmodel"

                elif choice==(self.num_prints+1): #other_file
                    break
                elif 1<=choice<=self.num_prints:
                    self.input_url=True
                    old_num=choice
                    choice_path=file_value[choice-1]
                    self.choice_number = choice
                    print("\033[0m",end="")
                    choice_history_update = self.check_func_hist(key=check_key,value=choice,update=True)
                    return choice_path
                else:
                    print(f"\033[33mPlease enter numbers from {start_number}~{self.num_prints+1}\033[34m")
            print("\033[0m",end="")
            print("\n\n")

        choice_history = self.check_func_hist(key = check_key,return_value=True)
        if choice_history:
            print(f"\033[33m＊Previous number: {choice_history}\033[0m")

        start_number="1"
        if self.diffuser_model:
            start_number="0"
            print("\033[34m0.Use Diffusers format model\033[0m")
        for i, file_name in enumerate(file_value, 1):
            print(f"\033[34m{i}.File name: {file_name}")
        while True:
            choice = input(f"Select the file you want to use({start_number}~{len(file_value)}): ")
            try:
                choice=int(choice)
            except ValueError:
                print("\033[33mOnly natural numbers are valid\033[34m")
            else:
                if self.diffuser_model and choice==0:
                    self.input_url=False
                    print("\033[0m",end="")
                    self.choice_number = -1
                    choice_history_update = self.check_func_hist(key=check_key,value=choice,update=True)
                    return "_DFmodel"
                if 1<=choice<=len(file_value):
                    self.input_url=True
                    old_num=choice
                    choice_path=file_value[choice-1]
                    self.choice_number = choice
                    print("\033[0m",end="")
                    choice_history_update = self.check_func_hist(key=check_key,value=choice,update=True)
                    return choice_path
                else:
                    print(f"\033[33mPlease enter numbers from {start_number}~{len(file_value)}\033[34m")
        #print("\033[0m",end="")


    def file_name_set(self,model_select,auto,model_type="Checkpoint",download=False):
        logger.debug(f"model_select: {model_select}")
        del_dir_name = ["VAEs"]
        if self.diffusers_model_check(model_select) and model_type=="Checkpoint":
            self.diffuser_model=True
        #check_choice_key = f"model_select_{model_type}"
        url = f"https://huggingface.co/api/models/{model_select}"
        try:
            response = requests.get(url)
            response.raise_for_status()
        except requests.exceptions.HTTPError:
            raise HTTPError("A Hugging Face login or token is required")
        data = response.json()
        choice_path=""
        file_value = []
        logger.debug(data)
        # .get() keeps an empty response from raising KeyError before the check below
        siblings = data.get("siblings", [])
        if data:
            for item in siblings:
                fi_path=item["rfilename"]
                if (any(fi_path.endswith(ext) for ext in self.exts) and
                    (not any(fi_path.endswith(ex) for ex in self.exclude)) and
                    (not any(fi_path.startswith(st) for st in del_dir_name))):
                    file_value.append(fi_path)
        else:
            raise ValueError("No available file was found.\nPlease check the name.")
        if file_value:
            file_value=self.sort_by_version(file_value)
            if not auto:
                print("\033[34mThe following model files were found\033[0m")
                choice_path=self.file_name_set_sub(model_select,file_value,model_type)
                #if not self.choice_number == -1:
                #    choice_key_update = self.check_func_hist(key=check_key,value=self.choice_number)
            else:
                if self.diffuser_model:
                    self.input_url=False
                else:
                    self.input_url=True
                    choice_path=self.model_safe_check(file_value)


        elif self.diffuser_model:
            print("\033[32mOnly models in Diffusers format found")
            choice_path = "_DFmodel"
        else:
            raise FileNotFoundError("No available files found in the specified repository")
        #if model_type!="Checkpoint" and model_type!="_DFmodel":
            #self.input_url=False
        # Skip the file download when no single file was selected (Diffusers-format repo)
        if download and choice_path and choice_path != "_DFmodel":
            choice_path=hf_hub_download(repo_id=model_select, filename=choice_path)
        #if not self.choice_number== -1:
        #    choice_key_update = self.check_func_hist(key=check_choice_key,value=self.choice_number)
        return choice_path



class Civitai(basic_config):
    '''
    Example:
    item = requests.get("http://civitai.example").json()
    state_list = [{
        "repo_name": item["name"],
        "repo_id": item["id"],
        "favoriteCount": item["stats"]["favoriteCount"],
        "downloadCount": item["stats"]["downloadCount"],
        "CreatorName": item["creator"]["username"],
        "version_list": [{
            "id": item["modelVersions"]["id"],
            "name": item["modelVersions"]["name"],
            "downloadCount": item["modelVersions"]["downloadCount"],
            "files": [{
                "filename": item["modelVersions"]["files"]["name"],
                "file_id": item["modelVersions"]["files"]["id"],
                "download_url": item["modelVersions"]["files"]["downloadUrl"],
            }]
        }]
    }]
    return:
        state_list = {
            "repo_name": item["name"],
            "repo_id": item["id"],
            "favoriteCount": item["stats"]["favoriteCount"],
            "downloadCount": item["stats"]["downloadCount"],
            "CreatorName": item["creator"]["username"],
            "version_list": <file_list>
        }
    '''

    base_civitai_dir = "/root/.cache/Civitai"
    max_number_of_choices:int = 15
    chunk_size:int = 1024

    def __init__(self):
        super().__init__()
        self.path_dict = {}
        self.save_file_name = ""


    def civitai_download(
            self,
            seach_word,
            auto,
            model_type,
            download=True):
        """
        Function to download models from civitai.

        Parameters:
        - seach_word(str): Search query string.
        - auto(bool): Flag for automatic selection.
        - model_type(str): Type of model to search for.
            arg:[Checkpoint,
                 TextualInversion,
                 Hypernetwork,
                 AestheticGradient,
                 LORA,
                 Controlnet,
                 Poses
                ]
        - download(bool): Whether to download the model

        Returns:

        A tuple (model_url, save_path).
        save_path is the local storage path if download is True,
        otherwise None.
        """

        model_url, save_path = self.requests_civitai(
            query=seach_word,
            auto=auto,
            model_type=model_type)
        if download:
            self.download_model(
                url=model_url,
                save_path=save_path)
            return (model_url, self.civitai_save_path())
        else:
            return (model_url, None)


    def download_model(self, url, save_path):
        if not self.is_url_valid(url):
            raise requests.HTTPError("URL is invalid.")

        response = requests.get(url, stream=True)

        try:
            response.raise_for_status()
        except requests.HTTPError:
            raise requests.HTTPError(f"Invalid URL: {response.status_code}")

        os.makedirs(os.path.dirname(save_path),exist_ok=True)

        with tqdm.wrapattr(open(save_path, "wb"), "write",
            miniters=1, desc="Downloading model",
            total=int(response.headers.get('content-length', 0))) as fout:
            for chunk in response.iter_content(chunk_size=4096):
                fout.write(chunk)
        print(f"Downloaded file saved to {save_path}")


    def repo_select_civitai(self, state: list, auto: bool, recursive: bool = True):
        """
        Select a repository from the Civitai search results.

        Parameters:
        - state (list): List of repository information.
        - auto (bool): Flag for automatic selection.
        - recursive (bool): Flag for recursion.

        Returns:
        - dict: Selected repository information.
        """
        if not state:
            raise ValueError("state is empty")

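        if auto:
            repo_dict = max(state, key=lambda x: x['downloadCount'])
            # civitai_save_path() needs the repo id in auto mode as well
            self.path_dict["repo_id"] = repo_dict["repo_id"]
            return repo_dict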
        else:
            sorted_list = sorted(state, key=lambda x: x['downloadCount'], reverse=True)
            if recursive and self.max_number_of_choices < len(sorted_list):
                Limit_choice = True
            else:
                Limit_choice = False

            if recursive:
                print("\n\n\033[34mThe following repo paths were found\033[0m")
            else:
                print("\n\n\n")

            max_number = min(self.max_number_of_choices, len(sorted_list)) if recursive else len(sorted_list)
            for number, states_dict in enumerate(sorted_list[:max_number]):
                print(f"\033[34m{number + 1}. Repo_id: {states_dict['CreatorName']} / {states_dict['repo_name']}, download: {states_dict['downloadCount']}")

            if Limit_choice:
                max_number += 1
                print(f"\033[34m{max_number}. Other than above")

            while True:
                try:
                    choice = int(input(f"Select the repo to use [1~{max_number}]: "))
                except ValueError:
                    print("\033[33mOnly natural numbers are valid.\033[34m")
                    continue

                if Limit_choice and choice == max_number:
                    return self.repo_select_civitai(state=state, auto=auto, recursive=False)
                elif 1 <= choice <= max_number:
                    self.path_dict["repo_id"] = sorted_list[choice - 1]["repo_id"]
                    return sorted_list[choice - 1]
                else:
                    print(f"\033[33mPlease enter the numbers 1~{max_number}\033[34m")


    def version_select_civitai(self, state, auto, recursive: bool = True):
        """
        Select a model version from a Civitai repository.

        Parameters:
        - state: Repository information, including "version_list".
        - auto: Flag for automatic selection.
        - recursive (bool): Flag for recursion.

        Returns:
        - list: File information of the selected version.
        """
        if not state:
            raise ValueError("state is empty")

        ver_list = sorted(state["version_list"], key=lambda x: x['downloadCount'], reverse=True)

        if recursive and self.max_number_of_choices < len(ver_list):
            Limit_choice = True
        else:
            Limit_choice = False

        if auto:
            # ver_list is sorted by downloads, so the first entry is the most downloaded version
            result_dict = ver_list[0]
            self.path_dict["version_id"] = result_dict["id"]
            return result_dict["files"]
        else:
            if recursive:
                print("\n\n\033[34mThe following model paths were found\033[0m")
            else:
                print("\n\n\n")

            if len(ver_list) == 1:
                # Return the file list, as the interactive branch below does
                self.path_dict["version_id"] = ver_list[0]["id"]
                return ver_list[0]["files"]

            max_number = min(self.max_number_of_choices, len(ver_list)) if recursive else len(ver_list)

            for number_, state_dict_ in enumerate(ver_list[:max_number]):
                print(f"\033[34m{number_ + 1}. model_version: {state_dict_['name']}, download: {state_dict_['downloadCount']}")

            if Limit_choice:
                max_number += 1
                print(f"{max_number}. Other than above")


            while True:
                try:
                    choice = int(input("Select the model path to use: "))
                except ValueError:
                    print("\033[33mOnly natural numbers are valid.\033[34m")
                    continue
                if Limit_choice and choice == max_number:
                    return self.version_select_civitai(state=state, auto=auto, recursive=False)
                elif 1 <= choice <= max_number:
                    return_dict = ver_list[choice - 1]
                    self.path_dict["version_id"] = return_dict["id"]
                    return return_dict["files"]
                else:
                    print(f"\033[33mPlease enter the numbers 1~{max_number}\033[34m")


    def file_select_civitai(self, state_list, auto,recursive:bool=True):
        """
        Return the information of the selected file.

        Parameters:
        - state_list: List of file information.
        - auto: Flag for automatic selection.
        - recursive (bool): Flag for recursion.

        Returns:
        - dict: Information of the selected file, including "download_url".
        """
        if recursive and self.max_number_of_choices < len(state_list):
            Limit_choice = True
        else:
            Limit_choice = False

        if len(state_list) > 1 and (not auto):
            max_number = min(self.max_number_of_choices, len(state_list)) if recursive else len(state_list)
            for number, states_dict in enumerate(state_list[:max_number]):
                print(f"\033[34m{number + 1}. File_name: {states_dict['filename']}")

            if Limit_choice:
                max_number += 1
                print(f"{max_number}. Other than above")

            while True:
                try:
                    choice = int(input(f"Select the file to download[1~{max_number}]: "))
                except ValueError:
                    print("\033[33mOnly natural numbers are valid.\033[34m")
                    continue
                if Limit_choice and choice == max_number:
                    return self.file_select_civitai(state_list=state_list, auto=auto, recursive=False)
                elif 1 <= choice <= max_number:
                    self.path_dict["filename"] = state_list[choice - 1]["filename"]
                    return state_list[choice - 1]
                else:
                    print(f"\033[33mPlease enter the numbers 1~{max_number}\033[34m")
        else:
            self.path_dict["filename"] = state_list[0]["filename"]
            return state_list[0]

    def civitai_save_path(self):
        """
        Set the save path using the information in path_dict.

        Returns:
        - str: Save path.
        """
        repo_level_dir = str(self.path_dict['repo_id'])
        file_version_dir = str(self.path_dict['version_id'])
        save_file_name = str(self.path_dict['filename'])
        save_path = os.path.join(self.base_civitai_dir, repo_level_dir, file_version_dir, save_file_name)
        logger.debug(f"save: {save_path}")
        return save_path


    def requests_civitai(self, query, auto, model_type):
        """
        Fetch models from Civitai based on a query and model type.

        Parameters:
        - query: Search query string.
        - auto: Flag for automatic selection.
        - model_type: Type of model to search for.
            arg:[Checkpoint,
                 TextualInversion,
                 Hypernetwork,
                 AestheticGradient,
                 LORA,
                 Controlnet,
                 Poses
                ]

        Returns:
        - tuple: (download_url, save_path) of the selected file.
        """
        state = []
        repo_list = []
        model_ver_list = []
        version_dict = {}

        params = {"query": query, "types": model_type, "sort": "Most Downloaded"}

        try:
            response = requests.get("https://civitai.com/api/v1/models", params=params)
            response.raise_for_status()
        except requests.exceptions.HTTPError as err:
            raise HTTPError(f"Could not get elements from the URL. {err}")
        else:
            try:
                data = response.json()
            except ValueError as err:
                raise ValueError("Invalid JSON response") from err

        items = data["items"]

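        for item in items:
            # Collect versions per item; a shared list would mix versions across repos
            model_ver_list = []
            for model_ver in item["modelVersions"]:
                files_list = []

                for model_value in model_ver["files"]:
                    # All keys read below must be present
                    if all(check_word in model_value for check_word in ["downloadUrl", "name", "id"]):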
                        file_status = {
                            "filename": model_value["name"],
                            "file_id": model_value["id"],
                            "download_url": model_value["downloadUrl"],
                        }
                        files_list.append(file_status)

                version_dict = {
                    "id": model_ver["id"],
                    "name": model_ver["name"],
                    "downloadCount": model_ver["stats"]["downloadCount"],
                    "files": files_list,
                }

                if files_list:
                    model_ver_list.append(version_dict)

            if all(check_txt in item.keys() for check_txt in ["name", "stats", "creator"]):
                state_dict = {
                    "repo_name": item["name"],
                    "repo_id": item["id"],
                    "favoriteCount": item["stats"]["favoriteCount"],
                    "downloadCount": item["stats"]["downloadCount"],
                    "CreatorName": item["creator"]["username"],
                    "version_list": model_ver_list,
                }

                if model_ver_list:
                    state.append(state_dict)

        if not state:
            raise ValueError("No matches found for your criteria")

        model_dict = self.repo_select_civitai(
            state = state,
            auto = auto
            )
        files_list = self.version_select_civitai(
            state = model_dict,
            auto = auto
            )

        file_status_dict = self.file_select_civitai(
            state_list = files_list,
            auto = auto)

        save_path = self.civitai_save_path()

        return (file_status_dict["download_url"], save_path)



class with_Flax(basic_config):
    def __init__(self):
        super().__init__()

    def sd_to_flax(self,url_or_path):
        hf_path,file_name,model_file_path="","",""
        #from_config or from_single_file
        if os.path.isfile(url_or_path):
            model_file_path=url_or_path

        #from_pretrain
        elif os.path.isdir(url_or_path):
            if os.path.exists(os.path.join(url_or_path, self.Config_file)):
                return url_or_path

            else:
                raise FileNotFoundError(f"model_index.json not found in '{url_or_path}'")
        else:
            raise FileNotFoundError("Invalid dir_path.")


        model_saved_dir = os.path.join(os.path.dirname(model_file_path),"converted")
        os.makedirs(model_saved_dir,exist_ok=True)
        if not os.path.isfile(os.path.join(model_saved_dir,"model_index.json")):
            print("Converting the model...")
            from diffusers.pipelines.stable_diffusion.convert_from_ckpt import download_from_original_stable_diffusion_ckpt
            is_from_safetensors = self.check_for_safetensors(url_or_path)
            save_pipeline = download_from_original_stable_diffusion_ckpt(url_or_path, from_safetensors = is_from_safetensors)
            # safetensors cannot be used as the output format here
            save_pipeline.save_pretrained(model_saved_dir, safe_serialization = False)
            print("End of model conversion")
            del save_pipeline
        return model_saved_dir

    def Flax_pipe_create(self,url_or_path):
        model_dir_path=self.sd_to_flax(url_or_path)
        model_index_path=os.path.join(model_dir_path,self.Config_file)
        with open(model_index_path, "r") as f:
            pipeline_class_name = json.load(f)["_class_name"]
        pipeline_class = getattr(diffusers, pipeline_class_name)
        logger.info(f"Pipeline class imported: {pipeline_class_name}.")
        try:
            base_pipe,base_params = FlaxDiffusionPipeline.from_pretrained(model_dir_path,
                                                                           dtype=jax.numpy.bfloat16,
                                                                           use_safetensors=True)

        except ValueError:
            raise ValueError("Insufficient memory.")
        params = replicate(base_params)
        return base_pipe,params


class Config_Mix(Huggingface,
                 Civitai,
                 with_Flax,
                 basic_config,
                 data_config,
                 config_check):
    def __init__(self):
        super().__init__()

class pipeline_setup(Config_Mix):

    def __init__(self):

        self.use_TPU = self.is_TPU()
        super().__init__()
        self.model_path = ""


    def File_search(self):
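        """
        Walk the filesystem for single-file models (extensions in self.exts)
        and ask the user to pick one. Directory-style models are not searched.
        """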

        search_path=""
        paths = []
        for root, dirs, files in os.walk("/"):
            for file in files:
                if any(file.endswith(ext) for ext in self.exts):
                    path = os.path.join(root, file)
                    if path not in self.exclude:
                        if not path.startswith("/root/.cache"):
                            paths.append(path)
        num_path=len(paths)
        if not num_path:
            raise FileNotFoundError("\033[33mModel File not found\033[0m")
        else:
            print(f"{num_path} candidate model files found.")
        for s, path in enumerate(paths, 1):
            print(f"{s}: {path}")
        num = int(input(f"Please enter a number(1〜{num_path}): "))
        if 1 <= num <= len(paths):
            search_path=(paths[num-1])
            print(f"Selected model file: {search_path}\n")
        else:
            raise TypeError(f"\033[33mOnly natural numbers in the following range are valid : (1〜{len(paths)})\033[0m")
        return search_path


    def model_set(self,
                  model_select,
                  auto = False,
                  model_type = "Checkpoint",
                  branch = "main",
                  download: bool = False) -> list:
        """
        return:
        [model_path: str, return_dict: dict]
        return_dict keys: base_model_path, from_single_file, local, model_url
        (some branches also set url_or_path or model_path)
        """

        if model_type not in ["Checkpoint", "TextualInversion", "LORA", "Hypernetwork", "AestheticGradient", "Controlnet", "Poses"]:
            raise TypeError(f'Wrong argument. Valid values are "Checkpoint", "TextualInversion", "LORA", "Hypernetwork", "AestheticGradient", "Controlnet", "Poses". Received: {model_type}')
        local = bool(download)
        return_dict = {"base_model_path":model_select,
                       "from_single_file":False,
                       "local":local,
                       "model_url":"",
                       }
        show_url_or_path = ""
        from_single_file = False
        model_path = ""
        file_path = ""
        if model_select in self.model_dict:
            model_path_to_check = self.model_dict[model_select]
            if self.is_url_valid(f"https://huggingface.co/{model_path_to_check}"):
                model_select = model_path_to_check

        if model_select == "search":
            # File_search only returns single model files
            model_path = self.File_search()
            return_dict["from_single_file"] = True
            return_dict["url_or_path"] = model_path

        elif model_select.startswith("https://huggingface.co/"):
            if not self.is_url_valid(model_select):
                raise ValueError(self.Error_M1)
            else:
                if download:
                    model_path = self.run_hf_download(model_select)
                    return_dict["from_single_file"] = False
                    return_dict["url_or_path"] = model_path
                else:
                    model_path = model_select
                    return_dict["from_single_file"] = True
                    return_dict["url_or_path"] = model_path

        elif model_select.startswith("https://civitai.com/"):
            #local file
            model_path = self.public_civiai(model_select,
                                            auto,
                                            model_type)
            return_dict["from_single_file"] = True

        elif os.path.isfile(model_select):
            model_path = model_select
            return_dict["from_single_file"] = True
            return_dict["local"] = True

        elif os.path.isdir(model_select):
            if os.path.exists(os.path.join(model_select,self.Config_file)):
                # Also set the local variable so the returned model_path is not empty.
                model_path = model_select
                return_dict["model_path"] = model_select
                return_dict["from_single_file"] = False
                return_dict["local"] = True
            else:
                raise FileNotFoundError(f"model_index.json not found in {model_select}")

        elif model_select.count("/") == 1:
            if auto and self.diffusers_model_check(model_select):
                if download:
                    model_path = self.run_hf_download(model_select)
                    return_dict["from_single_file"] = False
                else:
                    model_path = model_select
                    return_dict["from_single_file"] = False
            elif auto and (not self.hf_model_check(model_select)):
                raise ValueError(f'The specified repository could not be found, please try turning off "auto" (model_select:{model_select})')
            else:
                file_path=self.file_name_set(model_select,auto,model_type)
                if file_path == "_hf_no_model":
                    raise ValueError("Model not found")
                elif file_path == "_DFmodel":
                    if download:
                        model_path = self.run_hf_download(model_select)
                        return_dict["from_single_file"] = False
                    else:
                        model_path = model_select
                        return_dict["from_single_file"] = False
                else:
                    hf_model_path=f"https://huggingface.co/{model_select}/blob/{branch}/{file_path}"
                    if download:
                        model_path = self.run_hf_download(hf_model_path)
                        return_dict["from_single_file"] = True

                    else:
                        model_path = hf_model_path
                        return_dict["from_single_file"] = True

        else:
            model_name = self.model_name_search(model_select,auto)
            #self.hf_repo_id = model_name
            #hf->civit
            if model_name != "_hf_no_model":
                file_path = self.file_name_set(model_name,auto,model_type)
                # "_DFmodel" is returned by file_name_set, so compare file_path (model_path is still empty here).
                if file_path == "_DFmodel":
                    if download:
                        model_path = self.run_hf_download(model_name)
                        return_dict["from_single_file"] = False
                    else:
                        model_path = model_name #f"https://huggingface.co/{model_name}"
                        return_dict["from_single_file"] = False

                else:
                    hf_model_path = f"https://huggingface.co/{model_name}/blob/{branch}/{file_path}"
                    if download:
                        model_path = self.run_hf_download(hf_model_path)
                        return_dict["from_single_file"] = True
                    else:
                        model_path = hf_model_path
                        return_dict["from_single_file"] = True


            else:
                model_url, model_path = self.civitai_download(
                    model_select,
                    auto,
                    model_type)

                return_dict["from_single_file"] = True
                return_dict["model_url"] = model_url

        if not return_dict["model_url"]:
            return_dict["model_url"] = model_path

        return [model_path,return_dict]



    def pipe_status_check(self,pipeline):
        from diffusers.pipelines.stable_diffusion import (StableDiffusionSafetyChecker,FlaxStableDiffusionSafetyChecker)
        from transformers import CLIPImageProcessor
        pipe_class_name_ = pipeline.__class__.__name__
        pipe_type = self.pipeline_metod_type(self.import_on_str(pipe_class_name_,"diffusers"))
        if hasattr(pipeline,"safety_checker"):
            if getattr(pipeline,"safety_checker") is None:
                if pipe_type == "flax":
                    pipeline.safety_checker = FlaxStableDiffusionSafetyChecker.from_pretrained(
                        "CompVis/stable-diffusion-safety-checker", from_pt=True
                    )
                elif pipe_type in ["torch", "onnx"]:
                    pipeline.safety_checker = StableDiffusionSafetyChecker.from_pretrained(
                        "CompVis/stable-diffusion-safety-checker"
                    )
        if hasattr(pipeline,"feature_extractor"):
            if getattr(pipeline,"feature_extractor") is None:
                pipeline.feature_extractor = CLIPImageProcessor.from_pretrained("openai/clip-vit-base-patch32")
        return pipeline


    def pipe_create(self,
                    model_path,
                    from_single_file):
        logger.debug(f"input_url; {self.input_url}")
        logger.debug(f"model_path; {self.model_path}")

        if from_single_file:
            # not from_config
            base_pipe = StableDiffusionPipeline.from_single_file(
                model_path
                ).to(self.device)
        else:
            base_pipe = StableDiffusionPipeline.from_pretrained(
                model_path
                ).to(self.device)

        if self.device == "cuda":
            base_pipe.to(torch_dtype = torch.float16)

        return base_pipe

    def pipeline_task(self,model_select,auto):
        logger.debug(f"auto: {auto}")
        logger.debug(f"model_select: {model_select}")
        params = None
        model_path,model_dict = self.model_set(model_select,
                                               auto = auto,
                                               download = False)
        #model_path = model_dict["base_model_path"]
        from_single_file = model_dict["from_single_file"]

        update_model_path = self.check_func_hist(key="model_path",value=model_path)

        if self.use_TPU:
            try:
                base_pipe,params = self.Flax_pipe_create(model_path)
            except OSError as a:
                logger.debug(a)
                raise OSError("Check your internet connection")

        else:
            try:
                base_pipe = self.pipe_create(model_path, from_single_file)
            except OSError as a:
                logger.debug(a)
                raise OSError("Check your internet connection")
        base_pipe = self.pipe_status_check(base_pipe)
        return base_pipe, params, model_dict["model_url"]


pipe_set = pipeline_setup()
base_pipe, parmer, model_path = pipe_set.pipeline_task(
    model_select = model_select,
    auto = auto)

if device_type == "cpu" and base_pipe.dtype == torch.float16:
    if DEBUG:
        logger.warning("base_pipe.dtype is torch.float16 with cpu")
    else:
        raise RuntimeError("CPU cannot use base_pipe with half precision (torch.float16)")

gc.collect()

print("\033[32m\n------------------\n")
print(f"\033[32mmodel_path: {model_path}\033[0m\n")
print("\033[32mModel set-up has been completed.\033[0m")

Step2_finish = True
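
The `model_set` method above decides how to load a model purely from the shape of the `model_select` string. The following is a minimal, standalone sketch of that dispatch order, not the notebook's actual implementation: `classify_model_source` and its return labels are hypothetical names introduced for illustration, and the preset lookup in `self.model_dict` is left out.

```python
import os


def classify_model_source(model_select: str) -> str:
    """Return a label describing how a model_select string would be resolved.

    Hypothetical helper: it mirrors the branch order of model_set,
    but performs no network access and no downloads.
    """
    if model_select == "search":
        return "local_search"          # scan the filesystem for model files
    if model_select.startswith("https://huggingface.co/"):
        return "huggingface_url"       # direct Hugging Face URL
    if model_select.startswith("https://civitai.com/"):
        return "civitai_url"           # direct Civitai URL
    if os.path.isfile(model_select):
        return "local_file"            # single checkpoint file on disk
    if os.path.isdir(model_select):
        return "local_dir"             # diffusers-style directory on disk
    if model_select.count("/") == 1:
        return "huggingface_repo_id"   # looks like "owner/repo"
    return "keyword_search"            # free text: search Hugging Face, then Civitai


if __name__ == "__main__":
    for candidate in ["search", "https://civitai.com/models/1234", "some keyword"]:
        print(candidate, "->", classify_model_source(candidate))
```

The order matters: URL prefixes and existing local paths are checked before the `owner/repo` pattern, so a relative path that contains exactly one slash and exists on disk is treated as local rather than as a repository id.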

In [None]:
#@title  #Step.3 Pipeline Setup{display-mode: "form"}

# @markdown >Pipeline class set [#info](#Pipeline_class_set_help)

Pipeline_type = "txt2img" # @param ["txt2img", "img2img", "Inpaint", "txt2video"] {allow-input: true}

#@markdown * Select from pull-down menu

#@markdown * Enter pipeline class name

#@markdown >Scheduler set [#info](#Scheduler_select)

Scheduler_select = "DDIM" # @param ["DPM", "DDPM", "DDIM", "DEISM", "DPM_S", "EulerA", "Euler", "HeunD", "K_DPM2D", "K_DPM2AD", "LMSD", "PNDM", "UniPCM", "EulerA_with_sonar", "Euler_with_sonar"] {allow-input: true}

#@markdown >Vae set (Enter only if you want to exchange vae) [#info](#Vae_set)

vae_select = "" # @param ["waifu-diffusion", "Counterfeit-V2.5", "anything-v3.0"] {allow-input: true}

auto = True # @param {type:"boolean"}

#@markdown >Extra parameter

set_extra_parameter = False # @param {type:"boolean"}

#@markdown * If you change them, you can update the parameters by pressing Update Values.

#@markdown >Filter switching

#@markdown **Caution : Be careful when changing it**

Filter_off = False  # @param {type:"boolean"}



if "Step2_finish" not in globals():
    raise NameError("\033[33mPlease execute Step.2 first\033[0m")



class vae_set(Config_Mix):
    def __init__(self):
        self.use_input_url=False
        self.vae_path=""
        super().__init__()


    def vae_load(self,vae_model_name="",auto=False):
        if vae_model_name:
            model_names = self.model_name_search(vae_model_name,auto)
            vae_path = self.file_name_set(model_names,auto,download=True)
            if vae_path == "_DFmodel":
                vae_path = model_names
                single_file = False

            elif os.path.isfile(vae_path):
                single_file = True
            elif os.path.isdir(vae_path):
                single_file = False
            else:
                single_file = True

            if self.use_TPU:
                if single_file:
                    vae = FlaxAutoencoderKL.from_single_file(vae_path)
                else:
                    try:
                        vae = FlaxAutoencoderKL.from_pretrained(vae_path)
                    except Exception:
                        vae = FlaxAutoencoderKL.from_pretrained(vae_path , subfolder="vae")
            else:
                if single_file:
                    vae = AutoencoderKL.from_single_file(vae_path)
                else:
                    try:
                        vae = AutoencoderKL.from_pretrained(vae_path)
                    except Exception:
                        vae = AutoencoderKL.from_pretrained(vae_path , subfolder="vae")

        else:
            vae = self.base_pipe.vae
            vae_path = ""
        return vae, vae_path



class Scheduler_set(Config_Mix):
    Scheduler_dict={
            "DDPM": "DDPMScheduler",
            "DDIM": "DDIMScheduler",
            "PNDM": "PNDMScheduler",
            "LMSD" : "LMSDiscreteScheduler",
            "DPM":  "DPMSolverMultistepScheduler",
            "EulerA": "EulerAncestralDiscreteScheduler",
            "Euler": "EulerDiscreteScheduler",
            "DEISM":"DEISMultistepScheduler",
            "UniPCM":"UniPCMultistepScheduler",
            "K_DPM2D":"KDPM2DiscreteScheduler",
            "DPM_S":"DPMSolverSinglestepScheduler",
            "K_DPM2AD":"KDPM2AncestralDiscreteScheduler",
            "HeunD":"HeunDiscreteScheduler",
            }
    Special_Scheduler_dict={
            "Euler_with_sonar":"Euler_Scheduler_with_sonar",
            "EulerA_with_sonar":"EulerA_Scheduler_with_sonar",
            }
    def __init__(self):
        super().__init__()
        self.scheduler_name = ""
    """
    def Euler_sh_set(self,Scheduler_name):
        '''
        Args:
            Scheduler_name: Either "CustomEuler" or "CustomEulerA".
        '''
        assert Scheduler_name in ["CustomEuler", "CustomEulerA"]
        if not os.path.exists("./script/Euler_mod"):
            os.makedirs("./script/Euler_mod",exist_ok=True)
            !git clone https://github.com/alexblattner/modified-euler-samplers-for-sonar-diffusers.git ./script/Euler_mod
        path_1 = "./script/Euler_mod/EulerANew.py"
        path_2 = "./script/Euler_mod/EulerNew.py"
        if path_1 not in sys.path:
            sys.path.append(path_1)
            sys.path.append(path_2)
        os.chdir("./script/Euler_mod")
        if Scheduler_name=="CustomEulerA":
            !python EulerANew.py
            Scheduler_object=self.import_on_str("EulerA",module_name="EulerANew")
        else:
            !python EulerNew.py
            Scheduler_object=self.import_on_str("Euler",module_name="EulerNew")
        os.chdir("../..")
        return Scheduler_object
    """


    def scheduler_setup(self,Scheduler_select):
        if Scheduler_select in self.Special_Scheduler_dict:
            if not os.path.isdir("/content/script/Euler_sonar"):
                !git clone -q  https://github.com/alexblattner/modified-euler-samplers-for-sonar-diffusers.git /content/script/Euler_sonar
            from Euler_sonar.EulerNew import Euler as Euler_Scheduler_with_sonar
            from Euler_sonar.EulerANew import EulerA as EulerA_Scheduler_with_sonar
            if Scheduler_select == "Euler_with_sonar":
                Scheduler_class = Euler_Scheduler_with_sonar
            else:
                Scheduler_class = EulerA_Scheduler_with_sonar
        else:
            logger.debug(f"scheduler: {Scheduler_select}")
            if Scheduler_select in self.Scheduler_dict:
                Scheduler_select = self.Scheduler_dict[Scheduler_select]
            try:
                Scheduler_class = getattr(diffusers, Scheduler_select)
            except AttributeError:
                _error = self.make_Error_message(Scheduler_select,dir(diffusers),need_txt="Scheduler")
                raise AttributeError(f'"{Scheduler_select}" not found. Maybe "{_error}" ?')
        return Scheduler_class



class make_main_pipe(Scheduler_set,
                     vae_set,
                     Config_Mix
                     ):
    def __init__(self,
                 pipe_name,
                 Filter_off,
                 Scheduler_select,
                 vae_select,
                 auto,
                 base_pipe,
                 parmer):
        super().__init__()
        self.pipeline_name = self.pipeline_name_convert(pipe_name)
        self.pipeline_class = self.pipeline_class_set(self.pipeline_name)
        self.pipeline_type = self.pipeline_metod_type(self.pipeline_class)
        self.Scheduler_class = super().scheduler_setup(Scheduler_select)
        self.Filter_off = Filter_off
        self.base_pipe = base_pipe
        self.parmer = parmer
        self.vae,self.vae_path = self.vae_load(vae_select, auto)
        self.auto = auto
        self.pipe_args_setup = {}
        self.stetas = {}
        self.from_pipe_args = {}
        self.main_name = ""
        self.pipe_module = None



    def make_Error_message(self,
                           base_txt,
                           list_obj,
                           need_txt=""):
        """
        Args:
        base_txt : Reference string; the candidate most similar to it is returned.
        list_obj : List of candidate strings to search (defaults to dir(diffusers))
        need_txt : Substring that every candidate must contain
        """
        if not list_obj:
            list_obj = dir(diffusers)

        if need_txt:
            _diffusers_module = self.sort_list_obj(list_obj,need_txt)
        else:
            _diffusers_module = list_obj
        logger.debug(type(_diffusers_module))
        pretxt = self.max_temper(base_txt,_diffusers_module)
        if pretxt:
            return pretxt[0]
        else:
            raise AttributeError("Please try another pipeline")

    def pipeline_name_convert(self,pipeline_name: str):
        #"StableDiffusionPipeline"
        pipe_class_dict={
            "txt2img":"AutoPipelineForText2Image",
            "img2img":"AutoPipelineForImage2Image",
            "Inpaint":"AutoPipelineForInpainting",
            "txt2video":"TextToVideoZeroPipeline",
            }
        #FlaxStableDiffusionPipeline,FlaxStableDiffusionImg2ImgPipeline
        Flax_pipe_class_dict={
        "txt2img":"FlaxStableDiffusionPipeline",
        "img2img":"FlaxStableDiffusionImg2ImgPipeline",
        "Inpaint":"FlaxStableDiffusionInpaintPipeline",
        "txt2video": "",
        }
        params = None
        if self.use_TPU:
            if pipeline_name == "txt2video":
                raise ValueError("Video generation in TPU requires direct specification of the class. (At the time of development, there was no class for video generation in TPU)")
            elif pipeline_name in Flax_pipe_class_dict:
                pipeline_name = Flax_pipe_class_dict[pipeline_name]
        else:
            if pipeline_name in pipe_class_dict:
                pipeline_name = pipe_class_dict[pipeline_name]

        return pipeline_name


    def pipeline_class_set(self,
                           pipeline_name : str):
        try:
            pipe_class = getattr(diffusers, pipeline_name)
        except AttributeError:
            pretxt = self.make_Error_message(pipeline_name , dir(diffusers),need_txt="Pipeline")
            raise AttributeError(f"'{pipeline_name}' not found. Maybe '{pretxt}' ?")
        return pipe_class


    def main_pipe_set(self):
        txt2img_pipe,img2img_pipe,txt2video_pipe,Inpaint_pipe,safe_pipe=None,None,None,None,None
        textual_inversion_dict ={
            "EasyNegativeV2.safetensors": "EasyNegative",
            "bad-hands-5.pt": "bad-hands",
            }
        init_method_list = [
            "text_encoder",
            "tokenizer",
            "unet",
            "image_encoder",
            ]

        logger.debug(self.Scheduler_class)
        self.stetas["vae"] = self.vae
        self.stetas["scheduler"] = self.Scheduler_class.from_config(self.base_pipe.scheduler.config)
        logger.debug(self.stetas["scheduler"])
        stetas_check = self.get_call_method(class_name=self.pipeline_name,method_name='__init__')
        for init_method in init_method_list:
            if ((init_method in stetas_check) and
                (hasattr(self.base_pipe, init_method))):
                self.stetas[init_method] = getattr(self.base_pipe, init_method)
        if self.Filter_off:
            if "feature_extractor" in stetas_check:
                self.stetas["feature_extractor"] = None
                self.from_pipe_args["feature_extractor"] = None

            if "safety_checker" in stetas_check or (self.pipeline_name in self.Auto_pipe_class):
                self.stetas["safety_checker"] = None

            elif "load_safety_checker" in stetas_check:
                self.stetas["load_safety_checker"] = False

            elif "requires_safety_checker" in stetas_check:
                self.stetas["requires_safety_checker"] = False

            else:
                print(f'safety_checker cannot be specified for "{self.pipeline_name}"')
        else:
            if "feature_extractor" in stetas_check:
                self.stetas["feature_extractor"] = self.base_pipe.feature_extractor

            if "safety_checker" in stetas_check or (self.pipeline_name in self.Auto_pipe_class):
                self.stetas["safety_checker"] = self.base_pipe.safety_checker

            if "load_safety_checker" in stetas_check:
                self.stetas["load_safety_checker"] = True



        logger.debug(f"stetas: {self.stetas.keys()}")
        if "from_pipe" in dir(self.pipeline_class):
            use_from_pipe = True
        else:
            use_from_pipe = False
        if self.use_TPU:
            if use_from_pipe:
                if self.Filter_off:
                    self.base_pipe.safety_checker = None
                self.base_pipe.scheduler = self.stetas["scheduler"]
                self.base_pipe.vae = self.vae.to(torch.float16)
                main_pipe = self.pipeline_class.from_pipe(self.base_pipe).to(dtype=jax.numpy.bfloat16)
                for key,value in self.stetas.items():
                    setattr(main_pipe,key,value)

            else:
                main_pipe = self.pipeline_class(**self.stetas,dtype=jax.numpy.bfloat16)

        else:
            if use_from_pipe:

                if self.Filter_off:
                    self.base_pipe.safety_checker = None
                main_pipe = self.pipeline_class.from_pipe(self.base_pipe).to(self.device, torch.float16)
                for key,value in self.stetas.items():
                    setattr(main_pipe,key,value)
            else:
                main_pipe = self.pipeline_class(**self.stetas).to(self.device, torch.float16)



        if hasattr(main_pipe.__class__,"load_textual_inversion"):
            if "EasyNegative" not in main_pipe.tokenizer.get_vocab():
                try:
                    main_pipe.load_textual_inversion("embed/negative", weight_name="EasyNegativeV2.safetensors", token="EasyNegative")
                except Exception:
                    logger.info("Failed to apply EasyNegative")
            if "bad-hands" not in main_pipe.tokenizer.get_vocab():
                try:
                    main_pipe.load_textual_inversion("embed/negative", weight_name="bad-hands-5.pt", token="bad-hands")
                except Exception:
                    logger.info("Failed to apply bad-hands")


        if self.device == "cpu" and hasattr(self.pipeline_class,"enable_model_cpu_offload"):

            main_pipe.enable_model_cpu_offload()

        if hasattr(self.pipeline_class,"fuse_qkv_projections"):
            #torch.compile -> bug
            main_pipe.fuse_qkv_projections()

        if self.device_type == "TPU":
            main_pipe.to(dtype=jax.numpy.bfloat16)
        else:
            main_pipe.to(device=self.device_type,dtype=torch.float16)

        return main_pipe

MMP=make_main_pipe(pipe_name = Pipeline_type,
                   Filter_off = Filter_off,
                   Scheduler_select = Scheduler_select,
                   vae_select = vae_select,
                   auto = auto,
                   base_pipe = base_pipe,
                   parmer = parmer)
main_pipe = MMP.main_pipe_set()



class PipeSetUI:
    def __init__(self):
        self.del_key_list = [
            "self", "prompt", "prompt_ids", "negative_prompt", "neg_prompt", "device",
            "guidance_scale", "num_inference_steps", "generator", "height", "width", "image",
            "strength", "t0", "t1", "video_length", "num_frames", "prng_seed", "params",
            "return_dict", "callback_on_step_end_tensor_inputs","timesteps", "sigmas"
        ]
        self.widgets_dict = {}


    def display(self, pipeline, input_dict):
        args_info = self.get_call_args_info(pipeline)
        self.display_args(args_info, input_dict)


    def get_call_args_info(self, pipeline):
        if not hasattr(pipeline.__class__, '__call__'):
            raise ValueError("pipeline does not have a __call__ method")
        else:
            sig = inspect.signature(pipeline.__call__)

        args_info = {}
        for name, param in sig.parameters.items():
            if param.kind in (param.VAR_POSITIONAL, param.VAR_KEYWORD):
                continue

            arg_info = {
                'type': param.annotation if param.annotation is not inspect.Parameter.empty else None,
                'default': param.default if param.default is not inspect.Parameter.empty else None
            }
            args_info[name] = arg_info
        return args_info


    def on_button_click(self, _):
        for arg, widget in self.widgets_dict.items():
            parameter_dict[arg] = widget.value if widget.value != 'None' else None
        output.clear(output_tags="button_click_txt")
        with output.use_tags("button_click_txt"):
            sys.stdout.write("\n\033[34m----------------------\n")
            sys.stdout.write("Updated parameter \n\n")
            for key, value in parameter_dict.items():
                sys.stdout.write(f"{key}: {value}\n\n")
                sys.stdout.flush()
            sys.stdout.write("\033[0m")


    def extract_union_options(self, union_type):
        options = set()
        for ut in union_type.__args__:
            if hasattr(ut, '__origin__') and ut.__origin__ == Union:
                options.update(self.extract_union_options(ut))
            elif ut is type(None):
                options.add('None')
            elif isinstance(ut, type):
                options.add(ut.__name__)
            else:
                options.add(str(ut))
        return options


    def display_args(self, args_info, input_dict):
        widget_list=[]
        for checked_key in list(args_info.keys()):
            if checked_key in self.del_key_list:
                del args_info[checked_key]

        for arg, info in args_info.items():
            if not isinstance(info["type"], type):
                arg_type = tuple(t for t in get_args(info["type"]) if t is not type(None))
                if len(arg_type) == 1:
                    info["type"] = arg_type[0]
                else:
                    info["type"] = arg_type

            if info['type'] == int:
                widget = widgets.IntText(
                    value=info['default'],
                    description=arg,
                    disabled=False,
                )
            elif info['type'] == float:
                widget = widgets.FloatText(
                    value=info['default'],
                    description=arg,
                    disabled=False,
                )
            elif info['type'] == str:
                widget = widgets.Text(
                    value=str(info['default']),
                    description=arg,
                    disabled=False,
                )
            elif info['type'] == bool:
                widget = widgets.Checkbox(
                    value=info['default'] if isinstance(info['default'], bool) else True,
                    description=arg,
                    disabled=False,
                )
            else:
                continue

            self.widgets_dict[arg] = widget
            widget_list.append(widget)

        for widget in widget_list:
            if isinstance(widget, (widgets.Text, widgets.IntText, widgets.FloatText)):
                display(widget)

        for widget in widget_list:
            if isinstance(widget, widgets.Checkbox):
                display(widget)

        button = widgets.Button(description="Update Values",button_style="info")
        button.on_click(self.on_button_click)
        display(button)

"""
ANSI Escape Code List

Red: \033[31m
Yellow: \033[33m
Blue: \033[34m
Green: \033[32m
white(default): \033[0m
Light-blue: \033[38;2;0;255;255m
Yellowish-green: \033[38;2;74;229;110m
"""


if main_pipe.dtype == torch.float16 and device_type == "cpu":
    print("\033[31mWarning: float16 pipeline is not available on CPU\033[0m")

if Scheduler_select not in Scheduler_set().Special_Scheduler_dict:
    warning_txt = " or ".join(Scheduler_set().Special_Scheduler_dict.keys())
    print(f'\n\nThe momentum settings are only valid if Scheduler_select is either {warning_txt}.\n')

if Filter_off == False:
    filter_level = "Filter: Enabled"
else:
    filter_level = "\033[33mFilter: Disabled\033[0m"

print("\033[34m____________________________________________________________________________\n\n")

print(f"model_path: {model_path}"+"\n\n")

print(f"scheduler: {main_pipe.scheduler.__class__.__name__}\n\n")

if vae_select:
    print(f"vae_path: {MMP.vae_path}\n\n")

print(f"pipeline_class: {main_pipe.__class__.__name__}\n\n")

print(filter_level+"\n\n")

print("\033[32mReady for generation\033[0m")


parameter_dict = {}
if set_extra_parameter:
    print("\n\033[34m____________________________________________________________________________\n")
    print("Extra parameter\033[0m")
    PipeSetUI().display(main_pipe.__class__, parameter_dict)


Step3_finish = True

# Option

<details>
<summary>Note</summary>

* Available only if **Step.3** has already been performed.

* If Step.3 is performed again, the runtime will be initialized, so the option must be applied again.

</details>
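
A minimal sketch of how an option cell could check this precondition before doing any work. It assumes the `Step3_finish` flag that Step.3 sets when it completes; the helper name `require_step3` is hypothetical and is not part of the notebook.

```python
def require_step3(namespace: dict) -> None:
    """Raise if the Step.3 completion flag is missing or False.

    `namespace` is expected to be the notebook's global namespace,
    i.e. the dict returned by globals() inside a cell.
    """
    if not namespace.get("Step3_finish", False):
        raise RuntimeError(
            "Step.3 has not been run yet. Run Step.3, then re-run this option cell."
        )


# Passes silently when the flag is present and True.
require_step3({"Step3_finish": True})
```

Inside an option cell the call would be `require_step3(globals())`, placed before the LoRA or textual inversion is loaded.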


In [None]:
#@title  (option) Load Lora{display-mode: "form"}

model_name = "" # @param {type:"string"}

auto = False # @param {type:"boolean"}

#@markdown ---

unload_lora = False # @param {type:"boolean"}


if not unload_lora:
    LORA_dict = pipe_set.model_set(
        model_select = model_name,
        auto=auto,
        model_type="LORA",
        download=True)
    LORA_PATH = LORA_dict["model_path"]
    try:
        main_pipe.load_lora_weights(LORA_PATH)
    except Exception as e:
        # Keep the original exception as the cause so the real error stays visible.
        raise ValueError("Failed to load LoRA") from e
    print("\033[32mLoRA successfully loaded.\033[0m")
else:
    try:
        main_pipe.unload_lora_weights()
    except Exception as e:
        raise ValueError("LoRA is not loaded") from e
    print("\033[32mLoRA successfully unloaded.\033[0m")



In [None]:
#@title   (option) load textual inversion {display-mode: "form"}

model_path = "" # @param {type:"string"}

#weight_name= "" # @param {type:"string"}

token = "" # @param {type:"string"}

auto = False # @param {type:"boolean"}

#@markdown ---

unload_textual_inversion = False # @param {type:"boolean"}

class load_textual_inversion_cls(pipeline_setup):
    def __init__(self):
        super().__init__()
        self.tokenizer_names = ["tokenizer",
                               "tokenizer1",
                               "tokenizer2",]
    def load_textual(
            self,
            pretrained_path,
            token,
            auto,
            main_pipe):
        state={}

        tokenizer_list = []


        for tokenizer_name in self.tokenizer_names:
            base_tokenizer = getattr(main_pipe, tokenizer_name, None)
            if base_tokenizer is not None:
                tokenizer_list.append(base_tokenizer)

        assert tokenizer_list, "Tokenizer is not Found"

        check_tokenizer = tokenizer_list[0]

        if token in check_tokenizer.get_vocab():
            raise ValueError("Token has been used, please select another token.")
        elif os.path.isfile(pretrained_path):
            state["pretrained_model_name_or_path"] = pretrained_path
            state["token"] = token
            #state["weight_name"]=weight_name
        else:
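            embed_dict = self.model_set(model_select=pretrained_path,
                                        model_type="TextualInversion",
                                        auto=auto,
                                        download=True)
            # model_set returns a dict (as in the LoRA cell); the file path is stored under "model_path".
            state["pretrained_model_name_or_path"] = embed_dict["model_path"]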
            state["token"] = token
            #state["weight_name"] = self.weight_name
        print(state)

        main_pipe.load_textual_inversion(**state)
        print(f"CustomToken: {token}\n")
        return main_pipe


    def unload_textual(
            self,
            token,
            main_pipe):
        if token:
            print(f"Only the following token will be unloaded: {token}")
            print("To unload all tokens at once, leave the token field empty.")
        try:
            # An empty token is passed as None, which unloads every textual inversion.
            main_pipe.unload_textual_inversion(token or None)
        except ValueError as e:
            raise ValueError("No unloadable textual inversion exists") from e
        return main_pipe




#ct=text_cadd_textual(Repo_id_or_path,weight_name,token,main_pipe)
textual_cls = load_textual_inversion_cls()
if not unload_textual_inversion:
    main_pipe = textual_cls.load_textual(
        pretrained_path=model_path,
        token=token,
        auto=auto,
        main_pipe=main_pipe)
else:
    main_pipe = textual_cls.unload_textual(
        token=token,
        main_pipe=main_pipe)





# Info
If you have any questions, please read the description in the sub-function at the bottom.
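
As one example, the prompt field in Step.4 accepts `{a,b,c}` groups, and one option from each group is picked at random for every generated image. The sketch below illustrates that expansion in isolation. The function name `expand_choices` and the sample prompt are hypothetical; the notebook's own logic lives in `convert_special_word`.

```python
import random
import re
from typing import Optional


def expand_choices(prompt: str, rng: Optional[random.Random] = None) -> str:
    """Replace each "{a,b,c}" group in `prompt` with one randomly chosen option."""
    rng = rng or random  # fall back to the module-level generator

    def pick(match: re.Match) -> str:
        options = match.group(1)
        if not options:
            return match.group(0)  # leave an empty "{}" unchanged
        return rng.choice(options.split(","))

    return re.sub(r"\{(.*?)\}", pick, prompt)


# Each call may produce a different colour.
print(expand_choices("a {red,blue,green} car at sunset"))
```

Passing a seeded `random.Random` instance as `rng` makes the choice reproducible, which can help when comparing results across runs.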

In [None]:
#@title  #Step.4 Generation_Step{display-mode: "form"}

##@markdown ># Basic Config

# @markdown >Basic Status [#info](#Prompt_special_tokens_help)


prompt = "Please draw a beautiful Mount Fuji with the sun rising from the summit" # @param ["smail,girl, white hair, medium hair, cat ears, looking at viewer, :3, cute,white_dress", "smail,1girl, {white,blue,red,purple} hair, medium hair, cat ears, looking at viewer, :3, cute,white_dress,loli", "Please draw a beautiful Mount Fuji with the sun rising from the summit", "Earth, space, high resolution"] {allow-input: true}

negative_prompt = ""  # @param {type:"string"}

seed = -1 # @param {type:"number"}

num_imgs = 4 # @param {type:"integer"}

input_image_path_or_dir = "" # @param {type:"string"}

guidance_scale = 7 # @param {type:"slider", min:5, max:15, step:0.5}


# @markdown >Special Status [#info](#Prompt_assistant_help)


num_inference_steps = 30  # @param {type:"integer"}
height = "512" #@param ["480","512","600", "768","800", "1080","1152", "1440","1920", "3840","4000","7680"] {allow-input: true}
width = "512" #@param ["480","512","600", "768","800", "1080","1152", "1440", "1920", "3840","4000","7680"] {allow-input: true}

text_generate_model = "gpt2-prompt-generator" #@param ["None","MagicPrompt-Stable-Diffusion","anime-anything-promptgen-v2","gpt2-prompt-generator"]

#@markdown

grit_image_width = 2 # @param {type:"slider", min:0, max:10, step:1}
#@markdown * If 0, it will not be created.

# @markdown >output config [#info](#File_name_Special_tokens_help)

save_dir = "" # @param ["/content/drive/MyDrive/"] {allow-input: true}

file_name = "" #@param {type:"string"}

#@markdown > video config

video_length = 10 # @param {type:"integer"}

video_fps = 10 # @param {type:"integer"}

num_frames = 9 # @param {type:"integer"}

#@markdown >Sonar config <a name = "Sonar_config"></a>
momentum = 0.95 # @param {type:"slider", min:0.8, max:1, step:0.05}

momentum_hist = -0.1 # @param {type:"slider", min:-1, max:1, step:0.1}

history_d = "rand_init" # @param ["rand_new", "rand_init"]

# @markdown >Option
Recommended_setting = True #@param {type:"boolean"}

show_result = True  # @param {type:"boolean"}


Hide_warnings = True #@param {type:"boolean"}



if (Recommended_setting and
        (momentum - momentum_hist) <= 0.3 and
        (Scheduler_select in Scheduler_set().Special_Scheduler_dict)):
    momentum_hist = momentum - 0.3
    print(f"momentum_hist is set to '{momentum_hist}' because the difference between momentum and momentum_hist was 0.3 or less.\nThis adjustment is part of 'Recommended_setting'.")


if (not Recommended_setting) and (not Hide_warnings):
    print('\033[33mRecommended_setting is off\033[0m')



def run_html_js(path,moji):
    import datetime
    num=len(path)
    now_a = datetime.datetime.now()
    datetimes = now_a.strftime("%Y%m%d%H%M%S")
    html_dis = f'''
    <style>
      #clipborad-text-{datetimes} {{
        border: none;
        color: #0ff;
        font-size: 15px;
      }}
    </style>
    <span style="color: #4ae56e; font-size:16px">{moji}</span> <!-- changed the <p> tag to a <span> tag -->
    <input type="text" value="{path}" id="clipborad-text-{datetimes}" size="{num}" readonly> <!-- added the size and id attributes -->
    <button id="copy-button-{datetimes}" onclick="copyToClipboard('{datetimes}')">Copy</button> <!-- added the id and onclick attributes -->
    '''
    js_code = '''
    function copyToClipboard(datetimes) {
      var copyText = document.getElementById("clipborad-text-" + datetimes); // append the datetime suffix
      var copyButton = document.getElementById("copy-button-" + datetimes); // append the datetime suffix
      copyText.select();
      navigator.clipboard.writeText(copyText.value);
      document.execCommand("copy");
      copyButton.textContent = "Copied!"; // change the button label
      setTimeout(function() {
        copyButton.textContent = "Copy"; // restore the label after 1 second
        }, 1000);
    }
    '''
    display(HTML(html_dis))
    display(HTML('<script>{}</script>'.format(js_code)))





class Scheduler_setup(make_main_pipe):
    def __init__(self,main_pipe):
        self.main_pipe=main_pipe


    def moment_set(self,Scheduler_select):
        if Scheduler_select in self.Special_Scheduler_dict:
            self.main_pipe.scheduler.history_d =history_d
            self.main_pipe.scheduler.momentum = momentum
            self.main_pipe.scheduler.momentum_hist = momentum_hist


sc_set=Scheduler_setup(main_pipe)
sc_set.moment_set(Scheduler_select)



class txt_model_setup(basic_config):
    def __init__(self,
                 base_prompt,
                 n_prompt,
                 num_imgs,
                 text_generate_model,
                 main_pipe,
                 Recommended_setting):
        #global Prompt_list , make_images_list
        #self.use_TPU=use_TPU
        super().__init__()
        self.prompt_list=[]
        self.num_imgs = num_imgs
        self.base_prompt = base_prompt
        self.n_prompt = n_prompt
        self.text_generate_model = text_generate_model
        self.main_pipe = main_pipe
        self.Recommended_setting = Recommended_setting

        self.good_word = "masterpiece:2.0,best quality,high quality,"

        self.SP_word_dict = {"<color>":["Red","Blue","green","yellow","orange","purple","pink","brown","gray","black","white",]
                             }


    def txt_pipe(self,text_generate_model):
        global gl_txt_pipe

        txt_pipe_dict={"MagicPrompt-Stable-Diffusion":"Gustavosta/MagicPrompt-Stable-Diffusion",
                       "anime-anything-promptgen-v2":"FredZhang7/anime-anything-promptgen-v2",
                       "gpt2-prompt-generator":"Ar4ikov/gpt2-medium-650k-stable-diffusion-prompt-generator",
                       "None":"None"}


        assert text_generate_model in txt_pipe_dict , f"text_generate_model: {text_generate_model}"

        txt_model_name = txt_pipe_dict[text_generate_model]

        try:
            if txt_model_name != "None" and (not globals().get('gl_txt_pipe') or not self.key_check(txt_model_name)):
                if self.use_TPU:
                    txt_model = FlaxAutoModelForCausalLM.from_pretrained(txt_model_name)
                else:
                    txt_model = AutoModelForCausalLM.from_pretrained(txt_model_name).to(self.device)
                txt_tokenizer = AutoTokenizer.from_pretrained(txt_model_name)
                gl_txt_pipe = pipeline('text-generation', model=txt_model, tokenizer=txt_tokenizer, device=self.device, pad_token_id=50256)
            elif "gl_txt_pipe" in globals():
                gl_txt_pipe = globals()["gl_txt_pipe"]
            else:
                gl_txt_pipe = None
        except Exception as e:
            logger.error(f"Error in setting up txt_pipe: {e}")
            raise e

        return gl_txt_pipe


    def sp_word_replace(self,base_prompt):
        # One value is chosen per special word (e.g. "<color>") and reused
        # for all of its occurrences in the prompt.
        for sp_key, sp_value in self.SP_word_dict.items():
            if isinstance(sp_value, list):
                sp_value = random.choice(sp_value)
            base_prompt = re.sub(sp_key, sp_value, base_prompt)

        return base_prompt


    def convert_special_word(self,base_prompt):
        base_prompt = self.sp_word_replace(base_prompt)

        def pick(match):
            content = match.group(1)
            if not content:
                # Leave an empty "{}" untouched so later groups stay aligned.
                return match.group(0)
            return random.choice(content.split(","))

        # Replace each "{a,b,c}" group with one randomly chosen option.
        return re.sub(r"\{(.*?)\}", pick, base_prompt)


    def prompt_pipe_convert(self,prompt):
        n_word=["EasyNegative","bat_hands","white background"," simple background","logo","text","Loss of eye highlights","freckles","simple background","Fingers fused together","Writing Sweet Fingers","bad hands","bad legs","worst quality","low quality","Not five fingers","blurred","Missing finger","Simple background","Cat with deformed face","medium quality","purple hair","Loss of eye highlights","Fingers fused together","Writing Sweet Fingers","deleted","lowres","Low quality animals","deformed animals","hands emerging from impossible places","bad anatomy","more than three limbs hands/legs","low resolution","blurry","absurdres","pixelated","sketchy","nonsensical anatomy","unrealistic pose","mosaic","unclear details","distorted colors","unrealistic proportions","poor quality","fuzzy","missing head:1.6","out of focus","hazy","grainy","text","error","missing fingers:0.9","extra digit","fewer digits","cropped","jpeg artifacts","signature","watermark","username","standard quality","bad feet_hand_finger_leg_eye","bad","text font ui","bad shadow","poorly drawn","black-white","ugly","duplicate","mutation","mutilated","malformed mutated:1.1","malformed:1.1","The background is incoherent","simple background","low-quality background","low background","bad body","long body","broken limb","anatomical nonsense","extra limbs","missing limb","incorrect limb","multiple heads","twisted head","poorly drawn face","1 unit with multiple heads:1.3","heads together:1.0","abnormal eye:1.2 proportion","cropped:1.0","bad eyes","fused eyes","poorly drawn eyes","bad mouth","poorly drawn mouth","bad tongue","too long tongue","bad ears","poorly drawn ears","extra ears","heavy ears","long neck","too thick neck","bad neck","bad breasts","missing arms","disappearing arms","extra arms","three arms:2.0","mutated hands and fingers","fused hand","missing fingers","extra digits","huge thighs","disappearing thigh","missing thighs","extra thighs","bad feet","huge calf","disappearing legs","bad gloves",
                "fused gloves","beard","artist name","text watermark","unnatural","obviously wrong","distorted face","floating hair","floating body parts","severed body parts","incorrect leg position","deformed","fused body and hands","disregard of physics","distorted shape","doll-like object not present in the image","body fusion","abnormal fingers","fingers resembling fish fins","dot eyes","unclear background","mosaic","body bending","incorrect leg-to-torso ratio","excessively large breasts","unsettling appearance","eyes filled with solid color","lack of lower body","splitting","creepy doll-like appearance","distorted eyes","lines on the skin","legs bending in unnatural directions","abnormal finger count","missing arms","floating hands","lack of nose or mouth","incorrect body part ratios","bad","longbody","lowres","bad anatomy","bad hands","missing fingers","Distorted eye contour","Missing part from the ankles onward","extra digit","fewer digits","split wings","Vampire wings floating in the air","bad wing","wonder egg priority","egg priority","demon"]
        if self.n_prompt:
            words =self.n_prompt.split(",")
            n_word.extend(words)
        if self.Recommended_setting:
            max_length=64
        else:
            max_length=74

        gl_txt_pipeline = self.txt_pipe(self.text_generate_model)

        converted_prompt = gl_txt_pipeline(prompt,
                                           max_length=max_length,
                                           truncation=True,
                                           num_return_sequences=10,
                                           repetition_penalty=1.2,
                                           early_stopping=False,
                                           do_sample=True,
                                           temperature=0.4,
                                           top_k=10)

        converted_prompt = converted_prompt[0]['generated_text']
        return converted_prompt


    def prompt_converting(self,base_prompt):
        sp_word_converted = self.convert_special_word(base_prompt)

        if not self.text_generate_model == "None":
            sp_word_converted = self.prompt_pipe_convert(sp_word_converted)

        return_prompt = re.sub(r",+", ",", sp_word_converted)
        return return_prompt


    def Flax_prompt(self,prompt):
        #print(prompt)
        num_samples = jax.device_count()
        prompt = num_samples * [prompt]
        prompt_ids = self.main_pipe.prepare_inputs(prompt)
        prompt_ids = shard(prompt_ids)
        return prompt_ids


    def prompt_processing(self,base_prompt,output_list):
        if self.pipeline_type == "txt2video":
            logger.debug("set num_imgs = 1")
            self.num_imgs = 1

        for number in range(self.num_imgs):
            converted_prompt = self.prompt_converting(base_prompt)


            if self.use_TPU:
                converted_prompt = self.Flax_prompt(converted_prompt)
            #elif self.Recommended_setting:
            #    converted_prompt = self.good_word + converted_prompt
            output_list.put(converted_prompt)


    def use_over_token(self,PROMPT,pipe):
        max_length = pipe.tokenizer.model_max_length
        input_ids = pipe.tokenizer(PROMPT, return_tensors="pt").input_ids
        # Use the configured device instead of assuming CUDA is available.
        input_ids = input_ids.to(self.device)

        negative_ids = pipe.tokenizer("", truncation=False, padding="max_length", max_length=input_ids.shape[-1], return_tensors="pt").input_ids
        negative_ids = negative_ids.to(self.device)

        concat_embeds = []
        neg_embeds = []
        for i in range(0, input_ids.shape[-1], max_length):
            concat_embeds.append(pipe.text_encoder(input_ids[:, i: i + max_length])[0])
            neg_embeds.append(pipe.text_encoder(negative_ids[:, i: i + max_length])[0])

        #prompt_embeds = torch.cat(concat_embeds, dim=1)
        negative_prompt_embeds = torch.cat(neg_embeds, dim=1)
        return negative_prompt_embeds



class generate_config(basic_config):
    def __init__(self):
        super().__init__()
        self.png_num = 0
        self.mp_num = 0


    def find_max_num(self,
                     base_file_name,
                     directory,
                     ext):
        max_num = 0
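        # Escape the name (it may contain regex metacharacters) and require a full match.
        pattern = re.compile(f"{re.escape(base_file_name)}-(\\d+)\\.{re.escape(ext)}")
        for filename in os.listdir(directory):
            filepath = os.path.join(directory, filename)
            if os.path.isfile(filepath):
                match = pattern.fullmatch(filename)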
                if match:
                    z = int(match.group(1))
                    max_num = max(max_num, z)
        return max_num


    def image_grid(self,
                   imgs :list,
                   cols : int):
        """
        cols: width
        """
        all_num=len(imgs)
        if all_num<cols:
            cols=all_num
        if all_num>1:
            am=0
            w, h = imgs[0].size
            rows,b= divmod(all_num,cols)
            if b!=0:
                rows+=1
                am=cols-b
                white_img = Image.new("RGB", (w,h), (255, 255, 255))
                for x in range(am):
                    imgs.append(white_img)
            grid = Image.new('RGB', size=(cols*w, rows*h))
            grid_w, grid_h = grid.size
            for i, img in enumerate(imgs):
                grid.paste(img, box=(i%cols*w, i//cols*h))
        elif all_num==1:
            grid=None
            if self.make_grid:
                print("Unable to create a grid image due to a single image")
        else:
            grid=None
            if self.make_grid:
                print("There are no images available for the grid image")
        return grid


    def get_save_path(self,
                      saved_path,
                      result_type="image"):
        """
        args:
        result_type = [image,video,music]
        """
        dir_type = {"image":"Images",
                    "video":"Video",
                    "music":"Music",}
        make_dir_name = dir_type[result_type]

        if not saved_path:
            saved_path = "/content/Generated"
            if not self.Hide_warnings:
                print(f"The directory to save to has not been entered yet, so save to the following path: {saved_path}")
        elif not self.conect_gdrive:
            if "/content/drive/MyDrive" in saved_path:
                saved_path = "/content/Generated"
                if not self.Hide_warnings:
                    print(f"\033[31mSince Google Drive is not mounted, save to the following path: {saved_path}\033[0m")

        image_save_dir = os.path.join(saved_path,make_dir_name)
        os.makedirs(image_save_dir, exist_ok=True)
        if result_type == "image":
            Grid_save_dir = os.path.join(saved_path,"Grid")
            os.makedirs(Grid_save_dir, exist_ok=True)
            base_file_name ="genrated_grid"
            grid_num = self.find_max_num(
                              base_file_name = base_file_name,
                              directory = Grid_save_dir,
                              ext="png"
                              )
            Grid_save_path = os.path.join(Grid_save_dir,f"{base_file_name}-{grid_num+1}.png")
        else:
            Grid_save_path= ""


        return image_save_dir , Grid_save_path


    def seed_set(self,seed_number):
        Flax_seed=None
        if seed_number==-1 or seed_number is None:
            seed = random.randint(1,10**10)
        else:
            seed=seed_number
        if self.use_TPU:
            Flax_seed=jax.random.split(jax.random.PRNGKey(seed), jax.device_count())
        else:
            Flax_seed=None
        return seed,Flax_seed


    def img_set(self,path,height,width)->list:
        #↓counter-measure: AttributeError: 'str' object has no attribute 'seek'
        init_image_list=[]
        def img_open(path,height,width):
            _init_image = Image.open(path)
            # PIL's resize expects the size as (width, height).
            _init_image = _init_image.resize((width, height))
            return _init_image

        if os.path.isfile(path):
            init_image = img_open(path,height,width)
            init_image_list.append(init_image)
        else:
            _ext = (".jpg",".png")
            #files = glob.glob(f"{path}/*")
            files = Path(path)
            file_paths = [str(file_path) for file_path in files.iterdir() if file_path.is_file()]
            for file_path in file_paths:
                # Open each matching file in the directory, not the directory itself.
                if file_path.lower().endswith(_ext):
                    init_image = img_open(file_path,height,width)
                    init_image_list.append(init_image)
        logger.debug(f"init_image_list: {init_image_list}")
        if not init_image_list:
            raise FileNotFoundError("No image")
        return init_image_list


    def n_prompt_set(self,base_negative_prompt=""):
        add_n_prompt = "Loss of eye :1.5,Fingers fused together:1.3,Writing Sweet Fingers:2.0,bad hands:2.0,bad legs:2.0,EasyNegative,bat_hands:1.3,worst quality:2.0, low quality:2.0,Fused fingers,7 fingers, 6 fingers, 4 fingers, 3 fingers,Not five fingers:2.0,blurred,Simple_background:2.0,Missing finger:1.7,Cat with deformed face:1.3 ,medium quality, purple hair,Loss of eye highlights:1.5,Fingers fused together:1.3,Writing Sweet Fingers:2.0 ,deleted:0.5, lowres,Low quality animals, deformed animals ,hands emerging from impossible places:1.7, bad anatomy, more than three limbs hands/legs:1.5, low resolution, blurry, absurdres,pixelated, sketchy, nonsensical anatomy, unrealistic pose, mosaic, unclear details, distorted colors, unrealistic proportions, poor quality, fuzzy, missing head:1.6, out of focus, hazy, grainy, text, error, missing fingers:0.9, extra digit, fewer digits, cropped, jpeg artifacts, signature, watermark, username, standard quality, bad feet_hand_finger_leg_eye, bad, text font ui, bad shadow, poorly drawn, black-white, ugly, duplicate, mutation, mutilated, malformed mutated:1.1, malformed:1.1, The background is incoherent, simple background, low-quality background, low background, bad body, long body, broken limb, anatomical nonsense, extra limbs, missing limb, incorrect limb, multiple heads, twisted head, poorly drawn face, 1 unit with multiple heads:1.3, heads together:1.0, abnormal eye:1.2 proportion, cropped:1.0, bad eyes, fused eyes, poorly drawn eyes, bad mouth, poorly drawn mouth, bad tongue, too long tongue, bad ears, poorly drawn ears, extra ears, heavy ears, long neck, too thick neck, bad neck,  bad breasts, missing arms, disappearing arms, extra arms, three arms:2.0, mutated hands and fingers, fused hand, missing fingers, extra digits, huge thighs, disappearing thigh, missing thighs, extra thighs, bad feet, huge calf, disappearing legs, bad gloves, fused gloves, beard, artist name, text watermark, unnatural, obviously wrong, distorted face, floating hair, floating body parts, severed body parts, incorrect leg position, deformed, fused body and hands, disregard of physics, distorted shape, doll-like object not present in the image, body fusion, abnormal fingers, fingers resembling fish fins, dot eyes, unclear background, mosaic, body bending, incorrect leg-to-torso ratio, excessively large breasts, unsettling appearance, eyes filled with solid color, lack of lower body, splitting, creepy doll-like appearance, distorted eyes, lines on the skin, legs bending in unnatural directions, abnormal finger count, missing arms, floating hands, lack of nose or mouth,, incorrect body part ratios, bad, longbody, lowres, bad anatomy, bad hands, missing fingers,  Distorted eye contour, Missing part from the ankles onward, extra digit, fewer digits, split wings, Vampire wings floating in the air, bad wing, comic,chainsaw man,demon, simple background"
        if self.Recommended_setting:
            #if base_negativev_prompt.endswith(","):
            base_negative_prompt = base_negative_prompt + add_n_prompt
        elif not self.Hide_warnings:
            print('\033[33mself.Recommended_setting is off\033[0m')

        return re.sub(r",+", ",", base_negative_prompt)



    def play_mp4(self,path_new):
        # Read the file inside a context manager so the handle is always closed.
        with open(path_new, 'rb') as f:
            mp4 = f.read()
        data_url = 'data:video/mp4;base64,' + b64encode(mp4).decode()
        HTML_code=(f"""
                   <video width="70%" height="70%" controls>
                         <source src="{data_url}" type="video/mp4">
                   </video>""")
        try:
            display(HTML(HTML_code))
        except Exception:
            display(HTML('<script>{}</script>'.format(HTML_code)))



class generate_class(make_main_pipe,
                     txt_model_setup,
                     generate_config):
    def __init__(self,
                 base_prompt,
                 base_negative_prompt,
                 base_seed,
                 num_imgs,
                 height,
                 width,
                 num_inference_steps,
                 guidance_scale,
                 grit_image_width,
                 Recommended_setting,
                 image_save_dir,
                 save_file_name,
                 input_img_dir_or_path,
                 model_name,
                 main_pipe,
                 parmer,
                 video_length,
                 num_frames,
                 video_fps ,
                 show_result,
                 Hide_warnings,
                 extra_parameter_dict):

        txt_model_setup.__init__(self,
                                 base_prompt = base_prompt,
                                 n_prompt = base_negative_prompt,
                                 num_imgs = num_imgs,
                                 text_generate_model = text_generate_model,
                                 main_pipe = main_pipe,
                                 Recommended_setting = Recommended_setting)

        self.base_prompt = base_prompt
        self.base_negative_prompt = base_negative_prompt
        self.base_seed = base_seed
        self.num_imgs = num_imgs
        self.height = height
        self.width = width
        self.guidance_scale = guidance_scale
        self.num_inference_steps = num_inference_steps
        self.grit_image_width = grit_image_width
        self.Recommended_setting = Recommended_setting
        self.image_save_dir = image_save_dir
        self.save_file_name = save_file_name
        self.input_img_dir_or_path = input_img_dir_or_path
        self.model_name = model_name
        self.main_pipe = main_pipe
        self.parmer = parmer
        self.video_length = video_length
        self.num_frames = num_frames
        self.video_fps = video_fps
        self.extra_parameter_dict = extra_parameter_dict

        self.negative_prompt = self.n_prompt_set(base_negative_prompt)
        self.pipeline_name = main_pipe.__class__.__name__
        self.pipeline_class = self.pipeline_class_set(self.pipeline_name)
        self.pipeline_type = self.pipe_class_type(self.pipeline_name)
        self.pipeline_call_method = self.get_call_method(self.pipeline_name,"__call__")
        if not self.use_TPU:
            self.generator = torch.Generator(self.device)
        else:
            self.generator = None
        self.stop_flag = threading.Event()
        if self.grit_image_width == 0:
            self.make_grid = False
        else:
            self.make_grid = True

        self.pipeline_type = self.pipe_class_type(self.pipeline_name)
        if self.pipeline_type == "txt2video":
            self.make_video = True
            self.num_imgs = 1
            self.result_type = "video"
        else:
            self.make_video = False
            self.result_type = "image"

        self.prompt_list = queue.Queue()
        self.make_images_list = queue.Queue()
        self.total_generate_time = 0
        self.base_generate_number = 1
        self.generation_time_average = 0
        self.pipeline_name = main_pipe.__class__.__name__
        self.image_save_base_dir = ""
        self.gird_save_base_dir = ""
        self.gird_save_path = ""
        self.prompt_process = None
        self.generate_process = None
        self.save_and_show_process = None
        self.nsfw_detected = None
        self.sp_output_module = None
        self.grid_imgs=[]
        self.call_dict={}

        "---module---"
        self.show_result = show_result
        self.Hide_warnings = Hide_warnings

    """
    def __del__(self):
        try:
            self.prompt_process.join(timeout=0)
        except (RuntimeError,AttributeError):
            pass
        try:
            self.generate_process.join(timeout=0)
        except (RuntimeError,AttributeError):
            pass
        try:
            self.save_and_show_process.join(timeout=0)
        except (RuntimeError,AttributeError):
            pass
    """


    def save_path_set(self,
                      base_file_name,
                      image_save_dir,
                      is_images : bool,
                      Prompt = "",
                      seed = 0
                      ):

        """
        Build the save path for a generated image or video.

        Args:
            is_images: When True, return a path for an image (.png);
                       when False, return a path for a video (.mp4).
        Format:
            <base_file_name>-<num>.<extension>
        """
        if base_file_name=="":
            if is_images:
                base_file_name = "GIMG"
            else:
                base_file_name = "Gvideo"

        if is_images:
            extension = "png"
        else:
            extension = "mp4"

        now = datetime.datetime.now()
        path_d = {"{prompt}": Prompt,
                  "{seed}": seed,
                  "{model_name}": self.model_name,
                  "{g_scale}": self.guidance_scale,
                  "{time}":now}
        for key, value in path_d.items():
            base_file_name = base_file_name.replace(key, str(value))
        #NOTE: find_max_num returns the highest number already in use, so add 1 to avoid overwriting that file.
        number = self.find_max_num(base_file_name,image_save_dir,extension)
        return os.path.join(image_save_dir,f"{base_file_name}-{number+1}.{extension}")


    def generate_images_or_video(self,
                                 init_image = None):
        seed, Flax_seed = self.seed_set(self.base_seed)
        self.image_save_base_dir, self.gird_save_path= self.get_save_path(self.image_save_dir,self.result_type)
        for g_number in range(1,self.num_imgs+1):

                target_prompt = self.prompt_list.get()

                if self.Recommended_setting:
                    input_prompt = self.good_word + target_prompt
                else:
                    input_prompt = target_prompt

                if self.base_seed == -1:
                    seed,Flax_seed = self.seed_set(self.base_seed)

                image_save_path = self.save_path_set(base_file_name =self.save_file_name,
                                                     image_save_dir = self.image_save_base_dir,
                                                     is_images = not self.make_video,
                                                     Prompt = target_prompt,
                                                     seed = seed)


                save_file_name = os.path.basename(image_save_path)
                now = datetime.datetime.now()
                date_str = now.strftime("%Y-%m-%d_UTC-%H:%M:%S")
                generate_start_time = time.time()
                if not self.use_TPU:
                    self.generator.manual_seed(seed)
                self.input_method_dict = {
                    "prompt" : input_prompt,
                    "negative_prompt" : self.negative_prompt,
                    "neg_prompt" : self.negative_prompt,
                    "guidance_scale" : self.guidance_scale,
                    "num_inference_steps" : self.num_inference_steps,
                    "generator" : self.generator,
                    "height" : int(self.height),
                    "width" : int(self.width),
                    "image" : init_image,
                    "strength" : 1,
                    "t0" : self.num_inference_steps - 5,
                    "t1" : self.num_inference_steps - 3,
                    "video_length" : self.video_length,
                    "num_frames":self.num_frames,
                    "prng_seed" : Flax_seed,
                    "params" : self.parmer,
                    }
                for method_name, input_method in self.input_method_dict.items():
                    if method_name in self.pipeline_call_method:
                        self.call_dict[method_name] = input_method

                self.call_dict = self.extra_parameter_dict | self.call_dict

                output = self.main_pipe(**self.call_dict)
                output_element_names = self.get_class_elements(output)

                if "nsfw_content_detected" in output_element_names:
                    self.nsfw_detected = output.nsfw_content_detected
                else:
                    self.nsfw_detected = None

                if "images" in output_element_names:
                    if self.make_video:
                        maked_result = output.images
                    elif not self.use_TPU:
                        maked_result = output.images[0]
                    else:
                        maked_result = self.main_pipe.numpy_to_pil(np.asarray(output.images.reshape((self.device_count,) + output.images.shape[-3:])))
                elif "frames" in output_element_names:
                    if self.make_video:
                        maked_result = output.frames
                    else:
                        maked_result = output.frames[0]
                else:
                    maked_result = getattr(output, output_element_names[0])

                if self.nsfw_detected is None:
                    bloke=False
                else:
                    if False in self.nsfw_detected:
                        bloke = False
                    else:
                        bloke = True

                generate_end_time = time.time()
                base_generate_time = generate_end_time - generate_start_time
                generate_time = round(base_generate_time,2)
                self.total_generate_time += generate_time
                metadata = {
                    "seed": seed,
                    "prompt": input_prompt,
                    "G_scale":self.guidance_scale,
                    "D_step":self.num_inference_steps,
                    "model_path":self.model_name,
                    "n_prompt":self.negative_prompt,
                    "pipeline_name":self.pipeline_name
                    }
                save_data = {"base_prompt": target_prompt,
                             "save_path": image_save_path,
                             "file_name": save_file_name,
                             "generate_time" : generate_time,
                             "generate_number" : self.base_generate_number,}

                status_dict = {"bloke" : bloke,
                               "metadata" : metadata,
                               "save_data" : save_data}
                self.make_images_list.put([maked_result,status_dict])
                if not bloke and self.make_grid:
                    self.grid_imgs.append(maked_result)

                self.base_generate_number += 1


    def save_and_show(self):
        for T in range(self.num_imgs):
            make_image, status_dict = self.make_images_list.get()
            bloke = status_dict["bloke"]
            metadata = status_dict["metadata"]
            save_data = status_dict["save_data"]
            save_path = status_dict["save_data"]["save_path"]
            file_name = status_dict["save_data"]["file_name"]

            if not self.make_video:
                if not bloke:
                    pnginfo = PngImagePlugin.PngInfo()
                    info = make_image.info
                    for key, value in metadata.items():
                        pnginfo.add_text(key, str(value))
                    make_image.save(save_path,pnginfo = pnginfo,quality=95)

                    print(f'\033[34mImage generation is complete ({save_data["generate_number"]}/{self.num_imgs})  {save_data["generate_time"]}s')
                    print(f'seed:\033[38;2;0;255;255m {metadata["seed"]}\033[34m')
                    print(f'File_name: \033[32m{file_name}\033[0m')
                    run_html_js(save_path,"Save_path: ")
                    if self.show_result:
                        print()
                        #To display including metadata
                        with Image.open(save_path) as show_img:
                            display(show_img)
                    print(f'\033[92mbase_prompt: {save_data["base_prompt"]}\033[0m')
                else:
                    print("Blocked\n")

            else:
                logger.debug(f"num make_image: {len(make_image)} (if this is 1, check that output.images[0] was not passed; output.images is correct)")
                from diffusers.utils import export_to_video
                export_to_video(make_image, save_path, fps = self.video_fps)
                print(f'\033[34mVideo_generation_is_complete ({save_data["generate_number"]}/{self.num_imgs})  {save_data["generate_time"]}s')
                print(f'seed:\033[38;2;0;255;255m {metadata["seed"]}\033[34m')
                print(f'File name: \033[32m{file_name}\033[0m')
                run_html_js(save_path,"Save_path: ")
                if self.show_result:
                    self.play_mp4(save_path)
                    print(f'\033[92mbase_prompt: {save_data["base_prompt"]}\033[0m')


    def grid_task(self):
        if self.num_imgs>1 and self.gird_save_path:
            grid = self.image_grid(imgs = self.grid_imgs,
                                   cols = self.grit_image_width)
            if grid is not None:
                grid.save(self.gird_save_path)
                if self.show_result:
                    with Image.open(self.gird_save_path) as grid_img:
                        run_html_js(self.gird_save_path,"Grid_Image_Path: ")
                        display(grid_img)


    def generate(self):
        if (not self.make_video) and self.pipeline_type == "img2img":
            #img2img
            self.init_images_list = self.img_set(
                self.input_img_dir_or_path,
                self.height,
                self.width)

            if len(self.init_images_list)>1:
                total_num = self.num_imgs * len(self.init_images_list)
                print(f"Since multiple images were passed, generate {self.num_imgs} images per image, for a total of {total_num} images")
            for init_image in self.init_images_list:
                self.generate_images_or_video(init_image = init_image)
        else:
            self.generate_images_or_video(init_image = None)


    def run(self):
        self.prompt_process = threading.Thread(target=self.prompt_processing, args=(self.base_prompt,self.prompt_list),daemon=True)
        self.generate_process = threading.Thread(target=self.generate)
        self.save_and_show_process = threading.Thread(target=self.save_and_show)
        self.prompt_process.start()
        self.generate_process.start()
        self.save_and_show_process.start()
        self.save_and_show_process.join()
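        #base_generate_number starts at 1 and is incremented after every generation, so subtract 1 to get the count.
        generated_count = max(self.base_generate_number - 1, 1)
        self.generation_time_average = "{:.2f}".format(self.total_generate_time/generated_count)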
        if not self.make_video:
            self.grid_task()
        print(f"\033[38;2;135;206;235mGeneration time average: {self.generation_time_average}s\033[0m")


    def debugs(self):
        self.prompt_processing(self.base_prompt,self.prompt_list)
        self.generate()
        self.save_and_show()


generate_cls = generate_class(base_prompt = prompt,
                              base_negative_prompt = negative_prompt,
                              base_seed = seed,
                              num_imgs = num_imgs,
                              height = height,
                              width = width,
                              num_inference_steps = num_inference_steps,
                              guidance_scale = guidance_scale,
                              grit_image_width = grit_image_width,
                              Recommended_setting = Recommended_setting,
                              image_save_dir = save_dir,
                              save_file_name = file_name,
                              input_img_dir_or_path = input_image_path_or_dir,
                              model_name = model_path,
                              main_pipe = main_pipe,
                              parmer = parmer,
                              video_length = video_length,
                              num_frames = num_frames,
                              video_fps = video_fps,
                              show_result = show_result,
                              Hide_warnings = Hide_warnings,
                              extra_parameter_dict = parameter_dict)

if __name__ == '__main__':
    generate_cls.run()
    #generate_cls.debugs()

    if not device_type == "TPU":
        gc.collect()
    if device_type == "cuda":
        torch.cuda.empty_cache()
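
The three threads started in `run` above form a producer/consumer chain connected by `queue.Queue` objects: prompts flow into generation, and results flow into saving. A minimal sketch of that pattern follows, with a plain string standing in for the diffusion pipeline call. `run_pipeline` and its inner function names are illustrative assumptions and are not part of the notebook.

```python
import queue
import threading


def run_pipeline(prompts):
    """Run a three-stage pipeline and return the saved results in order."""
    prompt_queue = queue.Queue()   # stage 1 -> stage 2
    result_queue = queue.Queue()   # stage 2 -> stage 3
    saved = []
    total = len(prompts)

    def produce_prompts():
        for prompt in prompts:
            prompt_queue.put(prompt)

    def generate():
        # Stand-in for the real pipeline call (hypothetical placeholder).
        for number in range(1, total + 1):
            prompt = prompt_queue.get()          # blocks until a prompt is ready
            result_queue.put((number, f"image for {prompt!r}"))

    def save_and_show():
        for _ in range(total):
            saved.append(result_queue.get())     # blocks until a result is ready

    threads = [
        threading.Thread(target=stage)
        for stage in (produce_prompts, generate, save_and_show)
    ]
    for thread in threads:
        thread.start()
    # Joining every stage guarantees that all work is finished before returning.
    for thread in threads:
        thread.join()
    return saved
```

Because each queue has one producer and one consumer, results arrive in the order the prompts were queued. Every stage must know how many items to expect, otherwise a `get()` call blocks forever.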

# Sub-function

## Tool

In [None]:
#@title Memory initialization {display-mode: "form"}

import gc
import torch

variable_names = ["base_pipe","txt2img_pipe","img2img_pipe","txt2video_pipe","Inpaint_pipe","safe_pipe", "tokenizer"]

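for name in variable_names:
    #At cell (module) level locals() is globals(), so one lookup is enough.
    globals().pop(name, None)

#Collect the freed objects first so that their CUDA memory can actually be released.
gc.collect()
if torch.cuda.is_available():
    torch.cuda.empty_cache()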

In [None]:
#@title View Image Metadata  {display-mode: "form"}
import os
from PIL import Image, PngImagePlugin

image_path = "" #@param {type:"string"}

def show_png_info(image_path):
    if not image_path:
        print("\33[31mPlease enter the image path\33[0m")
        return
    try:
        #Use a context manager so the file handle is closed again.
        with Image.open(image_path) as img:
            output_info = img.info
    except FileNotFoundError:
        print("\33[31mThe specified image file cannot be found\33[0m")
        return
    except Exception as err:
        #A bare string cannot be raised; wrap the message in an exception class.
        raise RuntimeError("An error occurred while opening the image file.") from err

    if not output_info:
        print("\33[31mMetadata not found\33[0m")
        return
    for key,info in output_info.items():
        print(f"\33[32m{key} : {info}\33[0m\n")

show_png_info(image_path)

In [None]:
#@title Unzip 7zip{display-mode: "form"}

from google.colab import files
import os
import subprocess


try:
    subprocess.run(['7z'], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
except FileNotFoundError:
    !apt-get install -y p7zip-full


path = ""  # @param {type:"string"}
if not path:
    path = "/content/Generated-No.1.7z"

if os.path.isfile(path) and path.endswith('.7z'):
    try:
        #Strip only the trailing ".7z"; str.replace would also remove ".7z" elsewhere in the path.
        extract_path = os.path.splitext(path)[0]
        os.makedirs(extract_path, exist_ok=True)
        subprocess.run(['7z', 'x', path, f'-o{extract_path}'], check=True)
        print(f'\033[34mSuccessfully extracted to {extract_path} \033[0m')
    except Exception as e:
        print(f"\033[31mExtraction failed. {e}\033[0m")
else:
    print("\033[31mThe specified file is not a valid .7z file or it does not exist.\033[0m")

In [None]:
#@title  download{display-mode: "form"}
#@markdown >Specify file or directory to download.

from google.colab import files
import shutil
import os

path = "/content/Generated-No.3"  # @param {type:"string"}
if not path:
    path = "/content/Generated"

if os.path.isfile(path):
    try:
        files.download(path)
    except:
        print("\033[31mThe specified file could not be found.\033[0m")
    else:
        print("\33[32mSuccessfully downloaded\33[0m")
elif os.path.isdir(path):
    for zip_number in range(1, 100000):
        zip_name = f"Generated-No.{zip_number}.zip"
        zip_path = os.path.join("/content", zip_name)
        if not os.path.isfile(zip_path):
            break
    try:
        shutil.make_archive(zip_path.replace('.zip', ''), 'zip', path)
        files.download(zip_path)

        print(f'\033[34mZip file name is {os.path.basename(zip_path)} \033[0m')
    except Exception as e:
        print(f"\033[31mDownload failed.{e}\033[0m")
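
The cell above picks the first unused `Generated-No.<n>.zip` name and then calls `shutil.make_archive`, which expects the base name without the `.zip` suffix and returns the path of the archive it created. A minimal sketch of that naming scheme in isolation is shown below. The helper names `next_archive_base` and `zip_directory` are assumptions made for illustration.

```python
import os
import shutil


def next_archive_base(directory, prefix="Generated-No."):
    """Return the first '<prefix><n>' base path whose .zip does not exist yet."""
    number = 1
    while os.path.isfile(os.path.join(directory, f"{prefix}{number}.zip")):
        number += 1
    return os.path.join(directory, f"{prefix}{number}")


def zip_directory(source_dir, archive_dir):
    """Zip source_dir into archive_dir and return the path of the new archive."""
    # make_archive appends ".zip" itself and returns the final archive path.
    return shutil.make_archive(next_archive_base(archive_dir), "zip", source_dir)
```

Using the returned path avoids rebuilding the file name by hand before passing it to `files.download`.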




In [None]:
#@title txt2audio
prompt = "Techno-style  music" #@param {type:"string"}
negative_prompt = "Low quality." #@param {type:"string"}
audio_length_in_s = 30 #@param {type:"number"}
num_inference_steps = 500 #@param {type:"number"}
seed = -1 #@param {type:"number"}

prompt+=",masterpiece"
import os
import scipy.io.wavfile
import torch
from diffusers import AudioLDM2Pipeline
from IPython.display import Audio
import random



repo_id = "cvssp/audioldm2"
if "pipe" not in locals() and "pipe" not in globals():
    pipe = AudioLDM2Pipeline.from_pretrained(repo_id, torch_dtype=torch.float16).to("cuda")
if "numa" in globals():
    numa+=1
else:
    numa=1
if seed is None or seed==-1:
    seed = random.randint(1,1000000)
generator = torch.Generator("cuda").manual_seed(seed)
audio = pipe(prompt,
             negative_prompt=negative_prompt,
             num_inference_steps=num_inference_steps,
             audio_length_in_s=audio_length_in_s,
             num_waveforms_per_prompt=1,
             generator=generator,
             ).audios


os.makedirs("/content/audio",exist_ok=True)
save_path=f"/content/audio/audio_{numa}_{seed}.wav"


scipy.io.wavfile.write(save_path, rate=16000, data=audio[0])
print(f"\033[38;2;0;255;255mseed: {seed}")
print(f"save_path: {save_path}\n\033[0m")
Audio(save_path)


In [None]:
#@title Youtube_download {display-mode: "form"}
import os
from google.colab import files
import shutil

try:
    import pytube
except ModuleNotFoundError:
    !pip install -q pytube

from pytube import (Playlist, YouTube)
from requests import HTTPError
Youtube_path = "" # @param {type:"string"}

Download_to_local = False # @param {type:"boolean"}

def Youtube_download(Youtube_path,local_download):
    Not_download_list=[]
    mkdir_list=["/content/Youtube/File","/content/Youtube/List"]
    for mk_path in mkdir_list:
        os.makedirs(mk_path, exist_ok=True)
    if not Youtube_path:
        raise HTTPError("Please enter the URL")

    if Youtube_path.startswith("https://www.youtube.com/playlist?list="):
        #Use the first playlist directory number that does not exist yet.
        nu=1
        while os.path.isdir(f"/content/Youtube/Youtube_list_{nu}"):
            nu+=1
        save_path=f"/content/Youtube/Youtube_list_{nu}"
        os.makedirs(save_path,exist_ok=True)
        Y_list = Playlist(Youtube_path)
        #NOTE: zip() stops at the shorter sequence, so only the first three videos are downloaded.
        for video, path in zip(Y_list.videos, Y_list.video_urls[:3]):
            try:
                video.streams.first().download(output_path=save_path)
            except Exception:
                Not_download_list.append(path)
        if Not_download_list:
            print("Failed download path")
            for ndl in Not_download_list:
                print(ndl)
        if local_download:
            #make_archive appends ".zip" itself and returns the archive path.
            zip_path=shutil.make_archive(f"/content/Youtube/zip-No_{nu}", "zip", save_path)
            files.download(zip_path)

    elif Youtube_path.startswith("https://www.youtube.com/watch?v="):
        try:
            yt=YouTube(Youtube_path)
            #download() returns the path of the saved file.
            save_path=yt.streams.first().download(output_path="/content/Youtube/File")
        except Exception as err:
            raise HTTPError("Invalid URL") from err
        if local_download:
            try:
                files.download(save_path)
            except FileNotFoundError:
                print("File download failed")

    else:
        raise HTTPError("Invalid URL")

    print("The process has been completed.")
    print(f"save_path: {save_path}")

Youtube_download(Youtube_path,Download_to_local)





In [None]:
#@title  Split Grid Image {display-mode: "form"}
from PIL import Image
import os
import sys
import math

input_img_path = "" # @param {type:"string"}
output_dir = "" # @param {type:"string"}
Vertical_Image_Count = 2 # @param {type:"number"}
Horizontal_Image_Count = 2 # @param {type:"number"}

if (not input_img_path) or (not output_dir):
    raise ValueError("Required path is missing")

if Vertical_Image_Count < 1 or Horizontal_Image_Count < 1:
    raise ValueError("The number of splits must be greater than or equal to 1")

if os.path.isfile(output_dir):
    output_dir = os.path.dirname(output_dir)


def ImgSplit(
        im,
        width,
        height,
        Vertical_Image_Count,
        Horizontal_Image_Count
        ):
    HEIGHT = height / Vertical_Image_Count
    WIDTH = width / Horizontal_Image_Count
    for h1 in range(Vertical_Image_Count):
        for w1 in range(Horizontal_Image_Count):
            w2 = w1 * WIDTH
            h2 = h1 * HEIGHT
            yield im.crop((w2, h2, WIDTH + w2, HEIGHT + h2))



im = Image.open(input_img_path)
w = im.size[0]
h = im.size[1]
length = math.log10(Vertical_Image_Count * Horizontal_Image_Count) + 1

os.makedirs(output_dir, exist_ok=True)


for number, ig in enumerate(ImgSplit(im, w, h, Vertical_Image_Count, Horizontal_Image_Count), 1):
    ig.save(output_dir + "/" + str(number).zfill(int(length)) + ".PNG", "PNG")

print(f"\033[34mSplit is complete: {os.path.abspath(output_dir)}\033[0m")
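
The split above walks the grid row by row and crops one tile per cell. The box arithmetic can be sketched without an image, as below. `grid_boxes` and `zero_pad_width` are illustrative names, and this sketch uses integer division, which is an assumption that differs from the cell above: remainder pixels at the right and bottom edges are dropped when the size is not evenly divisible.

```python
def grid_boxes(width, height, rows, cols):
    """Return (left, upper, right, lower) crop boxes, row by row."""
    if rows < 1 or cols < 1:
        raise ValueError("rows and cols must be at least 1")
    tile_w = width // cols   # integer tile size; remainder pixels are ignored
    tile_h = height // rows
    return [
        (c * tile_w, r * tile_h, (c + 1) * tile_w, (r + 1) * tile_h)
        for r in range(rows)
        for c in range(cols)
    ]


def zero_pad_width(count):
    """Number of digits needed to zero-pad file names numbered 1..count."""
    return len(str(count))
```

Each tuple can be passed directly to `Image.crop`, and `str(number).zfill(zero_pad_width(rows * cols))` yields file names that sort in grid order.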

In [None]:
#@title  Move files containing the specified words {display-mode: "form"}
import os
import shutil

def move_files_with_keyword(source_dir, destination_dir, keyword):
    if not os.path.exists(destination_dir):
        os.makedirs(destination_dir)

    for root, dirs, files in os.walk(source_dir):
        for _file in files:
            if keyword in _file:
                source_file = os.path.join(root, _file)
                destination_file = os.path.join(destination_dir, _file)
                shutil.move(source_file, destination_file)
                print(f"Moved: {source_file} -> {destination_file}")


search_directory = ""  # @param {type:"string"}
destination_directory = ""  # @param {type:"string"}
keyword_to_search = ""  # @param {type:"string"}


if not search_directory:
    search_directory = "/"


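if not os.path.exists(destination_directory):
    raise ValueError(f"Destination directory does not exist: {destination_directory}")

#An empty keyword is contained in every file name, so it would move every file.
if not keyword_to_search:
    raise ValueError("Please enter the keyword to search for")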


move_files_with_keyword(search_directory, destination_directory, keyword_to_search)

In [None]:
#@title Directory or File size Counter {display-mode: "form"}

import os

def get_folder_size(folder_path):
    total_size = 0
    if os.path.isfile(folder_path):
        total_size = os.path.getsize(folder_path)
    else:
        for path, dirs, files in os.walk(folder_path):
            for file in files:
                file_path = os.path.join(path, file)
                total_size += os.path.getsize(file_path)

    return total_size

path = "" # @param {type:"string"}

if not os.path.exists(path):
    raise FileNotFoundError("Invalid path")

size_in_bytes = get_folder_size(path)

size_in_kb = size_in_bytes / 1024
size_in_mb = size_in_kb / 1024
size_in_gb = size_in_mb / 1024

print(f"Folder Size: {size_in_bytes:.2f} B")
print(f"Folder Size: {size_in_kb:.2f} KB")
print(f"Folder Size: {size_in_mb:.2f} MB")
print(f"Folder Size: {size_in_gb:.2f} GB")
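
The cell above prints the same size in four units. Where a single line of output is preferred, the conversion can pick the unit automatically. This is a minimal sketch, and `format_size` is a hypothetical helper that is not part of the notebook.

```python
def format_size(size_in_bytes):
    """Format a byte count using the largest unit that keeps the value below 1024."""
    units = ["B", "KB", "MB", "GB", "TB"]
    size = float(size_in_bytes)
    for unit in units[:-1]:
        if size < 1024:
            return f"{size:.2f} {unit}"
        size /= 1024
    # Anything that is still large after the loop is reported in the last unit.
    return f"{size:.2f} {units[-1]}"
```

For example, `format_size(get_folder_size(path))` would replace the four `print` calls with one value.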

In [None]:
#@title Change image format {display-mode: "form"}

"""
Needs to be fixed later.
Fix around line 83.
"""


input_path = "." #@param {type:"string"}
output_dir="" #@param {type:"string"}
output_format = "jpg" # @param ["png", "jpg"]

import os
from PIL import Image
import sys

class Image_Format_Converter:
    def __init__(self):
        """
        NOTE:
        When there are a very large number of files,
        the maximum value at that time is stored to lighten the processing as much as possible.

        About exc_dict:
            key : value = Target extension to format : (output file exc , Save format)
        """

        self.max_exc_number = 0
        self.exc_dict = {".png":(".jpg","JPEG"),
                         ".jpg":(".png","PNG")}


    def file_name_set(self,path):
        if os.path.isfile(path):
            base_file_path, exc = os.path.splitext(path)
            dir_path = os.path.dirname(base_file_path)
            os.makedirs(dir_path,exist_ok=True)
            sp_file_number = 1
            while True:
                _try_file_path = f"{base_file_path}({sp_file_number}){exc}"
                if not os.path.isfile(_try_file_path):
                    return _try_file_path
                else:
                    sp_file_number += 1
        else:
            return path


    def dir_name_set(self,path):
        if os.path.isdir(path):
            dir_sp_number = 1
            while True:
                _try_dir_path = f"{path}({dir_sp_number})"
                if not os.path.isdir(_try_dir_path):
                    return _try_dir_path
                else:
                    dir_sp_number += 1
        else:
            return path


    def image_format_converter(
            self,
            input_path,
            output_path,
            return_format
            ):
        """
        NOTE:
        If output_path is not entered,
        create a folder to store the images to be input in the directory where they are located.
        """
        if not os.path.exists(input_path):
            print('\033[31mPlease enter the directory or file path\033[0m')
            return

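        elif os.path.isdir(input_path):
            if not output_path:
                #Default: a new directory next to the input directory, e.g. "imgs(1)".
                output_path = self.dir_name_set(os.path.abspath(input_path))

            for File_name in sorted(os.listdir(input_path)):
                File_path = os.path.join(input_path, File_name)
                if os.path.isfile(File_path):
                    self.img_conver_func(File_path, output_path, return_format)

        else:
            #A single file is saved next to itself unless an output directory is given.
            if not output_path:
                output_path = os.path.dirname(os.path.abspath(input_path))
            self.img_conver_func(input_path, output_path, return_format)


    def img_conver_func(
            self,
            input_path,
            output_path,
            return_format
            ):
        base_name, input_file_exc = os.path.splitext(os.path.basename(input_path))
        input_file_exc = input_file_exc.lower()
        if input_file_exc not in self.exc_dict:
            return
        target_save_exc, save_format = self.exc_dict[input_file_exc]
        #Skip files that would not end up in the requested format.
        if target_save_exc != f".{return_format}":
            return
        try:
            target_image = Image.open(input_path)
        except Exception:
            print(f"\033[31mFailed to load image: {os.path.abspath(input_path)}\033[0m")
            return
        os.makedirs(output_path, exist_ok=True)
        save_file_path = self.file_name_set(os.path.join(output_path, f"{base_name}{target_save_exc}"))
        if save_format == "JPEG":
            #JPEG has no alpha channel, so drop it before saving.
            target_image = target_image.convert("RGB")
        target_image.save(save_file_path, format=save_format)
        print(f"\033[34mFormatted image path: {save_file_path}\033[0m")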


if __name__ == "__main__":
    Img_Format_cls = Image_Format_Converter()
    Img_Format_cls.image_format_converter(
        input_path=input_path,
        output_path=output_dir,
        return_format=output_format)


In [None]:
#@title Play mp4  {display-mode: "form"}

Path_of_mp4_file = "" #@param {type:"string"}

import os
from IPython.display import HTML
from base64 import b64encode

class Show_video:
    def play_mp4(self,path):
        if os.path.isfile(path):
            self.show_func(path)
        elif os.path.isdir(path):
            mp4_list = []
            for mp4_path in os.listdir(path):
                if os.path.splitext(mp4_path)[1] == ".mp4":
                    #os.listdir returns bare file names, so join them with the directory.
                    mp4_list.append(os.path.join(path, mp4_path))

            if not mp4_list:
                raise FileNotFoundError("No mp4 file exists in the specified directory")
            else:
                for mp4_file_path in mp4_list:
                    self.show_func(mp4_file_path)
        else:
            if not path:
                raise ValueError("Please enter the path to the file or directory")
            else:
                raise FileNotFoundError("Invalid path")


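    def show_func(self,mp4_file):
        from IPython.display import display
        with open(mp4_file, 'rb') as f:
            mp4 = f.read()
        data_url = 'data:video/mp4;base64,' + b64encode(mp4).decode()
        #An HTML object is only rendered automatically as the last expression of a cell, so display it explicitly.
        display(HTML(f"""
        <video width="100%" height="100%" controls>
              <source src="{data_url}" type="video/mp4">
        </video>
        """))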

Show_video().play_mp4(Path_of_mp4_file)
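
The player above embeds the whole video in the page as a base64 data URL, so no file server is needed. The markup construction can be sketched on its own as follows. `video_html` is an illustrative name, and returning the markup as a plain string is an assumption made so that the sketch runs outside a notebook.

```python
from base64 import b64encode


def video_html(mp4_bytes):
    """Return a <video> element that embeds the given bytes as a data URL."""
    data_url = "data:video/mp4;base64," + b64encode(mp4_bytes).decode()
    return (
        '<video width="100%" controls>'
        f'<source src="{data_url}" type="video/mp4">'
        "</video>"
    )
```

In a notebook the returned string would be wrapped in `IPython.display.HTML` and passed to `display`. Base64 makes the payload about one third larger than the file, so this approach suits short clips.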

In [None]:
#@title  String token counter{display-mode: "form"}
import pprint
from transformers import AutoTokenizer

prompt = ""  #@param {type:"string"}
try:
    tokens = tokenizer.tokenize(prompt)
except (NameError, AttributeError):
    #No usable tokenizer has been loaded yet, so load a CLIP tokenizer.
    text_model_id = "laion/CLIP-ViT-H-14-laion2B-s32B-b79K"
    tokenizer = AutoTokenizer.from_pretrained(text_model_id)
    tokens = tokenizer.tokenize(prompt)
print(len(tokens))
pprint.pprint(tokens)

In [None]:
#@title Higher image quality{display-mode: "form"}
import os
from PIL import Image
from io import BytesIO
from diffusers import StableDiffusionUpscalePipeline
import torch

# load model and scheduler
Prompt = "" #@param {type:"string"}
prompt="masterpiece:2.0,best quality,high quality,"+Prompt
low_res_img_path = "" #@param {type:"string"}

if "pipeline2" not in locals()  and "pipeline2" not in globals():
  model_id = "stabilityai/stable-diffusion-x4-upscaler"
  pipeline2 = StableDiffusionUpscalePipeline.from_pretrained(
    model_id, revision="fp16", torch_dtype=torch.float16
  )
  pipeline2 = pipeline2.to("cuda")
if not os.path.exists(low_res_img_path):
  raise FileNotFoundError("File not found")
try:
  low_res_img = Image.open(low_res_img_path)
except:
  raise FileNotFoundError("Image could not be loaded.")
low_res_img = low_res_img.resize((128, 128))

negative_prompt="low quality:2.0"

upscaled_image = pipeline2(prompt=prompt, image=low_res_img,negative_prompt=negative_prompt).images[0]

try:
  L+=1
except:
  L=1
os.makedirs("/content/low_res_imgs",exist_ok=True)
path1=(f"/content/low_res_imgs/No{L}.png")
upscaled_image.save(path1)
print(f"img save path: ({path1})")

In [None]:
#@title Delete folder{display-mode: "form"}

import os
import shutil

del_path = "" #@param {type:"string"}
if del_path =="":
    del_path="/content/Generated"
if not os.path.isdir(del_path):
    raise NotADirectoryError("Only directories can be deleted")


YorS=input("Are you sure you want to delete it? [yes/no]: ")
if YorS.lower() in ("yes", "y"):
    try:
        shutil.rmtree(del_path)
        print("Deleted.")
    except FileNotFoundError:
        print(f"\033[31mNot Found dir: {del_path}\033[0m")
elif YorS.lower() in ("no", "n"):
    print("Deletion of the folder has been aborted.")
else:
    print("Please enter only yes/no.")

In [None]:
#@title url_Download
import urllib.request , urllib.error , os
import datetime
def download_file(url, save_path):
  #Extension taken from the URL (may be empty).
  other_url,split=os.path.splitext(url)
  if save_path=="":
    save_dir="/content/download"
    choice_name=input("File name (without extension): ")
    save_path=os.path.join(save_dir,choice_name+split)
    if os.path.exists(save_path):
      #Append a timestamp instead of overwriting an existing file.
      now=datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
      save_path=os.path.join(save_dir,f"{choice_name}_{now}{split}")
  elif not save_path.endswith(split):
    save_path+=split
  save_dir=os.path.dirname(save_path)
  os.makedirs(save_dir or ".",exist_ok=True)
  try:
    urllib.request.urlretrieve(url, save_path)
  except urllib.error.URLError as err:
    raise RuntimeError("Could not retrieve the file from the URL") from err
  print(f"Save path: {save_path}")

url = "" #@param {type:"string"}
save_path = "" #@param {type:"string"}

download_file(url, save_path)

## Training

In [None]:
#@title Training Lora {display-mode: "form"}

instance_prompt = "Cat ears girl" # @param {type:"string"}

max_train_steps = 20   # @param {type:"number"}

input_img_dir = "" # @param {type:"string"}
output_dir = "/content/drive/MyDrive" # @param {type:"string"}

model_name_or_path = "runwayml/stable-diffusion-v1-5" # @param ["runwayml/stable-diffusion-v1-5", "stabilityai/stable-diffusion-2-1"] {allow-input: true}

output_filename = "" # @param {type:"string"}




Disconnect_runtime_when_finished = False # @param {type:"boolean"}


#Connect_to_drive = True # @param {type:"boolean"}

import os


os.makedirs(output_dir,exist_ok=True)

try:
    import ftfy
except ImportError:
    !pip install ftfy -q
    import ftfy
    #!pip install -q "accelerate>=0.16.0" "transformers>=4.25.1" ftfy Jinja2 bitsandbytes
#import bitsandbytes,accelerate,transformers,ftfy
#import accelerate,torchvision,transformers,ftfy ,diffusers,bitsandbytes

size=512
#if model_name_or_path=="runwayml/stable-diffusion-v1-5":
#  size=512

# A shell `export` does not persist between `!` commands, so %env is used instead
%env pretrained_model_name_or_path=$model_name_or_path
%env train_data_dir=$input_img_dir

%env resolution=$size
%env output_dir=$output_dir
%env max_train_steps=$max_train_steps


# train_dreambooth.py lives in examples/dreambooth
%cd /content/script/diffusers/examples/dreambooth

# learning_rate is parsed as a float; 5e-6 is the value used in the diffusers DreamBooth example
!accelerate launch train_dreambooth.py \
  --pretrained_model_name_or_path=$pretrained_model_name_or_path  \
  --instance_data_dir=$train_data_dir \
  --output_dir=$output_dir \
  --instance_prompt="$instance_prompt" \
  --resolution=$resolution \
  --train_batch_size=1 \
  --learning_rate=5e-6 \
  --lr_scheduler="constant" \
  --lr_warmup_steps=0 \
  --max_train_steps=$max_train_steps

base=os.path.join(output_dir,"learned_embeds.safetensors")
if output_filename:
    after_name = output_filename + ".safetensors"
    after = os.path.join(output_dir,after_name)

else:
  after=base

print(f"\033[34mpath: {output_dir}\033[0m")

if Disconnect_runtime_when_finished:
  from google.colab import runtime
  print("Disconnect runtime...")
  runtime.unassign()

In [None]:
#@title Training textual inversion {display-mode: "form"}

model_name_or_path = "runwayml/stable-diffusion-v1-5" # @param ["runwayml/stable-diffusion-v1-5", "stabilityai/stable-diffusion-2-1"] {allow-input: true}
placeholder_token = "" # @param {type:"string"}
initializer_token = "" # @param {type:"string"}
max_train_steps = 50   # @param {type:"number"}

input_imgs_dir = "" # @param {type:"string"}
output_dir = "/content/drive/MyDrive/textual" # @param {type:"string"}
learnable_property = "style" # @param ["style","object"]

output_file_name = "model" # @param {type:"string"}


#@markdown ---

Disconnect_runtime_when_finished = False # @param {type:"boolean"}

conect_Gdrive = True # @param {type:"boolean"}

import os
from google.colab import drive

drive_cn=True
if conect_Gdrive:
  if not os.path.ismount('/content/drive'):
    try:
      drive.mount('/content/drive')
    except Exception:
      print("GoogleDrive is not mounted")
      drive_cn=False



os.makedirs(output_dir,exist_ok=True)


if not os.path.exists("/content/script/diffusers"):
  # %cd fails if the parent directory does not exist yet
  os.makedirs("/content/script",exist_ok=True)
  %cd /content/script
  !git clone https://github.com/huggingface/diffusers


if "setup_txt" not in locals():
  %cd /content/script/diffusers/examples/textual_inversion
  !pip install -r ./requirements.txt -q
  setup_txt=True

import accelerate,torchvision,transformers,ftfy ,diffusers


%cd /content/script/diffusers/examples/textual_inversion

# Stable Diffusion v1-5 is trained at 512px; the other choice (2-1) uses 768px
if model_name_or_path=="runwayml/stable-diffusion-v1-5":
  size=512
else:
  size=768

# A shell `export` does not persist between `!` commands, so %env is used instead
%env pretrained_model_name_or_path=$model_name_or_path
%env train_data_dir=$input_imgs_dir
%env learnable_property=$learnable_property
%env placeholder_token=$placeholder_token
%env initializer_token=$initializer_token
%env resolution=$size
%env max_train_steps=$max_train_steps
%env output_dir=$output_dir

#--mixed_precision="fp16" \
# learning_rate is parsed as a float; 5.0e-04 is the value used in the diffusers textual inversion example.
!accelerate launch textual_inversion.py \
  --pretrained_model_name_or_path=$pretrained_model_name_or_path \
  --train_data_dir=$train_data_dir \
  --learnable_property=$learnable_property \
  --placeholder_token="$placeholder_token" \
  --initializer_token="$initializer_token" \
  --resolution=$resolution \
  --train_batch_size=4 \
  --gradient_accumulation_steps=4 \
  --max_train_steps=$max_train_steps \
  --lr_scheduler="constant" \
  --learning_rate=5.0e-04 \
  --lr_warmup_steps=0 \
  --output_dir=$output_dir

base=os.path.join(output_dir,"learned_embeds.safetensors")
if output_file_name:
    after_name=output_file_name+".safetensors"
    after=os.path.join(output_dir,after_name)
    if os.path.exists(base):
        os.rename(base,after)
    else:
        print("Model file not found")
else:
    after=base

print(f"\033[34mpath: {after}\033[0m")

if Disconnect_runtime_when_finished:
    from google.colab import runtime
    print("Disconnect runtime...")
    runtime.unassign()

##Description<a name = "Description"></a>

**Function Description**
>**Important Functions**


* auto<a name = "auto_help"></a>
    * desc
        * Minimizes user input by automatically selecting the highest-rated model when searching for models.

    * type
        * bool

\

---

* model_select<a name = "model_select_help"></a>
    * arg
        1. URL
        2. Hugging Face repository
        3. Path where the model is stored
        4. Keywords to search

    * desc
        1. URL
            * Downloads the model from the specified URL.
            * If the URL points to a Hugging Face repository, that repository is searched.
              * The same process as in 2. Hugging Face repository is performed.

        2. Hugging Face repository
            * Uses the Hugging Face API to list the repository and select a model file from it.
            * If the repository contains a model in diffusers format that can be loaded with "from_pretrained", it is added to the candidates.
                * If auto is on, this is the preferred choice.

        3. Path where the model is stored
            * Searches for files with the extensions safetensors, ckpt, and bin.

        4. Keywords to search
            * Searches with the Hugging Face API; after a repository is selected, the same process as in 2. is performed.

            * If the user chooses to search outside of Hugging Face, or if no file is found in the repository, the keyword is passed to the Civitai API and the model search is repeated.
                * Civitai models are selected as follows.
                  1. Select repository
                  2. Select model version
                  3. If the selected version contains more than one file, select a file. Otherwise, this step is skipped.
                * At every step, the candidates are sorted by popularity.
                * When auto is on, index 0 (the candidate judged most popular) is selected at every step.

    * type
        * string

\
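
The four kinds of `model_select` input described above could be told apart roughly as follows. This is only a minimal sketch, not the notebook's actual implementation: the function names, the returned labels, and the "owner/name" heuristic for repository ids are assumptions made for illustration.

```python
import os
import re
from urllib.parse import urlparse

# Extensions searched for when a local path is given (from the description above)
MODEL_EXTENSIONS = (".safetensors", ".ckpt", ".bin")


def classify_model_select(value: str) -> str:
    """Return 'url', 'local_path', 'hf_repo', or 'keyword' (hypothetical labels)."""
    parsed = urlparse(value)
    if parsed.scheme in ("http", "https") and parsed.netloc:
        return "url"
    if os.path.exists(value):
        return "local_path"
    # Assumption: "owner/name" without spaces looks like a Hugging Face repository id
    if re.fullmatch(r"[\w.-]+/[\w.-]+", value):
        return "hf_repo"
    return "keyword"


def find_model_files(directory: str) -> list[str]:
    """Collect model files under a directory, sorted by path."""
    found = []
    for root, _dirs, files in os.walk(directory):
        for name in files:
            if name.endswith(MODEL_EXTENSIONS):
                found.append(os.path.join(root, name))
    return sorted(found)
```

Anything that is neither a URL, an existing path, nor a repository-like id falls through to the keyword search.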

* Pipeline class set<a name = "Pipeline_class_set_help"></a>
    * arg
        1. Select from drop-down menus
        2. Specify the class of the pipeline for diffusers
            * Example
                1. StableDiffusionPipeline
                2. DiffusionPipeline
    
    * desc
        * The class that corresponds to each drop-down selection\
        ※ subject to change
        
        1. torch (CPU or GPU)
            * txt2img : AutoPipelineForText2Image
            * img2img : AutoPipelineForImage2Image
            * Inpaint : AutoPipelineForInpainting
            * txt2video : TextToVideoZeroPipeline
        
        2. flax (TPU)
            * txt2img : FlaxStableDiffusionPipeline
            * img2img : FlaxStableDiffusionImg2ImgPipeline
            * Inpaint : FlaxStableDiffusionInpaintPipeline
            * txt2video : **None**
              - txt2video is **None** because no Flax pipeline for it existed at the time of development.
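
The correspondence above can be written as a plain lookup table. The sketch below only stores class names as strings so that it runs without diffusers installed; the dictionary and function names are hypothetical and not taken from the notebook.

```python
# Drop-down selection -> diffusers class name, per backend (from the table above)
PIPELINE_CLASS_NAMES = {
    "torch": {
        "txt2img": "AutoPipelineForText2Image",
        "img2img": "AutoPipelineForImage2Image",
        "Inpaint": "AutoPipelineForInpainting",
        "txt2video": "TextToVideoZeroPipeline",
    },
    "flax": {
        "txt2img": "FlaxStableDiffusionPipeline",
        "img2img": "FlaxStableDiffusionImg2ImgPipeline",
        "Inpaint": "FlaxStableDiffusionInpaintPipeline",
        "txt2video": None,  # no Flax pipeline at the time of development
    },
}


def resolve_pipeline_class_name(backend: str, task: str) -> str:
    """Return the class name for a backend/task pair, or raise if unsupported."""
    name = PIPELINE_CLASS_NAMES[backend][task]
    if name is None:
        raise ValueError(f"{task} is not available on the {backend} backend")
    return name
```

With diffusers installed, the resolved name could then be turned into a class with `getattr(diffusers, name)` before calling `from_pretrained`.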



---

> (option)Load lora<a name = "Load_lora_help"></a>

NOTE : Available after completion of step.3

* model_name
    * See model_select in Important Functions
            
    
* auto
    * See auto in Important Functions

\

---

> (option) load textual inversion<a name = "load_textual inversion_help"></a>

NOTE : Available after completion of step.3

* model_path
    * See model_select in Important Functions

    (However, models in diffusers format are ignored because they cannot be loaded as textual inversion.)

* token
    * desc
        * Specifies the token that invokes the textual inversion.
        * Duplicate token strings are not allowed. To reuse a token, first unload the embedding that already uses it.
    * type
        * string
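
The duplicate-token rule could be enforced with a small wrapper like the one below. It is a sketch under assumptions: `load_embedding`, `unload_embedding`, and the `loaded_tokens` set are hypothetical helpers, and `pipe` is assumed to be a diffusers pipeline that provides `load_textual_inversion` and, in recent diffusers releases, `unload_textual_inversion`.

```python
def load_embedding(pipe, model_path, token, loaded_tokens):
    """Load a textual inversion embedding unless its token is already in use."""
    if token in loaded_tokens:
        raise ValueError(f"Token {token!r} is already loaded; unload it first.")
    pipe.load_textual_inversion(model_path, token=token)
    loaded_tokens.add(token)


def unload_embedding(pipe, token, loaded_tokens):
    """Unload the embedding bound to a token so the token can be reused."""
    pipe.unload_textual_inversion(tokens=token)
    loaded_tokens.discard(token)
```

Keeping the set of loaded tokens outside the pipeline makes the check cheap and independent of the tokenizer's internal state.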



---
**Scheduler_select**<a name = "Scheduler_select"></a>
* arg
    * Specify the class of Scheduler
    * Select from pull-down menus

* desc
    * [Additional settings](#Sonar_config) are available only for EulerA_with_sonar and Euler_with_sonar

* The corresponding table is as follows.
    * Scheduler
        * DDPM : DDPMScheduler
        * DDIM : DDIMScheduler
        * PNDM : PNDMScheduler
        * LMSD : LMSDiscreteScheduler
        * DPM : DPMSolverMultistepScheduler
        * EulerA : EulerAncestralDiscreteScheduler
        * Euler : EulerDiscreteScheduler
        * DEISM : DEISMultistepScheduler
        * UniPCM : UniPCMultistepScheduler
        * K_DPM2D : KDPM2DiscreteScheduler
        * DPM_S : DPMSolverSinglestepScheduler
        * K_DPM2AD : KDPM2AncestralDiscreteScheduler
        * HeunD : HeunDiscreteScheduler
            
    * other_Scheduler
        * Original Repository   
            * [modified-euler-samplers-for-sonar-diffusers](https://github.com/alexblattner/modified-euler-samplers-for-sonar-diffusers.git)
    
        * type
            * Euler_with_sonar : EulerNew.py -> Euler
            * EulerA_with_sonar : EulerANew.py -> EulerA
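
The scheduler table above maps naturally onto a dictionary. The following is a minimal sketch, not the notebook's own code: `SCHEDULER_CLASS_NAMES` and `set_scheduler` are hypothetical names, the sonar variants are left out because they come from a separate repository, and the swap relies on the usual diffusers idiom of rebuilding a scheduler from the current scheduler's config.

```python
# Drop-down name -> diffusers scheduler class name (from the table above)
SCHEDULER_CLASS_NAMES = {
    "DDPM": "DDPMScheduler",
    "DDIM": "DDIMScheduler",
    "PNDM": "PNDMScheduler",
    "LMSD": "LMSDiscreteScheduler",
    "DPM": "DPMSolverMultistepScheduler",
    "EulerA": "EulerAncestralDiscreteScheduler",
    "Euler": "EulerDiscreteScheduler",
    "DEISM": "DEISMultistepScheduler",
    "UniPCM": "UniPCMultistepScheduler",
    "K_DPM2D": "KDPM2DiscreteScheduler",
    "DPM_S": "DPMSolverSinglestepScheduler",
    "K_DPM2AD": "KDPM2AncestralDiscreteScheduler",
    "HeunD": "HeunDiscreteScheduler",
}


def set_scheduler(pipe, key):
    """Replace the pipeline's scheduler with the one selected by `key`."""
    import diffusers  # imported lazily so the table can be used without diffusers

    scheduler_cls = getattr(diffusers, SCHEDULER_CLASS_NAMES[key])
    # Reuse the current scheduler's config so model-specific settings carry over
    pipe.scheduler = scheduler_cls.from_config(pipe.scheduler.config)
    return pipe
```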

---
**Vae set**<a name = "Vae_set"></a>

* Basically the same as [model Select](#model_select_help), but when searching Civitai, the model type is restricted to VAE.

---
**Parameters**

* seed
    * If seed is -1, a random integer between 1 and 10^10 is used.
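
A minimal sketch of the seed rule, assuming a hypothetical helper named `resolve_seed`:

```python
import random


def resolve_seed(seed: int) -> int:
    """Return the given seed, or a random one in [1, 10**10] when seed is -1."""
    if seed == -1:
        return random.randint(1, 10**10)
    return seed
```

Returning the resolved value makes it possible to record the seed that was actually used, for example in the file name.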

---
**Prompt special tokens**<a name = "Prompt_special_tokens_help"></a>


>Random Words

* Description
 * Randomly selects one of the comma-separated words enclosed in { }.
 * If one of the entries is blank (for example "  "), the selection may produce no word at all.

* Example
 * { cat, dog }, cute
     1. cat, cute
     2. dog, cute

 * color, { blue, red, green }
     1. color, blue
     2. color, red
     3. color, green

 * man, { boy, girl, "  " }
     1. **man**  
     2. man, boy
     3. man, girl
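
The examples above can be reproduced with a short function. This is a sketch rather than the notebook's implementation: `expand_random_words` is a hypothetical name, nested braces are not handled, and the output is normalized to ", " separators.

```python
import random
import re


def expand_random_words(prompt: str, rng=None) -> str:
    """Replace each { a, b, ... } group with one randomly chosen entry."""
    rng = rng or random

    def pick(match):
        # Split the group on commas; a quoted blank such as "  " becomes an empty choice
        options = [opt.strip().strip('"').strip() for opt in match.group(1).split(",")]
        return rng.choice(options)

    expanded = re.sub(r"\{([^{}]*)\}", pick, prompt)
    # Drop the empty parts left behind when a blank entry was chosen
    parts = [part.strip() for part in expanded.split(",")]
    return ", ".join(part for part in parts if part)
```

Passing a seeded `random.Random` instance as `rng` makes the selection reproducible.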

---
**File name Special tokens**<a name = "File_name_Special_tokens_help"></a>
* Description
  * This function allows you to include parameters and other elements in the name of the file.

* Arguments
  * {prompt}
    * Prompt

  * {seed}
    * Seed value

  * {model_name}
    * Path or URL or name of the model

  * {g_scale}
    * Guidance scale

  * {time}
    * Current time

  * {number}
    * Number of times generation has been performed in this runtime

* Example
  * Generate_img_{seed}_{time}.png
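
The substitution of these tokens could look like the sketch below. `build_file_name` is a hypothetical helper, and the `%Y%m%d_%H%M%S` timestamp format is an assumption, since the format used for {time} is not specified here.

```python
import datetime


def build_file_name(template, *, prompt, seed, model_name, g_scale, number, now=None):
    """Fill the special tokens of a file name template with the given values."""
    now = now or datetime.datetime.now()
    values = {
        "prompt": prompt,
        "seed": seed,
        "model_name": model_name,
        "g_scale": g_scale,
        "time": now.strftime("%Y%m%d_%H%M%S"),  # assumed timestamp format
        "number": number,
    }
    name = template
    for key, value in values.items():
        # Plain replacement, so unknown tokens or stray braces are left untouched
        name = name.replace("{" + key + "}", str(value))
    return name
```

Because {model_name} and {prompt} can contain characters such as "/", a real implementation would probably also need to sanitize the result before using it as a path.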

---
**Prompt assistant**<a name = "Prompt_assistant_help"></a>
* **None**
  * Select this if not used.

* gpt2-prompt-generator
  * Recommended model
  * This model uses gpt-2.

* anime-anything-promptgen-v2
  * Suitable for images such as anime and 2D

* MagicPrompt-Stable-Diffusion
  * It is a versatile model







**default**

>Save path

* /content/Generated

>File_name

* GIMG-{number}

\

---
**EX**
* When saving to Google Drive
    * /content/drive/MyDrive