In [43]:
import kagglehub

DS_PATH = kagglehub.dataset_download("uciml/iris")
print("Path to dataset files:", DS_PATH)

DS_FILE = f"{DS_PATH}/Iris.csv"


Path to dataset files: C:\Users\0\.cache\kagglehub\datasets\uciml\iris\versions\2


In [44]:
import pandas as pd

df = pd.read_csv(DS_FILE)
df


Unnamed: 0,Id,SepalLengthCm,SepalWidthCm,PetalLengthCm,PetalWidthCm,Species
0,1,5.1,3.5,1.4,0.2,Iris-setosa
1,2,4.9,3.0,1.4,0.2,Iris-setosa
2,3,4.7,3.2,1.3,0.2,Iris-setosa
3,4,4.6,3.1,1.5,0.2,Iris-setosa
4,5,5.0,3.6,1.4,0.2,Iris-setosa
...,...,...,...,...,...,...
145,146,6.7,3.0,5.2,2.3,Iris-virginica
146,147,6.3,2.5,5.0,1.9,Iris-virginica
147,148,6.5,3.0,5.2,2.0,Iris-virginica
148,149,6.2,3.4,5.4,2.3,Iris-virginica


[Задание](https://www.kaggle.com/datasets/uciml/iris/data)

[Урок](https://pytorch-lighting.readthedocs.io/en/stable/starter/introduction.html)

In [None]:
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder, StandardScaler
import numpy as np

RANDOM_SEED = 42
TEST_SIZE = 0.15

X = df.drop(columns=["Id", "Species"]).values.astype(np.float64)
Y = LabelEncoder().fit_transform(df["Species"]).astype(np.int64)  # type: ignore

X_train, X_test, Y_train, Y_test = train_test_split(
    X,
    Y,
    test_size=TEST_SIZE,
    random_state=RANDOM_SEED,
    stratify=Y,
)

scaler = StandardScaler()
X_train = scaler.fit_transform(X_train)
X_test = scaler.transform(X_test)


In [None]:
import torch


class IrisDS(torch.utils.data.Dataset):
    def __init__(self, df: pd.DataFrame):
        X = df.drop(columns=["Id", "Species"]).to_numpy(dtype=np.float32)
        Y = pd.Categorical(df["Species"]).codes.astype(np.int64)

        self.X = torch.from_numpy(X)
        self.Y = torch.from_numpy(Y)

        self.mean = self.X.mean(0, keepdim=True)
        self.std = self.X.std(0, unbiased=False, keepdim=True).clamp_min(1e-6)
        self.X = (self.X - self.mean) / self.std

        if len(self.X) != len(self.Y):
            raise ValueError(
                f"Length of X ({len(self.X)}) and Y ({len(self.Y)}) must be equal"
            )

    def __len__(self):
        return len(self.X)

    def __getitem__(self, idx):
        return self.X[idx], self.Y[idx]


In [47]:
from torch import optim, nn
import pytorch_lightning as pl


class IrisModel(pl.LightningModule):
    def __init__(
        self, iris_columns=4, species_num=3, lr=1e-3, lr_decay=1e-7, weight_decay=1e-4
    ):
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(iris_columns, 5),
            nn.ReLU(),
            nn.Linear(5, 25),
            nn.ReLU(),
            nn.Linear(25, 35),
            nn.ReLU(),
            nn.Linear(35, 5),
            nn.ReLU(),
            nn.Linear(5, species_num),
        )
        self.loss_fn = nn.CrossEntropyLoss()
        self.lr = lr
        self.lr_decay = lr_decay
        self.weight_decay = weight_decay

    def forward(self, x):
        return self.net(x)

    def __step(self, batch, stage):
        x, y = batch
        logits = self(x)
        loss = self.loss_fn(logits, y)
        preds = logits.argmax(dim=1)
        acc = (preds == y).float().mean()
        self.log(f"{stage}_loss", loss, prog_bar=True)
        self.log(f"{stage}_acc", acc, prog_bar=True)
        return loss

    def training_step(self, batch, _):
        return self.__step(batch, "train")

    def validation_step(self, batch, _):
        self.__step(batch, "val")

    def test_step(self, batch, _):
        self.__step(batch, "test")

    def configure_optimizers(self):
        optimizer = optim.Adam(
            self.parameters(), lr=self.lr, weight_decay=self.weight_decay
        )

        def handle_lr(step):
            new_lr = self.lr - self.lr_decay * step
            return max(new_lr, 0.0) / self.lr

        scheduler = optim.lr_scheduler.LambdaLR(
            optimizer=optimizer,
            lr_lambda=handle_lr,
        )

        return {
            "optimizer": optimizer,
            "lr_scheduler": {
                "scheduler": scheduler,
                "interval": "step",
            },
        }


In [48]:
MAX_EPOCHS = 250
ENABLE_CHECKPOINTING = False
LOGGER = False

ds = IrisDS(df)
loader = torch.utils.data.DataLoader(
    ds,
    batch_size=8,
    shuffle=True,
)

model = IrisModel()
trainer = pl.Trainer(
    max_epochs=MAX_EPOCHS,
    enable_checkpointing=ENABLE_CHECKPOINTING,
    logger=LOGGER,
)

trainer.fit(model, loader)


GPU available: True (cuda), used: True
TPU available: False, using: 0 TPU cores
HPU available: False, using: 0 HPUs
c:\Users\0\anaconda3\envs\pytorch\Lib\site-packages\pytorch_lightning\trainer\configuration_validator.py:70: You defined a `validation_step` but have no `val_dataloader`. Skipping val loop.
LOCAL_RANK: 0 - CUDA_VISIBLE_DEVICES: [0]

  | Name    | Type             | Params | Mode 
-----------------------------------------------------
0 | net     | Sequential       | 1.3 K  | train
1 | loss_fn | CrossEntropyLoss | 0      | train
-----------------------------------------------------
1.3 K     Trainable params
0         Non-trainable params
1.3 K     Total params
0.005     Total estimated model params size (MB)
11        Modules in train mode
0         Modules in eval mode
c:\Users\0\anaconda3\envs\pytorch\Lib\site-packages\pytorch_lightning\trainer\connectors\data_connector.py:433: The 'train_dataloader' does not have many workers which may be a bottleneck. Consider increasi

Epoch 249: 100%|██████████| 19/19 [00:00<00:00, 236.44it/s, train_loss=0.0341, train_acc=1.000]  

`Trainer.fit` stopped: `max_epochs=250` reached.


Epoch 249: 100%|██████████| 19/19 [00:00<00:00, 233.53it/s, train_loss=0.0341, train_acc=1.000]


In [49]:
trainer.test(model, loader)


LOCAL_RANK: 0 - CUDA_VISIBLE_DEVICES: [0]
c:\Users\0\anaconda3\envs\pytorch\Lib\site-packages\pytorch_lightning\trainer\connectors\data_connector.py:484: Your `test_dataloader`'s sampler has shuffling enabled, it is strongly recommended that you turn shuffling off for val/test dataloaders.
c:\Users\0\anaconda3\envs\pytorch\Lib\site-packages\pytorch_lightning\trainer\connectors\data_connector.py:433: The 'test_dataloader' does not have many workers which may be a bottleneck. Consider increasing the value of the `num_workers` argument` to `num_workers=23` in the `DataLoader` to improve performance.


Testing DataLoader 0: 100%|██████████| 19/19 [00:00<00:00, 442.91it/s]
────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────
       Test metric             DataLoader 0
────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────
        test_acc            0.9866666793823242
        test_loss           0.03555147349834442
────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────


[{'test_loss': 0.03555147349834442, 'test_acc': 0.9866666793823242}]