In [1]:
import os
import numpy as np
import pandas as pd
import shutil
import baostock as bs
import talib
import tushare as ts

## 策略

In [2]:
class DataLoader:
    """Loads the two CSV inputs of the notebook: daily quotes and HS300 constituents.

    Both paths are stored as given; nothing is read until a ``load_*`` method
    is called.
    """

    def __init__(self, stock_data_path, hs300_constituents_path):
        self.stock_data_path = stock_data_path
        self.hs300_constituents_path = hs300_constituents_path

    def load_hs300_constituents(self):
        """Read the HS300 constituents CSV and parse its 'updateDate' column.

        Returns:
            DataFrame with 'updateDate' converted to datetime.

        Raises:
            FileNotFoundError: if the configured path does not exist.
        """
        path = self.hs300_constituents_path
        if not os.path.exists(path):
            raise FileNotFoundError(f"HS300 constituents file not found: {path}")

        constituents = pd.read_csv(path)
        constituents['updateDate'] = pd.to_datetime(constituents['updateDate'])
        return constituents

    def load_stock_data(self):
        """Read the stock price CSV, parse 'date', and order rows by code then date.

        Returns:
            DataFrame sorted by ['code', 'date'] with 'date' as datetime.

        Raises:
            FileNotFoundError: if the configured path does not exist.
        """
        path = self.stock_data_path
        if not os.path.exists(path):
            raise FileNotFoundError(f"Stock data file not found: {path}")

        quotes = pd.read_csv(path)
        quotes['date'] = pd.to_datetime(quotes['date'])
        return quotes.sort_values(['code', 'date'])

In [3]:
class TechnicalAnalysis:
    """Technical indicators computed per stock on a long-format quote table.

    Every method expects a DataFrame with a 'code' column identifying the
    stock. Rows must already be in chronological order within each code;
    the methods do not sort the data themselves.
    """

    @staticmethod
    def calculate_ma(df, window=20):
        """Return the simple moving average of 'close' for each stock.

        Args:
            df: DataFrame with 'code' and 'close' columns.
            window: Number of rows in the rolling window.

        Returns:
            Series aligned to ``df.index``. Rows where the window is not yet
            full are NaN.
        """
        return df.groupby('code')['close'].transform(
            lambda prices: prices.rolling(window=window).mean()
        )

    @staticmethod
    def calculate_kdj(df, n=9, m1=3, m2=3):
        """Return a copy of ``df`` with 'kdj_k', 'kdj_d' and 'kdj_j' columns added.

        For each stock, RSV is the position of the close inside the rolling
        ``n``-row high/low range (0-100). K smooths RSV and D smooths K, both
        seeded at 50 on the first row; J is ``3 * K - 2 * D``.

        When RSV is undefined (the rolling high equals the rolling low, or an
        input price is NaN) K carries its previous value forward instead of
        becoming NaN, so one flat window cannot invalidate the rest of the
        series.

        Args:
            df: DataFrame with 'code', 'high', 'low' and 'close' columns.
            n: RSV look-back period in rows.
            m1: Smoothing period for K.
            m2: Smoothing period for D.

        Returns:
            New DataFrame; the input is not modified. The three KDJ columns
            exist even when ``df`` has no rows.
        """
        df = df.copy()

        # Make sure the output columns exist even if the loop below never runs.
        for column in ('kdj_k', 'kdj_d', 'kdj_j'):
            if column not in df.columns:
                df[column] = np.nan

        for code in df['code'].unique():
            mask = df['code'] == code
            df_stock = df[mask].reset_index(drop=True)

            lowest = df_stock['low'].rolling(window=n, min_periods=1).min()
            highest = df_stock['high'].rolling(window=n, min_periods=1).max()
            price_range = highest - lowest
            # A zero range would give 0/0 or x/0; mark it as undefined instead.
            rsv = (
                (df_stock['close'] - lowest) / price_range.where(price_range != 0) * 100
            ).to_numpy(dtype=float)

            size = len(df_stock)
            k = np.zeros(size)
            d = np.zeros(size)

            for i in range(size):
                if i == 0:
                    # Conventional seed: the first row's RSV is not used.
                    k[i] = 50
                    d[i] = 50
                    continue
                if np.isnan(rsv[i]):
                    # No usable RSV today: hold K at its previous level.
                    k[i] = k[i - 1]
                else:
                    k[i] = (m1 - 1) * k[i - 1] / m1 + rsv[i] / m1
                d[i] = (m2 - 1) * d[i - 1] / m2 + k[i] / m2

            j = 3 * k - 2 * d

            # The mask keeps row order, so positional arrays line up with df.
            df.loc[mask, 'kdj_k'] = k
            df.loc[mask, 'kdj_d'] = d
            df.loc[mask, 'kdj_j'] = j

        return df

In [4]:
class TradingStrategy:
    """KDJ-J zero-cross strategy evaluated per stock.

    D1 is a day on which J crosses below zero while the close is above MA20;
    D2 is the first later day on which J crosses back to zero or above. The
    strategy reports the close-to-close return between D1 and D2.
    """

    # Column layout of the frame returned by find_trading_signals.
    _RESULT_COLUMNS = [
        'code', 'D1日期', 'D2日期', 'D1收盘价', 'D2收盘价', 'D1-D2收益率',
        'D1_5日均线', 'D1_20日均线', 'D1_60日均线', 'D1_J值', 'D2_J值', '持仓天数',
    ]

    def __init__(self):
        self.ta = TechnicalAnalysis()

    def prepare_data(self, df):
        """Return a copy of ``df`` with 'ma5', 'ma20', 'ma60' and KDJ columns added.

        Args:
            df: Quote table with 'code', 'high', 'low' and 'close' columns,
                ordered chronologically within each code.
        """
        df = df.copy()
        df['ma5'] = self.ta.calculate_ma(df, window=5)
        df['ma20'] = self.ta.calculate_ma(df, window=20)
        df['ma60'] = self.ta.calculate_ma(df, window=60)
        df = self.ta.calculate_kdj(df)
        return df

    def find_trading_signals(self, df):
        """Pair every D1 entry with its D2 exit and compute the period return.

        Rules:
            1. D1: close is above MA20 and J turns negative
               (J < 0 today, previous J >= 0 for the same stock).
            2. D2: the first row of the same stock dated after D1 on which
               J turns non-negative (J >= 0 today, previous J < 0).

        D1 signals with no later D2 in the data are dropped.

        Args:
            df: Output of ``prepare_data``; needs 'code', 'date', 'close',
                'ma5', 'ma20', 'ma60' and 'kdj_j'.

        Returns:
            DataFrame with one row per D1/D2 pair. When nothing matches, the
            frame is empty but still carries the result columns.
        """
        df = df.copy()

        df['above_ma20'] = df['close'] > df['ma20']

        # Zero-crossings of J, evaluated within each stock.
        df['prev_j'] = df.groupby('code')['kdj_j'].shift(1)
        df['j_turns_negative'] = (df['kdj_j'] < 0) & (df['prev_j'] >= 0)
        df['j_turns_positive'] = (df['kdj_j'] >= 0) & (df['prev_j'] < 0)

        entries = df[df['above_ma20'] & df['j_turns_negative']]

        # Collect the upward crossings per stock once, instead of filtering
        # the full frame again for every entry signal.
        exits_by_code = {
            code: rows
            for code, rows in df[df['j_turns_positive']].groupby('code', sort=False)
        }

        results = []
        for _, signal in entries.iterrows():
            code = signal['code']
            d1_date = signal['date']

            candidate_exits = exits_by_code.get(code)
            if candidate_exits is None:
                continue
            later_exits = candidate_exits[candidate_exits['date'] > d1_date]
            if later_exits.empty:
                continue

            exit_row = later_exits.iloc[0]
            d2_date = exit_row['date']
            d2_price = exit_row['close']
            period_return = (d2_price / signal['close'] - 1) * 100

            results.append({
                'code': code,
                'D1日期': d1_date,
                'D2日期': d2_date,
                'D1收盘价': signal['close'],
                'D2收盘价': d2_price,
                'D1-D2收益率': period_return,
                'D1_5日均线': signal['ma5'],
                'D1_20日均线': signal['ma20'],
                'D1_60日均线': signal['ma60'],
                'D1_J值': signal['kdj_j'],
                'D2_J值': exit_row['kdj_j'],
                # Calendar days between the two dates, not trading days.
                '持仓天数': (d2_date - d1_date).days
            })

        return pd.DataFrame(results, columns=self._RESULT_COLUMNS)

    def calculate_returns(self, df, signals, days=10):
        """This method is kept for backward compatibility but not used in the new strategy"""
        return pd.DataFrame()

## 沪深300股票id

In [5]:
"""沪深300成分股"""
# Fetch the current CSI 300 constituent list from baostock into df_hs300.
lg = bs.login()
if lg.error_code != '0':
    # Nothing below can work without a session, so stop the cell here
    # instead of printing and carrying on.
    raise RuntimeError(f'登录失败，错误代码：{lg.error_code}, 错误信息：{lg.error_msg}')
print('登录成功')

try:
    # Query the CSI 300 constituents.
    rs = bs.query_hs300_stocks()
    if rs.error_code != '0':
        # Fail loudly: otherwise df_hs300 stays undefined (or stale from an
        # earlier run) and later cells break in confusing ways.
        raise RuntimeError(f'查询沪深300成分股失败，错误代码：{rs.error_code}, 错误信息：{rs.error_msg}')

    # Drain the result set row by row and build the DataFrame once.
    hs300_stocks = []
    while rs.next():
        hs300_stocks.append(rs.get_row_data())
    df_hs300 = pd.DataFrame(hs300_stocks, columns=rs.fields)
    print(df_hs300)
finally:
    # Always release the session; inside `finally` the call's return value
    # is not echoed as the cell output.
    bs.logout()

login success!
登录成功
     updateDate       code code_name
0    2025-04-21  sh.600000      浦发银行
1    2025-04-21  sh.600009      上海机场
2    2025-04-21  sh.600010      包钢股份
3    2025-04-21  sh.600011      华能国际
4    2025-04-21  sh.600015      华夏银行
..          ...        ...       ...
295  2025-04-21  sz.300832       新产业
296  2025-04-21  sz.300896       爱美客
297  2025-04-21  sz.300979      华利集团
298  2025-04-21  sz.300999       金龙鱼
299  2025-04-21  sz.301269      华大九天

[300 rows x 3 columns]
logout success!


<baostock.data.resultset.ResultData at 0x7f2412ebc610>

In [6]:
# Show the CSI 300 constituent table built in the previous cell.
df_hs300

Unnamed: 0,updateDate,code,code_name
0,2025-04-21,sh.600000,浦发银行
1,2025-04-21,sh.600009,上海机场
2,2025-04-21,sh.600010,包钢股份
3,2025-04-21,sh.600011,华能国际
4,2025-04-21,sh.600015,华夏银行
...,...,...,...
295,2025-04-21,sz.300832,新产业
296,2025-04-21,sz.300896,爱美客
297,2025-04-21,sz.300979,华利集团
298,2025-04-21,sz.300999,金龙鱼


In [7]:
# Look up the constituent row (and baostock code) for Seres (赛力斯).
df_hs300[df_hs300["code_name"] == "赛力斯"]

Unnamed: 0,updateDate,code,code_name
94,2025-04-21,sh.601127,赛力斯


In [10]:
# Look up the constituent row (and baostock code) for Wuliangye (五粮液).
df_hs300[df_hs300["code_name"] == "五粮液"]

Unnamed: 0,updateDate,code,code_name
218,2025-04-21,sz.000858,五粮液


In [11]:
# Look up the constituent row (and baostock code) for CATL (宁德时代).
df_hs300[df_hs300["code_name"] == "宁德时代"]

Unnamed: 0,updateDate,code,code_name
291,2025-04-21,sz.300750,宁德时代


In [12]:
# Look up the constituent row (and baostock code) for BYD (比亚迪).
df_hs300[df_hs300["code_name"] == "比亚迪"]

Unnamed: 0,updateDate,code,code_name
259,2025-04-21,sz.002594,比亚迪


In [13]:
# Look up the constituent row (and baostock code) for Hundsun Technologies (恒生电子).
df_hs300[df_hs300["code_name"] == "恒生电子"]

Unnamed: 0,updateDate,code,code_name
54,2025-04-21,sh.600570,恒生电子


## 获取行情

In [16]:
"""
23年
"""
# Download 2023 daily bars for every CSI 300 constituent into df_stock_2023.
lg = bs.login()
if lg.error_code != '0':
    # Without a session every query below would fail, so stop here.
    raise RuntimeError(f'登录失败，错误代码：{lg.error_code}, 错误信息：{lg.error_msg}')
print('登录成功')

# Date range to download.
start_date = '2023-01-01'
end_date = '2023-12-31'

# One DataFrame per stock, concatenated after the loop.
stock_data_2023 = []

try:
    for code in df_hs300['code']:
        # NOTE(review): adjustflag="3" presumably requests unadjusted prices
        # from baostock; confirm that is intended, since moving averages and
        # KDJ on unadjusted data jump on ex-dividend dates.
        rs = bs.query_history_k_data_plus(
            code,
            "date,code,open,high,low,close,preclose,volume,amount,turn",
            start_date=start_date, end_date=end_date,
            frequency="d", adjustflag="3"
        )
        if rs.error_code != '0':
            # Best effort: report the failed stock and keep going.
            print(f'查询股票 {code} 数据失败，错误代码：{rs.error_code}, 错误信息：{rs.error_msg}')
            continue

        # Drain the result set for this stock.
        stock_data = []
        while rs.next():
            stock_data.append(rs.get_row_data())
        df_stock = pd.DataFrame(stock_data, columns=rs.fields)
        stock_data_2023.append(df_stock)
finally:
    # Always release the session, even if the loop is interrupted.
    bs.logout()

# pd.concat rejects an empty list with an unhelpful message; say what happened.
if not stock_data_2023:
    raise RuntimeError('未获取到任何股票数据')

# Merge all stocks into one long table (values are still strings here).
df_stock_2023 = pd.concat(stock_data_2023, ignore_index=True)

login success!
登录成功
logout success!


<baostock.data.resultset.ResultData at 0x7f2491f00490>

In [19]:
# Save the 2023 quotes. The target directory defaults to the original absolute
# location but can be overridden with QUANT_DATASET_DIR, so the notebook also
# runs on other machines.
# NOTE(review): the DataFrame index is written as the first CSV column
# (unchanged from before); a plain pd.read_csv will show it as "Unnamed: 0".
dataset_dir = os.environ.get("QUANT_DATASET_DIR", "/home/kennys/MineX/QuantTrading/dataset")
os.makedirs(dataset_dir, exist_ok=True)
df_stock_2023.to_csv(os.path.join(dataset_dir, "沪深300-2023年数据.csv"))

In [17]:
# Show the merged 2023 daily quotes for all constituents.
df_stock_2023

Unnamed: 0,date,code,open,high,low,close,preclose,volume,amount,turn
0,2023-01-03,sh.600000,7.2700,7.2800,7.1700,7.2300,7.2800,25892521,187094064.4100,0.088200
1,2023-01-04,sh.600000,7.2700,7.3500,7.2300,7.3100,7.2300,30947081,226321372.0500,0.105400
2,2023-01-05,sh.600000,7.3700,7.3800,7.3000,7.3500,7.3100,30162154,221617354.6200,0.102800
3,2023-01-06,sh.600000,7.3500,7.3800,7.3100,7.3400,7.3500,20312881,149170537.8500,0.069200
4,2023-01-09,sh.600000,7.3800,7.3800,7.3000,7.3400,7.3400,19612260,143998210.6000,0.066800
...,...,...,...,...,...,...,...,...,...,...
72328,2023-12-25,sz.301269,101.1200,102.9800,100.2200,102.6900,101.5000,1306118,133157675.1500,0.488400
72329,2023-12-26,sz.301269,102.6800,104.2000,101.4000,103.3300,102.6900,1398825,144208092.2200,0.523000
72330,2023-12-27,sz.301269,103.5000,105.3800,102.7000,104.4400,103.3300,1710496,178485254.8100,0.639600
72331,2023-12-28,sz.301269,104.0000,105.3000,103.3000,105.1900,104.4400,2035357,212350969.3200,0.761000


In [None]:
"""
24年至今
"""
# Download daily bars from 2024-01-01 onwards for every CSI 300 constituent
# into df_all_stocks.

# Log in.
lg = bs.login()
if lg.error_code != '0':
    # Without a session every query below would fail, so stop here.
    raise RuntimeError(f'登录失败，错误代码：{lg.error_code}, 错误信息：{lg.error_msg}')
print('登录成功')

# Date range to download.
start_date = '2024-01-01'
end_date = '2025-04-22'

# One DataFrame per stock, concatenated after the loop.
all_stock_data = []

try:
    for code in df_hs300['code']:
        # NOTE(review): adjustflag="3" presumably requests unadjusted prices
        # from baostock; confirm that is intended, since moving averages and
        # KDJ on unadjusted data jump on ex-dividend dates.
        rs = bs.query_history_k_data_plus(
            code,
            "date,code,open,high,low,close,preclose,volume,amount,turn",
            start_date=start_date, end_date=end_date,
            frequency="d", adjustflag="3"
        )
        if rs.error_code != '0':
            # Best effort: report the failed stock and keep going.
            print(f'查询股票 {code} 数据失败，错误代码：{rs.error_code}, 错误信息：{rs.error_msg}')
            continue

        # Drain the result set for this stock.
        stock_data = []
        while rs.next():
            stock_data.append(rs.get_row_data())
        df_stock = pd.DataFrame(stock_data, columns=rs.fields)
        all_stock_data.append(df_stock)
finally:
    # Always release the session, even if the loop is interrupted.
    bs.logout()

# pd.concat rejects an empty list with an unhelpful message; say what happened.
if not all_stock_data:
    raise RuntimeError('未获取到任何股票数据')

# Merge all stocks into one long table (values are still strings here).
df_all_stocks = pd.concat(all_stock_data, ignore_index=True)

login success!
登录成功
logout success!


<baostock.data.resultset.ResultData at 0x7f241207ecb0>

In [18]:
# Save the 2024-to-date quotes. The target directory defaults to the original
# absolute location but can be overridden with QUANT_DATASET_DIR, so the
# notebook also runs on other machines.
# NOTE(review): the DataFrame index is written as the first CSV column
# (unchanged from before); a plain pd.read_csv will show it as "Unnamed: 0".
dataset_dir = os.environ.get("QUANT_DATASET_DIR", "/home/kennys/MineX/QuantTrading/dataset")
os.makedirs(dataset_dir, exist_ok=True)
df_all_stocks.to_csv(os.path.join(dataset_dir, "沪深300-2024年至今数据.csv"))

In [15]:
# Show the merged daily quotes currently bound to df_all_stocks.
df_all_stocks

Unnamed: 0,date,code,open,high,low,close,preclose,volume,amount,turn
0,2024-01-02,sh.600000,6.6300,6.6500,6.6000,6.6000,6.6200,22066700,146066303.7200,0.075200
1,2024-01-03,sh.600000,6.5900,6.6500,6.5900,6.6400,6.6000,18203654,120639706.0100,0.062000
2,2024-01-04,sh.600000,6.6400,6.6700,6.5500,6.6200,6.6400,28885978,190580609.9900,0.098400
3,2024-01-05,sh.600000,6.6000,6.7600,6.5900,6.6800,6.6200,44421387,296976885.7900,0.151300
4,2024-01-08,sh.600000,6.6800,6.7100,6.5600,6.5900,6.6800,37520337,247977824.9800,0.127800
...,...,...,...,...,...,...,...,...,...,...
94195,2025-04-16,sz.301269,124.6800,125.5500,121.8000,123.8000,123.5400,6258128,774022704.8500,2.385700
94196,2025-04-17,sz.301269,122.3500,125.5000,122.3500,123.1800,123.8000,4504425,558238259.6700,1.717100
94197,2025-04-18,sz.301269,122.5000,125.2800,120.3400,120.9800,123.1800,4141438,505640958.5400,1.578800
94198,2025-04-21,sz.301269,120.8000,124.6000,119.8100,123.9100,120.9800,4083840,499746858.2000,1.556800


## 计算

In [None]:
"""
五粮液为例
sz.000858
"""
# Back-test the J zero-cross strategy on a single stock: Wuliangye, sz.000858.
target_code = "sz.000858"

# .copy() so the column assignments below act on an independent frame rather
# than on a slice of df_all_stocks (avoids SettingWithCopyWarning).
stock_data = df_all_stocks[df_all_stocks["code"] == target_code].copy()
stock_data["date"] = pd.to_datetime(stock_data["date"])
stock_data = stock_data.sort_values(['code', 'date'])

# baostock rows arrive as strings; convert price/volume columns to numbers.
numeric_columns = ['open', 'high', 'low', 'close', 'preclose', 'volume', 'amount', 'turn']
for col in numeric_columns:
    stock_data[col] = pd.to_numeric(stock_data[col])

hs300_constituents = df_hs300[df_hs300["code"] == target_code].copy()
hs300_constituents['updateDate'] = pd.to_datetime(hs300_constituents['updateDate'])

stock_data = pd.merge(stock_data, hs300_constituents[['code', 'code_name']], on='code', how='inner')

strategy = TradingStrategy()
prepared_data = strategy.prepare_data(stock_data)
signals = strategy.find_trading_signals(prepared_data)

if len(signals) > 0:
    # Attach the stock name.
    signals = pd.merge(signals, hs300_constituents[['code', 'code_name']], on='code', how='left')

    # Format the dates as strings.
    signals['D1日期'] = signals['D1日期'].dt.strftime('%Y-%m-%d')
    signals['D2日期'] = signals['D2日期'].dt.strftime('%Y-%m-%d')

    # Round the numeric result columns for display.
    signal_numeric_columns = ['D1收盘价', 'D2收盘价', 'D1-D2收益率',
                              'D1_5日均线', 'D1_20日均线', 'D1_60日均线',
                              'D1_J值', 'D2_J值']
    signals[signal_numeric_columns] = signals[signal_numeric_columns].round(2)

    # Column order for export and display.
    display_columns = ['code', 'code_name', 'D1日期', 'D2日期',
                       'D1收盘价', 'D2收盘价', 'D1-D2收益率',
                       'D1_5日均线', 'D1_20日均线', 'D1_60日均线',
                       'D1_J值', 'D2_J值', '持仓天数']

    # Save the signal details to CSV.
    output_path = "trading_signals_ma20.csv"
    signals[display_columns].to_csv(output_path, index=False, encoding='utf-8-sig')
    print(f"\n交易信号明细已保存至: {os.path.abspath(output_path)}")

    # Summary statistics.
    print("\n=== 策略回测结果 ===")
    print(f"找到的交易信号总数: {len(signals)}")

    print("\n=== 收益率统计 ===")
    print(f"平均收益率: {signals['D1-D2收益率'].mean():.2f}%")
    print(f"收益率中位数: {signals['D1-D2收益率'].median():.2f}%")
    print(f"胜率: {(signals['D1-D2收益率'] > 0).mean() * 100:.2f}%")
    print(f"平均持仓天数: {signals['持仓天数'].mean():.1f}天")

    print("\n=== 信号时间分布 ===")
    signals_by_month = signals.groupby(pd.to_datetime(signals['D1日期']).dt.to_period('M')).size()
    print("\n每月信号数量:")
    print(signals_by_month)

    print("\n每个交易信号的详细信息:")
    # Scope the display options to this print so they do not leak into later
    # cells, where a bare DataFrame display would otherwise show every row.
    with pd.option_context('display.max_rows', None, 'display.width', None):
        print(signals[display_columns].to_string(index=False))
else:
    print("\n未找到符合条件的交易信号")


In [None]:
"""
赛力斯
sh.601127
"""
# Back-test the J zero-cross strategy on a single stock: Seres, sh.601127.
target_code = "sh.601127"

# .copy() so the column assignments below act on an independent frame rather
# than on a slice of df_all_stocks (avoids SettingWithCopyWarning).
stock_data = df_all_stocks[df_all_stocks["code"] == target_code].copy()
stock_data["date"] = pd.to_datetime(stock_data["date"])
stock_data = stock_data.sort_values(['code', 'date'])

# baostock rows arrive as strings; convert price/volume columns to numbers.
numeric_columns = ['open', 'high', 'low', 'close', 'preclose', 'volume', 'amount', 'turn']
for col in numeric_columns:
    stock_data[col] = pd.to_numeric(stock_data[col])

hs300_constituents = df_hs300[df_hs300["code"] == target_code].copy()
hs300_constituents['updateDate'] = pd.to_datetime(hs300_constituents['updateDate'])

stock_data = pd.merge(stock_data, hs300_constituents[['code', 'code_name']], on='code', how='inner')

strategy = TradingStrategy()
prepared_data = strategy.prepare_data(stock_data)
signals = strategy.find_trading_signals(prepared_data)

if len(signals) > 0:
    # Attach the stock name.
    signals = pd.merge(signals, hs300_constituents[['code', 'code_name']], on='code', how='left')

    # Format the dates as strings.
    signals['D1日期'] = signals['D1日期'].dt.strftime('%Y-%m-%d')
    signals['D2日期'] = signals['D2日期'].dt.strftime('%Y-%m-%d')

    # Round the numeric result columns for display.
    signal_numeric_columns = ['D1收盘价', 'D2收盘价', 'D1-D2收益率',
                              'D1_5日均线', 'D1_20日均线', 'D1_60日均线',
                              'D1_J值', 'D2_J值']
    signals[signal_numeric_columns] = signals[signal_numeric_columns].round(2)

    # Column order for display.
    display_columns = ['code', 'code_name', 'D1日期', 'D2日期',
                       'D1收盘价', 'D2收盘价', 'D1-D2收益率',
                       'D1_5日均线', 'D1_20日均线', 'D1_60日均线',
                       'D1_J值', 'D2_J값'.replace('값', '值'), '持仓天数']

    # CSV export is disabled in this cell; results are only printed.

    # Summary statistics.
    print("\n=== 策略回测结果 ===")
    print(f"找到的交易信号总数: {len(signals)}")

    print("\n=== 收益率统计 ===")
    print(f"平均收益率: {signals['D1-D2收益率'].mean():.2f}%")
    print(f"收益率中位数: {signals['D1-D2收益率'].median():.2f}%")
    print(f"胜率: {(signals['D1-D2收益率'] > 0).mean() * 100:.2f}%")
    print(f"平均持仓天数: {signals['持仓天数'].mean():.1f}天")

    print("\n=== 信号时间分布 ===")
    signals_by_month = signals.groupby(pd.to_datetime(signals['D1日期']).dt.to_period('M')).size()
    print("\n每月信号数量:")
    print(signals_by_month)

    print("\n每个交易信号的详细信息:")
    # Scope the display options to this print so they do not leak into later
    # cells, where a bare DataFrame display would otherwise show every row.
    with pd.option_context('display.max_rows', None, 'display.width', None):
        print(signals[display_columns].to_string(index=False))
else:
    print("\n未找到符合条件的交易信号")


In [None]:
"""
宁德
sz.300750
"""
# Back-test the J zero-cross strategy on a single stock: CATL, sz.300750.
target_code = "sz.300750"

# .copy() so the column assignments below act on an independent frame rather
# than on a slice of df_all_stocks (avoids SettingWithCopyWarning).
stock_data = df_all_stocks[df_all_stocks["code"] == target_code].copy()
stock_data["date"] = pd.to_datetime(stock_data["date"])
stock_data = stock_data.sort_values(['code', 'date'])

# baostock rows arrive as strings; convert price/volume columns to numbers.
numeric_columns = ['open', 'high', 'low', 'close', 'preclose', 'volume', 'amount', 'turn']
for col in numeric_columns:
    stock_data[col] = pd.to_numeric(stock_data[col])

hs300_constituents = df_hs300[df_hs300["code"] == target_code].copy()
hs300_constituents['updateDate'] = pd.to_datetime(hs300_constituents['updateDate'])

stock_data = pd.merge(stock_data, hs300_constituents[['code', 'code_name']], on='code', how='inner')

strategy = TradingStrategy()
prepared_data = strategy.prepare_data(stock_data)
signals = strategy.find_trading_signals(prepared_data)

if len(signals) > 0:
    # Attach the stock name.
    signals = pd.merge(signals, hs300_constituents[['code', 'code_name']], on='code', how='left')

    # Format the dates as strings.
    signals['D1日期'] = signals['D1日期'].dt.strftime('%Y-%m-%d')
    signals['D2日期'] = signals['D2日期'].dt.strftime('%Y-%m-%d')

    # Round the numeric result columns for display.
    signal_numeric_columns = ['D1收盘价', 'D2收盘价', 'D1-D2收益率',
                              'D1_5日均线', 'D1_20日均线', 'D1_60日均线',
                              'D1_J值', 'D2_J值']
    signals[signal_numeric_columns] = signals[signal_numeric_columns].round(2)

    # Column order for display.
    display_columns = ['code', 'code_name', 'D1日期', 'D2日期',
                       'D1收盘价', 'D2收盘价', 'D1-D2收益率',
                       'D1_5日均线', 'D1_20日均线', 'D1_60日均线',
                       'D1_J值', 'D2_J值', '持仓天数']

    # CSV export is disabled in this cell; results are only printed.

    # Summary statistics.
    print("\n=== 策略回测结果 ===")
    print(f"找到的交易信号总数: {len(signals)}")

    print("\n=== 收益率统计 ===")
    print(f"平均收益率: {signals['D1-D2收益率'].mean():.2f}%")
    print(f"收益率中位数: {signals['D1-D2收益率'].median():.2f}%")
    print(f"胜率: {(signals['D1-D2收益率'] > 0).mean() * 100:.2f}%")
    print(f"平均持仓天数: {signals['持仓天数'].mean():.1f}天")

    print("\n=== 信号时间分布 ===")
    signals_by_month = signals.groupby(pd.to_datetime(signals['D1日期']).dt.to_period('M')).size()
    print("\n每月信号数量:")
    print(signals_by_month)

    print("\n每个交易信号的详细信息:")
    # Scope the display options to this print so they do not leak into later
    # cells, where a bare DataFrame display would otherwise show every row.
    with pd.option_context('display.max_rows', None, 'display.width', None):
        print(signals[display_columns].to_string(index=False))
else:
    print("\n未找到符合条件的交易信号")


## 前两年的数据

In [None]:
# Download 2023 daily bars for every CSI 300 constituent.
# NOTE(review): this cell rebinds df_all_stocks, which an earlier cell fills
# with the 2024-to-date quotes, and repeats the 2023 download that already
# produces df_stock_2023. Re-running the per-stock analysis cells after this
# one will therefore use 2023 data; confirm that is intended.

# Log in.
lg = bs.login()
if lg.error_code != '0':
    # Without a session every query below would fail, so stop here.
    raise RuntimeError(f'登录失败，错误代码：{lg.error_code}, 错误信息：{lg.error_msg}')
print('登录成功')

# Date range to download.
start_date = '2023-01-01'
end_date = '2023-12-31'

# One DataFrame per stock, concatenated after the loop.
data_2023 = []

try:
    for code in df_hs300['code']:
        # NOTE(review): adjustflag="3" presumably requests unadjusted prices
        # from baostock; confirm that is intended, since moving averages and
        # KDJ on unadjusted data jump on ex-dividend dates.
        rs = bs.query_history_k_data_plus(
            code,
            "date,code,open,high,low,close,preclose,volume,amount,turn",
            start_date=start_date, end_date=end_date,
            frequency="d", adjustflag="3"
        )
        if rs.error_code != '0':
            # Best effort: report the failed stock and keep going.
            print(f'查询股票 {code} 数据失败，错误代码：{rs.error_code}, 错误信息：{rs.error_msg}')
            continue

        # Drain the result set for this stock.
        stock_data = []
        while rs.next():
            stock_data.append(rs.get_row_data())
        df_stock = pd.DataFrame(stock_data, columns=rs.fields)
        data_2023.append(df_stock)
finally:
    # Always release the session, even if the loop is interrupted.
    bs.logout()

# pd.concat rejects an empty list with an unhelpful message; say what happened.
if not data_2023:
    raise RuntimeError('未获取到任何股票数据')

# Merge all stocks into one long table (values are still strings here).
df_all_stocks = pd.concat(data_2023, ignore_index=True)

In [None]:
# Show df_all_stocks as rebuilt by the cell above (2023 date range).
df_all_stocks