In [1]:
import numpy as np
import tensorflow as tf
from tensorflow.keras.models import Sequential, Model
from tensorflow.keras.layers import LSTM, Dense, RepeatVector, TimeDistributed, Layer, MultiHeadAttention
from tensorflow.keras.layers import Bidirectional, Dropout
from tensorflow.keras.layers import Masking, Input, Lambda
import tensorflow_probability as tfp
from tensorflow.keras.optimizers import Adam
from tensorflow.keras import backend as K
from tensorflow.keras.losses import mse
from numpy.fft import fft
from scipy.stats import skew, kurtosis 
import pandas as pd
import glob
import matplotlib.pyplot as plt
import os
import struct
import glob
from sklearn.metrics import confusion_matrix, roc_auc_score, roc_curve, classification_report, accuracy_score


import torch
from torch.utils.data import Dataset, DataLoader
import numpy as np
import struct
from numpy.fft import fft
from scipy.stats import skew, kurtosis
import torch
from torch import nn
from torch.nn import functional as F


# Check if CUDA is available and set the device to GPU if it is, otherwise CPU
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")

class ComplexNumbersDataset(Dataset):
    def __init__(self, filepath, sequence_length, max_samples=None, for_training=True, process_method='data1'):
        self.filepath = filepath
        self.sequence_length = sequence_length
        self.max_samples = max_samples
        self.for_training = for_training
        self.process_method = process_method
        self.samples = self.load_samples()
        self.samples_per_sequence = self.sequence_length
        #self.samples_per_sequence = self.sequence_length if for_training else 1

    def load_samples(self):
        samples = []
        with open(self.filepath, 'rb') as binary_file:
            while True:
                if self.max_samples and len(samples) >= self.max_samples:
                    break
                binary_data = binary_file.read(8)
                if not binary_data:
                    break
                decoded_data = struct.unpack('ff', binary_data)
                if decoded_data[0] == 0 and decoded_data[1] == 0:
                    continue
                samples.append(f"{decoded_data[0]}+{decoded_data[1]}j")
        return samples

    def process_data1(self, sequence_samples):
        #print('sequence_samples:', sequence_samples)
        #print('len(sequence_samples):', len(sequence_samples))
        real_parts = []
        imag_parts = []
        for sample in sequence_samples:
            # Remove potential unwanted characters (spaces, etc.)
            sample = sample.replace(" ", "").replace("+-", "-")
            try:
                c = complex(sample)
                real_parts.append(c.real)
                imag_parts.append(c.imag)
            except ValueError:
                print(f"Failed to convert: {sample}")  # This will show which string failed
                raise
        real_parts = np.array(real_parts)
        imag_parts = np.array(imag_parts)
        # Normalize
        real_parts = (real_parts - np.mean(real_parts)) / np.std(real_parts)
        imag_parts = (imag_parts - np.mean(imag_parts)) / np.std(imag_parts)

        # Combining real and imaginary parts
        X = np.stack((real_parts, imag_parts), axis=1)  # Shape: (sequence_length, 2)
        return torch.tensor(X, dtype=torch.float32)

    def process_data2(self, sequence_samples):
        samples_array = np.array([complex(sample) for sample in sequence_samples], dtype=np.complex64)
        samples_fft = fft(samples_array)
        real_parts = np.real(samples_fft)
        imag_parts = np.imag(samples_fft)
        
        # Normalization
        epsilon = 1e-10
        real_parts_normalized = (real_parts - np.mean(real_parts)) / (np.std(real_parts) + epsilon)
        imag_parts_normalized = (imag_parts - np.mean(imag_parts)) / (np.std(imag_parts) + epsilon)
        
        # Feature extraction
        features = np.column_stack((
            np.mean(real_parts_normalized), np.std(real_parts_normalized), skew(real_parts_normalized),
            kurtosis(real_parts_normalized), np.mean(imag_parts_normalized), np.std(imag_parts_normalized),
            skew(imag_parts_normalized), kurtosis(imag_parts_normalized)
        ))
        
        return torch.tensor(features, dtype=torch.float32).reshape(self.samples_per_sequence, -1)

    def __len__(self):
        return len(self.samples) // self.samples_per_sequence

    def __getitem__(self, idx):
        start_idx = idx * self.samples_per_sequence
        end_idx = start_idx + self.samples_per_sequence
        sequence_samples = self.samples[start_idx:end_idx]
        
        if self.process_method == 'data1':
            X = self.process_data1(sequence_samples)
        elif self.process_method == 'data2':
            X = self.process_data2(sequence_samples)
        else:
            raise ValueError("Invalid process method specified.")
        if self.for_training:
            return X, X
        else:
            return X

#------------------------------------------------------------------------------------------------------

import torch
import numpy as np

def mec_kocaoglu_np(p, q):
    """
    Compute the joint distribution matrix with minimal entropy between two given distributions in PyTorch.
    """
    p = p.float() / p.sum()
    q = q.float() / q.sum()
    J = torch.zeros(q.size(0), p.size(0), dtype=torch.float64)
    M = torch.stack([p, q], dim=0)
    r = torch.min(torch.max(M, dim=1).values)

    while r > 0:
        a_i = torch.argmax(M, dim=1)
        r_updated = torch.min(torch.max(M, dim=1).values)
        update_values = torch.stack([r, r])
        for i, index in enumerate(a_i):
            M[i, index] -= update_values[i]
            J[index, i] += update_values[i]
        r = r_updated

    return J
def apply_mec_to_data(data, num_bins=10, latent_dim=50):
    """
    Apply the MEC transformation to each sample in the data.
    """
    def process_sample(sample):
        # Find the min and max values in the sample and convert them to Python scalars
        min_val = torch.min(sample).item()
        max_val = torch.max(sample).item()
        
        # Now min_val and max_val are Python numbers, suitable for torch.histc()
        sample_histogram = torch.histc(sample, bins=num_bins, min=min_val, max=max_val)
        sample_histogram = sample_histogram.float() / sample_histogram.sum()

        mec_transformed = mec_kocaoglu_np(sample_histogram, sample_histogram)

        # Flatten the 2D to 1D and adjust to match the latent_dim
        transformed_sample = mec_transformed.flatten()
        if transformed_sample.size(0) > latent_dim:
            transformed_sample = transformed_sample[:latent_dim]
        else:
            padding = torch.zeros(latent_dim - transformed_sample.size(0), dtype=torch.float64)
            transformed_sample = torch.cat([transformed_sample, padding], dim=0)

        return transformed_sample

    # Process each sample in the batch
    transformed_batch = torch.stack([process_sample(sample) for sample in data])
    transformed_batch = transformed_batch.float() 

    return transformed_batch

# def apply_mec_to_data(data, num_bins=10, latent_dim=50):
#     """
#     Apply the MEC transformation to each sample in the data.
#     """
#     def process_sample(sample):
#         min_val, max_val = torch.min(sample), torch.max(sample)
#         # Histogram as a way to represent the sample distribution
#         sample_histogram = torch.histc(sample, bins=num_bins, min=min_val, max=max_val)
#         sample_histogram = sample_histogram.float() / sample_histogram.sum()

#         mec_transformed = mec_kocaoglu_np(sample_histogram, sample_histogram)

#         # Flatten the 2D to 1D and adjust to match the latent_dim
#         transformed_sample = mec_transformed.flatten()
#         if transformed_sample.size(0) > latent_dim:
#             transformed_sample = transformed_sample[:latent_dim]
#         else:
#             padding = torch.zeros(latent_dim - transformed_sample.size(0), dtype=torch.float64)
#             transformed_sample = torch.cat([transformed_sample, padding], dim=0)

#         return transformed_sample

#     # Process each sample in the batch
#     transformed_batch = torch.stack([process_sample(sample) for sample in data])

#     return transformed_batch

def process_latent_variables(z):
    """
    Transform latent variables using MEC.
    """
    # Assuming 'z' is a batch of latent variables
    z_transformed = apply_mec_to_data(z)
    return z_transformed

#self Attention LSTM Autoencoder Model
import torch
from torch import nn

class SelfAttentionLayer(nn.Module):
    def __init__(self, embed_size, num_heads):
        super(SelfAttentionLayer, self).__init__()
        self.multihead_attention = nn.MultiheadAttention(embed_dim=embed_size, num_heads=num_heads)

    def forward(self, inputs):
        # PyTorch's MultiheadAttention expects inputs of shape (sequence_length, batch_size, embed_size)
        inputs = inputs.permute(1, 0, 2)
        attn_output, _ = self.multihead_attention(inputs, inputs, inputs)
        return attn_output.permute(1, 0, 2)  # Return to original shape (batch_size, sequence_length, embed_size)

# Variational Autoencoder (VAE) Class

class Encoder(nn.Module):
    def __init__(self, sequence_length, feature_dim, intermediate_dim, latent_dim, epsilon_std=0.1):
        super(Encoder, self).__init__()
        self.lstm1 = nn.LSTM(input_size=feature_dim, hidden_size=intermediate_dim, batch_first=True)
        self.self_attention = SelfAttentionLayer(embed_size=intermediate_dim, num_heads=2)  # Ensure compatibility
        #self.self_attention = SelfAttentionLayer(num_heads=2, key_dim=intermediate_dim)
        self.lstm2 = nn.LSTM(input_size=intermediate_dim, hidden_size=50, batch_first=True)
        self.z_mean = nn.Linear(in_features=50, out_features=latent_dim)
        self.z_log_var = nn.Linear(in_features=50, out_features=latent_dim)
        self.epsilon_std = epsilon_std

    def _sampling3(self, z_mean):
        # Assuming process_latent_variables(z) is adapted for PyTorch
        z_mean_transformed = process_latent_variables(z_mean)
        eps = torch.randn_like(z_mean_transformed) * self.epsilon_std
        return z_mean_transformed
    
    def _sampling(self, z_mean, z_log_var):
        # Standard sampling function, kept for compatibility
        std = torch.exp(0.5 * z_log_var)
        eps = torch.randn_like(std)
        return z_mean + eps * std

    def forward(self, x):
        x, _ = self.lstm1(x)
        x = self.self_attention(x)
        x, (h_n, _) = self.lstm2(x)
        z_mean = self.z_mean(h_n[-1])
        z_log_var = self.z_log_var(h_n[-1])
        z = self._sampling3(z_mean)  # Use _sampling3 as required
        return z, z_mean, z_log_var
    
# class Decoder(nn.Module):
#     def __init__(self, sequence_length, feature_dim, latent_dim):
#         super(Decoder, self).__init__()
#         self.repeat_vector = nn.RepeatVector(sequence_length)
#         self.lstm1 = nn.LSTM(input_size=latent_dim, hidden_size=50, batch_first=True)
#         self.lstm2 = nn.LSTM(input_size=50, hidden_size=100, batch_first=True)
#         self.output_layer = nn.Linear(in_features=100, out_features=feature_dim)

#     def forward(self, z):
#         z = self.repeat_vector(z)
#         z, _ = self.lstm1(z)
#         z, _ = self.lstm2(z)
#         return torch.sigmoid(self.output_layer(z))
class Decoder(nn.Module):
    def __init__(self, sequence_length, feature_dim, latent_dim):
        super(Decoder, self).__init__()
        self.sequence_length = sequence_length
        self.lstm1 = nn.LSTM(input_size=latent_dim, hidden_size=50, batch_first=True)
        self.lstm2 = nn.LSTM(input_size=50, hidden_size=100, batch_first=True)
        self.output_layer = nn.Linear(in_features=100, out_features=feature_dim)

    def forward(self, z):
        # Repeat 'z' for 'sequence_length' times
        # Assuming 'z' shape is [batch_size, latent_dim], we first unsqueeze it to add a sequence length dimension
        # Then repeat across this dimension
        z = z.unsqueeze(1).repeat(1, self.sequence_length, 1)
        
        # Now 'z' shape is [batch_size, sequence_length, latent_dim], suitable for LSTM input
        z, _ = self.lstm1(z)
        z, _ = self.lstm2(z)
        return torch.sigmoid(self.output_layer(z))

class VAE(nn.Module):
    def __init__(self, sequence_length, feature_dim, intermediate_dim, latent_dim, epsilon_std=0.1):
        super(VAE, self).__init__()
        self.encoder = Encoder(sequence_length, feature_dim, intermediate_dim, latent_dim, epsilon_std)
        self.decoder = Decoder(sequence_length, feature_dim, latent_dim)

    def forward(self, x):
        z, z_mean, z_log_var = self.encoder(x)
        reconstructed_x = self.decoder(z)
        return reconstructed_x, z_mean, z_log_var

def vae_loss(reconstructed_x, x, z_mean, z_log_var):
    reconstruction_loss = F.mse_loss(reconstructed_x, x, reduction='sum')
    kl_divergence = -0.5 * torch.sum(1 + z_log_var - z_mean.pow(2) - z_log_var.exp())
    return reconstruction_loss + kl_divergence



# Instantiate and Compile the VAE
import torch
from torch import optim

# Instantiate the VAE
sequence_length = 10
feature_dim = 2
intermediate_dim = 100
latent_dim = 50
epsilon_std = 0.1

#vae_model = VAE(sequence_length, feature_dim, intermediate_dim, latent_dim, epsilon_std)
vae_model = VAE(sequence_length, feature_dim, intermediate_dim, latent_dim, epsilon_std).to(device)

optimizer = optim.Adam(vae_model.parameters(), lr=0.005)


# Model Training
batch_size = 40
max_train_samples = 100000
train_steps = max_train_samples // (batch_size * sequence_length)
max_samples = 100000  # Maximum samples to read (or None to read all)
max_test_samples = 100000

pure_file_pattern = '/home/mreza/5G accelerator/ID_MEC/data generator/pure_data/pure_iq_samples_*.csv'
mixed_file_pattern = '/home/mreza/5G accelerator/ID_MEC/data generator/mixed_data/mixed_iq_samples_*.csv'
pure_file_new = '/home/mreza/5G accelerator/ID_MEC/data generator/New Data-Collection/rx_IQ_pure'
mixed_file_new = '/home/mreza/5G accelerator/ID_MEC/data generator/New Data-Collection/rx_IQ_MIX'
pure_file_old = '/home/mreza/5G accelerator/IQ_samples/data collected/5G_DL_IQ_no_jamming_0924.dat'
mixed_file_old = '/home/mreza/5G accelerator/IQ_samples/data collected/5G_DL_IQ_with_periodic_jamming_0928_02.dat'

# Creating dataset instances for training and validation/testing
train_gen_instance  = ComplexNumbersDataset(filepath=pure_file_new, 
                                            sequence_length=sequence_length, 
                                            max_samples=max_train_samples, for_training=True, 
                                            process_method='data1')

combined_gen_instance = ComplexNumbersDataset(filepath=mixed_file_new, 
                                              sequence_length=sequence_length, 
                                              max_samples=max_samples, for_training=False, 
                                              process_method='data1')

# Creating DataLoader instances for batching
train_gen_instance = DataLoader(train_gen_instance, batch_size=batch_size, shuffle=False, 
                                drop_last=True)
combined_gen_instance = DataLoader(combined_gen_instance, batch_size=batch_size, shuffle=False, 
                                   drop_last=True)

# Custom training loop in PyTorch
num_epochs = 2
for epoch in range(num_epochs):
    print(f"Epoch {epoch + 1}/{num_epochs}")
    vae_model.train()  # Set the model to training mode
    
    step = 0  # Initialize step counter if you need it for logging, etc.
    for X_chunk, Y_chunk in train_gen_instance:
        #X_chunk, Y_chunk = X_chunk.float(), Y_chunk.float()
        # Move data to the current device (GPU or CPU)
        #print('X_chunk:', X_chunk)
        #print('Y_chunk:', Y_chunk)
        X_chunk, Y_chunk = X_chunk.to(device), Y_chunk.to(device)

        optimizer.zero_grad()
        reconstructed_x, z_mean, z_log_var = vae_model(X_chunk)
        #loss = vae_loss(reconstructed_x, X_chunk, z_mean, z_log_var)
        loss = F.mse_loss(reconstructed_x, X_chunk, reduction='sum')
        loss.backward()
        optimizer.step()

        if step % 100 == 0:
            print(f"Step {step + 1}/{train_steps}, Loss: {loss}")
        step += 1  # Increment step counter

    print()


num_predictions = 100  # or any other large number
print(f"Number of predictions to be performed: {num_predictions}")
reconstruction_errors = []
all_X_chunk_test = []
all_X_chunk_pred = []
all_intrusion_flags = []
vae_model.eval()  # Set the model to evaluation mode

# Assuming 'prediction_loader' is your DataLoader instance
# Print details of combined_gen_instance DataLoader
print("Prediction DataLoader Details:")
print("Batch size:", combined_gen_instance.batch_size)
print("Number of batches:", len(combined_gen_instance))
print("Dataset used in DataLoader:", type(combined_gen_instance.dataset).__name__)
print("Number of samples in Dataset:", len(combined_gen_instance.dataset))

# Print the first few samples from the dataset used in the DataLoader
for i in range(min(batch_size, len(combined_gen_instance.dataset))):  # Adjust the range as needed
    sample = combined_gen_instance.dataset[i]
    print(f"Sample {i}:", sample)

# Get an iterator from your DataLoader
combined_gen_iterator = iter(combined_gen_instance)

try:
    for _ in range(num_predictions):
        print(f"Prediction number: {_}")
        X_chunk_test = next(combined_gen_iterator)  # Getting the batch

        # Ensure X_chunk_test is on the correct device
        X_chunk_test = X_chunk_test.to(device)

        with torch.no_grad():  # Inference without gradient calculation
            X_chunk_pred, _, _ = vae_model(X_chunk_test)

        chunk_errors = torch.mean((X_chunk_test - X_chunk_pred) ** 2, axis=1)
        reconstruction_errors.extend(chunk_errors.cpu().numpy())        
        all_X_chunk_test.append(X_chunk_test)
        all_X_chunk_pred.append(X_chunk_pred)
except StopIteration:
    print("All samples processed.")

reconstruction_error = np.array(reconstruction_errors)
# Further processing...



#reconstruction_error = np.array(reconstruction_errors)
print('reconstruction_error.shape:', reconstruction_error.shape)
print('Number of NaNs in reconstruction_error:', np.isnan(reconstruction_error).sum())
max_error_per_sequence = reconstruction_error.max(axis=1) # Max error for each sequence
print('max_error_per_sequence:', max_error_per_sequence)

print('max_error_per_sequence.shape:', max_error_per_sequence.shape)

threshold1 = np.percentile(max_error_per_sequence, 98)
print('threshold1:', threshold1)
threshold2 = np.percentile(reconstruction_error, 95)
print('threshold percentile:', threshold2)

is_intrusion_detected = max_error_per_sequence > threshold1  # Boolean array for sequences
print('len(is_intrusion_detected):', len(is_intrusion_detected))
print('is_intrusion_detected.shape:', is_intrusion_detected.shape)

#is_intrusion_detected2 = error_per_sequence > threshold1

num_total_sequences = len(max_error_per_sequence)
num_total_sequences2 = num_predictions * batch_size - num_predictions
print('num_total_sequences:', num_total_sequences)
print('num_total_sequences2:', num_total_sequences2)

#---------------------------------------finish 111-----------------------------------
flat_error_per_sequence = max_error_per_sequence.flatten()
#flat_error_per_sequence2 = error_per_sequence.flatten()
# Determine if intrusion detected for each sequence
for error in flat_error_per_sequence:
    all_intrusion_flags.append(error > threshold1)    
all_X_chunk_test = np.concatenate(all_X_chunk_test, axis=0)
all_X_chunk_pred = np.concatenate(all_X_chunk_pred, axis=0)

#save_path = 'C:\\Users\\Mohammadreza\\Desktop\\My Class\\Proj-DC\\My Works\\My Papers\\intrusion\\data generator\\intrusion_detected'
#plot_with_intrusions8(all_X_chunk_test, all_X_chunk_pred, all_intrusion_flags, sequence_length, save_path)

jamming_detected = reconstruction_error > threshold1
#train_gen_instance.close()
#combined_gen_instance.close()
#Table
flattened_jamming_detected = jamming_detected.flatten()
real_part_detected = jamming_detected[:, 0]
imag_part_detected = jamming_detected[:, 1]

real_true_count = np.sum(real_part_detected)
real_false_count = len(real_part_detected) - real_true_count

imag_true_count = np.sum(imag_part_detected)
imag_false_count = len(imag_part_detected) - imag_true_count
# Overall
overall_true_count = np.sum(flattened_jamming_detected)
overall_false_count = len(flattened_jamming_detected) - overall_true_count
# Table-DataFrame
df = pd.DataFrame({
    'Part': ['Real', 'Imaginary', 'Overall'],
    'True Count': [real_true_count, imag_true_count, overall_true_count],
    'False Count': [real_false_count, imag_false_count, overall_false_count]
})
print(df)
num_jamming_detected = np.sum(jamming_detected)
print(f"Number of jamming sequences detected: {num_jamming_detected} out of {len(flattened_jamming_detected)} sequences")




2024-04-05 14:46:23.887411: I tensorflow/core/util/port.cc:110] oneDNN custom operations are on. You may see slightly different numerical results due to floating-point round-off errors from different computation orders. To turn them off, set the environment variable `TF_ENABLE_ONEDNN_OPTS=0`.
2024-04-05 14:46:23.888474: I tensorflow/tsl/cuda/cudart_stub.cc:28] Could not find cuda drivers on your machine, GPU will not be used.
2024-04-05 14:46:23.914092: I tensorflow/tsl/cuda/cudart_stub.cc:28] Could not find cuda drivers on your machine, GPU will not be used.
2024-04-05 14:46:23.914651: I tensorflow/core/platform/cpu_feature_guard.cc:182] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.
To enable the following instructions: AVX2 AVX512F AVX512_VNNI FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags.


Using device: cpu
Epoch 1/2
Step 1/250, Loss: 993.7449340820312
Step 101/250, Loss: 799.8992919921875
Step 201/250, Loss: 800.3550415039062

Epoch 2/2
Step 1/250, Loss: 800.0731201171875
Step 101/250, Loss: 799.9602661132812
Step 201/250, Loss: 800.2033081054688

Number of predictions to be performed: 100
Prediction DataLoader Details:
Batch size: 40
Number of batches: 250
Dataset used in DataLoader: ComplexNumbersDataset
Number of samples in Dataset: 10000
Sample 0: tensor([[ 0.6563,  1.5085],
        [ 0.4633, -0.1398],
        [ 0.4633,  1.0375],
        [-2.0460, -1.5526],
        [-0.3088, -0.6108],
        [ 1.8144,  1.5085],
        [ 0.2702,  0.0809],
        [ 0.2702, -0.1398],
        [-0.5019, -0.6108],
        [-1.0809, -1.0817]])
Sample 1: tensor([[ 0.0661, -0.0354],
        [-0.3745,  1.0962],
        [-1.0353, -1.1669],
        [ 0.7269, -0.4125],
        [-1.4758, -0.4125],
        [-0.3745,  1.8270],
        [-0.1542, -1.5441],
        [ 1.1674,  1.0962],
        [ 2.0

Prediction number: 6
Prediction number: 7
Prediction number: 8
Prediction number: 9
Prediction number: 10
Prediction number: 11
Prediction number: 12
Prediction number: 13
Prediction number: 14
Prediction number: 15
Prediction number: 16
Prediction number: 17
Prediction number: 18
Prediction number: 19
Prediction number: 20
Prediction number: 21
Prediction number: 22
Prediction number: 23
Prediction number: 24
Prediction number: 25
Prediction number: 26
Prediction number: 27
Prediction number: 28
Prediction number: 29
Prediction number: 30
Prediction number: 31
Prediction number: 32
Prediction number: 33
Prediction number: 34
Prediction number: 35
Prediction number: 36
Prediction number: 37
Prediction number: 38
Prediction number: 39
Prediction number: 40
Prediction number: 41
Prediction number: 42
Prediction number: 43
Prediction number: 44
Prediction number: 45
Prediction number: 46
Prediction number: 47
Prediction number: 48
Prediction number: 49
Prediction number: 50
Prediction num

In [2]:
n = 4
# Ensure n sequences are available
n = min(n, len(X_chunk_test) - sequence_index)

# Convert PyTorch tensors to numpy arrays
original_sample_np = X_chunk_test[sequence_index:sequence_index + n].detach().cpu().numpy()
reconstructed_sample_np = X_chunk_pred[sequence_index:sequence_index + n].detach().cpu().numpy()

# Since your data is 3-dimensional (batch, sequence, features), you'll want to concatenate along the sequence axis (axis=1)
original_sample_concat = np.concatenate(original_sample_np, axis=0)
reconstructed_sample_concat = np.concatenate(reconstructed_sample_np, axis=0)

# Plot concatenated sequences
plt.figure(figsize=(14, 6))
plt.plot(original_sample_concat[:, 0], 'b-', label='Original Real Part')  # Assuming first feature is the real part
plt.plot(reconstructed_sample_concat[:, 0], 'r--', label='Reconstructed Real Part')  # Assuming first feature is the real part
plt.legend()
plt.show()


NameError: name 'sequence_index' is not defined

In [None]:
import numpy as np
import matplotlib.pyplot as plt

# Define the number of sequences to plot together
n = 4  # Change this to desired number of sequences

# Ensure that we have enough samples for the desired number of sequences
sequence_index = np.random.choice(len(X_chunk_test) - n + 1)

# Convert PyTorch tensors to numpy arrays and concatenate selected sequences
original_sample_concat = np.concatenate(X_chunk_test[sequence_index:sequence_index + n].cpu().numpy(), axis=0)
reconstructed_sample_concat = np.concatenate(X_chunk_pred[sequence_index:sequence_index + n].cpu().numpy(), axis=0)

# Plot concatenated sequences for n = 4
plt.figure(figsize=(14, 6))
plt.plot(original_sample_concat[:, 0], 'b-', label='Original Real Part')
plt.plot(reconstructed_sample_concat[:, 0], 'r--', label='Reconstructed Real Part')
plt.plot(original_sample_concat[:, 1], 'y-', label='Original Imaginary Part')
plt.plot(reconstructed_sample_concat[:, 1], 'g--', label='Reconstructed Imaginary Part')
plt.title(f'Original vs Reconstructed IQ Data for {n} Sequences of Length {sequence_length}')
plt.xlabel('Time Steps')
plt.ylabel('Amplitude')
plt.legend()
plt.show()

# Repeat for n = 2
n = 2  # Change this to a different desired number of sequences
sequence_index = np.random.choice(len(X_chunk_test) - n + 1)
original_sample_concat = np.concatenate(X_chunk_test[sequence_index:sequence_index + n].cpu().numpy(), axis=0)
reconstructed_sample_concat = np.concatenate(X_chunk_pred[sequence_index:sequence_index + n].cpu().numpy(), axis=0)

# Plot concatenated sequences for n = 2
plt.figure(figsize=(14, 6))
plt.plot(original_sample_concat[:, 0], 'b-', label='Original Real Part')
plt.plot(reconstructed_sample_concat[:, 0], 'r--', label='Reconstructed Real Part')
plt.plot(original_sample_concat[:, 1], 'y-', label='Original Imaginary Part')
plt.plot(reconstructed_sample_concat[:, 1], 'g--', label='Reconstructed Imaginary Part')
plt.title(f'Original vs Reconstructed IQ Data for {n} Sequences of Length {sequence_length}')
plt.xlabel('Time Steps')
plt.ylabel('Amplitude')
plt.legend()
plt.show()


In [None]:

# reconstruction error
plt.figure(figsize=(14, 6))
plt.plot(reconstruction_error, label='Reconstruction Error')
plt.axhline(y=threshold2, color='r', linestyle='--', label='Threshold')
plt.title('Reconstruction Error with Threshold')
plt.xlabel('Sequence Number')
plt.ylabel('Reconstruction Error')
plt.legend()
# plt.savefig('1-Reconstruction Error with Threshold.png')
# plt.close()
plt.show()

In [None]:
# Flatten the reconstruction_error to 1D
reconstruction_error_flat = reconstruction_error.flatten()
# reconstruction error
plt.figure(figsize=(14, 6))
plt.plot(reconstruction_error_flat, label='Reconstruction Error')
plt.axhline(y=threshold2, color='r', linestyle='--', label='Threshold')
plt.title('Reconstruction Error with Threshold')
plt.xlabel('Sequence Number')
plt.ylabel('Reconstruction Error')
plt.legend()
# plt.savefig('1-Reconstruction Error with Threshold.png')
# plt.close()
plt.show()


In [None]:
#Histogram of Reconstruction Errors:
plt.figure(figsize=(14, 6))
plt.hist(reconstruction_error, bins=50, alpha=0.75)
plt.axvline(x=threshold2, color='r', linestyle='--', label='Threshold')
plt.title('Histogram of Reconstruction Errors')
plt.xlabel('Reconstruction Error')
plt.ylabel('Frequency')
plt.legend()
# plt.savefig('4-Histogram of Reconstruction Errors.png')
# plt.close()
plt.show()


In [None]:
# Flatten the reconstruction_error to 1D
reconstruction_error_flat = reconstruction_error.flatten()
#Histogram of Reconstruction Errors:
plt.figure(figsize=(14, 6))
plt.hist(reconstruction_error_flat, bins=50, alpha=0.75)
plt.axvline(x=threshold2, color='r', linestyle='--', label='Threshold')
plt.title('Histogram of Reconstruction Errors')
plt.xlabel('Reconstruction Error')
plt.ylabel('Frequency')
plt.legend()
# plt.savefig('4-Histogram of Reconstruction Errors.png')
# plt.close()
plt.show()

In [None]:
#Time Series Plot of IQ Samples:
sample_index = np.random.choice(len(X_chunk_test))
original_sample = X_chunk_test[sample_index]
reconstructed_sample = X_chunk_pred[sample_index]

plt.figure(figsize=(14, 6))
plt.plot(original_sample[:, 0], 'b-', label='Original Real Part')
plt.plot(reconstructed_sample[:, 0], 'r--', label='Reconstructed Real Part')
plt.plot(original_sample[:, 1], 'g-', label='Original Imaginary Part')
plt.plot(reconstructed_sample[:, 1], 'y--', label='Reconstructed Imaginary Part')
plt.title('Original vs Reconstructed IQ Data')
plt.xlabel('Time Steps')
plt.ylabel('Amplitude')
plt.legend()
# plt.savefig('5-Original vs Reconstructed IQ Data.png')
# plt.close()
plt.show()


In [None]:
# Convert PyTorch tensors to NumPy arrays before calculating means
avg_real = np.mean(X_chunk_test.cpu().numpy(), axis=1)[:, 0]
avg_imag = np.mean(X_chunk_test.cpu().numpy(), axis=1)[:, 1]

# Assuming reconstruction_errors is a NumPy array or a list of errors
# If reconstruction_errors is a tensor, ensure to convert it with .cpu().numpy() as well
last_errors = np.mean(reconstruction_errors[-len(X_chunk_test):], axis=1)

print("Shape of avg_real:", avg_real.shape)
print("Shape of avg_imag:", avg_imag.shape)
print("Shape of last_errors:", last_errors.shape)

plt.figure(figsize=(14, 6))
plt.scatter(avg_real, last_errors, label='Real Part', alpha=0.5)
plt.axhline(y=threshold2, color='r', linestyle='--', label='Threshold')
plt.title('Reconstruction Error vs. Average Real Part')
plt.xlabel('Average Amplitude')
plt.ylabel('Reconstruction Error')
plt.legend()
plt.show()

plt.figure(figsize=(14, 6))
plt.scatter(avg_imag, last_errors, label='Imaginary Part', alpha=0.5)
plt.axhline(y=threshold2, color='r', linestyle='--', label='Threshold')
plt.title('Reconstruction Error vs. Average Imaginary Part')
plt.xlabel('Average Amplitude')
plt.ylabel('Reconstruction Error')
plt.legend()
plt.show()


In [None]:
# reconstruction error
reconstruction_error_real = reconstruction_error[:, 0]
reconstruction_error_imag = reconstruction_error[:, 1]

# Plot for Real Part
plt.figure(figsize=(14, 6))
mellow_green = '#89C997' 
plt.plot(reconstruction_error_real, label='Reconstruction Error', color=mellow_green)
plt.axhline(y=threshold2, color='r', linestyle='--', label='Threshold')
plt.title('Intrusion Detected by Reconstruction Error',fontsize=16, fontweight='bold')
plt.xlabel('Sequence Number (×10³)', fontsize=16, fontweight='bold')
#plt.xlabel('Sequence Number(*1000)', fontsize=16, fontweight='bold')
plt.ylabel('Reconstruction Error', fontsize=16, fontweight='bold')
for label in (plt.gca().get_xticklabels() + plt.gca().get_yticklabels()):
    label.set_fontsize(12)
    label.set_fontweight('bold')
plt.legend(fontsize=15)
plt.show()

In [None]:
sample_index = np.random.choice(len(X_chunk_test))
original_sample = X_chunk_test[sample_index]
reconstructed_sample = X_chunk_pred[sample_index]

plt.figure(figsize=(16, 8))
plt.plot(original_sample[:, 0], 'b-', label='Original Real Part')
plt.plot(reconstructed_sample[:, 0], 'b--', label='Reconstructed Real Part')
# plt.plot(original_sample[:, 1], 'm-', label='Original Real STD')
# plt.plot(reconstructed_sample[:, 1], 'm--', label='Reconstructed Real STD')
# plt.plot(original_sample[:, 2], 'c-', label='Original Real Skew')
# plt.plot(reconstructed_sample[:, 2], 'c--', label='Reconstructed Real Skew')
# plt.plot(original_sample[:, 3], 'orange', label='Original Real Kurtosis')
# plt.plot(reconstructed_sample[:, 3], 'orange', label='Reconstructed Real Kurtosis', linestyle='--')

plt.plot(original_sample[:, 1], 'g-', label='Original Imaginary Part')
plt.plot(reconstructed_sample[:, 1], 'g--', label='Reconstructed Imaginary Part')
# plt.plot(original_sample[:, 5], 'purple', label='Original Imaginary STD')
# plt.plot(reconstructed_sample[:, 5], 'purple', label='Reconstructed Imaginary STD', linestyle='--')
# plt.plot(original_sample[:, 6], 'brown', label='Original Imaginary Skew')
# plt.plot(reconstructed_sample[:, 6], 'brown', label='Reconstructed Imaginary Skew', linestyle='--')
# plt.plot(original_sample[:, 7], 'pink', label='Original Imaginary Kurtosis')
# plt.plot(reconstructed_sample[:, 7], 'pink', label='Reconstructed Imaginary Kurtosis', linestyle='--')
plt.title('Original vs Reconstructed IQ Data')
plt.xlabel('Time Steps')
plt.ylabel('Amplitude')

# Place the legend outside the plot area
plt.legend(loc='upper left', bbox_to_anchor=(1, 1), fontsize='small', title='Legend')

# Adjust layout to make room for the legend
plt.tight_layout()

plt.show()

In [None]:
reconstruction_error_real_parts = reconstruction_error[:, 0]
reconstruction_error_real_std = reconstruction_error[:, 1]
reconstruction_error_real_skew = reconstruction_error[:, 2]
reconstruction_error_real_kurtosis = reconstruction_error[:, 3]
reconstruction_error_imag_parts = reconstruction_error[:, 4]
reconstruction_error_imag_std = reconstruction_error[:, 5]
reconstruction_error_imag_skew = reconstruction_error[:, 6]
reconstruction_error_imag_kurtosis = reconstruction_error[:, 7]


In [None]:
sample_index = np.random.choice(len(X_chunk_test))
original_sample = X_chunk_test[sample_index]
reconstructed_sample = X_chunk_pred[sample_index]

plt.figure(figsize=(16, 8))
# plt.plot(original_sample[:, 0], 'b-', label='Original Real Part')
# plt.plot(reconstructed_sample[:, 0], 'b--', label='Reconstructed Real Part')
# plt.plot(original_sample[:, 1], 'm-', label='Original Real STD')
# plt.plot(reconstructed_sample[:, 1], 'm--', label='Reconstructed Real STD')
plt.plot(original_sample[:, 2], 'c-', label='Original Real Skew')
plt.plot(reconstructed_sample[:, 2], 'c--', label='Reconstructed Real Skew')
plt.plot(original_sample[:, 3], 'orange', label='Original Real Kurtosis')
plt.plot(reconstructed_sample[:, 3], 'orange', label='Reconstructed Real Kurtosis', linestyle='--')

# plt.plot(original_sample[:, 4], 'g-', label='Original Imaginary Part')
# plt.plot(reconstructed_sample[:, 4], 'g--', label='Reconstructed Imaginary Part')
# plt.plot(original_sample[:, 5], 'purple', label='Original Imaginary STD')
# plt.plot(reconstructed_sample[:, 5], 'purple', label='Reconstructed Imaginary STD', linestyle='--')
plt.plot(original_sample[:, 6], 'brown', label='Original Imaginary Skew')
plt.plot(reconstructed_sample[:, 6], 'brown', label='Reconstructed Imaginary Skew', linestyle='--')
plt.plot(original_sample[:, 7], 'pink', label='Original Imaginary Kurtosis')
plt.plot(reconstructed_sample[:, 7], 'pink', label='Reconstructed Imaginary Kurtosis', linestyle='--')
plt.title('Original vs Reconstructed IQ Data')
plt.xlabel('Time Steps')
plt.ylabel('Amplitude')

# Place the legend outside the plot area
plt.legend(loc='upper left', bbox_to_anchor=(1, 1), fontsize='small', title='Legend')

# Adjust layout to make room for the legend
plt.tight_layout()

plt.show()

In [None]:
# # Define the number of sequences to plot together
n = 2  # Change this to desired number of sequences
sample_length = sequence_length * n

# Select a random starting sequence for plotting
sequence_index = np.random.choice(len(X_chunk_test) - n + 1)

# Extract and concatenate the original and reconstructed samples
original_sample = np.concatenate(X_chunk_test[sequence_index:sequence_index + n])
reconstructed_sample = np.concatenate(X_chunk_pred[sequence_index:sequence_index + n])

# Plot concatenated sequences
plt.figure(figsize=(14, 6))
plt.plot(original_sample[:, 0], 'b-', label='Original Real Part')
plt.plot(reconstructed_sample[:, 0], 'r--', label='Reconstructed Real Part')
plt.plot(original_sample[:, 1], 'orange', label='Original Real STD')
plt.plot(reconstructed_sample[:, 1], 'orange', label='Reconstructed Real STD', linestyle='--')

plt.plot(original_sample[:, 4], 'y-', label='Original Imaginary Part')
plt.plot(reconstructed_sample[:, 4], 'g--', label='Reconstructed Imaginary Part')
plt.plot(original_sample[:, 5], 'pink', label='Original Imaginary STD')
plt.plot(reconstructed_sample[:, 5], 'pink', label='Reconstructed Imaginary STD', linestyle='--')
plt.title(f'Original vs Reconstructed IQ Data for {n} Sequences of Length {sequence_length}')
plt.xlabel('Time Steps')
plt.ylabel('Amplitude')
plt.legend()
# plt.savefig('9-Original vs Reconstructed IQ Data for {n} Sequences of Length {sequence_length}.png')
# plt.close()
plt.show()

In [None]:
# Repeat for n = 9
n = 4  # Change this to desired number of sequences
sequence_index = np.random.choice(len(X_chunk_test) - n + 1)
original_sample = np.concatenate(X_chunk_test[sequence_index:sequence_index + n])
reconstructed_sample = np.concatenate(X_chunk_pred[sequence_index:sequence_index + n])

plt.figure(figsize=(14, 6))
plt.plot(original_sample[:, 2], 'b-', label='Original Real Part Skew')
plt.plot(reconstructed_sample[:, 2], 'r--', label='Reconstructed Real Part Skew')
plt.plot(original_sample[:, 6], 'g-', label='Original Imaginary Part Skew')
plt.plot(reconstructed_sample[:, 6], 'y--', label='Reconstructed Imaginary Part Skew')
plt.title(f'Original vs Reconstructed IQ Data for {n} Sequences of Length {sequence_length}')
plt.xlabel('Time Steps')
plt.ylabel('Amplitude')
plt.legend()
# plt.savefig('11-Original vs Reconstructed IQ Data for {n} Sequences of Length {sequence_length}.png')
# plt.close()
plt.show()

In [None]:
# Repeat for n = 9
n = 4  # Change this to desired number of sequences
sequence_index = np.random.choice(len(X_chunk_test) - n + 1)
original_sample = np.concatenate(X_chunk_test[sequence_index:sequence_index + n])
reconstructed_sample = np.concatenate(X_chunk_pred[sequence_index:sequence_index + n])

plt.figure(figsize=(14, 6))
plt.plot(original_sample[:, 3], 'b-', label='Original Real Part Kurtosis')
plt.plot(reconstructed_sample[:, 3], 'r--', label='Reconstructed Real Part Kurtosis')
plt.plot(original_sample[:, 7], 'g-', label='Original Imaginary Part Kurtosis')
plt.plot(reconstructed_sample[:, 7], 'y--', label='Reconstructed Imaginary Part Kurtosis')
plt.title(f'Original vs Reconstructed IQ Data for {n} Sequences of Length {sequence_length}')
plt.xlabel('Time Steps')
plt.ylabel('Amplitude')
plt.legend()
# plt.savefig('11-Original vs Reconstructed IQ Data for {n} Sequences of Length {sequence_length}.png')
# plt.close()
plt.show()