In [2]:
import numpy as np
import pandas as pd
import yfinance as yf
import matplotlib.pyplot as plt
from sklearn.preprocessing import MinMaxScaler
from sklearn.metrics import mean_squared_error, mean_absolute_error
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
import warnings
warnings.filterwarnings('ignore')

# 设置随机种子保证可复现性
torch.manual_seed(42)
np.random.seed(42)

In [5]:
# 1. 数据获取和准备
def fetch_stock_data(ticker='AAPL', start_date='2020-01-01', end_date='2024-01-01'):
    """获取股票数据"""
    stock = yf.download(ticker, start=start_date, end=end_date)
    return stock

# 获取苹果公司股票数据
df = fetch_stock_data('AAPL', '2019-01-01', '2020-01-01')
print(f"数据形状: {df.shape}")
print(f"数据列名: {df.columns.tolist()}")

# 选择特征：使用OHLCV数据
features = ['Open', 'High', 'Low', 'Close', 'Volume']
df = df[features]


[*********************100%***********************]  1 of 1 completed

1 Failed download:
['AAPL']: YFRateLimitError('Too Many Requests. Rate limited. Try after a while.')


数据形状: (0, 6)
数据列名: [('Adj Close', 'AAPL'), ('Close', 'AAPL'), ('High', 'AAPL'), ('Low', 'AAPL'), ('Open', 'AAPL'), ('Volume', 'AAPL')]


In [None]:
# 2. 数据预处理
def prepare_data(df, seq_length=10):
    """准备时序数据"""
    # 数据归一化
    scaler = MinMaxScaler(feature_range=(0, 1))
    scaled_data = scaler.fit_transform(df)
    
    # 创建时序数据
    X, y = [], []
    for i in range(len(scaled_data) - seq_length):
        X.append(scaled_data[i:i+seq_length])  # 过去seq_length天的数据
        y.append(scaled_data[i+seq_length, 3])  # 预测第seq_length+1天的收盘价（索引3）
    
    X = np.array(X)
    y = np.array(y)
    
    return X, y, scaler

seq_length = 10
X, y, scaler = prepare_data(df, seq_length)

# 划分训练集和测试集（80%训练，20%测试）
split_idx = int(len(X) * 0.8)
X_train, X_test = X[:split_idx], X[split_idx:]
y_train, y_test = y[:split_idx], y[split_idx:]

print(f"训练集大小: {X_train.shape}, 测试集大小: {X_test.shape}")

# 转换为PyTorch张量
X_train_tensor = torch.FloatTensor(X_train)
y_train_tensor = torch.FloatTensor(y_train).view(-1, 1)
X_test_tensor = torch.FloatTensor(X_test)
y_test_tensor = torch.FloatTensor(y_test).view(-1, 1)

# 创建数据加载器
batch_size = 32
train_dataset = TensorDataset(X_train_tensor, y_train_tensor)
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)

In [None]:
# 3. 定义BPNN模型
class BPNN(nn.Module):
    def __init__(self, input_size, hidden_size=50, output_size=1):
        super(BPNN, self).__init__()
        self.flatten = nn.Flatten()
        self.fc1 = nn.Linear(input_size, hidden_size)
        self.relu = nn.ReLU()
        self.dropout = nn.Dropout(0.2)
        self.fc2 = nn.Linear(hidden_size, hidden_size//2)
        self.fc3 = nn.Linear(hidden_size//2, output_size)
    
    def forward(self, x):
        x = self.flatten(x)  # 将时序数据展平
        x = self.relu(self.fc1(x))
        x = self.dropout(x)
        x = self.relu(self.fc2(x))
        x = self.fc3(x)
        return x

In [None]:
# 4. 定义LSTM模型
class LSTMModel(nn.Module):
    def __init__(self, input_size, hidden_size=50, num_layers=2, output_size=1, dropout=0.2):
        super(LSTMModel, self).__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        
        self.lstm = nn.LSTM(
            input_size=input_size,
            hidden_size=hidden_size,
            num_layers=num_layers,
            batch_first=True,
            dropout=dropout if num_layers > 1 else 0
        )
        self.dropout = nn.Dropout(dropout)
        self.fc = nn.Linear(hidden_size, output_size)
    
    def forward(self, x):
        # 初始化隐藏状态
        h0 = torch.zeros(self.num_layers, x.size(0), self.hidden_size).to(x.device)
        c0 = torch.zeros(self.num_layers, x.size(0), self.hidden_size).to(x.device)
        
        # LSTM前向传播
        lstm_out, _ = self.lstm(x, (h0, c0))
        
        # 只取最后一个时间步的输出
        lstm_out = lstm_out[:, -1, :]
        
        # 全连接层
        out = self.dropout(lstm_out)
        out = self.fc(out)
        return out

In [None]:
# 5. 训练函数
def train_model(model, train_loader, epochs=100, learning_rate=0.001, model_name="Model"):
    """训练模型"""
    criterion = nn.MSELoss()
    optimizer = optim.Adam(model.parameters(), lr=learning_rate)
    
    train_losses = []
    model.train()
    
    print(f"\n开始训练 {model_name}...")
    for epoch in range(epochs):
        epoch_loss = 0
        for batch_X, batch_y in train_loader:
            optimizer.zero_grad()
            predictions = model(batch_X)
            loss = criterion(predictions, batch_y)
            loss.backward()
            optimizer.step()
            epoch_loss += loss.item()
        
        avg_loss = epoch_loss / len(train_loader)
        train_losses.append(avg_loss)
        
        if (epoch + 1) % 20 == 0:
            print(f"Epoch [{epoch+1}/{epochs}], Loss: {avg_loss:.6f}")
    
    return train_losses

In [None]:
# 6. 评估函数
def evaluate_model(model, X_test, y_test, scaler, model_name="Model"):
    """评估模型性能"""
    model.eval()
    with torch.no_grad():
        predictions = model(X_test)
    
    # 将预测值和真实值反归一化
    # 创建一个临时数组来存放反归一化的数据
    temp_array = np.zeros((len(predictions), len(features)))
    temp_array[:, 3] = predictions.numpy().flatten()  # 将预测的收盘价放在第4列
    
    predictions_actual = scaler.inverse_transform(temp_array)[:, 3]
    
    temp_array[:, 3] = y_test.numpy().flatten()
    y_test_actual = scaler.inverse_transform(temp_array)[:, 3]
    
    # 计算评估指标
    mse = mean_squared_error(y_test_actual, predictions_actual)
    mae = mean_absolute_error(y_test_actual, predictions_actual)
    rmse = np.sqrt(mse)
    
    # 计算R²分数
    ss_res = np.sum((y_test_actual - predictions_actual) ** 2)
    ss_tot = np.sum((y_test_actual - np.mean(y_test_actual)) ** 2)
    r2 = 1 - (ss_res / ss_tot)
    
    print(f"\n{model_name} 评估结果:")
    print(f"MSE: {mse:.4f}")
    print(f"RMSE: {rmse:.4f}")
    print(f"MAE: {mae:.4f}")
    print(f"R² Score: {r2:.4f}")
    
    return predictions_actual, y_test_actual, {'mse': mse, 'rmse': rmse, 'mae': mae, 'r2': r2}

In [None]:
# 7. 初始化并训练模型
# BPNN模型（输入维度 = seq_length * 特征数量）
bpnn_input_size = seq_length * len(features)
bpnn_model = BPNN(input_size=bpnn_input_size, hidden_size=64, output_size=1)
bpnn_losses = train_model(bpnn_model, train_loader, epochs=100, learning_rate=0.001, model_name="BPNN")

# LSTM模型（输入维度 = 特征数量）
lstm_model = LSTMModel(input_size=len(features), hidden_size=50, num_layers=2, output_size=1)
lstm_losses = train_model(lstm_model, train_loader, epochs=100, learning_rate=0.001, model_name="LSTM")


In [None]:
# 8. 评估两个模型
bpnn_pred, bpnn_true, bpnn_metrics = evaluate_model(bpnn_model, X_test_tensor, y_test_tensor, scaler, "BPNN")
lstm_pred, lstm_true, lstm_metrics = evaluate_model(lstm_model, X_test_tensor, y_test_tensor, scaler, "LSTM")

In [None]:
# 9. 可视化结果
plt.figure(figsize=(15, 10))

# 训练损失对比
plt.subplot(2, 2, 1)
plt.plot(bpnn_losses, label='BPNN Loss', alpha=0.7)
plt.plot(lstm_losses, label='LSTM Loss', alpha=0.7)
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.title('训练损失对比')
plt.legend()
plt.grid(True, alpha=0.3)

# 预测结果对比 - BPNN
plt.subplot(2, 2, 2)
plt.plot(bpnn_true, label='Actual Price', alpha=0.7)
plt.plot(bpnn_pred, label='BPNN Prediction', alpha=0.7)
plt.xlabel('Time Step')
plt.ylabel('Price (USD)')
plt.title(f'BPNN预测 vs 实际股价 (R²={bpnn_metrics["r2"]:.3f})')
plt.legend()
plt.grid(True, alpha=0.3)

# 预测结果对比 - LSTM
plt.subplot(2, 2, 3)
plt.plot(lstm_true, label='Actual Price', alpha=0.7)
plt.plot(lstm_pred, label='LSTM Prediction', alpha=0.7)
plt.xlabel('Time Step')
plt.ylabel('Price (USD)')
plt.title(f'LSTM预测 vs 实际股价 (R²={lstm_metrics["r2"]:.3f})')
plt.legend()
plt.grid(True, alpha=0.3)

# 模型性能指标对比
plt.subplot(2, 2, 4)
metrics = ['RMSE', 'MAE', 'R²']
bpnn_values = [bpnn_metrics['rmse'], bpnn_metrics['mae'], bpnn_metrics['r2']]
lstm_values = [lstm_metrics['rmse'], lstm_metrics['mae'], lstm_metrics['r2']]

x = np.arange(len(metrics))
width = 0.35

plt.bar(x - width/2, bpnn_values, width, label='BPNN', alpha=0.8)
plt.bar(x + width/2, lstm_values, width, label='LSTM', alpha=0.8)
plt.xlabel('Metrics')
plt.ylabel('Value')
plt.title('模型性能指标对比')
plt.xticks(x, metrics)
plt.legend()
plt.grid(True, alpha=0.3, axis='y')

plt.tight_layout()
plt.show()

In [None]:
# 10. 进行未来预测
def predict_future_price(model, last_sequence, scaler, model_type='lstm'):
    """预测未来一天的价格"""
    model.eval()
    with torch.no_grad():
        if model_type == 'bpnn':
            # 对BPNN需要展平输入
            prediction = model(last_sequence)
        else:
            # LSTM保持原始形状
            prediction = model(last_sequence)
    
    # 反归一化预测结果
    temp_array = np.zeros((1, len(features)))
    temp_array[0, 3] = prediction.item()
    
    predicted_price = scaler.inverse_transform(temp_array)[0, 3]
    return predicted_price

# 获取最后一段序列进行预测
last_sequence = X_test_tensor[-1:].unsqueeze(0) if X_test_tensor[-1:].dim() == 2 else X_test_tensor[-1:]

# BPNN预测需要展平
bpnn_last_seq = X_test_tensor[-1:].view(1, -1)

# 进行预测
bpnn_future_price = predict_future_price(bpnn_model, bpnn_last_seq, scaler, 'bpnn')
lstm_future_price = predict_future_price(lstm_model, last_sequence, scaler, 'lstm')
actual_future_price = scaler.inverse_transform(
    np.zeros((1, len(features)))
).copy()
actual_future_price[0, 3] = y_test[-1]
actual_future_price = actual_future_price[0, 3]

print("\n" + "="*50)
print("未来一天股价预测结果:")
print("="*50)
print(f"实际最后一天股价: ${actual_future_price:.2f}")
print(f"BPNN预测未来股价: ${bpnn_future_price:.2f}")
print(f"LSTM预测未来股价: ${lstm_future_price:.2f}")
print(f"BPNN预测误差: ${abs(bpnn_future_price - actual_future_price):.2f}")
print(f"LSTM预测误差: ${abs(lstm_future_price - actual_future_price):.2f}")


In [None]:
# 11. 模型架构可视化
print("\n" + "="*50)
print("模型架构总结:")
print("="*50)
print("BPNN架构:")
print(bpnn_model)
print(f"\n总参数数量: {sum(p.numel() for p in bpnn_model.parameters()):,}")

print("\nLSTM架构:")
print(lstm_model)
print(f"\n总参数数量: {sum(p.numel() for p in lstm_model.parameters()):,}")