<a href="https://colab.research.google.com/github/yf591/sd-model-merge-tool/blob/main/04_Merge_Model_Maker_Ver2_1_0.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# 3モデル以上の階層マージ（Huggin Face, Civitai, MyDriveからのロードに対応）

## 事前準備

In [None]:
from google.colab import drive
drive.mount('/content/drive')

In [2]:
from google.colab import output

# Hugging Face Hub, PyTorch, その他必要なライブラリをインストール
!pip install --upgrade pip
!pip install torch torchvision torchaudio --index-url https://download.pytorch.org/whl/cu118 # PyTorchを使用して深層学習モデルを操作します。CUDAバージョン（例: `cu118`）を指定
!pip install diffusers transformers accelerate # Stable Diffusionを扱うための主要ライブラリです。モデルのロードや画像生成の操作を簡素化
!pip install safetensors # 安全かつ軽量なモデル保存形式（`.safetensors`）をサポート
!pip install huggingface-hub # Hugging Face Hubからモデルをダウンロード・管理
!pip install opencv-python # 生成した画像の前処理や後処理に使用
!pip install numpy # 数値計算ライブラリで、モデルや画像の操作に使う
!pip install matplotlib # 生成された画像の可視化に使う
!pip install tqdm # プログレスバーの表示
!pip install optuna # ハイパーパラメータ最適化

output.clear()

In [None]:
import os
import torch
from diffusers import DiffusionPipeline, StableDiffusionPipeline, UNet2DConditionModel
from safetensors.torch import load_file, save_file
from transformers import AutoConfig, AutoModel
from huggingface_hub import hf_hub_download
import requests
from tqdm import tqdm
import ipywidgets as widgets
from IPython.display import display
from typing import List, Dict
from urllib.parse import urlparse
from pathlib import Path
import hashlib
import time

In [None]:
#@title ### APIキー設定（Hugging Face, Civitai）

from getpass import getpass
from google.colab import userdata

# Hugging Faceで取得したTokenをこちらに貼る(トークンを非表示で入力)
HF_TOKEN = getpass("Hugging FaceのRead権限のあるHF Tokenを入力してください: ")

# CIVITAI_TOKEN が存在する場合、取得
api_key = userdata.get('CIVITAI_TOKEN')
if api_key is None:
    print("Error: CIVITAI_API_KEY secret is not set.")

## 各種設定（関数の定義、モデル数の設定）

In [5]:
#@title ### 関数の定義

# ヘルパー関数
def download_model(repo_id, filename, token):
    """Hugging Face Hubからモデルをダウンロード"""
    return hf_hub_download(repo_id=repo_id, filename=filename, token=token)


def get_model_filename_from_url(url):
    """URLからモデル名を抽出する"""
    parsed_url = urlparse(url)
    # パスの最後の部分をファイル名として使用
    filename = os.path.basename(parsed_url.path)
    # ファイル名が無い場合はデフォルト名を使用
    if not filename or not filename.endswith('.safetensors'):
        filename = f"model_{hash(url)}.safetensors"
    return filename

def download_civitai_model(url, output_dir="/content/downloaded_models", api_key=None):
    """Civitaiからモデルをダウンロード（一意のファイル名を使用）"""
    try:
        # 出力ディレクトリが存在しない場合は作成
        os.makedirs(output_dir, exist_ok=True)

        # URLからファイル名を取得
        filename = get_model_filename_from_url(url)
        output_path = os.path.join(output_dir, filename)

        # 既にダウンロード済みの場合はそのパスを返す
        if os.path.exists(output_path):
            print(f"モデルは既にダウンロード済みです: {output_path}")
            return output_path

        print(f"モデルをダウンロードしています: {filename}")
        headers = {"Authorization": f"Bearer {api_key}"} if api_key else None
        response = requests.get(url, stream=True, headers=headers)
        response.raise_for_status()

        total_size = int(response.headers.get('content-length', 0))
        with open(output_path, 'wb') as file, tqdm(
            desc=filename,
            total=total_size,
            unit='iB',
            unit_scale=True,
            unit_divisor=1024,
        ) as bar:
            for data in response.iter_content(chunk_size=1024):
                size = file.write(data)
                bar.update(size)

        print(f"ダウンロード完了: {output_path}")
        return output_path

    except Exception as e:
        print(f"Civitaiからのダウンロード中にエラーが発生しました: {e}")
        return None


def load_model(path, device):
    """ファイルパスからモデルをロードする（修正版）"""
    try:
        if path.startswith("http"):
            if "civitai.com" in path:
                output_path = download_civitai_model(path, api_key=api_key)
                if not output_path:
                    return None
                print(f"Civitaiからモデルをロード: {output_path}")
                try:
                    return load_file(output_path, device=device)
                except Exception as e:
                  print(f"Error loading downloaded Civitai model: {e}. Attempting to redownload...")
                  os.remove(output_path)
                  output_path = download_civitai_model(path, api_key=api_key)
                  if not output_path:
                     return None
                  return load_file(output_path, device=device)
            elif "huggingface.co" in path:
                print(f"HuggingFaceからモデルをロード: {path}")
                repo_id_and_file = path.split("huggingface.co/")[1]
                repo_id = repo_id_and_file.split("/resolve/")[0]
                filename = repo_id_and_file.split("/")[-1]
                path = download_model(repo_id, filename, HF_TOKEN)
                return load_file(path, device=device)
            else:
                print("Error: HTTP URLが認識できません。HuggingFaceまたはCivitaiのモデルを使用してください。")
                return None

        if path.startswith("/content/drive"):
            print(f"Google Driveからモデルをロード: {path}")
            return load_file(path, device=device)
        else:
            print("Error: モデルパスが正しくありません。")
            return None

    except Exception as e:
        print(f"モデルのロード中にエラーが発生しました: {e}")
        return None

def merge_multiple_models(models: List[Dict], alpha, layer_sliders, use_alpha_checkboxes):
    """複数のモデルを階層マージする（修正版）"""
    merged_weights = {}

    # 最初のモデルのキー構造を取得 # U-Netモデルの各レイヤー名と構造を把握するため
    base_model_keys = set(models[0]['weights'].keys())
    skipped_layers = []

    for key in base_model_keys: # U-Netの全ての重みに対して処理を実行
        weights_to_merge = []
        valid_alphas = []

        # キーから層の名前を抽出 # 例: model.diffusion_model.input_blocks.4.1.proj_in.weight
        layer_type = None # レイヤーの種類(IN, M, OUT)
        if 'input_blocks' in key: # U-Netのエンコーダー部分
            layer_type = 'IN'
        elif 'middle_block' in key: # U-Netの中間層
            layer_type = 'M'
        elif 'output_blocks' in key: # U-Netのデコーダー部分
            layer_type = 'OUT'

        # 層番号の抽出（存在する場合）
        layer_num = None # レイヤーの番号 (例: IN04)
        if layer_type:
            try:
                if layer_type == 'M': # middle_blockは00固定
                    layer_num = '00'
                else:
                    # キーから数字を抽出
                    parts = key.split('.')
                    for part in parts:
                        if part.isdigit():
                            layer_num = str(part).zfill(2)
                            break
            except:
                pass

        # 完全な層名を構築（例: "IN04"）
        layer_name = f"{layer_type}{layer_num}" if layer_type and layer_num else None # 例: input_blocksの4番目のレイヤーなら IN04

        try:
            for idx, model in enumerate(models): # 各モデルの重みを収集
                weights = model['weights']
                if key in weights and weights[key].size() == models[0]['weights'][key].size(): # 重みのサイズが一致する場合
                    weights_to_merge.append(weights[key])
                    valid_alphas.append(model['alpha'])
                else:
                    print(f"警告: レイヤー {key} のサイズが一致しないか存在しません。スキップします。")
                    skipped_layers.append(key)
                    break

            if weights_to_merge: # マージする重みが存在する場合
                if layer_name and layer_name in use_alpha_checkboxes: # レイヤー名が存在し、かつチェックボックスがUIにある場合
                    if use_alpha_checkboxes[layer_name].value: # チェックボックスがTrueの場合、グローバルalphaを使用
                        # グローバルアルファ値を使用
                        merged_weights[key] = sum(
                            alpha * weight for alpha, weight in zip(valid_alphas, weights_to_merge)
                        )
                    else: # チェックボックスがFalseの場合、レイヤー固有のスライダーの値を使用
                        # レイヤー固有のスライダー値を使用
                        layer_alphas = []
                        sliders = layer_sliders.get(layer_name, []) # レイヤーに対応するスライダーを取得

                        # スライダー値の取得
                        for i in range(len(weights_to_merge) - 1):
                            if i < len(sliders):
                                layer_alphas.append(sliders[i].value) # スライダーの値を収集
                            else:
                                layer_alphas.append(1.0 / len(weights_to_merge)) # スライダーが無い場合はデフォルト値

                        # 最後のアルファ値を計算
                        last_alpha = 1.0 - sum(layer_alphas)
                        layer_alphas.append(last_alpha)

                        # 重みの合計を計算
                        merged_weights[key] = sum(
                            alpha * weight for alpha, weight in zip(layer_alphas, weights_to_merge)
                        )
                else: # レイヤー名がUIに存在しない場合はグローバルalphaを使用
                    # レイヤー名が見つからない場合はグローバルアルファ値を使用
                    merged_weights[key] = sum(
                        alpha * weight for alpha, weight in zip(valid_alphas, weights_to_merge)
                    )
            else:
                print(f"情報: レイヤー {key} に対応する重みが見つかりません。最初のモデルの重みを使用します。")
                merged_weights[key] = models[0]['weights'][key] # 重みが見つからない場合は、最初のモデルの重みを使用

        except Exception as e:
            print(f"Skipped key: {key}, Error: {e}")
            skipped_layers.append(key)
            merged_weights[key] = models[0]['weights'][key]

    if skipped_layers:
        print("\nスキップされたレイヤーの数:", len(skipped_layers))
        print("スキップされたレイヤーの例:", skipped_layers[:5])

    return merged_weights


def save_merged_model(merged_weights, output_path):
    """マージ済みモデルを保存"""
    os.makedirs(os.path.dirname(output_path), exist_ok=True)
    save_file(merged_weights, output_path)

In [6]:
#@title MergeするModel数の定義

# UI設定
num_models = 5 #@param {type:"integer"}
paths = [widgets.Text(value="", description=f"Path{i+1}", layout=widgets.Layout(width='80%')) for i in range(num_models)]
sliders = [widgets.FloatSlider(value=1/num_models, min=0, max=1, step=0.01, description=f"Alpha{i+1}") for i in range(num_models-1)]
alpha_n = widgets.FloatText(value=1-sum([slider.value for slider in sliders]), description=f"Alpha{num_models}", disabled = True)

def enforce_alpha_constraints(*args):
    total_alpha = sum(slider.value for slider in sliders)
    alpha_n.value = 1 - total_alpha

for slider in sliders:
    slider.observe(enforce_alpha_constraints, 'value')

output_file_widget = widgets.Text(value="/content/drive/MyDrive/sd-webui-google-colab-setup/stable-diffusion-webui/models/checkpoints/merged_model_kaisou_sample1.safetensors", description="Output", layout=widgets.Layout(width='80%'))


# UI設定 (レイヤーごとのスライダーを追加)
layer_names = [
    "IN00", "IN01", "IN02", "IN03", "IN04", "IN05", "IN06", "IN07", "IN08", "IN09", "IN10", "IN11",
    "M00",
    "OUT00", "OUT01", "OUT02", "OUT03", "OUT04", "OUT05", "OUT06", "OUT07", "OUT08", "OUT09", "OUT10", "OUT11"
]
layer_sliders = {}
use_alpha_checkboxes = {}
for layer in layer_names:
    layer_sliders[layer] = [widgets.FloatSlider(value=1/num_models, min=0, max=1, step=0.01, description=f"{layer}_{i+1}") for i in range(num_models-1)]
    # 自動計算されるFloatTextの初期値を設定
    layer_sliders[layer].append(widgets.FloatText(value=1-sum(1/num_models for i in range(num_models-1)), description = f"{layer}_{num_models}", disabled = True))
    use_alpha_checkboxes[layer] = widgets.Checkbox(value=False, description=f"Alphaを優先({layer})")
    def enforce_layer_slider_constraints(change, layer_name = layer):
         total_value = sum([slider.value for slider in layer_sliders[layer_name][:-1]])
         layer_sliders[layer_name][-1].value = 1 - total_value
    for slider in layer_sliders[layer][:-1]:
        slider.observe(enforce_layer_slider_constraints, 'value')

## UIの設定と実行

In [7]:
#@title ### マージ実行関数

def execute_merge(b):  # ボタンのクリックイベントではbutton引数が必要
    output_file = output_file_widget.value
    try:
        print("モデルのロードを開始します...")
        models = []
        # 最後のモデルのパスとアルファ値も含める
        all_paths = paths
        all_alphas = sliders + [alpha_n]

        for path, alpha in zip(all_paths, all_alphas):
            if path.value and alpha.value > 0:
                model = load_model(path.value, device="cuda" if torch.cuda.is_available() else "cpu")
                if model is not None:
                    models.append({
                        "weights": model,
                        "alpha": alpha.value  # .valueを追加
                    })
                else:
                    print(f"Error: model loading failed. Skip this model.")

        if len(models) == 0:
            print("Error: At least one model is required for merging.")
        else:
            for idx, model in enumerate(models):
                print(f"model{idx+1} keys: {list(model['weights'].keys())[:5]} ...")

            print("モデルをレイヤーごとにマージ中...")
            merged_weights = merge_multiple_models(
                models=models,
                alpha=None,  # alpha引数は不要なので削除するか、Noneを渡す
                layer_sliders=layer_sliders,
                use_alpha_checkboxes=use_alpha_checkboxes
            )

            print(f"マージされたモデルを保存します: {output_file}")
            save_merged_model(merged_weights, output_file)
            print("マージ完了！")

    except Exception as e:
        print(f"エラーが発生しました: {e}")
        raise e  # エラーの詳細を表示

Memo
```Python
# UI設定
path1 = "https://huggingface.co/casque/majicmixRealistic_v6/resolve/main/majicmixRealistic_v6.safetensors" #@param {type:"string"}
path2 = "/content/drive/MyDrive/sd-webui-google-colab-setup/stable-diffusion-webui/models/checkpoints/merged_model_chillre_majic.safetensors"  #@param {type:"string"}
path3 = "https://civitai.com/api/download/models/279964?type=Model&format=SafeTensor&size=full&fp=fp16" #@param {type:"string"}
path4 = "https://civitai.com/api/download/models/90505?type=Model&format=SafeTensor&size=full&fp=fp32"  #@param {type:"string"}
path5 = "" #@param {type:"string"}
path6 = ""  #@param {type:"string"}
```

In [8]:
#@title ### UIの表示と実行ボタン

#@markdown ### チェックボックスに **✓ を入れる (オン, `True`) と、そのレイヤーは `alpha` 値でマージ** されます。
#@markdown ### チェックボックスに **✓ を入れない (オフ, `False`) と、そのレイヤーは個別に設定された比率でマージ** されます。

merge_button = widgets.Button(description="マージ実行")
merge_button.on_click(execute_merge)  # on_clickに直接関数を渡す

ui = widgets.VBox([
    item for sublist in [
        [*paths, output_file_widget],
        sliders + [alpha_n],
        *[[use_alpha_checkboxes[layer]] + layer_sliders[layer] for layer in layer_names],
        [merge_button]  # 修正したボタンを追加
    ] for item in sublist
])
display(ui)

VBox(children=(Text(value='', description='Path1', layout=Layout(width='80%')), Text(value='', description='Pa…

モデルのロードを開始します...
HuggingFaceからモデルをロード: https://huggingface.co/casque/majicmixRealistic_v6/resolve/main/majicmixRealistic_v6.safetensors


majicmixRealistic_v6.safetensors:   0%|          | 0.00/2.40G [00:00<?, ?B/s]

モデルをダウンロードしています: model_4653899186037957543.safetensors


model_4653899186037957543.safetensors: 100%|██████████| 4.46G/4.46G [01:43<00:00, 46.2MiB/s]


ダウンロード完了: /content/downloaded_models/model_4653899186037957543.safetensors
Civitaiからモデルをロード: /content/downloaded_models/model_4653899186037957543.safetensors
モデルをダウンロードしています: model_-598469663754219417.safetensors


model_-598469663754219417.safetensors: 100%|██████████| 2.24G/2.24G [00:27<00:00, 88.2MiB/s]


ダウンロード完了: /content/downloaded_models/model_-598469663754219417.safetensors
Civitaiからモデルをロード: /content/downloaded_models/model_-598469663754219417.safetensors
モデルをダウンロードしています: model_-1429292943124362659.safetensors


model_-1429292943124362659.safetensors:  51%|█████     | 2.56G/5.05G [01:02<01:00, 43.8MiB/s]


Civitaiからのダウンロード中にエラーが発生しました: ('Connection broken: IncompleteRead(2747596800 bytes read, 2671157088 more expected)', IncompleteRead(2747596800 bytes read, 2671157088 more expected))
Error: model loading failed. Skip this model.
Google Driveからモデルをロード: /content/drive/MyDrive/sd-webui-google-colab-setup/stable-diffusion-webui/models/checkpoints/anyloraCheckpoint_bakedvaeBlessedFp16.safetensors
model1 keys: ['cond_stage_model.transformer.text_model.embeddings.position_embedding.weight', 'cond_stage_model.transformer.text_model.embeddings.position_ids', 'cond_stage_model.transformer.text_model.embeddings.token_embedding.weight', 'cond_stage_model.transformer.text_model.encoder.layers.0.layer_norm1.bias', 'cond_stage_model.transformer.text_model.encoder.layers.0.layer_norm1.weight'] ...
model2 keys: ['cond_stage_model.transformer.text_model.embeddings.position_embedding.weight', 'cond_stage_model.transformer.text_model.embeddings.position_ids', 'cond_stage_model.transformer.text_model.embedd