<h1>Video Voice Clean-Up AI Agent</h1>

In [None]:
pip install langgraph langchain langchain-openai openai ffmpeg-python requests python-dotenv demucs

In [None]:
import os
import subprocess
from typing import Optional

import requests
from dotenv import load_dotenv
from langchain.tools import StructuredTool
from langchain.tools import Tool
from langchain_openai import ChatOpenAI
from langgraph.prebuilt import create_react_agent
from openai import OpenAI
from pydantic import BaseModel

# Load environment variables (the return value of load_dotenv() is deliberately discarded)
_ = load_dotenv()
# NOTE(review): generate_ai_voice_elevenlabs reads the same variable again at call time,
# so this module-level copy looks unused in the visible code — confirm before removing.
ELEVENLABS_API_KEY = os.getenv("ELEVENLABS_API_KEY")  # Set your ElevenLabs API key in the .env file


In [None]:
# First create the schema
class ExtractAudioSchema(BaseModel):
    # Argument schema attached to the ExtractAudio tool.

    # Path of the video file whose audio should be extracted.
    video_path: str
    # Destination path of the extracted audio. No default is declared here, so
    # the schema treats it as required even though extract_audio() itself
    # accepts None and derives a name under "downloads".
    output_audio_path: str



def extract_audio(video_path: str, output_audio_path: str = None) -> str:
    """Extract the audio of a video into a separate file using FFmpeg.

    Args:
        video_path: Path of the source video.
        output_audio_path: Destination audio file. When omitted, the audio is
            written to ``downloads/<video name>_audio.wav``.

    Returns:
        The path of the audio file that was written.

    Raises:
        subprocess.CalledProcessError: If FFmpeg exits with a non-zero status.
        FileNotFoundError: If FFmpeg reports success but the output file is
            missing.
    """
    # If output_audio_path is not provided, generate a default name
    if output_audio_path is None:
        base_name = os.path.splitext(os.path.basename(video_path))[0]
        output_audio_path = os.path.join("downloads", f"{base_name}_audio.wav")

    # FFmpeg does not create missing directories, so make sure the target
    # directory (e.g. the default "downloads") exists before running it.
    output_dir = os.path.dirname(output_audio_path)
    if output_dir:
        os.makedirs(output_dir, exist_ok=True)

    print(f"Extracting audio from {video_path} to {output_audio_path}")

    # "-y" overwrites an output left over from a previous run instead of
    # stopping at FFmpeg's interactive overwrite prompt; the other FFmpeg
    # invocations in this file pass it as well.
    subprocess.run(
        ["ffmpeg", "-i", video_path, "-q:a", "0", "-map", "a", "-y", output_audio_path],
        check=True,
    )

    if not os.path.exists(output_audio_path):
        raise FileNotFoundError(f"Audio extraction failed: {output_audio_path} was not created.")

    return output_audio_path
    

# Expose extract_audio to the agent as a structured (multi-argument) tool; the
# accepted arguments are described by ExtractAudioSchema.
extract_audio_tool = StructuredTool(
    name="ExtractAudio",
    func=extract_audio,
    description="Extracts audio from a video file using FFmpeg. Requires video_path and output_audio_path arguments.",
    args_schema=ExtractAudioSchema
)

# An earlier version called the REST endpoint through requests (kept commented out below); the active transcribe_audio further down uses the OpenAI client directly.
# Transcribe Audio Tool using OpenAI Whisper API
# File Size Limit: 25 MB (for a single request)
# Duration Limit: ~30 minutes (varies depending on bitrate and file compression)
# If your audio file exceeds 25 MB, you’ll need to chunk it into smaller segments before processing!
# def transcribe_audio(audio_path):
#     """Transcribes audio using OpenAI Whisper API."""
#     with open(audio_path, "rb") as audio_file:
#         result = requests.post(
#             "https://api.openai.com/v1/audio/transcriptions",
#             headers={"Authorization": f"Bearer {os.getenv('OPENAI_API_KEY')}"},
#             files={"file": audio_file},
#             data={"model": "whisper-1"}
#         )
#         result.raise_for_status()
#         transcript = result.json()["text"]

#     # Save the transcript to a file
#     transcript_path = audio_path.replace(".wav", ".txt")
#     with open(transcript_path, "w") as file:
#         file.write(transcript)

#     return transcript_path

# transcribe_audio_tool = Tool(
#     name="TranscribeAudio",
#     func=transcribe_audio,
#     description="Transcribes an audio file into text using OpenAI Whisper API."
# )


def transcribe_audio(audio_path):
    """Transcribe an audio file with the OpenAI Whisper API and save the text.

    Args:
        audio_path: Path of the audio file to upload.

    Returns:
        Path of the ``.txt`` transcript, written next to the audio file with
        the audio file's extension replaced.
    """
    client = OpenAI()

    with open(audio_path, "rb") as audio_file:
        transcription = client.audio.transcriptions.create(
            model="whisper-1",
            file=audio_file,
        )

    transcript_text = transcription.text

    # Swap only the final extension. A plain str.replace(".wav", ".txt") leaves
    # the path unchanged for non-.wav inputs (so the transcript would overwrite
    # the audio file itself) and would also rewrite ".wav" inside directory names.
    transcript_path = os.path.splitext(audio_path)[0] + ".txt"
    with open(transcript_path, "w") as file:
        file.write(transcript_text)

    return transcript_path

# Expose transcribe_audio to the agent as a single-input tool.
transcribe_audio_tool = Tool(
    name="TranscribeAudio",
    func=transcribe_audio,
    description="Transcribes an audio file into text using OpenAI Whisper API."
)


# Chat model used by clean_transcript; temperature=0 is set explicitly here.
llm = ChatOpenAI(model="gpt-4o", temperature=0)

def clean_transcript(transcript_path):
    """Clean a transcript with GPT-4o and save the result to a new file.

    Args:
        transcript_path: Path of the raw transcript text file.

    Returns:
        Path of the cleaned transcript, named ``<transcript stem>_cleaned.txt``
        and placed next to the input file.

    Raises:
        ValueError: If the model response cannot be turned into text or the
            cleaned transcript cannot be written.
    """
    with open(transcript_path, "r") as file:
        transcript = file.read()

    prompt = f"""
    Clean up this transcript: {transcript}
    
    - Remove filler words
    - Improve readability
    - Preserve meaning and context
    - Return only the cleaned transcript as plain text
    """

    response = llm.invoke(prompt)

    try:
        cleaned_text = response.content.strip()

        # Build the output name from the stem rather than str.replace(): when the
        # input has no ".txt" the replace would return the input path unchanged
        # and the raw transcript would be overwritten in place.
        cleaned_path = os.path.splitext(transcript_path)[0] + "_cleaned.txt"
        with open(cleaned_path, "w") as file:
            file.write(cleaned_text)

        return cleaned_path
    except Exception as e:
        # Keep the ValueError contract but chain the cause for debugging.
        raise ValueError(f"Failed to clean transcript: {response.content}. Error: {e}") from e

# Expose clean_transcript to the agent as a single-input tool.
clean_transcript_tool = Tool(
    name="CleanTranscript",
    func=clean_transcript,
    description="Cleans up a transcript using GPT-4o to remove filler words and improve readability."
)


# Define ElevenLabs Voice Generation
def generate_ai_voice_elevenlabs(cleaned_transcript_path, output_audio_path=None):
    """Generate an AI voice track from a cleaned transcript using ElevenLabs.

    Args:
        cleaned_transcript_path: Path of the cleaned transcript text file.
        output_audio_path: Where to store the generated audio. When omitted,
            the file is written to ``downloads/<transcript stem>_ai.wav`` with
            a trailing ``_cleaned`` removed from the stem.

    Returns:
        The path of the audio file that was written.

    Raises:
        ValueError: If the ELEVENLABS_API_KEY environment variable is not set.
        requests.HTTPError: If the ElevenLabs API answers with an error status.
    """
    api_key = os.getenv("ELEVENLABS_API_KEY")  # Load API key from env

    if not api_key:
        raise ValueError("ElevenLabs API key is missing. Set it in the .env file.")

    # Derive the default output name from the transcript stem. A plain
    # str.replace("_cleaned.txt", "_ai.wav") leaves other names untouched, so
    # the audio would be stored under the transcript's own file name.
    if output_audio_path is None:
        stem = os.path.splitext(os.path.basename(cleaned_transcript_path))[0]
        if stem.endswith("_cleaned"):
            stem = stem[: -len("_cleaned")]
        output_audio_path = os.path.join("downloads", f"{stem}_ai.wav")

    # Read the cleaned transcript
    with open(cleaned_transcript_path, "r") as file:
        text = file.read()

    # ElevenLabs API settings
    url = "https://api.elevenlabs.io/v1/text-to-speech"
    # voice_id = "pMsXgVXv3BLzUgSXRplE"  # ElevenLabs voice ID (woman), cannot remember which one exactly.
    # The value below is a placeholder and must be replaced with a real voice ID
    # (the one above, or a cloned voice) before the request can succeed.
    voice_id = "!!!!!!!!!!!!!!!!!!!!!REMOVED SINCE IT WAS MY VOICE, USE THE ONE ABOVE OR CLONE YOURS!!!!!!!!!!!!!!!!!!!!" # JENYS

    headers = {
        "xi-api-key": api_key,
        "Content-Type": "application/json"
    }

    payload = {
        "text": text,
        "voice_settings": {
            "stability": 0.7,  # Adjust for consistency in speech
            "similarity_boost": 0.9  # Boosts similarity to training data
        }
    }

    # Make the API request. requests never times out by default, so bound the
    # connect time (10 s) and the wait between response bytes (300 s) to keep a
    # stalled connection from hanging the agent.
    response = requests.post(
        f"{url}/{voice_id}", headers=headers, json=payload, timeout=(10, 300)
    )
    response.raise_for_status()  # Raise error if request fails

    # Make sure the target directory (e.g. the default "downloads") exists.
    output_dir = os.path.dirname(output_audio_path)
    if output_dir:
        os.makedirs(output_dir, exist_ok=True)

    # Save the generated audio. The response bytes are written unmodified.
    # NOTE(review): the default name ends in ".wav" — confirm which container
    # format the API actually returns.
    with open(output_audio_path, "wb") as audio_file:
        audio_file.write(response.content)

    return output_audio_path


# AI Voice Tool
class GenerateAIVoiceSchema(BaseModel):
    # Argument schema attached to the GenerateAIVoice tool.

    # Path of the cleaned transcript to read aloud.
    cleaned_transcript_path: str
    # Destination of the generated audio; defaults to a fixed file under "downloads".
    output_audio_path: str = "downloads/generated_voice_ai.wav"

# Expose generate_ai_voice_elevenlabs to the agent as a structured tool whose
# arguments are described by GenerateAIVoiceSchema.
ai_voice_tool = StructuredTool(
    name="GenerateAIVoice",
    func=generate_ai_voice_elevenlabs,
    description="Generates AI voice from cleaned transcript using ElevenLabs.",
    args_schema=GenerateAIVoiceSchema
)




def remove_old_voice(video_path: str, output_video_path: str = None) -> str:
    """Create a copy of a video with all audio removed using FFmpeg.

    Args:
        video_path: Path of the source video.
        output_video_path: Destination of the silent video. When omitted,
            ``<video path without extension>_cleaned.mov`` is used.

    Returns:
        The path of the silent video; ``video_path`` itself when its file name
        already ends in ``_cleaned``; or None when FFmpeg fails or produces no
        output file.
    """
    # Ensure the function does not create endless `_cleaned.mov` files.
    # Only the file name is inspected: a substring test on the whole path would
    # also match directory names such as "raw_cleaned/clip.mov" and silently
    # return a video that still carries its original audio.
    file_stem = os.path.splitext(os.path.basename(video_path))[0]
    if file_stem.endswith("_cleaned"):
        print(f"⚠️ Skipping already cleaned video: {video_path}")
        return video_path  # Return the same path if it's already cleaned

    if output_video_path is None:
        base_name = os.path.splitext(video_path)[0]
        output_video_path = f"{base_name}_cleaned.mov"

    print(f"🧹 Removing all audio from {video_path} -> {output_video_path}")

    # FFmpeg command to remove all audio: copy the video stream as-is ("-c:v copy"),
    # drop audio ("-an") and overwrite any existing output ("-y").
    result = subprocess.run([
        "ffmpeg", "-i", video_path, "-c:v", "copy", "-an", "-y", output_video_path
    ], stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)

    if result.returncode != 0:
        print(f"❌ FFmpeg error while removing audio: {result.stderr}")
        return None

    if not os.path.exists(output_video_path):
        print(f"❌ Error: Cleaned video {output_video_path} was not created.")
        return None

    print(f"✅ Successfully created cleaned video: {output_video_path}")
    return output_video_path


# Register the tool in LangGraph
# Expose remove_old_voice to the agent as a single-input tool.
remove_old_voice_tool = Tool(
    name="RemoveOldVoice",
    func=remove_old_voice,
    description="Removes all audio from a video file using FFmpeg, creating a silent version."
)

# Used by the merge_audio_with_adjustment Tool
def adjust_voice_timing(original_audio: str, ai_audio: str, adjusted_ai_audio: str):
    """Time-stretch the AI voice so its duration approaches the original audio's.

    Args:
        original_audio: Path of the audio extracted from the original video.
        ai_audio: Path of the AI-generated voice.
        adjusted_ai_audio: Path where the time-stretched voice is written. It
            must differ from both input paths.

    Returns:
        ``adjusted_ai_audio`` on success, or None when the paths are invalid, a
        duration cannot be determined, or FFmpeg fails.
    """

    def get_audio_duration(audio_file):
        """Return the duration of audio_file in seconds via ffprobe, or None on failure."""
        if not os.path.exists(audio_file):
            print(f"❌ Error: {audio_file} does not exist.")
            return None

        result = subprocess.run(
            ["ffprobe", "-i", audio_file, "-show_entries", "format=duration", "-v", "quiet", "-of", "csv=p=0"],
            capture_output=True, text=True
        )

        if result.returncode != 0:
            print(f"❌ FFprobe error for {audio_file}: {result.stderr}")
            return None

        try:
            return float(result.stdout.strip())
        except ValueError:
            print(f"❌ Error: Could not parse duration from {audio_file}")
            return None

    # The stale-output cleanup further down deletes adjusted_ai_audio before
    # FFmpeg runs. If that path is also one of the inputs, the input would be
    # destroyed, so in-place adjustment is refused up front.
    target_path = os.path.abspath(adjusted_ai_audio)
    if target_path in (os.path.abspath(original_audio), os.path.abspath(ai_audio)):
        print(f"❌ Error: Adjusted audio path {adjusted_ai_audio} must differ from the input files.")
        return None

    print(f"🔍 Checking durations for: {original_audio} (original) and {ai_audio} (AI-generated)")

    original_duration = get_audio_duration(original_audio)
    ai_duration = get_audio_duration(ai_audio)

    if original_duration is None or ai_duration is None:
        print("❌ Error: Could not retrieve audio durations.")
        return None

    print(f"🕒 Original Duration: {original_duration:.2f} sec, AI Duration: {ai_duration:.2f} sec")

    # A factor above 1 speeds the AI voice up (it is longer than the original),
    # below 1 slows it down; a zero-length original falls back to no change.
    speed_factor = ai_duration / original_duration if original_duration > 0 else 1.0

    # The factor is limited to [0.5, 2.0]; outside that range the output will
    # not fully match the original duration.
    if not (0.5 <= speed_factor <= 2.0):
        print(f"⚠️ Warning: Speed factor {speed_factor:.2f} is out of range. Clamping.")
        speed_factor = max(0.5, min(speed_factor, 2.0))

    print(f"🎵 Applying time-stretching adjustment with atempo={speed_factor:.3f}")

    # Ensure adjusted file doesn't exist before processing, so the existence
    # check after FFmpeg cannot be satisfied by a leftover from an earlier run.
    if os.path.exists(adjusted_ai_audio):
        print(f"🗑️ Deleting existing adjusted audio file: {adjusted_ai_audio}")
        os.remove(adjusted_ai_audio)

    # Apply time-stretching with correct speed factor
    result = subprocess.run([
        "ffmpeg", "-i", ai_audio, "-filter:a", f"atempo={speed_factor:.3f}",
        "-y", adjusted_ai_audio
    ], stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)

    if result.returncode != 0:
        print(f"❌ FFmpeg error while adjusting voice timing: {result.stderr}")
        return None

    if not os.path.exists(adjusted_ai_audio):
        print(f"❌ Error: Adjusted audio file {adjusted_ai_audio} was not created.")
        return None

    print(f"✅ Successfully created adjusted audio: {adjusted_ai_audio}")
    return adjusted_ai_audio


# def merge_audio_with_adjustment(video_path: str, new_voice_path: str, output_video_path: str = None):
#     """Merges new AI-generated voice into the cleaned original video using FFmpeg."""

#     # If the user already passed a file ending in "_cleaned.mov",
#     # use that directly as the cleaned video; otherwise append "_cleaned".
#     if video_path.endswith("_cleaned.mov"):
#         cleaned_video_path = video_path
#         # Remove just the "_cleaned" portion from base_name so we pick up the correct
#         # original extracted audio name (e.g. myvideo_audio.wav, not myvideo_cleaned_audio.wav).
#         base_name = os.path.splitext(video_path)[0].replace("_cleaned", "")
#     else:
#         base_name = os.path.splitext(video_path)[0]
#         cleaned_video_path = f"{base_name}_cleaned.mov"

#     extracted_audio_path = f"{base_name}_audio.wav"

#     if not os.path.exists(cleaned_video_path):
#         print(f"❌ Error: Cleaned video file {cleaned_video_path} does not exist.")
#         return None

#     if not os.path.exists(extracted_audio_path):
#         print(f"❌ Error: Extracted audio file {extracted_audio_path} does not exist.")
#         return None

#     # Adjust AI-generated voice timing before merging
#     adjusted_voice_path = os.path.splitext(new_voice_path)[0] + "_adjusted.wav"
#     adjusted_voice_path = adjust_voice_timing(extracted_audio_path, new_voice_path, adjusted_voice_path)
#     if adjusted_voice_path is None:
#         print("❌ Error: Adjusted voice file was not created. Merging process aborted.")
#         return None

#     if output_video_path is None:
#         output_video_path = f"{base_name}_final.mov"

#     print(f"🎬 Merging cleaned video: {cleaned_video_path} with adjusted voice: {adjusted_voice_path}")

#     # FFmpeg command: Replace the cleaned video audio track with the adjusted AI-generated voice
#     result = subprocess.run([
#         "ffmpeg", "-i", cleaned_video_path, "-i", adjusted_voice_path,
#         "-c:v", "copy", "-c:a", "aac", "-map", "0:v:0", "-map", "1:a:0",
#         "-movflags", "faststart",
#         "-y", output_video_path
#     ], stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)

#     if result.returncode != 0:
#         print(f"❌ FFmpeg error while merging audio and video: {result.stderr}")
#         return None

#     print(f"✅ Successfully created final video: {output_video_path}")
#     return output_video_path


# Merge AI Voice into the Original Video (.mov format). !!!DOES NOT SUPPORT ADJUSTED TIMING!!!
def merge_audio_no_adjustment(video_path: str, new_voice_path: str, output_video_path: str = None):
    """
    Merges the new AI-generated voice into a cleaned version of the video,
    WITHOUT applying any timing adjustments to the AI voice.

    Args:
        video_path: Either the silent video itself (a name ending in
            "_cleaned.mov") or the original video, in which case the silent
            version is expected at "<video path without extension>_cleaned.mov".
        new_voice_path: Path of the AI-generated voice to use as the audio track.
        output_video_path: Destination of the merged video. Defaults to
            "<base name>_final.mov".

    Returns:
        The path of the merged video, or None when an input file is missing or
        FFmpeg fails.
    """
    cleaned_suffix = "_cleaned.mov"

    # If user passes e.g. "myvideo_cleaned.mov", use that directly.
    # Otherwise, assume the cleaned version is "myvideo_cleaned.mov".
    if video_path.endswith(cleaned_suffix):
        cleaned_video_path = video_path
        # Strip only the trailing marker. Replacing every "_cleaned" occurrence
        # would also rewrite directory names and point the final output at a
        # different (possibly non-existent) folder.
        base_name = video_path[: -len(cleaned_suffix)]
    else:
        base_name = os.path.splitext(video_path)[0]
        cleaned_video_path = f"{base_name}_cleaned.mov"

    # Ensure the cleaned video exists
    if not os.path.exists(cleaned_video_path):
        print(f"❌ Error: Cleaned video file {cleaned_video_path} does not exist.")
        return None

    # Report a missing voice file explicitly instead of relying on FFmpeg's error.
    if not os.path.exists(new_voice_path):
        print(f"❌ Error: AI voice file {new_voice_path} does not exist.")
        return None

    # If user didn't supply output, create it as "myvideo_final.mov" or similar
    if output_video_path is None:
        output_video_path = f"{base_name}_final.mov"

    print(f"🎬 Merging (no timing adjust) video: {cleaned_video_path} + AI voice: {new_voice_path}")

    # FFmpeg command: take the video stream from the first input (copied as-is)
    # and the audio stream from the second input (encoded as AAC).
    result = subprocess.run([
        "ffmpeg", "-i", cleaned_video_path, "-i", new_voice_path,
        "-c:v", "copy", "-c:a", "aac",
        "-map", "0:v:0", "-map", "1:a:0",
        "-movflags", "faststart", "-y", output_video_path
    ], stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)

    if result.returncode != 0:
        print(f"❌ FFmpeg error while merging audio and video: {result.stderr}")
        return None

    print(f"✅ Successfully created final video (no timing adjust): {output_video_path}")
    return output_video_path


# Registering MergeAudio as a StructuredTool
class MergeAudioSchema(BaseModel):
    # Argument schema attached to the MergeAudio tool.

    # Original video, or its silent "_cleaned.mov" counterpart.
    video_path: str
    # AI-generated voice file to use as the new audio track.
    new_voice_path: str
    # Optional destination. Declared Optional so that an explicit null from the
    # caller validates; a plain `str` annotation only tolerates the None default.
    output_video_path: Optional[str] = None

# Expose the merge step to the agent as a structured tool whose arguments are
# described by MergeAudioSchema. The active implementation is the variant
# without timing adjustment.
merge_audio_tool = StructuredTool(
    name="MergeAudio",
    # func=merge_audio_with_adjustment, #!!! If you want to use adjustment, slower or faster voice pace, then uncomment this line and the respective function above AND comment next line!
    func=merge_audio_no_adjustment,
    description="Merges the new AI-generated voice into the original .mov video using FFmpeg.",
    args_schema=MergeAudioSchema
)


# Tools for LangGraph: every tool the agent is allowed to call.
tools = [
    extract_audio_tool,
    transcribe_audio_tool,
    clean_transcript_tool,
    ai_voice_tool,
    remove_old_voice_tool,
    merge_audio_tool
]

# System Prompt for LangGraph
# Passed to create_react_agent as the agent's prompt; it describes the intended
# order of the processing steps in plain language.
system_prompt = """You are an AI assistant that processes videos by replacing their original voice with an AI-generated voice. 
Your workflow includes: 
extracting audio from the video, 
transcribing it, 
cleaning the transcript to remove filler words, 
generating a new AI voice from the cleaned text, 
and merging it back into the video while removing the old voice. 
Ensure high accuracy and natural-sounding output."""


# Create LangGraph Agent
# Build the agent from a gpt-4o chat model, the tool list and the system prompt.
# Note that a new ChatOpenAI instance is created here rather than reusing `llm`.
graph = create_react_agent(
    model=ChatOpenAI(model="gpt-4o"),
    tools=tools,
    prompt=system_prompt,
    debug=False
)

# Process Video Function
def process_video(video_path):
    """Run the agent on one video and print every message it produces.

    Args:
        video_path: Path of the video whose voice track should be replaced.
    """
    inputs = {"messages": [("user", f"Help me clean my {video_path} video and get one with clean voice.")]}
    # "values" makes each streamed item the full graph state after a step, which
    # is what the ["messages"][-1] lookup below expects. The former "steps" is
    # not one of LangGraph's stream modes.
    for step in graph.stream(inputs, stream_mode="values"):
        message = step["messages"][-1]
        if isinstance(message, tuple):
            print(message)
        else:
            message.pretty_print()

# Example Usage
# Runs the whole pipeline as soon as this cell executes; the path is an example file.
process_video("downloads/test_recording.mov")
