In [13]:
import os
from pathlib import Path
import json
import os
import requests
from bs4 import BeautifulSoup, NavigableString, Tag
from urllib.parse import urljoin

In [14]:
def find_project_root(current_directory, marker):
    """Return the nearest directory that contains ``marker``.

    The search starts at ``current_directory`` itself and then walks up
    through its ancestors, so it also succeeds when the notebook is run
    directly from the project root.

    Args:
        current_directory: Directory (str or Path) where the search starts.
        marker: Name of a file or directory that identifies the root
            (for example ``'.git'``).

    Returns:
        The absolute ``Path`` of the first directory containing ``marker``.

    Raises:
        FileNotFoundError: If neither the starting directory nor any of its
            ancestors contains ``marker``.
    """
    start = Path(current_directory).absolute()
    # Include the starting directory: ``Path.parents`` alone excludes it.
    for candidate in (start, *start.parents):
        if (candidate / marker).exists():
            return candidate
    raise FileNotFoundError(f"Project root with {marker} not found")

from dotenv import load_dotenv

# The directory the notebook runs from; the URLs file is resolved against it.
current_directory = Path.cwd()
project_root = find_project_root(current_directory, '.git')

# Load the environment variables from the .env file
env_path = project_root / '.env'
load_dotenv(dotenv_path=env_path)

# os.getenv returns None for an unset variable, and ``Path / None`` fails with
# an unhelpful TypeError, so report the missing setting explicitly instead.
urls_file_setting = os.getenv("PATH_TO_URLS")
if not urls_file_setting:
    raise RuntimeError(
        f"PATH_TO_URLS is not set; define it in {env_path} or in the environment"
    )
PATH_TO_URLS = current_directory / urls_file_setting

# Site whose pages are scraped, and the directory the extracted text is written to.
base_url = "https://www.uscis.gov"
output_dir = "uscis_data"

In [15]:
# Function to read URLs from the file
def read_urls_from_file():
    """Read the URL list stored at ``PATH_TO_URLS``.

    Each line is stripped of surrounding whitespace. Blank lines are
    skipped so they do not turn into empty URL entries.

    Returns:
        A list of non-empty strings, one per non-blank line, in file order.
    """
    with open(PATH_TO_URLS, 'r', encoding='utf-8') as file:
        stripped_lines = (line.strip() for line in file)
        return [entry for entry in stripped_lines if entry]

In [16]:
# Function to save content to a file
def save_content(content, path):
    """Write ``content`` to ``path`` as UTF-8, creating parent directories.

    Args:
        content: Text to write.
        path: Destination file path. It may be a bare file name with no
            directory component.
    """
    directory = os.path.dirname(path)
    # os.makedirs('') raises FileNotFoundError, so only create a directory
    # when the path actually has one.
    if directory:
        os.makedirs(directory, exist_ok=True)
    with open(path, 'w', encoding='utf-8') as file:
        file.write(content)

In [17]:
# URLs collected for appending to urls.txt; filled elsewhere in the notebook.
new_urls = set()

def save_new_urls():
    """Append the contents of ``new_urls`` to ``urls.txt``, one per line.

    Nothing is written when ``new_urls`` is empty, so repeated calls do not
    pile up blank lines in the file. URLs are written in sorted order so the
    output does not depend on set iteration order.
    """
    if not new_urls:
        return
    with open("urls.txt", "a", encoding="utf-8") as f:
        # Start on a fresh line in case the existing file lacks a trailing newline.
        f.write("\n")
        f.writelines(url + "\n" for url in sorted(new_urls))

In [18]:
# Function to format and save table contents to a JSON file
def save_table_as_json(table, path):
    """Serialize an HTML table to a JSON file as a list of row objects.

    The first non-empty row is treated as the header row when all of its
    cells are ``<th>`` elements. Its texts become the keys of every later
    row, and the header row itself is not written as a data record. Cells
    without a usable header (no header row, extra columns, or an empty
    header text) are keyed ``"Column <index>"``.

    Args:
        table: A parsed ``<table>`` element exposing ``find_all``; cells
            must expose ``name``, ``find``, ``find_all`` and ``get_text``
            (as BeautifulSoup tags do).
        path: Destination path of the JSON file.
    """
    headers = []
    rows = []
    header_candidate = True  # only the first non-empty row may be the header

    for row in table.find_all('tr'):
        cells = row.find_all(['td', 'th'])
        if not cells:
            continue

        if header_candidate:
            header_candidate = False
            if all(cell.name == 'th' for cell in cells):
                # Header row: remember the column names, emit no record.
                headers = [cell.get_text(strip=True) for cell in cells]
                continue

        row_data = {}
        for index, cell in enumerate(cells):
            # For cells with unordered lists, convert them into a list of texts
            if cell.find('ul'):
                cell_data = [li.get_text(strip=True) for li in cell.find_all('li')]
            else:
                cell_data = cell.get_text(strip=True)

            has_header = index < len(headers) and headers[index]
            header = headers[index] if has_header else f"Column {index}"
            row_data[header] = cell_data

        rows.append(row_data)

    # Save the rows as a JSON file
    with open(path, 'w', encoding='utf-8') as file:
        json.dump(rows, file, indent=4)

In [19]:
# Function to recursively extract text from elements as they appear on the web page
_HEADING_TAGS = ('h1', 'h2', 'h3', 'h4', 'h5', 'h6')
_BLOCK_TAGS = ('p', 'div', 'article', 'section') + _HEADING_TAGS
_NO_SPACE_BEFORE = '.,;:!?)]'
_NO_SPACE_AFTER = '(['


def _append_inline(buffer, text):
    """Append ``text``, adding one space if it would otherwise touch the previous fragment."""
    if buffer and text:
        previous = buffer[-1]
        needs_space = (
            previous
            and not previous[-1].isspace()
            and previous[-1] not in _NO_SPACE_AFTER
            and not text[0].isspace()
            and text[0] not in _NO_SPACE_BEFORE
        )
        if needs_space:
            text = " " + text
    buffer.append(text)


def extract_text(element, buffer, depth=0, base_url=""):
    """Walk ``element`` recursively and append its readable text to ``buffer``.

    The fragments are meant to be joined with ``''.join(buffer)``.

    * Text nodes are stripped and indented by two spaces per list level.
      Adjacent inline fragments are separated by a single space.
    * Headings are preceded by a newline; headings and block containers
      (``p``, ``div``, ``article``, ``section``) are followed by one.
    * ``li`` items start on a new line with a ``- `` bullet.
    * Links with an ``href`` are written once as ``text [URL: absolute-url]``,
      resolved against ``base_url``.
    * Tables are flattened to one ``cell | cell |`` line per row between
      ``Table Start:`` and ``Table End.`` markers.

    Args:
        element: A BeautifulSoup ``NavigableString`` or ``Tag``; anything
            else is ignored.
        buffer: List that receives the output fragments (mutated in place).
        depth: Current list nesting level, used for indentation.
        base_url: URL that relative ``href`` values are resolved against.
            It is passed on to every nested element.
    """
    indent = " " * (depth * 2)

    if isinstance(element, NavigableString):
        stripped_text = str(element).strip()
        if stripped_text:  # Avoid adding empty strings
            _append_inline(buffer, indent + stripped_text)
        return
    if not isinstance(element, Tag):
        return

    name = element.name
    if name in _HEADING_TAGS:
        buffer.append('\n')  # Prepend newline for headers

    if name == 'li':
        buffer.append("\n" + indent + "- ")  # Indent list items based on depth

    if name == 'table':
        # Process each row of the table
        buffer.append('\n' + indent + "Table Start:")
        for row in element.find_all('tr'):
            # handle both table data and header cells
            row_buffer = [
                cell.get_text(strip=True, separator=' ').replace('\n', ' ')
                for cell in row.find_all(['td', 'th'])
            ]
            buffer.append("\n" + " " * (depth * 4) + ' | '.join(row_buffer) + ' |')
        # The newline must come before the indent, otherwise the indent ends
        # up trailing the last row instead of leading the marker.
        buffer.append("\n" + indent + "Table End." + '\n')
    elif name == 'a' and 'href' in element.attrs:
        # get_text() already covers the link's descendants, so do not recurse
        # into them as well; that would emit the link text a second time.
        link_text = element.get_text(strip=True)
        _append_inline(buffer, link_text + f" [URL: {urljoin(base_url, element['href'])}]")
    elif name not in ('tr', 'td', 'th'):  # Table parts are handled by the table branch
        child_depth = depth + (1 if name in ('ul', 'ol') else 0)
        for child in element.children:
            extract_text(child, buffer, child_depth, base_url)

    if name in _BLOCK_TAGS:
        buffer.append('\n')

In [20]:
# Function to scrape a page
def scrape_page(url_suffix, base_url, output_dir, timeout=30):
    """Download one page, extract its main-container text and save it as .txt.

    The page URL is ``urljoin(base_url, url_suffix)``. The text is written
    under ``output_dir`` at a path that mirrors the segments of
    ``url_suffix``. An empty suffix is saved as ``index.txt``. Failures are
    reported with ``print`` and the function returns without raising, so a
    single bad page does not abort a batch run.

    Args:
        url_suffix: Path of the page relative to ``base_url``.
        base_url: Site root the suffix is joined to.
        output_dir: Directory that receives the extracted text files.
        timeout: Seconds to wait for the server before giving up. Without a
            timeout ``requests.get`` can block indefinitely.
    """
    url = urljoin(base_url, url_suffix)
    try:
        response = requests.get(url, timeout=timeout)
    except requests.RequestException as error:
        # Connection errors and timeouts are reported like HTTP failures.
        print(f"Failed to fetch {url}: {error}")
        return
    if response.status_code != 200:
        print(f"Failed to fetch {url}")
        return

    soup = BeautifulSoup(response.content, 'html.parser')
    main_container = soup.find('div', class_='container container--main')
    if not main_container:
        print(f"No main container found in {url}")
        return

    # Recursively extract text; the page URL is handed over as base_url so
    # extract_text can resolve relative hrefs against it.
    text_segments = []
    extract_text(main_container, text_segments, base_url=url)
    text_content = ''.join(text_segments)

    # Save the extracted text; fall back to "index" so an empty suffix does
    # not produce a file literally named ".txt".
    path_segments = [segment for segment in url_suffix.strip('/').split('/') if segment] or ["index"]
    text_path = os.path.join(output_dir, *path_segments[:-1], f"{path_segments[-1]}.txt")
    save_content(text_content, text_path)

In [21]:
# Load every URL suffix listed in the URLs file.
urls = read_urls_from_file()

# Only the first ten entries are scraped in this run.
first_urls = urls[:10]
for url_suffix in first_urls:
    scrape_page(url_suffix, base_url, output_dir)

Failed to fetch https://www.uscis.gov/file-online/how-to-file-your-application-for-naturalization-online-video


TODO:
- Use full URLs everywhere instead of a base URL plus suffix (newly discovered URLs might be on a different domain).
- Add a queue, or some other mechanism, to visit newly discovered URLs at runtime.