Resilient CMIP6 WorldClim 2.5m Downloader — Threaded, retry-enabled, resumable, log-enhanced, and checkpoint-enabled.

'ACCESS-CM2', 'MPI-ESM1-2-HR', 'CMCC-ESM2', 'MIROC6', 'IPSL-CM6A-LR', 'BCC-CSM2-MR', 'EC-Earth3-Veg', 'GISS-E2-1-G', 'HadGEM3-GC31-LL'

In [None]:
# ============================================================
# 📦 DEPENDENCIES
# ============================================================

import os
import sys
import time
import signal
import json
import logging
import threading
import requests
from typing import List, Tuple
from dataclasses import dataclass
from concurrent.futures import ThreadPoolExecutor, as_completed

# ============================================================
# ⚙️ CONFIGURATION OR USER SETTINGS (EDITABLE OPTIONS)
# ============================================================

@dataclass
class DownloadSettings:
    """User-editable options describing what to download and how.

    ``create_download_list`` combines every model, SSP, year range and
    variable listed here (full cross product) into the set of target files.
    """
    # Directory the downloaded files are written to.
    folder: str
    # Chunk size handed to ``response.iter_content`` while streaming a file.
    chunk_size: int
    # Number of worker threads given to the ThreadPoolExecutor.
    max_workers: int
    # Model names; each becomes a URL path segment and part of the file name.
    models: List[str]
    # Scenario identifiers; each becomes a URL path segment and part of the file name.
    ssps: List[str]
    # Year-range strings embedded in the file name.
    year_ranges: List[str]
    # Variable names embedded in the file name.
    variables: List[str]

# Active configuration used by main(); edit these values to change the run.
SETTINGS = DownloadSettings(
    # Output directory; main() creates it if it does not exist.
    folder='E:/Elbe/Elbe_Input/Future_data/',
    # 32 * 1024 * 1024 = 32 MiB per streamed chunk.
    chunk_size=32 * 1024 * 1024,
    # Number of files downloaded concurrently.
    max_workers=4,
    models=[
        'ACCESS-CM2', 'MPI-ESM1-2-HR', 'MIROC6'
    ],
    ssps=['ssp245', 'ssp585'],
    year_ranges=['2021-2040', '2041-2060', '2061-2080', '2081-2100'],
    variables=['tmin', 'tmax', 'prec']
)

# Checkpoint file holding a JSON mapping of file name -> 'done' / 'failed'.
# The path is relative, so it resolves against the current working directory,
# not against SETTINGS.folder.
STATUS_FILE = 'download_status.json'
# Held while the shared status dict is updated and written back to STATUS_FILE.
status_lock = threading.Lock()

# ---------------- Logging Setup ----------------

logging.basicConfig(
    level=logging.INFO,
    format='[%(asctime)s][%(threadName)s] %(message)s',
    datefmt='%H:%M:%S'
)
# Shorthand used throughout the file. Every message, including warnings and
# failures, is therefore emitted at INFO level.
log = logging.info

# ---------------- Signal Handling ----------------

def handle_sigint(sig, frame):
    """Handle Ctrl-C: log the interruption and terminate with a failure status.

    Args:
        sig: Number of the signal that was delivered (SIGINT here).
        frame: Current stack frame supplied by the signal machinery; unused.

    Raises:
        SystemExit: Always, with status ``128 + sig`` (130 for SIGINT).
    """
    log("🔴 Interrupted by user. Exiting.")
    # An interrupted run has not finished its downloads, so it must not report
    # success (status 0). 128 + signal number is the conventional
    # "terminated by signal" exit status.
    sys.exit(128 + sig)

signal.signal(signal.SIGINT, handle_sigint)

# ---------------- Status File Utilities ----------------

def load_download_status() -> dict:
    """Load the checkpoint mapping from ``STATUS_FILE``.

    Returns:
        The saved ``{file name: state}`` dict, or an empty dict when the file
        does not exist, cannot be read, is not valid JSON, or does not contain
        a JSON object.
    """
    try:
        with open(STATUS_FILE, 'r') as f:
            status = json.load(f)
    except FileNotFoundError:
        # First run: there is no checkpoint yet, which is not worth a warning.
        return {}
    except (OSError, ValueError):
        # ValueError covers json.JSONDecodeError and UnicodeDecodeError, i.e.
        # a truncated or otherwise corrupt checkpoint file.
        log("⚠ Failed to read status file, starting fresh.")
        return {}

    # Callers use dict methods on the result; valid JSON of another type
    # (list, null, number, ...) is treated like a corrupt checkpoint.
    if not isinstance(status, dict):
        log("⚠ Failed to read status file, starting fresh.")
        return {}
    return status

def save_download_status(status: dict):
    """Persist the checkpoint mapping to ``STATUS_FILE`` (best effort).

    The data is written to a sibling temporary file first and then moved over
    ``STATUS_FILE`` with ``os.replace``, so an interruption in the middle of a
    write leaves the previous checkpoint intact instead of a truncated file.

    Args:
        status: Mapping of file name to state (``'done'`` / ``'failed'``).

    Failures are logged, never raised: losing one checkpoint write must not
    abort a download that otherwise succeeded.
    """
    tmp_file = STATUS_FILE + '.tmp'
    try:
        with open(tmp_file, 'w') as f:
            json.dump(status, f, indent=2)
        # Swap the complete new file into place in a single step.
        os.replace(tmp_file, STATUS_FILE)
    except Exception as e:
        log(f"⚠ Failed to save status: {e}")

# ---------------- Utilities ----------------

def get_skip_list(folder: str) -> set:
    """Return the names of all entries currently in ``folder``.

    Args:
        folder: Directory to list.

    Returns:
        A set of entry names, or an empty set when ``folder`` does not exist.
    """
    if not os.path.exists(folder):
        return set()
    entries = os.listdir(folder)
    return set(entries)

def generate_file_url(model: str, ssp: str, year: str, variable: str) -> Tuple[str, str]:
    """Build the file name and download URL for one model/SSP/period/variable.

    Args:
        model: Model name, used as a URL path segment and in the file name.
        ssp: Scenario identifier, used as a URL path segment and in the file name.
        year: Year-range string embedded in the file name.
        variable: Variable name embedded in the file name.

    Returns:
        A ``(file name, url)`` tuple.
    """
    fname = f'wc2.1_2.5m_{variable}_{model}_{ssp}_{year}.tif'
    return fname, f'https://geodata.ucdavis.edu/cmip6/2.5m/{model}/{ssp}/{fname}'

def is_download_complete(local_file: str, remote_url: str) -> bool:
    """Check whether ``local_file`` has the size the server reports for ``remote_url``.

    Args:
        local_file: Path of the file on disk.
        remote_url: URL the file was (or will be) downloaded from.

    Returns:
        True only when the local file exists, is non-empty, and its size
        equals the ``Content-Length`` of a successful HEAD request. Any
        failure (missing file, non-200 response, missing or unparsable
        header, network error) yields False; this function never raises.
    """
    try:
        if not os.path.exists(local_file):
            return False
        local_size = os.path.getsize(local_file)
        if local_size == 0:
            return False

        # requests.head() does not follow redirects unless asked to; without
        # allow_redirects a redirected URL would always look "incomplete".
        # The User-Agent matches the one download_file sends with its GET.
        response = requests.head(
            remote_url,
            timeout=10,
            allow_redirects=True,
            headers={'User-Agent': 'Mozilla/5.0'},
        )
        if response.status_code != 200:
            log(f"⚠ HEAD request failed for {remote_url}: status {response.status_code}")
            return False

        content_length = response.headers.get('Content-Length')
        if content_length is None:
            # The server did not advertise a size, so completeness is unknown.
            return False
        return int(content_length) == local_size
    except Exception as e:
        # Deliberately broad: this is a best-effort probe and callers rely on
        # a plain False rather than an exception.
        log(f"⚠ Error checking file size for {local_file}: {e}")
        return False

# ---------------- Core Logic ----------------

def create_download_list(settings: DownloadSettings, skip_set: set, status: dict) -> List[Tuple[str, str]]:
    """Work out which files still need to be downloaded.

    Every model/SSP/year-range/variable combination in ``settings`` is
    considered, in the same order nested loops over those lists would give.

    Args:
        settings: Lists to combine and the target folder.
        skip_set: Accepted for interface compatibility; not consulted here.
        status: Checkpoint mapping of file name to state. Files found complete
            on disk are marked ``'done'`` in it and the checkpoint is saved.

    Returns:
        ``(file name, url)`` pairs for every file that must be fetched.
    """
    from itertools import product

    urls = []
    combos = product(settings.models, settings.ssps, settings.year_ranges, settings.variables)
    for model, ssp, year, var in combos:
        fname, furl = generate_file_url(model, ssp, year, var)
        full_path = os.path.join(settings.folder, fname)
        on_disk = os.path.exists(full_path)

        if status.get(fname) == 'done':
            if on_disk:
                log(f"⏭️ Skipped (already marked done): {fname}")
                continue
            # The checkpoint is keyed by file name only, so it can be stale:
            # the file may have been deleted or the target folder changed.
            # A missing file must be fetched again whatever the checkpoint says.
            log(f"⚠ Marked done but missing on disk, re-queuing: {fname}")
        elif on_disk and is_download_complete(full_path, furl):
            log(f"⏭️ Skipped (already complete): {fname}")
            with status_lock:
                status[fname] = 'done'
                save_download_status(status)
            continue

        urls.append((fname, furl))
    return urls

def download_file(fname: str, url: str, settings: DownloadSettings,
                  max_retries: int = 3, status: dict = None) -> bool:
    """Download one file into ``settings.folder``, retrying on failure.

    The data is streamed into ``<fname>.part`` and only moved to its final
    name after ``is_download_complete`` confirms the size, so a truncated
    download never sits under the real file name.

    Args:
        fname: Name of the file inside ``settings.folder``.
        url: URL to fetch.
        settings: Supplies the target folder and streaming chunk size.
        max_retries: Maximum number of attempts.
        status: Optional shared checkpoint mapping; when given, the file is
            recorded as ``'done'`` or ``'failed'`` and the checkpoint saved.

    Returns:
        True when the file was downloaded and verified, False otherwise.
    """
    local_file = os.path.join(settings.folder, fname)
    part_file = local_file + '.part'
    headers = {'User-Agent': 'Mozilla/5.0'}

    for attempt in range(1, max_retries + 1):
        try:
            # The context manager releases the connection even when streaming
            # or writing fails part-way through.
            with requests.get(url, stream=True, timeout=30, headers=headers) as response:
                response.raise_for_status()

                with open(part_file, 'wb') as f:
                    for chunk in response.iter_content(settings.chunk_size):
                        if chunk:
                            f.write(chunk)

            if not is_download_complete(part_file, url):
                raise Exception("Incomplete file after download.")

            # Publish the verified file under its final name in one step.
            os.replace(part_file, local_file)

            log(f"✔ Downloaded: {fname}")
            if status is not None:
                with status_lock:
                    status[fname] = 'done'
                    save_download_status(status)
            return True
        except Exception as e:
            # Broad on purpose: this runs in a worker thread and any failure
            # should count as a failed attempt rather than escape the pool.
            log(f"⚠ Retry {attempt}/{max_retries} for {fname}: {e}")
            if attempt < max_retries:
                # Pause only when another attempt will actually follow.
                time.sleep(2)

    # Every attempt failed; drop whatever partial data is left behind.
    try:
        os.remove(part_file)
    except OSError:
        # Nothing to clean up, or the file is not removable; the failure is
        # already reported below.
        pass

    log(f"✖ Failed: {fname}")
    if status is not None:
        with status_lock:
            status[fname] = 'failed'
            save_download_status(status)
    return False

def parallel_download(download_items: List[Tuple[str, str]], settings: DownloadSettings, status: dict):
    """Download all queued files with a thread pool and log a summary.

    Args:
        download_items: ``(file name, url)`` pairs to fetch.
        settings: Supplies the worker count and is forwarded to ``download_file``.
        status: Shared checkpoint mapping forwarded to ``download_file``.
    """
    success = 0
    failed_files = []

    with ThreadPoolExecutor(max_workers=settings.max_workers) as executor:
        # Remember which file each submitted job belongs to.
        job_names = {}
        for fname, furl in download_items:
            job = executor.submit(download_file, fname, furl, settings, max_retries=3, status=status)
            job_names[job] = fname

        # Tally the results in completion order.
        for job in as_completed(job_names):
            if job.result():
                success += 1
                continue
            failed_files.append(job_names[job])

    fail = len(failed_files)
    log(f"✅ Success: {success}, ❌ Failed: {fail}")
    if failed_files:
        log(f"🚫 Failed files: {failed_files}")

# ---------------- Main ----------------

def main():
    """Run the complete workflow: prepare, queue, download, then report.

    Creates the target folder, loads the checkpoint, builds the list of files
    still to fetch, downloads them in parallel and finally logs any file that
    is not marked ``'done'`` in the checkpoint.
    """
    target_dir = SETTINGS.folder
    os.makedirs(target_dir, exist_ok=True)

    status = load_download_status()
    download_items = create_download_list(SETTINGS, get_skip_list(target_dir), status)

    if download_items:
        log(f"⏳ Starting {len(download_items)} downloads with {SETTINGS.max_workers} threads...")
        log(f"📦 First in queue: {download_items[0][0]}")

        parallel_download(download_items, SETTINGS, status)

        # Anything queued that did not end up as 'done' is still outstanding.
        missing = []
        for item in download_items:
            if status.get(item[0]) != 'done':
                missing.append(item[0])

        if not missing:
            log("🎉 All requested files successfully downloaded.")
        else:
            log(f"🚫 Still missing after run: {missing}")
    else:
        log("✅ All files already downloaded and verified.")

if __name__ == "__main__":
    main()
