In [11]:
# Install required packages
!pip install yfinance fredapi networkx pyyaml



In [12]:
# Imports
import yfinance as yf
from fredapi import Fred
import networkx as nx
import json
import os
import datetime
import hashlib
import yaml
from typing import Dict, List, Any

In [13]:
# SRS-29: Configuration file for sources, symbols, date windows, macro series, rubric thresholds.
CONFIG_FILE = 'config.yaml'
DEFAULT_CONFIG = {
    'fred_api_key': 'YOUR_FRED_API_KEY_HERE',  # Replace with actual key
    'default_time_window': '1y',  # Default time window for data fetch
    'macro_series': ['GDP', 'UNRATE', 'CPIAUCSL'],  # Default FRED series
    'rubric_thresholds': {
        'coverage': 0.8,
        'correctness': 0.9,
        'grounding': 0.85,
        'timeliness': 0.95,
        'uncertainty': 0.2  # Lower is better for uncertainty
    },
    'max_optimizer_iterations': 3,
    'memory_dir': 'memories',
}

if not os.path.exists(CONFIG_FILE):
    with open(CONFIG_FILE, 'w') as f:
        yaml.dump(DEFAULT_CONFIG, f)

with open(CONFIG_FILE, 'r') as f:
    config = yaml.safe_load(f)

os.makedirs(config['memory_dir'], exist_ok=True)

In [14]:
# SRS-27: Provenance logging: store every external call (endpoint, params, timestamp, checksum) and model version hashes.
# Note: Since no models are used, we skip model hashes; focus on API calls.
PROVENANCE_LOG = []

def log_provenance(endpoint: str, params: Dict, response: Any):
    timestamp = datetime.datetime.now().isoformat()
    checksum = hashlib.md5(json.dumps(response, default=str).encode()).hexdigest()
    log_entry = {
        'endpoint': endpoint,
        'params': params,
        'timestamp': timestamp,
        'checksum': checksum
    }
    PROVENANCE_LOG.append(log_entry)
    # Persist to file
    with open('provenance.json', 'w') as f:
        json.dump(PROVENANCE_LOG, f, indent=4)

In [15]:
# SRS-12: Persist lightweight memories per symbol and global
# SRS-13: At session start, load prior memories to adjust plan
MEMORY_GLOBAL_FILE = os.path.join(config['memory_dir'], 'global_memory.json')
MEMORY_SYMBOL_TEMPLATE = os.path.join(config['memory_dir'], '{symbol}_memory.json')

def load_memory(symbol: str = None) -> Dict:
    if symbol:
        file = MEMORY_SYMBOL_TEMPLATE.format(symbol=symbol)
    else:
        file = MEMORY_GLOBAL_FILE
    if os.path.exists(file):
        with open(file, 'r') as f:
            return json.load(f)
    return {'notes': [], 'last_updated': None}

def save_memory(memory: Dict, symbol: str = None):
    if symbol:
        file = MEMORY_SYMBOL_TEMPLATE.format(symbol=symbol)
    else:
        file = MEMORY_GLOBAL_FILE
    memory['last_updated'] = datetime.datetime.now().isoformat()
    with open(file, 'w') as f:
        json.dump(memory, f, indent=4)

In [16]:
# SRS-6: Provide connectors: (a) Yahoo Finance—prices, fundamentals; (b) FRED—macro series
# SRS-20: Yahoo Finance fetch
# SRS-21: FRED fetch.
# SRS-5: Failover to an alternate source or cached data if a primary tool call fails, and log the fallback rationale

CACHE_DIR = 'cache'
os.makedirs(CACHE_DIR, exist_ok=True)

def convert_timestamps(data: Any) -> Any:
    """Recursively convert Timestamp keys to strings for JSON serialization."""
    from pandas import Timestamp
    if isinstance(data, dict):
        return {str(k) if isinstance(k, Timestamp) else k: convert_timestamps(v) for k, v in data.items()}
    elif isinstance(data, list):
        return [convert_timestamps(item) for item in data]
    elif isinstance(data, Timestamp):
        return str(data)
    return data

def fetch_yfinance_data(ticker: str, data_type: str, period: str = '1y', cache=True):
    cache_file = os.path.join(CACHE_DIR, f'{ticker}_{data_type}_{period}.json')
    if cache and os.path.exists(cache_file):
        with open(cache_file, 'r') as f:
            data = json.load(f)
        log_provenance('yfinance_cache', {'ticker': ticker, 'data_type': data_type}, data)
        return data
    try:
        stock = yf.Ticker(ticker)
        if data_type == 'prices':
            data = stock.history(period=period).to_dict()
        elif data_type == 'fundamentals':
            data = stock.info
        elif data_type == 'news':
            data = stock.news
        else:
            raise ValueError(f"Unknown data_type: {data_type}")
        # Convert Timestamp keys to strings
        data = convert_timestamps(data)
        log_provenance('yfinance', {'ticker': ticker, 'data_type': data_type, 'period': period}, data)
        if cache:
            with open(cache_file, 'w') as f:
                json.dump(data, f)
        return data
    except Exception as e:
        fallback_rationale = f"YFinance failed: {str(e)}. Falling back to cache if available."
        print(fallback_rationale)
        if os.path.exists(cache_file):
            with open(cache_file, 'r') as f:
                data = json.load(f)
            log_provenance('yfinance_fallback_cache', {'ticker': ticker, 'data_type': data_type}, data)
            return data
        raise

def fetch_fred_data(series_id: str, cache=True):
    cache_file = os.path.join(CACHE_DIR, f'fred_{series_id}.json')
    if cache and os.path.exists(cache_file):
        with open(cache_file, 'r') as f:
            data = json.load(f)
        log_provenance('fred_cache', {'series_id': series_id}, data)
        return data
    try:
        fred = Fred(api_key=config['fred_api_key'])
        data = fred.get_series(series_id).to_dict()
        # Convert Timestamp keys to strings
        data = convert_timestamps(data)
        log_provenance('fred', {'series_id': series_id}, data)
        if cache:
            with open(cache_file, 'w') as f:
                json.dump(data, f)
        return data
    except Exception as e:
        fallback_rationale = f"FRED failed: {str(e)}. Falling back to cache if available."
        print(fallback_rationale)
        if os.path.exists(cache_file):
            with open(cache_file, 'r') as f:
                data = json.load(f)
            log_provenance('fred_fallback_cache', {'series_id': series_id}, data)
            return data
        raise

In [17]:
# SRS-24: Quality rubric for Evaluator: coverage, correctness, grounding, timeliness, and uncertainty
# SRS-10: Maintain a self-reflection step after each major task, scoring confidence, identifying gaps, and proposing next actions.
# SRS-26: Confidence score exposed in outputs; degrade confidence on tool errors, stale data, or conflicting sources.

def evaluate_quality(output: Any, rubric: Dict) -> Dict:
    # Simple heuristic scoring (0-1 scale); in real system, could be more sophisticated or LLM-based
    scores = {
        'coverage': 0.9 if len(output) > 0 else 0.5,  # Basic check
        'correctness': 1.0,  # Assume correct unless error
        'grounding': 0.95 if 'sources' in output else 0.7,
        'timeliness': 0.9 if 'timestamp' in output else 0.6,
        'uncertainty': 0.1  # Low uncertainty by default
    }
    # Adjust based on rubric thresholds, but here simulate
    return scores

def self_reflect(task: str, output: Any, confidence: float = 1.0) -> Dict:
    gaps = []  # Identify gaps
    if not output:
        gaps.append("No output generated.")
        confidence *= 0.5
    next_actions = ["Proceed to next task."] if confidence > 0.7 else ["Refine output."]
    reflection = {
        'task': task,
        'confidence': confidence,
        'gaps': gaps,
        'next_actions': next_actions
    }
    print(f"Self-Reflection: {reflection}")
    return reflection

In [18]:
# SRS-23: Specialists library: (a) Earnings analyzer (quarterly diffs, surprises); (b) News impact analyzer; (c) Market regime analyzer.

def earnings_analyzer(fundamentals: Dict) -> Dict:
    # Simple analysis: extract earnings if available
    analysis = {
        'earnings': fundamentals.get('trailingEps', 'N/A'),
        'surprise': 'N/A'  # Placeholder; real would compute diffs
    }
    return analysis

def news_impact_analyzer(news: List) -> Dict:
    # Simple: count positive/negative (heuristic)
    positive = sum(1 for n in news if 'up' in n['title'].lower())
    negative = sum(1 for n in news if 'down' in n['title'].lower())
    return {'impact': 'positive' if positive > negative else 'negative'}

def market_regime_analyzer(prices: Dict, macro: Dict) -> Dict:
    # Simple: check if prices increasing
    regime = 'bull' if len(prices) > 0 else 'unknown'  # Placeholder
    return {'regime': regime}

In [19]:
# SSS-5: The autonomous Investment Research Agent shall have three workflow patterns: Prompt Chaining, Routing, and Evaluator-Optimizer.
# SRS-7: Prompt-Chaining workflow: ingest news → preprocess → classify → extract key facts/figures
# SRS-8: Routing workflow: classify incoming artifacts (news, filings, prices, macro) and dispatch to specialists (earnings, news, market analyzers).
# SRS-9: Evaluator-Optimizer workflow: generate draft analysis → evaluate with a quality rubric → refine until thresholds or max-iterations.
# SRS-14: Support explanations for routing decisions and optimizer edits
# SRS-25: Refiner applies targeted edits

def prompt_chaining_workflow(ticker: str) -> Dict:
    # SRS-15: Ingest news from at least one provider (API or local cache) with publication time, source, headline, body/snippet.
    news = fetch_yfinance_data(ticker, 'news')
    # SRS-16: Preprocess text (dedupe, language detect=EN, clean HTML, sentence split, entity extract).
    preprocessed = [n for n in news if 'title' in n]  # Simple dedupe/placeholder
    # SRS-17: Classify articles into categories
    classified = [{'category': 'general', **n} for n in preprocessed]  # Placeholder
    # SRS-18: Extract entities & metrics with schema-validated fields
    extracted = [{'entities': [], 'metrics': {}, **c} for c in classified]  # Placeholder
    # SRS-19: Summarize with an explicit evidence table mapping claims
    summary = {'summary': 'News summary', 'evidence': extracted}
    return summary

def routing_workflow(artifact: Dict, artifact_type: str) -> Dict:
    # Classify and dispatch
    explanation = f"Routing {artifact_type} to specialist."
    if artifact_type == 'news':
        result = news_impact_analyzer(artifact)
    elif artifact_type == 'fundamentals':
        result = earnings_analyzer(artifact)
    elif artifact_type == 'prices' or artifact_type == 'macro':
        result = market_regime_analyzer(artifact, {})  # Simplify
    else:
        result = {}
    result['explanation'] = explanation
    return result

def evaluator_optimizer_workflow(draft: Dict, task: str) -> Dict:
    # SRS-9: Evaluator-Optimizer workflow: generate draft analysis → evaluate with a quality rubric → refine until thresholds or max-iterations.
    iteration = 0
    while iteration < config['max_optimizer_iterations']:
        scores = evaluate_quality(draft, config['rubric_thresholds'])
        overall_score = sum(scores.values()) / len(scores)
        if overall_score >= sum(config['rubric_thresholds'].values()) / len(config['rubric_thresholds']):
            break
        # SRS-25: Refiner applies targeted edits (add sources, fix numbers, clarify caveats)
        draft['refined'] = True  # Placeholder refinement
        iteration += 1
    return draft

In [20]:
# SSS-1: The autonomous Investment Research Agent shall plan its research steps for a given stock symbol.
# SSS-2: The autonomous Investment Research Agent shall use tools dynamically (APIs, datasets, retrieval)
# SSS-3: The autonomous Investment Research Agent shall self-reflect to assess the quality of its output.
# SSS-4: The autonomous Investment Research Agent shall learn across runs/keeps brief memories or notes to improve future analyses.
# SSS-6: The autonomous Investment Research Agent Prompt Chaining workflow pattern shall ingest news, pre-process, classify, extract, and then summarize.
# SSS-7: The autonomous Investment Research Agent Routing workflow pattern shall direct content to the right specialist (e.g, earnings, news, or market analyzers)
# SSS-8: The autonomous Investment Research Agent Evaluator-Optimizer workflow pattern shall generate analysis, evaluate quality, and refine using feedback.
# SSS-9: The autonomous Investment Research Agent shall utilize the Yahoo Finance dataset and FRED API.

class InvestmentResearchAgent:
    def __init__(self, symbol: str):
        # SRS-1: Accept a stock symbol input and initialize a research session context (ticker, market, time window).
        self.symbol = symbol
        self.context = {'ticker': symbol, 'market': 'US', 'time_window': config['default_time_window']}
        self.global_memory = load_memory()
        self.symbol_memory = load_memory(symbol)
        self.plan_graph = nx.DiGraph()
        self.confidence = 1.0

    def plan_research(self):
        # SRS-2: Plan a multi-step research itinerary for the symbol (e.g., data fetch → preprocess → analyze → summarize → QA), emitting an ordered, labeled task list.
        tasks = [
            'fetch_data',
            'preprocess',
            'analyze',
            'summarize',
            'qa'
        ]
        # SRS-3: Serialize the plan as a machine-readable graph (nodes=tasks, edges=dependencies) with timestamps and status.
        for i, task in enumerate(tasks):
            self.plan_graph.add_node(task, timestamp=datetime.datetime.now().isoformat(), status='pending')
            if i > 0:
                self.plan_graph.add_edge(tasks[i-1], task)
        # Adjust plan based on memory SRS-13
        if self.symbol_memory.get('notes'):
            print("Adjusting plan based on prior memories.")

    def execute_task(self, task: str):
        # SRS-4: Dynamically select tools per task via a tool-router policy.
        if task == 'fetch_data':
            prices = fetch_yfinance_data(self.symbol, 'prices', self.context['time_window'])
            fundamentals = fetch_yfinance_data(self.symbol, 'fundamentals')
            news = fetch_yfinance_data(self.symbol, 'news')
            macro = {s: fetch_fred_data(s) for s in config['macro_series']}
            output = {'prices': prices, 'fundamentals': fundamentals, 'news': news, 'macro': macro}
        elif task == 'preprocess':
            output = prompt_chaining_workflow(self.symbol)  # Use chaining for news preprocess
        elif task == 'analyze':
            draft = {}
            for artifact_type, artifact in output.items():  # From previous, but simulate
                draft[artifact_type] = routing_workflow(artifact, artifact_type)
            output = draft
        elif task == 'summarize':
            output = {'summary': 'Final summary'}  # Placeholder
        elif task == 'qa':
            output = evaluator_optimizer_workflow(output, task)  # From previous
        else:
            output = {}
        # Self-reflect
        reflection = self_reflect(task, output, self.confidence)
        if reflection['confidence'] < 0.7:
            self.confidence *= 0.8  # Degrade on issues
        self.plan_graph.nodes[task]['status'] = 'completed'
        return output

    def run(self):
        self.plan_research()
        for task in list(self.plan_graph.nodes):
            self.execute_task(task)
        # Persist memories SRS-12
        self.symbol_memory['notes'].append("Analysis completed.")
        save_memory(self.symbol_memory, self.symbol)
        # Global
        self.global_memory['notes'].append(f"Processed {self.symbol}")
        save_memory(self.global_memory)
        return {'final_output': 'Research complete', 'confidence': self.confidence, 'provenance': PROVENANCE_LOG}

In [21]:
# Example Usage
agent = InvestmentResearchAgent('AAPL')
result = agent.run()
print(result)

FRED failed: Bad Request.  The value for variable api_key is not a 32 character alpha-numeric lower-case string.  Read https://fred.stlouisfed.org/docs/api/api_key.html for more information.. Falling back to cache if available.


ValueError: Bad Request.  The value for variable api_key is not a 32 character alpha-numeric lower-case string.  Read https://fred.stlouisfed.org/docs/api/api_key.html for more information.