In [16]:
# Import necessary libraries
import zipfile
import os
import numpy as np
from tensorflow.keras.preprocessing.image import img_to_array, load_img
from tensorflow.keras.applications.resnet50 import ResNet50, preprocess_input
from tensorflow.keras.layers import Dense, GlobalAveragePooling2D
from tensorflow.keras.models import Model
from tensorflow.keras.optimizers import Adam
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score
import tensorflow as tf
from sklearn.preprocessing import LabelEncoder

In [17]:
import os
from PIL import Image, ImageFile
from torchvision import datasets, transforms, models
import torch
from torch.utils.data import DataLoader, Dataset
from torch import nn, optim
import matplotlib.pyplot as plt
from sklearn.metrics import precision_score, recall_score, f1_score, confusion_matrix
import numpy as np
import seaborn as sns
import pandas as pd

In [18]:
# Debugging aid: ask CUDA to run kernel launches synchronously so that errors are
# reported at the call that caused them.
# NOTE(review): this slows GPU work down — presumably left over from debugging; confirm
# whether it is still needed before a full training run.
os.environ["CUDA_LAUNCH_BLOCKING"] = "1"

In [19]:
from google.colab import drive

# Mount Google Drive at /content/drive (Colab only) so the files below are reachable
drive.mount('/content/drive')

# Path of the image archive inside the mounted Drive; consumed later by CustomDataset
zip_file_path = '/content/drive/MyDrive/ADATELEMZÉS_HF/top_100_species_images.zip'

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [20]:
# Load the training metadata from the CSV file on Google Drive
df = pd.read_csv('/content/drive/MyDrive/ADATELEMZÉS_HF/FungiCLEF2023_train_metadata_PRODUCTION.csv')

# Find the top 100 species by number of examples
top_100_species = df['species'].value_counts().head(100).index.tolist()

# Keep only the rows that belong to one of those species
df_top_100 = df[df['species'].isin(top_100_species)]

# Build an independent (image_path, poisonous) frame with the label cast to float.
# `.assign` returns a new DataFrame, so we never write into a slice of `df_top_100`
# (the original column assignment triggered SettingWithCopyWarning and is not
# guaranteed to take effect on a view).
top_100_pairs = df_top_100[['image_path', 'poisonous']].assign(
    poisonous=lambda frame: frame['poisonous'].astype(float)
)

# Report the size of the final frame and how many distinct label values it holds
print(f"Final dataframe shape: {top_100_pairs.shape}")

unique_poisonous_types = top_100_pairs['poisonous'].nunique()
print(f"Number of unique types: {unique_poisonous_types}")
print(top_100_species)

# Map each species to the `poisonous` value of its first row
poisinous_labels = df_top_100.groupby('species')['poisonous'].first().to_dict()

print(poisinous_labels)
print(top_100_pairs.head())


KeyboardInterrupt



In [None]:
from torchvision import transforms

# Define transformations to be applied to each image (resizing, converting to tensor, normalizing)
transform = transforms.Compose([
    transforms.Resize((128, 128)),  # Resize images to 128x128 pixels
    transforms.ToTensor(),  # Convert image to a tensor (used by PyTorch)
    # Per-channel normalization; these are the usual ImageNet mean/std values
    transforms.Normalize(mean=[0.485, 0.456, 0.406],
                         std=[0.229, 0.224, 0.225])
])

In [None]:
import os
from PIL import Image
from torch.utils.data import Dataset
import zipfile

# Define a custom dataset class for loading our images and labels
class CustomDataset(Dataset):
    """Images stored inside a ZIP archive, labelled from a metadata DataFrame.

    The archive is scanned once at construction time. Only members that live in
    the folder named after the archive (``foo.zip`` -> ``foo/``) and end in
    ``.JPG`` are considered, and only every ``sample_every``-th of those is kept.
    Sampled files that have no row in the metadata frame are skipped (and
    counted) instead of aborting the scan.

    Args:
        zip_file_path: Path to the ZIP archive holding the images.
        top_100_df: DataFrame with an ``image_path`` column (bare file names,
            matched against the base name of each archive member) and a
            ``poisonous`` column used as the label.
        transform: Optional callable applied to each loaded PIL image.
        sample_every: Keep one matching file out of this many. Defaults to 100,
            the subsampling rate that used to be hard-coded; 1 keeps every file.

    Raises:
        ValueError: If ``sample_every`` is smaller than 1.
    """

    def __init__(self, zip_file_path, top_100_df, transform=None, sample_every=100):
        if sample_every < 1:
            raise ValueError(f"sample_every must be >= 1, got {sample_every}")

        self.zip_file_path = zip_file_path
        self.top_100_df = top_100_df
        self.transform = transform
        self.sample_every = sample_every
        self.image_paths = []  # Paths of the selected images inside the ZIP file
        self.labels = []       # Label of each selected image, same order as image_paths

        print("Zip folder path:", zip_file_path)

        # Build the file-name -> label lookup once instead of filtering the whole
        # DataFrame for every image. `setdefault` keeps the first row when a file
        # name appears more than once.
        label_by_name = {}
        for name, value in zip(top_100_df['image_path'], top_100_df['poisonous']):
            label_by_name.setdefault(name, value)

        # The images are expected in a folder named after the archive itself
        folder_name = os.path.splitext(os.path.basename(zip_file_path))[0]
        print("Folder name:", folder_name)
        folder_prefix = folder_name + '/'

        matched_files = 0   # every matching .JPG seen so far, drives the subsampling
        skipped_files = 0   # sampled files that have no metadata row

        with zipfile.ZipFile(zip_file_path, 'r') as zip_file:
            for file_info in zip_file.infolist():
                file_name = file_info.filename

                is_image = (
                    file_name.startswith(folder_prefix)
                    and file_name.endswith('.JPG')
                    and not file_name.startswith('__MACOSX/')
                )
                if not is_image:
                    continue

                take_this_one = matched_files % sample_every == 0
                matched_files += 1
                if not take_this_one:
                    continue

                # Metadata rows are keyed by the bare file name
                image_name = os.path.basename(file_name)
                if image_name not in label_by_name:
                    skipped_files += 1
                    continue

                self.image_paths.append(file_name)
                self.labels.append(label_by_name[image_name])

                # Progress report, printed once per 100 collected labels
                if len(self.labels) % 100 == 0:
                    print(len(self.labels))

        print(len(set(self.labels)))
        print("ZIP file path:", zip_file_path)
        print("Subfolder name:", folder_name)
        print("Image count:", len(self.image_paths))
        print("Label count:", len(self.labels))
        print("Skipped (no metadata row):", skipped_files)
        print(self.labels[:10])

    def __len__(self):
        """Return the number of selected images."""
        return len(self.image_paths)

    def __getitem__(self, idx):
        """Load image ``idx`` from the archive and return ``(image, label)``.

        The archive is opened per call, so no open file handle is stored on the
        dataset object between calls.
        """
        img_path = self.image_paths[idx]
        label = self.labels[idx]

        with zipfile.ZipFile(self.zip_file_path, 'r') as zip_file:
            # Close the member handle too; convert() fully decodes the image
            # before the handle goes away.
            with zip_file.open(img_path) as image_file:
                image = Image.open(image_file).convert("RGB")

        if self.transform is not None:
            image = self.transform(image)

        return image, label

In [None]:
# Index the image archive and attach labels and preprocessing to every sample
dataset = CustomDataset(zip_file_path, top_100_pairs, transform)

# Batched, shuffled iteration over the whole dataset
train_loader = DataLoader(dataset=dataset, shuffle=True, batch_size=16)

In [None]:
# Sanity check: pull one batch from the loader, report its tensor shapes, then stop.
# `images` and `labels` stay defined as notebook globals after the loop.
for images, labels in train_loader:
    print(f"Images shape: {images.shape}, Labels shape: {labels.shape}")
    labels = labels.to(torch.float32)  # cast this batch's labels to float32
    break

In [None]:
# Inspect the labels of the batch fetched in the previous cell: dtype and distinct values
print("Labels dtype:", labels.dtype)
print("Unique labels:", torch.unique(labels))

In [None]:
from torchvision.models import efficientnet_b0, EfficientNet_B0_Weights

# Define the model on top of an ImageNet-pretrained EfficientNet-B0
class MushroomClassifier(nn.Module):
    """EfficientNet-B0 whose classification head is swapped for a new one.

    The new head is a dropout layer followed by a linear layer with
    ``num_classes`` outputs.

    Args:
        num_classes: Number of output units of the final linear layer.
    """

    def __init__(self, num_classes=1):
        super().__init__()
        # Start from the ImageNet (IMAGENET1K_V1) checkpoint
        backbone = efficientnet_b0(weights=EfficientNet_B0_Weights.IMAGENET1K_V1)
        # Width of the features feeding the stock head's linear layer
        head_in_features = backbone.classifier[1].in_features
        backbone.classifier = nn.Sequential(
            nn.Dropout(0.2),
            nn.Linear(head_in_features, num_classes),
        )
        self.model = backbone

    def forward(self, x):
        """Run the wrapped network on ``x`` and return its raw outputs."""
        return self.model(x)

# Build the classifier and place it on the GPU when one is present, otherwise on the CPU
num_classes = 1
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
model = MushroomClassifier(num_classes=num_classes).to(device)
print("Model initialized successfully.")

In [None]:
# Set up the loss function and optimizer.
# The model is built with num_classes=1, i.e. it emits one logit per image, and the
# labels are float 0/1 values. Binary cross-entropy on logits is the loss that fits
# that head; CrossEntropyLoss expects one logit per class with class-index targets.
criterion = nn.BCEWithLogitsLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)


In [None]:
from sklearn.model_selection import KFold


# Cross-validation settings: how many folds, and how many epochs to train in each
num_folds = 5
num_epochs = 10

# Shuffled fold splitter with a fixed seed so the split is repeatable
kf = KFold(random_state=42, shuffle=True, n_splits=num_folds)

# Collects one metrics record per fold
fold_results = list()

In [None]:
from torch.nn import BCEWithLogitsLoss
from torch.optim import Adam
from torch.optim.lr_scheduler import CosineAnnealingLR

# Cross-validation loop: train a fresh model on every fold and record the metrics
# it reaches on that fold's validation split after the last epoch.

# Start from an empty list so re-running this cell does not append duplicate folds
fold_results = []

# Same mini-batch size as the DataLoader built right after the dataset. A batch
# size of 1 makes BatchNorm use single-sample statistics in train mode and is
# needlessly slow.
cv_batch_size = 16

for fold, (train_idx, val_idx) in enumerate(kf.split(dataset)):
    print(f"\nFold {fold + 1}/{num_folds}")

    # Split the dataset into this fold's training and validation subsets
    train_subset = torch.utils.data.Subset(dataset, train_idx)
    val_subset = torch.utils.data.Subset(dataset, val_idx)

    train_loader = DataLoader(train_subset, batch_size=cv_batch_size, shuffle=True)
    val_loader = DataLoader(val_subset, batch_size=cv_batch_size, shuffle=False)

    # Fresh model, loss, optimizer and scheduler so folds do not share state
    model = MushroomClassifier(num_classes=1).to(device)
    criterion = BCEWithLogitsLoss()
    optimizer = Adam(model.parameters(), lr=1e-3)
    scheduler = CosineAnnealingLR(optimizer, T_max=num_epochs, eta_min=1e-5)

    for epoch in range(num_epochs):
        print(f"\n--- Epoch {epoch + 1}/{num_epochs} ---")

        # ---- Training ----
        model.train()
        train_loss = 0.0

        for images, labels in train_loader:
            images, labels = images.to(device), labels.to(device).float()

            # (batch, 1) -> (batch,), the shape BCEWithLogitsLoss needs to match labels
            outputs = model(images).squeeze(1)
            loss = criterion(outputs, labels)

            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            train_loss += loss.item()

        # One scheduler step per epoch (T_max is expressed in epochs)
        scheduler.step()

        # ---- Validation ----
        model.eval()
        val_loss = 0.0
        all_preds = []
        all_labels = []

        with torch.no_grad():
            for images, labels in val_loader:
                images, labels = images.to(device), labels.to(device).float()

                # squeeze(1), not squeeze(): a bare squeeze() turns a size-1 batch
                # into a 0-d tensor, which no longer matches the (1,) labels (the
                # loss raises) and cannot be iterated by extend() below.
                outputs = model(images).squeeze(1)
                loss = criterion(outputs, labels)

                val_loss += loss.item()

                # Logits -> probabilities -> hard 0/1 predictions at threshold 0.5,
                # stored as floats so they share a type with the labels
                preds = (torch.sigmoid(outputs) > 0.5).float()
                all_preds.extend(preds.cpu().tolist())
                all_labels.extend(labels.cpu().tolist())

        # Average the per-batch losses and compute the classification metrics.
        # zero_division=0 returns 0 (without a warning) when a score is undefined,
        # e.g. no positive predictions in this fold yet.
        val_loss /= len(val_loader)
        train_loss /= len(train_loader)
        f1 = f1_score(all_labels, all_preds, zero_division=0)
        precision = precision_score(all_labels, all_preds, zero_division=0)
        recall = recall_score(all_labels, all_preds, zero_division=0)

        print(f"Epoch {epoch + 1} | Train Loss: {train_loss:.4f} | Val Loss: {val_loss:.4f} | F1: {f1:.4f} | Precision: {precision:.4f} | Recall: {recall:.4f}")

    # Log the last epoch's metrics for this fold
    fold_results.append({
        "fold": fold + 1,
        "f1_score": f1,
        "precision": precision,
        "recall": recall,
        "val_loss": val_loss
    })


In [None]:
# Tabulate the per-fold metrics and report the mean F1 across folds
results_df = pd.DataFrame(fold_results)
mean_f1 = results_df["f1_score"].mean()
print("\nCross-validation results:", results_df, sep="\n")
print("\nAverage F1 Score:", mean_f1)