# Belfius Alytics (Part 1)

### Handle Imports:

In [1]:
# Move to the project root (the parent of the `notebooks` directory) when the
# kernel was started from `notebooks` or one of its sub-directories.
import os

notebooks_dir = 'notebooks'

# Compare whole path components instead of substrings: a directory such as
# `notebooks_old` must not trigger the walk, and the walk has to stop at the
# filesystem root rather than loop forever when no component matches.
_candidate = os.path.abspath(os.curdir)
while os.path.basename(_candidate) != notebooks_dir:
    _parent = os.path.dirname(_candidate)
    if _parent == _candidate:  # reached the filesystem root without a match
        _candidate = None
        break
    _candidate = _parent

if _candidate is not None:
    # The directory that contains `notebooks` is treated as the project root.
    os.chdir(os.path.dirname(_candidate))

print(os.path.abspath(os.curdir))

C:\Users\MD726YR\PycharmProjects\eyalytics


In [2]:
# Suppress SSL verification (EY problem).
# SECURITY NOTE: this disables certificate verification for every request sent
# through `requests` in this kernel. Keep it limited to environments where it
# is known to be required.
import requests

from requests.packages.urllib3.exceptions import InsecureRequestWarning

# Suppress the warning from urllib3.
requests.packages.urllib3.disable_warnings(category=InsecureRequestWarning)

# Patch only once: re-running this cell must not wrap the already patched
# `send` in yet another layer.
if not getattr(requests.Session.send, "_ssl_verify_disabled", False):
    old_send = requests.Session.send

    def new_send(*args, **kwargs):
        """Forward to the original `Session.send`, forcing `verify=False`."""
        kwargs['verify'] = False
        return old_send(*args, **kwargs)

    # Marker used by the guard above to recognise the patched function.
    new_send._ssl_verify_disabled = True
    requests.Session.send = new_send

In [3]:
# Import relevant libraries for LLM orchestration:
from langchain import OpenAI, LLMChain, PromptTemplate
from langchain.prompts import StringPromptTemplate
from langchain.agents import (
    AgentExecutor, LLMSingleActionAgent, AgentOutputParser, 
    AgentType, initialize_agent, Tool
)
from langchain.tools import BaseTool
from langchain.callbacks import get_openai_callback
from langchain.schema import AgentAction, AgentFinish

In [4]:
# Import libraries for web scraping:
import mimetypes
import urllib.parse
from bs4 import BeautifulSoup
from selenium import webdriver
from selenium.webdriver.chrome.options import Options 
from selenium.webdriver.common.by import By
from PyPDF2 import PdfReader
from io import BytesIO

In [5]:
# Other imports:
import re 

from typing import List, Dict, Optional, Union

In [6]:
# Get API Key:
from dotenv import load_dotenv
# Pull variables from a local .env file into the process environment, then
# fail early when the OpenAI key is missing or empty.
load_dotenv()
if os.environ.get("OPENAI_API_KEY", "") == "":
    raise KeyError(
        "You will need an OPENAI_API_KEY to use the LLM models in this notebook."
    )

Set up token counting to monitor costs:

In [7]:
# TODO: Update
def count_tokens(agent, query):
    """
    Call `agent` with `query` and print the total token usage of the call.

    Parameters:
    - agent: A callable that is invoked as `agent(query)`.
    - query: The value passed to `agent`.

    Returns:
    - Whatever `agent(query)` returns.
    """
    with get_openai_callback() as usage:
        answer = agent(query)
        print(f'Spent a total of {usage.total_tokens} tokens')

    return answer

## Commence Agent Development:

#### Custom Tooling:

Start by developing tools that fetch URLs and their descriptions from DuckDuckGo. Once that works, develop a URL scraper that fetches all the downloadable links from a web page.

For more information about creating custom tooling check out: 
    
https://github.com/pinecone-io/examples/blob/master/generation/langchain/handbook/07-langchain-tools.ipynb

In [8]:
import requests
import mimetypes
import urllib.parse
from bs4 import BeautifulSoup
from selenium import webdriver
from typing import List, Dict, Optional


def is_downloadable(url: str, timeout: float = 10.0) -> bool:
    """
    Check if a url is a downloadable link.

    Two heuristics are applied in order:
    1. The MIME type guessed from the extension of the URL path starts with
       'application'.
    2. Otherwise a HEAD request (following redirects) is sent and the
       response's Content-Type header contains 'application'.

    Parameters:
    - url (str): The URL to check.
    - timeout (float, optional): Seconds to wait for the HEAD request before
      giving up. Defaults to 10.0.

    Returns:
    - bool: True when either heuristic matches. False otherwise, including
      when the HEAD request fails (the error is printed, not raised).

    Example:
        is_downloadable(
            'https://www.coca-colacompany.com/content/dam/journey/us/en/reports/coca-cola-business-and-sustainability-report-2022.pdf'
        ) -> True
    """
    # Guess from the path component only, so that neither the host name nor a
    # trailing query string takes part in the extension lookup.
    path = urllib.parse.urlparse(url).path
    mimetype, _encoding = mimetypes.guess_type(path)
    if mimetype and mimetype.startswith('application'):
        return True

    try:
        # Without a timeout an unresponsive server would block the notebook.
        response = requests.head(url, allow_redirects=True, timeout=timeout)
        content_type = response.headers.get('content-type')
        if content_type and 'application' in content_type.lower():
            return True
    except Exception as e:
        # Best effort: an unreachable URL is reported and treated as
        # "not downloadable".
        print(f"An error occurred: {e}")

    return False


def fetch_duckduckgo_results(query: str, num_results: int = 5) -> List[Dict[str, Union[str, bool, int]]]:
    """
    Fetch search results from DuckDuckGo using Selenium and BeautifulSoup.

    Parameters:
    - query (str): The search query string. It is URL-encoded before use.
    - num_results (int, optional): Maximum number of results to fetch. Defaults to 5.

    Returns:
    - list[dict]: Returns a list of dictionaries containing the following details about the search results:
        * title (str): Title of the search result.
        * description (str, optional): Description or snippet of the result.
        * link (str, optional): Direct URL of the search result.
        * downloadable (bool): Whether the link points to downloadable content.
        * rank (int): Rank or position of the search result on the page.
    """

    chrome_options = webdriver.ChromeOptions()
    chrome_options.add_argument("--headless")

    # Encode the query so that spaces, '&', '#', ... cannot break the URL.
    url = f"https://duckduckgo.com/html/?q={urllib.parse.quote_plus(query)}"

    driver = webdriver.Chrome(options=chrome_options)
    try:
        driver.get(url)
        page_source = driver.page_source
    finally:
        # Always release the browser, even when loading the page fails.
        driver.quit()

    soup = BeautifulSoup(page_source, 'html.parser')
    search_results = soup.find_all('div', class_='result__body')

    results = []
    # Slice up front so that num_results <= 0 yields no results at all.
    for rank, result in enumerate(search_results[:max(num_results, 0)]):
        title_tag = result.find('a', class_='result__a')
        snippet_tag = result.find('a', class_='result__snippet')
        url_tag = result.find('a', class_='result__url')

        title = title_tag.get_text(strip=True) if title_tag is not None else None
        description = snippet_tag.get_text(strip=True) if snippet_tag is not None else None
        link = url_tag.get('href') if url_tag is not None else None

        final_link = None
        if link:
            # The href is a redirect whose `uddg` query parameter carries the
            # target URL. parse_qs already percent-decodes the value; decoding
            # it a second time would corrupt escapes that belong to the
            # target URL itself.
            final_link = urllib.parse.parse_qs(
                urllib.parse.urlparse(link).query
            ).get("uddg", [None])[0]

        results.append({
            'title': title,
            'description': description,
            'link': final_link,
            'downloadable': is_downloadable(final_link) if final_link else False,
            'rank': rank,
        })

    return results


def extract_link_description(link: "bs4.element.Tag") -> str:
    """
    Extract the most relevant description for a BeautifulSoup link element.

    The function uses a series of heuristics to determine the best description:
    1. The link's visible text, including text nested inside child tags.
    2. The 'alt' attribute of the first child <img> tag.
    3. The 'title' attribute of the link.
    4. The link's href itself (empty string when the tag has no href).

    Parameters:
    - link (bs4.element.Tag): The BeautifulSoup element representing the link (a-tag).

    Returns:
    - str: A description extracted based on the heuristics.
    """

    # `link.string` is None as soon as the anchor has more than one child,
    # e.g. <a><span>Download</span> report</a>; get_text collects all of it.
    description = link.get_text(" ", strip=True)

    if not description:
        img_tag = link.find('img')
        alt_text = img_tag.get('alt') if img_tag is not None else None
        if alt_text:
            description = alt_text.strip()

    if not description:
        title_text = link.get('title')
        if title_text:
            description = title_text.strip()

    if not description:
        # Last resort; tolerate anchors without an href instead of raising.
        description = link.get('href', '')

    return description


def extract_download_links_from_url(url: str, timeout: float = 10.0) -> List[Dict[str, str]]:
    """
    Scrape a given URL and extract links that are deemed downloadable.

    If `url` itself is downloadable it is returned as the only entry and the
    page is not fetched.

    Parameters:
    - url (str): The URL to scrape.
    - timeout (float, optional): Seconds to wait for the page request before
      giving up. Defaults to 10.0.

    Returns:
    - list[dict]: A list of dictionaries with two keys:
        * description (str): A description or title for the link.
        * link (str): The actual URL of the link.
      Each distinct link appears once, with the longest description found.
    """

    if is_downloadable(url):
        return [{'description': 'Downloadable link.', 'link': url}]

    response = requests.get(url, timeout=timeout)
    soup = BeautifulSoup(response.content, 'html.parser')

    link_dict = {}
    # Remember the verdict per URL: pages repeat the same links many times and
    # each check may cost a network round trip.
    checked = {}

    for anchor in soup.find_all('a', href=True):
        # Step 1: Resolve to absolute URL
        absolute_href = urllib.parse.urljoin(url, anchor['href'])

        # Step 2: Strip fragments
        main_url = urllib.parse.urldefrag(absolute_href).url

        # Step 3: Ignore links that cannot be fetched at all (mailto:,
        # javascript:, tel:, ...).
        if urllib.parse.urlparse(main_url).scheme not in ('http', 'https', 'ftp'):
            continue

        # Step 4: Check if downloadable (once per distinct URL)
        if main_url not in checked:
            checked[main_url] = is_downloadable(main_url)
        if not checked[main_url]:
            continue

        description = extract_link_description(anchor)

        # Step 5: Keep the longest description seen for this URL
        if main_url not in link_dict or len(description) > len(link_dict[main_url]['description']):
            link_dict[main_url] = {'description': description, 'link': main_url}

    return list(link_dict.values())


def get_pdf_page_count(url: str, timeout: float = 30.0) -> int:
    """
    Download the PDF at `url` and return its number of pages.

    Parameters:
    - url (str): The URL of the PDF document.
    - timeout (float, optional): Seconds to wait for the download request
      before giving up. Defaults to 30.0.

    Returns:
    - int: The page count, or -1 when the document could not be fetched or
      could not be read as a PDF (the error is printed, not raised).
    """
    try:
        # Fetch the PDF content; the timeout keeps an unresponsive server from
        # blocking the caller indefinitely.
        response = requests.get(url, timeout=timeout)
        response.raise_for_status()  # Raises an HTTPError if the response returned an unsuccessful status code

        # Use BytesIO to convert the byte stream to a file-like object so it can be read by PyPDF2
        with BytesIO(response.content) as pdf_data:
            pdf_reader = PdfReader(pdf_data)
            return len(pdf_reader.pages)
    except requests.RequestException as e:
        print(f"Error fetching the PDF: {e}")
        return -1
    except Exception as e:
        print(f"Error processing the PDF: {e}")
        return -1

In [9]:
# Smoke-test the search helper: fetch the top five hits and print each one
# followed by a blank line.
query = "Coca-Cola sustainability report 2023"
results = fetch_duckduckgo_results(query, 5)
for _result in results:
    print(_result, end="\n\n")

{'title': '2022 Business & Sustainability Report - The Coca-Cola Company', 'description': 'CLIMATE Our ability to deliver quality products requires a sustainable and secure supply of agricultural ingredients. In 2021, we launched our Principles for Sustainable Agriculture (PSA) to encourage and guide our suppliers to drive continuous improvement in sustainable farming practices.', 'link': 'https://www.coca-colacompany.com/reports/business-and-sustainability-report', 'downloadable': False, 'rank': 0}

{'title': 'Sustainability - The Coca-Cola Company', 'description': "Water Stewardship Water balance is giving back to nature and communities. We go beyond to improve water security where it's needed most. Explore In Our Products We provide high-quality products and cutting edge ingredients sourced sustainably and ethically. Explore Sustainable Agriculture", 'link': 'https://www.coca-colacompany.com/sustainability', 'downloadable': False, 'rank': 1}

{'title': 'Sustainability & Governance R

In [None]:
# Scrape every search hit for downloadable links.
for _result in results[:5]:
    link = _result['link']
    if not link:
        # fetch_duckduckgo_results stores None when the target URL of a hit
        # could not be decoded; there is nothing to scrape in that case.
        print(f"Skipping result without a link: {_result['title']}\n")
        continue
    _results = extract_download_links_from_url(link)
    print(f"Scraping: {link}\n{_results}\n")

In [None]:
# Try the page counter on a known report.
url = "https://www.coca-colacompany.com/content/dam/company/us/en/reports/coca-cola-business-and-sustainability-report-2022.pdf"
page_total = get_pdf_page_count(url)
print(f"The PDF has {page_total} pages.")

With the functions prepared, let's wrap these in custom LangChain tools:

In [11]:
def format_search_results(search_results):
    """
    Render search results as a text block for the agent.

    Parameters:
    - search_results (list[dict]): Entries with the keys 'rank', 'link',
      'downloadable', 'title' and 'description'. The list itself is left
      untouched; a sorted copy is used for rendering.

    Returns:
    - str: A header line, one URL / Downloadable / Description group per
      entry in ascending rank order, and a closing note about downloadable
      links.
    """
    # sorted() instead of list.sort(): do not reorder the caller's list.
    ordered = sorted(search_results, key=lambda entry: entry['rank'])

    formatted_results = [
        "Search Results (sorted based on page rank):\n"
    ]

    for entry in ordered:
        formatted_results.append("URL: '{}'\n".format(entry['link']))
        formatted_results.append("Downloadable: {}\n".format(
            entry['downloadable']
        ))
        formatted_results.append("Description: {}; {}\n".format(
            entry['title'], entry['description']))
        formatted_results.append("\n\n")

    formatted_results.append(
        "\nNote that when Downloadable is True, the url " +
        "is a downloadable link and should not be scraped."
    )
    return "".join(formatted_results)


def format_url_scrape_results(url, scrape_results):
    """
    Render the downloadable links scraped from `url` as a text block.

    Parameters:
    - url (str): The page that was scraped; only used in the header line.
    - scrape_results (list[dict]): Entries with the keys 'link' and
      'description'.

    Returns:
    - str: A header line followed by one "URL" line and one "Description"
      line per entry, each group terminated by a blank line.
    """
    formatted_results = [
        f"Downloadable links scraped from {url} (in no particular order):\n"]

    for entry in scrape_results:
        # The trailing newline keeps the URL and its description on separate
        # lines instead of fusing them into "URL: ...Description: ...".
        formatted_results.append("URL: {}\n".format(entry['link']))
        formatted_results.append("Description: {}\n".format(entry['description']))
        formatted_results.append("\n")

    return "".join(formatted_results)
    
    
class DDGoSearch(BaseTool):
    """LangChain tool that exposes `fetch_duckduckgo_results` to an agent."""

    name = fetch_duckduckgo_results.__name__
    description = """
        Fetch search results from DuckDuckGo.
        
        Elaborating, this tool will return a list of urls and their 
        accompanying duckduckgo descriptions. For each url a flag
        indicating if the url is downloadable is outputed as well.
    """

    def _run(self, query: str) -> str:
        """Search for `query` and return the top five hits as formatted text."""
        ddgo_results = fetch_duckduckgo_results(
            query=query,
            num_results=5,
        )
        return format_search_results(ddgo_results)

    def _arun(self, query: str):
        """Asynchronous execution is not implemented for this tool."""
        raise NotImplementedError(
            f"{self.__class__.__name__} does not currently support async run."
        )
    
    
class URLScraper(BaseTool):
    """LangChain tool that exposes `extract_download_links_from_url` to an agent."""

    name = extract_download_links_from_url.__name__
    description = """
        URL Scraper. Given a URL this tool 
        scrapes a webpage and returns all downloadable links, 
        along with their descriptions, appearing on that webpage.
        Note this tool should be used on non-downloadable links.
    """

    def _run(self, url: str) -> str:
        """Scrape `url` and return its downloadable links as formatted text."""
        # The agent may wrap the URL in quotes or spaces; strip them once and
        # report the same URL that was actually scraped.
        clean_url = url.strip(" '\"")
        scrape_results = extract_download_links_from_url(clean_url)
        return format_url_scrape_results(clean_url, scrape_results)

    def _arun(self, url: str):
        """Asynchronous execution is not implemented for this tool."""
        raise NotImplementedError(
            f"{self.__class__.__name__} does not currently support async run."
        )
        

class PDFPageCounter(BaseTool):
    """LangChain tool that exposes `get_pdf_page_count` to an agent."""

    name = get_pdf_page_count.__name__
    description = """
        PDF page counter. Use to determine if a url
        pointing to a pdf links to a summary or 
        a full document. 
    """

    def _run(self, url: str) -> str:
        """Count the pages of the PDF at `url` and describe the result in text."""
        # The agent may wrap the URL in quotes or spaces; strip them once and
        # report the same URL that was actually fetched.
        clean_url = url.strip(" '\"")
        page_count = get_pdf_page_count(clean_url)
        if page_count == -1:
            # -1 is the error value returned by get_pdf_page_count.
            return f"URL: {clean_url} | Error, page count could not be determined. Document may not be a pdf."
        if page_count < 20:
            # Heuristic: fewer than 20 pages is treated as a summary.
            return f"URL: {clean_url} | Page Count: {page_count} | " + \
                "url is probably an executive summary and not a full report."
        return f"URL: {clean_url} | Page Count: {page_count} | " + \
            "url is significant in size indicating that it is probably a full report."

    def _arun(self, url: str):
        """Asynchronous execution is not implemented for this tool."""
        raise NotImplementedError(
            f"{self.__class__.__name__} does not currently support async run."
        )
        
# One instance of each custom tool; this list is handed to both the prompt
# template and the agent executor further down.
tools = [DDGoSearch(), URLScraper(), PDFPageCounter()]

Now that we have functioning tools for URL retrieval, let's build custom prompts and agents for our task:

For details about agents, see:
- https://www.pinecone.io/learn/series/langchain/langchain-agents/
- https://colab.research.google.com/drive/1ipuSd6Jnl9KMF39LbA1n6FDDx5HBntZd?usp=sharing#scrollTo=nBKP5V4Aifx6

#### Prompt Template:
This instructs the agent on what to do. Generally, the template should incorporate:

**`tools:`** which tools the agent has access to, and how and when to call them.

**`intermediate_steps:`** These are tuples of previous (**`AgentAction`**, **`Observation`**) pairs. These are generally not passed directly to the model, but the prompt template formats them in a specific way.

**`input:`** generic user input

*WARNING:* Avoid changing these key names, as using varying key names can cause 
    downstream issues.

In [12]:
# Set up the base template.
# Placeholders: {tools} and {tool_names} describe the available tools,
# {input} is the user's question and {agent_scratchpad} carries the previous
# Thought/Action/Observation steps. CustomPromptTemplate.format fills in
# tools, tool_names and agent_scratchpad.
template = """You are a web scraping agent. You have access to the following tools:

{tools}

Use the following format:

Question: the input question you must answer
Thought: you should always think about what to do
Action: the action to take, should be one of [{tool_names}]
Action Input: the input to the action
Observation: the result of the action
... (this Thought/Action/Action Input/Observation can repeat N times)
Thought: I now know the final answer
Final Answer: the final answer to the original input question

Begin! Remember to answer as succinctly as possible when giving your final answer.

Question: {input}
{agent_scratchpad}"""

In [13]:
# Set up a prompt template which breaks up the intermediate_steps
# into thoughts that are used to fill the agent_scratchpad,
# tools, and tool_names in the base template:
class CustomPromptTemplate(StringPromptTemplate):
    """
    Prompt template that fills `agent_scratchpad`, `tools` and `tool_names`.

    `format` expects an `intermediate_steps` keyword argument holding
    (AgentAction, observation) tuples; every other keyword argument is passed
    through to the template unchanged.
    """
    # The template to use
    template: str
    # The list of tools available. `BaseTool or Tool` evaluates to `BaseTool`,
    # so the annotation is spelled out as what it always was at runtime.
    tools: List[BaseTool]

    def format(self, **kwargs) -> str:
        """Render the template for the current agent step."""
        # Get the intermediate steps (AgentAction, Observation tuples)
        # Format them in a particular way
        intermediate_steps = kwargs.pop("intermediate_steps")
        thoughts = "".join(
            f"{action.log}\nObservation: {observation}\nThought: "
            for action, observation in intermediate_steps
        )
        # Set the agent_scratchpad variable to that value
        kwargs["agent_scratchpad"] = thoughts
        # Create a tools variable from the list of tools provided
        kwargs["tools"] = "\n".join(f"{tool.name}: {tool.description}" for tool in self.tools)
        # Create a list of tool names for the tools provided
        kwargs["tool_names"] = ", ".join(tool.name for tool in self.tools)
        return self.template.format(**kwargs)

In [14]:
# Bind the base template and the tool list into the prompt used by the agent.
prompt = CustomPromptTemplate(
    template=template,
    tools=tools,
    # This omits the `agent_scratchpad`, `tools`, and `tool_names` variables because those are generated dynamically
    # This includes the `intermediate_steps` variable because that is needed
    input_variables=["input", "intermediate_steps"]
)

#### Custom Output Parser:
The output parser is responsible for parsing the LLM output into AgentAction and AgentFinish. This usually depends heavily on the prompt used.

This is where you can change the parsing to do retries, handle whitespace, etc.

In [15]:
class CustomOutputParser(AgentOutputParser):
    """Turn raw LLM text into either an AgentFinish or an AgentAction."""

    def parse(self, llm_output: str) -> Union[AgentAction, AgentFinish]:
        """
        Parse one LLM completion.

        Returns an AgentFinish when the text contains "Final Answer:",
        otherwise an AgentAction built from the "Action" / "Action Input"
        lines. Raises ValueError when neither form is found.
        """
        final_marker = "Final Answer:"
        if final_marker in llm_output:
            # Everything after the last marker is the answer. The return
            # values dictionary carries it under the single `output` key.
            answer = llm_output.split(final_marker)[-1].strip()
            return AgentFinish(return_values={"output": answer}, log=llm_output)

        # Otherwise the text must name a tool and the input to call it with.
        pattern = r"Action\s*\d*\s*:(.*?)\nAction\s*\d*\s*Input\s*\d*\s*:[\s]*(.*)"
        found = re.search(pattern, llm_output, re.DOTALL)
        if found is None:
            raise ValueError(f"Could not parse LLM output: `{llm_output}`")

        tool_name, raw_input = found.group(1), found.group(2)
        return AgentAction(
            tool=tool_name.strip(),
            tool_input=raw_input.strip(" ").strip('"'),
            log=llm_output,
        )

In [16]:
# Parser instance handed to the agent below.
output_parser = CustomOutputParser()

#### Agent Set-Up:

In [17]:
# NOTE(review): `gpt-3.5-turbo` looks like a chat model while `OpenAI` is the
# completion-style wrapper — confirm this pairing against the installed
# langchain version.
llm = OpenAI(
    temperature=0,  # measure of randomness/creativity
    model_name='gpt-3.5-turbo'
)

# LLM chain consisting of the LLM and a prompt
llm_chain = LLMChain(
    llm=llm, 
    prompt=prompt  # Custom Prompt
)

# Names the agent is allowed to emit in its "Action:" line.
tool_names = [tool.name for tool in tools]

agent = LLMSingleActionAgent(
    llm_chain=llm_chain, 
    output_parser=output_parser,  # Custom Output Parser
    stop=["\nObservation:"],  # you want this to be whatever token you use in the prompt to denote the start of an Observation
    allowed_tools=tool_names
) 



Set up an Agent Executor, which orchestrates an agent to decide which tools to call and in what order.

In [18]:
# Executor built from the agent and the tool list; verbose=True is passed so
# the run is logged.
agent_executor = AgentExecutor.from_agent_and_tools(
    agent=agent, 
    tools=tools, 
    verbose=True
)

**Define** the agent's goal/question and run the agent executor

In [19]:
# Task prompt for the agent; `{company}` is filled in with str.format.
search_template = """
    You are tasked with finding a downloadable URL link to the most recent FULL sustainability report of {company}. Please search the web for this link; checking years 2023, 2022, and 2021 (in that order) until you find one. You may need to scrape {company}'s official site to find a downloadable link to a report.
    
    Note, the final output should (where available) be a downloadable url, specifying a path to the sustainability report. As soon as you find a downloadable report you may stop the search. For instance if you search for the 2023 report, and find a downloadable link for the company's FULL 2023 report, you should return this url. If based on a url's description, you think that a longer report may be found on some web-page, you can check that web-page for downloadable links. This may happen when a recent executive summary of a report is found from a search, but you believe that a FULL report may be available on a company's web-page. If neither the search nor url scraping finds anything please continue your search with less recent dates. If you fail to find a full report feel free to return an empty url.
"""

In [20]:
# Fill the search template for one company, show the resulting task and hand
# it to the agent executor.
example_company = "Coca-Cola"
task = search_template.format(company=example_company)
print(f"{task}\n")
result = agent_executor.run(input=task)


    You are tasked with finding a downloadable URL link to the most recent FULL sustainability report of Coca-Cola. Please search the web for this link; checking years 2023, 2022, and2021 (in that order) until you find one. You may need the scrape Coca-Cola's official site to find a downloadable link to a report.
    
    Note, the final output should (where available) be a downloadable url, specifying a path to the sustainability report. As soon as you find a downloadable report you may stop the search. For instance if you search for the 2023 report, and find a downloadable link for the company's FULL 2023 report, you should return this url. If based on a url's description, you think that a longer report may be found on some web-page, you can check that web-page for downloadable links. This may happen when a recent executive summary of a report is found from a search, but you believe that a FULL report may be available on a company's web-page. If neither the search not url scraping f