In [1]:
"""
Planet API Satellite Image Downloader - Threaded Version

Enhanced with safe threading for faster processing:
- Batch activation of all assets
- Activation checking queue with 5-minute intervals
- Download queue for activated assets
- Thread-safe CSV updates
"""

import os
import csv
import time
import yaml
import requests
import zipfile
import shutil
from datetime import datetime, timedelta
import pandas as pd
import glob
import tempfile
from typing import Dict, List, Tuple, Optional, Set
import json
import threading
from queue import Queue, Empty
from concurrent.futures import ThreadPoolExecutor, as_completed
import logging
from dataclasses import dataclass
from filelock import FileLock  # pip install filelock

# =============================================================================
# CONFIGURATION
# =============================================================================

# API configuration file
# NOTE: machine-specific absolute path. The YAML document must contain a
# top-level `api_key` entry, which is the only value read from it here.
PLANET_YAML_PATH = r"D:\planetscope_lake_ice\planet.yaml"

# Load API key
# This runs as soon as the module/cell is executed, so a missing file or a
# missing `api_key` entry raises immediately rather than on first request.
with open(PLANET_YAML_PATH, 'r') as f:
    PLANET_API_KEY = yaml.safe_load(f)['api_key']

# Root URL; every request path in this file is built on top of it.
BASE_URL = "https://api.planet.com/data/v1"

# Asset configuration with fallback mapping
# Keys are the preferred asset names; they also name the `<key>_status`
# columns of the download CSV. For each key:
#   'fallback'  - asset name tried when the preferred one is not listed for
#                 an item (for 'ortho_udm2' the fallback is the same name)
#   'extension' - file extension used when saving / recognising the asset
ASSET_CONFIG = {
    'ortho_analytic_4b_sr': {
        'fallback': 'ortho_analytic_3b',
        'extension': '.tif'
    },
    'ortho_analytic_4b_xml': {
        'fallback': 'ortho_analytic_3b_xml', 
        'extension': '.xml'
    },
    'ortho_udm2': {
        'fallback': 'ortho_udm2',
        'extension': '.tif'
    }
}

# Preferred asset names in definition order; this order fixes the order of
# the status columns written to the CSV.
ASSET_KEYS = list(ASSET_CONFIG.keys())

# Threading configuration
MAX_REQUESTS_PER_SECOND = 4  # Passed to the global rate-limited session
REQUEST_INTERVAL = 1.0 / MAX_REQUESTS_PER_SECOND  # Seconds between requests (not referenced in the code visible here)
MAX_CONCURRENT_DOWNLOADS = 4  # Number of concurrent download threads
MAX_CONCURRENT_ACTIVATIONS = 8  # Number of concurrent activation threads

# Timing configuration
ACTIVATION_CHECK_INTERVAL = 300  # 5 minutes in seconds
# Used both as the per-asset activation timeout and as the overall time
# limit of the monitoring phase.
MAX_ACTIVATION_WAIT = 18000  # 5 hours in seconds
# Upper bound (`lte`) of the cloud_cover filter in image searches;
# 0.0 keeps only scenes reported as cloud free.
CLOUD_THRESHOLD = 0.0

# Base data folder
BASE_DATA_FOLDER = r"E:\planetscope_lake_ice\Data\Input"

# Configuration variables (SET THESE IN JUPYTER CELLS)
# generate_download_csv() refuses to run while any of these is unset/empty.
CURRENT_STUDY_AREA = None  # Value written to the CSV `study_area` column
CURRENT_AOI = None  # Geometry passed as the GeometryFilter config
CURRENT_SEASONS = None  # Mapping: season name -> (start template, end template), formatted with `year`
CURRENT_YEARS = None  # Iterable of years to search
CURRENT_FOLDER = None  # Folder holding the CSV queue files and the downloaded data

# =============================================================================
# DATA STRUCTURES
# =============================================================================

@dataclass
class AssetInfo:
    """Information about an asset to be activated/downloaded.

    One instance describes a single asset of a single item and carries the
    state collected while it moves through activation and download.
    """
    # Item identifier; interpolated into the item's assets URL.
    item_id: str
    # Item type; interpolated into the item's assets URL.
    item_type: str
    # Preferred asset name (a key of ASSET_CONFIG); selects the
    # `<asset_key>_status` CSV column and the file extension.
    asset_key: str
    # Asset name actually looked up in the API response: either `asset_key`
    # itself or its configured fallback.
    actual_asset_key: str
    # Row index in the download CSV, used when writing status updates.
    df_index: int
    # The CSV row this asset came from ('season' and 'year' are read from it
    # when choosing the output folder).
    row: pd.Series
    # Not assigned anywhere in the code visible here.
    activation_url: Optional[str] = None
    # Filled from the asset's 'location' field once the asset is active.
    download_url: Optional[str] = None
    # Not assigned anywhere in the code visible here.
    activated_at: Optional[datetime] = None
    # Set when the activation request has been sent; basis of the
    # activation timeout check.
    activation_requested_at: Optional[datetime] = None

@dataclass
class ItemDownloadGroup:
    """Group of assets for a single item that are ready to download.

    Instances are placed on the activation manager's download queue and
    consumed by the download manager.
    """
    # Item the assets belong to; also used as the file-name prefix.
    item_id: str
    # Assets to fetch; each is expected to carry a download_url.
    assets: List[AssetInfo]
    # CSV row index of the item (taken from the first asset when queued).
    df_index: int
    # CSV row of the item (taken from the first asset when queued).
    row: pd.Series

# =============================================================================
# THREAD-SAFE CSV OPERATIONS
# =============================================================================

class ThreadSafeCSVUpdater:
    """Thread-safe CSV file updater with file locking.

    Each update re-reads the CSV, changes one status cell and writes the
    file back. An in-process lock serialises threads; a lock file next to
    the CSV serialises against other processes using the same convention.
    """

    def __init__(self, csv_path: str):
        """Remember the CSV path and derive the lock-file path from it."""
        self.csv_path = csv_path
        self.lock_path = csv_path + '.lock'
        self._local_lock = threading.Lock()

    def update_status(self, df_index: int, asset_key: str, new_status: str):
        """Set the `<asset_key>_status` cell of row `df_index` to `new_status`.

        Best effort: every failure (lock timeout, unreadable CSV, unknown
        row or column, write error) is logged and swallowed so that a
        bookkeeping problem never kills a worker thread.

        Args:
            df_index: Zero-based row position in the CSV.
            asset_key: Asset name; the column updated is `<asset_key>_status`.
            new_status: Text to store in the cell.
        """
        status_col = f'{asset_key}_status'
        with self._local_lock:
            try:
                # The file lock is taken inside the try block so that a lock
                # timeout is logged like any other failure instead of
                # escaping to the caller.
                with FileLock(self.lock_path, timeout=30):
                    df = pd.read_csv(self.csv_path)

                    # Reject negative positions explicitly: `.loc` with an
                    # unknown label would append a new row instead of
                    # failing.
                    row_exists = 0 <= df_index < len(df)
                    if not row_exists or status_col not in df.columns:
                        logging.warning(
                            f"Skipped CSV update: row {df_index} / column "
                            f"{status_col} not present in {self.csv_path}"
                        )
                        return

                    df.loc[df_index, status_col] = new_status

                    # Write to a sibling file and swap it in, so an
                    # interrupted write cannot leave a truncated CSV behind.
                    tmp_path = self.csv_path + '.tmp'
                    df.to_csv(tmp_path, index=False, quoting=csv.QUOTE_ALL)
                    os.replace(tmp_path, self.csv_path)
                    logging.info(f"Updated {asset_key} status to {new_status} for index {df_index}")

            except Exception as e:
                logging.error(f"Error updating CSV: {e}")

# =============================================================================
# ENHANCED RATE LIMITED SESSION
# =============================================================================

class ThreadSafeRateLimitedSession:
    """Thread-safe rate-limited requests session.

    The lock only guards the hand-out of request start times. The HTTP
    transfer itself runs outside the lock, so several threads can have
    requests in flight at once while request *starts* stay at least
    `min_interval` seconds apart.
    """

    def __init__(self, requests_per_second: float = 4):
        """Create the session.

        Args:
            requests_per_second: Maximum number of requests started per
                second across all threads.
        """
        self.min_interval = 1.0 / requests_per_second
        # Start time reserved for the most recent request (epoch seconds).
        self.last_request_time = 0
        self.request_lock = threading.Lock()
        # Template session; its auth is copied into the per-thread sessions.
        self.session = requests.Session()
        self.session.auth = (PLANET_API_KEY, '')
        # One requests.Session per thread, so no Session object is used by
        # two threads at the same time now that transfers overlap.
        self._thread_state = threading.local()

    def _session_for_current_thread(self):
        """Return the calling thread's session, creating it on first use."""
        thread_session = getattr(self._thread_state, 'session', None)
        if thread_session is None:
            thread_session = requests.Session()
            thread_session.auth = self.session.auth
            self._thread_state.session = thread_session
        return thread_session

    def _wait_for_slot(self):
        """Reserve the next free start time and sleep until it arrives."""
        with self.request_lock:
            slot = max(time.time(), self.last_request_time + self.min_interval)
            self.last_request_time = slot
        # Sleep outside the lock so other threads can reserve later slots.
        delay = slot - time.time()
        if delay > 0:
            time.sleep(delay)

    def request(self, method: str, url: str, **kwargs) -> requests.Response:
        """Make thread-safe rate-limited request.

        Args:
            method: HTTP method name.
            url: Target URL.
            **kwargs: Passed through to `requests.Session.request`;
                `timeout` defaults to 60 seconds when not given.

        Returns:
            The response object. HTTP error statuses are not raised here.

        Raises:
            requests.exceptions.RequestException: When all three attempts
                fail at the transport level.
        """
        # Handle timeout parameter - use provided timeout or default
        timeout = kwargs.pop('timeout', 60)
        http = self._session_for_current_thread()

        # Make request with retries; every attempt takes its own slot
        for attempt in range(3):
            self._wait_for_slot()
            try:
                return http.request(method, url, timeout=timeout, **kwargs)
            except requests.exceptions.RequestException as e:
                if attempt == 2:  # Last attempt
                    raise
                logging.warning(f"Request failed (attempt {attempt + 1}/3): {e}")
                time.sleep(2 ** attempt)  # Exponential backoff

        raise requests.exceptions.RequestException("All retry attempts failed")

# Global thread-safe session
# Every API call in this file (searches, asset listings, activation requests
# and downloads) goes through this one object, so the request rate limit is
# shared by all of them.
session = ThreadSafeRateLimitedSession(MAX_REQUESTS_PER_SECOND)

# =============================================================================
# ACTIVATION MANAGER
# =============================================================================
class ActivationManager:
    """Tracks asset activation state and feeds the download queue.

    Assets move through three containers, all guarded by `self.lock`:
    `active_assets` (activation requested, still waiting), `ready_assets`
    (download URL known, waiting for the item's other assets) and
    `failed_assets` (gave up). Items whose assets are ready are put on
    `download_queue` as ItemDownloadGroup objects.
    """

    def __init__(self, csv_updater: ThreadSafeCSVUpdater):
        """Create empty tracking state.

        Args:
            csv_updater: Used to record every status change in the CSV.
        """
        self.csv_updater = csv_updater
        self.download_queue = Queue()
        self.active_assets: Dict[str, AssetInfo] = {}
        self.ready_assets: Dict[str, AssetInfo] = {}
        self.failed_assets: Set[str] = set()
        self.completed_items: Set[str] = set()
        self.lock = threading.Lock()

    @staticmethod
    def _tracking_key(asset_info: AssetInfo) -> str:
        """Return the dictionary key identifying one asset of one item."""
        return f"{asset_info.item_id}_{asset_info.asset_key}"

    @staticmethod
    def _fetch_assets(asset_info: AssetInfo) -> dict:
        """Fetch the asset listing of the item `asset_info` belongs to."""
        assets_url = f"{BASE_URL}/item-types/{asset_info.item_type}/items/{asset_info.item_id}/assets/"
        response = session.request('GET', assets_url)
        response.raise_for_status()
        return response.json()

    def request_activation(self, asset_info: AssetInfo):
        """Request activation of one asset, or mark it ready if already active.

        Returns:
            True when the asset is ready or its activation was requested,
            False when it does not exist, cannot be activated or the
            request failed. The outcome is also written to the CSV.
        """
        try:
            assets_data = self._fetch_assets(asset_info)
            if asset_info.actual_asset_key not in assets_data:
                self.csv_updater.update_status(asset_info.df_index, asset_info.asset_key, "Does Not Exist")
                return False
            asset_data = assets_data[asset_info.actual_asset_key]
            status = asset_data.get('status', 'inactive')
            key = self._tracking_key(asset_info)
            # Already active
            if status == 'active' and 'location' in asset_data:
                asset_info.download_url = asset_data['location']
                with self.lock:
                    self.ready_assets[key] = asset_info
                self.csv_updater.update_status(asset_info.df_index, asset_info.asset_key, "Ready for Download")
                self._check_item_ready_for_download(asset_info.item_id)
                return True
            # Request activation
            if 'activate' in asset_data.get('_links', {}):
                activate_url = asset_data['_links']['activate']
                session.request('POST', activate_url).raise_for_status()
                asset_info.activation_requested_at = datetime.now()
                with self.lock:
                    self.active_assets[key] = asset_info
                self.csv_updater.update_status(asset_info.df_index, asset_info.asset_key, "Activation Requested")
                return True
            self.csv_updater.update_status(asset_info.df_index, asset_info.asset_key, "Cannot Activate")
            return False
        except Exception as e:
            print(f"[Activation Error] {asset_info.item_id} {asset_info.asset_key}: {e}")
            self.csv_updater.update_status(asset_info.df_index, asset_info.asset_key, "Activation Failed")
            return False

    def check_activations(self):
        """Poll every pending asset once and promote the ones that are active.

        Assets waiting longer than MAX_ACTIVATION_WAIT are marked as timed
        out. An error while polling one asset is printed and the asset
        stays pending, so it is polled again on the next call. At the end,
        items that can no longer become complete are queued with the assets
        they have.
        """
        with self.lock:
            assets_to_check = list(self.active_assets.values())
        print(f"[Activation Check] Checking {len(assets_to_check)} pending assets...")
        for asset_info in assets_to_check:
            try:
                if asset_info.activation_requested_at:
                    elapsed = (datetime.now() - asset_info.activation_requested_at).total_seconds()
                    if elapsed > MAX_ACTIVATION_WAIT:
                        self._handle_activation_timeout(asset_info)
                        continue
                assets_data = self._fetch_assets(asset_info)
                if asset_info.actual_asset_key not in assets_data:
                    self._handle_activation_failure(asset_info, "Asset disappeared")
                    continue
                asset_data = assets_data[asset_info.actual_asset_key]
                status = asset_data.get('status', 'inactive')
                if status == 'active' and 'location' in asset_data:
                    asset_info.download_url = asset_data['location']
                    key = self._tracking_key(asset_info)
                    with self.lock:
                        # pop() rather than del: the entry may already be gone
                        self.active_assets.pop(key, None)
                        self.ready_assets[key] = asset_info
                    self.csv_updater.update_status(asset_info.df_index, asset_info.asset_key, "Ready for Download")
                    self._check_item_ready_for_download(asset_info.item_id)
            except Exception as e:
                print(f"[Activation Check Error] {asset_info.item_id} {asset_info.asset_key}: {e}")
        self._flush_partial_items()

    def _handle_activation_timeout(self, asset_info: AssetInfo):
        """Move an asset from pending to failed and record the timeout."""
        key = self._tracking_key(asset_info)
        with self.lock:
            self.active_assets.pop(key, None)
            self.failed_assets.add(key)
        self.csv_updater.update_status(asset_info.df_index, asset_info.asset_key, "Activation Timeout")
        print(f"[Activation Timeout] {asset_info.item_id} {asset_info.asset_key}")

    def _handle_activation_failure(self, asset_info: AssetInfo, reason: str):
        """Move an asset from pending to failed and record the reason."""
        key = self._tracking_key(asset_info)
        with self.lock:
            self.active_assets.pop(key, None)
            self.failed_assets.add(key)
        self.csv_updater.update_status(asset_info.df_index, asset_info.asset_key, f"Activation Failed: {reason}")
        print(f"[Activation Failed] {asset_info.item_id} {asset_info.asset_key}: {reason}")

    def _enqueue_locked(self, item_id: str, item_assets: List[AssetInfo]):
        """Queue `item_assets` for download; the caller must hold `self.lock`."""
        self.download_queue.put(ItemDownloadGroup(item_id, item_assets, item_assets[0].df_index, item_assets[0].row))
        for asset in item_assets:
            del self.ready_assets[self._tracking_key(asset)]

    def _check_item_ready_for_download(self, item_id: str):
        """Queue the item once all of its configured assets are ready."""
        with self.lock:
            item_assets = [a for a in self.ready_assets.values() if a.item_id == item_id]
            if len(item_assets) == len(ASSET_KEYS):
                self._enqueue_locked(item_id, item_assets)
                print(f"[Download Queue] Item {item_id} all assets ready")

    def _flush_partial_items(self):
        """Queue items that have ready assets but nothing left pending.

        An item never reaches the full asset count when one of its assets
        was skipped as already downloaded, failed, or timed out. Without
        this step its ready assets would wait forever and keep the
        monitoring loop alive until its overall time limit.

        Only called from check_activations(). If activation requests for
        the same item are still being issued concurrently, the item may be
        queued in more than one group; each group is still downloaded.
        """
        with self.lock:
            still_pending = {a.item_id for a in self.active_assets.values()}
            partial_groups: Dict[str, List[AssetInfo]] = {}
            for asset in self.ready_assets.values():
                if asset.item_id not in still_pending:
                    partial_groups.setdefault(asset.item_id, []).append(asset)
            for item_id, item_assets in partial_groups.items():
                self._enqueue_locked(item_id, item_assets)
                print(f"[Download Queue] Item {item_id} queued with "
                      f"{len(item_assets)} of {len(ASSET_KEYS)} assets")

    def get_activation_status(self) -> dict:
        """Return a snapshot of the counters shown in status updates."""
        with self.lock:
            return {
                'active_count': len(self.active_assets),
                'ready_count': len(self.ready_assets),
                'failed_count': len(self.failed_assets),
                'completed_count': len(self.completed_items),
                'download_queue_size': self.download_queue.qsize()
            }

# =============================================================================
# DOWNLOAD MANAGER
# =============================================================================
class DownloadManager:
    """Downloads the assets of queued items into the season/year folders."""

    def __init__(self, csv_updater: ThreadSafeCSVUpdater, activation_manager):
        """Store collaborators.

        Args:
            csv_updater: Used to record per-asset download results.
            activation_manager: Provides `download_queue` and the
                `completed_items` set that is updated after each item.
        """
        self.csv_updater = csv_updater
        self.activation_manager = activation_manager
        self.completed_downloads = set()

    def get_next_download_group(self) -> Optional[ItemDownloadGroup]:
        """Return the next queued item, or None if none arrives within 2 s."""
        try:
            return self.activation_manager.download_queue.get(timeout=2)
        except Empty:
            return None

    def download_item_assets(self, item_group: ItemDownloadGroup) -> bool:
        """Download every asset of `item_group` that has a download URL.

        Files are saved as `<item_id>_<actual_asset_key><extension>` in
        `CURRENT_FOLDER/<season>_<year>`. Each asset's result is written to
        the CSV, and one failing asset does not stop the others.

        Returns:
            True if at least one asset was saved, otherwise False.
        """
        item_id = item_group.item_id
        row = item_group.row
        try:
            season, year = row['season'], row['year']
            output_folder = os.path.join(CURRENT_FOLDER, f"{season}_{year}")
            os.makedirs(output_folder, exist_ok=True)

            successful_downloads = []
            for asset_info in item_group.assets:
                if not asset_info.download_url:
                    continue
                if self._download_asset(asset_info, item_id, output_folder):
                    successful_downloads.append(asset_info)

            if successful_downloads:
                print(f"[Download Complete] All assets saved for {item_id}")
            self.activation_manager.completed_items.add(item_id)
            return bool(successful_downloads)
        except Exception as e:
            print(f"[Download Error] {item_id}: {e}")
            return False

    def _download_asset(self, asset_info: AssetInfo, item_id: str, output_folder: str) -> bool:
        """Fetch one asset and move it into place; return True on success."""
        extension = ASSET_CONFIG[asset_info.asset_key]['extension']
        final_path = os.path.join(output_folder, f"{item_id}_{asset_info.actual_asset_key}{extension}")
        # The data is written under a temporary name and renamed afterwards,
        # so an interrupted write never leaves a truncated file under the
        # final name, where a later scan would count it as downloaded.
        partial_path = final_path + '.part'
        try:
            response = session.request('GET', asset_info.download_url, timeout=300)
            response.raise_for_status()
            if not response.content:
                self.csv_updater.update_status(asset_info.df_index, asset_info.asset_key, "Download Failed - Empty")
                return False
            with open(partial_path, 'wb') as partial_file:
                partial_file.write(response.content)
            os.replace(partial_path, final_path)
        except Exception as e:
            print(f"[Download Error] {asset_info.asset_key} {item_id}: {e}")
            self._remove_if_present(partial_path)
            self.csv_updater.update_status(asset_info.df_index, asset_info.asset_key, "Download Failed")
            return False
        self.csv_updater.update_status(asset_info.df_index, asset_info.asset_key, "Downloaded")
        return True

    @staticmethod
    def _remove_if_present(path: str):
        """Delete a leftover partial file; a missing file is not an error."""
        try:
            os.unlink(path)
        except FileNotFoundError:
            pass
        except OSError as e:
            print(f"[Cleanup Warning] Could not remove {path}: {e}")

# =============================================================================
# THREAD CONTROLLER
# =============================================================================
class ThreadedDownloadProcessor:
    """Runs the two-phase workflow for one download CSV.

    Phase 1 requests activation for every asset that still needs work.
    Phase 2 polls activation state at fixed intervals and hands items that
    are ready to a pool of download threads.
    """

    # A row needs processing when one of its status cells contains any of
    # these markers. "Requested" and "Ready" cover rows left behind by an
    # interrupted run ("Activation Requested" / "Ready for Download"), which
    # would otherwise never be picked up again.
    _PENDING_MARKERS = ("Needs", "Failed", "Timeout", "Requested", "Ready")

    def __init__(self, csv_path: str):
        """Load the CSV and build the manager objects that share it."""
        self.csv_path = csv_path
        self.df = pd.read_csv(csv_path)
        self.csv_updater = ThreadSafeCSVUpdater(csv_path)
        self.activation_manager = ActivationManager(self.csv_updater)
        self.download_manager = DownloadManager(self.csv_updater, self.activation_manager)
        self.shutdown_event = threading.Event()

    def process_downloads(self):
        """Run both phases; Ctrl-C stops processing without a traceback."""
        print(f"=== Starting download processing: {self.csv_path} ===")
        try:
            self._phase_1_request_activations()
            self._phase_2_monitor_and_download()
        except KeyboardInterrupt:
            print("[Interrupted] Shutting down...")
            self.shutdown_event.set()

    def _phase_1_request_activations(self):
        """Resolve asset names per item, then request all activations."""
        print("Phase 1: Requesting Activations")
        items_to_process = self._get_items_needing_processing()
        assets_to_activate = []
        for idx, row in items_to_process:
            item_id, item_type = row['item_id'], row['item_type']
            try:
                assets_url = f"{BASE_URL}/item-types/{item_type}/items/{item_id}/assets/"
                response = session.request('GET', assets_url)
                response.raise_for_status()
                assets_data = response.json()
                for asset_key in ASSET_KEYS:
                    current_status = str(row.get(f'{asset_key}_status', ''))
                    if "Downloaded" in current_status:
                        continue
                    # Prefer the configured asset name and fall back to the
                    # alternative; assets offering neither are skipped.
                    actual_asset_key = None
                    if asset_key in assets_data:
                        actual_asset_key = asset_key
                    elif ASSET_CONFIG[asset_key]['fallback'] in assets_data:
                        actual_asset_key = ASSET_CONFIG[asset_key]['fallback']
                    if actual_asset_key:
                        assets_to_activate.append(AssetInfo(
                            item_id, item_type, asset_key, actual_asset_key, idx, row
                        ))
            except Exception as e:
                print(f"[Phase1 Error] {item_id}: {e}")
        print(f"Starting activation for {len(assets_to_activate)} assets")
        # Leaving the with-block waits for all activation requests to finish.
        with ThreadPoolExecutor(max_workers=MAX_CONCURRENT_ACTIVATIONS) as executor:
            futures = [executor.submit(self.activation_manager.request_activation, asset) for asset in assets_to_activate]
            for i, _ in enumerate(as_completed(futures), 1):
                if i % 20 == 0:
                    print(f"  {i}/{len(assets_to_activate)} activation requests done...")

    def _phase_2_monitor_and_download(self):
        """Poll activations and dispatch downloads until nothing is left.

        The loop ends when no asset is pending, ready, queued or being
        downloaded, when MAX_ACTIVATION_WAIT seconds have passed, or when
        `shutdown_event` is set. Running downloads are always awaited.
        """
        print("Phase 2: Monitoring Activations + Downloads")
        start_time = datetime.now()
        last_check = datetime.now()
        download_futures = []
        executor = ThreadPoolExecutor(max_workers=MAX_CONCURRENT_DOWNLOADS)
        try:
            while not self.shutdown_event.is_set():
                if (datetime.now() - start_time).total_seconds() > MAX_ACTIVATION_WAIT:
                    print("Max activation wait time reached.")
                    break
                if (datetime.now() - last_check).total_seconds() >= ACTIVATION_CHECK_INTERVAL:
                    self.activation_manager.check_activations()
                    last_check = datetime.now()
                    status = self.activation_manager.get_activation_status()
                    print(f"Status Update: {status['active_count']} activating, "
                          f"{status['ready_count']} ready, "
                          f"{status['download_queue_size']} queued, "
                          f"{status['completed_count']} completed, "
                          f"{status['failed_count']} failed, "
                          f"{len(download_futures)} downloading")
                # Drain everything currently queued into the download pool.
                while True:
                    item_group = self.download_manager.get_next_download_group()
                    if not item_group:
                        break
                    future = executor.submit(self.download_manager.download_item_assets, item_group)
                    download_futures.append(future)
                download_futures = [f for f in download_futures if not f.done()]
                status = self.activation_manager.get_activation_status()
                if (status['active_count'] == 0 and status['ready_count'] == 0
                        and status['download_queue_size'] == 0 and not download_futures):
                    print("[Phase2] All processing complete.")
                    break
                # Same 30 s pause as a plain sleep, but it ends immediately
                # when shutdown_event is set.
                self.shutdown_event.wait(30)
        finally:
            executor.shutdown(wait=True)

    def _get_items_needing_processing(self) -> List[Tuple[int, pd.Series]]:
        """Return (index, row) for every row with at least one unfinished asset."""
        items = []
        for idx, row in self.df.iterrows():
            for asset_key in ASSET_KEYS:
                status = str(row.get(f"{asset_key}_status", ""))
                if "Downloaded" in status:
                    continue
                if any(marker in status for marker in self._PENDING_MARKERS):
                    items.append((idx, row))
                    break
        return items
# =============================================================================
# CSV GENERATION FUNCTIONS
# =============================================================================

def scan_existing_downloads(study_folder: str) -> Dict[str, Dict[str, bool]]:
    """Walk `study_folder` and report which assets are already on disk.

    Returns:
        Mapping of item_id to {asset_key: bool}. Only items with at least
        one recognised file appear; a file named after the fallback asset
        counts for its preferred asset key.
    """
    print(f"Scanning existing downloads in: {study_folder}")

    found: Dict[str, Dict[str, bool]] = {}

    if not os.path.exists(study_folder):
        print(f"  Study folder does not exist: {study_folder}")
        return found

    for _root, _subdirs, filenames in os.walk(study_folder):
        for name in filenames:
            # A file is tested against every configured asset type.
            for asset_key, config in ASSET_CONFIG.items():
                extension = config['extension']
                if not name.endswith(extension):
                    continue
                # Preferred (4b) name first, then the fallback (3b) name.
                for candidate in (asset_key, config['fallback']):
                    if candidate not in name:
                        continue
                    item_id = extract_item_id_from_filename(name, candidate, extension)
                    if not item_id:
                        continue
                    flags = found.setdefault(item_id, {key: False for key in ASSET_KEYS})
                    flags[asset_key] = True
                    break

    print(f"Found {len(found)} items with existing downloads")
    return found

def extract_item_id_from_filename(filename: str, asset_suffix: str, extension: str) -> Optional[str]:
    """Extract Planet item_id from filename.

    The expected layout is `<item_id>_<asset_suffix><extension>` (the
    underscore before the suffix is optional).

    Args:
        filename: File name without directory.
        asset_suffix: Asset name that follows the item id.
        extension: File extension including the dot.

    Returns:
        The item id, or None when the name does not match the layout or
        the id does not start with an 8-digit and a 6-digit group.
    """
    # Strip the extension only at the end of the name; the same characters
    # appearing earlier in the name are part of it and must be kept.
    base = filename
    if extension and base.endswith(extension):
        base = base[:-len(extension)]

    # Remove asset suffix
    underscored_suffix = f'_{asset_suffix}'
    if base.endswith(underscored_suffix):
        item_id = base[:-len(underscored_suffix)]
    elif asset_suffix and base.endswith(asset_suffix):
        item_id = base[:-len(asset_suffix)]
    else:
        return None

    # Validate item_id format (Planet IDs are typically YYYYMMDD_HHMMSS_XXXX).
    # The date and time groups must be digits, not merely the right length,
    # so unrelated names such as "basemaps_region_x" are rejected.
    parts = item_id.split('_')
    if len(parts) < 3:
        return None
    date_part, time_part = parts[0], parts[1]
    if len(date_part) == 8 and date_part.isdigit() and len(time_part) == 6 and time_part.isdigit():
        return item_id

    return None

def search_images(aoi: dict, start_date: str, end_date: str) -> List[dict]:
    """Search for Planet images with full pagination.

    Sends a quick-search for PSScene items intersecting `aoi`, acquired
    between `start_date` and `end_date`, with cloud cover at most
    CLOUD_THRESHOLD, then follows the `_next` links until they run out.
    If a request fails, the error is printed and the features collected
    so far are returned.
    """
    print(f"Searching images from {start_date} to {end_date}")

    geometry_filter = {
        "type": "GeometryFilter",
        "field_name": "geometry",
        "config": aoi
    }
    date_filter = {
        "type": "DateRangeFilter",
        "field_name": "acquired",
        "config": {"gte": start_date, "lte": end_date}
    }
    cloud_filter = {
        "type": "RangeFilter",
        "field_name": "cloud_cover",
        "config": {"lte": CLOUD_THRESHOLD}
    }
    search_request = {
        "item_types": ["PSScene"],
        "filter": {
            "type": "AndFilter",
            "config": [geometry_filter, date_filter, cloud_filter]
        }
    }

    collected = []
    next_url = f"{BASE_URL}/quick-search"
    pages_fetched = 0

    while next_url:
        try:
            # Only the first page is a POST carrying the search body; the
            # following pages are plain GETs on the link from the last page.
            if pages_fetched == 0:
                response = session.request('POST', next_url, json=search_request)
            else:
                response = session.request('GET', next_url)
            pages_fetched += 1

            response.raise_for_status()
            page = response.json()

            page_features = page.get('features', [])
            collected.extend(page_features)

            print(f"  Retrieved {len(page_features)} images (total: {len(collected)})")

            next_url = page.get('_links', {}).get('_next')

        except Exception as e:
            print(f"Search error: {e}")
            break

    print(f"Total images found: {len(collected)}")
    return collected

def check_asset_availability(item_id: str, item_type: str) -> Tuple[bool, Dict[str, str]]:
    """Check if all required assets exist (with 3b/4b fallback).

    Returns:
        (all_exist, versions). `versions` maps each asset key found so far
        to "4b" (preferred name listed) or "3b" (only the fallback
        listed). Checking stops at the first asset that has neither, so
        `versions` may then be partial. A failed request gives (False, {}).
    """
    url = f"{BASE_URL}/item-types/{item_type}/items/{item_id}/assets/"

    try:
        response = session.request('GET', url)
        response.raise_for_status()
        listed_assets = response.json()

        versions: Dict[str, str] = {}
        for asset_key, config in ASSET_CONFIG.items():
            if asset_key in listed_assets:
                # Preferred 4b asset is listed
                versions[asset_key] = "4b"
                continue
            if config['fallback'] in listed_assets:
                # Only the 3b fallback is listed
                versions[asset_key] = "3b"
                continue
            # Neither name is listed: the item is incomplete.
            return False, versions

        return True, versions

    except Exception as e:
        print(f"Error checking assets for {item_id}: {e}")
        return False, {}

def generate_download_csv() -> str:
    """Generate CSV for the currently configured study area.

    For every configured year and season, searches for images, drops items
    lacking a required asset, and writes one row per remaining item with a
    status column per asset ("Downloaded" or "Needs Activation 4B/3B").
    A per-year and a per-asset summary are printed afterwards.

    Returns:
        Path of the CSV written into CURRENT_FOLDER.

    Raises:
        ValueError: If any CURRENT_* configuration variable is unset/empty.
    """
    required_settings = [CURRENT_STUDY_AREA, CURRENT_AOI, CURRENT_SEASONS, CURRENT_YEARS, CURRENT_FOLDER]
    if not all(required_settings):
        raise ValueError("Must set all configuration variables first!")

    print(f"\nGenerating CSV for {CURRENT_STUDY_AREA}")

    # Assets already present on disk are reported as "Downloaded"
    already_on_disk = scan_existing_downloads(CURRENT_FOLDER)

    # Time-stamped output file inside the study folder
    stamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    csv_path = os.path.join(CURRENT_FOLDER, f"planet_download_queue_{stamp}.csv")
    os.makedirs(CURRENT_FOLDER, exist_ok=True)

    fixed_columns = ['study_area', 'year', 'season', 'item_id', 'item_type', 'acquired', 'cloud_cover', 'assets_url']
    headers = fixed_columns + [f'{asset_key}_status' for asset_key in ASSET_KEYS]

    rows = []
    checked_count = 0
    added_count = 0

    # Summary tracking
    stats_by_year = {}
    needed_per_asset = dict.fromkeys(ASSET_KEYS, 0)

    for year in CURRENT_YEARS:
        print(f"  Processing year {year}")

        for season_name, (start_template, end_template) in CURRENT_SEASONS.items():
            start_date = start_template.format(year=year)
            end_date = end_template.format(year=year)

            print(f"    {season_name}: {start_date} to {end_date}")

            for feature in search_images(CURRENT_AOI, start_date, end_date):
                checked_count += 1
                item_id = feature['id']
                properties = feature['properties']
                item_type = properties['item_type']
                acquired = properties['acquired']
                cloud_cover = properties['cloud_cover']

                year_stats = stats_by_year.setdefault(year, {
                    "Image Sets Found": 0,
                    "Excluded (Missing Assets)": 0,
                    "Fully Downloaded": 0,
                    "Needs Download": 0
                })
                year_stats["Image Sets Found"] += 1

                # Items missing any required asset are counted but not listed
                all_exist, asset_versions = check_asset_availability(item_id, item_type)
                if not all_exist:
                    year_stats["Excluded (Missing Assets)"] += 1
                    continue

                assets_url = f"{BASE_URL}/item-types/{item_type}/items/{item_id}/assets/"

                # One status per asset, in ASSET_KEYS order
                on_disk = already_on_disk.get(item_id, {})
                statuses = []
                for asset_key in ASSET_KEYS:
                    if on_disk.get(asset_key, False):
                        statuses.append("Downloaded")
                        continue
                    version = asset_versions.get(asset_key, "4b")
                    statuses.append(f"Needs Activation {version.upper()}")
                    needed_per_asset[asset_key] += 1

                rows.append(
                    [CURRENT_STUDY_AREA, year, season_name, item_id, item_type, acquired, cloud_cover, assets_url]
                    + statuses
                )
                added_count += 1

                if all(status == "Downloaded" for status in statuses):
                    year_stats["Fully Downloaded"] += 1
                else:
                    year_stats["Needs Download"] += 1

                # Progress line; only reached for items that were added
                if checked_count % 100 == 0:
                    print(f"      Processed {checked_count} items, added {added_count}")

    with open(csv_path, 'w', newline='', encoding='utf-8') as f:
        writer = csv.writer(f, quoting=csv.QUOTE_ALL)
        writer.writerow(headers)
        writer.writerows(rows)

    print(f"\nCSV saved: {csv_path}")
    print(f"Total items checked: {checked_count}")
    print(f"Items added to CSV: {added_count}")

    # Per-year table
    year_row_format = "{:<8} {:>10} {:>25} {:>18} {:>17}"
    print("\nImage Set Summary by Year:")
    print(year_row_format.format("Year", "Found", "Excluded (Missing Assets)", "Downloaded", "Needs Download"))
    for year in sorted(stats_by_year):
        year_stats = stats_by_year[year]
        print(year_row_format.format(
            year,
            year_stats["Image Sets Found"],
            year_stats["Excluded (Missing Assets)"],
            year_stats["Fully Downloaded"],
            year_stats["Needs Download"]
        ))

    # Per-asset table
    asset_row_format = "{:<30} {:>10}"
    print("\nPer-Asset Download Needs:")
    print(asset_row_format.format("Asset Type", "Needs Download"))
    for asset_key, count in needed_per_asset.items():
        print(asset_row_format.format(asset_key, count))

    return csv_path

# =============================================================================
# MAIN FUNCTIONS
# =============================================================================

def process_downloads_from_csv(csv_path: str = None):
    """Run the threaded download pipeline for one queue CSV.

    When ``csv_path`` is ``None`` the file is auto-selected: among the
    ``planet_download_queue_*.csv`` files in ``CURRENT_FOLDER``, the one with
    the largest ``os.path.getctime`` value is used.

    Args:
        csv_path: Path of the queue CSV to process, or ``None`` to auto-select.

    Raises:
        ValueError: If ``csv_path`` is ``None`` and ``CURRENT_FOLDER`` is falsy.

    Returns:
        None. An empty folder or a non-existent CSV is reported with a printed
        message instead of an exception.
    """
    if csv_path is None:
        if not CURRENT_FOLDER:
            raise ValueError("Must set CURRENT_FOLDER or provide csv_path")

        candidates = glob.glob(
            os.path.join(CURRENT_FOLDER, "planet_download_queue_*.csv")
        )
        if not candidates:
            print(f"No CSV files found in {CURRENT_FOLDER}")
            return

        # "Most recent" is decided by filesystem ctime, not by the file name.
        csv_path = max(candidates, key=os.path.getctime)
        print(f"Using most recent CSV: {os.path.basename(csv_path)}")

    if os.path.exists(csv_path):
        ThreadedDownloadProcessor(csv_path).process_downloads()
    else:
        print(f"CSV file not found: {csv_path}")

def process_all_downloads():
    """Run the download pipeline for every queue CSV in ``CURRENT_FOLDER``.

    Every ``planet_download_queue_*.csv`` file found in ``CURRENT_FOLDER`` is
    listed and then handed to ``process_downloads_from_csv`` one at a time, in
    the order ``glob.glob`` returned them.

    A ``KeyboardInterrupt`` raised while a file is being processed stops the
    loop; any other ``Exception`` is printed and the next file is attempted.

    Raises:
        ValueError: If ``CURRENT_FOLDER`` is falsy.

    Returns:
        None.
    """
    if not CURRENT_FOLDER:
        raise ValueError("Must set CURRENT_FOLDER first")

    queue_files = glob.glob(
        os.path.join(CURRENT_FOLDER, "planet_download_queue_*.csv")
    )
    if not queue_files:
        print("No CSV files found. Run generate_download_csv() first.")
        return

    # Announce the full work list before starting on any file.
    display_names = [os.path.basename(path) for path in queue_files]
    print(f"Found {len(queue_files)} CSV files to process:")
    for name in display_names:
        print(f"  {name}")

    banner = "=" * 80
    for path, name in zip(queue_files, display_names):
        print(f"\n{banner}")
        print(f"Processing: {name}")
        print(banner)

        try:
            process_downloads_from_csv(path)
        except KeyboardInterrupt:
            # Let the user abort the whole batch, not just the current file.
            print("User interrupted processing")
            break
        except Exception as e:
            # Best effort: report the failure and move on to the next CSV.
            print(f"Error processing {path}: {e}")

In [2]:
# Active study-area configuration.
# These are plain module-level assignments: names bound at the top level of a
# cell/module are already global, so no `global` statement is required.
# CURRENT_FOLDER is read by process_downloads_from_csv() and
# process_all_downloads() to locate the queue CSVs.

# Set primary folder and AOI configuration
PRIMARY_FOLDER = r"E:\planetscope_lake_ice\Data\Input\YF 50x50 km"
CURRENT_FOLDER = PRIMARY_FOLDER

CURRENT_STUDY_AREA = "YF_50x50"

# Area of interest as a GeoJSON-style polygon; the ring is closed (the first
# vertex is repeated as the last one).
# NOTE(review): this block was previously labelled "YKD BELOW", which does not
# match CURRENT_STUDY_AREA = "YF_50x50" — confirm the coordinates belong to
# the YF study area. Vertex order is presumably [longitude, latitude].
CURRENT_AOI = {
    "type": "Polygon",
    "coordinates": [[
        [-146.332784, 66.458245],
        [-146.343045, 66.906587],
        [-145.201146, 66.906587],
        [-145.211407, 66.458245],
        [-146.332784, 66.458245],
    ]]
}

# Season name -> (start, end) timestamp templates; the "{year}" placeholder is
# presumably filled in per entry of CURRENT_YEARS — verify against the caller.
CURRENT_SEASONS = {
    "Breakup": ("{year}-04-15T00:00:00Z", "{year}-07-15T23:59:59Z"),
}

# Define years to process
CURRENT_YEARS = [2019, 2020, 2021, 2022, 2023, 2024, 2025]

In [None]:
# Step 1: pick the download-queue CSV.
# A previously generated queue file is reused here. To rebuild the queue from
# scratch, assign `csv_path = generate_download_csv()` instead.
print("\n1. Checking existing downloads...")
csv_path = (
    r"E:\planetscope_lake_ice\Data\Input\YF 50x50 km\planet_download_queue_20250919_155439.csv"
)

# Step 2: print 20 blank lines as a visual separator, then run the downloader
# against the chosen queue file.
print("\n" * 20)
print("2. Downloading!")
process_downloads_from_csv(csv_path)


1. Checking existing downloads...





















2. Downloading!
=== Starting download processing: E:\planetscope_lake_ice\Data\Input\YF 50x50 km\planet_download_queue_20250919_155439.csv ===
Phase 1: Requesting Activations
Starting activation for 20280 assets
[Download Queue] Item 20190622_210615_60_1057 all assets ready
[Download Queue] Item 20190430_205549_1105 all assets ready
[Download Queue] Item 20190430_205550_1105 all assets ready
[Download Queue] Item 20190430_205547_1105 all assets ready
[Download Queue] Item 20190430_205543_1105 all assets ready
[Download Queue] Item 20190430_205542_1105 all assets ready
  20/20280 activation requests done...
[Download Queue] Item 20190430_205544_1105 all assets ready
[Download Queue] Item 20190430_205545_1105 all assets ready
[Download Queue] Item 20190430_205548_1105 all assets ready
[Download Queue] Item 20190715_205712_1014 all assets ready
[Download Queue] Item 20190715_210215_1008 all assets ready
[Download Queue] Item 20190714

Test missing links:

In [4]:
import pandas as pd
import requests
from requests.auth import HTTPBasicAuth
import os
import random

# File paths
old_file = r"E:\planetscope_lake_ice\Data\Input\NS 50x50 km\Old Download Logs\planet_download_queue_20250806_220337.csv"
new_file = r"E:\planetscope_lake_ice\Data\Input\NS 50x50 km\good download clean script.csv"


def _read_csv_with_fallback(path):
    """Read a CSV as UTF-8, retrying as Latin-1 if UTF-8 decoding fails."""
    try:
        return pd.read_csv(path, encoding='utf-8')
    except UnicodeDecodeError:
        return pd.read_csv(path, encoding='latin1')


# Load both CSVs with the same encoding fallback, so a non-UTF-8 file is
# handled identically whichever of the two logs it is.
df_old = _read_csv_with_fallback(old_file)
df_new = _read_csv_with_fallback(new_file)

# Column names
link_column = 'assets_url'
season_column = 'season'

# Keep only rows whose season is exactly 'Breakup' (the match is case-sensitive)
df_old_breakup = df_old[df_old[season_column] == 'Breakup']
df_new_breakup = df_new[df_new[season_column] == 'Breakup']

# Extract asset URLs as sets (missing values dropped, everything compared as str)
old_links = set(df_old_breakup[link_column].dropna().astype(str))
new_links = set(df_new_breakup[link_column].dropna().astype(str))

# Links present in the old log but absent from the new one, in sorted order
missing_links = sorted(old_links - new_links)

# Output summary statistics
print(f"\nTotal 'breakup' links in OLD file: {len(old_links)}")
print(f"Total 'breakup' links in NEW file: {len(new_links)}")
print(f"Total missing 'breakup' links: {len(missing_links)}")

# Select up to 20 random missing links (all of them when fewer than 20 exist)
selected_missing_links = random.sample(missing_links, min(20, len(missing_links)))

# Asset inspection function
def list_assets(url, timeout=30):
    """Print the asset names returned by an item's assets endpoint.

    Sends a GET request to ``url`` using HTTP basic auth, with
    ``PLANET_API_KEY`` as the user name and an empty password. On a 200
    response the keys of the returned JSON object are printed one per line;
    on any other status the status code and response body are printed.

    Args:
        url: Assets URL to query.
        timeout: Seconds to wait for the server before giving up. Without a
            timeout ``requests`` can block indefinitely on a stalled
            connection.

    Returns:
        None. All results are printed.
    """
    try:
        response = requests.get(
            url,
            auth=HTTPBasicAuth(PLANET_API_KEY, ''),
            timeout=timeout,
        )
    except requests.RequestException as exc:
        # Report connection problems/timeouts like any other failed lookup so
        # one bad URL does not abort the caller's loop over many links.
        print(f"\nFailed to retrieve assets from {url}")
        print(f"Request error: {exc}")
        return

    if response.status_code == 200:
        assets = response.json()
        print(f"\nAssets for: {url}")
        print("Available assets:")
        for asset_name in assets:
            print(f"- {asset_name}")
    else:
        print(f"\nFailed to retrieve assets from {url}")
        print(f"Status code: {response.status_code}")
        print("Response:", response.text)

# Run for selected links.
# Report the real number of links being checked: fewer than 20 are selected
# when fewer than 20 links are missing.
print(f"\nFetching assets for {len(selected_missing_links)} missing 'breakup' links:")
for url in selected_missing_links:
    list_assets(url)


Total 'breakup' links in OLD file: 3543
Total 'breakup' links in NEW file: 3487
Total missing 'breakup' links: 56

Fetching assets for 20 missing 'breakup' links:

Assets for: https://api.planet.com/data/v1/item-types/PSScene/items/20210707_003601_0f4c/assets/
Available assets:
- basic_udm2
- ortho_udm2
- ortho_visual

Assets for: https://api.planet.com/data/v1/item-types/PSScene/items/20200703_002619_0f02/assets/
Available assets:
- basic_udm2
- ortho_udm2
- ortho_visual

Assets for: https://api.planet.com/data/v1/item-types/PSScene/items/20250711_223353_46_24c6/assets/
Available assets:
- basic_analytic_4b
- basic_analytic_4b_rpc
- basic_analytic_4b_xml
- basic_analytic_8b
- basic_analytic_8b_xml
- basic_udm2
- ortho_analytic_4b
- ortho_analytic_4b_sr
- ortho_analytic_4b_xml
- ortho_analytic_8b
- ortho_analytic_8b_sr
- ortho_analytic_8b_xml
- ortho_udm2
- ortho_visual

Assets for: https://api.planet.com/data/v1/item-types/PSScene/items/20200703_002627_0f02/assets/
Available assets:
