In [None]:

import os
import random

import matplotlib.pyplot as plt
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
import torch.utils.data as data
import torchvision.datasets as datasets
import torchvision.transforms as transforms
from PIL import Image
from torchsummary import summary

# Single seed shared by every random number generator used in this notebook.
SEED = 42

# Ask cuDNN to pick deterministic kernels.
torch.backends.cudnn.deterministic = True

# Seed the Python, NumPy and PyTorch (CPU and CUDA) generators with the same value.
random.seed(SEED)
np.random.seed(SEED)
torch.manual_seed(SEED)
torch.cuda.manual_seed(SEED)

### **Cassava Leaf Disease**

#### **Load Dataset**

**Download Dataset**

In [None]:
!curl https://storage.googleapis.com/emcassavadata/cassavaleafdata.zip  -O /cassavaleafdata.zip


In [None]:
!unzip /content/cassavaleafdata.zip

**Read Dataset**

In [None]:
# Split name -> directory of that split (relative to the working directory).
data_paths = dict(
	train = './train',
	valid = './validation',
	test = './test',
)

In [None]:
from imutils import paths


def show_labels(data_paths):
	"""Draw one bar chart per dataset split with the number of images in each class.

	Args:
		data_paths: Mapping of split name (used as the subplot title) to the
			directory of that split. Every sub-directory is treated as one class.
	"""
	# squeeze=False keeps `ax` two-dimensional even for a single split, so the
	# indexing below never lands on a bare Axes object.
	fig, ax = plt.subplots(1, len(data_paths), figsize = (12, 6), squeeze = False)
	for idx, (key, sub_dir) in enumerate(data_paths.items()):
		# Sort so the bars appear in the same order in every panel, and skip
		# anything that is not a folder (os.listdir also returns plain files).
		labels = sorted(
			entry for entry in os.listdir(sub_dir)
			if os.path.isdir(os.path.join(sub_dir, entry))
		)
		list_data = [
			len(list(paths.list_images(os.path.join(sub_dir, label))))
			for label in labels
		]
		panel = ax[0][idx]
		panel.bar(labels, list_data)
		panel.set_title(key)
		panel.set_xlabel("Class")
		panel.set_ylabel("Number of images")
	plt.tight_layout()
	plt.show()


# Plot the per-class image counts for every split listed in `data_paths`.
show_labels(data_paths)

# Class folder name -> human-readable disease name used in figure titles.
labels_dict = dict(
	cbb = "Cassava Bacterial Blight (CBB)",
	cbsd = "Cassava Brown Streak Disease (CBSD)",
	cgm = "Cassava Green Mottle (CGM)",
	cmd = "Cassava Mosaic Disease (CMD)",
	healthy = "Healthy",
)


def plot_images(data_dir, label, num_sample = 6):
	"""Show up to `num_sample` images of one class in a two-row grid.

	Args:
		data_dir: Directory of a dataset split that contains one folder per class.
		label: Name of the class folder to sample from; must be a key of `labels_dict`.
		num_sample: Maximum number of images to display.
	"""
	image_files = list(paths.list_images(os.path.join(data_dir, label)))[:num_sample]
	# Round the column count up so an odd `num_sample` still fits into two rows.
	num_cols = max(1, (num_sample + 1) // 2)
	# squeeze=False guarantees a 2-D axes array even with a single column.
	fig, ax = plt.subplots(2, num_cols, figsize = (14, 7), squeeze = False)
	cells = ax.flatten()
	# Hide every frame first so unused cells stay blank when the folder holds
	# fewer images than requested.
	for cell in cells:
		cell.axis('off')
	for cell, image_path in zip(cells, image_files):
		# The context manager releases the file handle once the image is drawn.
		with Image.open(image_path) as img:
			cell.imshow(img)
		# All files were listed under the `label` folder, so title them from the
		# argument instead of re-parsing a '/'-separated path.
		cell.set_title(labels_dict[label])
	plt.tight_layout()
	plt.show()


# Preview training images for each class, in the order the classes appear in `labels_dict`.
for class_name in labels_dict:
	plot_images(data_paths['train'], label = class_name)


#### **Preprocessing**

In [None]:
# load image from path
def loader(path):
	"""Load the image stored at `path` as a three-channel RGB PIL image.

	Args:
		path: Path of the image file.

	Returns:
		A fully loaded `PIL.Image.Image` in RGB mode.
	"""
	# Read through an explicit file object so the handle is closed on return;
	# Image.open alone is lazy and keeps the file open.
	with open(path, 'rb') as image_file:
		img = Image.open(image_file)
		# convert() forces the pixel data to be read and normalises grayscale,
		# palette and RGBA files to three channels, so every tensor produced by
		# ToTensor has the same channel count.
		return img.convert('RGB')


# Side length in pixels that every image is resized to.
img_size = 150

# Resize + tensor conversion only (no augmentation). Despite the `train_` prefix
# the same pipeline is applied to all three splits below.
train_transforms = transforms.Compose([
	transforms.Resize((img_size, img_size)),
	transforms.ToTensor(),
])

# The training split reads files through the custom `loader`; validation and test
# use ImageFolder's default loader.
train_data = datasets.ImageFolder(
	root = data_paths['train'],
	loader = loader,
	transform = train_transforms
)
valid_data = datasets.ImageFolder(
	root = data_paths['valid'],
	transform = train_transforms
)
test_data = datasets.ImageFolder(
	root = data_paths['test'],
	transform = train_transforms
)

#### **Dataloader**


In [None]:


# Number of images per mini-batch, shared by all three loaders.
BATCH_SIZE = 512

# Only the training loader shuffles; validation and test keep the dataset order.
train_dataloader = data.DataLoader(train_data, batch_size = BATCH_SIZE, shuffle = True)
valid_dataloader = data.DataLoader(valid_data, batch_size = BATCH_SIZE)
test_dataloader = data.DataLoader(test_data, batch_size = BATCH_SIZE)

# Number of mini-batches in one pass over the training loader.
len(train_dataloader)

# Pull one training batch to use as a sample input in the following cells.
inputs, labels = next(iter(train_dataloader))

#### **Model**

In [None]:
from model import LeNetClassifier

In [None]:
# Number of classes found by ImageFolder in the training directory.
num_classes = len(train_data.classes)
num_classes

In [None]:
lenet_model = LeNetClassifier(num_classes)

# The model has not been moved off the CPU yet, but torchsummary defaults to
# device="cuda" and would feed it a CUDA dummy input on a GPU runtime, which
# fails with a device mismatch. Pin the summary to the CPU instead.
summary(lenet_model, (3, 150, 150), device = 'cpu')

# Shape of the sample batch drawn from the training loader.
inputs.shape

In [None]:
# Forward the sample batch through the freshly constructed model.
predictions = lenet_model(inputs)

predictions

#### **Loss & Optimizer**

In [None]:
# Adam over all model parameters, with the library's default hyper-parameters.
optimizer = optim.Adam(lenet_model.parameters())

# Cross-entropy between the model outputs and the integer class labels.
criterion = nn.CrossEntropyLoss()

# Loss of the sample-batch predictions computed in the previous cell.
loss = criterion(predictions, labels)
loss

In [None]:
# Use the GPU when CUDA is available, otherwise fall back to the CPU.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# Move the model's parameters to the selected device.
lenet_model.to(device)

#### **Trainer**

In [None]:
import time


def train(model, optimizer, criterion, train_dataloader, device, epoch = 0, log_interval = 15):
	"""Train `model` for one pass over `train_dataloader`.

	Args:
		model: Network to optimise; switched to training mode.
		optimizer: Optimiser that updates the model's parameters.
		criterion: Loss function called as `criterion(predictions, labels)`.
		train_dataloader: Sized iterable of `(inputs, labels)` batches.
		device: Device the batches are moved to before the forward pass.
		epoch: Epoch number, used only in the progress lines.
		log_interval: A progress line is printed every `log_interval` batches.

	Returns:
		(epoch_acc, epoch_loss): accuracy over every sample seen during the
		pass and the unweighted mean of the per-batch losses.

	Raises:
		ValueError: If `train_dataloader` yields no samples.
	"""
	model.train()
	# Two sets of counters: the "window" pair is reset after each progress line,
	# the "epoch" pair never is. Sharing one pair made the returned accuracy cover
	# only the batches after the last log line, and divide by zero when the final
	# batch index was a multiple of `log_interval`.
	epoch_correct, epoch_count = 0, 0
	window_correct, window_count = 0, 0
	losses = []

	for idx, (inputs, labels) in enumerate(train_dataloader):
		inputs = inputs.to(device)
		labels = labels.to(device)

		optimizer.zero_grad()

		predictions = model(inputs)

		# compute loss
		loss = criterion(predictions, labels)
		losses.append(loss.item())

		# backward
		loss.backward()
		optimizer.step()

		batch_correct = (predictions.argmax(1) == labels).sum().item()
		batch_count = labels.size(0)
		epoch_correct += batch_correct
		epoch_count += batch_count
		window_correct += batch_correct
		window_count += batch_count

		if idx % log_interval == 0 and idx > 0:
			print(
				"| epoch {:3d} | {:5d}/{:5d} batches "
				"| accuracy {:8.3f}".format(
					epoch, idx, len(train_dataloader), window_correct / window_count
				)
			)
			window_correct, window_count = 0, 0

	if epoch_count == 0:
		raise ValueError("train_dataloader yielded no samples")

	epoch_acc = epoch_correct / epoch_count
	epoch_loss = sum(losses) / len(losses)
	return epoch_acc, epoch_loss


# Run one training pass with the default `epoch` and `log_interval` arguments.
train_acc, train_loss = train(lenet_model, optimizer, criterion, train_dataloader, device)

train_acc, train_loss


In [None]:

def evaluate(model, criterion, valid_dataloader, device):
	"""Measure accuracy and mean batch loss of `model` on `valid_dataloader`.

	Args:
		model: Network to evaluate; switched to evaluation mode.
		criterion: Loss function called as `criterion(predictions, labels)`.
		valid_dataloader: Iterable of `(inputs, labels)` batches.
		device: Device the batches are moved to before the forward pass.

	Returns:
		(epoch_acc, epoch_loss): fraction of correctly classified samples and
		the unweighted mean of the per-batch losses.
	"""
	model.eval()
	batch_losses = []
	num_correct = 0
	num_seen = 0

	# Gradients are not needed for evaluation.
	with torch.no_grad():
		for batch_inputs, batch_labels in valid_dataloader:
			batch_inputs = batch_inputs.to(device)
			batch_labels = batch_labels.to(device)

			logits = model(batch_inputs)
			batch_losses.append(criterion(logits, batch_labels).item())

			hits = logits.argmax(1) == batch_labels
			num_correct += hits.sum().item()
			num_seen += batch_labels.size(0)

	return num_correct / num_seen, sum(batch_losses) / len(batch_losses)


# Evaluate the current model on the validation loader.
eval_acc, eval_loss = evaluate(lenet_model, criterion, valid_dataloader, device)

eval_acc, eval_loss

#### **Training**


In [None]:
num_classes = len(train_data.classes)

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# Build a new model and optimiser for the full training run.
lenet_model = LeNetClassifier(num_classes)
lenet_model.to(device)

criterion = torch.nn.CrossEntropyLoss()
learning_rate = 2e-4
optimizer = optim.Adam(lenet_model.parameters(), learning_rate)

num_epochs = 10
save_model = './model'
# torch.save does not create missing directories, so make sure the target exists.
os.makedirs(save_model, exist_ok = True)
best_model_path = os.path.join(save_model, 'lenet_model.pt')

train_accs, train_losses = [], []
eval_accs, eval_losses = [], []
# Start at +inf so the first finite validation loss always counts as an improvement.
best_loss_eval = float('inf')

for epoch in range(1, num_epochs + 1):
	epoch_start_time = time.time()
	# Training
	train_acc, train_loss = train(lenet_model, optimizer, criterion, train_dataloader, device, epoch, log_interval = 10)
	train_accs.append(train_acc)
	train_losses.append(train_loss)

	# Evaluation
	eval_acc, eval_loss = evaluate(lenet_model, criterion, valid_dataloader, device)
	eval_accs.append(eval_acc)
	eval_losses.append(eval_loss)

	# Save best model: record the new best loss so later, worse epochs do not
	# overwrite the checkpoint.
	if eval_loss < best_loss_eval:
		best_loss_eval = eval_loss
		torch.save(lenet_model.state_dict(), best_model_path)

	# Print loss and accuracy at the end of the epoch
	print("-" * 59)
	print(
		"| End of epoch {:3d} | Time: {:5.2f}s | Train Accuracy {:8.3f} | Train Loss {:8.3f} "
		"| Valid Accuracy {:8.3f} | Valid Loss {:8.3f} ".format(
			epoch, time.time() - epoch_start_time, train_acc, train_loss, eval_acc, eval_loss
		)
	)
	print("-" * 59)

# Load best model once training has finished. Doing this inside the loop would
# reset the weights to the checkpoint after every epoch.
lenet_model.load_state_dict(torch.load(best_model_path, map_location = device, weights_only = True))
lenet_model.eval()


def plot_result(num_epochs, train_accs, eval_accs, train_losses, eval_losses):
	"""Plot training vs. evaluation accuracy (left) and loss (right) per epoch.

	Args:
		num_epochs: Number of epochs; the x axis is `range(num_epochs)`.
		train_accs: Per-epoch training accuracies.
		eval_accs: Per-epoch evaluation accuracies.
		train_losses: Per-epoch training losses.
		eval_losses: Per-epoch evaluation losses.
	"""
	epochs = list(range(num_epochs))
	fig, axs = plt.subplots(nrows = 1, ncols = 2, figsize = (12, 6))
	panels = (
		(axs[0], "Accuracy", train_accs, eval_accs),
		(axs[1], "Loss", train_losses, eval_losses),
	)
	for axis, ylabel, train_values, eval_values in panels:
		axis.plot(epochs, train_values, label = "Training")
		axis.plot(epochs, eval_values, label = "Evaluation")
		axis.set_xlabel("Epochs")
		axis.set_ylabel(ylabel)
		# plt.legend() only decorates the current (last) axes, which left the
		# accuracy panel without a legend; attach one to each panel explicitly.
		axis.legend()
	plt.show()


# Plot the accuracy and loss histories collected by the training loop.
plot_result(num_epochs, train_accs, eval_accs, train_losses, eval_losses)

#### **Evaluation & Prediction**


In [None]:
# Accuracy and mean batch loss of the model on the test loader.
test_acc, test_loss = evaluate(lenet_model, criterion, test_dataloader, device)
test_acc, test_loss

#### **Inference**

In [None]:
def load_model(model_path, num_classes = 5):
	"""Build a `LeNetClassifier` and restore its weights from a saved state dict.

	Args:
		model_path: Path of a file written with `torch.save(model.state_dict(), ...)`.
		num_classes: Number of output classes the checkpoint was trained with.

	Returns:
		The model on the CPU, in evaluation mode.
	"""
	lenet_model = LeNetClassifier(num_classes)
	# map_location lets a checkpoint saved from a CUDA model load on a machine
	# without a GPU; the newly built model is never moved off the CPU here.
	state_dict = torch.load(model_path, map_location = 'cpu', weights_only = True)
	lenet_model.load_state_dict(state_dict)
	lenet_model.eval()
	return lenet_model


def inference(img_path, model, img_size = 150):
	"""Classify a single image file with `model`.

	Args:
		img_path: Path of the image to classify.
		model: Network that maps a `(1, 3, img_size, img_size)` tensor to class scores.
		img_size: Side length the image is resized to before the forward pass.

	Returns:
		(p_max, yhat): the highest softmax probability as a float and the index
		of the corresponding class as an int.
	"""
	img_transform = transforms.Compose([
		transforms.Resize((img_size, img_size)),
		transforms.ToTensor(),
	])
	# convert('RGB') guarantees three channels for grayscale / RGBA files, and
	# the context manager closes the file handle afterwards.
	with Image.open(img_path) as image:
		img_new = img_transform(image.convert('RGB'))
	# Add the batch dimension expected by the model.
	img_new = torch.unsqueeze(img_new, 0)

	# Run on whatever device the model's weights live on, so a model that was
	# moved to the GPU does not hit a device-mismatch error.
	first_param = next(model.parameters(), None)
	if first_param is not None:
		img_new = img_new.to(first_param.device)

	with torch.no_grad():
		predictions = model(img_new)
		preds = nn.Softmax(dim = 1)(predictions)
	p_max, yhat = torch.max(preds, 1)
	return p_max.item(), yhat.item()


# Reload the checkpoint written by the training loop and classify one test image.
# Both paths are derived from `save_model` / `data_paths` so they match what this
# notebook creates and reads, instead of files that are never produced here.
# NOTE(review): assumes the test split contains cbsd/test-cbsd-1.jpg - confirm.
model = load_model(save_model + '/lenet_model.pt', num_classes)
preds = inference(os.path.join(data_paths['test'], 'cbsd', 'test-cbsd-1.jpg'), model)
preds


In [None]:
# Class-name -> index mapping assigned by ImageFolder to the training data.
train_data.class_to_idx

In [None]:
# Predicted class index -> class folder name.
# NOTE(review): hard-coded; expected to be the inverse of `train_data.class_to_idx` - confirm.
idx2label = dict(enumerate(('cbb', 'cbsd', 'cgm', 'cmd', 'healthy')))

idx2label[4]