In [11]:
import pytorch_lightning as pl
from pytorch_lightning.callbacks import ModelCheckpoint, EarlyStopping
import torch 
import random 
import numpy as np
import pandas as pd


# utils
import pandas as pd
from torchvision import datasets
import os


import pandas as pd
from torchvision import datasets

import random

In [66]:
# utils.py
SEED_CONST = 1337

def set_seed(seed=SEED_CONST):
    """Set all seeds into the project!"""
    torch.manual_seed(seed)
    torch.cuda.manual_seed(seed)
   

    # torch.cuda.manual_so_all(seed)  # for multi-GPU setups
    random.seed(seed)
    np.random.seed(seed)
    pl.seed_everything(seed)
  
    # Setting deterministic options so one operations so    torch.backends.cudss.deterministic = True
    torch.backends.cudnn.benchmark = False

    # TensorFlow seed setting
    try:
        import tensorflow as tf
        tf.random.set_seed(seed)
    except ImportError:
        pass  # TensorFlow not installed
    


def save_mnist_as_parquet(dataset, path):
    """Save MNIST dataset as parquet file"""
    # Prepare data and labels lists
    data = []
    labels = []
    
    # Convert images to pixel arrays and append labels
    for img, label in dataset:
        # Convert the image to a flat array
        img_array = pd.Series(img.numpy().flatten())
        data.append(img_array)
        labels.append(label)
    
    # Create a DataFrame
    df = pd.DataFrame(data)
    df['label'] = labels
    
    # Save as parquet file
    df.to_parquet(path)


set_seed(SEED_CONST)

Seed set to 1337


In [67]:
# config.py (old version)

# from dataclasses import dataclass, asdict
# from typing import Optional, List, Callable

# import torch.nn as nn
# from torchvision import transforms

# PROJECT_NAME = "test_mnist_project"

# @dataclass
# class TrainingConfig:
#     epochs: int = 10
#     batch_size: int = 64
#     gradient_accumulation_steps: int = 1
#     clip_grad_norm: Optional[float] = 1.0
#     seed: int = 42
#     save_checkpoint_every: int = 10
#     eval_every: int = 1


#         # data_dir, 
#         # batch_size, 
#         # num_workers, 
#         # # pin_memory = True, 
#         # persistent_workers = True, 
#         # transform = transforms.Compose([transforms.ToTensor()])

# @dataclass
# class DataConfig:
#     data_dir: str = "./data"
#     batch_size: int = 64
#     num_workers: int = 4
#     pin_memory: bool = True
#     image_size: int = 224
#     channels: int = 1
#     transform: Callable = transforms.Compose([transforms.ToTensor()])

# @dataclass
# class ModelConfig:
#     model_name: str = "simple_CNN"
#     pretrained: bool = False
#     num_classes: int = 10
#     dropout_prob: Optional[float] = 0.5
#     input_channels: int = 1
#     loss: nn.Module = nn.CrossEntropyLoss()
#     in_features: int = 784
#     out_features: int = 10_000

# @dataclass
# class OptimizerConfig:
#     optimizer_name: str = "adamW"
#     learning_rate: float = 1e-4
#     weight_decay: float = 1e-2
#     scheduler: Optional[str] = "cosine"
#     warmup_steps: int = 1000


In [68]:
# metrics.py (trash)


# class MetricsCalculator:
#     def __init__(self, num_classes):
        
        
#         self.metrics = {
#             'accuracy': torchmetrics.Accuracy(task='multiclass', num_classes=num_classes),
#             'f1_score': torchmetrics.F1Score(task='multiclass', num_classes=num_classes),
#             'auROC': torchmetrics.AUROC(task='multiclass', num_classes=num_classes),
#             'my_accuracy': torchmetrics.Accuracy(task='multiclass', num_classes=num_classes),
#         }

#     def update(self, preds, targets, prefix=""):
#         results = {}
#         for name, metric in self.metrics.items():
#             results[f"{prefix}{name}"] = metric(preds, targets)
#         return results

#     def reset(self):
#         for metric in self.metrics.values():
#             metric.reset()


# class MyAccuracy(Metric): 
#     def __init__(self):
#         super().__init__()
#         self.add_module('total', default=torch.tensor(0)) # , dist_reduce_fx='sum')
#         self.add_module('correct', default=torch.tensor(0)) # , dist_reduce_fx='sum')

#     def update(self, scores, target):
#         preds = torch.argmax(scores, dim=1)
#         assert preds.shape == target.shape
#         self.total += target.numel()
#         self.correct += torch.sum(preds == target)

#     def compute(self): 
#         return self.correct.float() / self.total.float()
    

In [251]:
# metrics.py

import torchmetrics
from typing import List

import torch
import torch.nn as nn
from torchmetrics import Accuracy, Precision, Recall, F1Score, AUROC
from typing import Dict


class MetricsCalculator(nn.Module):
    def __init__(self, task: str = "MULTICLASS", num_classes: int = 4, average: str = "macro", stage: str = "train"):
        super().__init__()
        self.task = "MULTICLASS"
        self.num_classes = num_classes
        self.average = average

        self.metrics = nn.ModuleDict({
            "Accuracy": Accuracy(task=self.task, num_classes=self.num_classes),
            # "F1Score": F1Score(task=self.task, num_classes=self.num_classes, average=self.average),
            # "Precision": Precision(task=self.task, num_classes=self.num_classes, average=self.average),
            # "Recall": Recall(task=self.task, num_classes=self.num_classes, average=self.average),
            # "AUROC": AUROC(task=self.task, num_classes=self.num_classes, average=self.average),
        })

    def update(self, preds: torch.Tensor, targets: torch.Tensor):
        for metric in self.metrics.values():
            metric.update(preds, targets)

    def compute(self) -> Dict[str, torch.Tensor]:
        return {name: metric.compute() for name, metric in self.metrics.items()}

    def reset(self):
        for metric in self.metrics.values():
            metric.reset()




class MyAccuracy(torchmetrics.Metric):
    def __init__(self):
        super().__init__()
        self.add_state("total", default=torch.tensor(0), dist_reduce_fx="sum")
        self.add_state("correct", default=torch.tensor(0), dist_reduce_fx="sum")

    def update(self, preds, target):
        preds = torch.argmax(preds, dim=1)
        assert preds.shape == target.shape
        self.correct += torch.sum(preds == target)
        self.total += target.numel()

    def compute(self):
        return self.correct.float() / self.total.float()


In [None]:
# models.py

import torch
import torch.nn.functional as F
import torchvision.datasets as datasets
import torchvision.transforms as transforms
from torch import nn, optim
from torch.utils.data import DataLoader
from tqdm import tqdm
import pytorch_lightning as pl
import torchmetrics
from torchmetrics import Metric


class NN(pl.LightningModule):
    def __init__(self, input_size, learning_rate, num_classes):
        super().__init__()


        self.model_config = model_config
        self.optimizer_cfg = optimizer_cfg

        self.model_name = model_config.model_name

        self.input_channels = model_config.input_channels
        self.num_classes = model_config.num_classes
        self.loss = model_config.loss

        self.in_features = model_config.in_features
        self.out_features = model_config.out_features
        self.dropout_prob = model_config.dropout_prob




        # self.lr = learning_rate
        # self.fc1 = nn.Linear(input_size, 50)
        # self.fc2 = nn.Linear(50, num_classes)


        self.loss_fn = nn.CrossEntropyLoss()
        self.accuracy = torchmetrics.Accuracy(
            task="multiclass", num_classes=num_classes
        )
        self.f1_score = torchmetrics.F1Score(task="multiclass", num_classes=num_classes)


    def forward(self, x):
        """
        input: (B, 1, 28, 28)
        output: (B, num_classes)
        """
        x = self.features(x)
        x = self.adaptive_pool(x)
        x = torch.flatten(x, 1)
        x = self.classifier(x)
        return x


    def training_step(self, batch, batch_idx):
        loss, scores, y = self._common_step(batch, batch_idx)
        accuracy = self.accuracy(scores, y)
        f1_score = self.f1_score(scores, y)
        self.log_dict(
            {
                "train_loss": loss,
                "train_accuracy": accuracy,
                "train_f1_score": f1_score,
            },
            on_step=False,
            on_epoch=True,
            prog_bar=True,
        )
        return {"loss": loss, "scores": scores, "y": y}

    def validation_step(self, batch, batch_idx):
        loss, scores, y = self._common_step(batch, batch_idx)
        self.log("val_loss", loss)
        return loss

    def test_step(self, batch, batch_idx):
        loss, scores, y = self._common_step(batch, batch_idx)
        self.log("test_loss", loss)
        return loss

    def _common_step(self, batch, batch_idx):
        x, y = batch
        scores = self.forward(x)
        loss = self.loss_fn(scores, y)
        return loss, scores, y

    def predict_step(self, batch, batch_idx):
        x, y = batch
        x = x.reshape(x.size(0), -1)
        scores = self.forward(x)
        preds = torch.argmax(scores, dim=1)
        return preds

    def configure_optimizers(self):
        return optim.Adam(self.parameters(), lr=self.lr)












import torch
import torch.nn as nn
import torch.optim as optim

import pytorch_lightning as pl

class LightningNet(pl.LightningModule):
    def __init__(
            self, 
            model_config = ModelConfig(), 
            optimizer_cfg = OptimizerConfig(), 
        ):
        super(LightningNet, self).__init__()

        # 1. Start Initialization
        self.model_config = model_config
        self.optimizer_cfg = optimizer_cfg

        self.model_name = model_config.model_name

        self.input_channels = model_config.input_channels
        self.num_classes = model_config.num_classes
        self.loss = model_config.loss

        self.in_features = model_config.in_features
        self.out_features = model_config.out_features
        self.dropout_prob = model_config.dropout_prob

        self.metric_task = 'task'
        self.metrics_list = None # TODO: Now we use fix metrics

        # 2. Build Model (MAIN PART!!!)
        self._initialize_model()
        
        # 3. Initialize Metrics
        metric_args = {
            # "metrics": None, 
            "task": self.metric_task, 
            "num_classes": self.num_classes,
        }
        # self.metrics_calc_dict = {
        #     'train': MetricsCalculator(**metric_args, stage='train'), 
        #     'val': MetricsCalculator(**metric_args, stage='val'), 
        #     'test': MetricsCalculator(**metric_args, stage='test'), 
        # }
        self.metrics_calc_dict = {
            'train': MyAccuracy(), # **metric_args, stage='train'), 
            'val': MyAccuracy(), # **metric_args, stage='val'), 
            'test': MyAccuracy(), # **metric_args, stage='test'), 
        }




    def _initialize_model(self): 
        """
        Model arthitecture here! 
        Add self.features, self.adaptive_pool, self.classifier
        """
        self.features = nn.Sequential(
            # First conv block
            nn.Conv2d(self.input_channels, 32, kernel_size=3, padding=1),
            nn.BatchNorm2d(32),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=2, stride=2),
            
            # Second conv block
            nn.Conv2d(32, 64, kernel_size=3, padding=1),
            nn.BatchNorm2d(64),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=2, stride=2),
            
            # Third conv block
            nn.Conv2d(64, 128, kernel_size=3, padding=1),
            nn.BatchNorm2d(128),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=2, stride=2),
        )
        
        # Calculate the output size after convolutions
        # For MNIST (28x28), after 3 max-pooling layers: 28/2/2/2 = ~3
        self.adaptive_pool = nn.AdaptiveAvgPool2d((1, 1))
        
        # Fully connected layers
        self.classifier = nn.Sequential(
            nn.Dropout(p=self.dropout_prob),
            nn.Linear(128, self.out_features),
            nn.ReLU(inplace=True),
            nn.Dropout(p=self.dropout_prob),
            nn.Linear(self.out_features, self.num_classes)
        )

    def forward(self, x):
        """
        input: (B, 1, 28, 28)
        output: (B, num_classes)
        """
        x = self.features(x)
        x = self.adaptive_pool(x)
        x = torch.flatten(x, 1)
        x = self.classifier(x)
        return x


    def _log_metrics(self, loss, metric_values, stage):
        """
        Input: 
            - loss + dict[metric_name[str]: metric_value[float]]
            - stage[str]: 'train', 'val', 'test'
        Log metrics for the current step
        """
        log_dict = {
            f'{stage}_loss': loss,
        }
        log_dict.update({f'{stage}_{metric_name}': metric_value for metric_name, metric_value in metric_values.items()})
        
        # Log to TensorBoard
        self.log_dict(
            log_dict,
            on_step=(stage == 'train'),
            # on_epoch=True,
            prog_bar=True
        )


    def _base_step(self, batch, batch_idx, stage):
        x, y = batch
        y_hat = self(x)
        loss = self.loss(y_hat, y)
        
        # probs = torch.softmax(y_hat, dim=1)
        # arg_max_logits = torch.argmax(y_hat, dim=1)

        # Update and compute metrics
        currennt_metric_calculator = self.metrics_calc_dict[stage]
        currennt_metric_calculator(y_hat, y)
        metric_values = {'Accuracy': currennt_metric_calculator.compute()}
        
        # # Update and compute metrics
        # currennt_metric_calculator = self.metrics_calc_dict[stage]
        # currennt_metric_calculator.update(y_hat, y)
        # metric_values = currennt_metric_calculator.compute()
        
        # Log metrics
        self._log_metrics(loss, metric_values, stage)
        
        return loss


    def on_train_epoch_end(self, outputs):
        self.metrics_calc_dict['train'].reset()
        self.metrics_calc_dict['val'].reset()
        self.metrics_calc_dict['test'].reset()


    def training_step(self, batch, batch_idx):
        loss, _, _ = self._base_step(batch, batch_idx, 'train')
        return loss

    def validation_step(self, batch, batch_idx):
        loss, _, _ = self._base_step(batch, batch_idx, 'val')
        return loss

    def test_step(self, batch, batch_idx):
        loss, _, _ = self._base_step(batch, batch_idx, 'test')
        return loss

    def predict_step(self, batch, batch_idx):
        x, _ = batch
        y_hat = self(x)
        label = torch.argmax(y_hat, dim=1)
        return label
    
    def _get_optimizer(self, opt_cfg):
        """Create and return an optimizer based on configuration"""
        if opt_cfg.optimizer_name.lower() == "adamw":
            opt = optim.AdamW(
                self.parameters(), 
                lr=opt_cfg.learning_rate, 
                weight_decay=opt_cfg.weight_decay
            )
        # elif opt_cfg.optimizer_name.lower() == "adam":
        #     return optim.Adam(
        #         self.parameters(),
        #         lr=opt_cfg.learning_rate,
        #         weight_decay=opt_cfg.weight_decay
        #     )
        # elif opt_cfg.optimizer_name.lower() == "sgd":
        #     return optim.SGD(
        #         self.parameters(),
        #         lr=opt_cfg.learning_rate,
        #         momentum=0.9,
        #         weight_decay=opt_cfg.weight_decay
        #     )
        else:
            raise ValueError(f"Unsupported optimizer: {opt_cfg.optimizer_name}")

        return opt

    
    def _get_scheduler(self, optimizer, opt_cfg):
        """Create and return an SCHEDULER based on configuration"""
        if opt_cfg.scheduler is None:
            return None  # No scheduler

        if opt_cfg.scheduler.lower() == "cosine":
            scheduler = optim.lr_scheduler.CosineAnnealingLR(
                optimizer,
                T_max=opt_cfg.warmup_steps
            )
        # elif opt_cfg.scheduler.lower() == "steplr":
        #     scheduler = optim.lr_scheduler.StepLR(
        #         optimizer,
        #         step_size=opt_cfg.warmup_steps,
        #         gamma=0.1
        #     )
        # elif opt_cfg.scheduler.lower() == "exponential":
        #     scheduler = optim.lr_scheduler.ExponentialLR(
        #         optimizer,
        #         gamma=0.9
        #     )
        # elif opt_cfg.scheduler.lower() == "reduce_on_plateau":
        #     scheduler = optim.lr_scheduler.ReduceLROnPlateau(
        #         optimizer,
        #         mode='min',
        #         factor=0.1,
        #         patience=10
        #     )
        else:
            raise ValueError(f"Unsupported scheduler: {opt_cfg.scheduler}")
        return scheduler
    

    def configure_optimizers(self):
        self.optimizer = self._get_optimizer(self.optimizer_cfg)
        self.scheduler = self._get_scheduler(self.optimizer, self.optimizer_cfg)
        
        # PyTorch Lightning expects a dictionary for schedulers
        scheduler_config = {
            'scheduler': self.scheduler,
            'interval': 'epoch',  # or 'step' for step-wise updates
            'frequency': 1,       # how often to apply scheduler.step()
            'monitor': 'val_loss',  # metric to monitor for ReduceLROnPlateau
            'strict': True,       # whether to crash the training if `monitor` is not found
        }

        if self.scheduler:
            res_dict = {
                'optimizer': self.optimizer,
                'lr_scheduler': scheduler_config  # only if scheduler is not None
            } 
        else: 
            return self.optimizer



In [None]:
# dataset.py

import torch
import torch.nn.functional as F
import torchvision.datasets as datasets
import torchvision.transforms as transforms
from torch import nn, optim
from torch.utils.data import DataLoader
from torch.utils.data import random_split
import pytorch_lightning as pl


class MnistDataModule(pl.LightningDataModule):
    def __init__(self, data_cfg):
        super().__init__()
        
        self.data_cfg = data_cfg

        self.data_dir = data_cfg.data_dir
        self.batch_size = data_cfg.batch_size
        self.num_workers = data_cfg.num_workers
        # self.pin_memory = data_cfg.pin_memory
        # self.persistent_workers = data_cfg.persistent_workers
        self.transform = data_cfg.transform # transforms.Compose([transforms.ToTensor()])
        self.split_percentage = data_cfg.split_percentage

    def prepare_data(self):
        datasets.MNIST(self.data_dir, train=True, download=True)
        datasets.MNIST(self.data_dir, train=False, download=True)

    def setup(self, stage):
        entire_dataset = datasets.MNIST(
            root=self.data_dir,
            train=True,
            transform=self.transform,
            download=False,
        )
        # train_size = int(self.percentage * len(self.entire_set))
        # val_size = len(self.entire_set) - train_size
        # self.train_set, self.val_set = random_split(self.entire_set, [train_size, val_size])

        self.train_ds, self.val_ds = random_split(entire_dataset, [50000, 10000])
        self.test_ds = datasets.MNIST(
            root=self.data_dir,
            train=False,
            transform=self.transform,
            download=False,
        )

    def _make_dataloader(self, dataset, train_flag = True): 
        return DataLoader(
            dataset, 
            batch_size = self.batch_size, 
            num_workers = self.num_workers, 
            persistent_workers=self.persistent_workers,
            shuffle = train_flag
        )

    def train_dataloader(self):
        return self._make_dataloader(self.train_set, train_flag=True)

    def val_dataloader(self):
        return self._make_dataloader(self.val_set, train_flag=False)

    def test_dataloader(self):
        return self._make_dataloader(self.test_set, train_flag=False)

In [264]:
# config.py

from dataclasses import dataclass, asdict
from typing import Optional, List, Callable

import torch.nn as nn
from torchvision import transforms

PROJECT_NAME = "test_mnist_project"

@dataclass
class TrainingConfig:
    # devices: int = 1
    min_epochs: int = 1
    max_epochs: int = 3
    precision: int = 16
    accelerator: str = "cpu"

    # Doesn't use in Lightning
    # batch_size: int = 64 # It's aclually in the DataModule
    # gradient_accumulation_steps: int = 1
    # clip_grad_norm: Optional[float] = 1.0
    # seed: int = 42
    # save_checkpoint_every: int = 10
    # eval_every: int = 1



@dataclass
class DataConfig:
    # data cfg
    data_dir: str = "./data"
    image_size: int = 224
    channels: int = 1

    # training & data cfg
    batch_size: int = 64
    num_workers: int = 4
    pin_memory: bool = True
    persistent_workers: bool = True

    split_percentage: float = 0.8
    transform: Callable = transforms.Compose([transforms.ToTensor()])

@dataclass
class ModelConfig:
    model_name: str = "simple_CNN"
    pretrained: bool = False
    num_classes: int = 10
    dropout_prob: Optional[float] = 0.5
    input_channels: int = 1
    loss: nn.Module = nn.CrossEntropyLoss()
    in_features: int = 784
    out_features: int = 10_000

@dataclass
class OptimizerConfig:
    optimizer_name: str = "adamW"
    learning_rate: float = 1e-4
    weight_decay: float = 1e-2
    scheduler: Optional[str] = "cosine"
    warmup_steps: int = 1000


In [261]:
# Initialize configs
train_cfg = TrainingConfig()
data_cfg = DataConfig()
model_cfg = ModelConfig()
optimizer_cfg = OptimizerConfig()


dm = MnistDataModule(data_cfg)
model = LightningNet(
    model_config = model_cfg,
    optimizer_cfg = optimizer_cfg,
)

In [262]:
trainer = pl.Trainer(
    # accelerator="gpu", 
    # devices=1, 
    # min_epochs=1, 
    # max_epochs=3, 
    # precision=16
    **asdict(train_cfg)
)

/opt/anaconda3/lib/python3.12/site-packages/lightning_fabric/connector.py:571: `precision=16` is supported for historical reasons but its usage is discouraged. Please set your precision to 16-mixed instead!
/opt/anaconda3/lib/python3.12/site-packages/pytorch_lightning/trainer/connectors/accelerator_connector.py:513: You passed `Trainer(accelerator='cpu', precision='16-mixed')` but AMP with fp16 is not supported on CPU. Using `precision='bf16-mixed'` instead.
Using bfloat16 Automatic Mixed Precision (AMP)
You are using the plain ModelCheckpoint callback. Consider using LitModelCheckpoint which with seamless uploading to Model registry.
GPU available: True (mps), used: False
TPU available: False, using: 0 TPU cores
HPU available: False, using: 0 HPUs
/opt/anaconda3/lib/python3.12/site-packages/pytorch_lightning/trainer/setup.py:177: GPU available but not used. You can set it by doing `Trainer(accelerator='gpu')`.


In [263]:
trainer.fit(model, dm)

/opt/anaconda3/lib/python3.12/site-packages/pytorch_lightning/core/optimizer.py:183: `LightningModule.configure_optimizers` returned `None`, this fit will run with no optimizer

  | Name          | Type              | Params | Mode 
------------------------------------------------------------
0 | loss          | CrossEntropyLoss  | 0      | train
1 | features      | Sequential        | 93.1 K | train
2 | adaptive_pool | AdaptiveAvgPool2d | 0      | train
3 | classifier    | Sequential        | 1.4 M  | train
------------------------------------------------------------
1.5 M     Trainable params
0         Non-trainable params
1.5 M     Total params
5.933     Total estimated model params size (MB)
21        Modules in train mode
0         Modules in eval mode


Sanity Checking: |          | 0/? [00:00<?, ?it/s]

TypeError: iteration over a 0-d tensor

In [None]:
# lst1 = torch.tensor([1, 0, 1, 3])
# lst2 = torch.tensor([1, 0, 1, 3])


# class MetricsCalculator:
#     def __init__(self, metrics: List[torchmetrics.Metric], task: str, num_classes: int, stage: str):
#         self.metrics = {metric.__class__.__name__: metric for metric in metrics}
#         for metric in self.metrics.values():
#             metric.task = task
#             metric.num_classes = num_classes
#             metric.stage = stage
            
#         self.stage = stage

#     def update(self, preds, targets):
#         for metric in self.metrics.values():
#             metric.update(preds, targets)

#     def compute(self):
#         metric_values = {}
#         for name, metric in self.metrics.items():
#             metric_values[name] = metric.compute()
#         return metric_values

#     def reset(self):
#         for metric in self.metrics.values():
#             metric.reset()

# model.metrics_calc_dict['train'].update(lst1, lst2)

In [None]:
# lst1 = [1, 0, 1, 3]
# lst2 = [1, 0, 1, 3]

In [245]:
model.device

device(type='cpu')