# Amazon Nova Reel Demo

### Primero vamos a importar las librerías

In [None]:
import random
import time
import boto3

import base64

### Un ejemplo de cómo generar un video usando Bedrock `start_async_invoke()` (de boto3) a partir de **texto**
Se requiere especificar la región, el bucket de S3 de destino donde se almacenará el video y un prompt.
Actualmente la duración es de 6 segundos únicamente, con 24 fps, dimensiones de 1280x720. Se puede especificar una semilla para reproducir resultados.

Toma alrededor de 5 min generar el video

In [None]:
# Text-to-video demo: start an asynchronous Nova Reel job, poll until it
# leaves the "InProgress" state, and report where the result was written.
AWS_REGION = "us-east-1"
MODEL_ID = "amazon.nova-reel-v1:0"
SLEEP_TIME = 30  # seconds to wait between status polls
S3_DESTINATION_BUCKET = "bedrock-video-generation-us-east-1-y5s9fj"

video_prompt = "Closeup of a large seashell in the sand. Gentle waves flow all around the shell. Sunset light. Camera zoom in very close."

bedrock_runtime = boto3.client("bedrock-runtime", region_name=AWS_REGION)
model_input = {
    "taskType": "TEXT_VIDEO",
    "textToVideoParams": {"text": video_prompt},
    "videoGenerationConfig": {
        "durationSeconds": 6,
        "fps": 24,
        "dimension": "1280x720",
        # random.randint includes both endpoints, so the upper bound must be
        # the largest seed the model accepts (2,147,483,646 per the Nova Reel
        # request documentation), not a value above it.
        "seed": random.randint(0, 2147483646),
    },
}

invocation = bedrock_runtime.start_async_invoke(
    modelId=MODEL_ID,
    modelInput=model_input,
    outputDataConfig={"s3OutputDataConfig": {"s3Uri": f"s3://{S3_DESTINATION_BUCKET}"}},
)

# The last path segment of the invocation ARN is used as the S3 key prefix
# under which output.mp4 is looked up.
invocation_arn = invocation["invocationArn"]
s3_prefix = invocation_arn.split('/')[-1]
s3_location = f"s3://{S3_DESTINATION_BUCKET}/{s3_prefix}"
print(f"\nS3 URI: {s3_location}")

# Poll until the job reaches any state other than "InProgress".
while True:
    response = bedrock_runtime.get_async_invoke(
        invocationArn=invocation_arn
    )
    status = response["status"]
    print(f"Status: {status}")
    if status != "InProgress":
        break
    time.sleep(SLEEP_TIME)

if status == "Completed":
    print(f"\nVideo is ready at {s3_location}/output.mp4")
else:
    print(f"\nVideo generation status: {status}")
    # Surface the service-provided reason, when present, so a failed job is
    # diagnosable from the notebook output.
    failure_message = response.get("failureMessage")
    if failure_message:
        print(f"Failure message: {failure_message}")

Podemos descargar el video desde s3 hacia nuestro workspace con:

In [None]:
!aws s3 cp s3://{S3_DESTINATION_BUCKET}/{s3_prefix}/output.mp4 ./output-from-text.mp4

Con la clase `Video` de IPython podemos visualizar en los Jupyter notebooks los videos que generamos.

In [None]:
# Video builds an inline player for a local video file in the notebook.
from IPython.display import Video

# Being the last expression of the cell, the player for the file downloaded
# as ./output-from-text.mp4 is rendered as the cell output.
Video("output-from-text.mp4")

### Un ejemplo de cómo generar un video usando Bedrock `start_async_invoke()` (de boto3) a partir de **texto e imagen**

Para este ejemplo, adicionalmente debemos especificar una imagen, que se envía codificada en Base64 a partir de sus bytes.

In [None]:
# Text + image demo: same asynchronous flow as the text-only example, but the
# request also carries a reference image encoded as Base64.
input_image_path = "seascape.png"
video_prompt = "drone view flying over a coastal landscape"

# Load the input image as a Base64 string.
with open(input_image_path, "rb") as f:
    input_image_bytes = f.read()
input_image_base64 = base64.b64encode(input_image_bytes).decode("utf-8")

model_input = {
    "taskType": "TEXT_VIDEO",
    "textToVideoParams": {
        "text": video_prompt,
        "images": [{"format": "png", "source": {"bytes": input_image_base64}}],
    },
    "videoGenerationConfig": {
        "durationSeconds": 6,
        "fps": 24,
        "dimension": "1280x720",
        # random.randint includes both endpoints, so the upper bound must be
        # the largest seed the model accepts (2,147,483,646 per the Nova Reel
        # request documentation), not a value above it.
        "seed": random.randint(0, 2147483646),
    },
}

invocation = bedrock_runtime.start_async_invoke(
    modelId=MODEL_ID,
    modelInput=model_input,
    outputDataConfig={"s3OutputDataConfig": {"s3Uri": f"s3://{S3_DESTINATION_BUCKET}"}},
)

# The last path segment of the invocation ARN is used as the S3 key prefix
# under which output.mp4 is looked up.
invocation_arn = invocation["invocationArn"]
s3_prefix = invocation_arn.split('/')[-1]
s3_location = f"s3://{S3_DESTINATION_BUCKET}/{s3_prefix}"

print(f"\nS3 URI: {s3_location}")

# Poll until the job reaches any state other than "InProgress".
while True:
    response = bedrock_runtime.get_async_invoke(
        invocationArn=invocation_arn
    )
    status = response["status"]
    print(f"Status: {status}")
    if status != "InProgress":
        break
    time.sleep(SLEEP_TIME)

if status == "Completed":
    print(f"\nVideo is ready at {s3_location}/output.mp4")
else:
    print(f"\nVideo generation status: {status}")
    # Surface the service-provided reason, when present, so a failed job is
    # diagnosable from the notebook output.
    failure_message = response.get("failureMessage")
    if failure_message:
        print(f"Failure message: {failure_message}")

Descargamos el video desde s3 hacia nuestro workspace:

In [None]:
!aws s3 cp {s3_location}/output.mp4 ./output.mp4

Visualización del vídeo:

In [None]:
# Render the file downloaded as ./output.mp4 inline as the cell output.
Video("output.mp4")

In [10]:
import random
import time
import boto3

import base64

from IPython.display import Video

import os


In [11]:
# Notebook-level Bedrock Runtime client; generate_video below reads this
# global when it starts and polls asynchronous invocations.
AWS_REGION = "us-east-1"
bedrock_runtime = boto3.client("bedrock-runtime", region_name=AWS_REGION)

### Aquí presentamos una función para generalizar la generación de video a partir de texto y de texto + imagen
En los parámetros encontramos el prompt, el nombre del archivo final, la ruta de la imagen a utilizar (en caso de ser requerida), el bucket de S3 de destino, la región de AWS y una opción para sobrescribir.
Si el archivo ya existe en el espacio de trabajo, la función no generará otro video a menos que utilicemos la opción *overwrite*

In [12]:
def generate_video(
        video_prompt,
        output_name = "output",
        input_image_path = "",
        S3_DESTINATION_BUCKET = "bedrock-video-generation-us-east-1-y5s9fj",
        AWS_REGION = "us-east-1",
        overwrite=False,
):
    """Generate a video with Amazon Nova Reel and return it for notebook display.

    If ``<output_name>.mp4`` already exists locally and ``overwrite`` is
    False, no job is started and the existing file is returned. Otherwise an
    asynchronous Bedrock job is started, polled every 30 seconds until it
    leaves the "InProgress" state, and the resulting ``output.mp4`` is
    downloaded from S3 to ``./<output_name>.mp4``.

    Args:
        video_prompt: Text prompt describing the video.
        output_name: Local file name, without the ``.mp4`` extension.
        input_image_path: Optional path of an image to condition the video
            on. Empty (or None) means text-only generation.
        S3_DESTINATION_BUCKET: Bucket (optionally ``bucket/prefix``) that
            receives the job output.
        AWS_REGION: Region used for the Bedrock Runtime and S3 clients.
        overwrite: When True, generate again even if the local file exists.

    Returns:
        IPython.display.Video pointing at ``<output_name>.mp4``.

    Raises:
        FileNotFoundError: ``input_image_path`` was given but is not a file.
        RuntimeError: The job finished in a state other than "Completed".
    """
    MODEL_ID = "amazon.nova-reel-v1:0"
    SLEEP_TIME = 30  # seconds between status polls
    # random.randint is inclusive, so this must be the largest seed the model
    # accepts (2,147,483,646 per the Nova Reel request documentation).
    MAX_SEED = 2147483646

    output_file = f"{output_name}.mp4"
    already_exists = os.path.isfile(output_file)
    if already_exists and not overwrite:
        print("File exists")
        return Video(output_file)
    print("Overwriting existing file" if already_exists else "File does not exist")

    text_to_video_params = {"text": video_prompt}
    if input_image_path:
        # Fail loudly instead of silently producing a text-only video when the
        # caller asked for an image that is not there.
        if not os.path.isfile(input_image_path):
            raise FileNotFoundError(f"Input image not found: {input_image_path}")
        with open(input_image_path, "rb") as f:
            input_image_base64 = base64.b64encode(f.read()).decode("utf-8")
        extension = os.path.splitext(input_image_path)[1].lower()
        image_format = "jpeg" if extension in (".jpg", ".jpeg") else "png"
        text_to_video_params["images"] = [
            {"format": image_format, "source": {"bytes": input_image_base64}}
        ]

    model_input = {
        "taskType": "TEXT_VIDEO",
        "textToVideoParams": text_to_video_params,
        "videoGenerationConfig": {
            "durationSeconds": 6,
            "fps": 24,
            "dimension": "1280x720",
            "seed": random.randint(0, MAX_SEED),
        },
    }

    # Build the client from the AWS_REGION argument so the parameter is
    # actually honoured rather than relying on a notebook-level global.
    bedrock_client = boto3.client("bedrock-runtime", region_name=AWS_REGION)
    invocation = bedrock_client.start_async_invoke(
        modelId=MODEL_ID,
        modelInput=model_input,
        outputDataConfig={"s3OutputDataConfig": {"s3Uri": f"s3://{S3_DESTINATION_BUCKET}"}},
    )

    # The last path segment of the invocation ARN is used as the S3 key
    # prefix under which output.mp4 is looked up.
    invocation_arn = invocation["invocationArn"]
    s3_prefix = invocation_arn.split('/')[-1]
    s3_location = f"s3://{S3_DESTINATION_BUCKET}/{s3_prefix}"
    print(f"\nS3 URI: {s3_location}")

    while True:
        response = bedrock_client.get_async_invoke(
            invocationArn=invocation_arn
        )
        status = response["status"]
        print(f"Status: {status}")
        if status != "InProgress":
            break
        time.sleep(SLEEP_TIME)

    if status != "Completed":
        print(f"\nVideo generation status: {status}")
        # Do not attempt the download or return a stale/missing local file.
        failure_message = response.get("failureMessage", "no failure message provided")
        raise RuntimeError(
            f"Video generation ended with status {status}: {failure_message}"
        )

    print(f"\nVideo is ready at {s3_location}/output.mp4")

    # Split an optional "bucket/prefix" destination into bucket and key parts.
    bucket_name, _, key_prefix = S3_DESTINATION_BUCKET.partition("/")
    object_key = "/".join(
        part for part in (key_prefix.strip("/"), s3_prefix, "output.mp4") if part
    )
    s3_client = boto3.client("s3", region_name=AWS_REGION)
    s3_client.download_file(bucket_name, object_key, output_file)

    return Video(output_file)

In [13]:
# Text-only generation with a Spanish prompt; the result is stored locally as
# caparazon.mp4 and reused on later runs because overwrite defaults to False.
generate_video("Primer plano de un caparazón en la arena. Las olas suaves fluyen por todo el caparazón. Con luz del atardecer. Acercamiento de cámara lento.", output_name="caparazon")

File exists


In [14]:
# Text + image generation using seascape.png as the input image; the result is
# stored locally as costa.mp4.
generate_video("vista de dron volando sobre un paisaje costero", input_image_path='seascape.png', output_name="costa")

File exists


In [None]:
# Text-only generation; the result is stored locally as taco2.mp4.
generate_video("Una persona comiendose un taco con salsa roja", output_name="taco2")

In [None]:
# Text-only generation from a product-style prompt; the result is stored
# locally as secadora_pelo.mp4.
generate_video(video_prompt="Show a sleek hair dryer with turbo jet technology, highlighting powerful airflow and quick drying results.",
output_name='secadora_pelo')

In [None]:
# Rebinds the notebook-level video_prompt, then generates coffee_shop.mp4.
# NOTE(review): the prompt asks for sound and music, but the request built by
# generate_video only sets video parameters — confirm whether the model
# produces audio at all.
video_prompt = """
Show a quick, vibrant shot of a steaming cup of coffee with aromatic steam, placed on a cozy, rustic table with a warm, inviting coffee shop background,
accompanied by the sound of a coffee machine and soft, uplifting music
"""
generate_video(video_prompt = video_prompt, output_name='coffee_shop')


In [None]:
# Rebinds the notebook-level video_prompt, then generates nutritionist10.mp4
# from text only (no input image is passed).
video_prompt = """
Create a vibrant 2D animation of nutritionist's office, with smooth panning to highlight the interaction.
Show a friendly, seated and smiling blonde nutritionist in her nutriologist's white coat handing a fresh, colorful smoothie to a delighted client in a green t-shirt.
Set the scene in a bright, modern nutritionist's office and a white wall to enhance the welcoming atmosphere.
"""
generate_video(video_prompt = video_prompt, output_name='nutritionist10')

In [None]:
# Rebinds the notebook-level video_prompt, then generates bag.mp4 from text
# only.
video_prompt = """
Capture a realistic, candid shot of a well-organized travel bag, neatly packed and zipped, using a smartphone camera with slight handheld movement, natural lighting, and a casual angle.
"""
generate_video(video_prompt = video_prompt, output_name='bag')

In [None]:
# Rebinds the notebook-level video_prompt, then generates damaged_bag.mp4 from
# text only.
video_prompt = """
Capture a handheld, realistic shot of a damaged travel bag, showing tears and wear. Use natural lighting, slight camera shake for authenticity, and focus on the bag's condition.
"""
generate_video(video_prompt = video_prompt, output_name='damaged_bag')

# Nova Canvas + Nova Reel

In [15]:
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0
"""
Shows how to generate an image from a text prompt with the Amazon Nova Canvas model (on demand).
"""
import base64
import io
import json
import boto3
from PIL import Image
from botocore.config import Config

from botocore.exceptions import ClientError

class ImageError(Exception):
    """Custom exception for errors returned by Amazon Nova Canvas.

    Attributes:
        message: Human-readable description of the error; also passed to
            ``Exception`` so ``str(err)`` and ``err.args`` carry it.
    """

    def __init__(self, message):
        # Initialise the base class explicitly so the exception behaves like
        # any other Exception subclass.
        super().__init__(message)
        self.message = message

def generate_image_(model_id, body):
    """
    Generate an image using Amazon Nova Canvas model on demand.
    Args:
        model_id (str): The model ID to use.
        body (str) : The request body to use.
    Returns:
        image_bytes (bytes): The image generated by the model.
    Raises:
        ImageError: The response carries an "error" field or no images.
    """

    # Image generation can take a while, so allow a long read timeout.
    bedrock = boto3.client(
        service_name='bedrock-runtime',
        config=Config(read_timeout=300)
    )

    response = bedrock.invoke_model(
        body=body,
        modelId=model_id,
        accept="application/json",
        contentType="application/json",
    )
    response_body = json.loads(response.get("body").read())

    # Check for a reported error before touching "images": indexing the list
    # first would raise IndexError/TypeError on an empty or missing list and
    # hide the real error from the caller.
    finish_reason = response_body.get("error")
    if finish_reason is not None:
        raise ImageError(f"Image generation error. Error is {finish_reason}")

    images = response_body.get("images")
    if not images:
        raise ImageError("Image generation error. The response contained no images.")

    # The first image arrives as a Base64 string.
    return base64.b64decode(images[0])

def generate_image(prompt, image_output_name, overwrite=False):
    """Generate a 1280x720 image with Amazon Nova Canvas and save it as PNG.

    Args:
        prompt: Text prompt describing the image.
        image_output_name: Output file name, without the ``.png`` extension.
        overwrite: When True, generate again even if the file already exists.

    Returns:
        The generated PIL image, or None when the file already exists (and
        ``overwrite`` is False) or when generation failed; failures are
        printed rather than raised.
    """
    output_file = f"{image_output_name}.png"
    already_exists = os.path.isfile(output_file)

    # `overwrite` is a parameter here; referencing an undefined global made
    # this check raise NameError whenever the file was already present.
    if already_exists and not overwrite:
        print("File exists")
        return None
    print("Overwriting existing file" if already_exists else "File does not exist")

    model_id = 'amazon.nova-canvas-v1:0'

    body = json.dumps({
        "taskType": "TEXT_IMAGE",
        "textToImageParams": {
            "text": prompt
        },
        "imageGenerationConfig": {
            "numberOfImages": 1,
            # Same size as the "dimension" used in this notebook's video
            # requests, so the image can be used as a video input.
            "height": 720,
            "width": 1280,
            "cfgScale": 8.0,
            "seed": 0
        }
    })

    try:
        image_bytes = generate_image_(model_id=model_id,
                                      body=body)
    except ClientError as err:
        message = err.response["Error"]["Message"]
        print("A client error occured: " +
              format(message))
        return None
    except ImageError as err:
        print(err.message)
        return None

    image = Image.open(io.BytesIO(image_bytes))
    image.save(output_file)
    # Report success before returning; in a try/else after a `return` inside
    # the try body this message could never be reached.
    print(
        f"Finished generating image with Amazon Nova Canvas  model {model_id}.")
    return image
    

In [16]:
def generate_image_variation(prompt, image_output_name, image_name, overwrite=False):
    """Generate a variation of an existing PNG with Amazon Nova Canvas.

    Args:
        prompt: Text prompt guiding the variation.
        image_output_name: Output file name, without the ``.png`` extension.
        image_name: Source image file name, without the ``.png`` extension.
        overwrite: When True, generate again even if the output file exists.

    Returns:
        The generated PIL image, or None when the output file already exists
        and ``overwrite`` is False.
    """
    output_file = f"{image_output_name}.png"

    # `overwrite` is a parameter here; referencing an undefined global made
    # this check raise NameError whenever the file was already present.
    if os.path.isfile(output_file) and not overwrite:
        print("File exists")
        return None

    model_id = 'amazon.nova-canvas-v1:0'

    # Read image from file and encode it as base64 string.
    with open(f"{image_name}.png", "rb") as image_file:
        input_image = base64.b64encode(image_file.read()).decode('utf8')

    body = json.dumps({
        "taskType": "IMAGE_VARIATION",
        "imageVariationParams": {
            "text": prompt,
            "negativeText": "bad quality, low resolution, cartoon",
            "images": [input_image],
            "similarityStrength": 0.7,  # Range: 0.2 to 1.0
        },
        "imageGenerationConfig": {
            "numberOfImages": 1,
            "height": 720,
            "width": 1280,
            "cfgScale": 8.0
        }
    })

    image_bytes = generate_image_(model_id=model_id,
                                  body=body)
    image = Image.open(io.BytesIO(image_bytes))
    image.save(output_file)
    return image

In [None]:
# Generate the still image nutritionist.png with Nova Canvas; the next cells
# pass that file to generate_video as the input image.
image_prompt = """
A cartoon of a seated and smiling with eyes closed blonde nutritionist in her white coat talking to a delighted client in a green t-shirt.
Set the scene in a bright, modern nutritionist's office and a white wall to enhance the welcoming atmosphere.
"""
generate_image(prompt=image_prompt, image_output_name='nutritionist')

In [19]:
# Text + image generation: nutritionist.png is the input image and the prompt
# only describes the camera motion. The result is stored as nutritionist_2.mp4.
generate_video(
"""
smooth panning
"""
    , input_image_path='nutritionist.png', output_name="nutritionist_2")

File exists


In [None]:
# Same input image as the previous cell with a different camera-motion prompt;
# the result is stored as nutritionist_3.mp4.
generate_video(
"""
zoom out
"""
    , input_image_path='nutritionist.png', output_name="nutritionist_3")

In [None]:
# Generate luxury_car.png with Nova Canvas; later cells use it as the source
# for an image variation, as a video input image, and as a player thumbnail.
image_prompt = """
Realistic image of a luxury fast golden car, sleek modern design, shiny metallic gold color, low angle view highlighting the curves and aerodynamic features, bright daylight with subtle shadows, high-resolution detail.
"""
generate_image(prompt=image_prompt, image_output_name='luxury_car')

In [None]:
# Create luxury_car2.png as a variation of luxury_car.png (image_name is given
# without the .png extension, which the function appends).
image_prompt_variation = """
Another shoot from the car
"""
generate_image_variation(prompt=image_prompt_variation, image_output_name='luxury_car2',image_name='luxury_car')

In [17]:
# Animate luxury_car.png with a "zoom out" prompt; stored as luxury_car.mp4,
# the file the Tkinter players below open.
generate_video("zoom out", input_image_path='luxury_car.png', output_name="luxury_car")

File exists


In [18]:
# Animate the variation luxury_car2.png with a "zoom in" prompt; stored as
# luxury_car2.mp4, used by the two-video player below.
generate_video("zoom in", input_image_path='luxury_car2.png', output_name="luxury_car2")

File exists


In [20]:
import tkinter as tk
from tkinter import *
import cv2
from PIL import Image, ImageTk

class VideoPlayer:
    """Tkinter player that shows a thumbnail and plays a video while hovered.

    The canvas is sized to the video's frame dimensions. Moving the pointer
    onto the canvas starts looping playback; moving it away stops playback
    and shows the thumbnail again.

    Args:
        window: Tk root (or toplevel) window that hosts the canvas.
        video_source: Path of the video file to play.
        thumbnail_source: Path of the image shown while not playing.
    """

    def __init__(self, window, video_source="luxury_car.mp4",
                 thumbnail_source="luxury_car.png"):
        self.window = window
        self.window.title("Video Player")

        # Set all playback state before any call that can fail (opening the
        # video or the thumbnail) so stop_video/__del__ never see a
        # half-initialised object.
        self.video_source = video_source
        self.thumbnail_source = thumbnail_source
        self.vid = None
        self.is_playing = False
        self.delay = 15  # milliseconds between frame updates
        self.photo = None
        self._after_id = None  # id of the pending Tk `after` callback, if any

        # Get video dimensions first
        temp_vid = cv2.VideoCapture(self.video_source)
        self.video_width = int(temp_vid.get(cv2.CAP_PROP_FRAME_WIDTH))
        self.video_height = int(temp_vid.get(cv2.CAP_PROP_FRAME_HEIGHT))
        temp_vid.release()

        # Create canvas matching video dimensions
        self.canvas = tk.Canvas(window, width=self.video_width, height=self.video_height)
        self.canvas.pack()

        # Load and display the initial image
        self.setup_thumbnail()

        # Bind hover events to canvas
        self.canvas.bind("<Enter>", self.start_video)
        self.canvas.bind("<Leave>", self.stop_video)

    def setup_thumbnail(self):
        """Draw the thumbnail, resized to the video dimensions, on the canvas."""
        self.thumbnail = Image.open(self.thumbnail_source)
        self.thumbnail = self.thumbnail.resize((self.video_width, self.video_height))
        # Keep a reference on self: Tk does not keep the image alive itself.
        self.thumbnail_photo = ImageTk.PhotoImage(self.thumbnail)

        self.canvas.delete("all")
        self.canvas.create_image(0, 0, image=self.thumbnail_photo, anchor=tk.NW)

    def start_video(self, event=None):
        """Open the video and start the frame loop unless already playing."""
        if not self.is_playing:
            self.vid = cv2.VideoCapture(self.video_source)
            self.is_playing = True
            self.update_frame()

    def stop_video(self, event=None):
        """Stop playback, release the capture and show the thumbnail again."""
        self.is_playing = False
        # Cancel the scheduled frame so a quick leave/enter cannot leave two
        # update loops running at once.
        if self._after_id is not None:
            self.window.after_cancel(self._after_id)
            self._after_id = None
        if self.vid is not None:
            self.vid.release()
            self.vid = None
        self.setup_thumbnail()  # Show the thumbnail again

    def update_frame(self):
        """Draw the next frame and schedule the following one."""
        self._after_id = None
        if not self.is_playing or self.vid is None:
            return

        ret, frame = self.vid.read()
        if not ret:
            # End of video: rewind and read again. Calling start_video() here
            # would do nothing because is_playing is still True.
            self.vid.set(cv2.CAP_PROP_POS_FRAMES, 0)
            ret, frame = self.vid.read()
            if not ret:
                # The source cannot be read at all; fall back to the thumbnail
                # instead of retrying forever.
                self.stop_video()
                return

        # Convert frame from BGR to RGB
        frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)

        # Convert frame to PhotoImage (kept on self so it is not collected)
        self.photo = ImageTk.PhotoImage(image=Image.fromarray(frame))

        # Clear canvas and show new frame
        self.canvas.delete("all")
        self.canvas.create_image(0, 0, image=self.photo, anchor=tk.NW)

        # Schedule next update
        self._after_id = self.window.after(self.delay, self.update_frame)

    def __del__(self):
        # getattr guards against __init__ having failed before `vid` was set.
        vid = getattr(self, "vid", None)
        if vid is not None:
            vid.release()

# Create the Tk root window, attach the single-video player to it, and enter
# the Tk event loop (mainloop returns once the window is closed).
root = tk.Tk()
app = VideoPlayer(root)
root.mainloop()


In [21]:
import tkinter as tk
from tkinter import *
import cv2
from PIL import Image, ImageTk

class VideoPlayer:
    """Tkinter player showing several hover-to-play videos side by side.

    Each entry gets its own canvas showing a thumbnail; hovering a canvas
    starts looping playback of that entry's video, and leaving it restores
    the thumbnail.

    Args:
        window: Tk root (or toplevel) window that hosts the canvases.
        sources: Optional iterable of ``(image_path, video_path)`` pairs.
            Defaults to the two luxury-car image/video pairs.
        width: Canvas and frame width in pixels.
        height: Canvas and frame height in pixels.
    """

    def __init__(self, window, sources=None, width=1280 // 2, height=720 // 2):
        self.window = window
        self.window.title("Video Player")
        self.delay = 15  # milliseconds between frame updates
        # Defined before anything that can fail so __del__ is always safe.
        self.videos = []

        if sources is None:
            sources = [
                ("luxury_car.png", "luxury_car.mp4"),
                ("luxury_car2.png", "luxury_car2.mp4"),
            ]

        # Create frame to hold the canvases horizontally
        self.frame = tk.Frame(window)
        self.frame.pack(expand=True, fill="both")

        # Setup each video player
        for index, (image_path, video_path) in enumerate(sources):
            canvas = tk.Canvas(self.frame, width=width, height=height)
            canvas.pack(side=tk.LEFT, padx=10)
            self.videos.append({
                "image": image_path,
                "video": video_path,
                "canvas": canvas,
                "vid": None,
                "is_playing": False,
                "photo": None,
                "thumbnail_photo": None,
                "width": width,
                "height": height,
                "after_id": None,  # pending Tk `after` callback, if any
            })

            # Load and display thumbnail
            self.setup_thumbnail(index)

            # Bind hover events; idx is bound as a default argument so each
            # canvas keeps its own index.
            canvas.bind("<Enter>", lambda e, idx=index: self.start_video(idx))
            canvas.bind("<Leave>", lambda e, idx=index: self.stop_video(idx))

    def setup_thumbnail(self, index):
        """Draw the resized thumbnail of entry ``index`` on its canvas."""
        video = self.videos[index]
        thumbnail = Image.open(video["image"])
        thumbnail = thumbnail.resize((video["width"], video["height"]))
        # Keep a reference in the entry: Tk does not keep the image alive.
        video["thumbnail_photo"] = ImageTk.PhotoImage(thumbnail)

        video["canvas"].delete("all")
        video["canvas"].create_image(0, 0, image=video["thumbnail_photo"], anchor=tk.NW)

    def start_video(self, index):
        """Open the video of entry ``index`` and start its frame loop."""
        video = self.videos[index]
        if not video["is_playing"]:
            video["vid"] = cv2.VideoCapture(video["video"])
            video["is_playing"] = True
            self.update_frame(index)

    def stop_video(self, index):
        """Stop entry ``index``, release its capture and restore the thumbnail."""
        video = self.videos[index]
        video["is_playing"] = False
        # Cancel the scheduled frame so a quick leave/enter cannot leave two
        # update loops running for the same canvas.
        if video["after_id"] is not None:
            self.window.after_cancel(video["after_id"])
            video["after_id"] = None
        if video["vid"] is not None:
            video["vid"].release()
            video["vid"] = None
        self.setup_thumbnail(index)

    def update_frame(self, index):
        """Draw the next frame of entry ``index`` and schedule the following one."""
        video = self.videos[index]
        video["after_id"] = None
        if not video["is_playing"] or video["vid"] is None:
            return

        ret, frame = video["vid"].read()
        if not ret:
            # End of video: rewind and read again. Calling start_video() here
            # would do nothing because is_playing is still True.
            video["vid"].set(cv2.CAP_PROP_POS_FRAMES, 0)
            ret, frame = video["vid"].read()
            if not ret:
                # The source cannot be read at all; fall back to the thumbnail
                # instead of retrying forever.
                self.stop_video(index)
                return

        # Convert frame from BGR to RGB
        frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)

        # Resize frame to the canvas dimensions
        frame = cv2.resize(frame, (video["width"], video["height"]))

        # Convert frame to PhotoImage (kept in the entry so it is not collected)
        video["photo"] = ImageTk.PhotoImage(image=Image.fromarray(frame))

        # Clear canvas and show new frame
        video["canvas"].delete("all")
        video["canvas"].create_image(0, 0, image=video["photo"], anchor=tk.NW)

        # Schedule next update
        video["after_id"] = self.window.after(
            self.delay, lambda: self.update_frame(index)
        )

    def __del__(self):
        # getattr guards against __init__ having failed before `videos` was set.
        for video in getattr(self, "videos", []):
            if video["vid"] is not None:
                video["vid"].release()

# Create the Tk root window, attach the two-video player to it, and enter the
# Tk event loop (mainloop returns once the window is closed).
root = tk.Tk()
app = VideoPlayer(root)
root.mainloop()


2025-01-14 22:03:31.499 python[31610:95310789] _TIPropertyValueIsValid called with 16 on nil context!
2025-01-14 22:03:31.499 python[31610:95310789] imkxpc_getApplicationProperty:reply: called with incorrect property value 16, bailing.
2025-01-14 22:03:31.499 python[31610:95310789] Text input context does not respond to _valueForTIProperty:
