In [None]:
import tarfile
import zipfile
import shutil
import random
import os
import cv2
import pathlib
import numpy as np
import pandas as pd

import tensorflow as tf
import matplotlib.pyplot as plt
from numpy.random import seed
from google.colab import drive
from tensorflow import keras
from tensorflow.keras import backend as K
from tensorflow.keras import mixed_precision
from tensorflow.keras import regularizers
from keras.preprocessing.image import ImageDataGenerator
from keras.callbacks import LearningRateScheduler, EarlyStopping, ModelCheckpoint
from sklearn.preprocessing import OneHotEncoder
from tensorflow.keras import layers
from tensorflow.keras.models import Model
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.applications.resnet import ResNet101

from tensorflow.keras.models import Model
from tensorflow.keras.layers import Dense
from tensorflow.keras.layers import Flatten
from tensorflow.keras.layers import Dropout

from sklearn.metrics import classification_report
from sklearn.metrics import accuracy_score
from sklearn.metrics import top_k_accuracy_score
from sklearn.metrics import precision_recall_fscore_support
from sklearn.metrics import confusion_matrix
from mlxtend.plotting import plot_confusion_matrix

import plotly.graph_objects as go
from plotly.subplots import make_subplots

from tqdm.notebook import tqdm

In [None]:
# Single seed shared by every random number generator used in the notebook
# (Python's `random`, NumPy's global generator and TensorFlow).
random_seed = 42

random.seed(random_seed)
tf.random.set_seed(random_seed)
seed(random_seed)  # numpy.random.seed, imported by name at the top of the notebook

# Mixed-precision policy, currently disabled.
#policy = mixed_precision.Policy('mixed_float16')
#mixed_precision.set_global_policy(policy)
#print('Compute dtype: %s' % policy.compute_dtype)
#print('Variable dtype: %s' % policy.variable_dtype)

In [None]:
%%capture
# Mount Google Drive so the dataset archives stored under MyDrive/Progetto_VIPM are reachable.
drive.mount('/content/gdrive', force_remount=True)
# Unpack train_clean.tar and val.tar; no target directory is given, so tar
# extracts into the current working directory.
!tar --extract --verbose --file='/content/gdrive/MyDrive/Progetto_VIPM/dataset/train_clean.tar'
!tar --extract --verbose --file='/content/gdrive/MyDrive/Progetto_VIPM/dataset/val.tar'
# The degraded validation set is a zip archive; extractall() without arguments
# also extracts into the current working directory.
with zipfile.ZipFile("/content/gdrive/MyDrive/Progetto_VIPM/dataset/val_degraded.zip","r") as zip_ref:
    zip_ref.extractall()

In [None]:
%%capture

# Training annotations: two columns, renamed to (filename, label), all read as strings.
# NOTE(review): read_csv treats the first line of the file as a header; if the
# CSV has no header row, the first image is silently dropped — confirm. Changing
# this would alter the file count and invalidate any cached feature arrays.
csv_train_file = pd.read_csv("/content/gdrive/MyDrive/Progetto_VIPM/annot/train_clean_info.csv", dtype=str)
csv_train_file.columns = ['filename', 'label']
parent_dir = "train_set_clean/"
labels = csv_train_file['label']

# Split the images into one sub-folder per class, the layout expected by
# image_dataset_from_directory(labels="inferred").
# Create each class folder once (unique labels) instead of once per image row.
for label in labels.unique():
    os.makedirs(os.path.join(parent_dir, label), exist_ok=True)

# Copy (not move) every image into its class folder; the flat originals stay in place.
for filename, label in zip(csv_train_file['filename'], labels):
    path = os.path.join(parent_dir, filename)
    new_path = os.path.join(parent_dir, label, os.path.basename(path))
    shutil.copy(path, new_path)

In [None]:
%%capture

# Validation annotations: two columns, renamed to (filename, label), all read as strings.
# NOTE(review): read_csv treats the first line of the file as a header; if the
# CSV has no header row, the first image is silently dropped — confirm. Changing
# this would alter the file count and invalidate any cached feature arrays.
csv_test_clean_file = pd.read_csv("/content/gdrive/MyDrive/Progetto_VIPM/annot/val_info.csv", dtype=str)
csv_test_clean_file.columns = ['filename', 'label']
parent_dir = "val_set/"
labels = csv_test_clean_file['label']

# Split the images into one sub-folder per class, the layout expected by
# image_dataset_from_directory(labels="inferred").
# Create each class folder once (unique labels) instead of once per image row.
for label in labels.unique():
    os.makedirs(os.path.join(parent_dir, label), exist_ok=True)

# Copy (not move) every image into its class folder; the flat originals stay in place.
for filename, label in zip(csv_test_clean_file['filename'], labels):
    path = os.path.join(parent_dir, filename)
    new_path = os.path.join(parent_dir, label, os.path.basename(path))
    shutil.copy(path, new_path)

In [None]:
%%capture

# Annotations for the degraded validation set, renamed to (filename, label).
# NOTE(review): this reads val_info.csv, the same file used for the clean
# validation set — presumably the degraded images keep the same filenames; confirm.
# NOTE(review): read_csv treats the first line of the file as a header; if the
# CSV has no header row, the first image is silently dropped — confirm. Changing
# this would alter the file count and invalidate any cached feature arrays.
csv_test_deg_file = pd.read_csv("/content/gdrive/MyDrive/Progetto_VIPM/annot/val_info.csv", dtype=str)
csv_test_deg_file.columns = ['filename', 'label']
parent_dir = "val_set_degraded/"
labels = csv_test_deg_file['label']

# Split the images into one sub-folder per class, the layout expected by
# image_dataset_from_directory(labels="inferred").
# Create each class folder once (unique labels) instead of once per image row.
for label in labels.unique():
    os.makedirs(os.path.join(parent_dir, label), exist_ok=True)

# Copy (not move) every image into its class folder; the flat originals stay in place.
for filename, label in zip(csv_test_deg_file['filename'], labels):
    path = os.path.join(parent_dir, filename)
    new_path = os.path.join(parent_dir, label, os.path.basename(path))
    shutil.copy(path, new_path)

In [None]:
# Training images, read from the per-class folders with one-hot labels.
batch_size = 128
# No shuffling: batches follow the directory listing order
# (presumably so rows line up with the indexed file paths — confirm).
shuffle_value = False
train_data_dir = pathlib.Path('train_set_clean/')

train_clean_ds = tf.keras.preprocessing.image_dataset_from_directory(
    directory=train_data_dir,
    labels="inferred",
    label_mode="categorical",
    batch_size=batch_size,
    image_size=(224,224),
    shuffle=shuffle_value,
    seed=random_seed,
)

Found 114153 files belonging to 251 classes.


In [None]:
# Clean validation images, read from the per-class folders with one-hot labels.
batch_size = 128
# No shuffling: batches follow the directory listing order
# (presumably so rows line up with the indexed file paths — confirm).
shuffle_value = False
test_clean_data_dir = pathlib.Path('val_set/')

test_clean_ds = tf.keras.preprocessing.image_dataset_from_directory(
    directory=test_clean_data_dir,
    labels="inferred",
    label_mode="categorical",
    batch_size=batch_size,
    image_size=(224,224),
    shuffle=shuffle_value,
    seed=random_seed,
    subset=None,
)

Found 11993 files belonging to 251 classes.


In [None]:
# Degraded validation images, read from the per-class folders with one-hot labels.
batch_size = 128
# No shuffling: batches follow the directory listing order
# (presumably so rows line up with the indexed file paths — confirm).
shuffle_value = False
test_deg_data_dir = pathlib.Path('val_set_degraded/')

test_deg_ds = tf.keras.preprocessing.image_dataset_from_directory(
    directory=test_deg_data_dir,
    labels="inferred",
    label_mode="categorical",
    batch_size=batch_size,
    image_size=(224,224),
    shuffle=shuffle_value,
    seed=random_seed,
    subset=None,
)

Found 11993 files belonging to 251 classes.


In [None]:
def preprocess(images, labels):
  """Apply the Keras MobileNet input preprocessing to a batch of images.

  Args:
    images: image batch yielded by the dataset.
    labels: labels for the batch; returned untouched.

  Returns:
    A ``(preprocessed_images, labels)`` tuple, the shape expected by ``Dataset.map``.
  """
  scaled_images = tf.keras.applications.mobilenet.preprocess_input(images)
  return scaled_images, labels

# Apply the same preprocessing to all three datasets.
# Note: re-running this cell maps `preprocess` a second time over the already mapped datasets.
train_clean_ds, test_clean_ds, test_deg_ds = [
    split_ds.map(preprocess)
    for split_ds in (train_clean_ds, test_clean_ds, test_deg_ds)
]

# Importazione modello

In [None]:
# Restore the saved Keras model (HDF5 checkpoint) from Drive.
checkpoint_filepath = '/content/gdrive/MyDrive/Progetto_VIPM/models/best_model_mobilenetv2_augmented.h5'
loaded_model = keras.models.load_model(filepath=checkpoint_filepath)
#loaded_model.summary()

In [None]:
# Freeze every layer: the network is only used for inference here.
for frozen_layer in loaded_model.layers:
  frozen_layer.trainable = False

# Feature extractor: same input as the loaded model, truncated at the output
# of the layer named 'flatten'.
flatten_output = loaded_model.get_layer('flatten').output
feat_extractor_model = Model(inputs=loaded_model.input, outputs=flatten_output)
#feat_extractor_model.summary()

# Estrazione features

In [None]:
# features_train_clean = feat_extractor_model.predict(train_clean_ds)
# print(features_train_clean.shape)

In [None]:
#features_train_clean[0]

In [None]:
# features_test_clean = feat_extractor_model.predict(test_clean_ds)
# print(features_test_clean.shape)

In [None]:
#features_test_clean[0]

In [None]:
# features_test_deg = feat_extractor_model.predict(test_deg_ds)
# print(features_test_deg.shape)

In [None]:
#features_test_deg[0]

In [None]:
# with open('/content/gdrive/MyDrive/Progetto Visual/features_train_clean_mobilenetv2.npy', 'wb') as output:
#     np.save(output, features_train_clean)

In [None]:
# Reload the cached training-set features instead of re-running the extractor.
# np.load opens and closes the .npy file itself, so no file handle named
# `input` is left behind shadowing the built-in input().
# NOTE(review): the cache lives under 'Progetto Visual' while the rest of the
# notebook uses 'Progetto_VIPM' — confirm this is the intended location.
# NOTE(review): rows are assumed to follow the unshuffled directory order used
# later for train_paths — confirm the cache was produced with shuffle=False.
features_train_clean = np.load('/content/gdrive/MyDrive/Progetto Visual/features_train_clean_mobilenetv2.npy')

In [None]:
# with open('/content/gdrive/MyDrive/Progetto Visual/features_test_clean_mobilenetv2.npy', 'wb') as output:
#     np.save(output, features_test_clean)

In [None]:
# Reload the cached clean-validation features instead of re-running the extractor.
# np.load opens and closes the .npy file itself, so no file handle named
# `input` is left behind shadowing the built-in input().
# NOTE(review): the cache lives under 'Progetto Visual' while the rest of the
# notebook uses 'Progetto_VIPM' — confirm this is the intended location.
features_test_clean = np.load('/content/gdrive/MyDrive/Progetto Visual/features_test_clean_mobilenetv2.npy')

In [None]:
# with open('/content/gdrive/MyDrive/Progetto Visual/features_test_deg_mobilenetv2.npy', 'wb') as output:
#     np.save(output, features_test_deg)

In [None]:
# Reload the cached degraded-validation features instead of re-running the extractor.
# np.load opens and closes the .npy file itself, so no file handle named
# `input` is left behind shadowing the built-in input().
# NOTE(review): the cache lives under 'Progetto Visual' while the rest of the
# notebook uses 'Progetto_VIPM' — confirm this is the intended location.
features_test_deg = np.load('/content/gdrive/MyDrive/Progetto Visual/features_test_deg_mobilenetv2.npy')

# Valutazione performance

In [None]:
# k-NN index over the training features: exhaustive (brute-force) search with
# the Minkowski metric at p=2, i.e. Euclidean distance.
from sklearn.neighbors import NearestNeighbors
knn = NearestNeighbors(
    n_neighbors=5,
    algorithm='brute',
    metric='minkowski',
    p=2,
).fit(features_train_clean)  # fit() returns the estimator itself

# Class list, loaded as an array of strings
lista_classi = np.genfromtxt("/content/gdrive/MyDrive/Progetto_VIPM/annot/class_list.txt", dtype=str)

In [None]:
from keras.utils import dataset_utils

def get_test_clean_paths():
  """Return the .jpg file paths found under ``val_set/``, unshuffled.

  Relies on ``dataset_utils.index_directory``, which (per the original author's
  note) is the function Keras uses to build the dataset.
  """
  indexed = dataset_utils.index_directory(
      directory="val_set/",
      labels="inferred",
      formats="jpg",
      shuffle=False,
      seed=42,
  )
  # index_directory returns (file_paths, labels, class_names); only the paths are needed.
  return indexed[0]

def get_test_deg_paths():
  """Return the .jpg file paths found under ``val_set_degraded/``, unshuffled.

  Relies on ``dataset_utils.index_directory``, which (per the original author's
  note) is the function Keras uses to build the dataset.
  """
  indexed = dataset_utils.index_directory(
      directory="val_set_degraded/",
      labels="inferred",
      formats="jpg",
      shuffle=False,
      seed=42,
  )
  # index_directory returns (file_paths, labels, class_names); only the paths are needed.
  return indexed[0]

def get_train_paths():
  """Return the .jpg file paths found under ``train_set_clean/``, unshuffled.

  Relies on ``dataset_utils.index_directory``, which (per the original author's
  note) is the function Keras uses to build the dataset.
  """
  indexed = dataset_utils.index_directory(
      directory="train_set_clean/",
      labels="inferred",
      formats="jpg",
      shuffle=False,
      seed=42,
  )
  # index_directory returns (file_paths, labels, class_names); only the paths are needed.
  return indexed[0]

In [None]:
# File paths of the clean validation images, as indexed (unshuffled) by get_test_clean_paths().
test_clean_paths = get_test_clean_paths()

Found 11993 files belonging to 251 classes.


In [None]:
# File paths of the degraded validation images, as indexed (unshuffled) by get_test_deg_paths().
test_deg_paths = get_test_deg_paths()

Found 11993 files belonging to 251 classes.


In [None]:
# File paths of the training images, as indexed (unshuffled) by get_train_paths();
# used later to map k-NN neighbour indices back to files.
train_paths = get_train_paths()

Found 114153 files belonging to 251 classes.


In [None]:
def get_label(im_path):
  """Return the second '/'-separated component of ``im_path``.

  For paths shaped like ``<root>/<class>/<file>`` this is the class folder name.
  Raises ``IndexError`` when the path contains no '/'.
  """
  return im_path.split('/')[1]

In [None]:
from tensorflow.keras.preprocessing import image
from google.colab import files

# Retrieval evaluation on the clean validation set: each image is a query, its
# 3 nearest training features are retrieved, and q_n counts how many of the
# retrieved training images share the query's class.
length = len(test_clean_paths)
q = length  # number of queries
q_n = 0     # total number of class-matching neighbours over all queries

for query_path in tqdm(test_clean_paths):
  # Load and preprocess one image the same way the dataset pipeline does
  # (224x224, MobileNet preprocess_input), as a batch of size 1.
  img = image.load_img(query_path, target_size=(224,224))
  img = image.img_to_array(img)
  img = np.expand_dims(img, axis=0)
  img = tf.keras.applications.mobilenet.preprocess_input(img)
  # verbose=0: by default predict() prints a progress bar on every call, which
  # floods the cell output when invoked once per image.
  features_new_image = feat_extractor_model.predict(img, verbose=0)
  test_label = get_label(query_path)
  closest_distances, indices = knn.kneighbors(features_new_image, n_neighbors=3)
  # indices[0] holds the positions (into train_paths) of the 3 neighbours of this query.
  q_n += sum(get_label(train_paths[neighbor_idx]) == test_label
             for neighbor_idx in indices[0])

  0%|          | 0/11993 [00:00<?, ?it/s]

[1;30;43mOutput streaming troncato alle ultime 5000 righe.[0m


In [None]:
# q_n is the total number of class-matching neighbours over all queries (3 are
# retrieved per query) and q is the number of queries, so this is the mean
# number of correct neighbours per query: it ranges from 0 to 3, not 0 to 1.
P_n_clean = q_n / q
print("P(n) per il test set clean: ", P_n_clean)
# P_n = 3 would correspond to 100% retrieval

P(n) per il test set clean:  0.0962227966313683


In [None]:
from tensorflow.keras.preprocessing import image
from google.colab import files

# Retrieval evaluation on the degraded validation set: each image is a query,
# its 3 nearest training features are retrieved, and q_n counts how many of the
# retrieved training images share the query's class.
length = len(test_deg_paths)
q = length  # number of queries
q_n = 0     # total number of class-matching neighbours over all queries

for query_path in tqdm(test_deg_paths):
  # Load and preprocess one image the same way the dataset pipeline does
  # (224x224, MobileNet preprocess_input), as a batch of size 1.
  img = image.load_img(query_path, target_size=(224,224))
  img = image.img_to_array(img)
  img = np.expand_dims(img, axis=0)
  img = tf.keras.applications.mobilenet.preprocess_input(img)
  # verbose=0: by default predict() prints a progress bar on every call, which
  # floods the cell output when invoked once per image.
  features_new_image = feat_extractor_model.predict(img, verbose=0)
  test_label = get_label(query_path)
  closest_distances, indices = knn.kneighbors(features_new_image, n_neighbors=3)
  # indices[0] holds the positions (into train_paths) of the 3 neighbours of this query.
  q_n += sum(get_label(train_paths[neighbor_idx]) == test_label
             for neighbor_idx in indices[0])

  0%|          | 0/11993 [00:00<?, ?it/s]

[1;30;43mOutput streaming troncato alle ultime 5000 righe.[0m


In [None]:
# q_n is the total number of class-matching neighbours over all queries (3 are
# retrieved per query) and q is the number of queries, so this is the mean
# number of correct neighbours per query: it ranges from 0 to 3, not 0 to 1.
P_n_deg = q_n / q
print("P(n) per il test set degraded: ", P_n_deg)

P(n) per il test set degraded:  0.07395980988910197
