<a href="https://colab.research.google.com/github/t-willi/Simula/blob/main/AE_V3.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [5]:
import torch
from torch import nn
import matplotlib.pyplot as plt
import torchvision
from torchvision import datasets, models, transforms
from torchvision.transforms import ToTensor
import pandas as pd
import numpy as np
from torch.utils.data import Dataset, DataLoader
from timeit import default_timer as timer 
import glob
import torch.optim as optim
from sklearn.preprocessing import normalize
from sklearn.preprocessing import MinMaxScaler



if torch.cuda.is_available()==True:
  device="cuda:0"
else:
  device ="cpu"

In [6]:
def normalize_df_per_column(df):
  for i,k in enumerate(df.columns):
    max=df.iloc[:,i].max()
    min=df.iloc[:,i].min()
    df.iloc[:,i] = df.iloc[:,i].apply(lambda x: ((x-min)/(max-min)))
  return df


In [7]:
def normalize_list(l):
  max=max(l)
  min=min(l)
  l=l.apply(lambda x:((x-min)/(max-min)))
  return l



In [12]:
# Create custom dataset class to load ECG data into dataset, containing 
# input tensor with lead 1 and output tensor with desired other leads
class Custom_dataset(Dataset):
    def __init__(self, data_dir,column=2):
      #get all files from directory loaded in all_files list
      self.column=column
      self.all_files = []
      self.files = glob.glob(data_dir + '/*.csv')
      for f in self.files:
        temp_df = pd.read_csv(f,sep=" ")
        self.all_files.append(temp_df)


    def __len__(self):
      return len(self.all_files)

    def __getitem__(self,idx):
      Data = []
      #turn list of dataframes into Tensor
      for f in self.files:
        #load_dataframe
        temp_df=pd.read_csv(f,sep=" ")
        #load input tensor
        temp_list_in=temp_df.iloc[:,1]
        temp_list_in=normalize([temp_list_in], norm="max")
        temp_tensor_in = torch.tensor(temp_list_in,dtype=torch.float32)
        #temp_tensor_in=temp_tensor_in.unsqueeze(0)
        #load label Tensor
        temp_list_out=temp_df.iloc[:,self.column].values
        temp_list_out=normalize([temp_list_out], norm="max")
        temp_tensor_out=torch.tensor(temp_list_out,dtype=torch.float32)
        #temp_tensor_out=temp_tensor_out.unsqueeze(0)
        #combine input and label and output
        temp_tensor_pair= temp_tensor_in,temp_tensor_out
      return temp_tensor_pair

In [22]:
for i in range(8):
  i+=1
  print(i)

1
2
3
4
5
6
7
8


In [23]:
#create trainingsets for all 8 conversions
#1 to 1
datasetlist = []
train_dataset1_1 = Custom_dataset(data_dir="/content/train",column=1)
datasetlist.append(train_dataset1_1)
train_dataset1_2 = Custom_dataset(data_dir="/content/train",column=2)
datasetlist.append(train_dataset1_2)
train_dataset1_3 = Custom_dataset(data_dir="/content/train",column=3)
datasetlist.append(train_dataset1_3)
train_dataset1_4 = Custom_dataset(data_dir="/content/train",column=4)
datasetlist.append(train_dataset1_4)
train_dataset1_5 = Custom_dataset(data_dir="/content/train",column=5)
datasetlist.append(train_dataset1_5)
train_dataset1_6 = Custom_dataset(data_dir="/content/train",column=6)
datasetlist.append(train_dataset1_6)
train_dataset1_7 = Custom_dataset(data_dir="/content/train",column=7)
datasetlist.append(train_dataset1_7)
train_dataset1_8 = Custom_dataset(data_dir="/content/train",column=8)
datasetlist.append(train_dataset1_8)


In [24]:
#check for shape of the data in dataset
x,y=datasetlist[0][0]
x.shape,x.dtype,y.shape,y.dtype

(torch.Size([1, 4999]), torch.float32, torch.Size([1, 4999]), torch.float32)

In [25]:
from torch.utils.data.dataloader import DataLoader
BATCH_SIZE = 10
dataloaderlist = []
#turn datasets into iterables
for data in datasetlist:
  train_dataloader = DataLoader(data,
                              batch_size=BATCH_SIZE,
                              shuffle=True
                              )
  dataloaderlist.append(train_dataloader)


In [31]:
x,y = next(iter(dataloaderlist[0]))
x.shape,x.dtype,y.shape,y.dtype

(torch.Size([10, 1, 4999]),
 torch.float32,
 torch.Size([10, 1, 4999]),
 torch.float32)

In [33]:
"""
Here, we define the autoencoder model.This model is taken from "https://github.com/L1aoXingyu/pytorch-beginner/blob/master/08-AutoEncoder/simple_autoencoder.py"
"""
class ECG_AE_v1(nn.Module):
    def __init__(self):
        super(ECG_AE_v1, self).__init__()
        self.AE = nn.Sequential(
            nn.Linear(4999, 128),
            nn.ReLU(),
            nn.Linear(128,20),
            nn.ReLU(),
            nn.Linear(20,10),
            nn.ReLU(),
            nn.Linear(10,20),
            nn.ReLU(),
            nn.Linear(20,128),
            nn.ReLU(),
            nn.Linear(128,4999),
        )


    def forward(self, x):
        x = self.AE(x)
        return x


model = ECG_AE_v1().to(device)
criterion = nn.MSELoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)



In [None]:
model

In [None]:
# try: 
#     import torchinfo
# except:
#     !pip install torchinfo
#     import torchinfo

# from torchinfo import summary
# summary(model, input_size=[1,4999]) # do a test pass through of an example input size 

In [36]:
def train_loop(Epochs=1,dataloader=None):
  model = ECG_AE_v1().to(device)
  from tqdm.auto import tqdm
  for epoch in tqdm(range(Epochs)):
    print(f"Epoch:{epoch}")
    train_loss=0
    for batch, (X,y) in enumerate(dataloader): 
      X, y = X.to(device), y.to(device) 
      model.train()
      output=model(X)
      loss = criterion(output,y)
      train_loss += loss
      optimizer.zero_grad()
      loss.backward()
      optimizer.step()
    #average loss per batch
    train_loss /= len(train_dataloader)
    print(f"\nTrain loss: {train_loss:.5f}")
  return model



    



In [37]:
list_of_models = []
for dataloader in dataloaderlist:
  model = train_loop(dataloader=dataloader)
  list_of_models.append(model)





  0%|          | 0/1 [00:00<?, ?it/s]

Epoch:0

Train loss: 0.02698


  0%|          | 0/1 [00:00<?, ?it/s]

Epoch:0

Train loss: 0.02740


  0%|          | 0/1 [00:00<?, ?it/s]

Epoch:0

Train loss: 0.03267


  0%|          | 0/1 [00:00<?, ?it/s]

Epoch:0

Train loss: 0.04636


  0%|          | 0/1 [00:00<?, ?it/s]

Epoch:0

Train loss: 0.05555


  0%|          | 0/1 [00:00<?, ?it/s]

Epoch:0

Train loss: 0.02848


  0%|          | 0/1 [00:00<?, ?it/s]

Epoch:0

Train loss: 0.02351


  0%|          | 0/1 [00:00<?, ?it/s]

Epoch:0

Train loss: 0.02417


In [None]:
from tqdm.auto import tqdm
torch.manual_seed(42)
#train and test loop
Epochs = 5
for epoch in tqdm(range(Epochs)):
  print(f"Epoch:{epoch}")
  train_loss=0
  for batch, (X,y) in enumerate(train_dataloader): 
    X, y = X.to(device), y.to(device) 
    model.train()
    output=model(X)
    ##reshaping output from linear to 2d from 40000 to 8 times 5000
    #output=torch.reshape(output,(1,1,8,4999))
    ######
    loss = criterion(output,y)
    train_loss += loss
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()
  #average loss per batch
  train_loss /= len(train_dataloader)
  #start testing
  # test_loss = 0
  # model.eval()
  # with torch.inference_mode():
  #   for X,y in test_dataloader:
  #     X, y = X.to(device), y.to(device) 
  #     test_pred = model(X)
  #     ##reshaping output from linear to 2d from 40000 to 8 times 5000
  #     #test_pred=torch.reshape(test_pred,(10,1,8,4999))
  #     ######
  #     test_loss += criterion(test_pred,y)    
  #   test_loss /= len(test_dataloader)

  #print what is happening
  print(f"\nTrain loss: {train_loss:.5f}")
    




In [None]:
input,output = train_dataset[0]
input
input=input.tolist()
data=input[0][0]
type(data)
plt.plot(data)
# plt.title("Input ECG lead1")

In [None]:
X,y=test_dataset[0]
model.to("cpu")
model.eval()
with torch.inference_mode():
  output=model(X)
output=output.tolist()
data=output[0][0]
plt.plot(data)
plt.title("Output ECG trained for 20epochs on 50 datapoints")

In [None]:
X,y=test_dataset[0]
model.to("cpu")
model.eval()
with torch.inference_mode():
  output=model(X)
output