In [1]:
import getpass
import os

# Make sure NVIDIA_API_KEY is present in the environment before any NVIDIA
# endpoint is used. To force a fresh prompt, remove the stored key first:
# del os.environ['NVIDIA_API_KEY']  ## delete key and reset
if not os.environ.get("NVIDIA_API_KEY", "").startswith("nvapi-"):
    # No usable key yet: ask for one without echoing it to the notebook output.
    nvapi_key = getpass.getpass("NVAPI Key (starts with nvapi-): ")
    assert nvapi_key.startswith("nvapi-"), f"{nvapi_key[:5]}... is not a valid key"
    os.environ["NVIDIA_API_KEY"] = nvapi_key
else:
    print("Valid NVIDIA_API_KEY already in environment. Delete to reset")

NVAPI Key (starts with nvapi-):  ········


In [2]:
import nest_asyncio
# NOTE(review): presumably applied so that async library code can run inside
# the notebook's already-running event loop — confirm it is still required.
nest_asyncio.apply()

In [None]:
#from pydantic import BaseModel, Field
#from pydantic_core import from_json
#from typing import List
from prompts import STORY_JSON_PROMPT, STORY_GENERATE_IMAGE_PROMPT, SAFE_STORY_PROMPT, EXTRACT_SUMMARIZE_STORY_PROMPT
from llama_index.core import Settings
from llama_index.core import  SimpleDirectoryReader, ServiceContext, SummaryIndex
from llama_index.llms.nvidia import NVIDIA
from llama_index.core.workflow.retry_policy import ConstantDelayRetryPolicy
    
from llama_index.core.workflow import (
    Event,
    StartEvent,
    StopEvent,
    Workflow,
    step,
)
from llama_index.core import PromptTemplate
from llama_index.core.program import LLMTextCompletionProgram
from llama_index.core.response_synthesizers import SimpleSummarize

from nemoguardrails import LLMRails, RailsConfig
from llama_index.core.output_parsers import PydanticOutputParser

from image_gen import (generate_image, base64_to_imagefile)
from utils import (write_file, read_file, has_file, save_url_data)

from events import (ChildrenStoryEvent, PromptEvent, PDFEvent, StorySummaryEvent, BookImageEvent, AudioEvent)


# Global LLM used by the workflow below (read back as `Settings.llm` when the
# image prompts are generated).
Settings.llm = NVIDIA(model="meta/llama3-70b-instruct")


# Root directory for every artifact the workflow reads or writes.
DATA_PATH = './data'
# Sub-directories of DATA_PATH for generated images, audio and video.
IMAGE_PATH = 'image'
AUDIO_PATH = 'audio'
VIDEO_PATH = 'video'
# Text saved from the story URL; the source for the summarization step.
RAW_STORY_FILE = 'raw_story.txt'
# Structured story written by the JSON-generation step and reloaded on later runs.
STORY_JSON_FILE = 'story.json'
# PDF produced by the PDF step.
STORY_PDF_FILE = 'story.pdf'
# Image prompt for the title page; per-page prompts are stored as '<page_no>_prompt.txt'.
TITLE_PROMPT_FILE = 'title_prompt.txt'
# File name of the final video inside DATA_PATH/VIDEO_PATH.
VIDEO_NAME = 'story_video.mp4'
# When truthy, the PDF step ends the workflow with the PDF path instead of
# continuing to the audio and video steps.
CREATE_PDF = False

class StoryEvent(Event):
    """Carries the guardrail-checked story text from `create_guardrail` to `generate_json`.

    Defined here because `StoryEvent` is not among the names imported from
    `events`, and the step annotations below are evaluated when the workflow
    class body is executed.
    """

    story: str


class ChildrenStoryGenerationWorkflow(Workflow):
    """Turn a story URL into an illustrated children's book, PDF, audio and video.

    Step chain (each step is triggered by the event type it accepts):

        read_story -> create_guardrail -> generate_json -> generate_prompt
                   -> generate_image -> generate_pdf -> generate_audio
                   -> generate_video

    `read_story` resumes from whichever artifacts already exist under
    DATA_PATH, so a re-run skips the stages whose outputs are on disk.

    NOTE(review): the steps call `read_story_json`, `has_prompts`,
    `get_full_story_with_title`, `parse_prompt`, `create_pdf`, `save_audio`,
    `pdf_to_image`, `save_audio_video`, `save_video` and the `ChildrenStory`
    model, none of which are imported in this cell — they must be defined
    elsewhere before the workflow is run.
    """

    @step
    async def read_story(self, ev: StartEvent) -> ChildrenStoryEvent|PromptEvent|PDFEvent|StorySummaryEvent|StopEvent:
        """Entry step: fetch the story if a URL is given, then pick the resume point.

        Reads `url` and `pdf` from the start event. `pdf` is remembered on the
        instance so `generate_pdf` can decide whether to stop after the PDF.

        Returns:
            PDFEvent            if both the story JSON and the PDF already exist.
            PromptEvent         if the story JSON exists and `has_prompts` is true.
            ChildrenStoryEvent  if the story JSON exists but prompts are missing.
            StorySummaryEvent   if only the raw story text exists (summarized here).
            StopEvent           with an error string if nothing is available.
        """
        # Stored on the instance (not in a local) so generate_pdf can see it.
        # Concurrent runs on the same workflow instance would share this flag.
        self._create_pdf = bool(getattr(ev, "pdf", CREATE_PDF))

        # Download first, then fall through to the artifact checks. Re-emitting
        # a StartEvent here would drop the `pdf` argument and is not one of
        # the declared return types.
        url = getattr(ev, "url", "")
        if url:
            save_url_data(url, f'{DATA_PATH}/{RAW_STORY_FILE}')

        if has_file(DATA_PATH, STORY_JSON_FILE):
            story = read_story_json(f'{DATA_PATH}/{STORY_JSON_FILE}')
            if has_file(DATA_PATH, STORY_PDF_FILE):
                return PDFEvent(story=story, path=f'{DATA_PATH}/{STORY_PDF_FILE}')
            if has_prompts(DATA_PATH, story):
                return PromptEvent(path=DATA_PATH, story=story)
            return ChildrenStoryEvent(story=story)

        if has_file(DATA_PATH, RAW_STORY_FILE):
            reader = SimpleDirectoryReader(input_files=[f'{DATA_PATH}/{RAW_STORY_FILE}'])
            texts = [doc.text for doc in reader.load_data()]
            summarizer = SimpleSummarize()
            response = await summarizer.aget_response(EXTRACT_SUMMARIZE_STORY_PROMPT, texts)
            return StorySummaryEvent(story=str(response))

        return StopEvent(result="{error:'Please specify url'}")

    @step
    async def create_guardrail(self, ev: StorySummaryEvent) -> StoryEvent:
        """Pass the summarized story through NeMo Guardrails using SAFE_STORY_PROMPT.

        The rails configuration is loaded from the local 'config' directory.
        """
        config = RailsConfig.from_path('config')
        rails = LLMRails(config)
        prompt = PromptTemplate(SAFE_STORY_PROMPT).format(story=ev.story)
        res = await rails.generate_async(prompt=prompt)
        return StoryEvent(story=str(res))

    @step #(retry_policy=ConstantDelayRetryPolicy(delay=5, maximum_attempts=5))
    async def generate_json(self, ev: StoryEvent) -> ChildrenStoryEvent:
        """Parse the story text into a `ChildrenStory` object and persist it as JSON."""
        program = LLMTextCompletionProgram.from_defaults(
            output_parser=PydanticOutputParser(output_cls=ChildrenStory),
            prompt_template_str=STORY_JSON_PROMPT,
            verbose=True,
        )
        output: ChildrenStory = program(story=ev.story)
        write_file(output.json(), f'{DATA_PATH}/{STORY_JSON_FILE}')
        return ChildrenStoryEvent(story=output)

    @step #(retry_policy=ConstantDelayRetryPolicy(delay=5, maximum_attempts=20))
    async def generate_prompt(self, ev: ChildrenStoryEvent) -> PromptEvent:
        """Ask the LLM for one image prompt for the title page and one per story page.

        Prompts are written to DATA_PATH as TITLE_PROMPT_FILE and
        '<page_no>_prompt.txt'.
        """
        story = ev.story
        # The template and the full story text are the same for every page.
        template = PromptTemplate(STORY_GENERATE_IMAGE_PROMPT)
        full_story = get_full_story_with_title(story)

        # Async completion so the event loop is not blocked during LLM calls.
        response = await Settings.llm.acomplete(
            template.format(page="title page", story=full_story)
        )
        write_file(response.text, f'{DATA_PATH}/{TITLE_PROMPT_FILE}')

        for page in story.pages:
            response = await Settings.llm.acomplete(
                template.format(page=f"page_no {str(page.page_no)}", story=full_story)
            )
            write_file(response.text, f'{DATA_PATH}/{str(page.page_no)}_prompt.txt')
        return PromptEvent(path=DATA_PATH, story=story)

    @step
    async def generate_image(self, ev: PromptEvent) -> BookImageEvent:
        """Render the title image and one image per page from the stored prompts.

        Inside this method, `generate_image(...)` resolves to the module-level
        function imported from `image_gen`, not to this step.
        """
        path = ev.path
        story = ev.story
        image_dir = f'{path}/{IMAGE_PATH}'
        os.makedirs(image_dir, exist_ok=True)  # ensure the output directory exists
        key = os.environ["NVIDIA_API_KEY"]

        prompt = parse_prompt(f'{path}/{TITLE_PROMPT_FILE}')
        outfile = f'{image_dir}/title.jpg'
        base64_to_imagefile(generate_image(prompt, key), outfile)

        # NOTE(review): prompts are read back by 1-based position, while
        # generate_prompt names them by page.page_no — this assumes page_no
        # runs 1..N in order; confirm.
        for i in range(len(story.pages)):
            prompt = parse_prompt(f'{path}/{i+1}_prompt.txt')
            outfile = f'{image_dir}/{i+1}.jpg'
            base64_to_imagefile(generate_image(prompt, key), outfile)
        return BookImageEvent(story=story, path=image_dir)

    @step
    async def generate_pdf(self, ev: BookImageEvent) -> PDFEvent|StopEvent:
        """Build the story PDF; stop here if the run asked for the PDF only.

        Returns a StopEvent carrying the PDF path when the `pdf` flag given to
        the run (falling back to the module-level CREATE_PDF) is truthy,
        otherwise a PDFEvent so the audio and video steps continue.
        """
        pdf_path = f'{DATA_PATH}/{STORY_PDF_FILE}'
        create_pdf(pdf_path, ev.story, ev.path)
        if getattr(self, "_create_pdf", CREATE_PDF):
            return StopEvent(result=pdf_path)
        return PDFEvent(story=ev.story, path=pdf_path)

    @step
    async def generate_audio(self, ev: PDFEvent) -> AudioEvent:
        """Generate narration audio for the story under DATA_PATH/AUDIO_PATH."""
        story = ev.story
        audio_dir = f'{DATA_PATH}/{AUDIO_PATH}'
        # NOTE(review): the story is passed as the first argument on the
        # assumption that `save_audio` takes (story, output_dir) — confirm
        # against its definition.
        save_audio(story, audio_dir)
        return AudioEvent(story=story, pdf=ev.path, path=audio_dir)

    @step
    async def generate_video(self, ev: AudioEvent) -> StopEvent:
        """Combine PDF page images and audio into the final video and end the run.

        The StopEvent result is the path of the video file.
        """
        image_dir = f'{DATA_PATH}/{IMAGE_PATH}'
        video_dir = f'{DATA_PATH}/{VIDEO_PATH}'
        video_file = f'{video_dir}/{VIDEO_NAME}'

        pdf_to_image(ev.pdf, image_dir)
        save_audio_video(ev.story, image_dir, ev.path, video_dir)
        # One clip per story page plus one for the title page.
        save_video(len(ev.story.pages) + 1, video_dir, video_file)
        return StopEvent(result=video_file)


# Source page whose text is downloaded and turned into the children's story.
story_url = "https://en.wikipedia.org/wiki/The_Sparrow%27s_Lost_Bean"

# timeout=600 and verbose=True are passed straight to the Workflow constructor.
w = ChildrenStoryGenerationWorkflow(timeout=600, verbose=True)
# `url` and `pdf` become attributes of the StartEvent that `read_story` receives.
result = await w.run(url = story_url, pdf=False)
print(result)