In [None]:
# Print the NVIDIA driver/GPU status for this runtime.
!nvidia-smi

In [None]:
# Import the Python packages needed
import os
import zipfile
import shutil
import time
from subprocess import getoutput
from IPython.utils import capture
from google.colab import drive

In [None]:
# Filesystem layout for the project: base-model storage, LoRA training
# output, VAE storage, generated config files and the trainer checkout.
root_dir = os.path.abspath("/content")
deps_dir = os.path.join(root_dir, "deps")
repo_dir = os.path.join(root_dir, "kohya-trainer")
training_dir = os.path.join(root_dir, "LoRA")
pretrained_model = os.path.join(root_dir, "pretrained_model")
vae_dir = os.path.join(root_dir, "vae")
config_dir = os.path.join(training_dir, "config")

# Locations inside the trainer repository (valid once it has been cloned).
accelerate_config = os.path.join(repo_dir, "accelerate_config/config.yaml")
tools_dir = os.path.join(repo_dir, "tools")
finetune_dir = os.path.join(repo_dir, "finetune")

In [None]:
# URL of the trainer repository that main() passes to clone_repo().
repo_url = "https://github.com/Linaqruf/kohya-trainer"
# NOTE(review): not referenced anywhere in the visible notebook, and the path
# is tied to a Python 3.10 install location - confirm it is still needed.
bitsandytes_main_py = "/usr/local/lib/python3.10/dist-packages/bitsandbytes/cuda_setup/main.py"
# Branch name used by clone_repo() when updating an existing checkout.
branch = ""
# When True, install_dependencies() also installs xformers and triton.
install_xformers = True
# When False, the pip/apt commands in this cell are run with their quiet flags.
verbose = False

def read_file(filename, encoding="utf-8"):
    """Return the whole text content of ``filename``.

    Args:
        filename: Path of the file to read.
        encoding: Text encoding used to decode the file. Defaults to UTF-8 so
            the result does not depend on the locale of the machine.

    Returns:
        The file content as one string.

    Raises:
        OSError: If the file cannot be opened.
        UnicodeDecodeError: If the content is not valid for ``encoding``.
    """
    with open(filename, "r", encoding=encoding) as f:
        return f.read()


def write_file(filename, contents, encoding="utf-8"):
    """Write ``contents`` to ``filename``, replacing any existing file.

    Args:
        filename: Path of the file to create or overwrite.
        contents: Text to write.
        encoding: Text encoding used for the file. Defaults to UTF-8 so that
            non-ASCII text is written the same way regardless of the locale.

    Raises:
        OSError: If the file cannot be opened for writing.
    """
    with open(filename, "w", encoding=encoding) as f:
        f.write(contents)


def clone_repo(url):
    if not os.path.exists(repo_dir):
        os.chdir(root_dir)
        !git clone {url} {repo_dir}
    else:
        os.chdir(repo_dir)
        !git pull origin {branch} if branch else !git pull


def install_dependencies():
    s = getoutput('nvidia-smi')

    !pip install {'-q' if not verbose else ''} --upgrade -r requirements.txt
    !pip install {'-q' if not verbose else ''} torch==2.0.0+cu118 torchvision==0.15.1+cu118 torchaudio==2.0.1+cu118 torchtext==0.15.1 torchdata==0.6.0 --extra-index-url https://download.pytorch.org/whl/cu118 -U

    if install_xformers:
        !pip install {'-q' if not verbose else ''} xformers==0.0.19 triton==2.0.0 -U

    from accelerate.utils import write_basic_config

    if not os.path.exists(accelerate_config):
        write_basic_config(save_location=accelerate_config)


def main():
    os.chdir(root_dir)

    for dir in [
        deps_dir,
        training_dir,
        config_dir,
        pretrained_model,
        vae_dir
    ]:
        os.makedirs(dir, exist_ok=True)

    clone_repo(repo_url)

    os.chdir(repo_dir)
    !apt --fix-broken install
    !apt install aria2 {'-qq' if not verbose else ''}

    install_dependencies()
    time.sleep(3)

    os.environ["TF_CPP_MIN_LOG_LEVEL"] = "3"
    os.environ["BITSANDBYTES_NOWELCOME"] = "1"
    os.environ["SAFETENSORS_FAST_GPU"] = "1"

    cuda_path = "/usr/local/cuda-11.8/targets/x86_64-linux/lib/"
    ld_library_path = os.environ.get("LD_LIBRARY_PATH", "")
    os.environ["LD_LIBRARY_PATH"] = f"{ld_library_path}:{cuda_path}"

main()


In [None]:
# Local path of the base checkpoint; later cells pass it to the trainer
# configuration and to the image-generation script.
pretrained_model_name_or_path = "/content/pretrained_model/moyou.safetensors"
# wget: -c continues a partial download, -O selects the output file.
!wget -c https://civitai.com/api/download/models/143001 -O $pretrained_model_name_or_path

## Prepare the data

In [None]:
# Directory for user-supplied training images; created up front so images
# can be uploaded into it.
train_data_dir = os.path.join(root_dir, "LoRA/train_data/hb_cartoon")
os.makedirs(train_data_dir, exist_ok=True)

print(f"Your train data directory : {train_data_dir}")

In [None]:
# Download the Heben sample images (the archive is saved in the current working directory).
# To train the LoRA on your own images instead, upload them to /content/LoRA/train_data/hb_cartoon and skip this cell.
!wget https://github.com/yipingw/ai_painting/raw/main/chap23/data/herburn_images.tar

# Extract the archive under the training-data directory.
!tar -xvf herburn_images.tar -C /content/LoRA/train_data/

# From here on train_data_dir points at herburn_images (presumably the folder
# the archive unpacks to), replacing the hb_cartoon path set in the previous cell.
train_data_dir = os.path.join(root_dir, "LoRA/train_data/herburn_images")

In [None]:
# Visualize images
from glob import glob
from PIL import Image

# Used to display images
def image_grid(imgs, rows, cols):
    """Paste images into a ``rows`` x ``cols`` sheet, filling row by row.

    Every cell has the size of the first image; images are pasted as they
    are, without resizing. Fewer images than cells are allowed, in which case
    the remaining cells keep the fill of the freshly created sheet.

    Args:
        imgs: Non-empty sequence of PIL images.
        rows: Number of grid rows.
        cols: Number of grid columns.

    Returns:
        A new RGB image of size ``(cols * w, rows * h)``, where ``(w, h)`` is
        the size of the first image.

    Raises:
        ValueError: If ``imgs`` is empty or holds more images than the grid
            has cells.
    """
    if not imgs:
        raise ValueError("image_grid() needs at least one image")
    if len(imgs) > rows * cols:
        raise ValueError(
            f"{len(imgs)} images do not fit into a {rows}x{cols} grid"
        )

    w, h = imgs[0].size
    grid = Image.new("RGB", size=(cols * w, rows * h))

    for i, img in enumerate(imgs):
        # Column index advances fastest; integer division gives the row.
        grid.paste(img, box=(i % cols * w, i // cols * h))
    return grid

# Preview up to four training images, each scaled to 512x512, in one row.
# The images are read from train_data_dir so the preview follows whichever
# dataset the earlier cells selected, and only image files are kept so that
# other files in the folder are never handed to PIL.
preview_extensions = (".png", ".jpg", ".jpeg", ".webp", ".bmp")
pths = sorted(
    pth
    for pth in glob(os.path.join(train_data_dir, "*"))
    if pth.lower().endswith(preview_extensions)
)
if not pths:
    raise FileNotFoundError(f"No images found in {train_data_dir}")

imgs = []
for pth in pths[:4]:
    # resize() returns a new in-memory image, so the file can be closed.
    with Image.open(pth) as img:
        imgs.append(img.resize((512, 512)))

# The column count follows the number of images actually loaded.
image_grid(imgs, 1, len(imgs))

## Operation instructions: Place the photos you prepared in the training-data directory shown above, for example /content/LoRA/train_data/hb_cartoon. Just drag the images into the folder! Make sure the upload has finished before going to the next step. Use at least 10 images; more is fine.

In [None]:
# Use BLIP model to add prompts to your images, for training.
# BLIP is a multimodal generative algorithm, input image, get image prompt description information.

import os

os.chdir(finetune_dir)

batch_size = 8
max_data_loader_n_workers = 2
beam_search = True
min_length = 5
max_length = 75
recursive = False
verbose_logging = True

config = {
    "_train_data_dir" : train_data_dir,
    "batch_size" : batch_size,
    "beam_search" : beam_search,
    "min_length" : min_length,
    "max_length" : max_length,
    "debug" : verbose_logging,
    "caption_extension" : ".caption",
    "max_data_loader_n_workers" : max_data_loader_n_workers,
    "recursive" : recursive
}

args = ""
for k, v in config.items():
    if k.startswith("_"):
        args += f'"{v}" '
    elif isinstance(v, str):
        args += f'--{k}="{v}" '
    elif isinstance(v, bool) and v:
        args += f"--{k} "
    elif isinstance(v, float) and not isinstance(v, bool):
        args += f"--{k}={v} "
    elif isinstance(v, int) and not isinstance(v, bool):
        args += f"--{k}={v} "

final_args = f"python make_captions.py {args}"

os.chdir(finetune_dir)
!{final_args}

In [None]:
project_name = "hb_pro"
vae = ""  # Empty string: use the VAE contained in the base model
output_dir = os.path.join(root_dir, "LoRA/output/hb_pro")

# Sample images produced during training are looked up in this sub-directory.
sample_dir = os.path.join(output_dir, "sample")
# Named `directory` rather than `dir`: at module level `dir` would replace
# the builtin for the rest of the notebook session.
for directory in (output_dir, sample_dir):
    os.makedirs(directory, exist_ok=True)

print("Project Name: ", project_name)
if pretrained_model_name_or_path:
    print("Pretrained Model Path: ", pretrained_model_name_or_path)
else:
    print("No Pretrained Model path specified.")

if vae:
    print("VAE Path: ", vae)
else:
    print("No VAE path specified.")
print("Output Path: ", output_dir)

In [None]:
# This code is used to preprocess data, process our training data, regularization data into dataloader that the training model can use
# Data augmentation is a very critical operation, such as image flipping, image color disturbance, image transition towards target style, etc. These can be tuned later.

import os
import toml
import glob

# Default number of repeats for a subset whose folder name does not specify one.
dataset_repeats = 20
# Default class token for a subset whose folder name does not specify one.
activation_word = ""
# Extension of the caption files; written to the "general" section of the dataset config.
caption_extension = ".caption"
# Dataset resolution; also selects the min/max bucket resolutions below.
resolution = 512
# Written to the dataset config as "flip_aug".
flip_aug = True
# Written to the "general" section as "keep_tokens".
keep_tokens = 0

def parse_folder_name(folder_name, default_num_repeats, default_class_token):
    """Derive the repeat count and class token from a dataset folder name.

    Folder names of the form ``<digits>_<token>`` carry their own settings:
    the digits are the repeat count and everything after the first underscore
    is the class token, with remaining underscores turned into spaces
    (``"10_my_cat"`` gives ``(10, "my cat")``). Any other name falls back to
    the defaults.

    Args:
        folder_name: Base name of the folder (not a full path).
        default_num_repeats: Repeat count used when the name has no prefix.
        default_class_token: Class token used when the name has no prefix.

    Returns:
        A ``(num_repeats, class_token)`` tuple.
    """
    # Split on the first underscore only, so tokens may contain underscores.
    repeats_part, separator, token_part = folder_name.partition("_")
    if separator and repeats_part.isdigit():
        return int(repeats_part), token_part.replace("_", " ")
    return default_num_repeats, default_class_token

def find_image_files(path):
    """List the image files found at any depth below ``path``.

    A file counts as an image when its lower-cased path ends with one of the
    extensions .png, .jpg, .jpeg, .webp or .bmp.

    Args:
        path: Directory to search recursively.

    Returns:
        The matching paths, in the order the recursive glob yields them.
    """
    image_suffixes = (".png", ".jpg", ".jpeg", ".webp", ".bmp")
    matches = []
    for candidate in glob.glob(path + '/**/*', recursive=True):
        if candidate.lower().endswith(image_suffixes):
            matches.append(candidate)
    return matches

def process_data_dir(data_dir, default_num_repeats, default_class_token, is_reg=False):
    """Build the list of dataset subsets for ``data_dir``.

    ``data_dir`` itself becomes a subset (with the default settings) when
    find_image_files() reports images for it. Every directory below it for
    which find_image_files() reports images becomes a further subset, with
    settings taken from its folder name via parse_folder_name(). Because
    find_image_files() searches recursively, a directory qualifies as soon as
    any of its descendants holds an image.

    Args:
        data_dir: Root directory of the data.
        default_num_repeats: Repeat count for subsets without their own.
        default_class_token: Class token for subsets without their own.
        is_reg: When truthy, each subset additionally carries an ``is_reg`` entry.

    Returns:
        A list of dicts with the keys ``image_dir``, ``class_tokens``,
        ``num_repeats`` and, when ``is_reg`` is truthy, ``is_reg``.
    """

    def make_subset(image_dir, class_tokens, num_repeats, reg_value):
        # Key order is fixed here because it is the order of the dumped config.
        entry = {
            "image_dir": image_dir,
            "class_tokens": class_tokens,
            "num_repeats": num_repeats,
        }
        if is_reg:
            entry["is_reg"] = reg_value
        return entry

    collected = []

    if find_image_files(data_dir):
        collected.append(
            make_subset(data_dir, default_class_token, default_num_repeats, is_reg)
        )

    for parent, child_dirs, _files in os.walk(data_dir):
        for child in child_dirs:
            child_path = os.path.join(parent, child)
            if not find_image_files(child_path):
                continue
            repeats, tokens = parse_folder_name(
                child, default_num_repeats, default_class_token
            )
            collected.append(make_subset(child_path, tokens, repeats, True))

    return collected


# Collect the training subsets found under train_data_dir.
train_subsets = process_data_dir(train_data_dir, dataset_repeats, activation_word)
print(train_subsets)
# reg_subsets = process_data_dir(reg_data_dir, dataset_repeats, activation_word, is_reg=True)

# subsets = train_subsets + reg_subsets
subsets = train_subsets

# Resolutions above 640 use a wider bucket range.
if resolution > 640:
    min_bucket_reso, max_bucket_reso = 320, 1280
else:
    min_bucket_reso, max_bucket_reso = 256, 1024

general_options = {
    "enable_bucket": True,
    "caption_extension": caption_extension,
    "shuffle_caption": True,
    "keep_tokens": keep_tokens,
    "bucket_reso_steps": 64,
    "bucket_no_upscale": False,
}

dataset_options = {
    "resolution": resolution,
    "min_bucket_reso": min_bucket_reso,
    "max_bucket_reso": max_bucket_reso,
    "caption_dropout_rate": 0,
    "caption_tag_dropout_rate": 0,
    "caption_dropout_every_n_epochs": 0,
    "flip_aug": flip_aug,
    "color_aug": False,
    "face_crop_aug_range": None,
    "subsets": subsets,
}

config = {
    "general": general_options,
    "datasets": [dataset_options],
}

# Replace empty-string options with None. Only dict-valued sections are
# inspected, so the "datasets" list is left untouched.
for section_name, section in config.items():
    if isinstance(section, dict):
        for option, value in section.items():
            if value == "":
                section[option] = None
    elif section == "":
        config[section_name] = None

config_str = toml.dumps(config)

dataset_config = os.path.join(config_dir, "dataset_config.toml")
with open(dataset_config, "w") as f:
    f.write(config_str)

print(config_str)

In [None]:
# A pre-trained LoRA model can be supplied through `network_weight`.
# The text encoder and the UNet each get their own learning rate.

network_category = "LoRA"

conv_dim = 32
conv_alpha = 16
network_dim = 32
network_alpha = 16
network_weight = ""
network_module = "networks.lora"
network_args = ""

min_snr_gamma = -1
optimizer_type = "AdamW8bit"
optimizer_args = ""
train_unet = True
unet_lr = 1e-4
train_text_encoder = True
text_encoder_lr = 5e-5
lr_scheduler = "constant"
lr_warmup_steps = 0
lr_scheduler_num_cycles = 0
lr_scheduler_power = 0

# ---- Summary of the network settings ----
print("- LoRA Config:")
if min_snr_gamma != -1:
    # -1 means Min-SNR weighting is not used, so nothing is reported for it.
    print(f"  - Min-SNR Weighting: {min_snr_gamma}")
print(f"  - Loading network module: {network_module}")
print(f"  - {network_module} linear_dim set to: {network_dim}")
print(f"  - {network_module} linear_alpha set to: {network_alpha}")

if not network_weight:
    print("  - No LoRA weight loaded.")
elif os.path.exists(network_weight):
    print(f"  - Loading LoRA weight: {network_weight}")
else:
    print(f"  - {network_weight} does not exist.")
    # Reset so later cells do not receive a path that does not exist.
    network_weight = ""

# ---- Summary of the optimizer settings ----
print("- Optimizer Config:")
print(f"  - Additional network category: {network_category}")
print(f"  - Using {optimizer_type} as Optimizer")
if optimizer_args:
    print(f"  - Optimizer Args: {optimizer_args}")

if train_unet and train_text_encoder:
    print("  - Train UNet and Text Encoder")
    print(f"    - UNet learning rate: {unet_lr}")
    print(f"    - Text encoder learning rate: {text_encoder_lr}")
elif train_unet:
    print("  - Train UNet only")
    print(f"    - UNet learning rate: {unet_lr}")
elif train_text_encoder:
    print("  - Train Text Encoder only")
    print(f"    - Text encoder learning rate: {text_encoder_lr}")

print(f"  - Learning rate warmup steps: {lr_warmup_steps}")
print(f"  - Learning rate Scheduler: {lr_scheduler}")
if lr_scheduler == "cosine_with_restarts":
    print(f"  - lr_scheduler_num_cycles: {lr_scheduler_num_cycles}")
elif lr_scheduler == "polynomial":
    print(f"  - lr_scheduler_power: {lr_scheduler_power}")

In [None]:
# Set storage format for models, we use safetensors format to adapt to WebUI
# Set test prompt
# Set training parameters


import toml
import os

# ---- Tunable training parameters ----
lowram = True
enable_sample_prompt = True
sampler = "ddim"  #["ddim", "pndm", "lms", "euler", "euler_a", "heun", "dpm_2", "dpm_2_a", "dpmsolver","dpmsolver++", "dpmsingle", "k_lms", "k_euler", "k_euler_a", "k_dpm_2", "k_dpm_2_a"]
noise_offset = 0.0
num_epochs = 10
vae_batch_size = 4
train_batch_size = 6
mixed_precision = "fp16"  # ["no","fp16","bf16"]
save_precision = "fp16"  #  ["float", "fp16", "bf16"]
save_n_epochs_type = "save_every_n_epochs"
save_n_epochs_type_value = 1
save_model_as = "safetensors"  # ["ckpt", "pt", "safetensors"]
max_token_length = 225
clip_skip = 2
gradient_checkpointing = False
gradient_accumulation_steps = 1
seed = -1
logging_dir = os.path.join(root_dir, "LoRA/logs")
prior_loss_weight = 1.0
os.chdir(repo_dir)

# Alternative sample prompt:
# sample_str = f"""
#   masterpiece, best quality, a woman with a very short haircut and a pink shirt, looking at viewer, simple background \
#   --n lowres, bad anatomy, bad hands, text, error, missing fingers, extra digit, fewer digits, cropped, worst quality, low quality, normal quality, jpeg artifacts, signature, watermark, username, blurry \
#   --w 512 \
#   --h 512 \
#   --l 7 \
#   --s 28
# """

# Prompt used for the sample images rendered during training. Each trailing
# backslash joins the next line, so the prompt and its options end up on a
# single line of the written file.
sample_str = f"""
  masterpiece, best quality, 1girl,moyou，very short haircut and a pink shirt, looking at viewer, simple background \
  --n EasyNegativeV2,(badhandv4:1.2), \
  --w 512 \
  --h 512 \
  --l 7 \
  --s 28
"""

config = {
    "model_arguments": {
        "v2": False,
        "v_parameterization": False,
        "pretrained_model_name_or_path": pretrained_model_name_or_path,
        "vae": vae,
    },
    "additional_network_arguments": {
        "no_metadata": False,
        "unet_lr": float(unet_lr) if train_unet else None,
        "text_encoder_lr": float(text_encoder_lr) if train_text_encoder else None,
        "network_weights": network_weight,
        "network_module": network_module,
        "network_dim": network_dim,
        "network_alpha": network_alpha,
        "network_args": network_args,
        "network_train_unet_only": True if train_unet and not train_text_encoder else False,
        "network_train_text_encoder_only": True if train_text_encoder and not train_unet else False,
        "training_comment": None,
    },
    "optimizer_arguments": {
        "min_snr_gamma": min_snr_gamma if not min_snr_gamma == -1 else None,
        "optimizer_type": optimizer_type,
        "learning_rate": unet_lr,
        "max_grad_norm": 1.0,
        # NOTE(review): eval() executes arbitrary code; acceptable only while
        # optimizer_args is typed into the notebook by its user.
        "optimizer_args": eval(optimizer_args) if optimizer_args else None,
        "lr_scheduler": lr_scheduler,
        "lr_warmup_steps": lr_warmup_steps,
        "lr_scheduler_num_cycles": lr_scheduler_num_cycles if lr_scheduler == "cosine_with_restarts" else None,
        "lr_scheduler_power": lr_scheduler_power if lr_scheduler == "polynomial" else None,
    },
    "dataset_arguments": {
        "cache_latents": True,
        "debug_dataset": False,
        "vae_batch_size": vae_batch_size,
    },
    "training_arguments": {
        "output_dir": output_dir,
        "output_name": project_name,
        "save_precision": save_precision,
        "save_every_n_epochs": save_n_epochs_type_value if save_n_epochs_type == "save_every_n_epochs" else None,
        "save_n_epoch_ratio": save_n_epochs_type_value if save_n_epochs_type == "save_n_epoch_ratio" else None,
        "save_last_n_epochs": None,
        "save_state": None,
        "save_last_n_epochs_state": None,
        "resume": None,
        "train_batch_size": train_batch_size,
        # Taken from the variable above so that changing it has an effect.
        "max_token_length": max_token_length,
        "mem_eff_attn": False,
        "xformers": True,
        "max_train_epochs": num_epochs,
        "max_data_loader_n_workers": 8,
        "persistent_data_loader_workers": True,
        # Non-positive seeds are omitted from the config.
        "seed": seed if seed > 0 else None,
        "gradient_checkpointing": gradient_checkpointing,
        "gradient_accumulation_steps": gradient_accumulation_steps,
        "mixed_precision": mixed_precision,
        "clip_skip": clip_skip,
        "logging_dir": logging_dir,
        "log_prefix": project_name,
        "noise_offset": noise_offset if noise_offset > 0 else None,
        "lowram": lowram,
    },
    "sample_prompt_arguments": {
        "sample_every_n_steps": None,
        "sample_every_n_epochs": 1 if enable_sample_prompt else 999999,
        "sample_sampler": sampler,
    },
    "dreambooth_arguments": {
        # Taken from the variable above so that changing it has an effect.
        "prior_loss_weight": prior_loss_weight,
    },
    "saving_arguments": {
        "save_model_as": save_model_as
    },
}

config_path = os.path.join(config_dir, "config_file.toml")
prompt_path = os.path.join(config_dir, "sample_prompt.txt")


# Replace empty-string options with None before the config is dumped.
for key in config:
    if isinstance(config[key], dict):
        for sub_key in config[key]:
            if config[key][sub_key] == "":
                config[key][sub_key] = None
    elif config[key] == "":
        config[key] = None

config_str = toml.dumps(config)

write_file(config_path, config_str)
write_file(prompt_path, sample_str)

print(config_str)

## During training, the generated sample images can be viewed in the LoRA/output/hb_pro/sample directory. Use them to select the LoRA model you are most satisfied with.

In [None]:
# Paths of the files written by the earlier configuration cells, plus the
# accelerate configuration inside the repository.
sample_prompt = os.path.join(config_dir, "sample_prompt.txt")
config_file = os.path.join(config_dir, "config_file.toml")
dataset_config = os.path.join(config_dir, "dataset_config.toml")
accelerate_config = os.path.join(repo_dir, "accelerate_config/config.yaml")

# Options placed between `accelerate launch` and the script name.
accelerate_conf = {
    "config_file" : accelerate_config,
    "num_cpu_threads_per_process" : 1,
}

# Options placed after `train_network.py`.
train_conf = {
    "sample_prompts" : sample_prompt,
    "dataset_config" : dataset_config,
    "config_file" : config_file
}

def train(config):
    """Render an option mapping as a command-line argument string.

    Despite its name this function starts nothing; it only formats options:

    * a key starting with ``_`` gives a quoted positional value,
    * a string value gives ``--key="value"``,
    * ``True`` gives the bare flag ``--key`` and ``False`` gives nothing,
    * an int or float value gives ``--key=value``,
    * any other value (for example ``None``) is skipped.

    Args:
        config: Mapping of option names to values.

    Returns:
        The rendered arguments in mapping order, each followed by one space.
    """
    rendered = []
    for name, value in config.items():
        if name.startswith("_"):
            rendered.append(f'"{value}" ')
        elif isinstance(value, str):
            rendered.append(f'--{name}="{value}" ')
        elif isinstance(value, bool):
            # Checked before the numeric case because bool is a subclass of int.
            if value:
                rendered.append(f"--{name} ")
        elif isinstance(value, (float, int)):
            rendered.append(f"--{name}={value} ")
    return "".join(rendered)

# Render both option sets and assemble the launch command.
accelerate_args = train(accelerate_conf)
train_args = train(train_conf)
final_args = f"accelerate launch {accelerate_args} train_network.py {train_args}"

# train_network.py is given as a relative path, so run from the repository directory.
os.chdir(repo_dir)
!{final_args}

In [None]:
# Check LoRA effects for different epochs: lay the sample images written
# during training out in a two-row grid, in sorted file-name order.
sample_extensions = (".png", ".jpg", ".jpeg", ".webp", ".bmp")
pths = sorted(
    pth
    for pth in glob.glob(r"/content/LoRA/output/hb_pro/sample/*")
    if pth.lower().endswith(sample_extensions)
)
print(pths)
if not pths:
    raise FileNotFoundError(
        "No sample images found in /content/LoRA/output/hb_pro/sample"
    )

imgs = []
for pth in pths:
    # copy() loads the pixels, so the file can be closed right away.
    with Image.open(pth) as img:
        imgs.append(img.copy())

rows = 2 if len(imgs) > 1 else 1
cols = -(-len(imgs) // rows)  # ceiling division
# Pad with blank tiles so the image count always equals rows * cols,
# which also makes an odd number of samples displayable.
blanks = [Image.new("RGB", imgs[0].size)] * (rows * cols - len(imgs))

image_grid(imgs + blanks, rows, cols)

## The .safetensors files in the LoRA/output/hb_pro/ directory are the trained LoRA models. Download them and they can be used directly in WebUI.

In [None]:
import os

network_weight = "/content/LoRA/output/hb_pro/hb_pro.safetensors"
network_mul = 1
network_module = "networks.lora"
network_args = ""

v2 = False
v_parameterization = False
prompt = "masterpiece, best quality, 1girl moyou ( ink sketch) fantasy, surreal muted color ( Russ Mills Anna Dittmann)"   # 你要测试的prompt
negative = "lowres, bad anatomy, bad hands, text, error, missing fingers, extra digit, fewer digits, cropped, worst quality, low quality, normal quality, jpeg artifacts, signature, watermark, username, blurry"
model = pretrained_model_name_or_path
vae = ""
outdir = "/content/tmp"  # Image storage path
scale = 7
sampler = "euler_a"
steps = 20
precision = "fp16"
width = 512
height = 768
images_per_prompt = 4
batch_size = 4
clip_skip = 2
seed = 1024

final_prompt = f"{prompt} --n {negative}"

config = {
    "v2": v2,
    "v_parameterization": v_parameterization,
    "network_module": network_module,
    "network_weight": network_weight,
    "network_mul": float(network_mul),
    "network_args": eval(network_args) if network_args else None,
    "ckpt": model,
    "outdir": outdir,
    "xformers": True,
    "vae": vae if vae else None,
    "fp16": True,
    "W": width,
    "H": height,
    "seed": seed if seed > 0 else None,
    "scale": scale,
    "sampler": sampler,
    "steps": steps,
    "max_embeddings_multiples": 3,
    "batch_size": batch_size,
    "images_per_prompt": images_per_prompt,
    "clip_skip": clip_skip if not v2 else None,
    "prompt": final_prompt,
}

args = ""
for k, v in config.items():
    if k.startswith("_"):
        args += f'"{v}" '
    elif isinstance(v, str):
        args += f'--{k}="{v}" '
    elif isinstance(v, bool) and v:
        args += f"--{k} "
    elif isinstance(v, float) and not isinstance(v, bool):
        args += f"--{k}={v} "
    elif isinstance(v, int) and not isinstance(v, bool):
        args += f"--{k}={v} "

final_args = f"python gen_img_diffusers.py {args}"

os.chdir(repo_dir)
!{final_args}

In [None]:
# View test images: show the last four files (in sorted order) from the
# generation output directory side by side.
pths = sorted(glob.glob(r"/content/tmp/*"))
print(pths)

imgs = [Image.open(pth) for pth in pths[-4:]]

image_grid(imgs, 1, len(imgs))