foreach (var wall in _walls)
{
    Debug.WriteLine($"Obstacle Position: {wall.X}, {wall.Y}");
}
<table align="left">
  <td style="text-align: center">
    <a href="https://colab.research.google.com/github/GoogleCloudPlatform/generative-ai/blob/main/gemini/use-cases/intro_multimodal_use_cases.ipynb">
      <img src="https://cloud.google.com/ml-engine/images/colab-logo-32px.png" alt="Google Colaboratory logo"><br> Run in Colab
    </a>
  </td>
  <td style="text-align: center">
    <a href="https://console.cloud.google.com/vertex-ai/colab/import/https:%2F%2Fraw.githubusercontent.com%2FGoogleCloudPlatform%2Fgenerative-ai%2Fmain%2Fgemini%2Fuse-cases%2Fintro_multimodal_use_cases.ipynb">
      <img width="32px" src="https://lh3.googleusercontent.com/JmcxdQi-qOpctIvWKgPtrzZdJJK-J3sWE1RsfjZNwshCFgE_9fULcNpuXYTilIR2hjwN" alt="Google Cloud Colab Enterprise logo"><br> Run in Colab Enterprise
    </a>
  </td>
  <td style="text-align: center">
    <a href="https://console.cloud.google.com/vertex-ai/workbench/deploy-notebook?download_url=https://raw.githubusercontent.com/GoogleCloudPlatform/generative-ai/main/gemini/use-cases/intro_multimodal_use_cases.ipynb">
      <img src="https://lh3.googleusercontent.com/UiNooY4LUgW_oTvpsNhPpQzsstV5W8F7rYgxgGBD85cWJoLmrOzhVs_ksK_vgx40SHs7jCqkTkCk=e14-rj-sc0xffffff-h130-w32" alt="Vertex AI logo"><br> Open in Vertex AI Workbench
    </a>
  </td>
  <td style="text-align: center">
    <a href="https://github.com/GoogleCloudPlatform/generative-ai/blob/main/gemini/use-cases/intro_multimodal_use_cases.ipynb">
      <img src="https://cloud.google.com/ml-engine/images/github-logo-32px.png" alt="GitHub logo"><br> View on GitHub
    </a>
  </td>
</table>


| | |
|-|-|
|Author(s) | [Tannistha Maiti](https://github.com/tannisthamaiti) |

In [17]:
!pip install moviepy



In [18]:


from moviepy.editor import VideoFileClip

def extract_audio(video_file_path, audio_file_path):
    """
    Extracts audio from a video file and saves it as an MP3 file.

    Args:
    video_file_path (str): The path to the video file.
    audio_file_path (str): The path where the audio file will be saved.
    """
    # Load the video file
    video = VideoFileClip(video_file_path)

    # Extract the audio
    audio = video.audio

    # Write the audio to a file
    audio.write_audiofile(audio_file_path, codec='mp3')

    # Close the video and audio to free up resources
    audio.close()
    video.close()

# Usage
video_file_path = '/content/podcast1.mp4'
audio_file_path = '/content/output_audio.mp3'
extract_audio(video_file_path, audio_file_path)


MoviePy - Writing audio in /content/output_audio.mp3


                                                                        

MoviePy - Done.




In [20]:
from moviepy.editor import ImageClip, AudioFileClip, concatenate_videoclips

# Path to your image and audio files
image_path = '/content/LiveAI_podcast.png'
audio_path = '/content/output_audio.mp3'

# Load the audio file
audio_clip = AudioFileClip(audio_path)

# Create a video clip from the image
image_clip = ImageClip(image_path)

# Set the duration of the image clip to the duration of the audio clip
image_clip = image_clip.set_duration(audio_clip.duration)

# Set the audio of the image clip to be the audio clip
video_clip = image_clip.set_audio(audio_clip)

# Optionally, you can write the result to a file
output_path = '/content/output_video.mp4'
video_clip.write_videofile(output_path, codec='libx264', fps=24)

# Close the clips to free up resources
audio_clip.close()
video_clip.close()


Moviepy - Building video /content/output_video.mp4.
MoviePy - Writing audio in output_videoTEMP_MPY_wvf_snd.mp3




MoviePy - Done.
Moviepy - Writing video /content/output_video.mp4





Moviepy - Done !
Moviepy - video ready /content/output_video.mp4


### Define helper functions


In [None]:
import sqlite3
import os
import pandas as pd
import base64
from google.cloud import storage
import vertexai
from vertexai.generative_models import GenerativeModel, Part, SafetySetting


# Initialize database connection and cursor
def initialize_db(db_name):
    conn = sqlite3.connect(db_name)
    cursor = conn.cursor()
    cursor.execute("""
    CREATE TABLE IF NOT EXISTS videos (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        type TEXT NOT NULL,
        data BLOB DEFAULT NULL,
        metadata TEXT DEFAULT NULL,
        description TEXT DEFAULT NULL
    );
    """)
    conn.commit()
    return conn, cursor


# Save raw video data and transcription chunks
def save_video_and_transcription(conn, cursor, video_uri, video_data, metadata, transcription_chunks):
    try:
        # Save raw video data
        cursor.execute(
            """
            INSERT INTO videos (type, data, metadata, description)
            VALUES (?, ?, ?, ?)
            """,
            ("video/mp4", video_data, metadata, f"Raw video data for {video_uri}")
        )
        video_id = cursor.lastrowid  # Get ID of the last inserted video

        # Save transcription chunks
        for i, chunk in enumerate(transcription_chunks):
            cursor.execute(
                """
                INSERT INTO videos (type, data, metadata, description)
                VALUES (?, ?, ?, ?)
                """,
                ("text/plain", None, metadata, f"Transcription Chunk {i + 1} for Video {video_id}: {chunk}")
            )

        conn.commit()
        print(f"Saved video and transcription for {video_uri}")
    except Exception as e:
        print(f"Failed to save video or transcription: {e}")


# Transcribe videos and save to database
def transcribe_videos_and_save_to_db(video_uris, max_db_size_mb=100, file_prefix="video_db", db_index = 0,chunk_size=1024):
    vertexai.init(project="gen-lang-client-0676335328", location="us-central1")
    model = GenerativeModel("gemini-1.5-flash-002")

    generation_config = {
        "max_output_tokens": 8192,
        "temperature": 1,
        "top_p": 0.95,
    }

    safety_settings = [
        SafetySetting(
            category=SafetySetting.HarmCategory.HARM_CATEGORY_HATE_SPEECH,
            threshold=SafetySetting.HarmBlockThreshold.OFF,
        ),
        SafetySetting(
            category=SafetySetting.HarmCategory.HARM_CATEGORY_DANGEROUS_CONTENT,
            threshold=SafetySetting.HarmBlockThreshold.OFF,
        ),
        SafetySetting(
            category=SafetySetting.HarmCategory.HARM_CATEGORY_SEXUALLY_EXPLICIT,
            threshold=SafetySetting.HarmBlockThreshold.OFF,
        ),
        SafetySetting(
            category=SafetySetting.HarmCategory.HARM_CATEGORY_HARASSMENT,
            threshold=SafetySetting.HarmBlockThreshold.OFF,
        ),
    ]
    db_files = []

    db_name = f"{file_prefix}_{db_index}.db"
    conn, cursor = initialize_db(db_name)
    metadata_df = pd.read_csv("/content/missing_metadata.csv",  encoding="latin1")
    for video_uri in video_uris:
        print(f"Processing video: {video_uri}")
        # Create a Part object with the video URI
        video_part = Part.from_uri(mime_type="video/mp4", uri=video_uri)

        # Download video locally
        local_video_path = download_video(video_uri)
        if not local_video_path or not os.path.exists(local_video_path):
            print(f"Failed to download video: {video_uri}")
            continue


        local_file_name = local_video_path.split("/")[-1]

        metadata = find_metadata(metadata_df, local_file_name)
        if metadata is None:
            print(f"No metadata found for {local_file_name}. Skipping video.")
            continue
        print(f"Metadata found: {metadata}")

        # Read video data
        video_data = read_video_file(local_video_path)
        if not video_data:
            os.remove(local_video_path)
            continue

        # Generate transcription

        responses = model.generate_content(
            ["Transcribe the video", video_part],
            generation_config=generation_config,
            safety_settings=safety_settings,
            stream=True,
        )

        transcription = "".join([response.text for response in responses if hasattr(response, "text")])
        transcription_chunks = chunk_data(transcription, chunk_size)

        # Save video data and transcription
        metadata = metadata
        save_video_and_transcription(conn, cursor, video_uri, video_data, metadata, transcription_chunks)

        # Delete local video
        os.remove(local_video_path)
        print(f"Deleted local video: {local_video_path}")

        # Check DB size
        db_size_mb = os.path.getsize(db_name) / (1024 * 1024)
        if db_size_mb >= max_db_size_mb:
            db_files.append(db_name)
            conn.close()
            print(f"Database '{db_name}' reached size limit. Creating a new database.")
            db_index += 1
            db_name = f"{file_prefix}_{db_index}.db"
            conn, cursor = initialize_db(db_name)

    print(db_files)
    conn.close()
      # Add the last DB file
    print(f"All transcriptions saved across {db_index} database file(s).")
    upload_files_to_s3("liveaidb", db_files)


# Helper functions for file handling and chunking
def chunk_data(data, chunk_size):
    words = data.split()
    return [" ".join(words[i:i + chunk_size]) for i in range(0, len(words), chunk_size)]


def read_video_file(file_path):
    try:
        with open(file_path, "rb") as file:
            return file.read()
    except Exception as e:
        print(f"Error reading file {file_path}: {e}")
        return None


def download_video(video_uri):
    if video_uri.startswith("gs://"):
        client = storage.Client()
        bucket_name, blob_name = video_uri[5:].split("/", 1)
        bucket = client.bucket(bucket_name)
        blob = bucket.blob(blob_name)
        local_path = os.path.basename(blob_name)
        blob.download_to_filename(local_path)
        print(f"Downloaded video: {video_uri} to {local_path}")
        return local_path
    else:
        print(f"Unsupported URI format: {video_uri}")
        return None
def list_video_files_by_pattern(bucket_name, pattern="temp", file_extension=".mp4"):
    """
    List video files from a Google Cloud Storage bucket matching a pattern.

    Args:
        bucket_name (str): Name of the GCS bucket.
        pattern (str): Pattern to match filenames (e.g., "temp").
        file_extension (str): File extension to filter files (e.g., ".mp4").

    Returns:
        list: List of video file URIs.
    """
    client = storage.Client()
    bucket = client.get_bucket(bucket_name)
    blobs = bucket.list_blobs()
    video_files = [
        f"gs://{bucket_name}/{blob.name}"
        for blob in blobs
        if blob.name.startswith(pattern) and blob.name.endswith(file_extension)
    ]
    return video_files
# Upload database files to S3
def upload_files_to_s3(bucket_name, files):
    """
    Upload multiple files to an S3 bucket.

    Args:
        bucket_name (str): Name of the S3 bucket.
        files (list): List of local file paths to upload.
    """
    s3 = boto3.client(
    's3',
    aws_access_key_id=aws_access_key,
    aws_secret_access_key=aws_secret_key,
    region_name=region_name
)
    for file_path in files:
        try:
            s3.upload_file(file_path, bucket_name, os.path.basename(file_path))
            print(f"Uploaded '{file_path}' to S3 bucket '{bucket_name}'.")
            os.remove(file_path)  # Delete the file locally after upload
        except Exception as e:
            print(f"Failed to upload '{file_path}': {e}")

def find_metadata(data, local_file_name):
    """
    Find metadata for a given local file name.

    Args:
        data (DataFrame): The metadata DataFrame.
        local_file_name (str): The local file name to find metadata for.

    Returns:
        str: The metadata associated with the local file name, or None if not found.
    """
    result = data[data['local_file_name'] == local_file_name]
    if not result.empty:
        return result.iloc[0]['metadata']
    else:
        return None


# Main script to dynamically list files and transcribe
bucket_name = "liveai"  # Replace with your GCS bucket name
video_files = list_video_files_by_pattern(bucket_name, pattern="converted_videos/temp", file_extension=".mp4")

if not video_files:
    print("No video files found matching the pattern in the bucket.")
else:
    print(f"Found {len(video_files)} video files. Starting transcription...")
    transcribe_videos_and_save_to_db(video_files,db_index = 7)


Found 89 video files. Starting transcription...
Processing video: gs://liveai/converted_videos/temp1.mp4
Downloaded video: gs://liveai/converted_videos/temp1.mp4 to temp1.mp4
No metadata found for temp1.mp4. Skipping video.
Processing video: gs://liveai/converted_videos/temp10.mp4
Downloaded video: gs://liveai/converted_videos/temp10.mp4 to temp10.mp4
No metadata found for temp10.mp4. Skipping video.
Processing video: gs://liveai/converted_videos/temp11.mp4
Downloaded video: gs://liveai/converted_videos/temp11.mp4 to temp11.mp4
No metadata found for temp11.mp4. Skipping video.
Processing video: gs://liveai/converted_videos/temp12.mp4
Downloaded video: gs://liveai/converted_videos/temp12.mp4 to temp12.mp4
No metadata found for temp12.mp4. Skipping video.
Processing video: gs://liveai/converted_videos/temp14.mp4
Downloaded video: gs://liveai/converted_videos/temp14.mp4 to temp14.mp4
No metadata found for temp14.mp4. Skipping video.
Processing video: gs://liveai/converted_videos/temp15.mp

In [None]:
# Restart kernel after installs so that your environment can access the new packages
import IPython

app = IPython.Application.instance()
app.kernel.do_shutdown(True)

{'status': 'ok', 'restart': True}