In [1]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from sklearn.preprocessing import MinMaxScaler
import torch
import torch.nn as nn
import torch.optim as optim

In [2]:
# 1. Read and prepare the data
import os

DATA_FILE = "hanoi_quality.csv"

# Result of the Colab upload dialog; stays empty when the file is already on disk.
uploaded = {}

# Only ask for an upload when the CSV is missing, so a fresh
# "Restart & Run All" does not block on an interactive dialog every time.
# Delete the local file first if a newer version has to be uploaded.
if not os.path.exists(DATA_FILE):
    try:
        # google.colab is only importable inside a Colab runtime.
        from google.colab import files
    except ImportError as exc:
        raise FileNotFoundError(
            f"{DATA_FILE} was not found and google.colab is unavailable; "
            "place the file next to this notebook."
        ) from exc
    uploaded = files.upload()

Saving hanoi_quality.csv to hanoi_quality.csv


In [16]:
df = pd.read_csv("hanoi_quality.csv")
df1 = df['pm25']
# Column vector of shape (n_samples, 1), the 2-D layout the scaler works on.
pm25_values = np.array(df1).reshape(-1, 1)

# Chronological 70/30 split: the first 70% of rows train, the rest test.
train_size = int(0.7 * len(pm25_values))
test_size = len(pm25_values) - train_size

# Fit the scaler on the training rows only. Fitting on the whole series would
# leak the held-out min/max into training. As a consequence, scaled test
# values may fall slightly outside [0, 1].
scaler = MinMaxScaler(feature_range=(0, 1))
scaler.fit(pm25_values[:train_size])
df1_scaled = scaler.transform(pm25_values)

train_data = df1_scaled[:train_size]
test_data = df1_scaled[train_size:]
def create_dataset(dataset, time_step=1):
    """Cut a 2-D series into sliding input windows and next-step targets.

    Only column 0 of ``dataset`` is read.

    Args:
        dataset: 2-D array indexed as ``dataset[row, 0]``.
        time_step: Number of consecutive rows in each input window.

    Returns:
        A tuple ``(X, y)`` of NumPy arrays where ``X[i]`` holds rows
        ``i .. i + time_step - 1`` and ``y[i]`` is the value at row
        ``i + time_step``. Both are empty when ``dataset`` has no more than
        ``time_step`` rows.
    """
    window_starts = range(len(dataset) - time_step)
    windows = [dataset[start:start + time_step, 0] for start in window_starts]
    targets = [dataset[start + time_step, 0] for start in window_starts]
    return np.array(windows), np.array(targets)

In [17]:
# Number of past observations fed to the model for each prediction.
time_step = 100

# Sliding windows and their next-step targets, still as NumPy arrays.
train_windows, train_targets = create_dataset(train_data, time_step)
test_windows, test_targets = create_dataset(test_data, time_step)

# Inputs get a trailing axis of size 1 (one feature per time step);
# targets stay one value per window.
X_train = torch.tensor(train_windows, dtype=torch.float32).unsqueeze(dim=-1)
X_test = torch.tensor(test_windows, dtype=torch.float32).unsqueeze(dim=-1)
y_train = torch.tensor(train_targets, dtype=torch.float32)
y_test = torch.tensor(test_targets, dtype=torch.float32)

In [18]:
# 2. Build the Autoformer model
class SeriesDecomposition(nn.Module):
    """Split a series into a moving-average trend and the remaining seasonal part.

    Inputs are laid out as ``(batch, channels, length)``. The moving average
    runs along the last axis, and both outputs have the same shape as the
    input, so ``trend + seasonal`` reproduces ``x``.

    Args:
        kernel_size: Width of the moving-average window (positive integer).

    Raises:
        ValueError: If ``kernel_size`` is smaller than 1.
    """

    def __init__(self, kernel_size):
        super(SeriesDecomposition, self).__init__()
        if kernel_size < 1:
            raise ValueError("kernel_size must be a positive integer")
        self.kernel_size = kernel_size
        # No built-in zero padding here: forward() pads with edge values
        # instead, which avoids padding twice and pulling the trend toward 0.
        self.moving_avg = nn.AvgPool1d(kernel_size=kernel_size, stride=1, padding=0)

    def forward(self, x):
        """Return ``(trend, seasonal)`` for ``x`` of shape ``(batch, channels, length)``."""
        # kernel_size - 1 padded steps in total keep the pooled length equal
        # to the input length for odd and even kernels alike.
        front = (self.kernel_size - 1) // 2
        back = self.kernel_size - 1 - front
        # Replicate the first value at the start and the last value at the end.
        head = x[:, :, :1].repeat(1, 1, front)
        tail = x[:, :, -1:].repeat(1, 1, back)
        padded = torch.cat([head, x, tail], dim=2)
        trend = self.moving_avg(padded)
        # Subtract from the unpadded input so the residual lines up with x.
        seasonal = x - trend
        return trend, seasonal

class AutoCorrelation(nn.Module):
    """Average the time-delayed copies of a series at its most autocorrelated lags.

    For every ``(batch, channel)`` series the layer scores each lag
    ``1 .. length - 1`` by the mean product of the overlapping samples, keeps
    the ``K`` best lags, circularly shifts the series by each of them and
    returns the plain average of those shifted copies.

    Args:
        K: Number of lags to aggregate. Values larger than the number of
            available lags are clamped.
    """

    def __init__(self, K):
        super(AutoCorrelation, self).__init__()
        self.K = K

    def forward(self, x):
        """Aggregate ``x`` of shape ``(batch, channels, length)``; the shape is preserved."""
        length = x.shape[2]
        # Lag 0 is never a candidate, so at most length - 1 lags exist.
        k = min(self.K, length - 1)
        if k < 1:
            # Nothing to aggregate (single-step series or non-positive K).
            return x

        # Picking lags is a discrete choice, so no gradient is tracked for it.
        with torch.no_grad():
            acf = torch.stack(
                [
                    torch.mean(x[:, :, :-tau] * x[:, :, tau:], dim=2)
                    for tau in range(1, length)
                ],
                dim=2,
            )
            # Position p in acf corresponds to lag p + 1.
            lags = torch.topk(acf, k, dim=2).indices + 1

        # gather_idx[b, c, j, t] = (t + lag_j) mod length, i.e. the series
        # shifted left by lag_j with wrap-around.
        positions = torch.arange(length, device=x.device).view(1, 1, 1, length)
        gather_idx = (positions + lags.unsqueeze(-1)) % length
        delayed = torch.gather(x.unsqueeze(2).expand(-1, -1, k, -1), 3, gather_idx)
        return delayed.mean(dim=2)

class EncoderLayer(nn.Module):
    """Encoder block: decompose the input, refine the seasonal part, recombine.

    Args:
        decomp_kernel_size: Window size passed to ``SeriesDecomposition``.
        K: Number of lags passed to ``AutoCorrelation``.
    """

    def __init__(self, decomp_kernel_size, K):
        super().__init__()
        self.decomp = SeriesDecomposition(decomp_kernel_size)
        self.auto_corr = AutoCorrelation(K)

    def forward(self, x):
        """Return the trend plus the auto-correlated seasonal component of ``x``."""
        trend_part, seasonal_part = self.decomp(x)
        return trend_part + self.auto_corr(seasonal_part)

class DecoderLayer(nn.Module):
    """Decoder block: decompose the input, refine the seasonal part, recombine.

    Args:
        decomp_kernel_size: Window size passed to ``SeriesDecomposition``.
        K: Number of lags passed to ``AutoCorrelation``.
    """

    def __init__(self, decomp_kernel_size, K):
        super().__init__()
        self.decomp = SeriesDecomposition(decomp_kernel_size)
        self.auto_corr = AutoCorrelation(K)

    def forward(self, x):
        """Return the trend plus the auto-correlated seasonal component of ``x``."""
        trend_component, seasonal_component = self.decomp(x)
        refined_seasonal = self.auto_corr(seasonal_component)
        return trend_component + refined_seasonal

class Autoformer(nn.Module):
    """Simplified Autoformer-style forecaster over ``(batch, seq_len, features)`` windows.

    The encoder and decoder inputs are embedded per time step, moved to a
    ``(batch, features, seq_len)`` layout for the decomposition /
    auto-correlation layers (which treat the last axis as time), and the
    encoder output is added to the decoder input as context. The result is
    projected to one value per time step and then from ``seq_len`` steps to
    ``out_len`` forecast steps.

    Assumes the encoder and decoder layers return tensors with the same shape
    as their input.

    Args:
        enc_in: Number of features in the encoder input.
        dec_in: Number of features in the decoder input.
        seq_len: Length of the input windows.
        label_len: Stored on the instance; not used by the forward pass.
        out_len: Number of future steps to predict.
        decomp_kernel_size: Moving-average window of the decomposition layers.
        K: Number of lags aggregated by the auto-correlation layers.
    """

    def __init__(self, enc_in, dec_in, seq_len, label_len, out_len, decomp_kernel_size, K):
        super(Autoformer, self).__init__()
        self.enc_in = enc_in
        self.dec_in = dec_in
        self.seq_len = seq_len
        self.label_len = label_len
        self.out_len = out_len

        self.encoder = EncoderLayer(decomp_kernel_size, K)
        self.decoder = DecoderLayer(decomp_kernel_size, K)

        self.enc_embedding = nn.Linear(enc_in, enc_in)
        self.dec_embedding = nn.Linear(dec_in, dec_in)

        # Features -> one value per time step.
        self.projection = nn.Linear(enc_in, 1)
        # seq_len per-step values -> out_len forecast steps.
        self.time_projection = nn.Linear(seq_len, out_len)

    def forward(self, x_enc, x_dec):
        """Forecast ``out_len`` steps.

        Args:
            x_enc: Encoder input of shape ``(batch, seq_len, enc_in)``.
            x_dec: Decoder input with the same shape as ``x_enc``.

        Returns:
            Tensor of shape ``(batch, out_len)``.

        Raises:
            ValueError: If the two inputs differ in shape or their length is
                not ``seq_len``.
        """
        if x_enc.shape != x_dec.shape:
            raise ValueError(
                f"x_enc and x_dec must have the same shape, got "
                f"{tuple(x_enc.shape)} and {tuple(x_dec.shape)}"
            )
        if x_dec.shape[1] != self.seq_len:
            raise ValueError(
                f"expected windows of length {self.seq_len}, got {x_dec.shape[1]}"
            )

        # The sublayers work along the last axis, so put time there.
        enc_out = self.enc_embedding(x_enc).transpose(1, 2)
        enc_out = self.encoder(enc_out)

        dec_out = self.dec_embedding(x_dec).transpose(1, 2)
        # Feed the encoder output to the decoder instead of discarding it.
        dec_out = self.decoder(dec_out + enc_out)

        # Back to (batch, seq_len, features) for the per-step projection.
        per_step = self.projection(dec_out.transpose(1, 2)).squeeze(-1)
        return self.time_projection(per_step)

In [19]:
# 3. Train the model
batch_size = 64
num_epochs = 100
learning_rate = 0.001
seed = 42

# Seed before building the loaders and the model so that the shuffling order
# and the initial weights are repeatable across kernel restarts.
torch.manual_seed(seed)
np.random.seed(seed)

train_loader = torch.utils.data.DataLoader(
    dataset=torch.utils.data.TensorDataset(X_train, y_train),
    batch_size=batch_size,
    shuffle=True,
    # Dedicated generator: the shuffle order does not depend on how many
    # random numbers other cells have drawn from the global generator.
    generator=torch.Generator().manual_seed(seed),
)
test_loader = torch.utils.data.DataLoader(
    dataset=torch.utils.data.TensorDataset(X_test, y_test),
    batch_size=batch_size,
    shuffle=False
)

model = Autoformer(enc_in=1, dec_in=1, seq_len=time_step, label_len=time_step, out_len=1, decomp_kernel_size=25, K=5)
criterion = nn.MSELoss()
optimizer = optim.Adam(model.parameters(), lr=learning_rate)

In [None]:
for epoch in range(num_epochs):
    # ---- one optimisation pass over the training windows ----
    model.train()
    train_loss_sum, train_count = 0.0, 0
    for x_batch, y_batch in train_loader:
        optimizer.zero_grad()
        outputs = model(x_batch, x_batch)
        # reshape_as instead of squeeze(): squeeze() also drops the batch axis
        # of a size-1 batch, and a wrong element count now fails loudly
        # instead of being broadcast against the targets.
        loss = criterion(outputs.reshape_as(y_batch), y_batch)
        loss.backward()
        optimizer.step()
        # Weight each batch mean by its size so the epoch figure is a true
        # per-sample average even when the last batch is smaller.
        train_loss_sum += loss.item() * y_batch.size(0)
        train_count += y_batch.size(0)
    train_loss = train_loss_sum / max(train_count, 1)

    # ---- evaluation on the held-out windows ----
    model.eval()
    test_loss_sum, test_count = 0.0, 0
    with torch.no_grad():
        for x_batch, y_batch in test_loader:
            outputs = model(x_batch, x_batch)
            batch_loss = criterion(outputs.reshape_as(y_batch), y_batch).item()
            test_loss_sum += batch_loss * y_batch.size(0)
            test_count += y_batch.size(0)
    # max(..., 1) avoids a ZeroDivisionError when a loader is empty.
    test_loss = test_loss_sum / max(test_count, 1)
    print(f'Epoch {epoch+1}/{num_epochs}, Loss: {train_loss}, Test Loss: {test_loss}')

In [None]:
# 4. Predict and evaluate the model
model.eval()
with torch.no_grad():
    # One column per prediction, the 2-D layout the scaler works on.
    train_predict = model(X_train, X_train).reshape(-1, 1).numpy()
    y_pred = model(X_test, X_test).reshape(-1, 1).numpy()

# Back to the original pm25 scale.
train_predict = scaler.inverse_transform(train_predict)
y_pred = scaler.inverse_transform(y_pred)
# Keep the scaled y_test untouched: overwriting it would make this cell
# non-idempotent (a second run would inverse-transform twice).
y_test_actual = scaler.inverse_transform(np.asarray(y_test).reshape(-1, 1))

test_rmse = np.sqrt(np.mean((y_pred - y_test_actual)**2))
print('Testing RMSE:', test_rmse)

# Recursive forecast: start from the last time_step scaled observations and
# feed every prediction back in as the newest input value.
n_forecast = 30
temp_input = test_data[-time_step:, 0].tolist()

lst_output = []
with torch.no_grad():
    for _ in range(n_forecast):
        x_input = torch.tensor(temp_input, dtype=torch.float32).reshape(1, time_step, 1)
        yhat = model(x_input, x_input).reshape(-1)
        # Only the first predicted step is used for the one-step-ahead roll.
        next_value = yhat[0].item()
        # Slide the window: drop the oldest value, append the prediction.
        temp_input = temp_input[1:] + [next_value]
        # One-element rows so the list can go straight into inverse_transform.
        lst_output.append([next_value])

In [None]:

plt.plot(scaler.inverse_transform(train_data), label='Train Data')
plt.plot(np.arange(train_size, train_size+len(test_data)), scaler.inverse_transform(test_data), label='Test Data')

# The first test prediction targets the point time_step steps into the test
# split and there is one prediction per remaining point, so the x-range is
# sized from y_pred itself to keep x and y the same length.
pred_start = train_size + time_step
plt.plot(np.arange(pred_start, pred_start + len(y_pred)), y_pred, label='Test Predictions')

# Forecast points continue right after the last observed sample.
forecast_start = len(df1_scaled)
plt.plot(np.arange(forecast_start, forecast_start + len(lst_output)), scaler.inverse_transform(lst_output), label='30 Days Forecast')

plt.xlabel('Sample index')
plt.ylabel('pm25')
plt.legend()
plt.show()