In [3]:
# !pip install image-quality
# !pip install opencv-python==4.6.0.66
# !pip install scikit-learn
#!python image-quality/setup.py install

In [54]:
!pip install brisque

You should consider upgrading via the '/Users/jfhealthlligence/miniconda3/bin/python -m pip install --upgrade pip' command.[0m


In [55]:
#import imquality.brisque as brisque
from brisque import BRISQUE

import cv2
import os
import numpy as np
import re
import skimage
import json
import warnings
warnings.filterwarnings('ignore')

In [56]:
# !rm -r sample_images/blur_images
# !rm -r sample_images/sharpen_images
# !rm -r sample_images/cb_images

In [70]:
path = "sample_images"
closed_fn = "{}/closed_mouth.png".format(path)
open_fn = "{}/open_mouth.png".format(path)
open_fn_2 = "{}/img_2.png".format(path)

closed_orig = cv2.imread(closed_fn)
open_orig = cv2.imread(open_fn)
open_orig_2 = cv2.imread(open_fn_2)

In [71]:
def mkdir_(path):
  try: 
    os.mkdir(path) 
  except OSError as error: 
    print(error)

In [72]:
global blur_path, sharp_path, cb_path
blur_path = "{}/blur_images".format(path)
sharp_path = "{}/sharpen_images".format(path)
cb_path = "{}/cb_images".format(path)

# mkdir_(blur_path)
# mkdir_(sharp_path)
# mkdir_(cb_path)

In [73]:
from numpy import array, linspace
from sklearn.cluster import MeanShift, estimate_bandwidth
from joblib import Parallel, delayed
import joblib
from matplotlib import pyplot as plt 
import math

class ImageAugLabeling:
    
    def __init__(self, img, iters, filename):
        self.img = img
        self.iters = iters
        self.filename = filename
        self.path = "sample_images"
        self.blur_path = blur_path
        self.sharp_path = sharp_path
        self.cb_path = cb_path
        self.alpha = 1.0
        self.beta = 0
        self.brisque_scores = {}
    
    def brisque_calc(self, file_type, path):
        self.brisque_scores = {}
        for fn in os.listdir(path):
            file = os.path.join(path, fn)
            if file_type in file:
                # self.img = cv2.imread(file)
                file_num = int(re.findall('\d+', file)[0])
                obj = BRISQUE(file, url=False)
                brisque_score = obj.score()
                if math.isnan(brisque_score):
                    brisque_score = 0.0
                self.brisque_scores[file_num] = brisque_score
        return self.brisque_scores
    
    def blur_effect(self):
        kernel_sizes = [(3, 3)] + [(x+6, x+6) for x in range(0, self.iters*3-3, 3)]
        i = 1
        for (x, y) in kernel_sizes:
            blur = cv2.blur(self.img, (x, y))
            blur_filename = "{}/Blur_{}_{}.jpg".format(self.blur_path, self.filename, i)
            cv2.imwrite(blur_filename, blur)
            i += 1 
        return self.brisque_calc("Blur", self.blur_path)
            
    def sharpen(self):
       
        i = 1
        for multiplier in range(5, self.iters*5+1, 5):
            blur = cv2.GaussianBlur(self.img, (0, 0), multiplier)
            weights = cv2.addWeighted(self.img, 1.5, blur, -0.9, 0)
        
            h, w = self.img.shape[:2]
            sharpened_img = np.zeros([h, w, 3], dtype=self.img.dtype)
            sharpened_img[0:h, 0:w, :] = weights
            sharpened_filename = "{}/Sharp_{}_{}.jpg".format(self.sharp_path, self.filename, i)
            cv2.imwrite(sharpened_filename, sharpened_img)
            i += 1
        return self.brisque_calc("Sharp", self.sharp_path)  
        
    def contrast_brightness_effect(self):
        for i in range(1, self.iters+1//2):
            cb_img = cv2.convertScaleAbs(self.img, alpha=self.alpha, beta=self.beta)
            self.alpha -= 0.1
            cb_filename = "{}/CB_{}_{}.jpg".format(self.cb_path, self.filename, i)
            cv2.imwrite(cb_filename, cb_img)
        for i in range(self.iters+1//2, self.iters+1):
            cb_img = cv2.convertScaleAbs(self.img, alpha=self.alpha, beta=self.beta)
            self.alpha += 0.1
            cb_filename = "{}/CB_{}_{}.jpg".format(self.cb_path, self.filename, i)
            cv2.imwrite(cb_filename, cb_img)
        return self.brisque_calc("CB", self.cb_path)
    
    def labeling(self, array, model_filename, bandwidth=5, see_plot=False):
        scores_labels = {}
        ms = MeanShift(bandwidth=bandwidth, bin_seeding=True)
        ms.fit(array)
        labels = ms.labels_
        cluster_centers = ms.cluster_centers_
        
        
        labels_unique = np.unique(labels)
        n_clusters_ = len(labels_unique)
        if n_clusters_ != 2: 
            print("{}: No. of clusters: {}. Automatic adjusting...".format(model_filename, n_clusters_))
            if n_clusters_ > 2:
                bandwidth += 1 
            elif n_clusters_ < 2:
                bandwidth -= 1
            return self.labeling(array, model_filename, bandwidth) #automatic adjustment
        else:        
            model_filename += ".pkl"
            joblib.dump(ms, model_filename)
            ms_loaded = joblib.load(model_filename)
            if see_plot:
                brisque_scores_test = [[x] for x in np.random.uniform(0.0, 100.0, 50)]
                y_pred = ms_loaded.predict(brisque_scores_test)

                brisque_scores_1d = [num for arr in brisque_scores_test for num in arr]

                plt.scatter(brisque_scores_1d, brisque_scores_1d, c=y_pred, cmap="viridis")
                plt.xlabel("Feature 1")
                plt.ylabel("Feature 2")
            scores_arr = [score for sub_arr in array.tolist() for score in sub_arr]
            for score, label in zip(scores_arr, labels):
                label_string = ""
                if label == 1: 
                    label_string = "True"
                else:
                    label_string = "False"
                scores_labels[score] = label_string
        return scores_labels
            
        
    def results(self):
        blur_dict = dict(sorted(self.blur_effect().items(), key=lambda item: item[1]))
        blur_arr = array(list(blur_dict.values())).reshape(-1, 1)
        blur_label_dict = self.labeling(blur_arr, "blur_label_model")
        
        sharp_dict = dict(sorted(self.sharpen().items(), key=lambda item: item[1]))
        sharp_arr = array(list(sharp_dict.values())).reshape(-1, 1)
        sharp_label_dict = self.labeling(sharp_arr, "sharp_label_model")
        
        cb_dict = dict(sorted(self.contrast_brightness_effect().items(), key=lambda item: item[1]))
        cb_arr = array(list(cb_dict.values())).reshape(-1, 1)
        cb_label_dict = self.labeling(cb_arr, "cb_label_model")
        
        json.dump(blur_label_dict, open("blur_label_dict.json", 'w'))
        json.dump(sharp_label_dict, open("sharp_label_dict.json", 'w'))
        json.dump(cb_label_dict, open("cb_label_dict.json", 'w'))
        
        return (blur_label_dict, sharp_label_dict, cb_label_dict)

In [74]:
# %%time
# test = ImageAugLabeling(open_orig_2, 50, "open_mouth_two") 
# results = test.results()

# blur_label_dict, sharp_label_dict, cb_label_dict = results
'''
This function and piece of code are only used if and only if the process of the labeling crashes 
your VM/driver after running it. Otherwise, you can run the code block as is with code below being commented
out.
'''

def convert_key(in_dict):
    return {float(k):v for k,v in in_dict.items()}

blur_label_dict = convert_key(json.load(open("blur_label_dict.json")))
sharp_label_dict = convert_key(json.load(open("sharp_label_dict.json")))
cb_label_dict = convert_key(json.load(open("cb_label_dict.json")))

results = (blur_label_dict, sharp_label_dict, cb_label_dict)

In [75]:
from collections import Counter
import random

'''
This piece of code is an additional padding via oversampling to deal with class imbalances that 
the initial augmentation was presenting.
'''

def find_first_false(input_dict):
    for k, v in input_dict.items():
        if v == 'False':
            return k


def padding_augmentation(aug_dict, padding=1000):
    aug_thresh = find_first_false(aug_dict)
    aug_count = Counter(aug_dict.values())
    aug_pad_val = aug_count['False'] - aug_count['True']
    random.seed(42)
    aug_pad = [np.random.uniform(0, aug_thresh) for _ in range(0, int(aug_pad_val))]
    aug_pad_dict = {score:'True' for score in aug_pad}
    aug_pad_dict = dict(list(aug_pad_dict.items()) + list(aug_dict.items()))

    aug_pad_true = [np.random.uniform(0, aug_thresh) for _ in range(0, padding)]
    aug_pad_false = [np.random.uniform(aug_thresh, 100) for _ in range(0, padding)]
    aug_pad_true_dict = {score:'True' for score in aug_pad_true}
    aug_pad_false_dict = {score:'False' for score in aug_pad_false}
    
    
    final_aug_dict = dict(list(aug_pad_true_dict.items()) + list(aug_pad_dict.items()) + list(aug_pad_false_dict.items()))
    return final_aug_dict


blur_label_dict_padded = padding_augmentation(blur_label_dict, 8000)
sharp_label_dict_padded = padding_augmentation(sharp_label_dict, 8000)
cb_label_dict_padded = padding_augmentation(cb_label_dict, 8000)

results = (blur_label_dict_padded, sharp_label_dict_padded, cb_label_dict_padded)

In [76]:
from sklearn.base import BaseEstimator 
from sklearn.linear_model import LogisticRegression

class SwitchClassifier(BaseEstimator):
    def __init__(self, classifier=LogisticRegression()):
        self.classifier = classifier
    
    def fit(self, X, y=None, **kwargs):
        self.classifier.fit(X, y)
    
    def predict(self, X, y=None):
        self.classifier.predict(X)
        
    def predict_proba(self, X, y=None):
        return self.classifier.predict_proba(X)
    
    def score(self, X, y=None):
        return self.classifier.score(X, y)

In [77]:
from sklearn.ensemble import VotingClassifier, RandomForestClassifier
from sklearn.model_selection import GridSearchCV, RandomizedSearchCV, train_test_split
from sklearn.naive_bayes import MultinomialNB
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import MinMaxScaler
from sklearn.model_selection import learning_curve
from sklearn.metrics import classification_report

from sklearn.metrics import confusion_matrix
import seaborn as sns 
import ast

class TrainAndTest():
    def __init__(self, orig_img_fn, orig_img, results, test_size=0.2, verbose=None, display_train_test_results=None, plot_cm=None):
        self.orig_img_fn = orig_img_fn
        self.orig_img = orig_img
        self.results = results
        self.test_size = test_size
        self.verbose = verbose or 0 
        self.display_train_test_results = display_train_test_results or False
        self.plot_cm = plot_cm or False
        
     
    def grid_search_proc(self, x_train, y_train):
    
        pipe = Pipeline([('preprocessing', MinMaxScaler()), ('classifier', VotingClassifier(SwitchClassifier(),
                                                                                           voting="soft"))])

        lr = LogisticRegression(random_state=42)
        np.random.seed(42)
        mnb = MultinomialNB()
        rf = RandomForestClassifier(random_state=42)

        param_grid = [{'classifier': [lr], 
                       'preprocessing': [MinMaxScaler(feature_range=(0, 1))],
                       "classifier__C": list(np.logspace(0, 4, 10))},
                      {'classifier': [mnb], 'preprocessing':[MinMaxScaler(feature_range=(0, 1))],
                       'classifier__alpha': [0.001, 0.01, 0.1, 1, 10, 100]},
                      {'classifier': [rf], 
                       'preprocessing': [MinMaxScaler(feature_range=(0, 1))],
                       'classifier__n_estimators': [5, 10, 50, 100, 300],
                       'classifier__min_samples_split': [3, 5, 10, 20],
                       'classifier__max_depth': [3, 5, 10, 20]}]


        clf = GridSearchCV(pipe, param_grid, cv = 5, n_jobs= -1, verbose=self.verbose)
        clf.fit(x_train, y_train)
        return clf    
        
    def aug_test_results(self, aug_dict, aug_name, model_fn):
        aug_x, aug_y = array(list(aug_dict.keys())).reshape(-1, 1), np.array(list(aug_dict.values()))
        aug_x_train, aug_x_test, aug_y_train, aug_y_test = train_test_split(aug_x, aug_y,
                                                        test_size=self.test_size, random_state=42)
        
        grid_search = self.grid_search_proc(aug_x_train, aug_y_train)
        grid_search_best = grid_search.best_estimator_
        grid_search_best_str = str(grid_search_best)
        grid_search_best_name = grid_search_best_str[0:grid_search_best_str.index('(')]

        grid_search_cv_score = grid_search.best_score_*100
        grid_search_test_score = grid_search.score(aug_x_test, aug_y_test)*100
        
        if self.display_train_test_results:
            print("Best parameters:")
            print(grid_search_best)
            print("Cross Validation Score: {:.2f}%".format(grid_search_cv_score))
            print("Test Dataset Score: {:.2f}%".format(grid_search_test_score))

        model_fn += ".joblib"
        joblib.dump(grid_search_best, model_fn)
        aug_clf_model = joblib.load(model_fn)

        if self.plot_cm:
            aug_clf_model.fit(aug_x_train, aug_y_train)
            aug_y_pred = aug_clf_model.predict(aug_x_test)
            labels = ['True', 'False']
            matrix = confusion_matrix(aug_y_test, aug_y_pred, labels = labels)
            sns.heatmap(matrix, annot=True, xticklabels = labels, yticklabels = labels)
            plt.ylabel('Actual Label')
            plt.xlabel('Predicted Label')
            plt.title('Confusion Matrix from Augmentation {} and {} Model'.format(aug_name, grid_search_best_name)) 
            plt.show()
            plt.savefig('{}_{}.png'.format(aug_name, grid_search_best_name))
            
            
            train_sizes, train_scores, test_scores = learning_curve(estimator=grid_search_best, 
                                                                    X=aug_x_train, y=aug_y_train,
                                                       cv=10, train_sizes=np.linspace(0.1, 1.0, 10), n_jobs=1)
            train_mean = np.mean(train_scores, axis=1)
            train_std = np.std(train_scores, axis=1)
            test_mean = np.mean(test_scores, axis=1)
            test_std = np.std(test_scores, axis=1)
            #
            # Plot the learning curve
            #
            plt.plot(train_sizes, train_mean, color='blue', marker='o', markersize=5, label='Training Accuracy')
            plt.fill_between(train_sizes, train_mean + train_std, train_mean - train_std, alpha=0.15, color='blue')
            plt.plot(train_sizes, test_mean, color='green', marker='+', markersize=5, linestyle='--', label='Validation Accuracy')
            plt.fill_between(train_sizes, test_mean + test_std, test_mean - test_std, alpha=0.15, color='green')
            plt.title('{}: Learning Curve'.format(aug_name))
            plt.xlabel('Training Data Size')
            plt.ylabel('Model accuracy')
            plt.grid()
            plt.legend(loc='lower right')
            plt.show()
            plt.savefig('{}_{}_Learning Curve'.format(aug_name, grid_search_best_name))
            
            ## Classification Report
            print("{}: Classification Report".format(aug_name))
            print(classification_report(aug_y_test, aug_y_pred))
        return aug_x_train, aug_y_train
    
    def test_display(self): 
        blur_label_dict, sharp_label_dict, cb_label_dict = self.results 
        blur_x_train, blur_y_train = self.aug_test_results(blur_label_dict, "Blur Effect", "blur_clf")
        sharp_x_train, sharp_y_train = self.aug_test_results(sharp_label_dict, "Sharpness Effect", "sharp_clf")
        cb_x_train, cb_y_train = self.aug_test_results(cb_label_dict, "Contrast-Brightness Effect", "cb_clf")
        return blur_x_train, blur_y_train, sharp_x_train, sharp_y_train, cb_x_train, cb_y_train
    
    def final_testing(self):
        obj = BRISQUE(self.orig_img_fn, url=False)
        brisque_score = obj.score()
        blur_x_train, blur_y_train, sharp_x_train, sharp_y_train, cb_x_train, cb_y_train = self.test_display()
        
        orig_img_fn = self.orig_img_fn[self.orig_img_fn.find("/")+1:]
        blur_clf = joblib.load("blur_clf.joblib")
        sharp_clf = joblib.load("sharp_clf.joblib")
        cb_clf = joblib.load("cb_clf.joblib")

        result = {}
        aug_classifiers_dict = {"Bluriness": blur_clf, "Sharpness": sharp_clf, "Contrast/Brightness": cb_clf} 
        for aug_name, classifier_model in aug_classifiers_dict.items():
            predicted_value = ast.literal_eval(classifier_model.predict([[brisque_score]])[0])
            result["Acceptable {}".format(aug_name)] = predicted_value
        response = "Response for Photo: {}\n {}".format(orig_img_fn, result) 
        print(response)
        #return result

In [80]:
%%time
final_result = TrainAndTest(closed_fn, closed_orig, results, 0.3, 2, True, True)
final_result.final_testing()

ValueError: the input array must have size 3 along `channel_axis`, got (946, 580, 4)

In [213]:
%%time
final_result_2 = TrainAndTest(closed_fn, closed_orig, results, 0.3)
final_result_2.final_testing()

Response for Photo: closed_mouth.png
 {'Acceptable Bluriness': True, 'Acceptable Sharpness': False, 'Acceptable Contrast/Brightness': True}
CPU times: user 3.57 s, sys: 187 ms, total: 3.76 s
Wall time: 22 s


In [81]:
import skimage
print(skimage.__version__)

0.19.3


In [82]:
import sklearn
print(sklearn.__version__)

1.1.1
