In [None]:
import numpy as np
import random

from tqdm import tqdm
from collections import Counter
from utils_import import load_data
from utils_preprocess import split_data, compute_energy_matrix_and_labels
from utils_clustering import create_cluster, cluster_mapping
from utils_test import predict_labels
from sklearn.metrics import accuracy_score, confusion_matrix, classification_report

# Reproducibility: seed NumPy's global generator and the stdlib `random`
# module (the latter drives the shuffles at the end of this cell).
np.random.seed(1337)
random.seed(1337)

# Assumption: all signals consist of 50k samples
n_samples = 50_000
interv = 1024    # Hyperparameter 1
n_frec_div = 16  # Hyperparameter 2
array_length = n_samples // interv - 1

# Load each signal class from its own folder; all three share one metadata file
metadata_csv = '../dataset/Jamming/metadata.csv'
signals_clean = load_data('../dataset/Jamming/Clean', metadata_csv)
signals_narrowband = load_data('../dataset/Jamming/Narrowband', metadata_csv)
signals_wideband = load_data('../dataset/Jamming/Wideband', metadata_csv)

# Partition every class separately: train=0.8, test=0.2
train_fraction = 0.8
clean_train, clean_test = split_data(signals_clean, train_fraction)
narrowband_train, narrowband_test = split_data(signals_narrowband, train_fraction)
wideband_train, wideband_test = split_data(signals_wideband, train_fraction)

# Merge the per-class partitions (clean, narrowband, wideband, in that order)
train = [*clean_train, *narrowband_train, *wideband_train]
test = [*clean_test, *narrowband_test, *wideband_test]

print(f"Nº señales entrenamiento: {len(train)}")
print(f"Nº señales test: {len(test)}")

# Shuffle in place so the classes are no longer grouped together
random.shuffle(train)
random.shuffle(test)

# Integer id assigned to each class name
class_mapping = {
    name: idx for idx, name in enumerate(("Clean", "Narrowband", "Wideband"))
}


In [None]:
from utils_preprocess import *
import matplotlib.pyplot as plt

def _first_of_class(signals, class_name):
    """Return the first signal in `signals` whose "Class" field equals `class_name`.

    `train` is shuffled, so picking examples by position (train[0], train[1], ...)
    does not guarantee which class is obtained; selecting on the "Class" field does.

    Raises:
        ValueError: if `signals` holds no signal of the requested class.
    """
    for candidate in signals:
        if candidate["Class"] == class_name:
            return candidate
    raise ValueError(f"No signal of class {class_name!r} found")


def _plot_jamming_onset(signal):
    """Print `signal` and plot the window before, and the window at, its jamming start.

    Returns:
        (windows, start): the output of signal_interval for the signal's data and
        the window index obtained as JammingStartTime // 1024.
    """
    # NOTE(review): 1024 duplicates `interv`; signal_interval is called without a
    # window argument, so presumably its default is 1024 too — confirm before unifying.
    start = signal['JammingStartTime']//1024
    print(signal)
    windows = signal_interval(signal["Data"])
    print(windows.shape)
    # If start is 0, start-1 is -1 and selects the last window instead of a previous one.
    plt.plot(windows[start-1])
    plt.plot(windows[start])
    plt.show()
    return windows, start


# Clean example: show one arbitrary window (index 20)
d = _first_of_class(train, "Clean")["Data"]
f = signal_interval(d)
print(f.shape)
plt.plot(f[20])
plt.show()

# Narrowband example: windows around the jamming start
narrowband_example = _first_of_class(train, "Narrowband")
d = narrowband_example["Data"]
f, start = _plot_jamming_onset(narrowband_example)

# Wideband example: windows around the jamming start
wideband_example = _first_of_class(train, "Wideband")
d = wideband_example["Data"]
f, start = _plot_jamming_onset(wideband_example)

# Three consecutive wideband windows ending at the jamming-start window,
# and the result of energy_arrays for them.
t = f[start-2:start+1]
print("-"*100)
print(t)
# NOTE(review): 16 presumably mirrors n_frec_div from the configuration cell — confirm.
e = energy_arrays(t, 16)
print(e)


In [None]:
# 1) -- Train --

# Build the energy arrays and labels for the training signals
# (x=window samples, y=frequency divisions, z=signal)
train_energy_dif_matrix, sample_labels = compute_energy_matrix_and_labels(
    train,
    n_samples,
    interv,
    n_frec_div,
    class_mapping,
)
# Only magnitudes are used from here on
train_energy_dif_matrix = np.absolute(train_energy_dif_matrix)

# Create the K-Means model (k=3) from the energy arrays
cluster = create_cluster(train_energy_dif_matrix, k=3)
print(f"\n--- Centros de cluster ---\n{cluster.cluster_centers_}")

# Relate every cluster to one of the original classes
cluster_map = cluster_mapping(cluster.labels_, sample_labels, class_mapping)
print(f"\nMapping clusters to predominant classes: {cluster_map}")

In [None]:
# 2) -- Test --

# Same feature extraction as in the train cell, applied to the test signals
test_energy_dif_matrix, y_true = compute_energy_matrix_and_labels(
    test,
    n_samples,
    interv,
    n_frec_div,
    class_mapping,
)
test_energy_dif_matrix = np.absolute(test_energy_dif_matrix)

# Predict a cluster for every row, then translate cluster ids through cluster_map
y_pred = [cluster_map[cluster_id] for cluster_id in cluster.predict(test_energy_dif_matrix)]
# Note: for now class 1 is over-predicted (fix the imbalance; the majority
# class has many more occurrences)
print(np.bincount(y_pred))

# True class id of every test signal
signal_true = np.array(
    [class_mapping[test_signal["Class"]] for test_signal in test],
    dtype=np.int8,
)

# Predicted class of every test signal, derived from the per-row predictions
signal_pred = predict_labels(y_pred, N=len(test), array_length=array_length)

In [None]:
# 3) -- Metrics --

# Compute the three signal-level metrics first, then print them in order
acc = accuracy_score(signal_true, signal_pred)
cm = confusion_matrix(signal_true, signal_pred)
report = classification_report(signal_true, signal_pred)

# Accuracy
print(f"\nAccuracy: {acc}")

# Confusion Matrix
print(f"\nConfusion Matrix:\n{cm}")

# Classification Report
print("\nClassification Report:")
print(report)

In [None]:
import pandas as pd
import matplotlib.pyplot as plt
from sklearn.preprocessing import StandardScaler
from sklearn.decomposition import PCA 
from sklearn.datasets import make_blobs
from sklearn.cluster import KMeans
#from sklearn.mixture import GaussianMixture


# Visual check: project the test energy features onto two principal components
# and colour the points by a K-Means clustering of the same features.
#
# This K-Means is a NEW model fitted on the test features; it is independent of
# `cluster` (fitted on the training features), so its cluster ids are not
# related to `cluster_map` or to the class ids.
#
# random_state is fixed (same value as the seeds in the first cell) so the
# clustering, and therefore the colouring, does not depend on how much of the
# global NumPy RNG state earlier cells consumed or on cell execution order.
# For PCA it only matters if a randomized solver is selected.
pca = PCA(2, random_state=1337)
X = np.abs(test_energy_dif_matrix)
pca_data = pd.DataFrame(pca.fit_transform(X), columns=['PC1', 'PC2'])
kmeans = KMeans(n_clusters=3, random_state=1337).fit(X)
pca_data['cluster'] = pd.Categorical(kmeans.labels_)
# Alternative explored (requires the commented-out GaussianMixture import):
#kmeans = GaussianMixture(n_components=3)
#pca_data['cluster'] = pd.Categorical(kmeans.fit_predict(X,sample_labels))

fig, ax = plt.subplots()
# Low alpha so dense regions remain readable when many points overlap
scatter = ax.scatter(pca_data['PC1'], pca_data['PC2'], c=pca_data['cluster'], cmap='Set3', alpha=0.1)
legend1 = ax.legend(*scatter.legend_elements(),
                    loc="upper left", title="")
ax.add_artist(legend1)
# Label the figure so it can be read on its own
ax.set_xlabel('PC1')
ax.set_ylabel('PC2')
ax.set_title('K-Means clusters of test energy features (PCA projection)')
plt.show()

In [None]:
# Scratch inspection of labels and energy features.
# NOTE(review): `sample_labels` comes from the TRAIN call to
# compute_energy_matrix_and_labels, while `test_energy_dif_matrix` comes from the
# TEST call, so indices into one do not refer to rows of the other — confirm
# which pair was intended here.

# NOTE(review): assumes train[1] is a Narrowband signal (train is shuffled);
# neither `d` nor `start` is used further down in this cell.
d = train[1]["Data"] # Narrowband
start = train[1]['JammingStartTime']//1024

# Index of the first label greater than 0 (np.argmax returns 0 if there is none).
# Printed explicitly: a bare expression in the middle of a cell is discarded.
print(np.argmax(sample_labels>0))

# np.argmax without `axis` indexes the FLATTENED array, so also show the
# position in the matrix's own shape to make it comparable with the slice below.
first_above_threshold = np.argmax(test_energy_dif_matrix>0.1)
print(first_above_threshold)
print(np.unravel_index(first_above_threshold, np.shape(test_energy_dif_matrix)))

print(sample_labels[159])
# Last expression of the cell: rendered as the cell output
test_energy_dif_matrix[340:343]

In [None]:
# Number of occurrences of each non-negative integer value in sample_labels
# (the labels returned by compute_energy_matrix_and_labels for the training
# signals); shown as the cell output.
np.bincount(sample_labels)