In [1]:
import pandas as pd
import numpy as np
import os
import glob

In [2]:
# Mount Drive
from google.colab import drive
drive.mount("/content/drive")

# Directory
dir = "/content/drive/MyDrive/Personal/Apziva/MonReader"

# Setting random state for consistency
seed = 123
np.random.seed(seed)

# Confirm GPU


gpu_info = !nvidia-smi
gpu_info = '\n'.join(gpu_info)
if gpu_info.find('failed') >= 0:
  print('Not connected to a GPU')
else:
  print(gpu_info)

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).
Thu Dec  4 02:00:08 2025       
+-----------------------------------------------------------------------------------------+
| NVIDIA-SMI 550.54.15              Driver Version: 550.54.15      CUDA Version: 12.4     |
|-----------------------------------------+------------------------+----------------------+
| GPU  Name                 Persistence-M | Bus-Id          Disp.A | Volatile Uncorr. ECC |
| Fan  Temp   Perf          Pwr:Usage/Cap |           Memory-Usage | GPU-Util  Compute M. |
|                                         |                        |               MIG M. |
|   0  NVIDIA L4                      Off |   00000000:00:03.0 Off |                    0 |
| N/A   43C    P8             16W /   72W |       0MiB /  23034MiB |      0%      Default |
|                                         |                        |                  N/A |
+----------

In [3]:
# Class folders for each split, all rooted at <dir>/images/<split>/<class>.
TRAIN_NOT, TRAIN_FLIP, TEST_NOT, TEST_FLIP = (
    f"{dir}/images/training/notflip",
    f"{dir}/images/training/flip",
    f"{dir}/images/testing/notflip",
    f"{dir}/images/testing/flip",
)

def load_paths(pos_dir, neg_dir):
    """Collect file paths and binary labels from two class directories.

    Args:
        pos_dir: Directory holding the positive-class files (label 1).
        neg_dir: Directory holding the negative-class files (label 0).

    Returns:
        (paths, labels): negative-class paths first, then positive-class
        paths, each group sorted by path. ``labels[i]`` is the label of
        ``paths[i]``.
    """
    def _list_files(directory):
        # glob yields entries in arbitrary, filesystem-dependent order and
        # also matches sub-directories. Sort and keep regular files only so
        # the dataset is identical from run to run.
        matches = glob.glob(os.path.join(directory, "*"))
        return sorted(p for p in matches if os.path.isfile(p))

    neg = _list_files(neg_dir)
    pos = _list_files(pos_dir)

    paths = neg + pos
    labels = [0] * len(neg) + [1] * len(pos)
    return paths, labels

# Build (paths, labels) per split; "flip" is the positive class (label 1).
train_files, train_labels = load_paths(pos_dir=TRAIN_FLIP, neg_dir=TRAIN_NOT)
test_files, test_labels = load_paths(pos_dir=TEST_FLIP, neg_dir=TEST_NOT)


In [None]:
from datasets import Dataset
from PIL import Image
from transformers import AutoImageProcessor, AutoModelForImageClassification, TrainingArguments, Trainer
import os
# Turn off Weights & Biases logging for the Trainer.
os.environ["WANDB_DISABLED"] = "true"

# Pretrained ViT checkpoint and its matching image processor.
model_name = "google/vit-base-patch16-224"
processor = AutoImageProcessor.from_pretrained(model_name, use_fast=True)

# Class index <-> class name; the reverse map is derived from the forward one.
id2label = {0: "notflip", 1: "flip"}
label2id = {name: idx for idx, name in id2label.items()}

# Column-oriented dicts (image path + integer label) for Hugging Face Datasets.
train_dict = dict(image=train_files, label=train_labels)
test_dict = dict(image=test_files, label=test_labels)

train_ds, test_ds = (Dataset.from_dict(columns) for columns in (train_dict, test_dict))

def preprocess_images(examples):
    """Turn a batch of image paths into inputs for the image classifier.

    Args:
        examples: Batch mapping with an "image" list of file paths and a
            "label" list of integer class ids.

    Returns:
        Dict with "pixel_values" (the processor output for the batch) and
        "labels" (the input labels, passed through unchanged).
    """
    images = []
    for path in examples["image"]:
        # Image.open keeps the underlying file open; convert() returns a new,
        # fully decoded image, so the handle can be released immediately
        # instead of waiting for garbage collection.
        with Image.open(path) as img:
            images.append(img.convert("RGB"))

    inputs = processor(images=images, return_tensors="pt")
    # Trainer expects 'pixel_values' for image inputs and 'labels' for targets.
    return {"pixel_values": inputs["pixel_values"], "labels": examples["label"]}

# Run the preprocessing over both splits in batches. The path column "image"
# is dropped, so the resulting datasets carry the new columns instead.
_map_options = dict(batched=True, remove_columns=["image"])
train_ds = train_ds.map(preprocess_images, **_map_options)
test_ds = test_ds.map(preprocess_images, **_map_options)


Map:   0%|          | 0/2392 [00:00<?, ? examples/s]

In [None]:
# Seed Python, NumPy and torch *before* the model is built. The 2-class
# classifier head is freshly (randomly) initialised inside from_pretrained,
# so seeding only inside the Trainer would leave that initialisation
# different on every run.
from transformers import set_seed
set_seed(seed)

model = AutoModelForImageClassification.from_pretrained(
    model_name,
    num_labels=2,
    id2label=id2label,
    label2id=label2id,
    # The checkpoint ships a 1000-class head; allow it to be replaced by a
    # 2-class head instead of raising on the shape mismatch.
    ignore_mismatched_sizes=True,
)

# `eval_strategy` is the argument name in up-to-date transformers releases
# (older releases call it `evaluation_strategy`).
training_args = TrainingArguments(
    output_dir=f"{dir}/vit-flip-checkpoints",
    per_device_train_batch_size=16,
    per_device_eval_batch_size=16,
    num_train_epochs=3,
    eval_strategy="epoch",
    save_strategy="epoch",
    logging_steps=50,
    # Use the notebook-wide seed for shuffling/dropout instead of the default.
    seed=seed,
    # Explicitly disable logging integrations; the WANDB_DISABLED environment
    # variable is deprecated in favour of this argument.
    report_to="none",
)

trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=train_ds,
    eval_dataset=test_ds,
)

trainer.train()

model.safetensors:   0%|          | 0.00/346M [00:00<?, ?B/s]

Some weights of ViTForImageClassification were not initialized from the model checkpoint at google/vit-base-patch16-224 and are newly initialized because the shapes did not match:
- classifier.bias: found shape torch.Size([1000]) in the checkpoint and torch.Size([2]) in the model instantiated
- classifier.weight: found shape torch.Size([1000, 768]) in the checkpoint and torch.Size([2, 768]) in the model instantiated
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.
Using the `WANDB_DISABLED` environment variable is deprecated and will be removed in v5. Use the --report_to flag to control the integrations used for logging result (for instance --report_to none).


Epoch,Training Loss,Validation Loss
1,0.0233,0.045039
2,0.0002,0.043556
3,0.0033,0.038868


TrainOutput(global_step=450, training_loss=0.04577190714370873, metrics={'train_runtime': 852.0693, 'train_samples_per_second': 8.422, 'train_steps_per_second': 0.528, 'total_flos': 5.560825174743122e+17, 'train_loss': 0.04577190714370873, 'epoch': 3.0})

In [None]:
from sklearn.metrics import accuracy_score, f1_score

# Score the test split with the fine-tuned model.
pred_output = trainer.predict(test_ds)
logits, labels = pred_output.predictions, pred_output.label_ids

# Predicted class = index of the largest logit.
preds = np.argmax(logits, axis=-1)

# Report overall accuracy and the support-weighted F1 score.
for metric_name, metric_value in (
    ("Accuracy:", accuracy_score(labels, preds)),
    ("F1:", f1_score(labels, preds, average="weighted")),
):
    print(metric_name, metric_value)


Accuracy: 0.9916247906197655
F1: 0.9916222022288634


Two datasets:
1. Test images, both flipped and not flipped (n = 100)
2. Images the ViT predicted as not flipped (n = 100)

In [None]:
# Score the test split with the existing trainer.
pred_output = trainer.predict(test_ds)
logits, label_ids = pred_output.predictions, pred_output.label_ids  # label_ids: true labels

# Two logits per sample -> the predicted class is the index of the larger one.
# (A single-logit / sigmoid head would use: (logits.squeeze(-1) > 0).astype(int))
preds = np.argmax(logits, axis=-1)

# Positions of the samples predicted as class 0, i.e. "not flipped".
non_flipped_idx = np.nonzero(preds == 0)[0]

# Subset of the test split containing only those samples.
non_flipped_ds = test_ds.select(non_flipped_idx)

print(f"Total images in test_ds: {len(test_ds)}")
print(f"Images predicted as not flipped: {len(non_flipped_ds)}")


In [None]:
from transformers import pipeline

# TrOCR (printed text) wrapped in an image-to-text pipeline on device 0.
ocr_pipe = pipeline(task="image-to-text", model="microsoft/trocr-base-printed", device=0)


In [None]:
import time
from math import ceil

def run_ocr_and_time(dataset, batch_size=8, ocr_fn=None, image_column="image"):
    """Run OCR over a dataset in batches and measure the elapsed wall-clock time.

    Args:
        dataset: Sized object whose slice ``dataset[i:j]`` yields a mapping
            that contains ``image_column`` -> list of images (anything the
            OCR callable accepts).
        batch_size: Number of images handed to the OCR callable per call;
            must be at least 1.
        ocr_fn: Callable taking a list of images and returning one result per
            image. Defaults to the notebook-level ``ocr_pipe``.
        image_column: Name of the column that holds the images.

    Returns:
        (total_time_seconds, texts_list), with one text per dataset row in
        dataset order.

    Raises:
        ValueError: If ``batch_size`` is smaller than 1.
    """
    if batch_size < 1:
        raise ValueError(f"batch_size must be >= 1, got {batch_size}")
    if ocr_fn is None:
        ocr_fn = ocr_pipe  # resolved at call time so the default stays the notebook's pipeline

    def _first_text(result):
        # For list input the image-to-text pipeline returns one *list* of
        # candidate dicts per image ([[{'generated_text': ...}], ...]); other
        # callables may return the dict directly. Accept both shapes.
        if isinstance(result, (list, tuple)):
            result = result[0]
        return result["generated_text"]

    n = len(dataset)
    all_texts = []
    start = time.perf_counter()

    # Loop in batches; the slice end is clamped by the dataset itself.
    for i in range(0, n, batch_size):
        images = dataset[i:i + batch_size][image_column]
        all_texts.extend(_first_text(result) for result in ocr_fn(images))

    elapsed = time.perf_counter() - start
    return elapsed, all_texts

# The ViT preprocessing removed the "image" column from test_ds (only
# pixel_values/labels remain), so OCR gets its own datasets built from the
# original file paths. test_ds was created from test_files in the same order,
# so the positional indices in non_flipped_idx apply unchanged.
ocr_full_ds = Dataset.from_dict({"image": test_files})
ocr_nonflip_ds = ocr_full_ds.select(non_flipped_idx)

# NOTE(review): the first timed run may also absorb one-time start-up cost of
# the OCR model; consider a short warm-up call before timing — TODO confirm.

# 3a. Time on full test set
time_full, texts_full = run_ocr_and_time(ocr_full_ds, batch_size=8)
print(f"OCR time on FULL dataset ({len(ocr_full_ds)} images): {time_full:.2f} seconds")

# 3b. Time on predicted non-flipped subset
time_nonflip, texts_nonflip = run_ocr_and_time(ocr_nonflip_ds, batch_size=8)
print(f"OCR time on NON-FLIPPED subset ({len(ocr_nonflip_ds)} images): {time_nonflip:.2f} seconds")

# Optional: relative speedup (guard against a zero denominator)
speedup = time_full / time_nonflip if time_nonflip > 0 else float("inf")
print(f"Speedup from filtering (full / non-flipped): {speedup:.2f}x")
