In [None]:
import pandas as pd
import os
import json
from keras.api.datasets import cifar10, cifar100
from tf_keras.applications import resnet50, vgg19, mobilenet, efficientnet, mobilenet_v2
import time
import tensorflow as tf
import numpy as np
from sklearn.metrics import accuracy_score
from multiprocessing.pool import Pool

In [None]:
# NOTE(review): tf.device() only builds a device context object (see the repr
# shown as this cell's output). It is not entered with a `with` block here, so
# on its own this line presumably does not place any later work on the GPU.
tf.device('/GPU:0')

<tensorflow.python.eager.context._EagerDeviceContext at 0x7aa23f8f6100>

In [None]:
# Root folder of the experiment (looks like a mounted Google Drive path).
BASE_PATH = '/content/drive/MyDrive/Mestrado/Robustness'
# One sub-folder per architecture, holding its optimized models and results.
MODELS_PATH = BASE_PATH + '/optimization'
# Folder with `labels.npy` and one `<corruption>.npy` array per corruption.
CIFAR10_C_PATH = BASE_PATH + '/CIFAR-10-C'

# Corruption names grouped by family; each name is also the stem of the
# `.npy` file that is loaded for it.
CORRUPTION_TYPES = dict(
    blur=['defocus_blur', 'glass_blur', 'motion_blur', 'zoom_blur'],
    digital=['contrast', 'elastic_transform', 'jpeg_compression', 'pixelate'],
    noise=['gaussian_noise', 'impulse_noise', 'shot_noise'],
    weather=['brightness', 'fog', 'frost', 'snow'],
)
# Number of consecutive images that make up one severity level in a corruption file.
LEN_BY_SEVERITY_LEVEL = 10_000

In [None]:
def preprocess_images(images, preprocess_input):
    """Cast `images` to float32 and run them through `preprocess_input`.

    Args:
        images: Array-like object exposing `.astype`.
        preprocess_input: Callable that receives the float32 array.

    Returns:
        Whatever `preprocess_input` returns for the float32 copy of `images`.
    """
    return preprocess_input(images.astype('float32'))

def generate_data_with_labels(image_dir):
    """Index a folder-per-label image tree as a DataFrame.

    Every entry directly under `image_dir` is listed as a folder, and each
    file inside it is recorded with the folder name as its label.

    Args:
        image_dir: Directory whose immediate children are the label folders.

    Returns:
        pandas.DataFrame with columns `file_path` and `labels`, one row per file.
    """
    records = []
    for label_name in os.listdir(image_dir):
        label_dir = os.path.join(image_dir, label_name)
        records.extend(
            (os.path.join(label_dir, file_name), label_name)
            for file_name in os.listdir(label_dir)
        )

    return pd.DataFrame(
        data={
            "file_path": [path for path, _ in records],
            "labels": [label for _, label in records],
        }
    )


def save_json(output_name, data_dict):
    """Serialize `data_dict` as JSON into the file `output_name`.

    The file is opened in write mode, so existing content is replaced.
    A confirmation message is printed once the file has been closed.
    """
    with open(output_name, 'w') as target:
        json.dump(data_dict, target)

    print("File save with success!")


def load_json(output_name):
    """Parse the JSON file at `output_name` and return the decoded object.

    A confirmation message is printed after the file has been read.
    """
    with open(output_name, 'r') as source:
        parsed = json.loads(source.read())

    print("File loaded with success!")
    return parsed

In [None]:
def predict_in_tflite(model_tflite_path, images, database_path):
    """Classify `images` one by one with a TFLite model.

    Args:
        model_tflite_path: Path of the `.tflite` model handed to the interpreter.
        images: Sized iterable of already preprocessed images; each one gets a
            leading batch axis of size 1 before being fed to the model.
        database_path: Label used only in the progress messages.

    Returns:
        list: argmax of the model output for every image, in input order.
    """
    interpreter = tf.lite.Interpreter(model_tflite_path, num_threads=2)
    interpreter.allocate_tensors()

    input_details = interpreter.get_input_details()[0]
    output_details = interpreter.get_output_details()[0]
    input_dtype = input_details["dtype"]

    # Models with an 8-bit integer input (signed or unsigned) need the float
    # image mapped to the quantized domain: q = real / scale + zero_point.
    # A zero scale is treated as "no quantization parameters" and skipped to
    # avoid dividing by zero.
    quantize_input = False
    if input_dtype in (np.uint8, np.int8):
        input_scale, input_zero_point = input_details["quantization"]
        quantize_input = input_scale != 0
        dtype_limits = np.iinfo(input_dtype)

    y_predicted = []
    total_images = len(images)
    start_time = time.time()

    for index, image in enumerate(images):
        if quantize_input:
            # Round to the nearest level and saturate so the integer cast
            # below neither truncates toward zero nor wraps around.
            image = np.clip(
                np.round(image / input_scale + input_zero_point),
                dtype_limits.min,
                dtype_limits.max,
            )
        image = np.expand_dims(image, axis=0).astype(input_dtype)
        interpreter.set_tensor(input_details['index'], image)
        interpreter.invoke()

        output_data = interpreter.get_tensor(output_details['index'])
        y_predicted.append(np.argmax(output_data))

        # Progress report every 1000 images (including the very first one).
        if index % 1000 == 0:
            current_time = time.time() - start_time
            print(f"{database_path} - Total images {index} - {current_time:.2f}s")

    execution_time = time.time() - start_time
    print(f"{database_path} - Total images {total_images} - {execution_time:.2f}s")

    return y_predicted


def process_predict(images, labels, subject, corruption_type, severity, model_tflite_path):
    """Score one (corruption, severity) slice with a TFLite model.

    Args:
        images: Preprocessed images of the slice.
        labels: Ground-truth labels matching `images`.
        subject: Key of CORRUPTION_TYPES (the corruption family).
        corruption_type: Index of the corruption inside CORRUPTION_TYPES[subject].
        severity: Severity level of the slice; used for logging and echoed back.
        model_tflite_path: Path of the `.tflite` model to run.

    Returns:
        tuple: (subject, corruption_type, severity, 1 - accuracy).
    """
    print(f"Process predict started for {subject}/{corruption_type}/{severity}")

    # Progress messages use the readable corruption name instead of its index.
    progress_tag = f'{subject}/{CORRUPTION_TYPES[subject][corruption_type]}/{severity}'
    predictions = predict_in_tflite(
        model_tflite_path=model_tflite_path,
        images=images,
        database_path=progress_tag,
    )

    error_rate = 1 - accuracy_score(labels, predictions)
    return subject, corruption_type, severity, error_rate

def process_corrupted_images(model_tflite_path, output_path, preprocess_input, x_test_image, y_test_image, CIFAR_C_PATH = f'{BASE_PATH}/CIFAR-10-C'):
    """Evaluate one TFLite model on clean and corrupted images and save the error rates as JSON.

    The clean images are scored first. Then, for every corruption listed in
    CORRUPTION_TYPES, `<CIFAR_C_PATH>/<corruption>.npy` is loaded and its five
    severity slices (LEN_BY_SEVERITY_LEVEL images each) are scored in parallel
    worker processes. The accumulated results are rewritten to `output_path`
    each time a corruption finishes.

    Args:
        model_tflite_path: Path of the `.tflite` model to evaluate.
        output_path: JSON file that receives the results.
        preprocess_input: Callable applied to the float32 images before inference.
        x_test_image: Clean test images.
        y_test_image: Ground-truth labels for `x_test_image`.
        CIFAR_C_PATH: Folder holding `labels.npy` and one `.npy` file per corruption.
    """
    labels = np.load(f"{CIFAR_C_PATH}/labels.npy")

    x_test_image = preprocess_images(x_test_image, preprocess_input)

    result = {
      "clean": None,
      "corruptions": {
        'blur': {},
        'digital': {},
        'noise': {},
        'weather': {}
      }
    }

    # Evaluate clean images
    preds = predict_in_tflite(model_tflite_path=model_tflite_path, images=x_test_image, database_path='clean')
    # Score against the labels supplied by the caller rather than a notebook
    # global, so the function works for whichever dataset is passed in.
    accuracy = accuracy_score(y_test_image, preds)
    result["clean"] = 1 - accuracy
    print(f"Accuracy for clean is {accuracy}")

    # Evaluate corrupted images
    for subject in CORRUPTION_TYPES.keys():
        for corruption_type, corruption_name in enumerate(CORRUPTION_TYPES[subject]):
            all_images_by_corruption = np.load(f"{CIFAR_C_PATH}/{corruption_name}.npy")
            all_images_by_corruption = preprocess_images(all_images_by_corruption, preprocess_input)

            # One task per severity level; each covers a contiguous slice of
            # LEN_BY_SEVERITY_LEVEL images and the matching labels.
            tasks = []
            for severity in range(1, 6):
                first = LEN_BY_SEVERITY_LEVEL * (severity - 1)
                last = LEN_BY_SEVERITY_LEVEL * severity
                tasks.append((all_images_by_corruption[first:last], labels[first:last], subject, corruption_type, severity, model_tflite_path))

            with Pool(processes=5) as pool:
                output = pool.starmap(process_predict, tasks)

            severity_errors = result["corruptions"][subject].setdefault(corruption_name, {})
            for response in output:
                # response = (subject, corruption_type, severity, error rate)
                severity_errors[response[2]] = response[3]

            # Persist after every corruption so finished work survives an interruption.
            save_json(output_path, result)

In [None]:
# Load CIFAR-10. Among the cells shown here only the test split (x_test,
# y_test) is used later; x_train / y_train are not referenced again.
(x_train, y_train), (x_test, y_test) = cifar10.load_data()
# Alternative dataset (disabled):
# (x_train, y_train), (x_test, y_test) = cifar100.load_data()

Downloading data from https://www.cs.toronto.edu/~kriz/cifar-10-python.tar.gz
170498071/170498071 ━━━━━━━━━━━━━━━━━━━━ 13s 0us/step


In [None]:
# Preprocessing function per architecture; keys must match the folder names
# found under MODELS_PATH.
all_preprocess_inputs = {
    'resnet50': resnet50.preprocess_input,
    'vgg19': vgg19.preprocess_input,
    'mobilenet': mobilenet.preprocess_input,
    'mobilenet_v2': mobilenet_v2.preprocess_input,
}

all_models = os.listdir(MODELS_PATH)
for model in all_models:
  fold_path = os.path.join(MODELS_PATH, model, 'models', 'optimizations')
  result_path = os.path.join(MODELS_PATH, model, 'results')

  # Entries without an optimizations folder are not model directories.
  if not os.path.isdir(fold_path):
    continue

  if model not in all_preprocess_inputs:
    raise Exception("Preprocess not found")
  preprocess_input = all_preprocess_inputs[model]

  for optimization in os.listdir(fold_path):
    optimization_path = os.path.join(fold_path, optimization)
    # "<name>.tflite" -> "<results>/ce_<name with '-' replaced by '_'>.json"
    result_name =  optimization.replace('-', '_')
    result_name = result_name.replace('.tflite', '.json')
    result_name = f'ce_{result_name}'
    result_name = os.path.join(result_path, result_name)

    # Only results whose path contains 'full_integer' are produced, and only
    # when no result file exists yet.
    # NOTE(review): process_corrupted_images rewrites this file after every
    # corruption, so an interrupted run leaves a partial file that this check
    # then treats as finished.
    if os.path.exists(result_name) or 'full_integer' not in result_name:
      continue
    print(result_name)

    # Create the results folder up front; otherwise the first save would fail
    # only after a whole corruption had already been evaluated.
    os.makedirs(result_path, exist_ok=True)

    process_corrupted_images(optimization_path, result_name, preprocess_input, x_test, y_test, CIFAR_C_PATH = CIFAR10_C_PATH)

A saída de streaming foi truncada nas últimas 5000 linhas.
Process predict started for noise/1/4
noise/impulse_noise/4 - Total images 0 - 0.01s
Process predict started for noise/1/5
noise/impulse_noise/5 - Total images 0 - 0.01s
noise/impulse_noise/1 - Total images 1000 - 5.30s
noise/impulse_noise/2 - Total images 1000 - 5.61s
noise/impulse_noise/3 - Total images 1000 - 6.52s
noise/impulse_noise/4 - Total images 1000 - 6.41s
noise/impulse_noise/5 - Total images 1000 - 6.14s
noise/impulse_noise/1 - Total images 2000 - 10.84s
noise/impulse_noise/2 - Total images 2000 - 12.04s
noise/impulse_noise/3 - Total images 2000 - 12.85s
noise/impulse_noise/4 - Total images 2000 - 12.74s
noise/impulse_noise/5 - Total images 2000 - 12.41s
noise/impulse_noise/1 - Total images 3000 - 16.61s
noise/impulse_noise/2 - Total images 3000 - 17.62s
noise/impulse_noise/3 - Total images 3000 - 19.29s
noise/impulse_noise/4 - Total images 3000 - 18.92s
noise/impulse_noise/5 - Total images 3000 - 18.7