In [None]:
# imports
# preprocessing
from PIL import Image
import numpy as np

# training
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import DataLoader, TensorDataset

# visualization
import matplotlib.pyplot as plt

In [None]:
# We want to find the next vegetation distribution in 3 (n for later) months. Therefore we use time series data from the years 2015 - 2023. We 
# take one image every three months and do image segmentation. First we need to identify the percentage that the different classes have in the
# segmented image. For the beginning we will only use the classes: Forest and No Forest. Future work would include more classes (i.e. 7)
# The images have the dimensions 48 x 48.

# Convert the iamge data into sequential list data where one list represents one time series. Each entry in the list is the percentage of forest
# in that year.

def load_images():
    # TODO
    return None

def export_mask(image):
    # image must be a PIL image
    # Extract the mask layer of the image
    channels = image.split()
    mask = channels[3]
    return mask

# Compute the percentage of ones (forest) in the mask
def compute_forest_percentage(mask):
    num_ones = np.sum(mask == 1)
    total_pixels = mask.size
    percentage_ones = num_ones / total_pixels
    return percentage_ones
    

In [None]:
# forest percentages is a dictionary with an index as key and a list of time series forest percentages as value
# returns a train and test dataloader that can directly be used for training
def create_data_set(forest_percentages : dict, batch_size : int = 100):
    X = []
    y = []
    # fill training set
    for entry in forest_percentages:
        X.append(forest_percentages[entry][:-1])
        y.append(forest_percentages[entry][-1])

    X_train, y_train, X_test, y_test = np.array(X[:80]), np.array(y[:80]), np.array(X[80:]), np.array(y[80:])
    
    # transform into pytorch tensors
    X_train = torch.tensor(X_train, dtype=torch.float32).unsqueeze(1)  # Add a channel dimension
    y_train = torch.tensor(y_train, dtype=torch.float32).view(-1, 1)
    X_test = torch.tensor(X_test, dtype=torch.float32).unsqueeze(1)  # Add a channel dimension
    y_test = torch.tensor(y_test, dtype=torch.float32).view(-1, 1)
   

    train_dataset = TensorDataset(X_train, y_train) 
    train_dataloader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)  
    test_dataset = TensorDataset(X_test, y_test) 
    test_dataloader = DataLoader(test_dataset, batch_size=batch_size, shuffle=True)  
    
    return train_dataloader, test_dataloader

# generate some dummy test data
def generate_dummy_data(seq_length=36):
    forest_percentages = {}
    for i in range(100):
        data = np.random.rand(seq_length)
        # Sort the data in descending order along each sequence
        sorted_data = np.sort(data)[::-1].copy() # Copy to avoid negative strides
        forest_percentages[i] = sorted_data

    return forest_percentages   

train_dataloader, test_dataloader = create_data_set(generate_dummy_data())
    

In [None]:
# Now we train a CNN model that predicts the next sequence. 
# Why CNN? 
# Even though the data is sequential in time, all the information is already there upfront. The network can learn the temporal connections.
# Architecture: 36 x 1 -> 4 (x 4) -> 36 (x 36) -> 4 (x 4) -> 1
# Explanation: input -> conv. layer -> layer for overfitting -> dimension reduction -> output

class OneDCNN(nn.Module):
    def __init__(self):
        super(OneDCNN, self).__init__()
        self.conv1 = nn.Conv1d(in_channels=1, out_channels=4, kernel_size=4, stride=1, padding=1)
        self.conv2 = nn.Conv1d(in_channels=4, out_channels=35, kernel_size=4, stride=4, padding=2)
        self.fc = nn.Linear(315, 1)

    def forward(self, x):
        x = F.relu(self.conv1(x))
        x = F.relu(self.conv2(x))   
        x = torch.flatten(x, 1)  # Flatten the tensor for the fully connected layer
        x = F.sigmoid(self.fc(x))
        return x
    
# evaluate the model
def evaluate_model(model, dataloader, device, criterion):
    model.eval()  # Set the model to evaluation mode
    total_loss = 0.0
    with torch.no_grad():  # No gradients needed for evaluation
        for X_batch, y_batch in dataloader:
            X_batch, y_batch = X_batch.to(device), y_batch.to(device)
            outputs = model(X_batch)
            loss = criterion(outputs, y_batch)
            total_loss += loss.item() * X_batch.size(0)
    avg_loss = total_loss / len(dataloader.dataset)
    return avg_loss

In [None]:
# Train the model
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = OneDCNN().to(device)
criterion = nn.MSELoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

# Training loop
num_epochs = 50
train_losses = []
test_losses = []
for epoch in range(num_epochs):
    model.train()
    running_loss = 0.0
    for batch_idx, (X_batch, y_batch) in enumerate(train_dataloader):
        optimizer.zero_grad()
        outputs = model(X_batch.to(device))
        loss = criterion(outputs, y_batch.to(device))
        loss.backward()
        optimizer.step()
        running_loss += loss.item() * X_batch.size(0)
        
    epoch_loss = running_loss / len(train_dataloader.dataset)
    train_losses.append(epoch_loss)  
    test_loss = evaluate_model(model, test_dataloader, device, criterion)
    test_losses.append(test_loss)
    print(f'Epoch {epoch+1}/{num_epochs}, Loss: {loss.item()}')

In [None]:
# plot the MSE

plt.figure(figsize=(10, 5))
plt.plot(train_losses, label='Train Loss')
plt.plot(test_losses, label='Test Loss')
plt.title('Training Loss (MSE) Over Epochs')
plt.xlabel('Epoch')
plt.ylabel('MSE Loss')
plt.legend()
plt.show()