In [None]:
# 📸 Image Caption Viewer Project

## ✅ Objective
Display a collection of local images with their respective captions from a file. Useful for reviewing datasets, galleries, or annotations.

## 📁 Folder Structure
- `images/`: contains all the image files
- `captions.txt`: contains image filename + caption in format `filename.jpg|caption`
- `image_viewer.ipynb`: this notebook


In [None]:
!pip install tensorflow opencv-python matplotlib numpy


In [None]:
import cv2
import numpy as np
import matplotlib.pyplot as plt
import tensorflow as tf


In [None]:
import os
# Show the entries of the current working directory as the cell output,
# e.g. to check which files are available to the cells below.
os.listdir()


In [None]:
import cv2
import numpy as np
import matplotlib.pyplot as plt

# Load the uploaded image. cv2.imread reports failure by returning None instead
# of raising, so check explicitly; otherwise cvtColor fails with an opaque error.
image_path = "photo_2025-06-09_21-10-49.jpg"
image = cv2.imread(image_path)
if image is None:
    raise FileNotFoundError(f"Could not read image: {image_path}")
# OpenCV decodes to BGR channel order; convert to RGB for matplotlib.
image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)

# Resize for model input: 256x256, scaled to [0, 1], with a leading batch axis
image_resized = cv2.resize(image, (256, 256)) / 255.0
image_resized = np.expand_dims(image_resized, axis=0)

# Show the original image
plt.imshow(image)
plt.title("Original Image")
plt.axis("off")
plt.show()


In [None]:
pip install torch torchvision matplotlib pillow


In [None]:
import torch
import torchvision.transforms as T
from torchvision import models
from PIL import Image
import matplotlib.pyplot as plt
import numpy as np


In [None]:
from torchvision import models


In [None]:

# Load the pretrained DeepLabV3 model in eval mode.
# `pretrained=True` is deprecated in torchvision >= 0.13; the explicit weights
# enum below is the documented equivalent of the old flag.
model = models.segmentation.deeplabv3_resnet101(
    weights=models.segmentation.DeepLabV3_ResNet101_Weights.COCO_WITH_VOC_LABELS_V1
).eval()


In [None]:
import torch
from torchvision import transforms
from PIL import Image
import matplotlib.pyplot as plt
import numpy as np
from torchvision.models.segmentation import deeplabv3_resnet101, DeepLabV3_ResNet101_Weights


In [None]:
# Load pretrained weights (recommended way)
weights = DeepLabV3_ResNet101_Weights.DEFAULT

# Load the model with these weights and set to eval mode.
# This rebinds `model`, replacing the instance created in the earlier cell.
model = deeplabv3_resnet101(weights=weights)
model.eval()


In [None]:
# Load your image (replace the path below with your own file) and force 3-channel RGB
img_path = 'photo_2025-06-09_21-10-49.jpg'
input_image = Image.open(img_path).convert("RGB")

# Get the preprocessing transform bundled with the chosen weights
preprocess = weights.transforms()

# Apply that transform to the PIL image to obtain the model input tensor
input_tensor = preprocess(input_image)

# The model expects a batch, so add a batch dimension
input_batch = input_tensor.unsqueeze(0)


In [None]:
# Run inference. The model returns a dict; 'out' holds the per-class score map
# and [0] drops the batch axis that was added when building input_batch.
with torch.no_grad():  # No need to track gradients during inference
    output = model(input_batch)['out'][0]  # Output is a dict with key 'out'

# The output has shape [num_classes, H, W]
print(output.shape)  # e.g., torch.Size([21, 480, 480])


In [None]:
# Get the predicted class per pixel by taking argmax over the class axis (dim 0),
# then convert the label map to a uint8 NumPy array on the CPU.
output_predictions = output.argmax(0).byte().cpu().numpy()


In [None]:
# Define the class colors (manually defined palette since weights.meta["palette"] gives KeyError)
palette = [
    (0, 0, 0),        # 0=background
    (128, 0, 0),      # 1=aeroplane
    (0, 128, 0),      # 2=bicycle
    (128, 128, 0),    # 3=bird
    (0, 0, 128),      # 4=boat
    (128, 0, 128),    # 5=bottle
    (0, 128, 128),    # 6=bus
    (128, 128, 128),  # 7=car
    (64, 0, 0),       # 8=cat
    (192, 0, 0),      # 9=chair
    (64, 128, 0),     # 10=cow
    (192, 128, 0),    # 11=dining table
    (64, 0, 128),     # 12=dog
    (192, 0, 128),    # 13=horse
    (64, 128, 128),   # 14=motorbike
    (192, 128, 128),  # 15=person
    (0, 64, 0),       # 16=potted plant
    (128, 64, 0),     # 17=sheep
    (0, 192, 0),      # 18=sofa
    (128, 192, 0),    # 19=train
    (0, 64, 128)      # 20=tv/monitor
]

# Create a color mask using the palette (same height/width as the model output)
color_mask = np.zeros((output_predictions.shape[0], output_predictions.shape[1], 3), dtype=np.uint8)

for label, color in enumerate(palette):
    color_mask[output_predictions == label] = color

# The mask has the resolution of the preprocessed model input, which can differ
# from the original image size. Drawing two differently sized images on the
# same axes misaligns them, so resize a copy of the mask to the original size
# for the overlay. NEAREST keeps the class colors exact (no blended edges).
overlay_mask = Image.fromarray(color_mask).resize(input_image.size, Image.NEAREST)

# Plot original image, mask, and overlay side by side
plt.figure(figsize=(15, 5))

plt.subplot(1, 3, 1)
plt.title("Original Image")
plt.imshow(input_image)
plt.axis('off')

plt.subplot(1, 3, 2)
plt.title("Segmentation Mask")
plt.imshow(color_mask)
plt.axis('off')

plt.subplot(1, 3, 3)
plt.title("Overlay")
plt.imshow(input_image)
plt.imshow(overlay_mask, alpha=0.5)  # overlay with transparency
plt.axis('off')

plt.show()


In [None]:
# VOC color palette for 21 classes (including background).
# NOTE: this rebinds `palette` to the same 21 colors already defined in the
# previous cell; index i is the RGB color used for class id i.
palette = [
    (0, 0, 0),        # 0=background
    (128, 0, 0),      # 1=aeroplane
    (0, 128, 0),      # 2=bicycle
    (128, 128, 0),    # 3=bird
    (0, 0, 128),      # 4=boat
    (128, 0, 128),    # 5=bottle
    (0, 128, 128),    # 6=bus
    (128, 128, 128),  # 7=car
    (64, 0, 0),       # 8=cat
    (192, 0, 0),      # 9=chair
    (64, 128, 0),     # 10=cow
    (192, 128, 0),    # 11=diningtable
    (64, 0, 128),     # 12=dog
    (192, 0, 128),    # 13=horse
    (64, 128, 128),   # 14=motorbike
    (192, 128, 128),  # 15=person
    (0, 64, 0),       # 16=potted plant
    (128, 64, 0),     # 17=sheep
    (0, 192, 0),      # 18=sofa
    (128, 192, 0),    # 19=train
    (0, 64, 128)      # 20=tv/monitor
]


In [None]:
# Use VOC palette since 'palette' key is missing.
# NOTE: compact form of the same 21 RGB triples defined in the earlier palette
# cells (list index = class id); it rebinds `palette` without changing values.
palette = [
    (0, 0, 0), (128, 0, 0), (0, 128, 0), (128, 128, 0), (0, 0, 128),
    (128, 0, 128), (0, 128, 128), (128, 128, 128), (64, 0, 0), (192, 0, 0),
    (64, 128, 0), (192, 128, 0), (64, 0, 128), (192, 0, 128), (64, 128, 128),
    (192, 128, 128), (0, 64, 0), (128, 64, 0), (0, 192, 0), (128, 192, 0),
    (0, 64, 128)
]

In [None]:
# Start from an all-black RGB canvas with the label map's height and width.
color_mask = np.zeros(output_predictions.shape[:2] + (3,), dtype=np.uint8)

# Paint every pixel of class `label` with that class's palette color.
for label in range(len(palette)):
    color = palette[label]
    color_mask[output_predictions == label] = color


In [None]:
# Plot original image, mask, and overlay.
# The mask has the resolution of the preprocessed model input, which can differ
# from the original image size; two differently sized images drawn on the same
# axes do not line up. Resize a copy of the mask to the original size for the
# overlay panel. NEAREST keeps the class colors exact.
overlay_mask = Image.fromarray(color_mask).resize(input_image.size, Image.NEAREST)

plt.figure(figsize=(18, 6))

# Original Image
plt.subplot(1, 3, 1)
plt.title("Original Image")
plt.imshow(input_image)
plt.axis('off')

# Segmentation Mask (native model-output resolution)
plt.subplot(1, 3, 2)
plt.title("Segmentation Mask")
plt.imshow(color_mask)
plt.axis('off')

# Overlay (original + segmentation)
plt.subplot(1, 3, 3)
plt.title("Overlay")
plt.imshow(input_image)
plt.imshow(overlay_mask, alpha=0.6)  # semi-transparent mask
plt.axis('off')

plt.tight_layout()
plt.show()


In [None]:
import numpy as np
from PIL import Image


In [None]:
# Write the color mask array to a PNG file in the working directory.
Image.fromarray(color_mask).save("segmentation_mask.png")
print("Segmentation mask saved as segmentation_mask.png")


In [None]:
!pip install torch torchvision pillow tqdm


In [None]:
import torch
import torchvision.transforms as T
from torchvision.models.segmentation import deeplabv3_resnet101
from PIL import Image, ImageEnhance
import numpy as np
import matplotlib.pyplot as plt
import random

class DeepLabSegmentation:
    """Semantic segmentation with a pretrained DeepLabV3-ResNet101.

    Attributes:
        device: Device string the model and inputs are moved to.
        model: The network in eval mode.
        preprocess: Resize to 520x520, tensor conversion, ImageNet normalization.
        colors: One RGB tuple per class id (21 entries).
    """

    def __init__(self, device=None):
        """Load the model; `device` defaults to CUDA when available, else CPU."""
        # Local import: this cell only imports the model factory, not the enum.
        from torchvision.models.segmentation import DeepLabV3_ResNet101_Weights

        self.device = device or ('cuda' if torch.cuda.is_available() else 'cpu')
        # `pretrained=True` is deprecated; this enum is its documented equivalent.
        self.model = deeplabv3_resnet101(
            weights=DeepLabV3_ResNet101_Weights.COCO_WITH_VOC_LABELS_V1
        ).to(self.device).eval()
        self.preprocess = T.Compose([
            T.Resize((520, 520)),
            T.ToTensor(),
            T.Normalize(mean=(0.485, 0.456, 0.406),
                        std=(0.229, 0.224, 0.225)),
        ])
        self.colors = self.generate_colors(21)

    def generate_colors(self, num_classes):
        """Return `num_classes` reproducible pseudo-random RGB tuples.

        A private generator seeded with 42 yields the same colors as seeding
        the global `random` module would, without resetting the global random
        state that other code may rely on.
        """
        rng = random.Random(42)
        return [tuple(rng.choices(range(256), k=3)) for _ in range(num_classes)]

    @torch.no_grad()
    def segment(self, image: Image.Image):
        """Return the per-pixel class-id map for `image` as a NumPy array."""
        input_tensor = self.preprocess(image).unsqueeze(0).to(self.device)
        output = self.model(input_tensor)["out"]
        # Drop the singleton batch axis, then take the best class per pixel.
        mask = torch.argmax(output.squeeze(), dim=0).cpu().numpy()
        return mask

    def colorize_mask(self, mask):
        """Map a class-id array to an RGB PIL image using `self.colors`."""
        color_mask = np.zeros((*mask.shape, 3), dtype=np.uint8)
        for class_id, color in enumerate(self.colors):
            color_mask[mask == class_id] = color
        return Image.fromarray(color_mask)

    def overlay(self, image, mask, alpha=0.6):
        """Blend the colorized mask over `image` (resized to the mask size).

        Returns an RGBA image; convert to RGB before saving as JPEG.
        """
        color_mask = self.colorize_mask(mask).convert("RGBA")
        base = image.convert("RGBA").resize(color_mask.size)
        blended = Image.blend(base, color_mask, alpha)
        return blended


In [None]:
from torchvision.models import resnet50, ResNet50_Weights

class FeatureExtractor:
    """ResNet-50 with its final layer removed, used as an image feature extractor."""

    def __init__(self, device=None):
        """Build the truncated network; `device` falls back to CUDA if available, else CPU."""
        self.device = device if device else ('cuda' if torch.cuda.is_available() else 'cpu')
        pretrained = ResNet50_Weights.DEFAULT
        backbone = resnet50(weights=pretrained)
        # Keep every child module except the last one.
        layers = list(backbone.children())
        trunk = torch.nn.Sequential(*layers[:-1])
        self.model = trunk.to(self.device).eval()
        # Preprocessing transform bundled with the weights.
        self.preprocess = pretrained.transforms()

    @torch.inference_mode()
    def __call__(self, image: Image.Image):
        """Return the squeezed feature tensor for a single PIL image, on the CPU."""
        batch = self.preprocess(image).unsqueeze(0)
        features = self.model(batch.to(self.device))
        return features.squeeze().cpu()


In [None]:
import os
from pathlib import Path
from tqdm import tqdm
import json

def process_dataset(img_dir, out_dir="preproc", captions_json=None, device=None):
    """Segment every image under `img_dir`, extract features, and write a manifest.

    For each ``.jpg``/``.jpeg``/``.png`` found recursively, saves
    ``<stem>_mask.png``, ``<stem>_overlay.jpg`` and ``<stem>_feat.pt`` in
    `out_dir`, then writes ``manifest.jsonl`` there (one JSON object per image).

    Args:
        img_dir: Folder searched recursively for images.
        out_dir: Output folder; created if missing.
        captions_json: Optional path to a JSON file that is either a list of
            ``{"image": ..., "caption": ...}`` rows or a dict mapping an image
            file name to a caption or list of captions.
        device: Passed to DeepLabSegmentation and FeatureExtractor.

    NOTE: outputs are keyed by file stem only, so two images with the same
    stem in different sub-folders overwrite each other's outputs.
    """
    img_dir = Path(img_dir)
    out_dir = Path(out_dir)
    out_dir.mkdir(parents=True, exist_ok=True)

    seg = DeepLabSegmentation(device=device)
    feats = FeatureExtractor(device=device)

    caption_map = {}
    if captions_json:
        with open(captions_json, "r", encoding="utf-8") as f:
            data = json.load(f)
        if isinstance(data, list):
            for row in data:
                caption_map.setdefault(row["image"], []).append(row["caption"])
        elif isinstance(data, dict):
            # Normalize single captions to one-element lists.
            caption_map = {k: v if isinstance(v, list) else [v] for k, v in data.items()}

    manifest = []

    image_paths = [p for p in img_dir.rglob("*") if p.suffix.lower() in [".jpg", ".jpeg", ".png"]]
    print(f"Found {len(image_paths)} images.")

    for img_path in tqdm(image_paths):
        # convert() loads the pixel data, so the file can be closed right away.
        with Image.open(img_path) as img_file:
            image = img_file.convert("RGB")

        mask = seg.segment(image)
        mask_img = seg.colorize_mask(mask)
        overlay_img = seg.overlay(image, mask)
        feature_vector = feats(image)

        stem = img_path.stem
        mask_path = out_dir / f"{stem}_mask.png"
        overlay_path = out_dir / f"{stem}_overlay.jpg"
        feature_path = out_dir / f"{stem}_feat.pt"

        mask_img.save(mask_path)
        # The overlay is RGBA, and Pillow cannot write RGBA as JPEG; drop alpha first.
        overlay_img.convert("RGB").save(overlay_path)
        torch.save(feature_vector, feature_path)

        manifest.append({
            "image": str(img_path),
            "mask": str(mask_path.name),
            "overlay": str(overlay_path.name),
            "features": str(feature_path.name),
            "captions": caption_map.get(img_path.name, [])
        })

    with open(out_dir / "manifest.jsonl", "w", encoding="utf-8") as f:
        for row in manifest:
            f.write(json.dumps(row) + "\n")

    print(f"✅ Done! Outputs saved in {out_dir}")


In [None]:
# ────────────────────────────────────────────────────────────────────────────
# 1. Imports
# ────────────────────────────────────────────────────────────────────────────
from pathlib import Path
from typing import Union, List, Tuple, Optional
import json, random, torch, numpy as np
from PIL import Image
import torchvision.transforms as T
from torchvision.models.segmentation import deeplabv3_resnet101, DeepLabV3_ResNet101_Weights
from torchvision.models import resnet50, ResNet50_Weights
from tqdm.auto import tqdm

# ────────────────────────────────────────────────────────────────────────────
# 2. Tiny wrappers
# ────────────────────────────────────────────────────────────────────────────
class DeepLabSeg:
    """Thin, reusable wrapper around a pretrained DeepLabV3-ResNet101."""

    def __init__(self, device: str = "cpu"):
        """Load the network onto `device` and build a fixed 21-color palette."""
        self.device = device
        pretrained = DeepLabV3_ResNet101_Weights.DEFAULT
        net = deeplabv3_resnet101(weights=pretrained)
        self.model = net.to(self.device).eval()
        self.pre = pretrained.transforms()
        # Fixed seed -> same palette on every construction. Note that this
        # reseeds Python's global `random` generator as a side effect.
        random.seed(42)
        colors = []
        for _ in range(21):
            colors.append(tuple(random.choices(range(256), k=3)))
        self.palette = colors

    @torch.no_grad()
    def __call__(self, img: Image.Image) -> np.ndarray:
        """Return the per-pixel class-id map of `img` as a uint8 array."""
        batch = self.pre(img).unsqueeze(0).to(self.device)
        logits = self.model(batch)["out"]
        labels = logits.argmax(1).squeeze()
        return labels.cpu().numpy().astype(np.uint8)

    def colorize(self, mask: np.ndarray) -> Image.Image:
        """Turn a 2-D class-id map into an RGB image using the palette."""
        height, width = mask.shape
        canvas = np.zeros((height, width, 3), dtype=np.uint8)
        for class_id in range(len(self.palette)):
            canvas[mask == class_id] = self.palette[class_id]
        return Image.fromarray(canvas)

    def overlay(self, img: Image.Image, mask: np.ndarray, alpha=0.6) -> Image.Image:
        """Blend the colorized mask over `img` (resized to the mask) and return RGB."""
        tinted = self.colorize(mask).convert("RGBA")
        background = img.convert("RGBA").resize(tinted.size)
        blended = Image.blend(background, tinted, alpha)
        return blended.convert("RGB")


class ResNetFeature:
    """Global image feature from ResNet-50 with its last layer stripped."""

    def __init__(self, device: str = "cpu"):
        """Build the truncated ResNet-50 on `device`, in eval mode."""
        self.device = device
        pretrained = ResNet50_Weights.DEFAULT
        backbone = resnet50(weights=pretrained)
        # Every child module except the final one.
        trunk_layers = list(backbone.children())[:-1]
        self.model = torch.nn.Sequential(*trunk_layers).to(self.device).eval()
        self.pre = pretrained.transforms()

    @torch.no_grad()
    def __call__(self, img: Image.Image) -> torch.Tensor:
        """Return the squeezed feature tensor of `img` on the CPU."""
        batch = self.pre(img).unsqueeze(0).to(self.device)
        pooled = self.model(batch)
        return pooled.squeeze().cpu()          # (2048,)

# ────────────────────────────────────────────────────────────────────────────
# 3. The unified pipeline
# ────────────────────────────────────────────────────────────────────────────
def run_pipeline(
    images: Union[str, Path, List[Tuple[str, Image.Image]]],
    out_dir: Union[str, Path] = "preproc",
    captions: Optional[dict] = None,
    device: str = "cpu",
):
    """
    Segment images, extract features, and write per-image outputs plus a manifest.

    • `images` can be
        - a folder path (searched recursively for .jpg/.jpeg/.png)
        - a single image file path
        - list of (name, PIL.Image) tuples (in-memory mode)
    • `captions` optional dict  {name: [cap1, cap2, ...]}
    • `out_dir` receives <name>_mask.png, <name>_overlay.jpg, <name>_feat.pt
      and manifest.jsonl (one JSON object per image).

    Images given by path are opened one at a time inside the processing loop,
    so a large folder is never fully decoded into memory at once.
    """
    out_dir = Path(out_dir)
    out_dir.mkdir(parents=True, exist_ok=True)
    seg = DeepLabSeg(device)
    feat = ResNetFeature(device)
    caps = captions or {}

    # -------- prepare list of (name, source) --------
    # `source` is a path (loaded lazily below) or an already-open PIL image.
    if isinstance(images, (str, Path)):
        p = Path(images)
        if p.is_dir():
            items = [
                (f.stem, f)
                for f in sorted(p.rglob("*"))
                if f.suffix.lower() in (".jpg", ".jpeg", ".png")
            ]
        else:  # single file
            items = [(p.stem, p)]
    else:  # already [(name, PIL.Image), ...]
        items = list(images)

    print(f"Processing {len(items)} image(s)…")
    manifest = []
    for name, source in tqdm(items):
        if isinstance(source, Image.Image):
            img = source
        else:
            # convert() loads the pixels, so the file handle can be closed at once.
            with Image.open(source) as img_file:
                img = img_file.convert("RGB")

        msk = seg(img)
        msk_img = seg.colorize(msk)
        ovl_img = seg.overlay(img, msk)
        vec = feat(img)

        msk_path = out_dir / f"{name}_mask.png"
        ovl_path = out_dir / f"{name}_overlay.jpg"
        vec_path = out_dir / f"{name}_feat.pt"

        msk_img.save(msk_path)
        ovl_img.save(ovl_path)
        torch.save(vec, vec_path)

        manifest.append({
            "image": name,          # simply a key; change if you prefer path
            "mask":    msk_path.name,
            "overlay": ovl_path.name,
            "feature": vec_path.name,
            "captions": caps.get(name, []),
        })

    with open(out_dir / "manifest.jsonl", "w", encoding="utf-8") as fp:
        fp.writelines(json.dumps(r) + "\n" for r in manifest)

    print("✅ Done!  Outputs in →", out_dir.resolve())

# ────────────────────────────────────────────────────────────────────────────
# 4. Example calls
# ────────────────────────────────────────────────────────────────────────────
# • Entire folder on CPU:
# run_pipeline("images/", out_dir="preproc_cpu", device="cpu")

# • Just one file:
# run_pipeline("images/cat.jpg", out_dir="preproc_single")

# • In-memory mode:
# imgs_mem = [("cat01", img1), ("dog02", img2)]
# run_pipeline(imgs_mem, out_dir="preproc_mem", device="cpu",
#              captions={"cat01": ["A cute cat."]})


In [None]:
import json, itertools, pprint, pathlib
pp = pprint.PrettyPrinter(depth=2)

manifest_path = pathlib.Path("preproc/manifest.jsonl")
# Read through a context manager so the file handle is closed deterministically;
# the manifest is written as UTF-8, so read it back the same way.
with manifest_path.open(encoding="utf-8") as manifest_file:
    rows = [json.loads(line) for line in manifest_file]
pp.pprint(rows[:3])          # quick peek
print("Total samples:", len(rows))


In [None]:
import torch
from torch.utils.data import Dataset

class CaptionSet(Dataset):
    """Dataset of (feature tensor, caption token ids) built from manifest rows."""

    def __init__(self, manifest, vocab):
        """Keep the manifest rows and the word -> id mapping."""
        self.manifest = manifest
        self.vocab = vocab

    def __len__(self):
        return len(self.manifest)

    def __getitem__(self, idx):
        """Load the stored feature and encode the FIRST caption of row `idx`.

        Rows without captions are encoded as just ``<start>`` ``<end>``.
        """
        entry = self.manifest[idx]
        # Feature files are looked up under the hard-coded "preproc/" folder.
        feat = torch.load("preproc/" + entry["feature"])

        captions = entry["captions"]
        text = captions[0] if captions else ""

        unk_id = self.vocab["<unk>"]
        token_ids = [self.vocab["<start>"]]
        for word in text.lower().split():
            token_ids.append(self.vocab.get(word, unk_id))
        token_ids.append(self.vocab["<end>"])
        return feat, torch.tensor(token_ids)


In [None]:
import json
import itertools
from collections import Counter

# Load the manifest: one JSON object per line
rows = []
with open("preproc/manifest.jsonl", "r", encoding="utf-8") as f:
    for line in f:
        rows.append(json.loads(line))

# Gather caption strings, preferring the "captions" list and falling back to
# a single "caption" value; empty/None entries are skipped.
all_captions = []
for row in rows:
    multi = row.get("captions")
    if multi:
        all_captions.extend(cap for cap in multi if cap)
    else:
        single = row.get("caption")
        if single:
            all_captions.append(single)

# Count lower-cased, whitespace-separated words over all captions
words = Counter()
for cap in all_captions:
    words.update(cap.lower().split())

# Vocabulary = special tokens first, then every word seen at least twice
special = ["<pad>", "<start>", "<end>", "<unk>"]
itos = special + [w for w, n in words.items() if n >= 2]
stoi = {}
for i, w in enumerate(itos):
    stoi[w] = i

# Final vocab and size
vocab = stoi
vocab_size = len(vocab)


In [None]:
import torch.nn as nn

class Captioner(nn.Module):
    """LSTM caption decoder whose initial hidden state comes from the image feature."""

    def __init__(self, vocab, feat_dim=2048, emb_dim=512, hid_dim=512):
        """
        Args:
            vocab: Mapping token -> id; must contain "<pad>".
            feat_dim: Size of the input image feature vector.
            emb_dim: Token embedding size.
            hid_dim: LSTM hidden size.
        """
        super().__init__()
        n_tokens = len(vocab)
        self.vocab = vocab
        # Projects the image feature to the LSTM hidden size.
        self.fc_img = nn.Linear(feat_dim, hid_dim)
        self.emb = nn.Embedding(n_tokens, emb_dim, padding_idx=vocab["<pad>"])
        self.lstm = nn.LSTM(emb_dim, hid_dim, batch_first=True)
        # Maps each hidden state to vocabulary logits.
        self.fc_out = nn.Linear(hid_dim, n_tokens)

    def forward(self, feats, caps):
        """feats: (B,feat_dim)  caps: (B,T)  ->  logits: (B,T,V)"""
        hidden = torch.tanh(self.fc_img(feats)).unsqueeze(0)  # (1,B,H)
        cell = torch.zeros_like(hidden)
        token_vectors = self.emb(caps)
        states, _ = self.lstm(token_vectors, (hidden, cell))
        return self.fc_out(states)


In [None]:
import torch
from torch.utils.data import Dataset, DataLoader

class CaptionDataset(Dataset):
    """(feature tensor, fixed-length caption ids) pairs read from a JSONL manifest.

    Every caption of every manifest row becomes one sample.
    """

    def __init__(self, manifest_path, vocab, max_len=20):
        """
        Args:
            manifest_path: Path to a ``manifest.jsonl`` file.
            vocab: Mapping token -> id with "<start>", "<end>", "<unk>", "<pad>".
            max_len: Length of every encoded caption, special tokens included.
        """
        # Local imports: this cell does not import json/os itself.
        import json
        import os

        self.vocab = vocab
        self.max_len = max_len
        self.data = []
        manifest_dir = os.path.dirname(os.path.abspath(manifest_path))
        with open(manifest_path, "r", encoding="utf-8") as f:
            for line in f:
                if not line.strip():
                    continue  # tolerate blank lines
                entry = json.loads(line)
                # The pipelines in this notebook write either "features" or "feature".
                if "features" in entry:
                    feat_ref = entry["features"]
                elif "feature" in entry:
                    feat_ref = entry["feature"]
                else:
                    raise KeyError("features")
                # Manifests store bare file names relative to their own folder;
                # resolve against it when the name is not found as given.
                if isinstance(feat_ref, str) and not os.path.exists(feat_ref):
                    candidate = os.path.join(manifest_dir, feat_ref)
                    if os.path.exists(candidate):
                        feat_ref = candidate
                for cap in entry.get("captions") or []:
                    if cap:
                        self.data.append((feat_ref, cap))

    def __len__(self):
        return len(self.data)

    def encode_caption(self, caption):
        """Return a tensor of exactly `max_len` ids: <start>, words, <end>, then <pad>."""
        words = caption.lower().split()
        # Reserve two slots so <end> survives truncation of long captions.
        tokens = ["<start>"] + words[:self.max_len - 2] + ["<end>"]
        ids = [self.vocab.get(w, self.vocab["<unk>"]) for w in tokens]
        pad_len = self.max_len - len(ids)
        return torch.tensor(ids + [self.vocab["<pad>"]] * pad_len)

    def __getitem__(self, idx):
        """Load the feature file for sample `idx` and encode its caption."""
        feat_path, caption = self.data[idx]
        feat = torch.load(feat_path)  # (2048,)
        cap_tensor = self.encode_caption(caption)
        return feat, cap_tensor


In [None]:
import os, json, itertools, torch
from collections import Counter
from torch.utils.data import Dataset, DataLoader

# -------------------------------
# STEP 1: Ensure valid manifest
# -------------------------------
manifest_path = "preproc/manifest.jsonl"
dummy_feat_path = "dummy.pt"

# Create dummy data if manifest doesn't exist or is empty/invalid
def ensure_valid_manifest():
    """Return the usable rows of the manifest, creating dummy data if there are none.

    Reads the module-level `manifest_path`. A row is usable when it is a JSON
    object with a truthy "image" and at least one truthy caption under
    "captions" or "caption". Malformed lines are skipped.

    If no usable row exists (file missing, empty, or all rows invalid), a
    random 2048-d tensor is saved to `dummy_feat_path` and the manifest is
    OVERWRITTEN with a single dummy row.

    Returns:
        list[dict]: The usable rows (or the single dummy row).
    """
    valid_rows = []
    if os.path.exists(manifest_path):
        with open(manifest_path, "r", encoding="utf-8") as f:
            for line in f:
                try:
                    row = json.loads(line)
                    if not isinstance(row, dict):
                        continue
                    if "image" in row and ("captions" in row or "caption" in row):
                        captions = row.get("captions") or [row.get("caption")]
                        if row["image"] and any(captions):
                            valid_rows.append(row)
                except (json.JSONDecodeError, TypeError, AttributeError):
                    # Best-effort: skip malformed rows, but no longer swallow
                    # KeyboardInterrupt/SystemExit like a bare `except:` did.
                    continue

    if not valid_rows:
        print("⚠️ Manifest invalid or empty. Creating dummy data.")
        # Create the folder the manifest actually lives in.
        os.makedirs(os.path.dirname(manifest_path) or ".", exist_ok=True)
        torch.save(torch.randn(2048), dummy_feat_path)
        dummy_row = {
            # "image" carries the feature-file path here.
            "image": dummy_feat_path,
            "captions": ["A dummy image showing a random scene."]
        }
        with open(manifest_path, "w", encoding="utf-8") as f:
            f.write(json.dumps(dummy_row) + "\n")
        valid_rows = [dummy_row]

    return valid_rows

rows = ensure_valid_manifest()

# -------------------------------
# STEP 2: Build vocabulary
# -------------------------------
# Collect every non-empty caption; rows may carry a "captions" list or a
# single "caption" value.
all_captions = []
for row in rows:
    caps = row.get("captions") or [row.get("caption")]
    for cap in caps:
        if cap:
            all_captions.append(cap)

# Frequency of each lower-cased, whitespace-separated word
words = Counter()
for cap in all_captions:
    words.update(cap.lower().split())

# Special tokens take the first ids; every observed word follows.
special = ["<pad>", "<start>", "<end>", "<unk>"]
itos = special + [w for w, n in words.items() if n >= 1]
stoi = dict(zip(itos, range(len(itos))))
vocab = stoi
vocab_size = len(vocab)

print(f"✅ Vocabulary built with {vocab_size} tokens")

# -------------------------------
# STEP 3: CaptionDataset
# -------------------------------
class CaptionDataset(Dataset):
    """(feature tensor, fixed-length caption ids) pairs read from a JSONL manifest.

    The row's "image" value is used as the path of the saved feature tensor.
    Every non-empty caption of a row becomes one sample.
    """

    def __init__(self, manifest_path, vocab, max_len=20):
        """
        Args:
            manifest_path: Path to the JSONL manifest.
            vocab: Mapping token -> id with "<start>", "<end>", "<unk>", "<pad>".
            max_len: Length of every encoded caption, special tokens included.
        """
        self.vocab = vocab
        self.data = []

        with open(manifest_path, "r", encoding="utf-8") as f:
            for line in f:
                try:
                    row = json.loads(line)
                    image_path = row.get("image")
                    captions = row.get("captions") or [row.get("caption")]
                    if not image_path or not captions:
                        continue
                    for cap in captions:
                        if cap:
                            self.data.append((image_path, cap))
                except (json.JSONDecodeError, AttributeError, TypeError):
                    # Best-effort: skip malformed rows (bad JSON, non-object
                    # rows, non-iterable captions) without hiding
                    # KeyboardInterrupt/SystemExit.
                    continue

        self.max_len = max_len
        print(f"📦 Loaded {len(self.data)} usable image-caption pairs")

    def encode_caption(self, caption):
        """Return a tensor of exactly `max_len` ids: <start>, words, <end>, then <pad>.

        Over-long captions are truncated in the word section, so the sequence
        always terminates with <end> instead of having it cut off.
        """
        unk_id = self.vocab["<unk>"]
        word_ids = [self.vocab.get(t, unk_id) for t in caption.lower().split()]
        # Reserve two slots for <start> and <end>.
        word_ids = word_ids[:max(self.max_len - 2, 0)]
        ids = [self.vocab["<start>"]] + word_ids + [self.vocab["<end>"]]
        ids = ids[:self.max_len]  # only relevant when max_len < 2
        ids += [self.vocab["<pad>"]] * (self.max_len - len(ids))
        return torch.tensor(ids)

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        """Load the feature file for sample `idx` and encode its caption."""
        feat_path, caption = self.data[idx]
        feat = torch.load(feat_path)
        cap_tensor = self.encode_caption(caption)
        return feat, cap_tensor

# -------------------------------
# STEP 4: DataLoader
# -------------------------------
# Build the dataset from the manifest and vocabulary prepared above, then wrap
# it in a shuffling loader that yields batches of up to 32 samples.
train_ds = CaptionDataset(manifest_path, vocab)
train_dl = DataLoader(train_ds, batch_size=32, shuffle=True)


In [None]:

from pathlib import Path

# Collect the .jpg and .png files directly inside images/ (non-recursive).
image_dir = Path("images/")
image_files = [*image_dir.glob("*.jpg"), *image_dir.glob("*.png")]

# Report the count and preview the first five file names.
print(f"Found {len(image_files)} image(s).")
preview = image_files[:5]
for img in preview:
    print("-", img.name)


In [None]:
import os
import json
import torch
import torchvision.transforms as T
from torchvision.models.segmentation import deeplabv3_resnet50
from PIL import Image
from pathlib import Path
from tqdm import tqdm
import matplotlib.pyplot as plt
import numpy as np

def process_dataset(img_dir, out_dir, captions_json=None, device="cpu"):
    """Segment each image in `img_dir` and write masks, overlays, and a manifest.

    For every ``.jpg``/``.jpeg``/``.png`` directly inside `img_dir`, saves
    ``<stem>_mask.png``, ``<stem>_overlay.jpg`` and ``<stem>_feat.pt`` in
    `out_dir`, then writes ``manifest.jsonl`` there. Images that fail are
    reported and skipped.

    Args:
        img_dir: Folder containing the images (not searched recursively).
        out_dir: Output folder; created if missing.
        captions_json: Optional path to a JSON file whose content is looked up
            by image file name.
        device: Torch device for the segmentation model.

    NOTE: the saved "feature" is a random placeholder vector, not a real image
    feature. The manifest's "image" field holds the FEATURE file path.
    """
    # Local import: this cell only imports the model factory, not the enum.
    from torchvision.models.segmentation import DeepLabV3_ResNet50_Weights

    img_dir = Path(img_dir)
    out_dir = Path(out_dir)
    out_dir.mkdir(parents=True, exist_ok=True)

    # Load captions if provided
    caption_map = {}
    if captions_json:
        with open(captions_json, "r", encoding="utf-8") as f:
            caption_map = json.load(f)

    # Load DeepLab model and preprocessing. The enum is the documented
    # equivalent of the deprecated `pretrained=True`.
    model = deeplabv3_resnet50(
        weights=DeepLabV3_ResNet50_Weights.COCO_WITH_VOC_LABELS_V1
    ).to(device).eval()
    preprocess = T.Compose([
        T.Resize((224, 224)),
        T.ToTensor(),
        T.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
    ])

    # Color palette for segmentation. The model object has no `.meta`, so the
    # 21 VOC class colors are spelled out here (index = class id).
    palette = [
        (0, 0, 0), (128, 0, 0), (0, 128, 0), (128, 128, 0), (0, 0, 128),
        (128, 0, 128), (0, 128, 128), (128, 128, 128), (64, 0, 0), (192, 0, 0),
        (64, 128, 0), (192, 128, 0), (64, 0, 128), (192, 0, 128), (64, 128, 128),
        (192, 128, 128), (0, 64, 0), (128, 64, 0), (0, 192, 0), (128, 192, 0),
        (0, 64, 128)
    ]

    manifest = []

    for img_path in tqdm(sorted(img_dir.glob("*"))):
        if img_path.suffix.lower() not in [".jpg", ".jpeg", ".png"]:
            continue
        try:
            # convert() loads the pixels, so the file can be closed right away.
            with Image.open(img_path) as img_file:
                img = img_file.convert("RGB")  # force RGB
            input_tensor = preprocess(img).unsqueeze(0).to(device)

            with torch.no_grad():
                output = model(input_tensor)["out"][0].argmax(0).cpu().numpy()

            # Color mask
            color_mask = np.zeros((output.shape[0], output.shape[1], 3), dtype=np.uint8)
            for label, color in enumerate(palette):
                color_mask[output == label] = color

            mask_img = Image.fromarray(color_mask)
            overlay = Image.blend(img.resize(mask_img.size), mask_img, alpha=0.5)

            # Simulate feature vector (real project: use precomputed CNN features)
            feature_vector = torch.randn(2048)  # Dummy

            # File naming
            stem = img_path.stem
            mask_path = out_dir / f"{stem}_mask.png"
            overlay_path = out_dir / f"{stem}_overlay.jpg"
            feature_path = out_dir / f"{stem}_feat.pt"

            # ✅ Fix RGBA to RGB conversion before saving JPEG
            overlay.convert("RGB").save(overlay_path)
            mask_img.save(mask_path)
            torch.save(feature_vector, feature_path)

            manifest.append({
                "image": str(feature_path),
                "mask": str(mask_path.name),
                "overlay": str(overlay_path.name),
                "captions": caption_map.get(img_path.name, [])
            })
        except Exception as e:
            # Deliberate best-effort: report the failure and move on.
            print(f"❌ Skipped {img_path.name}: {e}")

    # Save final manifest
    with open(out_dir / "manifest.jsonl", "w", encoding="utf-8") as f:
        for row in manifest:
            f.write(json.dumps(row) + "\n")

    print(f"✅ Processed {len(manifest)} images and saved to {out_dir}")


In [None]:
import json, pathlib
manifest_path = pathlib.Path("preproc/manifest.jsonl")
# Read through a context manager so the file handle is closed deterministically.
with manifest_path.open(encoding="utf-8") as manifest_file:
    rows = [json.loads(line) for line in manifest_file]
print("✅ Total samples:", len(rows))


In [None]:
# Run whichever process_dataset definition is currently in the kernel on the
# working directory; outputs go to preproc/ and no captions file is supplied.
process_dataset(
    img_dir=".",              # ✅ This points to the current folder where your .jpg is
    out_dir="preproc/",
    captions_json=None,
    device="cpu"
)


In [None]:
import json, pathlib

manifest_path = pathlib.Path("preproc/manifest.jsonl")
# Read through a context manager so the file handle is closed deterministically.
with manifest_path.open(encoding="utf-8") as manifest_file:
    rows = [json.loads(line) for line in manifest_file]
print("✅ Total usable samples:", len(rows))


In [None]:
from PIL import Image

img_path = "photo_2025-06-09_21-10-49.jpg"
try:
    # The context manager closes the file handle even when verification fails.
    with Image.open(img_path) as img:
        img.verify()
    print("✅ Image is valid and readable.")
except Exception as e:
    print("❌ Error loading image:", e)


In [None]:
from pathlib import Path
from PIL import Image
from tqdm import tqdm

# ✅ Gather .jpg, .jpeg and .png files directly inside images/ (in that order)
image_dir = Path("images/")
img_files = [
    found
    for pattern in ("*.jpg", "*.jpeg", "*.png")
    for found in image_dir.glob(pattern)
]

print(f"📸 Total images found: {len(img_files)}")

# ✅ Attempt to decode each file as RGB and report the outcome per file
for img_path in tqdm(img_files):
    print("🖼️ Found:", img_path)
    try:
        image = Image.open(img_path).convert("RGB")
    except Exception as e:
        print(f"❌ Failed to open {img_path}: {e}")
    else:
        print("✅ Image is valid and readable.")


In [None]:
import os
import json
from PIL import Image
from torchvision import transforms
from tqdm import tqdm
import torch
from torchvision.models import resnet50

def process_dataset(img_dir, out_dir, captions_json=None, device="cpu"):
    """Extract ResNet-50 features for each image and write them inline to a manifest.

    Each manifest row is ``{"image": <absolute path>, "features": [floats],
    "caption": <str or None>}``, written to ``<out_dir>/manifest.jsonl``.

    Args:
        img_dir: Folder containing .jpg/.jpeg/.png files (not recursive).
        out_dir: Output folder; created if missing.
        captions_json: Optional JSON file, either a dict mapping image file
            name to a caption (or list of captions; the first is used) or a
            list of ``{"image": ..., "caption": ...}`` rows.
        device: Torch device for the model.

    The classifier head is replaced by identity so the stored vector is the
    2048-d pooled feature (what Captioner's default feat_dim expects) rather
    than 1000 class logits, and inputs get the standard ImageNet normalization.
    """
    # Local import: this cell only imports the `resnet50` factory.
    from torchvision.models import ResNet50_Weights

    os.makedirs(out_dir, exist_ok=True)
    transform = transforms.Compose([
        transforms.Resize((224, 224)),
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
    ])

    # IMAGENET1K_V1 is the documented equivalent of the deprecated pretrained=True.
    model = resnet50(weights=ResNet50_Weights.IMAGENET1K_V1)
    model.fc = torch.nn.Identity()
    model = model.to(device)
    model.eval()

    # Map image file name -> caption
    caption_map = {}
    if captions_json:
        with open(captions_json, "r", encoding="utf-8") as f:
            loaded = json.load(f)
        if isinstance(loaded, dict):
            caption_map = loaded
        else:
            for entry in loaded:
                caption_map.setdefault(entry["image"], entry["caption"])

    # Get list of images (sorted for a deterministic manifest order)
    supported_exts = (".jpg", ".jpeg", ".png")
    img_files = sorted(
        os.path.join(img_dir, f) for f in os.listdir(img_dir) if f.lower().endswith(supported_exts)
    )

    print(f"🔍 Found {len(img_files)} images in {img_dir}")

    manifest = []

    for img_path in tqdm(img_files):
        print("🖼️ Checking:", img_path)
        try:
            # convert() loads the pixels, so the file can be closed right away.
            with Image.open(img_path) as img_file:
                image = img_file.convert("RGB")
        except Exception as e:
            print(f"❌ Error reading {img_path}: {e}")
            continue

        image_tensor = transform(image).unsqueeze(0).to(device)
        with torch.no_grad():
            features = model(image_tensor).squeeze().cpu().numpy().tolist()

        caption = caption_map.get(os.path.basename(img_path))
        if isinstance(caption, list):
            caption = caption[0] if caption else None

        sample = {
            "image": os.path.abspath(img_path),
            "features": features,
            "caption": caption  # None when no captions file / no entry for this image
        }
        manifest.append(sample)

    print(f"✅ Total usable samples: {len(manifest)}")

    # Save manifest
    with open(os.path.join(out_dir, "manifest.jsonl"), "w", encoding="utf-8") as f:
        for row in manifest:
            f.write(json.dumps(row) + "\n")

    print(f"📦 Manifest saved to {out_dir}/manifest.jsonl")


In [None]:
# Run whichever process_dataset definition is currently in the kernel on the
# working directory; the manifest is written under preproc/.
process_dataset(
    img_dir=".",              # since your image is in current folder
    out_dir="preproc/",
    captions_json=None,
    device="cpu"
)


In [None]:
from collections import Counter
import re

# Helper tokenizer
def tokenize(text):
    r"""Lower-case `text` and return its runs of word characters (regex ``\w+``)."""
    return re.findall(r"\w+", text.lower())

# Build vocab from dummy captions or predefined words
class Vocabulary:
    """Two-way token <-> id mapping with four reserved special tokens.

    Attributes:
        freq_threshold: Minimum count a word needs to be added.
        itos: id -> token (ids 0-3 are <PAD>, <SOS>, <EOS>, <UNK>).
        stoi: token -> id.
    """

    def __init__(self, freq_threshold=1):
        self.freq_threshold = freq_threshold
        self.itos = {0: "<PAD>", 1: "<SOS>", 2: "<EOS>", 3: "<UNK>"}
        self.stoi = {v: k for k, v in self.itos.items()}

    def build_vocab(self, sentence_list):
        """Add every word reaching `freq_threshold` in `sentence_list`.

        Safe to call more than once: new ids continue after the highest id in
        use and known words keep their id, so ids never collide.
        """
        frequencies = Counter()
        for sentence in sentence_list:
            frequencies.update(tokenize(sentence))

        # Continue numbering after existing entries (4 on the first call).
        idx = max(self.itos) + 1
        for word, freq in frequencies.items():
            if freq >= self.freq_threshold and word not in self.stoi:
                self.stoi[word] = idx
                self.itos[idx] = word
                idx += 1

    def numericalize(self, text):
        """Return the ids of the tokens in `text`; unknown tokens map to <UNK>."""
        unk_id = self.stoi["<UNK>"]
        return [self.stoi.get(token, unk_id) for token in tokenize(text)]


In [None]:
from torch.utils.data import Dataset
import torch

class CaptionDataset(Dataset):
    """Samples with inline features and one caption each, read from a JSONL manifest."""

    def __init__(self, manifest_path, vocab):
        """Parse one JSON object per manifest line; `vocab` needs `.stoi` and `.numericalize`."""
        samples = []
        with open(manifest_path, 'r') as f:
            for line in f:
                samples.append(json.loads(line))
        self.samples = samples
        self.vocab = vocab
        self.max_len = 20  # cap caption length

    def __len__(self):
        return len(self.samples)

    def __getitem__(self, idx):
        """Return (float32 feature tensor, int64 caption ids of length max_len + 2)."""
        record = self.samples[idx]
        # Missing/empty captions fall back to a fixed placeholder sentence.
        text = record.get("caption") or "a sample image"

        stoi = self.vocab.stoi
        word_ids = self.vocab.numericalize(text)[:self.max_len]
        token_ids = [stoi["<SOS>"], *word_ids, stoi["<EOS>"]]
        # Pad on the right up to max_len + 2 (the +2 covers <SOS> and <EOS>).
        n_pad = self.max_len + 2 - len(token_ids)
        token_ids = token_ids + [stoi["<PAD>"]] * n_pad

        features = torch.tensor(record["features"], dtype=torch.float32)
        return features, torch.tensor(token_ids, dtype=torch.long)


In [None]:
import os, json
from pathlib import Path
from tqdm import tqdm
import torch
import torchvision.transforms as T
from torchvision import models
from PIL import Image
from torch.utils.data import Dataset, DataLoader

# --------------------------
# 1. Process Dataset: Save features and manifest
# --------------------------
def process_dataset(img_dir="images/", out_dir="preproc/", captions_json=None, device="cpu"):
    """Extract ResNet-50 features for each image and write a JSONL manifest.

    Every ``*.jpg``, ``*.jpeg`` and ``*.png`` file directly inside ``img_dir``
    is resized to 224x224, normalised and passed through a ResNet-50 whose
    classification layer is replaced by an identity. The result is saved as
    ``<stem>_feat.pt`` in ``out_dir`` and one line per image is written to
    ``out_dir/manifest.jsonl`` with the keys ``"features"`` (path of the saved
    tensor) and ``"captions"``.

    Args:
        img_dir: Directory holding the input images.
        out_dir: Output directory; created if missing.
        captions_json: Optional path of a JSON file mapping image file names
            to captions. Ignored when ``None`` or when the file does not exist;
            images without an entry get ``["a sample image"]``.
        device: Torch device string used for inference.

    Images that cannot be opened or processed are reported and skipped.
    """
    device = torch.device(device)
    out_dir = Path(out_dir)
    out_dir.mkdir(parents=True, exist_ok=True)

    # `pretrained=True` is deprecated in torchvision; IMAGENET1K_V1 is the
    # checkpoint that flag selected, so the extracted features are unchanged.
    model = models.resnet50(weights=models.ResNet50_Weights.IMAGENET1K_V1)
    model.fc = torch.nn.Identity()  # return the pooled features, not class logits
    model = model.to(device).eval()

    preprocess = T.Compose([
        T.Resize((224, 224)),
        T.ToTensor(),
        T.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
    ])

    img_dir = Path(img_dir)
    image_files = list(img_dir.glob("*.jpg")) + list(img_dir.glob("*.jpeg")) + list(img_dir.glob("*.png"))

    caption_map = {}
    if captions_json and os.path.exists(captions_json):
        with open(captions_json, "r", encoding="utf-8") as f:
            caption_map = json.load(f)

    manifest = []

    for img_path in tqdm(image_files):
        try:
            image = Image.open(img_path).convert("RGB")
        except Exception as e:
            print(f"❌ Cannot open {img_path.name}: {e}")
            continue

        try:
            img_tensor = preprocess(image).unsqueeze(0).to(device)
            with torch.no_grad():
                features = model(img_tensor).squeeze().cpu()

            # NOTE: images sharing a stem (a.jpg / a.png) map to the same file.
            stem = img_path.stem
            feat_path = out_dir / f"{stem}_feat.pt"
            torch.save(features, feat_path)

            manifest.append({
                "features": str(feat_path),
                "captions": caption_map.get(img_path.name, ["a sample image"])
            })
        except Exception as e:
            print(f"❌ Failed processing {img_path.name}: {e}")

    with open(out_dir / "manifest.jsonl", "w", encoding="utf-8") as f:
        for item in manifest:
            f.write(json.dumps(item) + "\n")

    print(f"✅ Processed {len(manifest)} images into features and manifest.")


# --------------------------
# 2. Vocabulary class
# --------------------------
class Vocabulary:
    """Whitespace-token vocabulary with four reserved special tokens.

    ``stoi`` maps token -> id, ``itos`` is the list of tokens ordered by id.
    """

    def __init__(self):
        self.stoi = {"<pad>": 0, "<start>": 1, "<end>": 2, "<unk>": 3}
        self.itos = ["<pad>", "<start>", "<end>", "<unk>"]

    def build_vocab(self, captions):
        """Register every unseen lower-cased, whitespace-separated word."""
        for sentence in captions:
            for word in sentence.lower().split():
                if word in self.stoi:
                    continue
                # The next free id equals the number of tokens known so far.
                self.stoi[word] = len(self.stoi)
                self.itos.append(word)

    def encode(self, sentence):
        """Return the ids of the words in ``sentence``; unknown words map to ``<unk>``."""
        ids = []
        for word in sentence.lower().split():
            ids.append(self.stoi.get(word, self.stoi["<unk>"]))
        return ids


# --------------------------
# 3. Caption Dataset class
# --------------------------
class CaptionDataset(Dataset):
    """(feature tensor, caption ids) pairs read from a JSONL manifest.

    Each manifest line is a JSON object with a ``"features"`` path and either
    a ``"captions"`` list or a single ``"caption"``. One sample is produced
    per non-empty caption. Lines that are not valid JSON objects, and entries
    whose feature file does not exist, are skipped.

    Args:
        manifest_path: Path of the JSONL manifest.
        vocab: Object exposing ``encode(caption) -> list[int]``.
        max_len: Captions are truncated / zero-padded to this many ids.
    """

    def __init__(self, manifest_path, vocab, max_len=20):
        self.vocab = vocab
        self.data = []
        self.max_len = max_len

        with open(manifest_path, "r", encoding="utf-8") as f:
            for line in f:
                try:
                    entry = json.loads(line)
                except json.JSONDecodeError:
                    # Only malformed JSON is skipped; a bare `except:` would
                    # also hide KeyboardInterrupt and genuine bugs.
                    continue
                if not isinstance(entry, dict):
                    continue

                feat_path = entry.get("features")
                if not feat_path or not os.path.exists(feat_path):
                    continue

                caps = entry.get("captions") or [entry.get("caption")]
                if isinstance(caps, str):
                    # A bare string would otherwise be iterated char by char.
                    caps = [caps]
                if not caps or caps[0] is None:
                    continue

                for cap in caps:
                    if cap:
                        self.data.append((feat_path, cap))

        print(f"✅ Loaded {len(self.data)} valid samples.")

    def encode_caption(self, caption):
        """Encode ``caption`` and truncate / pad it with 0 to ``max_len`` ids."""
        tokens = self.vocab.encode(caption)
        tokens = tokens[:self.max_len]
        padded = tokens + [0] * (self.max_len - len(tokens))
        # Explicit dtype keeps an empty result integer-typed as well.
        return torch.tensor(padded, dtype=torch.long)

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        feat_path, caption = self.data[idx]
        feat = torch.load(feat_path)
        cap_tensor = self.encode_caption(caption)
        return feat, cap_tensor


# --------------------------
# 4. Run Everything
# --------------------------
# Step 1: Process dataset (only once)
# Feature extraction: writes preproc/*_feat.pt and preproc/manifest.jsonl.
process_dataset(img_dir="images/", out_dir="preproc/", captions_json=None, device="cpu")  # or "cuda"

# Placeholder captions used only to populate the vocabulary.
dummy_captions = ["a segmented image", "an example picture"]
vocab = Vocabulary()
vocab.build_vocab(dummy_captions)

# Dataset over the manifest; stop early when nothing could be loaded.
train_ds = CaptionDataset("preproc/manifest.jsonl", vocab)
if not len(train_ds):
    raise ValueError("❌ Dataset is empty. Please check image folder or feature generation.")

train_dl = DataLoader(train_ds, batch_size=2, shuffle=True)
print(f"📦 DataLoader ready with {len(train_ds)} samples.")


In [None]:
from torch.utils.data import Dataset
from PIL import Image
import json
import os

class CaptionDataset(Dataset):
    """Image/caption dataset driven by a JSONL manifest.

    Each manifest line is one JSON object; ``__getitem__`` opens the file
    named by its ``'image'`` key and uses its ``'caption'`` (default
    ``"a photo"``).

    The vocabulary must provide ``tokenize``, ``encode``, ``start_token_id``
    and ``end_token_id``.
    """

    def __init__(self, manifest_path, vocab, transform=None):
        self.vocab = vocab
        self.transform = transform
        self.samples = []
        with open(manifest_path, "r", encoding="utf-8") as manifest_file:
            for raw_line in manifest_file:
                self.samples.append(json.loads(raw_line))

    def __len__(self):
        return len(self.samples)

    def __getitem__(self, idx):
        record = self.samples[idx]

        picture = Image.open(record['image']).convert("RGB")
        if self.transform:
            picture = self.transform(picture)

        # Fall back to a placeholder caption when the entry has none.
        caption = record.get('caption', "a photo")
        words = self.vocab.tokenize(caption)

        caption_ids = [self.vocab.start_token_id]
        caption_ids.extend(self.vocab.encode(words))
        caption_ids.append(self.vocab.end_token_id)

        return picture, torch.tensor(caption_ids)


In [None]:
import torch.nn as nn

class Captioner(nn.Module):
    """LSTM captioner that feeds the image as the first input time step.

    Args:
        feat_dim: Size of the incoming image feature vector.
        emb_dim: Token embedding size (also the LSTM input size).
        hid_dim: LSTM hidden size.
        vocab_size: Number of tokens in the vocabulary.
    """

    def __init__(self, feat_dim=2048, emb_dim=512, hid_dim=512, vocab_size=1000):
        super().__init__()
        # The projected image is concatenated with the token embeddings and
        # fed to the LSTM, so it must have emb_dim features. Projecting to
        # hid_dim only worked while emb_dim == hid_dim.
        self.fc_img = nn.Linear(feat_dim, emb_dim)
        self.embedding = nn.Embedding(vocab_size, emb_dim)
        self.lstm = nn.LSTM(emb_dim, hid_dim, batch_first=True)
        self.fc_out = nn.Linear(hid_dim, vocab_size)

    def forward(self, img_feats, captions):
        """Return logits of shape (B, T+1, vocab_size).

        Args:
            img_feats: Float tensor of shape (B, feat_dim).
            captions: Long tensor of token ids, shape (B, T).
        """
        img_step = self.fc_img(img_feats).unsqueeze(1)  # (B, 1, E)
        embeds = self.embedding(captions)               # (B, T, E)
        x = torch.cat([img_step, embeds], dim=1)        # (B, T+1, E)
        out, _ = self.lstm(x)                           # (B, T+1, H)
        return self.fc_out(out)


In [None]:
from torch.nn.utils.rnn import pad_sequence

def custom_collate_fn(batch):
    """Batch (image, caption) pairs, right-padding captions with 0.

    Returns the images stacked along a new first dimension and the captions
    padded to the longest caption in the batch (batch-first layout).
    """
    image_seq, caption_seq = zip(*batch)
    return (
        torch.stack(image_seq, dim=0),
        pad_sequence(caption_seq, batch_first=True, padding_value=0),  # 0 = <PAD> token
    )


In [None]:
from torch.utils.data import DataLoader
from torchvision import transforms

# Resize every image to 224x224 and convert it to a tensor (no normalisation).
transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
])

# NOTE(review): relies on `vocab`, `CaptionDataset` and `custom_collate_fn`
# from earlier cells; the third positional argument is only a transform for
# the image-based CaptionDataset variant — confirm which definition is live.
train_ds = CaptionDataset("preproc/manifest.jsonl", vocab, transform)
train_dl = DataLoader(train_ds, batch_size=4, shuffle=True, collate_fn=custom_collate_fn)


In [None]:
import torch.nn as nn
import torch

class Captioner(nn.Module):
    """GRU captioner whose initial hidden state is the projected image feature.

    Args:
        feat_dim: Size of the incoming image feature vector.
        emb_dim: Token embedding size.
        hid_dim: GRU hidden size.
        vocab_size: Number of tokens in the vocabulary (required).
        pad_idx: Optional id of the padding token; forwarded to
            ``nn.Embedding(padding_idx=...)``. ``None`` keeps the previous
            behaviour (no padding index).

    Raises:
        TypeError: If ``vocab_size`` is not given.
    """

    def __init__(self, feat_dim=2048, emb_dim=512, hid_dim=512, vocab_size=None, pad_idx=None):
        super().__init__()
        if vocab_size is None:
            # Fail with a clear message instead of an opaque error raised
            # from inside nn.Embedding.
            raise TypeError("Captioner requires vocab_size (e.g. len(vocab.itos)).")
        self.fc_img = nn.Linear(feat_dim, hid_dim)
        self.embed = nn.Embedding(vocab_size, emb_dim, padding_idx=pad_idx)
        self.rnn = nn.GRU(emb_dim, hid_dim, batch_first=True)
        self.fc_out = nn.Linear(hid_dim, vocab_size)

    def forward(self, img_feat, captions):
        """Return logits of shape (B, T, vocab_size).

        Args:
            img_feat: Float tensor of shape (B, feat_dim).
            captions: Long tensor of token ids, shape (B, T).
        """
        h0 = self.fc_img(img_feat).unsqueeze(0)  # (1, B, H)
        x = self.embed(captions)                 # (B, T, E)
        out, _ = self.rnn(x, h0)                 # (B, T, H)
        return self.fc_out(out)                  # (B, T, vocab_size)


In [None]:
# Use the GPU when one is available.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# ✅ Fix the vocab size and pad index
vocab_size = len(vocab.stoi)
pad_idx = vocab.stoi["<pad>"]

# ✅ Create the model with correct parameters
# NOTE(review): requires a Captioner whose __init__ accepts `pad_idx` —
# confirm the class definition executed before this cell does.
model = Captioner(vocab_size=vocab_size, pad_idx=pad_idx).to(device)

# ✅ Define optimizer and loss using correct pad index
optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)
# Padding positions are excluded from the loss.
criterion = nn.CrossEntropyLoss(ignore_index=pad_idx)


In [None]:
# Vocabulary size taken from the id -> token list of the live `vocab`.
vocab_size = len(vocab.itos)


In [None]:
# Rebuilds `model` without a pad index; needs `device` from an earlier cell.
model = Captioner(vocab_size=len(vocab.itos)).to(device)


In [None]:
# Use the GPU when one is available.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Correctly access vocabulary size and <pad> index
vocab_size = len(vocab.itos)
pad_idx = vocab.stoi["<pad>"]

# Initialize model, optimizer, and loss
# NOTE(review): requires a Captioner whose __init__ accepts `pad_idx` —
# confirm the class definition executed before this cell does.
model = Captioner(vocab_size=vocab_size, pad_idx=pad_idx).to(device)
optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)
# Padding positions are excluded from the loss.
criterion = nn.CrossEntropyLoss(ignore_index=pad_idx)


In [None]:
# --- Vocabulary Class ---
class Vocabulary:
    """Token <-> id tables seeded with ``<pad>``, ``<start>``, ``<end>``, ``<unk>``."""

    def __init__(self):
        self.itos = ["<pad>", "<start>", "<end>", "<unk>"]
        self.stoi = dict(zip(self.itos, range(len(self.itos))))

    def build_vocab(self, captions):
        """Assign the next free id to each unseen lower-cased word."""
        words = (word for sent in captions for word in sent.lower().split())
        for word in words:
            if word in self.stoi:
                continue
            self.stoi[word] = len(self.stoi)
            self.itos.append(word)


In [None]:
class Vocabulary:
    """Vocabulary that keeps its reserved tokens in ``special_tokens``.

    ``itos`` starts as a copy of the special tokens; ``stoi`` is its inverse.
    """

    def __init__(self):
        self.special_tokens = ["<pad>", "<start>", "<end>", "<unk>"]
        self.itos = self.special_tokens[:]
        self.stoi = {}
        for position, token in enumerate(self.itos):
            self.stoi[token] = position

    def build_vocab(self, captions):
        """Append each unseen lower-cased, whitespace-separated word."""
        next_id = len(self.stoi)
        for word in (w for sentence in captions for w in sentence.lower().split()):
            if word not in self.stoi:
                self.stoi[word], next_id = next_id, next_id + 1
                self.itos.append(word)


In [None]:
# ────────────────────────────────────────────────────────────────────────────
# 0. Imports
# ────────────────────────────────────────────────────────────────────────────
import os, json, re, random, requests, torch, torch.nn as nn
from pathlib import Path
from collections import Counter
from torch.utils.data import Dataset, DataLoader
from torch.nn.utils.rnn import pad_sequence
from torchvision import transforms
from torchvision.models import resnet50, ResNet50_Weights
from PIL import Image

# ────────────────────────────────────────────────────────────────────────────
# 1. Setup paths and download image
# ────────────────────────────────────────────────────────────────────────────
IMAGE_DIR = Path("images")
FEATURE_DIR = Path("preproc")
MANIFEST = FEATURE_DIR / "manifest.jsonl"
IMAGE_DIR.mkdir(parents=True, exist_ok=True)
FEATURE_DIR.mkdir(parents=True, exist_ok=True)

# Fetch a demo picture only when the image folder holds no .jpg at all.
if not any(IMAGE_DIR.glob("*.jpg")):
    print("📥 Downloading demo image...")
    url = "https://upload.wikimedia.org/wikipedia/commons/thumb/3/3a/Cat03.jpg/640px-Cat03.jpg"
    # A timeout keeps the cell from hanging, and raise_for_status() stops an
    # HTTP error page from being written to disk as if it were a JPEG.
    response = requests.get(url, timeout=30)
    response.raise_for_status()
    img_data = response.content
    with open(IMAGE_DIR / "demo.jpg", "wb") as f:
        f.write(img_data)
    print("✅ Demo image saved")

# ────────────────────────────────────────────────────────────────────────────
# 2. Feature extraction and manifest creation
# ────────────────────────────────────────────────────────────────────────────
def process_dataset(image_dir, manifest_path, feature_dir):
    """Extract ResNet-50 features for each ``*.jpg`` and write a manifest.

    Args:
        image_dir: ``Path`` of the directory searched for ``*.jpg`` files.
        manifest_path: Where the JSONL manifest is written (overwritten).
        feature_dir: ``Path`` of the directory receiving one ``<stem>.pt``
            tensor per image.

    Every manifest entry stores the feature file under ``"feature_path"`` and
    the fixed caption list ``["a photo of a cat"]``.
    """
    backbone = resnet50(weights=ResNet50_Weights.DEFAULT)
    # Keep every child module except the last one (the classifier).
    extractor = nn.Sequential(*list(backbone.children())[:-1])
    extractor.eval()

    preprocess = transforms.Compose([
        transforms.Resize((224, 224)),
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
    ])

    entries = []
    for image_file in image_dir.glob("*.jpg"):
        rgb_image = Image.open(image_file).convert("RGB")
        batch = preprocess(rgb_image).unsqueeze(0)
        with torch.no_grad():
            feature = extractor(batch).squeeze().clone()

        target = feature_dir / f"{image_file.stem}.pt"
        torch.save(feature, target)
        entries.append({"feature_path": str(target), "captions": ["a photo of a cat"]})

    with open(manifest_path, "w") as manifest_file:
        manifest_file.writelines(json.dumps(entry) + "\n" for entry in entries)
    print(f"✅ Manifest created with {len(entries)} entries")

# Run feature extraction only when the manifest file is absent or zero bytes.
if not MANIFEST.exists() or os.stat(MANIFEST).st_size == 0:
    process_dataset(IMAGE_DIR, MANIFEST, FEATURE_DIR)

# ────────────────────────────────────────────────────────────────────────────
# 3. Vocabulary
# ────────────────────────────────────────────────────────────────────────────
class Vocabulary:
    """Regex-tokenised vocabulary (``\\w+`` on lower-cased text).

    ``special`` lists the reserved tokens, which occupy ids 0-3.
    """

    def __init__(self):
        self.special = ["<pad>", "<start>", "<end>", "<unk>"]
        self.itos = list(self.special)
        self.stoi = dict(zip(self.special, range(len(self.special))))

    def build(self, captions):
        """Add the distinct words of ``captions`` in alphabetical order."""
        joined = " ".join(captions).lower()
        for word in sorted(set(re.findall(r"\w+", joined))):
            if word in self.stoi:
                continue
            self.stoi[word] = len(self.itos)
            self.itos.append(word)

    def encode(self, text):
        """Return the ids of the words in ``text``; unknown words map to ``<unk>``."""
        ids = []
        for token in re.findall(r"\w+", text.lower()):
            ids.append(self.stoi.get(token, self.stoi["<unk>"]))
        return ids

    def __len__(self):
        return len(self.itos)

# ────────────────────────────────────────────────────────────────────────────
# 4. Load manifest + check
# ────────────────────────────────────────────────────────────────────────────
def _read_valid_rows(manifest_path):
    """Return the manifest entries whose feature file exists on disk.

    Accepts the path under either ``"feature_path"`` or ``"feature"`` and
    normalises it into ``"feature_path"``. Blank lines are ignored.
    """
    valid = []
    with open(manifest_path) as f:
        for line in f:
            line = line.strip()
            if not line:
                continue
            data = json.loads(line)
            raw_path = data.get("feature_path") or data.get("feature")
            # Path("") is ".", which always exists, so an entry without any
            # path key must be rejected explicitly instead of being accepted.
            if raw_path and Path(raw_path).is_file():
                data["feature_path"] = str(Path(raw_path))
                valid.append(data)
    return valid


rows = _read_valid_rows(MANIFEST)

if not rows:
    # Stale or foreign manifest: rebuild features once and read it again.
    print("❌ No valid feature paths found. Regenerating...")
    process_dataset(IMAGE_DIR, MANIFEST, FEATURE_DIR)
    rows = _read_valid_rows(MANIFEST)

if not rows:
    raise RuntimeError("❌ Still no valid data rows found. Please check image/feature paths.")

# ────────────────────────────────────────────────────────────────────────────
# 5. Build vocab
# ────────────────────────────────────────────────────────────────────────────
# Build the vocabulary from every caption of every validated manifest row.
vocab = Vocabulary()
all_captions = [cap for row in rows for cap in row["captions"]]
vocab.build(all_captions)
print("✅ Vocab size:", len(vocab))

# ────────────────────────────────────────────────────────────────────────────
# 6. Dataset + Dataloader
# ────────────────────────────────────────────────────────────────────────────
class CaptionDataset(Dataset):
    """Dataset over pre-validated manifest rows.

    Each row provides ``"feature_path"`` (a file readable by ``torch.load``)
    and ``"captions"`` (a non-empty list). A caption is drawn at random on
    every access and wrapped in ``<start>`` / ``<end>``.

    Args:
        rows: List of manifest dictionaries.
        vocab: Object with ``stoi`` and ``encode(text) -> list[int]``.
        max_len: Maximum number of caption ids kept between the two markers.
    """

    def __init__(self, rows, vocab, max_len=20):
        self.rows = rows
        self.vocab = vocab
        self.max_len = max_len

    def __len__(self):
        return len(self.rows)

    def __getitem__(self, idx):
        entry = self.rows[idx]
        feature = torch.load(entry["feature_path"])
        caption = random.choice(entry["captions"])

        lookup = self.vocab.stoi
        ids = [lookup["<start>"]]
        ids.extend(self.vocab.encode(caption)[:self.max_len])
        ids.append(lookup["<end>"])
        return feature, torch.tensor(ids)

def collate_fn(batch):
    """Stack the features and right-pad the captions with the ``<pad>`` id.

    Reads the module-level ``vocab`` to look up the padding id.
    """
    feature_seq, caption_seq = zip(*batch)
    stacked = torch.stack(feature_seq)
    padded = pad_sequence(
        caption_seq,
        batch_first=True,
        padding_value=vocab.stoi["<pad>"],
    )
    return stacked, padded

# Wrap the validated rows in a dataset and refuse to continue when it is empty.
train_ds = CaptionDataset(rows, vocab)
if len(train_ds) == 0:
    raise ValueError("❌ No samples in dataset.")
# collate_fn pads captions so differently long samples can share a batch.
train_dl = DataLoader(train_ds, batch_size=4, shuffle=True, collate_fn=collate_fn)

# ────────────────────────────────────────────────────────────────────────────
# 7. Captioner model
# ────────────────────────────────────────────────────────────────────────────
class Captioner(nn.Module):
    """GRU decoder initialised from a 2048-wide image feature.

    Vocabulary size and padding id are read from the module-level ``vocab``
    at construction time.
    """

    def __init__(self):
        super().__init__()
        vocab_len = len(vocab)
        pad_id = vocab.stoi["<pad>"]
        self.img_fc = nn.Linear(2048, 512)
        self.emb = nn.Embedding(vocab_len, 256, padding_idx=pad_id)
        self.gru = nn.GRU(256, 512, batch_first=True)
        self.fc_out = nn.Linear(512, vocab_len)

    def forward(self, feats, caps):
        """Return per-step vocabulary logits for the caption ids ``caps``."""
        # tanh-squashed image projection becomes the GRU's initial hidden state.
        projected = self.img_fc(feats)
        initial_hidden = torch.tanh(projected).unsqueeze(0)
        hidden_states, _ = self.gru(self.emb(caps), initial_hidden)
        return self.fc_out(hidden_states)

# ────────────────────────────────────────────────────────────────────────────
# 8. Training loop (✅ FIXED LINE BELOW)
# ────────────────────────────────────────────────────────────────────────────
# Use the GPU when one is available.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = Captioner().to(device)
optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)
# Padding positions are excluded from the loss.
criterion = nn.CrossEntropyLoss(ignore_index=vocab.stoi["<pad>"])  # ✅ FIXED

print("🚀 Starting training...")
for epoch in range(5):
    model.train()
    total_loss = 0
    for feats, caps in train_dl:
        feats, caps = feats.to(device), caps.to(device)
        optimizer.zero_grad()
        # Teacher forcing: feed every token but the last ...
        output = model(feats, caps[:, :-1])
        # ... and score the prediction at each step against the following token.
        loss = criterion(output.reshape(-1, output.size(-1)), caps[:, 1:].reshape(-1))
        loss.backward()
        optimizer.step()
        total_loss += loss.item()
    # Mean of the per-batch losses for this epoch.
    print(f"📘 Epoch {epoch+1} — Loss: {total_loss / len(train_dl):.4f}")


In [None]:
import json
import torch
import torchvision.transforms as transforms
from torchvision.models import resnet50, ResNet50_Weights
from pathlib import Path
from PIL import Image
from tqdm import tqdm

# ─────────────────────────────────────────────────────────
# Paths
# ─────────────────────────────────────────────────────────
manifest_path = Path("preproc/manifest.jsonl")
image_root = Path("images")  # directory containing original .jpg images
out_dir = Path("preproc/features")  # directory to save .pt feature files
out_dir.mkdir(parents=True, exist_ok=True)

# ─────────────────────────────────────────────────────────
# Load pre-trained ResNet model for feature extraction
# ─────────────────────────────────────────────────────────
# NOTE: rebinds the notebook-global `model`.
model = resnet50(weights=ResNet50_Weights.DEFAULT)
model = torch.nn.Sequential(*list(model.children())[:-1])  # remove final classification layer
model.eval()

# ─────────────────────────────────────────────────────────
# Image preprocessing
# ─────────────────────────────────────────────────────────
# NOTE: rebinds the notebook-global `transform`.
transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])

# ─────────────────────────────────────────────────────────
# Load and validate manifest
# ─────────────────────────────────────────────────────────
# Keep only manifest entries that name an image file; report the others.
rows = []
with open(manifest_path, "r") as f:
    for line in f:
        row = json.loads(line.strip())
        if "image" in row:
            rows.append(row)
        else:
            print("⚠️ Skipping row without 'image' key:", row)

# ─────────────────────────────────────────────────────────
# Process images, extract features, and update manifest
# ─────────────────────────────────────────────────────────
for row in tqdm(rows, desc="Extracting features"):
    try:
        img_name = row["image"]
        img_path = image_root / img_name

        if not img_path.exists():
            # Raised here and reported by the handler below; the row is then
            # left without a 'feature' key.
            raise FileNotFoundError(f"Image file not found: {img_path}")

        img = Image.open(img_path).convert("RGB")
        x = transform(img).unsqueeze(0)  # shape: (1, 3, 224, 224)

        with torch.no_grad():
            feat = model(x).squeeze()  # shape: (2048,)

        # Save feature vector
        feat_file = f"{img_path.stem}.pt"
        feat_path = out_dir / feat_file
        torch.save(feat, feat_path)

        # Update row with new 'feature' key
        # The stored path is relative ("features/<stem>.pt"), not the full
        # "preproc/features/..." location the tensor was saved to.
        row["feature"] = str(Path("features") / feat_file)

    except Exception as e:
        print(f"❌ Error processing image: {row.get('image', 'UNKNOWN')} — {e}")

# ─────────────────────────────────────────────────────────
# Save updated manifest
# ─────────────────────────────────────────────────────────
# NOTE: this is the same file as `manifest_path`, so the input manifest is
# overwritten; rows skipped for lacking an 'image' key and rows that failed
# above are not written back.
updated_manifest_path = Path("preproc/manifest.jsonl")
with open(updated_manifest_path, "w") as f:
    for row in rows:
        if "feature" in row:  # only keep rows with valid features
            f.write(json.dumps(row) + "\n")

print(f"✅ Updated manifest saved to: {updated_manifest_path}")


In [None]:
import os, json, torch, random
from pathlib import Path
from PIL import Image
from torchvision.models import resnet50, ResNet50_Weights
from torchvision import transforms
from torch.utils.data import Dataset, DataLoader
from torch.nn.utils.rnn import pad_sequence
from tqdm import tqdm

# ───────────────────────────────
# Step 1: Setup paths
# ───────────────────────────────
IMAGE_DIR = Path("images")
FEATURE_DIR = Path("preproc/features")
MANIFEST = Path("preproc/manifest.jsonl")

IMAGE_DIR.mkdir(parents=True, exist_ok=True)
FEATURE_DIR.mkdir(parents=True, exist_ok=True)

# ───────────────────────────────
# Step 2: Download demo image if missing
# ───────────────────────────────
demo_img_path = IMAGE_DIR / "demo.jpg"
if not demo_img_path.exists():
    print("📥 Downloading demo image...")
    import requests
    url = "https://upload.wikimedia.org/wikipedia/commons/thumb/3/3a/Cat03.jpg/640px-Cat03.jpg"
    # A timeout keeps the cell from hanging, and raise_for_status() stops an
    # HTTP error page from being written to disk as if it were a JPEG.
    response = requests.get(url, timeout=30)
    response.raise_for_status()
    img_data = response.content
    with open(demo_img_path, "wb") as f:
        f.write(img_data)
    print("✅ Demo image saved")

# ───────────────────────────────
# Step 3: Extract features & create manifest
# ───────────────────────────────
def process_image_and_create_manifest():
    """Extract ResNet-50 features for ``IMAGE_DIR/*.jpg`` and write ``MANIFEST``.

    One ``<stem>.pt`` tensor per image is saved into ``FEATURE_DIR``. Each
    manifest entry records the image file name, the feature location as
    ``features/<stem>.pt`` and the fixed caption list ``["a photo of a cat"]``.
    """
    backbone = resnet50(weights=ResNet50_Weights.DEFAULT)
    # Keep every child module except the last one (the classifier).
    extractor = torch.nn.Sequential(*list(backbone.children())[:-1])
    extractor.eval()

    preprocess = transforms.Compose([
        transforms.Resize((224, 224)),
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
    ])

    entries = []
    for image_file in IMAGE_DIR.glob("*.jpg"):
        rgb_image = Image.open(image_file).convert("RGB")
        batch = preprocess(rgb_image).unsqueeze(0)

        with torch.no_grad():
            feature = extractor(batch).squeeze()

        stem = image_file.stem
        torch.save(feature, FEATURE_DIR / f"{stem}.pt")

        entries.append({
            "image": image_file.name,
            "feature": f"features/{stem}.pt",
            "captions": ["a photo of a cat"],
        })

    # Save manifest
    with open(MANIFEST, "w") as manifest_file:
        manifest_file.writelines(json.dumps(entry) + "\n" for entry in entries)
    print(f"✅ Manifest created with {len(entries)} entries")

# Only run if manifest missing or empty
# Skip extraction when a non-empty manifest file is already on disk.
if not MANIFEST.exists() or MANIFEST.stat().st_size == 0:
    process_image_and_create_manifest()

# ───────────────────────────────
# Step 4: Load manifest safely
# ───────────────────────────────
def load_manifest():
    """Read ``MANIFEST`` and return the rows whose feature file exists.

    The ``"feature"`` value of each row is interpreted relative to
    ``preproc/`` and is rewritten to that resolved path in the returned rows.
    Blank lines are ignored; rows without a usable feature file are reported
    and dropped.

    Raises:
        ValueError: If no row survives validation.
    """
    rows = []
    feature_root = Path("preproc")
    with open(MANIFEST, "r") as f:
        for line in f:
            line = line.strip()
            if not line:
                # A trailing or stray empty line must not abort loading.
                continue
            row = json.loads(line)
            feature_file = row.get("feature")
            if feature_file and (feature_root / feature_file).exists():
                row["feature"] = str(feature_root / feature_file)
                rows.append(row)
            else:
                print(f"⚠️ Missing feature file for: {row.get('image')}")
    if not rows:
        raise ValueError("❌ No valid entries in manifest. Check your paths and feature files.")
    return rows

rows = load_manifest()

# ───────────────────────────────
# Step 5: Dummy Vocabulary
# ───────────────────────────────
class DummyVocab:
    """Placeholder vocabulary: four special tokens and a constant encoding."""

    def __init__(self):
        specials = ("<pad>", "<start>", "<end>", "<unk>")
        self.stoi = {token: index for index, token in enumerate(specials)}

    def encode(self, text):
        """Ignore ``text`` and return the fixed ids ``[4, 5, 6]``."""
        return list(range(4, 7))


vocab = DummyVocab()

# ───────────────────────────────
# Step 6: Dataset + Dataloader
# ───────────────────────────────
class CaptionDataset(Dataset):
    """Serve (feature, caption ids) pairs from already loaded manifest rows.

    A row must provide ``"feature"`` (a file readable by ``torch.load``) and
    ``"captions"`` (a non-empty list); one caption is sampled per access.

    Args:
        rows: List of manifest dictionaries.
        vocab: Object with ``stoi`` and ``encode(text) -> list[int]``.
        max_len: Maximum number of caption ids kept between the two markers.
    """

    def __init__(self, rows, vocab, max_len=20):
        self.rows, self.vocab, self.max_len = rows, vocab, max_len

    def __len__(self):
        return len(self.rows)

    def __getitem__(self, idx):
        entry = self.rows[idx]
        feature = torch.load(entry["feature"])
        chosen = random.choice(entry["captions"])

        start_id = self.vocab.stoi["<start>"]
        body = self.vocab.encode(chosen)[:self.max_len]
        end_id = self.vocab.stoi["<end>"]
        return feature, torch.tensor([start_id, *body, end_id])

def collate_fn(batch):
    """Turn a list of (feature, caption) samples into two batch tensors.

    Features are stacked; captions are padded (batch-first) with the
    ``<pad>`` id taken from the module-level ``vocab``.
    """
    feature_seq, caption_seq = zip(*batch)
    feature_batch = torch.stack(feature_seq)
    caption_batch = pad_sequence(caption_seq, batch_first=True, padding_value=vocab.stoi["<pad>"])
    return feature_batch, caption_batch

# Dataset over the rows returned by load_manifest(); collate_fn pads captions.
train_ds = CaptionDataset(rows, vocab)
train_dl = DataLoader(train_ds, batch_size=4, shuffle=True, collate_fn=collate_fn)

print(f"✅ Dataset ready with {len(train_ds)} samples")


In [None]:
import torch
import torch.nn as nn

# Dummy vocab for demonstration
class DummyVocab:
    """Six-token demonstration vocabulary (four specials, ``cat``, ``photo``)."""

    def __init__(self):
        self.itos = ["<pad>", "<start>", "<end>", "<unk>", "cat", "photo"]
        self.stoi = dict(zip(self.itos, range(len(self.itos))))


vocab = DummyVocab()

# ✅ Corrected Captioner model
class Captioner(nn.Module):
    """GRU decoder whose initial hidden state comes from a 2048-wide feature.

    Args:
        vocab_size: Number of tokens in the vocabulary.
        pad_idx: Id of the padding token (passed to ``nn.Embedding``).
    """

    def __init__(self, vocab_size, pad_idx):
        super().__init__()
        self.img_fc = nn.Linear(2048, 512)
        self.emb = nn.Embedding(vocab_size, 256, padding_idx=pad_idx)
        self.gru = nn.GRU(256, 512, batch_first=True)
        self.fc_out = nn.Linear(512, vocab_size)

    def forward(self, feats, caps):
        """Return per-step vocabulary logits for the caption ids ``caps``."""
        # tanh-squashed image projection becomes the GRU's initial hidden state.
        projected = self.img_fc(feats)
        initial_hidden = torch.tanh(projected).unsqueeze(0)
        hidden_states, _ = self.gru(self.emb(caps), initial_hidden)
        return self.fc_out(hidden_states)

# ✅ Instantiate with correct args
# Use the GPU when one is available.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = Captioner(vocab_size=len(vocab.itos), pad_idx=vocab.stoi["<pad>"]).to(device)

optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)
# Padding positions are excluded from the loss.
criterion = nn.CrossEntropyLoss(ignore_index=vocab.stoi["<pad>"])

print("✅ Model initialized successfully!")


In [None]:
import os, json, random
from pathlib import Path
from tqdm import tqdm
import torch
from torch import nn
from torch.utils.data import Dataset, DataLoader
from torchvision import models, transforms
from PIL import Image

# Use the GPU when one is available, otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# --------------------------
# Vocabulary Class
# --------------------------
class Vocabulary:
    """Whitespace-token vocabulary that encodes captions to fixed-length tensors.

    Ids 0-3 are reserved for ``<pad>``, ``<start>``, ``<end>`` and ``<unk>``.
    """

    def __init__(self):
        self.stoi = {"<pad>": 0, "<start>": 1, "<end>": 2, "<unk>": 3}
        self.itos = ["<pad>", "<start>", "<end>", "<unk>"]

    def build_vocab(self, captions):
        """Register every unseen lower-cased, whitespace-separated word."""
        idx = len(self.stoi)
        for sentence in captions:
            for word in sentence.lower().split():
                if word not in self.stoi:
                    self.stoi[word] = idx
                    self.itos.append(word)
                    idx += 1

    def encode(self, sentence, max_len=20):
        """Return ``<start> words... <end>`` padded with ``<pad>`` to ``max_len``.

        Over-long captions lose trailing *words*, never the ``<end>`` marker:
        two positions are reserved for the markers before truncating.

        Args:
            sentence: Caption text; unknown words map to ``<unk>``.
            max_len: Length of the returned 1-D long tensor.
        """
        unk_id = self.stoi["<unk>"]
        words = [self.stoi.get(word, unk_id) for word in sentence.lower().split()]
        words = words[:max(max_len - 2, 0)]

        tokens = [self.stoi["<start>"]] + words + [self.stoi["<end>"]]
        tokens = tokens[:max_len]  # only has an effect when max_len < 2
        padding = [self.stoi["<pad>"]] * (max_len - len(tokens))
        return torch.tensor(tokens + padding, dtype=torch.long)

# --------------------------
# CaptionDataset Class
# --------------------------
class CaptionDataset(Dataset):
    """Dataset over manifest rows that carry a feature file and captions.

    A manifest line is kept when its ``"features"`` path exists on disk and
    its ``"captions"`` value is non-empty. One caption is sampled per access
    and encoded with ``vocab.encode(caption, max_len)``.
    """

    def __init__(self, manifest_path, vocab, max_len=20):
        self.vocab = vocab
        self.max_len = max_len
        self.rows = []

        with open(manifest_path, "r", encoding="utf-8") as manifest_file:
            for raw_line in manifest_file:
                entry = json.loads(raw_line)
                has_feature = "features" in entry and os.path.exists(entry["features"])
                if has_feature and "captions" in entry and entry["captions"]:
                    self.rows.append(entry)

        print(f"✅ Loaded {len(self.rows)} valid samples.")

    def __len__(self):
        return len(self.rows)

    def __getitem__(self, idx):
        entry = self.rows[idx]
        feature = torch.load(entry["features"])  # already 2048-D feature
        chosen = random.choice(entry["captions"])
        return feature, self.vocab.encode(chosen, self.max_len)

# --------------------------
# Simple Captioning Model
# --------------------------
class Captioner(nn.Module):
    """LSTM decoder initialised from a projected image feature.

    Args:
        feat_dim: Size of the incoming image feature vector.
        emb_dim: Token embedding size.
        hid_dim: LSTM hidden size.
        vocab_size: Number of tokens in the vocabulary.
        pad_idx: Id of the padding token (passed to ``nn.Embedding``).
    """

    def __init__(self, feat_dim=2048, emb_dim=256, hid_dim=256, vocab_size=1000, pad_idx=0):
        super().__init__()
        self.fc_img = nn.Linear(feat_dim, hid_dim)
        self.emb = nn.Embedding(vocab_size, emb_dim, padding_idx=pad_idx)
        self.lstm = nn.LSTM(emb_dim, hid_dim, batch_first=True)
        self.fc_out = nn.Linear(hid_dim, vocab_size)

    def forward(self, feats, caps):
        """Return per-step vocabulary logits for the caption ids ``caps``."""
        # Hidden state: tanh of the image projection; cell state starts at zero.
        hidden = torch.tanh(self.fc_img(feats)).unsqueeze(0)
        initial_state = (hidden, torch.zeros_like(hidden))
        sequence_out, _ = self.lstm(self.emb(caps), initial_state)
        return self.fc_out(sequence_out)

# --------------------------
# Build Vocab & Load Data
# --------------------------
# Placeholder captions used only to populate the vocabulary.
dummy_captions = ["a segmented image", "an example picture"]
vocab = Vocabulary()
vocab.build_vocab(dummy_captions)
vocab_size = len(vocab.stoi)

train_ds = CaptionDataset("preproc/manifest.jsonl", vocab)
# Stop here with a clear message: an empty loader would otherwise end in a
# ZeroDivisionError when the epoch loss is averaged below.
if len(train_ds) == 0:
    raise ValueError("❌ Dataset is empty. Please check image folder or feature generation.")
train_dl = DataLoader(train_ds, batch_size=2, shuffle=True)

# --------------------------
# Train Model
# --------------------------
model = Captioner(vocab_size=vocab_size, pad_idx=vocab.stoi["<pad>"]).to(device)
optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)
# Padding positions are excluded from the loss.
criterion = nn.CrossEntropyLoss(ignore_index=vocab.stoi["<pad>"])

epochs = 5
for epoch in range(epochs):
    model.train()
    total_loss = 0
    for feats, caps in train_dl:
        feats, caps = feats.to(device), caps.to(device)
        optimizer.zero_grad()
        outputs = model(feats, caps[:, :-1])  # predict next token
        loss = criterion(outputs.reshape(-1, vocab_size), caps[:, 1:].reshape(-1))  # ignore <start>
        loss.backward()
        optimizer.step()
        total_loss += loss.item()
    # Mean of the per-batch losses for this epoch.
    avg_loss = total_loss / len(train_dl)
    print(f"📘 Epoch {epoch+1}/{epochs} - Loss: {avg_loss:.4f}")


In [None]:
import torch
from torch.utils.data import Dataset
from pathlib import Path
import json
import random

class CaptionDataset(Dataset):
    """Feature/caption dataset backed by a JSONL manifest.

    Each line of ``jsonl_path`` is one JSON object. ``__getitem__`` loads the
    tensor at ``preproc/<row["feature"]>`` and encodes one randomly chosen,
    lower-cased caption as ``<start> words... <end>``.

    Args:
        jsonl_path: Path of the manifest.
        vocab: Object whose ``stoi`` maps tokens (including the special
            tokens) to ids.
        transform: Stored on the instance; not used by ``__getitem__``.
    """

    def __init__(self, jsonl_path, vocab, transform=None):
        self.vocab = vocab
        self.transform = transform
        self.rows = []
        with open(jsonl_path, 'r') as manifest_file:
            for raw_line in manifest_file:
                self.rows.append(json.loads(raw_line))

    def __getitem__(self, idx):
        entry = self.rows[idx]

        # Precomputed image feature, stored relative to preproc/.
        feat = torch.load(Path("preproc") / entry["feature"])  # should be shape (2048,)

        words = random.choice(entry["captions"]).lower().split()

        stoi = self.vocab.stoi
        token_ids = [
            stoi.get(token, stoi["<unk>"])
            for token in ("<start>", *words, "<end>")
        ]
        return feat, torch.tensor(token_ids, dtype=torch.long)

    def __len__(self):
        return len(self.rows)


In [None]:
# Resize every image to 224x224 and convert it to a tensor (no normalisation).
transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
])

# NOTE(review): `transforms`, `DataLoader`, `vocab` and `custom_collate_fn`
# are not defined in this cell — they must come from earlier cells.
train_ds = CaptionDataset("preproc/manifest.jsonl", vocab, transform)
train_dl = DataLoader(train_ds, batch_size=4, shuffle=True, collate_fn=custom_collate_fn)


In [None]:
import torch.nn as nn

# Captioner model definition
class Captioner(nn.Module):
    """LSTM caption decoder that receives the image as the first input step.

    The image feature is projected into the token-embedding space and placed
    in front of the caption embeddings (with the last caption token dropped),
    so ``output[:, t]`` has seen the image and ``captions[:, :t]``.

    Args:
        feat_dim: size of the incoming image feature vector.
        emb_dim: token embedding size (also the LSTM input size).
        hid_dim: LSTM hidden size.
        vocab_size: number of output classes / embedding rows.
    """

    def __init__(self, feat_dim=2048, emb_dim=512, hid_dim=512, vocab_size=1000):
        super().__init__()
        # The image step is concatenated with token embeddings and fed to the
        # LSTM, so it must be emb_dim wide. Projecting to hid_dim only worked
        # while emb_dim == hid_dim and crashed in torch.cat otherwise.
        self.fc_img = nn.Linear(feat_dim, emb_dim)
        self.embed = nn.Embedding(vocab_size, emb_dim)
        self.lstm = nn.LSTM(emb_dim, hid_dim, batch_first=True)
        self.fc_out = nn.Linear(hid_dim, vocab_size)

    def forward(self, img_feat, captions):
        """Return logits of shape (B, T, vocab_size) for captions of shape (B, T)."""
        img_step = self.fc_img(img_feat).unsqueeze(1)   # (B, 1, E)
        cap_embeds = self.embed(captions)               # (B, T, E)
        # Image first, then every token except the last: sequence length stays T.
        x = torch.cat([img_step, cap_embeds[:, :-1, :]], dim=1)
        out, _ = self.lstm(x)
        return self.fc_out(out)

# Set device: use the GPU when CUDA is available, otherwise the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Create model, optimizer, loss.
# `vocab` is not defined in this cell; it must expose `itos` and `stoi`.
vocab_size = len(vocab.itos)
model = Captioner(vocab_size=vocab_size).to(device)
optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)
# Padding positions are excluded from the loss.
criterion = nn.CrossEntropyLoss(ignore_index=vocab.stoi["<pad>"])

print(f"✅ Model ready on {device} with vocab size {vocab_size}")


In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
import json, random, os
from pathlib import Path

# --------------------------
# Vocabulary Class
# --------------------------
class Vocabulary:
    """Word-level vocabulary with four reserved tokens.

    Ids 0-3 are ``<pad>``, ``<start>``, ``<end>`` and ``<unk>``; every other
    word receives the next free id in order of first appearance.
    """

    def __init__(self):
        self.stoi = {"<pad>": 0, "<start>": 1, "<end>": 2, "<unk>": 3}
        self.itos = ["<pad>", "<start>", "<end>", "<unk>"]

    def build_vocab(self, captions):
        """Add every unseen lower-cased, whitespace-split word of ``captions``."""
        for sentence in captions:
            for word in sentence.lower().split():
                if word not in self.stoi:
                    self.stoi[word] = len(self.itos)
                    self.itos.append(word)

    def encode(self, sentence, max_len=20):
        """Encode ``sentence`` as a LongTensor of exactly ``max_len`` ids.

        Layout is ``<start> w1 ... wn <end> <pad>...``. Over-long sentences
        have their *words* truncated so that ``<end>`` is kept; previously the
        truncation ran after ``<end>`` was appended and silently dropped it.
        Unknown words map to ``<unk>``.
        """
        unk_id = self.stoi["<unk>"]
        word_ids = [self.stoi.get(word, unk_id) for word in sentence.lower().split()]
        # Reserve two slots for <start> and <end>.
        word_ids = word_ids[:max(max_len - 2, 0)]
        tokens = [self.stoi["<start>"]] + word_ids + [self.stoi["<end>"]]
        tokens = tokens[:max_len]  # only bites when max_len < 2
        padding = [self.stoi["<pad>"]] * (max_len - len(tokens))
        return torch.tensor(tokens + padding, dtype=torch.long)

# --------------------------
# Dataset Class
# --------------------------
class CaptionDataset(Dataset):
    """Feature/caption dataset backed by a JSON-lines manifest.

    A manifest row is kept only when its ``"features"`` value is a path that
    exists on disk and its ``"captions"`` value is present and non-empty.
    Captions are encoded with ``vocab.encode(caption, max_len)``.
    """

    def __init__(self, manifest_path, vocab, max_len=20):
        self.vocab = vocab
        self.max_len = max_len
        with open(manifest_path, "r", encoding="utf-8") as manifest:
            parsed = [json.loads(raw_line) for raw_line in manifest]
        self.data = [record for record in parsed if self._usable(record)]
        print(f"✅ Loaded {len(self.data)} samples.")

    @staticmethod
    def _usable(record):
        """True when the row has an existing feature file and at least one caption."""
        if "features" not in record:
            return False
        if not os.path.exists(record["features"]):
            return False
        return "captions" in record and bool(record["captions"])

    def __len__(self):
        """Number of usable manifest rows."""
        return len(self.data)

    def __getitem__(self, idx):
        """Return ``(feature_tensor, encoded_caption)`` for row ``idx``."""
        record = self.data[idx]
        # assumes the saved feature is a 1-D tensor such as (2048,) — TODO confirm
        feature = torch.load(record["features"])
        # A caption is drawn at random on every access.
        chosen = random.choice(record["captions"])
        return feature, self.vocab.encode(chosen, self.max_len)

# --------------------------
# Captioner Model
# --------------------------
class Captioner(nn.Module):
    """LSTM caption decoder whose initial hidden state comes from the image.

    The image feature is mapped to ``hid_dim`` with a linear layer and tanh
    and used as ``h0``; ``c0`` is all zeros. One logit row is produced per
    input token.
    """

    def __init__(self, feat_dim=2048, emb_dim=256, hid_dim=256, vocab_size=1000, pad_idx=0):
        super().__init__()
        self.fc_img = nn.Linear(in_features=feat_dim, out_features=hid_dim)
        self.emb = nn.Embedding(vocab_size, emb_dim, padding_idx=pad_idx)
        self.lstm = nn.LSTM(input_size=emb_dim, hidden_size=hid_dim, batch_first=True)
        self.fc_out = nn.Linear(in_features=hid_dim, out_features=vocab_size)

    def forward(self, feats, caps):
        """Return logits of shape (B, T, vocab_size) for ``caps`` of shape (B, T)."""
        # (B, feat_dim) -> (1, B, hid_dim): the layout nn.LSTM expects for h0.
        initial_hidden = torch.tanh(self.fc_img(feats)).unsqueeze(0)
        initial_cell = torch.zeros_like(initial_hidden)
        token_vectors = self.emb(caps)
        hidden_states, _ = self.lstm(token_vectors, (initial_hidden, initial_cell))
        logits = self.fc_out(hidden_states)
        return logits

# --------------------------
# Build vocab, dataset and dataloader
# --------------------------
# Use the GPU when CUDA is available, otherwise the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# NOTE(review): the vocabulary is built only from these two placeholder
# sentences, so every manifest caption word outside them is encoded as <unk>.
dummy_captions = ["a segmented image", "an example picture"]
vocab = Vocabulary()
vocab.build_vocab(dummy_captions)
PAD_IDX = vocab.stoi["<pad>"]  # ✅ Define PAD_IDX here
vocab_size = len(vocab.stoi)

# No collate_fn is needed: Vocabulary.encode pads every caption to max_len.
train_ds = CaptionDataset("preproc/manifest.jsonl", vocab)
train_dl = DataLoader(train_ds, batch_size=2, shuffle=True)

# --------------------------
# Initialize model, loss and optimizer
# --------------------------
model = Captioner(vocab_size=vocab_size, pad_idx=PAD_IDX).to(device)
criterion = nn.CrossEntropyLoss(ignore_index=PAD_IDX)  # ✅ Now PAD_IDX is defined
optimizer = optim.Adam(model.parameters(), lr=1e-3)

# --------------------------
# Training Loop
# --------------------------
epochs = 5
for epoch in range(epochs):
    model.train()
    total_loss = 0
    for feats, caps in train_dl:
        feats, caps = feats.to(device), caps.to(device)
        optimizer.zero_grad()
        # Teacher forcing: inputs are tokens 0..T-2, targets are tokens 1..T-1.
        outputs = model(feats, caps[:, :-1])
        loss = criterion(outputs.reshape(-1, vocab_size), caps[:, 1:].reshape(-1))
        loss.backward()
        optimizer.step()
        total_loss += loss.item()
    # Mean of the per-batch losses for this epoch.
    avg_loss = total_loss / len(train_dl)
    print(f"📘 Epoch {epoch+1}/{epochs} - Loss: {avg_loss:.4f}")


In [None]:
import torch.nn as nn

class Captioner(nn.Module):
    """LSTM caption decoder that consumes the image as its first input step.

    The projected image feature is prepended to the caption embeddings (minus
    the last token), so ``output[:, t]`` is conditioned on the image and on
    ``captions[:, :t]``.

    Args:
        feat_dim: size of the image feature vector.
        emb_dim: token embedding size, which is also the LSTM input size.
        hid_dim: LSTM hidden size.
        vocab_size: number of embedding rows / output classes.
    """

    def __init__(self, feat_dim=2048, emb_dim=512, hid_dim=512, vocab_size=1000):
        super().__init__()
        # The image step shares the LSTM input with the token embeddings, so
        # it is projected to emb_dim. The former hid_dim projection made
        # torch.cat fail as soon as emb_dim != hid_dim.
        self.fc_img = nn.Linear(feat_dim, emb_dim)
        self.embed = nn.Embedding(vocab_size, emb_dim)
        self.lstm = nn.LSTM(emb_dim, hid_dim, batch_first=True)
        self.fc_out = nn.Linear(hid_dim, vocab_size)

    def forward(self, img_feat, captions):
        """Map features (B, feat_dim) and captions (B, T) to logits (B, T, vocab_size)."""
        img_step = self.fc_img(img_feat).unsqueeze(1)            # (B, 1, E)
        cap_embeds = self.embed(captions)                        # (B, T, E)
        x = torch.cat([img_step, cap_embeds[:, :-1, :]], dim=1)  # (B, T, E), shifted input
        out, _ = self.lstm(x)
        return self.fc_out(out)


In [None]:
import torch.optim as optim

# Padding id excluded from the loss below.
PAD_IDX = 0  # Replace with your actual padding token index
# Built with the class defaults of whichever Captioner is currently defined in
# the kernel. NOTE(review): no vocab size is passed and the model is not moved
# to a device here.
model = Captioner()
criterion = nn.CrossEntropyLoss(ignore_index=PAD_IDX)
optimizer = optim.Adam(model.parameters(), lr=1e-3)


In [None]:
# ────────────────────────────────────────────────────────────────────────────
# 0. Imports & Cleanup
# ────────────────────────────────────────────────────────────────────────────
import os, json, re, random, requests, shutil, torch, torch.nn as nn
from pathlib import Path
from collections import Counter
from torch.utils.data import Dataset, DataLoader
from torch.nn.utils.rnn import pad_sequence
from torchvision import transforms
from torchvision.models import resnet50, ResNet50_Weights
from PIL import Image, UnidentifiedImageError

# Clean previously derived features only.
# The images/ folder is deliberately left in place: the fallback further down
# searches it for a local .jpg, and the final error message tells the user to
# put a file there. Wiping it on every run made that fallback unreachable and
# deleted user-supplied images.
shutil.rmtree("preproc", ignore_errors=True)

# ────────────────────────────────────────────────────────────────────────────
# 1. Setup: Download or Use Local Image
# ────────────────────────────────────────────────────────────────────────────
# Working directories, relative to the notebook's current directory.
IMAGE_DIR = Path("images")      # input .jpg files
FEATURE_DIR = Path("preproc")   # saved feature tensors and the manifest
MANIFEST = FEATURE_DIR / "manifest.jsonl"
IMAGE_DIR.mkdir(exist_ok=True)
FEATURE_DIR.mkdir(exist_ok=True)

# Demo image: local target path and the URL it is fetched from.
img_path = IMAGE_DIR / "demo.jpg"
img_url = "https://images.unsplash.com/photo-1518791841217-8f162f1e1131?w=640"

def download_image_safe(url, path):
    """Download ``url`` to ``path`` and check that the result is a real image.

    The response is accepted only when the status is 200 and the body is
    larger than 10000 bytes; the saved file is then checked with Pillow's
    ``verify()``.

    Args:
        url: address of the image to fetch.
        path: destination file path.

    Returns:
        True when a verified image was written to ``path``, False otherwise.
        No exception escapes; failures are printed.
    """
    file_written = False
    try:
        print("\U0001F4E5 Attempting to download image...")
        r = requests.get(url, timeout=10)
        if r.status_code != 200 or len(r.content) <= 10000:
            # Say why the response was rejected instead of failing silently.
            print(f"❌ Download rejected: HTTP {r.status_code}, {len(r.content)} bytes")
            return False
        with open(path, "wb") as f:
            file_written = True
            f.write(r.content)
        # The context manager closes the handle that Image.open keeps open.
        with Image.open(path) as img:
            img.verify()
        print(f"✅ Valid image saved at: {path}")
        return True
    except Exception as e:
        print(f"❌ Download failed: {e}")
        if file_written:
            # Do not leave a truncated or corrupt file behind for later steps.
            try:
                os.remove(path)
            except OSError:
                pass
        return False

# Step 1: download the demo image when it is missing or smaller than 10000 bytes.
image_ready = False
if not img_path.exists() or img_path.stat().st_size < 10000:
    image_ready = download_image_safe(img_url, img_path)

# Step 2: otherwise (or when the download failed) take the first .jpg in
# IMAGE_DIR that Pillow can verify. An existing, large-enough demo.jpg is
# also picked up through this path.
if not image_ready:
    print("⚠️  Fallback: searching local .jpg image...")
    for file in IMAGE_DIR.glob("*.jpg"):
        try:
            Image.open(file).verify()
            img_path = file
            print(f"✅ Found local image: {img_path}")
            image_ready = True
            break
        # NOTE(review): only UnidentifiedImageError is skipped; any other
        # exception raised by verify() will propagate and stop the cell.
        except UnidentifiedImageError:
            continue

# Step 3: nothing usable was found.
if not image_ready:
    raise RuntimeError("❌ No valid image. Place a .jpg file in 'images/' folder named 'demo.jpg'")

# ────────────────────────────────────────────────────────────────────────────
# 2. Feature Extraction + Manifest Creation
# ────────────────────────────────────────────────────────────────────────────
def process_dataset(image_dir, manifest_path, feature_dir):
    """Extract ResNet-50 features for every ``*.jpg`` and write a manifest.

    For each readable image in ``image_dir`` the network output (with the
    final classification layer removed) is saved to
    ``feature_dir/<stem>.pt``. A JSON line holding the feature file name and
    the fixed caption list ``["a photo of a cat"]`` is then written to
    ``manifest_path``. Unreadable images are reported and skipped.

    Args:
        image_dir: ``Path`` of the folder to scan.
        manifest_path: output path of the JSON-lines manifest.
        feature_dir: ``Path`` of the folder that receives the ``.pt`` files.
    """
    backbone = resnet50(weights=ResNet50_Weights.DEFAULT)
    # Keep every child module except the last one (the classifier).
    encoder = nn.Sequential(*list(backbone.children())[:-1])
    encoder.eval()

    preprocess = transforms.Compose([
        transforms.Resize((224, 224)),
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.485, 0.456, 0.406],
                             std=[0.229, 0.224, 0.225])
    ])

    entries = []
    for img_path in image_dir.glob("*.jpg"):
        try:
            rgb_image = Image.open(img_path).convert("RGB")
        except UnidentifiedImageError:
            print(f"❌ Skipping unreadable image: {img_path}")
            continue

        batch = preprocess(rgb_image).unsqueeze(0)  # add the batch dimension
        with torch.no_grad():
            feat = encoder(batch).squeeze().clone()

        target = feature_dir / f"{img_path.stem}.pt"
        torch.save(feat, target)
        entries.append({"feature": target.name, "captions": ["a photo of a cat"]})

    with open(manifest_path, "w") as out:
        out.writelines(json.dumps(entry) + "\n" for entry in entries)
    print("✅ Manifest created with", len(entries), "entries.")

# Extract features for everything in IMAGE_DIR and (re)write the manifest.
process_dataset(IMAGE_DIR, MANIFEST, FEATURE_DIR)

# ────────────────────────────────────────────────────────────────────────────
# 3. Vocabulary Class
# ────────────────────────────────────────────────────────────────────────────
class Vocabulary:
    """Token <-> id mapping with reserved ``<pad>/<start>/<end>/<unk>`` ids 0-3.

    Text is lower-cased and split into ``\\w+`` runs. Words are added in order
    of first appearance once their count reaches ``min_freq``.
    """

    def __init__(self, min_freq=1):
        self.min_freq = min_freq
        self.special = ["<pad>", "<start>", "<end>", "<unk>"]
        self.itos = list(self.special)
        self.stoi = dict(zip(self.special, range(len(self.special))))

    def _tokenize(self, text):
        """Lower-case ``text`` and return its alphanumeric/underscore runs."""
        lowered = text.lower()
        return re.findall(r"\w+", lowered)

    def build(self, captions):
        """Add every sufficiently frequent, not yet known word of ``captions``."""
        counts = Counter()
        for caption in captions:
            counts.update(self._tokenize(caption))
        for token, seen in counts.items():
            if seen < self.min_freq or token in self.stoi:
                continue
            self.stoi[token] = len(self.itos)
            self.itos.append(token)

    def encode(self, text):
        """Return the ids of ``text``; no <start>/<end> is added, unknowns map to <unk>."""
        ids = []
        for piece in self._tokenize(text):
            ids.append(self.stoi.get(piece, self.stoi["<unk>"]))
        return ids

    def decode(self, ids):
        """Join the words for ``ids``, leaving out <pad>, <start> and <end>."""
        words = []
        for i in ids:
            if i in {self.stoi["<pad>"], self.stoi["<start>"], self.stoi["<end>"]}:
                continue
            words.append(self.itos[i])
        return " ".join(words)

    def __len__(self):
        """Vocabulary size including the four reserved tokens."""
        return len(self.itos)

# ────────────────────────────────────────────────────────────────────────────
# 4. Load Manifest + Build Vocabulary
# ────────────────────────────────────────────────────────────────────────────
# Parse every manifest line that contains the substring "feature".
# NOTE(review): this is a plain text check on the raw line, not a check that
# the parsed object has a "feature" key.
with open(MANIFEST) as f:
    rows = [json.loads(line) for line in f if "feature" in line]

if not rows:
    raise ValueError("❌ No valid samples found.")

# Rows without a "captions" key get a single placeholder caption.
for row in rows:
    row["captions"] = row.get("captions", ["a photo"])

# The vocabulary is built from every caption of every row.
vocab = Vocabulary()
vocab.build([cap for r in rows for cap in r["captions"]])
print("✅ Vocab size:", len(vocab))

# ────────────────────────────────────────────────────────────────────────────
# 5. Dataset and DataLoader
# ────────────────────────────────────────────────────────────────────────────
class CaptionDataset(Dataset):
    """Serves ``(feature, caption_ids)`` pairs from already-parsed manifest rows.

    Feature files are resolved against the module-level ``FEATURE_DIR``.
    Captions are wrapped in ``<start>`` / ``<end>`` and their word ids are cut
    to at most ``max_len`` before wrapping.
    """

    def __init__(self, manifest_rows, vocab, max_len=20):
        self.max_len = max_len
        self.vocab = vocab
        self.rows = manifest_rows

    def __len__(self):
        """Number of manifest rows."""
        return len(self.rows)

    def __getitem__(self, idx):
        """Load the feature of row ``idx`` and encode one randomly chosen caption."""
        record = self.rows[idx]
        feature = torch.load(FEATURE_DIR / record["feature"])
        text = random.choice(record["captions"]).lower()
        start_id = self.vocab.stoi["<start>"]
        word_ids = self.vocab.encode(text)[:self.max_len]
        end_id = self.vocab.stoi["<end>"]
        ids = [start_id] + word_ids + [end_id]
        return feature, torch.tensor(ids)

def collate(batch):
    """Batch ``(feature, caption)`` pairs for a DataLoader.

    Features are stacked along a new first dimension. Captions are
    right-padded to the longest one in the batch with the ``<pad>`` id of the
    module-level ``vocab``.
    """
    feats, caps = zip(*batch)
    stacked_feats = torch.stack(feats)
    pad_id = vocab.stoi["<pad>"]
    padded_caps = pad_sequence(caps, batch_first=True, padding_value=pad_id)
    return stacked_feats, padded_caps

# Wrap the parsed manifest rows; `collate` pads captions per batch.
train_ds = CaptionDataset(rows, vocab)
train_dl = DataLoader(train_ds, batch_size=4, shuffle=True, collate_fn=collate)
print("✅ Dataset size:", len(train_ds))

# ────────────────────────────────────────────────────────────────────────────
# 6. Captioner Model
# ────────────────────────────────────────────────────────────────────────────
class Captioner(nn.Module):
    """GRU caption decoder whose initial hidden state is derived from the image.

    The default ``vocab_size`` and the embedding's padding index are read from
    the module-level ``vocab``; the default is evaluated once, when the class
    is defined.
    """

    def __init__(self, feat_dim=2048, emb_dim=256, hid_dim=512, vocab_size=len(vocab)):
        super().__init__()
        self.img_fc = nn.Linear(in_features=feat_dim, out_features=hid_dim)
        self.emb = nn.Embedding(vocab_size, emb_dim, padding_idx=vocab.stoi["<pad>"])
        self.gru = nn.GRU(input_size=emb_dim, hidden_size=hid_dim, batch_first=True)
        self.fc_out = nn.Linear(in_features=hid_dim, out_features=vocab_size)

    def forward(self, feats, caps):
        """Return one logit row per input token: (B, T, vocab_size)."""
        # tanh(linear(image)) becomes h0, reshaped to (1, B, hid_dim).
        initial_hidden = torch.tanh(self.img_fc(feats)).unsqueeze(0)
        token_vectors = self.emb(caps)
        hidden_states, _ = self.gru(token_vectors, initial_hidden)
        return self.fc_out(hidden_states)

# ────────────────────────────────────────────────────────────────────────────
# 7. Training Loop
# ────────────────────────────────────────────────────────────────────────────
# Use the GPU when CUDA is available, otherwise the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = Captioner().to(device)
optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)
# Padding positions are excluded from the loss.
criterion = nn.CrossEntropyLoss(ignore_index=vocab.stoi["<pad>"])

model.train()
for epoch in range(10):
    total_loss = 0
    for feats, caps in train_dl:
        feats, caps = feats.to(device), caps.to(device)
        optimizer.zero_grad()
        # The full caption is fed in; the shift happens on the outputs below.
        outputs = model(feats, caps)
        # Step t is scored against token t+1, so the last output step (which
        # has no following token) is dropped.
        targets = caps[:, 1:]
        outputs = outputs[:, :-1, :]
        loss = criterion(outputs.reshape(-1, outputs.size(-1)), targets.reshape(-1))
        loss.backward()
        optimizer.step()
        total_loss += loss.item()
    # Reports the mean per-batch loss of the epoch.
    print(f"📘 Epoch [{epoch+1}/10] — Loss: {total_loss / len(train_dl):.4f}")


In [None]:
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
import torch.optim as optim


In [None]:
class ImageCaptionDataset(Dataset):
    """Index-aligned pairing of image features and captions.

    Item ``i`` is ``(image_features[i], captions[i])``. The dataset length is
    the number of captions.
    """

    def __init__(self, image_features, captions):
        self.captions = captions
        self.image_features = image_features

    def __len__(self):
        """Number of captions."""
        caption_count = len(self.captions)
        return caption_count

    def __getitem__(self, idx):
        """Return the feature and caption stored at position ``idx``."""
        feature = self.image_features[idx]
        caption = self.captions[idx]
        return feature, caption




In [None]:
import torch
from torch.utils.data import Dataset, DataLoader
from torchvision import models, transforms
from PIL import Image
import os


In [None]:
# Load ResNet18 with ImageNet weights and remove the classifier layer.
# `weights=` replaces the deprecated `pretrained=True` argument.
resnet = models.resnet18(weights=models.ResNet18_Weights.DEFAULT)
resnet = torch.nn.Sequential(*(list(resnet.children())[:-1]))
resnet.eval()

# Transform image to fit ResNet input. The ImageNet mean/std normalisation is
# what the pretrained weights were trained with; without it the extracted
# features come from out-of-distribution inputs.
transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406],
                         std=[0.229, 0.224, 0.225]),
])

# Load your image
img_path = "photo_2025-06-09_21-10-49.jpg"  # file already in your folder
img = Image.open(img_path).convert("RGB")
img = transform(img).unsqueeze(0)  # Add batch dim

# Extract features without tracking gradients
with torch.no_grad():
    features = resnet(img).squeeze().flatten()  # Shape: (512,)

# Convert to dataset-style tensor
img_features = features.unsqueeze(0)  # Shape: (1, 512)


In [None]:
# One hand-written caption of four token ids, shape (1, 4).
# NOTE(review): the Vocabulary classes in this notebook assign <end> = 2 and
# <unk> = 3, so the trailing 3 may not be <end> — confirm the intended ids.
encoded_captions = torch.tensor([[1, 5, 10, 3]])  # Example: intended as [<start>, 'a', 'cat', <end>]


In [None]:
class ImageCaptionDataset(Dataset):
    """Dataset that returns ``(image_features[i], captions[i])`` for index ``i``.

    Its length is taken from ``captions`` only.
    """

    def __init__(self, image_features, captions):
        self.image_features, self.captions = image_features, captions

    def __len__(self):
        """Number of captions."""
        return len(self.captions)

    def __getitem__(self, idx):
        """Fetch the feature/caption pair at ``idx``."""
        pair = (self.image_features[idx], self.captions[idx])
        return pair

# Create Dataset and DataLoader.
# `img_features` and `encoded_captions` come from earlier cells; their first
# dimensions must line up because items are paired by index.
train_dataset = ImageCaptionDataset(img_features, encoded_captions)
train_dl = DataLoader(train_dataset, batch_size=1, shuffle=True)


In [None]:
import torch.nn as nn

class Captioner(nn.Module):
    """Placeholder model: a single linear layer from 512 features to 1000 scores.

    ``captions`` is accepted for interface compatibility and ignored, so the
    output has no time dimension.
    """

    def __init__(self):
        super().__init__()
        # 1000 is a stand-in; it should become the vocabulary size.
        self.fc = nn.Linear(in_features=512, out_features=1000)

    def forward(self, img_feats, captions):
        """Return ``fc(img_feats)``; dummy forward for now."""
        scores = self.fc(img_feats)
        return scores


In [None]:
# Instantiates whichever Captioner class was defined most recently in the
# kernel (several cells define one). This call only works for a definition
# whose constructor needs no arguments.
model = Captioner()


In [None]:
# Adam over the parameters of the current `model`; must be re-run whenever
# `model` is re-created, otherwise it keeps optimising the old parameters.
optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)


In [None]:
# Plain cross-entropy: no ignore_index, so padding ids (if any) count in the loss.
criterion = nn.CrossEntropyLoss()


In [None]:
# NOTE(review): this cell repeats the optimizer cell two cells above; running
# it replaces the optimizer and discards its accumulated Adam state.
optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)


In [None]:
num_epochs = 10  # Or any number you choose

model.train()

# NOTE(review): `device` must exist in the kernel, and batches are moved to it
# while `model` is not moved in this cell — confirm the model already lives on
# the same device.
for epoch in range(num_epochs):
    total_loss = 0

    for img_feats, captions in train_dl:
        img_feats = img_feats.to(device)
        captions = captions.to(device)

        optimizer.zero_grad()

        # Input: captions without last token (e.g. <start> A cat)
        # Target: captions without first token (e.g. A cat <end>)
        inputs = captions[:, :-1]     # (B, T-1)
        targets = captions[:, 1:]     # (B, T-1)

        # assumes the model returns one logit row per input token,
        # i.e. (B, T-1, V) — TODO confirm for the active Captioner
        outputs = model(img_feats, inputs)

        # Reshape to match CrossEntropyLoss input expectations
        outputs = outputs.reshape(-1, outputs.size(-1))  # (B*(T-1), V)
        targets = targets.reshape(-1)                    # (B*(T-1),)

        loss = criterion(outputs, targets)
        loss.backward()
        optimizer.step()

        total_loss += loss.item()

    # Mean of the per-batch losses for this epoch.
    avg_loss = total_loss / len(train_dl)
    print(f"📘 Epoch [{epoch+1}/{num_epochs}] — Loss: {avg_loss:.4f}")


In [None]:
# NOTE(review): this cell repeats the training loop of the preceding cell;
# running both trains the same model twice.
num_epochs = 10
model.train()

for epoch in range(num_epochs):
    total_loss = 0

    for img_feats, captions in train_dl:
        # assumes features are (B, 2048) — TODO confirm; other cells in this
        # notebook build 512-dim ResNet18 features
        img_feats = img_feats.to(device)
        captions = captions.to(device)       # (B, T)

        optimizer.zero_grad()

        # Input to model: all tokens except last (teacher forcing)
        inputs = captions[:, :-1]            # (B, T-1)
        targets = captions[:, 1:]            # (B, T-1)

        # Forward pass
        # assumes one logit row per input token, (B, T-1, vocab_size) — TODO confirm
        outputs = model(img_feats, inputs)

        # Flatten for loss computation
        outputs = outputs.reshape(-1, outputs.size(-1))  # (B*(T-1), vocab_size)
        targets = targets.reshape(-1)                    # (B*(T-1))

        loss = criterion(outputs, targets)
        loss.backward()
        optimizer.step()

        total_loss += loss.item()

    # Mean of the per-batch losses for this epoch.
    avg_loss = total_loss / len(train_dl)
    print(f"📘 Epoch [{epoch+1}/{num_epochs}] — Loss: {avg_loss:.4f}")


In [None]:
# Debug aid: shapes of the last batch's flattened logits and targets, as left
# behind in the kernel by the most recent training cell.
print("outputs shape:", outputs.shape)  # expected (N, V), N = flattened positions
print("targets shape:", targets.shape)  # expected (N,)


In [None]:
class Captioner(nn.Module):
    """Caption language model: embedding -> LSTM -> linear vocabulary scores.

    ``img_feats`` is accepted but not used, so predictions depend on the
    caption tokens only.
    """

    def __init__(self, embed_size, hidden_size, vocab_size):
        super().__init__()
        self.embedding = nn.Embedding(num_embeddings=vocab_size, embedding_dim=embed_size)
        self.lstm = nn.LSTM(input_size=embed_size, hidden_size=hidden_size, batch_first=True)
        self.fc = nn.Linear(in_features=hidden_size, out_features=vocab_size)

    def forward(self, img_feats, captions):
        """Map ``captions`` (batch, seq_len) to logits (batch, seq_len, vocab_size).

        The image features could be fed as a first input step or used to
        initialise the hidden state; neither is done here.
        """
        token_vectors = self.embedding(captions)      # (batch, seq_len, embed_size)
        hidden_states = self.lstm(token_vectors)[0]   # (batch, seq_len, hidden_size)
        return self.fc(hidden_states)                 # (batch, seq_len, vocab_size)


In [None]:
# One manual optimisation step on the batch left in `img_feats` / `captions`.
optimizer.zero_grad()                  # clear gradients from any earlier step
outputs = model(img_feats, captions)   # (B, T, V)
# Output step t has already consumed token t, so it must be scored against
# token t+1. Keeping outputs[:, 1:] with targets[:, 1:] would train the model
# to repeat the token it was just given.
outputs = outputs[:, :-1, :]           # drop the last step: nothing follows it
targets = captions[:, 1:]              # next-token targets

outputs = outputs.reshape(-1, outputs.shape[-1])  # (B*(T-1), V)
targets = targets.reshape(-1)                     # (B*(T-1),)

loss = criterion(outputs, targets)
loss.backward()
optimizer.step()


In [None]:
import torch.nn as nn

class Captioner(nn.Module):
    """Embedding + LSTM + linear head that scores the vocabulary at every step."""

    def __init__(self, embed_size, hidden_size, vocab_size):
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, embed_size)
        self.lstm = nn.LSTM(embed_size, hidden_size, batch_first=True)
        self.linear = nn.Linear(hidden_size, vocab_size)

    def forward(self, img_feats, captions):
        """Return logits (B, T, V) for ``captions`` of shape (B, T).

        ``img_feats`` is unused here; it could be used to initialise the
        hidden state.
        """
        embedded = self.embedding(captions)     # (B, T, E)
        sequence_out, _state = self.lstm(embedded)   # (B, T, H)
        logits = self.linear(sequence_out)      # (B, T, V)
        return logits


In [None]:
# One manual optimisation step on the batch left in `img_feats` / `captions`.
optimizer.zero_grad()                       # gradients would otherwise accumulate
outputs = model(img_feats, captions)        # (B, T, V)
# Step t has seen token t and must predict token t+1. The previous slice
# outputs[:, 1:] lined each prediction up with its own input token.
outputs = outputs[:, :-1, :]                # last step has no target
targets = captions[:, 1:]                   # predict next word

outputs = outputs.reshape(-1, outputs.size(-1))  # (B*(T-1), V)
targets = targets.reshape(-1)                    # (B*(T-1),)

loss = criterion(outputs, targets)
loss.backward()
optimizer.step()


In [None]:
import torch.nn as nn

class Captioner(nn.Module):
    """Token-only caption model: embeds captions, runs an LSTM, projects to the vocabulary."""

    def __init__(self, embed_size, hidden_size, vocab_size):
        super().__init__()
        self.embedding = nn.Embedding(num_embeddings=vocab_size, embedding_dim=embed_size)
        self.lstm = nn.LSTM(input_size=embed_size, hidden_size=hidden_size, batch_first=True)
        self.linear = nn.Linear(in_features=hidden_size, out_features=vocab_size)

    def forward(self, img_feats, captions):
        """Return (B, T, V) logits; ``img_feats`` is reserved for later context initialisation."""
        # (B, T) -> (B, T, E) -> (B, T, H) -> (B, T, V)
        recurrent_out, _ = self.lstm(self.embedding(captions))
        return self.linear(recurrent_out)


In [None]:
import torch.nn as nn

class Captioner(nn.Module):
    """Simple caption decoder: word embedding, one LSTM, linear output layer.

    The image features are not used in this version; they could be added to
    initialise the LSTM hidden state.
    """

    def __init__(self, embed_size, hidden_size, vocab_size):
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, embed_size)
        self.lstm = nn.LSTM(embed_size, hidden_size, batch_first=True)
        self.linear = nn.Linear(hidden_size, vocab_size)

    def forward(self, img_feats, captions):
        """Score the vocabulary at every caption position: (B, T) -> (B, T, V)."""
        x = self.embedding(captions)   # (B, T, E)
        x = self.lstm(x)[0]            # (B, T, H)
        x = self.linear(x)             # (B, T, V)
        return x


In [None]:
# Illustration of the expected structure: a list of token lists.
# NOTE(review): the trailing `...` is a literal Ellipsis object stored as the
# third element, not a placeholder the interpreter skips; code that iterates
# over every caption's tokens will fail on it.
captions = [["a", "dog", "on", "a", "beach"], ["a", "cat", "on", "a", "mat"], ...]


In [None]:
# Three sample captions, already wrapped in <START>/<END> markers.
captions = [
    "<START> a dog is playing with a ball <END>",
    "<START> a child is eating ice cream <END>",
    "<START> a man is riding a bicycle <END>"
]

# Overwrites captions.txt with one caption per line.
# NOTE(review): the notebook introduction describes captions.txt as
# `filename.jpg|caption`; these lines carry no filename — confirm which
# format the rest of the project expects.
with open("captions.txt", "w") as f:
    for caption in captions:
        f.write(caption + "\n")


In [None]:
# Read captions.txt into a list of token lists (one list per non-empty line).
captions = []

with open("captions.txt", "r") as file:
    for line in file:
        line = line.strip()
        if line:  # skip empty lines
            # Lower-cases the whole line, including any <START>/<END> markers,
            # which therefore become "<start>" / "<end>".
            tokens = line.lower().split()  # basic tokenization
            captions.append(tokens)


In [None]:
from collections import Counter

# Flatten all tokens (each caption is expected to be a list of tokens)
all_tokens = [token for caption in captions for token in caption]
counter = Counter(all_tokens)

# Set a threshold frequency (e.g., keep words appearing ≥1 time)
threshold = 1

# Special tokens always occupy ids 0-3.
special_tokens = ['<PAD>', '<START>', '<END>', '<UNK>']

# Markers that appear inside the captions (in any letter case) are not added
# again as ordinary words. Otherwise the lower-cased "<start>" / "<end>" from
# the caption file get ids of their own and the reserved <START>/<END> ids
# never occur in the training data.
vocab = special_tokens + sorted(
    word for word, count in counter.items()
    if count >= threshold and word.upper() not in special_tokens
)
word2idx = {word: idx for idx, word in enumerate(vocab)}
# Lower-case aliases so caption tokens such as "<start>" resolve to the
# reserved ids; they are lookup-only and do not enlarge the vocabulary.
word2idx.update({token.lower(): idx for idx, token in enumerate(special_tokens)})
idx2word = dict(enumerate(vocab))
vocab_size = len(vocab)


In [None]:
# Longest caption (in tokens) sets the padded width.
# NOTE(review): max() raises ValueError when `captions` is empty.
max_len = max(len(cap) for cap in captions)

encoded_captions = []
for caption in captions:
    # Words missing from word2idx fall back to <UNK>.
    encoded = [word2idx.get(word, word2idx['<UNK>']) for word in caption]
    # Pad to max_len
    encoded += [word2idx['<PAD>']] * (max_len - len(encoded))
    encoded_captions.append(encoded)

# All rows now have equal length, so they form one 2-D integer tensor.
encoded_captions = torch.tensor(encoded_captions)


In [None]:
import torch
from torch.utils.data import Dataset, DataLoader
from collections import Counter
import re

# Example image features: random stand-ins for real extractor output
# (e.g., ResNet). No seed is set, so values differ on every run.
img_features = torch.randn(10, 2048)  # 10 samples, 2048-dim features

# Captions for each image (1:1 mapping by index)
captions = [
    "A cat sitting on a mat",
    "A dog running in the park",
    "A child playing with a ball",
    "A man riding a bicycle",
    "A woman reading a book",
    "A bird flying in the sky",
    "A car parked on the road",
    "A group of people hiking",
    "A boat floating on water",
    "A cityscape with tall buildings"
]

# -----------------------------
# 1. Simple Vocabulary
# -----------------------------
class Vocabulary:
    """Minimal vocabulary: four reserved tokens plus words in first-seen order.

    Text is lower-cased and split into ``\\w+`` runs.
    """

    def __init__(self):
        self.special = ["<pad>", "<start>", "<end>", "<unk>"]
        self.itos = list(self.special)
        self.stoi = {}
        for position, token in enumerate(self.special):
            self.stoi[token] = position

    def build(self, captions):
        """Register every word of ``captions`` that is not known yet."""
        counter = Counter()
        for cap in captions:
            counter.update(re.findall(r"\w+", cap.lower()))
        unseen = [word for word in counter if word not in self.stoi]
        for word in unseen:
            self.stoi[word] = len(self.itos)
            self.itos.append(word)

    def encode(self, text):
        """Return ids for ``<start>`` + words of ``text`` + ``<end>``; unknowns map to ``<unk>``."""
        words = re.findall(r"\w+", text.lower())
        ids = [self.stoi.get("<start>", self.stoi["<unk>"])]
        for word in words:
            ids.append(self.stoi.get(word, self.stoi["<unk>"]))
        ids.append(self.stoi.get("<end>", self.stoi["<unk>"]))
        return ids

# Build the vocabulary from the sample captions above.
vocab = Vocabulary()
vocab.build(captions)

# -----------------------------
# 2. Encode captions
# -----------------------------
# One 1-D tensor per caption; lengths differ, so batching needs padding.
encoded_captions = [torch.tensor(vocab.encode(cap)) for cap in captions]

# -----------------------------
# 3. Pad Function
# -----------------------------
from torch.nn.utils.rnn import pad_sequence

def collate_fn(batch):
    """Turn a list of ``(feature, caption)`` samples into batch tensors.

    Features are stacked; captions are right-padded to the longest caption in
    the batch using the ``<pad>`` id of the module-level ``vocab``.
    """
    img_feats, caps = zip(*batch)
    feature_batch = torch.stack(img_feats)
    pad_id = vocab.stoi["<pad>"]
    caption_batch = pad_sequence(caps, batch_first=True, padding_value=pad_id)
    return feature_batch, caption_batch

# -----------------------------
# 4. Dataset + DataLoader
# -----------------------------
class ImageCaptionDataset(Dataset):
    """Pairs ``image_features[i]`` with ``captions[i]``; sized by ``captions``."""

    def __init__(self, image_features, captions):
        self.image_features = image_features
        self.captions = captions

    def __len__(self):
        """Number of captions held by the dataset."""
        total = len(self.captions)
        return total

    def __getitem__(self, idx):
        """Return the ``(feature, caption)`` tuple at ``idx``."""
        sample_feature = self.image_features[idx]
        sample_caption = self.captions[idx]
        return sample_feature, sample_caption

# Variable-length captions are padded per batch by collate_fn.
train_dataset = ImageCaptionDataset(img_features, encoded_captions)
train_dl = DataLoader(train_dataset, batch_size=4, shuffle=True, collate_fn=collate_fn)

# -----------------------------
# 5. Test loading
# -----------------------------
# Pull one batch only, to check that shapes line up.
for img_feats, caps in train_dl:
    print("Image Feature Shape:", img_feats.shape)  # (B, 2048)
    print("Caption Shape:", caps.shape)             # (B, T), T = longest caption in batch
    break


In [None]:
# Model hyper-parameters.
embed_size = 256
hidden_size = 512

# NOTE(review): `vocab_size` and `word2idx` are not defined in this cell. They
# come from the word2idx-building cell, whereas the most recent data cell
# encodes captions with a separate `Vocabulary` object — confirm both use the
# same token ids before training.
model = Captioner(embed_size, hidden_size, vocab_size)
criterion = nn.CrossEntropyLoss(ignore_index=word2idx['<PAD>'])
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)


In [None]:
import torch
from torch.utils.data import Dataset, DataLoader
from collections import Counter
import re
from torch.nn.utils.rnn import pad_sequence

# -----------------------------
# 1. Dummy Image Features (10 samples)
# -----------------------------
# Random placeholder features; unseeded, so they change on every run.
img_features = torch.randn(10, 2048)  # Shape: (10, 2048)

# 2. Dummy Captions (paired with the features by index)
captions = [
    "A cat sitting on a mat",
    "A dog running in the park",
    "A child playing with a ball",
    "A man riding a bicycle",
    "A woman reading a book",
    "A bird flying in the sky",
    "A car parked on the road",
    "A group of people hiking",
    "A boat floating on water",
    "A cityscape with tall buildings"
]

# -----------------------------
# 3. Vocabulary Builder
# -----------------------------
class Vocabulary:
    """Maps lower-cased ``\\w+`` tokens to integer ids; ids 0-3 are reserved.

    Reserved tokens, in id order: ``<pad>``, ``<start>``, ``<end>``, ``<unk>``.
    """

    def __init__(self):
        self.special = ["<pad>", "<start>", "<end>", "<unk>"]
        self.itos = self.special[:]
        self.stoi = dict(zip(self.special, range(len(self.special))))

    def build(self, captions):
        """Append each new word of ``captions`` in order of first occurrence."""
        counter = Counter()
        for cap in captions:
            counter.update(re.findall(r"\w+", cap.lower()))
        for word in counter:
            if word in self.stoi:
                continue
            next_id = len(self.itos)
            self.stoi[word] = next_id
            self.itos.append(word)

    def encode(self, text):
        """Encode ``text`` as ``<start>``, its words, ``<end>``; unknown words become ``<unk>``."""
        tokens = ["<start>"]
        tokens.extend(re.findall(r"\w+", text.lower()))
        tokens.append("<end>")
        encoded = []
        for token in tokens:
            encoded.append(self.stoi.get(token, self.stoi["<unk>"]))
        return encoded

# Vocabulary over the dummy captions defined above.
vocab = Vocabulary()
vocab.build(captions)

# -----------------------------
# 4. Encode Captions
# -----------------------------
# A list of 1-D id tensors of differing length (not yet padded).
encoded_captions = [torch.tensor(vocab.encode(cap)) for cap in captions]

# -----------------------------
# 5. Dataset & DataLoader
# -----------------------------
class ImageCaptionDataset(Dataset):
    """Positional pairing of precomputed image features with encoded captions.

    ``len(dataset)`` equals ``len(captions)``.
    """

    def __init__(self, image_features, captions):
        self.captions = captions
        self.image_features = image_features

    def __len__(self):
        """Caption count."""
        return len(self.captions)

    def __getitem__(self, idx):
        """Return ``(image_features[idx], captions[idx])``."""
        item = (self.image_features[idx], self.captions[idx])
        return item

def collate_fn(batch):
    """Collate ``(feature, caption)`` samples into a feature batch and a padded caption batch.

    Padding uses the ``<pad>`` id of the module-level ``vocab`` and is applied
    on the right, up to the longest caption in the batch.
    """
    img_feats, caps = zip(*batch)
    stacked = torch.stack(img_feats)
    padded = pad_sequence(
        caps,
        batch_first=True,
        padding_value=vocab.stoi["<pad>"],
    )
    return stacked, padded

# collate_fn pads the variable-length captions of each batch.
train_dataset = ImageCaptionDataset(img_features, encoded_captions)
train_dl = DataLoader(train_dataset, batch_size=4, shuffle=True, collate_fn=collate_fn)

# -----------------------------
# 6. Print Shapes
# -----------------------------
print("Image features shape:", img_features.shape)
print("Number of encoded captions:", len(encoded_captions))
print("Example caption tensor shape:", encoded_captions[0].shape)

# Optional: Print 1 batch (the loop stops after the first one)
for img_feats_batch, caps_batch in train_dl:
    print("\nBatch image features shape:", img_feats_batch.shape)  # (B, 2048)
    print("Batch caption shape:", caps_batch.shape)                # (B, T), T = longest caption in batch
    break


In [None]:
# Fix shape mismatch: make image features match captions
# repeat(3, 1) tiles the rows three times, e.g. (1, 512) -> (3, 512).
# NOTE(review): the cell is not idempotent — every re-run triples the row
# count again — and the result is only (3, 512) if `img_features` is the
# single-image (1, 512) tensor; after the dummy-data cells it is (10, 2048).
img_features = img_features.repeat(3, 1)  # intended shape: [3, 512]


In [None]:
# Rebuild the dataset/loader from whatever `img_features` and
# `encoded_captions` currently hold in the kernel.
# NOTE(review): no collate_fn is passed, so captions must all have the same
# length (e.g. the padded 2-D tensor); a list of variable-length tensors
# cannot be batched by the default collation — confirm which one is active.
train_dataset = ImageCaptionDataset(img_features, encoded_captions)
train_dl = DataLoader(train_dataset, batch_size=2, shuffle=True)  # batch_size can be 2 or 3


In [None]:
num_epochs = 10  # you can increase later if needed

# Teacher-forced training of the caption model currently bound to `model`.
for epoch in range(num_epochs):
    total_loss = 0
    for img_feats, captions in train_dl:
        optimizer.zero_grad()

        outputs = model(img_feats, captions)       # Shape: (B, T, V)
        # Output step t has consumed token t and must predict token t+1.
        # The earlier slice outputs[:, 1:] scored each step against the very
        # token it had just been fed, so the model learned to copy its input.
        outputs = outputs[:, :-1, :]                # Last step has no next token
        targets = captions[:, 1:]                   # Next word prediction

        outputs = outputs.reshape(-1, outputs.size(-1))  # (B*(T-1), V)
        targets = targets.reshape(-1)                    # (B*(T-1))

        loss = criterion(outputs, targets)
        loss.backward()
        optimizer.step()

        total_loss += loss.item()

    # Reports the sum of the batch losses for the epoch (not the mean).
    print(f"Epoch [{epoch+1}/{num_epochs}], Loss: {total_loss:.4f}")


In [None]:
def generate_caption(model, img_feat, vocab, max_len=20):
    """Greedily decode a caption for one image.

    Args:
        model: callable as ``model(img_feats, captions)`` returning logits of
            shape (1, T, V); it is switched to eval mode.
        img_feat: feature tensor of a single image, without batch dimension.
        vocab: dict mapping token -> id; must contain '<START>' and '<END>'.
        max_len: maximum number of tokens generated after '<START>'.

    Returns:
        The generated words joined by spaces, without '<START>' / '<END>'.
        Ids that are missing from ``vocab`` are rendered as '<UNK>'.
    """
    model.eval()
    end_id = vocab['<END>']
    caption = [vocab['<START>']]
    with torch.no_grad():
        for _ in range(max_len):
            # Build the prefix on the same device as the image feature.
            cap_tensor = torch.tensor(caption, device=img_feat.device).unsqueeze(0)  # (1, T)
            output = model(img_feat.unsqueeze(0), cap_tensor)  # (1, T, V)
            next_word_logits = output[0, -1]  # last token's output
            predicted = next_word_logits.argmax().item()

            # Stop before storing <END>, so the prefix only ever holds real
            # words. Slicing with [1:-1] afterwards dropped the last real
            # word whenever max_len was reached without an <END>.
            if predicted == end_id:
                break
            caption.append(predicted)

    # Decode, skipping the leading <START>
    inv_vocab = {idx: tok for tok, idx in vocab.items()}
    return ' '.join(inv_vocab.get(idx, '<UNK>') for idx in caption[1:])


In [None]:
from collections import Counter

# Flatten all tokens
# NOTE(review): this iterates each element of `captions` item by item. If the
# captions are plain strings this yields single characters, not words - confirm
# that `captions` holds pre-tokenised lists at this point. Also note that the
# training loop elsewhere in this notebook reuses the name `captions` as its
# loop variable.
all_tokens = [token for caption in captions for token in caption]

# Count frequency
counter = Counter(all_tokens)

# Set a threshold if needed, e.g., min_freq = 1
tokens = sorted(counter)  # unique tokens in sorted order (frequencies are not used yet)

# Build vocab dict
# Regular tokens start at index 2 so that 0 and 1 stay free for <PAD> and <START>.
vocab = {token: idx + 2 for idx, token in enumerate(tokens)}
vocab['<PAD>'] = 0
vocab['<START>'] = 1
vocab['<END>'] = len(vocab)  # or any fixed value

# OPTIONAL: Create inverse vocab for decoding
inv_vocab = {idx: token for token, idx in vocab.items()}


In [None]:
import torch
import torch.nn as nn

# -----------------------------
# Dummy Captioner model
# -----------------------------
class Captioner(nn.Module):
    """GRU caption decoder whose initial hidden state is derived from the image feature.

    Layers: ``img_fc`` projects the image feature to the hidden size, ``emb``
    embeds token ids, ``gru`` runs over the embedded sequence and ``fc_out``
    maps every hidden state to vocabulary logits.
    """

    def __init__(self, feat_dim=2048, emb_dim=256, hid_dim=512, vocab_size=100):
        super(Captioner, self).__init__()
        self.img_fc = nn.Linear(feat_dim, hid_dim)
        self.emb = nn.Embedding(vocab_size, emb_dim)
        self.gru = nn.GRU(emb_dim, hid_dim, batch_first=True)
        self.fc_out = nn.Linear(hid_dim, vocab_size)

    def forward(self, feat, caps):
        """Return per-step vocabulary logits for token ids ``caps`` given image features ``feat``."""
        # Squash the projected image feature and add the leading layer axis the GRU expects.
        init_hidden = torch.tanh(self.img_fc(feat))
        init_hidden = init_hidden.unsqueeze(0)

        token_embeddings = self.emb(caps)
        hidden_seq, _final_hidden = self.gru(token_embeddings, init_hidden)
        logits = self.fc_out(hidden_seq)
        return logits

# -----------------------------
# Caption Generation Function
# -----------------------------
def generate_caption(model, img_feat, vocab, max_len=20):
    """Greedily decode a caption by stepping the model's GRU one token at a time.

    Args:
        model: module exposing ``img_fc``, ``emb``, ``gru`` and ``fc_out``.
        img_feat: feature tensor of a single image (a batch dim is added here).
        vocab: object with ``stoi`` (token -> index, containing "<start>" and
            "<end>") and ``itos`` (index -> token).
        max_len: maximum number of tokens generated after "<start>".

    Returns:
        The generated words joined by spaces, without "<start>" / "<end>".
        Indices outside ``vocab.itos`` are skipped.
    """
    model.eval()
    end_idx = vocab.stoi["<end>"]
    generated = [vocab.stoi["<start>"]]
    img_feat = img_feat.unsqueeze(0)  # add batch dim

    with torch.no_grad():
        h = torch.tanh(model.img_fc(img_feat)).unsqueeze(0)

        for _ in range(max_len):
            # Feed only the newest token: `h` already summarises every earlier
            # one. Re-feeding the whole prefix together with the carried hidden
            # state would push the earlier tokens through the GRU again each step.
            input_ids = torch.tensor([[generated[-1]]], dtype=torch.long,
                                     device=img_feat.device)  # shape: (1, 1)
            emb = model.emb(input_ids)
            output, h = model.gru(emb, h)
            logits = model.fc_out(output[:, -1, :])  # last token output
            pred = logits.argmax(dim=-1).item()
            # Stop before storing <end> so that a caption which reaches max_len
            # without <end> does not lose its last real word.
            if pred == end_idx:
                break
            generated.append(pred)

    # Decode using vocab.itos (skip <start>)
    caption_words = [vocab.itos[idx] for idx in generated[1:] if idx < len(vocab.itos)]
    return ' '.join(caption_words)

# -----------------------------
# Usage (Assuming trained model & vocab)
# -----------------------------
# model = Captioner(...).to(device)
# img_feat = img_features[0].to(device)
# caption = generate_caption(model, img_feat, vocab)
# print("Generated Caption:", caption)


In [None]:
# Register the unknown-word token once. setdefault keeps the index stable when
# this cell is re-run (plain assignment would move '<UNK>' to a new index each time).
vocab.setdefault('<UNK>', len(vocab))


In [None]:
# Turn a list of token indices back into a readable sentence
def decode_caption(caption, vocab):
    """Decode token indices to text, dropping the first and last entry.

    caption: list of token indices (first/last are expected to be <start>/<end>)
    vocab: object exposing ``itos`` (index -> string)
    Indices at or beyond ``len(vocab.itos)`` are rendered as '<UNK>'.
    """
    words = []
    # Skip <start> and <end> tokens
    for idx in caption[1:-1]:
        if idx < len(vocab.itos):
            words.append(vocab.itos[idx])
        else:
            words.append('<UNK>')
    return ' '.join(words)


In [None]:
def generate_caption(model, img_feat, vocab, max_len=20):
    """Greedily decode a caption for one image feature vector.

    Args:
        model: module called as ``model(feats, caps)``; the logits of the last
            time step of its output select the next token.
        img_feat: feature tensor of a single image (a batch dim is added here).
        vocab: dict mapping token string -> index; must contain '<START>' and '<END>'.
        max_len: maximum number of tokens generated after '<START>'.

    Returns:
        The generated words joined by spaces, without '<START>' / '<END>'.
        Indices that have no entry in ``vocab`` are rendered as '<UNK>'.
    """
    model.eval()
    end_idx = vocab['<END>']
    token_ids = [vocab['<START>']]
    with torch.no_grad():
        for _ in range(max_len):
            cap_tensor = torch.tensor(token_ids, device=img_feat.device).unsqueeze(0)  # (1, T)
            output = model(img_feat.unsqueeze(0), cap_tensor)  # (1, T, V)
            predicted = output[0, -1].argmax().item()  # last token's output
            # Do not store <END>: trimming with [1:-1] afterwards would also cut
            # the last real word whenever the loop ends at max_len without <END>.
            if predicted == end_idx:
                break
            token_ids.append(predicted)

    # Create reverse vocab
    inv_vocab = {idx: tok for tok, idx in vocab.items()}

    return ' '.join(inv_vocab.get(idx, '<UNK>') for idx in token_ids[1:])


In [None]:
# Smoke test: caption the first feature vector with the current model and vocab.
caption = generate_caption(model, img_features[0], vocab)
print("Generated Caption:", caption)


In [None]:
def generate_caption(model, img_feat, vocab, max_len=20):
    """Greedily decode a caption for one image feature vector.

    Args:
        model: module called as ``model(feats, caps)``; the logits of the last
            time step of its output select the next token.
        img_feat: feature tensor of a single image (a batch dim is added here).
        vocab: dict mapping token string -> index; must contain '<START>' and '<END>'.
        max_len: maximum number of tokens generated after '<START>'.

    Returns:
        The generated words joined by spaces, without '<START>' / '<END>'.
        Indices that have no entry in ``vocab`` are rendered as '<UNK>'.
    """
    model.eval()
    end_idx = vocab['<END>']
    token_ids = [vocab['<START>']]
    with torch.no_grad():
        for _ in range(max_len):
            cap_tensor = torch.tensor(token_ids, device=img_feat.device).unsqueeze(0)  # shape (1, T)
            output = model(img_feat.unsqueeze(0), cap_tensor)  # shape (1, T, V)
            predicted = output[0, -1].argmax().item()  # last timestep
            # <END> terminates decoding and is never stored, so no trailing
            # slice is needed (the old [1:-1] lost a word when max_len was hit).
            if predicted == end_idx:
                break
            token_ids.append(predicted)

    # Decode caption (skip <START>)
    inv_vocab = {idx: tok for tok, idx in vocab.items()}
    return ' '.join(inv_vocab.get(idx, '<UNK>') for idx in token_ids[1:])


In [None]:
# Re-run the smoke test with the generate_caption defined in the previous cell.
caption = generate_caption(model, img_features[0], vocab)
print("Generated Caption:", caption)


In [None]:
# Caption every row of img_features and print the results with 1-based numbering.
for i in range(len(img_features)):
    caption = generate_caption(model, img_features[i], vocab)
    print(f"Image {i+1} Caption:", caption)


In [None]:
# Save
# Only the weights (state_dict) are stored; the model class must be re-created
# with the same architecture before loading.
torch.save(model.state_dict(), 'caption_model.pth')

# Load
# NOTE(review): no map_location is given here, unlike the Streamlit snippet in
# this notebook which loads with map_location="cpu" - confirm which is intended.
model.load_state_dict(torch.load('caption_model.pth'))
model.eval()


In [None]:
pip install streamlit


In [None]:
# ✅ COMPLETE WORKING CODE (Training + Saving + Streamlit)

# ─────────────────────────────────────────────────────────────
# PART 1: TRAINING + SAVING VOCAB & MODEL
# ─────────────────────────────────────────────────────────────
import os, json, re, pickle, torch, random
import torch.nn as nn
from PIL import Image
from collections import Counter
from pathlib import Path
from torch.utils.data import Dataset, DataLoader
from torch.nn.utils.rnn import pad_sequence
from torchvision import transforms, models

# Directories
Path("images").mkdir(exist_ok=True)
Path("preproc").mkdir(exist_ok=True)

# Dummy image for training
img_url = "https://upload.wikimedia.org/wikipedia/commons/thumb/3/3a/Cat03.jpg/640px-Cat03.jpg"
img_path = Path("images/demo.jpg")

# Download the demo image only once; later runs reuse the local copy.
if not img_path.exists():
    import requests
    # A timeout stops the cell from hanging forever on a dead connection, and
    # raise_for_status() prevents an HTTP error page from being saved as demo.jpg.
    r = requests.get(img_url, timeout=30)
    r.raise_for_status()
    with open(img_path, 'wb') as f:
        f.write(r.content)

# ─────── 1. Vocabulary ───────
class Vocabulary:
    """Word-level vocabulary with four reserved tokens at indices 0-3.

    ``itos`` maps index -> token, ``stoi`` maps token -> index. Text is
    lower-cased and split into ``\\w+`` runs before lookup.
    """

    def __init__(self):
        self.itos = ["<pad>", "<start>", "<end>", "<unk>"]
        self.stoi = {}
        for position, token in enumerate(self.itos):
            self.stoi[token] = position

    def build(self, captions):
        """Add every unseen word of ``captions`` in order of first appearance."""
        for cap in captions:
            for word in re.findall(r"\w+", cap.lower()):
                if word in self.stoi:
                    continue
                self.stoi[word] = len(self.itos)
                self.itos.append(word)

    def encode(self, text):
        """Return the index of each word in ``text``; unknown words map to "<unk>"."""
        ids = []
        for w in re.findall(r"\w+", text.lower()):
            ids.append(self.stoi.get(w, self.stoi["<unk>"]))
        return ids

    def decode(self, ids):
        """Join the tokens for ``ids`` with spaces, leaving out pad/start/end markers."""
        skipped = {self.stoi["<pad>"], self.stoi["<start>"], self.stoi["<end>"]}
        kept = [self.itos[i] for i in ids if i not in skipped]
        return " ".join(kept)

# ─────── 2. Feature Extractor ───────
def extract_feature(img_path):
    """Return the ResNet-50 feature of the image at ``img_path``.

    The image is loaded as RGB, resized to 224x224, normalised with the
    mean/std constants below and passed through a pretrained ResNet-50 whose
    last child module has been removed; the result is squeezed before returning.
    """
    preprocess = transforms.Compose([
        transforms.Resize((224, 224)),
        transforms.ToTensor(),
        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
    ])
    rgb_image = Image.open(img_path).convert("RGB")
    batch = preprocess(rgb_image).unsqueeze(0)  # add batch dim

    # NOTE(review): the network is rebuilt on every call; fine for the single
    # demo image used here.
    resnet = models.resnet50(weights=models.ResNet50_Weights.DEFAULT)
    layers_without_head = list(resnet.children())[:-1]
    backbone = nn.Sequential(*layers_without_head)
    backbone.eval()

    with torch.no_grad():
        return backbone(batch).squeeze()

# ─────── 3. Captioner Model ───────
class Captioner(nn.Module):
    """GRU caption decoder initialised from an image feature vector.

    ``img_fc`` projects the image feature to the hidden size, ``emb`` embeds
    token ids (index 0 is the padding index), ``gru`` runs over the embedded
    sequence and ``fc_out`` maps each hidden state to vocabulary logits.
    """

    def __init__(self, feat_dim=2048, emb_dim=256, hid_dim=512, vocab_size=10000):
        super(Captioner, self).__init__()
        self.img_fc = nn.Linear(feat_dim, hid_dim)
        self.emb = nn.Embedding(vocab_size, emb_dim, padding_idx=0)
        self.gru = nn.GRU(emb_dim, hid_dim, batch_first=True)
        self.fc_out = nn.Linear(hid_dim, vocab_size)

    def forward(self, feats, caps):
        """Return per-step vocabulary logits for token ids ``caps`` given ``feats``."""
        # tanh-squashed projection of the image, with the layer axis the GRU expects
        init_hidden = torch.tanh(self.img_fc(feats))
        init_hidden = init_hidden.unsqueeze(0)

        token_embeddings = self.emb(caps)
        hidden_seq, _final_hidden = self.gru(token_embeddings, init_hidden)
        return self.fc_out(hidden_seq)

# ─────── 4. Training ───────
# Single hand-written caption for the demo image; the vocabulary is built from it alone.
caption = "A photo of a cat"
vocab = Vocabulary()
vocab.build([caption])
# Wrap the encoded words in <start> ... <end> markers for teacher forcing.
encoded = [vocab.stoi["<start>"]] + vocab.encode(caption) + [vocab.stoi["<end>"]]
feature = extract_feature(img_path)

class DummyDataset(Dataset):
    """Toy dataset that serves the same (feature, caption) pair ten times.

    Reads the module-level ``feature`` and ``encoded`` when instantiated.
    """

    def __init__(self):
        self.feat = feature
        self.cap = torch.tensor(encoded)

    def __len__(self):
        return 10

    def __getitem__(self, idx):
        # idx is ignored: every item is the single demo sample
        return self.feat, self.cap

def collate_fn(batch):
    """Collate (feature, caption) pairs into a stacked feature tensor and a
    zero-padded caption tensor of shape (batch, longest_caption)."""
    feats, caps = zip(*batch)
    stacked_feats = torch.stack(feats)
    padded_caps = pad_sequence(caps, batch_first=True, padding_value=0)
    return stacked_feats, padded_caps

dataloader = DataLoader(DummyDataset(), batch_size=2, shuffle=True, collate_fn=collate_fn)

# Size the embedding/output layers to the vocabulary that was just built.
model = Captioner(vocab_size=len(vocab.itos))
optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)
# Index 0 is <pad>; padded positions are excluded from the loss.
criterion = nn.CrossEntropyLoss(ignore_index=0)

model.train()
for epoch in range(2):
    total_loss = 0
    for feats, caps in dataloader:
        optimizer.zero_grad()
        # Teacher forcing: feed tokens 0..T-2 and score against tokens 1..T-1.
        outputs = model(feats, caps[:, :-1])
        loss = criterion(outputs.reshape(-1, outputs.size(2)), caps[:, 1:].reshape(-1))
        loss.backward()
        optimizer.step()
        total_loss += loss.item()
    # Report the mean loss per batch for this epoch.
    print(f"Epoch {epoch+1}, Loss: {total_loss/len(dataloader):.4f}")

# ─────── 5. Save ───────
torch.save(model.state_dict(), "caption_model.pth")
# The vocabulary is stored as a plain dict with "itos" and "stoi" keys.
with open("vocab.pkl", "wb") as f:
    pickle.dump({"itos": vocab.itos, "stoi": vocab.stoi}, f)


# ─────────────────────────────────────────────────────────────
# PART 2: STREAMLIT APP
# ─────────────────────────────────────────────────────────────
# Save this part separately as app.py and run using: streamlit run app.py

# import streamlit as st
# from PIL import Image
# import torch
# import torch.nn as nn
# from torchvision import models, transforms
# import pickle, os
# 
# class Vocabulary:
#     def __init__(self):
#         self.itos = []
#         self.stoi = {}
#     def load(self, path):
#         with open(path, "rb") as f:
#             data = pickle.load(f)
#             self.itos = data["itos"]
#             self.stoi = data["stoi"]
# 
# class Captioner(nn.Module):
#     def __init__(self, feat_dim=2048, emb_dim=256, hid_dim=512, vocab_size=10000):
#         super().__init__()
#         self.img_fc = nn.Linear(feat_dim, hid_dim)
#         self.emb = nn.Embedding(vocab_size, emb_dim, padding_idx=0)
#         self.gru = nn.GRU(emb_dim, hid_dim, batch_first=True)
#         self.fc_out = nn.Linear(hid_dim, vocab_size)
#     def forward(self, feats, caps):
#         h0 = torch.tanh(self.img_fc(feats)).unsqueeze(0)
#         x = self.emb(caps)
#         out, _ = self.gru(x, h0)
#         return self.fc_out(out)
# 
# def extract_features(img_tensor):
#     resnet = models.resnet50(weights=models.ResNet50_Weights.DEFAULT)
#     resnet = nn.Sequential(*list(resnet.children())[:-1])
#     resnet.eval()
#     with torch.no_grad():
#         return resnet(img_tensor).squeeze()
# 
# def generate_caption(model, img_feat, vocab, max_len=20):
#     model.eval()
#     generated = [vocab.stoi["<start>"]]
#     with torch.no_grad():
#         h = torch.tanh(model.img_fc(img_feat.unsqueeze(0))).unsqueeze(0)
#         for _ in range(max_len):
#             input_ids = torch.tensor([generated], dtype=torch.long)
#             output, h = model.gru(model.emb(input_ids), h)
#             logits = model.fc_out(output[:, -1, :])
#             next_word = logits.argmax(dim=-1).item()
#             generated.append(next_word)
#             if next_word == vocab.stoi["<end>"]:
#                 break
#     return " ".join([vocab.itos[i] for i in generated[1:-1]])
# 
# st.title("🖼️ Image Caption Generator")
# if not os.path.exists("vocab.pkl") or not os.path.exists("caption_model.pth"):
#     st.error("❌ Model or vocab file missing. Please train first.")
#     st.stop()
# 
# vocab = Vocabulary()
# vocab.load("vocab.pkl")
# model = Captioner(vocab_size=len(vocab.itos))
# model.load_state_dict(torch.load("caption_model.pth", map_location="cpu"))
# model.eval()
# 
# transform = transforms.Compose([
#     transforms.Resize((224, 224)),
#     transforms.ToTensor(),
#     transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
# ])
# 
# uploaded_file = st.file_uploader("Upload an image", type=["jpg", "jpeg", "png"])
# if uploaded_file is not None:
#     image = Image.open(uploaded_file).convert("RGB")
#     st.image(image, caption="Uploaded Image", use_column_width=True)
#     img_tensor = transform(image).unsqueeze(0)
#     img_feat = extract_features(img_tensor)
#     caption = generate_caption(model, img_feat, vocab)
#     st.markdown(f"**Generated Caption:** {caption}")


In [None]:
import torch
import torch.nn as nn

class Captioner(nn.Module):
    """LSTM caption decoder that prepends the image feature to the token embeddings.

    The image feature is concatenated with the embedded tokens along the time
    axis, so it has to share the embedding width and the output has one more
    time step than ``captions``.
    """

    def __init__(self, embed_size, hidden_size, vocab_size):
        super().__init__()
        self.embed = nn.Embedding(vocab_size, embed_size)
        self.lstm = nn.LSTM(embed_size, hidden_size, batch_first=True)
        self.fc = nn.Linear(hidden_size, vocab_size)

    def forward(self, img_feat, captions):
        """Return vocabulary logits of shape (B, T+1, V)."""
        token_embeddings = self.embed(captions)      # (B, T, E)
        image_step = img_feat.unsqueeze(1)           # (B, 1, E)
        sequence = torch.cat((image_step, token_embeddings), 1)  # (B, T+1, E)
        hidden_states, _state = self.lstm(sequence)
        return self.fc(hidden_states)


In [None]:
import torch
import torchvision.transforms as transforms
from torchvision.models import resnet18
import pickle

def extract_features(img_tensor):
    """Return ResNet-18 features for a preprocessed image batch.

    Args:
        img_tensor: image tensor accepted by torchvision's ResNet-18
            (presumably shape (1, 3, H, W) - TODO confirm against the caller).

    Returns:
        The network output with the classification layer replaced by identity,
        with a leading batch dimension of size 1 squeezed away.
    """
    # Local import keeps this cell self-contained; `pretrained=True` is
    # deprecated in torchvision in favour of the explicit weights enum.
    from torchvision.models import ResNet18_Weights

    model = resnet18(weights=ResNet18_Weights.DEFAULT)
    model.fc = torch.nn.Identity()  # expose the pooled features instead of class logits
    model.eval()
    with torch.no_grad():
        features = model(img_tensor)
    return features.squeeze(0)

def load_vocab(path='vocab.pkl'):
    """Unpickle and return the vocabulary object stored at ``path``.

    NOTE(review): pickle can execute arbitrary code while loading - only use
    this on files produced by this notebook.
    """
    with open(path, 'rb') as handle:
        return pickle.load(handle)

def generate_caption(model, img_feat, vocab, max_len=20):
    """Greedily decode a caption for one image feature vector.

    Args:
        model: module called as ``model(feats, caps)``; the logits of the last
            time step of its output select the next token.
        img_feat: feature tensor of a single image (a batch dim is added here).
        vocab: dict mapping token string -> index; must contain '<START>' and '<END>'.
        max_len: maximum number of tokens generated after '<START>'.

    Returns:
        The generated words joined by spaces, without '<START>' / '<END>'.
        Indices that have no entry in ``vocab`` are rendered as '<UNK>'.
    """
    model.eval()
    end_idx = vocab['<END>']
    token_ids = [vocab['<START>']]
    with torch.no_grad():
        for _ in range(max_len):
            cap_tensor = torch.tensor(token_ids, device=img_feat.device).unsqueeze(0)
            output = model(img_feat.unsqueeze(0), cap_tensor)
            predicted = output[0, -1].argmax().item()
            # Never store <END>; the previous [1:-1] slice dropped the last real
            # word whenever decoding stopped at max_len without <END>.
            if predicted == end_idx:
                break
            token_ids.append(predicted)
    inv_vocab = {idx: tok for tok, idx in vocab.items()}
    return ' '.join(inv_vocab.get(idx, '<UNK>') for idx in token_ids[1:])


In [None]:
import torch
import torchvision.transforms as transforms
from torchvision.models import resnet18
import pickle

def extract_features(img_tensor):
    """Return ResNet-18 features for a preprocessed image batch.

    Args:
        img_tensor: image tensor accepted by torchvision's ResNet-18
            (presumably shape (1, 3, H, W) - TODO confirm against the caller).

    Returns:
        The network output with the classification layer replaced by identity,
        with a leading batch dimension of size 1 squeezed away.
    """
    # Local import keeps this cell self-contained; `pretrained=True` is
    # deprecated in torchvision in favour of the explicit weights enum.
    from torchvision.models import ResNet18_Weights

    model = resnet18(weights=ResNet18_Weights.DEFAULT)
    model.fc = torch.nn.Identity()  # expose the pooled features instead of class logits
    model.eval()
    with torch.no_grad():
        features = model(img_tensor)
    return features.squeeze(0)

def load_vocab(path='vocab.pkl'):
    """Unpickle and return the vocabulary object stored at ``path``.

    NOTE(review): pickle can execute arbitrary code while loading - only use
    this on files produced by this notebook.
    """
    with open(path, 'rb') as handle:
        loaded = pickle.load(handle)
    return loaded

def generate_caption(model, img_feat, vocab, max_len=20):
    """Greedily decode a caption for one image feature vector.

    Args:
        model: module called as ``model(feats, caps)``; the logits of the last
            time step of its output select the next token.
        img_feat: feature tensor of a single image (a batch dim is added here).
        vocab: dict mapping token string -> index; must contain '<START>' and '<END>'.
        max_len: maximum number of tokens generated after '<START>'.

    Returns:
        The generated words joined by spaces, without '<START>' / '<END>'.
        Indices that have no entry in ``vocab`` are rendered as '<UNK>'.
    """
    model.eval()
    end_idx = vocab['<END>']
    token_ids = [vocab['<START>']]
    with torch.no_grad():
        for _ in range(max_len):
            cap_tensor = torch.tensor(token_ids, device=img_feat.device).unsqueeze(0)
            output = model(img_feat.unsqueeze(0), cap_tensor)
            predicted = output[0, -1].argmax().item()
            # Never store <END>; the previous [1:-1] slice dropped the last real
            # word whenever decoding stopped at max_len without <END>.
            if predicted == end_idx:
                break
            token_ids.append(predicted)
    inv_vocab = {idx: tok for tok, idx in vocab.items()}
    return ' '.join(inv_vocab.get(idx, '<UNK>') for idx in token_ids[1:])


In [None]:
import torch
import torch.nn as nn

class Captioner(nn.Module):
    """LSTM caption decoder that prepends the image feature to the token embeddings.

    The image feature is concatenated with the embedded tokens along the time
    axis, so it has to share the embedding width and the output has one more
    time step than ``captions``.
    """

    def __init__(self, embed_size, hidden_size, vocab_size):
        super().__init__()
        self.embed = nn.Embedding(vocab_size, embed_size)
        self.lstm = nn.LSTM(embed_size, hidden_size, batch_first=True)
        self.fc = nn.Linear(hidden_size, vocab_size)

    def forward(self, img_feat, captions):
        """Return vocabulary logits of shape (B, T+1, V)."""
        token_embeddings = self.embed(captions)      # (B, T, E)
        image_step = img_feat.unsqueeze(1)           # (B, 1, E)
        sequence = torch.cat((image_step, token_embeddings), 1)  # (B, T+1, E)
        hidden_states, _state = self.lstm(sequence)
        logits = self.fc(hidden_states)
        return logits


In [None]:
# Create a small sample captions file: "<filename> <caption>" separated by the first space.
# NOTE(review): the notebook's introduction documents the format as
# `filename.jpg|caption`, and a later cell skips every line without "|" - confirm
# which separator is intended before relying on this file.
with open("captions.txt", "w") as f:
    f.write("image1.jpg A man riding a bike.\n")
    f.write("image2.jpg A cat sleeping on the bed.\n")
    f.write("image3.jpg A group of people playing football.\n")


In [None]:
# Read the captions file back and print each (image, caption) pair.
with open("captions.txt", "r") as f:
    lines = f.readlines()

for line in lines:
    # Split on the first space only, so the caption keeps its own spaces.
    # A line without any space (e.g. a blank line) fails to unpack here.
    image_name, caption = line.strip().split(' ', 1)
    print(f"Image: {image_name}, Caption: {caption}")


In [None]:
from collections import Counter

# Read captions
with open("captions.txt", "r") as f:
    lines = f.readlines()

# Keep only the caption part (everything after the first space) of each line.
captions = [line.strip().split(' ', 1)[1] for line in lines]

# Tokenize and count words
# Tokenisation is a plain whitespace split of the lower-cased caption, so
# punctuation stays attached to the neighbouring word.
all_words = []
for caption in captions:
    all_words.extend(caption.lower().split())

# Count word frequency
word_counts = Counter(all_words)

# Optionally, create word2idx and idx2word
# Indices start at 1 in order of first appearance; no special tokens are added here.
vocab = {word: idx for idx, (word, _) in enumerate(word_counts.items(), start=1)}

print("Vocabulary:", vocab)


In [None]:
from PIL import Image
import os

# Check that every image named in captions.txt can be opened.
# `lines` comes from the cell that read captions.txt.
for line in lines:
    image_name, caption = line.strip().split(' ', 1)
    image_path = os.path.join("images", image_name)  # replace "images" with your folder path

    if os.path.exists(image_path):
        img = Image.open(image_path)
        img.show()  # just to verify it loads
    else:
        print(f"Image {image_name} not found.")


In [None]:
# NOTE(review): relies on `image_name` left over from the last iteration of a
# previous loop cell; it is undefined on a fresh kernel.
image_path = os.path.join("images", image_name)


In [None]:
from PIL import Image
import os

# Read captions from file
with open("captions.txt", "r") as f:
    lines = f.readlines()

# Show every image listed in the `filename|caption` file, reporting missing ones.
for line in lines:
    if "|" not in line:
        continue  # skip malformed lines
    # Split on the first "|" only, so captions may contain "|" themselves.
    image_name, caption = line.strip().split('|', 1)
    image_path = os.path.join("images", image_name)

    if os.path.exists(image_path):
        print(f"\n✅ Showing: {image_name} - {caption}")
        img = Image.open(image_path)
        img.show()
    else:
        print(f"\n❌ Image {image_name} not found.")


In [None]:
from PIL import Image
from IPython.display import display
import os

# Define path to your image
img_path = "images/demo.jpg"  # ✅ Make sure this image file exists at this path

# Check if image exists
# Fail early with a clear message instead of a PIL error further down.
if not os.path.exists(img_path):
    raise FileNotFoundError(f"❌ Image not found at {img_path}. Please check the path.")

# Open and display the image
img = Image.open(img_path).convert("RGB")
display(img)


In [None]:
!pip install torch torchvision  


In [None]:
import torch
from torchvision import transforms
from torchvision.models.segmentation import deeplabv3_resnet101


In [None]:
def segment_image(img):
    """Run DeepLabV3-ResNet101 on a PIL image and return the class-index mask.

    Args:
        img: RGB PIL image.

    Returns:
        A 256x256 single-channel PIL image whose pixel values are the predicted
        class indices (argmax over the model's 'out' logits). The values are
        small integers, so the mask looks almost black when displayed directly.
    """
    # Local import keeps the cell self-contained; the weights enum replaces the
    # deprecated `pretrained=True` flag.
    from torchvision.models.segmentation import DeepLabV3_ResNet101_Weights

    # Load pretrained segmentation model
    # NOTE(review): the model is rebuilt on every call, which is slow in batch loops.
    model = deeplabv3_resnet101(weights=DeepLabV3_ResNet101_Weights.DEFAULT)
    model.eval()

    # Preprocessing
    # The pretrained weights expect ImageNet-normalised input; without the
    # Normalize step the predictions are degraded.
    preprocess = transforms.Compose([
        transforms.Resize((256, 256)),
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
    ])
    input_tensor = preprocess(img).unsqueeze(0)  # add batch dimension

    with torch.no_grad():
        output = model(input_tensor)['out'][0]
        output_predictions = output.argmax(0)

    # Convert to PIL Image for display
    segmented_img = Image.fromarray(output_predictions.byte().cpu().numpy())
    return segmented_img


In [None]:
# Imports
from PIL import Image
from IPython.display import display
import os
import torch
from torchvision import transforms
from torchvision.models.segmentation import deeplabv3_resnet101

# Step 1: Load image
img_path = "images/demo.jpg"

# Check image exists
# Fail early with a clear message instead of a PIL error further down.
if not os.path.exists(img_path):
    raise FileNotFoundError(f"❌ Image not found at {img_path}. Please check the path.")

# Open image
img = Image.open(img_path).convert("RGB")
print("🖼️ Original Image:")
display(img)

# Step 2: Define segmentation function
def segment_image(img):
    """Run DeepLabV3-ResNet101 on a PIL image and return the class-index mask.

    Args:
        img: RGB PIL image.

    Returns:
        A 256x256 single-channel PIL image whose pixel values are the predicted
        class indices (argmax over the model's 'out' logits). The values are
        small integers, so the mask looks almost black when displayed directly.
    """
    # Local import keeps the cell self-contained; the weights enum replaces the
    # deprecated `pretrained=True` flag.
    from torchvision.models.segmentation import DeepLabV3_ResNet101_Weights

    # Load model
    # NOTE(review): the model is rebuilt on every call, which is slow in batch loops.
    model = deeplabv3_resnet101(weights=DeepLabV3_ResNet101_Weights.DEFAULT)
    model.eval()

    # Preprocess
    # The pretrained weights expect ImageNet-normalised input; without the
    # Normalize step the predictions are degraded.
    preprocess = transforms.Compose([
        transforms.Resize((256, 256)),
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
    ])
    input_tensor = preprocess(img).unsqueeze(0)  # Add batch dimension

    # Predict segmentation
    with torch.no_grad():
        output = model(input_tensor)['out'][0]
        output_predictions = output.argmax(0)

    # Convert to PIL Image
    segmented_img = Image.fromarray(output_predictions.byte().cpu().numpy())
    return segmented_img

# Step 3: Segment and display
# `segmented_result` is reused by the plotting cells further down.
segmented_result = segment_image(img)
print("🧩 Segmented Image Output:")
display(segmented_result)


In [None]:
# 📦 Imports
import os
import torch
import torchvision.transforms as transforms
from torchvision.models.segmentation import deeplabv3_resnet101
from PIL import Image
from IPython.display import display

# 📂 Step 1: Load image
img_path = "images/demo.jpg"

# Check if file exists
# Fail early with a clear message instead of a PIL error further down.
if not os.path.exists(img_path):
    raise FileNotFoundError(f"❌ Image not found at {img_path}")

# Open the image
img = Image.open(img_path).convert("RGB")
print("🖼️ Original Image:")
display(img)

# 🧩 Step 2: Define segmentation function
def segment_image(img):
    """Run DeepLabV3-ResNet101 on a PIL image and return the class-index mask.

    Args:
        img: RGB PIL image.

    Returns:
        A 256x256 single-channel PIL image whose pixel values are the predicted
        class indices (argmax over the model's 'out' logits). The values are
        small integers, so the mask looks almost black when displayed directly.
    """
    # Local import keeps the cell self-contained; the weights enum replaces the
    # deprecated `pretrained=True` flag.
    from torchvision.models.segmentation import DeepLabV3_ResNet101_Weights

    # NOTE(review): the model is rebuilt on every call, which is slow in batch loops.
    model = deeplabv3_resnet101(weights=DeepLabV3_ResNet101_Weights.DEFAULT)
    model.eval()

    # The pretrained weights expect ImageNet-normalised input; without the
    # Normalize step the predictions are degraded.
    preprocess = transforms.Compose([
        transforms.Resize((256, 256)),
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
    ])
    input_tensor = preprocess(img).unsqueeze(0)  # add batch dimension

    with torch.no_grad():
        output = model(input_tensor)['out'][0]
        output_predictions = output.argmax(0)

    segmented_img = Image.fromarray(output_predictions.byte().cpu().numpy())
    return segmented_img

# 🧩 Step 3: Segment and display
# `segmented_result` is reused by the plotting cells further down.
segmented_result = segment_image(img)
print("🧩 Segmented Image Output:")
display(segmented_result)

# 📝 Step 4: Define dummy caption generation function
# NOTE(review): this rebinds `generate_caption` with a one-argument signature,
# replacing the (model, img_feat, vocab) versions defined in earlier cells.
def generate_caption(image_pil):
    # Dummy logic – replace with real captioning model later
    # The argument is ignored; the same fixed string is always returned.
    return "A cat sitting on a blanket."  # This is just a placeholder caption

# 📝 Step 5: Generate and display caption
caption = generate_caption(img)
print(f"📝 Caption: {caption}")


In [None]:
!pip install transformers --quiet


In [None]:
from transformers import BlipProcessor, BlipForConditionalGeneration
import torch

# Load processor and model only once
# NOTE(review): this rebinds the global name `model`, which earlier cells use for
# the segmentation network and the custom Captioner. A later cell switches to
# `caption_model` / `caption_processor` to avoid exactly this clash.
processor = BlipProcessor.from_pretrained("Salesforce/blip-image-captioning-base")
model = BlipForConditionalGeneration.from_pretrained("Salesforce/blip-image-captioning-base")

# Define caption generation function
def generate_caption(image_pil):
    # Uses the module-level `processor` and `model` loaded above.
    inputs = processor(image_pil, return_tensors="pt")
    with torch.no_grad():
        out = model.generate(**inputs)
    # Decode the first generated sequence, dropping special tokens.
    caption = processor.decode(out[0], skip_special_tokens=True)
    return caption


In [None]:
# Generate and show caption
# `img` is the PIL image loaded in an earlier cell.
caption = generate_caption(img)
print("📝 Generated Caption:", caption)


In [None]:
import matplotlib.pyplot as plt

# Resize segmented image to match original size (if needed)
# The mask holds class indices, so it must be resized with nearest-neighbour
# sampling; an interpolating filter would blend neighbouring labels into
# values that correspond to no real class.
segmented_result_resized = segmented_result.resize(img.size, resample=Image.NEAREST)

# Plot both images side by side with caption
plt.figure(figsize=(12, 6))

# Original Image
plt.subplot(1, 2, 1)
plt.imshow(img)
plt.title("Original Image")
plt.axis("off")

# Segmented Image
plt.subplot(1, 2, 2)
plt.imshow(segmented_result_resized)
plt.title("Segmented Image")
plt.axis("off")

# Show everything
plt.suptitle(f"📝 Caption: {caption}", fontsize=16)
plt.tight_layout()
plt.show()


In [None]:
# Create the target folder first: on a fresh top-to-bottom run this cell executes
# before the cell that creates "output", and save() would fail.
os.makedirs("output", exist_ok=True)
segmented_result_resized.save("output/segmented_image.png")


In [None]:
# Create the target folder first: on a fresh top-to-bottom run this cell executes
# before the cell that creates "output", and open() would fail.
os.makedirs("output", exist_ok=True)
# Explicit UTF-8 keeps the file identical across platforms.
with open("output/caption.txt", "w", encoding="utf-8") as f:
    f.write(caption)


In [None]:
# Create the folder that receives segmented images and caption files (no error if it exists).
os.makedirs("output", exist_ok=True)


In [None]:
# Caption and segment every .jpg/.png in images/ and store the results in output/.
image_folder = "images/"
for filename in os.listdir(image_folder):
    # The extension check is case-sensitive (e.g. ".JPG" files are skipped).
    if filename.endswith(".jpg") or filename.endswith(".png"):
        path = os.path.join(image_folder, filename)
        img = Image.open(path).convert("RGB")
        caption = generate_caption(img)
        segmented = segment_image(img)
        segmented_resized = segmented.resize(img.size)

        # Save results
        # Output names keep the original extension, e.g. "cat.jpg_segmented.png".
        # NOTE(review): later cells look for "<stem>_segmented.jpg" instead.
        segmented_resized.save(f"output/{filename}_segmented.png")
        with open(f"output/{filename}_caption.txt", "w") as f:
            f.write(caption)


In [None]:
pip install streamlit


In [None]:
import streamlit as st
from PIL import Image

# NOTE(review): Streamlit apps are started with `streamlit run <file>.py`; this
# code is meant to live in its own script, which would then also have to define
# or import generate_caption and segment_image.
st.title("Image Segmentation + Captioning")

uploaded_file = st.file_uploader("Upload an image", type=["jpg", "png"])

if uploaded_file:
    img = Image.open(uploaded_file).convert("RGB")
    st.image(img, caption="Original Image")

    caption = generate_caption(img)
    # Bring the mask back to the size of the uploaded image for display.
    segmented = segment_image(img).resize(img.size)

    st.image(segmented, caption="Segmented Image")
    st.write(f"📝 Caption: {caption}")


In [None]:
streamlit run app.py


In [None]:
from nltk.translate.bleu_score import sentence_bleu, SmoothingFunction

# Hand-written reference caption; the candidate is the most recently generated `caption`.
reference = [["a", "cat", "is", "on", "the", "bed"]]  # Must be list of list of tokens
candidate = caption.lower().split()

# Use smoothing to avoid 0 for low-overlap
smoothie = SmoothingFunction().method4

# weights=(0.5, 0.5) scores unigram and bigram overlap equally (BLEU-2).
score = sentence_bleu(reference, candidate, weights=(0.5, 0.5), smoothing_function=smoothie)
print("✅ BLEU Score (with smoothing):", score)


In [None]:
import matplotlib.pyplot as plt

# Resize segmented image to match original
# The mask holds class indices, so it must be resized with nearest-neighbour
# sampling; an interpolating filter would blend neighbouring labels into
# values that correspond to no real class.
segmented_result_resized = segmented_result.resize(img.size, resample=Image.NEAREST)

# Show everything together
plt.figure(figsize=(12, 6))

plt.subplot(1, 2, 1)
plt.imshow(img)
plt.title("Original Image")
plt.axis("off")

plt.subplot(1, 2, 2)
plt.imshow(segmented_result_resized)
plt.title("Segmented Image")
plt.axis("off")

plt.suptitle(f"📝 Generated Caption: {caption}\n📊 BLEU Score: {round(score, 4)}", fontsize=14)
plt.tight_layout()
plt.show()


In [None]:
# Change the path
# NOTE(review): this only rebinds `img_path`; `img` is not reloaded here, so the
# image-loading cell has to be re-run for the change to take effect.
img_path = "images/another_sample.jpg"


In [None]:
# ✅ Save caption with BLEU score to a text file (with UTF-8 encoding)
# UTF-8 is required because the lines written below contain emoji.
# The "output" folder must already exist.
with open("output/generated_caption.txt", "w", encoding="utf-8") as f:
    f.write(f"📝 Caption: {caption}\n")
    f.write(f"📊 BLEU Score: {score}")


In [None]:
from torchvision.models.segmentation import deeplabv3_resnet101, DeepLabV3_ResNet101_Weights

# Load with recommended weights
# NOTE(review): this rebinds the global `model`. The segment_image functions
# defined in earlier cells build their own network and do not use this object.
weights = DeepLabV3_ResNet101_Weights.DEFAULT
model = deeplabv3_resnet101(weights=weights)
model.eval()


In [None]:
from transformers import BlipProcessor, BlipForConditionalGeneration
import torch


In [None]:
# Load captioning model (do NOT confuse with segmentation)
# Dedicated names keep the BLIP objects from clashing with the global `model`.
caption_processor = BlipProcessor.from_pretrained("Salesforce/blip-image-captioning-base")
caption_model = BlipForConditionalGeneration.from_pretrained("Salesforce/blip-image-captioning-base").eval()


In [None]:
def generate_caption(image_pil):
    """Caption a PIL image with the module-level BLIP processor and model.

    Uses ``caption_processor`` to build the model inputs and to decode the
    first sequence returned by ``caption_model.generate`` (special tokens removed).
    """
    model_inputs = caption_processor(image_pil, return_tensors="pt")
    with torch.no_grad():
        generated_ids = caption_model.generate(**model_inputs)
    return caption_processor.decode(generated_ids[0], skip_special_tokens=True)


In [None]:
import os

image_folder = "images/"
output_folder = "output/"
# Make sure the destination exists so the save()/open() calls below cannot fail on a fresh run.
os.makedirs(output_folder, exist_ok=True)
image_files = [f for f in os.listdir(image_folder) if f.endswith(".jpg") or f.endswith(".png")]

# BLEU settings do not depend on the image, so they are set up once.
# sentence_bleu / SmoothingFunction come from the nltk import in the BLEU cell.
reference = [["a", "cat", "is", "on", "the", "bed"]]  # Update or automate this for batch
smoothie = SmoothingFunction().method4

for image_file in image_files:
    print(f"\n🔄 Processing: {image_file}")
    img_path = os.path.join(image_folder, image_file)
    img = Image.open(img_path).convert("RGB")

    # Segment
    segmented_result = segment_image(img)
    # Nearest-neighbour keeps the mask values intact; an interpolating filter
    # would blend neighbouring class indices into meaningless values.
    segmented_result_resized = segmented_result.resize(img.size, resample=Image.NEAREST)

    # Caption
    caption = generate_caption(img)

    # Save segmented image
    # NOTE(review): JPEG is lossy and will slightly alter mask values; the name is
    # kept because later cells read "<stem>_segmented.jpg".
    seg_filename = os.path.join(output_folder, f"{os.path.splitext(image_file)[0]}_segmented.jpg")
    segmented_result_resized.save(seg_filename)

    # Save caption and BLEU score
    candidate = caption.lower().split()
    score = sentence_bleu(reference, candidate, weights=(0.5, 0.5), smoothing_function=smoothie)

    text_filename = os.path.join(output_folder, f"{os.path.splitext(image_file)[0]}_caption.txt")
    with open(text_filename, "w", encoding="utf-8") as f:
        f.write(f"📝 Caption: {caption}\n")
        f.write(f"📊 BLEU Score: {score}\n")


In [None]:
import os
# Input/output locations and the list of images to process (.jpg/.png, case-sensitive match).
image_folder = "images/"
output_folder = "output/"
image_files = [f for f in os.listdir(image_folder) if f.endswith(".jpg") or f.endswith(".png")]


In [None]:
from PIL import Image
from transformers import BlipProcessor, BlipForConditionalGeneration
import torch


In [None]:
# Load the BLIP captioning processor and model; the model is put into eval mode.
caption_processor = BlipProcessor.from_pretrained("Salesforce/blip-image-captioning-base")
caption_model = BlipForConditionalGeneration.from_pretrained("Salesforce/blip-image-captioning-base").eval()


In [None]:
def generate_caption(image_pil):
    """Caption a PIL image with the module-level BLIP processor and model.

    Uses ``caption_processor`` to build the model inputs and to decode the
    first sequence returned by ``caption_model.generate`` (special tokens removed).
    """
    model_inputs = caption_processor(image_pil, return_tensors="pt")
    with torch.no_grad():
        generated_ids = caption_model.generate(**model_inputs)
    text = caption_processor.decode(generated_ids[0], skip_special_tokens=True)
    return text


In [None]:
def segment_image(img):
    """Placeholder segmentation: return the image converted to mode "L".

    NOTE(review): this rebinds `segment_image`, replacing the DeepLabV3-based
    version defined in earlier cells.
    """
    # Dummy segmentation: convert to grayscale (for testing only)
    grayscale = img.convert("L")
    return grayscale


In [None]:
# Segment and caption every image in image_files, saving both results to output_folder.
# The output folder must already exist.
for image_file in image_files:
    print(f"\n🔄 Processing: {image_file}")
    img_path = os.path.join(image_folder, image_file)
    img = Image.open(img_path).convert("RGB")
    
    # Segment
    segmented_result = segment_image(img)
    segmented_result_resized = segmented_result.resize(img.size)
    
    # Caption
    caption = generate_caption(img)

    # Save segmented image
    # File name: "<stem>_segmented.jpg", the name the later visualisation cells read.
    seg_filename = os.path.join(output_folder, f'{os.path.splitext(image_file)[0]}_segmented.jpg')
    segmented_result_resized.save(seg_filename)

    # Save caption
    caption_filename = os.path.join(output_folder, f'{os.path.splitext(image_file)[0]}_caption.txt')
    with open(caption_filename, "w") as f:
        f.write(caption)
    
    print(f"✅ Saved segmented image: {seg_filename}")
    print(f"📝 Caption: {caption}")


In [None]:
from IPython.display import display

# Show each image inline together with its caption and its segmentation output.
for image_file in image_files:
    img_path = os.path.join(image_folder, image_file)
    img = Image.open(img_path).convert("RGB")
    
    print(f"\n🖼️ Original Image: {image_file}")
    display(img)

    caption = generate_caption(img)
    print(f"📝 Caption: {caption}")

    segmented = segment_image(img)
    display(segmented)


In [None]:
# Overwrite captions.txt with sample data in the `filename|caption` format.
with open("captions.txt", "w") as f:
    f.write("image1.jpg|a cat is sitting on the bed\n")
    f.write("image2.jpg|a man is playing football\n")


In [None]:
from IPython.display import display
import matplotlib.pyplot as plt

# For every image, plot the original next to the segmented file saved earlier
# and print a freshly generated caption.
for image_file in image_files:
    img_path = os.path.join(image_folder, image_file)
    # Expects "<stem>_segmented.jpg" to exist in output_folder already.
    seg_path = os.path.join(output_folder, f"{os.path.splitext(image_file)[0]}_segmented.jpg")

    img = Image.open(img_path)
    seg_img = Image.open(seg_path)

    fig, axes = plt.subplots(1, 2, figsize=(10, 5))
    axes[0].imshow(img)
    axes[0].set_title(f"Original: {image_file}")
    axes[0].axis('off')

    axes[1].imshow(seg_img)
    axes[1].set_title("Segmented Output")
    axes[1].axis('off')

    plt.show()

    # Show the caption
    print("📝 Caption:", generate_caption(img))
    print("-" * 80)


In [None]:
import torchvision.transforms as T
from torchvision.models.detection import maskrcnn_resnet50_fpn

# `pretrained=True` is deprecated in torchvision; weights="DEFAULT" selects the
# current default pretrained weights through the supported API.
# NOTE(review): this rebinds the global `model` used by other cells.
model = maskrcnn_resnet50_fpn(weights="DEFAULT")
model.eval()


In [None]:
import torch
from PIL import Image
import numpy as np

transform = T.Compose([T.ToTensor()])

def segment_and_save(image_path, output_path, threshold=0.5):
    """Segment an image with the global Mask R-CNN `model` and save a colour-mapped mask.

    Args:
        image_path: path of the image to segment.
        output_path: where the resulting RGB mask image is written.
        threshold: minimum detection score for an instance mask to be included
            (default 0.5, the value that was previously hard-coded).

    The masks of all detections above ``threshold`` are merged by element-wise
    maximum, binarised at 0.5 and rendered with the viridis colormap. If nothing
    is detected above the threshold, an all-background mask is written.
    """
    from matplotlib import cm

    img = Image.open(image_path).convert("RGB")
    img_tensor = transform(img).unsqueeze(0)  # add batch dim

    with torch.no_grad():
        prediction = model(img_tensor)[0]

    # Combine masks with confidence > threshold
    masks = prediction['masks']
    keep = prediction['scores'] > threshold
    if keep.any():
        # Select channel 0 of every kept detection and merge by element-wise maximum.
        combined_mask = masks[keep, 0].max(dim=0).values
    else:
        # No detection at all would make masks[0] raise IndexError; fall back
        # to an empty mask with the spatial size of the input image.
        combined_mask = torch.zeros(img_tensor.shape[-2:])

    # .cpu() keeps this working when the model and its outputs live on another device
    combined_mask = combined_mask.cpu().numpy()
    combined_mask = (combined_mask > 0.5).astype(np.uint8) * 255  # binary mask

    # Convert mask to an RGB image with colormap
    colormap = cm.viridis(combined_mask / 255.0)
    seg_image = Image.fromarray((colormap[:, :, :3] * 255).astype(np.uint8))
    seg_image.save(output_path)


In [None]:
import os

image_folder = "images/"
output_folder = "output/"

# Keep only the directory entries whose names end in .jpg or .png
image_files = [name for name in os.listdir(image_folder) if name.endswith((".jpg", ".png"))]


In [None]:
# Placeholder loop: the body is a bare Ellipsis, so nothing is done per file.
# Its only effect is to iterate `image_files` (defined in another cell).
for image_file in image_files:
    ...


In [None]:
# Create the folder for the combined images; exist_ok=True makes re-running this cell harmless.
os.makedirs("final_output", exist_ok=True)


In [None]:
import os
from PIL import Image, ImageDraw

# Make sure the output folder exists before the loop below saves into it
# (exist_ok=True: no error if it is already there).
os.makedirs("final_output", exist_ok=True)

# Stand-in caption generator: ignores its input and returns fixed text.
def generate_caption(image):
    """Return a constant placeholder caption; `image` is accepted but not used."""
    placeholder = "This is a placeholder caption"
    return placeholder

# Assuming image_files, image_folder, and output_folder are already defined.
# For every image: paste the original and its segmentation side by side, write the
# caption near the bottom-left corner, and save the result into final_output/.
for image_file in image_files:
    stem = os.path.splitext(image_file)[0]
    img_path = os.path.join(image_folder, image_file)
    seg_path = os.path.join(output_folder, f"{stem}_segmented.jpg")

    # Context managers close both source files once the combined canvas is built,
    # so the loop does not leave one open handle per processed image.
    with Image.open(img_path) as img, Image.open(seg_path) as seg_img:
        caption = generate_caption(img)

        # Canvas wide enough for both images and as tall as the taller one.
        combined = Image.new("RGB", (img.width + seg_img.width, max(img.height, seg_img.height)))
        combined.paste(img, (0, 0))
        combined.paste(seg_img, (img.width, 0))

    draw = ImageDraw.Draw(combined)
    draw.text((10, combined.height - 30), f"Caption: {caption}", fill="white")

    output_path = os.path.join("final_output", f"{stem}_final.jpg")
    combined.save(output_path)


In [None]:
import os
# Debug aid: list the entries of the current working directory.
print(os.listdir('.'))


In [3]:
from model import generate_caption


In [4]:
import os
from PIL import Image

image_folder = "images"
output_folder = "output"
# Only entries whose names end in .jpg or .png are treated as images
image_files = [name for name in os.listdir(image_folder) if name.endswith((".jpg", ".png"))]


In [5]:
# Ensure the destination folder for combined images exists (no error if it already does).
os.makedirs("final_output", exist_ok=True)


In [6]:
from PIL import ImageDraw

# Build one side-by-side "original | segmentation" image per input file, stamp the
# generated caption on it and save it under final_output/.
for image_file in image_files:
    base_name = os.path.splitext(image_file)[0]
    img_path = os.path.join(image_folder, image_file)
    seg_path = os.path.join(output_folder, f"{base_name}_segmented.jpg")

    # Open both inputs in a `with` block so their file handles are closed as soon
    # as the pixels have been copied onto the combined canvas.
    with Image.open(img_path) as img, Image.open(seg_path) as seg_img:
        caption = generate_caption(img)  # uses the generate_caption imported from `model`

        combined = Image.new("RGB", (img.width + seg_img.width, max(img.height, seg_img.height)))
        combined.paste(img, (0, 0))
        combined.paste(seg_img, (img.width, 0))

    # Caption goes 30 px above the bottom edge, 10 px from the left.
    draw = ImageDraw.Draw(combined)
    draw.text((10, combined.height - 30), f"Caption: {caption}", fill="white")

    output_path = os.path.join("final_output", f"{base_name}_final.jpg")
    combined.save(output_path)


In [None]:
import torch
from caption_model import Captioner  # your model class
from utils import Vocabulary          # your vocab class
import pickle
import os
from flask import Flask, render_template, request, send_from_directory
from PIL import Image
import numpy as np
import cv2
from torchvision import models, transforms
import torchvision.transforms.functional as F
import torchvision

# ========== Model Setup ========== #

# Device setup: use the GPU when CUDA is available, otherwise fall back to the CPU.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# Load vocabulary.
# NOTE(review): pickle.load can execute arbitrary code from the file — only load a
# vocab.pkl that was produced by a trusted source.
with open('vocab.pkl', 'rb') as f:
    vocab = pickle.load(f)

# Load the captioning model and its trained weights.
# Captioner() is constructed with no arguments here.
# NOTE(review): presumably its defaults must match the architecture saved in
# model.pth for load_state_dict to succeed — confirm against caption_model.py.
model = Captioner()
model.load_state_dict(torch.load('model.pth', map_location=device))
model = model.to(device).eval()

# Shared preprocessing: resize to 224x224 and convert to a tensor (no normalisation step).
transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor()
])
# ResNet-50 feature extractor: every child module except the last one is kept.
resnet = models.resnet50(weights=models.ResNet50_Weights.DEFAULT)
modules = list(resnet.children())[:-1]
resnet = torch.nn.Sequential(*modules).to(device).eval()

# Load DeepLabV3 (ResNet-101 backbone) with default pretrained weights for segmentation.
segmentation_model = torchvision.models.segmentation.deeplabv3_resnet101(weights="DEFAULT")
segmentation_model = segmentation_model.to(device).eval()

# ========== Flask App Setup ========== #

# Uploads and generated outputs both live under static/; the folders are created at
# import time so the request handlers can write into them immediately.
app = Flask(__name__)
UPLOAD_FOLDER = "static/uploads"
OUTPUT_FOLDER = "static/outputs"
os.makedirs(UPLOAD_FOLDER, exist_ok=True)
os.makedirs(OUTPUT_FOLDER, exist_ok=True)

# ========== Function: Generate Caption ========== #

def generate_caption(image):
    """Caption a PIL image using the module-level ResNet encoder and Captioner model.

    The image is converted to RGB, preprocessed with `transform`, encoded by
    `resnet`, and the resulting features are passed (with a leading batch
    dimension) to `model.generate_caption` together with `vocab`. Whatever
    that call returns is returned unchanged.
    """
    rgb_image = image.convert("RGB")
    batch = transform(rgb_image).unsqueeze(0).to(device)

    with torch.no_grad():
        pooled = resnet(batch).squeeze(0)
        # NOTE(review): the batch dim is removed and re-added, so any trailing
        # singleton dims from the encoder are still present here, whereas the
        # training loop squeezes them away — confirm Captioner accepts this shape.
        return model.generate_caption(pooled.unsqueeze(0), vocab)

# ========== Function: Segment Image ========== #

def segment_image(image, save_path):
    """Overlay the DeepLabV3 "person" mask (class 15) in green on `image` and save it.

    Args:
        image: PIL image to segment (converted to RGB internally).
        save_path: Destination path handed to cv2.imwrite.

    The network runs on the 224x224 tensor produced by `transform`, so the
    predicted mask is scaled back to the original image size before blending;
    without that step cv2.addWeighted rejects the two arrays for having
    different shapes whenever the upload is not exactly 224x224.
    """
    image = image.convert("RGB")

    # torchvision's pretrained segmentation weights expect ImageNet-normalised input;
    # the shared `transform` only resizes and converts to a tensor, so normalise here.
    normalize = transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
    image_tensor = normalize(transform(image)).unsqueeze(0).to(device)

    with torch.no_grad():
        output = segmentation_model(image_tensor)['out']
        # Per-pixel class id: argmax over the class dimension of the single batch item.
        mask = torch.argmax(output[0], dim=0).cpu().numpy()

    # Create color overlay mask (green for class 15 = person; green is the same in RGB and BGR)
    color_mask = np.zeros((mask.shape[0], mask.shape[1], 3), dtype=np.uint8)
    color_mask[mask == 15] = [0, 255, 0]

    # Convert original image to OpenCV (BGR) format
    image_cv = cv2.cvtColor(np.array(image), cv2.COLOR_RGB2BGR)

    # Bring the mask up to the original resolution. Nearest-neighbour keeps the
    # overlay a hard-edged two-colour mask instead of blurring its border.
    height, width = image_cv.shape[:2]
    if color_mask.shape[:2] != (height, width):
        color_mask = cv2.resize(color_mask, (width, height), interpolation=cv2.INTER_NEAREST)

    # Blend image and mask (70% image, 30% overlay)
    blended = cv2.addWeighted(image_cv, 0.7, color_mask, 0.3, 0)
    cv2.imwrite(save_path, blended)

# ========== Routes ========== #

@app.route('/', methods=['GET', 'POST'])
def index():
    """Serve the upload form and, on POST, caption and segment the uploaded image.

    A GET request, or a POST that carries no usable file, renders the empty
    form. A POST with a file saves it under UPLOAD_FOLDER, generates a caption,
    writes the segmentation overlay under OUTPUT_FOLDER and renders the
    template with `caption`, `original_path` and `segmented_path` (both paths
    are relative to the static folder).
    """
    if request.method == 'POST':
        upload = request.files.get('image')
        client_name = upload.filename if upload is not None else ''

        # The filename is supplied by the client. Keep only its final path
        # component (treating backslashes as separators too) so a name such as
        # "../../app.py" cannot make the save land outside UPLOAD_FOLDER.
        original_filename = os.path.basename((client_name or '').replace('\\', '/'))

        if original_filename not in ('', '.', '..'):
            # Save uploaded image
            original_path = os.path.join(UPLOAD_FOLDER, original_filename)
            upload.save(original_path)

            segmented_filename = original_filename.rsplit('.', 1)[0] + '_segmented.jpg'
            segmented_path = os.path.join(OUTPUT_FOLDER, segmented_filename)

            # Open the saved file once for both steps and close it afterwards.
            with Image.open(original_path) as img:
                # Generate caption
                caption = generate_caption(img)
                # Segment and save output
                segment_image(img, segmented_path)

            # Render output page
            return render_template('index.html',
                                   caption=caption,
                                   original_path=f"uploads/{original_filename}",
                                   segmented_path=f"outputs/{segmented_filename}")
    return render_template('index.html')

@app.route('/static/uploads/<filename>')
def uploaded_file(filename):
    """Return the file named `filename` from UPLOAD_FOLDER."""
    return send_from_directory(UPLOAD_FOLDER, filename)

@app.route('/static/outputs/<filename>')
def segmented_file(filename):
    """Return the file named `filename` from OUTPUT_FOLDER."""
    return send_from_directory(OUTPUT_FOLDER, filename)

# ========== Start Server ========== #

# Start the Flask server only when this file is run as a script, not when it is imported.
# NOTE(review): debug=True is meant for local development — confirm it is switched
# off before the app is exposed to anyone else.
if __name__ == '__main__':
    app.run(debug=True)


In [None]:
<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>Smart Image Captioning & Segmentation</title>

    <!-- Bootstrap & Icons -->
    <link href="https://cdn.jsdelivr.net/npm/bootstrap@5.3.0/dist/css/bootstrap.min.css" rel="stylesheet">
    <link href="https://cdn.jsdelivr.net/npm/bootstrap-icons@1.10.5/font/bootstrap-icons.css" rel="stylesheet">

    <style>
        body {
            background-color: #f9f9f9;
        }
        .card {
            border-radius: 15px;
        }
        .caption-box {
            background-color: #f1f1f1;
            padding: 10px;
            border-radius: 8px;
            font-size: 1.1rem;
            color: #333;
        }
        .form-control {
            padding: 14px;
            font-size: 1.1rem;
        }
        .btn-success {
            font-size: 1.2rem;
            padding: 12px 28px;
        }
    </style>
</head>
<body>

<div class="container mt-5">
    <div class="card shadow-lg p-4">
        <h2 class="text-center mb-4"><i class="bi bi-stars"></i> Smart Image Analyzer</h2>

        <!-- Upload Form: only image files are offered by the file picker -->
        <form method="POST" action="/" enctype="multipart/form-data" id="upload-form">
            <div class="mb-3 text-center">
                <input class="form-control" type="file" name="image" id="imageInput" accept="image/*" required>
            </div>
            <div class="text-center">
                <button class="btn btn-success px-4" type="submit">
                    <i class="bi bi-upload"></i> Upload & Analyze
                </button>
            </div>
        </form>

        <!-- Loading Spinner (shown by the script below once the form is submitted) -->
        <div class="text-center mt-4" id="loading" style="display: none;">
            <div class="spinner-border text-success" role="status"></div>
            <p class="mt-2">Processing image...</p>
        </div>

        {% if caption %}
        <!-- Results.
             original_path / segmented_path are paths relative to the static folder
             (e.g. "uploads/cat.jpg"), so they are resolved through url_for('static', ...)
             rather than used as page-relative URLs, which would not be served. -->
        <div class="mt-5 row">
            <div class="col-md-6 text-center">
                <h5>🖼️ Original Image</h5>
                <img src="{{ url_for('static', filename=original_path) }}" alt="Uploaded original image" class="img-fluid rounded shadow mb-2">
                <div class="caption-box"><strong>📝 Caption:</strong> {{ caption }}</div>
            </div>
            <div class="col-md-6 text-center">
                <h5>🎯 Segmented Output</h5>
                <img src="{{ url_for('static', filename=segmented_path) }}" alt="Segmentation overlay of the uploaded image" class="img-fluid rounded shadow">
            </div>
        </div>
        {% endif %}
    </div>
</div>

<script>
    // Reveal the spinner as soon as the upload starts; the page reload hides it again.
    const form = document.getElementById('upload-form');
    form.addEventListener('submit', () => {
        document.getElementById('loading').style.display = 'block';
    });
</script>

</body>
</html>


In [None]:
/* Base typography for the whole page. */
body {
    font-family: 'Segoe UI', sans-serif;
}

/* Card headers use the base font size. */
.card-header {
    font-size: 1rem;
}

/* Animate every property change on buttons over 0.3s so the hover effect below is smooth. */
.btn {
    transition: all 0.3s ease;
}

/* Hover feedback: enlarge the button by 5% and add a soft shadow. */
.btn:hover {
    transform: scale(1.05);
    box-shadow: 0 0 15px rgba(0, 0, 0, 0.15);
}

/* Show a pointer cursor over the file picker. */
input[type="file"] {
    cursor: pointer;
}


In [None]:
import os

# Stylesheet text written verbatim to static/styles.css.
stylesheet_text = """
body {
    background: linear-gradient(to right, #ece9e6, #ffffff);
    font-family: 'Segoe UI', sans-serif;
}

.card {
    border-radius: 1rem;
}

.caption-box {
    background-color: #f0f0f0;
    border-radius: 8px;
    padding: 10px;
    font-size: 1.1em;
}
    """

# Make sure static/ is there, then (over)write the stylesheet inside it.
os.makedirs("static", exist_ok=True)
with open("static/styles.css", "w") as f:
    f.write(stylesheet_text)
print("✅ static/styles.css created successfully.")


In [None]:
# build_vocab.py

from utils import Vocabulary
import pickle

captions_file = "captions.txt"
vocab = Vocabulary()

# Register the special tokens before any caption word.
# NOTE(review): presumably this gives <pad> the first index handed out by
# Vocabulary.add_word — verify against utils.Vocabulary.
for special_token in ('<pad>', '<start>', '<end>', '<unk>'):
    vocab.add_word(special_token)

# Each useful line has the form `filename|caption`; lines without a "|" are skipped.
with open(captions_file, 'r') as f:
    for line in f:
        if "|" not in line:
            continue
        # maxsplit=1: only the first "|" separates filename from caption, so a
        # caption that itself contains "|" no longer raises on unpacking.
        _, caption = line.strip().split("|", 1)
        for token in caption.lower().strip().split():
            vocab.add_word(token)

# Save vocab
with open("vocab.pkl", "wb") as f:
    pickle.dump(vocab, f)

print("✅ vocab.pkl created with", len(vocab), "words.")


In [None]:
# dataset.py

import torch
from torch.utils.data import Dataset
from PIL import Image
import os
import torchvision.transforms as transforms
import pickle

class CaptionDataset(Dataset):
    """Image/caption pairs read from a `filename|caption` text file.

    Each item is `(image, caption_tensor)`: the transformed image and the
    caption encoded as `<start>`, word indices, `<end>`.
    """

    def __init__(self, caption_file, image_folder, vocab_path, transform=None):
        """Load the pickled vocabulary and parse the caption file.

        Args:
            caption_file: Text file with one `filename|caption` entry per line;
                lines without a "|" are ignored.
            image_folder: Directory the filenames are resolved against.
            vocab_path: Pickle file holding an object with a `word2idx` mapping.
            transform: Optional callable applied to each RGB image; defaults to
                a 224x224 resize followed by tensor conversion.
        """
        # NOTE(review): pickle.load can execute arbitrary code — only load trusted files.
        with open(vocab_path, "rb") as f:
            self.vocab = pickle.load(f)

        # maxsplit=1 keeps any further "|" inside the caption, so every entry is
        # exactly [filename, caption] and __getitem__ can always unpack it.
        with open(caption_file, "r") as f:
            self.data = [line.strip().split("|", 1) for line in f if "|" in line]

        self.image_folder = image_folder
        self.transform = transform or transforms.Compose([
            transforms.Resize((224, 224)),
            transforms.ToTensor()
        ])

    def __len__(self):
        """Number of parsed caption entries."""
        return len(self.data)

    def __getitem__(self, idx):
        """Return `(transformed_image, caption_index_tensor)` for entry `idx`."""
        img_name, caption = self.data[idx]
        img_path = os.path.join(self.image_folder, img_name)
        # Close the file handle as soon as the RGB copy exists.
        with Image.open(img_path) as raw_image:
            image = raw_image.convert("RGB")
        image = self.transform(image)

        word2idx = self.vocab.word2idx
        unk_idx = word2idx["<unk>"]  # fallback for words missing from the vocabulary
        tokens = caption.lower().strip().split()
        caption_idx = [word2idx["<start>"]]
        caption_idx += [word2idx.get(word, unk_idx) for word in tokens]
        caption_idx.append(word2idx["<end>"])

        return image, torch.tensor(caption_idx)


In [None]:
# Training Loop
# Relies on EPOCHS, dataloader, device, resnet, captioner, criterion and optimizer
# having been defined by earlier cells.
for epoch in range(EPOCHS):
    for batch in dataloader:
        # NOTE(review): unpacking with zip(*batch) presumes the dataloader yields a
        # list of (image, caption) pairs, i.e. a pass-through collate_fn — confirm.
        images, captions = zip(*batch)

        # Pad captions with zeros up to the longest caption in the batch
        lengths = [len(cap) for cap in captions]
        max_len = max(lengths)
        padded = torch.zeros(len(captions), max_len).long()
        for i, cap in enumerate(captions):
            padded[i, :len(cap)] = cap

        images = torch.stack(images).to(device)
        captions = padded.to(device)

        # The encoder is not trained, so its forward pass runs without gradients.
        with torch.no_grad():
            # flatten(1) collapses everything after the batch dimension and always
            # keeps that dimension; .squeeze() would also drop it when a batch holds
            # a single sample (e.g. the last, smaller batch of an epoch).
            features = resnet(images).flatten(1)

        outputs = captioner(features, captions)  # (B, T, V)

        # Align dimensions for loss
        output_dim = outputs.size(-1)
        outputs = outputs[:, 1:, :].contiguous().view(-1, output_dim)  # shift outputs
        targets = captions[:, 1:].contiguous().view(-1)                # shift targets

        loss = criterion(outputs, targets)

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

    # Report the loss of the epoch's last batch and checkpoint after every epoch.
    print(f"Epoch [{epoch+1}/{EPOCHS}], Loss: {loss.item():.4f}")
    torch.save(captioner.state_dict(), "model.pth")
print("✅ model.pth saved successfully!")



In [None]:
from PIL import Image
import os

# Folder the dummy images are written into (created on demand)
image_folder = "images"
os.makedirs(image_folder, exist_ok=True)

# Filenames that should exist on disk (based on captions.txt)
missing_images = ["image2.jpg", "image3.jpg", "image4.jpg"]  # update this list

# Write a 224x224 white placeholder for every listed name that has no file yet;
# files that are already present are reported and left untouched.
for img_name in missing_images:
    img_path = os.path.join(image_folder, img_name)
    if os.path.exists(img_path):
        print(f"✅ Already exists: {img_name}")
        continue
    img = Image.new('RGB', (224, 224), color=(255, 255, 255))
    img.save(img_path)
    print(f"✅ Created dummy: {img_name}")


In [None]:
from PIL import Image
import os

# Make sure the images/ folder exists (no error if it already does).
os.makedirs("images", exist_ok=True)

# Write a 224x224 all-white RGB placeholder to images/image1.jpg.
# Unlike a create-if-missing check, this saves unconditionally, so any existing
# file at that path is overwritten.
img_path = os.path.join("images", "image1.jpg")
dummy_img = Image.new("RGB", (224, 224), color=(255, 255, 255))
dummy_img.save(img_path)
print("✅ Created dummy: image1.jpg")


In [None]:
import pickle
# Reload the pickled vocabulary and report its size and largest index.
with open("vocab.pkl", "rb") as f:
    vocab = pickle.load(f)

print(f"Vocab size: {len(vocab)}")
print(f"Max index: {max(vocab.word2idx.values())}")


In [6]:
import os
# Sanity check that the trained weights file is present in the working directory.
print(os.path.exists("model.pth"))  # Should print: True


True


In [None]:

# Run app.py in a shell from the notebook.
# NOTE(review): presumably this cell keeps running until the Flask server is stopped — confirm.
!python app.py
