# Validation of drift detection methods
## Comparing the proposed method (Mahalanobis-distance based) and the conventional method (prediction-probability based)

# ------------------------------------------------------------------------------------------

## Prepare

In [None]:
#import
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Conv2D
from tensorflow.keras.layers import MaxPool2D
from tensorflow.keras.optimizers import Adam
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers, models
from keras.callbacks import EarlyStopping
from tensorflow.keras.models import save_model, load_model

from tensorflow.keras.layers import Dense, Activation, Dropout, Flatten, Input, Lambda
from tensorflow.keras.utils import plot_model, to_categorical
from keras.callbacks import TensorBoard

import matplotlib.pyplot as plt
import plotly.express as px
import glob
import cv2
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split
import random
import time
from mpl_toolkits.mplot3d import Axes3D

import plotly.graph_objects as go
from plotly.subplots import make_subplots
from scipy.spatial import distance

from tensorflow.keras.applications import MobileNet
from tensorflow.keras.optimizers import SGD
from sklearn.metrics import confusion_matrix, precision_score, recall_score, f1_score
import seaborn as sns

import tensorflow.keras.backend as K
from sklearn.decomposition import PCA
from sklearn.cluster import KMeans

from keras.models import Model
from gradcamutils import GradCam, GradCamPlusPlus, ScoreCam, GuidedBackPropagation, superimpose, read_and_preprocess_img, build_guided_model

from affine_a_method_det import set_deterministic, fit_affine_A_method_deterministic, inverse_transform_ridge, mahalanobis_sq

In [None]:
# functions for making labels and loading images
def create_image_labels(n_light, n_water, n_blackline, n_discoloration, n_dotsonline, n_adhesion, n_scratch):
    """
    Build the image-name / class-label table for the seven abnormal modes.

    Names are generated as "<Mode>-<k>" with k counted from 1 within each
    mode. Rows are ordered by mode: all label-0 rows first, label-6 rows last.

    Args:
        n_light (int): the number of light images (label 0)
        n_water (int): the number of water images (label 1)
        n_blackline (int): the number of black line images (label 2)
        n_discoloration (int): the number of discoloration images (label 3)
        n_dotsonline (int): the number of dots on line images (label 4)
        n_adhesion (int): the number of adhesion images (label 5)
        n_scratch (int): the number of surface scratch images (label 6)

    Returns:
        pd.DataFrame: column "image" holds the generated name, column "label"
        holds the integer class label. A count of 0 contributes no rows.
    """
    # (name prefix, label, count) for each abnormal mode, in label order
    modes = [
        ("Light", 0, n_light),
        ("Water", 1, n_water),
        ("BlackLine", 2, n_blackline),
        ("Discoloration", 3, n_discoloration),
        ("DotsOnLine", 4, n_dotsonline),
        ("Adhesion", 5, n_adhesion),
        ("SurfaceScratch", 6, n_scratch),
    ]

    names = []
    labels = []
    for prefix, label, count in modes:
        # names and labels are extended together so they always stay aligned
        names.extend(f"{prefix}-{i}" for i in range(1, count + 1))
        labels.extend([label] * count)

    # make dataframe by combining names and labels row by row
    df = pd.DataFrame(list(zip(names, labels)), columns=["image", "label"])

    return df

def load_7mode_images(src_dirs):
    """
    Load the images of the seven abnormal modes and build their labels.

    src_dirs: list of seven glob patterns, one per abnormal mode, in label
              order (entry 0 -> label 0, ..., entry 6 -> label 6)
              ex: [
                   r"C:/path/mode0/*jpg",
                   r"C:/path/mode1/*jpg",
                   ...
                  ]

    return:
      all_images      : list of all images (as returned by cv2.imread),
                        class 0 first, class 6 last
      images_by_class : list of each abnormal mode images [list0, list1, ..., list6]
      labels          : pd.DataFrame returned by create_image_labels
                        (columns "image" and "label"), one row per loaded image
      nums            : list of the number of loaded images per class [n0,n1,...,n6]

    Files matched by a pattern that cv2.imread cannot read are skipped, so
    nums counts loaded images, not matched files.
    """

    assert len(src_dirs) == 7, "Designate 7 folder pass"

    images_by_class = []

    # process
    for i, path in enumerate(src_dirs):
        filepaths = glob.glob(path)
        print(f"Class {i}: {len(filepaths)} files")

        # cv2.imread returns None when a file cannot be read; drop those
        loaded = (cv2.imread(fp) for fp in filepaths)
        images_by_class.append([img for img in loaded if img is not None])

    nums = [len(cls_imgs) for cls_imgs in images_by_class]

    # combine, keeping class order so rows line up with the label table
    all_images = [img for cls_imgs in images_by_class for img in cls_imgs]

    # make label: create_image_labels takes the seven counts as separate
    # positional arguments, so the list has to be unpacked
    labels = create_image_labels(*nums)

    return all_images, images_by_class, labels, nums


In [None]:
# File paths for every dataset.
# Each list has seven entries, one per abnormal mode. load_7mode_images passes
# every entry to glob.glob(), so an entry should be a file pattern such as
# r"C:\...\mode0\*jpg" rather than a bare folder.
# NOTE(review): the r"C:\Users\pass" values look like placeholders - replace
# them with the real patterns before running.

# raw wire images
# training data
source_dirs = [
    r"C:\Users\pass",
    r"C:\Users\pass",
    r"C:\Users\pass",
    r"C:\Users\pass",
    r"C:\Users\pass",
    r"C:\Users\pass",
    r"C:\Users\pass",
]
# test data
source_test_dirs = [
    r"C:\Users\pass",
    r"C:\Users\pass",
    r"C:\Users\pass",
    r"C:\Users\pass",
    r"C:\Users\pass",
    r"C:\Users\pass",
    r"C:\Users\pass",
]

# bright images (drift condition 1)
# training data
target1_dirs = [
    r"C:\Users\pass",
    r"C:\Users\pass",
    r"C:\Users\pass",
    r"C:\Users\pass",
    r"C:\Users\pass",
    r"C:\Users\pass",
    r"C:\Users\pass",
]
# test data
test_target1_dirs = [
    r"C:\Users\pass",
    r"C:\Users\pass",
    r"C:\Users\pass",
    r"C:\Users\pass",
    r"C:\Users\pass",
    r"C:\Users\pass",
    r"C:\Users\pass",
]

# camera dust images (drift condition 2)
# training data
target2_dirs = [
    r"C:\Users\pass",
    r"C:\Users\pass",
    r"C:\Users\pass",
    r"C:\Users\pass",
    r"C:\Users\pass",
    r"C:\Users\pass",
    r"C:\Users\pass",
]
# test data
test_target2_dirs = [
    r"C:\Users\pass",
    r"C:\Users\pass",
    r"C:\Users\pass",
    r"C:\Users\pass",
    r"C:\Users\pass",
    r"C:\Users\pass",
    r"C:\Users\pass",
]


In [None]:
# Load images and labels for the six datasets (raw / bright / camera dust,
# each with a training and a test split).
# Every call returns, in order: all images, images grouped by class,
# the label table built by create_image_labels, and the per-class image counts.
source_all_imgs, source_imgs_by_class, source_label, source_num = load_7mode_images(source_dirs)
source_test_all_imgs, source_test_imgs_by_class, source_test_label, source_test_num = load_7mode_images(source_test_dirs)
target1_all_imgs, target1_imgs_by_class, target1_label, target1_num = load_7mode_images(target1_dirs)
test_target1_all_imgs, test_target1_imgs_by_class, test_target1_label, test_target1_num = load_7mode_images(test_target1_dirs)
target2_all_imgs, target2_imgs_by_class, target2_label, target2_num = load_7mode_images(target2_dirs)
test_target2_all_imgs, test_target2_imgs_by_class, test_target2_label, test_target2_num = load_7mode_images(test_target2_dirs)

In [None]:
# Image preprocessing: scale pixel values by 1/255, then resize each image
# with cv2.resize(..., (360, 270)).
sourcefile_list = [
    cv2.resize(img.astype(float) / 255, (360, 270)) for img in source_all_imgs
]
sourcefile_test_list = [
    cv2.resize(img.astype(float) / 255, (360, 270)) for img in source_test_all_imgs
]
targetfile1_list = [
    cv2.resize(img.astype(float) / 255, (360, 270)) for img in target1_all_imgs
]
test_targetfile1_list = [
    cv2.resize(img.astype(float) / 255, (360, 270)) for img in test_target1_all_imgs
]
targetfile2_list = [
    cv2.resize(img.astype(float) / 255, (360, 270)) for img in target2_all_imgs
]
test_targetfile2_list = [
    cv2.resize(img.astype(float) / 255, (360, 270)) for img in test_target2_all_imgs
]

# Integer class labels as numpy arrays. These must be taken before the
# *_label names are rebound to their one-hot versions below.
original_source_label = np.array(source_label["label"])
original_source_test_label = np.array(source_test_label["label"])
original_target1_label = np.array(target1_label["label"])
original_test_target1_label = np.array(test_target1_label["label"])
original_target2_label = np.array(target2_label["label"])
original_test_target2_label = np.array(test_target2_label["label"])


# One-hot encode the labels; each *_label name changes from the label table
# to the array returned by to_categorical.
source_label = to_categorical(source_label["label"])
source_test_label = to_categorical(source_test_label["label"])
target1_label = to_categorical(target1_label["label"])
test_target1_label = to_categorical(test_target1_label["label"])
target2_label = to_categorical(target2_label["label"])
test_target2_label = to_categorical(test_target2_label["label"])

# Keep the per-image Python lists under raw_* names, then rebind each
# *_list name to a single numpy array built from that list.
raw_sourcefile_list = sourcefile_list
sourcefile_list = np.array(raw_sourcefile_list)

raw_sourcefile_test_list = sourcefile_test_list
sourcefile_test_list = np.array(raw_sourcefile_test_list)

raw_targetfile1_list = targetfile1_list
targetfile1_list = np.array(raw_targetfile1_list)

raw_test_targetfile1_list = test_targetfile1_list
test_targetfile1_list = np.array(raw_test_targetfile1_list)

raw_targetfile2_list = targetfile2_list
targetfile2_list = np.array(raw_targetfile2_list)

raw_test_targetfile2_list = test_targetfile2_list
test_targetfile2_list = np.array(raw_test_targetfile2_list)

# ------------------------------------------------------------------------------------------

## Proposed method

In [None]:
# Load the trained base classifier from the file 'BaseModel.h5'.
base_model = load_model('BaseModel.h5')
# Take the output of the fifth layer from the end as the feature tensor.
# NOTE(review): presumably the last four layers form the classification head
# and this layer yields a 4D feature map - confirm with base_model.summary().
feature_output = base_model.layers[-5].output

# Apply GlobalAveragePooling2D to the feature tensor; the pooled tensor is
# what the feature extractor outputs.
pooled_output = layers.GlobalAveragePooling2D()(feature_output)

# Build the feature extractor: same input as the base model, pooled features as output.
feature_extractor = Model(inputs=base_model.input, outputs=pooled_output)
feature_extractor.summary()

In [None]:
#extract features(feature vector) from each image
def extract_features(model, images, batch_size=32):
    """Return ``model.predict`` for ``images``, used as feature vectors.

    Args:
        model: object exposing ``predict(images, batch_size=...)``.
        images: input passed unchanged to ``model.predict``.
        batch_size (int): batch size forwarded to ``model.predict``.
            Defaults to 32.

    Returns:
        Whatever ``model.predict`` returns for the given images.
    """
    return model.predict(images, batch_size=batch_size)

# Feature vectors of the training splits: raw (source), bright (target1)
# and camera dust (target2) images, all from the same feature extractor.
source_features = extract_features(feature_extractor, sourcefile_list)
target1_features = extract_features(feature_extractor, targetfile1_list)
target2_features = extract_features(feature_extractor, targetfile2_list)

In [None]:
# Reduce the feature dimension to n_comp (100) principal components.
# NOTE(review): the original comment states the input width is 1024 -
# confirm against feature_extractor.summary().
n_comp = 100
pca = PCA(n_components=n_comp, svd_solver='full', random_state=42)
# The PCA basis is fitted on the source (raw image) features only.
features_pca = pca.fit_transform(source_features)
# Project the drifted datasets into the PCA space fitted above (no re-fitting),
# so all three feature sets share one coordinate system.
target1_features_pca = pca.transform(target1_features)
target2_features_pca = pca.transform(target2_features)

In [None]:
# ===== calculate center of each cluster =====
# For every source class: the mean vector and the inverse covariance of its
# PCA features. These define the per-class Mahalanobis metric.
cluster_stats = {}
for label in np.unique(original_source_label):
    cluster_data = features_pca[original_source_label == label]
    center = np.mean(cluster_data, axis=0)
    cov_matrix = np.cov(cluster_data, rowvar=False)
    # pseudo-inverse: still defined when the covariance matrix is singular
    cov_inv = np.linalg.pinv(cov_matrix)
    cluster_stats[label] = {
        "center": center,
        "cov_inv": cov_inv
    }


def _score_against_clusters(features, stats_by_label, results, full_results):
    """Compute the drift score (minimum Mahalanobis distance) of each vector.

    For every row of ``features`` the Mahalanobis distance to each cluster in
    ``stats_by_label`` is computed; the smallest one is the drift score.

    Args:
        features: iterable of feature vectors in the PCA space.
        stats_by_label: dict mapping class label -> {"center", "cov_inv"}.
        results: list that receives one summary dict per vector
            (index, nearest cluster, rounded minimum distance).
        full_results: list that receives one dict per vector holding the
            distance to every cluster ("distance_from_<label>").

    Returns:
        list: minimum Mahalanobis distance of each vector, in input order.
    """
    min_dists = []
    for i, vec in enumerate(features):
        distances = {
            label: distance.mahalanobis(vec, stats["center"], stats["cov_inv"])
            for label, stats in stats_by_label.items()
        }

        min_dist_label = min(distances, key=distances.get)
        min_dist = distances[min_dist_label]
        min_dists.append(min_dist)

        results.append({
            "index": i,
            "nearest_cluster": min_dist_label,
            "mahalanobis_distance": round(min_dist, 4)
        })

        # one column per cluster actually present, in cluster_stats order
        full_row = {"index": i}
        for label, dist in distances.items():
            full_row[f"distance_from_{label}"] = dist
        full_results.append(full_row)

    return min_dists


# ===== calculate mahalanobis distance as drift =====
raw_drift_scores = []
target_drift_scores = []
provided_drift_scores = []
results = []
full_results = []

# Lists containing the minimum mahalanobis distance for each data point.
# NOTE: results / full_results accumulate the three datasets one after the
# other (raw, bright, camera dust) and "index" restarts at 0 for each dataset.

# calculate drift for raw images
orig_min_dists = _score_against_clusters(
    features_pca, cluster_stats, results, full_results
)

# calculate drift for bright images
# NOTE(review): scores every row of target1_features_pca; swap in a subsampled
# array here if only a subset (e.g. 300 images) should be evaluated.
gamma_min_dists = _score_against_clusters(
    target1_features_pca, cluster_stats, results, full_results
)

# calculate drift for camera dust images
# NOTE(review): scores every row of target2_features_pca (see note above).
dirt_min_dists = _score_against_clusters(
    target2_features_pca, cluster_stats, results, full_results
)


# save as CSV
pd.DataFrame(results).to_csv("mahalanobis_cluster_distances.csv", index=False)
pd.DataFrame(full_results).to_csv("mahalanobis_full_cluster_distances.csv", index=False)
print("output done: mahalanobis_cluster_distances.csv")

# ------------------------------------------------------------------------------------------

## Conventional method

In [None]:
# Conventional drift indicator: the highest class score the base model
# assigns to each image.

# Run the base model on the raw, bright and camera dust training images.
source_predictions, target1_predictions, target2_predictions = [
    base_model.predict(images)
    for images in (sourcefile_list, targetfile1_list, targetfile2_list)
]

# Per image, keep only the largest value along the class axis.
source_possibility_results = source_predictions.max(axis=1)
target1_possibility_results = target1_predictions.max(axis=1)
target2_possibility_results = target2_predictions.max(axis=1)

# Write one text file per dataset with np.savetxt.
for csv_name, scores in (
    ("source_possibility_results.csv", source_possibility_results),
    ("target1_possibility_results.csv", target1_possibility_results),
    ("target2_possibility_results.csv", target2_possibility_results),
):
    np.savetxt(csv_name, scores)