In [1]:
import sys
import pymssql
import pandas as pd
import numpy as np
import functools
from scipy.stats import *
from dateutil.relativedelta import relativedelta
from sqlalchemy import create_engine,text
from joblib import dump, load
import pickle
import os
from datetime import datetime

In [3]:
# Connectivity smoke test against one monthly TAQ database (SEL1_TAQ_200506) before the long export.
# NOTE(review): user name and password are hard-coded in this URL (and in get_data);
# consider loading them from environment variables before sharing the notebook.
connection_string = 'mssql+pymssql://gta:jjxy.2018@210.34.5.211/SEL1_TAQ_200506'
engine = create_engine(connection_string)

try:
    with engine.connect() as connection:
        # Raw SQL must be wrapped in text() before being passed to Connection.execute.
        probe = text("SELECT 1")
        result = connection.execute(probe)
        print("数据库连接成功！")
except Exception as e:
    print(f"数据库连接失败: {e}")

数据库连接失败: (pymssql.exceptions.OperationalError) (20009, b'DB-Lib error message 20009, severity 9:\nUnable to connect: Adaptive Server is unavailable or does not exist (210.34.5.211)\nNet-Lib error during Unknown error (10060)\nDB-Lib error message 20009, severity 9:\nUnable to connect: Adaptive Server is unavailable or does not exist (210.34.5.211)\nNet-Lib error during Unknown error (10060)\n')
(Background on this error at: https://sqlalche.me/e/20/e3q8)


In [4]:
def log_time(index, month, event, duration=None, log_file='time_log.txt'):
    """Append a timestamped progress line for one row of the export loop.

    Parameters
    ----------
    index : row label being processed.
    month : month label of that row.
    event : short description, e.g. "Loop started".
    duration : optional elapsed seconds; when given it is appended to the line.
    log_file : path of the text log; opened in append mode so history is kept.
    """
    current_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    if duration is None:
        entry = f"{current_time} - {event} for row {index} in month {month}\n"
    else:
        entry = f"{current_time} - {event} for row {index} in month {month}, Duration: {duration} seconds\n"
    with open(log_file, 'a') as log_handle:
        log_handle.write(entry)

In [5]:
def log_error(sticker, month, error_message, log_file='error_log.txt'):
    """Append one error line for a stock/month pair to a plain-text log.

    Parameters
    ----------
    sticker : stock code whose query failed.
    month : month label of the failing query.
    error_message : text of the error to record.
    log_file : path of the log; opened in append mode so earlier entries are preserved.
    """
    line = f"Error with sticker {sticker} in month {month}: {error_message}\n"
    with open(log_file, 'a') as handle:
        handle.write(line)

In [6]:
def market_location(sticker):
    """Return the exchange letter used in the TAQ table name for a stock code.

    'Z' selects the SZL1_* (Shenzhen) tables and 'H' the SHL1_* (Shanghai) tables.
    Shenzhen A-share codes start with '0' (main board) or '3' (ChiNext, e.g. 300750);
    everything else (Shanghai codes start with '6') maps to 'H'. Checking only '0'
    sent ChiNext constituents to SHL1_* tables.
    """
    return 'Z' if sticker.startswith(('0', '3')) else 'H'


def build_taq_query(sticker, month):
    """Return the SQL that extracts checkpoint volumes for one stock in one month.

    Per trading day, running sums of TradeVolume are computed separately for
    BuyOrSell = 'B' and 'S'. Only the minutes 10:29, 11:29, 13:59 and 14:59 with both
    running sums non-zero are kept. For each such minute the maximum running sums are
    returned as MaxBuyVol / MaxSellVol, with the minute formatted as 'yyyy-MM-dd HH:mm'.

    Table and database names cannot be bound as SQL parameters, so `sticker` and `month`
    are interpolated. They are expected to be plain codes such as '000001' / '201902'
    from the local components file.
    """
    location = market_location(sticker)
    return f"""
        SELECT
            '{sticker}' AS Sticker,
            MAX(BuyVol) AS MaxBuyVol,
            MAX(SellVol) AS MaxSellVol,
            FORMAT(MAX(TradingTime), 'yyyy-MM-dd HH:mm') AS TradingTimeFormatted
        FROM (
            SELECT
                TradingTime,
                SUM(CASE WHEN BuyOrSell = 'S' THEN TradeVolume ELSE 0 END) OVER (PARTITION BY CAST(TradingTime AS DATE) ORDER BY TradingTime) AS SellVol,
                SUM(CASE WHEN BuyOrSell = 'B' THEN TradeVolume ELSE 0 END) OVER (PARTITION BY CAST(TradingTime AS DATE) ORDER BY TradingTime) AS BuyVol
            FROM dbo.S{location}L1_TAQ_{sticker}_{month}
        ) AS subquery
        WHERE
            FORMAT(TradingTime, 'HH:mm') IN ('10:29', '11:29', '13:59', '14:59')
            AND SellVol != 0 AND BuyVol != 0
        GROUP BY FORMAT(TradingTime, 'yyyy-MM-dd HH:mm')
        ORDER BY TradingTimeFormatted;
    """


def get_data(stickers, month):
    """Fetch checkpoint buy/sell volumes for several stocks from one monthly database.

    Parameters
    ----------
    stickers : iterable of str
        Stock codes such as '000001'.
    month : str
        'YYYYMM' suffix of the database (SEL1_TAQ_<month>) and table names.

    Returns
    -------
    pandas.DataFrame
        Concatenated rows with columns Sticker, MaxBuyVol, MaxSellVol and
        TradingTimeFormatted (a 'yyyy-MM-dd HH:mm' string). The result is an empty,
        column-less DataFrame when no stock returned rows. Stocks whose query raises
        are printed, written to the error log via log_error, and skipped.
    """
    # NOTE(review): credentials are hard-coded; consider reading them from environment variables.
    connection_string = f'mssql+pymssql://gta:jjxy.2018@210.34.5.211/SEL1_TAQ_{month}'
    engine = create_engine(connection_string)

    all_data_list = []
    try:
        for sticker in stickers:
            try:
                df = pd.read_sql_query(text(build_taq_query(sticker, month)), engine)
            except Exception as e:
                # Best-effort: record the failing stock and carry on with the rest.
                print(f"Error with sticker {sticker} in month {month}: {e}")
                log_error(sticker, month, str(e))
                continue

            if df.empty:
                print(f"No data for sticker {sticker}")
                continue

            all_data_list.append(df)
    finally:
        # Release pooled connections even if the loop is interrupted (e.g. KeyboardInterrupt).
        engine.dispose()

    return pd.concat(all_data_list, ignore_index=True) if all_data_list else pd.DataFrame()

In [7]:
def cumulative_to_interval(df, stickers):
    """Split combined query output per stock and turn running daily totals into per-checkpoint volumes.

    Parameters
    ----------
    df : pandas.DataFrame
        Rows with columns Sticker, MaxBuyVol, MaxSellVol, TradingTimeFormatted, where the
        volume columns are running totals within each trading day, ordered by time per stock.
    stickers : list of str
        Stock codes; one output frame is produced per code, in this order.

    Returns
    -------
    list of pandas.DataFrame
        One frame per stock (possibly empty) with TradingTimeFormatted parsed to datetime.
        Within each day, MaxBuyVol/MaxSellVol become differences from the previous
        checkpoint. The day's first checkpoint keeps its running total. Returns [] when
        `df` lacks the expected columns, e.g. when nothing was fetched.
    """
    if 'TradingTimeFormatted' not in df.columns:
        return []

    df = df.copy()
    df['TradingTimeFormatted'] = pd.to_datetime(df['TradingTimeFormatted'])

    result_dfs = []
    for sticker in stickers:
        sticker_df = df[df['Sticker'] == sticker].copy()
        trading_day = sticker_df['TradingTimeFormatted'].dt.date
        for col in ('MaxSellVol', 'MaxBuyVol'):
            # The first checkpoint of a day has no predecessor, so diff() is NaN; keep the raw total.
            sticker_df[col] = sticker_df.groupby(trading_day)[col].diff().fillna(sticker_df[col])
        result_dfs.append(sticker_df)
    return result_dfs


def get_data_multiple(stickers, month_ls):
    """Fetch several months for a list of stocks and return per-stock interval volumes.

    Parameters
    ----------
    stickers : list of str
        Stock codes.
    month_ls : list of str
        'YYYYMM' month labels; each is queried with get_data.

    Returns
    -------
    list of pandas.DataFrame
        One frame per stock (see cumulative_to_interval). Returns [] when no month produced
        any rows, e.g. when the database was unreachable and every query failed.
    """
    monthly_frames = [get_data(stickers, month) for month in month_ls]
    # get_data returns a column-less empty frame when nothing was fetched. Drop those so an
    # all-failed run is detected here instead of raising KeyError on the missing columns.
    non_empty = [frame for frame in monthly_frames if not frame.empty]
    if not non_empty:
        return []

    df = pd.concat(non_empty, ignore_index=True)
    return cumulative_to_interval(df, stickers)

In [8]:
def rename_columns(df):
    """Suffix the MaxBuyVol/MaxSellVol columns with the frame's stock code.

    The code is taken from the first row's 'Sticker' value, so `df` must be non-empty.
    All other columns are left untouched and a new DataFrame is returned.
    """
    code = df['Sticker'].iloc[0]
    mapping = {}
    mapping['MaxBuyVol'] = f'MaxBuyVol_{code}'
    mapping['MaxSellVol'] = f'MaxSellVol_{code}'
    return df.rename(columns=mapping)

def merge_dfs_on_column(dfs, column, how='inner'):
    """Merge a sequence of DataFrames left-to-right on a shared key column.

    Parameters
    ----------
    dfs : iterable of pandas.DataFrame
        Frames that all contain `column`.
    column : str
        Key column passed to pd.merge(on=...).
    how : str
        Join type for every pairwise merge (default 'inner').

    Raises
    ------
    ValueError
        If `dfs` is empty.
    """
    dfs = list(dfs)
    if not dfs:
        raise ValueError("merge_dfs_on_column needs at least one DataFrame")
    # Only the `functools` module is imported in this notebook; a bare `reduce` raises NameError.
    return functools.reduce(lambda left, right: pd.merge(left, right, on=column, how=how), dfs)

In [9]:
def stack_order_data(raw_data_ls, sticker_ls, scale=100000):
    """Align per-stock checkpoint volumes on timestamp and stack them into one array.

    Parameters
    ----------
    raw_data_ls : list of pandas.DataFrame
        One frame per stock, in the same order as `sticker_ls`, with columns
        MaxBuyVol, MaxSellVol, TradingTimeFormatted (and optionally Sticker).
    sticker_ls : list of str
        Stock codes matching `raw_data_ls` position by position.
    scale : float
        Divisor applied to all volumes (default 100000, the previous hard-coded value).

    Returns
    -------
    numpy.ndarray of shape (len(sticker_ls), n_checkpoints, 2)
        [..., 0] is the buy volume and [..., 1] the sell volume. Only timestamps present
        for every stock (inner join) and with no zero volume for any stock are kept.
        Returns an empty (len(sticker_ls), 0, 2) array when there is no input.
    """
    if not raw_data_ls or not sticker_ls:
        return np.empty((len(sticker_ls), 0, 2))

    # Drop 'Sticker' before merging: otherwise the 4th merge would try to recreate
    # Sticker_x/Sticker_y, which recent pandas versions reject with MergeError.
    renamed_dfs = [
        df.drop(columns='Sticker', errors='ignore').rename(columns={
            'MaxBuyVol': f'MaxBuyVol_{sticker}',
            'MaxSellVol': f'MaxSellVol_{sticker}',
        })
        for sticker, df in zip(sticker_ls, raw_data_ls)
    ]
    merged_df = functools.reduce(
        lambda left, right: pd.merge(left, right, on='TradingTimeFormatted', how='inner'),
        renamed_dfs,
    ).drop(columns='TradingTimeFormatted')
    merged_df = merged_df[(merged_df != 0).all(axis=1)]

    order_data = np.empty((len(sticker_ls), len(merged_df), 2))
    for i, sticker in enumerate(sticker_ls):
        # Select by name rather than by position so column order cannot silently mismatch.
        order_data[i] = merged_df[[f'MaxBuyVol_{sticker}', f'MaxSellVol_{sticker}']].values
    return order_data / scale


def quartly_data(sticker_ls, month_ls):
    """Fetch order-flow data for a set of stocks over several months as one numpy array.

    Returns an array of shape len(sticker_ls) x n_checkpoints x 2, where the checkpoints
    are the intraday timestamps shared by all stocks (see stack_order_data). The last
    axis is (buy, sell) volume divided by 100000.
    """
    raw_data_ls = get_data_multiple(sticker_ls, month_ls)
    return stack_order_data(raw_data_ls, sticker_ls)

In [10]:
# 读取 pickle 文件
# Monthly constituent table. The export loop reads its 'hs300_stocks', 'month' and 'M' columns.
# NOTE(review): unpickling can execute arbitrary code — only load a trusted, locally produced file.
COMPONENTS_PKL = 'monthly_components.pkl'
components = pd.read_pickle(COMPONENTS_PKL)

In [11]:
# stickers = components.iloc[0]['hs300_stocks']
# months = components.iloc[0]['month']
# test = get_data_multiple(stickers, months)

In [12]:
# Export one joblib file per row of `components`, named after the row's 'M' label.
LAST_DONE_INDEX = 153     # resume point: rows up to this index were exported by earlier runs
MIN_ROWS = 200            # keep only stocks with at least this many checkpoint rows
OUTPUT_DIR = 'hs300_data'

for index, row in components.iterrows():
    if index <= LAST_DONE_INDEX:
        continue

    start_time = datetime.now()  # start of this iteration, used for the duration log
    log_time(index, row['M'], "Loop started")

    stickers = row['hs300_stocks']
    months = row['month']
    month = row['M']

    # Normalise `months` to a list, which get_data_multiple iterates over.
    if isinstance(months, str):
        months = [months]
    elif isinstance(months, (int, float, np.integer, np.floating)):
        # numpy integer scalars are not `int` instances, so they are listed explicitly.
        months = [str(int(months))]
    elif isinstance(months, (tuple, np.ndarray, pd.Series)):
        months = list(months)
    elif not isinstance(months, list):
        raise ValueError(f"Expected 'months' to be a list or a string, but got {type(months)}")

    try:
        data = get_data_multiple(stickers, months)
    except Exception as e:
        # e.g. every per-stock query failed (database unreachable), leaving no columns to process;
        # log and move on instead of aborting the whole export.
        print(f"Skipping row {index} ({month}): {e}")
        log_error('ALL', month, f"get_data_multiple failed: {e}")
        log_time(index, month, "Loop skipped")
        continue

    if not any(len(df) for df in data):
        # Nothing came back at all: do not write an empty export for this month.
        print(f"Skipping row {index} ({month}): no data returned")
        log_error('ALL', month, "no data returned for any stock")
        log_time(index, month, "Loop skipped")
        continue

    filtered_data = [df for df in data if len(df) >= MIN_ROWS]

    # The file is named after the row's month label.
    file_name = f'{OUTPUT_DIR}/{month}.joblib'
    os.makedirs(os.path.dirname(file_name), exist_ok=True)
    dump(filtered_data, file_name)
    print(f"Saved data to {file_name}")

    duration = (datetime.now() - start_time).total_seconds()
    log_time(index, row['M'], "Loop ended", duration=duration)

Error with sticker 000001 in month 201902: (pymssql.exceptions.OperationalError) (20009, b'DB-Lib error message 20009, severity 9:\nUnable to connect: Adaptive Server is unavailable or does not exist (210.34.5.211)\nNet-Lib error during Unknown error (10060)\nDB-Lib error message 20009, severity 9:\nUnable to connect: Adaptive Server is unavailable or does not exist (210.34.5.211)\nNet-Lib error during Unknown error (10060)\n')
(Background on this error at: https://sqlalche.me/e/20/e3q8)
Error with sticker 000002 in month 201902: (pymssql.exceptions.OperationalError) (20009, b'DB-Lib error message 20009, severity 9:\nUnable to connect: Adaptive Server is unavailable or does not exist (210.34.5.211)\nNet-Lib error during Unknown error (10060)\nDB-Lib error message 20009, severity 9:\nUnable to connect: Adaptive Server is unavailable or does not exist (210.34.5.211)\nNet-Lib error during Unknown error (10060)\n')
(Background on this error at: https://sqlalche.me/e/20/e3q8)
Error with sti