In [2]:
import os
import cv2
import random
import itertools
import numpy as np
import pandas as pd
from tensorflow.keras.models import Model
from tensorflow.keras.utils import plot_model
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.losses import BinaryCrossentropy
from tensorflow.keras.metrics import BinaryAccuracy
from tensorflow.keras.applications.vgg19 import VGG19
from tensorflow.keras.layers import Conv2D, Dense, Dropout, Input, concatenate, Flatten, MaxPooling2D

In [25]:
root = "color_classification_dataset"
resolution = (224, 224)

def create_lookup():
    look = os.path.join(root, "train_data")
    lookup = dict()
    colors = set()
    for dire in os.listdir(look):
        color = dire.split(".")[0].split("_")[1]
        colors.add(color)
        lookup[dire] = color
    return lookup, colors
lookup, colors = create_lookup()

In [51]:
pairs = list()
def push_to_df(pivot, cmp):
    label = 0
    if lookup[pivot] == lookup[cmp]:
        label = 1
    pairs.append([pivot, cmp, label])
    
def create_training_data():
    files = list(lookup.keys())
    random.shuffle(files)
    img_count = 0
    window = 20
    for index in range(len(files)):        
        pivot = files[index]
        if index == 0:
            look = window
            for wind_i in range(1, look):
                cmp_file = files[index + wind_i]
                push_to_df(pivot, cmp_file)
        if index > 0:
            look_ahead_floor = index + 1
            look_ahead_ceil = look_ahead_floor + window
            look_back_ceil = index - 1
            look_back_floor = look_back_ceil - window
            pivot = files[index]
            if index >= window and index < len(files) - window:
                for i in range(look_back_ceil, look_back_floor,  -1):
                    cmp_file = files[i]
                    push_to_df(pivot, cmp_file)
                for i in range(look_ahead_floor, look_ahead_ceil):
                    cmp_file = files[i]
                    push_to_df(pivot, cmp_file)

            elif index < len(files) - window:
                for i in range(index - 1, -1, -1):
                    cmp_file = files[i]
                    push_to_df(pivot, cmp_file)
            else:
                for i in range(index+1, len(files)):
                    cmp_file = files[i]
                    push_to_df(pivot, cmp_file)
        push_to_df(files[index], files[index])
create_training_data()
df = pd.DataFrame(pairs)
df.columns = ["file1", "file2", "label"]

3637


In [56]:
def load_file_data(file):
    file_path = os.path.join(root, "train_data", file)
    if not os.path.exists(file_path):
        sub_dir = lookup[file]
        file_path = os.path.join(parents[1], sub_dir, file)
    file = cv2.imread(file_path)
    file = cv2.resize(file, resolution, interpolation=cv2.INTER_CUBIC)/255
    # hist = cv2.calcHist([file], [0, 1, 2], None, [256, 256, 256], [0, 300, 0, 300, 0, 300])
    return file

def preprocess_data():
    # df = pd.read_csv("flower_data.csv", index_col=False)
    train = int(len(df.index) * 0.8)
    test = int(len(df.index) * .15)
    validate = int(len(df.index) * .05)
    train_data = df[:train]
    test_data = df[train:train+test]
    validate_data = df[train+test: train+test+validate]
    return train_data, test_data, validate_data

train_data, test_data, validation_data = preprocess_data()
print(len(train_data.index))
print(len(test_data.index))
def generate_input_data(src_data):
    for index, data in src_data.iterrows():
        file_1 = load_file_data(data["file1"])
        file_2 = load_file_data(data["file2"])
        yield (np.array([file_1]), np.array([file_2])), np.array([float(data["label"])])

2909
545


In [62]:
resolution_model = resolution + (3,)
def build_model():
    input_1 = Input(shape=resolution_model)
    model1 = Conv2D(15, 3, activation='linear')(input_1)
    model1 = Conv2D(31, 3, activation='relu')(model1)
    model1 = Conv2D(22, 3, activation='relu')(model1)
    model1 = MaxPooling2D((3,3))(model1)
    model1 = Flatten()(model1)
    model1 = Dropout(0.25)(model1)
    model1 = Dense(200, activation="relu")(model1)
    
    input_2 = Input(shape=resolution_model)
    model2 = Conv2D(15, 3, activation='linear')(input_2)
    model2 = Conv2D(31, 3, activation='relu')(model2)
    model2 = Conv2D(22, 3, activation='relu')(model2)
    model2 = MaxPooling2D((3,3))(model2)
    model2 = Flatten()(model2)
    model2 = Dropout(0.25)(model2)
    model2 = Dense(200, activation="relu")(model2)

    concat = concatenate([model1, model2])
    model2 = Dropout(0.25)(concat)
    concat = Dense(13, activation="softmax")(concat)
    output = Dense(1, activation="sigmoid")(concat)
    model = Model(inputs=[input_1, input_2], outputs=output)
    return model
model = build_model()
model.summary()

Model: "functional_5"
__________________________________________________________________________________________________
Layer (type)                    Output Shape         Param #     Connected to                     
input_7 (InputLayer)            [(None, 100, 100, 3) 0                                            
__________________________________________________________________________________________________
input_8 (InputLayer)            [(None, 100, 100, 3) 0                                            
__________________________________________________________________________________________________
conv2d_16 (Conv2D)              (None, 98, 98, 15)   420         input_7[0][0]                    
__________________________________________________________________________________________________
conv2d_19 (Conv2D)              (None, 98, 98, 15)   420         input_8[0][0]                    
_______________________________________________________________________________________

In [57]:
model.compile(optimizer="adam", loss=BinaryCrossentropy(), metrics=[BinaryAccuracy()])
model.fit(generate_input_data(train_data), epochs=3, steps_per_epoch=969, validation_data=generate_input_data(test_data), validation_steps=109)
model.evaluate(generate_input_data(validation_data))

Epoch 1/3
Epoch 2/3
Epoch 3/3


[0.45026469230651855, 0.8342541456222534]

In [59]:
def predict(img1, img2):
    img1 = cv2.imread(img1)
    print(resolution)
    img1 = cv2.resize(img1, (100, 100), interpolation=cv2.INTER_CUBIC)/255
    # img1 = cv2.calcHist([img1], [0, 1, 2], None, [256, 256, 256], [0, 800, 0, 800, 0, 800])
    img2 = cv2.imread(img2)
    img2 = cv2.resize(img2, (100, 100), interpolation=cv2.INTER_CUBIC)/255
    # img2 = cv2.calcHist([img2], [0, 1, 2], None, [256, 256, 256], [0, 800, 0, 800, 0, 800])
    return model.predict([np.array([img1]), np.array([img2])])
    
print(predict('test_images/test_blue.png', 'test_images/test_green.png'))

[[0.14814684]]
