In [None]:

import os
import requests
from dotenv import load_dotenv
from datetime import datetime
import time
from datetime import datetime, timedelta
load_dotenv() 

# Service credentials and endpoint, read from the process environment after the
# load_dotenv() call above. Indexing os.environ raises KeyError if a variable is unset.
swparse_api_key = os.environ["SWPARSE_KEY"]  # sent as the Bearer token by upload_file / get_result
BASE_URL =  os.environ["BASE_URL"]  # URL prefix; request paths are appended as "/api/parsing/...", so no trailing slash is expected


def get_file_content(filename: str) -> bytes:
    """Return the raw bytes of ``pdf/<filename>``.

    The ``pdf/`` directory is resolved relative to the current working
    directory. Any error raised by ``open`` (for example a missing file)
    propagates to the caller.
    """
    pdf_path = f"pdf/{filename}"
    with open(pdf_path, "rb") as pdf_handle:
        payload = pdf_handle.read()
    return payload


def write_file(filepath: str, content: str) -> None:
    """Write ``content`` to ``filepath`` as UTF-8 text, replacing any existing file.

    Args:
        filepath: Destination path; its parent directory must already exist.
        content: Text to write.

    The encoding is pinned to UTF-8 so that text containing non-ASCII
    characters is written identically regardless of the platform's default
    locale encoding.
    """
    with open(filepath, "w", encoding="utf-8") as f:
        f.write(content)


def upload_file(files: dict[str, tuple[str, bytes, str]], force_ocr: bool = False, plain_text: bool = False, timeout: float = 60.0) -> str:
    """Upload a document to the parsing service and return the created job id.

    Args:
        files: Multipart payload passed to ``requests.post(files=...)``; each
            value is a ``(filename, content, content_type)`` tuple.
        force_ocr: Sent to the service as the ``force_ocr`` form field.
        plain_text: Sent to the service as the ``plain_text`` form field.
        timeout: Seconds passed to ``requests.post(timeout=...)`` so a stalled
            connection cannot hang the notebook indefinitely.

    Returns:
        The ``"id"`` field of the JSON body returned with a 201 response.

    Raises:
        RuntimeError: If the service answers with any status other than 201.
        requests.RequestException: If the HTTP request itself fails.
    """
    data = {
        "force_ocr": force_ocr,
        "plain_text": plain_text,
    }
    headers = {
        'Authorization': f'Bearer {swparse_api_key}'
    }

    response = requests.post(
        f"{BASE_URL}/api/parsing/upload",
        files=files,
        headers=headers,
        data=data,
        timeout=timeout,
    )

    # Fail loudly: returning None here would make callers poll a job id of
    # "None" forever.
    if response.status_code != 201:
        raise RuntimeError(
            f"Failed to upload the file. Status code: {response.status_code}\n{response.text}"
        )

    print("File uploaded successfully!")
    res = response.json()
    print(res)
    return res["id"]
        
        


def get_result(job_id: str, result_type: str, poll_interval: float = 0.5, max_wait: float = 900.0, request_timeout: float = 60.0) -> str:
    """Poll the parsing service until the job result is available and return it.

    Args:
        job_id: Identifier returned by the upload endpoint.
        result_type: Result flavour requested in the URL; also the key read
            from the JSON response (the notebook uses "markdown" and "text").
        poll_interval: Seconds to sleep between polls.
        max_wait: Overall budget in seconds before giving up.
        request_timeout: Seconds passed to each ``requests.get(timeout=...)``.

    Returns:
        ``response.json()[result_type]`` from the first 200 response.

    Raises:
        TimeoutError: If no 200 response arrives within ``max_wait`` seconds.
        requests.RequestException: If an HTTP request itself fails.
    """
    headers = {
        'Authorization': f'Bearer {swparse_api_key}'
    }
    url = f"{BASE_URL}/api/parsing/job/{job_id}/result/{result_type}"
    # Monotonic clock: the deadline must not move if the wall clock is adjusted.
    deadline = time.monotonic() + max_wait

    while True:
        response = requests.get(url, headers=headers, timeout=request_timeout)
        if response.status_code == 200:
            break

        # Every non-200 status is treated as "not ready yet", as before; the
        # deadline is what stops a failed or unknown job from spinning forever.
        if time.monotonic() >= deadline:
            raise TimeoutError(
                f"Job {job_id} produced no '{result_type}' result within {max_wait} s "
                f"(last status code: {response.status_code})"
            )
        time.sleep(poll_interval)

    result = response.json()
    return result[result_type]
    


def process_file(files: dict[str, tuple[str, bytes, str]], result_type: str, attempts: int = 5, force_ocr: bool = False, plain_text: bool = False) -> list[timedelta]:
    """Upload and parse one document ``attempts`` times, timing each round trip.

    Each attempt measures the time from just before the upload until the
    result has been fetched. The result of the last attempt is written to
    ``output/<name><suffix>``, where ``<name>`` is the uploaded file name
    without its trailing ``.pdf``, followed by ``(force_ocr)`` when
    ``force_ocr`` is set, and ending in ``(plain_text).txt`` when
    ``plain_text`` is set or ``.md`` otherwise.

    Args:
        files: Multipart payload forwarded to ``upload_file``; the file name
            is taken from the first value's first tuple element.
        result_type: Result flavour forwarded to ``get_result``.
        attempts: Number of upload/fetch round trips; must be at least 1.
        force_ocr: Forwarded to ``upload_file``.
        plain_text: Forwarded to ``upload_file``.

    Returns:
        One ``timedelta`` per attempt, in execution order.

    Raises:
        ValueError: If ``attempts`` is less than 1 (there would be no result to save).
        RuntimeError: If an upload does not yield a job id.
    """
    if attempts < 1:
        raise ValueError("attempts must be at least 1")

    filename, _, _ = next(iter(files.values()))
    times = []
    result = ""
    for _attempt in range(attempts):
        # perf_counter is monotonic, so the measurement is immune to wall-clock changes.
        started = time.perf_counter()

        job_id = upload_file(files, force_ocr=force_ocr, plain_text=plain_text)
        if not job_id:
            # Polling with an empty job id could never succeed.
            raise RuntimeError(f"Upload of {filename} did not return a job id")
        result = get_result(job_id, result_type=result_type)

        times.append(timedelta(seconds=time.perf_counter() - started))

    # Strip only the trailing extension, not every ".pdf" inside the name.
    output_name = filename.removesuffix(".pdf")
    if force_ocr:
        output_name = f"{output_name}(force_ocr)"
    if plain_text:
        output_name = f"{output_name}(plain_text).txt"
    else:
        output_name = f"{output_name}.md"
    write_file(f"output/{output_name}", result)

    return times
 
 
def get_average_str(times: list[timedelta]) -> str:
    """Format the arithmetic mean of ``times`` using ``get_time_str``.

    Args:
        times: Durations to average.

    Returns:
        The formatted mean duration, or ``"Time Taken: N/A"`` when ``times``
        is empty (there is nothing to average, and dividing by zero would
        otherwise abort report generation).
    """
    if not times:
        return "Time Taken: N/A"
    # sum() needs a timedelta start value; its default start of 0 is an int.
    total_time = sum(times, timedelta())
    return get_time_str(total_time / len(times))


def get_time_str(time: timedelta) -> str:
    """Format a duration as ``"Time Taken: <m> min <s> sec <ms> ms"``.

    Args:
        time: Duration to format; intended for non-negative values. The
            parameter name shadows the ``time`` module inside this function only.

    Returns:
        The formatted string. Minutes are not capped at 59, and whole days are
        folded into the minute count so long durations are not truncated.
        Sub-millisecond precision is dropped.
    """
    # timedelta.seconds excludes whole days, so add them back explicitly.
    whole_seconds = time.days * 86400 + time.seconds
    minutes, seconds = divmod(whole_seconds, 60)
    milliseconds = time.microseconds // 1000

    return f"Time Taken: {minutes} min {seconds} sec {milliseconds} ms"
        

File:CMS_AI_Playbook_3_Final.pdf uploaded
File uploaded successfully!
{'id': 'saq:job:swparse:fba208b4-ce7b-11ef-9f89-0242ac120006', 'status': 'PENDING', 's3_url': 'swparse/2340cd88df33ff3c70188c3cbd2a9476.pdf'}


In [None]:
from typing import Any

# PDFs to benchmark; each one is read from the local "pdf/" directory.
# Every entry needs its own trailing comma: adjacent string literals are
# concatenated by Python into a single (non-existent) file name.
files = [
    'My-Agreements-in-4i-Tip-Sheet_508-1.pdf',
    '2024SalesPresentationC6501-PPOs.pdf',
    'CMS_AI_Playbook_3_Final.pdf',
]

# One dict per file: {"file_name": ..., "markdown": [...], "force_ocr": [...], "plain_text": [...]}
output_data: list[dict[str, Any]] = []

# process_file writes the parsed documents into this directory.
output_dir = "output"
os.makedirs(output_dir, exist_ok=True)

# (results key, result_type, force_ocr, plain_text) for each benchmarked mode.
modes = [
    ("markdown", "markdown", False, False),   # Markdown Extraction
    ("force_ocr", "markdown", True, False),   # Force OCR
    ("plain_text", "text", False, True),      # Plain Text
]

for filename in files:
    results: dict[str, Any] = {"file_name": filename}

    content = get_file_content(filename)
    file = {
        'file': (filename, content, 'application/pdf')
    }

    for key, result_type, force_ocr, plain_text in modes:
        results[key] = process_file(file, result_type, force_ocr=force_ocr, plain_text=plain_text)

    output_data.append(results)


In [None]:

def _ordinal(n: int) -> str:
    """Return ``n`` with its English ordinal suffix (1st, 2nd, 3rd, 4th, 11th, ...)."""
    if 10 <= n % 100 <= 20:
        suffix = "th"
    else:
        suffix = {1: "st", 2: "nd", 3: "rd"}.get(n % 10, "th")
    return f"{n}{suffix}"


# Writes one Markdown table per benchmarked file: a row per attempt plus an average row.
# NOTE(review): the report contains timings although the file is named
# "memory_usage.md"; the name is kept in case something else reads it.
with open("memory_usage.md", "w", encoding="utf-8") as f:
    for data in output_data:

        markdown_times = data.get("markdown", [])
        force_ocr_times = data.get("force_ocr", [])
        plain_text_times = data.get("plain_text", [])
        columns = [markdown_times, force_ocr_times, plain_text_times]

        max_attempts = max(len(column) for column in columns)

        f.write(f"`File name`:  {data['file_name']}\n\n")
        f.write("| Metric         | Markdown Extraction   |     Force OCR       |    Plain Text       |\n")
        f.write("|----------------|-----------------------|---------------------|---------------------|\n")

        for i in range(max_attempts):
            # A mode with fewer attempts than the longest one gets "N/A"
            # instead of raising IndexError.
            md_time_str, ocr_time_str, pt_time_str = [
                get_time_str(column[i]) if i < len(column) else "N/A"
                for column in columns
            ]

            f.write(f"| {_ordinal(i + 1)} Attempt | {md_time_str} | {ocr_time_str} | {pt_time_str} |\n")

        # Averaging an empty list would divide by zero, so guard it here.
        avg_md, avg_ocr, avg_pt = [
            get_average_str(column) if column else "N/A"
            for column in columns
        ]

        f.write("|                |                       |                     |                     |\n")
        f.write(f"|    Avg Time    | {avg_md} | {avg_ocr} | {avg_pt} |\n\n\n")
        f.write("---\n\n")
