In [260]:
%load_ext autoreload
%autoreload 2

import pytorch_lightning as pl
from torchvision import datasets, transforms
from torch.utils.data import random_split, DataLoader, Dataset
from pytorch_lightning.loggers import TensorBoardLogger
import os
import torch
import torch.nn.functional as F
import random
import numpy as np
import torch.nn as nn
from PIL import Image
import timm
from tqdm import tqdm
import shutil
# import warnings
# warnings.filterwarnings("ignore")


from skew_correction.helper import *

# Project root is the parent of the directory the notebook runs from.
# os.path.dirname is used instead of splitting on "/" by hand so the result is
# correct on any OS path separator and when the working directory is the filesystem root.
root_dir = os.path.dirname(os.getcwd())
data_dir = os.path.join(root_dir, "data")
root_dir

The autoreload extension is already loaded. To reload it, use:
  %reload_ext autoreload


'/home/deepam_minda_farmart_co/fmt/skew_correction'

In [266]:
# def split_a_dir_into_train_test(src_dir, dest_dir):
#     org_img_paths = get_images_in_dir(src_dir, return_path=True)
    


def split_files(src_dir, dest_dir, train_size):
    """
    Randomly split the entries of ``src_dir`` into a train and a test set and
    move them into ``<dest_dir>/train_`` and ``<dest_dir>/test_``.

    Args:
        src_dir: directory whose entries are split (they are MOVED, not copied).
        dest_dir: parent directory of the ``train_`` / ``test_`` output folders;
            created if it does not exist.
        train_size: fraction of entries, between 0 and 1, that goes to the train set.

    Returns:
        Tuple ``(train_files, test_files)`` of entry names (not full paths).

    Raises:
        ValueError: if ``train_size`` is outside [0, 1].
    """
    if not 0 <= train_size <= 1:
        raise ValueError(f"train_size must be between 0 and 1, got {train_size!r}")

    # os.listdir returns entries in arbitrary order; sorting first makes the
    # shuffle (and therefore the split) reproducible after random.seed(...).
    all_files = sorted(os.listdir(src_dir))
    random.shuffle(all_files)

    train_count = int(len(all_files) * train_size)
    train_files = all_files[:train_count]
    test_files = all_files[train_count:]

    train_dir = os.path.join(dest_dir, "train_")
    test_dir = os.path.join(dest_dir, "test_")

    # makedirs creates dest_dir as well, so no separate existence check is needed.
    os.makedirs(train_dir, exist_ok=True)
    os.makedirs(test_dir, exist_ok=True)

    move_files(src_dir, train_dir, train_files)
    move_files(src_dir, test_dir, test_files)
    return train_files, test_files

def move_files(src_dir, dest_dir, files):
    """Move each entry named in ``files`` from ``src_dir`` into ``dest_dir``.

    ``files`` holds bare names (relative to ``src_dir``); every entry keeps its
    name in the destination. Returns None.
    """
    for name in files:
        shutil.move(os.path.join(src_dir, name), os.path.join(dest_dir, name))


# Example usage: split the entries of <data_dir>/original/train into
# <data_dir>/original/train_ and <data_dir>/original/test_
# (the "train_" / "test_" folder names are fixed inside split_files).
src_directory = os.path.join(data_dir, "original/train")
dest_directory = os.path.join(data_dir, "original/")
train_ratio = 0.8  # 80% for training, 20% for testing

# The call below is left commented out. split_files MOVES files out of src_directory,
# so presumably it was disabled after a one-off run — TODO confirm before re-enabling.
## train_files, test_files = split_files(src_directory, dest_directory, train_ratio)


In [90]:
def prepare_data(src_dir, dest_dir, save_csv=True, multiple=2):
    """
    Create randomly rotated copies of the upright (0 degree) images in ``src_dir``.

    Every source image is read in mode 'L', rotated by a random integer angle
    between -180 and 179 (inclusive) and saved in ``dest_dir`` as
    ``<name>_<angle><ext>``. This is repeated ``multiple`` times per image.
    Optionally a ``data.csv`` listing each saved file path and its angle is
    written to ``dest_dir``.

    Args:
        src_dir: directory containing the 0 degree images.
        dest_dir: output directory; created if it does not exist.
        save_csv: when truthy, write ``data.csv`` (columns ``filepath``, ``angle``).
        multiple: number of rotated copies to generate per source image.

    Returns:
        Always 1 (kept for backward compatibility with existing callers).

    NOTE(review): if the same angle is drawn twice for one image, the second save
    overwrites the first file and the CSV gets two rows for the same path.
    """
    os.makedirs(dest_dir, exist_ok=True)

    save_dict = {
        'filepath': [],
        'angle': []
    }

    org_img_paths = get_images_in_dir(src_dir, return_path=True)
    print(f"There are {len(org_img_paths)} images in src_folder. Preparing rotated images. \
        \nmultiple={multiple}. hence there will be {multiple*len(org_img_paths)} images")

    for _ in range(multiple):
        for img_path in tqdm(org_img_paths):
            img = read_raw_image(img_path, mode='L')

            # Plain Python int in [-180, 180): no per-image array allocation, and the
            # angle is written to the file name / CSV as a regular integer.
            angle = random.randrange(-180, 180)
            img = img.rotate(angle, expand=True)

            # splitext/basename keep working for names that contain extra dots
            # (e.g. "scan.v2.png"), where splitting on "." would fail to unpack.
            img_name, ext = os.path.splitext(os.path.basename(img_path))
            save_filename = f"{img_name}_{angle}{ext}"
            dest_path = os.path.join(dest_dir, save_filename)
            img.save(dest_path)

            # Only record files that were actually written.
            if os.path.exists(dest_path):
                save_dict["filepath"].append(dest_path)
                save_dict["angle"].append(angle)

    if save_csv:
        # Imported here so the function does not depend on `pd` leaking in
        # through a star-import elsewhere in the notebook.
        import pandas as pd
        pd.DataFrame(save_dict).to_csv(os.path.join(dest_dir, "data.csv"), index=None)

    return 1

In [91]:
# Generate the rotated training set: every image in data/original/train/ is rotated
# `multiple` times and saved under data/rotated/train/, together with a data.csv
# (file path + angle) that the dataset class reads later.
src_dir = os.path.join(root_dir, "data/original/train/")
dest_dir = os.path.join(root_dir, "data/rotated/train/")
prepare_data(src_dir, dest_dir, save_csv=True, multiple=2)

There are 955 images in src_folder. Preparing rotated images.         
multiple=2. hence there will be 1910 images


  0%|          | 1/955 [00:00<02:52,  5.54it/s]

100%|██████████| 955/955 [01:19<00:00, 12.04it/s]
100%|██████████| 955/955 [01:18<00:00, 12.23it/s]


1

In [248]:
## define dataloader

from torch.utils.data import DataLoader, Dataset

# Preprocessing applied when the dataset split is "train":
# resize to 224x224, then convert the image to a tensor.
train_transform=transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor()
])

# Preprocessing applied for every other split. Currently identical to train_transform.
# NOTE(review): presumably kept separate so augmentation can later be added to the
# training pipeline only — confirm.
transform = transforms.Compose([
    transforms.Resize((224,224)),
    transforms.ToTensor()
])

class SkewDataset(Dataset):
    """Image / rotation-angle pairs listed in a CSV file.

    The CSV must contain a ``filepath`` column (image location) and an ``angle``
    column (label). ``split`` selects the preprocessing: ``"train"`` uses
    ``train_transform``, anything else uses ``transform``.

    Each item is ``(transformed_image, torch.tensor(angle))``; the label tensor
    keeps whatever dtype the CSV column was parsed with.
    """

    def __init__(self, csv_path, split="test"):
        super().__init__()
        frame = pd.read_csv(csv_path)
        self.df = frame
        self.filepaths = frame["filepath"]
        self.labels = frame["angle"]
        self.split = split

    def __len__(self):
        # One sample per CSV row.
        return len(self.filepaths)

    def __getitem__(self, idx):
        raw_image = read_raw_image(self.filepaths[idx])
        angle = self.labels[idx]

        # Pick the preprocessing pipeline that matches the requested split.
        preprocess = train_transform if self.split == "train" else transform

        return preprocess(raw_image), torch.tensor(angle)

In [256]:
# Training data: the data.csv written by prepare_data into data/rotated/train.
# The path is built from data_dir instead of a hard-coded absolute home-directory
# path so the notebook also runs on other machines / checkouts.
train_csv_path = os.path.join(data_dir, "rotated/train/data.csv")

# split="train" makes the dataset use train_transform, matching the loader's purpose.
dataset = SkewDataset(train_csv_path, split="train")
# sample = dataset.__getitem__(5)
# sample[0].shape, sample[1]
train_loader = DataLoader(dataset, batch_size=8, shuffle=True, num_workers=2)
# batch = next(iter(train_loader))

torch.Size([8])

In [212]:
## define model class

class ConvNet(nn.Module):
    """Small CNN mapping a single-channel image to one scalar per sample.

    Three conv -> ReLU -> max-pool stages are followed by an adaptive max-pool
    to 2x2 (256 channels * 2 * 2 = 1024 features) and a three-layer MLP head
    whose last layer has a single output.

    ``num_classes`` is accepted but not used anywhere in the network; the
    output width is fixed at 1.
    """

    def __init__(self, num_classes):
        super().__init__()
        # Layers are created in the same order as before so that seeded
        # parameter initialisation and state_dict key order are unchanged.
        self.conv1 = nn.Conv2d(1, 64, 3)
        self.relu = nn.ReLU()
        self.pool1 = nn.MaxPool2d(2)
        self.conv2 = nn.Conv2d(64, 256, 3)
        self.pool2 = nn.MaxPool2d(2)
        self.conv3 = nn.Conv2d(256, 256, 3)
        self.pool3 = nn.MaxPool2d(2)
        self.adaptive_pool = nn.AdaptiveMaxPool2d((2, 2))
        self.flatten = nn.Flatten()

        self.linear1 = nn.Linear(1024, 256)
        self.linear2 = nn.Linear(256, 32)
        self.linear3 = nn.Linear(32, 1)

    def forward(self, x):
        # Convolutional feature extractor: conv -> ReLU -> pool, three times.
        conv_stages = (
            (self.conv1, self.pool1),
            (self.conv2, self.pool2),
            (self.conv3, self.pool3),
        )
        for conv, pool in conv_stages:
            x = pool(self.relu(conv(x)))

        # Collapse the spatial grid to 2x2 and flatten to a feature vector.
        features = self.flatten(self.adaptive_pool(x))

        # MLP head: ReLU after the hidden layers, none after the output layer.
        for hidden in (self.linear1, self.linear2):
            features = self.relu(hidden(features))
        return self.linear3(features)


In [213]:
from torchsummary import summary
# The argument 5 is num_classes, which ConvNet.__init__ never reads: the final
# layer is nn.Linear(32, 1) regardless of the value passed here.
model = ConvNet(5)
summary(model, (1, 224, 224))

# Smoke test: forward pass on an all-ones tensor of shape (3, 1, 224, 224).
dummy_input = torch.ones(3,1,224,224)
output = model(dummy_input)

----------------------------------------------------------------
        Layer (type)               Output Shape         Param #
            Conv2d-1         [-1, 64, 222, 222]             640
              ReLU-2         [-1, 64, 222, 222]               0
         MaxPool2d-3         [-1, 64, 111, 111]               0
            Conv2d-4        [-1, 256, 109, 109]         147,712
              ReLU-5        [-1, 256, 109, 109]               0
         MaxPool2d-6          [-1, 256, 54, 54]               0
            Conv2d-7          [-1, 256, 52, 52]         590,080
              ReLU-8          [-1, 256, 52, 52]               0
         MaxPool2d-9          [-1, 256, 26, 26]               0
AdaptiveMaxPool2d-10            [-1, 256, 2, 2]               0
          Flatten-11                 [-1, 1024]               0
           Linear-12                  [-1, 256]         262,400
             ReLU-13                  [-1, 256]               0
           Linear-14                   

In [None]:
78,785*

In [184]:
# NOTE(review): stale scratch cell. Its recorded output is a RuntimeError
# ("3x128 and 512x128"): nn.Linear(512, 128) needs inputs whose last dimension is
# 512, but `output` had 128 features when this ran. `output` is whatever the kernel
# last bound that name to, so this cell does not survive Restart & Run All — TODO remove or fix.
linear1 = nn.Linear(512, 128)
linear1(output.unsqueeze(0))


RuntimeError: mat1 and mat2 shapes cannot be multiplied (3x128 and 512x128)

In [177]:

# flat(output.unsqueeze(0))
output.unsqueeze(0).shape


torch.Size([1, 3, 512])

In [114]:
# ?nn.AdaptiveMaxPool2d