# Project Documentation

**Author:** Md Reyad Hossain  
**Contact:**  
- Phone: 01768489220  
- Email: reyadhasan7254@gmail.com  

### Installation of Necessary Dependencies

In [1]:
!apt install -y poppler-utils tesseract-ocr
!pip install -q pytesseract pdf2image pillow pandas pdfplumber langchain sentence-transformers torch gradio pypdf faiss-cpu
!pip install -U langchain-community

Reading package lists... Done
Building dependency tree... Done
Reading state information... Done
tesseract-ocr is already the newest version (4.1.1-2.1build1).
The following NEW packages will be installed:
  poppler-utils
0 upgraded, 1 newly installed, 0 to remove and 35 not upgraded.
Need to get 186 kB of archives.
After this operation, 697 kB of additional disk space will be used.
Get:1 http://archive.ubuntu.com/ubuntu jammy-updates/main amd64 poppler-utils amd64 22.02.0-2ubuntu0.8 [186 kB]
Fetched 186 kB in 1s (234 kB/s)
Selecting previously unselected package poppler-utils.
(Reading database ... 126281 files and directories currently installed.)
Preparing to unpack .../poppler-utils_22.02.0-2ubuntu0.8_amd64.deb ...
Unpacking poppler-utils (22.02.0-2ubuntu0.8) ...
Setting up poppler-utils (22.02.0-2ubuntu0.8) ...
Processing triggers for man-db (2.10.2-1) ...

### Setting and Retrieving a Secret in Google Colab

1. In the left sidebar, click the **key icon** to open the Secrets manager.
2. Click **"+ Add new secret"**.
3. Give the secret a name, such as `HF_TOKEN` (the name the code below retrieves).
4. Enter the secret's value.
5. Press **"Save"** to store it securely.

In [4]:
from google.colab import userdata
try:
    hf_token = userdata.get('HF_TOKEN')
    print("Secret retrieved successfully")
except userdata.SecretNotFoundError:
    hf_token = None  # fall back to anonymous access so later cells do not raise a NameError
    print("Secret not found")
# print("your secret is : ",hf_token)

Secret retrieved successfully


### Import Necessary Dependencies

In [5]:
import os
import re
import cv2
import numpy as np
from PIL import Image
import pytesseract
from pdf2image import convert_from_path
import pdfplumber
import torch
from transformers import T5Tokenizer, T5ForConditionalGeneration, pipeline
from sentence_transformers import SentenceTransformer, util
from langchain.text_splitter import CharacterTextSplitter
from langchain.embeddings import HuggingFaceEmbeddings
from langchain.vectorstores import FAISS
from langchain.docstore.document import Document
from langchain.document_loaders import PyPDFLoader, TextLoader
from langchain.chains import RetrievalQA
from langchain.prompts import PromptTemplate
from langchain.llms import HuggingFacePipeline
import gradio as gr
import pandas as pd

### Helper Functions for PDF and Image Text Extraction

In [6]:
def extract_text_from_pdf(file_path):
    try:
        with pdfplumber.open(file_path) as pdf:
            full_text = ""
            for page in pdf.pages:
                page_text = page.extract_text()
                if page_text:
                    full_text += page_text + "\f"
            return full_text.strip()
    except Exception as e:
        print(f"Error reading PDF: {str(e)}")
        return ""

def preprocess_image(image_path):
    if not os.path.exists(image_path):
        raise FileNotFoundError(f"Image file not found: {image_path}")
    img = cv2.imread(image_path)
    if img is None:
        raise ValueError(f"Unable to read image: {image_path}")
    gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
    thresh = cv2.threshold(gray, 0, 255, cv2.THRESH_BINARY + cv2.THRESH_OTSU)[1]
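    # Morphological opening; with a 1x1 kernel it leaves the image unchanged,
    # so use a larger kernel (e.g. 2x2) if speckle noise needs removing.
    kernel = np.ones((1, 1), np.uint8)
    processed = cv2.morphologyEx(thresh, cv2.MORPH_OPEN, kernel)
    # Contrast scaling; the thresholded image is already 0/255, so values saturate unchanged.
    processed = cv2.convertScaleAbs(processed, alpha=1.5, beta=0)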
    return processed

def extract_text_from_image(image_path):
    processed_img = preprocess_image(image_path)
    pil_img = Image.fromarray(processed_img)
    custom_config = r'--oem 3 --psm 6'
    return pytesseract.image_to_string(pil_img, config=custom_config)
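
`extract_text_from_pdf` joins the page texts with a form-feed character (`\f`) and strips the trailing one. If per-page text is needed later, the string can be split back on that separator. A minimal sketch, assuming a made-up two-page string in place of real PDF output:

```python
# Hypothetical text in the shape extract_text_from_pdf returns:
# page texts joined by "\f", with the trailing separator stripped.
sample_text = "Page one text\fPage two text\f".strip()

# Split back into one string per page.
pages = sample_text.split("\f")
for number, page_text in enumerate(pages, start=1):
    print(f"page {number}: {page_text}")
```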

### Task 1 - DOCUMENT TYPE CLASSIFIER

In [19]:
model = SentenceTransformer("all-MiniLM-L6-v2", use_auth_token=hf_token)
"""
I use reference texts because the three provided sample files are far too few to train on.
A model trained on so little data would almost certainly overfit, so I wrote these
reference texts after analyzing the three samples. Given a larger dataset of similar
documents, a classifier could be trained and used for classification instead.
BERT base uncased or DistilBERT would be suitable base models for that classifier.
"""
reference_texts = {
    "Invoice": "This document is an invoice showing vendor name, invoice number, total amount and date.",
    "Bank Statement": "This is a bank statement listing transactions, account number, dates and balances.",
    "Money Receipt": "This is a money receipt showing a receipt number, total amount paid in cash, VAT breakdown,Tax,cash receipt, items sold, and payer contact details."
}

def classify_document_type(file_path):
    if file_path.lower().endswith('.pdf'):
        text = extract_text_from_pdf(file_path)
    elif file_path.lower().endswith(('.jpg', '.jpeg', '.png')):
        text = extract_text_from_image(file_path)
    else:
        return None
    doc_embedding = model.encode(text, convert_to_tensor=True) # Generate embedding for the extracted document text
    scores = {}
    for label, ref_text in reference_texts.items():
        ref_embedding = model.encode(ref_text, convert_to_tensor=True)
        similarity = util.pytorch_cos_sim(doc_embedding, ref_embedding) # Calculate cosine similarity
        scores[label] = similarity.item()
    predicted_label = max(scores, key=scores.get)
    return predicted_label
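
`classify_document_type` always returns the label with the highest cosine similarity, even when every score is low, so an unrelated document still gets one of the three labels. One possible refinement is a minimum-score cut-off. The sketch below is an illustration only: `pick_label`, the `min_score` value of 0.3, and the score dictionaries are all hypothetical, not measured on the sample files.

```python
def pick_label(scores, min_score=0.3):
    """Return the best-scoring label, or "Unknown" if even the best score is weak.

    min_score is a hypothetical cut-off; it would need tuning on real documents.
    """
    best_label = max(scores, key=scores.get)
    return best_label if scores[best_label] >= min_score else "Unknown"

# Made-up similarity scores, not measured values.
confident = {"Invoice": 0.62, "Bank Statement": 0.21, "Money Receipt": 0.35}
unsure = {"Invoice": 0.08, "Bank Statement": 0.11, "Money Receipt": 0.05}

print(pick_label(confident))
print(pick_label(unsure))
```

Inside `classify_document_type`, the same check could replace the final `max(...)` call, at the cost of choosing a threshold.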

### Task 1 - Test Document Type Classification
#### Important notes for file processing: make sure the files are uploaded to Google Colab

1. **File Upload Requirements**:
   - Make sure to upload your files to Google Colab first
   - Use the file upload button in the Colab sidebar
   - Or drag-and-drop files into the file explorer panel

In [9]:
file_paths = [
    "/content/Bank Statement.pdf",
    "/content/Purchase Invoice.pdf",
    "/content/Purchase Receipt.jpg"
]

for path in file_paths:
    print(f"\n Processing: {path}")
    doc_type = classify_document_type(path)
    print(f" Predicted Document Type: {doc_type}")


 Processing: /content/Bank Statement.pdf
 Predicted Document Type: Bank Statement

 Processing: /content/Purchase Invoice.pdf
 Predicted Document Type: Invoice

 Processing: /content/Purchase Receipt.jpg
 Predicted Document Type: Money Receipt


### Helper Functions for Data Extraction from Documents

In [10]:
def extract_bank_statement_data(file_path):
    transactions = []
    text = extract_text_from_pdf(file_path)
    account_match = re.search(r'Account Number / Name / Currency Code:\s*([\d-]+)', text)
    account_number = account_match.group(1) if account_match else "N/A"
    transaction_lines = re.finditer(
        r'(?P<date>\d{2}-[A-Za-z]{3}-\d{4})\s+'
        r'(?P<type>.+?)\s+'
        r'(?P<description>.*?)\s+'
        r'(?:[\d,]+\.\d{2}\s+){0,2}'
        r'(?P<balance>-?[\d,]+\.\d{2})',
        text
    )
    for match in transaction_lines:
        transactions.append({
            'Account Number': account_number,
            'Transaction Date': match.group('date'),
            'Description': f"{match.group('type')} , {match.group('description')}".strip(),
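            # The pattern captures the last numeric column on the line (the running balance in the sample statement)
            'Amount': match.group('balance').replace(',', '')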
        })
    return pd.DataFrame(transactions)


def extract_invoice_data(file_path):
    text = extract_text_from_pdf(file_path)
    normalized_text = ' '.join(text.split())
    inv_num_match = re.search(r'INVOICE\s+(\d+)', normalized_text)
    date_match = re.search(r'DATE\s+(\d{2}/\d{2}/\d{4})', normalized_text)
    subtotal_match = re.search(r'SUBTOTAL\s+([\d,]+\.\d{2})', normalized_text)
    vat_match = re.search(r'VAT TOTAL\s+([\d,]+\.\d{2})', normalized_text)
    total_amount = "N/A"
    if subtotal_match and vat_match:
        subtotal = float(subtotal_match.group(1).replace(',', ''))
        vat = float(vat_match.group(1).replace(',', ''))
        total_amount = f"{subtotal + vat:.2f}"
    else:
        total_match = re.search(r'(?:TOTAL|BALANCE DUE)\s+(?:GBP\s*)?([\d,]+\.\d{2})', normalized_text)
        if total_match:
            total_amount = total_match.group(1).replace(',', '')

    vendor_match = re.search(r'INVOICE TO(?:.*?INVOICE \d+)?\s+(.*?)\s+DATE', normalized_text)
    invoice_number = inv_num_match.group(1) if inv_num_match else "N/A"
    date = date_match.group(1) if date_match else "N/A"
    vendor_name = "N/A"
    if vendor_match:
        vendor_name = vendor_match.group(1).strip()
        vendor_name = re.sub(r'INVOICE\s+\d+|\d{2}/\d{2}/\d{4}', '', vendor_name).strip()

    return pd.DataFrame([{
        'Invoice Number': invoice_number,
        'Date': date,
        'Total Amount': total_amount,
        'Vendor Name': vendor_name
    }])


def extract_receipt_data(image_path):
    text = extract_text_from_image(image_path)
    receipt_num = "N/A"
    rf_match = re.search(r'RF RECEIPT No[:;]\s*([\w\s]+)', text)
    if rf_match:
        receipt_num = re.sub(r'[^\d\s]', '', rf_match.group(1)).strip()
        receipt_num = ' '.join(receipt_num.split())
    else:
        receipt_match = re.search(r'RECEIPT NO[:]?\s*((?:\d\s*){8,12})', text, re.IGNORECASE)
        if receipt_match:
            receipt_num = re.sub(r'\s', '', receipt_match.group(1))
        else:
            near_rf_match = re.search(r'(?:RF|RECEIPT).*?(\d{3}\s*\d\s*\d{3}\s*\d{3})', text, re.IGNORECASE)
            if near_rf_match:
                receipt_num = re.sub(r'\s', '', near_rf_match.group(1))

    date_match = re.search(r'RF (\d{2}/\d{2}/\d{4})', text) or \
                 re.search(r'DATE[:]?\s*(\d{2}/\d{2}/\d{4})', text) or \
                 re.search(r'(\d{2}/\d{2}/\d{4})\s*RF', text)
    date = date_match.group(1) if date_match else "N/A"

    cash_match = re.search(r'CASH\s*[\D]*([\d.]+)', text)
    change_match = re.search(r'CHANGE\s*[\D]*([\d.]+)', text)
    payment_amount = "N/A"
    if cash_match and change_match:
        try:
            payment_amount = f"{float(cash_match.group(1)) - float(change_match.group(1)):.2f}"
        except ValueError:
            pass
    payer_match = re.search(r'^(.*?)\n\s*SELF\s*EMPLOYED', text, re.MULTILINE | re.IGNORECASE)
    payer_name = payer_match.group(1).strip() if payer_match else "N/A"
    payer_name = re.sub(r'[^a-zA-Z\s]', '', payer_name).strip()
    return pd.DataFrame([{
        'Receipt Number': receipt_num,
        'Date': date,
        'Payment Amount': payment_amount,
        'Payer Name': payer_name
    }])
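
In `extract_receipt_data`, the payment amount is derived rather than read directly: it is the cash tendered minus the change returned. A minimal sketch of that arithmetic, using the same two patterns on a made-up OCR string (the text and amounts are assumptions for illustration):

```python
import re

# Hypothetical OCR output; real receipts will be noisier.
sample_text = "TOTAL 15.50\nCASH 20.00\nCHANGE 4.50\n"

cash_match = re.search(r'CASH\s*[\D]*([\d.]+)', sample_text)
change_match = re.search(r'CHANGE\s*[\D]*([\d.]+)', sample_text)

payment_amount = "N/A"
if cash_match and change_match:
    cash = float(cash_match.group(1))      # amount handed over
    change = float(change_match.group(1))  # amount returned
    payment_amount = f"{cash - change:.2f}"

print(payment_amount)
```

If OCR misreads a digit group (for example `1.2.3`), `float` raises `ValueError`, which is why the original function wraps the subtraction in a `try` block.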

### Task 2 - Test Data Extraction from Document

This is a data extraction task with three individual files provided.  
Because each file stores its data in a different format and pattern, I first classify the document type.  
After classification, I apply the extraction logic written for that type.

In [12]:
def classify_and_extract(file_path):
    doc_type = classify_document_type(file_path)
    print(f" Predicted Document Type: {doc_type}")
    if doc_type == "Invoice":
        print(f" Invoice DATA:")
        return extract_invoice_data(file_path)
    elif doc_type == "Bank Statement":
        print(f" Bank Statement DATA:")
        return extract_bank_statement_data(file_path)
    elif doc_type == "Money Receipt":
        print(f" Money Receipt DATA:")
        return extract_receipt_data(file_path)
    else:
        print(" Unable to classify or extract data.")
        return pd.DataFrame()

file_paths = [
    "/content/Bank Statement.pdf",
    "/content/Purchase Invoice.pdf",
    "/content/Purchase Receipt.jpg"
]

for path in file_paths:
    print(f"\n Processing: {path}")
    result_df = classify_and_extract(path)
    if result_df.empty:
        print("  No data extracted.")
    else:
        print(result_df.to_string(index=False))


 Processing: /content/Bank Statement.pdf
 Predicted Document Type: Bank Statement
 Bank Statement DATA:
 Account Number Transaction Date                              Description   Amount
306541-18240268      31-Oct-2023                        Closing , Ledger:    45.17
306541-18240268      01-Nov-2023                Faster , AGP ACCOUNTS LTD -2054.83
306541-18240268      07-Nov-2023          Card , AMERICAN EXPRESS CD 1883 -3900.70
306541-18240268      08-Nov-2023 Miscellaneous , FT261678054501 LLINK TFR 11099.30
306541-18240268      09-Nov-2023             Overdraft , O/DRAFT INTEREST 10928.67
306541-18240268      10-Nov-2023 Miscellaneous , FT261763567501 LLINK TFR 23228.67
306541-18240268      13-Nov-2023 Miscellaneous , THE DRESSING ROOM 306541 13228.67
306541-18240268      13-Nov-2023 Miscellaneous , THE DRESSING ROOM 306541 12828.67
306541-18240268      13-Nov-2023 Miscellaneous , THE DRESSING ROOM 306541 12728.67
306541-18240268      14-Nov-2023             Charges-Fees , OVERD
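
To see how the transaction pattern in `extract_bank_statement_data` splits a line into fields, it can be run on a single line in isolation. The sketch below reuses the pattern as written. The sample line is hypothetical: it is modelled on the second row of the output above, and the `2,100.00` column is invented for illustration.

```python
import re

TRANSACTION_PATTERN = re.compile(
    r'(?P<date>\d{2}-[A-Za-z]{3}-\d{4})\s+'
    r'(?P<type>.+?)\s+'
    r'(?P<description>.*?)\s+'
    r'(?:[\d,]+\.\d{2}\s+){0,2}'        # skip up to two amount columns
    r'(?P<balance>-?[\d,]+\.\d{2})'     # last numeric column on the line
)

# Hypothetical statement line, not copied from the real PDF.
sample_line = "01-Nov-2023 Faster AGP ACCOUNTS LTD 2,100.00 -2,054.83"

match = TRANSACTION_PATTERN.search(sample_line)
fields = match.groupdict()
fields["balance"] = fields["balance"].replace(",", "")
print(fields)
```

Because the `type` group is lazy, it stops at the first whitespace, so only the first word of the transaction type is kept. That matches the one-word types (`Faster`, `Card`) in the output above.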

### Task 3 - Document Query Answering
#### Helper Functions for the Query-Answering RAG Pipeline

In [13]:
VECTOR_DB_DIR = "faiss_index"
CHUNK_SIZE = 100
CHUNK_OVERLAP = 50

def extract_text(file_path):
    ext = os.path.splitext(file_path)[1].lower()
    if ext == ".pdf":
        loader = PyPDFLoader(file_path)
        pages = loader.load()
        return "\n".join([p.page_content for p in pages])
    elif ext in [".jpg", ".jpeg", ".png"]:
        return extract_text_from_image(file_path)
    elif ext == ".txt":
        loader = TextLoader(file_path)
        pages = loader.load()
        return "\n".join([p.page_content for p in pages])
    else:
        raise ValueError(f"Unsupported file format: {ext}")

def clean_text(text):
    lines = text.splitlines()
    filtered = [line for line in lines if len(line.strip()) > 5 and not line.strip().startswith(("---", "Page"))]
    return "\n".join(filtered)

def split_text(text):
    splitter = CharacterTextSplitter(chunk_size=CHUNK_SIZE, chunk_overlap=CHUNK_OVERLAP)
    return [Document(page_content=chunk) for chunk in splitter.split_text(text)]

def create_vectorstore(docs):
    embedding = HuggingFaceEmbeddings(                           # Initialize embedding model
        model_name="sentence-transformers/all-MiniLM-L6-v2",
        model_kwargs={"device": "cpu"},
        encode_kwargs={"normalize_embeddings": False}
    )
    db = FAISS.from_documents(docs, embedding) # Create FAISS vector store from the documents
    db.save_local(VECTOR_DB_DIR)               # Save vector store locally
    return db

def setup_qa_chain(vectorstore):
    tokenizer = T5Tokenizer.from_pretrained("google/flan-t5-small", use_auth_token=hf_token)            # Load tokenizer for FLAN-T5 model
    model = T5ForConditionalGeneration.from_pretrained("google/flan-t5-small", use_auth_token=hf_token) # Load FLAN-T5 model for text generation
    pipe = pipeline("text2text-generation", model=model, tokenizer=tokenizer, max_length=256)
    llm = HuggingFacePipeline(pipeline=pipe)
    prompt = PromptTemplate(
        input_variables=["context", "question"],
        template=(
            "You are a helpful assistant for financial documents.\n"
            "Based only on the following context, answer the question briefly.\n"
            "If the answer cannot be found, just respond with 'Not found'.\n\n"
            "Context:\n{context}\n\n"
            "Question: {question}\n"
            "Answer:"
        )
    )
    retriever = vectorstore.as_retriever(search_kwargs={"k": 1}) # Set up retriever using vector store with top 1 result
    return RetrievalQA.from_chain_type(
        llm=llm,
        retriever=retriever,
        chain_type="stuff",
        chain_type_kwargs={"prompt": prompt}
    )

### Test Query Answering RAG pipeline

In [14]:
doc_path = "/content/Bank Statement.pdf"  # Replace with your test file
print(f" Reading file: {doc_path}")
text = extract_text(doc_path)
clean = clean_text(text)
chunks = split_text(clean)
print(f" Split text into {len(chunks)} chunks.")
vectordb = create_vectorstore(chunks)
print(" Vector store created.")
qa_chain = setup_qa_chain(vectordb)
print(" QA pipeline ready.")
questions = [
    "What is the total amount?",
    "Who is the payer?",
    "When was the transaction made?",
    "Give me a summary of the transactions?"
]
for q in questions:
    print(f"\n Question: {q}")
    answer = qa_chain.run(q)
    print(f" Answer: {answer.strip()}")

 Reading file: /content/Bank Statement.pdf
 Split text into 1 chunks.
 Vector store created.
Device set to use cpu
Token indices sequence length is longer than the specified maximum sequence length for this model (982 > 512). Running this sequence through the model will result in indexing errors


 QA pipeline ready.

 Question: What is the total amount?
 Answer: 599.46 35,300.00 End of Report Ledger Balance 11,745.71 Totals 23,599.46 35,300.00 End of Report Ledger Balance 11,745.71 Totals 23,599.46 35,300.00 End of Report Ledger Balance 11,745.71 Totals 23,599.46 35,300.00 End of Report Ledger Balance 11,745.71 Totals 23,599.46 35,300.00 End of Report Ledger Balance 11,745.71 Totals 23,599.46 35,300.00 End of Report Ledger Balance 11,745.71 Totals 23,599.46 35,300.00 End of Report Ledger Balance 11,745.71 Totals 23,599.46 35,300.00 End of Report Ledger Balance 11,745.71 Totals 23,599.46 35,300.00 End of Report Ledger Balance 11,745.71 Totals 23,599.46 35,300.00 End of Report Ledger Balance 11,745.71 Totals 23,599.46 35,300.00 End of Report Ledger Balance 11,745.71 Totals 23,599.46 35,300.00 End of Report Ledger Balance 11,745.71 Totals 23,599.

 Question: Who is the payer?
 Answer: Lloyds Bank plc

 Question: When was the transaction made?
 Answer: 01-Nov-2023 to 01-Dec-2023 Ba
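
The log above reports `Split text into 1 chunks.` and a 982-token prompt against a 512-token limit, which suggests the whole document reached the model as one chunk. A likely cause is that `CharacterTextSplitter` splits on `"\n\n"` by default, while `clean_text` drops blank lines and joins the rest with a single `"\n"`, leaving no separator to split on. Passing `separator="\n"` to the splitter may be enough. The sketch below shows the same idea in plain Python so it runs without LangChain: `chunk_lines` is a hypothetical helper, and the sample rows are made up.

```python
def chunk_lines(text, chunk_size=100, chunk_overlap=50):
    """Pack whole lines into chunks of roughly chunk_size characters.

    Trailing lines of each chunk (up to chunk_overlap characters) are
    repeated at the start of the next chunk. A single line longer than
    chunk_size is kept whole rather than cut.
    """
    chunks, current = [], []
    for line in text.splitlines():
        if current and len("\n".join(current + [line])) > chunk_size:
            chunks.append("\n".join(current))
            # Carry over as many trailing lines as fit in the overlap budget.
            overlap = []
            for previous in reversed(current):
                if len("\n".join([previous] + overlap)) > chunk_overlap:
                    break
                overlap.insert(0, previous)
            current = overlap
        current.append(line)
    if current:
        chunks.append("\n".join(current))
    return chunks

# Made-up input: ten 30-character lines joined by single newlines,
# the same shape clean_text produces.
lines = [f"row {i:02d} " + "x" * 23 for i in range(10)]
chunks = chunk_lines("\n".join(lines))
print(len(chunks))
```

With more than one chunk in the index, the retriever's `k` value (currently 1) then decides how much context reaches the model, so it may need raising for summary-style questions.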

### Document Processing System - Gradio Web Interface
After running the code cell, you will see a line such as `Running on public URL: https://xxxxxx.gradio.live`.
Open the public URL in a browser to use the web interface.


In [16]:
qa_chain = None

def process_file(file):
    file_path = file.name
    doc_type = classify_document_type(file_path)
    result = f"**Predicted Document Type**: `{doc_type}`\n\n"
    df = None
    if doc_type == "Invoice":
        df = extract_invoice_data(file_path)
    elif doc_type == "Bank Statement":
        df = extract_bank_statement_data(file_path)
    elif doc_type == "Money Receipt":
        df = extract_receipt_data(file_path)

    if df is not None and not df.empty:
        result += f"**Extracted Data:**\n\n{df.to_markdown(index=False)}"
    else:
        result += "No relevant data extracted."
    return result, gr.update(visible=True)

def prepare_qa_pipeline(file):
    global qa_chain
    file_path = file.name
    text = extract_text(file_path)
    cleaned = clean_text(text)
    chunks = split_text(cleaned)

    vectorstore = create_vectorstore(chunks)
    qa_chain = setup_qa_chain(vectorstore)
    return "✅ QA pipeline ready.", gr.update(visible=True)

def answer_question(question):
    if qa_chain is None:
        return "QA pipeline not ready."
    return f"**Answer:** {qa_chain.run(question).strip()}"

with gr.Blocks(css=".scroll-box { max-height: 300px; overflow-y: auto; }", title="AI-Based Document Classification and Extraction System") as demo:
    gr.Markdown("#  AI-Based Document Classification and Extraction System")
    gr.Markdown("Upload a document to classify, extract data, and ask questions.")

    with gr.Row():
        with gr.Column():
            file_input = gr.File(label="Upload PDF or Image", type="filepath", file_types=[".pdf", ".jpg", ".jpeg", ".png"], file_count="single")
            process_btn = gr.Button("Classify & Extract")
            rag_btn = gr.Button("Build QA Pipeline")
        with gr.Column():
            extraction_output = gr.Markdown("Awaiting file...", visible=True, elem_classes=["scroll-box"])
            qa_status_output = gr.Markdown("", visible=False)

    with gr.Row(visible=False) as qa_section:
        with gr.Column():
            question_input = gr.Textbox(label=" Ask a Question", placeholder="e.g. What is the total amount?")
            ask_btn = gr.Button(" Get Answer")
        with gr.Column():
            answer_output = gr.Markdown("")

    process_btn.click(fn=process_file, inputs=file_input, outputs=[extraction_output, qa_status_output])
    rag_btn.click(fn=prepare_qa_pipeline, inputs=file_input, outputs=[qa_status_output, qa_section])
    ask_btn.click(fn=answer_question, inputs=question_input, outputs=answer_output)

demo.launch(debug=True)


It looks like you are running Gradio on a hosted a Jupyter notebook. For the Gradio app to work, sharing must be enabled. Automatically setting `share=True` (you can turn this off by setting `share=False` in `launch()` explicitly).

Colab notebook detected. This cell will run indefinitely so that you can see errors and logs. To turn off, set debug=False in launch().
* Running on public URL: https://e35eac724dba0f4b95.gradio.live

This share link expires in 1 week. For free permanent hosting and GPU upgrades, run `gradio deploy` from the terminal in the working directory to deploy to Hugging Face Spaces (https://huggingface.co/spaces)


Keyboard interruption in main thread... closing server.
Killing tunnel 127.0.0.1:7860 <> https://e35eac724dba0f4b95.gradio.live


