In [4]:
import glob
import os
import time

import cv2
import numpy as np

import torch
import torch.nn as nn
from torch.utils.data import Dataset
from torch.utils.data import DataLoader

from torch.nn import CrossEntropyLoss
from torch.optim import Adam

from torch.utils.tensorboard import SummaryWriter

from copy import deepcopy
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, classification_report

#### 数据预处理


In [5]:
# Root folder of the training images (read_img expects one sub-folder per class).
path = './flower_photos/'
# Every image is resized to (w, h) before being stacked into an array.
# `c` is not referenced in the visible cells; presumably the channel count.
w, h, c = 200, 200, 3

In [6]:
def read_img(path):
    """Load every ``*.jpg`` found in the class sub-folders of ``path``.

    Each immediate sub-directory of ``path`` is treated as one class. The
    sub-directories (and the files inside them) are visited in sorted name
    order so the integer label given to a class is stable across machines;
    ``os.listdir`` and ``glob.glob`` return entries in arbitrary order.

    Args:
        path: Directory containing one sub-directory per class. A trailing
            slash is optional.

    Returns:
        A tuple ``(images, labels)``. ``images`` is a float32 array of the
        decoded images, each resized to ``(w, h)``; ``labels`` is an int32
        array holding, for each image, the index of the sub-directory it came
        from.
    """
    import warnings

    class_dirs = sorted(
        os.path.join(path, name)
        for name in os.listdir(path)
        if os.path.isdir(os.path.join(path, name))
    )
    imgs = []
    labels = []
    for idx, folder in enumerate(class_dirs):
        for im in sorted(glob.glob(os.path.join(folder, '*.jpg'))):
            img = cv2.imread(im)
            if img is None:
                # cv2.imread reports an unreadable file by returning None;
                # skip it (loudly) instead of crashing inside cv2.resize.
                warnings.warn(f"skipping unreadable image: {im}")
                continue
            imgs.append(cv2.resize(img, (w, h)))
            labels.append(idx)
    return np.asarray(imgs, np.float32), np.asarray(labels, np.int32)

In [7]:
# Read the whole training set into memory as NumPy arrays.
data, label = read_img(path)

# Reorder the axes from (N, H, W, C) to (N, C, H, W), the layout nn.Conv2d takes.
data = torch.FloatTensor(data).permute(0, 3, 1, 2)
# Class indices are stored as int64 (LongTensor) for use as CrossEntropyLoss targets.
label = torch.LongTensor(label)

data.shape, label.shape

(torch.Size([516, 3, 200, 200]), torch.Size([516]))

In [8]:
seed = 109
# Seed PyTorch as well as NumPy: weight initialisation and dropout draw from
# torch's own generator, which np.random.seed does not affect.
np.random.seed(seed)
torch.manual_seed(seed)

# Hold out 20% of the samples for validation; random_state makes the split repeatable.
(x_train, x_val, y_train, y_val) = train_test_split(data, label, test_size=0.20, random_state=seed)
# Rescale pixel intensities by 1/255 (presumably 8-bit images, giving values in [0, 1]).
x_train = x_train / 255
x_val = x_val / 255

# Class index -> class name.
# NOTE(review): this mapping assumes the class folders are enumerated in
# alphabetical order when labels are assigned; confirm against read_img.
flower_dict = {0:'bee', 1:'blackberry', 2:'blanket', 3:'bougainvilliea', 4:'bromelia', 5:'foxglove'}
# Derived from flower_dict so the two can never drift apart.
class_list = [flower_dict[i] for i in range(len(flower_dict))]

#### TORCH框架

In [9]:
class FlowerDataset(Dataset):
    """Map-style dataset that pairs each sample with the label at the same index."""

    def __init__(self, data, label):
        """Keep references to the indexable sample and label containers."""
        super().__init__()
        self.data, self.label = data, label

    def __len__(self):
        # The sample container alone determines the dataset size.
        return len(self.data)

    def __getitem__(self, idx):
        """Return the ``(sample, label)`` pair stored at position ``idx``."""
        sample = self.data[idx]
        target = self.label[idx]
        return sample, target

In [10]:
class MyModel(nn.Module):
    """Small CNN classifier (same architecture as the TensorFlow version).

    Three ``Conv2d(3x3, padding=1) -> ReLU -> MaxPool2d(2, 2)`` stages with
    16, 32 and 64 output channels, followed by a dropout-regularised
    two-layer classifier head that emits raw logits.

    The sub-module names (``conv_model``, ``fc``) and layer positions are part
    of the saved ``state_dict`` keys and must not be changed.

    Args:
        num_classes: Number of output logits. Defaults to 6.
        in_channels: Number of channels in the input images. Defaults to 3.
        input_size: ``(height, width)`` of the input images; used to size the
            first linear layer. Defaults to ``(200, 200)``.

    Raises:
        ValueError: If ``input_size`` is too small to survive three 2x poolings.
    """

    def __init__(self, num_classes=6, in_channels=3, input_size=(200, 200)):
        super(MyModel, self).__init__()

        self.conv_model = nn.Sequential(
            nn.Conv2d(in_channels, 16, 3, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(2, 2),
            nn.Conv2d(16, 32, 3, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(2, 2),
            nn.Conv2d(32, 64, 3, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(2, 2),
        )

        # The padded 3x3 convolutions keep height/width unchanged; each of the
        # three 2x2 poolings floor-halves them. For 200x200 this gives 25x25.
        height, width = input_size
        for _ in range(3):
            height, width = height // 2, width // 2
        if height < 1 or width < 1:
            raise ValueError(f"input_size {input_size} is too small for three 2x2 poolings")
        flat_features = 64 * height * width

        self.fc = nn.Sequential(
            nn.Dropout(0.5),
            nn.Linear(flat_features, 128),
            nn.ReLU(),
            nn.Dropout(0.5),
            nn.Linear(128, num_classes),
        )

    def forward(self, data):
        """Return class logits of shape ``(batch, num_classes)`` for an image batch."""
        x = self.conv_model(data)
        # Collapse (C, H, W) into one feature axis, keeping the batch axis.
        x = torch.flatten(x, 1)
        x = self.fc(x)
        return x

In [11]:
# Number of passes over the training set; train() reads this module-level name.
num_epochs = 100

#### TRAIN AND EVAL

In [12]:
class Stat:
    """Accumulate per-batch results and report loss/accuracy once per call to ``log``.

    Args:
        training: ``True`` to log under the ``train_*`` scalar tags, ``False``
            to log under ``dev_*``.
        writer: Optional object exposing ``add_scalar(tag, value, step)``
            (e.g. a TensorBoard ``SummaryWriter``). When omitted, ``log`` only
            returns the metrics.
    """

    def __init__(self, training, writer=None):
        self.step = 0          # number of completed log() calls
        self.loss = []         # one loss value per recorded batch
        self.labels = []       # ground-truth class indices since the last log()
        self.pred_labels = []  # predicted class indices since the last log()
        self.training = training
        self.writer = writer

    def add(self, pred, labels, loss):
        """Record one batch.

        Args:
            pred: Tensor of per-class scores, shape ``(batch, classes)``.
            labels: Tensor of ground-truth class indices, shape ``(batch,)``.
            loss: Scalar loss value for the batch.
        """
        # detach().cpu() makes the conversion valid for tensors that require
        # grad or live on a non-CPU device; it is a no-op otherwise.
        labels = labels.detach().cpu().numpy()
        pred = pred.detach().cpu().numpy()
        pred_labels = np.argmax(pred, axis=1)
        self.loss.append(loss)
        self.labels.extend(labels)
        self.pred_labels.extend(pred_labels)

    def log(self):
        """Summarise everything recorded since the previous call and reset.

        Returns:
            ``(loss, acc)``: the unweighted mean of the recorded batch losses
            and the accuracy over all recorded samples.

        Raises:
            ValueError: If no batch has been recorded since the last call.
        """
        if not self.loss:
            # Fail with a clear message instead of an opaque ZeroDivisionError.
            raise ValueError("Stat.log() called before any batch was added")
        self.step += 1
        acc = accuracy_score(self.labels, self.pred_labels)
        loss = sum(self.loss) / len(self.loss)
        self.loss, self.labels, self.pred_labels = [], [], []
        if self.writer:
            prefix = 'train' if self.training else 'dev'
            self.writer.add_scalar(f'{prefix}_loss', loss, self.step)
            self.writer.add_scalar(f'{prefix}_acc', acc, self.step)
        return loss, acc

In [13]:
def train(model, train_data_loader, dev_data_loader, epochs=None):
    """Train ``model`` and return the weights of its best validation epoch.

    Optimises cross-entropy with Adam (lr=0.001, weight_decay=0.01), logs
    per-epoch loss/accuracy for both loaders to a TensorBoard run directory
    under ``./summary/`` and prints a one-line summary per epoch.

    Args:
        model: The ``nn.Module`` to optimise in place.
        train_data_loader: Iterable of ``(data, labels)`` training batches.
        dev_data_loader: Iterable of ``(data, labels)`` validation batches.
        epochs: Number of epochs to run. Defaults to the module-level
            ``num_epochs``.

    Returns:
        A deep copy of ``model.state_dict()`` taken at the epoch with the
        highest validation accuracy, or ``None`` if no epoch was run.
    """
    if epochs is None:
        epochs = num_epochs

    loss_func = CrossEntropyLoss()
    optimizer = Adam(
        model.parameters(),
        lr=0.001,
        weight_decay=0.01,
    )

    writer = SummaryWriter('./summary/' + time.strftime('%m-%d_%H.%M', time.localtime()))
    train_stat = Stat(training=True, writer=writer)
    dev_stat = Stat(training=False, writer=writer)

    best_acc, best_net = 0.0, None
    try:
        for epoch in range(epochs):
            print(f"--- epoch: {epoch + 1} ---")
            # Evaluation below switches to eval mode, so re-enable training
            # behaviour (dropout) once at the start of every epoch.
            model.train()
            for batch in train_data_loader:
                data, labels = batch[0], batch[1]
                pred_outputs = model(data)
                loss = loss_func(pred_outputs, labels)
                optimizer.zero_grad()
                loss.backward()
                optimizer.step()
                train_stat.add(pred_outputs, labels, loss.item())
            train_loss, train_acc = train_stat.log()

            model.eval()
            with torch.no_grad():
                for batch in dev_data_loader:
                    data, labels = batch[0], batch[1]
                    pred_outputs = model(data)
                    loss = loss_func(pred_outputs, labels)
                    dev_stat.add(pred_outputs, labels, loss.item())
            dev_loss, dev_acc = dev_stat.log()
            print(  f"training loss: {train_loss:.4f}, acc: {train_acc:.2%}, " \
                    f"dev loss: {dev_loss:.4f}, acc: {dev_acc:.2%}.")

            # `best_net is None` guarantees a snapshot even when validation
            # accuracy never rises above the initial 0.0.
            if best_net is None or dev_acc > best_acc:
                best_acc = dev_acc
                best_net = deepcopy(model.state_dict())
    finally:
        # Flush pending events and release the event file, even on error/interrupt.
        writer.close()

    print(f"best dev acc: {best_acc:.4f}")
    return best_net

#### TEST

In [183]:
def test(model, test_data_loader):
    """Evaluate ``model`` on ``test_data_loader`` and print a per-class report.

    Args:
        model: The trained ``nn.Module``; it is switched to eval mode.
        test_data_loader: Iterable of ``(data, labels)`` batches.

    Prints the scikit-learn classification report (precision, recall, F1 and
    support per class, 4 digits). Returns nothing.
    """
    test_stat = Stat(training=False)
    model.eval()
    with torch.no_grad():
        for batch in test_data_loader:
            data, labels = batch[0], batch[1]
            pred_outputs = model(data)
            # Loss is not needed for the report, so a placeholder 0 is recorded.
            test_stat.add(pred_outputs, labels, 0)

    report = classification_report(
        test_stat.labels,
        test_stat.pred_labels,
        # Pin the label set so target_names always lines up, even when some
        # class is absent from both the ground truth and the predictions.
        labels=list(range(len(class_list))),
        target_names=class_list,
        digits=4,
        zero_division=0,
    )
    print(report)

In [184]:
path_test = '../TestImages/'
imgs = []
# Labels below are assigned by position, so the file order must be
# deterministic; glob.glob alone returns files in arbitrary order.
for im in sorted(glob.glob(os.path.join(path_test, '*.jpg'))):
    img = cv2.imread(im)
    if img is None:
        # Skipping would silently shift every following positional label.
        raise ValueError(f"could not read test image: {im}")
    imgs.append(cv2.resize(img, (w, h)))
imgs = np.asarray(imgs, np.float32)

# Same 1/255 rescaling that is applied to the training data.
test_data = imgs / 255
# NOTE(review): the i-th file (in sorted name order) is given class index i;
# this presumes one image per class, named so the order matches class_list.
test_label = list(range(len(test_data)))

# (N, H, W, C) -> (N, C, H, W), matching the layout used for training.
test_data = torch.FloatTensor(test_data).permute(0, 3, 1, 2)
test_label = torch.LongTensor(test_label)
test_data.shape, test_label.shape

(torch.Size([6, 3, 200, 200]), torch.Size([6]))

In [185]:
# Wrap the test tensors and evaluate them as one batch, in their stored order.
test_set = FlowerDataset(test_data, test_label)
test_data_loader = DataLoader(test_set, batch_size=len(test_data), shuffle=False)
# NOTE(review): `net` is not assigned in any visible cell, so this call relies
# on kernel state from elsewhere and fails on a fresh Restart & Run All.
test(net, test_data_loader)

                precision    recall  f1-score   support

           bee     1.0000    1.0000    1.0000         1
    blackberry     1.0000    1.0000    1.0000         1
       blanket     1.0000    1.0000    1.0000         1
bougainvilliea     1.0000    1.0000    1.0000         1
      bromelia     1.0000    1.0000    1.0000         1
      foxglove     1.0000    1.0000    1.0000         1

      accuracy                         1.0000         6
     macro avg     1.0000    1.0000    1.0000         6
  weighted avg     1.0000    1.0000    1.0000         6



In [186]:
# Disabled alternative: rebuild the model and evaluate weights restored from a
# checkpoint file on disk instead of the in-memory `net`.
# Only load checkpoint files from a trusted source.
# net = MyModel()
# net.load_state_dict(torch.load('./model/model-04-21_20.44.pkl'))
# test(net, test_data_loader)