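# Getting DLR Data

Query the DLR EOC STAC API (`S2-soilsuite-europe-2018-2022-P5Y` collection) for the pixel values at every sample location in `data/France_lab.csv`. The requested layers are the mean and bare-surface reflectance bands with their uncertainty layers, plus the bare-soil frequency layer. Per-point results are cached under `data/stac_data_cache_full`, so an interrupted run can resume without re-downloading finished points.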

In [1]:
import rioxarray as rxr
import xarray as xr
from odc.stac import load
import pystac_client
import pandas as pd
import geopandas as gpd
import matplotlib.pyplot as plt
import numpy as np
from shapely.geometry import Point
from concurrent.futures import ThreadPoolExecutor, as_completed
from tqdm import tqdm
import os

In [2]:
# Per-point parquet files land here; get_stac_data_for_point checks this
# directory first so already-fetched points are not downloaded again.
CACHE_DIR = "data/stac_data_cache_full"
os.makedirs(name=CACHE_DIR, exist_ok=True)

In [3]:
def create_bbox(lon, lat, step=0.000001):
    """Return ``[min_lon, min_lat, max_lon, max_lat]`` for a square of half-width ``step`` centred on (lon, lat)."""
    lower_corner = (lon - step, lat - step)
    upper_corner = (lon + step, lat + step)
    return [*lower_corner, *upper_corner]

def get_stac_data_for_point(args):
    """
    Fetch (or load from the on-disk cache) the DLR pixel values for one sample point.

    All inputs arrive packed in a single tuple so the function can be submitted
    directly to a ThreadPoolExecutor.

    Args:
        args: tuple ``(catalog, collection, measurements, point_idx, lon, lat, bbox_step)``:
            catalog: pystac_client client used for the item search.
            collection: collection id(s) passed to ``catalog.search``.
            measurements: band names passed to ``odc.stac.load``.
            point_idx: identifier written to the ``point_index`` column.
            lon, lat: point coordinates; also used to name the cache file.
            bbox_step: half-width of the search/load bounding box (see create_bbox).

    Returns:
        DataFrame for the first time slice of the loaded dataset, with
        ``point_index``, ``source_lon`` and ``source_lat`` columns added; or None
        when no item is found or any error occurs (errors are printed, not raised).
    """
    catalog, collection, measurements, point_idx, lon, lat, bbox_step = args

    # The cache key is the coordinate pair only, so several point indices that
    # share coordinates also share one cache file.
    cache_file = f"{CACHE_DIR}/point_{lon}_{lat}.parquet"
    if os.path.exists(cache_file):
        try:
            cached = pd.read_parquet(cache_file)
        except Exception as e:
            # Unreadable cache: fall through and fetch the point again.
            print(f"Error reading cache file {cache_file}: {e}")
        else:
            # Re-stamp the metadata: the file may have been written for a
            # different point_idx with the same coordinates, and its stored
            # point_index would otherwise be returned for this point.
            cached['point_index'] = point_idx
            cached['source_lon'] = lon
            cached['source_lat'] = lat
            return cached

    try:
        bbox = create_bbox(lon, lat, bbox_step)

        # NOTE(review): the date window ends in 2020 although the collection id
        # mentions 2018-2022; confirm this filter is intended.
        search = catalog.search(
            collections=collection,
            bbox=bbox,
            datetime="2018-03-01/2020-12-31"
        )
        items = list(search.items())

        if not items:
            print(f"No items found for point {point_idx} ({lon}, {lat})")
            return None

        # resolution is in the units of the load CRS (presumably metres) - confirm.
        dataset = load(
            items,
            measurements=measurements,
            bbox=bbox,
            resolution=20
        )

        # Only the first time slice is kept.
        data_point = dataset.isel(time=0).to_dataframe().reset_index()
        data_point['point_index'] = point_idx
        data_point['source_lon'] = lon
        data_point['source_lat'] = lat

        # Best-effort caching: a failed write must not discard the fetched data.
        try:
            data_point.to_parquet(cache_file)
        except Exception as e:
            print(f"Error caching point {point_idx}: {e}")

        return data_point
    except Exception as e:
        print(f"Error processing point {point_idx}: {e}")
        return None

def get_all_auxiliary_data(catalog, collection, measurements, long_lat, bbox_step=0.000001, max_workers=4, crs=3035):
    """
    Retrieve STAC data for every point in ``long_lat`` using a thread pool.

    Args:
        catalog: pystac_client client used for the item searches.
        collection: collection id(s) to search.
        measurements: band names to load.
        long_lat: DataFrame with ``GPS_LONG`` and ``GPS_LAT`` columns; the
            positional row number becomes each result's ``point_index``.
        bbox_step: half-width of the per-point query box (see create_bbox).
        max_workers: number of worker threads.
        crs: CRS assigned to the ``x``/``y`` pixel coordinates of the result.
            Defaults to EPSG:3035, which is presumably the collection's native
            grid - confirm against the loaded dataset.

    Returns:
        GeoDataFrame of all retrieved rows, sorted by ``point_index``, or None
        if no point returned data.
    """
    # One task per row. Zip the columns instead of calling iloc per row.
    args_list = [
        (catalog, collection, measurements, i, lon, lat, bbox_step)
        for i, (lon, lat) in enumerate(zip(long_lat['GPS_LONG'], long_lat['GPS_LAT']))
    ]

    results = []
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = [executor.submit(get_stac_data_for_point, args) for args in args_list]

        for future in tqdm(as_completed(futures), total=len(futures), desc="Processing points"):
            result = future.result()
            if result is not None:
                results.append(result)

    if not results:
        print("No data retrieved!")
        return None

    # as_completed yields results in finish order. Sort (stable, so the rows of
    # one point keep their relative order) so the output is reproducible.
    points_df = (
        pd.concat(results, ignore_index=True)
        .sort_values('point_index', kind='stable', ignore_index=True)
    )

    geometry_points = [Point(x, y) for x, y in zip(points_df['x'], points_df['y'])]
    return gpd.GeoDataFrame(points_df, geometry=geometry_points, crs=crs)

In [4]:
# Every product layer is requested for the same ten Sentinel-2 bands, product-major:
#   MREF      - mean reflectance          MREF-STD - its standard deviation
#   SRC       - bare-surface reflectance  SRC-STD  - its standard deviation
#   SRC-CI95  - bare-surface 95% confidence interval
# followed by the SFREQ-BSF frequency layer ("SFREQ-BSC", "SFREQ-VPC" left disabled).
dlr_measurements = [
    f"{product}_{band}"
    for product in ("MREF", "MREF-STD", "SRC", "SRC-STD", "SRC-CI95")
    for band in ("B02", "B03", "B04", "B05", "B06", "B07", "B08", "B8A", "B11", "B12")
] + ["SFREQ-BSF"]

In [5]:
# Load your data
target_raw = pd.read_csv('data/France_lab.csv')
long_lat = target_raw[['GPS_LONG', 'GPS_LAT']]

# Initialize STAC catalog
dlr_catalog = pystac_client.Client.open("https://geoservice.dlr.de/eoc/ogc/stac/v1")

# Define measurements (you can reduce this list if you don't need all bands)
# dlr_measurements = ["MREF_B02", "MREF_B03", "MREF_B04", "MREF_B08", "MREF_B11", "MREF_B12"]

# Define collection
dlr_collection = ["S2-soilsuite-europe-2018-2022-P5Y"]

In [None]:
# # Process all points (consider using a subset for testing: long_lat.iloc[:10])
# results = get_all_auxiliary_data(
#     catalog=dlr_catalog,
#     collection=dlr_collection,
#     measurements=dlr_measurements,
#     long_lat=long_lat,
#     bbox_step=0.000001,
#     max_workers=4  # Adjust based on your CPU and bandwidth
# )

# # Save the results
# if results is not None:
#     results.to_parquet("data/auxiliary_data_results_full.parquet")
#     print("Data saved successfully")

Processing points:  15%|█▍        | 413/2807 [19:17<1:14:54,  1.88s/it]

Error processing point 416: ('Connection aborted.', RemoteDisconnected('Remote end closed connection without response'))


Processing points:  18%|█▊        | 493/2807 [23:15<2:07:44,  3.31s/it]

Error processing point 497: ('Connection aborted.', RemoteDisconnected('Remote end closed connection without response'))


Processing points:  18%|█▊        | 511/2807 [24:02<1:18:04,  2.04s/it]

No items found for point 513 (88.888888, 88.888888)


Processing points:  33%|███▎      | 933/2807 [43:10<1:02:52,  2.01s/it]

No items found for point 935 (88.888888, 88.888888)


Processing points:  59%|█████▉    | 1668/2807 [1:16:21<36:45,  1.94s/it]  

Error processing point 1671: ('Connection aborted.', RemoteDisconnected('Remote end closed connection without response'))


Processing points:  78%|███████▊  | 2201/2807 [1:40:42<22:58,  2.27s/it]  

Error processing point 2204: ('Connection aborted.', RemoteDisconnected('Remote end closed connection without response'))


Processing points:  86%|████████▌ | 2406/2807 [1:50:21<18:09,  2.72s/it]

Error processing point 2410: ('Connection aborted.', RemoteDisconnected('Remote end closed connection without response'))


Processing points:  92%|█████████▏| 2582/2807 [1:58:41<14:03,  3.75s/it]

Error processing point 2586: ('Connection aborted.', RemoteDisconnected('Remote end closed connection without response'))


Processing points: 100%|██████████| 2807/2807 [2:08:47<00:00,  2.75s/it]


Data saved successfully


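## Update missing data

The full run above left some points without data, mostly because of transient `RemoteDisconnected` errors from the server. This section re-fetches only those points and keeps the already-processed ones as they are.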

In [6]:
def update_missing_dlr_data(
    target_raw_path='data/France_lab.csv',
    input_parquet="data/auxiliary_data_results_full.parquet",
    output_parquet="data/auxiliary_data_results_updated.parquet",
    cache_dir="data/stac_data_cache_full",
    max_workers=4,
    max_retries=3,
    catalog_url="https://geoservice.dlr.de/eoc/ogc/stac/v1",
    collection=None,
    measurements=None,
    bbox_step=0.000001,
    crs=3035,
):
    """
    Find target points missing from an earlier run, re-fetch them, and save the merged result.

    A point counts as processed when its (GPS_LONG, GPS_LAT) pair appears in the
    ``source_lon``/``source_lat`` columns of ``input_parquet``. Missing points are
    fetched in parallel through ``get_stac_data_for_point``. Each point gets up to
    ``max_retries`` attempts, with a randomized 5-15 s pause between attempts.

    Args:
        target_raw_path: Path to the CSV containing all target points (GPS_LONG, GPS_LAT).
        input_parquet: Path to the existing parquet file containing the processed results.
        output_parquet: Path to save the merged parquet file (None overwrites input_parquet).
            When it differs from input_parquet it is always written, even if
            nothing new was fetched.
        cache_dir: Directory created for cached point data. NOTE: get_stac_data_for_point
            reads and writes the module-level CACHE_DIR, not this path; a warning
            is printed when the two differ.
        max_workers: Maximum number of parallel worker threads.
        max_retries: Maximum number of attempts per point.
        catalog_url: STAC API endpoint to search.
        collection: Collection id list; None uses ["S2-soilsuite-europe-2018-2022-P5Y"].
        measurements: Band names to load; None uses the full DLR band list
            (MREF, MREF-STD, SRC, SRC-STD, SRC-CI95 for ten bands, plus SFREQ-BSF).
        bbox_step: Half-width of the per-point query box, in GPS_LONG/GPS_LAT units.
        crs: CRS assigned to the x/y coordinates of newly fetched rows.

    Returns:
        GeoDataFrame of existing plus newly fetched rows, sorted by ``point_index``.
        If nothing new was fetched, returns the existing data unchanged (None if
        there was none).
    """
    import os
    import random
    import time
    from concurrent.futures import ThreadPoolExecutor, as_completed

    import geopandas as gpd
    import pandas as pd
    import pystac_client
    from shapely.geometry import Point
    from tqdm import tqdm

    if output_parquet is None:
        output_parquet = input_parquet
    if collection is None:
        collection = ["S2-soilsuite-europe-2018-2022-P5Y"]
    if measurements is None:
        # Same list as the first run: five product layers x ten bands, then bare-soil frequency.
        measurements = [
            f"{product}_{band}"
            for product in ("MREF", "MREF-STD", "SRC", "SRC-STD", "SRC-CI95")
            for band in ("B02", "B03", "B04", "B05", "B06", "B07", "B08", "B8A", "B11", "B12")
        ] + ["SFREQ-BSF"]

    # Load original target points
    print(f"Loading original target points from {target_raw_path}")
    target_raw = pd.read_csv(target_raw_path)
    original_points = target_raw[['GPS_LONG', 'GPS_LAT']]

    # Load existing results; any read/schema failure means "start from scratch".
    existing_data, processed_coords = None, set()
    if os.path.exists(input_parquet):
        try:
            print(f"Loading existing results from {input_parquet}")
            existing_data = gpd.read_parquet(input_parquet)
            print(f"Loaded {len(existing_data)} points from existing data")
            processed_coords = set(zip(existing_data['source_lon'], existing_data['source_lat']))
            print(f"Found {len(processed_coords)} unique processed coordinates")
        except Exception as e:
            print(f"Error loading existing results: {e}")
            existing_data, processed_coords = None, set()
    else:
        print(f"No existing results found at {input_parquet}")

    def save_existing_unchanged():
        """Copy existing_data to output_parquet (when distinct) so that file always exists."""
        if existing_data is not None and output_parquet != input_parquet:
            existing_data.to_parquet(output_parquet)
            print(f"Existing data saved unchanged to {output_parquet} ({len(existing_data)} total points)")

    # Use the positional row number as the point index, matching the
    # point_index assigned by get_all_auxiliary_data.
    missing_points = [
        (pos, lon, lat)
        for pos, (lon, lat) in enumerate(zip(original_points['GPS_LONG'], original_points['GPS_LAT']))
        if (lon, lat) not in processed_coords
    ]
    print(f"Found {len(missing_points)} missing points out of {len(original_points)} total points")

    if not missing_points:
        print("No missing points to process!")
        save_existing_unchanged()
        return existing_data

    dlr_catalog = pystac_client.Client.open(catalog_url)

    os.makedirs(cache_dir, exist_ok=True)
    if os.path.abspath(cache_dir) != os.path.abspath(CACHE_DIR):
        print(f"Warning: cache_dir={cache_dir!r} differs from CACHE_DIR={CACHE_DIR!r}; "
              "point caches are read from and written to CACHE_DIR")

    def wait_before_retry(attempt):
        """Sleep a randomized 5-15 s unless ``attempt`` was the final one."""
        if attempt < max_retries - 1:
            sleep_time = 5 + random.random() * 10
            print(f"Waiting {sleep_time:.1f} seconds before retry...")
            time.sleep(sleep_time)

    def process_point_with_retries(point_info):
        """Fetch one (idx, lon, lat) point, retrying on None or exceptions."""
        idx, lon, lat = point_info
        args = (dlr_catalog, collection, measurements, idx, lon, lat, bbox_step)

        for retry in range(max_retries):
            try:
                # get_stac_data_for_point returns None both for "no items" and
                # for errors it catches itself, so both are retried here.
                result = get_stac_data_for_point(args)
                if result is not None:
                    print(f"Successfully processed point {idx} ({lon}, {lat})")
                    return result
                print(f"No data found for point {idx} ({lon}, {lat}) - Attempt {retry+1}/{max_retries}")
            except Exception as e:
                print(f"Error processing point {idx} ({lon}, {lat}) - Attempt {retry+1}/{max_retries}: {e}")
            wait_before_retry(retry)

        print(f"Failed to process point {idx} ({lon}, {lat}) after {max_retries} attempts")
        return None

    new_results = []
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = [executor.submit(process_point_with_retries, point) for point in missing_points]

        for future in tqdm(as_completed(futures), total=len(futures), desc="Processing missing points"):
            result = future.result()
            if result is not None:
                new_results.append(result)

    print(f"Successfully processed {len(new_results)} out of {len(missing_points)} missing points")

    if not new_results:
        print("No new data to add")
        save_existing_unchanged()
        return existing_data

    new_points_df = pd.concat(new_results, ignore_index=True)
    geometry_points = [Point(x, y) for x, y in zip(new_points_df['x'], new_points_df['y'])]
    new_points_gdf = gpd.GeoDataFrame(new_points_df, geometry=geometry_points, crs=crs)

    if existing_data is not None:
        combined_gdf = pd.concat([existing_data, new_points_gdf], ignore_index=True)
    else:
        combined_gdf = new_points_gdf

    # Threads finish in arbitrary order; sort so the saved file is reproducible.
    combined_gdf = combined_gdf.sort_values('point_index', kind='stable', ignore_index=True)

    combined_gdf.to_parquet(output_parquet)
    print(f"Updated data saved to {output_parquet} ({len(combined_gdf)} total points)")

    return combined_gdf

In [8]:
# Load your original data points
target_raw = pd.read_csv('data/France_lab.csv')
long_lat = target_raw[['GPS_LONG', 'GPS_LAT']]

# Update missing points
updated_results = update_missing_dlr_data(
    target_raw_path='data/France_lab.csv',
    input_parquet="data/auxiliary_data_results_full.parquet",
    output_parquet="data/auxiliary_data_results_full_updated.parquet",  # Optional: set to None to overwrite input file
    cache_dir="data/stac_data_cache_full",
    max_workers=4,
    max_retries=3
)

Loading original target points from data/France_lab.csv
Loading existing results from data/auxiliary_data_results_full.parquet
Loaded 2799 points from existing data
Found 2797 unique processed coordinates
Found 8 missing points out of 2807 total points


Processing missing points:   0%|          | 0/8 [00:00<?, ?it/s]

Successfully processed point 497 (0.57576, 46.423743)Successfully processed point 416 (-1.278976, 46.370656)

Successfully processed point 1671 (-2.833854, 48.198354)
Successfully processed point 2204 (3.056281, 46.46886)
Successfully processed point 2410 (-2.852118, 47.522143)
Successfully processed point 2586 (-0.317878, 48.826356)
No items found for point 513 (88.888888, 88.888888)
No data found for point 513 (88.888888, 88.888888) - Attempt 1/3
Waiting 13.7 seconds before retry...
No items found for point 935 (88.888888, 88.888888)
No data found for point 935 (88.888888, 88.888888) - Attempt 1/3
Waiting 13.2 seconds before retry...
No items found for point 935 (88.888888, 88.888888)
No data found for point 935 (88.888888, 88.888888) - Attempt 2/3
Waiting 7.1 seconds before retry...
No items found for point 513 (88.888888, 88.888888)
No data found for point 513 (88.888888, 88.888888) - Attempt 2/3
Waiting 8.5 seconds before retry...


Processing missing points:  88%|████████▊ | 7/8 [00:21<00:03,  3.01s/it]

No items found for point 935 (88.888888, 88.888888)
No data found for point 935 (88.888888, 88.888888) - Attempt 3/3
Failed to process point 935 (88.888888, 88.888888) after 3 attempts


Processing missing points: 100%|██████████| 8/8 [00:22<00:00,  2.84s/it]

No items found for point 513 (88.888888, 88.888888)
No data found for point 513 (88.888888, 88.888888) - Attempt 3/3
Failed to process point 513 (88.888888, 88.888888) after 3 attempts
Successfully processed 6 out of 8 missing points
Updated data saved to data/auxiliary_data_results_full_updated.parquet (2805 total points)



