In [10]:
import numpy as np
from netCDF4 import Dataset
import h5py
import os
from datetime import datetime, timedelta, timezone
from scipy.interpolate import RegularGridInterpolator

def map_elev_era5_scan_to_lut_safe(
    elev_file: str,
    era5_file: str,
    lut_file: str,
    hdf_file: str,
    out_file: str
):
    """Map elevation, ERA5 annual mean temperature/pressure, and FY-4B scan time to LUT grid."""

    # ---------------- 1. Read elevation ----------------
    with Dataset(elev_file, 'r') as nc:
        lat_elev = nc.variables['latitude'][:]
        lon_elev = nc.variables['longitude'][:]
        elev     = nc.variables['elevation'][:]

    # ---------------- 2. Read LUT ----------------
    raw_image = np.fromfile(lut_file)
    raw_image = raw_image.reshape(-1, 2)
    lat_lut   = raw_image[:, 0].reshape(2748, 2748)
    lon_lut   = raw_image[:, 1].reshape(2748, 2748)
    mask = (lat_lut > 999) | (lon_lut > 999)
    valid = ~mask
    ny, nx = lat_lut.shape

    # ---------------- 3. Map elevation ----------------
    # 用 nearest grid index
    def nearest_idx(src_coords, target_coords):
        idx = np.searchsorted(src_coords, target_coords, side="left")
        idx = np.clip(idx, 1, len(src_coords)-1)
        idx = idx - (np.abs(target_coords - src_coords[idx-1]) < np.abs(target_coords - src_coords[idx]))
        return idx

    lat_idx_elev = nearest_idx(lat_elev, lat_lut)
    lon_idx_elev = nearest_idx(lon_elev, lon_lut)
    elev_resampled = np.full(lat_lut.shape, np.nan)
    elev_resampled[valid] = elev[lat_idx_elev[valid], lon_idx_elev[valid]]

    # ---------------- 4. Read ERA5 monthly data and compute annual mean ----------------
    with Dataset(era5_file, 'r') as nc:
        t2m_monthly = nc.variables['t2m'][:]
        sp_monthly  = nc.variables['sp'][:]
        lat_era5    = nc.variables['latitude'][:]
        lon_era5    = nc.variables['longitude'][:]

    t2m_annual = np.mean(t2m_monthly, axis=0)
    sp_annual  = np.mean(sp_monthly, axis=0)

    # ---------------- 5. Interpolate ERA5 annual data to LUT ----------------
    # 如果纬度从北到南，需要翻转
    t2m_interp = RegularGridInterpolator(
        (lat_era5[::-1], lon_era5),
        t2m_annual[::-1, :],
        bounds_error=False,
        fill_value=np.nan
    )
    sp_interp = RegularGridInterpolator(
        (lat_era5[::-1], lon_era5),
        sp_annual[::-1, :],
        bounds_error=False,
        fill_value=np.nan
    )

    points = np.stack([lat_lut.ravel(), lon_lut.ravel()], axis=-1)
    t2m_resampled = t2m_interp(points).reshape(lat_lut.shape)
    sp_resampled  = sp_interp(points).reshape(lat_lut.shape)

    # ---------------- 6. Read HDF scan time ----------------
    with h5py.File(hdf_file, 'r') as f:
        scan_time_raw = f["NOMObs/NOMObsTime"][:]

    filename = os.path.basename(hdf_file)
    parts = filename.split('_')
    start_str = parts[-4]
    end_str   = parts[-3]
    ts_start = int(start_str + "000")
    te_end   = int(end_str + "000")

    # 替换无效时间
    ny_raw = scan_time_raw.shape[0]
    for i in range(ny_raw):
        ts, te = scan_time_raw[i]
        if ts == 9999 and te == 9999:
            if i < ny_raw // 2:
                scan_time_raw[i, 0] = ts_start
                scan_time_raw[i, 1] = ts_start
            else:
                scan_time_raw[i, 0] = te_end
                scan_time_raw[i, 1] = te_end

    def parse_time(ts_int):
        ts_str = f"{int(ts_int):017d}"
        dt = datetime.strptime(ts_str[:14], "%Y%m%d%H%M%S")
        ms = int(ts_str[14:])
        return dt + timedelta(milliseconds=ms)

    def datetime_to_unix(dt):
        return dt.replace(tzinfo=timezone.utc).timestamp()

    def unix_to_int(ts):
        dt = datetime.fromtimestamp(ts, tz=timezone.utc)
        ms = int(dt.microsecond / 1000)
        return int(dt.strftime("%Y%m%d%H%M%S") + f"{ms:03d}")

    # ---------------- 7. Interpolate scan time ----------------
    scan_time_sec = np.full((ny, nx), np.nan, dtype=np.float64)
    for i in range(ny):
        ts_int, te_int = scan_time_raw[i]
        dt_start = parse_time(ts_int)
        dt_end   = parse_time(te_int)
        ts_sec = datetime_to_unix(dt_start)
        te_sec = datetime_to_unix(dt_end)

        valid_cols = np.where(valid[i, :])[0]
        n_valid = len(valid_cols)
        if n_valid > 0:
            scan_time_sec[i, valid_cols] = np.linspace(ts_sec, te_sec, n_valid)

    scan_time_int = np.full((ny, nx), 0, dtype=np.int64)
    valid_idx = ~np.isnan(scan_time_sec)
    scan_time_int[valid_idx] = np.vectorize(unix_to_int)(scan_time_sec[valid_idx])

    # ---------------- 8. Save to NetCDF ----------------
    ds_out = Dataset(out_file, 'w', format='NETCDF4')
    ds_out.createDimension('y', ny)
    ds_out.createDimension('x', nx)

    lat_var  = ds_out.createVariable('latitude', 'f8', ('y','x'))
    lon_var  = ds_out.createVariable('longitude', 'f8', ('y','x'))
    elev_var = ds_out.createVariable('altitude', 'f8', ('y','x'), fill_value=np.nan)
    t2m_var  = ds_out.createVariable('t2m', 'f8', ('y','x'), fill_value=np.nan)
    sp_var   = ds_out.createVariable('sp', 'f8', ('y','x'), fill_value=np.nan)
    scan_var = ds_out.createVariable('scan_time', 'i8', ('y','x'), fill_value=0)

    lat_var[:, :]   = lat_lut
    lon_var[:, :]   = lon_lut
    elev_var[:, :]  = elev_resampled
    t2m_var[:, :]   = t2m_resampled
    sp_var[:, :]    = sp_resampled
    scan_var[:, :]  = scan_time_int

    lat_var.units  = 'degrees_north'
    lon_var.units  = 'degrees_east'
    elev_var.units = 'm'
    t2m_var.units  = 'K'
    sp_var.units   = 'Pa'
    scan_var.units = 'YYYYMMDDHHMMSSsss (UTC)'

    elev_var.description = 'High-resolution elevation mapped to LUT grid'
    t2m_var.description  = 'ERA5 annual mean 2m temperature mapped to LUT grid'
    sp_var.description   = 'ERA5 annual mean surface pressure mapped to LUT grid'
    scan_var.description = 'Scan time interpolated from start/end time of each line, UTC, millisecond precision'

    ds_out.close()
    print("Saved successfully:", out_file)


# ================= Example usage =================
elev_file = "E:/fengyun/code/input/world_ll_elev_0.05deg.nc4"
era5_file = "E:/fengyun/code/input/2024monthly.nc"
lut_file  = "E:/fengyun/code/input/FY4B-_DISK_1050E_GEO_NOM_LUT_20240227000000_4000M_V0001.raw"
hdf_file  = "E:/fengyun/code/input/FY4B-_AGRI--_N_DISK_1050E_L1-_FDI-_MULT_NOM_20240923040000_20240923041459_4000M_V0001.HDF"
out_file  = "E:/fengyun/code/output/result.nc"

map_elev_era5_scan_to_lut_safe(elev_file, era5_file, lut_file, hdf_file, out_file)


Saved successfully: E:/fengyun/code/output/result.nc


In [None]:
###################### SPA-Based Solar Geometry Computation #######################
# This script reads a LUT NetCDF file containing elevation, ERA5 annual mean 
# temperature and pressure, and FY-4B scan time information. 
# It computes solar position parameters (zenith, elevation, azimuth, etc.) 
# using the SPA algorithm implemented in pvlib, and writes the results 
# back to the same NetCDF file.
###################################################################################

import numpy as np
import pandas as pd
from netCDF4 import Dataset
import pvlib

def compute_solarposition_nc(nc_file, delta_t=67.0, numthreads=4):
    """
    Compute solar position (zenith, elevation, azimuth, etc.) for each pixel in
    a LUT NetCDF file using the SPA algorithm and save results back to the file.

    Parameters
    ----------
    nc_file : str
        Path to the NetCDF file containing 'latitude', 'longitude', 'altitude', 
        'scan_time', 't2m', and 'sp' variables.
    delta_t : float, optional
        Difference between Earth rotation time and UT1 time (in seconds).
    numthreads : int, optional
        Number of threads used by pvlib.solarposition.spa_python.
    """

    # ---------------- 1. Open NetCDF file and read variables ----------------
    ds = Dataset(nc_file, 'r+')
    lat = ds.variables['latitude'][:, :]
    lon = ds.variables['longitude'][:, :]
    elev = ds.variables['altitude'][:, :]
    scan_time_int = ds.variables['scan_time'][:, :]
    t2m = ds.variables['t2m'][:, :]   # ERA5 annual mean temperature (K)
    sp  = ds.variables['sp'][:, :]    # ERA5 annual mean surface pressure (Pa)

    ny, nx = lat.shape
    valid_mask = ~np.isnan(lat) & (scan_time_int > 0)

    # ---------------- 2. Convert integer scan time to pandas.DatetimeIndex ----------------
    def int_to_datetime(ts_int):
        ts_str = f"{int(ts_int):017d}"  # Ensure 17 digits
        dt = pd.to_datetime(ts_str[:14], format="%Y%m%d%H%M%S", utc=True)
        ms = int(ts_str[14:])
        return dt + pd.Timedelta(milliseconds=ms)

    time_grid = np.empty((ny, nx), dtype='datetime64[ms]')
    time_grid[valid_mask] = np.vectorize(int_to_datetime)(scan_time_int[valid_mask])
    time_grid[~valid_mask] = np.datetime64('NaT')

    # ---------------- 3. Flatten valid pixels ----------------
    lat_flat = lat[valid_mask]
    lon_flat = lon[valid_mask]
    elev_flat = elev[valid_mask]
    time_flat = pd.DatetimeIndex(time_grid[valid_mask].ravel())
    pressure_flat = sp[valid_mask]
    temperature_flat = t2m[valid_mask] - 273.15  # Convert to Celsius

    # ---------------- 4. Compute solar position using pvlib ----------------
    solpos_flat = pvlib.solarposition.spa_python(
        time=time_flat,
        latitude=lat_flat,
        longitude=lon_flat,
        altitude=elev_flat,
        pressure=pressure_flat,
        temperature=temperature_flat,
        delta_t=delta_t,
        how='numpy',
        numthreads=numthreads
    )

    # ---------------- 5. Reshape results back to grid ----------------
    def create_grid(var_name):
        grid = np.full((ny, nx), np.nan, dtype=np.float32)
        grid[valid_mask] = solpos_flat[var_name].values
        return grid

    apparent_zenith_grid    = create_grid('apparent_zenith')
    zenith_grid             = create_grid('zenith')
    apparent_elevation_grid = create_grid('apparent_elevation')
    elevation_grid          = create_grid('elevation')
    azimuth_grid            = create_grid('azimuth')
    eq_time_grid            = create_grid('equation_of_time')

    # ---------------- 6. Write results back to NetCDF ----------------
    def create_or_replace_var(name, data, units):
        if name in ds.variables:
            ds.variables[name][:] = data
        else:
            var = ds.createVariable(name, 'f4', ('y', 'x'), fill_value=np.nan)
            var[:, :] = data
            var.units = units

    create_or_replace_var('apparent_zenith', apparent_zenith_grid, 'degrees')
    create_or_replace_var('zenith', zenith_grid, 'degrees')
    create_or_replace_var('apparent_elevation', apparent_elevation_grid, 'degrees')
    create_or_replace_var('elevation_solar', elevation_grid, 'degrees')
    create_or_replace_var('azimuth', azimuth_grid, 'degrees')
    create_or_replace_var('equation_of_time', eq_time_grid, 'minutes')

    ds.close()
    print(f"Solar position computation completed and saved to {nc_file}")


# ---------------- Example usage ----------------
nc_file = "E:/fengyun/code/output/result.nc"
compute_solarposition_nc(nc_file, delta_t=69.1322, numthreads=4)


  time_grid[valid_mask] = np.vectorize(int_to_datetime)(scan_time_int[valid_mask])


太阳位置计算完成并保存回 E:/fengyun/code/output/result.nc


In [11]:
"""
FY-4B Satellite Viewing Angle Computation

This script provides a function to compute viewing zenith (θ) and azimuth (φ)
angles from the FY-4B geostationary satellite to the ground, based on
latitude, longitude, and terrain elevation grids in a NetCDF file.

The computed angles are written back to the same NetCDF file as two variables:
- satellite_zenith_angle: Zenith angle in degrees (0 = zenith, 90 = horizon)
- satellite_azimuth_angle: Azimuth angle in degrees (0–360°, clockwise from North)

Parameters can be adjusted for satellite longitude and orbit height.
"""

import math
import numpy as np
from netCDF4 import Dataset

def compute_satellite_angles(nc_file: str, sat_lon_deg: float = 105.0, sat_height: float = 35786000.0):
    """
    Compute viewing zenith angle (θ) and azimuth angle (φ) from FY-4B satellite
    to the ground, and write them to the input NetCDF file.

    Parameters:
    -----------
    nc_file : str
        Input/output NetCDF file containing 'latitude', 'longitude', and 'altitude'.
    sat_lon_deg : float, optional
        Satellite longitude in degrees (default is 105.0° for FY-4B).
    sat_height : float, optional
        Satellite orbit height in meters (default is 35786000 m).
    """

    # ---------------- Step 1: Open NetCDF file ----------------
    nc = Dataset(nc_file, 'r+')

    # ---------------- Step 2: Read latitude, longitude, and altitude ----------------
    lat_arr = np.array(nc.variables['latitude'][:], dtype=np.float64)
    lon_arr = np.array(nc.variables['longitude'][:], dtype=np.float64)
    height_arr = np.array(nc.variables['altitude'][:], dtype=np.float64)

    # ---------------- Step 3: Satellite and Earth parameters (GRS80) ----------------
    a = 6378137.0                   # Equatorial radius (m)
    f = 1 / 298.257222101           # Flattening
    E2 = 2 * f - f**2               # Square of eccentricity

    sat_lon_rad = math.radians(sat_lon_deg)
    x_s = (a + sat_height) * math.cos(sat_lon_rad)
    y_s = (a + sat_height) * math.sin(sat_lon_rad)
    z_s = 0.0

    # ---------------- Step 4: Initialize output arrays ----------------
    zenith_angle_arr = np.full(lat_arr.shape, np.nan, dtype=np.float64)
    azimuth_angle_arr = np.full(lat_arr.shape, np.nan, dtype=np.float64)

    # ---------------- Step 5: Valid point mask ----------------
    valid_mask = (~np.isnan(lat_arr)) & (~np.isnan(lon_arr)) & (lat_arr != 0) & (lon_arr != 0)

    lat_valid = lat_arr[valid_mask]
    lon_valid = lon_arr[valid_mask]
    height_valid = height_arr[valid_mask]

    # ---------------- Step 6: Convert to radians and compute prime vertical radius N ----------------
    lat_rad = np.radians(lat_valid)
    lon_rad = np.radians(lon_valid)
    sin_lat = np.sin(lat_rad)
    cos_lat = np.cos(lat_rad)
    N = a / np.sqrt(1 - E2 * sin_lat**2)

    # ---------------- Step 7: Ground point Cartesian coordinates ----------------
    x_p = (N + height_valid) * cos_lat * np.cos(lon_rad)
    y_p = (N + height_valid) * cos_lat * np.sin(lon_rad)
    z_p = ((1 - E2) * N + height_valid) * sin_lat

    # ---------------- Step 8: Direction vector d = S - P ----------------
    x_d = x_s - x_p
    y_d = y_s - y_p
    z_d = z_s - z_p

    # ---------------- Step 9: Local ENU coordinates ----------------
    sin_lon = np.sin(lon_rad)
    cos_lon = np.cos(lon_rad)
    e = -sin_lon * x_d + cos_lon * y_d
    n = -sin_lat * cos_lon * x_d - sin_lat * sin_lon * y_d + cos_lat * z_d
    u =  cos_lat * cos_lon * x_d + cos_lat * sin_lon * y_d + sin_lat * z_d

    # ---------------- Step 10: Compute zenith (θ) and azimuth (φ) angles ----------------
    theta_deg = 90.0 - np.degrees(np.arctan2(u, np.sqrt(e**2 + n**2)))
    phi_deg = np.degrees(np.arctan2(e, n))
    phi_deg = np.where(phi_deg < 0, 360.0 + phi_deg, phi_deg)

    # ---------------- Step 11: Assign to output arrays ----------------
    zenith_angle_arr[valid_mask] = theta_deg
    azimuth_angle_arr[valid_mask] = phi_deg

    # ---------------- Step 12: Write to NetCDF ----------------
    # Zenith angle
    if 'satellite_zenith_angle' in nc.variables:
        zen_var = nc.variables['satellite_zenith_angle']
    else:
        lat_dim, lon_dim = nc.variables['latitude'].dimensions
        zen_var = nc.createVariable('satellite_zenith_angle', 'f8', (lat_dim, lon_dim))
    zen_var[:, :] = zenith_angle_arr
    zen_var.units = 'degree'
    zen_var.long_name = 'Viewing Zenith Angle (θ)'
    zen_var.description = 'Angle between local zenith and line-of-sight to FY-4B satellite'

    # Azimuth angle
    if 'satellite_azimuth_angle' in nc.variables:
        azi_var = nc.variables['satellite_azimuth_angle']
    else:
        lat_dim, lon_dim = nc.variables['latitude'].dimensions
        azi_var = nc.createVariable('satellite_azimuth_angle', 'f8', (lat_dim, lon_dim))
    azi_var[:, :] = azimuth_angle_arr
    azi_var.units = 'degree'
    azi_var.long_name = 'Viewing Azimuth Angle (φ)'
    azi_var.description = 'Azimuth of satellite direction in local horizon plane (0–360°)'

    # ---------------- Step 13: Close NetCDF ----------------
    nc.close()
    print("✅ Viewing zenith (θ) and azimuth (φ) angles have been successfully written to the NetCDF file.")


# ---------------- Example Usage ----------------

nc_file_path = "E:/fengyun/code/output/result.nc"
compute_satellite_angles(nc_file_path)


✅ Viewing zenith (θ) and azimuth (φ) angles have been successfully written to the NetCDF file.


In [12]:
import numpy as np
from netCDF4 import Dataset, default_fillvals

def atmospheric_refraction_correction(local_pressure, local_temp,
                                      topocentric_elevation_angle_wo_atmosphere,
                                      atmos_refract=0.5667):
    """
    Calculate atmospheric refraction correction for a topocentric elevation angle.
    """
    switch = topocentric_elevation_angle_wo_atmosphere >= -1.0 * (0.26667 + atmos_refract)
    delta_e = ((local_pressure / 1010.0) * (283.0 / (273 + local_temp))
               * 1.02 / (60 * np.tan(np.radians(
                   topocentric_elevation_angle_wo_atmosphere
                   + 10.3 / (topocentric_elevation_angle_wo_atmosphere + 5.11))))) * switch
    return delta_e


def correct_atmospheric_refraction(nc_file_path):
    """
    Apply atmospheric refraction correction to satellite zenith angle in a NetCDF file.

    Parameters
    ----------
    nc_file_path : str
        Path to the NetCDF file containing 'satellite_zenith_angle', 't2m', and 'sp'.

    The function creates a new variable 'satellite_zenith_angle_corrected' in the same file.
    """
    # Open NetCDF file
    nc = Dataset(nc_file_path, "r+")

    # Read variables
    zenith   = nc.variables["satellite_zenith_angle"][:]  # degrees
    temp_K   = nc.variables["t2m"][:]                     # Kelvin
    press_Pa = nc.variables["sp"][:]                      # Pascals

    # ---- Unit conversion ----
    temp_C    = temp_K - 273.15      # Kelvin → Celsius
    press_hPa = press_Pa / 100.0     # Pascals → hPa

    # ---- Convert to elevation angle ----
    elev = 90.0 - zenith

    # ---- Valid mask: all three inputs are not NaN ----
    valid = (~np.isnan(zenith)) & (~np.isnan(temp_C)) & (~np.isnan(press_hPa))

    # ---- Initialize corrected zenith angle array as NaN ----
    zenith_corrected = np.full_like(zenith, np.nan, dtype=np.float32)

    # ---- Apply correction only to valid grid points ----
    if np.any(valid):
        delta_e = atmospheric_refraction_correction(
            local_pressure=press_hPa[valid],
            local_temp=temp_C[valid],
            topocentric_elevation_angle_wo_atmosphere=elev[valid]
        )
        elev_corrected = elev[valid] + delta_e
        zenith_corrected[valid] = 90.0 - elev_corrected

    # ---- Write the corrected zenith angle as a new variable ----
    if "satellite_zenith_angle_corrected" not in nc.variables:
        zen_var = nc.createVariable(
            "satellite_zenith_angle_corrected", "f4",
            nc.variables["satellite_zenith_angle"].dimensions,
            fill_value=default_fillvals["f4"]
        )
        zen_var.units = "degrees"
        zen_var.long_name = "satellite zenith angle corrected for atmospheric refraction"

    nc.variables["satellite_zenith_angle_corrected"][:] = zenith_corrected
    nc.close()


# ==== Usage example ====
correct_atmospheric_refraction("E:/fengyun/code/output/result.nc")
