In [None]:
%pip install -U 'crewai[tools]'
%pip install -U crewai
%pip install -U requests
%pip install -U python-dotenv

In [None]:
from crewai import Crew, Agent, Task
import os
from dotenv import load_dotenv
from crewai_tools import ScrapeWebsiteTool

In [None]:
# Load variables from a local .env file (if one exists) into the process environment.
load_dotenv()

# Each key falls back to an obvious placeholder when the variable is unset or
# empty, so the names below are always bound to a string.
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY") or "<your_openai_key>"
SERPER_API_KEY = os.getenv("SERPER_API_KEY") or "<your_serper_key>"

# OPENAI_API_KEY is not handed to any object in this notebook, so a key typed
# in place of the placeholder above would otherwise have no effect. Export a
# real value to the environment — presumably where CrewAI's LLM client looks
# for it (confirm against the CrewAI docs). The placeholder is never exported.
if OPENAI_API_KEY != "<your_openai_key>":
    os.environ["OPENAI_API_KEY"] = OPENAI_API_KEY

In [None]:
from crewai_tools import BaseTool
import requests
import json
import os
from dotenv import load_dotenv

load_dotenv()

class CustomSerperDevTool(BaseTool):
    """Tool that posts a query to the Serper news endpoint and returns the hits.

    The API key is taken from the notebook-level ``SERPER_API_KEY`` variable,
    which must be defined before the tool is run.
    """

    name: str = "Custom Serper Dev Tool"
    description: str = "Search the internet for news."

    def _run(self, query: str) -> str:
        """
        Search the internet for news.

        Args:
            query: Search terms, sent to the API as the ``q`` field.

        Returns:
            The ``news`` list from the JSON response, serialised as an
            indented JSON string. ``"[]"`` when the response has no
            ``news`` key.

        Raises:
            requests.HTTPError: The API answered with a 4xx/5xx status
                (for example a rejected API key). Previously such responses
                were silently turned into an empty result.
            requests.Timeout: The API did not respond within the timeout.
            requests.RequestException: Any other transport-level failure.
        """
        url = "https://google.serper.dev/news"

        # Seconds to wait for the API; without a timeout `requests` can block forever.
        timeout_seconds = 30

        payload = {
            "q": query,
            "num": 20,
            "autocorrect": False,
            # NOTE(review): "qdr:d" looks like the "past day" time filter — confirm
            # against the Serper API docs.
            "tbs": "qdr:d",
        }

        headers = {
            'X-API-KEY': SERPER_API_KEY,
            'Content-Type': 'application/json'
        }

        # `json=` lets requests serialise the payload as the JSON request body.
        response = requests.post(
            url, headers=headers, json=payload, timeout=timeout_seconds
        )

        # Surface HTTP errors instead of parsing an error body as if it were results.
        response.raise_for_status()

        # Keep only the 'news' property of the response.
        news_data = response.json().get('news', [])

        # Hand the articles back as a JSON string.
        return json.dumps(news_data, indent=2)



In [None]:
# Tool instances shared at notebook level; the Agents factory below refers to
# both of these names when it builds the researcher agent.
scrape_website_tool = ScrapeWebsiteTool()
custom_serper_dev_tool = CustomSerperDevTool()

class Agents:
    """Factory for the agents used by the news crew."""

    def researcher(self, topic):
        """Build the news-research agent for ``topic``.

        The agent is given the notebook-level ``custom_serper_dev_tool`` and
        ``scrape_website_tool`` instances.

        Args:
            topic: Subject interpolated into the agent's role, goal and backstory.

        Returns:
            A verbose ``Agent`` configured for researching ``topic``.
        """
        role = f"{topic} Senior News Researcher"
        goal = f"Uncover latest news in {topic}"
        backstory = (
            f"You're a seasoned researcher with a knack for uncovering the latest developments in {topic}. "
            "Known for your ability to find the most relevant information and present it in a clear and concise manner."
        )
        research_tools = [custom_serper_dev_tool, scrape_website_tool]

        return Agent(
            role=role,
            goal=goal,
            backstory=backstory,
            tools=research_tools,
            verbose=True,
        )

    def reporting_analyst(self, topic):
        """Build the reporting agent for ``topic``.

        This agent is created with an empty tool list.

        Args:
            topic: Subject interpolated into the agent's role and goal.

        Returns:
            A verbose ``Agent`` configured for writing reports about ``topic``.
        """
        role = f"{topic} News Reporting Analyst"
        goal = f"Create detailed reports based on {topic} news analysis and research findings"
        # The backstory does not mention the topic, so it is a plain string.
        backstory = (
            "You're a meticulous analyst with a keen eye for detail. "
            "You're known for your ability to turn complex data into clear and concise reports, "
            "making it easy for others to understand and act on the information you provide."
        )

        return Agent(
            role=role,
            goal=goal,
            backstory=backstory,
            tools=[],
            verbose=True,
        )

In [None]:
from textwrap import dedent

class Tasks:
    """Factory for the tasks executed by the news crew."""

    def research_task(self, agent, topic):
        """Build the task that asks ``agent`` to search for news about ``topic``.

        Args:
            agent: The agent the task is assigned to.
            topic: Subject interpolated into the description and expected output.

        Returns:
            A ``Task`` whose expected output is a list of articles with
            title, url and snippet.
        """
        return Task(
            description=dedent(
                f"""
                Search news about {topic}
                """
            ),
            expected_output=dedent(
                f"A list of news articles about {topic} with the title, url and snippet"
            ),
            agent=agent,
        )

    def reporting_task(self, agent):
        """Build the task that asks ``agent`` to write a report from its context.

        Args:
            agent: The agent the task is assigned to.

        Returns:
            A ``Task`` whose expected output is a markdown report.
        """
        return Task(
            description=dedent(
                """
                Review the context you got.
                Make sure the report is detailed and contains any and all relevant information.
                """
            ),
            # The leading backslash keeps the first line indented like the
            # second, so dedent() can strip the shared margin. With the first
            # line at column 0 the common margin is empty and the second line
            # would keep its source indentation in the prompt.
            expected_output=dedent(
                """\
                A fully fledge reporting of the news articles.
                Formatted as markdown without '```'"""
            ),
            agent=agent,
        )

In [None]:
from datetime import datetime

tasks = Tasks()
agents = Agents()

topic = "ai agents"

# Create Agents
researcher = agents.researcher(topic=topic)
reporting_analyst = agents.reporting_analyst(topic=topic)

# Define Tasks for each agent
research_task = tasks.research_task(agent=researcher, topic=topic)
reporting_task = tasks.reporting_task(agent=reporting_analyst)

# Instantiate the crew; tasks are listed in the order research -> reporting.
crew = Crew(
    agents=[researcher, reporting_analyst],
    tasks=[
        research_task,
        reporting_task,
    ],
)

# Bind `result` up front so it exists even when the run fails; previously a
# failed kickoff left it undefined and the final print raised NameError.
result = None

try:
    # Kick off the process
    result = crew.kickoff()
except Exception as e:
    # Report the failure without claiming success; `result` stays None.
    print(f"Crew run failed: {type(e).__name__}: {e}")
else:
    # Only announce completion when kickoff actually returned.
    print("Your AI news crew is complete!")
    print("Results: ", result)