In [1]:
# Importing necessary libraries
import os
import uuid
import yaml
import json
from pathlib import Path
from pydantic import BaseModel
from typing import Optional

# Firecrawl SDK
from firecrawl import FirecrawlApp

# Importing Crew related components
from crewai import Agent, Task, Crew, LLM

# Importing CrewAI Flow related components
from crewai.flow.flow import Flow, listen, start, router, or_

from dotenv import load_dotenv
load_dotenv()

import nest_asyncio
nest_asyncio.apply()

In [2]:
from crewai import LLM
llm = LLM(model = "ollama/llama3.2", base_url="http://localhost:11434")

In [3]:
import yaml
with open("config/planner_agents.yaml", 'r') as file:
    agents_config = yaml.safe_load(file)

with open("config/planner_tasks.yaml", 'r') as file:
    tasks_config = yaml.safe_load(file)


In [4]:
from pydantic import BaseModel
from typing import List, Optional

class Tweet(BaseModel):
    """Represets a single tweet in a thread."""
    content: str
    is_hook: bool = False
    media_urls: Optional[List[str]] = None

class Thread(BaseModel):
    """Represents a Twitter thread."""
    topic: str
    tweets: List[Tweet]

class LinkedinPost(BaseModel):
    """Represents a LinkedIn post."""
    content: str
    media_urls: Optional[List[str]] = None


In [5]:
from crewai_tools import DirectoryReadTool, FileReadTool
all_tools = [
    DirectoryReadTool(),
    FileReadTool()
]

In [6]:
# create agents, their tasks and crew for twitter

draft_analyzer = Agent(config=agents_config['draft_analyzer'], tools=[
    DirectoryReadTool(),
    FileReadTool()
], llm=llm)

twitter_thread_planner = Agent(config=agents_config['twitter_thread_planner'], tools=[
    DirectoryReadTool(),
    FileReadTool()
], llm=llm)

analyze_draft = Task(
  config=tasks_config['analyze_draft'],
  agent=draft_analyzer
)

create_twitter_thread_plan = Task(
  config=tasks_config['create_twitter_thread_plan'],
  agent=twitter_thread_planner,
  output_pydantic=Thread
)

twitter_planning_crew = Crew(
    agents=[draft_analyzer, twitter_thread_planner],
    tasks=[analyze_draft, create_twitter_thread_plan],
    verbose=False
)

# create agents, their tasks and crew for linkedin

linkedin_post_planner = Agent(config=agents_config['linkedin_post_planner'], tools=[
    DirectoryReadTool(),
    FileReadTool()
    ], llm=llm)

create_linkedin_post_plan = Task(
  config=tasks_config['create_linkedin_post_plan'],
  agent=linkedin_post_planner,
  output_pydantic=LinkedinPost
)

linkedin_planning_crew = Crew(
    agents=[draft_analyzer, linkedin_post_planner],
    tasks=[analyze_draft, create_linkedin_post_plan],
    verbose=False
)

In [7]:
from pydantic import BaseModel
from pathlib import Path
blog_post_url = "https://blog.dailydoseofds.com/p/5-chunking-strategies-for-rag"
class ContentPlanningState(BaseModel):
    """
    State for the content planning flow
    """
    # URL of the blog to scrape
    blog_post_url: str = blog_post_url
    
    # Path where the scraped content will be stored
    draft_path: Path = "assets/"
    
    # Determines whether to create a Twitter or LinkedIn post 
    post_type: str = "twitter"  
    
    # Example Twitter threads for style reference
    path_to_example_threads: str = "assets/example_threads.txt" 
    
    # Example LinkedIn posts for reference
    path_to_example_linkedin: str = "assets/example_linkedin.txt"

In [8]:
from firecrawl import FirecrawlApp
from crewai.flow.flow import Flow, start, router, listen, or_
import os
import uuid
import json

class CreateContentPlanningFlow(Flow[ContentPlanningState]):

    @start()
    def scrape_blog_post(self):
        print(f"# Fetching draft from: {self.state.blog_post_url}")

        # Initialize FireCrawl
        app = FirecrawlApp(api_key=os.getenv("FIRECRAWL_API_KEY"))
        
        # Scrape the blog post in Markdown and HTML format
        scrape_result = app.scrape(url=self.state.blog_post_url,
                                       formats=['markdown', 'html'])

        # Extract the title (fallback to a UUID if not found)
        try:
            title = scrape_result.metadata.get('title', str(uuid.uuid4()))
        except Exception:
            title = str(uuid.uuid4())

        # Store the scraped content as a markdown file
        self.state.draft_path = f'assets/{title}.md'
        with open(self.state.draft_path, 'w') as f:
            f.write(scrape_result.markdown)

        return self.state
    
    @router(scrape_blog_post)
    def select_platform(self):
        if self.state.post_type == "twitter":
            return "twitter"
        elif self.state.post_type == "linkedin":
            return "linkedin"
        else:
            raise ValueError(f"Unknown post type: {self.state.post_type}")
    
    @listen("twitter")
    def create_twitter_thread(self):
        print("# Creating Twitter thread plan...")

        # Create and execute Crew
        crew_output = twitter_planning_crew.kickoff(
            inputs={
                "draft_path": self.state.draft_path,
                "path_to_example_threads": self.state.path_to_example_threads
            }
        )

        # Store the Twitter thread plan in the state
        self.state.twitter_thread = crew_output.pydantic

        print("# Twitter thread plan created successfully!")

        return self.state
    
    @listen("linkedin")
    def create_linkedin_post(self):
        print("# Creating LinkedIn post plan...")

        # Create and execute Crew
        crew_output = linkedin_planning_crew.kickoff(
            inputs={
                "draft_path": self.state.draft_path,
                "path_to_example_linkedin": self.state.path_to_example_linkedin
            }
        )

        # Store the LinkedIn post plan in the state
        self.state.linkedin_post = crew_output.pydantic

        print("# LinkedIn post plan created successfully!")

        return self.state

    @listen(or_(create_twitter_thread, create_linkedin_post))
    def save_results(self):
        print("# Saving results...")
        
        file_path = f"result_{self.state.post_type}.json"
        with open(file_path, "w") as f:
            if self.state.post_type == "twitter":
                json.dump(self.state.twitter_thread.model_dump(), f, indent=2)
            elif self.state.post_type == "linkedin":
                json.dump(self.state.linkedin_post.model_dump(), f, indent=2)
        
        print(f"# Results saved to {file_path}!")
        return self.state

In [9]:
flow = CreateContentPlanningFlow()

In [10]:
blog_post_url = "https://blog.dailydoseofds.com/p/5-chunking-strategies-for-rag"
draft_path = "assets/"
post_type = "twitter"
path_to_example_threads = "assets/example_threads.txt"
path_to_example_linkedin = "assets/example_linkedin.txt"


In [11]:
flow.plot()

'C:\\Users\\kusumuru\\AppData\\Local\\Temp\\crewai_flow_5isl623i\\crewai_flow.html'

In [12]:
flow.kickoff()

# Fetching draft from: https://blog.dailydoseofds.com/p/5-chunking-strategies-for-rag




UnicodeEncodeError: 'charmap' codec can't encode character '\U0001f449' in position 11930: character maps to <undefined>