<a href="https://colab.research.google.com/github/jeffheaton/app_deep_learning/blob/main/t81_558_class_10_3_transformer_timeseries.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [10]:
try:
    import google.colab
    COLAB = True
    print("Note: using Google CoLab")
except:
    print("Note: not using Google CoLab")
    COLAB = False

# Make use of a GPU or MPS (Apple) if one is available.  (see module 3.2)
import torch
has_mps = torch.backends.mps.is_built()
device = "mps" if has_mps else "cuda" if torch.cuda.is_available() else "cpu"
print(f"Using device: {device}")

Note: not using Google CoLab
Using device: mps


In [26]:
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
from torch.utils.data import DataLoader, TensorDataset
from sklearn.preprocessing import StandardScaler
from torch.optim.lr_scheduler import ReduceLROnPlateau
import plotly.express as px

df = pd.read_csv("SN_d_tot_V2.0.csv")

In [27]:
fig = px.line(df, x='dec_year', y='sn_value', title='Sunspot Value Over Time')
fig.show()


The data preprocessing is the same as was introduced in the previous section. We will use data before the year 2000 as training, the rest is used for validation.

In [None]:
# Data Preprocessing
# removes initial rows where obs_num == 0, which likely contain incomplete data
start_id = max(df[df['obs_num'] == 0].index.tolist()) + 1
df = df[start_id:].copy()
df['sn_value'] = df['sn_value'].astype(float)


df_train = df[df['year'] < 2000]
df_test = df[df['year'] >= 2000]

# Scaling the target variable
spots_train = df_train['sn_value'].to_numpy().reshape(-1, 1)
spots_test = df_test['sn_value'].to_numpy().reshape(-1, 1)

scaler = StandardScaler() # standardize sn_value using standardScaler(mean=0, std=1)
spots_train = scaler.fit_transform(spots_train).flatten().tolist()
spots_test = scaler.transform(spots_test).flatten().tolist()


In [33]:
px.line(spots_train)

In [None]:
px.line(spots_test)

Just like we did for LSTM in the previous section, we again break the data into sequences.

In [None]:
# Sequence Data Preparation
SEQUENCE_SIZE = 10 # is 10 long enough?

def to_sequences(seq_size, obs):
    # convert 1d sunspot sequence into a supervised learning format
    x = []
    y = []
    for i in range(len(obs) - seq_size):
        window = obs[i:(i + seq_size)]
        after_window = obs[i + seq_size]
        x.append(window)
        y.append(after_window)
    return torch.tensor(x, dtype=torch.float32).view(-1, seq_size, 1), torch.tensor(y, dtype=torch.float32).view(-1, 1)

x_train, y_train = to_sequences(SEQUENCE_SIZE, spots_train)
x_test, y_test = to_sequences(SEQUENCE_SIZE, spots_test)

# Setup data loaders for batch, converts training/testing data into PyTorch datasets. use batch size of 32 for training.
train_dataset = TensorDataset(x_train, y_train)
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)

test_dataset = TensorDataset(x_test, y_test)
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)



In [14]:
# Positional Encoding for Transformer
class PositionalEncoding(nn.Module):
    def __init__(self, d_model, dropout=0.1, max_len=5000):
        super(PositionalEncoding, self).__init__()
        self.dropout = nn.Dropout(p=dropout)

        pe = torch.zeros(max_len, d_model)
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-np.log(10000.0) / d_model))
        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)
        pe = pe.unsqueeze(0).transpose(0, 1)
        self.register_buffer('pe', pe)

    def forward(self, x):
        x = x + self.pe[:x.size(0), :]
        return self.dropout(x)

In [15]:
# Model definition using Transformer
class TransformerModel(nn.Module):
    def __init__(self, input_dim=1, d_model=64, nhead=4, num_layers=2, dropout=0.2):
        super(TransformerModel, self).__init__()

        self.encoder = nn.Linear(input_dim, d_model)
        self.pos_encoder = PositionalEncoding(d_model, dropout)
        encoder_layers = nn.TransformerEncoderLayer(d_model, nhead)
        self.transformer_encoder = nn.TransformerEncoder(encoder_layers, num_layers)
        self.decoder = nn.Linear(d_model, 1)

    def forward(self, x):
        x = self.encoder(x)
        x = self.pos_encoder(x)
        x = self.transformer_encoder(x)
        x = self.decoder(x[:, -1, :])
        return x

model = TransformerModel().to(device)

In [None]:
# Train the model
criterion = nn.MSELoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
# reduces learning rate when validation loss stops improving
scheduler = ReduceLROnPlateau(optimizer, 'min', factor=0.5, patience=3, verbose=True)

epochs = 1000 # one epoch is a single complete pass through the entire training dataset during the model training proces
early_stop_count = 0
min_val_loss = float('inf')

for epoch in range(epochs):
    model.train()
    for batch in train_loader:
        x_batch, y_batch = batch
        x_batch, y_batch = x_batch.to(device), y_batch.to(device)

        optimizer.zero_grad()
        outputs = model(x_batch)
        loss = criterion(outputs, y_batch)
        loss.backward()
        optimizer.step()

    # Validation
    model.eval()
    val_losses = []
    with torch.no_grad():
        for batch in test_loader:
            x_batch, y_batch = batch
            x_batch, y_batch = x_batch.to(device), y_batch.to(device)
            outputs = model(x_batch)
            loss = criterion(outputs, y_batch)
            val_losses.append(loss.item())

    val_loss = np.mean(val_losses)
    scheduler.step(val_loss)

    if val_loss < min_val_loss:
        min_val_loss = val_loss
        early_stop_count = 0
    else:
        early_stop_count += 1
    # implement early stopping after 5 consecutive increases in validation loss
    if early_stop_count >= 5:
        print("Early stopping!")
        break
    print(f"Epoch {epoch + 1}/{epochs}, Validation Loss: {val_loss:.4f}")



Epoch 1/1000, Validation Loss: 0.0370
Epoch 2/1000, Validation Loss: 0.0446
Epoch 3/1000, Validation Loss: 0.0440
Epoch 4/1000, Validation Loss: 0.0444
Epoch 00005: reducing learning rate of group 0 to 5.0000e-04.
Epoch 5/1000, Validation Loss: 0.0445
Early stopping!


We can now evaluate the performance of this model.

In [17]:
# Evaluation
model.eval()
predictions = []
with torch.no_grad():
    for batch in test_loader:
        x_batch, y_batch = batch
        x_batch = x_batch.to(device)
        outputs = model(x_batch)
        predictions.extend(outputs.squeeze().tolist())

rmse = np.sqrt(np.mean((scaler.inverse_transform(np.array(predictions).reshape(-1, 1)) - scaler.inverse_transform(y_test.numpy().reshape(-1, 1)))**2))
print(f"Score (RMSE): {rmse:.4f}")

Score (RMSE): 16.2847


In [34]:
import plotly.graph_objects as go

# Convert test predictions and actual values back to original scale
actual_values = scaler.inverse_transform(y_test.numpy().reshape(-1, 1)).flatten()
predicted_values = scaler.inverse_transform(np.array(predictions).reshape(-1, 1)).flatten()

# Create a sequence index for visualization
time_index = df_test.iloc[SEQUENCE_SIZE:]['dec_year'].values  # Align with test data


In [36]:
fig = go.Figure()

# Add Actual Data
fig.add_trace(go.Scatter(
    x=time_index, y=actual_values,
    mode='lines', name='Actual Sunspots',
    line=dict(color='blue')
))

# Add Predicted Data
fig.add_trace(go.Scatter(
    x=time_index, y=predicted_values,
    mode='lines', name='Predicted Sunspots',
    line=dict(color='red', dash='dot')  # Dashed line for predictions
))

# Customize Layout
fig.update_layout(
    title="Sunspot Prediction using Transformer Model",
    xaxis_title="Year",
    yaxis_title="Sunspot Number",
    legend=dict(x=0, y=1),
    template="plotly_white"
)

# Show Plot
fig.show()
