<a href="https://colab.research.google.com/github/jimwhite/commentator_ai/blob/main/Transcript_to_Video.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Visualizer.TV: "MP3 to MTV" Demo

 * Concept: Robert Sloan (https://www.linkedin.com/in/sloanrobert/)
 * Code: Jim White (https://www.linkedin.com/in/jamespaulwhite/)
 * License: GPL v3 (https://github.com/jimwhite/commentator_ai/blob/main/LICENSE)

In [1]:
#@title Install dependencies
# Quietly (-q) installs or upgrades (-U) the third-party packages used by the cells below.
# NOTE(review): no versions are pinned, so a fresh run may pull newer releases than the
# ones this notebook was written against — consider pinning once known-good versions are confirmed.
%pip install -qU stability-sdk youtube-transcript-api langchain openai opencv-python yt-dlp ffmpeg-python

In [2]:
#@title Set up Google Drive for file storage
import os

# Prefer a Google Drive folder when running in Colab. If Colab is not available,
# or mounting / creating the folder fails, fall back to the current directory.
try:
    from google.colab import drive
    drive.mount('/content/gdrive')
    outputs_path = "/content/gdrive/MyDrive/Commentator_AI/Transcript_to_Video"
    os.makedirs(outputs_path, exist_ok=True)
except Exception as exc:
    # `except Exception` (instead of a bare `except:`) keeps the best-effort
    # fallback but lets KeyboardInterrupt / SystemExit propagate, and the
    # message records why the fallback was taken.
    print(f"Google Drive not available ({exc!r}); using the current directory")
    outputs_path = "."
print(f"Files will be saved to {outputs_path}")

Drive already mounted at /content/gdrive; to attempt to forcibly remount, call drive.mount("/content/gdrive", force_remount=True).
Files will be saved to /content/gdrive/MyDrive/Commentator_AI/Transcript_to_Video


In [5]:
#@title Set YouTube URL

# Colab form field: the YouTube video this notebook works on. The video ID is
# parsed from it below, and the same URL is handed to yt-dlp to fetch the audio.
YOUTUBE_URL = "https://www.youtube.com/watch?v=vPKp29Luryc" #@param {type:"string"}

import os
import re
# import requests
# from bs4 import BeautifulSoup

def ytIdFromURL(url:str)->"str | None":
   """Extract the 11-character YouTube video ID from a URL.

   The ID is the first run of exactly 11 ID characters (letters, digits,
   ``_`` or ``-``) that directly follows ``v=`` or ``/``.

   Args:
       url: A YouTube URL such as ``https://www.youtube.com/watch?v=<id>`` or
           ``https://youtu.be/<id>``.

   Returns:
       The video ID, or None when ``url`` is empty/None or contains no ID.
   """
   if not url:
       return None
   # The negative lookahead requires the token to be exactly 11 characters long.
   # Without it, the first 11 characters of any longer token would be accepted,
   # e.g. "youtube-noc" from the host name "youtube-nocookie.com".
   match = re.search(r"(?:v=|\/)([0-9A-Za-z_-]{11})(?![0-9A-Za-z_-])", url)
   return match.group(1) if match else None

# def ytTitleFromURL(url:str)->str:
#   r = requests.get(url)
#   soup = BeautifulSoup(r.text, features="lxml")

#   link = soup.find_all(name="title")[0]
#   artist, title = get_artist_title(link.text)
#   print(link.text)
#   return title

# Parse the video ID out of the configured URL.
video_id = ytIdFromURL(YOUTUBE_URL)
# video_title = ytTitleFromURL(YOUTUBE_URL)
# video_title = "Star Spangled Banner"
# print(f'YouTube video ID = {video_id} title: {video_title}')

if video_id:
  # Every artifact for this video (audio, transcript, images, video) lives
  # in a folder named after its ID.
  out_dir = os.path.join(outputs_path, video_id)
else:
  raise ValueError("video_id isn't set")

# Show the parsed ID as the cell result.
video_id

'vPKp29Luryc'

In [6]:
#@title Get the Audio
import os
import subprocess

# The downloaded audio is cached next to the other per-video artifacts.
audio_file_path = os.path.join(out_dir, 'audio.m4a')

if os.path.exists(audio_file_path):
  print('Audio already downloaded')
else:
  # Run yt-dlp with an argument list instead of a shell line, so characters in
  # the form-field URL (quotes, `$`, backticks) are never interpreted by a shell.
  # `--` stops a URL that starts with '-' from being parsed as an option.
  download = subprocess.run(
      ['yt-dlp', '-f', 'bestaudio[ext=m4a]', '-o', audio_file_path, '--', YOUTUBE_URL],
      capture_output=True, text=True,
  )
  print(download.stdout)
  # Fail here with yt-dlp's own error output rather than later, when the
  # missing audio file is first needed.
  if download.returncode != 0:
    raise RuntimeError(f'yt-dlp failed (exit code {download.returncode}):\n{download.stderr}')

Audio already downloaded


In [7]:
#@title Get Video Transcript (CSV)

# Using CSV files is a convenient way to integrate with LangChain (and LangFlow).
# It is also much more efficient in token usage, so longer transcripts will fit
# within any given LLM context token limit.

import csv
from youtube_transcript_api import YouTubeTranscriptApi

transcript = []

transcript_file_path = os.path.join(out_dir, 'transcript.csv')
fieldnames = ['start', 'duration', 'text']

# Reuse a previously saved transcript when one exists.
if os.path.exists(transcript_file_path):
  # newline='' is what the csv module expects for files it reads, so quoted
  # fields containing line breaks are parsed correctly.
  with open(transcript_file_path, 'r', newline='') as csv_file:
    reader = csv.DictReader(csv_file, fieldnames=fieldnames, quoting=csv.QUOTE_NONNUMERIC)
    # Skip the header row; the default keeps an empty file from raising
    # StopIteration, so the cell falls through to the API fetch instead.
    next(reader, None)
    transcript.extend(reader)
  if transcript:
    print(f'Read transcript from file: {transcript_file_path}')

# No cached rows: fetch from YouTube and cache them for the next run.
if not transcript:
  transcript = YouTubeTranscriptApi.get_transcript(video_id)
  print('Got transcript from YouTube API')
  os.makedirs(out_dir, exist_ok=True)
  with open(transcript_file_path, 'w', newline='') as csv_file:
    # QUOTE_NONNUMERIC quotes the text column and leaves start/duration bare,
    # which lets the reader above convert them back to floats.
    writer = csv.DictWriter(csv_file, fieldnames=fieldnames, quoting=csv.QUOTE_NONNUMERIC)
    writer.writeheader()
    writer.writerows(transcript)
  print(f'Saved transcript to file: {transcript_file_path}')

transcript

Read transcript from file: /content/gdrive/MyDrive/Commentator_AI/Transcript_to_Video/vPKp29Luryc/transcript.csv


[{'start': 7.407, 'duration': 4.071, 'text': 'O say can you see,'},
 {'start': 11.478, 'duration': 3.737, 'text': "by the dawn's early light,"},
 {'start': 15.215, 'duration': 3.837, 'text': 'What so proudly we hailed'},
 {'start': 19.052,
  'duration': 3.937,
  'text': "at the twilight's last gleaming?"},
 {'start': 22.989,
  'duration': 3.937,
  'text': 'Whose broad stripes and bright stars,'},
 {'start': 26.926, 'duration': 3.537, 'text': 'through the perilous fight,'},
 {'start': 30.463, 'duration': 3.737, 'text': "O'er the ramparts we watched,"},
 {'start': 34.2, 'duration': 4.071, 'text': 'were so gallantly streaming?'},
 {'start': 38.271, 'duration': 3.837, 'text': 'And the rockets red glare,'},
 {'start': 42.108, 'duration': 3.837, 'text': 'the bombs bursting in air,'},
 {'start': 45.945, 'duration': 3.737, 'text': 'Gave proof through the night'},
 {'start': 49.682,
  'duration': 3.871,
  'text': 'that our flag was still there.'},
 {'start': 53.553, 'duration': 2.603, 'text': '

In [8]:
#@title Get OpenAI API key
from getpass import getpass

# Prompt only when the environment does not already carry a key; an empty
# answer leaves the environment untouched.
if os.environ.get('OPENAI_API_KEY') is None:
  key = getpass('Enter your OpenAI API key: ')
  if key:
    os.environ.update({'OPENAI_API_KEY': key})


Enter your OpenAI API key: ··········


In [9]:
%%writefile prompt.txt
You're a visual musical artist.
Given the following lyrics choose the phrases that should be illustrated to make a timed music video for this song.
Respond in CSV format with the columns 'start', 'duration', 'text' (for the transcription text), 'description' (for the image description).
For the first row start at time 0 and make an image description that reflects the song's theme.
Keep in mind that each image description will be rendered separately so don't use any references between them.
Also because the image rendering is done in isolation for each description please be sure to include
enough thematic keys in them so the images are holistically related to the song's theme.
=== lyrics ===

Overwriting prompt.txt


In [10]:
#@title ChatGPT selects lyrics to illustrate and generates image descriptions

CHAT_MODEL = 'gpt-4-0613'  #@param {type:"string"}
TEMPERATURE = 0.5  #@param {type:"number"}

from langchain.chat_models import ChatOpenAI
from langchain.schema import HumanMessage, SystemMessage

out_dir = os.path.join(outputs_path, video_id)
image_description_file_path = os.path.join(out_dir, 'image_descriptions.csv')
fieldnames = ['start', 'duration', 'text', 'description']

# Start from the cached model response when one has already been saved.
image_description_csv_text = None
if os.path.exists(image_description_file_path):
  with open(image_description_file_path, 'r') as csv_file:
    image_description_csv_text = csv_file.read()
  print(f'Read image descriptions from file: {image_description_file_path}')
  print(image_description_csv_text)

# Nothing cached (or the cache is empty): ask the chat model and save its reply.
if not image_description_csv_text:
  chat = ChatOpenAI(temperature=TEMPERATURE, model=CHAT_MODEL)
  print('ChatGPT working...')
  # The request is the instruction text from prompt.txt, a blank line, then
  # the transcript CSV exactly as it is stored on disk.
  with open('prompt.txt', 'r') as prompt_file:
    instructions = prompt_file.read()
  with open(transcript_file_path, 'r') as csv_file:
    lyrics_csv = csv_file.read()
  prompt_text = instructions + '\n\n' + lyrics_csv
  response = chat([HumanMessage(content=prompt_text)])
  print('Got image descriptions from ChatGPT')
  image_description_csv_text = response.content
  print(image_description_csv_text)
  os.makedirs(out_dir, exist_ok=True)
  with open(image_description_file_path, 'w', newline='') as csv_file:
    csv_file.write(image_description_csv_text)
    print(f'Saved image descriptions to file: {image_description_file_path}')


Read image descriptions from file: /content/gdrive/MyDrive/Commentator_AI/Transcript_to_Video/vPKp29Luryc/image_descriptions.csv
"start","duration","text","description"
0,7.407,"","A patriotic scene with an American flag waving in the wind, set against a backdrop of a beautiful dawn sky."
7.407,4.071,"O say can you see,","A person standing on a hill, looking out over a vast landscape, with the dawn light illuminating the scene."
11.478,3.737,"by the dawn's early light,","The first light of dawn breaking over the horizon, casting a warm glow on the American flag."
15.215,3.837,"What so proudly we hailed","A group of people saluting the American flag, their faces filled with pride."
19.052,3.937,"at the twilight's last gleaming?","The last light of twilight reflecting off the American flag, creating a gleaming effect."
22.989,3.937,"Whose broad stripes and bright stars,","Close-up view of the American flag, focusing on the broad stripes and bright stars."
26.926,3.537,"through the perilo

In [11]:
#@title Connect to the Stability API
import io
import os
import warnings

from IPython.display import display
from PIL import Image
from stability_sdk import client
import stability_sdk.interfaces.gooseai.generation.generation_pb2 as generation

# Ask for the Stability key only when the environment does not already provide
# one; an empty answer is ignored.
if 'STABILITY_KEY' not in os.environ:
  key = getpass('Enter your Stability API key: ')
  if key:
    os.environ.update({'STABILITY_KEY': key})

# Client used by the image generation cell.
stability_api = client.StabilityInference(verbose=True, key=os.environ['STABILITY_KEY'])


Enter your Stability API key: ··········


INFO:stability_sdk.client:Opening channel to grpc.stability.ai:443
INFO:stability_sdk.client:Channel opened to grpc.stability.ai:443


In [12]:
#@title Generate Images

# Colab form fields. WIDTH/HEIGHT/STEPS are passed to stability_api.generate()
# in generate_image below; WIDTH and HEIGHT are also used as the frame size of
# the video writer in the video cell.
WIDTH=640 #@param {type:"number"}
HEIGHT=512 #@param {type:"number"}
STEPS=22 #@param {type:"number"}

def generate_image(prompt: str):
  """Generate one image for ``prompt`` with the Stability API.

  Args:
      prompt: Text description of the image to render.

  Returns:
      The first image artifact found in the responses, opened as a PIL image,
      or None (after printing a notice) when no image artifact is returned.

  A warning is issued for every artifact whose finish reason is FILTER.
  """
  # generate() returns a generator; iterating over it yields the API responses.
  answers = stability_api.generate(
      prompt=prompt,
      width=WIDTH, height=HEIGHT, steps=STEPS
  )

  for resp in answers:
    for artifact in resp.artifacts:
      if artifact.finish_reason == generation.FILTER:
        # Adjacent string literals are concatenated as-is, so the first one
        # needs the trailing space to keep the two sentences apart.
        warnings.warn(
            "Your request activated the API's safety filters and could not be processed. "
            "Please modify the prompt and try again.")
      if artifact.type == generation.ARTIFACT_IMAGE:
        return Image.open(io.BytesIO(artifact.binary))

  print(f"No image for '{prompt}'")
  return None


def description_to_filepath(description:str):
  """Return the PNG path inside ``out_dir`` that belongs to ``description``.

  The file name is the lower-cased description with every character that is
  not a word character or ``-`` replaced by ``_``.
  """
  sanitized = re.sub(r'[^\w\d-]','_', description)
  return os.path.join(out_dir, sanitized.lower() + '.png')


# Render one image per row of the image-description CSV. Images are cached on
# disk under a name derived from the description, so re-running the cell only
# generates the ones that are still missing.
with open(image_description_file_path, 'r', newline='') as csv_file:
  reader = csv.DictReader(csv_file, quoting=csv.QUOTE_NONNUMERIC)
  for row in reader:
    print(row)
    description = row['description']
    image_file_path = description_to_filepath(description)
    if os.path.exists(image_file_path):
      print(f"Skipping description that has an image: {description}")
      continue
    print(f"Generating image for {description}")
    image = generate_image(description)
    if image is None:
      # generate_image returns None when the API yields no image artifact.
      # Skip this row instead of failing on `None.save(...)`, so the
      # remaining descriptions are still rendered.
      print(f"Skipping description with no generated image: {description}")
      continue
    image.save(image_file_path)
    display(image)


{'start': 0.0, 'duration': 7.407, 'text': '', 'description': 'A patriotic scene with an American flag waving in the wind, set against a backdrop of a beautiful dawn sky.'}
Skipping description that has an image: A patriotic scene with an American flag waving in the wind, set against a backdrop of a beautiful dawn sky.
{'start': 7.407, 'duration': 4.071, 'text': 'O say can you see,', 'description': 'A person standing on a hill, looking out over a vast landscape, with the dawn light illuminating the scene.'}
Skipping description that has an image: A person standing on a hill, looking out over a vast landscape, with the dawn light illuminating the scene.
{'start': 11.478, 'duration': 3.737, 'text': "by the dawn's early light,", 'description': 'The first light of dawn breaking over the horizon, casting a warm glow on the American flag.'}
Skipping description that has an image: The first light of dawn breaking over the horizon, casting a warm glow on the American flag.
{'start': 15.215, 'du

In [27]:
import cv2

# Timing parameters of the rendered video.
fps = 24.0
frame_duration = 1.0 / fps

# Where the silent video is written.
video_path = os.path.join(out_dir, 'video.mp4')

# Frames written so far; write_frames advances this counter.
total_frame_count = 0

# Largest number of frames a single write_frames call may add
# (the frame count of 100 seconds of video).
max_frame_count_limit = int(100 / frame_duration)
print(max_frame_count_limit)

def write_frames(image:"numpy.ndarray", limit_seconds:float):
  """Append copies of ``image`` to the video until it reaches ``limit_seconds``.

  Args:
      image: Frame to repeat; callers pass the array returned by ``cv2.imread``.
      limit_seconds: Timestamp (seconds from the start of the video) up to
          which ``image`` should be shown.

  Uses the module-level ``video`` writer, ``fps``, ``max_frame_count_limit``
  and ``last_row`` (for the log line), and advances ``total_frame_count``.
  A single call adds at most ``max_frame_count_limit`` frames and adds nothing
  when the video is already at or past ``limit_seconds``.
  """
  global total_frame_count
  frame_count_limit = int(limit_seconds * fps)
  # Apply the per-call cap before logging so the message reports the number of
  # frames that are actually written.
  frame_count_limit = min(frame_count_limit, total_frame_count + max_frame_count_limit)
  frames_to_add = max(0, frame_count_limit - total_frame_count)
  print(f"Adding {frames_to_add} frames for '{last_row['description']}'")
  for _ in range(frames_to_add):
    video.write(image)
    total_frame_count += 1

# Build the silent video: each image is shown from its row's start time until
# the start time of the next row that has an image on disk.
video = None  # so the `finally` clause is safe even if the writer is never created
try:
  fourcc = cv2.VideoWriter_fourcc('F','M','P','4')
  video = cv2.VideoWriter(video_path, fourcc, fps, (WIDTH, HEIGHT))
  with open(image_description_file_path, 'r', newline='') as csv_file:
    reader = csv.DictReader(csv_file, quoting=csv.QUOTE_NONNUMERIC)

    # Frames are written one row late: on reaching a row, the previous row's
    # image is extended up to this row's start time.
    last_row = None
    last_image = None
    for row in reader:
      if last_image is not None:
        write_frames(image=last_image, limit_seconds=row['start'])
      print(row)
      description = row['description']
      image_file_path = description_to_filepath(description)
      if not os.path.exists(image_file_path):
        # Keep showing the previous image for this row's time span.
        print(f"Missing image file: {image_file_path}")
        continue
      last_row = row
      last_image = cv2.imread(image_file_path)
    # The final image runs until the end of its own row.
    if last_image is not None:
      write_frames(image=last_image, limit_seconds=last_row['start'] + last_row['duration'])

finally:
  print('finalizing')
  cv2.destroyAllWindows()
  if video is not None:
    video.release()

print(f'Done! Wrote {total_frame_count} frames ({total_frame_count * frame_duration} seconds) to {video_path}')

2400
{'start': 0.0, 'duration': 7.407, 'text': '', 'description': 'A patriotic scene with an American flag waving in the wind, set against a backdrop of a beautiful dawn sky.'}
Adding 177 frames for 'A patriotic scene with an American flag waving in the wind, set against a backdrop of a beautiful dawn sky.'
{'start': 7.407, 'duration': 4.071, 'text': 'O say can you see,', 'description': 'A person standing on a hill, looking out over a vast landscape, with the dawn light illuminating the scene.'}
Adding 98 frames for 'A person standing on a hill, looking out over a vast landscape, with the dawn light illuminating the scene.'
{'start': 11.478, 'duration': 3.737, 'text': "by the dawn's early light,", 'description': 'The first light of dawn breaking over the horizon, casting a warm glow on the American flag.'}
Adding 90 frames for 'The first light of dawn breaking over the horizon, casting a warm glow on the American flag.'
{'start': 15.215, 'duration': 3.837, 'text': 'What so proudly we h

In [23]:
#@title Merge Audio with Video
import ffmpeg

video_with_audio_path = os.path.join(out_dir, 'video_with_audio.mp4')

# NOTE(review): the merge is skipped whenever the output file exists, so a
# regenerated video.mp4 is not picked up until the old merged file is removed.
if not os.path.exists(video_with_audio_path):
  # Combine the silent video and the downloaded audio through one concat
  # filter that emits a single video stream and a single audio stream.
  video_in = ffmpeg.input(video_path)
  audio_in = ffmpeg.input(audio_file_path)
  print('concatenating with ffmpeg')
  merged = ffmpeg.concat(video_in, audio_in, v=1, a=1)
  result = merged.output(video_with_audio_path).run()
  print(result)
else:
  print('video with audio already exists: ', video_with_audio_path)

video with audio already exists:  /content/gdrive/MyDrive/Commentator_AI/Transcript_to_Video/vPKp29Luryc/video_with_audio.mp4
