<a href="https://colab.research.google.com/github/shashi913/Adapt_Anti_UAV_detection_system/blob/model_fusion/Fusion_models.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
from google.colab import drive
drive.mount('/content/drive')

import numpy as np
import tensorflow as tf
from tensorflow import keras
import torch
import os
from PIL import Image
import torchvision.transforms as T

# Pick the compute device once: CUDA when a usable GPU is present, CPU otherwise.
if torch.cuda.is_available():
    device = torch.device('cuda')
else:
    device = torch.device('cpu')
print(f"Setup on {device}")


Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).
Setup on cpu


In [None]:
# !find /content/drive/MyDrive -name "rtdetr-pytorch" -type d

In [None]:
RF_MODEL_PATH = '/content/drive/MyDrive/DroneRF/models_IMPROVED/best_model_improved.keras'
VISION_MODEL_PATH = '/content/drive/MyDrive/FYP_datasets/Vision-RGB/DUT/rtdetr_uav_best_model.pth'

print("Files exist:", os.path.exists(RF_MODEL_PATH), os.path.exists(VISION_MODEL_PATH))

# Load RF model
rf_model = keras.models.load_model(RF_MODEL_PATH)
print("RF loaded (CNN-LSTM 4-class)")

# Load Vision weights
# NOTE(review): torch.load unpickles arbitrary Python objects; only load
# checkpoints that come from a trusted source.
state_dict = torch.load(VISION_MODEL_PATH, map_location=device)
print("Vision weights loaded:", len(state_dict), "keys")

# ResNet-18 feature extractor used as the vision branch (classifier head removed)
from torchvision.models import resnet18
vision_model = resnet18(weights='DEFAULT').to(device).eval()
vision_model.fc = torch.nn.Identity()

# Load the trained weights. strict=False never raises on key mismatches, so the
# result must be inspected: if nothing matched, the model is still the stock
# ImageNet ResNet-18 and the checkpoint had no effect at all.
load_result = vision_model.load_state_dict(state_dict, strict=False)
expected_keys = set(vision_model.state_dict().keys())
loaded_keys = expected_keys - set(load_result.missing_keys)

if loaded_keys:
    print("Vision model ready (rtdetr_uav_best_model.pth)")
    print(f"   {len(loaded_keys)}/{len(expected_keys)} tensors loaded from checkpoint, "
          f"{len(load_result.unexpected_keys)} checkpoint keys ignored")
else:
    # NOTE(review): a checkpoint with only a handful of top-level keys is
    # presumably a training container (weights nested under one of the keys)
    # for a different architecture - confirm and rebuild the matching model.
    print("⚠️ WARNING: none of the checkpoint keys match the ResNet-18 backbone.")
    print(f"   Top-level checkpoint keys: {list(state_dict.keys())[:10]}")
    print("   vision_model is running with ImageNet weights only; "
          "vision scores do NOT reflect the trained UAV detector.")


Files exist: True True
RF loaded (CNN-LSTM 4-class)
Vision weights loaded: 6 keys
Vision model ready (rtdetr_uav_best_model.pth)


In [None]:
# ============ PREPROCESSING ============
def preprocess_rf(rf_signal):
    """Turn a raw RF signal into the two arrays fed to the RF model.

    The signal is flattened to 1-D float32 and forced to exactly 5000 samples
    (zero-padded at the end when shorter, truncated when longer).

    Returns:
        (time_input, freq_input):
            time_input - standardised samples, shape (1, 5000, 1)
            freq_input - standardised magnitude of the real FFT of the
                         (un-normalised) 5000 samples, shape (1, 2501, 1)
    """
    n_time, n_freq = 5000, 2501

    def fit_length(values, target):
        # Zero-pad the tail when too short, otherwise keep the leading `target` items
        shortfall = target - len(values)
        if shortfall > 0:
            return np.pad(values, (0, shortfall))
        return values[:target]

    def standardize(values):
        # Zero-mean / unit-variance; the epsilon avoids dividing by zero on flat input
        return (values - np.mean(values)) / (np.std(values) + 1e-8)

    samples = fit_length(np.asarray(rf_signal, dtype=np.float32).flatten(), n_time)
    spectrum = fit_length(np.abs(np.fft.rfft(samples)), n_freq)

    time_input = standardize(samples).reshape(1, n_time, 1)
    freq_input = standardize(spectrum).reshape(1, n_freq, 1)
    return time_input, freq_input

def preprocess_image(img_path):
    """Read an image file and return a normalised batch tensor on `device`.

    The image is converted to RGB, resized to 640x640, turned into a tensor,
    normalised per channel, and given a leading batch dimension of 1.
    """
    channel_mean = [0.485, 0.456, 0.406]
    channel_std = [0.229, 0.224, 0.225]

    pipeline = T.Compose([
        T.Resize((640, 640)),
        T.ToTensor(),
        T.Normalize(channel_mean, channel_std)
    ])

    rgb_image = Image.open(img_path).convert('RGB')
    batch = pipeline(rgb_image).unsqueeze(0)
    return batch.to(device)

#Load L+H CSV pairs for RF model
def load_rf_csv_pair(l_file, h_file=None):
    """
    Load L and H CSV files and combine them (DroneRF format)

    Args:
        l_file: Path to L (low) CSV file
        h_file: Path to H (high) CSV file. If None, the path is derived from
                l_file by swapping the L markers in folder and file names for H.

    Returns:
        1-D float32 numpy array with the L samples followed by the H samples.
        Only the L samples if no H file could be located.
        None if the L file itself does not exist.
    """
    import os
    import re
    import numpy as np

    # Load L file
    if not os.path.exists(l_file):
        print(f"❌ ERROR: L file not found: {l_file}")
        return None

    data_l = np.loadtxt(l_file, delimiter=',', dtype=np.float32).flatten()

    # Try to auto-find H file if not provided
    if h_file is None:
        directory = os.path.dirname(l_file)  # Folder path (no trailing slash)
        filename = os.path.basename(l_file)  # Just the file name

        # Strategy 1 (legacy): swap "_L/" folders for "_H/", keep the file name
        h_file_candidate1 = l_file.replace('/RF Data_', '/RF_Data_H_TEMP_').replace('_L/', '_H/').replace('/RF_Data_H_TEMP_', '/RF Data_')

        # Strategy 2 (legacy): swap "L_" for "H_" anywhere in the path
        h_file_candidate2 = l_file.replace('L_', 'H_')

        # Strategy 3 (legacy): swap folder and file name with plain replaces.
        # dirname() has no trailing slash, so the LAST folder is never rewritten here.
        legacy_directory = directory.replace('_L/', '_H/').replace('_L1/', '_H1/').replace('_L2/', '_H2/')
        h_filename = filename.replace('L_', 'H_').replace('L.csv', 'H.csv')
        h_file_candidate3 = os.path.join(legacy_directory, h_filename)

        # Strategy 4: rewrite EVERY folder ending in _L / _L<digits>, including
        # the last one: /RF Data_10000_L/RF Data_10000_L -> /RF Data_10000_H/RF Data_10000_H
        h_directory = re.sub(r'_L(\d*)(?=[\\/]|$)', r'_H\1', directory)
        h_file_candidate4 = os.path.join(h_directory, h_filename)

        # De-duplicate (order preserved) and never accept the L file as its own H file
        candidates = [
            candidate
            for candidate in dict.fromkeys([
                h_file_candidate1,
                h_file_candidate2,
                h_file_candidate3,
                h_file_candidate4,
            ])
            if candidate != l_file
        ]

        # Try all candidates
        for candidate in candidates:
            if os.path.exists(candidate):
                h_file = candidate
                print(f"✅ Auto-found H file: {os.path.basename(h_file)}")
                break
        else:
            print(f"⚠️ Warning: H file not found, searched:")
            # Full paths: the candidates often differ only in their folders
            for idx, candidate in enumerate(candidates, 1):
                print(f"   {idx}. {candidate}")
            print(f"   Using L file only (may affect accuracy)")
            return data_l

    # Load H file
    if not os.path.exists(h_file):
        print(f"⚠️ Warning: Provided H file not found: {h_file}")
        print(f"   Using L file only")
        return data_l

    data_h = np.loadtxt(h_file, delimiter=',', dtype=np.float32).flatten()

    # Combine L and H
    combined = np.concatenate([data_l, data_h])
    print(f"✅ Combined L+H: {len(data_l)} + {len(data_h)} = {len(combined)} samples")

    return combined




# ============ MULTIMODAL DETECTION ============
def detect_uav(rf_signal=None, rf_h_file=None, img_path=None,
               rf_weight=0.6, vision_weight=0.4, threshold=0.5):
    """
    Multimodal UAV Detection System

    Args:
        rf_signal: RF signal - can be:
                   - Numpy array (combined L+H)
                   - Path to L CSV file (will auto-find H file)
                   - Tuple of (L_path, H_path)
        rf_h_file: Optional H file path (if rf_signal is L file path)
        img_path: Image file path (JPG/PNG)
        rf_weight: Fusion weight of the RF branch (default 0.6)
        vision_weight: Fusion weight of the vision branch (default 0.4)
        threshold: Fused score above which a UAV is reported (default 0.5)

    Returns:
        Dictionary with detection results:
            rf_class            - argmax class index, or None if RF was not used
            rf_predictions      - per-class model output, or None if RF was not used
            rf_uav_probability  - summed probability of the non-background classes
            vision_score        - vision proxy score in [0, 1]
            fused_score         - weighted average over the modalities that ran
            UAV_DETECTED        - True when fused_score > threshold

    Notes:
        A modality that was not supplied, or that failed, is excluded from the
        fusion and the remaining weight is renormalised, so RF-only or
        vision-only calls can still reach the threshold.
    """
    rf_uav_prob = 0.0
    vision_score = 0.0
    rf_class = None
    rf_pred = None
    rf_available = False
    vision_available = False

    # ===== RF DETECTION =====
    if rf_signal is not None:
        # Case 1: Tuple of (L, H) paths
        if isinstance(rf_signal, tuple) and len(rf_signal) == 2:
            rf_signal = load_rf_csv_pair(rf_signal[0], rf_signal[1])

        # Case 2: Single CSV path (L file, auto-find H)
        elif isinstance(rf_signal, str):
            rf_signal = load_rf_csv_pair(rf_signal, rf_h_file)

        # Case 3: Already a numpy array (no change needed)

        if rf_signal is None:
            # load_rf_csv_pair returns None when the L file is missing; feeding
            # that to the model would only produce NaN predictions.
            print("RF error: signal could not be loaded, skipping RF branch")
        else:
            # NOTE(review): preprocess_rf keeps only the first 5000 samples, so
            # for long recordings the H part appended after L is dropped before
            # it reaches the model - confirm this matches how it was trained.
            time_in, freq_in = preprocess_rf(rf_signal)
            rf_pred = rf_model.predict([time_in, freq_in], verbose=0)[0]

            # P(UAV) = total probability of the drone classes (1..n) = 1 - P(background).
            # Averaging them instead would cap the value at 1/(n_classes - 1).
            rf_uav_prob = float(min(max(float(np.sum(rf_pred[1:])), 0.0), 1.0))
            rf_class = int(np.argmax(rf_pred))
            rf_available = True

    # ===== VISION DETECTION =====
    if img_path is not None:
        if not os.path.exists(img_path):
            print(f"Vision error: image not found: {img_path}")
        else:
            try:
                img_tensor = preprocess_image(img_path)
                with torch.no_grad():
                    features = vision_model(img_tensor)
                # Feature strength as detection proxy.
                # NOTE(review): this is |mean(features)| / 500, not a detector
                # confidence - replace with real detection scores when available.
                vision_score = min(torch.norm(features.mean()).item() / 500, 1.0)
                vision_available = True
            except Exception as e:
                print(f"Vision error: {e}")
                vision_score = 0.0

    # ===== FUSION (Weighted Average over available modalities) =====
    active_weight = 0.0
    if rf_available:
        active_weight += rf_weight
    if vision_available:
        active_weight += vision_weight

    if active_weight > 0:
        # Scores of inactive modalities are 0.0, so they add nothing to the numerator
        fused_score = (rf_weight * rf_uav_prob + vision_weight * vision_score) / active_weight
    else:
        fused_score = 0.0
    uav_detected = bool(fused_score > threshold)

    # Return results
    return {
        'rf_class': rf_class,
        'rf_predictions': rf_pred,
        'rf_uav_probability': rf_uav_prob,
        'vision_score': vision_score,
        'fused_score': fused_score,
        'UAV_DETECTED': uav_detected
    }


print("Detection system ready!")


Detection system ready!


In [None]:

# Single detection from an L file; the companion H file is located automatically
print("Single Detection Test (Auto-find H)")

# Only the L file is given - detect_uav derives the H path from it
rf_l_file = '/content/drive/MyDrive/DroneRF/DroneRF_processed_with_chunk_for_multiclass/Bepop drone/RF Data_10000_L/RF Data_10000_L/10000L_1.csv'
img_file = '/content/drive/MyDrive/FYP_datasets/Vision-RGB/DUT/test_data/download.jpg'

# Run detection (the expected companion file is 10000H_1.csv)
result = detect_uav(rf_signal=rf_l_file, img_path=img_file)

# Build the whole report first, then emit it in one go
decision_text = 'UAV DETECTED!' if result['UAV_DETECTED'] else 'No UAV'
report_lines = [
    "\nRF Analysis:",
    f"   Predicted class: {result['rf_class']}",
    f"   All class probs: {result['rf_predictions']}",
    f"   UAV probability: {result['rf_uav_probability']:.1%}",
    "\nVision Analysis:",
    f"   Detection score: {result['vision_score']:.3f}",
    "\nMULTIMODAL FUSION:",
    f"   Combined score: {result['fused_score']:.1%}",
    f"   Decision: {decision_text}",
    "=" * 70,
]
print("\n".join(report_lines))






Single Detection Test (Auto-find H)
   1. 10000L_1.csv
   2. 10000H_1.csv
   3. 10000H_1.csv
   Using L file only (may affect accuracy)

RF Analysis:
   Predicted class: 0
   All class probs: [0.4516664  0.19100112 0.20047583 0.15685666]
   UAV probability: 18.3%

Vision Analysis:
   Detection score: 0.001

MULTIMODAL FUSION:
   Combined score: 11.0%
   Decision: No UAV


In [None]:
# Explicit L+H pair (tuple format)
print("\n\n Explicit L+H Pair")

# Alternative inputs - uncomment exactly one group to test another class.

# AR drone
# l_file = '/content/drive/MyDrive/DroneRF/DroneRF_processed_with_chunk_for_multiclass/AR drone/RF Data_10100_L/RF Data_10100_L/10100L_1.csv'
# h_file = '/content/drive/MyDrive/DroneRF/DroneRF_processed_with_chunk_for_multiclass/AR drone/RF Data_10100_H/RF Data_10100_H/10100H_1.csv'
# img_file = '/content/drive/MyDrive/FYP_datasets/Vision-RGB/DUT/test_data/download.jpg'

# Background
# NOTE(review): the h_file below names 00000L_1.csv inside the H1 folder -
# presumably it should be 00000H_1.csv; confirm before enabling.
# l_file = '/content/drive/MyDrive/DroneRF/DroneRF_processed_with_chunk_for_multiclass/Background RF activites/RF Data_00000_L1/RF Data_00000_L1/00000L_1.csv'
# h_file = '/content/drive/MyDrive/DroneRF/DroneRF_processed_with_chunk_for_multiclass/Background RF activites/RF Data_00000_H1/RF Data_00000_H1/00000L_1.csv'
# img_file = '/content/drive/MyDrive/FYP_datasets/Vision-RGB/DUT/test_data/download.jpg'

# Bepop drone
l_file = '/content/drive/MyDrive/DroneRF/DroneRF_processed_with_chunk_for_multiclass/Bepop drone/RF Data_10000_L/RF Data_10000_L/10000L_1.csv'
h_file = '/content/drive/MyDrive/DroneRF/DroneRF_processed_with_chunk_for_multiclass/Bepop drone/RF Data_10000_H/RF Data_10000_H/10000H_1.csv'
img_file = '/content/drive/MyDrive/FYP_datasets/Vision-RGB/DUT/test_data/download.jpg'

# Pass as tuple
result = detect_uav(rf_signal=(l_file, h_file), img_path=img_file)

# NOTE(review): assumes this label order matches the class encoding used when
# the RF model was trained - TODO confirm.
class_names = ['Background', 'Phantom', 'AR', 'Bepop']

# detect_uav reports rf_class=None when the RF branch did not run; indexing the
# label list with None would raise TypeError.
rf_class = result['rf_class']
predicted_label = class_names[rf_class] if rf_class is not None else 'N/A (RF unavailable)'

print(f"RF probability: {result['rf_uav_probability']:.1%}")
print(f"Predicted class: {predicted_label}")
print(f"Decision: {'UAV' if result['UAV_DETECTED'] else 'Clear'}")



 Explicit L+H Pair
✅ Combined L+H: 10000000 + 10000000 = 20000000 samples
RF probability: 18.3%
Predicted class: Background
Decision: Clear


In [None]:





# RF-only detection (no image supplied)
print("\n\nRF-Only Detection")

# l_file is the L path assigned in the explicit-pair cell; run that cell first
rf_only_result = detect_uav(rf_signal=l_file)
rf_only_verdict = 'UAV' if rf_only_result['UAV_DETECTED'] else 'Clear'
print(f"RF probability: {rf_only_result['rf_uav_probability']:.1%}")
print(f"Decision: {rf_only_verdict}")
print("=" * 70)


# Batch test: one L file per class, the H file is auto-located for each
print("\n\n Batch Test (Multiple L+H Pairs)")

# NOTE(review): the Phantom path has a single "RF Data_11000_L1" folder while
# the other entries nest the folder twice - verify against the Drive layout.
test_cases = [
    {
        'l': '/content/drive/MyDrive/DroneRF/DroneRF_processed_with_chunk_for_multiclass/Phantom drone/RF Data_11000_L1/11000L_1.csv',
        'name': 'Phantom'
    },
    {
        'l': '/content/drive/MyDrive/DroneRF/DroneRF_processed_with_chunk_for_multiclass/AR drone/RF Data_10100_L/RF Data_10100_L/10100L_1.csv',
        'name': 'AR Drone'
    },
    {
        'l': '/content/drive/MyDrive/DroneRF/DroneRF_processed_with_chunk_for_multiclass/Background RF activites/RF Data_00000_L1/RF Data_00000_L1/00000L_1.csv',
        'name': 'Background'
    }
]

for i, case in enumerate(test_cases, 1):
    row_prefix = f"{i}. {case['name']:15}"

    # Skip entries whose L file is absent instead of running the detector
    if not os.path.exists(case['l']):
        print(f"{row_prefix} → L file not found")
        continue

    # Auto-find H file
    res = detect_uav(rf_signal=case['l'])
    verdict = 'UAV' if res['UAV_DETECTED'] else 'Clear'
    print(f"{row_prefix} → RF: {res['rf_uav_probability']:.0%} | {verdict}")