In [1]:
# Requires Python 3.12.3
%pip freeze > requirements.txt

Note: you may need to restart the kernel to use updated packages.


# Setup

In [2]:
import cv2
import numpy as np
from PIL import Image
from moviepy.editor import VideoFileClip, AudioFileClip, CompositeAudioClip
import tempfile
import os

from diffusers import StableDiffusionPipeline
import torch
from pathlib import Path

import numpy as np

In [3]:
# Directory holding the static assets (logo image, audio track) used when assembling the video.
assts_dir = Path(os.getcwd()).joinpath("assets")


# Classes

In [4]:

class VideoEditor:
    """
    Incrementally builds a video from still images and video clips, then attaches audio on save.

    Frames are streamed through an OpenCV ``VideoWriter`` into a temporary file. ``save``
    re-encodes that file with MoviePy and mixes in every audio clip queued via ``add_audio``.

    Timeline bookkeeping: ``current_time`` is the requested duration so far (seconds) and
    ``_frames_written`` is the number of frames actually written. Frames are emitted so that
    ``_frames_written`` always tracks ``current_time * fps`` to the nearest frame, which keeps
    the real video length in step with the timeline even when individual durations are not a
    whole number of frames.
    """

    def __init__(self, fps=30, fourcc='mp4v', frame_size=(640, 480)):
        """
        Args:
            fps (float): Frame rate of the video being built.
            fourcc (str): Four-character codec code passed to ``cv2.VideoWriter_fourcc``.
            frame_size (tuple): (width, height) every frame is resized to.

        Raises:
            RuntimeError: If OpenCV cannot open the temporary output file for writing.
        """
        self.fps = fps
        self.frame_size = frame_size
        self.fourcc = cv2.VideoWriter_fourcc(*fourcc)
        # mkstemp creates the file itself; mktemp only returns a name (deprecated, race-prone).
        fd, self.temp_video_path = tempfile.mkstemp(suffix='.mp4')
        os.close(fd)  # OpenCV opens the path on its own; we only needed the reserved name.
        self.video_writer = cv2.VideoWriter(self.temp_video_path, self.fourcc, self.fps, self.frame_size)
        if not self.video_writer.isOpened():
            # An unopened writer silently drops every frame; fail here instead of at save().
            os.remove(self.temp_video_path)
            raise RuntimeError(f"Could not open video writer for {self.temp_video_path}")
        self.current_time = 0  # in seconds, tracks the current duration of the video content
        self._frames_written = 0  # number of frames actually sent to the writer
        self.audio_clips = []  # list of (audio_clip, offset_on_video_in_seconds)

    def _resize_frame(self, frame):
        """
        Resizes an image frame to the video's frame_size.
        """
        return cv2.resize(frame, self.frame_size)

    def _write_frames_until(self, frame, end_time):
        """
        Writes ``frame`` repeatedly until the written frame count matches ``end_time``.

        The target is ``end_time * fps`` rounded half-up, so fractional frames left over from
        earlier calls are carried forward instead of being truncated away each time.
        """
        target = int(end_time * self.fps + 0.5)
        while self._frames_written < target:
            self.video_writer.write(frame)
            self._frames_written += 1

    def _convert_to_cv2(self, image):
        """
        Converts various image types (path, Pillow, NumPy array) to an OpenCV image (NumPy array)
        and resizes it.

        Raises:
            FileNotFoundError: If a path is given and OpenCV cannot read it.
            ValueError: If the image type is not supported.
        """
        if isinstance(image, (str, os.PathLike)):
            # Read image from file path (pathlib.Path objects are accepted too)
            img = cv2.imread(os.fspath(image))
            if img is None:
                raise FileNotFoundError(f"Image file not found or could not be read: {image}")
        elif isinstance(image, Image.Image):
            # Normalise to 3-channel RGB first so palette/greyscale/alpha images convert cleanly,
            # then swap to OpenCV's BGR channel order.
            img = cv2.cvtColor(np.array(image.convert("RGB")), cv2.COLOR_RGB2BGR)
        elif isinstance(image, np.ndarray):
            # Assume it's already an OpenCV image (NumPy array)
            img = image
        else:
            raise ValueError("Unsupported image type. Must be a file path (string), Pillow Image, or NumPy array.")
        return self._resize_frame(img)

    def add_image(self, image, duration_sec):
        """
        Adds a single image to the video for a specified duration.

        Args:
            image: The image to add. Can be a file path (string or path-like), a Pillow Image
                   object, or an OpenCV image (NumPy array).
            duration_sec (float): The duration (in seconds) for which the image should be displayed.

        Raises:
            ValueError: If ``duration_sec`` is negative.
        """
        if duration_sec < 0:
            raise ValueError("duration_sec must be non-negative.")
        frame = self._convert_to_cv2(image)
        end_time = self.current_time + duration_sec
        # A duration shorter than one frame may write nothing now; the remainder is carried
        # into the next call because the target is computed from the cumulative timeline.
        self._write_frames_until(frame, end_time)
        self.current_time = end_time

    def add_images_from_list(self, images, total_duration_sec):
        """
        Adds several images to the video, distributing them evenly over a total duration.

        Args:
            images: Can be a directory path (string or path-like), a list/tuple of image file
                    paths, a list/tuple of OpenCV images (numpy arrays), or a list/tuple of
                    Pillow images.
            total_duration_sec (float): The total duration (in seconds) that these images
                                        should occupy in the video.

        Raises:
            ValueError: If ``images`` is of an unsupported type or mixes paths with image objects.
        """
        image_list = []

        if isinstance(images, (str, os.PathLike)) and os.path.isdir(images):
            # If a directory is provided, read all supported image files from it
            directory = os.fspath(images)
            for filename in sorted(os.listdir(directory)):
                if filename.lower().endswith(('.png', '.jpg', '.jpeg', '.gif', '.bmp')):
                    image_list.append(os.path.join(directory, filename))
        elif isinstance(images, (list, tuple)):
            # If a sequence is provided, check its contents
            if all(isinstance(img, (str, os.PathLike)) for img in images):
                # Sequence of image file paths
                image_list = list(images)
            elif all(isinstance(img, (np.ndarray, Image.Image)) for img in images):
                # Sequence of OpenCV or Pillow image objects
                image_list = list(images)
            else:
                raise ValueError("List must contain only strings (paths), OpenCV images, or Pillow images.")
        else:
            raise ValueError("Unsupported 'images' type. Must be a directory path (string), a list of paths, a list of OpenCV images, or a list of Pillow images.")

        if not image_list:
            print("No images found to add.")
            return

        # Calculate duration for each individual image
        single_image_duration = total_duration_sec / len(image_list)

        # Add each image using the existing add_image method
        for img in image_list:
            self.add_image(img, single_image_duration)

    def add_video(self, video_path):
        """
        Adds another video clip to the current video, keeping the clip's original duration.

        Source frames are duplicated or dropped as needed when the clip's frame rate differs
        from this editor's ``fps``, so the clip occupies ``frames_read / source_fps`` seconds
        on the timeline.

        Args:
            video_path (str): Path to the video file to be added.

        Raises:
            FileNotFoundError: If the video cannot be opened.
        """
        cap = cv2.VideoCapture(video_path)
        if not cap.isOpened():
            cap.release()
            raise FileNotFoundError(f"Video file not found or could not be opened: {video_path}")

        video_fps = cap.get(cv2.CAP_PROP_FPS)
        if not video_fps > 0:  # FPS may be reported as 0 (or NaN); fall back to our own rate
            video_fps = self.fps

        start_time = self.current_time
        frames_read = 0
        try:
            while True:
                ret, frame = cap.read()
                if not ret:
                    break
                frames_read += 1
                # Each source frame covers the timeline up to frames_read / video_fps.
                self._write_frames_until(self._resize_frame(frame), start_time + frames_read / video_fps)
        finally:
            cap.release()

        # Use the number of frames actually decoded rather than the container's reported count.
        self.current_time = start_time + frames_read / video_fps

    def add_audio(self, audio_path, audio_clip_start=None, audio_clip_end=None, video_start_offset=None):
        """
        Adds an audio clip to the video timeline.

        Args:
            audio_path (str): Path to the audio file.
            audio_clip_start (float, optional): The start time (in seconds) within the audio file itself.
                                                Defaults to 0 (beginning of the audio file).
            audio_clip_end (float, optional): The end time (in seconds) within the audio file itself.
                                              Defaults to the end of the audio clip.
            video_start_offset (float, optional): The time (in seconds) on the video timeline where this
                                                  audio should start. If None, it starts at the current
                                                  end time of the video (`self.current_time`).

        Raises:
            ValueError: If the range is inverted or the audio file cannot be loaded.
        """
        # Validate the range before opening the file so a bad range never leaves a reader open.
        if audio_clip_start is not None and audio_clip_end is not None and audio_clip_start > audio_clip_end:
            raise ValueError("audio_clip_start cannot be greater than audio_clip_end.")

        try:
            audio_clip = AudioFileClip(audio_path)
        except Exception as e:
            raise ValueError(f"Could not load audio file {audio_path}: {e}") from e

        # Subclip the audio file based on audio_clip_start and audio_clip_end
        if audio_clip_start is not None or audio_clip_end is not None:
            # Use 0 if audio_clip_start is None, and audio_clip.duration if audio_clip_end is None
            start_subclip = audio_clip_start if audio_clip_start is not None else 0
            end_subclip = audio_clip_end if audio_clip_end is not None else audio_clip.duration

            audio_clip = audio_clip.subclip(start_subclip, end_subclip)

        # Determine the offset on the video timeline
        offset_on_video = video_start_offset if video_start_offset is not None else self.current_time

        self.audio_clips.append((audio_clip, offset_on_video))

    def get_video_duration(self):
        """
        Returns the current duration of the video content in seconds.
        """
        return self.current_time

    def save(self, output_path):
        """
        Finalizes the video and merges audio if present.

        Releases the OpenCV writer, re-encodes the temporary file to ``output_path`` with
        MoviePy (H.264 video, AAC audio), and always removes the temporary file afterwards,
        including when encoding fails. The editor cannot be reused after this call.

        Args:
            output_path (str): Destination file for the finished video.
        """
        self.video_writer.release()

        final_clip = None
        try:
            final_clip = VideoFileClip(self.temp_video_path)

            if self.audio_clips:
                all_audios = [audio.set_start(offset) for audio, offset in self.audio_clips]
                composite_audio = CompositeAudioClip(all_audios)

                # Trim explicitly: audio that runs past the last frame would otherwise be
                # written out in full and extend the output beyond the video.
                if (
                    composite_audio.duration is not None
                    and final_clip.duration is not None
                    and composite_audio.duration > final_clip.duration
                ):
                    composite_audio = composite_audio.set_duration(final_clip.duration)

                final_clip = final_clip.set_audio(composite_audio)

            print(f"Saving video to {output_path}...")
            final_clip.write_videofile(output_path, codec='libx264', audio_codec='aac')
        finally:
            # Release readers and the temp file even if loading or encoding raised.
            if final_clip is not None:
                final_clip.close()
            for audio, _ in self.audio_clips:
                audio.close()
            if os.path.exists(self.temp_video_path):
                os.remove(self.temp_video_path)
        print("Video saved successfully and temporary file removed.")



In [5]:
class SD15ImageGenerator:
    """
    Thin wrapper around a Stable Diffusion 1.5 pipeline that records the decoded image
    at every denoising step, plus helpers to write those images to disk.
    """

    def __init__(self, model_id="runwayml/stable-diffusion-v1-5", use_cuda=True, num_inference_steps=25):
        """
        Initialize the Stable Diffusion 1.5 pipeline and inference settings.

        Args:
            model_id (str): Model identifier passed to ``StableDiffusionPipeline.from_pretrained``.
            use_cuda (bool): Use CUDA when it is available; otherwise the CPU is used.
            num_inference_steps (int): Number of denoising steps requested per generation.
        """
        self.device = "cuda" if use_cuda and torch.cuda.is_available() else "cpu"
        self.num_inference_steps = num_inference_steps
        self.intermediate_images = []

        # Half precision on GPU, full precision on CPU. The safety checker is disabled here.
        self.pipe = StableDiffusionPipeline.from_pretrained(
            model_id,
            torch_dtype=torch.float16 if self.device == "cuda" else torch.float32,
            safety_checker=None
        ).to(self.device)

    def _capture_step(self, step, timestep, latents):
        """
        Internal callback to capture the image at each step.

        Decodes the current latents with the pipeline's VAE and appends the first image of
        the batch to ``self.intermediate_images`` as a PIL image.
        """
        # Decode latent to image at this step
        with torch.no_grad():
            image = self.pipe.vae.decode(latents / self.pipe.vae.config.scaling_factor).sample
            # Map the decoder output from [-1, 1] to [0, 1].
            image = (image / 2 + 0.5).clamp(0, 1)
            # NCHW -> NHWC, keep only the first image of the batch.
            image = image.cpu().permute(0, 2, 3, 1).numpy()[0]
            image_pil = Image.fromarray((image * 255).astype("uint8"))
            self.intermediate_images.append(image_pil)

    def generate_image(self, prompt, negative_prompt=None, guidance_scale=7.5):
        """
        Generate image and collect intermediate steps.

        Args:
            prompt (str): Text prompt.
            negative_prompt (str, optional): Negative prompt forwarded to the pipeline.
            guidance_scale (float): Guidance scale forwarded to the pipeline.

        Returns:
            list: PIL images, one per invocation of the step callback.
        """
        self.intermediate_images = []

        # NOTE(review): `callback` / `callback_steps` are the legacy diffusers hooks; newer
        # releases favour `callback_on_step_end` — confirm against the pinned diffusers version.
        with torch.autocast(self.device) if self.device == "cuda" else torch.no_grad():
            _ = self.pipe(
                prompt=prompt,
                negative_prompt=negative_prompt,
                guidance_scale=guidance_scale,
                num_inference_steps=self.num_inference_steps,
                callback=self._capture_step,
                callback_steps=1  # capture every step
            )

        return self.intermediate_images

    def save_image(self, image: "Image.Image", output_path: str):
        """
        Save a single PIL image to the specified path.

        Parent directories are created when ``output_path`` contains any; a bare filename
        is written to the current working directory.
        """
        parent = os.path.dirname(output_path)
        if parent:
            # os.makedirs("") raises FileNotFoundError, so only create a real parent directory.
            os.makedirs(parent, exist_ok=True)
        image.save(output_path)
        print(f"Image saved to {output_path}")

    def save_images(self, images, directory="generated"):
        """
        Save a list of images to the given directory.

        Files are named ``step_<index>.png`` with a zero-padded index. The padding is at
        least two digits and grows with the list so the names sort in step order.
        """
        os.makedirs(directory, exist_ok=True)
        width = max(2, len(str(len(images) - 1)))
        for i, img in enumerate(images):
            path = os.path.join(directory, f"step_{i:0{width}d}.png")
            img.save(path)
        print(f"Saved {len(images)} images to '{directory}/'")


# Generator


In [6]:
# Build the pipeline once; the step count is stored on the generator and reused by generate_image.
generator = SD15ImageGenerator(num_inference_steps=100)


Loading pipeline components...:   0%|          | 0/6 [00:00<?, ?it/s]

You have disabled the safety checker for <class 'diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion.StableDiffusionPipeline'> by passing `safety_checker=None`. Ensure that you abide to the conditions of the Stable Diffusion license and do not expose unfiltered results in services or applications open to the public. Both the diffusers team and Hugging Face strongly recommend to keep the safety filter enabled in all public facing circumstances, disabling it only for use-cases that involve analyzing network behavior or auditing its results. For more information, please have a look at https://github.com/huggingface/diffusers/pull/254 .


In [7]:

# Text prompt describing the scene to render.
prompt = "a fantasy castle floating in the sky, vivid colors, highly detailed"

# Run the pipeline and keep the image captured at each step.
images = generator.generate_image(prompt=prompt)


  0%|          | 0/100 [00:00<?, ?it/s]

In [8]:


def generate_evenly_distributed_values(data):
    """
    Expand a list of (count, start, end) segments into one flat list of values.

    Each segment contributes `count` values spaced evenly from `start` to `end`
    (both endpoints included, as produced by np.linspace). Segments whose count
    is not positive contribute nothing.

    Args:
        data (list): Tuples of the form (number, start, end).

    Returns:
        list: All generated values, concatenated in segment order.
    """
    flattened = []
    for count, lower, upper in data:
        if not count > 0:
            # Nothing to generate for this segment.
            continue
        flattened += np.linspace(lower, upper, count).tolist()
    return flattened

# Example Usage:
# data1 = [(5, 0, 10), (3, 100, 102)]
# result1 = generate_evenly_distributed_values(data1)
# print(f"Result for data1: {result1}")
# # Expected output for data1: [0.0, 2.5, 5.0, 7.5, 10.0, 100.0, 101.0, 102.0]

# data2 = [(1, 5, 5), (4, -2, 2)]
# result2 = generate_evenly_distributed_values(data2)
# print(f"Result for data2: {result2}")
# # Expected output for data2: [5.0, -2.0, -0.6666666666666666, 0.6666666666666666, 2.0]

# data3 = []
# result3 = generate_evenly_distributed_values(data3)
# print(f"Result for data3: {result3}")
# # Expected output for data3: []

# data4 = [(0, 1, 10)]
# result4 = generate_evenly_distributed_values(data4)
# print(f"Result for data4: {result4}")
# # Expected output for data4: []


In [9]:

# Example Usage:
# Per-image display durations in seconds, described as (count, start, end) segments.
speed_stop = 0.5
#speed_distribution = [(40, 1/30, speed_stop), (40, speed_stop, speed_stop), (19, speed_stop, 1/30)]
speed_distribution = [
    (40, 1 / 60, 1 / 60),
    (30, 1 / 30, speed_stop),
    (20, speed_stop, speed_stop),
    (9, speed_stop, 1 / 30),
]
# The trailing 3 is one extra duration entry appended after the generated segments.
result1 = generate_evenly_distributed_values(speed_distribution) + [3]
print(f"Result for data1: {result1}")
print(f"len {len(result1)}, sum {sum(result1)}" )

Result for data1: [0.016666666666666666, 0.016666666666666666, 0.016666666666666666, 0.016666666666666666, 0.016666666666666666, 0.016666666666666666, 0.016666666666666666, 0.016666666666666666, 0.016666666666666666, 0.016666666666666666, 0.016666666666666666, 0.016666666666666666, 0.016666666666666666, 0.016666666666666666, 0.016666666666666666, 0.016666666666666666, 0.016666666666666666, 0.016666666666666666, 0.016666666666666666, 0.016666666666666666, 0.016666666666666666, 0.016666666666666666, 0.016666666666666666, 0.016666666666666666, 0.016666666666666666, 0.016666666666666666, 0.016666666666666666, 0.016666666666666666, 0.016666666666666666, 0.016666666666666666, 0.016666666666666666, 0.016666666666666666, 0.016666666666666666, 0.016666666666666666, 0.016666666666666666, 0.016666666666666666, 0.016666666666666666, 0.016666666666666666, 0.016666666666666666, 0.016666666666666666, 0.03333333333333333, 0.04942528735632184, 0.06551724137931034, 0.08160919540229886, 0.097701149425287

In [13]:
editor = VideoEditor(fps=30, frame_size=(512, 512))
generator.save_images(images, directory="generated")

# Each image is paired with one duration; warn when the two lists differ in length.
if len(result1) != len(images):
    print("time_duration and number of images are not matched.")

# Intro: show the logo for 3 seconds.
logo = Image.open(str(assts_dir / "AiArtStudio.AILogo.png"))
editor.add_image(logo, 3)

# zip stops at the shorter list, so a length mismatch can no longer raise IndexError
# part-way through building the video.
for img, duration in zip(images, result1):
    editor.add_image(img, duration)

# Lay the soundtrack from the start of the video, cut to the video's current length.
editor.add_audio(
    str(assts_dir / "Long Distance.mp3"),
    audio_clip_end=editor.get_video_duration(),
    video_start_offset=0,
)
editor.save("output_video1.mp4")

Saved 100 images to 'generated/'
Saving video to output_video1.mp4...
Moviepy - Building video output_video1.mp4.
MoviePy - Writing audio in output_video1TEMP_MPY_wvf_snd.mp4


                                                                   

MoviePy - Done.
Moviepy - Writing video output_video1.mp4



                                                                

Moviepy - Done !
Moviepy - video ready output_video1.mp4
Video saved successfully and temporary file removed.
