In [None]:
import tushare as ts
import pandas as pd
from sqlalchemy import create_engine
import time
from sqlalchemy import text
from concurrent.futures import ThreadPoolExecutor, as_completed
import concurrent.futures

# 读取包含股票代码的 CSV 文件
stock_list = pd.read_csv('./基础数据_预处理20241123.csv')
stock_codes = stock_list['ts_code'].values

# 设置 Tushare Token
token = '6488998e7e15b3596e11308b921fdc726d3401cd4bde7958d3ba49cd'
pro = ts.pro_api(token)

# 使用 SQLAlchemy 和 pymysql 创建 MySQL 数据库连接
engine = create_engine('mysql+pymysql://root@localhost:3306/stocks_data')

# 插入数据到 MySQL 的函数
def insert_sql(data, db_name, if_exists='append'):
    try:
        data.to_sql(db_name, engine, index=False, if_exists=if_exists)
        return True  # 插入成功时返回 True
    except Exception as e:
        return False  # 插入失败时返回 False

# 记录成功与失败的股票信息
def log_status(ts_code, status, error_message=''):
    status_data = {
        'ts_code': ts_code,
        'status': status,  # 'success' 或 'failure'
        'error_message': error_message
    }
    status_df = pd.DataFrame([status_data])
    status_df.to_sql('stock_data_status', engine, index=False, if_exists='append')

# 查询数据库中某个股票的最新交易日期
def get_last_trade_date(ts_code):
    query = text(f"SELECT MAX(trade_date) FROM stock_data WHERE ts_code='{ts_code}'")
    with engine.connect() as connection:
        result = connection.execute(query).fetchone()
    return result[0] if result else None

# 拉取股票数据并插入
def fetch_and_insert_single_stock(code, start_date, end_date, retries=3):
    # 查询数据库中该股票已有的最新日期
    last_date = get_last_trade_date(code)
    
    if last_date:
        # 如果数据库已有数据，从最后一条记录的日期+1天开始拉取
        start_date = str(int(last_date) + 1)
    else:
        # 如果数据库中没有该股票的数据，从2015年开始拉取
        start_date = '20150101'

    attempt = 0
    success = False
    while attempt < retries and not success:
        try:
            # 拉取股票的日线数据
            data = pro.daily(ts_code=code, start_date=start_date, end_date=end_date)
            
            if data is not None and len(data) > 0:  # 判断是否有数据返回
                # 将数据插入数据库
                if insert_sql(data, 'stock_data'):
                    log_status(code, 'success')  # 成功插入日志
                    return (code, 'success', None)
                else:
                    log_status(code, 'failure', '插入数据库失败')
                    attempt += 1
            else:
                log_status(code, 'failure', '无数据返回')
                return (code, 'failure', '无数据返回')
        except Exception as e:
            log_status(code, 'failure', str(e))
            attempt += 1

    return (code, 'failure', '超过最大重试次数')


# 拉取股票数据并插入（并行化版本）
def fetch_and_insert_data_parallel(stock_codes, start_date, end_date, retries=3):
    total_stocks = len(stock_codes)  # 获取总股票数量
    skipped_count = 0  # 跳过的股票数量
    new_count = 0  # 成功上传的新股票数量

    # 使用线程池来并行处理股票代码
    def process_stock(code):
        nonlocal skipped_count, new_count
        last_date = get_last_trade_date(code)

        if last_date:
            start_date = str(int(last_date) + 1)
        else:
            start_date = '20000101'

        attempt = 0
        success = False
        while attempt < retries and not success:
            try:
                # 拉取股票的日线数据
                data = pro.daily(ts_code=code, start_date=start_date, end_date=end_date)

                if data is not None and len(data) > 0:
                    if insert_sql(data, 'stock_data'):
                        log_status(code, 'success')
                        new_count += 1
                        success = True
                    else:
                        log_status(code, 'failure', '插入数据库失败')
                        attempt += 1
                else:
                    log_status(code, 'failure', '无数据返回')
                    skipped_count += 1
                    success = True
            except Exception as e:
                log_status(code, 'failure', str(e))
                attempt += 1

    # 使用 ThreadPoolExecutor 并行处理所有股票
    with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
        futures = [executor.submit(process_stock, code) for code in stock_codes]

        # 持续检查并更新进度
        while True:
            completed_futures = sum(1 for future in futures if future.done())
            total_processed = skipped_count + new_count
            print(f"\r拉取总进度：{total_processed}/{total_stocks}，已跳过 {skipped_count}/{total_stocks}", end='', flush=True)
            
            # 如果所有任务完成，则退出
            if completed_futures == total_stocks:
                break

    # 确保最后打印完整的输出
    print("\n所有股票数据处理完成。")

# 设置日期范围
start_date = '20000101'
end_date = '20141231'

# 开始并行拉取并插入数据
fetch_and_insert_data_parallel(stock_codes, start_date, end_date)

拉取总进度：500/4412，已跳过 500/4412

In [None]:
import pandas as pd
import os
# 输出多周期 k 线
# 输入的日线数据CSV文件路径
input_csv = '300467_data.csv'

# 读取日线数据
df = pd.read_csv(input_csv)

# 确保日期列是datetime类型，并按日期排序
df['trade_date'] = pd.to_datetime(df['trade_date'], format='%Y%m%d')  # 日期格式是'YYYYMMDD'
df = df.sort_values(by='trade_date')

# 获取股票代码（假设所有数据来自同一股票）
ts_code = df['ts_code'].iloc[0]  # 取第一个数据的股票代码

# 创建以股票代码为名字的文件夹，如果文件夹不存在则创建
output_dir = f"./{ts_code}"
if not os.path.exists(output_dir):
    os.makedirs(output_dir)

# 定义聚合函数
def resample_data(df, freq):
    """
    根据不同频率（'W', 'M', 'Q', 'Y'）对日线数据进行重采样
    """
    df_resampled = df.resample(freq, on='trade_date').agg({
        'ts_code': 'first',        # 股票代码（取第一个）
        'open': 'first',           # 开盘价（周期内的第一个）
        'high': 'max',             # 最高价（周期内的最大值）
        'low': 'min',              # 最低价（周期内的最小值）
        'close': 'last',           # 收盘价（周期内的最后一个）
        'pre_close': 'last',       # 前收盘价（周期内最后一个的前收盘价）
        'change': 'last',          # 涨跌额（最后一根K线的涨跌额）
        'pct_chg': 'last',         # 涨跌幅（最后一根K线的涨跌幅）
        'vol': 'sum',              # 成交量（周期内成交量的总和）
        'amount': 'sum'            # 成交额（周期内成交额的总和）
    }).dropna()  # 删除缺失值

    # 重置索引，便于后续操作
    return df_resampled.reset_index()

# 生成周线数据
df_weekly = resample_data(df, 'W-MON')
df_weekly.to_csv(f"{output_dir}/weekly_data.csv", index=False)

# 生成月线数据
df_monthly = resample_data(df, 'M')
df_monthly.to_csv(f"{output_dir}/monthly_data.csv", index=False)

# 生成季线数据
df_quarterly = resample_data(df, 'Q')
df_quarterly.to_csv(f"{output_dir}/quarterly_data.csv", index=False)

# 生成年线数据
df_yearly = resample_data(df, 'Y')
df_yearly.to_csv(f"{output_dir}/yearly_data.csv", index=False)

print(f"周线数据已保存到: {output_dir}/weekly_data.csv")
print(f"月线数据已保存到: {output_dir}/monthly_data.csv")
print(f"季线数据已保存到: {output_dir}/quarterly_data.csv")
print(f"年线数据已保存到: {output_dir}/yearly_data.csv")

In [18]:
import pymysql
import pandas as pd

'''
从数据库查询并导出多周期 k 线
'''

conn = pymysql.connect(
    host="localhost",
    user="root",
    database="stocks"
)

# 定义 SQL 查询
query = """
SELECT
    ts_code,
    YEAR(trade_date) AS year,
    MIN(open) AS open,
    MAX(high) AS high,
    MIN(low) AS low,
    MAX(close) AS close,
    SUM(vol) AS volume,
    SUM(amount) AS amount
FROM
    merged_stock_data
GROUP BY
    ts_code, YEAR(trade_date)
ORDER BY
    ts_code, year;
"""

# 使用 Pandas 执行查询并将结果加载为 DataFrame
df = pd.read_sql(query, conn)

# 将查询结果保存为 CSV 文件
df.to_csv('./stock_data.csv', index=False)

# 关闭连接
conn.close()

  df = pd.read_sql(query, conn)


In [4]:
import pandas as pd

# 加载数据
data_file1 = '/Users/sean/Jupyter_Projects/StockDataPrep/data/1merged_stocks_data_20241204.csv'
data_file2 = '/Users/sean/Jupyter_Projects/StockDataPrep/data/merged_stocks_data_qfq_20241204.csv'
df1 = pd.read_csv(data_file1)
df2 = pd.read_csv(data_file2)

# 筛选出 ts_code 为 603689.SH 和 603790.SH 的数据
filtered_data1 = df1[df1['ts_code'].isin(['603206.SH', '600525.SH'])]
filtered_data2 = df1[df1['ts_code'].isin(['603206.SH', '600525.SH'])]

# 查看筛选后的数据
print(filtered_data1)
print(filtered_data2)

# 如果只需要查看这两只股票的部分数据（如前5行），可以使用 .head()
print(filtered_data1.head())
print(filtered_data1.head())

Empty DataFrame
Columns: [ts_code, trade_date, open, high, low, close, pre_close, change, pct_chg, vol, amount]
Index: []
Empty DataFrame
Columns: [ts_code, trade_date, open, high, low, close, pre_close, change, pct_chg, vol, amount]
Index: []


In [7]:
# 查看筛选后的数据的行数和列数
print(df.shape)

(10230025, 11)


In [9]:
# 获取筛选后的数据中唯一的股票代码数量
unique_stock_count = df['ts_code'].nunique()
print(f'唯一的股票数量: {unique_stock_count}')

唯一的股票数量: 4324


In [2]:
import pandas as pd

# 加载数据
#data_file = '/Users/sean/Jupyter_Projects/StockDataPrep/data/基础数据_预处理20241128.csv'
#data_file = '/Users/sean/Jupyter_Projects/StockDataPrep/data/merged_stocks_data_20241128.csv'
data_file2 = '/Users/sean/Jupyter_Projects/StockDataPrep/data/merged_stocks_data_qfq_20241204.csv'  # 前复权

dfs = pd.read_csv(data_file2)


# 如果只需要查看这两只股票的部分数据（如前5行），可以使用 .head()
print(dfs.head())

     ts_code  trade_date   open   high    low  close  pre_close  change  \
0  301237.SZ    20241204  25.36  25.36  24.50  24.70      25.30   -0.60   
1  301237.SZ    20241203  25.47  25.47  25.07  25.30      25.17    0.13   
2  301237.SZ    20241202  24.72  25.25  24.72  25.17      24.72    0.45   
3  301237.SZ    20241129  24.27  25.00  24.27  24.72      24.55    0.17   
4  301237.SZ    20241128  24.25  24.99  23.94  24.55      24.24    0.31   

   pct_chg     vol     amount  
0  -2.3715  5732.0  14304.951  
1   0.5165  5823.1  14703.321  
2   1.8204  8114.0  20323.521  
3   0.6925  5180.0  12772.430  
4   1.2789  6935.0  17077.050  


In [12]:
import pandas as pd

# 加载数据
#data_file = '/Users/sean/Jupyter_Projects/StockDataPrep/data/基础数据_预处理20241128.csv'
data_file = '/Users/sean/Jupyter_Projects/StockDataPrep/data/merged_stocks_data_20241128.csv'
dfs = pd.read_csv(data_file)

# 获取筛选后的数据中唯一的股票代码数量
sunique_stock_count = dfs['ts_code'].nunique()
print(f'唯一的股票数量: {sunique_stock_count}')



唯一的股票数量: 4375


In [1]:
import pymysql
import pandas as pd

def update_stock_data_to_mysql(data, db_config):
    """
    将股票数据更新到 MySQL 数据库。

    :param data: 包含股票数据的 DataFrame
    :param db_config: 数据库配置字典，包含 'host', 'user', 'password', 'database'
    """
    # 连接数据库
    connection = pymysql.connect(
        host=db_config['host'],
        user=db_config['user'],
        database=db_config['database']
    )
    
    try:
        # 创建数据库操作游标
        cursor = connection.cursor()

        # 获取数据库中已存在的最大日期（检查是否有重复数据）
        cursor.execute("SELECT MAX(trade_date) FROM merged_stock_data")  # 根据实际表名修改
        max_date = cursor.fetchone()[0]  # 取得最大日期

        if max_date:
            # 过滤出新数据
            new_data = data[data['trade_date'] > max_date]
        else:
            # 如果数据库为空，插入所有数据
            new_data = data

        if not new_data.empty:
            # 将数据插入数据库
            for _, row in new_data.iterrows():
                # 插入语句，字段名加反引号以避免与保留字冲突
                sql = """
                    INSERT INTO stock_data (`ts_code`, `trade_date`, `open`, `high`, `low`, `close`, `pre_close`, `change`, `pct_chg`, `vol`, `amount`)
                    VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
                    ON DUPLICATE KEY UPDATE
                        `open` = VALUES(`open`),
                        `high` = VALUES(`high`),
                        `low` = VALUES(`low`),
                        `close` = VALUES(`close`),
                        `pre_close` = VALUES(`pre_close`),
                        `change` = VALUES(`change`),
                        `pct_chg` = VALUES(`pct_chg`),
                        `vol` = VALUES(`vol`),
                        `amount` = VALUES(`amount`)
                """
                cursor.execute(sql, tuple(row))

            # 提交事务
            connection.commit()
            print(f"成功插入 {len(new_data)} 条新数据")
        else:
            print("没有新数据需要插入。")

    except Exception as e:
        print(f"数据库更新失败：{e}")
    finally:
        cursor.close()
        connection.close()


# 假设已经拉取的股票数据存储在 DataFrame `stock_data` 中
# update_stock_data_to_mysql(stock_data, db_config)

In [2]:
import os
import glob
import pandas as pd

# 数据库配置（在这里定义一次）
db_config = {
    'host': 'localhost',
    'user': 'root',
    'database': 'stocks'
}

def get_latest_csv_file(directory):
    """
    获取目录中最新的 CSV 文件。
    
    :param directory: 存放 CSV 文件的目录
    :return: 最新的 CSV 文件路径
    """
    # 搜索匹配的 CSV 文件
    file_pattern = os.path.join(directory, "merged_stocks_data_*.csv")
    files = glob.glob(file_pattern)
    
    if not files:
        raise FileNotFoundError("没有找到符合条件的文件")
    
    # 按文件名后缀的日期排序，取最新的文件
    latest_file = max(files, key=lambda x: os.path.basename(x).split('_')[-1].replace('.csv', ''))
    return latest_file

def update_stock_data_from_csv(csv_file_path, db_config):
    """
    读取 CSV 文件并更新数据库。
    
    :param csv_file_path: CSV 文件路径
    :param db_config: 数据库配置
    """
    # 读取 CSV 文件
    stock_data = pd.read_csv(csv_file_path)
    
    # 调用数据库更新函数
    update_stock_data_to_mysql(stock_data, db_config)

# 获取最新的 CSV 文件路径
directory = '/Users/sean/Jupyter_Projects/StockDataPrep/data'  
latest_csv_file = get_latest_csv_file(directory)

print(f"正在读取文件: {latest_csv_file}")

update_stock_data_from_csv(latest_csv_file, db_config)

正在读取文件: /Users/sean/Jupyter_Projects/StockDataPrep/data/merged_stocks_data_20241128.csv
数据库更新失败：(1146, "Table 'stocks.stock_data' doesn't exist")


In [1]:
import pandas as pd

# 文件路径
data_file1 = '/Users/sean/Jupyter_Projects/StockDataPrep/data/1merged_stocks_data_20241204.csv'  # 未复权
data_file2 = '/Users/sean/Jupyter_Projects/StockDataPrep/data/merged_stocks_data_qfq_20241204.csv'  # 前复权

# 加载数据
df1 = pd.read_csv(data_file1)
df2 = pd.read_csv(data_file2)

# 选择股票代码和年份
selected_stocks = ['300467.SZ']  # 替换为实际股票代码
year = '2019'  # 查询的年份

# 筛选数据：未复权
filtered_data1 = df1[(df1['ts_code'].isin(selected_stocks)) & (df1['trade_date'].astype(str).str[:4] == year)]
max_close_raw = filtered_data1.loc[filtered_data1['close'].idxmax()]  # 获取未复权的最高价记录

# 筛选数据：前复权
filtered_data2 = df2[(df2['ts_code'].isin(selected_stocks)) & (df2['trade_date'].astype(str).str[:4] == year)]
max_close_qfq = filtered_data2.loc[filtered_data2['close'].idxmax()]  # 获取前复权的最高价记录

# 输出结果
print("未复权最高价数据：")
print(max_close_raw[['ts_code', 'trade_date', 'close']])

print("\n前复权最高价数据：")
print(max_close_qfq[['ts_code', 'trade_date', 'close']])

未复权最高价数据：
ts_code       300467.SZ
trade_date     20190312
close             32.96
Name: 5575304, dtype: object

前复权最高价数据：
ts_code       300467.SZ
trade_date     20190312
close             32.96
Name: 5613905, dtype: object
