In [1]:
import pandas as pd
import re
from tqdm import tqdm
import os
import numpy as np
import xarray as xr
import traceback # 用于打印详细错误信息

# 定义根目录
root_dir = 'D:/wind/zhejiang_all_less/'

# 给定的站点 ID 列表
station_ids_list = [ # Renamed to avoid conflict with loop variable
    "58448", "58450",  "58452", "58459",
    "58467", "58468", "58537", "58542", "58543", "58548",
    "58553","58557","58565","58566","58643","58646","58751",
    "58755"
]

# --- 新增：定义物理常数和目标气压层 ---
P0_STANDARD_HPA = 1013.25  # 标准海平面气压 (hPa)
H_SCALE_METERS = 8000      # 标高 (m)，用于气压高度公式
TARGET_PRESSURES_HPA = np.array([1000, 925, 850, 700, 600, 500, 400]) # 目标气压层 (hPa)

# --- 新增：气压-高度转换函数 ---
def height_to_pressure(height, P0=P0_STANDARD_HPA, H_scale=H_SCALE_METERS):
    """将高度 (米) 转换为气压 (hPa)"""
    pressure = P0 * np.exp(-np.array(height, dtype=float) / H_scale)
    return pressure

def pressure_to_height(pressure, P0=P0_STANDARD_HPA, H_scale=H_SCALE_METERS):
    """将气压 (hPa) 转换为高度 (米)"""
    pressure_arr = np.array(pressure, dtype=float)
    # 对气压值进行裁剪以避免log计算错误，并处理P > P0的情况 (高度为负)
    pressure_clipped = np.clip(pressure_arr, 1e-5, P0 * 2) # 允许一定范围的外插
    height = -H_scale * np.log(pressure_clipped / P0)
    return height
# --- 新增结束 ---

all_heights_set = set()
dataframes_raw = {} # 存储每个站点的原始 DataFrame
station_actual_time_ranges = {} # 存储每个站点实际的 (min_time, max_time)

print("修改后的第一阶段：收集高度、读取数据并确定各文件实际时间范围...")
global_latest_start_time = None
global_earliest_end_time = None
valid_files_count_for_time_range = 0

for station_name_loopvar in tqdm(station_ids_list, desc="站点文件读取与时间范围分析"):
    station_path = os.path.join(root_dir, station_name_loopvar)
    if os.path.isdir(station_path) and os.listdir(station_path):
        filename = os.path.join(station_path, '20230601_20250721.txt')
        if os.path.isfile(filename):
            datas = []
            try:
                with open(filename, 'r', encoding='utf-8') as files:
                    try:
                        next(files) # 跳过表头第一行
                        next(files) # 跳过表头第二行
                    except StopIteration:
                        print(f"警告：文件 {filename} 为空或表头不足两行，跳过。")
                        continue
                    for line_number, line in enumerate(files, start=3):
                        try:
                            line = line.strip()
                            if not line: continue
                            parts = re.split(r'\s+', line)
                            if len(parts) < 12: continue # 确保行数据足够
                            datetime_str = parts[1] + ' ' + parts[2]
                            direction_h = float(parts[5])
                            speed_h = float(parts[6])
                            height = int(parts[7])
                            speed_v = float(parts[8])
                            reliability_h = int(parts[9])
                            reliability_v = int(parts[10])
                            all_heights_set.add(height)
                            datas.append({
                                'Datetime_str': datetime_str,
                                'DIRECTION_H': direction_h, 'SPEED_H': speed_h, 'HEIGHT': height,
                                'SPEED_V': speed_v, 'RELIABILITY_H': reliability_h, 'RELIABILITY_V': reliability_v,
                            })
                        except (ValueError, IndexError) : continue # 跳过格式错误的行
                        except Exception as e_line:
                            print(f"错误：处理文件 {filename} 第 {line_number} 行时发生未知错误: {e_line}，跳过此行。")
            except Exception as e_file:
                print(f"错误：无法打开或读取文件 {filename}: {e_file}")
                continue
            if datas:
                df_temp = pd.DataFrame(datas)
                try:
                    df_temp['Datetime'] = pd.to_datetime(df_temp['Datetime_str'], errors='coerce')
                    df_temp.dropna(subset=['Datetime'], inplace=True) # 移除无法解析的日期
                    df_temp.drop(columns=['Datetime_str'], inplace=True)
                    if not df_temp.empty:
                        dataframes_raw[station_name_loopvar] = df_temp
                        current_min_time = df_temp['Datetime'].min()
                        current_max_time = df_temp['Datetime'].max()
                        station_actual_time_ranges[station_name_loopvar] = (current_min_time, current_max_time)
                        if global_latest_start_time is None:
                            global_latest_start_time = current_min_time
                            global_earliest_end_time = current_max_time
                        else:
                            global_latest_start_time = max(global_latest_start_time, current_min_time)
                            global_earliest_end_time = min(global_earliest_end_time, current_max_time)
                        valid_files_count_for_time_range += 1
                    else:
                        print(f"信息：站点 {station_name_loopvar} 文件 {filename} 解析日期后无有效数据。")
                except Exception as e_df:
                    print(f"错误：转换站点 {station_name_loopvar} 的 Datetime 列或计算其时间范围失败: {e_df}")
            else:
                print(f"信息：站点 {station_name_loopvar} 文件 {filename} 未读取到有效数据行。")
        else:
            print(f"警告：文件 {filename} 不存在。")
    else:
        print(f"警告：目录 {station_path} 不存在或为空。")

if not dataframes_raw:
    print("错误：未能从任何文件中读取到有效数据，程序终止。")
    exit()
if not all_heights_set:
    print("错误：未能从任何文件中收集到高度信息，程序终止。")
    exit()

print(f"\n第一阶段完成：共处理了 {valid_files_count_for_time_range} 个包含有效时间数据的文件。")

if global_latest_start_time is None or global_earliest_end_time is None or global_latest_start_time >= global_earliest_end_time:
    print("错误：未能找到所有有效文件的公共时间段，或公共时间段无效。程序终止。")
    print(f"  计算出的最晚开始时间 (global_latest_start_time): {global_latest_start_time}")
    print(f"  计算出的最早结束时间 (global_earliest_end_time): {global_earliest_end_time}")
    exit()

print(f"\n所有有效文件的最小公共时间段为:")
print(f"  开始 (Latest Start): {global_latest_start_time}")
print(f"  结束 (Earliest End): {global_earliest_end_time}")

try:
    common_period_start_aligned = global_latest_start_time.ceil('6min')
    common_period_end_aligned = global_earliest_end_time.floor('6min')
except AttributeError as e_align:
    print(f"错误：对齐公共时间段边界时出错。可能是时间戳对象类型不正确。{e_align}")
    print(f"  global_latest_start_time: {global_latest_start_time} (type: {type(global_latest_start_time)})")
    print(f"  global_earliest_end_time: {global_earliest_end_time} (type: {type(global_earliest_end_time)})")
    exit()

if common_period_start_aligned >= common_period_end_aligned:
    print(f"错误：对齐后的公共时间段无效。开始: {common_period_start_aligned}, 结束: {common_period_end_aligned}。程序终止。")
    exit()

common_target_time_6min_freq = pd.date_range(start=common_period_start_aligned, end=common_period_end_aligned, freq='6T')
if common_target_time_6min_freq.empty:
    print(f"错误：基于公共时间段生成的6分钟目标时间序列为空。对齐后开始: {common_period_start_aligned}, 结束: {common_period_end_aligned}。程序终止。")
    exit()

print(f"基于公共时间段和6分钟频率生成的目标时间序列:")
print(f"  开始: {common_target_time_6min_freq.min()}")
print(f"  结束: {common_target_time_6min_freq.max()}")
print(f"  点数: {len(common_target_time_6min_freq)}")

global_unique_heights = sorted(list(all_heights_set))
print(f"\n收集到的全局唯一高度值 (共 {len(global_unique_heights)} 个): {global_unique_heights}")

print("\n修改后的第二阶段：基于公共时间段（6分钟频率）处理数据并生成NetCDF文件...")
output_nc_dir_final = root_dir # 输出目录保持不变
os.makedirs(output_nc_dir_final, exist_ok=True)

for station_name_loopvar_p2 in tqdm(station_ids_list, desc="站点数据处理与NetCDF生成"):
    if station_name_loopvar_p2 not in dataframes_raw:
        print(f"信息：站点 {station_name_loopvar_p2} 无原始数据，跳过。")
        continue
    df_station_current_original = dataframes_raw[station_name_loopvar_p2]
    if df_station_current_original.empty:
        print(f"信息：站点 {station_name_loopvar_p2} 原始数据为空，跳过。")
        continue

    df_station_in_common_period = df_station_current_original[
        (df_station_current_original['Datetime'] >= common_period_start_aligned) &
        (df_station_current_original['Datetime'] <= common_period_end_aligned)
    ].copy()

    if df_station_in_common_period.empty:
        print(f"信息：站点 {station_name_loopvar_p2} 在公共时间段 {common_period_start_aligned} 到 {common_period_end_aligned} 内无数据，跳过。")
        continue

    try:
        df_station_no_duplicates_p2 = df_station_in_common_period.drop_duplicates(subset=['Datetime', 'HEIGHT'], keep='first').copy()
        dfs_pivot_p2 = df_station_no_duplicates_p2.pivot_table(
            index='Datetime',
            columns='HEIGHT',
            values=['DIRECTION_H', 'RELIABILITY_H', 'RELIABILITY_V', 'SPEED_H', 'SPEED_V']
        )
    except Exception as e_pivot:
        print(f"错误：为站点 {station_name_loopvar_p2} 创建 pivot table 失败: {e_pivot}")
        traceback.print_exc()
        continue

    if dfs_pivot_p2.empty:
        print(f"信息：站点 {station_name_loopvar_p2} 的 pivot_table 为空，跳过。")
        continue

    dfs_pivot_p2.replace(9999, np.nan, inplace=True) # 将9999替换为NaN

    if isinstance(dfs_pivot_p2.columns, pd.MultiIndex) and len(dfs_pivot_p2.columns.levels) > 1:
        heights_in_pivot = dfs_pivot_p2.columns.levels[1].unique().astype(int)
    else: # pivot table 列结构非预期，可能因为只有一个高度层或变量
        print(f"警告: 站点 {station_name_loopvar_p2} 的 pivot_table 列结构非预期，尝试使用全局高度。")
        # 尝试从单层索引中提取高度（如果适用）
        try:
            # 如果列名是元组 (var, height)，但不是MultiIndex，尝试解析
            parsed_heights = set()
            for col in dfs_pivot_p2.columns:
                if isinstance(col, tuple) and len(col) == 2:
                    parsed_heights.add(col[1])
            if parsed_heights:
                heights_in_pivot = sorted(list(parsed_heights))
            else: # 否则回退到全局高度
                heights_in_pivot = global_unique_heights
        except:
             heights_in_pivot = global_unique_heights


    data_vars_xr = {}
    expected_vars_list = ['DIRECTION_H', 'RELIABILITY_H', 'RELIABILITY_V', 'SPEED_H', 'SPEED_V']
    datetime_coords_for_xr = dfs_pivot_p2.index

    for var_name_xr in expected_vars_list:
        try:
            # 构建 (len(Datetime), len(heights_in_pivot)) 的数组
            var_data_values_xr = np.full((len(datetime_coords_for_xr), len(heights_in_pivot)), np.nan)
            for i, h_val in enumerate(heights_in_pivot):
                if (var_name_xr, h_val) in dfs_pivot_p2.columns:
                    var_data_values_xr[:, i] = dfs_pivot_p2[(var_name_xr, h_val)].values
            data_vars_xr[var_name_xr] = (['Datetime', 'HEIGHT_pivot'], var_data_values_xr)
        except KeyError: # 如果某个变量在pivot表中完全不存在
             data_vars_xr[var_name_xr] = (['Datetime', 'HEIGHT_pivot'], np.full((len(datetime_coords_for_xr), len(heights_in_pivot)), np.nan))
        except Exception as e_prepare_var:
            print(f"错误：为站点 {station_name_loopvar_p2} 准备变量 {var_name_xr} 时出错: {e_prepare_var}")
            data_vars_xr[var_name_xr] = (['Datetime', 'HEIGHT_pivot'], np.full((len(datetime_coords_for_xr), len(heights_in_pivot)), np.nan))


    dss_initial = xr.Dataset(
        data_vars_xr,
        coords={
            'Datetime': ('Datetime', datetime_coords_for_xr.values),
            'HEIGHT_pivot': ('HEIGHT_pivot', heights_in_pivot)
        }
    )

    dss_reindexed_h_p2 = dss_initial.reindex({'HEIGHT_pivot': global_unique_heights}).rename({'HEIGHT_pivot': 'HEIGHT'})
    dss_filled_orig_time_p2 = dss_reindexed_h_p2.ffill(dim='Datetime', limit=None).bfill(dim='Datetime', limit=None)

    dss_interp_height_p2 = dss_filled_orig_time_p2.copy(deep=True)
    for var_xr in list(dss_interp_height_p2.data_vars):
        dss_interp_height_p2[var_xr] = dss_interp_height_p2[var_xr].interpolate_na(dim='HEIGHT', method='linear', limit=None)
        dss_interp_height_p2[var_xr] = dss_interp_height_p2[var_xr].ffill(dim='HEIGHT').bfill(dim='HEIGHT')

    dss_with_uvw_p2 = dss_interp_height_p2.copy(deep=True)
    if 'DIRECTION_H' in dss_interp_height_p2 and 'SPEED_H' in dss_interp_height_p2 and 'SPEED_V' in dss_interp_height_p2:
        direction_rad_p2 = np.deg2rad(dss_interp_height_p2['DIRECTION_H'])
        speed_h_data_p2 = dss_interp_height_p2['SPEED_H']
        U_wind_p2 = -speed_h_data_p2 * np.sin(direction_rad_p2)
        V_wind_p2 = -speed_h_data_p2 * np.cos(direction_rad_p2)
        dss_with_uvw_p2 = dss_interp_height_p2.assign(
            U=(['Datetime', 'HEIGHT'], U_wind_p2.data),
            V=(['Datetime', 'HEIGHT'], V_wind_p2.data),
            W=(['Datetime', 'HEIGHT'], dss_interp_height_p2['SPEED_V'].data) # SPEED_V is already W
        )
    else:
        print(f"  警告: 站点 {station_name_loopvar_p2} 缺少计算U,V,W所需变量，U,V,W将不被计算。")

    target_time_index_for_reindex = pd.DatetimeIndex(common_target_time_6min_freq)
    dss_time_aligned_p2 = dss_with_uvw_p2.reindex({'Datetime': target_time_index_for_reindex})
    dss_time_interp_p2 = dss_time_aligned_p2.interpolate_na(dim='Datetime', method='linear', limit=None)

    dss_filled_along_time_p2 = dss_time_interp_p2.ffill(dim='Datetime', limit=None).bfill(dim='Datetime', limit=None)
    dss_final_filled_p2 = dss_filled_along_time_p2.ffill(dim='HEIGHT', limit=None).bfill(dim='HEIGHT', limit=None)

    # ----- 修改开始: 高度到气压层的插值 -----
    dss_to_save = None # Initialize dss_to_save
    print(f"  对站点 {station_name_loopvar_p2} 进行高度到气压层的插值到 {TARGET_PRESSURES_HPA} hPa...")
    if 'HEIGHT' not in dss_final_filled_p2.coords or not hasattr(dss_final_filled_p2['HEIGHT'], 'values') or len(dss_final_filled_p2['HEIGHT'].values) == 0:
        print(f"  警告: 站点 {station_name_loopvar_p2} 的 dss_final_filled_p2 中缺少有效 'HEIGHT' 坐标，跳过气压插值。")
        dss_to_save = dss_final_filled_p2 # 保存未经气压插值的数据
    else:
        try:
            current_target_pressures_hpa = TARGET_PRESSURES_HPA

            # 1. 计算目标气压层对应的高度值
            # target_heights_m 的顺序将对应 current_target_pressures_hpa 的顺序
            target_heights_m = pressure_to_height(current_target_pressures_hpa, P0=P0_STANDARD_HPA, H_scale=H_SCALE_METERS)

            if np.any(~np.isfinite(target_heights_m)):
                print(f"  警告: 站点 {station_name_loopvar_p2} 计算目标高度时出现非有限值 for pressures {current_target_pressures_hpa}. Heights: {target_heights_m}. 跳过气压插值。")
                dss_to_save = dss_final_filled_p2
            else:
                # 2. 将数据集插值到这些新的高度值
                # dss_final_filled_p2.HEIGHT 应该是单调递增的
                dss_interp_at_equiv_heights = dss_final_filled_p2.interp(
                    HEIGHT=target_heights_m, # interp的HEIGHT参数应为目标高度值
                    method="linear",
                    kwargs={"fill_value": np.nan} # 超出范围的值用NaN填充
                )
                # 此时，dss_interp_at_equiv_heights 的 'HEIGHT' 坐标的值是 target_heights_m

                # 3. 重命名维度并将坐标值替换为目标气压值
                dss_renamed_dim = dss_interp_at_equiv_heights.rename({'HEIGHT': 'PRESS'})
                
                # 将PRESS坐标的值赋为我们期望的目标气压层
                dss_on_pressure = dss_renamed_dim.assign_coords(
                    PRESS=('PRESS', current_target_pressures_hpa)
                )

                # 4. 填充插值后可能产生的NaN值
                for var_name_final in dss_on_pressure.data_vars:
                    # 沿新的 PRESS 维度填充
                    dss_on_pressure[var_name_final] = dss_on_pressure[var_name_final].ffill(dim='PRESS').bfill(dim='PRESS')
                    # （可选）再次沿时间维度填充，以防万一
                    dss_on_pressure[var_name_final] = dss_on_pressure[var_name_final].ffill(dim='Datetime').bfill(dim='Datetime')
                
                dss_to_save = dss_on_pressure
                print(f"  站点 {station_name_loopvar_p2} 数据已插值到气压层。新的维度 'PRESS' (hPa): {dss_to_save.PRESS.values}")

        except Exception as e_interp_press:
            print(f"错误：站点 {station_name_loopvar_p2} 在插值到气压层时失败: {e_interp_press}")
            traceback.print_exc()
            print(f"  将保存未经气压插值的数据。")
            dss_to_save = dss_final_filled_p2 # Fallback
    # ----- 修改结束 -----

    # 更新输出文件名并保存 dss_to_save
    if dss_to_save is None or dss_to_save.equals(dss_final_filled_p2) :
        filename_nc_output = os.path.join(output_nc_dir_final, f"{station_name_loopvar_p2}_height_coords_filled.nc")
    else: # 插值成功
        filename_nc_output = os.path.join(output_nc_dir_final, f"{station_name_loopvar_p2}_pressure_coords_filled.nc")

    try:
        if 'Datetime' in dss_to_save.coords:
             dss_to_save['Datetime'].attrs['long_name'] = 'time'
             dss_to_save['Datetime'].attrs['standard_name'] = 'time'
        
        # 为坐标添加属性
        if 'PRESS' in dss_to_save.coords:
            dss_to_save['PRESS'].attrs['units'] = 'hPa'
            dss_to_save['PRESS'].attrs['long_name'] = 'Pressure Level'
            dss_to_save['PRESS'].attrs['standard_name'] = 'air_pressure'
            dss_to_save['PRESS'].attrs['positive'] = 'down' # 气压向下增加
        elif 'HEIGHT' in dss_to_save.coords: # 如果仍然是高度坐标
             dss_to_save['HEIGHT'].attrs['units'] = 'm'
             dss_to_save['HEIGHT'].attrs['long_name'] = 'Height above ground level' # 假设是AGL
             dss_to_save['HEIGHT'].attrs['standard_name'] = 'height'
             dss_to_save['HEIGHT'].attrs['positive'] = 'up'   # 高度向上增加

        encoding_dict = {var: {'zlib': True, 'complevel': 5} for var in dss_to_save.data_vars}
        dss_to_save.to_netcdf(filename_nc_output, encoding=encoding_dict)
        print(f"  成功保存NetCDF文件: {filename_nc_output}")
    except Exception as e_save:
        print(f"错误：保存NetCDF文件 {filename_nc_output} 失败: {e_save}")
        traceback.print_exc()

print(f"\n所有站点个体NetCDF文件处理完毕，保存在: {output_nc_dir_final}")

修改后的第一阶段：收集高度、读取数据并确定各文件实际时间范围...


站点文件读取与时间范围分析: 100%|██████████████████████████████████████████████████████| 18/18 [11:27<00:00, 38.20s/it]
  common_target_time_6min_freq = pd.date_range(start=common_period_start_aligned, end=common_period_end_aligned, freq='6T')



第一阶段完成：共处理了 18 个包含有效时间数据的文件。

所有有效文件的最小公共时间段为:
  开始 (Latest Start): 2024-07-29 09:30:00
  结束 (Earliest End): 2025-07-21 23:54:00
基于公共时间段和6分钟频率生成的目标时间序列:
  开始: 2024-07-29 09:30:00
  结束: 2025-07-21 23:54:00
  点数: 85825

收集到的全局唯一高度值 (共 308 个): [60, 100, 120, 150, 180, 240, 270, 300, 340, 360, 390, 420, 480, 510, 540, 580, 600, 630, 720, 750, 820, 840, 870, 960, 990, 1060, 1080, 1110, 1200, 1230, 1300, 1320, 1350, 1440, 1470, 1540, 1560, 1590, 1680, 1710, 1780, 1800, 1830, 1920, 1950, 2020, 2040, 2070, 2160, 2190, 2260, 2280, 2310, 2400, 2430, 2500, 2520, 2550, 2640, 2670, 2740, 2760, 2790, 2880, 2910, 2980, 3000, 3030, 3120, 3150, 3220, 3240, 3270, 3360, 3390, 3460, 3480, 3510, 3600, 3630, 3700, 3720, 3750, 3840, 3870, 3940, 3960, 3990, 4000, 4080, 4110, 4180, 4200, 4230, 4240, 4320, 4350, 4420, 4440, 4470, 4480, 4560, 4590, 4660, 4680, 4710, 4720, 4800, 4830, 4900, 4920, 4950, 4960, 5040, 5070, 5140, 5160, 5190, 5200, 5280, 5310, 5380, 5400, 5430, 5440, 5520, 5550, 5620, 5640, 5670, 568

站点数据处理与NetCDF生成:   0%|                                                                 | 0/18 [00:00<?, ?it/s]

  对站点 58448 进行高度到气压层的插值到 [1000  925  850  700  600  500  400] hPa...
  站点 58448 数据已插值到气压层。新的维度 'PRESS' (hPa): [1000  925  850  700  600  500  400]


站点数据处理与NetCDF生成:   6%|███▏                                                     | 1/18 [00:34<09:43, 34.33s/it]

  成功保存NetCDF文件: D:/wind/zhejiang_all_less/58448_pressure_coords_filled.nc
  对站点 58450 进行高度到气压层的插值到 [1000  925  850  700  600  500  400] hPa...
  站点 58450 数据已插值到气压层。新的维度 'PRESS' (hPa): [1000  925  850  700  600  500  400]


站点数据处理与NetCDF生成:  11%|██████▎                                                  | 2/18 [01:03<08:21, 31.35s/it]

  成功保存NetCDF文件: D:/wind/zhejiang_all_less/58450_pressure_coords_filled.nc
  对站点 58452 进行高度到气压层的插值到 [1000  925  850  700  600  500  400] hPa...
  站点 58452 数据已插值到气压层。新的维度 'PRESS' (hPa): [1000  925  850  700  600  500  400]


站点数据处理与NetCDF生成:  17%|█████████▌                                               | 3/18 [01:29<07:15, 29.03s/it]

  成功保存NetCDF文件: D:/wind/zhejiang_all_less/58452_pressure_coords_filled.nc
  对站点 58459 进行高度到气压层的插值到 [1000  925  850  700  600  500  400] hPa...
  站点 58459 数据已插值到气压层。新的维度 'PRESS' (hPa): [1000  925  850  700  600  500  400]


站点数据处理与NetCDF生成:  22%|████████████▋                                            | 4/18 [01:55<06:24, 27.50s/it]

  成功保存NetCDF文件: D:/wind/zhejiang_all_less/58459_pressure_coords_filled.nc
  对站点 58467 进行高度到气压层的插值到 [1000  925  850  700  600  500  400] hPa...
  站点 58467 数据已插值到气压层。新的维度 'PRESS' (hPa): [1000  925  850  700  600  500  400]


站点数据处理与NetCDF生成:  28%|███████████████▊                                         | 5/18 [02:19<05:41, 26.25s/it]

  成功保存NetCDF文件: D:/wind/zhejiang_all_less/58467_pressure_coords_filled.nc
  对站点 58468 进行高度到气压层的插值到 [1000  925  850  700  600  500  400] hPa...
  站点 58468 数据已插值到气压层。新的维度 'PRESS' (hPa): [1000  925  850  700  600  500  400]


站点数据处理与NetCDF生成:  33%|███████████████████                                      | 6/18 [02:42<05:05, 25.43s/it]

  成功保存NetCDF文件: D:/wind/zhejiang_all_less/58468_pressure_coords_filled.nc
  对站点 58537 进行高度到气压层的插值到 [1000  925  850  700  600  500  400] hPa...
  站点 58537 数据已插值到气压层。新的维度 'PRESS' (hPa): [1000  925  850  700  600  500  400]


站点数据处理与NetCDF生成:  39%|██████████████████████▏                                  | 7/18 [03:07<04:37, 25.21s/it]

  成功保存NetCDF文件: D:/wind/zhejiang_all_less/58537_pressure_coords_filled.nc
  对站点 58542 进行高度到气压层的插值到 [1000  925  850  700  600  500  400] hPa...
  站点 58542 数据已插值到气压层。新的维度 'PRESS' (hPa): [1000  925  850  700  600  500  400]


站点数据处理与NetCDF生成:  44%|█████████████████████████▎                               | 8/18 [03:28<03:59, 23.95s/it]

  成功保存NetCDF文件: D:/wind/zhejiang_all_less/58542_pressure_coords_filled.nc
  对站点 58543 进行高度到气压层的插值到 [1000  925  850  700  600  500  400] hPa...
  站点 58543 数据已插值到气压层。新的维度 'PRESS' (hPa): [1000  925  850  700  600  500  400]


站点数据处理与NetCDF生成:  50%|████████████████████████████▌                            | 9/18 [03:54<03:41, 24.59s/it]

  成功保存NetCDF文件: D:/wind/zhejiang_all_less/58543_pressure_coords_filled.nc
  对站点 58548 进行高度到气压层的插值到 [1000  925  850  700  600  500  400] hPa...
  站点 58548 数据已插值到气压层。新的维度 'PRESS' (hPa): [1000  925  850  700  600  500  400]


站点数据处理与NetCDF生成:  56%|███████████████████████████████                         | 10/18 [04:19<03:16, 24.53s/it]

  成功保存NetCDF文件: D:/wind/zhejiang_all_less/58548_pressure_coords_filled.nc
  对站点 58553 进行高度到气压层的插值到 [1000  925  850  700  600  500  400] hPa...
  站点 58553 数据已插值到气压层。新的维度 'PRESS' (hPa): [1000  925  850  700  600  500  400]


站点数据处理与NetCDF生成:  61%|██████████████████████████████████▏                     | 11/18 [04:41<02:46, 23.81s/it]

  成功保存NetCDF文件: D:/wind/zhejiang_all_less/58553_pressure_coords_filled.nc
  对站点 58557 进行高度到气压层的插值到 [1000  925  850  700  600  500  400] hPa...
  站点 58557 数据已插值到气压层。新的维度 'PRESS' (hPa): [1000  925  850  700  600  500  400]


站点数据处理与NetCDF生成:  67%|█████████████████████████████████████▎                  | 12/18 [05:05<02:22, 23.81s/it]

  成功保存NetCDF文件: D:/wind/zhejiang_all_less/58557_pressure_coords_filled.nc
  对站点 58565 进行高度到气压层的插值到 [1000  925  850  700  600  500  400] hPa...
  站点 58565 数据已插值到气压层。新的维度 'PRESS' (hPa): [1000  925  850  700  600  500  400]


站点数据处理与NetCDF生成:  72%|████████████████████████████████████████▍               | 13/18 [05:29<01:59, 23.88s/it]

  成功保存NetCDF文件: D:/wind/zhejiang_all_less/58565_pressure_coords_filled.nc
  对站点 58566 进行高度到气压层的插值到 [1000  925  850  700  600  500  400] hPa...
  站点 58566 数据已插值到气压层。新的维度 'PRESS' (hPa): [1000  925  850  700  600  500  400]


站点数据处理与NetCDF生成:  78%|███████████████████████████████████████████▌            | 14/18 [05:52<01:34, 23.70s/it]

  成功保存NetCDF文件: D:/wind/zhejiang_all_less/58566_pressure_coords_filled.nc
  对站点 58643 进行高度到气压层的插值到 [1000  925  850  700  600  500  400] hPa...
  站点 58643 数据已插值到气压层。新的维度 'PRESS' (hPa): [1000  925  850  700  600  500  400]


站点数据处理与NetCDF生成:  83%|██████████████████████████████████████████████▋         | 15/18 [06:16<01:10, 23.65s/it]

  成功保存NetCDF文件: D:/wind/zhejiang_all_less/58643_pressure_coords_filled.nc
  对站点 58646 进行高度到气压层的插值到 [1000  925  850  700  600  500  400] hPa...
  站点 58646 数据已插值到气压层。新的维度 'PRESS' (hPa): [1000  925  850  700  600  500  400]


站点数据处理与NetCDF生成:  89%|█████████████████████████████████████████████████▊      | 16/18 [06:39<00:46, 23.46s/it]

  成功保存NetCDF文件: D:/wind/zhejiang_all_less/58646_pressure_coords_filled.nc
  对站点 58751 进行高度到气压层的插值到 [1000  925  850  700  600  500  400] hPa...
  站点 58751 数据已插值到气压层。新的维度 'PRESS' (hPa): [1000  925  850  700  600  500  400]


站点数据处理与NetCDF生成:  94%|████████████████████████████████████████████████████▉   | 17/18 [07:01<00:23, 23.17s/it]

  成功保存NetCDF文件: D:/wind/zhejiang_all_less/58751_pressure_coords_filled.nc
  对站点 58755 进行高度到气压层的插值到 [1000  925  850  700  600  500  400] hPa...
  站点 58755 数据已插值到气压层。新的维度 'PRESS' (hPa): [1000  925  850  700  600  500  400]


站点数据处理与NetCDF生成: 100%|████████████████████████████████████████████████████████| 18/18 [07:25<00:00, 24.75s/it]

  成功保存NetCDF文件: D:/wind/zhejiang_all_less/58755_pressure_coords_filled.nc

所有站点个体NetCDF文件处理完毕，保存在: D:/wind/zhejiang_all_less/





In [2]:
import os
import numpy as np
import xarray as xr
import pandas as pd
from tqdm import tqdm
import traceback

# --- Configuration for Third Stage ---
root_dir = 'D:/wind/zhejiang_all_less/'
station_ids_list_stage3 =  [
    "58448", "58450",  "58452", "58459",
    "58467", "58468", "58537", "58542", "58543", "58548",
    "58553","58557","58565","58566","58643","58646","58751",
    "58755"
]
station_coords_int_stage3 = [int(sid) for sid in station_ids_list_stage3]
input_nc_dir_stage3 = root_dir # Individual station files are in the root_dir

# --- MODIFICATION: Reflect new vertical dimension ---
inferred_common_time_coord = None
inferred_global_press_coord = None # Changed from inferred_global_height_coord

print("\n--- Third Stage: Merging Station NetCDF Files (with PRESS dimension) ---")
datasets_to_combine = []
valid_station_coords_for_concat = []

print(f"Reading individual .nc files from: {input_nc_dir_stage3}")
for station_id_str, station_id_int in tqdm(zip(station_ids_list_stage3, station_coords_int_stage3), total=len(station_ids_list_stage3), desc="Merging station files"):
    # --- MODIFICATION: Update input file name to match Stage 2 output for pressure data ---
    file_path = os.path.join(input_nc_dir_stage3, f"{station_id_str}_pressure_coords_filled.nc")
    # Fallback: if the above doesn't exist, maybe user has a different naming or it's height data
    # You might want to add more sophisticated logic here if files can have either HEIGHT or PRESS
    # and you want to merge them differently or handle height-based files.
    # For now, we strictly look for pressure-interpolated files.
    if not os.path.exists(file_path):
        # If you also want to process files named "_no_nan_press.nc" or "_height_coords_filled.nc",
        # you can add more checks here.
        # Example:
        # fallback_file_path_user_naming = os.path.join(input_nc_dir_stage3, f"{station_id_str}_no_nan_press.nc")
        # if os.path.exists(fallback_file_path_user_naming):
        #     file_path = fallback_file_path_user_naming
        # else:
        print(f"INFO: Expected pressure file {file_path} does not exist. Skipping station {station_id_str}.")
        continue

    if os.path.exists(file_path):
        try:
            with xr.open_dataset(file_path) as ds_station:
                # --- MODIFICATION: Check for 'PRESS' coordinate ---
                if 'Datetime' not in ds_station.coords or 'PRESS' not in ds_station.coords:
                    print(f"  WARNING: File {file_path} is missing 'Datetime' or 'PRESS' coordinates. Skipping.")
                    continue
                if not pd.api.types.is_datetime64_any_dtype(ds_station['Datetime'].dtype):
                    print(f"  WARNING: File {file_path} 'Datetime' coordinate is not datetime64. Skipping.")
                    continue

                if inferred_common_time_coord is None:
                    inferred_common_time_coord = ds_station['Datetime'].copy(deep=True)
                    print(f"  Inferred common Datetime coordinate from {file_path} (length {len(inferred_common_time_coord)})")
                # --- MODIFICATION: Infer 'PRESS' coordinate ---
                if inferred_global_press_coord is None:
                    inferred_global_press_coord = ds_station['PRESS'].copy(deep=True)
                    print(f"  Inferred common PRESS coordinate from {file_path} (length {len(inferred_global_press_coord)}, values: {inferred_global_press_coord.values[:5]}...)")

                if not ds_station['Datetime'].equals(inferred_common_time_coord):
                    print(f"  CRITICAL WARNING: Datetime coordinate in {file_path} MISMATCHES inferred common Datetime. Skipping file.")
                    continue
                # --- MODIFICATION: Verify 'PRESS' coordinate ---
                if not ds_station['PRESS'].equals(inferred_global_press_coord):
                    print(f"  CRITICAL WARNING: PRESS coordinate in {file_path} MISMATCHES inferred common PRESS. Skipping file.")
                    print(f"    File PRESS: {ds_station['PRESS'].values}")
                    print(f"    Expected PRESS: {inferred_global_press_coord.values}")
                    continue

                ds_station_expanded = ds_station.expand_dims(station=[station_id_int])
                datasets_to_combine.append(ds_station_expanded)
                valid_station_coords_for_concat.append(station_id_int)
        except Exception as e:
            print(f"WARNING: Failed to read or process file {file_path}: {e}")
            traceback.print_exc()
    # This 'else' block is now covered by the 'continue' above if file_path doesn't exist.
    # else:
    #     print(f"INFO: File {file_path} does not exist. Skipping.")


if not datasets_to_combine:
    print("ERROR: No NetCDF files were successfully read and prepared for merging. Terminating.")
    exit()
# --- MODIFICATION: Check inferred_global_press_coord ---
elif inferred_common_time_coord is None or inferred_global_press_coord is None:
    print("ERROR: Could not infer common Datetime or PRESS coordinates from the input files. Terminating.")
    exit()

print(f"\nSuccessfully prepared {len(datasets_to_combine)} station datasets for merging.")
print(f"Using station IDs: {valid_station_coords_for_concat}")

try:
    print("\nConcatenating datasets along 'station' dimension...")
    combined_ds = xr.concat(
        datasets_to_combine, dim='station', data_vars='all',
        coords='all', join='override', compat='no_conflicts' # Presumes coordinates are identical where they should be
    )
    print("Concatenation complete.")

    # --- MODIFICATION: Update target_dims_order to use 'PRESS' ---
    print("\nTransposing data variables to ('Datetime', 'station', 'PRESS')...")
    target_dims_order = ('Datetime', 'station', 'PRESS')
    missing_dims = [d for d in target_dims_order if d not in combined_ds.dims]
    if missing_dims:
        print(f"ERROR: Cannot transpose. Dataset is missing dimensions: {missing_dims}. Available: {list(combined_ds.dims.keys())}")
        exit()
    
    # Ensure the station coordinate is correctly assigned if it was part of the concat
    if 'station' not in combined_ds.coords:
        combined_ds = combined_ds.assign_coords(station=valid_station_coords_for_concat)

    combined_ds_transposed = combined_ds.transpose(*target_dims_order)
    print("Transpose complete.")

    print("\nVerifying coordinates...")
    if 'Datetime' not in combined_ds_transposed.coords or not combined_ds_transposed['Datetime'].equals(inferred_common_time_coord):
        print("ERROR: Final 'Datetime' coordinate is missing or does not match inferred common time.")
        exit()
    print(f"  Final 'Datetime' coordinate verified (length {len(combined_ds_transposed.Datetime)}).")
    # --- MODIFICATION: Verify 'PRESS' coordinate ---
    if 'PRESS' not in combined_ds_transposed.coords or not combined_ds_transposed['PRESS'].equals(inferred_global_press_coord):
        print("ERROR: Final 'PRESS' coordinate is missing or does not match inferred common press levels.")
        exit()
    print(f"  Final 'PRESS' coordinate verified (length {len(combined_ds_transposed.PRESS)}, values: {combined_ds_transposed.PRESS.values[:5]}...).")

    if 'station' not in combined_ds_transposed.coords:
        print("ERROR: Final 'station' coordinate is missing.")
        combined_ds_transposed = combined_ds_transposed.assign_coords(station=valid_station_coords_for_concat)
        if 'station' not in combined_ds_transposed.coords: exit()
    # Ensure station coordinate is integer
    if combined_ds_transposed['station'].dtype != np.int64 and combined_ds_transposed['station'].dtype != np.int32:
        print(f"  Converting 'station' coordinate from {combined_ds_transposed['station'].dtype} to int...")
        combined_ds_transposed['station'] = combined_ds_transposed['station'].astype(int)
    print(f"  Final 'station' coordinate verified (dtype: {combined_ds_transposed['station'].dtype}, values: {combined_ds_transposed['station'].values[:5]}...).")


    print("\nConverting float64 data variables to float32 for memory efficiency...")
    for var_name in list(combined_ds_transposed.data_vars.keys()):
        if combined_ds_transposed[var_name].dtype == np.float64:
            print(f"  Converting variable '{var_name}' from float64 to float32.")
            combined_ds_transposed[var_name] = combined_ds_transposed[var_name].astype(np.float32)

    print("\n--- Final Dataset Structure Check (after dtype conversion) ---")
    print(combined_ds_transposed)
    all_vars_dims_correct = True
    if combined_ds_transposed.data_vars:
        print(f"  Found {len(combined_ds_transposed.data_vars)} data variables.")
        for var_name, data_array in combined_ds_transposed.data_vars.items():
            print(f"    Variable '{var_name}': Dimensions {data_array.dims}, Shape {data_array.shape}, Dtype {data_array.dtype}")
            if data_array.dims != target_dims_order:
                print(f"      WARNING: Variable '{var_name}' has incorrect dimension order! Expected {target_dims_order}, Got {data_array.dims}")
                all_vars_dims_correct = False
        if all_vars_dims_correct: print("  All data variables have the correct dimension order.")
        else: print("  ERROR: Not all data variables have the correct dimension order. Please review.")
    else:
        print("ERROR: No data variables found in the final combined dataset!")
        exit()

    # --- MODIFICATION: Update output filename (user's desired name) ---
    output_filename = "merged_stations_6min_common_period_float32_no_nan_press.nc"
    output_combined_file_path = os.path.join(root_dir, output_filename)
    print(f"\nSaving final combined dataset to: {output_combined_file_path}")
    try:
        encoding = {var: {'zlib': True, 'complevel': 5} for var in combined_ds_transposed.data_vars}
        # Attributes for Datetime and PRESS should be carried over from individual files
        # if they were set correctly in stage 2. We can double-check/set standard ones.
        if 'Datetime' in combined_ds_transposed.coords:
            combined_ds_transposed['Datetime'].attrs['long_name'] = 'time'
            combined_ds_transposed['Datetime'].attrs['standard_name'] = 'time'
        if 'PRESS' in combined_ds_transposed.coords: # Set attributes if not already present or to ensure consistency
            combined_ds_transposed['PRESS'].attrs.setdefault('units', 'hPa')
            combined_ds_transposed['PRESS'].attrs.setdefault('long_name', 'Pressure Level')
            combined_ds_transposed['PRESS'].attrs.setdefault('standard_name', 'air_pressure')
            combined_ds_transposed['PRESS'].attrs.setdefault('positive', 'down') # Standard for pressure coordinates


        combined_ds_transposed.to_netcdf(
            output_combined_file_path,
            encoding=encoding,
            unlimited_dims=['Datetime']
        )
        print(f"Successfully saved: {output_combined_file_path}")
        print(f"Verify with: ncdump -h {output_combined_file_path}")
    except Exception as e:
        print(f"ERROR: Failed to save the final NetCDF file: {e}")
        traceback.print_exc()
except Exception as e:
    print(f"An error occurred during the third stage processing: {e}")
    traceback.print_exc()
print("\n--- Third Stage Processing Complete ---")


--- Third Stage: Merging Station NetCDF Files (with PRESS dimension) ---
Reading individual .nc files from: D:/wind/zhejiang_all_less/


Merging station files:   0%|                                                                    | 0/18 [00:00<?, ?it/s]

  Inferred common Datetime coordinate from D:/wind/zhejiang_all_less/58448_pressure_coords_filled.nc (length 85825)
  Inferred common PRESS coordinate from D:/wind/zhejiang_all_less/58448_pressure_coords_filled.nc (length 7, values: [1000  925  850  700  600]...)


Merging station files: 100%|███████████████████████████████████████████████████████████| 18/18 [00:03<00:00,  5.27it/s]



Successfully prepared 18 station datasets for merging.
Using station IDs: [58448, 58450, 58452, 58459, 58467, 58468, 58537, 58542, 58543, 58548, 58553, 58557, 58565, 58566, 58643, 58646, 58751, 58755]

Concatenating datasets along 'station' dimension...
Concatenation complete.

Transposing data variables to ('Datetime', 'station', 'PRESS')...
Transpose complete.

Verifying coordinates...
  Final 'Datetime' coordinate verified (length 85825).
  Final 'PRESS' coordinate verified (length 7, values: [1000  925  850  700  600]...).
  Final 'station' coordinate verified (dtype: int32, values: [58448 58450 58452 58459 58467]...).

Converting float64 data variables to float32 for memory efficiency...
  Converting variable 'DIRECTION_H' from float64 to float32.
  Converting variable 'RELIABILITY_H' from float64 to float32.
  Converting variable 'RELIABILITY_V' from float64 to float32.
  Converting variable 'SPEED_H' from float64 to float32.
  Converting variable 'SPEED_V' from float64 to float