In [50]:
from bs4 import BeautifulSoup
import foodBanks
from us import states
import pickle
import pyperclip
import requests

In [51]:
currentFoodBanks = foodBanks.currentFoodBanks

In [52]:
with open("FoodBankList.html") as file:
    soup = BeautifulSoup(file, "html.parser")
    divs = soup.find_all("div", class_="results-box")

    foodBanks = []
    for div in divs:
        h3 = div.find("h3", class_="name")
        if h3:
            name = h3.get_text()
            p = div.find("p")
            p_text = p.get_text()            
            state_abbreviation = p_text.split(" ")[-2]
            state_lookup = states.lookup(state_abbreviation)
            if state_lookup:
                state = state_lookup.name            
                p2 = div.find("p", class_="url")
                a = p2.find("a")
                url = f"https:{a['href']}"
                foodBanks.append({
                    "name": name,
                    "url": url,
                    "state": state
                })

In [53]:
newFoodBanks = []
for food_bank in foodBanks:
    if food_bank["name"] not in currentFoodBanks:
        newFoodBanks.append(food_bank)

In [54]:
from loguru import logger
import coloredlogs
import sys

color_codes = {
    'black': '30',
    'red': '31',
    'green': '32',
    'yellow': '33',
    'blue': '34',
    'magenta': '35',
    'cyan': '36',
    'white': '37',
    'reset': '0'
}

def log(level, title, message, color):
    color_code = color_codes.get(color.lower(), '37')
    formatted_message = f"\033[37m{title}\n\033[{color_code}m{message}\033[0m"
    logger.log(level.upper(), formatted_message)

In [55]:
ollama_cache = {}
request_cahce = {}

def get_request(url):
    if url in request_cahce:
        return request_cahce[url]
    try:
        response = requests.get(url)
    except:
        response = None
    request_cahce[url] = response
    return response

In [56]:
import re
import json
from langchain_community.chat_models import ChatOllama
from langchain_core.output_parsers import StrOutputParser, JsonOutputParser
from langchain_core.messages import HumanMessage, SystemMessage
from langchain_core.prompts import ChatPromptTemplate

tag_count = 20

summary_prompt = """
Imagine you are a robot browsing the web, but you can only view html. Now you need to complete a task. You are on {url}, and you have found an email address. You have 2 jobs: finding the purpose of the email [explaining why the email address is on the page] and extracting the full name [first and last] of the person with that email address. You will recieve an email address and a the html of its {parent_count}th parent element from the website—the html will be turned to json for easy reading and understanding child parent relationships. Note that {parent_count} is chosen arbitrarily, so the html likely contains some irrelevant information for that email address. Using this parent element, you will complete your two jobs. Note that for the purpose, if the email belongs to a person, give any background information of the person you find.

Only return the purpose if and only if the html clearly explains it. Only return the name if and only if both the first and last names are found in the html. Never return a partial name or a guess.

The user will provide the following (in the following order):

1) The JSON schema that your output must follow
2) The email address
3) The HTML of the {parent_count}th parent element of the element containing the email address

Output your response in the JSON schema format. The only keys in the output can be 'purpose' and 'name'
"""

summary_schema = {
    "title": "Email Summary",
    "description": "Identifying information about an email address",
    "type": "object",
    "properties": {
        "purpose": {"title": "Purpose", "description": "What the email address is doing on the page", "type": "string"},
        "name": {"title": "Name", "description": "The first and last name [as a single string] associated with the email address", "type": "string"},
    },
    "required": [],
}


link_prompt = """
You will recieve a list of links from a webpage of the food_bank {food_bank}. Your job is to select the links that may contain the main contact email address for the food bank. Choose as many links as you think are necessary, but don't go over 10 links [i.e. if you think it could be in more than 10 links, choose your top 10 links]. Note that the main contact email address is usually found on the homepage; standard routes like Contact Us or About Us or Get Involved; or social media links.

The user will provide the following:

1) The JSON schema that your output must follow
2) [link1], [link2], [link3], ... (a list of links on the page separated by commas)

Output your response in the JSON schema format. The only key in the output can be 'links'
"""

link_schema = {
    "title": "Link Selection",
    "description": "Select the links that may contain the main email address",
    "type": "object",
    "properties": {
        "links": {"title": "Links", "description": "The links that may contain the main email address", "type": "array", "items": {"type": "string"}}
    },
    "required": ["links"]
}

main_email_prompt = """
Your main task is to choose the main contact  email address for the food bank {food_bank}. You will be given a list of every email address found on the food bank's website. Each email will have (1) a purpose [i.e. why the email address was found on the page] and (2) a name [i.e. the name of the person associated with the email address]. If you believe that an email address is the main contact email address based on the surrounding info, return that email address. If you don't think the list contains that main email address, don't return an email address. Note that the main contact email address usually has words associated with it like general or director, although it doesn't have to.

The user will provide the following (in the following order):

1) The JSON schema that your output must follow
2) [email1]: {
    "purpose": ['email1s purpose'],
    "name": ['email1s name'],
}
[email2]: {
    "purpose": ['email2s purpose'],
    "name": ['email2s name'],
}
...   

Output your response in the JSON schema format. The only possible key in the output can be 'email'
"""

main_email_schema = {
    "title": "Main Email Selection",
    "description": "Select the main email address",
    "type": "object",
    "properties": {
        "email": {"title": "Email", "description": "The main email address of the food bank", "type": "string"}
    },
    "required": []
}


def get_emails(url):
    response = get_request(url)
    if not response:
        return []

    soup = BeautifulSoup(response.text, "html.parser")
    text = soup.get_text(separator=' ')
    emails = list(
        set(re.findall(r"[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}", text)))
    return emails


def get_links(url, clicks, links=set()):
    response = get_request(url)
    if not response:
        return set()

    soup = BeautifulSoup(response.text, "html.parser")
    new_links = set(link.get('href') for link in soup.find_all('a'))
    new_links = new_links.difference(links)
    new_links = set([link for link in new_links if link and link.startswith(
        'http')])
    if clicks > 1:
        for link in new_links:
            new_links = new_links.union(get_links(link, clicks - 1, new_links))
    return new_links


def get_parent_html(url, text):
    response = get_request(url)
    if not response:
        return None, 0

    soup = BeautifulSoup(response.text, "html.parser")
    try:
        element = soup.find_all(
            lambda tag: tag.string and text in tag.string)[0]
    except:
        return None, 0

    def get_parent(element, current_index):
        num_tags = len(element.find_all())
        if num_tags >= tag_count:
            return element, current_index
        else:
            parent = element.parent
            if not parent:                
                return element, current_index
            else:
                parent_num_tags = len(parent.find_all())
                if parent_num_tags >= tag_count:
                    return parent if parent_num_tags - tag_count <= tag_count - num_tags else element, current_index
                else:
                    return get_parent(parent, current_index + 1)

    return get_parent(element, 0)


def shorten_string(input_string, line_amt):
    lines = input_string.split('\n')
    if len(lines) > 20:
        return '\n'.join(lines[:line_amt])
    return input_string


def html_to_json(element):
    if (element):
        result = {}
        result['tag'] = element.name
        result['children'] = [html_to_json(
            child) for child in element.children if child.name]
        result['text'] = element.get_text(strip=True)
        return result
    else:
        return None


def use_ollama(func_name, system_prompt, schema, user_prompts, variables={}, model="llama3"):
    key = f'{func_name} {system_prompt} {" ".join(user_prompts)}'
    if key in ollama_cache:
        log('DEBUG', f'{func_name}: Cached Output', ollama_cache[key], 'cyan')
        return ollama_cache[key]

    dumps = json.dumps(schema, indent=2)
    variables['dumps'] = dumps

    log('DEBUG', f'{func_name}: System Prompt',
        system_prompt, 'blue')
    log('DEBUG', f'{func_name}: User Prompt {1}',
        dumps, 'blue')
    for i, user_prompt in enumerate(user_prompts):
        log('DEBUG', f'{func_name}: User Prompt {i + 2}',
            user_prompt, 'blue')

    llm = ChatOllama(model=model, format="json", temperature=0)

    messages = [
        SystemMessage(content=system_prompt),
        HumanMessage(content=dumps),
        *list(map(lambda user_prompt: HumanMessage(content=user_prompt), user_prompts))
    ]

    prompt = ChatPromptTemplate.from_messages(messages)

    chain = prompt | llm | JsonOutputParser()
    response = chain.invoke(variables)

    log('DEBUG', f'{func_name}: Output', response, 'green')

    ollama_cache[key] = response

    return response


def summarize_email_parent(email, html, parent_count, food_bank, url):
    jsonified_html = json.dumps(html_to_json(html), indent=2)
    return use_ollama("Summarize Context", summary_prompt, summary_schema, [email, jsonified_html],
                      {
        'parent_count': parent_count,
        'food_bank': food_bank,
        'url': url
    }
    )


def link_selection(links, food_bank):
    if len(links) == 0:
        return {'links': []}

    return use_ollama("Link Selection", link_prompt, link_schema, [', '.join(links)],
                      {
        'food_bank': food_bank
    })


def get_main_email(emails, food_bank):
    response = use_ollama("Main Email Selection", main_email_prompt, main_email_schema, [json.dumps(list(map(lambda email: {'email': email['email'], 'purpose': email['purpose'], 'name': email['name']}, emails)), indent=2)],
                          {
        'food_bank': food_bank
    })

    return response['email'] if 'email' in response and response['email'] in list(map(lambda email: email['email'], emails)) else None


new_rows = []
skipped_urls = []

for i, food_bank in enumerate(newFoodBanks):
    log('Info', 'New Food Bank',
        f'{food_bank["name"]} in {food_bank["state"]} w homepage {food_bank["url"]}', 'white')
    all_links = list(get_links(food_bank['url'], 1))
    viable_links = link_selection(all_links, food_bank['name'])
    viable_links = list(set(
        [*viable_links['links'], food_bank['url']])) if 'links' in viable_links else []
    viable_links = [link for link in viable_links if get_request(
        link) and get_request(link).status_code == 200]

    non_unique_emails = [{'email': email, 'link': link}
                         for link in viable_links for email in get_emails(link)]
    emails = []
    seen_emails = set()
    for email in non_unique_emails:
        if email['email'] not in seen_emails:
            seen_emails.add(email['email'])
            email['html'], email['parent_count'] = get_parent_html(email['link'], email['email'])
            response = summarize_email_parent(
                email['email'], email['html'], email['parent_count'], food_bank['name'], email['link'])
            email['purpose'] = response['purpose'] if 'purpose' in response else 'No purpose found'
            email['name'] = response['name'] if 'name' in response and type(
                response['name']) == str and response['name'].count(' ') == 2 else food_bank['name']
            email['title'] = response['title'] if 'title' in response else 'No title found'
            emails.append(email)

    if len(emails) > 0:
        main_email = get_main_email(emails, food_bank['name'])
        new_rows.append({
            'food_bank': food_bank['name'],
            'state': food_bank['state'],
            'url': food_bank['url'],            
            'emails': emails,
            'main_email': main_email
        })
        log('Info', f'{i}/{len(newFoodBanks)} Found {len(emails)} for {food_bank["name"]} ✅',
            list(map(lambda email: email['email'], emails)), 'white')
    elif len(viable_links) > 0:
        log('Warning',
            f'{i}/{len(newFoodBanks)} No emails found for {food_bank["name"]} ❌', '', 'yellow')
    else:
        log('Warning',
            f'{i}/{len(newFoodBanks)} Access blocked for {food_bank["name"]} ❌', '', 'yellow')
        skipped_urls.append(food_bank['url'])

[32m2024-06-11 23:07:02.617[0m | [1mINFO    [0m | [36m__main__[0m:[36mlog[0m:[36m20[0m - [1m[37mNew Food Bank
[37mFood Bank of Alaska, Inc. in Alaska w homepage https://www.foodbankofalaska.org/[0m[0m
[32m2024-06-11 23:07:03.074[0m | [34m[1mDEBUG   [0m | [36m__main__[0m:[36mlog[0m:[36m20[0m - [34m[1m[37mLink Selection: System Prompt
[34m
You will recieve a list of links from a webpage of the food_bank {food_bank}. Your job is to select the links that may contain the main contact email address for the food bank. Choose as many links as you think are necessary, but don't go over 10 links [i.e. if you think it could be in more than 10 links, choose your top 10 links]. Note that the main contact email address is usually found on the homepage; standard routes like Contact Us or About Us or Get Involved; or social media links.

The user will provide the following:

1) The JSON schema that your output must follow
2) [link1], [link2], [link3], ... (a list of links

In [62]:
def remove_html(row):
    row = row.copy()
    for email in row['emails']:
        del email['html']
    return row

new_rows1 = list(map(remove_html, new_rows))

with open('rows.pkl', 'wb') as file:
    pickle.dump(new_rows1, file)

In [None]:
with open('rows.pkl', 'rb') as file:
    new_rows1 = pickle.load(file)

In [64]:
index = 0

In [168]:
def get_email(email):
    return f"{email['name']}\t{email['email']}\t{email['purpose']}\t{email['link']}\t\t"


state_food_banks = [row for row in new_rows if row["state"] == states.STATES[index].name]
if state_food_banks:
    print(f"{index}  {states.STATES[index]} has {len(state_food_banks)} food banks. Copying to clipboard...")
    copyable_text = '\n'.join([f"{food_bank['food_bank']}\t{''.join([get_email(email) for email in food_bank['emails'] if email['email'] == food_bank['main_email']])}\t{''.join([get_email(email) for email in food_bank['emails'] if email['email'] != food_bank['main_email']])}" for food_bank in state_food_banks])
    pyperclip.copy(copyable_text)
else:
    print(f"No food banks found in {states.STATES[index]}")

No food banks found in Wyoming


In [167]:
index += 1

In [170]:
copyable_text = '\n'.join(url for url in skipped_urls)
pyperclip.copy(copyable_text)