In [1]:
try:
    from google.colab import drive
    drive.mount('/content/drive', force_remount=True)
    COLAB = True
    print("Note: using Google CoLab")
    %tensorflow_version 2.x
except:
    print("Note: not using Google CoLab")
    COLAB = False

Mounted at /content/drive
Note: using Google CoLab


In [2]:

import os
import string
import glob
import tensorflow as tf
from tensorflow.keras.applications import MobileNetV2
import tensorflow.keras.applications.mobilenet  

from tqdm import tqdm
import tensorflow.keras.preprocessing.image
import pickle
from time import time
import numpy as np
import pandas as pd
from PIL import Image
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import (LSTM, Embedding, 
    TimeDistributed, Dense, RepeatVector, 
    Activation, Flatten, Reshape, concatenate,  
    Dropout, BatchNormalization)
from tensorflow.keras.optimizers import Adam, RMSprop
from tensorflow.keras import Input, layers
from tensorflow.keras import optimizers

from tensorflow.keras.models import Model

from tensorflow.keras.layers import add
from tensorflow.keras.preprocessing.sequence import pad_sequences
from tensorflow.keras.utils import to_categorical
import matplotlib.pyplot as plt

START = "startseq"
STOP = "endseq"
EPOCHS = 10

print(tf.__version__)

2.6.0


In [3]:
# 경과 시간 형식화

def hms_string(sec_elapsed):
    h = int(sec_elapsed / (60 * 60))
    m = int((sec_elapsed % (60 * 60)) / 60)
    s = sec_elapsed % 60
    return f"{h}:{m:>02}:{s:>05.2f}"

In [4]:
# 데이터 path 설정

if COLAB:
    root_captioning = "/content/drive/MyDrive/싸피/인공지능음성/image_caption"
else:
    root_captioning = "./data/image_caption"

In [5]:
# Flickr 파일들 목록 불러오기

data = pd.read_csv(root_captioning + '/' + 'Flicker30k_results.csv', sep='|')
data


Unnamed: 0,image_name,comment_number,comment
0,1000092795.jpg,0,Two young guys with shaggy hair look at their...
1,1000092795.jpg,1,"Two young , White males are outside near many..."
2,1000092795.jpg,2,Two men in green shirts are standing in a yard .
3,1000092795.jpg,3,A man in a blue shirt standing in a garden .
4,1000092795.jpg,4,Two friends enjoy time spent together .
...,...,...,...
158910,998845445.jpg,0,A man in shorts and a Hawaiian shirt leans ov...
158911,998845445.jpg,1,"A young man hanging over the side of a boat ,..."
158912,998845445.jpg,2,A man is leaning off of the side of a blue an...
158913,998845445.jpg,3,"A man riding a small boat in a harbor , with ..."


In [6]:

null_punct = str.maketrans('', '', string.punctuation)
lookup = dict()
max_length = 0

for i in range(len(data)):
    try:
        id = data.loc[i]['image_name'].split('.')[0]
        desc = data.loc[i][' comment'].split()

        desc = [word.lower() for word in desc]
        desc = [w.translate(null_punct) for w in desc]
        desc = [word for word in desc if len(word)>1]
        desc = [word for word in desc if word.isalpha()]
        max_length = max(max_length,len(desc))

        if id not in lookup:
            lookup[id] = list()
            lookup[id].append(' '.join(desc))
    except:
        pass
    
    
lex = set()
for key in lookup:
  [lex.update(d.split()) for d in lookup[key]]

In [7]:
print(len(lookup)) # How many unique words
print(len(lex)) # The dictionary
# print(lex)
print(max_length) # Maximum length of a caption (in words)

print(root_captioning)

31783
12989
72
/content/drive/MyDrive/싸피/인공지능음성/image_caption


In [8]:
# image dataset
img_path = os.path.join(root_captioning,"data",'img30000path.pkl')
if not os.path.exists(img_path):
    img = glob.glob(os.path.join(root_captioning,'Flicker30k_dataset', '*.jpg'))
else:
    with open(img_path, "rb") as fp:
        img = pickle.load(fp)

In [9]:
# 제대로 가져왔나 확인
print(len(img))
print(img[0])
print(img[0][79:])

31783
/content/drive/MyDrive/싸피/인공지능음성/image_caption/Flicker30k_dataset/1000092795.jpg
1000092795.jpg


In [10]:
# 30k의 경우 8k와는 다르게 테스트, 트레인을 구분하진 않음
# 그냥 내가 적당히 짜르면 될듯
imgpath = []
for i in img:
    imgpath.append(i[79:])

# 이미지 비율 6:1정도로

train_img = img[:round(len(img) * 6 / 7)]
test_img = img[round(len(img) * 6 / 7):]
train_img_path = imgpath[:round(len(imgpath) * 6 / 7)]
test_img_path = imgpath[round(len(imgpath) * 6 / 7):]

print(len(train_img))
print(train_img[0])
print(len(test_img))
print(test_img[0])
print(len(train_img_path))
print(train_img_path[0])
print(len(test_img_path))
print(test_img_path[0])

27243
/content/drive/MyDrive/싸피/인공지능음성/image_caption/Flicker30k_dataset/1000092795.jpg
4540
/content/drive/MyDrive/싸피/인공지능음성/image_caption/Flicker30k_dataset/5572351880.jpg
27243
1000092795.jpg
4540
5572351880.jpg


In [11]:


train_descriptions = {k:v for k,v in lookup.items() if f'{k}.jpg' in train_img_path}
print(train_descriptions)
for n,v in train_descriptions.items(): 
  for d in range(len(v)):
    v[d] = f'{START} {v[d]} {STOP}'

Output hidden; open in https://colab.research.google.com to view.

In [12]:
# 사진과 문장들dict

print(len(train_descriptions))
print(train_descriptions['21514026'])

27243
['startseq lady in long black dress is standing in room with tables and chairs holding her arms behind her back endseq']


In [13]:
# 사용할 사전학습모델 정하기 - MobileNet

# keras 내장 모델은 https://keras.io/api/applications/ 에서 확인 - 사용법들도 써있다.
# 각 연도별 이미지 분류기의 정확도 순위는 https://paperswithcode.com/sota/image-classification-on-imagenet 에서 확인.

encode_model = MobileNetV2(weights='imagenet')
encode_model = Model(encode_model.input, encode_model.layers[-2].output)
WIDTH = 224
HEIGHT = 224
OUTPUT_DIM = 1280
preprocess_input = tensorflow.keras.applications.mobilenet.preprocess_input

Downloading data from https://storage.googleapis.com/tensorflow/keras-applications/mobilenet_v2/mobilenet_v2_weights_tf_dim_ordering_tf_kernels_1.0_224.h5


In [14]:
# 불러온 사전학습모델 확인
encode_model.summary()

Model: "model"
__________________________________________________________________________________________________
Layer (type)                    Output Shape         Param #     Connected to                     
input_1 (InputLayer)            [(None, 224, 224, 3) 0                                            
__________________________________________________________________________________________________
Conv1 (Conv2D)                  (None, 112, 112, 32) 864         input_1[0][0]                    
__________________________________________________________________________________________________
bn_Conv1 (BatchNormalization)   (None, 112, 112, 32) 128         Conv1[0][0]                      
__________________________________________________________________________________________________
Conv1_relu (ReLU)               (None, 112, 112, 32) 0           bn_Conv1[0][0]                   
______________________________________________________________________________________________

In [15]:
# 이미지를 학습에 쓸 수 있게 인코딩 해주는 함수

def encodeImage(img):
  # 리사이즈 - 각 사전학습 모델들은 최적화된 이미지 사이즈가 있음
  img = img.resize((WIDTH, HEIGHT), Image.ANTIALIAS)
  # 이미지를 넘파이어레이로 바꾸기
  x = tensorflow.keras.preprocessing.image.img_to_array(img)
  # 3채널 이미지 흑백으로? 색기반 아니니 상관없을듯
  x = np.expand_dims(x, axis=0)
  # 전처리 해주기 - 각 사전학습 모델별로 있음
  x = preprocess_input(x)
  x = encode_model.predict(x) # 이미지의 인코딩한 벡터 값 가져오기
  x = np.reshape(x, OUTPUT_DIM )
  return x

In [16]:
# 위의 방식으로 처리한 이미지는 pickle을 사용해 보관함
# 이 이미지들 다시 저장하고 불러오는거 되게 시간이랑 자원 많이 먹을 수 있기 때문

train_path = os.path.join(root_captioning,"data",f'mobilenetv2_30k_train{OUTPUT_DIM}.pkl')
if not os.path.exists(train_path):
  start = time()
  encoding_train = {}
  for id in tqdm(train_img_path):
    image_path = os.path.join(root_captioning,'Flicker30k_dataset', id)
    img = tensorflow.keras.preprocessing.image.load_img(image_path, target_size=(HEIGHT, WIDTH))
    encoding_train[id] = encodeImage(img)
  with open(train_path, "wb") as fp:
    pickle.dump(encoding_train, fp)
  print(f"\nGenerating training set took: {hms_string(time()-start)}")
else:
  with open(train_path, "rb") as fp:
    encoding_train = pickle.load(fp)

In [17]:
# 테스트 이미지들도 마찬가지로 해줌

test_path = os.path.join(root_captioning,"data",f'mobilenetv2_30k_test{OUTPUT_DIM}.pkl')
if not os.path.exists(test_path):
  start = time()
  encoding_test = {}
  for id in tqdm(test_img_path):
    image_path = os.path.join(root_captioning,'Flicker30k_dataset', id)
    img = tensorflow.keras.preprocessing.image.load_img(image_path, target_size=(HEIGHT, WIDTH))
    encoding_test[id] = encodeImage(img)
  with open(test_path, "wb") as fp:
    pickle.dump(encoding_test, fp)
  print(f"\nGenerating testing set took: {hms_string(time()-start)}")
else:
  with open(test_path, "rb") as fp:
    encoding_test = pickle.load(fp)

In [18]:
# 키값 빼고 문장들만 따로 

all_train_captions = []
for key, val in train_descriptions.items():
    for cap in val:
        all_train_captions.append(cap)
print(len(all_train_captions))
print(all_train_captions[0])
# print(encoding_train)

27243
startseq two young guys with shaggy hair look at their hands while hanging out in the yard endseq


In [19]:
# 자주 안쓰는 단어들은 삭제
word_count_threshold = 10
word_counts = {}
nsents = 0
for sent in all_train_captions:
    nsents += 1
    for w in sent.split(' '):
        word_counts[w] = word_counts.get(w, 0) + 1

vocab = [w for w in word_counts if word_counts[w] >= word_count_threshold]
print('preprocessed words %d ==> %d' % (len(word_counts), len(vocab)))
print(vocab[1])

preprocessed words 11861 ==> 2499
two


In [20]:


idxtoword = {}
wordtoidx = {}

ix = 1
for w in vocab:
    wordtoidx[w] = ix
    idxtoword[ix] = w
    ix += 1
    
vocab_size = len(idxtoword) + 1 
print(vocab_size)


2500


In [21]:
import csv
# idx word 저장
print(idxtoword)
print(wordtoidx)
with open('idxtoword30.csv', 'w') as f:  
    writer = csv.writer(f)
    for k, v in idxtoword.items():
       writer.writerow([k, v])
with open('wordtoidx30.csv', 'w') as f:  
    writer = csv.writer(f)
    for k, v in wordtoidx.items():
       writer.writerow([k, v])

{1: 'startseq', 2: 'two', 3: 'young', 4: 'guys', 5: 'with', 6: 'shaggy', 7: 'hair', 8: 'look', 9: 'at', 10: 'their', 11: 'hands', 12: 'while', 13: 'hanging', 14: 'out', 15: 'in', 16: 'the', 17: 'yard', 18: 'endseq', 19: 'several', 20: 'men', 21: 'hard', 22: 'hats', 23: 'are', 24: 'operating', 25: 'giant', 26: 'system', 27: 'child', 28: 'pink', 29: 'dress', 30: 'is', 31: 'climbing', 32: 'up', 33: 'set', 34: 'of', 35: 'stairs', 36: 'an', 37: 'way', 38: 'someone', 39: 'blue', 40: 'shirt', 41: 'and', 42: 'hat', 43: 'standing', 44: 'on', 45: 'leaning', 46: 'against', 47: 'window', 48: 'one', 49: 'gray', 50: 'black', 51: 'near', 52: 'stove', 53: 'people', 54: 'photo', 55: 'playing', 56: 'guitar', 57: 'other', 58: 'him', 59: 'man', 60: 'sits', 61: 'chair', 62: 'holding', 63: 'large', 64: 'stuffed', 65: 'animal', 66: 'girl', 67: 'rollerskates', 68: 'talking', 69: 'her', 70: 'cellphone', 71: 'parking', 72: 'lot', 73: 'asian', 74: 'wearing', 75: 'suit', 76: 'stands', 77: 'darkhaired', 78: 'woman

In [22]:
# 캡셔닝 쓸 최대길이 늘려준다 startseq, endseq땜시

max_length +=2
print(max_length)

74


In [23]:
# 미리 학습 데이터를 생성해놓고 갖다박는것은 비효울적일 수 있음
# 그래서 keras 생성기를 사용하면 쓸떄 쓸만큼씩 생성해서 갖다박음

# 데이터 제네레이터 함수

def data_generator(descriptions, photos, wordtoidx, \
                   max_length, num_photos_per_batch):
  # x1 - Training data for photos
  # x2 - The caption that goes with each photo
  # y - The predicted rest of the caption
  x1, x2, y = [], [], []
  n=0
  while True:
    for key, desc_list in descriptions.items():
      n+=1
      photo = photos[key+'.jpg']
      # Each photo has 5 descriptions
      for desc in desc_list:
        # Convert each word into a list of sequences.
        seq = [wordtoidx[word] for word in desc.split(' ') \
               if word in wordtoidx]
        # Generate a training case for every possible sequence and outcome
        for i in range(1, len(seq)):
          in_seq, out_seq = seq[:i], seq[i]
          in_seq = pad_sequences([in_seq], maxlen=max_length)[0]
          out_seq = to_categorical([out_seq], num_classes=vocab_size)[0]
          x1.append(photo)
          x2.append(in_seq)
          y.append(out_seq)
      if n==num_photos_per_batch:
        yield ([np.array(x1), np.array(x2)], np.array(y))
        x1, x2, y = [], [], []
        n=0


In [24]:
# LSTM을 위한 데이터 가져와

glove_dir = os.path.join(root_captioning,'glove.6B')
embeddings_index = {} 
f = open(os.path.join(glove_dir, 'glove.6B.200d.txt'), encoding="utf-8")

for line in tqdm(f):
    try:
        values = line.split()
        word = values[0]
        coefs = np.asarray(values[1:], dtype='float32')
        embeddings_index[word] = coefs
    except:
        pass

f.close()
print(f'Found {len(embeddings_index)} word vectors.')

400000it [00:28, 13867.63it/s]

Found 400000 word vectors.





In [25]:
# 자연어 처리를 위해서 embedding으로 벡터와 단어를 연결시켜줘야함

embedding_dim = 200

# Get 200-dim dense vector for each of the 10000 words in out vocabulary
embedding_matrix = np.zeros((vocab_size, embedding_dim))

for word, i in wordtoidx.items():
    #if i < max_words:
    embedding_vector = embeddings_index.get(word)
    if embedding_vector is not None:
        # Words not found in the embedding index will be all zeros
        embedding_matrix[i] = embedding_vector

In [26]:
# 들어온 단어, 임베딩벡터 dim
embedding_matrix.shape

(2500, 200)

In [27]:
# 캡셔닝 모델 만들기
inputs1 = Input(shape=(OUTPUT_DIM,))
fe1 = Dropout(0.4)(inputs1)
fe2 = Dense(256, activation='relu')(fe1)
inputs2 = Input(shape=(max_length,))
se1 = Embedding(vocab_size, embedding_dim, mask_zero=True)(inputs2)
se2 = Dropout(0.4)(se1)
se3 = LSTM(256)(se2)
decoder1 = add([fe2, se3])
decoder2 = Dense(256, activation='relu')(decoder1)
decoder3 = Dropout(0.3)(decoder2)
decoder4 = Dense(128, activation='relu')(decoder3)
outputs = Dense(vocab_size, activation='softmax')(decoder4)
caption_model = Model(inputs=[inputs1, inputs2], outputs=outputs)

In [28]:
# 모델 확인

caption_model.summary()

Model: "model_1"
__________________________________________________________________________________________________
Layer (type)                    Output Shape         Param #     Connected to                     
input_3 (InputLayer)            [(None, 74)]         0                                            
__________________________________________________________________________________________________
input_2 (InputLayer)            [(None, 1280)]       0                                            
__________________________________________________________________________________________________
embedding (Embedding)           (None, 74, 200)      500000      input_3[0][0]                    
__________________________________________________________________________________________________
dropout (Dropout)               (None, 1280)         0           input_2[0][0]                    
____________________________________________________________________________________________

In [29]:
# W
caption_model.layers[2].set_weights([embedding_matrix])
# 이게 학습 시키는거 막는거였나? 사전 학습 모델 가져올때 조정에 따라 바꾸는거임
caption_model.layers[2].trainable = False
# 모델 컴파일
caption_model.compile(loss='categorical_crossentropy', optimizer=Adam(learning_rate=1e-3))

In [30]:
# 뭐지?
number_pics_per_bath = 4
steps = len(train_descriptions)//number_pics_per_bath

In [None]:
# 모델 저장 경로
model_path = os.path.join(root_captioning,"data",f'caption-model-mobilenetV2_30k.hdf5')

with tf.device('/GPU:0'):
    # 저장된 모델있으면 그거 쓰고 아니면 학습하기
    if not os.path.exists(model_path):
        # for i in tqdm(range(EPOCHS)):
        generator = data_generator(train_descriptions, encoding_train, 
                        wordtoidx, max_length, number_pics_per_bath)
        caption_model.fit_generator(generator, epochs=EPOCHS,
                        steps_per_epoch=steps, verbose=1)

        # caption_model.optimizer.lr = 1e-4
        # number_pics_per_bath = 6
        # steps = len(train_descriptions)//number_pics_per_bath
        # 두번시키는 이유는 뭐지?

        #   for i in range(EPOCHS):
        #       generator = data_generator(train_descriptions, encoding_train, 
        #                     wordtoidx, max_length, number_pics_per_bath)
        #       caption_model.fit_generator(generator, epochs=1, 
        #                             steps_per_epoch=steps, verbose=1)  
        caption_model.save_weights(model_path)
        print(f"\Training took: {hms_string(time()-start)}")
    else:
        caption_model.load_weights(model_path)



Epoch 1/10
Epoch 2/10
Epoch 3/10

In [None]:
# 캡셔닝 만드는 함수

def generateCaption(photo):
    in_text = START
    for i in range(max_length):
        sequence = [wordtoidx[w] for w in in_text.split() if w in wordtoidx]
        sequence = pad_sequences([sequence], maxlen=max_length)
        yhat = caption_model.predict([photo,sequence], verbose=0)
        yhat = np.argmax(yhat)
        word = idxtoword[yhat]
        in_text += ' ' + word
        if word == STOP:
            break
    final = in_text.split()
    final = final[1:-1]
    final = ' '.join(final)
    return final

In [None]:
from PIL import Image, ImageFile
from matplotlib.pyplot import imshow
import requests
from io import BytesIO
import numpy as np

%matplotlib inline

# 대충 이미지 주소
urls = ['https://image.freepik.com/free-photo/adult-woman-playing-with-her-dog-in-the-park_23-2148345896.jpg', 
        'https://previews.123rf.com/images/yatigra/yatigra1207/yatigra120700020/14504175-%EA%B7%B8%EB%85%80%EC%9D%98-%EA%B0%95%EC%95%84%EC%A7%80-%EA%B3%B5%EC%9B%90%EC%97%90%EC%84%9C-%EC%9E%AC%EC%83%9D%ED%95%98%EB%8A%94-%EB%A7%A4%EB%A0%A5%EC%A0%81%EC%9D%B8-%EC%97%AC%EC%9E%90.jpg',
        'https://previews.123rf.com/images/himchenko/himchenko1507/himchenko150700117/42936013-%EA%B0%9C%EB%8A%94-%EA%B3%B5%EC%9B%90%EC%9D%98-%EC%A3%BC%EC%9D%B8%EA%B3%BC-%ED%95%A8%EA%BB%98-%EB%85%B8%EB%8A%94-%EC%82%AC%EB%9E%8C%EC%9E%85%EB%8B%88%EB%8B%A4-.jpg',
        'https://www.greentrust.or.kr/wp-content/uploads/2020/07/%EC%B2%A8%EB%B6%801.-SGT%ED%99%88%ED%8E%98%EC%9D%B4%EC%A7%80-%EB%B3%80%EA%B2%BD%EC%82%AC%EC%A7%84.jpg',
        
        ]

for url in urls:
  response = requests.get(url)
#   print(response.content)
  img = Image.open(BytesIO(response.content))
  img.load()

  plt.imshow(img)
  plt.show()
  
  response = requests.get(url)
  img = encodeImage(img).reshape((1,OUTPUT_DIM))
  print(img.shape)
  print("Caption:",generateCaption(img))
  print("_____________________________________")

In [None]:
# 내가 그린 샘플로 돌려보기  

pics = ['parkcouple.jpg', 'park.jpg', 'park.png', 'turtle.jpg']
for pic in pics:
    print(os.path.join(root_captioning,'data', pic))

    img = Image.open(root_captioning+'/data/'+pic)
    img.load()

    plt.imshow(img)
    plt.show()
    img = encodeImage(img).reshape((1,OUTPUT_DIM))
    print("Caption:",generateCaption(img))
    print("_____________________________________")

In [None]:
model_path = os.path.join(root_captioning,"data",f'image-captioning-model-mobilenetV2_30k.h5')

caption_model.save(model_path)


In [None]:
new_model = tf.keras.models.load_model(model_path)

new_model.summary()

In [None]:
def newEncodeImage(img):
  # 리사이즈 - 각 사전학습 모델들은 최적화된 이미지 사이즈가 있음
  img = img.resize((WIDTH, HEIGHT), Image.ANTIALIAS)
  # 이미지를 넘파이어레이로 바꾸기
  x = tensorflow.keras.preprocessing.image.img_to_array(img)
  x = np.expand_dims(x, axis=0)
  # 전처리 해주기 - 각 사전학습 모델별로 있음
  x = preprocess_input(x)
  # 더 간소화 시키기?
  x = encode_model.predict(x) # 이미지의 인코딩한 벡터 값 가져오기?
  print(x.shape)
  # 이미지 자체는 학습을 하는게 아니라 사전학습 된 것을 가져와서 LSTM으로 넘기는 역할임. 그에 맞는 형태로 리턴
  x = np.reshape(x, OUTPUT_DIM )
  return x


def newGenerateCaption(photo):
    in_text = START
    for i in range(max_length):
        sequence = [wordtoidx[w] for w in in_text.split() if w in wordtoidx]
        sequence = pad_sequences([sequence], maxlen=max_length)
        yhat = new_model.predict([photo,sequence], verbose=0)
        yhat = np.argmax(yhat)
        word = idxtoword[yhat]
        in_text += ' ' + word
        if word == STOP:
            break
    final = in_text.split()
    final = final[1:-1]
    final = ' '.join(final)
    return final



pics = ['parkcouple.jpg', 'park.jpg', 'park.png', 'turtle.jpg']
for pic in pics:
    print(os.path.join(root_captioning,'data', pic))

    img = Image.open(root_captioning+'/data/'+pic)
    img.load()

    plt.imshow(img)
    plt.show()

    img = newEncodeImage(img).reshape((1,OUTPUT_DIM))
    print(img.shape)
    print("Caption:",newGenerateCaption(img))
    print("_____________________________________")