In [10]:
# ATLAS : Academic Task and Learning Agent System

# ATLAS demonstrates how to build an intelligent multi-agent system that transforms the way students manage their academic life. Using LangGraph's workflow framework, we'll create a network of specialized AI agents that work together to provide personalized academic support, from automated scheduling to intelligent lecture summarization.

# Key Components

# Coordinator Agent: Orchestrates the interaction between specialized agents and manages the overall system state
# Planner Agent: Handles calendar integration and schedule optimization
# Notewriter Agent: Processes academic content and generates study materials
# Advisor Agent: Provides personalized learning and time management advice


%pip install langgraph langchain langchain-openai openai python-dotenv capture

Note: you may need to restart the kernel to use updated packages.


In [11]:
## Graph Visualization
%pip install python3-dev graphviz libgraphviz-dev pkg-config
%pip install graphviz pygraphviz

Note: you may need to restart the kernel to use updated packages.


ERROR: Could not find a version that satisfies the requirement python3-dev (from versions: none)
ERROR: No matching distribution found for python3-dev


Collecting graphvizNote: you may need to restart the kernel to use updated packages.


  error: subprocess-exited-with-error
  
  × Building wheel for pygraphviz (pyproject.toml) did not run successfully.
  │ exit code: 1
  ╰─> [48 lines of output]
      running bdist_wheel
      running build
      running build_py
      creating build\lib.win-amd64-cpython-313\pygraphviz
      copying pygraphviz\agraph.py -> build\lib.win-amd64-cpython-313\pygraphviz
      copying pygraphviz\graphviz.py -> build\lib.win-amd64-cpython-313\pygraphviz
      copying pygraphviz\scraper.py -> build\lib.win-amd64-cpython-313\pygraphviz
      copying pygraphviz\testing.py -> build\lib.win-amd64-cpython-313\pygraphviz
      copying pygraphviz\__init__.py -> build\lib.win-amd64-cpython-313\pygraphviz
      creating build\lib.win-amd64-cpython-313\pygraphviz\tests
      copying pygraphviz\tests\test_attribute_defaults.py -> build\lib.win-amd64-cpython-313\pygraphviz\tests
      copying pygraphviz\tests\test_clear.py -> build\lib.win-amd64-cpython-313\pygraphviz\tests
      copying pygraphviz\test


  Using cached graphviz-0.20.3-py3-none-any.whl.metadata (12 kB)
Collecting pygraphviz
  Using cached pygraphviz-1.14.tar.gz (106 kB)
  Installing build dependencies: started
  Installing build dependencies: finished with status 'done'
  Getting requirements to build wheel: started
  Getting requirements to build wheel: finished with status 'done'
  Preparing metadata (pyproject.toml): started
  Preparing metadata (pyproject.toml): finished with status 'done'
Using cached graphviz-0.20.3-py3-none-any.whl (47 kB)
Building wheels for collected packages: pygraphviz
  Building wheel for pygraphviz (pyproject.toml): started
  Building wheel for pygraphviz (pyproject.toml): finished with status 'error'
Failed to build pygraphviz


In [9]:
import operator
from functools import reduce
from typing import Annotated, List , Dict, TypedDict, Literal, Optional, Callable,  Set, Tuple, Any, Union,  TypeVar
from datetime import datetime, timedelta, timezone
import asyncio
from pydantic import BaseModel, Field
from operator import add
from IPython.display import Image, display
import json 
import re
import os
from openai import OpenAI, AsyncOpenAI
from langchain_core.messages import HumanMessage, SystemMessage, BaseMessage
from langchain.prompts import PromptTemplate
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langgraph.graph import StateGraph, Graph, END , START


# Pretty Markdown Output
from rich.console import Console
from rich.markdown import Markdown
from rich.panel import Panel
from rich.text import Text
from rich import box
from rich.style import Style 
from dotenv import load_dotenv


In [10]:
def configure_api_keys():
    """Load environment variables and report whether the LLM API key is set.

    Calls ``load_dotenv()`` so that a local ``.env`` file is honoured, then
    checks the ``NEMOTRON_4_340B_INSTRUCT_KEY`` environment variable. Only the
    boolean status is printed; the key value itself is never echoed.

    Returns:
        bool: True when the variable is present and non-empty, False otherwise.
    """
    load_dotenv()

    # Read the variable once and reuse it for the status check.
    api_key = os.getenv("NEMOTRON_4_340B_INSTRUCT_KEY")
    is_configured = bool(api_key)

    print(f"API Key configured: {is_configured}")
    return is_configured

# Run the key check once when the cell executes; show setup hints if it fails.
api_configured = configure_api_keys()
if not api_configured:
    print(
        "\nAPI key not found. Please ensure you have:",
        "1. Set up your API key in Google Colab secrets, or",
        "2. Created a .env file with NEMOTRON_4_340B_INSTRUCT_KEY",
        sep="\n",
    )

API Key configured: True


In [11]:
# Define the dictionary reducer used by the AcademicState class (declared in the next cell).

# Generic type variable; not referenced by any code in the cells shown here.
T = TypeVar('T')

def dict_reducer(dict1: Dict[str,Any], dict2: Dict[str,Any]) -> Dict[str,Any]:
    """
    Recursively merge ``dict2`` into a copy of ``dict1`` and return the result.

    When both sides hold a dictionary under the same key the two are merged
    recursively; in every other case the value from ``dict2`` wins. ``dict1``
    itself is left untouched (only a shallow copy is modified).

    Example:
    dict1 = {"a": {"x": 1}, "b": 2}
    dict2 = {"a": {"y": 2}, "c": 3}
    result = {"a": {"x": 1, "y": 2}, "b": 2, "c": 3}
    """
    combined = dict1.copy()
    for name, incoming in dict2.items():
        current = combined.get(name)
        # Recurse only when both sides are dictionaries; otherwise overwrite.
        nested = isinstance(current, dict) and isinstance(incoming, dict)
        combined[name] = dict_reducer(current, incoming) if nested else incoming
    return combined

In [12]:
class AcademicState(TypedDict):
    """Shared state dictionary passed between the agents of the workflow.

    Every field is declared with ``Annotated`` so that a merge function is
    attached to it: ``operator.add`` for the message list and ``dict_reducer``
    for the dictionary-valued fields.
    """
    # Conversation history (merge function: operator.add)
    messages: Annotated[List[BaseMessage], operator.add]
    # Student information
    profile: Annotated[Dict, dict_reducer]
    # Scheduled events
    calendar: Annotated[Dict, dict_reducer]
    # To-do items and assignments
    tasks: Annotated[Dict, dict_reducer]
    # Operation outputs
    results: Annotated[Dict[str, Any], dict_reducer]

In [16]:
# LLM Initialization
# Key Differences:

# 1. Concurrency Model
#   - AsyncOpenAI: Asynchronous operations using `async/await`
#   - OpenAI: Synchronous operations that block execution

# 2. Use Cases
#   - AsyncOpenAI: High throughput, non-blocking operations
#   - OpenAI: Simple sequential requests, easier debugging

class LLMConfig:
    """Static settings used when talking to the hosted LLM endpoint."""

    # --- Endpoint selection ---
    base_url: str = "https://integrate.api.nvidia.com/v1"
    model: str = "nvidia/nemotron-4-340b-instruct"

    # --- Generation defaults ---
    max_tokens: int = 1024
    default_temp: float = 0.5

class NeMoLLaMa:
    """
    Thin asynchronous wrapper around the chat-completions endpoint configured
    in ``LLMConfig`` (NVIDIA's nemotron-4-340b-instruct model).

    All requests go through an ``AsyncOpenAI`` client, so the public methods
    are coroutines and must be awaited.
    """
    def __init__(self, api_key: str):
        """Initialize NeMoLLaMa with API key.

        Args:
            api_key (str): NVIDIA API authentication key
        """
        self.config = LLMConfig()
        self.client = AsyncOpenAI(
            base_url=self.config.base_url,
            api_key=api_key
        )
        # Flipped by check_auth(); stays False until a test request succeeds.
        self.is_authenticated = False

    async def check_auth(self)->bool:
        """Verify API authentication with a minimal test request.

        Updates ``self.is_authenticated`` to reflect the outcome, so a failure
        after an earlier success does not leave a stale ``True`` behind.

        Returns:
            bool: True if the test request succeeded, False otherwise
                (the error is printed, not raised).

        Example:
            >>> is_valid = await llm.check_auth()
            >>> print(f"Authenticated: {is_valid}")
        """
        test_message = [{"role":"user","content":"test"}]
        try:
            await self.agenerate(test_message, temperature=0.1)
        except Exception as e:
            # Best-effort probe: report the problem and signal failure to the caller.
            self.is_authenticated = False
            print(f"❌ Authentication failed: {str(e)}")
            return False
        self.is_authenticated = True
        return True

    async def agenerate(self, messages: List[Dict], temperature: Optional[float]= None)-> str:
        """Generate text for a list of chat messages.

        Args:
            messages: List of message dicts with 'role' and 'content'
            temperature: Sampling temperature. ``None`` selects
                ``config.default_temp``; an explicit ``0.0`` is passed through
                unchanged.

        Returns:
            str: Content of the first choice in the completion response

        Example:
            >>> messages = [
            ...     {"role": "system", "content": "You are a helpful assistant"},
            ...     {"role": "user", "content": "Plan my study schedule"}
            ... ]
            >>> response = await llm.agenerate(messages, temperature=0.7)
        """
        # Compare against None explicitly: `temperature or default` would treat
        # a deliberate 0.0 as "not provided".
        effective_temp = self.config.default_temp if temperature is None else temperature
        completion = await self.client.chat.completions.create(
            model = self.config.model,
            messages = messages,
            temperature=effective_temp,
            max_tokens=self.config.max_tokens,
            stream = False
        )
        return completion.choices[0].message.content


In [17]:
# DataManager
# A centralized data management system for AI agents to handle multiple data sources.

# This class serves as a unified interface for accessing and managing different types of
# structured data (profiles, calendars, tasks) that an AI agent might need to process.
# It handles data loading, parsing, and provides methods for intelligent filtering and retrieval.

class DataManager:
    """In-memory store for the profile, calendar and task JSON documents.

    Data is loaded from JSON strings via ``load_data()``; the query methods
    return empty results (``None`` / ``[]``) while nothing has been loaded.
    """

    def __init__(self):
        """
        Initialize data storage containers.
        All data sources start as None until explicitly loaded through load_data().
        """
        self.profile_data = None
        self.calendar_data = None
        self.tasks_data = None

    def load_data(self, profile_json:str, calendar_json:str, task_json:str):
        """
        Parse and store the three JSON data sources.

        Args:
            profile_json (str): JSON string containing user profile information
            calendar_json (str): JSON string containing calendar events
            task_json (str): JSON string containing task/todo items

        Raises:
            json.JSONDecodeError: If any of the strings is not valid JSON. In
                that case none of the stored documents is replaced.
        """
        # Parse everything first so a failure cannot leave a half-updated store.
        profile_data = json.loads(profile_json)
        calendar_data = json.loads(calendar_json)
        tasks_data = json.loads(task_json)

        self.profile_data = profile_data
        self.calendar_data = calendar_data
        self.tasks_data = tasks_data

    def get_student_profile(self, student_id:str)-> Optional[Dict]:
        """
        Retrieve a specific student's profile using their unique identifier.

        Args:
            student_id (str): Unique identifier for the student

        Returns:
            Optional[Dict]: The first entry of ``profile_data["profiles"]``
            whose ``"id"`` equals ``student_id``; None if there is no match,
            no ``"profiles"`` list, or no data loaded.
        """
        if not self.profile_data:
            return None

        profiles = self.profile_data.get("profiles", [])
        # next() stops at the first match instead of scanning the whole list.
        return next((p for p in profiles if p.get("id") == student_id), None)

    def parse_datetime(self, dt_str:str)-> datetime:
        """
        Parse an ISO-8601 datetime string and normalize it to UTC.

        Args:
            dt_str (str): DateTime string in ISO format, with or without
                timezone; a trailing ``Z`` is accepted as UTC.

        Returns:
            datetime: Timezone-aware datetime in UTC. Strings carrying an
            offset are converted to UTC; strings without one are taken to
            already be in UTC.

        Raises:
            ValueError: If ``dt_str`` is not a valid ISO-format datetime.
        """
        # Older fromisoformat() implementations reject the 'Z' suffix.
        dt = datetime.fromisoformat(dt_str.replace('Z', '+00:00'))
        if dt.tzinfo is None:
            # astimezone() on a naive value would silently assume *local* time,
            # so tag naive values as UTC explicitly.
            return dt.replace(tzinfo=timezone.utc)
        return dt.astimezone(timezone.utc)

    def get_upcoming_events(self, days:int =7)-> List[Dict]:
        """
        Return calendar events starting between now and ``days`` days from now.

        Args:
            days (int): Number of days to look ahead (default: 7)

        Returns:
            List[Dict]: Matching events ordered by start time (earliest first).
            Events whose start time is missing or unparsable are skipped with
            a printed warning.
        """
        if not self.calendar_data:
            return []

        now = datetime.now(timezone.utc)
        future = now + timedelta(days=days)

        dated_events = []
        for event in self.calendar_data.get("events",[]):
            try:
                start_time = self.parse_datetime(event["start"]["dateTime"])
            except (KeyError, ValueError, TypeError, AttributeError) as e:
                print(f"Warning: Could not process event due to {str(e)}")
                continue
            if now <= start_time <= future:
                dated_events.append((start_time, event))

        # Sort on the parsed start time only; the sort is stable, so events
        # with identical start times keep their original relative order.
        dated_events.sort(key=lambda pair: pair[0])
        return [event for _, event in dated_events]

    def get_active_tasks(self)-> List[Dict]:
        """
        Return tasks that still need action and are due in the future.

        Returns:
            List[Dict]: Shallow copies of the matching tasks (status
            ``"needsAction"`` and due date later than now), each extended with
            a ``"due_datetime"`` key holding the parsed UTC due date. The
            stored task data is not modified. Tasks with missing or
            unparsable fields are skipped with a printed warning.
        """
        if not self.tasks_data:
            return []

        now = datetime.now(timezone.utc)
        active_tasks = []

        for task in self.tasks_data.get("tasks",[]):
            try:
                due_date = self.parse_datetime(task["due"])
                if task["status"] == "needsAction" and due_date > now:
                    # Enrich a copy so the loaded data stays JSON-serializable
                    # and repeated calls remain idempotent.
                    active_tasks.append({**task, "due_datetime": due_date})
            except (KeyError, ValueError, TypeError, AttributeError) as e:
                print(f"Warning: Could not process task due to {str(e)}")
                continue

        return active_tasks
    

In [18]:
# Module-level singletons shared by the rest of the notebook.
data_manager = DataManager()
llm = NeMoLLaMa(api_key=os.getenv("NEMOTRON_4_340B_INSTRUCT_KEY"))
# Prints the default object repr as a quick check that construction succeeded.
print(llm)

<__main__.NeMoLLaMa object at 0x0000029204A117F0>


In [19]:
# Agent Executor
# Orchestrates the concurrent execution of multiple specialized AI agents.

# This class implements a sophisticated execution pattern that allows multiple AI agents
# to work together, either sequentially or concurrently, based on a coordination analysis.
# It handles agent initialization, concurrent execution, error handling, and fallback strategies.

class AgentExecutor:
    """Runs the specialized agents selected by the coordinator analysis.

    Agents are created once in ``__init__`` and invoked as awaitable callables
    (``agent(state)``). Agents listed in the same concurrent group run
    together via ``asyncio.gather``; groups run one after another.
    """

    def __init__(self, llm):
        """
        Initialize the executor with a language model and create agent instances.

        Args:
            llm: Language model instance to be used by all agents

        Implementation Note:
            - Creates a dictionary of specialized agents, each initialized with the same LLM
            - Supports multiple agent types: PLANNER (default), NOTEWRITER, and ADVISOR
            - Agents are instantiated once and reused across executions
        """
        self.llm = llm
        self.agents = {
            "PLANNER" : PlannerAgent(llm),  # Strategic planning agent
            "NOTEWRITER" : NoteWriterAgent(llm),    # Documentation agent
            "ADVISOR" : AdvisorAgent(llm)   # Academic advice agent
        }

    # The annotation is a forward reference so this class does not need
    # AcademicState to exist at definition time.
    async def execute(self, state: "AcademicState")-> Dict:
        """
        Execute the agents requested by ``state["results"]["coordinator_analysis"]``.

        Steps:
        1. Read ``required_agents`` (default ``["PLANNER"]``) and
           ``concurrent_groups`` (default ``[]``) from the analysis.
        2. For each group, start every agent that is both required and
           registered in ``self.agents`` and await them together.
        3. Store each successful result under the lower-cased agent name;
           agents that raised are reported and left out.
        4. If nothing succeeded, run the PLANNER agent as a fallback.

        Args:
            state (AcademicState): Current academic state containing analysis results

        Returns:
            Dict: ``{"results": {"agent_outputs": {...}}}``. If anything outside
            the individual agent calls fails, a fixed emergency planner
            message is returned instead of raising.
        """
        try:
            # Extract coordination analysis from state
            analysis = state["results"].get("coordinator_analysis", {})

            # Determine execution requirements
            required_agents = analysis.get("required_agents", ["PLANNER"])  # PLANNER as default
            concurrent_groups = analysis.get("concurrent_groups", [])  # Groups for parallel execution

            # Initialize results container
            results = {}

            # Groups run sequentially; members of one group run concurrently.
            for group in concurrent_groups:
                # Keep names and coroutines in lock-step so that results are
                # attributed to the agent that actually produced them, even
                # when some members of the group are filtered out.
                scheduled_names = [
                    agent_name for agent_name in group
                    if agent_name in required_agents and agent_name in self.agents
                ]
                if not scheduled_names:
                    continue

                tasks = [self.agents[agent_name](state) for agent_name in scheduled_names]
                # return_exceptions=True: one failing agent must not cancel the others.
                group_results = await asyncio.gather(*tasks, return_exceptions=True)

                for agent_name, result in zip(scheduled_names, group_results):
                    if isinstance(result, BaseException):
                        print(f"Warning: agent {agent_name} failed: {result}")
                        continue
                    results[agent_name.lower()] = result

            # Implement fallback strategy if no results were obtained
            if not results and "PLANNER" in self.agents:
                planner_result = await self.agents["PLANNER"](state)
                results["planner"] = planner_result

            print("agent_outputs", results)
            # Return structured results
            return {
                "results": {
                    "agent_outputs": results
                }
            }

        except Exception as e:
            print(f"Execution error: {e}")
            # Emergency fallback with minimal response
            return {
                "results": {
                    "agent_outputs": {
                        "planner": {
                            "plan": "Emergency fallback plan: Please try again or contact support."
                        }
                    }
                }
            }

           

In [None]:
# Agent Action and Output Models
# Defines the structure for agent actions and outputs using Pydantic models. These models ensure type safety and validation for agent operations.

class AgentAction(BaseModel):
    """
    Pydantic model describing a single action an agent has decided to take.

    Fields:
        action: Name of the action to perform (e.g. "search_calendar", "analyze_tasks").
        thought: The reasoning that led to choosing this action.
        tool: Name of the tool to use for the action; omitted when not needed.
        action_input: Parameters to pass to the action; omitted when not needed.

    Example:
        >>> action = AgentAction(
        ...     action="search_calendar",
        ...     thought="Need to check schedule conflicts",
        ...     tool="calendar_search",
        ...     action_input={"date_range": "next_week"}
        ... )
    """
    # Required fields
    action: str
    thought: str
    # Optional fields, both defaulting to None
    tool: Optional[str] = Field(default=None)
    action_input: Optional[Dict] = Field(default=None)

class AgentOutput(BaseModel):
    """
    Pydantic model describing what came back from executing an agent action.

    Fields:
        observation: Text describing the result of the action.
        output: Structured data produced by the action.

    Example:
        >>> output = AgentOutput(
        ...     observation="Found 3 free time slots next week",
        ...     output={
        ...         "free_slots": ["Mon 2PM", "Wed 10AM", "Fri 3PM"],
        ...         "conflicts": []
        ...     }
        ... )
    """
    # Both fields are required (no defaults).
    observation: str
    output: Dict

In [13]:
# ReACT agent
# What actually is ReACT?

# ReACT (Reasoning and Acting) is a framework that combines reasoning and acting in an iterative process. It enables LLMs to approach complex tasks by breaking them down into:

# (Re)act: Take an action based on observations and tools
# (Re)ason: Think about what to do next
# (Re)flect: Learn from the outcome
# Example Flow:

# Thought: Need to check student's schedule for study time
# Action: search_calendar
# Observation: Found 2 free hours tomorrow morning
# Thought: Student prefers morning study, this is optimal
# Action: analyze_tasks
# Observation: Has 3 pending assignments
# Plan: Schedule morning study session for highest priority task

class ReActAgent:
    """
    Base class for ReACT-based agents implementing reasoning and action capabilities.

    Holds the language model, a list of few-shot examples, and a ``tools``
    dictionary mapping tool names to the coroutine methods defined below.
    Every tool takes the current state dictionary as its only argument.
    """
    def __init__(self, llm):
        """
        Initialize the ReActAgent with language model and available tools

        Args:
            llm: Language model instance for agent operations
        """
        self.llm = llm
        # Storage for few-shot examples to guide the agent
        self.few_shot_examples = []

        # Dictionary of available tools with their corresponding methods
        self.tools = {
            "search_calendar": self.search_calendar,      # Calendar search functionality
            "analyze_tasks": self.analyze_tasks,          # Task analysis functionality
            "check_learning_style": self.check_learning_style,  # Learning style assessment
            "check_performance": self.check_performance   # Academic performance checking
        }

    @staticmethod
    def _to_utc(dt_str: str) -> datetime:
        """Parse an ISO-format string into a timezone-aware UTC datetime.

        A trailing ``Z`` is accepted as UTC; strings without any offset are
        taken to be UTC so they can be compared with aware datetimes.
        """
        parsed = datetime.fromisoformat(dt_str.replace("Z", "+00:00"))
        if parsed.tzinfo is None:
            return parsed.replace(tzinfo=timezone.utc)
        return parsed.astimezone(timezone.utc)

    async def search_calendar(self, state: "AcademicState") -> List[Dict]:
        """
        Search for upcoming calendar events

        Args:
            state (AcademicState): Current academic state

        Returns:
            List[Dict]: Events from ``state["calendar"]["events"]`` whose
            ``start.dateTime`` lies strictly after the current UTC time, in
            their original order. Events with a missing or unparsable start
            time are skipped with a printed warning.
        """
        # Get events from calendar or empty list if none exist
        events = (state.get("calendar") or {}).get("events", [])
        # Get current time in UTC
        now = datetime.now(timezone.utc)

        upcoming = []
        for event in events:
            try:
                # Normalize to aware UTC: comparing a naive datetime with the
                # aware `now` would raise TypeError.
                starts_at = self._to_utc(event["start"]["dateTime"])
            except (KeyError, ValueError, TypeError, AttributeError) as e:
                print(f"Warning: Could not process event due to {str(e)}")
                continue
            if starts_at > now:
                upcoming.append(event)
        return upcoming

    async def analyze_tasks(self, state: "AcademicState") -> List[Dict]:
        """
        Analyze academic tasks from the current state

        Args:
            state (AcademicState): Current academic state

        Returns:
            List[Dict]: The list stored under ``state["tasks"]["tasks"]``,
            or an empty list if it is absent
        """
        # Return tasks or empty list if none exist
        return (state.get("tasks") or {}).get("tasks", [])

    async def check_learning_style(self, state: "AcademicState") -> "AcademicState":
        """
        Retrieve student's learning style and study patterns

        Args:
            state (AcademicState): Current academic state

        Returns:
            AcademicState: The same state object, modified in place so that
            ``state["results"]["learning_analysis"]`` holds the ``style`` and
            ``patterns`` entries read from the profile's
            ``learning_preferences`` (empty dicts when absent)
        """
        # Get user profile from state
        profile = state.get("profile") or {}
        preferences = profile.get("learning_preferences", {})

        # Get learning preferences
        learning_data = {
            "style": preferences.get("learning_style", {}),
            "patterns": preferences.get("study_patterns", {})
        }

        # Add to results in state, creating the container on first use
        if "results" not in state:
            state["results"] = {}
        state["results"]["learning_analysis"] = learning_data

        return state

    async def check_performance(self, state: "AcademicState") -> "AcademicState":
        """
        Check current academic performance across courses

        Args:
            state (AcademicState): Current academic state

        Returns:
            AcademicState: The same state object, modified in place so that
            ``state["results"]["performance_analysis"]`` is
            ``{"courses": [...]}`` with the profile's
            ``academic_info.current_courses`` (empty list when absent)
        """
        # Get user profile from state
        profile = state.get("profile") or {}

        # Get course information
        courses = profile.get("academic_info", {}).get("current_courses", [])

        # Add to results in state, creating the container on first use
        if "results" not in state:
            state["results"] = {}
        state["results"]["performance_analysis"] = {"courses": courses}

        return state

In [None]:
# Coordinator Agent

