In [1]:
import os
import gc
import time
import logging
import warnings
from datetime import datetime
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor, as_completed
from operator import itemgetter

import requests
import pickle
import numpy as np
import pandas as pd
import bs4
import faiss  # For creating embeddings
import openai  # Main model
import PyPDF2  # For extracting text from PDFs
from PyPDF2 import PdfReader
from pypdf import PdfReader  # Ensure pypdf is installed
from dotenv import load_dotenv
from tqdm import tqdm

# SQLAlchemy imports
from sqlalchemy import create_engine, Column, Integer, String, Text, ForeignKey
from sqlalchemy.orm import sessionmaker, relationship, declarative_base
from sqlalchemy.exc import SQLAlchemyError

# FastAPI imports
from fastapi import FastAPI
from fastapi.responses import StreamingResponse, FileResponse
from fastapi.middleware.cors import CORSMiddleware

# LangChain imports
from langchain.embeddings.openai import OpenAIEmbeddings
from langchain.text_splitter import CharacterTextSplitter, RecursiveCharacterTextSplitter
from langchain.llms import OpenAI
from langchain.vectorstores import ElasticVectorSearch, Pinecone, Weaviate, FAISS
from langchain.chains.question_answering import load_qa_chain
from langchain.utils.math import cosine_similarity
from langchain.document_loaders import PyPDFLoader
from langchain.schema import Document
from langchain.chains import create_history_aware_retriever, create_retrieval_chain
from langchain.chains.combine_documents import create_stuff_documents_chain

# LangChain core and community imports
from langchain_core.messages import AIMessageChunk
from langchain_core.prompts import PromptTemplate, ChatPromptTemplate, MessagesPlaceholder
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnableLambda, RunnablePassthrough
from langchain_core.tracers.log_stream import LogEntry, LogStreamCallbackHandler
from langchain_core.chat_history import BaseChatMessageHistory
from langchain_community.chat_message_histories import ChatMessageHistory
from langchain_community.document_loaders import WebBaseLoader
from langchain_openai import ChatOpenAI, OpenAIEmbeddings
from langchain_text_splitters import RecursiveCharacterTextSplitter

# Ignore warnings
warnings.filterwarnings("ignore")

USER_AGENT environment variable not set, consider setting it to identify your requests.


In [2]:
# SECURITY: an API key used to be hard-coded on this line. Any key that has
# been stored in a notebook must be treated as leaked: revoke it with the
# provider and issue a new one. Secrets belong in the environment, never in
# source control.
#
# load_dotenv() copies variables from a local .env file (if one exists) into
# os.environ without overriding values that are already set.
load_dotenv()

if not os.environ.get("OPENAI_API_KEY"):
    # Fail here with a clear message instead of later inside the first API call.
    raise RuntimeError(
        "OPENAI_API_KEY is not set. Export it in your shell or put it in a "
        ".env file (kept out of version control) before running this notebook."
    )

In [3]:
# Chat model handle: gpt-3.5-turbo, temperature 0, streaming enabled.
llm = ChatOpenAI(
    model="gpt-3.5-turbo",
    temperature=0,
    streaming=True,
)

In [4]:
# !rm -rf "report_pdfs"

### Loading Data

In [17]:
# No import of `storage` is visible anywhere else in this notebook, so on a
# fresh kernel the next line raised NameError. Import it where it is used.
from google.cloud import storage

storage_client = storage.Client()

# Define your bucket name
bucket_name = 'barchart-aichatbot'
bucket = storage_client.bucket(bucket_name)

# Load the company list from the bucket. The download loop below reads the
# 'Ticker Symbol' column, so the CSV must contain it.
data = pd.read_csv(f"gs://{bucket_name}/Fortune_200.csv")

# Directory the downloaded PDFs are written to. exist_ok=True makes the cell
# safe to re-run and avoids the check-then-create race.
local_dir = 'report_pdfs'
os.makedirs(local_dir, exist_ok=True)

# Function to download a PDF file from GCS
def download_pdf_from_gcs(bucket, symbol, pdf_file, dest_dir=None):
    """Download one PDF blob from GCS and save it locally.

    Args:
        bucket: Bucket-like object. Only ``bucket.blob(name)`` is called on
            it, and the returned blob must provide
            ``download_to_filename(path)``.
        symbol: Ticker symbol the file belongs to (used only in the error
            message).
        pdf_file: Full blob name inside the bucket; its basename becomes the
            local file name.
        dest_dir: Directory to save into. Defaults to the notebook-level
            ``local_dir`` so existing three-argument calls keep working.

    Returns:
        The local file path on success, or ``None`` when the download failed.
        Failures are printed and swallowed on purpose so that one bad file
        does not abort a bulk download.
    """
    target_dir = local_dir if dest_dir is None else dest_dir

    # NOTE: only the basename is kept, so two blobs with the same file name
    # under different folders will overwrite each other locally.
    local_file_path = os.path.join(target_dir, os.path.basename(pdf_file))

    try:
        # Access the blob in the GCS bucket and stream it to disk
        blob = bucket.blob(pdf_file)
        blob.download_to_filename(local_file_path)
    except Exception as e:
        print(f"Error downloading {pdf_file} for {symbol}: {str(e)}")
        return None

    return local_file_path

# Walk every ticker and fetch each PDF stored under 'SEC Filings/<ticker>/'
for symbol in tqdm(data['Ticker Symbol'], desc="Downloading PDFs", total=len(data)):
    folder_prefix = f'SEC Filings/{symbol}/'

    for remote_file in storage_client.list_blobs(bucket_name, prefix=folder_prefix):
        # Anything that is not a .pdf object is skipped
        if not remote_file.name.endswith('.pdf'):
            continue
        download_pdf_from_gcs(bucket, symbol, remote_file.name)

print("All PDFs have been downloaded.")

Downloading PDFs: 100%|██████████| 200/200 [03:00<00:00,  1.11it/s]

All PDFs have been downloaded.





In [5]:
folder_path = 'report_pdfs'

# Count regular files only; sub-directories are not included in the total
file_count = sum(1 for entry in os.scandir(folder_path) if entry.is_file())
print(f'Total number of files: {file_count}')

Total number of files: 1581


### Extracting PDF Contents

In [5]:
# Set up logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(message)s')

local_dir = 'report_pdfs'

# os.listdir() returns names in arbitrary order. Sort them so the files are
# always handed to the extraction step in the same order, which keeps runs
# on different machines or filesystems comparable.
pdf_files = sorted(f for f in os.listdir(local_dir) if f.endswith('.pdf'))

def extract_pdf_text(pdf_file):
    """Extract the text of every page of one PDF stored in ``local_dir``.

    Args:
        pdf_file: File name (not a full path) of a PDF inside ``local_dir``.

    Returns:
        A list with one string per page, or ``None`` when the file could not
        be opened or parsed. Failures never raise, so a single bad PDF does
        not stop a bulk run; the file name and cause are logged instead.
    """
    try:
        file_path = os.path.join(local_dir, pdf_file)
        reader = PdfReader(file_path)

        # A page whose extraction yields nothing is stored as "" so that every
        # entry in the result is a string.
        return [page.extract_text() or "" for page in reader.pages]

    except Exception as exc:
        # Previously this was swallowed silently, which made it impossible to
        # tell which PDFs were missing from the corpus.
        logging.warning(f"Skipping {pdf_file}: text extraction failed ({exc})")
        return None

def process_pdfs_in_batches(pdf_files, num_workers):
    """Extract text from many PDFs in parallel worker processes.

    Args:
        pdf_files: File names to pass to ``extract_pdf_text``.
        num_workers: Number of processes in the pool.

    Returns:
        A flat list of page texts. Pages are emitted in the order of
        ``pdf_files`` rather than in completion order, so the result is the
        same on every run for the same input. Files that failed contribute
        no pages.
    """
    total_files = len(pdf_files)
    pages_per_file = [None] * total_files
    failed_files = 0

    with ProcessPoolExecutor(max_workers=num_workers) as executor:
        # Remember which input slot each future belongs to so results can be
        # put back in input order even though they finish out of order.
        origin_of = {
            executor.submit(extract_pdf_text, pdf_file): (slot, pdf_file)
            for slot, pdf_file in enumerate(pdf_files)
        }

        for processed_files, future in enumerate(as_completed(origin_of), start=1):
            slot, pdf_file = origin_of[future]
            try:
                pages_per_file[slot] = future.result()
            except Exception as exc:
                # A crashed worker must not discard the pages already
                # collected from the other files.
                logging.warning(f"Worker failed on {pdf_file}: {exc}")

            if pages_per_file[slot] is None:
                failed_files += 1
            logging.info(f"Processed {processed_files}/{total_files} files")

    if failed_files:
        logging.warning(f"{failed_files}/{total_files} files could not be processed")

    docs = []
    for pages in pages_per_file:
        if pages:
            docs.extend(pages)
    return docs

# Run the extraction over a pool of 15 worker processes
num_workers = 15
docs = process_pdfs_in_batches(pdf_files, num_workers)

print(f"Loaded {len(docs)} pages from the PDFs.")

### Chunking and Splitting

In [8]:
def split_documents_into_chunks(docs, chunk_size=2000, chunk_overlap=200):
    """Split page texts into overlapping chunks wrapped as ``Document`` objects.

    Args:
        docs: List of page texts.
        chunk_size: Maximum chunk length, measured with ``len`` (characters).
        chunk_overlap: Overlap between consecutive chunks, in characters.

    Returns:
        List of chunk ``Document`` objects. Each chunk's metadata carries a
        ``"chunk"`` key holding its position in the returned list.
    """
    splitter = RecursiveCharacterTextSplitter(
        chunk_size=chunk_size,
        chunk_overlap=chunk_overlap,
        length_function=len,
    )

    # Wrap every page in a Document with empty metadata before splitting
    page_documents = [Document(page_content=page_text, metadata={}) for page_text in docs]

    doc_splits = []
    with tqdm(page_documents, total=len(page_documents), desc="Splitting Documents", unit="doc") as progress:
        for page_document in progress:
            for piece in splitter.split_documents([page_document]):
                # Running index across all pages, kept for traceability
                piece.metadata["chunk"] = len(doc_splits)
                doc_splits.append(piece)

    logging.info(f"Created {len(doc_splits):,} chunks from {len(docs):,} pages")
    return doc_splits

if __name__ == '__main__':
    # Chunk every extracted page; only `docs` is passed, so the function's
    # default chunk size and overlap apply.
    doc_splits = split_documents_into_chunks(docs)

    # Summary of the run
    print(f"Loaded {len(docs)} pages and created {len(doc_splits)} chunks.")

Splitting Documents: 100%|██████████| 4780/4780 [00:00<00:00, 5959.61doc/s]
2024-11-13 06:39:28,922 - Created 11,390 chunks from 4,780 pages


Loaded 4780 pages and created 11390 chunks.


### Saving Doc Splits

In [9]:
# Function to save doc_splits to a file
def save_doc_splits(doc_splits, filename='doc_splits.pkl'):
    """Pickle ``doc_splits`` to ``filename`` without risking the existing file.

    The data is first written to ``<filename>.tmp`` and only moved over the
    target once pickling has finished. Opening the target directly in ``'wb'``
    mode truncates it immediately, so a failure halfway through (unpicklable
    object, full disk, interrupted kernel) would destroy the previous copy.

    Args:
        doc_splits: Object to serialise (the list of chunk documents).
        filename: Destination path of the pickle file.

    Raises:
        Whatever ``open``/``pickle.dump``/``os.replace`` raise; the temporary
        file is removed and any existing ``filename`` is left untouched.
    """
    tmp_filename = f"{filename}.tmp"
    try:
        with open(tmp_filename, 'wb') as f:
            pickle.dump(doc_splits, f)
        # os.replace swaps the finished file into place in a single step.
        os.replace(tmp_filename, filename)
    except BaseException:
        # Do not leave a half-written temp file behind.
        if os.path.exists(tmp_filename):
            os.remove(tmp_filename)
        raise
    print(f"doc_splits saved to {filename}")

if __name__ == '__main__':
    # Persist the chunks under the default file name
    save_doc_splits(doc_splits)

    print(f"Loaded {len(docs)} pages and created {len(doc_splits)} chunks.")

doc_splits saved to test.pkl
Loaded 4780 pages and created 11390 chunks.


### Loading Doc_Splits

In [5]:
def load_doc_splits(filename='doc_splits.pkl'):
    """Read back the object that was pickled to ``filename``.

    SECURITY: unpickling executes code embedded in the file. Only load pickle
    files that this notebook produced itself.

    Args:
        filename: Path of the pickle file to read.

    Returns:
        The unpickled object (the saved chunk list).
    """
    with open(filename, 'rb') as pickle_file:
        restored_splits = pickle.load(pickle_file)

    print(f"doc_splits loaded from {filename}")
    return restored_splits

if __name__ == '__main__':
    # Restore the chunks from the default pickle file
    doc_splits = load_doc_splits()

    print(f"Loaded {len(doc_splits)} chunks from the file.")

doc_splits loaded from doc_splits.pkl
Loaded 518501 chunks from the file.


### Creating Vector Embeddings

In [6]:
# INFO-level logging with a timestamp prefix for the embedding step
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(message)s',
)

In [7]:
def _is_rate_limit_error(exc):
    """Return True when ``exc`` looks like an HTTP 429 (rate limit) failure."""
    return getattr(exc, "status_code", None) == 429 or "429" in str(exc)


def create_faiss_index(doc_splits, batch_size=25, initial_delay=10, max_retries=10, checkpoint_path="Checkpoint/faiss_checkpoint", checkpoint_interval=10):
    """Embed ``doc_splits`` into a FAISS index in batches, checkpointing to disk.

    If ``checkpoint_path`` exists, the index is loaded from it and indexing
    resumes at the first document that is not in the index yet. Resuming
    therefore assumes ``doc_splits`` is the same list, in the same order, that
    produced the checkpoint.

    Args:
        doc_splits: Sequence of documents to embed.
        batch_size: Number of documents sent per embedding request.
        initial_delay: Seconds to wait after the first rate-limit error; the
            wait doubles on every further retry of the same batch.
        max_retries: Maximum attempts per batch before giving up.
        checkpoint_path: Folder the index is saved to and resumed from.
        checkpoint_interval: Save a checkpoint every this many batches.

    Returns:
        The populated FAISS vector store.

    Raises:
        ValueError: ``batch_size``/``checkpoint_interval`` is not positive, or
            there is nothing to index and no checkpoint to return.
        RuntimeError: A batch was still rate-limited after ``max_retries``
            attempts.
        Exception: Any non-rate-limit error from the embedding call is
            re-raised. A batch is never skipped silently, because a hole in
            the index would break the resume logic on the next run.
    """
    if batch_size < 1 or checkpoint_interval < 1:
        raise ValueError("batch_size and checkpoint_interval must be positive integers")

    embeddings = OpenAIEmbeddings()
    vector_db = None
    indexed_docs = 0

    # Check if a checkpoint exists
    if os.path.exists(checkpoint_path):
        try:
            # SECURITY: the checkpoint contains pickled data; only load
            # checkpoints written by this notebook.
            vector_db = FAISS.load_local(checkpoint_path, embeddings, allow_dangerous_deserialization=True)
            indexed_docs = vector_db.index.ntotal
            logging.info(f"Loaded checkpoint from {checkpoint_path}.")
        except Exception as e:
            # Make sure a half-loaded store is not mistaken for a usable one.
            vector_db = None
            indexed_docs = 0
            logging.warning(f"Failed to load checkpoint: {e}. Starting fresh FAISS indexing.")
    else:
        logging.info("No checkpoint found. Starting fresh FAISS indexing.")

    total_docs = len(doc_splits)
    completed_batches = indexed_docs // batch_size
    remaining_docs = max(total_docs - indexed_docs, 0)
    remaining_batches = -(-remaining_docs // batch_size)  # ceiling division

    try:
        with tqdm(total=completed_batches + remaining_batches, initial=completed_batches, desc="Creating Embeddings", unit="batch") as pbar:
            # Resume exactly at the first un-indexed document. Rounding down
            # to a batch boundary would embed some documents twice whenever
            # the checkpoint size is not a multiple of batch_size.
            batch_starts = range(indexed_docs, total_docs, batch_size)

            for batch_number, start in enumerate(batch_starts, start=completed_batches):
                batch = doc_splits[start:start + batch_size]

                # Only the embedding call is retried; bookkeeping happens once
                # the batch is in the index, so a retry cannot add it twice.
                for attempt in range(max_retries):
                    try:
                        if vector_db is None:
                            vector_db = FAISS.from_documents(batch, embeddings)
                        else:
                            vector_db.add_documents(batch)
                        break
                    except Exception as e:
                        if not _is_rate_limit_error(e):
                            logging.error(f"An error occurred: {str(e)}")
                            raise
                        wait_time = initial_delay * (2 ** attempt)
                        logging.warning(f"Rate limit hit. Retrying in {wait_time} seconds...")
                        time.sleep(wait_time)
                else:
                    raise RuntimeError("Max retries exceeded. Unable to complete API requests for embeddings.")

                # Save checkpoint at the defined interval
                if batch_number % checkpoint_interval == 0:
                    vector_db.save_local(checkpoint_path)
                    logging.info(f"Checkpoint saved at batch {batch_number}")

                # Update progress and log the number of documents indexed so far
                pbar.update(1)
                logging.info(f"Documents indexed so far: {vector_db.index.ntotal}")

                # Periodically collect garbage to free memory
                if batch_number % (checkpoint_interval * 2) == 0:
                    gc.collect()
    except (Exception, KeyboardInterrupt):
        # Keep whatever was embedded since the last periodic checkpoint so the
        # next run does not have to pay for those API calls again.
        if vector_db is not None:
            try:
                vector_db.save_local(checkpoint_path)
                logging.info(f"Checkpoint saved after interruption at {vector_db.index.ntotal} documents.")
            except Exception as save_error:
                logging.error(f"Could not save checkpoint after interruption: {save_error}")
        raise

    if vector_db is None:
        # Empty input and no checkpoint: there is no index to save or return.
        raise ValueError("Nothing to index: doc_splits is empty and no checkpoint was loaded.")

    # Save a final checkpoint at the end
    vector_db.save_local(checkpoint_path)
    logging.info("Final checkpoint saved.")
    return vector_db

In [8]:
if __name__ == '__main__':

    batch_size = 25
    checkpoint_path = "Checkpoint/faiss_checkpoint"

    try:
        vector_db = create_faiss_index(doc_splits, batch_size=batch_size, checkpoint_path=checkpoint_path)
    except Exception as e:
        logging.error(f"Failed to complete FAISS indexing: {str(e)}")
        # Re-raise instead of swallowing: the following cells need `vector_db`,
        # and carrying on would either hit a NameError or silently reuse a
        # stale index left in the kernel by an earlier run.
        raise

    # Check the number of indexed documents
    number_of_documents = vector_db.index.ntotal
    logging.info(f"Number of documents in the FAISS index: {number_of_documents}")

2024-11-15 00:05:11,782 - Loaded checkpoint from Checkpoint/faiss_checkpoint.
Creating Embeddings:  99%|█████████▉| 20601/20740 [00:00<?, ?batch/s]2024-11-15 00:05:12,698 - HTTP Request: POST https://api.openai.com/v1/embeddings "HTTP/1.1 200 OK"
Creating Embeddings:  99%|█████████▉| 20602/20740 [00:01<03:46,  1.64s/batch]2024-11-15 00:05:13,426 - Documents indexed so far: 515050
2024-11-15 00:05:13,954 - HTTP Request: POST https://api.openai.com/v1/embeddings "HTTP/1.1 200 OK"
Creating Embeddings:  99%|█████████▉| 20603/20740 [00:02<02:32,  1.11s/batch]2024-11-15 00:05:14,168 - Documents indexed so far: 515075
2024-11-15 00:05:14,666 - HTTP Request: POST https://api.openai.com/v1/embeddings "HTTP/1.1 200 OK"
Creating Embeddings:  99%|█████████▉| 20604/20740 [00:03<02:04,  1.10batch/s]2024-11-15 00:05:14,845 - Documents indexed so far: 515100
2024-11-15 00:05:15,281 - HTTP Request: POST https://api.openai.com/v1/embeddings "HTTP/1.1 200 OK"
Creating Embeddings:  99%|█████████▉| 20605/2

### Saving Vector DB

In [9]:
# Similarity-search retriever over the index, returning 3 documents per query
retriever = vector_db.as_retriever(
    search_type="similarity",
    search_kwargs={"k": 3},  # k: number of documents to return, defaults to 4
)

In [10]:
# Folder and index name used when the vector store is written to disk
vector_path = 'barchart_vectordb'
index_name = "faiss_index"

vectordb_folder = vector_path

In [11]:
# Write the finished index to its permanent folder
vector_db.save_local(
    vectordb_folder,
    index_name=index_name,
)

In [12]:
# Completion marker printed at the end of the notebook run
print("Embeddings Created")

Embeddings Created
