In [None]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from sklearn.preprocessing import MinMaxScaler
from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
from torch.nn import functional as F
import time
import optuna
import copy

Модель

In [None]:
class TemporalFusionTransformer(nn.Module):
    def __init__(self, input_size, hidden_size, output_size, num_layers, dropout_rate):
        super(TemporalFusionTransformer, self).__init__()
        self.rnn = nn.LSTM(input_size, hidden_size, num_layers, batch_first=True)
        self.attn = nn.MultiheadAttention(hidden_size, num_heads=4, batch_first=True)
        self.fc1 = nn.Linear(hidden_size, 64)
        self.fc2 = nn.Linear(64, output_size)
        self.dropout = nn.Dropout(dropout_rate)

    def forward(self, x):
        rnn_out, _ = self.rnn(x)
        attn_out, _ = self.attn(rnn_out, rnn_out, rnn_out)
        out = self.fc1(attn_out[:, -1, :])
        out = F.relu(out)
        out = self.dropout(out)
        out = self.fc2(out)
        return out

Загрузка данных и создание датасета

In [None]:
df = pd.read_csv('daily_accidents_hol_dw_week.csv', parse_dates=['CRASH DATE'])
df.set_index('CRASH DATE', inplace=True)

df = df[['CRASH_COUNT', 'is_weekend', 'month', 'is_holiday']]
df_daily = df.resample('D').sum()

scaler = MinMaxScaler(feature_range=(0, 1))
data_scaled = scaler.fit_transform(df_daily)

df_scaled = pd.DataFrame(data_scaled, index=df_daily.index, columns=df_daily.columns)

def create_sequences(data, seq_length):
    sequences = []
    labels = []
    for i in range(len(data) - seq_length):
        # input: [seq_length, 4]
        seq = data[i:i + seq_length]
        label = data[i + seq_length][0]
        sequences.append(seq)
        labels.append(label)
    return np.array(sequences), np.array(labels)

seq_length = 30
train_size = int(len(df_scaled) * 0.8)

train_data = data_scaled[:train_size]
test_data = data_scaled[train_size:]

train_seq, train_labels = create_sequences(train_data, seq_length)
test_seq, test_labels = create_sequences(test_data, seq_length)

train_seq = torch.tensor(train_seq, dtype=torch.float32)
train_labels = torch.tensor(train_labels, dtype=torch.float32).unsqueeze(1)

test_seq = torch.tensor(test_seq, dtype=torch.float32)
test_labels = torch.tensor(test_labels, dtype=torch.float32).unsqueeze(1)

train_dataset = TensorDataset(train_seq, train_labels)
test_dataset = TensorDataset(test_seq, test_labels)

train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=64, shuffle=False)


Функция подбора гиперпараметров

In [None]:
def objective(trial):
    hidden_size = trial.suggest_categorical("hidden_size", [x for x in range(32, 257, 8) if x % 4 == 0])
    num_layers = trial.suggest_int("num_layers", 1, 4)
    dropout_rate = trial.suggest_float("dropout_rate", 0.1, 0.5)
    learning_rate = trial.suggest_float("lr", 1e-4, 1e-2, log=True)

    model = TemporalFusionTransformer(
        input_size=4,
        hidden_size=hidden_size,
        output_size=1,
        num_layers=num_layers,
        dropout_rate=dropout_rate
    )

    optimizer = optim.Adam(model.parameters(), lr=learning_rate)
    criterion = nn.MSELoss()

    model.train()
    for epoch in range(20):  # можно увеличить
        for inputs, labels in train_loader:
            inputs, labels = inputs, labels
            optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

    # Оценка на валидации
    model.eval()
    val_loss = 0.0
    with torch.no_grad():
        for inputs, labels in test_loader:
            inputs, labels = inputs, labels
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            val_loss += loss.item()
    return val_loss / len(test_loader)



Подбор гиперпараметров

In [None]:
study = optuna.create_study(direction="minimize")
study.optimize(objective, n_trials=30)
print("Best parameters:", study.best_params)

[I 2025-05-04 11:26:38,095] A new study created in memory with name: no-name-6898db8d-dee0-4559-9b80-049968d0b9ea
[I 2025-05-04 11:31:04,339] Trial 0 finished with value: 0.0009579231807341178 and parameters: {'hidden_size': 184, 'num_layers': 2, 'dropout_rate': 0.2019379559757229, 'lr': 0.00018007232131067282}. Best is trial 0 with value: 0.0009579231807341178.
[I 2025-05-04 11:34:48,603] Trial 1 finished with value: 0.00095137149716417 and parameters: {'hidden_size': 256, 'num_layers': 1, 'dropout_rate': 0.29055960151156107, 'lr': 0.00015621281223311914}. Best is trial 1 with value: 0.00095137149716417.
[I 2025-05-04 11:36:47,831] Trial 2 finished with value: 0.0009363084915094078 and parameters: {'hidden_size': 96, 'num_layers': 3, 'dropout_rate': 0.3367547976497126, 'lr': 0.0027445526749991763}. Best is trial 2 with value: 0.0009363084915094078.
[I 2025-05-04 11:37:35,267] Trial 3 finished with value: 0.0014039397821761668 and parameters: {'hidden_size': 88, 'num_layers': 1, 'dropo

Best parameters: {'hidden_size': 200, 'num_layers': 1, 'dropout_rate': 0.2080915535702409, 'lr': 0.0028327642777341757}


Инициализация модели и функция обучения

In [None]:

model = TemporalFusionTransformer(input_size=4, hidden_size=200, num_layers=1, dropout_rate=0.2080915535702409, output_size=1)
optimizer = optim.Adam(model.parameters(), lr=0.0028327642777341757)
criterion = nn.MSELoss()

def train_model(model, train_loader, optimizer, criterion, epochs=100):
    model.train()
    for epoch in range(epochs):
        running_loss = 0.0
        for inputs, labels in train_loader:
            optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()
            running_loss += loss.item()

        if (epoch + 1) % 10 == 0:
            print(f"Epoch [{epoch+1}/{epochs}], Loss: {loss.item():.4f}")

Обучение

In [6]:
start_time = time.time()
train_model(model, train_loader, optimizer, criterion, epochs=100)
end_time = time.time()

Epoch [10/100], Loss: 0.0073
Epoch [20/100], Loss: 0.0029
Epoch [30/100], Loss: 0.0063
Epoch [40/100], Loss: 0.0038
Epoch [50/100], Loss: 0.0040
Epoch [60/100], Loss: 0.0021
Epoch [70/100], Loss: 0.0033
Epoch [80/100], Loss: 0.0079
Epoch [90/100], Loss: 0.0032
Epoch [100/100], Loss: 0.0027


Предсказание и метрики

In [None]:
def evaluate_model(model, test_loader):
    model.eval()
    predictions = []
    true_values = []

    with torch.no_grad():
        for inputs, labels in test_loader:
            outputs = model(inputs)
            predictions.append(outputs.numpy())
            true_values.append(labels.numpy())

    predictions = np.concatenate(predictions, axis=0)
    true_values = np.concatenate(true_values, axis=0)

    return predictions, true_values

predictions, true_values = evaluate_model(model, test_loader)
n_samples = predictions.shape[0]
preds_extended = np.zeros((n_samples, 4))
preds_extended[:, 0] = predictions[:, 0]
predictions = scaler.inverse_transform(preds_extended)[:, 0]
true_extended = np.zeros((true_values.shape[0], 4))
true_extended[:, 0] = true_values[:, 0]
true_values = scaler.inverse_transform(true_extended)[:, 0]

rmse = np.sqrt(mean_squared_error(true_values, predictions))
mae = mean_absolute_error(true_values, predictions)
r2 = r2_score(true_values, predictions)

correlation = np.corrcoef(true_values.flatten(), predictions.flatten())[0, 1]

print(f"Transformer RMSE: {rmse}, MAE: {mae}, R²: {r2}, Correlation: {correlation}")

Transformer RMSE: 29.933020379448173, MAE: 23.394549634348877, R²: 0.2654410910007079, Correlation: 0.5993734666430955


Время обучения

In [15]:
print(f"Время обучения: {end_time - start_time} секунд")

Время обучения: 1849.3701479434967 секунд
