In [4]:
# API docs
# - https://platform.openai.com/docs/api-reference
# file_search:
# - https://platform.openai.com/docs/assistants/tools/file-search?context=without-streaming

In [5]:
# Environment sanity check: show which `python` and `pip` the shell resolves to.
!which python
!which pip

~/.conda/envs/py38/bin/python
~/.conda/envs/py38/bin/pip


In [6]:
# Commented out to be able to `Run All Cells` smoothly.

#!pip install PyPDF2
#!pip install openai

In [7]:
import sys
import openai

# Record the interpreter and OpenAI SDK versions this notebook was run with.
print(sys.version)
print(openai.__version__)

3.8.20 | packaged by conda-forge | (default, Sep 30 2024, 17:52:49) 
[GCC 13.3.0]
1.56.0


In [8]:
import os
# Read the API key from the first line of the local file `oai_token.txt`
# (surrounding whitespace stripped) and export it through the environment.
with open("oai_token.txt", 'r') as file:
    os.environ["OPENAI_API_KEY"] = file.readline().strip()
# Also set the key on the `openai` module itself.
openai.api_key = os.environ["OPENAI_API_KEY"]

In [9]:
from PyPDF2 import PdfReader

# Path of the source manual; single-page PDFs are later cut out of this file.
pdf_document_filepath = "files/4-2_manual.pdf"

# Open the manual and record how many pages it has.
reader = PdfReader(pdf_document_filepath)
NUM_PAGES = len(reader.pages)
print(f"Number of pages: {NUM_PAGES}")

Number of pages: 740


In [10]:
from PyPDF2 import PdfReader, PdfWriter

# pages is a 0-indexed array
def extract_pages(input_pdf, output_pdf, pages):
    """Write the selected pages of ``input_pdf`` to a new PDF at ``output_pdf``.

    ``pages`` holds 0-based page indices; the pages are added to the output
    in the order given.
    """
    source_pages = PdfReader(input_pdf).pages
    writer = PdfWriter()

    for index in pages:
        selected = source_pages[index]
        writer.add_page(selected)

    with open(output_pdf, 'wb') as sink:
        writer.write(sink)

In [11]:
from openai import OpenAI, AsyncOpenAI
 
# Synchronous client; later cells use it for blocking calls such as file lookups.
client = OpenAI()

# Create the assistant with the file_search tool enabled and keep its id,
# which the run-creation helper reads when starting runs.
assistant = client.beta.assistants.create(
    name="DRAGEN Assistant",
    instructions="You are an expert reader of manual pages, which include text, tables, and images. Use the documents provided to you to answer questions about what the manual says. You must stick to the manual and not complement your responses with any information not included in the manual pages provided.",
    model="gpt-4o",
    tools=[{"type": "file_search"}],
)
assistant_id = assistant.id

In [12]:
import asyncio

# Start a thread with a message containing a file.
async def create_thread_with_message(filepath, message, prev_filepath=None):
    """Upload the page PDF(s) and create a thread whose user message attaches them.

    Args:
        filepath: path of the current page's PDF; always uploaded first.
        message: text content of the user message.
        prev_filepath: optional path of the preceding page's PDF; when truthy
            it is uploaded and attached after the current page.

    Returns:
        The id of the newly created thread.
    """
    client = AsyncOpenAI()

    # Current page first, then (optionally) the preceding page.
    upload_paths = [filepath]
    if prev_filepath:
        upload_paths.append(prev_filepath)

    attachments = []
    for path in upload_paths:
        with open(path, "rb") as handle:
            uploaded = await client.files.create(file=handle, purpose="assistants")
            # Make the uploaded file searchable from this message.
            attachments.append(
                {"file_id": uploaded.id, "tools": [{"type": "file_search"}]}
            )

    # Create the thread with a single user message carrying all attachments.
    thread = await client.beta.threads.create(
        messages=[{"role": "user", "content": message, "attachments": attachments}]
    )
    return thread.id

# Use the create-and-poll SDK helper to create a run and poll its status
# until it's in a terminal state.
async def create_and_submit_run(thread_id):
    """Run the assistant on ``thread_id`` and return the messages of that run.

    The run is created with the SDK's ``create_and_poll`` helper, then the
    thread's messages filtered by that run's id are collected into a list in
    the order the listing yields them.
    """
    client = AsyncOpenAI()
    run = await client.beta.threads.runs.create_and_poll(
        thread_id=thread_id, assistant_id=assistant_id
    )

    listing = client.beta.threads.messages.list(thread_id=thread_id, run_id=run.id)
    return [msg async for msg in listing]

def extract_response(messages):
    """Turn the messages of a run into ``(response, references)``.

    The text of the first content part of the first message is used. Every
    annotation's text is replaced in the response by ``[index]``; for
    annotations carrying a ``file_citation`` the cited file's name is looked up
    with the notebook-level ``client`` and listed as ``"[index] filename"``.

    Args:
        messages: list of message objects as returned for a run.

    Returns:
        ``(response, references)``: the rewritten text and the newline-joined
        citation lines. ``("", "")`` is returned when there is no message or
        the first message has no content, instead of raising ``IndexError``.
    """
    # No message / no content: report an empty response rather than crashing
    # the caller, so one bad page cannot abort a whole batch.
    if not messages or not messages[0].content:
        return "", ""

    message_content = messages[0].content[0].text
    # Work on a local copy so the SDK message object is left unmodified.
    response = message_content.value
    citations = []
    for index, annotation in enumerate(message_content.annotations):
        response = response.replace(annotation.text, f"[{index}]")
        if file_citation := getattr(annotation, "file_citation", None):
            cited_file = client.files.retrieve(file_citation.file_id)
            citations.append(f"[{index}] {cited_file.filename}")

    references = "\n".join(citations)
    return response, references

In [13]:
# User message sent with every page (passed to create_thread_with_message).
# It asks the assistant to reply with a JSON array of Alpaca-format records.
message = """You are a document processing assistant tasked with generating prompts based on the content of a manual page. 
You are being provided with two pages: the current manual page and the preceding page for context. Focus primarily on the current page. Refer to the preceding page only when necessary to ensure continuity for sections or information that began earlier. The page number is located on the lower right corner of every page.

Follow these instructions carefully:
1. Analyze the current page from the manual for relevant information, including text, tables, diagrams, and examples. Relevant information refers to useful information for someone who is trying to learn how to use this platform. 
2. Generate between 10 and 20 detailed Alpaca-format prompts based on the relevance and length of the current (not the preceding) page.
3. Ensure that the response is formatted in the Alpaca structure, which is a JSON array of objects. Each object represents a single prompt and contains the following fields:
instruction: The task or query for the model (required).
input: Any context or details relevant to the instruction (optional).
output: The model's expected response (required).
system: The system instruction, which should always be "Do not add information not explicitly stated or speculate.".
history: A list of relevant prior [instruction, response] pairs, representing conversational history (optional).
Each field should be enclosed in quotation marks, separated by commas, and formatted as valid JSON. Multiple prompts should be enclosed in square brackets [] to form an array.
4. Include exactly one prompt that explicitly states the current page number as the source of the information being discussed.
5. If the page includes examples or diagrams, generate prompts that specifically cover those examples and diagrams.
6. Some pages may contain information that continues from a previous page. If this is the case, refer to the previous page to ensure the prompts comprehensively address the complete context of the information.
7. Include all important details from the page in the prompts. The prompts do not have to be concise but must be thorough, comprehensive, and factual.
8. Do not wrap your response in a type label such as ```json```

Use these guidelines to process the provided page and return the output in valid Alpaca format. Only return the prompts, nothing else.
"""

In [14]:
import asyncio
import os
import json
from collections import deque

# main coroutine
async def event_loop(pages_to_process):
    """Create one thread per page, run the assistant on each, and gather the results.

    For every 0-based page index in ``pages_to_process`` the single-page PDF
    ``files/page_<n>.pdf`` is extracted if it does not exist yet. For pages
    after the first content page, the preceding page's PDF is extracted too and
    attached to the same message.

    Returns:
        ``(raw_responses, tasks, thread_ids, pages)``: ``raw_responses`` is the
        list produced by ``asyncio.gather`` (same order as ``tasks``), ``tasks``
        maps ``str(page)`` to that page's run coroutine, and ``thread_ids`` /
        ``pages`` are deques kept for cleaning up later.
    """
    first_content_page = 6  # page 6 is the first content page
    # for cleaning up later
    tasks = {}
    thread_ids = deque([])
    pages = deque([])
    for page in pages_to_process:
        print(f"\tProcessing page {page}")

        # Reset on every iteration: otherwise the first content page would
        # inherit the preceding-page file of whatever page came before it in
        # `pages_to_process` (e.g. in a reprocess list such as [10, 6]).
        prev_filepath = None
        if page > first_content_page:
            prev_filepath = f"files/page_{page-1}.pdf"
            if not os.path.exists(prev_filepath):
                extract_pages(pdf_document_filepath, prev_filepath, [page-1])

        page_filepath = f"files/page_{page}.pdf"
        if not os.path.exists(page_filepath):
            extract_pages(pdf_document_filepath, page_filepath, [page])

        thread_id = await create_thread_with_message(page_filepath, message, prev_filepath=prev_filepath)

        # Only the coroutine is created here; it is awaited by gather() below.
        task = create_and_submit_run(thread_id)
        tasks[str(page)] = task

        # for cleaning up later
        thread_ids.append(thread_id)
        pages.append(page)

    # Wait for all tasks to complete
    raw_responses = await asyncio.gather(*tasks.values())
    return raw_responses, tasks, thread_ids, pages

def save_results(raw_responses_dict, overwrite=False):
    """Merge newly extracted responses into ``files/metadata.json``.

    Args:
        raw_responses_dict: maps a page key to the raw messages of that page's
            run; each value is passed through ``extract_response`` and only the
            response text is stored.
        overwrite: when False, pages already present in the store are left
            untouched (and their raw response is not even extracted).

    A missing ``files/metadata.json`` is treated as an empty store, so the
    first save creates the file instead of failing with ``FileNotFoundError``.
    """
    try:
        with open('files/metadata.json', 'r') as metadata:
            stored_prompts = json.load(metadata)
    except FileNotFoundError:
        # Nothing has been saved yet.
        stored_prompts = {}

    new_prompts = {
        page: extract_response(raw_response)[0]
        for page, raw_response in raw_responses_dict.items()
        if overwrite or page not in stored_prompts
    }
    stored_prompts.update(new_prompts)

    with open('files/metadata.json', 'w') as metadata:
        json.dump(stored_prompts, metadata, indent=4)  # 'indent=4' makes the JSON file pretty-printed

# to clean up threads and tasks
async def clean_up(tasks, thread_ids, pages):
    """Delete every thread listed in ``thread_ids``, draining the deque.

    Deletion is best effort: a failure is printed and the remaining threads
    are still attempted. ``tasks`` and ``pages`` are accepted but not used.
    """
    client = AsyncOpenAI()
    for _ in range(len(thread_ids)):
        thread_id = thread_ids.popleft()
        try:
            await client.beta.threads.delete(thread_id)
        except Exception as e:
            print(f"Cleaning up threads failed: {e}")

    # `tasks` / `pages` would only need clearing here if they were globals.

async def submitter(pages_to_process, overwrite=False):
    """Process one batch of pages: run them, save the responses, delete the threads.

    Returns the pages that were processed, as returned by ``event_loop``.
    """
    raw_responses, tasks, thread_ids, pages_processed = await event_loop(pages_to_process)

    # `tasks` is keyed by str(page); its key order matches the gathered responses.
    responses_by_page = dict(zip(tasks.keys(), raw_responses))
    save_results(responses_by_page, overwrite)

    await clean_up(tasks, thread_ids, pages_processed)

    return pages_processed

async def driver(pages_to_process=None): # first 6 pages and last page are non-content
    """Process the manual page by page, in batches, storing results on disk.

    Args:
        pages_to_process: optional list of 0-based page indices. When given
            (and non-empty) exactly those pages are processed and their stored
            responses are overwritten. When omitted, every content page
            (index 6 up to, but excluding, the last page) that has no entry in
            ``files/metadata.json`` yet is processed in batches of 10.

    The caller's list is never mutated, and no state is kept between calls.
    """
    if pages_to_process:
        await submitter(pages_to_process, True)
        return

    try:
        with open('files/metadata.json', 'r') as metadata:
            stored_prompts = json.load(metadata)
    except FileNotFoundError:
        # First run: start an empty store on disk so later reads of it succeed.
        stored_prompts = {}
        with open('files/metadata.json', 'w') as metadata:
            json.dump(stored_prompts, metadata, indent=4)

    first_content_page = 6
    end_page = NUM_PAGES - 1  # exclusive: the last page is non-content
    batch_size = 10  # batching calls to event_loop for fault tolerance

    for batch_start in range(first_content_page, end_page, batch_size):
        # Clamp the final batch instead of dropping it, and build a fresh list
        # per batch so earlier pages are not submitted again.
        batch_stop = min(batch_start + batch_size, end_page)
        batch = [p for p in range(batch_start, batch_stop) if str(p) not in stored_prompts]
        if not batch:
            continue

        pages_processed = await submitter(batch)
        # Remember what was handled in this call so it is not picked up again.
        for processed in pages_processed:
            stored_prompts[str(processed)] = None


In [15]:
# reprocess failed responses
def reprocess_failed():
    """Return the pages whose stored response is not a valid JSON array.

    Reads ``files/metadata.json`` and checks every stored string: it must
    parse as JSON *and* the parsed value must be a list. Pages failing either
    check are returned as ints. Values that are not strings are reported with
    a printed error and are not scheduled for reprocessing.
    """
    with open('files/metadata.json', 'r') as metadata:
        stored_prompts = json.load(metadata)

    pages_to_reprocess = []
    for page, prompt_arr in stored_prompts.items():
        if not isinstance(prompt_arr, str):
            print(f"ERROR: value not stored as a string but a: {type(prompt_arr)}.")
            continue

        try:
            parsed = json.loads(prompt_arr)
        except (ValueError, RecursionError):
            # json.JSONDecodeError is a ValueError: the text is not JSON at all.
            pages_to_reprocess.append(int(page))
            continue

        # Valid JSON that is not an array (object, number, ...) is also a
        # failed response: downstream code extends a list with it.
        if not isinstance(parsed, list):
            pages_to_reprocess.append(int(page))

    return pages_to_reprocess

In [16]:
# cell driving main logic

await driver()
failed = reprocess_failed()
while failed:
    await driver(failed)
    failed = reprocess_failed()
    print(f"reprocessing {failed}")

In [17]:
# finalize data for fine-tuning step in next notebook
# Load the per-page responses collected by the earlier cells.
with open('files/metadata.json', 'r') as file:
    stored_prompts = json.load(file)

# Flatten the per-page arrays into one list of prompts.
all_prompts = []
for arr in stored_prompts.values():
    # Values still stored as JSON text are parsed first; others are used as-is.
    arr = json.loads(arr) if isinstance(arr, str) else arr
    all_prompts.extend(arr)

# Write the combined dataset, pretty-printed.
with open('files/dataset.json', 'w') as file:
    json.dump(all_prompts, file, indent=4)

In [18]:
# cleaning up

from openai import OpenAI
client = OpenAI()

# NOTE: this removes EVERY file with purpose "assistants" and EVERY vector
# store the API key can list, not only the ones created by this notebook.

# Iterate the list result itself (not `.data`, which is only the first page)
# so the SDK fetches any further pages; materialise it before deleting.
assistant_files = list(client.files.list(purpose="assistants"))
for file in assistant_files:
    client.files.delete(file.id)
print("All files with purpose 'assistants' have been deleted.")

try:
    vector_stores = list(client.beta.vector_stores.list())
except Exception as e:
    # Nothing is retried here; report the actual error.
    print(f"Listing vector stores failed: {e}")
else:
    failed_deletions = 0
    for vs in vector_stores:
        try:
            client.beta.vector_stores.delete(vs.id)
        except Exception as e:
            failed_deletions += 1
            print(f"Error deleting vector store {vs.id}: {e}")
    # Only claim success when every deletion went through.
    if failed_deletions == 0:
        print("All returned vector stores have been deleted")
    else:
        print(f"{failed_deletions} vector store(s) could not be deleted")

All files with purpose 'assistants' have been deleted.
All returned vector stores have been deleted


In [19]:
# Code graveyard
#    ### Handle prev page ###
#    page_num = int(filepath.strip("files/page_.pdf"))
#    # Create a vector store to contain the preceding manual page.
#   if page_num > 0:
#        prev_vector_store = await client.beta.vector_stores.create(name="Preceding manual page")
#
#        file_paths = [f"files/page_{page_num-1}.pdf"]
#        file_streams = [open(path, "rb") for path in file_paths]
#        file_batch = await client.beta.vector_stores.file_batches.upload_and_poll(
#          vector_store_id=prev_vector_store.id, files=file_streams
#        )
#        if file_batch.status != "completed":
#            print(f"Upload failed for preceding file: {file_path}.")
#
#        assistant = await client.beta.assistants.update(
#          assistant_id=assistant_id,
#          tool_resources={"file_search": {"vector_store_ids": [prev_vector_store.id]}},
#        )
#    ### End handle prev page ###


#    - IMPORTANT: If the current page is a non-content page (such as a title page, table of contents, blank page, etc.) do not generate any prompts. Instead, return an empty response.
