In [76]:
import os
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from PIL import Image
import librosa
import librosa.display
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import MinMaxScaler
import logging
from sklearn.preprocessing import LabelEncoder

## Loading data

In [2]:
DATA_PATH = '../data/raw/'
IMG_SIZE = 122

# Create necessary directories
dronerf_path = os.path.join(DATA_PATH, 'dronerf')
input_path = os.path.join(DATA_PATH, 'input')
os.makedirs(input_path, exist_ok=True)

# Create spectrograms
name_map = {
    'AR drone': 'ar',
    'Bepop drone': 'bepop',
    'Background RF activities': 'bg',
    'Phantom drone': 'phantom'
}

sample_rate = 40e6 

## Converting into spectrograms

In [3]:
def create_spectrogram(file_path, export_path):
    data = pd.read_csv(file_path, header=None).values.flatten()

    # Normalize data
    data = librosa.util.normalize(data)
    
    # Compute spectrogram
    N = int(32e-3 * sample_rate)
    hop_length = N // 2  # 50% overlap
    S = librosa.stft(data, n_fft=N, hop_length=hop_length, window='hamming')
    S_db = librosa.amplitude_to_db(np.abs(S), ref=np.max)
    
    # Plot spectrogram
    fig, ax = plt.subplots(figsize=(IMG_SIZE/100, IMG_SIZE/100))
    img = librosa.display.specshow(S_db, sr=sample_rate, hop_length=hop_length, x_axis=None, y_axis=None, ax=ax, cmap='gray_r')
    
    ax.set_axis_off()
    fig.subplots_adjust(left=0, right=1, top=1, bottom=0)
    plt.savefig(export_path, bbox_inches='tight', pad_inches=0)
    plt.close(fig)
    
    with Image.open(export_path) as img:
        print(f"Image shape for {os.path.basename(export_path)}: {img.size}")

In [4]:
for drone_folder in os.listdir(dronerf_path):
    drone_path = os.path.join(dronerf_path, drone_folder)
    if not os.path.isdir(drone_path):
        continue
    
    drone = name_map.get(drone_folder)
    if drone is None:
        print(f"Warning: Unknown drone folder '{drone_folder}'. Skipping.")
        continue
    
    export_path = os.path.join(input_path, drone)
    os.makedirs(export_path, exist_ok=True)
    
    lower_folder = os.path.join(drone_path, "lower")
    if not os.path.isdir(lower_folder):
        print(f"Warning: No 'lower' folder found in '{drone_folder}'. Skipping.")
        continue
    
    for file in os.listdir(lower_folder):
        if file.endswith('.csv'):
            file_path = os.path.join(lower_folder, file)
            export_file = os.path.join(export_path, f"{os.path.splitext(file)[0]}.png")
            create_spectrogram(file_path, export_file)

print("Spectrograms created successfully.")

Image shape for 00000L_28.png: (122, 122)
Image shape for 00000L_14.png: (122, 122)
Image shape for 00000L_15.png: (122, 122)
Image shape for 00000L_29.png: (122, 122)
Image shape for 00000L_17.png: (122, 122)
Image shape for 00000L_16.png: (122, 122)
Image shape for 00000L_12.png: (122, 122)
Image shape for 00000L_13.png: (122, 122)
Image shape for 00000L_11.png: (122, 122)
Image shape for 00000L_39.png: (122, 122)
Image shape for 00000L_38.png: (122, 122)
Image shape for 00000L_10.png: (122, 122)
Image shape for 00000L_5.png: (122, 122)
Image shape for 00000L_4.png: (122, 122)
Image shape for 00000L_6.png: (122, 122)
Image shape for 00000L_7.png: (122, 122)
Image shape for 00000L_3.png: (122, 122)
Image shape for 00000L_2.png: (122, 122)
Image shape for 00000L_0.png: (122, 122)
Image shape for 00000L_1.png: (122, 122)
Image shape for 00000L_40.png: (122, 122)
Image shape for 00000L_9.png: (122, 122)
Image shape for 00000L_8.png: (122, 122)
Image shape for 00000L_35.png: (122, 122)
Im

## Spectrograms to feature matrix for models

In [132]:
def standardize(x):
    return (x - np.min(x)) / (np.max(x) - np.min(x))

In [135]:
X_list = []
y = []

for drone in ['ar', 'bepop', 'bg', 'phantom']:
    pngs_path = os.path.join(input_path, drone)
    if not os.path.exists(pngs_path):
        print(f"Warning: Directory not found: {pngs_path}")
        continue
    
    drone_pngs_path = [os.path.join(pngs_path, f) for f in os.listdir(pngs_path) if f.endswith('.png')]
    
    for png_path in drone_pngs_path:
        img = np.array(Image.open(png_path).convert('L'))
        x = np.rot90(img, k=3)  # rotate
        x = standardize(x)
        x = x.reshape(1, IMG_SIZE*IMG_SIZE)
        
        X_list.append(x)
        y.append(drone)

X = np.vstack(X_list)
X = pd.DataFrame(X, columns=[f'p{i+1}' for i in range(IMG_SIZE*IMG_SIZE)])

## Split data into training and testing

For anomaly detection cases, we don't care about specific drones quite yet. Due to this, I wanted to see how binary classification occurs, supervised. So, we map every drone label as "drone".

In [136]:
df = pd.concat([pd.DataFrame({'y': y}), X], axis=1)
df['y'] = df['y'].map(lambda x: 'bg' if x == 'bg' else 'drone')

le = LabelEncoder()
df['y_encoded'] = le.fit_transform(df['y'])

X = df.drop(['y', 'y_encoded'], axis=1)
y = df['y_encoded']

X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.25, random_state=42, stratify=y)

# Check distributions
print("Full dataset distribution:")
print(df['y'].value_counts(normalize=True))

print("\nTraining set distribution:")
print(y_train.value_counts(normalize=True))

print("\nTest set distribution:")
print(y_test.value_counts(normalize=True))

Full dataset distribution:
y
bg       0.87234
drone    0.12766
Name: proportion, dtype: float64

Training set distribution:
y_encoded
0    0.885714
1    0.114286
Name: proportion, dtype: float64

Test set distribution:
y_encoded
0    0.833333
1    0.166667
Name: proportion, dtype: float64


# Isolation forest for classification

In [138]:
from sklearn.ensemble import IsolationForest
from sklearn.metrics import roc_auc_score, precision_recall_curve, average_precision_score

In [192]:
def evaluate_model(y_true, y_pred, scores):
    auc_roc = roc_auc_score(y_true, scores)
    avg_precision = average_precision_score(y_true, scores)
    precision, recall, thresholds = precision_recall_curve(y_true, scores)
    f1_scores = 2 * (precision * recall) / (precision + recall)
    optimal_threshold = thresholds[np.argmax(f1_scores)]
    optimal_f1 = np.max(f1_scores)
    return {
        'AUC-ROC': auc_roc,
        'Average Precision': avg_precision,
        'Recall': recall,
        'Optimal F1-Score': optimal_f1,
        'Optimal Threshold': optimal_threshold
    }

In [193]:
X_train_bg = X_train[y_train == 0]

In [194]:
iso_forest = IsolationForest(contamination=0.1, random_state=42)
iso_forest.fit(X_train_bg)

In [195]:
train_scores = iso_forest.decision_function(X_train)
test_scores = iso_forest.decision_function(X_test)

In [196]:
y_train_binary = y_train
y_test_binary = y_test

In [197]:
class ZScoreDetector:
    def __init__(self, threshold=3.0):
        self.threshold = threshold
        self.mean = None
        self.std = None
    
    def fit(self, X):
        self.mean = np.mean(X, axis=0)
        self.std = np.std(X, axis=0)
    
    def predict(self, X):
        z_scores = np.abs((X - self.mean) / self.std)
        return np.any(z_scores > self.threshold, axis=1).astype(int)

    def decision_function(self, X):
        z_scores = np.abs((X - self.mean) / self.std)
        return -np.max(z_scores, axis=1)  

In [198]:
z_detector = ZScoreDetector(threshold=3.0)
z_detector.fit(X_train_bg)
z_score_pred = z_detector.predict(X_test)

In [199]:
print(classification_report(y_test, z_score_pred))

              precision    recall  f1-score   support

           0       0.00      0.00      0.00        10
           1       0.17      1.00      0.29         2

    accuracy                           0.17        12
   macro avg       0.08      0.50      0.14        12
weighted avg       0.03      0.17      0.05        12



  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))
  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))
  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))


In [200]:
# Standardize the features
scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train)
X_test_scaled = scaler.transform(X_test)

# Isolation Forest
iso_forest = IsolationForest(contamination=0.1, random_state=42)
iso_forest.fit(X_train_scaled)
iso_scores = -iso_forest.decision_function(X_test_scaled)  # Negative so that higher scores indicate anomalies
iso_preds = iso_forest.predict(X_test_scaled)
iso_preds = (iso_preds == -1).astype(int)  # Convert to binary where 1 is anomaly

# ZScoreDetector
z_detector = ZScoreDetector(threshold=3.0)
z_detector.fit(X_train_scaled)
z_scores = -z_detector.decision_function(X_test_scaled)  # Negative so that higher scores indicate anomalies
z_preds = z_detector.predict(X_test_scaled)

# Evaluate both models
iso_metrics = evaluate_model(y_test, iso_preds, iso_scores)
z_metrics = evaluate_model(y_test, z_preds, z_scores)

print("Isolation Forest Metrics:")
for metric, value in iso_metrics.items():
    print(f"{metric}: {value:.4f}")

print("\nZScoreDetector Metrics:")
for metric, value in z_metrics.items():
    print(f"{metric}: {value:.4f}")

# Additional comparison metrics
print("\nAdditional Comparison:")
print(f"Isolation Forest F1-Score: {f1_score(y_test, iso_preds):.4f}")
print(f"ZScoreDetector F1-Score: {f1_score(y_test, z_preds):.4f}")

# Confusion matrices
from sklearn.metrics import confusion_matrix

print("\nIsolation Forest Confusion Matrix:")
print(confusion_matrix(y_test, iso_preds))

print("\nZScoreDetector Confusion Matrix:")
print(confusion_matrix(y_test, z_preds))

Isolation Forest Metrics:
AUC-ROC: 1.0000
Average Precision: 1.0000


TypeError: unsupported format string passed to numpy.ndarray.__format__