In [1]:
from clickhouse_connect import get_client
import pandas as pd
# Connection settings are read from the environment so that credentials are
# never stored in the notebook. Host, port and user fall back to the values
# this notebook was written against; the password has no fallback and is
# prompted for when CLICKHOUSE_PASSWORD is not set (set the variable for
# non-interactive "Run All" executions).
# NOTE(review): a password was previously committed in this cell and should be
# rotated on the server.
import os
from getpass import getpass

client = get_client(
    host=os.environ.get("CLICKHOUSE_HOST", "chdb.ruogu.work"),
    port=int(os.environ.get("CLICKHOUSE_PORT", "8123")),
    username=os.environ.get("CLICKHOUSE_USER", "user40"),
    password=os.environ.get("CLICKHOUSE_PASSWORD") or getpass("ClickHouse password: "),
)

## 使用逐笔数据复现盘口价格

In [None]:
import pandas as pd
import collections

def reconstruct_order_book(df, target_time_str='11:30:00'):
    """
    Rebuild the top of the order book by replaying tick-by-tick events.

    New orders are recognised by an empty ``trade_flag`` and one of three
    ``type`` codes: limit order (``'2'``), counterparty-best order (``'1'``)
    and own-side-best order (``'U'``).  Rows whose ``trade_flag`` is ``'F'``
    (trade) or ``'4'`` (cancel) subtract their volume from the resting orders
    referenced by ``cbidno`` / ``caskno``.

    Parameters
    ----------
    df : pandas.DataFrame
        Event rows with the columns ``side``, ``type``, ``trade_flag``,
        ``price``, ``volume``, ``seqno``, ``cbidno``, ``caskno`` and
        ``time_int``.  The frame is copied; the caller's data is not modified.
    target_time_str : str
        Cut-off in ``'HH:MM:SS'`` form.  It is converted to milliseconds since
        midnight and compared with ``time_int`` (which is therefore assumed to
        use the same unit).

    Returns
    -------
    tuple of (pandas.DataFrame, pandas.DataFrame)
        ``(bids, asks)``, each holding at most the 10 best price levels with
        the columns ``price``, ``volume``, ``level`` (1 = best) and ``side``
        (``'Bid'`` / ``'Ask'``).  A side without any resting volume yields an
        empty frame that still carries those four columns.

    Notes
    -----
    Prints one progress line before and one after the replay.
    """
    df = df.copy()  # never mutate the caller's frame

    # --- data cleaning -----------------------------------------------------
    def clean_bytes(x):
        """Normalise bytes, stringified bytes and None to a plain ``str``."""
        if x is None:
            return ''
        if isinstance(x, bytes):
            return x.decode('utf-8', errors='ignore')
        if isinstance(x, str):
            # Values such as "b'F'" are the repr of a bytes object; unwrap them.
            if x.startswith("b'") and x.endswith("'"):
                return x[2:-1]
            if x.startswith('b"') and x.endswith('"'):
                return x[2:-1]
        return str(x)

    for col in ['side', 'type', 'trade_flag']:
        if col in df.columns:
            df[col] = df[col].apply(clean_bytes)
    df['price'] = pd.to_numeric(df['price'], errors='coerce').fillna(0.0)
    for col in ['volume', 'seqno', 'cbidno', 'caskno', 'time_int']:
        if col in df.columns:
            df[col] = pd.to_numeric(df[col], errors='coerce').fillna(0).astype(int)

    # Events are replayed in sequence-number order.
    df = df.sort_values('seqno').reset_index(drop=True)

    # Cut-off expressed in milliseconds since midnight.
    h, m, s = map(int, target_time_str.split(':'))
    target_time_int = (h * 3600 + m * 60 + s) * 1000
    print(f"Replaying up to {target_time_str} (time_int <= {target_time_int})...")

    # --- replay state ------------------------------------------------------
    active_orders = {}  # seqno -> {price, vol, side}
    price_levels = {'B': collections.defaultdict(float), 'S': collections.defaultdict(float)}
    processed_count = 0

    # trade_flag values that mark a row as a new order (empty / NUL / missing).
    new_order_flags = {'', '\\x00', 'nan', 'None', '\x00'}
    new_order_types = {'2', '1', 'U'}

    def get_best_bid():
        """Return the highest resting buy price, or None when there is none."""
        if price_levels['B']:
            return max(price_levels['B'].keys())
        return None

    def get_best_ask():
        """Return the lowest resting sell price, or None when there is none."""
        if price_levels['S']:
            return min(price_levels['S'].keys())
        return None

    def reduce_order(orig_seqno, qty):
        """Subtract ``qty`` from a resting order and from its price level."""
        if orig_seqno <= 0 or orig_seqno not in active_orders:
            return
        order = active_orders[orig_seqno]
        level = price_levels[order['side']]
        order['vol'] -= qty
        level[order['price']] -= qty
        if level[order['price']] <= 0:
            del level[order['price']]
        if order['vol'] <= 0:
            del active_orders[orig_seqno]

    for row in df.itertuples():
        # Stops at the first event past the cut-off; this relies on time_int
        # not decreasing along seqno order.
        if row.time_int > target_time_int:
            break
        processed_count += 1

        rtype = str(row.type).strip()
        rflag = str(row.trade_flag).strip()
        rside = str(row.side).strip()
        rprice = float(row.price)
        rvol = float(row.volume)
        rseqno = int(row.seqno)
        cbidno = int(row.cbidno)
        caskno = int(row.caskno)

        if rflag in new_order_flags:
            if rtype not in new_order_types:
                continue
            # An order without a valid side cannot be placed in the book.
            # Previously such a row raised KeyError (and 'U'/'1' orders were
            # priced as if they were sells); it is skipped instead.
            if rside not in price_levels:
                continue

            if rtype == '2':
                # Limit order: rests at its own price.
                final_price = rprice if rprice > 0 else None
            elif rtype == 'U':
                # Own-side best: buy at the best bid, sell at the best ask.
                final_price = get_best_bid() if rside == 'B' else get_best_ask()
            else:
                # Counterparty best / market order: an explicit price wins,
                # otherwise buy at the best ask and sell at the best bid.
                if rprice > 0:
                    final_price = rprice
                else:
                    final_price = get_best_ask() if rside == 'B' else get_best_bid()

            # Only orders with a resolved, positive price enter the book.
            if final_price is not None and final_price > 0:
                active_orders[rseqno] = {'price': final_price, 'vol': rvol, 'side': rside}
                price_levels[rside][final_price] += rvol

        elif rflag in ('F', '4'):
            # Trades ('F') and cancels ('4') both shrink the referenced orders.
            for orig_seqno in (cbidno, caskno):
                reduce_order(orig_seqno, rvol)

    print(f"Processed {processed_count} events, {len(active_orders)} active orders")

    # --- snapshot ----------------------------------------------------------
    def build_side(levels, best_first_ascending, label):
        """Return the 10 best levels of one side as a ranked DataFrame."""
        rows = [{'price': p, 'volume': v} for p, v in levels.items() if v > 0]
        if not rows:
            return pd.DataFrame(columns=['price', 'volume', 'level', 'side'])
        book = (
            pd.DataFrame(rows)
            .sort_values('price', ascending=best_first_ascending)
            .head(10)
            .reset_index(drop=True)
        )
        book['level'] = book.index + 1
        book['side'] = label
        return book

    bids = build_side(price_levels['B'], False, 'Bid')
    asks = build_side(price_levels['S'], True, 'Ask')

    return bids, asks

## 批量计算盘口价格，并与真实快照数据对比

In [None]:
# 全部深交所股票批量对比计算
import numpy as np

# Fixed parameters: the trading day and the intraday time to compare at
test_date = '20250930'
test_time = '11:30:00'

# 1. Collect every Shenzhen-listed stock code for that day
#    (Shenzhen stock codes start with 0, 2 or 3)
sql_codes = f'''
    SELECT DISTINCT code 
    FROM stock_base.zb 
    WHERE date = '{test_date}'
      AND (code LIKE '0%' OR code LIKE '2%' OR code LIKE '3%')
    ORDER BY code
'''
codes_frame = client.query_df(sql_codes)
all_codes = codes_frame['code'].tolist()
print(f"深交所共有 {len(all_codes)} 只股票")

# 2. Batch comparison: replay every stock and score it against the snapshot
all_results = []
total_correct = 0
total_fields = 0
skipped_no_zb = 0
skipped_no_tk = 0


def safe_get(series, key, default=0):
    """Return ``series[key]``, or ``default`` when it is missing, None or NaN."""
    val = series.get(key, default)
    if val is None or (isinstance(val, float) and np.isnan(val)):
        return default
    return val


def _level_value(book, level, column):
    """Return ``column`` of the rebuilt ``book`` at ``level``; 0 if that level is absent."""
    if len(book) == 0:
        return 0
    hit = book[book['level'] == level]
    return hit[column].values[0] if len(hit) > 0 else 0


for idx, code in enumerate(all_codes):
    if (idx + 1) % 100 == 0:
        print(f"进度: {idx + 1}/{len(all_codes)}")

    # Tick-by-tick (zb) events for this stock
    sql_zb = f'''
        SELECT * FROM stock_base.zb
        WHERE date = '{test_date}' AND code = '{code}'
    '''
    zb_data = client.query_df(sql_zb)

    if len(zb_data) == 0:
        skipped_no_zb += 1
        continue

    # Reference snapshot (tk) at the comparison time
    sql_tk = f'''
        SELECT 
            bid1, bidv1, ask1, askv1,
            bid2, bidv2, ask2, askv2,
            bid3, bidv3, ask3, askv3,
            bid4, bidv4, ask4, askv4,
            bid5, bidv5, ask5, askv5,
            bid6, bidv6, ask6, askv6,
            bid7, bidv7, ask7, askv7,
            bid8, bidv8, ask8, askv8,
            bid9, bidv9, ask9, askv9,
            bid10, bidv10, ask10, askv10
        FROM stock_base.tk 
        WHERE date = '{test_date}' AND code = '{code}' AND time_int = Tit('{test_time}')
        LIMIT 1
    '''
    df_tk = client.query_df(sql_tk)

    if len(df_tk) == 0:
        skipped_no_tk += 1
        continue

    # Replay only once a snapshot is known to exist, so stocks that are
    # skipped for a missing snapshot do not pay for the rebuild.
    df_bids, df_asks = reconstruct_order_book(zb_data, test_time)

    # Score 10 levels x (price, volume) x (bid, ask) = 40 fields per stock
    tk = df_tk.iloc[0]
    correct = 0
    fields = 0

    for i in range(1, 11):
        sides = (
            (df_bids, f'bid{i}', f'bidv{i}'),
            (df_asks, f'ask{i}', f'askv{i}'),
        )
        for book, price_key, vol_key in sides:
            real_price = safe_get(tk, price_key, 0)
            real_vol = safe_get(tk, vol_key, 0)
            calc_price = _level_value(book, i, 'price')
            calc_vol = _level_value(book, i, 'volume')

            # Prices match within 0.001, volumes within 1 unit.
            if abs(real_price - calc_price) < 0.001:
                correct += 1
            if abs(real_vol - calc_vol) < 1:
                correct += 1
            fields += 2

    accuracy = correct / fields * 100 if fields > 0 else 0

    all_results.append({
        '股票代码': code,
        '正确字段': correct,
        '总字段': fields,
        '正确率': accuracy
    })
    total_correct += correct
    total_fields += fields

# 3. Summarise the results
print(f"\n{'='*50}")
print("测试完成!")
print(f"深交所股票总数: {len(all_codes)}")
print(f"跳过(无zb数据): {skipped_no_zb}")
print(f"跳过(无tk数据): {skipped_no_tk}")
print(f"有效测试股票数: {len(all_results)}")

# Explicit columns keep the report working (all counts 0) even when no stock
# could be compared; without them the column lookups below raise KeyError.
df_summary = pd.DataFrame(all_results, columns=['股票代码', '正确字段', '总字段', '正确率'])
accuracy_col = df_summary['正确率'].astype(float)

# Accuracy distribution
print("\n正确率分布:")
print(f"  100%正确: {int((accuracy_col == 100).sum())} 只")
print(f"  90%-100%: {int(((accuracy_col >= 90) & (accuracy_col < 100)).sum())} 只")
print(f"  80%-90%: {int(((accuracy_col >= 80) & (accuracy_col < 90)).sum())} 只")
print(f"  <80%: {int((accuracy_col < 80).sum())} 只")

overall_accuracy = total_correct / total_fields * 100 if total_fields > 0 else 0
print(f"\n总体正确率: {total_correct}/{total_fields} = {overall_accuracy:.2f}%")

# The 10 stocks with the lowest accuracy
print("\n正确率最低的10只股票:")
# .copy() detaches the slice so the formatting below does not write into a
# view of df_summary (avoids SettingWithCopyWarning).
df_summary_sorted = df_summary.sort_values('正确率').head(10).copy()
df_summary_sorted['正确率'] = df_summary_sorted['正确率'].apply(lambda x: f"{x:.2f}%")
print(df_summary_sorted)

### 最终运行结果（2025-09-30；11:30:00 盘口）

运行结果：
==================================================
测试完成!
深交所股票总数: 2867
跳过(无zb数据): 0
跳过(无tk数据): 0
有效测试股票数: 2867

正确率分布:
  100%正确: 2861 只
  90%-100%: 0 只
  80%-90%: 0 只
  <80%: 6 只

总体正确率: 114570/114680 = 99.90%

正确率最低的10只股票:
        股票代码  正确字段  总字段      正确率
1840  300394    20   40   50.00%
1281  002829    20   40   50.00%
953   002466    20   40   50.00%
2152  300718    21   40   52.50%
1584  300115    21   40   52.50%
2618  301227    28   40   70.00%
2840  301602    40   40  100.00%
2841  301603    40   40  100.00%
2842  301606    40   40  100.00%
2843  301607    40   40  100.00%
