In [11]:
import torch as th 
import torch.nn as nn 
import torch.nn.functional as F 
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset, random_split
import pytorch_lightning as pl 
from pytorch_lightning import Trainer, LightningModule

import torchdyn 
from torchdyn.core import NeuralODE

import os
import cv2 
import numpy as np 
import matplotlib.pyplot as plt 

import warnings 
warnings.filterwarnings("ignore")

## DATA

In [12]:
class CircuitFused(Dataset):
    """In-memory grayscale image dataset read from a folder-per-class tree.

    Every sub-directory of ``datadir`` is treated as one class. Sub-directories
    are sorted by name so the class-index mapping is the same on every run and
    on every filesystem. Non-directory entries (e.g. ``.DS_Store``) are ignored
    and do not consume a class index.

    Args:
        datadir: Root directory containing one sub-directory per class. A
            trailing path separator is accepted but not required.
        image_size: Two-element sequence. Images are resized so that each
            stored array has shape ``(1, image_size[0], image_size[1])``.
        num_classes: Length of the one-hot label vector (default 15).

    Raises:
        ValueError: If ``datadir`` holds more class directories than
            ``num_classes``.
    """

    def __init__(self, datadir, image_size, num_classes=15):
        super().__init__()
        self.datadir = datadir
        self.image_size = image_size
        self.num_classes = num_classes
        self.images, self.labels = self.datareader()

    def datareader(self):
        """Load every readable image under ``datadir``.

        Returns:
            A pair ``(X, Y)`` of equally long lists: ``X`` holds float32 arrays
            scaled to [0, 1] with a leading channel axis, ``Y`` holds float32
            one-hot label vectors of length ``num_classes``.
        """
        X = []
        Y = []
        folders = sorted(
            entry for entry in os.listdir(self.datadir)
            if os.path.isdir(os.path.join(self.datadir, entry))
        )
        if len(folders) > self.num_classes:
            raise ValueError(
                f"Found {len(folders)} class folders in {self.datadir!r} "
                f"but num_classes={self.num_classes}"
            )

        # cv2.resize takes dsize as (width, height) and returns an array of
        # shape (height, width); pick dsize so the result is
        # (image_size[0], image_size[1]) without any reshape.
        dsize = (self.image_size[1], self.image_size[0])

        for c, folder in enumerate(folders):
            folder_path = os.path.join(self.datadir, folder)
            for file in sorted(os.listdir(folder_path)):
                image = cv2.imread(os.path.join(folder_path, file))
                if image is None:
                    # Not an image / unreadable: skip this file only instead of
                    # silently abandoning the remainder of the class folder.
                    continue
                image = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
                # `interpolation` must be passed by keyword: the third
                # positional parameter of cv2.resize is `dst`.
                image = cv2.resize(image, dsize, interpolation=cv2.INTER_AREA) / 255.0
                image = np.asarray(image, dtype=np.float32)[np.newaxis, ...]
                arr = np.zeros(self.num_classes, dtype="float32")
                arr[c] = 1
                X.append(image)
                Y.append(arr)
        return X, Y

    def __len__(self):
        """Return the number of loaded samples."""
        return len(self.labels)

    def __getitem__(self, idx):
        """Return sample ``idx`` as a dict with tensors under "Image" and "Label"."""
        label = th.from_numpy(np.array(self.labels[idx]))
        image = th.from_numpy(np.array(self.images[idx]))
        sample = {"Image": image, "Label": label}
        return sample

## MODELS

### CNNODE Block

In [13]:
class CNNODEBlock(nn.Module):
    """Channel-preserving conv block used as the vector field of a Neural ODE.

    Pipeline: Conv2d (no padding) -> ZeroPad2d -> BatchNorm2d -> LeakyReLU.

    Args:
        filters: Number of input and output channels.
        kernel: Convolution kernel size.
        moment: Momentum passed to BatchNorm2d.
        alpha: Negative slope of the LeakyReLU.
    """

    def __init__(self, filters:int, kernel:int, moment:float, alpha=0.1):
        super().__init__()
        border = int((kernel-1)/2)
        self.conv = nn.Conv2d(in_channels=filters, out_channels=filters, kernel_size=kernel)
        self.act = nn.LeakyReLU(negative_slope=alpha)
        self.pad = nn.ZeroPad2d(border)
        self.norm = nn.BatchNorm2d(num_features=filters, momentum=moment)

    def forward(self, x):
        """Apply conv, zero-pad, batch-norm and activation, in that order."""
        # NOTE(review): padding is applied AFTER the convolution, so the output
        # border is zeros rather than convolved values; the spatial size is only
        # restored for odd `kernel`. Confirm this is intended.
        features = self.conv(x)
        features = self.pad(features)
        features = self.norm(features)
        return self.act(features)

### Upsampling Conv Block (UPBlock)

In [14]:
class UPBlock(nn.Module):
    """Two conv stages followed by a 2x2 max-pool.

    Each stage is Conv2d (no padding) -> ZeroPad2d -> BatchNorm2d -> ReLU. The
    first stage maps ``infilter`` to ``outfilter`` channels, the second keeps
    ``outfilter`` channels. The final MaxPool2d(2, 2) halves height and width.

    Args:
        infilter: Number of input channels.
        outfilter: Number of output channels.
        kernel: Kernel size shared by both convolutions.
        moment: Momentum passed to both BatchNorm2d layers.
    """

    def __init__(self, infilter:int, outfilter:int, kernel:int, moment:float):
        super().__init__()
        self.conv1 = nn.Conv2d(in_channels=infilter, out_channels=outfilter, kernel_size=kernel)
        self.norm1 = nn.BatchNorm2d(num_features=outfilter, momentum=moment)

        self.conv2 = nn.Conv2d(in_channels=outfilter, out_channels=outfilter, kernel_size=kernel)
        self.norm2 = nn.BatchNorm2d(num_features=outfilter, momentum=moment)

        self.act = nn.ReLU()
        border = int((kernel-1)/2.0)
        self.pad = nn.ZeroPad2d(border)
        self.pool = nn.MaxPool2d(kernel_size=2, stride=2)

    def forward(self, x):
        """Run both conv stages and return the max-pooled result."""
        # Padding follows each convolution (not the other way round), mirroring
        # the layout of the other conv blocks in this notebook.
        stages = ((self.conv1, self.norm1), (self.conv2, self.norm2))
        for conv, norm in stages:
            x = self.act(norm(self.pad(conv(x))))
        return self.pool(x)

### Dense Block

#### Dense Block 1.0

In [15]:
class DenseBlock(nn.Module):
    """Classification head: global average pool, one hidden layer, softmax.

    Args:
        filter: Number of channels of the incoming feature map.
        dense: Width of the hidden linear layer.
        drop: Dropout probability (the same Dropout module is used twice).
        classes: Number of output classes.
    """

    def __init__(self, filter:int, dense:int, drop:float, classes:int):
        super().__init__()
        self.pool = nn.AdaptiveAvgPool2d(output_size=(1,1))
        self.flat = nn.Flatten()
        self.dense = nn.Linear(in_features=filter, out_features=dense)
        self.drop = nn.Dropout(p=drop)
        self.final = nn.Linear(in_features=dense, out_features=classes)

    def forward(self,x):
        """Return class probabilities (softmax over the last dimension)."""
        pooled = self.flat(self.pool(x))
        hidden = self.dense(self.drop(pooled))
        hidden = self.drop(F.relu(hidden))
        scores = self.final(hidden)
        return F.softmax(scores, dim=-1)

#### Dense Block 2.0

In [16]:
class DenseODE(nn.Module):
    """Two-layer tanh MLP mapping a ``dense``-wide vector back to ``dense``.

    Args:
        dense: Input and output width; the hidden layer is twice as wide.
        drop: Accepted for signature compatibility; not used by this module.
    """

    def __init__(self, dense:int, drop:int):
        super().__init__()
        hidden_width = dense*2
        self.den1 = nn.Linear(in_features=dense, out_features=hidden_width)
        self.act = nn.Tanh()
        self.den2 = nn.Linear(in_features=hidden_width, out_features=dense)

    def forward(self, x):
        """Compute ``den2(tanh(den1(x)))``."""
        hidden = self.den1(x)
        hidden = self.act(hidden)
        return self.den2(hidden)


class DenseBlock2(nn.Module):
    """Classification head with a Neural ODE between the hidden and output layers.

    Args:
        filter: Number of channels of the incoming feature map.
        dense: Width of the hidden representation fed to the ODE.
        drop: Dropout probability applied to the pooled features.
        classes: Number of output classes.
    """

    def __init__(self, filter:int, dense:int=256, drop:float=0.2, classes:int=15):
        super().__init__()
        self.pool = nn.AdaptiveAvgPool2d(output_size=(1,1))
        self.flat = nn.Flatten()
        self.dense = nn.Linear(in_features=filter, out_features=dense)
        self.drop = nn.Dropout(p=drop)
        self.final = nn.Linear(in_features=dense, out_features=classes)
        vector_field = DenseODE(dense, drop)
        self.ODE = NeuralODE(
            vector_field,
            sensitivity='adjoint',
            solver='rk4',
            solver_adjoint='dopri5',
            atol_adjoint=1e-4,
            rtol_adjoint=1e-4,
        )

    def forward(self, x, tspan):
        """Return class probabilities for feature map ``x`` integrated over ``tspan``."""
        pooled = self.flat(self.pool(x))
        hidden = F.relu(self.dense(self.drop(pooled)))
        _, trajectory = self.ODE(hidden, tspan)
        # Only the last entry of the returned trajectory is classified
        # (presumably the state at the final time in `tspan`).
        final_state = trajectory[-1]
        return F.softmax(self.final(final_state), dim=-1)

### CNNODE

In [17]:
class CNNODE(nn.Module):
    """Image classifier alternating Neural-ODE conv blocks with pooled conv blocks.

    Layout: an initial ``UPBlock`` (1 -> ``filter`` channels), then ``num``
    stages of [NeuralODE(CNNODEBlock) -> UPBlock], then a ``DenseBlock2`` head.
    Each stage multiplies the channel count by ``gf``, so the head receives
    ``filter * gf**num`` channels.

    Args:
        num: Number of ODE + UPBlock stages.
        filter: Channel count after the initial UPBlock.
        dense: Hidden width of the classification head.
        classes: Number of output classes.
        gf: Channel growth factor applied by every stage.
        kernel: Kernel size of all convolutions.
        moment: BatchNorm momentum.
        drop: Dropout probability in the head.
        aplha: LeakyReLU negative slope for the ODE conv blocks (the parameter
            name is misspelled; it is kept so existing keyword callers work).
    """

    def __init__(self, num=3, filter=64, dense=256, classes=15, gf=2, kernel=3, moment=0.9, drop=0.2, aplha=0.1):
        super(CNNODE, self).__init__()
        self.cnnode = nn.ModuleList([])
        self.upsamp = nn.ModuleList([])
        self.final = DenseBlock2(filter*pow(gf, num), dense, drop, classes)
        self.conv = UPBlock(1, filter, kernel, moment)

        channels = filter
        for _ in range(num):
            f = CNNODEBlock(channels, kernel, moment, aplha)
            model = NeuralODE(f, sensitivity='adjoint', solver='rk4', solver_adjoint='dopri5', atol_adjoint=1e-4, rtol_adjoint=1e-4)
            self.cnnode.append(model)
            # Grow by `gf` (previously hard-coded to 2), so the output width
            # matches the next stage's input and the head for any growth factor.
            self.upsamp.append(UPBlock(channels, channels*gf, kernel, moment))
            channels = channels*gf

    def forward(self, x, t_span1, t_span2):
        """Classify ``x``.

        Args:
            x: Input batch; the first conv expects a single input channel.
            t_span1: Time grid passed to every convolutional Neural ODE.
            t_span2: Time grid passed to the Neural ODE inside the dense head.

        Returns:
            Whatever the ``DenseBlock2`` head returns for the final features.
        """
        x = self.conv(x)
        for neuralode, neuralnetwork in zip(self.cnnode, self.upsamp):
            t, x = neuralode(x, t_span1)
            # Continue from the last entry of the ODE trajectory only.
            x = neuralnetwork(x[-1])
        return self.final(x, t_span2)

## TRAIN

### Training Class

In [18]:
class Learner(LightningModule):
    """Lightning wrapper that trains ``model`` on a random train/validation split.

    The wrapped model is called as ``model(x, t_span1, t_span2)`` and is
    expected to return class probabilities (the models in this notebook end in
    a softmax). Targets are one-hot float vectors under the batch key "Label";
    inputs are under "Image".

    Args:
        data: Full dataset; split randomly into train and validation parts.
        model: Network to train.
        t_span1: Time grid forwarded to the model as its second argument.
        t_span2: Time grid forwarded to the model as its third argument.
        batchsize: Batch size of both data loaders.
        learning_rate: Adam learning rate.
        split: Fraction of ``data`` used for training.
    """

    def __init__(self, data:Dataset, model:nn.Module, t_span1:th.Tensor, t_span2:th.Tensor, batchsize:int=20, learning_rate:float=1e-4, split=0.8):
        super(Learner, self).__init__()
        total = len(data)
        self.split = int(split*total)
        self.model, self.t_span1, self.t_span2 = model, t_span1, t_span2
        self.data, self.valid = random_split(data, [self.split, total-self.split])
        self.batchsize = batchsize
        self.learning_rate = learning_rate

    def forward(self, x):
        """Run the wrapped model on ``x`` with the stored time grids."""
        # The model's forward requires both time spans; calling it with `x`
        # alone raised a TypeError.
        return self.model(x, self.t_span1, self.t_span2)

    def _loss(self, y_hat, y):
        """Cross-entropy between predicted probabilities and one-hot targets.

        ``nn.CrossEntropyLoss`` applies log-softmax internally, so feeding it
        the already-softmaxed model output would normalise twice. The log is
        taken directly instead, clamped away from zero for numerical safety.
        """
        log_probs = th.log(y_hat.clamp_min(1e-8))
        return -(y * log_probs).sum(dim=-1).mean()

    def _evaluate(self, batch):
        """Return ``(loss, accuracy)`` for one batch dict."""
        x = batch["Image"]
        y = batch["Label"]
        y_hat = self(x)
        return self._loss(y_hat, y), self.acc(y_hat, y)

    def training_step(self, batch, batch_idx):
        """Compute and log the training loss and accuracy for one batch."""
        loss, accuracy = self._evaluate(batch)
        # Key spelled "train_loss" so it matches the scheduler's `monitor`.
        self.log_dict({"train_loss": loss, "Accuracy": accuracy}, on_step=True, on_epoch=True, prog_bar=True, logger=False)
        return loss

    def validation_step(self, batch, batch_idx):
        """Compute and log validation loss, accuracy and the current learning rate."""
        loss, accuracy = self._evaluate(batch)
        cur_lr = self.trainer.optimizers[0].param_groups[0]['lr']
        self.log("lr", cur_lr, prog_bar=True, on_step=True)
        self.log_dict({"valid_loss": loss, "valid_Acc": accuracy}, on_step=True, on_epoch=True, prog_bar=True, logger=False)
        return loss

    def configure_optimizers(self):
        """Adam with a StepLR schedule (x0.1 every 10 scheduler steps)."""
        optimizer = optim.Adam(self.parameters(), lr=self.learning_rate)
        sch = optim.lr_scheduler.StepLR(optimizer, step_size=10, gamma=0.1)
        return {
            "optimizer": optimizer,
            "lr_scheduler": {
                "scheduler": sch,
                "monitor": "train_loss",
            },
        }

    def train_dataloader(self):
        """Shuffled loader over the training split."""
        return DataLoader(self.data, batch_size=self.batchsize, shuffle=True)

    def val_dataloader(self):
        """Loader over the validation split (fixed order for comparable metrics)."""
        return DataLoader(self.valid, batch_size=self.batchsize, shuffle=False)

    def acc(self, y_pred, y_true):
        """Fraction of samples whose arg-max prediction matches the arg-max target.

        Normalised by the actual number of samples in the batch, so a short
        final batch is not under-reported. Returns 0.0 for an empty batch.
        """
        count = len(y_pred)
        if count == 0:
            return 0.0
        hits = (th.argmax(y_pred, dim=-1) == th.argmax(y_true, dim=-1)).sum().item()
        return hits / count

### Training Values

In [19]:
# Root folder of the dataset; read by CircuitFused as one sub-folder per class.
PATH = "./CircuitSolver/"

# Integration grids: 10 evenly spaced points on [0, 1], one for the
# convolutional ODE blocks and one for the dense-head ODE.
tconv = th.linspace(start=0, end=1, steps=10)
tdense = th.linspace(start=0, end=1, steps=10)

# Network with its default hyper-parameters and the 64x64 image dataset.
model = CNNODE()
data = CircuitFused(datadir=PATH, image_size=(64, 64))

Your vector field callable (nn.Module) should have both time `t` and state `x` as arguments, we've wrapped it for you.
Your vector field callable (nn.Module) should have both time `t` and state `x` as arguments, we've wrapped it for you.
Your vector field callable (nn.Module) should have both time `t` and state `x` as arguments, we've wrapped it for you.
Your vector field callable (nn.Module) should have both time `t` and state `x` as arguments, we've wrapped it for you.


### Training 

In [23]:
from pytorch_lightning.callbacks import RichProgressBar
from pytorch_lightning.callbacks.progress.rich_progress import RichProgressBarTheme

# Colour scheme for the Rich progress bar shown while training.
bar_theme = RichProgressBarTheme(
    description="blue",
    progress_bar="green_yellow",
    progress_bar_finished="green1",
    progress_bar_pulse="#6206E0",
    batch_progress="blue",
    time="black",
    processing_speed="black",
    metrics="black",
)
progress_bar = RichProgressBar(theme=bar_theme)

# Batch size 25, learning rate 1e-5; train for exactly 30 epochs on Apple MPS.
learn = Learner(
    data=data,
    model=model,
    t_span1=tconv,
    t_span2=tdense,
    batchsize=25,
    learning_rate=1e-5,
)
trainer = pl.Trainer(
    min_epochs=30,
    max_epochs=30,
    callbacks=[progress_bar],
    accelerator="mps",
)
trainer.fit(learn)

GPU available: True (mps), used: True
TPU available: False, using: 0 TPU cores
IPU available: False, using: 0 IPUs
HPU available: False, using: 0 HPUs


Output()

In [24]:
# Saves the whole module object (not only its state_dict); loading it back
# needs the model class definitions from this notebook to be available.
th.save(obj=model, f="./NODE2_98.pt")