In [None]:
# @title 🎬 Assistente de Dublagem v2.0
# @markdown Execute esta célula para instalar todas as dependências necessárias

print("🚀 Iniciando instalação das dependências...")

#@markdown ### Configurações Obrigatórias
HF_TOKEN = "" #@param {type:"string"}
#@markdown ### Configurações Opcionais
OPENAI_KEY = "" #@param {type:"string"}
MODELO_WHISPER = "tiny" #@param ["tiny", "base", "small", "medium", "large-v3"]
VOZ_PADRAO = "pt-BR-AntonioNeural" #@param ["pt-BR-AntonioNeural", "pt-BR-FranciscaNeural", "pt-BR-BrendaNeural", "pt-BR-FabioNeural"]
MANTER_AUDIO_ORIGINAL = True #@param {type:"boolean"}
GERAR_LEGENDAS = True #@param {type:"boolean"}

# Instalação silenciosa das dependências
!apt-get -qq update && apt-get -qq install -y ffmpeg
!pip install -q pydub moviepy edge-tts whisper openai-whisper spacy==3.7.2 spacy-syllables tqdm
!pip install -q googletrans==3.1.0a0
!pip install -q deep-translator
!pip install -q openai
!python -m spacy download en_core_web_sm
!python -m spacy download pt_core_news_sm

# Importações necessárias
import os
import uuid
import subprocess
import asyncio
import time
import tempfile
import re
import shutil
from pydub import AudioSegment
from moviepy.editor import VideoFileClip, AudioFileClip
import whisper
import spacy
from spacy_syllables import SpacySyllables
from tqdm import tqdm
import edge_tts
#from googletrans import Translator
from deep_translator import GoogleTranslator
from google.colab import drive
import torch
import gc

# Força uso de CPU e otimizações
os.environ["CUDA_VISIBLE_DEVICES"] = ""
torch.set_num_threads(4)
gc.enable()  # Ativa garbage collector

# Configuração do OpenAI (se disponível)
import openai
if OPENAI_KEY:
    openai.api_key = OPENAI_KEY
    print("✅ API OpenAI configurada")

# Verificação do token HF
if not HF_TOKEN:
    raise ValueError("❌ É necessário fornecer um token do Hugging Face (HF_TOKEN)")
os.environ["HF_TOKEN"] = HF_TOKEN

# Montagem do Google Drive
print("\n📁 Montando Google Drive...")
drive.mount('/content/drive', force_remount=True)

# Criação das pastas necessárias
def criar_diretorios():
    diretorios = [
        '/content/drive/MyDrive/Dubbing',
        '/content/drive/MyDrive/Dubbing/Results',
        '/content/temp'
    ]
    for dir in diretorios:
        os.makedirs(dir, exist_ok=True)
    return True

criar_diretorios()

# Verificação de GPU
def verificar_gpu():
    try:
        !nvidia-smi
        print("✅ GPU disponível - otimizações serão aplicadas")
        return True
    except:
        print("📌 GPU não disponível - usando CPU")
        return False

tem_gpu = verificar_gpu()

print("\n✨ Configuração inicial concluída!")

In [None]:
# CÉLULA 2 - Funções de Tradução e Processamento
#@title 🔤 Funções de Tradução e Processamento de Texto { display-mode: "form" }

def traduzir_texto_google(texto):
    """Traduz o texto usando Google Translate."""
    try:
        translator = GoogleTranslator(source='en', target='pt')
        return translator.translate(texto)
    except Exception as e:
        print(f"⚠️ Erro na tradução Google: {e}")
        return texto

def traduzir_texto_openai(texto):
    """Traduz o texto usando a API da OpenAI."""
    try:
        response = openai.ChatCompletion.create(
            model="gpt-3.5-turbo",
            messages=[
                {"role": "system", "content": "Você é um tradutor profissional de inglês para português brasileiro."},
                {"role": "user", "content": f"Traduza o seguinte texto para português brasileiro, mantendo o tom e contexto: {texto}"}
            ]
        )
        return response.choices[0].message['content'].strip()
    except Exception as e:
        print(f"⚠️ Erro na tradução OpenAI: {e}")
        return traduzir_texto_google(texto)  # Fallback para Google Translate

def traduzir_texto(texto):
    """Função principal de tradução que escolhe o método apropriado."""
    if not texto.strip():
        return ""
    if OPENAI_KEY:
        return traduzir_texto_openai(texto)
    return traduzir_texto_google(texto)

async def listar_vozes_disponiveis():
    """Lista todas as vozes disponíveis do edge-tts."""
    try:
        voices = await edge_tts.list_voices()
        vozes_pt = [v for v in voices if 'pt-BR' in v['ShortName']]
        return vozes_pt
    except Exception as e:
        print(f"Erro ao listar vozes: {e}")
        return []

def verificar_voz(voz_selecionada):
    """Verifica se a voz selecionada está disponível."""
    vozes_padrao = [
        "pt-BR-AntonioNeural", "pt-BR-FranciscaNeural", "pt-BR-BrendaNeural",
        "pt-BR-DonatoNeural", "pt-BR-ElzaNeural", "pt-BR-FabioNeural",
        "pt-BR-GiovannaNeural", "pt-BR-HumbertoNeural", "pt-BR-JulioNeural",
        "pt-BR-LeilaNeural", "pt-BR-LeticiaNeural", "pt-BR-ManuelaNeural",
        "pt-BR-NicolauNeural", "pt-BR-ThalitaNeural", "pt-BR-ValerioNeural",
        "pt-BR-YaraNeural"
    ]

    if voz_selecionada not in vozes_padrao:
        print(f"⚠️ Voz '{voz_selecionada}' não encontrada. Usando pt-BR-AntonioNeural.")
        return "pt-BR-AntonioNeural"
    return voz_selecionada

def normalizar_audio(audio_segment, target_dbfs=-20.0):
    """Normaliza o volume do áudio para melhor qualidade."""
    try:
        diferenca_db = target_dbfs - audio_segment.dBFS
        return audio_segment.apply_gain(diferenca_db)
    except Exception as e:
        print(f"⚠️ Erro ao normalizar áudio: {e}")
        return audio_segment

def processar_texto_para_fala(texto):
    """Prepara o texto para síntese de fala, melhorando a naturalidade."""
    # Remove múltiplos espaços
    texto = re.sub(r'\s+', ' ', texto.strip())

    # Adiciona pausas em pontuações
    texto = texto.replace('.', '... ')
    texto = texto.replace('!', '! ... ')
    texto = texto.replace('?', '? ... ')
    texto = texto.replace(',', ', ')

    # Normaliza números e abreviações comuns
    texto = re.sub(r'(\d+)', lambda m: num2words(int(m.group(0)), lang='pt-BR'), texto)

    return texto

def verificar_duracao_video(video_path):
    """Verifica se o vídeo não excede o limite de processamento do Colab."""
    try:
        with VideoFileClip(video_path) as video:
            duracao = video.duration
            tamanho_mb = os.path.getsize(video_path) / (1024 * 1024)

            if duracao > 3600:  # 1 hora
                print(f"⚠️ Aviso: Vídeo muito longo ({duracao/60:.1f} min). Pode haver problemas de memória.")
            if tamanho_mb > 1000:  # 1GB
                print(f"⚠️ Aviso: Arquivo muito grande ({tamanho_mb:.1f} MB). Pode haver problemas de memória.")

            return True
    except Exception as e:
        print(f"❌ Erro ao verificar vídeo: {e}")
        return False

print("✅ Funções de tradução e processamento definidas!")

✅ Funções de tradução e processamento definidas!


In [None]:
# CÉLULA 3 - Funções de Processamento de Vídeo
#@title 🎥 Funções de Processamento de Vídeo { display-mode: "form" }

def gerar_audio_dublado(texto, voz, arquivo_saida, rate="+0%", volume="+0%"):
    """Gera áudio dublado usando edge-tts de forma síncrona."""
    try:
        comando = f'edge-tts --voice "{voz}" --text "{texto}" --rate={rate} --volume={volume} --write-media "{arquivo_saida}"'
        subprocess.run(comando, shell=True, check=True)
        return True
    except Exception as e:
        print(f"❌ Erro ao gerar áudio: {e}")
        return False

def extrair_audio(video_path, output_path):
    """Extrai o áudio do vídeo."""
    try:
        video = VideoFileClip(video_path)
        audio = video.audio
        audio.write_audiofile(output_path, codec='pcm_s16le', verbose=False, logger=None)
        audio.close()
        video.close()
        return True
    except Exception as e:
        print(f"❌ Erro ao extrair áudio: {e}")
        return False

def transcrever_audio(audio_path):
    """Transcreve o áudio usando Whisper."""
    try:
        print("🎯 Iniciando transcrição do áudio...")

        # Força o uso de CPU e configura o mapa de memória corretamente
        os.environ["CUDA_VISIBLE_DEVICES"] = ""
        device = "cpu"

        # Carrega o modelo com configurações específicas para CPU
        model = whisper.load_model(
            MODELO_WHISPER,
            device=device,
            download_root=os.path.join(os.getcwd(), "models")
        )

        # Configura as opções de transcrição
        options = {
            "language": 'en',
            "task": 'transcribe',
            "fp16": False,
            "beam_size": 5,
            "best_of": 5
        }

        # Realiza a transcrição com tratamento de memória otimizado
        result = model.transcribe(
            audio_path,
            **options,
            temperature=0.0,  # Reduz aleatoriedade nas predições
            compression_ratio_threshold=2.4,
            no_speech_threshold=0.6
        )

        # Processa e organiza os segmentos
        segments = []
        for segment in result["segments"]:
            # Limpa e formata o texto
            texto = segment['text'].strip()
            if texto:  # Só adiciona se houver texto
                segments.append({
                    'start': round(segment['start'], 2),
                    'end': round(segment['end'], 2),
                    'text': texto
                })

        # Libera memória explicitamente
        del model
        if torch.cuda.is_available():
            torch.cuda.empty_cache()

        return segments
    except Exception as e:
        print(f"❌ Erro detalhado na transcrição: {str(e)}")
        # Tenta recuperar memória em caso de erro
        if torch.cuda.is_available():
            torch.cuda.empty_cache()
        return []

def verificar_ambiente():
    """Verifica e configura o ambiente de execução."""
    try:
        # Força modo CPU
        os.environ["CUDA_VISIBLE_DEVICES"] = ""

        # Configura torch para usar CPU
        torch.set_num_threads(4)  # Ajuste conforme necessário

        # Configura cache do modelo
        cache_dir = os.path.join(os.getcwd(), "models")
        os.makedirs(cache_dir, exist_ok=True)

        return True
    except Exception as e:
        print(f"❌ Erro na configuração do ambiente: {str(e)}")
        return False

# Adicione esta chamada no início do seu script principal
verificar_ambiente()

def gerar_legenda_srt(segments, traducoes, output_path):
    """Gera arquivo de legendas no formato SRT."""
    try:
        with open(output_path, 'w', encoding='utf-8') as f:
            for i, (segment, traducao) in enumerate(zip(segments, traducoes), 1):
                start = time.strftime('%H:%M:%S,000', time.gmtime(segment['start']))
                end = time.strftime('%H:%M:%S,000', time.gmtime(segment['end']))

                f.write(f"{i}\n")
                f.write(f"{start} --> {end}\n")
                f.write(f"{traducao}\n\n")
        return True
    except Exception as e:
        print(f"❌ Erro ao gerar legendas: {e}")
        return False

def combinar_audio_video(video_path, audio_dublado_path, audio_original_path, legenda_path, output_path):
    """Combina vídeo, áudios e legendas em um único arquivo."""
    try:
        temp_output = tempfile.mktemp(suffix='.mp4')

        # Comando ffmpeg base
        cmd = [
            'ffmpeg', '-y', '-i', video_path,
            '-i', audio_dublado_path
        ]

        # Adiciona áudio original se necessário
        if MANTER_AUDIO_ORIGINAL and os.path.exists(audio_original_path):
            cmd.extend(['-i', audio_original_path])

        # Adiciona legendas se necessário
        if GERAR_LEGENDAS and os.path.exists(legenda_path):
            cmd.extend(['-i', legenda_path])

        # Configuração de streams
        cmd.extend([
            '-map', '0:v',  # Vídeo do primeiro input
            '-map', '1:a'   # Áudio dublado do segundo input
        ])

        # Adiciona áudio original como segunda trilha
        if MANTER_AUDIO_ORIGINAL:
            cmd.extend(['-map', '2:a'])

        # Adiciona legendas
        if GERAR_LEGENDAS:
            cmd.extend(['-map', '3', '-c:s', 'mov_text'])

        # Configurações de codecs e metadata
        cmd.extend([
            '-c:v', 'copy',
            '-c:a', 'aac',
            '-metadata:s:a:0', 'language=por',
            '-metadata:s:a:0', 'title=Português'
        ])

        if MANTER_AUDIO_ORIGINAL:
            cmd.extend([
                '-metadata:s:a:1', 'language=eng',
                '-metadata:s:a:1', 'title=English'
            ])

        if GERAR_LEGENDAS:
            cmd.extend([
                '-metadata:s:s:0', 'language=por',
                '-metadata:s:s:0', 'title=Português'
            ])

        cmd.append(temp_output)

        # Executa o comando ffmpeg
        subprocess.run(cmd, check=True, capture_output=True)

        # Move para o destino final
        shutil.move(temp_output, output_path)
        return True
    except Exception as e:
        print(f"❌ Erro ao combinar áudio e vídeo: {e}")
        return False

def limpar_arquivos_temporarios():
    """Remove arquivos temporários após o processamento."""
    try:
        shutil.rmtree('/content/temp', ignore_errors=True)
        os.makedirs('/content/temp', exist_ok=True)
    except Exception as e:
        print(f"⚠️ Erro ao limpar arquivos temporários: {e}")

print("✅ Funções de processamento de vídeo definidas!")

✅ Funções de processamento de vídeo definidas!


In [None]:
#@title 🎮 Interface de Dublagem { display-mode: "form" }

def dublar_video(video_path):
    """Função principal que coordena todo o processo de dublagem."""
    try:
        print(f"\n🎬 Iniciando processo de dublagem para: {os.path.basename(video_path)}")

        # Verifica o caminho do vídeo
        if not video_path or not os.path.exists(video_path):
            raise ValueError("❌ Caminho do vídeo inválido ou arquivo não encontrado.")

        # Verifica duração e tamanho do vídeo
        if not verificar_duracao_video(video_path):
            return False

        # Cria nome do arquivo de saída
        nome_base = os.path.splitext(os.path.basename(video_path))[0]
        output_dir = '/content/drive/MyDrive/Dubbing/Results'
        output_path = os.path.join(output_dir, f"{nome_base}_dubbed.mp4")

        # Se o arquivo já foi dublado antes, pula
        if os.path.exists(output_path):
            print(f"⏩ Arquivo {nome_base} já foi dublado anteriormente. Pulando...")
            return False

        # Arquivos temporários
        temp_dir = '/content/temp'
        audio_original = os.path.join(temp_dir, 'audio_original.wav')
        audio_dublado = os.path.join(temp_dir, 'audio_dublado.wav')
        legenda_path = os.path.join(temp_dir, 'legendas.srt')

        # Extrai áudio original
        print("📼 Extraindo áudio original...")
        if not extrair_audio(video_path, audio_original):
            return False

        # Transcreve áudio
        print("🎤 Transcrevendo áudio...")
        segments = transcrever_audio(audio_original)
        if not segments:
            return False

        # Traduz segmentos
        print("🔤 Traduzindo segmentos...")
        traducoes = []
        for segment in tqdm(segments, desc="Traduzindo"):
            traducao = traduzir_texto(segment['text'])
            traducoes.append(traducao)

        # Gera legendas se necessário
        if GERAR_LEGENDAS:
            print("📝 Gerando arquivo de legendas...")
            if not gerar_legenda_srt(segments, traducoes, legenda_path):
                print("⚠️ Erro ao gerar legendas, continuando sem elas...")

        # Gera áudio dublado
        print("🗣️ Gerando áudio dublado...")
        voz = verificar_voz(VOZ_PADRAO)
        audio_final = AudioSegment.empty()

        for i, (segment, traducao) in enumerate(zip(segments, traducoes), 1):
            # Arquivo temporário para cada segmento
            temp_audio = os.path.join(temp_dir, f'segment_{i}.mp3')

            # Gera o áudio do segmento
            texto_processado = processar_texto_para_fala(traducao)
            if not gerar_audio_dublado(texto_processado, voz, temp_audio):
                continue

            # Carrega e processa o segmento
            segment_audio = AudioSegment.from_file(temp_audio)
            segment_audio = normalizar_audio(segment_audio)

            # Calcula posição e duração
            start_ms = int(segment['start'] * 1000)
            duracao_atual = len(audio_final)

            # Adiciona silêncio se necessário
            if start_ms > duracao_atual:
                audio_final += AudioSegment.silent(duration=start_ms - duracao_atual)

            # Adiciona o segmento
            audio_final += segment_audio

        # Salva áudio dublado final
        audio_final.export(audio_dublado, format="wav")

        # Combina tudo
        print("🎥 Combinando áudio e vídeo...")
        if not combinar_audio_video(
            video_path,
            audio_dublado,
            audio_original if MANTER_AUDIO_ORIGINAL else None,
            legenda_path if GERAR_LEGENDAS else None,
            output_path
        ):
            return False

        # Limpa arquivos temporários
        limpar_arquivos_temporarios()

        print(f"\n✨ Dublagem concluída com sucesso!")
        print(f"📁 Arquivo salvo em: {output_path}")
        return True

    except Exception as e:
        print(f"❌ Erro durante o processo de dublagem: {e}")
        return False

def processar_todos_videos():
    """Processa todos os vídeos de forma síncrona."""
    # Configura diretórios
    os.makedirs('/content/temp', exist_ok=True)
    os.makedirs('/content/drive/MyDrive/Dubbing/Results', exist_ok=True)

    # Processa vídeos
    input_dir = '/content/drive/MyDrive/Dubbing'
    video_extensions = ['.mp4', '.avi', '.mkv', '.mov']

    videos = [os.path.join(input_dir, f) for f in os.listdir(input_dir)
              if os.path.splitext(f)[1].lower() in video_extensions and
              os.path.isfile(os.path.join(input_dir, f))]

    if not videos:
        print("⚠️ Nenhum vídeo encontrado na pasta Dubbing.")
        return

    print(f"🎥 Encontrados {len(videos)} vídeos para processar")

    # Processa cada vídeo sequencialmente
    for video in videos:
        dublar_video(video)
        limpar_arquivos_temporarios()

# Executa o processamento
processar_todos_videos()