# basic plant detection for starting Bachelor thesis

## read metadata

In [1]:
# load the merged PlantCLEF2022 metadata from ../data/02_processed/merged_data.csv

import pandas as pd

# Location of the semicolon-separated metadata CSV, relative to this notebook.
file_path = "../data/02_processed/merged_data.csv"

try:
    data = pd.read_csv(file_path, delimiter=";")
except FileNotFoundError:
    # Only a message is printed here; `data` stays undefined in that case.
    print(
        f"File not found: {file_path}. Please check the file path and that the file is downloaded."
    )
else:
    # Show a preview of the first rows, or say so when there are none.
    if not data.empty:
        print(data.head())
    else:
        print("The CSV file is empty")

   classid                                         image_path  \
0  5328909  5328909/de73353bbf8431ec594df8c0c070fa5d562756...   
1  5328909  5328909/cb1b1aac1895f8f5a52e1c85ef8ceae7580e68...   
2  5328909  5328909/4bdb06e3f9b4b61c9ed2a269498d064b41b1d0...   
3  5328909  5328909/2e5095764b764e63fca8150bab3aa1bbc2157a...   
4  5328909  5328909/9886a43a74aa9c493667235bb98786df965706...   

                       species       genus        family        order  \
0  sagittaria latifolia willd.  Sagittaria  Alismataceae  Alismatales   
1  sagittaria latifolia willd.  Sagittaria  Alismataceae  Alismatales   
2  sagittaria latifolia willd.  Sagittaria  Alismataceae  Alismatales   
3  sagittaria latifolia willd.  Sagittaria  Alismataceae  Alismatales   
4  sagittaria latifolia willd.  Sagittaria  Alismataceae  Alismatales   

        class                                   image_backup_url  
0  Liliopsida  https://lab.plantnet.org/LifeCLEF/PlantCLEF202...  
1  Liliopsida  https://lab.plantnet.

## use the 5 most frequently occurring species

In [2]:
# Keep only the AMOUNT_OF_SPECIES most frequently occurring species and drop all other rows.

AMOUNT_OF_SPECIES = 5

# value_counts() orders the species by descending frequency, so head() picks the most common ones.
species = data["species"].value_counts().head(AMOUNT_OF_SPECIES)
species = species.index.tolist()

# .copy() makes the filtered frame independent of the original one, so adding
# columns to `data` later does not trigger pandas' SettingWithCopyWarning.
data = data[data["species"].isin(species)].copy()

# Sanity check: the number of distinct species that remain after filtering.
species = data["species"].unique()
len(species)

5

## delete already existing images

In [3]:
import os
import shutil

# Directory (relative to the notebook) that receives the downloaded images.
temp_dir = "temp"

# Remove any previous download directory so the run starts from an empty one.
# shutil.rmtree works without a shell (portable, no string interpolation into a
# command line); ignore_errors=True mirrors `rm -rf` when the directory is missing.
shutil.rmtree(temp_dir, ignore_errors=True)

# Recreate the (now empty) directory.
os.makedirs(temp_dir, exist_ok=True)

## download data per plant parallelized

In [4]:
import requests
import os
from concurrent.futures import ThreadPoolExecutor


def download_image(index_url: tuple) -> None:
    """Download one image and store it as ``<temp_dir>/<index>.jpg``.

    Args:
        index_url: An ``(index, url)`` pair; ``index`` becomes the file name
            and ``url`` is the address the image is fetched from.

    Returns:
        None. Failures are reported with ``print`` instead of being raised, so
        one bad URL does not stop the remaining downloads.
    """
    index, url = index_url
    file_path = os.path.join(temp_dir, f"{index}.jpg")
    try:
        # A timeout keeps a stalled connection from blocking a worker thread forever.
        response = requests.get(url, timeout=30)
        # Reject 4xx/5xx answers so an HTML error page is never saved as a .jpg.
        response.raise_for_status()

        with open(file_path, "wb") as file:
            file.write(response.content)
    except Exception as e:
        # Deliberately broad: this is a best-effort bulk download.
        print(f"Error downloading {url}: {e}")


# Pair every row index with its image URL; each pair is one download job.
index_url_list = list(data["image_backup_url"].items())

# Fetch the images concurrently with five worker threads; leaving the
# `with` block waits until every submitted download has finished.
with ThreadPoolExecutor(max_workers=5) as pool:
    pool.map(download_image, index_url_list)

## add image path to metadata

In [5]:
# Point every row at its local file, which is named after the row index inside temp_dir.
image_paths = [f"{temp_dir}/{row_id}.jpg" for row_id in data.index]
data["image_path"] = image_paths
data.head()

Unnamed: 0,classid,image_path,species,genus,family,order,class,image_backup_url
580,8002952,temp/580.jpg,ambrosia artemisiifolia l.,Ambrosia,Asteraceae,Asterales,Magnoliopsida,https://lab.plantnet.org/LifeCLEF/PlantCLEF202...
581,8002952,temp/581.jpg,ambrosia artemisiifolia l.,Ambrosia,Asteraceae,Asterales,Magnoliopsida,https://lab.plantnet.org/LifeCLEF/PlantCLEF202...
582,8002952,temp/582.jpg,ambrosia artemisiifolia l.,Ambrosia,Asteraceae,Asterales,Magnoliopsida,https://lab.plantnet.org/LifeCLEF/PlantCLEF202...
583,8002952,temp/583.jpg,ambrosia artemisiifolia l.,Ambrosia,Asteraceae,Asterales,Magnoliopsida,https://lab.plantnet.org/LifeCLEF/PlantCLEF202...
584,8002952,temp/584.jpg,ambrosia artemisiifolia l.,Ambrosia,Asteraceae,Asterales,Magnoliopsida,https://lab.plantnet.org/LifeCLEF/PlantCLEF202...


## split into train and validation set

In [11]:
from typing import Tuple
import json

import tensorflow as tf
from sklearn.preprocessing import LabelEncoder


IMAGE_SIZE = [224, 224]
BATCH_SIZE = 32
CHANNELS = 3

# Fit the label encoder on the 'species' column and store the integer labels.
# This has to happen BEFORE the split below, otherwise train_df / val_df would
# not contain the 'species_encoded' column that df_to_dataset reads.
label_encoder = LabelEncoder()
data = data.assign(species_encoded=label_encoder.fit_transform(data['species']))

train_df = data.sample(frac=0.8, random_state=123)  # 80% for training
val_df = data.drop(train_df.index)                  # Remaining 20% for validation

# Create a mapping from encoded labels back to the original species names
classes = {str(index): name for index, name in enumerate(label_encoder.classes_)}
with open('classes.json', 'w', encoding='utf-8') as f:
    json.dump(classes, f, ensure_ascii=False, indent=4)


def preprocess_image(image_path: tf.Tensor, label: tf.Tensor) -> Tuple[tf.Tensor, tf.Tensor]:
    """Load one JPEG file and turn it into a normalized, fixed-size image tensor.

    Args:
        image_path: Path of the JPEG file to read.
        label: Label belonging to the image; passed through unchanged.

    Returns:
        ``(image, label)`` where ``image`` has been decoded with CHANNELS
        channels, resized to IMAGE_SIZE and divided by 255.
    """
    image = tf.io.read_file(image_path)
    image = tf.image.decode_jpeg(image, channels=CHANNELS)
    image = tf.image.resize(image, IMAGE_SIZE)
    image = image / 255.0  # Normalize pixel values
    return image, label


def df_to_dataset(dataframe: pd.DataFrame, shuffle=True, batch_size=BATCH_SIZE) -> tf.data.Dataset:
    """Build a batched, prefetched ``tf.data.Dataset`` from a metadata frame.

    Args:
        dataframe: Frame with an 'image_path' column (file to load) and a
            'species_encoded' column (integer label).
        shuffle: Whether to shuffle the samples.
        batch_size: Number of samples per batch.

    Returns:
        A dataset yielding ``(images, labels)`` batches.
    """
    images = dataframe['image_path'].values
    labels = dataframe['species_encoded'].values
    ds = tf.data.Dataset.from_tensor_slices((images, labels))
    # Shuffle the (path, label) pairs before decoding so the shuffle buffer
    # holds short strings instead of every decoded image. An empty frame is
    # skipped because shuffle() needs a positive buffer size.
    if shuffle and len(dataframe) > 0:
        ds = ds.shuffle(buffer_size=len(dataframe))
    ds = ds.map(preprocess_image, num_parallel_calls=tf.data.AUTOTUNE)
    ds = ds.batch(batch_size).prefetch(tf.data.AUTOTUNE)
    return ds


train_dataset = df_to_dataset(train_df, shuffle=True, batch_size=BATCH_SIZE)
val_dataset = df_to_dataset(val_df, shuffle=False, batch_size=BATCH_SIZE)

## define model management

In [None]:
import importlib
import datetime
from tensorflow.keras.layers import Dense, GlobalAveragePooling2D
from tensorflow.keras.models import Model

AMOUNT_EPOCHS = 10


class ModelManager:
    """Builds, trains, evaluates and saves one classifier per requested backbone.

    Each backbone is looked up by name in ``tensorflow.keras.applications``,
    loaded with ImageNet weights, frozen, and extended with a small
    classification head.
    """

    def __init__(self, model_names, input_shape, num_classes, train_dataset, val_dataset, batch_size):
        """Store the configuration shared by all models.

        Args:
            model_names: Names of classes in ``tensorflow.keras.applications``.
            input_shape: Input shape handed to every backbone.
            num_classes: Number of units of the softmax output layer.
            train_dataset: Dataset passed to ``model.fit``.
            val_dataset: Dataset used for validation and final evaluation.
            batch_size: Batch size of the datasets; stored for callers, not
                needed for training because the datasets are iterated fully.
        """
        self.model_names = model_names
        self.input_shape = input_shape
        self.num_classes = num_classes
        self.train_dataset = train_dataset
        self.val_dataset = val_dataset
        self.batch_size = batch_size

    def load_model(self, model_name):
        """Create a frozen pretrained backbone with a new classification head.

        Args:
            model_name: Attribute name of the model class inside
                ``tensorflow.keras.applications``.

        Returns:
            An uncompiled Keras ``Model`` ending in a softmax over
            ``self.num_classes`` classes.

        Raises:
            AttributeError: If ``model_name`` is not found in the module.
        """
        module = importlib.import_module('tensorflow.keras.applications')
        model_class = getattr(module, model_name)
        # NOTE(review): each backbone may expect its own input scaling;
        # confirm that the dataset's preprocessing matches it.
        base_model = model_class(weights='imagenet', include_top=False, input_shape=self.input_shape)
        base_model.trainable = False  # only the new head is trained
        x = base_model.output
        x = GlobalAveragePooling2D()(x)
        x = Dense(1024, activation='relu')(x)  # Dense layer for feature interpretation
        predictions = Dense(self.num_classes, activation='softmax')(x)
        model = Model(inputs=base_model.input, outputs=predictions)
        return model

    def train_and_evaluate(self, output_dir='../models'):
        """Train, evaluate and save every model in ``self.model_names``.

        Args:
            output_dir: Directory the ``.keras`` files are written to; it is
                created when missing.

        Returns:
            A list of ``(model_name, val_accuracy, model_filename)`` tuples,
            one per model, in the order of ``self.model_names``.
        """
        # Create the target directory up front so a finished training run
        # cannot be lost because the save location does not exist.
        os.makedirs(output_dir, exist_ok=True)

        results = []
        for model_name in self.model_names:
            print(f"Training and evaluating {model_name}")
            model = self.load_model(model_name)
            model.compile(optimizer='adam', loss='sparse_categorical_crossentropy', metrics=['accuracy'])
            # No steps_per_epoch / validation_steps: this assumes the datasets
            # are finite (not .repeat()-ed), so Keras walks through each of
            # them completely every epoch. Explicit floor-divided step counts
            # relied on notebook globals and skipped the last partial batch.
            model.fit(
                self.train_dataset,
                epochs=AMOUNT_EPOCHS,
                validation_data=self.val_dataset,
            )
            val_loss, val_accuracy = model.evaluate(self.val_dataset)
            print(f"{model_name} - Validation loss: {val_loss}, Validation accuracy: {val_accuracy}")
            model_filename = f"{model_name}-{val_accuracy:.4f}-{datetime.datetime.now().strftime('%Y%m%d-%H%M%S')}.keras"
            model.save(os.path.join(output_dir, model_filename))
            results.append((model_name, val_accuracy, model_filename))
            print('---------------------------')
        return results

## train models

In [None]:
# Backbones to compare; each name must be a class in tensorflow.keras.applications.
model_names = [
    'MobileNetV2',
    'MobileNetV3Large',
    'MobileNetV3Small',
    'EfficientNetV2S',
]

manager = ModelManager(
    model_names=model_names,
    input_shape=(*IMAGE_SIZE, CHANNELS),
    num_classes=AMOUNT_OF_SPECIES,
    train_dataset=train_dataset,
    val_dataset=val_dataset,
    batch_size=BATCH_SIZE,
)

# Train every backbone, then print one summary line per model.
results = manager.train_and_evaluate()
for trained_name, accuracy, saved_file in results:
    print(f"Model: {trained_name}, Accuracy: {accuracy:.4f}, Saved as: {saved_file}")