In [1]:
import json

with open('dataset/LPI12 dataset.json', 'r') as f:
        TrainData = json.load(f)

In [2]:
import models._config as c

from dataset.RadarDataset import RadarSignalDataset


train_dataset = RadarSignalDataset(TrainData, c.signalTypes[0:c.typeSize], snr_max=2)

Data loading for 'Barker'.Done!
Data loading for 'Costas'.Done!
Data loading for 'Frank'.Done!
Data loading for 'LFM'.Done!
Data loading for 'P1'.Done!
Data loading for 'P2'.Done!
Data loading for 'P3'.Done!
Data loading for 'P4'.Done!
Data loading for 'T1'.Done!
Data loading for 'T2'.Done!
Data loading for 'T3'.Done!
Data loading for 'T4'.Done!


In [3]:
sample_data, sample_label, sample_snr, sample_length = train_dataset[0]

import torch
from torch.nn.utils import rnn as rnn_utils

def collate(batch):
    data, labels, snrs, lengths = zip(*batch)
    data_pad = rnn_utils.pad_sequence([torch.tensor(seq, dtype=torch.float32) for seq in data], batch_first=True)
    
    labels = torch.tensor([c.label_mapping[label] for label in labels], dtype=torch.long)
    snrs = torch.tensor(snrs, dtype=torch.int64)
    lengths = torch.tensor(lengths, dtype=torch.int64)  # 시퀀스 길이를 함께 전달

    return data_pad, labels, snrs, lengths
    
import torch.nn as nn
from torch.utils.data import DataLoader
from models.LSTM import BiLSTM
model = BiLSTM(input_size=2, hidden_size=128, num_layers=2, num_classes=12)
model = nn.DataParallel(model)
model.to('cuda')

loader = DataLoader(train_dataset, batch_size=4, shuffle=True, collate_fn=collate)

batch_data, batch_label, batch_snr, batch_length = next(iter(loader))
# Batch data shape: torch.Size([4, 1243, 2])-cpu-torch.float32
# Batch label shape: torch.Size([4])-cpu-torch.int64
# Batch SNR shape: torch.Size([4])-cpu-torch.int64
# Batch length shape: torch.Size([4])-cpu-torch.int64

print(f"Data : {sample_data}>>{batch_data.shape}-{batch_data.device}-{batch_data.dtype}")
print(f"Label : {batch_label.shape}-{batch_label.device}-{batch_label.dtype}")
print(f"SNR : {batch_snr.shape}-{batch_snr.device}-{batch_snr.dtype}")
print(f"Length : {batch_length.shape}-{batch_length.device}-{batch_length.dtype}")

Data : [(-0.995865505850847, -0.772780143825748), (0.528537024543503, 0.693081898619596), (0.556359429830424, 1.46821268703729), (-0.855835300072973, 1.17819101807952), (-1.13152876620604, -0.613120366862707), (0.13538253321669, -0.755300234111235), (1.25072756104799, -0.336904806538208), (0.242027684940508, 1.29508355564195), (0.039248541267727, -0.0326259082304102), (-0.424221026827263, -0.0183839159009666), (-0.271141129712853, -1.86581756765567), (-0.211387425942407, -1.5505621364576), (0.860207949611725, 1.52085594430567), (-0.00865933426806567, 1.85989765308762), (-0.424224910409746, 0.308979098680504), (-0.894720964779146, 0.665993469271625), (-0.972965505530591, -1.02068992987413), (0.412207809702996, 0.855018330428079), (0.785651958390508, 0.72332268465049), (-0.234654191714054, 0.885084823734852), (-2.70273087734781, 1.45726164470342), (-2.10876792694259, -1.06422375892542), (1.92107903908833, -2.43874769401543), (1.12422195536253, 1.47427393648857), (0.0811721665236903, 1.15

In [6]:
import torch
import matplotlib.pyplot as plt
from torch.utils.data import DataLoader

from models.LSTM import BiLSTM
# from explainer import LRP  # LRP 클래스가 들어있는 모듈


class LRP:
    def __init__(self, model, epsilon=1e-6):
        self.model = model
        self.epsilon = epsilon
    
    def relevance(self, x, lengths, target=None):
        output, h, w = self.model(x, lengths, lstm_outputs=True)
        target = torch.argmax(output, dim=1) if target is None else target
        # Output shape: torch.Size([batch_size, cls_size])
        # Hidden states shape: torch.Size([batch_size, length, hidden_size*2])
        # Attention weights shape: torch.Size([batch_size, length])
        # Context shape: torch.Size([batch_size, hidden_size*2])
        
        r = torch.zeros_like(output)
        for i in range(output.size(0)):
            r[i, target[i]] = output[i, target[i]] # Target class에 대한 relevance만 생존
        
        r_c = self.bpp_fc(h, r)
        r_h = self.bpp_att(r_c, h, w)
        r_x = self.bpp_lstm(r_h, h, lengths)
        
        print(r_x.shape)
        return r_x
    
    def lstm_gates(self, x_t, h_prev, W_ih, W_hh, b_ih, b_hh, cl_prev):
        gates = torch.matmul(W_ih, x_t) + torch.matmul(W_hh, h_prev) + b_ih + b_hh
        i_t, f_t, o_t, g_t = torch.chunk(gates, 4, dim=0)
        
        i_t = torch.sigmoid(i_t)
        f_t = torch.sigmoid(f_t)
        o_t = torch.sigmoid(o_t)
        cl_t = f_t * cl_prev + i_t * torch.tanh(g_t)
        
        return (f_t, i_t, o_t, cl_t)
    
    def bpp_lstm(self, rel_h, h, x, lengths):
        batch_size, seq_len, hidden_size = h.size()
        
        rel_x = torch.zeros(batch_size, seq_len, 2)
        rel_c_t1 = torch.zeros(batch_size, hidden_size)
        
        W_ih = self.model.lstm.weight_ih_l0
        W_hh = self.model.lstm.weight_hh_l0
        b_ih = self.model.lstm.bias_ih_l0
        b_hh = self.model.lstm.bias_hh_l0
        
        for i in range(batch_size):
            h_prev = torch.zeros(h[i, 0])
            cl_prev = torch.zeros(h_prev)
            for t in reversed(range(seq_len)):
                if t < lengths[i]:
                    h_t = h[i, t]
                    x_t = x[i, t]
                    rel_h_t = rel_h[i, t]
                    gates = self.lstm_gates(x_t, h_prev, W_ih, W_hh, b_ih, b_hh, cl_prev)
                    rel_x[i, t], rel_c_t1 = self.bpp_lstm_cell(h_t, rel_h_t, rel_c_t1, gates)
                    
        return rel_x
        
    def bpp_lstm_cell(self, h_t, rel_h_t, rel_cl_t1, gates):
        f_t, i_t, o_t, cl_t = gates
        rel_cl_t = rel_cl_t1 + rel_h_t * o_t * (1-torch.tanh(cl_t)**2)
        rel_c_t1 = rel_cl_t * f_t
        
        rel_x_t = torch.matmul(self.W_ih.T, rel_cl_t * i_t)
        return rel_x_t, rel_c_t1
        
    def bpp_att(self, r_c, h, w):
        rel_h = torch.zeros_like(h)

        for i in range(h.size(0)):
            rel_h_t = r_c[i].unsqueeze(0) * w[i].unsqueeze(-1)
            rel_h[i] = rel_h_t

        return rel_h
    
    def bpp_fc(self, c, r):
        fc_W = self.model.fc.weight
        rel_c = torch.zeros_like(c)
        for i in range(r.size(0)):
            for j in range(r.size(1)):
                rel_c[i] += (c[i]*fc_W[j]) * r[i, j] / (fc_W[j].abs().sum() + self.epsilon)
        return rel_c
        
    
    
def explain_set(train_dataset):
    model = BiLSTM(input_size=2, hidden_size=128, num_layers=2, num_classes=12)
    model.load_state_dict(torch.load('/home/kiwan/TSC_XAI/ckpts/ 0dB/LSTM_ 0dB_0.0073.pt'))
    model.eval()
    
    lrp = LRP(model)

    batch = DataLoader(train_dataset, batch_size=2, shuffle=True, collate_fn=model.collate)

    fig = plt.figure(figsize=(20, 15))

    with torch.no_grad():
        for batch_idx, (data_batch, labels_batch, snrs_batch, lengths_batch) in enumerate(batch):
            print("Ground Truth:", labels_batch)
            r_scores = lrp.relevance(data_batch, lengths_batch)

            real_part = data_batch[batch_idx][:, 0].cpu().numpy()  # 실수부
            imag_part = data_batch[batch_idx][:, 1].cpu().numpy()  # 허수부
            r_scores_real = r_scores[:, 0].cpu().numpy()   # Relevance 실수부

            # 그래프 그리기
            ax = fig.add_subplot(4, 2, batch_idx + 1)
            ax.plot(real_part, label='Real Part')
            ax.plot(r_scores_real, label='Relevance (Real Part)', linestyle='--')
            ax.set_title(f'Sample {batch_idx+1} (SNR: {snrs_batch[batch_idx]} dB)')
            ax.legend()

            break  # 첫 번째 배치만 시각화

    plt.tight_layout()
    plt.show()

explain_set(train_dataset)

Ground Truth: tensor([0, 2])


TypeError: LRP.bpp_lstm() missing 1 required positional argument: 'lengths'

<Figure size 2000x1500 with 0 Axes>

In [None]:
import torch
import matplotlib.pyplot as plt
from torch.utils.data import DataLoader

from models.LSTM import BiLSTM
from explainer import LRP  # LRP 클래스가 들어있는 모듈



def explain_set(train_dataset):
    model = BiLSTM(input_size=2, hidden_size=128, num_layers=2, num_classes=12)
    model.load_state_dict(torch.load('/home/kiwan/TSC_XAI/ckpts/ 0dB/LSTM_ 0dB_0.0073.pt'))
    model.eval()
    lrp = LRP(model)

    batch = DataLoader(train_dataset, batch_size=2, shuffle=True, collate_fn=model.collate)

    fig = plt.figure(figsize=(20, 15))

    with torch.no_grad():
        for data_batch, labels, snr, lengths in batch:
            lengths = lengths.cpu().long()
            for i in range(len(data_batch)):
                data = data_batch[i].unsqueeze(0)  # 하나의 샘플만 가져오기
                seq_len = lengths[i].unsqueeze(0)
                print(seq_len) 
                
                # LRP로 Relevance 스코어 계산
                r_scores = lrp.get_relevance(data, seq_len)

                # 실제 신호와 Relevance 시각화
                real_part = data_batch[i][:, 0].cpu().numpy()  # 실수부
                imag_part = data_batch[i][:, 1].cpu().numpy()  # 허수부
                r_scores_real = r_scores[:, 0].cpu().numpy()   # Relevance 실수부

                # 그래프 그리기
                ax = fig.add_subplot(4, 2, i + 1)
                ax.plot(real_part, label='Real Part')
                ax.plot(r_scores_real, label='Relevance (Real Part)', linestyle='--')
                ax.set_title(f'Sample {i+1} (SNR: {snr[i]} dB)')
                ax.legend()

            break  # 첫 번째 배치만 시각화

    plt.tight_layout()
    plt.show()

explain_set(train_dataset)

In [None]:
import os
import numpy as np
import matplotlib.pyplot as plt

# Define the directory containing the dataset
dataset_directory = '/data/kiwan/LPI_Radar_1000_'
sclass = ['Barker', 'Costas', 'Frank', 'LFM', 'P1', 'P2', 'P3', 'P4', 'T1', 'T2', 'T3', 'T4']
j = 0
for cls in sclass:
    print(cls, end=' ')
    pth = os.path.join(dataset_directory, cls)
    files = os.listdir(pth)
    i = 0
    for file in files:
        if not file.endswith('-noisy.npy'):
            continue
        # print('Processing', cls, file)
        data = np.load(os.path.join(pth, file))
        print(data.shape, end=' ')
        data = data[:, 0]
        # plt.plot(data.shape)
        # plt.show()
        i = i + 1
        if i == 5:
            break
    print()
    j = j + 1
    if j == 12:
        break   


In [None]:
import os
import matplotlib.pyplot as plt

def complex_sep(data):
    data = data.replace('i', 'j')  # 'i'를 'j'로 변경하여 복소수 형식에 맞춤
    complex_numbers = data.split(',')  # 쉼표로 구분된 복소수 분리
    complex_list = []
    for num in complex_numbers:
        num = complex(num.strip())
        complex_list.append(num)
    return complex_list

dataset = '/data/kiwan/dataset-CWD-1000/'
signals = ['Barker', 'Costas', 'Frank', 'LFM', 'P1', 'P2', 'P3', 'P4', 'T1', 'T2', 'T3', 'T4']

for signal in signals:
    real_parts = []
    plt.figure(figsize=(20, 5))
    data_dir = os.path.join(dataset, signal)
    data_file = os.listdir(data_dir)
    for ie, file in enumerate(data_file):
        with open(os.path.join(data_dir, file), 'r') as f:
            data = f.readlines()
            data = [complex_sep(d) for d in data]
            data = [item for sublist in data for item in sublist] 

        for c in data:
            real_parts.append(c.real)

        plt.axvline(x=len(real_parts), color='r', linestyle='--')
            
        if ie == 5:
            break


    plt.plot(real_parts, label='Real Part')
    # plt.plot(imag_parts, label='Imaginary Part')
    plt.title(f'{signal} Signal: Real and Imaginary Parts over Time')
    plt.xlabel('Sample Index')
    plt.ylabel('Value')
    plt.legend()
    plt.grid(True)
    plt.show()
        

In [None]:


samples_per_class = 1

class_samples = {signal_type: [] for signal_type in set(train_dataset.labels)}

for idx in range(len(train_dataset)):
    _, label, _, _ = train_dataset[idx]
    if len(class_samples[label]) < samples_per_class:
        class_samples[label].append(idx)

    if all(len(samples) == samples_per_class for samples in class_samples.values()):
        break

for label, indices in class_samples.items():
    fig, axes = plt.subplots(len(indices), 2, figsize=(12, 3 * len(indices)))
    fig.suptitle(f'Signal Visualization for Class: {label}', fontsize=16)

    for i, idx in enumerate(indices):
        data_sample, _, snr, length = train_dataset[idx]
        
        print(len(data_sample), length)
        # 실수부와 허수부 추출
        real_part = [point[0] for point in data_sample]
        imag_part = [point[1] for point in data_sample]
        
        axes[0].plot(real_part)
        axes[0].set_title(f'Real Part (SNR: {snr} dB)')
        axes[0].set_xlabel('Time Step')
        axes[0].set_ylabel('Amplitude')

        axes[1].plot(imag_part)
        axes[1].set_title(f'Imaginary Part (SNR: {snr} dB)')
        axes[1].set_xlabel('Time Step')
        axes[1].set_ylabel('Amplitude')


    plt.tight_layout(rect=[0, 0, 1, 0.96])
    plt.savefig(f'./dataset/imgs/signal_visualization_class_{label}.png')
    plt.show()