# Import Libraries

In [1]:
import os
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset
from torchvision import transforms
from PIL import Image
from torch.utils.data import random_split
import pandas as pd

# Setup Data

## Load data

In [2]:
# Maps a predicted class index to the label written to the submission file.
# The mapping swaps the two indices (0 -> 1, 1 -> 0); it is applied in
# predict_and_create_submission via class_mapping.get(pred, pred).
class_mapping = {
    0: 1,
    1: 0,
}
# Locations of the training and test images, relative to the notebook.
train_dir = r"../datas/data"
test_dir = r"../datas/test"

## Build dataloaders

In [3]:
class MushroomDataset(Dataset):
    """Image dataset backed by a directory on disk.

    Two layouts are supported:

    * training layout: ``root_dir/<class_name>/<image>``; each sub-directory
      is a class and its index in the sorted class list is the label.
    * test layout: image files directly inside ``root_dir``; every sample
      gets the label ``-1`` and the single class ``"unknown"``.

    The test layout is used when ``is_test`` is true or when ``root_dir``
    directly contains at least one image file.

    Args:
        root_dir: Directory to scan.
        transform: Optional callable applied to the RGB PIL image.
        is_test: Force the flat (unlabeled) layout.

    Attributes:
        samples: List of ``(image_path, label)`` tuples, in sorted order.
        classes: Sorted class names (``["unknown"]`` for the test layout).
        class_to_idx: Mapping from class name to integer label.
    """

    # Lower-case file suffixes that are treated as images.
    IMAGE_EXTENSIONS = ('.png', '.jpg', '.jpeg')

    def __init__(self, root_dir, transform=None, is_test=False):
        self.root_dir = root_dir
        self.transform = transform
        self.is_test = is_test
        self.samples = []
        self.classes = []
        self.class_to_idx = {}

        # A missing directory yields an empty dataset instead of an error.
        if not os.path.exists(root_dir):
            print(f"Warning: Directory {root_dir} does not exist.")
            return
        try:
            items = os.listdir(root_dir)
            if self.is_test or any(self._is_image_name(item) for item in items):
                self._setup_test_dataset(root_dir)
            else:
                self._setup_train_dataset(root_dir)
        except Exception as e:
            # Best effort: report the problem and fall back to the
            # single-class layout rather than aborting the notebook.
            print(f"Error setting up dataset: {e}")
            self.classes = ["unknown"]
            self.class_to_idx = {"unknown": 0}

    @classmethod
    def _is_image_name(cls, name):
        """Return True when ``name`` ends with a supported image suffix."""
        return name.lower().endswith(cls.IMAGE_EXTENSIONS)

    def _setup_test_dataset(self, root_dir):
        """Collect the image files directly inside ``root_dir`` with label -1."""
        self.classes = ["unknown"]
        self.class_to_idx = {"unknown": 0}

        for img_name in sorted(os.listdir(root_dir)):
            if self._is_image_name(img_name):
                img_path = os.path.join(root_dir, img_name)
                if os.path.isfile(img_path):
                    self.samples.append((img_path, -1))

    def _setup_train_dataset(self, root_dir):
        """Collect ``(path, class_index)`` pairs from one sub-directory per class."""
        self.classes = sorted([d for d in os.listdir(root_dir)
                              if os.path.isdir(os.path.join(root_dir, d))])
        self.class_to_idx = {cls_name: i for i, cls_name in enumerate(self.classes)}

        for class_name in self.classes:
            class_dir = os.path.join(root_dir, class_name)
            if not os.path.isdir(class_dir):
                continue
            # os.listdir returns entries in arbitrary order; sorting keeps the
            # sample order (and any seeded split built on it) reproducible.
            for img_name in sorted(os.listdir(class_dir)):
                if not self._is_image_name(img_name):
                    continue
                img_path = os.path.join(class_dir, img_name)
                # Skip directories that merely look like image files.
                if os.path.isfile(img_path):
                    self.samples.append((img_path, self.class_to_idx[class_name]))

    def __len__(self):
        return len(self.samples)

    def __getitem__(self, idx):
        """Return ``(image, label)`` for sample ``idx``.

        The image is an RGB PIL image, or whatever ``transform`` returns when
        one is set. If loading or transforming fails, the error is printed and
        a zero tensor of shape (3, 32, 32) is returned in place of the image.
        """
        img_path, label = self.samples[idx]
        try:
            # The context manager closes the underlying file; convert()
            # returns a fully loaded copy that stays valid afterwards.
            with Image.open(img_path) as raw_image:
                image = raw_image.convert('RGB')
            if self.transform:
                image = self.transform(image)
            return image, label
        except Exception as e:
            print(f"Error loading image {img_path}: {e}")
            placeholder = torch.zeros((3, 32, 32))
            return placeholder, label

class MultiAugmentDataset(Dataset):
    """Expose ``num_copies`` views of a dataset, each with its own transform.

    Index ``i`` maps to sample ``i % len(dataset)`` of the wrapped dataset,
    loaded with ``transforms_list[i // len(dataset)]``. The wrapped dataset
    must expose a writable ``transform`` attribute; it is swapped temporarily
    for each lookup and restored afterwards.

    Args:
        dataset: Wrapped dataset with a ``transform`` attribute.
        num_copies: Number of views of every sample.
        transforms_list: One transform per copy. When ``None``, the wrapped
            dataset's current transform is used for every copy.
    """

    def __init__(self, dataset, num_copies=2, transforms_list=None):
        self.dataset = dataset
        self.num_copies = num_copies

        if transforms_list is None:
            self.transforms_list = [dataset.transform] * num_copies
        else:
            assert len(transforms_list) == num_copies, "Number of transforms must match num_copies"
            self.transforms_list = transforms_list

        # Remembered so the wrapped dataset can be put back after each lookup.
        self.original_transform = dataset.transform

    def __len__(self):
        return len(self.dataset) * self.num_copies

    def __getitem__(self, idx):
        base_len = len(self.dataset)
        if base_len == 0:
            # Avoid a ZeroDivisionError below; IndexError is what the
            # sequence protocol expects for an out-of-range index.
            raise IndexError("MultiAugmentDataset index out of range (wrapped dataset is empty)")

        real_idx = idx % base_len
        copy_idx = idx // base_len

        # Resolve the transform first so a bad index fails before any mutation.
        selected_transform = self.transforms_list[copy_idx]

        self.dataset.transform = selected_transform
        try:
            image, label = self.dataset[real_idx]
        finally:
            # Restore even when the wrapped dataset raises, so a failed lookup
            # cannot leave the shared dataset with the wrong transform.
            self.dataset.transform = self.original_transform

        return image, label

def setup_data_loaders(train_dir, test_dir, batch_size=32, val_split=0.1,
                       transforms_list=None, eval_transform=None, use_multi_augment=True):
    """Build the train, validation and test DataLoaders.

    Args:
        train_dir: Directory with one sub-directory per class. It is created
            (empty) when it does not exist.
        test_dir: Directory containing the unlabeled test images.
        batch_size: Batch size used by all three loaders.
        val_split: Fraction of the (possibly multiplied) training samples
            held out for validation.
        transforms_list: Optional list of transforms. With
            ``use_multi_augment`` every entry produces one copy of the
            training set; otherwise only the first entry is used.
        eval_transform: Optional transform applied to the test images.
        use_multi_augment: Wrap the training set in ``MultiAugmentDataset``.
            When ``transforms_list`` is not given, 5 untransformed copies
            are created.

    Returns:
        Tuple ``(train_loader, valid_loader, test_loader)``.
    """
    print("Setting up training and test datasets with mixed augmentation strategies...")

    if not os.path.exists(train_dir):
        print(f"Warning: Training directory {train_dir} does not exist!")
        os.makedirs(train_dir, exist_ok=True)

    train_dataset = MushroomDataset(train_dir, transform=None)

    if use_multi_augment:
        # One copy per supplied transform; 5 copies when none are supplied.
        # Deriving the count keeps the wrapper's length check satisfied for
        # any list length and keeps the log message accurate.
        num_copies = len(transforms_list) if transforms_list else 5
        print(f"Using multi-augmentation strategy ({num_copies} copies with different transforms)")
        train_dataset = MultiAugmentDataset(
            train_dataset,
            num_copies=num_copies,
            transforms_list=transforms_list or None
        )
    elif transforms_list:
        # Without transforms the dataset simply keeps transform=None.
        train_dataset.transform = transforms_list[0]

    train_size = int((1 - val_split) * len(train_dataset))
    val_size = len(train_dataset) - train_size

    # NOTE(review): the split happens after the copies are created, so copies
    # of the same source image can end up in both the training and the
    # validation subset. Validation scores from this split are optimistic.
    train_dataset, valid_dataset = random_split(
        train_dataset, [train_size, val_size], generator=torch.Generator().manual_seed(42)
    )

    test_dataset = MushroomDataset(test_dir, transform=eval_transform, is_test=True)

    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True, num_workers=0)
    valid_loader = DataLoader(valid_dataset, batch_size=batch_size, shuffle=False, num_workers=0)
    test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False, num_workers=0)

    print(f"Training samples: {len(train_dataset)}")
    print(f"Validation samples: {len(valid_dataset)}")
    print(f"Test samples: {len(test_dataset)}")

    return train_loader, valid_loader, test_loader

In [4]:
# Build the three loaders. transforms_list and eval_transform are left at
# their defaults (None), so the datasets yield untransformed RGB PIL images.
train_loader, valid_loader, test_loader = setup_data_loaders(
    train_dir, test_dir,
    batch_size=32,
    val_split=0.1
)

Setting up training and test datasets with mixed augmentation strategies...
Using multi-augmentation strategy (6 copies with different transforms)
Training samples: 2700
Validation samples: 300
Test samples: 100


# Build the SVM Model

In [5]:
from sklearn.svm import SVC
from sklearn.preprocessing import StandardScaler
import numpy as np

def preprocess_dataset(dataset):
    """Turn an iterable of ``(image, label)`` pairs into NumPy arrays.

    Every image is converted to an array and flattened to one dimension.

    Returns:
        ``(X, y)`` where ``X`` stacks the flattened images row by row and
        ``y`` holds the labels in the same order.
    """
    rows = [(np.array(image).reshape(-1), target) for image, target in dataset]
    feature_matrix = np.stack([vector for vector, _ in rows])
    targets = np.array([target for _, target in rows])
    return feature_matrix, targets

# Flatten every split into a feature matrix and a label vector.
X_train, y_train = preprocess_dataset(train_loader.dataset)
X_val, y_val = preprocess_dataset(valid_loader.dataset)
X_test, y_test = preprocess_dataset(test_loader.dataset)

# Standardise the features with statistics taken from the training split only.
scaler = StandardScaler().fit(X_train)
X_train = scaler.transform(X_train)
X_val = scaler.transform(X_val)
X_test = scaler.transform(X_test)

# Linear-kernel SVM; fit() returns the fitted estimator itself.
my_model = SVC(kernel='linear').fit(X_train, y_train)

# Keep the scaler on the model so predict_and_create_submission can apply
# the same scaling to the test features.
my_model.scaler_ = scaler



In [6]:
import joblib

# After training has finished, persist the fitted model to disk.
# Note: the file is written with joblib.dump, so despite the ".pth" suffix it
# must be read back with joblib.load.
model_path = "svm_model.pth"
joblib.dump(my_model, model_path)
print(f"Model saved to {model_path}")

Model saved to svm_model.pth


# Test Model

In [7]:
def calculate_accuracy(csv_path, expected_labels=None):
    """Compute the accuracy (in percent) of a submission CSV.

    Rows are compared to ``expected_labels`` by position. Only the first
    ``min(len(rows), len(expected_labels))`` rows are scored.

    Args:
        csv_path: Path to a CSV file with a ``label`` column.
        expected_labels: Ground-truth labels in row order. Defaults to
            50 ones followed by 50 zeros.

    Returns:
        Percentage of matching labels as a float, or ``0.0`` when there is
        nothing to compare.
    """
    df = pd.read_csv(csv_path)

    if expected_labels is None:
        expected_labels = [1] * 50 + [0] * 50

    actual_labels = df['label'].tolist()

    n = min(len(actual_labels), len(expected_labels))
    if n == 0:
        # An empty submission (or empty reference) would divide by zero.
        return 0.0

    # zip stops at the shorter sequence, i.e. after exactly n pairs.
    correct = sum(1 for actual, expected in zip(actual_labels, expected_labels)
                  if actual == expected)

    return (correct / n) * 100

In [8]:
import os
import pandas as pd
import numpy as np
from PIL import Image

def predict_and_create_submission(model, test_loader, class_mapping, feature_extractor=None, size=(32,32), filename='submission.csv'):
    """Predict labels for the test images and write a submission CSV.

    Args:
        model: Fitted estimator with a ``predict`` method. When it has a
            ``scaler_`` attribute, that scaler is applied to the features.
        test_loader: DataLoader whose ``dataset`` exposes ``samples`` as a
            list of ``(image_path, label)`` tuples.
        class_mapping: Dict translating predicted values to submission
            labels; predictions without an entry are passed through.
        feature_extractor: Optional callable taking the resized RGB image as
            an array and returning a feature vector. Defaults to flattening.
        size: ``(width, height)`` every image is resized to.
        filename: Path of the CSV file to write.

    Returns:
        DataFrame with columns ``id`` (file name without extension) and
        ``label``.

    Raises:
        ValueError: If the dataset contains no samples.
    """
    dataset = test_loader.dataset
    sample_paths = [path for path, _ in dataset.samples]
    if not sample_paths:
        raise ValueError("Test dataset contains no samples; nothing to predict.")

    # Build one feature row per image.
    features = []
    for path in sample_paths:
        # The context manager closes the file as soon as the pixels are read.
        with Image.open(path) as img:
            arr = np.array(img.convert('RGB').resize(size))
        feat = feature_extractor(arr) if feature_extractor else arr.flatten()
        features.append(feat)

    X_test = np.vstack(features)

    # Scale if scaler_ attribute exists
    if hasattr(model, 'scaler_'):
        X_test = model.scaler_.transform(X_test)

    # Predict labels
    preds = model.predict(X_test)

    # Translate predictions to submission labels
    labels = [class_mapping.get(p, p) for p in preds]

    # Build submission DataFrame; ids are the file names without extension
    ids = [os.path.splitext(os.path.basename(path))[0] for path in sample_paths]
    submission_df = pd.DataFrame({'id': ids, 'label': labels})

    # Show sample and save
    print("\nSubmission sample (before saving):")
    print(submission_df.head(10))
    submission_df.to_csv(filename, index=False)
    print(f"Submission file created: {filename}")

    # Verify the saved file. Both sides are compared as text so that type
    # inference on re-read (e.g. numeric-looking ids) is not reported as a
    # difference.
    try:
        saved = pd.read_csv(filename, dtype=str, keep_default_na=False)
        same_columns = list(saved.columns) == list(submission_df.columns)
        same_values = saved.values.tolist() == submission_df.astype(str).values.tolist()
        if not (same_columns and same_values):
            print("WARNING: Saved file differs from generated DataFrame!")
    except Exception as e:
        print(f"Error verifying saved file: {e}")

    return submission_df

In [9]:
# Predict the test images with the trained SVM and write submission.csv.
submission_df = predict_and_create_submission(my_model, test_loader, class_mapping, filename='submission.csv')


Submission sample (before saving):
    id  label
0  101      1
1  102      0
2  103      0
3  104      1
4  105      1
5  106      0
6  107      0
7  108      1
8  109      0
9  110      1
Submission file created: submission.csv


In [10]:
# Score the saved submission; calculate_accuracy compares rows by position
# against 50 ones followed by 50 zeros and returns a percentage.
print("\nFinal evaluation on TEST set:", calculate_accuracy('submission.csv'))


Final evaluation on TEST set: 73.0


In [11]:
import torchvision, sklearn, PIL

# Record the library versions used for this run.
print("PyTorch version:", torch.__version__)
print("CUDA available:", torch.cuda.is_available())
print("CUDA toolkit version:", torch.version.cuda)
print("cuDNN version:", torch.backends.cudnn.version())

print("TorchVision version:", torchvision.__version__)
print("NumPy version:", np.__version__)
print("scikit‑learn version:", sklearn.__version__)
print("joblib version:", joblib.__version__)
print("Pillow (PIL) version:", PIL.__version__)

PyTorch version: 2.7.1+cpu
CUDA available: False
CUDA toolkit version: None
cuDNN version: None
TorchVision version: 0.22.1+cpu
NumPy version: 2.3.1
scikit‑learn version: 1.6.1
joblib version: 1.5.1
Pillow (PIL) version: 11.3.0
