In [1]:
import matplotlib.pyplot as plt
import numpy as np
import glob
import pandas as pd
import plotly.graph_objects as go
from plotly.subplots import make_subplots
import ipywidgets as widgets
from IPython.display import display, Image as IPImage
from PIL import Image
from ipywidgets import interact, Dropdown, IntText
import cv2
from skimage import feature
from scipy.stats import skew, kurtosis
from sklearn.feature_selection import mutual_info_classif, SelectKBest
from sklearn.decomposition import PCA
from sklearn.ensemble import IsolationForest
from sklearn.preprocessing import StandardScaler
from MirrorExtractor.mirror_extractor import MirrorExtractor


class MirrorFeatureExtractor:
    """Extracts features from mirror images"""
    
    @staticmethod
    def extract_all_features(mirror_img):
        """
        Extract all features from a single mirror image
        
        Args:
            mirror_img: numpy array of shape (H, W, 3)
            
        Returns:
            dict: Dictionary of feature name -> feature value
        """
        features = {}
        
        # Brightness features
        features['brightness_mean'] = np.mean(mirror_img)
        features['brightness_std'] = np.std(mirror_img)
        features['brightness_min'] = np.min(mirror_img)
        features['brightness_max'] = np.max(mirror_img)
        
        # Channel-specific features
        for i, channel in enumerate(['R', 'G', 'B']):
            features[f'{channel}_mean'] = np.mean(mirror_img[:, :, i])
            features[f'{channel}_std'] = np.std(mirror_img[:, :, i])
        
        # Statistical features
        features['skewness'] = skew(mirror_img.flatten())
        features['kurtosis'] = kurtosis(mirror_img.flatten())
        
        # Texture features (if you want to add them later)
        # gray = cv2.cvtColor(mirror_img, cv2.COLOR_RGB2GRAY)
        # features['entropy'] = -np.sum(gray * np.log2(gray + 1e-10))
        
        return features


class MirrorAnomalyDetector:
    """Detects anomalies in mirror images"""
    
    def __init__(self, extractor, feature_extractor=None, contamination=0.1):
        """
        Args:
            extractor: MirrorExtractor instance
            feature_extractor: MirrorFeatureExtractor instance
            contamination: Expected proportion of outliers (for IsolationForest)
        """
        self.extractor = extractor
        self.feature_extractor = feature_extractor or MirrorFeatureExtractor()
        self.contamination = contamination
        
        # Storage for baseline models
        self.baseline_stats = {}  # Statistical baseline (mean, std)
        self.ml_models = {}       # ML models (IsolationForest)
        self.scalers = {}         # Feature scalers
        self.feature_names = None
        
    def extract_mirror_from_image(self, img_path, mirror_id):
        """Extract a single mirror from an image"""
        img = np.array(Image.open(img_path).convert('RGB'))
        x_coords, y_coords = self.extractor.get_coords(mirror_id)
        cropped = self.extractor.extract_polygon_region_cv2(img, x_coords, y_coords)
        return cropped
    
    def extract_features_from_images(self, img_path_list, mirror_id):
        """
        Extract features from multiple images for a specific mirror
        
        Returns:
            pandas DataFrame with features
        """
        features_list = []
        
        for img_path in img_path_list:
            mirror_img = self.extract_mirror_from_image(img_path, mirror_id)
            features = self.feature_extractor.extract_all_features(mirror_img)
            features_list.append(features)
        
        df = pd.DataFrame(features_list)
        return df
    
    def build_baseline(self, img_path_list, mirror_id_list):
        """
        Build baseline models for multiple mirrors
        
        Args:
            img_path_list: List of image paths for training
            mirror_id_list: List of mirror IDs to create baselines for
        """
        print(f"Building baseline from {len(img_path_list)} images...")
        
        for mirror_id in mirror_id_list:
            print(f"Processing mirror {mirror_id}...")
            
            # Extract features
            features_df = self.extract_features_from_images(img_path_list, mirror_id)
            
            if self.feature_names is None:
                self.feature_names = features_df.columns.tolist()
            
            # Statistical baseline
            self.baseline_stats[mirror_id] = {
                'mean': features_df.mean().to_dict(),
                'std': features_df.std().to_dict(),
                'min': features_df.min().to_dict(),
                'max': features_df.max().to_dict(),
                'percentile_95': features_df.quantile(0.95).to_dict(),
                'percentile_5': features_df.quantile(0.05).to_dict(),
            }
            
            # ML-based baseline (Isolation Forest)
            scaler = StandardScaler()
            features_scaled = scaler.fit_transform(features_df)
            
            iso_forest = IsolationForest(
                contamination=self.contamination,
                random_state=42,
                n_estimators=100
            )
            iso_forest.fit(features_scaled)
            
            self.scalers[mirror_id] = scaler
            self.ml_models[mirror_id] = iso_forest
            
        print("Baseline building complete!")
    
    def detect_anomaly_statistical(self, features, mirror_id, n_std=3):
        """
        Detect anomaly using statistical method (mean ± n*std)
        
        Returns:
            dict: Anomaly results
        """
        baseline = self.baseline_stats[mirror_id]
        anomalies = {}
        
        for feature_name, value in features.items():
            mean = baseline['mean'][feature_name]
            std = baseline['std'][feature_name]
            
            # Check if value is outside n standard deviations
            lower_bound = mean - n_std * std
            upper_bound = mean + n_std * std
            
            is_anomaly = value < lower_bound or value > upper_bound
            
            anomalies[feature_name] = {
                'value': value,
                'mean': mean,
                'std': std,
                'is_anomaly': is_anomaly,
                'z_score': (value - mean) / (std + 1e-10),
                'lower_bound': lower_bound,
                'upper_bound': upper_bound
            }
        
        # Overall anomaly decision
        num_anomalies = sum(1 for v in anomalies.values() if v['is_anomaly'])
        is_overall_anomaly = num_anomalies > 0
        
        return {
            'is_anomaly': is_overall_anomaly,
            'num_anomalous_features': num_anomalies,
            'total_features': len(features),
            'feature_details': anomalies
        }
    
    def detect_anomaly_ml(self, features, mirror_id):
        """
        Detect anomaly using ML method (Isolation Forest)
        
        Returns:
            dict: Anomaly results
        """
        # Convert features to DataFrame
        features_df = pd.DataFrame([features])
        
        # Scale features
        features_scaled = self.scalers[mirror_id].transform(features_df)
        
        # Predict
        prediction = self.ml_models[mirror_id].predict(features_scaled)[0]
        anomaly_score = self.ml_models[mirror_id].score_samples(features_scaled)[0]
        
        is_anomaly = prediction == -1
        
        return {
            'is_anomaly': is_anomaly,
            'anomaly_score': anomaly_score,
            'prediction': prediction
        }
    
    def detect_anomaly(self, img_path, mirror_id, method='both', n_std=3):
        """
        Detect anomaly in a new image
        
        Args:
            img_path: Path to the image
            mirror_id: Mirror ID to check
            method: 'statistical', 'ml', or 'both'
            n_std: Number of standard deviations for statistical method
            
        Returns:
            dict: Complete anomaly detection results
        """
        # Extract mirror and features
        mirror_img = self.extract_mirror_from_image(img_path, mirror_id)
        features = self.feature_extractor.extract_all_features(mirror_img)
        
        results = {
            'mirror_id': mirror_id,
            'image_path': img_path,
            'features': features
        }
        
        # Statistical detection
        if method in ['statistical', 'both']:
            results['statistical'] = self.detect_anomaly_statistical(
                features, mirror_id, n_std
            )
        
        # ML detection
        if method in ['ml', 'both']:
            results['ml'] = self.detect_anomaly_ml(features, mirror_id)
        
        # Combined decision
        if method == 'both':
            results['is_anomaly'] = (
                results['statistical']['is_anomaly'] or 
                results['ml']['is_anomaly']
            )
        elif method == 'statistical':
            results['is_anomaly'] = results['statistical']['is_anomaly']
        else:
            results['is_anomaly'] = results['ml']['is_anomaly']
        
        return results
    
    def print_anomaly_report(self, results):
        """Print a formatted anomaly detection report"""
        print(f"\n{'='*60}")
        print(f"ANOMALY DETECTION REPORT")
        print(f"{'='*60}")
        print(f"Mirror ID: {results['mirror_id']}")
        print(f"Image: {results['image_path']}")
        print(f"\nOVERALL RESULT: {'⚠️ ANOMALY DETECTED' if results['is_anomaly'] else '✓ NORMAL'}")
        
        if 'statistical' in results:
            stat = results['statistical']
            print(f"\n--- Statistical Method ---")
            print(f"Anomalous features: {stat['num_anomalous_features']}/{stat['total_features']}")
            
            # Show top anomalies
            anomalous_features = [
                (name, details) 
                for name, details in stat['feature_details'].items() 
                if details['is_anomaly']
            ]
            
            if anomalous_features:
                print("\nTop anomalous features:")
                for name, details in sorted(anomalous_features, 
                                           key=lambda x: abs(x[1]['z_score']), 
                                           reverse=True)[:5]:
                    print(f"  - {name}: {details['value']:.2f} "
                          f"(expected: {details['mean']:.2f} ± {details['std']:.2f}, "
                          f"z-score: {details['z_score']:.2f})")
        
        if 'ml' in results:
            ml = results['ml']
            print(f"\n--- ML Method (Isolation Forest) ---")
            print(f"Prediction: {'ANOMALY' if ml['is_anomaly'] else 'NORMAL'}")
            print(f"Anomaly score: {ml['anomaly_score']:.4f}")
        
        print(f"{'='*60}\n")


# ============================================================================
# USAGE EXAMPLE
# ============================================================================

def main():
    # Initialize
    extractor = MirrorExtractor(
        "/home/pgliwny/Praca/Computer_vision_for_MAGIC/data/crossings_points.pkl"
    )
    detector = MirrorAnomalyDetector(extractor, contamination=0.05)
    
    # Get training images
    img_list = glob.glob(
        "/home/pgliwny/Praca/Computer_vision_for_MAGIC/data/webcam_useful_image/webcam_useful_images/*.jpg"
    )
    print(f"Found {len(img_list)} training images")
    
    # Define mirrors to monitor
    mirror_id_list = [15, 50, 100]
    
    # Build baseline (use subset for faster testing)
    detector.build_baseline(img_list[:100], mirror_id_list)  # Use more images in production
    
    # Test on new image
    new_image_path = '/home/pgliwny/Praca/Computer_vision_for_MAGIC/data/webcam_useful_image/webcam_useful_images/image_2024-05-09_1500.jpg'
    
    print("\n" + "="*60)
    print("TESTING ANOMALY DETECTION ON NEW IMAGE")
    print("="*60)
    
    for mirror_id in mirror_id_list:
        results = detector.detect_anomaly(
            new_image_path, 
            mirror_id, 
            method='both',
            n_std=2  # Adjust sensitivity (lower = more sensitive)
        )
        detector.print_anomaly_report(results)



In [2]:
main()

Found 380 training images
Building baseline from 100 images...
Processing mirror 15...
Processing mirror 50...
Processing mirror 100...
Baseline building complete!

TESTING ANOMALY DETECTION ON NEW IMAGE

ANOMALY DETECTION REPORT
Mirror ID: 15
Image: /home/pgliwny/Praca/Computer_vision_for_MAGIC/data/webcam_useful_image/webcam_useful_images/image_2024-05-09_1500.jpg

OVERALL RESULT: ✓ NORMAL

--- Statistical Method ---
Anomalous features: 0/12

--- ML Method (Isolation Forest) ---
Prediction: NORMAL
Anomaly score: -0.4788


ANOMALY DETECTION REPORT
Mirror ID: 50
Image: /home/pgliwny/Praca/Computer_vision_for_MAGIC/data/webcam_useful_image/webcam_useful_images/image_2024-05-09_1500.jpg

OVERALL RESULT: ✓ NORMAL

--- Statistical Method ---
Anomalous features: 0/12

--- ML Method (Isolation Forest) ---
Prediction: NORMAL
Anomaly score: -0.4645


ANOMALY DETECTION REPORT
Mirror ID: 100
Image: /home/pgliwny/Praca/Computer_vision_for_MAGIC/data/webcam_useful_image/webcam_useful_images/image_