In [1]:
import torch
import pandas as pd
import matplotlib.pyplot as plt
import numpy as np
import random
import shutil
import time 

from scipy.integrate import simpson

from sklearn.decomposition import PCA
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split

from torch.utils.tensorboard import SummaryWriter

from torch.utils.data import Dataset, DataLoader
import torch.nn as nn
import torch.optim as optim

In [2]:
df = pd.read_pickle("a2f_data.pk.bz2")[:250] #Objetivo do [:250] é remover os pontos iguais a zero
w = df.w
df = df.drop("w", axis=1).T
df=df.drop(["agm002693510"]) #Está repetido com valores de NaN

Ry2cm = 219474.63/2
K_BOLTZMANN_RY = (1.380649E-23/(4.3597447222071E-18/2))

In [3]:
def get_epc(a2f, w):
  K_BOLTZMANN_RY = (1.380649E-23/(4.3597447222071E-18/2))
  la = 2*simpson(a2f/w, w)
  sim = simpson(a2f/w*np.log(w), w)
  wlog = np.exp(2/la*sim)/K_BOLTZMANN_RY
  sim = max(0.0, simpson(2*a2f*w, w))

  w2 = np.sqrt(sim/la)/K_BOLTZMANN_RY
  return wlog, w2, la

def get_Tc(la, wlog, mu):
  if (la > 1.5*mu) and (wlog > 0):
    ad = wlog/1.2*np.exp(-(1.04*(1+la))/(la-mu*(1+0.62*la)))
  else:
    ad = 0.0
  return ad

In [4]:
class FeatureDataset(Dataset):
    
    def __init__(self, data):
        self.X_train = torch.tensor(data.values, dtype=torch.float32)[:,None]
        
    def __len__(self):
        return len(self.X_train)
    
    def __getitem__(self, idx):
        return self.X_train[idx]

In [5]:
test_set, train_set, validation_set= np.split(df.sample(frac=1, random_state=52),[int(.1*len(df)),int(.9*len(df))])#[int(.8*len(df)),int(.9*len(df))])   random.randint(1, 100)[644,size_train+644]

print(len(train_set),len(test_set),len(validation_set))
print(len(train_set)/df.shape[0],len(test_set)/df.shape[0],len(validation_set)/df.shape[0])

batch_size = train_set.shape[0]

train_data= DataLoader(FeatureDataset(train_set), batch_size=128) 

5150 643 644
0.800062140748796 0.09989125368960695 0.10004660556159702


Modelo

In [6]:
class UpScale(nn.Module):
    def __init__(self, size, scale, mode):
        super(UpScale, self).__init__()
        self.interpolate = nn.functional.interpolate
        self.scale=scale
        self.mode=mode
        self.size=size
        
    def forward(self, x):
        if self.size==0:
            x = self.interpolate(x, scale_factor=self.scale, mode=self.mode, align_corners=False)
            return x
        elif self.scale==0:
            x = self.interpolate(x, size=self.size, mode=self.mode, align_corners=False)
            return x




class Autoencoder(nn.Module):
    def __init__(self, initial_size, min_size):
        self.initial_size= initial_size
        self.min_size= min_size

        super(Autoencoder,self).__init__()

        self.encoder = nn.Sequential(                                             #Channels*Dim
            nn.Conv1d(1,8, kernel_size=(16,),stride=(2,),padding=(0,),bias=True), #1*250 -> 8*118
            nn.BatchNorm1d(8),
            nn.SiLU(), 


            nn.Conv1d(8, 16, kernel_size=(16,),stride=(1,),padding=(0,),bias=True), #8*118 -> 16*103
            nn.BatchNorm1d(16),
            nn.SiLU(), 
            
            nn.MaxPool1d(kernel_size=(2,)), #16*103 -> 16*51

            nn.Conv1d(16, 32, kernel_size=(16,),stride=(1,),padding=(0,),bias=True), #16*51 -> 32*36
            nn.BatchNorm1d(32),
            nn.SiLU(),

            nn.Conv1d(32, 64, kernel_size=(16,),stride=(2,),padding=(0,),bias=True), #32*36 -> 64*11
            nn.BatchNorm1d(64),
            nn.SiLU(),

            nn.Conv1d(64, 16, kernel_size=(8,),stride=(1,),padding=(0,),bias=True), #64*11 -> 16*4
            nn.BatchNorm1d(16),
            nn.SiLU(),

            
            nn.Flatten(),
            
            nn.Linear(64,32,bias=True),
            nn.BatchNorm1d(32),
            nn.SiLU(),

            nn.Linear(32,16,bias=True),
            nn.BatchNorm1d(16),
            nn.SiLU(),

            nn.Linear(16,8,bias=True),
            nn.Tanh(), 
            )
          
        self.decoder = nn.Sequential(

            nn.Linear(8,16,bias=True),
            nn.BatchNorm1d(16),
            nn.SiLU(),
            

            nn.Linear(16,32,bias=True),
            nn.BatchNorm1d(32),
            nn.SiLU(),

            nn.Linear(32,64,bias=True),
            nn.BatchNorm1d(64),
            nn.SiLU(),

            nn.Unflatten(1,(16,4)),
            
            nn.ConvTranspose1d(16, 16, kernel_size=(8,),stride=(1,),padding=(0,),output_padding=(0,),bias=True), #16*4 -> 16*11
            nn.BatchNorm1d(16),
            nn.SiLU(),
            
            nn.ConvTranspose1d(16, 64, kernel_size=(16,),stride=(2,),padding=(0,),output_padding=(0,),bias=True), #16*11 -> 64*36
            nn.BatchNorm1d(64),
            nn.SiLU(),


            
            UpScale(scale=0,size=88, mode='linear'), #64*88 #Com scale=2 dava 72
            

            nn.ConvTranspose1d(64,32, kernel_size=(16,),stride=(1,),padding=(0,),output_padding=(0,),bias=True), #64*88 -> 32*103
            nn.BatchNorm1d(32),
            nn.SiLU(),

            nn.ConvTranspose1d(32,16, kernel_size=(16,),stride=(1,),padding=(0,),output_padding=(0,),bias=True), #32*103 -> 16*118
            nn.BatchNorm1d(16),
            nn.SiLU(),

            nn.ConvTranspose1d(16,8, kernel_size=(16,),stride=(2,),padding=(0,),output_padding=(0,),bias=True), #16*118 -> 8*250
            nn.BatchNorm1d(8),
            nn.SiLU(),
            
            nn.ConvTranspose1d(8,1, kernel_size=(1,),stride=(1,),padding=(0,),output_padding=(0,),bias=True), #8*250 -> 1*250
            #nn.ReLU()
        )
        
    def forward(self,x):
        encoded = self.encoder(x)
        decoded= self.decoder(encoded)
        
        return decoded

Loss Function

In [7]:
Lfunction1= nn.MSELoss()

class CustomLoss(nn.Module):
    
    def __init__(self):
        super(CustomLoss, self).__init__()

    def calculator1(self, values, w):

        la= torch.mul(torch.trapezoid(torch.div(values,w),w), 2)

        #sim1=torch.mul(torch.div(values,w),torch.log(w))
        #sim=torch.trapezoid(sim1,w)

        #wlog2=torch.div(2,la)
        #wlog1=torch.exp(torch.mul(wlog2,sim))
        #wlog=torch.div(wlog1,K_BOLTZMANN_RY)

        return la

    def calculator2(self,la,wlog,mu):
        
        ad= torch.mul(torch.div(wlog,1.2),torch.exp(torch.div(torch.mul(torch.add(la,1),-1.04),torch.sub(la,torch.mul(torch.add(1,torch.mul(la,0.62)),mu)))))
        ad= torch.where(la > 1.5*mu, ad,0)

        return ad

        
    
    
    def forward(self, inputs, targets, w):
        inputs=torch.squeeze(inputs)
        targets=torch.squeeze(targets)

        w=torch.tensor(w.values/Ry2cm, dtype=torch.float32)
        w=w.repeat(inputs.shape[0],1)

        la1=self.calculator1(inputs,w)
        la2=self.calculator1(targets,w)


        return torch.mean(torch.pow(torch.sub(la1,la2),2)) +  Lfunction1(inputs,targets)*0.5 

In [8]:
model = Autoencoder(250,8) 

Lfunction2= CustomLoss()


optimizer = torch.optim.Adam(model.parameters(), lr=0.01, weight_decay=1e-5) 
scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer, 'min', patience=50, factor=0.95) 

In [9]:
writer =  SummaryWriter('runs/teste')

checkpoint_path = "current_checkpoint.pt" 
best_model_path = "best_model.pt"

def save_ckp(state, is_best, checkpoint_path, best_model_path):
    f_path = checkpoint_path
    torch.save(state, f_path)
    if is_best:
        best_fpath = best_model_path
        shutil.copyfile(f_path, best_fpath)

In [10]:
def load_ckp(checkpoint_fpath, model, optimizer):
    checkpoint = torch.load(checkpoint_fpath)
    model.load_state_dict(checkpoint['state_dict'])
    optimizer.load_state_dict(checkpoint['optimizer'])
    loss_min = checkpoint['loss_min']
    return model, optimizer, checkpoint['epoch'], loss_min.item()

Train Loop

In [11]:
outputs =[[],[]]
epochs=20000
epoch=0
loss_train=[100000]
lr_0=0
loss_test=1000
loss_test_list=[]
train_loss_min=100000


while epoch < epochs:
    loss_train=[]
    epoch+=1
    model.train()
    for x in train_data: 
        optimizer.zero_grad()
        recon= model(x)
        
        loss=Lfunction2(x,recon,w)
        torch.autograd.set_detect_anomaly(True)
 
        loss.backward()
        
        optimizer.step()
        loss_train.append(loss.detach().numpy())

    scheduler.step(np.mean(loss_train))
    
    
    input_data = torch.tensor(test_set.values, dtype=torch.float32)[:,None]
    predictions = model(input_data)
    loss1=Lfunction2(input_data,predictions,w)
    loss_test_list.append(loss1.detach().numpy())
    

    if epoch%100==0:
        print(f"Epoch: {epoch+1}, Loss: {np.mean(loss_train):.8f}, Test Loss: {loss1.detach().numpy():.8f}")
            
    writer.add_scalar("Loss_scaled/train", np.mean(loss_train), epoch)
    writer.add_scalar("Loss_scaled_test/train", loss1.detach().numpy(), epoch)
    
    if optimizer.param_groups[0]['lr'] != lr_0:
        print(f"Epoch: {epoch}, Loss: {np.mean(loss_train):.8f}, lr: {optimizer.param_groups[0]['lr']}")
        lr_0=optimizer.param_groups[0]['lr']
        

    outputs[0].append(epoch)
    outputs[1].append(np.mean(loss_train))

    checkpoint = {
        'epoch': epoch,
        'loss_min': np.mean(loss_train),
        'state_dict': model.state_dict(),
        'optimizer': optimizer.state_dict()}
    
    if np.mean(loss_train) <= train_loss_min:
        save_ckp(checkpoint, True, checkpoint_path, best_model_path)
        train_loss_min = np.mean(loss_train)
        
print(f"Epoch: {epoch+1}, Loss: {np.mean(loss_train):.8f}, lr: {optimizer.param_groups[0]['lr']}")

writer.flush()
writer.close()

Epoch: 1, Loss: 5.01968336, lr: 0.01


KeyboardInterrupt: 

In [None]:
plt.yscale('log')

plt.plot(outputs[0],outputs[1], linewidth=1)
plt.plot(outputs[0],loss_test_list, linewidth=1)
plt.axhline(y=min(outputs[1]), color='r', linestyle='-',linewidth=1)
print(min(outputs[1]))

plt.legend()
plt.savefig("loss_train_test.png")
plt.show()

Carregar o modelo

In [None]:
model, _, start_epoch, loss_min = load_ckp(best_model_path , model, optimizer)

In [None]:
input_data = torch.tensor(test_set.values, dtype=torch.float32)[:,None]
predictions = model(input_data)
loss=Lfunction2(predictions,input_data,w)
predictions=torch.squeeze(predictions)
print(predictions.shape)
predictions = pd.DataFrame(predictions.detach().numpy(),  columns=test_set.columns)
print(loss.detach().numpy())


In [None]:
latent_emb = model.encoder(input_data)
latent_emb.shape

In [None]:
predictions = pd.DataFrame(torch.squeeze(model.decoder(latent_emb.detach())).detach().numpy(), columns=test_set.columns, index=test_set.index.tolist())

Calculo dos Tc's e erros

In [None]:
accurate = []
ae_values = []
valores_pca=[]

pca=PCA(n_components=8)
data1=(pd.DataFrame(pca.inverse_transform(pca.fit_transform(test_set)), index=test_set.index))

for i in test_set.index.tolist():
  wlog, w2, la = get_epc(test_set.loc[i], w/Ry2cm)   
  tc = get_Tc(la, wlog, 0.1)

  wlog1, w21, la1 = get_epc(predictions.loc[i], w/Ry2cm)
  tc1 = get_Tc(la1, wlog1, 0.1)

  wlog2, w22, la2 = get_epc(data1.loc[i], w/Ry2cm)
  tc2 = get_Tc(la2, wlog2, 0.1)

  accurate.append(tc)
  ae_values.append(tc1)
  valores_pca.append(tc2)

ae_values = pd.DataFrame(ae_values, index=test_set.index.tolist())
accurate = pd.DataFrame(accurate, index=test_set.index.tolist())
valores_pca = pd.DataFrame(valores_pca, index=test_set.index.tolist())

errors_abs = (ae_values-accurate).abs()
errors2_abs = (valores_pca-accurate).abs()

print(len(accurate),len(ae_values),len(valores_pca))

remover= list(accurate.loc[accurate[0] == 0].to_dict()[0].keys())

accurate=accurate.drop(index=remover)
ae_values=ae_values.drop(index=remover)
valores_pca=valores_pca.drop(index=remover)


print(len(accurate),len(ae_values),len(valores_pca))

errors_rel =(ae_values-accurate).abs()/accurate
errors2_rel =(valores_pca-accurate).abs()/accurate

print(list(accurate.loc[accurate[0] == 0].to_dict()[0].keys()))

In [None]:
mat, mat_a2f = random.choice(list(test_set.T.items()))

print(mat)
plt.plot(w, mat_a2f )
print(len(w),len(mat_a2f))

n=0
for i in mat_a2f[::-1]:
    if i == 0:
        n+=1
    else:
        break

valores_nulos=[]

for mat, mat_a2f in list(test_set.T.items()):
    n2=0
    for i in mat_a2f[::-1]:
        if i == 0:
            n2+=1
        else:
            valores_nulos.append(n2)
            break
print(min(valores_nulos),max(valores_nulos))
print(valores_nulos)
print(437-n)

In [None]:
plt.figure()
plt.yscale('log')

plt.hist(errors_abs, bins=30, range=[0,10],histtype="step",label="AE")
plt.hist(errors2_abs, label="PCA", bins=30, range=[0,10],histtype="step")

print(np.average(errors_abs))
print(np.average(errors2_abs))


plt.legend()
plt.savefig("hist.png")

In [None]:
def mean2(lista):
    return sum(lista)/len(lista)


plt.figure()
plt.yscale('log')

plt.hist(errors_rel, bins=30, range=[0,10],histtype="step",label="AE")
plt.hist(errors2_rel, label="PCA", bins=30, range=[0,10],histtype="step")

print(len(errors_rel))
print(len(errors2_rel))

erros_grandes=list(errors_rel.loc[errors_rel[0] > 100].to_dict()[0].keys())
erros2_grandes=list(errors2_rel.loc[errors2_rel[0] > 100].to_dict()[0].keys())

print(len(erros_grandes))
print(len(erros2_grandes))


list1=errors_rel[0].to_list()
list2=errors2_rel[0].to_list()

print(mean2(list1))
print(mean2(list2))

plt.legend()
plt.savefig("hist.png")

Verifica aleatoriamente os resultados vs PCA e AE

In [None]:
print("AE")
mat, mat_a2f = random.choice(list(test_set.T.items()))
for i in [mat]:
  print("--------------------")
  plt.plot(w,test_set.loc[i],label="Original")
  plt.plot(w, predictions.loc[i],label="Recon AE")
  plt.plot(w, data1.loc[i],label="Recon PCA")
  plt.xlabel="w"
  plt.ylabel="a2f"
  plt.legend()
  plt.show
  print(i, accurate.loc[i][0],ae_values.loc[i][0],errors_rel.loc[i][0],errors_abs.loc[i][0])
  print(i, accurate.loc[i][0],valores_pca.loc[i][0],errors2_rel.loc[i][0],errors2_abs.loc[i][0])
  #plt.savefig(f"pasta/figura_{i}_{errors_rel.loc[i][0]}.png")

