In [None]:
# Import the Colab helper that attaches Google Drive to this runtime.
from google.colab import drive

# Mount Google Drive at /content/drive; the data archive is read from there
# by the unzip cell.
drive.mount('/content/drive')


In [None]:
# unzip data.zip
!unzip "/content/drive/MyDrive/studies/ai-project/data.zip" -d /content/data_dir


In [None]:
# imports
import os
import pathlib

import matplotlib.pyplot as plt
import numpy as np
import PIL
import PIL.Image
import tensorflow as tf
import tensorflow_datasets as tfds


In [None]:
# loader parameters: target image size (height, width) and batch size
img_height, img_width = 1376, 1039
batch_size = 3


In [None]:
# root directory of the extracted data set, as a pathlib.Path
data_dir = pathlib.Path("/content/data_dir")


In [None]:
# count the .jpg files located exactly one directory level below data_dir
image_count = sum(1 for _ in data_dir.glob('*/*.jpg'))


In [None]:
# list_ds: one element per image path, shuffled once into a fixed order
# Match only '*/*.jpg' so the listed files are exactly the ones counted in
# image_count. A bare '*/*' would also pick up non-JPEG entries, which
# tf.io.decode_jpeg cannot read and which the shuffle buffer and validation
# split sizes (both derived from image_count) would not account for.
list_ds = tf.data.Dataset.list_files(str(data_dir/'*/*.jpg'), shuffle=False)
# A fixed seed makes this one-off shuffle, and therefore the train/validation
# split taken from it, reproducible across kernel restarts.
list_ds = list_ds.shuffle(image_count, reshuffle_each_iteration=False, seed=123)


In [None]:
# class_names: sorted names of the class sub-directories of data_dir
# Only directories can be classes. Stray files next to them (LICENSE.txt,
# archive metadata, ...) must not become labels, because get_label() and the
# size of the model's output layer both depend on this array.
class_names = np.array(sorted(
    item.name
    for item in data_dir.glob('*')
    if item.is_dir() and item.name != "LICENSE.txt"
))


In [None]:
# hold out the first 20% of the shuffled file list for validation and
# train on the remainder
val_size = int(0.2 * image_count)
val_ds = list_ds.take(val_size)
train_ds = list_ds.skip(val_size)


In [None]:
# debug: print the element count of the training split, then of the validation split
print(
    *(tf.data.experimental.cardinality(split).numpy() for split in (train_ds, val_ds)),
    sep="\n",
)


In [None]:
# helper function process_path

def get_label(file_path):
  """Return the integer class index for an image path.

  The name of the file's parent directory is compared against every entry
  of `class_names`; the position of the match is returned.
  """
  # the class directory is the second-to-last path component
  class_dir = tf.strings.split(file_path, os.path.sep)[-2]
  # the element-wise comparison yields a boolean one-hot vector; argmax gives its index
  return tf.argmax(class_dir == class_names)

def decode_img(img):
  """Decode JPEG bytes to a 3-channel image and resize it to the loader size."""
  target_size = [img_height, img_width]
  # compressed string -> 3D uint8 tensor
  pixels = tf.io.decode_jpeg(img, channels=3)
  return tf.image.resize(pixels, target_size)

def process_path(file_path):
  """Load one image file and return an `(image, label)` pair."""
  # raw file contents as a string tensor, then decoded and resized
  image = decode_img(tf.io.read_file(file_path))
  return image, get_label(file_path)
  

In [None]:
# Turn file paths into (image, label) pairs; `num_parallel_calls` lets tf.data
# load/process multiple images in parallel.
AUTOTUNE = tf.data.AUTOTUNE
train_ds, val_ds = (
    split.map(process_path, num_parallel_calls=AUTOTUNE)
    for split in (train_ds, val_ds)
)


In [None]:
# debug: fetch one (image, label) element from the mapped training set
for image, label in train_ds.take(1):
  # shape of the decoded, resized image tensor
  print("Image shape: ", image.numpy().shape)
  # integer class index produced by get_label()
  print("Label: ", label.numpy())
  

In [None]:
# helper function configure_for_performance
def configure_for_performance(ds):
  """Return `ds` cached, shuffled, batched and prefetched, in that order."""
  return (
      ds.cache()
      .shuffle(buffer_size=1000)
      .batch(batch_size)
      .prefetch(buffer_size=AUTOTUNE)
  )


In [None]:
# apply cache/shuffle/batch/prefetch to both splits
train_ds, val_ds = (
    configure_for_performance(train_ds),
    configure_for_performance(val_ds),
)


In [None]:
# debug: show up to nine images from one training batch with their class names
image_batch, label_batch = next(iter(train_ds))

# A batch holds at most `batch_size` images, so never index past its end
# (the 3x3 grid simply stays partly empty for smaller batches).
num_shown = min(9, int(image_batch.shape[0]))

plt.figure(figsize=(10, 10))
for i in range(num_shown):
  ax = plt.subplot(3, 3, i + 1)
  # images are float tensors after resizing; cast back to uint8 for display
  plt.imshow(image_batch[i].numpy().astype("uint8"))
  label = label_batch[i]
  plt.title(class_names[label])
  plt.axis("off")


In [None]:
# create model: pixel rescaling, three Conv2D + MaxPooling2D stages, then a
# dense head that outputs one logit per class
model = tf.keras.Sequential()
model.add(tf.keras.layers.experimental.preprocessing.Rescaling(1./255))
model.add(tf.keras.layers.Conv2D(32, 3, activation='relu'))
model.add(tf.keras.layers.MaxPooling2D())
model.add(tf.keras.layers.Conv2D(32, 3, activation='relu'))
model.add(tf.keras.layers.MaxPooling2D())
model.add(tf.keras.layers.Conv2D(32, 3, activation='relu'))
model.add(tf.keras.layers.MaxPooling2D())
model.add(tf.keras.layers.Flatten())
model.add(tf.keras.layers.Dense(128, activation='relu'))
model.add(tf.keras.layers.Dense(len(class_names)))


In [None]:
# compile model: 'adam' optimizer, sparse categorical cross-entropy computed
# on raw logits, accuracy as the reported metric
model.compile(
    loss=tf.losses.SparseCategoricalCrossentropy(from_logits=True),
    optimizer='adam',
    metrics=['accuracy'],
)


In [None]:
# train model on train_ds, validating on val_ds
epochs = 10
model.fit(train_ds, validation_data=val_ds, epochs=epochs)


In [None]:
# prepare data_dir
# Use the absolute directory the archive is extracted to (`unzip ... -d
# /content/data_dir`) instead of a path relative to the current working
# directory, so these cells keep working if the working directory changes.
data_dir = pathlib.Path("/content/data_dir")
train_dir = data_dir / "train"
test_dir = data_dir / "test"


In [None]:
# collect every file with an extension found one directory level below
# train_dir and test_dir
train_images_paths = [path for path in train_dir.glob("*/*.*")]
test_images_paths = [path for path in test_dir.glob("*/*.*")]


In [None]:
# set parameters for the loader: image height, image width, batch size
img_height, img_width, batch_size = 1376, 1039, 3


In [None]:
# prepare training data set
# Images under train_dir are loaded at (img_height, img_width) in batches of
# batch_size.
# NOTE(review): validation_split=0.2 with subset="training" keeps only the
# "training" share of train_dir; the complementary "validation" subset of
# train_dir is not loaded anywhere in the visible cells — confirm that
# leaving those images unused is intended.
train_ds = tf.keras.preprocessing.image_dataset_from_directory(
        train_dir,
        validation_split=0.2,
        subset="training",
        seed=123,
        image_size=(img_height, img_width),
        batch_size=batch_size)


In [None]:
# prepare test data set
# test_dir is already a separate hold-out directory, so load all of it:
# combining validation_split=0.2 with subset="validation" would keep only 20%
# of the test images for validation/evaluation.
test_ds = tf.keras.preprocessing.image_dataset_from_directory(
        test_dir,
        seed=123,
        image_size=(img_height, img_width),
        batch_size=batch_size)


In [None]:
# prepare class names: taken from the loaded training dataset and used below
# to size the model's output layer. Read it here, before train_ds is replaced
# by its cache()/shuffle()/prefetch() wrapper in the next cell.
class_names = train_ds.class_names


In [None]:
# Configure the datasets for performance: cache both, shuffle only the
# training data, and prefetch upcoming batches.
AUTOTUNE = tf.data.AUTOTUNE
train_ds = train_ds.cache()
train_ds = train_ds.shuffle(1000)
train_ds = train_ds.prefetch(buffer_size=AUTOTUNE)
test_ds = test_ds.cache()
test_ds = test_ds.prefetch(buffer_size=AUTOTUNE)


In [None]:
# layer that rescales pixel values from [0, 255] to [0, 1] by multiplying with 1/255
normalization_layer = tf.keras.layers.experimental.preprocessing.Rescaling(1./255)


In [None]:
# prepare normalized copies of both data sets and pull one batch from each
# NOTE(review): model.fit() and model.evaluate() below receive train_ds and
# test_ds, not these normalized copies. The model has its own Rescaling layer,
# so feeding the normalized sets into it would rescale the pixels twice.
train_normalized_ds = train_ds.map(lambda x, y: (normalization_layer(x), y))
train_image_batch, train_labels_batch = next(iter(train_normalized_ds))

test_normalized_ds = test_ds.map(lambda x, y: (normalization_layer(x), y))
test_image_batch, test_labels_batch = next(iter(test_normalized_ds))


In [None]:
# create model: pixel rescaling, three Conv2D + MaxPooling2D stages, then a
# dense head that outputs one logit per class
model = tf.keras.Sequential()
model.add(tf.keras.layers.experimental.preprocessing.Rescaling(1./255))
model.add(tf.keras.layers.Conv2D(32, 3, activation='relu'))
model.add(tf.keras.layers.MaxPooling2D())
model.add(tf.keras.layers.Conv2D(32, 3, activation='relu'))
model.add(tf.keras.layers.MaxPooling2D())
model.add(tf.keras.layers.Conv2D(32, 3, activation='relu'))
model.add(tf.keras.layers.MaxPooling2D())
model.add(tf.keras.layers.Flatten())
model.add(tf.keras.layers.Dense(128, activation='relu'))
model.add(tf.keras.layers.Dense(len(class_names)))


In [None]:
# compile model: 'adam' optimizer, sparse categorical cross-entropy computed
# on raw logits, accuracy as the reported metric
model.compile(
    loss=tf.losses.SparseCategoricalCrossentropy(from_logits=True),
    optimizer='adam',
    metrics=['accuracy'],
)
  

In [None]:
# model summary
# The Sequential model is created without an input shape, so its weights do
# not exist until it is built. Build it explicitly for batches of
# (img_height, img_width, 3) images so summary() can report layer output
# shapes and parameter counts instead of failing on an unbuilt model.
model.build(input_shape=(None, img_height, img_width, 3))
model.summary()


In [None]:
# train model for 3 epochs on train_ds, validating on test_ds
model.fit(train_ds, validation_data=test_ds, epochs=3)

In [None]:
# Evaluate loss and accuracy on test_ds.
# NOTE(review): test_ds is also passed as validation_data to model.fit(), so
# this figure comes from the same data that was monitored during training.
test_loss, test_acc = model.evaluate(test_ds, verbose=2)

print('\nTest accuracy:', test_acc)
