In [4]:
import os
import re
import ffmpeg
import pydub
import pysrt
import time
import asyncio
import edge_tts
import numpy as np
from deep_translator import GoogleTranslator
from pydub import AudioSegment
from moviepy.editor import VideoFileClip, AudioFileClip, concatenate_videoclips
from faster_whisper import WhisperModel
from shutil import which
from datetime import datetime
import tempfile
import aiohttp
import ssl
import random
from pydub.silence import detect_nonsilent
from typing import List, Dict

# ============== Configuration ==============
FFMPEG_PATH = which("ffmpeg")
INPUT_VIDEO = "to translate/4.2.4_Configuration de la solution_Avr_10_Latest.mp4"
BASE_NAME = os.path.splitext(os.path.basename(INPUT_VIDEO))[0]
TIMESTAMP = datetime.now().strftime("%Y%m%d_%H%M%S")
OUTPUT_DIR = f"{BASE_NAME}_run_{TIMESTAMP}"
MODEL_SIZE = "small"
USE_EDGE_TTS = True

# ============== Audio Processing Functions ==============
class AudioProcessor:
    @staticmethod
    def apply_speed_adjustment(raw_audio: AudioSegment, speed_setting: str) -> AudioSegment:
        """Apply speed adjustment with duration compensation"""
        speed_factor = 1 + (int(speed_setting.strip('%')) / 100)
        original_duration = len(raw_audio)
        
        # Speed adjustment with crossfade to avoid clicks
        sped_up = raw_audio.speedup(
            playback_speed=speed_factor,
            chunk_size=150,
            crossfade=25
        )
        
        # Calculate compensation
        new_duration = len(sped_up)
        compensation_ms = original_duration - new_duration
        
        if compensation_ms > 0:
            return sped_up + AudioSegment.silent(duration=compensation_ms)
        return sped_up

    @staticmethod
    def generate_phrase_audio(text: str, voice_speed: str) -> AudioSegment:
        """Generate phrase audio with natural ending detection"""
        async def _generate():
            communicate = edge_tts.Communicate(text)
            return await communicate
        
        raw_audio = asyncio.run(_generate()).audio
        processed = AudioProcessor.apply_speed_adjustment(raw_audio, voice_speed)
        
        # Detect natural speech endings
        non_silent = detect_nonsilent(
            processed, 
            min_silence_len=50,
            silence_thresh=processed.dBFS - 16
        )
        
        if non_silent:
            end_pad = 150  # Minimum ending padding
            new_end = max(non_silent[-1][1] + end_pad, len(processed))
            return processed[:new_end]
        return processed

# ============== Timing Synchronization ==============
class SyncValidator:
    @staticmethod
    def validate_segment_timing(original_duration: float, translated_audio: AudioSegment) -> AudioSegment:
        """Ensure audio duration matches video segment duration"""
        audio_duration = len(translated_audio) / 1000  # Convert ms to seconds
        drift = original_duration - audio_duration
        
        if abs(drift) > 0.5:  # 500ms tolerance
            compensation_ms = int(drift * 1000)
            if compensation_ms > 0:
                return translated_audio + AudioSegment.silent(duration=compensation_ms)
            else:
                return translated_audio[:compensation_ms]
        return translated_audio

    @staticmethod
    def calculate_phrase_timings(phrases: List[str], silences: List[float]) -> List[Dict]:
        """Calculate precise timings for each phrase"""
        timings = []
        current_time = 0.0
        
        for i, phrase in enumerate(phrases):
            # Generate temporary audio to measure duration
            with tempfile.NamedTemporaryFile() as tmp:
                asyncio.run(edge_tts.Communicate(phrase).save(tmp.name))
                audio = AudioSegment.from_file(tmp.name)
                duration = len(audio) / 1000  # Convert ms to seconds
            
            timings.append({
                "start": current_time,
                "end": current_time + duration,
                "phrase": phrase
            })
            
            # Add silence after phrase if not last element
            if i < len(silences):
                current_time += duration + (silences[i] / 1000)
        
        return timings

# ============== Main Processing Pipeline ==============
class TranslationPipeline:
    def __init__(self):
        self.debug_log = []
        
    async def process_segment(self, segment: Dict, output_path: str):
        """Process single video segment with sync validation"""
        # Generate translated audio
        translated_audio = await self._generate_translated_audio(segment)

        # Validate timing
        original_duration = segment["end"] - segment["start"]
        validated_audio = SyncValidator.validate_segment_timing(
            original_duration, translated_audio
        )

        # Save debug information
        self._log_segment_debug(segment, translated_audio, validated_audio)

        # Export final audio
        validated_audio.export(output_path, format="wav")
        
        async def _generate_translated_audio(self, segment: Dict) -> AudioSegment:
            """Generate translated audio with proper timing"""
            combined_audio = AudioSegment.silent(segment["pre_silence"])
        
        for i, phrase in enumerate(segment["phrases"]):
            # Generate phrase audio
            phrase_audio = AudioProcessor.generate_phrase_audio(
                phrase, segment["speed"]
            )
            
            # Add inter-phrase silence
            if i > 0 and i <= len(segment["inter_silences"]):
                combined_audio += AudioSegment.silent(
                    segment["inter_silences"][i-1]
                )
            
            combined_audio += phrase_audio
        
        # Add post-silence
        combined_audio += AudioSegment.silent(segment["post_silence"])
        
        return combined_audio
    
    def _log_segment_debug(self, segment, translated_audio, validated_audio):
      """Log segment debug information."""
      # Duration in seconds
      original_duration = segment["end"] - segment["start"]
      translated_duration = len(translated_audio) / 1000
      validated_duration = len(validated_audio) / 1000

      # Log entry
      log_entry = {
          "segment_start": segment["start"],
          "segment_end": segment["end"],
          "original_duration": original_duration,
          "translated_duration": translated_duration,
          "validated_duration": validated_duration,
          "voice_speed": segment["speed"],
          "pre_silence": segment["pre_silence"],
          "post_silence": segment["post_silence"],
          "inter_silences": segment["inter_silences"],
          "phrases": segment["phrases"],
      }
      self.debug_log.append(log_entry)

# ============== Helper Functions ==============
def sanitize_silences(silences: List[float]) -> List[float]:
    """Ensure silences are within valid range"""
    return [max(0, min(5000, s)) for s in silences]

def parse_review_file(review_path: str) -> List[Dict]:
    """Parse review file with sanity checks"""
    segments = []
    current_segment = {}
    segment_number = 0

    with open(review_path, 'r', encoding='utf-8') as f:
      for line in f:
        line = line.strip()
        if line.startswith("Segment"):
          segment_number += 1
          # Save previous segment if it exists
          if current_segment:
            segments.append(current_segment)
          # Start a new segment
          current_segment = {}
          match = re.match(r"Segment (\d+) \(start: (\d+\.?\d*)s, end: (\d+\.?\d*)s\):", line)
          if match:
            current_segment['segment_number'] = int(match.group(1))
            current_segment['start'] = float(match.group(2))
            current_segment['end'] = float(match.group(3))
        elif line.startswith("**Original:**"):
          current_segment['original'] = line.split('**Original:**')[1].strip()
        elif line.startswith("**Auto Translated:**"):
          current_segment['auto_translated'] = line.split('**Auto Translated:**')[1].strip()
        elif line.startswith("**Final Translation:**"):
          current_segment['phrases'] = split_french_phrases(line.split('**Final Translation:**')[1].strip())
        elif line.startswith("**Voice Speed:**"):
          current_segment['speed'] = line.split('**Voice Speed:**')[1].strip()
        elif line.startswith("**Pre-Silence:**"):
          current_segment['pre_silence'] = int(line.split('**Pre-Silence:**')[1].strip())
        elif line.startswith("**Post-Silence:**"):
          current_segment['post_silence'] = int(line.split('**Post-Silence:**')[1].strip())
        elif line.startswith("**Inter-Phrase-Silence:**"):
          silences_str = line.split('**Inter-Phrase-Silence:**')[1].strip()
          if silences_str:
            current_segment['inter_silences'] = [int(s.strip()) for s in silences_str.split(',') if s.strip()]
          else:
            current_segment['inter_silences'] = []
        elif line.startswith("**Décalage (local ms):**"):
          current_segment['decalage'] = int(line.split('**Décalage (local ms):**')[1].strip())
      if current_segment:
        segments.append(current_segment)

    return segments

def split_french_phrases(text: str) -> List[str]:
    """Splits a French text into phrases using common punctuation marks."""
    # Split by periods, question marks, and exclamation points, but keep the delimiters.
    phrases = re.split(r"([.?!])", text)
    # Recombine the delimiters with the preceding text.
    # Here's the fix: convert phrases to an iterator explicitly
    phrases_iter = iter(phrases)
    phrases = [phrase + next(phrases_iter, '') for phrase in phrases]
    # Clean up: remove empty strings and strip whitespace.
    phrases = [p.strip() for p in phrases if p.strip()]
    return phrases


# ============== Main Execution ==============
async def main():
    # Initialize pipeline
    pipeline = TranslationPipeline()

    # Load review configuration
    review_file_path = "translation_review.txt"
    segments = parse_review_file(review_file_path)

    # Create output directory
    output_dir = OUTPUT_DIR
    os.makedirs(output_dir, exist_ok=True)

    # Process each segment
    tasks = []
    audio_segments = []
    for idx, segment in enumerate(segments):
        output_path = os.path.join(output_dir, f"segment_{idx+1}.wav")
        task = pipeline.process_segment(segment, output_path)
        tasks.append(task)
        audio_segments.append(output_path)

    # Run all tasks concurrently
    await asyncio.gather(*tasks)
    
    # Final video assembly
    input_video_path = INPUT_VIDEO
    original_video = VideoFileClip(input_video_path)
    
    # Load the audio segments
    audio_clips = [AudioFileClip(audio_path) for audio_path in audio_segments]
    
    # Prepare video segments
    video_segments = []
    current_time = 0
    for i, segment in enumerate(segments):
        start_time = segment["start"]
        end_time = segment["end"]
        
        # Extract the video clip for the segment
        video_clip = original_video.subclip(start_time, end_time)
        
        # Set the audio of the video clip
        video_clip = video_clip.set_audio(audio_clips[i])
        
        video_segments.append(video_clip)
        current_time = end_time

    # Concatenate all video segments
    final_video = concatenate_videoclips(video_segments)
    
    # Set the FPS to the original video's FPS
    final_video = final_video.set_fps(original_video.fps)

    # Write the final video file
    output_video_path = os.path.join(output_dir, "final_translated_video.mp4")
    final_video.write_videofile(output_video_path, codec="libx264", audio_codec="aac", temp_audiofile='temp-audio.m4a', remove_temp=True)
    
    print(f"✅ Final translated video created at: {output_video_path}")

if __name__ == "__main__":
    asyncio.run(main())


Exception ignored in: <coroutine object TranslationPipeline.process_segment at 0x000002613432C640>
Traceback (most recent call last):
  File "<string>", line 1, in <lambda>
KeyError: '__import__'
Exception ignored in: <coroutine object TranslationPipeline.process_segment at 0x000002613432C640>
Traceback (most recent call last):
  File "<string>", line 1, in <lambda>
KeyError: '__import__'


RuntimeError: asyncio.run() cannot be called from a running event loop

In [5]:
import os
import re
import ffmpeg
import pydub
import pysrt
import time
import asyncio
import edge_tts
import numpy as np
from deep_translator import GoogleTranslator
from pydub import AudioSegment
from moviepy.editor import VideoFileClip, AudioFileClip, concatenate_videoclips
from faster_whisper import WhisperModel
from shutil import which
from datetime import datetime
import tempfile
import aiohttp
import ssl
import random
from pydub.silence import detect_nonsilent
from typing import List, Dict

# ============== Configuration ==============
FFMPEG_PATH = which("ffmpeg")
INPUT_VIDEO = "to translate/4.2.4_Configuration de la solution_Avr_10_Latest.mp4"
BASE_NAME = os.path.splitext(os.path.basename(INPUT_VIDEO))[0]
TIMESTAMP = datetime.now().strftime("%Y%m%d_%H%M%S")
OUTPUT_DIR = f"{BASE_NAME}_run_{TIMESTAMP}"
MODEL_SIZE = "small"
USE_EDGE_TTS = True

# ============== Audio Processing Functions ==============
class AudioProcessor:
    @staticmethod
    def apply_speed_adjustment(raw_audio: AudioSegment, speed_setting: str) -> AudioSegment:
        """Apply speed adjustment with duration compensation"""
        speed_factor = 1 + (int(speed_setting.strip('%')) / 100)
        original_duration = len(raw_audio)
        
        # Speed adjustment with crossfade to avoid clicks
        sped_up = raw_audio.speedup(
            playback_speed=speed_factor,
            chunk_size=150,
            crossfade=25
        )
        
        # Calculate compensation
        new_duration = len(sped_up)
        compensation_ms = original_duration - new_duration
        
        if compensation_ms > 0:
            return sped_up + AudioSegment.silent(duration=compensation_ms)
        return sped_up

    @staticmethod
    def generate_phrase_audio(text: str, voice_speed: str) -> AudioSegment:
        """Generate phrase audio with natural ending detection"""
        async def _generate():
            communicate = edge_tts.Communicate(text)
            return await communicate
        
        raw_audio = asyncio.run(_generate()).audio
        processed = AudioProcessor.apply_speed_adjustment(raw_audio, voice_speed)
        
        # Detect natural speech endings
        non_silent = detect_nonsilent(
            processed, 
            min_silence_len=50,
            silence_thresh=processed.dBFS - 16
        )
        
        if non_silent:
            end_pad = 150  # Minimum ending padding
            new_end = max(non_silent[-1][1] + end_pad, len(processed))
            return processed[:new_end]
        return processed

# ============== Timing Synchronization ==============
class SyncValidator:
    @staticmethod
    def validate_segment_timing(original_duration: float, translated_audio: AudioSegment) -> AudioSegment:
        """Ensure audio duration matches video segment duration"""
        audio_duration = len(translated_audio) / 1000  # Convert ms to seconds
        drift = original_duration - audio_duration
        
        if abs(drift) > 0.5:  # 500ms tolerance
            compensation_ms = int(drift * 1000)
            if compensation_ms > 0:
                return translated_audio + AudioSegment.silent(duration=compensation_ms)
            else:
                return translated_audio[:compensation_ms]
        return translated_audio

    @staticmethod
    def calculate_phrase_timings(phrases: List[str], silences: List[float]) -> List[Dict]:
        """Calculate precise timings for each phrase"""
        timings = []
        current_time = 0.0
        
        for i, phrase in enumerate(phrases):
            # Generate temporary audio to measure duration
            with tempfile.NamedTemporaryFile() as tmp:
                asyncio.run(edge_tts.Communicate(phrase).save(tmp.name))
                audio = AudioSegment.from_file(tmp.name)
                duration = len(audio) / 1000  # Convert ms to seconds
            
            timings.append({
                "start": current_time,
                "end": current_time + duration,
                "phrase": phrase
            })
            
            # Add silence after phrase if not last element
            if i < len(silences):
                current_time += duration + (silences[i] / 1000)
        
        return timings

# ============== Main Processing Pipeline ==============
class TranslationPipeline:
    def __init__(self):
        self.debug_log = []
        
    async def process_segment(self, segment: Dict, output_path: str):
        """Process single video segment with sync validation"""
        # Generate translated audio
        translated_audio = await self._generate_translated_audio(segment)
        
        # Validate timing
        original_duration = segment["end"] - segment["start"]
        validated_audio = SyncValidator.validate_segment_timing(
            original_duration, translated_audio
        )
        
        # Save debug information
        self._log_segment_debug(segment, translated_audio, validated_audio)
        
        # Export final audio
        validated_audio.export(output_path, format="wav")
    
    async def _generate_translated_audio(self, segment: Dict) -> AudioSegment:
        """Generate translated audio with proper timing"""
        combined_audio = AudioSegment.silent(segment["pre_silence"])
        
        for i, phrase in enumerate(segment["phrases"]):
            # Generate phrase audio
            phrase_audio = AudioProcessor.generate_phrase_audio(
                phrase, segment["speed"]
            )
            
            # Add inter-phrase silence
            if i > 0 and i <= len(segment["inter_silences"]):
                combined_audio += AudioSegment.silent(
                    segment["inter_silences"][i-1]
                )
            
            combined_audio += phrase_audio
        
        # Add post-silence
        combined_audio += AudioSegment.silent(segment["post_silence"])
        
        return combined_audio
    
    def _log_segment_debug(self, segment, translated_audio, validated_audio):
      """Log segment debug information."""
      # Duration in seconds
      original_duration = segment["end"] - segment["start"]
      translated_duration = len(translated_audio) / 1000
      validated_duration = len(validated_audio) / 1000

      # Log entry
      log_entry = {
          "segment_start": segment["start"],
          "segment_end": segment["end"],
          "original_duration": original_duration,
          "translated_duration": translated_duration,
          "validated_duration": validated_duration,
          "voice_speed": segment["speed"],
          "pre_silence": segment["pre_silence"],
          "post_silence": segment["post_silence"],
          "inter_silences": segment["inter_silences"],
          "phrases": segment["phrases"],
      }
      self.debug_log.append(log_entry)

# ============== Helper Functions ==============
def sanitize_silences(silences: List[float]) -> List[float]:
    """Ensure silences are within valid range"""
    return [max(0, min(5000, s)) for s in silences]

def parse_review_file(review_path: str) -> List[Dict]:
    """Parse review file with sanity checks"""
    segments = []
    current_segment = {}
    segment_number = 0

    with open(review_path, 'r', encoding='utf-8') as f:
      for line in f:
        line = line.strip()
        if line.startswith("Segment"):
          segment_number += 1
          # Save previous segment if it exists
          if current_segment:
            segments.append(current_segment)
          # Start a new segment
          current_segment = {}
          match = re.match(r"Segment (\d+) \(start: (\d+\.?\d*)s, end: (\d+\.?\d*)s\):", line)
          if match:
            current_segment['segment_number'] = int(match.group(1))
            current_segment['start'] = float(match.group(2))
            current_segment['end'] = float(match.group(3))
        elif line.startswith("**Original:**"):
          current_segment['original'] = line.split('**Original:**')[1].strip()
        elif line.startswith("**Auto Translated:**"):
          current_segment['auto_translated'] = line.split('**Auto Translated:**')[1].strip()
        elif line.startswith("**Final Translation:**"):
          current_segment['phrases'] = split_french_phrases(line.split('**Final Translation:**')[1].strip())
        elif line.startswith("**Voice Speed:**"):
          current_segment['speed'] = line.split('**Voice Speed:**')[1].strip()
        elif line.startswith("**Pre-Silence:**"):
          current_segment['pre_silence'] = int(line.split('**Pre-Silence:**')[1].strip())
        elif line.startswith("**Post-Silence:**"):
          current_segment['post_silence'] = int(line.split('**Post-Silence:**')[1].strip())
        elif line.startswith("**Inter-Phrase-Silence:**"):
          silences_str = line.split('**Inter-Phrase-Silence:**')[1].strip()
          if silences_str:
            current_segment['inter_silences'] = [int(s.strip()) for s in silences_str.split(',') if s.strip()]
          else:
            current_segment['inter_silences'] = []
        elif line.startswith("**Décalage (local ms):**"):
          current_segment['decalage'] = int(line.split('**Décalage (local ms):**')[1].strip())
      if current_segment:
        segments.append(current_segment)

    return segments

def split_french_phrases(text: str) -> List[str]:
    """Splits a French text into phrases using common punctuation marks."""
    # Split by periods, question marks, and exclamation points, but keep the delimiters.
    phrases = re.split(r"([.?!])", text)
    # Recombine the delimiters with the preceding text.
    # Here's the fix: convert phrases to an iterator explicitly
    phrases_iter = iter(phrases)
    phrases = [phrase + next(phrases_iter, '') for phrase in phrases]
    # Clean up: remove empty strings and strip whitespace.
    phrases = [p.strip() for p in phrases if p.strip()]
    return phrases

# ============== Main Execution ==============
async def main():
    # Initialize pipeline
    pipeline = TranslationPipeline()

    # Load review configuration
    review_file_path = "translation_review.txt"
    segments = parse_review_file(review_file_path)

    # Create output directory
    output_dir = OUTPUT_DIR
    os.makedirs(output_dir, exist_ok=True)

    # Process each segment
    tasks = []
    audio_segments = []
    for idx, segment in enumerate(segments):
        output_path = os.path.join(output_dir, f"segment_{idx+1}.wav")
        task = pipeline.process_segment(segment, output_path)
        tasks.append(task)
        audio_segments.append(output_path)

    # Run all tasks concurrently
    await asyncio.gather(*tasks)
    
    # Final video assembly
    input_video_path = INPUT_VIDEO
    original_video = VideoFileClip(input_video_path)
    
    # Load the audio segments
    audio_clips = [AudioFileClip(audio_path) for audio_path in audio_segments]
    
    # Prepare video segments
    video_segments = []
    current_time = 0
    for i, segment in enumerate(segments):
        start_time = segment["start"]
        end_time = segment["end"]
        
        # Extract the video clip for the segment
        video_clip = original_video.subclip(start_time, end_time)
        
        # Set the audio of the video clip
        video_clip = video_clip.set_audio(audio_clips[i])
        
        video_segments.append(video_clip)
        current_time = end_time

    # Concatenate all video segments
    final_video = concatenate_videoclips(video_segments)
    
    # Set the FPS to the original video's FPS
    final_video = final_video.set_fps(original_video.fps)

    # Write the final video file
    output_video_path = os.path.join(output_dir, "final_translated_video.mp4")
    final_video.write_videofile(output_video_path, codec="libx264", audio_codec="aac", temp_audiofile='temp-audio.m4a', remove_temp=True)
    
    print(f"✅ Final translated video created at: {output_video_path}")

if __name__ == "__main__":
    try:
        asyncio.run(main())
    except RuntimeError as e:
        if "cannot be called from a running event loop" in str(e):
            print("Error: It seems you are trying to call asyncio.run from within an already running event loop.")
            print("This can happen in interactive environments like Jupyter notebooks.")
            print("Please try running this script from a regular Python environment.")
        else:
            raise e


Error: It seems you are trying to call asyncio.run from within an already running event loop.
This can happen in interactive environments like Jupyter notebooks.
Please try running this script from a regular Python environment.


23APR25

In [None]:
import os
import re
import ffmpeg
import pysrt
import tempfile
import asyncio
from datetime import datetime

from deep_translator import GoogleTranslator
from pydub import AudioSegment
from pydub.silence import detect_nonsilent
from moviepy.editor import VideoFileClip, AudioFileClip
from faster_whisper import WhisperModel
from shutil import which
import edge_tts
import aiohttp
import random
import nest_asyncio
nest_asyncio.apply()

# ── Configuration ────────────────────────────────────────────────────────────────
ffmpeg_path = which("ffmpeg")
if not ffmpeg_path:
    raise RuntimeError("ffmpeg not found. Please install ffmpeg first.")

input_video     = "to translate/4.2.4_Configuration de la solution_Avr_10_Latest.mp4"
base_name       = os.path.splitext(os.path.basename(input_video))[0]
timestamp       = datetime.now().strftime("%Y%m%d_%H%M%S")
output_dir      = f"{base_name}_run_{timestamp}"
model_size      = "small"
USE_EDGE_TTS    = True

os.makedirs(output_dir, exist_ok=True)
extracted_audio = os.path.join(output_dir, f"{base_name}-extracted-audio.wav")
subtitle_file   = os.path.join(output_dir, f"{base_name}-english.srt")
review_file     = os.path.join(output_dir, "translation_review.txt")
debug_log_file  = os.path.join(output_dir, "translation_debug_log.txt")
translated_audio= os.path.join(output_dir, f"{base_name}-french.wav")
output_video    = os.path.join(output_dir, f"{base_name}-french.mp4")

# ── Helpers: audio extraction & whisper transcription ────────────────────────────
def extract_audio():
    ffmpeg.input(input_video).output(extracted_audio, ac=1, ar=16000) \
        .overwrite_output().run(capture_stdout=True, capture_stderr=True)
    return extracted_audio

def transcribe(path):
    model = WhisperModel(model_size, device="cpu", compute_type="int8")
    segments, info = model.transcribe(path, beam_size=5)
    return info.language, [
        {"start": seg.start, "end": seg.end, "text": seg.text.strip()}
        for seg in segments
    ]

def time_to_subrip(seconds: float) -> pysrt.SubRipTime:
    h = int(seconds // 3600)
    seconds %= 3600
    m = int(seconds // 60)
    seconds %= 60
    ms = int((seconds - int(seconds)) * 1000)
    return pysrt.SubRipTime(hours=h, minutes=m, seconds=int(seconds), milliseconds=ms)

def generate_subtitle_file(segments, outpath):
    subs = pysrt.SubRipFile()
    for i, seg in enumerate(segments, 1):
        subs.append(pysrt.SubRipItem(
            index=i,
            start=time_to_subrip(seg["start"]),
            end=time_to_subrip(seg["end"]),
            text=seg["text"]
        ))
    subs.save(outpath, encoding="utf-8")
    return outpath

# ── Text grouping & review file generation ─────────────────────────────────────
def split_long_groups(groups, max_dur):
    new = []
    for grp in groups:
        start = grp[0].start.ordinal/1000
        end   = grp[-1].end.ordinal/1000
        if end - start <= max_dur:
            new.append(grp)
            continue
        temp, ts = [], start
        last_safe = None
        for idx, item in enumerate(grp):
            temp.append(item)
            if re.search(r"[.,!?]$", item.text.strip()):
                last_safe = idx
            cur_end = item.end.ordinal/1000
            if cur_end - ts >= max_dur:
                if last_safe is not None:
                    new.append(temp[:last_safe+1])
                    temp = temp[last_safe+1:]
                    ts   = temp[0].start.ordinal/1000 if temp else cur_end
                else:
                    new.append(temp)
                    temp = []
                    ts   = cur_end
                last_safe = None
        if temp:
            new.append(temp)
    return new

def enforce_punctuation_boundaries(groups):
    i = 0
    safe = r"[.!?,;:]$"
    while i < len(groups):
        last = groups[i][-1].text.strip()
        if not re.search(safe, last):
            if i+1 < len(groups):
                groups[i] += groups.pop(i+1)
            else:
                groups[i][-1].text += "."
        else:
            i += 1
    return groups

def generate_translation_review_file(source_path, review_path,
                                     from_lang="en", to_lang="fr",
                                     max_group_duration_secs=25.0):
    translator = GoogleTranslator(source=from_lang, target=to_lang)
    subs       = pysrt.open(source_path)
    sentence_end = re.compile(r"[.!?]\s*$")

    # 1) group subs by sentence
    groups, cur = [], []
    for sub in subs:
        cur.append(sub)
        if sentence_end.search(sub.text):
            groups.append(cur)
            cur = []
    if cur: groups.append(cur)

    # 2) split long groups, enforce punctuation
    groups = split_long_groups(groups, max_group_duration_secs)
    groups = enforce_punctuation_boundaries(groups)

    # 3) write review file
    with open(review_path, "w", encoding="utf-8") as f:
        f.write("Translation Review File\n")
        f.write("Edit **Final Translation**, **Voice Speed**, **Pre‑Silence**, **Post‑Silence**, **Inter‑Phrase‑Silence**\n")
        f.write("----------------------------------------------------------------------------- \n\n")
        for idx, grp in enumerate(groups, 1):
            start = grp[0].start.ordinal/1000
            end   = grp[-1].end.ordinal/1000
            orig  = " ".join(s.text for s in grp)
            auto  = translator.translate(text=orig)
            # default values
            phrases = split_french_phrases(auto)
            inter_default = ",".join("0" for _ in range(len(phrases)-1))
            f.write(f"Segment {idx} (start: {start:.2f}s, end: {end:.2f}s):\n")
            f.write(f"**Original:** {orig}\n")
            f.write(f"**Auto Translated:** {auto}\n")
            f.write(f"**Final Translation:** {auto}\n")
            f.write(f"**Voice Speed:** +0%\n")
            f.write(f"**Pre-Silence:** 100\n")
            f.write(f"**Post-Silence:** 100\n")
            f.write(f"**Inter-Phrase-Silence:** {inter_default}\n")
            f.write("-----------------------------------------------------------------------------\n\n")

    input("Review file ready. Press Enter to continue…")
    return groups

# ── Review overrides parsing ────────────────────────────────────────────────────
def parse_review_overrides(review_file_path):
    text   = open(review_file_path, "r", encoding="utf-8").read()
    blocks = re.split(r"(?m)^-{3,}\s*$", text)
    overrides = []
    for idx, blk in enumerate(blocks, start=1):
        blk = blk.strip()
        if not blk or blk.startswith("Translation Review File"):
            continue
        # defaults
        ft       = None
        vs       = "+0%"
        pre_ms   = 0.0
        post_ms  = 100.0
        inter_ms = []
        for line in blk.splitlines():
            line = line.strip()
            if line.startswith("**Final Translation:**"):
                ft = line.split("**Final Translation:**",1)[1].strip()
            elif line.startswith("**Voice Speed:**"):
                vs = line.split("**Voice Speed:**",1)[1].strip()
            elif line.startswith("**Pre-Silence:**"):
                try: pre_ms = float(line.split("**Pre-Silence:**",1)[1].strip())
                except: pass
            elif line.startswith("**Post-Silence:**"):
                try: post_ms = float(line.split("**Post-Silence:**",1)[1].strip())
                except: pass
            elif line.startswith("**Inter-Phrase-Silence:**"):
                parts = line.split("**Inter-Phrase-Silence:**",1)[1].strip()
                if parts:
                    try:
                        inter_ms = [float(x) for x in parts.split(",")]
                    except: pass
        overrides.append({
            "final_translation":     ft,
            "voice_speed":           vs,
            "pre_silence":           pre_ms,
            "post_silence":          post_ms,
            "inter_phrase_silences": inter_ms
        })
    return overrides

# ── Phrase splitting & weighting ───────────────────────────────────────────────
def split_french_phrases(text):
    parts = re.split(r"(?<=[.!?])\s+(?=[A-Z])", text)
    return [p.strip() for p in parts if p.strip()]

def calculate_phrase_weights(orig, phrases):
    counts = [len(p.split()) for p in phrases]
    total  = sum(counts)
    if total == 0:
        return [1/len(phrases)]*len(phrases)
    return [c/total for c in counts]

# ── Audio adjustments & TTS ─────────────────────────────────────────────────────
def adjust_audio_duration(audio, target_dur):
    cur = audio.duration_seconds
    diff= target_dur - cur
    if diff > 0.1:
        return audio + AudioSegment.silent(duration=diff*1000)
    return audio

def change_playback_speed(sound, speed=1.0):
    new_rate = int(sound.frame_rate * speed)
    altered  = sound._spawn(sound.raw_data, overrides={"frame_rate": new_rate})
    return altered.set_frame_rate(sound.frame_rate)

async def robust_synthesize_phrase(
    phrase: str,
    outpath: str,
    voice: str = "fr-FR-DeniseNeural",
    rate: str = "+0%",
    max_retries: int = 3
):
    for attempt in range(max_retries):
        try:
            async with aiohttp.ClientSession(
                timeout=aiohttp.ClientTimeout(total=30)
            ) as session:
                communicate = edge_tts.Communicate(
                    text=phrase,
                    voice=voice,
                    rate=rate
                )
                await communicate.save(outpath)
            return
        except Exception as e:
            print(f"[Warning] TTS attempt {attempt+1} failed for phrase: {phrase}\n{e}")
            await asyncio.sleep(2 ** attempt)
    # Si après tous les essais ça échoue, on écrit un silence de secours
    silent = AudioSegment.silent(duration=500)  # 0.5s de silence
    silent.export(outpath, format="mp3")
    print(f"[Error] All TTS attempts failed; inserted 500ms silence at {outpath}")



async def synthesize_phrase_edge_hybrid(phrase, outpath, voice="fr-FR-DeniseNeural", rate="+0%"):
    await robust_synthesize_phrase(phrase, outpath, voice, rate)

# ── Main async with updated generation ──────────────────────────────────────────
async def async_generate_translated_audio_with_sync_using_review(
    subtitle_source_path, output_audio_path,
    debug_log_path, review_file_path
):
    groups    = generate_translation_review_file(subtitle_source_path, review_file_path)
    overrides = parse_review_overrides(review_file_path)

    default_ov = {
        "final_translation":     None,
        "voice_speed":           "+0%",
        "pre_silence":           0.0,
        "post_silence":          100.0,
        "inter_phrase_silences": []
    }
    while len(overrides) < len(groups):
        overrides.append(default_ov.copy())

    combined = AudioSegment.silent(duration=0)
    debug    = []

    for idx, group in enumerate(groups):
        start_s, end_s = group[0].start.ordinal/1000, group[-1].end.ordinal/1000
        seg_dur  = end_s - start_s
        total_ms = int(seg_dur * 1000)

        ovr     = overrides[idx]
        text    = ovr["final_translation"] or " ".join(s.text for s in group)
        rate    = ovr["voice_speed"]
        pre_ms  = ovr["pre_silence"]
        post_ms = ovr["post_silence"]
        inter   = ovr["inter_phrase_silences"]

        phrases    = split_french_phrases(text)
        weights    = calculate_phrase_weights(text, phrases)
        content_ms = max(0, total_ms - pre_ms - post_ms)

        # synth phrases
        phrase_audios = []
        for i, ph in enumerate(phrases):
            dur = content_ms * weights[i] / 1000.0
            tmp = os.path.join(tempfile.gettempdir(), f"tmp_{idx}_{i}.mp3")
            await synthesize_phrase_edge_hybrid(ph, tmp, voice="fr-FR-DeniseNeural", rate=rate)
            aud = AudioSegment.from_mp3(tmp)
            os.remove(tmp)
            aud = adjust_audio_duration(aud, dur)
            phrase_audios.append(aud)

        # TTS over‑run protection
        sum_tts = sum(a.duration_seconds*1000 for a in phrase_audios)
        if sum_tts > content_ms and sum_tts > 0:
            factor_audio = content_ms / sum_tts
            phrase_audios = [change_playback_speed(aud, factor_audio) for aud in phrase_audios]
            sum_tts = sum(a.duration_seconds*1000 for a in phrase_audios)

        # inter‑phrase auto‑fit (10% margin)
        available   = total_ms - pre_ms - post_ms - sum_tts
        total_inter = sum(inter)
        margin      = 0.9
        if total_inter > 0 and total_inter > available * margin:
            factor = (available * margin) / total_inter
            inter  = [int(ms * factor) for ms in inter]

        # rebuild with silences
        seq = []
        for i, aud in enumerate(phrase_audios):
            seq.append(aud)
            if i < len(inter):
                seq.append(AudioSegment.silent(duration=inter[i]))

        # assemble segment
        seg = AudioSegment.silent(duration=pre_ms)
        for clip in seq:
            seg += clip
        seg += AudioSegment.silent(duration=post_ms)

        # strip leading TTS silence
        non = detect_nonsilent(seg, min_silence_len=1, silence_thresh=seg.dBFS-16)
        if non:
            seg = seg[non[0][0]:]
        seg = AudioSegment.silent(duration=pre_ms) + seg

        # pad/trim
        if len(seg) < total_ms:
            seg += AudioSegment.silent(duration=(total_ms - len(seg)))
        seg = seg[:total_ms]

        # measure decalage
        non2            = detect_nonsilent(seg, min_silence_len=1, silence_thresh=seg.dBFS-16)
        start_audio_ms  = non2[0][0] if non2 else pre_ms
        end_audio_ms    = non2[-1][1] if non2 else total_ms - post_ms
        abs_start_audio = int(start_s*1000) + start_audio_ms
        abs_end_audio   = int(start_s*1000) + end_audio_ms
        abs_start_video = int(start_s*1000)
        abs_end_video   = int(end_s*1000)
        decal_start = abs_start_audio - abs_start_video
        decal_end   = abs_end_audio   - abs_end_video

        # optional warp
        gen_dur = seg.duration_seconds
        diff    = seg_dur - gen_dur
        if abs(diff) > 0.20:
            seg = change_playback_speed(seg, seg_dur/gen_dur)

        # place on combined
        start_ms = int(start_s*1000)
        if len(combined) < start_ms:
            combined += AudioSegment.silent(duration=(start_ms-len(combined)))
        combined += seg

        debug.append(
            f"Segment {idx+1} ({start_s:.2f}-{end_s:.2f}s): "
            f"pre={pre_ms}ms, post={post_ms}ms, speed={rate}, "
            f"décalage_start={decal_start}ms, décalage_end={decal_end}ms, "
            f"inter={inter}, phrases={phrases}\n"
        )

    # write debug & export
    with open(debug_log_file, "w", encoding="utf-8") as df:
        df.write("Translation Debug Log\n\n")
        df.writelines(debug)
    combined.export(output_audio_path, format="wav")
    return output_audio_path

# ── Merge & Main ────────────────────────────────────────────────────────────────
def merge_audio_video():
    video = VideoFileClip(input_video)
    audio = AudioFileClip(translated_audio)
    if audio.duration < video.duration:
        extra = AudioSegment.silent(duration=(video.duration - audio.duration)*1000)
        tmpf  = os.path.join(output_dir, "temp_full.wav")
        AudioSegment.from_file(translated_audio).append(extra).export(tmpf, format="wav")
        audio = AudioFileClip(tmpf)
    video.set_audio(audio).write_videofile(output_video,
                                            codec="libx264", audio_codec="aac",
                                            temp_audiofile="temp-audio.m4a", remove_temp=True)

async def async_main():
    extract_audio()
    _, segs = transcribe(extracted_audio)
    generate_subtitle_file(segs, subtitle_file)
    await async_generate_translated_audio_with_sync_using_review(
        subtitle_file, translated_audio, debug_log_file, review_file
    )
    merge_audio_video()

if __name__ == "__main__":
    # Nous pouvons maintenant utiliser run_until_complete même si un loop tourne déjà
    loop = asyncio.get_event_loop()
    loop.run_until_complete(async_main())



Cannot connect to host speech.platform.bing.com:443 ssl:<ssl.SSLContext object at 0x000002101DE96BA0> [Une connexion existante a dû être fermée par l’hôte distant]
Cannot connect to host speech.platform.bing.com:443 ssl:<ssl.SSLContext object at 0x000002101DE969F0> [Une connexion existante a dû être fermée par l’hôte distant]
Cannot connect to host speech.platform.bing.com:443 ssl:<ssl.SSLContext object at 0x000002101DE96A80> [Une connexion existante a dû être fermée par l’hôte distant]
Cannot connect to host speech.platform.bing.com:443 ssl:<ssl.SSLContext object at 0x000002101DE96180> [Une connexion existante a dû être fermée par l’hôte distant]
Cannot connect to host speech.platform.bing.com:443 ssl:<ssl.SSLContext object at 0x000002101DE976E0> [Une connexion existante a dû être fermée par l’hôte distant]


KeyboardInterrupt: 

Cannot connect to host speech.platform.bing.com:443 ssl:<ssl.SSLContext object at 0x0000020FA1702210> [Une connexion existante a dû être fermée par l’hôte distant]
Cannot connect to host speech.platform.bing.com:443 ssl:<ssl.SSLContext object at 0x0000020FA1702B10> [Une connexion existante a dû être fermée par l’hôte distant]
Cannot connect to host speech.platform.bing.com:443 ssl:<ssl.SSLContext object at 0x0000020FA15ED370> [Une connexion existante a dû être fermée par l’hôte distant]
Cannot connect to host speech.platform.bing.com:443 ssl:<ssl.SSLContext object at 0x0000020FA1701880> [Une connexion existante a dû être fermée par l’hôte distant]
Cannot connect to host speech.platform.bing.com:443 ssl:<ssl.SSLContext object at 0x0000020FA1702570> [Une connexion existante a dû être fermée par l’hôte distant]
Cannot connect to host speech.platform.bing.com:443 ssl:<ssl.SSLContext object at 0x0000020FA17027B0> [Une connexion existante a dû être fermée par l’hôte distant]
Cannot connect t

                                                                        

MoviePy - Done.
Moviepy - Writing video 4.2.4_Configuration de la solution_Avr_10_Latest_run_20250423_112245\4.2.4_Configuration de la solution_Avr_10_Latest-french.mp4



                                                                  

Moviepy - Done !
Moviepy - video ready 4.2.4_Configuration de la solution_Avr_10_Latest_run_20250423_112245\4.2.4_Configuration de la solution_Avr_10_Latest-french.mp4
