In [2]:
### local DB setup and load
import io
import json
import os
import re

import fitz
import pandas as pd
import pymupdf
import pytesseract
import yaml
from dotenv import load_dotenv
from PIL import Image

from langchain_community.document_loaders import PyPDFLoader
from langchain_text_splitters import RecursiveCharacterTextSplitter
from langchain_core.prompts import ChatPromptTemplate
from langchain_openai import AzureChatOpenAI

# Load the .env file first so it can also supply TESSERACT_CMD below.
# load_dotenv() returns True when the file was found and parsed.
print(load_dotenv("../chatbot_poc_playground/.env"))

# Location of the Tesseract binary used by pytesseract. Set the TESSERACT_CMD
# environment variable to override it; the fallback is the original
# machine-specific install path.
pytesseract.pytesseract.tesseract_cmd = os.getenv(
    "TESSERACT_CMD",
    r"C:\\Users\\yifyun01\AppData\\Local\Programs\\Tesseract-OCR\\tesseract.exe",
)

True


### Utility Functions

In [3]:
def load_and_chunk_one_pdf(file_path, ocr_flag=True, chunk_size=6000, chunk_overlap=300):
    """Load a single PDF and return its content as a list of chunks.

    Args:
        file_path: Path of the PDF file to load.
        ocr_flag: When True, the file is processed with ``load_pdf_ocr_file``
            (one OCR text string per page). When False, the file is loaded with
            ``load_pdf_file`` and split with ``split_pdf_pages_into_chunks``.
        chunk_size: Chunk size forwarded to the splitter (non-OCR path only).
        chunk_overlap: Chunk overlap forwarded to the splitter (non-OCR path only).

    Returns:
        A new list of chunks. Note the element type differs by path: OCR text
        strings when ``ocr_flag`` is True, the splitter's document objects
        otherwise.
    """
    if ocr_flag:
        # OCR path: pages are not split further, each page is one chunk.
        return list(load_pdf_ocr_file(file_path))

    pages = load_pdf_file(file_path)
    return list(
        split_pdf_pages_into_chunks(
            pdf_pages=pages,
            chunk_size=chunk_size,
            chunk_overlap=chunk_overlap,
        )
    )



def load_and_chunk_pdfs(pdf_dir, ocr_flag=True):
    """Load every PDF in a directory and return all of their chunks in one list.

    Args:
        pdf_dir: Directory to scan (not recursive).
        ocr_flag: Forwarded to ``load_and_chunk_one_pdf`` for each file.

    Returns:
        A flat list with the chunks of every ``.pdf`` file, in file-name order.
        Empty when the directory contains no PDF files.
    """
    all_chunks = []

    # os.listdir() order is arbitrary; sorting keeps the chunk order reproducible.
    for filename in sorted(os.listdir(pdf_dir)):
        # Compare the real extension so names that merely end in "pdf" are skipped.
        if os.path.splitext(filename)[1].lower() != ".pdf":
            continue

        print(f"Loading file: {filename}")
        file_path = os.path.join(pdf_dir, filename)
        # Delegate to the single-file loader instead of duplicating its branches.
        all_chunks.extend(load_and_chunk_one_pdf(file_path, ocr_flag=ocr_flag))

    return all_chunks



def load_pdf_ocr_file(file_path):
    """OCR every page of a PDF and return the recognised text, one string per page.

    Each page is rendered to a 300 dpi PNG image, opened with PIL and passed to
    ``pytesseract.image_to_string``.

    Args:
        file_path: Path of the PDF file to open with ``pymupdf``.

    Returns:
        A list with one OCR text string per page, in page order.
    """
    page_texts = []

    # The context managers release the document and each image even when
    # rendering or OCR raises.
    with pymupdf.open(file_path) as doc:
        for page in doc:
            # Render page to a pixmap (image)
            pix = page.get_pixmap(dpi=300)
            png_bytes = pix.tobytes("png")
            with Image.open(io.BytesIO(png_bytes)) as image:
                page_texts.append(pytesseract.image_to_string(image))

    return page_texts


def load_pdf_file(file_path):
    """Load a PDF through ``PyPDFLoader`` and return the result of its ``load()``."""
    return PyPDFLoader(file_path).load()


def load_text_file(file_path):
    """Read a UTF-8 text file and return its content without surrounding whitespace."""
    with open(file_path, mode='r', encoding='utf-8') as handle:
        # The whole file is read in one go, then trimmed at both ends.
        return handle.read().strip()


def split_text_into_chunks(text, chunk_size=5000, chunk_overlap=300):
    """Split a string with ``RecursiveCharacterTextSplitter.split_text``.

    Args:
        text: The text to split.
        chunk_size: ``chunk_size`` given to the splitter.
        chunk_overlap: ``chunk_overlap`` given to the splitter.

    Returns:
        Whatever ``split_text`` returns for ``text``.
    """
    return RecursiveCharacterTextSplitter(
        chunk_overlap=chunk_overlap,
        chunk_size=chunk_size,
    ).split_text(text)


def split_pdf_pages_into_chunks(pdf_pages, chunk_size=5000, chunk_overlap=300):
    """Split loaded PDF pages with ``RecursiveCharacterTextSplitter.split_documents``.

    Args:
        pdf_pages: The pages to split (as produced by ``load_pdf_file``).
        chunk_size: ``chunk_size`` given to the splitter.
        chunk_overlap: ``chunk_overlap`` given to the splitter.

    Returns:
        Whatever ``split_documents`` returns for ``pdf_pages``.
    """
    page_splitter = RecursiveCharacterTextSplitter(
        chunk_overlap=chunk_overlap,
        chunk_size=chunk_size,
    )
    return page_splitter.split_documents(pdf_pages)


def _structural_chars(text, start):
    """Yield (index, char) for characters from ``start`` that are outside double-quoted strings."""
    in_string = False
    escaped = False
    for i in range(start, len(text)):
        ch = text[i]
        if in_string:
            if escaped:
                escaped = False
            elif ch == '\\':
                escaped = True
            elif ch == '"':
                in_string = False
        elif ch == '"':
            in_string = True
        else:
            yield i, ch


def _find_array_end(text, start):
    """Return the index of the ']' closing the '[' at ``start``, or -1 if it is never closed."""
    depth = 0
    for i, ch in _structural_chars(text, start):
        if ch == '[':
            depth += 1
        elif ch == ']':
            depth -= 1
            if depth == 0:
                return i
    return -1


def _collect_complete_objects(text, start):
    """Return the source text of every complete top-level '{...}' found from ``start`` on."""
    objects = []
    depth = 0
    obj_start = -1
    for i, ch in _structural_chars(text, start):
        if ch == '{':
            if depth == 0:
                obj_start = i
            depth += 1
        elif ch == '}' and depth > 0:  # ignore a stray '}' with no opener
            depth -= 1
            if depth == 0:
                objects.append(text[obj_start:i + 1])
    return objects


def _parse_or_repair(json_str, parse_fail_msg, repair_fail_msg):
    """Parse ``json_str``; on failure retry once after common repairs. Returns None if both fail."""
    try:
        return json.loads(json_str)
    except ValueError:
        print(parse_fail_msg)
        print("Trying to fix common formatting issues...")

    # Quote bare keys, but only directly after '{' or ',' so that colons
    # inside values (times, URLs, ...) are left alone.
    fixed_json = re.sub(r'([{,]\s*)(\w+)(\s*:)', r'\1"\2"\3', json_str)
    # Fix trailing commas
    fixed_json = re.sub(r',(\s*[\]}])', r'\1', fixed_json)

    try:
        return json.loads(fixed_json)
    except ValueError:
        print(repair_fail_msg)
        return None


def extract_json_from_text(text):
    """
    Extract JSON array from text that might contain additional content.

    Steps, in order:
      1. If the text contains a ``` fenced block, only its content is used.
      2. The text is parsed directly with ``json.loads``.
      3. Otherwise the first '[' and its matching ']' are located (brackets
         inside double-quoted strings are ignored) and that span is parsed,
         with one repair attempt for unquoted keys and trailing commas.
      4. If the array is never closed (e.g. truncated output), every complete
         top-level object after the '[' is collected and parsed as an array.

    Args:
        text: Text that may contain JSON

    Returns:
        The parsed JSON if found, None otherwise (including when ``text`` is
        not a string).
    """
    if not isinstance(text, str):
        return None

    # First, check if the text is wrapped in code blocks with triple backticks
    code_block_pattern = r'```(?:json)?\s*([\s\S]*?)```'
    code_match = re.search(code_block_pattern, text)
    if code_match:
        text = code_match.group(1).strip()
        print("Found JSON in code block, extracting content...")

    try:
        # Try direct parsing in case the response is already clean JSON
        return json.loads(text)
    except ValueError:
        pass

    # Look for opening and closing brackets of a JSON array
    start_idx = text.find('[')
    if start_idx == -1:
        print("No JSON array start found in text")
        return None

    end_idx = _find_array_end(text, start_idx)

    if end_idx != -1:
        # Handle complete JSON array
        result = _parse_or_repair(
            text[start_idx:end_idx + 1],
            "Found JSON-like structure but couldn't parse it.",
            "Could not fix JSON format issues",
        )
        if result is not None:
            return result
    else:
        # Handle incomplete JSON - keep only the objects that were fully emitted
        print("Found incomplete JSON array, attempting to complete it...")
        objects = _collect_complete_objects(text, start_idx + 1)
        if objects:
            reconstructed_json = "[\n" + ",\n".join(objects) + "\n]"
            result = _parse_or_repair(
                reconstructed_json,
                "Couldn't parse reconstructed JSON array.",
                "Could not fix JSON format issues in reconstructed array",
            )
            if result is not None:
                return result

    print("No complete JSON array could be extracted")
    return None


### LLM Chain

In [4]:
### Setup Azure OpenAI model instance
# Endpoint and API version are read from environment variables; the deployment
# name, temperature and token limit are fixed here.
llm = AzureChatOpenAI(
    azure_endpoint=os.getenv("AZURE_OPENAI_ENDPOINT"),
    openai_api_version=os.getenv("AZURE_OPENAI_API_VERSION"),
    azure_deployment="gpt-4",
    max_tokens=500,
    temperature=0,
)

# Quick smoke test of the connection: send one prompt and print the reply text.
print(llm.invoke("tell me a joke").content)

Why don't skeletons fight each other? They don't have the guts.


In [6]:
### Load data

# Load one pdf to test prompt
pdf_path = "./contracts/LinkedIn.pdf"
# Called with the default ocr_flag=True, so the OCR loader is used and
# all_chunks holds one text string per page.
all_chunks = load_and_chunk_one_pdf(pdf_path)
# Only the first five chunks are concatenated (no separator) as prompt input.
input_text_for_entity_extract = "".join(all_chunks[:5])
# Alias of the same list object as all_chunks, not a copy.
input_text_all = all_chunks
# print(input_text_all)

In [11]:
from prompts_test import *
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import JsonOutputParser
from langchain_core.runnables import RunnableMap
from langchain.schema import SystemMessage, HumanMessage


# Entity extraction: shared system prompt + V2 user prompt -> LLM -> JSON parser.
prompt_entity_extraction = ChatPromptTemplate.from_messages(
    [("system", MAIN_SYSTEM_PROMPT), ("user", MAIN_USER_PROMPT_V2)]
)
chain_entity_extraction = prompt_entity_extraction | llm | JsonOutputParser()

# High-risk clauses: same system prompt with the high-risk-clause user prompt.
prompt_high_risk_clauses = ChatPromptTemplate.from_messages(
    [("system", MAIN_SYSTEM_PROMPT), ("user", MAIN_USER_PROMPT_HIGH_RISK_CLAUSE_V2)]
)
chain_high_risk_clauses = prompt_high_risk_clauses | llm | JsonOutputParser()


In [24]:
# Run the entity-extraction chain on the concatenated contract text; the chain
# ends in JsonOutputParser, so `response` is the parsed JSON, not raw text.
response = chain_entity_extraction.invoke({"input_text": input_text_for_entity_extract })
# Bare last expression: displays the parsed result as the cell output.
response

{'primaryEntity': {'id': 'pe-001',
  'name': 'Robert Half Inc.',
  'description': 'Parent company',
  'counterparties': [{'id': 'cp-001',
    'name': 'LinkedIn Corporation',
    'description': 'Professional networking platform',
    'contractDates': {'effectiveDate': 'N/A', 'endDate': 'N/A'},
    'services': [{'id': 'svc-001',
      'name': 'Premium Services',
      'description': 'Services specified in Order Forms for recruiting and contacting prospective clients'}]}]}}

In [25]:
# Persist the parsed entity-extraction result. The output folder is created
# first so a fresh checkout does not fail with FileNotFoundError.
os.makedirs("./output", exist_ok=True)
with open("./output/linkedin_entity.json", "w", encoding="utf-8") as f:
    json.dump(response, f, indent=4)

In [46]:
# NOTE(review): the prompt and chain below are assigned with the same
# definitions in an earlier cell of this notebook; this cell rebuilds them
# before invoking.
prompt_high_risk_clauses = ChatPromptTemplate.from_messages([
    ("system", MAIN_SYSTEM_PROMPT),
    ("user", MAIN_USER_PROMPT_HIGH_RISK_CLAUSE_V2)
])

chain_high_risk_clauses = prompt_high_risk_clauses | llm | JsonOutputParser()

# The input is the same text variable used for entity extraction, i.e. only
# the first five chunks of the contract — TODO confirm that is intended here.
# `response` is overwritten with the parsed high-risk-clause result.
response = chain_high_risk_clauses.invoke({"input_text": input_text_for_entity_extract })
response

{'high-risk clauses': ['LinkedIn reserves the right to modify the User Agreement at any time for any reason. Notice of any change will be sent to the Customer in writing and made available via the Linkedin website. If the Parties agree that such modifications materially degrade the ability of the Customer to use the Premium Services, Customer may terminate the applicable Order Form within thirty (30) days after such change and receive a refund of any unused Fee for Premium Services.',
  "Customer agrees to pay all undisputed fees for the Premium Services included in the applicable Order Form ('Service Fees'), even if Users do not activate or use the Premium Services.",
  'Customer will not allow the Premium Services to be used as a service bureau for third parties (i.e. Customer will not provide or sell access to the Premium Services to third parties).',
  'Linkedin reserves the right to suspend the Premium Services for an individual Customer User account with or without notice to Cust

In [47]:
# Persist the parsed high-risk-clause result. The output folder is created
# first so a fresh checkout does not fail with FileNotFoundError.
os.makedirs("./output", exist_ok=True)
with open("./output/linkedin_high_risk_clauses.json", "w", encoding="utf-8") as f:
    json.dump(response, f, indent=4)

In [12]:
def extract_text_from_pdf(pdf_stream):
    """Return the text of all pages of an in-memory PDF, concatenated in page order."""
    with pymupdf.open(stream=pdf_stream, filetype="pdf") as pdf:
        page_texts = [page.get_text() for page in pdf]
    return "".join(page_texts)


# Shown in the UI when a button is clicked without a file being selected.
_NO_FILE_MESSAGE = "Please select a PDF file first."


def _run_prompt_on_contract(selected_file, prompt, max_chunks):
    """Load one contract PDF, feed its first ``max_chunks`` chunks to the LLM, return the raw reply text."""
    all_chunks = load_and_chunk_one_pdf(f"./contracts/{selected_file}")
    contract_text = "".join(all_chunks[:max_chunks])

    resp = llm.invoke(prompt.format_prompt(input_text=contract_text))
    return resp.content


def generate_entity(selected_file):
    """Return the LLM's entity-extraction answer for a contract.

    Args:
        selected_file: File name inside ``./contracts``. A falsy value
            (``None`` or ``""``) means nothing was selected.

    Returns:
        The raw text content of the LLM reply (built from the first 5 chunks),
        or a short hint when no file was selected.
    """
    # Without this guard a missing selection would try to open "./contracts/None".
    if not selected_file:
        return _NO_FILE_MESSAGE
    return _run_prompt_on_contract(selected_file, prompt_entity_extraction, max_chunks=5)



def generate_clauses(selected_file):
    """Return the LLM's high-risk-clause answer for a contract.

    Args:
        selected_file: File name inside ``./contracts``. A falsy value
            (``None`` or ``""``) means nothing was selected.

    Returns:
        The raw text content of the LLM reply (built from the first 10 chunks),
        or a short hint when no file was selected.
    """
    if not selected_file:
        return _NO_FILE_MESSAGE
    return _run_prompt_on_contract(selected_file, prompt_high_risk_clauses, max_chunks=10)

In [13]:
#===============#
# Create the UI #
#===============#
import gradio as gr

# The dropdown offers every file in ./contracts whose name ends in "pdf"
# (case-insensitive). The list is built once, when this cell runs.
pdf_dir = "./contracts"
file_list = [file_name for file_name in os.listdir(pdf_dir) if file_name.lower().endswith("pdf")]

with gr.Blocks() as demo:
    # gr.Markdown('### PDF Summarization AI')
    gr.HTML("<center><h1>Third-Party Contract AI</h1></center>")
    # One row holding the file selector and the two action buttons.
    with gr.Row():
         
        dropdown = gr.Dropdown(choices = file_list, label="Select a PDF File")
        entity_button = gr.Button("Extract key entities For Me", variant='primary')
        clauses_button = gr.Button("Extract high-risk clauses For Me", variant='primary')
    
    # One output text box per button.
    output_entity_box = gr.Textbox(label="Key Contract Entities", lines =10)
    output_clauses_box = gr.Textbox(label="High-risk Clauses", lines =10)

    # Each button passes the dropdown value to its generator function and
    # writes the returned text into the matching box.
    entity_button.click(fn=generate_entity, inputs=dropdown, outputs=output_entity_box)
    clauses_button.click(fn=generate_clauses, inputs=dropdown, outputs=output_clauses_box)


# Start the Gradio app.
demo.launch()

* Running on local URL:  http://127.0.0.1:7862
* To create a public link, set `share=True` in `launch()`.




### Test Prompt

In [None]:
# Extract entities
# Ad-hoc prompt test that bypasses the ChatPromptTemplate chains: the system
# prompt and user prompt are concatenated into one plain string.
# NOTE(review): MAIN_SYSTEM_PROMPT / MAIN_USER_PROMPT_V1 are not defined in
# this notebook; presumably they come from the `prompts_test` star import.
system_prompt = MAIN_SYSTEM_PROMPT
user_prompt = MAIN_USER_PROMPT_V1
# Append the contract text wrapped in a ``` fence.
user_prompt += f"```\n{input_text_for_entity_extract}```\n" 

# No separator is inserted between the system and user prompt text.
response = llm.invoke(system_prompt + user_prompt)
print(response.content)

In [None]:
# Extract high-risk clauses

pdf_path = "./contracts/UIPath.pdf"
# Load the chunks of pdf_path in this cell. Previously the load was commented
# out, so the loop silently processed whatever an earlier cell had left in
# `all_chunks` (a different contract).
all_chunks = load_and_chunk_one_pdf(pdf_path)
print(f"Total contract chunks: {len(all_chunks)}")

# Process each chunk
system_prompt = MAIN_SYSTEM_PROMPT
user_prompt = MAIN_USER_PROMPT_HIGH_RISK_CLAUSE_V1

# One raw LLM reply (string) per chunk that produced a non-empty answer.
all_results = []

for i, chunk in enumerate(all_chunks, start=1):
    print(f"Processing chunk {i}/{len(all_chunks)}")

    # Process the chunk with LLM; the chunk text is appended inside a ``` fence.
    chunk_response = llm.invoke(system_prompt + user_prompt + f"```\n{chunk}```\n")
    chunk_results = chunk_response.content

    if chunk_results:
        # Add to overall results
        all_results.append(chunk_results)
    else:
        print(f"Warning: Failed to extract high-risk clauses from chunk {i}")

print(f"\nCollected {len(all_results)} non-empty LLM responses from all chunks")

In [None]:
import ast


def _parse_llm_list(raw):
    """Parse one raw LLM reply: JSON first, then Python-literal syntax as a fallback.

    JSON is tried first because ``ast.literal_eval`` rejects JSON-only tokens
    such as ``true``, ``false`` and ``null``. Raises whatever
    ``ast.literal_eval`` raises when neither syntax matches.
    """
    try:
        return json.loads(raw)
    except ValueError:
        return ast.literal_eval(raw)  # safely parse string to list


# Flatten the per-chunk replies (each expected to be a list) into one list.
flattened_list = []
for s in all_results:
    try:
        parsed = _parse_llm_list(s)
    except Exception as e:
        print(f"Skipping one item due to parsing error: {e}")
        continue

    if isinstance(parsed, (list, tuple)):
        flattened_list.extend(parsed)
    else:
        # extend() on a str or dict would silently add characters or keys.
        print(f"Skipping one item: expected a list but got {type(parsed).__name__}")

# Output flattened list
for item in flattened_list:
    print(item)

In [None]:
# Wrap the flattened clause list under a single "high-risk clauses" key.
json_txt_clauses = {"high-risk clauses": flattened_list}

# Written to the current working directory (unlike the LinkedIn results,
# which go to ./output).
with open("high_risk_clauses.json", "w") as f:
    json.dump(json_txt_clauses, f, indent=4)

# df = pd.DataFrame(data=all_results, columns=['json_outpt'])
# df.to_excel("extraction.xlsx")
# print(df['json_outpt'].iloc[3])

### Parse 10K

In [None]:
import requests
import json

# Company info
ticker = "MSFT"  # Microsoft Corp.
cik_lookup_url = "https://www.sec.gov/files/company_tickers.json"

# Headers (SEC requires a User-Agent)
headers = {
    "User-Agent": "yifei.yun@protiviti.com"
}

# Step 1: download the ticker -> CIK lookup table.
# A timeout keeps the cell from hanging indefinitely, and raise_for_status()
# surfaces HTTP errors here instead of as a confusing JSON decode failure.
response = requests.get(cik_lookup_url, headers=headers, timeout=30)
response.raise_for_status()
ticker_data = response.json()

In [None]:
import os
from sec_edgar_downloader import Downloader
from bs4 import BeautifulSoup

# # Create a downloader instance
# dl = Downloader("Protiviti", "yifei.yun@protiviti.com", "./data")
# # Download the latest 10-K (1 means the most recent)
# ticker = "MSFT"
# dl.get("10-K", ticker, limit=1)


# Hard-coded path to the previously downloaded 10-K HTML file. No directory
# search happens here; the lookup below is commented out.
html_file = "./data/MSFT/10-K.html"
# filename = [f for f in os.listdir(folder_path) if f.endswith(".htm") or f.endswith(".html")][0]

# Extract text content
def html_to_clean_text(html_path):
    """Convert an HTML file into plain text with one non-empty, stripped line per row.

    The file is parsed with BeautifulSoup's "lxml" parser. Before the text is
    extracted, script/style/head/meta/noscript/footer/nav elements are removed,
    and so are all table rows and cells ("tr", "td"), so table content does not
    appear in the result.

    Args:
        html_path: Path of a UTF-8 encoded HTML file.

    Returns:
        The remaining text, lines joined with "\\n", blank lines dropped.
    """
    with open(html_path, "r", encoding="utf-8") as html_handle:
        soup = BeautifulSoup(html_handle, "lxml")

    # Drop the unwanted elements together with everything inside them.
    for element in soup(["script", "style", "head", "meta", "noscript", "footer", "nav", "tr", "td"]):
        element.decompose()

    raw_text = soup.get_text(separator="\n")

    # Strip every line, then keep only the ones that still have content.
    stripped_lines = (raw_line.strip() for raw_line in raw_text.splitlines())
    return "\n".join(kept for kept in stripped_lines if kept)

# Convert the downloaded 10-K HTML to plain text.
clean_text = html_to_clean_text(html_file)

# Save the cleaned text next to the source file as UTF-8.
with open("./data/MSFT/clean_10k_text.txt", "w", encoding="utf-8") as f:
    f.write(clean_text)