In [10]:
import asyncio
import io
import os
import re
import time
from urllib.parse import urljoin, urlparse, urlunparse

import aiohttp
import nest_asyncio
import pandas as pd
import PyPDF2
from bs4 import BeautifulSoup

nest_asyncio.apply() # for running async code in notebook

In [11]:
# Map each certification column name to the keywords whose presence on a page
# is treated as evidence of that certification.
# Both spellings of a name are listed where sites commonly use either form:
#   - "MarinTrust" is usually written as one word, which the two-word keyword
#     "Marin Trust" alone would never match.
#   - "Fishery Improvement Project" is used alongside "Fisheries Improvement Project".
CERT_KEYWORDS = {
    "ASC Cert": [
        "ASC", "A.S.C.", "Aquaculture Stewardship Council",
    ],
    "BAP Cert": [
        "BAP", "Best Aquaculture Practices", "Global Seafood Alliance",
    ],
    "FOS Cert": [
        "Friend of the Sea", "FOS", "WSO", "World Sustainability Organization",
    ],
    "FIP Cert": [
        "FIP", "Fisheries Improvement Project", "Fishery Improvement Project",
    ],
    "MarinTrust Cert": [
        "Marin Trust", "MarinTrust",
    ],
}

In [12]:
def extract_text_from_pdf(pdf_content: bytes) -> str:
    """
    Pull the text out of every page of a PDF and return it as one lowercase string.

    Args:
        pdf_content (bytes): The raw bytes of the PDF file.

    Returns:
        str: The text of all pages joined together (no separator), lowercased.
            Returns whatever was collected before a failure, which is an
            empty string if parsing failed immediately.

    Notes:
        - Parsing is done with PyPDF2 over an in-memory buffer.
        - Any exception raised while reading is printed with a "[PDF ERROR]"
          prefix and not re-raised.
        - Pages that yield no text contribute nothing to the result.
    """
    page_chunks = []
    try:
        # Wrap the bytes in a file-like object so PyPDF2 can read them
        with io.BytesIO(pdf_content) as buffer:
            for pdf_page in PyPDF2.PdfReader(buffer).pages:
                extracted = pdf_page.extract_text()
                # Skip pages for which nothing could be extracted
                if extracted:
                    page_chunks.append(extracted)
    except Exception as e:
        # Report the problem but still return the text gathered so far
        print(f"[PDF ERROR] {e}")

    return "".join(page_chunks).lower()

In [13]:
def is_same_domain(base_url: str, new_url: str) -> bool:
    """
    Return True if new_url's host is the same as, or a subdomain of, base_url's host.

    The comparison is done on the parsed hostname (case-insensitive, without
    port or credentials) and requires either an exact match or a match on a
    dot boundary. A plain substring test would wrongly accept unrelated hosts
    such as "notexample.com" or "example.com.evil.org" for base "example.com".

    Args:
        base_url: URL whose host defines the allowed domain.
        new_url: URL to test.

    Returns:
        bool: True for the same host or a subdomain of it; False otherwise,
            including when either URL has no host (e.g. "mailto:" links).
    """
    base_host = (urlparse(base_url).hostname or "").lower()
    check_host = (urlparse(new_url).hostname or "").lower()

    # URLs without a network location can never be "on" a domain
    if not base_host or not check_host:
        return False

    # e.g. base: "example.com", check: "sub.example.com" => True
    return check_host == base_host or check_host.endswith("." + base_host)

In [14]:
def normalize_url(url: str) -> str:
    """
    Return a canonical form of url: no fragment and no single trailing slash.

    The root path "/" is kept as-is; only one trailing slash is stripped from
    longer paths. Scheme, host, params and query are left untouched.
    """
    parts = urlparse(url)

    # Strip one trailing slash, but never reduce "/" to an empty path
    trimmed_path = parts.path
    if len(trimmed_path) > 1 and trimmed_path.endswith("/"):
        trimmed_path = trimmed_path[:-1]

    # Reassemble with the fragment discarded and the trimmed path in place
    return urlunparse(parts._replace(fragment="", path=trimmed_path))


In [20]:
def _compile_keyword_patterns(cert_keywords: dict) -> dict:
    """Build one case-insensitive, whole-word regex per certification column (None if it has no keywords)."""
    compiled = {}
    for cert_col, keywords in cert_keywords.items():
        alternatives = []
        for keyword in keywords:
            words = keyword.split()
            if words:
                # Allow any run of whitespace between the words of a phrase
                alternatives.append(r"\s+".join(re.escape(word) for word in words))
        if not alternatives:
            compiled[cert_col] = None
            continue
        # Lookarounds rather than \b so that keywords ending in punctuation
        # (e.g. "A.S.C.") still match, while "asc" inside "cascade" does not.
        compiled[cert_col] = re.compile(
            r"(?<!\w)(?:" + "|".join(alternatives) + r")(?!\w)",
            re.IGNORECASE,
        )
    return compiled


def _matching_cert_columns(text: str, patterns: dict) -> list:
    """Return the certification columns whose keyword pattern occurs in text."""
    return [
        cert_col
        for cert_col, pattern in patterns.items()
        if pattern is not None and pattern.search(text)
    ]


async def crawl_for_keywords_async(
    session: aiohttp.ClientSession,
    seed_url: str,
    cert_keywords: dict,
    max_depth: int = 2,
    limit_pages: int = 50,
    politeness_delay: float = 0.5
) -> dict:
    """
    Asynchronously crawl a website starting from seed_url, searching for certification keywords.
    Uses breadth-first search (BFS) to explore links up to max_depth away from the seed URL.

    Keywords are matched case-insensitively as whole words/phrases, so a short
    acronym such as "ASC" does not match inside an unrelated word like "cascade".
    URLs are de-duplicated on their normalized form (see normalize_url), so
    "page", "page/" and "page#section" are fetched only once.

    Args:
        session: aiohttp client session for making HTTP requests
        seed_url: Starting URL to begin crawl from
        cert_keywords: Dictionary mapping certification columns to lists of keywords
        max_depth: Maximum link depth to explore from seed_url (default: 2)
        limit_pages: Maximum number of pages fetched with HTTP 200 (default: 50)
        politeness_delay: Seconds to wait before each request (default: 0.5)

    Returns:
        Dictionary mapping certification columns to sets of normalized URLs
        where at least one of that column's keywords was found

    Notes:
        - Only responses whose Content-Type mentions "pdf" or "html" are scanned.
        - Links are followed from HTML pages only, and only http(s) links.
        - Request/parsing errors are printed with an "[ERROR]" prefix and the
          crawl moves on to the next URL.
    """
    # Track URLs found for each certification type
    found_urls_by_cert = {col: set() for col in cert_keywords}
    patterns = _compile_keyword_patterns(cert_keywords)

    # Explicit timeout object instead of a bare number
    request_timeout = aiohttp.ClientTimeout(total=10)

    # Normalized URLs that have already been queued, to avoid cycles and
    # repeated fetches of the same page under different spellings
    seen = {normalize_url(seed_url)}

    # Queue for BFS traversal - each item is a (url, depth) tuple
    queue = asyncio.Queue()
    queue.put_nowait((seed_url, 0))

    pages_crawled = 0

    # Stop when there is nothing left to visit or the page limit is reached
    while not queue.empty() and pages_crawled < limit_pages:
        current_url, depth = queue.get_nowait()

        # Skip if we've exceeded max depth
        if depth > max_depth:
            continue

        # Only crawl URLs on same domain as seed
        if not is_same_domain(seed_url, current_url):
            continue

        # Respect crawl delay
        await asyncio.sleep(politeness_delay)

        try:
            async with session.get(current_url, timeout=request_timeout) as resp:
                if resp.status != 200:
                    continue
                pages_crawled += 1

                content_type = resp.headers.get('Content-Type', '').lower()

                if 'pdf' in content_type:
                    # For PDFs: download bytes, extract text, check for keywords
                    pdf_text = extract_text_from_pdf(await resp.read())
                    for cert_col in _matching_cert_columns(pdf_text, patterns):
                        found_urls_by_cert[cert_col].add(normalize_url(current_url))

                elif 'html' in content_type:
                    # For HTML: parse content, extract text, check for keywords
                    html = await resp.text(errors='ignore')
                    soup = BeautifulSoup(html, 'html.parser')
                    page_text = soup.get_text(separator=' ')

                    for cert_col in _matching_cert_columns(page_text, patterns):
                        found_urls_by_cert[cert_col].add(normalize_url(current_url))

                    # If not at max depth, queue every new http(s) link on the page
                    if depth < max_depth:
                        for link_tag in soup.find_all("a", href=True):
                            try:
                                child_url = urljoin(current_url, link_tag['href'])
                                if urlparse(child_url).scheme not in ("http", "https"):
                                    # mailto:, tel:, javascript:, ... are not crawlable
                                    continue
                                child_key = normalize_url(child_url)
                            except ValueError:
                                # One malformed href must not discard the page's other links
                                continue
                            if child_key not in seen:
                                seen.add(child_key)
                                queue.put_nowait((child_url, depth + 1))
                # Any other content type is ignored

        except Exception as e:
            print(f"[ERROR] {current_url} => {e}")
            continue

    return found_urls_by_cert

In [27]:
async def process_df_with_sites(df_with_sites: pd.DataFrame) -> pd.DataFrame:
    """
    Process each website in the DataFrame to find certification information.

    Creates an aiohttp session and crawls each company website to search for
    certification-related keywords. Updates the certification columns in the DataFrame
    with any found URLs containing certification information.

    Rows whose "Company website" value is not a non-empty string are skipped
    (with a printed notice) instead of crashing the whole run.

    Args:
        df_with_sites: DataFrame containing a "Company website" column and the
            certification columns named by CERT_KEYWORDS. It is modified in place.

    Returns:
        The same DataFrame object, with newly found URLs merged into the
        certification columns as sorted, semicolon-separated strings
    """

    async with aiohttp.ClientSession() as session:
        for idx, row in df_with_sites.iterrows():
            raw_site = row["Company website"]

            # Guard against NaN / non-text cells and blank strings
            if not isinstance(raw_site, str) or not raw_site.strip():
                print(f"\nSkipping row without a usable website (index: {idx})")
                continue
            seed_url = raw_site.strip()

            # Ensure URL has protocol prefix. Test for the full scheme so that
            # hosts merely starting with "http" (e.g. "httpd.example.org") and
            # upper-case schemes are handled correctly.
            if not seed_url.lower().startswith(("http://", "https://")):
                seed_url = "http://" + seed_url

            print(f"\nCrawling: {seed_url} (index: {idx})")

            # Perform breadth-first crawl of the site looking for certification keywords
            # Returns dict mapping certification columns to sets of URLs containing matches
            found_by_cert = await crawl_for_keywords_async(
                session=session,
                seed_url=seed_url,
                cert_keywords=CERT_KEYWORDS,
                max_depth=2,        # How many links deep to crawl
                limit_pages=50,     # Maximum number of pages to crawl
                politeness_delay=0.5 # Delay between requests in seconds
            )

            # Update DataFrame with any newly found certification URLs
            for cert_col, found_urls in found_by_cert.items():
                if not found_urls:
                    continue

                # Get existing URLs for this certification, handling NaN values
                existing_val = row.get(cert_col, "")
                if not isinstance(existing_val, str):
                    existing_val = ""

                # Convert semicolon-separated string to set of URLs
                old_urls = {u.strip() for u in existing_val.split(";") if u.strip()}

                # Combine existing and newly found URLs, removing duplicates,
                # and store them as a sorted, semicolon-separated string
                df_with_sites.at[idx, cert_col] = ";".join(sorted(old_urls | found_urls))

    return df_with_sites

In [28]:
async def main():
    """
    Run the full pipeline: load the company sheet, crawl every listed website
    for certification keywords, merge the findings back and save them to CSV.

    Returns:
        The full DataFrame (all companies) with updated certification columns,
        so a notebook cell can display or further inspect the result.

    Side effects:
        - Downloads the source spreadsheet as CSV over the network.
        - Writes "plants_websites_crawled.csv", or "plants_updated_<unix time>.csv"
          if the former already exists.
    """
    # Load company data from Google Sheets into DataFrame
    sheet_csv_url = "https://docs.google.com/spreadsheets/d/1jJA1pYMDPiBEXFGnT3LXtjEHC7Ya4W4GhG-D8uhCplE/export?format=csv"
    df_all = pd.read_csv(sheet_csv_url)

    # A real list (not a dict view) so it can be used directly as a column indexer
    cert_cols = list(CERT_KEYWORDS)

    # Initialize and standardize certification columns as empty strings
    # This prevents dtype warnings and ensures consistent handling
    for cert_col in cert_cols:
        if cert_col not in df_all.columns:
            df_all[cert_col] = ""
        df_all[cert_col] = df_all[cert_col].fillna("").astype(str)

    # Extract subset of companies that have website URLs
    # Create clean copy with stripped website URLs
    df_with_sites = df_all.dropna(subset=["Company website"]).copy()
    df_with_sites["Company website"] = df_with_sites["Company website"].str.strip()

    # Crawl each company website to find certification pages.
    # We are already inside a coroutine, so await the crawl directly instead of
    # starting a second event loop with asyncio.run().
    start_time = time.perf_counter()
    updated_df_with_sites = await process_df_with_sites(df_with_sites)
    elapsed = time.perf_counter() - start_time
    print(f"\nCrawling complete. Elapsed time: {elapsed:.2f} seconds.")

    # Update main DataFrame with any newly discovered certification URLs.
    # The subset kept the original row labels, so one index-aligned assignment
    # updates exactly the crawled rows and only the certification columns.
    df_all.loc[updated_df_with_sites.index, cert_cols] = updated_df_with_sites[cert_cols]

    # Save results to CSV; if the default file already exists, write to a
    # timestamped file name instead so previous results are not overwritten
    output_file = "plants_websites_crawled.csv"
    if os.path.exists(output_file):
        timestamp = int(time.time())
        output_file = f"plants_updated_{timestamp}.csv"
    df_all.to_csv(output_file, index=False)
    print("Results saved to CSV file")

    return df_all


In [29]:
# Entry point: run the whole crawl pipeline when this cell/script is executed directly.
# NOTE: inside a notebook kernel an event loop is normally already running, in which
# case asyncio.run() only works because nest_asyncio.apply() was called in the
# imports cell.
if __name__ == "__main__":
    asyncio.run(main())



Crawling: https://bhpimpex.ca/ (index: 1)

Crawling: https://abideenfishmeal.com/en/index.html (index: 2)

Crawling: https://fishmeal.co/ (index: 4)

Crawling: https://www.nublend.com.au/index.php/products (index: 5)

Crawling: https://www.comeausea.com/facilities (index: 7)

Crawling: https://ditp.sec.gouv.sn/sites/default/files/docs/Liste%20des%20%C3%A9tablissements%20agr%C3%A9%C3%A9s%20%C3%A0%20l%27exportation%20au%2031%20d%C3%A9cembre%202022%20.pdf (index: 8)

Crawling: https://www.csfproteins.com.au (index: 9)

Crawling: http://www.gdhxgf.com/en/gsjj_en.html (index: 11)

Crawling: https://novosana.nl/ (index: 12)

Crawling: https://pmtcr.com/english/oil.htm (index: 13)

Crawling: https://www.akerbiomarine.com/ (index: 14)

Crawling: http://valofish.fr (index: 15)

Crawling: https://matchory.com/supplier/golden-lead-import-and-export-trade (index: 16)

Crawling: https://nessimtrading.com/nessim-fisheries/ (index: 17)

Crawling: https://iceberg2.ge/index.php?action_skin_change=yes&