In [None]:
!pip install tensorflow-addons

In [None]:
from google.colab import drive
drive.mount('/content/drive')

In [None]:
cd drive/MyDrive/DACON_Practice/TourismData_MultiModal

In [None]:
import tensorflow as tf
import numpy as np
import pandas as pd

import os
import random
from tqdm import tqdm

from sklearn.preprocessing import LabelEncoder
from sklearn.metrics import accuracy_score
from sklearn.model_selection import StratifiedKFold, train_test_split

import torch
import torch.nn.functional as F
from torch.utils.data import DataLoader
from transformers import AutoTokenizer, TFAutoModel, AdamWeightDecay, AutoModelForSequenceClassification, AutoConfig, AutoTokenizer, TrainingArguments, Trainer
from tensorflow.keras import Model
from tensorflow.keras.layers import Dense
from tensorflow.keras.initializers import TruncatedNormal
from tensorflow.keras.callbacks import EarlyStopping

import time
import pickle


from PIL import Image
from torchvision import transforms
import random

from sklearn.metrics import f1_score

from collections import Counter

import tensorflow_addons as tfa

import transformers
transformers.logging.set_verbosity_error()
#초기화되지 않은 weight가 있거나, 불러왔는데 사용을 하지 않는 weight가 존재할 때 발생하는 Warning 안뜨도록 함

def my_seed_everywhere(seed):
    torch.manual_seed(seed)
    torch.cuda.manual_seed(seed)
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False
    tf.random.set_seed(seed)
    random.seed(seed)
    os.environ['PYTHONHASHSEED'] = str(seed)
    np.random.seed(seed)


my_seed_everywhere(42)

In [None]:
# Data Augmentation -> 추가 학습용 데이터, 검증 데이터 생성

In [None]:
train = pd.read_csv('train.csv') #train 데이터 불러오기

In [None]:
#증강에 사용할 형용사, 부사, 유의어 모음 불러오기(Google Drive 첨부 - https://drive.google.com/drive/folders/1U6Y6Zv_PxZXsXLgLlgzu6smZ_e_b2mpS?usp=sharing)

adj_ls = pd.read_csv('adjective.csv')['adjective'].to_list() #형용사
ad_ls = pd.read_csv('adverb.csv')['adverb'].to_list()  #부사

main_token = pd.read_csv('sim_data.csv')['token'].to_list() #유의어 기준 단어 리스트
sub_token = pd.read_csv('sim_data.csv')['sim_token'].to_list() #대응 유의어 모음 리스트
sim_dic = {} #딕셔너리에 저장
for m, s in zip(main_token,sub_token) : #메인, subtoken받아 딕셔너리 형태로 만듦
    sim_dic[m] = s

In [None]:
#형용사 증강 : 문장의 명사 앞에 랜덤으로 형용사어 삽입 함수

def aug_adj(senten):
    tokens = senten.split(' ') #문장을 입력으로 받아 띄어쓰기를 기준으로 나눠 토큰 구함


    cnt = 0 #수
    token_ls = [] #명사 포함한 토큰 담을 리스트
    for i in range(len(tokens)) :
        token = tokens[i] #각 단어로부터 토큰 구하기 위함
        if token != '' : #공백이 아닌 경우
            if token[-1] == '은' or token[-1] == '는' or token[-1] == '이' or token[-1] == '가' : #주어인 경우
                cnt +=1 #개수 추가
                token_ls.append(token) #토큰 추가
            if token[-1] == '을' or token[-1] == '를' : #목적어
                cnt +=1
                token_ls.append(token)

    if cnt > 0 : #감지된 토큰 있을 경우
        for i in range(random.randint(1,cnt)) : #특정 개수만큼의 명사 토큰에 대해 형용사 붙여 증강
            token_index = random.randint(0,cnt-1)
            senten = senten.replace(token_ls[token_index],'{} {}'.format(adj_ls[random.randint(0,len(adj_ls)-1)],token_ls[token_index]))

    return senten

In [None]:
#부사 증강 : 문장의 동사 앞에 랜덤으로 부사어 삽입 함수
def aug_ad(senten):
    tokens = senten.split(' ')


    cnt = 0
    token_ls = []
    for i in range(len(tokens)) :
        token = tokens[i]
        if token != '' :
            if token[-1] == '.':
                cnt +=1
                token_ls.append(token)

    if cnt > 0 :
        for i in range(random.randint(1,cnt)) :
            token_index = random.randint(0,cnt-1)
            senten = senten.replace(token_ls[token_index],'{} {}'.format(ad_ls[random.randint(0,len(ad_ls)-1)],token_ls[token_index]))

    return senten

In [None]:
#유의어 증강 : 문장의 단어들을 유의어로 변경하는 함수
def aug_simul(senten):
    tokens = senten.split(' ')
    cnt3 = 0
    token_ls3 = []
    for i in range(len(tokens)) :
        token = tokens[i]
        if token in sim_dic.keys(): #유사어 존재할때
            token_ls3.append(token)
            cnt3 += 1

    if cnt3 > 0 :
        for i in range(random.randint(1,cnt3)) :
            token_index = random.randint(0,len(token_ls3)-1)
            to_index = tokens.index(token_ls3[token_index])
            to_val = sim_dic[tokens[to_index]].split('/')
            if len(to_val) > 1 :
                tokens[to_index] = to_val[random.randint(0,len(to_val)-1)]
            else :
                tokens[to_index] = to_val[0]

            token_ls3.pop(token_index) #증강 이뤄진 경우 다시 증강 이뤄지지 않도록


    senten = ' '.join(tokens)
    return senten

In [None]:
#이미지 증강


def img_aug(img) :
    loader_transform1 = transforms.ColorJitter(
        brightness=0.6,
        contrast=0.6,
        saturation=0.6,
        hue=0.2
        )
    loader_transform2 = transforms.RandomAffine(degrees=90,fill=0)


    imgArray = np.array(img) # 이미지 분석을 위해 배열 전환
    img_shape = imgArray.shape #기존 shape 받아와 변형
    x = int(img_shape[0] * 0.8)
    y = int(img_shape[1] * 0.8)
    x2 = int(img_shape[0] * 0.4)
    y2 = int(img_shape[1] * 0.4)

    re_img = loader_transform1(img) #ColorJitter
    re_img = loader_transform2(re_img) #RandomAffine
    re_img = transforms.RandomCrop(size=(x,y))(re_img) #RandomCrop
    re_img

    imgArray_aug = np.array(re_img) #이미지를 배열로 변환
    indx1 = random.sample(range(x2),2) #Random 노이즈 부여 위함
    indx2 = random.sample(range(y2),2)

    length = (np.max(indx1)-np.min(indx1))*(np.max(indx2)-np.min(indx2))*3 #총 노이즈 길이

    imgArray_aug[np.min(indx1):np.max(indx1), np.min(indx2):np.max(indx2), range(3)] = np.random.choice(256, length, replace=True).reshape(((np.max(indx1)-np.min(indx1)),(np.max(indx2)-np.min(indx2)),3)) #length만큼의 노이즈 추출해 reshape해서 원본 이미지 값 수정

    aug_img = Image.fromarray(imgArray_aug) #이미지 배열을 다시 이미지 객체로 변환

    return aug_img


In [None]:
try:
  os.mkdir('image/val')   #증강 이미지를 저장할 폴더 생성

except:
  pass

sam_overview = train['overview'].to_list() #텍스트후기
sam_label = train['cat3'].to_list() #'소'카테고리 정보
sam_img = train['img_path'].to_list() #이미지 경로


#증강 샘플 1개 생성
sample = sam_overview[0]
label = sam_label[0]
img = Image.open(sam_img[0])

new_sample = aug_adj(sample) #형용사 증강
new_sample = aug_ad(new_sample) #부사 증강
new_sample = aug_simul(new_sample) #이미지 증강

new_img = img_aug(img)
save_path = 'image/val/val_1.jpg'
new_img.save(save_path,'JPEG') #validation 디렉토리에 증강된 이미지 저장

val_set = pd.DataFrame({'img_path' : [save_path], 'overview' : [new_sample], 'cat3' : [label]}) #증강된 데이터를 csv파일 형태로 만들기 위함

In [None]:
#미리 만들어둔 샘플 1개와 concat하여 5000개의 증강데이터 생성
for i in range(4999) :
    idx = random.randint(0,len(sam_overview)-1)
    sample = sam_overview[idx]
    label = sam_label[idx]
    img = Image.open(sam_img[idx])


    new_sample = aug_adj(sample)
    new_sample = aug_ad(new_sample)
    new_sample = aug_simul(new_sample)

    new_img = img_aug(img)
    save_path = 'image/val/val_{}.jpg'.format(i+2)
    new_img.save(save_path,'JPEG')

    df = pd.DataFrame({'img_path' : [save_path], 'overview' : [new_sample], 'cat3' : [label]})
    val_set = pd.concat([val_set,df],axis=0)

#총 5000개의 증강 데이터가 생성됨

In [None]:
val_set.to_csv('val_set.csv')
del val_set
# 증강데이터에 대한 csv파일까지 생성 완료

In [None]:
# 텍스트 증강을 통한 샘플 불균형 완화
def text_aug(label_ls, train_set, num, adj_aug=True, ad_aug=True, sim_aug=True ):
    train2 = train_set
    cnt2 = 16985 #훈련데이터 수
    for i in label_ls : #각 레이블정보마다 돌면서 확인
        cnt = Counter(train2['cat3'])[i] #각 레이블 수 추출
        if cnt < num : #레이블 수가 특정 수보다 작으면
            for j in range(num-cnt): #필요한 만큼
                print(j) #몇 번째 데이터 증강인지 확인하기 위함
                cnt2 += 1 #훈련데이터 수 증가
                df3 = train2.loc[(train['cat3'] == i)] #해당 레이블을 갖는 데이터를 찾아 df3으로 추출해옴
                df3_ls = df3['overview'].to_list() #df3에서의 텍스트 후기 데이터
                num1 = len(df3_ls)
                idx = random.randint(0, num1-1) #특정 데이터에 대해
                sample = df3_ls[idx]
                if adj_aug == True : #각 속성별로 적용 여부 체크해 데이터 증강 적용
                    sample = aug_adj(sample)
                if ad_aug == True :
                    sample = aug_ad(sample)
                if sim_aug == True :
                    sample = aug_simul(sample)


                df4 = pd.DataFrame({'id' : ['TRAIN_{}'.format(cnt2)], 'overview' : [sample], 'cat3' : [i]})  #증강된 데이터 정보를 데이터프레임화해 기존 데이터에 병합 (데이터 불균형 해소)
                train2 = pd.concat([train2,df4],axis=0) #행방향으로 합침
    return train2 #샘플 뷸균형을 해소한 훈련데이터 반환

In [None]:
train = pd.read_csv('train.csv') #훈련데이터 로드

train_set = train[['id', 'overview','cat3']] #전체 데이터 중 id, overview, 소분류 정보만 사용
label_ls = list(train_set['cat3'].unique()) #유니크한 label값만 뽑아옴

#형용사만 사용 / 최소 50개
data1 = text_aug(label_ls,train_set,50,True,False,False) #형용사 증강만 적용한 데이터. 각 레이블 당 데이터가 최소 50개는 존재하도록 함
data1.to_csv('train(adj,50).csv', index=False)
del data1

#부사만 사용 / 최소 50개
data2 = text_aug(label_ls,train_set,50,False,True,False) #부사 증강만 적용한 데이터
data2.to_csv('train(ad,50).csv', index=False)
del data2

#유의어만 사용 / 최소 50개
data3 = text_aug(label_ls,train_set,50,False,False,True) #유의어 증강만 적용한 데이터
data3.to_csv('train(sim,50).csv', index=False)
del data3

#형용사+부사 / 최소 50개
data4 = text_aug(label_ls,train_set,50,True,True,False) #형용사, 부사 증강 적용 데이터
data4.to_csv('train(adj,ad,50).csv', index=False)
del data4

#형용사+부사+유의어 / 최소 100개
data5 = text_aug(label_ls,train_set,100,True,True,True) #모든 증강을 적용한 데이터
data5.to_csv('train(adj,ad,sim,50).csv', index=False)
del data5

del train_set
del label_ls

In [None]:
#model 학습

import tensorflow as tf
tf.config.run_functions_eagerly(False)
#함수를 그래프로 컴파일하여 실행하도록 변경
#대규모 모델 훈련과 같은 경우에는 그래프 모드(graph mode)로 실행하는 것이 더욱 효율적

class SAMModel(tf.keras.Model): #keras 모델 상속받아 정의
    def __init__(self, my_model, rho=0.05):
        """
        p, q = 2 for optimal results as suggested in the paper
        (Section 2)
        """
        super(SAMModel, self).__init__()
        self.my_model = my_model
        self.rho = rho

    def train_step(self, data):
        (text, labels) = data #데이터로부터 text와 label분류
        e_ws = [] #스케일이 ㅈ거용된 gradient값들을 담을 리스트
        with tf.GradientTape() as tape: #Gradient를 계산할 수 있는 Tape를 생성. 연산을 기록하여 후에 Gradient를 계산하는 데 사용
            predictions = self.my_model(text) #텍스트를 입력으로 해 prediction 구함
            loss = self.compiled_loss(labels, predictions) #label과 비교해 loss 계산
        trainable_params = self.my_model.trainable_variables #학습가능한 variable들 추출
        gradients = tape.gradient(loss, trainable_params) #tape 객체 통해 구한 loss를 기반으로 파라미터 갱신
        grad_norm = self._grad_norm(gradients) #Gradient의 크기 측정
        scale = self.rho / (grad_norm + 1e-10) #계산된 gradient의 크기, rho를 통해 scale 변수 구함

        for (grad, param) in zip(gradients, trainable_params): #각 gradient와 trainable_parameter를 대응시켜 스케일 적용
            e_w = grad * scale #스케일 적용
            param.assign_add(e_w) # e_w 값을 해당 파라미터에 더하고 업데이트. SAM 알고리즘에 따라 각 파라미터가 업데이트됨
            e_ws.append(e_w) #각 파라미터에 적용된 e_w 값을 리스트 e_ws에 추가

        with tf.GradientTape() as tape:
            predictions = self.my_model(text)
            loss = self.compiled_loss(labels, predictions) #loss

        sam_gradients = tape.gradient(loss, trainable_params) #sam 적용한 Gradient 계산
        for (param, e_w) in zip(trainable_params, e_ws):
            param.assign_sub(e_w) # SAM적용해 파라미터 갱신

        self.optimizer.apply_gradients(
            zip(sam_gradients, trainable_params)) #모델 파라미터 업데이트

        self.compiled_metrics.update_state(labels, predictions) #모델 평가지표 업데이트
        return {m.name: m.result() for m in self.metrics} #손실함수 이름, 값 딕셔너리 반환

    def test_step(self, data):
        (text, labels) = data #데이터 추출
        predictions = self.my_model(text, training=False)
        loss = self.compiled_loss(labels, predictions)
        self.compiled_metrics.update_state(labels, predictions)
        return {m.name: m.result() for m in self.metrics}

    def _grad_norm(self, gradients):
        norm = tf.norm(
            tf.stack([
                tf.norm(grad) for grad in gradients if grad is not None
            ])
        )
        return norm

    def call(self, inputs):
        """Forward pass of SAM.
        SAM delegates the forward pass call to the wrapped model.
        Args:
          inputs: Tensor. The model inputs.
        Returns:
          A Tensor, the outputs of the wrapped model for given `inputs`.
        """
        return self.my_model(inputs)

#텍스트모델

In [None]:
#텍스트 모델 전처리 함수
def preprocessing(train,val,test, modelname, max_seq_len, add_token=False, emphasize_token=False, token_change = False):
    encoder = LabelEncoder() #label encoder 객체 생성
    y_train = train['cat3'] #train, val set으로부터 label 추출
    y_val =  val['cat3']

    y_train = encoder.fit_transform(y_train) #train data에 대한 label encoding
    y_val = encoder.transform(y_val) #같은 값으로 encoding 위함

    y_train_data = pd.Series(y_train) #series형으로 변환
    y_val_data = pd.Series(y_val)

    model_name = modelname
    f = open('add_token.txt') #ex) 흑돼지->흑+돼지가 아닌 흑돼지로 인코딩되도록 하기 위함
    add_token_ls = f.read().split()
    emphasize_token_ls =  ['상설시장','채식주의','채식주의자','비건','비거니즘','고택','펜션','관아','팔각','주심포','건물','오일시장'] #반복을 통해 강조시켜줄 토큰
    tokenizer = AutoTokenizer.from_pretrained(model_name) #Tokenizer 객체 선언

    if add_token == True :
        for token in add_token_ls :
            tokenizer.add_tokens(token) #토크나이저를 통해 토큰화해  append

    def convert_examples_to_features(examples, labels, max_seq_len, tokenizer): #원본 input, label을 통해 bert, transformer에서 사용할 수 있도록 데이터 변환
        #token_type_ids는 어떤 문장에 속하는지 나타내기 위함
        input_ids, attention_masks, token_type_ids, data_labels = [], [], [], []


        for example, label in tqdm(zip(examples, labels), total=len(examples)):
            if token_change == True :     # 특정 라벨에 특정 단어가 들어가는 경우 그 단어를 살작 변형 -> 사용했을때와 안했을때 모두 뽑아내서 앙상블
                if label == '뮤지컬' :
                    if example.find('뮤지컬') != -1 : #텍스트에 '뮤지컬'이라는 단어가 들어가는 경우
                        example = example.replace('뮤지컬','뮤지컬(뮤지컬공연)') #단어 대체
                if label == '분수' :
                    if example.find('분수쇼') != -1 :
                        example = example.replace('분수쇼','분수(분수쇼)')
                if label == '채식전문점' :
                    if example.find('채식') != -1 :
                        example = example.replace('채식','채식(채식전문점)')
                if label == '게스트하우스' :
                    if example.find('게스트하우스') != -1 :
                        example = example.replace('게스트하우스','게스트하우스(게하)')

            #토큰 강조하기로 했을 경우
            if emphasize_token == True :     #  특정 단어를 강조하기 위해 반복
                for em in emphasize_token_ls :
                    if example.find(em) != -1 :
                        example = example.replace(em,'{} {} {}'.format(em,em,em)) #단어 반복


            input_id = tokenizer.encode(example, max_length=max_seq_len, pad_to_max_length=True)      # 토크나이저를 통해 인코딩. 입력 토큰 사이즈 지정 및 패딩 적용
            padding_count = input_id.count(tokenizer.pad_token_id) #pad_token_id는 패딩을 나타내는 토큰의 식별자(ID). 총 패딩 적용된 수 추출
            attention_mask = [1] * (max_seq_len - padding_count) + [0] * padding_count #패딩 적용 길이에 따라 attention mask생성
            token_type_id = [0] * max_seq_len #모두 한 문장으로 이뤄져있음. 하나로 통일(?)

            assert len(input_id) == max_seq_len, "Error with input length {} vs {}".format(len(input_id), max_seq_len) #input_id, attention_mask, token_type_id 길이 모두 동일한지 확인
            assert len(attention_mask) == max_seq_len, "Error with attention mask length {} vs {}".format(len(attention_mask), max_seq_len)
            assert len(token_type_id) == max_seq_len, "Error with token type length {} vs {}".format(len(token_type_id), max_seq_len)

            input_ids.append(input_id)
            attention_masks.append(attention_mask)
            token_type_ids.append(token_type_id)
            data_labels.append(label) #변환된 정보 리스트에 저장

        input_ids = np.array(input_ids, dtype=int) #리스트 정보를 array 자료형으로 변환
        attention_masks = np.array(attention_masks, dtype=int)
        token_type_ids = np.array(token_type_ids, dtype=int)

        data_labels = np.asarray(data_labels, dtype=np.int32)

        return (input_ids, attention_masks, token_type_ids), data_labels #변환된 정보들 반환

    y_test_data = [i for i in range(len(test))] #test data의 Y값 임의로 설정
    x_train, y_train = convert_examples_to_features(train['overview'], y_train_data, max_seq_len=max_seq_len, tokenizer=tokenizer) #train data의 overview text정보를 통해 feature로 변환
    x_val, y_val = convert_examples_to_features(val['overview'], y_val_data, max_seq_len=max_seq_len, tokenizer=tokenizer)
    x_test, _ = convert_examples_to_features(test['overview'], y_test_data, max_seq_len=max_seq_len, tokenizer=tokenizer)
    return x_train, y_train, x_val, y_val, x_test #변환된 train, validation, test set 정보 반환


In [None]:
try:
  os.mkdir('weights')
except:
  pass

def train_inference(modelname, optimizer='Adam', dropout=0.1, TruncatedNormal=0.02, epocs=4, batch_size=8, sam = False, name = 'pred'):

    class TFBertForSequenceClassification(tf.keras.Model):
        def __init__(self, model_name):
            super(TFBertForSequenceClassification, self).__init__()
            self.bert = TFAutoModel.from_pretrained(modelname, from_pt=True) #모델 로드
            self.dropout = tf.keras.layers.Dropout(dropout) #dropout 층
            self.classifier = tf.keras.layers.Dense(128, #output 차원은 128
                                                    kernel_initializer=tf.keras.initializers.TruncatedNormal(0.02), #정규 분포에서 생성된 랜덤한 값을 사용하여 가중치(weight)를 초기화
                                                    activation='softmax',
                                                    name='classifier')

        def call(self, inputs):
            input_ids, attention_mask, token_type_ids = inputs #input으로부터 데이터 추출
            outputs = self.bert(input_ids=input_ids, attention_mask=attention_mask, token_type_ids=token_type_ids) #bert에 입력 넣어 output 추출
            cls_token = outputs[1]
            prediction = self.classifier(cls_token) #classifier layer 통과

            return prediction


    model = TFBertForSequenceClassification(modelname) #모델 객체 생성

    if sam == True :
        model = SAMModel(model) #SAM 적용할 경우

    loss = tf.keras.losses.sparse_categorical_crossentropy #label이 정수일 때 사용하는 loss
    model.compile(optimizer=optimizer, loss=loss, metrics = ['accuracy'])
    model.fit(x_train, y_train, epochs=epocs, batch_size=batch_size, validation_data=(x_val,y_val))
    model.save_weights('weights/{}'.format(name))

    pred = model.predict(x_test, batch_size=batch_size) #테스트데이터에 대한 output 계산
    return pred

In [None]:
#Text Model

my_seed_everywhere(42)

lr = 5e-6
wd = 1e-2 * lr
optimizer = tfa.optimizers.AdamW(learning_rate=lr, weight_decay=wd)

# optimizer = tf.keras.optimizers.Nadam(learning_rate=5e-6)

train = pd.read_csv('train.csv')
val= pd.read_csv('val_set.csv')
test = pd.read_csv('test.csv') #데이터 로드

# 원본데이터, 사용자사전&강조&반복X
x_train, y_train, x_val, y_val, x_test = preprocessing(train,val,test,'klue/roberta-large',200) #klue/roberta-large모델 사용, max seq lenght 200. 데이터 전처리 적용해 변환해옴
pred1 = train_inference('klue/roberta-large',epocs=7,dropout=0.1,TruncatedNormal=0.01,optimizer=optimizer,batch_size=8,sam=True,name = 'pred1') #원본 데이터로 학습한 결과
#Graph Execution error로 인해 batchsize 조정

with open('final_result/pred1.pickle', 'wb') as f:
    pickle.dump(pred1, f, pickle.HIGHEST_PROTOCOL)

del pred1

tf.keras.backend.clear_session() #세션 초기화


# 원본데이터, 사용자사전&강조&반복O
x_train, y_train, x_val, y_val, x_test = preprocessing(train,val,test,'klue/roberta-large',200,add_token=True,emphasize_token=True,token_change=True)
pred2 = train_inference('klue/roberta-large',epocs=8,dropout=0.1,TruncatedNormal=0.02,optimizer=optimizer,batch_size=8, sam=True,name = 'pred2')
with open('final_result/pred2.pickle', 'wb') as f:
    pickle.dump(pred2, f, pickle.HIGHEST_PROTOCOL)
del pred2

tf.keras.backend.clear_session()

In [None]:
train = pd.read_csv('train(adj,50).csv')   #형용사만 사용 증강
my_seed_everywhere(42)


# 형용사 이용 증강, 사용자사전&강조&반복X
x_train, y_train, x_val, y_val, x_test = preprocessing(train,val,test,'klue/roberta-large',200) #전처리된 데이터를  받아옴
pred3 = train_inference('klue/roberta-large',epocs=8,dropout=0.1,TruncatedNormal=0.01,optimizer=optimizer,batch_size=8,sam=False,name = 'pred3')
with open('final_result/pred3.pickle', 'wb') as f:
    pickle.dump(pred3, f, pickle.HIGHEST_PROTOCOL)
del pred3
tf.keras.backend.clear_session()


# 형용사 이용 증강, 사용자사전&강조&반복O
x_train, y_train, x_val, y_val, x_test = preprocessing(train,val,test,'klue/roberta-large',200,add_token=True,emphasize_token=True,token_change=True)
pred4 = train_inference('klue/roberta-large',epocs=8,dropout=0.1,TruncatedNormal=0.02,optimizer=optimizer,batch_size=8, sam=True,name = 'pred4')
with open('final_result/pred4.pickle', 'wb') as f:
    pickle.dump(pred4, f, pickle.HIGHEST_PROTOCOL)
del pred4
tf.keras.backend.clear_session()

In [None]:
train = pd.read_csv('train(ad,50).csv')   #부사만 사용 증강

# 부사만 이용 증강, 사용자사전&강조&반복X
x_train, y_train, x_val, y_val, x_test = preprocessing(train,val,test,'klue/roberta-large',200)
pred5 = train_inference('klue/roberta-large',epocs=6,dropout=0.1,TruncatedNormal=0.01,optimizer=optimizer,batch_size=8,sam=False,name = 'pred5')
with open('final_result/pred5.pickle', 'wb') as f:
    pickle.dump(pred5, f, pickle.HIGHEST_PROTOCOL)
del pred5
tf.keras.backend.clear_session()


# 부사만 이용 증강, 사용자사전&강조&반복O
x_train, y_train, x_val, y_val, x_test = preprocessing(train,val,test,'klue/roberta-large',200,add_token=True,emphasize_token=True,token_change=True)
pred6 = train_inference('klue/roberta-large',epocs=6,dropout=0.1,TruncatedNormal=0.02,optimizer=optimizer,batch_size=8, sam=True,name = 'pred'6)
with open('final_result/pred6.pickle', 'wb') as f:
    pickle.dump(pred6, f, pickle.HIGHEST_PROTOCOL)
del pred6
tf.keras.backend.clear_session()

In [None]:
train = pd.read_csv('train(adj,ad,50).csv')   #형용사&부사 사용 증강

my_seed_everywhere(42)
# 형용사&부사 이용 증강, 사용자사전&강조&반복X
x_train, y_train, x_val, y_val, x_test = preprocessing(train,val,test,'klue/roberta-large',200)
pred7 = train_inference('klue/roberta-large',epocs=6,dropout=0.1,TruncatedNormal=0.01,optimizer=optimizer,batch_size=8,sam=True,name = 'pred7')
with open('final_result/pred7.pickle', 'wb') as f:
    pickle.dump(pred7, f, pickle.HIGHEST_PROTOCOL)
del pred7
tf.keras.backend.clear_session()


# 형용사&부사 이용 증강, 사용자사전&강조&반복O
x_train, y_train, x_val, y_val, x_test = preprocessing(train,val,test,'klue/roberta-large',200,add_token=True,emphasize_token=False,token_change=True)
pred8 = train_inference('klue/roberta-large',epocs=6,dropout=0.1,TruncatedNormal=0.02,optimizer=optimizer,batch_size=8, sam=True,name = 'pred8')
with open('final_result/pred8.pickle', 'wb') as f:
    pickle.dump(pred8, f, pickle.HIGHEST_PROTOCOL)
del pred8
tf.keras.backend.clear_session()

In [None]:
train = pd.read_csv('train(adj,ad,sim,100).csv')   #형용사&부사&유의어 사용 증강
my_seed_everywhere(42)

# 형용사&부사 이용 증강, 사용자사전&강조&반복X
x_train, y_train, x_val, y_val, x_test = preprocessing(train,val,test,'klue/roberta-large',200)
pred9 = train_inference('klue/roberta-large',epocs=6,dropout=0.1,TruncatedNormal=0.01,optimizer=optimizer,batch_size=8,sam=True,name = 'pred9')
with open('final_result/pred9.pickle', 'wb') as f:
    pickle.dump(pred9, f, pickle.HIGHEST_PROTOCOL)
del pred9
tf.keras.backend.clear_session()


# 형용사&부사 이용 증강, 사용자사전&강조&반복O
x_train, y_train, x_val, y_val, x_test = preprocessing(train,val,test,'klue/roberta-large',200,add_token=True,emphasize_token=True,token_change=True)
pred10 = train_inference('klue/roberta-large',epocs=6,dropout=0.1,TruncatedNormal=0.02,optimizer=optimizer,batch_size=8, sam=True,name = 'pred10')
with open('final_result/pred10.pickle', 'wb') as f:
    pickle.dump(pred10, f, pickle.HIGHEST_PROTOCOL)
del pred10
tf.keras.backend.clear_session()

In [None]:
encoder = LabelEncoder()
y_train = train['cat3']


y_train = encoder.fit_transform(y_train)

In [None]:
# Ensemble
def mode(list): #최빈값 구하기 위한 함수
    count = 0
    mode = 0
    for x in list:
        if list.count(x) > count:
            count = list.count(x)
            mode = x

    return mode



pred_ls = []
for i in range(len(pred1)):
  index = mode([ #voting
      pred2[i].argmax(),
      pred1[i].argmax(),
      pred3[i].argmax(),
      pred4[i].argmax(),
      pred5[i].argmax(),
      pred6[i].argmax(),
      pred7[i].argmax(),
      pred8[i].argmax(),
      pred9[i].argmax(),
      pred10[i].argmax(),
      ])
  pred_ls.append(encoder.classes_[index])
y_pre= encoder.transform(pred_ls) #예측 결과에 대한 변환


In [None]:
# Image Model

train = pd.read_csv('train.csv')
test = pd.read_csv('test.csv')

In [None]:
# resize, Padding

import cv2


def padding(img, set_size):

    try:
        h,w,c = img.shape
    except:
        print('파일을 확인후 다시 시작하세요.')
        raise

    if h < w: #가로가 더 긴 경우
        new_width = set_size
        new_height = int(new_width * (h/w)) #비율 유지 위함
    else:
        new_height = set_size
        new_width = int(new_height * (w/h))

    if max(h, w) < set_size: #지정한 사이즈보다 작을 때
        img = cv2.resize(img, (new_width, new_height), cv2.INTER_CUBIC)
    else: #지정 사이즈보다 클때
        img = cv2.resize(img, (new_width, new_height), cv2.INTER_AREA)


    try:
        h,w,c = img.shape
    except:
        print('파일을 확인후 다시 시작하세요.')
        raise

    delta_w = set_size - w
    delta_h = set_size - h
    top, bottom = delta_h//2, delta_h-(delta_h//2)
    left, right = delta_w//2, delta_w-(delta_w//2)

    new_img = cv2.copyMakeBorder(img, top, bottom, left, right, cv2.BORDER_CONSTANT, value=[0, 0, 0]) #부족한 픽셀만큼 제로 패딩 적용

    return new_img

In [None]:
#생성한 함수를 활용하여 학습데이터 생성

IMAGE_SIZE = [224, 224]

img_ls = train['img_path'].to_list()
Timg_ls = test['img_path'].to_list()


x_train = []

for i in tqdm(range(len(img_ls))) :
  img = cv2.imread(img_ls[i], cv2.IMREAD_COLOR)
  img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
  pad_img = padding(img,IMAGE_SIZE[0]) #패딩 및 resize 적용
  x_train.append(pad_img) #처리된 이미지 추가

x_train = np.array(x_train) #numpy 배열 형태로 변환




x_test = []

for i in tqdm(range(len(Timg_ls))) :
  img = cv2.imread(Timg_ls[i], cv2.IMREAD_COLOR)
  img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
  pad_img = padding(img,IMAGE_SIZE[0])
  x_test.append(pad_img)

x_test = np.array(x_test)


In [None]:
pretrained_model = tf.keras.applications.RegNetY120(weights='imagenet', include_top=False ,input_shape=[*IMAGE_SIZE, 3]) #top layer 제거해 모델 객체 생성
# pretrained_model = efn.EfficientNetB7(weights='noisy-student', include_top=False ,input_shape=[*IMAGE_SIZE, 3])
# pretrained_model = tf.keras.applications.DenseNet201(weights='imagenet', include_top=False ,input_shape=[*IMAGE_SIZE, 3])
#pretrained_model = tf.keras.applications.Xception(weights='imagenet', include_top=False ,input_shape=[*IMAGE_SIZE, 3])
#pretrained_model = tf.keras.applications.inception_v3.InceptionV3(weights='imagenet', include_top=False ,input_shape=[*IMAGE_SIZE, 3])
#pretrained_model = tf.keras.applications.VGG16(weights='imagenet', include_top=False ,input_shape=[*IMAGE_SIZE, 3])
# pretrained_model = tf.keras.applications.ResNet50(weights='imagenet', include_top=False ,input_shape=[*IMAGE_SIZE, 3])
# pretrained_model = tf.keras.applications.mobilenet.MobileNet(weights='imagenet', include_top=False ,input_shape=[*IMAGE_SIZE, 3])
pretrained_model.trainable = False # False = transfer learning, True = fine-tuning


len(pretrained_model.layers)

In [None]:
# 이미지 모델 학습 함수 생성

def train_inference(pretrained_model,open=-50,dim=64,dropout=0.2,optimizer='Adam',epocs=100,batch_size=8,sam=False):
    for layer in pretrained_model.layers[:open]:      #전이학습 개방정도. open값 절대값 높을수록 더 많은 레이어를 학습시키게 됨
        layer.trainable = False



    model = tf.keras.Sequential([
        pretrained_model, #특성맵 추출
        tf.keras.layers.GlobalAveragePooling2D(),#GAP layer
        tf.keras.layers.Dense(dim,activation='relu'), #dim만큼의 middle output 채널 수 가짐
        tf.keras.layers.Dropout(dropout), #드롭아웃 적용
        tf.keras.layers.Dense(128, activation='softmax') #128만큼의 output dim 가짐
    ])

    if sam == True :
        model = SAMModel(model)
    model.compile(
    optimizer=optimizer,
    loss = 'sparse_categorical_crossentropy',
    metrics=['accuracy']
    )
    early = tf.keras.callbacks.EarlyStopping(monitor='val_accuracy', patience=3,restore_best_weights=True) #early stopping 적용 위한 객체 선언
    model.fit(x_train, y_train, epochs=epocs, batch_size=batch_size, validation_split=0.1,callbacks=[early]) #모델 학습

    pred = model.predict(x_test, batch_size=batch_size) #예측
    return pred #예측결과 반환

In [None]:
optimizer = tf.keras.optimizers.Nadam(learning_rate=5e-6) #Nadam optimizer
pretrained_model = tf.keras.applications.RegNetY120(weights='imagenet', include_top=False ,input_shape=[*IMAGE_SIZE, 3])
pretrained_model.trainable = False #전이학습
pred11 = train_inference(pretrained_model,open=-1,dim=64,dropout=0.2,optimizer=optimizer,epocs=500,batch_size=8,sam=True) #극소수 레이어만 학습가능하도록 함
tf.keras.backend.clear_session()


pretrained_model = tf.keras.applications.RegNetY120(weights='imagenet', include_top=False ,input_shape=[*IMAGE_SIZE, 3])
pretrained_model.trainable = True
pred12 = train_inference(pretrained_model,open=-50,dim=128,dropout=0.3,optimizer=optimizer,epocs=500,batch_size=8,sam=False)
tf.keras.backend.clear_session()


pretrained_model = tf.keras.applications.RegNetY120(weights='imagenet', include_top=False ,input_shape=[*IMAGE_SIZE, 3])
pretrained_model.trainable = True
pred13 = train_inference(pretrained_model,open=-100,dim=128,dropout=0.3,optimizer=optimizer,epocs=500,batch_size=8,sam=False)
tf.keras.backend.clear_session()

In [None]:
def mode(list):
    count = 0
    mode = 0
    for x in list:
        if list.count(x) > count:
            count = list.count(x)
            mode = x

    return mode



pred_ls = []
for i in range(len(pred1)):
  index = mode([
      pred2[i].argmax(),
      pred1[i].argmax(),
      pred3[i].argmax(),
      pred4[i].argmax(),
      pred5[i].argmax(),
      pred6[i].argmax(),
      pred7[i].argmax(),
      pred8[i].argmax(),
      pred9[i].argmax(),
      pred10[i].argmax(),
      pred11[i].argmax(),
      pred12[i].argmax(),
      pred13[i].argmax(),
      ])
  pred_ls.append(encoder.classes_[index])
y_pre2= encoder.transform(pred_ls)

# 텍스트 + 이미지 앙상블 결과 : y_pre2

1. 이렇게 텍스트만 사용한 y_pre
2. 텍스트 이미지 머신러닝을 사용한 y_pre2


In [None]:
# 1. y_pre를 소분류로 변형하여 csv로 만들어 저장합니다.

train = pd.read_csv('train.csv')
test = pd.read_csv('test.csv')
y_train = train_set['cat3']
y_train = encoder.fit_transform(y_train)


cat3 = encoder.inverse_transform(y_pre)
df_cat3 = pd.DataFrame({'cat3':cat3})
test = test['id']
sample_submission = pd.concat([test,df_cat3],axis=1)
sample_submission.to_csv('result1.csv',index=False, encoding='utf-8')

In [None]:
# 2. y_pre2를 소분류로 변형하여 csv로 만들어 저장합니다.

cat3 = encoder.inverse_transform(y_pre2)
df_cat3 = pd.DataFrame({'cat3':cat3})
sample_submission = pd.concat([test,df_cat3],axis=1)
sample_submission.to_csv('result2.csv',index=False, encoding='utf-8')