In [1]:
pip install tensorflow

^C
Note: you may need to restart the kernel to use updated packages.


In [2]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt

import tensorflow as tf
from tensorflow import keras

from tensorflow.keras.preprocessing.image import load_img, ImageDataGenerator
from tensorflow.keras.layers import Conv2D, MaxPooling2D, GlobalAveragePooling1D, AveragePooling2D, GlobalAveragePooling2D, ZeroPadding2D, Dense, Dropout, Flatten, BatchNormalization, Input, Activation, Add
from tensorflow.keras.models import Sequential, Model
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.callbacks import EarlyStopping
from tensorflow.keras.losses import CategoricalCrossentropy
from tensorflow.keras.metrics import CategoricalAccuracy, BinaryAccuracy

from sklearn.model_selection import train_test_split

import os
from PIL import Image

import sys
import warnings

# Suppress every Python warning for the remainder of the session
warnings.filterwarnings("ignore")

# Record the interpreter and TensorFlow versions this notebook was run with
print("Python Version:", sys.version)
print("TensorFlow Version", tf.__version__)

KeyboardInterrupt: 

In [None]:
# Use a GPU for faster processing when one is present.
# The physical device list is queried once and reused for both messages.

phys_devices = tf.config.list_physical_devices('GPU')
if phys_devices:
    try:
        # use the first visible GPU
        tf.config.experimental.set_visible_devices(phys_devices[0], 'GPU')
        print(f"Using GPU device {phys_devices[0].name}\n")
    except RuntimeError as err:
        # The visible-device list cannot be changed once TensorFlow has
        # initialised its runtime (e.g. this cell is run after tensors were
        # already created); report it instead of aborting the notebook.
        print(f"Could not change visible GPU devices: {err}\n")
else:
    print("No GPU devices found!\n")

if phys_devices:
    print("GPU is available")
else:
    print("GPU is not available")

In [None]:
# Read the two furniture CSV files into their own dataframes
furniture_df, furniture_sliding_df = (
    pd.read_csv(csv_name)
    for csv_name in ("furniture_data.csv", "furniture_sliding_data.csv")
)

# Stack both tables row-wise into one dataframe (original row labels are kept)
full_furniture_df = pd.concat([furniture_df, furniture_sliding_df])

In [None]:
# Hold out 15% of the rows as the test set; the remaining 85% ("dev") feeds the
# training/validation generators. Stratifying on the label column keeps the
# category proportions the same in both splits.
# NOTE(review): only furniture_df is split here; full_furniture_df (which also
# holds the sliding data) is not used in the cells visible here - confirm that
# this is intentional.
dev, test = train_test_split(
    furniture_df,
    train_size = 0.85,
    shuffle = True,
    random_state = 333,
    stratify = furniture_df['Furniture_Category'],
)

In [None]:
# Display the (rows, columns) shape of the dev and test splits
dev.shape, test.shape

In [None]:
# Settings shared by all generators: images per batch and target image size
BATCH_SIZE = 32
IMG_HEIGHT = 224
IMG_WIDTH  = 224

# Training generator: multiplies pixel values by 1/255 and applies random
# shifts, rotation, zoom, shear, brightness changes and horizontal flips.
# validation_split sets aside 15% of the dataframe for the 'validation' subset.
train_datagen = ImageDataGenerator(
    rescale = 1/255.0,
    rotation_range = 40,
    width_shift_range = 0.05,
    height_shift_range = 0.05,
    shear_range = 0.2,
    zoom_range = 0.1,
    brightness_range = [0.5, 1.0],
    horizontal_flip = True,
    fill_mode = 'nearest',
    validation_split = 0.15,
)

# Test generator: pixel rescaling only, no augmentation
test_datagen = ImageDataGenerator(rescale = 1/255.0)

In [None]:
# create data generators for training, validation, and testing
# Training batches: augmented images from the 'training' subset of dev, i.e.
# the rows that remain after train_datagen's validation_split is set aside.
train_ds = train_datagen.flow_from_dataframe(
    dataframe = dev,
    x_col = 'Image_Path',
    y_col = 'Furniture_Category',
    color_mode = 'rgb',
    class_mode = 'categorical',
    # Keras expects target_size as (height, width), not (width, height)
    target_size = (IMG_HEIGHT, IMG_WIDTH),
    batch_size = BATCH_SIZE,
    shuffle = True,
    seed = 42,
    subset = 'training'
)

# Validation batches must not be augmented, so they come from a dedicated
# generator that only rescales pixels. It uses the same validation_split (0.15)
# as train_datagen, so its 'validation' subset is exactly the slice of dev that
# the 'training' subset leaves out.
val_datagen = ImageDataGenerator(rescale = 1/255.0, validation_split = 0.15)

val_ds = val_datagen.flow_from_dataframe(
    dataframe = dev,
    x_col = 'Image_Path',
    y_col = 'Furniture_Category',
    color_mode = 'rgb',
    class_mode = 'categorical',
    # Keras expects target_size as (height, width), not (width, height)
    target_size = (IMG_HEIGHT, IMG_WIDTH),
    batch_size = BATCH_SIZE,
    shuffle = True,
    seed = 42,
    subset = 'validation'
)

# Test batches: rescaled-only images from the held-out test split, served in
# dataframe order so predictions line up with the rows of `test`.
test_ds = test_datagen.flow_from_dataframe(
    dataframe = test,
    x_col = 'Image_Path',
    y_col = 'Furniture_Category',
    color_mode = 'rgb',
    # Keras expects target_size as (height, width), not (width, height)
    target_size = (IMG_HEIGHT, IMG_WIDTH),
    class_mode = 'categorical',
    batch_size = BATCH_SIZE,
    shuffle = False  # maintain order for testing
)

In [None]:
# Show the class-name to index mapping reported by the training generator
print(train_ds.class_indices)

In [None]:
# Sanity-check one training batch: print its shapes and display a few images
batchX, batchY = next(train_ds)

print(batchX.shape)
print(batchY.shape)

# Slicing (instead of indexing 0..2) also works when a batch has fewer than 3 images
for image, label in zip(batchX[:3], batchY[:3]):
    print('Label: ', label)

    plt.imshow(image)
    plt.show()

In [None]:
def identity_block(x, filters):
    """Bottleneck residual block whose shortcut is the unmodified input.

    The main path is 1x1 conv -> 3x3 conv -> 1x1 conv, all with stride 1. The
    first two convolutions are each followed by ReLU and then batch
    normalisation; the third is followed by batch normalisation only. The
    result is added to the input and passed through a final ReLU.

    NOTE(review): ReLU is applied before batch normalisation in the first two
    layers, whereas the ResNet paper uses conv -> BN -> ReLU - confirm that this
    ordering is intentional.

    Args:
        x: input tensor of the block.
        filters: indexable with three filter counts, one per convolution.
            filters[2] presumably has to match the channel count of `x` so the
            element-wise Add is valid.

    Returns:
        The output tensor of the block.
    """
    shortcut = x
    out = x

    # layers 1 and 2: (1x1, 'valid') then (3x3, 'same'), each conv -> ReLU -> BN
    for idx, (kernel, pad) in enumerate((((1, 1), 'valid'), ((3, 3), 'same'))):
        out = Conv2D(filters[idx], kernel_size = kernel, strides = (1, 1), padding = pad)(out)
        out = Activation('relu')(out)
        out = BatchNormalization()(out)

    # layer 3: 1x1 conv -> BN; the ReLU is applied after the residual sum
    out = Conv2D(filters[2], kernel_size = (1, 1), strides = (1, 1), padding = 'valid')(out)
    out = BatchNormalization()(out)

    merged = Add()([out, shortcut])
    return Activation('relu')(merged)

def convolutional_block(x, filters, strides = (2, 2)):
    """Bottleneck residual block with a projected (1x1 conv + BN) shortcut.

    The main path is 1x1 conv (with `strides`) -> 3x3 conv -> 1x1 conv. The
    first two convolutions are each followed by ReLU and then batch
    normalisation; the third is followed by batch normalisation only. The
    shortcut applies a 1x1 convolution with the same `strides` and filters[2]
    output channels, then batch normalisation. Both paths are added and passed
    through a final ReLU.

    Args:
        x: input tensor of the block.
        filters: indexable with three filter counts, one per main-path
            convolution; filters[2] is also used for the shortcut convolution.
        strides: strides of the first main-path convolution and of the
            shortcut convolution.

    Returns:
        The output tensor of the block.
    """
    shortcut = x
    out = x

    # layers 1 and 2: strided 1x1 conv, then 3x3 'same' conv; each conv -> ReLU -> BN
    main_path_specs = (((1, 1), strides, 'valid'), ((3, 3), (1, 1), 'same'))
    for idx, (kernel, step, pad) in enumerate(main_path_specs):
        out = Conv2D(filters[idx], kernel_size = kernel, strides = step, padding = pad)(out)
        out = Activation('relu')(out)
        out = BatchNormalization()(out)

    # layer 3: 1x1 conv -> BN; the ReLU is applied after the residual sum
    out = Conv2D(filters[2], kernel_size = (1, 1), strides = (1, 1), padding = 'valid')(out)
    out = BatchNormalization()(out)

    # projection shortcut, built after the main path as in the original layer order
    shortcut = Conv2D(filters[2], kernel_size = (1, 1), strides = strides, padding = 'valid')(shortcut)
    shortcut = BatchNormalization()(shortcut)

    merged = Add()([out, shortcut])
    return Activation('relu')(merged)

In [None]:
def ResNet50(shape=(224, 224, 3), classes = 1000):
    """Assemble a ResNet50-style classifier from convolutional_block and identity_block.

    Layout: zero padding of 3 -> 7x7 stride-2 convolution (64 filters) -> ReLU
    -> batch normalisation -> 3x3 stride-2 max pooling, then four stages that
    each consist of one convolutional block followed by 2, 3, 5 and 2 identity
    blocks respectively, then global average pooling and a softmax Dense layer.

    Args:
        shape: shape passed to the Input layer.
        classes: number of units in the final softmax layer.

    Returns:
        A Model named "ResNet50" mapping the input layer to the softmax output.
    """
    x_input = Input(shape)

    # stage 1: stem
    x = ZeroPadding2D(padding = (3, 3))(x_input)
    x = Conv2D(64, kernel_size = (7, 7), strides = (2, 2), padding = 'valid')(x)
    x = Activation('relu')(x)
    x = BatchNormalization()(x)
    x = MaxPooling2D(pool_size = (3, 3), strides = (2, 2), padding = 'same')(x)

    # stages 2-5: (filters, strides of the convolutional block, number of identity blocks)
    stage_specs = (
        ([64, 64, 256],    (1, 1), 2),   # stage 2: 3 blocks in total
        ([128, 128, 512],  (2, 2), 3),   # stage 3: 4 blocks in total
        ([256, 256, 1024], (2, 2), 5),   # stage 4: 6 blocks in total
        ([512, 512, 2048], (2, 2), 2),   # stage 5: 3 blocks in total
    )
    for stage_filters, stage_strides, n_identity in stage_specs:
        x = convolutional_block(x, filters = stage_filters, strides = stage_strides)
        for _ in range(n_identity):
            x = identity_block(x, filters = stage_filters)

    # classification head
    x = GlobalAveragePooling2D()(x)
    x = Dense(classes, activation='softmax')(x)

    return Model(inputs = x_input, outputs = x, name = "ResNet50")

In [None]:
# Build the network for this dataset. The input size follows the generator
# image size and the output width follows the number of categories found by
# the training generator, so neither value has to be edited by hand.
resnet50_model = ResNet50(
    shape = (IMG_HEIGHT, IMG_WIDTH, 3),
    classes = len(train_ds.class_indices),
)
resnet50_model.summary()

In [None]:
# Optimizer: Adam with a learning rate of 0.001
opt = Adam(learning_rate = 0.001)

# Compile with categorical cross-entropy as the loss and categorical accuracy
# as the tracked metric
resnet50_model.compile(optimizer = opt, loss = CategoricalCrossentropy(), metrics = [CategoricalAccuracy()])

In [None]:
# Early stopping on validation accuracy.
# The model is compiled with CategoricalAccuracy(), whose default metric name
# is 'categorical_accuracy', so the validation value is logged as
# 'val_categorical_accuracy'; a 'val_accuracy' key is never produced and
# monitoring it would leave early stopping (and best-weight restore) inactive.
early_stopping_callback = EarlyStopping(
    monitor = 'val_categorical_accuracy',  # monitor validation accuracy
    mode = 'max',                # higher accuracy is better
    patience = 10,               # stop if no improvement after 10 epochs
    restore_best_weights = True, # restore weights with best performance
    verbose = 1,
    min_delta = 0.0001,          # smallest change that counts as an improvement
)

In [None]:
# Train for up to 50 epochs (the early stopping callback may end training sooner).
# batch_size is deliberately not passed: train_ds and val_ds are generators that
# already yield batches of BATCH_SIZE images, and Keras documents that
# batch_size must not be specified for generator inputs. Change BATCH_SIZE in
# the generator configuration to adjust the batch size.
history = resnet50_model.fit(
    train_ds,
    validation_data = val_ds,
    epochs = 50,
    callbacks = [early_stopping_callback],
)