In [None]:
#!/usr/bin/env python3
# install_submodules.py

import os
import subprocess
import sys
from pathlib import Path

def install_submodule(path):
    """安装单个子模块"""
    try:
        print(f"正在安装: {path}")
        result = subprocess.run([
            sys.executable, "-m", "pip", "install", "-e", str(path)
        ], capture_output=True, text=True, check=True)
        
        print(f"✅ 成功安装: {path}")
        return True
    except subprocess.CalledProcessError as e:
        print(f"❌ 安装失败: {path}")
        print(f"错误信息: {e.stderr}")
        return False

def find_submodules(root_dir="../"):
    """查找所有子模块目录"""
    submodules = []
    root_path = Path(root_dir)
    
    # 查找所有包含 setup.py 或 pyproject.toml 的子目录
    for item in root_path.iterdir():
        if item.is_dir() and not item.name.startswith('.'):
            has_setup_py = (item / "setup.py").exists()
            has_pyproject = (item / "pyproject.toml").exists()
            if has_setup_py or has_pyproject:
                submodules.append(item)
    
    return submodules

def main():
    """主函数"""
    print("🔍 搜索子模块...")
    submodules = find_submodules()
    
    if not submodules:
        print("❌ 未找到任何子模块")
        return
    
    print(f"📦 找到 {len(submodules)} 个子模块:")
    for module in submodules:
        print(f"  - {module.name}")
    
    print("\n🚀 开始安装...")
    success_count = 0
    
    for module in submodules:
        if install_submodule(module):
            success_count += 1
        print("-" * 50)
    
    print(f"\n📊 安装完成: {success_count}/{len(submodules)} 成功")

main()    


In [1]:
import mysql.connector
import pandas as pd
from collections import defaultdict

def create_connection():
    try:
        connection = mysql.connector.connect(
            host='localhost',
            port=3306,
            user='root',
            password='123456',
            database='ASTOCK'
        )
        return connection
    except mysql.connector.Error as e:
        print(f"连接MySQL失败: {e}")
        return None

In [2]:
def get_min_market_value(year, month) -> list:
    # 获取市值数据
    market_values = get_market_values(year, month)

    # 按市值排序，选择前N个标的
    top_n = 10
    sorted_symbols = sorted(market_values, key=lambda x: x["market_value"])[:top_n]
    return sorted_symbols

def get_market_values(year, month) -> list:
    connection = create_connection()
    if not connection:
        return []
    
    finance_report_months = ['12', '03', '06', '09']
    year_of_interest = year - 1 if month < 4 else year
    report_month_of_interest = str(year_of_interest) + "-" + finance_report_months[(month - 1) // 3]

    cursor = connection.cursor()
    sql = f'''SELECT 
            a.symbol, a.name, a.update_time, (a.total_shares * b.close_price) AS market_value
            FROM finance a
            JOIN (
                SELECT 
                    symbol,
                    datetime,
                    close_price,
                    ROW_NUMBER() OVER (
                        PARTITION BY symbol, YEAR(datetime), MONTH(datetime) 
                        ORDER BY datetime DESC
                    ) as rn
                FROM daily
                WHERE exchange = 'SZSE'
                    AND datetime like '{report_month_of_interest}-%'
            ) b 
            ON a.symbol = b.symbol 
                AND YEAR(a.update_time) = YEAR(b.datetime)
                AND MONTH(a.update_time) = MONTH(b.datetime)
                AND b.rn = 1
            WHERE a.market = '深市'
                AND a.update_time like '{report_month_of_interest}-%'
                AND a.symbol like '00%'
                AND a.name not like '%ST%'
                AND a.name not like '%退'
            ;'''
    # print(f"SQL查询语句: {sql}")
    cursor.execute(sql)
    rows = cursor.fetchall()
    cursor.close()
    connection.close()

    # 将查询结果转换为字典列表
    values = [{"symbol": row[0], "name": row[1], "market_value": row[3]} for row in rows]
    return values

def convert_list_to_df(list_data)->pd.DataFrame:
    results = defaultdict(list)
    for data in list_data:
        for key, value in data.__dict__.items():
            results[key].append(value)
    df = pd.DataFrame.from_dict(results).set_index('datetime')
    return df



In [3]:
from vnpy.trader.optimize import OptimizationSetting
from vnpy_ctastrategy.backtesting import BacktestingEngine
from datetime import datetime
import sys
import os
import copy


parent_dir = os.path.abspath(os.path.join(os.path.curdir, ".."))
sys.path.append(parent_dir)
from chart_tools import data
from strategies.monthly_min_market_value_strategy import MonthlyMinMarketValueStrategy

goods_symbols = []

# start_day = datetime(2024, 4, 1)
start_day = datetime(2025, 1, 1)
end_day = datetime(2025, 1, 31)

month_first_list = []
# current_date = start_day.replace(day=1)  # 确保从每月的1日开始
current_date = start_day

while current_date <= end_day:
    month_first_list.append(current_date)

    if current_date.month == 12:
        current_date = current_date.replace(year=current_date.year + 1, month=1, day=1)
    else:
        current_date = current_date.replace(month=current_date.month + 1, day=1)

print(month_first_list)

capital = 1000000
total_profits = list()

record_df_dicts = list()
record_candle_dicts = list()
record_engines = list()
for i in range(len(month_first_list)):
    if i > 0:
        capital += total_profits[i - 1] / 10  # add profit
    record_df_dicts.append({})
    record_candle_dicts.append({})
    start = month_first_list[i]
    end = month_first_list[i + 1] if i + 1 < len(month_first_list) else end_day
    end = end.replace(month=end.month + 1, day=9) # for sell
    print(f"开始日期: {start}")
    
    min_market_symbols = get_min_market_value(start.year, start.month)

    # sort by market value
    symbols_candidates = min_market_symbols[:2]
    month_profits = list() 
    month_profit = 0
    print(f"候选标的：{symbols_candidates}")
    for symbol in symbols_candidates:
        engine = BacktestingEngine()
        vt_symbol = symbol["symbol"] + ".SZSE"
        engine.set_parameters(
            vt_symbol=vt_symbol,
            interval="d",
            start=start,
            end=end,
            rate=0.3/10000,
            slippage=0.2,
            size=100,
            pricetick=0.2,
            capital=capital,
        )
        engine.add_strategy(MonthlyMinMarketValueStrategy, {"initial_capital": capital, "current_month": start.month})

        engine.load_data()
        engine.run_backtesting()
        df = engine.calculate_result()
        
        record_df_dicts[i][symbol["symbol"]] = df
        record_candle_dicts[i][symbol["symbol"]] = engine.history_data
        res = engine.calculate_statistics()
        engine.cross_stop_order
        print(f"策略回测成功：{symbol['symbol']}，总净利润：{res['total_net_pnl']}")
        month_profits.append(res["total_net_pnl"])
        record_engines.append(copy.deepcopy(engine))
        
        data.update_tech_data(vt_symbol, 'daily_df', df)
        data.update_bar_data(vt_symbol, convert_list_to_df(engine.history_data))
        data.update_trade_data(vt_symbol, convert_list_to_df(engine.get_all_trades()))
        if res["total_net_pnl"] > 0:
            goods_symbols.append({"symbol": symbol["symbol"], "total_net_pnl": res["total_net_pnl"]})
    month_profit = sum(month_profits)
    print(f"{start.year}-{start.month}月总净利润：{month_profit}")
    total_profits.append(month_profit)

print(f"总净利润：{sum(total_profits)}")

[datetime.datetime(2025, 1, 1, 0, 0)]
开始日期: 2025-01-01 00:00:00
候选标的：[{'symbol': '002856', 'name': '美芝股份', 'market_value': 1247584016.0}, {'symbol': '002207', 'name': '准油股份', 'market_value': 1291933003.6799998}]
2025-07-23 20:51:41.216057	开始加载历史数据
正在使用mysql数据库
2025-07-23 20:51:41.259686	历史数据加载完成，数据量：21
2025-07-23 20:51:41.261861	策略初始化完成
2025-07-23 20:51:41.261877	开始回放历史数据
日期: 2025-01-02 00:00:00+08:00, 当前资金: 1000000, 交易单位: 100, 目标仓位: 901.0, 目标价格: 11.09
订单状态更新: Status.ALLTRADED, 价格: 12.0, 数量: 901.0
日期: 2025-02-05 00:00:00+08:00, 交易单位: 901.0, 目标仓位: 0, 目标价格: 11.0
订单状态更新: Status.ALLTRADED, 价格: 10.0, 数量: 901.0
2025-07-23 20:51:41.262314	历史数据回放结束
2025-07-23 20:51:41.262320	开始计算逐日盯市盈亏
2025-07-23 20:51:41.279437	逐日盯市盈亏计算完成
2025-07-23 20:51:41.279476	开始计算策略统计指标
2025-07-23 20:51:41.289981	------------------------------
2025-07-23 20:51:41.290011	首个交易日：	2025-01-02
2025-07-23 20:51:41.290019	最后交易日：	2025-02-07
2025-07-23 20:51:41.290026	总交易日：	21
2025-07-23 20:51:41.290032	盈利交易日：	7
2025-07-23 20:51:

In [4]:
from chart_tools import app
app.run_server(debug=True, port=8882)

hover_data None
hover_data None
hover_data None
hover_data None
hover_data None
hover_data {'points': [{'curveNumber': 1, 'pointNumber': 12, 'pointIndex': 12, 'x': '2025-01-20', 'y': 69777, 'label': '2025-01-20', 'value': 69777, 'marker.color': 'red', 'bbox': {'x0': 386.06, 'x1': 395.94, 'y0': 690.4749999999999, 'y1': 690.4749999999999}}]}
hover_data {'points': [{'curveNumber': 1, 'pointNumber': 13, 'pointIndex': 13, 'x': '2025-01-21', 'y': 46854, 'label': '2025-01-21', 'value': 46854, 'marker.color': 'green', 'bbox': {'x0': 398.41, 'x1': 408.29, 'y0': 711.015, 'y1': 711.015}}]}
hover_data {'points': [{'curveNumber': 1, 'pointNumber': 14, 'pointIndex': 14, 'x': '2025-01-22', 'y': 39330, 'label': '2025-01-22', 'value': 39330, 'marker.color': 'green', 'bbox': {'x0': 410.76, 'x1': 420.65, 'y0': 717.755, 'y1': 717.755}}]}
hover_data {'points': [{'curveNumber': 1, 'pointNumber': 15, 'pointIndex': 15, 'x': '2025-01-23', 'y': 45901, 'label': '2025-01-23', 'value': 45901, 'marker.color': 'gree

In [5]:

from vnpy_ctabacktester.ui.widget import CandleChartDialog
from vnpy.trader.object import TradeData, BarData
from vnpy_ctastrategy.backtesting import (
    BacktestingEngine,
    OptimizationSetting,
    BacktestingMode
)
import sys
from vnpy.trader.ui import QtCore, QtWidgets, QtGui, create_qapp
def show_candle_chart(engine_ins: BacktestingEngine) -> None:
    """"""
    try:
        app = QtWidgets.QApplication.instance()
        if app is None:
            app = create_qapp('show_candle_chart')
        candle_dialog: CandleChartDialog = CandleChartDialog()
        if not candle_dialog.is_updated():
            history: list = engine_ins.history_data
            candle_dialog.update_history(history)

            trades: list[TradeData] = engine_ins.get_all_trades()
            candle_dialog.update_trades(trades)

        candle_dialog.exec_()
    except Exception as e:
        print(f"显示K线图表失败: {e}")
        return
    
import plotly.graph_objects as go
from plotly.subplots import make_subplots
from pandas import DataFrame, Series


def show_chart(df: DataFrame, candle_df: DataFrame) -> go.Figure:
    """显示回测结果图表"""

    fig = make_subplots(
        rows=6,
        cols=1,
        shared_xaxes=True,
        vertical_spacing=0.06
    )

    balance_line = go.Scatter(
        x=df.index,
        y=df["balance"],
        mode="lines",
        name="Balance"
    )

    drawdown_scatter = go.Scatter(
        x=df.index,
        y=df["drawdown"],
        fillcolor="red",
        fill='tozeroy',
        mode="lines",
        name="Drawdown"
    )
    pnl_bar = go.Bar(y=df["net_pnl"], x=df.index, name="Daily Pnl")

    fig.add_trace(balance_line, row=1, col=1)
    fig.add_trace(drawdown_scatter, row=2, col=1)
    fig.add_trace(pnl_bar, row=3, col=1)
    fig.add_trace(go.Candlestick(
        x=candle_df.index,
        open=candle_df['open_price'],
        high=candle_df['high_price'],
        low=candle_df['low_price'],
        close=candle_df['close_price'],
        name="OHLC"
    ), row=4, col=1)
    
    colors = ['green' if close < open else 'red' 
              for close, open in zip(candle_df['close_price'], candle_df['open_price'])]
    
    fig.add_trace(
        go.Bar(
            x=candle_df.index,
            y=candle_df['volume'],
            name="成交量",
            marker_color=colors,
            opacity=0.7
        ),
        row=5, col=1
    )

    fig.update_xaxes(title_text="日期", row=5, col=1)
    fig.update_yaxes(title_text="balance", row=1, col=1)
    fig.update_yaxes(title_text="drawdown", row=2, col=1)
    fig.update_yaxes(title_text="Daily Pnl", row=3, col=1)
    fig.update_yaxes(title_text="价格", row=4, col=1)
    fig.update_yaxes(title_text="成交量", row=5, col=1)

    fig.update_layout(height=800, 
                    width=800, 
                    xaxis4_rangeslider_visible = False,
                    xaxis5_rangeslider=dict(
                        visible=True,
                        thickness=0.01,  # 减小滑块厚度
                        bgcolor="lightgray"
                    )
)
    return fig

In [None]:
# len(record_engines)
# record_engines[0].history_data
# candle_dialog: CandleChartDialog = CandleChartDialog()
show_candle_chart(record_engines[0])


In [None]:
from collections import defaultdict

df = list(record_df_dicts[0].values())[0]

results = defaultdict(list)
test = list(record_candle_dicts[0].values())[0]
for daily_bar in test:
    for key, value in daily_bar.__dict__.items():
        results[key].append(value)
    # results['date'] = daily_bar.datetime.strftime("%Y-%m-%d %H:%M:%S")
candle_df = DataFrame.from_dict(results).set_index('datetime')
# print(type(test))
show_chart(df, candle_df)

In [None]:
setting = OptimizationSetting()
setting.set_target("sharpe_ratio")
setting.add_parameter("atr_length", 25, 27, 1)
setting.add_parameter("atr_ma_length", 10, 30, 10)

engine.run_ga_optimization(setting)

In [None]:
engine.run_bf_optimization(setting)