In [1]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [2]:
%env name=samples1_512_fixed

env: name=samples1_512_fixed


In [3]:
# copy samples file to directory
!cp ./drive/Shareddrives/Memoria/samples/$name.zip .

!unzip -o -q $name.zip -d ./samples

# copy tools file
!cp ./drive/Shareddrives/Memoria/code/tools.py .

In [4]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import os
import cv2
from sklearn.cluster import KMeans
#keras resnet_50_v2
from keras.applications.resnet_v2 import ResNet50V2
from keras.applications.resnet_v2 import preprocess_input

from keras.preprocessing import image
from sklearn.model_selection import train_test_split
from tensorflow.keras.utils import array_to_img, img_to_array, load_img
from natsort import natsort_keygen

from sklearn.metrics import accuracy_score, jaccard_score,\
                            f1_score, precision_recall_curve,\
                            confusion_matrix, ConfusionMatrixDisplay,\
                            silhouette_score, mutual_info_score,\
                            adjusted_mutual_info_score, normalized_mutual_info_score,\
                            adjusted_rand_score


In [5]:
np.random.seed(42)

In [6]:
# Constants

# dirs
inp_img_dir = "./samples/"
out_dir = ""

# init
start_num = 1

height, width = 32, 32

# placeholder
r = "rule_"

# Classes
class1 = [0, 8, 32, 40, 128, 136, 160, 168]
class2 = [
          1, 2, 3, 4, 5, 6, 7, 9, 10, 11, 12, 13, 14, 15, 19, 
          23, 24, 25, 26, 27, 28, 29, 33, 34, 35, 36, 37, 38, 
          42, 43, 44, 46, 50, 51, 56, 57, 58, 62, 72, 73, 74, 
          76, 77, 78, 94, 104, 108, 130, 132, 134, 138, 140, 
          142, 152, 154, 156, 162, 164, 170, 172, 178, 184, 
          200, 204, 232
        ]
class3 = [18, 22, 30, 45, 60, 90, 105, 122, 126, 146, 150]
class4 = [41, 54, 106, 110]

In [7]:
def getPics(class_rules, samples_dir, n_samples_per_rule=None):
    class_imgs = []

    for r in class_rules:
        imgs_dir = samples_dir + f'rule_{str(r)}/'
        imgs_temp = [imgs_dir + f for f in os.listdir(imgs_dir)]
        class_imgs += imgs_temp[:n_samples_per_rule] # class balancing
    return class_imgs

c1_img_list = getPics(class1, inp_img_dir, 200)
c2_img_list = getPics(class2, inp_img_dir, 19)
c3_img_list = getPics(class3, inp_img_dir, 250)
c4_img_list = getPics(class4, inp_img_dir, 340)

files_target = [np.full(len(c1_img_list), 1), 
         np.full(len(c2_img_list), 2), 
         np.full(len(c3_img_list), 3), 
         np.full(len(c4_img_list), 4), 
         ]

# Data selection

c1_x = c1_img_list.copy()
c2_x = c2_img_list.copy()
c3_x = c3_img_list.copy()
c4_x = c4_img_list.copy()

c1_y = files_target[0].copy()
c2_y = files_target[1].copy()
c3_y = files_target[2].copy()
c4_y = files_target[3].copy()

c_x = np.array(c1_x+c2_x+c3_x+c4_x)
c_y = np.concatenate((c1_y,c2_y,c3_y,c4_y))

dataset = pd.DataFrame({'img':c_x, 'class':c_y})
dataset = dataset.sample(frac=1)
print(dataset.shape)
dataset.head(3)


(6945, 2)


Unnamed: 0,img,class
1330,./samples/rule_160/160_000_0020_4245.png,1
2543,./samples/rule_134/134_000_0192_9029.png,2
1046,./samples/rule_136/136_000_0079_7819.png,1


In [8]:
def getImages(files):
    img_list = []

    for img in files:
        img1 = load_img(out_dir + img)
        x = img_to_array(img1)
        x = np.expand_dims(x, axis=0)
        x = preprocess_input(x)   
        
        img_list.append(x)

    return np.array(img_list)

In [9]:
# Images Preview
def imagePreview(imgsArr):
    imgs_show = [imgsArr[0], imgsArr[50], imgsArr[1600], imgsArr[-4]]
    plt.figure(figsize=(16,8))
    columns = 4
    
    for i, image in enumerate(imgs_show):
        plt.subplot(len(imgs_show) // columns + 1, columns, i + 1)
        
        plt.imshow(array_to_img(image))

    plt.figure()
    plt.show()

In [10]:
X_train, X_test, y_train, y_test = train_test_split(dataset['img'].values,
                                                    dataset['class'].values,
                                                    test_size=0.33,
                                                    random_state=42)

In [11]:
X_train_imgs = getImages(X_train)
# imagePreview(X_train)

In [12]:
X_test_imgs = getImages(X_test)
# imagePreview(X_test)

In [14]:
# Load the pre-trained VGG16 model
input_shape = (height, width, 3)
model = ResNet50V2(weights='imagenet', input_shape=input_shape, include_top=False)

Downloading data from https://storage.googleapis.com/tensorflow/keras-applications/resnet/resnet50v2_weights_tf_dim_ordering_tf_kernels_notop.h5


In [15]:
# Extract features from the images
img_features = []

# do feature extraction
for img in X_train_imgs:
    features = model.predict(img, verbose=0)
    img_features.append(features.flatten())

# Clustering with 5 clusters

In [33]:
# Define the number of clusters
num_clusters = 5

In [34]:
# Normalize the features
img_features = np.array(img_features)
img_features_norm = (img_features - img_features.mean()) / img_features.std()

# Perform clustering
kmeans = KMeans(n_clusters=num_clusters, random_state=0).fit(img_features_norm)





In [35]:
# Visualize the clusters
img_list = list(X_train)

len(img_features), len(img_list), len(X_train)

(4653, 4653, 4653)

In [18]:
def parse_rule(w):
  return w.split('/')[2]

def sort_by_column(df, col):
    df2 = df.sort_values(by=col, key=natsort_keygen())
    return df2

def group_by_two_columns(df, col1, col2, col3):
    df2 = df.groupby([col1, col2]).size().reset_index(name=col3)
    return df2

def get_classification(info, rules):
  rules_ids = []
  for n in rules:
    temp = info.loc[info['imgs'] == n]
    ix = temp['sum'].idxmax()
    rules_ids.append(ix)
  return rules_ids

r = class1 + class2 + class3 + class4 
r = [f'rule_{i}' for i in r]

In [36]:
results_kmeans = pd.DataFrame({'imgs': X_train, 'assigned_labels': np.array(kmeans.labels_)})

results_kmeans['imgs'] = results_kmeans['imgs'].apply(parse_rule)

X = group_by_two_columns(results_kmeans,
                         'imgs',
                         'assigned_labels',
                         'sum')

X_ids = get_classification(X, r)
X = sort_by_column(X, 'imgs')

X = X[X.index.isin(X_ids)][['imgs','assigned_labels']]

In [37]:
X

Unnamed: 0,imgs,assigned_labels
1,rule_0,1
2,rule_1,0
71,rule_2,4
95,rule_3,3
109,rule_4,0
...,...,...
63,rule_178,0
67,rule_184,1
72,rule_200,0
73,rule_204,0


In [21]:
from plotly.subplots import make_subplots
import plotly.graph_objects as go

In [38]:
dataset['img'] = dataset['img'].apply(parse_rule)
original_labels = sort_by_column(dataset.groupby(by=['img','class'])\
                                 .count()\
                                 .reset_index(),
                                 'img')

In [39]:
print('normalized_mutual_info_score',
      normalized_mutual_info_score(original_labels['class'].values,
                                   X['assigned_labels'].values))
print('adjusted_rand_score',
      adjusted_rand_score(original_labels['class'].values,
                          X['assigned_labels'].values))

normalized_mutual_info_score 0.32741078030976295
adjusted_rand_score 0.20066650985945164


In [40]:
fig = make_subplots(rows=2,
                    cols=1,
                    shared_xaxes=True)
fig.update_yaxes(tickmode='linear')


fig.add_trace(go.Scatter(
    x=X['imgs'],
    y=X['assigned_labels'],
    mode="markers",
    name="resnet & Kmeans 5 clusters",
), row=1, col=1)

fig.add_trace(go.Scatter(
      x=original_labels['img'],
      y=original_labels['class'],
      mode="markers",
      name='original labels',
  ), row=2, col=1)

fig.update_layout(
    title="Clustering Results: Equivalent rules not included",
    height=900,
    width=2000,
    legend_title="Experiment",
    yaxis=dict(title='Classes'),
    )
fig.show()

# Clustering with 4 clusters

In [28]:
# Define the number of clusters
num_clusters = 4

# Perform clustering
kmeans = KMeans(n_clusters=num_clusters, random_state=0).fit(img_features_norm)

# Visualize the clusters
img_list = list(X_train)

results_kmeans = pd.DataFrame({'imgs': X_train, 'assigned_labels': np.array(kmeans.labels_)})

results_kmeans['imgs'] = results_kmeans['imgs'].apply(parse_rule)

X = group_by_two_columns(results_kmeans,
                         'imgs',
                         'assigned_labels',
                         'sum')

X_ids = get_classification(X, r)
X = sort_by_column(X, 'imgs')

X = X[X.index.isin(X_ids)][['imgs','assigned_labels']]
X





Unnamed: 0,imgs,assigned_labels
0,rule_0,0
2,rule_1,2
67,rule_2,3
86,rule_3,1
100,rule_4,2
...,...,...
59,rule_178,2
62,rule_184,0
69,rule_200,2
71,rule_204,2


In [29]:
original_labels = sort_by_column(dataset.groupby(by=['img','class'])\
                                 .count()\
                                 .reset_index(),
                                 'img')

print('normalized_mutual_info_score',
      normalized_mutual_info_score(original_labels['class'].values,
                                   X['assigned_labels'].values))
print('adjusted_rand_score',
      adjusted_rand_score(original_labels['class'].values,
                          X['assigned_labels'].values))

normalized_mutual_info_score 0.33249991289020014
adjusted_rand_score 0.19153958299585772


In [32]:
fig = make_subplots(rows=2,
                    cols=1,
                    shared_xaxes=True)
fig.update_yaxes(tickmode='linear')


fig.add_trace(go.Scatter(
    x=X['imgs'],
    y=X['assigned_labels'],
    mode="markers",
    name="resnet & Kmeans 4 clusters",
), row=1, col=1)

fig.add_trace(go.Scatter(
      x=original_labels['img'],
      y=original_labels['class'],
      mode="markers",
      name='original labels',
  ), row=2, col=1)

fig.update_layout(
    title="Clustering Results: Equivalent rules not included",
    height=900,
    width=2000,
    legend_title="Experiment",
    yaxis=dict(title='Classes'),
    )
fig.show()