In [8]:
import os
import lightning as L
from torch.utils.data import DataLoader
from torchvision import datasets, transforms
import albumentations as A
import cv2
from torch.utils.data import DataLoader, Dataset
import numpy as np
from albumentations.pytorch import ToTensorV2

class AlbumentationsDataset(Dataset):
    def __init__(self, dataset, transform=None):
        self.dataset = dataset
        self.transform = transform

    def __len__(self):
        return len(self.dataset)

    def __getitem__(self, idx):
        image, label = self.dataset[idx]
        image = np.array(image)

        if self.transform:
            augmented = self.transform(image=image)
            image = augmented['image']
        return image, label
        

class ImageDataModule(L.LightningDataModule):
    def __init__(self, data_dir: str, batch_size: int = 4):
        super().__init__()
        self.data_dir = data_dir
        self.batch_size = batch_size
        self.train_transform = A.Compose([
            A.CLAHE(clip_limit=2.0, tile_grid_size=(8, 8), p=0.8),
            A.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2, hue=0.1, p=0.5),
            A.GaussianBlur(blur_limit=(3, 5), p=0.5),
            A.Sharpen(p=0.5),
            A.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
            A.Resize(512, 512),
            ToTensorV2(),
        ])
        self.val_transform = A.Compose([
            A.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
            A.Resize(512, 512),
            ToTensorV2(),
        ])


    def prepare_data(self):
        # Setup data here (called only once and on 1 GPU)
        pass

    def setup(self, stage=None):
        # Load datasets
        train_dataset = datasets.ImageFolder(os.path.join(self.data_dir, 'train'))
        val_dataset = datasets.ImageFolder(os.path.join(self.data_dir, 'val'))
        self.train_dataset = AlbumentationsDataset(train_dataset, self.train_transform)
        self.val_dataset = AlbumentationsDataset(val_dataset, self.val_transform)

    def train_dataloader(self):
        return DataLoader(self.train_dataset,num_workers=15, batch_size=self.batch_size, shuffle=True)

    def val_dataloader(self):
        return DataLoader(self.val_dataset,num_workers=15, batch_size=self.batch_size)


In [21]:
import timm
import torch
import lightning as L
from torch import nn
from torch.optim.lr_scheduler import StepLR
import torchmetrics

class ImageClassifier(L.LightningModule):
    def __init__(self, num_classes: int, learning_rate: float = 0.001):
        super().__init__()
        self.save_hyperparameters()
        self.model = timm.create_model('efficientnet_b5', pretrained=True, num_classes=num_classes)
        self.loss_fn = nn.CrossEntropyLoss()
        self.f1_score = torchmetrics.F1Score(num_classes=num_classes, average='weighted')
        self.acc = torchmetrics.Accuracy(num_classes=num_classes)


    def forward(self, x):
        return self.model(x)

    def training_step(self, batch, batch_idx):
        x, y = batch
        logits = self(x)
        loss = self.loss_fn(logits, y)
        self.log('train_loss', loss, on_step=True, on_epoch=True, prog_bar=True)
        self.log('train_f1', self.f1_score(logits, y), on_step=True, on_epoch=True, prog_bar=True)
        self.log('train_acc', self.acc(logits, y), on_step=True, on_epoch=True, prog_bar=True)

        return loss

    def validation_step(self, batch, batch_idx):
        x, y = batch
        logits = self(x)
        loss = self.loss_fn(logits, y)
        self.log('val_loss', loss, on_epoch=True, prog_bar=True)
        self.log('val_f1', self.f1_score(logits, y), on_epoch=True, prog_bar=True)
        self.log('val_acc', self.acc(logits, y), on_step=True, on_epoch=True, prog_bar=True)

        return loss

    def configure_optimizers(self):
        optimizer = torch.optim.Adam(self.parameters(), lr=self.hparams.learning_rate)
        scheduler = StepLR(optimizer, step_size=7, gamma=0.1)
        return [optimizer], [scheduler]


In [6]:
import os
import shutil
from PIL import Image, UnidentifiedImageError


def verify_images(root_folder, invalid_folder):
    if not os.path.exists(invalid_folder):
        os.makedirs(invalid_folder)

    for subdir, _, files in os.walk(root_folder):
        for file in files:
            file_path = os.path.join(subdir, file)
            try:
                # First, verify the image file integrity
                with Image.open(file_path) as img:
                    img.verify()  # Verify the image integrity
                
                # Reopen the image to check if it is truncated
                with Image.open(file_path) as img:
                    img.load()  # Ensure the image can be loaded completely
                
                print(f"Verified: {file_path}")
            except (IOError, OSError, UnidentifiedImageError) as e:
                print(f"Error verifying {file_path}: {e}")
                move_invalid_image(file_path, invalid_folder)

def move_invalid_image(file_path, invalid_folder):
    try:
        shutil.move(file_path, invalid_folder)
        print(f"Moved to invalid folder: {file_path}")
    except Exception as e:
        print(f"Error moving {file_path}: {e}")

if __name__ == "__main__":
    root_folder = "path/to/your/folder"
    invalid_folder = "path/to/invalid/folder"
    verify_images(root_folder, invalid_folder)

def move_invalid_image(file_path, invalid_folder):
    try:
        shutil.move(file_path, invalid_folder)
        print(f"Moved to invalid folder: {file_path}")
    except Exception as e:
        print(f"Error moving {file_path}: {e}")


root_folder = "./new_data/"
invalid_folder = "../fail_folder"
#verify_images(root_folder, invalid_folder)


In [None]:
import matplotlib.pyplot as plt
import torch

def imshow(img, title=None):
    """Imshow for Tensor."""
    img = img.numpy().transpose((1, 2, 0))
    mean = np.array([0.485, 0.456, 0.406])
    std = np.array([0.чтобы229, 0.224, 0.225])
    img = std * img + mean  # unnormalize
    img = np.clip(img, 0, 1)  # clip to interval [0,1]
    plt.imshow(img)
    if title is not None:
        plt.title(title)
    plt.pause(0.001)  # pause a bit so that plots are updated

def visualize_augmented_images(dataloader, num_images=4):
    """Function to visualize augmented images from a DataLoader"""
    dataiter = iter(dataloader)
    images, labels = next(dataiter)

    # Plot the images
    fig = plt.figure(figsize=(25, 4))
    for idx in np.arange(num_images):
        ax = fig.add_subplot(2, num_images//2, idx+1, xticks=[], yticks=[])
        imshow(images[idx], title=f'Label: {labels[idx]}')

# Assuming `train_dataloader` is your DataLoader
dm = ImageDataModule('new_data/')
dm.setup()
train_loader = dm.train_dataloader()
visualize_augmented_images(train_loader)


In [7]:
import comet_ml
import os
from lightning.pytorch.loggers import CometLogger
torch.set_float32_matmul_precision('medium')
# arguments made to CometLogger are passed on to the comet_ml.Experiment class

CometLogger will be initialized in online mode


In [18]:
#dm = ImageDataModule('new_data/')
model = ImageClassifier(num_classes=3)
trainer = L.Trainer(max_epochs=10, accelerator='gpu',logger = comet_logger, devices=1, precision='16-mixed')  # Set devices=0 if no GPU is available
#trainer.fit(model, datamodule=dm)

Using 16bit Automatic Mixed Precision (AMP)
GPU available: True (cuda), used: True
TPU available: False, using: 0 TPU cores
IPU available: False, using: 0 IPUs
HPU available: False, using: 0 HPUs


In [None]:
ImageDataModule.val_transform

In [22]:
model = ImageClassifier.load_from_checkpoint("model.ckpt")

In [28]:
transform = transforms.Compose([
    transforms.Resize((512, 512)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])

In [29]:
# Assuming `model` is your trained model
model.eval()
model.to('cuda' if torch.cuda.is_available() else 'cpu')

ImageClassifier(
  (model): EfficientNet(
    (conv_stem): Conv2d(3, 48, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1), bias=False)
    (bn1): BatchNormAct2d(
      48, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True
      (drop): Identity()
      (act): SiLU(inplace=True)
    )
    (blocks): Sequential(
      (0): Sequential(
        (0): DepthwiseSeparableConv(
          (conv_dw): Conv2d(48, 48, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), groups=48, bias=False)
          (bn1): BatchNormAct2d(
            48, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True
            (drop): Identity()
            (act): SiLU(inplace=True)
          )
          (se): SqueezeExcite(
            (conv_reduce): Conv2d(48, 12, kernel_size=(1, 1), stride=(1, 1))
            (act1): SiLU(inplace=True)
            (conv_expand): Conv2d(12, 48, kernel_size=(1, 1), stride=(1, 1))
            (gate): Sigmoid()
          )
          (conv_pw): Conv2d(48, 24, kernel_siz

In [34]:
from PIL import ImageFile
ImageFile.LOAD_TRUNCATED_IMAGES = True  #

def predict_image(image_path, model, transform):
    try:
        image = Image.open(image_path).convert('RGB')
        image = transform(image).unsqueeze(0)  # Add batch dimension
        image = image.to('cuda' if torch.cuda.is_available() else 'cpu')
        
        with torch.no_grad():
            output = model(image)
            prediction = torch.argmax(output, dim=1).item()
        
        return prediction
    except IOError as e:
        print(f"Error loading image {image_path}: {e}")
        return None

In [35]:
from tqdm.auto import tqdm
def predict_folder(folder_path, model, transform):
    predictions = []
    for filename in tqdm(os.listdir(folder_path)):
        if filename.endswith(('.png', '.jpg', '.jpeg')):  # Check for image files
            file_path = os.path.join(folder_path, filename)
            pred = predict_image(file_path, model, transform)
            predictions.append((filename, pred))
            
    return predictions

# Example usage
folder_path = '../test/ims/test_minprirodi_Parnokopitnie/'
predictions = predict_folder(folder_path, model, transform)
print(predictions)

  0%|          | 0/917 [00:00<?, ?it/s]

[('test_img_187160067.jpeg', 2), ('test_img_1000229.jpeg', 1), ('test_img_100252267.jpeg', 1), ('test_img_100264991.jpeg', 1), ('test_img_100474106.jpeg', 2), ('test_img_100525803.jpeg', 2), ('test_img_10102168.jpeg', 1), ('test_img_101265901.jpeg', 1), ('test_img_101818505.jpeg', 0), ('test_img_102028858.jpeg', 0), ('test_img_102104273.jpeg', 2), ('test_img_102263044.jpeg', 2), ('test_img_102286217.jpeg', 0), ('test_img_102318908.jpeg', 2), ('test_img_102510488.jpeg', 1), ('test_img_102663583.jpeg', 1), ('test_img_1026913.jpeg', 1), ('test_img_102881766.jpeg', 0), ('test_img_103018416.jpeg', 0), ('test_img_103179771.jpeg', 0), ('test_img_10391387.jpeg', 1), ('test_img_191373171.jpeg', 1), ('test_img_191453578.jpeg', 1), ('test_img_19170934.jpeg', 0), ('test_img_19178670.jpeg', 0), ('test_img_192234087.jpeg', 1), ('test_img_192300074.jpeg', 1), ('test_img_192302980.jpeg', 2), ('test_img_192536547.jpeg', 1), ('test_img_19260435.jpeg', 2), ('test_img_193037789.jpeg', 1), ('test_img_19317

In [41]:
import pandas as pd
df = pd.DataFrame(predictions,columns = ['img_name','class'])
df.to_csv('class.csv', index=False, sep=',', columns=['img_name', 'class'])