In [10]:
# %pip install -q google-cloud-aiplatform==1.36.0 langchain==0.0.327 unstructured chromadb==0.4.15 --upgrade --user "unstructured[pdf]" pydantic gradio pdf2image google-cloud-vision google-cloud-storage pymupdf easyocr numpy python-docx python-pptx

In [11]:
# %pip install -q langchain==0.0.327 chromadb==0.4.15 gradio google-cloud-storage pymupdf easyocr numpy python-docx python-pptx

In [12]:
# Run this cell only after executing one of the (currently commented-out) %pip install cells above.
import IPython
import time  # NOTE(review): `time` is not used in this cell — candidate for removal
# Restart kernel after installs so that your environment can access the new packages
# `app` is the running IPython application here; the UI cell later rebinds `app` to the Gradio Blocks object.
app = IPython.Application.instance()
# The True argument requests a restart; the recorded cell output is {'status': 'ok', 'restart': True}.
app.kernel.do_shutdown(True)

{'status': 'ok', 'restart': True}

In [None]:
# Authenticate the user only when the notebook is running inside Google Colab.
import sys
# "google.colab" is present in sys.modules only if that package has already been imported in this process.
if "google.colab" in sys.modules:
    from google.colab import auth as google_auth

    # Outside Colab this branch is skipped and no authentication call is made by this cell.
    google_auth.authenticate_user()

In [1]:
import concurrent.futures
import datetime
import io
import os
import re
import threading

import chromadb
import docx
import easyocr
import fitz  # PyMuPDF
import gradio as gr
import numpy as np
import pptx
import vertexai
from google.cloud import storage
from langchain.embeddings import VertexAIEmbeddings
from langchain.llms import VertexAI
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.vectorstores import Chroma
from PIL import Image

2024-09-25 19:24:08.226157: I tensorflow/core/platform/cpu_feature_guard.cc:193] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  AVX2 FMA
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.
2024-09-25 19:24:09.853942: W tensorflow/compiler/xla/stream_executor/platform/default/dso_loader.cc:64] Could not load dynamic library 'libnvinfer.so.7'; dlerror: libnvinfer.so.7: cannot open shared object file: No such file or directory; LD_LIBRARY_PATH: /usr/local/cuda/lib64:/usr/local/nccl2/lib:/usr/local/cuda/extras/CUPTI/lib64
2024-09-25 19:24:09.854183: W tensorflow/compiler/xla/stream_executor/platform/default/dso_loader.cc:64] Could not load dynamic library 'libnvinfer_plugin.so.7'; dlerror: libnvinfer_plugin.so.7: cannot open shared object file: No such file or directory; LD_LIBRARY_PATH: /usr/local/cuda/lib64:/usr/local/nccl2/lib:/usr/loca

In [2]:
# Initialize GCP Storage client
def initialize_gcp():
    """Initialise the Vertex AI SDK and hand back a Cloud Storage client.

    The module-level ``PROJECT_ID`` and ``LOCATION`` are read when the
    function is called, so both must already be defined at that point.

    Returns:
        A freshly constructed ``storage.Client``.
    """
    # Vertex AI SDK first, then the storage client (same order as before).
    vertexai.init(location=LOCATION, project=PROJECT_ID)
    return storage.Client()

# Global vars & Models

In [3]:
# Project and region passed to vertexai.init() inside initialize_gcp().
PROJECT_ID = "genai-vertex-poc"
LOCATION = "us-central1" 
# Bucket that upload_and_process_documents() uploads files into.
BUCKET_NAME = "data88"
# Must come after PROJECT_ID / LOCATION: initialize_gcp() reads them when called.
GCP_STORAGE_CLIENT = initialize_gcp()
# Local scratch folder used by extract_text() for downloaded blobs.
TEMP_FOLDER = "_temp/"

# On-disk location of the Chroma store (used by the persistent client and by Chroma.from_documents).
DB_PATH = "chroma_db5"

In [4]:
# Define Text Embeddings model
# embedding = VertexAIEmbeddings(model_name='text-embedding-004')
# Constructed with library defaults; `embedding` is what process_documents() passes to Chroma.from_documents().
embedding = VertexAIEmbeddings()

# Text-generation model handle.
# NOTE(review): `llm` is not referenced anywhere else in the visible part of this notebook — confirm it is still needed.
llm = VertexAI(
    model_name="text-bison-32k",
    max_output_tokens=256,
    temperature=0.1,
    top_p=0.6,
    top_k=40,
    verbose=True,
)

# Utilities

In [5]:
# Function to add text to dictionary key
def add_text_to_key(my_dict, key, text):
    """Append ``text`` to the value stored under ``key`` in ``my_dict``.

    A key that is not present yet is first created with an empty string, so
    the first call for a key simply stores ``text``. The dictionary is
    modified in place and nothing is returned.
    """
    if key not in my_dict:
        my_dict[key] = ''  # start from an empty string for unseen keys
    my_dict[key] += text
    
def download_from_gcs(bucket_name, blob_name, destination_file_name):
    """Download one object from Cloud Storage to a local file.

    Args:
        bucket_name: Name of the bucket holding the object.
        blob_name: Name of the object inside the bucket.
        destination_file_name: Local path the object is written to.
    """
    source_blob = GCP_STORAGE_CLIENT.bucket(bucket_name).blob(blob_name)
    source_blob.download_to_filename(destination_file_name)

def get_or_create_bucket(bucket_name):
    """Return the Cloud Storage bucket ``bucket_name``, creating it if absent.

    ``Client.bucket()`` only builds a local handle without contacting the
    service, so it cannot tell whether the bucket exists. ``lookup_bucket``
    performs the actual lookup and returns ``None`` for a missing bucket.

    Args:
        bucket_name: Name of the bucket to fetch or create.

    Returns:
        The bucket object for ``bucket_name``.

    Raises:
        Errors from the storage client other than "not found" during lookup
        (for example insufficient permissions) propagate to the caller, as do
        errors raised while creating the bucket.
    """
    bucket = GCP_STORAGE_CLIENT.lookup_bucket(bucket_name)
    if bucket is None:
        # Create the bucket if it doesn't exist
        bucket = GCP_STORAGE_CLIENT.create_bucket(bucket_name)
        print(f"Bucket {bucket_name} created.")

    return bucket

# Text Extraction

In [6]:
def _image_bytes_to_array(img_bytes):
    """Decode encoded image bytes with PIL and return them as a numpy array."""
    return np.array(Image.open(io.BytesIO(img_bytes)))


def extract_images_from_file(file_path):
    """Collect images from a PDF, DOCX or PPTX file as numpy arrays.

    * ``.pdf``  - every page is rendered to a pixmap (one image per page, in
      page order).
    * ``.docx`` - every document relationship whose target reference contains
      ``"image"``.
    * ``.pptx`` - every shape whose ``shape_type`` is 13 (picture).

    Args:
        file_path: Local path; the format is chosen from its (lower-case)
            extension.

    Returns:
        A list of numpy arrays, or ``None`` (after printing a message) when
        the extension is not one of the three supported ones.
    """
    images = []

    if file_path.endswith('.pdf'):
        doc = fitz.open(file_path)
        try:
            for page_num in range(len(doc)):
                page = doc.load_page(page_num)
                pix = page.get_pixmap()
                images.append(_image_bytes_to_array(pix.tobytes()))
        finally:
            # Release the document even when rendering a page fails.
            doc.close()

    elif file_path.endswith('.docx'):
        doc = docx.Document(file_path)
        for rel in doc.part.rels.values():
            if "image" in rel.target_ref:
                images.append(_image_bytes_to_array(rel.target_part.blob))

    elif file_path.endswith('.pptx'):
        ppt = pptx.Presentation(file_path)
        for slide in ppt.slides:
            for shape in slide.shapes:
                if shape.shape_type == 13:  # 13 is the type for Picture
                    images.append(_image_bytes_to_array(shape.image.blob))
    else:
        print('Unsupported file format')
        return None

    return images

# Function to perform OCR using EasyOCR
# Building an EasyOCR reader is the expensive part of OCR, so it is cached
# instead of being rebuilt for every image. The cache is per thread because
# perform_ocr is reached from ThreadPoolExecutor workers and each worker keeps
# its own reader (no reader object is shared between threads).
_OCR_THREAD_STATE = threading.local()


def _get_ocr_reader():
    """Return the calling thread's EasyOCR reader, creating it on first use."""
    reader = getattr(_OCR_THREAD_STATE, "reader", None)
    if reader is None:
        reader = easyocr.Reader(['en'])  # Replace with desired language(s)
        _OCR_THREAD_STATE.reader = reader
    return reader


def perform_ocr(image):
    """Run OCR on ``image`` and return the recognised text.

    Args:
        image: Image in a form accepted by ``easyocr.Reader.readtext``
            (callers in this notebook pass numpy arrays).

    Returns:
        The second element of every detection returned by ``readtext``,
        joined with newlines (empty string when nothing is detected).
    """
    result = _get_ocr_reader().readtext(image)
    return '\n'.join(detection[1] for detection in result)

def extract_text_from_pdf(pdf_path):
    """Extract the text of every page of a PDF.

    Args:
        pdf_path: Local path of the PDF file.

    Returns:
        ``(combined_text, dict_pagenumber_text)`` where ``combined_text`` is
        the text of all pages concatenated in page order and
        ``dict_pagenumber_text`` maps 1-based page numbers to page text.
    """
    doc = fitz.open(pdf_path)
    dict_pagenumber_text = {}
    try:
        for page_num in range(len(doc)):
            page = doc.load_page(page_num)
            dict_pagenumber_text[page_num + 1] = page.get_text()  # Page numbers start from 1
    finally:
        # Close the document even if reading a page raises.
        doc.close()
    # Dicts keep insertion order, so this is the pages in document order.
    combined_text = ''.join(dict_pagenumber_text.values())
    return combined_text, dict_pagenumber_text

def extract_text_from_docx(file_path):
    """Extract the text of a DOCX file, split into pages where possible.

    Pages are delimited by runs whose XML contains ``lastRenderedPageBreak``.
    Documents without such markers end up as a single page.

    Args:
        file_path: Local path of the ``.docx`` file.

    Returns:
        ``(combined_text, dict_pagenumber_text)`` where ``combined_text`` is
        the whole document (paragraphs separated by newlines, stripped) and
        ``dict_pagenumber_text`` maps 1-based page numbers to stripped page
        text.
    """
    document = docx.Document(file_path)
    dict_pagenumber_text = {}
    current_page = 1  # Page numbers start from 1, like the PDF/PPTX extractors
    page_content = []
    combined_text = []

    for paragraph in document.paragraphs:
        for run in paragraph.runs:
            page_content.append(run.text)
            combined_text.append(run.text)
            # Check for page break in the run
            # NOTE(review): the run carrying the marker is kept on the page
            # being closed (unchanged from before) — confirm that is intended.
            if 'lastRenderedPageBreak' in run._element.xml:
                # Runs are fragments of one paragraph, so they are joined
                # without a separator (a space here would split words).
                dict_pagenumber_text[current_page] = ''.join(page_content).strip()
                current_page += 1
                page_content = []
        page_content.append('\n')  # Add a newline to separate paragraphs
        combined_text.append('\n')

    # Add the last page
    if page_content:
        dict_pagenumber_text[current_page] = ''.join(page_content).strip()

    return ''.join(combined_text).strip(), dict_pagenumber_text


def extract_text_from_pptx(pptx_path):
    """Extract the text of every slide of a PPTX file.

    Text from different shapes, and from different slides, is separated by a
    newline so that words of neighbouring shapes do not run together.

    Args:
        pptx_path: Local path of the ``.pptx`` file.

    Returns:
        ``(combined_text, dict_pagenumber_text)`` where
        ``dict_pagenumber_text`` maps 1-based slide numbers to slide text
        (empty string for slides without text) and ``combined_text`` joins
        the non-empty slide texts in slide order.
    """
    ppt = pptx.Presentation(pptx_path)
    dict_pagenumber_text = {}
    for slide_num, slide in enumerate(ppt.slides, start=1):  # Slide numbers start from 1
        shape_texts = [
            shape.text_frame.text for shape in slide.shapes if shape.has_text_frame
        ]
        dict_pagenumber_text[slide_num] = '\n'.join(text for text in shape_texts if text)
    # Empty slides are skipped so a text-free deck still yields "" (falsy).
    combined_text = '\n'.join(text for text in dict_pagenumber_text.values() if text)
    return combined_text, dict_pagenumber_text


def extract_text(bucket_name, blob_name):
    """Download one blob and extract its text (including OCR of its images).

    Supported extensions are ``.pdf``, ``.docx``, ``.pptx`` and ``.txt``.
    For the first three, images returned by ``extract_images_from_file`` are
    run through ``perform_ocr`` and the recognised text is appended.

    Args:
        bucket_name: Bucket that holds the blob.
        blob_name: Name of the blob; its extension selects the extractor.

    Returns:
        ``(combined_text, metadata, dict_pagenumber_text)``. ``metadata`` has
        the keys ``file_name``, ``file_type`` and ``upload_date_time``. For an
        unsupported extension ``("", {}, {})`` is returned after printing a
        message.

    Raises:
        Whatever the download or the individual extractors raise; the
        temporary local copy is removed in every case.
    """
    # Create the "_temp" folder if it doesn't exist
    os.makedirs(TEMP_FOLDER, exist_ok=True)
    # Blob names may contain path separators; flatten them so the local copy
    # is always a plain file directly inside TEMP_FOLDER.
    safe_name = blob_name.replace('/', '_').replace('\\', '_')
    destination_file_name = os.path.join(TEMP_FOLDER, f'downloaded_{safe_name}')

    combined_text = ""
    dict_pagenumber_text = {}
    metadata = {
        "file_name": blob_name,
        "file_type": os.path.splitext(blob_name)[1],
        "upload_date_time": str(datetime.datetime.now())
    }

    try:
        download_from_gcs(bucket_name, blob_name, destination_file_name)

        images = []
        if destination_file_name.endswith('.pdf'):
            combined_text, dict_pagenumber_text = extract_text_from_pdf(destination_file_name)
        elif destination_file_name.endswith('.docx'):
            combined_text, dict_pagenumber_text = extract_text_from_docx(destination_file_name)
        elif destination_file_name.endswith('.pptx'):
            combined_text, dict_pagenumber_text = extract_text_from_pptx(destination_file_name)
        elif destination_file_name.endswith('.txt'):
            # Explicit encoding: the platform default differs between systems.
            with open(destination_file_name, encoding='utf-8', errors='replace') as fp:
                combined_text = fp.read()
                dict_pagenumber_text = {1: combined_text}
        else:
            print('Unsupported file format')
            return "", {}, {}

        # Plain-text files carry no images; extract_images_from_file returns
        # None for them, which must not be iterated.
        if not destination_file_name.endswith('.txt'):
            images = extract_images_from_file(destination_file_name) or []

        # Keys are 1-based to line up with the page dictionaries. Only PDFs
        # yield exactly one image per page; for DOCX/PPTX the index is just
        # the image's position in the file.
        for i, image in enumerate(images, start=1):
            ocr_text = perform_ocr(image)
            combined_text += "\n" + ocr_text
            add_text_to_key(dict_pagenumber_text, i, ocr_text)
    finally:
        # Always clean up the local copy, including on errors and on the
        # unsupported-format early return.
        if os.path.exists(destination_file_name):
            os.remove(destination_file_name)

    return combined_text, metadata, dict_pagenumber_text


def process_blobs_concurrently(bucket_name, blobs, max_workers=4):
    """Run ``extract_text`` for every blob on a thread pool and merge results.

    Blobs whose extraction raises are reported with a printed message and
    skipped; blobs that yield no text are skipped as well. Results are merged
    in completion order.

    Args:
        bucket_name: Bucket the blobs live in.
        blobs: Iterable of objects exposing a ``name`` attribute.
        max_workers: Size of the thread pool.

    Returns:
        ``(combined_text_all_files, metadata_all_files,
        dict_pagenumber_text_all_files)``; the two dictionaries are keyed by
        blob name.
    """
    text_parts = []
    metadata_all_files = {}            # {filename: dict_metadata}
    dict_pagenumber_text_all_files = {}  # {filename: dict_pagenumber_text}

    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as pool:
        pending = {}
        for blob in blobs:
            pending[pool.submit(extract_text, bucket_name, blob.name)] = blob

        for finished in concurrent.futures.as_completed(pending):
            blob = pending[finished]
            print("Processing :", blob)
            try:
                combined_text, metadata, dict_pagenumber_text = finished.result()
            except Exception as e:
                print(f"Error processing {blob.name}: {e}")
                continue
            if not combined_text:
                continue
            text_parts.append(combined_text)
            metadata_all_files[blob.name] = metadata
            dict_pagenumber_text_all_files[blob.name] = dict_pagenumber_text

    return ''.join(text_parts), metadata_all_files, dict_pagenumber_text_all_files

# Vector DB Operations

In [7]:
# Shared Chroma client backed by the on-disk store at DB_PATH; used by the collection helpers below.
CHROMA_DB_CLIENT = chromadb.PersistentClient(path = DB_PATH)

def del_collection(collection_name):
    """Delete ``collection_name`` from the Chroma store.

    Returns:
        A confirmation message naming the deleted collection. Errors raised
        by the client are not caught here.
    """
    CHROMA_DB_CLIENT.delete_collection(collection_name)
    confirmation = "Collection {} deleted Successfully!".format(collection_name)
    return confirmation

def get_all_collections():
    """Return the names of all collections known to the Chroma client."""
    return [collection.name for collection in CHROMA_DB_CLIENT.list_collections()]

def get_files_in_collection(collection_name):
    """List the files stored in a collection together with their upload time.

    Args:
        collection_name: Name of the Chroma collection to inspect.

    Returns:
        ``{file_name: {'Upload Date Time': upload_date_time}}`` built from the
        ``file_name`` / ``upload_date_time`` metadata of the stored entries
        (several entries of one file collapse into a single key). If anything
        goes wrong, the error message is returned as a string instead.
    """
    try:
        collection = CHROMA_DB_CLIENT.get_collection(collection_name)
        # Only metadata is read below, so do not load the stored texts.
        records = collection.get(include=["metadatas"])

        file_details = {}
        for metadata in records['metadatas'] or []:
            if not metadata:
                continue  # entry stored without metadata
            file_details[metadata['file_name']] = {
                'Upload Date Time': metadata['upload_date_time']
            }

        return file_details

    except Exception as e:
        return str(e)

# Split Documents

In [8]:
class Document:
    """Minimal document record: a piece of text plus its metadata.

    Exposes the ``page_content`` and ``metadata`` attributes that
    ``split_documents`` hands to the text splitter.
    """

    def __init__(self, page_content, metadata=None):
        """Store the text and metadata.

        Args:
            page_content: Text of the document.
            metadata: Optional metadata dictionary. When omitted, each
                instance gets its own new empty dict (a shared ``{}`` default
                would leak changes between instances).
        """
        self.page_content = page_content
        self.metadata = {} if metadata is None else metadata


def split_documents(metadata_all_files, dict_pagenumber_text_all_files):
    """Turn per-page text into chunked documents ready for embedding.

    One ``Document`` is created per (file, page) pair, carrying the file's
    metadata plus a ``page_number`` entry, and the list is then split with a
    ``RecursiveCharacterTextSplitter`` (chunk size 1000, overlap 100).

    Args:
        metadata_all_files: ``{filename: metadata_dict}``.
        dict_pagenumber_text_all_files: ``{filename: {page_number: text}}``.

    Returns:
        Whatever the splitter's ``split_documents`` returns; the number of
        resulting chunks is printed.
    """
    page_documents = [
        Document(
            page_content=page_text,
            metadata=metadata_all_files[filename] | {"page_number": page_number},
        )
        for filename, pages in dict_pagenumber_text_all_files.items()
        for page_number, page_text in pages.items()
    ]

    splitter = RecursiveCharacterTextSplitter(chunk_overlap=100, chunk_size=1000)
    chunks = splitter.split_documents(page_documents)
    print(f"# of documents = {len(chunks)}")
    return chunks

# Process documents

In [9]:
def process_documents(bucket_name, collection_name, blobs):
    """Extract, chunk and embed the given blobs into a Chroma collection.

    Args:
        bucket_name: Bucket the blobs live in.
        collection_name: Chroma collection that receives the chunks.
        blobs: Blobs to process (extraction runs with 2 worker threads).

    Returns:
        The object returned by ``Chroma.from_documents``.
    """
    # The combined text of all files is not needed at this stage.
    _, metadata_all_files, dict_pagenumber_text_all_files = process_blobs_concurrently(
        bucket_name, blobs, 2
    )
    chunks = split_documents(metadata_all_files, dict_pagenumber_text_all_files)
    # save documents' embeddings in Chroma
    return Chroma.from_documents(
        chunks,
        embedding,
        collection_name=collection_name,
        persist_directory=DB_PATH,
    )

# Function to validate collection name
def is_valid_collection_name(name):
    """Check a collection name against this notebook's naming guidelines.

    A valid name is a string of at least 3 characters made up only of ASCII
    letters, digits, underscores and hyphens.

    Args:
        name: Candidate collection name.

    Returns:
        ``True`` when the name is valid, ``False`` otherwise (including when
        ``name`` is not a string).
    """
    # NOTE(review): the vector store may enforce further rules (e.g. a maximum
    # length or restrictions on the first/last character) — confirm and align.
    if not isinstance(name, str) or len(name) < 3:
        return False
    # fullmatch, because `$` in re.match also matches just before a trailing
    # newline and would let "name\n" through.
    return re.fullmatch(r'[a-zA-Z0-9_-]+', name) is not None


# Upload files to GCP
def upload_and_process_documents(files, collection_name):
    """Upload files to the bucket and index them into a Chroma collection.

    Args:
        files: Uploaded file objects exposing a ``name`` attribute that holds
            a local path; ``None`` or an empty list when nothing was uploaded.
        collection_name: Target collection; must satisfy
            ``is_valid_collection_name``.

    Returns:
        A status message: the naming guidelines for an invalid collection
        name, a hint when no files were supplied, or a success message.
    """
    if not is_valid_collection_name(collection_name):
        return '''Invalid collection name. Please follow the naming guidelines.\nNaming Guidelines: 
    minimum length of 3 characters, and allowing only alphanumeric characters, underscores, and hyphens'''

    # Nothing selected in the upload widget: avoid iterating over None.
    if not files:
        return "No files were provided. Please upload at least one file."

    bucket = get_or_create_bucket(BUCKET_NAME)

    blobs = []
    for file in files:
        # The blob is named after the file's base name, not its full local path.
        file_name = os.path.basename(file.name)
        blob = bucket.blob(file_name)
        blob.upload_from_filename(file.name)
        blobs.append(blob)

    process_documents(BUCKET_NAME, collection_name, blobs)
    return "Uploaded files successfully in Collection: " + collection_name

# UI

In [None]:
# Define the Gradio app using Blocks
# Build the Gradio UI: a two-column page whose buttons call the helper functions defined above.
# NOTE: `app` is rebound here; the kernel-restart cell used the same name for the IPython application.
with gr.Blocks() as app:
    gr.Markdown("# OCR_based_gpt")
    gr.Markdown("Upload and manage your documents here!")

    with gr.Row():
        # First column: upload/process files and delete a collection.
        with gr.Column():
            # Upload Files Section
            gr.Markdown("## Upload & Process Files")
            file_upload = gr.File(label="Upload Files", file_count="multiple")
            collection_name_input = gr.Textbox(label="Collection Name", placeholder="Enter Collection Name")
            upload_button = gr.Button("Upload and Process")
            upload_output = gr.Textbox(label="Upload Response")
            
            # The returned status string is shown in the "Upload Response" textbox.
            upload_button.click(
                upload_and_process_documents,
                inputs=[file_upload, collection_name_input],
                outputs=upload_output
            )
            
            # Delete Collection Section
            gr.Markdown("## Delete Collection")
            # NOTE(review): this rebinds `collection_name_input`. The upload handler above was already
            # wired to the first textbox, so both keep working, but a distinct name would be clearer.
            collection_name_input = gr.Textbox(label="Collection Name", placeholder="Enter Collection Name")
            delete_collection_btn = gr.Button("Delete Collection")
            delete_collection_output = gr.Textbox(label="Delete Response")
            
            # del_collection does not catch client errors itself.
            delete_collection_btn.click(
                del_collection,
                inputs=collection_name_input,
                outputs=delete_collection_output
            )

        # Second column: read-only listing of collections and of the files inside one collection.
        with gr.Column():
            # List Collections Section
            gr.Markdown("## Get All Collections")
            all_collections_output = gr.JSON(label="All Collections")
            list_collections_btn = gr.Button("List All Collections")
            
            list_collections_btn.click(
                get_all_collections,
                inputs=[],
                outputs=all_collections_output
            )
        
            # List Files in Collection Section
            gr.Markdown("## Get Files in Collection")
            collection_name_input_files = gr.Textbox(label="Collection Name", placeholder="Enter Collection Name")
            files_output = gr.JSON(label="Files in Collection")
            list_files_btn = gr.Button("List Files in Collection")
            
            # get_files_in_collection returns a dict on success and an error string on failure.
            list_files_btn.click(
                get_files_in_collection,
                inputs=collection_name_input_files,
                outputs=files_output
            )


# Launch the Gradio app
# NOTE(review): share=True publishes a public *.gradio.live URL (see the recorded output), and this UI
# can upload to the bucket and delete collections — confirm public exposure is intended.
app.launch(share=True, debug=True)

Running on local URL:  http://127.0.0.1:7861
Running on public URL: https://afcb9d04f065885807.gradio.live

This share link expires in 72 hours. For free permanent hosting and GPU upgrades, run `gradio deploy` from Terminal to deploy to Spaces (https://huggingface.co/spaces)


Neither CUDA nor MPS are available - defaulting to CPU. Note: This module is much faster with a GPU.
Neither CUDA nor MPS are available - defaulting to CPU. Note: This module is much faster with a GPU.
  net.load_state_dict(copyStateDict(torch.load(trained_model, map_location=device)))
  state_dict = torch.load(model_path, map_location=device)


Processing : <Blob: data88, Format_MResult Resume.pptx, 1727292332184099>


Neither CUDA nor MPS are available - defaulting to CPU. Note: This module is much faster with a GPU.
Neither CUDA nor MPS are available - defaulting to CPU. Note: This module is much faster with a GPU.
Neither CUDA nor MPS are available - defaulting to CPU. Note: This module is much faster with a GPU.
Neither CUDA nor MPS are available - defaulting to CPU. Note: This module is much faster with a GPU.
Neither CUDA nor MPS are available - defaulting to CPU. Note: This module is much faster with a GPU.
Neither CUDA nor MPS are available - defaulting to CPU. Note: This module is much faster with a GPU.
Neither CUDA nor MPS are available - defaulting to CPU. Note: This module is much faster with a GPU.
Neither CUDA nor MPS are available - defaulting to CPU. Note: This module is much faster with a GPU.


Processing : <Blob: data88, OCR__RAG_based_chatbot.pdf, 1727292331901251>
# of documents = 18


--------------