In [1]:
# Geocoding: 
#     - Transform SAR coordinates to map coords
#     - SAR image, orbit info, ellipsoid
#     - Map-projected image
#     - Coordinate system change

# Terrain Correction: 
#     - Correct distortions due to terrain
#     - SAR image + DEM + orbit data
#     - Geometrically accurate map-projected image
#     - Topographic distortion correction

# forward geocoding projects SAR image data onto a map projection, 
# while backward geocoding transforms data from a map projection to the SAR sensor's geometry

In [2]:
from pysarflow import Sentinel1GRDProcessor, parse_beta_lut
import matplotlib.pyplot as plt

import os
os.chdir("/Users/rabinatwayana/Rabina/CDE II/software_development/py_sar_flow/")
!pwd

processor = Sentinel1GRDProcessor()
# Testing read_grd_data 
# safe_path = r'C:\Users\Ethel Ogallo\Documents\CDE\PLUS\SS25\practice_softwaredev\pysarflow\docs\ethel_temp\S1C_IW_GRDH_1SDV_20250527T181900_20250527T181925_002520_0053F6_70DB_COG.zip'
# if reading zip file
zip_safe_path = "docs/data/S1A_IW_GRDH_1SDV_20241209T015852_20241209T015917_056909_06FD49_AE78.SAFE"
# zip_safe_path = "docs/data/S1B_IW_GRDH_1SDV_20211223T051122_20211223T051147_030148_039993_5371.SAFE"
safe_extract_path= "docs/data/"
ds = processor.read_grd_data(zip_safe_path,safe_extract_path)

radiometric_calibrated_ds=processor.radiometric_calibration(zip_safe_path,ds,"sigmaNought")



/Users/rabinatwayana/Rabina/CDE II/software_development/py_sar_flow
Loading band VV from docs/data/S1A_IW_GRDH_1SDV_20241209T015852_20241209T015917_056909_06FD49_AE78.SAFE/measurement/s1a-iw-grd-vv-20241209t015852-20241209t015917-056909-06fd49-001.tiff
Loading band VH from docs/data/S1A_IW_GRDH_1SDV_20241209T015852_20241209T015917_056909_06FD49_AE78.SAFE/measurement/s1a-iw-grd-vh-20241209t015852-20241209t015917-056909-06fd49-002.tiff
Data loaded successfully
Reading calibration for VV band
Reading calibration for VH band
Radiometric calibration LUT created successfully
Radiometric calibration completed successfully


In [3]:
# https://registry.opendata.aws/terrain-tiles/
# https://www.mapzen.com/blog/terrain-tile-service/ #skadi is unprojected latlon

import os
import gzip
import shutil
import requests
import numpy as np
import rasterio
from rasterio.merge import merge
from rasterio.mask import mask
from shapely.geometry import box
import tempfile


def get_tile_names(lon_min, lat_min, lon_max, lat_max):
    lons = range(int(np.floor(lon_min)), int(np.ceil(lon_max)))
    lats = range(int(np.floor(lat_min)), int(np.ceil(lat_max)))
    tiles = []
    for lat in lats:
        for lon in lons:
            ns = "N" if lat >= 0 else "S"
            ew = "E" if lon >= 0 else "W"
            tile = f"{ns}{abs(lat):02d}{ew}{abs(lon):03d}"
            tiles.append(tile)
    return tiles

def download_and_unzip_tile(tile_name, dest_dir):
    url = f"https://s3.amazonaws.com/elevation-tiles-prod/skadi/{tile_name[:3]}/{tile_name}.hgt.gz"
    gz_path = os.path.join(dest_dir, f"{tile_name}.hgt.gz")
    hgt_path = os.path.join(dest_dir, f"{tile_name}.hgt")

    if not os.path.exists(hgt_path):
        print(f"Downloading {tile_name}...")
        r = requests.get(url, stream=True)
        if r.status_code != 200:
            print(f"Tile {tile_name} not found!")
            return None
        with open(gz_path, 'wb') as f:
            shutil.copyfileobj(r.raw, f)
        with gzip.open(gz_path, 'rb') as f_in:
            with open(hgt_path, 'wb') as f_out:
                shutil.copyfileobj(f_in, f_out)
        print(f"{tile_name} downloaded and extracted.")
    return hgt_path

def hgt_to_raster(hgt_path, tile_name):
    lat = int(tile_name[1:3]) * (1 if tile_name[0] == 'N' else -1)
    lon = int(tile_name[4:7]) * (1 if tile_name[3] == 'E' else -1)
    data = np.fromfile(hgt_path, dtype='>i2')
    size = int(np.sqrt(data.size))
    data = data.reshape((size, size))
    transform = rasterio.transform.from_origin(lon, lat + 1, 1 / (size - 1), 1 / (size - 1))
    
    profile = {
        'driver': 'GTiff',
        'dtype': 'int16',
        'nodata': -32768,
        'width': size,
        'height': size,
        'count': 1,
        'crs': 'EPSG:4326',
        'transform': transform
    }

    temp_file = tempfile.NamedTemporaryFile(delete=False, suffix='.tif')
    with rasterio.open(temp_file.name, 'w', **profile) as dst:
        dst.write(data, 1)

    return temp_file.name

def download_dem(lon_min, lat_min, lon_max, lat_max, output_dem_path):
    # Main workflow
    temp_dir = tempfile.mkdtemp()
    tile_names = get_tile_names(lon_min, lat_min, lon_max, lat_max)

    raster_paths = []
    for tile in tile_names:
        hgt_file = download_and_unzip_tile(tile, temp_dir)
        if hgt_file:
            raster_path = hgt_to_raster(hgt_file, tile)
            raster_paths.append(raster_path)

    # Merge tiles
    srcs = [rasterio.open(p) for p in raster_paths]
    mosaic, out_trans = merge(srcs)

    # Clip to bbox
    bbox = box(lon_min, lat_min, lon_max, lat_max)
    geo = [bbox.__geo_interface__]
    out_image, out_transform = mask(dataset=rasterio.open(raster_paths[0]), shapes=geo, crop=True)

    # Update metadata
    out_meta = srcs[0].meta.copy()
    out_meta.update({
        "driver": "GTiff",
        "height": out_image.shape[1],
        "width": out_image.shape[2],
        "transform": out_transform,
        "compress": "deflate",
        "predictor": 2,
        "zlevel": 9,
    })

    # Save clipped DEM
    # dem_clipped_path = "dem.tif"
    with rasterio.open(output_dem_path, "w", **out_meta) as dest:
        dest.write(out_image)

    print(f"\nDEM clipped and saved to {output_dem_path}")



In [4]:
!pwd

/Users/rabinatwayana/Rabina/CDE II/software_development/py_sar_flow


In [5]:
import xarray as xr
from typing import Any

def open_dem_raster(dem_urlpath):
    dem_raster = xr.open_dataarray(dem_urlpath)
    if dem_raster.y.diff("y").values[0] < 0:
        dem_raster = dem_raster.isel(y=slice(None, None, -1))
    dem_raster.attrs["long_name"] = "elevation"
    dem_raster.attrs["units"] = "m"
    dem_raster = dem_raster.rename("dem").squeeze(drop=True)
    return dem_raster

dem_raster=open_dem_raster("docs/examples/dem.tif")


In [6]:
'''
This code provides a pipeline to convert a 2D DEM raster into a 3D geospatial representation in ECEF (Earth-Centered, Earth-Fixed) coordinates, and here's why it is necessary:

🧭 Why convert a DEM to 3D ECEF coordinates?
True 3D Geolocation:

A regular DEM provides elevation values over a 2D grid (usually in a local or projected CRS like UTM).
But for many geodetic, global modeling, or 3D visualization tasks (e.g., in satellite systems, globe rendering, or simulations), you need positions in a global 3D Cartesian reference system like ECEF (EPSG:4978).

Interoperability with 3D Systems:
ECEF coordinates are used in satellite navigation (GPS), Earth observation, and 3D engines.
Converting to ECEF enables seamless integration with tools that require 3D Cartesian input, like Cesium, 3D GIS, or custom simulations.
Accurate Distance and Direction Calculations:
In ECEF space, you can compute geocentric distances, angles, and vectors without projection distortion.

This is crucial in orbital mechanics, remote sensing, and sensor fusion applications.

🧠 Step-by-step breakdown of the code
✅ make_nd_dataarray(...)
Creates a stacked 3D array from multiple 2D DataArrays (x, y, and elevation), along a new dimension (typically "axis"):

Output has shape: [3, height, width], where:

Axis 0 = x (longitude or easting)

Axis 1 = y (latitude or northing)

Axis 2 = z (elevation)

This converts the scalar elevation field into a 3D field representing full spatial coordinates.

✅ convert_to_dem_3d(...)
Uses broadcasting to expand x and y coordinates to match the shape of the DEM grid.

Assembles [x, y, z] components into one stacked DataArray.

Prepares the data for transformation into another coordinate system.

✅ transform_dem_3d(...)
Transforms [x, y, z] from the original CRS (e.g., EPSG:32633 or EPSG:4326) into ECEF (EPSG:4978) using rasterio.warp.transform.
Handles flattening and reshaping of the data.
Outputs a new [3, height, width] DataArray in ECEF coordinates.

✅ convert_to_dem_ecef(...)
A wrapper that combines:

convert_to_dem_3d → builds [x, y, z] array in original CRS

transform_dem_3d → transforms it into ECEF

'''

from rasterio import warp
ECEF_CRS = "EPSG:4978"
import numpy as np

def make_nd_dataarray(das: list[xr.DataArray], dim: str = "axis") -> xr.DataArray:
    da_nd = xr.concat(das, dim=dim, coords="minimal")
    dim_attrs = {"long_name": "cartesian axis index", "units": 1}
    return da_nd.assign_coords({dim: (dim, range(len(das)), dim_attrs)})

def convert_to_dem_3d(
    dem_raster: xr.DataArray,
    dim: str = "axis",
    x: str = "x",
    y: str = "y",
    dtype: str = "float64",
) -> xr.DataArray:
    _, dem_raster_x = xr.broadcast(dem_raster, dem_raster.coords[x].astype(dtype))
    dem_raster_y = dem_raster.coords[y].astype(dtype)
    dem_raster_astype = dem_raster.astype(dtype)
    dem_3d = make_nd_dataarray([dem_raster_x, dem_raster_y, dem_raster_astype], dim=dim)
    dem_3d.attrs.clear()
    dem_3d.attrs.update(dem_raster.attrs)
    return dem_3d.rename("dem_3d")


def transform_dem_3d(
    dem_3d: xr.DataArray,
    source_crs: str | None = None,
    target_crs: str = ECEF_CRS,
    dim: str = "axis",
) -> xr.DataArray:
    if source_crs is None:
        source_crs = dem_3d.rio.crs
    try:
        x, y, z = warp.transform(
            source_crs,
            target_crs,
            dem_3d.sel({dim: 0}).values.flat,
            dem_3d.sel({dim: 1}).values.flat,
            dem_3d.sel({dim: 2}).values.flat,
        )
    except Exception:
        # HACK: the very first call to warp.transform sometimes fails
        # LOGGER.warn("rasterio.warp.transform failed, retrying...")
        x, y, z = warp.transform(
            source_crs,
            target_crs,
            dem_3d.sel({dim: 0}).values.flat,
            dem_3d.sel({dim: 1}).values.flat,
            dem_3d.sel({dim: 2}).values.flat,
        )
    dem_3d_crs: xr.DataArray = xr.zeros_like(dem_3d)
    shape = dem_3d_crs.loc[{dim: 0}].shape
    dem_3d_crs.loc[{dim: 0}] = np.reshape(x, shape)
    dem_3d_crs.loc[{dim: 1}] = np.reshape(y, shape)
    dem_3d_crs.loc[{dim: 2}] = np.reshape(z, shape)
    return dem_3d_crs

def convert_to_dem_ecef(
    dem_raster: xr.DataArray, x: str = "x", y: str = "y", **kwargs: Any
) -> xr.DataArray:
    dem_3d = convert_to_dem_3d(dem_raster, x=x, y=y)
    return transform_dem_3d(dem_3d, **kwargs)

dem_ecef= convert_to_dem_ecef(dem_raster)

template_raster = dem_ecef.isel(axis=0).drop_vars(["axis", "spatial_ref"]) * 0.0


In [7]:
import xml.etree.ElementTree as ET
from pathlib import Path


import xml.etree.ElementTree as ET
import numpy as np
import xarray as xr

def open_orbit_state_vectors(xml_path: str) -> xr.DataArray:
    tree = ET.parse(xml_path)
    root = tree.getroot()

    # Find all <position> elements (adjust tag path if necessary)
    position_elements = root.findall(".//position")

    # Lists to store extracted values
    x_vals, y_vals, z_vals, times = [], [], [], []

    for pos in position_elements:
        x = float(pos.findtext("x"))
        y = float(pos.findtext("y"))
        z = float(pos.findtext("z"))

        time_tag = pos.find("time")  # optional, if <time> exists
        if time_tag is not None:
            times.append(np.datetime64(time_tag.text))
        else:
            times.append(None)

        x_vals.append(x)
        y_vals.append(y)
        z_vals.append(z)

    # Create axis-wise array: (time, axis)
    data = np.vstack([x_vals, y_vals, z_vals]).T

    # Build DataArray
    coords = {
        "axis": ["x", "y", "z"],
        "azimuth_time": times if times[0] is not None else np.arange(len(x_vals)),
    }

    position_da = xr.DataArray(
        data,
        dims=("azimuth_time", "axis"),
        coords=coords,
        name="position"
    )

    return position_da


position= open_orbit_state_vectors("docs/data/S1A_IW_GRDH_1SDV_20241209T015852_20241209T015917_056909_06FD49_AE78.SAFE/annotation/s1a-iw-grd-vh-20241209t015852-20241209t015917-056909-06fd49-002_updated.xml")


In [8]:
position

In [9]:
import numpy as np
import xarray as xr
def polyder(coefficients: xr.DataArray) -> xr.DataArray:
    # TODO: raise if "degree" coord is not decreasing
    derivative_coefficients = coefficients.isel(degree=slice(1, None)).copy()
    for degree in coefficients.coords["degree"].values[:-1]:
        derivative_coefficients.loc[{"degree": degree - 1}] = (
            coefficients.loc[{"degree": degree}] * degree
        )
    return derivative_coefficients

def orb_poly_fit_from_position(
    position: xr.DataArray,
    dim: str = "azimuth_time",
    deg: int = 5,
    epoch: np.datetime64 = None,
    interval: tuple[np.datetime64, np.datetime64] = None,
):
    time = position.coords[dim]

    # Set epoch to the midpoint of time range if not provided
    if epoch is None:
        start = time.values[0].astype("datetime64[ns]")
        end = time.values[-1].astype("datetime64[ns]")
        epoch = start + (end - start) // 2
        # epoch = time.values[0] + (time.values[-1] - time.values[0]) / 2

    # Set interpolation interval if not provided
    if interval is None:
        interval = (time.values[0], time.values[-1])

    # Convert time to seconds since epoch (safely)
    orbit_time_vals = (
        (time.values.astype("datetime64[ns]") - epoch.astype("datetime64[ns]"))
        / np.timedelta64(1, "s")
    ).astype("float64")

    # Assign the new numeric time coordinate
    orbit_time = xr.DataArray(orbit_time_vals, coords={dim: time}, dims=dim)
    data = position.assign_coords({dim: orbit_time})

    # Fit polynomial for each axis (x/y/z)
    polyfit_results = data.polyfit(dim=dim, deg=deg)

    return {
        "coefficients": polyfit_results.polyfit_coefficients,
        "epoch": epoch,
        "interval": interval,
    }

# Assume `position` is a DataArray with coords like azimuth_time and axis (x/y/z)
orbit_interpolator = orb_poly_fit_from_position(position)

coefficients=orbit_interpolator["coefficients"]
velocity_coefficients=polyder(coefficients)
epoch= orbit_interpolator["epoch"]


print(orbit_interpolator["coefficients"])
print("Epoch:", orbit_interpolator["epoch"])
print("Valid interval:", orbit_interpolator["interval"])


<xarray.DataArray 'polyfit_coefficients' (degree: 6, axis: 3)> Size: 144B
array([[-6.79253857e+32,  2.80983471e+33,  4.30702494e+33],
       [ 1.26630472e+27, -3.70734756e+27,  2.65483220e+27],
       [ 1.27249839e+22, -5.70023956e+22, -8.72813139e+22],
       [-1.86941015e+16,  4.93048076e+16, -3.54855470e+16],
       [-4.34705250e+10,  2.11963260e+11,  3.23248314e+11],
       [ 2.88420694e+04, -6.91790330e+04,  3.78346625e+04]])
Coordinates:
  * degree   (degree) int64 48B 5 4 3 2 1 0
  * axis     (axis) <U1 12B 'x' 'y' 'z'
Epoch: 1970-01-01T00:00:00.000004680
Valid interval: (np.int64(0), np.int64(9360))


# Start from here

In [10]:
"""Reference "Guide to Sentinel-1 Geocoding" UZH-S1-GC-AD 1.10 26.03.2019.

See: https://sentinel.esa.int/documents/247904/0/Guide-to-Sentinel-1-Geocoding.pdf/e0450150-b4e9-4b2d-9b32-dadf989d3bd3
"""

import functools
from typing import Any, Callable, TypeVar

import numpy as np
import numpy.typing as npt
import xarray as xr

# from . import orbit

ArrayLike = TypeVar("ArrayLike", bound=npt.ArrayLike)
FloatArrayLike = TypeVar("FloatArrayLike", bound=npt.ArrayLike)


def secant_method(
    ufunc: Callable[[ArrayLike], tuple[FloatArrayLike, Any]],
    t_prev: ArrayLike,
    t_curr: ArrayLike,
    diff_ufunc: float = 1.0,
    diff_t: Any = 1e-6,
    maxiter: int = 10,
) -> tuple[ArrayLike, ArrayLike, FloatArrayLike, int, Any]:
    """Return the root of ufunc calculated using the secant method."""
    # implementation modified from https://en.wikipedia.org/wiki/Secant_method
    f_prev, _ = ufunc(t_prev)

    # strong convergence, all points below one of the two thresholds
    for k in range(maxiter):
        f_curr, payload_curr = ufunc(t_curr)

        # print(f"{f_curr / 7500}")

        # the `not np.any` construct let us accept `np.nan` as good values
        if not np.any((np.abs(f_curr) > diff_ufunc)):
            break

        t_diff = t_curr - t_prev  # type: ignore

        # the `not np.any` construct let us accept `np.nat` as good values
        if not np.any(np.abs(t_diff) > diff_t):
            break

        q = f_curr - f_prev  # type: ignore

        # NOTE: in same cases f_curr * t_diff overflows datetime64[ns] before the division by q
        t_prev, t_curr = t_curr, t_curr - np.where(q != 0, f_curr / q, 0) * t_diff  # type: ignore
        f_prev = f_curr

    return t_curr, t_prev, f_curr, k, payload_curr


def newton_raphson_method(
    ufunc: Callable[[ArrayLike], tuple[FloatArrayLike, Any]],
    ufunc_prime: Callable[[ArrayLike, Any], FloatArrayLike],
    t_curr: ArrayLike,
    diff_ufunc: float = 1.0,
    diff_t: Any = 1e-6,
    maxiter: int = 10,
) -> tuple[ArrayLike, FloatArrayLike, int, Any]:
    """Return the root of ufunc calculated using the Newton method."""
    # implementation based on https://en.wikipedia.org/wiki/Newton%27s_method
    # strong convergence, all points below one of the two thresholds
    for k in range(maxiter):
        f_curr, payload_curr = ufunc(t_curr)

        # print(f"{f_curr / 7500}")

        # the `not np.any` construct let us accept `np.nan` as good values
        if not np.any((np.abs(f_curr) > diff_ufunc)):
            break

        fp_curr = ufunc_prime(t_curr, payload_curr)

        t_diff = f_curr / fp_curr  # type: ignore

        # the `not np.any` construct let us accept `np.nat` as good values
        if not np.any(np.abs(t_diff) > diff_t):
            break

        t_curr = t_curr - t_diff  # type: ignore

    return t_curr, f_curr, k, payload_curr

def position_from_orbit_time(orbit_time: xr.DataArray) -> xr.DataArray:
        position = xr.polyval(orbit_time, coefficients)
        return position.rename("position")

def velocity_from_orbit_time(orbit_time: xr.DataArray) -> xr.DataArray:
        velocity = xr.polyval(orbit_time, velocity_coefficients)
        return velocity.rename("velocity")

def zero_doppler_plane_distance_velocity(
    dem_ecef: xr.DataArray,
    orbit_interpolator,
    orbit_time: xr.DataArray,
    dim: str = "axis",
) -> tuple[xr.DataArray, tuple[xr.DataArray, xr.DataArray]]:
    dem_distance = dem_ecef - position_from_orbit_time(orbit_time)
    satellite_velocity = velocity_from_orbit_time(orbit_time)
    plane_distance_velocity = (dem_distance * satellite_velocity).sum(dim, skipna=False)
    return plane_distance_velocity, (dem_distance, satellite_velocity)


def zero_doppler_plane_distance_velocity_prime(
    orbit_interpolator,
    orbit_time: xr.DataArray,
    payload: tuple[xr.DataArray, xr.DataArray],
    dim: str = "axis",
) -> xr.DataArray:
    dem_distance, satellite_velocity = payload

    plane_distance_velocity_prime = (
        dem_distance * orbit_interpolator.acceleration_from_orbit_time(orbit_time)
        - satellite_velocity**2
    ).sum(dim)
    return plane_distance_velocity_prime


def backward_geocode_simple(
    dem_ecef: xr.DataArray,
    orbit_interpolator,
    orbit_time_guess: xr.DataArray | float = 0.0,
    dim: str = "axis",
    zero_doppler_distance: float = 1.0,
    satellite_speed: float = 7_500.0,
    method: str = "secant",
    orbit_time_prev_shift: float = -0.1,
    maxiter: int = 10,
) -> tuple[xr.DataArray, xr.DataArray, xr.DataArray]:
    diff_ufunc = zero_doppler_distance * satellite_speed

    zero_doppler = functools.partial(
        zero_doppler_plane_distance_velocity, dem_ecef, orbit_interpolator
    )

    if isinstance(orbit_time_guess, xr.DataArray):
        pass
    else:
        t_template = dem_ecef.isel({dim: 0}).drop_vars(dim).rename("azimuth_time")
        orbit_time_guess = xr.full_like(
            t_template,
            orbit_time_guess,
            dtype="float64",
        )

    if method == "secant":
        orbit_time_guess_prev = orbit_time_guess + orbit_time_prev_shift
        orbit_time, _, _, k, (dem_distance, satellite_velocity) = secant_method(
            zero_doppler,
            orbit_time_guess_prev,
            orbit_time_guess,
            diff_ufunc,
            maxiter=maxiter,
        )
    elif method in {"newton", "newton_raphson"}:
        zero_doppler_prime = functools.partial(
            zero_doppler_plane_distance_velocity_prime, orbit_interpolator
        )
        orbit_time, _, k, (dem_distance, satellite_velocity) = newton_raphson_method(
            zero_doppler,
            zero_doppler_prime,
            orbit_time_guess,
            diff_ufunc,
            maxiter=maxiter,
        )
    # print(f"iterations: {k}")
    return orbit_time, dem_distance, satellite_velocity

S_TO_NS = 10**9
# def orbit_time_to_azimuth_time(
#     orbit_time: xr.DataArray, epoch: np.datetime64
# ) -> xr.DataArray:
#     azimuth_time = orbit_time * np.timedelta64(S_TO_NS, "ns") + epoch
#     return azimuth_time.rename("azimuth_time")

def orbit_time_to_azimuth_time(orbit_time: xr.DataArray, epoch: np.datetime64) -> xr.DataArray:
    # Convert float seconds to int64 nanoseconds for timedelta64[ns]
    time_deltas = xr.apply_ufunc(
        lambda x: x.astype("timedelta64[ns]"),
        (orbit_time * 1e9).astype("int64")
    )
    print(type(time_deltas), type(epoch),"**************************")

    azimuth_time = epoch + time_deltas
    return azimuth_time.rename("azimuth_time")



def backward_geocode(
    dem_ecef: xr.DataArray,
    orbit_interpolator,
    orbit_time_guess: xr.DataArray | float = 0.0,
    dim: str = "axis",
    zero_doppler_distance: float = 1.0,
    satellite_speed: float = 7_500.0,
    method: str = "newton",
    seed_step: tuple[int, int] | None = None,
    maxiter: int = 10,
    maxiter_after_seed: int = 1,
    orbit_time_prev_shift: float = -0.1,
) -> xr.Dataset:
    if seed_step is not None:
        dem_ecef_seed = dem_ecef.isel(
            y=slice(seed_step[0] // 2, None, seed_step[0]),
            x=slice(seed_step[1] // 2, None, seed_step[1]),
        )
        orbit_time_seed, _, _ = backward_geocode_simple(
            dem_ecef_seed,
            orbit_interpolator,
            orbit_time_guess,
            dim,
            zero_doppler_distance,
            satellite_speed,
            method,
            orbit_time_prev_shift=orbit_time_prev_shift,
        )
        orbit_time_guess = orbit_time_seed.interp_like(
            dem_ecef.sel(axis=0), kwargs={"fill_value": "extrapolate"}
        )
        maxiter = maxiter_after_seed

    orbit_time, dem_distance, satellite_velocity = backward_geocode_simple(
        dem_ecef,
        orbit_interpolator,
        orbit_time_guess,
        dim,
        zero_doppler_distance,
        satellite_speed,
        method,
        maxiter=maxiter,
        orbit_time_prev_shift=orbit_time_prev_shift,
    )

    acquisition = xr.Dataset(
        data_vars={
            "azimuth_time": orbit_time_to_azimuth_time(orbit_time, epoch),
            "dem_distance": dem_distance,
            "satellite_velocity": satellite_velocity.transpose(*dem_distance.dims),
        }
    )
    return acquisition


In [11]:
import logging
from unittest import mock

import flox.xarray
import numpy as np
import xarray as xr

# from . import scene

logger = logging.getLogger(__name__)

ONE_SECOND = np.timedelta64(10**9, "ns")


def sum_weights(
    initial_weights: xr.DataArray,
    azimuth_index: xr.DataArray,
    slant_range_index: xr.DataArray,
    multilook: tuple[int, int] | None = None,
) -> xr.DataArray:
    geocoded = initial_weights.assign_coords(
        slant_range_index=slant_range_index, azimuth_index=azimuth_index
    )

    flat_sum: xr.DataArray = flox.xarray.xarray_reduce(
        geocoded,
        geocoded.slant_range_index,
        geocoded.azimuth_index,
        func="sum",
        method="map-reduce",
    )

    if multilook:
        flat_sum = flat_sum.rolling(
            azimuth_index=multilook[0],
            slant_range_index=multilook[1],
            center=True,
            min_periods=multilook[0] * multilook[1] // 2 + 1,
        ).mean()

    with mock.patch("xarray.core.missing._localize", lambda o, i: (o, i)):
        weights_sum = flat_sum.interp(
            slant_range_index=slant_range_index,
            azimuth_index=azimuth_index,
            method="nearest",
        )

    return weights_sum


def compute_dem_oriented_area(dem_ecef: xr.DataArray) -> xr.DataArray:
    x_corners: npt.ArrayLike = np.concatenate(
        [
            [dem_ecef.x[0] + (dem_ecef.x[0] - dem_ecef.x[1]) / 2],
            ((dem_ecef.x.shift(x=-1) + dem_ecef.x) / 2)[:-1].data,
            [dem_ecef.x[-1] + (dem_ecef.x[-1] - dem_ecef.x[-2]) / 2],
        ]
    )
    y_corners: npt.ArrayLike = np.concatenate(
        [
            [dem_ecef.y[0] + (dem_ecef.y[0] - dem_ecef.y[1]) / 2],
            ((dem_ecef.y.shift(y=-1) + dem_ecef.y) / 2)[:-1].data,
            [dem_ecef.y[-1] + (dem_ecef.y[-1] - dem_ecef.y[-2]) / 2],
        ]
    )
    dem_ecef_corners = dem_ecef.interp(
        {"x": x_corners, "y": y_corners},
        method="linear",
        kwargs={"fill_value": "extrapolate"},
    )

    dx = dem_ecef_corners.diff("x", 1)
    dy = dem_ecef_corners.diff("y", 1)

    dx1 = dx.isel(y=slice(1, None)).assign_coords(dem_ecef.coords)
    dy1 = dy.isel(x=slice(1, None)).assign_coords(dem_ecef.coords)
    dx2 = dx.isel(y=slice(None, -1)).assign_coords(dem_ecef.coords)
    dy2 = dy.isel(x=slice(None, -1)).assign_coords(dem_ecef.coords)

    cross_1 = xr.cross(dx1, dy1, dim="axis") / 2
    sign_1 = np.sign(
        xr.dot(cross_1, dem_ecef, dim="axis")
    )  # ensure direction out of DEM

    cross_2 = xr.cross(dx2, dy2, dim="axis") / 2
    sign_2 = np.sign(
        xr.dot(cross_2, dem_ecef, dim="axis")
    )  # ensure direction out of DEM
    dem_oriented_area: xr.DataArray = cross_1 * sign_1 + cross_2 * sign_2

    return dem_oriented_area.rename("dem_oriented_area")


def compute_gamma_area(
    dem_ecef: xr.DataArray,
    dem_direction: xr.DataArray,
) -> xr.DataArray:
    dem_oriented_area = compute_dem_oriented_area(dem_ecef)
    gamma_area: xr.DataArray = xr.dot(dem_oriented_area, -dem_direction, dim="axis")
    gamma_area = gamma_area.where(gamma_area > 0, 0)
    return gamma_area


def gamma_weights_bilinear(
    dem_coords: xr.Dataset,
    slant_range_time0: float,
    azimuth_time0: np.datetime64,
    slant_range_time_interval_s: float,
    azimuth_time_interval_s: float,
    slant_range_spacing_m: float = 1.0,
    azimuth_spacing_m: float = 1.0,
) -> xr.DataArray:
    # compute dem image coordinates
    azimuth_index = ((dem_coords.azimuth_time - azimuth_time0) / ONE_SECOND) / (
        azimuth_time_interval_s
    )

    slant_range_index = (dem_coords.slant_range_time - slant_range_time0) / (
        slant_range_time_interval_s
    )

    slant_range_index_0 = np.floor(slant_range_index).astype(int).compute()
    slant_range_index_1 = np.ceil(slant_range_index).astype(int).compute()
    azimuth_index_0 = np.floor(azimuth_index).astype(int).compute()
    azimuth_index_1 = np.ceil(azimuth_index).astype(int).compute()

    logger.info("compute gamma areas 1/4")
    w_00 = abs(
        (azimuth_index_1 - azimuth_index) * (slant_range_index_1 - slant_range_index)
    )
    tot_area_00 = sum_weights(
        dem_coords["gamma_area"] * w_00,
        azimuth_index=azimuth_index_0,
        slant_range_index=slant_range_index_0,
    )

    logger.info("compute gamma areas 2/4")
    w_01 = abs(
        (azimuth_index_1 - azimuth_index) * (slant_range_index_0 - slant_range_index)
    )
    tot_area_01 = sum_weights(
        dem_coords["gamma_area"] * w_01,
        azimuth_index=azimuth_index_0,
        slant_range_index=slant_range_index_1,
    )

    logger.info("compute gamma areas 3/4")
    w_10 = abs(
        (azimuth_index_0 - azimuth_index) * (slant_range_index_1 - slant_range_index)
    )
    tot_area_10 = sum_weights(
        dem_coords["gamma_area"] * w_10,
        azimuth_index=azimuth_index_1,
        slant_range_index=slant_range_index_0,
    )

    logger.info("compute gamma areas 4/4")
    w_11 = abs(
        (azimuth_index_0 - azimuth_index) * (slant_range_index_0 - slant_range_index)
    )
    tot_area_11 = sum_weights(
        dem_coords["gamma_area"] * w_11,
        azimuth_index=azimuth_index_1,
        slant_range_index=slant_range_index_1,
    )

    tot_area = tot_area_00 + tot_area_01 + tot_area_10 + tot_area_11

    normalized_area = tot_area / (azimuth_spacing_m * slant_range_spacing_m)
    return normalized_area


def gamma_weights_nearest(
    dem_coords: xr.Dataset,
    slant_range_time0: float,
    azimuth_time0: np.datetime64,
    slant_range_time_interval_s: float,
    azimuth_time_interval_s: float,
    slant_range_spacing_m: float = 1.0,
    azimuth_spacing_m: float = 1.0,
) -> xr.DataArray:
    # compute dem image coordinates
    azimuth_index = np.round(
        (dem_coords.azimuth_time - azimuth_time0) / ONE_SECOND / azimuth_time_interval_s
    ).astype(int)

    slant_range_index = np.round(
        (dem_coords.slant_range_time - slant_range_time0) / slant_range_time_interval_s
    ).astype(int)

    logger.info("compute gamma areas 1/1")

    tot_area = sum_weights(
        dem_coords["gamma_area"],
        azimuth_index=azimuth_index,
        slant_range_index=slant_range_index,
    )

    normalized_area = tot_area / (azimuth_spacing_m * slant_range_spacing_m)
    return normalized_area


In [12]:
def make_simulate_acquisition_template(
    template_raster: xr.DataArray,
    correct_radiometry: str | None = None,
) -> xr.Dataset:
    acquisition_template = xr.Dataset(
        data_vars={
            "slant_range_time": template_raster,
            "azimuth_time": template_raster.astype("datetime64[ns]"),
        }
    )
    include_variables = {"slant_range_time", "azimuth_time"}
    if correct_radiometry is not None:
        acquisition_template["gamma_area"] = template_raster
        include_variables.add("gamma_area")

    return acquisition_template



SPEED_OF_LIGHT = 299_792_458.0  # m / s
def simulate_acquisition(
    dem_ecef: xr.DataArray,
    orbit_interpolator,
    include_variables, #: Container[str] = ()
    azimuth_time: xr.DataArray | float = 0.0,
    **kwargs: Any,
) -> xr.Dataset:
    """Compute the image coordinates of the DEM given the satellite orbit."""
    acquisition = backward_geocode(
        dem_ecef, orbit_interpolator, azimuth_time, **kwargs
    )

    slant_range = (acquisition.dem_distance**2).sum(dim="axis") ** 0.5
    slant_range_time = 2.0 / SPEED_OF_LIGHT * slant_range

    acquisition["slant_range_time"] = slant_range_time

    if include_variables and "gamma_area" in include_variables:
        gamma_area = compute_gamma_area(
            dem_ecef, acquisition.dem_distance / slant_range
        )
        acquisition["gamma_area"] = gamma_area

    for data_var_name in acquisition.data_vars:
        if include_variables and data_var_name not in include_variables:
            acquisition = acquisition.drop_vars(data_var_name)  # type: ignore

    # drop coordinates that are not associated with any data variable
    for coord_name in acquisition.coords:
        if all(coord_name not in dv.coords for dv in acquisition.data_vars.values()):
            acquisition = acquisition.drop_vars(coord_name)  # type: ignore

    return acquisition

# def map_simulate_acquisition(
#     dem_ecef: xr.DataArray,
#     orbit_interpolator,
#     template_raster: xr.DataArray | None = None,
#     correct_radiometry: str | None = None,
#     **kwargs: Any,
# ) -> xr.Dataset:
#     if template_raster is None:
#         template_raster = dem_ecef.isel(axis=0).drop_vars(["axis", "spatial_ref"]) * 0.0
#     acquisition_template = make_simulate_acquisition_template(
#         template_raster, correct_radiometry
#     )
#     acquisition = xr.map_blocks(
#         simulate_acquisition,
#         dem_ecef.drop_vars("spatial_ref"),
#         kwargs={
#             "orbit_interpolator": orbit_interpolator,
#             "include_variables": list(acquisition_template.data_vars),
#         }
#         | kwargs,
#         template=acquisition_template,
#     )
#     return acquisition

def map_simulate_acquisition(
    dem_ecef: xr.DataArray,
    orbit_interpolator,
    template_raster: xr.DataArray | None = None,
    correct_radiometry: str | None = None,
    **kwargs: Any,
) -> xr.Dataset:
    if template_raster is None:
        template_raster = dem_ecef.isel(axis=0).drop_vars(["axis", "spatial_ref"]) * 0.0

    acquisition_template = make_simulate_acquisition_template(
        template_raster, correct_radiometry
    )
    include_variables = list(acquisition_template.data_vars)

    # Get all index values for the 'axis' dimension
    axis_dim = "axis" if "axis" in dem_ecef.dims else dem_ecef.dims[0]
    chunks = []
    
    for i in range(dem_ecef.sizes[axis_dim]):
        # Select a single chunk along the axis
        dem_chunk = dem_ecef.isel({axis_dim: i})

        # Add the axis dimension back to preserve structure
        dem_chunk = dem_chunk.expand_dims({axis_dim: [dem_ecef[axis_dim].values[i]]})

        acquisition = simulate_acquisition(
            dem_chunk,
            orbit_interpolator=orbit_interpolator,
            include_variables=include_variables,
            **kwargs
        )
        chunks.append(acquisition)

    # Concatenate all chunk results along the 'axis' dimension
    acquisition_all = xr.concat(chunks, dim=axis_dim)

    return acquisition_all




acquisition = map_simulate_acquisition(
        dem_ecef,
        orbit_interpolator,
        correct_radiometry=None,
        seed_step=None,
    )

<class 'xarray.core.dataarray.DataArray'> <class 'numpy.datetime64'> **************************
<class 'xarray.core.dataarray.DataArray'> <class 'numpy.datetime64'> **************************
<class 'xarray.core.dataarray.DataArray'> <class 'numpy.datetime64'> **************************


In [13]:
!pwd

/Users/rabinatwayana/Rabina/CDE II/software_development/py_sar_flow


In [14]:
# def parse_tag_as_list(
#     xml_path: PathOrFileType,
#     query: str,
#     schema_type: str = "annotation",
#     validation: str = "skip",
# ) -> list[dict[str, Any]]:
#     schema = cached_sentinel1_schemas(schema_type)
#     xml_tree = ElementTree.parse(xml_path)
#     tag: Any = schema.decode(xml_tree, query, validation=validation)
#     if tag is None:
#         tag = []
#     elif isinstance(tag, dict):
#         tag = [tag]
#     tag_list: list[dict[str, Any]] = tag
#     assert isinstance(tag_list, list), f"{type(tag_list)} is not list"
#     return tag_list


# def open_coordinate_conversion_dataset(
#     annotation_path, attrs: dict[str, Any] = {}
# ) -> xr.Dataset:
#     coordinate_conversion = parse_tag_as_list(
#         annotation_path, ".//coordinateConversionList/coordinateConversion"
#     )
#     if len(coordinate_conversion) == 0:
#         raise TypeError("coordinateConversion tag not present in annotations")

#     gr0 = []
#     sr0 = []
#     azimuth_time = []
#     slant_range_time = []
#     srgrCoefficients: list[list[float]] = []
#     grsrCoefficients: list[list[float]] = []
#     for values in coordinate_conversion:
#         sr0.append(values["sr0"])
#         gr0.append(values["gr0"])
#         azimuth_time.append(values["azimuthTime"])
#         slant_range_time.append(values["slantRangeTime"])
#         srgrCoefficients.append(
#             [float(v) for v in values["srgrCoefficients"]["$"].split()]
#         )
#         grsrCoefficients.append(
#             [float(v) for v in values["grsrCoefficients"]["$"].split()]
#         )

#     coords: dict[str, Any] = {}
#     data_vars: dict[str, Any] = {}
#     coords["azimuth_time"] = [np.datetime64(dt, "ns") for dt in azimuth_time]
#     coords["degree"] = list(range(len(srgrCoefficients[0])))

#     data_vars["gr0"] = ("azimuth_time", gr0)
#     data_vars["sr0"] = ("azimuth_time", sr0)
#     data_vars["slant_range_time"] = ("azimuth_time", slant_range_time)
#     data_vars["srgrCoefficients"] = (("azimuth_time", "degree"), srgrCoefficients)
#     data_vars["grsrCoefficients"] = (("azimuth_time", "degree"), grsrCoefficients)

#     return xr.Dataset(data_vars=data_vars, coords=coords, attrs=attrs)

In [15]:
# import xml.etree.ElementTree as ET
# import glob
# import numpy as np
# import xarray as xr
# import pandas as pd

# def extract_geolocation_grid(annotation_xml):
#     tree = ET.parse(annotation_xml)
#     root = tree.getroot()
#     geo_points = root.findall(".//geolocationGridPoint")

#     pixel, line = [], []
#     lat, lon = [], []
#     az_time, height = [], []

#     for point in geo_points:
#         pixel.append(float(point.find("pixel").text))
#         line.append(float(point.find("line").text))
#         lat.append(float(point.find("latitude").text))
#         lon.append(float(point.find("longitude").text))
#         az_time.append(point.find("azimuthTime").text)
#         height.append(float(point.find("height").text))

#     df = pd.DataFrame({
#         "pixel": pixel,
#         "line": line,
#         "lat": lat,
#         "lon": lon,
#         "azimuth_time": pd.to_datetime(az_time),
#         "height": height,
#     })

#     return df



# def create_coordinate_conversion_from_geogrid(df, poly_degree=1):
#     times = sorted(df['azimuth_time'].unique())
#     srgrCoeffs, grsrCoeffs, gr0s, sr0s, slant_range_times = [], [], [], [], []
#     # print(times)

#     for t in times:
#         sub = df[df['azimuth_time'] == t]
#         # print(len(sub))
#         # if len(sub) < poly_degree + 1:
#         #     continue  # not enough points to fit
#         print("123")
#         # Simulate slant-range and ground-range (e.g., using pixel and line)
#         sr = np.array(sub['pixel'])  # pixel as proxy for slant-range
#         gr = np.array(sub['lon'])    # longitude as proxy for ground-range
#         az = np.array(sub['lat'])    # latitude as proxy for azimuth

#         # Fit polynomials
#         srgr = np.polyfit(sr, gr, deg=poly_degree).tolist()
#         grsr = np.polyfit(gr, sr, deg=poly_degree).tolist()

#         srgrCoeffs.append(srgr)
#         grsrCoeffs.append(grsr)
#         sr0s.append(sr[0])
#         gr0s.append(gr[0])
#         slant_range_times.append(0.0)  # dummy value

#     return xr.Dataset(
#         {
#             "gr0": ("azimuth_time", gr0s),
#             "sr0": ("azimuth_time", sr0s),
#             "slant_range_time": ("azimuth_time", slant_range_times),
#             "srgrCoefficients": (("azimuth_time", "degree"), srgrCoeffs),
#             "grsrCoefficients": (("azimuth_time", "degree"), grsrCoeffs),
#         },
#         coords={
#             "azimuth_time": np.array(times[:len(srgrCoeffs)], dtype="datetime64[ns]"),
#             "degree": list(range(poly_degree + 1)),
#         }
#     )

import xml.etree.ElementTree as ET
import numpy as np
import xarray as xr

xml_path = "docs/data/S1A_IW_GRDH_1SDV_20241209T015852_20241209T015917_056909_06FD49_AE78.SAFE/annotation/s1a-iw-grd-vh-20241209t015852-20241209t015917-056909-06fd49-002_updated.xml"

# Replace with the actual annotation file path
# xml_path = "path/to/annotation.xml"

tree = ET.parse(xml_path)
root = tree.getroot()

ns = {"s1": "http://www.esa.int/safe/sentinel-1.0"}  # adjust if needed

coordinate_nodes = root.findall(".//coordinateConversionList/coordinateConversion")

azimuth_time = []
sr0 = []
gr0 = []
slant_range_time = []
srgr_coeffs = []
grsr_coeffs = []

for node in coordinate_nodes:
    azimuth_time.append(np.datetime64(node.findtext("azimuthTime"), "ns"))
    sr0.append(float(node.findtext("sr0")))
    gr0.append(float(node.findtext("gr0")))
    slant_range_time.append(float(node.findtext("slantRangeTime")))

    srgr = [float(x) for x in node.findtext("srgrCoefficients").split()]
    grsr = [float(x) for x in node.findtext("grsrCoefficients").split()]
    srgr_coeffs.append(srgr)
    grsr_coeffs.append(grsr)

degree = list(range(len(srgr_coeffs[0])))

coordinate_conversion = xr.Dataset(
    coords={
        "azimuth_time": ("azimuth_time", azimuth_time),
        "degree": ("degree", degree),
    },
    data_vars={
        "sr0": ("azimuth_time", sr0),
        "gr0": ("azimuth_time", gr0),
        "slant_range_time": ("azimuth_time", slant_range_time),
        "srgrCoefficients": (("azimuth_time", "degree"), srgr_coeffs),
        "grsrCoefficients": (("azimuth_time", "degree"), grsr_coeffs),
    },
)

print(coordinate_conversion)




# 1. Locate annotation XML

# 2. Extract grid
# geo_df = extract_geolocation_grid(annotation_xml)
# print(geo_df)

# 3. Create coordinate_conversion
# coordinate_conversion = create_coordinate_conversion_from_geogrid(geo_df)

<xarray.Dataset> Size: 5kB
Dimensions:           (azimuth_time: 28, degree: 9)
Coordinates:
  * azimuth_time      (azimuth_time) datetime64[ns] 224B 2024-12-09T01:58:51....
  * degree            (degree) int64 72B 0 1 2 3 4 5 6 7 8
Data variables:
    sr0               (azimuth_time) float64 224B 8.001e+05 ... 8.001e+05
    gr0               (azimuth_time) float64 224B 0.0 0.0 0.0 ... 0.0 0.0 0.0
    slant_range_time  (azimuth_time) float64 224B 0.005338 0.005338 ... 0.005338
    srgrCoefficients  (azimuth_time, degree) float64 2kB 0.03615 ... -7.62e-39
    grsrCoefficients  (azimuth_time, degree) float64 2kB 8.001e+05 ... -6.03e-45


In [16]:
def parse_radiometric_calibration_lut(safe_folder_path, representation_type="sigmaNought"):
    """
    Parsing LUT for radiometric calibration bt reading the calibration xml files.

    This function reads calibration related XML files within the 'annotation/calibration' folder in the
    provided SAFE directory path,
    and returns an xarray.Dataset with LUT for available polarizations based on the representation type as required.

    Arguments:
        safe_folder_path (str): Path to the root directory of the Sentinel-1 SAFE format product.
        representation_type (str, optional): Type of backscatter representation to be used. 
        Options are:
            - 'sigmaNought' (default)
            - 'betaNought'
            - 'gamma'

    Returns:
        xarray.Dataset: A dataset containing the LUT for radiometric calibration for available polarizations
        (e.g., 'VV', 'VH')

    Raises Exception:
        Exception: representation_type is not valid
        FileNotFoundError: If the 'calibration' folder or required XML files are missing,
    """

    supporting_representation_types=["sigmaNought","betaNought","gamma"]
    if representation_type not in supporting_representation_types:
        raise Exception(f"representation_type {representation_type} is not supported. Supporting types are {supporting_representation_types}")

    calibration_path = os.path.join(safe_folder_path, "annotation/calibration/")
    if not os.path.exists(calibration_path):
        raise FileNotFoundError(f"'calibration' folder not found inside {safe_folder_path}")

    # Find XML files starting with 's1a-iw-grd' (case insensitive)
    xml_files = [f for f in os.listdir(calibration_path) if f.lower().startswith('calibration') and f.lower().endswith('.xml')]
    if not xml_files:
        raise FileNotFoundError("No suitable calibration XML files found in 'calibration' folder")
    lut_dict={}
    for xml_file in xml_files:

        polarizations = ["vv", "vh", "hh", "hv"]
        band_name = None
        for pol in polarizations:
            if pol in xml_file.lower():
                band_name = pol.upper()
                break

        if not band_name:
            raise FileNotFoundError(f"Polarization type not found in file name: {xml_file}")

        print(f"Reading calibration for {band_name} band")

        xml_path = os.path.join(calibration_path, xml_file)
        tree = ET.parse(xml_path)
        root = tree.getroot()

        lines = []
        pixels = None
        correction_values = []

        for calib_vec in root.findall('.//calibrationVector'):
            line = int(calib_vec.find('line').text)
            pixel_str = calib_vec.find('pixel').text.strip()
            value_str = calib_vec.find(representation_type).text.strip()

            pixels = [int(x) for x in pixel_str.split()]
            values = [float(x) for x in value_str.split()]

            lines.append(line)
            correction_values.append(values)

        beta_array = np.array(correction_values)
        lut = xr.DataArray(beta_array, coords={"line": lines, "pixel": pixels}, dims=["line", "pixel"])
        lut_dict[band_name]= lut    
    lut_ds = xr.Dataset(lut_dict)
    print("Radiometric calibration LUT created successfully")
    return lut_ds

In [17]:

# import pandas as pd
# # from pysarflow import parse_radiometric_calibration_lut

# def lut_pixel_line_to_time(lut_ds, acquisition):
#     # Convert 'line' index to azimuth_time
#     # Assumptions:
#     # - acquisition.attrs['startTime'] is a pd.Timestamp
#     # - acquisition.attrs['prf'] is Pulse Repetition Frequency (Hz)
#     # - acquisition has 'slant_range_time' or 'ground_range' coords

#     start_time = pd.to_datetime(acquisition.attrs['startTime'])
#     prf = acquisition.attrs.get('prf', 1680)  # default PRF example

#     # Calculate azimuth time for each line in LUT
#     lut_azimuth_time = start_time + pd.to_timedelta(lut_ds.line.values / prf, unit='s')

#     # For slant_range_time: Map pixel index to slant range time
#     # Assuming acquisition.slant_range_time exists and is evenly spaced
#     # Map pixel index to slant_range_time by interpolation or indexing

#     slant_range_time_arr = acquisition.slant_range_time.values
#     pixel_indices = lut_ds.pixel.values

#     # Linear interpolation of pixel index to slant_range_time values
#     from numpy import interp
#     lut_slant_range_time = interp(pixel_indices, range(len(slant_range_time_arr)), slant_range_time_arr)

#     # Assign new coords to each DataArray in lut_ds
#     lut_ds_new = lut_ds.copy()
#     for pol in lut_ds_new.data_vars:
#         lut_ds_new[pol] = lut_ds_new[pol].assign_coords(
#             azimuth_time=("line", lut_azimuth_time),
#             slant_range_time=("pixel", lut_slant_range_time)
#         ).rename({"line": "azimuth_time", "pixel": "slant_range_time"})

#     return lut_ds_new

# lut_ds_new = parse_radiometric_calibration_lut(zip_safe_path, representation_type="sigmaNought")

# # ls_ds_new = lut_pixel_line_to_time(lut_ds, acquisition):

# lut_interp = lut_ds_new.interp(
#     azimuth_time=acquisition.azimuth_time,
#     slant_range_time=acquisition.slant_range_time
# )
# beta_nought = ds / (lut_interp ** 2)

In [18]:



def slant_range_time_to_ground_range(
    azimuth_time: xr.DataArray,
    slant_range_time: xr.DataArray,
    coordinate_conversion: xr.Dataset,
) -> xr.DataArray:
    """Convert slant range time to ground range using the coordinate conversion metadata.

    :param azimuth_time: azimuth time coordinates
    :param slant_range_time: slant range time
    :param coordinate_conversion: coordinate conversion dataset.
    The coordinate conversion dataset can be opened using the measurement sub-groub `coordinate_conversion`
    """
    slant_range = SPEED_OF_LIGHT / 2.0 * slant_range_time
    sr0 = coordinate_conversion.sr0.interp(azimuth_time=azimuth_time)
    srgrCoefficients = coordinate_conversion.srgrCoefficients.interp(
        azimuth_time=azimuth_time,
    )
    x = slant_range - sr0
    ground_range = (srgrCoefficients * x**srgrCoefficients.degree).sum("degree")
    return ground_range  # type: ignore



def interp_sar(
    data: xr.DataArray,
    azimuth_time: xr.DataArray,
    slant_range_time: xr.DataArray | None = None,

    method: xr.core.types.InterpOptions = "nearest",
    ground_range: xr.DataArray | None = None,
    coordinate_conversion:xr.DataArray | None = None
) -> xr.DataArray:
    if ground_range is None:
        assert slant_range_time is not None
        ground_range = slant_range_time_to_ground_range(
            azimuth_time, slant_range_time, coordinate_conversion
        )
 

    interpolated = data.interp(
        azimuth_time=azimuth_time, ground_range=ground_range, method=method
    )
    
    return interpolated.assign_attrs(data.attrs)

# def interp_sar(data, azimuth_time, slant_range_time, coordinate_conversion, method='linear'):
#     # Convert target times to line/pixel
#     # target_line, target_pixel = coordinate_conversion(azimuth_time, slant_range_time)
#     # Example: Suppose coordinate_conversion is a Dataset with variables 'line' and 'pixel'
#     target_line = coordinate_conversion['line'].interp(
#         azimuth_time=azimuth_time, slant_range_time=slant_range_time
#     )
#     target_pixel = coordinate_conversion['pixel'].interp(
#         azimuth_time=azimuth_time, slant_range_time=slant_range_time
#     )

#     # Interpolate using the correct dimensions
#     interpolated = data.interp(
#         line=target_line,
#         pixel=target_pixel,
#         method=method
#     )
#     return interpolated.assign_attrs(data.attrs)



In [19]:
# acquisition.azimuth_time

In [20]:
# print((acquisition.azimuth_time),(acquisition.slant_range_time))

In [21]:
# beta_nought=radiometric_calibrated_ds
# beta_nought=radiometric_calibrated_ds
safe_folder = "docs/data/S1A_IW_GRDH_1SDV_20241209T015852_20241209T015917_056909_06FD49_AE78.SAFE"
beta_nought=parse_beta_lut(safe_folder, representation_type="betaNought")

beta_nought = beta_nought.rename({'line': 'azimuth_time', 'pixel': 'ground_range'})
beta_nought = beta_nought.assign_coords(
    azimuth_time=acquisition.azimuth_time,
    ground_range=acquisition.slant_range_time
)

geocoded = interp_sar(
            data=beta_nought,
            azimuth_time=acquisition.azimuth_time,
            slant_range_time=acquisition.slant_range_time,
            coordinate_conversion=coordinate_conversion
            # method=interp_method,
        )

# geocoded = beta_nought.interp(
#     x=beta_nought['x'],  # you need to define these
#     y=beta_nought['y'],
#     method='linear'
# )


# geocoded, simulated_beta_nought = do_terrain_correction(
    #     product=product,
    #     dem_raster=dem_raster,
    #     correct_radiometry=correct_radiometry,
    #     interp_method=interp_method,
    #     grouping_area_factor=grouping_area_factor,
    #     radiometry_chunks=radiometry_chunks,
    #     radiometry_bound=radiometry_bound,
    #     seed_step=seed_step,
    # )

Reading calibration for VV band
2024-12-09T01:58:52.968127 khjkhjk
2024-12-09T01:58:53.968127 khjkhjk
2024-12-09T01:58:54.968127 khjkhjk
2024-12-09T01:58:55.968127 khjkhjk
2024-12-09T01:58:56.968127 khjkhjk
2024-12-09T01:58:57.968127 khjkhjk
2024-12-09T01:58:58.968127 khjkhjk
2024-12-09T01:58:59.968127 khjkhjk
2024-12-09T01:59:00.968127 khjkhjk
2024-12-09T01:59:01.968127 khjkhjk
2024-12-09T01:59:02.968127 khjkhjk
2024-12-09T01:59:03.968127 khjkhjk
2024-12-09T01:59:04.968127 khjkhjk
2024-12-09T01:59:05.968127 khjkhjk
2024-12-09T01:59:06.968127 khjkhjk
2024-12-09T01:59:07.968127 khjkhjk
2024-12-09T01:59:08.968127 khjkhjk
2024-12-09T01:59:09.968127 khjkhjk
2024-12-09T01:59:10.968127 khjkhjk
2024-12-09T01:59:11.968127 khjkhjk
2024-12-09T01:59:12.968127 khjkhjk
2024-12-09T01:59:13.968127 khjkhjk
2024-12-09T01:59:14.968127 khjkhjk
2024-12-09T01:59:15.968127 khjkhjk
2024-12-09T01:59:16.968127 khjkhjk
2024-12-09T01:59:17.968127 khjkhjk
2024-12-09T01:59:18.968127 khjkhjk
Reading calibration for

ValueError: Input DataArray is not 1-D.

In [None]:
# acquisition

In [None]:
beta_nought

In [None]:
lon_min, lat_min, lon_max, lat_max = 13.45, 42.40, 13.55, 42.50
output_dem_path="dem2.tif"
download_dem(lon_min, lat_min, lon_max, lat_max,output_dem_path)