In [None]:
!pip install keras_unet_collection

# Standard library imports
from glob import glob

# Third-party imports for data manipulation
import numpy as np
import pandas as pd

# Image processing libraries
from PIL import Image
import matplotlib.pyplot as plt
import matplotlib.image as mpimg

# Machine Learning and Neural Network libraries
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import backend as K
from tensorflow.keras.applications import ResNet50
from tensorflow.keras.layers import GlobalAveragePooling2D, Dense
from tensorflow.keras.models import Model
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.regularizers import l2
from tensorflow.keras.preprocessing.image import ImageDataGenerator, load_img, img_to_array
from tensorflow.keras.utils import Sequence, to_categorical
from tensorflow.keras.callbacks import EarlyStopping
from keras_unet_collection import models, utils
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder

In [None]:
# Report how many GPU devices TensorFlow detects.
gpu_devices = tf.config.list_physical_devices('GPU')
print("Num GPUs Available: ", len(gpu_devices))

In [None]:
# Load the dataset index table; later cells read its `img_path`, `mask_path`
# and `forest_type` columns. The last expression previews the first three rows.
df = pd.read_csv(
    '/content/drive/MyDrive/forest_db/forest.csv',
)
df.head(n=3)

# Segmentation

In [None]:
# Hold out 20% of the rows, then cut that hold-out in half, giving an
# 80/10/10 train/validation/test partition. The fixed seed keeps the
# partition reproducible across runs.
df_train, df_temp = train_test_split(
    df,
    test_size=0.2,
    random_state=42,
)
df_val, df_test = train_test_split(
    df_temp,
    test_size=0.5,
    random_state=42,
)

In [None]:
class DataFrameSequenceSegmentation(Sequence):
    """Keras ``Sequence`` that yields (image, mask) batches listed in a DataFrame.

    Parameters
    ----------
    df : pandas.DataFrame
        Table with ``img_path`` and ``mask_path`` columns. A copy is stored,
        so the caller's frame is never reordered.
    batch_size : int
        Number of rows per batch; the final batch may be smaller.
    image_size : tuple
        Stored on the instance. It is not used for resizing: arrays are
        returned exactly as ``plt.imread`` decodes them.
    augment : bool, default False
        When True, every (image, mask) pair receives one random geometric
        transform that is applied identically to both arrays.
    **kwargs
        Forwarded to ``Sequence.__init__``.
    """

    def __init__(self, df, batch_size, image_size, augment=False, **kwargs):
        super().__init__(**kwargs)
        self.df = df.copy()
        self.batch_size = batch_size
        self.image_size = image_size
        self.augment = augment

        if self.augment:
            self.datagen = ImageDataGenerator(
                rotation_range=40,
                width_shift_range=0.2,
                height_shift_range=0.2,
                shear_range=0.2,
                zoom_range=0.4,
                horizontal_flip=True,
                fill_mode='nearest'
            )
        else:
            self.datagen = ImageDataGenerator()

    def __len__(self):
        """Return the number of batches per epoch as a plain Python int."""
        return int(np.ceil(len(self.df) / self.batch_size))

    def __getitem__(self, idx):
        """Load batch ``idx`` and return ``(images, masks)`` as NumPy arrays."""
        start = idx * self.batch_size
        batch_df = self.df.iloc[start:start + self.batch_size]
        images = [plt.imread(path) for path in batch_df['img_path']]
        masks = [plt.imread(path) for path in batch_df['mask_path']]

        if self.augment:
            # Image and mask must receive the SAME transform, otherwise the
            # mask no longer lines up with the pixels it annotates.
            pairs = [
                self._augment_pair(image, mask)
                for image, mask in zip(images, masks)
            ]
            images = [pair[0] for pair in pairs]
            masks = [pair[1] for pair in pairs]

        return np.array(images), np.array(masks)

    def _augment_pair(self, image, mask):
        """Apply one randomly drawn transform to both ``image`` and ``mask``."""
        params = self.datagen.get_random_transform(image.shape)
        image_aug = self.datagen.apply_transform(image, params)

        # apply_transform works on arrays with a channel axis; add one for
        # 2-D masks and drop it again so the mask keeps its original rank.
        # NOTE(review): masks are warped with the generator's interpolation
        # settings, so mask edges may hold intermediate values - threshold
        # them if strictly binary masks are required.
        if mask.ndim == 2:
            mask_aug = self.datagen.apply_transform(mask[..., np.newaxis], params)[..., 0]
        else:
            mask_aug = self.datagen.apply_transform(mask, params)
        return image_aug, mask_aug

    def on_epoch_end(self):
        """Reshuffle the rows so the next epoch sees the batches in a new order."""
        self.df = self.df.sample(frac=1).reset_index(drop=True)

In [None]:
def iou(y_true, y_pred, smooth=1e-6):
    """Soft intersection-over-union between two tensors.

    Both tensors are flattened, so the score is computed over every element
    of the batch at once. The intersection is the sum of the element-wise
    product, and the union is the sum of both tensors minus that intersection.

    Args:
        y_true: Ground-truth tensor.
        y_pred: Predicted tensor.
        smooth: Constant added to numerator and denominator so the ratio
            stays defined when the union is zero.

    Returns:
        Scalar tensor ``(intersection + smooth) / (union + smooth)``.
    """
    truth = K.flatten(y_true)
    pred = K.flatten(y_pred)

    overlap = K.sum(truth * pred)
    combined = K.sum(truth) + K.sum(pred) - overlap

    return (overlap + smooth) / (combined + smooth)

In [None]:
# Attention U-Net for 256x256 three-channel inputs with a single sigmoid
# output channel, built on a frozen ImageNet-pretrained VGG16 backbone.
model = models.att_unet_2d(
    (256, 256, 3),
    # Width and depth of the encoder/decoder stacks
    filter_num=[64, 128, 256, 512, 1024],
    stack_num_down=2,
    stack_num_up=2,
    # Output head
    n_labels=1,
    output_activation='Sigmoid',
    # Activations and attention gates
    activation='ReLU',
    atten_activation='ReLU',
    attention='add',
    # Normalisation and resampling options
    batch_norm=True,
    pool=False,
    unpool=False,
    # Pretrained backbone, kept frozen
    backbone='VGG16',
    weights='imagenet',
    freeze_backbone=True,
    freeze_batch_norm=True,
    name='attunet',
)

In [None]:
# Binary cross-entropy loss minimised with SGD; IoU is tracked as a metric.
model.compile(
    optimizer=keras.optimizers.SGD(learning_rate=1e-2),
    loss=keras.losses.binary_crossentropy,
    metrics=[iou],
)

In [None]:
# Settings shared by the segmentation generators.
image_size = (256, 256, 3)
batch_size = 8

# Both generators are created with augmentation switched off.
train_generator = DataFrameSequenceSegmentation(
    df_train,
    batch_size=batch_size,
    image_size=image_size,
    augment=False,
)
validation_generator = DataFrameSequenceSegmentation(
    df_val,
    batch_size=batch_size,
    image_size=image_size,
    augment=False,
)

In [None]:
# Stop once the validation IoU stops improving and roll back to the best epoch.
# With epochs=1 below, the patience of 4 can never be exhausted.
early_stopping_callback = EarlyStopping(
    monitor='val_iou',          # validation value of the `iou` metric
    mode='max',                 # a larger IoU counts as an improvement
    min_delta=3e-2,             # smaller gains are not counted as improvement
    patience=4,                 # epochs without improvement before stopping
    restore_best_weights=True,  # restore the weights of the best epoch
    verbose=1,                  # log a message when stopping triggers
)

model.fit(
    train_generator,
    validation_data=validation_generator,
    epochs=1,
    callbacks=[early_stopping_callback],
)

In [None]:
# Predict masks for the held-out test rows (generator built without augmentation).
test_generator = DataFrameSequenceSegmentation(
    df_test,
    batch_size=batch_size,
    image_size=image_size,
)
y_pred = model.predict(test_generator)

In [None]:
# Score the model on the test generator. The model was compiled with a single
# metric (iou), so the result unpacks into the loss followed by the IoU.
test_metrics = model.evaluate(test_generator)
test_loss, test_iou = test_metrics

print(f"Test Loss: {test_loss}")
print(f"Test IoU: {test_iou}")

In [None]:
%matplotlib inline

def ax_decorate_box(ax):
    """Strip the frame, ticks and tick labels from ``ax`` and return it.

    Every spine's line width is set to 0 and all tick marks and tick labels
    on both axes are switched off, leaving only the plotted content visible.
    """
    for spine in ax.spines.values():
        spine.set_linewidth(0)

    ax.tick_params(
        axis="both",
        which="both",
        bottom=False,
        top=False,
        left=False,
        right=False,
        labelbottom=False,
        labelleft=False,
    )
    return ax

# For every test row, show the input image, its ground-truth mask and the
# predicted mask side by side. y_pred[i] is paired with the i-th row of df_test.
for i, (_, pair) in enumerate(df_test.iterrows()):
    fig, AX = plt.subplots(1, 3, figsize=(8, (8-0.2)/3))
    plt.subplots_adjust(0, 0, 1, 1, hspace=0, wspace=0.1)
    for ax in AX:
        ax_decorate_box(ax)

    AX[0].imshow(plt.imread(pair['img_path']), cmap=plt.cm.jet)
    AX[1].imshow(plt.imread(pair['mask_path']), cmap=plt.cm.jet)
    AX[2].imshow(y_pred[i], cmap=plt.cm.jet)

    AX[0].set_title("Original", fontsize=14)
    AX[1].set_title("Density", fontsize=14)
    AX[2].set_title("Prediction", fontsize=14)

    # Render and release each figure as it is produced so the loop does not
    # keep one open figure per test row in memory.
    plt.show()
    plt.close(fig)

In [None]:
# Persist the trained segmentation model to a .keras file; the `model` name is
# reassigned to the classifier further down, so save before running those cells.
model.save('/content/drive/MyDrive/forest_db/segmentation.keras')

# Classification

In [None]:
# Rebuild the 80/10/10 train/validation/test partition for the classifier.
# The proportions and seed are fixed, so the partition is reproducible.
df_train, df_temp = train_test_split(
    df,
    test_size=0.2,
    random_state=42,
)
df_val, df_test = train_test_split(
    df_temp,
    test_size=0.5,
    random_state=42,
)

In [None]:
class DataFrameSequenceClassification(Sequence):
    """Keras ``Sequence`` that yields (masked image, one-hot label) batches.

    Each image is multiplied by its mask before being returned, and the
    ``forest_type`` column is label-encoded and one-hot encoded.

    Parameters
    ----------
    df : pandas.DataFrame
        Table with ``img_path``, ``mask_path`` and ``forest_type`` columns.
        A copy is stored and gains an ``encoded_labels`` column.
    batch_size : int
        Number of rows per batch; the final batch may be smaller.
    image_size : tuple
        Stored on the instance; not used for resizing.
    augment : bool, default False
        When True, each masked image receives a random transform.
    classes : sequence, optional
        Full set of class names used to fit the label encoder. Pass the same
        value to every split so a label always maps to the same index, even
        when a split lacks one of the classes. Defaults to the classes found
        in ``df``.
    num_classes : int, default 3
        Width of the one-hot label vectors.
    **kwargs
        Forwarded to ``Sequence.__init__``.
    """

    def __init__(self, df, batch_size, image_size, augment=False,
                 classes=None, num_classes=3, **kwargs):
        super().__init__(**kwargs)
        self.df = df.copy()
        self.batch_size = batch_size
        self.image_size = image_size
        self.augment = augment
        self.num_classes = num_classes

        self.label_encoder = LabelEncoder()
        if classes is None:
            self.label_encoder.fit(self.df['forest_type'])
        else:
            self.label_encoder.fit(classes)
        self.df['encoded_labels'] = self.label_encoder.transform(self.df['forest_type'])

        if self.augment:
            self.datagen = ImageDataGenerator(
                rotation_range=40,
                width_shift_range=0.2,
                height_shift_range=0.2,
                shear_range=0.2,
                zoom_range=0.4,
                horizontal_flip=True,
                fill_mode='nearest'
            )
        else:
            self.datagen = ImageDataGenerator()

    def __len__(self):
        """Return the number of batches per epoch as a plain Python int."""
        return int(np.ceil(len(self.df) / self.batch_size))

    def __getitem__(self, idx):
        """Load batch ``idx`` and return ``(images, one_hot_labels)``."""
        start = idx * self.batch_size
        batch_df = self.df.iloc[start:start + self.batch_size]

        # Multiply every channel of the image by its 2-D mask.
        # NOTE(review): if both files decode to integer arrays the product
        # can overflow - confirm the files load as float images.
        images = [
            plt.imread(img_path) * plt.imread(mask_path)[:, :, np.newaxis]
            for img_path, mask_path in zip(batch_df['img_path'], batch_df['mask_path'])
        ]
        labels = to_categorical(batch_df['encoded_labels'].values,
                                num_classes=self.num_classes)

        if self.augment:
            images = [self.datagen.random_transform(image) for image in images]

        return np.array(images), np.array(labels)

    def on_epoch_end(self):
        """Reshuffle the rows so the next epoch sees the batches in a new order."""
        self.df = self.df.sample(frac=1).reset_index(drop=True)

    @property
    def classes(self):
        """Class names known to the label encoder, in encoded-index order."""
        return self.label_encoder.classes_

In [None]:
# ImageNet-pretrained ResNet50 without its original classification head.
base_model = ResNet50(
    include_top=False,
    weights='imagenet',
    input_shape=(256, 256, 3),
)

# New head: global average pooling -> 1024-unit ReLU layer -> 3-way softmax.
# Both dense layers carry an L2(0.1) kernel regulariser.
x = GlobalAveragePooling2D()(base_model.output)
x = Dense(1024, activation='relu', kernel_regularizer=l2(0.1))(x)
predictions = Dense(3, activation='softmax', kernel_regularizer=l2(0.1))(x)

# Full model: backbone input through to the new softmax output.
model = Model(inputs=base_model.input, outputs=predictions)

In [None]:
# Freeze every backbone layer so only the newly added head is trained.
for backbone_layer in base_model.layers:
    backbone_layer.trainable = False

In [None]:
# Adam optimiser with categorical cross-entropy (labels are one-hot);
# accuracy is tracked as a metric.
optimizer = Adam(learning_rate=1e-2)
model.compile(
    loss='categorical_crossentropy',
    optimizer=optimizer,
    metrics=['accuracy'],
)

In [None]:
# Settings shared by the classification generators.
image_size = (256, 256, 3)
batch_size = 16

# Only the training generator augments its images.
train_generator = DataFrameSequenceClassification(
    df_train,
    batch_size=batch_size,
    image_size=image_size,
    augment=True,
)
validation_generator = DataFrameSequenceClassification(
    df_val,
    batch_size=batch_size,
    image_size=image_size,
    augment=False,
)

In [None]:
# Stop once the validation loss stops improving and roll back to the best epoch.
# The loss must be minimised: with mode='max' a rising loss would count as
# progress, so training would halt after `patience` epochs of normal
# improvement and the worst weights would be restored.
early_stopping_callback = EarlyStopping(
    monitor='val_loss',        # Monitor validation loss
    min_delta=1e-1,            # Minimum change to qualify as an improvement
    patience=10,               # Stop after 10 epochs without improvement
    verbose=1,                 # Output messages
    mode='min',                # Stop when the monitored quantity stops decreasing
    restore_best_weights=True  # Restore model weights from the best epoch
)

model.fit(
    train_generator,
    epochs=100,
    validation_data=validation_generator,
    callbacks=[early_stopping_callback]
)

In [None]:
# Build the generator from the held-out test rows (not the training rows) so
# the reported numbers measure generalisation rather than training fit.
test_generator = DataFrameSequenceClassification(df_test, batch_size=batch_size, image_size=image_size)

# Evaluate the model; with a single compiled metric the result unpacks into
# the loss followed by the accuracy.
test_loss, test_accuracy = model.evaluate(test_generator)

print(f"Test Loss: {test_loss}")
print(f"Test Accuracy: {test_accuracy}")

In [None]:
# Take the first batch from the test generator and predict its class probabilities.
test_images, test_labels = test_generator[0]
predictions = model.predict(test_images)

In [None]:
# Persist the trained classification model to a .keras file on Drive.
model.save('/content/drive/MyDrive/forest_db/classification.keras')