<a href="https://colab.research.google.com/github/kavish-24/Konkani_Mentall_Health/blob/main/DataPreparationPrudentMedia.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!pip install evaluate odfpy pydub


Collecting evaluate
  Downloading evaluate-0.4.6-py3-none-any.whl.metadata (9.5 kB)
Collecting odfpy
  Downloading odfpy-1.4.1.tar.gz (717 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m717.0/717.0 kB[0m [31m9.5 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Downloading evaluate-0.4.6-py3-none-any.whl (84 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m84.1/84.1 kB[0m [31m4.5 MB/s[0m eta [36m0:00:00[0m
[?25hBuilding wheels for collected packages: odfpy
  Building wheel for odfpy (setup.py) ... [?25l[?25hdone
  Created wheel for odfpy: filename=odfpy-1.4.1-py2.py3-none-any.whl size=160673 sha256=3d68e8ce6e862b9e708ef6583da209a090c20ac1361939528cf005642fac446c
  Stored in directory: /root/.cache/pip/wheels/36/5d/63/8243a7ee78fff0f944d638fd0e66d7278888f5e2285d7346b6
Successfully built odfpy
Installing collected packages: odfpy, evaluate
Successfully installed evaluate-0.4.6 odfpy-1.4.1


In [None]:
from odf import text, teletype
from odf.opendocument import OpenDocumentText
from odf.style import Style, TextProperties
from odf.text import P
import os
import re

def extract_text_from_odt(odt_path, skip_bold=True, debug=False):
    """
    Extract the non-bold text of an ODT file as a list of cleaned paragraphs.

    A text node counts as bold when it, an enclosing element, or its
    paragraph carries a style name whose text properties declare a font
    weight containing "bold". Only style names set directly on a node are
    checked; inherited (parent) styles are not resolved.

    Every surviving paragraph is cleaned as follows: runs of two or more dots
    become a single space, the characters ``! ? , ; : " ' ( ) [ ] { } - — *``
    are removed, and whitespace is collapsed. Single full stops are kept.

    Args:
        odt_path: Path to input ODT file
        skip_bold: If True, drop bold text; if False, keep all text
        debug: If True, print one line per kept/skipped node and bold style

    Returns:
        List of cleaned paragraph strings (paragraphs that end up empty are omitted)

    Raises:
        FileNotFoundError: If odt_path does not exist
        ValueError: If the file cannot be loaded as an ODT document
    """
    if not os.path.exists(odt_path):
        raise FileNotFoundError(f"ODT file not found: {odt_path}")

    # Imported lazily, after the cheap existence check, so a missing file is
    # always reported as FileNotFoundError.
    from odf.opendocument import load

    try:
        doc = load(odt_path)
    except Exception as e:
        # Keep the original exception chained for easier debugging
        raise ValueError(f"Failed to load ODT file: {e}") from e

    bold_styles = set()

    # Scan automatic styles first, then named styles, for a bold font weight
    for style_container in (doc.automaticstyles, doc.styles):
        for style in style_container.getElementsByType(Style):
            style_name = style.getAttribute('name')
            for prop in style.getElementsByType(TextProperties):
                font_weight = prop.getAttribute('fontweight')
                if font_weight and 'bold' in str(font_weight).lower():
                    bold_styles.add(style_name)
                    if debug:
                        print(f"   Found bold style: {style_name}")

    print(f"\n🔍 Detected {len(bold_styles)} bold styles: {bold_styles}")

    extracted_parts = []
    skipped_count = 0
    kept_count = 0
    skipped_text_samples = []
    kept_text_samples = []

    def get_style_name(node):
        """Return the node's style name, or None if it has none / cannot be read."""
        # The lookup may raise for nodes or attribute spellings that are not
        # supported; that is treated as "no style" rather than as an error.
        for attr_name in ['stylename', 'style-name']:
            try:
                style = node.getAttribute(attr_name)
            except Exception:
                continue
            if style:
                return style

        # Fall back to namespace-aware retrieval
        try:
            style = node.getAttrNS(
                "urn:oasis:names:tc:opendocument:xmlns:text:1.0",
                "style-name"
            )
        except Exception:
            return None
        return style or None

    def is_node_bold(node):
        """Check whether the node's own style is one of the detected bold styles."""
        style_name = get_style_name(node)
        return bool(style_name) and style_name in bold_styles

    def process_node(node, parent_is_bold=False):
        """Recursively collect the text under `node`, honouring inherited boldness."""
        nonlocal skipped_count, kept_count

        result_text = ""

        # Boldness is sticky: once an ancestor is bold, everything below it is
        current_is_bold = parent_is_bold or is_node_bold(node)

        if node.nodeType == node.TEXT_NODE:
            node_text = node.data
            # Whitespace-only text nodes are neither counted nor emitted
            if node_text.strip():
                if skip_bold and current_is_bold:
                    skipped_count += 1
                    if len(skipped_text_samples) < 5:
                        skipped_text_samples.append(node_text.strip()[:50])
                    if debug:
                        print(f"   [SKIP] TEXT_NODE (bold): {repr(node_text.strip()[:50])}")
                else:
                    result_text += node_text
                    kept_count += 1
                    if len(kept_text_samples) < 5:
                        kept_text_samples.append(node_text.strip()[:50])
                    if debug:
                        print(f"   [KEEP] TEXT_NODE: {repr(node_text.strip()[:50])}")

        elif node.nodeType == node.ELEMENT_NODE:
            if node.tagName == "text:span":
                # Full text of the span (including nested elements), used for reporting only
                span_full_text = teletype.extractText(node)

                if skip_bold and current_is_bold:
                    # A bold span is dropped as a whole and counted once
                    skipped_count += 1
                    if span_full_text.strip() and len(skipped_text_samples) < 5:
                        skipped_text_samples.append(span_full_text.strip()[:50])
                    if debug:
                        style_name = get_style_name(node)
                        print(f"   [SKIP] SPAN (bold - {style_name}): {repr(span_full_text[:50])}")
                    return ""

                if debug and span_full_text.strip():
                    style_name = get_style_name(node)
                    print(f"   [KEEP] SPAN ({style_name}): {repr(span_full_text[:50])}")

            # Non-bold spans and every other element: descend into the children
            for child in node.childNodes:
                result_text += process_node(child, current_is_bold)

        return result_text

    for paragraph in doc.getElementsByType(text.P):
        # A bold paragraph style makes all of its content bold
        para_is_bold = is_node_bold(paragraph)

        if debug and para_is_bold:
            print(f"\n⚠️  Paragraph itself is BOLD - will skip all content")

        para_text = ""
        for node in paragraph.childNodes:
            para_text += process_node(node, para_is_bold)

        if para_text.strip():
            para_text = re.sub(r'\.{2,}', ' ', para_text)  # Replace 2+ dots with single space
            para_text = re.sub(r'[!?,;:"\'()\[\]{}\-—*]+', '', para_text)  # Remove other punctuation

            para_text = re.sub(r'\s+', ' ', para_text)  # Normalize spaces
            para_text = para_text.strip()
            if para_text:
                extracted_parts.append(para_text)

    print(f"\n📊 Extraction summary:")
    print(f"   Kept: {kept_count} elements")
    print(f"   Skipped (bold): {skipped_count} elements")
    print(f"   Extracted paragraphs: {len(extracted_parts)}")

    if skipped_count > 0:
        print(f"\n❌ Sample SKIPPED bold text:")
        for sample in skipped_text_samples:
            print(f"   • {repr(sample)}...")

    if kept_count > 0:
        print(f"\n✅ Sample KEPT non-bold text:")
        for sample in kept_text_samples:
            print(f"   • {repr(sample)}...")

    if extracted_parts:
        print(f"\n🧼 Sample CLEANED paragraphs:")
        for i, para in enumerate(extracted_parts[:3]):
            print(f"   • [{i}] {repr(para)[:100]}...")

    if skipped_count == 0 and skip_bold:
        print(f"\n⚠️  WARNING: No bold text was found to skip! Check ODT styles.")

    return extracted_parts

def create_new_odt(output_path, paragraphs):
    """
    Create a new ODT file containing one plain paragraph per input string.

    Args:
        output_path: Path to save the new ODT file. Missing parent
            directories are created; a bare filename (no directory part)
            is saved relative to the current working directory.
        paragraphs: Iterable of text paragraphs to include, in order

    Raises:
        ValueError: If the directory cannot be created or the file cannot be saved
    """
    doc = OpenDocumentText()

    for para_text in paragraphs:
        doc.text.addElement(P(text=para_text))

    try:
        # os.path.dirname returns '' for a bare filename, and os.makedirs('')
        # raises, so only create the directory when there is one.
        parent_dir = os.path.dirname(output_path)
        if parent_dir:
            os.makedirs(parent_dir, exist_ok=True)
        doc.save(output_path)
        print(f"\n💾 New ODT file created: {output_path}")
    except Exception as e:
        raise ValueError(f"Failed to save new ODT file: {e}") from e

def main():
    """
    Run the bold-stripping pipeline: read the source ODT, keep only the
    cleaned non-bold paragraphs, and write them to a new ODT file.

    Input and output locations come from the TRANSCRIPT_FILE_PATH and
    OUTPUT_ODT_PATH environment variables, falling back to the Drive paths
    below. Any failure is reported and then re-raised.
    """
    banner = "="*70
    source_odt = os.getenv("TRANSCRIPT_FILE_PATH", "/content/drive/MyDrive/Anju Project (1)/Audio Prudent media (1)/August 2017 (1)/dataset/10  AUG PRIME.odt")
    target_odt = os.getenv("OUTPUT_ODT_PATH", "/content/drive/MyDrive/Anju Project (1)/Audio Prudent media (1)/August 2017 (1)/dataset/10 AUG PRIME_non_bold.odt")
    verbose = True  # per-node debug output from the extractor

    print(banner)
    print("EXTRACTING NON-BOLD TEXT AND REMOVING ALL NOISE")
    print(banner)

    try:
        print("\n📄 Extracting text from ODT file...")
        cleaned = extract_text_from_odt(source_odt, skip_bold=True, debug=verbose)

        print(f"\n✓ Extracted {len(cleaned)} paragraphs")
        if cleaned:
            first_preview = repr(cleaned[0])[:150]
            print(f"   Sample cleaned paragraph: {first_preview}...")

        print("\n📝 Creating new ODT file...")
        create_new_odt(target_odt, cleaned)

        print("\n" + banner)
        print("✅ PROCESS COMPLETE!")
        print(banner)

    except Exception as e:
        # Surface the problem in the cell output, then let it propagate
        print(f"\n❌ Process failed: {e}")
        raise

# Entry point: run the extraction pipeline only when this code is executed
# directly (not when it is imported as a module).
if __name__ == "__main__":
    main()

In [7]:
"""
Simplified Audio Segmentation Using Whisper-Only Approach
=========================================================

This script uses Whisper's built-in capabilities for accurate audio-text alignment
without complex manual matching. Perfect for Google Colab.

Approach:
1. Use Silero VAD to detect speech segments
2. Let Whisper transcribe each segment with word timestamps
3. Validate and create training manifest

No complex alignment needed - Whisper handles it!
"""

# ============================================================================
# INSTALLATION (Run this first in Colab)
# ============================================================================
"""
!pip install -q faster-whisper
!pip install -q torch torchaudio
!pip install -q librosa soundfile
!pip install -q odfpy
!pip install -q tqdm
!apt-get install -y ffmpeg
"""

# ============================================================================
# IMPORTS
# ============================================================================
import os
import json
import re
import unicodedata
from pathlib import Path
from typing import List, Dict, Any
import warnings
warnings.filterwarnings('ignore')

import numpy as np
import librosa
import soundfile as sf
import torch
from tqdm import tqdm

# For ODT reading
from odf.opendocument import load
from odf import text, teletype

# Whisper
from faster_whisper import WhisperModel

# ============================================================================
# CONFIGURATION
# ============================================================================
class Config:
    """Tunable settings for VAD, Whisper transcription and segment filtering."""

    # --- Output location (adjust for your Colab / Drive layout) ---
    OUTPUT_DIR = "/content/drive/MyDrive/dataset/whisper_segments"

    # --- Whisper model ---
    # Model size; one of: tiny, base, small, medium, large
    WHISPER_MODEL = "small"
    # Use the GPU with half precision when available, otherwise CPU with int8
    DEVICE, COMPUTE_TYPE = (
        ("cuda", "float16") if torch.cuda.is_available() else ("cpu", "int8")
    )

    # --- Voice activity detection ---
    VAD_THRESHOLD = 0.5
    MIN_SPEECH_DURATION = 0.5   # seconds
    MIN_SILENCE_DURATION = 0.3  # seconds

    # --- Segment length limits (seconds) ---
    MIN_SEGMENT_DURATION = 1.0
    MAX_SEGMENT_DURATION = 30.0
    TARGET_SEGMENT_DURATION = 10.0  # preferred segment length

    # --- Language ---
    # Marathi is used as the closest available Whisper language to Konkani
    LANGUAGE = "mr"

    # --- Quality gates ---
    MIN_CONFIDENCE = 0.3        # minimum mean word probability
    MIN_WORDS_PER_SEGMENT = 3


# ============================================================================
# SILERO VAD (Voice Activity Detection)
# ============================================================================
class SileroVAD:
    """Speech-segment detector backed by Silero VAD, with an RMS-energy fallback."""

    def __init__(self):
        """Load the Silero model via torch.hub; on failure switch to the energy fallback."""
        print("Loading Silero VAD model...")
        try:
            self.model, helper_fns = torch.hub.load(
                repo_or_dir='snakers4/silero-vad',
                model='silero_vad',
                force_reload=False,
                onnx=False
            )
            # First helper is the function that returns speech timestamps
            self.get_speech_timestamps = helper_fns[0]
            print("✓ Silero VAD loaded")
        except Exception as e:
            print(f"⚠ Could not load Silero VAD: {e}")
            print("Falling back to energy-based VAD")
            # A missing model is the signal for detect_speech to use the fallback
            self.model = None

    def detect_speech(self, audio_path, threshold=0.5, min_speech_ms=250, min_silence_ms=100):
        """
        Return speech regions of an audio file as a list of
        ``{'start': seconds, 'end': seconds}`` dicts.

        The threshold and duration arguments are forwarded to Silero only;
        the energy-based fallback does not use them.
        """
        # Audio is always resampled to 16 kHz mono before detection
        audio, sr = librosa.load(audio_path, sr=16000, mono=True)

        if self.model is None:
            return self._energy_based_vad(audio, sr)

        speech_timestamps = self.get_speech_timestamps(
            torch.from_numpy(audio),
            self.model,
            threshold=threshold,
            min_speech_duration_ms=min_speech_ms,
            min_silence_duration_ms=min_silence_ms,
            sampling_rate=16000
        )

        # Timestamps are divided by the 16 kHz rate to express them in seconds
        return [
            {'start': ts['start'] / 16000, 'end': ts['end'] / 16000}
            for ts in speech_timestamps
        ]

    def _energy_based_vad(self, audio, sr, frame_length=2048, hop_length=512):
        """Fallback detector: frames whose RMS exceeds 1.5x the mean RMS count as speech."""
        rms = librosa.feature.rms(y=audio, frame_length=frame_length, hop_length=hop_length)[0]
        speech_frames = rms > np.mean(rms) * 1.5

        # Start time of every frame, in seconds
        times = librosa.frames_to_time(np.arange(len(speech_frames)), sr=sr, hop_length=hop_length)

        segments = []
        run_start = None  # start time of the speech run currently open, if any

        for frame_time, is_speech in zip(times, speech_frames):
            if is_speech:
                if run_start is None:
                    run_start = frame_time
            elif run_start is not None:
                # First non-speech frame closes the open run
                segments.append({'start': run_start, 'end': frame_time})
                run_start = None

        # A run still open at the end of the audio is closed at the last frame
        if run_start is not None:
            segments.append({'start': run_start, 'end': times[-1]})

        return segments


# ============================================================================
# WHISPER ALIGNER
# ============================================================================
class WhisperAligner:
    """
    Cut an audio file into training segments using VAD + Whisper.

    Pipeline (see `process_audio_file`): detect speech regions, merge short
    neighbouring regions, transcribe each region with word timestamps,
    filter the resulting segments by duration / word count / confidence,
    then write per-segment WAV + text files and JSON / JSONL manifests.
    """

    def __init__(self, config: "Config | None" = None):
        """Create output directories and load the VAD and Whisper models.

        Args:
            config: Settings object; a default `Config()` is used when omitted.
        """
        self.config = config or Config()

        # Output layout: <OUTPUT_DIR>/{audio,text,metadata}
        self.output_dir = Path(self.config.OUTPUT_DIR)
        self.audio_dir = self.output_dir / "audio"
        self.text_dir = self.output_dir / "text"
        self.metadata_dir = self.output_dir / "metadata"

        for dir_path in (self.audio_dir, self.text_dir, self.metadata_dir):
            dir_path.mkdir(parents=True, exist_ok=True)

        self.vad = SileroVAD()

        print(f"Loading Whisper model: {self.config.WHISPER_MODEL}...")
        self.whisper = WhisperModel(
            self.config.WHISPER_MODEL,
            device=self.config.DEVICE,
            compute_type=self.config.COMPUTE_TYPE
        )
        print(f"✓ Whisper loaded on {self.config.DEVICE}")

    def load_transcript_from_odt(self, odt_path: str) -> str:
        """Return all non-empty paragraphs of an ODT file joined by single spaces."""
        print(f"Loading transcript: {Path(odt_path).name}")

        doc = load(str(odt_path))
        paragraphs = []

        for paragraph in doc.getElementsByType(text.P):
            para_text = teletype.extractText(paragraph)
            if para_text.strip():
                paragraphs.append(para_text.strip())

        full_text = " ".join(paragraphs)
        print(f"✓ Loaded {len(paragraphs)} paragraphs, {len(full_text)} characters")
        return full_text

    def process_audio_file(
        self,
        audio_path: str,
        odt_path: str = None,
        session_id: str = None
    ) -> List[Dict[str, Any]]:
        """
        Segment and transcribe one audio file, then save the valid segments.

        Args:
            audio_path: Path to audio file
            odt_path: Optional path to a reference transcript. It is loaded
                and handed to `_save_segments`, which currently does not use it.
            session_id: Session identifier; defaults to the audio file's stem

        Returns:
            List of metadata dicts, one per saved segment
        """
        audio_path = Path(audio_path)

        if session_id is None:
            session_id = audio_path.stem

        print(f"\n{'='*80}")
        print(f"PROCESSING: {audio_path.name}")
        print(f"Session: {session_id}")
        print(f"{'='*80}\n")

        reference_transcript = None
        if odt_path:
            reference_transcript = self.load_transcript_from_odt(odt_path)

        # Step 1: detect speech regions with VAD (config durations are in seconds)
        print("Step 1: Detecting speech segments...")
        vad_segments = self.vad.detect_speech(
            str(audio_path),
            threshold=self.config.VAD_THRESHOLD,
            min_speech_ms=int(self.config.MIN_SPEECH_DURATION * 1000),
            min_silence_ms=int(self.config.MIN_SILENCE_DURATION * 1000)
        )
        print(f"✓ Found {len(vad_segments)} speech segments")

        # Step 1b: merge short neighbouring regions into longer chunks
        merged_segments = self._merge_short_segments(vad_segments)
        print(f"✓ Merged to {len(merged_segments)} segments")

        # Step 2: transcribe each chunk with Whisper
        print("\nStep 2: Transcribing with Whisper...")
        all_segments = []

        for idx, vad_seg in enumerate(tqdm(merged_segments, desc="Transcribing")):
            chunk_start = vad_seg['start']

            # Decode only this chunk of the source file, at 16 kHz
            segment_audio, sr = librosa.load(
                str(audio_path),
                sr=16000,
                offset=chunk_start,
                duration=vad_seg['end'] - chunk_start
            )

            temp_audio = self.output_dir / f"temp_{idx}.wav"

            try:
                # Written inside the try so a failed/partial write is still cleaned up
                sf.write(temp_audio, segment_audio, sr)

                segments, _ = self.whisper.transcribe(
                    str(temp_audio),
                    language=self.config.LANGUAGE,
                    word_timestamps=True,
                    beam_size=5,
                    best_of=5,
                    temperature=0.0,
                    vad_filter=False  # We already did VAD
                )

                # The result is consumed here, while the temp file still exists
                for seg in segments:
                    if not seg.text.strip():
                        continue

                    # Whisper times are relative to the chunk; shift to file time
                    adjusted_start = chunk_start + seg.start
                    adjusted_end = chunk_start + seg.end

                    words = [
                        {
                            'word': w.word.strip(),
                            'start': chunk_start + w.start,
                            'end': chunk_start + w.end,
                            'probability': w.probability
                        }
                        for w in (getattr(seg, 'words', None) or [])
                    ]

                    all_segments.append({
                        'start': adjusted_start,
                        'end': adjusted_end,
                        'duration': adjusted_end - adjusted_start,
                        'text': seg.text.strip(),
                        'words': words,
                        'avg_logprob': getattr(seg, 'avg_logprob', 0.0)
                    })

            finally:
                if temp_audio.exists():
                    temp_audio.unlink()

        print(f"\n✓ Transcribed {len(all_segments)} segments")

        # Step 3: validate and filter segments
        print("\nStep 3: Validating segments...")
        valid_segments = self._validate_segments(all_segments)
        print(f"✓ {len(valid_segments)}/{len(all_segments)} segments passed validation")

        # Step 4: write audio/text files and manifests
        print("\nStep 4: Saving segments...")
        saved_segments = self._save_segments(
            valid_segments,
            audio_path,
            session_id,
            reference_transcript
        )

        print(f"\n{'='*80}")
        print(f"✓ COMPLETE! Created {len(saved_segments)} segments")
        print(f"{'='*80}\n")

        return saved_segments

    def _merge_short_segments(self, segments: List[Dict]) -> List[Dict]:
        """
        Greedily merge consecutive regions.

        The running region absorbs the next one while it is shorter than
        TARGET_SEGMENT_DURATION and the gap to the next region is under one
        second. Input dicts are not modified.
        """
        if not segments:
            return []

        merged = []
        current = segments[0].copy()

        for next_seg in segments[1:]:
            current_duration = current['end'] - current['start']
            gap = next_seg['start'] - current['end']

            if current_duration < self.config.TARGET_SEGMENT_DURATION and gap < 1.0:
                current['end'] = next_seg['end']
            else:
                merged.append(current)
                current = next_seg.copy()

        merged.append(current)
        return merged

    def _validate_segments(self, segments: List[Dict]) -> List[Dict]:
        """
        Keep segments that pass all quality gates.

        A segment is dropped when its duration is outside
        [MIN_SEGMENT_DURATION, MAX_SEGMENT_DURATION], its text is empty or
        has fewer than MIN_WORDS_PER_SEGMENT whitespace-separated words, or
        (when word data is present) its mean word probability is below
        MIN_CONFIDENCE.
        """
        valid = []

        for seg in segments:
            if seg['duration'] < self.config.MIN_SEGMENT_DURATION:
                continue
            if seg['duration'] > self.config.MAX_SEGMENT_DURATION:
                continue

            # Named segment_text to avoid shadowing the imported odf `text` module
            segment_text = seg['text'].strip()
            if not segment_text:
                continue

            if len(segment_text.split()) < self.config.MIN_WORDS_PER_SEGMENT:
                continue

            # Segments without word-level data skip the confidence gate
            if seg.get('words'):
                avg_prob = np.mean([w['probability'] for w in seg['words']])
                if avg_prob < self.config.MIN_CONFIDENCE:
                    continue

            valid.append(seg)

        return valid

    def _save_segments(
        self,
        segments: List[Dict],
        audio_path: Path,
        session_id: str,
        reference_transcript: str = None
    ) -> List[Dict]:
        """
        Write each segment's audio and text, plus JSON and JSONL manifests.

        Args:
            segments: Validated segments with 'start', 'end', 'duration', 'text', 'words'
            audio_path: Source audio file the segments are cut from
            session_id: Prefix for segment ids and manifest file names
            reference_transcript: Accepted for interface compatibility; not used here

        Returns:
            Metadata dicts of the segments that were saved successfully.
            A segment whose save fails is reported and left out.
        """
        saved_segments = []

        # Load the full audio once and slice it per segment
        print("Loading audio for extraction...")
        audio, sr = librosa.load(str(audio_path), sr=16000, mono=True)

        for idx, seg in enumerate(tqdm(segments, desc="Saving segments")):
            segment_id = f"{session_id}_{idx:04d}"

            audio_file = self.audio_dir / f"{segment_id}.wav"
            text_file = self.text_dir / f"{segment_id}.txt"

            try:
                start_sample = int(seg['start'] * sr)
                end_sample = int(seg['end'] * sr)
                segment_audio = audio[start_sample:end_sample]

                sf.write(audio_file, segment_audio, sr)

                clean_text = self._clean_text(seg['text'])
                with open(text_file, 'w', encoding='utf-8') as f:
                    f.write(clean_text)

                # Mean word probability; 0.0 when no word-level data is available
                word_entries = seg.get('words') or []
                if word_entries:
                    avg_confidence = float(np.mean([w['probability'] for w in word_entries]))
                else:
                    avg_confidence = 0.0

                # File paths in the metadata are relative to OUTPUT_DIR
                metadata = {
                    'segment_id': segment_id,
                    'audio_filepath': f"audio/{segment_id}.wav",
                    'text_filepath': f"text/{segment_id}.txt",
                    'text': clean_text,
                    'start_time': float(seg['start']),
                    'end_time': float(seg['end']),
                    'duration': float(seg['duration']),
                    'word_count': len(clean_text.split()),
                    'language': self.config.LANGUAGE,
                    'avg_confidence': avg_confidence
                }

                saved_segments.append(metadata)

            except Exception as e:
                # Best effort: report and continue with the remaining segments
                print(f"\n⚠ Error saving segment {segment_id}: {e}")
                continue

        manifest = {
            'session_id': session_id,
            'audio_file': str(audio_path.name),
            'total_segments': len(saved_segments),
            'total_duration': sum(s['duration'] for s in saved_segments),
            'language': self.config.LANGUAGE,
            'segments': saved_segments
        }

        manifest_file = self.metadata_dir / f"{session_id}_manifest.json"
        with open(manifest_file, 'w', encoding='utf-8') as f:
            json.dump(manifest, f, indent=2, ensure_ascii=False)

        # One JSON object per line, for Whisper fine-tuning
        jsonl_file = self.metadata_dir / f"{session_id}_train.jsonl"
        with open(jsonl_file, 'w', encoding='utf-8') as f:
            for seg in saved_segments:
                entry = {
                    'audio': seg['audio_filepath'],
                    'text': seg['text'],
                    'duration': seg['duration']
                }
                f.write(json.dumps(entry, ensure_ascii=False) + '\n')

        print(f"\n✓ Saved manifest to: {manifest_file}")
        print(f"✓ Saved JSONL to: {jsonl_file}")

        return saved_segments

    @staticmethod
    def _clean_text(text: str) -> str:
        """Return `text` NFC-normalized, with whitespace runs collapsed and ends stripped."""
        text = unicodedata.normalize("NFC", text)
        text = re.sub(r'\s+', ' ', text)
        return text.strip()


# ============================================================================
# MAIN EXECUTION
# ============================================================================
def main():
    """
    Segment and transcribe every file in the hard-coded work list, then
    print a summary of how many segments were produced.

    Missing audio files are reported and skipped; an error while processing
    one file is printed with its traceback and does not stop the remaining files.
    """
    config = Config()
    aligner = WhisperAligner(config)

    # Work list - MODIFY THESE PATHS FOR YOUR SETUP
    files_to_process = [
        {
            'audio_path': '/content/drive/MyDrive/dataset/Konkani Prime News_100817.wav',  # Your audio file
            'odt_path': '/content/drive/MyDrive/dataset/10 AUG PRIME_non_bold (1).odt',  # Optional: reference transcript
            'session_id': 'session_001'
        }
    ]

    all_segments = []

    for file_info in files_to_process:
        try:
            if not os.path.exists(file_info['audio_path']):
                print(f"❌ Audio file not found: {file_info['audio_path']}")
                continue

            # 'odt_path' and 'session_id' are optional keys
            file_segments = aligner.process_audio_file(
                audio_path=file_info['audio_path'],
                odt_path=file_info.get('odt_path'),
                session_id=file_info.get('session_id')
            )
            all_segments += file_segments

        except Exception as e:
            print(f"❌ Error processing {file_info['audio_path']}: {e}")
            import traceback
            traceback.print_exc()

    # Final report across all processed files
    total_duration = sum(s['duration'] for s in all_segments)
    print(f"\n{'='*80}")
    print(f"FINAL SUMMARY")
    print(f"{'='*80}")
    print(f"Total segments created: {len(all_segments)}")
    print(f"Total duration: {total_duration:.2f}s")
    print(f"Output directory: {config.OUTPUT_DIR}")
    print(f"{'='*80}\n")


# ============================================================================
# RUN
# ============================================================================
# Entry point: run the segmentation pipeline only when this code is executed
# directly (not when it is imported as a module).
if __name__ == "__main__":
    main()

Loading Silero VAD model...


Using cache found in /root/.cache/torch/hub/snakers4_silero-vad_master


✓ Silero VAD loaded
Loading Whisper model: small...
✓ Whisper loaded on cuda

PROCESSING: Konkani Prime News_100817.wav
Session: session_001

Loading transcript: 10 AUG PRIME_non_bold (1).odt
✓ Loaded 27 paragraphs, 6076 characters
Step 1: Detecting speech segments...
✓ Found 71 speech segments
✓ Merged to 37 segments

Step 2: Transcribing with Whisper...


Transcribing: 100%|██████████| 37/37 [01:00<00:00,  1.63s/it]



✓ Transcribed 75 segments

Step 3: Validating segments...
✓ 68/75 segments passed validation

Step 4: Saving segments...
Loading audio for extraction...


Saving segments: 100%|██████████| 68/68 [00:01<00:00, 38.34it/s]



✓ Saved manifest to: /content/drive/MyDrive/dataset/whisper_segments/metadata/session_001_manifest.json
✓ Saved JSONL to: /content/drive/MyDrive/dataset/whisper_segments/metadata/session_001_train.jsonl

✓ COMPLETE! Created 68 segments


FINAL SUMMARY
Total segments created: 68
Total duration: 336.08s
Output directory: /content/drive/MyDrive/dataset/whisper_segments



In [12]:
!pip install numpy librosa faster-whisper tqdm odfpy soundfile python-Levenshtein



In [17]:
import os
import re
from pathlib import Path
from odf import text, teletype
from odf.opendocument import load

class PhoneticTranscriptMatcher:
    """Replace transcript files with the best-matching paragraph of a reference ODT.

    Every ``.txt`` transcript found in ``whisper_dir`` is reduced to a
    "signature": the phonetically normalized first letter of each of its first
    words. The same signature is computed for every paragraph of the reference
    ODT document, and the paragraph whose signature agrees in the most positions
    is written to ``output_dir`` under the transcript's file name. Each
    reference paragraph is used at most once per run.
    """

    # Combining signs (matras, anusvara, visarga, nukta) that normalize to ''.
    _VOWEL_MARKS = frozenset(
        ['ा', 'ि', 'ी', 'ु', 'ू', 'े', 'ै', 'ो', 'ौ', 'ं', 'ः', '़', 'ृ', 'ॅ']
    )

    # Consonants folded onto a single representative letter.
    # NOTE(review): 'त' and 'द' are not mapped, so they do not fold into
    # 'ट' / 'ड' even though 'थ' and 'ध' do - confirm this is intended.
    _PHONETIC_MAP = {
        # थ / ट / ठ -> ट
        'थ': 'ट', 'ठ': 'ट',
        # ध / ड / ढ -> ड
        'ध': 'ड', 'ढ': 'ड',
        # फ / प -> प
        'फ': 'प',
        # भ / ब -> ब
        'भ': 'ब',
        # छ / च -> च
        'छ': 'च',
        # झ / ज -> ज
        'झ': 'ज',
        # ख / क / घ / ग -> क
        'ख': 'क', 'घ': 'क', 'ग': 'क',
        # ण / न -> न
        'ण': 'न',
        # ष / श / स -> स
        'ष': 'स', 'श': 'स',
        # ळ / ल -> ल
        'ळ': 'ल',
    }

    # A signature shorter than this is considered too short to match on.
    _MIN_SIGNATURE_LEN = 3
    # At most this many leading first-letters are compared.
    _MAX_SIGNATURE_LEN = 10

    def __init__(self, whisper_dir, odt_path, output_dir, noise_threshold=10,
                 match_threshold=0.1):
        """Store the configuration; no file is touched until a method is called.

        Args:
            whisper_dir: Directory containing the ``.txt`` transcripts.
            odt_path: Path of the reference ODT document.
            output_dir: Directory the matched reference text is written to.
            noise_threshold: Minimum word count used by ``is_noisy_transcript``.
            match_threshold: Minimum score (0..1) for a match to be accepted by
                ``process_transcripts``. Defaults to 0.1.
        """
        self.whisper_dir = Path(whisper_dir)
        self.odt_path = Path(odt_path)
        self.output_dir = Path(output_dir)
        self.noise_threshold = noise_threshold  # Minimum word count
        self.match_threshold = match_threshold
        self.reference_paragraphs = []
        self.used_paragraphs = set()  # Track which paragraphs have been used

    def read_odt(self):
        """Load the non-empty paragraphs of the reference ODT.

        The paragraph list is rebuilt from scratch on every call, so calling
        this method again does not duplicate paragraphs. ``used_paragraphs`` is
        cleared as well because its indices refer to the previous list.

        Returns:
            True on success; False if the document could not be read (the error
            is printed and the previously loaded paragraphs are left untouched).
        """
        print("Reading ODT reference document...")
        paragraphs = []
        try:
            doc = load(self.odt_path)
            for para in doc.getElementsByType(text.P):
                para_text = teletype.extractText(para).strip()
                if para_text:
                    paragraphs.append(para_text)
        except Exception as e:
            print(f"Error reading ODT: {e}")
            return False

        self.reference_paragraphs = paragraphs
        self.used_paragraphs = set()
        print(f"Loaded {len(self.reference_paragraphs)} paragraphs from ODT")
        return True

    def normalize_marathi_phonetic(self, char):
        """Normalize Marathi/Konkani characters to their phonetic equivalents.

        Args:
            char: A single character.

        Returns:
            '' for a combining sign, the folded consonant for a mapped letter,
            otherwise ``char.lower()``.
        """
        if char in self._VOWEL_MARKS:
            return ''
        return self._PHONETIC_MAP.get(char, char.lower())

    def get_first_letters(self, text):
        """Extract the normalized first letter of every word in ``text``.

        Words are whitespace-separated runs; leading non-word characters
        (punctuation, brackets, ...) are stripped before the first character is
        taken.

        Returns:
            List of one-character strings, one per word that yields a letter.
        """
        first_letters = []
        for word in re.findall(r'\S+', text):
            # Remove punctuation from start
            clean_word = re.sub(r'^[^\w]+', '', word)
            if not clean_word:
                continue
            normalized = self.normalize_marathi_phonetic(clean_word[0])
            if normalized:  # Only add if not empty (vowel marks return '')
                first_letters.append(normalized)
        return first_letters

    def is_noisy_transcript(self, content):
        """Detect if a transcript is likely noise/useless.

        A transcript is noisy when it is empty or whitespace-only, has fewer
        than ``noise_threshold`` words, or fewer than 20% of its words are
        distinct.
        """
        if not content:
            return True

        words = content.split()
        # `not words` guards the division below for whitespace-only content
        # when noise_threshold is 0 or negative.
        if not words or len(words) < self.noise_threshold:
            return True

        # Check for excessive repetition
        distinct_ratio = len(set(words)) / len(words)
        return distinct_ratio < 0.2

    def match_by_first_letters(self, transcript_text):
        """Match a transcript to the unused reference paragraph with the closest signature.

        Only the leading first-letters (at most ``_MAX_SIGNATURE_LEN``) are
        compared position by position; the score is the fraction of positions
        that agree.

        Returns:
            ``(paragraph, score, index)`` of the best match, or
            ``(None, 0, -1)`` when the transcript is too short or nothing
            scores above zero.
        """
        transcript_letters = self.get_first_letters(transcript_text)

        if len(transcript_letters) < self._MIN_SIGNATURE_LEN:
            return None, 0, -1

        compare_length = min(self._MAX_SIGNATURE_LEN, len(transcript_letters))
        transcript_signature = transcript_letters[:compare_length]

        best_match = None
        best_score = 0
        best_idx = -1

        for idx, ref_para in enumerate(self.reference_paragraphs):
            # Skip if this paragraph has already been used
            if idx in self.used_paragraphs:
                continue

            ref_letters = self.get_first_letters(ref_para)
            if len(ref_letters) < self._MIN_SIGNATURE_LEN:
                continue

            ref_signature = ref_letters[:compare_length]

            # zip stops at the shorter signature, so positions the reference
            # does not have simply count as mismatches.
            matches = sum(
                1 for ours, theirs in zip(transcript_signature, ref_signature)
                if ours == theirs
            )
            score = matches / compare_length

            if score > best_score:
                best_score = score
                best_match = ref_para
                best_idx = idx

        return best_match, best_score, best_idx

    def _find_transcript_files(self):
        """Return the .txt files directly in whisper_dir, else those in its subdirectories."""
        found = list(self.whisper_dir.glob('*.txt'))
        if not found:
            found = list(self.whisper_dir.glob('**/*.txt'))  # Search subdirectories
        return found

    def process_transcripts(self):
        """Match every transcript file and write the matched reference text.

        For each ``.txt`` file (in sorted order) the best unused reference
        paragraph is looked up; if its score reaches ``match_threshold`` the
        paragraph is written to ``output_dir`` under the same file name and
        marked as used. Progress and a final summary are printed. Every call
        starts from a freshly loaded document and an empty set of used
        paragraphs, so the method can be re-run on the same instance.
        """
        if not self.read_odt():
            print("Failed to read ODT file. Exiting.")
            return

        # Indices recorded by a previous run would otherwise block every match.
        self.used_paragraphs = set()

        # Create output directory
        self.output_dir.mkdir(parents=True, exist_ok=True)

        # Check if directory exists
        if not self.whisper_dir.exists():
            print(f"ERROR: Directory not found: {self.whisper_dir}")
            return

        transcript_files = self._find_transcript_files()

        if not transcript_files:
            print(f"\nNo .txt files found in: {self.whisper_dir}")
            print("Contents of directory:")
            try:
                for item in self.whisper_dir.iterdir():
                    print(f"  - {item.name}")
            except Exception as e:
                print(f"  Cannot read directory: {e}")
            return

        print(f"\nFound {len(transcript_files)} transcript files")

        processed = 0
        # Never incremented: this loop applies no noise filtering. Kept so the
        # summary output stays unchanged.
        skipped_noise = 0
        no_match = 0

        for txt_file in sorted(transcript_files):
            print(f"\nProcessing: {txt_file.name}")

            # Read transcript
            try:
                with open(txt_file, 'r', encoding='utf-8') as f:
                    transcript_content = f.read()
            except Exception as e:
                print(f"  ⚠️  Error reading file: {e}")
                continue

            # Match based on first letter sounds (process all files, no noise filtering)
            matched_text, score, idx = self.match_by_first_letters(transcript_content)

            # Get first letters for display
            trans_letters = ''.join(self.get_first_letters(transcript_content)[:15])

            # The None check keeps a zero or negative threshold from accepting "no paragraph".
            if matched_text is None or score < self.match_threshold:
                print(f"  ⚠️  No good match (score: {score:.2f}, letters: {trans_letters})")
                no_match += 1
                continue

            ref_letters = ''.join(self.get_first_letters(matched_text)[:15])
            print(f"  ✓ Match found (score: {score:.2f}, para: {idx})")
            print(f"    Transcript letters: {trans_letters}")
            print(f"    Reference letters:  {ref_letters}")

            # Mark this paragraph as used
            self.used_paragraphs.add(idx)

            # Output only the reference text from ODT
            output_file = self.output_dir / txt_file.name
            output_file.write_text(matched_text, encoding='utf-8')

            processed += 1

        # Summary
        print("\n" + "="*60)
        print("PROCESSING SUMMARY")
        print("="*60)
        print(f"Total files found: {len(transcript_files)}")
        print(f"Successfully processed: {processed}")
        print(f"Skipped (noise): {skipped_noise}")
        print(f"No good match: {no_match}")
        print(f"Unique paragraphs used: {len(self.used_paragraphs)}/{len(self.reference_paragraphs)}")
        print(f"\nOutput saved to: {self.output_dir}")


# Usage
if __name__ == "__main__":
    # Locations of the transcripts, the reference document and the results
    # (all under /content/drive/MyDrive).
    WHISPER_DIR = "/content/drive/MyDrive/dataset/whisper_segments/text"
    ODT_FILE = "/content/drive/MyDrive/dataset/10 AUG PRIME_non_bold (1).odt"
    OUTPUT_DIR = "/content/drive/MyDrive/dataset/final_text"

    # Build the matcher (noise_threshold = minimum words to not be considered
    # noise) and process every transcript.
    matcher = PhoneticTranscriptMatcher(WHISPER_DIR, ODT_FILE, OUTPUT_DIR, noise_threshold=10)
    matcher.process_transcripts()

Reading ODT reference document...
Loaded 27 paragraphs from ODT

Found 68 transcript files

Processing: session_001_0000.txt
  ✓ Match found (score: 0.50, para: 0)
    Transcript letters: नबपअ
    Reference letters:  नपपक

Processing: session_001_0001.txt
  ✓ Match found (score: 0.10, para: 1)
    Transcript letters: तवदकबवतआटडकसअउप
    Reference letters:  टनडकवटटडकसउपउलस

Processing: session_001_0002.txt
  ✓ Match found (score: 0.20, para: 8)
    Transcript letters: कसबआबकपवपजततपमब
    Reference letters:  कवटआटडसऑकपउसआकन

Processing: session_001_0003.txt
  ✓ Match found (score: 0.30, para: 9)
    Transcript letters: असकसकरबपककककललउ
    Reference letters:  कपकककपनसकहकमबअस

Processing: session_001_0004.txt
  ✓ Match found (score: 0.30, para: 4)
    Transcript letters: आजतममदलदचसवतहकत
    Reference letters:  आजममदडकवहबदललएप

Processing: session_001_0005.txt
  ✓ Match found (score: 0.20, para: 7)
    Transcript letters: चकबदवसउआमतहन
    Reference letters:  बपयबदएआएमतकपडआ

Processing: sess

In [2]:
!pip install odfpy


