<a href="https://colab.research.google.com/github/song23jihye/aid_steamtrap/blob/master/stmodel_memae.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
#모듈 사용

import os
os.chdir('/content/drive/MyDrive/steam_trap_project')

In [None]:
import torch
import copy
import numpy as np
import pandas as pd
import seaborn as sns
from pylab import rcParams
import matplotlib.pyplot as plt
import os
import math
from torch.nn.parameter import Parameter
#from matplotlib import rcb

from torchvision.transforms import transforms
from sklearn.model_selection import train_test_split
from torch import nn, optim
from PIL import Image
import torch.nn.functional as F
from torch.utils.data import Dataset,DataLoader
from models.entropy_loss import EntropyLossEncap
from models.memae_2dmlp_conv import AutoEncoderCov2DMem
from sklearn.preprocessing import MinMaxScaler
from models.memory_module_series import MemModule
sns.set(style='whitegrid', palette='muted', font_scale=1.2)
HAPPY_COLORS_PALETTE = ["#01BEFE", "#FFDD00", "#FF7D00", "#FF006D", "#ADFF02", "#8F00FF"]
sns.set_palette(sns.color_palette(HAPPY_COLORS_PALETTE))
rcParams['figure.figsize'] = 15, 8

In [None]:
import numpy as np
from scipy.fft import fft

# Spectral entropy 계산 함수
def spectral_entropy(signal, normalize=False):
    freq_domain = fft(signal)
    magnitude = np.abs(freq_domain)
    if normalize:
        magnitude = magnitude / np.sum(magnitude)
    entropy = -np.sum(magnitude*np.log2(magnitude + np.finfo(float).eps))
    return entropy

# # 엔트로피 계산
# train_entropies = []
# test_entropies = []

# # Train 데이터의 스펙트럼 엔트로피 계산
# for i in range(train_df.shape[0]):
#     row_data = train_df.iloc[i].values
#     entropy = spectral_entropy(row_data, normalize=True)
#     train_entropies.append(entropy)

# # Test 데이터의 스펙트럼 엔트로피 계산
# for i in range(test_df.shape[0]):
#     row_data = test_df.iloc[i].values
#     entropy = spectral_entropy(row_data, normalize=True)
#     test_entropies.append(entropy)

# average_entropy = np.mean(train_entropies)

# 데이터셋 생성 함수
def create_dataset(df):
    sequences = df.astype(np.float32).to_numpy().tolist()
    dataset = [torch.tensor(s).unsqueeze(1).float() for s in sequences]
    n_seq, seq_len, n_features = torch.stack(dataset).shape
    return dataset, seq_len, n_features

# # Train 및 Test 데이터셋 생성
# train_dataset, seq_len, n_features = create_dataset(train_df)
# test_normal_dataset, _, _ = create_dataset(test_df)

class Encoder1(nn.Module):
    def __init__(self, seq_len, n_features, embedding_dim=128,dropout_prob=0.0):
        super(Encoder1, self).__init__()
        self.seq_len = seq_len
        self.n_features = n_features
        self.embedding_dim = embedding_dim
        self.hidden_dim = 2 * embedding_dim
        self.dropout = nn.Dropout(dropout_prob)
        # You might need to adjust the sizes here to fit your needs.
        self.fc1 = nn.Linear(self.n_features, self.hidden_dim) #seq_len * n_features
        self.fc2 = nn.Linear(self.hidden_dim, self.embedding_dim * n_features)

    def forward(self, x):
        try:
            B, S, E = x.shape  # B = Batch size, S = Sequence length, E = Feature dim
        except:
            B = 1
        x = x.view(B, -1)  # Flatten the input: shape becomes (B, S*E)
        x = x.unsqueeze(-1)
        x = F.relu(self.fc1(x))
        x = self.dropout(x)
        x = F.relu(self.fc2(x))
        # Here, we reshape the output to mimic your original Encoder's output shape
        x = x.view(B, self.seq_len, self.embedding_dim)
        return x


# Modified Decoder with Fully Connected Layers
class Decoder1(nn.Module):
    def __init__(self, seq_len, input_dim=128, n_features=1):
        super(Decoder1, self).__init__()
        self.seq_len = seq_len
        self.input_dim = input_dim  # Embedding dim from Encoder
        self.hidden_dim = 2 * input_dim  # Mimicking original architecture
        self.n_features = n_features  # Usually 1, representing the reconstructed feature
        self.fc1 = nn.Linear(self.input_dim * seq_len, self.hidden_dim)
        self.fc2 = nn.Linear(self.hidden_dim, seq_len * n_features)
        self.output_layer = nn.Linear(seq_len * n_features, seq_len * n_features)

    def forward(self, x):
        B,E,G = x.shape
        #x = x.repeat(1,self.seq_len, self.n_features)
        #x = x.reshape((B, self.input_dim,1))
        x = x.reshape(B, -1)
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        x = self.output_layer(x)

        # Reshape to mimic your original Decoder's output shape
        x = x.view(B, self.seq_len, self.n_features)
        return x


# 예측 모델 정의
class Autoencoder(nn.Module):
    def __init__(self, seq_len, n_features, embedding_dim=64):
        super(Autoencoder, self).__init__()
        self.encoder = Encoder1(seq_len, n_features, embedding_dim)
        self.decoder = Decoder1(seq_len, embedding_dim, n_features)
        self.mem_rep = MemModule(mem_dim=500, fea_dim=1280, shrink_thres=0.0025)

    def forward(self, x):
        x = self.encoder(x)
        res_mem = self.mem_rep(x)
        x = res_mem['output']
        att = res_mem['att']
        x = self.decoder(x)
        return x, att

class TimeSeriesDataset(Dataset):
    def __init__(self, sequences,img,labels=None,transform=None):
        self.sequences = sequences
        self.transform = transform
        self.img = img
        self.labels = labels
    def __len__(self):
        return len(self.sequences)

    def __getitem__(self, index):
        img = self.img[index]
        labels = self.labels[index]

        if self.transform:
            img = self.transform(img)
        return self.sequences[index],img,labels

mean = [0.5]
std = [0.5]

# 데이터로더 생성 함수
def create_train_data_loader(df, batch_size,img,labels):
    sequences = df.astype(np.float32).to_numpy().tolist()
    dataset = TimeSeriesDataset([torch.tensor(s).unsqueeze(1).float() for s in sequences],img,labels,transform=transforms.Compose(
                                    [
                                     transforms.ToTensor(),
                                     transforms.Normalize(mean=mean, std=std)]))
    return DataLoader(dataset, batch_size=batch_size, shuffle=False,drop_last=False)

def create_test_data_loader(df,batch_size,img,labels):
    sequences = df.astype(np.float32).to_numpy().tolist()
    dataset = TimeSeriesDataset([torch.tensor(s).unsqueeze(1).float() for s in sequences],img,labels,transform=transforms.Compose(
                                    [
                                     transforms.ToTensor(),
                                     transforms.Normalize(mean=mean, std=std)]))
    return DataLoader(dataset, batch_size=batch_size, shuffle=False)

# 모델 및 데이터로더 생성
train_dataloader = create_train_data_loader(train_df, 1)
test_dataloader = create_test_data_loader(test_df, 1)

# 모델 및 가중치 로드
model_series = Autoencoder(seq_len=1280, n_features=1, embedding_dim=400).to(device)
model_series = torch.load('/content/drive/MyDrive/steam_trap_project/mdlpth/mae_vibration_series_model.pth')
model_series.to(device)
model_series.eval()

# 예측 함수
def predict(model_series,model_img,dataloader):
    entropy_loss_weight=0.0002
    predictions, losses = [], []
    tr_recon_loss_func = nn.MSELoss().to(device)  ###nn.MSELoss().to(device)#
    tr_entropy_loss_func = EntropyLossEncap().to(device)
    criterion = nn.MSELoss().to(device)##nn.MSELoss().to(device)
    with torch.no_grad():
        for batch_idx, (seq_true,frames,labels) in enumerate(dataloader):
            frames = frames.to(device)
            frq_entropy = train_entropies[batch_idx]
            frq_entropy = (frq_entropy+average_entropy)/2
            seq_true = seq_true.to(device)
            labels = labels.to(device).float()
            #print('frames:',frames.shape)
            recon_res = model_img(frames)
            seq_pred, att_w1 = model_series(seq_true)
            recon_frames = recon_res['output']
            r_frame = recon_frames - frames
            sp_error_map = torch.sum(r_frame**2, dim=1)**0.5
            s = sp_error_map.size()
            sp_error_vec = sp_error_map.view(s[0], -1)
            recon_error_frame = torch.mean(sp_error_vec, dim=-1)
            recon_error_series = tr_recon_loss_func(seq_pred, seq_true)
            recon_error = frq_entropy*recon_error_frame + opt.series_w*recon_error_series
            #predictions.append(img_pred.cpu().numpy().flatten())
            losses.append(recon_error.item())
    return predictions, losses

# 예측 및 결과 분석
_, losses = predict(model_series,model_img, train_dataloader)
# 결과 분석 (예: 시각화, 분류 등)
sns.distplot(losses, bins=150, kde=True);

NameError: ignored

In [None]:
df = pd.DataFrame(losses)
print(df.describe())
df.plot.box(title="Box Chart")
plt.grid(linestyle="--", alpha=0.3)
plt.show()