In [None]:
import cv2
import numpy as np
from PIL import Image
import plotly.graph_objects as go
from plotly.subplots import make_subplots
from mtcnn import MTCNN
from sklearn.metrics.pairwise import cosine_similarity
from sklearn.ensemble import RandomForestClassifier
from sklearn.feature_selection import SelectKBest, f_classif, RFE, SelectFromModel
import numpy.linalg as LA
import pandas as pd
from sklearn.preprocessing import StandardScaler
import math
from sklearn.svm import SVC
from xgboost import XGBClassifier
from sklearn.model_selection import cross_val_score
import warnings
from pathlib import Path
import os

import plotly.io as pio
pio.renderers.default = "notebook"

warnings.filterwarnings('ignore')

In [None]:
class EnhancedFacePredictor:
    
    def __init__(self):
        self.feature_selector = None
        self.scaler = StandardScaler()
        self.models = {}
        self.feature_importance = {}
        self.optimal_features = []

        # troy modify
        self.base_fusion_weights = {
            'weighted_score': 0.75,
            'random_forest': 0.05,
            'xgboost': 0.05,
            'svm': 0.05,
            'statistical': 0.10,
        }
        self._norm_warned_features = set()

    ## Need review
    
    # def _create_advanced_prediction_formula(self, f1_features, f2_features, selected_features, weights):
    #     weighted_score_diff = self._calculate_weighted_score(f1_features, f2_features, selected_features, weights)
    #     rf_score = self._random_forest_prediction(f1_features, f2_features, selected_features)
    #     xgb_score = self._xgboost_prediction(f1_features, f2_features, selected_features)
    #     svm_score = self._svm_prediction(f1_features, f2_features, selected_features)
    #     statistical_score = self._statistical_ensemble(f1_features, f2_features, selected_features)

    #     final_scores = {
    #         'weighted_score': weighted_score_diff * 0.75,    # OG: 0.25
    #         'random_forest': rf_score * 0.05,                # OG: 0.25
    #         'xgboost': xgb_score * 0.05,                     # OG: 0.20
    #         'svm': svm_score * 0.05,                         # OG: 0.15
    #         'statistical': statistical_score * 0.10          # OG: 0.15
    #     }
    #     final_prediction = sum(final_scores.values())
    #     return final_prediction, final_scores

    def _compute_dynamic_weights(self, model_scores: dict) -> dict:

        ## Need review
        """
        Compute per-picture fusion weights.
        model_scores: dict with keys
            ['weighted_score', 'random_forest', 'xgboost', 'svm', 'statistical']
            and their raw scores (can be positive/negative).
        Returns:
            dict with same keys, values sum to 1.
        """
    
        # 1) get base weights
        base = self.base_fusion_weights
    
        # 2) compute confidence of each model for THIS picture
        #    here we use |score|; you can swap to something more fancy later.
        confidences = {k: abs(v) for k, v in model_scores.items()}
    
        # 3) combine base weight and confidence
        #    dynamic_weight_raw = base_weight * confidence
        raw = {}
        for name in model_scores.keys():
            bw = base.get(name, 0.0)
            conf = confidences.get(name, 0.0)
    
            # small epsilon to avoid all-zero in pathological cases
            raw[name] = bw * (conf + 1e-6)
    
        # 4) normalize so that sum(weights) = 1
        total = sum(raw.values())
        if total <= 0:
            # fallback: just use base weights
            return base.copy()
    
        dyn = {name: val / total for name, val in raw.items()}
        return dyn

    def _create_advanced_prediction_formula (self, f1_features, f2_features, selected_features, weights):
        weighted_score_diff = self._calculate_weighted_score(f1_features, f2_features, selected_features, weights)
        rf_score  = self._random_forest_prediction(f1_features, f2_features, selected_features)
        xgb_score = self._xgboost_prediction(f1_features, f2_features, selected_features)
        svm_score = self._svm_prediction(f1_features, f2_features, selected_features)
        statistical_score = self._statistical_ensemble(f1_features, f2_features, selected_features)
    
        # raw per-model scores for THIS picture
        model_scores = {
            'weighted_score': weighted_score_diff,
            'random_forest': rf_score,
            'xgboost': xgb_score,
            'svm': svm_score,
            'statistical': statistical_score,
        }
    
        # 2) get dynamic weights for THIS picture
        dynamic_weights = self._compute_dynamic_weights(model_scores)
    
        # 3) fuse scores using dynamic weights
        final_prediction = sum(
            dynamic_weights[name] * score
            for name, score in model_scores.items()
        )
    
        # (optional) return both for analysis/export
        return final_prediction, {
            **model_scores,
            **{f"w_{k}": v for k, v in dynamic_weights.items()},
        }

    def _calculate_weighted_score(self, f1_features, f2_features, selected_features, weights):
        score_diff = 0
        for feature in selected_features:
            if feature in f1_features and feature in f2_features:
                normalized_diff = self._dynamic_normalization(f1_features[feature], f2_features[feature], feature)
                weight = weights.get(feature, 0.0)
                score_diff += weight * normalized_diff
        return score_diff

    def _dynamic_normalization(self, val1, val2, feature_name):
        normalization_params = {
            'focus_measure': {'max_val': 1000, 'boost_factor': 2.0},
            'texture_variation': {'max_val': 500, 'boost_factor': 1.8},
            'edge_density': {'max_val': 0.1, 'boost_factor': 3.0},
            'border_cohesion': {'max_val': 0.5, 'boost_factor': 2.5},
            'structural_entropy': {'max_val': 1.0, 'boost_factor': 2.2},
            'anisotropy': {'max_val': 1.0, 'boost_factor': 1.5},
            'surface_roughness': {'max_val': 1.0, 'boost_factor': 2.0},
            'multiscale_complexity': {'max_val': 0.01, 'boost_factor': 4.0},
            'cluster_coherence': {'max_val': 0.5, 'boost_factor': 2.0},
            'linearity': {'max_val': 0.1, 'boost_factor': 3.0},
            'planarity': {'max_val': 0.5, 'boost_factor': 1.8},
            'scattering': {'max_val': 0.3, 'boost_factor': 2.0},
            'depth_variance': {'max_val': 3000, 'boost_factor': 1.5},
            'high_frequency_content': {'max_val': 300, 'boost_factor': 1.8},
            'blur_consistency': {'max_val': 500, 'boost_factor': 1.2},
            'frequency_std': {'max_val': 40, 'boost_factor': 1.3},
            'color_consistency': {'max_val': 100, 'boost_factor': 1.1},
            'skin_tone_consistency': {'max_val': 1.0, 'boost_factor': 1.2},
            'border_color_density_cohesion': {'max_val': 100, 'boost_factor': 1.1},
            'border_edge_cohesion': {'max_val': 0.1, 'boost_factor': 2.0}
        }
        params = normalization_params.get(feature_name, {'max_val': 1.0, 'boost_factor': 1.0})
        diff = val1 - val2
        normalized_diff = diff / params['max_val']
        if abs(diff) > (params['max_val'] * 0.1):
            normalized_diff *= params['boost_factor']
        return np.clip(normalized_diff, -1, 1)

## Need review
    # def fit_normalization_stats(self, feature_rows, feature_names=None, min_count=3):
    #     """Fit per-feature scaling from training rows.

    #     feature_rows: list of dicts (one dict per face) with numeric values.
    #     feature_names: optional iterable of expected feature names (e.g., weight keys).
    #     min_count: minimum samples needed before using data-driven scale; otherwise defaults to 1.0.
    #     """
    #     if not feature_rows and not feature_names:
    #         self.normalization_stats = {}
    #         return
    #     names = set(feature_names or [])
    #     for row in feature_rows:
    #         names.update(row.keys())
    #     self.normalization_stats = {}
    #     for feat in sorted(names):
    #         vals = [row[feat] for row in feature_rows if feat in row]
    #         if len(vals) < max(1, min_count):
    #             # fallback scale when not enough samples
    #             self.normalization_stats[feat] = {"scale": 1.0}
    #             continue
    #         vals_arr = np.array(vals)
    #         iqr = np.subtract(*np.percentile(vals_arr, [75, 25]))
    #         std = np.std(vals_arr)
    #         # robust scale: prefer IQR, fall back to std, never below a small floor
    #         scale = max(iqr / 1.349, std, 1e-3)
    #         self.normalization_stats[feat] = {"scale": scale}

    # def _dynamic_normalization(self, val1, val2, feature_name):
    #     stats = getattr(self, "normalization_stats", {}).get(feature_name)
    #     if not stats:
    #         warned = getattr(self, "_norm_warned_features", set())
    #         if feature_name not in warned:
    #             print(f"[warn] No normalization stats for {feature_name}; using fallback tanh scaling.")
    #             warned.add(feature_name)
    #             self._norm_warned_features = warned
    #         return float(np.tanh((val1 - val2) / 3.0))
    #     diff = val1 - val2
    #     scaled = diff / (stats["scale"] + 1e-6)
    #     # smooth clip to keep ordering while bounding influence
    #     return float(np.tanh(scaled / 3.0))
            
    def _random_forest_prediction(self, f1_features, f2_features, selected_features):
        try:
            X = np.array([[f1_features[feat] for feat in selected_features],
                            [f2_features[feat] for feat in selected_features]])
            y = np.array([1, 0])
            rf = RandomForestClassifier(n_estimators=100, random_state=42)
            rf.fit(X, y)
            self.feature_importance['random_forest'] = dict(zip(selected_features, rf.feature_importances_))
            proba = rf.predict_proba(X)[0, 0]
            return (proba - 0.5) * 2
        except Exception as e:
            print(f"Random Forest prediction failed: {e}")
            return 0

    def _xgboost_prediction(self, f1_features, f2_features, selected_features):
        try:
            X = np.array([[f1_features[feat] for feat in selected_features],
                            [f2_features[feat] for feat in selected_features]])
            y = np.array([1, 0])
            xgb = XGBClassifier(n_estimators=100, random_state=42)
            xgb.fit(X, y)
            self.feature_importance['xgboost'] = dict(zip(selected_features, xgb.feature_importances_))
            proba = xgb.predict_proba(X)[0, 0]
            return (proba - 0.5) * 2
        except Exception as e:
            print(f"XGBoost prediction failed: {e}")
            return 0

    def _svm_prediction(self, f1_features, f2_features, selected_features):
        try:
            X = np.array([[f1_features[feat] for feat in selected_features],
                            [f2_features[feat] for feat in selected_features]])
            y = np.array([1, 0])
            X_scaled = self.scaler.fit_transform(X)
            svm = SVC(kernel='rbf', probability=True, random_state=42)
            svm.fit(X_scaled, y)
            proba = svm.predict_proba(X_scaled)[0, 0]
            return (proba - 0.5) * 2
        except Exception as e:
            print(f"SVM prediction failed: {e}")
            return 0

    def _statistical_ensemble(self, f1_features, f2_features, selected_features):
        statistical_measures = []
        for feature in selected_features:
            if feature in f1_features and feature in f2_features:
                val1, val2 = f1_features[feature], f2_features[feature]
                z_score = (val1 - val2) / (np.std([val1, val2]) + 1e-8)
                effect_size = (val1 - val2) / (np.std([val1, val2]) + 1e-8)
                percent_diff = (val1 - val2) / (max(abs(val1), abs(val2)) + 1e-8)
                combined_measure = np.mean([z_score, effect_size, percent_diff])
                statistical_measures.append(combined_measure)
        return np.mean(statistical_measures) if statistical_measures else 0

    def select_optimal_features(self, f1_features, f2_features, all_features, target_ratio=0.6):
        feature_scores = {}
        for feature in all_features:
            if feature in f1_features and feature in f2_features:
                score = 0
                val1, val2 = f1_features[feature], f2_features[feature]
                discriminative_power = abs(val1 - val2) / (abs(val1) + abs(val2) + 1e-8)
                score += discriminative_power * 0.4
                magnitude = max(abs(val1), abs(val2))
                if magnitude > 1e-5:
                    score += min(1.0, magnitude) * 0.3
                else:
                    score -= 0.5
                stability = 1.0 - (abs(val1 - val2) / (max(abs(val1), abs(val2)) + 1e-8))
                score += stability * 0.3
                feature_scores[feature] = score
        n_features = max(5, int(len(all_features) * target_ratio))
        optimal_features = sorted(feature_scores.items(), key=lambda x: x[1], reverse=True)[:n_features]
        self.optimal_features = [feat for feat, score in optimal_features]
        print("\n=== OPTIMAL FEATURE SELECTION ===")
        print("Selected Features:")
        for feat, score in optimal_features:
            print(f"  {feat}: {score:.3f} (Face1: {f1_features[feat]:.3f}, Face2: {f2_features[feat]:.3f})")
        return self.optimal_features

    def calculate_advanced_confidence(self, prediction_scores, selected_features, f1_features, f2_features):
        confidence_factors = {}
        predictions = list(prediction_scores.values())
        consistency = 1.0 - (np.std(predictions) / (np.mean(np.abs(predictions)) + 1e-8))
        confidence_factors['method_consistency'] = consistency * 0.3
        agreeing_features = 0
        for feature in selected_features:
            if (prediction_scores['weighted_score'] > 0 and f1_features[feature] > f2_features[feature]) or \
                (prediction_scores['weighted_score'] < 0 and f2_features[feature] > f1_features[feature]):
                agreeing_features += 1
        feature_agreement = agreeing_features / len(selected_features)
        confidence_factors['feature_agreement'] = feature_agreement * 0.25
        avg_prediction_magnitude = np.mean([abs(score) for score in predictions])
        confidence_factors['prediction_strength'] = avg_prediction_magnitude * 0.25
        feature_quality = len([f for f in selected_features if max(f1_features[f], f2_features[f]) > 0.01])
        feature_quality_ratio = feature_quality / len(selected_features)
        confidence_factors['feature_quality'] = feature_quality_ratio * 0.2
        raw_confidence = sum(confidence_factors.values())
        advanced_confidence = 100 / (1 + np.exp(-10 * (raw_confidence - 0.5)))
        return min(100, max(0, advanced_confidence)), confidence_factors

In [None]:
class FixedFaceAnalyzer:
    def __init__(self):
        ## Need review
        self.authentic_patterns = {
            'focus_measure': {'min': 50, 'max': 500, 'preferred': 'higher'},
            'texture_variation': {'min': 10, 'max': 100, 'preferred': 'higher'},
            'edge_density': {'min': 0.005, 'max': 0.05, 'preferred': 'balanced'},
            'depth_variance': {'min': 500, 'max': 3000, 'preferred': 'balanced'},
            'high_frequency_content': {'min': 100, 'max': 300, 'preferred': 'higher'},
            'blur_consistency': {'min': 50, 'max': 500, 'preferred': 'lower'},
            'structural_entropy': {'min': 0.5, 'max': 0.9, 'preferred': 'balanced'},
            'surface_roughness': {'min': 0.1, 'max': 0.8, 'preferred': 'higher'},
            'cluster_coherence': {'min': 0.3, 'max': 0.8, 'preferred': 'balanced'},
            'anisotropy': {'min': 0.3, 'max': 0.7, 'preferred': 'balanced'}
        }

    def identify_problem_features(self, f1_features, f2_features):
        print("\n PROBLEMATIC FEATURE ANALYSIS")
        print("=" * 50)
        problems = []
        zero_features = []
        for feature in f1_features:
            if f1_features[feature] == 0 and f2_features[feature] == 0:
                zero_features.append(feature)
            elif abs(f1_features[feature]) < 1e-10 and abs(f2_features[feature]) < 1e-10:
                zero_features.append(feature)
        if zero_features:
            print("... ZERO-VALUE FEATURES (likely errors):")
            for feat in zero_features[:5]:
                print(f"   {feat}: Both faces have value 0")
            problems.extend(zero_features)
        identical_features = []
        for feature in f1_features:
            if feature in f2_features:
                diff_ratio = abs(f1_features[feature] - f2_features[feature]) / (max(abs(f1_features[feature]), abs(f2_features[feature])) + 1e-8)
                if diff_ratio < 0.01:
                    identical_features.append((feature, diff_ratio))
        if identical_features:
            print("... NEAR-IDENTICAL FEATURES (no discrimination):")
            for feat, ratio in sorted(identical_features, key=lambda x: x[1])[:5]:
                print(f"   {feat}: Only {ratio:.2%} difference")
            problems.extend([feat for feat, _ in identical_features])
        out_of_range = []
        for feature, pattern in self.authentic_patterns.items():
            if feature in f1_features and feature in f2_features:
                val1, val2 = f1_features[feature], f2_features[feature]
                if val1 < pattern['min'] or val1 > pattern['max'] or val2 < pattern['min'] or val2 > pattern['max']:
                    out_of_range.append((feature, val1, val2, pattern))
        if out_of_range:
            print("... OUT-OF-RANGE FEATURES (suspicious values):")
            for feat, v1, v2, pattern in out_of_range[:5]:
                print(f"   {feat}: Face1={v1:.3f}, Face2={v2:.3f} (expected: {pattern['min']}-{pattern['max']})")
            problems.extend([feat for feat, _, _, _ in out_of_range])
        return problems

    def create_corrected_prediction(self, f1_features, f2_features):
        problematic_features = self.identify_problem_features(f1_features, f2_features)
        reliable_features = []
        for feature in f1_features:
            if feature not in problematic_features:
                if (feature in f1_features and feature in f2_features and
                    f1_features[feature] != f2_features[feature] and
                    max(f1_features[feature], f2_features[feature]) > 0.001):
                    reliable_features.append(feature)
        print(f"\n RELIABLE FEATURES SELECTED ({len(reliable_features)} features):")
        for feat in reliable_features:
            diff = f1_features[feat] - f2_features[feat]
            advantage = "Face1" if diff > 0 else "Face2"
            print(f"   {feat}: {abs(diff):.4f} difference ({advantage})")
        face1_score, face2_score = self._calculate_reliable_scores(f1_features, f2_features, reliable_features)
        if face1_score > face2_score:
            verdict = "Face 1 (Left) is more authentic"
            advantage = face1_score - face2_score
            winner = "Face 1"
        else:
            verdict = "Face 2 (Right) is more authentic"
            advantage = face2_score - face1_score
            winner = "Face 2"
        confidence = self._calculate_reliable_confidence(f1_features, f2_features, reliable_features)
        return {
            'face1_score': face1_score,
            'face2_score': face2_score,
            'verdict': verdict,
            'advantage': advantage,
            'winner': winner,
            'confidence': confidence,
            'reliable_features': reliable_features,
            'problematic_features': problematic_features
        }

    def _calculate_reliable_scores(self, f1_features, f2_features, reliable_features):
        weights = {}
        for feature in reliable_features:
            diff_ratio = abs(f1_features[feature] - f2_features[feature]) / (max(abs(f1_features[feature]), abs(f2_features[feature])) + 1e-8)
            magnitude = max(abs(f1_features[feature]), abs(f2_features[feature]))
            magnitude_weight = min(1.0, magnitude) if magnitude > 0 else 0
            weights[feature] = diff_ratio * 0.7 + magnitude_weight * 0.3
        total_weight = sum(weights.values())
        if total_weight > 0:
            weights = {k: v/total_weight for k, v in weights.items()}
        face1_score = 0
        face2_score = 0
        for feature in reliable_features:
            weight = weights[feature]
            val1, val2 = f1_features[feature], f2_features[feature]
            norm1, norm2 = self._normalize_feature(feature, val1, val2)
            face1_score += weight * norm1
            face2_score += weight * norm2
        return face1_score, face2_score

    def _normalize_feature(self, feature, val1, val2):
        if feature in self.authentic_patterns:
            pattern = self.authentic_patterns[feature]
            range_size = pattern['max'] - pattern['min']
            norm1 = (val1 - pattern['min']) / range_size
            norm2 = (val2 - pattern['min']) / range_size
            norm1 = max(0, min(1, norm1))
            norm2 = max(0, min(1, norm2))
            if pattern['preferred'] == 'lower':
                norm1 = 1 - norm1
                norm2 = 1 - norm2
            return norm1, norm2
        else:
            max_val = max(abs(val1), abs(val2), 1e-8)
            return val1 / max_val, val2 / max_val

    def _calculate_reliable_confidence(self, f1_features, f2_features, reliable_features):
        if not reliable_features:
            return 0
        confidence_factors = []
        num_features_confidence = min(1.0, len(reliable_features) / 10)
        confidence_factors.append(num_features_confidence * 0.2)
        discriminative_powers = []
        for feature in reliable_features:
            diff_ratio = abs(f1_features[feature] - f2_features[feature]) / (max(abs(f1_features[feature]), abs(f2_features[feature])) + 1e-8)
            discriminative_powers.append(diff_ratio)
        avg_discriminative = np.mean(discriminative_powers) if discriminative_powers else 0
        confidence_factors.append(avg_discriminative * 0.4)
        agreeing_features = sum(1 for f in reliable_features if f1_features[f] > f2_features[f])
        agreement_ratio = agreeing_features / len(reliable_features)
        confidence_factors.append(agreement_ratio * 0.4)
        raw_confidence = sum(confidence_factors)
        return min(100, max(0, raw_confidence * 100))

In [None]:
class AdvancedFaceAnalyzerTwoFaces:
    def __init__(self, detection_mode='balanced', subsample_rate=3):
        self.detection_mode = detection_mode
        self.subsample_rate = subsample_rate
        self.haar_cascade = None
        self.mtcnn_detector = None
        self._last_detection_method = None
        self.predictor = EnhancedFacePredictor()
        self.fixer = FixedFaceAnalyzer()

    def _init_detectors(self):
        if self.detection_mode != 'accuracy' and self.haar_cascade is None:
            self.haar_cascade = cv2.CascadeClassifier(cv2.data.haarcascades + 'haarcascade_frontalface_default.xml')
        if self.detection_mode != 'speed' and self.mtcnn_detector is None:
            try:
                self.mtcnn_detector = MTCNN()
            except Exception as e:
                print(f"MTCNN initialization failed: {e}")
                self.mtcnn_detector = None

    def _ensure_rgb(self, image):
        if len(image.shape) == 3 and image.shape[2] == 4:
            image = cv2.cvtColor(image, cv2.COLOR_RGBA2RGB)
        elif len(image.shape) == 2:
            image = cv2.cvtColor(image, cv2.COLOR_GRAY2RGB)
        return image

    def detect_faces(self, image_rgb):
        self._init_detectors()
        image_rgb = self._ensure_rgb(image_rgb)
        if self.detection_mode == 'speed':
            self._last_detection_method = 'haar'
            gray = cv2.cvtColor(image_rgb, cv2.COLOR_RGB2GRAY)
            faces = self.haar_cascade.detectMultiScale(gray, 1.1, 4)
            return [{'box': [x, y, w, h]} for (x, y, w, h) in faces]
            
        elif self.detection_mode == 'accuracy':
            if self.mtcnn_detector is None:
                return self.detect_faces(image_rgb)
            self._last_detection_method = 'mtcnn'
            try:
                faces = self.mtcnn_detector.detect_faces(image_rgb)
                return faces
            except Exception as e:
                print(f"MTCNN detection failed: {e}")
                return []
                
        else:
            if self.mtcnn_detector is not None:
                try:
                    faces = self.mtcnn_detector.detect_faces(image_rgb)
                    if len(faces) > 0:
                        self._last_detection_method = 'mtcnn'
                        return faces
                except Exception as e:
                    print(f"MTCNN detection failed: {e}")
            self._last_detection_method = 'haar'
            gray = cv2.cvtColor(image_rgb, cv2.COLOR_RGB2GRAY)
            faces = self.haar_cascade.detectMultiScale(gray, 1.1, 4)
            return [{'box': [x, y, w, h]} for (x, y, w, h) in faces]

    def _frequency_domain_analysis(self, gray_face):
        try:
            f_transform = np.fft.fft2(gray_face)
            f_shift = np.fft.fftshift(f_transform)
            magnitude_spectrum = 20 * np.log(np.abs(f_shift)) + 1
            freq_std = np.std(magnitude_spectrum)
            freq_mean = np.mean(magnitude_spectrum)
            rows, cols = gray_face.shape
            crow, ccol = rows // 2, cols // 2
            high_freq_region = magnitude_spectrum[crow-10:crow+10, ccol-10:ccol+10]
            high_freq_content = np.mean(high_freq_region)
            return {
                'freq_std': freq_std,
                'freq_mean': freq_mean,
                'high_freq_content': high_freq_content
            }
        except:
            return {'freq_std': 0, 'freq_mean': 0, 'high_freq_content': 0}

    def _color_consistency_analysis(self, face_region):
        try:
            lab = cv2.cvtColor(face_region, cv2.COLOR_RGB2LAB)
            l_channel, a_channel, b_channel = cv2.split(lab)
            color_consistency = np.std(a_channel) + np.std(b_channel)
            hsv = cv2.cvtColor(face_region, cv2.COLOR_RGB2HSV)
            skin_mask = cv2.inRange(hsv, (0, 20, 70), (20, 255, 255))
            skin_ratio = np.sum(skin_mask > 0) / (face_region.shape[0] * face_region.shape[1])
            return {
                'color_consistency': color_consistency,
                'skin_tone_consistency': skin_ratio
            }
        except:
            return {'color_consistency': 0, 'skin_tone_consistency': 0}

    def _blur_consistency_analysis(self, gray_face):
        try:
            laplacian_var = cv2.Laplacian(gray_face, cv2.CV_64F).var()
            focus_measure = cv2.Laplacian(gray_face, cv2.CV_64F).var()
            h, w = gray_face.shape
            regions = [
                gray_face[0:h//2, 0:w//2],
                gray_face[0:h//2, w//2:w],
                gray_face[h//2:h, 0:w//2],
                gray_face[h//2:h, w//2:w]
            ]
            region_blurs = [cv2.Laplacian(region, cv2.CV_64F).var() for region in regions]
            blur_consistency = np.std(region_blurs)
            return {
                'laplacian_variance': laplacian_var,
                'focus_measure': focus_measure,
                'blur_consistency': blur_consistency
            }
        except:
            return {'laplacian_variance': 0, 'focus_measure': 0, 'blur_consistency': 0}

    def _process_face_with_cloud(self, image_rgb, face):
        ## Need review
        x, y, w, h = face['box']
        margin = min(w, h) // 3 # troy modify
        x, y = max(0, x - margin), max(0, y - margin)
        w, h = min(w + 2*margin, image_rgb.shape[1] - x), min(h + 2*margin, image_rgb.shape[0] - y)
        face_region = image_rgb[y:y+h, x:x+w]
        
        gray = cv2.cvtColor(face_region, cv2.COLOR_RGB2GRAY)
        laplacian = cv2.Laplacian(gray, cv2.CV_64F).var()
        dynamic_rate = max(1, int(self.subsample_rate * (1 + (100 - laplacian) / 100)))

        xx, yy = np.meshgrid(np.arange(gray.shape[1]), np.arange(gray.shape[0]))
        xx_sub = xx[::dynamic_rate, ::dynamic_rate].flatten()
        yy_sub = yy[::dynamic_rate, ::dynamic_rate].flatten()
        zz_sub = gray[::dynamic_rate, ::dynamic_rate].flatten()
        colors = face_region[::dynamic_rate, ::dynamic_rate].reshape(-1, 3)

        # Store point cloud
        point_cloud = {
            'x': xx_sub.astype(float),
            'y': yy_sub.astype(float),
            'z': zz_sub.astype(float),
            'colors': colors.astype(float) / 255.0
        }

        depth_var = np.var(zz_sub)
        texture_var = cv2.Laplacian(zz_sub.reshape(-1, 1), cv2.CV_64F).var() if zz_sub.size > 0 else 0
        edges = cv2.Canny(gray, 100, 200)
        edge_density = np.sum(edges > 0) / (gray.shape[0] * gray.shape[1]) if gray.size > 0 else 0

        hairline_region_height = h // 4
        hairline_area = gray[0:hairline_region_height, :]
        _, dark_mask = cv2.threshold(hairline_area, 70, 255, cv2.THRESH_BINARY_INV)
        num_labels, labels, stats, centroids = cv2.connectedComponentsWithStats(dark_mask, connectivity=8)
        border_cohesion = 0
        if num_labels > 1:
            largest_component_area = sorted(stats[1:, cv2.CC_STAT_AREA], reverse=True)[0]
            total_dark_area = np.sum(dark_mask > 0)
            if total_dark_area > 0:
                cohesion_ratio = largest_component_area / total_dark_area
                border_cohesion = cohesion_ratio / (num_labels - 1)

        hairline_area_color = face_region[0:hairline_region_height, :]
        hairline_area_hsv = cv2.cvtColor(hairline_area_color, cv2.COLOR_RGB2HSV)
        h_channel, s_channel, v_channel = cv2.split(hairline_area_hsv)
        border_color_density_cohesion = np.std(h_channel) + np.std(s_channel) + np.std(v_channel)

        hairline_edges = edges[0:hairline_region_height, :]
        border_edge_cohesion = np.sum(hairline_edges > 0) / (hairline_area.shape[0] * hairline_area.shape[1]) if hairline_area.size > 0 else 0

        freq_features = self._frequency_domain_analysis(gray)
        color_features = self._color_consistency_analysis(face_region)
        blur_features = self._blur_consistency_analysis(gray)

        features = {
            'depth_variance': depth_var,
            'texture_variation': texture_var,
            'edge_density': edge_density,
            'border_cohesion': border_cohesion,
            'border_color_density_cohesion': border_color_density_cohesion,
            'border_edge_cohesion': border_edge_cohesion,
            'point_count': len(zz_sub),
            'frequency_std': freq_features['freq_std'],
            'high_frequency_content': freq_features['high_freq_content'],
            'color_consistency': color_features['color_consistency'],
            'skin_tone_consistency': color_features['skin_tone_consistency'],
            'blur_consistency': blur_features['blur_consistency'],
            'focus_measure': blur_features['focus_measure'],
            'laplacian_variance': blur_features['laplacian_variance']
        }

        try:
            hist = cv2.calcHist([gray], [0], None, [256], [0, 256])
            hist = hist.flatten() / (gray.shape[0] * gray.shape[1])
            hist = hist[hist > 0]
            structural_entropy = -np.sum(hist * np.log2(hist))
            features['structural_entropy'] = structural_entropy
        except:
            features['structural_entropy'] = 0

        try:
            sobelx = cv2.Sobel(gray, cv2.CV_64F, 1, 0, ksize=3)
            sobely = cv2.Sobel(gray, cv2.CV_64F, 0, 1, ksize=3)
            gradient_magnitude = np.sqrt(sobelx**2 + sobely**2)
            surface_roughness = np.mean(gradient_magnitude) / 255.0
            features['surface_roughness'] = surface_roughness
        except:
            features['surface_roughness'] = 0

        try:
            resized_small = cv2.resize(gray, (gray.shape[1]//2, gray.shape[0]//2))
            laplacian_small = cv2.Laplacian(resized_small, cv2.CV_64F).var()
            multiscale_complexity = abs(laplacian - laplacian_small) / (laplacian + 1e-8)
            features['multiscale_complexity'] = multiscale_complexity
        except:
            features['multiscale_complexity'] = 0

        try:
            pixel_values = gray.flatten().reshape(-1, 1)
            from sklearn.cluster import KMeans
            kmeans = KMeans(n_clusters=3, random_state=42, n_init=10)
            labels = kmeans.fit_predict(pixel_values[:1000])
            cluster_coherence = 1.0 - (np.std(kmeans.cluster_centers_.flatten()) / 255.0)
            features['cluster_coherence'] = cluster_coherence
        except:
            features['cluster_coherence'] = 0

        try:
            if len(zz_sub) > 10:
                covariance_matrix = np.cov(np.vstack([xx_sub, yy_sub, zz_sub]))
                eigenvalues = np.linalg.eigvals(covariance_matrix)
                eigenvalues = np.sort(eigenvalues)[::-1]
                if eigenvalues[0] > 0:
                    linearity = (eigenvalues[0] - eigenvalues[1]) / eigenvalues[0]
                    planarity = (eigenvalues[1] - eigenvalues[2]) / eigenvalues[0]
                    scattering = eigenvalues[2] / eigenvalues[0]
                    features['linearity'] = linearity
                    features['planarity'] = planarity
                    features['scattering'] = scattering
                else:
                    features.update({'linearity': 0, 'planarity': 0, 'scattering': 0})
            else:
                features.update({'linearity': 0, 'planarity': 0, 'scattering': 0})
        except:
            features.update({'linearity': 0, 'planarity': 0, 'scattering': 0})

        try:
            gabor_responses = []
            for theta in [0, np.pi/4, np.pi/2, 3*np.pi/4]:
                kernel = cv2.getGaborKernel((21, 21), 8.0, theta, 10.0, 0.5, 0, ktype=cv2.CV_32F)
                filtered = cv2.filter2D(gray, cv2.CV_8UC3, kernel)
                gabor_responses.append(np.mean(filtered))
            anisotropy = np.std(gabor_responses) / (np.mean(gabor_responses) + 1e-8)
            features['anisotropy'] = anisotropy
        except:
            features['anisotropy'] = 0

        return features, point_cloud

    def analyze_single_image_two_faces(self, image_path):
        try:
            img = cv2.imread(image_path)
            if img is None:
                raise ValueError(f"Could not load image: {image_path}")
            img_rgb = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)

            faces = self.detect_faces(img_rgb)
            if len(faces) < 2:
                print(f"!!! Only {len(faces)} face(s) detected. Using top 2 bounding boxes if possible.")
                faces = sorted(faces, key=lambda f: f['box'][0])
                if len(faces) == 1:
                    faces = [faces[0], faces[0]]
                elif len(faces) == 0:
                    raise ValueError("No faces detected!")

            faces = sorted(faces[:2], key=lambda f: f['box'][0])
            face1, face2 = faces[0], faces[1]

            f1_features, pc1 = self._process_face_with_cloud(img_rgb, face1)
            f2_features, pc2 = self._process_face_with_cloud(img_rgb, face2)

            self.face_point_clouds = {'face1': pc1, 'face2': pc2}

            all_features = list(set(f1_features.keys()) & set(f2_features.keys()))
            selected_features = self.predictor.select_optimal_features(f1_features, f2_features, all_features)

            weights = {}
            for feature in selected_features:
                diff = abs(f1_features[feature] - f2_features[feature])
                max_val = max(abs(f1_features[feature]), abs(f2_features[feature]), 1e-8)
                weights[feature] = diff / max_val

            final_prediction, prediction_scores = self.predictor._create_advanced_prediction_formula(
                f1_features, f2_features, selected_features, weights
            )

            advanced_confidence, confidence_factors = self.predictor.calculate_advanced_confidence(
                prediction_scores, selected_features, f1_features, f2_features
            )

            corrected_result = self.fixer.create_corrected_prediction(f1_features, f2_features)

            if final_prediction > 0:
                verdict = "Face 1 (Left) is more authentic"
                winner = "Face 1"
                advantage = final_prediction
            else:
                verdict = "Face 2 (Right) is more authentic"
                winner = "Face 2"
                advantage = -final_prediction

            result = {
                'verdict': verdict,
                'winner': winner,
                'advantage_score': advantage,
                'confidence': advanced_confidence,
                'confidence_factors': confidence_factors,
                'prediction_scores': prediction_scores,
                'selected_features': selected_features,
                'face1_features': f1_features,
                'face2_features': f2_features,
                'corrected_result': corrected_result,
                'point_clouds': self.face_point_clouds
            }
            return result

        except Exception as e:
            print(f"Analysis failed: {e}")
            return None

    def plot_3d_point_clouds(self, result):
        if not result or 'point_clouds' not in result:
            print("No point cloud data to plot.")
            return

        pc1 = result['point_clouds']['face1']
        pc2 = result['point_clouds']['face2']

        fig = make_subplots(
            rows=1, cols=2,
            specs=[[{'type': 'scatter3d'}, {'type': 'scatter3d'}]],
            subplot_titles=("Face 1 (Left)", "Face 2 (Right)")
        )

        fig.add_trace(
            go.Scatter3d(
                x=pc1['x'], y=pc1['y'], z=pc1['z'],
                mode='markers',
                marker=dict(size=2, color=pc1['colors'], opacity=0.8)
            ),
            row=1, col=1
        )

        fig.add_trace(
            go.Scatter3d(
                x=pc2['x'], y=pc2['y'], z=pc2['z'],
                mode='markers',
                marker=dict(size=2, color=pc2['colors'], opacity=0.8)
            ),
            row=1, col=2
        )

        fig.update_layout(
            title="3D Point Clouds of Detected Faces",
            scene=dict(aspectmode='data'),
            scene2=dict(aspectmode='data'),
            height=600
        )
        fig.show()

    def print_detailed_results(self, result):
        if result is None:
            print("Analysis failed - no results to display")
            return
        print("\n" + "="*60)
        print("ADVANCED FACE AUTHENTICITY ANALYSIS RESULTS")
        print("="*60)
        print(f"\n FINAL VERDICT: {result['verdict']}")
        print(f" Winner: {result['winner']}")
        print(f" Advantage Score: {result['advantage_score']:.4f}")
        print(f" Confidence: {result['confidence']:.1f}%")
        print(f"\nüîç CONFIDENCE BREAKDOWN:")
        for factor, value in result['confidence_factors'].items():
            print(f"   {factor.replace('_', ' ').title()}: {value:.3f}")
        print(f"\n PREDICTION METHOD SCORES:")
        for method, score in result['prediction_scores'].items():
            print(f"   {method.replace('_', ' ').title()}: {score:.4f}")
        print(f"\n CORRECTED ANALYSIS:")
        corrected = result['corrected_result']
        print(f"   Corrected Verdict: {corrected['verdict']}")
        print(f"   Corrected Confidence: {corrected['confidence']:.1f}%")
        print(f"   Reliable Features Used: {len(corrected['reliable_features'])}")
        print(f"\nüìà FEATURE COMPARISON (Top 10):")
        selected_features = result['selected_features'][:10]
        for feature in selected_features:
            f1_val = result['face1_features'][feature]
            f2_val = result['face2_features'][feature]
            diff = f1_val - f2_val
            advantage = "Face1" if diff > 0 else "Face2"
            print(f"   {feature}: F1={f1_val:.4f}, F2={f2_val:.4f}, Diff={diff:.4f} ({advantage})")

In [None]:
def run_analysis_on_single_image(image_path, detection_mode='speed'):
    analyzer = AdvancedFaceAnalyzerTwoFaces(detection_mode=detection_mode)
    result = analyzer.analyze_single_image_two_faces(image_path)
    if result:
        analyzer.print_detailed_results(result)
        analyzer.plot_3d_point_clouds(result)
    else:
        print("... Analysis failed.")
    return result

In [None]:
def export_result_to_csv(result: dict, image_path: str, out_dir: str = "exports"):
   
    # ‡∏™‡∏£‡πâ‡∏≤‡∏á CSV 3 ‡πÑ‡∏ü‡∏•‡πå/‡∏£‡∏π‡∏õ:
    #   1) <basename>_feature_comparison.csv  : ‡πÄ‡∏ó‡∏µ‡∏¢‡∏ö‡∏Ñ‡πà‡∏≤‡∏ü‡∏µ‡πÄ‡∏à‡∏≠‡∏£‡πå F1/F2 + diff
    #   2) <basename>_reliable_features.csv   : ‡∏£‡∏≤‡∏¢‡∏Å‡∏≤‡∏£ reliable features ‡∏ó‡∏µ‡πà‡πÉ‡∏ä‡πâ‡∏à‡∏£‡∏¥‡∏á
    #   3) <basename>_summary.csv             : ‡∏™‡∏£‡∏∏‡∏õ verdict / winner / scores / confidence
    
    Path(out_dir).mkdir(parents=True, exist_ok=True)
    base = os.path.splitext(os.path.basename(image_path))[0]

    # --- 1) Feature comparison ---
    # ‡∏û‡∏¢‡∏≤‡∏¢‡∏≤‡∏°‡∏î‡∏∂‡∏á‡∏•‡∏¥‡∏™‡∏ï‡πå features ‡∏ï‡∏≤‡∏°‡∏•‡∏≥‡∏î‡∏±‡∏ö‡∏ó‡∏µ‡πà‡πÄ‡∏•‡∏∑‡∏≠‡∏Å‡πÑ‡∏ß‡πâ; ‡∏ñ‡πâ‡∏≤‡πÑ‡∏°‡πà‡πÄ‡∏à‡∏≠ ‡πÉ‡∏ä‡πâ union ‡∏Ç‡∏≠‡∏á keys
    selected = result.get("selected_features")
    f1 = result.get("face1_features", {}) or {}
    f2 = result.get("face2_features", {}) or {}

    feature_list = list(selected) if selected else sorted(set(f1.keys()) | set(f2.keys()))
    rows = []
    for k in feature_list:
        v1 = f1.get(k, None)
        v2 = f2.get(k, None)
        diff = (v1 - v2) if (v1 is not None and v2 is not None) else None
        rows.append({"feature": k, "face1": v1, "face2": v2, "diff": diff})

    df_feat = pd.DataFrame(rows)
    df_feat.to_csv(os.path.join(out_dir, f"{base}_feature_comparison.csv"), index=False)

    # --- 2) Reliable features (‡∏ñ‡πâ‡∏≤‡∏°‡∏µ‡∏Å‡∏≤‡∏£‡πÅ‡∏Å‡πâ‡πÑ‡∏Ç/‡∏Å‡∏£‡∏≠‡∏á) ---
    reliable = []
    corrected = result.get("corrected_result") or {}
    reliable_features = corrected.get("reliable_features") or result.get("reliable_features")
    if reliable_features:
        # ‡∏™‡∏£‡πâ‡∏≤‡∏á‡∏ï‡∏≤‡∏£‡∏≤‡∏á‡∏£‡∏ß‡∏°‡∏Ñ‡πà‡∏≤ face1/face2/diff ‡πÄ‡∏â‡∏û‡∏≤‡∏∞ reliable
        for k in reliable_features:
            v1 = f1.get(k, None)
            v2 = f2.get(k, None)
            diff = (v1 - v2) if (v1 is not None and v2 is not None) else None
            reliable.append({"feature": k, "face1": v1, "face2": v2, "diff": diff})
        pd.DataFrame(reliable).to_csv(os.path.join(out_dir, f"{base}_reliable_features.csv"), index=False)

    # --- 3) Summary (‡∏´‡∏ô‡∏∂‡πà‡∏á‡πÅ‡∏ñ‡∏ß) ---
    pred = result.get("prediction_scores", {})
    conf = result.get("confidence_factors", {})
    summary = {
        "image": base,
        "verdict": result.get("verdict"),
        "winner": result.get("winner"),
        "advantage_score": result.get("advantage_score"),
        "confidence": result.get("confidence"),
        "corrected_verdict": corrected.get("verdict"),
        "corrected_winner": corrected.get("winner"),
        "corrected_advantage": corrected.get("advantage"),
        "corrected_confidence": corrected.get("confidence"),
        # method scores
        "weighted_score": pred.get("weighted_score"),
        "random_forest": pred.get("random_forest"),
        "xgboost": pred.get("xgboost"),
        "svm": pred.get("svm"),
        "statistical": pred.get("statistical"),
        # confidence factors
        "method_consistency": conf.get("method_consistency"),
        "feature_agreement": conf.get("feature_agreement"),
        "prediction_strength": conf.get("prediction_strength"),
        "feature_quality": conf.get("feature_quality"),
    }
    pd.DataFrame([summary]).to_csv(os.path.join(out_dir, f"{base}_summary.csv"), index=False)

# RESULT

In [None]:
# single image processing
image_path = "/Users/teemtat/Documents/MUICT/RESEARCH/old/Dataset/DFPhoto/df01.jpg"
result = run_analysis_on_single_image(image_path, detection_mode='speed')

In [None]:
## Run all image

# folder_path = "/Users/teemtat/Documents/MUICT/RESEARCH/Dataset/DFPhoto"
# export_folder = "exports/10th (LargeDT margin3 Dynamic Weight adjust)"   # ‡πÇ‡∏ü‡∏•‡πÄ‡∏î‡∏≠‡∏£‡πå export CSVs

# for filename in sorted(os.listdir(folder_path)):
#     if filename.lower().endswith(('.jpg', '.png')):
#         image_path = os.path.join(folder_path, filename)
#         print(f"üöÄ Analyzing {filename} ...")
#         try:
#             result = run_analysis_on_single_image(image_path, detection_mode='speed')
#             export_result_to_csv(result, image_path, out_dir=export_folder)
#             print(f"‚úÖ Exported results for {filename}")
#         except Exception as e:
#             print(f"‚ùå Error processing {filename}: {e}")

# print("üéâ Done! All images analyzed and exported.")

In [None]:
analyzer = AdvancedFaceAnalyzerTwoFaces(detection_mode="speed")
result = analyzer.analyze_single_image_two_faces(image_path)

analyzer.plot_3d_point_clouds(result)

In [None]:
pcs = getattr(analyzer, "face_point_clouds", None) or result.get("face_point_clouds", None)
if pcs is None:
    raise RuntimeError("‡πÑ‡∏°‡πà‡∏û‡∏ö point cloud ‚Äì ‡πÉ‡∏´‡πâ‡∏£‡∏±‡∏ô analyze_single_image_two_faces(...) ‡∏Å‡πà‡∏≠‡∏ô")

if isinstance(pcs, dict):
    try:
        pcs = [pcs[k] for k in sorted(pcs.keys())]
    except Exception:
        pcs = list(pcs.values())
elif not isinstance(pcs, (list, tuple)):
    pcs = [pcs]

use_ellipse_mask = True
step = 2

fig = make_subplots(rows=1, cols=len(pcs), specs=[[{'type':'scene'}]*len(pcs)],
                    subplot_titles=[f"Face {i+1}" for i in range(len(pcs))])

for i, pc in enumerate(pcs, start=1):
    x = np.asarray(pc['x'], dtype=float)
    y = np.asarray(pc['y'], dtype=float)
    z = np.asarray(pc['z'], dtype=float)
    col = (np.asarray(pc['colors'])*255).astype(np.uint8)

    if use_ellipse_mask:
        W = x.max() + 1.0; H = y.max() + 1.0
        cx, cy = W/2.0, H/2.0
        ax, ay = 0.45*W, 0.60*H
        m = (((x - cx)/ax)**2 + ((y - cy)/ay)**2) <= 1.0
    else:
        m = np.ones_like(x, bool)

    idx = np.where(m)[0][::step]
    colors = [f"rgb({r},{g},{b})" for r, g, b in col[idx]]

    fig.add_trace(go.Scatter3d(x=x[idx], y=y[idx], z=z[idx],
                               mode='markers',
                               marker=dict(size=1, color=colors),
                               name=f"Face {i}"),
                  row=1, col=i)

    fig.update_scenes(dict(xaxis_title='x', yaxis_title='y', zaxis_title='z'), row=1, col=i)

fig.update_layout(height=500, width=980, showlegend=False, margin=dict(l=0,r=0,t=30,b=0))
fig.show()

In [None]:
# export_result_to_csv(result, image_path, out_dir="exports")