In [3]:
import sys
# Root directory that the cells below scan for extracted .jpg video frames.
# NOTE(review): the recorded outputs further down show paths under Videos_L23_a
# and a 340323-file count, so they appear to come from a run with a different
# root than this one - confirm and re-run top to bottom.
data_folder_path = "/mnt/mmlab2024nas/anhndt/Batch1/frames/Videos_L21_a"
# NOTE(review): this adds the data directory to Python's module search path.
# Presumably unnecessary unless importable modules live there - confirm before removing.
sys.path.append(data_folder_path)

In [4]:
from pathlib import Path
from typing import Union, List
from concurrent.futures import ThreadPoolExecutor, as_completed


def collect_jpg_paths(folder_path: Union[Path, str], max_workers: int = 8) -> List[str]:
    """
    Return every .jpg file found beneath the first-level subfolders of
    ``folder_path``, in a reproducible (lexicographically sorted) order.

    Each first-level subfolder is walked recursively in its own worker thread
    so that directory listings can overlap.

    Args:
        folder_path: Root folder to scan (can be Path or string). Only its
            subdirectories are searched; .jpg files located directly in the
            root folder are not returned.
        max_workers: Maximum number of threads to use for scanning.

    Returns:
        Sorted list of file paths (as strings) to all .jpg images in the
        subfolders of ``folder_path``. Empty if there are no subfolders or
        no matching files.

    Raises:
        OSError: propagated from the directory listing, e.g. when
            ``folder_path`` does not exist or is not a directory.
    """
    root = Path(folder_path)

    # Only first-level directories become scan tasks; plain files in the root are skipped.
    subfolders = [entry for entry in root.iterdir() if entry.is_dir()]

    def scan_folder(subfolder: Path) -> List[str]:
        """Recursively list all .jpg files below a single subfolder."""
        return [str(p) for p in subfolder.rglob("*.jpg")]

    jpg_files: List[str] = []
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # Iterating the map results re-raises any exception from a worker.
        for found in executor.map(scan_folder, subfolders):
            jpg_files.extend(found)

    # Thread completion order and directory listing order are both arbitrary,
    # so sort once at the end to make repeated runs return the same list.
    jpg_files.sort()
    return jpg_files



In [7]:
# Gather the .jpg paths below data_folder_path's subfolders, scanning with up to 8 threads.
jpg_list = collect_jpg_paths(data_folder_path, max_workers=8)
# Last expression of the cell: shows how many paths were collected.
len(jpg_list)

31945

In [6]:
# Keep the first collected path as a sample and display it.
jpg_path = jpg_list[0]
jpg_path

'/mnt/mmlab2024nas/anhndt/Batch1/frames/Videos_L21_a/L21_V007/L21_V007_00031.jpg'

In [8]:
import re

path = "/mnt/mmlab2024nas/anhndt/Batch1/frames/Videos_L23_a/L23_V001/L23_V001_00026.jpg"

# Capture groups read below:
#   group 1 - the folder label that follows 'Videos_'   (L23_a)
#   group 3 - the video id inside the file name         (V001)
#   group 4 - the frame number right before .jpg        (00026)
# Group 2 spans the whole file stem and is not used.
pattern = r"Videos_(L\d+_[a-z])/.+?/(L\d+_(V\d+)_(\d+))\.jpg"

match = re.search(pattern, path)

if match is not None:
    # Fetch the three groups of interest in a single call.
    video_folder, video_id, frame_id = match.group(1, 3, 4)
    print(video_folder, video_id, frame_id)

L23_a V001 00026


In [11]:
import re

def parse_path_info(example: str):
    """
    Split a frame path into its identifying components.

    Returns:
        Tuple ``(video_folder, video_id, frame_id)``, for example
        ``("L23_a", "V001", "00026")`` for a path containing
        ``Videos_L23_a/L23_V001/L23_V001_00026.jpg``.

    Raises:
        ValueError: if the path does not match the expected pattern
    """
    found = re.search(r"Videos_(L\d+_[a-z])/.+?/(L\d+_(V\d+)_(\d+))\.jpg", example)

    # Guard clause: reject anything that does not follow the frame layout.
    if found is None:
        raise ValueError(f"Invalid path format: {example}")

    # Group 2 (the full file stem) is deliberately left out of the result.
    return found.group(1, 3, 4)


In [12]:
from tqdm import tqdm
# NOTE: this cell unpacks three values, so it needs the parse_path_info variant
# that returns a (video_folder, video_id, frame_id) tuple; the dict-returning
# redefinition further down in this notebook is not compatible with it.
record_keys = ("video_folder", "video_id", "frame_id", "path")

frames_info = []
for jpg_path in tqdm(jpg_list, desc="Parsing path info..."):
    video_folder, video_id, frame_id = parse_path_info(jpg_path)
    # Pair each field name with its value to form one record per frame.
    frame_info = dict(zip(record_keys, (video_folder, video_id, frame_id, jpg_path)))
    frames_info.append(frame_info)

Parsing path info...: 100%|██████████| 340323/340323 [00:00<00:00, 572045.67it/s]


In [13]:
# Display the first parsed record as a spot check.
frames_info[0]

{'video_folder': 'L23_a',
 'video_id': 'V001',
 'frame_id': '00026',
 'path': '/mnt/mmlab2024nas/anhndt/Batch1/frames/Videos_L23_a/L23_V001/L23_V001_00026.jpg'}

In [23]:
from pathlib import Path
from typing import Union, List
from concurrent.futures import ThreadPoolExecutor, as_completed
import re


def parse_path_info(example: str):
    """
    Turn one .jpg frame path into a metadata dictionary.

    Example input:
        /mnt/.../Videos_L23_a/L23_V001/L23_V001_00026.jpg

    Returns:
        dict with keys, in this order:
            - video_folder: label after "Videos_", e.g., "L23_a"
            - video_id: video part of the file name, e.g., "V001"
            - frame_id: digits before ".jpg", e.g., "00026"
            - path: the input string, unchanged

    Raises:
        ValueError: if the path does not match the expected format.
    """
    layout = r"Videos_(L\d+_[a-z])/.+?/(L\d+_(V\d+)_(\d+))\.jpg"
    hit = re.search(layout, example)

    # Fail fast on paths that do not follow the expected layout.
    if not hit:
        raise ValueError(f"Invalid path format: {example}")

    # Groups 1, 3 and 4 carry the metadata; group 2 (whole file stem) is unused.
    folder, video, frame = hit.group(1), hit.group(3), hit.group(4)
    return {
        "video_folder": folder,
        "video_id": video,
        "frame_id": frame,
        "path": example,
    }


def parse_frames_info(folder_path: Union[Path, str], max_workers: int = 16) -> List[dict]:
    """
    Scan the first-level subfolders of ``folder_path`` for .jpg files and parse
    each path into a metadata dictionary, returning the records in a
    reproducible order (sorted by path).

    Each first-level subfolder is walked recursively in its own worker thread.

    Args:
        folder_path: Root folder containing video frame subfolders (Path or string).
            Only its subdirectories are searched; .jpg files located directly
            in the root folder are not returned.
        max_workers: Maximum number of threads to use for concurrent scanning.

    Returns:
        List of dictionaries sorted by their "path" value, each containing:
            {
                "video_folder": str,
                "video_id": str,
                "frame_id": str,
                "path": str
            }

    Raises:
        ValueError: propagated from ``parse_path_info`` when a .jpg path does
            not match the expected format.
        OSError: propagated from the directory listing, e.g. when
            ``folder_path`` does not exist or is not a directory.
    """
    root = Path(folder_path)

    # Only first-level directories become scan tasks; plain files in the root are skipped.
    subfolders = [entry for entry in root.iterdir() if entry.is_dir()]

    def scan_folder(subfolder: Path) -> List[dict]:
        """
        Recursively scan a single subfolder for .jpg files
        and parse each path into a metadata dictionary.
        """
        return [parse_path_info(str(p)) for p in subfolder.rglob("*.jpg")]

    frames_info: List[dict] = []
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # Iterating the map results re-raises any exception from a worker.
        for parsed in executor.map(scan_folder, subfolders):
            frames_info.extend(parsed)

    # Thread completion order and directory listing order are both arbitrary,
    # so sort by path to make repeated scans (and files written from them) identical.
    frames_info.sort(key=lambda info: info["path"])
    return frames_info


In [24]:
frames_info = parse_frames_info(data_folder_path, max_workers=8)

output_file = "frames_info.txt"

# Write one frame-info dict per line, using its Python repr.
# The encoding is set explicitly so the file does not depend on the platform
# default and matches the UTF-8 used for the JSON export.
with open(output_file, "w", encoding="utf-8") as f:
    for info in frames_info:
        f.write(f"{info}\n")

print(f"Saved {len(frames_info)} frame infos to {output_file}")


Saved 340323 frame infos to frames_info.txt


In [25]:
import json

output_file = "frames_info.json"

# Scan the frame tree and gather one metadata dict per frame.
frames_info = parse_frames_info(data_folder_path, max_workers=8)

# Serialize the whole list[dict] as 2-space-indented JSON; non-ASCII
# characters are written as-is rather than \u-escaped.
with open(output_file, mode="w", encoding="utf-8") as f:
    f.write(json.dumps(frames_info, ensure_ascii=False, indent=2))

print(f"Saved {len(frames_info)} frames info to {output_file}")


Saved 340323 frames info to frames_info.json
