# SAR Data Analysis for Model Development

## 📡 Comprehensive Data Pipeline for Machine Learning Engineers

This notebook provides an end-to-end analysis pipeline for SAR imagery, specifically designed to inform ML engineers developing RT-DETR (Real-Time Detection Transformer) models for object detection in SAR data.

### Key Objectives:
1. **Metadata Extraction & Analysis** - Understanding data characteristics
2. **Image Quality Assessment** - Evaluating data suitability for detection tasks
3. **Statistical Analysis** - Distribution insights for model training
4. **Preprocessing Recommendations** - Optimal data preparation for RT-DETR
5. **Data Augmentation Strategies** - Enhancing model robustness
6. **Performance Metrics** - Establishing baselines and expectations

---

## 1. Environment Setup and Imports

In [None]:
# Standard Library Imports
import os
import json
import yaml
import warnings
import logging
from pathlib import Path
from datetime import datetime
from typing import Dict, List, Tuple, Optional, Union
from collections import Counter, defaultdict

# Scientific Computing
import numpy as np
import pandas as pd
from scipy import stats, signal
from scipy.ndimage import gaussian_filter

# Visualization
import matplotlib.pyplot as plt
import seaborn as sns
from matplotlib.patches import Rectangle, Circle
import plotly.graph_objects as go
import plotly.express as px
from plotly.subplots import make_subplots

# Machine Learning
from sklearn.preprocessing import StandardScaler, MinMaxScaler
from sklearn.decomposition import PCA
from sklearn.cluster import KMeans, DBSCAN
from sklearn.manifold import TSNE
from sklearn.model_selection import train_test_split
from sklearn.metrics import silhouette_score

# Image Processing
from skimage import measure, exposure, filters, morphology
from skimage.feature import peak_local_max, corner_harris
import cv2

# SAR Processing
from sarpy.io.complex import open_complex
from shapely.geometry import shape, Point, Polygon

# Progress Bars
from tqdm.notebook import tqdm

# Custom Modules
from metadata_extractor import SARMetadataExtractor
from metadata_analysis import SARMetadataAnalyzer

# Configuration
# Silence every warning category for the whole session (this also hides
# deprecation warnings raised by the libraries imported above).
warnings.filterwarnings('ignore')
# Root logger at INFO with "timestamp - level - message" formatting.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
# Module-level logger used by the classes defined in later cells.
logger = logging.getLogger(__name__)

# Visualization Settings
# NOTE(review): the 'seaborn-v0_8-*' style names presumably need a recent
# matplotlib release - confirm against the pinned environment.
plt.style.use('seaborn-v0_8-darkgrid')
sns.set_palette('husl')
# Default figure size (inches) and base font size for all matplotlib figures.
plt.rcParams['figure.figsize'] = (14, 8)
plt.rcParams['font.size'] = 11

# Report the library versions and working directory for reproducibility.
print("‚úÖ Environment setup complete!")
print(f"NumPy version: {np.__version__}")
print(f"Pandas version: {pd.__version__}")
print(f"Working directory: {os.getcwd()}")

## 2. Sensor-Specific Configuration and Constants

### SAR Sensor Characteristics:
- **Frequency**: X-band (9.65 GHz)
- **Wavelength**: ~3.1 cm
- **Polarization**: VV (typically)
- **Resolution**: Up to 25 cm (Spotlight mode)
- **Imaging Modes**: Stripmap, Spotlight, ScanSAR

In [None]:
# Sensor-specific constants.
# Nominal radar parameters plus, per imaging mode, the nominal resolution
# (metres) and scene size (kilometres).
_CONFIG = {
    'frequency_ghz': 9.65,
    'wavelength_cm': 3.1,
    'polarization': 'VV',
    'modes': {
        'SPOTLIGHT': {'resolution_m': 0.25, 'scene_size_km': 5},
        'STRIPMAP': {'resolution_m': 3.0, 'scene_size_km': 30},
        'SCAN': {'resolution_m': 15.0, 'scene_size_km': 100}
    },
    'bit_depth': 16,
    'complex_data': True
}

# Model Requirements
# Only 'input_sizes' is read in the visible cells (first entry is the
# preprocessor target size).
# NOTE(review): RT-DETR is usually described as an NMS-free, anchor-free
# detector - confirm whether 'nms_threshold' and 'anchor_sizes' are consumed
# anywhere downstream.
RTDETR_CONFIG = {
    'input_sizes': [640, 800, 1024],  # Common input sizes
    'backbone': 'ResNet50',  # or ResNet101
    'min_object_size': 32,  # Minimum object size in pixels
    'max_objects_per_image': 100,
    'confidence_threshold': 0.5,
    'nms_threshold': 0.45,
    'anchor_sizes': [32, 64, 128, 256, 512]
}

# ML Pipeline Configuration
# 'train_val_test_split' is ordered [train, validation, test] and sums to 1.0.
ML_CONFIG = {
    'train_val_test_split': [0.7, 0.2, 0.1],
    'augmentation_factor': 3,
    'batch_size': 16,
    'learning_rate': 1e-4,
    'epochs': 100,
    'early_stopping_patience': 10
}

print("üì°  and configurations loaded")

## 3. Data Loading and Initial Exploration

In [None]:
# Get data directory
# Both paths are read interactively. An empty answer to the first prompt
# yields Path('') (the current directory).
data_dir = Path(input("Enter path to  SICD/XML files directory: ").strip())
# NOTE(review): pressing Enter here yields Path("."), not a "skipped" marker,
# so later existence checks on label_dir are evaluated against the current
# working directory.
label_dir = Path(input("Enter path to annotation files directory (optional, press Enter to skip): ").strip() or ".")

# Initialize metadata extractor
extractor = SARMetadataExtractor(data_dir)

# Parse metadata
print("\nüîÑ Parsing  metadata...")
metadata = extractor.parse_sicd_metadata()

# Convert to DataFrame for easier analysis
# orient='index' makes each top-level key of `metadata` one row.
# presumably the keys identify individual images - verify against
# SARMetadataExtractor.
df_metadata = pd.DataFrame.from_dict(metadata, orient='index')

print(f"\n‚úÖ Loaded {len(metadata)}  images")
print(f"Metadata columns: {df_metadata.columns.tolist()[:10]}...")
print(f"\nDataFrame shape: {df_metadata.shape}")

# Display sample
df_metadata.head()

## 4. Data Quality Assessment

### Critical metrics for detection performance:
1. **Signal-to-Noise Ratio (SNR)**
2. **Equivalent Number of Looks (ENL)**
3. **Radiometric Resolution**
4. **Spatial Resolution Consistency**

In [None]:
class QualityAnalyzer:
    """Compute per-image and dataset-level quality metrics for SAR imagery.

    Per-image methods take a single array; dataset-level methods read the
    metadata DataFrame passed to the constructor.
    """

    def __init__(self, metadata_df: pd.DataFrame):
        """Store the metadata table.

        Args:
            metadata_df: One row per image. ``NumRows`` / ``NumCols`` columns
                are used when present.
        """
        self.df = metadata_df
        self.quality_metrics = {}

    def calculate_snr(self, image: np.ndarray) -> float:
        """Return a global SNR estimate in dB.

        The estimate is ``20 * log10(mean(|image|) / std(|image|))``, i.e. the
        whole image is treated as signal and its spread as noise.

        Args:
            image: Real or complex image array.

        Returns:
            The estimate in decibels.
        """
        magnitude = np.abs(image)
        # Named *_level so the imported ``scipy.signal`` module is not shadowed.
        signal_level = np.mean(magnitude)
        noise_level = np.std(magnitude)
        return 20 * np.log10(signal_level / (noise_level + 1e-10))

    def calculate_enl(self, image: np.ndarray, window_size: int = 7) -> float:
        """Return the Equivalent Number of Looks of the central window.

        ENL is computed as ``(mean / std) ** 2`` over a window extending
        ``window_size`` pixels either side of the image centre. The window is
        assumed to cover a homogeneous region; this is not verified.

        Args:
            image: 2-D image array.
            window_size: Half-width of the central window in pixels.

        Returns:
            The ENL estimate.
        """
        h, w = image.shape
        # Clamp the start index: a negative start would wrap around and select
        # rows/columns near the far edge instead of the centre.
        row_start = max(h // 2 - window_size, 0)
        col_start = max(w // 2 - window_size, 0)
        center = image[row_start:h // 2 + window_size,
                       col_start:w // 2 + window_size]
        mean_val = np.mean(center)
        std_val = np.std(center)
        return (mean_val / (std_val + 1e-10)) ** 2

    def calculate_edge_density(self, image: np.ndarray) -> float:
        """Return the fraction of pixels marked as edges by Canny (50/150).

        Args:
            image: Image scaled to [0, 1]; values outside that range are
                clipped so the uint8 conversion cannot wrap around.

        Returns:
            Edge pixels divided by total pixels.
        """
        scaled = (np.clip(image, 0, 1) * 255).astype(np.uint8)
        edges = cv2.Canny(scaled, 50, 150)
        return np.sum(edges > 0) / edges.size

    def calculate_contrast(self, image: np.ndarray) -> float:
        """Return RMS contrast (root-mean-square deviation from the mean)."""
        return np.sqrt(np.mean((image - np.mean(image)) ** 2))

    def analyze_resolution_consistency(self) -> Dict:
        """Summarise how many distinct (NumRows, NumCols) sizes occur.

        Rows with a missing ``NumRows`` or ``NumCols`` are ignored.

        Returns:
            Dict with ``unique_resolutions`` (int), ``most_common`` (up to
            three ``((rows, cols), count)`` pairs) and ``consistency_score``
            (``1 / unique_resolutions``; ``0.0`` when no size is available).
        """
        resolution_counts = Counter()
        if 'NumRows' in self.df.columns and 'NumCols' in self.df.columns:
            sizes = self.df[['NumRows', 'NumCols']].dropna()
            resolution_counts.update(sizes.itertuples(index=False, name=None))

        n_unique = len(resolution_counts)
        return {
            'unique_resolutions': n_unique,
            'most_common': resolution_counts.most_common(3),
            # Higher is better; guard against an empty dataset.
            'consistency_score': 1.0 / n_unique if n_unique else 0.0
        }

    def generate_quality_report(self) -> pd.DataFrame:
        """Build the quality report table.

        Returns:
            DataFrame with columns ``metric``, ``value``, ``ml_impact`` and
            ``recommendation``; currently a single resolution-consistency row.
        """
        res_analysis = self.analyze_resolution_consistency()
        score = res_analysis['consistency_score']

        if res_analysis['unique_resolutions'] == 0:
            # No size metadata: report that instead of a misleading score.
            value = 'N/A'
            impact = 'Unknown'
            recommendation = 'No image size metadata available'
        elif score < 0.5:
            value = f"{score:.2f}"
            impact = 'High'
            recommendation = 'Standardize input sizes'
        else:
            value = f"{score:.2f}"
            impact = 'Low'
            recommendation = 'Resolution is consistent'

        report = {
            'metric': ['Resolution Consistency'],
            'value': [value],
            'ml_impact': [impact],
            'recommendation': [recommendation]
        }
        return pd.DataFrame(report)

# Initialize analyzer
# Build the report from the metadata table loaded earlier; only dataset-level
# metadata is used here (no pixel data is read).
quality_analyzer = QualityAnalyzer(df_metadata)
quality_report = quality_analyzer.generate_quality_report()

print("üìä Quality Assessment Report:")
# `display` is the notebook's rich-output helper.
display(quality_report)

## 5. Statistical Distribution Analysis for ML

Understanding data distributions is crucial for:
- **Normalization strategies**
- **Outlier detection**
- **Class imbalance assessment**

In [None]:
# Create comprehensive statistical analysis
# 2x3 grid of panels; each panel is only populated when the metadata table
# actually contains the column(s) it needs.
fig = make_subplots(
    rows=2, cols=3,
    subplot_titles=(
        'Incidence Angle Distribution',
        'Image Size Distribution',
        'Aspect Ratio Distribution',
        'Temporal Distribution',
        'Geospatial Coverage',
        'Mode Distribution'
    ),
    specs=[
        [{'type': 'histogram'}, {'type': 'box'}, {'type': 'histogram'}],
        [{'type': 'scatter'}, {'type': 'scatter'}, {'type': 'bar'}]
    ]
)

# 1. Incidence Angle Distribution
if 'IncidenceAng' in df_metadata.columns:
    angles = df_metadata['IncidenceAng'].dropna()
    fig.add_trace(
        go.Histogram(x=angles, name='Incidence Angle', nbinsx=30),
        row=1, col=1
    )

# 2. Image Size Distribution (megapixels)
if 'NumRows' in df_metadata.columns and 'NumCols' in df_metadata.columns:
    df_metadata['TotalPixels'] = df_metadata['NumRows'] * df_metadata['NumCols']
    fig.add_trace(
        go.Box(y=df_metadata['TotalPixels'].dropna()/1e6, name='Image Size (MP)'),
        row=1, col=2
    )

# 3. Aspect Ratio
if 'AspectRatio' in df_metadata.columns:
    fig.add_trace(
        go.Histogram(x=df_metadata['AspectRatio'].dropna(), name='Aspect Ratio', nbinsx=20),
        row=1, col=3
    )

# 4. Temporal Distribution
# Plot the cumulative number of acquisitions over time from the real
# timestamps. The panel stays empty when no 'DateTime' column exists rather
# than showing synthetic data.
if 'DateTime' in df_metadata.columns:
    dates = pd.to_datetime(df_metadata['DateTime'], errors='coerce').dropna().sort_values()
else:
    dates = pd.Series([], dtype='datetime64[ns]')
if not dates.empty:
    fig.add_trace(
        go.Scatter(x=dates, y=np.arange(1, len(dates) + 1), mode='lines', name='Temporal'),
        row=2, col=1
    )

# 5. Geospatial Coverage
if 'Lat' in df_metadata.columns and 'Lon' in df_metadata.columns:
    # Drop incomplete pairs together so every longitude stays matched with the
    # latitude from the same row.
    located = df_metadata[['Lon', 'Lat']].dropna()
    fig.add_trace(
        go.Scatter(
            x=located['Lon'],
            y=located['Lat'],
            mode='markers',
            marker=dict(size=8, color='blue'),
            name='Coverage'
        ),
        row=2, col=2
    )

# 6. Mode Distribution
if 'ModeType' in df_metadata.columns:
    mode_counts = df_metadata['ModeType'].value_counts()
    fig.add_trace(
        go.Bar(x=mode_counts.index, y=mode_counts.values, name='Mode Count'),
        row=2, col=3
    )

fig.update_layout(height=800, showlegend=False, title_text=" Dataset Statistical Overview")
fig.show()

# Statistical summary
print("\nüìà Key Statistics for ML:")
numeric_cols = df_metadata.select_dtypes(include=[np.number]).columns
# describe() raises on a frame without columns, so fall back to an empty table.
if len(numeric_cols) > 0:
    stats_summary = df_metadata[numeric_cols].describe()
else:
    stats_summary = pd.DataFrame()
display(stats_summary)

## 6. RT-DETR-Specific Data Preparation Analysis

### Key considerations for RT-DETR:
1. **Input size standardization**
2. **Dynamic range optimization**
3. **Object size distribution**
4. **Annotation quality metrics**

In [None]:
class RTDETRPreprocessor:
    """Preprocessing helpers that turn SAR images into detector inputs.

    Provides dB-scale dynamic-range statistics, normalization to [0, 1],
    resizing to a square ``target_size`` and construction of a 3-channel
    input from a single-channel image.
    """

    def __init__(self, target_size: int = 640):
        """Args:
            target_size: Side length in pixels of the square output image.
        """
        self.target_size = target_size
        self.preprocessing_stats = {}

    @staticmethod
    def _to_db(image: np.ndarray) -> np.ndarray:
        """Convert magnitude to decibels (epsilon avoids log of zero)."""
        return 20 * np.log10(np.abs(image) + 1e-10)

    def analyze_dynamic_range(self, image: np.ndarray) -> Dict:
        """Return dB-scale statistics of the image magnitude.

        Args:
            image: Real or complex image array.

        Returns:
            Dict with ``min_db``, ``max_db``, ``mean_db``, ``std_db``,
            ``dynamic_range`` (max - min) and ``optimal_clip_range`` (the 1st
            and 99th percentiles).
        """
        db_image = self._to_db(image)
        min_db = np.min(db_image)
        max_db = np.max(db_image)
        return {
            'min_db': min_db,
            'max_db': max_db,
            'mean_db': np.mean(db_image),
            'std_db': np.std(db_image),
            'dynamic_range': max_db - min_db,
            'optimal_clip_range': (np.percentile(db_image, 1), np.percentile(db_image, 99))
        }

    def normalize_sar_image(self, image: np.ndarray, method: str = 'minmax') -> np.ndarray:
        """Convert the image to dB and map it into [0, 1].

        Args:
            image: Real or complex image array.
            method: ``'minmax'`` (1st-99th percentile stretch), ``'zscore'``
                (+/-3 sigma mapped to [0, 1]) or ``'adaptive'`` (CLAHE). Any
                other value returns the dB image clipped to [0, 1].

        Returns:
            Array with the same shape as ``image`` and values in [0, 1]. A
            flat image yields all zeros for ``'minmax'`` and ``'adaptive'``.
        """
        db_image = self._to_db(image)

        if method == 'minmax':
            vmin, vmax = np.percentile(db_image, [1, 99])
            span = vmax - vmin
            if span <= 0:
                # Flat image: avoid 0/0, which would fill the output with NaN.
                normalized = np.zeros_like(db_image)
            else:
                normalized = (db_image - vmin) / span
        elif method == 'zscore':
            normalized = (db_image - np.mean(db_image)) / (np.std(db_image) + 1e-10)
            normalized = (normalized + 3) / 6  # Map to ~[0, 1]
        elif method == 'adaptive':
            from skimage import exposure
            lo, hi = np.min(db_image), np.max(db_image)
            if hi <= lo:
                normalized = np.zeros_like(db_image)
            else:
                # Rescale to [0, 1] first: raw dB values fall outside the
                # range scikit-image accepts for float images.
                scaled = (db_image - lo) / (hi - lo)
                normalized = exposure.equalize_adapthist(scaled, clip_limit=0.03)
        else:
            # Unknown method: fall through to plain clipping of the dB image.
            normalized = db_image

        return np.clip(normalized, 0, 1)

    def resize_for_rtdetr(self, image: np.ndarray, maintain_aspect: bool = True) -> np.ndarray:
        """Resize the image to ``target_size`` x ``target_size``.

        Args:
            image: 2-D or 3-D (H, W, C) array.
            maintain_aspect: When True, scale the longer side to
                ``target_size`` and zero-pad the bottom/right edges; when
                False, stretch directly to a square.

        Returns:
            The resized (and possibly padded) image.
        """
        h, w = image.shape[:2]

        if not maintain_aspect:
            return cv2.resize(image, (self.target_size, self.target_size),
                              interpolation=cv2.INTER_LINEAR)

        scale = self.target_size / max(h, w)
        # Pin the longer side to target_size exactly (float rounding could
        # otherwise truncate it by one pixel) and keep the shorter side >= 1 so
        # extreme aspect ratios do not produce a zero-sized resize.
        new_h = self.target_size if h >= w else max(1, int(h * scale))
        new_w = self.target_size if w >= h else max(1, int(w * scale))
        resized = cv2.resize(image, (new_w, new_h), interpolation=cv2.INTER_LINEAR)

        # Pad bottom/right with zeros up to a square.
        pad_h = self.target_size - new_h
        pad_w = self.target_size - new_w
        pad_spec = [(0, pad_h), (0, pad_w)]
        if resized.ndim != 2:
            pad_spec.append((0, 0))
        return np.pad(resized, pad_spec, mode='constant')

    def create_3channel_input(self, sar_image: np.ndarray) -> np.ndarray:
        """Stack three derived views of a single-channel SAR image.

        Channels: min-max normalized image, 3x3 mean-filtered (smoothed)
        version, and Laplacian edge response rescaled to [0, 1].

        Args:
            sar_image: Single-channel real or complex image.

        Returns:
            Array of shape ``sar_image.shape + (3,)``.
        """
        from scipy.ndimage import uniform_filter

        # Channel 1: normalized image.
        ch1 = self.normalize_sar_image(sar_image, 'minmax')

        # Channel 2: simple 3x3 mean filter as a speckle-smoothing stand-in
        # (this is a box filter, not an adaptive Lee filter).
        ch2 = uniform_filter(ch1, size=3)

        # Channel 3: Laplacian edge response stretched to [0, 1].
        ch3 = cv2.Laplacian(ch1, cv2.CV_64F)
        edge_span = ch3.max() - ch3.min()
        if edge_span > 0:
            ch3 = np.clip((ch3 - ch3.min()) / edge_span, 0, 1)
        else:
            # No edges at all: avoid 0/0 producing NaN.
            ch3 = np.zeros_like(ch3)

        return np.stack([ch1, ch2, ch3], axis=-1)

# Initialize preprocessor
# Uses the first configured input size as the square target size.
preprocessor = RTDETRPreprocessor(target_size=RTDETR_CONFIG['input_sizes'][0])

# Summarise the pipeline options for the reader; nothing is processed here.
print("üîß Preprocessing Pipeline Initialized")
print(f"Target input size: {preprocessor.target_size}x{preprocessor.target_size}")
print("Normalization methods available: minmax, zscore, adaptive")
print("3-channel strategy: [Original, Despeckled, Edge-enhanced]")

## 7. Object Detection Annotation Analysis

Analyzing annotation quality and distribution for object detection tasks.

In [None]:
class AnnotationAnalyzer:
    """Load GeoJSON-style annotation files and summarise their contents.

    Only annotation documents that are objects with a ``features`` list are
    analysed; other JSON files are loaded but contribute no objects.
    """

    def __init__(self, label_dir: Path):
        """Args:
            label_dir: Directory searched (non-recursively) for ``*.geojson``
                and ``*.json`` files.
        """
        self.label_dir = label_dir
        self.annotations = []
        self.stats = {}

    @staticmethod
    def _iter_features(data):
        """Yield the feature dicts of a GeoJSON-like document, if any."""
        if not isinstance(data, dict):
            return
        for feature in data.get('features') or []:
            if isinstance(feature, dict):
                yield feature

    @staticmethod
    def _class_of(feature) -> str:
        """Return the feature's ``class`` property, or ``'unknown'``.

        GeoJSON allows ``"properties": null``, so a missing or null
        properties object is treated as empty.
        """
        return (feature.get('properties') or {}).get('class', 'unknown')

    def load_annotations(self) -> None:
        """(Re)load every annotation file from ``label_dir``.

        Previously loaded annotations are discarded first so repeated calls
        do not duplicate entries. Files that cannot be read or parsed are
        logged and skipped.
        """
        self.annotations = []
        annotation_files = list(self.label_dir.glob('*.geojson')) + \
                          list(self.label_dir.glob('*.json'))

        for ann_file in tqdm(annotation_files, desc="Loading annotations"):
            try:
                with open(ann_file, encoding='utf-8') as f:
                    data = json.load(f)
            except (OSError, ValueError) as e:
                # ValueError covers JSONDecodeError and UnicodeDecodeError.
                logger.warning(f"Failed to load {ann_file}: {e}")
                continue
            self.annotations.append({
                'filename': ann_file.stem,
                'data': data
            })

    def analyze_object_sizes(self) -> pd.DataFrame:
        """Return one row per annotated geometry with its size statistics.

        Features without a geometry are skipped.

        Returns:
            DataFrame with columns ``area``, ``bounds``, ``type``, ``width``,
            ``height`` and ``aspect_ratio``; empty when nothing was found.
            Values are in the units of the annotation coordinates.
        """
        sizes = []

        for ann in self.annotations:
            for feature in self._iter_features(ann['data']):
                geometry = feature.get('geometry')
                if not geometry:
                    # GeoJSON permits unlocated features ("geometry": null).
                    continue
                geom = shape(geometry)
                bounds = geom.bounds
                sizes.append({
                    'area': geom.area,
                    'bounds': bounds,
                    'type': self._class_of(feature),
                    # bounds is (minx, miny, maxx, maxy); empty for empty geometries.
                    'width': bounds[2] - bounds[0] if bounds else 0,
                    'height': bounds[3] - bounds[1] if bounds else 0
                })

        if not sizes:
            return pd.DataFrame()

        df_sizes = pd.DataFrame(sizes)
        df_sizes['aspect_ratio'] = df_sizes['width'] / (df_sizes['height'] + 1e-6)
        return df_sizes

    def analyze_class_distribution(self) -> Dict:
        """Count objects per class and measure class imbalance.

        Returns:
            Dict with ``class_counts`` (class -> count),
            ``class_distribution`` (class -> fraction), ``imbalance_ratio``
            (largest count / smallest count, ``0`` when there are no objects)
            and ``total_objects``.
        """
        class_counts = Counter(
            self._class_of(feature)
            for ann in self.annotations
            for feature in self._iter_features(ann['data'])
        )

        total = sum(class_counts.values())
        class_dist = {name: count / total for name, count in class_counts.items()}

        if class_counts:
            # Every recorded class has at least one object, so no epsilon is needed.
            imbalance_ratio = max(class_counts.values()) / min(class_counts.values())
        else:
            imbalance_ratio = 0

        return {
            'class_counts': dict(class_counts),
            'class_distribution': class_dist,
            'imbalance_ratio': imbalance_ratio,
            'total_objects': total
        }

    def generate_annotation_report(self) -> pd.DataFrame:
        """Load the annotations and build a summary table.

        Returns:
            DataFrame with columns ``Metric``, ``Value`` and
            ``ML Recommendation``.
        """
        self.load_annotations()

        class_stats = self.analyze_class_distribution()
        size_df = self.analyze_object_sizes()
        has_sizes = not size_df.empty

        # NOTE(review): the size rows are labelled "(pixels)" but report
        # geometry area in annotation coordinate units - confirm the
        # annotations are in pixel space.
        report = {
            'Metric': [
                'Total Annotations',
                'Total Objects',
                'Unique Classes',
                'Class Imbalance Ratio',
                'Avg Objects per Image',
                'Min Object Size (pixels)',
                'Max Object Size (pixels)',
                'Avg Object Size (pixels)'
            ],
            'Value': [
                len(self.annotations),
                class_stats['total_objects'],
                len(class_stats['class_counts']),
                f"{class_stats['imbalance_ratio']:.2f}",
                f"{class_stats['total_objects'] / max(len(self.annotations), 1):.1f}",
                f"{size_df['area'].min():.0f}" if has_sizes else 'N/A',
                f"{size_df['area'].max():.0f}" if has_sizes else 'N/A',
                f"{size_df['area'].mean():.0f}" if has_sizes else 'N/A'
            ],
            'ML Recommendation': [
                'Augment if < 1000 images',
                'Good if > 10000 objects',
                'Consider merging if > 20 classes',
                'Use weighted loss if > 3.0',
                'Good density for detection',
                'Filter if < 32 pixels',
                'Split if > 512 pixels',
                'Typical for SAR objects'
            ]
        }

        return pd.DataFrame(report)

# Run annotation analysis if label directory exists
# NOTE(review): when the annotation prompt is left blank, label_dir is
# Path("."), so this condition is evaluated against the current working
# directory (which is normally non-empty) rather than skipping the analysis.
if label_dir.exists() and any(label_dir.iterdir()):
    ann_analyzer = AnnotationAnalyzer(label_dir)
    # generate_annotation_report() loads the files itself before summarising.
    annotation_report = ann_analyzer.generate_annotation_report()
    
    print("\nüìã Annotation Quality Report:")
    display(annotation_report)
else:
    print("‚ö†Ô∏è No annotation directory found or empty. Skipping annotation analysis.")

## 8. Data Augmentation Strategies for SAR

SAR-specific augmentation techniques that preserve physical properties while increasing dataset diversity.

In [None]:
class SARDataAugmenter:
    """Image-level augmentations for SAR detector training.

    Every method takes an image array and returns a new array. None of the
    methods take or return annotations, so boxes must be transformed
    separately for the geometric augmentations (rotation, flip, elastic).
    """

    def __init__(self, preserve_physics: bool = True):
        """Args:
            preserve_physics: Stored on the instance; not consulted by any
                method in this class.
        """
        self.preserve_physics = preserve_physics
        # Name -> bound method, used by augment_batch() for dispatch.
        self.augmentation_funcs = {
            'speckle_noise': self.add_speckle_noise,
            'rotation': self.rotate_sar,
            'flip': self.flip_sar,
            'intensity_shift': self.shift_intensity,
            'elastic_deformation': self.elastic_transform,
            'shadow_simulation': self.simulate_shadows,
            'multi_look': self.simulate_multilook
        }

    def add_speckle_noise(self, image: np.ndarray, var: float = 0.01) -> np.ndarray:
        """Multiply the image by unit-mean gamma speckle of variance ``var``.

        Args:
            image: Input image.
            var: Variance of the multiplicative noise; ``var <= 0`` returns an
                unmodified copy.

        Returns:
            Noisy image with the same shape and (on average) the same
            brightness as the input.
        """
        if var <= 0:
            return image.copy()
        # Gamma(shape=k, scale=theta) has mean k*theta and variance k*theta^2;
        # k = 1/var, theta = var gives mean 1 and variance var, so the image is
        # perturbed without being darkened.
        noise = np.random.gamma(1.0 / var, var, image.shape)
        return image * noise

    def rotate_sar(self, image: np.ndarray, angle: Optional[float] = None) -> np.ndarray:
        """Rotate the image about its centre with reflected borders.

        Args:
            image: Input image.
            angle: Rotation in degrees; drawn uniformly from [-30, 30] when
                omitted.
        """
        if angle is None:
            angle = np.random.uniform(-30, 30)  # Limited rotation range

        rows, cols = image.shape[:2]
        M = cv2.getRotationMatrix2D((cols/2, rows/2), angle, 1)
        return cv2.warpAffine(image, M, (cols, rows), borderMode=cv2.BORDER_REFLECT)

    def flip_sar(self, image: np.ndarray, direction: str = 'horizontal') -> np.ndarray:
        """Flip the image; any direction other than the two named ones is a no-op.

        Args:
            image: Input image.
            direction: ``'horizontal'`` or ``'vertical'``.
        """
        if direction == 'horizontal':
            return cv2.flip(image, 1)
        if direction == 'vertical':
            return cv2.flip(image, 0)
        return image

    def shift_intensity(self, image: np.ndarray, shift_range: float = 0.1) -> np.ndarray:
        """Add one random offset in ``[-shift_range, shift_range]`` and clip to [0, 1]."""
        shift = np.random.uniform(-shift_range, shift_range)
        return np.clip(image + shift, 0, 1)

    def elastic_transform(self, image: np.ndarray, alpha: float = 20, sigma: float = 3) -> np.ndarray:
        """Warp the image with a smooth random displacement field.

        Args:
            image: 2-D image or 3-D (H, W, C) image; channels share one
                displacement field.
            alpha: Displacement magnitude scale in pixels.
            sigma: Gaussian smoothing applied to the displacement field.

        Returns:
            Warped image with the same shape as the input.
        """
        from scipy.ndimage import map_coordinates

        random_state = np.random.RandomState(None)
        grid_shape = image.shape[:2]

        dx = gaussian_filter((random_state.rand(*grid_shape) * 2 - 1), sigma, mode="constant") * alpha
        dy = gaussian_filter((random_state.rand(*grid_shape) * 2 - 1), sigma, mode="constant") * alpha

        cols, rows = np.meshgrid(np.arange(grid_shape[1]), np.arange(grid_shape[0]))
        coords = np.array([rows + dy, cols + dx])

        if image.ndim == 2:
            return map_coordinates(image, coords, order=1)
        # Multi-channel input: apply the same warp to each channel.
        warped = [map_coordinates(image[..., c], coords, order=1)
                  for c in range(image.shape[2])]
        return np.stack(warped, axis=-1)

    def simulate_shadows(self, image: np.ndarray, num_shadows: int = 3) -> np.ndarray:
        """Darken random elliptical regions by 50% each.

        Args:
            image: 2-D image or 3-D (H, W, C) image.
            num_shadows: Number of ellipses to draw; overlaps compound.
        """
        img_copy = image.copy()
        h, w = image.shape[:2]

        for _ in range(num_shadows):
            # Random shadow parameters (cast to plain ints for OpenCV).
            x = int(np.random.randint(0, w))
            y = int(np.random.randint(0, h))
            shadow_width = int(np.random.randint(10, 50))
            shadow_length = int(np.random.randint(20, 100))
            angle = np.random.uniform(0, 360)

            # Filled ellipse mask: 1 inside the shadow, 0 elsewhere.
            shadow = np.zeros((h, w))
            cv2.ellipse(shadow, (x, y), (shadow_length, shadow_width),
                        angle, 0, 360, 1, -1)

            attenuation = 1 - shadow * 0.5
            if img_copy.ndim == 3:
                # Broadcast the 2-D mask across channels.
                attenuation = attenuation[..., np.newaxis]
            img_copy = img_copy * attenuation

        return img_copy

    def simulate_multilook(self, image: np.ndarray, num_looks: int = 4) -> np.ndarray:
        """Apply the speckle of an image averaged over ``num_looks`` looks.

        Averaging ``num_looks`` independent unit-mean exponential speckle
        fields is distributed as Gamma(num_looks, 1/num_looks): mean 1 and
        variance ``1/num_looks``. The noise is drawn from that distribution
        directly, so brightness is preserved on average.

        Args:
            image: Input image.
            num_looks: Number of looks; larger means less speckle.
        """
        noise = np.random.gamma(num_looks, 1.0 / num_looks, image.shape)
        return image * noise

    def augment_batch(self, images: List[np.ndarray],
                      augmentations: Optional[List[str]] = None) -> List[np.ndarray]:
        """Apply each named augmentation to each image with 50% probability.

        Args:
            images: Images to augment; inputs are copied, not modified.
            augmentations: Keys of ``augmentation_funcs``; all of them when
                omitted. Unknown names raise ``KeyError``.

        Returns:
            One augmented image per input, in the same order.
        """
        if augmentations is None:
            augmentations = list(self.augmentation_funcs.keys())

        augmented = []
        for img in images:
            aug_img = img.copy()
            for aug_name in augmentations:
                if np.random.random() > 0.5:  # 50% chance for each augmentation
                    aug_img = self.augmentation_funcs[aug_name](aug_img)
            augmented.append(aug_img)

        return augmented

# Initialize augmenter
augmenter = SARDataAugmenter(preserve_physics=True)

# Demonstrate augmentation effects
# Only lists the available augmentations and their rationale; no image is
# augmented in this cell.
print("üé® SAR Data Augmentation Pipeline")
print(f"Available augmentations: {list(augmenter.augmentation_funcs.keys())}")
print("\nAugmentation Strategy for:")
print("1. Speckle noise: Simulates different acquisition conditions")
print("2. Rotation: Accounts for different satellite passes")
print("3. Intensity shift: Simulates calibration variations")
print("4. Shadow simulation: Prepares model for urban environments")
print("5. Multi-look: Varies image quality characteristics")

## 9. Training/Validation/Test Split Optimization

In [None]:
class DatasetSplitter:
    """Create train/validation/test partitions of the metadata table.

    Every method returns a dict with ``'train'``, ``'validation'`` and
    ``'test'`` DataFrames that are row subsets of the table passed to the
    constructor.
    """

    def __init__(self, metadata_df: pd.DataFrame):
        """Args:
            metadata_df: One row per image.
        """
        self.df = metadata_df

    @staticmethod
    def _split_indices(indices, test_size: float, strat_labels=None):
        """Split ``indices`` in two, stratifying when possible.

        Falls back to an unstratified split (with a printed warning) when
        scikit-learn rejects the stratification, e.g. because a class has a
        single member.
        """
        if strat_labels is not None:
            try:
                return train_test_split(
                    indices,
                    test_size=test_size,
                    stratify=strat_labels,
                    random_state=42
                )
            except ValueError as err:
                print(f"Warning: stratified split not possible ({err}). Using random split.")
        return train_test_split(indices, test_size=test_size, random_state=42)

    def stratified_split(self, stratify_by: Optional[List[str]] = None,
                         splits: Optional[List[float]] = None) -> Dict:
        """Create a (stratified) random train/val/test split.

        Args:
            stratify_by: Columns whose combined values define the strata.
                Defaults to ``['ModeType']``; pass ``[]`` for a plain random
                split.
            splits: ``[train, validation, test]`` fractions summing to 1.0.
                Defaults to ``[0.7, 0.2, 0.1]``.

        Returns:
            Dict of ``'train'``, ``'validation'`` and ``'test'`` DataFrames.

        Raises:
            ValueError: If ``splits`` does not hold three fractions summing
                to 1.0.
        """
        if stratify_by is None:
            stratify_by = ['ModeType']
        if splits is None:
            splits = [0.7, 0.2, 0.1]

        if len(splits) != 3:
            raise ValueError("Splits must contain three fractions: train, validation, test")
        if abs(sum(splits) - 1.0) >= 1e-6:
            raise ValueError("Splits must sum to 1.0")

        # One label per row, built by joining the stratification columns.
        if stratify_by:
            strat_key = self.df[stratify_by].fillna('unknown').apply(
                lambda x: '_'.join(x.astype(str)), axis=1)
        else:
            strat_key = None

        # First split: train+val vs test
        train_val_idx, test_idx = self._split_indices(
            self.df.index, splits[2], strat_key)

        # Second split: train vs val, with the validation share re-expressed
        # as a fraction of the remaining train+val rows.
        val_size_adjusted = splits[1] / (splits[0] + splits[1])
        remaining_key = strat_key.loc[train_val_idx] if strat_key is not None else None
        train_idx, val_idx = self._split_indices(
            train_val_idx, val_size_adjusted, remaining_key)

        return {
            'train': self.df.loc[train_idx],
            'validation': self.df.loc[val_idx],
            'test': self.df.loc[test_idx]
        }

    def temporal_split(self, date_column: str = 'DateTime') -> Dict:
        """Split chronologically: oldest 70% train, next 20% val, newest 10% test.

        Falls back to an unstratified random split when ``date_column`` is
        missing.
        """
        if date_column not in self.df.columns:
            print(f"Warning: {date_column} not found. Using random split.")
            return self.stratified_split(stratify_by=[])

        sorted_df = self.df.sort_values(date_column)
        n = len(sorted_df)

        train_end = int(n * 0.7)
        val_end = int(n * 0.9)

        return {
            'train': sorted_df.iloc[:train_end],
            'validation': sorted_df.iloc[train_end:val_end],
            'test': sorted_df.iloc[val_end:]
        }

    def spatial_split(self, overlap_threshold: float = 0.1) -> Dict:
        """Split by geographic cluster so nearby scenes share a partition.

        Rows are clustered on ``Lat``/``Lon`` with k-means (at most 10
        clusters) and whole clusters are assigned 70/20/10 to the three
        partitions. Rows lacking a coordinate are left out of every
        partition. With few clusters the validation or test partition can be
        empty.

        Args:
            overlap_threshold: Accepted for interface compatibility; not used.

        Returns:
            Dict of ``'train'``, ``'validation'`` and ``'test'`` DataFrames.
        """
        if 'Lat' not in self.df.columns or 'Lon' not in self.df.columns:
            print("Warning: Geographic coordinates not found. Using random split.")
            return self.stratified_split(stratify_by=[])

        # Positions (not labels) of rows with both coordinates, so the cluster
        # labels can be mapped back onto self.df even after dropping NaNs.
        has_coords = self.df[['Lat', 'Lon']].notna().all(axis=1).to_numpy()
        valid_positions = np.flatnonzero(has_coords)
        if valid_positions.size == 0:
            print("Warning: Geographic coordinates not found. Using random split.")
            return self.stratified_split(stratify_by=[])

        # Cluster locations; k-means needs at least as many samples as clusters.
        coords = self.df[['Lat', 'Lon']].iloc[valid_positions]
        kmeans = KMeans(n_clusters=min(10, len(coords)), random_state=42)
        clusters = kmeans.fit_predict(coords)

        # Assign clusters to splits (seeded, like the other splits in this class).
        unique_clusters = np.unique(clusters)
        np.random.RandomState(42).shuffle(unique_clusters)

        n_clusters = len(unique_clusters)
        train_clusters = unique_clusters[:int(n_clusters * 0.7)]
        val_clusters = unique_clusters[int(n_clusters * 0.7):int(n_clusters * 0.9)]
        test_clusters = unique_clusters[int(n_clusters * 0.9):]

        def rows_in(cluster_ids):
            return self.df.iloc[valid_positions[np.isin(clusters, cluster_ids)]]

        return {
            'train': rows_in(train_clusters),
            'validation': rows_in(val_clusters),
            'test': rows_in(test_clusters)
        }

# Build the splitter over the full metadata table.
splitter = DatasetSplitter(df_metadata)

# Stratify on imaging mode when that column exists, otherwise split without
# any stratification columns.
splits = splitter.stratified_split(
    stratify_by=['ModeType'] if 'ModeType' in df_metadata.columns else []
)

# Tabulate how many rows landed in each partition and their share of the
# whole dataset.
split_stats = pd.DataFrame({
    'Split': ['Train', 'Validation', 'Test'],
    'Count': [len(splits[key]) for key in ('train', 'validation', 'test')],
    'Percentage': [
        f"{len(splits[key])/len(df_metadata)*100:.1f}%"
        for key in ('train', 'validation', 'test')
    ]
})

print("\nüìä Dataset Split Statistics:")
display(split_stats)

# Check that the mode proportions are similar in every partition.
if 'ModeType' in df_metadata.columns:
    print("\nüîç Mode distribution across splits:")
    for split_name, split_df in splits.items():
        mode_dist = split_df['ModeType'].value_counts(normalize=True)
        print(f"\n{split_name.capitalize()}:")
        print(mode_dist.head())

## 10. Performance Benchmarking and Recommendations

In [None]:
class MLRecommendationEngine:
    """Build a table of ML engineering recommendations from dataset metadata.

    Every ``analyze_*`` / ``recommend_*`` method appends zero or more records
    to ``self.recommendations``. Each record is a dict with the keys
    ``category``, ``issue``, ``recommendation`` and ``priority``.
    ``generate_recommendations`` runs all of them and returns a DataFrame.
    """

    # Coefficient of variation (std / mean) above which image dimensions are
    # reported as inconsistent.
    _RESOLUTION_CV_THRESHOLD = 0.2

    def __init__(self, metadata_df: pd.DataFrame, quality_report: Optional[pd.DataFrame] = None):
        """Store the inputs and start with an empty recommendation list.

        Args:
            metadata_df: One row per image; ``NumRows`` / ``NumCols`` columns
                are used when present.
            quality_report: Optional quality table. It is stored on the
                instance but not read by any method of this class.
        """
        self.df = metadata_df
        self.quality_report = quality_report
        self.recommendations: List[Dict[str, str]] = []

    def _add(self, category: str, issue: str, recommendation: str, priority: str) -> None:
        """Append one recommendation record to ``self.recommendations``."""
        self.recommendations.append({
            'category': category,
            'issue': issue,
            'recommendation': recommendation,
            'priority': priority,
        })

    @staticmethod
    def _coefficient_of_variation(values: pd.Series) -> float:
        """Return std / mean of *values*, or 0.0 when the ratio is undefined."""
        # The sample standard deviation needs at least two observations.
        if len(values) < 2:
            return 0.0
        mean = values.mean()
        if mean == 0:
            return 0.0
        return float(values.std() / mean)

    def analyze_dataset_size(self) -> None:
        """Add an augmentation recommendation chosen by the number of images."""
        n_images = len(self.df)

        if n_images < 1000:
            size_label = 'Small'
            advice = 'Apply aggressive augmentation (5-10x)'
            priority = 'High'
        elif n_images < 5000:
            size_label = 'Medium'
            advice = 'Apply moderate augmentation (3-5x)'
            priority = 'Medium'
        else:
            size_label = 'Large'
            advice = 'Focus on hard negative mining'
            priority = 'Low'

        self._add('Dataset Size', f'{size_label} dataset ({n_images} images)', advice, priority)

    def analyze_resolution_variance(self) -> None:
        """Flag the dataset when image dimensions vary strongly between images.

        Does nothing when ``NumRows`` or ``NumCols`` is missing, or when the
        coefficient of variation cannot be computed (fewer than two complete
        rows, or a zero mean).
        """
        dimension_cols = ['NumRows', 'NumCols']
        if any(col not in self.df.columns for col in dimension_cols):
            return

        # Coerce to numbers so stray non-numeric entries become NaN and are
        # dropped together with genuinely missing values.
        resolutions = self.df[dimension_cols].apply(pd.to_numeric, errors='coerce').dropna()
        cv_rows = self._coefficient_of_variation(resolutions['NumRows'])
        cv_cols = self._coefficient_of_variation(resolutions['NumCols'])

        threshold = self._RESOLUTION_CV_THRESHOLD
        if cv_rows > threshold or cv_cols > threshold:
            self._add(
                'Image Resolution',
                'High resolution variance',
                'Use adaptive pooling in model architecture',
                'High',
            )

    def recommend_preprocessing(self) -> None:
        """Add the fixed preprocessing recommendations (speckle, dynamic range)."""
        self._add(
            'Preprocessing',
            'SAR speckle noise',
            'Apply Lee or Frost filter before training',
            'High',
        )
        self._add(
            'Preprocessing',
            'Dynamic range',
            'Use log transformation and percentile clipping',
            'High',
        )

    def recommend_model_config(self) -> None:
        """Add the fixed model configuration recommendations."""
        # The original category literal was truncated to " Config'" (a syntax
        # error); 'Model Config' follows this method's stated purpose.
        self._add(
            'Model Config',
            'SAR grayscale input',
            'Use 3-channel strategy: [Original, Despeckled, Edge]',
            'Medium',
        )
        self._add(
            'Model Config',
            'Small object detection',
            'Add FPN layers and reduce stride',
            'High',
        )

    def generate_recommendations(self) -> pd.DataFrame:
        """Run every analysis and return all recommendations as a DataFrame.

        The recommendation list is rebuilt from scratch on each call, so
        re-running this method does not duplicate rows.

        Returns:
            DataFrame with the columns ``category``, ``issue``,
            ``recommendation`` and ``priority``.
        """
        self.recommendations = []

        self.analyze_dataset_size()
        self.analyze_resolution_variance()
        self.recommend_preprocessing()
        self.recommend_model_config()

        # NOTE(review): the '-Specific' label appears to have lost its prefix;
        # it is kept unchanged because the intended name is not visible here.
        self._add(
            '-Specific',
            'Orbit variations',
            'Include incidence angle as auxiliary input',
            'Medium',
        )
        self._add(
            'Training Strategy',
            'Limited labeled data',
            'Use semi-supervised learning with pseudo-labels',
            'Medium',
        )

        return pd.DataFrame(self.recommendations)

# Generate recommendations from the metadata table and show them colour-coded.
recommender = MLRecommendationEngine(df_metadata)
recommendations_df = recommender.generate_recommendations()

# Background colour per priority; any value that is neither 'High' nor
# 'Medium' gets the green fallback.
priority_colors = {'High': '#ffcccc', 'Medium': '#ffffcc'}
fallback_color = '#ccffcc'

print("\n🎯 ML Engineering Recommendations:")
display(recommendations_df.style.apply(
    lambda column: [
        f"background-color: {priority_colors.get(value, fallback_color)}"
        for value in column
    ],
    subset=['priority']
))

## 11. Export Pipeline Configuration

Generate configuration files for the ML training pipeline.

In [None]:
# Generate the pipeline configuration consumed by the ML training setup.
# Values come from the dataset splits and from the RTDETR_CONFIG / ML_CONFIG
# dictionaries defined earlier in the notebook.
pipeline_config = {
    'dataset': {
        # NOTE(review): the name starts with an underscore, which suggests a
        # dropped prefix; kept unchanged because the intended name is unknown.
        'name': '_SAR_Detection',
        'total_images': len(df_metadata),
        'image_format': 'SICD',
        'annotation_format': 'GeoJSON',
        'splits': {
            'train': len(splits['train']),
            'validation': len(splits['validation']),
            'test': len(splits['test'])
        }
    },
    'preprocessing': {
        'normalization': 'minmax_percentile',
        'input_size': RTDETR_CONFIG['input_sizes'][0],
        'channels': 3,
        'channel_strategy': ['original', 'despeckled', 'edge_enhanced'],
        'filters': ['lee', 'frost'],
        'dynamic_range_clip': [1, 99]  # percentiles
    },
    'augmentation': {
        'enabled': True,
        'factor': ML_CONFIG['augmentation_factor'],
        'techniques': [
            'speckle_noise',
            'rotation',
            'flip',
            'intensity_shift',
            'shadow_simulation'
        ],
        # One probability per entry in 'techniques', in the same order.
        'probabilities': [0.5, 0.3, 0.5, 0.3, 0.2]
    },
    'model': {
        # The original literal was an unterminated string (a syntax error);
        # 'RT-DETR' matches the RTDETR_CONFIG settings used below.
        'architecture': 'RT-DETR',
        'backbone': RTDETR_CONFIG['backbone'],
        'input_size': RTDETR_CONFIG['input_sizes'],
        # Use the annotated class count when the annotation analyzer exists
        # in this session; otherwise fall back to 10.
        'num_classes': len(ann_analyzer.analyze_class_distribution()['class_counts']) if 'ann_analyzer' in locals() else 10,
        'anchor_sizes': RTDETR_CONFIG['anchor_sizes'],
        'confidence_threshold': RTDETR_CONFIG['confidence_threshold'],
        'nms_threshold': RTDETR_CONFIG['nms_threshold']
    },
    'training': {
        'batch_size': ML_CONFIG['batch_size'],
        'learning_rate': ML_CONFIG['learning_rate'],
        'epochs': ML_CONFIG['epochs'],
        'optimizer': 'AdamW',
        'scheduler': 'CosineAnnealingLR',
        'early_stopping': ML_CONFIG['early_stopping_patience'],
        'mixed_precision': True,
        'gradient_clipping': 1.0
    },
    'evaluation': {
        'metrics': ['mAP', 'mAP50', 'mAP75', 'precision', 'recall', 'f1'],
        'iou_thresholds': [0.5, 0.75, 0.9],
        'save_predictions': True,
        'visualization_samples': 50
    },
    'hardware': {
        'gpu': 'recommended',
        'min_gpu_memory': '8GB',
        'recommended_gpu': 'NVIDIA A100 or RTX 4090',
        'num_workers': 4
    },
    'output': {
        'checkpoint_dir': './checkpoints',
        'log_dir': './logs',
        'tensorboard': True,
        'save_best_only': True
    }
}

# Serialize once so the saved file and the printed preview share the same text.
config_yaml = yaml.dump(pipeline_config, default_flow_style=False)

# Save configuration
# NOTE(review): the file name also starts with an underscore (likely a dropped
# prefix); kept unchanged so existing references to the path still resolve.
config_path = Path('_rtdetr_config.yaml')
with open(config_path, 'w', encoding='utf-8') as f:
    f.write(config_yaml)

print(f"\n💾 Pipeline configuration saved to {config_path}")
print("\n📋 Configuration Summary:")
# Only the first 500 characters are shown as a preview.
print(config_yaml[:500] + "...")

## 12. Summary and Next Steps

### 📊 Analysis Complete!

This notebook has provided comprehensive insights for developing models with SAR data.

In [None]:
# Print the final summary report: dataset overview, high-priority
# recommendations, rough training estimates, next steps and generated files.
banner = "=" * 60
print(banner)
print("     SAR ML PIPELINE SUMMARY REPORT")
print(banner)

print("\n📈 Dataset Overview:")
print(f"  • Total Images: {len(df_metadata)}")
# NOTE(review): labelled as a date range, so this presumably expects
# df_metadata to be indexed by acquisition time — confirm.
print(f"  • Date Range: {df_metadata.index.min()} to {df_metadata.index.max()}")
if 'ModeType' in df_metadata.columns:
    print(f"  • Imaging Modes: {df_metadata['ModeType'].nunique()}")
if 'IncidenceAng' in df_metadata.columns:
    incidence = df_metadata['IncidenceAng']
    print(f"  • Incidence Angle Range: {incidence.min():.1f}° - {incidence.max():.1f}°")

print("\n🎯 Key Recommendations:")
high_priority = recommendations_df[recommendations_df['priority'] == 'High']
for _, rec in high_priority.iterrows():
    print(f"  • {rec['recommendation']}")

print("\n⚡ Estimated Training Metrics:")
n_train = len(splits['train'])
print(f"  • Training samples: {n_train}")
print(f"  • After augmentation: ~{n_train * ML_CONFIG['augmentation_factor']}")
# Rule-of-thumb estimate: training samples x epochs / 1000, reported as hours.
print(f"  • Estimated training time: ~{n_train * ML_CONFIG['epochs'] / 1000:.1f} hours (on A100)")
print("  • Expected mAP range: 0.65-0.85 (depending on object complexity)")

print("\n🚀 Next Steps:")
next_steps = (
    "Review and adjust the generated configuration file",
    "Set up the preprocessing pipeline with recommended filters",
    "Implement the 3-channel input strategy",
    "Configure with SAR-specific modifications",
    "Begin training with careful monitoring of convergence",
    "Implement active learning for continuous improvement",
)
for step_number, step in enumerate(next_steps, start=1):
    print(f"  {step_number}. {step}")

print("\n📁 Generated Files:")
print(f"  • Configuration: {config_path}")
print("  • Metadata CSV: exported_metadata.csv")
print("  • This notebook: _sar_ml_analysis.ipynb")

print("\n" + banner)
print("  Analysis complete! Ready for training.")
print(banner)