In [1]:
from meta_info import *
from tools import *
from plot import *
from ntpath import join
import xarray as xr
import numpy as np
import pandas as pd
from scipy import stats
import os
import math
from preprocessing import *

In [None]:
def monthly_compose(dataset, method='mean'):
        
        # 根据method选择聚合方法
        if method == 'mean':
            monthly_data = dataset.resample(time='1MS').mean()
        elif method == 'max':
            monthly_data = dataset.resample(time='1MS').max()
        elif method == 'sum':
            monthly_data = dataset.resample(time='1MS').sum()
        else:
            raise ValueError("Unsupported method. Choose from 'mean', 'max', 'sum'.")
        return monthly_data
def deseason_detrend(dataset , outpath):

        # 计算多年平均值
        multiyear_mean = dataset.mean(dim='time')

        # 筛选出多年平均值大于或等于0.2的像元，不满足条件的设置为NaN
        filtered_dataset = dataset.where(multiyear_mean >= 0.2)

        # 第1步: 去季节性
        # 计算每个月份的多年平均值
        monthly_avg = filtered_dataset.groupby('time.month').mean('time')

        # 从每个原始值中减去对应月份的多年平均值
        deseasonalized = filtered_dataset.groupby('time.month') - monthly_avg

        # 第2步: 去趋势
        deseason_detrend_data = xr.full_like(deseasonalized, fill_value=np.nan)
        
        for lat in deseasonalized.lat:
            for lon in deseasonalized.lon:
                # 获取时间编码作为自变量
                time = np.arange(1,len(deseasonalized.time.dt.month)+1)
                y = deseasonalized.sel(lat=lat, lon=lon).ndvi.values

                #只进行非nan去趋势
                valid_indices = ~np.isnan(y)  # 获取y中非NaN值的索引
                time_clean = time[valid_indices]
                y_clean = y[valid_indices]
              
                if  y_clean.size>1:
                    slope, intercept, _, _, _ = stats.linregress(time_clean, y_clean)
                    trend_line = slope * time + intercept
                    deseason_detrend_data.loc[dict(lat=lat, lon=lon)] =xr.DataArray(y - trend_line, dims=['time'])
                    
        deseason_detrend_data.to_netcdf(outpath)

def calculate_resistance(Deseason_detrend_ndvi, drought_mask, growing_season_mask, spei):
    """
    计算抵抗力 
    公式为: Y_drought_mean-Y_normal_mean
    其中Y是去季节去趋势的ndvi

    """
    # 初始化抵抗力数据集，所有值为NaN
    resistance = xr.full_like(Deseason_detrend_ndvi, np.nan)  

    # Z-score 标准化
    mean_value = Deseason_detrend_ndvi.mean(dim = 'time', skipna = True)
    std_value = Deseason_detrend_ndvi.std(dim = 'time', skipna = True)
    Deseason_detrend_ndvi = (Deseason_detrend_ndvi - mean_value) / std_value
    # 计算正常条件下ndvi均值
    normal_growing_season_mask = growing_season_mask.where(spei >= -0.5)
    normal_growing_season_mean = Deseason_detrend_ndvi.where(normal_growing_season_mask == 1).mean(dim='time', skipna = True)
    # 遍历每个地理位置
    for lat in tqdm(Deseason_detrend_ndvi.lat.values):
        for lon in Deseason_detrend_ndvi.lon.values:
            # 提取单个像元的温度和掩码
            pixel_ndvi = Deseason_detrend_ndvi.sel(lat=lat, lon=lon)
            pixel_drought_mask = drought_mask.sel(lat=lat, lon=lon)
            pixel_ndvi_mean = normal_growing_season_mean.sel(lat=lat, lon=lon)

            # 检查像元是否不全为 NaN
            if not (np.isnan(pixel_ndvi).all() or np.isnan(pixel_drought_mask).all()):

                # 计算连续为 1 的区间的索引
                region_indices = np.where(pixel_drought_mask == 1)[0]  # 获取为 1 的位置索引
                if len(region_indices) > 0:
                    region_starts = [region_indices[0]]  # 连续区间的起始索引
                    for i in range(1, len(region_indices)):
                        if region_indices[i] != region_indices[i-1] + 1:  # 如果不连续，则记录新的起始索引
                            region_starts.append(region_indices[i])

                    # 对每个连续区间计算均值，并替换为resistance
                    for start_idx in region_starts:
                        end_idx = start_idx + 1
                        while end_idx < len(pixel_ndvi.time) and pixel_drought_mask[end_idx] == 1:
                            end_idx += 1  # 找到连续区间的结束索引
                        region_ndvi = pixel_ndvi.isel(time=slice(int(start_idx), int(end_idx)))  # 提取连续干旱区间的ndvi

                        region_mean = region_ndvi.mean()
                        resistance.loc[{'lat': lat, 'lon': lon, 'time': region_ndvi.time[0]}] = region_mean - pixel_ndvi_mean

    return resistance

def calculate_resilience(Deseason_detrend_ndvi, drought_mask, growing_season_mask, spei):
    """
    计算恢复力 
    公式为: Y_post_mean-Y_normal_mean
    其中Y是去季节去趋势的ndvi

    """
    # 初始化恢复力数据集，所有值为NaN
    resilience = xr.full_like(Deseason_detrend_ndvi, np.nan)  

    # Z-score 标准化
    mean_value = Deseason_detrend_ndvi.mean(dim = 'time', skipna = True)
    std_value = Deseason_detrend_ndvi.std(dim = 'time', skipna = True)
    Deseason_detrend_ndvi = (Deseason_detrend_ndvi - mean_value) / std_value
    # 计算正常条件下ndvi均值
    normal_growing_season_mask = growing_season_mask.where(spei >= -0.5)
    normal_growing_season_mean = Deseason_detrend_ndvi.where(normal_growing_season_mask == 1).mean(dim='time', skipna = True)
    
    # 遍历每个地理位置
    for lat in tqdm(Deseason_detrend_ndvi.lat.values):
        for lon in Deseason_detrend_ndvi.lon.values:
            # 提取单个像元的温度和掩码
            pixel_ndvi = Deseason_detrend_ndvi.sel(lat=lat, lon=lon)
            pixel_drought_mask = drought_mask.sel(lat=lat, lon=lon)
            pixel_growing_season_mask = growing_season_mask.sel(lat=lat, lon=lon)
            pixel_ndvi_mean = normal_growing_season_mean.sel(lat=lat, lon=lon)

            # 检查像元是否不全为 NaN
            if not (np.isnan(pixel_ndvi).all() or np.isnan(pixel_drought_mask).all() or np.isnan(pixel_growing_season_mask).all()):
                
                # 计算连续为 1 的区间的索引
                region_indices = np.where(pixel_drought_mask == 1)[0]  # 获取为 1 的位置索引
                if len(region_indices) > 0:
                    region_starts = [region_indices[0]]  # 连续区间的起始索引
                    for i in range(1, len(region_indices)):
                        if region_indices[i] != region_indices[i-1] + 1:  # 如果不连续，则记录新的起始索引
                            region_starts.append(region_indices[i])

                    for start_idx in region_starts:
                        end_idx = start_idx + 1
                        while end_idx < len(pixel_ndvi.time) and pixel_drought_mask[end_idx] == 1:
                            end_idx += 1  # 找到连续区间的结束索引
                        region_ndvi = pixel_ndvi.isel(time=slice(int(start_idx), int(end_idx)))  # 提取生长季的ndvi
                        recovery_time = None
                        for time_idx, ndvi_value in enumerate(pixel_ndvi.sel(time=pixel_ndvi.time[(start_idx + 1):])):
                            if abs(ndvi_value - pixel_ndvi_mean) / pixel_ndvi_mean  <= 0.05:
                                recovery_time = pixel_ndvi.time[(start_idx+1) + time_idx]
                                break
                        # 如果找到恢复时间点，计算该点一年内的生长季平均NDVI
                        if recovery_time is not None:
                            # 计算两个时间点之间的差值，并转换为年份
                            time_diff_years = (pd.to_datetime(recovery_time.values) - pd.to_datetime(pixel_ndvi.time[start_idx].values)).days / 365.25  # 使用365.25来考虑闰年

                            if time_diff_years < 2:
                                one_year_later = pd.to_datetime(recovery_time.values) + pd.DateOffset(years=1)  # 恢复点之后的一年
                                two_years_later = pd.to_datetime(pixel_ndvi.time[start_idx].values) + pd.DateOffset(years=2) # 干旱开始后的两年

                                # 根据 one_year_later 是否在两年内进行选择
                                if one_year_later <= two_years_later:
                                    end_time = one_year_later
                                else:
                                    end_time = two_years_later
                                # 选取从恢复时间到结束时间的时间范围
                                pixel_ndvi_masked = pixel_ndvi.where(pixel_growing_season_mask == 1)
                                future_growing_season = pixel_ndvi_masked.sel(time=slice(recovery_time, end_time))
                                # 计算平均值
                                average_ndvi_over_year = future_growing_season.mean()

                                # 存储结果
                                resilience.loc[{'lat': lat, 'lon': lon, 'time': region_ndvi.time[0]}] = average_ndvi_over_year - pixel_ndvi_mean         

    return resilience

def run(deseason_detrend_ndvi, drought_droped, growing_season_mask, spei, out_fname):
 
        # result_drought_resistance=[]
        # result_drought_resilience=[]
        drought_resistance = calculate_resistance(deseason_detrend_ndvi, drought_droped, growing_season_mask, spei)
        drought_resistance.to_netcdf(rf'G:\PHD_Project\2_types_results\Resistance_resilience_0.08\{out_fname}_resistance.nc')

        drought_resilience = calculate_resilience(deseason_detrend_ndvi, drought_droped, growing_season_mask, spei)
        drought_resilience.to_netcdf(rf'G:\PHD_Project\2_types_results\Resistance_resilience_0.08\{out_fname}_resilience.nc')  
        # result_drought_resistance.append(drought_resistance)
        # result_drought_resilience.append(drought_resilience)

        
        

# 合并去季节去趋势

In [None]:
datadir = 'G:\\PHD_Project\\Data\\GIMMS3g_NDVI'
# 使用通配符指定文件路径
fdir= join(datadir,'Clean')
outdir = join(datadir,'Deseason_detrend_0.08')
outpath = join(outdir,'deseason_detrend_0.08.nc4')
mk_dir(outdir)
combine_datasets = []
for f in tqdm(os.listdir(fdir)):
    if not f.endswith('.nc4'):
        continue
    fpath = join(fdir,f)
    outpath = join(outdir,f)
    
    # 使用xarray打开文件
    dataset=xr.open_dataset(fpath)
    combine_datasets.append(dataset)
# 合并所有重采样后的数据集
combined_ds = xr.concat(combine_datasets, dim='time')  # 时间维度名为'time'
combined_ds=combined_ds.sortby('time')   #按照时间升序整理数据
monthly_data = monthly_compose(combined_ds, method='max')
deseason_detrend(monthly_data, outpath)

# 计算抵抗力恢复力

In [None]:
normal_drought_droped=xr.open_dataset(r'G:\PHD_Project\2_types_results\Drought_types\normal_drought_droped.nc').ndvi
hot_drought_droped=xr.open_dataset(r'G:\PHD_Project\2_types_results\Drought_types\hot_drought_droped.nc').ndvi
spei=xr.open_dataset(r'G:\PHD_Project\Data\SPEI\Source\spei03.nc').spei
growing_season_mask=xr.open_dataset(r'G:\PHD_Project\Data\GIMMS3g_NDVI\Growing_season_mask_monthly\Growing_season_mask_monthly_leftrightclear.nc4').ndvi
deseason_detrend_ndvi=xr.open_dataset(r'G:\PHD_Project\Data\GIMMS3g_NDVI\Deseason_detrend_0.08\deseason_detrend_0.08.nc4').ndvi
# 按月重采样到每月开始, 将时间对齐
spei = spei.resample(time='MS').mean()  # 'MS'表示每月的开始，mean()是一种聚合方法，你可以根据需要选择合适的聚合方法
growing_season_mask = growing_season_mask.resample(time='MS').mean()
# 截取ndvi同等时间段
spei = spei.sel(time=growing_season_mask.time)
# 获取基准数据的坐标
target_coords = deseason_detrend_ndvi.drop_vars('month').coords
# 重采样到0.08
normal_drought_droped_interp = normal_drought_droped.interp(coords=target_coords, method='nearest')
hot_drought_droped_interp = hot_drought_droped.interp(coords=target_coords, method='nearest')
spei_interp = spei.interp(coords=target_coords, method='nearest')
growing_season_mask_interp = growing_season_mask.interp(coords=target_coords, method='nearest')
normal_drought_droped_interp = normal_drought_droped_interp.astype('float32')
hot_drought_droped_interp = hot_drought_droped_interp.astype('float32')
growing_season_mask_interp = growing_season_mask_interp.astype('float32')
normal_drought_droped_interp.to_netcdf(r'G:\PHD_Project\Data\All_calculate_var\normal_drought_droped_0.08.nc4')
hot_drought_droped_interp.to_netcdf(r'G:\PHD_Project\Data\All_calculate_var\hot_drought_droped_0.08.nc4')
spei_interp.to_netcdf(r'G:\PHD_Project\Data\All_calculate_var\spei_0.08.nc4')
growing_season_mask_interp.to_netcdf(r'G:\PHD_Project\Data\All_calculate_var\growing_season_mask_0.08.nc4')

In [None]:
hot_drought_droped=xr.open_dataset(r'G:\PHD_Project\Data\All_calculate_var\hot_drought_droped_0.08.nc4').ndvi
deseason_detrend_ndvi=xr.open_dataset(r'G:\PHD_Project\Data\GIMMS3g_NDVI\Deseason_detrend_0.08\deseason_detrend_0.08.nc4').ndvi
spei=xr.open_dataset(r'G:\PHD_Project\Data\All_calculate_var\spei_0.08.nc4').spei
growing_season_mask=xr.open_dataset(r'G:\PHD_Project\Data\All_calculate_var\growing_season_mask_0.08.nc4').ndvi

In [None]:
run(deseason_detrend_ndvi, hot_drought_droped, growing_season_mask, spei, 'hot_drought')

In [None]:
hot_drought_resistance=xr.open_dataset(r'G:\PHD_Project\2_types_results\Resistance_resilience_0.08\hot_drought_resistance.nc').ndvi
hot_drought_resilience=xr.open_dataset(r'G:\PHD_Project\2_types_results\Resistance_resilience_0.08\hot_drought_resilience.nc').ndvi

In [None]:
hot_drought_resistance

In [None]:
import matplotlib.pyplot as plt
from matplotlib.colors import BoundaryNorm

def plot_2d(dataset, cmap='coolwarm', vmax=None, vmin=None):
    # 设定断点
    boundaries = [-1.5, -0.1, 0, 0.05, 0.1, 1.5]  # 定义颜色断点
    # 创建归一化对象，使用上面定义的断点
    norm = BoundaryNorm(boundaries, ncolors=256, extend='both')

    fig, ax = plt.subplots(figsize=(12, 6), subplot_kw={'projection': ccrs.Robinson()})

    # 绘制数据
    dataset.plot(ax=ax, transform=ccrs.PlateCarree(), cmap=cmap, norm=norm,add_colorbar=True, vmax=vmax , vmin=vmin, cbar_kwargs={'label': ''})

    #隐藏tittle
    ax.set_title('')

    # 添加海岸线
    ax.coastlines()

    # 添加经纬度网格，但不显示网格线
    gl = ax.gridlines(crs=ccrs.PlateCarree(), draw_labels=True, linewidth=0, color='none', alpha=0)
    # gl.top_labels = False
    # gl.bottom_labels = False
    # gl.left_labels = False
    # gl.right_labels = False
    gl.xlines = False
    gl.ylines = False

In [None]:
plot_2d(hot_drought_resistance.min(dim = 'time'), cmap= global_cmap)
plot_2d(hot_drought_resistance.mean(dim = 'time'), cmap= global_cmap)
plot_2d(hot_drought_resilience.min(dim = 'time'), cmap= global_cmap)
plot_2d(hot_drought_resilience.mean(dim = 'time'), cmap= global_cmap)