In [None]:
from sklearn.model_selection import train_test_split
import yaml

# Load the YAML configuration
with open("configs/config.yaml", "r") as file:
    config = yaml.safe_load(file)

# Extract the split parameters:
#   frac      - fraction of the labelled rows to sample for this run
#   test_size - share of the sampled rows held out for evaluation
frac = config['data_split']['frac']
test_size = config['data_split']['test_size']


# Sample the labels first, then pick the matching feature rows by index so
# that x and y stay row-aligned.
y = train_labels.sample(frac=frac, random_state=1)
x = train_features.loc[y.index].filepath.to_frame()

# Stratify on the labels so the train and eval sets keep the same label
# distribution (assumes `train_labels` is already an indicator/dummy matrix
# - TODO confirm against the cell that builds it).
# `test_size` now comes from the YAML config instead of a hard-coded 0.25,
# and the split is seeded like the sampling above so re-runs are reproducible.
x_train, x_eval, y_train, y_eval = train_test_split(
    x, y, stratify=y, test_size=test_size, random_state=1
)

In [None]:
import torch
from torch.utils.data import Dataset
from torchvision import transforms


class ImagesDataset(Dataset):
    """Dataset that reads images from disk and serves model-ready samples.

    Each item is a dictionary holding the image id (the index label of the
    corresponding row of ``x_df``), the transformed image tensor and, when
    labels were supplied, the label tensor.

    Args:
        x_df: DataFrame with a ``filepath`` column; its index supplies the
            image ids.
        y_df: Optional DataFrame of labels. Rows are read by position, so it
            must be row-aligned with ``x_df``. Leave as ``None`` for
            unlabelled data (e.g. a test set).
    """

    def __init__(self, x_df, y_df=None):
        self.data = x_df
        self.label = y_df
        # Built once and reused for every item: resize to 224x224, convert to
        # a tensor, then normalise each channel with fixed mean/std values
        # (these look like the usual ImageNet statistics - confirm they match
        # the backbone's pre-training).
        self.transform = transforms.Compose(
            [
                transforms.Resize((224, 224)),
                transforms.ToTensor(),
                transforms.Normalize(
                    mean=(0.485, 0.456, 0.406), std=(0.229, 0.224, 0.225)
                ),
            ]
        )

    def __getitem__(self, index):
        """Return the sample at positional ``index``.

        Returns:
            dict: ``{"image_id", "image"}`` when no labels were given,
            otherwise ``{"image_id", "image", "label"}`` where ``label`` is a
            float tensor built from the label row at the same position.
        """
        filepath = self.data.iloc[index]["filepath"]
        # Image.open leaves the underlying file open until the image is
        # closed. The context manager releases the handle as soon as the RGB
        # copy exists, so long runs do not accumulate open file descriptors.
        with Image.open(filepath) as raw_image:
            image = raw_image.convert("RGB")
        image = self.transform(image)
        image_id = self.data.index[index]
        # if we don't have labels (e.g. for test set) just return the image and image id
        if self.label is None:
            sample = {"image_id": image_id, "image": image}
        else:
            label = torch.tensor(self.label.iloc[index].values,
                                 dtype=torch.float)
            sample = {"image_id": image_id, "image": image, "label": label}
        return sample

    def __len__(self):
        """Return the number of rows (images) in the dataset."""
        return len(self.data)

In [None]:
from torch.utils.data import DataLoader

# Re-read the YAML configuration so that edits to the batch size are picked
# up when this cell is re-run on its own.
with open("configs/config.yaml", "r") as file:
    config = yaml.safe_load(file)


batch_size = config['data_preprocess']['batch_size']

train_dataset = ImagesDataset(x_train, y_train)
# shuffle=True reshuffles the training samples at the start of every epoch;
# with the default (False) every epoch would see the same batches in the same
# order.
train_dataloader = DataLoader(
    train_dataset,
    batch_size=batch_size,  # can be adjusted in yaml
    shuffle=True,
)

# Training

In [None]:
# Swap the model's classifier for a small two-layer head.
model.classifier = nn.Sequential(
    # 1280 input features (per the original note, this matches EfficientNet-B0)
    nn.Linear(in_features=1280, out_features=100),
    # non-linearity between the two dense layers
    nn.ReLU(inplace=True),
    # light dropout to mitigate overfitting
    nn.Dropout(p=0.1),
    # final dense layer: one output per class (8 classes)
    nn.Linear(in_features=100, out_features=8),
)

# Freeze the backbone: every parameter under `model.features` stops receiving
# gradients, so only the new head is trained.
for param in model.features.parameters():
    param.requires_grad = False

# To fine-tune later, unfreeze the backbone once the head has been trained
# and adjust the learning rate accordingly.

In [None]:
import torch.optim as optim

# SGD hyper-parameters come from the `optimizer_hyper_p` section of the YAML config.
lr, momentum = (
    config['optimizer_hyper_p']['lr'],
    config['optimizer_hyper_p']['momentum'],
)

# Cross-entropy loss, optimised with SGD + momentum over all model parameters.
criterion = nn.CrossEntropyLoss()
optimizer = optim.SGD(params=model.parameters(), lr=lr, momentum=momentum)

In [None]:
num_epochs = 10

# Maps (epoch, batch index) -> loss value for that training step.
tracking_loss = {}

for epoch in range(1, num_epochs + 1):
    print(f"Starting epoch {epoch}")

    # Explicitly (re-)enter training mode. If an evaluation cell has called
    # model.eval() and this cell is re-run, dropout would otherwise stay
    # disabled for the whole training run.
    model.train()

    # iterate through the dataloader batches. tqdm keeps track of progress.
    for batch_n, batch in tqdm(
        enumerate(train_dataloader), total=len(train_dataloader)
    ):

        # 1) zero out the parameter gradients so that gradients from previous batches are not used in this step
        optimizer.zero_grad()

        # 2) run the forward step on this batch of images
        outputs = model(batch["image"])

        # 3) compute the loss
        loss = criterion(outputs, batch["label"])
        # let's keep track of the loss by epoch and batch; .item() extracts
        # a plain Python float
        tracking_loss[(epoch, batch_n)] = loss.item()

        # 4) compute our gradients
        loss.backward()
        # update our weights
        optimizer.step()