<a href="https://colab.research.google.com/github/mysertkaya/Fake-UAV/blob/main/Fake_UAV_Training_and_Demo.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Load Data For Training

In [None]:
import pandas as pd
import numpy as np

# Parse the log data
file_path = 'type8_label.bin'  # Change to your actual file path
data = []
previous_timestamp = None  # Track previous timestamp
precision = 4  # Set your desired precision for timestamp difference

# Each record is whitespace-separated and read positionally:
#   parts[0] label ("Attack" or anything else), parts[1] "(timestamp)",
#   parts[2] interface, parts[3] CAN ID in hex, parts[4] "[length]",
#   parts[5:] payload bytes in hex.
with open(file_path, 'r') as file:
    for line in file:
        parts = line.split()
        # Blank or truncated lines have no label/ID/length fields; skip them
        # rather than failing with IndexError on the lookups below.
        if len(parts) < 5:
            continue

        label = 1 if parts[0] == "Attack" else 0  # Binary classification (Normal=0, Attack=1)
        timestamp = float(parts[1].strip("()"))  # Convert time to float
        # The first record has no predecessor, so its inter-arrival time is 0
        time_diff = round(timestamp - previous_timestamp, precision) if previous_timestamp is not None else 0
        previous_timestamp = timestamp  # Update previous timestamp

        # interface = parts[2]  # Interface (e.g., can0)
        can_id = int(parts[3], 16)  # Convert CAN ID from hex to int
        length = int(parts[4].strip("[]"))  # Data length
        data_bytes = [int(byte, 16) for byte in parts[5:]]  # Convert hex payload to integers

        data.append({"timestamp_diff": time_diff, "can_id": can_id,
                     "length": length, "payload": data_bytes, "label": label})

# Convert to DataFrame
df = pd.DataFrame(data)

# Convert payload list into multiple columns (max length is 8 in this dataset)
max_payload_len = df['payload'].apply(len).max()  # Find the maximum length of payload
# Right-pad shorter payloads with zeros so every row has the same byte columns
df_payload = pd.DataFrame(df['payload'].apply(lambda x: x + [0] * (max_payload_len - len(x))).tolist(),
                          columns=[f'byte_{i}' for i in range(max_payload_len)])

# Concatenate the payload columns with the original DataFrame
df = pd.concat([df.drop(columns='payload'), df_payload], axis=1)

df.head()


# Random Forest Training

In [None]:
import pandas as pd
import numpy as np
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score

# The same training recipe is used for both the flooding and the fuzzing scenario.

# Separate the target column (y) from the feature columns (X)
y = df['label']
X = df.drop('label', axis=1)

# Hold out 20% of the rows for evaluation; the fixed seed keeps the split repeatable
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=42
)

# Fit a 100-tree random forest (fit() returns the estimator itself)
clf = RandomForestClassifier(n_estimators=100, random_state=42).fit(X_train, y_train)

# Score the held-out rows
y_pred = clf.predict(X_test)
accuracy = accuracy_score(y_test, y_pred)
print(f'Accuracy: {accuracy:.4f}')

import joblib

# Persist the fitted classifier
joblib.dump(clf, 'random_forest_model.pkl')

# MLP Training

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, classification_report
import numpy as np

# Split the frame into features and target
# NOTE(review): `df_combined` is not defined in the cells shown here (the
# loading cell builds `df`) — confirm which cell creates it before running.
X = df_combined.drop('label', axis=1).values  # Features
y = df_combined['label'].values  # Labels

# Split the data into training and test sets
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

# Scale the data (usually needed for an MLP); the scaler is fitted on the
# training split only and then applied to the test split
scaler = StandardScaler()
X_train = scaler.fit_transform(X_train)
X_test = scaler.transform(X_test)

# Convert to PyTorch tensors
X_train_tensor = torch.tensor(X_train, dtype=torch.float32)
X_test_tensor = torch.tensor(X_test, dtype=torch.float32)
y_train_tensor = torch.tensor(y_train, dtype=torch.long)
y_test_tensor = torch.tensor(y_test, dtype=torch.long)

# Wrap the tensors in datasets and DataLoaders
train_dataset = TensorDataset(X_train_tensor, y_train_tensor)
test_dataset = TensorDataset(X_test_tensor, y_test_tensor)

train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=64, shuffle=False)

# Select the device: GPU when CUDA is available, otherwise CPU
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Model definition
class CustomMLP(nn.Module):
    """Fully connected classifier: three hidden blocks and a linear output layer.

    Each hidden block is Linear -> BatchNorm1d -> ReLU -> Dropout, with widths
    128, 64 and 32. The output layer returns raw class scores (logits).

    Args:
        input_dim: Number of input features. Defaults to 11; pass
            ``X_train.shape[1]`` to size the network from the training data.
        num_classes: Number of output classes. Defaults to 2.
        dropout: Dropout probability used after every hidden block.
            Defaults to 0.5.
    """

    def __init__(self, input_dim=11, num_classes=2, dropout=0.5):
        super().__init__()

        # Attribute names are kept stable so saved state_dict keys still match.
        self.fc1 = nn.Linear(input_dim, 128)  # First hidden layer
        self.bn1 = nn.BatchNorm1d(128)
        self.dropout1 = nn.Dropout(dropout)

        self.fc2 = nn.Linear(128, 64)  # Second hidden layer
        self.bn2 = nn.BatchNorm1d(64)
        self.dropout2 = nn.Dropout(dropout)

        self.fc3 = nn.Linear(64, 32)  # Third hidden layer
        self.bn3 = nn.BatchNorm1d(32)
        self.dropout3 = nn.Dropout(dropout)

        self.fc4 = nn.Linear(32, num_classes)  # Output layer

    def forward(self, x):
        """Return the class logits for a batch ``x`` of shape (batch, input_dim)."""
        x = torch.relu(self.bn1(self.fc1(x)))
        x = self.dropout1(x)

        x = torch.relu(self.bn2(self.fc2(x)))
        x = self.dropout2(x)

        x = torch.relu(self.bn3(self.fc3(x)))
        x = self.dropout3(x)

        x = self.fc4(x)  # Final layer: logits, no activation
        return x

# Build the model and move it to the selected device
model = CustomMLP().to(device)

# Define the loss and the optimizer
criterion = nn.CrossEntropyLoss()  # Loss function suited to multi-class classification
optimizer = optim.Adam(model.parameters(), lr=0.001)

# Training function
def train_model(model, train_loader, criterion, optimizer, device, epochs=20):
    """Train ``model`` in place and print the mean batch loss after every epoch.

    Args:
        model: Network to optimize; switched to training mode once up front.
        train_loader: Iterable of ``(inputs, labels)`` batches; must support ``len()``.
        criterion: Loss callable invoked as ``criterion(outputs, labels)``.
        optimizer: Optimizer that updates the parameters of ``model``.
        device: Device each batch is moved to before the forward pass.
        epochs: Number of passes over ``train_loader``.
    """
    model.train()  # Switch to training mode

    for epoch_idx in range(epochs):
        epoch_loss = 0.0

        for batch_inputs, batch_labels in train_loader:
            batch_inputs = batch_inputs.to(device)
            batch_labels = batch_labels.to(device)

            # Clear gradients left over from the previous step
            optimizer.zero_grad()

            # Forward pass
            batch_loss = criterion(model(batch_inputs), batch_labels)

            # Backward pass and parameter update
            batch_loss.backward()
            optimizer.step()

            epoch_loss += batch_loss.item()

        mean_loss = epoch_loss / len(train_loader)
        print(f"Epoch [{epoch_idx + 1}/{epochs}], Loss: {mean_loss:.4f}")

# Evaluation function
def test_model(model, test_loader, device):
    """Run ``model`` over ``test_loader`` and print accuracy and a classification report.

    Args:
        model: Network to evaluate; switched to evaluation mode first.
        test_loader: Iterable of ``(inputs, labels)`` batches.
        device: Device each batch is moved to before the forward pass.
    """
    model.eval()  # Switch to evaluation mode
    predicted_labels, true_labels = [], []

    # No gradients are needed for evaluation
    with torch.no_grad():
        for batch_inputs, batch_labels in test_loader:
            batch_inputs = batch_inputs.to(device)
            batch_labels = batch_labels.to(device)

            # The predicted class is the index of the largest score in each row
            class_idx = torch.max(model(batch_inputs), 1).indices

            predicted_labels.extend(class_idx.cpu().numpy())
            true_labels.extend(batch_labels.cpu().numpy())

    print("Accuracy:", accuracy_score(true_labels, predicted_labels))
    print(classification_report(true_labels, predicted_labels))

# Train the model
train_model(model, train_loader, criterion, optimizer, device, epochs=20)

# Evaluate the model on the held-out split
test_model(model, test_loader, device)


import pickle

# Persist the trained network; `model` is the CustomMLP instance trained above.
# NOTE(review): SelectiveAttackDetector in the Demo section restores the replay
# network with load_state_dict from 'model_27_05_replay.pth' and reads a scaler
# from 'scaler.pkl'; neither file is written by this cell — confirm where they
# are produced.
with open('mlp_model.pkl', 'wb') as f:
    pickle.dump(model, f)

# Demo

In [10]:
import time
import joblib
import torch
import torch.nn as nn
import numpy as np
import pandas as pd
from sklearn.preprocessing import StandardScaler
import concurrent.futures
import os

# Pick the compute device: CUDA when PyTorch reports it available, else the CPU
if torch.cuda.is_available():
    device = torch.device('cuda')
else:
    device = torch.device('cpu')
print(f"Using device: {device}")

class CustomMLP(nn.Module):
    """Fully connected classifier: three hidden blocks and a linear output layer.

    Each hidden block is Linear -> BatchNorm1d -> ReLU -> Dropout, with widths
    128, 64 and 32. The output layer returns raw class scores (logits).

    Args:
        input_dim: Number of input features. Defaults to 11.
        num_classes: Number of output classes. Defaults to 2.
        dropout: Dropout probability used after every hidden block.
            Defaults to 0.5.
    """

    def __init__(self, input_dim=11, num_classes=2, dropout=0.5):
        super().__init__()

        # Attribute names are kept stable so saved state_dict keys still match.
        self.fc1 = nn.Linear(input_dim, 128)  # First hidden layer
        self.bn1 = nn.BatchNorm1d(128)
        self.dropout1 = nn.Dropout(dropout)

        self.fc2 = nn.Linear(128, 64)  # Second hidden layer
        self.bn2 = nn.BatchNorm1d(64)
        self.dropout2 = nn.Dropout(dropout)

        self.fc3 = nn.Linear(64, 32)  # Third hidden layer
        self.bn3 = nn.BatchNorm1d(32)
        self.dropout3 = nn.Dropout(dropout)

        self.fc4 = nn.Linear(32, num_classes)  # Output layer

    def forward(self, x):
        """Return the class logits for a batch ``x`` of shape (batch, input_dim)."""
        x = torch.relu(self.bn1(self.fc1(x)))
        x = self.dropout1(x)

        x = torch.relu(self.bn2(self.fc2(x)))
        x = self.dropout2(x)

        x = torch.relu(self.bn3(self.fc3(x)))
        x = self.dropout3(x)

        x = self.fc4(x)  # Final layer: logits, no activation
        return x

class SelectiveAttackDetector:
    def __init__(self, scaler_path=None, enable_models=None):
        """
        Initialize detector with selective model loading

        Args:
            scaler_path: Path to scaler file for MLP model
            enable_models: Dictionary to enable/disable models
                          {'flooding': True, 'fuzzing': True, 'replay': True}
                          If None, all models are enabled by default
        """
        # Default: enable all models
        if enable_models is None:
            enable_models = {'flooding': True, 'fuzzing': True, 'replay': True}

        self.enabled_models = enable_models
        self.models = {}

        print("Loading models...")
        print(f"Model configuration: {self.enabled_models}")

        # Load models based on configuration
        if self.enabled_models.get('flooding', False):
            try:
                self.models['flooding'] = joblib.load('random_forest_model.pkl')
                print("✓ Flooding model loaded successfully")
            except Exception as e:
                print(f"✗ Failed to load flooding model: {e}")
                self.enabled_models['flooding'] = False
        else:
            print("- Flooding model disabled")

        if self.enabled_models.get('fuzzing', False):
            try:
                self.models['fuzzing'] = joblib.load('random_forest_model_fuzzy.pkl')
                print("✓ Fuzzing model loaded successfully")
            except Exception as e:
                print(f"✗ Failed to load fuzzing model: {e}")
                self.enabled_models['fuzzing'] = False
        else:
            print("- Fuzzing model disabled")

        if self.enabled_models.get('replay', False):
            try:
                self.models['replay'] = CustomMLP().to(device)
                self.models['replay'].load_state_dict(torch.load('model_27_05_replay.pth', map_location=device))
                self.models['replay'].eval()
                print("✓ Replay model loaded successfully")

                # Load scaler for replay model
                if scaler_path and os.path.exists(scaler_path):
                    self.scaler = joblib.load(scaler_path)
                    print(f"✓ Scaler loaded from: {scaler_path}")
                else:
                    print("⚠ Warning: No scaler found! Creating dummy scaler - you need to fit it with training data!")
                    self.scaler = StandardScaler()
                    self.scaler_fitted = False

            except Exception as e:
                print(f"✗ Failed to load replay model: {e}")
                self.enabled_models['replay'] = False
        else:
            print("- Replay model disabled")

        # Check if at least one model is enabled
        if not any(self.enabled_models.values()):
            raise ValueError("At least one model must be enabled!")

        print(f"Active models: {[k for k, v in self.enabled_models.items() if v]}")

        # Feature names for models
        self.feature_names = ["timestamp_diff", "can_id", "length",
                 "byte_0", "byte_1", "byte_2", "byte_3",
                 "byte_4", "byte_5", "byte_6", "byte_7"]

    def file_to_dataframe(self, file_path, precision=4):
        """Convert the entire file to DataFrame"""
        print(f"Converting file {file_path} to DataFrame...")

        data = []
        previous_timestamp = None

        with open(file_path, 'r') as file:
            for line_num, line in enumerate(file, 1):
                try:
                    parts = line.split()
                    if len(parts) < 5:
                        continue

                    label = 1 if parts[0] == "Attack" else 0
                    timestamp = float(parts[1].strip("()"))
                    time_diff = round(timestamp - previous_timestamp, precision) if previous_timestamp is not None else 0
                    previous_timestamp = timestamp

                    can_id = int(parts[3], 16)
                    length = int(parts[4].strip("[]"))
                    data_bytes = [int(byte, 16) for byte in parts[5:]]

                    data.append({
                        "timestamp_diff": time_diff,
                        "can_id": can_id,
                        "length": length,
                        "payload": data_bytes,
                        "label": label,
                        "original_line": line.strip()
                    })

                except (ValueError, IndexError) as e:
                    print(f"Error parsing line {line_num}: {line.strip()}, Error: {e}")
                    continue

        # Convert to DataFrame
        df = pd.DataFrame(data)

        if df.empty:
            print("Warning: No valid data found in file!")
            return df

        # Convert payload list into multiple columns (pad to 8 bytes)
        max_payload_len = 8  # Standard CAN frame has max 8 bytes
        df_payload = pd.DataFrame(
            df['payload'].apply(lambda x: x + [0] * (max_payload_len - len(x))).tolist(),
            columns=[f'byte_{i}' for i in range(max_payload_len)]
        )

        # Concatenate the payload columns with the original DataFrame
        df = pd.concat([df.drop(columns='payload'), df_payload], axis=1)

        print(f"DataFrame created successfully with {len(df)} rows and {len(df.columns)} columns")
        return df

    def predict_flooding_batch(self, df):
        """Batch prediction for flooding using Decision Tree"""
        if not self.enabled_models.get('flooding', False):
            return np.zeros(len(df), dtype=int)

        feature_df = df[self.feature_names]
        predictions = self.models['flooding'].predict(feature_df)
        return predictions

    def predict_fuzzing_batch(self, df):
        """Batch prediction for fuzzing using Decision Tree"""
        if not self.enabled_models.get('fuzzing', False):
            return np.zeros(len(df), dtype=int)

        feature_df = df[self.feature_names]
        predictions = self.models['fuzzing'].predict(feature_df)
        return predictions

    def predict_replay_batch(self, df):
        """Batch prediction for replay using MLP on GPU"""
        if not self.enabled_models.get('replay', False):
            return np.zeros(len(df), dtype=int)

        feature_df = df[self.feature_names]

        # Check if scaler is fitted
        if not hasattr(self.scaler, 'scale_') or self.scaler.scale_ is None:
            print("Warning: Scaler not fitted! Using raw features...")
            features_scaled = feature_df.values
        else:
            features_scaled = self.scaler.transform(feature_df.values)

        # Convert to tensor and predict in batches
        X_tensor = torch.tensor(features_scaled, dtype=torch.float32).to(device)

        predictions = []
        batch_size = 1000  # Process in batches to manage GPU memory

        with torch.no_grad():
            for i in range(0, len(X_tensor), batch_size):
                batch = X_tensor[i:i+batch_size]
                outputs = self.models['replay'](batch)
                _, predicted = torch.max(outputs, 1)
                predictions.extend(predicted.cpu().numpy())

        return np.array(predictions)

    def cascade_predict_batch(self, df):
        """Cascade prediction on entire DataFrame with selective models"""
        print("Running cascade predictions...")
        print(f"Active models in cascade: {[k for k, v in self.enabled_models.items() if v]}")

        n_samples = len(df)
        predictions = np.zeros(n_samples, dtype=int)  # 0 = Normal, 1 = Attack
        attack_types = np.full(n_samples, 'unknown', dtype=object)
        true_labels = df['label'].values

        # Initialize model statistics
        model_stats = {}
        for model_name in ['flooding', 'fuzzing', 'replay']:
            model_stats[model_name] = {
                'predictions': None,
                'correct': 0,
                'total_checked': 0,
                'false_positive': 0,
                'false_negative': 0,
                'enabled': self.enabled_models.get(model_name, False)
            }

        remaining_mask = np.ones(n_samples, dtype=bool)  # Track which samples still need checking

        # Step 1: Check flooding (if enabled)
        if self.enabled_models.get('flooding', False):
            print("Step 1: Flooding detection...")
            flooding_preds = self.predict_flooding_batch(df)
            model_stats['flooding']['predictions'] = flooding_preds
            model_stats['flooding']['total_checked'] = n_samples

            # Calculate flooding model accuracy
            flooding_correct = np.sum((flooding_preds == 1) & (true_labels == 1)) + np.sum((flooding_preds == 0) & (true_labels == 0))
            model_stats['flooding']['correct'] = flooding_correct
            model_stats['flooding']['false_positive'] = np.sum((flooding_preds == 1) & (true_labels == 0))
            model_stats['flooding']['false_negative'] = np.sum((flooding_preds == 0) & (true_labels == 1))

            flooding_mask = flooding_preds == 1
            predictions[flooding_mask] = 1
            attack_types[flooding_mask] = 'flooding'
            remaining_mask = remaining_mask & ~flooding_mask

            print(f"Flooding attacks detected: {np.sum(flooding_mask)}")
            print(f"Flooding model accuracy: {flooding_correct/n_samples*100:.2f}% ({flooding_correct}/{n_samples})")
        else:
            print("Step 1: Flooding detection - SKIPPED (disabled)")

        # Step 2: Check fuzzing for remaining samples (if enabled)
        if self.enabled_models.get('fuzzing', False) and np.sum(remaining_mask) > 0:
            print("Step 2: Fuzzing detection...")
            remaining_df = df[remaining_mask]
            remaining_true_labels = true_labels[remaining_mask]
            fuzzing_preds = self.predict_fuzzing_batch(remaining_df)

            model_stats['fuzzing']['predictions'] = np.full(n_samples, -1)
            model_stats['fuzzing']['predictions'][remaining_mask] = fuzzing_preds
            model_stats['fuzzing']['total_checked'] = len(remaining_df)

            # Calculate fuzzing model accuracy on remaining samples
            fuzzing_correct = np.sum((fuzzing_preds == 1) & (remaining_true_labels == 1)) + np.sum((fuzzing_preds == 0) & (remaining_true_labels == 0))
            model_stats['fuzzing']['correct'] = fuzzing_correct
            model_stats['fuzzing']['false_positive'] = np.sum((fuzzing_preds == 1) & (remaining_true_labels == 0))
            model_stats['fuzzing']['false_negative'] = np.sum((fuzzing_preds == 0) & (remaining_true_labels == 1))

            fuzzing_attack_mask = fuzzing_preds == 1

            # Update predictions for fuzzing attacks
            remaining_indices = np.where(remaining_mask)[0]
            fuzzing_indices = remaining_indices[fuzzing_attack_mask]
            predictions[fuzzing_indices] = 1
            attack_types[fuzzing_indices] = 'fuzzing'

            # Update remaining mask
            remaining_mask[fuzzing_indices] = False

            print(f"Fuzzing attacks detected: {np.sum(fuzzing_attack_mask)}")
            print(f"Fuzzing model accuracy: {fuzzing_correct/len(remaining_df)*100:.2f}% ({fuzzing_correct}/{len(remaining_df)})")
        else:
            if not self.enabled_models.get('fuzzing', False):
                print("Step 2: Fuzzing detection - SKIPPED (disabled)")
            else:
                print("Step 2: Fuzzing detection - SKIPPED (no remaining samples)")

        # Step 3: Check replay for remaining samples (if enabled)
        if self.enabled_models.get('replay', False) and np.sum(remaining_mask) > 0:
            print("Step 3: Replay detection...")
            final_remaining_df = df[remaining_mask]
            final_remaining_true_labels = true_labels[remaining_mask]
            replay_preds = self.predict_replay_batch(final_remaining_df)

            model_stats['replay']['predictions'] = np.full(n_samples, -1)
            model_stats['replay']['predictions'][remaining_mask] = replay_preds
            model_stats['replay']['total_checked'] = len(final_remaining_df)

            # Calculate replay model accuracy on final remaining samples
            replay_correct = np.sum((replay_preds == 1) & (final_remaining_true_labels == 1)) + np.sum((replay_preds == 0) & (final_remaining_true_labels == 0))
            model_stats['replay']['correct'] = replay_correct
            model_stats['replay']['false_positive'] = np.sum((replay_preds == 1) & (final_remaining_true_labels == 0))
            model_stats['replay']['false_negative'] = np.sum((replay_preds == 0) & (final_remaining_true_labels == 1))

            replay_attack_mask = replay_preds == 1

            # Update predictions for replay attacks
            final_remaining_indices = np.where(remaining_mask)[0]
            replay_indices = final_remaining_indices[replay_attack_mask]
            predictions[replay_indices] = 1
            attack_types[replay_indices] = 'replay'
            remaining_mask[replay_indices] = False

            print(f"Replay attacks detected: {np.sum(replay_attack_mask)}")
            print(f"Replay model accuracy: {replay_correct/len(final_remaining_df)*100:.2f}% ({replay_correct}/{len(final_remaining_df)})")
        else:
            if not self.enabled_models.get('replay', False):
                print("Step 3: Replay detection - SKIPPED (disabled)")
            else:
                print("Step 3: Replay detection - SKIPPED (no remaining samples)")

        # Set remaining as normal
        normal_mask = predictions == 0
        attack_types[normal_mask] = 'all_normal'

        return predictions, attack_types, model_stats

    def parallel_predict_batch(self, df):
        """Parallel prediction on entire DataFrame with selective models"""
        print("Running parallel predictions...")
        print(f"Active models in parallel: {[k for k, v in self.enabled_models.items() if v]}")

        true_labels = df['label'].values
        n_samples = len(df)

        # Run predictions in parallel for enabled models only
        predictions_dict = {}

        with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
            futures = {}

            if self.enabled_models.get('flooding', False):
                futures['flooding'] = executor.submit(self.predict_flooding_batch, df)

            if self.enabled_models.get('fuzzing', False):
                futures['fuzzing'] = executor.submit(self.predict_fuzzing_batch, df)

            if self.enabled_models.get('replay', False):
                futures['replay'] = executor.submit(self.predict_replay_batch, df)

            # Get results
            for model_name, future in futures.items():
                predictions_dict[model_name] = future.result()

        # For disabled models, create zero predictions
        for model_name in ['flooding', 'fuzzing', 'replay']:
            if model_name not in predictions_dict:
                predictions_dict[model_name] = np.zeros(n_samples, dtype=int)

        # Calculate individual model statistics
        model_stats = {}
        for model_name in ['flooding', 'fuzzing', 'replay']:
            preds = predictions_dict[model_name]
            enabled = self.enabled_models.get(model_name, False)

            if enabled:
                correct = np.sum((preds == 1) & (true_labels == 1)) + np.sum((preds == 0) & (true_labels == 0))
                false_positive = np.sum((preds == 1) & (true_labels == 0))
                false_negative = np.sum((preds == 0) & (true_labels == 1))

                print(f"{model_name.capitalize()} model accuracy: {correct/n_samples*100:.2f}% ({correct}/{n_samples})")
                print(f"  False Positives: {false_positive}, False Negatives: {false_negative}")
            else:
                correct = false_positive = false_negative = 0
                print(f"{model_name.capitalize()} model: DISABLED")

            model_stats[model_name] = {
                'predictions': preds,
                'correct': correct,
                'total_checked': n_samples if enabled else 0,
                'false_positive': false_positive,
                'false_negative': false_negative,
                'enabled': enabled
            }

        predictions = np.zeros(n_samples, dtype=int)
        attack_types = np.full(n_samples, 'all_normal', dtype=object)

        # Priority: flooding > fuzzing > replay (only for enabled models)
        flooding_mask = np.zeros(n_samples, dtype=bool)
        fuzzing_mask = np.zeros(n_samples, dtype=bool)
        replay_mask = np.zeros(n_samples, dtype=bool)

        if self.enabled_models.get('flooding', False):
            flooding_mask = predictions_dict['flooding'] == 1

        if self.enabled_models.get('fuzzing', False):
            fuzzing_mask = (predictions_dict['fuzzing'] == 1) & (~flooding_mask)

        if self.enabled_models.get('replay', False):
            replay_mask = (predictions_dict['replay'] == 1) & (~flooding_mask) & (~fuzzing_mask)

        predictions[flooding_mask] = 1
        attack_types[flooding_mask] = 'flooding'

        predictions[fuzzing_mask] = 1
        attack_types[fuzzing_mask] = 'fuzzing'

        predictions[replay_mask] = 1
        attack_types[replay_mask] = 'replay'

        print(f"Final predictions - Flooding attacks: {np.sum(flooding_mask)}")
        print(f"Final predictions - Fuzzing attacks: {np.sum(fuzzing_mask)}")
        print(f"Final predictions - Replay attacks: {np.sum(replay_mask)}")

        return predictions, attack_types, model_stats

    def process_file(self, input_file, output_file=None, method="cascade", print_all=True):
        """Process entire file by first converting to DataFrame then predicting"""
        print(f"Processing file: {input_file}")
        print(f"Method: {'Cascade (sequential)' if method == 'cascade' else 'Parallel'}")
        print(f"Enabled models: {[k for k, v in self.enabled_models.items() if v]}")
        print("-" * 80)

        start_time = time.time()

        # Step 1: Convert file to DataFrame
        df = self.file_to_dataframe(input_file)

        if df.empty:
            print("No data to process!")
            return []

        conversion_time = time.time()
        print(f"File conversion time: {conversion_time - start_time:.4f} seconds")

        # Step 2: Make predictions
        if method == "cascade":
            predictions, attack_types, model_stats = self.cascade_predict_batch(df)
        else:  # parallel
            predictions, attack_types, model_stats = self.parallel_predict_batch(df)

        prediction_time = time.time()
        print(f"Prediction time: {prediction_time - conversion_time:.4f} seconds")

        # Step 3: Calculate statistics and prepare results
        true_labels = df['label'].values
        correct_predictions = np.sum((predictions == 1) & (true_labels == 1)) + np.sum((predictions == 0) & (true_labels == 0))
        total_lines = len(df)
        attack_count = np.sum(predictions == 1)
        accuracy = correct_predictions / total_lines * 100 if total_lines > 0 else 0

        # Prepare results
        results = []
        for i in range(len(df)):
            pred_label = "Attack" if predictions[i] == 1 else "Normal"
            results.append((true_labels[i], pred_label, attack_types[i], df.iloc[i]['original_line']))

        # Print line-by-line results if requested
        if print_all:
            print("\n" + "="*100)
            print("LINE-BY-LINE RESULTS")
            print("="*100)
            for i, (true_label, pred_label, attack_type, original_line) in enumerate(results, 1):
                true_str = "Attack" if true_label == 1 else "Normal"
                correct = "✓" if ((true_label == 1 and pred_label == "Attack") or
                                (true_label == 0 and pred_label == "Normal")) else "✗"

                print(f"Line {i:6d}: True={true_str:6s} | Pred={pred_label:6s} | Via={attack_type:12s} | {correct}")

        end_time = time.time()

        # Final statistics
        print(f"\n{'='*80}")
        print(f"FINAL RESULTS")
        print(f"{'='*80}")
        print(f"Enabled models: {[k for k, v in self.enabled_models.items() if v]}")
        print(f"Total lines processed: {total_lines}")
        print(f"Correct predictions: {correct_predictions}")
        print(f"Accuracy: {accuracy:.2f}%")
        print(f"Attacks detected: {attack_count}")
        print(f"Normal traffic: {total_lines - attack_count}")
        print(f"Attack detection ratio: {attack_count/total_lines*100:.2f}%")
        print(f"Total processing time: {end_time - start_time:.4f} seconds")
        print(f"Lines per second: {total_lines/(end_time - start_time):.2f}")

        # Attack type breakdown
        unique_types, counts = np.unique(attack_types, return_counts=True)
        print(f"\nAttack type breakdown:")
        for attack_type, count in zip(unique_types, counts):
            print(f"  {attack_type}: {count} ({count/total_lines*100:.2f}%)")

        # Individual model performance statistics
        print(f"\n{'='*80}")
        print(f"INDIVIDUAL MODEL PERFORMANCE")
        print(f"{'='*80}")

        for model_name, stats in model_stats.items():
            if stats['enabled'] and stats['total_checked'] > 0:
                accuracy_model = stats['correct'] / stats['total_checked'] * 100
                precision = stats['correct'] / (stats['correct'] + stats['false_positive']) * 100 if (stats['correct'] + stats['false_positive']) > 0 else 0
                recall = stats['correct'] / (stats['correct'] + stats['false_negative']) * 100 if (stats['correct'] + stats['false_negative']) > 0 else 0
                f1_score = 2 * (precision * recall) / (precision + recall) if (precision + recall) > 0 else 0

                print(f"\n{model_name.upper()} MODEL:")
                print(f"  Status: ENABLED")
                print(f"  Samples checked: {stats['total_checked']}")
                print(f"  Correct predictions: {stats['correct']}")
                print(f"  Accuracy: {accuracy_model:.2f}%")
                print(f"  False Positives: {stats['false_positive']}")
                print(f"  False Negatives: {stats['false_negative']}")
                print(f"  Precision: {precision:.2f}%")
                print(f"  Recall: {recall:.2f}%")
                print(f"  F1-Score: {f1_score:.2f}%")
            else:
                print(f"\n{model_name.upper()} MODEL:")
                print(f"  Status: DISABLED")

        # Save results if output file specified
        if output_file:
            self._save_results(results, output_file)
            print(f"\nResults saved to: {output_file}")

        return results

    def _save_results(self, results, output_file):
        """Save results to file"""
        with open(output_file, 'w') as f:
            f.write("true_label,predicted_label,attack_type,original_line\n")
            for true_label, pred_label, attack_type, original_line in results:
                f.write(f"{true_label},{pred_label},{attack_type},\"{original_line}\"\n")

# Demo run: build a detector with only the replay model enabled, then score
# 'type6_label.bin' in cascade mode and write the per-line results to
# 'results_replay_only.csv' (the line-by-line console listing is turned off).
detector_replay = SelectiveAttackDetector(
    scaler_path='scaler.pkl',
    enable_models={'flooding': False, 'fuzzing': False, 'replay': True}
)
detector_replay.process_file('type6_label.bin', 'results_replay_only.csv', method='cascade', print_all=False)

# # Usage Examples
# if __name__ == "__main__":
#     exit()
#     print("="*80)
#     print("SELECTIVE ATTACK DETECTOR - USAGE EXAMPLES")
#     print("="*80)

#     # Example 1: Enable all models (default behavior)
#     print("\n1. ALL MODELS ENABLED (Default)")
#     print("-" * 40)
#     detector_all = SelectiveAttackDetector(
#         scaler_path='scaler.pkl',
#         enable_models={'flooding': True, 'fuzzing': True, 'replay': True}
#     )
#     # detector_all.process_file('type7_label.bin', 'results_all_models.csv', method='cascade', print_all=False)

#     print("\n" + "="*80)

#     # Example 2: Only flooding model
#     print("\n2. ONLY FLOODING MODEL")
#     print("-" * 40)
#     detector_flooding = SelectiveAttackDetector(
#         scaler_path='scaler.pkl',
#         enable_models={'flooding': True, 'fuzzing': False, 'replay': False}
#     )
#     detector_flooding.process_file('type5_label.bin', 'results_flooding_only.csv', method='cascade', print_all=False)

#     print("\n" + "="*80)

#     # Example 3: Only replay model
#     print("\n3. ONLY REPLAY MODEL")
#     print("-" * 40)
#     detector_replay = SelectiveAttackDetector(
#         scaler_path='scaler.pkl',
#         enable_models={'flooding': False, 'fuzzing': False, 'replay': True}
#     )
#     # detector_replay.process_file('type10_label.bin', 'results_replay_only.csv', method='cascade', print_all=False)

#     print("\n" + "="*80)

#     # Example 4: Flooding + Fuzzing (no replay)
#     print("\n4. FLOODING + FUZZING (No Replay)")
#     print("-" * 40)
#     detector_no_replay = SelectiveAttackDetector(
#         scaler_path='scaler.pkl',
#         enable_models={'flooding': True, 'fuzzing': True, 'replay': False}
#     )
#     # detector_no_replay.process_file('type7_label.bin', 'results_no_replay.csv', method='cascade', print_all=False)

#     print("\n" + "="*80)

#     # Example 5: Custom configuration with actual processing
#     print("\n5. CUSTOM CONFIGURATION - ACTUAL PROCESSING")
#     print("-" * 40)

#     # Choose your configuration here:
#     custom_config = {
#         'flooding': True,   # Enable flooding detection
#         'fuzzing': False,   # Disable fuzzing detection
#         'replay': True      # Enable replay detection
#     }

#     detector_custom = SelectiveAttackDetector(
#         scaler_path='scaler.pkl',
#         enable_models=custom_config
#     )

#     # Process with custom configuration
#     # detector_custom.process_file('type10_label.bin', 'results_custom.csv', method='cascade', print_all=True)

#     print("\n" + "="*80)
#     print("USAGE INSTRUCTIONS:")
#     print("="*80)
#     print("To use specific models, create detector with enable_models parameter:")
#     print("enable_models = {'flooding': True/False, 'fuzzing': True/False, 'replay': True/False}")
#     print("")

Using device: cpu
Loading models...
Model configuration: {'flooding': False, 'fuzzing': False, 'replay': True}
- Flooding model disabled
- Fuzzing model disabled
✓ Replay model loaded successfully
✓ Scaler loaded from: scaler.pkl
Active models: ['replay']
Processing file: type6_label.bin
Method: Cascade (sequential)
Enabled models: ['replay']
--------------------------------------------------------------------------------
Converting file type6_label.bin to DataFrame...
DataFrame created successfully with 241321 rows and 13 columns
File conversion time: 4.2589 seconds
Running cascade predictions...
Active models in cascade: ['replay']
Step 1: Flooding detection - SKIPPED (disabled)
Step 2: Fuzzing detection - SKIPPED (disabled)
Step 3: Replay detection...
Replay attacks detected: 80873
Replay model accuracy: 95.51% (230484/241321)
Prediction time: 0.5523 seconds

FINAL RESULTS
Enabled models: ['replay']
Total lines processed: 241321
Correct predictions: 230484
Accuracy: 95.51%
Attacks d

[(np.int64(0),
  'Normal',
  'all_normal',
  'Normal (000.000000)  can0  05040601   [8]  A6 35 00 00 00 00 00 9C'),
 (np.int64(0),
  'Normal',
  'all_normal',
  'Normal (000.000180)  can0  05040601   [7]  00 00 00 00 00 00 7C'),
 (np.int64(0),
  'Normal',
  'all_normal',
  'Normal (000.002787)  can0  05040601   [8]  A6 35 00 00 00 00 00 9D'),
 (np.int64(0),
  'Normal',
  'all_normal',
  'Normal (000.002945)  can0  05040601   [7]  00 00 00 00 00 00 7D'),
 (np.int64(0),
  'Normal',
  'all_normal',
  'Normal (000.007147)  can0  05040601   [8]  A6 35 00 00 00 00 00 9E'),
 (np.int64(0),
  'Normal',
  'all_normal',
  'Normal (000.007306)  can0  05040601   [7]  00 00 00 00 00 00 7E'),
 (np.int64(0),
  'Normal',
  'all_normal',
  'Normal (000.010037)  can0  05040601   [8]  A6 35 00 00 00 00 00 9F'),
 (np.int64(0),
  'Normal',
  'all_normal',
  'Normal (000.010201)  can0  05040601   [7]  00 00 00 00 00 00 7F'),
 (np.int64(0),
  'Normal',
  'all_normal',
  'Normal (000.012577)  can0  05040601   

# Gradio

In [2]:
!pip install gradio



In [9]:
# Gradio Interface Addition - Add this to the end of your existing code
import gradio as gr
def create_gradio_interface():
    """Build the Gradio web UI for the selective attack detector.

    The UI takes pasted bus-log text, three checkboxes that choose which
    detection models are enabled, and a processing-method selector. Pressing
    the button runs ``SelectiveAttackDetector.process_file`` on the text and
    shows a summary, a line-by-line table and confusion-matrix metrics.

    Returns:
        The ``gr.Blocks`` app. Call ``.launch()`` on it to serve the UI.
    """
    # Function-scope imports keep this cell runnable on its own; neither
    # module is imported in the cell that defines this function.
    import os
    import tempfile

    def is_correct(true_label, pred_label):
        """Return whether the predicted label string matches the 0/1 ground truth."""
        return ((true_label == 1 and pred_label == "Attack") or
                (true_label == 0 and pred_label == "Normal"))

    def process_with_gradio(file_content, flooding_enabled, fuzzing_enabled, replay_enabled, method):
        """Run the detector on pasted text and return the three report strings.

        Args:
            file_content: Raw log text from the input textbox.
            flooding_enabled: Whether the flooding model is enabled.
            fuzzing_enabled: Whether the fuzzing model is enabled.
            replay_enabled: Whether the replay model is enabled.
            method: Processing method label from the UI; lower-cased before
                being passed to ``process_file``.

        Returns:
            ``(summary, detailed, stats)`` strings. On failure the first
            element is an error message and the other two are empty.
        """
        enable_models = {
            'flooding': flooding_enabled,
            'fuzzing': fuzzing_enabled,
            'replay': replay_enabled
        }

        # Validate before touching the filesystem so an early return cannot
        # leave a temporary file behind.
        if not any(enable_models.values()):
            return "❌ Error: At least one model must be enabled!", "", ""
        if not file_content or not file_content.strip():
            return "❌ Error: No input data provided!", "", ""

        temp_filename = None
        try:
            # A unique name per request: a fixed file name would let
            # concurrent requests overwrite each other's input.
            with tempfile.NamedTemporaryFile(mode='w', suffix='.bin', delete=False) as tmp:
                temp_filename = tmp.name
                tmp.write(file_content)

            detector = SelectiveAttackDetector(
                scaler_path='scaler.pkl',
                enable_models=enable_models
            )

            results = detector.process_file(
                temp_filename,
                method=method.lower(),
                print_all=False  # Don't print all lines in web interface
            )

            return (
                generate_summary_report(results, enable_models),
                generate_detailed_results(results),
                generate_stats_report(results),
            )

        except Exception as e:
            # UI boundary: report the failure to the user instead of raising.
            return f"❌ Error processing file: {str(e)}", "", ""

        finally:
            # Always remove the temp file, including when the detector raised.
            if temp_filename is not None:
                try:
                    os.remove(temp_filename)
                except OSError:
                    pass  # best-effort cleanup; must not mask the real result

    def generate_summary_report(results, enabled_models):
        """Build the summary text: totals, accuracy and per-attack-type counts.

        Args:
            results: Sequence of ``(true_label, pred_label, attack_type,
                original_line)`` tuples.
            enabled_models: Mapping of model name to enabled flag.
        """
        if not results:
            return "No results to display"

        total_lines = len(results)
        correct_predictions = 0
        attack_predictions = 0
        attack_types = {}

        # One pass collects every counter the report needs.
        for true_label, pred_label, attack_type, _ in results:
            if is_correct(true_label, pred_label):
                correct_predictions += 1
            if pred_label == "Attack":
                attack_predictions += 1
                attack_types[attack_type] = attack_types.get(attack_type, 0) + 1

        # total_lines is non-zero here because empty results returned above.
        accuracy = correct_predictions / total_lines * 100

        enabled_models_str = ", ".join([k.title() for k, v in enabled_models.items() if v])

        report = f"""
📊 **DETECTION SUMMARY**
{'='*50}
🔧 **Enabled Models:** {enabled_models_str}
📝 **Total Lines Processed:** {total_lines}
✅ **Correct Predictions:** {correct_predictions}
🎯 **Overall Accuracy:** {accuracy:.2f}%
🚨 **Attacks Detected:** {attack_predictions}
🟢 **Normal Traffic:** {total_lines - attack_predictions}

🎭 **Attack Type Breakdown:**
"""

        breakdown = "".join(
            f"   • {attack_type.title()}: {count} ({count / total_lines * 100:.1f}%)\n"
            for attack_type, count in attack_types.items()
        )
        return report + breakdown

    def generate_detailed_results(results):
        """Build a fixed-width table for the first 100 result lines."""
        if not results:
            return "No results to display"

        lines = [
            "📋 **DETAILED RESULTS**",
            "="*80,
            f"{'Line':<8} {'True':<8} {'Predicted':<10} {'Via':<12} {'Status':<8}",
            "-"*80,
        ]

        # Only the first 100 rows are rendered to keep the textbox readable.
        for i, (true_label, pred_label, attack_type, _) in enumerate(results[:100], 1):
            true_str = "Attack" if true_label == 1 else "Normal"
            correct = "✅" if is_correct(true_label, pred_label) else "❌"
            lines.append(f"{i:<8} {true_str:<8} {pred_label:<10} {attack_type:<12} {correct}")

        if len(results) > 100:
            lines.append("")
            lines.append(f"... and {len(results) - 100} more lines")

        return "\n".join(lines) + "\n"

    def generate_stats_report(results):
        """Build the confusion-matrix and precision/recall/F1 report."""
        if not results:
            return "No results to display"

        # Single pass over the results; the four cases are mutually exclusive.
        tp = tn = fp = fn = 0
        for true_label, pred_label, _, _ in results:
            if true_label == 1 and pred_label == "Attack":
                tp += 1
            elif true_label == 0 and pred_label == "Normal":
                tn += 1
            elif true_label == 0 and pred_label == "Attack":
                fp += 1
            elif true_label == 1 and pred_label == "Normal":
                fn += 1

        # All rates are percentages; a zero denominator yields 0.
        precision = (tp / (tp + fp) * 100) if (tp + fp) > 0 else 0
        recall = (tp / (tp + fn) * 100) if (tp + fn) > 0 else 0
        f1_score = (2 * precision * recall / (precision + recall)) if (precision + recall) > 0 else 0
        false_positive_rate = (fp / (fp + tn) * 100) if (fp + tn) > 0 else 0
        false_negative_rate = (fn / (fn + tp) * 100) if (fn + tp) > 0 else 0

        stats = f"""
📈 **PERFORMANCE METRICS**
{'='*50}
🎯 **Confusion Matrix:**
   • True Positives (TP): {tp}
   • True Negatives (TN): {tn}
   • False Positives (FP): {fp}
   • False Negatives (FN): {fn}

📊 **Metrics:**
   • Precision: {precision:.2f}%
   • Recall: {recall:.2f}%
   • F1-Score: {f1_score:.2f}%

🔍 **Error Analysis:**
   • False Positive Rate: {false_positive_rate:.2f}%
   • False Negative Rate: {false_negative_rate:.2f}%
"""
        return stats

    # Create Gradio interface
    with gr.Blocks(title="UAVCAN Bus Attack Detector", theme=gr.themes.Soft()) as demo:
        gr.Markdown("""
        # 🚁 UAVCAN Bus Attack Detector

        Upload your UAVCAN bus data file and configure the detection models to analyze potential attacks.

        **Supported Attack Types:**
        - 🌊 **Flooding**: Detects flooding attacks using Random Forest
        - 🎯 **Fuzzing**: Detects fuzzing attacks using Random Forest
        - 🔄 **Replay**: Detects replay attacks using Neural Network (MLP)
        """)

        with gr.Row():
            with gr.Column(scale=1):
                gr.Markdown("### 📁 Input Configuration")

                # The format hint includes the interface column (e.g. can0),
                # matching the log lines the detector is run on in this notebook.
                file_input = gr.Textbox(
                    label="UAVCAN Bus Data",
                    placeholder="Paste your UAVCAN bus data here...\nFormat: Label (timestamp) interface can_id [length] byte0 byte1 ...",
                    lines=10,
                    max_lines=20
                )

                gr.Markdown("### ⚙️ Model Configuration")

                flooding_cb = gr.Checkbox(
                    label="🌊 Enable Flooding Detection",
                    value=True,
                    info="Uses Random Forest to detect flooding attacks"
                )

                fuzzing_cb = gr.Checkbox(
                    label="🎯 Enable Fuzzing Detection",
                    value=True,
                    info="Uses Random Forest to detect fuzzing attacks"
                )

                replay_cb = gr.Checkbox(
                    label="🔄 Enable Replay Detection",
                    value=True,
                    info="Uses Neural Network to detect replay attacks"
                )

                method_radio = gr.Radio(
                    choices=["Cascade", "Parallel"],
                    label="🔄 Processing Method",
                    value="Cascade",
                    info="Cascade: Sequential detection, Parallel: All models run simultaneously"
                )

                analyze_btn = gr.Button("🔍 Analyze Data", variant="primary", size="lg")

        with gr.Row():
            with gr.Column():
                summary_output = gr.Markdown(label="📊 Summary Report")

        with gr.Row():
            with gr.Column():
                detailed_output = gr.Textbox(
                    label="📋 Detailed Results",
                    lines=15,
                    max_lines=20,
                    show_copy_button=True
                )

            with gr.Column():
                stats_output = gr.Markdown(label="📈 Performance Metrics")

        # Event handler: the three return values map onto the three outputs in order.
        analyze_btn.click(
            fn=process_with_gradio,
            inputs=[file_input, flooding_cb, fuzzing_cb, replay_cb, method_radio],
            outputs=[summary_output, detailed_output, stats_output]
        )

        # Example data, in the same layout as the real log lines
        # (label, timestamp, interface, CAN ID, length, payload bytes).
        gr.Markdown("""
        ### 📝 Example Data Format
        ```
        Normal (000.000000)  can0  05040601   [8]  A6 35 00 00 00 00 00 9C
        Normal (000.000180)  can0  05040601   [7]  00 00 00 00 00 00 7C
        Attack (000.002787)  can0  05040601   [8]  A6 35 00 00 00 00 00 9D
        ```
        """)

    return demo

# Add this at the very end of your original code
if __name__ == "__main__":
    # The block below is an inert string literal kept from the original
    # command-line examples; it is never executed or printed.
    """
    print("="*80)
    print("SELECTIVE ATTACK DETECTOR - USAGE EXAMPLES")
    print("="*80)
    # ... your existing examples ...
    """

    # Startup banner. The localhost URL is fixed text; launch() reports the
    # address that is actually served.
    print(
        "🚀 Starting Gradio Web Interface...",
        "📡 Access the interface at: http://localhost:7860",
        sep="\n",
    )

    # Build the UI, then serve it.
    demo = create_gradio_interface()
    demo.launch(
        share=True,  # Create public link
        server_name="0.0.0.0",  # Allow external access
        show_error=True,
    )

🚀 Starting Gradio Web Interface...
📡 Access the interface at: http://localhost:7860
Colab notebook detected. To show errors in colab notebook, set debug=True in launch()
* Running on public URL: https://4b2ae0b3c424cf8b34.gradio.live

This share link expires in 1 week. For free permanent hosting and GPU upgrades, run `gradio deploy` from the terminal in the working directory to deploy to Hugging Face Spaces (https://huggingface.co/spaces)
