Building a webdataset

In [None]:
import os
import tarfile
import random
import json
import io
from pathlib import Path
import shutil
from typing import Dict, Any, List

# Set a random seed for reproducibility
random.seed(42)

# Directories containing the videos and metadata.
# This script assumes that each video and its metadata file share exactly the same
# file name and differ only in the extension (e.g. clip.mp4 <-> clip.json).
video_dir = Path('videos/')  # Folder with .mp4 files
metadata_dir = Path('jsons_dataset_reformat/')  # Folder with .json files
output_dir = Path('hf_dataset/')  # Output directory for dataset


# Step 1: Find matching video-metadata pairs (keyed by file name without extension)
video_files = {f.stem: f for f in video_dir.glob('*.mp4')}
metadata_files = {f.stem: f for f in metadata_dir.glob('*.json')}

# Pair up videos and metadata that share a stem.
# The stems are visited in sorted order on purpose: glob() yields entries in whatever
# order the filesystem returns them, so without sorting the seeded shuffle below would
# start from a different list on different machines and would not be reproducible.
matched_stems = sorted(video_files.keys() & metadata_files.keys())
matched_files = [(video_files[stem], metadata_files[stem]) for stem in matched_stems]

print(f"Matched files: {len(matched_files)}")
if not matched_files:
    raise ValueError("No matching video-metadata pairs found.")

# Step 2: Shuffle the matched files for randomness (deterministic thanks to the seed
# and the sorted starting order)
random.shuffle(matched_files)

# Step 3: Build folder structure for the dataset
train_dir = output_dir / 'train'
train_dir.mkdir(parents=True, exist_ok=True)

def _add_bytes_to_tar(tar, name, payload):
    """Store the in-memory bytes ``payload`` in the open tar archive under ``name``."""
    info = tarfile.TarInfo(name=name)
    info.size = len(payload)
    tar.addfile(info, fileobj=io.BytesIO(payload))


def create_tar_files(pairs, output_dir, batch_size=75):
    """Pack video/metadata pairs into numbered TAR shards of ``batch_size`` pairs each.

    Shards are named ``00000.tar``, ``00001.tar``, ... inside ``output_dir``. Within a
    shard every pair is stored as ``<index>.mp4`` and ``<index>.json``, where ``<index>``
    is the pair's zero-padded (6 digits) position in ``pairs``. The JSON entry is the
    original metadata plus two added keys, ``original_video_filename`` and
    ``original_json_filename``. Each shard ends with a ``__meta__.json`` entry holding
    ``{"num_examples": <pairs in this shard>}``. One line is printed per shard created.

    Choosing ``batch_size`` (adjust it to the number that fits your dataset):
      - avoid TAR files bigger than 20GB (ideally 1-2GB per file)
      - avoid having more than 10K TAR files
      - more details: https://huggingface.co/docs/hub/repositories-recommendations

    Args:
        pairs: Sequence of ``(video_path, metadata_path)`` tuples (``Path`` or ``str``).
            The metadata file must contain a JSON object, since keys are added to it.
        output_dir: Directory (``Path`` or ``str``) that receives the shards; it is
            created if it does not exist.
        batch_size: Maximum number of pairs per shard. Defaults to 75.

    Raises:
        ValueError: If ``batch_size`` is smaller than 1.
    """
    if batch_size < 1:
        raise ValueError(f"batch_size must be a positive integer, got {batch_size!r}")

    output_dir = Path(output_dir)
    output_dir.mkdir(parents=True, exist_ok=True)

    for start in range(0, len(pairs), batch_size):
        batch = pairs[start:start + batch_size]
        tar_filename = output_dir / f'{start // batch_size:05d}.tar'
        with tarfile.open(tar_filename, 'w') as tar:
            # Sample indices are global (continue across shards), hence start=start
            for sample_index, (video_file, metadata_file) in enumerate(batch, start=start):
                video_file = Path(video_file)
                metadata_file = Path(metadata_file)

                # WebDataset-compatible names: same base name, different extension
                base_name = f'{sample_index:06d}'
                tar.add(video_file, arcname=f'{base_name}.mp4')

                # Read the metadata as UTF-8 explicitly (it is written back as UTF-8
                # below; relying on the platform default encoding can corrupt or
                # reject non-ASCII text).
                with open(metadata_file, 'r', encoding='utf-8') as f:
                    metadata = json.load(f)
                metadata['original_video_filename'] = video_file.name
                metadata['original_json_filename'] = metadata_file.name

                json_content = json.dumps(metadata, ensure_ascii=False).encode('utf-8')
                _add_bytes_to_tar(tar, f'{base_name}.json', json_content)

            # Add shard metadata.
            # NOTE(review): a reader that groups tar members by base name may expose
            # `__meta__` as an extra, video-less sample - confirm your loader's behavior.
            shard_metadata = {"num_examples": len(batch)}
            _add_bytes_to_tar(tar, '__meta__.json', json.dumps(shard_metadata).encode('utf-8'))

        print(f'Created {tar_filename} with {len(batch)} elements.')

# Step 4: Pack every matched video/JSON pair into TAR shards under the train folder
# (75 pairs per shard)
print("Creating TAR files for the dataset...")
create_tar_files(pairs=matched_files, output_dir=train_dir)
print("TAR file creation complete.")

Now that the dataset is packaged as a WebDataset, we push it to the Hugging Face Hub.

In [None]:
from datasets import load_dataset
from huggingface_hub import HfApi
from datasets import config

# Row-group size for binary datasets. The default is 100; given how large each sample
# of this dataset is, a smaller value is probably the better choice.
config.PARQUET_ROW_GROUP_SIZE_FOR_BINARY_DATASETS = 10
def upload_dataset_to_hub(local_dir, repo_id):
    """Load the local WebDataset and push its train split to a Hub dataset repo.

    Steps: make sure the dataset repo ``repo_id`` exists (creating it when needed),
    load the ``train`` split from ``local_dir`` with the ``webdataset`` loader, drop
    the ``__key__`` and ``__url__`` columns, and push the result to ``repo_id``.

    Args:
        local_dir: Directory passed to ``load_dataset`` as ``data_dir``.
        repo_id: Identifier of the target dataset repository on the Hub.

    Returns:
        None. If creating/checking the repo fails, the error is printed and the
        function returns early without loading or pushing anything.
    """
    hub_client = HfApi()

    # Create the dataset repo, or confirm it is already there; give up on any failure
    try:
        hub_client.create_repo(repo_id=repo_id, repo_type="dataset", exist_ok=True)
    except Exception as e:
        print(f"Error creating/checking repo: {e}")
        return

    print("Loading dataset...")
    train_split = load_dataset("webdataset", data_dir=local_dir, split="train")

    # These two columns are not used downstream, so they are dropped before the upload
    unused_columns = ['__key__', '__url__']
    trimmed_split = train_split.remove_columns(unused_columns)
    print("Dataset loaded. Pushing to Hub...")

    trimmed_split.push_to_hub(repo_id)
    print(f"Dataset successfully pushed to {repo_id}")

# Example invocation: point these at your local dataset folder and your Hub repo id
local_dataset_dir = "hf_dataset"
hf_repo_id = "yourrepo/identifier"

upload_dataset_to_hub(local_dir=local_dataset_dir, repo_id=hf_repo_id)

Once pushed, you can consume the dataset like this:

In [None]:
from datasets import load_dataset
import os

# Full dataset (600GB of data)
dataset = load_dataset("yourrepo/identifier", split="train")

# Rows are selected with an integer index; a string key such as '0' would be treated
# as a column name and fail. Fetch the row once and reuse it for both fields.
first_sample = dataset[0]
print(first_sample['json'])  # Access the metadata and speech to text of the first sample
first_video = first_sample['mp4']  # Access the video

# Dataset streaming (will only download the data as needed)
dataset = load_dataset("yourrepo/identifier", split="train", streaming=True)
sample = next(iter(dataset))
print(sample['json'])

# Save the streamed video to disk.
# NOTE(review): assumes sample['mp4'] is raw bytes - confirm for your `datasets` version.
with open('sample.mp4', 'wb') as video_file:
    video_file.write(sample['mp4'])