In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset, random_split
import torch.nn.functional as F
from torchvision import transforms, datasets
import pandas as pd
import numpy as np
import os
import matplotlib.pyplot as plt

In [None]:
class Bottleneck(nn.Module):
    exp = 4

    def __init__( self, in_channels, out_channels, stride=1):
        super( Bottleneck, self ).__init__()
        self.conv1 = nn.Conv2d( in_channels, out_channels, kernel_size=1, bias=False)
        self.bn1 = nn.BatchNorm2d(out_channels)
        self.conv2 = nn.Conv2d( out_channels, out_channels, kernel_size=3, stride=stride, padding=1, bias=False)
        self.bn2 = nn.BatchNorm2d( out_channels )
        self.conv3 = nn.Conv2d( out_channels, self.exp * out_channels, kernel_size=1, bias=False )
        self.bn3 = nn.BatchNorm2d( self.exp * out_channels)

        self.shortcut = nn.Sequential()
        if stride != 1 or in_channels != self.exp * out_channels:
            self.shortcut = nn.Sequential(
                nn.Conv2d( in_channels, self.exp * out_channels, kernel_size=1, stride=stride, bias=False),
                nn.BatchNorm2d( self.exp * out_channels )
            )

    def forward(self, x):
        out = F.relu(self.bn1(self.conv1(x)))
        out = F.relu(self.bn2(self.conv2(out)))
        out = self.bn3(self.conv3(out))
        out += self.shortcut(x)
        out = F.relu(out)
        return out

In [None]:
class ResNet(nn.Module):
    def __init__( self, block, block_num, num_classes = 5 ):
        super(ResNet, self).__init__()
        self.in_channels = 64

        self.conv1 = nn.Conv2d( 3, 64, kernel_size = 7, stride = 2, padding = 3, bias = False )
        self.bn1 = nn.BatchNorm2d( self.in_channels )
        self.relu = nn.ReLU( inplace = True )
        self.maxpool = nn.MaxPool2d( kernel_size = 3, stride = 2, padding = 1 )
        
        self.layer1 = self._make_layer( block, 64, block_num[0], stride = 1 )
        self.layer2 = self._make_layer( block, 128, block_num[1], stride = 2 )
        self.layer3 = self._make_layer( block, 256, block_num[2], stride = 2 )
        self.layer4 = self._make_layer( block, 512, block_num[3], stride = 2 )
        self.avgpool = nn.AdaptiveAvgPool2d( ( 1, 1 ) )
        # self.fc = nn.Linear( 512 * block.exp, num_classes )
        # '''
        self.fc = nn.Sequential( nn.Linear( 512 * block.exp, 512),
                                 nn.ReLU(),
                                 nn.Dropout( 0.3 ),
                                 nn.Linear( 512, num_classes )
                                )
        # '''
        
        for m in self.modules():
            if isinstance( m, nn.Conv2d ):
                nn.init.kaiming_normal_( m.weight, mode = 'fan_out', nonlinearity = 'relu' )
                

    def _make_layer( self, block, channel, block_num, stride ):
        strides = [ stride ] + [1]*( block_num - 1 )
        layers = []
        for stride in strides:
            layers.append( block( self.in_channels, channel, stride ) )
            self.in_channels = channel * block.exp
        return nn.Sequential( *layers )

    def forward( self, x ):
        x = self.conv1( x )
        x = self.bn1( x )
        x = F.relu( x )
        x = F.max_pool2d( x, kernel_size = 3, stride = 2, padding = 1 )
        
        x = self.layer1( x )
        x = self.layer2( x )
        x = self.layer3( x )
        x = self.layer4( x )
        
        x = self.avgpool( x )
        x = torch.flatten( x, 1 )
        x = self.fc( x )
        return x

In [None]:
class CassavaDataset( Dataset ):
    def __init__( self, csv_f, img_folder, transform, s_num = 4000 ):
        self.img_labels = pd.read_csv( csv_f )[ :s_num ]
        self.img_folder = img_folder
        self.transform = transform

    def __len__( self ):
        return len( self.img_labels )

    def __getitem__( self, idx ):
        img_name = str( self.img_labels.iloc[ idx, 0 ] )
        label = self.img_labels.iloc[ idx, 1 ]
        img_path = os.path.join( self.img_folder, img_name )
        image = Image.open( img_path ).convert( "RGB" )
        if self.transform:
            image = self.transform( image )
        return image, label

In [None]:
pre = transforms.Compose( [ transforms.RandomResizedCrop( 224 ),
                                   transforms.RandomHorizontalFlip(),
                                   transforms.RandomRotation( 15 ),
                                   transforms.ToTensor(),
                                   transforms.Normalize( ( 0.5, 0.5, 0.5 ), ( 0.5, 0.5, 0.5 ) )
                            ] )

csv_f = '../data/cassava-leaf-disease-classification/train.csv'
img_folder = '../data/cassava-leaf-disease-classification/train_images'
data = CassavaDataset( csv_f, img_folder, transform = pre, s_num = 4000 )

len( data )

In [None]:
train_size = int( 0.9 * len( data ) )
# train_dataset = data[ :train_size ]
train_dataset, test_dataset = random_split( data, [train_size, (len( data ) - train_size) ])

train_dataset

In [None]:
batch_size = 32

trainloader = DataLoader( train_dataset, batch_size = batch_size, shuffle = True )
testloader = DataLoader( test_dataset, batch_size = batch_size, shuffle = False )

In [None]:
num_classes = 5
device = torch.device( "cuda:0" if torch.cuda.is_available() else"cpu" )
def ResNet50( num_classes = num_classes ):
    return ResNet( Bottleneck, [ 3, 4, 6, 3 ], num_classes )

In [None]:
net = ResNet50()
net.to( device )
loss_f = nn.CrossEntropyLoss()
optimizer = optim.Adam( net.parameters(), lr = 0.01 )
t_steps = len( trainloader )
test_num = len( test_dataset )

for epoch in range( 5 ):
    net.train()
    running_loss = 0.0
    for step, data in enumerate( trainloader, start = 0 ):
        imgs, labels = data
        rate = ( ( step + 1 ) / t_steps )
        optimizer.zero_grad()
        outputs = net( imgs.to( device ) )
        loss = loss_f( outputs, labels.to( device ) )
        loss.backward()
        optimizer.step()
        running_loss += loss.item()
        
        a = '*' * int( rate * 50 )
        b = '.' * int( ( 1 - rate ) * 50 )
        print( '\r train loss: {:^3.0f}%[ {} -> {} ]{:.3f}'.format( int( rate * 100 ),a, b, loss ), end = '' )
    print()    
    
    net.eval()
    r = 0.0
    best_R = 0.0
    with torch.no_grad():
        for data_test in testloader:
            t_img, t_lbl = data_test
            outputs = net( t_img.to( device ) )
            pred_y = torch.max( outputs, dim = 1 )[1]
            r += ( pred_y == t_lbl.to( device ) ).sum().item()
        acc_R = r / test_num
        if acc_R > best_R:
            best_R = acc_R
            torch.save( net.state_dict(), '../path/Net_best.pth' )
        print( '[epoch %d] train_loss: %.3f    test_accuracy: %.3f' % ( epoch + 1, running_loss, acc_R ) )
        
        
    print( 'finished' )

In [None]:
from PIL import Image


img_transform = transforms.Compose([transforms.Resize((224, 224)),
                                    transforms.ToTensor(),
                                    transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))]
                                   )
img = Image.open( '../data/cassava-leaf-disease-classification/test_images/2216849948.jpg' )
plt.imshow(img)
import json

img = img_transform(img)
img = torch.unsqueeze(img, dim=0)

with open('../data/cassava-leaf-disease-classification/label_num_to_disease_map.json', 'r') as f:
    sorted_class = json.load(f)
# print(sorted_class)
net.load_state_dict(torch.load('../path/Net_best.pth'))
net.eval()

with torch.no_grad():
    output = torch.squeeze(net(img))
    predict = torch.softmax(output, dim=0)
    p_class = torch.argmax(predict).numpy()
    print(p_class, sorted_class[str(p_class)], predict[p_class].item())

In [None]:
import csv

output_path = 'submission.csv'
with open(output_path, mode='w', newline='') as file:
    writer = csv.writer(file)
    writer.writerow( ['image_id', 'label'] )
    writer.writerow( ['2216849948.jpg', p_class ] )