In [None]:
import os
import datetime
from datetime import date, timedelta, datetime
import pandas as pd
import numpy as np
from tenacity import retry, wait_exponential, stop_after_attempt, retry_if_exception_type
from crewai_tools import TavilySearchTool
from crewai import Agent, Task, Crew, Process
from langchain_groq import ChatGroq
from dotenv import load_dotenv
load_dotenv()
from langchain_core.rate_limiters import InMemoryRateLimiter

In [10]:
rate_limiter = InMemoryRateLimiter(requests_per_second=0.2,
                                   check_every_n_seconds=5,
                                   max_bucket_size=2
                                   )

In [11]:
import os
import json
from pathlib import Path

class CheckpointManager:
    """Manages checkpoint files for crew task resumption"""
    
    def __init__(self, checkpoint_dir="checkpoints"):
        self.checkpoint_dir = Path(checkpoint_dir)
        self.checkpoint_dir.mkdir(exist_ok=True)
        
    def save_checkpoint(self, task_name, output, metadata=None):
        """Save task output to checkpoint file"""
        checkpoint = {
            "task_name": task_name,
            "output": output,
            "timestamp": datetime.now().isoformat(),
            "metadata": metadata or {}
        }
        
        filepath = self.checkpoint_dir / f"{task_name}.json"
        with open(filepath, 'w') as f:
            json.dump(checkpoint, f, indent=2)
        print(f"‚úì Checkpoint saved: {task_name}")
        
    def load_checkpoint(self, task_name):
        """Load task output from checkpoint file"""
        filepath = self.checkpoint_dir / f"{task_name}.json"
        if filepath.exists():
            with open(filepath, 'r') as f:
                checkpoint = json.load(f)
            print(f"‚úì Checkpoint loaded: {task_name}")
            return checkpoint
        return None
    
    def checkpoint_exists(self, task_name):
        """Check if checkpoint exists for a task"""
        filepath = self.checkpoint_dir / f"{task_name}.json"
        return filepath.exists()
    
    def get_last_completed_task(self, task_names):
        """Find the last successfully completed task"""
        for i in range(len(task_names) - 1, -1, -1):
            if self.checkpoint_exists(task_names[i]):
                return i, task_names[i]
        return -1, None
    
    def clear_checkpoints(self):
        """Clear all checkpoint files"""
        for file in self.checkpoint_dir.glob("*.json"):
            file.unlink()
        print("‚úì All checkpoints cleared")

# Initialize checkpoint manager
checkpoint_mgr = CheckpointManager()


In [12]:
# llm = ChatGroq(
#     model="groq/llama-3.1-8b-instant", 
#     temperature=0.5,
#     stop=None,
#     timeout=120,
#     max_retries=3,
#     request_timeout=90,
#     rate_limiter=rate_limiter  
# )


# messages = [
#     ("system", "You are a helpful assistant."),
#     ("human", "Translate this to French: I love programming.")
# ]
# ai_msg = llm.invoke(messages)
# print(ai_msg.content)

In [13]:
search_tool = TavilySearchTool()

In [14]:
planner = Agent(
    role="Research & Content Planner",
    goal=(
        "Conduct thorough research and curate factually accurate, engaging content on {topic}. "
        "Ensure the collected information is credible, up-to-date, and valuable for the target audience."
    ),
    backstory=(
        "A meticulous Content Planner with expertise in research, content strategy, and audience engagement. "
        "With a keen eye for detail and a strong analytical mindset, they specialize in gathering reliable data, "
        "identifying key trends, and structuring insights into well-organized content plans. "
        "Their experience spans digital content creation, market research, and fact-checking, ensuring "
        "that every piece of information is credible and impactful. "
        "They excel at distilling complex topics into clear, actionable insights that serve as a foundation "
        "for compelling content creation."
    ),
    allow_delegation=False,
    verbose=True,
    llm="groq/llama-3.3-70b-versatile",
    max_rpm=10,
)

writer = Agent(
    role="Content Writer",
    goal="Write insightful and factually accurate "
            "opinion piece about the topic: {topic}",
    backstory="You're working on a writing "
                "a new opinion piece about the topic: {topic}. "
                "You base your writing on the work of "
                "the Research & Content Planner, who provides an outline "
                "and relevant context about the topic. "
                "You follow the main objectives and "
                "direction of the outline, "
                "as provide by the Content Planner. "
                "You also provide objective and impartial insights "
                "and back them up with information "
                "provide by the Content Planner. "
                "You acknowledge in your opinion piece "
                "when your statements are opinions "
                "as opposed to objective statements.",
    allow_delegation=False,
    verbose=True,
    llm="groq/llama-3.1-8b-instant",
    max_rpm=10,
)

editor = Agent(
    role="Editor",
    goal="Edit a given blog post to align with "
            "the writing style of the organization. ",
    backstory="You are an editor who receives a blog post "
                "from the Content Writer. "
                "Your goal is to review the blog post "
                "to ensure that it follows journalistic best practices,"
                "provides balanced viewpoints "
                "when providing opinions or assertions, "
                "and also avoids major controversial topics "
                "or opinions when possible.",
    allow_delegation=False,
    verbose=True,
    llm="groq/llama-3.1-8b-instant",
    max_rpm=10,
)


In [15]:
from crewai import Task

# Custom callback to save checkpoints after each task
def save_task_checkpoint(task_output, task_name):
    """Callback to save task output to checkpoint"""
    checkpoint_mgr.save_checkpoint(
        task_name,
        task_output.raw,
        metadata={"agent": task_output.agent, "status": "completed"}
    )
    return task_output

plan = Task(
    description=(
        "1. Prioritize the latest trends, key players, "
        "and noteworthy news on {topic}.\n"
        "2. Identify the target audience, considering "
        "their interests and pain points.\n"
        "3. Develop a detailed content outline including "
        "an introduction, key points, and a call to action, conclusion and necessary references.\n"
        "4. Include SEO keywords and relevant data or sources.\n"
        "When using search tools, provide the search query as a simple string."
    ),
    expected_output="A comprehensive content plan document "
    "with an outline, audience analysis, "
    "SEO keywords, and resources.",
    tools=[search_tool],
    agent=planner,
    output_file="checkpoints/plan.txt"  # Saves raw text output
)

write = Task(
    description=(
        "1. Use the content plan to craft a compelling "
        "blog post on {topic}.\n"
        "2. Incorporate SEO keywords naturally.\n"
        "3. Sections/Subtitles are properly named "
        "in an engaging manner.\n"
        "4. Adding necessary Hyperlinks and bolding for important sentences/words/statements.\n"
        "5. If required you can add comparison tables/table and data if it is necessary for the topic.\n"
        "6. Ensure the post is structured with an "
        "engaging introduction, insightful body, "
        "and a summarizing conclusion.\n"
        "7. Proofread for grammatical errors and "
        "alignment with the brand's voice.\n"
        "When using search tools, provide the search query as a simple string."
    ),
    expected_output="A well-written blog post "
    "in markdown format, ready for publication, "
    "each section should have 2 or 3 paragraphs.",
    tools=[],
    context=[plan],
    agent=writer,
    output_file="checkpoints/write.txt"
)

edit = Task(
    description=("Proofread the given blog post for "
    "grammatical errors, checks plagiarism and "
    "alignment with the brand's voice."),
    expected_output="A well-written blog post, "
    "ready for publication, "
    "each section should have 2 or 3 paragraphs.",
    tools=[],
    context=[write],
    agent=editor,
    output_file="checkpoints/edit.txt"
)


In [16]:
from crewai import Crew, Process
import time

def run_crew_with_checkpoints(topic, force_restart=False):
    """
    Run crew with automatic checkpoint resumption
    
    Args:
        topic: The topic to research and write about
        force_restart: If True, clears all checkpoints and starts fresh
    """
    
    # Clear checkpoints if force restart
    if force_restart:
        checkpoint_mgr.clear_checkpoints()
        print("üîÑ Starting fresh (all checkpoints cleared)")
    
    # Define task names in execution order
    task_names = ["plan", "write", "edit"]
    
    # Find where to resume from
    last_completed_idx, last_task = checkpoint_mgr.get_last_completed_task(task_names)
    
    if last_completed_idx >= 0:
        print(f"üìç Resuming from checkpoint: {last_task}")
        print(f"   Skipping tasks: {task_names[:last_completed_idx+1]}")
        
        # Load previous outputs for context
        plan_output = None
        write_output = None
        
        if checkpoint_mgr.checkpoint_exists("plan"):
            plan_checkpoint = checkpoint_mgr.load_checkpoint("plan")
            plan_output = plan_checkpoint["output"]
            
        if checkpoint_mgr.checkpoint_exists("write"):
            write_checkpoint = checkpoint_mgr.load_checkpoint("write")
            write_output = write_checkpoint["output"]
    else:
        print("üöÄ Starting crew from beginning (no checkpoints found)")
        plan_output = None
        write_output = None
    
    # Configure crew with rate limiting
    crew = Crew(
        agents=[planner, writer, editor],
        tasks=[plan, write, edit],
        verbose=True,
        process=Process.sequential,
        max_rpm=10,  # Limit to 10 requests per minute
        share_crew=False
    )
    
    # Execute crew with checkpoint saving
    try:
        # If we have all checkpoints, just return the final result
        if checkpoint_mgr.checkpoint_exists("edit"):
            print("‚úì All tasks completed! Loading final output...")
            final_checkpoint = checkpoint_mgr.load_checkpoint("edit")
            return final_checkpoint["output"]
        
        # Run the crew
        print(f"ü§ñ Running crew for topic: {topic}")
        result = crew.kickoff(inputs={"topic": topic})
        
        # Save checkpoints after successful completion
        # Note: Individual task outputs are saved via output_file parameter
        # This saves the final crew output
        checkpoint_mgr.save_checkpoint(
            "edit", 
            result.raw,
            metadata={"topic": topic, "status": "completed"}
        )
        
        print("‚úÖ Crew execution completed successfully!")
        return result
        
    except Exception as e:
        print(f"‚ùå Error during crew execution: {e}")
        print("üíæ Progress has been saved. Run again to resume from last checkpoint.")
        raise

# Wrapper with exponential backoff for rate limits
@retry(
    wait=wait_exponential(multiplier=2, min=4, max=120),
    stop=stop_after_attempt(5),
    retry=retry_if_exception_type((Exception,))
)
def kickoff_with_retry(topic, force_restart=False):
    """Execute crew with retry logic and checkpoint management"""
    try:
        return run_crew_with_checkpoints(topic, force_restart)
    except Exception as e:
        print(f"‚ö†Ô∏è  Retry triggered due to: {e}")
        time.sleep(5)  # Additional delay between retries
        raise


In [17]:
topic = "OpenAI Agentkit"
result = kickoff_with_retry(topic)

üöÄ Starting crew from beginning (no checkpoints found)
ü§ñ Running crew for topic: OpenAI Agentkit
‚úì Checkpoint saved: edit
‚úÖ Crew execution completed successfully!
