# AWS S3 Bucket Directory Crawler

This notebook crawls AWS S3 bucket directories, collects the `.tif` files found under a prefix, and saves their nested structure to a JSON file.

## Features:
- Paginated traversal of the S3 bucket structure
- Nested JSON output format
- File metadata collection (size, last modified, storage class)
- Progress reporting for large buckets
- Configurable bucket, prefix, and output file

## 1. Setup and Dependencies

In [1]:
# Install required packages if needed
import sys
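# pandas and fsspec are imported below; fsspec needs s3fs to open "s3" file systems
!{sys.executable} -m pip install boto3 tqdm pandas fsspec s3fs --quiet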

In [2]:
import boto3
import json
from datetime import datetime
from collections import defaultdict
import os
from typing import Dict, List, Any, Optional
from tqdm.notebook import tqdm
import time
from botocore.exceptions import NoCredentialsError, ClientError
import pandas as pd
import fsspec

print("Libraries imported successfully!")
print(f"Boto3 version: {boto3.__version__}")

Libraries imported successfully!
Boto3 version: 1.37.3


## 2. AWS Configuration

Make sure your AWS credentials are configured. You can set them using:
- Environment variables (AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY)
- AWS CLI configuration (~/.aws/credentials)
- IAM role (if running on EC2/Lambda)
- Or configure them directly below (not recommended for production)
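
Before creating a client, it can help to see which of the sources above look configured on the current machine. The following is a minimal sketch using only the standard library; `find_credential_sources` is a hypothetical helper, not part of boto3, and it cannot detect an IAM role, which is resolved at request time on EC2/Lambda.

```python
import os
from pathlib import Path


def find_credential_sources(env=None, home=None):
    """Return the names of credential sources that appear to be configured.

    Hypothetical helper: it only inspects environment variables and the
    shared credentials file, not IAM roles.
    """
    env = os.environ if env is None else env
    home = Path.home() if home is None else Path(home)
    sources = []

    # Both variables must be set for environment-based credentials to work
    if env.get("AWS_ACCESS_KEY_ID") and env.get("AWS_SECRET_ACCESS_KEY"):
        sources.append("environment variables")

    # The AWS CLI stores shared credentials in ~/.aws/credentials
    if (home / ".aws" / "credentials").is_file():
        sources.append("shared credentials file")

    return sources


print(find_credential_sources() or "No environment or file credentials found")
```

An empty result does not prove that access will fail, because an attached IAM role may still supply credentials.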

## 3. Initialize the file system for reading

In [3]:
# Set anon=True only for public buckets. nasa-disasters is a protected bucket,
# so credentials are required and anon is set to False.
anon = False

In [4]:
# Initialize the file system
fs_read = fsspec.filesystem("s3", anon=anon, skip_instance_cache=False)

In [5]:
# List the top-level entries in the nasa-disasters bucket
s3_path = "s3://nasa-disasters/*"

all_dirs = sorted(["s3://" + f for f in fs_read.glob(s3_path)])
print(f"Discovered {len(all_dirs)} directories from {s3_path}")
all_dirs

Discovered 14 directories from s3://nasa-disasters/*


['s3://nasa-disasters/browseui',
 's3://nasa-disasters/california_wildfires_202501',
 's3://nasa-disasters/disasters',
 's3://nasa-disasters/drcs_activations',
 's3://nasa-disasters/event_testing',
 's3://nasa-disasters/nrt',
 's3://nasa-disasters/scripts',
 's3://nasa-disasters/servir-esi',
 's3://nasa-disasters/sport-lis',
 's3://nasa-disasters/testing-access',
 's3://nasa-disasters/tmp-cog-speed-test',
 's3://nasa-disasters/tmp-cog-speed-test-preserved',
 's3://nasa-disasters/tmp-vsm-soil-moisture',
 's3://nasa-disasters/us_svrwx_202504']

In [6]:
# List the entries within the drcs_activations directory
s3_path = "s3://nasa-disasters/drcs_activations/*"

all_dirs = sorted(["s3://" + f for f in fs_read.glob(s3_path)])
print(f"Discovered {len(all_dirs)} directories from {s3_path}")
all_dirs

Discovered 43 directories from s3://nasa-disasters/drcs_activations/*


['s3://nasa-disasters/drcs_activations/2020',
 's3://nasa-disasters/drcs_activations/2021',
 's3://nasa-disasters/drcs_activations/2022',
 's3://nasa-disasters/drcs_activations/2023',
 's3://nasa-disasters/drcs_activations/202301_Flood_CA',
 's3://nasa-disasters/drcs_activations/202302_Earthquake_Turkiye',
 's3://nasa-disasters/drcs_activations/202305_Typhoon_Mawar',
 's3://nasa-disasters/drcs_activations/20230719_SevereWx_NC',
 's3://nasa-disasters/drcs_activations/202307_Fire_Greece',
 's3://nasa-disasters/drcs_activations/202307_Flood_VT',
 's3://nasa-disasters/drcs_activations/202308_Hurricane_Hilary',
 's3://nasa-disasters/drcs_activations/202309_Earthquake_Morocco',
 's3://nasa-disasters/drcs_activations/202309_Hurricane_Idalia',
 's3://nasa-disasters/drcs_activations/202310_Hurricane_Otis',
 's3://nasa-disasters/drcs_activations/202312_Flood_NewEngland',
 's3://nasa-disasters/drcs_activations/2024',
 's3://nasa-disasters/drcs_activations/202401_SevereWx_SoutheastUS',
 's3://nasa

## 3.5 Initialize AWS S3 Client

Create the S3 client for the crawler to use.

In [None]:
# Create S3 client using default credentials or environment variables
try:
    # Create S3 client - will use AWS credentials from:
    # 1. Environment variables (AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY)
    # 2. AWS CLI configuration (~/.aws/credentials)
    # 3. IAM role (if running on EC2/Lambda)
    s3_client = boto3.client('s3')
    
    # Try to test the connection with a simple operation
    # Note: We don't need ListBuckets permission, just access to the specific bucket
    try:
        # Try to list buckets (this might fail due to permissions)
        response = s3_client.list_buckets()
        print(f"✅ S3 client initialized successfully")
        print(f"   Found {len(response.get('Buckets', []))} accessible buckets")
    except ClientError as e:
        # If ListBuckets fails, that's OK - we might still have access to specific buckets
        if e.response['Error']['Code'] == 'AccessDenied':
            print(f"⚠️ S3 client initialized but cannot list all buckets (this is OK)")
            print(f"   Will attempt to access specific bucket: nasa-disasters")
            # Test access to the specific bucket we need
            try:
                s3_client.head_bucket(Bucket='nasa-disasters')
                print(f"✅ Confirmed access to nasa-disasters bucket")
            except ClientError as bucket_error:
                print(f"❌ Cannot access nasa-disasters bucket: {bucket_error}")
        else:
            raise  # Re-raise unexpected errors with the original traceback
    
except NoCredentialsError:
    print("❌ No AWS credentials found. Please configure credentials using:")
    print("   - Environment variables (AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY)")
    print("   - AWS CLI: aws configure")
    print("   - IAM role (if on EC2)")
    s3_client = None
except Exception as e:
    print(f"❌ Error initializing S3 client: {e}")
    s3_client = None

## 4. S3 Bucket Crawler Implementation

In [8]:
class S3BucketCrawler:
    """
    Crawls S3 bucket structure and creates a nested dictionary of .tif files.
    """
    
    def __init__(self, bucket_name: str, s3_client=None):
        """
        Initialize the crawler.
        
        Args:
            bucket_name: Name of the S3 bucket
            s3_client: Boto3 S3 client (creates new one if not provided)
        """
        self.bucket_name = bucket_name.replace('s3://', '').rstrip('/')
        self.s3_client = s3_client or boto3.client('s3')
        self.total_files = 0
        self.total_size = 0
        self.total_directories = set()
        
    def build_nested_structure(self, file_list: list) -> Dict:
        """
        Convert flat S3 paths to nested dictionary structure.
        
        Args:
            file_list: List of dictionaries containing file information
            
        Returns:
            Nested dictionary representing directory structure
        """
        root = {}
        
        for file_info in file_list:
            path_parts = file_info['key'].split('/')
            current_level = root  # Dict holding the directory nodes at this depth
            parent = root         # Node whose '_files' list receives the file
            
            # Navigate/create the directory structure
            for i, part in enumerate(path_parts[:-1]):
                if part not in current_level:
                    current_level[part] = {
                        '_type': 'directory',
                        '_path': '/'.join(path_parts[:i+1]) + '/',
                        '_files': [],
                        '_subdirs': {}
                    }
                parent = current_level[part]
                current_level = parent['_subdirs']
            
            # Add the file to its directory node (or to the root for top-level keys).
            # Appending to the '_subdirs' dict would leave every directory's '_files' empty.
            file_name = path_parts[-1]
            parent.setdefault('_files', []).append({
                'name': file_name,
                'path': file_info['key'],
                'size_bytes': file_info['size'],
                'size_readable': self._format_size(file_info['size']),
                'last_modified': file_info['last_modified'],
                'storage_class': file_info.get('storage_class', 'STANDARD')
            })
        
        return root
    
    def crawl(self, 
              prefix: str = '', 
              show_progress: bool = True) -> Dict:
        """
        Crawl the bucket and return nested structure of .tif files only.
        
        Args:
            prefix: Start from this prefix (subdirectory)
            show_progress: Show progress bar
            
        Returns:
            Nested dictionary representing bucket structure with .tif files
        """
        print(f"🔍 Starting crawl of s3://{self.bucket_name}/{prefix}")
        print("📌 Filtering for .tif files only")
        
        # Reset statistics
        self.total_files = 0
        self.total_size = 0
        self.total_directories.clear()
        
        # Collect all .tif files
        all_tif_files = []
        
        # Use paginator for large buckets
        paginator = self.s3_client.get_paginator('list_objects_v2')
        page_iterator = paginator.paginate(
            Bucket=self.bucket_name,
            Prefix=prefix
        )
        
        # Process pages
        print("Scanning bucket...")
        for page in page_iterator:
            if 'Contents' in page:
                for obj in page['Contents']:
                    key = obj['Key']
                    
                    # Filter for .tif files only
                    if key.lower().endswith('.tif'):
                        self.total_files += 1
                        self.total_size += obj.get('Size', 0)
                        
                        # Track directories
                        dir_path = '/'.join(key.split('/')[:-1])
                        if dir_path:
                            self.total_directories.add(dir_path)
                        
                        # Add file info
                        all_tif_files.append({
                            'key': key,
                            'size': obj.get('Size', 0),
                            'last_modified': obj.get('LastModified').isoformat() if obj.get('LastModified') else None,
                            'storage_class': obj.get('StorageClass', 'STANDARD')
                        })
                        
                        # Show progress
                        if show_progress and self.total_files % 100 == 0:
                            print(f"  Found {self.total_files} .tif files...", end="\r")
        
        print(f"\n✅ Found {self.total_files} .tif files")
        print(f"📁 Across {len(self.total_directories)} directories")
        print(f"💾 Total size: {self._format_size(self.total_size)}")
        
        # Build nested structure
        structure = self.build_nested_structure(all_tif_files)
        
        # Create final result with metadata
        result = {
            "_metadata": {
                "bucket": self.bucket_name,
                "prefix": prefix,
                "crawled_at": datetime.now().isoformat(),
                "file_filter": ".tif",
                "total_files": self.total_files,
                "total_directories": len(self.total_directories),
                "total_size_bytes": self.total_size,
                "total_size_readable": self._format_size(self.total_size)
            },
            "structure": structure
        }
        
        return result
    
    def _format_size(self, size_bytes: int) -> str:
        """Format bytes to human-readable size."""
        for unit in ['B', 'KB', 'MB', 'GB', 'TB']:
            if size_bytes < 1024.0:
                return f"{size_bytes:.2f} {unit}"
            size_bytes /= 1024.0
        return f"{size_bytes:.2f} PB"

print("✅ S3BucketCrawler class defined successfully!")

✅ S3BucketCrawler class defined successfully!
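
To make the flat-to-nested conversion easier to follow, here is a simplified, self-contained sketch of the same idea. It assumes a reduced layout (plain nested dicts with a `_files` list of names) rather than the `_type`/`_path`/`_subdirs` nodes used by `S3BucketCrawler`, and the function name `nest_keys` and the sample keys are hypothetical.

```python
def nest_keys(keys):
    """Build a nested dict of directories with a '_files' list at each level."""
    root = {}
    for key in keys:
        # Everything before the last '/' is the directory path
        *dirs, name = key.split("/")
        node = root
        for part in dirs:
            # Create the directory on first sight, then descend into it
            node = node.setdefault(part, {})
        node.setdefault("_files", []).append(name)
    return root


# Hypothetical keys, shaped like the ones returned by list_objects_v2
sample_keys = [
    "drcs_activations/2023/flood/a.tif",
    "drcs_activations/2023/flood/b.tif",
    "drcs_activations/2024/fire.tif",
]
tree = nest_keys(sample_keys)
print(tree)
```

Because S3 has no real directories, the hierarchy comes entirely from splitting each key on `/`.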


In [20]:
class S3DisastersCrawler:
    """
    Simplified crawler for drcs_activations directory that creates clean nested structure.
    """
    
    def __init__(self, bucket_name: str, s3_client=None):
        """
        Initialize the crawler.
        
        Args:
            bucket_name: Name of the S3 bucket
            s3_client: Boto3 S3 client (creates new one if not provided)
        """
        self.bucket_name = bucket_name.replace('s3://', '').rstrip('/')
        self.s3_client = s3_client or boto3.client('s3')
        self.total_files = 0
        self.activation_events = set()
        
    def build_clean_structure(self, file_list: list, prefix: str) -> Dict:
        """
        Build a clean nested structure organized by activation events.
        
        Args:
            file_list: List of S3 keys
            prefix: The prefix to remove (e.g., 'drcs_activations/')
            
        Returns:
            Nested dictionary with activation events and their files
        """
        structure = {}
        
        for key in file_list:
            # Remove the prefix to get relative path
            relative_path = key.replace(prefix, '', 1) if key.startswith(prefix) else key
            parts = relative_path.split('/')
            
            # Skip if not enough parts
            if len(parts) < 2:
                continue
            
            # First part is the activation event (e.g., '202301_Flood_CA')
            activation_event = parts[0]
            self.activation_events.add(activation_event)
            
            # Initialize activation event if not exists
            if activation_event not in structure:
                structure[activation_event] = {}
            
            # Build nested structure for remaining parts
            current_level = structure[activation_event]
            
            # Navigate through subdirectories
            for part in parts[1:-1]:
                if part not in current_level:
                    current_level[part] = {}
                # Check if current_level[part] is a list (files), if so convert to dict
                if isinstance(current_level[part], list):
                    current_level[part] = {'_files': current_level[part]}
                current_level = current_level[part]
            
            # Add the file
            filename = parts[-1]
            if '_files' not in current_level:
                current_level['_files'] = []
            current_level['_files'].append(filename)
        
        return structure
    
    def crawl_drcs_activations(self, show_progress: bool = True) -> Dict:
        """
        Crawl only the drcs_activations directory for .tif files.
        
        Args:
            show_progress: Show progress during crawl
            
        Returns:
            Clean nested dictionary of activation events and their .tif files
        """
        prefix = 'drcs_activations/'
        print(f"🔍 Crawling s3://{self.bucket_name}/{prefix}")
        print("📌 Filtering for .tif files in activation events")
        
        # Reset counters
        self.total_files = 0
        self.activation_events.clear()
        
        # Collect all .tif files
        tif_files = []
        
        # Use paginator for large buckets
        paginator = self.s3_client.get_paginator('list_objects_v2')
        page_iterator = paginator.paginate(
            Bucket=self.bucket_name,
            Prefix=prefix
        )
        
        # Process pages
        print("Scanning activation events...")
        for page in page_iterator:
            if 'Contents' in page:
                for obj in page['Contents']:
                    key = obj['Key']
                    
                    # Filter for .tif files only
                    if key.lower().endswith('.tif'):
                        tif_files.append(key)
                        self.total_files += 1
                        
                        # Show progress
                        if show_progress and self.total_files % 100 == 0:
                            print(f"  Found {self.total_files} .tif files...", end="\r")
        
        print(f"\n✅ Found {self.total_files} .tif files")
        
        # Build clean structure
        structure = self.build_clean_structure(tif_files, prefix)
        
        print(f"📁 Across {len(self.activation_events)} activation events")
        
        # Create result with drcs_activations as root
        result = {
            "drcs_activations": structure,
            "_metadata": {
                "bucket": self.bucket_name,
                "crawled_at": datetime.now().isoformat(),
                "total_tif_files": self.total_files,
                "total_activation_events": len(self.activation_events),
                "activation_events": sorted(list(self.activation_events))
            }
        }
        
        return result

print("✅ S3DisastersCrawler class defined successfully!")

✅ S3DisastersCrawler class defined successfully!
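
The grouping rule used by `build_clean_structure` (the first path component after the prefix is the activation event, and keys directly under the prefix are skipped) can be checked without touching S3. The sketch below is an illustration under that assumption; `count_by_event` and the file names are hypothetical, while the event names come from the directory listing earlier in this notebook.

```python
from collections import Counter


def count_by_event(keys, prefix="drcs_activations/"):
    """Count files per activation event, mirroring the crawler's grouping rule."""
    counts = Counter()
    for key in keys:
        if not key.startswith(prefix):
            continue
        parts = key[len(prefix):].split("/")
        # A key with a single part sits directly under the prefix: no event folder
        if len(parts) < 2:
            continue
        counts[parts[0]] += 1
    return dict(counts)


sample_keys = [
    "drcs_activations/202301_Flood_CA/sentinel/a.tif",
    "drcs_activations/202301_Flood_CA/b.tif",
    "drcs_activations/202307_Fire_Greece/c.tif",
    "drcs_activations/loose_file.tif",  # skipped: not inside an event folder
]
print(count_by_event(sample_keys))
```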


In [22]:
def save_to_json(data: Dict, filename: str, indent: int = 2) -> str:
    """
    Save dictionary to JSON file.
    
    Args:
        data: Dictionary to save
        filename: Output filename
        indent: JSON indentation (None for compact)
        
    Returns:
        Path to saved file
    """
    filepath = os.path.abspath(filename)
    
    with open(filepath, 'w', encoding='utf-8') as f:
        json.dump(data, f, indent=indent, default=str)
    
    file_size = os.path.getsize(filepath)
    print(f"✅ Saved to: {filepath}")
    print(f"📊 File size: {file_size:,} bytes")
    
    return filepath

def print_structure_preview(structure: Dict, max_depth: int = 3, current_depth: int = 0, prefix: str = ""):
    """
    Print a tree-like preview of the structure.
    
    Args:
        structure: Nested dictionary structure
        max_depth: Maximum depth to display
        current_depth: Current recursion depth
        prefix: Prefix for tree display
    """
    if current_depth > max_depth:
        return
    
    # Directories are dict values; keys starting with '_' hold metadata
    entries = [('dir', k, v) for k, v in structure.items()
               if not k.startswith('_') and isinstance(v, dict)]
    # Files at this level are listed under '_files' (dicts from the crawler, or plain names)
    for f in structure.get('_files', []):
        name = f.get('name', '') if isinstance(f, dict) else str(f)
        entries.append(('file', name, f))
    
    for i, (kind, key, value) in enumerate(entries[:20]):  # Limit to 20 items per level
        is_last = i == len(entries) - 1
        branch = '└── ' if is_last else '├── '
        
        if kind == 'file':
            size = value.get('size_readable', '') if isinstance(value, dict) else ''
            print(f"{prefix}{branch}📄 {key} ({size})" if size else f"{prefix}{branch}📄 {key}")
            continue
        
        print(f"{prefix}{branch}📁 {key}/")
        new_prefix = prefix + ("    " if is_last else "│   ")
        if value.get('_type') == 'directory':
            # Crawler nodes keep child directories in '_subdirs' and files in '_files'
            children = dict(value.get('_subdirs', {}))
            if value.get('_files'):
                children['_files'] = value['_files']
        else:
            children = value
        print_structure_preview(children, max_depth, current_depth + 1, new_prefix)
    
    if len(entries) > 20:
        print(f"{prefix}... and {len(entries) - 20} more items")

def get_statistics(structure: Dict) -> pd.DataFrame:
    """
    Generate statistics from crawled structure.
    
    Args:
        structure: Crawled structure dictionary
        
    Returns:
        DataFrame with statistics
    """
    stats = []
    
    def analyze_recursive(obj):
        # File entries are dicts stored in a '_files' list
        for file_info in obj.get('_files', []):
            if not isinstance(file_info, dict):
                continue
            name = file_info.get('name', '')
            stats.append({
                'type': 'file',
                'path': file_info.get('path', ''),
                'name': name,
                'size_bytes': file_info.get('size_bytes', 0),
                'extension': os.path.splitext(name)[1]
            })
        
        for key, value in obj.items():
            if key.startswith('_') or not isinstance(value, dict):
                continue
            
            if value.get('_type') == 'directory':
                stats.append({
                    'type': 'directory',
                    'path': value.get('_path', ''),
                    'name': key,
                    'size_bytes': 0,
                    'extension': ''
                })
                # Visit the directory's own files, then its child directories
                analyze_recursive({'_files': value.get('_files', [])})
                analyze_recursive(value.get('_subdirs', {}))
    
    if 'structure' in structure:
        analyze_recursive(structure['structure'])
    else:
        analyze_recursive(structure)
    
    df = pd.DataFrame(stats)
    
    if not df.empty:
        # Add summary
        print("\n📊 Statistics:")
        print(f"Total files: {len(df[df['type'] == 'file'])}")
        print(f"Total directories: {len(df[df['type'] == 'directory'])}")
        print(f"Total size: {df['size_bytes'].sum():,} bytes")
        
        # File extensions
        if len(df[df['type'] == 'file']) > 0:
            ext_counts = df[df['type'] == 'file']['extension'].value_counts().head(10)
            print("\nTop file extensions:")
            print(ext_counts)
    
    return df

print("✅ Helper functions defined successfully!")

✅ Helper functions defined successfully!
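
`save_to_json` passes `default=str` to `json.dump` so that values the encoder does not know, such as `datetime` objects, are written as strings instead of raising `TypeError`. The short sketch below shows that behavior with `json.dumps`; the record contents are hypothetical.

```python
import json
from datetime import datetime, timezone

record = {
    "name": "example.tif",  # hypothetical file name
    "last_modified": datetime(2025, 1, 15, 12, 30, tzinfo=timezone.utc),
}

# default=str is called for any object json cannot serialize natively
pretty = json.dumps(record, indent=2, default=str)
compact = json.dumps(record, indent=None, default=str)

print(pretty)
print(compact)
```

With `indent=None` the output stays on one line but still has spaces after `,` and `:`. Passing `separators=(",", ":")` as well would remove them if file size matters.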


In [23]:
# Configuration
BUCKET_NAME = "nasa-disasters"  # Change this to your target bucket
PREFIX = "drcs_activations/"     # Leave empty for entire bucket, or specify a path like "drcs_activations/"
OUTPUT_FILE = "s3_tif_files_structure.json"  # Output filename

print(f"Configuration:")
print(f"  Bucket: s3://{BUCKET_NAME}/{PREFIX}")
print(f"  File filter: .tif files only")
print(f"  Output file: {OUTPUT_FILE}")

Configuration:
  Bucket: s3://nasa-disasters/drcs_activations/
  File filter: .tif files only
  Output file: s3_tif_files_structure.json


In [24]:
# Initialize crawler
if s3_client is None:
    print("❌ S3 client not initialized. Please check your AWS credentials above.")
else:
    crawler = S3BucketCrawler(BUCKET_NAME, s3_client)
    
    # Crawl the bucket
    try:
        print("🚀 Starting crawl... This may take a while for large buckets.\n")
        
        start_time = time.time()
        
        # Perform the crawl (now filters for .tif files only)
        result = crawler.crawl(
            prefix=PREFIX,
            show_progress=True
        )
        
        elapsed_time = time.time() - start_time
        
        print(f"\n✅ Crawl completed in {elapsed_time:.2f} seconds")
        
    except ClientError as e:
        if e.response['Error']['Code'] == 'NoSuchBucket':
            print(f"❌ Bucket '{BUCKET_NAME}' does not exist or you don't have access.")
        elif e.response['Error']['Code'] == 'AccessDenied':
            print(f"❌ Access denied to bucket '{BUCKET_NAME}'. Check your permissions.")
        else:
            print(f"❌ AWS Error: {e}")
    except Exception as e:
        print(f"❌ Error during crawl: {e}")

❌ S3 client not initialized. Please check your AWS credentials above.


## 6. Preview Structure

In [26]:
# Display tree preview
if 'result' in locals():
    print("\n🌳 Directory Structure Preview (max depth 3):\n")
    print(f"s3://{BUCKET_NAME}/")
    print_structure_preview(result['structure'], max_depth=3)
else:
    print("No results to display. Run the crawl first.")

No results to display. Run the crawl first.


## 7. Save to JSON File

In [27]:
# Save the complete structure to JSON
if 'result' in locals():
    filepath = save_to_json(result, OUTPUT_FILE)
    print(f"\n📝 Structure saved to: {filepath}")
    
    # Option to save compact version (no indentation, smaller file)
    compact_file = OUTPUT_FILE.replace('.json', '_compact.json')
    compact_path = save_to_json(result, compact_file, indent=None)
    print(f"📝 Compact version saved to: {compact_path}")
else:
    print("No results to save. Run the crawl first.")

No results to save. Run the crawl first.


## 8. Generate Statistics

In [28]:
# Generate and display statistics
if 'result' in locals():
    df_stats = get_statistics(result)
    
    # Save statistics to CSV
    if not df_stats.empty:
        stats_file = OUTPUT_FILE.replace('.json', '_statistics.csv')
        df_stats.to_csv(stats_file, index=False)
        print(f"\n📊 Statistics saved to: {stats_file}")
        
        # Show sample
        print("\nSample of files:")
        display(df_stats[df_stats['type'] == 'file'].head())
else:
    print("No results to analyze. Run the crawl first.")

No results to analyze. Run the crawl first.


## 9. Advanced Usage Examples

In [16]:
# Example 1: Crawl with specific file extensions only
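def crawl_specific_files(bucket_name, extensions, prefix=""):
    """
    List the keys whose names end with one of the given extensions.
    
    S3BucketCrawler.crawl() accepts only `prefix` and `show_progress` and always
    filters for .tif files, so this helper pages through the bucket itself.
    """
    client = s3_client or boto3.client('s3')
    # str.endswith accepts a tuple, so one call checks every extension
    suffixes = tuple(ext.lower() for ext in extensions)
    matches = []
    
    paginator = client.get_paginator('list_objects_v2')
    for page in paginator.paginate(Bucket=bucket_name, Prefix=prefix):
        for obj in page.get('Contents', []):
            if obj['Key'].lower().endswith(suffixes):
                matches.append(obj['Key'])
    
    return matches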

# Example: Find only GeoTIFF and NetCDF files
# geospatial_result = crawl_specific_files("nasa-disasters", ['.tif', '.tiff', '.nc', '.nc4'])
print("Function defined: crawl_specific_files()")

Function defined: crawl_specific_files()
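
The extension filter itself can be tried offline on a plain list of keys. This is a minimal sketch of case-insensitive suffix matching; `filter_by_extension` and the sample keys are hypothetical and are not part of the crawler classes.

```python
def filter_by_extension(keys, extensions):
    """Keep only the keys that end with one of the extensions (case-insensitive)."""
    # str.endswith accepts a tuple of suffixes
    suffixes = tuple(ext.lower() for ext in extensions)
    return [key for key in keys if key.lower().endswith(suffixes)]


sample_keys = [
    "nrt/scene_001.TIF",
    "nrt/scene_002.tiff",
    "sport-lis/model_output.nc",
    "scripts/process.py",
]
geospatial = filter_by_extension(sample_keys, [".tif", ".tiff", ".nc", ".nc4"])
print(geospatial)
```

Lowercasing both sides means `.TIF` and `.tif` are treated the same, which matches the `key.lower().endswith('.tif')` check used in the crawlers.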


In [17]:
# Example 2: Search for specific patterns in the structure
def search_structure(structure, pattern):
    """
    Search for files/directories matching a pattern.
    """
    matches = []
    
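    def search_recursive(obj, current_path=""):
        # Check the file entries stored in this level's '_files' list
        for file_info in obj.get('_files', []):
            is_entry = isinstance(file_info, dict)
            name = file_info.get('name', '') if is_entry else str(file_info)
            if pattern.lower() in name.lower():
                matches.append({
                    'name': name,
                    'path': file_info.get('path', current_path + name) if is_entry else current_path + name,
                    'type': 'file'
                })
        
        for key, value in obj.items():
            if key.startswith('_') or not isinstance(value, dict):
                continue
            
            path = value.get('_path', current_path + key + '/')
            if pattern.lower() in key.lower():
                matches.append({'name': key, 'path': path, 'type': value.get('_type', 'unknown')})
            
            if value.get('_type') == 'directory':
                # Crawler nodes keep files in '_files' and child directories in '_subdirs'
                search_recursive({'_files': value.get('_files', [])}, path)
                search_recursive(value.get('_subdirs', {}), path)
            else:
                search_recursive(value, path)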
    
    if 'structure' in structure:
        search_recursive(structure['structure'])
    else:
        search_recursive(structure)
    
    return matches

# Example usage:
if 'result' in locals():
    # Search for items containing "flood"
    flood_items = search_structure(result, "flood")
    print(f"Found {len(flood_items)} items containing 'flood':")
    for item in flood_items[:5]:  # Show first 5
        print(f"  {item['type']}: {item['name']}")
else:
    print("Run the main crawl first to search.")

Run the main crawl first to search.


In [18]:
# Example 3: Create a flat list of all file paths
def get_all_file_paths(structure):
    """
    Extract all file paths from nested structure.
    """
    file_paths = []
    
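    def extract_recursive(obj):
        # File entries live in '_files' lists; each entry stores its full key in 'path'
        for file_info in obj.get('_files', []):
            if isinstance(file_info, dict):
                file_paths.append(file_info.get('path', file_info.get('name', '')))
        
        for key, value in obj.items():
            if key.startswith('_') or not isinstance(value, dict):
                continue
            
            if value.get('_type') == 'directory':
                # Visit the directory's own files, then its child directories
                extract_recursive({'_files': value.get('_files', [])})
                extract_recursive(value.get('_subdirs', {}))
            else:
                extract_recursive(value)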
    
    if 'structure' in structure:
        extract_recursive(structure['structure'])
    else:
        extract_recursive(structure)
    
    return file_paths

# Example usage:
if 'result' in locals():
    all_files = get_all_file_paths(result)
    print(f"\nTotal files in bucket: {len(all_files)}")
    print("\nFirst 10 file paths:")
    for path in all_files[:10]:
        print(f"  s3://{BUCKET_NAME}/{path}")
    
    # Save file list
    file_list_path = OUTPUT_FILE.replace('.json', '_file_list.txt')
    with open(file_list_path, 'w') as f:
        for path in all_files:
            f.write(f"s3://{BUCKET_NAME}/{path}\n")
    print(f"\n📄 File list saved to: {file_list_path}")
else:
    print("Run the main crawl first to extract file paths.")

Run the main crawl first to extract file paths.


## 10. Load and Explore Saved JSON

In [None]:
# Load previously saved JSON file
def load_json_structure(filename):
    """
    Load a previously saved JSON structure.
    """
    with open(filename, 'r', encoding='utf-8') as f:
        data = json.load(f)
    
    print(f"✅ Loaded structure from: {filename}")
    
    if '_metadata' in data:
        meta = data['_metadata']
        print(f"\nMetadata:")
        print(f"  Bucket: {meta.get('bucket')}")
        print(f"  Crawled at: {meta.get('crawled_at')}")
        print(f"  Total files: {meta.get('total_files')}")
        print(f"  Total directories: {meta.get('total_directories')}")
        print(f"  Total size: {meta.get('total_size_readable')}")
    
    return data

# Load the saved file
# Make sure OUTPUT_FILE is defined, or use a default
if 'OUTPUT_FILE' not in locals():
    OUTPUT_FILE = "s3_tif_files_structure.json"
    
if os.path.exists(OUTPUT_FILE):
    loaded_data = load_json_structure(OUTPUT_FILE)
else:
    print(f"File {OUTPUT_FILE} not found. Run the crawl first.")

## 11. DRCS Activations - Focused Disaster Event Crawling

Use the S3DisastersCrawler for a simplified, organized view of disaster activation events.

In [None]:
# Configuration for DRCS Activations
DRCS_OUTPUT_FILE = "drcs_activations_tif_files.json"

print(f"Configuration:")
print(f"  Bucket: s3://{BUCKET_NAME}/drcs_activations/")
print(f"  File filter: .tif files only")
print(f"  Output file: {DRCS_OUTPUT_FILE}")

if s3_client is None:
    print("\n❌ S3 client not initialized. Please check your AWS credentials above.")
else:
    # Initialize the disasters crawler
    disasters_crawler = S3DisastersCrawler(BUCKET_NAME, s3_client)
    
    # Crawl drcs_activations
    print("\n🚀 Starting focused crawl of drcs_activations...\n")
    
    start_time = time.time()
    
    # Perform the crawl
    drcs_result = disasters_crawler.crawl_drcs_activations(show_progress=True)
    
    elapsed_time = time.time() - start_time
    
    print(f"\n✅ Crawl completed in {elapsed_time:.2f} seconds")
    
    # Show sample activation events
    if '_metadata' in drcs_result:
        events = drcs_result['_metadata']['activation_events'][:10]
        print(f"\nSample activation events:")
        for event in events:
            print(f"  - {event}")
        if len(drcs_result['_metadata']['activation_events']) > 10:
            print(f"  ... and {len(drcs_result['_metadata']['activation_events']) - 10} more")

In [None]:
# Preview the structure
if 'drcs_result' in locals():
    print("📊 Structure Preview:\n")
    
    # Show first 3 activation events and their structure
    events = list(drcs_result['drcs_activations'].keys())[:3]
    
    for event in events:
        print(f"📁 {event}/")
        event_data = drcs_result['drcs_activations'][event]
        
        # Show subdirectories and file counts
        for subdir, content in event_data.items():
            if isinstance(content, dict):
                if '_files' in content:
                    print(f"  └── {subdir}/ ({len(content['_files'])} .tif files)")
                else:
                    # Count nested files
                    total = sum(len(v['_files']) if isinstance(v, dict) and '_files' in v else 0 
                               for v in content.values())
                    print(f"  └── {subdir}/ ({total} .tif files in subdirs)")
            elif isinstance(content, list):
                print(f"  └── {len(content)} .tif files directly in {event}/")
        print()
else:
    print("No DRCS results to preview. Run the crawler first.")

In [None]:
# Save the DRCS structure
if 'drcs_result' in locals():
    filepath = save_to_json(drcs_result, DRCS_OUTPUT_FILE)
    print(f"\n📝 DRCS structure saved to: {filepath}")
    
    # Also save a compact version
    compact_file = DRCS_OUTPUT_FILE.replace('.json', '_compact.json')
    compact_path = save_to_json(drcs_result, compact_file, indent=None)
    print(f"📝 Compact version saved to: {compact_path}")
else:
    print("No DRCS results to save. Run the crawler first.")

In [None]:
# Access example - show how to work with the DRCS data
if 'drcs_result' in locals():
    print("🔍 Access Examples:\n")
    
    # Get all activation events
    all_events = list(drcs_result['drcs_activations'].keys())
    print(f"Total activation events: {len(all_events)}")
    
    # Access files for a specific event
    if all_events:
        sample_event = all_events[0]
        print(f"\nAccessing files for '{sample_event}':")
        
        event_data = drcs_result['drcs_activations'][sample_event]
        
        # Count total files in this event
        def count_files(obj):
            total = 0
            if isinstance(obj, dict):
                if '_files' in obj:
                    total += len(obj['_files'])
                for value in obj.values():
                    if isinstance(value, dict):
                        total += count_files(value)
            return total
        
        total_files = count_files(event_data)
        print(f"  Total .tif files: {total_files}")
        
        # Show how to iterate through all files
        def get_all_files(obj, prefix=''):
            files = []
            if isinstance(obj, dict):
                if '_files' in obj:
                    for f in obj['_files']:
                        files.append(prefix + f)
                for key, value in obj.items():
                    if key != '_files' and isinstance(value, dict):
                        files.extend(get_all_files(value, prefix + key + '/'))
            return files
        
        all_files = get_all_files(event_data, f'{sample_event}/')
        print("\n  First 5 file paths:")
        for f in all_files[:5]:
            print(f"    s3://nasa-disasters/drcs_activations/{f}")
else:
    print("No DRCS results available. Run the crawler first.")

## 12. Split DRCS Data by Year

Automatically split the crawled DRCS activation data into separate files for each year (2020-2025).
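
The grouping described above depends on event names starting with a four-digit year. A minimal sketch of that idea, assuming names such as `2020` or `202301_Flood_CA`; `group_events_by_year` and its `first`/`last` parameters are hypothetical names used only for illustration, and the sketch groups names without touching the nested file data:

```python
import re
from collections import defaultdict
from typing import Dict, Iterable, List

# Assumption: an event name carries its year as the first four characters.
YEAR_PREFIX = re.compile(r"^([0-9]{4})")

def group_events_by_year(
    event_names: Iterable[str], first: int = 2020, last: int = 2025
) -> Dict[int, List[str]]:
    """Group event names by leading year, ignoring names outside [first, last]."""
    groups: Dict[int, List[str]] = defaultdict(list)
    for name in event_names:
        match = YEAR_PREFIX.match(name)
        if match is None:
            continue  # no leading year, e.g. "browseui"
        year = int(match.group(1))
        if first <= year <= last:
            groups[year].append(name)
    return dict(groups)

# Hypothetical names, for illustration only.
names = ["2020", "202301_Flood_CA", "20230719_SevereWx_NC", "browseui"]
grouped = group_events_by_year(names)
print(grouped)
```

Passing a wider `first`/`last` range is one way to avoid editing the code when a new year of activations appears.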

In [None]:
def extract_year_from_event_name(event_name):
    """Extract the year (2020-2025) from an event name, or return None."""
    # Handles plain year folders (e.g., "2020") and year-prefixed names
    # (e.g., "202301_Flood_CA" or "20230719_SevereWx_NC").
    prefix = event_name[:4]
    if len(prefix) == 4 and prefix.isdigit():
        year = int(prefix)
        if 2020 <= year <= 2025:
            return year

    return None

def count_files_recursive(data):
    """Count total .tif files in a nested structure."""
    total = 0
    
    if isinstance(data, dict):
        if '_files' in data:
            total += len(data['_files'])
        for key, value in data.items():
            if key != '_files' and isinstance(value, dict):
                total += count_files_recursive(value)
    
    return total

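def split_drcs_by_year(drcs_result):
    """
    Group DRCS activation events by year (2020-2025).

    Contents of plain year folders (e.g., "2020") are merged into that year;
    events whose names carry no recognized year are skipped. No files are
    written here.

    Args:
        drcs_result: The crawled DRCS data dictionary

    Returns:
        Dictionary mapping each year to a dict of {event name: event data}
    """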
    # Initialize year-based dictionaries
    years_data = {year: {} for year in range(2020, 2026)}
    
    # Process each activation event
    drcs_data = drcs_result.get('drcs_activations', {})
    
    for event_name, event_data in drcs_data.items():
        year = extract_year_from_event_name(event_name)
        
        if year and year in years_data:
            # If it's a plain year folder (e.g., "2020"), merge its contents
            if event_name == str(year):
                # This is a year folder, add all its contents
                for sub_event, sub_data in event_data.items():
                    years_data[year][sub_event] = sub_data
            else:
                # This is a named event (e.g., "202301_Flood_CA")
                years_data[year][event_name] = event_data
    
    return years_data

print("✅ Year-splitting functions defined successfully!")

In [None]:
# Automatically split DRCS data by year after saving
if 'drcs_result' in locals():
    print("\n📅 Splitting DRCS data by year (2020-2025)...\n")
    
    # Split the data by year
    years_data = split_drcs_by_year(drcs_result)
    
    # Create separate JSON files for each year
    for year in range(2020, 2026):
        output_file = f'drcs_activations_{year}.json'
        
        # Count statistics
        event_count = len(years_data[year])
        total_files = sum(count_files_recursive(event_data) 
                         for event_data in years_data[year].values())
        
        # Create output structure
        output_data = {
            "drcs_activations": years_data[year],
            "_metadata": {
                "year": year,
                "extracted_from": DRCS_OUTPUT_FILE,
                "created_at": datetime.now().isoformat(),
                "total_events": event_count,
                "total_tif_files": total_files,
                "events": sorted(years_data[year])
            }
        }
        
        # Save to file
        with open(output_file, 'w') as f:
            json.dump(output_data, f, indent=2)
        
        print(f"✅ Created {output_file}")
        print(f"   - Events: {event_count}")
        print(f"   - Total .tif files: {total_files}")
        if event_count > 0:
            sample_events = list(years_data[year].keys())[:3]
            if event_count > 3:
                sample_events.append(f"... and {event_count - 3} more")
            print(f"   - Sample events: {sample_events}")
        print()
    
    print("📊 Year-based splitting complete!")
else:
    print("⚠️ No DRCS data available. Run the DRCS crawler first.")
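
Once the per-year files exist, it can be worth checking that the counts stored in `_metadata` agree with the data itself. The following is a minimal sketch under stated assumptions: it writes a small hypothetical sample to a temporary directory instead of reading the real output, and `count_tifs` is an illustrative helper, not one of the notebook's functions.

```python
import json
import tempfile
from pathlib import Path

def count_tifs(node) -> int:
    """Count the entries of every '_files' list in a nested dict."""
    if not isinstance(node, dict):
        return 0
    total = len(node.get("_files", []))
    for key, value in node.items():
        if key != "_files":
            total += count_tifs(value)
    return total

# Hypothetical sample mirroring the per-year layout, for illustration only.
sample = {
    "drcs_activations": {
        "202301_Flood_CA": {"sentinel1": {"_files": ["a.tif", "b.tif"]}},
        "20230719_SevereWx_NC": {"_files": ["c.tif"]},
    },
    "_metadata": {"year": 2023, "total_events": 2, "total_tif_files": 3},
}

with tempfile.TemporaryDirectory() as tmp:
    path = Path(tmp) / "drcs_activations_2023.json"
    path.write_text(json.dumps(sample, indent=2), encoding="utf-8")
    loaded = json.loads(path.read_text(encoding="utf-8"))

# Recompute both counts from the data and compare with the stored metadata.
recounted = count_tifs(loaded["drcs_activations"])
events_match = len(loaded["drcs_activations"]) == loaded["_metadata"]["total_events"]
files_match = recounted == loaded["_metadata"]["total_tif_files"]
print(f"events match: {events_match}, files match: {files_match}")
```

To check real output, point `path` at one of the generated `drcs_activations_<year>.json` files and skip the write step.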

## Summary

This notebook crawls AWS S3 bucket structures, saves them as nested JSON, and splits the DRCS activation data into one file per year.

### Key Features:
- ✅ Recursive traversal of S3 buckets
- ✅ Nested JSON output preserving directory structure
- ✅ File metadata collection (size, last modified, storage class)
- ✅ Configurable depth limits and file filtering
- ✅ Progress tracking for large buckets
- ✅ Statistics generation and analysis
- ✅ Multiple output formats (structured JSON, file list, statistics CSV)

### Next Steps:
1. Modify `BUCKET_NAME` to your target bucket
2. Adjust `PREFIX` to focus on specific directories
3. Set `MAX_DEPTH` to limit traversal depth
4. Use file extension filters for specific file types
5. Analyze the generated JSON for your specific use case
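
As an illustration of steps 4 and 5, an extension filter can also be applied after the crawl, directly on the saved structure. This sketch assumes each directory stores its file names as strings under a `_files` key, as in the DRCS output; `filter_by_extension` is a hypothetical helper rather than part of the crawler.

```python
from typing import Any, Dict, Tuple

def filter_by_extension(node: Dict[str, Any], extensions: Tuple[str, ...]) -> Dict[str, Any]:
    """Copy a nested listing, keeping only files that end with one of `extensions`.

    `extensions` should be lowercase, e.g. (".tif",).
    """
    kept: Dict[str, Any] = {}
    for key, value in node.items():
        if key == "_files":
            # Case-insensitive match; str.endswith accepts a tuple of suffixes.
            files = [name for name in value if name.lower().endswith(extensions)]
            if files:
                kept["_files"] = files
        elif isinstance(value, dict):
            child = filter_by_extension(value, extensions)
            if child:  # drop directories left empty by the filter
                kept[key] = child
    return kept

# Hypothetical listing, for illustration only.
listing = {
    "event_a": {"_files": ["x.tif", "readme.txt"], "sub": {"_files": ["y.TIF"]}},
    "event_b": {"_files": ["notes.md"]},
}
tif_only = filter_by_extension(listing, (".tif",))
print(tif_only)
```

Filtering the saved JSON avoids a second crawl when only the file type of interest changes.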

### Tips:
- For very large buckets, consider using prefixes to crawl in sections
- The compact JSON format saves space for large structures
- Use the statistics CSV for data analysis and reporting
- The file list output is useful for batch processing operations
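
The space saving from the compact format comes from dropping indentation and separator whitespace. The sketch below compares the two with the standard `json` module on a hypothetical sample; whether `save_to_json` also tightens the separators is not shown in this section, so treat `separators=(",", ":")` as an assumption about how a compact file could be written.

```python
import json

# Hypothetical listing, for illustration only.
sample = {
    "drcs_activations": {
        "202301_Flood_CA": {"sentinel1": {"_files": ["a.tif", "b.tif"]}},
    }
}

# Indented output is easier to read; the compact form drops all optional whitespace.
pretty = json.dumps(sample, indent=2)
compact = json.dumps(sample, separators=(",", ":"))

print(f"pretty: {len(pretty)} chars, compact: {len(compact)} chars")

# Both encodings decode to the same structure.
round_trip_equal = json.loads(pretty) == json.loads(compact)
print(f"same data after decoding: {round_trip_equal}")
```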