In [2]:
import os
import shutil
import zipfile
import tarfile
from pathlib import Path
from tqdm.auto import tqdm

# Destination root: per-participant output is written under d/<Pxx>/rgb_frames.
d = Path(r"F:/EPIC-KITCHEN-100")
# Source root: the directory that is scanned for participant folders (P*).
f = Path(r"F:/2g1n6qdydwa9u22shpxqzp0t8m")

# Echo both locations so the active configuration is visible in the cell output.
print("SRC:", f)
print("DST:", d)

SRC: F:\2g1n6qdydwa9u22shpxqzp0t8m
DST: F:\EPIC-KITCHEN-100


In [4]:
def is_archive(p: Path) -> bool:
    """Return True when the file name of ``p`` ends with a known archive suffix.

    The comparison is case-insensitive and only looks at the name; the file
    is never opened. Recognised suffixes: .zip, .tar, .tar.gz, .tgz.
    """
    known_suffixes = (".zip", ".tar", ".tar.gz", ".tgz")
    return p.name.lower().endswith(known_suffixes)

def archive_type(p: Path) -> str:
    """Classify ``p`` by its file-name suffix (case-insensitive).

    Returns "zip", "tar", "targz" (for .tar.gz / .tgz) or "unknown" when no
    suffix matches. Only the name is inspected, not the file contents.
    """
    lowered = p.name.lower()
    suffix_table = (
        ("zip", (".zip",)),
        ("tar", (".tar",)),
        ("targz", (".tar.gz", ".tgz")),
    )
    for label, suffixes in suffix_table:
        if lowered.endswith(suffixes):
            return label
    return "unknown"

def top_level_has_single_folder(paths):
    """Tell whether every archive member lives under one top-level folder.

    paths: iterable of member paths (posix style, "/" separated).
    returns (has_single_folder, folder_name); (False, None) when the members
    do not share a single top-level folder or when the input is empty.

    Empty segments and "." segments are dropped before the check, so names
    such as "./frame_1.jpg" (typical for archives created from inside a
    directory) are treated as root-level files instead of being reported as
    a folder called ".".
    """
    clean = []
    for raw in paths:
        if not raw:
            continue
        # Normalise: "./a//b/" -> "a/b"; a bare "." or "/" entry becomes "".
        normalised = "/".join(
            seg for seg in raw.split("/") if seg and seg != "."
        )
        if normalised:
            clean.append(normalised)
    if not clean:
        return False, None

    tops = {member.split("/", 1)[0] for member in clean}
    if len(tops) != 1:
        return False, None

    only = next(iter(tops))
    # A single top-level *file* also yields one "top"; require at least one
    # member nested below it before calling it a folder.
    prefix = only + "/"
    if any(member.startswith(prefix) for member in clean):
        return True, only
    return False, None

def safe_mkdir(p: Path):
    """Create directory ``p`` together with any missing parents.

    Does nothing if the directory already exists.
    """
    p.mkdir(exist_ok=True, parents=True)

def extract_zip(zip_path: Path, dst_dir: Path):
    """Extract ``zip_path`` into ``dst_dir``.

    When the archive's members are not all wrapped in one top-level folder,
    a sub-directory named after the archive (its stem) is created under
    ``dst_dir`` and used as the extraction target, so loose files from
    different archives do not end up in the same directory.
    """
    with zipfile.ZipFile(zip_path, "r") as archive:
        wrapped, _ = top_level_has_single_folder(archive.namelist())

        if wrapped:
            target_dir = dst_dir
        else:
            # No top-level folder => use the archive name as an index directory.
            target_dir = dst_dir / zip_path.stem
            safe_mkdir(target_dir)

        archive.extractall(target_dir)

def extract_tar(tar_path: Path, dst_dir: Path):
    """Extract a .tar / .tar.gz / .tgz archive into ``dst_dir``.

    When the archive's members are not all wrapped in one top-level folder,
    a sub-directory named after the archive (file name minus its archive
    suffix) is created under ``dst_dir`` and used as the extraction target.

    Extraction uses tarfile's "data" filter when the running Python provides
    it, so members that would be written outside the target directory are
    refused instead of being extracted. On interpreters without extraction
    filters the previous unfiltered behaviour is kept.

    Raises whatever ``tarfile`` raises for unreadable or rejected archives.
    """
    gz_compressed = tar_path.name.lower().endswith((".tar.gz", ".tgz"))
    mode = "r:gz" if gz_compressed else "r:"
    with tarfile.open(tar_path, mode) as tf:
        has_folder, _ = top_level_has_single_folder(tf.getnames())

        final_dst = dst_dir
        if not has_folder:
            # Path.stem of "x.tar.gz" would be "x.tar", so strip the full
            # archive suffix by hand.
            stem = tar_path.name
            for suf in (".tar.gz", ".tgz", ".tar"):
                if stem.lower().endswith(suf):
                    stem = stem[: -len(suf)]
                    break
            final_dst = dst_dir / stem
            safe_mkdir(final_dst)

        # Extraction filters are not available on every Python build;
        # checking for tarfile.data_filter is the documented feature test.
        if hasattr(tarfile, "data_filter"):
            tf.extractall(final_dst, filter="data")
        else:
            tf.extractall(final_dst)

def copy_any(src: Path, dst: Path):
    """Copy ``src`` (file or directory) into the directory ``dst``.

    ``dst`` is created if missing. The result always lands at
    ``dst / src.name``:
    - file: copied with ``shutil.copy2``;
    - directory that does not exist at the target yet: ``shutil.copytree``;
    - directory that already exists at the target: merged by copying each
      child of ``src`` into it recursively.
    """
    source_is_dir = src.is_dir()
    safe_mkdir(dst)
    target = dst / src.name

    if not source_is_dir:
        shutil.copy2(src, target)
        return

    if not target.exists():
        shutil.copytree(src, target)
        return

    # Target folder already present: merge child by child.
    for child in src.iterdir():
        copy_any(child, target)

In [None]:
# Collect every participant folder (P*) under the source root, skipping P01.
participants = sorted(
    p
    for p in f.iterdir()
    if p.is_dir() and p.name.startswith("P") and p.name != "P01"
)

print("Will process:", [p.name for p in participants])

for pdir in tqdm(participants, desc="Participants"):
    src_rgb = pdir / "rgb_frames"
    if not src_rgb.exists():
        # Some participants may have no rgb_frames folder.
        continue

    dst_rgb = d / pdir.name / "rgb_frames"
    safe_mkdir(dst_rgb)

    items = sorted(src_rgb.iterdir())
    for item in tqdm(items, desc=f"{pdir.name}/rgb_frames", leave=False):
        try:
            if item.is_dir():
                # Plain folder: copy it as dst_rgb/<foldername>, keeping its name.
                copy_any(item, dst_rgb)
            elif item.is_file() and is_archive(item):
                t = archive_type(item)
                if t == "zip":
                    extract_zip(item, dst_rgb)
                elif t in ("tar", "targz"):
                    extract_tar(item, dst_rgb)
                else:
                    # Unrecognised archive format: copy the file as-is.
                    shutil.copy2(item, dst_rgb / item.name)
            else:
                # Ordinary file: copy it over unchanged.
                shutil.copy2(item, dst_rgb / item.name)

        except Exception as e:
            # Best effort: report the failing item and continue with the rest.
            print(f"[ERROR] {item} -> {e}")

print("Done.")

In [None]:
# Spot-check one participant's output folder.
check_p = "P02"
check_dir = d / check_p / "rgb_frames"
print("DST exists:", check_dir.exists())
# List the first few files/folders for a quick look.
print(list(check_dir.iterdir())[:10])