In [1]:
#--Tensorflow
from tensorflow.keras.layers import Conv2D, Activation, Flatten, Dense
from tensorflow.keras import Sequential
from tensorflow.keras.datasets import cifar10
from tensorflow.keras.layers.experimental import preprocessing
from tensorflow.data import AUTOTUNE
import tensorflow as tf
#--Others
from imutils import paths
import matplotlib.pyplot as plt
import os

In [24]:
((trainX, trainLabels), (textX, testLabels)) = cifar10.load_data()

#### Prepare preprocess function

In [2]:
def load_images(imagePath):
    #--Image
    image = tf.io.read_file(imagePath)
    image = tf.io.decode_jpeg(image, channels=3)
    image = tf.image.convert_image_dtype(image, dtype=tf.float32)
    image = tf.image.resize(image, (156,156))
    #--Labels
    label = tf.strings.split(imagePath, os.path.sep)[-2]
    return(image, label)

#### There are two ways to do augmentation: with a series of operations or sequential layers

In [4]:
#-----------------------1: Series of operations ----------------------------
#This way needs to define the augmentation on the go
def augment_using_ops(images, labels):
    images = tf.image.random_flip_left_right(images)
    images = tf.image.random_flip_up_down(images)
    images = tf.image.rot90(images)
    return(images, labels)

In [19]:
#-----------------------2: Layers ----------------------------
def augment_using_layers(images, labels, aug):
    images = aug(images)
    return (images, labels)

#### Define the "basic" pipeline

In [20]:
batch_size = 8

imagePaths = list(paths.list_images("G:\\pyimage_univ\\CNN_tf\\tf_data\\cancer_data\\trainig"))

ds = tf.data.Dataset.from_tensor_slices(imagePaths)
ds = (ds.shuffle(len(imagePaths), 42).map(load_images, num_parallel_calls=AUTOTUNE).
        cache().batch(batch_size))

In [25]:
#define augmentation for using layers
trainAug = Sequential([
	preprocessing.Rescaling(scale=1.0 / 255),
	preprocessing.RandomFlip("horizontal_and_vertical"),
	preprocessing.RandomZoom(
		height_factor=(-0.05, -0.15),
		width_factor=(-0.05, -0.15)),
	preprocessing.RandomRotation(0.3)
])

testAug = Sequential([
	preprocessing.Rescaling(scale=1.0 / 255)
])

In [27]:
#### Add augmentation for layers
trainDS = tf.data.Dataset.from_tensor_slices((trainX, trainLabels))
trainDS = (
	trainDS
	.shuffle(batch_size * 100)
	.batch(batch_size)
	.map(lambda x, y: (trainAug(x), y),num_parallel_calls=tf.data.AUTOTUNE)
	.prefetch(tf.data.AUTOTUNE)
)

#### And for test too
testDS = tf.data.Dataset.from_tensor_slices((textX, testLabels))
testDS = (
	testDS
	.batch(batch_size)
	.map(lambda x, y: (testAug(x), y), num_parallel_calls=tf.data.AUTOTUNE)
	.prefetch(tf.data.AUTOTUNE)
)

In [None]:
model = Sequential()
model.add(Conv2D(32, (3, 3), padding="same",
	input_shape=(32, 32, 3)))
model.add(Activation("relu"))
model.add(Flatten())
model.add(Dense(10))
model.add(Activation("softmax"))

In [None]:
model.compile(loss="sparse_categorical_crossentropy",
	optimizer="sgd", metrics=["accuracy"])

# train the model
print("[INFO] training model...")
H = model.fit(
	trainDS,
	validation_data=testDS,
	epochs=EPOCHS)

In [None]:
(loss, accuracy) = model.evaluate(testDS)