In [1]:
from dotenv import dotenv_values

# Name of the .env file that holds the service settings used by this notebook
env_name = "llm_pgvector.env" # change to your own .env file name
# Load the settings from that file; later cells read individual values via config["<KEY>"]
config = dotenv_values(env_name)

# Extract data and context

In [7]:
# code to extract text from pdf 
"""
This code sample shows Prebuilt Document operations with the Azure Form Recognizer client library. 
The async versions of the samples require Python 3.6 or later.

To learn more, please visit the documentation - Quickstart: Form Recognizer Python client library SDKs
https://docs.microsoft.com/en-us/azure/applied-ai-services/form-recognizer/quickstarts/try-v3-python-sdk
"""

from azure.core.credentials import AzureKeyCredential
from azure.ai.formrecognizer import DocumentAnalysisClient

"""
Remember to remove the key from your code when you're done, and never post it publicly. For production, use
secure methods to store and access your credentials. For more information, see 
https://docs.microsoft.com/en-us/azure/cognitive-services/cognitive-services-security?tabs=command-line%2Ccsharp#environment-variables-and-application-configuration
"""

# Fail early, with a readable message, when a required setting is absent or
# empty instead of surfacing a bare KeyError / an opaque SDK error later on.
_required_settings = ("AZURE_FORM_RECOGNIZER_ENDPOINT", "AZURE_FORM_RECOGNIZER_KEY")
_missing_settings = [name for name in _required_settings if not config.get(name)]
if _missing_settings:
    raise KeyError(
        "Missing setting(s) in the .env file: " + ", ".join(_missing_settings)
    )

endpoint = config["AZURE_FORM_RECOGNIZER_ENDPOINT"]
key = config["AZURE_FORM_RECOGNIZER_KEY"]

# Client that analyze_pdf uses to submit documents for analysis
document_analysis_client = DocumentAnalysisClient(
    endpoint=endpoint, credential=AzureKeyCredential(key)
)


def analyze_pdf(doc_path, filter_name=None):
    """Analyze a local file with the "prebuilt-document" model and return the result.

    Parameters
    ----------
    doc_path : str
        Path of the file to open (in binary mode) and submit for analysis.
    filter_name : str, optional
        Text to look for inside extracted key names. Every key/value pair
        whose key contains this text is printed. When omitted, the
        notebook-level variable ``filter_id1_name`` is used if it has been
        defined; if it has not, nothing is printed (previously this raised
        ``NameError`` on a fresh kernel).

    Returns
    -------
    The object returned by ``poller.result()``.
    """
    if filter_name is None:
        # Backward compatible with the original reliance on a global variable.
        filter_name = globals().get("filter_id1_name")

    with open(doc_path, "rb") as f:
        poller = document_analysis_client.begin_analyze_document(
            "prebuilt-document", document=f
        )
    result = poller.result()

    if filter_name:
        for kv_pair in result.key_value_pairs or []:
            # A pair may come back without a key or without a value; guard both
            # instead of failing with AttributeError on None.
            key_text = (kv_pair.key.content if kv_pair.key else "") or ""
            if filter_name not in key_text:
                continue
            value_text = kv_pair.value.content if kv_pair.value else None
            print(30*"*")
            print("filter_id1_val:")
            print(":\n")
            print(value_text)
            print(30*"*")

    return result


In [15]:
import re
# Extract values that are explicitly mentioned in a text. You may adapt the template to your specific use case.
# Labels that introduce the values to extract from the text
extract_name1 = "Ticker"
extract_name2 = "Quarter"


def extract_values(text, target =["FY"]):
    '''
    Extract the values that follow the ``extract_name1`` and ``extract_name2``
    labels in ``text``.

    Parameters
    ----------
    text : str
        Text to search.
    target : list
        Currently unused; kept so existing calls keep working.

    Returns
    -------
    tuple or None
        ``(values1, value2)`` where ``values1`` is the comma-separated list
        found after ``extract_name1`` and ``value2`` is the first word found
        after ``extract_name2``. ``None`` when either label is not found.
    '''
    # The patterns must be f-strings so the label text is interpolated;
    # re.escape keeps any regex metacharacters in a label literal.
    name1_pattern = rf"{re.escape(extract_name1)}\s+([\w,]+)"
    name2_pattern = rf"{re.escape(extract_name2)}\s+([\w\s]+)"

    name1_match = re.search(name1_pattern, text)
    name2_match = re.search(name2_pattern, text)
    if not (name1_match and name2_match):
        return None

    # The second group may capture only whitespace, which yields no words.
    name2_words = name2_match.group(1).split()
    if not name2_words:
        return None

    name1_values = name1_match.group(1).split(',')  # Split by commas to get a list
    return name1_values, name2_words[0]  # Only the first word after the label


In [26]:
import re
# extract information from the filename
def extract_info_from_filename(filename):
    """Pull the symbol, fiscal year and fiscal quarter out of a file name.

    The name is searched for ``<SYMBOL>TranscriptFY<YY>Q<Q>`` where SYMBOL is
    one or more upper-case letters, YY is two digits and Q is one digit.

    Returns a ``(symbol, fiscal_year, fiscal_quarter)`` tuple of strings, or
    ``None`` when the name does not contain that pattern.
    """
    found = re.search(r'([A-Z]+)TranscriptFY(\d{2})Q(\d)', filename)
    if found is None:
        return None
    ticker, year, quarter = found.groups()
    return ticker, year, quarter

# example
filename = "MSFTTranscriptFY23Q4"
# extract_info_from_filename returns None when the name does not match, so
# check the result before unpacking; unpacking None raises TypeError and
# would make the "unable to extract" branch unreachable.
file_info = extract_info_from_filename(filename)

if file_info:
    symbol, fiscal_year, fiscal_quarter = file_info
    print(f"Symbol: {symbol}")
    print(f"Fiscal Year: {fiscal_year}")
    print(f"Fiscal Quarter: {fiscal_quarter}")
else:
    print("Unable to extract information from the filename.")


Symbol: MSFT
Fiscal Year: 23
Fiscal Quarter: 4


In [18]:
def create_line_page_tuples(result):
    '''
    Flatten an analysis result into one entry per text line.

    Input: object exposing ``pages``, each page exposing ``lines`` whose items
    have a ``content`` attribute (as returned by analyze_pdf).
    Output: list of ``(line, page_num, line_num)`` tuples, where both numbers
    are 1-based and ``line_num`` restarts on every page.
    '''
    return [
        (entry.content, page_idx + 1, line_idx + 1)
        for page_idx, page in enumerate(result.pages)
        for line_idx, entry in enumerate(page.lines)
    ]


In [19]:
def chunk_with_page_number(line_page_tuples, chunk_length = 10, chunk_overlap = 2):
    '''
    Group consecutive lines into overlapping text chunks.

    Parameters
    ----------
    line_page_tuples : list of (line, page_num, line_num) tuples
    chunk_length : int
        Maximum number of lines in each chunk; must be positive.
    chunk_overlap : int
        Number of lines shared between consecutive chunks; must satisfy
        ``0 <= chunk_overlap < chunk_length``.

    Returns
    -------
    list of (chunk_text, page_number, line_number) tuples, where the text is
    the chunk's lines each followed by a single space, and the page/line
    numbers are those of the first line in the chunk.

    Raises
    ------
    ValueError
        If the parameters would prevent the window from advancing
        (previously this caused an endless loop) or if the overlap is negative.
    '''
    if chunk_length <= 0:
        raise ValueError("chunk_length must be a positive integer")
    if not 0 <= chunk_overlap < chunk_length:
        raise ValueError("chunk_overlap must satisfy 0 <= chunk_overlap < chunk_length")

    step = chunk_length - chunk_overlap
    total_lines = len(line_page_tuples)
    chunks = []
    start = 0
    while start < total_lines:
        window = line_page_tuples[start:start + chunk_length]
        # Page and line number of the first line in the chunk
        _, page_number, line_number = window[0]
        chunk_text = "".join(line + " " for line, _, _ in window)
        chunks.append((chunk_text, page_number, line_number))
        # Stop once the window has reached the last line; otherwise a final
        # chunk made only of already-seen overlap lines would be emitted.
        if start + chunk_length >= total_lines:
            break
        start += step
    return chunks


#### Read pdf and extract symbol and quarter from file name 



In [27]:
import os
# os.path.join avoids the invalid "\D" escape sequence and the Windows-only
# separator of the literal "..\DATA".
doc_dir = os.path.join("..", "DATA")
pdf_files = [filename for filename in os.listdir(doc_dir) if filename.endswith('.pdf')]
for file in pdf_files:
    # The helper returns None for names that do not follow the expected
    # pattern; report those instead of failing on tuple unpacking.
    file_info = extract_info_from_filename(file)
    if file_info is None:
        print(f"Unable to extract information from the filename: {file}")
        continue
    symbol, fiscal_year, fiscal_quarter = file_info
    print(symbol, fiscal_year, fiscal_quarter)

MSFT 23 1
MSFT 23 2
MSFT 23 3
MSFT 23 4


In [28]:
import os
import pandas as pd 
# os.path.join avoids the invalid "\D" escape sequence and the Windows-only
# separator of the literal "..\DATA".
doc_dir = os.path.join("..", "DATA")
chunks_dir = "../AnalyzedPDF/Chunks"
os.makedirs(chunks_dir, exist_ok=True)

# os.listdir returns names in arbitrary order; sorting keeps the
# file -> DocId assignment the same from one run to the next.
pdf_files = sorted(filename for filename in os.listdir(doc_dir) if filename.endswith('.pdf'))
DocId = 0
for file in pdf_files:
    DocId += 1
    chunks_csv_path = os.path.join(chunks_dir, 'DocId_' + str(DocId) + ".csv")

    # Check the file that is actually written (DocId_<n>.csv) and do it before
    # calling the analysis service, so an interrupted run can resume without
    # re-analyzing documents that were already processed.
    if os.path.exists(chunks_csv_path):
        print(f'File: {chunks_csv_path} already exists, skipping...')
        continue

    values = extract_info_from_filename(file)
    file_path = os.path.join(doc_dir, file)
    # analyze the pdf using form recognizer
    result = analyze_pdf(file_path)
    # get the chunks in a tuple of the form (chunk, page_number, line_number)
    line_page_tuples = create_line_page_tuples(result)
    chunks = chunk_with_page_number(line_page_tuples = line_page_tuples, chunk_length = 10, chunk_overlap = 2)

    # build the per-document table of chunks
    df_chunks = pd.DataFrame(chunks, columns = ['chunk', 'page_number', 'line_number'])
    df_chunks['DocId'] = DocId
    if values:
        symbol, fiscal_year, fiscal_quarter = values
        df_chunks["Ticker"] = symbol
        df_chunks["Year"] = fiscal_year
        df_chunks["Quarter"] = fiscal_quarter
    else:
        # file name did not match the expected pattern
        df_chunks["Ticker"] = "NULL"
        df_chunks["Year"] = "NULL"
        df_chunks["Quarter"] = "NULL"

    print('writing the results of: \n' + file)
    df_chunks.to_csv(chunks_csv_path, index=False)
    

    
    

KeyboardInterrupt: 

In [None]:
# read the csv files in the AnalyzedPDF folder and create a dataframe
import pandas as pd
import os 
def read_csv_files(path= "../AnalyzedPDF"):
    """Read every ``.csv`` file directly inside ``path`` into one DataFrame.

    Parameters
    ----------
    path : str
        Folder to scan. Sub-folders and non-CSV files are ignored.

    Returns
    -------
    pandas.DataFrame
        Rows of all CSV files stacked in file-name order with a fresh
        0..n-1 index; an empty DataFrame when the folder has no CSV files.
    """
    frames = []
    for file in sorted(os.listdir(path)):
        file_path = os.path.join(path, file)
        # Skip sub-folders (e.g. "Chunks") and anything that is not a CSV.
        if not (file.endswith(".csv") and os.path.isfile(file_path)):
            continue
        frames.append(pd.read_csv(file_path))

    if not frames:
        return pd.DataFrame()
    # pd.concat expects a list of frames; concatenating once avoids both the
    # TypeError and the loss of earlier files caused by reassigning in the loop.
    return pd.concat(frames, ignore_index=True, axis = 0)

In [None]:
import os 
import pandas as pd 
def concatenate_csv_files(path = "../AnalyzedPDF"):
    """Combine all ``.csv`` files found directly in ``path`` into one DataFrame.

    Parameters
    ----------
    path : str
        Folder to scan for files whose name ends with ".csv".

    Returns
    -------
    pandas.DataFrame
        Rows of every CSV stacked in file-name order with a fresh 0..n-1
        index; an empty DataFrame when no CSV file is present (pd.concat
        would otherwise raise on an empty list).
    """
    # Sorted so the row order (and any Id derived from it) is reproducible.
    csv_files = sorted(file for file in os.listdir(path) if file.endswith(".csv"))
    dfs = [pd.read_csv(os.path.join(path, file)) for file in csv_files]
    # Report the file names, not the full contents of every DataFrame.
    print("Concatenated files:", csv_files)
    if not dfs:
        return pd.DataFrame()
    combined_df = pd.concat(dfs, ignore_index=True)
    return combined_df

Let's combine all the CSV files for filter_id and for the chunks, and store each set in its own combined CSV file.

In [None]:
# NOTE(review): filter_id1_name is not defined in any cell visible here, and no
# visible cell writes CSV files to this folder - confirm where both come from.
folder_path = f"../AnalyzedPDF/{filter_id1_name}"
result_filter_id1_df = concatenate_csv_files(folder_path)
print(result_filter_id1_df)
# exist_ok=True replaces the separate existence check, so there is no gap
# between checking for the folder and creating it.
os.makedirs("../AnalyzedPDF/CombinedResults", exist_ok=True)
    

In [None]:
# Add an "Id" column computed as index + 1, move it to the front, rename the columns, and preview the result.
result_filter_id1_df["Id"] = result_filter_id1_df.index +1 

# Reorder the columns so that 'Id' comes first, keeping the others in their existing order
columns = ['Id'] + [col for col in result_filter_id1_df.columns if col != 'Id']
result_filter_id1_df = result_filter_id1_df.reindex(columns=columns)
# Rename for consistency. Names are assigned by position, so this requires the
# frame to have exactly four columns at this point.
# NOTE(review): filter_id1_name and filter_id2_name are not defined in any cell
# visible here - confirm they are set before this cell runs.
result_filter_id1_df.columns = ['Id', 'DocId', filter_id1_name, filter_id2_name]
result_filter_id1_df.head(1000)

In [None]:
# Save the frame to the CombinedResults folder, in a CSV named after filter_id1_name, without the index column
result_filter_id1_df.to_csv(f"../AnalyzedPDF/CombinedResults/{filter_id1_name}.csv", index=False)

In [None]:
folder_path = "../AnalyzedPDF/Chunks"
result_chunk_df = concatenate_csv_files(folder_path)
print(result_chunk_df)
# add primary key
result_chunk_df["Id"] = result_chunk_df.index +1
# make Id the first column as it will be used as primary key
columns = ['Id'] + [col for col in result_chunk_df.columns if col != 'Id']
result_chunk_df = result_chunk_df.reindex(columns=columns)
# Map from the per-document CSV column names to the names used in the combined
# file. Keys that are not present in the frame are ignored by rename.
new_columns = {
    'Id': "Id",
    'chunk': 'Chunk',
    'Embedding': 'Embedding',
    'page_number': 'PageNumber',
    # The per-document CSVs name this column 'DocId'; the previous key 'DocID'
    # never matched any column. The column name itself is left unchanged.
    'DocId': 'DocId',
    'line_number': 'LineNumber',
}
result_chunk_df = result_chunk_df.rename(columns=new_columns)
# Create the output folder here as well, so this cell does not depend on an
# earlier cell having created it.
os.makedirs("../AnalyzedPDF/CombinedResults", exist_ok=True)
result_chunk_df.to_csv("../AnalyzedPDF/CombinedResults/Chunks.csv", index=False)

In [None]:
# Display the combined chunks DataFrame as the cell output
result_chunk_df

In [None]:
# Display the combined filter-id DataFrame as the cell output
result_filter_id1_df