In [1]:
import os
import subprocess
from typing import List, Dict, Any
from pathlib import Path
import numpy as np
import soundfile as sf
from loguru import logger

class AudioGenerator:
    """
    Extracts audio from a video and segments it according to timestamps.
    Segments are saved as .wav files in a temporary directory.
    """
    def __init__(self, video_path: str, tmp_audio_dir: str = "tmp_audio"):
        self.video_path = video_path
        self.tmp_audio_dir = Path(tmp_audio_dir)
        self.tmp_audio_dir.mkdir(parents=True, exist_ok=True)

    def extract_segments(self, segments: List[Dict[str, Any]],
                        sample_rate: int = 16000) -> List[str]:
        """
        For each segment (with start_time, end_time), extract audio and save as wav.
        Returns list of audio file paths.
        """
        audio_paths = []
        for idx, seg in enumerate(segments):
            start = self._parse_time(seg['start_time'])
            end = self._parse_time(seg.get('end_time'))
            duration = max(0.1, end - start) if end > start else 3.0
            out_path = self.tmp_audio_dir / f"segment_{idx:04d}.wav"
            print("*****")
            print(out_path)

            cmd = [
                "ffmpeg", "-y", "-i", str(self.video_path),
                "-ss", f"{start:.3f}", "-t", f"{duration:.3f}",
                "-ar", str(sample_rate), "-ac", "1", "-vn", str(out_path),
                "-loglevel", "error"
            ]
            result = subprocess.run(cmd)
            print(result)
            if result.returncode == 0 and out_path.exists():
                audio_paths.append(str(out_path))
            else:
                logger.error(f"Failed to extract audio segment {idx} ({start}-{end})")
                audio_paths.append("")
        return audio_paths

    def _parse_time(self, t: str) -> float:
        if not t:
            return 0.0
        t = t.replace(",", ".")
        parts = t.split(":")
        if len(parts) == 2:
            minutes, rest = parts
            seconds, ms = rest.split(".") if "." in rest else (rest, "0")
            return int(minutes) * 60 + int(seconds) + int(ms) / 1000
        elif len(parts) == 3:
            hours, minutes, rest = parts
            seconds, ms = rest.split(".") if "." in rest else (rest, "0")
            return int(hours) * 3600 + int(minutes) * 60 + int(seconds) + int(ms) / 1000
        return 0.0


In [2]:
from typing import List, Dict, Any, Optional
from pathlib import Path
import json
from loguru import logger
from datasets import Dataset, Audio, Features, Value

class HuggingfaceCooker:
    """A class to prepare and upload datasets to Huggingface Hub."""
    
    def __init__(
        self,
        token: Optional[str] = None,
        repository_id: Optional[str] = None
    ):
        """
        Initialize the Huggingface Cooker.
        
        Args:
            token: Huggingface API token (optional if not pushing to hub)
            repository_id: Repository ID to push to (optional)
        """
        self.token = token
        self.repository_id = repository_id
    
    def cook_dataset(
        self,
        ocr_results: List[Dict[str, Any]],
        video_path: str,
        output_dir: str,
        push_to_hub: bool = False,
        private: bool = False
    ) -> Dataset:
        """
        Cook a Huggingface dataset from OCR results and video file.
        
        Args:
            ocr_results: List of OCR results with timestamps and text
            video_path: Path to the video file
            output_dir: Directory to save the prepared dataset
            push_to_hub: Whether to push the dataset to Huggingface Hub
            
        Returns:
            The created Huggingface Dataset
        """
        try:
            output_dir = Path(output_dir)
            output_dir.mkdir(parents=True, exist_ok=True)
            
            audio_gen = AudioGenerator(video_path, tmp_audio_dir=output_dir/"tmp_audio")
            audio_paths = audio_gen.extract_segments(ocr_results)
            
            dataset_items = []
            for entry, audio_path in zip(ocr_results, audio_paths):
                if not entry.get('success', False):
                    continue
                
                text = entry.get('text', '').strip()
                if not text or not audio_path:
                    continue
                
                start_time = entry.get('start_time', entry.get('time_formatted', ''))
                
                dataset_items.append({
                    'audio': str(audio_path),
                    'text': text,
                    'start_time': start_time
                })
            
            features = Features({
                'audio': Audio(),
                'text': Value('string'),
                'start_time': Value('string')
            })
            
            dataset = Dataset.from_list(dataset_items, features=features)
            
            dataset.save_to_disk(str(output_dir))
            logger.info(f"✅ Successfully created dataset with {len(dataset)} entries")
            
            if push_to_hub and self.token:
                dataset.push_to_hub(
                    repo_id=self.repository_id,
                    token=self.token,
                    private=private 
                )
                logger.info(f"✅ Successfully pushed dataset to {self.repository_id}")
            
            return dataset
            
        except Exception as e:
            logger.error(f"❌ Error cooking dataset: {str(e)}")
            raise

  from .autonotebook import tqdm as notebook_tqdm


In [4]:
ocr_results = [
    {
        "success": True,
        "text": "Bonjour, ceci est le début de la vidéo.",
        "start_time": "00:00:05",
        "end_time": "00:00:10"
    },
    {
        "success": True,
        "text": "Voici une autre phrase extraite à la minute 2.",
        "start_time": "00:00:11",
        "end_time": "00:02:50"
    },
    {
        "success": True,
        "text": "Fin de la vidéo, merci de votre attention.",
        "start_time": "00:02:51",
        "end_time": "00:04:00"
    }
]

video_path = r"C:/Users/sawal/Desktop/project X/Frame2Text4LLM/sandbox/sample_short.mp4"
output_dir = r'C:/Users/sawal/Desktop/project X/Frame2Text4LLM/sandbox'

cooker = HuggingfaceCooker(token=os.environ["HF_TOKEN"], repository_id="Frame2Text4LLM")
dataset = cooker.cook_dataset(
    ocr_results=ocr_results,
    video_path=video_path,
    output_dir=output_dir,
    push_to_hub=True  
)
print(True)

*****
C:\Users\sawal\Desktop\project X\Frame2Text4LLM\sandbox\tmp_audio\segment_0000.wav
CompletedProcess(args=['ffmpeg', '-y', '-i', 'C:/Users/sawal/Desktop/project X/Frame2Text4LLM/sandbox/sample_short.mp4', '-ss', '5.000', '-t', '5.000', '-ar', '16000', '-ac', '1', '-vn', 'C:\\Users\\sawal\\Desktop\\project X\\Frame2Text4LLM\\sandbox\\tmp_audio\\segment_0000.wav', '-loglevel', 'error'], returncode=0)
*****
C:\Users\sawal\Desktop\project X\Frame2Text4LLM\sandbox\tmp_audio\segment_0001.wav
CompletedProcess(args=['ffmpeg', '-y', '-i', 'C:/Users/sawal/Desktop/project X/Frame2Text4LLM/sandbox/sample_short.mp4', '-ss', '11.000', '-t', '159.000', '-ar', '16000', '-ac', '1', '-vn', 'C:\\Users\\sawal\\Desktop\\project X\\Frame2Text4LLM\\sandbox\\tmp_audio\\segment_0001.wav', '-loglevel', 'error'], returncode=0)
*****
C:\Users\sawal\Desktop\project X\Frame2Text4LLM\sandbox\tmp_audio\segment_0002.wav
CompletedProcess(args=['ffmpeg', '-y', '-i', 'C:/Users/sawal/Desktop/project X/Frame2Text4LLM/

Saving the dataset (1/1 shards): 100%|██████████| 3/3 [00:00<00:00, 89.16 examples/s] 
[32m2025-07-07 00:31:46.312[0m | [1mINFO    [0m | [36m__main__[0m:[36mcook_dataset[0m:[36m78[0m - [1m✅ Successfully created dataset with 3 entries[0m
Map: 100%|██████████| 3/3 [00:00<00:00, 161.02 examples/s]?, ?it/s]
Creating parquet from Arrow format: 100%|██████████| 1/1 [00:00<00:00, 37.37ba/s]
Uploading the dataset shards: 100%|██████████| 1/1 [00:00<00:00,  1.74it/s]
To support symlinks on Windows, you either need to activate Developer Mode or to run Python as an administrator. In order to activate developer mode, see this article: https://docs.microsoft.com/en-us/windows/apps/get-started/enable-your-device-for-development
[32m2025-07-07 00:31:48.834[0m | [1mINFO    [0m | [36m__main__[0m:[36mcook_dataset[0m:[36m86[0m - [1m✅ Successfully pushed dataset to Frame2Text4LLM[0m


True
