<a href="https://colab.research.google.com/github/jihun0423/Dacon-Hansol-img-classification/blob/main/CLIP.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# Install the packages CLIP needs (ftfy, regex, tqdm), then CLIP itself from the OpenAI GitHub repository.
!pip install ftfy regex tqdm
!pip install git+https://github.com/openai/CLIP.git

In [2]:
import clip
# Show the names of the pretrained checkpoints that clip.load accepts.
clip.available_models()

['RN50',
 'RN101',
 'RN50x4',
 'RN50x16',
 'RN50x64',
 'ViT-B/32',
 'ViT-B/16',
 'ViT-L/14',
 'ViT-L/14@336px']

In [3]:
# !sudo apt-get install -y fonts-nanum
# !sudo fc-cache -fv
# !rm ~/.cache/matplotlib -rf

import random
import pandas as pd
import numpy as np
import os
import re
import glob
import cv2

import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader, WeightedRandomSampler
from PIL import Image

import albumentations as A
from albumentations.pytorch.transforms import ToTensorV2
import torchvision.models as models

from sklearn.model_selection import train_test_split
from sklearn import preprocessing
from sklearn.metrics import f1_score
from sklearn.metrics import classification_report
from tqdm.auto import tqdm

import warnings
warnings.filterwarnings(action='ignore') 

In [45]:
# Mount Google Drive at /content/gdrive; the dataset paths used below live under that mount point.
from google.colab import drive
drive.mount('/content/gdrive')

Drive already mounted at /content/gdrive; to attempt to forcibly remount, call drive.mount("/content/gdrive", force_remount=True).


In [4]:
# Use the GPU when PyTorch can see one, otherwise fall back to the CPU.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

In [5]:
device

device(type='cuda')

In [46]:

# Notebook-wide settings. In the cells shown here only 'SEED' is read (by the
# seeding cell); the batch size and epoch count used for training are set separately.
CFG = dict(
    IMG_SIZE=224,
    EPOCHS=10,
    LEARNING_RATE=3e-4,
    BATCH_SIZE=128,
    SEED=41,
)

In [47]:
def seed_everything(seed):
    """Seed every random number generator the notebook uses so runs are repeatable.

    Args:
        seed: Integer seed applied to Python's ``random``, NumPy and PyTorch
            (CPU and every CUDA device), and exported as ``PYTHONHASHSEED``.
    """
    random.seed(seed)
    os.environ['PYTHONHASHSEED'] = str(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    # manual_seed_all covers every visible GPU and is a no-op when CUDA is absent.
    torch.cuda.manual_seed_all(seed)
    torch.backends.cudnn.deterministic = True
    # Benchmark mode lets cuDNN auto-tune and pick a possibly different algorithm
    # on each run, which works against the deterministic setting above.
    torch.backends.cudnn.benchmark = False

seed_everything(CFG['SEED']) # Fix the seed

In [85]:
# Root folder of the data on the mounted Drive.
base_dir = '/content/gdrive/MyDrive/open (2)/'
# Entries directly under train/ and the files one level below them.
train_folder = glob.glob(base_dir + 'train/*')
train_img_list = glob.glob(base_dir + 'train/*/*')

# One row per training image; the label is the name of the image's parent folder.
df = pd.DataFrame(columns=['img_path', 'label'])
df['img_path'] = train_img_list
df['label'] = [str(path).split('/')[-2] for path in df['img_path']]

In [86]:
# Replace each folder-name label with an integer id; `le` keeps the name <-> id mapping.
le = preprocessing.LabelEncoder()
le.fit(df['label'])
df['label'] = le.transform(df['label'])

In [87]:
# Collect every file found one level below each entry of plus/.
plus_folder = glob.glob(base_dir + 'plus/*')
plus_path = [path for folder in plus_folder for path in glob.glob(folder + '/*')]

In [88]:
# One row per extra image; the label is the name of the image's parent folder.
plus_df = pd.DataFrame(plus_path, columns=['img_path'])
plus_df['label'] = [path.split('/')[-2] for path in plus_df['img_path']]

In [89]:
# The plus/ folder names are parsed as integers so they line up with the encoded train labels.
plus_df = plus_df.astype({'label': int})

In [90]:
# Append the extra images to the training table. ignore_index renumbers the rows,
# so the combined frame does not hold two different rows under the same index label.
df = pd.concat([df, plus_df], ignore_index=True)

In [91]:
df['label'].value_counts()

18    1405
10     595
1      307
3      210
15     162
2      145
11     142
7      130
6       99
9       57
5       54
17      51
14      27
12      22
13      17
4       14
0       12
8       10
16      10
Name: label, dtype: int64

In [66]:
# Reference table of (integer id, class name) pairs.
# NOTE(review): presumably the id assigned to each class folder by the label encoder — confirm.
[(0, '가구수정'),
 (1, '걸레받이수정'),
 (2, '곰팡이'),
 (3, '꼬임'),
 (4, '녹오염'),
 (5, '들뜸'),
 (6, '면불량'),
 (7, '몰딩수정'),
 (8, '반점'),
 (9, '석고수정'),
 (10, '오염'),
 (11, '오타공'),
 (12, '울음'),
 (13, '이음부불량'),
 (14, '창틀,문틀수정'),
 (15, '터짐'),
 (16, '틈새과다'),
 (17, '피스'),
 (18, '훼손')]

[(0, '가구수정'),
 (1, '걸레받이수정'),
 (2, '곰팡이'),
 (3, '꼬임'),
 (4, '녹오염'),
 (5, '들뜸'),
 (6, '면불량'),
 (7, '몰딩수정'),
 (8, '반점'),
 (9, '석고수정'),
 (10, '오염'),
 (11, '오타공'),
 (12, '울음'),
 (13, '이음부불량'),
 (14, '창틀,문틀수정'),
 (15, '터짐'),
 (16, '틈새과다'),
 (17, '피스'),
 (18, '훼손')]

In [92]:
# English prompt attached to every row of a given class id; these strings are the
# text side of the image/text pairs fed to CLIP.
label_prompts = {
    0: 'the crack with drawer and wall',
    1: 'the baseboard with a crack',
    2: 'stains on the wall',
    3: 'seam wrinkles around the corner',
    4: 'wall with a brown spot',
    5: 'wall paper is coming off at the corner',
    6: 'a protruding part of the wallpaper',
    7: 'corner of the ceiling',
    8: 'red dots or blue dots on the wall',
    9: 'crack on the plaster board',
    10: 'contamination on the wall',
    11: 'a hole at the ceiling',
    12: 'wrinkle on the ceiling or wall',
    13: 'wall paper is cut',
    14: 'the door frame with a crack',
    15: 'crack near electric outlet',
    16: 'serious crack at the corner',
    17: 'nail on the wall',
    18: 'a hole or crack in the wallpaper',
}
for class_id, prompt in label_prompts.items():
    df.loc[df['label'] == class_id, 'text'] = prompt

In [93]:
df

Unnamed: 0,img_path,label,text
0,/content/gdrive/MyDrive/open (2)/train/가구수ᄌ...,0,the crack with drawer and wall
1,/content/gdrive/MyDrive/open (2)/train/가구수ᄌ...,0,the crack with drawer and wall
2,/content/gdrive/MyDrive/open (2)/train/가구수ᄌ...,0,the crack with drawer and wall
3,/content/gdrive/MyDrive/open (2)/train/가구수ᄌ...,0,the crack with drawer and wall
4,/content/gdrive/MyDrive/open (2)/train/가구수ᄌ...,0,the crack with drawer and wall
...,...,...,...
7,/content/gdrive/MyDrive/open (2)/plus/16/틈새...,16,serious crack at the corner
8,/content/gdrive/MyDrive/open (2)/plus/16/틈새...,16,serious crack at the corner
9,/content/gdrive/MyDrive/open (2)/plus/16/틈새...,16,serious crack at the corner
10,/content/gdrive/MyDrive/open (2)/plus/16/틈새...,16,serious crack at the corner


In [94]:
import numpy as np
from sklearn.model_selection import train_test_split

# Hold out 33% of the rows, stratified on the label column, with a fixed random state.
x_train, x_test, y_train, y_test = train_test_split(
    df,
    df['label'],
    test_size=0.33,
    random_state=777,
    stratify=df['label'],
)

In [95]:
from torch.utils.data import Dataset, DataLoader
class FlawDataset(Dataset):
    """Dataset of (image, prompt text, label) triples described by a DataFrame.

    Images are opened when an item is requested rather than in the constructor,
    so building the dataset does not keep one open file per row.

    Args:
        csv_df: DataFrame with 'img_path', 'text' and 'label' columns.
        transform: Callable applied to the PIL image of each item. When None,
            the PIL image is returned as loaded.
    """

    def __init__(self, csv_df, transform):
        self.csv_df = csv_df
        self.transform = transform
        # Only the paths are stored; pixels are read on demand in __getitem__.
        self.img_paths = self.csv_df['img_path'].to_list()
        self.text = self.csv_df['text'].to_list()
        self.label = self.csv_df['label'].to_list()

    def __len__(self):
        """Return the number of rows (one item per row)."""
        return len(self.img_paths)

    def __getitem__(self, idx):
        """Return ``(image, text, label)`` for row position ``idx``."""
        # Read the pixel data inside the context manager so the file is closed
        # as soon as the image is in memory.
        with Image.open(self.img_paths[idx]) as img:
            img.load()

        if self.transform is not None:
            img = self.transform(img)

        return img, self.text[idx], self.label[idx]

In [96]:
from torch.utils.data import Dataset, DataLoader, BatchSampler
class BalancedBatchSampler(BatchSampler):
    """Batch sampler that draws ``n_classes`` distinct classes per batch and
    ``n_samples`` indices from each of them.

    Every batch therefore holds ``n_classes * n_samples`` dataset indices. One
    pass yields exactly ``len(self)`` batches.

    Args:
        labels: 1-D tensor with the label of every dataset item (``.numpy()``
            is called on it).
        n_classes: Number of distinct classes drawn for each batch.
        n_samples: Number of indices taken from each drawn class.
    """

    def __init__(self, labels, n_classes, n_samples):
        self.labels = labels
        labels_np = self.labels.numpy()
        self.labels_set = list(set(labels_np))
        # Positions of the items belonging to each class, shuffled once up front.
        self.label_to_indices = {label: np.where(labels_np == label)[0]
                                 for label in self.labels_set}
        for l in self.labels_set:
            np.random.shuffle(self.label_to_indices[l])
        # How many indices of each class have been handed out since its last reshuffle.
        self.used_label_indices_count = {label: 0 for label in self.labels_set}
        self.count = 0
        self.n_classes = n_classes
        self.n_samples = n_samples
        self.n_dataset = len(self.labels)
        self.batch_size = self.n_samples * self.n_classes

    def __iter__(self):
        self.count = 0
        # `<=` keeps the number of yielded batches equal to __len__ even when the
        # dataset size is an exact multiple of the batch size.
        while self.count + self.batch_size <= self.n_dataset:
            classes = np.random.choice(self.labels_set, self.n_classes, replace=False)
            indices = []
            for class_ in classes:
                start = self.used_label_indices_count[class_]
                stop = start + self.n_samples
                indices.extend(self.label_to_indices[class_][start:stop].tolist())
                self.used_label_indices_count[class_] = stop
                # Reshuffle and restart a class once it cannot supply another full draw.
                if stop + self.n_samples > len(self.label_to_indices[class_]):
                    np.random.shuffle(self.label_to_indices[class_])
                    self.used_label_indices_count[class_] = 0
            yield indices
            self.count += self.batch_size

    def __len__(self):
        """Return the number of batches produced by one pass."""
        return self.n_dataset // self.batch_size

In [72]:
# Load the 'ViT-L/14@336px' CLIP checkpoint onto `device`, together with the image
# preprocessing pipeline that clip.load returns for it.
# NOTE(review): jit=False presumably selects the non-JIT model so it can be fine-tuned below — confirm.
model, preprocess = clip.load('ViT-L/14@336px', device=device, jit=False)

In [73]:
preprocess

Compose(
    Resize(size=336, interpolation=bicubic, max_size=None, antialias=warn)
    CenterCrop(size=(336, 336))
    <function _convert_image_to_rgb at 0x7f4d9d9a6290>
    ToTensor()
    Normalize(mean=(0.48145466, 0.4578275, 0.40821073), std=(0.26862954, 0.26130258, 0.27577711))
)

In [117]:
from torchvision import transforms
from torchvision.transforms import InterpolationMode
# Augmenting pipeline for training images. The steps run in the listed order:
# array -> PIL image -> random crop resized to 336x336 -> TrivialAugmentWide ->
# tensor -> normalisation -> occasional random erasing (p=0.1).
# The mean/std values are the same ones shown in CLIP's own `preprocess` pipeline.
train_preprocess = transforms.Compose([
    transforms.ToPILImage(),
    transforms.RandomResizedCrop(size=(336, 336), interpolation=InterpolationMode.BICUBIC, scale=(0.8, 1.0), ratio=(0.9, 1.1)),
    transforms.autoaugment.TrivialAugmentWide(interpolation=InterpolationMode.BILINEAR),
    #_convert_image_to_rgb,
    transforms.ToTensor(),
    # NOTE(review): ToTensor presumably already yields a float tensor, which would make this cast a no-op — confirm.
    transforms.ConvertImageDtype(torch.float),
    transforms.Normalize(mean=(0.48145466, 0.4578275, 0.40821073), std=(0.26862954, 0.26130258, 0.27577711)),
    transforms.RandomErasing(p=0.1)
]
)

# Pipeline for held-out images, with no random augmentation steps.
# Resize is given both sides (336, 336), so the output is already 336x336 and the
# following CenterCrop of the same size leaves it unchanged. This differs from CLIP's
# own `preprocess`, which is shown resizing with size=336 before the centre crop.
val_preprocess = transforms.Compose([
     transforms.ToPILImage(),
    transforms.Resize(size=(336, 336), interpolation=InterpolationMode.BICUBIC, max_size=None, antialias=None),
    transforms.CenterCrop(size=(336, 336)),
    #_convert_image_to_rgb,
    transforms.ToTensor(),
    transforms.ConvertImageDtype(torch.float),
    transforms.Normalize(mean=(0.48145466, 0.4578275, 0.40821073), std=(0.26862954, 0.26130258, 0.27577711)),
]
)

In [118]:
# Resize-and-tensorise pipeline without cropping or normalisation.
# NOTE(review): no other cell shown here references `process` — possibly leftover; confirm before removing.
process = transforms.Compose([
    transforms.Resize(size=336, interpolation=InterpolationMode.BICUBIC, max_size=None, antialias=None),
    transforms.ToTensor()
])

In [119]:
class CustomDataset(Dataset):
    """Dataset that reads images from disk with OpenCV on access.

    Args:
        img_path_list: Indexable collection of image file paths.
        label: Indexable collection of labels aligned with ``img_path_list``,
            or None for unlabelled data.
        text: Indexable collection of prompt strings aligned with
            ``img_path_list``; only read when ``label`` is not None.
        transforms: Optional callable applied to the RGB image array.
    """

    def __init__(self, img_path_list, label, text, transforms=None):
        self.img_path_list = img_path_list
        self.label = label
        self.transforms = transforms
        self.text = text

    def __getitem__(self, index):
        """Return ``(image, text, label)``, or just ``image`` when there are no labels.

        Raises:
            FileNotFoundError: If the image at ``index`` cannot be read.
        """
        img_path = self.img_path_list[index]
        image = cv2.imread(img_path)
        if image is None:
            # cv2.imread reports a missing or unreadable file by returning None
            # instead of raising, so fail here with the offending path.
            raise FileNotFoundError(f"Could not read image: {img_path}")
        # OpenCV loads channels as BGR; the transforms expect RGB.
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)

        if self.transforms is not None:
            image = self.transforms(image)

        if self.label is None:
            return image
        return image, self.text[index], self.label[index]

    def __len__(self):
        """Return the number of image paths."""
        return len(self.img_path_list)

In [120]:
# Training rows go through the augmenting pipeline, held-out rows through the plain one.
train_dataset = CustomDataset(
    img_path_list=x_train['img_path'].values,
    label=x_train['label'].values,
    text=x_train['text'].values,
    transforms=train_preprocess,
)
test_dataset = CustomDataset(
    img_path_list=x_test['img_path'].values,
    label=x_test['label'].values,
    text=x_test['text'].values,
    transforms=val_preprocess,
)

In [81]:
# 19 matches the number of label ids (0-18). The value is handed to the balanced
# sampler as n_classes with one sample per class, so it is also the batch length.
BATCH_SIZE = 19

In [None]:
# Take the labels straight from the datasets. Collecting them by iterating the
# dataset objects would read and transform every image once just to discard the pixels.
train_labels = torch.tensor(train_dataset.label)
train_sampler = BalancedBatchSampler(train_labels, BATCH_SIZE, 1)
train_dataloader = DataLoader(train_dataset, batch_sampler=train_sampler, num_workers=8, pin_memory=True)

test_labels = torch.tensor(test_dataset.label)
test_sampler = BalancedBatchSampler(test_labels, BATCH_SIZE, 1)
test_dataloader = DataLoader(test_dataset, batch_sampler=test_sampler, num_workers=8, pin_memory=True)

In [None]:
# Number of passes over the training loader; it also sizes the cosine learning-rate schedule.
EPOCH = 40

In [None]:
def convert_models_to_fp32(model):
    """Cast every parameter of ``model``, and its gradient when present, to float32 in place.

    Args:
        model: Module whose parameters are converted.
    """
    for p in model.parameters():
        p.data = p.data.float()
        # A parameter that took no part in the last backward pass has no gradient.
        if p.grad is not None:
            p.grad.data = p.grad.data.float()

# `device` is a torch.device, which does not compare equal to the plain string
# "cpu"; compare its `.type` so the float32 cast really runs on CPU.
if device.type == "cpu":
    model.float()

# One cross-entropy term per direction of the image/text similarity matrix.
loss_img = nn.CrossEntropyLoss()
loss_txt = nn.CrossEntropyLoss()
#optimizer = optim.Adam(model.parameters(), lr=1e-6,betas=(0.9,0.98),eps=1e-6,weight_decay=0.05)
optimizer = optim.AdamW(model.parameters(), lr=1e-5)
#optimizer = optim.SGD(model.parameters(), lr=1e-5, momentum=0.9, weight_decay = 1e-4)
# The schedule spans all training steps because the scheduler is stepped once per batch.
scheduler = optim.lr_scheduler.CosineAnnealingLR(optimizer, len(train_dataloader)*EPOCH)

In [None]:
# Prompts ordered by class id: position i of `text_label` is the prompt of
# `class_ids[i]`, which turns an argmax over prompts into a class id.
prompt_table = df[['label', 'text']].drop_duplicates().sort_values('label')
class_ids = prompt_table['label'].tolist()
text_label = prompt_table['text'].tolist()

# Held-out image paths and their true labels for the per-epoch classification check.
eval_paths = x_test['img_path'].to_list()
ans = x_test['label'].to_list()

best_f1 = 1e-5
best_ep = -1
best_te_loss = 1e5
f1_ls = []


for epoch in range(EPOCH):
    print(f"running epoch {epoch}, best macro-F1 {best_f1} after epoch {best_ep}, best test loss {best_te_loss}")

    # ---- contrastive training pass ----
    step = 0
    tr_loss = 0
    model.train()
    pbar = tqdm(train_dataloader, leave=False)
    for batch in pbar:
        step += 1
        optimizer.zero_grad()

        images, texts, _ = batch
        images = images.to(device)
        texts = clip.tokenize(texts).to(device)
        logits_per_image, logits_per_text = model(images, texts)
        # Image i is paired with text i, so the targets are the diagonal. Size them
        # from the batch itself instead of assuming it always holds BATCH_SIZE items.
        ground_truth = torch.arange(len(images), device=device)

        total_loss = (loss_img(logits_per_image, ground_truth) + loss_txt(logits_per_text, ground_truth)) / 2
        total_loss.backward()
        tr_loss += total_loss.item()
        # `device` is a torch.device; comparing it with the string "cpu" is never true.
        if device.type == "cpu":
            optimizer.step()
        else:
            # Step in float32, then return the weights to half precision.
            convert_models_to_fp32(model)
            optimizer.step()
            clip.model.convert_weights(model)
        scheduler.step()
        pbar.set_description(f"train batchCE: {total_loss.item()}", refresh=True)
    tr_loss /= max(step, 1)

    # ---- contrastive loss on the held-out loader ----
    step = 0
    te_loss = 0
    model.eval()
    with torch.no_grad():
        test_pbar = tqdm(test_dataloader, leave=False)
        for batch in test_pbar:
            step += 1
            images, texts, _ = batch
            images = images.to(device)
            texts = clip.tokenize(texts).to(device)
            logits_per_image, logits_per_text = model(images, texts)
            ground_truth = torch.arange(len(images), device=device)

            total_loss = (loss_img(logits_per_image, ground_truth) + loss_txt(logits_per_text, ground_truth)) / 2
            te_loss += total_loss.item()
            test_pbar.set_description(f"test batchCE: {total_loss.item()}", refresh=True)
    te_loss /= max(step, 1)
    best_te_loss = min(best_te_loss, te_loss)

    # ---- classification check: nearest prompt for every held-out image ----
    preds = []
    with torch.no_grad():
        # The prompts are the same for every image, so encode them once per epoch.
        text_features = model.encode_text(clip.tokenize(text_label).to(device))
        text_features = text_features / text_features.norm(dim=-1, keepdim=True)

        for img_path in eval_paths:
            with Image.open(img_path) as pil_image:
                image = preprocess(pil_image).unsqueeze(0).to(device)
            image_features = model.encode_image(image)
            image_features = image_features / image_features.norm(dim=-1, keepdim=True)
            # Cosine similarity against every prompt; the best-matching prompt gives the class.
            best_prompt = (image_features @ text_features.T).argmax(dim=-1).item()
            preds.append(class_ids[best_prompt])

    print(float(np.mean(np.asarray(ans) == np.asarray(preds))))
    f1_acc = f1_score(ans, preds, average='macro')
    print(f1_acc)
    f1_ls.append(f1_acc)

    # Checkpoint only when the macro-F1 improves on the best value seen so far.
    if best_f1 < f1_acc:
        best_f1 = f1_acc
        best_ep = epoch
        torch.save(model.state_dict(), "./best_model_change_val336gpuno2f.pt")

    print(f"epoch {epoch}, tr_loss {tr_loss}, te_loss {te_loss}")
torch.save(model.state_dict(), "./lasttr224_model25.pt")

In [None]:
# Lookup from an English prompt to a short English tag. Written as a dict: `key : value`
# pairs inside a list literal are a syntax error.
# NOTE(review): the pairs are reproduced exactly as written and have not been checked
# against the class list; 'crack on the ceiling' is not one of the prompts assigned to
# `df['text']` above ('crack on the plaster board' is) — confirm before relying on this table.
prompt_to_tag = {
    'the door frame with a crack': 'bad joint',
    'the baseboard with a crack': 'mopholder',
    'wall paper is coming off at the corner': 'crack',
    'the crack with drawer and wall': 'Furniture',
    'corner of the ceiling': 'molding',
    'crack near electric outlet': 'window frame',
    'serious crack at the corner': 'gap',
    'a hole at the ceiling': 'black spot',
    'wall paper is cut': 'wailing',
    'crack on the ceiling': 'plaster',
    'stains on the wall': 'mold',
    'red dots or blue dots on the wall': 'half spot',
    'a protruding part of the wallpaper': 'cotton defect',
    'wrinkle on the ceiling or wall': 'wrinkle',
    'seam wrinkles around the corner': 'twist',
    'wall with a brown spot': 'rust pollution',
    'nail on the wall': 'piece',
    'contamination on the wall': 'contamination',
    'a hole or crack in the wallpaper': 'damage',
}
prompt_to_tag