In [1]:
import site
site.addsitedir('Lib/site-packages')
from dotenv import load_dotenv
load_dotenv()
import getpass
import os
import requests
import ffmpeg
import json
import time
#import asyncio
from pydantic import BaseModel, Field
from langchain_core.tools import tool
from langchain_core.messages import HumanMessage
from langgraph.checkpoint.memory import MemorySaver
from langgraph.graph import END, START, StateGraph, MessagesState
from langgraph.prebuilt import ToolNode
from lumaai import LumaAI
from typing import Literal, TypedDict

In [10]:
# --- Luma client (used by the video and image generation tools) ---
# The token is read from the environment; no interactive fallback is attempted here.
client = LumaAI(auth_token=os.getenv("LUMAAI_API_KEY"))

# --- OpenAI credentials ---
# Ask for the key interactively only when it is missing or empty in the environment.
if not os.getenv("OPENAI_API_KEY"):
    os.environ["OPENAI_API_KEY"] = getpass.getpass("Enter API key for OpenAI: ")

from langchain.chat_models import init_chat_model

# --- LangChain chat models, one handle per agent role ---
supervisor_llm = init_chat_model("o1", model_provider="openai")
video_gen_llm = init_chat_model("o1-mini", model_provider="openai")
storyboard_llm = init_chat_model("o1-mini", model_provider="openai")
#audio_gen_llm = init_chat_model("gpt-4o-mini", model_provider="openai")
#editor_llm = init_chat_model("gpt-4o-mini", model_provider="openai")

# Module-level "busy" flags: each generation tool refuses to start while its flag is True.
generatingVid = False
generatingImg = False

## TOOLS

### Video Worker

In [3]:
#SCHEMAS
# Argument schema handed to `@tool("video_gen_tool", args_schema=...)` below.
# NOTE(review): the class docstring and the Field descriptions are presumably surfaced to the
# LLM as the tool/argument descriptions — treat them as prompt text, not as plain comments.
class video_gen_schema(BaseModel):
    """Generates a video using text input and returns a filepath to the video"""
    # Forwarded unchanged as `prompt` to the Luma generation request in `generate_vid`.
    vid_prompt: str = Field(..., description="The textual prompt used in generating the video")
    # Clip-length switch: `generate_vid` requests duration "9s" when True, "5s" otherwise.
    use_9s: bool = Field(..., description="Whether to generate a 9 second long video. If set to FALSE, this will generate a 5 second long video")


@tool("video_gen_tool",args_schema=video_gen_schema)
def generate_vid(vid_prompt: str, use_9s: bool) -> str:
    # Tool body for `video_gen_tool`: request a Luma video for `vid_prompt`, poll until it is
    # ready, download it into staticVid1/ and return a message containing the file path.
    #
    # Args:
    #   vid_prompt: text prompt forwarded to the Luma generation request.
    #   use_9s: True -> request a "9s" clip, False -> request a "5s" clip.
    # Returns:
    #   "Video generated in staticVid1/<id>.mp4", or a refusal message when another video
    #   generation is already in progress.
    # Raises:
    #   RuntimeError: Luma reported the generation as "failed".
    #   requests.HTTPError: the asset download returned an error status.
    #
    # NOTE(review): documented with comments rather than a docstring on purpose — a function
    # docstring would likely replace the args_schema docstring as the tool description the
    # LLM sees; confirm against the langchain `tool` decorator before converting.
    global generatingVid
    if generatingVid:
        return "Failed to generate video, there is currently another video being generated."
    dur = "9s" if use_9s else "5s"
    print(f"VIDEOWORKER PROMPT: {dur}\n------------------------------\n" + vid_prompt)
    generatingVid = True
    try:
        generation = client.generations.create(
            prompt=vid_prompt,
            model="ray-2",
            resolution="720p",
            duration=dur
        )
        # Poll until the generation reaches a terminal state; only sleep while still pending.
        while True:
            generation = client.generations.get(id=generation.id)
            if generation.state == "completed":
                break
            if generation.state == "failed":
                raise RuntimeError(f"Generation failed: {generation.failure_reason}")
            print("Dreaming")
            time.sleep(5)
        video_url = generation.assets.video
        out_path = f'staticVid1/{generation.id}.mp4'
        # The output folder may not exist on a fresh checkout.
        os.makedirs('staticVid1', exist_ok=True)
        # Stream the download to disk in chunks and fail loudly on HTTP errors instead of
        # saving an error page as an .mp4.
        with requests.get(video_url, stream=True, timeout=60) as response:
            response.raise_for_status()
            with open(out_path, 'wb') as file:
                for chunk in response.iter_content(chunk_size=1 << 20):
                    file.write(chunk)
    finally:
        # Always release the busy flag, otherwise one failed generation would make every
        # later call return the "another video being generated" refusal.
        generatingVid = False
    print(f"Video generated in {out_path}")
    return f"Video generated in {out_path}"

#class extend_video_schema(BaseModel):
# Tools available to the video worker (currently only video generation).
video_tools = [generate_vid]
# Video LLM with the tool definitions bound, so it can emit tool calls.
video_worker = video_gen_llm.bind_tools(video_tools)
# Graph node that runs the tool calls emitted by the video worker.
video_tool_node = ToolNode(video_tools)

### Storyboard Worker

In [8]:
#SCHEMA
# Argument schema handed to `@tool("storyboard_tool", args_schema=...)`; each field becomes
# one section of the storyboard text assembled by `generate_storyboard`.
# NOTE(review): the class docstring and Field descriptions are presumably what the LLM sees
# as tool/argument descriptions, so they are written as instructions to the model.
class step_by_step_output_schema(BaseModel):
    """Prints a storyboard for the user to view in string format"""
    # Printed under the "STORY ANALYSIS" heading.
    story_description: str = Field(..., description="A 200-250 word description of the story")
    # Printed under the "CHARACTERS" heading.
    character_details: str = Field(..., description="A 200-300 word description of each character in the scene")
    # Printed under the "BACKGROUND" heading. The unit ("word") is spelled out so the length
    # constraint is unambiguous and matches the sibling fields.
    background_details: str = Field(..., description="A 50-100 word description of the scene")
    # Printed under the "AUDIO" heading; unit spelled out for the same reason as above.
    auditory_details: str = Field(..., description="A 50-100 word description of the voice profile of each character")
    # Printed under the "DIALOGUE" heading.
    dialogue_details: str = Field(..., description="The parts of the dialogue that require enunciation and emotion")

@tool("storyboard_tool",args_schema=step_by_step_output_schema)
def generate_storyboard(story_description: str,character_details: str,background_details: str,auditory_details: str,dialogue_details: str) -> str:
    # Tool body for `storyboard_tool`: lay the five text sections out under fixed headings,
    # print the result for the user and return the same string.
    # (Comments instead of a docstring so the tool description is left untouched.)
    divider = "------------------------------"
    sections = (
        ("STORY ANALYSIS", story_description),
        ("CHARACTERS", character_details),
        ("BACKGROUND", background_details),
        ("AUDIO", auditory_details),
        ("DIALOGUE", dialogue_details),
    )
    # Each section is "<heading>\n<divider>\n<text>"; sections are separated by one newline.
    rendered = "\n".join(heading + "\n" + divider + "\n" + body for heading, body in sections)
    print(rendered)
    return rendered

# Argument schema handed to `@tool("character_profile_tool", args_schema=...)` below.
# NOTE(review): the class docstring and Field descriptions are presumably surfaced to the LLM
# as the tool/argument descriptions — treat them as prompt text.
class character_profile_gen_schema(BaseModel):
    """Generates a character profile for a character and returns the filepath"""
    # Appended to the fixed portrait prompt in `generate_character_profile`.
    character_name: str = Field(..., description="The name of the character, taken from the script")
    # Appended after the name (comma-separated) in the same portrait prompt.
    character_details: str = Field(..., description="A 200-300 word description of the character in the scene, taken from the script")

@tool("character_profile_tool",args_schema=character_profile_gen_schema)
def generate_character_profile(character_name: str, character_details: str)->str:
    # Tool body for `character_profile_tool`: request a Luma reference portrait for one
    # character, poll until it is ready, download it into charRef/ and return a message
    # containing the file path.
    #
    # Args:
    #   character_name: name appended to the fixed portrait prompt.
    #   character_details: description appended after the name.
    # Returns:
    #   "Image generated in charRef/<id>.jpg", or a refusal message when another image
    #   generation is already in progress.
    # Raises:
    #   RuntimeError: Luma reported the generation as "failed".
    #   requests.HTTPError: the asset download returned an error status.
    #
    # NOTE(review): documented with comments rather than a docstring on purpose — a function
    # docstring would likely replace the args_schema docstring as the tool description the
    # LLM sees; confirm against the langchain `tool` decorator before converting.
    global generatingImg
    if generatingImg:
        return "Failed to generate image, there is currently another image being generated"
    # The backslash continuations sit inside the string literal, so the leading spaces of
    # each continued line are part of the prompt text; the literal is kept as it was.
    portrait_prompt = "Generate a hyperrealistic, front-facing portrait \
      The image should feature perfectly even, diffused lighting that completely\
      eliminates any shadows on the face. Use a direct, center-camera angle against a neutral,\
      unobtrusive background to ensure absolute consistency. Focus on lifelike details with natural \
      skin textures and realistic, balanced color tones, making the portrait suitable as a reference\
      for video character consistency.: "+character_name+", "+character_details
    generatingImg = True
    try:
        generation = client.generations.image.create(
            prompt=portrait_prompt,
        )
        # Poll until the generation reaches a terminal state; only sleep while still pending.
        while True:
            generation = client.generations.get(id=generation.id)
            if generation.state == "completed":
                break
            if generation.state == "failed":
                raise RuntimeError(f"Generation failed: {generation.failure_reason}")
            print("Dreaming")
            time.sleep(2)
        image_url = generation.assets.image
        print("image_url: " +image_url)
        out_path = f'charRef/{generation.id}.jpg'
        # The output folder may not exist on a fresh checkout.
        os.makedirs('charRef', exist_ok=True)
        # Stream the download to disk and fail loudly on HTTP errors instead of saving an
        # error page as a .jpg.
        with requests.get(image_url, stream=True, timeout=60) as response:
            response.raise_for_status()
            with open(out_path, 'wb') as file:
                for chunk in response.iter_content(chunk_size=1 << 20):
                    file.write(chunk)
    finally:
        # Always release the busy flag, otherwise one failed generation would make every
        # later call return the "another image being generated" refusal.
        generatingImg = False
    print(f"Image generated in {out_path}")
    return f"Image generated in {out_path}"

# Tools available to the storyboard worker: storyboard text and character portraits.
storyboard_tools = [generate_storyboard, generate_character_profile]
# Storyboard LLM with the tool definitions bound, so it can emit tool calls.
storyboard_worker = storyboard_llm.bind_tools(storyboard_tools)
# Graph node that runs the tool calls emitted by the storyboard worker.
storyboard_tool_node = ToolNode(storyboard_tools)

### Audio Worker

### Editor Worker

### Supervisor

## RUNNING MODEL

In [9]:
# Worker node names the supervisor may delegate to.
members = ["video_worker", "storyboard_worker"] #"audio_worker","editor"
# Every routing target the supervisor can choose: any worker, or END to finish the run.
supervisor_options = [*members, END]

# Structured-output schema for the supervisor LLM (used via `with_structured_output` in
# `call_supervisor`).
# NOTE(review): the docstring is presumably forwarded to the model as the schema description,
# so it is worded as an instruction — confirm before rewording.
class Supervisor_Router(TypedDict):
    """Worker to route to next to fulfill the user's request. If no workers are needed, route to END."""

    # One of `supervisor_options` (the worker names plus END).
    # Star-unpacking inside a subscript needs Python 3.11 or newer.
    next: Literal[*supervisor_options]

# Define the function that determines whether to continue or not
def go_next(state: MessagesState) -> Literal[*supervisor_options]:
    print(f"traveling")
    return state["next"]


# Supervisor node: decides which worker (or END) handles the conversation next.
def call_supervisor(state: MessagesState):
    """Ask the supervisor LLM for the next routing target.

    The supervisor's system prompt is prepended to the conversation before the structured
    call; previously the prompt was built but never sent, so the model routed without its
    instructions.

    Args:
        state: graph state; ``state["messages"]`` is the conversation so far.

    Returns:
        A dict with ``"messages"`` (one assistant message announcing the route) and
        ``"next"`` (the chosen target, read by ``go_next``).
    """
    print("supervisor")
    messages = state["messages"]
    context_message = {
        "role": "system",
        "content": f"You are a supervisor of a short film project and am in charge of a team of 4. You can delegate relevant tasks to any of these members: {members}.\
        Bob the video_worker is capable of generating high fidelity videos, but requires clear contextual information. Steve the storyboard_worker is able to analyze\
        a given input script and break it down into fine details and generate character profiles. You should ALWAYS ensure Steve has generated a STORYBOARD and TWO\
        character profiles before anything else. Generate a step-by-step plan from the following prompt and act on it."
    }
    router = supervisor_llm.with_structured_output(Supervisor_Router)
    # Same pattern as the worker nodes: system prompt first, then the conversation.
    response = router.invoke([context_message] + messages)
    # Wrap the response in a valid message format
    structured_message = {
        "role": "assistant",
        "content": f"Routing to {response['next']}"
    }
    # NOTE(review): "next" is not a declared MessagesState key; routing depends on the
    # conditional edge still seeing this node's raw output — confirm on langgraph upgrades.
    return {"messages": [structured_message], "next": response["next"]}

def call_video_worker(state: MessagesState):
    """Video worker node: run the tool-bound video LLM on the conversation.

    The worker's system prompt is placed ahead of ``state['messages']`` and the model's
    reply is returned as a one-element list under ``"messages"``.
    """
    print("video worker")
    system_prompt = {
        "role": "system",
        "content": "You are Bob, a video worker. Process the given request accordingly. Note that you can only generate 5 second or 9 second videos.\
        If you need to generate further content, ensure the characters and the scene in the video are consistent by providing similar prompts.\
        Generate only ONE video at a time. Wait until you have received the filepath of the current video being generated before generating another. [Use Unique Prompts]"
    }
    reply = video_worker.invoke([system_prompt, *state['messages']])
    # Returned as a list so it is appended to the existing message history.
    return {"messages": [reply]}

def call_storyboard_worker(state: MessagesState):
    """Storyboard worker node: run the tool-bound storyboard LLM on the conversation.

    The worker's system prompt is placed ahead of ``state['messages']`` and the model's
    reply is returned as a one-element list under ``"messages"``.
    """
    print("storyboard worker")
    system_prompt = {
        "role": "system",
        "content": "You are Steve, a storyboarder. Process the given request accordingly. You have access to two tools: a storyboard generator and a character profile\
        generator. You should pass in all character results from the storyboard generator into the character profile generator."
    }
    reply = storyboard_worker.invoke([system_prompt, *state['messages']])
    # Returned as a list so it is appended to the existing message history.
    return {"messages": [reply]}

def check_video_tool_calls(state: MessagesState) -> Literal["video_tools","supervisor"]:
    """Route after the video worker: run its tools if it asked for any, else go back to the supervisor."""
    print("check video tool call")
    latest = state['messages'][-1]
    # Pending tool calls on the newest message go to the tool node; otherwise control
    # returns to the supervisor.
    return "video_tools" if latest.tool_calls else "supervisor"

def check_storyboard_tool_calls(state: MessagesState) -> Literal["storyboard_tools","supervisor"]:
    """Route after the storyboard worker: run its tools if it asked for any, else go back to the supervisor."""
    print("check storyboard tool call")
    latest = state['messages'][-1]
    # Pending tool calls on the newest message go to the tool node; otherwise control
    # returns to the supervisor.
    return "storyboard_tools" if latest.tool_calls else "supervisor"
# Build the multi-agent graph on top of the message-list state.
workflow = StateGraph(MessagesState)

# Nodes: the supervisor, plus an LLM node and a tool-executor node per worker.
workflow.add_node("supervisor", call_supervisor)
workflow.add_node("video_worker", call_video_worker)
workflow.add_node("video_tools", video_tool_node)
workflow.add_node("storyboard_worker", call_storyboard_worker)
workflow.add_node("storyboard_tools", storyboard_tool_node)

# Every run starts at the supervisor.
workflow.add_edge(START, "supervisor")

# Conditional routing. No explicit path map is given for any of these.
# NOTE(review): the possible targets are presumably taken from each router's Literal
# return annotation — confirm against the langgraph version in use.
workflow.add_conditional_edges("supervisor", go_next)
workflow.add_conditional_edges("video_worker", check_video_tool_calls)
workflow.add_conditional_edges("storyboard_worker", check_storyboard_tool_calls)

# After a tool node runs, control returns to the worker that requested the tool call.
workflow.add_edge("video_tools", 'video_worker')
workflow.add_edge("storyboard_tools", "storyboard_worker")

# In-memory checkpointer handed to compile() below.
checkpointer = MemorySaver()

# Compile the graph into a runnable application with the checkpointer attached.
app = workflow.compile(checkpointer=checkpointer)

# Clear both busy flags before starting, so a flag left set by an earlier interrupted run
# cannot make the generation tools refuse to work.
generatingImg = False
generatingVid = False
# Use the agent
# Run the compiled graph on a single user request (a short screenplay excerpt). The
# thread_id is passed through the run config alongside the checkpointer set at compile time.
final_state = app.invoke(
    {"messages": [{"role": "user", "content": "Generate a STORYBOARD and a VIDEO about the following scene: INT. MONTAGUE MUSEUM – MODERN DAYThe once-grand family home, now a tour-driven Hearst Castle-like museum.ELIZABETH MONTAGUE, intense and angry, argues with Ethan.ELIZABETHYou know you can’t tell a soul,right?ETHANAre you kidding? I’m going to telleveryone. You can’t hide this."}]},
    config={"configurable": {"thread_id": 42}}
)
# Last expression of the cell: shows the content of the final message in the run.
final_state["messages"][-1].content

supervisor
traveling
storyboard worker
check storyboard tool call
STORY ANALYSIS
------------------------------
In the grand yet faded Montague Museum, a once opulent family home turned tourist attraction, an argument brews between two key figures, Elizabeth and Ethan. The air is thick with tension as Elizabeth, an embodiment of noble rage, stands fiercely with her arms akimbo. Ethan, the rebellious younger counterpart, leans slightly against a marble pillar, the very picture of defiance. Their voices rise and fall amidst the echoes of the museum, a stark contrast to the whispers of history that surround them. Elizabeth, fiercely protective of her family's legacy, is determined to keep a secret buried, while Ethan, a wildcard, sees the value in exposing the truth. The stark lighting and grand museum backdrop offer a stunning contrast to their intimate confrontation, lending a dramatic flair to the unfolding familial conflict. As the argument escalates, the camera captures close-ups of 

'Routing to __end__'

In [None]:
# Scratch cell: image generation guided by a reference image.
# Only submits the request; nothing here polls for completion or downloads the result.
reference_image = {
    "url": "https://storage.cdn-luma.com/dream_machine/7e4fe07f-1dfd-4921-bc97-4bcf5adea39a/video_0_thumb.jpg",
    "weight": 0.85,
}
generation = client.generations.image.create(prompt="sunglasses", image_ref=[reference_image])