In [1]:
import argparse
import concurrent.futures as futures
import hashlib
import os
import re
import shutil
import sys
import threading
import time
from datetime import datetime
from pathlib import Path
from typing import List, Tuple, Optional, Union

In [2]:
try:
    from PIL import Image  # type: ignore
    PIL_OK = True
except Exception:
    PIL_OK = False

try:
    import piexif  # type: ignore
    PIEXIF_OK = True
except Exception:
    PIEXIF_OK = False

# Patterns and defaults
SM_DIR_GLOB = re.compile(r"^SM_[A-Za-z0-9]+$", re.IGNORECASE)

# Locks
progress_lock = threading.Lock()
dest_lock = threading.Lock()

In [3]:
def ts() -> str:
    return f"[{datetime.now().strftime('%Y%m%d %H:%M:%S')}]"

def safe_makedirs(path: str) -> None:
    os.makedirs(path, exist_ok=True)

def is_wanted_ext(path: str, allowed_exts: set[str]) -> bool:
    return os.path.splitext(path)[1].lower() in allowed_exts

def parse_exif_to_mmddyyyy(raw: Union[str, bytes, None]) -> Optional[str]:
    """
    Convert EXIF-like date strings to 'MM/DD/YYYY'.
    Accepts 'YYYY:MM:DD ...' or 'YYYY-MM-DD ...' or 'YYYY/MM/DD ...' or date-only.
    """
    if not raw:
        return None
    if isinstance(raw, bytes):
        raw = raw.decode("utf-8", "ignore")
    raw = raw.strip()
    if not raw:
        return None
    date_part = raw.split()[0].replace(":", "-").replace("/", "-")
    try:
        dt = datetime.strptime(date_part, "%Y-%m-%d")
        return dt.strftime("%m/%d/%Y")
    except Exception:
        return None

In [4]:
def extract_capture_date_mmddyyyy(path: str) -> Tuple[Optional[str], str]:
    """
    Return (date_str 'MM/DD/YYYY', source_label) with priority:
      1) EXIF DateTimeOriginal (36867)
      2) EXIF Modify Date / DateTime (306)
      3) EXIF DateTimeDigitized (36868)
      4) FS mtime
    """
    # Pillow route
    if PIL_OK:
        try:
            with Image.open(path) as im:
                exif = im.getexif()
                if exif:
                    dto = exif.get(36867)  # DateTimeOriginal
                    d = parse_exif_to_mmddyyyy(dto)
                    if d:
                        return d, "EXIF:DateTimeOriginal"

                    dt = exif.get(306)  # Modify Date / DateTime
                    d = parse_exif_to_mmddyyyy(dt)
                    if d:
                        return d, "EXIF:ModifyDate"

                    dtd = exif.get(36868)  # DateTimeDigitized
                    d = parse_exif_to_mmddyyyy(dtd)
                    if d:
                        return d, "EXIF:DateTimeDigitized"
        except Exception:
            pass

    # piexif route
    if PIEXIF_OK:
        try:
            exif_dict = piexif.load(path)
            dto = exif_dict.get("Exif", {}).get(piexif.ExifIFD.DateTimeOriginal)
            d = parse_exif_to_mmddyyyy(dto)
            if d:
                return d, "piexif:DateTimeOriginal"

            dt = exif_dict.get("0th", {}).get(piexif.ImageIFD.DateTime)
            d = parse_exif_to_mmddyyyy(dt)
            if d:
                return d, "piexif:DateTime"

            dtd = exif_dict.get("Exif", {}).get(piexif.ExifIFD.DateTimeDigitized)
            d = parse_exif_to_mmddyyyy(dtd)
            if d:
                return d, "piexif:DateTimeDigitized"
        except Exception:
            pass

    # Filesystem mtime fallback
    try:
        mtime = os.path.getmtime(path)
        dt = datetime.fromtimestamp(mtime)
        return dt.strftime("%m/%d/%Y"), "FS:mtime"
    except Exception:
        return None, "unknown"

In [5]:
def safe_capture_time_str(path: str, date_str: str) -> str:
    """
    Build 'YYYYMMDD_HHMMSS' using EXIF time if possible, else FS mtime.
    date_str is 'MM/DD/YYYY'.
    """
    mm, dd, yyyy = date_str.split("/")
    yyyymmdd = f"{yyyy}{mm}{dd}"
    hhmmss = ""
    # Try EXIF time
    try:
        if PIL_OK:
            with Image.open(path) as im:
                exif = im.getexif()
                if exif:
                    for tag in (36867, 306, 36868):
                        raw = exif.get(tag)
                        if raw:
                            if isinstance(raw, bytes):
                                raw = raw.decode("utf-8", "ignore")
                            parts = raw.strip().split()
                            if len(parts) > 1:
                                t = parts[1].replace(":", "").replace("-", "").replace("/", "")
                                hhmmss = re.sub(r"[^0-9]", "", t)[:6]  # HHMMSS
                                break
    except Exception:
        pass
    # Fallback to FS mtime
    if not hhmmss:
        try:
            mtime = os.path.getmtime(path)
            hhmmss = datetime.fromtimestamp(mtime).strftime("%H%M%S")
        except Exception:
            hhmmss = "000000"
    return f"{yyyymmdd}_{hhmmss}"

In [6]:
def short_hash_from_meta(path: str, n: int = 6) -> str:
    """
    Lightweight short hash using size + mtime + path for disambiguation.
    (Faster than content hashing; still unique enough for collisions.)
    """
    try:
        size = os.path.getsize(path)
        mtime = os.path.getmtime(path)
    except Exception:
        size, mtime = 0, 0
    h = hashlib.sha1(f"{path}|{size}|{mtime}".encode("utf-8")).hexdigest()
    return h[:n]

def sha1_of_file(path: str, blocksize: int = 1 << 20) -> str:
    h = hashlib.sha1()
    with open(path, "rb") as f:
        while True:
            b = f.read(blocksize)
            if not b:
                break
            h.update(b)
    return h.hexdigest()

In [7]:
def build_dest_filename(src: str, date_str: str, mode: str = "rich") -> str:
    """
    Build destination filename.
    - 'rich' (default): <orig_stem>__<YYYYMMDD_HHMMSS>__<hash><ext>
    - 'simple': original filename
    """
    p = Path(src)
    if mode == "simple":
        return p.name
    t = safe_capture_time_str(src, date_str)
    h = short_hash_from_meta(src)
    return f"{p.stem}__{t}__{h}{p.suffix}"

In [8]:
def unique_dest_path(dest_path: str) -> str:
    """
    If dest_path exists, append _1, _2, ... to basename until free.
    """
    if not os.path.exists(dest_path):
        return dest_path
    root, ext = os.path.splitext(dest_path)
    i = 1
    while True:
        candidate = f"{root}_{i}{ext}"
        if not os.path.exists(candidate):
            return candidate
        i += 1

In [9]:
def discover_jobs(in_dir: str, allowed_exts: set[str], only_sm: bool, excludes: set[str]) -> List[Tuple[str, str]]:
    """
    Returns list of (abs_path, sm_bucket) for files with allowed_exts.
    - If only_sm=True, include files only if they are under an ancestor folder matching SM_*.
    - sm_bucket is the nearest ancestor dir matching SM_*; else 'SM_Unknown'.
    - Skips any path that includes a directory name in `excludes`.
    """
    jobs: List[Tuple[str, str]] = []
    in_dir_abs = os.path.abspath(in_dir)

    for root, dirs, files in os.walk(in_dir_abs):
        rel = os.path.relpath(root, in_dir_abs)
        parts = [] if rel == "." else rel.split(os.sep)

        # Exclusion filter (case-sensitive by name; adjust if you need case-insensitive)
        if any(part in excludes for part in parts):
            continue

        sm_top = None
        for p in parts:
            if SM_DIR_GLOB.match(p):
                sm_top = p
                break

        if only_sm and not sm_top:
            continue

        for fn in files:
            if is_wanted_ext(fn, allowed_exts):
                jobs.append((os.path.join(root, fn), sm_top or "SM_Unknown"))
    return jobs

In [10]:
def main():
    parser = argparse.ArgumentParser(
        description="Copy/move images into output/SM_BUCKET/MM-DD-YYYY/ using EXIF date (Original→Modify→Digitized→mtime)."
    )
    parser.add_argument("--inDir", required=True, help="Input root (e.g., '.' if you're already inside 'Camera Trap Photos')")
    parser.add_argument("--outDir", required=True, help="Output root")
    parser.add_argument("--workers", type=int, default=2, help="Max worker threads (1-2 recommended). Default: 2")
    parser.add_argument("--dry-run", action="store_true", help="Show actions without copying/moving")
    parser.add_argument("--move", action="store_true", help="Move files instead of copying")
    parser.add_argument("--only-sm", action="store_true", help="Restrict to files under folders named like SM_*")
    parser.add_argument("--exts", default="jpg,jpeg", help="Comma-separated list of extensions (default: jpg,jpeg)")
    parser.add_argument("--exclude", action="append", default=[], help="Directory name to exclude (can be repeated)")
    parser.add_argument("--list-sample", type=int, default=10, help="How many sample files to print before processing")
    parser.add_argument("--rename", choices=["rich", "simple"], default="rich",
                        help="Filename mode: 'rich' = add timestamp+hash (default), 'simple' = keep original")
    parser.add_argument("--dedupe", action="store_true",
                        help="Skip copying if an identical file already exists in the target date folder (content SHA-1; slower)")
    parser.add_argument("--quiet", action="store_true", help="Reduce per-file logging")
    args = parser.parse_args()

    workers = max(1, min(8, args.workers))  # allow a bit more if desired
    in_dir = args.inDir
    out_dir = args.outDir
    rename_mode = args.rename
    do_move = args.move
    do_dedupe = args.dedupe
    quiet = args.quiet

    # Normalize extensions
    exts = {("." + e.strip().lower().lstrip(".")) for e in args.exts.split(",") if e.strip()}
    excludes = set(args.exclude or [])

    start = time.perf_counter()

    jobs = discover_jobs(in_dir, exts, args.only_sm, excludes)
    total = len(jobs)

    print(f"{ts()} Scan summary:")
    print(f"{ts()}   inDir: {os.path.abspath(in_dir)}")
    print(f"{ts()}   outDir: {os.path.abspath(out_dir)}")
    print(f"{ts()}   only-sm: {args.only_sm}")
    print(f"{ts()}   allowed extensions: {sorted(exts)}")
    if excludes:
        print(f"{ts()}   excludes: {sorted(excludes)}")
    print(f"{ts()}   found files: {total}")

    for i, (p, sm) in enumerate(jobs[:max(0, args.list_sample)]):
        print(f"{ts()}   sample[{i+1}]: {p} (bucket: {sm})")

    if total == 0:
        print(f"{ts()} No files found matching extensions under the given rules.")
        return

    done = 0

    def process_one(job: Tuple[str, str]) -> None:
        nonlocal done
        src, sm_top = job

        if not quiet:
            print(f"{ts()} found: {src}")

        date_str, source = extract_capture_date_mmddyyyy(src)
        if not date_str:
            print(f"{ts()} WARNING: could not extract date, skipping: {src}")
            with progress_lock:
                done += 1
                pct = int(done * 100 / total)
                print(f"{ts()} progress: {done}/{total} ({pct}%)")
            return

        if not quiet:
            print(f"{ts()} metadata: {date_str} ({source})")

        # Build destination directory: outDir/SM_BUCKET/MM-DD-YYYY/
        mm, dd, yyyy = date_str.split("/")
        folder_name = f"{mm}-{dd}-{yyyy}"
        year_dir = os.path.join(out_dir, sm_top, folder_name)
        safe_makedirs(year_dir)

        # Build destination filename
        base_name = build_dest_filename(src, date_str, mode=rename_mode)
        dest = os.path.join(year_dir, base_name)

        # Optional content dedupe (skip if identical content already present in date folder)
        if do_dedupe and not args.dry_run:
            try:
                src_hash = sha1_of_file(src)
                # Quick scan of target folder (bounded to same date folder)
                try:
                    for cand in os.listdir(year_dir):
                        cand_path = os.path.join(year_dir, cand)
                        if os.path.isfile(cand_path):
                            try:
                                if sha1_of_file(cand_path) == src_hash:
                                    if not quiet:
                                        print(f"{ts()} duplicate detected (content match), skipping: {src}")
                                    with progress_lock:
                                        done += 1
                                        pct = int(done * 100 / total)
                                        print(f"{ts()} progress: {done}/{total} ({pct}%)")
                                    return
                            except Exception:
                                pass
                except FileNotFoundError:
                    pass
            except Exception:
                # If hashing fails for any reason, proceed without dedupe
                pass

        # Reserve a unique path and perform copy/move under a lock to avoid races
        with dest_lock:
            dest_final = unique_dest_path(dest)
            if not quiet:
                verb = "moving" if do_move else "copying"
                print(f"{ts()} {verb} to: {dest_final}")
            if not args.dry_run:
                try:
                    if do_move:
                        shutil.move(src, dest_final)
                    else:
                        shutil.copy2(src, dest_final)
                except Exception as e:
                    print(f"{ts()} ERROR {'moving' if do_move else 'copying'} '{src}' -> '{dest_final}': {e}")

        with progress_lock:
            done += 1
            pct = int(done * 100 / total)
            if not quiet or done % 100 == 0 or done == total:
                print(f"{ts()} progress: {done}/{total} ({pct}%)")

    print(f"{ts()} Starting with {workers} worker(s).")
    with futures.ThreadPoolExecutor(max_workers=workers) as ex:
        futs = [ex.submit(process_one, job) for job in jobs]
        for f in futures.as_completed(futs):
            exc = f.exception()
            if exc:
                print(f"{ts()} ERROR: {exc}", file=sys.stderr)

    elapsed = time.perf_counter() - start
    hrs = int(elapsed // 3600)
    mins = int((elapsed % 3600) // 60)
    secs = int(elapsed % 60)
    print(f"{ts()} DONE. Total time spent: {hrs:02d}:{mins:02d}:{secs:02d}")

In [11]:
# For move
import sys
sys.argv = [
    "Images_directories_correction.ipynb",
    "--inDir", ".",
    "--outDir", "output",
    "--workers", "2",
    "--move",
    "--rename", "rich",
    "--exts", "jpg,jpeg",
    "--quiet"
]
main()

[20251002 11:46:08] Scan summary:
[20251002 11:46:08]   inDir: /storage/ice1/8/0/wyiu31/stonemt_cameratrap/Camera Trap Photos
[20251002 11:46:08]   outDir: /storage/ice1/8/0/wyiu31/stonemt_cameratrap/Camera Trap Photos/output
[20251002 11:46:08]   only-sm: False
[20251002 11:46:08]   allowed extensions: ['.jpeg', '.jpg']
[20251002 11:46:08]   found files: 71430
[20251002 11:46:08]   sample[1]: /storage/ice1/8/0/wyiu31/stonemt_cameratrap/Camera Trap Photos/SM_2/Back-ups/IMG_4323.JPG (bucket: SM_2)
[20251002 11:46:08]   sample[2]: /storage/ice1/8/0/wyiu31/stonemt_cameratrap/Camera Trap Photos/SM_2/Back-ups/IMG_2249.JPG (bucket: SM_2)
[20251002 11:46:08]   sample[3]: /storage/ice1/8/0/wyiu31/stonemt_cameratrap/Camera Trap Photos/SM_2/Back-ups/IMG_8048.JPG (bucket: SM_2)
[20251002 11:46:08]   sample[4]: /storage/ice1/8/0/wyiu31/stonemt_cameratrap/Camera Trap Photos/SM_2/Back-ups/IMG_3529.JPG (bucket: SM_2)
[20251002 11:46:08]   sample[5]: /storage/ice1/8/0/wyiu31/stonemt_cameratrap/Camera 