In [1]:
!pip install timm==0.9.2 torch torchvision pandas scikit-learn matplotlib xgboost easyocr --quiet seaborn




[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m23.2.1[0m[39;49m -> [0m[32;49m25.3[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpython3.11 -m pip install --upgrade pip[0m


In [2]:
import os
import pandas as pd
import numpy as np
from PIL import Image

import torch
from torch.utils.data import Dataset, DataLoader, random_split
from torchvision import transforms
import timm
import torch.nn as nn
import torch.optim as optim

from sklearn.metrics import classification_report, confusion_matrix
import matplotlib.pyplot as plt
import seaborn as sns
import string
import xgboost as xgb

import easyocr


  from .autonotebook import tqdm as notebook_tqdm


In [None]:
import os
import pandas as pd

# Dataset locations. The defaults are the original author's local paths; override
# them without editing the notebook by exporting AADHAAR_IMAGE_DIR / AADHAAR_CSV_PATH.
BASE_PATH = os.environ.get(
    "AADHAAR_IMAGE_DIR",
    "/Users/manshusainishab/Desktop/dataset/train",
)
CSV_PATH = os.environ.get(
    "AADHAAR_CSV_PATH",
    "/Users/manshusainishab/Desktop/dataset/images_numeric_labels.csv",
)

IMAGE_DIR = BASE_PATH

# Load the label table
df = pd.read_csv(CSV_PATH)

# Fail early, with a readable message, if the columns that later cells index by
# name are absent from the CSV.
missing_cols = {"image_path", "label"} - set(df.columns)
if missing_cols:
    raise ValueError(
        f"{CSV_PATH} is missing required column(s): {sorted(missing_cols)}"
    )

# Keep only the file name (drop any directory part); images are resolved
# against IMAGE_DIR when they are opened.
df['image_path'] = df['image_path'].apply(os.path.basename)

print("Total samples:", len(df))
df.head()


In [None]:
# Preprocessing applied to every image: resize to 224x224, convert to a tensor,
# then normalise each of the three channels with mean 0.5 and std 0.5.
transform = transforms.Compose(
    [
        transforms.Resize(size=(224, 224)),
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.5, 0.5, 0.5], std=[0.5, 0.5, 0.5]),
    ]
)

class AadhaarDataset(Dataset):
    """Image dataset driven by a DataFrame with ``image_path`` and ``label`` columns.

    Each item is a ``(image, label, file_name)`` triple, where ``file_name`` is
    the value of the row's ``image_path`` column.

    Args:
        df: Table with one row per image. Its index is reset so that items are
            addressed by position 0..len-1.
        img_dir: Directory that every ``image_path`` is joined onto.
        transform: Optional callable applied to the RGB PIL image.
    """

    def __init__(self, df, img_dir, transform=None):
        # Drop the incoming index so integer positions and index labels coincide.
        self.df = df.reset_index(drop=True)
        self.img_dir = img_dir
        self.transform = transform

    def __len__(self):
        """Return the number of rows (images) in the table."""
        return len(self.df)

    def __getitem__(self, idx):
        """Load, convert and (optionally) transform the image at position ``idx``."""
        row = self.df.iloc[idx]
        path = os.path.join(self.img_dir, row['image_path'])
        # Close the file handle deterministically; convert() returns an
        # independent image, so it stays usable after the handle is closed.
        with Image.open(path) as raw_image:
            image = raw_image.convert("RGB")
        label = torch.tensor(row['label'], dtype=torch.long)
        # Explicit None check: a transform object that happens to be falsy
        # (e.g. an empty container) must still be applied.
        if self.transform is not None:
            image = self.transform(image)
        return image, label, row['image_path']


In [None]:
# Fixed seed so the 80/20 split is the same on every run. Later cells reuse
# train_indices / test_indices to split the text embeddings and to evaluate the
# full pipeline, so a split that changes between runs would mix train and test rows.
SPLIT_SEED = 42

dataset = AadhaarDataset(df, IMAGE_DIR, transform)
train_size = int(0.8 * len(dataset))
test_size = len(dataset) - train_size
train_dataset, test_dataset = random_split(
    dataset,
    [train_size, test_size],
    generator=torch.Generator().manual_seed(SPLIT_SEED),
)
# Positional row indices of each subset within `dataset`.
train_indices = train_dataset.indices
test_indices = test_dataset.indices
train_loader = DataLoader(train_dataset, batch_size=8, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=8)
print(len(train_dataset), len(test_dataset))

In [None]:
# Use the GPU when CUDA is available, otherwise fall back to the CPU.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")

# Pretrained ViT backbone whose classification head is replaced by a single-logit
# linear layer, giving one binary score per image.
model = timm.create_model('vit_base_patch16_224', pretrained=True)
num_ftrs = model.head.in_features
model.head = nn.Linear(in_features=num_ftrs, out_features=1)
model = model.to(device)

# The loss takes raw logits, so the model itself applies no sigmoid.
criterion = nn.BCEWithLogitsLoss()
# NOTE(review): lr=1e-3 updates every backbone weight and looks high for
# fine-tuning a pretrained transformer — confirm it trains stably.
optimizer = optim.Adam(params=model.parameters(), lr=1e-3)


In [None]:
def train_model(model, loader, criterion, optim, device):
    """Run one training epoch and return ``(mean_loss, accuracy)``.

    Args:
        model: Network called as ``model(imgs)``; its output must line up with
            the labels after they are reshaped to ``(batch, 1)``.
        loader: Iterable of ``(imgs, labels, _)`` batches; the third element is ignored.
        criterion: Loss called as ``criterion(outputs, labels)``.
        optim: Optimizer instance stepped once per batch. (The parameter name
            shadows the ``torch.optim`` module alias inside this function only.)
        device: Device the batch tensors are moved to.

    Returns:
        Tuple of the sample-weighted mean loss and the fraction of samples whose
        thresholded prediction equals the label. ``(0.0, 0.0)`` when the loader
        yields no samples, instead of raising ``ZeroDivisionError``.
    """
    model.train()
    sample_count, hit_count, loss_total = 0, 0, 0.0

    for imgs, labels, _ in loader:
        imgs = imgs.to(device)
        labels = labels.float().unsqueeze(1).to(device)

        optim.zero_grad()
        outputs = model(imgs)
        loss = criterion(outputs, labels)
        loss.backward()
        optim.step()

        batch_size = len(labels)
        # Probability >= 0.5 counts as the positive class.
        preds = (torch.sigmoid(outputs) >= 0.5).float()
        hit_count += (preds == labels).sum().item()
        sample_count += batch_size
        # Weight by batch size so a short final batch does not skew the mean.
        loss_total += loss.item() * batch_size

    if sample_count == 0:
        return 0.0, 0.0
    return loss_total / sample_count, hit_count / sample_count

def eval_model(model, loader, criterion, device):
    """Evaluate ``model`` on ``loader`` without updating weights.

    Args:
        model: Network called as ``model(imgs)``; its output must line up with
            the labels after they are reshaped to ``(batch, 1)``.
        loader: Iterable of ``(imgs, labels, _)`` batches; the third element is ignored.
        criterion: Loss called as ``criterion(outputs, labels)``.
        device: Device the batch tensors are moved to.

    Returns:
        ``(mean_loss, accuracy, preds_all, labels_all)`` where the two lists hold
        the flattened 0/1 predictions and labels in iteration order.
        ``(0.0, 0.0, [], [])`` when the loader yields no samples, instead of
        raising ``ZeroDivisionError``.
    """
    model.eval()
    sample_count, hit_count, loss_total = 0, 0, 0.0
    preds_all, labels_all = [], []

    with torch.no_grad():
        for imgs, labels, _ in loader:
            imgs = imgs.to(device)
            labels = labels.float().unsqueeze(1).to(device)

            outputs = model(imgs)
            batch_size = len(labels)
            # Weight by batch size so a short final batch does not skew the mean.
            loss_total += criterion(outputs, labels).item() * batch_size

            # Probability >= 0.5 counts as the positive class.
            preds = (torch.sigmoid(outputs) >= 0.5).float()
            hit_count += (preds == labels).sum().item()
            sample_count += batch_size

            preds_all.extend(preds.cpu().numpy().flatten())
            labels_all.extend(labels.cpu().numpy().flatten())

    if sample_count == 0:
        return 0.0, 0.0, preds_all, labels_all
    return loss_total / sample_count, hit_count / sample_count, preds_all, labels_all


In [None]:
# Train for 10 epochs, evaluating on test_loader after each one.
# The losses (tl, vl) are computed but only the two accuracies are printed.
for epoch in range(10):
    tl, ta = train_model(model, train_loader, criterion, optimizer, device)
    vl, va, _, _ = eval_model(model, test_loader, criterion, device)
    print(f"Epoch {epoch+1} | Train Acc: {ta:.4f} | Test Acc: {va:.4f}")


In [None]:
# Final Stage 1 (ViT only) evaluation on the held-out loader.
test_loss, test_acc, preds, labels = eval_model(model, test_loader, criterion, device)

print("\nConfusion Matrix:")
cm = confusion_matrix(labels, preds)
print(cm)

# Label the axes and title the figure so it reads on its own, matching the
# conventions of the final-pipeline confusion-matrix plot.
plt.figure(figsize=(6, 5))
sns.heatmap(cm, annot=True, fmt="d", cmap="Blues")
plt.xlabel("Predicted Label")
plt.ylabel("True Label")
plt.title("Stage 1 (ViT) Confusion Matrix")
plt.show()


In [None]:
# Progress marker separating the ViT cells above from the OCR/text cells that follow.
print("Stage 2 check")

In [None]:
!pip install sentence-transformers --quiet
import os
os.environ["TOKENIZERS_PARALLELISM"] = "false"


import ssl
ssl._create_default_https_context = ssl._create_unverified_context



In [None]:
import pytesseract
from PIL import Image

# Tesseract language codes requested for every OCR call, joined with "+".
TESS_LANGS = "eng+hin+tam+tel+kan+mal+ben+mar+pan+guj"

def extract_text(path):
    """Run Tesseract OCR on the image at ``path`` and return the stripped text.

    Args:
        path: Location of the image file to read.

    Returns:
        The recognised text with leading and trailing whitespace removed
        (an empty string when nothing is recognised).
    """
    # Open inside a context manager so the file handle is released as soon as
    # OCR finishes, rather than whenever the image object is garbage-collected.
    with Image.open(path) as img:
        text = pytesseract.image_to_string(img, lang=TESS_LANGS)
    return text.strip()


In [None]:
import re
import unicodedata

# Script labels that count as "Indian" for the language filter.
INDIAN_SCRIPTS = {
    "devanagari",  # hi, mr, ne, sa, bho, mai
    "bengali",     # bn, as, mni
    "tamil",       # ta
    "telugu",      # te
    "kannada",     # kn
    "malayalam",   # ml
    "oriya",       # or (Odia)
    "gurmukhi",    # pa (Punjabi)
    "gujarati",    # gu
}

LATIN = "latin"

# (substring looked for in the Unicode character name, script label).
# Checked in order; the first match wins.
_SCRIPT_NAME_MARKERS = (
    # Indian scripts
    ("DEVANAGARI", "devanagari"),
    ("BENGALI", "bengali"),
    ("TAMIL", "tamil"),
    ("TELUGU", "telugu"),
    ("KANNADA", "kannada"),
    ("MALAYALAM", "malayalam"),
    ("ORIYA", "oriya"),
    ("ODIA", "oriya"),
    ("GURMUKHI", "gurmukhi"),
    ("GUJARATI", "gujarati"),
    # English
    ("LATIN", LATIN),
    # Arabic family (Urdu)
    ("ARABIC", "arabic"),
)

def detect_scripts(text):
    """Return the set of script labels found in ``text``.

    Whitespace and digit characters are skipped. Every other character is
    classified by its Unicode character name: the first marker from
    ``_SCRIPT_NAME_MARKERS`` contained in the name decides the label, and
    characters with no matching marker (punctuation, symbols, unlisted scripts)
    or with no Unicode name at all are labelled ``"other"``.

    Gujarati is reported as its own ``"gujarati"`` label rather than ``"other"``,
    so Gujarati text cannot pass unnoticed through a filter that tolerates
    ``"other"``.

    Args:
        text: String to inspect.

    Returns:
        A set of labels; empty when ``text`` has no classifiable characters.
    """
    scripts = set()

    for ch in text:
        if ch.isspace() or ch.isdigit():
            continue

        try:
            char_name = unicodedata.name(ch)
        except ValueError:
            # unicodedata.name raises ValueError for characters without a name
            # (e.g. control characters).
            scripts.add("other")
            continue

        for marker, label in _SCRIPT_NAME_MARKERS:
            if marker in char_name:
                scripts.add(label)
                break
        else:
            scripts.add("other")

    return scripts


In [None]:
def allowed_mix(scripts):
    """Decide whether a set of detected script labels is acceptable.

    Only one combination is accepted: Latin together with Devanagari as the sole
    Indian script. The label "other" is tolerated alongside it; any label that is
    neither an Indian script, "latin" nor "other" causes rejection.

    Args:
        scripts: Set of script labels.

    Returns:
        True for the accepted combination, False otherwise.
    """
    indian_found = scripts.intersection(INDIAN_SCRIPTS)

    # FOREIGN script detected -> reject
    leftovers = scripts - indian_found - {"latin", "other"}
    if leftovers:
        return False

    # English + exactly one Indian script, which must be Devanagari -> ok;
    # anything else -> reject
    return "latin" in scripts and indian_found == {"devanagari"}


In [None]:
from sentence_transformers import SentenceTransformer
# Sentence-embedding model, loaded once and shared by every text_embedding call.
embedder = SentenceTransformer("all-mpnet-base-v2")

def text_embedding(text):
    """Encode ``text`` with the shared ``embedder`` and return the resulting vector."""
    return embedder.encode(text)


In [None]:
import warnings

# Ignore specific user warnings
warnings.filterwarnings("ignore", message="'pin_memory' argument is set as true but not supported on MPS now")

X_vectors = []
y_labels = []

print("Extracting OCR + embeddings for all images...")

# One OCR pass and one embedding per image, in the row order of df, so row i of
# the final matrix corresponds to positional row i of df.
for file_name, label in zip(df["image_path"], df["label"]):
    ocr_text = extract_text(os.path.join(IMAGE_DIR, file_name))
    # Vector length is whatever the embedder emits; see the printed shape below.
    X_vectors.append(text_embedding(ocr_text))
    y_labels.append(label)

# Stack the per-image vectors into one (n_images, dim) matrix.
X_vectors = np.vstack(X_vectors)
y_labels = np.array(y_labels)

print("Embedding matrix shape:", X_vectors.shape)



In [None]:
# Boolean row selectors built from the positional indices of the image split, so
# the text features use the same train/test partition as the image model.
train_mask = np.zeros(len(df), dtype=bool)
train_mask[train_indices] = True
test_mask = np.zeros(len(df), dtype=bool)
test_mask[test_indices] = True

# Mask indexing returns rows in ascending row order, not in train_indices order.
X_train, y_train = X_vectors[train_mask], y_labels[train_mask]
X_test, y_test = X_vectors[test_mask], y_labels[test_mask]

print("Train:", X_train.shape, "Test:", X_test.shape)


In [None]:

# Stage 2 classifier: gradient-boosted trees with a binary logistic objective,
# trained on the text-embedding features.
xgb_model = xgb.XGBClassifier(
    max_depth=6,
    n_estimators=300,
    learning_rate=0.05,
    subsample=0.9,
    colsample_bytree=0.9,
    objective="binary:logistic"
)

# Fit on the training rows only; X_test / y_test are not used in this cell.
xgb_model.fit(X_train, y_train)

def predict_pipeline(path, vit_threshold=0.65):
    """Classify one image with the two-stage pipeline and explain the decision.

    Stage 1 scores the image with the ViT model and rejects it when the sigmoid
    score is below ``vit_threshold``. Stage 2 runs OCR, rejects text whose script
    mix is not allowed, and otherwise classifies the text embedding with XGBoost.

    Args:
        path: Location of the image file.
        vit_threshold: Minimum Stage 1 probability needed to continue to
            Stage 2. Defaults to 0.65, the previously hard-coded value.

    Returns:
        A dict that always contains the keys ``final`` (0 or 1), ``reason``,
        ``stage1_vit``, ``stage2_xgb``, ``xgb_prob``, ``ocr_text`` and
        ``script_detected``; entries for stages that did not run are ``None``.
    """
    # ---------- Stage 1: ViT ----------
    # Close the file handle promptly; convert() yields an independent image.
    with Image.open(path) as raw_img:
        img = raw_img.convert("RGB")
    img_t = transform(img).unsqueeze(0).to(device)

    # Force inference mode explicitly so the result does not depend on whether a
    # training or an evaluation cell happened to run last.
    model.eval()
    with torch.no_grad():
        out = torch.sigmoid(model(img_t)).item()
    vit_pred = 1 if out >= vit_threshold else 0

    # If ViT rejects -> stop here
    if vit_pred == 0:
        return {
            "final": 0,
            "reason": "ViT rejected (Stage 1)",
            "stage1_vit": vit_pred,
            "stage2_xgb": None,
            "xgb_prob": None,
            "ocr_text": None,
            "script_detected": None,
        }

    # ---------- Stage 2: OCR + Language Filter + Embedding + XGB ----------
    text = extract_text(path)
    scripts = detect_scripts(text)

    if not allowed_mix(scripts):
        return {
            "final": 0,
            "reason": f"Rejected: unsupported script combination {scripts}",
            "stage1_vit": vit_pred,
            "stage2_xgb": None,
            "xgb_prob": None,
            "ocr_text": text,
            "script_detected": list(scripts),
        }

    # Script mix accepted: embed the OCR text (as a single-row matrix) and classify it.
    emb = text_embedding(text).reshape(1, -1)

    xgb_pred = int(xgb_model.predict(emb)[0])
    prob = float(xgb_model.predict_proba(emb)[0, 1])

    return {
        "final": xgb_pred,
        "reason": (
            "XGBoost accepted (Stage 2)" if xgb_pred == 1
            else "XGBoost rejected (Stage 2)"
        ),
        "stage1_vit": vit_pred,
        "stage2_xgb": xgb_pred,
        "xgb_prob": prob,
        "ocr_text": text,
        "script_detected": list(scripts),
    }



In [None]:
# Spot check: run the full pipeline on a single image from the training folder
# and display the result dict as the cell output.
sample = "/Users/manshusainishab/Desktop/dataset/train/adhar_c66.png"
result = predict_pipeline(sample)
result


In [None]:
# Spot check on an image stored outside the dataset folder; note that `sample`
# and `result` from the previous cell are overwritten here.
sample = "/Users/manshusainishab/Downloads/image_128.png"
result = predict_pipeline(sample)
result


In [None]:
final_preds = []
final_labels = []

print("Evaluating full pipeline (Stage1 + Stage2)...")

# test_indices are positional row numbers, hence df.iloc.
for idx in test_indices:
    row = df.iloc[idx]
    path = os.path.join(IMAGE_DIR, row["image_path"])
    true_label = row["label"]

    result = predict_pipeline(path)
    final_pred = result["final"]

    final_labels.append(true_label)
    final_preds.append(final_pred)

# Convert to numpy arrays
final_labels = np.array(final_labels)
final_preds = np.array(final_preds)

# Print classification report
print("\n📘 Classification Report (Final Output)")
print(classification_report(final_labels, final_preds))

# Confusion matrix. labels=[0, 1] pins the matrix to 2x2 even when a class is
# absent from both the labels and the predictions, so it always matches the
# fixed tick labels used in the plot below.
cm = confusion_matrix(final_labels, final_preds, labels=[0, 1])
print("\n📊 Confusion Matrix (Final):")
print(cm)

# Plot confusion matrix
plt.figure(figsize=(6, 5))
sns.heatmap(cm, annot=True, fmt='d', cmap='Blues',
            xticklabels=['Pred 0 (Reject)', 'Pred 1 (Aadhaar)'],
            yticklabels=['True 0 (Non-Aadhaar)', 'True 1 (Aadhaar)'])
plt.xlabel("Predicted Label")
plt.ylabel("True Label")
plt.title("Final Confusion Matrix (After Stage1 + Stage2)")
plt.show()
