In [25]:
import pandas as pd
from tensorflow.keras.models import load_model
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score
from evidently.report import Report
from evidently.metric_preset import ClassificationPreset
from evidently.ui.workspace.cloud import CloudWorkspace
import os
import cv2
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from scipy.stats import entropy
import mlflow
from scipy.stats import wasserstein_distance
from river.drift import PageHinkley


In [None]:

# All MLflow runs started later in this notebook are grouped under this experiment.
experiment_name = "Concept drift experiments inverting labels"
mlflow.set_experiment(experiment_name)

In [None]:
# Board squares whose occupancy labels get flipped to simulate concept drift.
target_squares = set(("b1", "e5", "g8"))

In [None]:
# Binary occupancy label per category folder: every piece folder (colour prefix
# "b"/"w" + piece letter) maps to 1 (occupied); "empty" maps to 0.
original_label_mapping = {
    f"{colour}{piece}": 1 for colour in "bw" for piece in "bknpqr"
}
original_label_mapping["empty"] = 0

In [3]:
# Load the occupancy classifier (the file name indicates a MobileNetV2 Keras model).
model_file = "/Users/ximenamoure/Desktop/drift_last/models/mobilenet_v2_occupancy_tf216.keras"
model = load_model(model_file)

In [4]:
# Root of the reference image set; it is read as one sub-folder per category.
ref_path = (
    "/Users/ximenamoure/Desktop/drift_last"
    "/reference_dataset"
)

In [5]:
# Processed occupancy split0 of the chess-piece dataset.
# NOTE(review): not referenced by the cells shown in this notebook section.
new_path = (
    "/Users/ximenamoure/Chess-Piece-Classification-Dataset"
    "/images/processed/occupancy/split0"
)

In [6]:
#split_name = os.path.basename(new_path)

In [7]:
# Category folder names: 12 piece folders (colour b/w + piece b,k,n,p,q,r), then "empty".
categories = [colour + piece for colour in "bw" for piece in "bknpqr"] + ["empty"]

In [8]:
# Binary occupancy target: every non-"empty" category -> 1 (occupied), "empty" -> 0.
label_mapping = dict.fromkeys((c for c in categories if c != "empty"), 1)
label_mapping["empty"] = 0

In [9]:
# Function to load images and their labels
def load_images_and_labels(base_path, categories, label_map=None, image_size=(128, 128)):
    """Load all images under ``base_path/<category>/`` together with their labels.

    Args:
        base_path: Root directory holding one sub-directory per category.
        categories: Names of the category sub-directories to read.
        label_map: Mapping category -> integer label. Defaults to the
            notebook-level ``label_mapping`` dict, looked up at call time.
        image_size: ``dsize`` passed to ``cv2.resize`` (width, height).

    Returns:
        ``(images, labels, image_paths)`` as NumPy arrays aligned by index.

    Notes:
        Files are visited in ``os.listdir`` order, which is not guaranteed to be
        sorted. Callers that pair this output with another loader's output
        should compare the returned paths first.
        Hidden files (e.g. macOS ``.DS_Store``) are skipped. Files that
        ``cv2.imread`` cannot decode (it returns ``None``) are reported and
        skipped instead of crashing inside ``cv2.resize``.
    """
    if label_map is None:
        label_map = label_mapping

    images = []
    labels = []
    image_paths = []

    for category in categories:
        category_path = os.path.join(base_path, category)
        for img_name in os.listdir(category_path):
            if img_name.startswith("."):
                continue
            img_path = os.path.join(category_path, img_name)
            img = cv2.imread(img_path)
            if img is None:
                print(f"Skipping unreadable image: {img_path}")
                continue
            images.append(cv2.resize(img, image_size))
            labels.append(label_map[category])
            image_paths.append(img_path)

    return np.array(images), np.array(labels), np.array(image_paths)

In [None]:
def load_images_and_labels_with_inversion(base_path, categories):
    images = []
    labels = []
    image_paths = []

    for category in categories:
        category_path = os.path.join(base_path, category)
        for img_name in os.listdir(category_path):
            img_path = os.path.join(category_path, img_name)

            # Load and process the image
            img = cv2.imread(img_path)
            img = cv2.resize(img, (128, 128))
            images.append(img)

            # Determine original label
            original_label = original_label_mapping[category]

            # Extract the square position from the filename
            square_position = img_name.split('_')[-1].split('.')[0]  # Gets "g8" from filename

            # Check if the square position matches the target for inversion
            if square_position in target_squares:
                # Invert the label: occupied (1) becomes empty (0) and vice versa
                inverted_label = 1 - original_label
                labels.append(inverted_label)
            else:
                labels.append(original_label)

            image_paths.append(img_path)

    return np.array(images), np.array(labels), np.array(image_paths)

In [10]:
def get_model_predictions(model, images, batch_size=32, threshold=0.5):
    """Run ``model`` over ``images`` in batches and return hard 0/1 predictions.

    Args:
        model: Object with a Keras-style ``predict(batch, verbose=...)`` method.
        images: Array-like of images, sliceable along axis 0.
        batch_size: Number of images passed to each ``model.predict`` call.
        threshold: Scores ``>= threshold`` become 1, all others 0.

    Returns:
        1-D integer ``np.ndarray`` with exactly one prediction per image.

    Raises:
        ValueError: if the model does not emit exactly one score per image.
            For example, a 2-column softmax would otherwise be flattened into
            twice as many predictions as labels.
    """
    num_images = len(images)
    predictions = []

    for start in range(0, num_images, batch_size):
        end = min(start + batch_size, num_images)
        # verbose=0 silences Keras' per-call progress bar; progress is printed below.
        batch_scores = np.asarray(model.predict(images[start:end], verbose=0))
        batch_labels = (batch_scores >= threshold).astype(int).ravel()
        if batch_labels.size != end - start:
            raise ValueError(
                f"Model returned {batch_labels.size} scores for a batch of {end - start} images; "
                "expected a single score per image."
            )
        predictions.extend(batch_labels)
        print(f"Processed {end}/{num_images} images ({(end / num_images) * 100:.2f}% complete)")

    return np.array(predictions, dtype=int)

In [11]:
def get_model_scores(model, images, batch_size=32):
    """Collect the raw (un-thresholded) model outputs for ``images`` as a flat array.

    Args:
        model: Object with a ``predict(batch)`` method.
        images: Array-like of images, sliceable along axis 0.
        batch_size: Number of images passed to each ``model.predict`` call.

    Returns:
        ``np.ndarray`` holding every batch output flattened and concatenated in order.
    """
    total = len(images)
    collected = []
    for offset in range(0, total, batch_size):
        stop = min(offset + batch_size, total)
        raw_output = model.predict(images[offset:stop])
        collected += list(raw_output.flatten())

        print(f"Processed {stop}/{total} images ({(stop / total) * 100:.2f}% complete)")

    return np.array(collected)

In [12]:
def get_metrics(ground_truth, predictions):
    """Score binary ``predictions`` against ``ground_truth`` with scikit-learn defaults.

    Returns:
        tuple: ``(accuracy, precision, recall, f1)``, in that order.
    """
    scorers = (accuracy_score, precision_score, recall_score, f1_score)
    return tuple(score(ground_truth, predictions) for score in scorers)

In [13]:
# Function to calculate batch-wise error rates
def batch_error_rates(model, images, labels, batch_size=32, threshold=0.5):
    """Return the misclassification rate of ``model`` for each consecutive batch.

    Args:
        model: Object with a Keras-style ``predict(batch, verbose=...)`` method.
        images: Array-like of images, sliceable along axis 0.
        labels: 0/1 ground-truth labels aligned with ``images``.
        batch_size: Number of images per batch (the last batch may be smaller).
        threshold: Scores ``>= threshold`` count as a positive prediction. This is
            the same convention as ``get_model_predictions``.

    Returns:
        list[float]: One error rate in ``[0, 1]`` per batch, in input order.

    Raises:
        ValueError: if a batch yields a different number of predictions than labels.
            Without this check, NumPy broadcasting could silently compute a
            meaningless rate.
    """
    batch_errors = []
    for start in range(0, len(images), batch_size):
        end = min(start + batch_size, len(images))
        batch_labels = np.asarray(labels[start:end]).ravel()

        # verbose=0 silences Keras' per-call progress bar.
        batch_scores = np.asarray(model.predict(images[start:end], verbose=0))
        batch_preds = (batch_scores >= threshold).astype(int).ravel()
        if batch_preds.shape != batch_labels.shape:
            raise ValueError(
                f"Batch {start}:{end} produced {batch_preds.size} predictions "
                f"for {batch_labels.size} labels."
            )

        batch_errors.append(float(np.mean(batch_preds != batch_labels)))

    return batch_errors


In [14]:
def cusum_test(error_rates, threshold=0.1, reference_mean=None):
    """
    One-sided (upper) CUSUM test for an upward shift in a sequence of error rates.

    The statistic is S_t = max(0, S_{t-1} + (e_t - mu)) with S_0 = 0, and drift
    is flagged as soon as S_t > threshold. Because S_t is clamped at zero, it
    only accumulates evidence of the error rate rising above mu.

    Args:
        error_rates (list or array): Sequence of error rates.
        threshold (float): Alarm level for the cumulative statistic.
        reference_mean (float, optional): Baseline error rate mu. Defaults to the
            mean of ``error_rates`` itself. Pass a mean computed on pre-drift data
            so post-drift batches do not pull the baseline up.

    Returns:
        bool: True if drift is detected, False otherwise (including empty input).
    """
    errors = np.asarray(error_rates, dtype=float)
    drift_detected = False

    if errors.size > 0:
        # Baseline computed once (previously recomputed on every iteration).
        mu = float(np.mean(errors)) if reference_mean is None else float(reference_mean)
        cumulative_sum = 0.0
        for error in errors:
            # Clamp BEFORE testing. Testing abs(S) before the reset let a single batch
            # with an unusually LOW error rate trigger a drift alarm.
            cumulative_sum = max(0.0, cumulative_sum + (error - mu))
            if cumulative_sum > threshold:
                drift_detected = True
                print("Concept Drift Detected by CUSUM Test")
                break  # Exit loop on drift detection

    if not drift_detected:
        print("No Concept Drift Detected by CUSUM Test")
    return drift_detected

In [15]:
def page_hinkley_test(data, threshold=0.1, delta=0.005, reference_mean=None):
    """
    Page-Hinkley test for detecting an upward change in a sequence.

    With m_t = sum_{k<=t} (x_k - mu - delta) and M_t = min(0, m_1, ..., m_t),
    drift is flagged at the first t where m_t - M_t > threshold.

    Args:
        data (list or array): Sequence of error rates or other metrics.
        threshold (float): Threshold for detecting drift.
        delta (float): Small constant to reduce the sensitivity to small changes.
        reference_mean (float, optional): Baseline mu. Defaults to the mean of
            ``data`` itself. Pass a mean from pre-drift data so post-drift points
            do not inflate the baseline.

    Returns:
        bool: True if drift is detected, False otherwise (including empty input).
    """
    values = np.asarray(data, dtype=float)
    drift_detected = False

    # Empty input: skip the mean entirely (np.mean([]) warns and returns nan).
    if values.size > 0:
        mean_est = float(np.mean(values)) if reference_mean is None else float(reference_mean)
        cumulative_sum = 0.0
        min_sum = 0.0
        for i, point in enumerate(values):
            cumulative_sum += point - mean_est - delta
            min_sum = min(min_sum, cumulative_sum)
            if cumulative_sum - min_sum > threshold:
                drift_detected = True
                print(f"Concept Drift Detected by Page-Hinkley Test at point {i}")
                break  # Exit loop on drift detection

    if not drift_detected:
        print("No Concept Drift Detected by Page-Hinkley Test")
    return drift_detected

In [16]:
# Reference set: every image with its true occupancy label.
images_ref, labels_ref, img_paths_ref = load_images_and_labels(
    base_path=ref_path,
    categories=categories,
)

In [None]:
# Same reference folders, with labels flipped on the target squares. `images` and
# `image_paths` are returned index-aligned with `labels_inverted`.
images, labels_inverted, image_paths = load_images_and_labels_with_inversion(
    base_path=ref_path,
    categories=categories,
)


In [None]:
# Sanity check: show each file's parsed square and the label it ended up with.
for img_path, label in zip(image_paths, labels_inverted):
    # Parse the file name only, mirroring the loader. Splitting the full path on "_"
    # picks up parent directories ("drift_last", "reference_dataset") whenever the
    # file name itself has no underscore.
    square_position = os.path.basename(img_path).split('_')[-1].split('.')[0]
    print(f"Image: {img_path}, Square: {square_position}, Label: {label}")


In [17]:
# Reload the model from the same file as the earlier load cell, so the experiment
# below starts from a freshly loaded copy.
model_file = "/Users/ximenamoure/Desktop/drift_last/models/mobilenet_v2_occupancy_tf216.keras"
model = load_model(model_file)

In [21]:
# Wasserstein-distance alarm levels: current threshold and the earlier (previous) one.
ws_threshold, ws_threshold_prev = 0.12, 0.08

In [None]:
# Prefix for every split-specific MLflow metric and artifact name in this run.
split_name = (
    "ref_split_label_inversion"
)

In [None]:
# Relative drops (as a fraction of the reference value) that count as significant: 10%.
accuracy_drop_threshold, f1_drop_threshold = 0.1, 0.1

In [None]:
# --- Start MLflow Run ---
# Experiment: score the reference images against their true labels, then against the
# same images with labels inverted on `target_squares` (simulated concept drift).
# Metrics, drift statistics and an Evidently report are logged to MLflow.
# Depends on earlier cells for: model, images_ref / labels_ref / img_paths_ref,
# images / labels_inverted / image_paths, split_name and the *_threshold values.

# Both loaders walk the folders in os.listdir order. Pairing labels_inverted with the
# reference results is only valid if both visited the same files in the same order.
if not np.array_equal(img_paths_ref, image_paths):
    raise ValueError("Reference load and label-inverted load are not aligned file-by-file")

with mlflow.start_run() as run:
    mlflow.set_tag("mlflow.runName", "label_inversion")
    # sorted(): the iteration order of a set of str can change between interpreter
    # sessions (hash randomisation), so an unsorted join logs a different value per run.
    mlflow.log_param("target_squares", ",".join(sorted(target_squares)))
    mlflow.log_params({
        "ws_threshold": ws_threshold,
        "accuracy_drop_threshold": accuracy_drop_threshold,
        "f1_drop_threshold": f1_drop_threshold,
    })

    print("Processing and logging reference split...")
    # images_ref / labels_ref were already loaded by the reference-load cell; re-reading
    # every image from disk here was redundant.
    predictions_ref = get_model_predictions(model, images_ref)
    batch_errors_ref = batch_error_rates(model, images_ref, labels_ref)
    accuracy_ref, precision_ref, recall_ref, f1_ref = get_metrics(labels_ref, predictions_ref)

    # Log reference metrics
    mlflow.log_metrics({
        "accuracy_ref": accuracy_ref,
        "precision_ref": precision_ref,
        "recall_ref": recall_ref,
        "f1_ref": f1_ref
    })

    # "New" split: the same images with labels flipped on the target squares.
    # `images` is the array returned together with `labels_inverted`, so they align.
    images_new = images
    labels_new = labels_inverted
    predictions_new = get_model_predictions(model, images_new)
    batch_errors_new = batch_error_rates(model, images_new, labels_new)
    accuracy_new, precision_new, recall_new, f1_new = get_metrics(labels_new, predictions_new)
    # Log new split metrics to MLflow
    mlflow.log_metrics({
        f"{split_name}_accuracy": accuracy_new,
        f"{split_name}_precision": precision_new,
        f"{split_name}_recall": recall_new,
        f"{split_name}_f1": f1_new
    })

    # --- Long-Term Drift Detection with Reference Split ---
    # Compare the distributions of per-batch error rates between the two label sets.
    wd_ref = wasserstein_distance(batch_errors_ref, batch_errors_new)
    drift_detected_ref = wd_ref > ws_threshold

    print("drift_detected", drift_detected_ref)

    mlflow.log_metric(f"{split_name}_wasserstein_distance_ref", wd_ref)
    mlflow.log_metric(f"{split_name}_drift_detected_ref", int(drift_detected_ref))

    # --- Generate and Log Evidently Report ---
    reference_data = pd.DataFrame({'prediction': predictions_ref, 'target': labels_ref, 'dataset': 'reference'})
    new_data = pd.DataFrame({'prediction': predictions_new, 'target': labels_new, 'dataset': 'new'})
    classification_report = Report(metrics=[ClassificationPreset()])
    classification_report.run(reference_data=reference_data, current_data=new_data)

    # Save and log the Evidently report
    report_file = f"{split_name}_classification_report.html"
    classification_report.save_html(report_file)
    mlflow.log_artifact(report_file)

    # NOTE(review): assumes the first metric of ClassificationPreset is the
    # classification-quality metric whose result has "current"/"reference" dicts with
    # "accuracy"/"f1"/"recall" keys. Confirm this for the installed Evidently version.
    results = classification_report.as_dict()
    current_metrics = results["metrics"][0]["result"]["current"]
    reference_metrics = results["metrics"][0]["result"]["reference"]
    accuracy_current = current_metrics["accuracy"]
    f1_current = current_metrics["f1"]
    recall_current = current_metrics["recall"]
    accuracy_reference = reference_metrics["accuracy"]
    f1_reference = reference_metrics["f1"]

    # Significant = the metric fell by more than <threshold> times its reference value.
    accuracy_significant_drop = (accuracy_reference - accuracy_current) > (accuracy_reference * accuracy_drop_threshold)
    f1_significant_drop = (f1_reference - f1_current) > (f1_reference * f1_drop_threshold)
    print("Significant Accuracy Drop:", accuracy_significant_drop)
    print("Significant F1 Score Drop:", f1_significant_drop)

    mlflow.log_metric("accuracy_significant_drop", int(accuracy_significant_drop))
    mlflow.log_metric("f1_significant_drop", int(f1_significant_drop))
    # Distinct keys: f"{split_name}_accuracy" / "_f1" / "_recall" already hold the
    # sklearn values above. Re-logging the same keys appended a second data point
    # to each metric's history.
    mlflow.log_metrics({
        f"{split_name}_evidently_accuracy": accuracy_current,
        f"{split_name}_evidently_f1": f1_current,
        f"{split_name}_evidently_recall": recall_current,
    })

    print("Run completed and all data logged to MLflow!")
