In [14]:
# ==== CONFIG ====
from pathlib import Path
import random, os, numpy as np, tensorflow as tf

class C:
    # --- paths
    ROOT   = Path.home() / "ML_projs" / "image_captioning"
    DATA   = ROOT / "data"
    RAW    = DATA / "raw"
    IMAGES = DATA / "images" / "flickr8k"
    ANN    = DATA / "annotations"
    PROC   = DATA / "processed"
    MODELS = ROOT / "models"
    APP    = ROOT / "app"

    # --- encoder/decoder
    IMG_SIZE   = 224
    FEAT_DIM   = 1280        # 1280 for EfficientNetB0, 2048 if you used ResNet50
    EMB_DIM    = 256
    LSTM_UNITS = 512

    # --- tokenizer
    MIN_FREQ = 4             # you can tune this
    MAX_LEN  = 20            # from step 2.3 (override if recomputed)

    # --- training
    EPOCHS         = 20
    BATCH_SIZE     = 64
    DROPOUT        = 0.4
    LR             = 1e-3
    SEED           = 42

# make sure folders exist
for p in [C.DATA, C.RAW, C.IMAGES, C.ANN, C.PROC, C.MODELS, C.APP]:
    p.mkdir(parents=True, exist_ok=True)

# Reproducibility
random.seed(C.SEED)
np.random.seed(C.SEED)
tf.random.set_seed(C.SEED)

print("Project root:", C.ROOT)

Project root: C:\Users\mkalam\ML_projs\image_captioning


In [15]:
# 1) Install kaggle in THIS environment
import sys, subprocess
subprocess.check_call([sys.executable, "-m", "pip", "install", "-q", "kaggle"])

# 2) Ensure Kaggle config dir is your home .kaggle
from pathlib import Path
import os
kaggle_dir = Path.home() / ".kaggle"
kaggle_dir.mkdir(exist_ok=True)
os.environ["KAGGLE_CONFIG_DIR"] = str(kaggle_dir)

print("Kaggle CLI installed. Config dir:", kaggle_dir)


Kaggle CLI installed. Config dir: C:\Users\mkalam\.kaggle


In [16]:
import kaggle, sys
print("kaggle package version:", getattr(kaggle, "__version__", "unknown"))
print("Python exe:", sys.executable)

kaggle package version: unknown
Python exe: C:\Users\mkalam\anaconda3\envs\cap_env\python.exe


In [17]:
from pathlib import Path
from kaggle.api.kaggle_api_extended import KaggleApi

RAW  = C.RAW
RAW.mkdir(parents=True, exist_ok=True)

api = KaggleApi()
api.authenticate()  # reads C:\Users\mkalam\.kaggle\kaggle.json

# Download the dataset zip to RAW (don't unzip yet)
api.dataset_download_files(
    dataset='adityajn105/flickr8k',
    path=str(RAW),
    unzip=False,
    quiet=False
)

list(RAW.iterdir())


Dataset URL: https://www.kaggle.com/datasets/adityajn105/flickr8k
flickr8k.zip: Skipping, found more recently modified local copy (use --force to force download)


[WindowsPath('C:/Users/mkalam/ML_projs/image_captioning/data/raw/flickr8k.zip'),
 WindowsPath('C:/Users/mkalam/ML_projs/image_captioning/data/raw/Images')]

In [20]:
import zipfile, shutil

IMAGES = C.IMAGES
ANN    = C.ANN
IMAGES.mkdir(parents=True, exist_ok=True)
ANN.mkdir(parents=True, exist_ok=True)

# Unzip every zip we just downloaded to RAW
for z in RAW.glob("*.zip"):
    with zipfile.ZipFile(z, "r") as f:
        f.extractall(RAW)

# Move images (usually under a folder named 'Images')
src_images_dir = RAW / "Images"
if src_images_dir.exists():
    for p in src_images_dir.iterdir():
        if p.is_file():
            p.replace(IMAGES / p.name)

# Move captions file (dataset provides one of these)
for name in ("captions.txt", "Flickr8k.token.txt"):
    f = RAW / name
    if f.exists():
        f.replace(ANN / f.name)

print("Done. Images in:", IMAGES)
print("Captions in:", ANN)


Done. Images in: C:\Users\mkalam\ML_projs\image_captioning\data\images\flickr8k
Captions in: C:\Users\mkalam\ML_projs\image_captioning\data\annotations


In [22]:
n_imgs = len(list(C.IMAGES.glob("*.jpg"))) + len(list(C.IMAGES.glob("*.jpeg"))) + len(list(C.IMAGES.glob("*.png")))
print("Image count:", n_imgs)
print("captions.txt:", (C.ANN/"captions.txt").exists())
print("Flickr8k.token.txt:", (C.ANN/"Flickr8k.token.txt").exists())


Image count: 8091
captions.txt: True
Flickr8k.token.txt: False


In [23]:
from pathlib import Path
import re, json
from collections import defaultdict

IMAGES = C.IMAGES
ANN = C.ANN
PROC = C.PROC
PROC.mkdir(parents=True, exist_ok=True)

cap_file = ANN / "captions.txt"   # you have this one
assert cap_file.exists(), "captions.txt not found"

def basic_clean(s: str) -> str:
    s = s.lower().strip()
    s = re.sub(r"[^a-z0-9' ]+", " ", s)   # keep letters, digits, apostrophes, spaces
    s = re.sub(r"\s+", " ", s).strip()
    return s

# Build {image_name: [captions...]} with start/end tokens
captions = defaultdict(list)
with open(cap_file, "r", encoding="utf-8") as f:
    for line in f:
        line = line.strip()
        if not line: 
            continue
        # Format is "image_name,caption"
        if "," not in line:
            continue
        img, cap = line.split(",", 1)
        if not (IMAGES / img).exists():
            continue
        clean = basic_clean(cap)
        if clean:
            captions[img].append(f"startseq {clean} endseq")

# drop any images without valid captions
captions = {k: v for k, v in captions.items() if v}

with open(PROC / "captions_clean.json", "w", encoding="utf-8") as f:
    json.dump(captions, f, ensure_ascii=False, indent=2)

len(captions), list(captions.keys())[:3]

(8091,
 ['1000268201_693b08cb0e.jpg',
  '1001773457_577c3a7d70.jpg',
  '1002674143_1b742ab4b8.jpg'])

In [25]:
import json
with open(PROC / "captions_clean.json", "r", encoding="utf-8") as f:
    captions = json.load(f)

sample_img = list(captions.keys())[0]
print("Sample image:", sample_img)
print("Number of captions:", len(captions[sample_img]))
print("Example caption:", captions[sample_img][0])

Sample image: 1000268201_693b08cb0e.jpg
Number of captions: 5
Example caption: startseq a child in a pink dress is climbing up a set of stairs in an entry way endseq


In [26]:
from pathlib import Path
from sklearn.model_selection import train_test_split
import json

PROC = C.PROC

# Load the cleaned captions dictionary we saved in 2.1
with open(PROC / "captions_clean.json", "r", encoding="utf-8") as f:
    captions = json.load(f)

# Get the list of image filenames that have captions
image_ids = sorted(captions.keys())

# Split: 80% train, 20% temp; then split temp into 50/50 = 10% val, 10% test
train_ids, temp_ids = train_test_split(
    image_ids, test_size=0.2, random_state=42, shuffle=True
)
val_ids, test_ids = train_test_split(
    temp_ids, test_size=0.5, random_state=42, shuffle=True
)

print(f"Total images: {len(image_ids)}")
print(f"Train: {len(train_ids)}  Val: {len(val_ids)}  Test: {len(test_ids)}")

# Save the splits for future steps
for name, ids in [("train", train_ids), ("val", val_ids), ("test", test_ids)]:
    with open(PROC / f"{name}_images.txt", "w", encoding="utf-8") as f:
        f.writelines([img + "\n" for img in ids])

# Quick sanity check: make sure all ids are unique and disjoint
overlap_train_val = set(train_ids) & set(val_ids)
overlap_train_test = set(train_ids) & set(test_ids)
overlap_val_test   = set(val_ids) & set(test_ids)
print("Overlap train/val:", len(overlap_train_val))
print("Overlap train/test:", len(overlap_train_test))
print("Overlap val/test:", len(overlap_val_test))


Total images: 8091
Train: 6472  Val: 809  Test: 810
Overlap train/val: 0
Overlap train/test: 0
Overlap val/test: 0


In [27]:
from pathlib import Path
from collections import Counter
import json, pickle

PROC = C.PROC

#  Load cleaned captions + training image IDs
with open(PROC / "captions_clean.json", "r", encoding="utf-8") as f:
    captions = json.load(f)
with open(PROC / "train_images.txt", "r", encoding="utf-8") as f:
    train_ids = [x.strip() for x in f]

#  Count word frequencies (from TRAIN captions only)
freq = Counter()
for img in train_ids:
    for cap in captions[img]:
        freq.update(cap.split())

#  Keep words that appear at least a few times (to remove noise)
min_freq = C.MIN_FREQ      # tweakable; lower = larger vocab
vocab = [w for w, c in freq.items() if c >= min_freq]

# Add special tokens
specials = ["<pad>", "<unk>"]  # pad for short captions, unk for unknown words
itos = specials + sorted(vocab)   # index→string
stoi = {w: i for i, w in enumerate(itos)}  # string→index

# Find a practical maximum caption length
lengths = []
for img in train_ids:
    for cap in captions[img]:
        lengths.append(len(cap.split()))
lengths.sort()
max_len = lengths[int(0.95 * len(lengths))]  # 95th percentile cutoff

# Save tokenizer info
with open(PROC / "tokenizer.pkl", "wb") as f:
    pickle.dump({"itos": itos, "stoi": stoi, "max_len": int(max_len)}, f)

print("✅ Vocabulary size:", len(itos))
print("Max caption length (95th percentile):", max_len)

✅ Vocabulary size: 3063
Max caption length (95th percentile): 20


In [28]:
from pathlib import Path
import numpy as np
import tensorflow as tf

IMAGES = C.IMAGES
PROC   = C.PROC
PROC.mkdir(parents=True, exist_ok=True)

# ---- Load split lists ----
def read_list(name):
    with open(PROC / f"{name}_images.txt", "r", encoding="utf-8") as f:
        return [line.strip() for line in f]

train_ids = read_list("train")
val_ids   = read_list("val")
test_ids  = read_list("test")

print(len(train_ids), len(val_ids), len(test_ids))

# ---- Choose encoder + preprocessing ----
enc_name = "efficientnetb0"
IMG_SIZE = C.IMG_SIZE

try:
    # EfficientNetB0 (preferred)
    from tensorflow.keras.applications.efficientnet import EfficientNetB0, preprocess_input
    base = EfficientNetB0(include_top=False, weights="imagenet", pooling="avg",
                          input_shape=(IMG_SIZE, IMG_SIZE, 3))
    preprocess_fn = preprocess_input
    feat_dim = base.output_shape[-1]  # 1280
except Exception as e:
    print("EfficientNetB0 not available, falling back to ResNet50:", e)
    from tensorflow.keras.applications.resnet50 import ResNet50, preprocess_input
    base = ResNet50(include_top=False, weights="imagenet", pooling="avg",
                    input_shape=(IMG_SIZE, IMG_SIZE, 3))
    preprocess_fn = preprocess_input
    feat_dim = base.output_shape[-1]  # 2048

base.trainable = False
print("Encoder:", enc_name, "| feature dim:", feat_dim)

# ---- tf.data loader ----
def build_ds(id_list, batch=64, shuffle=False):
    paths = [str(IMAGES / img) for img in id_list]

    def _load(path):
        img = tf.io.read_file(path)
        img = tf.image.decode_jpeg(img, channels=3)        # force RGB
        img = tf.image.resize(img, (IMG_SIZE, IMG_SIZE))
        img = tf.cast(img, tf.float32)
        img = preprocess_fn(img)                           # model-specific normalization
        return img, path

    ds = tf.data.Dataset.from_tensor_slices(paths)
    if shuffle:
        ds = ds.shuffle(buffer_size=min(len(paths), 2000), reshuffle_each_iteration=False)
    ds = ds.map(_load, num_parallel_calls=tf.data.AUTOTUNE)
    ds = ds.batch(batch).prefetch(tf.data.AUTOTUNE)
    return ds

train_ds = build_ds(train_ids, batch=64, shuffle=False)
val_ds   = build_ds(val_ids,   batch=64, shuffle=False)
test_ds  = build_ds(test_ids,  batch=64, shuffle=False)


6472 809 810
Encoder: efficientnetb0 | feature dim: 1280


In [32]:
import numpy as np

def extract_and_save(ds, id_list, out_path):
    feats = []
    names = []
    for batch_imgs, batch_paths in ds:
        batch_feats = base(batch_imgs, training=False).numpy()   # [B, feat_dim]
        feats.append(batch_feats)
        names.extend([p.numpy().decode("utf-8") for p in batch_paths])

    feats = np.vstack(feats)                                     # [N, feat_dim]
    # Convert absolute paths back to just the filenames to keep files portable
    img_names = [Path(p).name for p in names]
    np.savez_compressed(out_path, features=feats, filenames=np.array(img_names))
    return feats.shape

train_shape = extract_and_save(train_ds, train_ids, C.PROC / "features_b0_train.npz")
val_shape   = extract_and_save(val_ds,   val_ids,   C.PROC / "features_b0_val.npz")
test_shape  = extract_and_save(test_ds,  test_ids,  C.PROC / "features_b0_test.npz")

print("Saved:")
print("  train:", train_shape, "->", C.PROC / "features_b0_train.npz")
print("  val  :", val_shape,   "->", C.PROC / "features_b0_val.npz")
print("  test :", test_shape,  "->", C.PROC / "features_b0_test.npz")

Saved:
  train: (6472, 1280) -> C:\Users\mkalam\ML_projs\image_captioning\data\processed\features_b0_train.npz
  val  : (809, 1280) -> C:\Users\mkalam\ML_projs\image_captioning\data\processed\features_b0_val.npz
  test : (810, 1280) -> C:\Users\mkalam\ML_projs\image_captioning\data\processed\features_b0_test.npz


In [33]:
chk = np.load(C.PROC / "features_b0_train.npz")
print(chk["features"].shape, chk["filenames"].shape)
print("Example:", chk["filenames"][0], "→", chk["features"][0][:5])

(6472, 1280) (6472,)
Example: 3393152604_27bd1037f2.jpg → [0.2557082  0.3907099  0.08294499 0.4675278  0.44833222]


In [34]:
# Step 4

import numpy as np
import tensorflow as tf
from tensorflow.keras import layers, Model, Input
import pickle
from pathlib import Path

PROC = C.PROC

# Load tokenizer info (from Step 2.3)
with open(PROC / "tokenizer.pkl", "rb") as f:
    tok_data = pickle.load(f)
itos, stoi, max_len = tok_data["itos"], tok_data["stoi"], tok_data["max_len"]

vocab_size = len(itos)
print("Vocab size:", vocab_size, "| Max length:", max_len)


Vocab size: 3063 | Max length: 20


In [35]:
# Hyperparameters
IMG_FEAT_DIM = C.FEAT_DIM
EMB_DIM = C.EMB_DIM
LSTM_UNITS = C.LSTM_UNITS

#  Image feature input
img_input = Input(shape=(IMG_FEAT_DIM,), name="image_features")
img_embed = layers.Dense(EMB_DIM, activation='relu')(img_input)  # project to same dim as word embeddings
img_embed = layers.RepeatVector(max_len)(img_embed)              # repeat for each time step

# Text input
txt_input = Input(shape=(max_len,), name="text_seq")
txt_embed = layers.Embedding(input_dim=vocab_size, output_dim=EMB_DIM, mask_zero=True)(txt_input)

# Combine visual + text embeddings
merged = layers.concatenate([img_embed, txt_embed])
lstm_out = layers.LSTM(LSTM_UNITS, return_sequences=True)(merged)
drop = layers.Dropout(0.4)(lstm_out)
outputs = layers.TimeDistributed(layers.Dense(vocab_size, activation='softmax'))(drop)

# Build the model
decoder_model = Model(inputs=[img_input, txt_input], outputs=outputs)
decoder_model.summary()

Model: "model_1"
__________________________________________________________________________________________________
 Layer (type)                Output Shape                 Param #   Connected to                  
 image_features (InputLayer  [(None, 1280)]               0         []                            
 )                                                                                                
                                                                                                  
 dense_2 (Dense)             (None, 256)                  327936    ['image_features[0][0]']      
                                                                                                  
 text_seq (InputLayer)       [(None, 20)]                 0         []                            
                                                                                                  
 repeat_vector_1 (RepeatVec  (None, 20, 256)              0         ['dense_2[0][0]']       

In [38]:
decoder_model.compile(
    loss='sparse_categorical_crossentropy',
    optimizer='adam',
    metrics=['accuracy']
)

In [39]:
import json

# Load captions and features
with open(C.PROC / "captions_clean.json", "r", encoding="utf-8") as f:
    captions = json.load(f)

train_data = np.load(C.PROC / "features_b0_train.npz")
train_features = train_data["features"]
train_files = train_data["filenames"]

# Map filename → feature vector
feat_map = {fn: feat for fn, feat in zip(train_files, train_features)}

def caption_to_seq(caption, stoi, max_len):
    seq = [stoi.get(w, stoi["<unk>"]) for w in caption.split()]
    if len(seq) < max_len:
        seq += [0] * (max_len - len(seq))  # pad
    else:
        seq = seq[:max_len]
    return np.array(seq)

In [40]:
import numpy as np
import tensorflow as tf

def seq2seq_generator(captions, image_ids, feat_map, stoi, max_len, batch_size=64):
    """
    Yields:
      inputs: [image_features, input_seq]
        - image_features: (batch, IMG_FEAT_DIM)
        - input_seq:      (batch, max_len)
      targets:
        - y:              (batch, max_len)  (sparse integer targets)
    """
    pad_id = 0
    unk_id = stoi.get("<unk>", 1)

    X1, X2, Y = [], [], []
    while True:
        for img in image_ids:
            if img not in captions:
                continue
            for cap in captions[img]:
                # token ids
                seq = [stoi.get(w, unk_id) for w in cap.split()]

                # teacher forcing: input is tokens[:-1], target is tokens[1:]
                in_seq  = seq[:-1]
                out_seq = seq[1:]

                in_seq  = tf.keras.preprocessing.sequence.pad_sequences(
                    [in_seq], maxlen=max_len, padding='post', truncating='post', value=pad_id
                )[0]
                out_seq = tf.keras.preprocessing.sequence.pad_sequences(
                    [out_seq], maxlen=max_len, padding='post', truncating='post', value=pad_id
                )[0]

                X1.append(feat_map[img])
                X2.append(in_seq)
                Y.append(out_seq)

                if len(X1) == batch_size:
                    yield ([np.array(X1), np.array(X2)], np.array(Y))
                    X1, X2, Y = [], [], []

In [41]:
from pathlib import Path
import numpy as np
import json

PROC = C.PROC

# 1) Load captions and train features (if not already loaded in this session)
caps = json.load(open(C.PROC / "captions_clean.json", "r", encoding="utf-8"))

train_npz = np.load(C.PROC / "features_b0_train.npz")
train_features = train_npz["features"]     # shape: (N_train, IMG_FEAT_DIM)
train_files = train_npz["filenames"]       # shape: (N_train,)
feat_map = {fn: feat for fn, feat in zip(train_files, train_features)}

# 2) (Optional sanity) check IMG_FEAT_DIM matches your model
print("Feature dim from file:", train_features.shape[1])

# 3) Build the generator (make sure you're using *seq2seq_generator*)
batch_size=C.BATCH_SIZE
train_image_ids = list(feat_map.keys())  # use all train images
gen = seq2seq_generator(caps, train_image_ids, feat_map, stoi, max_len, batch_size=batch_size)

# 4) Compute steps_per_epoch so one epoch ≈ all captions once
total_captions = sum(len(caps.get(img, [])) for img in train_image_ids)
steps_per_epoch = max(1, total_captions // batch_size)

print("Total captions:", total_captions)
print("Steps per epoch:", steps_per_epoch)

# 5) (Optional) quick batch shape check before training
(X_img, X_seq), y = next(gen)
print("X_img:", X_img.shape)  # (batch_size, IMG_FEAT_DIM)
print("X_seq:", X_seq.shape)  # (batch_size, max_len)
print("y    :", y.shape)      # (batch_size, max_len)

# 6) Train
history = decoder_model.fit(
    seq2seq_generator(caps, train_image_ids, feat_map, stoi, max_len, batch_size=batch_size),
    epochs=C.EPOCHS,                 # start small; you can increase later (e.g., 15–20)
    steps_per_epoch=steps_per_epoch,
    verbose=1
)

Feature dim from file: 1280
Total captions: 32360
Steps per epoch: 505
X_img: (64, 1280)
X_seq: (64, 20)
y    : (64, 20)
Epoch 1/20
Epoch 2/20
Epoch 3/20
Epoch 4/20
Epoch 5/20
Epoch 6/20
Epoch 7/20
Epoch 8/20
Epoch 9/20
Epoch 10/20
Epoch 11/20
Epoch 12/20
Epoch 13/20
Epoch 14/20
Epoch 15/20
Epoch 16/20
Epoch 17/20
Epoch 18/20
Epoch 19/20
Epoch 20/20


In [42]:
# === Save trained model ===
C.MODELS.mkdir(parents=True, exist_ok=True)  # ensure folder exists
model_path = C.MODELS / "caption_decoder.keras"
decoder_model.save(model_path)

print(f"Model saved successfully at: {model_path}")


 Model saved successfully at: C:\Users\mkalam\ML_projs\image_captioning\models\caption_decoder.keras
