In [144]:
from playwright.async_api import async_playwright
import time


async def get_related_document_urls(grant_url):
    """Collect the download URLs listed under a grant page's "Related Documents".

    Opens ``grant_url`` in headless Chromium, clicks the "Related Documents"
    button, and for every link in the documents table clicks it and reads the
    ``src`` of the ``attachmentDownload`` iframe.

    Args:
        grant_url: URL of the grant detail page to open.

    Returns:
        A list of ``{"file_name": <link text>, "url": <iframe src>}`` dicts.
        Links with empty text or no iframe ``src`` are left out. The list is
        empty when no documents table shows up.
    """
    # Local import so this cell stays self-contained; playwright is already a
    # dependency of the notebook.
    from playwright.async_api import TimeoutError as PlaywrightTimeoutError

    # How long to wait for the documents table to render after the click.
    table_timeout_ms = 10_000

    files = []
    async with async_playwright() as playwright:
        browser = await playwright.chromium.launch(headless=True)
        try:
            context = await browser.new_context(accept_downloads=True)
            page = await context.new_page()

            await page.goto(grant_url)
            await page.wait_for_selector('body')

            await page.locator(
                'button', has_text='Related Documents').click()

            table_p_tag = page.locator(
                'p:has-text("Click on the following file link(s) to download the related document(s):")')
            table = table_p_tag.locator('xpath=following-sibling::table')

            # is_visible() answers immediately and would race the panel that
            # the click above opens, so wait for the table explicitly. The
            # table is located relative to the paragraph, so one wait covers
            # both elements.
            try:
                await table.wait_for(state='visible', timeout=table_timeout_ms)
            except PlaywrightTimeoutError:
                print("No table found")
                return files

            print("Table found")

            for a_tag in await table.locator('a').all():
                # text_content() may return None; normalise to a clean string.
                a_tag_text = ((await a_tag.text_content()) or "").strip()
                print(f"Clicking on: {a_tag_text}")

                await a_tag.click()
                # NOTE(review): src is read right after the click; presumably
                # the page updates the iframe synchronously -- confirm, or a
                # stale src from the previous link could be recorded.
                iframe = page.locator('id=attachmentDownload')
                iframe_src = await iframe.get_attribute('src')
                if a_tag_text and iframe_src:
                    files.append(
                        {"file_name": a_tag_text, "url": iframe_src})
        finally:
            # Always release the browser, even when a step above raises.
            await browser.close()

    return files

# Run the function

In [105]:
import urllib.request
import zipfile
import os
from urllib.error import HTTPError


async def download_file(url, file_name, path='.'):
    """Download ``url`` into ``path`` as ``file_name``; unpack it if it is a ZIP.

    The download uses the blocking ``urllib.request.urlretrieve``, so awaiting
    this coroutine does not yield to the event loop while the transfer runs.

    Args:
        url: Location to fetch.
        file_name: Name to store the file under. Only its final path component
            is used, so a name such as ``"../x.pdf"`` cannot land outside
            ``path``.
        path: Destination folder; created if missing.

    Returns:
        None. Progress and failures are reported with ``print``; download
        errors and corrupt archives do not raise.
    """
    # Local import: the file-level imports only bring in HTTPError.
    from urllib.error import URLError

    print(f"Downloading {url}, file_name: {file_name}, path: {path}")

    # The name comes from scraped link text, so treat it as untrusted: drop
    # any directory part (either slash style) and reject empty/dot names.
    safe_name = os.path.basename((file_name or "").strip().replace("\\", "/"))
    if safe_name in ("", ".", ".."):
        print(f"Skipping download: unusable file name {file_name!r}")
        return

    # Create the folder for downloads if it doesn't exist
    folder_path = os.fspath(path)
    os.makedirs(folder_path, exist_ok=True)

    file_path = os.path.join(folder_path, safe_name)

    try:
        urllib.request.urlretrieve(url, file_path)
        print(f"Downloaded: {file_path}")

        # Extension check is case-insensitive so ".ZIP" archives unpack too.
        if safe_name.lower().endswith('.zip'):
            with zipfile.ZipFile(file_path, 'r') as zip_ref:
                zip_ref.extractall(folder_path)
            print(f"Extracted: {file_path}")

            # Remove the ZIP file after extraction
            os.remove(file_path)
            print(f"Removed: {file_path}")

    except HTTPError as e:
        print(f"Failed to download {file_name}. HTTPError: {e}")
    except URLError as e:
        # Non-HTTP failures (DNS, refused connection, missing file:// target).
        print(f"Failed to download {file_name}. URLError: {e}")
    except zipfile.BadZipFile as e:
        # The payload was saved but is not a valid archive; keep it on disk.
        print(f"Failed to extract {file_path}. BadZipFile: {e}")

In [106]:
async def extract_related_documents(files, sub_path='.'):
    """Download every entry of ``files`` into ``./<sub_path>``, one at a time.

    Args:
        files: Iterable of dicts carrying ``"url"`` and ``"file_name"`` keys.
        sub_path: Folder (relative to the working directory) to download into.
    """
    print(files)
    target_dir = f'./{sub_path}'
    for entry in files:
        name = entry.get("file_name")
        print(f"Downloading: {name}")
        await download_file(entry.get("url"), name, path=target_dir)

In [143]:
# nest_asyncio patches asyncio so an already-running event loop can be re-entered.
# NOTE(review): presumably needed because the notebook kernel already runs a
# loop -- confirm it is still required.
import nest_asyncio
nest_asyncio.apply()


async def download_grant_documents_for_ai(grant_url):
    """Download all related documents of a grant into a folder named after its id.

    The grant id is the last non-empty segment of the URL path, so a trailing
    slash, query string, or fragment does not leak into the folder name.

    Args:
        grant_url: URL of the grant detail page.

    Returns:
        The grant id, which is also the name of the download folder
        (relative to the working directory).

    Raises:
        ValueError: If ``grant_url`` has no path segment to use as an id. An
            empty id would make the download folder the working directory.
    """
    # Local import so this cell does not depend on another cell's imports.
    from urllib.parse import urlsplit

    start = time.time()

    grant_id = urlsplit(grant_url).path.rstrip("/").split("/")[-1]
    if not grant_id:
        raise ValueError(f"Cannot derive a grant id from URL: {grant_url!r}")
    print(f"Grant ID: {grant_id}")

    files = await get_related_document_urls(grant_url)
    await extract_related_documents(files, sub_path=grant_id)

    end = time.time()
    print(f"Time taken: {end - start}")
    return grant_id

# Example usage

In [None]:
import os


def get_file_paths(directory):
    """Return the path of every file found under ``directory``, recursively.

    Paths are built by joining each walked folder with its file names, in the
    order ``os.walk`` yields them.
    """
    return [
        os.path.join(parent, name)
        for parent, _subdirs, names in os.walk(directory)
        for name in names
    ]


# Example usage
# NOTE(review): "/path/to/directory" is a placeholder -- point it at a real
# folder before relying on the printed list.
file_paths = get_file_paths("/path/to/directory")
print(file_paths)

In [146]:
from openai import OpenAI


async def create_assistant(api_key, grant_url):
    """Build a file-search assistant over the documents of one grant.

    Steps: download the grant's related documents, create the assistant and a
    vector store named "Grant Documents", upload every file found under
    ``./<grant_id>`` to that store, then attach the store to the assistant.

    Args:
        api_key: Passed straight to ``OpenAI(api_key=...)``.
        grant_url: URL of the grant detail page whose documents are indexed.

    Returns:
        A ``(client, assistant)`` tuple: the OpenAI client and the assistant
        after the vector store has been attached.
    """
    # Local import so this cell stays self-contained.
    from contextlib import ExitStack

    client = OpenAI(api_key=api_key)
    grant_id = await download_grant_documents_for_ai(grant_url)

    assistant = client.beta.assistants.create(
        name="Grant Document Assistant",
        instructions="You help users understand grant documents.",
        model="gpt-4o",
        tools=[{"type": "file_search"}],
    )

    vector_store = client.beta.vector_stores.create(name="Grant Documents")

    file_paths = get_file_paths(f"./{grant_id}")
    print(file_paths)

    # Open the files for upload and guarantee every handle is closed again,
    # whether the upload succeeds or raises.
    with ExitStack() as open_files:
        file_streams = [
            open_files.enter_context(open(path, "rb")) for path in file_paths
        ]

        # Upload the files, add them to the vector store, and poll the file
        # batch until it finishes.
        file_batch = client.beta.vector_stores.file_batches.upload_and_poll(
            vector_store_id=vector_store.id, files=file_streams
        )

    # Show the batch status and per-state file counts for this upload.
    print(file_batch.status)
    print(file_batch.file_counts)

    assistant = client.beta.assistants.update(
        assistant_id=assistant.id,
        tool_resources={"file_search": {
            "vector_store_ids": [vector_store.id]}},
    )
    return client, assistant

In [147]:
def query_assistant(client, assistant, query):
    """Ask ``assistant`` one question and print the answer with its citations.

    A new thread is created holding ``query`` as the user message, a run is
    started and polled to completion, and the first message of that run is
    printed. Each annotation in the answer is replaced by a ``[n]`` marker;
    annotations that carry a file citation are listed below the answer as
    ``[n] <filename>``.

    Args:
        client: OpenAI client used for all API calls.
        assistant: Assistant object; only its ``id`` is read.
        query: The user's question.

    Returns:
        None. Output goes to stdout. If the run produced no text reply, a
        one-line notice including the run status is printed instead.
    """
    thread = client.beta.threads.create(
        messages=[
            {
                "role": "user",
                "content": query,
            }
        ]
    )

    run = client.beta.threads.runs.create_and_poll(
        thread_id=thread.id, assistant_id=assistant.id
    )

    messages = list(client.beta.threads.messages.list(
        thread_id=thread.id, run_id=run.id))

    # A run that did not finish normally may leave no message behind, and the
    # first content part is not guaranteed to be text; look for a text part
    # instead of indexing blindly.
    message_content = None
    if messages:
        for part in messages[0].content:
            candidate = getattr(part, "text", None)
            if candidate is not None:
                message_content = candidate
                break

    if message_content is None:
        status = getattr(run, "status", "unknown")
        print(f"No text reply from the assistant (run status: {status})")
        return

    answer = message_content.value
    citations = []
    for index, annotation in enumerate(message_content.annotations):
        # Swap the raw annotation token in the answer for a short marker.
        answer = answer.replace(annotation.text, f"[{index}]")
        if file_citation := getattr(annotation, "file_citation", None):
            cited_file = client.files.retrieve(file_citation.file_id)
            citations.append(f"[{index}] {cited_file.filename}")

    print(answer)
    print("\n".join(citations))

In [None]:
# NOTE(review): api_key is None here -- presumably the OpenAI client then falls
# back to the OPENAI_API_KEY environment variable; confirm. Do not paste a real
# key into this cell.
api_key = None
# Top-level await: runs only in an async-aware shell such as IPython/Jupyter.
client, assistant = await create_assistant(api_key, grant_url="https://grants.gov/search-results-detail/356303")

In [None]:
# Ask one question; query_assistant prints the answer followed by its citations.
query_assistant(client, assistant,
                "What are the eligibility requirements for this grant?")