In [None]:
import json
import logging
import os
import time
from collections import defaultdict
from logging.handlers import RotatingFileHandler

import boto3
import fsspec
import jsonlines
import pandas as pd
from boto3.exceptions import S3UploadFailedError
from botocore.exceptions import ClientError
from dotenv import load_dotenv
from google import genai
from google.cloud import storage
from google.genai.types import CreateBatchJobConfig
from tqdm import tqdm

In [None]:
# Load settings from a local .env file; override=True lets values from the file
# replace variables that are already set in the process environment.
load_dotenv(override=True)

In [None]:
# Vertex AI project and region, both read from the environment.
# The region falls back to us-central1 when GOOGLE_CLOUD_REGION is not set.
PROJECT_ID = os.environ.get("PROJECT_ID")
LOCATION = os.getenv("GOOGLE_CLOUD_REGION", "us-central1")

print(PROJECT_ID, LOCATION)

# Gemini client that talks to Vertex AI for the project/region above.
client = genai.Client(project=PROJECT_ID, location=LOCATION, vertexai=True)

# Batching for gemini

In [None]:
# GCS client for the project, and a handle to the bucket named by the
# BUCKET_NAME environment variable (used below to list page images and to
# upload the batch input file).
storage_client = storage.Client(project=PROJECT_ID)
bucket = storage_client.bucket(os.getenv("BUCKET_NAME"))

In [None]:
# Name of the batch-request JSONL file and the local path where it is written
JSON_FILENAME = "tau_ocr_batch1.jsonl"
jsonl_fn = f"outputs/gcp_batches/{JSON_FILENAME}"

# Model used for the "OCR" step
MODEL_ID = "gemini-2.5-flash"

# Prefix inside the GCS bucket under which the page images are stored
prefix = "data_HQ_wOcr_img_chunks/"

In [None]:
# inspired by dsparse
# Optional system instruction; it is only attached to a request when truthy,
# so with None the whole task description travels in the user prompt below.
system_prompt = None
# User prompt sent together with every page image.
# NOTE(review): the prompt announces "three categories" (Text, Table, Figure) but
# further down also defines a Footnote element type - confirm the intended wording.
# NOTE(review): the last paragraph mentions a "bounding box" although the output
# format only lists `type` and `content` - looks like a leftover; confirm.
prompt = """
You are a PDF -> MD file parser. Your task is to analyze the provided PDF page (provided as an image) and return a structured JSON response containing all of the elements on the page. Each element must be represented using Markdown formatting.

There are three categories of elements you need to identify: "Text", "Table" and "Figure". 
- Text and table elements are those that can be accurately represented using Markdown. For such elements, you must provide the exact text content. 
- Figure elements are those that need to be represented as images to fully capture their content. For Figure elements, you must provide a detailed description of the content in *Hebrew*.

**Crucially, page numbers, often found in headers or footers, are considered document metadata and should be explicitly excluded from *any* extracted content for *any* element type.**

Every element on the page should be classified as one of these types. There should be no overlap between elements. You should use the smallest number of elements possible while still accurately representing and categorizing the content on the page. For example, if the page contains a couple paragraphs of text, followed by a large figure, followed by a few more paragraphs of text, you should use three elements: Text, Figure, and Text. With that said, you should never combine two different types of elements into a single element.

**Key Instruction for Footnotes:**
**Footnotes must adhere to the standard Markdown footnote syntax.** This means:
1. For references within the main body of text, use the format `[^N]` (e.g., `[^1]`, `[^2]`), where `N` is an integer sequentially assigned to that footnote on the page.
2. For the footnote definition itself, found typically at the bottom of the page, use the format `[^N]: <full footnote text>`.
3. The `N` in the reference `[^N]` and its corresponding definition `[^N]:` *must* match. **Footnote numbering should reflect the original numbering from the document, which may be continuous across pages rather than restarting each page. Therefore, transcribe the footnote number precisely as it appears in the PDF content on the current page.** Do not attempt to re-number or infer a sequence if the original numbering is already present.

Here are detailed descriptions of the element types you can use:
- Text: This is the main text content of the page, including paragraphs, lists, titles, and any other text content that is not part of one of the other more specialized element types. Not all pages have narrative text, but most do. Be sure to use Markdown formatting for the text content. This includes using tags like # for headers, * for lists, etc. Make sure your header tags are properly nested and that your lists are properly formatted.
    When a footnote reference appears in the text, transcribe it using the Markdown `[^N]` format as described above.
    When encountering a Table of Contents (TOC), transcribe each entry as a list item. For each entry, include only the main text of the entry and its corresponding page number. Explicitly omit any connecting leader characters (e.g., dots, dashes, spaces) that typically appear between the entry text and the page number (e.g., "Chapter 1 Introduction 1" not "Chapter 1 Introduction . . . . . . . . . 1").
- Table: This covers any tabular data arrangement on the page, including simple and complex tables. Any titles, captions, or notes associated with the table should be considered part of the table element.
- Figure: This covers charts, graphs, diagrams, etc. Associated titles, legends, axis titles, etc. should be considered to be part of the figure.
- Footnote: This covers any explanatory notes typically found at the bottom of the page, marked by a reference in the main text. Their purpose is to provide supplementary information or citations for specific points in the main text. Include the footnote marker (e.g., *, ¹, a) along with its associated text content. Markdown formatting should be used for the footnote's text. Footnotes should be treated as distinct elements from the main 'Text' content.
    For the content of a `Footnote` element, you *must* use the Markdown definition syntax `[^N]: <full footnote text>`. *Ensure `N` matches the corresponding reference in the main document*.
    Crucially, if a footnote contains structured data (e.g., citations with multiple fields, statistical breakdowns, short lists, key-value pairs), you must represent this data using appropriate Markdown structures within the `<full footnote text>` part. Prioritize Markdown lists (ordered or unordered), simple Markdown tables, or inline code blocks/fenced code blocks if the content warrants it.

For Figure elements, you must provide a detailed description of the element in the "content" field. Do not just transcribe the actual text contained in the element. For textual elements (Text, Table), you must provide the exact text content of the element in the Markdown format.

Output format
- Your output should be an ordered (from top to bottom) list of elements on the page, where each element is a dictionary with the following keys:
    - type: str - the type of the element
    - content: str - the content of the element. For Figure elements, this should be a detailed description of the visual content, rather than a transcription of the actual text contained in the element. You can use Markdown formatting for text content.

Complex and multi-part figures or images should be represented as a single element. For example, if a figure consists of a main chart and a smaller inset chart, these should be described together in a single Figure element. If there are two separate graphs side by side, these should be represented as a single Figure element with a bounding box that encompasses both graphs. DO NOT create separate elements for each part of a complex figure or image.
"""

# Structured-output schema: the model must return a JSON array of
# {"type", "content"} objects, where "type" is one of the four element kinds.
# Type names use the upper-case enum spelling throughout (the original mixed
# "ARRAY"/"OBJECT" with lower-case "string").
response_schema = {
    "type": "ARRAY",
    "items": {
        "type": "OBJECT",
        "properties": {
            "type": {
                "type": "STRING",
                "enum": ["Text", "Table", "Figure", "Footnote"],
            },
            "content": {
                "type": "STRING",
            },
        },
        "required": ["type", "content"],
    },
}

In [None]:
# Decoding settings attached to every request in the batch.
generationConfig = {"temperature": 0, "maxOutputTokens": 8192}

# When a schema is defined, ask for JSON output constrained to it.
if response_schema:
    generationConfig.update(
        response_schema=response_schema,
        response_mime_type="application/json",
    )

# A thinking budget of 0 is set for gemini-2.5-flash only.
if MODEL_ID == "gemini-2.5-flash":
    generationConfig["thinkingConfig"] = {"thinkingBudget": 0}

print(generationConfig)

print(MODEL_ID)

In [None]:
# Generate the batch-request JSONL locally: one Gemini request per PNG page
# image found under `prefix` in the bucket.

# jsonlines.open does not create missing parent directories.
os.makedirs(os.path.dirname(jsonl_fn) or ".", exist_ok=True)

blobs = bucket.list_blobs(prefix=prefix)

counter = 0  # number of requests written
with jsonlines.open(jsonl_fn, mode="w") as writer:
    for blob in tqdm(blobs):
        if not blob.name.lower().endswith('.png'):
            continue

        # Build the gs:// URI from the bucket and object names. The previous
        # approach trimmed `blob.id` with str.strip(), which removes a *set of
        # characters* from both ends rather than a suffix, and could therefore
        # eat leading/trailing characters of the path.
        blob_uri = f"gs://{blob.bucket.name}/{blob.name}"

        gem_request = {
            "request": {
                "contents": [
                    {
                        "role": "user",
                        "parts": [
                            {"text": prompt},
                            {"file_data": {"file_uri": blob_uri, "mime_type": "image/png"}},
                        ],
                    }
                ],
                "generationConfig": generationConfig,
            }
        }
        # The system instruction is optional and only sent when one is defined.
        if system_prompt:
            gem_request["request"]["system_instruction"] = {"parts": [{"text": system_prompt}]}

        writer.write(gem_request)
        counter += 1

In [None]:
# Upload the local JSONL to the bucket; the object is stored at the bucket
# root under JSON_FILENAME.
blob = bucket.blob(JSON_FILENAME)
blob.upload_from_filename(jsonl_fn)

In [None]:
# Locations passed to the batch API: the uploaded request file (input) and the
# bucket that receives the predictions (output).
# Single quotes inside the f-string keep this valid on Python < 3.12, where
# reusing the enclosing quote character inside a replacement field is a SyntaxError.
INPUT_DATA = f"gs://{os.getenv('BUCKET_NAME')}/{JSON_FILENAME}"
# NOTE: the misspelled name is kept on purpose; the batch-creation cell refers to it.
DEST_BUKCET_URI = "gs://tau_ocr_results"

In [None]:
# Submit the batch job: requests are read from INPUT_DATA and the results are
# written to the destination bucket.
batch_config = CreateBatchJobConfig(dest=DEST_BUKCET_URI)
gcs_batch_job = client.batches.create(model=MODEL_ID, src=INPUT_DATA, config=batch_config)

# Show the job's resource name as the cell output.
gcs_batch_job.name

In [None]:
# Optionally cancel the job - good for quick fixes
# client.batches.cancel(name=gcs_batch_job.name)

In [None]:
# Optionally block until the batch reaches a terminal state
# (alternatively, check the job in the GCP console).

# States after which the job will not make further progress. Cancelled,
# expired, paused and partially-succeeded jobs are included so that the loop
# below cannot spin forever (e.g. after using the cancel cell above).
TERMINAL_JOB_STATES = (
    "JOB_STATE_SUCCEEDED",
    "JOB_STATE_PARTIALLY_SUCCEEDED",
    "JOB_STATE_FAILED",
    "JOB_STATE_CANCELLED",
    "JOB_STATE_EXPIRED",
    "JOB_STATE_PAUSED",
    "JOB_STATE_UNEXECUTED",
)

# Refresh the job until it is complete
gcs_batch_job = client.batches.get(name=gcs_batch_job.name)
while gcs_batch_job.state not in TERMINAL_JOB_STATES:
    time.sleep(5)
    gcs_batch_job = client.batches.get(name=gcs_batch_job.name)

# Report the outcome
if gcs_batch_job.state == "JOB_STATE_SUCCEEDED":
    print("Job succeeded!")
else:
    print(f"Job did not succeed (state: {gcs_batch_job.state}): {gcs_batch_job.error}")

# Processing the results

Note: if the batch fails as a whole, the configuration is most likely wrong. In that case the results will fail to load and no response_text will be available.

Occasional failures (~1%) are expected; they are caused by the model incorrectly trying to parse a figure or a table of contents.

In [None]:
# File logger used by the result-processing cells below.
logger = logging.getLogger("gemini_ocr")
logger.setLevel(logging.INFO)

log_path = './logs/text_uploading_batch.log'

# RotatingFileHandler does not create missing parent directories.
os.makedirs(os.path.dirname(log_path), exist_ok=True)

# Re-running this cell must not stack handlers (every message would then be
# written once per run), so drop any handler already attached to this file.
for existing_handler in list(logger.handlers):
    if getattr(existing_handler, "baseFilename", None) == os.path.abspath(log_path):
        logger.removeHandler(existing_handler)
        existing_handler.close()

# Create rotating file handler (max 10MB, keep 5 backups)
handler = RotatingFileHandler(
    log_path,
    maxBytes=10*1024*1024,  # 10MB
    backupCount=5
)

# Create formatter
formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
handler.setFormatter(formatter)

# Add handler to logger
logger.addHandler(handler)

In [None]:
def ensure_candidates(response):
    """Return a response dict that is guaranteed to have a 'candidates' key.

    A dict input is updated in place (an empty list is added when the key is
    missing) and returned; any non-dict input yields a fresh
    ``{'candidates': []}``.
    """
    if isinstance(response, dict):
        response.setdefault('candidates', [])
        return response
    return {'candidates': []}

def extract_file_uri(request):
    """Return the first ``file_data.file_uri`` found in a request, else None.

    Walks ``request['contents'][*]['parts'][*]`` in order. Malformed input
    (anything that raises AttributeError or TypeError while walking) also
    results in None.
    """
    try:
        for message in request.get('contents', []):
            for piece in message.get('parts', []):
                payload = piece.get('file_data')
                if not payload or 'file_uri' not in payload:
                    continue
                return payload['file_uri']
    except (AttributeError, TypeError):
        pass
    return None
    
def extract_response_text(response):
    """Return the text of the first part of a response, or None.

    Args:
        response: Expected to be a list of part dicts, the first of which
            carries a 'text' key.

    Returns:
        ``response[0]['text']``, or None when ``response`` is not
        subscriptable, is empty, or its first part has no 'text' key.
    """
    try:
        return response[0]['text']
    # IndexError/KeyError cover an empty parts list and a part without text;
    # previously those escaped even though the function is meant to fall back to None.
    except (AttributeError, TypeError, IndexError, KeyError):
        return None
def extract_source_file_from_uri(file_uri):
    """Return the source file name encoded in a chunk URI.

    Takes the last path component of ``file_uri`` and keeps everything before
    the first ``_chunk_`` marker (the whole component if there is no marker).
    """
    basename = file_uri.rsplit("/", 1)[-1]
    source_name, _, _ = basename.partition("_chunk_")
    return source_name

In [None]:
# GCS filesystem handle used to locate the batch output.
fs = fsspec.filesystem("gcs")

# Find every predictions.jsonl one directory level below the job's destination URI.
file_paths = fs.glob(f"{gcs_batch_job.dest.gcs_uri}/*/predictions.jsonl")
print(file_paths)

In [None]:
# If the batch succeeded (don't forget to read the latest status), parse the
# predictions and keep only the essentials, grouped by source document.
if gcs_batch_job.state == "JOB_STATE_SUCCEEDED":
    if not file_paths:
        # Fail with a clear message instead of an IndexError on file_paths[-1].
        raise FileNotFoundError(
            f"No predictions.jsonl found under {gcs_batch_job.dest.gcs_uri}"
        )

    logger.info("Accumulating all requests by source file...")

    # source file name -> list of {'chunk_id', 'text'} records (one per page)
    file_accumulator = defaultdict(list)
    processed_count = 0

    # Stream the JSONL in chunks of 1,000 rows instead of loading it at once.
    # The last path returned by the glob is the one that is read.
    chunk_reader = pd.read_json(f"gs://{file_paths[-1]}", lines=True, chunksize=1_000)

    for chunk_df in chunk_reader:
        logger.info(f"Processing batch with {len(chunk_df)} records...")

        # Process each row in this chunk
        for _, row in chunk_df.iterrows():
            try:
                file_uri = extract_file_uri(row['request'])
                if file_uri is None:
                    logger.warning("Skipping row without a file_uri in its request")
                    continue
                if file_uri == "gs://tau_ocr/data_HQwOcr_img_chunks":
                    continue

                source_file = extract_source_file_from_uri(file_uri)

                # Extract chunk ID for proper ordering later. The extension is
                # removed with splitext: str.strip(".png") strips a *set of
                # characters* from both ends, not the ".png" suffix.
                if '_chunk_' in file_uri:
                    chunk_suffix = file_uri.rsplit('_chunk_', 1)[1]
                    chunk_id = int(os.path.splitext(chunk_suffix)[0])
                else:
                    chunk_id = 0

                # Process the response immediately to extract just the text
                try:
                    response = ensure_candidates(row['response'])
                    normalized = pd.json_normalize([response], "candidates")
                    response_text = extract_response_text(normalized.iloc[0]["content.parts"])

                    # Store only the essentials: chunk_id and processed text
                    file_accumulator[source_file].append({
                        'chunk_id': chunk_id,
                        'text': response_text
                    })

                except Exception as e:
                    logger.warning(f"Error processing response for {source_file}, chunk {chunk_id}: {e}")
                    continue

                processed_count += 1
                if processed_count % 10000 == 0:
                    logger.info(f"Processed {processed_count} records, tracking {len(file_accumulator)} files")

            except Exception as e:
                logger.warning(f"Error processing row: {e}")
                continue

    logger.info(f"Finished accumulating. Found {len(file_accumulator)} unique files")

In [None]:
# Save the full documents locally as JSONL: each document's pages are ordered
# by chunk id and concatenated into one {"filename", "text"} record.

results_fn = "./outputs/data_HQ_WOcr_text_batch.jsonl"
# jsonlines.open does not create missing parent directories.
os.makedirs(os.path.dirname(results_fn), exist_ok=True)

with jsonlines.open(results_fn, mode="w") as writer:
    for key, text in file_accumulator.items():

        text.sort(key=lambda x: x["chunk_id"])

        # Collect the pieces and join once instead of growing a string in a loop.
        pieces = []
        for ele in text:
            try:
                for el in json.loads(ele["text"]):
                    if el["type"] == "Table":
                        # Tables are wrapped in explicit markers
                        pieces.append("<MARKDOWN_TABLE>" + el["content"] + "</MARKDOWN_TABLE>" + "\n")
                    else:
                        pieces.append(el["content"] + "\n")
                # Blank lines between consecutive pages
                pieces.append("\n\n")
            # Missing text (None), invalid/truncated JSON, or an element without
            # the expected keys: log it and move on to the next page. A bare
            # `except:` would also swallow KeyboardInterrupt.
            except (TypeError, ValueError, KeyError) as exc:
                logger.warning(f"Couldn't load row {ele['chunk_id']} for {key}: {exc}")

        writer.write({"filename": key, "text": "".join(pieces)})

In [None]:
# S3 object key (path inside the bucket) under which the results file is stored
s3_key = 'tau_clean/HQ_WOcr_Markdown_Tables_batch3.jsonl'

In [None]:
# Upload the JSONL with results to S3

s3_client = boto3.client("s3")

try:
    s3_client.upload_file(
        Filename=results_fn,  # local file path
        Bucket=os.getenv("AWS_BUCKET_NAME"),
        Key=s3_key  # S3 object key (path in bucket)
    )
# upload_file wraps client errors in S3UploadFailedError, so catching
# ClientError alone would let a failed upload escape as an unhandled exception.
except (ClientError, S3UploadFailedError) as e:
    logger.error(f"Upload to s3 failed: {e}")
else:
    logger.info("Upload to s3 was successful")

# Cleaning up the batch 

In [None]:
# Delete the batch job resource now that its results have been processed.
client.batches.delete(name=gcs_batch_job.name)