<a href="https://colab.research.google.com/github/j00lee/SignLingo/blob/main/Best_Frame_Selection_and_Secondary_Dataset_Selection.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Testing on a small sample of the dataset

In [None]:
# Reinstall a pinned NumPy, then install mediapipe without resolving its dependencies.
# NOTE(review): the pip output recorded for this cell reports that mediapipe 0.10.21
# requires numpy<2 and protobuf<5, so this pin conflicts with mediapipe's declared
# requirements — confirm the combination is intentional.
!pip install numpy==2.0.0 --force-reinstall
!pip install mediapipe --no-deps

Collecting numpy==2.0.0
  Using cached numpy-2.0.0-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (60 kB)
Using cached numpy-2.0.0-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (19.3 MB)
Installing collected packages: numpy
  Attempting uninstall: numpy
    Found existing installation: numpy 2.0.0
    Uninstalling numpy-2.0.0:
      Successfully uninstalled numpy-2.0.0
[31mERROR: pip's dependency resolver does not currently take into account all the packages that are installed. This behaviour is the source of the following dependency conflicts.
mediapipe 0.10.21 requires sounddevice>=0.4.4, which is not installed.
mediapipe 0.10.21 requires numpy<2, but you have numpy 2.0.0 which is incompatible.
mediapipe 0.10.21 requires protobuf<5,>=4.25.3, but you have protobuf 5.29.4 which is incompatible.[0m[31m
[0mSuccessfully installed numpy-2.0.0


In [None]:
# === Step 1: Mount Google Drive
# The dataset and output paths used by the cells below all live under /content/drive.
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
import os
import cv2
import random
import shutil
import numpy as np
import pandas as pd
import mediapipe as mp
from tqdm import tqdm

# === Set up MediaPipe Hands
# A single module-level detector, reused by detect_hands() for every frame.
# It is created with static_image_mode=True and looks for at most two hands per image.
mp_hands = mp.solutions.hands
hands_detector = mp_hands.Hands(static_image_mode=True, max_num_hands=2)

def calculate_sharpness(image):
    """Score how sharp a BGR image is.

    The image is converted to grayscale, filtered with a Laplacian computed
    in 64-bit floats, and the variance of the filter response is returned.
    The caller treats larger values as sharper.
    """
    grayscale = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
    laplacian_response = cv2.Laplacian(grayscale, cv2.CV_64F)
    return laplacian_response.var()

def detect_hands(image):
    """Return True when the shared MediaPipe detector reports hand landmarks.

    ``image`` is a BGR array; it is converted to RGB before being handed to
    the module-level ``hands_detector``.
    """
    detection = hands_detector.process(cv2.cvtColor(image, cv2.COLOR_BGR2RGB))
    landmarks = detection.multi_hand_landmarks
    return landmarks is not None

# === Paths
# Frames are read from <base_dataset_path>/<split>/<video>/, the list of videos comes
# from <base_filtered_splits>/<split>_filtered.csv, and results go under base_best_frames.
base_dataset_path = '/content/drive/MyDrive/ASL Project/dataset'
base_filtered_splits = '/content/drive/MyDrive/ASL Project/filtered splits'
base_best_frames = '/content/drive/MyDrive/ASL Project/best_frames'

splits = ['train', 'val', 'test']
# sample_size = 10  # Number of videos to sample per split

# === Hyperparameters
sharpness_threshold = 70.0  # Change this to be more/less strict

# === Logs
# Both lists accumulate across all splits and are written out once at the end.
missing_videos = []
dirty_videos = []

for split in splits:
    print(f"🚀 Processing {split} split...")

    frames_path = os.path.join(base_dataset_path, split)
    output_path = os.path.join(base_best_frames, split)
    dirty_output_path = os.path.join(base_best_frames, f'dirty_{split}')

    # === Clear and recreate folders
    # WARNING: every run deletes whatever a previous run saved for this split,
    # so an interrupted run cannot be resumed from its partial output.
    if os.path.exists(output_path):
        shutil.rmtree(output_path)
    os.makedirs(output_path)

    if os.path.exists(dirty_output_path):
        shutil.rmtree(dirty_output_path)
    os.makedirs(dirty_output_path)

    # Load CSV and build the video list (file extension stripped from 'Video file').
    # Sampling is currently disabled: the full list is processed.
    csv_path = os.path.join(base_filtered_splits, f'{split}_filtered.csv')
    df = pd.read_csv(csv_path)  # Comma-separated
    video_list = [os.path.splitext(v)[0] for v in df['Video file'].tolist()]
    # sample_videos = random.sample(video_list, min(sample_size, len(video_list)))
    sample_videos = video_list

    # (video, sharpness) pairs for frames that passed the threshold in this split.
    sharpness_log = []

    for video_folder in tqdm(sample_videos, desc=f"Processing {split} videos"):
        frame_folder = os.path.join(frames_path, video_folder)

        # Videos listed in the CSV but without a frame folder are logged and skipped.
        if not os.path.isdir(frame_folder):
            print(f"⚠️ Warning: Frame folder missing for {video_folder}")
            missing_videos.append(video_folder)
            continue

        # Track the sharpest frame in which hands were detected.
        best_frame_with_hands = None
        best_score_with_hands = -1

        for frame_file in sorted(os.listdir(frame_folder)):
            frame_path = os.path.join(frame_folder, frame_file)
            frame = cv2.imread(frame_path)
            # cv2.imread returned None: nothing to score, so skip this file.
            if frame is None:
                continue

            # Both measurements are computed for every readable frame.
            sharpness = calculate_sharpness(frame)
            has_hands = detect_hands(frame)

            if has_hands and sharpness > best_score_with_hands:
                best_score_with_hands = sharpness
                best_frame_with_hands = frame

        if best_frame_with_hands is not None:
            if best_score_with_hands >= sharpness_threshold:
                # Clean: saved to the split folder and recorded in the sharpness log.
                save_path = os.path.join(output_path, f'{video_folder}.jpg')
                cv2.imwrite(save_path, best_frame_with_hands)
                sharpness_log.append((video_folder, best_score_with_hands))
            else:
                # Dirty: hands were found but the best frame is below the threshold.
                # The frame is kept in dirty_<split>; its score is not written to the CSV.
                print(f"⚠️ Frame for {video_folder} below threshold ({best_score_with_hands:.2f})")
                dirty_save_path = os.path.join(dirty_output_path, f'{video_folder}.jpg')
                cv2.imwrite(dirty_save_path, best_frame_with_hands)
                dirty_videos.append(video_folder)
        else:
            # No frame with detected hands: logged as dirty, no image is saved.
            print(f"⚠️ No frame with hands found for {video_folder}")
            dirty_videos.append(video_folder)

    # Save sharpness log for this split (clean frames only, sharpest first)
    sharpness_df = pd.DataFrame(sharpness_log, columns=["video", "sharpness"])
    sharpness_df = sharpness_df.sort_values(by="sharpness", ascending=False)
    sharpness_df.to_csv(os.path.join(base_best_frames, f"{split}_sharpness.csv"), index=False)

# === Save final logs
# Written with mode 'w', so each completed run replaces the previous log files.
missing_log_path = os.path.join(base_best_frames, 'missing_videos.txt')
dirty_log_path = os.path.join(base_best_frames, 'dirty_videos.txt')

with open(missing_log_path, 'w') as f:
    for video in missing_videos:
        f.write(f"{video}\n")

with open(dirty_log_path, 'w') as f:
    for video in dirty_videos:
        f.write(f"{video}\n")

# NOTE(review): the message says "sampled videos" although sampling is commented out above.
print("✅ Done selecting best frames for sampled videos!")
print(f"⚡ Missing videos logged to: {missing_log_path}")
print(f"⚡ Dirty videos logged to: {dirty_log_path}")


🚀 Processing train split...


Processing train videos:   0%|          | 5/21240 [00:01<1:55:06,  3.07it/s]


KeyboardInterrupt: 

In [None]:
import os
import cv2
import numpy as np
import pandas as pd
import mediapipe as mp
import shutil
from tqdm import tqdm
from multiprocessing import Pool

# --- CONFIG ---
# Frames are read from BASE_DATASET_PATH/<split>/<video>/, split CSVs from
# BASE_SPLITS_PATH/<split>_filtered.csv, and all results are written under BASE_OUTPUT_PATH.
BASE_DATASET_PATH = '/content/drive/MyDrive/ASL Project/dataset'
BASE_SPLITS_PATH = '/content/drive/MyDrive/ASL Project/filtered splits'
BASE_OUTPUT_PATH = '/content/drive/MyDrive/ASL Project/best_frames'
SHARPNESS_THRESHOLD = 70.0  # Minimum best-frame sharpness for a video to be reported as 'clean'
SPLITS = ['train', 'val', 'test']  # Splits processed, in this order
NUM_WORKERS = 2  # Limit parallel workers to reduce memory use
CHUNK_SIZE = 1000  # Process in smaller chunks to avoid memory overload

# --- SHARED PROCESSING FUNCTION ---
def process_video(args):
    """Select the sharpest frame containing a detected hand for one video.

    Runs inside a worker process, so everything it needs arrives in ``args``.

    Args:
        args: Tuple ``(video_folder, split, gloss_lookup)`` where
            ``video_folder`` is the frame-folder name, ``split`` is the split
            name and ``gloss_lookup`` maps folder names to gloss labels.

    Returns:
        Tuple ``(status, video_folder, gloss, sharpness, frame)``:
          * ``'skipped'`` – an output image already exists (clean or dirty).
          * ``'missing'`` – the frame folder does not exist.
          * ``'clean'``   – best frame has sharpness >= SHARPNESS_THRESHOLD.
          * ``'dirty'``   – best frame is below the threshold, or no frame
            with hands was found / processing failed (then ``sharpness`` and
            ``frame`` are ``None``).
    """
    video_folder, split, gloss_lookup = args

    frames_path = os.path.join(BASE_DATASET_PATH, split, video_folder)
    output_path = os.path.join(BASE_OUTPUT_PATH, split, f"{video_folder}.jpg")
    dirty_path = os.path.join(BASE_OUTPUT_PATH, f'dirty_{split}', f"{video_folder}.jpg")

    gloss = gloss_lookup.get(video_folder, "")

    # Resume support: a saved image means this video was handled by an earlier run.
    if os.path.exists(output_path) or os.path.exists(dirty_path):
        return ('skipped', video_folder, gloss, None, None)

    if not os.path.isdir(frames_path):
        return ('missing', video_folder, gloss, None, None)

    best_frame = None
    best_score = -1

    try:
        hands = mp.solutions.hands.Hands(static_image_mode=True, max_num_hands=2)
        try:
            for frame_file in sorted(os.listdir(frames_path)):
                frame_path = os.path.join(frames_path, frame_file)
                try:
                    frame = cv2.imread(frame_path)
                    if frame is None:
                        continue

                    gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
                    sharpness = cv2.Laplacian(gray, cv2.CV_64F).var()

                    # A frame can only become the best one if it is sharper than
                    # the current best, so skip the expensive hand detection
                    # otherwise. The detector is in static-image mode, so
                    # skipping frames does not affect later detections.
                    if sharpness <= best_score:
                        continue

                    rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
                    if hands.process(rgb).multi_hand_landmarks:
                        best_score = sharpness
                        best_frame = frame
                except Exception as e:
                    # One unreadable/corrupt frame must not discard the whole video.
                    print(f"❌ Error processing frame {frame_path}: {e}")
                    continue
        finally:
            # Always release the detector, even when a frame loop error escapes.
            hands.close()
    except Exception as e:
        print(f"❌ Error processing video {video_folder}: {e}")
        return ('dirty', video_folder, gloss, None, None)

    if best_frame is None:
        return ('dirty', video_folder, gloss, None, None)

    if best_score >= SHARPNESS_THRESHOLD:
        return ('clean', video_folder, gloss, best_score, best_frame)
    return ('dirty', video_folder, gloss, best_score, best_frame)

# --- MAIN PIPELINE ---
def process_split(split):
    """Run best-frame selection for every video listed in one split's CSV.

    Videos are dispatched to a ``multiprocessing.Pool`` in chunks of
    ``CHUNK_SIZE``. Clean frames are written to ``BASE_OUTPUT_PATH/<split>``,
    below-threshold frames to ``BASE_OUTPUT_PATH/dirty_<split>``, and scores
    are merged into ``<split>_sharpness.csv``. The text logs
    (``missing_videos.txt``, ``dirty_videos.txt``) are appended to.

    Logs are written in a ``finally`` block so that an interrupted run still
    records the videos it finished; otherwise their images would exist (and
    be skipped on the next run) without ever reaching the CSV.

    Args:
        split: Split name, e.g. ``'train'``; selects the CSV and sub-folders.
    """
    print(f"\n🚀 Processing split: {split}")

    output_dir = os.path.join(BASE_OUTPUT_PATH, split)
    dirty_dir = os.path.join(BASE_OUTPUT_PATH, f'dirty_{split}')
    os.makedirs(output_dir, exist_ok=True)
    os.makedirs(dirty_dir, exist_ok=True)

    csv_path = os.path.join(BASE_SPLITS_PATH, f'{split}_filtered.csv')
    df = pd.read_csv(csv_path)
    video_list = [os.path.splitext(v)[0] for v in df['Video file'].tolist()]
    gloss_lookup = {os.path.splitext(row['Video file'])[0]: row['Gloss'] for _, row in df.iterrows()}
    args = [(v, split, gloss_lookup) for v in video_list]

    print(f"📦 {len(args)} videos to process with {NUM_WORKERS} workers in chunks of {CHUNK_SIZE}")

    clean_entries = []
    dirty_entries = []
    no_hand_videos = []  # 'dirty' results without a score: no hands found or processing failed
    missing_videos = []
    skipped_videos = []
    sharpness_log_path = os.path.join(BASE_OUTPUT_PATH, f'{split}_sharpness.csv')

    try:
        for i in range(0, len(args), CHUNK_SIZE):
            chunk = args[i:i + CHUNK_SIZE]
            with Pool(NUM_WORKERS) as pool:
                for result in tqdm(pool.imap_unordered(process_video, chunk), total=len(chunk)):
                    status, video_name, gloss, sharpness, frame = result
                    if status == 'skipped':
                        skipped_videos.append(video_name)
                    elif status == 'missing':
                        missing_videos.append(video_name)
                    elif status == 'dirty':
                        if sharpness is not None:
                            dirty_entries.append((video_name, gloss, sharpness))
                        else:
                            no_hand_videos.append(video_name)
                        if frame is not None:
                            cv2.imwrite(os.path.join(dirty_dir, f'{video_name}.jpg'), frame)
                    elif status == 'clean':
                        clean_entries.append((video_name, gloss, sharpness))
                        cv2.imwrite(os.path.join(output_dir, f'{video_name}.jpg'), frame)
    finally:
        all_df = pd.DataFrame(clean_entries + dirty_entries, columns=["video", "gloss", "sharpness"])
        if os.path.exists(sharpness_log_path):
            prev_df = pd.read_csv(sharpness_log_path)
            # Concatenating empty frames raises a pandas FutureWarning, so only
            # the non-empty pieces are combined. Earlier rows stay first and
            # therefore win in drop_duplicates below.
            parts = [part for part in (prev_df, all_df) if not part.empty]
            if parts:
                all_df = pd.concat(parts, ignore_index=True)

        all_df = all_df.drop_duplicates(subset="video").sort_values(by="sharpness", ascending=False)
        all_df.to_csv(sharpness_log_path, index=False)

        with open(os.path.join(BASE_OUTPUT_PATH, 'missing_videos.txt'), 'a') as f:
            for v in missing_videos:
                f.write(f"{v}\n")

        with open(os.path.join(BASE_OUTPUT_PATH, 'dirty_videos.txt'), 'a') as f:
            for v in [entry[0] for entry in dirty_entries if entry[0]] + no_hand_videos:
                f.write(f"{v}\n")

    dirty_total = len(dirty_entries) + len(no_hand_videos)
    print(f"✅ {split} done: {len(clean_entries)} clean, {dirty_total} dirty, {len(missing_videos)} missing, {len(skipped_videos)} skipped")

# --- RUN ALL SPLITS ---
# Processes every split in SPLITS order, but only when this code runs as the main program.
# NOTE(review): presumably guarded so multiprocessing workers that import this module
# do not start the pipeline themselves — confirm.
if __name__ == '__main__':
    for split in SPLITS:
        process_split(split)

    print("\n🎉 All splits completed!")



🚀 Processing split: train
📦 21240 videos to process with 2 workers in chunks of 1000


100%|█████████▉| 998/1000 [39:00<00:04,  2.34s/it]Process ForkPoolWorker-16:



KeyboardInterrupt: 

In [None]:
import os
import cv2
import numpy as np
import pandas as pd
import mediapipe as mp
import shutil
from tqdm import tqdm
from concurrent.futures import ProcessPoolExecutor, as_completed, TimeoutError

# --- CONFIG ---
# Frames are read from BASE_DATASET_PATH/<split>/<video>/, split CSVs from
# BASE_SPLITS_PATH/<split>_filtered.csv, and all results are written under BASE_OUTPUT_PATH.
BASE_DATASET_PATH = '/content/drive/MyDrive/ASL Project/dataset'
BASE_SPLITS_PATH = '/content/drive/MyDrive/ASL Project/filtered splits'
BASE_OUTPUT_PATH = '/content/drive/MyDrive/ASL Project/best_frames'
SHARPNESS_THRESHOLD = 70.0  # Minimum best-frame sharpness for a video to be reported as 'clean'
SPLITS = ['train', 'val', 'test']  # Splits processed, in this order
NUM_WORKERS = 2  # Limit parallel workers to reduce memory use
CHUNK_SIZE = 1000  # Process in smaller chunks
# Per-video time budget in seconds; process_split multiplies it by the chunk
# length to bound how long it waits for a whole chunk.
TIMEOUT_PER_VIDEO = 7  # Timeout per video in seconds

# --- SHARED PROCESSING FUNCTION ---
def process_video(args):
    """Select the sharpest frame containing a detected hand for one video.

    Runs inside a worker process, so everything it needs arrives in ``args``.

    Args:
        args: Tuple ``(video_folder, split, gloss_lookup)`` where
            ``video_folder`` is the frame-folder name, ``split`` is the split
            name and ``gloss_lookup`` maps folder names to gloss labels.

    Returns:
        Tuple ``(status, video_folder, gloss, sharpness, frame)``:
          * ``'skipped'`` – an output image already exists (clean or dirty).
          * ``'missing'`` – the frame folder does not exist.
          * ``'clean'``   – best frame has sharpness >= SHARPNESS_THRESHOLD.
          * ``'dirty'``   – best frame is below the threshold, or no frame
            with hands was found / processing failed (then ``sharpness`` and
            ``frame`` are ``None``).
    """
    video_folder, split, gloss_lookup = args

    frames_path = os.path.join(BASE_DATASET_PATH, split, video_folder)
    output_path = os.path.join(BASE_OUTPUT_PATH, split, f"{video_folder}.jpg")
    dirty_path = os.path.join(BASE_OUTPUT_PATH, f'dirty_{split}', f"{video_folder}.jpg")

    gloss = gloss_lookup.get(video_folder, "")

    # Resume support: a saved image means this video was handled by an earlier run.
    if os.path.exists(output_path) or os.path.exists(dirty_path):
        return ('skipped', video_folder, gloss, None, None)

    if not os.path.isdir(frames_path):
        return ('missing', video_folder, gloss, None, None)

    best_frame = None
    best_score = -1

    try:
        # The context manager closes the detector even if the loop raises.
        with mp.solutions.hands.Hands(static_image_mode=True, max_num_hands=2) as hands:
            for frame_file in sorted(os.listdir(frames_path)):
                frame_path = os.path.join(frames_path, frame_file)
                try:
                    frame = cv2.imread(frame_path)
                    if frame is None:
                        continue
                    gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
                    sharpness = cv2.Laplacian(gray, cv2.CV_64F).var()

                    # Only a frame sharper than the current best can win, so
                    # the costly hand detection is skipped for all others.
                    # Static-image mode keeps detections independent per frame.
                    if sharpness <= best_score:
                        continue

                    rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
                    if hands.process(rgb).multi_hand_landmarks:
                        best_score = sharpness
                        best_frame = frame
                except Exception as e:
                    # One unreadable/corrupt frame must not discard the whole video.
                    print(f"❌ Error processing frame {frame_path}: {e}")
                    continue
    except Exception as e:
        print(f"❌ Error processing video {video_folder}: {e}")
        return ('dirty', video_folder, gloss, None, None)

    if best_frame is None:
        return ('dirty', video_folder, gloss, None, None)

    if best_score >= SHARPNESS_THRESHOLD:
        return ('clean', video_folder, gloss, best_score, best_frame)
    return ('dirty', video_folder, gloss, best_score, best_frame)

# --- MAIN PIPELINE ---
def _merge_sharpness_log(log_path, new_entries):
    """Merge ``(video, gloss, sharpness)`` rows into the CSV at ``log_path``.

    Rows already in the file win over new rows for the same video; the file
    is rewritten sorted by descending sharpness.
    """
    merged = pd.DataFrame(new_entries, columns=["video", "gloss", "sharpness"])
    if os.path.exists(log_path):
        prev_df = pd.read_csv(log_path)
        # Concatenating empty frames raises a pandas FutureWarning, so only
        # the non-empty pieces are combined (existing rows first).
        parts = [part for part in (prev_df, merged) if not part.empty]
        if parts:
            merged = pd.concat(parts, ignore_index=True)
    merged = merged.drop_duplicates(subset="video").sort_values(by="sharpness", ascending=False)
    merged.to_csv(log_path, index=False)


def process_split(split):
    """Run best-frame selection for every video listed in one split's CSV.

    Videos are submitted to a ``ProcessPoolExecutor`` in chunks of
    ``CHUNK_SIZE``. Clean frames are written to ``BASE_OUTPUT_PATH/<split>``,
    below-threshold frames to ``BASE_OUTPUT_PATH/dirty_<split>``, and scores
    are merged into ``<split>_sharpness.csv`` after every chunk, so an
    interrupted run keeps the scores of the images it already saved. The
    text logs (missing / dirty / timeout) are appended to at the end.

    Each chunk gets a total budget of ``TIMEOUT_PER_VIDEO * len(chunk)``
    seconds. Videos that have not produced a result by then are recorded in
    ``timeout_videos.txt`` and the pipeline moves on. A task that is already
    running cannot be cancelled, so leaving the executor still waits for it.

    Args:
        split: Split name, e.g. ``'train'``; selects the CSV and sub-folders.
    """
    print(f"\n🚀 Processing split: {split}")

    output_dir = os.path.join(BASE_OUTPUT_PATH, split)
    dirty_dir = os.path.join(BASE_OUTPUT_PATH, f'dirty_{split}')
    os.makedirs(output_dir, exist_ok=True)
    os.makedirs(dirty_dir, exist_ok=True)

    csv_path = os.path.join(BASE_SPLITS_PATH, f'{split}_filtered.csv')
    df = pd.read_csv(csv_path)
    video_list = [os.path.splitext(v)[0] for v in df['Video file'].tolist()]
    gloss_lookup = {os.path.splitext(row['Video file'])[0]: row['Gloss'] for _, row in df.iterrows()}
    args = [(v, split, gloss_lookup) for v in video_list]

    print(f"📦 {len(args)} videos to process with {NUM_WORKERS} workers in chunks of {CHUNK_SIZE}")

    clean_entries = []
    dirty_entries = []
    no_hand_videos = []  # 'dirty' results without a score: no hands found or processing failed
    missing_videos = []
    skipped_videos = []
    timeout_videos = []
    sharpness_log_path = os.path.join(BASE_OUTPUT_PATH, f'{split}_sharpness.csv')

    for i in range(0, len(args), CHUNK_SIZE):
        chunk = args[i:i + CHUNK_SIZE]
        chunk_entries = []  # scored results of this chunk, flushed to the CSV below
        try:
            with ProcessPoolExecutor(max_workers=NUM_WORKERS) as executor:
                futures = {executor.submit(process_video, arg): arg for arg in chunk}
                unfinished = set(futures)
                try:
                    for future in tqdm(as_completed(futures, timeout=TIMEOUT_PER_VIDEO * len(chunk)), total=len(chunk)):
                        unfinished.discard(future)
                        try:
                            # as_completed only yields finished futures, so this never blocks.
                            result = future.result()
                        except Exception as e:
                            arg = futures[future]
                            print(f"❌ Crash processing {arg[0]}: {e}")
                            timeout_videos.append(arg[0])
                            continue

                        status, video_name, gloss, sharpness, frame = result
                        if status == 'skipped':
                            skipped_videos.append(video_name)
                        elif status == 'missing':
                            missing_videos.append(video_name)
                        elif status == 'dirty':
                            if sharpness is not None:
                                dirty_entries.append((video_name, gloss, sharpness))
                                chunk_entries.append((video_name, gloss, sharpness))
                            else:
                                no_hand_videos.append(video_name)
                            if frame is not None:
                                cv2.imwrite(os.path.join(dirty_dir, f'{video_name}.jpg'), frame)
                        elif status == 'clean':
                            clean_entries.append((video_name, gloss, sharpness))
                            chunk_entries.append((video_name, gloss, sharpness))
                            cv2.imwrite(os.path.join(output_dir, f'{video_name}.jpg'), frame)
                except TimeoutError:
                    # The chunk budget ran out while iterating as_completed:
                    # log everything that has not delivered a result and drop
                    # queued work instead of aborting the whole split.
                    for future in unfinished:
                        future.cancel()
                        arg = futures[future]
                        print(f"⚠️ Timeout processing {arg[0]}")
                        timeout_videos.append(arg[0])
        finally:
            # Runs on normal completion and on interruption alike.
            _merge_sharpness_log(sharpness_log_path, chunk_entries)

    # Keep the original guarantee that the CSV exists even for an empty split.
    if not os.path.exists(sharpness_log_path):
        _merge_sharpness_log(sharpness_log_path, [])

    with open(os.path.join(BASE_OUTPUT_PATH, 'missing_videos.txt'), 'a') as f:
        for v in missing_videos:
            f.write(f"{v}\n")

    with open(os.path.join(BASE_OUTPUT_PATH, 'dirty_videos.txt'), 'a') as f:
        for v in [entry[0] for entry in dirty_entries if entry[0]] + no_hand_videos:
            f.write(f"{v}\n")

    with open(os.path.join(BASE_OUTPUT_PATH, 'timeout_videos.txt'), 'a') as f:
        for v in timeout_videos:
            f.write(f"{v}\n")

    dirty_total = len(dirty_entries) + len(no_hand_videos)
    print(f"✅ {split} done: {len(clean_entries)} clean, {dirty_total} dirty, {len(missing_videos)} missing, {len(skipped_videos)} skipped, {len(timeout_videos)} timeouts")

# --- RUN ALL SPLITS ---
# Processes every split in SPLITS order, but only when this code runs as the main program.
# NOTE(review): presumably guarded so worker processes that import this module
# do not start the pipeline themselves — confirm.
if __name__ == '__main__':
    for split in SPLITS:
        process_split(split)

    print("\n🎉 All splits completed!")



🚀 Processing split: train
📦 21240 videos to process with 2 workers in chunks of 1000


100%|██████████| 1000/1000 [02:32<00:00,  6.56it/s]
100%|██████████| 1000/1000 [00:15<00:00, 64.49it/s]
100%|██████████| 1000/1000 [00:25<00:00, 39.98it/s]
100%|██████████| 1000/1000 [00:39<00:00, 25.50it/s]
100%|██████████| 1000/1000 [00:40<00:00, 24.47it/s]
100%|██████████| 1000/1000 [00:56<00:00, 17.79it/s]
100%|██████████| 1000/1000 [00:27<00:00, 35.74it/s]
100%|██████████| 1000/1000 [01:17<00:00, 12.96it/s]
100%|██████████| 1000/1000 [01:03<00:00, 15.86it/s]
100%|██████████| 1000/1000 [00:16<00:00, 61.39it/s]
100%|██████████| 1000/1000 [00:28<00:00, 35.55it/s]
100%|██████████| 1000/1000 [00:42<00:00, 23.47it/s]
100%|██████████| 1000/1000 [00:33<00:00, 30.04it/s]
100%|██████████| 1000/1000 [00:32<00:00, 31.04it/s]
100%|██████████| 1000/1000 [00:18<00:00, 53.45it/s]
100%|██████████| 1000/1000 [00:21<00:00, 46.05it/s]
100%|██████████| 1000/1000 [00:33<00:00, 29.81it/s]
100%|██████████| 1000/1000 [01:31<00:00, 10.91it/s]
100%|██████████| 1000/1000 [01:09<00:00, 14.49it/s]
100%|███████

✅ train done: 0 clean, 0 dirty, 2 missing, 20324 skipped, 0 timeouts

🚀 Processing split: val
📦 5446 videos to process with 2 workers in chunks of 1000


100%|██████████| 1000/1000 [00:19<00:00, 52.17it/s]
100%|██████████| 1000/1000 [00:23<00:00, 41.98it/s]
100%|██████████| 1000/1000 [00:29<00:00, 33.35it/s]
100%|██████████| 1000/1000 [00:07<00:00, 133.50it/s]
100%|██████████| 1000/1000 [00:33<00:00, 30.02it/s]
100%|██████████| 446/446 [00:16<00:00, 27.56it/s]
  all_df = pd.concat([prev_df, all_df], ignore_index=True)


✅ val done: 0 clean, 0 dirty, 0 missing, 5273 skipped, 0 timeouts

🚀 Processing split: test
📦 17639 videos to process with 2 workers in chunks of 1000


100%|██████████| 1000/1000 [00:39<00:00, 25.31it/s]
100%|██████████| 1000/1000 [00:23<00:00, 42.55it/s]
100%|██████████| 1000/1000 [00:32<00:00, 30.40it/s]
100%|██████████| 1000/1000 [00:44<00:00, 22.53it/s]
100%|██████████| 1000/1000 [00:27<00:00, 35.78it/s]
100%|██████████| 1000/1000 [00:11<00:00, 90.27it/s]
100%|██████████| 1000/1000 [01:01<00:00, 16.13it/s]
100%|██████████| 1000/1000 [10:49<00:00,  1.54it/s]
100%|██████████| 1000/1000 [13:57<00:00,  1.19it/s]
100%|██████████| 1000/1000 [13:01<00:00,  1.28it/s]
100%|██████████| 1000/1000 [10:56<00:00,  1.52it/s]
100%|██████████| 1000/1000 [10:28<00:00,  1.59it/s]
100%|██████████| 1000/1000 [10:43<00:00,  1.55it/s]
100%|██████████| 1000/1000 [07:16<00:00,  2.29it/s]
100%|██████████| 1000/1000 [00:08<00:00, 113.23it/s]
100%|██████████| 1000/1000 [00:08<00:00, 120.95it/s]
100%|██████████| 1000/1000 [00:07<00:00, 138.54it/s]
100%|██████████| 639/639 [00:05<00:00, 110.63it/s]


✅ test done: 3419 clean, 2548 dirty, 3884 missing, 6860 skipped, 0 timeouts

🎉 All splits completed!


In [None]:
# Flush and unmount Drive, then mount it again.
# NOTE(review): presumably done so the files written above are fully synced
# before later cells read them — confirm.
drive.flush_and_unmount()
drive.mount('/content/drive')

Mounted at /content/drive


# Evaluating Usefulness of Sharpness Score

# Refiltering the Test Set

In [None]:
import pandas as pd
import os
import shutil
from tqdm import tqdm

# === Step 1: Mount Google Drive
from google.colab import drive
drive.mount('/content/drive')

# === Step 2: Define paths
# Images are filtered in final_dataset/test; rejected ones are moved (not deleted)
# to best_frames/discarded_test.
base_path = '/content/drive/MyDrive/ASL Project/'
test_image_dir = os.path.join(base_path, 'final_dataset/test')
sharpness_csv = os.path.join(base_path, 'best_frames/test_sharpness.csv')
discard_dir = os.path.join(base_path, 'best_frames/discarded_test')

# Create discard folder if it doesn't exist
os.makedirs(discard_dir, exist_ok=True)

# === Step 3: Load sharpness scores
# Expects at least the columns 'video' and 'sharpness', which are used below.
df = pd.read_csv(sharpness_csv)

# === Step 4: Sort by sharpness and keep top 4000
top_k = 4000
df_sorted = df.sort_values(by='sharpness', ascending=False)
top_df = df_sorted.head(top_k)

# Append '.jpg' to match actual filenames
top_files = set(top_df['video'].astype(str).str.strip() + '.jpg')

# Debug check
print(f"✅ Matching against {len(top_files)} top-scoring image filenames.")


# === Step 5: Refilter test folder based on top sharpness
# Only names present in top_files are kept; every other regular file is moved out.
# The kept count can be lower than top_k when some top-scoring videos have no image
# in test_image_dir (the recorded run kept 3423 of 4000).
kept = 0
moved = 0

# NOTE(review): the message hardcodes 4000 instead of using top_k.
print("🚚 Filtering test set to top 4000 sharpest frames...")
for fname in tqdm(os.listdir(test_image_dir)):
    src_path = os.path.join(test_image_dir, fname)
    dst_path = os.path.join(discard_dir, fname)

    if fname in top_files:
        kept += 1
    else:
        # Sub-directories are left in place; only regular files are moved.
        if os.path.isfile(src_path):
            shutil.move(src_path, dst_path)
            moved += 1

print(f"\n✅ Done. Kept {kept} images with highest sharpness.")
print(f"🗑️ Moved {moved} images to discarded_test.")


Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).
✅ Matching against 4000 top-scoring image filenames.
🚚 Filtering test set to top 4000 sharpest frames...


100%|██████████| 7760/7760 [00:16<00:00, 482.62it/s]


✅ Done. Kept 3423 images with highest sharpness.
🗑️ Moved 4337 images to discarded_test.





# Ensuring that the Test and Val sets are subsets of Train

In [None]:
import os
import shutil
import re
from tqdm import tqdm

# === Step 1: Mount Google Drive
from google.colab import drive
drive.mount('/content/drive')

# === Step 2: Define paths
# The three image folders live under final_dataset/; discarded val/test images are
# moved to folders directly under 'ASL Project/'.
base_path = '/content/drive/MyDrive/ASL Project/final_dataset/'
discard_path = '/content/drive/MyDrive/ASL Project/'

train_dir = os.path.join(base_path, 'train')
val_dir = os.path.join(base_path, 'val')
test_dir = os.path.join(base_path, 'test')

discard_val = os.path.join(discard_path, 'discarded_val')
discard_test = os.path.join(discard_path, 'discarded_test')

# Create the discard folders up front so files can be moved into them later.
os.makedirs(discard_val, exist_ok=True)
os.makedirs(discard_test, exist_ok=True)

# === Step 3: Extract gloss from filename
def extract_cleaned_gloss(filename):
    """Extract the normalized gloss label from an image filename.

    The gloss is the text after the first '-' and before the first '_' (or
    the file extension when there is no '_'). A trailing variant number, with
    or without a preceding space, is removed and the result is upper-cased:

        '123456789-GLASS 3_0.jpg' -> 'GLASS'
        '123456789-GLASS3.jpg'    -> 'GLASS'

    Args:
        filename: Bare file name (no directory part).

    Returns:
        The cleaned gloss string, or ``None`` (after printing a warning) when
        the name contains no '-' separator.
    """
    try:
        # Drop the extension first: without this, names lacking '_' kept
        # '.jpg' inside the gloss, which also stopped the trailing-digit
        # pattern below from ever matching.
        stem = os.path.splitext(filename)[0]

        base = stem.split('_')[0]               # e.g. '123456789-GLASS 3'

        # Split on the first '-' only so hyphenated glosses are not truncated.
        gloss = base.split('-', 1)[1]           # 'GLASS 3'

        # Remove trailing space + digits or trailing digits
        gloss = re.sub(r'\s*\d+$', '', gloss.strip())   # 'GLASS 3' -> 'GLASS', 'GLASS3' -> 'GLASS'

        return gloss.strip().upper()
    except IndexError:
        print(f"⚠️ Could not extract gloss from: {filename}")
        return None

# === Step 4: Build allowed gloss set from train/
# Only regular files are considered; names from which no gloss can be extracted
# (None or empty string) are left out of the set.
train_glosses = set()
for fname in os.listdir(train_dir):
    if os.path.isfile(os.path.join(train_dir, fname)):
        gloss = extract_cleaned_gloss(fname)
        if gloss:
            train_glosses.add(gloss)

print(f"✅ Found {len(train_glosses)} unique glosses in train set.")

# === Step 5: Function to filter folder
def filter_folder(folder, discard_folder, name):
    """Move every image whose gloss is absent from the train set out of ``folder``.

    Args:
        folder: Directory whose regular files are checked.
        discard_folder: Directory that receives the rejected files.
        name: Label used in the progress bar and the summary line.

    Files whose gloss cannot be extracted are rejected as well, because
    ``None`` is never a member of the module-level ``train_glosses`` set.
    """
    n_files = 0
    n_kept = 0
    for entry in tqdm(os.listdir(folder), desc=f"Filtering {name}"):
        source = os.path.join(folder, entry)
        # Sub-directories are neither counted nor moved.
        if os.path.isfile(source):
            n_files += 1
            if extract_cleaned_gloss(entry) in train_glosses:
                n_kept += 1
            else:
                shutil.move(source, os.path.join(discard_folder, entry))
    print(f"📂 {name}: Kept {n_kept}/{n_files} images.")

# === Step 6: Apply to val/ and test/
# Each call moves the rejected files into its own discard folder and prints a kept/total summary.
filter_folder(val_dir, discard_val, 'val')
filter_folder(test_dir, discard_test, 'test')


Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).
✅ Found 2787 unique glosses in train set.


Filtering val: 100%|██████████| 3957/3957 [00:00<00:00, 7966.58it/s]


📂 val: Kept 3957/3957 images.


Filtering test: 100%|██████████| 7760/7760 [00:00<00:00, 8169.87it/s]

📂 test: Kept 7760/7760 images.



