In [None]:
import numpy
import matplotlib.pyplot as plt

from keras.models import Sequential
from keras.layers import Dense, Dropout, Flatten, BatchNormalization, Activation
from keras.constraints import maxnorm
from keras.layers.convolutional import Conv2D, MaxPooling2D
from keras.utils import np_utils
from keras.preprocessing import image_dataset_from_directory

from sklearn.model_selection import train_test_split

from PIL import Image

import glob
import os
import zipfile
from struct import unpack
import shutil
from tqdm import tqdm

In [None]:
from PIL import ImageFile
# Pillow switch: allow image files whose data stream is truncated to be loaded.
# NOTE(review): presumably needed because the dataset contains partially broken
# JPEGs that survive the clean-up cell further down - confirm.
ImageFile.LOAD_TRUNCATED_IMAGES = True

In [None]:
# Create the local Kaggle config directory (no error if it already exists).
os.makedirs('kaggle', exist_ok=True)
# TODO setup kaggle credentials in an external, non-committed file
# The literal below is an empty placeholder: fill in username/key locally before
# running the download cell, and never commit real credentials.
with open('kaggle/kaggle.json', 'w') as file:
  file.write('{"username" : "","key" : ""}')
# Keep the credentials file private to the current user; the Kaggle docs
# recommend this for kaggle.json.
os.chmod('kaggle/kaggle.json', 0o600)
# Point the Kaggle CLI at the directory that holds kaggle.json.
os.environ['KAGGLE_CONFIG_DIR'] = "kaggle"

In [None]:
# Download the mushroom image dataset archive with the Kaggle command-line client.
!kaggle datasets download -d maysee/mushrooms-classification-common-genuss-images

In [None]:
# name of the zip file you want to unzip
local_zip = 'mushrooms-classification-common-genuss-images.zip'
# Open the archive read-only; the context manager closes it even when
# extraction raises, so the file handle is never leaked.
with zipfile.ZipFile(local_zip, 'r') as zip_ref:
  # extract all contents of the zip file (paths are relative to the working directory)
  zip_ref.extractall('')

In [None]:
# Delete the downloaded archive now that its contents have been extracted.
!rm mushrooms-classification-common-genuss-images.zip
# NOTE(review): presumably `mushrooms` is a duplicate lowercase copy shipped in the
# archive - confirm. On a case-insensitive filesystem this command would also
# remove the `Mushrooms` folder that the following cells read from.
!rm -rf mushrooms
# NOTE(review): `sample_data` looks like the default Colab folder - confirm.
!rm -rf sample_data

In [None]:
# Lookup table: JPEG marker value -> human-readable segment name
# (handy for debug printing while walking a file's markers).
marker_mapping = dict([
    (0xFFD8, "Start of Image"),
    (0xFFE0, "Application Default Header"),
    (0xFFDB, "Quantization Table"),
    (0xFFC0, "Start of Frame"),
    (0xFFC4, "Define Huffman Table"),
    (0xFFDA, "Start of Scan"),
    (0xFFD9, "End of Image"),
])

class JPEG:
    """Minimal JPEG marker walker used to spot files whose byte stream is broken.

    The whole file is read into memory on construction; ``decode`` then steps
    through the marker segments and raises when the stream cannot be parsed.
    """

    def __init__(self, image_file):
        """Read the raw bytes of ``image_file`` into ``self.img_data``."""
        with open(image_file, 'rb') as f:
            self.img_data = f.read()

    def decode(self):
        """Walk the markers in ``self.img_data`` without decoding pixel data.

        Returns:
            None, either when an End-of-Image marker (0xFFD9) is reached or when
            the data is used up exactly at a segment boundary.

        Raises:
            struct.error: fewer bytes remain than a marker or a segment length
                field needs (e.g. empty or cut-off data).
            ValueError: the data ends with a Start-of-Scan marker, so no
                End-of-Image marker can follow it.
        """
        data = self.img_data
        while True:
            marker, = unpack(">H", data[0:2])
            # print(marker_mapping.get(marker))
            if marker == 0xffd8:
                # Start of Image carries no payload: skip the marker itself.
                data = data[2:]
            elif marker == 0xffd9:
                # End of Image reached.
                return
            elif marker == 0xffda:
                # Start of Scan: skip the scan data by jumping to the final two
                # bytes, where the End-of-Image marker is expected.
                tail = data[-2:]
                if tail == b'\xff\xda':
                    # The final two bytes are themselves a Start-of-Scan marker;
                    # jumping to them again would repeat this branch forever.
                    raise ValueError("JPEG data ends with a Start of Scan marker")
                data = tail
            else:
                # Any other marker is followed by a 2-byte big-endian length
                # that counts itself; skip the marker plus that many bytes.
                lenchunk, = unpack(">H", data[2:4])
                data = data[2+lenchunk:]
            if len(data) == 0:
                break

# Paths of files whose JPEG marker stream could not be walked.
bads = []

for img in tqdm(glob.glob('Mushrooms/**/*')):
  image = JPEG(img)
  try:
    image.decode()
  except Exception:
    # Catch parse failures only. A bare `except:` would also swallow
    # KeyboardInterrupt, so interrupting the scan would flag the file being
    # checked as bad and it would then be deleted below.
    bads.append(img)

# Report and delete every file flagged above.
for name in tqdm(bads):
  print('[X]',name)
  os.remove(name)

In [None]:
# Sort the file list so the split does not depend on directory listing order.
shrooms = sorted(glob.glob('Mushrooms/**/*'))

# A fixed random_state makes the 80/20 split reproducible: re-running this cell
# copies the same files to the same place instead of mixing a fresh random split
# into train/ and test/ directories left over from an earlier run.
train, test = train_test_split(shrooms, test_size = 0.2, random_state = 42)

# Class (sub-directory) names: the path component right after 'Mushrooms/'.
names = {file.replace('Mushrooms/', '', 1).split('/')[0] for file in shrooms}

# Create train/<class> and test/<class> directories.
for name in tqdm(names):
  for set_name in ('train','test'):
    os.makedirs('{}/{}'.format(set_name,name), exist_ok = True)

# Copy every file into its set, mirroring the Mushrooms/<class>/<file> layout.
for set_name, sources in (('train', train), ('test', test)):
  for source in tqdm(sources):
    # Replace only the first 'Mushrooms' (the root folder), so a file name that
    # happens to contain the word is left untouched.
    target = source.replace('Mushrooms', set_name, 1)
    shutil.copy(source,target)



In [None]:
# Shared keyword settings for the image_dataset_from_directory calls below.
dataset_config = dict(
    labels           = 'inferred',
    label_mode       = 'categorical',
    class_names      = [
        'Russula', 'Entoloma', 'Amanita',
        'Lactarius', 'Cortinarius', 'Hygrocybe',
        'Agaricus', 'Suillus', 'Boletus',
    ],
    color_mode       = 'grayscale',
    batch_size       = 64,
    shuffle          = True,
    seed             = 42,
    validation_split = 0.25,
    image_size       = (256, 256),
    interpolation    = 'bilinear',
    follow_links     = False,
)

# Training subset of the 'train' directory; every other option (including
# validation_split and seed) is taken unchanged from dataset_config.
train_dataset = image_dataset_from_directory(
    'train',
    subset = 'training',
    **dataset_config,
)

# Held-out test data: the whole 'test' directory, so validation_split is left out
# and subset stays None. All remaining options come from dataset_config.
test_dataset = image_dataset_from_directory(
    'test',
    subset = None,
    **{option: value for option, value in dataset_config.items() if option != 'validation_split'},
)

# Validation subset of the 'train' directory, built with the same seed and
# validation_split as the training call (both come from dataset_config).
val_dataset = image_dataset_from_directory(
    'train',
    subset = 'validation',
    **dataset_config,
)

In [None]:
# Candidate hyper-parameter values for a grid search, following
# https://blogs.oracle.com/meena/simple-neural-network-model-using-keras-and-grid-search-hyperparameterstuning
param_grid = {
    'activation'        : ['relu', 'tanh', 'sigmoid', 'linear'],
    'momentum'          : [0.0, 0.2, 0.5, 0.6, 0.8, 0.9],
    'learn_rate'        : [0.001, 0.01, 0.1, 0.2, 0.3],
    'weight_constraint' : [1, 2, 3, 4, 5],
    'epochs'            : [1, 10, 20, 25, 50, 100, 150],
    'batch_size'        : [8, 16, 32, 64, 128],
}

# Keep each candidate list available under its own top-level name as well.
activation = param_grid['activation']
momentum = param_grid['momentum']
learn_rate = param_grid['learn_rate']
weight_constraint = param_grid['weight_constraint']
epochs = param_grid['epochs']
batch_size = param_grid['batch_size']

In [None]:
# Create the model
# Input and output sizes are derived from dataset_config so the network stays in
# step with the data pipeline. The single channel matches color_mode 'grayscale'.
input_shape = (dataset_config['image_size'][0], dataset_config['image_size'][1], 1)
num_classes = len(dataset_config['class_names'])

model = Sequential()

# Convolutional block 1: 32 filters, no pooling.
model.add(Conv2D(32, (3, 3), input_shape=input_shape, padding='same'))
model.add(Activation('relu'))
model.add(Dropout(0.2))
model.add(BatchNormalization())

# Convolutional block 2: 64 filters, 2x2 max pooling.
model.add(Conv2D(64, (3, 3), padding='same'))
model.add(Activation('relu'))
model.add(MaxPooling2D(pool_size=(2, 2)))
model.add(Dropout(0.2))
model.add(BatchNormalization())

# Convolutional block 3: 64 filters, 2x2 max pooling.
model.add(Conv2D(64, (3, 3), padding='same'))
model.add(Activation('relu'))
model.add(MaxPooling2D(pool_size=(2, 2)))
model.add(Dropout(0.2))
model.add(BatchNormalization())

# Convolutional block 4: 128 filters, no pooling.
model.add(Conv2D(128, (3, 3), padding='same'))
model.add(Activation('relu'))
model.add(Dropout(0.2))
model.add(BatchNormalization())

# Classifier head: flatten, then two max-norm-constrained dense layers.
model.add(Flatten())
model.add(Dropout(0.2))

model.add(Dense(256, kernel_constraint=maxnorm(3)))
model.add(Activation('relu'))
model.add(Dropout(0.2))
model.add(BatchNormalization())
model.add(Dense(128, kernel_constraint=maxnorm(3)))
model.add(Activation('relu'))
model.add(Dropout(0.2))
model.add(BatchNormalization())

# One output unit per class, instead of a hard-coded 9 that has to be kept in
# step with the class list by hand.
model.add(Dense(num_classes))
model.add(Activation('softmax'))

# Training settings. Note: this rebinds `epochs`, which the grid-search cell
# above uses for a list of candidate values.
epochs = 50
optimizer = 'Adam'

# categorical_crossentropy matches the one-hot labels from label_mode 'categorical'.
model.compile(loss='categorical_crossentropy', optimizer=optimizer, metrics=['accuracy'])

In [None]:
# Print the layer-by-layer summary of the model built above.
model.summary()

In [None]:
# Train on the batched datasets. batch_size is deliberately not passed: the
# datasets were already batched when they were created (dataset_config
# 'batch_size'), and Keras documents that batch_size must not be specified for
# dataset inputs. The returned History object is kept so the per-epoch metrics
# can be inspected or plotted afterwards.
history = model.fit(train_dataset, validation_data = val_dataset, epochs = epochs)

In [None]:
# Final evaluation of the model on the held-out test dataset.
# evaluate() returns the loss followed by the compiled metrics, so index 1 is
# the accuracy requested in model.compile.
scores = model.evaluate(test_dataset, verbose = 0)
accuracy_percent = scores[1] * 100
print("Accuracy: %.2f%%" % accuracy_percent)