<a href="https://colab.research.google.com/github/thwlruss10/VDA_pipeline/blob/main/GIT_ViLT_20APR.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!pip install transformers datasets nltk scikit-learn



In [None]:
# Mount Google Drive at /content/drive; the dataset paths used in later cells
# all live under this mount point.
from google.colab import drive
drive.mount('/content/drive')


In [None]:
import os

# Hugging Face libraries resolve their cache location from HF_HOME when they
# are first imported, so the variable must be set BEFORE `datasets` and
# `transformers` are imported below. Setting it after the imports (as this cell
# used to do) leaves the default cache directory in use.
os.environ['HF_HOME'] = os.path.join(".", "cache")
# Expose only GPU 0 to this process. Set before any CUDA work is started.
os.environ['CUDA_VISIBLE_DEVICES'] = '0'

from copy import deepcopy
from dataclasses import dataclass
from typing import Dict, List, Optional, Tuple
from datasets import load_dataset
import numpy as np
from PIL import Image
import torch
import torch.nn as nn
import re
import pandas as pd
from transformers import (
    # Preprocessing / Common
    AutoTokenizer, AutoFeatureExtractor,
    # Text & Image Models & transformers (ViTModel, DeiTModel, BEiT)
    AutoModel,
    # Training / Evaluation
    TrainingArguments, Trainer,
    # Misc
    logging
)

# WordNet data is needed by the WUPS metric defined later in the notebook.
import nltk
nltk.download('wordnet')
from nltk.corpus import wordnet
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, f1_score

# NOTE: `set_caching_enabled(True)` was removed here because `datasets` no
# longer supports explicitly enabling the cache.

# Only show transformers log messages at error level or above.
logging.set_verbosity_error()

# Use the (single visible) GPU when available, otherwise fall back to the CPU.
device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')


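The code above imports the dependencies, sets the cache path for Hugging Face Transformers and Datasets, and sets the logging threshold to errors only so that less severe warnings are suppressed. It also selects the GPU as the torch device, falling back to the CPU when no GPU is available.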

In [None]:
import os
import re
import pandas as pd
from sklearn.model_selection import train_test_split

# Define base directory pointing to your Drive location
base_dir = "/content/drive/MyDrive/FinalProject/dataset"
os.makedirs(base_dir, exist_ok=True)

# Regex that locates the image reference at the end of a question, e.g.
# " in the image12 ?". In each findall() tuple, element 0 is the whole phrase
# and element 3 is the image ID ("image12").
image_pattern = re.compile(r"( (in |on |of )?(the |this )?(image\d*) \?)")

# The raw file alternates lines: question, answer, question, answer, ...
qa_file = os.path.join(base_dir, "all_qa_pairs.txt")
with open(qa_file, "r", encoding="utf-8") as f:
    qa_data = [line.strip() for line in f]

# Pair every question (even index) with the answer on the following line.
# zip() stops at the last complete pair, so a dangling final question without
# an answer line is ignored instead of raising IndexError.
records = []
for question_line, answer_line in zip(qa_data[0::2], qa_data[1::2]):
    match = image_pattern.findall(question_line)
    if not match:
        continue  # questions without an image reference are skipped
    image_phrase, img_id = match[0][0], match[0][3]
    records.append({
        "question": question_line.replace(image_phrase, "").strip(),
        "answer": answer_line,
        "image_id": img_id,
    })

df = pd.DataFrame(records)

# Build the answer vocabulary. A comma-separated answer contributes each of
# its items (with spaces removed); any other answer is added as-is.
answer_space = []
for ans in df["answer"].to_list():
    if "," in ans:
        answer_space += ans.replace(" ", "").split(",")
    else:
        answer_space.append(ans)

# sort and remove duplicates from answer_space
answer_space = sorted(set(answer_space))

# Write the answer space to file, one answer per line. write() is used because
# the joined vocabulary is a single string, not a list of lines.
with open(os.path.join(base_dir, "answer_space.txt"), "w", encoding="utf-8") as f:
    f.write("\n".join(answer_space))

# Split dataset into training and evaluation (fixed seed for reproducibility)
train_df, test_df = train_test_split(df, test_size=0.2, random_state=42)

# Save to CSV
train_df.to_csv(os.path.join(base_dir, "data_train.csv"), index=False)
test_df.to_csv(os.path.join(base_dir, "data_eval.csv"), index=False)


In [None]:
import pandas as pd

# Read back the evaluation split that the data-preparation cell saved as CSV.
base_dir = "/content/drive/MyDrive/FinalProject/dataset"
eval_csv_path = os.path.join(base_dir, "data_eval.csv")
eval_df = pd.read_csv(eval_csv_path)


*** ViLT experimentation

In [None]:
from transformers.models.vilt.modeling_vilt import ViltForQuestionAnswering
from transformers.modeling_outputs import SequenceClassifierOutput
import torch.nn as nn
import torch

class ViltForSingleLabelQA(ViltForQuestionAnswering):
    """ViLT question-answering model trained as single-label classification.

    The parent forward pass is called without labels, so any loss is computed
    here: a cross-entropy over the answer classes, with one integer class index
    per example.
    """

    def forward(
        self,
        input_ids=None,
        attention_mask=None,
        token_type_ids=None,
        pixel_values=None,
        pixel_mask=None,
        labels=None,
        return_dict=True,
        **kwargs
    ):
        """Run ViLT and return logits plus an optional cross-entropy loss.

        Args:
            input_ids, attention_mask, token_type_ids, pixel_values, pixel_mask:
                forwarded unchanged to ``ViltForQuestionAnswering.forward``.
            labels: optional tensor of class indices; cast to ``long`` and
                flattened before the loss is computed.
            return_dict: when truthy a ``SequenceClassifierOutput`` is
                returned; when ``False`` a tuple of the non-``None`` fields in
                the order (loss, logits, hidden_states, attentions); ``None``
                defers to ``config.use_return_dict``.
            **kwargs: extra keyword arguments forwarded to the parent.

        Returns:
            ``SequenceClassifierOutput`` or a tuple, see ``return_dict``.
        """
        # The Trainer may pass this bookkeeping argument; the parent forward
        # is not given it.
        kwargs.pop("num_items_in_batch", None)

        if return_dict is None:
            return_dict = getattr(self.config, "use_return_dict", True)

        # Always ask the parent for a structured output. With return_dict=False
        # it would return a tuple and the `.logits` access below would fail.
        # Labels are deliberately not forwarded so the loss is computed here.
        outputs = super().forward(
            input_ids=input_ids,
            attention_mask=attention_mask,
            token_type_ids=token_type_ids,
            pixel_values=pixel_values,
            pixel_mask=pixel_mask,
            return_dict=True,
            **kwargs
        )

        logits = outputs.logits

        loss = None
        if labels is not None:
            # CrossEntropyLoss needs integer class indices of shape (batch,);
            # view(-1) also accepts labels that arrive as (batch, 1).
            loss = nn.CrossEntropyLoss()(logits, labels.long().view(-1))

        if not return_dict:
            fields = (loss, logits, outputs.hidden_states, outputs.attentions)
            return tuple(value for value in fields if value is not None)

        return SequenceClassifierOutput(
            loss=loss,
            logits=logits,
            hidden_states=outputs.hidden_states,
            attentions=outputs.attentions,
        )


In [None]:
def wup_measure(pred, gt, threshold=0.9):
    """Score a predicted answer against the ground truth with Wu-Palmer similarity.

    Args:
        pred: predicted answer word.
        gt: ground-truth answer word.
        threshold: similarity at or above which the prediction counts as fully
            correct.

    Returns:
        0.0 when either word has no WordNet synsets; otherwise the best
        Wu-Palmer similarity over all synset pairs, rounded up to 1.0 when it
        reaches ``threshold``.
    """
    # Use the `wordnet` reader imported in the notebook's import cell. The
    # previous `wn` alias is only bound by a much later cell, so calling this
    # function before that cell ran raised NameError.
    pred_synsets = wordnet.synsets(pred)
    gt_synsets = wordnet.synsets(gt)

    if not pred_synsets or not gt_synsets:
        return 0.0

    # wup_similarity() may return None for a pair; treat that as 0.
    max_score = max((s1.wup_similarity(s2) or 0) for s1 in pred_synsets for s2 in gt_synsets)
    return 1.0 if max_score >= threshold else max_score


In [None]:
from transformers import ViltProcessor, ViltForQuestionAnswering
from PIL import Image
import torch

# Single-example check using the checkpoint that is already fine-tuned for VQA.
vqa_checkpoint = "dandelin/vilt-b32-finetuned-vqa"
processor = ViltProcessor.from_pretrained(vqa_checkpoint)
model = ViltForQuestionAnswering.from_pretrained(vqa_checkpoint)
model = model.to("cuda" if torch.cuda.is_available() else "cpu")

# One image / question pair to try the model on
image_path = "/content/drive/MyDrive/FinalProject/dataset/images/image2.png"
image = Image.open(image_path).convert("RGB")
question = "What is on the left side of the sink?"

# Encode both modalities and move the tensors to the model's device
inputs = processor(image, question, return_tensors="pt").to(model.device)

# Forward pass without gradient tracking, then pick the highest-scoring class
with torch.no_grad():
    outputs = model(**inputs)
logits = outputs.logits
best_class = logits.argmax(-1).item()
predicted_answer = model.config.id2label[best_class]

print("Predicted Answer:", predicted_answer)


Batch processing: construct the classification problem, train the model, and compare the output results with ViT.

In [None]:
# Lookup table from answer string to its position in `answer_space`.
answer_to_id = dict(zip(answer_space, range(len(answer_space))))


In [None]:
# Rows whose full answer string is not a key of `answer_to_id`.
train_answer_known = train_df["answer"].isin(answer_to_id.keys())
test_answer_known = test_df["answer"].isin(answer_to_id.keys())
missing_answers_train = train_df[~train_answer_known]
missing_answers_test = test_df[~test_answer_known]

print(f"Missing in train: {len(missing_answers_train)}")
print(f"Missing in test: {len(missing_answers_test)}")


In [None]:
# Keep only rows whose answer is a key of `answer_to_id`; copy() detaches the
# result from the original frames.
train_df = train_df.loc[train_df["answer"].isin(answer_to_id.keys())].copy()
test_df = test_df.loc[test_df["answer"].isin(answer_to_id.keys())].copy()


In [None]:
# Add an integer `label` column: look each answer up in `answer_to_id`, drop
# rows where the lookup produced NaN, then cast the column to int.
train_df = (
    train_df
    .assign(label=train_df["answer"].map(answer_to_id))
    .dropna(subset=["label"])
    .astype({"label": int})
)
test_df = (
    test_df
    .assign(label=test_df["answer"].map(answer_to_id))
    .dropna(subset=["label"])
    .astype({"label": int})
)



In [None]:
# Display the first rows of the training split (now including the `label` column).
train_df.head()

Create a Hugging Face Dataset object

In [None]:

from datasets import Dataset, DatasetDict

# Convert both pandas splits to Hugging Face Datasets and group them by split name.
train_dataset, test_dataset = (
    Dataset.from_pandas(frame) for frame in (train_df, test_df)
)

dataset = DatasetDict({"train": train_dataset, "test": test_dataset})


Preprocess function of ViLT

In [None]:

from transformers import ViltProcessor
from PIL import Image

# Processor for the "dandelin/vilt-b32-mlm" checkpoint. This rebinds `processor`,
# replacing the one loaded for the VQA checkpoint in the earlier inference cell.
processor = ViltProcessor.from_pretrained("dandelin/vilt-b32-mlm")
# Directory that preprocess() below reads the "<image_id>.png" files from.
image_dir = os.path.join(base_dir, "images")

def preprocess(example):
    """Turn one dataset row into ViLT model inputs.

    Args:
        example: mapping with the keys ``image_id``, ``question`` and
            ``labels`` (the integer class index).

    Returns:
        dict holding the processor outputs, with a leading batch dimension of
        size 1 removed, plus an integer ``labels`` entry.
    """
    image_path = os.path.join(image_dir, f"{example['image_id']}.png")
    # The context manager closes the file handle as soon as the pixels are
    # converted, so mapping over the whole dataset does not leave one open file
    # per image behind. convert() returns a new image holding the pixel data.
    with Image.open(image_path) as img:
        image = img.convert("RGB")

    # Use return_tensors="np" to avoid PyTorch tensor overload
    inputs = processor(
        text=example["question"],
        images=image,
        return_tensors="np",
        padding="max_length",
        truncation=True,
    )

    # Drop the leading batch dimension when it has size 1. Slicing the shape
    # avoids an IndexError on values whose shape is empty.
    inputs = {
        k: v.squeeze(0) if getattr(v, "shape", ())[:1] == (1,) else v
        for k, v in inputs.items()
    }
    inputs["labels"] = int(example["labels"])
    return inputs


In [None]:
# preprocess() reads example["labels"], so rename the column in every split.
dataset = dataset.rename_column(
    original_column_name="label",
    new_column_name="labels",
)


In [None]:
# Run preprocess() over every row and drop the original columns, so only the
# fields returned by preprocess() remain.
original_columns = dataset["train"].column_names
dataset = dataset.map(preprocess, remove_columns=original_columns)


In [None]:
# Sanity check: value and Python type of the first training label.
first_label = dataset["train"][0]["labels"]
print(first_label, type(first_label))

In [None]:
# Small subset for quick experiments: first 100 train rows and first 20 test rows.
small_dataset = DatasetDict({
    split: dataset[split].select(range(size))
    for split, size in (("train", 100), ("test", 20))
})


In [None]:
# Medium subset: first 500 train rows and first 100 test rows.
medium_dataset = DatasetDict({
    split: dataset[split].select(range(size))
    for split, size in (("train", 500), ("test", 100))
})


In [None]:
# Load the model

from transformers import ViltForQuestionAnswering
import os  # <-- missing import

# Dataset folder on Google Drive
base_dir = "/content/drive/MyDrive/FinalProject/dataset"

# Re-read the answer vocabulary written earlier: one answer per line.
answer_space_path = os.path.join(base_dir, "answer_space.txt")
with open(answer_space_path, "r", encoding="utf-8") as f:
    answer_space = f.read().splitlines()


In [None]:
# Build the single-label ViLT classifier from the MLM checkpoint, with one
# output class per entry in `answer_space`.
num_answer_classes = len(answer_space)
model = ViltForSingleLabelQA.from_pretrained(
    "dandelin/vilt-b32-mlm", num_labels=num_answer_classes
)
model.config.problem_type = "single_label_classification"


In [None]:
from sklearn.metrics import accuracy_score

from sklearn.metrics import accuracy_score, f1_score
import numpy as np

def compute_metrics(eval_preds):
    """Compute accuracy, macro-F1 and mean WUPS for an evaluation pass.

    Args:
        eval_preds: a ``(logits, labels)`` pair; the predicted class is the
            argmax of the logits over the last axis.

    Returns:
        dict with the keys ``accuracy``, ``f1`` and ``wups``.
    """
    logits, labels = eval_preds
    preds = np.argmax(logits, axis=-1)

    # Map class ids back to answer strings and score every pair with WUPS.
    wups = []
    for pred_id, true_id in zip(preds, labels):
        predicted_answer = answer_space[pred_id]
        true_answer = answer_space[true_id]
        wups.append(wup_measure(predicted_answer, true_answer))

    metrics = {}
    metrics["accuracy"] = accuracy_score(labels, preds)
    metrics["f1"] = f1_score(labels, preds, average="macro")
    metrics["wups"] = np.mean(wups)
    return metrics
# Accuracy-only variant of compute_metrics, kept for reference inside a string
# literal; it is never executed as code.
'''

def compute_metrics(eval_preds):
    logits, labels = eval_preds
    preds = logits.argmax(-1)
    return {
        "accuracy": accuracy_score(labels, preds)
    }
'''

In [None]:
from transformers import TrainingArguments

# Hyper-parameters for fine-tuning. Evaluation, checkpointing and logging all
# happen once per epoch, and the checkpoint with the best accuracy is reloaded
# when training ends.
training_config = {
    "output_dir": "./vilt_daquar_output",
    "per_device_train_batch_size": 8,
    "per_device_eval_batch_size": 8,
    "num_train_epochs": 5,
    "eval_strategy": "epoch",
    "save_strategy": "epoch",
    "logging_strategy": "epoch",
    "learning_rate": 5e-5,
    "weight_decay": 0.01,
    "save_total_limit": 1,
    "load_best_model_at_end": True,
    "metric_for_best_model": "accuracy",
    "report_to": "none",
    # "label_smoothing_factor": 0.1,  # optional; 0.1 is a reasonable first value
    "remove_unused_columns": False,
}
training_args = TrainingArguments(**training_config)


In [None]:
from transformers import default_data_collator

def data_collator(features):
    """Collate features with the default collator and force integer labels.

    Args:
        features: list of per-example feature dicts.

    Returns:
        The collated batch, with ``batch["labels"]`` cast to a long tensor.
    """
    collated = default_data_collator(features)
    long_labels = collated["labels"].long()
    collated["labels"] = long_labels
    return collated


In [None]:
from transformers import Trainer
# Trainer variant that uses `small_dataset`, kept for reference inside a string
# literal; it is never executed as code.
'''
trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=small_dataset["train"],
    eval_dataset=small_dataset["test"],
    tokenizer=processor,
    data_collator=data_collator,
    compute_metrics=compute_metrics,
)


'''
# Trainer over the full train/test splits, using the custom collator and metrics.
trainer_kwargs = dict(
    model=model,
    args=training_args,
    train_dataset=dataset["train"],
    eval_dataset=dataset["test"],
    tokenizer=processor,
    data_collator=data_collator,
    compute_metrics=compute_metrics,
)
trainer = Trainer(**trainer_kwargs)


In [None]:
# Repeats the assignment made right after the model was created; re-running it is harmless.
model.config.problem_type = "single_label_classification"

In [None]:
# `wn` is an alias for NLTK's WordNet corpus reader.
from nltk.corpus import wordnet as wn
# Start fine-tuning with the Trainer configured in the previous cells.
trainer.train()


In [None]:
from datasets import DatasetDict

# Print the first three preprocessed training examples
# (`dataset` is the DatasetDict built above).
train_split = dataset["train"]
for example_idx in range(3):
    print(train_split[example_idx])




In [None]:
# stop here,
# need to scale gradually with slicing
# preprocess and scale to disk
# resize images... how to create demo photos?

In [None]:
# Store the model and the processor in the same Drive folder.
checkpoint_dir = "/content/drive/MyDrive/FinalProject/vilt_checkpoint"
trainer.save_model(checkpoint_dir)
processor.save_pretrained(checkpoint_dir)


In [None]:
# Persist the preprocessed DatasetDict to Drive.
dataset.save_to_disk("/content/drive/MyDrive/FinalProject/vilt_preprocessed")


In [None]:
import json

# Evaluate on the Trainer's eval dataset and store the metrics as JSON on Drive.
metrics = trainer.evaluate()
metrics_path = "/content/drive/MyDrive/FinalProject/metrics.json"
with open(metrics_path, "w") as f:
    json.dump(metrics, f, indent=2)


In [None]:
def print_parameter_count(model):
    """Print the total, trainable and frozen parameter counts of ``model``.

    Args:
        model: object whose ``parameters()`` yields items exposing ``numel()``
            and ``requires_grad``.
    """
    total = 0
    trainable = 0
    for param in model.parameters():
        size = param.numel()
        total += size
        if param.requires_grad:
            trainable += size
    frozen = total - trainable

    print(f"🧠 Total parameters:     {total:,}")
    print(f"🎯 Trainable parameters: {trainable:,}")
    print(f"🪶 Frozen parameters:    {frozen:,}")

print_parameter_count(model)


In [None]:
# stop here

In [None]:
from sklearn.metrics import accuracy_score

def compute_metrics(pred):
    """Return plain accuracy for a prediction object.

    Note: this rebinds the name ``compute_metrics`` that an earlier cell
    already defined.

    Args:
        pred: object exposing ``label_ids`` and ``predictions`` (class scores).

    Returns:
        dict with the single key ``accuracy``.
    """
    predicted_ids = pred.predictions.argmax(-1)
    return {"accuracy": accuracy_score(pred.label_ids, predicted_ids)}


In [None]:
# validated invered results

from PIL import Image

def predict_vilt(image_path, question, model, processor, answer_space):
    """Predict the answer to ``question`` about the image at ``image_path``.

    Args:
        image_path: path of the image file to open.
        question: question text.
        model: model called as ``model(**inputs)``; its output must expose
            ``logits``.
        processor: callable that encodes ``(image, question)`` into model inputs.
        answer_space: sequence mapping a class index to an answer string.

    Returns:
        The entry of ``answer_space`` at the index of the highest logit.
    """
    # Close the file handle once the pixels are converted; convert() returns a
    # new image holding the pixel data.
    with Image.open(image_path) as img:
        image = img.convert("RGB")
    inputs = processor(image, question, return_tensors="pt").to(model.device)

    # Predict in eval mode so dropout cannot perturb the result when the model
    # is still in training mode; the previous mode is restored afterwards.
    was_training = getattr(model, "training", False)
    if was_training:
        model.eval()
    try:
        with torch.no_grad():
            outputs = model(**inputs)
    finally:
        if was_training:
            model.train()

    pred_idx = outputs.logits.argmax(-1).item()
    return answer_space[pred_idx]


In [None]:
# Try the prediction helper on a single image / question pair
image_path = "/content/drive/MyDrive/FinalProject/dataset/images/image3.png"
question = "What is under the cabinet near the sink?"

answer = predict_vilt(
    image_path=image_path,
    question=question,
    model=model,
    processor=processor,
    answer_space=answer_space,
)
print("Predicted Answer:", answer)


In [None]:
# Run batch inference on the evaluation set, then evaluate performance.
# Both steps live in one cell, in this order, because the accuracy needs the
# `vilt_pred` column that the inference loop creates. Previously the accuracy
# cell came first and failed with a KeyError on a fresh kernel.

from sklearn.metrics import accuracy_score
from tqdm import tqdm

vilt_preds = []
for _, row in tqdm(test_df.iterrows(), total=len(test_df)):
    image_path = os.path.join(base_dir, "images", f"{row['image_id']}.png")
    question = row["question"]
    pred = predict_vilt(image_path, question, model, processor, answer_space)
    vilt_preds.append(pred)

test_df["vilt_pred"] = vilt_preds

acc = accuracy_score(test_df["answer"], test_df["vilt_pred"])
print(f"ViLT Accuracy on Test Set: {acc:.3f}")

# Save to CSV
# test_df.to_csv(os.path.join(base_dir, "vilt_predictions.csv"), index=False)


In [None]:
from sklearn.metrics import accuracy_score

def compute_metrics(pred):
    """Accuracy of the argmax predictions against the reference labels.

    Note: the name ``compute_metrics`` is already defined by earlier cells;
    this definition rebinds it.

    Args:
        pred: object with ``predictions`` (class scores) and ``label_ids``.

    Returns:
        ``{"accuracy": <score>}``.
    """
    reference_ids = pred.label_ids
    predicted_ids = pred.predictions.argmax(-1)
    return {"accuracy": accuracy_score(reference_ids, predicted_ids)}


In [None]:
# `plt` is used below but was not imported by any earlier cell, so this cell
# raised NameError on a fresh kernel.
import matplotlib.pyplot as plt

# Replace NaN with a string so missing answers show up as their own bar
df['answer'] = df['answer'].fillna('Missing')

# Recalculate the counts (value_counts sorts by frequency, highest first)
answer_counts = df['answer'].value_counts()

# Plot a bar chart of the top N most common answers (e.g., top 50)
top_n = 50
plt.figure(figsize=(12, 6))
answer_counts.head(top_n).plot(kind='bar')
plt.title(f'Top {top_n} Most Common Answers (including Missing)')
plt.xlabel('Answer')
plt.ylabel('Frequency')
plt.xticks(rotation=45, ha='right')
plt.tight_layout()
plt.show()


In [None]:
from transformers import ViltForQuestionAnswering

# Rebind `model` to a stock ViltForQuestionAnswering built from the MLM
# checkpoint, with one output class per entry in `answer_space`.
mlm_checkpoint = "dandelin/vilt-b32-mlm"
num_answer_classes = len(answer_space)
model = ViltForQuestionAnswering.from_pretrained(
    mlm_checkpoint, num_labels=num_answer_classes
)


In [None]:
from transformers import ViltProcessor
# Rebinds `processor` to the processor of the "dandelin/vilt-b32-mlm" checkpoint.
processor = ViltProcessor.from_pretrained("dandelin/vilt-b32-mlm")


In [None]:
from transformers import DefaultDataCollator
# Rebinds `data_collator`: from here on the name refers to a DefaultDataCollator
# instance instead of the custom data_collator() function defined earlier.
data_collator = DefaultDataCollator()


In [None]:
from transformers import TrainingArguments

# Training configuration for the stock ViLT model. Evaluation, checkpointing
# and logging happen once per epoch, and the checkpoint with the best accuracy
# is reloaded when training ends.
training_args = TrainingArguments(
    output_dir="./vilt_daquar_output",
    per_device_train_batch_size=8,
    per_device_eval_batch_size=8,
    num_train_epochs=5,
    # `eval_strategy` is the current name of this argument and matches the
    # earlier TrainingArguments cell; `evaluation_strategy` is the old name.
    eval_strategy="epoch",
    save_strategy="epoch",
    logging_strategy="epoch",
    learning_rate=5e-5,
    weight_decay=0.01,
    save_total_limit=1,
    load_best_model_at_end=True,
    metric_for_best_model="accuracy",
    report_to="none"
)


In [None]:
# Number of missing values in each column of `df`.
df.isnull().sum(axis = 0)


In [None]:
# `plt` is used below but was not imported by any earlier cell.
import matplotlib.pyplot as plt

# Toy DataFrame with multiple columns, used only to demonstrate the plot.
# It is bound to `sample_df` so the real question/answer frame `df` is not
# overwritten by these five dummy rows.
data = {
    'question': ['q1', 'q2', 'q3', 'q4', 'q5'],
    'answer': ['yes', None, 'no', 'maybe', None],
    'confidence': [0.9, 0.8, None, 0.7, 0.6]
}
sample_df = pd.DataFrame(data)

# Plot the number of missing values per column
sample_df.isna().sum().plot(kind='bar')
plt.title('Number of Missing Values per Column')
plt.xlabel('Column')
plt.ylabel('Number of Missing Values')
plt.show()
