# 获取2010年1月1日开始的S&P 500股票价格数据

从2010年1月1日的S&P 500股票池开始，获取每个股票从2010年1月1日到现在的日线价格数据（close和adjusted close）。
如果股票中途被移除S&P 500，则只获取到它仍在S&P 500期间的数据。


In [None]:
import pandas as pd
import yfinance as yf
from datetime import datetime
from tqdm import tqdm
import os
import warnings
warnings.filterwarnings('ignore')

pd.options.mode.chained_assignment = None
pd.set_option('display.max_rows', 100)
pd.set_option('display.max_columns', 20)

# 设置起始日期
START_DATE = '2010-01-01'
END_DATE = datetime.now().strftime('%Y-%m-%d')  # 当前日期

print(f"起始日期: {START_DATE}")
print(f"结束日期: {END_DATE}")


In [None]:
# 读取历史成分股数据文件
def get_table(filename):
    """读取历史成分股数据文件"""
    if os.path.isfile(filename):
        df = pd.read_csv(filename, index_col='date')
        return df
    else:
        raise FileNotFoundError(f"文件 {filename} 不存在")

# 使用最新的历史数据文件
filename = 'S&P 500 Historical Components & Changes(11-09-2025).csv'
df = get_table(filename)
df.index = pd.to_datetime(df.index)
print(f"历史数据文件已加载，共 {len(df)} 行数据")
print(f"数据日期范围: {df.index.min()} 到 {df.index.max()}")
df.head()


In [None]:
# 获取2010年1月1日的S&P 500股票列表
# 找到最接近2010-01-01的日期（可能是之前最近的日期）
target_date = pd.to_datetime(START_DATE)
df_before_start = df[df.index <= target_date]

if len(df_before_start) == 0:
    raise ValueError(f"无法找到 {START_DATE} 之前的数据")

# 获取最接近目标日期的最后一行
last_row_before_start = df_before_start.tail(1)
# tickers列是逗号分隔的字符串，需要分割
tickers_str = last_row_before_start['tickers'].iloc[0]

# 检查tickers_str是否有效
if pd.isna(tickers_str) or not isinstance(tickers_str, str) or len(tickers_str.strip()) == 0:
    raise ValueError(f"在 {last_row_before_start.index[0]} 的tickers数据无效")

tickers_on_start_date = sorted([t.strip() for t in tickers_str.split(',') if t.strip()])  # 过滤空字符串

print(f"在 {last_row_before_start.index[0].strftime('%Y-%m-%d')} 的S&P 500股票数量: {len(tickers_on_start_date)}")
print(f"前10个股票代码: {tickers_on_start_date[:10]}")


In [None]:
# 将tickers列转换为列表格式，便于后续处理
def convert_tickers_to_list(x):
    """将tickers字符串转换为排序后的列表，处理NaN和空值"""
    if pd.isna(x) or not isinstance(x, str):
        return []
    return sorted([t.strip() for t in x.split(',') if t.strip()])  # 过滤空字符串

df['tickers'] = df['tickers'].apply(convert_tickers_to_list)

# 确定每个股票在S&P 500中的结束日期
# 对于2010-01-01的股票池，找出每个股票何时首次被移除（在2010-01-01之后）
ticker_end_dates = {}

# 优化：在循环外计算df_after_start，避免重复计算
df_after_start = df[df.index >= target_date].sort_index()

for ticker in tqdm(tickers_on_start_date, desc="确定股票移除日期"):
    
    # 查找该股票何时首次不在列表中（在2010-01-01之后）
    end_date = None
    was_in_sp500 = True  # 假设在起始日期时在S&P 500中
    
    for date, row in df_after_start.iterrows():
        is_in_current = ticker in row['tickers']
        
        # 如果之前在里面，但现在不在了，说明在这个日期被移除
        if was_in_sp500 and not is_in_current:
            end_date = date
            break
        
        was_in_sp500 = is_in_current
    
    # 如果找到了结束日期，记录它；否则表示股票仍在S&P 500中
    if end_date:
        ticker_end_dates[ticker] = end_date
    else:
        ticker_end_dates[ticker] = None  # None表示仍在S&P 500中

print(f"\n已确定 {len(ticker_end_dates)} 个股票的移除日期")
removed_count = sum(1 for v in ticker_end_dates.values() if v is not None)
still_in_count = sum(1 for v in ticker_end_dates.values() if v is None)
print(f"已被移除的股票: {removed_count} 个")
print(f"仍在S&P 500的股票: {still_in_count} 个")

# 显示一些被移除的股票示例
if removed_count > 0:
    removed_examples = [(k, v) for k, v in ticker_end_dates.items() if v is not None][:5]
    print(f"\n被移除股票示例（前5个）:")
    for ticker, end_date in removed_examples:
        print(f"  {ticker}: 移除日期 {end_date.strftime('%Y-%m-%d')}")


In [None]:
# 获取每个股票的价格数据
def get_stock_price(ticker, start_date, end_date):
    """
    获取股票价格数据
    
    参数:
    ticker: 股票代码
    start_date: 开始日期
    end_date: 结束日期（如果为None，则获取到当前；如果是datetime，转换为字符串）
    
    返回:
    DataFrame包含Close和Adj Close列，如果失败返回None
    """
    try:
        # 处理yfinance的特殊股票代码（如BRK.B需要转换为BRK-B）
        yf_ticker = ticker.replace('.', '-')
        
        # 确定实际的结束日期
        if end_date is None:
            actual_end = END_DATE
        elif isinstance(end_date, pd.Timestamp):
            # 如果是datetime，转换为字符串，并加1天以确保包含该日期
            actual_end = (end_date + pd.Timedelta(days=1)).strftime('%Y-%m-%d')
        else:
            actual_end = end_date
        
        # 下载数据
        stock = yf.Ticker(yf_ticker)
        hist = stock.history(start=start_date, end=actual_end)
        
        if hist.empty:
            return None
        
        # 检查必需的列是否存在
        required_cols = ['Close', 'Adj Close']
        missing_cols = [col for col in required_cols if col not in hist.columns]
        if missing_cols:
            # 如果缺少Adj Close，尝试使用Close代替
            if 'Adj Close' in missing_cols and 'Close' in hist.columns:
                hist['Adj Close'] = hist['Close']
            else:
                return None
        
        # 只保留Close和Adj Close列
        result = hist[['Close', 'Adj Close']].copy()
        result.columns = ['Close', 'Adj Close']
        result.index.name = 'Date'
        result.reset_index(inplace=True)
        
        # 如果指定了结束日期，过滤掉结束日期之后的数据
        if end_date is not None:
            if isinstance(end_date, pd.Timestamp):
                end_date_str = end_date.strftime('%Y-%m-%d')
            else:
                end_date_str = end_date
            result = result[result['Date'] <= end_date_str]
        
        return result
    
    except Exception as e:
        # 静默处理错误，不在批量下载时打印每个错误
        return None

# 测试单个股票
test_ticker = tickers_on_start_date[0]
test_end = ticker_end_dates[test_ticker]
print(f"测试股票: {test_ticker}")
print(f"结束日期: {test_end.strftime('%Y-%m-%d') if test_end else '仍在S&P 500'}")
test_data = get_stock_price(test_ticker, START_DATE, test_end)
if test_data is not None:
    print(f"成功获取数据，共 {len(test_data)} 行")
    print(test_data.head())
    print(f"\n数据日期范围: {test_data['Date'].min()} 到 {test_data['Date'].max()}")
else:
    print("获取数据失败")


In [None]:
# 批量获取所有股票的价格数据
# 注意：yfinance有速率限制，大量请求可能需要较长时间
import time

all_stock_data = {}
failed_tickers = []

print(f"开始获取 {len(tickers_on_start_date)} 个股票的价格数据...")
print("注意：这可能需要较长时间，请耐心等待...\n")

for ticker in tqdm(tickers_on_start_date, desc="下载股票数据"):
    end_date = ticker_end_dates[ticker]
    
    # 获取价格数据
    price_data = get_stock_price(ticker, START_DATE, end_date)
    
    if price_data is not None and not price_data.empty:
        # 添加股票代码列
        price_data['Ticker'] = ticker
        all_stock_data[ticker] = price_data
    else:
        failed_tickers.append(ticker)
    
    # 添加小延迟以避免请求过快（可选，如果遇到速率限制问题可以取消注释）
    # time.sleep(0.1)

print(f"\n成功获取 {len(all_stock_data)} 个股票的数据")
print(f"失败 {len(failed_tickers)} 个股票")

if failed_tickers:
    print(f"\n失败的股票代码（前20个）: {failed_tickers[:20]}")
    if len(failed_tickers) > 20:
        print(f"... 还有 {len(failed_tickers) - 20} 个失败的股票")


In [None]:
# 合并所有股票数据到一个DataFrame
if all_stock_data:
    # 合并所有数据
    combined_df = pd.concat(all_stock_data.values(), ignore_index=True)
    
    # 重新排列列的顺序：Ticker, Date, Close, Adj Close
    combined_df = combined_df[['Ticker', 'Date', 'Close', 'Adj Close']]
    
    # 按股票代码和日期排序
    combined_df = combined_df.sort_values(['Ticker', 'Date']).reset_index(drop=True)
    
    print(f"合并后的数据形状: {combined_df.shape}")
    print(f"数据日期范围: {combined_df['Date'].min()} 到 {combined_df['Date'].max()}")
    print(f"\n数据预览:")
    print(combined_df.head(10))
    
    # 显示每个股票的数据行数统计
    print(f"\n每个股票的数据行数统计:")
    ticker_counts = combined_df.groupby('Ticker').size()
    print(ticker_counts.describe())
else:
    print("没有成功获取任何数据")


In [None]:
# 保存数据到CSV文件
output_filename = 'sp500_prices_from_2010.csv'

if all_stock_data:
    # 检查combined_df是否已定义（需要先执行Cell 7）
    if 'combined_df' not in globals():
        # 如果未定义，在这里重新创建
        combined_df = pd.concat(all_stock_data.values(), ignore_index=True)
        combined_df = combined_df[['Ticker', 'Date', 'Close', 'Adj Close']]
        combined_df = combined_df.sort_values(['Ticker', 'Date']).reset_index(drop=True)
    
    combined_df.to_csv(output_filename, index=False)
    print(f"数据已保存到: {output_filename}")
    print(f"文件大小: {os.path.getsize(output_filename) / (1024*1024):.2f} MB")
else:
    print("没有数据可保存")


In [None]:
# 可选：将数据保存为按股票代码分组的格式（每个股票一个CSV文件）
# 或者保存为Excel格式，每个股票一个sheet

# 示例：查看特定股票的数据
if all_stock_data:
    sample_ticker = list(all_stock_data.keys())[0]
    print(f"\n示例股票 {sample_ticker} 的数据:")
    print(all_stock_data[sample_ticker].head(10))
    print(f"\n该股票共有 {len(all_stock_data[sample_ticker])} 行数据")


In [None]:
# 数据质量检查：检查是否有缺失值
if all_stock_data:
    # 检查combined_df是否已定义（需要先执行Cell 7）
    if 'combined_df' not in globals():
        # 如果未定义，在这里重新创建
        combined_df = pd.concat(all_stock_data.values(), ignore_index=True)
        combined_df = combined_df[['Ticker', 'Date', 'Close', 'Adj Close']]
        combined_df = combined_df.sort_values(['Ticker', 'Date']).reset_index(drop=True)
    
    print("数据质量检查:")
    print(f"总行数: {len(combined_df)}")
    print(f"缺失值统计:")
    print(combined_df.isnull().sum())
    
    # 检查每个股票的数据完整性
    print(f"\n每个股票的数据完整性:")
    completeness = combined_df.groupby('Ticker').apply(
        lambda x: (x[['Close', 'Adj Close']].isnull().sum().sum() == 0)
    )
    incomplete_tickers = completeness[~completeness].index.tolist()
    if incomplete_tickers:
        print(f"有缺失数据的股票 ({len(incomplete_tickers)} 个): {incomplete_tickers[:10]}")
    else:
        print("所有股票的数据都是完整的")
else:
    print("没有数据可检查")
