In [1]:
#Import Kaggle dataset
import os
from pathlib import Path

# Check that the Kaggle credentials are present in the environment.
# Writing os.getenv(...) back into os.environ changes nothing when the variable
# is set, and raises TypeError (environment values must be str, not None) when
# it is missing. So only check for presence and report what is absent.
missing_kaggle_vars = [
    name for name in ("KAGGLE_KEY", "KAGGLE_USERNAME") if not os.getenv(name)
]
if missing_kaggle_vars:
    print(
        "Warning: Kaggle credentials not set in the environment: "
        + ", ".join(missing_kaggle_vars)
    )


In [11]:
import kagglehub

# Kaggle dataset identifiers. Only `asl_alphabet` is downloaded in this cell;
# `asl_dataset` is defined here but not used by the statements below.
asl_dataset = "ayuraj/asl-dataset"
asl_alphabet = "grassknoted/asl-alphabet"
# Download the dataset through kagglehub and keep the returned location,
# which is printed so the on-disk path is visible in the cell output.
path = kagglehub.dataset_download(asl_alphabet)
print(f"Path to dataset files:{path}")


Downloading from https://www.kaggle.com/api/v1/datasets/download/grassknoted/asl-alphabet?dataset_version_number=1...


100%|██████████| 1.03G/1.03G [02:10<00:00, 8.46MB/s]

Extracting files...





Path to dataset files:/home/mich02/.cache/kagglehub/datasets/grassknoted/asl-alphabet/versions/1


In [3]:
# Install the `tree` utility, then fetch and unpack both Kaggle datasets
# with the Kaggle CLI.
# NOTE(review): the unzip commands hard-code /content as the download
# location — confirm this matches the environment the notebook runs in.
!apt-get install tree
# First dataset: ayuraj/asl-dataset
!kaggle datasets download -d ayuraj/asl-dataset
! unzip -q "/content/asl-dataset.zip"

# Second dataset: grassknoted/asl-alphabet
!kaggle datasets download -d grassknoted/asl-alphabet
! unzip -q "/content/asl-alphabet.zip"

# Flatten the doubled directory level (e.g. asl_alphabet_train/asl_alphabet_train/)
# by moving its contents up one level and removing the nested directory.
!mv asl_alphabet_train/asl_alphabet_train/* asl_alphabet_train/
!rm -r asl_alphabet_train/asl_alphabet_train/
!mv asl_alphabet_test/asl_alphabet_test/* asl_alphabet_test/
!rm -r asl_alphabet_test/asl_alphabet_test/

Reading package lists... Done
Building dependency tree... Done
Reading state information... Done
The following NEW packages will be installed:
  tree
0 upgraded, 1 newly installed, 0 to remove and 41 not upgraded.
Need to get 47.9 kB of archives.
After this operation, 116 kB of additional disk space will be used.
Get:1 http://archive.ubuntu.com/ubuntu jammy/universe amd64 tree amd64 2.0.2-1 [47.9 kB]
Fetched 47.9 kB in 0s (121 kB/s)
Selecting previously unselected package tree.
(Reading database ... 125082 files and directories currently installed.)
Preparing to unpack .../tree_2.0.2-1_amd64.deb ...
Unpacking tree (2.0.2-1) ...
Setting up tree (2.0.2-1) ...
Processing triggers for man-db (2.10.2-1) ...
Dataset URL: https://www.kaggle.com/datasets/ayuraj/asl-dataset
License(s): CC0-1.0
Downloading asl-dataset.zip to /content
  0% 0.00/56.9M [00:00<?, ?B/s]
100% 56.9M/56.9M [00:00<00:00, 1.45GB/s]
Dataset URL: https://www.kaggle.com/datasets/grassknoted/asl-alphabet
License(s): GPL-2.0
D

In [18]:
# ===== Top-level worker helpers (picklable) =====
import uuid, random, cv2, numpy as np
from pathlib import Path
from PIL import Image

def _init_opencv_worker():
    try:
        cv2.setNumThreads(1)
        cv2.ocl.setUseOpenCL(False)
    except Exception:
        pass

def _grabcut_mask_np(img_bgr):
    h, w = img_bgr.shape[:2]
    mask = np.zeros((h, w), np.uint8)
    bgd, fgd = np.zeros((1, 65), np.float64), np.zeros((1, 65), np.float64)
    pad = max(6, min(h, w) // 20)
    rect = (pad, pad, w - 2*pad, h - 2*pad)
    try:
        cv2.grabCut(img_bgr, mask, rect, bgd, fgd, 3, mode=cv2.GC_INIT_WITH_RECT)
        fg = (mask == cv2.GC_FGD) | (mask == cv2.GC_PR_FGD)
    except Exception:
        fg = np.ones((h, w), bool)
    return fg

def _random_bg_np(shape, pastel_bg: bool):
    if not pastel_bg:
        return np.random.randint(0, 256, shape, dtype=np.uint8)
    base = np.random.randint(100, 256, (1, 1, 3), dtype=np.uint8)
    noise = np.random.randint(-30, 30, shape, dtype=np.int16)
    return np.clip(base + noise, 0, 255).astype(np.uint8)

# --- Random BG worker ---
def worker_random_bg(args):
    """Replace the background of one image with random pixels and save it.

    args = (fpath_str, label, out_dir_str, target_size, pastel_bg, labeled_flag)

    The image is read with OpenCV, resized to ``target_size``, segmented with
    ``_grabcut_mask_np`` and composited over a background produced by
    ``_random_bg_np``. Labeled inputs are written to ``out_dir/label/name``
    (a short random suffix is appended if that file already exists);
    unlabeled inputs are written to ``out_dir/name``.

    Returns:
        ``(saved_path, caption)`` for roughly 2% of successfully written
        images (used by the caller for previews), otherwise ``None``.
        ``None`` is also returned when the source cannot be read or the
        result cannot be written.
    """
    _init_opencv_worker()
    fpath_str, lbl, out_dir_str, target_size, pastel_bg, labeled = args
    f = Path(fpath_str)
    out_dir = Path(out_dir_str)

    img_bgr = cv2.imread(str(f))
    if img_bgr is None:
        # Unreadable / non-image file: skip it.
        return None
    img_bgr = cv2.resize(img_bgr, target_size)
    mask = _grabcut_mask_np(img_bgr)
    img_rgb = cv2.cvtColor(img_bgr, cv2.COLOR_BGR2RGB)
    bg = _random_bg_np(img_rgb.shape, pastel_bg)
    # Broadcast the (h, w) mask over the colour channels.
    comp = np.where(mask[..., None], img_rgb, bg).astype(np.uint8)

    if labeled:
        dst = out_dir / lbl / f.name
        if dst.exists():
            dst = out_dir / lbl / f"{f.stem}_rb_{uuid.uuid4().hex[:6]}{f.suffix}"
    else:
        dst = out_dir / f.name

    # Make sure the target folder exists so the write cannot fail on a
    # missing label directory.
    dst.parent.mkdir(parents=True, exist_ok=True)

    # cv2.imwrite reports failure through its return value; do not hand
    # back a preview path for a file that was never written.
    if not cv2.imwrite(str(dst), cv2.cvtColor(comp, cv2.COLOR_RGB2BGR)):
        return None

    # Occasionally return a sample path for preview (opened in main proc)
    if random.random() < 0.02:
        return (str(dst), lbl or f.name)
    return None

# --- File copy worker (I/O bound, used by combine) ---
def worker_copy_file(args):
    """Copy one file, creating the destination folder as needed.

    args = (src_path_str, dst_path_str, label_for_preview)

    Returns ``(dst_path_str, label)`` for roughly 2% of successful copies so
    the caller can build a preview; returns ``None`` otherwise, including
    when the copy itself raises.
    """
    source_str, target_str, preview_label = args
    source = Path(source_str)
    target = Path(target_str)
    target.parent.mkdir(parents=True, exist_ok=True)

    try:
        import shutil
        shutil.copy2(source, target)
    except Exception:
        # Best-effort copy: a failed file is silently skipped.
        return None

    return (str(target), preview_label) if random.random() < 0.02 else None

# --- Augmentation worker ---
def worker_augment_one(args):
    """Create and save one augmented copy of a source image.

    args = (src_path_str, lbl, out_dir_str, target_size, pastel_bg_unused,
            labeled_flag, aug_conf_dict)

    The pastel flag and the aug_conf dict are accepted but not used. The
    output is saved as ``<stem>_aug_<8 hex chars>.jpg`` under
    ``out_dir/lbl`` when labeled, else directly under ``out_dir``.

    Returns ``(saved_path, caption)`` for roughly 2% of successful saves and
    ``None`` otherwise; any exception during import, transform or save also
    results in ``None``.
    """
    _init_opencv_worker()
    source_str, label, out_root_str, size, _unused_pastel, is_labeled, _unused_conf = args
    out_root = Path(out_root_str)

    preview = None
    try:
        # The torchvision pipeline is constructed inside the worker on every
        # call rather than being shipped across the process boundary.
        from torchvision import transforms
        from PIL import Image
        import torch

        class AddGaussianNoise:
            """Add clamped Gaussian noise to a tensor image in [0, 1]."""

            def __init__(self, mean=0.0, std=0.015):
                self.mean = mean
                self.std = std

            def __call__(self, img_tensor):
                perturbation = torch.randn_like(img_tensor) * self.std + self.mean
                return torch.clamp(img_tensor + perturbation, 0.0, 1.0)

        colour_jitter = transforms.ColorJitter(
            brightness=0.25, contrast=0.25, saturation=0.15, hue=0.02)
        steps = [
            # NOTE(review): torchvision's Resize reads a 2-tuple as (h, w) —
            # confirm the ordering of target_size for non-square sizes.
            transforms.Resize(size, interpolation=Image.BILINEAR),
            transforms.RandomApply([colour_jitter], p=0.8),
            transforms.RandomAffine(degrees=10, translate=(0.05, 0.05),
                                    scale=(0.95, 1.05), shear=(-5, 5)),
            transforms.RandomPerspective(distortion_scale=0.25, p=0.4),
            transforms.GaussianBlur(kernel_size=3, sigma=(0.1, 1.5)),
            transforms.ToTensor(),
            AddGaussianNoise(mean=0.0, std=0.015),
            transforms.ToPILImage(),
        ]
        pipeline = transforms.Compose(steps)

        original = Image.open(source_str).convert("RGB")
        augmented = pipeline(original)

        file_name = f"{Path(source_str).stem}_aug_{uuid.uuid4().hex[:8]}.jpg"
        if is_labeled:
            target = out_root / label / file_name
        else:
            target = out_root / file_name
        augmented.save(target, quality=95)

        # Sample a small fraction of outputs for the caller's preview grid.
        if random.random() < 0.02:
            preview = (str(target), label or Path(source_str).stem)
    except Exception:
        return None
    return preview

In [19]:
from concurrent.futures import ProcessPoolExecutor
from tqdm import tqdm
from PIL import Image

def make_random_bg_dataset(self, in_dir, out_dir=None):
    """Build a copy of ``in_dir`` whose images have random backgrounds.

    ``in_dir`` is treated as labeled when it contains sub-directories (one
    per label); otherwise its images are processed directly. Output goes to
    ``out_dir`` or, by default, ``<in_dir>_with_random_rgb_values``. Work is
    done by ``worker_random_bg`` in a process pool of ``self.n_workers``
    workers. A preview grid of up to 9 sampled results is shown at the end.

    Returns the resolved output directory as a string.
    """
    source_root = Path(in_dir).resolve()
    dest_root = Path(out_dir or f"{in_dir}_with_random_rgb_values").resolve()
    self._safe_mkdir(dest_root)

    class_dirs = [entry for entry in source_root.iterdir() if entry.is_dir()]
    has_labels = bool(class_dirs)

    # Job tuples contain only plain values (strings, tuples, bools).
    jobs = []
    if has_labels:
        for class_dir in class_dirs:
            self._safe_mkdir(dest_root / class_dir.name)
        for class_dir in class_dirs:
            for img_path in class_dir.iterdir():
                if self._is_image(img_path):
                    jobs.append((str(img_path), class_dir.name, str(dest_root),
                                 self.target_size, self.pastel_bg, True))
    else:
        for img_path in source_root.iterdir():
            if self._is_image(img_path):
                jobs.append((str(img_path), "", str(dest_root),
                             self.target_size, self.pastel_bg, False))

    collected = []
    with ProcessPoolExecutor(max_workers=self.n_workers,
                             initializer=_init_opencv_worker) as pool:
        results = pool.map(worker_random_bg, jobs)
        for result in tqdm(results, total=len(jobs), desc=f"RandomBG {source_root.name}"):
            if not result:
                continue
            if len(collected) < 12:
                collected.append(result)

    # Load a handful of the sampled outputs for display; unreadable ones
    # are skipped.
    samples = []
    for sample_path, caption in collected[:9]:
        try:
            picture = Image.open(sample_path).convert("RGB")
        except Exception:
            continue
        samples.append((picture, caption))

    print(f"✅ Random background dataset saved: {dest_root}")
    self._show_grid(samples, f"Random RGB Backgrounds: {source_root.name}")
    return str(dest_root)

In [20]:
from concurrent.futures import ThreadPoolExecutor
from tqdm import tqdm
from PIL import Image

def combine_datasets(self, src_dirs, combined_dir):
    """Merge several image directories into ``combined_dir``.

    Each source may be labeled (one sub-directory per label) or flat.
    Label sub-directories are recreated under the output directory and
    images are copied with ``worker_copy_file`` in a thread pool.

    Name collisions are resolved while the copy plan is built: a
    destination already present on disk, or already claimed by an earlier
    planned copy, is renamed to ``<source_dir_name>_<file_name>``, with a
    numeric counter added if that is taken too. Claimed destinations have
    to be tracked because nothing has been copied yet at planning time, so
    an on-disk existence check alone cannot see collisions between
    sources and same-named files would overwrite each other.

    Args:
        src_dirs: iterable of source directory paths.
        combined_dir: output directory path.

    Returns:
        The resolved output directory as a string.
    """
    src_dirs = list(src_dirs)  # iterated twice below
    out = Path(combined_dir).resolve()
    self._safe_mkdir(out)

    label_set = set()
    for s in src_dirs:
        for d in os.listdir(s):
            if (Path(s) / d).is_dir():
                label_set.add(d)
    for lbl in sorted(label_set):
        self._safe_mkdir(out / lbl)

    reserved = set()  # destinations already assigned to a planned copy

    def _reserve(folder, file_name, prefix):
        """Pick a destination in ``folder`` that is neither on disk nor planned."""
        candidate = folder / file_name
        if candidate.exists() or candidate in reserved:
            candidate = folder / f"{prefix}_{file_name}"
        counter = 1
        while candidate.exists() or candidate in reserved:
            candidate = folder / f"{prefix}_{counter}_{file_name}"
            counter += 1
        reserved.add(candidate)
        return candidate

    tasks = []
    for src in src_dirs:
        src_path = Path(src)
        base = src_path.name
        labeled_subdirs = [d for d in src_path.iterdir() if d.is_dir()]
        if labeled_subdirs:
            for d in labeled_subdirs:
                for f in d.iterdir():
                    if not self._is_image(f):
                        continue
                    dst = _reserve(out / d.name, f.name, base)
                    tasks.append((str(f), str(dst), d.name))
        else:
            for f in src_path.iterdir():
                if not self._is_image(f):
                    continue
                dst = _reserve(out, f.name, base)
                tasks.append((str(f), str(dst), ""))

    previews = []
    # Copying is I/O bound, so threads are used, with more of them than CPU workers.
    with ThreadPoolExecutor(max_workers=min(64, self.n_workers * 4)) as ex:
        for r in tqdm(ex.map(worker_copy_file, tasks), total=len(tasks), desc="Combining"):
            if r and len(previews) < 12:
                previews.append(r)

    samples = []
    for pth, lbl in previews[:9]:
        try:
            samples.append((Image.open(pth).convert("RGB"), lbl))
        except Exception:
            # A preview that cannot be opened is just left out of the grid.
            pass

    print(f"✅ Combined dataset saved: {out}")
    self._show_grid(samples, "Combined Dataset Preview")
    return str(out)

In [21]:
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
from tqdm import tqdm
from PIL import Image
import math

def augment_20_percent(self, in_dir, out_dir=None, include_originals=True):
    """Write an augmented dataset that adds about 20% extra images.

    ``in_dir`` is treated as labeled when it contains sub-directories.
    Output goes to ``out_dir`` or, by default, ``<in_dir>_augmented``.

    Steps:
      1. Optionally copy every original, converted to RGB and resized to
         ``self.target_size`` (best effort; failures are skipped).
      2. Plan ``ceil(0.20 * N)`` augmentations. For labeled data each label
         gets ``ceil(0.20 * n_label)``, then the per-label counts are
         adjusted round-robin so their sum matches the overall target.
      3. Run ``worker_augment_one`` over the plan in a process pool.

    Args:
        in_dir: source dataset directory.
        out_dir: output directory (optional).
        include_originals: also write resized originals to the output.

    Returns:
        The resolved output directory as a string.

    Note:
        The "Augmented" figure in the summary is the number of planned
        jobs; workers that fail return None and are not subtracted.
    """
    src = Path(in_dir).resolve()
    out = Path(out_dir or f"{in_dir}_augmented").resolve()
    self._safe_mkdir(out)

    labeled_dirs = [d for d in src.iterdir() if d.is_dir()]
    labeled = bool(labeled_dirs)
    if labeled:
        for d in labeled_dirs:
            self._safe_mkdir(out / d.name)

    files = ([(d.name, f) for d in labeled_dirs for f in d.iterdir() if self._is_image(f)]
             if labeled else [("", f) for f in src.iterdir() if self._is_image(f)])
    base_total = len(files)
    print(f"Found {base_total} images in {src.name}")

    if include_originals:
        def _copy_orig(item):
            """Save one resized RGB copy of an original image (best effort)."""
            lbl, f = item
            try:
                img = Image.open(f).convert("RGB").resize(self.target_size, Image.BILINEAR)
                dst = (out / lbl / f.name) if labeled else (out / f.name)
                if dst.exists():
                    dst = dst.with_name(f"{dst.stem}_orig_{uuid.uuid4().hex[:6]}{dst.suffix}")
                img.save(dst, quality=95)
            except Exception:
                # Deliberately best-effort: an unreadable original is skipped.
                pass
            return None
        with ThreadPoolExecutor(max_workers=min(64, self.n_workers * 4)) as ex:
            list(tqdm(ex.map(_copy_orig, files), total=len(files), desc="Copying originals"))

    to_add = int(math.ceil(base_total * 0.20))
    print(f"Augmenting +{to_add} images (~20%)...")

    plan = []
    if labeled:
        # Group the files already listed above instead of rescanning every
        # label directory; the plan then matches the counts just reported.
        per = {d.name: [] for d in labeled_dirs}
        for lbl, f in files:
            per[lbl].append(f)

        target = {k: int(math.ceil(len(v) * 0.20)) for k, v in per.items()}
        drift = to_add - sum(target.values())
        # Round-robin correction of rounding drift. Only labels that can
        # absorb a change take part: a count is never pushed below zero,
        # and a label with no images is never given work (which would
        # divide by zero when picking a source below).
        while drift != 0:
            if drift > 0:
                eligible = [k for k in target if per[k]]
            else:
                eligible = [k for k in target if target[k] > 0]
            if not eligible:
                break
            step = 1 if drift > 0 else -1
            for k in eligible:
                if drift == 0:
                    break
                target[k] += step
                drift -= step

        for lbl, lst in per.items():
            for k in range(target[lbl]):
                # Cycle through the label's images if more augmentations
                # than sources are requested.
                plan.append((str(lst[k % len(lst)]), lbl, str(out), self.target_size, False, True, {}))
    else:
        lst = [str(f) for _, f in files]
        for k in range(to_add):
            plan.append((lst[k % len(lst)], "", str(out), self.target_size, False, False, {}))

    previews = []
    with ProcessPoolExecutor(max_workers=self.n_workers, initializer=_init_opencv_worker) as ex:
        for r in tqdm(ex.map(worker_augment_one, plan), total=len(plan), desc="Augmenting"):
            if r and len(previews) < 12:
                previews.append(r)

    samples = []
    for pth, lbl in previews[:9]:
        try:
            samples.append((Image.open(pth).convert("RGB"), lbl))
        except Exception:
            # A preview that cannot be opened is just left out of the grid.
            pass

    print(f"✅ Augmentation complete: {out} | Copied: {base_total if include_originals else 0}, Augmented: {len(plan)}")
    self._show_grid(samples, "Augmented (+20%) Preview")
    return str(out)

In [None]:
import multiprocessing as mp
# Select the multiprocessing start method once for the session.
# "fork" is used where the platform offers it; otherwise fall back to "spawn".
# Requesting a method the platform does not support raises ValueError, which
# the RuntimeError handler below would not catch, so availability is checked
# up front.
try:
    mp.set_start_method("fork" if "fork" in mp.get_all_start_methods() else "spawn")
except RuntimeError:
    pass  # already set

In [17]:
# Source directory of the ASL alphabet training images.
# NOTE(review): this is an absolute, machine-specific path — consider a
# configurable base directory.
train_dir = "/home/mich02/Desktop/Disability_project_Vietnam/ai4li_VSL/asl_alphabet/asl_alphabet_train"

# NOTE(review): the output recorded for this cell ends in a pickling error
# that names a nested `_do_one` inside `ASLPipeline.make_random_bg_dataset`.
# The module-level `make_random_bg_dataset` in this notebook has no such
# helper, so the class is presumably still running a different
# implementation — confirm the class definition has been updated and re-run.
pipe = ASLPipeline(
    target_size=(224, 224),
    pastel_bg=False,
    # os.cpu_count() may return None; treat that as a single CPU so the
    # subtraction cannot fail. Keeps 1–2 cores free on larger machines.
    n_workers=max(1, (os.cpu_count() or 1) - 2),
    backend="process"                     # best for GrabCut + PIL ops
)

# Three stages, each consuming the previous stage's output directory:
# random backgrounds -> originals + random-bg combined -> +20% augmentation.
train_rand = pipe.make_random_bg_dataset(train_dir)
train_comb = pipe.combine_datasets([train_dir, train_rand], f"{train_dir}_combined")
train_aug  = pipe.augment_20_percent(train_comb)


RandomBG asl_alphabet_train:   0%|          | 0/87000 [00:00<?, ?it/s]


AttributeError: Can't pickle local object 'ASLPipeline.make_random_bg_dataset.<locals>._do_one'

# Vision Transformer

Transformer for CV

![](https://drive.google.com/uc?export=view&id=19ZbTiRydCvsWg5dZXVJsuY2D6y7N3HFg)


![](https://drive.google.com/uc?export=view&id=15ZDgQ2fBMkReXxBwvX3eEa4wAsD407U7)

![](https://drive.google.com/uc?export=view&id=1WlKq6shzl4JjpQ7JZR0cW0B7hjJOcQUp)

