In [None]:
import tensorflow as tf
import numpy as np
import pandas as pd
import os
import pickle
import time
from sklearn.model_selection import train_test_split
from google.colab import drive

In [None]:
# ============================================================================
# PATH CONFIGURATION
# ============================================================================
# Mount Google Drive so the project files referenced below are reachable.
drive.mount('/content/drive')

# Caption table; read with pd.read_csv and split into train/validation sets.
CSV_PATH = '/content/drive/MyDrive/Kuliah/KecerdasanBuatan/FP/data/features/captions_preprocessed.csv'
# Directory that load_data reads `<image name>.npy` feature files from.
IMAGE_FEATURES_DIR = '/content/drive/MyDrive/Kuliah/KecerdasanBuatan/FP/data/images'
# Pickled dict providing the 'word_to_idx', 'vocab_size' and 'max_length' entries.
TOKENIZER_PATH = '/content/drive/MyDrive/Kuliah/KecerdasanBuatan/FP/data/features/tokenizer.pkl'
# Directory handed to tf.train.CheckpointManager for training checkpoints.
CHECKPOINT_PATH = "/content/drive/MyDrive/Kuliah/KecerdasanBuatan/FP/checkpoints"

Mounted at /content/drive


In [None]:
# ============================================================================
# LOAD TOKENIZER & DATASET
# ============================================================================
# Load the pickled tokenizer metadata.
# NOTE(review): pickle.load can execute arbitrary code while unpickling; only
# point TOKENIZER_PATH at files produced by this project.
with open(TOKENIZER_PATH, 'rb') as f:
    tokenizer_data = pickle.load(f)

# Word -> id lookup, vocabulary size, and the padded caption length.
word_to_idx, vocab_size, max_length = (
    tokenizer_data[key] for key in ('word_to_idx', 'vocab_size', 'max_length')
)

# Load the caption table and hold out 10% of the rows (fixed seed).
df = pd.read_csv(CSV_PATH)
train_df, val_df = train_test_split(df, random_state=42, test_size=0.1)

# Hyperparameters
BATCH_SIZE = 64        # examples per training batch
BUFFER_SIZE = 1000     # tf.data shuffle buffer size
EMBEDDING_DIM = 256    # width of the encoder output and of the word embeddings
UNITS = 512            # GRU / attention hidden size

In [None]:
# ============================================================================
# DATA GENERATOR
# ============================================================================
def load_data(img_name, caption):
    """Load one image's precomputed features and encode its caption.

    Called through ``tf.numpy_function`` (see ``map_func``), so both arguments
    arrive as ``bytes`` and the returned arrays must have exactly the dtypes
    declared there: float32 features and int32 token ids.

    Args:
        img_name: UTF-8 encoded image identifier; features are read from
            ``<IMAGE_FEATURES_DIR>/<img_name>.npy``.
        caption: UTF-8 encoded caption whose tokens are separated by whitespace.

    Returns:
        Tuple ``(img_tensor, cap_seq)``: the feature array as float32, and the
        caption as int32 ids zero-padded at the end to ``max_length`` (longer
        captions are truncated by ``pad_sequences``).

    Raises:
        KeyError: if the vocabulary has no ``'<UNK>'`` entry.
        OSError: if the feature file cannot be read.
    """
    # Feature files follow the `<image name>.npy` naming convention.
    path_npy = os.path.join(IMAGE_FEATURES_DIR, img_name.decode('utf-8') + '.npy')
    # np.load returns whatever dtype was saved; tf.numpy_function fails at
    # runtime if that differs from the declared tf.float32, so enforce it here
    # (no copy is made when the array is already float32).
    img_tensor = np.asarray(np.load(path_npy), dtype=np.float32)

    # Tokenize the caption; out-of-vocabulary words map to the <UNK> id.
    unk_id = word_to_idx['<UNK>']
    cap_seq = [word_to_idx.get(word, unk_id) for word in caption.decode('utf-8').split()]
    # dtype is spelled out so the result always matches the declared tf.int32.
    cap_seq = tf.keras.preprocessing.sequence.pad_sequences(
        [cap_seq], maxlen=max_length, padding='post', dtype='int32')[0]

    return img_tensor, cap_seq

def map_func(img_name, cap):
    """tf.data mapping function: wrap ``load_data`` and restore static shapes.

    ``tf.numpy_function`` yields tensors of unknown shape, so the shapes are
    re-attached afterwards to let batching and the model see fixed dimensions.

    Args:
        img_name: scalar string tensor with the image identifier.
        cap: scalar string tensor with the caption text.

    Returns:
        Tuple ``(features, token_ids)``: a float32 tensor of shape (64, 2048)
        and an int32 tensor of shape (max_length,).
    """
    features, token_ids = tf.numpy_function(
        func=load_data,
        inp=[img_name, cap],
        Tout=[tf.float32, tf.int32],
    )
    # (64, 2048): per the original author's note, the reshaped InceptionV3 output.
    features.set_shape([64, 2048])
    token_ids.set_shape([max_length])
    return features, token_ids

# Build the training input pipeline.
# Shuffle the lightweight (image name, caption) string pairs *before* loading
# features. Shuffling after `map` would keep up to BUFFER_SIZE loaded
# (64, 2048) float32 arrays (~0.5 MB each) in the shuffle buffer and would only
# mix examples within that window. A buffer that covers the whole training
# split gives a full shuffle at negligible memory cost; `max` keeps the buffer
# size positive and never smaller than the configured BUFFER_SIZE.
shuffle_buffer = max(BUFFER_SIZE, len(train_df))
dataset = tf.data.Dataset.from_tensor_slices((train_df['image'].values, train_df['caption'].values))
dataset = dataset.shuffle(shuffle_buffer)
dataset = dataset.map(map_func, num_parallel_calls=tf.data.AUTOTUNE)
dataset = dataset.batch(BATCH_SIZE).prefetch(buffer_size=tf.data.AUTOTUNE)

In [None]:
# ============================================================================
# MODEL ARCHITECTURE
# ============================================================================
class CNN_Encoder(tf.keras.Model):
    """Projects precomputed image features to ``embedding_dim`` channels.

    The forward pass is Dense -> LayerNormalization -> LeakyReLU, applied to
    the last axis of the input.
    """

    def __init__(self, embedding_dim):
        """Create the projection, normalization and activation layers.

        Args:
            embedding_dim: output size of the dense projection.
        """
        super().__init__()
        self.fc = tf.keras.layers.Dense(embedding_dim)
        # Keeps activations stable; the author notes the features span roughly 0-17.
        self.ln = tf.keras.layers.LayerNormalization(epsilon=1e-6)
        # Leaky slope avoids dead-ReLU units on features that contain many zeros.
        self.leaky_relu = tf.keras.layers.LeakyReLU(alpha=0.1)

    def call(self, x):
        """Return the projected, normalized and activated features for ``x``."""
        projected = self.fc(x)
        normalized = self.ln(projected)
        return self.leaky_relu(normalized)

class BahdanauAttention(tf.keras.Model):
    """Additive attention over a set of encoded image locations.

    Scores are ``V(tanh(W1(features) + W2(hidden)))``, normalized with a
    softmax over axis 1 of ``features``.
    """

    def __init__(self, units):
        """Create the scoring layers.

        Args:
            units: width of the intermediate ``W1``/``W2`` projections.
        """
        super().__init__()
        self.W1 = tf.keras.layers.Dense(units)
        self.W2 = tf.keras.layers.Dense(units)
        self.V = tf.keras.layers.Dense(1)

    def call(self, features, hidden):
        """Compute the attention context for one decoding step.

        Args:
            features: encoder output; in this notebook (batch, 64, embedding_dim).
            hidden: decoder state used as the query; in this notebook (batch, units).

        Returns:
            Tuple ``(context_vector, attention_weights)``: the weighted sum of
            ``features`` over axis 1, and the softmax weights with a trailing
            axis of size 1.
        """
        # Insert an axis at position 1 so the query broadcasts against `features`.
        query = tf.expand_dims(hidden, 1)
        energy = tf.nn.tanh(self.W1(features) + self.W2(query))
        attention_weights = tf.nn.softmax(self.V(energy), axis=1)
        context_vector = tf.reduce_sum(attention_weights * features, axis=1)
        return context_vector, attention_weights

class RNN_Decoder(tf.keras.Model):
    """Single-step GRU decoder with Bahdanau attention over image features."""

    def __init__(self, embedding_dim, units, vocab_size):
        """Create the embedding, GRU, output and attention layers.

        Args:
            embedding_dim: size of the word embeddings.
            units: GRU state size; also the width of ``fc1`` and the attention.
            vocab_size: number of token ids; size of the output logits.
        """
        super().__init__()
        self.units = units
        self.embedding = tf.keras.layers.Embedding(vocab_size, embedding_dim)
        self.gru = tf.keras.layers.GRU(self.units, return_sequences=True, return_state=True)
        self.fc1 = tf.keras.layers.Dense(self.units)
        self.fc2 = tf.keras.layers.Dense(vocab_size)
        self.attention = BahdanauAttention(self.units)

    def call(self, x, features, hidden):
        """Predict logits for the next token.

        Args:
            x: token ids fed to the decoder; in this notebook (batch, 1).
            features: encoder output attended over.
            hidden: previous decoder state, used as the attention query.

        Returns:
            Tuple ``(logits, state)``: vocabulary logits with the batch and time
            axes flattened together, and the GRU's final state.
        """
        context_vector, _ = self.attention(features, hidden)
        embedded = self.embedding(x)
        # Prepend the context to the word embedding along the channel axis.
        gru_input = tf.concat([tf.expand_dims(context_vector, 1), embedded], axis=-1)
        # NOTE(review): the GRU is called without `initial_state`, so `hidden`
        # only feeds the attention and the GRU starts from its default initial
        # state on every call. If carrying the recurrent state across steps is
        # intended, pass `initial_state=hidden` -- confirm first, since that
        # changes what existing checkpoints were trained for.
        output, state = self.gru(gru_input)
        projected = self.fc1(output)
        # Merge the batch and time axes before the vocabulary projection.
        flat = tf.reshape(projected, (-1, projected.shape[2]))
        return self.fc2(flat), state

In [None]:
# ============================================================================
# TRAINING SETUP
# ============================================================================
# Instantiate the model halves with the notebook-level hyperparameters.
encoder = CNN_Encoder(embedding_dim=EMBEDDING_DIM)
decoder = RNN_Decoder(embedding_dim=EMBEDDING_DIM, units=UNITS, vocab_size=vocab_size)

optimizer = tf.keras.optimizers.Adam(learning_rate=1e-4)

# reduction='none' keeps the per-example losses so loss_function can mask padding.
loss_object = tf.keras.losses.SparseCategoricalCrossentropy(
    reduction='none',
    from_logits=True,
)

def loss_function(real, pred):
    """Mean cross-entropy over the non-padding targets of one time step.

    Args:
        real: integer target ids; id 0 is treated as padding and ignored.
        pred: logits for the same positions.

    Returns:
        Scalar tensor: the masked loss sum divided by the number of non-padding
        targets (plus 1e-8, so an all-padding step yields 0 instead of NaN).
    """
    per_token = loss_object(real, pred)
    # 1.0 where the target is a real token, 0.0 where it is padding (id 0).
    keep = tf.cast(tf.math.not_equal(real, 0), dtype=per_token.dtype)
    masked_sum = tf.reduce_sum(per_token * keep)
    return masked_sum / (tf.reduce_sum(keep) + 1e-8)

# Track both model halves and the optimizer state in a single checkpoint.
# NOTE(review): nothing here restores `ckpt_manager.latest_checkpoint`, so a
# re-run starts from fresh weights -- confirm that this is intended.
ckpt = tf.train.Checkpoint(encoder=encoder, decoder=decoder, optimizer=optimizer)
ckpt_manager = tf.train.CheckpointManager(
    checkpoint=ckpt,
    directory=CHECKPOINT_PATH,
    max_to_keep=5,
)

@tf.function
def train_step(img_tensor, target):
    """Run one teacher-forced optimization step on a batch.

    Args:
        img_tensor: batch of image features passed to the encoder.
        target: batch of padded caption ids, shape (batch, sequence length).
            Column 0 is never used as a prediction target; decoding always
            starts from the 'startseq' id.

    Returns:
        The summed per-step loss divided by the sequence length (for reporting).
        Gradients are taken from the undivided sum.
    """
    batch_size, seq_len = target.shape[0], target.shape[1]
    start_id = word_to_idx['startseq']

    loss = 0
    hidden = tf.zeros((batch_size, UNITS))
    dec_input = tf.expand_dims([start_id] * batch_size, 1)

    with tf.GradientTape() as tape:
        features = encoder(img_tensor)
        for step in range(1, seq_len):
            step_target = target[:, step]
            predictions, hidden = decoder(dec_input, features, hidden)
            loss += loss_function(step_target, predictions)
            # Teacher forcing: the ground-truth token becomes the next input.
            dec_input = tf.expand_dims(step_target, 1)

    total_loss = loss / int(seq_len)
    trainable_variables = encoder.trainable_variables + decoder.trainable_variables

    # Compute the gradients of the summed loss.
    gradients = tape.gradient(loss, trainable_variables)

    # Clip by global norm to keep the update from blowing up into NaN.
    gradients, _ = tf.clip_by_global_norm(gradients, 5.0)

    optimizer.apply_gradients(zip(gradients, trainable_variables))
    return total_loss



In [None]:
# ============================================================================
# TRAINING MODEL
# ============================================================================
# Number of full passes over the training dataset.
EPOCHS = 20
print("Starting Training...")

for epoch in range(EPOCHS):
    start = time.time()
    # Running sum of the per-batch losses returned by train_step.
    total_loss = 0

    for (batch, (img_tensor, target)) in enumerate(dataset):
        batch_loss = train_step(img_tensor, target)
        total_loss += batch_loss

        # Log progress every 50 batches.
        if batch % 50 == 0:
            print(f'Epoch {epoch+1} Batch {batch} Loss {batch_loss.numpy():.4f}')

    # Save a checkpoint after every epoch (the manager keeps the 5 most recent).
    ckpt_manager.save()
    # Epoch loss is the mean of the per-batch losses.
    print(f'Epoch {epoch+1} Final Loss {total_loss/len(dataset):.4f}')
    print(f'Time taken: {time.time()-start:.2f} sec\n')

Starting Training...
Epoch 1 Batch 0 Loss 3.7136
Epoch 1 Batch 50 Loss 2.7689
