# 训练数据加载

In [5]:
import os
from datetime import datetime
from typing import Sequence, Any, Dict

import numpy as np
import pandas as pd
import xarray as xr
from dask.distributed import Client
from tqdm.auto import tqdm

# Worker‐local cache
_ds_abandon = None
_ds_feat    = None

def extract_single_point(
    lat: float,
    lon: float,
    year: int,
    p_area: float,
    capacity_m: float,
    unique_id: Any,
    country: str,
    abandon_pattern: str,
    feature_pattern: str
) -> Dict[str, Any]:
    """
    在每个 worker 进程中首次打开并缓存两个 Dataset，然后针对单点抽特征。
    """
    global _ds_abandon, _ds_feat

    # 第一次调用时打开并 chunk
    if _ds_abandon is None:
        ds = xr.open_mfdataset(
            abandon_pattern, combine='by_coords',
            engine='netcdf4', parallel=False
        )
        # 先加载 metadata，再 chunk
        tlen = ds.sizes['time']
        _ds_abandon = ds.chunk({'time': tlen, 'lat': 500, 'lon': 500})

    if _ds_feat is None:
        ds = xr.open_mfdataset(
            feature_pattern, combine='by_coords',
            engine='netcdf4', parallel=False
        )
        tlen = ds.sizes['time']
        _ds_feat = ds.chunk({'time': tlen, 'lat': 1000, 'lon': 1000})

    # 最近邻抽取
    env_pt     = _ds_feat.sel(time=str(year), method='nearest') \
                         .sel(lat=lat, lon=lon, method='nearest')
    abandon_pt = _ds_abandon.sel(lat=lat, lon=lon, method='nearest')

    feat = []
    # 环境/社会经济特征
    for var in _ds_feat.data_vars:
        arr = env_pt[var].load().values
        feat.append(float(arr) if arr.ndim == 0 else float(arr.flat[0]))

    # 撂荒属性
    for var in ("current_abandonment", "abandonment_year",
                "abandonment_duration", "recultivation"):
        if var in _ds_abandon.data_vars:
            arr = abandon_pt[var].load().values
            feat.append(float(arr))
        else:
            feat.append(np.nan)

    # landcover 序列 embedding
    if "landcover" in _ds_abandon.data_vars:
        lc = abandon_pt["landcover"].load().values  # shape (time,)
        seq = np.nan_to_num(lc, 0).astype(int)
        seq = np.clip(seq, 1, 9) - 1
        onehot = np.eye(9)[seq]
        feat.extend(onehot.mean(axis=0).tolist())

    return {
        'lat': lat, 'lon': lon, 'year': year,
        'unique_id': unique_id,
        'p_area': p_area, 'capacity_m': capacity_m,
        'country': country,
        **{f'f{i}': v for i, v in enumerate(feat)}
    }


def load_pv_sites(
    csv_path: str,
    years: Sequence[int] = (2018, 2020)
) -> pd.DataFrame:
    """
    加载并标准化 PV 站点数据，过滤指定年份。
    """
    df = pd.read_csv(csv_path)
    if df.empty:
        raise ValueError(f"CSV 文件为空: {csv_path}")

    # 经纬度列映射
    rename_map = {}
    for src in ('latitude','lat_deg','LAT','Lat'):
        if src in df.columns:
            rename_map[src] = 'lat'
    for src in ('longitude','lon_deg','LON','Lon'):
        if src in df.columns:
            rename_map[src] = 'lon'
    df = df.rename(columns=rename_map)

    # 类型强制
    df['lat']  = pd.to_numeric(df['lat'], errors='raise')
    df['lon']  = pd.to_numeric(df['lon'], errors='raise')
    df['year'] = pd.to_numeric(df['year'], downcast='integer', errors='raise')

    required = {'lat','lon','year','unique_id','p_area','capacity_m','country'}
    missing = required - set(df.columns)
    if missing:
        raise ValueError(f"CSV 文件缺少必要列: {sorted(missing)}")

    df = df[df['year'].isin(years)]
    if df.empty:
        raise ValueError(f"没有符合年份 {years} 的记录")

    return df.reset_index(drop=True)

# 数据采样


## 特征提取通用函数

In [6]:
# 顶部已经定义好了：
# _nc_lock = threading.Lock()

def extract_all_features(
    ds_feat: xr.Dataset,
    ds_abandon: xr.Dataset,
    pv_df: pd.DataFrame,
    years: Sequence[int]
) -> pd.DataFrame:
    records = []

    for yr in tqdm(years, desc="处理年份"):
        sub = pv_df[pv_df.year == yr].reset_index(drop=True)
        lats = xr.DataArray(sub.lat.values, dims="point")
        lons = xr.DataArray(sub.lon.values, dims="point")

        ds_fy = ds_feat.sel(time=str(yr), method="nearest")

        # —— 环境特征
        env_dict = {}
        for var in tqdm(ds_feat.data_vars, desc=f"提取环境变量 {yr}"):
            da_sel = ds_fy[var].sel(lat=lats, lon=lons, method="nearest")

            # ← 关键修改：在 compute 之前加全局锁
            with _nc_lock:
                arr = da_sel.compute(scheduler="single-threaded")

            env_dict[var] = arr  # numpy array

        # —— 撂荒属性
        aband_dict = {}
        for var in ("current_abandonment","abandonment_year",
                    "abandonment_duration","recultivation"):
            if var in ds_abandon.data_vars:
                da_sel = ds_abandon[var].sel(lat=lats, lon=lons, method="nearest")
                with _nc_lock:
                    arr = da_sel.compute(scheduler="single-threaded")
                aband_dict[var] = arr
            else:
                aband_dict[var] = np.full(len(sub), np.nan)

        # —— landcover 序列 embedding
        if "landcover" in ds_abandon.data_vars:
            lc_sel = ds_abandon.landcover.sel(lat=lats, lon=lons, method="nearest")
            with _nc_lock:
                lc_arr = lc_sel.compute(scheduler="single-threaded")
            lc_vals = np.nan_to_num(lc_arr, nan=0).astype(int)
            lc_vals = np.clip(lc_vals, 1, 9) - 1
            onehot = np.eye(9)[lc_vals]
            mean_onehot = onehot.mean(axis=0)
            for i in range(9):
                aband_dict[f"lc_cls_{i}"] = mean_onehot[:, i]

        df_feat = pd.DataFrame({**env_dict, **aband_dict})
        df_out  = pd.concat([sub, df_feat], axis=1)
        records.append(df_out)

    return pd.concat(records, ignore_index=True)


## GMM负样本增强

In [7]:
import numpy as np
import pandas as pd
import xarray as xr
from sklearn.mixture import GaussianMixture
from sklearn.preprocessing import StandardScaler

def get_candidate_negatives(ds_abandon: xr.Dataset,
                            coords_pos: pd.DataFrame) -> pd.DataFrame:
    """
    候选负样本：current_abandonment=1 且不在 coords_pos。
    返回 DataFrame ['lat','lon']。
    """
    mask = ds_abandon['current_abandonment'] == 1
    idx = np.column_stack(np.where(mask.values))
    cand = pd.DataFrame({
        'lat': ds_abandon['lat'].values[idx[:,0]],
        'lon': ds_abandon['lon'].values[idx[:,1]]
    })
    pos_set = set(zip(coords_pos['lat'].round(6), coords_pos['lon'].round(6)))
    cand = cand[~cand.apply(lambda r: (round(r.lat,6), round(r.lon,6)) in pos_set, axis=1)]
    return cand.reset_index(drop=True)

def sample_negative(ds_abandon: xr.Dataset,
                    ds_feature: xr.Dataset,
                    coords_pos: pd.DataFrame,
                    year: int = 2020,
                    sample_size: int = 100_000,
                    n_clusters: int = 10,
                    quantile: float = 0.7):
    """
    强负样本采样：
      1) 从候选负样本中随机抽 sample_size，
         用 extract_features(year) 提取特征并 GMM 聚类；
      2) 选与“正样本集”最远的簇；
      3) 对所有候选分批预测，筛出这些强负簇对应点；
      4) 提取它们的特征并返回 X_neg, y_neg, coords_neg, scaler, gmm。
    """
    cand = get_candidate_negatives(ds_abandon, coords_pos)
    samp = cand.sample(min(sample_size, len(cand)), random_state=0).reset_index(drop=True)
    feats = np.vstack([
        extract_features(r.lat, r.lon, year, ds_abandon, ds_feature)
        for _, r in samp.iterrows()
    ])
    scaler = StandardScaler().fit(feats)
    fs = scaler.transform(feats)
    gmm = GaussianMixture(n_components=n_clusters, random_state=0).fit(fs)

    # 近似正样本 pos_fs
    pos_fs = fs[:n_clusters]
    dists = np.array([np.min(np.linalg.norm(pos_fs - c, axis=1)) for c in gmm.means_])
    thr = np.quantile(dists, quantile)
    strong_cls = np.where(dists >= thr)[0]

    strong_list = []
    batch = 10_000
    for i in range(0, len(cand), batch):
        block = cand.iloc[i:i+batch]
        feats_blk = np.vstack([
            extract_features(r.lat, r.lon, year, ds_abandon, ds_feature)
            for _, r in block.iterrows()
        ])
        labels = gmm.predict(scaler.transform(feats_blk))
        strong_list.append(block.iloc[np.isin(labels, strong_cls)])
    strong_df = pd.concat(strong_list).drop_duplicates().reset_index(drop=True)

    X_neg = np.vstack([
        extract_features(r.lat, r.lon, year, ds_abandon, ds_feature)
        for _, r in strong_df.iterrows()
    ])
    y_neg = np.zeros(X_neg.shape[0], dtype=int)
    coords_neg = strong_df[['lat','lon']].reset_index(drop=True)

    return X_neg, y_neg, coords_neg, scaler, gmm

# 测试部分

In [1]:
import os
from datetime import datetime
from typing import Sequence
import numpy as np
import pandas as pd
import xarray as xr
from tqdm.auto import tqdm
from dask.diagnostics import ProgressBar
import threading
import glob

def load_datasets(abandon_pattern: str, feature_pattern: str):
    """
    打开 NetCDF，用 h5netcdf 替代 netcdf4，避免底层 HDF5 并发错误。
    """
    files_abandon = glob.glob(abandon_pattern)
    files_feature = glob.glob(feature_pattern)
    if not files_abandon or not files_feature:
        raise FileNotFoundError("找不到文件")

    # 用 h5netcdf 引擎打开
    ds_abandon = xr.open_mfdataset(
        files_abandon,
        combine='by_coords',
        engine='h5netcdf',
        parallel=False          # 还是用单线程模式
    )
    ds_feat = xr.open_mfdataset(
        files_feature,
        combine='by_coords',
        engine='h5netcdf',
        parallel=False
    )

    # 一次性 rechunk（保持你原先的尺寸）
    t_ab = ds_abandon.sizes['time']
    ds_abandon = ds_abandon.chunk({'time': t_ab, 'lat': 500, 'lon': 500})

    t_ft = ds_feat.sizes['time']
    ds_feat = ds_feat.chunk({'time': t_ft, 'lat': 1000, 'lon': 1000})

    return ds_abandon, ds_feat


def load_pv_sites(
    csv_path: str,
    years: Sequence[int] = (2018, 2020)
) -> pd.DataFrame:
    """
    加载并标准化 PV 站点数据，过滤指定年份。
    """
    df = pd.read_csv(csv_path)
    if df.empty:
        raise ValueError(f"CSV 文件为空: {csv_path}")

    # 经纬度列映射
    rename_map = {}
    for src in ('latitude', 'lat_deg', 'LAT', 'Lat'):
        if src in df.columns:
            rename_map[src] = 'lat'
    for src in ('longitude', 'lon_deg', 'LON', 'Lon'):
        if src in df.columns:
            rename_map[src] = 'lon'
    df = df.rename(columns=rename_map)

    # 强制类型转换
    df['lat'] = pd.to_numeric(df['lat'], errors='raise')
    df['lon'] = pd.to_numeric(df['lon'], errors='raise')
    df['year'] = pd.to_numeric(df['year'], downcast='integer', errors='raise')

    required = {'lat', 'lon', 'year', 'unique_id', 'p_area', 'capacity_m', 'country'}
    missing = required - set(df.columns)
    if missing:
        raise ValueError(f"CSV 文件缺少必要列: {sorted(missing)}")

    df = df[df['year'].isin(years)]
    if df.empty:
        raise ValueError(f"没有符合年份 {years} 的记录")

    return df.reset_index(drop=True)


def extract_all_features(
    ds_feat: xr.Dataset,
    ds_abandon: xr.Dataset,
    pv_df: pd.DataFrame,
    years: Sequence[int]
) -> pd.DataFrame:
    """
    向量化地一次性对每个变量和年份 sel + load，
    并用 _io_lock 串行化所有底层 NetCDF4 读操作。
    """
    records = []

    for yr in years:
        sub = pv_df[pv_df.year == yr].reset_index(drop=True)
        lats = xr.DataArray(sub.lat.values, dims="point")
        lons = xr.DataArray(sub.lon.values, dims="point")

        ds_fy = ds_feat.sel(time=str(yr), method="nearest")

        # 环境变量
        env_dict = {}
        for var in tqdm(ds_feat.data_vars, desc=f"提取环境变量 {yr}"):
            da_sel = ds_fy[var].sel(lat=lats, lon=lons, method="nearest")
            arr = da_sel.load(scheduler="single-threaded").values
            env_dict[var] = arr

        # 撂荒属性
        aband_dict = {}
        for var in ("current_abandonment", "abandonment_year",
                    "abandonment_duration", "recultivation"):
            if var in ds_abandon.data_vars:
                da_sel = ds_abandon[var].sel(lat=lats, lon=lons, method="nearest")
                arr = da_sel.load(scheduler="single-threaded").values
                aband_dict[var] = arr
            else:
                aband_dict[var] = np.full(len(sub), np.nan)

        # landcover 序列 embedding
        if "landcover" in ds_abandon.data_vars:
            lc_sel = ds_abandon.landcover.sel(lat=lats, lon=lons, method="nearest")

            lc_arr = lc_sel.load(scheduler="single-threaded").values
            seq = np.nan_to_num(lc_arr, nan=0).astype(int)
            seq = np.clip(seq, 1, 9) - 1
            onehot = np.eye(9)[seq]
            mean_onehot = onehot.mean(axis=0)
            for i in range(9):
                aband_dict[f"lc_cls_{i}"] = mean_onehot[:, i]

        df_feat = pd.DataFrame({**env_dict, **aband_dict})
        df_out  = pd.concat([sub, df_feat], axis=1)
        records.append(df_out)

    return pd.concat(records, ignore_index=True)


def main(test_mode: bool = False, test_n: int = 500):
    PATHS = {
        'abandonment': "D:/xarray/abandonment_chunkall/*.nc",
        'feature':     "D:/xarray/aligned2/Feature_all/*.nc",
        'csv':         "aligned_for_training.csv",
        'test_output': "positive_samples_test_500.csv",
        'output':      "positive_samples_full_with_features.csv"
    }
    YEARS = [2018, 2020]

    print("开始处理...", datetime.now().strftime("%Y-%m-%d %H:%M:%S"))

    # 1. 打开并 rechunk
    ds_abandon, ds_feat = load_datasets(
        PATHS['abandonment'], PATHS['feature']
    )

    # 2. 读取 PV 站点并切片（测试模式）
    pv_df = load_pv_sites(PATHS['csv'], years=YEARS)
    if test_mode:
        pv_df = pv_df.iloc[:test_n].reset_index(drop=True)
        print(f"⚠️ 测试模式：仅前 {test_n} 条记录")

    # 3. 向量化抽取
    print("批量抽取特征 …")
    with ProgressBar():
        df_all = extract_all_features(ds_feat, ds_abandon, pv_df, YEARS)

    # 4. 去重 & 保存
    df_unique = (
        df_all
        .sort_values(['year','lat','lon'])
        .drop_duplicates(subset=['lat','lon'], keep='last')
        .reset_index(drop=True)
    )
    out_path = PATHS['test_output'] if test_mode else PATHS['output']
    os.makedirs(os.path.dirname(out_path) or '.', exist_ok=True)
    df_unique.to_csv(out_path, index=False)

    print(f"完成，结果保存到: {out_path}")
    print("结束时间:", datetime.now().strftime("%Y-%m-%d %H:%M:%S"))


if __name__ == "__main__":
    main(test_mode=True)


开始处理... 2025-05-18 16:26:21


ValueError: unrecognized engine h5netcdf must be one of: ['netcdf4', 'scipy', 'rasterio', 'store']

# Process

## 1.1 Load

In [None]:

import pandas as pd
from function import *


abandon_2d_variable = [
    "current_abandonment",
    "recultivation", 
    "abandonment_duration",
    "abandonment_year"
]
fea_3d_variable = [
    'GDPpc',
    'GDPtot',
    'GURdist',
    'Population',
    'gdmp',
    'rsds',
    'tas',
    'wind'
]
fea_2d_variable = [
    'DEM',
    'Powerdist',
    'PrimaryRoad',
    'SecondaryRoad',
    'Slope',
    'TertiaryRoad'
]
PATHS = {
    'abandonment': "D:/xarray/abandonment_chunkall/*.nc",
    'feature':     "D:/xarray/aligned2/Feature_all/*.nc",
    'csv':         "data/aligned_for_training0519.csv",
}

YEARS = [2018, 2020]
time=['2018-01-01','2020-01-01']
# 2. 读取 PV 站点并切片（测试模式）
# 2. 读取 PV 站点并切片（测试模式）
pv_df = load_pv_sites(PATHS['csv'], years=YEARS)
# Convert lon and lat columns to float32

pv_df['lon'] = pv_df['lon'].astype('float32')
pv_df['lat'] = pv_df['lat'].astype('float32')
# Rename 'year' to 'time' and convert to datexxtime64
pv_df = pv_df.rename(columns={'year': 'time'})
pv_df['time'] = pd.to_datetime(pv_df['time'], format='%Y')


# 1. 打开并 rechunk
ds_abandon, ds_feat = load_datasets(
    PATHS['abandonment'], PATHS['feature']
)



ds_merge=xr.merge([ds_abandon, ds_feat])
# Convert coordinates to float32 while preserving other variables
ds_merge = ds_merge.assign_coords({
    'lon': ds_merge.lon.astype('float32'),
    'lat': ds_merge.lat.astype('float32')
})

# For variables without time dimension, expand them to have same value for all times
for var in ds_merge.data_vars:
    if 'time' not in ds_merge[var].dims:
        # Expand the variable to have time dimension with same values
        ds_merge[var] = ds_merge[var].expand_dims(time=ds_merge.time)


gogogo


  result = blockwise(
  result = blockwise(
  result = blockwise(
  result = blockwise(
  result = blockwise(
    >>> with dask.config.set(**{'array.slicing.split_large_chunks': False}):
    ...     array[indexer]

To avoid creating the large chunks, set the option
    >>> with dask.config.set(**{'array.slicing.split_large_chunks': True}):
    ...     array[indexer]
  value = value[(slice(None),) * axis + (subkey,)]
  result = blockwise(
  result = blockwise(
  result = blockwise(
    >>> with dask.config.set(**{'array.slicing.split_large_chunks': False}):
    ...     array[indexer]

To avoid creating the large chunks, set the option
    >>> with dask.config.set(**{'array.slicing.split_large_chunks': True}):
    ...     array[indexer]
  value = value[(slice(None),) * axis + (subkey,)]
  result = blockwise(
    >>> with dask.config.set(**{'array.slicing.split_large_chunks': False}):
    ...     array[indexer]

To avoid creating the large chunks, set the option
    >>> with dask.config.s

## 1.2 ZZZ

In [10]:
pv_df
pv_df['year'] = pd.to_datetime(pv_df['time']).dt.year
pv_df_dedup = (
    pv_df.sort_values('time')  # 可改为其他排序依据
         .drop_duplicates(subset=['year', 'lat', 'lon'], keep='last')  # 你也可以用 keep='first'
         .reset_index(drop=True)
)
pv_df_dedup

Unnamed: 0,unique_id,p_area,capacity_m,country,time,lon,lat,year
0,1,13592.361840,1.201704,GRC,2018-01-01,21.612499,38.112499,2018
1,45763,3005.453465,0.281647,CHN,2018-01-01,116.545830,33.420834,2018
2,45764,2134.049502,0.200225,CHN,2018-01-01,116.520836,32.987499,2018
3,45765,3429.546491,0.321102,CHN,2018-01-01,116.504166,34.495834,2018
4,45766,2231.352132,0.209629,CHN,2018-01-01,116.562500,32.287498,2018
...,...,...,...,...,...,...,...,...
90825,11758,40069.483340,2.684475,USA,2020-01-01,-78.079170,35.312500,2020
90826,11759,56.397704,0.000000,USA,2020-01-01,-79.037498,36.004166,2020
90827,11760,22.106298,0.000000,USA,2020-01-01,-79.054169,35.954166,2020
90828,11754,26182.946620,1.875714,USA,2020-01-01,-78.262497,36.420834,2020


## 1.3 SDFD

In [12]:
from tqdm.auto import tqdm
import numpy as np

def process_chunk(df, ds_merge, step,stop=0):

    # Get coordinates that exist in both df_temp and pv_df
    common_lats = np.intersect1d(ds_merge.lat.values, df['lat'].unique())
    common_lons = np.intersect1d(ds_merge.lon.values, df['lon'].unique())

    # Select data from df_temp using only the common coordinates
    ds_merge = ds_merge.sel(
        lat=common_lats,
        lon=common_lons,
    )

    #step = 500
    total_lat = len(ds_merge.lat)
    total_lon = len(ds_merge.lon)
    merged_dfs = []
    
    # Calculate total iterations for progress bar
    total_iterations = (total_lat // step + (1 if total_lat % step else 0)) * \
                    (total_lon // step + (1 if total_lon % step else 0))

    # Create progress bar with Chinese description
    pbar = tqdm(total=total_iterations, desc="处理数据块")

    # Iterate through all latitude and longitude points in chunks
    for start_lat in range(0, total_lat, step):
        end_lat = min(start_lat + step, total_lat)
        for start_lon in range(0, total_lon, step):
            end_lon = min(start_lon + step, total_lon)
            #print(1)
            # Extract data from ds_merge for each time point
            df = ds_merge.isel(
                lat=slice(start_lat, end_lat),
                lon=slice(start_lon, end_lon)
            ).compute().to_dataframe()
            
            # Reset index to convert multi-index to columns
            df = df.reset_index()
            
            # Merge with pv_df based on lat/lon coordinates
            chunk_merged = pd.merge(df, pv_df_dedup, on=['lat','lon','time'], how='inner')
            merged_dfs.append(chunk_merged)
            
            # Update progress bar
            pbar.update(1)
            if stop==1:
                break
    # Close progress bar
    pbar.close()
    # Combine all chunks into final dataframe
    merged_df = pd.concat(merged_dfs, ignore_index=True)
    return merged_df


# Iterate through each variable
# Get all variables from ds_merge
all_vars = list(ds_merge.data_vars)
merged_dfs = []
# Iterate through each variable
for i, var in enumerate(all_vars):
    print(f"Processing variable {i+1} of {len(all_vars)}: {var}")
    # Check if variable has time dimension
    df_temp = ds_merge[var].sel(time=['2018-01-01','2020-01-01'])

    # Process the chunk and merge with pv_df
    merged_df = process_chunk(pv_df_dedup, df_temp, step=3000)

    merged_dfs.append(merged_df)

# Concatenate all merged dataframes with single-column retention

final_merged_df = merged_dfs[0]
for df in tqdm(merged_dfs[1:], desc="合并数据"):

    final_merged_df = pd.merge(
        final_merged_df,
        df,
        on=["time", "lon", "lat"],
        how="inner",
        suffixes=("", "_drop")
    )
    # Drop duplicated columns with "_drop" suffix
    final_merged_df = final_merged_df.loc[:, ~final_merged_df.columns.str.endswith("_drop")]



Processing variable 1 of 19: abandonment_year


处理数据块:   0%|          | 0/18 [00:00<?, ?it/s]

Processing variable 2 of 19: abandonment_duration


处理数据块:   0%|          | 0/18 [00:00<?, ?it/s]

Processing variable 3 of 19: recultivation


处理数据块:   0%|          | 0/18 [00:00<?, ?it/s]

Processing variable 4 of 19: current_abandonment


处理数据块:   0%|          | 0/18 [00:00<?, ?it/s]

Processing variable 5 of 19: landcover


处理数据块:   0%|          | 0/18 [00:00<?, ?it/s]

Processing variable 6 of 19: DEM


处理数据块:   0%|          | 0/18 [00:00<?, ?it/s]

Processing variable 7 of 19: GDPpc


处理数据块:   0%|          | 0/18 [00:00<?, ?it/s]

Processing variable 8 of 19: GDPtot


处理数据块:   0%|          | 0/18 [00:00<?, ?it/s]

Processing variable 9 of 19: GURdist


处理数据块:   0%|          | 0/18 [00:00<?, ?it/s]

Processing variable 10 of 19: Population


处理数据块:   0%|          | 0/18 [00:00<?, ?it/s]

Processing variable 11 of 19: Powerdist


处理数据块:   0%|          | 0/18 [00:00<?, ?it/s]

Processing variable 12 of 19: PrimaryRoad


处理数据块:   0%|          | 0/18 [00:00<?, ?it/s]

Processing variable 13 of 19: SecondaryRoad


处理数据块:   0%|          | 0/18 [00:00<?, ?it/s]

Processing variable 14 of 19: Slope


处理数据块:   0%|          | 0/18 [00:00<?, ?it/s]

Processing variable 15 of 19: TertiaryRoad


处理数据块:   0%|          | 0/18 [00:00<?, ?it/s]

Processing variable 16 of 19: gdmp


处理数据块:   0%|          | 0/18 [00:00<?, ?it/s]

Processing variable 17 of 19: rsds


处理数据块:   0%|          | 0/18 [00:00<?, ?it/s]

Processing variable 18 of 19: tas


处理数据块:   0%|          | 0/18 [00:00<?, ?it/s]

Processing variable 19 of 19: wind


处理数据块:   0%|          | 0/18 [00:00<?, ?it/s]

合并数据:   0%|          | 0/18 [00:00<?, ?it/s]

In [4]:
import dask.dataframe as dd

# 将每个 pandas.DataFrame 转为 Dask DataFrame（默认划分成 4 个分区）
ddfs = [dd.from_pandas(df, npartitions=4) for df in merged_dfs]

# 初始化为第一个
merged_ddf = ddfs[0]

# 逐个合并后续 DataFrames
for i, ddf in enumerate(ddfs[1:], start=1):
    print(f"Merging variable {i+1} of {len(ddfs)}")
    merged_ddf = merged_ddf.merge(ddf, on=["time", "lon", "lat"], how="inner", suffixes=("", "_drop"))
    # 删除带 "_drop" 后缀的重复列
    merged_ddf = merged_ddf.loc[:, [col for col in merged_ddf.columns if not col.endswith("_drop")]]


output_path = "D:/xarray/aligned2/temp/final_merged_*.csv"
merged_ddf.to_csv(output_path, index=False, single_file=False)
print("✅ Final Dask-merged CSVs saved.")


Merging variable 2 of 6
Merging variable 3 of 6
Merging variable 4 of 6
Merging variable 5 of 6
Merging variable 6 of 6
✅ Final Dask-merged CSVs saved.


In [None]:
import os
import pandas as pd

def write_each_variable_parquet(merged_dfs, save_dir, base_cols=["time", "lat", "lon"]):
    os.makedirs(save_dir, exist_ok=True)
    for i, df in enumerate(merged_dfs):
        var_cols = [c for c in df.columns if c not in base_cols and c not in ['band', 'spatial_ref']]
        if len(var_cols) != 1:
            print(f"❌ Warning: variable {i} has unexpected columns: {var_cols}")
            continue
        var_name = var_cols[0]
        out_df = df[base_cols + [var_name]]
        out_path = os.path.join(save_dir, f"{var_name}.parquet")
        out_df.to_parquet(out_path, index=False)
        print(f"✅ Saved {var_name}.parquet")


import duckdb
import glob

def join_all_parquet_to_csv(parquet_dir, output_csv):
    files = sorted(glob.glob(f"{parquet_dir}/*.parquet"))
    
    # 构造 DuckDB SQL JOIN 查询
    base = f"read_parquet('{files[0]}')"
    for f in files[1:]:
        base = f"({base}) JOIN read_parquet('{f}') USING (time, lat, lon)"
    
    # 执行 SQL，并写出 CSV（或 Parquet）
    con = duckdb.connect()
    con.execute(f"COPY ({base}) TO '{output_csv}' (HEADER, DELIMITER ',')")
    con.close()
    print(f"🎯 Final merged training table saved at: {output_csv}")


# 第一步：保存每个变量
write_each_variable_parquet(merged_dfs, save_dir="D:/xarray/aligned2/vars")

# 第二步：用 DuckDB 整合为训练表
join_all_parquet_to_csv(
    parquet_dir="D:/xarray/aligned2/vars",
    output_csv="D:/xarray/aligned2/final_training.csv"
)


ModuleNotFoundError: No module named 'duckdb'

In [None]:
save_dir=r"D:\xarray\aligned2\temp_dfs"
all_files = sorted([f for f in os.listdir(save_dir) if f.endswith(".parquet") and f.startswith("df_")])
unmerge_df=[]



for i, file in enumerate(all_files):
    df_path = os.path.join(save_dir, file)
    df=pd.read_parquet(df_path)
    unmerge_df.append(df)

    if i==5:
        break


len(unmerge_df)
unmerge_df[1]



Unnamed: 0,time,lat,lon,spatial_ref,band,abandonment_duration,unique_id,p_area,capacity_m,country
0,2018-01-01,-13.837500,-171.804169,0,1,,26510,87614.296875,7.850205,WSM
1,2018-01-01,-13.829166,-171.995834,0,1,,26500,10696.658203,0.956408,WSM
2,2018-01-01,-13.829166,-171.987503,0,1,,26501,66982.132812,5.989546,WSM
3,2020-01-01,-29.245832,-177.929169,0,1,,19077,385.150726,0.000000,NZL
4,2020-01-01,-22.645834,-152.795837,0,1,,34745,615.614868,0.000000,PYF
...,...,...,...,...,...,...,...,...,...,...
103912,2020-01-01,55.712502,37.462502,0,1,,32467,75.500473,0.000000,RUS
103913,2020-01-01,55.737499,37.579166,0,1,,9832,281.253357,0.000000,RUS
103914,2020-01-01,55.820835,37.704166,0,1,,32288,44.549313,0.000000,RUS
103915,2020-01-01,55.979168,37.262501,0,1,,17828,538.808228,0.000000,RUS


In [13]:
final_merged_df

Unnamed: 0,time,lat,lon,spatial_ref,band,abandonment_year,unique_id,p_area,capacity_m,country,...,Population,Powerdist,PrimaryRoad,SecondaryRoad,Slope,TertiaryRoad,gdmp,rsds,tas,wind
0,2018-01-01,-13.837500,-171.804169,0,1,,26510,87614.298290,7.850205,WSM,...,646.119934,622589.0000,12.719680,0.000000,2.521324,197.039190,,19.063463,2995.384528,2.293710
1,2018-01-01,-13.829166,-171.995834,0,1,,26500,10696.657940,0.956408,WSM,...,138.296951,622588.9375,14.276298,0.000000,0.715136,100.218651,,18.820724,2999.923652,2.770978
2,2018-01-01,-13.829166,-171.987503,0,1,,26501,66982.132520,5.989546,WSM,...,138.296951,622588.9375,14.276298,0.000000,0.922556,100.218651,,18.825073,2999.838979,2.721136
3,2018-01-01,-4.387500,-79.812500,0,1,,10893,16680.699130,1.511097,ECU,...,15.645079,622609.3750,18.676649,24.508936,6.674829,43.411486,167.423999,18.745583,2969.209102,0.836408
4,2018-01-01,-4.254167,-79.470833,0,1,,10880,34065.381000,3.038932,ECU,...,37.699821,622609.3750,6.512673,18.597032,7.650049,89.491292,128.872533,17.977419,2920.696152,3.776348
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
90816,2020-01-01,55.712502,37.462502,0,1,,32467,75.500476,0.000000,RUS,...,14468.128906,622589.8125,62.451511,112.277082,1.968263,861.205086,,9.967303,2804.318214,2.684799
90817,2020-01-01,55.737499,37.579166,0,1,,9832,281.253369,0.000000,RUS,...,12943.362305,622589.8125,118.298899,140.942360,0.393874,1132.096021,,9.992375,2804.168017,2.531498
90818,2020-01-01,55.820835,37.704166,0,1,,32288,44.549314,0.000000,RUS,...,16157.847656,622589.8125,38.558650,73.424121,0.412605,642.344304,,9.950527,2803.408293,2.519807
90819,2020-01-01,55.979168,37.262501,0,1,,17828,538.808249,0.000000,RUS,...,4346.128418,622589.5000,63.311948,5.627337,0.978244,316.949814,,9.815846,2800.570781,2.697752


## 1.4 Save

In [15]:
 # Remove spatial_ref and band columns
final_merged_df = final_merged_df.drop(['spatial_ref', 'band'], axis=1)

# Save to CSV
final_merged_df.to_csv('training_embedding.csv', index=False)




KeyboardInterrupt



备注：这里经过了年内去重操作、

In [18]:
final_merged_df

Unnamed: 0,time,lat,lon,abandonment_year,unique_id,p_area,capacity_m,country,year,abandonment_duration,...,Population,Powerdist,PrimaryRoad,SecondaryRoad,Slope,TertiaryRoad,gdmp,rsds,tas,wind
0,2018-01-01,-13.837500,-171.804169,,26510,87614.298290,7.850205,WSM,2018,,...,646.119934,622589.0000,12.719680,0.000000,2.521324,197.039190,,19.063463,2995.384528,2.293710
1,2018-01-01,-13.829166,-171.995834,,26500,10696.657940,0.956408,WSM,2018,,...,138.296951,622588.9375,14.276298,0.000000,0.715136,100.218651,,18.820724,2999.923652,2.770978
2,2018-01-01,-13.829166,-171.987503,,26501,66982.132520,5.989546,WSM,2018,,...,138.296951,622588.9375,14.276298,0.000000,0.922556,100.218651,,18.825073,2999.838979,2.721136
3,2018-01-01,-4.387500,-79.812500,,10893,16680.699130,1.511097,ECU,2018,,...,15.645079,622609.3750,18.676649,24.508936,6.674829,43.411486,167.423999,18.745583,2969.209102,0.836408
4,2018-01-01,-4.254167,-79.470833,,10880,34065.381000,3.038932,ECU,2018,,...,37.699821,622609.3750,6.512673,18.597032,7.650049,89.491292,128.872533,17.977419,2920.696152,3.776348
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
90816,2020-01-01,55.712502,37.462502,,32467,75.500476,0.000000,RUS,2020,,...,14468.128906,622589.8125,62.451511,112.277082,1.968263,861.205086,,9.967303,2804.318214,2.684799
90817,2020-01-01,55.737499,37.579166,,9832,281.253369,0.000000,RUS,2020,,...,12943.362305,622589.8125,118.298899,140.942360,0.393874,1132.096021,,9.992375,2804.168017,2.531498
90818,2020-01-01,55.820835,37.704166,,32288,44.549314,0.000000,RUS,2020,,...,16157.847656,622589.8125,38.558650,73.424121,0.412605,642.344304,,9.950527,2803.408293,2.519807
90819,2020-01-01,55.979168,37.262501,,17828,538.808249,0.000000,RUS,2020,,...,4346.128418,622589.5000,63.311948,5.627337,0.978244,316.949814,,9.815846,2800.570781,2.697752


In [16]:
missing_keys = pd.merge(pv_df_dedup, final_merged_df, on=['lat', 'lon','time'], how='left', indicator=True)
missing = missing_keys[missing_keys['_merge'] == 'left_only']
missing

Unnamed: 0,unique_id_x,p_area_x,capacity_m_x,country_x,time,lon,lat,year_x,abandonment_year,unique_id_y,...,Powerdist,PrimaryRoad,SecondaryRoad,Slope,TertiaryRoad,gdmp,rsds,tas,wind,_merge
18639,67752,8082.070784,0.731052,GBR,2018-01-01,0.004167,53.487499,2018,,,...,,,,,,,,,,left_only
18677,67834,2716.929277,0.245711,GBR,2018-01-01,0.004167,53.412498,2018,,,...,,,,,,,,,,left_only
18883,68009,2457.466166,0.222334,GBR,2018-01-01,0.004167,53.4375,2018,,,...,,,,,,,,,,left_only
20724,65832,426831.0569,31.896933,GBR,2018-01-01,0.004167,52.0625,2018,,,...,,,,,,,,,,left_only
26806,56002,20156.43269,1.762364,ESP,2018-01-01,0.004167,38.8125,2018,,,...,,,,,,,,,,left_only
73947,5807,4218.800539,0.0,GBR,2020-01-01,0.004167,53.362499,2020,,,...,,,,,,,,,,left_only
75772,5171,5906.320755,0.0,ESP,2020-01-01,0.004167,38.8125,2020,,,...,,,,,,,,,,left_only
77505,7497,562.506739,0.0,GBR,2020-01-01,0.004167,52.0625,2020,,,...,,,,,,,,,,left_only
85144,17155,345.945594,0.0,GBR,2020-01-01,0.004167,52.079166,2020,,,...,,,,,,,,,,left_only


# Load

In [None]:
# float32, 