In [23]:
import os
import warnings
os.environ["TQDM_DISABLE"] = "1"
warnings.filterwarnings('ignore')

import pandas as pd
import ta
import joblib
import datetime
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier
import akshare as ak
import yfinance as yf
from openpyxl import load_workbook
from openpyxl.styles import PatternFill

import dash
from dash import dcc, html
from dash.dependencies import Input, Output, State
import plotly.express as px

# ===============================================================
# ⚙️ Holly AI A股推荐系统 (完整代码实现)
# ===============================================================

STOCK_LIST = [
    "002342.SZ", "603072.SH", "300277.SZ",
    "600865.SH", "002601.SZ", "300913.SZ",
    "600519.SH","000858.SZ","601318.SH","600036.SH","601166.SH",
    "600030.SH","601688.SH","300750.SZ","002594.SH","601012.SH",
    "688599.SH","000651.SH","000333.SH","600276.SH","300122.SH",
    "002475.SH","300015.SH","601888.SH","600104.SH","000725.SH","300059.SH"
]

FEATURES = ["RSI","MACD","MACD_Signal","MA20","MA50","ATR","Volume"]

stock_name_map = {}

def init_stock_names():
    global stock_name_map
    try:
        df = ak.stock_info_a_code_name()
        stock_name_map = dict(zip(df["code"], df["name"]))
    except Exception as e:
        print("⚠️ 股票名称加载失败:", e)
        stock_name_map = {}

def get_stock_name(code):
    base = code.split(".")[0]
    return stock_name_map.get(base, code)

init_stock_names()

# ---------------- 数据获取 ----------------
def get_data(ticker, days=30):
    end = datetime.datetime.today()
    start = (end - datetime.timedelta(days=days)).strftime("%Y%m%d")
    end_str = end.strftime("%Y%m%d")
    symbol = ticker.split(".")[0]
    df = ak.stock_zh_a_hist(symbol=symbol, period="daily", start_date=start, end_date=end_str, adjust="qfq")
    if df.empty:
        return pd.DataFrame()
    df.rename(columns={"日期": "Date", "开盘": "Open", "最高": "High","最低": "Low", "收盘": "Close", "成交量": "Volume"}, inplace=True)
    df["Date"] = pd.to_datetime(df["Date"])
    df.set_index("Date", inplace=True)
    return df

# ---------------- 技术指标 ----------------
def calculate_indicators(data):
    data = data.dropna(subset=["Close", "Volume"], how="any")
    data = data[data["Volume"] > 0]
    data["Pct_Change"] = data["Close"].pct_change() * 100
    data["Vol_Avg20"] = data["Volume"].rolling(20).mean()
    data["Relative_Volume"] = data["Volume"] / data["Vol_Avg20"]
    data["RSI"] = ta.momentum.RSIIndicator(data["Close"], window=14).rsi()
    macd = ta.trend.MACD(data["Close"])
    data["MACD"] = macd.macd()
    data["MACD_Signal"] = macd.macd_signal()
    data["MA20"] = data["Close"].rolling(20).mean()
    data["MA50"] = data["Close"].rolling(50).mean()
    data["MA200"] = data["Close"].rolling(200).mean()
    bb = ta.volatility.BollingerBands(data["Close"])
    data["Upper_BB"] = bb.bollinger_hband()
    data["Lower_BB"] = bb.bollinger_lband()
    atr = ta.volatility.AverageTrueRange(data["High"], data["Low"], data["Close"])
    data["ATR"] = atr.average_true_range()
    return data.dropna()

# ---------------- 策略信号 ----------------
def check_strategies(m, price):
    signals = []
    if m["RSI"] < 30: signals.append("RSI超卖")
    if m["RSI"] > 70: signals.append("RSI超买")
    if m["MACD"] > m["MACD_Signal"]: signals.append("MACD金叉")
    if m["MACD"] < m["MACD_Signal"]: signals.append("MACD死叉")
    if price > m["MA20"]: signals.append("价格站上MA20")
    if price < m["MA20"]: signals.append("价格跌破MA20")
    if m["Volume"] > m["Vol_Avg20"]*2: signals.append("放量突破")
    return signals

# ---------------- 回测 ----------------
def backtest_strategies(ticker, lookback=60):
    data = get_data(ticker)
    if data.empty: return pd.DataFrame()
    data = calculate_indicators(data)
    results = {}
    for i in range(1, len(data)-1):
        today = data.iloc[i]; tomorrow = data.iloc[i+1]
        signals = check_strategies(today, today["Close"])
        for sig in signals:
            if sig not in results: results[sig] = {"Signals": 0, "Wins": 0}
            results[sig]["Signals"] += 1
            if tomorrow["Close"] > today["Close"]: results[sig]["Wins"] += 1
    stats = []
    for sig, val in results.items():
        if val["Signals"] > 0:
            win_rate = val["Wins"]/val["Signals"]*100
            stats.append({"策略": sig, "胜率%": round(win_rate, 2)})
    return pd.DataFrame(stats).sort_values(by="胜率%", ascending=False)

# ---------------- 资金流 / 相对强弱 / 分时 / 多周期确认 ----------------
def capital_flow(ticker):
    data = yf.download(ticker, period="5d", interval="1d", auto_adjust=True)
    if len(data) < 2: return 0
    return round((data["Close"].iloc[-1]-data["Close"].iloc[-2])*data["Volume"].iloc[-1]/1e6,2)

def relative_strength(ticker):
    spy = yf.download("SPY", period="6mo", interval="1d", auto_adjust=True)["Close"]
    stock = yf.download(ticker, period="6mo", interval="1d", auto_adjust=True)["Close"]
    if stock.empty or spy.empty: return 0
    return round((stock.pct_change().mean()-spy.pct_change().mean())*100,2)

def intraday_trend(ticker):
    data = yf.download(ticker, period="5d", interval="15m", auto_adjust=True)["Close"]
    if len(data) < 2: return 0
    return round((data.iloc[-1]/data.iloc[0]-1)*100,2)

def multi_timeframe_confirm(ticker):
    daily = yf.download(ticker, period="6mo", interval="1d", auto_adjust=True)["Close"]
    hourly = yf.download(ticker, period="1mo", interval="1h", auto_adjust=True)["Close"]
    m15 = yf.download(ticker, period="5d", interval="15m", auto_adjust=True)["Close"]
    if daily.empty or hourly.empty or m15.empty:
        return False
    return (daily.iloc[-1] > daily.mean()) and (hourly.iloc[-1] > hourly.mean()) and (m15.iloc[-1] > m15.mean())

# ---------------- 自动学习模块 ----------------
def train_auto_model(ticker_list):
    features, labels = [], []
    for ticker in ticker_list:
        data = get_data(ticker)
        if data.empty: continue
        data = calculate_indicators(data)
        for i in range(1, len(data)-1):
            today, tomorrow = data.iloc[i], data.iloc[i+1]
            sigs = check_strategies(today, today["Close"])
            if not sigs: continue
            feat = [today[f] for f in FEATURES]
            features.append(feat)
            labels.append(1 if tomorrow["Close"] > today["Close"] else 0)
    if not features: return None
    X_train, X_test, y_train, y_test = train_test_split(features, labels, test_size=0.2, shuffle=False)
    model = RandomForestClassifier(n_estimators=200, random_state=42)
    model.fit(X_train, y_train)
    print("✅ 模型训练完成，准确率:", model.score(X_test, y_test))
    joblib.dump(model, "auto_model.pkl")
    return model

def load_auto_model():
    try:
        model = joblib.load("auto_model.pkl")
        if hasattr(model, "n_features_in_") and model.n_features_in_ != len(FEATURES):
            print("⚠️ 特征数不一致，重新训练模型...")
            return train_auto_model(STOCK_LIST)
        return model
    except:
        return None

# ---------------- 严格筛选模块 ----------------
def holly_ai_today(stock_list, lookback=120, winrate_threshold=55, top_n=3):
    final_results = []
    for ticker in stock_list:
        # 使用 yfinance 下载数据
        data = yf.download(ticker, period="1y", interval="1d", auto_adjust=True)

        # ⚠️ 保证列名和 akshare 格式一致
        if "Adj Close" in data.columns:
            data.rename(columns={"Adj Close": "Close"}, inplace=True)

        if len(data) < lookback:
            continue

        data = calculate_indicators(data)
        if data.empty:
            continue

        today = data.iloc[-1]
        m = today.to_dict()
        price = float(today["Close"])

        # 策略回测
        stats = backtest_strategies(ticker)
        if stats.empty:
            continue
        best = stats[stats["胜率%"] >= winrate_threshold]

        for _, row in best.iterrows():
            sig = row["策略"]
            win = row["胜率%"]
            if sig in check_strategies(m, price):
                stop = round(price * 0.97, 2)
                target = round(price * 1.05, 2)
                rr = round((target - price) / (price - stop), 2) if (price - stop) > 0 else None
                final_results.append({
                    "股票": ticker,
                    "日期": today.name.strftime("%Y-%m-%d"),
                    "价格": round(price, 2),
                    "触发信号": sig,
                    "历史胜率%": win,
                    "盈亏比(R/R)": rr,
                    "资金流大单": capital_flow(ticker),
                    "相对强弱%": relative_strength(ticker),
                    "分时趋势%": intraday_trend(ticker),
                    "多周期确认": multi_timeframe_confirm(ticker),
                    "建议": f"买区间:{round(price * 0.98, 2)}-{round(price * 1.02, 2)} | 卖目标:{target} | 止损:{stop} | R/R={rr}"
                })

    if not final_results:
        print("⚠️ 没有符合条件的股票")
        return pd.DataFrame()

    df = pd.DataFrame(final_results).sort_values(by=["历史胜率%", "盈亏比(R/R)"], ascending=False)
    df_top = df.head(top_n)

    # 保存到 Excel，并加上颜色标记
    file_name = "holly_ai_today.xlsx"
    df_top.to_excel(file_name, index=False)
    wb = load_workbook(file_name)
    ws = wb.active
    for row in range(2, ws.max_row + 1):
        win = ws[f"F{row}"].value
        rr = ws[f"G{row}"].value
        if win >= 70 and rr >= 2:
            fill = PatternFill(start_color="90EE90", end_color="90EE90", fill_type="solid")
        elif win >= 55:
            fill = PatternFill(start_color="FFFF99", end_color="FFFF99", fill_type="solid")
        else:
            fill = PatternFill(start_color="FF9999", end_color="FF9999", fill_type="solid")
        ws[f"F{row}"].fill = fill
        ws[f"G{row}"].fill = fill
    wb.save(file_name)
    print(f"✅ 已保存到 {file_name}（带颜色标记）")

    return df_top



# ---------------- Dash 应用 ----------------
app = dash.Dash(__name__)

app.layout = html.Div([
    html.H1("📈 Holly AI A股推荐系统", style={"textAlign":"center"}),
    html.Div([
        html.Label("历史天数 (天)"),
        dcc.Slider(id="days", min=10, max=180, step=10, value=30,
                   marks={i: str(i) for i in range(10, 181, 30)}),
        html.Label("胜率阈值 (%)"),
        dcc.Slider(id="winrate-threshold", min=0, max=100, step=5, value=50,
                   marks={i: str(i) for i in range(0, 101, 20)}),
        html.Label("止损比例"), dcc.Input(id="stop-loss", type="number", value=0.97, step=0.01),
        html.Label("止盈比例"), dcc.Input(id="take-profit", type="number", value=1.05, step=0.01),
        html.Label("推荐数量"), dcc.Input(id="top-n", type="number", value=10, step=1)
    ], style={"margin":"20px"}),
    html.Button("🔄 刷新数据", id="refresh-button", n_clicks=0,
                style={"margin":"10px", "padding":"10px", "fontSize":"16px"}),
    dcc.Loading(id="loading", type="circle", children=[
        html.Div(id="status-text", style={"margin":"20px", "textAlign":"center", "fontWeight":"bold"})
    ]),
    dcc.Graph(id="bar-chart"),
    dcc.Graph(id="scatter-chart"),
    html.Div(id="table-section"),
    html.Div(id="debug-section")
])

@app.callback(
    [Output("status-text", "children"),
     Output("bar-chart", "figure"),
     Output("scatter-chart", "figure"),
     Output("table-section", "children"),
     Output("debug-section", "children")],
    [Input("refresh-button", "n_clicks")],
    [State("days", "value"), State("winrate-threshold", "value"),
     State("stop-loss", "value"), State("take-profit", "value"),
     State("top-n", "value")]
)
def update_output(n_clicks, days, winrate_threshold, stop_loss, take_profit, top_n):
    if n_clicks == 0:
        return ("💡 提示：请调整筛选条件并点击刷新按钮来获取推荐结果。", {}, {}, "", "")
    df = holly_ai_today(STOCK_LIST, lookback=days, winrate_threshold=winrate_threshold, top_n=top_n)
    if df.empty:
        return ("⚠️ 暂无符合条件的股票推荐", {}, {}, "", "")
    fig1 = px.bar(df, x="股票", y="历史胜率%", color="盈亏比(R/R)", title="📊 Holly AI A股推荐 - AI预测概率分布")
    fig2 = px.scatter(df, x="历史胜率%", y="盈亏比(R/R)", color="股票", title="📈 策略胜率 vs AI预测概率分布图")
    table_section = html.Div([
        html.H3("📋 推荐详情表"),
        html.Table([
            html.Thead(html.Tr([html.Th(col) for col in df.columns])),
            html.Tbody([
                html.Tr([html.Td(df.iloc[i][col]) for col in df.columns]) for i in range(len(df))
            ])
        ], style={"width":"100%","border":"1px solid #ddd","borderCollapse":"collapse"})
    ])
    debug_section = html.Div([html.H3("🐞 调试信息"), html.Pre(str(df.head()))])
    return (f"✅ 数据刷新完成，共 {len(df)} 条推荐结果", fig1, fig2, table_section, debug_section)

if __name__ == "__main__":
    app.run(debug=True, port=8052)


[*********************100%***********************]  1 of 1 completed
[2025-09-07 12:07:50,769] ERROR in app: Exception on /_dash-update-component [POST]
Traceback (most recent call last):
  File "C:\Users\Long\AppData\Local\Programs\Python\Python313\Lib\site-packages\flask\app.py", line 917, in full_dispatch_request
    rv = self.dispatch_request()
  File "C:\Users\Long\AppData\Local\Programs\Python\Python313\Lib\site-packages\flask\app.py", line 902, in dispatch_request
    return self.ensure_sync(self.view_functions[rule.endpoint])(**view_args)  # type: ignore[no-any-return]
           ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~^^^^^^^^^^^^^
  File "C:\Users\Long\AppData\Local\Programs\Python\Python313\Lib\site-packages\dash\dash.py", line 1494, in dispatch
    response_data = ctx.run(partial_func)
  File "C:\Users\Long\AppData\Local\Programs\Python\Python313\Lib\site-packages\dash\_callback.py", line 688, in add_context
    raise err
  File "C:\Users\Long\AppData\Local\Prog