In [None]:
# Created on April 2024 by Go Reply (based on Google's resources)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# Vertex AI Pipeline for Tabular Data Extraction
*MLOps Pipeline control structures using [the Kubeflow Pipelines (KFP) SDK](https://www.kubeflow.org/docs/components/pipelines/v2/)

### Objective

This pipeline evaluates accuracy of Gen AI for tabular data extraction.
It uses the following Google Cloud ML services:
- Gemini API
- Document AI
- Vertex AI Pipelines

### Costs

Billable components of Google Cloud:

* Vertex AI
* Cloud Storage

Estimated costs to be calculated using: the [Pricing
Calculator](https://cloud.google.com/products/calculator/)

*For more details on [Vertex AI
pricing](https://cloud.google.com/vertex-ai/pricing) and [Cloud Storage
pricing](https://cloud.google.com/storage/pricing)

## Installation

Install the packages required for executing this notebook.

In [10]:
! pip3 install --upgrade --quiet google-cloud-aiplatform  \
                                 google-cloud-storage \
                                 kfp \
                                 PyMuPDF \
                                 google-cloud-pipeline-components \
                                 tensorflow-metadata \
                                 pypdf2 \
                                 json5 \
                                 google-cloud-documentai==2.22.0

[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m3.5/3.5 MB[0m [31m29.7 MB/s[0m eta [36m0:00:00[0m
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m15.8/15.8 MB[0m [31m62.3 MB/s[0m eta [36m0:00:00[0m
[?25h

### Colab only: Uncomment the following cell to restart the kernel.

In [None]:
# Automatically restart kernel after installs so that your environment can access the new packages
import IPython

app = IPython.Application.instance()
app.kernel.do_shutdown(True)

{'status': 'ok', 'restart': True}

## Before you begin

### Set up your Google Cloud project

**The following steps are required, regardless of your notebook environment.**

1. [Select or create a Google Cloud project](https://console.cloud.google.com/cloud-resource-manager).

2. [Make sure that billing is enabled for your project](https://cloud.google.com/billing/docs/how-to/modify-project).

3. [Enable the Vertex AI API](https://console.cloud.google.com/flows/enableapi?apiid=aiplatform.googleapis.com)

4. If you are running this notebook locally, you need to install the [Cloud SDK](https://cloud.google.com/sdk).

#### Set your project ID

**If you don't know your project ID**, try the following:
* Run `gcloud config list`.
* Run `gcloud projects list`.
* See the support page: [Locate the project ID](https://support.google.com/googleapi/answer/7014113)

In [22]:
PROJECT_ID = "gen-ai-sandbox"  # @param {type:"string"}

# Set the project id
! gcloud config set project {PROJECT_ID}

Updated property [core/project].


#### Region

You can also change the `REGION` variable used by Vertex AI. Learn more about [Vertex AI regions](https://cloud.google.com/vertex-ai/docs/general/locations).

In [23]:
REGION = "europe-west2"  # @param {type: "string"}

### Authenticate your Google Cloud account

Depending on your Jupyter environment, you may have to manually authenticate. Follow the relevant instructions below.

**1. Vertex AI Workbench**
* Do nothing as you are already authenticated.

**2. Local JupyterLab instance, uncomment and run:**

In [None]:
# ! gcloud auth login

**3. Colab, uncomment and run:**

In [None]:
#from google.colab import auth
#auth.authenticate_user()

**4. Service account or other**
* See how to grant Cloud Storage permissions to your service account at https://cloud.google.com/storage/docs/gsutil/commands/iam#ch-examples.

### Create a Cloud Storage bucket

Create a storage bucket to store intermediate artifacts such as datasets.

In [None]:
BUCKET_URI = "gs://ratecards-eval-gen-ai-sandbox-unique"  # @param {type:"string"}

**If your bucket doesn't already exist**: Run the following cell to create your Cloud Storage bucket.

In [None]:
! gsutil mb -l {REGION} -p {PROJECT_ID} {BUCKET_URI}

#### Service Account

**If you don't know your service account**, try to get your service account using `gcloud` command by executing the second cell below.

In [None]:
SERVICE_ACCOUNT = "230294883006-compute@developer.gserviceaccount.com"  # @param {type:"string"}

In [None]:
import sys

IS_COLAB = "google.colab" in sys.modules
if (
    SERVICE_ACCOUNT == ""
    or SERVICE_ACCOUNT is None
    or SERVICE_ACCOUNT == "[your-service-account]"
):
    # Get your service account from gcloud
    if not IS_COLAB:
        shell_output = !gcloud auth list 2>/dev/null
        SERVICE_ACCOUNT = shell_output[2].replace("*", "").strip()

    if IS_COLAB:
        shell_output = ! gcloud projects describe  $PROJECT_ID
        project_number = shell_output[-1].split(":")[1].strip().replace("'", "")
        SERVICE_ACCOUNT = f"{project_number}-compute@developer.gserviceaccount.com"

    print("Service Account:", SERVICE_ACCOUNT)

#### Set service account access for Vertex AI Pipelines

Run the following commands to grant your service account access to read and write pipeline artifacts in the bucket that you created in the previous step -- you only need to run these once per service account.

In [None]:
! gsutil iam ch serviceAccount:{SERVICE_ACCOUNT}:roles/storage.objectCreator $BUCKET_URI

! gsutil iam ch serviceAccount:{SERVICE_ACCOUNT}:roles/storage.objectViewer $BUCKET_URI

No changes made to gs://ratecards-eval-gen-ai-sandbox-unique/
No changes made to gs://ratecards-eval-gen-ai-sandbox-unique/


### Set up variables

Next, set up some variables used throughout the tutorial.
### Import libraries and define constants

In [17]:
import json
import json5
import fitz
import base64
import vertexai
from vertexai.generative_models import GenerativeModel, Part, FinishReason, Image
import vertexai.preview.generative_models as generative_models
from google.cloud import storage
from google.cloud import bigquery
from PyPDF2 import PdfReader, PdfWriter
import os
import re
import time
import io
import pandas as pd
from google.api_core.client_options import ClientOptions
from google.cloud import documentai as docai
from typing import Optional, Sequence, MutableSequence, Tuple
#from google.cloud import documentai_v1beta3 as docai

import google.cloud.aiplatform as aip
from kfp import compiler, dsl
from kfp.dsl import component, Metrics, Output, Artifact, ClassificationMetrics, Input
from google.cloud import bigquery


# Initial Setup
#################
DATASET_ID = "ratecard_extraction"
destination_table = "extracted_ratecards"
#__________________
mime_type ="application/pdf"
processor_display_name = "rate_card_extractor"
processor_id ="69046f38bcccfa47"

#### Vertex AI Initialisation

Setup up the Gen AI model from Vertex AI:

In [24]:
vertexai.init(project=PROJECT_ID, location="us-central1")

model = GenerativeModel("gemini-1.5-pro-001") # Only in us-central1 - ("gemini-experimental")
extraction_model = GenerativeModel("gemini-1.5-pro-001")

### Create you own Data Extraction Function!

In [29]:
## Write your own prompt to Extract Structured data from the document

my_extraction_prompt = ""

def extract_pdf_content(pdf_path,prompt, temperature=1):
 # pdf_file = Part.from_uri(pdf_path,mime_type="application/pdf")
 with open(pdf_path, "rb") as f:
    pdf_file = Part.from_data(data=f.read(), mime_type="application/pdf")

 generation_config = {
   "max_output_tokens": 8192,
   "temperature": temperature,
   "top_p": 0.95,
 }
 content = [pdf_file, prompt]
 responses= model.generate_content(content, generation_config=generation_config)
 return responses.text

extract_pdf_content("Sample-SoW.pdf", my_extraction_prompt)

'The document is a Work Order for IT Professional services between Pluto Telecom and Alphatech (UK) Limited. \n\n**Key details of the Work Order:**\n\n* **Project Name:** Project 20 - Requirement to Update Discount IDs\n* **Project Description:** Pluto Telecom is launching a new portfolio of plans with unlimited data and tiered network speeds. Alphatech is tasked with creating 3 new business plans and 6 new consumer plans on the Legacy system. These new plans will replace existing equivalent plans.\n* **Project Timeline:** The project will run from April 24th, 2020 to April 30th, 2020.\n* **Pricing Model:** Fixed Price \n* **Total Price:** €2,186.10\n\n**Other notable information:**\n\n* This Work Order is not legally binding until the corresponding Purchase Order is accepted.\n* The Pluto Telecom Procurement Agreement governs this Work Order and can only be changed through a formal Variation Agreement.\n\nThis document outlines the scope of work, timeline, and payment terms for the pr

# Extracting Rate Cards

### Prompt Engineering

Set up the necessary prompts for the extraction of tabular information

In [30]:
structure_prompt= """Objective: Accurately identify all rate card tables within the provided PDF document that deal with financial information, and extract formatting details for full reliable table boundary detection. The output must be a strictly valid JSON format.**Avoid using double quotes within values; instead, use single quotes or escape them.**

Instructions:

Table Recognition:
*   Only look specifially for rate card tables, where the table deals with financial or pricing information
*   Do not return information for tables that are not explicitly rate cards
*   Do not return any information about tables that do not contain financial information
*   Data Presentation: Identify patterns like rows and columns (e.g., Role, Rate GBP/USD) and repeating elements (e.g., job titles and corresponding rates).
*   Multi-page Tables: Check for consistency in headers and formatting across page breaks to identify tables spanning multiple pages. **If the table spans till the end of the page, examine the beginning of the following page to determine if it's a continuation of the same table.** If tables are spanning multiple pages they are still considered one single table with multiple page numbers in table_exists_on_pages.
*   Continuation Clues:
    *   **Page Break:** Check if the table is continued on the next page.
    *   **Table Header:** Check if the table on the next page has a header row if not it may be a continuation of the same table.
    *   **Formatting Consistency:** Look for consistent formatting (e.g., borders, font styles, column alignment) between the end of the previous page and the start of the next page.
    *   **Content Flow:** Analyze if the content flows naturally from the previous page's table to the next page, indicating a continuation rather than a new table.
*   Contextual Understanding:
    *   Document Title and Introduction: Consider the document's title and any introductory information for clues about the types of rate structures expected.

Table Format Description:
*   **Table Start**: Identify the starting point of each table by recognizing patterns like:
    *   **Keywords**: Look for keywords preceding tables such as "Rate Card," "Pricing Table," "Fee Schedule," etc.
    *   **Visual Cues**: Detect visual cues like lines, borders, or changes in font style/size that often mark the beginning of a table.
*   **Table End**: Determine the end point of each table by identifying:
    *   **Subsequent Content**: Look for changes in content or formatting that indicate the transition from table to non-table elements (e.g., paragraphs, headings).
    *   **Last Row Value**: Extract the value of the first column in the final row of the table. This can help distinguish the table's end, especially in cases where formatting changes are subtle.
*   **Structure**: Describe the overall structure of the table, including:
    *   **Column Names**: Identify the names of all the columns in a certain table.
    *   **Column Count**: Identify the total count of columns - do not take the rows into account.
    *   **Rows Count**: Identify the number of rows.
    *   **Headers**: Identify the presence of headers and their location (e.g., first row, first column).

table_exists_on_pages : This is the document page number(s) where the tables exist so for example if the table starts on page 4 and ends on page 5 then table_exists_on_pages = [4,5]. This cannot be empty. This is extracted from the document's metadata not the page document printed. or shown in the document. Ensure that the table does not continue to the next page before assigning this value.
first_page_table_appears: This is the document page number that the table starts on.
last_page_table_appears: This is the document page number that the table ends on.



If no tables are detected, return an empty JSON array: [].
Output strictly in JSON format.

Output Structure (Ensure strict valid JSON format. Do not use quotes within values):
[
  {
    "table_title": "APPENDIX 3-B1 (RATE CARD AND HOURLY RATES)",
    "table_exists_on_pages": "[1,2]",
    "first_page_table_appears": 1,
    "last_page_table_appears": 2,
    "Description": "This table appears to showcase the sales figures for different products across various regions and spans across 2 pages.",
    "format": {
      "start": "Starts after the heading: APPENDIX 3-B1 (RATE CARD AND HOURLY RATES)",
      "end": "Ends before the page break",
      "structure": {
        "columns_names": "Column Names: product_name, region, rate",
        "columns_count": 3,
        "rows_count": 11,
        "headers": "header row with bold text"
      },
      "last_row_first_column_value": "Junior Strategist"
    }
  },
  {
    "table_title": "Terms and Revenue",
    "table_exists_on_pages": "[2]",
    "first_page_table_appears": 2,
    "last_page_table_appears": 2,
    "Description": "This table appears in the bottom of the page to showcase the revenue of different products across various regions.",
    "format": {
      "start": "Starts after the heading: Terms and Revenue",
      "end": "Ends before the page break",
      "structure": {
        "columns_names": "Column Names: product_name, price",
        "columns_count": 2,
        "rows_count": 13,
        "headers": "header row with bold text"
      },
      "last_row_first_column_value": "Net Revenue"
    }
  },
  # ... (similar dictionaries for other tables) ...
]
"""

def get_extraction_prompt(table_data, start, end):
  return """
You are a business analyst that should extract data from a rate card table within a pdf document and output it as a specified structured schema.

The pdf contains the following rate card table:
""" + str(table_data) + """
You must extract the data from the rate card table only, ignoring any other tables and information. Do not include any additional explanations or text, and only output the JSON as shown in the hypothetical example output. Do not include any values, text, or data that are not explicitly found in the document.
Begin table parsing and only consider data from after the "start" location above: """ + str(start) + """
End table parsing and do not consider any data after the "end" location above: """ + str(end) + """
Ignore any other tables or data that are not within these two locations, unless you encounter additional rows or columns that belong to the same table. Do not include thousands-separator commas in numerical values.
Extract empty cells as null values and include this in the output.

**Schema:**
{
 "Rate Card": {
 "type": "object",
 "properties": {
  "Rate Card Category": {
  "type": "string",
  "description": "Overall rate card title / purpose / category",
  "occurrence": "Optional Once"
  },
  "Rate Card Total Price": {
  "type": "number",
  "description": "Overall cost of the entire rate card, often presented at the end of the table as a total or summation",
  "occurrence": "Optional Once"
  },
  "Line Item": {
  "type": "array",
  "items": {
   "type": "object",
   "properties": {
   "Service": {
    "type": "string",
    "description": "The Service/Product that is being detailed and described in the rate line",
    "occurrence": "Optional Once"
   },
   "Function": {
    "type": "string",
    "description": "Service function / detail - describes the specific instance, usage, or description of the service / product",
    "occurrence": "Optional Once"
   },
   "Prices": {
    "type": "array",
    "price": {
      "type": "object",
      "properties": {
        "Tier Category": {
          "type": "string",
          "description": "Top level tier category, in the case where a rate line has multiple prices for different tiers of the same item, for example due to different tiered pricing, different project types, or different categories. May be null if only a single tier.",
          "occurrence": "Optional Once"
        },
        "Tier Subcategory": {
          "type": "string",
          "description": "Sub tier category to allow for 2nd level of tiered price structure (e.g. Standard - SD vs Standard - HD).",
          "occurrence": "Optional Once"
        },
        "Cost per unit": {
          "type": "number",
          "description": "Cost or price of rate card line item, in number format with no commas",
          "occurrence": "Optional Once"
        },
           "Location": {
        "type": "string",
        "description": "Location / country / jurisdiction that the line item relates to.",
        "occurrence": "Optional Once"
        },
        "Currency": {
        "type": "string",
        "description": "Currency of the line item price.",
        "occurrence": "Optional Once"
   },
  }
  }
  },
  "Quantity or Unit": {
    "type": "string",
    "description": "Quantity of service/product that the rate card price is describing - This may be written in String format where only pure unit is specified, e.g. 'per license instance', or as a numerical value e.g. '4 FTEs'",
    "occurrence": "Optional Once"
  },
  "Line Total Cost": {
    "type": "number",
    "description": "Total price for the whole line (will often be quantity * cost per unit, and is mostly relevant for larger rate cards)",
    "occurrence": "Optional Once"
  }
  }
  }
  }
 }
 }
}



Example Output (Hypothetical):
{
"Rate Card": {
"Rate Card Category": "Personnel Rates for Unilever",
"Line Item": [
{
"Service": "Car Rental",
"Function": "Company car usage and rental",
"Prices": [
{
"Cost per unit": 400,
"Tier Category": "Bronze",
"Currency": "SAR"
},
{
"Cost per unit": 500,
"Tier Category": "Silver",
"Currency": "SAR"
}
],
"Quantity or Unit": "per day"
},
{
"Service": "Software Engineer",
"Function": "Software development",
"Prices": [
{
"Cost per unit": 300,
"Tier Category": "Basic",
"Currency": "GBP"
},
{
"Cost per unit": 900,
"Tier Category": "Advanced",
"Currency": "GBP"
}
],
"Quantity or Unit": "per FTE"
},
{
"Service": "Laptops",
"Function": "Hardware for use on project",
"Prices": [
{
"Cost per unit": 4500,
"Tier Category": "Project 1",
"Tier Subcategory": "Simple",
"Currency": "USD"
},
{
"Cost per unit": 6700,
"Tier Category": "Project 1",
"Tier Subcategory": "Complex",
"Currency": "USD"
}

],
"Quantity or Unit": "1"
},
]
}
}

*Note the field "Quantity or Unit" is outside of the price array

PDF Input:
  """

def get_array_extraction_prompt(table_data):
  start = table_data["format"]["start"]
  end = table_data["format"]["end"]
  return """
You are a business analyst that should extract data from a rate card table within a pdf document and output it as a specified structured schema.

The pdf contains the following rate card table:
""" + str(table_data) + """
You must extract the data from the rate card table only, ignoring any other tables and information. Do not include any additional explanations or text, and only output the JSON as shown in the hypothetical example output. Do not include any values, text, or data that are not explicitly found in the document.
Begin table parsing and only consider data from after the "start" location above: """ + str(start) + """
End table parsing and do not consider any data after the "end" location above: """ + str(end) + """
Ignore any other tables or data that are not within these two locations, unless you encounter additional rows or columns that belong to the same table. Do not include thousands-separator commas in numerical values.
This table has around """+str(rows)+ """rows.
I would like to chunk this table into """+str(rc_chunks)+""" chunks, return as an array of the start of the content of the first column for each of those chunks .
Provide an answer that is grounded in the pdf with no hallucinations.Answer output is then formatted and presented as an array.
"""

### Helper Functions:

Define helper functions for:
- Metadata extraction
- Document splitting
- DocAI-based Processing
- Gemini API Processing
- JSON Parsing
- Ground Truth Checks
- BQ Upload


In [36]:
#Initial Gemini Call to get the structure from the PDF
def extract_pdf_content(pdf_path,prompt, temperature=1):
 #pdf_file = Part.from_uri(pdf_path,mime_type="application/pdf")
 with open(pdf_path, "rb") as f:
    pdf_file = Part.from_data(data=f.read(), mime_type="application/pdf")

 generation_config = {
   "max_output_tokens": 8192,
   "temperature": temperature,
   "top_p": 0.95,
 }
 content = [pdf_file, prompt]
 responses= model.generate_content(content, generation_config=generation_config)
 return responses.text

def parse_json_from_gemini_output(output, filename, rate_card_index):
  if output.startswith("```json"):
    start_marker = "```json"
    end_marker = "```"
    start_index = output.find(start_marker) + len(start_marker)
    end_index = output.find(end_marker, start_index)
    json_string = output[start_index:end_index].strip()
  else:
    json_string = output.strip()
  json_string = json_string.replace('\\"', '"')  # Replace escaped quotes
  json_string = json_string.replace("Quantity/Unit","Quantity or Unit")
  extracted_json = json5.loads(json_string)
  extracted_json["Rate Card"]["Contract ID"] = filename
  extracted_json["Rate Card"]["Rate Card Index"] = rate_card_index
  #upload_to_bq(extracted_json,"rate_card_v1")
  return extracted_json

def parse_json_from_markdown(markdown_text):
  """Extracts and parses JSON data from markdown code blocks, returning a list of dictionaries with extracted table information."""
  if markdown_text.startswith("```json"):
    start_marker = "```json"
    end_marker = "```"
    start_index = markdown_text.find(start_marker) + len(start_marker)
    end_index = markdown_text.find(end_marker, start_index)
    json_string = markdown_text[start_index:end_index].strip()
  else:
    json_string = markdown_text.strip()
  json_string = json_string.replace('\\"', '"')  # Replace escaped quotes
  table_data = json5.loads(json_string)

  extracted_tables = []
  rate_card_index = 1
  for table in table_data:
    extracted_table = {
        "rate_card_index": rate_card_index,
        "table_title": table["table_title"],
        "table_exists_on_pages": table["table_exists_on_pages"],
        "first_page_table_appears": table["first_page_table_appears"],
        "last_page_table_appears": table["last_page_table_appears"],
        "description": table["Description"],
        "format": {
            "start": table["format"]["start"],
            "end": table["format"]["end"],
            "structure": table["format"]["structure"],
            "last_row_first_column_value": table["format"]["last_row_first_column_value"],
        },
    }
    extracted_tables.append(extracted_table)
    rate_card_index += 1

  return extracted_tables

def check_for_ground_truth(filename, index):
    client = bigquery.Client()
    # Get max contract_id and iterate by 1
    query = f"""
        SELECT count(*) as row_count
        FROM `{PROJECT_ID}.{DATASET_ID}.ground_truth`
        WHERE `Rate Card`.`Contract ID` = '{filename}'
        AND `Rate Card`.`Rate Card Index` = '{index}'
    """
    query_job = client.query(query)
    results = query_job.result()
    for row in results:
      if row["row_count"] == 0:
        return False
      else:
        return True

def split_pdf_to_gcs(gcs_bucket_name, gcs_blob_name, parsed_data, gcs_output_bucket):
    storage_client = storage.Client()
    bucket = storage_client.bucket(gcs_bucket_name)
    blob = bucket.blob(gcs_blob_name)

    # Download PDF from GCS to a temporary local file
    temp_pdf_file = f"/tmp/{gcs_blob_name}"
    blob.download_to_filename(temp_pdf_file)

    with open(temp_pdf_file, "rb") as f:
        pdf_reader = PdfReader(f)

        for table_data in parsed_data:
            output_blob_name = f"{table_data['rate_card_index']}.pdf"
            pdf_writer = PdfWriter()

            # Handle page ranges correctly
            if len(table_data['table_exists_on_pages']) == 1:
                pdf_writer.add_page(pdf_reader.pages[0])
            else:
                for page_range in table_data['table_exists_on_pages'].strip('[]').split(','):
                    if '-' in page_range:
                        start, end = map(int, page_range.split('-'))
                        for page_num in range(start, end+1):
                            pdf_writer.add_page(pdf_reader.pages[page_num - 1])
                    else:
                        page_num = int(page_range)
                        pdf_writer.add_page(pdf_reader.pages[page_num - 1])

            # Write PDF to a BytesIO buffer
            buffer = io.BytesIO()
            pdf_writer.write(buffer)

            # Upload buffer to GCS output bucket
            output_bucket = storage_client.bucket(gcs_output_bucket)
            output_blob = output_bucket.blob(output_blob_name)
            buffer.seek(0)
            output_blob.upload_from_file(buffer, content_type='application/pdf')
            #print("Uploaded to GCS")
    # Delete the temporary local PDF file
    os.remove(temp_pdf_file)

def split_pdf_locally(pdf_path, parsed_data):

  with open(pdf_path, "rb") as f:
        pdf_reader = PdfReader(f)

        for table_data in parsed_data:
            output_blob_name = f"{table_data['rate_card_index']}.pdf"
            pdf_writer = PdfWriter()

            # Handle page ranges correctly
            if len(table_data['table_exists_on_pages']) == 1:
                pdf_writer.add_page(pdf_reader.pages[0])
            else:
                for page_range in table_data['table_exists_on_pages'].strip('[]').split(','):
                    if '-' in page_range:
                        start, end = map(int, page_range.split('-'))
                        for page_num in range(start, end+1):
                            pdf_writer.add_page(pdf_reader.pages[page_num - 1])
                    else:
                        page_num = int(page_range)
                        pdf_writer.add_page(pdf_reader.pages[page_num - 1])

            # Write PDF to a file
            with open(output_blob_name, 'wb') as output_file:
                pdf_writer.write(output_file)

def extract_row_col(text):
    """Extracts the number of columns and rows from the structure text."""
    #pattern = r"(\d+) columns?, (\d+) rows?"
    #match = re.search(pattern, text)
    #if match:
    #    columns, rows = match.groups()
    #    return int(columns), int(rows)
    #else:
    #    return None, None
    columns = text["columns_count"]
    rows = text["rows_count"]
    return int(columns), int(rows)

def count_page_references(text):
    """Counts the number of page references within square brackets."""
    pattern = r"\[(\d+(?:,\d+)*)\]"  # Matches numbers and comma-separated numbers within brackets
    match = re.findall(pattern, text)
    if match:
        return sum(len(ref.split(",")) for ref in match)  # Count individual numbers in each reference
    else:
        return 0

def empty_gcs_bucket(bucket_name):
  """Empties a Google Cloud Storage bucket.

  Args:
    bucket_name: Name of the bucket to empty.
  """
  storage_client = storage.Client()
  bucket = storage_client.bucket(bucket_name)

  # List blobs in the bucket
  blobs = list(bucket.list_blobs())

  # Delete each blob
  for blob in blobs:
    blob.delete()
  #print(f"Bucket {bucket_name} has been processsed successfully and now is emptied.")


def extract_pdf_content_stream(pdf_path,prompt,temperature=0.1):
 # pdf_file = Part.from_uri(pdf_path,mime_type="application/pdf")
 with open(pdf_path, "rb") as f:
    pdf_file = Part.from_data(data=f.read(), mime_type="application/pdf")


 generation_config = {
   "max_output_tokens": 8192,
   "temperature": temperature,
   "top_p": 0.95,
 }
 content = [pdf_file, prompt]
 responses= extraction_model.generate_content(content, generation_config=generation_config, stream = True)
 output = ""
 for response in responses:
   output = output + response.text
   print(response.text, end="") #TODO: Do something with this data (e.g. store in BQ, output as a JSON to GCS)
 return output

def extract_array_content(table_data, pdf_path, rc_chunks):

  prompt = get_array_extraction_prompt(table_data)
  #pdf_file = Part.from_uri(pdf_path,mime_type="application/pdf")
  with open(pdf_path, "rb") as f:
    pdf_file = Part.from_data(data=f.read(), mime_type="application/pdf")


  generation_config = {
    "max_output_tokens": 8192,
    "temperature": temperature,
    "top_p": 0.95,
  }
  content = [pdf_file, prompt]
  responses= extraction_model.generate_content(content, generation_config=generation_config)
  return responses.text


### Data Extraction

#### File Selection & Metadata Extraction

In [34]:
pdf_path = "Sample-SoW.pdf"

max_retries = 5
retry_count = 0
parsed_data = None

while retry_count < max_retries and parsed_data is None:
   try:
     if (retry_count >= 3):
       temperature = 1.25
     else:
       temperature = 0.8

     answer = extract_pdf_content(pdf_path, structure_prompt, temperature)
     print("Gemini's Extracted Structure:", answer)
     parsed_data = parse_json_from_markdown(answer)
     print("Found " + str(len(parsed_data)) + " rate card tables")
   except Exception as e:
     print(f"Error during parsing: {e}")
     retry_count += 1
     time.sleep(5)

if parsed_data is None:
   print("Failed to parse data even after all retries.")

Gemini's Extracted Structure: ```json
[
  {
    "table_title": null,
    "table_exists_on_pages": [
      7
    ],
    "first_page_table_appears": 7,
    "last_page_table_appears": 7,
    "Description": "This table represents a rate card outlining the cost breakdown for a project, detailing roles, applicable day rates, estimated man-days, discounts, and subtotals.",
    "format": {
      "start": "Starts before the heading: X Fixed Price",
      "end": "Ends after the row with values: Price  € 2,082.00",
      "structure": {
        "columns_names": "Class, Role Description, Name of the person\n(not mandatory), Location, Applicable Day Rate in agreed currency, Estimated Mandays, Discount (%), Sub Total",
        "columns_count": 8,
        "rows_count": 2,
        "headers": "header row with bold text"
      },
      "last_row_first_column_value": "Price"
    }
  }
]
```
Found 1 rate card tables


#### Split document for each rate card

In [37]:
# empty_gcs_bucket("processing-rate-cards")
# gcs_output_bucket = "processing-rate-cards"
# split_pdf_to_gcs(gcs_bucket_name, gcs_blob_name, parsed_data, gcs_output_bucket)
split_pdf_locally(pdf_path, parsed_data)

print("Split PDF files have been generated for each rate card")

Split PDF files have been generated for each rate card


In [39]:
max_retries = 5
retry_count = 0
output = None
filename = pdf_path

outputs = []
for table_info in parsed_data:
    output = None
    structure = table_info['format']['structure']
    title = table_info['table_title']
    index = table_info['rate_card_index']
    columns, rows = extract_row_col(structure)
    page_refs = table_info['table_exists_on_pages']
    # num_pages = count_page_references(page_refs)
    # print(f"Columns: {columns}, Rows: {rows} , Number of pages: {num_pages} , title = {title}.pdf" )
    try:
      file_path = str(index)+".pdf"
    except:
      file_path = "None.pdf"
    print("Extracting rate card table " + str(index))
    file_path = "gs://processing_rate_cards/"+file_path
    # Changes done here
    if rows>=25:
      print("Chunking Needed")
      rc_chunks = (rows -1) // 21 # chunking for every twenty rows
      array= extract_array_content(table_info, pdf_path, rc_chunks)
      start_marker = "```json"
      end_marker = "```"
      start_index = array.find(start_marker) + len(start_marker)
      end_index = array.find(end_marker, start_index)
      array_json_string = array[start_index:end_index].strip()
      array_json_string = json5.loads(array_json_string)
      start = table_info["format"]["start"]
      end = table_info["format"]["end"]
      array_json_string.append(end)
      array_json_string[0] =start
      print(array_json_string)
      for i in range(len(array_json_string) -1):
        start = array_json_string[i]
        end=array_json_string[i+1]
        prompt = get_extraction_prompt(table_info,start,end)
        while retry_count < max_retries and output is None:
          try:
            if (retry_count >= 3):
              temperature = 0.5
            else:
              temperature = 0.1
            output = extract_pdf_content_stream(pdf_path, prompt, temperature)
            parsed_json_output = parse_json_from_gemini_output(output,gcs_blob_name,index)
            # outputs.append(parsed_json_output)
            # #print()
            # print(json.dumps(parsed_json_output, indent=4))
            # print("Extraction complete")
          except Exception as e:
            print(f"Error during parsing: {e}")
            retry_count += 1
            time.sleep(5)
          if parsed_data is None:
            print("Failed to parse data even after all retries.")

    else:
      print("normal process")
      start = table_info["format"]["start"]
      end = table_info["format"]["end"]
      prompt = get_extraction_prompt(table_info,start,end)
      while retry_count < max_retries and output is None:
        try:
          if (retry_count >= 3):
            temperature = 0.5
          else:
            temperature = 0.1
          output = extract_pdf_content_stream(pdf_path, prompt, temperature)
          parsed_json_output = parse_json_from_gemini_output(output,filename,index)
          outputs.append(parsed_json_output)
          #print()
          print(json.dumps(parsed_json_output, indent=4))
          print("Extraction complete")
        except Exception as e:
          print(f"Error during parsing: {e}")
          retry_count += 1
          time.sleep(5)
        if parsed_data is None:
          print("Failed to parse data even after all retries.")

      print("Processed Table:")
      print(parsed_json_output)


Extracting rate card table 1
normal process
```json
{
 "Rate Card": {
  "Rate Card Category": "X Fixed Price",
  "Rate Card Total Price": 2082.00,
  "Line Item": [
   {
    "Service": null,
    "Function": null,
    "Prices": [
     {
      "Tier Category": null,
      "Tier Subcategory": null,
      "Cost per unit": 694,
      "Location": "Ireland,\nItaly,\nIndia",
      "Currency": "€"
     }
    ],
    "Quantity or Unit": "3",
    "Line Total Cost": 2082.00
   }
  ]
 }
}
```{
    "Rate Card": {
        "Rate Card Category": "X Fixed Price",
        "Rate Card Total Price": 2082.0,
        "Line Item": [
            {
                "Service": null,
                "Function": null,
                "Prices": [
                    {
                        "Tier Category": null,
                        "Tier Subcategory": null,
                        "Cost per unit": 694,
                        "Location": "Ireland,\nItaly,\nIndia",
                        "Currency": "\u20ac"
    

### Additional Options to improve detection

Increase Resolution of the image:

In [None]:
# Defines the Generative Models Configuration
generation_config = {
    "max_output_tokens": 8192,
    "temperature": 0,
    "top_p": 0.95,
}

# Loading Gemini Pro Vision Model
multimodal_model = GenerativeModel(
    "gemini-1.5-pro-001", generation_config=generation_config
)

def split_pdf_extract_data(pdfList, folder_uri):
    # To get better resolution
    zoom_x = 4.0  # horizontal zoom
    zoom_y = 4.0  # vertical zoom
    mat = fitz.Matrix(zoom_x, zoom_y)  # zoom factor 2 in each dimension

    for indiv_Pdf in pdfList:
        doc = fitz.open(indiv_Pdf)  # open document
        for page in doc:  # iterate through the pages
            pix = page.get_pixmap(matrix=mat)  # render page to an image
            outpath = f"{folder_uri}{indiv_Pdf}_{page.number}.png"
            pix.save(outpath)  # store image as a PNG

    # Define the path where images are located
    image_names = os.listdir(folder_uri)
    Max_images = len(image_names)

    # Create empty lists to store image information
    page_source = []
    page_content = []
    page_id = []

    p_id = 0  # Initialize image ID counter
    rest_count = 0  # Initialize counter for error handling

    while p_id < Max_images:
        try:
            # Construct the full path to the current image
            image_path = folder_uri + image_names[p_id]

            # Load the image
            image = Image.load_from_file(image_path)

            # Generate prompts for text and table extraction
            prompt_text = "Extract all text content in the image"
            prompt_table = (
                "Detect table in this image. Extract content maintaining the structure"
            )
            prompt_image = "Detect images in this image. Extract content in the form of alternative text or subtitles to each sub-image"

            # Extract text using your multimodal model
            contents = [image, prompt_text]
            response = multimodal_model.generate_content(contents)
            text_content = response.text

            # Extract table using your multimodal model
            contents = [image, prompt_table]
            response = multimodal_model.generate_content(contents)
            table_content = response.text

            # Extract information from images (i.e. Subtitle / Alternative text). | Currently Disabled
            # contents = [image, prompt_image]
            # response = multimodal_model.generate_content(contents)
            # image_content = response.text

            # Log progress and store results
            print(f"processed image no: {p_id}")
            page_source.append(image_path)
            page_content.append(
                text_content + "\n" + table_content
            )  # + "\n" + image_content)
            page_id.append(p_id)
            p_id += 1

        except Exception as err:
            # Handle errors during processing
            print(err)
            print("Taking Some Rest")
            time.sleep(
                12
            )  # Pause execution for 12 second due to default Quota for Vertex
            rest_count += 1
            if rest_count == 5:  # Limit consecutive error handling
                rest_count = 0
                print(f"Cannot process image no: {image_path}")
                p_id += 1  # Move to the next image

    # Create a DataFrame to store extracted information
    df = pd.DataFrame(
        {"page_id": page_id, "page_source": page_source, "page_content": page_content}
    )
    del page_id, page_source, page_content  # Conserve memory
    df.head()  # Preview the DataFrame

    return df

my_list_pdfs = ["rate_card_02.png"]
my_folder_url = "./images/"

upgraded_pdf = split_pdf_extract_data(my_list_pdfs, my_folder_url)

[Errno 21] Is a directory: 'images/.ipynb_checkpoints'
Taking Some Rest


# Vertex AI Pipelines

Setup up the following constants for Vertex AI Pipelines:

In [None]:
PIPELINE_ROOT = "{}/pipeline_root/control".format(BUCKET_URI)

## Initialize Vertex AI SDK for Python

Initialize the Vertex AI SDK for Python for your project and corresponding bucket.

In [None]:
aip.init(project=PROJECT_ID, staging_bucket=BUCKET_URI)

## Define pipeline components

The following example defines three simple pipeline components:

- A component that process the pdf documents.
(Note: This component requires an `import json` in the component function definition)
- A component that gets the relevant ground truth for each extracted table
- A component that leverages the extracted output as well as the ground truth to compute accuracy, precission, recall and F1 score.

In [None]:
@dsl.component(
    base_image="python:3.9",
    packages_to_install=["google-cloud-storage"],
    output_component_file="contract_import.yaml"
)
def contract_import() -> list:
    # Fetching contract from gcs
    from google.cloud import storage
    storage_client = storage.Client()
    contracts = []
    blobs = storage_client.list_blobs("ratecards-eval-gen-ai-sandbox-unique")
    for blob in blobs:
        if blob.name.endswith(".pdf") and not "/" in blob.name:
            print(blob.name)
            contracts.append(blob.name)
    return contracts

@component (
    base_image="python:3.9",  # Ensure Python 3.9 for consistency
    packages_to_install=[
        "google-cloud-aiplatform",
        "vertexai",
        "google-cloud-storage",
        "json5"
        ],
    output_component_file="table_details_extraction.yaml",
)
def table_details_extraction(document: str) -> list:
    # Imports needed for this function
    import json5
    import time
    import vertexai
    from vertexai.generative_models import GenerativeModel, Part, FinishReason
    import vertexai.preview.generative_models as generative_models
    from google.cloud import storage

    PROJECT_ID ="gen-ai-sandbox"
    LOCATION = "us-central1"
    # Prompts
    #########
    structure_prompt= """Objective: Accurately identify all tables within the provided PDF document, and extract formatting details for full reliable table boundary detection. The output must be a strictly valid JSON format.**Avoid using double quotes within values; instead, use single quotes or escape them.**

    Instructions:

    Table Recognition:
    *   Data Presentation: Identify patterns like rows and columns (e.g., Role, Rate GBP/USD) and repeating elements (e.g., job titles and corresponding rates).
    *   Multi-page Tables: Check for consistency in headers and formatting across page breaks to identify tables spanning multiple pages. **If the table spans till the end of the page, examine the beginning of the following page to determine if it's a continuation of the same table.** If tables are spanning multiple pages they are still considered one single table with multiple page numbers in table_exists_on_pages.
    *   Continuation Clues:
        *   **Page Break:** Check if the table is continued on the next page.
        *   **Table Header:** Check if the table on the next page has a header row if not it may be a continuation of the same table.
        *   **Formatting Consistency:** Look for consistent formatting (e.g., borders, font styles, column alignment) between the end of the previous page and the start of the next page.
        *   **Content Flow:** Analyze if the content flows naturally from the previous page's table to the next page, indicating a continuation rather than a new table.
    *   Contextual Understanding:
        *   Document Title and Introduction: Consider the document's title and any introductory information for clues about the types of rate structures expected.

    Table Format Description:
    *   **Table Start**: Identify the starting point of each table by recognizing patterns like:
        *   **Keywords**: Look for keywords preceding tables such as "Rate Card," "Pricing Table," "Fee Schedule," etc.
        *   **Visual Cues**: Detect visual cues like lines, borders, or changes in font style/size that often mark the beginning of a table.
    *   **Table End**: Determine the end point of each table by identifying:
        *   **Subsequent Content**: Look for changes in content or formatting that indicate the transition from table to non-table elements (e.g., paragraphs, headings).
        *   **Last Row Value**: Extract the value of the first column in the final row of the table. This can help distinguish the table's end, especially in cases where formatting changes are subtle.
    *   **Structure**: Describe the overall structure of the table, including:
        *   Number of columns and rows.
        *   Presence of headers and their location (e.g., first row, first column).
        *   Any specific formatting patterns (e.g., alternating row colors, bold text for headers).

    table_exists_on_pages : This is the document page number(s) where the tables exist so for example if the table starts on page 4 and ends on page 5 then table_exists_on_pages = [4,5]. This cannot be empty. This is extracted from the document's metadata not the page document printed. or shown in the document. Ensure that the table does not continue to the next page before assigning this value.

    If no tables are detected, return an empty JSON array: [].

    Output Structure (Ensure strict valid JSON format. Do not use quotes within values):
    [
      {
        "table_title": "APPENDIX 3-B1 (RATE CARD AND HOURLY RATES)",
        "table_exists_on_pages": "[1,2]",
        "Description": "This table appears to showcase the sales figures for different products across various regions and spans across 2 pages.",
        "format": {
          "start": "Starts after the heading: APPENDIX 3-B1 (RATE CARD AND HOURLY RATES)",
          "end": "Ends before the page break",
          "structure": "3 columns, 11 rows, header row with bold text",
          "last_row_first_column_value": "Junior Strategist"
        }
      },
      {
        "table_title": "Terms and Revenue",
        "table_exists_on_pages": "[2]",
        "Description": "This table appears in the bottom of the page to showcase the revenue of different products across various regions.",
        "format": {
          "start": "Starts after the heading: Terms and Revenue",
          "end": "Ends before the page break",
          "structure": "2 columns, 13 rows, header row with bold text",
          "last_row_first_column_value": "Net Revenue"
        }
      },
      # ... (similar dictionaries for other tables) ...
    ]
    """

    # Vertex AI Initialisation
    #########################
    vertexai.init(project=PROJECT_ID, location=LOCATION)
    model = GenerativeModel("gemini-experimental")
    extraction_model = GenerativeModel("gemini-1.5-pro-preview-0409")

    #Initial Gemini Call to get the structure from the PDF
    def extract_pdf_content(pdf_path,prompt, temperature=1):
      pdf_file = Part.from_uri(pdf_path,mime_type="application/pdf")
      generation_config = {
        "max_output_tokens": 8192,
        "temperature": temperature,
        "top_p": 0.95,
      }
      content = [pdf_file, prompt]
      responses= model.generate_content(content, generation_config=generation_config)
      return responses.text

    def parse_json_from_markdown(markdown_text):
        """Extracts and parses JSON data from markdown code blocks, returning a list of dictionaries with extracted table information."""
        start_marker = "```json"
        end_marker = "```"
        start_index = markdown_text.find(start_marker) + len(start_marker)
        end_index = markdown_text.find(end_marker, start_index)
        json_string = markdown_text[start_index:end_index].strip()
        json_string = json_string.replace('\\"', '"')  # Replace escaped quotes
        table_data = json5.loads(json_string)

        extracted_tables = []
        rate_card_index = 1
        for table in table_data:
            extracted_table = {
                "rate_card_index": rate_card_index,
                "table_title": table["table_title"],
                "table_exists_on_pages": table["table_exists_on_pages"],
                "description": table["Description"],
                "format": {
                    "start": table["format"]["start"],
                    "end": table["format"]["end"],
                    "structure": table["format"]["structure"],
                    "last_row_first_column_value": table["format"]["last_row_first_column_value"],
                },
            }
            extracted_tables.append(extracted_table)
            rate_card_index += 1
        return extracted_tables

    #Set file path / Can be later automatically triggers by GCS changes in a Function
    gcs_bucket_name = "ratecards-eval-gen-ai-sandbox-unique"

    # Input to the step - contract name:
    #gcs_blob_name = "gemini_test_01.pdf"
    gcs_blob_name = document

    pdf_path = f"gs://{gcs_bucket_name}/{gcs_blob_name}"
    max_retries = 5
    retry_count = 0
    parsed_data = None
    while retry_count < max_retries and parsed_data is None:
        try:
            if (retry_count >= 3):
                temperature = 1.25
            else:
                temperature = 1

            answer = extract_pdf_content(pdf_path, structure_prompt, temperature)
            print("Gemini's Extracted Structure:", answer)
            parsed_data = parse_json_from_markdown(answer)
            print("Found " + str(len(parsed_data)) + " rate card tables")
        except Exception as e:
            print(f"Error during parsing: {e}")
            retry_count += 1
            time.sleep(5)

        if parsed_data is None:
            raise RuntimeError("Failed to extract data after retries")

    #return json.dumps(parsed_data, sort_keys=True)
    print(parsed_data)
    return parsed_data

@component
def doc_table_ground_truth_import(document: str) -> str:
    # Fetching contract from gcs
    contract = "tables info (JSON format)"

    return "success"

@component
def table_details_extraction_evaluation(document: list, ground_truth: str) -> str:
    # To be replaced with table details evaluation function
    tables = "tables info (JSON format)"
    ground_truth = "Ground truth data from BigQuery (JSON format)"
    return "success"

@dsl.component(
    base_image="python:3.9",
    packages_to_install=[
        "PyPDF2",
        "google-cloud-storage",
        "google-cloud-aiplatform",
    ],
    output_component_file="table_import.yaml"
)
def table_import(doc_tables: list, document: str) -> list:
    # This function creates temporary PDF files with the specific pages relating to a rate card
    from PyPDF2 import PdfReader, PdfWriter
    from google.cloud import storage
    import io
    import os

    def empty_gcs_bucket(bucket_name):
        """Empties a Google Cloud Storage bucket.

        Args:
          bucket_name: Name of the bucket to empty.
        """
        storage_client = storage.Client()
        bucket = storage_client.bucket(bucket_name)

        # List blobs in the bucket
        blobs = list(bucket.list_blobs())

        # Delete each blob
        for blob in blobs:
            blob.delete()

        print(f"Bucket {bucket_name} has been processsed successfully and now is emptied.")

    def split_pdf_to_gcs(gcs_bucket_name, gcs_blob_name, parsed_data, gcs_output_bucket):
        storage_client = storage.Client()
        bucket = storage_client.bucket(gcs_bucket_name)
        blob = bucket.blob(gcs_blob_name)

        # Download PDF from GCS to a temporary local file
        temp_pdf_file = f"/tmp/{gcs_blob_name}"
        blob.download_to_filename(temp_pdf_file)

        with open(temp_pdf_file, "rb") as f:
            pdf_reader = PdfReader(f)

            for table_data in parsed_data:
                output_blob_name = f"{table_data['rate_card_index']}.pdf"
                pdf_writer = PdfWriter()

                # Handle page ranges correctly
                for page_range in table_data['table_exists_on_pages'].strip('[]').split(','):
                    if '-' in page_range:
                        start, end = map(int, page_range.split('-'))
                        for page_num in range(start, end+1):
                            pdf_writer.add_page(pdf_reader.pages[page_num - 1])
                    else:
                        page_num = int(page_range)
                        pdf_writer.add_page(pdf_reader.pages[page_num - 1])

                # Write PDF to a BytesIO buffer
                buffer = io.BytesIO()
                pdf_writer.write(buffer)

                # Upload buffer to GCS output bucket
                output_bucket = storage_client.bucket(gcs_output_bucket)
                output_blob = output_bucket.blob(output_blob_name)
                buffer.seek(0)
                output_blob.upload_from_file(buffer, content_type='application/pdf')
                print("Uploaded to GCS")
        # Delete the temporary local PDF file
        os.remove(temp_pdf_file)

    empty_gcs_bucket("processing-rate-cards")
    gcs_output_bucket = "processing-rate-cards"
    gcs_bucket_name = "ratecards-eval-gen-ai-sandbox-unique"
    gcs_blob_name = document
    parsed_data = doc_tables
    split_pdf_to_gcs(gcs_bucket_name, gcs_blob_name, parsed_data, gcs_output_bucket)
    print("Split PDF files have been generated for each rate card")

    # Listing all tables within doc
    for table in doc_tables:
        print(table)

    return doc_tables

@dsl.component(
    base_image="python:3.9",
    packages_to_install=[
        "google-cloud-storage",
        "google-cloud-documentai",
        "json5",
        "google-cloud-aiplatform",
        "vertexai",
        "google-api-core",
    ],
    output_component_file="process_table.yaml"
)
def process_table(table: dict, document: str) -> dict:
    # Tabular data extraction process
    import json
    import io
    import json5
    import os
    import re
    import time
    from google.cloud import storage
    from google.api_core.client_options import ClientOptions
    from google.cloud import documentai as docai
    from typing import Optional, Sequence, MutableSequence, Tuple
    from vertexai.generative_models import GenerativeModel, Part, FinishReason

    max_retries = 5
    retry_count = 0
    output = None
    parsed_json_output = None
    mime_type = "application/pdf"
    table_info = table
    gcs_bucket_name = "ratecards-eval-gen-ai-sandbox-unique"
    gcs_blob_name = document
    pdf_path = f"gs://{gcs_bucket_name}/{gcs_blob_name}"

    def extract_row_col(text):
        """Extracts the number of columns and rows from the structure text."""
        pattern = r"(\d+) columns?, (\d+) rows?"
        match = re.search(pattern, text)
        if match:
            columns, rows = match.groups()
            return int(columns), int(rows)
        else:
            return None, None
    def count_page_references(text):
        """Counts the number of page references within square brackets."""
        pattern = r"\[(\d+(?:,\d+)*)\]"  # Matches numbers and comma-separated numbers within brackets
        match = re.findall(pattern, text)
        if match:
            return sum(len(ref.split(",")) for ref in match)  # Count individual numbers in each reference
        else:
            return 0

    def get_extraction_prompt(table_data):
        start = table_data["format"]["start"]
        end = table_data["format"]["end"]
        prompt = """
          You are a business analyst that should extract data from a rate card table within a pdf document and output it as a specified structured schema.

          The pdf contains the following rate card table:
          """ + str(table_data) + """
          You must extract the data from the rate card table only, ignoring any other tables and information. Do not include any additional explanations or text, and only output the JSON as shown in the hypothetical example output. Do not include any values, text, or data that are not explicitly found in the document.
          Begin table parsing and only consider data from after the "start" location above: """ + str(start) + """
          End table parsing and do not consider any data after the "end" location above: """ + str(end) + """
          Ignore any other tables or data that are not within these two locations, unless you encounter additional rows or columns that belong to the same table.

          **Schema:**
          {
          "Rate Card": {
          "type": "object",
          "properties": {
            "Rate Card Category": {
            "type": "string",
            "description": "Overall rate card title / purpose / category",
            "occurrence": "Optional Once"
            },
            "Rate Card Total Price": {
            "type": "number",
            "description": "Overall cost of the entire rate card, often presented at the end of the table as a total or summation",
            "occurrence": "Optional Once"
            },
            "Line Item": {
            "type": "array",
            "items": {
            "type": "object",
            "properties": {
            "Service": {
              "type": "string",
              "description": "The Service/Product that is being detailed and described in the rate line",
              "occurrence": "Optional Once"
            },
            "Function": {
              "type": "string",
              "description": "Service function / detail - describes the specific instance, usage, or description of the service / product",
              "occurrence": "Optional Once"
            },
            "Prices": {
              "type": "array",
              "price": {
                "type": "object",
                "properties": {
                  "Tier Category": {
                    "type": "string",
                    "description": "Top level tier category, in the case where a rate line has multiple prices for different tiers of the same item, for example due to different tiered pricing, different project types, or different categories. May be null if only a single tier.",
                    "occurrence": "Optional Once"
                  },
                  "Tier Subcategory": {
                    "type": "string",
                    "description": "Sub tier category to allow for 2nd level of tiered price structure (e.g. Standard - SD vs Standard - HD).",
                    "occurrence": "Optional Once"
                  },
                  "Cost per unit": {
                    "type": "number",
                    "description": "Cost or price of rate card line item",
                    "occurrence": "Optional Once"
                  }
                }
              }
              },
            "Quantity/Unit": {
              "type": "string",
              "description": "Quantity of service/product that the rate card price is describing - This may be written in String format where only pure unit is specified, e.g. 'per license instance', or as a numerical value e.g. '4 FTEs'",
              "occurrence": "Optional Once"
            },
            "Currency": {
              "type": "string",
              "description": "Currency of the line item price.",
              "occurrence": "Optional Once"
            },
            "Location": {
              "type": "string",
              "description": "Location / country / jurisdiction that the line item relates to.",
              "occurrence": "Optional Once"
            },
            "Line Total Cost": {
              "type": "number",
              "description": "Total price for the whole line (will often be quantity * cost per unit, and is mostly relevant for larger rate cards)",
              "occurrence": "Optional Once"
            }
            }
            }
            }
          }
          }
          }



          Example Output (Hypothetical):
          {
          "Rate Card": {
          "Category": "Personnel Rates for Unilever",
          "Line Item": [
          {
          "Service": "Car Rental",
          "Function": "Company car usage and rental",
          "Prices": [
          {
          "Cost per unit": 400,
          "Tier Category": "Bronze"
          },
          {
          "Cost per unit": 500,
          "Tier Category": "Silver"
          }
          ],
          "Currency": "SAR",
          "Quantity/Unit": "per day"
          },
          {
          "Service": "Software Engineer",
          "Function": "Software development",
          "Prices": [
          {
          "Cost per unit": 300,
          "Tier Category": "Basic"
          },
          {
          "Cost per unit": 900,
          "Tier Category": "Advanced"
          }
          ],
          "Currency": "GBP",
          "Quantity/Unit": "per FTE"
          },
          {
          "Service": "Laptops",
          "Function": "Hardware for use on project",
          "Prices": [
          {
          "Cost per unit": 4500,
          "Tier Category": "Project 1"
          "Tier Subcategory": "Simple"
          },
          {
          "Cost per unit": 6700,
          "Tier Category": "Project 1"
          "Tier Subcategory": "Complex"
          }

          ],
          "Currency": "USD",
          "Quantity/Unit": "1"
          },

          ]
          }
          }

          PDF Input:
        """
        return prompt

    def extract_pdf_content_stream(pdf_path,prompt, temperature=0.1):
        pdf_file = Part.from_uri(pdf_path,mime_type="application/pdf")
        generation_config = {
          "max_output_tokens": 8192,
          "temperature": temperature,
          "top_p": 0.95,
        }
        content = [pdf_file, prompt]
        extraction_model = GenerativeModel("gemini-1.5-pro-preview-0409")
        responses= extraction_model.generate_content(content, generation_config=generation_config, stream = True)
        output = ""
        for response in responses:
          output = output + response.text
          print(response.text, end="") #TODO: Do something with this data (e.g. store in BQ, output as a JSON to GCS)
        return output

    def parse_json_from_gemini_output(output, filename, rate_card_index):
        start_marker = "```json"
        end_marker = "```"
        start_index = output.find(start_marker) + len(start_marker)
        end_index = output.find(end_marker, start_index)
        json_string = output[start_index:end_index].strip()
        json_string = json_string.replace('\\"', '"')  # Replace escaped quotes
        json_string = json_string.replace("Quantity/Unit","Quantity or Unit")
        extracted_json = json5.loads(json_string)
        extracted_json["Rate Card"]["Contract ID"] = filename
        extracted_json["Rate Card"]["Rate Card Index"] = rate_card_index
        with open('data.json', 'w') as f:
            json.dump(extracted_json, f)
        #upload_to_bq(extracted_json,"rate_card")
        return extracted_json

    structure = table['format']['structure']
    title = table['table_title']
    index = table['rate_card_index']
    columns, rows = extract_row_col(structure)
    page_refs = table['table_exists_on_pages']
    num_pages = count_page_references(page_refs)
    print(f"Columns: {columns}, Rows: {rows} , Number of pages: {num_pages} , title = {title}.pdf" )
    try:
        file_path = str(index)+".pdf"
    except:
        file_path = "None.pdf"
    print(file_path)
    #if (columns < 3 and rows < 10 and num_pages == 1): -- send to DocAI
    if 1==1:
        print("Send to Gemini")
        print("Extracting rate card table " + str(index))
        prompt = get_extraction_prompt(table_info)
        file_path = "gs://processing-rate-cards/"+file_path
        while retry_count < max_retries and output is None:
            try:
                if (retry_count >= 3):
                    temperature = 0.5
                else:
                    temperature = 0.1
                output = extract_pdf_content_stream(pdf_path, prompt, temperature)
                parsed_json_output = parse_json_from_gemini_output(output,gcs_blob_name,index)
                print("Extraction complete")
            except Exception as e:
              print(f"Error during parsing: {e}")
              retry_count += 1
              time.sleep(5)
        if parsed_json_output is None:
            raise RuntimeError("Failed to extract data after retries")

    # return json.dumps(parsed_json_output, sort_keys=True)
    print("Processed Table:")
    print(parsed_json_output)
    return parsed_json_output

@dsl.component(
    base_image="python:3.9",
    packages_to_install=[
        "google-cloud-bigquery",
        "pandas",
        "db-dtypes",
        "numpy",
        ],
    output_component_file="get_table_ground_truth.yaml"
)
def get_table_ground_truth(table: dict, document: str) -> dict:
    """
    Reads ground truth data from BigQuery based on the provided table ID.
    """
    from google.cloud import bigquery
    import pandas
    import numpy as np

    gcs_blob_name = document

    client = bigquery.Client(project="gen-ai-sandbox") #, dataset="go_reply_extracted_contract_data")
    # Construct the query based on the contract ID
    query = f"""SELECT *
        FROM `ratecard_extraction.extracted_ratecards`
        WHERE `Rate Card`.`Contract ID` = '{gcs_blob_name}' ;
        """

    # Set the contract ID as a query parameter
    query_job = client.query(query)
    # Get the results as a dictionary
    results = query_job.result()
    query_job = client.query(query)
    records = [dict(row) for row in query_job]

    '''
    results = query_job.result()
    df = results.to_dataframe()
    first_row = df.iloc[0]

    # Function to convert ndarrays within a dictionary structure
    def convert_ndarrays_to_dicts(data):
        if isinstance(data, np.ndarray):
            # Handle ndarrays (specifically the "Prices" key)
            if data.dtype == object:  # Check for object dtype (nested ndarrays)
                return [dict(item) for item in data]  # Convert each row to a dictionary
            else:
                return data.tolist()  # Convert other ndarrays to lists
        elif isinstance(data, dict):
            return {key: convert_ndarrays_to_dicts(value) for key, value in data.items()}
        else:
            return data

    # Apply the recursive conversion function
    ground_truth = convert_ndarrays_to_dicts(first_row.to_dict())
    print(ground_truth)
    '''

    return records[0]

@dsl.component(
    base_image="python:3.9",
    packages_to_install=["numpy"],
    output_component_file="evaluation.yaml"
)
def compute_accuracy(extracted_data: dict, ground_truth: dict, metrics: Output[Metrics]) -> dict: #None:
    def extract_services(table):
        return set(
            (item['Service'], price['Cost per unit']) # , item['Function']
            for item in table['Rate Card']['Line Item']
            for price in item['Prices']
        )

    services_1 = extract_services(extracted_data)
    services_2 = extract_services(ground_truth)

    # Calculating true positives, false positives, and false negatives
    true_positives = services_1 & services_2
    false_positives = services_1 - services_2
    false_negatives = services_2 - services_1

    # Calculating precision, recall, and F1 score
    precision = len(true_positives) / (len(true_positives) + len(false_positives)) if (len(true_positives) + len(false_positives)) > 0 else 0
    recall = len(true_positives) / (len(true_positives) + len(false_negatives)) if (len(true_positives) + len(false_negatives)) > 0 else 0
    f1_score = 2 * (precision * recall) / (precision + recall) if (precision + recall) > 0 else 0

    Metrics.log_metric(metrics, "Precision", precision)
    Metrics.log_metric(metrics, "Recall", recall)
    Metrics.log_metric(metrics, "F1 Score", f1_score)

    return {
        'precision': precision,
        'recall': recall,
        'f1_score': f1_score
    }



  @dsl.component(
  def contract_import() -> list:
  @component (
  def table_details_extraction(document: str) -> list:
  @dsl.component(
  def table_import(doc_tables: list, document: str) -> list:
  @dsl.component(
  def process_table(table: dict, document: str) -> dict:
  @dsl.component(
  def get_table_ground_truth(table: dict, document: str) -> dict:
  @dsl.component(
  def compute_accuracy(extracted_data: dict, ground_truth: dict, metrics: Output[Metrics]) -> dict: #None:


## Define a pipeline that uses control structures

The following example defines a pipeline that uses these components and demonstrates the use of  `dsl.Condition` and `dsl.ParallelFor`.

The `json_string` input's default value is a nested JSON list converted to a string. As the pipeline definition shows, the loop and conditional expressions are able to process this string as a list, and access list items and sub-items.
The same holds for the list output by the `args_generator_op`.

In [None]:
@dsl.pipeline(
    name="control",
    pipeline_root=PIPELINE_ROOT,
)
def evaluate_data_extraction(): #gcs_path: str, document_id: str, project_id: str):
    """
    Pipeline to evaluate data extraction accuracy using LLM and Gemini API.
    """

    contract = contract_import()

    with dsl.ParallelFor(items=contract.output) as contract_title:
        table_details_extraction_result = table_details_extraction(document= contract_title)
        doc_table_ground_truth = doc_table_ground_truth_import(document= contract_title)
        table_details_extraction_evaluation(document=table_details_extraction_result.output, ground_truth=doc_table_ground_truth.output)

        tables = table_import(doc_tables= table_details_extraction_result.output, document=contract_title)
        with dsl.ParallelFor(items=tables.output) as table:
          extracted_data = process_table(table = table, document=contract_title)
          table_ground_truth = get_table_ground_truth(table = table, document=contract_title)
          compute_accuracy(extracted_data=extracted_data.output , ground_truth = table_ground_truth.output)


## Compile the pipeline

Next, compile the pipeline.

In [None]:
compiler.Compiler().compile(
    pipeline_func=evaluate_data_extraction, package_path="control_pipeline.yaml"
)

## Run the pipeline

Next, run the pipeline.

In [None]:
DISPLAY_NAME = "control"

job = aip.PipelineJob(
    display_name=DISPLAY_NAME,
    template_path="control_pipeline.yaml",
    pipeline_root=PIPELINE_ROOT,
)

job.run()

! rm control_pipeline.json

INFO:google.cloud.aiplatform.pipeline_jobs:Creating PipelineJob
INFO:google.cloud.aiplatform.pipeline_jobs:PipelineJob created. Resource name: projects/230294883006/locations/us-central1/pipelineJobs/control-20240618103851
INFO:google.cloud.aiplatform.pipeline_jobs:To use this PipelineJob in another session:
INFO:google.cloud.aiplatform.pipeline_jobs:pipeline_job = aiplatform.PipelineJob.get('projects/230294883006/locations/us-central1/pipelineJobs/control-20240618103851')
INFO:google.cloud.aiplatform.pipeline_jobs:View Pipeline Job:
https://console.cloud.google.com/vertex-ai/locations/us-central1/pipelines/runs/control-20240618103851?project=230294883006
INFO:google.cloud.aiplatform.pipeline_jobs:PipelineJob projects/230294883006/locations/us-central1/pipelineJobs/control-20240618103851 current state:
PipelineState.PIPELINE_STATE_RUNNING
INFO:google.cloud.aiplatform.pipeline_jobs:PipelineJob run completed. Resource name: projects/230294883006/locations/us-central1/pipelineJobs/control

rm: cannot remove 'control_pipeline.json': No such file or directory


Click on the generated link to see your run in the Cloud Console.

<!-- It should look something like this as it is running:

<a href="https://storage.googleapis.com/amy-jo/images/mp/automl_tabular_classif.png" target="_blank"><img src="https://storage.googleapis.com/amy-jo/images/mp/automl_tabular_classif.png" width="40%"/></a> -->

*In the Google Cloud Console, many of the pipeline DAG nodes expand or collapse when you click on them.

# Cleaning up

To clean up all Google Cloud resources used in this project, you can delete the individual resources you created.

In [None]:
#import os

#delete_bucket = False

#job.delete()

#if delete_bucket or os.getenv("IS_TESTING"):
#    ! gsutil rm -r $BUCKET_URI

 .

.

.

.











# Ground Truth Generation Script...

In [None]:

def table_details_extraction(document: str) -> list:
    # Imports needed for this function
    import json
    import time
    import vertexai
    from vertexai.generative_models import GenerativeModel, Part, FinishReason
    import vertexai.preview.generative_models as generative_models
    from google.cloud import storage

    PROJECT_ID ="gen-ai-sandbox"
    LOCATION = "us-central1"
    # Prompts
    #########
    structure_prompt= """Objective: Accurately identify all tables within the provided PDF document, and extract formatting details for full reliable table boundary detection. The output must be a strictly valid JSON format.**Avoid using double quotes within values; instead, use single quotes or escape them.**

    Instructions:

    Table Recognition:
    *   Data Presentation: Identify patterns like rows and columns (e.g., Role, Rate GBP/USD) and repeating elements (e.g., job titles and corresponding rates).
    *   Multi-page Tables: Check for consistency in headers and formatting across page breaks to identify tables spanning multiple pages. **If the table spans till the end of the page, examine the beginning of the following page to determine if it's a continuation of the same table.** If tables are spanning multiple pages they are still considered one single table with multiple page numbers in table_exists_on_pages.
    *   Continuation Clues:
        *   **Page Break:** Check if the table is continued on the next page.
        *   **Table Header:** Check if the table on the next page has a header row if not it may be a continuation of the same table.
        *   **Formatting Consistency:** Look for consistent formatting (e.g., borders, font styles, column alignment) between the end of the previous page and the start of the next page.
        *   **Content Flow:** Analyze if the content flows naturally from the previous page's table to the next page, indicating a continuation rather than a new table.
    *   Contextual Understanding:
        *   Document Title and Introduction: Consider the document's title and any introductory information for clues about the types of rate structures expected.

    Table Format Description:
    *   **Table Start**: Identify the starting point of each table by recognizing patterns like:
        *   **Keywords**: Look for keywords preceding tables such as "Rate Card," "Pricing Table," "Fee Schedule," etc.
        *   **Visual Cues**: Detect visual cues like lines, borders, or changes in font style/size that often mark the beginning of a table.
    *   **Table End**: Determine the end point of each table by identifying:
        *   **Subsequent Content**: Look for changes in content or formatting that indicate the transition from table to non-table elements (e.g., paragraphs, headings).
        *   **Last Row Value**: Extract the value of the first column in the final row of the table. This can help distinguish the table's end, especially in cases where formatting changes are subtle.
    *   **Structure**: Describe the overall structure of the table, including:
        *   Number of columns and rows.
        *   Presence of headers and their location (e.g., first row, first column).
        *   Any specific formatting patterns (e.g., alternating row colors, bold text for headers).

    table_exists_on_pages : This is the document page number(s) where the tables exist so for example if the table starts on page 4 and ends on page 5 then table_exists_on_pages = [4,5]. This cannot be empty. This is extracted from the document's metadata not the page document printed. or shown in the document. Ensure that the table does not continue to the next page before assigning this value.

    If no tables are detected, return an empty JSON array: [].

    Output Structure (Ensure strict valid JSON format. Do not use quotes within values):
    [
      {
        "table_title": "APPENDIX 3-B1 (RATE CARD AND HOURLY RATES)",
        "table_exists_on_pages": "[1,2]",
        "Description": "This table appears to showcase the sales figures for different products across various regions and spans across 2 pages.",
        "format": {
          "start": "Starts after the heading: APPENDIX 3-B1 (RATE CARD AND HOURLY RATES)",
          "end": "Ends before the page break",
          "structure": "3 columns, 11 rows, header row with bold text",
          "last_row_first_column_value": "Junior Strategist"
        }
      },
      {
        "table_title": "Terms and Revenue",
        "table_exists_on_pages": "[2]",
        "Description": "This table appears in the bottom of the page to showcase the revenue of different products across various regions.",
        "format": {
          "start": "Starts after the heading: Terms and Revenue",
          "end": "Ends before the page break",
          "structure": "2 columns, 13 rows, header row with bold text",
          "last_row_first_column_value": "Net Revenue"
        }
      },
      # ... (similar dictionaries for other tables) ...
    ]
    """

    # Vertex AI Initialisation
    #########################
    vertexai.init(project=PROJECT_ID, location=LOCATION)
    model = GenerativeModel("gemini-experimental")
    extraction_model = GenerativeModel("gemini-1.5-pro-preview-0409")

    #Initial Gemini Call to get the structure from the PDF
    def extract_pdf_content(pdf_path,prompt, temperature=1):
      pdf_file = Part.from_uri(pdf_path,mime_type="application/pdf")
      generation_config = {
        "max_output_tokens": 8192,
        "temperature": temperature,
        "top_p": 0.95,
      }
      content = [pdf_file, prompt]
      responses= model.generate_content(content, generation_config=generation_config)
      return responses.text

    def parse_json_from_markdown(markdown_text):
        """Extracts and parses JSON data from markdown code blocks, returning a list of dictionaries with extracted table information."""
        start_marker = "```json"
        end_marker = "```"
        start_index = markdown_text.find(start_marker) + len(start_marker)
        end_index = markdown_text.find(end_marker, start_index)
        json_string = markdown_text[start_index:end_index].strip()
        json_string = json_string.replace('\\"', '"')  # Replace escaped quotes
        table_data = json.loads(json_string)

        extracted_tables = []
        rate_card_index = 1
        for table in table_data:
            extracted_table = {
                "rate_card_index": rate_card_index,
                "table_title": table["table_title"],
                "table_exists_on_pages": table["table_exists_on_pages"],
                "description": table["Description"],
                "format": {
                    "start": table["format"]["start"],
                    "end": table["format"]["end"],
                    "structure": table["format"]["structure"],
                    "last_row_first_column_value": table["format"]["last_row_first_column_value"],
                },
            }
            extracted_tables.append(extracted_table)
            rate_card_index += 1
        return extracted_tables

    #Set file path / Can be later automatically triggers by GCS changes in a Function
    gcs_bucket_name = "ratecards-eval-gen-ai-sandbox-unique"

    # Input to the step - contract name:
    #gcs_blob_name = "gemini_test_01.pdf"
    gcs_blob_name = document

    pdf_path = f"gs://{gcs_bucket_name}/{gcs_blob_name}"
    max_retries = 5
    retry_count = 0
    parsed_data = None
    while retry_count < max_retries and parsed_data is None:
        try:
            if (retry_count >= 3):
                temperature = 1.25
            else:
                temperature = 1

            answer = extract_pdf_content(pdf_path, structure_prompt, temperature)
            print("Gemini's Extracted Structure:", answer)
            parsed_data = parse_json_from_markdown(answer)
            print("Found " + str(len(parsed_data)) + " rate card tables")
        except Exception as e:
            print(f"Error during parsing: {e}")
            retry_count += 1
            time.sleep(5)

        if parsed_data is None:
            raise RuntimeError("Failed to extract data after retries")

    #return json.dumps(parsed_data, sort_keys=True)
    print(parsed_data)
    return parsed_data

table_details_extraction("rate01.pdf")

Gemini's Extracted Structure: ```json
[
  {
    "table_title": "Worldwide Production Hourly Rates (USD)",
    "table_exists_on_pages": "[1]",
    "Description": "This table presents hourly rates for various digital production services, categorized into Creative Development, Project Management, Technology & Digital Studio, Data Management, and Programming.",
    "format": {
      "start": "Starts after the heading: Worldwide Production Hourly Rates (USD)",
      "end": "Ends before the page break",
      "structure": "2 columns, 24 rows, header row with bold text",
      "last_row_first_column_value": "XML/WAP Programming"
    }
  }
]
```
Found 1 rate card tables
[{'rate_card_index': 1, 'table_title': 'Worldwide Production Hourly Rates (USD)', 'table_exists_on_pages': '[1]', 'description': 'This table presents hourly rates for various digital production services, categorized into Creative Development, Project Management, Technology & Digital Studio, Data Management, and Programming.', '

[{'rate_card_index': 1,
  'table_title': 'Worldwide Production Hourly Rates (USD)',
  'table_exists_on_pages': '[1]',
  'description': 'This table presents hourly rates for various digital production services, categorized into Creative Development, Project Management, Technology & Digital Studio, Data Management, and Programming.',
  'format': {'start': 'Starts after the heading: Worldwide Production Hourly Rates (USD)',
   'end': 'Ends before the page break',
   'structure': '2 columns, 24 rows, header row with bold text',
   'last_row_first_column_value': 'XML/WAP Programming'}}]

In [None]:
table = {'rate_card_index': 1,
  'table_title': 'Worldwide Production Hourly Rates (USD)',
  'table_exists_on_pages': '[1]',
  'description': 'This table provides hourly rates for various digital production services. It includes categories for Creative Development, Project Management, Technology & Digital Studio, Data Management, and Programming.',
  'format': {'start': 'Starts after the heading: Worldwide Production Hourly Rates (USD)',
   'end': 'Ends before the next section or page break',
   'structure': '2 columns, 19 rows, header row with bold text',
   'last_row_first_column_value': 'XML/WAP Programming'}}

def process_table(table: dict, document: str) -> dict:
    # Tabular data extraction process
    import json
    import io
    import json
    import os
    import re
    import time
    from google.cloud import storage
    from google.api_core.client_options import ClientOptions
    #from google.cloud import documentai as docai
    from typing import Optional, Sequence, MutableSequence, Tuple
    from vertexai.generative_models import GenerativeModel, Part, FinishReason

    max_retries = 5
    retry_count = 0
    output = None
    parsed_json_output = None
    mime_type = "application/pdf"
    table_info = table
    gcs_bucket_name = "ratecards-eval-gen-ai-sandbox-unique"
    gcs_blob_name = document
    pdf_path = f"gs://{gcs_bucket_name}/{gcs_blob_name}"

    def extract_row_col(text):
        """Extracts the number of columns and rows from the structure text."""
        pattern = r"(\d+) columns?, (\d+) rows?"
        match = re.search(pattern, text)
        if match:
            columns, rows = match.groups()
            return int(columns), int(rows)
        else:
            return None, None
    def count_page_references(text):
        """Counts the number of page references within square brackets."""
        pattern = r"\[(\d+(?:,\d+)*)\]"  # Matches numbers and comma-separated numbers within brackets
        match = re.findall(pattern, text)
        if match:
            return sum(len(ref.split(",")) for ref in match)  # Count individual numbers in each reference
        else:
            return 0

    def get_extraction_prompt(table_data):
        start = table_data["format"]["start"]
        end = table_data["format"]["end"]
        prompt = """
          You are a business analyst that should extract data from a rate card table within a pdf document and output it as a specified structured schema.

          The pdf contains the following rate card table:
          """ + str(table_data) + """
          You must extract the data from the rate card table only, ignoring any other tables and information. Do not include any additional explanations or text, and only output the JSON as shown in the hypothetical example output. Do not include any values, text, or data that are not explicitly found in the document.
          Begin table parsing and only consider data from after the "start" location above: """ + str(start) + """
          End table parsing and do not consider any data after the "end" location above: """ + str(end) + """
          Ignore any other tables or data that are not within these two locations, unless you encounter additional rows or columns that belong to the same table.

          **Schema:**
          {
          "Rate Card": {
          "type": "object",
          "properties": {
            "Rate Card Category": {
            "type": "string",
            "description": "Overall rate card title / purpose / category",
            "occurrence": "Optional Once"
            },
            "Rate Card Total Price": {
            "type": "number",
            "description": "Overall cost of the entire rate card, often presented at the end of the table as a total or summation",
            "occurrence": "Optional Once"
            },
            "Line Item": {
            "type": "array",
            "items": {
            "type": "object",
            "properties": {
            "Service": {
              "type": "string",
              "description": "The Service/Product that is being detailed and described in the rate line",
              "occurrence": "Optional Once"
            },
            "Function": {
              "type": "string",
              "description": "Service function / detail - describes the specific instance, usage, or description of the service / product",
              "occurrence": "Optional Once"
            },
            "Prices": {
              "type": "array",
              "price": {
                "type": "object",
                "properties": {
                  "Tier Category": {
                    "type": "string",
                    "description": "Top level tier category, in the case where a rate line has multiple prices for different tiers of the same item, for example due to different tiered pricing, different project types, or different categories. May be null if only a single tier.",
                    "occurrence": "Optional Once"
                  },
                  "Tier Subcategory": {
                    "type": "string",
                    "description": "Sub tier category to allow for 2nd level of tiered price structure (e.g. Standard - SD vs Standard - HD).",
                    "occurrence": "Optional Once"
                  },
                  "Cost per unit": {
                    "type": "number",
                    "description": "Cost or price of rate card line item",
                    "occurrence": "Optional Once"
                  }
                }
              }
              },
            "Quantity/Unit": {
              "type": "string",
              "description": "Quantity of service/product that the rate card price is describing - This may be written in String format where only pure unit is specified, e.g. 'per license instance', or as a numerical value e.g. '4 FTEs'",
              "occurrence": "Optional Once"
            },
            "Currency": {
              "type": "string",
              "description": "Currency of the line item price.",
              "occurrence": "Optional Once"
            },
            "Location": {
              "type": "string",
              "description": "Location / country / jurisdiction that the line item relates to.",
              "occurrence": "Optional Once"
            },
            "Line Total Cost": {
              "type": "number",
              "description": "Total price for the whole line (will often be quantity * cost per unit, and is mostly relevant for larger rate cards)",
              "occurrence": "Optional Once"
            }
            }
            }
            }
          }
          }
          }



          Example Output (Hypothetical):
          {
          "Rate Card": {
          "Category": "Personnel Rates for Unilever",
          "Line Item": [
          {
          "Service": "Car Rental",
          "Function": "Company car usage and rental",
          "Prices": [
          {
          "Cost per unit": 400,
          "Tier Category": "Bronze"
          },
          {
          "Cost per unit": 500,
          "Tier Category": "Silver"
          }
          ],
          "Currency": "SAR",
          "Quantity/Unit": "per day"
          },
          {
          "Service": "Software Engineer",
          "Function": "Software development",
          "Prices": [
          {
          "Cost per unit": 300,
          "Tier Category": "Basic"
          },
          {
          "Cost per unit": 900,
          "Tier Category": "Advanced"
          }
          ],
          "Currency": "GBP",
          "Quantity/Unit": "per FTE"
          },
          {
          "Service": "Laptops",
          "Function": "Hardware for use on project",
          "Prices": [
          {
          "Cost per unit": 4500,
          "Tier Category": "Project 1"
          "Tier Subcategory": "Simple"
          },
          {
          "Cost per unit": 6700,
          "Tier Category": "Project 1"
          "Tier Subcategory": "Complex"
          }

          ],
          "Currency": "USD",
          "Quantity/Unit": "1"
          },

          ]
          }
          }

          PDF Input:
        """
        return prompt

    def extract_pdf_content_stream(pdf_path,prompt, temperature=0.1):
        pdf_file = Part.from_uri(pdf_path,mime_type="application/pdf")
        generation_config = {
          "max_output_tokens": 8192,
          "temperature": temperature,
          "top_p": 0.95,
        }
        content = [pdf_file, prompt]
        extraction_model = GenerativeModel("gemini-1.5-pro-preview-0409")
        responses= extraction_model.generate_content(content, generation_config=generation_config, stream = True)
        output = ""
        for response in responses:
          output = output + response.text
          print(response.text, end="") #TODO: Do something with this data (e.g. store in BQ, output as a JSON to GCS)
        return output

    def parse_json_from_gemini_output(output, filename, rate_card_index):
        start_marker = "```json"
        end_marker = "```"
        start_index = output.find(start_marker) + len(start_marker)
        end_index = output.find(end_marker, start_index)
        json_string = output[start_index:end_index].strip()
        json_string = json_string.replace('\\"', '"')  # Replace escaped quotes
        json_string = json_string.replace("Quantity/Unit","Quantity or Unit")
        extracted_json = json.loads(json_string)
        extracted_json["Rate Card"]["Contract ID"] = filename
        extracted_json["Rate Card"]["Rate Card Index"] = rate_card_index
        with open('data.json', 'w') as f:
            json.dump(extracted_json, f)
        #upload_to_bq(extracted_json,"rate_card")
        return extracted_json

    structure = table['format']['structure']
    title = table['table_title']
    index = table['rate_card_index']
    columns, rows = extract_row_col(structure)
    page_refs = table['table_exists_on_pages']
    num_pages = count_page_references(page_refs)
    print(f"Columns: {columns}, Rows: {rows} , Number of pages: {num_pages} , title = {title}.pdf" )
    try:
        file_path = str(index)+".pdf"
    except:
        file_path = "None.pdf"
    print(file_path)
    #if (columns < 3 and rows < 10 and num_pages == 1): -- send to DocAI
    if 1==1:
        print("Send to Gemini")
        print("Extracting rate card table " + str(index))
        prompt = get_extraction_prompt(table_info)
        file_path = "gs://processing-rate-cards/"+file_path
        while retry_count < max_retries and output is None:
            try:
                if (retry_count >= 3):
                    temperature = 0.5
                else:
                    temperature = 0.1
                output = extract_pdf_content_stream(pdf_path, prompt, temperature)
                parsed_json_output = parse_json_from_gemini_output(output,gcs_blob_name,index)
                print("Extraction complete")
            except Exception as e:
              print(f"Error during parsing: {e}")
              retry_count += 1
              time.sleep(5)
        if parsed_json_output is None:
            raise RuntimeError("Failed to extract data after retries")

    # return json.dumps(parsed_json_output, sort_keys=True)
    print("Processed Table:")
    print(parsed_json_output)
    return parsed_json_output

extracted_data = process_table(table, document='rate01.pdf')

Columns: 2, Rows: 19 , Number of pages: 1 , title = Worldwide Production Hourly Rates (USD).pdf
1.pdf
Send to Gemini
Extracting rate card table 1
```json
{
  "Rate Card": {
    "Rate Card Category": "Worldwide Production Hourly Rates",
    "Line Item": [
      {
        "Service": "Creative Development",
        "Function": "Creative Director",
        "Prices": [
          {
            "Cost per unit": 63
          }
        ],
        "Currency": "USD"
      },
      {
        "Service": "Creative Development",
        "Function": "Head of Art/ Senior Art Director",
        "Prices": [
          {
            "Cost per unit": 50
          }
        ],
        "Currency": "USD"
      },
      {
        "Service": "Creative Development",
        "Function": "Head of copy / Senior Copywriter",
        "Prices": [
          {
            "Cost per unit": 39
          }
        ],
        "Currency": "USD"
      },
      {
        "Service": "Creative Development",
        "Function": "A

In [None]:
# Script to save ground truth to BQ

def upload_to_bq(data_dict, destination_table):
    client = bigquery.Client()
    errors = client.insert_rows_json(
        f"{PROJECT_ID}.{DATASET_ID}.{destination_table}", [data_dict]
    )
    if errors == []:
        print("New row successfully added to " + f"{PROJECT_ID}.{DATASET_ID}.{destination_table}")
    else:
        print("Encountered errors while inserting row: ", errors)

upload_to_bq(extracted_data,destination_table)


New row successfully added to gen-ai-sandbox.ratecard_extraction.extracted_ratecards


In [None]:
import numpy as np

def get_table_ground_truth(table: dict, document: str) -> dict:
    """
    Reads ground truth data from BigQuery based on the provided table ID.
    """
    from google.cloud import bigquery

    gcs_blob_name = document

    client = bigquery.Client() #project="ul-gs-s-sandbx-02-prj", dataset="go_reply_extracted_contract_data")
    # Construct the query based on the contract ID
    query = f"""SELECT *
        FROM `gen-ai-sandbox.ratecard_extraction.extracted_ratecards`
        WHERE `Rate Card`.`Contract ID` = '{gcs_blob_name}' ;
        """

    # Set the contract ID as a query parameter
    query_job = client.query(query)
    # Get the results as a dictionary
    results = query_job.result()
    query_job = client.query(query)
    records = [dict(row) for row in query_job]
    print("records")
    print(records)

    #print(first_row["Rate Card"]['Line Item'])

    def convert_ndarrays_to_dicts(data):
        if isinstance(data, np.ndarray):
            # Handle ndarrays (specifically the "Prices" key)
            if data.dtype == object:  # Check for object dtype (nested ndarrays)
                for item in data:
                    print("Here")
                return [dict(item) for item in data]  # Convert each row to a dictionary
            else:
                print("Here")
                return data.tolist()  # Convert other ndarrays to lists
        elif isinstance(data, dict):
            print("Here")
            return {key: convert_ndarrays_to_dicts(value) for key, value in data.items()}
        else:
            print("Here")
            return data

    #ground_truth = convert_ndarrays_to_dicts(first_row)
    #print(ground_truth) #["Rate Card"]['Line Item'])

    #return ground_truth

get_table_ground_truth({},'rate01.pdf')

records
[{'Rate Card': {'Line Item': [{'Prices': [{'Tier Subcategory': None, 'Cost per unit': 63.0, 'Tier Category': None}], 'Service': 'Creative Development', 'Currency': 'USD', 'Location': None, 'Quantity or Unit': None, 'Function': 'Creative Director', 'Line Total Cost': None}, {'Prices': [{'Tier Subcategory': None, 'Cost per unit': 50.0, 'Tier Category': None}], 'Service': 'Creative Development', 'Currency': 'USD', 'Location': None, 'Quantity or Unit': None, 'Function': 'Head of Art/ Senior Art Director', 'Line Total Cost': None}, {'Prices': [{'Tier Subcategory': None, 'Cost per unit': 39.0, 'Tier Category': None}], 'Service': 'Creative Development', 'Currency': 'USD', 'Location': None, 'Quantity or Unit': None, 'Function': 'Head of copy / Senior Copywriter', 'Line Total Cost': None}, {'Prices': [{'Tier Subcategory': None, 'Cost per unit': 28.0, 'Tier Category': None}], 'Service': 'Creative Development', 'Currency': 'USD', 'Location': None, 'Quantity or Unit': None, 'Function': 'Ar