In [2]:
from crewai import Task, Agent, Crew, Process
from langchain.tools import tool
import re
import os
from langchain_groq import ChatGroq
llm = ChatGroq(model='llama3-70b-8192', temperature=1.0, max_tokens=2048)

In [3]:
from crewai_tools import SerperDevTool, WebsiteSearchTool, ScrapeWebsiteTool
search_tool = SerperDevTool()
web_rag_tool = WebsiteSearchTool()

In [4]:
from langchain_community.tools import WikipediaQueryRun
from langchain_community.utilities import WikipediaAPIWrapper

In [5]:
api_wrapper = WikipediaAPIWrapper(top_k_results=1)#, doc_content_chars_max=100)
# tool = WikipediaQueryRun(api_wrapper=api_wrapper)

In [6]:
from langchain_core.pydantic_v1 import BaseModel, Field


class WikiInputs(BaseModel):
    """Inputs to the wikipedia tool."""

    # Single string argument. The description text is attached to the field
    # through Field(description=...); this class is later passed as
    # `args_schema` when the Wikipedia tool is built.
    query: str = Field(
        description="query to look up in Wikipedia, should be 3 or less words"
    )

In [7]:
# Wikipedia lookup tool built on the `api_wrapper` configured above; it is the
# tool given to `script_agent` (`tools=[wiki_tool]`).
# NOTE(review): `description` only shows an input format and does not say what
# the tool does — consider a fuller description.
wiki_tool = WikipediaQueryRun(
    name="wiki-tool",
    description="{query:'input here'}",
    args_schema=WikiInputs,
    api_wrapper=api_wrapper,
    return_direct=True,
)

In [8]:
from langchain.tools import Tool

# Generic Tool wrapper whose callable is `wiki_tool.run`.
# NOTE(review): no agent visible in this notebook references `wiki` (they use
# `wiki_tool` directly) — confirm whether this wrapper is still needed.
wiki = Tool(
    name = 'wikipedia',
    func = wiki_tool.run,
    description= "{query:'input here'}"
)

In [9]:
wiki_tool.run("latest news in India")

'Page: 2025 elections in India\nSummary: The 2025 elections in India are expected to include the elections of the Rajya Sabha and 1 state and 1 union territory legislative assemblies.\n\n'

In [10]:
# The function docstring is what langchain's @tool exposes as the tool
# description, so it is deliberately left short and unchanged; implementation
# notes live in comments instead.
@tool
def process_script(script):
    """Used to process the script into dictionary format"""
    # Returns a dict with two lists of strings:
    #   'text_for_image_generation'  - contents of every <image>...<image> span
    #   'text_for_speech_generation' - contents of every <narration>...<narration> span

    def _extract(tag):
        # The capture group returns only the text between the tags (the
        # narration pattern previously had no group, so the tags leaked into
        # the results). `</?tag>` accepts both the `<tag>` closer used in the
        # prompt example and a conventional `</tag>` closer; DOTALL lets a
        # segment span several lines.
        pattern = r'<{0}>(.*?)</?{0}>'.format(tag)
        return [segment.strip() for segment in re.findall(pattern, script, flags=re.DOTALL)]

    return {
        'text_for_image_generation': _extract('image'),
        'text_for_speech_generation': _extract('narration'),
    }

In [11]:
# Agent that writes the narration; it is the only agent given the Wikipedia tool.
script_agent = Agent(
    role='Senior Content Writer',
    goal='Craft engaging, concise, and informative narrations for YouTube short videos',
    backstory="""As a seasoned content writer, you excel at breaking down complex topics into captivating narratives that educate and entertain audiences. Your expertise lies in writing short, concise, attention-grabbing scripts for YouTube short videos.""",
    verbose=True,
    llm=llm,
    allow_delegation=False,
    tools=[wiki_tool]
)

# Agent that turns narration sentences into image prompts; it has no tools.
# NOTE(review): the backstory ends with an unfinished sentence ("You will ") —
# complete or remove it.
image_descriptive_agent = Agent(
    role='Visual Storyteller',
    goal='Design stunning, contextually relevant visuals for YouTube short videos',
    backstory='With a keen eye for visual storytelling, you create compelling imagery that elevates the narrative and captivates the audience. You will ',
    verbose=True,
    llm=llm,
    allow_delegation=False
)

# Agent that is given the `process_script` tool to split the tagged script
# into image and speech text.
ScriptSynthesizer = Agent(
    role='Script Processor',
    goal='To process scripts by extracting text for image and speech generation',
    backstory='you are designed to assist in the automated creation of YouTube Shorts by processing scripts and preparing them for further steps in the content creation pipeline.',
    verbose=True,
    llm=llm,
    allow_delegation=False,
    tools=[process_script]
)

In [12]:
# Narration task for `script_agent`. `{topic}` is a placeholder in the
# description; the kickoff call passes a 'topic' input.
# NOTE(review): the prompt text contains typos ("immensive experice", "Folow");
# left untouched here because it is runtime prompt text.
story_writing_task = Task(
    description='Write an engaging narration for a YouTube short video on the topic: {topic}. Use the provided tool if have less idea about the given topic.',
    expected_output="""A short paragraph suitable for narrating in five seconds also provides immensive experice to audience. Folow the below example for output length and format.

    **Example:**

    **topic:**
    Powerful Kings of History

    **narration:**
    In the pages of history, powerful kings have shaped the destinies of nations.
    From Alexander the Great to Genghis Khan, their conquests have etched unforgettable legacies across civilizations.
    Their leadership continues to inspire awe and fascination to this day.
    """,
    agent=script_agent
)

# Image-prompt task; it receives the narration task as explicit context.
# The example closes each segment with the same `<narration>` / `<image>` tag
# that opens it (no slash) — the regexes in `process_script` match that form.
# NOTE(review): "outupt format" is a typo in the prompt text; left untouched.
img_text_task = Task(
    description='Given the narration,visually describe each sentence in the narration which will be used as a prompt for an image generation.',
    expected_output="""Sentences encoded in <narration> and <image> tags. Follow the below example for output format. The output pairs should not be more than five.

    example input:
    In the pages of history, powerful kings have shaped the destinies of nations. From Alexander the Great to Genghis Khan, their conquests have etched unforgettable legacies across civilizations. Their leadership continues to inspire awe and fascination to this day.

    outupt format:
    <narration>In the pages of history, powerful kings have shaped the destinies of nations.<narration>
    <image>An epic portrayal of ancient kings standing triumphantly, clad in regal attire, commanding their kingdoms with strength and wisdom, amidst grandeur and splendor.<image>
    <narration>From Alexander the Great to Genghis Khan, their conquests have etched unforgettable legacies across civilizations.<narration>
    <image>Dramatic portraits of Alexander the Great and Genghis Khan, adorned in battle armor, leading their armies across vast landscapes and leaving a lasting mark on history.<image>
    <narration>Their leadership continues to inspire awe and fascination to this day.<narration>
    <image>A powerful visual of kings seated on thrones, symbols of authority and ambition, evoking admiration and wonder, against a backdrop of their enduring achievements.<image>
    """,
    agent=image_descriptive_agent,
    context=[story_writing_task]
)

# Extraction task for `ScriptSynthesizer`.
# NOTE(review): no explicit `context` is set here, unlike `img_text_task` —
# presumably the sequential process hands over the previous task's output; confirm.
process_script_task = Task(
    description="Extract text for image and speech generation from a provided script.",
    expected_output="A dictionary containing lists of texts for image generation and speech generation.",
    agent=ScriptSynthesizer
)

In [13]:
# Bundle the three agents and their tasks into one crew configured with
# Process.sequential; tasks are listed in pipeline order
# (narration -> image prompts -> extraction).
crew = Crew(
    agents=[script_agent, image_descriptive_agent, ScriptSynthesizer],
    tasks=[story_writing_task, img_text_task, process_script_task],
    process = Process.sequential,
    cache = True,
    # memory=True,
    verbose=2
)

# Start the run with the 'topic' input used by the `{topic}` placeholder in
# the first task's description.
# NOTE(review): the `result` shown further down in the notebook is about
# Indian elections, not 'AI Agents' — that output appears to come from a
# different run.
result = crew.kickoff(inputs={'topic': 'AI Agents'})

[1m[95m [2024-07-03 16:43:53][DEBUG]: == Working Agent: Senior Content Writer[00m
[1m[95m [2024-07-03 16:43:53][INFO]: == Starting Task: Write an engaging narration for a YouTube short video on the topic: AI Agents. Use the provided tool if have less idea about the given topic.[00m


[1m> Entering new CrewAgentExecutor chain...[0m
[32;1m[1;3mI need to research and understand the concept of AI Agents to craft an engaging narration for the YouTube short video.

Action: wiki-tool
Action Input: {"query": "AI Agents"}[0m[95m 

Page: Intelligent agent
Summary: In intelligence and artificial intelligence, an intelligent agent (IA) is an agent acting in an intelligent manner. It perceives its environment, takes actions autonomously in order to achieve goals, and may improve its performance with learning or acquiring knowledge. An intelligent agent may be simple or complex: A thermostat or other control system is considered an example of an intelligent agent, as is a human being, as

In [17]:
result

"{'text_for_image_generation': ['A vibrant illustration of the Indian national flag waving in the wind, with ballot boxes, voting booths, and diverse citizens in the foreground, representing democracy in action, set against a bright and vibrant background symbolizing hope and progress.'], 'text_for_speech_generation': ['The 2025 elections in India are expected to include the elections of the Rajya Sabha and 1 state and 1 union territory legislative assemblies.']}"

In [8]:
type(result)

str

In [6]:
import os
from groq import Groq

client = Groq()
# Path is relative to the notebook's working directory. The former
# `os.path.dirname(__name__)` prefix was dropped: `__name__` is a module name,
# not a path, so that expression only ever produced an empty string.
filename = 'outputs/speeches/speech_0.mp3'

# Read the audio bytes, then let the file close before the result is printed.
with open(filename, "rb") as file:
    transcription = client.audio.transcriptions.create(
      # Upload under the bare file name, matching how the video pipeline
      # names its uploads.
      file=(os.path.basename(filename), file.read()),
      model="whisper-large-v3",
      response_format="verbose_json",
    )
print(transcription.text)

 Imagine a world where machines learn to understand human emotions, and relationships are redefined.


In [13]:
import os
import cv2
from moviepy.editor import ImageClip, AudioFileClip, concatenate_videoclips
from PIL import Image, ImageDraw, ImageFont
import numpy as np
from groq import Groq

client = Groq()

# Schema with the two directory paths used for video generation.
# NOTE(review): `BaseModel` and `Field` are not imported in this cell; they
# come from the earlier `langchain_core.pydantic_v1` import cell, so this cell
# fails on a fresh kernel unless that cell ran first. The class is not
# referenced by the code visible in this cell.
class VideoGeneration(BaseModel):
    images_dir: str = Field(description='Path to images directory, such as "outputs/images"')
    speeches_dir: str = Field(description='Path to speeches directory, such as "outputs/speeches"')

def add_captions_to_clip(clip, caption, font_path='C:/Users/prudh/Desktop/Python/Generative AI/GenerativeAI-Projects/speed/Montserrat-Bold.ttf', fontsize=40, fontcolor=(255, 255, 255), outline_thickness=2, outline_color=(0, 0, 0)):
    """Draw one static, outlined caption near the bottom of every frame of a clip.

    Args:
        clip: MoviePy clip exposing ``size`` and ``fl``.
        caption: Text drawn on each frame, centred horizontally.
        font_path: TrueType font file. NOTE(review): the default is an
            absolute path on one machine; pass an explicit path elsewhere.
        fontsize: Font size handed to ``ImageFont.truetype``.
        fontcolor: Fill colour of the caption text.
        outline_thickness: Outline radius in pixels (0 disables the outline).
        outline_color: Colour of the outline.

    Returns:
        The clip produced by ``clip.fl`` with the caption burnt into its frames.
    """
    font = ImageFont.truetype(font_path, fontsize)
    width, height = clip.size

    def measure(draw):
        # ImageDraw.textsize was removed in Pillow 10; textbbox is its
        # replacement. Fall back to textsize only on very old Pillow versions.
        if hasattr(draw, 'textbbox'):
            left, top, right, bottom = draw.textbbox((0, 0), caption, font=font)
            return right - left, bottom - top
        return draw.textsize(caption, font=font)

    def add_caption(get_frame, t):
        frame_pil = Image.fromarray(get_frame(t))
        draw = ImageDraw.Draw(frame_pil)

        text_width, text_height = measure(draw)
        text_x = (width - text_width) // 2
        text_y = height - text_height - 50  # 50 pixels from the bottom

        # Outline: draw the text at every offset around the final position,
        # then draw the fill on top.
        for dx in range(-outline_thickness, outline_thickness + 1):
            for dy in range(-outline_thickness, outline_thickness + 1):
                if dx != 0 or dy != 0:
                    draw.text((text_x + dx, text_y + dy), caption, font=font, fill=outline_color)

        draw.text((text_x, text_y), caption, font=font, fill=fontcolor)
        return np.array(frame_pil)

    # The caption is drawn on the colour frames only. Passing apply_to=['mask']
    # would also run the drawing code on the transparency mask, which is not
    # an RGB image.
    return clip.fl(add_caption)

def create_video_from_images_and_audio(images_dir, speeches_dir, zoom_factor=1.2):
    """Creates video using images and audios.

    Images and audio files are paired by their position in the sorted
    directory listings; each pair becomes one zoomed, captioned segment whose
    length equals the audio duration. The caption is the transcription of the
    audio file.

    Args:
        images_dir: path to images folder, example 'outputs/images'
        speeches_dir: path to speeches folder, example 'outputs/speeches'
        zoom_factor: final zoom level forwarded to ``apply_zoom_in_effect``.

    Returns:
        Path of the written video file.

    Raises:
        ValueError: if either directory contains no files.
    """
    output_path = "outputs/final_video/final_video.mp4"
    images_paths = sorted(os.listdir(images_dir))
    audio_paths = sorted(os.listdir(speeches_dir))

    # Fail early with a clear message instead of an obscure error from
    # concatenating an empty clip list.
    if not images_paths or not audio_paths:
        raise ValueError(
            f"No image/audio pairs found in {images_dir!r} and {speeches_dir!r}"
        )

    # The writer does not create missing parent directories.
    os.makedirs(os.path.dirname(output_path), exist_ok=True)

    clips = []
    audio_clips = []
    try:
        # zip stops at the shorter listing, i.e. min(len(images), len(audio)) pairs.
        for image_name, audio_name in zip(images_paths, audio_paths):
            audio_file = os.path.join(speeches_dir, audio_name)

            img_clip = ImageClip(os.path.join(images_dir, image_name))
            audioclip = AudioFileClip(audio_file)
            audio_clips.append(audioclip)

            # The still image lasts exactly as long as its narration.
            videoclip = img_clip.set_duration(audioclip.duration)
            zoomed_clip = apply_zoom_in_effect(videoclip, zoom_factor)

            # Transcribe the audio to obtain the caption text.
            with open(audio_file, "rb") as file:
                transcription = client.audio.transcriptions.create(
                    file=(audio_name, file.read()),
                    model="whisper-large-v3",
                    response_format="verbose_json",
                )
            caption = transcription.text

            captioned_clip = add_captions_to_clip(zoomed_clip, caption)
            clips.append(captioned_clip.set_audio(audioclip))

        final_clip = concatenate_videoclips(clips)
        final_clip.write_videofile(output_path, codec='libx264', fps=24)
    finally:
        # Release the audio readers whether or not rendering succeeded.
        for audioclip in audio_clips:
            audioclip.close()

    return output_path

def apply_zoom_in_effect(clip, zoom_factor=1.2):
    """Return a clip that zooms linearly from 1x to ``zoom_factor`` over its duration.

    Each frame is enlarged with ``cv2.resize`` and then cropped around the
    centre back to the clip's original size. The transform is registered with
    ``clip.fl`` and also applied to the clip's mask.
    """
    base_w, base_h = clip.size
    total_time = clip.duration

    def _zoomed_frame(get_frame, t):
        source = get_frame(t)
        # Scale grows from 1 at t=0 to zoom_factor at t=total_time.
        scale = 1 + (zoom_factor - 1) * (t / total_time)
        scaled_w = int(base_w * scale)
        scaled_h = int(base_h * scale)
        enlarged = cv2.resize(source, (scaled_w, scaled_h))

        # Centre crop back to the original frame size.
        left = (scaled_w - base_w) // 2
        top = (scaled_h - base_h) // 2
        return enlarged[top:top + base_h, left:left + base_w]

    return clip.fl(_zoomed_frame, apply_to=['mask'])

# Example usage
# Both values are directory paths (not lists of files); they are passed as
# `images_dir` and `speeches_dir` below.
image_paths = "outputs/images"
audio_paths = "outputs/speeches"

# Runs the whole pipeline when the cell executes and prints the returned path.
video_path = create_video_from_images_and_audio(image_paths, audio_paths)
print(f"Video created at: {video_path}")


Moviepy - Building video outputs/final_video/final_video.mp4.
MoviePy - Writing audio in final_videoTEMP_MPY_wvf_snd.mp3


                                                                    

MoviePy - Done.
Moviepy - Writing video outputs/final_video/final_video.mp4



                                                              

Moviepy - Done !
Moviepy - video ready outputs/final_video/final_video.mp4
Video created at: outputs/final_video/final_video.mp4


In [15]:
import os
import cv2
from moviepy.editor import ImageClip, AudioFileClip, concatenate_videoclips
from PIL import Image, ImageDraw, ImageFont
import numpy as np
from groq import Groq
from moviepy.editor import VideoFileClip

client = Groq()

# Schema with the two directory paths used for video generation.
# NOTE(review): `BaseModel` and `Field` are not imported in this cell; they
# come from the earlier `langchain_core.pydantic_v1` import cell. This is a
# repeat of a definition from a previous cell and is not referenced by the
# code visible in this cell.
class VideoGeneration(BaseModel):
    images_dir: str = Field(description='Path to images directory, such as "outputs/images"')
    speeches_dir: str = Field(description='Path to speeches directory, such as "outputs/speeches"')

def split_text_into_chunks(text, chunk_size):
    """Split ``text`` on whitespace and regroup it into chunks of ``chunk_size`` words.

    Returns a list of strings; the last chunk may hold fewer words. Empty or
    whitespace-only text yields an empty list.
    """
    tokens = text.split()
    pieces = []
    for start in range(0, len(tokens), chunk_size):
        pieces.append(' '.join(tokens[start:start + chunk_size]))
    return pieces

def add_text_to_video(input_video, output_video, text, duration=1, fontsize=40, fontcolor=(255, 255, 255),
                      outline_thickness=2, outline_color=(0, 0, 0),
                      font_path='C:\\Users\\prudh\\Downloads\\montserrat\\Montserrat\\Montserrat-Bold.ttf'):
    """Re-encode a video with the caption shown a few words at a time.

    ``text`` is split into 3-word chunks; each chunk is shown for ``duration``
    seconds, centred horizontally near the bottom of the frame with an
    outline. A chunk wider than the frame is split into two lines. Frames
    after the last chunk are copied unchanged. The output is written with the
    'mp4v' codec and carries no audio track.

    Args:
        input_video: Path of the video to read.
        output_video: Path of the captioned video to write.
        text: Full caption text.
        duration: Seconds each chunk stays on screen (int or float).
        fontsize: Font size handed to ``ImageFont.truetype``.
        fontcolor: Fill colour of the text (RGB).
        outline_thickness: Outline radius in pixels.
        outline_color: Colour of the outline (RGB).
        font_path: TrueType font file. NOTE(review): the default is an
            absolute path on one machine; pass an explicit path elsewhere.
    """
    chunks = split_text_into_chunks(text, 3)  # Adjust chunk size as needed

    # Load the font before opening any video handle so a bad font path cannot
    # leave a capture/writer open.
    font = ImageFont.truetype(font_path, fontsize)

    def measure(draw, content):
        # ImageDraw.textsize was removed in Pillow 10; textbbox is its
        # replacement. Fall back to textsize only on very old Pillow versions.
        if hasattr(draw, 'textbbox'):
            left, top, right, bottom = draw.textbbox((0, 0), content, font=font)
            return right - left, bottom - top
        return draw.textsize(content, font=font)

    def draw_outlined(draw, placements):
        # placements: list of (x, y, content). All outlines are drawn before
        # any fill so an outline never paints over neighbouring text.
        for dx in range(-outline_thickness, outline_thickness + 1):
            for dy in range(-outline_thickness, outline_thickness + 1):
                if dx != 0 or dy != 0:
                    for x, y, content in placements:
                        draw.text((x + dx, y + dy), content, font=font, fill=outline_color)
        for x, y, content in placements:
            draw.text((x, y), content, font=font, fill=fontcolor)

    cap = cv2.VideoCapture(input_video)
    out = None
    try:
        fourcc = cv2.VideoWriter_fourcc(*'mp4v')
        fps = int(cap.get(cv2.CAP_PROP_FPS))
        width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        out = cv2.VideoWriter(output_video, fourcc, fps, (width, height))

        # int() keeps the chunk index an integer when `duration` is a float;
        # max(1, ...) avoids a ZeroDivisionError when the reported fps is 0.
        chunk_duration_frames = max(1, int(duration * fps))

        # Zero-based count of frames read so far. (CAP_PROP_POS_FRAMES read
        # after cap.read() points at the *next* frame, which shifted every
        # chunk boundary by one frame.)
        frame_index = 0

        while cap.isOpened():
            ret, frame = cap.read()
            if not ret:
                break

            chunk_index = frame_index // chunk_duration_frames
            frame_index += 1

            if chunk_index < len(chunks):
                frame_pil = Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
                draw = ImageDraw.Draw(frame_pil)
                chunk = chunks[chunk_index]
                text_width, _ = measure(draw, chunk)

                if text_width > width:
                    # Too wide for one line: split the words roughly in half.
                    words = chunk.split()
                    half = len(words) // 2
                    line1 = ' '.join(words[:half])
                    line2 = ' '.join(words[half:])

                    line1_width, line1_height = measure(draw, line1)
                    line2_width, _ = measure(draw, line2)
                    text_y = height - 250 - line1_height  # Adjust vertical position for two lines

                    draw_outlined(draw, [
                        ((width - line1_width) // 2, text_y, line1),
                        ((width - line2_width) // 2, text_y + line1_height, line2),
                    ])
                else:
                    text_x = (width - text_width) // 2
                    text_y = height - 300  # Position text at the bottom
                    draw_outlined(draw, [(text_x, text_y, chunk)])

                frame = cv2.cvtColor(np.array(frame_pil), cv2.COLOR_RGB2BGR)

            out.write(frame)
    finally:
        # Always release the handles, even if reading or drawing failed.
        cap.release()
        if out is not None:
            out.release()

    cv2.destroyAllWindows()

def apply_zoom_in_effect(clip, zoom_factor=1.2):
    """Return a clip that zooms linearly from 1x to ``zoom_factor`` over its duration.

    Each frame is enlarged with ``cv2.resize`` and then cropped around the
    centre back to the clip's original size. The transform is registered with
    ``clip.fl`` and also applied to the clip's mask.
    """
    base_w, base_h = clip.size
    total_time = clip.duration

    def _zoomed_frame(get_frame, t):
        source = get_frame(t)
        # Scale grows from 1 at t=0 to zoom_factor at t=total_time.
        scale = 1 + (zoom_factor - 1) * (t / total_time)
        scaled_w = int(base_w * scale)
        scaled_h = int(base_h * scale)
        enlarged = cv2.resize(source, (scaled_w, scaled_h))

        # Centre crop back to the original frame size.
        left = (scaled_w - base_w) // 2
        top = (scaled_h - base_h) // 2
        return enlarged[top:top + base_h, left:left + base_w]

    return clip.fl(_zoomed_frame, apply_to=['mask'])

def create_video_from_images_and_audio(images_dir, speeches_dir, zoom_factor=1.2):
    """Build the final narrated video from a folder of images and a folder of speeches.

    Images and audio files are paired by their position in the sorted
    directory listings. For each pair a zoomed clip is rendered to a temporary
    file, captioned with the transcription of the audio via
    ``add_text_to_video``, reloaded and given the audio track. All segments
    are then concatenated and written to the output file.

    Args:
        images_dir: Directory containing the images.
        speeches_dir: Directory containing the speech audio files.
        zoom_factor: Final zoom level forwarded to ``apply_zoom_in_effect``.

    Returns:
        Path of the written video file.

    Raises:
        ValueError: if either directory contains no files.
    """
    output_path = "outputs/final_video/final_video.mp4"
    images_paths = sorted(os.listdir(images_dir))
    audio_paths = sorted(os.listdir(speeches_dir))

    # Fail early with a clear message instead of an obscure error from
    # concatenating an empty clip list.
    if not images_paths or not audio_paths:
        raise ValueError(
            f"No image/audio pairs found in {images_dir!r} and {speeches_dir!r}"
        )

    # The writer does not create missing parent directories.
    os.makedirs(os.path.dirname(output_path), exist_ok=True)

    clips = []
    open_clips = []   # clips holding file readers that must be closed
    temp_files = []   # intermediate renders to delete afterwards
    try:
        # zip stops at the shorter listing, i.e. min(len(images), len(audio)) pairs.
        for i, (image_name, audio_name) in enumerate(zip(images_paths, audio_paths)):
            audio_file = os.path.join(speeches_dir, audio_name)

            img_clip = ImageClip(os.path.join(images_dir, image_name))
            audioclip = AudioFileClip(audio_file)
            open_clips.append(audioclip)

            # The still image lasts exactly as long as its narration.
            videoclip = img_clip.set_duration(audioclip.duration)
            zoomed_clip = apply_zoom_in_effect(videoclip, zoom_factor)

            # Transcribe the audio to obtain the caption text.
            with open(audio_file, "rb") as file:
                transcription = client.audio.transcriptions.create(
                    file=(audio_name, file.read()),
                    model="whisper-large-v3",
                    response_format="verbose_json",
                )
            caption = transcription.text

            # Register each temp path before writing so a failed write is
            # still cleaned up.
            temp_video_path = f"temp_zoomed_{i}.mp4"
            temp_files.append(temp_video_path)
            zoomed_clip.write_videofile(temp_video_path, codec='libx264', fps=24)

            final_video_path = f"temp_captioned_{i}.mp4"
            temp_files.append(final_video_path)
            add_text_to_video(temp_video_path, final_video_path, caption, duration=1)

            captioned_clip = VideoFileClip(final_video_path)
            open_clips.append(captioned_clip)
            clips.append(captioned_clip.set_audio(audioclip))

        final_clip = concatenate_videoclips(clips)
        final_clip.write_videofile(output_path, codec='libx264', fps=24)
    finally:
        # Close readers first: open files cannot be deleted on Windows.
        for clip in open_clips:
            clip.close()
        for temp_file in temp_files:
            if not os.path.exists(temp_file):
                continue
            try:
                os.remove(temp_file)
            except OSError as e:
                print(f"Error removing file {temp_file}: {e}")

    return output_path

# Example usage
# Both values are directory paths (not lists of files); they are passed as
# `images_dir` and `speeches_dir` below.
image_paths = "outputs/images"
audio_paths = "outputs/speeches"

# Runs the whole pipeline when the cell executes and prints the returned path.
video_path = create_video_from_images_and_audio(image_paths, audio_paths)
print(f"Video created at: {video_path}")

Moviepy - Building video temp_zoomed_0.mp4.
Moviepy - Writing video temp_zoomed_0.mp4



                                                              

Moviepy - Done !
Moviepy - video ready temp_zoomed_0.mp4
Moviepy - Building video temp_zoomed_1.mp4.
Moviepy - Writing video temp_zoomed_1.mp4



                                                              

Moviepy - Done !
Moviepy - video ready temp_zoomed_1.mp4
Moviepy - Building video temp_zoomed_2.mp4.
Moviepy - Writing video temp_zoomed_2.mp4



                                                            

Moviepy - Done !
Moviepy - video ready temp_zoomed_2.mp4
Moviepy - Building video temp_zoomed_3.mp4.
Moviepy - Writing video temp_zoomed_3.mp4



                                                            

Moviepy - Done !
Moviepy - video ready temp_zoomed_3.mp4
Moviepy - Building video outputs/final_video/final_video.mp4.
MoviePy - Writing audio in final_videoTEMP_MPY_wvf_snd.mp3


                                                                    

MoviePy - Done.
Moviepy - Writing video outputs/final_video/final_video.mp4



                                                               

Moviepy - Done !
Moviepy - video ready outputs/final_video/final_video.mp4
Video created at: outputs/final_video/final_video.mp4


In [2]:
import os
import cv2
from moviepy.editor import ImageClip, AudioFileClip, concatenate_videoclips, VideoFileClip
from PIL import Image, ImageDraw, ImageFont
import numpy as np
from groq import Groq

client = Groq()

# class VideoGeneration(BaseModel):
#     images_dir: str = Field(description='Path to images directory, such as "outputs/images"')
#     speeches_dir: str = Field(description='Path to speeches directory, such as "outputs/speeches"')

def split_text_into_chunks(text, chunk_size):
    """Split ``text`` on whitespace and regroup it into chunks of ``chunk_size`` words.

    Returns a list of strings; the last chunk may hold fewer words. Empty or
    whitespace-only text yields an empty list.
    """
    tokens = text.split()
    pieces = []
    for start in range(0, len(tokens), chunk_size):
        pieces.append(' '.join(tokens[start:start + chunk_size]))
    return pieces

def add_text_to_video(input_video, output_video, text, duration=1, fontsize=40, fontcolor=(255, 255, 255),
                      outline_thickness=2, outline_color=(0, 0, 0),
                      font_path='C:\\Users\\prudh\\Downloads\\montserrat\\Montserrat\\Montserrat-Bold.ttf'):
    """Re-encode a video with the caption shown a few words at a time.

    ``text`` is split into 3-word chunks; each chunk is shown for ``duration``
    seconds, centred horizontally near the bottom of the frame with an
    outline. A chunk wider than the frame is split into two lines. Frames
    after the last chunk are copied unchanged. The output is written with the
    'mp4v' codec and carries no audio track.

    Args:
        input_video: Path of the video to read.
        output_video: Path of the captioned video to write.
        text: Full caption text.
        duration: Seconds each chunk stays on screen (int or float).
        fontsize: Font size handed to ``ImageFont.truetype``.
        fontcolor: Fill colour of the text (RGB).
        outline_thickness: Outline radius in pixels.
        outline_color: Colour of the outline (RGB).
        font_path: TrueType font file. NOTE(review): the default is an
            absolute path on one machine; pass an explicit path elsewhere.
    """
    chunks = split_text_into_chunks(text, 3)  # Adjust chunk size as needed

    # Load the font before opening any video handle so a bad font path cannot
    # leave a capture/writer open.
    font = ImageFont.truetype(font_path, fontsize)

    def measure(draw, content):
        # ImageDraw.textsize was removed in Pillow 10; textbbox is its
        # replacement. Fall back to textsize only on very old Pillow versions.
        if hasattr(draw, 'textbbox'):
            left, top, right, bottom = draw.textbbox((0, 0), content, font=font)
            return right - left, bottom - top
        return draw.textsize(content, font=font)

    def draw_outlined(draw, placements):
        # placements: list of (x, y, content). All outlines are drawn before
        # any fill so an outline never paints over neighbouring text.
        for dx in range(-outline_thickness, outline_thickness + 1):
            for dy in range(-outline_thickness, outline_thickness + 1):
                if dx != 0 or dy != 0:
                    for x, y, content in placements:
                        draw.text((x + dx, y + dy), content, font=font, fill=outline_color)
        for x, y, content in placements:
            draw.text((x, y), content, font=font, fill=fontcolor)

    cap = cv2.VideoCapture(input_video)
    out = None
    try:
        fourcc = cv2.VideoWriter_fourcc(*'mp4v')
        fps = int(cap.get(cv2.CAP_PROP_FPS))
        width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        out = cv2.VideoWriter(output_video, fourcc, fps, (width, height))

        # int() keeps the chunk index an integer when `duration` is a float;
        # max(1, ...) avoids a ZeroDivisionError when the reported fps is 0.
        chunk_duration_frames = max(1, int(duration * fps))

        # Zero-based count of frames read so far. (CAP_PROP_POS_FRAMES read
        # after cap.read() points at the *next* frame, which shifted every
        # chunk boundary by one frame.)
        frame_index = 0

        while cap.isOpened():
            ret, frame = cap.read()
            if not ret:
                break

            chunk_index = frame_index // chunk_duration_frames
            frame_index += 1

            if chunk_index < len(chunks):
                frame_pil = Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
                draw = ImageDraw.Draw(frame_pil)
                chunk = chunks[chunk_index]
                text_width, _ = measure(draw, chunk)

                if text_width > width:
                    # Too wide for one line: split the words roughly in half.
                    words = chunk.split()
                    half = len(words) // 2
                    line1 = ' '.join(words[:half])
                    line2 = ' '.join(words[half:])

                    line1_width, line1_height = measure(draw, line1)
                    line2_width, _ = measure(draw, line2)
                    text_y = height - 250 - line1_height  # Adjust vertical position for two lines

                    draw_outlined(draw, [
                        ((width - line1_width) // 2, text_y, line1),
                        ((width - line2_width) // 2, text_y + line1_height, line2),
                    ])
                else:
                    text_x = (width - text_width) // 2
                    text_y = height - 300  # Position text at the bottom
                    draw_outlined(draw, [(text_x, text_y, chunk)])

                frame = cv2.cvtColor(np.array(frame_pil), cv2.COLOR_RGB2BGR)

            out.write(frame)
    finally:
        # Always release the handles, even if reading or drawing failed.
        cap.release()
        if out is not None:
            out.release()

    cv2.destroyAllWindows()

def apply_zoom_in_effect(clip, zoom_factor=1.2):
    """Return a clip that zooms linearly from 1x to ``zoom_factor`` over its duration.

    Each frame is enlarged with ``cv2.resize`` and then cropped around the
    centre back to the clip's original size. The transform is registered with
    ``clip.fl`` and also applied to the clip's mask.
    """
    base_w, base_h = clip.size
    total_time = clip.duration

    def _zoomed_frame(get_frame, t):
        source = get_frame(t)
        # Scale grows from 1 at t=0 to zoom_factor at t=total_time.
        scale = 1 + (zoom_factor - 1) * (t / total_time)
        scaled_w = int(base_w * scale)
        scaled_h = int(base_h * scale)
        enlarged = cv2.resize(source, (scaled_w, scaled_h))

        # Centre crop back to the original frame size.
        left = (scaled_w - base_w) // 2
        top = (scaled_h - base_h) // 2
        return enlarged[top:top + base_h, left:left + base_w]

    return clip.fl(_zoomed_frame, apply_to=['mask'])

def create_video_from_images_and_audio(images_dir, speeches_dir, zoom_factor=1.2):
    """Build the final narrated video from a folder of images and a folder of speeches.

    Images and audio files are paired by their position in the sorted
    directory listings. For each pair a zoomed clip is rendered to a temporary
    file, captioned with the transcription of the audio via
    ``add_text_to_video``, reloaded and given the audio track. All segments
    are then concatenated and written to the output file. Temporary files are
    removed afterwards, including when a step fails.

    Args:
        images_dir: Directory containing the images.
        speeches_dir: Directory containing the speech audio files.
        zoom_factor: Final zoom level forwarded to ``apply_zoom_in_effect``.

    Returns:
        Path of the written video file.

    Raises:
        ValueError: if either directory contains no files.
    """
    output_path = "outputs/final_video/final_video.mp4"
    images_paths = sorted(os.listdir(images_dir))
    audio_paths = sorted(os.listdir(speeches_dir))

    # Fail early with a clear message instead of an obscure error from
    # concatenating an empty clip list.
    if not images_paths or not audio_paths:
        raise ValueError(
            f"No image/audio pairs found in {images_dir!r} and {speeches_dir!r}"
        )

    # The writer does not create missing parent directories.
    os.makedirs(os.path.dirname(output_path), exist_ok=True)

    clips = []
    open_clips = []   # clips holding file readers that must be closed
    temp_files = []   # intermediate renders to delete afterwards
    try:
        # zip stops at the shorter listing, i.e. min(len(images), len(audio)) pairs.
        for i, (image_name, audio_name) in enumerate(zip(images_paths, audio_paths)):
            audio_file = os.path.join(speeches_dir, audio_name)

            img_clip = ImageClip(os.path.join(images_dir, image_name))
            audioclip = AudioFileClip(audio_file)
            open_clips.append(audioclip)

            # The still image lasts exactly as long as its narration.
            videoclip = img_clip.set_duration(audioclip.duration)
            zoomed_clip = apply_zoom_in_effect(videoclip, zoom_factor)

            # Transcribe the audio to obtain the caption text.
            with open(audio_file, "rb") as file:
                transcription = client.audio.transcriptions.create(
                    file=(audio_name, file.read()),
                    model="whisper-large-v3",
                    response_format="verbose_json",
                )
            caption = transcription.text

            # Register each temp path before writing so a failed write is
            # still cleaned up.
            temp_video_path = f"temp_zoomed_{i}.mp4"
            temp_files.append(temp_video_path)
            zoomed_clip.write_videofile(temp_video_path, codec='libx264', fps=24)

            final_video_path = f"temp_captioned_{i}.mp4"
            temp_files.append(final_video_path)
            add_text_to_video(temp_video_path, final_video_path, caption, duration=1)

            captioned_clip = VideoFileClip(final_video_path)
            open_clips.append(captioned_clip)
            clips.append(captioned_clip.set_audio(audioclip))

        final_clip = concatenate_videoclips(clips)
        final_clip.write_videofile(output_path, codec='libx264', fps=24)
    finally:
        # Close all video files properly (open files cannot be deleted on Windows).
        for clip in open_clips:
            clip.close()

        # Remove all temporary files, even when a step above raised.
        for temp_file in temp_files:
            if not os.path.exists(temp_file):
                continue
            try:
                os.remove(temp_file)
            except OSError as e:
                print(f"Error removing file {temp_file}: {e}")

    return output_path

# Example usage
# Both values are directory paths (not lists of files); they are passed as
# `images_dir` and `speeches_dir` below.
image_paths = "outputs/images"
audio_paths = "outputs/speeches"

# Runs the whole pipeline when the cell executes and prints the returned path.
video_path = create_video_from_images_and_audio(image_paths, audio_paths)
print(f"Video created at: {video_path}")


Moviepy - Building video temp_zoomed_0.mp4.
Moviepy - Writing video temp_zoomed_0.mp4



                                                              

Moviepy - Done !
Moviepy - video ready temp_zoomed_0.mp4
Moviepy - Building video temp_zoomed_1.mp4.
Moviepy - Writing video temp_zoomed_1.mp4



                                                              

Moviepy - Done !
Moviepy - video ready temp_zoomed_1.mp4
Moviepy - Building video temp_zoomed_2.mp4.
Moviepy - Writing video temp_zoomed_2.mp4



                                                            

Moviepy - Done !
Moviepy - video ready temp_zoomed_2.mp4
Moviepy - Building video temp_zoomed_3.mp4.
Moviepy - Writing video temp_zoomed_3.mp4



                                                            

Moviepy - Done !
Moviepy - video ready temp_zoomed_3.mp4
Moviepy - Building video outputs/final_video/final_video.mp4.
MoviePy - Writing audio in final_videoTEMP_MPY_wvf_snd.mp3


                                                                    

MoviePy - Done.
Moviepy - Writing video outputs/final_video/final_video.mp4



                                                               

Moviepy - Done !
Moviepy - video ready outputs/final_video/final_video.mp4
Video created at: outputs/final_video/final_video.mp4


In [5]:
import os
import cv2
from moviepy.editor import ImageClip, AudioFileClip, concatenate_videoclips, VideoFileClip
from PIL import Image, ImageDraw, ImageFont
import numpy as np
from groq import Groq

client = Groq()

# class VideoGeneration(BaseModel):
#     images_dir: str = Field(description='Path to images directory, such as "outputs/images"')
#     speeches_dir: str = Field(description='Path to speeches directory, such as "outputs/speeches"')

def split_text_into_chunks(text, chunk_size):
    """Split ``text`` on whitespace and regroup it into chunks of ``chunk_size`` words.

    Returns a list of strings; the last chunk may hold fewer words. Empty or
    whitespace-only text yields an empty list.
    """
    tokens = text.split()
    pieces = []
    for start in range(0, len(tokens), chunk_size):
        pieces.append(' '.join(tokens[start:start + chunk_size]))
    return pieces

def add_text_to_video(input_video, output_video, text, duration=1, fontsize=40, fontcolor=(255, 255, 255),
                      outline_thickness=2, outline_color=(0, 0, 0), delay_between_chunks=0.2,
                      font_path='C:\\Users\\prudh\\Downloads\\montserrat\\Montserrat\\Montserrat-Bold.ttf'):
    """Re-encode a video with the caption shown a few words at a time.

    ``text`` is split into 3-word chunks. Each chunk is shown for ``duration``
    seconds, followed by ``delay_between_chunks`` seconds without text. Text
    is centred horizontally near the bottom of the frame with an outline; a
    chunk wider than the frame is split into two lines. Frames after the last
    chunk are copied unchanged. The output is written with the 'mp4v' codec
    and carries no audio track.

    Args:
        input_video: Path of the video to read.
        output_video: Path of the captioned video to write.
        text: Full caption text.
        duration: Seconds each chunk stays on screen (int or float).
        fontsize: Font size handed to ``ImageFont.truetype``.
        fontcolor: Fill colour of the text (RGB).
        outline_thickness: Outline radius in pixels.
        outline_color: Colour of the outline (RGB).
        delay_between_chunks: Seconds without text between two chunks.
        font_path: TrueType font file. NOTE(review): the default is an
            absolute path on one machine; pass an explicit path elsewhere.
    """
    chunks = split_text_into_chunks(text, 3)  # Adjust chunk size as needed

    # Load the font before opening any video handle so a bad font path cannot
    # leave a capture/writer open.
    font = ImageFont.truetype(font_path, fontsize)

    def measure(draw, content):
        # ImageDraw.textsize was removed in Pillow 10; textbbox is its
        # replacement. Fall back to textsize only on very old Pillow versions.
        if hasattr(draw, 'textbbox'):
            left, top, right, bottom = draw.textbbox((0, 0), content, font=font)
            return right - left, bottom - top
        return draw.textsize(content, font=font)

    def draw_outlined(draw, placements):
        # placements: list of (x, y, content). All outlines are drawn before
        # any fill so an outline never paints over neighbouring text.
        for dx in range(-outline_thickness, outline_thickness + 1):
            for dy in range(-outline_thickness, outline_thickness + 1):
                if dx != 0 or dy != 0:
                    for x, y, content in placements:
                        draw.text((x + dx, y + dy), content, font=font, fill=outline_color)
        for x, y, content in placements:
            draw.text((x, y), content, font=font, fill=fontcolor)

    cap = cv2.VideoCapture(input_video)
    out = None
    try:
        fourcc = cv2.VideoWriter_fourcc(*'mp4v')
        fps = int(cap.get(cv2.CAP_PROP_FPS))
        width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        out = cv2.VideoWriter(output_video, fourcc, fps, (width, height))

        # int() keeps the chunk index an integer when `duration` is a float;
        # max(1, ...) avoids a zero-length cycle when the reported fps is 0.
        chunk_duration_frames = max(1, int(duration * fps))
        delay_frames = max(0, int(delay_between_chunks * fps))
        # One cycle = frames showing a chunk + frames of pause after it.
        cycle_frames = chunk_duration_frames + delay_frames

        current_frame = 0

        while cap.isOpened():
            ret, frame = cap.read()
            if not ret:
                break

            chunk_index = current_frame // cycle_frames
            in_text_phase = current_frame % cycle_frames < chunk_duration_frames

            # Convert to PIL only for frames that actually receive text.
            if in_text_phase and chunk_index < len(chunks):
                frame_pil = Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
                draw = ImageDraw.Draw(frame_pil)
                chunk = chunks[chunk_index]
                text_width, _ = measure(draw, chunk)

                if text_width > width:
                    # Too wide for one line: split the words roughly in half.
                    words = chunk.split()
                    half = len(words) // 2
                    line1 = ' '.join(words[:half])
                    line2 = ' '.join(words[half:])

                    line1_width, line1_height = measure(draw, line1)
                    line2_width, _ = measure(draw, line2)
                    text_y = height - 250 - line1_height  # Adjust vertical position for two lines

                    draw_outlined(draw, [
                        ((width - line1_width) // 2, text_y, line1),
                        ((width - line2_width) // 2, text_y + line1_height, line2),
                    ])
                else:
                    text_x = (width - text_width) // 2
                    text_y = height - 300  # Position text at the bottom
                    draw_outlined(draw, [(text_x, text_y, chunk)])

                frame = cv2.cvtColor(np.array(frame_pil), cv2.COLOR_RGB2BGR)

            out.write(frame)
            current_frame += 1
    finally:
        # Always release the handles, even if reading or drawing failed.
        cap.release()
        if out is not None:
            out.release()

    cv2.destroyAllWindows()

def apply_zoom_in_effect(clip, zoom_factor=1.2):
    """Attach a progressive centre zoom to ``clip``.

    The per-frame scale rises linearly with time: it is 1 at ``t == 0`` and
    reaches ``zoom_factor`` when ``t`` equals the clip duration. Each frame is
    resized with ``cv2.resize`` and then sliced around its centre using the
    clip's original width and height.

    Args:
        clip: Object exposing ``size`` (width, height), ``duration`` and
            ``fl`` (a MoviePy-style clip).
        zoom_factor: Scale applied at the end of the clip.

    Returns:
        Whatever ``clip.fl`` returns for the zoom filter; the filter is also
        requested for the clip's mask via ``apply_to=['mask']``.
    """
    base_w, base_h = clip.size
    total_time = clip.duration

    def _zoomed_frame(get_frame, t):
        source = get_frame(t)

        # Linear interpolation between 1 and zoom_factor over the duration.
        scale = 1 + (zoom_factor - 1) * (t / total_time)
        scaled_w = int(base_w * scale)
        scaled_h = int(base_h * scale)
        enlarged = cv2.resize(source, (scaled_w, scaled_h))

        # Offsets of the window that is cut out of the enlarged frame.
        left = (scaled_w - base_w) // 2
        top = (scaled_h - base_h) // 2
        return enlarged[top:top + base_h, left:left + base_w]

    return clip.fl(_zoomed_frame, apply_to=['mask'])

def _natural_sort_key(name):
    """Sort key ordering embedded numbers numerically ("x_2" before "x_10")."""
    # re.split with a capturing group alternates text / digit runs, so odd
    # positions are always digit runs and like is only compared with like.
    parts = re.split(r'(\d+)', name)
    return [int(part) if idx % 2 else part for idx, part in enumerate(parts)], name


def _list_media_files(directory):
    """Return the regular, non-hidden file names of ``directory`` in natural order."""
    names = [
        name for name in os.listdir(directory)
        if not name.startswith('.') and os.path.isfile(os.path.join(directory, name))
    ]
    return sorted(names, key=_natural_sort_key)


def create_video_from_images_and_audio(images_dir, speeches_dir, zoom_factor=1.2,
                                       output_path="outputs/final_video/final_video.mp4"):
    """Build one captioned video from paired image and narration files.

    Files of both directories are paired by position after a natural sort
    (so ``*_2`` precedes ``*_10``); hidden entries and sub-directories are
    ignored, and surplus files on either side are dropped. For every pair the
    image is shown for the length of its audio with a zoom-in effect, the
    audio is transcribed through ``client`` and burned in as captions via
    ``add_text_to_video``, and the resulting segments are concatenated.

    Args:
        images_dir: Directory holding the still images.
        speeches_dir: Directory holding the narration audio files.
        zoom_factor: Final scale passed to ``apply_zoom_in_effect``.
        output_path: Destination of the final video; its parent directory is
            created when missing.

    Returns:
        ``output_path``.

    Raises:
        ValueError: If there is no image/audio pair to process.
    """
    images_paths = _list_media_files(images_dir)
    audio_paths = _list_media_files(speeches_dir)
    pair_count = min(len(images_paths), len(audio_paths))
    if pair_count == 0:
        raise ValueError(
            f"No image/audio pairs found in {images_dir!r} and {speeches_dir!r}"
        )

    output_dir = os.path.dirname(output_path)
    if output_dir:
        os.makedirs(output_dir, exist_ok=True)

    clips = []
    open_clips = []   # every clip that holds a file handle / reader process
    temp_files = []

    try:
        for i in range(pair_count):
            image_file = os.path.join(images_dir, images_paths[i])
            audio_file = os.path.join(speeches_dir, audio_paths[i])

            audioclip = AudioFileClip(audio_file)
            open_clips.append(audioclip)
            videoclip = ImageClip(image_file).set_duration(audioclip.duration)
            zoomed_clip = apply_zoom_in_effect(videoclip, zoom_factor)

            with open(audio_file, "rb") as file:
                transcription = client.audio.transcriptions.create(
                    file=(audio_paths[i], file.read()),
                    model="whisper-large-v3",
                    response_format="verbose_json",
                )
            caption = transcription.text

            # Register temp paths before writing so that a partially written
            # file is still cleaned up if the write fails.
            temp_video_path = f"temp_zoomed_{i}.mp4"
            temp_files.append(temp_video_path)
            zoomed_clip.write_videofile(temp_video_path, codec='libx264', fps=24)

            final_video_path = f"temp_captioned_{i}.mp4"
            temp_files.append(final_video_path)
            add_text_to_video(temp_video_path, final_video_path, caption, duration=1)

            captioned_clip = VideoFileClip(final_video_path)
            open_clips.append(captioned_clip)
            clips.append(captioned_clip.set_audio(audioclip))

        final_clip = concatenate_videoclips(clips)
        final_clip.write_videofile(output_path, codec='libx264', fps=24)
    finally:
        # Close readers first: open handles can prevent deleting the temp files.
        for clip in open_clips:
            try:
                clip.close()
            except Exception as e:
                print(f"Error closing clip: {e}")

        # Remove all temporary files, whether or not the build succeeded.
        for temp_file in temp_files:
            try:
                os.remove(temp_file)
            except FileNotFoundError:
                pass  # never written because an earlier step failed
            except Exception as e:
                print(f"Error removing file {temp_file}: {e}")

    return output_path

# Example usage: stitch the generated images and narration clips into the final video.
image_paths, audio_paths = "outputs/images", "outputs/speeches"

video_path = create_video_from_images_and_audio(
    images_dir=image_paths,
    speeches_dir=audio_paths,
)
print(f"Video created at: {video_path}")

Moviepy - Building video temp_zoomed_0.mp4.
Moviepy - Writing video temp_zoomed_0.mp4



                                                              

Moviepy - Done !
Moviepy - video ready temp_zoomed_0.mp4
Moviepy - Building video temp_zoomed_1.mp4.
Moviepy - Writing video temp_zoomed_1.mp4



                                                              

Moviepy - Done !
Moviepy - video ready temp_zoomed_1.mp4
Moviepy - Building video temp_zoomed_2.mp4.
Moviepy - Writing video temp_zoomed_2.mp4



                                                            

Moviepy - Done !
Moviepy - video ready temp_zoomed_2.mp4
Moviepy - Building video temp_zoomed_3.mp4.
Moviepy - Writing video temp_zoomed_3.mp4



                                                            

Moviepy - Done !
Moviepy - video ready temp_zoomed_3.mp4
Moviepy - Building video outputs/final_video/final_video.mp4.
MoviePy - Writing audio in final_videoTEMP_MPY_wvf_snd.mp3


                                                                    

MoviePy - Done.
Moviepy - Writing video outputs/final_video/final_video.mp4



                                                               

Moviepy - Done !
Moviepy - video ready outputs/final_video/final_video.mp4
Video created at: outputs/final_video/final_video.mp4
