In [None]:
data_zt = query_iwencai("2023年1月3日至2023年12月31日，连续涨停天数>4")
data_zt.to_excel('zt.xlsx', sheet_name='Sheet1')

In [None]:
import pandas as pd
import tushare as ts
from datetime import datetime, timedelta
import logging

# 日志
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s: %(message)s',
    filename='stock_analysis.log'
)

ts.set_token('2876ea85cb005fb5fa17c809a98174f2d5aae8b1f830110a5ead6211')
pro = ts.pro_api()

def get_stock_data(ts_code, start_date='20221225', end_date='20231231'):
    """数据获取函数"""
    try:
        # 添加复权参数
        df = pro.daily(ts_code=ts_code, start_date=start_date, end_date=end_date, adj='qfq')
        if df.empty:
            logging.warning(f"{ts_code} 无交易数据")
            return None
            
        df['trade_date'] = pd.to_datetime(df['trade_date'], format='%Y%m%d')
        df = df.sort_values('trade_date').set_index('trade_date')
        
        # 填充非交易日
        all_days = pd.date_range(start=start_date, end=end_date)
        return df.reindex(all_days).ffill().dropna()
        
    except Exception as e:
        logging.error(f"获取{ts_code}数据失败: {str(e)}")
        return None

def find_consecutive_limit(df, limit_days):
    """识别连板"""
    limit_dates = df[df['pct_chg'] >= 9.8].index
    sequences = []
    current = []
    
    for date in sorted(limit_dates):
        if not current:
            current.append(date)
        else:
            last_date = current[-1]
            expected_date = last_date + timedelta(days=1)
            
            # 处理非交易日
            while expected_date < date:
                if expected_date in df.index:
                    break  # 存在非涨停交易日
                expected_date += timedelta(days=1)
                
            if date == expected_date:
                current.append(date)
            else:
                if len(current) >= limit_days:
                    sequences.append(current)
                current = [date]
    
    if len(current) >= limit_days:
        sequences.append(current)
    
    return sequences

def calculate_indicators(stock_df, required_days):
    """指标计算"""
    try:
        sequences = find_consecutive_limit(stock_df, required_days)
        if not sequences:
            return None
            
        target_seq = sequences[0]
        break_date = target_seq[-1] + timedelta(days=1)
        
        # 寻找断板日
        while break_date not in stock_df.index:
            break_date += timedelta(days=1)
            if break_date > stock_df.index[-1]:
                return None
                
        pre_data = stock_df.loc[:break_date]
        post_data = stock_df.loc[break_date:]
        
        # 关键价格计算
        pre_high = pre_data['high'].max()
        post_high = post_data['high'].max()
        
        min_price = post_data['low'].min()
        min_date = post_data['low'].idxmin()
        
        # 反弹计算
        rebound_data = post_data.loc[min_date:]
        if rebound_data.empty:
            return None
            
        rebound_high = rebound_data['high'].max()
        rebound_date = rebound_data['high'].idxmax()
        
        return {
            '断板日期': break_date.strftime('%Y-%m-%d'),
            '连板天数': len(target_seq),
            '断板前最高价': round(pre_high, 2),
            '断板后最高价': round(post_high, 2),
            '回调最低价日期': min_date.strftime('%Y-%m-%d'),
            '回调最低价': round(min_price, 2),
            '回调幅度': f"{round((min_price/post_data['close'].iloc[0]-1)*100, 2)}%",
            '首次反弹高价': round(rebound_high, 2),
            '反弹幅度': f"{round((rebound_high/min_price-1)*100, 2)}%",
            '反弹用时': (rebound_date - min_date).days
        }
        
    except Exception as e:
        logging.error(f"计算指标失败: {str(e)}")
        return None

if __name__ == "__main__":
    try:
        input_df = pd.read_excel(r"C:\Users\admin\Desktop\zt.xlsx")
        success_codes = []
        fail_codes = []
        
        output_data = []
        for idx, row in input_df.iterrows():
            raw_code = str(row['股票代码']).split('.')[0]
            market = 'SH' if 'SH' in row['股票代码'] else 'SZ'
            ts_code = f"{raw_code}.{market}"
            
            stock_data = get_stock_data(ts_code)
            if stock_data is None:
                fail_codes.append(ts_code)
                continue
                
            result = calculate_indicators(stock_data, row['连续涨停天数'])
            if result:
                output_data.append({
                    '股票代码': row['股票代码'],
                    '股票简称': row['股票简称'],
                    **result
                })
                success_codes.append(ts_code)
            else:
                fail_codes.append(ts_code)
        
        result_df = pd.DataFrame(output_data)
        result_df.to_excel('stock_break_analysis.xlsx', index=False)
        
        with open('process_summary.txt', 'w') as f:
            f.write(f"成功处理: {len(success_codes)} 只股票\n")
            f.write(f"失败股票: {', '.join(fail_codes)}\n")
            
        print(f"分析完成！成功处理 {len(success_codes)} 只，失败 {len(fail_codes)} 只")
        
    except Exception as e:
        logging.critical(f"主流程异常: {str(e)}")
