In [None]:
!pip install torchvision
!pip install --upgrade transformers
!pip install mmocr

Collecting nvidia-cuda-nvrtc-cu12==12.4.127 (from torch==2.6.0->torchvision)
  Downloading nvidia_cuda_nvrtc_cu12-12.4.127-py3-none-manylinux2014_x86_64.whl.metadata (1.5 kB)
Collecting nvidia-cuda-runtime-cu12==12.4.127 (from torch==2.6.0->torchvision)
  Downloading nvidia_cuda_runtime_cu12-12.4.127-py3-none-manylinux2014_x86_64.whl.metadata (1.5 kB)
Collecting nvidia-cuda-cupti-cu12==12.4.127 (from torch==2.6.0->torchvision)
  Downloading nvidia_cuda_cupti_cu12-12.4.127-py3-none-manylinux2014_x86_64.whl.metadata (1.6 kB)
Collecting nvidia-cudnn-cu12==9.1.0.70 (from torch==2.6.0->torchvision)
  Downloading nvidia_cudnn_cu12-9.1.0.70-py3-none-manylinux2014_x86_64.whl.metadata (1.6 kB)
Collecting nvidia-cublas-cu12==12.4.5.8 (from torch==2.6.0->torchvision)
  Downloading nvidia_cublas_cu12-12.4.5.8-py3-none-manylinux2014_x86_64.whl.metadata (1.5 kB)
Collecting nvidia-cufft-cu12==11.2.1.3 (from torch==2.6.0->torchvision)
  Downloading nvidia_cufft_cu12-11.2.1.3-py3-none-manylinux2014_x86

In [None]:
import os
from google.colab import drive
# Mount Google Drive so the archives and annotation files below are readable.
drive.mount('/content/drive')

# Zipped image archives on Drive; zipPath is the one the training dataset reads from.
zipPath = '/content/drive/MyDrive/train_val_images.zip'
zipPathTest = '/content/drive/MyDrive/test_images.zip'
# TextOCR annotation JSON files, one per split.
annsPathTrain = '/content/drive/MyDrive/TextOCR_0.1_train.json'
annsPathVal = '/content/drive/MyDrive/TextOCR_0.1_val.json'
annsPathTest = '/content/drive/MyDrive/TextOCR_0.1_test.json'

In [None]:
import zipfile
from PIL import Image
from IPython.display import display
import io

# Preview a single training image read straight from the archive (nothing is
# extracted to disk). The context managers close the archive and the member
# handle once the image bytes have been decoded.
with zipfile.ZipFile(zipPath, 'r') as data:
  images = [f for f in data.namelist() if f.startswith("train_images/") and f.endswith('.jpg')]
  if not images:
    raise FileNotFoundError(f"no train_images/*.jpg entries found in {zipPath}")
  # Clamp the preview index so archives with fewer than 1001 images still work.
  previewIdx = min(1000, len(images) - 1)
  with data.open(images[previewIdx]) as file:
    img = Image.open(io.BytesIO(file.read())).convert("RGB")
display(img)

In [None]:
from transformers import VisionEncoderDecoderModel, DonutProcessor
import torch
from PIL import Image

# Load the pretrained Donut checkpoint together with its matching processor.
model = VisionEncoderDecoderModel.from_pretrained("naver-clova-ix/donut-base")
processor = DonutProcessor.from_pretrained("naver-clova-ix/donut-base")

# Use the GPU when one is attached; fall back to CPU instead of failing on .to("cuda").
device = "cuda" if torch.cuda.is_available() else "cpu"
# The trailing semicolon keeps the very long module repr out of the cell output.
model.eval().to(device);

In [None]:
from PIL import ImageOps

def resizeMaintainRatio(image, size=(1920, 2560)):
  """Scale `image` to fit inside `size` without distortion, then pad it to exactly `size`.

  Args:
    image: PIL image to resize. The padding colour is an RGB triple, so an
      RGB-mode image is assumed.
    size: Target (width, height) of the returned image.

  Returns:
    A new image of exactly `size` with the scaled content centred on a white
    background.
  """
  w, h = image.size
  targetW, targetH = size
  scale = min(targetW / w, targetH / h)
  # int() truncation can produce 0 for extremely thin inputs, and resize()
  # rejects zero-sized targets, so keep at least one pixel per side.
  newW = max(1, int(w * scale))
  newH = max(1, int(h * scale))
  newImg = image.resize((newW, newH))

  deltaW = targetW - newW
  deltaH = targetH - newH
  # Border is (left, top, right, bottom); any odd leftover pixel goes to right/bottom.
  padding = (deltaW // 2, deltaH // 2, deltaW - (deltaW // 2), deltaH - (deltaH // 2))
  return ImageOps.expand(newImg, padding, fill = (255, 255, 255))

In [None]:
import json
from collections import defaultdict
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms
import cv2
import numpy as np
import random

class ZippedDataset(Dataset):
  """Word-crop dataset read directly from a zipped image archive and an annotation JSON.

  Every entry of `self.samples` is a `(zipImgPath, bbox, text)` tuple where
  `bbox` is `[x, y, w, h]`. Crops smaller than 20x20, crops judged too blurry
  and annotations whose text is empty or "." are dropped at construction time.

  NOTE(review): the open ZipFile handle in `self.archive` is reused by every
  access, including from DataLoader workers - confirm this is safe before
  raising num_workers above 1.
  """

  def __init__(self, zipPath, annsPath, processor, isTest, maxLength = 50, maxSamples = None):
    """Index the usable annotations.

    Args:
      zipPath: Path of the zip archive holding the images.
      annsPath: Path of the annotation JSON (must contain "imgs" and "anns").
      processor: Processor providing image preprocessing and a tokenizer.
      isTest: Selects which folder prefix in the annotation file names is
        rewritten ("test/" when True, "train/" otherwise).
      maxLength: Token length the labels are padded/truncated to.
      maxSamples: Stop indexing once this many samples were kept; None keeps all.
    """
    self.zipPath = zipPath
    self.annsPath = annsPath
    self.processor = processor
    self.maxLength = maxLength
    self.archive = zipfile.ZipFile(self.zipPath, 'r')

    with open(self.annsPath, 'r') as f:
      annotations = json.load(f)
    self.imgs = annotations["imgs"]
    self.anns = annotations["anns"]

    self.samples = []
    minBox = (20, 20)
    img = None
    curImgID = None
    for ann in self.anns.values():
      # ">=" (checked up front) also honours maxSamples = 0.
      if maxSamples is not None and len(self.samples) >= maxSamples:
        break
      text = ann["utf8_string"].strip()
      bbox = ann["bbox"]
      x, y, w, h = bbox
      # Cheap filters first, so the image is only decoded for real candidates.
      if w < minBox[0] or h < minBox[1] or text == "" or text == ".":
        continue
      imgID = ann["image_id"]
      zipImgPath = self.imgs[imgID]["file_name"]
      zipImgPath = zipImgPath.replace("test/", "test_images/") if isTest else zipImgPath.replace("train/", "train_images/")
      # Consecutive annotations of the same image reuse the decoded image.
      if img is None or imgID != curImgID:
        img = self._readImage(zipImgPath)
        curImgID = imgID
      if self.tooBlurry(img.crop((x, y, x + w, y + h))):
        continue
      self.samples.append((zipImgPath, bbox, text))

  def __len__(self):
    """Number of indexed samples."""
    return len(self.samples)

  def __getitem__(self, idx):
    """Return the model inputs for sample `idx`.

    Returns:
      Dict with "pixel_values" (processor output for the padded crop) and
      "labels" (token ids with padding positions replaced by -100, the ignore
      index used by PyTorch's cross-entropy loss).
    """
    cropped, label = self._loadCrop(idx)
    pixelValues = self.processor(cropped, return_tensors = "pt").pixel_values[0]
    labels = self.processor.tokenizer(label, max_length = self.maxLength, padding = "max_length", truncation = True, return_tensors = "pt").input_ids.squeeze(0)
    labels[labels == self.processor.tokenizer.pad_token_id] = -100
    return {"pixel_values": pixelValues, "labels": labels}

  def sendItem(self, idx):
    """Return `(croppedImage, label)` for sample `idx` without any tokenization."""
    return self._loadCrop(idx)

  def tooBlurry(self, img, threshold = 100):
    """Return True when the variance of the crop's Laplacian is below `threshold`."""
    img = np.array(img)
    gray = cv2.cvtColor(img, cv2.COLOR_RGB2GRAY)
    laplacian = cv2.Laplacian(gray, cv2.CV_64F).var()
    return laplacian < threshold

  def printLabel(self, idx):
    """Print the text label of sample `idx`."""
    # Samples are (zipImgPath, bbox, text) triples; only the text is needed here.
    _, _, label = self.samples[idx]
    print(label)

  def _readImage(self, zipImgPath):
    """Decode one archive member into an RGB image."""
    with self.archive.open(zipImgPath) as f:
      return Image.open(io.BytesIO(f.read())).convert("RGB")

  def _loadCrop(self, idx):
    """Load sample `idx`, crop its bounding box and pad the crop to 512x512."""
    zipImgPath, bbox, label = self.samples[idx]
    img = self._readImage(zipImgPath)
    x, y, w, h = bbox
    cropped = img.crop((x, y, x + w, y + h))
    cropped = resizeMaintainRatio(cropped, (512, 512))
    return cropped, label

In [None]:
# Index up to 40000 usable training crops.
datasetTrain = ZippedDataset(
  zipPath,
  annsPathTrain,
  processor,
  False,
  maxSamples = 40000,
)
# Sequential (unshuffled) loading: later cells use an embedding's position as
# the dataset index of the sample it came from.
dataloader = DataLoader(
  datasetTrain,
  batch_size = 4,
  shuffle = False,
  num_workers = 1,
)

In [None]:
import gc
import torch

# Collect unreachable Python objects first so the CUDA tensors they hold are
# released, then return the cached blocks that are no longer in use.
gc.collect()
torch.cuda.empty_cache()
torch.cuda.ipc_collect()

In [None]:
from torch.cuda.amp import autocast

# One pooled encoder embedding per sample, collected batch by batch on the CPU.
embeddings = []
model.eval()
logEvery = 500  # report progress sparsely instead of flooding the cell output

# torch.autocast is the supported replacement for the deprecated
# torch.cuda.amp.autocast; the device type follows wherever the model lives.
with torch.autocast(device_type = model.device.type, dtype = torch.float16), torch.no_grad():
  for count, batch in enumerate(dataloader):
    if count % logEvery == 0:
      print("processing batch", count)
    # Only the images are needed by the encoder; labels stay on the CPU.
    pixelValues = batch["pixel_values"].to(model.device)
    encoderOutputs = model.encoder(pixelValues)
    # Average over dim 1 in float32 so the stored features are not rounded to
    # half precision before clustering (a no-op if the output is already float32).
    pooled = encoderOutputs.last_hidden_state.float().mean(dim = 1)
    embeddings.append(pooled.cpu())

  with autocast(dtype = torch.float16), torch.no_grad():


[1;30;43mStreaming output truncated to the last 5000 lines.[0m
processing batch 5000
processing batch 5001
processing batch 5002
processing batch 5003
processing batch 5004
processing batch 5005
processing batch 5006
processing batch 5007
processing batch 5008
processing batch 5009
processing batch 5010
processing batch 5011
processing batch 5012
processing batch 5013
processing batch 5014
processing batch 5015
processing batch 5016
processing batch 5017
processing batch 5018
processing batch 5019
processing batch 5020
processing batch 5021
processing batch 5022
processing batch 5023
processing batch 5024
processing batch 5025
processing batch 5026
processing batch 5027
processing batch 5028
processing batch 5029
processing batch 5030
processing batch 5031
processing batch 5032
processing batch 5033
processing batch 5034
processing batch 5035
processing batch 5036
processing batch 5037
processing batch 5038
processing batch 5039
processing batch 5040
processing batch 5041
processing 

In [None]:
# Folder on Drive that receives the exported core-sets; nothing happens if it already exists.
coreSetDir = "/content/drive/MyDrive/Core-sets"
os.makedirs(coreSetDir, exist_ok = True)

# **Diversity Embedding Clustering**

In [None]:
from sklearn.cluster import KMeans

# Stack the per-batch embeddings into a single array, one row per dataset sample.
X = torch.cat(embeddings, dim = 0).numpy()
# Partition the embedding space into 50 clusters (fixed seed for repeatability).
kmeans = KMeans(n_clusters = 50, random_state = 44)
kmeans.fit(X)
clusterIDs = kmeans.labels_

In [None]:
from collections import defaultdict

# Upper bound on how many samples are drawn from each cluster.
samplesPerCluster = 160

# Group dataset indices by the cluster they were assigned to.
clusterMap = defaultdict(list)
for idx, cid in enumerate(clusterIDs):
  clusterMap[cid].append(idx)

# A dedicated, seeded generator makes the drawn core-set reproducible, in line
# with the fixed random_state used for KMeans.
rng = random.Random(44)
selected = [
  rng.sample(idxs, min(samplesPerCluster, len(idxs)))
  for idxs in clusterMap.values()
]

In [None]:
import zipfile

# Export targets get their own names: reusing `zipPath` would overwrite the
# dataset archive path that earlier cells read from.
coreSetZipPath = "/content/drive/MyDrive/Core-sets/DiversityClustering.zip"
coreSetJsonPath = "/content/drive/MyDrive/Core-sets/DiversityClustering.json"

# Stream each crop into the archive as soon as it is produced, so the decoded
# images are never all held in memory at once. img_{i}.jpg pairs with labelsOnly[i].
labelsOnly = []
flatIdxs = (idx for clusterIdx in selected for idx in clusterIdx)
with zipfile.ZipFile(coreSetZipPath, 'w') as zipf:
  for i, idx in enumerate(flatIdxs):
    img, label = datasetTrain.sendItem(idx)
    imgBuffer = io.BytesIO()
    img.save(imgBuffer, format = "JPEG")
    zipf.writestr(f"img_{i}.jpg", imgBuffer.getvalue())
    labelsOnly.append(label)

with open(coreSetJsonPath, 'w') as f:
  json.dump(labelsOnly, f)

# **"Weak" Hard Example Mining**

In [None]:
from sklearn.cluster import KMeans

# Cluster the stacked embeddings (one row per dataset sample) with a fixed seed,
# keeping both the per-sample assignments and the cluster centres.
X = torch.cat(embeddings, dim = 0).numpy()
kmeans = KMeans(n_clusters = 50, random_state = 44)
kmeans.fit(X)
clusterIDs, centroids = kmeans.labels_, kmeans.cluster_centers_

In [None]:
# Upper bound on how many samples are kept from each cluster.
samplesPerCluster = 160

# For every cluster keep the members lying farthest from its centroid.
# Iterating over `centroids` ties the loop to the actual number of clusters
# instead of repeating the value passed to KMeans.
selected = []
for clusterID, centroid in enumerate(centroids):
  memberIdxs = np.flatnonzero(clusterIDs == clusterID)
  dists = np.linalg.norm(X[memberIdxs] - centroid, axis = 1)
  keep = min(samplesPerCluster, len(memberIdxs))
  # argsort is ascending, so the last `keep` positions are the farthest members;
  # slicing from an explicit start index stays correct when keep is 0.
  farthest = np.argsort(dists)[len(dists) - keep:]
  selected.append(memberIdxs[farthest])

In [None]:
import zipfile

# Export targets get their own names: reusing `zipPath` would overwrite the
# dataset archive path that earlier cells read from.
coreSetZipPath = "/content/drive/MyDrive/Core-sets/HardClustering.zip"
coreSetJsonPath = "/content/drive/MyDrive/Core-sets/HardClustering.json"

# Stream each crop into the archive as soon as it is produced, so the decoded
# images are never all held in memory at once. img_{i}.jpg pairs with labelsOnly[i].
labelsOnly = []
flatIdxs = (idx for clusterIdx in selected for idx in clusterIdx)
with zipfile.ZipFile(coreSetZipPath, 'w') as zipf:
  for i, idx in enumerate(flatIdxs):
    img, label = datasetTrain.sendItem(idx)
    imgBuffer = io.BytesIO()
    img.save(imgBuffer, format = "JPEG")
    zipf.writestr(f"img_{i}.jpg", imgBuffer.getvalue())
    labelsOnly.append(label)

with open(coreSetJsonPath, 'w') as f:
  json.dump(labelsOnly, f)