In [None]:
import calendar
from datetime import datetime
from typing import List, Tuple, Dict

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from tqdm import trange

import torch
import torch.nn as nn
import torch.optim as optim

# Seed the torch and NumPy global RNGs with a fixed value.
torch.manual_seed(1)
np.random.seed(1)
# Use CUDA when torch reports it as available, otherwise the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
device

In [None]:
# Relative locations of the two input CSV files read later with pd.read_csv.
NDVI_PATH = "../data/PROCESSED/ndvi.csv"
PROD_PATH = "../data/PROCESSED/manhuacu.csv"

In [None]:
# MLP Hyperparameters
# MLP_WINDOW_SIZE is the number of consecutive NDVI observations per input window.
MLP_WINDOW_SIZE = 5
MLP_BATCH_SIZE = 4
MLP_BASE_HIDDEN_SIZE = 32
MLP_EPOCHS = 300
MLP_LEARNING_RATE = 2e-3

# Best MLP Hyperparameters
# NOTE(review): the recorded best window size (10) differs from the active value above (5).
# MLP_WINDOW_SIZE = 10
# MLP_BATCH_SIZE = 4
# MLP_BASE_HIDDEN_SIZE = 32
# MLP_EPOCHS = 300
# MLP_LEARNING_RATE = 2e-3

# Validation Loss: 0.1911

# LSTM Hyperparameters
LSTM_WINDOW_SIZE = 20
LSTM_HIDDEN_SIZE = 32
LSTM_NUM_LAYERS = 2
LSTM_DROPOUT = 0.2
LSTM_EPOCHS = 1000
LSTM_BATCH_SIZE = 16
LSTM_LEARNING_RATE = 5e-5

# Best LSTM Hyperparameters So Far
# LSTM_WINDOW_SIZE = 20
# LSTM_HIDDEN_SIZE = 32
# LSTM_NUM_LAYERS = 2
# LSTM_DROPOUT = 0.2
# LSTM_EPOCHS = 1000
# LSTM_BATCH_SIZE = 16
# LSTM_LEARNING_RATE = 5e-5

# Validation Loss: 0.2122

# Computation
# Force dropout to 0 for a single-layer LSTM; the value is passed to nn.LSTM(dropout=...),
# which (per the PyTorch docs) only applies dropout between stacked layers.
LSTM_DROPOUT = LSTM_DROPOUT if LSTM_NUM_LAYERS > 1 else 0

In [None]:
def get_day_of_year_index(date: datetime) -> int:
    """Return the zero-based day of the year for ``date`` (1 January -> 0)."""
    # Rebuild from year/month/day so only the calendar date is used.
    return datetime(date.year, date.month, date.day).timetuple().tm_yday - 1


def get_sin_cos(x: float) -> Tuple[float, float]:
    """Map a fraction of a full cycle ``x`` to ``(sin(2*pi*x), cos(2*pi*x))``."""
    rad = 2 * np.pi * x
    return (np.sin(rad), np.cos(rad))


def encode_date(date: datetime) -> Tuple[float, float]:
    """Encode the day of the year of ``date`` as a point on the unit circle.

    The zero-based day index is divided by the number of days in that year
    (365, or 366 in leap years) and converted with ``get_sin_cos``.

    Args:
        date: The date to encode.

    Returns:
        A ``(sin, cos)`` tuple.
    """
    # calendar.isleap applies the full Gregorian rule; a bare ``year % 4 == 0``
    # wrongly treats century years such as 1900 or 2100 as leap years.
    total_year_days = 366 if calendar.isleap(date.year) else 365
    day_index = get_day_of_year_index(date)
    return get_sin_cos(day_index / total_year_days)


# Test
# Expected values are for a 366-day year (2020): index / 366 of a full turn.
print("Encoding date 2020-01-01")
print(encode_date(datetime(2020, 1, 1)))  # (0.0, 1.0)
print("\n")
print("Encoding date 2020-06-01")
print(encode_date(datetime(2020, 6, 1)))  # day index 152 -> approx. (0.507, -0.862)
print("\n")
print("Encoding date 2020-12-31")
print(encode_date(datetime(2020, 12, 31)))  # day index 365 -> approx. (-0.0172, 0.9999)
print("\n")

## 1. Carregar e Pré-processar Dados

### 1.1. Carregar e pré-processar os Dados

In [None]:
# Load the NDVI observations; later cells read the "Data", "Year" and "NDVI" columns.
NDVI = pd.read_csv(NDVI_PATH)
NDVI

In [None]:
# Number of NDVI observations recorded in each calendar year.
NDVI["N_Observations"] = NDVI.groupby("Year")["Data"].transform("count")

# Circular (sin, cos) encoding of the observation date ("Data" is a YYYY-MM-DD string).
NDVI[["Date_sin", "Date_cos"]] = NDVI["Data"].apply(
    lambda x: pd.Series(encode_date(datetime.strptime(x, "%Y-%m-%d")))
)

# Assert order by Data (ascending); ISO date strings sort chronologically.
NDVI = NDVI.sort_values(by="Data", ascending=True)

# Keep 2000-2023 only. The explicit .copy() makes the result an independent
# frame, so the column assignments in later cells do not raise
# SettingWithCopyWarning or write into a temporary slice.
NDVI = NDVI[(NDVI["Year"] >= 2000) & (NDVI["Year"] <= 2023)].copy()

NDVI

In [None]:
# Load yearly productivity and keep the same 2000-2023 range used for NDVI.
PROD = pd.read_csv(PROD_PATH)
# .copy() detaches the filtered rows from the original frame so that columns
# added later (e.g. the normalised ones) do not raise SettingWithCopyWarning.
PROD = PROD[(PROD["Year"] >= 2000) & (PROD["Year"] <= 2023)].copy()
# max_productivity = PROD["Productivity (kg/ha)"].max()
# PROD["Normalized_productivity"] = PROD["Productivity (kg/ha)"] / max_productivity
PROD

### 1.2. Visualizar os dados

In [None]:
# Line plot of the NDVI column against the "Data" (date) column.
NDVI.plot(x="Data", y="NDVI", title="NDVI over time", figsize=(15, 5))

In [None]:
# class MLPDataset(torch.utils.data.Dataset):
#     def __init__(self, ndvi_df, prod_df):
#         self.ndvi_df = ndvi_df
#         self.prod_df = prod_df

#     def __len__(self):
#         return self.ndvi_df["Year"].nunique()

#     def __getitem__(self, idx):
#         years = self.ndvi_df["Year"].sort_values().unique()
#         if idx >= len(years):
#             raise IndexError("Index out of range")
#         year = years[idx]
#         ndvi = self.ndvi_df[self.ndvi_df["Year"] == year]["NDVI"].values
#         prod = self.prod_df[self.prod_df["Year"] == year][
#             "Productivity (kg/ha)"
#         ].values[0]
#         return torch.tensor(ndvi, dtype=torch.float32), torch.tensor(
#             prod, dtype=torch.float32
#         )


# dataset = MLPDataset(NDVI_last_20_per_year, PROD)
# dataset[0]

# train_size = len(dataset) - 8
# valid_size = 4
# test_size = 4
# train_dataset = torch.utils.data.Subset(dataset, range(train_size))
# valid_dateset = torch.utils.data.Subset(dataset, range(train_size, train_size + valid_size))
# test_dataset = torch.utils.data.Subset(dataset, range(train_size + valid_size, train_size + valid_size + test_size))

### 2.2. Preparar Datasets

#### 2.2.1. Normalização

In [None]:
from sklearn.preprocessing import StandardScaler

# Z-score normalisation of the NDVI frame.
# NOTE(review): the scaler is fitted on every year in NDVI (2000-2023), which
# includes the years later held out for validation and test.
NDVI["Year_norm"] = NDVI["Year"].copy()  # create the target column up front

ndvi_raw_features = NDVI[["NDVI", "Year"]].values
ndvi_scaler = StandardScaler()
ndvi_scaler.fit(ndvi_raw_features)
NDVI[["NDVI_norm", "Year_norm"]] = ndvi_scaler.transform(ndvi_raw_features)

NDVI

In [None]:
# Normalise productivity (z-score).
# Create the target column from PROD's own "Year". The previous code copied
# NDVI["Year"], a column of a different frame with an unrelated row index.
PROD["Year_norm"] = PROD["Year"].copy()

# Fit one scaler on (productivity, year) and write both normalised columns.
prod_scaler = StandardScaler().fit(PROD[["Productivity (kg/ha)", "Year"]].values)
PROD[["Productivity_norm", "Year_norm"]] = prod_scaler.transform(
    PROD[["Productivity (kg/ha)", "Year"]].values
)
PROD

In [None]:
# class DatasetYearOfLast(torch.utils.data.Dataset):
#     """
#     DatasetYearOfLast - Dataset para previsão de produtividade

#     X: Sequências de tamanho <WINDOW_SIZE> de observações de NDVI consecutivas (normalizado -1 a +1)
#     y: Produtividade no Ano da última observação

#     Features:
#     - Sequências de NDVI (Já vem normalizado entre 0 e 1 da fonte)
#     - Sequências de dia do ano com codificação circular no formato de Tupla: (Seno, Cosseno)
#     - Sequência de ano da observação normalizado por z-score

#     Label:
#     - Produtividade (kg/ha) normalizada por z-score, relativa ao ano da última observação
#     """

#     def __init__(self, ndvi_df, prod_df, window_size=LSTM_WINDOW_SIZE):
#         self.ndvi_df = ndvi_df
#         self.prod_df = prod_df
#         self.window_size = window_size

#     def __len__(self):
#         return len(self.ndvi_df) - self.window_size

#     def __getitem__(self, idx):
#         ndvi = self.ndvi_df.iloc[idx : idx + self.window_size][
#             ["NDVI", "Date_sin", "Date_cos", "Year_norm"]
#         ].values

#         year = self.ndvi_df.iloc[idx + self.window_size - 1]["Year"]
#         prod = self.prod_df[self.prod_df["Year"] == year]["Productivity_norm"].values[0]

#         return torch.tensor(ndvi, dtype=torch.float32), torch.tensor(
#             prod, dtype=torch.float32
#         )


class DatasetYearOfLast(torch.utils.data.Dataset):
    """Sliding windows of NDVI observations labelled with one year's productivity.

    Each sample is ``window_size`` consecutive rows of ``ndvi_df``. A window is
    kept only if its last row is in the same year as its first row or in the
    year immediately after, so windows never bridge a gap between
    non-consecutive years of a filtered frame.

    X: ``(window_size, 4)`` tensor with columns NDVI, Date_sin, Date_cos, Year_norm.
    y: ``Productivity_norm`` of the year of the window's last observation.
    """

    FEATURE_COLUMNS = ["NDVI", "Date_sin", "Date_cos", "Year_norm"]

    def __init__(self, ndvi_df, prod_df, window_size=LSTM_WINDOW_SIZE):
        """Pre-compute every valid window.

        Args:
            ndvi_df: Frame with "Year" and the feature columns, already in
                chronological order. It is copied and re-indexed from 0.
            prod_df: Frame with "Year" and "Productivity_norm".
            window_size: Number of consecutive observations per sample.
        """
        self.ndvi_df = ndvi_df.copy().reset_index(drop=True)
        self.window_size = window_size
        self.prod_df = prod_df

        # Prepare windows grouped by year
        self.samples = []
        self.available_years = ndvi_df["Year"].unique().tolist()

        years = self.ndvi_df["Year"].to_numpy()
        # range() is empty when the frame is shorter than one window.
        for start in range(len(self.ndvi_df) - window_size + 1):
            first_year = years[start]
            last_year = years[start + window_size - 1]
            # The last row belongs to the frame, so its year is necessarily one
            # of the available years; no separate membership test is needed.
            if last_year == first_year or last_year == first_year + 1:
                window = self.ndvi_df.iloc[start : start + window_size]
                self.samples.append((window, last_year))

    def __len__(self):
        """Number of valid windows."""
        return len(self.samples)

    def __getitem__(self, idx):
        """Return ``(features, label)`` float32 tensors for window ``idx``."""
        window, year = self.samples[idx]
        ndvi = window[self.FEATURE_COLUMNS].values

        prod = self.prod_df[self.prod_df["Year"] == year]["Productivity_norm"].values[0]

        return torch.tensor(ndvi, dtype=torch.float32), torch.tensor(
            prod, dtype=torch.float32
        )

    def get_last_window_of_year(self, year) -> Tuple[np.ndarray, float]:
        """Return the last ``window_size`` observations of ``year`` and its label.

        Returns:
            ``(features, productivity_norm)`` where ``features`` is a
            ``(window_size, 4)`` NumPy array.

        Raises:
            ValueError: If ``year`` has fewer than ``window_size`` observations
                in this dataset (including none at all).
        """
        ndvi = self.ndvi_df[self.ndvi_df["Year"] == year]
        if len(ndvi) < self.window_size:
            raise ValueError(
                f"Year {year} has {len(ndvi)} observations in dataset; "
                f"at least {self.window_size} are required"
            )
        return (
            ndvi.iloc[-self.window_size :][self.FEATURE_COLUMNS].values,
            self.prod_df[self.prod_df["Year"] == year]["Productivity_norm"].values[0],
        )

# Hold out fixed years for validation and test; every other year in PROD trains.
years_validation = [2004, 2010, 2016, 2022]
years_test = [2005, 2011, 2017, 2023]
held_out_years = years_validation + years_test
years_train = PROD[~PROD["Year"].isin(held_out_years)]["Year"].unique()

# One filtered NDVI frame per split (each dataset copies its frame internally).
ndvi_train = NDVI[NDVI["Year"].isin(years_train)]
ndvi_validation = NDVI[NDVI["Year"].isin(years_validation)]
ndvi_test = NDVI[NDVI["Year"].isin(years_test)]

# Datasets better
mlp_train_dataset_year_of_last = DatasetYearOfLast(ndvi_train, PROD, MLP_WINDOW_SIZE)
mlp_validation_dataset_year_of_last = DatasetYearOfLast(
    ndvi_validation, PROD, MLP_WINDOW_SIZE
)
mlp_test_dataset_year_of_last = DatasetYearOfLast(ndvi_test, PROD, MLP_WINDOW_SIZE)

lstm_train_dataset_year_of_last = DatasetYearOfLast(ndvi_train, PROD, LSTM_WINDOW_SIZE)
lstm_validation_dataset_year_of_last = DatasetYearOfLast(
    ndvi_validation, PROD, LSTM_WINDOW_SIZE
)
lstm_test_dataset_year_of_last = DatasetYearOfLast(ndvi_test, PROD, LSTM_WINDOW_SIZE)

# Datasets sequential
# train_dataset_year_of_last = DatasetYearOfLast(
#     NDVI[NDVI["Year"] <= 2016], PROD, LSTM_WINDOW_SIZE
# )
# validation_dataset_year_of_last = DatasetYearOfLast(
#     NDVI[(NDVI["Year"] > 2016) & (NDVI["Year"] <= 2020)], PROD, LSTM_WINDOW_SIZE
# )
# test_dataset_year_of_last = DatasetYearOfLast(
#     NDVI[NDVI["Year"] > 2020], PROD, LSTM_WINDOW_SIZE
# )

# Quick look at the first validation sample and the last 2004 window.
# print(mlp_validation_dataset_year_of_last[0][0].shape)
print(mlp_validation_dataset_year_of_last[0])
print("\n")
print(mlp_validation_dataset_year_of_last.get_last_window_of_year(2004))

In [None]:
# Sanity check: the dataset's last 2017 window must equal the tail of the frame.
expected_window_2017 = NDVI[NDVI["Year"] == 2017][
    ["NDVI", "Date_sin", "Date_cos", "Year_norm"]
].tail(LSTM_WINDOW_SIZE)

print("Last window of 2017 in the DataFrame:")
print(expected_window_2017)

print(
    f"\nProdutivity 2017: {PROD[PROD['Year'] == 2017]['Productivity (kg/ha)'].values[0]}"
)
print(
    f"Produtivity 2017 (normalized): {PROD[PROD['Year'] == 2017]['Productivity_norm'].values[0]}"
)

print(f"\nLast window of 2017 in the DatasetYearOfLast (values should match exactely):")
print(lstm_test_dataset_year_of_last.get_last_window_of_year(2017))

# Element-wise comparison of the feature arrays (index 0 of the returned tuple).
actual_window_2017 = lstm_test_dataset_year_of_last.get_last_window_of_year(2017)[0]
windows_match = (actual_window_2017 == expected_window_2017.values).all()
assert windows_match, "\n❌ Sanity check failed! Please check the DatasetYearOfLast class"
print(
    "\n✅ Sanity check passed for LSTM! You can look values by yourself if you want to double check."
)

In [None]:
class DatasetWeightedAverage(torch.utils.data.Dataset):
    """Sliding NDVI windows labelled with a weighted-average productivity.

    X: ``window_size`` consecutive rows with the columns NDVI, Date_sin,
       Date_cos and Year_norm, as a ``(window_size, 4)`` float32 tensor.
    y: Weighted average of ``Productivity_norm`` for the year of the first and
       the year of the last observation in the window. Each year is weighted by
       how many of the window's observations fall in it. When both are the same
       year the label is simply that year's productivity.
    """

    def __init__(self, ndvi_df, prod_df, window_size=LSTM_WINDOW_SIZE):
        """Store the frames; windows are sliced lazily in ``__getitem__``.

        Args:
            ndvi_df: Frame with "Year" and the feature columns, in chronological order.
            prod_df: Frame with "Year" and "Productivity_norm".
            window_size: Number of consecutive observations per sample.
        """
        self.ndvi_df = ndvi_df
        self.prod_df = prod_df
        self.window_size = window_size

    def __len__(self):
        """Number of full windows: the last one starts at ``len - window_size``."""
        return max(0, len(self.ndvi_df) - self.window_size + 1)

    def __getitem__(self, idx):
        """Return ``(features, label)`` float32 tensors for the window starting at ``idx``.

        Raises:
            IndexError: If ``idx`` is outside ``[0, len(self))``.
        """
        if not 0 <= idx < len(self):
            raise IndexError("Index out of range")

        window = self.ndvi_df.iloc[idx : idx + self.window_size]
        ndvi = window[["NDVI", "Date_sin", "Date_cos", "Year_norm"]].values

        # First and last rows of the window itself. The last row sits at
        # idx + window_size - 1; reading idx + window_size would take the year
        # of the observation that follows the window.
        year_first = window.iloc[0]["Year"]
        year_last = window.iloc[-1]["Year"]

        prod_first = self.prod_df[self.prod_df["Year"] == year_first][
            "Productivity_norm"
        ].values[0]
        prod_last = self.prod_df[self.prod_df["Year"] == year_last][
            "Productivity_norm"
        ].values[0]

        # Count observations inside the window only, using a mask built from
        # the window rather than from the full frame.
        window_years = window["Year"]
        n_obs_first = int((window_years == year_first).sum())
        n_obs_last = int((window_years == year_last).sum())

        # Weighting
        if year_first == year_last:
            prod = prod_first
        else:
            prod = (n_obs_first * prod_first + n_obs_last * prod_last) / (
                n_obs_first + n_obs_last
            )
        return torch.tensor(ndvi, dtype=torch.float32), torch.tensor(
            prod, dtype=torch.float32
        )
        
# Test Dataset: chronological split for the weighted-average variant.
# NOTE(review): the first two names say "year_of_last" but hold
# DatasetWeightedAverage instances.
ndvi_until_2016 = NDVI[NDVI["Year"] <= 2016]
ndvi_2017_to_2020 = NDVI[(NDVI["Year"] > 2016) & (NDVI["Year"] <= 2020)]
ndvi_after_2020 = NDVI[NDVI["Year"] > 2020]

train_dataset_year_of_last = DatasetWeightedAverage(
    ndvi_until_2016, PROD, LSTM_WINDOW_SIZE
)
validation_dataset_year_of_last = DatasetWeightedAverage(
    ndvi_2017_to_2020, PROD, LSTM_WINDOW_SIZE
)
test_dataset_weighted_average = DatasetWeightedAverage(
    ndvi_after_2020, PROD, LSTM_WINDOW_SIZE
)

validation_dataset_year_of_last[10]

## 3. Model training

### 3.1. Multi-layer Perceptron

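Essa rede é um perceptron multicamadas (MLP) feedforward comum, com 2 camadas internas.

As entradas são janelas de `MLP_WINDOW_SIZE` observações consecutivas, cada uma com 4 atributos (NDVI, seno e cosseno da data, ano normalizado); a saída é a produtividade prevista, normalizada por z-score.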

In [None]:
class AmplifiedTanh(nn.Module):
    """Tanh activation scaled by a constant, so outputs lie in ``[-factor, +factor]``."""

    def __init__(self, amplification_factor=1.0):
        """Store the multiplier applied to ``tanh``."""
        super().__init__()
        self.amplification_factor = amplification_factor

    def forward(self, x):
        """Return ``amplification_factor * tanh(x)`` element-wise."""
        squashed = torch.tanh(x)
        return self.amplification_factor * squashed


# MLP: flatten the (window, 4 features) input, two ReLU hidden layers, then a
# single output squashed to [-1.5, 1.5] by AmplifiedTanh.
mlp_layers = [
    nn.Flatten(),  # defaults: start_dim=1, end_dim=-1
    nn.Linear(MLP_WINDOW_SIZE * 4, MLP_BASE_HIDDEN_SIZE),
    nn.ReLU(),
    nn.Linear(MLP_BASE_HIDDEN_SIZE, MLP_BASE_HIDDEN_SIZE // 2),
    nn.ReLU(),
    nn.Linear(MLP_BASE_HIDDEN_SIZE // 2, 1),
    AmplifiedTanh(amplification_factor=1.5),
]
mlp_network = nn.Sequential(*mlp_layers)


def init_linear_weights(m):
    """Re-initialise an ``nn.Linear`` module in place; ignore any other module.

    Weights, and the bias when present, are drawn from N(0, 0.01**2).
    Intended for use with ``Module.apply``.
    """
    if not isinstance(m, nn.Linear):
        return
    nn.init.normal_(m.weight, mean=0.0, std=0.01)
    if m.bias is None:
        return
    nn.init.normal_(m.bias, mean=0.0, std=0.01)


# Module.apply visits every submodule; init_linear_weights only touches nn.Linear layers.
mlp_network.apply(init_linear_weights)

# for name, param in mlp_network.named_parameters():
#     print(f"{name}: {param}")

# Step-by-step debug the MLP
# x = torch.randn(20, 4)
# print(f"Input shape: {x.shape}\n{x}\n")
# for i, layer in enumerate(mlp_network):
#     x = layer(x)
#     print(f"After layer {i} ({layer.__class__.__name__}): {x.shape}\n{x}\n")

In [None]:
# Train the MLP with Adam + MSE, checkpointing the weights with the lowest
# mean validation loss to "mlp.pth".
mlp_network = mlp_network.to(device)
optimizer = optim.Adam(mlp_network.parameters(), lr=MLP_LEARNING_RATE)
loss_fn = nn.MSELoss()

mlp_losses_validation = []
mlp_losses_train = []
best_loss = float("inf")
saved_epoch = 0

mlp_train_loader_year_of_last = torch.utils.data.DataLoader(
    mlp_train_dataset_year_of_last,
    batch_size=MLP_BATCH_SIZE,
    shuffle=True,
    drop_last=True,
)
# NOTE(review): drop_last=True discards a trailing partial validation batch.
mlp_validation_loader_year_of_last = torch.utils.data.DataLoader(
    mlp_validation_dataset_year_of_last, batch_size=4, shuffle=False, drop_last=True
)

for i in trange(MLP_EPOCHS):
    epoch_losses_train = []

    mlp_network.train()
    for ndvi, prod in mlp_train_loader_year_of_last:
        ndvi, prod = ndvi.to(device), prod.to(device)
        optimizer.zero_grad()
        pred = mlp_network(ndvi)
        # pred is (batch, 1); give the target the same shape.
        loss = loss_fn(pred, prod.unsqueeze(1))
        loss.backward()
        optimizer.step()
        epoch_losses_train.append(loss.item())

    epoch_losses_validation = []
    mlp_network.eval()
    with torch.no_grad():
        for ndvi, prod in mlp_validation_loader_year_of_last:
            ndvi, prod = ndvi.to(device), prod.to(device)
            pred = mlp_network(ndvi)
            loss = loss_fn(pred, prod.unsqueeze(1))
            epoch_losses_validation.append(loss.item())

    mean_validation_loss = np.mean(epoch_losses_validation)
    if mean_validation_loss < best_loss:
        best_loss = mean_validation_loss
        saved_epoch = i + 1
        torch.save(mlp_network.state_dict(), "mlp.pth")

    mlp_losses_train.append(np.mean(epoch_losses_train))
    mlp_losses_validation.append(mean_validation_loss)

# Restore the best checkpoint so that later cells evaluate the model that was
# selected on validation, not whatever the last epoch produced.
if saved_epoch > 0:
    mlp_network.load_state_dict(torch.load("mlp.pth", map_location=device))
print(f"\n\nSaved MLP model\tepoch: {saved_epoch}\tvalidation loss: {best_loss:.4f}")

In [None]:
def plot_loss(train_losses, validation_losses):
    """
    Draw the per-epoch training and validation loss curves on one figure.

    Args:
        train_losses (list): Training loss for each epoch.
        validation_losses (list): Validation loss for each epoch.
    """
    curves = (
        (train_losses, "Training Loss"),
        (validation_losses, "Validation Loss"),
    )
    plt.figure(figsize=(10, 6))
    for values, label in curves:
        plt.plot(values, label=label)
    plt.title("Training and Validation Loss Over Epochs")
    plt.xlabel("Epoch")
    plt.ylabel("Loss")
    plt.grid(True)
    plt.legend()
    plt.show()

plot_loss(mlp_losses_train, mlp_losses_validation)

### 3.2. LSTM

In [None]:
import random

random.seed(1)


def cat_name_generator(gender: str):
    """Pick a random cat name for ``gender`` ("macho" or "femea").

    Raises KeyError for any other key.
    """
    names_by_gender = {
        "macho": [
            "Thor", "Loki", "Zeus", "Hades", "Apolo", "Ares", "Hermes",
            "Poseidon", "Hércules", "Aquiles", "Ulisses", "Atlas", "Perseu",
            "Orfeu", "Eros", "Hefesto", "Dionísio", "Héracles", "Cronos",
            "Prometeu", "Teseu", "Orfeu", "Eolo",
        ],
        "femea": [
            "Afrodite", "Artemis", "Deméter", "Hera", "Perséfone", "Atena",
            "Héstia", "Eris", "Selene", "Gaia", "Tétis", "Eurídice",
            "Calipso", "Medusa", "Circe",
        ],
    }
    candidates = names_by_gender[gender]
    return random.choice(candidates)


print("Nome de gato macho:", cat_name_generator("macho"))
print("Nome de gato fêmea:", cat_name_generator("femea"))

In [None]:
import torch.nn as nn
import torch.optim as optim
import numpy as np


# Define model with Linear layer
class LSTMRegressor(nn.Module):
    """Stacked LSTM followed by a linear head that outputs one value per sequence.

    Args:
        input_size: Number of features per time step.
        hidden_size: Size of the LSTM hidden state.
        num_layers: Number of stacked LSTM layers.
        dropout: Dropout between LSTM layers. ``None`` (the default) keeps the
            previous behaviour of reading the notebook-level ``LSTM_DROPOUT``.
            Ignored (set to 0) when ``num_layers`` is 1, where nn.LSTM would
            only emit a warning.
    """

    def __init__(self, input_size, hidden_size, num_layers, dropout=None):
        super().__init__()
        if dropout is None:
            dropout = LSTM_DROPOUT
        if num_layers <= 1:
            dropout = 0.0
        self.lstm = nn.LSTM(
            input_size, hidden_size, num_layers, batch_first=True, dropout=dropout
        )
        self.fc = nn.Linear(hidden_size, 1)  # Output a single value

    def forward(self, x, hidden_n=None, hidden_c=None):
        """Predict from a ``(batch, seq, input_size)`` tensor.

        Returns:
            ``(batch, 1)`` predictions when no state is given. When both
            ``hidden_n`` and ``hidden_c`` are passed, returns
            ``(predictions, (hidden_n, hidden_c))`` with the updated state.
        """
        if hidden_n is None or hidden_c is None:
            out, _ = self.lstm(x)
            return self.fc(out[:, -1, :])  # Get output of last time step
        else:
            out, (hidden_n, hidden_c) = self.lstm(x, (hidden_n, hidden_c))
            out = self.fc(out[:, -1, :])  # Get output of last time step
            return out, (hidden_n, hidden_c)


# Data loaders for the LSTM: shuffled training batches, fixed-order validation.
# Both drop a trailing partial batch.
lstm_train_loader_year_of_last = torch.utils.data.DataLoader(
    dataset=lstm_train_dataset_year_of_last,
    batch_size=LSTM_BATCH_SIZE,
    shuffle=True,
    drop_last=True,
)
lstm_validation_loader_year_of_last = torch.utils.data.DataLoader(
    dataset=lstm_validation_dataset_year_of_last,
    batch_size=4,
    shuffle=False,
    drop_last=True,
)


# Four input features per time step: NDVI, Date_sin, Date_cos, Year_norm.
lstm_model = LSTMRegressor(
    input_size=4,
    hidden_size=LSTM_HIDDEN_SIZE,
    num_layers=LSTM_NUM_LAYERS,
)
lstm_model = lstm_model.to(device)

# for name, param in lstm_model.named_parameters():
#     print(f"{name}: {param}")


def init_lstm_weights(m):
    """Xavier-uniform initialise every weight matrix of an ``nn.LSTM`` in place.

    All layers (and directions) are covered by iterating over the module's
    ``weight_*`` parameters. The previous version only touched layer 0, which
    left deeper layers of a stacked LSTM at their default initialisation.
    Biases and non-LSTM modules are left untouched. Intended for use with
    ``Module.apply``.
    """
    if not isinstance(m, nn.LSTM):
        return
    for name, param in m.named_parameters():
        if name.startswith("weight_"):
            nn.init.xavier_uniform_(param)


# Initialise the LSTM weights and the linear head, then train with Adam + MSE.
lstm_model.apply(init_lstm_weights)
lstm_model.apply(init_linear_weights)

optimizer = optim.Adam(lstm_model.parameters(), lr=LSTM_LEARNING_RATE)
loss_fn = nn.MSELoss()

lstm_losses_train = []
lstm_losses_validation = []
best_loss = float("inf")
saved_epoch = 0


for i in trange(LSTM_EPOCHS):
    epoch_losses_train = []

    # Each batch starts from the default (zero) hidden state; no state is
    # carried across batches.
    lstm_model.train()
    for ndvi, prod in lstm_train_loader_year_of_last:
        ndvi, prod = ndvi.to(device), prod.to(device)
        optimizer.zero_grad()

        # The model returns fc(out[:, -1, :]) with a 1-unit head, i.e. a
        # (batch, 1) tensor, so pred[:, -1] is the (batch,) vector that
        # matches prod.
        pred = lstm_model(ndvi)
        last_pred = pred[:, -1]
        loss = loss_fn(last_pred, prod)
        loss.backward()
        nn.utils.clip_grad_norm_(lstm_model.parameters(), 1.0)
        optimizer.step()
        epoch_losses_train.append(loss.item())

    epoch_losses_validation = []
    lstm_model.eval()
    with torch.no_grad():
        for ndvi, prod in lstm_validation_loader_year_of_last:
            ndvi, prod = ndvi.to(device), prod.to(device)
            pred = lstm_model(ndvi)
            last_pred = pred[:, -1]  # Get the last prediction
            loss = loss_fn(last_pred, prod)
            epoch_losses_validation.append(loss.item())

    mean_validation_loss = np.mean(epoch_losses_validation)
    if mean_validation_loss < best_loss:
        best_loss = mean_validation_loss
        saved_epoch = i + 1
        torch.save(lstm_model.state_dict(), "lstm.pth")

    lstm_losses_train.append(np.mean(epoch_losses_train))
    lstm_losses_validation.append(mean_validation_loss)

# Restore the best checkpoint so that later cells evaluate the model that was
# selected on validation, not whatever the last epoch produced.
if saved_epoch > 0:
    lstm_model.load_state_dict(torch.load("lstm.pth", map_location=device))

print(f"\n\nSaved LSTM model\tepoch: {saved_epoch}\tvalidation loss: {best_loss:.4f}")

In [None]:
# Per-epoch training vs. validation loss curves for the LSTM.
plot_loss(lstm_losses_train, lstm_losses_validation)

## 4. Avaliação do Modelo

In [None]:
# Test loaders.
# NOTE(review): drop_last=True discards a trailing partial batch, so up to
# three test windows per model are excluded from the reported loss.
mlp_test_loader_year_of_last = torch.utils.data.DataLoader(
    mlp_test_dataset_year_of_last, batch_size=4, shuffle=False, drop_last=True
)
lstm_test_loader_year_of_last = torch.utils.data.DataLoader(
    lstm_test_dataset_year_of_last, batch_size=4, shuffle=False, drop_last=True
)

# test MLP
# Evaluate under no_grad like the validation loops: no autograd graph is built.
test_losses_mlp = []
mlp_network.eval()
with torch.no_grad():
    for ndvi, prod in mlp_test_loader_year_of_last:
        ndvi, prod = ndvi.to(device), prod.to(device)
        pred = mlp_network(ndvi)
        loss = loss_fn(pred, prod.unsqueeze(1))
        test_losses_mlp.append(loss.item())
print(f"MLP test loss: {np.mean(test_losses_mlp):.4f}")

# test LSTM
test_losses_lstm = []
lstm_model.eval()
with torch.no_grad():
    for ndvi, prod in lstm_test_loader_year_of_last:
        ndvi, prod = ndvi.to(device), prod.to(device)
        pred = lstm_model(ndvi)
        last_pred = pred[:, -1]  # Get the last prediction
        loss = loss_fn(last_pred, prod)
        test_losses_lstm.append(loss.item())
print(f"LSTM test loss: {np.mean(test_losses_lstm):.4f}")

In [None]:
# Collect MLP predictions and targets over the test loader, then plot both.
mlp_network.eval()
all_preds = []
real_prods = []

with torch.no_grad():
    for batch_ndvi, batch_prod in mlp_test_loader_year_of_last:
        batch_pred = mlp_network(batch_ndvi.to(device))
        all_preds.append(batch_pred.cpu().numpy())
        real_prods.append(batch_prod.to(device).cpu().numpy())

# Join the per-batch arrays into flat 1-D vectors.
all_preds = np.concatenate(all_preds, axis=0).flatten()
real_prods = np.concatenate(real_prods, axis=0).flatten()

plt.figure(figsize=(12, 6))
for series, series_label, series_marker in (
    (real_prods, "Real Production (normalized)", "o"),
    (all_preds, "MLP Predictions (normalized)", "x"),
):
    plt.plot(series, label=series_label, marker=series_marker)
plt.title("MLP Predictions vs Real Production")
plt.xlabel("Index")
plt.ylabel("Normalized Values")
plt.grid(True)
plt.legend()
plt.tight_layout()
plt.show()

In [None]:
# Collect LSTM predictions and targets over the test loader, then plot both.
lstm_model.eval()
all_preds = []
real_prods = []

with torch.no_grad():
    for ndvi, prod in lstm_test_loader_year_of_last:
        ndvi, prod = ndvi.to(device), prod.to(device)
        pred = lstm_model(ndvi)
        all_preds.append(pred.cpu().numpy())
        real_prods.append(prod.cpu().numpy())

# Flatten the predictions and real productions. The model output is
# (batch, 1), so flatten to 1-D as the MLP cell does.
all_preds = np.concatenate(all_preds, axis=0).flatten()
real_prods = np.concatenate(real_prods, axis=0).flatten()

# Print predictions and real productions
# print("LSTM Predictions for all test dataset:")
# print(all_preds)
# print("\nReal Production (normalized) for all test dataset:")
# print(real_prods)

plt.figure(figsize=(12, 6))
plt.plot(real_prods, label="Real Production (normalized)", marker='o')
plt.plot(all_preds, label="LSTM Predictions (normalized)", marker='x')
plt.xlabel("Index")
plt.ylabel("Normalized Values")
plt.title("LSTM Predictions vs Real Production")
plt.legend()
plt.grid(True)
plt.tight_layout()
plt.show()


In [None]:
# Per-year report: predict from the last window of each test year and compare
# with that year's normalised productivity.
mlp_network.eval()
lstm_model.eval()

print("MLP test loss:", np.mean(test_losses_mlp))
with torch.no_grad():
    for y_test in years_test:
        ndvi, prod = mlp_test_dataset_year_of_last.get_last_window_of_year(y_test)
        # Add a batch dimension: (window, 4) -> (1, window, 4).
        ndvi = torch.tensor(ndvi, dtype=torch.float32).unsqueeze(0).to(device)
        pred = mlp_network(ndvi)
        last_pred = pred[0, -1].item()
        # Predictions are intentionally NOT appended to test_losses_mlp: that
        # list holds losses, and mixing predictions into it changed the
        # reported mean on every re-run of this cell.
        print(f"\n[MLP] Predicted (normalized) productivity for {y_test}: {last_pred:.4f}")
        print(f"[MLP] Real productivity (normalized) for {y_test}: {prod:.4f}")

print("\n\nLSTM test loss:", np.mean(test_losses_lstm))
with torch.no_grad():
    for y_test in years_test:
        ndvi, prod = lstm_test_dataset_year_of_last.get_last_window_of_year(y_test)
        ndvi = torch.tensor(ndvi, dtype=torch.float32).unsqueeze(0).to(device)
        pred = lstm_model(ndvi)
        last_pred = pred[0, -1].item()
        print(f"\n[LSTM] Predicted productivity (normalized) for {y_test}: {last_pred:.4f}")
        print(f"[LSTM] Real productivity (normalized) for {y_test}: {prod:.4f}")