In [None]:
!pip install langgraph
!pip install langchain_openai
!pip install langchain-community
!pip install langchain-core
!pip install langgraph
!pip install --upgrade langchain_community
!pip install openai langgraph langchain-core

Collecting langgraph
  Downloading langgraph-0.2.60-py3-none-any.whl.metadata (15 kB)
Collecting langgraph-checkpoint<3.0.0,>=2.0.4 (from langgraph)
  Downloading langgraph_checkpoint-2.0.9-py3-none-any.whl.metadata (4.6 kB)
Collecting langgraph-sdk<0.2.0,>=0.1.42 (from langgraph)
  Downloading langgraph_sdk-0.1.48-py3-none-any.whl.metadata (1.8 kB)
Downloading langgraph-0.2.60-py3-none-any.whl (135 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m135.7/135.7 kB[0m [31m8.8 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading langgraph_checkpoint-2.0.9-py3-none-any.whl (37 kB)
Downloading langgraph_sdk-0.1.48-py3-none-any.whl (43 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m43.7/43.7 kB[0m [31m3.3 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: langgraph-sdk, langgraph-checkpoint, langgraph
Successfully installed langgraph-0.2.60 langgraph-checkpoint-2.0.9 langgraph-sdk-0.1.48
Collecting langchain_openai
  Downloading langchain_ope

In [None]:
import json
import os
import re
from datetime import datetime
from enum import Enum
from typing import TypedDict, Annotated, Sequence, Dict, List, TypeVar, Union

from langchain_core.messages import HumanMessage, SystemMessage, AIMessage
from langchain_core.runnables import RunnableConfig
from langgraph.graph import Graph, StateGraph
from langgraph.graph import Graph, StateGraph, END
from openai import OpenAI


In [None]:
class TaskState(TypedDict):
    """State dictionary handed from node to node in the task-management graph."""
    # Every task recorded so far; add_task stores dicts with the keys
    # description, priority, deadline, status and created_at.
    tasks: List[Dict]
    # Action name chosen by analyze_input; the graph router uses it to pick the next node.
    next_action: str
    # Conversation history; the user's input is the last entry when a run starts,
    # and each action node appends an AIMessage reply.
    messages: List[Union[HumanMessage, AIMessage, SystemMessage]]
    # The task most recently created by add_task ({} until one has been added).
    current_task: Dict
    # Recommendation lines produced by generate_recommendations.
    recommendations: List[str]
    # Description of the last failure; action nodes reset it to "" on success.
    error: str
    # Set to True by an action node once it has handled the current input.
    processed: bool
    # When True the router sends the graph to END instead of another node.
    should_end: bool

class Action(str, Enum):
    """String-valued names of the actions the graph can take.

    The values are the exact strings the LLM is asked to answer with in
    analyze_input and the keys used when routing between graph nodes.
    """
    ADD_TASK = "add_task"
    COMPLETE_TASK = "complete_task"
    GET_RECOMMENDATIONS = "get_recommendations"
    GET_SUMMARY = "get_summary"
    # Routed to the handle_error node by the graph.
    ERROR = "error"
    # Set by analyze_input when the state has already been processed.
    DONE = "done"

class XAPI:
    """Minimal chat client for the X.AI API, accessed through the OpenAI SDK.

    The client converts LangChain message objects into the role/content
    dictionaries expected by the chat-completions endpoint and wraps the
    reply in an ``AIMessage``.
    """

    # Model names accepted by ``invoke``.
    VALID_MODELS = ("grok-2-1212", "grok-2-vision-1212", "grok-beta", "grok-vision-beta")

    def __init__(self, api_key: str):
        """Create an OpenAI-SDK client pointed at the X.AI base URL.

        Args:
            api_key: X.AI API key used to authenticate requests.
        """
        self.client = OpenAI(
            api_key=api_key,
            base_url="https://api.x.ai/v1"
        )

    @staticmethod
    def _role_for(message) -> str:
        """Map a LangChain message object to a chat-completions role name."""
        if isinstance(message, SystemMessage):
            return "system"
        if isinstance(message, AIMessage):
            return "assistant"
        # Anything else (including HumanMessage) is sent as user input.
        return "user"

    def invoke(self, messages, model="grok-2-1212"):
        """Send one or more messages to the model and return its reply.

        Args:
            messages: A single message object or a list of them; each must
                expose a ``content`` attribute.
            model: Model name; must be one of ``VALID_MODELS``.

        Returns:
            An ``AIMessage`` holding the content of the first completion choice.

        Raises:
            ValueError: If ``model`` is not a supported model name.
            RuntimeError: If the API request fails; the original exception is
                attached as ``__cause__``.
        """
        if model not in self.VALID_MODELS:
            raise ValueError(f"Invalid model name. Choose from: {', '.join(self.VALID_MODELS)}")

        batch = messages if isinstance(messages, list) else [messages]
        formatted_messages = [
            {"role": self._role_for(msg), "content": msg.content}
            for msg in batch
        ]

        try:
            response = self.client.chat.completions.create(
                model=model,
                messages=formatted_messages
            )
            return AIMessage(content=response.choices[0].message.content)
        except Exception as e:
            # Chain explicitly so the underlying SDK error stays inspectable.
            raise RuntimeError(f"X.AI API error: {str(e)}") from e

In [None]:
# Read the X.AI key from the environment so the secret never lives in the notebook.
# Any key that was ever committed in this cell must be treated as leaked and rotated.
_xai_api_key = os.environ.get("XAI_API_KEY")
if not _xai_api_key:
    raise RuntimeError("Set the XAI_API_KEY environment variable before running this notebook.")
llm = XAPI(api_key=_xai_api_key)

In [None]:
def analyze_input(state: TaskState) -> TaskState:
    """Determine what action to take based on the last message.

    If the state is already marked as processed, the run is finished: the
    returned state has ``next_action`` set to "done" and ``should_end`` True.
    Otherwise the LLM is asked to classify the last message, and the answer is
    accepted only if it names an action node ('add_task', 'complete_task',
    'get_recommendations', 'get_summary'). Anything else -- an unknown answer,
    an LLM failure, or an empty message history -- becomes the "error" action.

    Args:
        state: Current graph state.

    Returns:
        A shallow copy of ``state`` with ``next_action`` and ``should_end`` set.
    """
    new_state = state.copy()

    if state.get("processed", False):
        new_state["next_action"] = Action.DONE.value
        new_state["should_end"] = True
        return new_state

    # Only these values lead to an action node. "done" is deliberately excluded:
    # it is a member of Action but is not something the LLM may choose.
    routable_actions = {
        Action.ADD_TASK.value,
        Action.COMPLETE_TASK.value,
        Action.GET_RECOMMENDATIONS.value,
        Action.GET_SUMMARY.value,
    }

    new_state["should_end"] = False
    new_state["next_action"] = Action.ERROR.value

    history = state.get("messages") or []
    if not history:
        # Nothing to classify.
        return new_state

    messages = [
        SystemMessage(content="""You are a task management assistant. Analyze the user's input and
        determine the appropriate action to take. Respond with one of: 'add_task', 'complete_task',
        'get_recommendations', 'get_summary', or 'error'."""),
        history[-1]
    ]

    try:
        response = llm.invoke(messages)
    except Exception as e:
        # Route the failure to the error node instead of aborting the graph run.
        new_state["error"] = f"Failed to analyze input: {str(e)}"
        return new_state

    # The prompt shows the options in quotes, so tolerate quoted/punctuated answers.
    action = response.content.strip().lower().strip("'\"`.").strip()
    if action in routable_actions:
        new_state["next_action"] = action
    return new_state
def _parse_task_json(raw: str) -> dict:
    """Parse the JSON object contained in an LLM reply, ignoring any wrapper text or code fences."""
    text = raw.strip()
    start, end = text.find("{"), text.rfind("}")
    if start != -1 and end > start:
        # Keep only the outermost {...} span; this drops ``` fences and stray prose.
        text = text[start:end + 1]
    parsed = json.loads(text)
    if not isinstance(parsed, dict):
        raise ValueError("Task details must be a JSON object")
    return parsed


def _add_task_failure(state: TaskState, error: str, reply: str) -> TaskState:
    """Build the terminal state returned when a task could not be added."""
    new_state = state.copy()
    new_state["error"] = error
    new_state["messages"] = state["messages"] + [AIMessage(content=reply)]
    new_state["processed"] = True
    new_state["should_end"] = True
    return new_state


def add_task(state: TaskState) -> TaskState:
    """Create a new pending task from the last message in the conversation.

    The LLM is asked to extract ``description``, ``priority`` and ``deadline``
    as JSON. Today's date is included in the prompt so that relative deadlines
    ("next Friday") can be resolved instead of echoing the example date.

    Args:
        state: Current graph state; the last entry of ``messages`` is the request.

    Returns:
        A shallow copy of ``state``. On success the new task is appended to
        ``tasks`` and stored in ``current_task``, a confirmation is appended to
        ``messages`` and ``error`` is cleared. On failure ``error`` is set and
        an error reply is appended. ``processed`` and ``should_end`` are True
        in both cases.
    """
    today = datetime.now().strftime("%Y-%m-%d (%A)")
    system_prompt = """Extract task details from the user's message and format them as JSON.
        Include:
        - description (string)
        - priority (number 1-3)
        - deadline (string)
        Example output:
        {
            "description": "Review quarterly report",
            "priority": 1,
            "deadline": "2024-01-05"
        }""" + (
        f"\nToday's date is {today}. Convert relative deadlines such as "
        "'next Friday' into an absolute YYYY-MM-DD date."
    )

    try:
        messages = [SystemMessage(content=system_prompt), state["messages"][-1]]
        response = llm.invoke(messages)
        task_details = _parse_task_json(response.content)

        if not all(key in task_details for key in ["description", "priority", "deadline"]):
            raise ValueError("Missing required task fields")

        task = {
            "description": task_details["description"],
            "priority": int(task_details["priority"]),
            "deadline": task_details["deadline"],
            "status": "pending",
            "created_at": datetime.now().isoformat()
        }
    except json.JSONDecodeError as e:
        # The reply was not parseable JSON.
        return _add_task_failure(
            state,
            f"Invalid task format: {str(e)}",
            "Error: Could not parse task details. Please try again with a clearer format.",
        )
    except Exception as e:
        # LLM failure, missing fields, non-numeric priority, empty history, ...
        return _add_task_failure(
            state,
            f"Failed to add task: {str(e)}",
            f"Error: {str(e)}. Please try again.",
        )

    new_state = state.copy()
    new_state["tasks"] = state["tasks"] + [task]
    new_state["current_task"] = task
    new_state["messages"] = state["messages"] + [
        AIMessage(content=f"Successfully added task:\n- Description: {task['description']}\n- Priority: {task['priority']}\n- Deadline: {task['deadline']}")
    ]
    new_state["error"] = ""
    new_state["processed"] = True
    new_state["should_end"] = True
    return new_state

# Words that describe the request ("mark ... as complete") rather than the task itself.
_COMPLETION_FILLER_WORDS = frozenset({
    "a", "an", "as", "complete", "completed", "done", "finish", "finished",
    "is", "it", "mark", "marked", "my", "please", "task", "tasks", "that",
    "the", "this", "to",
})


def _task_keywords(text: str) -> set:
    """Return the lower-cased words of ``text`` with request filler words removed."""
    return {
        word for word in re.findall(r"[a-z0-9]+", text.lower())
        if word not in _COMPLETION_FILLER_WORDS
    }


def _find_pending_task(tasks, query: str):
    """Return the index of the pending task best matching ``query``, or None."""
    query = query.strip().lower()
    pending = [(i, t) for i, t in enumerate(tasks) if t["status"] == "pending"]

    # First pass: direct containment. An empty query must not match anything
    # ("" is a substring of every string).
    if query:
        for index, task in pending:
            description = task["description"].lower()
            if description and (query in description or description in query):
                return index

    # Second pass: word overlap, so word order and filler words do not matter.
    query_words = _task_keywords(query)
    best_index, best_score = None, 0.0
    for index, task in pending:
        task_words = _task_keywords(task["description"])
        if not task_words:
            continue
        score = len(query_words & task_words) / len(task_words)
        if score > best_score:
            best_index, best_score = index, score
    # Require at least half of the task's keywords to avoid accidental matches.
    return best_index if best_score >= 0.5 else None


def complete_task(state: TaskState) -> TaskState:
    """Mark the pending task referred to by the last message as completed.

    The LLM extracts the task description from the user's message; that text is
    matched against pending tasks by containment first and by keyword overlap
    second. The matched task is replaced by an updated copy, so dictionaries
    held by the incoming state are never mutated.

    Args:
        state: Current graph state; the last entry of ``messages`` is the request.

    Returns:
        A shallow copy of ``state`` with a reply appended to ``messages``. When
        a task was matched, ``tasks`` contains the completed copy; otherwise
        ``error`` describes the failure. ``processed`` and ``should_end`` are
        True in every case.
    """
    new_state = state.copy()

    try:
        messages = [
            SystemMessage(content="Extract the task description from the user's message. "
                                  "Respond with only the task description and no other words."),
            state["messages"][-1]
        ]
        response = llm.invoke(messages)
        task_description = response.content.strip().strip("'\"`").strip().lower()

        match_index = _find_pending_task(state["tasks"], task_description)

        if match_index is None:
            new_state["error"] = f"No matching pending task found for: {task_description}"
            new_state["messages"] = state["messages"] + [
                AIMessage(content=f"Could not find a pending task matching: {task_description}\nAvailable pending tasks:\n" +
                         "\n".join(f"- {t['description']}" for t in state["tasks"] if t["status"] == "pending"))
            ]
        else:
            # Work on copies: the task dicts are shared with the caller's state.
            finished = dict(state["tasks"][match_index])
            finished["status"] = "completed"
            finished["completed_at"] = datetime.now().isoformat()
            updated_tasks = list(state["tasks"])
            updated_tasks[match_index] = finished

            new_state["tasks"] = updated_tasks
            new_state["error"] = ""
            new_state["messages"] = state["messages"] + [
                AIMessage(content=f"Task completed: {finished['description']}")
            ]

    except Exception as e:
        new_state["error"] = f"Error completing task: {str(e)}"
        new_state["messages"] = state["messages"] + [
            AIMessage(content=f"Error: {str(e)}. Please try again.")
        ]

    new_state["processed"] = True
    new_state["should_end"] = True
    return new_state

def generate_recommendations(state: TaskState) -> TaskState:
    """Generate recommendations based on current tasks.

    With no pending tasks a fixed list of suggestions is returned. Otherwise
    the pending tasks are sent to the LLM and every non-blank line of its reply
    becomes one recommendation. An LLM failure is reported through ``error``
    and an error reply, in the same way as the other action nodes, instead of
    aborting the graph run.

    Args:
        state: Current graph state.

    Returns:
        A shallow copy of ``state`` with ``recommendations`` (on success) and
        ``messages`` updated; ``processed`` and ``should_end`` are True.
    """
    new_state = state.copy()
    new_state["processed"] = True
    new_state["should_end"] = True

    pending_tasks = [t for t in state["tasks"] if t["status"] == "pending"]

    if not pending_tasks:
        new_state["recommendations"] = [
            "No pending tasks. Consider adding new tasks to track your work.",
            "Use this time to plan upcoming projects or review completed work.",
            "Set up task categories and priorities for future tasks."
        ]
        new_state["messages"] = state["messages"] + [
            AIMessage(content="No pending tasks found. This is a good time to:\n" +
                     "\n".join(f"- {r}" for r in new_state["recommendations"]))
        ]
        new_state["error"] = ""
        return new_state

    messages = [
        SystemMessage(content="""Analyze the current tasks and provide specific, actionable recommendations.
            Consider priorities, deadlines, and task relationships."""),
        # The task list is input to the model, so it is sent with the user role.
        HumanMessage(content=f"Current pending tasks: {json.dumps(pending_tasks, indent=2)}")
    ]

    try:
        response = llm.invoke(messages)
    except Exception as e:
        new_state["error"] = f"Error generating recommendations: {str(e)}"
        new_state["messages"] = state["messages"] + [
            AIMessage(content=f"Error: {str(e)}. Please try again.")
        ]
        return new_state

    recommendations = [r.strip() for r in response.content.split('\n') if r.strip()]

    new_state["recommendations"] = recommendations
    new_state["messages"] = state["messages"] + [
        AIMessage(content="Recommendations based on your current tasks:\n" +
                 "\n".join(f"- {r}" for r in recommendations))
    ]
    new_state["error"] = ""
    return new_state

def get_summary(state: TaskState) -> TaskState:
    """Build a plain-text overview of every task and append it to the conversation.

    The overview reports the total, pending and completed task counts, the
    number of pending tasks with priority 1, and, when any exist, one line per
    pending task with its priority and deadline.

    Args:
        state: Current graph state.

    Returns:
        A shallow copy of ``state`` with the summary appended to ``messages``,
        ``error`` cleared, and ``processed`` / ``should_end`` set to True.
    """
    all_tasks = state["tasks"]

    # Split the tasks by status in a single pass.
    open_tasks, done_tasks = [], []
    for item in all_tasks:
        if item["status"] == "pending":
            open_tasks.append(item)
        elif item["status"] == "completed":
            done_tasks.append(item)

    urgent_count = sum(1 for item in open_tasks if item["priority"] == 1)

    lines = [
        "Task Summary:",
        f"- Total Tasks: {len(all_tasks)}",
        f"- Pending Tasks: {len(open_tasks)}",
        f"- Completed Tasks: {len(done_tasks)}",
        f"- High Priority Pending: {urgent_count}",
    ]

    if open_tasks:
        # The leading newline yields a blank line before the section once joined.
        lines.append("\nPending Tasks:")
        for item in open_tasks:
            lines.append(
                f"- {item['description']} (Priority: {item['priority']}, Due: {item['deadline']})"
            )

    updated = state.copy()
    updated["messages"] = state["messages"] + [AIMessage(content="\n".join(lines))]
    updated["error"] = ""
    updated["processed"] = True
    updated["should_end"] = True
    return updated





In [None]:
def handle_error(state: TaskState) -> TaskState:
    """Produce the terminal state for input that could not be handled.

    Returns a shallow copy of ``state`` carrying a generic error description,
    a generic error reply appended to ``messages``, and the ``processed`` /
    ``should_end`` flags set to True.
    """
    reply = AIMessage(content="An error occurred processing your request.")

    failed = state.copy()
    failed.update(
        error="An unexpected error occurred.",
        messages=state["messages"] + [reply],
        processed=True,
        should_end=True,
    )
    return failed

In [None]:
def build_task_graph():
    """Assemble and compile the task-management state graph.

    The graph starts at the "analyze" node. From there a router picks one of
    the action nodes according to ``next_action``, or ends the run when
    ``should_end`` is set. Every action node leads back to "analyze", which
    ends the run once the state has been marked as processed.

    Returns:
        The compiled graph.
    """
    graph = StateGraph(TaskState)

    # Add nodes
    graph.add_node("analyze", analyze_input)
    graph.add_node("add_task", add_task)
    graph.add_node("complete_task", complete_task)
    graph.add_node("get_recommendations", generate_recommendations)
    graph.add_node("get_summary", get_summary)
    graph.add_node("handle_error", handle_error)

    # Router output -> node name for every action the analyzer may select.
    action_routes = {
        "add_task": "add_task",
        "complete_task": "complete_task",
        "get_recommendations": "get_recommendations",
        "get_summary": "get_summary",
        "error": "handle_error",
    }

    def route_next_step(state: TaskState) -> str:
        """Return the route key for the node that should run after "analyze"."""
        if state.get("should_end", False):
            return "end_processing"
        next_action = state.get("next_action", "")
        # A value without a route (e.g. "done" or "") would make the graph
        # fail on lookup, so send it to the error node instead.
        return next_action if next_action in action_routes else "error"

    # Adding edges from analyze to all other nodes
    graph.add_conditional_edges(
        "analyze",
        route_next_step,
        {**action_routes, "end_processing": END}
    )

    # Adding edges from each action node back to analyze for continuation
    for node_name in ("add_task", "complete_task", "get_recommendations", "get_summary", "handle_error"):
        graph.add_edge(node_name, "analyze")

    # Set entry point
    graph.set_entry_point("analyze")

    return graph.compile()



In [None]:
class TaskManagementAgent:
    """Conversational front end that keeps task state between calls to ``run``."""

    def __init__(self):
        """Compile the graph and start from an empty state."""
        self.graph = build_task_graph()
        self.state = TaskState(
            tasks=[],
            next_action="",
            messages=[],
            current_task={},
            recommendations=[],
            error="",
            processed=False,
            should_end=False,
        )

    def run(self, user_input: str) -> str:
        """Process user input and return response.

        A fresh state is built for the turn: tasks, the current task and the
        recommendations are carried over as shallow copies, the user's text is
        appended to the history, and the control fields are reset. The state
        returned by the graph becomes the agent's new state.

        Args:
            user_input: The user's request.

        Returns:
            The content of the last message in the resulting state.
        """
        previous = self.state
        history = previous["messages"] + [HumanMessage(content=user_input)]

        turn_state = TaskState(
            tasks=previous["tasks"].copy(),
            next_action="",
            messages=history,
            current_task=previous["current_task"].copy(),
            recommendations=previous["recommendations"].copy(),
            error="",
            processed=False,
            should_end=False,
        )

        self.state = self.graph.invoke(turn_state, RunnableConfig(recursion_limit=500))
        last_message = self.state["messages"][-1]
        return last_message.content


In [None]:
agent = TaskManagementAgent()

# Example conversation, sent to the agent one request at a time.
demo_requests = [
    # Add a task
    "Add a new task to review the quarterly report, high priority, due next Friday",
    "Ive got to cook some stew , high priority, due next Friday",
    "I need to feed my dogs, due next Friday",
    # Complete a task
    "Mark the quarterly report review as complete",
    # Get recommendations
    "What should I work on next?",
    # Get summary
    "Show me a summary of all tasks",
]

for request in demo_requests:
    print(agent.run(request))

Successfully added task:
- Description: Review quarterly report
- Priority: 1
- Deadline: 2024-01-05
Successfully added task:
- Description: Cook some stew
- Priority: 1
- Deadline: 2024-01-12
Successfully added task:
- Description: Feed my dogs
- Priority: 2
- Deadline: next Friday
Could not find a pending task matching: the quarterly report review as
Available pending tasks:
- Review quarterly report
- Cook some stew
- Feed my dogs
Recommendations based on your current tasks:
- Based on the current tasks listed, here are specific, actionable recommendations considering priorities, deadlines, and task relationships:
- 1. **Review Quarterly Report**
- - **Priority**: High (Priority 1)
- - **Deadline**: January 5, 2024
- - **Action**: This task should be your immediate focus due to its high priority and imminent deadline. Allocate at least 2-3 hours today to thoroughly review the report. If possible, break it down into sections and review them one at a time to manage the workload effect