<a href="https://colab.research.google.com/github/qkrwoghd04/ImageCaptionnng_Using_ViT/blob/master/code/Image%26Text_fusion_using_BERT.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Text classification model based on BERT

In [1]:
# Install BERT for tf2 module
!pip install bert-for-tf2
# Install sentencepiece library for text cleaning
!pip install sentencepiece

Collecting bert-for-tf2
  Downloading bert-for-tf2-0.14.9.tar.gz (41 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m41.2/41.2 kB[0m [31m729.3 kB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting py-params>=0.9.6 (from bert-for-tf2)
  Downloading py-params-0.10.2.tar.gz (7.4 kB)
  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting params-flow>=0.8.0 (from bert-for-tf2)
  Downloading params-flow-0.8.2.tar.gz (22 kB)
  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: bert-for-tf2, params-flow, py-params
  Building wheel for bert-for-tf2 (setup.py) ... [?25l[?25hdone
  Created wheel for bert-for-tf2: filename=bert_for_tf2-0.14.9-py3-none-any.whl size=30510 sha256=8dcf1b1c7fa9994377b40176898a6927a8e9b98a2325707e6f14e05e1d27c802
  Stored in directory: /root/.cache/pip/wheels/d8/da/50/126d7b8416d9a0e6bf876935c2219a71e72a6529c25e150c56
  Building wheel for params-flow (s

In [2]:
try:
    %tensorflow_version 2.x
except Exception:
    pass

import tensorflow as tf
import tensorflow_hub as hub
from keras import layers
from keras import callbacks
from keras import optimizers
from keras import utils
from sklearn.preprocessing import LabelEncoder
from sklearn.utils import shuffle
import bert
import os
import numpy as np
import re
import pandas as pd
from sklearn.metrics import confusion_matrix
import seaborn as sns
import matplotlib.pyplot as plt
import plotly.express as px
import plotly.graph_objects as go

Colab only includes TensorFlow 2.x; %tensorflow_version has no effect.


In [3]:
# 파일 경로 정의
train_csv_path = "/content/drive/MyDrive/image_dataset/processed/train_captions2.csv"
test_csv_path = "/content/drive/MyDrive/image_dataset/processed/test_captions2.csv"

# Import the training and test .csv files
colnames = ['image_path', 'caption', 'label']
train = pd.read_csv(train_csv_path, names=colnames, header=0)
test = pd.read_csv(test_csv_path, names=colnames, header=0)

# Sort values by 'image_path'
test = test.sort_values('image_path')
train = train.sort_values('image_path')

In [4]:
train.head(3)

Unnamed: 0,image_path,caption,label
0,Fall_001.jpg,a man lying on the ground in a hallway.,Fall
1,Fall_002.jpg,a person lying on the ground in front of a wi...,Fall
2,Fall_003.jpg,a man in a purple shirt lying on the floor.,Fall


In [6]:
# Check the shapes
print("train samples:",train.shape[0])
print("test samples:",test.shape[0])

train samples: 400
test samples: 40


In [7]:
# Cleaning text function

def preprocess_text(sen):
    # Removing html tags
    sentence = remove_tags(sen)

    # Remove punctuations and numbers
    sentence = re.sub('[^a-zA-Z]', ' ', sentence)

    # Single character removal
    sentence = re.sub(r"\s+[a-zA-Z]\s+", ' ', sentence)

    # Removing multiple spaces
    sentence = re.sub(r'\s+', ' ', sentence)

    sentence = sentence.lower()

    return sentence

def remove_tags(text):
    return TAG_RE.sub('', text)

TAG_RE = re.compile(r'<[^>]+>')
vec_preprocess_text = np.vectorize(preprocess_text)

In [8]:
# Check number of classes
nClasses = train.label.nunique()

In [9]:
from sklearn.preprocessing import LabelEncoder
from tensorflow.keras.utils import to_categorical

# 레이블 인코더를 초기화합니다.
encoder = LabelEncoder()

# 텍스트 전처리 함수 'vec_preprocess_text'를 사용하여 캡션을 전처리합니다.
processed_train = vec_preprocess_text(train.caption.values)
processed_test = vec_preprocess_text(test.caption.values)

# 'train.label.values'를 사용하여 트레인 세트의 레이블을 인코딩하고,
# 'test.label.values'를 사용하여 테스트 세트의 레이블을 인코딩합니다.
encoded_labels_train = encoder.fit_transform(train.label.values)
encoded_labels_test = encoder.transform(test.label.values) # fit_transform이 아닌 transform을 사용

# 레이블의 총 클래스 수를 얻습니다.
nClasses = len(encoder.classes_)

# 인코딩된 레이블을 원-핫 인코딩 형태로 변환합니다.
labels_train = to_categorical(encoded_labels_train, num_classes=nClasses)
labels_test = to_categorical(encoded_labels_test, num_classes=nClasses)

print("Processed text sample:", processed_train[0])
print("Shape of train labels:", labels_train.shape)


Processed text sample:  man lying on the ground in hallway 
Shape of train labels: (400, 2)


In [10]:
# Import the BERT BASE model from Tensorflow HUB (layer, vocab_file and tokenizer)
BertTokenizer = bert.bert_tokenization.FullTokenizer
bert_layer = hub.KerasLayer("https://tfhub.dev/tensorflow/bert_en_uncased_L-12_H-768_A-12/1",
                            trainable=False)
vocabulary_file = bert_layer.resolved_object.vocab_file.asset_path.numpy()
to_lower_case = bert_layer.resolved_object.do_lower_case.numpy()
tokenizer = BertTokenizer(vocabulary_file, to_lower_case)

In [11]:
# Preprocessing of texts according to BERT

def get_masks(text, max_length):
    """Mask for padding"""
    tokens = tokenizer.tokenize(text)
    tokens = ["[CLS]"] + tokens + ["[SEP]"]
    length = len(tokens)
    if length > max_length:
        tokens = tokens[:max_length]

    return np.asarray([1]*len(tokens) + [0] * (max_length - len(tokens)))
vec_get_masks = np.vectorize(get_masks, signature = '(),()->(n)')

def get_segments(text, max_length):
    """Segments: 0 for the first sequence, 1 for the second"""
    tokens = tokenizer.tokenize(text)
    tokens = ["[CLS]"] + tokens + ["[SEP]"]
    length = len(tokens)
    if length > max_length:
        tokens = tokens[:max_length]

    segments = []
    current_segment_id = 0
    with_tags = ["[CLS]"] + tokens + ["[SEP]"]
    token_ids = tokenizer.convert_tokens_to_ids(tokens)

    for token in tokens:
        segments.append(current_segment_id)
        if token == "[SEP]":
            current_segment_id = 1
    return np.asarray(segments + [0] * (max_length - len(tokens)))
vec_get_segments = np.vectorize(get_segments, signature = '(),()->(n)')

def get_ids(text, tokenizer, max_length):
    """Token ids from Tokenizer vocab"""
    tokens = tokenizer.tokenize(text)
    tokens = ["[CLS]"] + tokens + ["[SEP]"]
    length = len(tokens)
    if length > max_length:
        tokens = tokens[:max_length]

    token_ids = tokenizer.convert_tokens_to_ids(tokens)
    input_ids = np.asarray(token_ids + [0] * (max_length-length))
    return input_ids
vec_get_ids = np.vectorize(get_ids, signature = '(),(),()->(n)')


def prepare(text_array, tokenizer, max_length = 128):

    ids = vec_get_ids(text_array,
                      tokenizer,
                      max_length).squeeze()
    masks = vec_get_masks(text_array,
                      max_length).squeeze()
    segments = vec_get_segments(text_array,
                      max_length).squeeze()

    return ids, segments, masks

In [12]:
max_length = 40 # that must be set according to your dataset
ids_train, segments_train, masks_train = prepare(processed_train,
                                                 tokenizer,
                                                 max_length)
ids_test, segments_test, masks_test = prepare(processed_test,
                                               tokenizer,
                                               max_length)

In [13]:
input_word_ids = layers.Input(shape=(max_length,), dtype=tf.int32,
                                       name="input_word_ids")
input_mask = layers.Input(shape=(max_length,), dtype=tf.int32,
                                   name="input_masks")
segment_ids = layers.Input(shape=(max_length,), dtype=tf.int32,
                                    name="segment_ids")
den_out, seq_out = bert_layer([input_word_ids, input_mask, segment_ids])

In [55]:
# Classification Model
input_word_ids = layers.Input(shape=(max_length,), dtype=tf.int32,
                                       name="input_word_ids")
input_mask = layers.Input(shape=(max_length,), dtype=tf.int32,
                                   name="input_mask")
segment_ids = layers.Input(shape=(max_length,), dtype=tf.int32,
                                    name="segment_ids")
den_out, seq_out = bert_layer([input_word_ids, input_mask, segment_ids])

X = layers.LSTM(128)(seq_out)
X = layers.Dropout(0.5)(X)
X = layers.Dense(256, activation="relu")(X)
X = layers.Dropout(0.5)(X)
output = layers.Dense(nClasses, activation = 'softmax')(X)

BERT_MODEL = tf.keras.models.Model(inputs=[input_word_ids, input_mask, segment_ids], outputs=[output])

In [56]:
# Adam optimizer
opt = optimizers.Adam(learning_rate=.001)

# Compile model
BERT_MODEL.compile(loss = 'categorical_crossentropy',
              optimizer = opt,
              metrics = ['accuracy'])


In [57]:
BERT_MODEL.summary()

Model: "model_5"
__________________________________________________________________________________________________
 Layer (type)                Output Shape                 Param #   Connected to                  
 input_word_ids (InputLayer  [(None, 40)]                 0         []                            
 )                                                                                                
                                                                                                  
 input_mask (InputLayer)     [(None, 40)]                 0         []                            
                                                                                                  
 segment_ids (InputLayer)    [(None, 40)]                 0         []                            
                                                                                                  
 keras_layer (KerasLayer)    [(None, 768),                1094822   ['input_word_ids[0][0]',

In [58]:
es = callbacks.EarlyStopping(monitor='val_accuracy', patience=2, restore_best_weights=True)

In [59]:
# Google Drive 마운트 경로
drive_path = "/content/drive/MyDrive/image_dataset/processed"

# 새로운 로그 저장 디렉토리 경로
log_dir = os.path.join(drive_path, "BERT_LSTM")

# 디렉토리가 존재하지 않는 경우 생성
if not os.path.exists(log_dir):
    os.makedirs(log_dir)

# 체크포인트 및 로그 파일 경로 설정
checkpoint_path = os.path.join(log_dir, "weights-improvement-{epoch:02d}-{val_accuracy:.2f}.hdf5")
csv_logger_path = os.path.join(log_dir, 'BERT_LSTM.log')

# Setup callbacks, logs and early stopping condition
cp = tf.keras.callbacks.ModelCheckpoint(checkpoint_path, monitor='val_accuracy', save_best_only=True, verbose=1, mode='max')
csv_logger = tf.keras.callbacks.CSVLogger(csv_logger_path)

In [60]:
# Reduce learning rate if no improvement is observed
reduce_lr = callbacks.ReduceLROnPlateau(
    monitor='val_accuracy', factor=0.1, patience=1, min_lr=0.00001)

In [62]:
history = BERT_MODEL.fit([ids_train, masks_train, segments_train],
          labels_train,
          epochs = 100,
          batch_size = 512,
          validation_split = 0.2,
          callbacks = [csv_logger, reduce_lr , es])


Epoch 1/100
Epoch 2/100
Epoch 3/100
Epoch 4/100
Epoch 5/100
Epoch 6/100
Epoch 7/100
Epoch 8/100
Epoch 9/100
Epoch 10/100
Epoch 11/100
Epoch 12/100
Epoch 13/100
Epoch 14/100


In [63]:
# Load the log file
df = pd.read_csv(r'/content/drive/MyDrive/image_dataset/processed/BERT_LSTM/BERT_LSTM.log')

In [64]:
# Training and Test accuracy
fig = go.Figure()
fig.add_trace(go.Scatter(x=df['epoch'], y=df['accuracy'],
                    mode='lines',
                    name='training'))

fig.add_trace(go.Scatter(x=df['epoch'], y=df['val_accuracy'],
                    mode='lines',
                    name='test'))

fig.update_layout(
    font_size = 20,
    paper_bgcolor='rgba(0,0,0,0)',
    plot_bgcolor='rgba(0,0,0,0)',
)

fig.update_xaxes(showgrid=True, gridwidth=0.5, gridcolor='Gray')
fig.update_yaxes(showgrid=True, gridwidth=0.5, gridcolor='Gray')

In [65]:
# Training and Test loss
fig = go.Figure()
fig.add_trace(go.Scatter(x=df['epoch'], y=df['loss'],
                    mode='lines',
                    name='training'))

fig.add_trace(go.Scatter(x=df['epoch'], y=df['val_loss'],
                    mode='lines',
                    name='test'))

fig.update_layout(
    font_size = 20,
    paper_bgcolor='rgba(0,0,0,0)',
    plot_bgcolor='rgba(0,0,0,0)',
)

fig.update_xaxes(showgrid=True, gridwidth=0.5, gridcolor='Gray')
fig.update_yaxes(showgrid=True, gridwidth=0.5, gridcolor='Gray')

In [66]:
BERT_MODEL.evaluate([ids_test, masks_test, segments_test],
               labels_test,
               batch_size = 512)



[0.5382574796676636, 0.6499999761581421]