In [None]:
# Enable IPython's autoreload extension so that edits to imported modules are
# picked up without restarting the kernel. Mode 2 asks IPython to reload all
# modules before executing each cell.
%load_ext autoreload
%autoreload 2

In [None]:
from pathlib import Path
from typing import Dict, Any, Optional, Union
import joblib
import psutil
import os
import numpy as np

# Make the parent of the current working directory importable. The entry is
# appended only when missing so that re-running the cell does not pile up
# duplicate sys.path entries.
import sys

cwd = Path.cwd()
sarpyx_path = cwd.parent
if str(sarpyx_path) not in sys.path:
    sys.path.append(str(sarpyx_path))

# Import after adding path to avoid import errors
try:
    from sarpyx.processor.core.focus import CoarseRDA
    # Announce the import only while no data has been loaded yet. CoarseRDA is
    # always bound once the import above succeeds, so 'raw_data' is the only
    # name whose absence can make this message appear.
    if 'raw_data' not in globals():
        print('‚úÖ Successfully imported CoarseRDA')
except ImportError as e:
    # Report the problem, then re-raise so the cell stops with the traceback.
    print(f'‚ùå Import error: {e}')
    raise
    
def get_processed_data_paths(base_dir: Path, burst_idx: int) -> Dict[str, Path]:
    """Locate the echo, metadata, and ephemeris files for one burst.

    The files are searched (non-recursively) in ``base_dir / 'processed_data'``.
    When several files match a pattern, the smallest path in sort order is
    returned so that repeated calls always select the same file.

    Args:
        base_dir (Path): The base directory containing the 'processed_data' folder.
            A plain string path is accepted as well.
        burst_idx (int): Burst index embedded in the echo and metadata file
            names (``*_burst_{burst_idx}_echo.pkl`` and
            ``*_burst_{burst_idx}_metadata.pkl``). The ephemeris file name
            carries no burst index.

    Returns:
        Dict[str, Path]: Dictionary with keys 'echo', 'metadata', and 'ephemeris' and their corresponding file paths.

    Raises:
        AssertionError: If the 'processed_data' directory or any of the
            required files is not found.
    """
    processed_data_dir = Path(base_dir) / 'processed_data'
    # Raised explicitly (instead of using ``assert``) so the validation is
    # still performed when Python runs with -O.
    if not processed_data_dir.exists():
        raise AssertionError(f'Processed data directory not found: {processed_data_dir}')

    # Glob pattern for each required file kind, in the order they are checked.
    patterns = {
        'echo': f'*_burst_{burst_idx}_echo.pkl',
        'metadata': f'*_burst_{burst_idx}_metadata.pkl',
        'ephemeris': '*_ephemeris.pkl',  # Ephemeris files don't have burst index
    }

    paths: Dict[str, Path] = {}
    for kind, pattern in patterns.items():
        # glob() yields matches in arbitrary order; min() makes the pick stable.
        match = min(processed_data_dir.glob(pattern), default=None)
        if match is None:
            raise AssertionError(f'{kind.capitalize()} file not found in processed_data directory.')
        paths[kind] = match

    return paths

# Only load data if not already loaded
# The presence of 'raw_data' in the notebook namespace acts as the cache flag,
# so re-running this cell skips the joblib loads.
if 'raw_data' not in globals():
    print('üìÅ Loading processed data files...')
    # Resolve the three input files for burst index 1 under cwd.parent/processed_data.
    paths = get_processed_data_paths(cwd.parent, 1)
    echo_path = paths['echo']
    metadata_path = paths['metadata']
    ephemeris_path = paths['ephemeris']

    # NOTE(review): joblib.load unpickles its input; only point this at files
    # from a trusted source.
    echo = joblib.load(echo_path)
    metadata = joblib.load(metadata_path)
    ephemeris = joblib.load(ephemeris_path)

    # Bundle the loaded objects under the keys handed to CoarseRDA below.
    raw_data = {
        'echo': echo,
        'metadata': metadata,
        'ephemeris': ephemeris,
    }

    print('‚úÖ Loaded data successfully:')
    # NOTE(review): assumes all three loaded objects expose `.shape` - confirm
    # for the metadata and ephemeris payloads.
    print(f'  Echo shape: {echo.shape}')
    print(f'  Metadata shape: {metadata.shape}')
    print(f'  Ephemeris shape: {ephemeris.shape}')
else:
    print('‚úÖ Data already loaded and available in workspace')
# Initialize the focuser
# This runs on every execution of the cell, whether the data was freshly
# loaded or reused from the namespace.
try:
    focuser = CoarseRDA(raw_data=raw_data,
                        verbose=True,
                        backend='numpy',
                    )
    print('‚úÖ Focuser initialized successfully!')
    print(f'  Radar data shape: {focuser.radar_data.shape}')
    print(f'  Backend: {focuser._backend}')
except Exception as e:
    # Report the failure, then re-raise so the cell stops with the traceback.
    print(f'‚ùå Error initializing focuser: {e}')
    raise

# SAR Focusing API

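This section provides a minimal API for focusing the SAR product with the CoarseRDA processor.
The API includes functions for:
- Focusing SAR products (`focus_sar_product`)
- Inspecting processor metadata (`get_focusing_metadata`)
- Displaying results and analyzing focusing quality (`display_focusing_results`, `analyze_focusing_quality`)
- Memory management (no helper for this is defined in this section yet)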

In [None]:
def focus_sar_product(
    focuser: CoarseRDA, 
    save_path: Optional[Union[str, Path]] = None
) -> np.ndarray:
    """Run the focuser's ``data_focus`` step and optionally persist the result.

    Progress is reported on stdout. If anything fails while focusing or
    saving, the shapes of the focuser's available intermediate arrays are
    printed before the exception is re-raised.

    Args:
        focuser (CoarseRDA): Initialized CoarseRDA processor instance.
        save_path (Optional[Union[str, Path]]): Where to save the focused
            data via ``focuser.save_file``. Nothing is saved when falsy.

    Returns:
        np.ndarray: ``focuser.radar_data`` as it stands after focusing.

    Raises:
        AssertionError: If ``focuser`` is not a CoarseRDA instance or has no
            radar data.
        Exception: Any error raised while focusing or saving is re-raised.
    """
    assert isinstance(focuser, CoarseRDA), 'focuser must be a CoarseRDA instance'
    assert hasattr(focuser, 'radar_data'), 'Focuser must be initialized with radar data'
    assert focuser.radar_data is not None, 'Radar data cannot be None'

    print('üöÄ Starting SAR product focusing...')
    print(f'Input data shape: {focuser.radar_data.shape}')
    print(f'Backend: {focuser._backend}')

    # Intermediate arrays whose shapes are worth reporting after a failure,
    # as (attribute name, label) pairs in reporting order.
    debug_arrays = (
        ('effective_velocities', 'Effective velocities'),
        ('az_freq_vals', 'Azimuth freq vals'),
        ('D', 'D array'),
        ('slant_range_vec', 'Slant range vec'),
    )

    try:
        focuser.data_focus()

        if save_path:
            # Normalise string paths to Path before handing them to the focuser.
            if isinstance(save_path, str):
                save_path = Path(save_path)
            focuser.save_file(save_path)
            print(f'‚úÖ Focused data saved to: {save_path}')

        print('‚úÖ SAR focusing completed successfully!')
        return focuser.radar_data

    except Exception as exc:
        print(f'‚ùå Error during focusing: {exc!s}')
        print('üìä Debug info:')
        print(f'  Radar data shape: {focuser.radar_data.shape}')
        # Only attributes that exist on the focuser are reported.
        for attr_name, label in debug_arrays:
            if hasattr(focuser, attr_name):
                print(f'  {label} shape: {getattr(focuser, attr_name).shape}')
        raise


def get_focusing_metadata(focuser: CoarseRDA) -> Dict[str, Any]:
    """Collect a snapshot of the processor's configuration and data sizes.

    Args:
        focuser (CoarseRDA): Initialized CoarseRDA processor instance.

    Returns:
        Dict[str, Any]: Always contains 'backend', 'verbose', 'data_shape',
            'range_sample_freq', 'replica_length', 'azimuth_lines' and
            'range_lines'. 'range_sample_freq' and 'replica_length' are None
            when the focuser lacks the underlying attribute. 'wavelength',
            'pri', 'c' and 'az_sample_freq' are included only when present
            on the focuser.
    """
    snapshot: Dict[str, Any] = dict(
        backend=focuser._backend,
        verbose=focuser._verbose,
        data_shape=focuser.radar_data.shape,
        range_sample_freq=getattr(focuser, 'range_sample_freq', None),
        replica_length=getattr(focuser, 'replica_len', None),
        azimuth_lines=focuser.len_az_line,
        range_lines=focuser.len_range_line,
    )

    # Attributes that may not exist on the focuser; copy those that do.
    snapshot.update(
        (name, getattr(focuser, name))
        for name in ('wavelength', 'pri', 'c', 'az_sample_freq')
        if hasattr(focuser, name)
    )

    return snapshot


# Run the focusing process
print('üìÅ Setting up output path...')
# The focused product is written next to the input files, under
# cwd.parent/processed_data.
focused_data_path = cwd.parent / 'processed_data' / 'focused_sar_data.pkl'
focused_data_path.parent.mkdir(exist_ok=True)  # Ensure directory exists

# Print the processor state before focusing so it can be compared with the
# state reported after focusing.
print('üìä Processor metadata before focusing:')
pre_focus_metadata = get_focusing_metadata(focuser)
for key, value in pre_focus_metadata.items():
    print(f'  {key}: {value}')

# Focus the data and save it; the returned array is kept in the notebook
# namespace for the cells below.
focused_radar_data = focus_sar_product(focuser, focused_data_path)

In [None]:
def display_focusing_results(radar_data: np.ndarray, metadata: Optional[Dict[str, Any]] = None) -> None:
    """Print a report describing a focused radar data array.

    The report covers basic array facts (shape, dtype, size in MB), summary
    statistics (amplitude and phase for complex input, raw values otherwise),
    counts of NaN, infinite and zero samples, and the supplied metadata.

    Args:
        radar_data (np.ndarray): Focused radar data array.
        metadata (Optional[Dict[str, Any]]): Key/value pairs to list at the
            end of the report. Skipped when None or empty.
    """
    rule = '=' * 50
    is_complex = np.iscomplexobj(radar_data)

    def share(count):
        # Percentage of all samples represented by `count`.
        return count / radar_data.size * 100

    print(rule)
    print('üìà SAR FOCUSING RESULTS')
    print(rule)

    # --- Basic array facts ---
    print('üìä Data Information:')
    print(f'  Shape: {radar_data.shape}')
    print(f'  Data type: {radar_data.dtype}')
    print(f'  Size: {radar_data.nbytes / (1024**2):.2f} MB')
    print(f'  Complex data: {is_complex}')

    # --- Summary statistics ---
    print('\nüìà Statistical Analysis:')
    if is_complex:
        samples, noun = np.abs(radar_data), 'amplitude'
        phases = np.angle(radar_data)
    else:
        samples, noun, phases = radar_data, 'value', None

    # Each reduction is evaluated right before its line is printed.
    for label, reduction in (('Max', 'max'), ('Mean', 'mean'), ('Min', 'min'), ('Std', 'std')):
        print(f'  {label} {noun}: {getattr(samples, reduction)():.6f}')
    if phases is not None:
        print(f'  Phase range: [{phases.min():.3f}, {phases.max():.3f}] rad')

    # --- Sanity checks on the sample values ---
    print('\nüîç Data Quality Checks:')
    nan_count = np.isnan(radar_data).sum()
    inf_count = np.isinf(radar_data).sum()
    zero_count = (radar_data == 0).sum()

    for count, kind in ((nan_count, 'NaN'), (inf_count, 'infinite')):
        if count > 0:
            print(f'  ‚ö†Ô∏è  Warning: {count} {kind} values detected ({share(count):.2f}%)')
        else:
            print(f'  ‚úÖ No {kind} values detected')

    # More than 10% zeros is flagged as potentially suspicious.
    if zero_count > radar_data.size * 0.1:
        print(f'  ‚ö†Ô∏è  Note: {zero_count} zero values detected ({share(zero_count):.2f}%)')
    else:
        print(f'  ‚úÖ Zero values: {zero_count} ({share(zero_count):.2f}%)')

    # --- Caller-supplied metadata ---
    if metadata:
        print('\n‚öôÔ∏è  Processing Metadata:')
        for name, entry in metadata.items():
            print(f'  {name}: {entry}')

    print(rule)


def analyze_focusing_quality(radar_data: np.ndarray) -> Dict[str, float]:
    """Analyze the quality of the focused SAR data.

    All metrics are derived from the sample amplitudes ``|radar_data|``:

    - ``peak_to_mean_ratio``: maximum amplitude divided by mean amplitude.
    - ``dynamic_range_db``: ``20 * log10(max / min)``; ``inf`` when the
      minimum amplitude is zero.
    - ``signal_to_noise_estimate``: mean amplitude divided by its standard
      deviation.
    - ``image_entropy``: Shannon entropy (natural log) of the normalised
      intensity distribution ``p = |s|^2 / sum(|s|^2)``. Normalising makes
      the value independent of the overall signal scale and bounds it to
      ``[0, ln(N)]`` for ``N`` samples. It is 0.0 for an all-zero image and
      NaN when the total intensity is not finite.
    - ``contrast``: standard deviation divided by mean amplitude.

    Ratios with a zero denominator evaluate to ``inf`` or ``nan`` following
    IEEE rules, without emitting NumPy runtime warnings.

    Args:
        radar_data (np.ndarray): Focused radar data array (complex).

    Returns:
        Dict[str, float]: Dictionary containing quality metrics as plain
            Python floats.

    Raises:
        ValueError: If ``radar_data`` is not complex or contains no samples.
    """
    if not np.iscomplexobj(radar_data):
        raise ValueError('Expected complex radar data for quality analysis')

    amplitudes = np.abs(radar_data)
    if amplitudes.size == 0:
        # The reductions below have no identity for empty input; fail early
        # with a clear message.
        raise ValueError('Cannot analyze quality of empty radar data')

    # Each reduction is a full pass over the array, so compute them once.
    amp_max = amplitudes.max()
    amp_min = amplitudes.min()
    amp_mean = amplitudes.mean()
    amp_std = amplitudes.std()

    # Zero denominators are expected for constant or all-zero images; keep
    # the IEEE inf/nan results but silence the warnings.
    with np.errstate(divide='ignore', invalid='ignore'):
        peak_to_mean = amp_max / amp_mean
        signal_to_noise = amp_mean / amp_std
        contrast = amp_std / amp_mean

    if amp_min > 0:
        dynamic_range_db = 20 * np.log10(amp_max / amp_min)
    else:
        dynamic_range_db = float('inf')

    # Entropy is only meaningful on a distribution that sums to one.
    intensity = np.square(amplitudes, dtype=np.float64)
    total_intensity = intensity.sum()
    if not np.isfinite(total_intensity):
        image_entropy = float('nan')
    elif total_intensity == 0:
        image_entropy = 0.0
    else:
        probabilities = intensity / total_intensity
        # Zero-probability samples contribute nothing (lim p*log(p) = 0).
        probabilities = probabilities[probabilities > 0]
        image_entropy = -np.sum(probabilities * np.log(probabilities))

    return {
        'peak_to_mean_ratio': float(peak_to_mean),
        'dynamic_range_db': float(dynamic_range_db),
        'signal_to_noise_estimate': float(signal_to_noise),
        'image_entropy': float(image_entropy),
        'contrast': float(contrast),
    }


# Report on the focused data, provided the focusing cell has already produced it
if 'focused_radar_data' not in locals():
    print('‚è∏Ô∏è  Focusing not yet completed - run the focusing cell first.')
else:
    # Full report, including the processor state after focusing
    post_focus_metadata = get_focusing_metadata(focuser)
    display_focusing_results(focused_radar_data, post_focus_metadata)

    # Quality metrics, one per line with four decimals
    print('\nüéØ Focusing Quality Analysis:')
    quality_metrics = analyze_focusing_quality(focused_radar_data)
    for metric, value in quality_metrics.items():
        print(f'  {metric}: {value:.4f}')

# Visualization

In [2]:
from pathlib import Path

from sarpyx.processor.utils.viz import plot_with_cdf
import joblib

# Load the focused product from the location the focusing cell writes to
# (<parent of the working directory>/processed_data/focused_sar_data.pkl).
# Deriving the path from the working directory instead of a machine-specific
# absolute path keeps this cell runnable on other checkouts.
focused_data_path = Path.cwd().parent / 'processed_data' / 'focused_sar_data.pkl'
focused_data = joblib.load(focused_data_path)

In [None]:
# Show the dimensions of the loaded focused data.
focused_data.shape

In [None]:
# Plot only the first 5000 x 5000 samples of the focused array.
# NOTE(review): presumably `savepath` makes plot_with_cdf write the figure to
# img.jpg - confirm against sarpyx.processor.utils.viz.
plot_with_cdf(focused_data[:5000,:5000], savepath="img.jpg")