# Repo Mirror Tool
项目地址：[licyk/repo_mirror_tools](https://github.com/licyk/repo_mirror_tools)

制作 [HuggingFace](https://huggingface.co) / [ModelScope](https://modelscope.cn) 镜像仓库的工具，基于 [sd-webui-all-in-one/sd_scripts_ipynb_core](https://github.com/licyk/sd-webui-all-in-one/blob/main/sd_scripts_ipynb_core.py) 内核制作。

Colab 链接：<a href="https://colab.research.google.com/github/licyk/repo_mirror_tools/blob/main/repo_mirror_tools.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>


In [None]:
# @title 👇 环境配置
from pathlib import Path
import urllib.request
import sys
import os
import traceback
import uuid
from typing import Literal
# Download URL of the SD Scripts Manager core module
SD_SCRIPTS_IPYNB_CORE_URL = "https://github.com/licyk/sd-webui-all-in-one/raw/main/sd_scripts_ipynb_core.py"
FORCE_DOWNLOAD_CORE = False  # When True, re-download the core module even if it already exists
try:
    # A bare name lookup raises NameError only when the root has not been set
    # yet; an already-defined root is kept as-is (the cwd is changed later by
    # os.chdir(WORKSPACE), so it must not be recomputed on a re-run).
    JUPYTER_ROOT_PATH  # type: ignore
except NameError:
    JUPYTER_ROOT_PATH = Path(os.getcwd())
print(f"[Repo Mirror Tools] Jupyter Notebook 根目录: {JUPYTER_ROOT_PATH}")
# Make the downloaded core importable. This runs on every execution (not only
# when the root was just created) and avoids duplicate sys.path entries.
_JUPYTER_ROOT_STR = str(Path(JUPYTER_ROOT_PATH).resolve())
if _JUPYTER_ROOT_STR not in sys.path:
    sys.path.append(_JUPYTER_ROOT_STR)
# Where the core module is saved. Defined unconditionally so it exists even
# when JUPYTER_ROOT_PATH was already set before this cell ran.
SD_SCRIPTS_IPYNB_CORE_PATH = Path(JUPYTER_ROOT_PATH) / "sd_scripts_ipynb_core.py"
try:
    if SD_SCRIPTS_IPYNB_CORE_PATH.exists() and not FORCE_DOWNLOAD_CORE:
        print("[Repo Mirror Tools] SD Scripts Manager 核心模块已存在")
    else:
        urllib.request.urlretrieve(
            SD_SCRIPTS_IPYNB_CORE_URL, SD_SCRIPTS_IPYNB_CORE_PATH)
        print("[Repo Mirror Tools] SD Scripts Manager 核心模块下载成功")
except Exception as e:
    # Chain the original error so the real cause stays visible in the traceback.
    raise Exception(f"SD Scripts Manager 核心模块下载错误: {e}") from e
from sd_scripts_ipynb_core import logger, VERSION, BaseManager
logger.info("SD Scripts Manager 版本: %s", VERSION)
logger.info("核心模块初始化完成")
#################################################################################################################
class RepoMirrorTools(BaseManager):
    """镜像制作工具"""

    def clean_package_cache(self) -> bool:
        """Purge the pip, uv and APT caches.

        The commands are run in that order through `self.run_cmd`; the first
        one that raises stops the remaining ones from running.

        :return `bool`: `True` when every command ran without raising, otherwise `False`
        """
        purge_commands = (
            [sys.executable, "-m", "pip", "cache", "purge"],
            ["uv", "cache", "clean"],
            ["apt", "clean"],
        )
        logger.info("清理缓存中")
        try:
            for command in purge_commands:
                self.run_cmd(command)
        except Exception as e:
            logger.error("清理缓存时出现错误: %s", e)
            return False
        else:
            logger.info("缓存清理完成")
            return True

    def verify_huggingface_token(self, hf_token: str) -> bool:
        """Check whether a HuggingFace token is accepted.

        Calls `HfApi.whoami` with the token; any exception raised by that call
        is logged and treated as a failed verification.

        :param hf_token `(str)`: HuggingFace account token
        :return `bool`: `True` when verification succeeds, otherwise `False`
        """
        from huggingface_hub import HfApi  # local import: only needed by this check
        logger.info("验证 HuggingFace Token 中")
        api = HfApi()
        try:
            api.whoami(hf_token)
        except Exception as e:
            # The message must carry a %s placeholder: passing `e` without one
            # makes logging report a formatting error and hides the cause.
            logger.error("HuggingFace Token 验证失败: %s", e)
            return False
        logger.info("HuggingFace Token 验证成功")
        return True

    def verify_modelscope_token(self, ms_token: str) -> bool:
        """Check whether a ModelScope token is accepted.

        Calls `HubApi.login` with the token; any exception raised by that call
        is logged and treated as a failed verification.

        :param ms_token `(str)`: ModelScope account token
        :return `bool`: `True` when verification succeeds, otherwise `False`
        """
        from modelscope import HubApi  # local import: only needed by this check
        api = HubApi()
        logger.info("验证 ModelScope Token 中")
        try:
            api.login(ms_token)
        except Exception as e:
            # The message must carry a %s placeholder: passing `e` without one
            # makes logging report a formatting error and hides the cause.
            logger.error("ModelScope Token 验证失败: %s", e)
            return False
        logger.info("ModelScope Token 验证成功")
        return True

    def generate_repo_url(
        self,
        api_type: Literal["huggingface", "modelscope"],
        repo_id: str,
        repo_type: Literal["model", "dataset", "space"],
    ) -> str | None:
        """生成仓库访问地址

        :param api_type`(Literal["huggingface","modelscope"])`: Api 类型
        :param repo_id`(str)`: 仓库 ID
        :param repo_type`(Literal["model","dataset","space"])`: 仓库类型
        :return `str | None`: 仓库访问地址
        """

        if api_type == "huggingface":
            if repo_type == "model":
                return f"https://huggingface.co/{repo_id}"
            if repo_type == "dataset":
                return f"https://huggingface.co/datasets/{repo_id}"
            if repo_type == "space":
                return f"https://huggingface.co/spaces/{repo_id}"
        elif api_type == "modelscope":
            if repo_type == "model":
                return f"https://modelscope.cn/models/{repo_id}"
            if repo_type == "dataset":
                return f"https://modelscope.cn/datasets/{repo_id}"
            if repo_type == "space":
                return f"https://modelscope.cn/studios/{repo_id}"
        else:
            logger.error("未知的 Api 类型: %s", api_type)
            return None

    def sync_repo(
        self,
        src_repo: Literal["huggingface", "modelscope"],
        dst_repo: Literal["huggingface", "modelscope"],
        src_repo_id: str,
        dst_repo_id: str,
        src_repo_type: Literal["model", "dataset", "space"] = "model",
        dst_repo_type: Literal["model", "dataset", "space"] = "model",
        visibility: bool | None = False,
        retry: int | None = 3,
    ) -> None:
        """Mirror a HuggingFace / ModelScope repository into another one.

        Validates both platform names, checks the destination repository via
        `self.repo.check_repo`, copies the files with
        `make_hf_or_ms_repo_mirror`, then logs the source and destination URLs.

        :param src_repo`(Literal["huggingface","modelscope"])`: Source platform
        :param dst_repo`(Literal["huggingface","modelscope"])`: Destination (mirror) platform
        :param src_repo_id`(str)`: Source repository ID
        :param dst_repo_id`(str)`: Destination repository ID
        :param src_repo_type`(str)`: Source repository type
        :param dst_repo_type`(str)`: Destination repository type
        :param visibility`(bool|None)`: Visibility used when the destination repository is created automatically
        :param retry`(int|None)`: Number of upload retries
        """
        supported_hubs = ("huggingface", "modelscope")
        # Source is validated before destination; the first unknown one aborts.
        for hub in (src_repo, dst_repo):
            if hub not in supported_hubs:
                logger.error("未知的镜像仓库类型: %s", hub)
                return

        logger.info("镜像仓库: %s/%s -> %s/%s", src_repo, src_repo_id, dst_repo, dst_repo_id)

        dst_ready = self.repo.check_repo(
            api_type=dst_repo,
            repo_id=dst_repo_id,
            repo_type=dst_repo_type,
            visibility=visibility,
        )
        if not dst_ready:
            logger.error("检查 %s/%s (类型: %s) 仓库失败, 无法镜像仓库", dst_repo, dst_repo_id, dst_repo_type)
            return

        self.make_hf_or_ms_repo_mirror(
            src_repo=src_repo,
            dst_repo=dst_repo,
            src_repo_id=src_repo_id,
            dst_repo_id=dst_repo_id,
            src_repo_type=src_repo_type,
            dst_repo_type=dst_repo_type,
            retry=retry,
        )

        # Report where the data came from and where it now lives.
        logger.info(
            "%s -> %s",
            self.generate_repo_url(
                api_type=src_repo,
                repo_id=src_repo_id,
                repo_type=src_repo_type,
            ),
            self.generate_repo_url(
                api_type=dst_repo,
                repo_id=dst_repo_id,
                repo_type=dst_repo_type,
            ),
        )

    def make_hf_or_ms_repo_mirror(
        self,
        src_repo: Literal["huggingface", "modelscope"],
        dst_repo: Literal["huggingface", "modelscope"],
        src_repo_id: str,
        dst_repo_id: str,
        src_repo_type: Literal["model", "dataset", "space"] = "model",
        dst_repo_type: Literal["model", "dataset", "space"] = "model",
        retry: int | None = 3
    ) -> None:
        """Copy the files missing from the destination repository.

        Files present in the source but absent from the destination are
        downloaded one at a time into a temporary directory under
        `self.workspace`, uploaded to the destination, then deleted locally.
        Each file gets its own attempt budget; a file that keeps failing is
        skipped and reported at the end instead of blocking the whole run.

        :param src_repo`(Literal["huggingface","modelscope"])`: Source platform
        :param dst_repo`(Literal["huggingface","modelscope"])`: Destination (mirror) platform
        :param src_repo_id`(str)`: Source repository ID
        :param dst_repo_id`(str)`: Destination repository ID
        :param src_repo_type`(str)`: Source repository type
        :param dst_repo_type`(str)`: Destination repository type
        :param retry`(int|None)`: Maximum number of attempts per file; `None`
            falls back to 3 and values below 1 are treated as 1
        """
        from tqdm import tqdm
        from modelscope import snapshot_download
        src_repo_files = set(
            self.repo.get_repo_file(
                api_type=src_repo,
                repo_id=src_repo_id,
                repo_type=src_repo_type,
                retry=retry,
            )
        )
        dst_repo_files = set(
            self.repo.get_repo_file(
                api_type=dst_repo,
                repo_id=dst_repo_id,
                repo_type=dst_repo_type,
                retry=retry,
            )
        )
        # Sorted so the processing order (and therefore the log) is
        # deterministic; iterating a set directly is not.
        need_sync_files = sorted(
            x
            for x in tqdm(src_repo_files, desc=f"统计需要镜像到 {dst_repo} 的文件")
            if x not in dst_repo_files
        )
        files_count = len(need_sync_files)
        logger.info("需要镜像的文件数量: %s", files_count)

        # `retry` may be None per the signature; comparing an int with None
        # would raise TypeError, so normalise it once here.
        max_attempts = max(1, retry if retry is not None else 3)
        failed_files = []
        tmp_dir = self.workspace / f"{uuid.uuid4()}"
        try:
            for count, file in enumerate(need_sync_files, start=1):
                logger.info("[%s/%s] 镜像 %s 到 %s (类型: %s) 中", count, files_count, file, dst_repo_id, dst_repo_type)
                # The attempt counter is per file and always advances, so a
                # persistent failure can no longer loop forever.
                for attempt in range(1, max_attempts + 1):
                    try:
                        if src_repo == "huggingface":
                            self.repo.hf_api.hf_hub_download(
                                repo_id=src_repo_id,
                                repo_type=src_repo_type,
                                filename=file,
                                local_dir=tmp_dir,
                            )
                        elif src_repo == "modelscope":
                            snapshot_download(
                                repo_id=src_repo_id,
                                repo_type=src_repo_type,
                                allow_patterns=file,
                                local_dir=tmp_dir,
                            )
                        file_path = tmp_dir / file
                        if dst_repo == "huggingface":
                            self.repo.hf_api.upload_file(
                                path_or_fileobj=file_path,
                                path_in_repo=file,
                                repo_id=dst_repo_id,
                                repo_type=dst_repo_type,
                                commit_message=f"Upload {file}",
                            )
                        elif dst_repo == "modelscope":
                            self.repo.ms_api.upload_file(
                                path_or_fileobj=file_path,
                                path_in_repo=file,
                                repo_id=dst_repo_id,
                                repo_type=dst_repo_type,
                                commit_message=f"Upload {file}",
                                token=self.repo.ms_token,
                            )
                        # Free disk space as soon as the file is mirrored.
                        self.remove_files(file_path)
                        break
                    except Exception as e:
                        traceback.print_exc()
                        logger.error("[%s/%s] 镜像 %s 时发生了错误: %s", count, files_count, file, e)
                        if attempt < max_attempts:
                            logger.warning("重新镜像 %s 中", file)
                else:
                    # Reached only when no attempt hit `break`, i.e. all failed.
                    failed_files.append(file)
                    logger.error("[%s/%s] 镜像 %s 失败, 已达到最大尝试次数: %s", count, files_count, file, max_attempts)
        finally:
            # Always drop the temporary directory, even on an unexpected error.
            if tmp_dir.exists():
                self.remove_files(tmp_dir)

        if failed_files:
            logger.warning("镜像仓库完成, 但有 %s 个文件镜像失败", len(failed_files))
        else:
            logger.info("镜像仓库完成")

    def install(
        self,
        use_uv: bool | None = True,
        huggingface_token: str | None = None,
        modelscope_token: str | None = None,
        clean_install_log: bool | None = False,
    ) -> None:
        """Set up the environment used by the mirror tool.

        Runs `self.mirror.configure_pip()` and
        `self.env.install_manager_depend(use_uv)`, purges the package caches,
        optionally calls `self.utils.clear_up()`, verifies each token that was
        supplied (a failed verification only logs a warning) and finally calls
        `self.restart_repo_manager` with both tokens.

        :param use_uv`(bool|None)`: Forwarded to `self.env.install_manager_depend`
        :param huggingface_token`(str|None)`: HuggingFace token; skipped when `None`
        :param modelscope_token`(str|None)`: ModelScope token; skipped when `None`
        :param clean_install_log`(bool|None)`: When truthy, call `self.utils.clear_up()` after installation
        """
        logger.info("配置镜像制作工具环境中")
        self.mirror.configure_pip()
        self.env.install_manager_depend(use_uv)
        self.clean_package_cache()
        if clean_install_log:
            self.utils.clear_up()

        # (token, verifier, warning shown when the verifier rejects the token)
        token_checks = (
            (huggingface_token, self.verify_huggingface_token, "请检查 HuggingFace Token 是否可用"),
            (modelscope_token, self.verify_modelscope_token, "请检查 ModelScope Token 是否可用"),
        )
        for token, verify, hint in token_checks:
            if token is None:
                continue
            if not verify(token):
                logger.warning(hint)

        self.restart_repo_manager(
            hf_token=huggingface_token,
            ms_token=modelscope_token,
        )
        logger.info("配置镜像制作工具环境完成")

###############################################
# Colab form parameters. The `# @markdown` / `# @param` annotations below are
# form directives shown in the notebook UI, so their text is kept verbatim.

# @markdown #### HuggingFace Token, 可在 Account -> Settings -> Access Tokens 中获取
HF_TOKEN = ""  # @param {type:"string"}
# @markdown #### ModelScope Token, 可在 首页 -> 访问令牌 -> SDK 令牌 中获取
MS_TOKEN = ""  # @param {type:"string"}
# @markdown #### 工作区路径
WORKSPACE = "/content"  # @param {type:"string"}
# @markdown #### 清理安装环境日志
CLEAR_LOG = True #@param {type:"boolean"}

###############################################
# Switch into the workspace, create the manager and run the environment setup.
# Empty token strings are passed as None so their verification is skipped.
os.chdir(WORKSPACE)
mirror_maker = RepoMirrorTools(WORKSPACE, "__MIRROR_MAKER__")
mirror_maker.install(
    use_uv=True,
    huggingface_token=HF_TOKEN or None,
    modelscope_token=MS_TOKEN or None,
    clean_install_log=CLEAR_LOG,
)
# Sentinel that the other cells look up to confirm this cell has been run.
INSTALL_DONE = True

In [None]:
#@title 👇 上传文件夹到 ModelScope
# Cell: upload a local folder to a ModelScope repository.
# (`#@title` / `#@markdown` / `#@param` are Colab form directives; their text is kept verbatim.)
# Guard: looking up INSTALL_DONE fails when the setup cell has not been run.
try:
    INSTALL_DONE
except Exception as _:
    raise Exception("未进行环境配置单元进行环境初始化, 请运行后再试")

#@markdown #### ModelScope 仓库 ID
repo_id = "" #@param {type:"string"}
#@markdown #### ModelScope 仓库类型
repo_type = "model" #@param ["model", "dataset", "space"]
#@markdown #### 当仓库不存在时自动创建的仓库的可见性
visibility = True #@param {type:"boolean"}
#@markdown #### 上传的文件夹
upload_path = "/content/model" #@param {type:"string"}

# Hand the form values to the manager's repository helper.
mirror_maker.repo.upload_files_to_repo(
    api_type="modelscope", # API type
    repo_id=repo_id, # ModelScope repository ID
    repo_type=repo_type, # ModelScope repository type
    visibility=visibility, # Visibility of an automatically created repository
    upload_path=upload_path # Folder to upload
)

In [None]:
#@title 👇 上传文件夹到 HuggingFace
# Cell: upload a local folder to a HuggingFace repository.
# (`#@title` / `#@markdown` / `#@param` are Colab form directives; their text is kept verbatim.)
# Guard: looking up INSTALL_DONE fails when the setup cell has not been run.
try:
    INSTALL_DONE
except Exception as _:
    raise Exception("未进行环境配置单元进行环境初始化, 请运行后再试")

#@markdown #### HuggingFace 仓库 ID
repo_id = "" #@param {type:"string"}
#@markdown #### HuggingFace 仓库类型
repo_type = "model" #@param ["model", "dataset", "space"]
#@markdown #### 当仓库不存在时自动创建的仓库的可见性
visibility = True #@param {type:"boolean"}
#@markdown #### 上传的文件夹
upload_path = "/content/model" #@param {type:"string"}

# Hand the form values to the manager's repository helper.
mirror_maker.repo.upload_files_to_repo(
    api_type="huggingface", # API type
    repo_id=repo_id, # HuggingFace repository ID
    repo_type=repo_type, # HuggingFace repository type
    visibility=visibility, # Visibility of an automatically created repository
    upload_path=upload_path # Directory whose files are uploaded
)

In [None]:
#@title 👇 镜像 HuggingFace / ModelScope 仓库
# Cell: mirror a repository between HuggingFace and ModelScope.
# (`#@title` / `#@markdown` / `#@param` are Colab form directives; their text is kept verbatim.)
# Guard: looking up INSTALL_DONE fails when the setup cell has not been run.
try:
    INSTALL_DONE
except Exception as _:
    raise Exception("未进行环境配置单元进行环境初始化, 请运行后再试")

#@markdown #### 源仓库路径
src_repo = "huggingface" #@param ["huggingface", "modelscope"]
#@markdown #### 镜像仓库路径
dst_repo = "modelscope" #@param ["huggingface", "modelscope"]
#@markdown #### 源仓库 ID
src_repo_id = "" #@param {type:"string"}
#@markdown #### 源仓库类型
src_repo_type = "model" #@param ["model", "dataset", "space"]
#@markdown #### 镜像仓库 ID
dst_repo_id = "" #@param {type:"string"}
#@markdown #### 镜像仓库类型
dst_repo_type = "model" #@param ["model", "dataset", "space"]
#@markdown #### 当仓库不存在时自动创建的仓库的可見性
visibility = True #@param {type:"boolean"}

# Empty repository IDs are forwarded as None.
# NOTE(review): sync_repo annotates both IDs as `str` and does not check for
# None itself — confirm how the downstream calls treat a missing ID.
mirror_maker.sync_repo(
    src_repo=src_repo,
    dst_repo=dst_repo,
    src_repo_id=src_repo_id or None,
    dst_repo_id=dst_repo_id or None,
    src_repo_type=src_repo_type,
    dst_repo_type=dst_repo_type,
    visibility=visibility,
)

In [None]:
#@title 👇 从 HuggingFace 下载仓库中的单个文件
# Cell: download one file from a HuggingFace repository into a local directory.
# (`#@title` / `#@markdown` / `#@param` are Colab form directives; their text is kept verbatim.)
# Guard: looking up INSTALL_DONE fails when the setup cell has not been run.
try:
    INSTALL_DONE
except Exception as _:
    raise Exception("未进行环境配置单元进行环境初始化, 请运行后再试")

#@markdown #### HuggingFace 仓库 ID
repo_id = "" #@param {type:"string"}
#@markdown #### HuggingFace 仓库类型
repo_type = "model" #@param ["model", "dataset", "space"]
#@markdown #### 文件在仓库中的路径
filename = "" #@param {type:"string"}
#@markdown #### 下载到本地的路径
local_dir = "/content/model" #@param {type:"string"}
# Paths under local_dir that are deleted once the download has finished
# (presumably bookkeeping folders left behind by the download — confirm).
cache_dir_1 = os.path.join(local_dir, ".cache")
cache_dir_2 = os.path.join(local_dir, ".huggingface")

mirror_maker.repo.hf_api.hf_hub_download(
    repo_id=repo_id,
    filename=filename,
    repo_type=repo_type,
    local_dir=local_dir,
)
mirror_maker.remove_files(cache_dir_1)
mirror_maker.remove_files(cache_dir_2)

In [None]:
#@title 👇 从 HuggingFace 下载整个仓库
# Cell: download a whole HuggingFace repository into a local directory.
# (`#@title` / `#@markdown` / `#@param` are Colab form directives; their text is kept verbatim.)
# Guard: looking up INSTALL_DONE fails when the setup cell has not been run.
try:
    INSTALL_DONE
except Exception as _:
    raise Exception("未进行环境配置单元进行环境初始化, 请运行后再试")

#@markdown #### HuggingFace 仓库 ID
repo_id = "" #@param {type:"string"}
#@markdown #### HuggingFace 仓库类型
repo_type = "model" #@param ["model", "dataset", "space"]
#@markdown #### 下载到本地的路径
local_dir = "/content" #@param {type:"string"}
# Paths under local_dir that are deleted once the download has finished
# (presumably bookkeeping folders left behind by the download — confirm).
cache_dir_1 = os.path.join(local_dir, ".cache")
cache_dir_2 = os.path.join(local_dir, ".huggingface")

mirror_maker.repo.hf_api.snapshot_download(
    repo_id=repo_id,
    repo_type=repo_type,
    local_dir=local_dir,
)
mirror_maker.remove_files(cache_dir_1)
mirror_maker.remove_files(cache_dir_2)

In [None]:
#@title 👇 从 ModelScope 下载仓库中的单个文件
# Cell: download one file from a ModelScope repository into a local directory.
# (`#@title` / `#@markdown` / `#@param` are Colab form directives; their text is kept verbatim.)
# Guard: looking up INSTALL_DONE fails when the setup cell has not been run.
try:
    INSTALL_DONE
except Exception as _:
    raise Exception("未进行环境配置单元进行环境初始化, 请运行后再试")

#@markdown #### ModelScope 仓库 ID
repo_id = "" #@param {type:"string"}
#@markdown #### ModelScope 仓库类型
repo_type = "model" #@param ["model", "dataset", "space"]
#@markdown #### 文件在仓库中的路径
filename = "" #@param {type:"string"}
#@markdown #### 下载到本地的路径
local_dir = "/content/model" #@param {type:"string"}
# Paths under local_dir that are deleted once the download has finished
# (presumably temporary/bookkeeping folders left by the download — confirm).
cache_dir_1 = os.path.join(local_dir, "._____temp")
cache_dir_2 = os.path.join(local_dir, ".msc")
cache_dir_3 = os.path.join(local_dir, ".mv")

# The download is restricted to the requested file through allow_patterns.
from modelscope import snapshot_download
snapshot_download(
    repo_id=repo_id,
    allow_patterns=filename,
    repo_type=repo_type,
    local_dir=local_dir,
)
mirror_maker.remove_files(cache_dir_1)
mirror_maker.remove_files(cache_dir_2)
mirror_maker.remove_files(cache_dir_3)

In [None]:
#@title 👇 从 ModelScope 下载整个仓库
# Cell: download a whole ModelScope repository into a local directory.
# (`#@title` / `#@markdown` / `#@param` are Colab form directives; their text is kept verbatim.)
# Guard: looking up INSTALL_DONE fails when the setup cell has not been run.
try:
    INSTALL_DONE
except Exception as _:
    raise Exception("未进行环境配置单元进行环境初始化, 请运行后再试")

#@markdown #### ModelScope 仓库 ID
repo_id = "" #@param {type:"string"}
#@markdown #### ModelScope 仓库类型
repo_type = "model" #@param ["model", "dataset", "space"]
#@markdown #### 下载到本地的路径
local_dir = "/content/model" #@param {type:"string"}
# Paths under local_dir that are deleted once the download has finished
# (presumably temporary/bookkeeping folders left by the download — confirm).
cache_dir_1 = os.path.join(local_dir, "._____temp")
cache_dir_2 = os.path.join(local_dir, ".msc")
cache_dir_3 = os.path.join(local_dir, ".mv")

from modelscope import snapshot_download
snapshot_download(
    repo_id=repo_id,
    repo_type=repo_type,
    local_dir=local_dir,
)
mirror_maker.remove_files(cache_dir_1)
mirror_maker.remove_files(cache_dir_2)
mirror_maker.remove_files(cache_dir_3)

In [None]:
#@title 👇 下载文件
# Cell: download a file from a URL with the selected download tool.
# (`#@title` / `#@markdown` / `#@param` are Colab form directives; their text is kept verbatim.)
# Guard: looking up INSTALL_DONE fails when the setup cell has not been run.
try:
    INSTALL_DONE
except Exception as _:
    raise Exception("未进行环境配置单元进行环境初始化, 请运行后再试")

#@markdown #### 下载链接
url = "" #@param {type:"string"}
#@markdown #### 下载到本地的路径
path = "/content" #@param {type:"string"}
#@markdown #### 保存的文件名
filename = "" #@param {type:"string"}
#@markdown #### 下载工具
download_tool = "aria2" #@param ["aria2", "request"]

# An empty file name is forwarded as None instead of "".
mirror_maker.downloader.download_file(
    url=url,
    path=path,
    save_name=filename or None,
    tool=download_tool,
)

In [None]:
#@title 👇 清理 Python 软件包
# Cell: uninstall a fixed list of Python packages from the runtime.
# (`#@title` is a Colab form directive; its text is kept verbatim.)
# Guard: looking up INSTALL_DONE fails when the setup cell has not been run.
try:
    INSTALL_DONE
except Exception as _:
    raise Exception("未进行环境配置单元进行环境初始化, 请运行后再试")
logger.info("清理 Python 软件包中")
# IPython shell escape (`!`): runs `uv pip uninstall` on the packages listed
# below; the trailing backslashes continue the single shell command.
!uv pip uninstall \
    tensorboard tensorboard-data-server tensorflow tensorflow-datasets tensorflow-estimator tensorflow-gcs-config tensorflow-hub tensorflow-io-gcs-filesystem tensorflow-metadata tensorflow-probability tensorstore \
    torch torchaudio torchsummary torchtext torchvision triton \
    opencv-python-headless opencv-python opencv-contrib-python \
    Sphinx sphinxcontrib-applehelp sphinxcontrib-devhelp sphinxcontrib-htmlhelp sphinxcontrib-jsmath sphinxcontrib-qthelp sphinxcontrib-serializinghtml
logger.info("Python 软件包清理完成")