In [None]:
import gc
import os
import warnings
import imghdr
import cv2
import glob

import numpy as np
from matplotlib import pyplot as plt
from pathlib import Path

from PIL import Image

In [None]:
import tensorflow as tf
from tensorflow import keras

from keras.layers import *
from keras.models import *
from keras.losses import *

from keras.utils import image_dataset_from_directory, load_img, img_to_array

# Show the GPU devices TensorFlow can see; an empty list means none was detected.
print(tf.config.list_physical_devices('GPU'))

In [None]:
# Class labels; they are also used as the sub-directory names under downloads/.
CAT = 'Cats'
DOG = 'Dogs'

# Path template with two placeholders.
# NOTE(review): presumably filled as uri.format(<class folder>, <file stem>) - confirm.
uri = 'downloads/{}/{}.jpg'

# baumiao

In [None]:
# Model input geometry, laid out as (height, width, channels).
input_shape = (256, 256, 3)
image_size = input_shape[:-1]   # spatial part only: (height, width)
N_channels = input_shape[-1]    # trailing entry is the channel count

# Training hyper-parameters.
batch_size = 32
epochs = 20

In [None]:
# Empty array placeholder.
# NOTE(review): `ds` is not referenced again in the visible cells - confirm it is still needed.
ds = np.array([])

# Collects the image formats reported by imghdr during the "method 1" scan.
exp_ext = set()

## import dataset

In [None]:
# method 1
# Walk both class folders and sort every file into one of three outcomes:
#   * imghdr cannot identify it as an image -> deleted immediately;
#   * PIL fails to decode it (or emits a warning while decoding)  -> bad_paths;
#   * it decodes to fewer than 3 channels (e.g. grayscale)       -> bad_paths.
# Every recognised format name is recorded in `exp_ext`.
# NOTE(review): imghdr is deprecated in recent Python releases - confirm the
# target interpreter still ships it.
bad_paths = []

# catch_warnings() restores the previous warning filters on exit, even when the
# scan raises, so the "warnings as errors" mode cannot leak into later cells.
with warnings.catch_warnings():
    warnings.simplefilter("error")

    for label in [CAT, DOG]:
        # Materialise the listing first: files are deleted inside the loop and
        # the directory walk should not observe its own deletions.
        for pth in sorted(Path(f'downloads/{label}').rglob("*")):
            # rglob("*") also yields sub-directories; only regular files are images.
            if not pth.is_file():
                continue

            ext = imghdr.what(pth)
            if ext is None:
                print(pth,'removed')
                os.remove(pth)
                continue

            exp_ext.add(ext)
            try:
                with Image.open(pth) as img:
                    pxl = np.array(img)
                    # Reject anything that is not at least a 3-channel image.
                    if pxl.ndim < 3 or pxl.shape[-1] < 3:
                        bad_paths.append(pth)
            except Exception as err:
                # Named `err` so it cannot clobber the loop variables.
                bad_paths.append(pth)
                print(pth, err)

In [None]:
# method 2
# Try to load every file one level below downloads/ with Keras' own loader and
# record the ones that fail; `shapeset` collects the array shapes that loaded.
# (Alternative not used here: read the raw bytes and decode them with TensorFlow.)
shapeset, bad_paths = set(), []

# Assumes downloads/ directly contains the label folders.
img_paths = glob.glob(os.path.join('downloads','*/*.*'))
for image_path in img_paths:
    try:
        pixels = img_to_array(load_img(image_path, target_size=image_size))
    except Exception as inst:
        print('trouble at', image_path, ':', inst)
        bad_paths.append(image_path)
    else:
        shapeset.add(pixels.shape)

In [None]:
# method 3
# Flag every file whose leading bytes do not contain the "JFIF" marker.
# Nothing is deleted here; offenders are only collected in `bad_paths`.
bad_paths = []

for folder_name in (CAT, DOG):
    folder_path = os.path.join("downloads", folder_name)
    for fname in os.listdir(folder_path):
        fpath = os.path.join(folder_path, fname)

        # listdir() also returns sub-directories, which cannot be opened as files.
        if not os.path.isfile(fpath):
            continue

        # `with` closes the handle on every path and, unlike try/finally around
        # open(), never touches an unbound or stale handle when open() itself fails.
        with open(fpath, "rb") as fobj:
            is_jfif = b"JFIF" in fobj.peek(10)

        if not is_jfif:
            print(f"Found bad path {fpath}")
            bad_paths.append(fpath)

In [None]:
# Number of files flagged by whichever scan cell ran last (each one resets bad_paths).
len(bad_paths)

In [None]:
# Delete every flagged file; a failed deletion is reported and the loop carries on.
for bad_path in bad_paths:
    try:
        os.remove(bad_path)
    except Exception as err:
        print('FATAL ERROR @', bad_path, ':', err)
    else:
        print(bad_path,'removed')

In [None]:
# def make_dataset(x, y):
#     imgs = []
#     labels = []
#     img_size = (128, 128)

#     for i, j in zip(x, y):
#         img = load_img(i, target_size=img_size)
#         img = img_to_array(img)
#         imgs.append(img)
#         labels.append(j)
#     imgs, labels = np.array(imgs), np.array(labels)
#     return imgs, labels


# x_train, y_train = make_dataset(
#     [pth for e in [CAT,DOG] for pth in Path(f'downloads/{e}').rglob("*") ],
#     [e for e in [CAT,DOG] for pth in Path(f'downloads/{e}').rglob("*")]
#     )

In [None]:
# Training subset (80 %) of the images under downloads/.
# validation_split and seed must be identical to the ones used for the
# validation subset, otherwise the two subsets can overlap.
train_ds = image_dataset_from_directory(
    directory="downloads",
    label_mode="binary",
    image_size=image_size,
    batch_size=batch_size,
    validation_split=0.2,
    subset="training",
    seed=42,
)

In [None]:
# Validation subset (20 %) of the images under downloads/.
# validation_split and seed must be identical to the ones used for the
# training subset, otherwise the two subsets can overlap.
val_ds = image_dataset_from_directory(
    directory="downloads",
    label_mode="binary",
    image_size=image_size,
    batch_size=batch_size,
    validation_split=0.2,
    subset="validation",
    seed=42,
)

In [None]:
# image_dataset_from_directory numbers the class folders in alphanumerical
# order when no explicit class_names is passed, so index 0 is 'Cats' and
# index 1 is 'Dogs'. Sorting keeps class_names[i] aligned with label i.
class_names = sorted([CAT, DOG])
N_classes = len(class_names)

In [None]:
# Sanity check: print the tensor shapes of a single training batch.
for image_batch, labels_batch in train_ds.take(1):
  print(image_batch.shape)
  print(labels_batch.shape)

In [None]:
# # AUTOTUNE = tf.data.AUTOTUNE
# train_ds = train_ds.cache().prefetch(buffer_size=AUTOTUNE)
# val_ds = val_ds.cache().prefetch(buffer_size=AUTOTUNE)

In [None]:
# Side length of every convolution kernel. Kept as its own constant: it was
# previously passed as N_channels, which only matched by coincidence (both 3)
# and would silently change the kernels if the input channel count changed.
CONV_KERNEL_SIZE = 3

# Small CNN: four conv/pool stages with doubling filter counts, followed by a
# dense head that ends in N_classes raw scores (no activation on the last layer).
model = Sequential(
    [
        # Scale pixel values by 1/255.
        Rescaling(1.0 / 255, input_shape=input_shape),
        # Element-wise square of the rescaled pixels.
        # NOTE(review): a Lambda wrapping a Python lambda may not survive model
        # saving/loading - confirm if this model is going to be exported.
        Lambda(lambda x: (x ** 2)),

        Conv2D(16, CONV_KERNEL_SIZE, padding="same", activation="relu"),
        MaxPooling2D(),

        Conv2D(32, CONV_KERNEL_SIZE, padding="same", activation="relu"),
        MaxPooling2D(),

        Conv2D(64, CONV_KERNEL_SIZE, padding="same", activation="relu"),
        MaxPooling2D(),

        Conv2D(128, CONV_KERNEL_SIZE, padding="same", activation="relu"),
        MaxPooling2D(),

        Flatten(),
        Dense(256, activation="relu"),

        Dense(32, activation="relu"),

        Dense(8, activation="relu"),

        # One unnormalised score (logit) per class.
        Dense(N_classes),
    ]
)

In [None]:
# The loss receives raw scores, hence from_logits=True (the final Dense layer
# of the model has no activation).
loss_fn = SparseCategoricalCrossentropy(from_logits=True)

model.compile(optimizer="adam", loss=loss_fn, metrics=["accuracy"])

In [None]:
# Print the layer-by-layer overview of the model built above.
model.summary()

In [None]:
# Train for `epochs` passes over train_ds, evaluating on val_ds after each one.
# The returned History object is what the plotting cell reads its metrics from.
history = model.fit(
    x=train_ds,
    epochs=epochs,
    validation_data=val_ds,
)

In [None]:
# Plot training vs. validation accuracy and loss for the first epochs of the run.
max_epochs_shown = 7
metrics_log = history.history

# Clamp to the number of epochs actually recorded: with fewer than
# max_epochs_shown entries the x-range would otherwise be longer than the
# series and matplotlib would raise a shape mismatch.
ths = min(max_epochs_shown, len(metrics_log['accuracy']))

acc = metrics_log['accuracy'][:ths]
val_acc = metrics_log['val_accuracy'][:ths]

loss = metrics_log['loss'][:ths]
val_loss = metrics_log['val_loss'][:ths]

epochs_range = range(ths)

fig, (ax_acc, ax_loss) = plt.subplots(1, 2, figsize=(7, 3))

ax_acc.plot(epochs_range, acc, label='Training Accuracy')
ax_acc.plot(epochs_range, val_acc, label='Validation Accuracy')
ax_acc.legend(loc='lower right')
ax_acc.set_title('Training and Validation Accuracy')
ax_acc.set_xlabel('Epoch')
ax_acc.set_ylabel('Accuracy')

ax_loss.plot(epochs_range, loss, label='Training Loss')
ax_loss.plot(epochs_range, val_loss, label='Validation Loss')
ax_loss.legend(loc='upper right')
ax_loss.set_title('Training and Validation Loss')
ax_loss.set_xlabel('Epoch')
ax_loss.set_ylabel('Loss')

# Keep the two panels' titles and labels from overlapping in the narrow figure.
fig.tight_layout()
plt.show()