<a href="https://colab.research.google.com/github/ru2zi/DACON_Paper_Shape_Classification/blob/main/%EB%8F%84%EB%B0%B0_%ED%95%98%EC%9E%90_%EC%9C%A0%ED%98%95_%EB%B6%84%EB%A5%98_AI_%EA%B2%BD%EC%A7%84%EB%8C%80%ED%9A%8C.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import random
import pandas as pd
import numpy as np
import os
import re
import glob
import cv2

import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader, WeightedRandomSampler

import albumentations as A
from albumentations.pytorch.transforms import ToTensorV2
import torchvision.models as models

from sklearn.model_selection import train_test_split
from sklearn import preprocessing
from sklearn.metrics import f1_score
from sklearn.metrics import classification_report
from tqdm.auto import tqdm

import warnings
warnings.filterwarnings(action='ignore')

In [None]:
import os.path
import cv2
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import tensorflow as tf
import seaborn as sns
import glob
from pathlib import Path
from tqdm import tqdm
from time import perf_counter
import os
import json
import shutil

from PIL import Image
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report,accuracy_score
from IPython.display import Markdown, display

In [None]:
CFG = {
    'IMG_SIZE':224,
    'EPOCHS':10,
    'LEARNING_RATE':3e-4,
    'BATCH_SIZE':32,
    'SEED':41
}

In [None]:
def seed_everything(seed):
    """Seed every random number generator this notebook relies on.

    Seeds Python's ``random``, the ``PYTHONHASHSEED`` environment variable,
    NumPy, and PyTorch (CPU and all CUDA devices), and configures cuDNN for
    deterministic behaviour.

    Args:
        seed: Integer seed shared by all generators.
    """
    random.seed(seed)
    os.environ['PYTHONHASHSEED'] = str(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    # Seed every visible GPU, not only the current one; this call is a no-op
    # when CUDA is unavailable.
    torch.cuda.manual_seed_all(seed)
    torch.backends.cudnn.deterministic = True
    # benchmark mode lets cuDNN auto-tune and pick kernels per run, which
    # defeats the deterministic setting above, so it must be disabled.
    torch.backends.cudnn.benchmark = False

seed_everything(CFG['SEED']) # Seed 고정

In [None]:
all_img_list = [p for p in glob.glob('/content/drive/MyDrive/Dacon/train/**/*.png', recursive=True)if os.path.isfile(p)]

In [None]:
df = pd.DataFrame(columns=['img_path', 'label'])
df['img_path'] = all_img_list

In [None]:
df['label'] = df['img_path'].apply(lambda x: os.path.basename(os.path.dirname(x)))

In [None]:
df

In [None]:
train, val, _, _ = train_test_split(df, df['label'], test_size=0.3, stratify=df['label'], random_state=CFG['SEED'])

In [None]:
train.reset_index(drop=True, inplace=True)

In [None]:
val.reset_index(drop=True, inplace=True)

In [None]:
import os

# 탐색할 폴더 경로
folder_path = '/content/drive/MyDrive/Dacon/train'

# 폴더 내의 모든 하위 폴더 탐색
for root, dirs, files in os.walk(folder_path):
    # 각 폴더 내의 이미지 개수 출력
    if len(files) > 0:
        print(f"{root}: {len(files)}")


In [None]:
train['img_path'][8]

In [None]:
image_pil = Image.open(train['img_path'][810])
image = np.array(image_pil)
plt.imshow(image)
plt.show()

In [None]:
# gaussian_filter is exposed by scipy.ndimage; the scipy.ndimage.filters
# namespace is a deprecated alias.
from scipy.ndimage import gaussian_filter

# Load the sample image.
img = Image.open(train['img_path'][810])
img_arr = np.asarray(img)

# Blur only along the two spatial axes. A scalar sigma would also smooth
# across the colour-channel axis and blend the channels together.
spatial_sigma = (2, 2) + (0,) * (img_arr.ndim - 2)
gaussian_img = gaussian_filter(img_arr, sigma=spatial_sigma)

# Show the blurred image.
plt.imshow(gaussian_img)
plt.show()

In [None]:
# gaussian_img originates from PIL's Image.open, which yields channels in RGB
# order, so the RGB (not BGR) conversion code gives the correct luminance weights.
gray = cv2.cvtColor(gaussian_img, cv2.COLOR_RGB2GRAY)

plt.figure(figsize=(6, 6))
plt.imshow(gray, cmap='gray')

In [None]:
pip install common_util


In [None]:
# `common_util` is never imported in this notebook, so the original call raised
# NameError. Parse the annotation with the standard-library json module instead.
# NOTE(review): assumes common_util.load_json simply parsed the JSON file — confirm.
with open("/content/1399.json", encoding="utf-8") as f:
    labelme_dict = json.load(f)

In [None]:
img_blurred = cv2.GaussianBlur(gray, ksize=(5, 5), sigmaX=0)

img_thresh = cv2.adaptiveThreshold(
    img_blurred,
    maxValue=255.0,
    adaptiveMethod=cv2.ADAPTIVE_THRESH_GAUSSIAN_C,
    thresholdType=cv2.THRESH_BINARY_INV,
    blockSize=19,
    C=9
)

plt.figure(figsize=(6, 6))
plt.imshow(img_thresh, cmap='gray')

In [None]:
height, width, channel = image.shape

In [None]:
contours, hierarchy = cv2.findContours(
    img_thresh,
    mode=cv2.RETR_LIST,
    method=cv2.CHAIN_APPROX_SIMPLE
)

temp_result = np.zeros((height, width, channel), dtype=np.uint8)

cv2.drawContours(temp_result, contours=contours, contourIdx=-1, color=(255, 255, 255))

plt.figure(figsize=(6, 6))
plt.imshow(temp_result)


---

In [None]:
# 이미지 불러오기
img = cv2.imread(train['img_path'][810], cv2.IMREAD_GRAYSCALE)

# 캐니 엣지 검출 적용
edges = cv2.Canny(img, 100, 250)

# 이미지 출력
plt.imshow(edges, cmap='gray')
plt.show()


In [None]:
from scipy.ndimage import gaussian_laplace

In [None]:
# 이미지 불러오기
img = cv2.imread(train['img_path'][80], cv2.IMREAD_GRAYSCALE)

# 고주파 필터링 적용
kernel_size = 1
kernel = np.ones((kernel_size,kernel_size),np.float32)/(kernel_size**2)
filtered = cv2.filter2D(img,-1,kernel)

# 캐니 엣지 검출 적용
edges = cv2.Canny(filtered, 100, 250)

# 경계선 길이 측정
contours, hierarchy = cv2.findContours(edges, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_NONE)

# 경계선 길이가 일정 값 이상인 부분만 추출
threshold = 50 # 일정 값
edges_filtered = np.zeros_like(edges)
for i in range(len(contours)):
    if cv2.arcLength(contours[i], True) > threshold:
        cv2.drawContours(edges_filtered, contours, i, 255, 1)

# 이미지 출력
plt.imshow(edges_filtered, cmap='gray')
plt.show()

In [None]:
import torch

def preprocess_image(img_path):
    """Convert an image into an edge map with long straight lines removed.

    Pipeline: read as grayscale, smooth with a 2x2 mean filter, detect edges
    with Canny, erase long straight segments found by the probabilistic Hough
    transform, then keep only the external contours whose closed perimeter
    exceeds 50 pixels.

    Args:
        img_path: Path of the image file; converted with ``str()`` before use.

    Returns:
        A NumPy array shaped like the Canny edge map, zero everywhere except
        the kept contours, which are drawn with value 255.

    Raises:
        ValueError: If OpenCV cannot read the image.
    """
    img_path = str(img_path)
    img = cv2.imread(img_path, cv2.IMREAD_GRAYSCALE)

    if img is None:
        raise ValueError("이미지를 불러올 수 없습니다.")

    # Normalised all-ones kernel: this is a mean (smoothing) filter applied
    # before edge detection.
    kernel_size = 2
    kernel = np.ones((kernel_size, kernel_size), np.float32) / (kernel_size ** 2)
    filtered = cv2.filter2D(img, -1, kernel)

    # Canny edge detection.
    edges = cv2.Canny(filtered, 100, 250)

    # Erase long straight lines from the edge map by painting over them in black.
    lines = cv2.HoughLinesP(edges, rho=1, theta=np.pi/180, threshold=100, minLineLength=100, maxLineGap=10)
    if lines is not None:
        for line in lines:
            x1, y1, x2, y2 = (int(v) for v in line[0])
            cv2.line(edges, (x1, y1), (x2, y2), (0, 0, 0), 3)

    # Extract contours only AFTER the line removal. Doing it before (as the
    # original did) meant the erased lines were still present in the output.
    contours, _ = cv2.findContours(edges, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_NONE)

    # Keep only contours whose perimeter exceeds the threshold.
    threshold = 50
    edges_filtered = np.zeros_like(edges)
    for i in range(len(contours)):
        if cv2.arcLength(contours[i], True) > threshold:
            cv2.drawContours(edges_filtered, contours, i, 255, 1)

    # The result is already a CPU NumPy array; no torch round trip is needed.
    return edges_filtered


In [None]:
img_path = train['img_path'][85]
preprocessed_img = preprocess_image(img_path)
# 패턴인식 알고리즘 등을 적용하여 하자 부분을 찾는 작업 수행
plt.imshow(preprocessed_img, cmap='gray')
plt.show()

In [None]:
train['img_path'][5]

---

### 전처리 이미지 정리

In [None]:
# 새로운 폴더 생성
import os
os.makedirs('/content/drive/MyDrive/Dacon/preprocessed_train', exist_ok=True)

In [None]:
src = '/content/drive/MyDrive/Dacon/train'
dst = '/content/drive/MyDrive/Dacon/preprocessed_train'

for foldername in os.listdir(src):
    folder_path = os.path.join(src, foldername)
    if os.path.isdir(folder_path):
        shutil.copytree(folder_path, os.path.join(dst, foldername), dirs_exist_ok=True)

In [None]:
def remove_png_files(folder_path):
    """Recursively delete every file under ``folder_path`` whose name ends with ".png"."""
    for current_dir, _subdirs, names in os.walk(folder_path):
        png_names = [name for name in names if name.endswith(".png")]
        for name in png_names:
            os.remove(os.path.join(current_dir, name))
remove_png_files(dst)

---

In [None]:
# 파일 경로 지정
file_path = "/content/drive/MyDrive/Dacon/preprocessed_train"

# 파일 삭제
shutil.rmtree(file_path)

In [None]:
# For every row of the train frame: preprocess the image and save the result
# under Preprocessed_train/<label>/<original file name>.
for idx, row in train.iterrows():
    img_path = row['img_path']  # source image path
    label = row['label']        # defect type, used as the sub-folder name

    img = preprocess_image(img_path)

    # Create the per-label folder; exist_ok makes re-runs safe.
    folder_name = f"/content/drive/MyDrive/Dacon/Preprocessed_train/{label}"
    os.makedirs(folder_name, exist_ok=True)

    filename = os.path.basename(img_path)
    # cv2.imwrite signals failure through its return value instead of raising,
    # so check it; otherwise images could go missing from the training set
    # without any error.
    if not cv2.imwrite(f"{folder_name}/{filename}", img):
        raise IOError(f"Failed to write preprocessed image: {folder_name}/{filename}")

In [None]:
pre_img_list = [p for p in glob.glob('/content/drive/MyDrive/Dacon/Preprocessed_train/**/*.png', recursive=True)if os.path.isfile(p)]

In [None]:
pre_img_train = pd.DataFrame(columns=['img_path', 'label'])
pre_img_train['img_path'] = pre_img_list
pre_img_train['label'] = pre_img_train['img_path'].apply(lambda x: os.path.basename(os.path.dirname(x)))

In [None]:
pre_img_train

## Label-Encoding

In [None]:
# Fit the encoder on the training labels, then apply the same mapping to the
# validation labels so both splits share one label-to-integer mapping.
le = preprocessing.LabelEncoder()
pre_img_train['label'] = le.fit_transform(pre_img_train['label'])
# NOTE(review): this overwrites val['label'] in place, so re-running the cell
# passes already-encoded integers to le.transform — presumably that fails as
# unseen labels; re-create `val` before re-running.
val['label'] = le.transform(val['label'])

In [None]:
pre_img_train

## CustomDataset

In [None]:
class CustomDataset(Dataset):
    """Dataset that reads images from disk with OpenCV, optionally paired with labels.

    Args:
        img_path_list: Indexable sequence of image file paths.
        label_list: Indexable sequence of labels aligned with
            ``img_path_list``, or ``None`` for unlabeled (inference) data.
        transforms: Optional callable invoked as ``transforms(image=image)``
            whose result is indexed with ``'image'``.
    """

    def __init__(self, img_path_list, label_list, transforms=None):
        self.img_path_list = img_path_list
        self.label_list = label_list
        self.transforms = transforms

    def __getitem__(self, index):
        """Return ``(image, label)``, or only ``image`` when no labels were given.

        Raises:
            FileNotFoundError: If OpenCV cannot read the file at the given index.
        """
        img_path = self.img_path_list[index]

        image = cv2.imread(img_path)
        if image is None:
            # cv2.imread returns None instead of raising; fail here with the
            # offending path rather than deep inside the transform pipeline.
            raise FileNotFoundError(f"Could not read image: {img_path}")

        # cv2.imread yields BGR channel order, while the Normalize step in this
        # notebook uses the ImageNet mean/std given in RGB order.
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)

        if self.transforms is not None:
            image = self.transforms(image=image)['image']

        if self.label_list is not None:
            label = self.label_list[index]
            return image, label
        else:
            return image

    def __len__(self):
        """Return the number of image paths."""
        return len(self.img_path_list)

In [None]:
train_transform = A.Compose([ A.Resize(CFG['IMG_SIZE'],CFG['IMG_SIZE']),
                            A.HorizontalFlip(p=0.5),
                            A.Normalize(mean=(0.485, 0.456, 0.406), std=(0.229, 0.224, 0.225), max_pixel_value=255.0, always_apply=False, p=1.0),
                            ToTensorV2()
                            ])

test_transform = A.Compose([ A.Resize(CFG['IMG_SIZE'],CFG['IMG_SIZE']),
                            A.Normalize(mean=(0.485, 0.456, 0.406), std=(0.229, 0.224, 0.225), max_pixel_value=255.0, always_apply=False, p=1.0),
                            ToTensorV2()
                            ])


In [None]:
pre_train_dataset = CustomDataset(pre_img_train['img_path'].values, pre_img_train['label'].values, train_transform)
# Shuffle the training data every epoch: the paths come from a glob over
# per-class folders, so unshuffled batches would be grouped by class.
train_loader = DataLoader(pre_train_dataset, batch_size = CFG['BATCH_SIZE'], shuffle=True, num_workers=0)

# Validation order does not affect the metrics, so it stays unshuffled.
val_dataset = CustomDataset(val['img_path'].values, val['label'].values, test_transform)
val_loader = DataLoader(val_dataset, batch_size=CFG['BATCH_SIZE'], shuffle=False, num_workers=0)

### ImageDataGenerator를 이용한 이미지 전처리

In [None]:
import os

def get_subfolder_names(path):
    """Return the names of the immediate sub-directories of ``path``."""
    subfolder_names = []
    for entry in os.listdir(path):
        if os.path.isdir(os.path.join(path, entry)):
            subfolder_names.append(entry)
    return subfolder_names

In [None]:
get_subfolder_names('/content/drive/MyDrive/Dacon/train')

In [None]:
augmentation_factor = 5
batch_size = 32

In [None]:
import os
from keras.preprocessing.image import ImageDataGenerator

# Random augmentation applied to every image drawn from the generator.
datagen = ImageDataGenerator(rescale=1./255,
                             rotation_range=15,
                             width_shift_range=0.1,
                             height_shift_range=0.1,
                             shear_range=0.5,
                             zoom_range=[0.8, 2.0],
                             horizontal_flip=True,
                             vertical_flip=True,
                             fill_mode='nearest')

train_dir = '/content/drive/MyDrive/Dacon/train'
# Skip the '*_augmented' output folders written by an earlier run of this cell,
# otherwise a re-run would augment the augmented images again.
# NOTE(review): the output folders live inside train_dir, so any later glob
# over train_dir will also pick them up — confirm that is intended.
folders = [name for name in os.listdir(train_dir) if not name.endswith('_augmented')]

for folder in folders:
    folder_path = os.path.join(train_dir, folder)
    if not os.path.isdir(folder_path):
        continue
    print(f"Data augmentation for {folder}")

    save_dir = os.path.join(train_dir, folder + '_augmented')
    os.makedirs(save_dir, exist_ok=True)

    # flow_from_directory expects class sub-folders inside the directory it is
    # given, so point it at train_dir and restrict it to this single class.
    train_gen = datagen.flow_from_directory(train_dir,
                                            classes=[folder],
                                            class_mode=None,
                                            batch_size=batch_size,
                                            save_to_dir=save_dir,
                                            save_prefix=folder + '_',
                                            target_size=(224, 224))

    total_images = train_gen.samples
    if total_images == 0:
        continue

    # Round up so that small classes still get augmented images.
    steps_per_epoch = -(-(total_images * augmentation_factor) // batch_size)

    # Drawing a batch is what writes the augmented images to save_dir; no model
    # is involved (the original called model.fit on an undefined `model`).
    for _ in range(steps_per_epoch):
        next(train_gen)

In [None]:
from tensorflow.keras.preprocessing.image import ImageDataGenerator
import matplotlib.pyplot as plt

In [None]:
from keras.preprocessing.image import ImageDataGenerator

train_datagen = ImageDataGenerator(rescale = 1./255)

test_datagen = ImageDataGenerator(rescale = 1./255)

training_set = train_datagen.flow_from_directory(train_dir,
                                                 target_size = (224, 224),
                                                 batch_size = 12,
                                                 class_mode = 'binary')

val_set = test_datagen.flow_from_directory(val_dir,
                                            target_size = (224, 224),
                                            batch_size = 12,
                                            class_mode = 'binary')

test_set = test_datagen.flow_from_directory(test_dir,
                                            target_size = (224, 224),
                                            batch_size = 12,
                                            class_mode = 'binary')


- 데이터 셋의 개수 증강

In [None]:
# 데이터셋 불러오기
train_gen = ImageDataGenerator(rescale=1./255,
                               rotation_range=15,
                               width_shift_range=0.1,
                               height_shift_range=0.1,
                               shear_range=0.5,
                               zoom_range=[0.8, 2.0],
                               horizontal_flip=True,
                               vertical_flip=True,
                               fill_mode='nearest')


test_gen = ImageDataGenerator(rescale = 1/255)

In [None]:
file = ['defect_free', 'stain']

In [None]:
from tensorflow.keras.preprocessing import image

In [None]:
augment_number = 10  # number of augmented versions for each image

for i in file:
    # Source folder for this class (set this to your own path).
    path='/content/drive/MyDrive/AI부트캠프/Section4/project/output/train/'+i+'/'
    # Destination folder for the augmented images (set this to your own path).
    # NOTE(review): this is the same folder as `path`, so re-running the cell
    # will also augment previously generated images — confirm that is intended.
    save_dir = '/content/drive/MyDrive/AI부트캠프/Section4/project/output/train/'+i+'/'

    # Snapshot the listing before any augmented file is written into the folder.
    image_list = os.listdir(path)

    # This loop must be nested inside the class loop; at top level only the
    # last class in `file` was ever augmented.
    for f in image_list:
        image_filename = path+f
        img = image.load_img(image_filename,target_size=(224,224))
        img_arr = image.img_to_array(img)
        img_arr = img_arr.reshape((1,) + img_arr.shape)

        flow = train_gen.flow(img_arr,              # input image
                              save_to_dir=save_dir,  # augmented images are saved here
                              save_prefix=f,         # output names start with the source file name
                              save_format='jpg')
        # Draw exactly augment_number batches. zip(flow, range(n)) pulled one
        # extra batch (saving n+1 images), and its loop variable `val`
        # overwrote the validation DataFrame of the same name.
        for _ in range(augment_number):
            next(flow)


In [None]:
train_generator = train_gen.flow_from_directory(train_dir,
                                                target_size = (224, 224),
                                                batch_size = 20,
                                                class_mode = 'binary',
                                                shuffle = True)

val_generator = test_gen.flow_from_directory(val_dir,
                                            target_size = (224, 224),
                                            batch_size = 8,
                                            class_mode = 'binary',
                                            shuffle = True)

test_generator = test_gen.flow_from_directory(test_dir,
                                            target_size = (224, 224),
                                            batch_size = 8,
                                            class_mode = 'binary',
                                            shuffle = True)


In [None]:
training_set.class_indices

In [None]:
#step수로 가중치를 얼만큼 업데이트 할 것인지
len(train_generator), len(val_generator)

In [None]:
batch = train_generator.next()
type(batch[0]), batch[0].shape

## Model Define

In [None]:
class BaseModel(nn.Module):
    """EfficientNet-B0 backbone followed by a linear head over its 1000 outputs.

    Args:
        num_classes: Number of output classes; the default is the number of
            classes in the fitted label encoder, evaluated when this class is
            defined.
    """

    def __init__(self, num_classes=len(le.classes_)):
        super().__init__()
        self.backbone = models.efficientnet_b0(pretrained=True)
        # Maps the backbone's 1000-dimensional output to per-class logits.
        self.classifier = nn.Linear(1000, num_classes)

    def forward(self, x):
        """Return class logits for the input batch ``x``."""
        features = self.backbone(x)
        return self.classifier(features)

---

In [None]:
!pip install effdet timm

In [None]:
from effdet import create_model
import timm
from effdet import DetBenchTrain, EfficientDet, get_efficientdet_config

class EfficientDetModel(nn.Module):
    """Wrapper that builds an effdet EfficientDet network plus a DetBenchTrain layer.

    Args:
        num_classes: Number of classes written into the EfficientDet config.
    """

    def __init__(self, num_classes):
        super(EfficientDetModel, self).__init__()

        # Fetch the EfficientDet model configuration and override the class
        # count and input image size.
        # NOTE(review): 'tf_efficientnet_b0' looks like a backbone name rather
        # than an EfficientDet config name (e.g. 'tf_efficientdet_d0') —
        # confirm get_efficientdet_config accepts it.
        backbone_name = 'tf_efficientnet_b0'
        config = get_efficientdet_config(backbone_name)
        config.num_classes = num_classes
        config.image_size = (512, 512)
        self.backbone = EfficientDet(config)

        # Add the DetBenchTrain layer for object detection.
        # NOTE(review): effdet's DetBenchTrain appears to take the model itself
        # (plus create_labeler) rather than these keyword arguments — confirm
        # against the installed effdet version.
        self.detection_layer = DetBenchTrain(
            num_classes=config.num_classes,
            num_anchors=config.num_anchors,
            backbone=self.backbone.backbone,
            bench_task='train')

    def forward(self, x):
        # Pass the input through the EfficientDet network, then the detection layer.
        x = self.backbone(x)
        x = self.detection_layer(x)
        return x


## Train

In [None]:
def train(model, optimizer, train_loader, val_loader, scheduler, device):
    """Train ``model`` for ``CFG['EPOCHS']`` epochs and return it with its best weights.

    After each epoch the model is evaluated with ``validation``; the weights of
    the epoch with the highest validation weighted-F1 score are snapshotted and
    restored before returning.

    Args:
        model: The network to train; moved to ``device`` and updated in place.
        optimizer: Optimizer over ``model``'s parameters.
        train_loader: Iterable yielding ``(imgs, labels)`` training batches.
        val_loader: Iterable yielding ``(imgs, labels)`` validation batches.
        scheduler: Optional LR scheduler, stepped with the validation score
            after each epoch; may be ``None``.
        device: Device on which to run training.

    Returns:
        ``model`` loaded with the best-scoring epoch's weights, or ``None`` if
        no epoch achieved a validation score above 0.
    """
    import copy  # local import so this cell does not depend on another cell's imports

    model.to(device)
    criterion = nn.CrossEntropyLoss().to(device)

    best_score = 0
    best_state = None

    for epoch in range(1, CFG['EPOCHS']+1):
        model.train()
        train_loss = []
        for imgs, labels in tqdm(iter(train_loader)):
            imgs = imgs.float().to(device)
            # CrossEntropyLoss needs integer (long) class indices.
            labels = labels.type(torch.LongTensor).to(device)

            optimizer.zero_grad()

            output = model(imgs)
            loss = criterion(output, labels)

            loss.backward()
            optimizer.step()

            train_loss.append(loss.item())

        _val_loss, _val_score = validation(model, criterion, val_loader, device)
        _train_loss = np.mean(train_loss)
        print(f'Epoch [{epoch}], Train Loss : [{_train_loss:.5f}] Val Loss : [{_val_loss:.5f}] Val Weighted F1 Score : [{_val_score:.5f}]')

        if scheduler is not None:
            scheduler.step(_val_score)

        if best_score < _val_score:
            best_score = _val_score
            # Snapshot the weights. Storing `model` itself only keeps a
            # reference, which later epochs keep overwriting, so the "best"
            # model would always be the last-epoch model.
            best_state = copy.deepcopy(model.state_dict())

    if best_state is None:
        return None

    model.load_state_dict(best_state)
    return model

In [None]:
def validation(model, criterion, val_loader, device):
    """Evaluate ``model`` on ``val_loader``.

    Args:
        model: Network to evaluate; switched to eval mode.
        criterion: Loss function called as ``criterion(logits, labels)``.
        val_loader: Iterable yielding ``(imgs, labels)`` batches.
        device: Device on which to run evaluation.

    Returns:
        Tuple ``(mean batch loss, weighted F1 score)``.
    """
    model.eval()
    batch_losses = []
    predicted = []
    targets = []

    with torch.no_grad():
        for batch_imgs, batch_labels in tqdm(iter(val_loader)):
            batch_imgs = batch_imgs.float().to(device)
            batch_labels = batch_labels.type(torch.LongTensor).to(device)

            logits = model(batch_imgs)
            batch_loss = criterion(logits, batch_labels)

            # Accumulate predictions and ground truth as plain Python lists.
            predicted.extend(logits.argmax(1).detach().cpu().numpy().tolist())
            targets.extend(batch_labels.detach().cpu().numpy().tolist())
            batch_losses.append(batch_loss.item())

        mean_loss = np.mean(batch_losses)
        weighted_f1 = f1_score(targets, predicted, average='weighted')

    return mean_loss, weighted_f1

In [None]:
import torch

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

In [None]:
# Build the classifier together with its optimizer and LR scheduler, then train.
model = BaseModel()
# This eval() call does not persist: train() switches the model back to train
# mode at the start of every epoch.
model.eval()
optimizer = torch.optim.Adam(params = model.parameters(), lr = CFG["LEARNING_RATE"])
# mode='max' because train() steps the scheduler with the validation F1 score,
# where higher is better.
scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='max', factor=0.5, patience=2, threshold_mode='abs', min_lr=1e-8, verbose=True)

infer_model = train(model, optimizer, train_loader, val_loader, scheduler, device)

## Inference

In [None]:
sample = pd.read_csv('/content/drive/MyDrive/Dacon/test.csv')

In [None]:
# 이미지 경로 앞부분 바꾸기
sample['img_path'] = sample['img_path'].apply(lambda x: '/content/drive/MyDrive/Dacon' + x[1:])

In [None]:
# 변경된 CSV 파일 저장하기
sample.to_csv("/content/drive/MyDrive/Dacon/new_test.csv", index=False)

In [None]:
test = pd.read_csv('/content/drive/MyDrive/Dacon/new_test.csv')

In [None]:
test_dataset = CustomDataset(test['img_path'].values, None, test_transform)
test_loader = DataLoader(test_dataset, batch_size=CFG['BATCH_SIZE'], shuffle=False, num_workers=0)

In [None]:
def inference(model, test_loader, device):
    """Predict labels for every batch of ``test_loader``.

    Args:
        model: Trained network; switched to eval mode.
        test_loader: Iterable yielding image batches (no labels).
        device: Device the image batches are moved to.

    Returns:
        The predicted class indices mapped back to the original labels through
        the notebook-level label encoder ``le``.
    """
    model.eval()
    class_indices = []
    with torch.no_grad():
        for batch in tqdm(iter(test_loader)):
            logits = model(batch.float().to(device))
            class_indices.extend(logits.argmax(1).detach().cpu().numpy().tolist())

    return le.inverse_transform(class_indices)

In [None]:
preds = inference(infer_model, test_loader, device)

## Submission

In [None]:
submit = pd.read_csv('/content/drive/MyDrive/Dacon/sample_submission.csv')

In [None]:
submit['label'] = preds

In [None]:
submit

In [None]:
# submit.loc[submit['label'] == '0', 'label'] = '가구수정'
# submit.loc[submit['label'] == '1', 'label'] = '걸레받이수정'
# submit.loc[submit['label'] == '2', 'label'] = '곰팡이'
# submit.loc[submit['label'] == '3', 'label'] = '꼬임'
# submit.loc[submit['label'] == '4', 'label'] = '녹오염'
# submit.loc[submit['label'] == '5', 'label'] = '들뜸'
# submit.loc[submit['label'] == '6', 'label'] = '면불량'
# submit.loc[submit['label'] == '7', 'label'] = '몰딩수정'
# submit.loc[submit['label'] == '8', 'label'] = '반점'
# submit.loc[submit['label'] == '9', 'label'] = '석고수정'
# submit.loc[submit['label'] == '10', 'label'] = '오염'
# submit.loc[submit['label'] == '11', 'label'] = '오타공'
# submit.loc[submit['label'] == '12', 'label'] = '울음'
# submit.loc[submit['label'] == '13', 'label'] = '이음부불량'
# submit.loc[submit['label'] == '14', 'label'] = '창틀,문틀수정'
# submit.loc[submit['label'] == '15', 'label'] = '터짐'
# submit.loc[submit['label'] == '16', 'label'] = '틈새과다'
# submit.loc[submit['label'] == '17', 'label'] = '피스'
# submit.loc[submit['label'] == '18', 'label'] = '훼손'

In [None]:
submit.to_csv('/content/drive/MyDrive/Dacon/baseline_submit.csv', index=False)

In [None]:
submit['label'].value_counts(normalize=True)

In [None]:
import matplotlib.pyplot as plt
from matplotlib import font_manager, rc

In [None]:
font_path = '/content/drive/MyDrive/Dacon/NanumBarunGothic.ttf'
# Register the font file with matplotlib first; setting a family name that the
# font manager has never loaded makes matplotlib fall back to its default font.
font_manager.fontManager.addfont(font_path)
fontprop = font_manager.FontProperties(fname=font_path, size=18)
# Use the family name stored in the font file instead of a hard-coded guess.
plt.rc('font', family=fontprop.get_name())