In [9]:
import torch
import torch.nn as nn
from torch.utils.data import DataLoader, TensorDataset
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import MinMaxScaler

# 假设data是一个包含视频流量数据的NumPy数组
scaler = MinMaxScaler(feature_range=(0, 1))
data_scaled = scaler.fit_transform(data)

# 创建时间序列数据集
def create_dataset(data, seq_length):
    xs, ys = [], []
    for i in range(len(data) - seq_length):
        xs.append(data[i:i + seq_length])
        ys.append(data[i + seq_length])
    return torch.tensor(xs), torch.tensor(ys)

seq_length = 50  # 定义序列长度
X, y = create_dataset(data_scaled, seq_length)
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

# 转换为PyTorch DataLoader
train_dataset = TensorDataset(X_train, y_train)
test_dataset = TensorDataset(X_test, y_test)
train_loader = DataLoader(dataset=train_dataset, batch_size=64, shuffle=True)
test_loader = DataLoader(dataset=test_dataset, batch_size=64, shuffle=False)

# 定义RNN模型
class LSTMModel(nn.Module):
    def __init__(self, input_size, hidden_layer_size, num_layers, output_size):
        super(LSTMModel, self).__init__()
        self.hidden_layer_size = hidden_layer_size
        self.num_layers = num_layers
        self.lstm = nn.LSTM(input_size, hidden_layer_size, num_layers, batch_first=True)
        self.fc = nn.Linear(hidden_layer_size, output_size)

    def forward(self, x):
        h0 = torch.zeros(self.num_layers, x.size(0), self.hidden_layer_size)
        c0 = torch.zeros(self.num_layers, x.size(0), self.hidden_layer_size)

        out, _ = self.lstm(x, (h0, c0))
        out = self.fc(out[:, -1, :])  # 取序列最后一个时间步的输出
        return out

input_size = 1  # 特征数量
hidden_layer_size = 100  # 隐藏层大小
num_layers = 2  # 层数
output_size = 1  # 输出大小

model = LSTMModel(input_size, hidden_layer_size, num_layers, output_size)

# 定义损失函数和优化器
criterion = nn.MSELoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

# 训练模型
epochs = 100
for epoch in range(epochs):
    for i, (inputs, targets) in enumerate(train_loader):
        optimizer.zero_grad()

        outputs = model(inputs)
        loss = criterion(outputs, targets)

        loss.backward()
        optimizer.step()

    print(f'Epoch [{epoch+1}/{epochs}], Loss: {loss.item():.4f}')

# 模型评估
model.eval()
with torch.no_grad():
    predictions = model(X_test)
    test_loss = criterion(predictions, y_test)
print(f'Test Loss: {test_loss.item():.4f}')


ValueError: Expected 2D array, got 1D array instead:
array=[ 0.          0.00628314  0.01256604 ... -0.01884844 -0.01256604
 -0.00628314].
Reshape your data either using array.reshape(-1, 1) if your data has a single feature or array.reshape(1, -1) if it contains a single sample.

In [None]:
import numpy as np
import matplotlib.pyplot as plt
import torch
import torch.nn as nn
from torch.utils.data import DataLoader, TensorDataset

# 生成模拟数据
# 假设我们有一个时间序列数据，这里我们使用正弦波函数生成
np.random.seed(0)
torch.manual_seed(0)

def generate_wave_data(amplitude, frequency, phase, duration, sample_rate):
    t = np.arange(0, duration, 1/sample_rate)
    data = amplitude * np.sin(2 * np.pi * frequency * t + phase)
    return data

# 参数设置
sample_rate = 100  # 采样率
duration = 1000    # 数据时长（单位：秒）
seq_length = 50     # 序列长度

# 生成数据
amplitude = 1.0
frequency = 0.1
phase = 0
data = generate_wave_data(amplitude, frequency, phase, duration, sample_rate)

# 归一化数据
scaler = MinMaxScaler(feature_range=(0, 1))
data_scaled = scaler.fit_transform(data.reshape(-1, 1)).reshape(-1)

# 创建时间序列数据集
X, y = create_dataset(data_scaled, seq_length)

# 转换为PyTorch张量
X_tensor = torch.tensor(X, dtype=torch.float32)
y_tensor = torch.tensor(y, dtype=torch.float32)

# 创建数据加载器
train_size = int(0.8 * len(X_tensor))
test_size = len(X_tensor) - train_size
X_train, X_test = X_tensor[:train_size], X_tensor[train_size:]
y_train, y_test = y_tensor[:train_size], y_tensor[train_size:]

train_dataset = TensorDataset(X_train, y_train)
test_dataset = TensorDataset(X_test, y_test)

batch_size = 64
train_loader = DataLoader(dataset=train_dataset, batch_size=batch_size, shuffle=True)
test_loader = DataLoader(dataset=test_dataset, batch_size=batch_size, shuffle=False)

# 定义LSTM模型（这里使用之前定义的LSTMModel类）
input_size = 1
hidden_layer_size = 50
num_layers = 1
output_size = 1

model = LSTMModel(input_size, hidden_layer_size, num_layers, output_size)

# 定义损失函数和优化器
criterion = nn.MSELoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

# 训练模型
epochs = 100
for epoch in range(epochs):
    model.train()
    for inputs, targets in train_loader:
        optimizer.zero_grad()
        outputs = model(inputs)
        loss = criterion(outputs, targets)
        loss.backward()
        optimizer.step()

    print(f'Epoch [{epoch+1}/{epochs}], Loss: {loss.item():.4f}')

# 模型评估
model.eval()
with torch.no_grad():
    predictions = model(X_test.unsqueeze(1))  # 增加维度以匹配模型的期望输入
    test_loss = criterion(predictions, y_test.unsqueeze(1))
print(f'Test Loss: {test_loss.item():.4f}')

# 绘制预测结果
predicted_data = scaler.inverse_transform(predictions.numpy().reshape(-1, 1)).reshape(-1)
original_data = scaler.inverse_transform(y_test.numpy().reshape(-1, 1)).reshape(-1)

plt.figure(figsize=(10, 6))
plt.plot(original_data, label='Original Data')
plt.plot(predicted_data, label='Predicted Data')
plt.legend()
plt.show()