In [1]:
import sys
# !{sys.executable} -m pip install --upgrade pip
# !{sys.executable} -m pip install --upgrade yfinance
# !{sys.executable} -m pip install langchain-text-splitters
# !{sys.executable} -m pip install langchain-pull-md
# !{sys.executable} -m pip install --upgrade phidata

In [2]:
import datetime
from datetime import date

# Headings to search in the article; map them to their correspoding field names
heading_to_field = {}
heading_to_field['Morningstar Price/Fair Value'] = 'price_fair_ratio'
heading_to_field['Morningstar Uncertainty Rating'] = 'uncertainity_rating'
heading_to_field['Morningstar Economic Moat Rating'] = 'moat_rating'
heading_to_field[' '] = 'fwd_dividend_yield'
heading_to_field['Sector'] = 'sector'

ARTICLE_URL = 'https://www.morningstar.com.au/stocks/10-best-us-dividend-aristocrats-buy-nowincluding-surprise-outperformer?user_segment=indinv'
# Data date - valid as at this date '2025-07-25'
DATA_DATE = datetime.date(2025, 7, 25)
# Today's date to calculate today's tock price
TODAY_DATE = date.today()

## Split the article based on H2

In [3]:
from langchain_pull_md import PullMdLoader
from langchain_text_splitters import MarkdownHeaderTextSplitter

def get_document_chunks():
    # Initialize the loader with the desired URL
    loader = PullMdLoader(url=ARTICLE_URL)
    # Load the content, which will be converted to Markdown
    documents = loader.load()
    # Headers to do the splitting
    headers_to_split_on = [("##", "Header 2")]
    md_splitter = MarkdownHeaderTextSplitter(headers_to_split_on)
    return md_splitter.split_text(documents[0].page_content)

In [4]:
# Convert the URL into document checkins based on Heading Level 2
document_chunks = get_document_chunks()

## Chunk 2 contains company names and ticker symbols

In [5]:
# For pattern matching
import re

# Pattern to extract comp name and symbol
pattern = r'^\d+\.\s*(.*)\s+([A-Z.]+)'
# Chunk #2 contains company names and symbols
# Skip the first and the last lines as they don't contain stock data
matches = [re.search(pattern, x) for x in document_chunks[2].page_content.splitlines()[1:-1]]
# Split into key value pair
name_to_symbol = {match.group(1):match.group(2) for match in matches if match}
stocks = [{'name':key, 'sym':value, 'alias':''} for key,value in name_to_symbol.items()]
stocks

[{'name': 'Becton Dickinson', 'sym': 'BDX', 'alias': ''},
 {'name': 'Brown-Forman', 'sym': 'BF.B', 'alias': ''},
 {'name': 'Clorox', 'sym': 'CLX', 'alias': ''},
 {'name': 'ExxonMobil', 'sym': 'XOM', 'alias': ''},
 {'name': 'Medtronic', 'sym': 'MDT', 'alias': ''},
 {'name': 'West Pharmaceutical Services', 'sym': 'WST', 'alias': ''},
 {'name': 'Amcor', 'sym': 'AMCR', 'alias': ''},
 {'name': 'PepsiCo', 'sym': 'PEP', 'alias': ''},
 {'name': 'Nordson', 'sym': 'NDSN', 'alias': ''},
 {'name': 'Kimberly-Clark', 'sym': 'KMB', 'alias': ''}]

## Adjustments

In [6]:
pos = [idx for idx, x in enumerate(stocks) if x['name'] == 'West Pharmaceutical Services'][0]
stocks[pos] = stocks[pos] | {'alias':'West Pharmaceutical'}

# Search for BF.B symbol, YF listed as BF-B
try:
    pos = [idx for idx, x in enumerate(stocks) if x['sym'] == 'BF.B'][0]
    stocks[pos]['sym'] = 'BF-B'
except:
    # Error raised when invoking again
    pass
stocks

[{'name': 'Becton Dickinson', 'sym': 'BDX', 'alias': ''},
 {'name': 'Brown-Forman', 'sym': 'BF-B', 'alias': ''},
 {'name': 'Clorox', 'sym': 'CLX', 'alias': ''},
 {'name': 'ExxonMobil', 'sym': 'XOM', 'alias': ''},
 {'name': 'Medtronic', 'sym': 'MDT', 'alias': ''},
 {'name': 'West Pharmaceutical Services',
  'sym': 'WST',
  'alias': 'West Pharmaceutical'},
 {'name': 'Amcor', 'sym': 'AMCR', 'alias': ''},
 {'name': 'PepsiCo', 'sym': 'PEP', 'alias': ''},
 {'name': 'Nordson', 'sym': 'NDSN', 'alias': ''},
 {'name': 'Kimberly-Clark', 'sym': 'KMB', 'alias': ''}]

## To capture Agent response

In [7]:
from pydantic import BaseModel, Field

# Model class for a agent response; used in capturing the response from the LLM
class AgentResponse(BaseModel):
    fair_value: float = Field(..., description='Fair value of the stock')
    summary: str = Field(..., description='Summary of the stock')

## Create an Agent

In [8]:
from phi.agent import Agent
from phi.model.groq import Groq

instructions = [
    "Summarize the user text under 200 characters and save it under the summary field",
    "Only extract the fair value from the user text and save it to the fair_value field",
    "If you can't find fair value, return 0"
]

agent = Agent(
    name='Stock Analyst',
    role='You are an expert in summarizing company data from user input',
    model=Groq(id='llama3-8b-8192'),
    instructions=instructions,
    markdown=True,
    show_tool_calls=True,
    # debug_mode=True,
    response_model=AgentResponse,
    response_format={'type': 'json_object'}
)

## Get Stock Price

In [9]:
# from datetime import datetime
import datetime
import yfinance as yf

def get_stock_price(symbol:str, target_date:datetime.time) -> float:
    # # Pick the start date or as close as to the start date
    # return float(data.iloc[0,0])
    
    # We'll go back a few days to be safe
    start_date = target_date - datetime.timedelta(days=7)
    # Also add 1 more date to the target to get the end date inclusive of target
    end_date = target_date + datetime.timedelta(days=1)
    # Fetch the data for the date range
    data = yf.download(symbol, start=start_date, end=end_date, auto_adjust=True)['Close']
    
    # Check if the dataframe is not empty
    if not data.empty:
        # Get the last available close price from the DataFrame
        # This will be the price from the most recent trading day
        closest_price = float(data.iloc[-1,0])
        last_trading_date = data.index[-1].date()
        # Commmented - debuggging only
        # print(f"The closest available closing price for {symbol} on {target_date} was:")
        # print(f"Price: {closest_price:.2f} (from the last trading day, {last_trading_date})")
        return closest_price
    else:
        raise ValueError(f'No data found for the specified period')

In [10]:
# get_stock_price(symbol='BF-B', target_date=TODAY_DATE)

## Extract stock data

In [11]:
def extract_stock_data(contents:str, sym:str) -> dict[str, str]:
    # Split into lines
    lines = contents.splitlines()
    # Fields we can extract are in the first 5 rows, extract 5 rows
    data_raw = [(x.strip().split(':')) for x in lines[0:5]]
    # Remove '* ' and other spaces from each
    data_cleansed = [(x[0].strip('* '), x[1].strip()) for x in data_raw]
    # Get the corresponding field names and the values for them
    y = [(heading_to_field[x[0]], x[1]) for x in data_cleansed if x[0] in heading_to_field.keys()]
    # Convert this to a dictionary with fields and values to append to an existing stock
    new_stock_data = {key:value for key,value in y}
    
    # The rest is the agent input; join with new line
    agent_input = '\n'.join(lines[5:])
    # Run the agent and save the response
    agent_resp: AgentResponse = agent.run(agent_input).content
    new_stock_data['fair_value'] = agent_resp.fair_value

    # Try to workout the fair price if the agent can't extract from text
    if new_stock_data['fair_value'] == 0:
        # Get the stock price for the data date (found in the article)
        stock_price = get_stock_price(symbol=sym, target_date=DATA_DATE)
        # Can be negative if over priced
        discount = 1 - float(new_stock_data['price_fair_ratio'])
        # Apply discount to the current stock price to get the new fair price
        new_stock_data['fair_value'] = round(stock_price + discount * stock_price, 2)

    # Suummary of the company from the article
    new_stock_data['summary'] = agent_resp.summary

    # Current stock price
    new_stock_data['current_price'] = round(get_stock_price(symbol=sym, target_date=TODAY_DATE), 2)

    # Replace the price to fair ratio with the current price
    new_stock_data['price_fair_ratio'] = round(new_stock_data['current_price']/new_stock_data['fair_value'], 2)
    
    # Return the new dictionary with new additions
    return new_stock_data

## Load API Keys

In [12]:
# To read environment property file
from dotenv import load_dotenv
# Load environment variables from .env file
load_dotenv()

True

## Populate Stocks with data

In [13]:
# need this to add delay to processing
import time

# We can start with row 3
start_row = 3
end_row = start_row + len(stocks)
# Loop through each stock
for doc in document_chunks[start_row:end_row]:
    # Name of the stock to search
    name = doc.metadata['Header 2']
    # Search for the stock using the header 2 against name or alias
    result = [idx for idx, x in enumerate(stocks) if name == x['name'] or name == x['alias']]
    # Must have a stock to continue
    if not result:
        # Continue with the next item
        continue
    # The position in the stocks array - place holder
    pos = result[0]
    # Extract stock data
    stk = extract_stock_data(contents=doc.page_content, sym=stocks[pos]['sym'])
    # Update the stock
    stocks[pos] = stocks[pos] | stk
    # Pause for 5 seconds
    time.sleep(5)
# stocks

[*********************100%***********************]  1 of 1 completed
[*********************100%***********************]  1 of 1 completed
[*********************100%***********************]  1 of 1 completed
[*********************100%***********************]  1 of 1 completed
[*********************100%***********************]  1 of 1 completed
[*********************100%***********************]  1 of 1 completed
[*********************100%***********************]  1 of 1 completed
[*********************100%***********************]  1 of 1 completed
[*********************100%***********************]  1 of 1 completed
[*********************100%***********************]  1 of 1 completed
[*********************100%***********************]  1 of 1 completed


In [20]:
# import json

# with open("output.json", "w") as json_file:v
#     json.dump(stocks, json_file, indent=4)

## Utility methods to get formats and styles

In [15]:
def get_headers(theme:str) -> tuple[str, str]:
    if 'blue' == theme:
        return ('[bright_cyan]', '[bold deep_sky_blue1]')
    elif 'green' == theme:
        return ('[green1]', '[spring_green3]')
    return ('', '')

def get_styles(theme:str) -> tuple[str, str, str]:
    if 'blue' == theme:
        return ('bold deep_sky_blue4', 'sky_blue3', 'steel_blue')
    elif 'green' == theme:
        return ('green4', 'green3', 'spring_green4')
    return ('', '', '')

In [16]:
from rich.table import Table
from rich import box

def create_table(stock:dict, header_style:str, border_style:str, row_style:str) -> Table:
    table = Table(show_header=True, header_style=header_style, show_lines=True,
                  expand=True, box=box.ROUNDED, border_style=border_style)
    table.add_column('Name', justify='left')
    table.add_column('Ticker', justify='left')
    table.add_column('Uncertainty Rating', justify='left')
    table.add_column('Moat Rating', justify='left')
    table.add_column('Fwd Div Yield', justify='left')
    table.add_column('Sector', justify='left')
    table.add_column('Fair Value', justify='right')
    table.add_column('Price', justify='right')
    table.add_column('Price / Fair Ratio', justify='right')

    row = [x for x in stock.values()]
    table.add_row(row[0], row[1], row[4], row[5], row[6], row[7], str(row[8]), str(row[10]), str(row[3]), style=row_style)
    return table

In [17]:
# create_table(stocks[0], header_style=table_style, border_style=panel_style, row_style=row_style)

In [31]:
from rich.console import Console
from rich.columns import Columns
from rich.panel import Panel
from rich.text import Text
from rich import box

# Save header formats and styles
theme = 'green'
header, panel_header = get_headers(theme=theme)
table_style, row_style, panel_style = get_styles(theme=theme)

# Create a summary panel for given stock
def get_summary_panel(stock) -> Panel:
    return Panel(stock['summary'], title=f'{panel_header}Summary[/]', box=box.ROUNDED,
                 style=panel_style, padding=(1,1))

# Create a Title panel
def get_title_panel() -> Panel:
    heading_title = f'{header}Morningstar 10 Best US dividend Aristocrats @ {TODAY_DATE}[/]'
    return Panel('', title=heading_title, padding=(1,1), height=3)

# Footer panel
def get_footer_panel() -> Panel:
    footer_text = f'Article: [link={ARTICLE_URL}]10 best US \
dividend aristocrats to buy now—including a surprise outperformer[/link]'
    return Panel(footer_text, title=f'{panel_header}Footer[/]', padding=(1,1))

# List of panels for each stock, each consists of table plus summary panel
panels = [Panel(Columns([create_table(stock=stk, header_style=table_style,
                                      border_style=panel_style, row_style=row_style),
                         get_summary_panel(stk)]),
                title=f"{panel_header}{stk['name']}-{stk['sym']}[/]",
                expand=True, style=panel_style, padding=(1,1)) for stk in stocks[8:10]]

console = Console()
console.print(Columns([get_title_panel()] + panels + [get_footer_panel()]))