# chatgpt test hdf5

In [8]:
import os
import time
import numpy as np
import pandas as pd
from pandas import HDFStore
import logging

log = logging.getLogger(__name__)
# 假设 SafeHDFStore 已经定义好

def load_hdf_db(
    fname,
    table="all",
    code_l=None,
    timelimit=True,
    index=False,
    limit_time=300,
    dratio_limit=0.5,
    MultiIndex=False,
    showtable=False,
):
    """
    精简版 HDF5 读取函数，路径处理跨平台
    """
    t0 = time.time()
    df, dd = None, None

    if not os.path.exists(fname):
        log.error("HDF5 file not found: %s", fname)
        return None

    # 读取表
    with SafeHDFStore(fname, mode="r") as store:
        if store is None:
            return None
        keys = list(store.keys())
        if showtable:
            print(f"fname: {fname}, keys: {keys}")

        if "/" + table not in keys:
            log.error("%s not found in %s", table, fname)
            return None

        dd = store[table]

    if dd is None or len(dd) == 0:
        log.warning("Empty table %s in %s", table, fname)
        return None

    # --- 按 code_l 过滤 ---
    if code_l is not None:
        if not MultiIndex:
            if index:
                code_l = [str(1000000 - int(x)) if x.startswith("0") else x for x in code_l]
            dif_co = list(set(dd.index) & set(code_l))

            if len(code_l) > 0:
                dratio = (len(code_l) - len(dif_co)) / float(len(code_l))
            else:
                dratio = 0.0

            log.info("find all:%s missing:%s dratio:%.2f",
                     len(code_l), len(code_l) - len(dif_co), dratio)

            # 时间限制
            if timelimit and "timel" in dd.columns:
                dd = dd.loc[dif_co]
                o_time = [time.time() - t for t in dd[dd.timel != 0].timel.tolist()]
                if len(o_time) > 0:
                    l_time = np.mean(o_time)
                    return_hdf_status = l_time < limit_time
                    log.info("return_hdf_status:%s mean_time:%.2f limit:%.2f",
                             return_hdf_status, l_time, limit_time)
                    if return_hdf_status:
                        df = dd
            else:
                df = dd.loc[dif_co]

            if dratio > dratio_limit:
                log.warning("Too many codes missing: %.2f > %.2f",
                            dratio, dratio_limit)
                return None
        else:
            # 多层索引
            df = dd.loc[dd.index.isin(code_l, level="code")]
    else:
        df = dd

    # --- 统一清理 ---
    if df is not None and len(df) > 0:
        df = df.fillna(0)
        df = df[~df.index.duplicated(keep="last")]

        # MultiIndex 去重逻辑
        if MultiIndex and "volume" in df.columns:
            count_before = len(df)
            df = df.drop_duplicates()
            dratio = len(df) / float(count_before)
            log.debug("MultiIndex drop_duplicates: before=%d after=%d ratio=%.2f",
                      count_before, len(df), dratio)

    log.info("load_hdf_time: %.2f sec", time.time() - t0)
    return df

In [11]:

def write_hdf_with_code(fname, table, df, index=True, complib='zlib', complevel=9):
    """
    写入 HDF5 并把 code 加入 data_columns
    """
    if df is None or len(df) == 0:
        print("Empty DataFrame, skip write")
        return

    # 如果是 MultiIndex，确保 code 是列
    if isinstance(df.index, pd.MultiIndex) and 'code' in df.index.names:
        df_reset = df.reset_index(level='code')
    elif 'code' not in df.columns:
        # 单索引，假设 index 是 code
        df_reset = df.copy()
        df_reset['code'] = df_reset.index
    else:
        df_reset = df.copy()

    # 确保 code 在 data_columns
    df_reset.to_hdf(
        fname,
        key=table,
        mode='a',
        format='table',          # table 格式才能用 data_columns
        data_columns=['code'],   # 指定 code 可筛选
        complevel=complevel,
        complib=complib,
        index=index
    )
    print(f"Wrote table {table} to {fname}, rows: {len(df_reset)}")

# --- 示例 ---
basedir = os.path.join("G:",os.sep )
fname = os.path.join(basedir, "sina_MultiIndex_data.h5")
table = 'sina_MultiIndex'

# 读取数据
df = load_hdf_db(fname, table=table, MultiIndex=True)

fname_data_col = os.path.join(basedir, "sina_MultiIndex_data_columns.h5") 
# 写入 HDF5，带 code data_column
write_hdf_with_code(fname_data_col, table, df)


NameError: name 'SafeHDFStore' is not defined

In [6]:
import pandas as pd
import time
import os

basedir = "G:" + os.sep  # 或者 basedir = os.path.join("G:", "")

# config_ini = os.path.join(basedir, "h5config.txt")
# temp_file = os.path.join(basedir, "tmpfile.h5")
# 示例文件路径
# fname = os.path.join(basedir, "test.h5")
# table = "all"

# MultiIndex 示例路径
# multi_fname = os.path.join(basedir, "sina_MultiIndex.h5")

fname_normal = os.path.join(basedir, "sina_MultiIndex_data.h5")             # 原始 HDF5
fname_data_col = os.path.join(basedir, "sina_MultiIndex_data_columns.h5")  # 写入了 data_columns 的 HDF5
table = "all_30"
test_code = "002258"

# -------- 全表读取 --------
t0 = time.time()
with pd.HDFStore(fname_normal, mode="r") as store:
    df_all = store[table]
t1 = time.time()
print(f"[全表读取] rows: {len(df_all)}, time: {t1-t0:.4f} s")

# 筛选指定 code
t0 = time.time()
df_code = df_all.loc[df_all.index.get_level_values("code") == test_code]
t1 = time.time()
print(f"[全表筛选 code={test_code}] rows: {len(df_code)}, time: {t1-t0:.4f} s")


# -------- select 查询（data_columns） --------
t0 = time.time()
with pd.HDFStore(fname_data_col, mode="r") as store:
    df_select = store.select(table, where=f'code="{test_code}"')
t1 = time.time()
print(f"[select code={test_code}] rows: {len(df_select)}, time: {t1-t0:.4f} s")


[全表读取] rows: 2323059, time: 2.7696 s
[全表筛选 code=002258] rows: 447, time: 0.1845 s


OSError: ``G:\sina_MultiIndex_data_columns.h5`` does not exist

In [13]:
with pd.HDFStore(r"G:\sina_MultiIndex_data_columns.h5") as store:
    print(store.get_storer('all_30').data_columns)

['code']
