In [None]:
# Tell Python to look "one level up."
# Python only looks for imports in the current directory or your installed packages.
# To import a .py file from a parent folder,
# you have to manually tell Python to look "one level up."

import sys
from pathlib import Path

# Get the absolute path to the directory one level up
parent_dir = str(Path().resolve().parent)

# Add it to sys.path if it's not already there
if parent_dir not in sys.path:
    sys.path.append(parent_dir)

# Now you can import modules from the parent directory
print(parent_dir)

In [22]:
from langgraph.graph import StateGraph, END
from state import VideoState
from nodes import *
from graph import *

In [11]:
# Initialize Graph
workflow = StateGraph(VideoState)

#### --- Add Nodes ---

In [None]:
workflow.add_node("topic_planner", topic_planner)
workflow.add_node("content_type_router", content_type_router)

In [None]:
# Long Form Nodes
workflow.add_node("script_generator", script_generator)
workflow.add_node("voice_generator", voice_generator)
workflow.add_node("asset_generator", asset_generator)
workflow.add_node("video_composer", video_composer)
workflow.add_node("metadata_generator", metadata_generator)
workflow.add_node("youtube_upload", youtube_upload)

In [None]:
# Short Form Nodes
workflow.add_node("short_script_generator", short_script_generator)
workflow.add_node("short_voice_generator", short_voice_generator)
workflow.add_node("short_asset_generator", short_asset_generator)
workflow.add_node("short_video_composer", short_video_composer)
workflow.add_node("short_metadata_generator", short_metadata_generator)
workflow.add_node("short_youtube_upload", short_youtube_upload)

#### --- Define Edges ---

In [None]:
# Entry
workflow.set_entry_point("topic_planner")
workflow.add_edge("topic_planner", "content_type_router")

In [None]:
# Branching Logic (Section 12.7)
workflow.add_conditional_edges(
    "content_type_router",
    route_content_type,
    ["script_generator", "short_script_generator"]
)

In [None]:
# Long Form Pipeline Flow
# Implements Section 11.1 Retry Logic for Script Generator
workflow.add_conditional_edges(
    "script_generator",
    should_retry,
    {"retry": "script_generator", "next": "voice_generator"}
)

In [None]:
workflow.add_edge("voice_generator", "asset_generator")
workflow.add_edge("asset_generator", "video_composer")
workflow.add_edge("video_composer", "metadata_generator")
workflow.add_edge("metadata_generator", "youtube_upload")
workflow.add_edge("youtube_upload", END)

In [None]:
workflow.add_edge("voice_generator", "asset_generator")
workflow.add_edge("asset_generator", "video_composer")
workflow.add_edge("video_composer", "metadata_generator")
workflow.add_edge("metadata_generator", "youtube_upload")
workflow.add_edge("youtube_upload", END)

In [None]:
# Short Form Pipeline Flow
workflow.add_edge("short_script_generator", "short_voice_generator")
workflow.add_edge("short_voice_generator", "short_asset_generator")
workflow.add_edge("short_asset_generator", "short_video_composer")
workflow.add_edge("short_video_composer", "short_metadata_generator")
workflow.add_edge("short_metadata_generator", "short_youtube_upload")
workflow.add_edge("short_youtube_upload", END)

In [None]:
app = workflow.compile()