# Recap

In [3]:
from crewai.flow.flow import Flow, start

class SimpleFlow(Flow):

    @start()
    def initialize(self):
        print("Flow started")

flow = SimpleFlow()
await flow.kickoff_async()

Flow started


In [4]:
from crewai.flow.flow import Flow, listen, start

class SequentialFlow(Flow):

    @start()
    def first_task(self):
        print("Step 1: Fetching data")
        return "data_fetched"

    @listen(first_task)
    def second_task(self, result):
        print(f"Step 2: Processing {result}")

flow = SequentialFlow()
await flow.kickoff_async()

Step 1: Fetching data
Step 2: Processing data_fetched


In [5]:
from crewai.flow.flow import Flow, listen, or_, start

class OrFlow(Flow):

    @start()
    def fetch_from_api(self):
        return "API data"

    @start()
    def read_from_db(self):
        return "Database record"

    @listen(or_(fetch_from_api, read_from_db))
    def process_data(self, result):
        print(f"Processing: {result}")

flow = OrFlow()
await flow.kickoff_async()

Processing: API data
Processing: Database record


In [6]:
from crewai.flow.flow import Flow, listen, and_, start

class AndFlow(Flow):

    @start()
    def step_one(self):
        print("Step 1: Collecting user input")
        return "User data"

    @start()
    def step_two(self):
        print("Step 2: Validating input")
        return "Validated data"

    @listen(and_(step_one, step_two))
    def final_step(self):
        print("All conditions met. Proceeding with final step.")

flow = AndFlow()
await flow.kickoff_async()

Step 1: Collecting user input
Step 2: Validating input
All conditions met. Proceeding with final step.


In [7]:
import random
from crewai.flow.flow import Flow, listen, router, start

class RouterFlow(Flow):

    @start()
    def classify_request(self):
        request_type = random.choice(["urgent", "normal"])
        print(f"Request classified as: {request_type}")
        return request_type

    @router(classify_request)
    def handle_request(self, classification):
        return "handle_urgent" if classification == "urgent" else "handle_normal"

    @listen("handle_urgent")
    def urgent_handler(self):
        print("Handling urgent request")

    @listen("handle_normal")
    def normal_handler(self):
        print("Handling normal request")

flow = RouterFlow()
await flow.kickoff_async()


Request classified as: normal
Handling normal request


In [8]:
from crewai.flow.flow import Flow, listen, start

class StateFlow(Flow):

    @start()
    def initialize_state(self):
        self.state["count"] = 1
        print(f"Initial count: {self.state['count']}")

    @listen(initialize_state)
    def increment_count(self):
        self.state["count"] += 1
        print(f"Updated count: {self.state['count']}")

flow = StateFlow()
await flow.kickoff_async()


Initial count: 1
Updated count: 2


In [9]:
from crewai.flow.flow import Flow, listen, start
from pydantic import BaseModel

# Defining a structured state model
class CounterState(BaseModel):
    count: int = 0

class StructuredStateFlow(Flow[CounterState]):

    @start()
    def initialize_state(self):
        print(f"Initial count: {self.state.count}")
        self.state.count = 1

    @listen(initialize_state)
    def increment_count(self):
        self.state.count += 1
        print(f"Updated count: {self.state.count}")

flow = StructuredStateFlow()
await flow.kickoff_async()


Initial count: 0
Updated count: 2


# Content planner flow

Before you get started:

Create a `.env` file in this notebook's directory and store these two values:

- FIRECRAWL_API_KEY="fc-..." (get a Firecrawl API key here: https://www.firecrawl.dev/i/api)
- OPENAI_API_KEY="sk-..."

In [None]:
!pip install crewai crewai-tools firecrawl-py

In [1]:
# Importing necessary libraries
import os
import uuid
import yaml
import json
from pathlib import Path
from pydantic import BaseModel
from typing import Optional

# Firecrawl SDK
from firecrawl import FirecrawlApp

# Importing Crew related components
from crewai import Agent, Task, Crew, LLM

# Importing CrewAI Flow related components
from crewai.flow.flow import Flow, listen, start, router, or_

from dotenv import load_dotenv
load_dotenv()

import nest_asyncio
nest_asyncio.apply()

In [2]:
# LLM shared by every agent below.
# To run locally instead, point LLM at an Ollama server, e.g.
# model="ollama/llama3.2" with base_url="http://localhost:11434".
llm = LLM(model="gpt-4o")

In [3]:
# Blog post to turn into social content; used as the default URL of the content-planning state.
blog_post_url = "https://blog.dailydoseofds.com/p/5-chunking-strategies-for-rag"

## Twitter and LinkedIn Planning Crew

In [4]:
# define structured output for twitter and linkedin

# Structured-output models for the planning tasks (used via output_pydantic below).
# NOTE: Pydantic turns class docstrings into JSON-schema descriptions; presumably
# that schema reaches the LLM, so edit the docstrings deliberately.

# One tweet of a planned thread.
class Tweet(BaseModel):
    """Represents an individual tweet in a thread"""
    content: str
    is_hook: bool = False  # Identifies if this is the opening/hook tweet
    # Pydantic copies mutable defaults per instance, so this [] is not shared between tweets.
    media_urls: Optional[list[str]] = []  # Optional media attachments (images, code snippets)

# Output model of the Twitter planning task.
class Thread(BaseModel):
    """Represents a Twitter thread"""
    topic: str  # Main topic/subject of the thread
    tweets: list[Tweet]  # List of tweets in the thread

# Output model of the LinkedIn planning task.
class LinkedInPost(BaseModel):
    """Represents a LinkedIn post"""
    content: str
    media_url: str # Main image url for the post

In [5]:
# load agent and task configurations from yaml files and tools

from crewai_tools import (
    DirectoryReadTool,
    FileReadTool,
)

# Load agent and task configurations from YAML files
def _load_yaml_config(path):
    """
    Read and parse one YAML config file.

    The file is decoded explicitly as UTF-8 instead of the platform's locale
    encoding. yaml.safe_load is used, so no arbitrary Python objects are built.
    """
    with open(path, 'r', encoding='utf-8') as f:
        return yaml.safe_load(f)

agents_config = _load_yaml_config('config/planner_agents.yaml')
tasks_config = _load_yaml_config('config/planner_tasks.yaml')

In [6]:
# create agents, their tasks and crew for twitter

# Both Twitter-side agents read the draft and examples from disk, so each
# gets its own directory-listing and file-reading tools.
draft_analyzer = Agent(
    config=agents_config['draft_analyzer'],
    tools=[DirectoryReadTool(), FileReadTool()],
    llm=llm,
)

twitter_thread_planner = Agent(
    config=agents_config['twitter_thread_planner'],
    tools=[DirectoryReadTool(), FileReadTool()],
    llm=llm,
)

# Step 1: analyze the scraped draft.
analyze_draft = Task(
    config=tasks_config['analyze_draft'],
    agent=draft_analyzer,
)

# Step 2: turn the analysis into a structured Thread.
create_twitter_thread_plan = Task(
    config=tasks_config['create_twitter_thread_plan'],
    agent=twitter_thread_planner,
    output_pydantic=Thread,
)

twitter_planning_crew = Crew(
    agents=[draft_analyzer, twitter_thread_planner],
    tasks=[analyze_draft, create_twitter_thread_plan],
    verbose=False,
)

In [8]:
# create agents, their tasks and crew for linkedin

# LinkedIn planner: same file tools as the Twitter agents.
linkedin_post_planner = Agent(
    config=agents_config['linkedin_post_planner'],
    tools=[DirectoryReadTool(), FileReadTool()],
    llm=llm,
)

# Produces a structured LinkedInPost from the draft analysis.
create_linkedin_post_plan = Task(
    config=tasks_config['create_linkedin_post_plan'],
    agent=linkedin_post_planner,
    output_pydantic=LinkedInPost,
)

# NOTE(review): reuses the draft_analyzer agent and analyze_draft task objects
# from the Twitter crew; confirm CrewAI tolerates sharing a Task between crews.
linkedin_planning_crew = Crew(
    agents=[draft_analyzer, linkedin_post_planner],
    tasks=[analyze_draft, create_linkedin_post_plan],
    verbose=False,
)



In [35]:
# define state for the content planning flow

class ContentPlanningState(BaseModel):
    """
    State for the content planning flow.

    Read and updated by the steps of CreateContentPlanningFlow.
    """
    # Blog post to scrape; defaults to the notebook-level blog_post_url.
    blog_post_url: str = blog_post_url
    # File the scraped markdown is written to. The scrape step assigns a plain
    # string and save_plan calls str methods on it, so the field is typed str
    # (it was annotated Path). The default is only a placeholder.
    draft_path: str = "assets/"
    # Target platform; the router accepts "twitter" or "linkedin".
    post_type: str = "twitter"
    # Example posts passed to the planning crews as style references.
    path_to_example_threads: str = "assets/example_threads.txt"
    path_to_example_linkedin: str = "assets/example_linkedin.txt"

class CreateContentPlanningFlow(Flow[ContentPlanningState]):
    """
    Scrape a blog post, then plan a Twitter thread or a LinkedIn post from it.

    Steps: scrape_blog_post (start) -> select_platform (router) ->
    twitter_draft or linkedin_draft -> save_plan.
    """

    # No need for AI Agents on this step, so we just use regular Python code
    @start()
    def scrape_blog_post(self):
        """
        Fetch the blog post with Firecrawl and save its markdown under assets/.

        Sets self.state.draft_path to the written file and returns the state.
        The file is named after the page title, with characters that are unsafe
        in file names replaced. A random UUID is used when no title is available.
        """
        print(f"# fetching draft from: {self.state.blog_post_url}")
        app = FirecrawlApp(api_key=os.getenv("FIRECRAWL_API_KEY"))
        scrape_result = app.scrape_url(self.state.blog_post_url, params={'formats': ['markdown', 'html']})
        try:
            title = scrape_result['metadata']['title']
        except (KeyError, TypeError):
            title = None
        if not title:
            title = str(uuid.uuid4())
        # Titles may contain path separators or characters Windows forbids in
        # file names; either would make open() below fail.
        unsafe_chars = str.maketrans({c: "-" for c in '\\/:*?"<>|'})
        safe_title = str(title).translate(unsafe_chars)
        os.makedirs("assets", exist_ok=True)
        self.state.draft_path = f'assets/{safe_title}.md'
        # Scraped pages often contain emoji / non-ASCII text: write UTF-8 explicitly.
        with open(self.state.draft_path, 'w', encoding='utf-8') as f:
            f.write(scrape_result['markdown'])
        return self.state

    @router(scrape_blog_post)
    def select_platform(self):
        """
        Route to the listener matching self.state.post_type.

        Raises:
            ValueError: if post_type is neither "twitter" nor "linkedin".
        """
        if self.state.post_type in ("twitter", "linkedin"):
            return self.state.post_type
        # Returning None would match no listener and the flow would end silently.
        raise ValueError(
            f"Unsupported post_type {self.state.post_type!r}; expected 'twitter' or 'linkedin'"
        )

    @listen("twitter")
    def twitter_draft(self):
        """Run the Twitter planning crew on the scraped draft and print the planned thread."""
        print(f"# Planning content for: {self.state.draft_path}")
        result = twitter_planning_crew.kickoff(inputs={
            'draft_path': self.state.draft_path,
            'path_to_example_threads': self.state.path_to_example_threads,
        })
        print(f"# Planned content for {self.state.draft_path}:")
        for i, tweet in enumerate(result.pydantic.tweets, start=1):
            print(f"Tweet {i}:")
            print(f"{tweet.content}")
            print(f"Media URLs: {tweet.media_urls}")
            print("-" * 100)
        return result

    @listen("linkedin")
    def linkedin_draft(self):
        """Run the LinkedIn planning crew on the scraped draft and print the planned post."""
        print(f"# Planning content for: {self.state.draft_path}")
        result = linkedin_planning_crew.kickoff(inputs={
            'draft_path': self.state.draft_path,
            'path_to_example_linkedin': self.state.path_to_example_linkedin,
        })
        print(f"# Planned content for {self.state.draft_path}:")
        print(f"{result.pydantic.content}")
        return result

    @listen(or_(twitter_draft, linkedin_draft))
    def save_plan(self, plan):
        """
        Save the planned content as JSON.

        Args:
            plan: crew result returned by twitter_draft or linkedin_draft; its
                .pydantic model is serialized to
                output/<draft file name>_<post_type>.json.
        """
        os.makedirs("output", exist_ok=True)
        # Path(...).name equals the old split("/")[-1] for these paths and also
        # works if draft_path ever holds a Path object.
        file_name = f'{Path(self.state.draft_path).name}_{self.state.post_type}.json'
        with open(f'output/{file_name}', 'w', encoding='utf-8') as f:
            json.dump(plan.pydantic.model_dump(), f, indent=2)

In [36]:
# Create the content-planning flow and target Twitter; the next cells plot and run it.
flow = CreateContentPlanningFlow()
flow.state.post_type = "twitter"

In [32]:
# Render the flow graph to an HTML file (output reports crewai_flow.html).
flow.plot()

Plot saved as crewai_flow.html


In [30]:
# Inspect the flow's current state before running it.
flow.state

StateWithId(id='90b27ba0-faad-41ec-9cf6-7999638968ef', blog_post_url='https://blog.dailydoseofds.com/p/5-chunking-strategies-for-rag', draft_path='assets/ ', post_type='twitter', path_to_example_threads='assets/example_threads.txt')

In [31]:
# Run the flow synchronously; presumably relies on nest_asyncio.apply() from the
# setup cell because the Jupyter kernel already runs an event loop.
flow.kickoff()

[1m[35m Flow started with ID: 90b27ba0-faad-41ec-9cf6-7999638968ef[00m
# fetching draft from: https://blog.dailydoseofds.com/p/5-chunking-strategies-for-rag
# Planning content for: assets/5 Chunking Strategies For RAG - by Avi Chawla.md
# Planned content for assets/5 Chunking Strategies For RAG - by Avi Chawla.md:
Tweet 1:
5 Chunking Strategies For RAG
Media URLs: []
----------------------------------------------------------------------------------------------------
Tweet 2:
Chunking is crucial in RAG systems to efficiently process large documents. 📚
Here's how it fits into the RAG workflow with data stored as vectors, matched for queries, and generating responses:
Media URLs: ['https://substack-post-media.s3.amazonaws.com/public/images/6878b8fa-5e74-45a1-9a89-5aab92889126_2366x990.gif']
----------------------------------------------------------------------------------------------------
Tweet 3:
Let's break down the 5 essential chunking strategies, starting with **Fixed-size Chunkin

In [38]:
# Re-run the same flow instance, routing to the LinkedIn branch this time.
flow.state.post_type = "linkedin"
flow.kickoff()

[1m[35m Flow started with ID: 5973fef0-9201-489f-818c-a40959f64f6e[00m
# fetching draft from: https://blog.dailydoseofds.com/p/5-chunking-strategies-for-rag
# Planning content for: assets/5 Chunking Strategies For RAG - by Avi Chawla.md
# Planned content for assets/5 Chunking Strategies For RAG - by Avi Chawla.md:
🔍 **5 Chunking Strategies For RAG** 🚀

Understanding how to effectively handle large-scale data is pivotal for any data-driven business. In this exploration, we delve into '5 Chunking Strategies For RAG' to unravel the complexities and uncover strategies that illuminate the path to efficient data processing.

Effective data chunking is not just a technical hurdle; it's a business imperative. Without it, systems can become inefficient, data retrieval times can skyrocket, and ultimately, the user experience can falter. These challenges not only hinder operational efficiency but can significantly impact the competitive edge of your business.

So, what are the solutions and te

## Book Writing Flow

The `book_flow` folder already exists in the current directory, so either delete it or use a different folder name in the cells below.

Also, set the Serper.dev and OpenAI API keys in the `.env` file of the `book_writing_flow` folder.

In [39]:
# mkdir fails if book_flow already exists; remove it or pick another name first.
!mkdir book_flow

In [40]:
# %cd changes the kernel's working directory for all later cells; re-running
# this cell from inside book_flow will fail.
%cd book_flow

/Users/avichawla/Desktop/posts/AI Agent Crash course Part 4/book_flow


In [42]:
# Scaffold a new CrewAI flow project named book_writing_flow.
!crewai create flow book_writing_flow

[32m[1mCreating flow book_writing_flow...[0m
[32m[1mFlow book_writing_flow created successfully![0m


In [44]:
# Move into the generated project's src directory (again a kernel-wide cwd change).
%cd book_writing_flow/src

/Users/avichawla/Desktop/posts/AI Agent Crash course Part 4/book_flow/book_writing_flow/src


In [53]:
# Run the generated flow's main module in a separate Python process.
!python book_writing_flow/main.py

Generating outline
[1m[95m# Agent:[00m [1m[92mBook Research Agent[00m
[95m## Task:[00m [92mResearch the topic Astronomy in 2025 and gather latest information about it. Prepare insights and key points that will be used to create a outline for a book  by the outline writer Agent.
[00m


[1m[95m# Agent:[00m [1m[92mBook Research Agent[00m
[95m## Thought:[00m [92mI need to gather the latest information about Astronomy in 2025 to create a list of insights and key points for an outline.[00m
[95m## Using tool:[00m [92mSearch the internet[00m
[95m## Tool Input:[00m [92m
"{\"search_query\": \"Astronomy advancements and discoveries in 2025\"}"[00m
[95m## Tool Output:[00m [92m

Search results: Title: What Astronomers Hope Is Discovered About Our Universe in 2025
Link: https://www.newsweek.com/what-astronomers-hope-discovered-about-our-universe-2025-1999874
Snippet: In 2025, the Rubin Observatory in Chile will start operations and survey the southern sky every four da