# SuperAI Season 4 - Level 2 Hackathon - Forest Type Double Classifier

## Fix Dataset

In [466]:
import numpy as np

In [467]:
def add_features(row) :

    row['NDVI'] = (row['b8'] - row['b4']) / (row['b8'] + row['b4'])
    row['EVI'] = 2.5 * ((row['b8'] - row['b4']) / (row['b8'] + 6 * row['b4'] - 7.5 * row['b2'] + 1.01))
    row['NDWI '] = (row['b3'] - row['b8']) / (row['b3'] + row['b8'])
    row['SAVI '] = (row['b8'] - row['b4']) * (1 + 0.5) / (row['b8'] + row['b4'] + 0.5)
    row['MSAVI'] = (2 * row['b8'] + 1 - ( (2 * row['b8'] + 1) ** 2 - 8 * (row['b8'] - row['b4'])) ** (1 / 2)) / 2
    row['GNDVI '] = (row['b8'] - row['b3']) / (row['b8'] + row['b3'])
    row['RENDVI '] = (row['b8'] - row['b5']) / (row['b8'] + row['b5'])
    row['NDMI '] = (row['b8'] - row['b11']) / (row['b8'] + row['b11'])
    row['GRVI'] = (row['b3'] - row['b4']) / (row['b3'] + row['b4'])
    row['TVI'] = ( (row['b8'] - row['b4']) / (row['b8'] + row['b4'] + 0.5) ) ** (1 / 2)
    row['MCARI'] = ((row['b5'] - row['b4']) - 0.2 * (row['b5'] - row['b3'])) / (row['b5'] / row['b4'])
    row['BSI'] =  ((row['b11'] + row['b4']) - (row['b8'] + row['b2'])) / ((row['b11'] + row['b4']) + (row['b8'] + row['b2']))
    row['NBR'] = (row['b8'] - row['b12']) / (row['b8'] + row['b12'])
    row['MSI'] = row['b11'] / row['b8']
    row['RVI'] = row['b8'] / row['b4']
    row['GCI'] = (row['b8'] / row['b3']) - 1
    
    row['NDVI_664nm'] = (row['b8_a']-row['b4'])/(row['b8_a']+row['b4'])
    row['NDVI_559nm'] = (row['b8_a']-row['b3'])/(row['b8_a']+row['b3'])
    row['NDVI_1640nm'] = (row['b8_a']-row['b11'])/(row['b8_a']+row['b11'])
    row['NDVI_2200nm'] = (row['b8_a']-row['b12'])/(row['b8_a']+row['b12'])

    
    return row

In [468]:
from imblearn.over_sampling import SMOTE 
import pandas as pd

smote = SMOTE(random_state = 42 , sampling_strategy= 'all')

DEF_NONDEF_df = pd.read_csv('./datasets/DEF_NONDEF.csv' , index_col='id' )
DEF_NONDEF_df , label_df  = smote.fit_resample(DEF_NONDEF_df.drop(columns=['nforest_type']) , DEF_NONDEF_df['nforest_type'])
DEF_NONDEF_df = DEF_NONDEF_df.join(label_df)
DEF_NONDEF_df = DEF_NONDEF_df.apply(add_features , axis = 1)
# DEF_NONDEF_df = add_features_focus(DEF_NONDEF_df)

DEF_NONDEF_df.sort_index().to_csv('./datasets/fix_DEF_NONDEF.csv' , index_label='id')

MDF_DDF_df = pd.read_csv('./datasets/MDF_DDF.csv' , index_col='id')
MDF_DDF_df , label_df  = smote.fit_resample(MDF_DDF_df.drop(columns=['nforest_type']) , MDF_DDF_df['nforest_type'])
MDF_DDF_df = MDF_DDF_df.join(label_df)
MDF_DDF_df = MDF_DDF_df.apply(add_features , axis = 1)

# MDF_DDF_df = add_features_focus(MDF_DDF_df)

MDF_DDF_df.sort_index().to_csv('./datasets/fix_MDF_DDF.csv' , index_label='id')

## DEF_NONDEF Classifier

In [18]:
import torch
from torch import nn
import torch.nn.functional as F
from torch.utils.data import DataLoader , random_split
import lightning as L
from torch.utils.data import Dataset
import pandas as pd
import torchmetrics


In [19]:
class DEF_NONDEF_Dataset (Dataset) :

    def __init__ (self , annotation_path , transform = None , target_transform = None) :

        self.annotation_path = annotation_path
        self.annotation_file = pd.read_csv(annotation_path , index_col = 'id')
        self.annotation_arrays = self.annotation_file.drop(columns=['nforest_type']).to_numpy().astype('float32') 
        self.annotation_labels = (self.annotation_file['nforest_type']  == 'DEF').to_numpy().astype('float32')
        
        self.transform = transform
        self.target_transform = target_transform

    def __len__(self) :

        return len(self.annotation_file)

    def __getitem__ (self , idx) :

        data  = self.annotation_arrays[idx]
        label = self.annotation_labels[idx]
        
        if self.transform :

            data = self.transform(data)

        if self.target_transform :

            label = self.target_transform(label)

        return data , label

In [20]:
DEF_NONDEF_dataset = DEF_NONDEF_Dataset('./datasets/fix_DEF_NONDEF.csv')

In [21]:
# Define the size of each split
train_size = int(0.9 * len(DEF_NONDEF_dataset))
test_size = len(DEF_NONDEF_dataset) - train_size

# Split the dataset
train_dataset , test_dataset = random_split(DEF_NONDEF_dataset, [train_size , test_size])

train_loader = DataLoader(train_dataset, batch_size=256, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=256, shuffle=False)

In [22]:
class Classifier(nn.Module):
    
    def __init__(self ,num_features):
        
        super().__init__()
        
        self.fc1 = nn.Linear(num_features, 128)  # Increased neurons
        self.batch_norm1 = nn.BatchNorm1d(128) 
        self.dropout1 = nn.Dropout(p = 0.25)     # Increased dropout

        self.fc2 = nn.Linear(128, 128)       # Increased neurons
        self.batch_norm2 = nn.BatchNorm1d(128)
        self.dropout2 = nn.Dropout(p= 0.25)     # Increased dropout

        self.fc3 = nn.Linear(128, 128)       # Added another layer
        self.batch_norm3 = nn.BatchNorm1d(128)
        self.dropout3 = nn.Dropout(p= 0.25)
        
        self.fc4 = nn.Linear(128, 128)       # Added another layer
        self.batch_norm4 = nn.BatchNorm1d(128)
        self.dropout4 = nn.Dropout(p= 0.25)
        
        self.fc5 = nn.Linear(128, 128)       # Added another layer
        self.batch_norm5 = nn.BatchNorm1d(128)
        self.dropout5 = nn.Dropout(p= 0.25)
        
        self.fc6 = nn.Linear(128, 128)       # Added another layer
        self.batch_norm6 = nn.BatchNorm1d(128)
        self.dropout6 = nn.Dropout(p = 0.25)
        
        self.fc7 = nn.Linear(128, 128)       # Added another layer
        self.batch_norm7 = nn.BatchNorm1d(128)
        self.dropout7 = nn.Dropout(p = 0.25)
        
        self.fc8 = nn.Linear(128, 128)       # Added another layer
        self.batch_norm8 = nn.BatchNorm1d(128)
        self.dropout8 = nn.Dropout(p = 0.25)
        
        self.fc9 = nn.Linear(128, 128)       # Added another layer
        self.batch_norm9 = nn.BatchNorm1d(128)
        self.dropout9 = nn.Dropout(p = 0.25)
        
        self.fc10 = nn.Linear(128, 128)       # Added another layer
        self.batch_norm10 = nn.BatchNorm1d(128)
        self.dropout10 = nn.Dropout(p = 0.25)
        
        self.output = nn.Linear(128, 1)

    def forward(self, x):
        
        x = F.relu(self.fc1(x))  # Changed to relu
        x = self.batch_norm1(x)
        x = self.dropout1(x)

        x = F.relu(self.fc2(x))  # Changed to relu
        x = self.batch_norm2(x)
        x = self.dropout2(x)

        x = F.relu(self.fc3(x))  # Changed to relu
        x = self.batch_norm3(x)
        x = self.dropout3(x)

        x = F.relu(self.fc4(x))  # Changed to relu
        x = self.batch_norm4(x)
        x = self.dropout4(x)
        
        x = F.relu(self.fc5(x))  # Changed to relu
        x = self.batch_norm5(x)
        x = self.dropout5(x)
        
        # x = F.relu(self.fc6(x))  # Changed to relu
        # x = self.batch_norm6(x)
        # x = self.dropout6(x)
        
        # x = F.relu(self.fc7(x))  # Changed to relu
        # x = self.batch_norm7(x)
        # x = self.dropout7(x)
        
        # x = F.relu(self.fc8(x))  # Changed to relu
        # x = self.batch_norm8(x)
        # x = self.dropout8(x)
        
        # x = F.relu(self.fc9(x))  # Changed to relu
        # x = self.batch_norm9(x)
        # x = self.dropout9(x)
        
        # x = F.relu(self.fc10(x))  # Changed to relu
        # x = self.batch_norm10(x)
        # x = self.dropout10(x)
        
        x = self.output(x)
        x = torch.sigmoid(x)
        
        return x

In [23]:
class DEF_NONDEF_Classifier(L.LightningModule):
    
    def __init__(self , num_features):
        
        super().__init__()
        
        self.Classifier = Classifier(num_features)
        
        self.train_accuracy = torchmetrics.Accuracy(task='binary')
        self.val_accuracy = torchmetrics.Accuracy(task='binary')

    def training_step(self, batch, batch_idx):
        
        x, y = batch
        y_hat = self.Classifier(x)
        
        loss = F.binary_cross_entropy(y_hat.squeeze() , y)
        acc = self.train_accuracy(y_hat.squeeze(), y)
        
        self.log('train_loss', loss, on_step=True, on_epoch=True, prog_bar=True)
        self.log('train_acc', acc, on_step=True, on_epoch=True, prog_bar=True)
        
        return loss
    
    def test_step(self, batch, batch_idx):
        
        x, y = batch
        y_hat = self.Classifier(x)
        
        test_loss = F.binary_cross_entropy(y_hat.squeeze() , y)
        acc = self.train_accuracy(y_hat.squeeze(), y)
        
        self.log('test_loss', test_loss, on_step=True, on_epoch=True, prog_bar=True)
        self.log('test_acc', acc, on_step=True, on_epoch=True, prog_bar=True)

    def configure_optimizers(self):
        
        optimizer = torch.optim.AdamW(self.parameters(), lr=1e-3)
        scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=10, gamma=0.1)
        
        return [optimizer], [scheduler]

In [24]:
DEF_NONDEF_Classifier_model = DEF_NONDEF_Classifier(32)

In [25]:
trainer = L.Trainer(max_epochs = 100)
trainer.fit(model = DEF_NONDEF_Classifier_model, train_dataloaders = train_loader )

GPU available: False, used: False
TPU available: False, using: 0 TPU cores
IPU available: False, using: 0 IPUs
HPU available: False, using: 0 HPUs

  | Name           | Type           | Params
--------------------------------------------------
0 | Classifier     | Classifier     | 155 K 
1 | train_accuracy | BinaryAccuracy | 0     
2 | val_accuracy   | BinaryAccuracy | 0     
--------------------------------------------------
155 K     Trainable params
0         Non-trainable params
155 K     Total params
0.622     Total estimated model params size (MB)
c:\users\teehe\appdata\roaming\python\python311\site-packages\lightning\pytorch\trainer\connectors\data_connector.py:441: The 'train_dataloader' does not have many workers which may be a bottleneck. Consider increasing the value of the `num_workers` argument` to `num_workers=15` in the `DataLoader` to improve performance.


Training: |          | 0/? [00:00<?, ?it/s]

c:\users\teehe\appdata\roaming\python\python311\site-packages\lightning\pytorch\trainer\call.py:54: Detected KeyboardInterrupt, attempting graceful shutdown...


In [None]:
trainer.test(DEF_NONDEF_Classifier_model , dataloaders = test_loader)

c:\users\teehe\appdata\roaming\python\python311\site-packages\lightning\pytorch\trainer\connectors\data_connector.py:441: The 'test_dataloader' does not have many workers which may be a bottleneck. Consider increasing the value of the `num_workers` argument` to `num_workers=15` in the `DataLoader` to improve performance.


Testing: |          | 0/? [00:00<?, ?it/s]

[{'test_loss_epoch': 0.3799695670604706, 'test_acc_epoch': 0.8610315322875977}]

## MDF DDF Classifier

In [None]:
class MDF_DDF_Dataset (Dataset) :

    def __init__ (self , annotation_path , transform = None , target_transform = None) :

        self.annotation_path = annotation_path
        self.annotation_file = pd.read_csv(annotation_path , index_col = 'id')
        self.annotation_arrays = self.annotation_file.drop(columns=['nforest_type']).to_numpy().astype('float32')
        self.annotation_labels = (self.annotation_file['nforest_type']  == 'MDF').to_numpy().astype('float32')
        
        self.transform = transform
        self.target_transform = target_transform

    def __len__(self) :

        return len(self.annotation_file)

    def __getitem__ (self , idx) :

        data  = self.annotation_arrays[idx]
        label = self.annotation_labels[idx]
        
        if self.transform :

            data = self.transform(data)

        if self.target_transform :

            label = self.target_transform(label)

        return data , label

In [None]:
MDF_DDF_dataset = MDF_DDF_Dataset('./datasets/fix_MDF_DDF.csv')


In [None]:
# Define the size of each split
train_size = int(0.9 * len(MDF_DDF_dataset))
test_size = len(MDF_DDF_dataset) - train_size

# Split the dataset
train_dataset , test_dataset = random_split(MDF_DDF_dataset, [train_size , test_size])

train_loader = DataLoader(train_dataset, batch_size=128, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=128, shuffle=False)

In [None]:
class Classifier2(nn.Module):
    
    def __init__(self ,num_features):
        
        super().__init__()
        
        self.fc1 = nn.Linear(num_features, 128)  # Increased neurons
        self.batch_norm1 = nn.BatchNorm1d(128) 
        self.dropout1 = nn.Dropout(p = 0.25)     # Increased dropout

        self.fc2 = nn.Linear(128, 128)       # Increased neurons
        self.batch_norm2 = nn.BatchNorm1d(128)
        self.dropout2 = nn.Dropout(p= 0.25)     # Increased dropout

        self.fc3 = nn.Linear(128, 128)       # Added another layer
        self.batch_norm3 = nn.BatchNorm1d(128)
        self.dropout3 = nn.Dropout(p= 0.25)
        
        self.fc4 = nn.Linear(128, 128)       # Added another layer
        self.batch_norm4 = nn.BatchNorm1d(128)
        self.dropout4 = nn.Dropout(p= 0.25)
        
        self.fc5 = nn.Linear(128, 128)       # Added another layer
        self.batch_norm5 = nn.BatchNorm1d(128)
        self.dropout5 = nn.Dropout(p= 0.25)
        
        self.fc6 = nn.Linear(128, 128)       # Added another layer
        self.batch_norm6 = nn.BatchNorm1d(128)
        self.dropout6 = nn.Dropout(p = 0.25)
        
        self.fc7 = nn.Linear(128, 128)       # Added another layer
        self.batch_norm7 = nn.BatchNorm1d(128)
        self.dropout7 = nn.Dropout(p = 0.25)
        
        self.fc8 = nn.Linear(128, 128)       # Added another layer
        self.batch_norm8 = nn.BatchNorm1d(128)
        self.dropout8 = nn.Dropout(p = 0.25)
        
        self.fc9 = nn.Linear(128, 128)       # Added another layer
        self.batch_norm9 = nn.BatchNorm1d(128)
        self.dropout9 = nn.Dropout(p = 0.25)
        
        self.fc10 = nn.Linear(128, 128)       # Added another layer
        self.batch_norm10 = nn.BatchNorm1d(128)
        self.dropout10 = nn.Dropout(p = 0.25)
        
        self.output = nn.Linear(128, 1)

    def forward(self, x):
        
        x = F.relu(self.fc1(x))  # Changed to relu
        x = self.batch_norm1(x)
        x = self.dropout1(x)

        x = F.relu(self.fc2(x))  # Changed to relu
        x = self.batch_norm2(x)
        x = self.dropout2(x)

        x = F.relu(self.fc3(x))  # Changed to relu
        x = self.batch_norm3(x)
        x = self.dropout3(x)

        x = F.relu(self.fc4(x))  # Changed to relu
        x = self.batch_norm4(x)
        x = self.dropout4(x)
        
        x = F.relu(self.fc5(x))  # Changed to relu
        x = self.batch_norm5(x)
        x = self.dropout5(x)
        
        # x = F.relu(self.fc6(x))  # Changed to relu
        # x = self.batch_norm6(x)
        # x = self.dropout6(x)
        
        # x = F.relu(self.fc7(x))  # Changed to relu
        # x = self.batch_norm7(x)
        # x = self.dropout7(x)
        
        # x = F.relu(self.fc8(x))  # Changed to relu
        # x = self.batch_norm8(x)
        # x = self.dropout8(x)
        
        # x = F.relu(self.fc9(x))  # Changed to relu
        # x = self.batch_norm9(x)
        # x = self.dropout9(x)
        
        # x = F.relu(self.fc10(x))  # Changed to relu
        # x = self.batch_norm10(x)
        # x = self.dropout10(x)
        
        x = self.output(x)
        x = torch.sigmoid(x)
        
        return x

In [None]:
class MDF_DDF_Classifier(L.LightningModule):
    
    def __init__(self , num_features):
        
        super().__init__()
        
        self.Classifier = Classifier2(num_features)
        
        self.train_accuracy = torchmetrics.Accuracy(task='binary')
        self.val_accuracy = torchmetrics.Accuracy(task='binary')

    def training_step(self, batch, batch_idx):
        
        x, y = batch
        y_hat = self.Classifier(x)
        
        loss = F.binary_cross_entropy(y_hat.squeeze() , y)
        acc = self.train_accuracy(y_hat.squeeze(), y)
        
        self.log('train_loss', loss, on_step=True, on_epoch=True, prog_bar=True)
        self.log('train_acc', acc, on_step=True, on_epoch=True, prog_bar=True)
        
        return loss
    
    def test_step(self, batch, batch_idx):
        
        x, y = batch
        y_hat = self.Classifier(x)
        
        test_loss = F.binary_cross_entropy(y_hat.squeeze() , y)
        acc = self.train_accuracy(y_hat.squeeze(), y)
        
        self.log('test_loss', test_loss, on_step=True, on_epoch=True, prog_bar=True)
        self.log('test_acc', acc, on_step=True, on_epoch=True, prog_bar=True)

    def configure_optimizers(self):
        optimizer = torch.optim.AdamW(self.parameters(), lr= 1e-3)
        scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=10, gamma=0.1)
        return [optimizer], [scheduler]

In [None]:
MDF_DDF_Classifier_model = MDF_DDF_Classifier(32)

In [None]:
trainer = L.Trainer(max_epochs=100)
trainer.fit(model = MDF_DDF_Classifier_model, train_dataloaders = train_loader)

GPU available: False, used: False
TPU available: False, using: 0 TPU cores
IPU available: False, using: 0 IPUs
HPU available: False, using: 0 HPUs

  | Name           | Type           | Params
--------------------------------------------------
0 | Classifier     | Classifier2    | 155 K 
1 | train_accuracy | BinaryAccuracy | 0     
2 | val_accuracy   | BinaryAccuracy | 0     
--------------------------------------------------
155 K     Trainable params
0         Non-trainable params
155 K     Total params
0.622     Total estimated model params size (MB)
c:\users\teehe\appdata\roaming\python\python311\site-packages\lightning\pytorch\trainer\connectors\data_connector.py:441: The 'train_dataloader' does not have many workers which may be a bottleneck. Consider increasing the value of the `num_workers` argument` to `num_workers=15` in the `DataLoader` to improve performance.


Training: |          | 0/? [00:00<?, ?it/s]

`Trainer.fit` stopped: `max_epochs=100` reached.


In [None]:
trainer.test(MDF_DDF_Classifier_model , dataloaders = test_loader)

c:\users\teehe\appdata\roaming\python\python311\site-packages\lightning\pytorch\trainer\connectors\data_connector.py:441: The 'test_dataloader' does not have many workers which may be a bottleneck. Consider increasing the value of the `num_workers` argument` to `num_workers=15` in the `DataLoader` to improve performance.


Testing: |          | 0/? [00:00<?, ?it/s]

[{'test_loss_epoch': 0.5531030297279358, 'test_acc_epoch': 0.7152600288391113}]