Cat Dog Classification
===

In [2]:
#importing the packages
import tensorflow as tf
import os
import re
import numpy as np
import zipfile
import matplotlib.pyplot as plt

# TensorFlow 2.x equivalents
from tensorflow import keras
from tensorflow.keras.callbacks import TensorBoard

In [11]:
from google.colab import drive
drive.mount('/content/drive')

# Now you can access files in your Google Drive


Mounted at /content/drive


#### Define some mathematical functions

In [3]:
# Here 0 means Cat and 1 means Dog
CAT = 0
DOG = 1
#Returns whether the low memory mode is used.
IS_LOW_MEMORY_MODE = True
#current working directory of a process.
cwd = os.getcwd()
#This method is called when RandomState is initialized
np.random.seed(2124)

In [16]:
import os
import zipfile

def prepare_file():
    # Define paths to your files
    cwd = '/content/drive/MyDrive/data'  # Update this path as needed
    file_list = ['train', 'test']
    flag = True

    for file_name in file_list:
        zip_filename = file_name + '.zip'
        dest_filename = os.path.join(cwd, zip_filename)

        if os.path.exists(dest_filename):
            images_path = os.path.join(cwd, file_name)

            with zipfile.ZipFile(dest_filename, 'r') as zip_ref:
                zip_ref.extractall(images_path)
                print(f'Extracted {zip_filename} to {images_path}')
        else:
            print(f'{zip_filename} does not exist in {cwd}')
            flag = False

    return flag


In [5]:
#Method to read the image label
def read_image_label_list(folder_dir):
    dir_list = os.listdir(os.path.join(cwd,folder_dir))
    filenames = []
    labels = []

    for i, d in enumerate(dir_list):
        if re.search("train",folder_dir):
            if re.search("cat", d):
                labels.append(CAT)
            else:
                labels.append(DOG)
        else:
            labels.append(-1)
        filenames.append(os.path.join(cwd, folder_dir, d))
    return filenames, labels

In [6]:
#Method to read the image from disk
def read_images_from_disk(input_queue):
    filename = input_queue[0]
    label = input_queue[1]

    file_contents = tf.read_file(filename)
    image = tf.image.decode_image(file_contents, channels=3)
    image.set_shape([None, None, 3])

    return image, label

In [7]:
#Method to generate input function
def gen_input_fn(image_list, label_list, batch_size, shuffle):

    def input_fn():
        images = tf.convert_to_tensor(image_list, dtype=tf.string)
        labels = tf.convert_to_tensor(label_list, dtype=tf.int32)

        input_queue = tf.train.slice_input_producer(
            [images, labels],
            capacity=batch_size * 5,
            shuffle=shuffle,
            name="file_input_queue"
        )

        image, label = read_images_from_disk(input_queue)
        image = tf.image.resize_images(image, (224, 224), tf.image.ResizeMethod.NEAREST_NEIGHBOR)

        image_batch, label_batch = tf.train.batch(
            [image, label],
            batch_size=batch_size,
            num_threads=1,
            name="batch_queue",
            capacity=batch_size * 10,
            allow_smaller_final_batch = False
        )

        return (
            tf.identity(image_batch, name="features"),
            tf.identity(label_batch, name="label")
        )

    return input_fn

In [8]:
#Method to train a valid input function
def train_valid_input_fn(data_dir, train_batch_size, valid_batch_size=None):
    img, labels = read_image_label_list(data_dir)
    img = np.array(img)
    labels = np.array(labels)
    data_size = img.shape[0]

    print("Data size: " + str(data_size))
    split = int(0.7 * data_size)

    random_seq = np.random.permutation(data_size)

    img = img[random_seq]
    labels = labels[random_seq]

    if valid_batch_size == None:
        valid_batch_size = train_batch_size

    return (
        gen_input_fn(img[0:split], labels[0:split], train_batch_size, shuffle = True),
        gen_input_fn(img[split:], labels[split:], valid_batch_size, shuffle = False)
           )

In [9]:
#Method to test input function
def test_input_fn(data_dir,batch_size):
    image_list, label_list = read_image_label_list(data_dir)
    return gen_input_fn(image_list, label_list, batch_size, shuffle = False), image_list

### Data visualize

In [19]:
if prepare_file():
    print("Files completed!")

Extracted train.zip to /content/drive/MyDrive/data/train
test.zip does not exist in /content/drive/MyDrive/data


In [20]:
#Method to plot data
def plot_img(data, label=None):
    plt.ion()
    plt.figure()
    plt.imshow(data)
    if label is not None:
        plt.title(label)

In [24]:
def plot_img(image, label):
    plt.imshow(image)
    plt.title(label)
    plt.axis('off')
    plt.show()

def preview_img():
    # Check current working directory
    cwd = os.getcwd()
    print(f"Current working directory: {cwd}")

    # Verify the directory and file pattern
    directory = 'data/train'
    if os.path.exists(directory):
        files = os.listdir(directory)
        print(f"Files in '{directory}': {files}")
    else:
        print(f"Directory '{directory}' does not exist.")
        return

    # Create a dataset with the correct file pattern
    file_pattern = os.path.join(directory, '*.jpg')  # Ensure correct pattern
    dataset = tf.data.Dataset.list_files(file_pattern)

    def load_and_preprocess_image(filename):
        image = tf.io.read_file(filename)
        image = tf.image.decode_image(image, channels=3)  # Ensure 3 channels (RGB)
        image = tf.image.resize(image, [224, 224])  # Resize image if needed
        image = image / 255.0  # Normalize image
        label = tf.strings.split(filename, '/')[-1]  # Extract label from filename
        return image, label

    dataset = dataset.map(load_and_preprocess_image)
    dataset = dataset.batch(1)

    for images, labels in dataset.take(5):  # Take 5 samples from the dataset
        images = images.numpy()
        labels = labels.numpy()
        for i in range(len(images)):
            plot_img(images[i], str(labels[i]))

# Call the function to preview images
preview_img()

Current working directory: /content
Directory 'data/train' does not exist.


### Define Model

In [28]:
#Cat-Dog Method Declaration

def catdog_model(inputs, is_training):
    with tf.variable_scope('catdog', values=[inputs]):
        with slim.arg_scope(
            [slim.conv2d, slim.fully_connected],
            activation_fn=tf.nn.relu6,
            weights_initializer=tf.truncated_normal_initializer(0.0, 0.01)):

            net = inputs

            if IS_LOW_MEMORY_MODE == False:
                net = slim.repeat(net, 2, slim.conv2d, 64, [3, 3], scope='conv1')
                net = slim.max_pool2d(net, [2, 2], scope='pool1')

                net = slim.repeat(net, 2, slim.conv2d, 128, [3, 3], scope='conv2')
                net = slim.max_pool2d(net, [2, 2], scope='pool2')

                net = slim.repeat(net, 4, slim.conv2d, 256, [3, 3], scope='conv3')
                net = slim.max_pool2d(net, [2, 2], scope='pool3')
                net = slim.repeat(net, 4, slim.conv2d, 512, [3, 3], scope='conv4')
                net = slim.max_pool2d(net, [2, 2], scope='pool4')
                net = slim.repeat(net, 4, slim.conv2d, 512, [3, 3], scope='conv5')
                net = slim.max_pool2d(net, [2, 2], scope='pool5')

                net = tf.reshape(net, [-1, 7 * 7 * 512])

                net = slim.fully_connected(net, 2048, scope='fc6')
                net = slim.dropout(net, 0.5, is_training=is_training, scope='dropout6')

                net = slim.fully_connected(net, 2048, scope='fc7')
                net = slim.dropout(net, 0.5, is_training=is_training, scope='dropout7')

                net = slim.fully_connected(net, 2, activation_fn=None, scope='fc8')

            else:
                # Model for my Mac T_T
                net = tf.image.resize_images(net, (72, 72), tf.image.ResizeMethod.NEAREST_NEIGHBOR)

                net = slim.repeat(net, 1, slim.conv2d, 64, [3, 3], scope='conv1')
                net = slim.max_pool2d(net, [2, 2], scope='pool1')

                net = slim.repeat(net, 1, slim.conv2d, 128, [3, 3], scope='conv2')
                net = slim.max_pool2d(net, [2, 2], scope='pool2')

                net = slim.repeat(net, 2, slim.conv2d, 256, [3, 3], scope='conv3')
                net = slim.max_pool2d(net, [2, 2], scope='pool3')

                net = tf.reshape(net, [-1, 9 * 9 * 256])

                net = slim.fully_connected(net, 1024, scope='fc4')
                net = slim.dropout(net, 0.5, is_training=is_training, scope='dropout4')

                net = slim.fully_connected(net, 1024, scope='fc5')
                net = slim.dropout(net, 0.5, is_training=is_training, scope='dropout5')

                net = slim.fully_connected(net, 2, activation_fn=None, scope='fc6')

            return net

In [29]:
#Cat-Dog Model function
def catdog_model_fn(features, labels, mode, params):

    is_training = False
    if mode == learn.ModeKeys.TRAIN:
        is_training = True

    output = catdog_model(features, is_training)

    log_loss = None
    train_op = None
    eval_metric_ops = None

    softmax_predictions = tf.nn.softmax(output)

    if mode != learn.ModeKeys.INFER:
        onehot_labels = tf.one_hot(
            tf.cast(labels, tf.int32),
            depth = 2
        )
        log_loss = tf.identity(
            tf.losses.log_loss(
                onehot_labels,
                tf.nn.softmax(output),
                reduction = tf.losses.Reduction.MEAN
            ),
            name = "log_loss_tensor"
        )
        eval_metric_ops = {
            "log_loss": log_loss
        }

    if mode == learn.ModeKeys.TRAIN:
        train_op = tf.contrib.layers.optimize_loss(
            loss = log_loss,
            global_step = tf.contrib.framework.get_global_step(),
            learning_rate = params['learning_rate'],
            optimizer = "Adam"
        )

    predictions = {
        'predict': softmax_predictions
    }

    return model_fn.ModelFnOps(
        mode = mode,
        predictions = predictions,
        loss = log_loss,
        train_op = train_op,
        eval_metric_ops = eval_metric_ops
    )

In [35]:
import tensorflow as tf

# Define the model function for tf.estimator
def catdog_model_fn(features, labels, mode, params):
    # Extract the images from the features dictionary
    images = features['image']

    # Build the CNN model
    net = tf.keras.layers.Conv2D(32, (3, 3), activation='relu', padding='same')(images)
    net = tf.keras.layers.MaxPooling2D((2, 2))(net)
    net = tf.keras.layers.Conv2D(64, (3, 3), activation='relu', padding='same')(net)
    net = tf.keras.layers.MaxPooling2D((2, 2))(net)
    net = tf.keras.layers.Flatten()(net)
    net = tf.keras.layers.Dense(128, activation='relu')(net)
    logits = tf.keras.layers.Dense(10)(net)  # Adjust output units as needed

    predictions = {
        'classes': tf.argmax(input=logits, axis=1),
        'probabilities': tf.nn.softmax(logits)
    }

    if mode == tf.estimator.ModeKeys.PREDICT:
        return tf.estimator.EstimatorSpec(mode=mode, predictions=predictions)

    # Loss function
    loss = tf.losses.sparse_softmax_cross_entropy(labels=labels, logits=logits)

    if mode == tf.estimator.ModeKeys.TRAIN:
        optimizer = tf.train.AdamOptimizer(learning_rate=params['learning_rate'])
        train_op = optimizer.minimize(loss, global_step=tf.train.get_global_step())
        return tf.estimator.EstimatorSpec(mode=mode, loss=loss, train_op=train_op)

    eval_metric_ops = {
        'accuracy': tf.metrics.accuracy(labels=labels, predictions=predictions['classes'])
    }
    return tf.estimator.EstimatorSpec(mode=mode, loss=loss, eval_metric_ops=eval_metric_ops)


#### Train the Model

In [37]:
import tensorflow as tf
import logging

# Configure TensorFlow logging
logger = tf.get_logger()
logger.setLevel(logging.INFO)


In [44]:
def catdog_model_fn(features, labels, mode, params):
    images = features['image']

    # Build the CNN model
    net = tf.keras.layers.Input(shape=[224, 224, 3])(images)  # Adjust input shape
    net = tf.keras.layers.Conv2D(32, (3, 3), activation='relu', padding='same')(net)
    net = tf.keras.layers.MaxPooling2D((2, 2))(net)
    net = tf.keras.layers.Conv2D(64, (3, 3), activation='relu', padding='same')(net)
    net = tf.keras.layers.MaxPooling2D((2, 2))(net)
    net = tf.keras.layers.Flatten()(net)
    net = tf.keras.layers.Dense(128, activation='relu')(net)
    logits = tf.keras.layers.Dense(10)(net)  # Adjust output units as needed

    predictions = {
        'classes': tf.argmax(input=logits, axis=1),
        'probabilities': tf.nn.softmax(logits)
    }

    if mode == tf.estimator.ModeKeys.PREDICT:
        return tf.estimator.EstimatorSpec(mode=mode, predictions=predictions)

    # Loss function
    loss = tf.losses.sparse_softmax_cross_entropy(labels=labels, logits=logits)

    if mode == tf.estimator.ModeKeys.TRAIN:
        optimizer = tf.train.AdamOptimizer(learning_rate=params['learning_rate'])
        train_op = optimizer.minimize(loss, global_step=tf.train.get_global_step())
        return tf.estimator.EstimatorSpec(mode=mode, loss=loss, train_op=train_op)

    eval_metric_ops = {
        'accuracy': tf.metrics.accuracy(labels=labels, predictions=predictions['classes'])
    }
    return tf.estimator.EstimatorSpec(mode=mode, loss=loss, eval_metric_ops=eval_metric_ops)


#### Final Prediction

In [49]:
import os

# Define the path to your test directory
test_data_dir = '/content/data'  # Adjust this path if needed

# Check if the test directory exists
if not os.path.exists(test_data_dir):
    print(f"Test directory '{test_data_dir}' does not exist.")
else:
    # Proceed with your test input function
    test_fn, image_test_list = test_input_fn(test_data_dir, 32)
    test_n = len(image_test_list)

    print("Test size: %d" % test_n)

    # Create the result directory if it doesn't exist
    result_path = os.path.join(cwd, 'result/result.txt')
    os.makedirs(os.path.dirname(result_path), exist_ok=True)

    with open(result_path, 'w+') as result_file:
        result_file.write('id,label\n')

        predictions = classifier.predict(input_fn=test_fn, as_iterable=True)
        for i, p in enumerate(predictions):
            if i >= test_n:
                break

            id = image_test_list[i].split("/")[-1]
            id = id.split(".")[0]

            if i % 100 == 0:
                print("Predict %d %s: %f" % (i, image_test_list[i], p["probabilities"][1]))

            result_file.write("%s,%f\n" % (id, p["probabilities"][1]))

    print('Finish!!')


Test directory '/content/data' does not exist.
