# UNIVERSITY OF SAN DIEGO - MS AAI
## Natural Language Processing and Generative AI
### Final Project - Team 1: Multi-Agent Financial Analysis System.
#### By Manikandan Perumal & Israel Romero Olvera
#### The purpose of this final project is to build a real-world financial analysis system powered by agentic AI that can reason, plan, and act based on the user's prompt. It coordinates multiple specialized LLM agents to handle complex financial tasks end-to-end.
#### Our agentic AI system was developed in a folder structure that can be found in our GitHub repository: https://github.com/isralennon/AAI_520_Group_1/tree/main
#### For delivery purposes we've condensed all the code into this document, structured the following way:
#### 1. Tools - this section contains the code in file /modules/tools.py which will perform basic RAG connections.
#### 2. Parser - this section contains the code in file /modules/parser.py, which provides basic functionality to parse data in JSON format.
#### 3. Memory - this section contains the code in file /modules/memory.py that handles the storage of ongoing knowledge, to provide a robust and efficient functionality.
#### 4. Agents - this section contains the code in file /modules/subagents.py, designed to host the definitions of the main Agent class as well as our specialized subagents - the team of agents available to the main orchestrator. We developed the following team of agents:
#### - Orchestrator - the "Manager" of the Agents
#### - News Researcher - the specialist in finding financial news, using FinnHub.
#### - Market Researcher - the specialist in finding hard financial data such as market trends, stock prices, etc.
#### - Writer - the specialist of taking all the information and preparing a polished answer for the user
#### 5. Main Orchestrator Agent - this section contains the code in file /modules/agent.py and has the definition for the orchestrator agent, which develops the strategy and coordinates all subagents.
#### 6. Main - this section contains the code in our main notebook, /main.ipynb - our implementation file where we execute all the above.
### 1. TOOLS.
#### One of the four agent functions we'll implement is the usage of tools, which will be defined in this first section.

In [60]:
import dotenv
import os
#import modules.tools as tools
import yfinance as yf
import requests
import finnhub
from typing import Callable
from datetime import datetime, timedelta
from google import genai
import openai
# For privacy reasons, we'll store our token keys on a .env file, which we'll load here:
dotenv.load_dotenv(dotenv_path=".env")



# First, we'll define a generic Tool class, which will serve as a structure for all of our tools
class Tool:
    """Common wrapper that gives every tool a name, a description and a callable.

    Agents read the description (and the optional ``api`` argument sample) to
    decide when and how to use the tool.
    """

    def __init__(self, name, function, description, api=None):
        """Store the tool metadata and the callable that does the actual work.

        Args:
            name: Human-readable tool name.
            function: Callable executed by ``invoke``.
            description: Text the agents use to understand what the tool does.
            api: Optional sample of the arguments the tool expects.
        """
        self.name, self.description = name, description
        self.function = function
        self.api = api

    def to_dict(self):
        """Return the agent-facing summary of the tool (the callable is left out)."""
        return {key: getattr(self, key) for key in ("name", "description", "api")}

    def invoke(self, **kwargs):
        """Announce the call, then run the wrapped function with ``kwargs``."""
        print(f"Invoking {self.name} with arguments {kwargs}")
        result = self.function(**kwargs)
        return result

# Next, we'll declare each individual tool as a class, inheriting from the generic class Tool above
class YahooFinance(Tool):
    """Tool that pulls the latest quote for a ticker symbol (e.g. AAPL) via yfinance."""

    def __init__(self):
        super().__init__(
            name="Yahoo Finance Stock Quote",  # Name of the tool
            function=self.get_stock_quote_yahoo,  # The method below is this tool's callable
            description="Get the latest stock quote for a given symbol from Yahoo Finance.",  # Shown to the agents
            # Argument sample for the agents. The original had a doubled quote
            # (`""symbol"`), which made the sample invalid JSON.
            api="""{ "symbol": "AAPL"}""",
        )

    def get_stock_quote_yahoo(self, symbol: str, step: str = '') -> dict:
        """Return the basic price fields for ``symbol``.

        Args:
            symbol: Ticker symbol to look up.
            step: Accepted but not used by this method.

        Returns:
            A dict with ``symbol``, ``last_price``, ``day_high``, ``day_low`` and
            ``previous_close``; an empty dict if the lookup fails for any reason.
        """
        try:
            # `fast_info` exposes the basic price fields read below.
            info = yf.Ticker(symbol).fast_info
            return {
                "symbol": symbol,
                "last_price": info["lastPrice"],
                "day_high": info["dayHigh"],
                "day_low": info["dayLow"],
                "previous_close": info["previousClose"],
            }
        except Exception as e:
            # Best-effort tool: report the problem and hand back an empty result.
            print(f"Yahoo Finance API error: {e}")
            return {}
#Now, we'll continue with the class that calls Financial Modeling Prep API
class FMP(Tool):
    """Base tool for Financial Modeling Prep (FMP) endpoints.

    Subclasses normally pass only ``name``, ``description``, ``api`` and
    ``endPoint``; the default callable is ``execute``.
    """

    def __init__(self,name:str,function:Callable=None,description:str=None,api:str=None,endPoint:str=None):
        """Configure the tool.

        Args:
            name: Tool name shown to the agents.
            function: Optional callable overriding ``execute``.
            description: Tool description shown to the agents.
            api: Argument sample shown to the agents.
            endPoint: Endpoint URL; when omitted it is read from the
                ``FMP_Endpoint`` environment variable.
        """
        super().__init__(
            name=name,
            function=self.execute if function is None else function,
            description=description,
            api=api,
        )
        self.endpoint = endPoint if endPoint is not None else os.getenv("FMP_Endpoint")
        self.apikey = os.getenv("FMP_API_KEY")  # API key comes from the environment (.env file)

    def execute(self, symbol: str) -> dict:
        """Call the configured endpoint for ``symbol`` and return the decoded JSON.

        Returns an empty dict when the request fails, times out, or the body
        is not valid JSON.
        """
        params = {
            "symbol": symbol,
            "apikey": self.apikey,
            "exchange": "NASDAQ",
        }
        try:
            # A timeout keeps a stalled connection from blocking the agent forever.
            response = requests.get(self.endpoint, params=params, timeout=30)
            return response.json()
        except (requests.exceptions.RequestException, ValueError) as e:
            # ValueError covers a non-JSON body on requests versions where the
            # decode error is not a RequestException.
            print(f'FMP API error: {e}')
            return {}
        
class StockQuote(FMP):
    """FMP tool bound to the `quote` endpoint."""

    def __init__(self):
        super().__init__(
            # NOTE(review): "Stack Quote" looks like a typo for "Stock Quote"; the value is
            # left untouched here in case other code matches tools by name - confirm and fix.
            name="Stack Quote",
            # The description previously named a non-existent "Stack Quote" provider.
            description="Get the latest stock quote for a given symbol from Financial Modeling Prep (FMP).",
            api="""{ "symbol": "AAPL"}""",  # Argument sample for the agents
            endPoint='https://financialmodelingprep.com/stable/quote'  # Hard-coded endpoint (not read from .env)
        )

        
class StockPriceChange(FMP):
    """FMP tool bound to the `stock-price-change` endpoint."""

    def __init__(self):
        super().__init__(
            name="Stock Price Change",  # Name of the tool
            description="Get the stock price change for a given symbol over the past.",  # Shown to the agents
            # Argument sample for the agents. It used to advertise a "days" argument,
            # but FMP.execute only accepts `symbol`, so an agent following the sample
            # triggered a TypeError when the tool was invoked.
            api="""{ "symbol": "AAPL"}""",
            endPoint='https://financialmodelingprep.com/stable/stock-price-change'  # Hard-coded endpoint (not read from .env)
        )
        
class IncomeStatement(FMP):
    """FMP tool bound to the `income-statement` endpoint."""

    def __init__(self):
        # Everything the base class needs, gathered in one place. The endpoint is
        # hard-coded here rather than read from the .env file.
        settings = {
            "name": "Income Statement",
            "description": "Get the income statement for a given symbol from Financial Modeling Prep (FMP).",
            "api": """{ "symbol": "AAPL"}""",
            "endPoint": 'https://financialmodelingprep.com/stable/income-statement',
        }
        super().__init__(**settings)
  
class FinancialScore(FMP):
    """FMP tool bound to the `financial-scores` endpoint."""

    def __init__(self):
        # Everything the base class needs, gathered in one place. The endpoint is
        # hard-coded here rather than read from the .env file.
        settings = {
            "name": "Financial Score",
            "description": "Get the financial score for a given symbol from Financial Modeling Prep (FMP).",
            "api": """{ "symbol": "AAPL"}""",
            "endPoint": 'https://financialmodelingprep.com/stable/financial-scores',
        }
        super().__init__(**settings)
   
        
#We'll be using FinnHub as our News provider next
class FinancialNews(Tool):
    """Tool that fetches the most recent company news for a symbol from FinnHub."""

    def __init__(self):
        super().__init__(
            name="FinnHub News",  # Name of the tool
            function=self.get_stock_quote_finnhub,  # The method below is this tool's callable
            description="Get the latest financial news for a given symbol from FinnHub.",  # Shown to the agents
            # Argument sample for the agents. The original had a doubled quote
            # (`""symbol"`), which made the sample invalid JSON.
            api="""{ "symbol": "AAPL"}"""
        )

    def get_stock_quote_finnhub(self, symbol: str, step: str='') -> dict:
        """Return up to five of the newest news items from the last seven days.

        Despite its name, this method returns news, not a quote.

        Args:
            symbol: Ticker symbol to look up.
            step: Accepted but not used by this method.

        Returns:
            ``{"symbol": ..., "news": [...]}`` where each item has ``headline``,
            ``summary`` and a formatted ``datetime``; ``{"message": ...}`` when
            no news came back; an empty dict if anything fails.
        """
        finn_client = finnhub.Client(api_key=os.getenv("FINNHUB_API_KEY"))  # Key comes from the .env file

        # One-week window ending today; a single `today` keeps both ends consistent.
        today = datetime.today()
        end_date = today.strftime("%Y-%m-%d")
        start_date = (today - timedelta(days=7)).strftime("%Y-%m-%d")

        try:
            news = finn_client.company_news(symbol, _from=start_date, to=end_date)
            if not news:
                return {"message": f"No news found for symbol {symbol} from {start_date} to {end_date}."}

            # Newest first, capped at five items.
            top_news = sorted(news, key=lambda item: item['datetime'], reverse=True)[:5]
            return {
                "symbol": symbol,
                "news": [
                    {
                        "headline": item.get("headline"),
                        "summary": item.get("summary"),
                        "datetime": datetime.fromtimestamp(item.get("datetime")).strftime("%Y-%m-%d %H:%M:%S"),
                    }
                    for item in top_news
                ],
            }
        except Exception as e:
            # Best-effort tool: report the problem and hand back an empty result.
            print(f'Finnhub.io API error: {e}')
            return {}

class RecommendationTrends(Tool):
    """Tool that fetches analyst recommendation trends for a symbol from FinnHub."""

    def __init__(self):
        super().__init__(
            name="FinnHub Recommendation Trends",  # Name of the tool
            function=self.get_recommendation_trends,  # The method below is this tool's callable
            description="Get the recommendation trends for a given symbol from FinnHub.",  # Shown to the agents
            # Argument sample for the agents. The original had a doubled quote
            # (`""symbol"`), which made the sample invalid JSON.
            api="""{ "symbol": "AAPL"}"""
        )

    def get_recommendation_trends(self, symbol: str) -> dict:
        """Return FinnHub's recommendation trends for ``symbol``.

        The client's response is passed through untouched
        (NOTE(review): it may be a list rather than a dict - confirm against the
        FinnHub client). An empty dict is returned if the call fails.
        """
        finn_client = finnhub.Client(api_key=os.getenv("FINNHUB_API_KEY"))  # Key comes from the .env file
        try:
            return finn_client.recommendation_trends(symbol)
        except Exception as e:
            # Best-effort tool: report the problem and hand back an empty result.
            print(f'Finnhub.io API error: {e}')
            return {}
        
class EarningSurprise(Tool):
    """Tool that fetches recent earnings surprises for a symbol from FinnHub."""

    def __init__(self):
        super().__init__(
            name="FinnHub Earning Surprise",  # Name of the tool
            function=self.get_earning_surprise,  # The method below is this tool's callable
            description="Get the earning surprise for a given symbol from FinnHub.",  # Shown to the agents
            # Argument sample for the agents. The original had a doubled quote
            # (`""symbol"`), which made the sample invalid JSON.
            api="""{ "symbol": "AAPL"}"""
        )

    def get_earning_surprise(self, symbol: str) -> dict:
        """Return FinnHub's company earnings for ``symbol``, requested with ``limit=5``.

        The client's response is passed through untouched
        (NOTE(review): it may be a list rather than a dict - confirm against the
        FinnHub client). An empty dict is returned if the call fails.
        """
        finn_client = finnhub.Client(api_key=os.getenv("FINNHUB_API_KEY"))  # Key comes from the .env file
        try:
            return finn_client.company_earnings(symbol, limit=5)
        except Exception as e:
            # Best-effort tool: report the problem and hand back an empty result.
            print(f'Finnhub.io API error: {e}')
            return {}

In [49]:
# Quick manual check: fetch Apple's recent FinnHub headlines and display them.
news = FinancialNews().get_stock_quote_finnhub("AAPL")
print(news)

{'symbol': 'AAPL', 'news': [{'headline': "September Readers ID'd 17 Ideal 'Safer' Dividends In 39 Dogs", 'summary': 'Discover the top ReFa/Ro dividend stocks for September 2025 with high yields and analyst-backed returns. Click for the picks.', 'datetime': '2025-10-17 10:54:16'}, {'headline': 'Apple Commits New Investments in Both China and the U.S.', 'summary': 'CEO Tim Cook balances expansion between two key markets amid global supply chain competition.', 'datetime': '2025-10-17 10:06:14'}, {'headline': 'Sell Tesla Stock, Analyst Says. Why Shares Are Rising.', 'summary': 'Sell Tesla Stock, Analyst Says. Why Shares Are Rising.', 'datetime': '2025-10-17 09:21:00'}, {'headline': 'Apple Secures Formula One’s U.S. Streaming Rights', 'summary': 'Apple  has signed a five-year deal with Formula One for the U.S. rights to air its races, broadening its menu of live sporting events it will offer on its streaming platform.  Apple TV will begin airing races in 2026.  F1 TV Premium, F1’s own strea

In [45]:
datetime

datetime.datetime

### 2. PARSER
#### One of the workflow patterns our agents will follow is routing, meaning our main agent will coordinate with subagents. To accomplish this communication, we need a "common language", which in this case will be JSON wrapped in XML-style tags (for example `<InvokeTool>{"symbol": "AAPL"}</InvokeTool>`). This section defines the classes that implement this parsing functionality.

In [29]:
# We'll define first a Parser abstract class
class Parser:
    """Base parser: subclasses turn an agent response into an action dictionary."""

    def parse(self, response):
        """Default behaviour: ignore ``response`` and report a parameter-less final answer."""
        default_action = {"action": "FinalAnswer"}
        default_action["parameters"] = {}
        return default_action
# Next, we'll define an XML parser, which inherits from our abstract class Parser.    
class XmlParser(Parser):
    """Parser that extracts XML-style tags from an agent response.

    It looks for spans such as
    ``<InvokeTool>{"symbol": "AAPL", "step": "financials"}</InvokeTool>`` or
    ``<FinalAnswer>answer</FinalAnswer>`` and turns them into action
    dictionaries, for example::

        {
            "action": "InvokeTool",
            "parameters": {"symbol": "AAPL", "step": "financials"}
        }
    """

    # <Tag>content</Tag>, where the closing name must equal the opening one.
    _TAG_PATTERN = r'<(\w+)>(.*?)</\1>'

    def _find_tags(self, response):
        """Return every ``(tag, content)`` pair found in ``response``, in order."""
        import re
        # DOTALL lets the content span several lines (pretty-printed JSON or a
        # multi-paragraph answer); without it such tags were silently ignored.
        return re.findall(self._TAG_PATTERN, response, flags=re.DOTALL)

    @staticmethod
    def _to_action(action, content):
        """Build the action dictionary for one tag, decoding its content as JSON."""
        import json
        content = content.strip()
        try:
            parameters = json.loads(content)
        except (ValueError, RecursionError):
            # Not JSON: pass the raw text along and flag the problem.
            return {
                "action": action,
                "parameters": {"content": content},
                "error": "Content is not valid JSON",
            }
        return {"action": action, "parameters": parameters}

    def parse(self, response):
        """Return the action described by the first tag in ``response``.

        When no tag is present, a parameter-less ``FinalAnswer`` action is returned.
        """
        matches = self._find_tags(response)
        if matches:
            return self._to_action(*matches[0])
        return {"action": "FinalAnswer", "parameters": {}}

    def  parse_all(self, response):
        """Return one action dictionary per tag found in ``response``.

        When no tag is present, the list holds a single parameter-less
        ``FinalAnswer`` action.
        """
        results = [self._to_action(action, content) for action, content in self._find_tags(response)]
        if not results:
            results.append({"action": "FinalAnswer", "parameters": {}})
        return results

    def parseTags(self, response):
        '''Agent response parser to extract all TAGS.
            Returns a dictionary with lower-cased tag names as keys and the stripped
            tag contents as values. If a tag appears more than once, the last one wins.
        '''
        return {tag.lower(): value.strip() for tag, value in self._find_tags(response)}

### 3. MEMORY.
#### Another feature of our agent is learning: the agent must remember information from earlier prompts so it can refine its answers and become more knowledgeable the more it is used. The class that performs such learning is defined in this section.

In [None]:
import os
import pickle
# We're creating a class called MemorySystem with all the learning functionality
class MemorySystem:
    """Stores insights and lessons from previous analyses to improve future runs.

    Two dictionaries keyed by symbol are kept: ``stock_insights`` and
    ``news_insights``. Each value is a list of entries carrying the payload and
    an ISO-format ``timestamp``. Every addition is written to ``memory_file``
    immediately.
    """

    def __init__(self, memory_file='agent_memory.pkl'):
        """Start with empty memory, then load ``memory_file`` if it exists."""
        self.memory_file = memory_file
        self.stock_insights = {}
        self.news_insights = {}
        self.load_memory()

    def load_memory(self):
        """Load previously saved insights; any failure leaves the memory as it was."""
        try:
            if os.path.exists(self.memory_file):
                with open(self.memory_file, 'rb') as f:
                    # SECURITY: unpickling can execute arbitrary code. Only point
                    # `memory_file` at files written by this class.
                    memory_data = pickle.load(f)
                self.stock_insights = memory_data.get('stock_insights', {})
                self.news_insights = memory_data.get('news_insights', {})
            else:
                print("No memory file found. Starting with empty memory.")
        except Exception as e:
            # A corrupt or unreadable file must not stop the agent from starting.
            print(f"Error loading memory: {e}")
            print("Starting with empty memory.")

    def save_memory(self):
        """Write both insight dictionaries to ``memory_file``; errors are printed, not raised."""
        try:
            memory_data = {
                'stock_insights': self.stock_insights,
                'news_insights': self.news_insights,
            }
            with open(self.memory_file, 'wb') as f:
                pickle.dump(memory_data, f)
            print("Memory saved successfully.")
        except Exception as e:
            print(f"Error saving memory: {e}")

    def _record(self, store, symbol, field, value, timestamp):
        """Append ``{field: value, 'timestamp': ...}`` under ``symbol`` and save right away."""
        if timestamp is None:
            timestamp = datetime.now().isoformat()  # Default to the current time
        store.setdefault(symbol, []).append({field: value, 'timestamp': timestamp})
        self.save_memory()

    @staticmethod
    def _recent(entries, max_age_days):
        """Keep the entries whose age in whole days does not exceed ``max_age_days``."""
        now = datetime.now()
        return [
            entry for entry in entries
            if (now - datetime.fromisoformat(entry['timestamp'])).days <= max_age_days
        ]

    def add_stock_insight(self, symbol, insight, timestamp=None):
        """Remember a stock insight for ``symbol``.

        ``timestamp`` is an ISO-format string; the current time is used when omitted.
        """
        self._record(self.stock_insights, symbol, 'insight', insight, timestamp)

    def add_market_news(self,symbol, news_item, timestamp=None):
        """Remember a market news item for ``symbol``.

        ``timestamp`` is an ISO-format string; the current time is used when omitted.
        """
        self._record(self.news_insights, symbol, 'news_item', news_item, timestamp)

    def get_stock_insights(self, symbol):
        """Return the stock insights for ``symbol`` that are at most 7 days old."""
        results = self.stock_insights.get(symbol, [])
        if not results:
            print(f"No insights found for symbol {symbol}.")
            return []
        return self._recent(results, 7)

    def get_news_insights(self, symbol):
        """Return the news insights for ``symbol`` that are at most 2 days old."""
        results = self.news_insights.get(symbol, [])
        if not results:
            print(f"No news insights found for symbol {symbol}.")
            return []
        return self._recent(results, 2)

### 4. AGENTS
#### For our routing workflow, along with communication also comes specialization and tool usage: a team of agents that will collaborate, coordinated by the main orchestrator agent. That's what we'll define in this section.

In [51]:
#First, we'll initialize the Google GenAI and OpenAI
from google import genai
import openai
#Make sure to load the environmental variables
dotenv.load_dotenv(dotenv_path=".env")

import nltk
import numpy as np
import pandas as pd
import yfinance as yf
from nltk.sentiment import SentimentIntensityAnalyzer
import nltk
from nltk.corpus import stopwords
from nltk.tokenize import word_tokenize, sent_tokenize


# Downloading necessary libraries and functionality - uncomment when needed.
#nltk.download('vader_lexicon')
#nltk.download('punkt')
#nltk.download('stopwords')

class Agent:
    """Base class for all agents (the orchestrator and its specialised sub-agents).

    An agent bundles an LLM client, a system prompt, an optional tool list, an
    optional memory system and a short rolling conversation history.
    """

    def __init__(self, name, role, system_prompt, model, generate_response, agents=None, tools=None, memory_system=None, parser=None):
        """Store the configuration and create the LLM client for ``model``.

        Args:
            name: Name of the agent.
            role: Short description of the agent's role.
            system_prompt: System prompt sent with every LLM call.
            model: Model identifier; names containing "gpt" use OpenAI and names
                containing "gemini" use Google GenAI.
            generate_response: Callable stored on the instance and used to produce answers.
            agents: Optional collection of collaborating agents.
            tools: Optional list of tools available to the agent.
            memory_system: Optional memory object; without one the agent starts with no knowledge.
            parser: Optional parser for the agent's responses.
        """
        self.name = name
        self.model = model
        self.role = role
        self.system_prompt = system_prompt
        self.memory_system = memory_system
        self.parser = parser
        # Stored on the instance, so `self.generate_response(...)` resolves to this callable.
        self.generate_response = generate_response
        self.agents = agents
        self.tools = tools
        self.conversation_history = []  # Rolling history, oldest first
        self.max_history_length = 10  # Maximum number of remembered messages

        self.prompt_template = (
            "You are {agent_name}, an AI agent. Use the following tools as needed:\n"
            "{tools}\n"
            "Conversation history:\n"
            "{history}\n"
            "Current input: {input}\n"
            "Respond appropriately."
        )
        self.initialize_client()

    def initialize_client(self):
        """Create the client that matches ``self.model`` so several LLM vendors are supported."""
        model_name = self.model.lower()
        if "gpt" in model_name:
            self.client = openai.OpenAI(api_key=os.getenv("OPENAI_API_KEY"))
        elif "gemini" in model_name:
            self.client = genai.Client()
        else:
            # Unknown vendor: keep the attribute defined so later access fails clearly.
            self.client = None

    def to_dict(self):
        """Return a summary dictionary with the keys ``name``, ``description`` and ``api``.

        The base class defines neither ``description`` nor ``api`` (reading them
        used to raise AttributeError), so the role is used as the description and
        ``api`` is None unless a subclass sets those attributes.
        """
        return {
            "name": self.name,
            "description": getattr(self, "description", self.role),
            "api": getattr(self, "api", None),
        }

    def register_tool(self, tool):
        """Add ``tool`` to the agent's tool list, creating the list on first use."""
        if self.tools is None:
            self.tools = []
        self.tools.append(tool)

    def remember(self, message):
        """Append ``message`` to the history, dropping the oldest entry when over the limit."""
        self.conversation_history.append(message)
        if len(self.conversation_history) > self.max_history_length:
            self.conversation_history.pop(0)

    def call_llm(self, input_prompt):
        """Send ``input_prompt`` to the configured model and return the text of the reply.

        Any failure (API error, unsupported model) is printed and a mock response
        string is returned instead, so callers always get a string back.
        """
        try:
            model_name = self.model.lower()
            if "gpt" in model_name:
                response = self.client.chat.completions.create(
                    model=self.model,
                    messages=[
                        {"role": "system", "content": self.system_prompt},
                        {"role": "user", "content": input_prompt}
                    ],
                    max_tokens=300,
                    temperature=0.7
                )
                return response.choices[0].message.content
            if "gemini" in model_name:
                # System prompt and input are combined into one text prompt.
                prompt = self.system_prompt
                prompt += "\n Orchestrator:" + input_prompt
                response = self.client.models.generate_content(
                    model=self.model, contents=prompt
                )
                return response.text
            raise ValueError(f"Unsupported model '{self.model}'")
        except Exception as e:
            print(f" API failed for {self.name} using model '{self.model}': {e}")
            return f"Mock response from {self.name} with model '{self.model}': {input_prompt[:50]}..."

    def generate_response(self, **kwargs):
        """Log the call and delegate to the callable supplied at construction time.

        The instance attribute set in ``__init__`` shadows this method, so it only
        runs when called through the class. The explicit lookup prevents unbounded
        recursion when no callable was stored on the instance.
        """
        print(f"Invoking {self.name} generative response function with arguments {kwargs}")
        handler = vars(self).get("generate_response")
        if handler is None:
            raise NotImplementedError(f"{self.name} has no generate_response callable configured")
        return handler(**kwargs)

#### Next, let's define some Sub-agents that will inherit from the class above:

In [52]:
class Writer(Agent):
    """Agent that turns the other agents' findings into a polished report.

    It takes the results of other agents (such as news or market research) and
    writes the professional report that the Orchestrator returns to the user as
    the final response.
    """

    def __init__ (self, model="gpt-3.5-turbo", agents=None, memory_system=None):
        """Create the writer.

        Args:
            model: LLM to use ("gpt-3.5-turbo" by default).
            agents: Optional collaborating agents; now forwarded to the base class
                (the argument used to be accepted and silently dropped).
            memory_system: Optional memory object.
        """
        super().__init__(
            name="Writer",  # Name of this agent
            role="financial content writer",  # Role of this agent
            # NOTE(review): the fragments below are concatenated without separators, so
            # consecutive guidelines run together in the final prompt - confirm this is intended.
            system_prompt=(
                "You are Writer, an AI agent part of an agents team. Your role is a professional "
                "financial report writer, capable of taking financial news "
                "or financial information provided by the Orchestrator and "
                "preparing a 2–3 paragraph report that provides a clear final "
                "answer to the user."
                "Here are some guidelines for you:"
                "Start your answers giving a positive message like 'Excellent question', 'Great question', or similar."
                "Focus on answering the user's question."
                "When recommendations are requested, only provide guidance and highlight pros and cons."
                "The news or financial information you're receiving came from other agents in the team, so never refer to it as 'the data provided'."
            ),
            model = model,
            generate_response = self.generate_response,
            agents = agents,
            memory_system = memory_system
        )

    def generate_response(self, input_prompt):
        """Return the LLM's report for ``input_prompt``."""
        result = self.call_llm(input_prompt)
        return result

In [53]:
# Choose the LLM behind the Writer: "gpt-3.5-turbo" (OpenAI) or "gemini-2.5-flash" (Google GenAI).
MyWriter = Writer(model="gemini-2.5-flash")

# Simulated hand-off from the Orchestrator: the user's question, then the news, then the prices.
sample_prompt = "".join([
    "Orchestrator: The user wants to know if, based on the latest news and stock prices it is a good time to buy Apple stock. ",
    "Here are the latest news: Apple is deeply involved in AI, planning smart glasses and integrating AI into its products. The iPhone 17 is seeing strong demand, though some analysts are cautious about future models' expectations. Apple faces legal scrutiny regarding chip royalties and data collection.",
    "Here are stock prices for the past 2 days: The stock price on 10/17/2025 was **$254.04**. The stock price on 10/18/2025 is **$255.74**",
])

MyWriter.generate_response(sample_prompt)

"Excellent question! Evaluating whether it's a good time to buy a stock like Apple involves weighing multiple factors, including recent developments and market sentiment.\n\nApple is showing strong engagement in the burgeoning field of Artificial Intelligence, with plans for smart glasses and integrating AI capabilities across its product lines, which could be a significant long-term growth driver. Furthermore, demand for the upcoming iPhone 17 is reportedly robust. The stock has also seen a modest positive movement over the past two days, rising from $254.04 to $255.74, indicating some positive short-term momentum. These aspects suggest a company actively innovating and maintaining strong consumer interest in its core products.\n\nHowever, potential investors should also consider some cautionary points. While current iPhone demand is strong, some analysts express caution regarding expectations for future models. Additionally, Apple is currently facing legal scrutiny concerning chip ro

In [61]:
class MarketResearchAgent(Agent):
    """Agent that answers stock questions from tool-provided market data.

    Flow of ``processUserInput``: extract entities from the user text with the
    LLM, build (or reuse from memory) a market summary for the detected
    symbol, then ask the LLM for a short answer grounded in that summary.
    """

    def __init__(self,model="gemini-2.5-flash"):
        """Create the agent with its own ``MemorySystem``.

        Args:
            model: Model name; names containing "gpt" or "gemini" are
                supported by ``generate_response``.
        """
        name="Market Research Agent"
        role="Market Research Agent specialized in financial data analysis and market trends"
        system_prompt="""You are a Market Research Agent specialized in financial data analysis and market trends.
         Your role is to assist users by providing accurate and up-to-date financial information, stock quotes, market trends, and insights based on the latest data available from various financial APIs and tools.

         Based on the data retrieved from the tools at your disposal, provide comprehensive answers to user queries related to stock performance, market analysis, and financial news.
        
        """
        # Set before super().__init__ so the attribute exists regardless of
        # what the base class does with the ``memory_system`` argument.
        self.memory_system=MemorySystem()
        super().__init__(name=name,system_prompt=system_prompt,model=model,generate_response=self.generate_response,role=role,agents=None,tools=None,memory_system=self.memory_system,parser=None)

    def generate_response(self, **kwargs):
        """Send ``prompt`` to the configured LLM and return its text.

        Args:
            **kwargs: Only ``prompt`` is read. It may be a plain string or,
                for GPT models, an already-built list of chat messages.

        Returns:
            The model's text, or a "Mock response ..." placeholder string when
            the call fails or the model family is not supported.
        """
        # print(f"Invoking {self.name} generative response function with arguments {kwargs}")
        input_prompt=kwargs.get("prompt",[])
        try:
            model_name = self.model.lower()
            #For GPT models
            if "gpt" in model_name:
                # The chat completions endpoint expects a list of role/content
                # messages, so a bare string is wrapped as one user message.
                messages = input_prompt
                if isinstance(input_prompt, str):
                    messages = [{"role": "user", "content": input_prompt}]
                response = self.client.chat.completions.create(
                    model=self.model,
                    messages=messages,
                    max_tokens=300,
                    temperature=0.7
                )
                return response.choices[0].message.content
            #For Gemini models
            if "gemini" in model_name:
                response = self.client.models.generate_content(
                    model=self.model, contents=str(input_prompt)
                )
                return response.text
            # Neither family matched: report it through the same fallback path
            # instead of failing on an unassigned result variable.
            raise ValueError(f"Unsupported model '{self.model}'")
        except Exception as e:
            # Deliberate best-effort: callers always get a string back.
            print(f" API failed for {self.name} using model '{self.model}': {e}")
            return f"Mock response from {self.name} with model '{self.model}': {str(input_prompt)[:50]}..."

    def getMarketSummary(self,symbol:str  ) -> str:
        """Return a market summary for ``symbol``, using memory as a cache.

        If the memory system already holds stock insights for the symbol, the
        most recent one is returned. Otherwise every data tool is invoked, the
        tool outputs are appended to the prompt, the LLM writes the summary,
        and the summary is stored in memory before being returned.
        """
        insights = self.memory_system.get_stock_insights(symbol)
        if insights:
            print(f"Using cached insight for symbol {symbol}.")
            return insights[-1]['insight']

        prompt=f"""Provide a comprehensive market summary for the stock symbol: {symbol}. 
                Include recent performance, key financial metrics, and any notable news or trends affecting the stock.
                Use data from Yahoo Finance, Financial Modeling Prep, and FinnHub to inform your summary.
                Format the response in a clear and concise manner suitable for a financial report."""
        tools_list=[FinancialScore(),IncomeStatement(),StockQuote(),StockPriceChange()]
        for tool in tools_list:
            tool_response=tool.invoke(symbol=symbol)
            prompt+=f"\nData from {tool.name}: {tool_response}"
        response=self.generate_response(prompt=prompt)
        self.memory_system.add_stock_insight(symbol, response,timestamp=datetime.now().isoformat())
        return response

    def  processUserInput(self, user_input: str) -> str:
        """Answer a user query using the market summary of the detected symbol.

        When no symbol can be extracted, the prompt is still sent, with a
        placeholder in place of the summary, so the model can apply the
        "unrelated query" rule instead of this method raising.
        """
        tags=self.getEntities(user_input=user_input)
        symbol=tags.get("symbol") if tags else None
        if symbol:
            marketSummary=self.getMarketSummary(symbol=symbol)
        else:
            marketSummary="(no market summary available: no stock symbol was identified in the user input)"
        # NOTE: no trailing comma after the closing quotes - a comma here
        # would turn the prompt into a one-element tuple.
        prompt=f"""Based on the {marketSummary} Analyze the following user input
                and provide a short answer for the user query.
                Rules:
                - If the user input is related to stock performance, provide insights based on the market summary.
                - If the user input is unrelated to financial markets, respond with "I'm sorry, I can only assist with financial market-related queries."
                - Keep the response concise and relevant to the user's query.
                - Use a professional and informative tone suitable for financial discussions.
                - Limit the response to 150 words.

                User Input: "{user_input}"


                Answer:
                """
        response=self.generate_response(prompt=prompt)
        return response

    def getEntities(self, user_input: str):
        """Ask the LLM to tag entities in ``user_input`` and parse the tags.

        Returns:
            Whatever ``XmlParser.parseTags`` produces for the model output;
            callers in this class use it as a mapping and read its "symbol"
            entry (presumably lower-cased tag names - confirm in XmlParser).
        """
        prompt=f"""Determine entities the following user input related to financial markets and stock analysis:
                if the input contains Apple Inc, return SYMBOL as AAPL
                if the input contains Microsoft Corporation, return SYMBOL as MSFT
                User Input: "{user_input}"
                Extracted Entities:
                    <SYMBOL>...</SYMBOL>
                    <EXCHANGE>...</EXCHANGE><INDUSTRY>...</INDUSTRY>  """
        response=self.generate_response(prompt=prompt)
        parser=XmlParser()
        parsed_response=parser.parseTags(response)
        return parsed_response

In [62]:
class MarketSentimentAgent(Agent):
    """Agent that answers sentiment questions from tool-provided news data.

    Flow of ``processUserInput``: extract entities from the user text with the
    LLM, build (or reuse from memory) a news summary for the detected symbol,
    then ask the LLM for a short answer grounded in that summary.
    """

    def __init__(self,model="gemini-2.5-flash"):
        """Create the agent with its own ``MemorySystem``.

        Args:
            model: Model name; names containing "gpt" or "gemini" are
                supported by ``generate_response``.
        """
        name="Market Sentiment Agent"
        role="Market Sentiment Agent specialized in financial news sentiment analysis"
        system_prompt="""You are a Market Sentiment Agent specialized in financial news sentiment analysis.
         Your role is to assist users by analyzing the sentiment of financial news articles and providing insights based on the emotional tone of the content.

         Based on the news data retrieved from FinnHub, provide comprehensive sentiment analysis to help users understand market mood and potential impacts on stock performance.
        
        """
        # Set before super().__init__ so the attribute exists regardless of
        # what the base class does with the ``memory_system`` argument.
        self.memory_system=MemorySystem()
        super().__init__(name=name,system_prompt=system_prompt,model=model,generate_response=self.generate_response,role=role,agents=None,tools=None,memory_system=self.memory_system,parser=None)

    def generate_response(self, **kwargs):
        """Send ``prompt`` to the configured LLM and return its text.

        Args:
            **kwargs: Only ``prompt`` is read. It may be a plain string or,
                for GPT models, an already-built list of chat messages.

        Returns:
            The model's text, or a "Mock response ..." placeholder string when
            the call fails or the model family is not supported.
        """
        # print(f"Invoking {self.name} generative response function with arguments {kwargs}")
        input_prompt=kwargs.get("prompt",[])
        try:
            model_name = self.model.lower()
            #For GPT models
            if "gpt" in model_name:
                # The chat completions endpoint expects a list of role/content
                # messages, so a bare string is wrapped as one user message.
                messages = input_prompt
                if isinstance(input_prompt, str):
                    messages = [{"role": "user", "content": input_prompt}]
                response = self.client.chat.completions.create(
                    model=self.model,
                    messages=messages,
                    max_tokens=300,
                    temperature=0.7
                )
                return response.choices[0].message.content
            #For Gemini models
            if "gemini" in model_name:
                response = self.client.models.generate_content(
                    model=self.model, contents=str(input_prompt)
                )
                return response.text
            # Neither family matched: report it through the same fallback path
            # instead of failing on an unassigned result variable.
            raise ValueError(f"Unsupported model '{self.model}'")
        except Exception as e:
            # Deliberate best-effort: callers always get a string back.
            print(f" API failed for {self.name} using model '{self.model}': {e}")
            return f"Mock response from {self.name} with model '{self.model}': {str(input_prompt)[:50]}..."

    def getNewsSummary(self,symbol:str  ) -> str:
        """Return a news summary for ``symbol``, using memory as a cache.

        If the memory system already holds news insights for the symbol, the
        most recent one is returned. Otherwise every news tool is invoked, the
        tool outputs are appended to the prompt, the LLM writes the summary,
        and the summary is stored via ``add_market_news`` before being returned.
        """
        insights = self.memory_system.get_news_insights(symbol)
        if insights:
            print(f"Using cached insight for symbol {symbol}.")
            return insights[-1]['insight']

        prompt=f"""Provide a comprehensive news summary for the stock symbol: {symbol}.
                    Include recent news articles, key events, and any notable trends affecting the stock.
                    Use data from FinnHub and other news sources to inform your summary.
                    Format the response in a clear and concise manner suitable for a financial report."""
        tools_list=[FinancialNews(),RecommendationTrends(),EarningSurprise()]
        for tool in tools_list:
            tool_response=tool.invoke(symbol=symbol)
            prompt+=f"\nData from {tool.name}: {tool_response}"
        response=self.generate_response(prompt=prompt)
        self.memory_system.add_market_news(symbol, response,timestamp=datetime.now().isoformat())
        return response

    def  processUserInput(self, user_input: str) -> str:
        """Answer a user query using the news summary of the detected symbol.

        When no symbol can be extracted, the prompt is still sent, with a
        placeholder in place of the summary, so the model can apply the
        "unrelated query" rule instead of this method raising.
        """
        tags=self.getEntities(user_input=user_input)
        symbol=tags.get("symbol") if tags else None
        if symbol:
            newsSummary=self.getNewsSummary(symbol=symbol)
        else:
            newsSummary="(no news summary available: no stock symbol was identified in the user input)"
        # NOTE: no trailing comma after the closing quotes - a comma here
        # would turn the prompt into a one-element tuple.
        prompt=f"""Based on the {newsSummary} Analyze the following user input
                and provide a short answer for the user query.
                Rules:
                - If the user input is related to financial news sentiment, provide insights based on the news summary.
                - If the user input is unrelated to financial markets, respond with "I'm sorry, I can only assist with financial market-related queries."
                - Keep the response concise and relevant to the user's query.
                - Use a professional and informative tone suitable for financial discussions.
                - Limit the response to 150 words.

                User Input: "{user_input}"


                Answer:
                """
        response=self.generate_response(prompt=prompt)
        return response

    def getEntities(self, user_input: str):
        """Ask the LLM to tag entities in ``user_input`` and parse the tags.

        Returns:
            Whatever ``XmlParser.parseTags`` produces for the model output;
            callers in this class use it as a mapping and read its "symbol"
            entry (presumably lower-cased tag names - confirm in XmlParser).
        """
        prompt=f"""Determine entities the following user input related to financial markets and stock analysis:
                if the input contains Apple Inc, return SYMBOL as AAPL
                if the input contains Microsoft Corporation, return SYMBOL as MSFT
                User Input: "{user_input}"
                Extracted Entities:
                    <SYMBOL>...</SYMBOL>
                    <EXCHANGE>...</EXCHANGE><INDUSTRY>...</INDUSTRY>  """
        response=self.generate_response(prompt=prompt)
        parser=XmlParser()
        parsed_response=parser.parseTags(response)
        return parsed_response

In [64]:
# Demo cell: run the sentiment agent end-to-end on a Tesla question.
# processUserInput extracts the symbol, builds or reuses the news summary,
# and returns the model's short answer, which is printed below.
newsAgent=MarketSentimentAgent(model="gemini-2.5-flash")
response=newsAgent.processUserInput("What is the sentiment around Tesla's stock based on the latest news?")
print(response)


Memory loaded with 1 stock insights, 0 industry insights, and 0 general lessons.
No news insights found for symbol TSLA.
Invoking FinnHub News with arguments {'symbol': 'TSLA'}
Invoking FinnHub Recommendation Trends with arguments {'symbol': 'TSLA'}
Invoking FinnHub Earning Surprise with arguments {'symbol': 'TSLA'}
Memory saved successfully.
Based on the latest news, the sentiment around Tesla's stock is largely cautious, leaning towards negative. Analysts view TSLA as being at a "make-or-break point," with a recent rating downgrade and concerns about a "front-loading effect" in Q3 deliveries.

The company has missed EPS estimates for three consecutive quarters, placing immense pressure on the upcoming Q3 2025 earnings report. Broader challenges include the persistently high cost of EVs, impacting adoption and market growth, along with macroeconomic headwinds. While analyst ratings are mixed, there's a notable number of 'Hold' ratings and a marginal increase in 'Sell' recommendations,

In [17]:
# Demo cell: run the market research agent with its default model
# ("gemini-2.5-flash"). As the last expression of the cell, the returned
# answer is displayed by the notebook.
marketAgent = MarketResearchAgent()
marketAgent.processUserInput("What is the best price to buy Tesla stock right now?")

You are a Market Research Agent specialized in financial data analysis and market trends.
         Your role is to assist users by providing accurate and up-to-date financial information, stock quotes, market trends, and insights based on the latest data available from various financial APIs and tools.

         Based on the data retrieved from the tools at your disposal, provide comprehensive answers to user queries related to stock performance, market analysis, and financial news.
        
        
Memory loaded with 1 stock insights, 0 industry insights, and 0 general lessons.
Using cached insight for symbol TSLA.


'The current price of Tesla (TSLA) stock is $439.31. According to the market summary, its P/E ratio, based on FY2024 estimated EPS, is 215.35x, reflecting a premium valuation driven by high investor expectations for future growth. The stock is currently trading above its 50-day ($387.45) and 200-day ($334.72) moving averages, indicating strong recent momentum. While long-term performance has been exceptional, recent financial trends show margin compression. The "best" price to buy depends on your individual investment strategy, risk tolerance, and outlook on the company\'s ability to manage costs and drive future growth.'

In [None]:
# Interactive chat loop driving the model through an action protocol.
# Each turn: (optionally) read user text, send the whole running transcript to
# the model, parse the actions out of the reply, then handle approval requests,
# tool invocations and final answers. The transcript is kept in `prompt`.
prompt = systemPrompt
userText =""
# One-shot flag: when True the next iteration skips the user prompt so the
# model can react to the tool result that was just appended.
continueFlag = False
exitFlag = ""
debug = False
# Both exit paths also `break` below; the condition just states the intent.
while userText.lower() != "exit" and exitFlag.lower() != "y":
    if continueFlag:
        # Consume the flag so the user is asked again on the following turn;
        # otherwise the loop would keep calling the model without ever
        # returning control to the user after the first tool call.
        continueFlag = False
    else:
        userText = input("User: ")
        if userText.lower() == "exit":
            break
        prompt += "\n User:"+ userText
    if debug:
        print('Prompt: ')
        print(prompt)
    response = client.models.generate_content(
        model="gemini-2.5-flash", contents=prompt
    )
    if debug:
        print('Model response: ')
        print(response.text)
    prompt = prompt + response.text
    # Newlines are flattened before parsing the reply into actions.
    actions = parser.parse_all(response.text.replace('\n', ' '))
    if debug:
        print('Actions: ')
        print(actions)
    if not actions:
        continue
    # Index actions by name (a later action with the same name wins).
    actions = { action['action']: action for action in actions if action.get("action","") != "Final Answer" }
    if "NeedApproval" in actions:
        actions.pop("NeedApproval")
        if "InvokeTool" in actions:
            #result=input("Need to Call " + actions["InvokeTool"].get("name","") + " Y/y to continue...)")
            result=input("Need to Call " + actions["InvokeTool"]['parameters']['name'] + ". Type Y/y to continue...)")
        else:
            result=input("Need User Approval Y/y to continue...)")
        if result.lower() != "y":
            print("Exiting...")
            break
        actions["NeedApproval"] = {"action":"NeedApproval", "content":"User approved to continue."}
        prompt += "\n User:"+ actions["NeedApproval"].get("content","")
    if 'InvokeTool' in actions:
        action = actions["InvokeTool"]
        tools_name = action["parameters"]["name"]
        # The tool arguments arrive as a JSON string.
        tool_params = action["parameters"]["api"]
        result = executionMap[tools_name].invoke(**json.loads(tool_params))
        actions["Tool result"] = {"action":"Tool result", "content":result}
        prompt += "\n User:"+ str(result)
        # Let the model see the tool result before asking the user again.
        continueFlag = True
    if "FinalAnswer" in actions:
        content = actions["FinalAnswer"]['parameters']['content']
        print("Final Answer: "+ str(content))
        exitFlag = input("Do you want to exit? Type Y/y to exit...")
        if exitFlag.lower() == "y":
            print('Thanks for chatting! Goodbye!')
            break
            

In [None]:
class ResearchPlanner(Agent):
    """Agent that plans the research steps for a given stock symbol."""

    def __init__(self, memory_system):
        """Create the planner.

        Args:
            memory_system: Memory store shared with the rest of the system.
        """
        # Kept on the instance (as the sibling agents do) and also passed to
        # the base class as a keyword argument; an attribute assignment is not
        # valid inside a call's argument list.
        self.memory_system = memory_system
        # NOTE(review): other Agent subclasses in this file also pass
        # generate_response/model/role - confirm which arguments Agent requires.
        # NOTE(review): the system prompt text looks truncated - confirm wording.
        super().__init__(
            name="ResearchPlanner", #Name of the Agent class
            system_prompt = "You are a helpful ",
            memory_system = memory_system
        )

    def available_research_steps(self, symbol, context=None):
        """Generate a list of research steps for the given stock symbol.

        Args:
            symbol: Stock symbol the plan is built for.
            context: Currently unused; accepted for interface compatibility.

        Returns:
            A list of dicts with keys 'step', 'description', 'tools' and
            'api'. 'api' is a JSON string such as
            '{"symbol": "AAPL", "step": "info"}' that can be decoded with
            ``json.loads``.
        """
        import json  # local import keeps this block self-contained

        # (step title, description, tools, step key used in the 'api' payload)
        catalogue = [
            ('Company Overview',
             f'Gather basic information about {symbol} including company description, sector, and industry',
             ['Yahoo Finance API'],
             'info'),
            ('Financial Analysis',
             f'Analyze the financial statements of {symbol} including income statement, balance sheet, and cash flow',
             ['Yahoo Finance API', 'SEC EDGAR'],
             'financials'),
            ('Stock Performance',
             f'Analyze historical stock performance of {symbol} and compare with market benchmarks',
             ['Yahoo Finance API'],
             'performance'),
            ('News Sentiment',
             f'Collect and analyze recent news articles about {symbol} to assess sentiment',
             ['News API', 'Sentiment Analysis'],
             'news_sentiment'),
            ('Market Context',
             'Analyze broader market trends and economic indicators',
             ['FRED API', 'Yahoo Finance API'],
             'market_context'),
            ('Competitive Analysis',
             f'Identify and analyze key competitors of {symbol}',
             ['Yahoo Finance API', 'News API'],
             'competitors'),
            ('Risk Assessment',
             f'Identify and evaluate potential risks for {symbol}',
             ['SEC EDGAR', 'News API', 'Financial Analysis'],
             'risk_assessment'),
            ('Investment Recommendation',
             f'Synthesize findings into an investment recommendation for {symbol}',
             ['Analysis Integration'],
             'investment_recommendation'),
            ('Earnings Analysis',
             f'Analyze upcoming earnings announcement for {symbol} and historical earnings patterns',
             ['Yahoo Finance API', 'Earnings Calendar', 'SEC EDGAR'],
             'earnings_analysis'),
            ('Industry Analysis',
             f'Deep dive into the industry trends and outlook for the sector of {symbol}',
             ['Industry Reports', 'FRED API', 'News API'],
             'industry_analysis'),
            ('Previous Insights Review',
             f'Review previous research insights about {symbol}',
             ['Memory System'],
             'previous_insights'),
        ]

        # json.dumps interpolates and quotes the symbol, producing valid JSON
        # (the previous literal contained the bare word `symbol`).
        plan = [
            {
                'step': title,
                'description': description,
                'tools': tools,
                'api': json.dumps({"symbol": symbol, "step": step_key}),
            }
            for title, description, tools, step_key in catalogue
        ]

        # Customize the plan based on context and previous insights
        return plan

class DataAcquisition:
    """Interfaces with external APIs and datasets to collect data.

    Stock data comes from yfinance (``yf``); news and economic indicators are
    simulated. The yfinance-backed methods print the error and return ``None``
    on failure instead of raising.
    """

    def __init__(self, api_keys=None):
        """Store the optional mapping of API keys (empty dict when omitted)."""
        self.api_keys = api_keys or {}

    def get_stock_info(self, symbol):
        """Get basic information about a stock.

        Returns:
            A dict with name, sector, industry, country, exchange, website,
            employees and description ('N/A' for missing fields), or ``None``
            if the lookup fails.
        """
        try:
            info = yf.Ticker(symbol).info
            # Output key -> key in the yfinance info mapping.
            fields = {
                'name': 'longName',
                'sector': 'sector',
                'industry': 'industry',
                'country': 'country',
                'exchange': 'exchange',
                'website': 'website',
                'employees': 'fullTimeEmployees',
                'description': 'longBusinessSummary',
            }
            return {key: info.get(source, 'N/A') for key, source in fields.items()}
        except Exception as e:
            print(f"Error getting stock info for {symbol}: {e}")
            return None

    def get_financial_statements(self, symbol, statement_type='income', period='annual'):
        """Get financial statements for a stock.

        Args:
            symbol: Stock symbol
            statement_type: 'income', 'balance', or 'cash'
            period: 'annual' or 'quarterly' (anything other than 'annual'
                selects the quarterly statement)

        Returns:
            The statement object exposed by yfinance, or ``None`` for an
            invalid ``statement_type`` or on error.
        """
        try:
            stock = yf.Ticker(symbol)

            if statement_type == 'income':
                return stock.income_stmt if period == 'annual' else stock.quarterly_income_stmt
            elif statement_type == 'balance':
                return stock.balance_sheet if period == 'annual' else stock.quarterly_balance_sheet
            elif statement_type == 'cash':
                return stock.cashflow if period == 'annual' else stock.quarterly_cashflow
            else:
                print(f"Invalid statement type: {statement_type}")
                return None
        except Exception as e:
            print(f"Error getting {statement_type} statement for {symbol}: {e}")
            return None

    def get_stock_price_history(self, symbol, period='1y', interval='1d'):
        """Get historical stock prices.

        Args:
            symbol: Stock symbol
            period: '1d', '5d', '1mo', '3mo', '6mo', '1y', '2y', '5y', '10y', 'ytd', 'max'
            interval: '1m', '2m', '5m', '15m', '30m', '60m', '90m', '1h', '1d', '5d', '1wk', '1mo', '3mo'

        Returns:
            The history returned by yfinance, or ``None`` on error.
        """
        try:
            return yf.Ticker(symbol).history(period=period, interval=interval)
        except Exception as e:
            print(f"Error getting price history for {symbol}: {e}")
            return None

    def get_news(self, query, num_articles=10):
        """Get news articles based on a query.

        This is a mock implementation (no real news API is called): it
        fabricates ``num_articles`` articles whose publication dates cycle
        over the last seven days.
        """
        # Imported locally under an alias so this works whether the file-level
        # name `datetime` is bound to the module or to the class.
        import datetime as _dt

        now = _dt.datetime.now()
        return [
            {
                'title': f"News about {query} - Article {i}",
                'description': f"This is a simulated news article about {query}.",
                'source': f"Source {i % 5 + 1}",
                'url': f"https://example.com/news/{i}",
                'publishedAt': (now - _dt.timedelta(days=i % 7)).isoformat()
            }
            for i in range(num_articles)
        ]

    def get_economic_indicators(self, indicators=None):
        """Get economic indicators from FRED.

        This is a mock implementation (no real FRED call): each known
        indicator gets 12 synthetic points spaced 30 days apart.

        Args:
            indicators: Indicator names; defaults to GDP, UNRATE, CPIAUCSL
                and FEDFUNDS.

        Returns:
            Dict mapping each requested indicator to a DataFrame with 'date'
            and 'value' columns sorted by date. Indicators without a
            simulation formula map to an empty DataFrame with those columns.
        """
        import datetime as _dt  # see get_news for why this is local

        indicators = indicators or ['GDP', 'UNRATE', 'CPIAUCSL', 'FEDFUNDS']

        # Synthetic value for point i (i = number of 30-day steps back).
        formulas = {
            'GDP': lambda i: 23000 * (1 + 0.01)**i,
            'UNRATE': lambda i: 3.5 + 0.1 * np.sin(i),
            'CPIAUCSL': lambda i: 300 + 3 * i + 0.5 * np.sin(i),
            'FEDFUNDS': lambda i: 4.5 + 0.25 * np.sin(i),
        }

        now = _dt.datetime.now()
        dates = [now - _dt.timedelta(days=30*i) for i in range(12)]

        mock_data = {}
        for indicator in indicators:
            formula = formulas.get(indicator)
            if formula is None:
                # 12 dates with no values cannot form a DataFrame; return an
                # empty, correctly-shaped frame instead of raising.
                print(f"Unknown economic indicator: {indicator}")
                df = pd.DataFrame({'date': [], 'value': []})
            else:
                df = pd.DataFrame({
                    'date': dates,
                    'value': [formula(i) for i in range(12)]
                })
            mock_data[indicator] = df.sort_values('date')

        return mock_data
class FinancialAnalyzer:
    """Analyzes financial data for a stock.

    Statements are fetched through the injected ``data_acquisition`` object;
    market data (``marketCap``) is read from ``yf.Ticker(symbol).info``.
    Every public method prints the error and returns ``None`` on failure.
    """

    # Rating label for each integer score 0..5.
    _RATINGS = ('Very Poor', 'Poor', 'Fair', 'Good', 'Very Good', 'Excellent')

    def __init__(self, data_acquisition):
        """Keep the object used to fetch financial statements."""
        self.data_acquisition = data_acquisition

    @staticmethod
    def _find_row(statement, *names):
        """Return the row of ``statement`` labelled with one of ``names``.

        An exact label match wins; otherwise labels are compared ignoring
        spaces and case, so 'TotalRevenue' also finds 'Total Revenue'.
        Returns ``None`` when no candidate is present.
        """
        for name in names:
            if name in statement.index:
                return statement.loc[name]
        normalized = {}
        for label in statement.index:
            normalized.setdefault(str(label).replace(' ', '').lower(), label)
        for name in names:
            label = normalized.get(name.replace(' ', '').lower())
            if label is not None:
                return statement.loc[label]
        return None

    @staticmethod
    def _score(value, thresholds, lower_is_better=False):
        """Map ``value`` to a 0-5 score using five descending-score cut-offs.

        ``thresholds`` lists the cut-offs for scores 5, 4, 3, 2, 1 in that
        order; the comparison is strict. Values matching no cut-off (NaN
        included) score 0.
        """
        for points, bound in zip(range(5, 0, -1), thresholds):
            passed = value < bound if lower_is_better else value > bound
            if passed:
                return points
        return 0

    def calculate_financial_ratios(self, symbol):
        """Calculate key financial ratios for a stock.

        Returns:
            Dict of the ratios that could be computed from the available
            rows. Profitability, liquidity and leverage ratios are row-wise
            divisions of statement rows (one value per statement column);
            valuation ratios are scalars based on ``marketCap`` and the first
            statement column (presumably the most recent period - confirm).
            ``None`` if a statement is unavailable or an error occurs.
        """
        try:
            # Get financial statements
            income_stmt = self.data_acquisition.get_financial_statements(symbol, 'income', 'annual')
            balance_sheet = self.data_acquisition.get_financial_statements(symbol, 'balance', 'annual')
            cash_flow = self.data_acquisition.get_financial_statements(symbol, 'cash', 'annual')

            if income_stmt is None or balance_sheet is None or cash_flow is None:
                return None

            # Get stock info for market data
            info = yf.Ticker(symbol).info

            revenue = self._find_row(income_stmt, 'TotalRevenue')
            net_income = self._find_row(income_stmt, 'NetIncome')
            total_assets = self._find_row(balance_sheet, 'TotalAssets')
            equity = self._find_row(balance_sheet, 'StockholdersEquity')
            current_assets = self._find_row(balance_sheet, 'CurrentAssets')
            current_liabilities = self._find_row(balance_sheet, 'CurrentLiabilities')
            # Some providers label total liabilities with a longer name.
            total_liabilities = self._find_row(
                balance_sheet, 'TotalLiabilities', 'TotalLiabilitiesNetMinorityInterest'
            )

            ratios = {}

            # Profitability ratios
            if revenue is not None and net_income is not None:
                ratios['profit_margin'] = net_income / revenue
            if total_assets is not None and net_income is not None:
                ratios['return_on_assets'] = net_income / total_assets
            if equity is not None and net_income is not None:
                ratios['return_on_equity'] = net_income / equity

            # Liquidity ratios
            if current_assets is not None and current_liabilities is not None:
                ratios['current_ratio'] = current_assets / current_liabilities

            # Leverage ratios
            if total_assets is not None and total_liabilities is not None:
                ratios['debt_to_assets'] = total_liabilities / total_assets
            if equity is not None and total_liabilities is not None:
                ratios['debt_to_equity'] = total_liabilities / equity

            # Valuation ratios
            if 'marketCap' in info:
                market_cap = info['marketCap']
                if net_income is not None:
                    ratios['pe_ratio'] = market_cap / net_income.iloc[0]
                if revenue is not None:
                    ratios['price_to_sales'] = market_cap / revenue.iloc[0]
                if total_assets is not None and total_liabilities is not None:
                    book_value = total_assets.iloc[0] - total_liabilities.iloc[0]
                    ratios['price_to_book'] = market_cap / book_value

            return ratios
        except Exception as e:
            print(f"Error calculating financial ratios for {symbol}: {e}")
            return None

    def analyze_growth_trends(self, symbol, num_years=5):
        """Analyze growth trends in revenue, earnings, etc.

        For each of TotalRevenue, GrossProfit, OperatingIncome and NetIncome
        found in the annual income statement:

        * ``"<metric>_growth"``: list of percentage changes between each
          value and the next one in the row (treated as the previous year,
          i.e. columns are assumed newest-first - confirm).
        * ``"<metric>_cagr_<num_years>yr"``: compound annual growth in
          percent over ``num_years - 1`` intervals, only when at least
          ``num_years`` values exist and ``num_years >= 2``.

        Returns:
            Dict of growth figures, or ``None`` if the statement is
            unavailable or an error occurs.
        """
        try:
            # Get financial statements
            income_stmt = self.data_acquisition.get_financial_statements(symbol, 'income', 'annual')

            if income_stmt is None:
                return None

            growth_metrics = ['TotalRevenue', 'GrossProfit', 'OperatingIncome', 'NetIncome']
            growth_rates = {}
            # Number of yearly intervals spanned by the CAGR window.
            intervals = num_years - 1

            for metric in growth_metrics:
                values = self._find_row(income_stmt, metric)
                if values is None or len(values) < 2:
                    continue

                # Year-over-year growth rates
                growth_rates[f"{metric}_growth"] = [
                    (values.iloc[i] / values.iloc[i+1] - 1) * 100
                    for i in range(len(values)-1)
                ]

                # Compound annual growth rate (CAGR); needs at least one
                # interval, otherwise the exponent 1/intervals is undefined.
                if intervals >= 1 and len(values) >= num_years:
                    start_value = values.iloc[intervals]
                    end_value = values.iloc[0]
                    cagr = (end_value / start_value) ** (1 / intervals) - 1
                    growth_rates[f"{metric}_cagr_{num_years}yr"] = cagr * 100

            return growth_rates
        except Exception as e:
            print(f"Error analyzing growth trends for {symbol}: {e}")
            return None

    def evaluate_financial_health(self, symbol):
        """Evaluate overall financial health of a company.

        Scores profitability (profit margin), liquidity (current ratio),
        leverage (debt to equity, lower is better) and growth (5-year revenue
        CAGR) from 0 to 5 each; a category whose input is missing scores 0.
        The overall score is the mean of the four.

        Returns:
            Dict with 'profitability', 'liquidity', 'leverage', 'growth' and
            'overall' entries, each holding 'score' and 'rating'; ``None`` if
            ratios or growth data are unavailable or an error occurs.
        """
        try:
            ratios = self.calculate_financial_ratios(symbol)
            growth_rates = self.analyze_growth_trends(symbol)

            if ratios is None or growth_rates is None:
                return None

            profitability_score = 0
            if 'profit_margin' in ratios:
                profitability_score = self._score(
                    ratios['profit_margin'].iloc[0], (0.2, 0.15, 0.1, 0.05, 0)
                )

            liquidity_score = 0
            if 'current_ratio' in ratios:
                liquidity_score = self._score(
                    ratios['current_ratio'].iloc[0], (3, 2, 1.5, 1, 0.5)
                )

            leverage_score = 0
            if 'debt_to_equity' in ratios:
                leverage_score = self._score(
                    ratios['debt_to_equity'].iloc[0], (0.3, 0.5, 1, 1.5, 2),
                    lower_is_better=True,
                )

            growth_score = 0
            if 'TotalRevenue_cagr_5yr' in growth_rates:
                growth_score = self._score(
                    growth_rates['TotalRevenue_cagr_5yr'], (20, 15, 10, 5, 0)
                )

            overall_score = (profitability_score + liquidity_score + leverage_score + growth_score) / 4

            category_scores = {
                'profitability': profitability_score,
                'liquidity': liquidity_score,
                'leverage': leverage_score,
                'growth': growth_score,
            }
            evaluation = {
                category: {'score': score, 'rating': self._RATINGS[score]}
                for category, score in category_scores.items()
            }
            evaluation['overall'] = {
                'score': overall_score,
                # The mean is fractional; truncate to pick a label.
                'rating': self._RATINGS[min(5, int(overall_score))]
            }

            return evaluation
        except Exception as e:
            print(f"Error evaluating financial health for {symbol}: {e}")
            return None
        
class NewsSentimentAnalyzer:
    """Analyzes news sentiment for a stock.

    The workflow is ingest -> preprocess -> classify -> extract -> summarize;
    ``analyze_news_sentiment`` runs every stage for one symbol, and each
    stage is also usable on its own.
    """

    def __init__(self, data_acquisition):
        """Store the news provider and create the sentiment scorer.

        Args:
            data_acquisition: Object exposing
                ``get_news(symbol, num_articles=...)``.
        """
        self.data_acquisition = data_acquisition
        # Scorer whose ``polarity_scores(text)`` result is read through its
        # 'compound' entry by the later stages.
        self.sia = SentimentIntensityAnalyzer()

    @staticmethod
    def _grade(value, high_cutoff, medium_cutoff):
        """Map a number to 'high' / 'medium' / 'low' using strict cut-offs."""
        if value > high_cutoff:
            return 'high'
        if value > medium_cutoff:
            return 'medium'
        return 'low'

    def preprocess_news(self, news_articles):
        """Flatten raw article dicts into records ready for scoring.

        Args:
            news_articles: Iterable of dicts with optional 'title',
                'description', 'source', 'url' and 'publishedAt' keys.

        Returns:
            A list of dicts with keys 'text' (``"<title>. <description>"``),
            'source', 'url' and 'publishedAt'.
        """
        preprocessed_articles = []

        for article in news_articles:
            # A feed may omit a field or send an explicit None; both are
            # treated as empty text so string concatenation cannot fail.
            title = article.get('title') or ''
            description = article.get('description') or ''

            preprocessed_articles.append({
                'text': title + '. ' + description,
                'source': article.get('source', 'Unknown'),
                'url': article.get('url', ''),
                'publishedAt': article.get('publishedAt', '')
            })

        return preprocessed_articles

    def classify_news(self, preprocessed_articles):
        """Attach sentiment, relevance and impact labels to each article.

        Relevance is a placeholder heuristic based only on text length
        (> 100 chars high, > 50 medium, otherwise low). Impact is derived
        from the absolute compound sentiment (> 0.5 high, > 0.2 medium,
        otherwise low).

        Args:
            preprocessed_articles: Records produced by ``preprocess_news``.

        Returns:
            A list of new dicts: each input record plus 'sentiment',
            'relevance' and 'impact'.
        """
        classified_articles = []

        for article in preprocessed_articles:
            text = article['text']
            sentiment = self.sia.polarity_scores(text)

            classified_articles.append({
                **article,
                'sentiment': sentiment,
                'relevance': self._grade(len(text), 100, 50),
                'impact': self._grade(abs(sentiment['compound']), 0.5, 0.2)
            })

        return classified_articles

    def extract_insights(self, classified_articles):
        """Turn the noteworthy articles into short labelled insights.

        Only articles whose relevance or impact is 'high' are kept. The
        insight type is 'positive' for a compound score above 0.2,
        'negative' below -0.2 and 'neutral' otherwise.

        Args:
            classified_articles: Records produced by ``classify_news``.

        Returns:
            A list of dicts with keys 'type', 'description', 'source',
            'url' and 'sentiment_score'.
        """
        insights = []

        for article in classified_articles:
            if article['relevance'] != 'high' and article['impact'] != 'high':
                continue

            sentiment = article['sentiment']['compound']
            if sentiment > 0.2:
                insight_type = 'positive'
            elif sentiment < -0.2:
                insight_type = 'negative'
            else:
                insight_type = 'neutral'

            # The description is the capitalized type followed by the first
            # 100 characters of the article text.
            headline = article['text'][:100]
            insights.append({
                'type': insight_type,
                'description': f"{insight_type.capitalize()} news: {headline}...",
                'source': article['source'],
                'url': article['url'],
                'sentiment_score': sentiment
            })

        return insights

    def summarize_sentiment(self, insights):
        """Aggregate insights into an overall sentiment summary.

        Args:
            insights: Records produced by ``extract_insights``.

        Returns:
            A dict with 'overall_sentiment' ('positive' when the mean score
            is above 0.1, 'negative' below -0.1, else 'neutral'),
            'sentiment_score' (the mean), per-type counts, and
            'key_insights' (up to five insights with the largest absolute
            score). An empty input yields a neutral summary with zero counts.
        """
        if not insights:
            return {
                'overall_sentiment': 'neutral',
                'sentiment_score': 0,
                'positive_count': 0,
                'negative_count': 0,
                'neutral_count': 0,
                'key_insights': []
            }

        positive_count = sum(1 for insight in insights if insight['type'] == 'positive')
        negative_count = sum(1 for insight in insights if insight['type'] == 'negative')
        neutral_count = sum(1 for insight in insights if insight['type'] == 'neutral')

        avg_sentiment = sum(insight['sentiment_score'] for insight in insights) / len(insights)

        if avg_sentiment > 0.1:
            overall_sentiment = 'positive'
        elif avg_sentiment < -0.1:
            overall_sentiment = 'negative'
        else:
            overall_sentiment = 'neutral'

        # Strongest signals first, regardless of direction; slicing already
        # copes with fewer than five entries.
        key_insights = sorted(
            insights, key=lambda x: abs(x['sentiment_score']), reverse=True
        )[:5]

        return {
            'overall_sentiment': overall_sentiment,
            'sentiment_score': avg_sentiment,
            'positive_count': positive_count,
            'negative_count': negative_count,
            'neutral_count': neutral_count,
            'key_insights': key_insights
        }

    def analyze_news_sentiment(self, symbol):
        """Full workflow: ingest → preprocess → classify → extract → summarize.

        Args:
            symbol: Ticker passed to ``data_acquisition.get_news``.

        Returns:
            None when no news is returned; otherwise a dict with
            'news_count', 'classified_news', 'insights' and 'summary'.
        """
        news = self.data_acquisition.get_news(symbol, num_articles=15)

        if not news:
            return None

        classified_news = self.classify_news(self.preprocess_news(news))
        insights = self.extract_insights(classified_news)
        summary = self.summarize_sentiment(insights)

        return {
            'news_count': len(news),
            'classified_news': classified_news,
            'insights': insights,
            'summary': summary
        }
class MarketAnalyzer:
    """Analyzes market context and economic indicators.

    All data is pulled through the injected ``data_acquisition`` object,
    which must provide ``get_stock_price_history(symbol, period=...)`` and
    ``get_economic_indicators()``.
    """

    # Sector name -> ETF ticker used as that sector's proxy.
    SECTOR_ETFS = {
        'Technology': 'XLK',
        'Financial': 'XLF',
        'Healthcare': 'XLV',
        'Consumer Discretionary': 'XLY',
        'Consumer Staples': 'XLP',
        'Energy': 'XLE',
        'Materials': 'XLB',
        'Industrials': 'XLI',
        'Utilities': 'XLU',
        'Real Estate': 'XLRE'
    }

    def __init__(self, data_acquisition):
        """Store the data provider used by every analysis method."""
        self.data_acquisition = data_acquisition

    @staticmethod
    def _has_rows(data):
        """Return True when ``data`` is present and not empty."""
        return data is not None and not data.empty

    @staticmethod
    def _total_return_pct(data):
        """Percent change from the first to the last 'Close' value."""
        closes = data['Close']
        return (closes.iloc[-1] / closes.iloc[0] - 1) * 100

    @staticmethod
    def _interpret_indicator(indicator, current_value, change):
        """Return a short label for a known indicator, or '' otherwise.

        GDP and CPIAUCSL are judged on the percent ``change`` (no label when
        it is None); UNRATE and FEDFUNDS are judged on ``current_value``.
        """
        if indicator == 'GDP':
            if change is None:
                return ""
            if change > 3:
                return "Strong growth"
            if change > 1:
                return "Moderate growth"
            if change > 0:
                return "Slow growth"
            if change > -1:
                return "Mild contraction"
            return "Significant contraction"
        if indicator == 'UNRATE':
            if current_value < 4:
                return "Very low unemployment"
            if current_value < 5:
                return "Low unemployment"
            if current_value < 6:
                return "Moderate unemployment"
            return "High unemployment"
        if indicator == 'CPIAUCSL':
            if change is None:
                return ""
            if change > 4:
                return "High inflation"
            if change > 2:
                return "Moderate inflation"
            if change > 1:
                return "Low inflation"
            if change > 0:
                return "Very low inflation"
            return "Deflation"
        if indicator == 'FEDFUNDS':
            if current_value > 4:
                return "Restrictive monetary policy"
            if current_value > 2:
                return "Neutral monetary policy"
            return "Accommodative monetary policy"
        return ""

    def analyze_market_trends(self, benchmark_symbols=('SPY', 'QQQ', 'IWM'), period='1y'):
        """Analyze broader market trends using major indices.

        Args:
            benchmark_symbols: Iterable of tickers to evaluate. The default
                is an immutable tuple so it cannot be altered between calls.
            period: History window forwarded to the data provider.

        Returns:
            A dict with 'market_data' (symbol -> price history as returned by
            the provider) and 'performance' (symbol -> 'total_return' in
            percent, 'volatility' in percent and 'sharpe_ratio'). Symbols
            with missing or empty history are left out of 'performance'.
        """
        market_data = {
            symbol: self.data_acquisition.get_stock_price_history(symbol, period=period)
            for symbol in benchmark_symbols
        }

        performance = {}
        for symbol, data in market_data.items():
            if not self._has_rows(data):
                continue

            total_return = self._total_return_pct(data)

            # Standard deviation of period-over-period returns, scaled by
            # sqrt(252) to annualize and by 100 to express it in percent.
            daily_returns = data['Close'].pct_change().dropna()
            volatility = daily_returns.std() * (252 ** 0.5) * 100

            performance[symbol] = {
                'total_return': total_return,
                'volatility': volatility,
                # Plain return-to-volatility ratio: no risk-free rate is
                # subtracted. Zero when volatility is not positive.
                'sharpe_ratio': total_return / volatility if volatility > 0 else 0
            }

        return {
            'market_data': market_data,
            'performance': performance
        }

    def analyze_economic_indicators(self):
        """Analyze economic indicators for market context.

        Returns:
            None when the provider returns nothing; otherwise a dict mapping
            each indicator with data to 'current_value', 'previous_value'
            (None with a single observation), 'change' (percent versus the
            previous value, None when that value is missing or zero) and
            'interpretation'. Indicators whose series is None or empty are
            skipped.
        """
        indicators = self.data_acquisition.get_economic_indicators()

        if not indicators:
            return None

        analysis = {}

        for indicator, data in indicators.items():
            # Same None/empty guard as the price-based methods, so one
            # failed series does not abort the whole analysis.
            if not self._has_rows(data):
                continue

            current_value = data['value'].iloc[-1]
            previous_value = data['value'].iloc[-2] if len(data) > 1 else None
            # A falsy previous value (None or 0) would make the ratio
            # undefined, so no change is reported in that case.
            change = (current_value / previous_value - 1) * 100 if previous_value else None

            analysis[indicator] = {
                'current_value': current_value,
                'previous_value': previous_value,
                'change': change,
                'interpretation': self._interpret_indicator(indicator, current_value, change)
            }

        return analysis

    def analyze_sector_performance(self, period='1y'):
        """Analyze performance of different market sectors.

        Args:
            period: History window forwarded to the data provider.

        Returns:
            A dict with 'sector_data' (sector -> price history),
            'performance' (sector -> 'total_return' in percent),
            'top_sectors' (up to three best sectors, best first) and
            'bottom_sectors' (the last three entries of the same descending
            ranking). With fewer than six ranked sectors the two lists
            overlap.
        """
        sector_data = {
            sector: self.data_acquisition.get_stock_price_history(symbol, period=period)
            for sector, symbol in self.SECTOR_ETFS.items()
        }

        performance = {
            sector: {'total_return': self._total_return_pct(data)}
            for sector, data in sector_data.items()
            if self._has_rows(data)
        }

        sorted_sectors = sorted(
            performance.items(),
            key=lambda x: x[1]['total_return'],
            reverse=True
        )

        return {
            'sector_data': sector_data,
            'performance': performance,
            'top_sectors': [sector for sector, _ in sorted_sectors[:3]],
            'bottom_sectors': [sector for sector, _ in sorted_sectors[-3:]]
        }

### 5. MAIN ORCHESTRATOR AGENT.
#### This section defines the orchestrator agent, which interprets the user's prompt, prepares a plan, calls the subagents as needed, and composes the final answer for the user.

In [None]:
class OrchestratorAgent(Agent):
    """Agent that prompts a model with its tools and history, and can run one tool step.

    Tools are registered as dicts; this class reads their 'name',
    'description' and 'function' entries.
    """

    def __init__(self, name, model, memory, parser=None):
        """Set up an agent with empty history and no tools.

        Args:
            name: Display name inserted into the prompt and history lines.
            model: Object exposing ``generate_text(prompt)``.
            memory: Stored on the instance; not used by the methods below.
            parser: Optional object exposing ``parse(text)`` that returns a
                dict with an "action" key; required for tool use in ``reAct``.
        """
        # NOTE(review): the base class initializer is not called here;
        # confirm Agent does not rely on its own __init__ having run.
        self.name = name
        self.model = model
        self.memory = memory
        self.conversation_history = []
        self.tools = []
        self.max_history_length = 10  # Limit for conversation history
        self.parser = parser
        # Draft of a fuller tool-usage system prompt. It is a bare string
        # expression: it is neither stored nor sent to the model. Only
        # ``prompt_template`` below is used.
        '''
        You are a helpful Finance assistant that explains things in a few words.
          You have the following tools:
            {tools}
        
            ### Instructions for using the tools:
            You should only use the tools listed above.
            When you use a tool, you must call the API exactly as shown above.
            You must always include the symbol in the API call.
            You must always include the step in the API call.
            You must always use double quotes for the JSON keys and string values in the API call.
            You must never use single quotes in the API call.
            You must never call any API that is not listed above.
            You must never make up any API calls.
            You must never repeat any steps.
            
            Here is an example of how to use the tools:
            User : What is the company overview for "AAPL".
            Assistant : To gather the company overview, I will use the "Company Overview" tool.
            Assistant : <InvokeTool>{"symbol": "AAPL", "step": "info"}</InvokeTool>
            Tool : <ToolResult>{"name": "Company Overview", "result": {"companyName": "Apple Inc.", "sector": "Technology", "industry": "Consumer Electronics"}}</ToolResult>
            Assistant : <Thought>Based on the company overview, Apple Inc. is a leading technology company in the consumer electronics industry.</Thought>
            
            ### TAGs for tool usage:
            - To use a tool, you must wrap the API call in <InvokeTool> and </InvokeTool> tags.
            -  For thinking, you must wrap your thoughts in <Thought> and </Thought> tags.
            -  For final answers, you must wrap your answer in <FinalAnswer> and </FinalAnswer> tags.
            -  If you need users to provide more information, you must wrap your request in <RequestMoreInfo> and </RequestMoreInfo> tags.
            -  If you are going to use a tool, you must always think first. Get the concern of the user query with <NeedApproval>, decide which tool to use, and then use the tool.
           
            ### Thinking Process: 
              Based on the user query, decide which tools to use and in what order.
              After using a tool, analyze the result and decide the next step.
              Continue this process until you have enough information to answer the user's query.
              Finally, provide a comprehensive answer to the user's query.
        '''
        self.prompt_template = (
            "You are {agent_name}, an AI agent. Use the following tools as needed:\n"
            "{tools}\n"
            "Conversation history:\n"
            "{history}\n"
            "Current input: {input}\n"
            "Respond appropriately."
        )

    def register_tool(self, tool):
        """Add a tool dict to the list of tools offered to the model."""
        self.tools.append(tool)

    def remember(self, message):
        """Append a message to the history, dropping the oldest beyond the cap."""
        self.conversation_history.append(message)
        if len(self.conversation_history) > self.max_history_length:
            self.conversation_history.pop(0)

    def generate_response(self, user_input):
        """Build the prompt, query the model and record the exchange.

        Args:
            user_input: Text placed in the prompt's "Current input" slot.

        Returns:
            Whatever ``model.generate_text`` returns for the built prompt.
        """
        tools_description = "\n".join(
            f"- {tool['name']}: {tool['description']}" for tool in self.tools
        )
        history_text = "\n".join(self.conversation_history)

        prompt = self.prompt_template.format(
            agent_name=self.name,
            tools=tools_description,
            history=history_text,
            input=user_input
        )

        response = self.model.generate_text(prompt)
        self.remember(f"User: {user_input}")
        self.remember(f"{self.name}: {response}")

        return response

    def reAct(self, user_input):
        """Run one reason/act step for ``user_input``.

        Without a parser or registered tools this is a plain model call.
        Otherwise the model response is parsed and handled by action:

        * "InvokeTool": the named tool is called once and the model is asked
          again with the tool result; that follow-up response is returned.
          If the tool is unknown, the first response is returned.
        * "FinalAnswer" / "RequestMoreInfo" / "NeedApproval": the parsed
          "content" entry is returned.
        * anything else: a fixed clarification request is returned.

        Only a single tool round-trip is performed; there is no loop.
        """
        response = self.generate_response(user_input)

        # Without a parser or tools there is nothing to act on. Returning
        # here also avoids reading an unassigned ``response`` as before.
        if not (self.parser and self.tools):
            return response

        # ``parse`` is expected to return a dict such as
        # {"action": "InvokeTool", "parameters": {"symbol": "AAPL", "step": "financials"}}
        parsed_response = self.parser.parse(response)

        # Go through remember() only: appending to the list directly as
        # well would duplicate the entry and bypass the history cap.
        self.remember(f"System: {response}")

        action = parsed_response.get("action")
        if action == "InvokeTool":
            parameters = parsed_response.get("parameters", {})
            tool_name = parameters.get("tool_name")
            tool = next((t for t in self.tools if t['name'] == tool_name), None)
            if tool:
                # NOTE(review): the keyword arguments still include
                # 'tool_name'; confirm the tool functions accept it.
                tool_result = tool['function'](**parameters)
                self.remember(f"Tool Result: {tool_result}")
                # Generate a new response based on the tool result
                response = self.generate_response(f"Tool Result: {tool_result}")
                parsed_response = self.parser.parse(response)
        elif action == "FinalAnswer" or action == "RequestMoreInfo" or action == "NeedApproval":
            return parsed_response.get("content")
        else:
            return "I'm not sure how to proceed. Could you please clarify?"

        print(f"Parsed Response: {parsed_response}")

        return response

### 6. MAIN.
#### This final section wires together all the elements above into the complete, runnable system.

In [None]:
# Instantiate each data-source tool the agent is allowed to call.
registered_tools = [
    tools.YahooFinance(),
    tools.FMP(),
    tools.FinnHub(),
]

# Dictionary form of every tool (via to_dict), in registration order.
toolsList = [entry.to_dict() for entry in registered_tools]

# Lookup table from tool name to tool instance.
executionMap = dict((entry.name, entry) for entry in registered_tools)

# Bare expression: shows the lookup table as the notebook cell's output.
executionMap

In [None]:
# Import the project's parser module, then rebind the name `parser` to an
# XmlParser instance: after this cell `parser` is the instance, not the module.
import modules.parser as parser
parser = parser.XmlParser()

In [None]:
# Interactive chat loop. The whole conversation is accumulated in `prompt`
# and re-sent to the model on every turn; the model's reply is parsed into
# actions (approval request, tool invocation, final answer) handled below.
import json  # used to decode the tool arguments emitted by the model

prompt = systemPrompt
userText =""
# True when the next model call must happen without asking the user first
# (a tool result was just appended to the prompt).
continueFlag = False
exitFlag = ""
debug = False
# The loop ends through the `break` statements below. The previous
# `!= "exit" or != "y"` condition could never become false.
while True:
    if continueFlag:
        # Consume the flag so that the user is prompted again on the
        # following turn. Previously it was never reset once set.
        continueFlag = False
    else:
        userText = input("User: ")
        if userText.lower() == "exit":
            break
        prompt += "\n User:"+ userText
    if debug:
        print('Prompt: ')
        print(prompt)
    response = client.models.generate_content(
        model="gemini-2.5-flash", contents=prompt
    )
    if debug:
        print('Model response: ')
        print(response.text)
    prompt = prompt + response.text
    actions = parser.parse_all(response.text.replace('\n', ' '))
    if debug:
        print('Actions: ')
        print(actions)
    if not actions:
        continue
    # Index the parsed actions by type; a later action of the same type
    # replaces an earlier one.
    # NOTE(review): this filter compares against "Final Answer" (with a
    # space) while the check further down uses "FinalAnswer"; confirm which
    # tag the parser actually emits.
    actions = { action['action']: action for action in actions if action.get("action","") != "Final Answer" }
    if "NeedApproval" in actions:
        actions.pop("NeedApproval")
        if "InvokeTool" in actions:
            result=input("Need to Call " + actions["InvokeTool"]['parameters']['name'] + ". Type Y/y to continue...)")
        else:
            result=input("Need User Approval Y/y to continue...)")
        if result.lower() != "y":
            print("Exiting...")
            break
        actions["NeedApproval"] = {"action":"NeedApproval", "content":"User approved to continue."}
        prompt += "\n User:"+ actions["NeedApproval"].get("content","")
    if 'InvokeTool' in actions:
        action = actions["InvokeTool"]
        tools_name = action["parameters"]["name"]
        # The "api" parameter is a JSON object string holding the keyword
        # arguments for the tool's invoke().
        tool_params = action["parameters"]["api"]
        result = executionMap[tools_name].invoke(**json.loads(tool_params))
        actions["Tool result"] = {"action":"Tool result", "content":result}
        prompt += "\n User:"+ str(result)
        # Let the model react to the tool output before asking the user.
        continueFlag = True
    if "FinalAnswer" in actions:
        content = actions["FinalAnswer"]['parameters']['content']
        print("Final Answer: "+ str(content))
        exitFlag = input("Do you want to exit? Type Y/y to exit...")
        if exitFlag.lower() == "y":
            print('Thanks for chatting! Goodbye!')
            break
            

## CONCLUSION.
#### Enter our conclusions here!