In [None]:
!pip install opencv-python
!pip install rembg
!pip install Pillow
!pip install boto3
!pip install moviepy
!pip install requests
!pip install gradio

In [None]:
import os
import cv2
import numpy as np
import boto3
import requests
import json
import random
from PIL import Image
from rembg import remove
from moviepy import VideoFileClip
from moviepy.video.fx.Crop import Crop
import moviepy.video.VideoClip as ImageClip
import moviepy.video.compositing.CompositeVideoClip as CompositeVideoClip
import tempfile
import time
import uuid
from IPython.display import display, Image as IPImage

# S3 bucket settings
s3_bucket = os.environ.get("S3_BUCKET", "generative-banners")
s3_input_banner_prefix = 'input/banners/'
s3_input_video_prefix = 'input/videos/'
s3_output_prefix = 'output/animated_banners/'
s3_region = os.environ.get("AWS_REGION", "ap-northeast-1")
s3 = boto3.client('s3', region_name=s3_region)


# 1. Function to remove background from ad banner
def remove_background(image_path, use_manual_transparency=False):
    """
    Remove background from an image
    
    Parameters:
    image_path (str): Path to the image
    use_manual_transparency (bool): If True, use pre-uploaded transparent image from S3
    
    Returns:
    str: Path to the image with transparent background
    """
    if use_manual_transparency:
        # Use pre-uploaded transparent image from S3
        filename = os.path.basename(image_path)
        base_name, ext = os.path.splitext(filename)
        transparent_filename = f"{base_name}_transparent.png"
        
        # Path for downloading the transparent image
        transparent_path = os.path.join(os.path.dirname(image_path), transparent_filename)
        print(transparent_filename, transparent_path)
        
        try:
            # Download transparent image from S3
            s3.download_file(s3_bucket, f"{s3_input_banner_prefix}transparent/{transparent_filename}", transparent_path)
            print(f"Using transparent image from S3: {transparent_path}")
            return transparent_path
        except Exception as e:
            print(f"Error downloading transparent image: {str(e)}")
            print("Falling back to automatic transparency processing")
    
    print(f"Starting automatic background removal: {image_path}")
    
    # Load the image
    input_image = Image.open(image_path)
    
    # Remove background using rembg
    output_image = remove(input_image)
    
    # Path to save the transparent image
    transparent_path = image_path.replace('.', '_transparent.')
    output_image.save(transparent_path, 'PNG')
    
    print(f"Background removal completed: {transparent_path}")
    return transparent_path

# 2. Function to extract frames from video
def extract_frames(video_path, max_frames=30, target_aspect_ratio=None):
    """
    Extract frames from a video and adjust aspect ratio if needed
    
    Parameters:
    video_path (str): Path to the video file
    max_frames (int): Maximum number of frames to extract
    target_aspect_ratio (str, optional): Target aspect ratio (e.g., "16:9", "1:1")
    
    Returns:
    tuple: (frames, frame_times, fps)
    """
    print(f"Starting frame extraction from video: {video_path}")

    video = VideoFileClip(video_path)
    
    # Get video duration and frame rate
    duration = video.duration
    fps = video.fps
    
    # Check if aspect ratio adjustment is needed
    if target_aspect_ratio:
        # Original video size
        original_width, original_height = video.size
        original_aspect = original_width / original_height
        
        # Calculate target aspect ratio
        target_w, target_h = map(int, target_aspect_ratio.split(':'))
        target_aspect = target_w / target_h
        
        # If aspect ratios differ, crop the video
        if abs(original_aspect - target_aspect) > 0.01:  # Allow small margin of error
            print(f"Adjusting aspect ratio: {original_width}x{original_height} ({original_aspect:.2f}) → {target_aspect:.2f}")
            
            if original_aspect > target_aspect:
                # If too wide, crop left and right
                new_width = int(original_height * target_aspect)
                x1 = (original_width - new_width) // 2
                # Use Crop effect
                crop_effect = Crop(x1=x1, y1=0, width=new_width, height=original_height)
                video = crop_effect.apply(video)
            else:
                # If too tall, crop top and bottom
                new_height = int(original_width / target_aspect)
                y1 = (original_height - new_height) // 2
                # Use Crop effect
                crop_effect = Crop(x1=0, y1=y1, width=original_width, height=new_height)
                video = crop_effect.apply(video)
            
            print(f"Size after cropping: {video.size[0]}x{video.size[1]}")
    
    # Adjust the number of frames to extract
    if max_frames > duration * fps:
        max_frames = int(duration * fps)
    
    # Calculate time interval between frames
    interval = duration / max_frames
    
    frames = []
    frame_times = []
    
    # Extract frames
    for i in range(max_frames):
        t = i * interval
        frame_times.append(t)
        frame = video.get_frame(t)
        frames.append(frame)
    
    print(f"Frame extraction completed: {len(frames)} frames")
    return frames, frame_times, fps

def composite_banner_with_frame(banner_path, frame, output_path):
    # Load banner
    banner = Image.open(banner_path).convert("RGBA")
    banner_width, banner_height = banner.size
    
    # Convert frame to PIL format
    frame_pil = Image.fromarray(frame).convert("RGBA")
    
    # Resize frame to banner size while maintaining aspect ratio and cropping center
    # This will crop parts of the frame but keep the banner intact
    frame_width, frame_height = frame_pil.size
    frame_aspect = frame_width / frame_height
    banner_aspect = banner_width / banner_height
    
    if frame_aspect > banner_aspect:
        # Frame is wider, match height and crop width
        new_height = banner_height
        new_width = int(new_height * frame_aspect)
        frame_pil = frame_pil.resize((new_width, new_height), Image.LANCZOS)
        left = (new_width - banner_width) // 2
        frame_pil = frame_pil.crop((left, 0, left + banner_width, banner_height))
    else:
        # Frame is taller, match width and crop height
        new_width = banner_width
        new_height = int(new_width / frame_aspect)
        frame_pil = frame_pil.resize((new_width, new_height), Image.LANCZOS)
        top = (new_height - banner_height) // 2
        frame_pil = frame_pil.crop((0, top, banner_width, top + banner_height))
    
    # Overlay banner
    result = frame_pil.copy()
    result.paste(banner, (0, 0), banner)
    
    result.convert("RGB").save(output_path, "JPEG")
    return output_path

# 4. Function to create GIF from composite frames
def create_gif_from_frames(frame_paths, output_gif_path, fps):
    """Create GIF from multiple frames"""
    print(f"Starting GIF creation: {output_gif_path}")
    
    frames = [Image.open(frame_path) for frame_path in frame_paths]
    
    # Save GIF (duration in milliseconds)
    frames[0].save(
        output_gif_path,
        format='GIF',
        append_images=frames[1:],
        save_all=True,
        duration=int(1000 / fps),  # Convert frame rate to milliseconds
        loop=0  # 0 = infinite loop
    )
    
    print(f"GIF creation completed: {output_gif_path}")
    return output_gif_path

# Function to generate video using Amazon Nova Reel API
def generate_nova_reel_video(prompt, duration=6):
    """
    Generate video using Amazon Nova Reel (Amazon Bedrock)
    
    Parameters:
    prompt (str): Text description of the video to generate
    duration (int): Duration of the video in seconds, max 6 seconds
    
    Returns:
    str: Local path to the generated video
    """
    print(f"Starting Amazon Nova Reel video generation: prompt='{prompt}'")
    
    # Create temporary directory
    temp_dir = tempfile.mkdtemp()
    output_video_path = os.path.join(temp_dir, f"nova_reel_{uuid.uuid4()}.mp4")
    
    try:
        # Create Bedrock Runtime client
        bedrock_runtime = boto3.client("bedrock-runtime", region_name="ap-northeast-1")
        
        # Set up temporary S3 output destination
        temp_output_s3_uri = f"s3://{s3_bucket}/{s3_output_prefix}nova_output/"
        print(temp_output_s3_uri)
        
        dimension = "1280x720"  # Default
        
        # Generate random seed value
        seed = random.randint(0, 2147483646)
        
        # Model input parameters
        model_input = {
            "taskType": "TEXT_VIDEO",
            "textToVideoParams": {"text": prompt},
            "videoGenerationConfig": {
                "fps": 24,
                "durationSeconds": min(duration, 6),  # Max 6 seconds
                "dimension": dimension,
                "seed": seed,
            },
        }
        
        # Output configuration
        output_config = {"s3OutputDataConfig": {"s3Uri": temp_output_s3_uri}}
        
        print("Starting Nova Reel asynchronous job...")
        
        # Asynchronous model invocation
        response = bedrock_runtime.start_async_invoke(
            modelId="amazon.nova-reel-v1:0",
            modelInput=model_input,
            outputDataConfig=output_config
        )
        
        invocation_arn = response["invocationArn"]
        print(f"Job ARN: {invocation_arn}")
        
        # Poll for job completion
        max_attempts = 20
        attempt = 0
        
        while attempt < max_attempts:
            print(f"Checking job status... (attempt {attempt+1}/{max_attempts})")
            job = bedrock_runtime.get_async_invoke(invocationArn=invocation_arn)
            status = job["status"]

            if status == "Completed":
                # If completed, get S3 URI from job
                output_s3_uri = job["outputDataConfig"]["s3OutputDataConfig"]["s3Uri"]
                print(f"Generation completed. S3 URI: {output_s3_uri}")
                
                # Search for files in output directory
                s3_parts = output_s3_uri.replace("s3://", "").split("/")
                bucket_name = s3_parts[0]
                prefix = "/".join(s3_parts[1:])
                
                # Get S3 object list
                response = s3.list_objects_v2(Bucket=bucket_name, Prefix=prefix)
                
                # Find output.mp4
                mp4_key = None
                for obj in response.get('Contents', []):
                    if obj['Key'].endswith('output.mp4'):
                        mp4_key = obj['Key']
                        break
                
                if mp4_key:
                    print(f"Found video file: {mp4_key}")
                    # Download video from S3
                    s3.download_file(bucket_name, mp4_key, output_video_path)
                    print(f"Video downloaded: {output_video_path}")
                    return output_video_path
                else:
                    print("Output video not found")
                    raise Exception("Output video not found")
                
            elif status == "Failed":
                error_msg = job.get("failureMessage", "Unknown error")
                print(f"Video generation failed: {error_msg}")
                raise Exception(f"Nova Reel generation error: {error_msg}")
                
            else:
                print(f"Processing... waiting 30 seconds")
                time.sleep(30)
                attempt += 1
        
        raise Exception("Nova Reel generation timed out")
        
    except Exception as e:
        print(f"Nova Reel video generation error: {str(e)}")
        print("Fallback: Using sample video")
        
        # Use sample video in case of error
        demo_video_path = os.path.join(temp_dir, "demo_video.mp4")
        s3.download_file(s3_bucket, f"{s3_input_video_prefix}demo_video.mp4", demo_video_path)
        return demo_video_path

# 5. Complete workflow execution function
def create_animated_banner(banner_file, video_prompt=None, video_file=None, output_name=None, max_frames=20, use_manual_transparency=False, aspect_ratio="16:9"):
    """Complete workflow to create animated GIF banner from ad banner and video"""
    start_time = time.time()
    
    if not output_name:
        output_name = f"animated_banner_{int(time.time())}"
    
    print(f"Starting process: banner={banner_file}")
    
    # Create temporary directory
    temp_dir = tempfile.mkdtemp()
    
    # Download banner file
    local_banner_path = os.path.join(temp_dir, os.path.basename(banner_file))
    print(local_banner_path)
    s3.download_file(s3_bucket, f"{s3_input_banner_prefix}{banner_file}", local_banner_path)
    
    # Get video (Generate with Nova Reel or download from S3)
    if video_prompt:
        # Generate video with Nova Reel
        local_video_path = generate_nova_reel_video(video_prompt)
        print(f"Using video generated by Nova Reel: {local_video_path}")
    elif video_file:
        # Download existing video from S3
        local_video_path = os.path.join(temp_dir, os.path.basename(video_file))
        s3.download_file(s3_bucket, f"{s3_input_video_prefix}{video_file}", local_video_path)
        print(f"Using video downloaded from S3: {local_video_path}")
    else:
        raise ValueError("Please specify either video_prompt or video_file")
    
    # Get banner size (for aspect ratio calculation)
    banner_image = Image.open(local_banner_path)
    banner_width, banner_height = banner_image.size
    banner_aspect_ratio = f"{banner_width}:{banner_height}"
    print(f"Banner size: {banner_width}x{banner_height} (aspect ratio: {banner_aspect_ratio})")
    
    # 1. Make banner background transparent
    transparent_banner_path = remove_background(local_banner_path, use_manual_transparency)
    
    # 2. Extract frames from video (matching banner aspect ratio)
    frames, frame_times, fps = extract_frames(local_video_path, max_frames, target_aspect_ratio=banner_aspect_ratio)
    
    # 3. Composite each frame with banner
    composite_frames = []
    for i, frame in enumerate(frames):
        output_frame_path = os.path.join(temp_dir, f"composite_frame_{i:03d}.jpg")
        composite_path = composite_banner_with_frame(transparent_banner_path, frame, output_frame_path)
        composite_frames.append(composite_path)
    
    # 4. Create GIF from composite frames
    output_gif_path = os.path.join(temp_dir, f"{output_name}.gif")
    gif_path = create_gif_from_frames(composite_frames, output_gif_path, fps / 2)  # Slightly reduce frame rate
    
    # 5. Upload result to S3
    s3_output_key = f"{s3_output_prefix}{output_name}.gif"
    s3.upload_file(gif_path, s3_bucket, s3_output_key)
    
    # Display result
    display(IPImage(filename=gif_path))
    
    print(f"Process completed: Time taken {time.time() - start_time:.2f} seconds")
    print(f"Output GIF: s3://{s3_bucket}/{s3_output_key}")
    
    return gif_path, s3_output_key

In [None]:
# 方法1: 既存のNova Reel動画を使用
banner_file = "A_1200_1200_500_without_bg.png"
video_file = "A_1200_1200_500_without_bg_office_animated.mp4"
output_name = "animated_banner_existing_video"

# 実行（既存動画使用）
animated_banner_s3_key = create_animated_banner(
    banner_file=banner_file, 
    video_file=video_file, 
    output_name=output_name,
    use_manual_transparency=True  # 自動透過を使用
)

In [None]:
# 方法2: Nova Reel APIで動画を生成
banner_file = "Big-sale-banner-design-template-on-transparent-background-PNG.png"
# video_prompt = "A seamless loop of glowing neon lights forming a tunnel in a futuristic cyberpunk city, vivid pinks, blues, and purples, flying particles, digital haze, sci-fi ambiance" # 日本語プロンプトも可能
# video_prompt = "A chaotic theme park filled with flying notification icons, spinning email wheels, and roller coasters shaped like social media graphs, vivid cartoon style, loopable"
video_prompt = "A radiant golden light background, with shimmering beams of light radiating outward from the center, glowing and elegant, soft gradients, luxurious and ethereal atmosphere, seamless and loopable"
output_name = "animated_banner_generated_video"

# 実行（Nova Reel動画生成）
animated_banner_s3_key = create_animated_banner(
    banner_file=banner_file, 
    video_prompt=video_prompt, 
    output_name=output_name,
    use_manual_transparency=True,  # 手動で作成した透過画像を使用
)

In [None]:
import gradio as gr

def gradio_workflow(banner_image, video_prompt, use_manual_transparency, status_box):
    try:
        status_box.append("🔁 Uploading banner image...")
        temp_banner_path = "/tmp/gradio_banner.png"
        banner_image.save(temp_banner_path)

        banner_file_key = "gradio_uploaded_banner.png"
        s3.upload_file(temp_banner_path, s3_bucket, f"{s3_input_banner_prefix}{banner_file_key}")

        status_box.append("✨ Creating animated banner...")

        # 🚨 修正済み: create_animated_banner は (gif_path, s3_key) を返す前提
        gif_path, gif_s3_key = create_animated_banner(
            banner_file=banner_file_key,
            video_prompt=video_prompt,
            output_name="gradio_generated_banner",
            use_manual_transparency=use_manual_transparency,
        )

        # 🚨 修正済み: S3 URLは表示用のみ
        gif_url = f"https://{s3_bucket}.s3.ap-northeast-1.amazonaws.com/{gif_s3_key}"
        status_box.append("✅ Done!")

        # ✅ Gradioの画像出力にはローカルパス（type="filepath"）
        return gif_path, gif_url
    except Exception as e:
        status_box.append(f"❌ Error: {str(e)}")
        return None, None

with gr.Blocks() as demo:
    gr.Markdown("## 🎬 Create Animated Banner with Amazon Nova Reel")

    with gr.Row():
        with gr.Column():
            banner_input = gr.Image(type="pil", label="📌 Upload Banner Image")
            prompt_input = gr.Textbox(label="🎨 Video Prompt", placeholder="e.g. Radiant golden light...")
            transparency_checkbox = gr.Checkbox(label="Use Manual Transparency", value=True)
            run_button = gr.Button("🚀 Generate")

        with gr.Column():
            status_output = gr.Textbox(label="Progress Log", lines=8)
            gif_output = gr.Image(type="filepath", label="🎞️ Generated GIF Preview")  # ✅ ローカルパスを期待
            link_output = gr.Textbox(label="🔗 S3 Output URL")  # ✅ 表示用のURL

    # 状態更新の仕組みを使う
    status_box = gr.State([])

    def append_status(msg, history):
        history.append(msg)
        return "\n".join(history), history

    def run_and_display(banner_image, video_prompt, use_manual_transparency, history):
        status_text, history = append_status("🟢 Starting process...", history)
        gif_path, gif_url = gradio_workflow(banner_image, video_prompt, use_manual_transparency, history)
        status_text = "\n".join(history)
        return gif_path, gif_url, status_text, history

    run_button.click(
        fn=run_and_display,
        inputs=[banner_input, prompt_input, transparency_checkbox, status_box],
        outputs=[gif_output, link_output, status_output, status_box]
    )

demo.launch(share=True)