In [None]:
# Enable IPython's autoreload extension in mode 2 so that edits to imported
# modules are picked up before each cell runs, without restarting the kernel.
%load_ext autoreload
%autoreload 2

In [None]:
import os, numpy as np, pandas as pd, matplotlib.pyplot as plt
from tensorflow.keras.preprocessing.image import load_img
from tensorflow.keras.preprocessing.image import img_to_array
from tensorflow.keras.applications.vgg16 import preprocess_input
from tensorflow.keras.applications.vgg16 import decode_predictions
from tensorflow.keras.applications.vgg16 import VGG16
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.losses import CategoricalCrossentropy
from keras.models import Sequential
from keras.layers import Dense
from keras.utils import np_utils
from keras.models import Model
from keras.layers import Flatten
from copy import copy, deepcopy

In [None]:
# HELPERS
def process(data):
    """Load one dataset split and return it as model-ready arrays.

    Loading, preprocessing and the CSV dump are delegated to ``dfprocess`` so
    the two helpers share a single implementation; this function only unpacks
    the resulting table into arrays.

    Args:
        data: Name of a sub-folder of ``graffiti_data`` (the error message
            suggests 'train', 'test' or 'valid').

    Returns:
        Tuple ``(X, y)``. ``X`` is the array built from the per-image arrays in
        the ``pixvals`` column; ``y`` is the last four columns of the labels
        table cast to ``float32``.

    Raises:
        FileNotFoundError: If the split folder does not exist (raised by
            ``dfprocess``).
    """
    classes = dfprocess(data)
    # Pick the image column by name instead of by position so this keeps
    # working if the table layout ever gains or loses a leading column.
    X = np.array(list(classes["pixvals"]))
    # The class indicators are taken to be the last four columns of the table.
    y = classes.iloc[:, -4:].to_numpy(dtype=np.float32)
    return X, y
def dfprocess(data):
    """Load one dataset split into a labels table that also carries the images.

    Reads ``graffiti_data/<data>/_classes.csv``, sorts it by ``filename`` and
    loads every image named in that column. Because the images are loaded in
    the table's own row order, each image is paired with its own label row by
    construction rather than by assuming the directory listing and the CSV
    happen to sort identically.

    Side effect: the resulting table is written to
    ``graffiti_data/<data>_data_full.csv``, replacing any previous file.

    Args:
        data: Name of a sub-folder of ``graffiti_data`` (the error message
            suggests 'train', 'test' or 'valid').

    Returns:
        The sorted labels ``DataFrame`` with the preprocessed image arrays in a
        ``pixvals`` column and the pre-sort row index kept as a leading
        ``index`` column (so ``pixvals`` sits at column position 2).

    Raises:
        FileNotFoundError: If the split folder does not exist, or if an image
            listed in the CSV cannot be found when it is loaded.
    """
    folder = os.path.join("graffiti_data", data)
    if not os.path.exists(folder):
        raise FileNotFoundError("No such data folder. Try 'train', 'test', or 'valid'.")

    classes = pd.read_csv(os.path.join(folder, "_classes.csv")).sort_values("filename")

    # Load the images in the same order as the label rows.
    # NOTE(review): assumes the "filename" column holds names relative to the
    # split folder -- confirm against the dataset export.
    X = np.array([
        preprocess_input(img_to_array(load_img(os.path.join(folder, name))))
        for name in classes["filename"]
    ])

    classes.insert(1, "pixvals", list(X))
    # Keeps the old index as a column; callers rely on the resulting layout.
    classes.reset_index(inplace=True)

    # to_csv opens the target for writing and overwrites it, so no explicit
    # removal of an earlier dump is needed.
    classes.to_csv(os.path.join("graffiti_data", f"{data}_data_full.csv"))
    return classes
def combine(sets):
    """Merge several dataset splits into a single ``(X, y)`` pair.

    Each name in ``sets`` is loaded with ``dfprocess`` and the resulting tables
    are concatenated in the given order. ``X`` is built from the column at
    position 2 (where ``dfprocess`` places the image arrays) and ``y`` from the
    last four columns, cast to ``float32``.
    """
    merged = pd.concat(dfprocess(split) for split in sets)
    table = merged.values
    images = np.array([pixels for pixels in table[:, 2]])
    labels = table[:, -4:].astype(np.float32)
    return images, labels
def quantize(array):
    """Round every element of a 1D sequence with the built-in ``round``.

    Returns a new NumPy array holding the rounded values; the input itself is
    not modified.
    """
    rounded = []
    for value in array:
        rounded.append(round(value))
    return np.array(rounded)
def compare(a1, a2, number_of_classes=4):
    """Tell whether two classification vectors agree in every class slot.

    Counts the positions where ``a1`` and ``a2`` are element-wise equal and
    returns True only when that count equals ``number_of_classes``
    (4 by default).
    """
    matching = sum(a1 == a2)
    return bool(matching == number_of_classes)
def test_accuracy(prediction_array,true_array):
    # Returns accuracy of testing data
    for i in range(len(prediction_array)):
        prediction_array[i] = quantize(prediction_array[i])
    truth_mat = []
    for i in range(len(prediction_array)):
        truth_mat.append(compare(prediction_array[i],true_array[i]))
    acc = sum(truth_mat)/len(truth_mat)
    print(f"Accuracy: {round(100* sum(truth_mat)/len(truth_mat),2)}%")
    return acc

# Inputs: X -- array, y -- array, (train, test, valid) -- ratio numbers (e.g. "100:20:15")
def split_ttv(X,y,train,test,valid):
    """Split ``X`` and ``y`` into consecutive train / test / validation parts.

    ``train``, ``test`` and ``valid`` are relative weights (e.g. 100, 20, 15);
    only their proportions matter. The data is cut in its existing order (no
    shuffling) into three back-to-back slices, so every sample lands in
    exactly one part.

    Args:
        X: Sliceable sequence of samples.
        y: Sliceable sequence of labels, same length as ``X``.
        train: Weight of the training part.
        test: Weight of the test part.
        valid: Weight of the validation part; it receives whatever remains
            after the train and test slices.

    Returns:
        ``((tr_x, tr_y), (te_x, te_y), (va_x, va_y))``.

    Raises:
        AssertionError: If ``X`` and ``y`` differ in length.
        ZeroDivisionError: If the three weights sum to zero.
    """
    assert len(X) == len(y)
    n = len(X)
    total = train + test + valid
    train_end = round(train / total * n)
    test_end = train_end + round(test / total * n)

    # Slice ends are exclusive, so each part starts exactly where the previous
    # one stopped; starting at "end + 1" would drop a sample at each boundary.
    tr_x, tr_y = X[:train_end], y[:train_end]
    te_x, te_y = X[train_end:test_end], y[train_end:test_end]
    va_x, va_y = X[test_end:], y[test_end:]
    return (tr_x, tr_y), (te_x, te_y), (va_x, va_y)

In [None]:
# Pool the train, test and valid folders into one pair of arrays.
X, y = combine(("train", "test", "valid"))

In [None]:
# Partition the pooled data with weights 100:20:15, then unpack each
# (samples, labels) pair into its own variables.
train, test, valid = split_ttv(X, y, train=100, test=20, valid=15)
(train_x, train_y), (test_x, test_y), (valid_x, valid_y) = train, test, valid

In [None]:
# Read the training split as separate arrays; the validation split is kept as
# the (X, y) tuple returned by process() and later passed as validation_data.
x_train, y_train = process(data="train")
val_data = process(data="valid")

In [None]:
# Load VGG16 without its top (classifier) layers; pooling='max' makes the base
# model end in a pooled feature output instead of a spatial feature map.
base_model = VGG16(include_top=False, input_shape=(400, 400, 3), pooling='max')

# New classification head stacked on the base model's output:
# Flatten -> Dense(1024, relu) -> Dense(4, softmax).
x = Flatten()(base_model.output)
x = Dense(1024, activation='relu')(x)
output = Dense(4, activation='softmax')(x)

# Finalize the model
model = Model(inputs=base_model.input, outputs=output)

# Freeze every layer EXCEPT the last 8, so the early pretrained layers keep
# their weights while the end of the network is trained. The last 8 layers
# include the three head layers added above; slicing with [-8:] instead would
# freeze the new head itself and leave the early layers trainable.
# NOTE(review): the other five trainable layers are presumably the final
# VGG16 block plus pooling -- confirm with model.summary().
for layer in model.layers[:-8]:
    layer.trainable = False

# Compile the model (must happen after changing `trainable` to take effect)
model.compile(optimizer="Adam", loss="CategoricalCrossentropy", metrics=['accuracy'])

# Print the model summary
# model.summary()

In [None]:
# First training pass: three epochs, with val_data supplied for validation.
hist = model.fit(
    x=x_train,
    y=y_train,
    epochs=3,
    validation_data=val_data,
)
# model.save("finalproj_vgg_model")  # uncomment to save the model

In [None]:
# Second training pass on the same model object: two more epochs.
hist2 = model.fit(
    x=x_train,
    y=y_train,
    epochs=2,
    validation_data=val_data,
)
# model.save("finalproj_vgg_model")  # uncomment to save the model

In [None]:
# Evaluate on the test split: load it, run the model on it, then report the
# accuracy (the returned value is also the cell's displayed output).
x_test, y_test = process(data="test")
y_pred = model.predict(x=x_test)
test_accuracy(prediction_array=y_pred, true_array=y_test)