环境配置

In [None]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from statsmodels.tsa.ar_model import AutoReg
from arch import arch_model
import torch
import torch.nn as nn
from torch.utils.data import DataLoader, TensorDataset
from sklearn.preprocessing import MinMaxScaler
from sklearn.metrics import mean_squared_error
from sklearn.model_selection import ParameterGrid
from statsmodels.graphics.tsaplots import plot_acf, plot_pacf
import time
# Plot configuration: select the SimHei font for the Chinese labels used in
# the figures, and turn off the Unicode minus glyph for negative tick labels.
plt.rcParams.update({
    'font.sans-serif': ['SimHei'],
    'axes.unicode_minus': False,
})

数据预处理

In [None]:
# Load the price history; the date column is parsed and becomes the index.
data = pd.read_csv('AAPL历史数据.csv', parse_dates=['日期'])
data.set_index('日期', inplace=True)

# The AR/GARCH fits, the sliding windows and the date alignment further down
# all treat row order as time order, so enforce oldest-first explicitly.
# Only sort when the dates were really parsed: sorting unparsed date strings
# lexicographically could scramble an otherwise correct order.
# NOTE(review): this is a no-op if the export is already oldest-first — confirm
# the ordering of the CSV.
if isinstance(data.index, pd.DatetimeIndex) and not data.index.is_monotonic_increasing:
    data.sort_index(inplace=True)

# Closing prices as an (n, 1) column, the 2-D shape the scaler expects.
close_prices = data[['收盘']].values

# Min-max scale the closes into [-1, 1].
scaler = MinMaxScaler(feature_range=(-1, 1))
scaled_data = scaler.fit_transform(close_prices)

生成合成数据

In [None]:
# Number of out-of-sample steps produced by the AR and GARCH baselines.
forecast_horizon = 100

# Autoregressive (AR) model with 5 lags, fitted on the scaled closes.
ar_model = AutoReg(scaled_data, lags=5).fit()
# `end` is inclusive, so stopping at len + horizon - 1 yields exactly
# `forecast_horizon` values (the previous `+100` produced 101).
ar_forecast = ar_model.predict(
    start=len(scaled_data),
    end=len(scaled_data) + forecast_horizon - 1,
    dynamic=False,
)

# GARCH(1, 1) model.
# The series is multiplied by `scale_factor` before fitting, presumably to
# keep the optimiser well-conditioned on the small [-1, 1] values.
scale_factor = 10
scaled_data_for_garch = scaled_data * scale_factor

garch_model = arch_model(scaled_data_for_garch, vol='GARCH', p=1, q=1)
garch_results = garch_model.fit(disp='off')
garch_forecast_scaled = garch_results.forecast(horizon=forecast_horizon)
# Variance scales with the square of the data scale, hence the division by
# scale_factor ** 2 to return to the units of `scaled_data`.
garch_forecast = garch_forecast_scaled.variance.iloc[-1].values / (scale_factor ** 2)

# 分数布朗运动（fBm）
def fbm(n, H):
    """Sample one fractional Brownian motion path on a grid of ``n`` steps.

    Increments are drawn as fractional Gaussian noise with the exact
    autocovariance ``0.5 * (|k+1|^2H - 2|k|^2H + |k-1|^2H)`` (via a Cholesky
    factor of the covariance matrix), accumulated, and scaled by
    ``(1 / n) ** H`` so the path lives on the unit time interval.

    For ``H == 0.5`` the covariance is the identity and the result is the
    plain scaled random walk the previous implementation returned for every
    ``H``.

    Args:
        n: Number of points in the path (>= 1).
        H: Hurst exponent, strictly between 0 and 1.

    Returns:
        1-D ``numpy.ndarray`` of length ``n``.

    Raises:
        ValueError: If ``n < 1`` or ``H`` is outside (0, 1).
    """
    if n < 1:
        raise ValueError("n must be at least 1")
    if not 0.0 < H < 1.0:
        raise ValueError("H must lie strictly between 0 and 1")

    lags = np.arange(n, dtype=float)
    two_h = 2.0 * H
    # Autocovariance of unit-variance fractional Gaussian noise at each lag.
    autocov = 0.5 * ((lags + 1.0) ** two_h - 2.0 * lags ** two_h + np.abs(lags - 1.0) ** two_h)
    # Toeplitz covariance matrix: entry (i, j) depends only on |i - j|.
    idx = np.arange(n)
    cov = autocov[np.abs(idx[:, None] - idx[None, :])]

    # Colour i.i.d. normals with the Cholesky factor to get correlated increments.
    white = np.random.normal(size=n)
    increments = np.linalg.cholesky(cov) @ white
    return np.cumsum(increments) * (1.0 / n) ** H

fbm_data = fbm(100, 0.7)

# Print the raw results
print("AR模型预测结果:", ar_forecast)
print("GARCH模型预测结果:", garch_forecast)
print("分数布朗运动数据:", fbm_data)

# Visual analysis

# AR forecast
plt.figure(figsize=(10, 6))
plt.plot(ar_forecast, label='AR模型预测结果', color='blue', marker='o', linestyle='-', linewidth=1)
plt.title('')
plt.xlabel('时间步长')
plt.ylabel('预测值')
plt.legend()
plt.grid(True)
plt.savefig('AR模型预测结果.png', dpi=300)
plt.close()

# GARCH forecast
# `garch_forecast` holds forecast *variances*; the figure is labelled as
# volatility, so plot the square root to match the label and axis title.
garch_volatility = np.sqrt(garch_forecast)
plt.figure(figsize=(10, 6))
plt.plot(garch_volatility, label='GARCH模型预测波动率', color='green', marker='x', linestyle='--', linewidth=1)
plt.title('')
plt.xlabel('时间步长')
plt.ylabel('波动率')
plt.legend()
plt.grid(True)
plt.savefig('GARCH模型预测波动率.png', dpi=300)
plt.close()

# Fractional Brownian motion: draw 5 independent paths for illustration
plt.figure(figsize=(10, 6))
for path_no in range(1, 6):
    fbm_path = fbm(100, 0.7)
    plt.plot(fbm_path, label=f'fBm路径 {path_no}', alpha=0.7)
plt.title('')
plt.xlabel('时间步长')
plt.ylabel('值')
plt.legend()
plt.grid(True)
plt.savefig('分数布朗运动路径图.png', dpi=300)
plt.close()

SBTS模型的实现与训练

In [None]:
import numpy as np
import torch
import torch.nn as nn
from torch.utils.data import DataLoader, TensorDataset
from sklearn.preprocessing import MinMaxScaler
import pandas as pd
import time
import matplotlib.pyplot as plt

# Relies on `scaled_data` (scaled closes) and `data` (the raw DataFrame)
# defined in the preprocessing cell.

# Build sliding-window samples: each input is `sequence_length` consecutive
# values and its target is the value immediately after the window.
sequence_length = 60
n_windows = len(scaled_data) - sequence_length
if n_windows <= 0:
    raise ValueError(
        f"Need more than {sequence_length} observations to build windows, "
        f"got {len(scaled_data)}"
    )
X = np.array([scaled_data[start:start + sequence_length] for start in range(n_windows)])
y = np.array([scaled_data[start + sequence_length] for start in range(n_windows)])

X_tensor = torch.tensor(X, dtype=torch.float32)
y_tensor = torch.tensor(y, dtype=torch.float32)
dataset = TensorDataset(X_tensor, y_tensor)

# Train / validation split.
# Consecutive windows overlap in all but one value, so a random split would
# put near-duplicates of every validation sample into the training set.
# Split chronologically instead: the first 80% of windows train, the rest validate.
train_ratio = 0.8
train_size = int(len(dataset) * train_ratio)
train_dataset = torch.utils.data.Subset(dataset, range(train_size))
val_dataset = torch.utils.data.Subset(dataset, range(train_size, len(dataset)))
train_dataloader = DataLoader(train_dataset, batch_size=128, shuffle=True)
val_dataloader = DataLoader(val_dataset, batch_size=128, shuffle=False)

# Fixed hyper-parameters for the SBTS model
best_params = {
    'hidden_size': 512,
    'learning_rate': 0.0005,
    'num_layers': 4,
    'dropout_rate': 0.2
}

# 定义模型
class EnhancedSBTS(nn.Module):
    """Bidirectional LSTM, then self-attention, then an MLP head.

    The head reads the attended features of the final time step and ends in
    ``Tanh``, so every output value lies in [-1, 1].

    Args:
        input_size: Features per time step; also the width of the head output.
        hidden_size: LSTM hidden width per direction.
        num_layers: Number of stacked LSTM layers.
        dropout_rate: Dropout probability between the head's linear layers.
    """

    def __init__(self, input_size, hidden_size, num_layers, dropout_rate):
        super().__init__()
        # Forward and backward LSTM states are concatenated.
        feature_dim = hidden_size * 2

        self.lstm = nn.LSTM(
            input_size=input_size,
            hidden_size=hidden_size,
            num_layers=num_layers,
            batch_first=True,
            bidirectional=True,
        )
        self.self_attn = nn.MultiheadAttention(embed_dim=feature_dim, num_heads=8)

        # Three Linear -> ReLU -> Dropout stages narrowing to 64 units,
        # followed by the Tanh-bounded projection back to `input_size`.
        widths = (feature_dim, 256, 128, 64)
        head_layers = []
        for fan_in, fan_out in zip(widths[:-1], widths[1:]):
            head_layers.extend([nn.Linear(fan_in, fan_out), nn.ReLU(), nn.Dropout(dropout_rate)])
        head_layers.extend([nn.Linear(widths[-1], input_size), nn.Tanh()])
        self.fc = nn.Sequential(*head_layers)

    def forward(self, x):
        """Predict from a batch-first sequence ``x`` of shape (batch, seq, input_size).

        Returns the head output for the last time step with a trailing
        singleton dimension squeezed away (shape ``(batch,)`` when
        ``input_size == 1``).
        """
        lstm_out, _ = self.lstm(x)
        # The attention layer is sequence-first, so swap batch and time axes.
        seq_first = lstm_out.transpose(0, 1)
        attended, _ = self.self_attn(seq_first, seq_first, seq_first)
        batch_first = attended.transpose(0, 1)
        last_step = batch_first[:, -1, :]
        return self.fc(last_step).squeeze(-1)

# Build the model from the fixed hyper-parameters in `best_params`.
model = EnhancedSBTS(input_size=1, hidden_size=best_params['hidden_size'], num_layers=best_params['num_layers'], dropout_rate=best_params['dropout_rate'])
# Loss, optimiser and LR scheduler; in this cell they are referenced only by
# the commented-out training loop below.
criterion = nn.MSELoss()
optimizer = torch.optim.AdamW(model.parameters(), lr=best_params['learning_rate'], weight_decay=1e-5)
scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer, 'min', patience=3)

# The training loop is commented out; only the training-time variable is kept.
# NOTE(review): with this loop disabled, `model` is used below exactly as
# constructed above, i.e. with its initial weights — confirm that trained
# weights are loaded some other way before relying on the generated series.
# train_dataloader = DataLoader(dataset, batch_size=128, shuffle=True)
# 
# model.train()
# start_time = time.time()
# for epoch in range(50):  # 50 training epochs
#     model.train()
#     total_loss = 0
#     for batch_idx, (X_batch, y_batch) in enumerate(train_dataloader):
#         optimizer.zero_grad()
#         outputs = model(X_batch)
#         loss = criterion(outputs, y_batch.squeeze(-1))
#         loss.backward()
#         nn.utils.clip_grad_norm_(model.parameters(), 0.5)
#         optimizer.step()
#         total_loss += loss.item()
#         progress = (batch_idx + 1) / len(train_dataloader) * 100
#         print(f'Epoch {epoch+1}/50, Batch {batch_idx+1}/{len(train_dataloader)}, Progress: {progress:.2f}%, Loss: {loss.item():.6f}', end='\r')
#     avg_loss = total_loss / len(train_dataloader)
#     scheduler.step(avg_loss)
#     print(f'\nEpoch {epoch+1}, Average Loss: {avg_loss:.6f}')
# train_time_sbts = time.time() - start_time

# Timing variables; the training time stays 0 because training is disabled.
gen_time_sbts = 0
train_time_sbts = 0

# Generate the synthetic series autoregressively.
model.eval()
start_time = time.time()  # start the generation timer
with torch.no_grad():
    generated = []
    # Seed the rollout with the first window of the dataset (batch of one).
    seed_sequence = X_tensor[0:1]
    
    # Despite its name, this helper only draws one Gaussian sample centred on
    # `data` with standard deviation `h`; no density is estimated.
    def kernel_density_estimation(data, h=0.05):
        return torch.normal(mean=data, std=h)
    
    # Produce one value per window in X_tensor.
    for _ in range(len(X_tensor)):
        # One-step-ahead prediction from the current window
        pred = model(seed_sequence)
        
        # Perturb the prediction with Gaussian noise; this noisy value is
        # what gets stored as the generated sample.
        enhanced_pred = kernel_density_estimation(pred, h=0.05)
        generated.append(enhanced_pred.numpy()[0])
        
        # Slide the window: drop the oldest step and append the prediction.
        # Note that the noise-free `pred`, not `enhanced_pred`, is fed back.
        pred_reshaped = pred.unsqueeze(1).unsqueeze(2)
        new_seq = torch.cat([seed_sequence[:, 1:, :], pred_reshaped], dim=1)
        seed_sequence = new_seq
    
    generated_data_sbts = np.array(generated)
gen_time_sbts = time.time() - start_time  # stop the timer and record generation time

# Map the generated values back to price units with the fitted scaler.
generated_data_sbts = scaler.inverse_transform(generated_data_sbts.reshape(-1, 1))

# Align the generated values with dates: the i-th sample is indexed by the
# date at position sequence_length + i of the original frame.
synth_df = pd.DataFrame(
    generated_data_sbts,
    index=data.index[sequence_length:sequence_length + len(generated_data_sbts)],
    columns=['收盘']  # same column name as the close column of `data`
)

# Slice the real data to the same positional range so both have equal length.
real_data = data.iloc[sequence_length:sequence_length + len(synth_df)]

# Report both lengths as a sanity check.
print(f"生成数据长度: {len(synth_df)}")
print(f"原始数据长度: {len(real_data)}")

# Plot real against synthetic closes.
plt.figure(figsize=(12, 6))
plt.plot(real_data.index, real_data['收盘'], label='真实数据', color='steelblue')
plt.plot(synth_df.index, synth_df['收盘'], label='合成数据', color='darkorange', alpha=0.7)
plt.xlabel('日期')
plt.ylabel('收盘价')
plt.title('真实数据与合成数据对比')
plt.legend()
plt.show()

# Report timings; the generated series stays available as generated_data_sbts.
print(f"训练时间: {train_time_sbts} 秒")
print(f"生成时间: {gen_time_sbts} 秒")
print(f"生成的数据已存储到 generated_data_sbts")

TimeGAN模型的实现与训练

In [None]:
class Generator(nn.Module):
    """Single-layer LSTM generator that emits one output vector per time step.

    Args:
        input_size: Features per input time step.
        hidden_size: LSTM hidden width.
        output_size: Features per output time step.
    """

    def __init__(self, input_size, hidden_size, output_size):
        super().__init__()
        self.lstm = nn.LSTM(input_size, hidden_size, batch_first=True)
        self.fc = nn.Linear(hidden_size, output_size)
        self.sigmoid = nn.Sigmoid()  # squashes every output into (0, 1)

    def forward(self, x):
        """Map ``x`` of shape (batch, seq, input_size) to (batch, seq, output_size).

        The LSTM starts from its default all-zero hidden and cell state. This
        matches the explicit zero tensors used before, without reading the
        width from a module-level ``hidden_size`` global that need not match
        the width this instance was built with.
        """
        out, _ = self.lstm(x)
        out = self.fc(out)
        return self.sigmoid(out)
        
class Discriminator(nn.Module):
    """Single-layer LSTM classifier that scores a whole sequence.

    Args:
        input_size: Features per input time step.
        hidden_size: LSTM hidden width.
        output_size: Number of scores produced per sequence.
    """

    def __init__(self, input_size, hidden_size, output_size):
        super().__init__()
        self.lstm = nn.LSTM(input_size, hidden_size, batch_first=True)
        self.fc = nn.Linear(hidden_size, output_size)
        self.sigmoid = nn.Sigmoid()  # turns the logit into a probability-like score

    def forward(self, x):
        """Score ``x`` of shape (batch, seq, input_size); returns (batch, output_size).

        Only the LSTM output at the final time step feeds the classifier. The
        LSTM starts from its default all-zero state, so the layer no longer
        depends on a module-level ``hidden_size`` global.
        """
        out, _ = self.lstm(x)
        last_step = out[:, -1, :]
        return self.sigmoid(self.fc(last_step))

# Hyper-parameters
input_size = 1
hidden_size = 64
output_size = 1
learning_rate = 0.001
num_epochs = 100

# Models, optimisers and the binary cross-entropy GAN loss
generator = Generator(input_size, hidden_size, output_size)
discriminator = Discriminator(input_size, hidden_size, 1)
optimizer_g = torch.optim.Adam(generator.parameters(), lr=learning_rate)
optimizer_d = torch.optim.Adam(discriminator.parameters(), lr=learning_rate)
criterion = nn.BCELoss()

# NOTE(review): the generator ends in a sigmoid (outputs in (0, 1)) while the
# training windows were scaled to [-1, 1]; the generator therefore cannot
# reproduce the negative half of the data range — confirm this is intended.

# Adversarial training
start_time = time.time()
for epoch in range(num_epochs):
    # Iterate the training loader built in the SBTS cell (the name
    # `dataloader` used here before was never defined). Targets are unused.
    for X_batch, _ in train_dataloader:
        # --- Discriminator step: real windows -> 1, generated windows -> 0 ---
        optimizer_d.zero_grad()
        real_outputs = discriminator(X_batch)
        real_labels = torch.ones_like(real_outputs)
        fake_data = generator(X_batch)
        # detach() keeps the discriminator loss from back-propagating into the generator
        fake_outputs = discriminator(fake_data.detach())
        fake_labels = torch.zeros_like(fake_outputs)
        d_loss = criterion(real_outputs, real_labels) + criterion(fake_outputs, fake_labels)
        d_loss.backward()
        optimizer_d.step()

        # --- Generator step: try to make the updated discriminator output 1 ---
        # The generator has not changed since `fake_data` was produced, so the
        # same (non-detached) tensor is reused instead of a second forward pass.
        optimizer_g.zero_grad()
        g_outputs = discriminator(fake_data)
        g_loss = criterion(g_outputs, real_labels)
        g_loss.backward()
        optimizer_g.step()
train_time_timegan = time.time() - start_time  # total training time in seconds

# Generate synthetic sequences by passing every real window through the generator
start_time = time.time()
with torch.no_grad():
    generated_data_timegan = generator(X_tensor)
gen_time_timegan = time.time() - start_time  # generation time in seconds

Diffusion-SB模型的实现与训练

In [None]:
class DiffusionSB(nn.Module):
    """LSTM noise predictor with a linear beta schedule of ``T`` steps.

    The schedule tensors ``beta``, ``alpha`` (= 1 - beta) and ``alpha_bar``
    (cumulative product of alpha) are exposed as attributes. They are
    registered as non-persistent buffers so they follow the module across
    devices without being written to checkpoints.

    Args:
        input_size: Features per time step of the noisy input.
        hidden_size: LSTM hidden width.
        output_size: Features per time step of the predicted noise.
        T: Number of diffusion steps.
    """

    def __init__(self, input_size, hidden_size, output_size, T=100):
        super().__init__()
        self.T = T
        beta = torch.linspace(0.0001, 0.02, T)
        alpha = 1 - beta
        self.register_buffer('beta', beta, persistent=False)
        self.register_buffer('alpha', alpha, persistent=False)
        self.register_buffer('alpha_bar', torch.cumprod(alpha, dim=0), persistent=False)
        # One extra input channel carries the normalised diffusion step.
        self.lstm = nn.LSTM(input_size + 1, hidden_size, batch_first=True)
        self.fc = nn.Linear(hidden_size, output_size)

    def forward(self, x, t):
        """Predict the noise contained in ``x`` at diffusion step ``t``.

        Args:
            x: Noisy sequences, shape (batch, seq, input_size).
            t: Diffusion step per sample, shape (batch,), or a single step
                shared by the whole batch.

        Returns:
            Tensor of shape (batch, seq, output_size).
        """
        # Previously `t` was accepted but ignored, so the network could not
        # tell how much noise to remove. Feed it in as an extra feature,
        # scaled to [0, 1) and repeated along the time axis.
        step = torch.as_tensor(t, device=x.device).to(x.dtype).reshape(-1, 1, 1) / self.T
        step = step.expand(x.size(0), x.size(1), 1)
        # The LSTM starts from its default zero state, so no module-level
        # `hidden_size` global is needed.
        out, _ = self.lstm(torch.cat([x, step], dim=-1))
        return self.fc(out)

# Hyper-parameters
input_size = 1
hidden_size = 64
output_size = 1
learning_rate = 0.001
num_epochs = 100
T = 100

# Model, optimiser and noise-prediction loss (built once, not per batch)
model = DiffusionSB(input_size, hidden_size, output_size, T)
optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)
noise_criterion = nn.MSELoss()

# Training: noise each window to a random step and learn to predict that noise
start_time = time.time()
for epoch in range(num_epochs):
    # Iterate the training loader built in the SBTS cell (the name
    # `dataloader` used here before was never defined). Targets are unused.
    for X_batch, _ in train_dataloader:
        optimizer.zero_grad()
        t = torch.randint(0, T, (X_batch.size(0),), device=X_batch.device)
        noise = torch.randn_like(X_batch)
        # Broadcast the per-sample schedule value over (seq, feature)
        alpha_bar_t = model.alpha_bar[t].view(-1, 1, 1)
        # Forward diffusion: x_t = sqrt(a_bar) * x_0 + sqrt(1 - a_bar) * eps
        x_t = alpha_bar_t.sqrt() * X_batch + (1 - alpha_bar_t).sqrt() * noise
        predicted_noise = model(x_t, t)
        loss = noise_criterion(predicted_noise, noise)
        loss.backward()
        optimizer.step()
train_time_diffusion = time.time() - start_time  # total training time in seconds

# Sampling: start from pure noise and denoise step by step from T-1 down to 0
start_time = time.time()
with torch.no_grad():
    generated_data_diffusion = torch.randn(X_tensor.size(0), 100, 1, device=X_tensor.device)
    for t in reversed(range(T)):
        alpha_t = model.alpha[t]
        alpha_bar_t = model.alpha_bar[t]
        beta_t = model.beta[t]
        step_ids = torch.full(
            (generated_data_diffusion.size(0),),
            t,
            dtype=torch.long,
            device=generated_data_diffusion.device,
        )
        predicted_noise = model(generated_data_diffusion, step_ids)
        # Remove the predicted noise contribution for this step
        generated_data_diffusion = (generated_data_diffusion - beta_t / (1 - alpha_bar_t).sqrt() * predicted_noise) / alpha_t.sqrt()
        # Re-inject noise on every step except the last one
        if t > 0:
            generated_data_diffusion += beta_t.sqrt() * torch.randn_like(generated_data_diffusion)
gen_time_diffusion = time.time() - start_time  # generation time in seconds

CVAE模型的实现与训练

In [None]:
class CVAE(nn.Module):
    """LSTM variational autoencoder over batch-first sequences.

    The encoder summarises a sequence into the mean and log-variance of a
    Gaussian latent; the decoder repeats a latent vector along the time axis
    and unrolls an LSTM over it.

    Args:
        input_size: Features per time step.
        hidden_size: LSTM hidden width of encoder and decoder.
        latent_size: Dimension of the latent vector.
    """

    def __init__(self, input_size, hidden_size, latent_size):
        super().__init__()
        self.encoder = nn.LSTM(input_size, hidden_size, batch_first=True)
        self.fc_mu = nn.Linear(hidden_size, latent_size)
        self.fc_logvar = nn.Linear(hidden_size, latent_size)
        self.decoder = nn.LSTM(latent_size, hidden_size, batch_first=True)
        self.fc = nn.Linear(hidden_size, input_size)

    def encode(self, x):
        """Return ``(mu, logvar)``, each (batch, latent_size), for ``x`` of shape (batch, seq, input_size)."""
        # Default zero initial state; no module-level `hidden_size` global needed.
        out, _ = self.encoder(x)
        summary = out[:, -1, :]  # encoder output at the final time step
        return self.fc_mu(summary), self.fc_logvar(summary)

    def reparameterize(self, mu, logvar):
        """Sample ``mu + eps * std`` with ``eps ~ N(0, I)`` so gradients reach ``mu`` and ``logvar``."""
        std = torch.exp(0.5 * logvar)
        eps = torch.randn_like(std)
        return mu + eps * std

    def decode(self, z, seq_len=100):
        """Decode latents ``z`` (batch, latent_size) into (batch, seq_len, input_size).

        Args:
            z: Latent vectors.
            seq_len: Number of time steps to generate. Defaults to 100, the
                length that used to be hard-coded.
        """
        repeated = z.unsqueeze(1).repeat(1, seq_len, 1)
        out, _ = self.decoder(repeated)
        return self.fc(out)

    def forward(self, x):
        """Encode, sample and reconstruct; returns ``(reconstructed, mu, logvar)``.

        The reconstruction has the same number of time steps as ``x`` so it
        can be compared element-wise with the input (the former fixed length
        of 100 did not match the 60-step training windows).
        """
        mu, logvar = self.encode(x)
        z = self.reparameterize(mu, logvar)
        reconstructed = self.decode(z, seq_len=x.size(1))
        return reconstructed, mu, logvar

# Hyper-parameters
input_size = 1
hidden_size = 64
latent_size = 16
learning_rate = 0.001
num_epochs = 100

# Model, optimiser and reconstruction loss (built once, not per batch)
model = CVAE(input_size, hidden_size, latent_size)
optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)
reconstruction_criterion = nn.MSELoss()

# Training
start_time = time.time()
for epoch in range(num_epochs):
    # Iterate the training loader built in the SBTS cell (the name
    # `dataloader` used here before was never defined). Targets are unused.
    for X_batch, _ in train_dataloader:
        optimizer.zero_grad()
        reconstructed, mu, logvar = model(X_batch)
        reconstruction_loss = reconstruction_criterion(reconstructed, X_batch)
        # KL divergence between N(mu, exp(logvar)) and the standard normal prior.
        # NOTE(review): this term is summed over batch and latent dimensions
        # while the reconstruction term is a mean, so the KL part dominates
        # the loss — confirm the intended weighting.
        kl_loss = 0.5 * torch.sum(torch.exp(logvar) + mu**2 - 1 - logvar)
        loss = reconstruction_loss + kl_loss
        loss.backward()
        optimizer.step()
train_time_cvae = time.time() - start_time  # total training time in seconds

# Generation: decode latents drawn from the standard normal prior
start_time = time.time()
with torch.no_grad():
    z = torch.randn(X_tensor.size(0), latent_size, device=X_tensor.device)
    generated_data_cvae = model.decode(z)
gen_time_cvae = time.time() - start_time  # generation time in seconds

统计特性分析

In [None]:
def calculate_statistics(data):
    """Summarise an array by its first four moments.

    Args:
        data: ``numpy.ndarray`` of any shape; all elements are pooled.

    Returns:
        Tuple ``(mean, std, skew, kurtosis)``. Mean and standard deviation
        come from NumPy; skewness and kurtosis come from pandas, computed on
        the flattened values.
    """
    flat_series = pd.Series(data.flatten())
    return (
        np.mean(data),
        np.std(data),
        flat_series.skew(),
        flat_series.kurtosis(),
    )

# Compute the summary statistics for every series in the scaler's normalised
# space, so the rows of the report are comparable.
#
# `generated_data_sbts` is a NumPy array in price units (it was passed through
# scaler.inverse_transform), so it has no .numpy() method and must be mapped
# back with the fitted scaler. The other generators return torch tensors of
# scaled values.
sbts_scaled = scaler.transform(np.asarray(generated_data_sbts).reshape(-1, 1))
timegan_scaled = generated_data_timegan.detach().cpu().numpy()
diffusion_scaled = generated_data_diffusion.detach().cpu().numpy()
cvae_scaled = generated_data_cvae.detach().cpu().numpy()

real_mean, real_std, real_skew, real_kurtosis = calculate_statistics(scaled_data)
sbts_mean, sbts_std, sbts_skew, sbts_kurtosis = calculate_statistics(sbts_scaled)
timegan_mean, timegan_std, timegan_skew, timegan_kurtosis = calculate_statistics(timegan_scaled)
diffusion_mean, diffusion_std, diffusion_skew, diffusion_kurtosis = calculate_statistics(diffusion_scaled)
cvae_mean, cvae_std, cvae_skew, cvae_kurtosis = calculate_statistics(cvae_scaled)

# Print the report
print("真实数据: 均值=%.4f, 标准差=%.4f, 偏度=%.4f, 峰度=%.4f" % (real_mean, real_std, real_skew, real_kurtosis))
print("SBTS: 均值=%.4f, 标准差=%.4f, 偏度=%.4f, 峰度=%.4f" % (sbts_mean, sbts_std, sbts_skew, sbts_kurtosis))
print("TimeGAN: 均值=%.4f, 标准差=%.4f, 偏度=%.4f, 峰度=%.4f" % (timegan_mean, timegan_std, timegan_skew, timegan_kurtosis))
print("Diffusion-SB: 均值=%.4f, 标准差=%.4f, 偏度=%.4f, 峰度=%.4f" % (diffusion_mean, diffusion_std, diffusion_skew, diffusion_kurtosis))
print("CVAE: 均值=%.4f, 标准差=%.4f, 偏度=%.4f, 峰度=%.4f" % (cvae_mean, cvae_std, cvae_skew, cvae_kurtosis))

ACF和PACF图

In [None]:
from statsmodels.graphics.tsaplots import plot_acf, plot_pacf


def _save_correlogram(plot_fn, series, filename):
    """Draw ``plot_fn(series)`` (plot_acf / plot_pacf) on a 6x4 figure and save it.

    The axes are passed explicitly: called without ``ax`` the statsmodels
    helpers open a figure of their own, which left the pre-sized figure
    created here empty and never closed.
    """
    fig, ax = plt.subplots(figsize=(6, 4))
    plot_fn(series, ax=ax)
    ax.set_title('')
    fig.tight_layout()
    fig.savefig(filename, dpi=300)
    plt.close(fig)


# Real closing prices
real_close_series = data['收盘'].values
# `generated_data_sbts` is already a NumPy array (output of
# scaler.inverse_transform), so it is sliced directly; calling .numpy() on it
# raised AttributeError.
sbts_series = np.asarray(generated_data_sbts)[:, -1]

# ACF / PACF of the real data
_save_correlogram(plot_acf, real_close_series, 'real_acf.png')
_save_correlogram(plot_pacf, real_close_series, 'real_pacf.png')

# ACF / PACF of the SBTS-generated data
_save_correlogram(plot_acf, sbts_series, 'sbts_acf.png')
_save_correlogram(plot_pacf, sbts_series, 'sbts_pacf.png')

训练和生成时间

In [None]:
# One row per model: the report template and its (training, generation) timings in seconds.
timing_rows = [
    ("SBTS 训练时间: %.2f秒, 生成时间: %.2f秒", (train_time_sbts, gen_time_sbts)),
    ("TimeGAN 训练时间: %.2f秒, 生成时间: %.2f秒", (train_time_timegan, gen_time_timegan)),
    ("Diffusion-SB 训练时间: %.2f秒, 生成时间: %.2f秒", (train_time_diffusion, gen_time_diffusion)),
    ("CVAE 训练时间: %.2f秒, 生成时间: %.2f秒", (train_time_cvae, gen_time_cvae)),
]
for template, timings in timing_rows:
    print(template % timings)

In [None]:
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from sklearn.metrics import mean_squared_error

# Align the generated values with dates: the i-th sample is indexed by the
# date at position sequence_length + i of the original frame.
synth_df = pd.DataFrame(
    generated_data_sbts,
    index=data.index[sequence_length:sequence_length + len(generated_data_sbts)],
    columns=['收盘']
)

# Real data over the same positional range, so both frames have equal length
real_data = data.iloc[sequence_length:sequence_length + len(synth_df)]
synth_data = synth_df.copy()

# Keep only observations from July 2024 onwards
start_date = '2024-07-01'
real_data_2024 = real_data.loc[real_data.index >= start_date]
synth_data_2024 = synth_data.loc[synth_data.index >= start_date]

# Truncate both to the same length
min_length = min(len(real_data_2024), len(synth_data_2024))
real_data_2024 = real_data_2024.iloc[:min_length]
synth_data_2024 = synth_data_2024.iloc[:min_length]

# Convert to numeric arrays; anything unparseable becomes NaN
real_values = pd.to_numeric(real_data_2024['收盘'], errors='coerce').values
synth_values = pd.to_numeric(synth_data_2024['收盘'], errors='coerce').values

# Report how many values could not be parsed
print("real_values 中 NaN 的数量:", np.isnan(real_values).sum())
print("synth_values 中 NaN 的数量:", np.isnan(synth_values).sum())

# Drop every date where either series is NaN. Substituting 0 for a missing
# price (the previous behaviour) injects an error as large as the price
# itself into both MSE figures.
valid = ~(np.isnan(real_values) | np.isnan(synth_values))
real_values = real_values[valid]
synth_values = synth_values[valid]
real_dates = real_data_2024.index[valid]
synth_dates = synth_data_2024.index[valid]

# The baseline below compares consecutive real values, so at least two
# observations are required; fail with a clear message instead of an error
# from inside sklearn.
if len(real_values) < 2:
    raise ValueError(
        f"Need at least 2 valid observations on or after {start_date}, "
        f"got {len(real_values)}"
    )

# Metrics: the baseline is the MSE of predicting each real value by the
# previous one; the synthetic MSE compares real and synthetic on the same date.
real_mse = mean_squared_error(real_values[:-1], real_values[1:])
synth_mse = mean_squared_error(real_values, synth_values)

print(f"真实数据基准MSE: {real_mse:.4f}")
print(f"合成数据MSE: {synth_mse:.4f}")

if synth_mse < real_mse:
    improvement = (real_mse - synth_mse) / real_mse * 100
    print(f"误差降低 {improvement:.1f}%，模型验证成功！")
else:
    print("需要进一步优化模型参数或结构")

# Plot both series
plt.figure(figsize=(21, 6))
plt.plot(real_dates, real_values, label='真实数据', alpha=0.7)
plt.plot(synth_dates, synth_values, label='合成数据', alpha=0.7)

plt.xlabel('日期')
plt.ylabel('收盘价')
plt.title('')
plt.legend()

# Save the figure to the working directory, then display it
plt.savefig('真实数据、合成数据对比.png')
plt.show()