# Anomaly Detection and Resource Usage Classification for Server Monitoring

## 1. Setup and Data Preparation

 Import necessary libraries

In [1]:
import pandas as pd
import numpy as np
import polars as pl
from typing import Dict, List, Tuple, Optional, Any
import warnings
from dataclasses import dataclass
from sklearn.preprocessing import StandardScaler, RobustScaler
from sklearn.feature_selection import VarianceThreshold
import logging
from pathlib import Path
import joblib

In [2]:
@dataclass
class DataQualityReport:
    """Data quality assessment results"""
    constant_features: List[str]
    high_missing_features: List[str]
    highly_correlated_pairs: List[Tuple[str, str, float]]
    outlier_percentage: Dict[str, float]
    temporal_gaps: List[str]
    summary_stats: Dict[str, Any]

class AdaptiveDataPreprocessor:
    """
    Layer 1: Foundation Data Preprocessing for Multi-Modal System Metrics

    Handles vmstat, iostat, netstat, and process metrics with domain-aware
    feature engineering and quality assurance for anomaly detection.
    """

    def __init__(self,
                 sequence_length: int = 50,
                 sampling_rate: float = 0.2,
                 correlation_threshold: float = 0.95,
                 missing_threshold: float = 0.1,
                 variance_threshold: float = 0.01):

        self.sequence_length = sequence_length
        self.sampling_rate = sampling_rate
        self.correlation_threshold = correlation_threshold
        self.missing_threshold = missing_threshold
        self.variance_threshold = variance_threshold

        # Feature definitions from your data
        self.feature_groups = {
            'vmstat': ['r', 'b', 'avm', 'fre', 'pi', 'po', 'fr', 'interface_in', 'cs', 'us', 'sy', 'idle'],
            'iostat': ['tps', 'kb_read', 'kb_wrtn', 'service_time', 'avg_queue_size', 'avg_wait_time'],
            'netstat': ['ipkts', 'opkts', 'ierrs', 'oerrs', 'ipkts_rate', 'opkts_rate', 'ierrs_rate', 'oerrs_rate'],
            'process': ['cpu', 'mem', 'command']
        }

        # Domain knowledge for feature importance
        self.critical_features = {
            'vmstat': ['us', 'sy', 'idle', 'r', 'b'],  # CPU and load
            'iostat': ['tps', 'service_time', 'avg_queue_size'],  # I/O performance
            'netstat': ['ipkts_rate', 'opkts_rate'],  # Network throughput
            'process': ['cpu', 'mem']  # Process resources
        }

        self.scalers = {}
        self.selected_features = {}
        self.quality_report = None
        self.logger = self._setup_logger()

    def _setup_logger(self) -> logging.Logger:
        """Setup logging for preprocessing pipeline"""
        logger = logging.getLogger('DataPreprocessor')
        logger.setLevel(logging.INFO)
        if not logger.handlers:
            handler = logging.StreamHandler()
            formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
            handler.setFormatter(formatter)
            logger.addHandler(handler)
        return logger

    def load_and_sample_data(self, file_paths: Dict[str, str]) -> Dict[str, pd.DataFrame]:
        """
        Load and sample data from multiple CSV files efficiently

        Args:
            file_paths: Dictionary mapping data type to file path

        Returns:
            Dictionary of sampled DataFrames
        """
        data = {}

        for data_type, file_path in file_paths.items():
            self.logger.info(f"📂 Processing {data_type} from {file_path}")

            # Use Polars for faster loading, then convert to pandas
            df_pl = pl.read_csv(file_path)
            total_rows = df_pl.height

            # Sample data
            if self.sampling_rate < 1.0:
                sample_size = int(total_rows * self.sampling_rate)
                df_pl = df_pl.sample(n=sample_size, seed=42)
                self.logger.info(f"✅ Loaded {sample_size:,} rows from {total_rows:,} total ({self.sampling_rate*100:.1f}%)")

            # Convert to pandas for compatibility
            df = df_pl.to_pandas()

            # Basic timestamp validation
            if 'timestamp' in df.columns:
                df['timestamp'] = pd.to_datetime(df['timestamp'], errors='coerce')
                invalid_timestamps = df['timestamp'].isna().sum()
                if invalid_timestamps > 0:
                    self.logger.warning(f"⚠️ Found {invalid_timestamps} invalid timestamps")
                    df = df.dropna(subset=['timestamp'])

            data[data_type] = df

        return data

    def assess_data_quality(self, data: Dict[str, pd.DataFrame]) -> DataQualityReport:
        """
        Comprehensive data quality assessment

        Args:
            data: Dictionary of DataFrames by data type

        Returns:
            DataQualityReport with quality metrics
        """
        self.logger.info("🔍 Assessing data quality...")

        constant_features = []
        high_missing_features = []
        highly_correlated_pairs = []
        outlier_percentage = {}
        temporal_gaps = []
        summary_stats = {}

        for data_type, df in data.items():
            # Get numeric columns only
            numeric_cols = df.select_dtypes(include=[np.number]).columns.tolist()
            if 'timestamp' in numeric_cols:
                numeric_cols.remove('timestamp')

            # Check for constant values (variance near zero)
            for col in numeric_cols:
                if df[col].var() < self.variance_threshold:
                    constant_features.append(f"{data_type}.{col}")
                    self.logger.warning(f"⚠️ {data_type}.{col}: constant values (var={df[col].var():.6f})")

            # Check for high missing values
            missing_pct = df[numeric_cols].isnull().mean()
            for col, pct in missing_pct.items():
                if pct > self.missing_threshold:
                    high_missing_features.append(f"{data_type}.{col}")
                    self.logger.warning(f"🔧 {data_type}.{col}: {pct*100:.1f}% missing")

            # Check for highly correlated features
            if len(numeric_cols) > 1:
                corr_matrix = df[numeric_cols].corr().abs()
                for i in range(len(corr_matrix.columns)):
                    for j in range(i+1, len(corr_matrix.columns)):
                        if corr_matrix.iloc[i, j] > self.correlation_threshold:
                            highly_correlated_pairs.append((
                                f"{data_type}.{corr_matrix.columns[i]}",
                                f"{data_type}.{corr_matrix.columns[j]}",
                                corr_matrix.iloc[i, j]
                            ))

            # Outlier detection using IQR
            for col in numeric_cols:
                Q1 = df[col].quantile(0.25)
                Q3 = df[col].quantile(0.75)
                IQR = Q3 - Q1
                outliers = ((df[col] < (Q1 - 1.5 * IQR)) | (df[col] > (Q3 + 1.5 * IQR))).sum()
                outlier_percentage[f"{data_type}.{col}"] = (outliers / len(df)) * 100

            # Temporal gap detection
            if 'timestamp' in df.columns:
                df_sorted = df.sort_values('timestamp')
                time_diffs = df_sorted['timestamp'].diff()
                median_interval = time_diffs.median()
                large_gaps = (time_diffs > median_interval * 3).sum()
                if large_gaps > 0:
                    temporal_gaps.append(f"{data_type}: {large_gaps} gaps > 3x median interval")

            # Summary statistics
            summary_stats[data_type] = {
                'rows': len(df),
                'features': len(numeric_cols),
                'missing_data_pct': df[numeric_cols].isnull().mean().mean() * 100,
                'memory_usage_mb': df.memory_usage(deep=True).sum() / 1024**2
            }

        self.quality_report = DataQualityReport(
            constant_features=constant_features,
            high_missing_features=high_missing_features,
            highly_correlated_pairs=highly_correlated_pairs,
            outlier_percentage=outlier_percentage,
            temporal_gaps=temporal_gaps,
            summary_stats=summary_stats
        )

        return self.quality_report

    def engineer_features(self, data: Dict[str, pd.DataFrame]) -> Dict[str, pd.DataFrame]:
        """
        Domain-aware feature engineering for system metrics

        Args:
            data: Dictionary of raw DataFrames

        Returns:
            Dictionary of feature-engineered DataFrames
        """
        self.logger.info("⚙️ Engineering domain-specific features...")

        engineered_data = {}

        for data_type, df in data.items():
            df_eng = df.copy()

            # Add temporal features
            if 'timestamp' in df.columns:
                df_eng['hour'] = df['timestamp'].dt.hour
                df_eng['day_of_week'] = df['timestamp'].dt.dayofweek
                df_eng['is_weekend'] = df['timestamp'].dt.dayofweek >= 5

            # Data type specific engineering
            if data_type == 'vmstat':
                # CPU utilization ratios
                if all(col in df.columns for col in ['us', 'sy', 'idle']):
                    df_eng['cpu_busy'] = df_eng['us'] + df_eng['sy']
                    df_eng['cpu_ratio'] = df_eng['cpu_busy'] / (df_eng['cpu_busy'] + df_eng['idle'] + 1e-6)

                # Memory pressure indicators
                if all(col in df.columns for col in ['avm', 'fre']):
                    df_eng['memory_pressure'] = df_eng['avm'] / (df_eng['avm'] + df_eng['fre'] + 1e-6)

                # Load balancing
                if 'r' in df.columns and 'b' in df.columns:
                    df_eng['total_load'] = df_eng['r'] + df_eng['b']

            elif data_type == 'iostat':
                # I/O efficiency metrics
                if all(col in df.columns for col in ['kb_read', 'kb_wrtn', 'tps']):
                    df_eng['io_throughput'] = (df_eng['kb_read'] + df_eng['kb_wrtn']) / (df_eng['tps'] + 1e-6)

                # Queue utilization
                if 'avg_queue_size' in df.columns and 'service_time' in df.columns:
                    df_eng['queue_efficiency'] = df_eng['avg_queue_size'] / (df_eng['service_time'] + 1e-6)

            elif data_type == 'netstat':
                # Network balance
                if 'ipkts_rate' in df.columns and 'opkts_rate' in df.columns:
                    df_eng['net_balance'] = (df_eng['ipkts_rate'] - df_eng['opkts_rate']) / (df_eng['ipkts_rate'] + df_eng['opkts_rate'] + 1e-6)
                    df_eng['net_total_rate'] = df_eng['ipkts_rate'] + df_eng['opkts_rate']

                # Error rates
                if all(col in df.columns for col in ['ierrs_rate', 'oerrs_rate', 'ipkts_rate', 'opkts_rate']):
                    total_pkts = df_eng['ipkts_rate'] + df_eng['opkts_rate']
                    total_errs = df_eng['ierrs_rate'] + df_eng['oerrs_rate']
                    df_eng['error_rate'] = total_errs / (total_pkts + 1e-6)

            elif data_type == 'process':
                # Resource utilization ratios
                if 'cpu' in df.columns and 'mem' in df.columns:
                    df_eng['resource_ratio'] = df_eng['cpu'] / (df_eng['mem'] + 1e-6)

            # Rolling statistics for temporal patterns
            numeric_cols = df_eng.select_dtypes(include=[np.number]).columns.tolist()
            for col in numeric_cols[:5]:  # Limit to avoid memory explosion
                if col not in ['hour', 'day_of_week', 'is_weekend']:
                    df_eng[f'{col}_rolling_mean'] = df_eng[col].rolling(window=5, min_periods=1).mean()
                    df_eng[f'{col}_rolling_std'] = df_eng[col].rolling(window=5, min_periods=1).std()

            engineered_data[data_type] = df_eng

        return engineered_data

    def select_features(self, data: Dict[str, pd.DataFrame]) -> Dict[str, pd.DataFrame]:
        """
        Intelligent feature selection based on domain knowledge and statistics

        Args:
            data: Dictionary of feature-engineered DataFrames

        Returns:
            Dictionary of DataFrames with selected features
        """
        self.logger.info("🎯 Selecting optimal features...")

        selected_data = {}

        for data_type, df in data.items():
            numeric_cols = df.select_dtypes(include=[np.number]).columns.tolist()

            # Start with critical features
            selected_features = []
            for critical_feature in self.critical_features.get(data_type, []):
                if critical_feature in numeric_cols:
                    selected_features.append(critical_feature)

            # Remove constant features identified in quality assessment
            if self.quality_report:
                constant_features = [f.split('.')[-1] for f in self.quality_report.constant_features
                                   if f.startswith(data_type)]
                numeric_cols = [col for col in numeric_cols if col not in constant_features]

            # Remove highly correlated features (keep first one)
            if self.quality_report:
                correlated_features = set()
                for feat1, feat2, corr in self.quality_report.highly_correlated_pairs:
                    if feat1.startswith(data_type) and feat2.startswith(data_type):
                        feat1_name = feat1.split('.')[-1]
                        feat2_name = feat2.split('.')[-1]
                        if feat1_name in selected_features:
                            correlated_features.add(feat2_name)
                        else:
                            correlated_features.add(feat1_name)
                numeric_cols = [col for col in numeric_cols if col not in correlated_features]

            # Add engineered features
            engineered_features = [col for col in numeric_cols
                                 if any(suffix in col for suffix in ['_ratio', '_pressure', '_efficiency', '_balance', '_rate', '_rolling_mean'])]
            selected_features.extend(engineered_features[:5])  # Limit engineered features

            # Add remaining high-variance features
            remaining_features = [col for col in numeric_cols if col not in selected_features]
            if remaining_features:
                # Calculate variance for remaining features
                variances = df[remaining_features].var().sort_values(ascending=False)
                high_variance_features = variances.head(10 - len(selected_features)).index.tolist()
                selected_features.extend(high_variance_features)

            # Ensure we have timestamp and essential columns
            essential_cols = ['timestamp'] if 'timestamp' in df.columns else []
            if data_type == 'process' and 'command' in df.columns:
                essential_cols.append('command')

            final_features = essential_cols + selected_features
            selected_data[data_type] = df[final_features]

            self.selected_features[data_type] = final_features
            self.logger.info(f"✅ {data_type}: Selected {len(final_features)} features from {len(df.columns)} original")

        return selected_data

    def normalize_and_scale(self, data: Dict[str, pd.DataFrame]) -> Dict[str, pd.DataFrame]:
        """
        Robust scaling for anomaly detection

        Args:
            data: Dictionary of DataFrames with selected features

        Returns:
            Dictionary of scaled DataFrames
        """
        self.logger.info("📏 Normalizing and scaling features...")

        scaled_data = {}

        for data_type, df in data.items():
            df_scaled = df.copy()
            numeric_cols = df.select_dtypes(include=[np.number]).columns.tolist()

            if numeric_cols:
                # Use RobustScaler for anomaly detection (less sensitive to outliers)
                scaler = RobustScaler()
                df_scaled[numeric_cols] = scaler.fit_transform(df[numeric_cols])
                self.scalers[data_type] = scaler

            scaled_data[data_type] = df_scaled

        return scaled_data

    def create_sequences(self, data: Dict[str, pd.DataFrame]) -> Dict[str, np.ndarray]:
        """
        Create sequences for time series analysis

        Args:
            data: Dictionary of scaled DataFrames

        Returns:
            Dictionary of sequence arrays
        """
        self.logger.info(f"🔗 Creating sequences of length {self.sequence_length}...")

        sequences = {}

        for data_type, df in data.items():
            if 'timestamp' in df.columns:
                df_sorted = df.sort_values('timestamp')
                numeric_cols = df_sorted.select_dtypes(include=[np.number]).columns.tolist()

                if len(numeric_cols) > 0:
                    values = df_sorted[numeric_cols].values

                    # Create overlapping sequences
                    sequence_list = []
                    for i in range(len(values) - self.sequence_length + 1):
                        sequence_list.append(values[i:i + self.sequence_length])

                    if sequence_list:
                        sequences[data_type] = np.array(sequence_list)
                        self.logger.info(f"✅ {data_type}: Created {len(sequence_list)} sequences of shape {sequences[data_type].shape}")

        return sequences

    def fit_transform(self, file_paths: Dict[str, str]) -> Tuple[Dict[str, np.ndarray], DataQualityReport]:
        """
        Complete preprocessing pipeline

        Args:
            file_paths: Dictionary mapping data type to file path

        Returns:
            Tuple of (processed sequences, quality report)
        """
        self.logger.info("🚀 Starting Layer 1 preprocessing pipeline...")

        # Step 1: Load and sample data
        data = self.load_and_sample_data(file_paths)

        # Step 2: Assess data quality
        quality_report = self.assess_data_quality(data)

        # Step 3: Feature engineering
        engineered_data = self.engineer_features(data)

        # Step 4: Feature selection
        selected_data = self.select_features(engineered_data)

        # Step 5: Normalization and scaling
        scaled_data = self.normalize_and_scale(selected_data)

        # Step 6: Create sequences
        sequences = self.create_sequences(scaled_data)

        self.logger.info("✅ Layer 1 preprocessing pipeline completed!")

        return sequences, quality_report

    def save_preprocessor(self, filepath: str):
        """Save the fitted preprocessor"""
        joblib.dump({
            'scalers': self.scalers,
            'selected_features': self.selected_features,
            'config': {
                'sequence_length': self.sequence_length,
                'sampling_rate': self.sampling_rate,
                'correlation_threshold': self.correlation_threshold,
                'missing_threshold': self.missing_threshold,
                'variance_threshold': self.variance_threshold
            }
        }, filepath)
        self.logger.info(f"💾 Preprocessor saved to {filepath}")

    def load_preprocessor(self, filepath: str):
        """Load a fitted preprocessor"""
        loaded = joblib.load(filepath)
        self.scalers = loaded['scalers']
        self.selected_features = loaded['selected_features']
        config = loaded['config']
        self.sequence_length = config['sequence_length']
        self.sampling_rate = config['sampling_rate']
        self.correlation_threshold = config['correlation_threshold']
        self.missing_threshold = config['missing_threshold']
        self.variance_threshold = config['variance_threshold']
        self.logger.info(f"📂 Preprocessor loaded from {filepath}")



In [3]:
# Example usage:
if __name__ == "__main__":
    # Initialize preprocessor
    preprocessor = AdaptiveDataPreprocessor(
        sequence_length=50,
        sampling_rate=0.2,
        correlation_threshold=0.8
    )

    # Define file paths (adjust to your actual paths)
    file_paths = {
        'vmstat': 'D:\\projet\\exports\\vmstat_metric.csv',
        'iostat': 'D:\\projet\\exports\\iostat_metric.csv',
        'netstat': 'D:\\projet\\exports\\netstat_metric.csv',
        'process': 'D:\\projet\\exports\\process_metric.csv'
    }

    # Run preprocessing pipeline
    sequences, quality_report = preprocessor.fit_transform(file_paths)

    # Print results
    print("\n📊 Processing Results:")
    for data_type, seq_array in sequences.items():
        print(f"{data_type}: {seq_array.shape[0]} sequences, {seq_array.shape[1]} timesteps, {seq_array.shape[2]} features")

    # Save for later use
    preprocessor.save_preprocessor('layer1_preprocessor.pkl')

2025-07-07 09:53:31,657 - DataPreprocessor - INFO - 🚀 Starting Layer 1 preprocessing pipeline...
2025-07-07 09:53:31,660 - DataPreprocessor - INFO - 📂 Processing vmstat from D:\projet\exports\vmstat_metric.csv
2025-07-07 09:53:32,304 - DataPreprocessor - INFO - ✅ Loaded 250,080 rows from 1,250,404 total (20.0%)
2025-07-07 09:53:32,960 - DataPreprocessor - INFO - 📂 Processing iostat from D:\projet\exports\iostat_metric.csv
2025-07-07 09:53:34,528 - DataPreprocessor - INFO - ✅ Loaded 1,126,140 rows from 5,630,702 total (20.0%)
2025-07-07 09:53:37,999 - DataPreprocessor - INFO - 📂 Processing netstat from D:\projet\exports\netstat_metric.csv
2025-07-07 09:53:38,746 - DataPreprocessor - INFO - ✅ Loaded 462,362 rows from 2,311,810 total (20.0%)
2025-07-07 09:53:39,830 - DataPreprocessor - INFO - 📂 Processing process from D:\projet\exports\process_metric.csv
2025-07-07 09:53:43,768 - DataPreprocessor - INFO - ✅ Loaded 2,108,957 rows from 10,544,788 total (20.0%)
2025-07-07 09:53:50,882 - Data

MemoryError: Unable to allocate 954. MiB for an array with shape (250031, 50, 10) and data type float64