In [26]:
from langchain_google_genai import ChatGoogleGenerativeAI 
from langchain_groq import ChatGroq
from dotenv import load_dotenv
import os


In [27]:
GOOGLE_API_KEY = os.getenv("GOOGLE_API_KEY")
os.environ["GOOGLE_API_KEY"] = GOOGLE_API_KEY
llm = ChatGoogleGenerativeAI(model = "gemini-1.5-pro"	 , temperature = 0.7 , max_tokens=200)


In [28]:
GROQ_API_KEY  = os.getenv("GROQ_API_KEY")
os.environ["GROQ_API_KEY"] = GROQ_API_KEY
llm = ChatGroq(model="Gemma2-9b-It")


In [29]:
from langchain.prompts import ChatPromptTemplate

prompt_template = ChatPromptTemplate.from_template(
    "Write 5 short captions (max 8 words each) for a social video ad. Use a hook and storytelling style. Topic: {topic}"
)

def script_writer(state: dict) -> dict:
    prompt = prompt_template.format_messages(topic=state["prompt"])
    response = llm(prompt)
    text = response.content.strip()
    print(response)
    captions = [line.strip("-• ").strip() for line in text.split("\n") if line.strip()]
    return {**state, "captions": captions}


In [30]:
state = {"prompt": "Advertisement for Car"}
state = script_writer(state)
state["captions"]


content="Here are 5 short captions (max 8 words each) for a car social video ad, using a hook and storytelling style:\n\n1. **Craving adventure? This car is ready.**\n2. **Escape the ordinary.  Hit the open road.**\n3. **One drive.  Endless possibilities.**\n4. **Where will your journey take you?**\n5. **Built for life.  Live it to the fullest.** \n\n\nLet me know if you'd like more options! \n" additional_kwargs={} response_metadata={'token_usage': {'completion_tokens': 109, 'prompt_tokens': 39, 'total_tokens': 148, 'completion_time': 0.198181818, 'prompt_time': 0.002275503, 'queue_time': 0.24867157699999998, 'total_time': 0.200457321}, 'model_name': 'Gemma2-9b-It', 'system_fingerprint': 'fp_10c08bf97d', 'finish_reason': 'stop', 'logprobs': None} id='run--02c521bd-9d01-4fda-adc1-29a018bfab56-0' usage_metadata={'input_tokens': 39, 'output_tokens': 109, 'total_tokens': 148}


['Here are 5 short captions (max 8 words each) for a car social video ad, using a hook and storytelling style:',
 '1. **Craving adventure? This car is ready.**',
 '2. **Escape the ordinary.  Hit the open road.**',
 '3. **One drive.  Endless possibilities.**',
 '4. **Where will your journey take you?**',
 '5. **Built for life.  Live it to the fullest.**',
 "Let me know if you'd like more options!"]

In [31]:
from gtts import gTTS

def voice_actor(state: dict) -> dict:
    captions = state["captions"]
    output_dir = "outputs/audio"
    os.makedirs(output_dir, exist_ok=True)
    audio_paths = []

    for i, caption in enumerate(captions):
        file_path = os.path.join(output_dir, f"caption_{i}.mp3")
        gTTS(text=caption, lang='en').save(file_path)
        audio_paths.append(file_path)

    return {**state, "audio_paths": audio_paths}


In [32]:
state = voice_actor(state)
state["audio_paths"]


['outputs/audio/caption_0.mp3',
 'outputs/audio/caption_1.mp3',
 'outputs/audio/caption_2.mp3',
 'outputs/audio/caption_3.mp3',
 'outputs/audio/caption_4.mp3',
 'outputs/audio/caption_5.mp3',
 'outputs/audio/caption_6.mp3']

In [33]:
import os
from huggingface_hub import InferenceClient

def graphic_designer(state: dict) -> dict:
    captions = state["captions"]
    output_dir = "outputs/images"
    os.makedirs(output_dir, exist_ok=True)
    image_paths = []

    client = InferenceClient(
        model="black-forest-labs/FLUX.1-dev",
        provider="nebius",
        api_key=os.environ["HF_API_TOKEN"]
    )

    for i, caption in enumerate(captions):
        prompt = f"{caption}, realistic style"
        try:
            image = client.text_to_image(prompt)
            img_path = os.path.join(output_dir, f"image_{i}.png")
            image.save(img_path)
            image_paths.append(img_path)
        except Exception as e:
            print(f"[✗] Error generating image for caption {i}: {e}")

    return {**state, "image_paths": image_paths}


In [34]:
state = graphic_designer(state)
state["image_paths"]


[✗] Error generating image for caption 0: 402 Client Error: Payment Required for url: https://router.huggingface.co/nebius/v1/images/generations (Request ID: Root=1-686d3dd0-7ff8d159738c9a047f68710f;89d9fed7-28eb-4f74-a285-888620f7a2ff)

You have exceeded your monthly included credits for Inference Providers. Subscribe to PRO to get 20x more monthly included credits.
[✗] Error generating image for caption 1: 402 Client Error: Payment Required for url: https://router.huggingface.co/nebius/v1/images/generations (Request ID: Root=1-686d3dd1-5e11c2591e63d6c720f26410;079c9070-35b8-4bb0-9416-4214f82c2810)

You have exceeded your monthly included credits for Inference Providers. Subscribe to PRO to get 20x more monthly included credits.
[✗] Error generating image for caption 2: 402 Client Error: Payment Required for url: https://router.huggingface.co/nebius/v1/images/generations (Request ID: Root=1-686d3dd1-73e7a610562a31bc6c14a8e9;48d44dd4-4f59-4afe-ae4d-9d98ec92bc63)

You have exceeded your

[]

In [35]:
import os
import ffmpeg
from IPython.display import Video, display

def director(state: dict) -> dict:
    
    base_dir = "outputs"
    images_dir = os.path.join(base_dir, "images")
    audio_dir = os.path.join(base_dir, "audio")
    final_dir = os.path.join(base_dir, "final")
    os.makedirs(final_dir, exist_ok=True)

    # List and sort files to align pairs by order
    image_files = sorted([f for f in os.listdir(images_dir) if f.lower().endswith(('.png', '.jpg', '.jpeg'))])
    audio_files = sorted([f for f in os.listdir(audio_dir) if f.lower().endswith(('.mp3', '.wav', '.aac'))])

    video_paths = []

    # Process pairs - take min length to avoid out of index
    for i in range(min(len(image_files), len(audio_files))):
        image_path = os.path.join(images_dir, image_files[i])
        audio_path = os.path.join(audio_dir, audio_files[i])
        out_path = os.path.join(final_dir, f"clip_{i}.mp4")

        try:
            image_input = ffmpeg.input(image_path, loop=1, t=5)  # loop image for 5 seconds
            audio_input = ffmpeg.input(audio_path)

            (
                ffmpeg
                .output(image_input, audio_input, out_path, vcodec='libx264', acodec='aac', shortest=None)
                .run(overwrite_output=True, quiet=True)
            )
            video_paths.append(out_path)
            print(f"[✓] Created: {out_path}")
        except Exception as e:
            print(f"[✗] Error creating clip {i}:", e)

    if video_paths:
        display(Video(video_paths[0]))

    return {**state, "video_paths": video_paths}

# Run the function
state = {}
state = director(state)


[✓] Created: outputs/final/clip_0.mp4
[✓] Created: outputs/final/clip_1.mp4
[✓] Created: outputs/final/clip_2.mp4
[✓] Created: outputs/final/clip_3.mp4
[✓] Created: outputs/final/clip_4.mp4
[✓] Created: outputs/final/clip_5.mp4
[✓] Created: outputs/final/clip_6.mp4


In [36]:
state = director(state)


[✓] Created: outputs/final/clip_0.mp4
[✓] Created: outputs/final/clip_1.mp4
[✓] Created: outputs/final/clip_2.mp4
[✓] Created: outputs/final/clip_3.mp4
[✓] Created: outputs/final/clip_4.mp4
[✓] Created: outputs/final/clip_5.mp4
[✓] Created: outputs/final/clip_6.mp4


In [37]:
import ffmpeg
import os
from IPython.display import Video, display

def director(state: dict) -> dict:
    audio_paths = state["audio_paths"]
    image_paths = state["image_paths"]
    captions = state["captions"]

    final_dir = "outputs/final"
    os.makedirs(final_dir, exist_ok=True)
    video_paths = []

    for i, (audio, image, caption) in enumerate(zip(audio_paths, image_paths, captions)):
        out_path = os.path.join(final_dir, f"clip_{i}.mp4")
        try:
            image_input = ffmpeg.input(image, loop=1, t=5)
            audio_input = ffmpeg.input(audio)

            (
                ffmpeg
                .output(image_input, audio_input, out_path, vcodec='libx264', acodec='aac', strict='experimental', shortest=None)
                .run(overwrite_output=True, quiet=True)
            )
            video_paths.append(out_path)
            print(f"[✓] Created: {out_path}")
        except ffmpeg.Error as e:
            print(f"[✗] Error for clip {i}:", e)

    if video_paths:
        display(Video(video_paths[0]))

    return {**state, "video_paths": video_paths}


In [38]:
state = director(state)


KeyError: 'audio_paths'

In [23]:
import ffmpeg
import os

def merge_clips(video_paths, output_path="outputs/final/final_video.mp4"):

    if not video_paths:
        print("No video clips to merge.")
        return

    # Create temporary text file with all paths
    list_file = "outputs/final/merge_list.txt"
    with open(list_file, "w") as f:
        for path in video_paths:
            f.write(f"file '{os.path.abspath(path)}'\n")

    # Merge using ffmpeg concat demuxer
    try:
        (
            ffmpeg
            .input(list_file, format='concat', safe=0)
            .output(output_path, c='copy')
            .run(overwrite_output=True, quiet=True)
        )
        print(f"[✓] Merged into: {output_path}")
    except ffmpeg.Error as e:
        print("[✗] Merge error:", e)

    return output_path


In [24]:
merged_path = merge_clips(state["video_paths"])


[✓] Merged into: outputs/final/final_video.mp4


In [25]:
from IPython.display import Video, display
display(Video(merged_path))
