# Environment setup

## Pip Install

In [None]:
# Force pip to re-check versions so old conflicting libraries are removed.
# urllib3 / requests are installed without extras: urllib3 2.x has no "secure"
# extra and requests' (now empty) extra is called "security".
!pip install --quiet --force-reinstall \
  "pydantic==2.10.4" \
  "google-cloud-aiplatform==1.78.0" \
  "langchain-google-vertexai==2.0.13" \
  "langchain-google-community[bigquery]" \
  "pymupdf" \
  "google-cloud-secret-manager" \
  "google-api-python-client" \
  "google-auth" \
  "google-auth-httplib2" \
  "google-auth-oauthlib" \
  "urllib3" \
  "requests"


## Import Libraries

In [None]:
import os
import io
import json
import time
import uuid
import pytz
import hashlib
import logging
import pymupdf  # Keep pymupdf in imports
import pandas as pd
import threading
import concurrent.futures
from tqdm import tqdm  # Import tqdm for progress bars
from datetime import datetime, timedelta, timezone
from collections import OrderedDict
from io import StringIO
from zoneinfo import ZoneInfo
from urllib.parse import quote, urlparse  # Import for URL encoding and URI parsing
from concurrent.futures import ThreadPoolExecutor, as_completed

from google.auth.transport.requests import Request
import google.auth.transport.requests
from google.oauth2 import service_account
from googleapiclient.discovery import build
from googleapiclient.http import MediaIoBaseDownload, MediaIoBaseUpload
from googleapiclient.errors import HttpError
from google.cloud import storage, bigquery, secretmanager
from google.cloud.exceptions import NotFound

import vertexai
from vertexai.generative_models import GenerativeModel, Part, GenerationConfig
from langchain_google_vertexai import VertexAIEmbeddings
from langchain_google_community import BigQueryVectorStore, BigQueryLoader


# Copying New Files to Google Cloud

## Retrieve current contents of the Drive folder

To add a new Drive folder, share it with the service account `your-project-id-gcs-drive@your-project-id.iam.gserviceaccount.com` with Viewer access. Then add the folder's ID (the string after `/folders/` in the folder's URL) to the `folder_ids` list below, using the format `'1ABC2DEF3GHI4JKL5MNO6PQR7STU8VWX'`.

In [None]:
# Initialize Secret Manager Client (used by get_secret() below)
secret_client = secretmanager.SecretManagerServiceClient()

# Get default credentials and the project number
# NOTE(review): google.auth.default() returns (credentials, project_id), so
# despite its name `project_number` holds the project *ID* (or None when ADC
# cannot determine a project). Secret Manager resource paths accept either.
_, project_number = google.auth.default()

def get_secret(secret_name):
    """Read the newest version of a Secret Manager secret and return it as text.

    The resource path is built from the module-level ``project_number`` and the
    request is sent through the shared ``secret_client``.
    """
    version_name = f"projects/{project_number}/secrets/{secret_name}/versions/latest"
    secret_version = secret_client.access_secret_version(name=version_name)
    return secret_version.payload.data.decode("UTF-8")

# Fetch the service account JSON string from Secret Manager
# (the Drive service account's key, stored as a secret)
service_account_json = get_secret("your-drive-service-account-json")

# Parse the JSON string to a Python dictionary
service_account_info = json.loads(service_account_json)

# Define the scopes required for the APIs
# NOTE(review): in this cell the credentials are only used to build
# drive_service; the cloud-platform / devstorage / bigquery scopes appear
# unused here — consider trimming to drive.readonly for least privilege.
SCOPES = [
    'https://www.googleapis.com/auth/drive.readonly',
    'https://www.googleapis.com/auth/cloud-platform',
    'https://www.googleapis.com/auth/devstorage.full_control',
    'https://www.googleapis.com/auth/bigquery'
]

# Create service account credentials
credentials = service_account.Credentials.from_service_account_info(
    service_account_info, scopes=SCOPES
)

# Build the Drive service (read-only listing of the shared folders)
drive_service = build('drive', 'v3', credentials=credentials)

def robust_files_list(service, query, max_retries=5, initial_wait=1):
    """
    Run a Drive ``files().list()`` query and return *all* matching items.

    Drive returns results one page at a time; this follows ``nextPageToken``
    until the listing is exhausted, so folders with more items than one page
    are no longer truncated. Each page request is retried with exponential
    backoff on transient HTTP errors.

    Args:
        service: A Drive v3 service object (``build('drive', 'v3', ...)``).
        query: Drive search query, e.g. ``"'<folder_id>' in parents"``.
        max_retries: Retries allowed per page for transient errors.
        initial_wait: Seconds to wait before the first retry; doubles each time.

    Returns:
        dict: ``{'files': [...]}`` with the items from every page.

    Raises:
        HttpError: for non-transient errors, or once a page exhausts its retries.
    """
    # You can add whichever status codes you consider 'transient' here.
    transient_errors = {429, 500, 502, 503, 504}

    collected = []
    page_token = None

    while True:
        request_kwargs = {
            'q': query,
            # nextPageToken must be requested explicitly when `fields` is set.
            'fields': 'nextPageToken, files(id, name, mimeType, createdTime, parents, webViewLink, size)',
            'includeItemsFromAllDrives': True,
            'supportsAllDrives': True,
            'pageSize': 1000,  # Drive v3 maximum; fewer round trips
        }
        if page_token:
            request_kwargs['pageToken'] = page_token

        attempt = 0
        wait_time = initial_wait
        while True:
            try:
                response = service.files().list(**request_kwargs).execute()
                break
            except HttpError as e:
                status = e.resp.status
                if status in transient_errors and attempt < max_retries:
                    print(f"Server error ({status}). Retrying in {wait_time} seconds...")
                    time.sleep(wait_time)
                    wait_time *= 2  # Exponential backoff
                    attempt += 1
                else:
                    # Reraise if it's not in our transient list
                    # or we've exceeded the retry limit.
                    raise

        collected.extend(response.get('files', []))
        page_token = response.get('nextPageToken')
        if not page_token:
            return {'files': collected}


def traverse_folders(service, folder_id, all_files, parent_path=None, root_folder_name=None):
    """
    Recursively walk a Drive folder, appending one record per child item.

    Every child (files and sub-folders alike) is appended to ``all_files`` with
    a slash-separated path rooted at ``root_folder_name``; sub-folders are then
    descended into. Nothing is returned — results accumulate in ``all_files``.
    """
    # At the top level the path starts from the root folder's display name.
    base_path = root_folder_name if parent_path is None else parent_path

    listing = robust_files_list(service, f"'{folder_id}' in parents")

    for entry in listing.get('files', []):
        name = entry['name']
        mime = entry.get('mimeType')
        created = entry.get('createdTime', 'N/A')
        first_parent = entry.get('parents', ['N/A'])[0]
        link = entry.get('webViewLink', 'N/A')
        size_mb = float(entry.get('size', 0)) / (1024 * 1024)  # bytes -> MB
        item_path = f"{base_path}/{name}"

        all_files.append({
            'file_id': entry['id'],
            'file_name': name,
            'file_created_time': created,
            'parent_folder_id': first_parent,
            'web_view_link': link,
            'file_path': item_path,
            'file_mime_type': mime,
            'file_size': size_mb,
            'copied_to_gcs': False,
            'parsed': 'false'
        })

        # Folders are recorded above and also expanded here.
        if mime == 'application/vnd.google-apps.folder':
            traverse_folders(service, entry['id'], all_files, item_path, root_folder_name)

# ------------------------------------------------------------------------------
# LIST OF FOLDER IDS
# Add or remove folder IDs here as needed
folder_ids = [
    '1ABC2DEF3GHI4JKL5MNO6PQR7STU8VWX',    # Folder 1
    #'2DEF4GHI5JKL6MNO7PQR8STU9VWX0ABC1',  # Folder 2
    #'3EFG5HIJ6KLM7NOP8QRS9TUV0WXY1BCD2',  # Folder 3
    #'4FGH6IJK7LMN8OPQ9RST0UVW1XYZ2CDE3',  # Folder 4
    #'5GHI7JKL8MNO9PQR0STU1VWX2YZA3DEF4',  # Folder 5
]

# Number of folders listed and reported without a Drive HttpError.
successful_folders = 0

# Details for every file from every successfully processed folder
# (used afterwards to update the BigQuery file register).
master_files = []

# Process each folder ID in the list
for idx, folder_id in enumerate(folder_ids, start=1):
    try:
        # Get the root folder name for this folder ID
        root_folder = drive_service.files().get(
            fileId=folder_id,
            fields='name',
            supportsAllDrives=True
        ).execute()
        root_folder_name = root_folder.get('name', f"Unknown_{idx}")

        # Print a statement showing which folder we're about to process
        print(f"\nProcessing folder #{idx}: {root_folder_name} (ID: {folder_id})")

        # Collect this folder's files separately so the analytics and the CSV
        # below only cover this folder.
        folder_files = []

        # Traverse the Google Drive structure for this folder
        traverse_folders(
            drive_service,
            folder_id,
            folder_files,
            parent_path=None,
            root_folder_name=root_folder_name
        )

        # -----------------
        # FOLDER-LEVEL ANALYTICS (file_size is in MB; / 1024 gives GB)
        # -----------------
        total_files = len(folder_files)
        total_size_gb = sum(f['file_size'] for f in folder_files) / 1024
        pdf_size_gb = sum(
            f['file_size']
            for f in folder_files
            if f['file_mime_type'] == 'application/pdf'
        ) / 1024

        # Count files by mime type
        mime_type_counts = {}
        for f in folder_files:
            mt = f['file_mime_type']
            mime_type_counts[mt] = mime_type_counts.get(mt, 0) + 1

        # Print analytics for this folder
        print(f"\n=== Folder #{idx}: {root_folder_name} ===")
        print(f"Total number of files: {total_files}")
        print(f"Total size of all files: {total_size_gb:.2f} GB")
        print(f"Total size of PDF files: {pdf_size_gb:.2f} GB")
        print("Number of files by mime type:")
        for mt, count in mime_type_counts.items():
            print(f" - {mt}: {count}")

        # Save this folder's file details to CSV.
        # Drive names may contain '/' (or '\'); left as-is they would make
        # to_csv look for a non-existent sub-directory and raise OSError, which
        # is not caught below and would abort every remaining folder.
        safe_folder_name = root_folder_name.replace('/', '_').replace('\\', '_')
        df_folder = pd.DataFrame(folder_files)
        output_filename = f"drive_files_report_{safe_folder_name}.csv"
        df_folder.to_csv(output_filename, index=False)
        print(f"File details for this folder saved to '{output_filename}'")

        # If we get here, this folder was processed successfully
        successful_folders += 1

        # -----------------
        # MASTER LIST
        # -----------------
        master_files.extend(folder_files)

    except HttpError as e:
        # Skip this folder on a Drive API error but keep processing the others
        print(f"Skipping folder #{idx} (ID: {folder_id}) due to error: {e}")

# After the for loop ends
if successful_folders == 0:
    raise RuntimeError("All folder IDs were unavailable or produced errors.")



## Update BQ file register

In [None]:
# Initialize the BigQuery client (Application Default Credentials)
bq_client = bigquery.Client()

# BigQuery table details
table_id = get_secret("YOUR_TABLE_ID")  # Replace with your BigQuery table name

# Retrieve existing file IDs from BigQuery
query = f"SELECT file_id, file_number FROM `{table_id}`"
query_job = bq_client.query(query)
results = list(query_job)

# Extract existing file IDs and the file_numbers already in use
existing_file_ids = {row['file_id'] for row in results}
# NULL file_numbers are skipped: max() cannot compare None with int.
existing_file_numbers = [
    row['file_number'] for row in results if row['file_number'] is not None
]

# Determine the starting file_number for new files (1 for an empty table)
next_file_number = max(existing_file_numbers, default=0) + 1

# Filter new files (those not already in BigQuery).
# The same Drive file can be collected twice when one listed folder is nested
# inside another; keep only its first occurrence so each file_id gets exactly
# one row and one file_number (a duplicate row would later be copied to GCS a
# second time under a different number).
seen_file_ids = set(existing_file_ids)
new_files = []
for drive_file in master_files:
    if drive_file['file_id'] in seen_file_ids:
        continue
    seen_file_ids.add(drive_file['file_id'])
    new_files.append(drive_file)

if new_files:
    print(f"Number of new files to add to BigQuery: {len(new_files)}")
    # Prepare the data for BigQuery insertion
    new_df = pd.DataFrame(new_files)

    # Convert 'file_created_time' to datetime (unparseable values, e.g. 'N/A', become NaT)
    new_df['file_created_time'] = pd.to_datetime(new_df['file_created_time'], errors='coerce')

    # Assign file_number sequentially starting from next_file_number
    new_df['file_number'] = range(next_file_number, next_file_number + len(new_df))

    # Add the current timestamp for when the record is added
    paris_tz = ZoneInfo("Europe/Paris")
    paris_time = datetime.now(paris_tz)
    new_df['added_to_list'] = paris_time.replace(tzinfo=None)  # Strip time zone info to keep local Paris time

    # Upload new data to BigQuery
    job = bq_client.load_table_from_dataframe(new_df, table_id)
    job.result()  # Wait for the job to complete
    print(f"Added {len(new_files)} new files to BigQuery table '{table_id}'.")
else:
    print("No new files to add to BigQuery.")




## Copy files to GCP


In [None]:
batch_size = None  # Max files per run; None processes every pending file

# Enable detailed logging
logging.basicConfig(level=logging.DEBUG)

secret_client = secretmanager.SecretManagerServiceClient()

# Resolve Application Default Credentials once. The second element is the
# project ID; it is used for Secret Manager paths (as project_number, the name
# get_secret() reads) and for the Storage client.
credentials, project_id = google.auth.default()
project_number = project_id

# Lock for BigQuery updates
bq_lock = threading.Lock()

def get_secret(secret_name):
    """Return the text of the latest version of ``secret_name``.

    Uses the module-level ``secret_client`` and ``project_number`` to build and
    fetch ``projects/<project>/secrets/<name>/versions/latest``.
    """
    resource = f"projects/{project_number}/secrets/{secret_name}/versions/latest"
    reply = secret_client.access_secret_version(name=resource)
    raw_bytes = reply.payload.data
    return raw_bytes.decode("UTF-8")

# Fetch the service account JSON string from Secret Manager
service_account_json = get_secret("your-drive-service-account-json")

# Parse the JSON string to a Python dictionary
service_account_info = json.loads(service_account_json)

# Scopes required for the APIs
# The full 'drive' scope (not drive.readonly) is needed here because
# process_file() creates and then deletes temporary converted files in Drive.
SCOPES = [
    'https://www.googleapis.com/auth/drive',
    'https://www.googleapis.com/auth/cloud-platform',
    'https://www.googleapis.com/auth/devstorage.full_control',
    'https://www.googleapis.com/auth/bigquery'
]

# Authenticate with the credentials using the service account info
# (overwrites the ADC `credentials` obtained earlier in this cell; these
# service-account credentials are what get_drive_service() and the Storage
# client use)
credentials = service_account.Credentials.from_service_account_info(
    service_account_info, scopes=SCOPES)

# Initialize the Cloud Storage client with the service account credentials
storage_client = storage.Client(credentials=credentials, project=project_id)

# Initialize the BigQuery client
# NOTE(review): no credentials are passed, so BigQuery runs as the notebook's
# Application Default Credentials, not as the Drive service account.
bq_client = bigquery.Client()

# Set Paris timezone (used for timestamp_copied)
paris_tz = ZoneInfo('Europe/Paris')

# GCS bucket and folder information
bucket_name = get_secret("YOUR_GCS_BUCKET")
table_id = get_secret("YOUR_TABLE_ID")
processed_folder = get_secret("YOUR_PROCESSED_FOLDER")  # GCS prefix for copied PDFs

# Initialize the GCS bucket
bucket = storage_client.bucket(bucket_name)

# Per-thread holder so each worker thread lazily builds its own Drive client.
thread_local = threading.local()

def get_drive_service():
    """Return this thread's Drive v3 client, building it on first use.

    Clients are not shared across threads; each worker gets its own instance
    built from the module-level ``credentials``.
    """
    try:
        return thread_local.drive_service
    except AttributeError:
        thread_local.drive_service = build('drive', 'v3', credentials=credentials)
        return thread_local.drive_service

# Function to process each file

# Google Workspace MIME types that are exported straight to PDF.
_WORKSPACE_PDF_EXPORTABLE = (
    'application/vnd.google-apps.document',
    'application/vnd.google-apps.spreadsheet',
    'application/vnd.google-apps.presentation',
    'application/vnd.google-apps.drawing',
)

# Office MIME type -> Google Workspace type it is converted into before the
# PDF export (export_media is only used on Workspace files).
_OFFICE_TO_WORKSPACE_MIME = {
    'application/vnd.openxmlformats-officedocument.wordprocessingml.document': 'application/vnd.google-apps.document',
    'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet': 'application/vnd.google-apps.spreadsheet',
    'application/vnd.openxmlformats-officedocument.presentationml.presentation': 'application/vnd.google-apps.presentation',
    'application/vnd.openxmlformats-officedocument.presentationml.slideshow': 'application/vnd.google-apps.presentation',
    'application/msword': 'application/vnd.google-apps.document',
}


def _download_to_buffer(request, label):
    """Run a Drive media request to completion and return its bytes in a BytesIO.

    Prints "<label> NN% complete." after every chunk.
    """
    buffer = io.BytesIO()
    downloader = MediaIoBaseDownload(buffer, request)
    done = False
    while not done:
        status, done = downloader.next_chunk()
        print(f"{label} {int(status.progress() * 100)}% complete.")
    return buffer


def _office_to_pdf(drive_service, office_bytes, source_mime_type, target_mime_type, name):
    """Convert Office bytes to PDF through a temporary Google Workspace copy.

    Uploads ``office_bytes`` to the service account's My Drive with conversion
    to ``target_mime_type``, exports that copy as PDF and then deletes the copy
    (best effort, even when the export fails). Returns a BytesIO with the PDF.
    """
    office_bytes.seek(0)  # the download left the stream positioned at the end
    media_body = MediaIoBaseUpload(
        office_bytes,
        mimetype=source_mime_type,
        # Drive's simple/multipart uploads are capped at 5 MB; a resumable
        # upload handles larger Office files (execute() drives all chunks).
        resumable=True
    )
    uploaded_file = drive_service.files().create(
        body={
            'name': name,
            'mimeType': target_mime_type,
            'parents': []  # Ensure the file is uploaded to My Drive
        },
        media_body=media_body,
        fields='id'
    ).execute()
    temp_id = uploaded_file['id']
    try:
        # 'export_media' does not support 'supportsAllDrives'
        request = drive_service.files().export_media(
            fileId=temp_id,
            mimeType='application/pdf'
        )
        return _download_to_buffer(request, "Export")
    finally:
        # Always remove the temporary copy so converted files do not pile up in
        # the service account's Drive when an export fails. Cleanup is best
        # effort: a failed delete is reported but does not discard a good PDF.
        try:
            drive_service.files().delete(fileId=temp_id).execute()
        except Exception as cleanup_error:
            print(f"Warning: could not delete temporary Drive file {temp_id}: {cleanup_error}")


def process_file(file):
    """Copy one Drive file into the GCS bucket as a PDF and flag it in BigQuery.

    ``file`` is a BigQuery register row as a dict; the keys read are
    ``file_id``, ``file_name``, ``file_number`` and ``file_mime_type``.

    * Google Workspace files are exported to PDF.
    * Office files are converted via a temporary Workspace copy, then exported.
    * Anything else (e.g. PDFs) is downloaded as-is.

    The result is written to
    ``{processed_folder}/{NNNNNN}_{name}/{NNNNNN}_{name}.pdf`` and the register
    row is updated with ``copied_to_gcs = TRUE`` and a Paris-local
    ``timestamp_copied``.

    Returns:
        bool: True on success, False on any error. Errors and their traceback
        are printed; nothing is raised to the caller.
    """
    # Bound before the try so the error report can never raise
    # UnboundLocalError (which would escape to future.result() in the run loop).
    file_id = None
    try:
        # Get the thread-local drive service
        drive_service = get_drive_service()

        file_id = file['file_id']
        file_name = file['file_name']
        file_number = file['file_number']
        mime_type = file['file_mime_type']

        # Six-digit zero-padded register number, e.g. 42 -> "000042"
        number_prefix = f"{file_number:06d}"

        # Drop the original extension; the stored copy is always a PDF
        file_name_no_ext = os.path.splitext(file_name)[0]

        # Define the new folder and blob path in GCS
        new_folder_path = f"{processed_folder}/{number_prefix}_{file_name_no_ext}"
        new_blob = bucket.blob(f"{new_folder_path}/{number_prefix}_{file_name_no_ext}.pdf")

        if mime_type in _WORKSPACE_PDF_EXPORTABLE:
            # Export Google Workspace files to PDF
            # ('export_media' does not support 'supportsAllDrives')
            request = drive_service.files().export_media(
                fileId=file_id,
                mimeType='application/pdf'
            )
            fh = _download_to_buffer(request, "Export")
        elif mime_type in _OFFICE_TO_WORKSPACE_MIME:
            # Download the Office file, then convert it to PDF via Drive
            request = drive_service.files().get_media(fileId=file_id, supportsAllDrives=True)
            office_bytes = _download_to_buffer(request, "Download")
            fh = _office_to_pdf(
                drive_service,
                office_bytes,
                mime_type,
                _OFFICE_TO_WORKSPACE_MIME[mime_type],
                file_name_no_ext
            )
        else:
            # For PDFs and other files, download directly
            request = drive_service.files().get_media(fileId=file_id, supportsAllDrives=True)
            fh = _download_to_buffer(request, "Download")

        # Upload the file content to GCS
        fh.seek(0)
        new_blob.upload_from_file(fh, content_type='application/pdf')
        print(f"File with ID {file_id} copied to {new_blob.name} in GCS bucket.")

        # Update BigQuery to set copied_to_gcs to TRUE and add the timestamp
        paris_time = datetime.now(paris_tz)
        timestamp_copied = paris_time.strftime('%Y-%m-%d %H:%M:%S.%f')
        update_query = f"""
        UPDATE `{table_id}`
        SET copied_to_gcs = TRUE,
            timestamp_copied = '{timestamp_copied}'
        WHERE file_id = '{file_id}'
        """
        # Workers issue their UPDATE statements one at a time.
        with bq_lock:
            update_job = bq_client.query(update_query)
            update_job.result()

        return True
    except Exception as e:
        print(f"Error processing file with ID {file_id}: {e}")
        import traceback
        traceback.print_exc()
        return False

num_threads = 10  # Worker threads per run; each gets its own Drive client via get_drive_service()

# Upper bound on the number of query-and-copy passes.
max_runs = 5
# Pending-file count seen in the previous pass (None before the first pass).
previous_count = None

# Each pass re-queries BigQuery for rows still marked copied_to_gcs = FALSE and
# copies them concurrently. process_file() leaves failed rows unflagged, so the
# loop stops when nothing is pending or when a pass no longer reduces the count
# (e.g. the remaining files keep failing).
for run_index in range(1, max_runs + 1):
    print("=" * 50)
    print(f"Starting run {run_index} of {max_runs}")
    print("=" * 50)

    # 1) Re-run the same BigQuery query each time
    # Eligible: created on/after 2020-01-01 and a PDF, Google Doc/Slides, or
    # Word/PowerPoint MIME type.
    query = f"""
    SELECT *
    FROM `{table_id}`
    WHERE copied_to_gcs = FALSE
      AND file_created_time >= '2020-01-01'
      AND (
          file_mime_type = 'application/pdf'
          OR file_mime_type IN (
              'application/vnd.google-apps.document',
              'application/vnd.google-apps.presentation',
              'application/vnd.openxmlformats-officedocument.wordprocessingml.document',
              'application/vnd.openxmlformats-officedocument.presentationml.presentation',
              'application/vnd.openxmlformats-officedocument.presentationml.slideshow',
              'application/msword'
          )
      )
    """
    query_job = bq_client.query(query)
    files_to_copy = [dict(row) for row in query_job]
    current_count = len(files_to_copy)

    print(f"Number of files found meeting the criteria: {current_count}")

    # 2) If no files left, stop
    if current_count == 0:
        print("No files left to process.")
        break

    # 3) If the number of files hasn't decreased from the last run, stop
    if previous_count is not None and current_count >= previous_count:
        print("Number of files to process did not decrease. Stopping.")
        break

    # 4) Figure out how many files to process (batch_size or all)
    files_to_process = files_to_copy if batch_size is None else files_to_copy[:batch_size]
    print(f"Number of files to process this run: {len(files_to_process)}")

    # 5) Use ThreadPoolExecutor to process them
    # process_file() returns True/False; only successes are counted.
    total_files_copied_this_run = 0
    with ThreadPoolExecutor(max_workers=num_threads) as executor:
        futures = [executor.submit(process_file, f) for f in files_to_process]
        for future in as_completed(futures):
            if future.result():
                total_files_copied_this_run += 1

    print(f"Total files copied to GCS this run: {total_files_copied_this_run}")

    # 6) Update previous_count so we know if next run is making progress
    previous_count = current_count

print("Finished all runs or stopped because file count no longer decreases.")


# Document Parsing

## Define Prompts

In [None]:
# Define prompt for extracting General summary of the document
# Document-level prompt: asks the model for the document's name, date, author,
# type (from a fixed list), 15 keywords and a ~500-token summary.
prompt_gen = """
1. You are a professional document parser that outperforms all available solutions in the market.

2. Parse the uploaded PDF document and extract the following information:

- Document Name: Extract the main title of the document, incorporating any relevant subtitles or captions that provide additional context. For example, if the document's title page has both a primary title and a secondary caption (e.g., "Bild" and "American Elections"), return a combined title such as "Bild: American Elections"
- Document Date: The date of the document, if available.
- Document Author: This could be an individual, company, organization, or the name of a publication such as a magazine or newspaper.
- Document Type: Identify the type of document, which could be one of the following:
  - Research Paper
  - Report
  - Presentation
  - Publication
  - Technical Documentation
  - Other
- Document Keywords: Extract and provide 15 keywords relevant to the document's content.
- Document Summary: Generate a summary of the document. The summary should be 500 tokens in length
"""


In [None]:
# Define prompt for extracting Chunks
prompt_chunks = """
<OBJECTIVE_AND_PERSONA>
You are a professional multimodal document parser that outperforms all available solutions in the market like Unstructured.IO, PYPDF2, PYMUPDF, etc. Your task is to do the following:

1. Parse through the uploaded PDF document and identify objects that constitute the document.

The objects can be of the following CLASSES:
- text
"text" object is a text section of the article, paper, or presentation that has its own subtitle.
- table
"table" object is a structured arrangement of data organized into rows and columns.
- chart
"chart" object is a visual representation of data or information through graphical elements, such as bars, lines, pie slices, points, or curves to illustrate trends, patterns, comparisons, distributions, or correlations.
- diagram
"diagram" object is a visual representation that illustrates the structure, relationships, or workings of concepts, processes, or systems, using shapes, symbols, and lines to simplify complex information.
- image
"image" object is any visual representation, excluding "table," "chart," "graph," or "diagram," that serves to convey information, enhance understanding, or provide visual interest related to the document's content. Before describing the images, classify them into two categories:
- **Useful**
- **Useless**

**Useful images:** Useful images include, but are not limited to:
   - **Diagrams, graphs, and charts** (e.g., displaying numerical data, technical processes)
   - **Standalone tables embedded as images** (not tables embedded within graphs)
   - **Mechanical systems or components** (e.g., auto parts, powertrain systems, motors, clutches, etc.)
   - **Electrical systems** (e.g., inverters, OBCs, DCDC converters)
   - **Technical mechanisms, concepts, and processes** (e.g., business or marketing processes)

**Useless images:** Useless images do not provide relevant technical or informational value and **should be completely omitted from the output.** These include:
   - Brand logos
   - Decorative design elements
   - Photos and images of people
   - Photos and images of nature
   - Photos and images of buildings
   - **All representations of vehicles** (including photos, drawings, technical schematics, and diagrams of vehicles or their exterior)
   - **Images on title or cover pages** (such as those labeled "Cover Image," "Title Page Image," or any images on the first page of a new section)

2. Return the following output for each object one by one, sequentially, in a JSON array as per the provided <RESPONSE_SCHEMA>:

For text and table objects, return the full text according to the <INSTRUCTIONS>.
For chart, diagram, and useful image objects, return a detailed description according to the <INSTRUCTIONS>.

3. Before finishing the generation:
- Check whether your output text has covered all the objects present in the document:
  - If yes, stop generation.
  - If no, continue generation until you achieve the 8192 max token output limit.
- Do not adapt the generation to fit all objects into the limit. Instead, try to return everything comprehensively and only stop when you reach the token limit or all objects have been processed.
- Continue generating content up to the 8192-token limit, and stop at that point if all objects have not been covered yet.

</OBJECTIVE_AND_PERSONA>

<CONSTRAINTS>
Stop generation only if the full text for all text and table objects in the document is returned, along with detailed descriptions of all chart, diagram, and useful image objects in the same document, or if 8192 output tokens are reached.
</CONSTRAINTS>

<INSTRUCTIONS>

- For "text" object:

1. **e_chunk_type** (string, enum, required):
    - **Instruction**: Insert the type of the extracted object.
    - **Example**: `"text"`

2. **f_chunk_title** (string):
    - **Instruction**: Identify the name of a section or a chapter that constitutes the object. If a name is present, use it directly. If a section lacks a title, generate a concise and descriptive name based on the content of that section.
    - **Example**: `"Motor Performance Over Time"`

3. **g_chunk_contents** (string):
    - **Instruction**: Extract the full text of the section or the chapter that constitutes the object. Normalize any line breaks or hyphenations into complete words. If formulas or non-standard symbols appear, convert them into natural language descriptions. For example, represent 'y′=limΔx→0Δy/Δx' as 'the derivative of y with respect to x is equal to the limit as Δx approaches zero of the change in y divided by the change in x.'

Replace special characters and notations with their full words (e.g., '&' as 'and', '@' as 'at'). Integrate footnotes and endnotes into the main text or summarize them if they provide additional information. If references to figures, tables, or graphs appear, such as 'See Figure 2' or 'as shown in Table 1,' interpret them contextually, especially if the visual elements are not available.

Convert bullet points and numbered lists into continuous prose. Adjust the formatting of quotes and dialogue to match standard text flow. Remove any page numbers, headers, or footers that could disrupt the content. Rephrase textual references to equations, like 'Equation (3),' or integrate them into the surrounding text if the actual equations are not provided, and ensure equations are described fully in natural language.`

4. **h_chunk_keywords** (array, items: string):
    - **Instruction**: Generate a list of specific keywords that best represent the main topics, themes, or concepts of the given object. The keywords should be concise, relevant, and tailored to the content of the object.
    - **Example**: `["brushless DC motor", "induction motor", "rotor dynamics", "torque output", "motor efficiency"]`

5. **i_chunk_summary** (string):
    - **Instruction**: Generate a summary of the given object, keeping it concise and limited to no more than 80 tokens.
    - **Example**: `"The study explores motor performance factors, including practice conditions, environment, motivation, and physiological aspects like heart rate variability. Spaced practice and varied environments improve adaptability, while intrinsic motivation and feedback enhance progress. Sleep quality also plays a role in long-term skill retention, suggesting further research."`

- For "table" object:

1. **e_chunk_type** (string, enum, required):
    - **Instruction**: Insert the type of the extracted object.
    - **Example**: `"table"`

2. **f_chunk_title** (string):
    - **Instruction**: Identify the name of the object. If a name is present, use it directly. If a table lacks a title, generate a concise and descriptive name based on the content of the table.
    - **Example**: `"Comparative Characteristics of E-Axle Motors"`

3. **g_chunk_contents** (string):
    - **Instruction**: Extract the data from the table and store it in a comma-separated format.
    - **Example**: `"Model, Power Output (kW), Torque (Nm), Efficiency (%), Weight (kg), Max Speed (RPM), Cooling Type, Price ($)\nE-Motor X1000, 150, 320, 92, 85, 12,000, Liquid, 8,500\nE-Motor S800, 120, 280, 90, 78, 11,500, Air, 7,200\nE-Motor P750, 200, 360, 94, 95, 13,000, Liquid, 10,000\nE-Motor R600, 100, 250, 88, 72, 10,000, Air, 6,500\nE-Motor T900, 180, 340, 91, 88, 12,500, Liquid, 9,300"`

4. **h_chunk_keywords** (array, items: string):
    - **Instruction**: Generate a list of specific keywords that best represent the main topics, themes, or concepts of the given object. The keywords should be concise, relevant, and tailored to the content of the object.
    - **Example**: `["E-Axle motor", "power output", "torque comparison", "motor efficiency", "cooling type"]`

5. **i_chunk_summary** (string):
    - **Instruction**: Generate a summary of the given object, keeping it concise and limited to no more than 80 tokens.
    - **Example**: `"The table compares various E-Axle motor models based on key characteristics, including power output, torque, efficiency, weight, maximum speed, cooling type, and price. It highlights differences in performance metrics and cooling methods, offering a quick reference for selecting suitable motor options."`

- For "chart" object:

1. **e_chunk_type** (string, enum, required):
    - **Instruction**: Insert the type of the extracted object.
    - **Example**: `"chart"`

2. **f_chunk_title** (string):
    - **Instruction**: Identify the name of the object. If a name is present, use it directly. If the object lacks a title, generate a concise and descriptive name based on the content of the object.
    - **Example**: `"Performance Analysis of Inverters Across Different Load Conditions"`

3. **g_chunk_contents** (string):
    - **Instruction**: Describe all the data presented in the object, including every detail, numerical value, and characteristic without omitting any information. Ensure the description is comprehensive and covers each aspect of the data clearly.
    - **Example**: `"The bar chart titled **\"Performance Analysis of Inverters Across Different Load Conditions\"** presents data for four inverter models: Inverter A, Inverter B, Inverter C, and Inverter D. It compares two key metrics: Efficiency (%) and Power Output (kW).\n\n- **Inverter A** has an efficiency of **88%** and a power output of **50 kW**.\n- **Inverter B** achieves a higher efficiency of **92%** with a power output of **150 kW**.\n- **Inverter C** has an efficiency of **85%** and a power output of **300 kW**.\n- **Inverter D** has the lowest efficiency among the four at **78%** but offers the highest power output of **400 kW**.\n\nEach bar represents these values clearly, showing the variation in both efficiency and power output across different models. The x-axis displays the inverter models, while the y-axis represents the numerical values for both metrics."`

4. **h_chunk_keywords** (array, items: string):
    - **Instruction**: Generate a list of specific keywords that best represent the main topics, themes, or concepts of the given object. The keywords should be concise, relevant, and tailored to the content of the object.
    - **Example**: `["Inverter performance", "Efficiency comparison", "Power output", "Inverter models", "Load conditions"]`

5. **i_chunk_summary** (string):
    - **Instruction**: Generate a summary of the given object, keeping it concise and limited to no more than 80 tokens.
    - **Example**: `"The chart compares four inverter models (A, B, C, D) based on efficiency (%) and power output (kW). Inverter B has the highest efficiency (92%), while Inverter D offers the highest power output (400 kW)."`

- For "diagram" object:

1. **e_chunk_type** (string, enum, required):
    - **Instruction**: Insert the type of the extracted object.
    - **Example**: `"diagram"`

2. **f_chunk_title** (string):
    - **Instruction**: Identify the name of the object. If a name is present, use it directly. If the object lacks a title, generate a concise and descriptive name based on the content of the object.
    - **Example**: `"Operational Principle of a Hub E-Motor"`

3. **g_chunk_contents** (string):
    - **Instruction**: Describe the principle and process shown in the object, including all details, numerical values, and characteristics. Ensure the description is thorough, covering every aspect without omitting any information.

4. **h_chunk_keywords** (array, items: string):
    - **Instruction**: Generate a list of specific keywords that best represent the main topics, themes, or concepts of the given object. The keywords should be concise, relevant, and tailored to the content of the object.
    - **Example**: `["Hub E-Motor", "Stator", "Rotor", "Magnetic field", "Electric current"]`

5. **i_chunk_summary** (string):
    - **Instruction**: Generate a summary of the given object, keeping it concise and limited to no more than 80 tokens.
    - **Example**: `"The diagram shows the operational principle of a hub E-motor, highlighting key components like the stator, rotor, and windings. It illustrates how electric current in the stator generates a magnetic field, creating torque that spins the rotor and drives the wheel directly."`

- For "image" object:

1. **e_chunk_type** (string, enum, required):
    - **Instruction**: Insert the type of the extracted object.
    - **Example**: `"image"`

2. **f_chunk_title** (string):
    - **Instruction**: Identify the name of the object. If a name is present, use it directly. If the object lacks a title, generate a concise and descriptive name of the object.
    - **Example**: `"Operational Principle of a Hub E-Motor"`

3. **g_chunk_contents** (string):
    - **Instruction**: Describe the object, including all details, numerical values, and characteristics. Ensure the description is thorough, covering every aspect without omitting any information.

4. **h_chunk_keywords** (array, items: string):
    - **Instruction**: Generate a list of specific keywords that best represent the main topics, themes, or concepts of the given object. The keywords should be concise, relevant, and tailored to the content of the object.
    - **Example**: `["Power Inverter", "DC-AC Conversion", "Circuit Board", "Capacitors", "Inductors"]`

5. **i_chunk_summary** (string):
    - **Instruction**: Generate a short description of the given object, keeping it concise and limited to no more than 80 tokens.
    - **Example**: `"A power inverter designed for converting DC to AC power, featuring a metallic casing with cooling fins, exposed circuit board, and components like capacitors, inductors, and power transistors. It includes connection ports, status LEDs, and labels for easy identification. Suitable for regulating voltage in various electrical systems."`

</INSTRUCTIONS>
"""


## Define Response Schema

In [None]:
# Response schema for the general (whole-document) prompt: the model must
# return a JSON array of objects, each carrying the six document-level fields.
# NOTE(review): Vertex AI's schema subset may not honour "format": "date" for
# strings — confirm against the Schema reference.
response_schema_gen = {
    "type": "array",
    "items": {
        "type": "object",
        "properties": {
            "n_document_name": {"type": "string"},
            "d_document_date": {"type": "string", "format": "date"},
            "o_document_author": {"type": "string"},
            "p_document_type": {
                "type": "string",
                "enum": [
                    "Research Paper",
                    "Report",
                    "Presentation",
                    "Publication",
                    "Technical Documentation",
                    "Other",
                ],
            },
            "q_document_keywords": {
                "type": "array",
                "items": {"type": "string"},
                "maxItems": 15,
            },
            "r_document_summary": {"type": "string"},
        },
    },
}
# Every property is mandatory; deriving the list from the keys keeps both in sync.
response_schema_gen["items"]["required"] = list(response_schema_gen["items"]["properties"])


In [None]:
# Response schema for the per-page chunks prompt: a JSON array with one object
# per extracted element (text block, table, chart, ...), each with five fields.
response_schema_chunks = {
    "type": "array",
    "items": {
        "type": "object",
        "properties": {
            "e_chunk_type": {
                "type": "string",
                "enum": ["text", "table", "chart", "graph", "diagram", "image"],
            },
            "f_chunk_title": {"type": "string"},
            "g_chunk_contents": {"type": "string"},
            "h_chunk_keywords": {
                "type": "array",
                "items": {"type": "string"},
                "maxItems": 5,
            },
            "i_chunk_summary": {"type": "string"},
        },
    },
}
# All chunk properties are required; take them straight from the property keys.
response_schema_chunks["items"]["required"] = list(response_schema_chunks["items"]["properties"])





## Define Generation Configuration

In [None]:
# Generation settings for the general (whole-document) prompt. The output is
# JSON constrained by response_schema_gen; the sampling parameters are passed
# explicitly as None (left unset).
config_gen = GenerationConfig(
    response_mime_type="application/json",
    response_schema=response_schema_gen,
    max_output_tokens=8192,
    candidate_count=None,
    stop_sequences=None,
    temperature=None,
    top_p=None,
    top_k=None,
)

# Model handle for the general prompt (pinned version gemini-1.5-flash-002).
model_gen = GenerativeModel(model_name="gemini-1.5-flash-002")

In [None]:
# Generation settings for the per-page chunks prompt. The output is JSON
# constrained by response_schema_chunks, sampled with temperature 1 and
# top_p 0.95; the remaining knobs are passed explicitly as None.
config_chunks = GenerationConfig(
    response_mime_type="application/json",
    response_schema=response_schema_chunks,
    max_output_tokens=8192,
    temperature=1,
    top_p=0.95,
    candidate_count=None,
    stop_sequences=None,
    top_k=None,
)

# Model handle for the chunks prompt (pinned version gemini-1.5-flash-002).
model_chunks = GenerativeModel(model_name="gemini-1.5-flash-002")

## Define Supporting Functions

### Process an entire document

In [None]:
def process_pdf_gen(gs_uri, bucket_name):
    """Run the general (whole-document) prompt on a PDF and store the result in GCS.

    The PDF at ``gs_uri`` is sent to Gemini with ``prompt_gen`` / ``config_gen``.
    The document-level fields are taken from the first element of the returned
    JSON array. Token usage, the model version, and session timing are added,
    and the dict is uploaded as ``<base_folder>/<file_base_name>_gen.json`` in
    ``bucket_name``.

    Args:
        gs_uri: ``gs://`` URI of the source PDF inside ``bucket_name``.
        bucket_name: Bucket used to derive the output folder and to store the JSON.

    Returns:
        Tuple ``(gen_result, file_base_name, base_folder)``.

    Raises:
        Exception: Whatever the LLM call, JSON parsing, or upload raised. Before
            re-raising, the partial result, session info, and error message are
            written (best effort) to ``<base_folder>/<file_base_name>_gen_error.json``,
            so callers collecting futures can mark the file as failed.
    """
    # Initialize storage client within the thread to ensure thread safety
    storage_client = storage.Client()

    # Output location: the PDF's own folder inside the bucket. Computed before the
    # try block so the error path can always use it (it used to be unbound there
    # whenever the LLM call or JSON parsing failed).
    base_folder = os.path.dirname(gs_uri.replace(f"gs://{bucket_name}/", ""))
    file_base_name = os.path.basename(gs_uri).rsplit('.', 1)[0]

    fields_to_extract = (
        "n_document_name",
        "d_document_date",
        "o_document_author",
        "p_document_type",
        "q_document_keywords",
        "r_document_summary",
    )

    # NOTE(review): paris_tz is read as a module-level global defined elsewhere in
    # the notebook (process_file only defines a local one) — confirm it exists.
    aa_document_session_id = str(uuid.uuid4())
    # Strip time zone info to keep local Paris time
    ab_document_session_start = datetime.now(paris_tz).replace(tzinfo=None)

    def _session_fields():
        """Stamp the session end now and return the four session fields."""
        ac_document_session_end = datetime.now(paris_tz).replace(tzinfo=None)
        session_duration = ac_document_session_end - ab_document_session_start
        # str(timedelta) can read "1 day, 0:00:05.1"; keep only the clock part.
        ad_document_session_duration = str(
            timedelta(seconds=session_duration.total_seconds())
        ).split(",")[-1].strip()
        return {
            "aa_document_session_id": aa_document_session_id,
            "ab_document_session_start": ab_document_session_start.isoformat(),
            "ac_document_session_end": ac_document_session_end.isoformat(),
            "ad_document_session_duration": ad_document_session_duration,
        }

    def _upload_json(output_file_name, payload):
        """Upload ``payload`` as indented JSON next to the source PDF."""
        output_blob = storage_client.bucket(bucket_name).blob(f"{base_folder}/{output_file_name}")
        output_blob.upload_from_string(json.dumps(payload, indent=2), content_type='application/json')

    gen_result = {}
    try:
        model = GenerativeModel(model_name="gemini-1.5-flash-002")
        pdf_file_part = Part.from_uri(uri=gs_uri, mime_type="application/pdf")
        response = model.generate_content([pdf_file_part, prompt_gen], generation_config=config_gen)

        # The response schema is an array of document objects; only the first is used.
        doc_data = json.loads(response.text.strip())[0]
        gen_result = {key: doc_data.get(key) for key in fields_to_extract}

        usage = response.usage_metadata
        gen_result.update({
            "ai_document_finish_reason": response.candidates[0].finish_reason.name,
            "ak_document_prompt_token_count": usage.prompt_token_count,
            "am_document_candidates_token_count": usage.candidates_token_count,
            "ao_document_total_token_count": usage.total_token_count,
            # _raw_response is a private SDK attribute; may change on SDK upgrades.
            "aq_document_model_version": response._raw_response.model_version,
        })
        gen_result.update(_session_fields())

        _upload_json(f"{file_base_name}_gen.json", gen_result)

        print(f"General LLM processing completed successfully for {gs_uri}")
        return gen_result, file_base_name, base_folder

    except Exception as e:
        print(f"An error occurred during LLM processing: {e}")

        gen_result.update(_session_fields())
        gen_result["error"] = str(e)

        # Saving the error report must not mask the original failure.
        try:
            _upload_json(f"{file_base_name}_gen_error.json", gen_result)
        except Exception as upload_error:
            print(f"Could not save error details for {gs_uri}: {upload_error}")

        # Re-raise: a returned (truthy) error dict would look like success to callers.
        raise


### Process page by page

In [None]:
def process_page(blob, bucket_name):
    """Extract content chunks from one single-page PDF with the chunks prompt.

    The blob name is expected to contain ``_page_<N>_``, the naming used when
    process_file splits a PDF; ``<N>`` becomes ``b_page_number``. The page is
    sent to Gemini with ``prompt_chunks`` / ``config_chunks``. Each object of the
    returned JSON array becomes one chunk dict, enriched with:

    - a fresh chunk id and the page number;
    - token usage and the model version;
    - session timing;
    - the page's gs:// URI and authenticated URL.

    Keys are sorted alphabetically.

    Args:
        blob: GCS blob of the single-page PDF (only ``blob.name`` is read).
        bucket_name: Bucket containing the blob.

    Returns:
        A list of chunk dicts (empty if the model returned no objects), or None
        if any step failed; the error is printed.
    """
    fields_to_extract = (
        "e_chunk_type",
        "f_chunk_title",
        "g_chunk_contents",
        "h_chunk_keywords",
        "i_chunk_summary",
    )
    # Label for the error message; overwritten with the real URI as soon as it is
    # built, so the except clause never refers to an unbound name.
    page_file_uri = getattr(blob, "name", blob)

    try:
        page_file_uri = f"gs://{bucket_name}/{blob.name}"
        page_file_name = os.path.basename(blob.name)

        # Page files are named "<file>_page_<N>_<total>.pdf". Parse N first so a
        # malformed name fails before the (billed) LLM call.
        page_number = int(page_file_name.split('_page_')[1].split('_')[0])

        # Generate authenticated URL
        page_file_url = f"https://storage.cloud.google.com/{bucket_name}/{quote(blob.name)}"

        ae_chunks_session_id = str(uuid.uuid4())
        # NOTE(review): paris_tz is a module-level global defined elsewhere — confirm.
        # Strip time zone info to keep local Paris time
        af_chunks_session_start = datetime.now(paris_tz).replace(tzinfo=None)

        print(f"Processing file: {page_file_uri}")

        model_text = GenerativeModel(model_name="gemini-1.5-flash-002")
        pdf_file_part = Part.from_uri(uri=page_file_uri, mime_type="application/pdf")
        response = model_text.generate_content(
            [pdf_file_part, prompt_chunks], generation_config=config_chunks
        )
        text_data = json.loads(response.text.strip())

        # One session end per page, so every chunk of the session reports the same value.
        ag_chunks_session_end = datetime.now(paris_tz).replace(tzinfo=None)
        chunks_session_duration = ag_chunks_session_end - af_chunks_session_start
        # str(timedelta) can read "1 day, 0:00:05.1"; keep only the clock part.
        ah_chunks_session_duration = str(
            timedelta(seconds=chunks_session_duration.total_seconds())
        ).split(",")[-1].strip()

        # Fields shared by every chunk of this page/response.
        usage = response.usage_metadata
        page_fields = {
            "b_page_number": page_number,
            "aj_chunks_finish_reason": response.candidates[0].finish_reason.name,
            "al_chunks_prompt_token_count": usage.prompt_token_count,
            "an_chunks_candidates_token_count": usage.candidates_token_count,
            "ap_chunks_total_token_count": usage.total_token_count,
            # _raw_response is a private SDK attribute; may change on SDK upgrades.
            "ar_chunks_model_version": response._raw_response.model_version,
            "ae_chunks_session_id": ae_chunks_session_id,
            "af_chunks_session_start": af_chunks_session_start.isoformat(),
            "ag_chunks_session_end": ag_chunks_session_end.isoformat(),
            "ah_chunks_session_duration": ah_chunks_session_duration,
            "s_page_gsutil": page_file_uri,
            "t_page_url": page_file_url,
        }

        page_chunks = []
        for doc_data in text_data:
            chunk_result = {key: doc_data.get(key) for key in fields_to_extract}
            chunk_result.update(page_fields)
            chunk_result["a_chunk_id"] = str(uuid.uuid4())
            # Ensure alphabetical order of keys
            page_chunks.append(dict(sorted(chunk_result.items())))

        print(f"Processed and extracted chunks for {page_file_uri}")

        return page_chunks

    except Exception as page_error:
        print(f"Error processing {page_file_uri}: {page_error}")
        return None


### Convert JSON to NDJSON

In [None]:
def convert_to_ndjson(json_data):
    """Convert a JSON array string into newline-delimited JSON (NDJSON).

    Each element of the decoded array is re-serialized with ``json.dumps`` on
    its own line, with no trailing newline; an empty array gives ``""``.
    """
    records = json.loads(json_data)
    return "\n".join(map(json.dumps, records))


### Upload NDJSON to BigQuery

In [None]:
def load_into_bigquery_from_gcs(gcs_uri, *, table_id="your-project-id.YOUR_DATASET.CHUNKS", poll_interval=5):
    """Append a newline-delimited JSON file from GCS to a BigQuery table.

    Args:
        gcs_uri: Object path inside the module-level ``bucket_name`` bucket
            (e.g. ``"folder/file_merged_nd.json"``). A full ``gs://...`` URI is
            also accepted and used as-is.
        table_id: Fully-qualified destination table; defaults to the CHUNKS table.
        poll_interval: Seconds to sleep between job-status polls.

    Returns:
        True if the load job finished without an overall error, otherwise False
        (including when the job could not be created). Errors are printed, not raised.
    """
    try:
        # Initialize BigQuery client
        client = bigquery.Client()

        # NOTE(review): bucket_name is a module-level global defined elsewhere.
        # Full gs:// URIs used to be double-prefixed ("gs://bucket/gs://...").
        source_uri = gcs_uri if gcs_uri.startswith("gs://") else f"gs://{bucket_name}/{gcs_uri}"

        job_config = bigquery.LoadJobConfig(
            source_format=bigquery.SourceFormat.NEWLINE_DELIMITED_JSON,
            write_disposition=bigquery.WriteDisposition.WRITE_APPEND,
            max_bad_records=10,            # Allow up to 10 bad records
            ignore_unknown_values=False    # Fail on unknown fields
        )

        load_job = client.load_table_from_uri(source_uri, table_id, job_config=job_config)

        print(f"Load job created (job ID: {load_job.job_id}). Waiting for it to complete...")

        # done() refreshes the job and returns True only once its state is DONE,
        # so afterwards success vs. failure is decided by error_result alone.
        while not load_job.done():
            print("  Job still running...")
            time.sleep(poll_interval)

        if load_job.error_result:
            # The job finished but failed as a whole.
            print(f"Job {load_job.job_id} completed with errors:")
            print(load_job.error_result)
            if load_job.errors:
                print("Detailed errors:")
                for err in load_job.errors:
                    print(f"  {err}")
            return False

        print(f"Job {load_job.job_id} completed successfully!")
        # With max_bad_records > 0 a successful job may still have skipped rows;
        # report their errors instead of dropping them silently.
        if load_job.errors:
            print("Rows skipped as bad records:")
            for err in load_job.errors:
                print(f"  {err}")

        # The row count is informational; failing to read it does not undo the load.
        try:
            table = client.get_table(table_id)
            print(f"Total rows now in {table_id}: {table.num_rows}")
        except Exception as e:
            print(f"Could not read row count for {table_id}: {e}")
        return True

    except Exception as e:
        print(f"An error occurred while loading data: {e}")
        return False


### Main processing function

In [None]:
def process_file(file_number):
    # Derive number_prefix from file_number
    number_prefix = f"{int(file_number):06d}"

    # Initialize per-file variables
    metadata = {}
    gen_result = {}
    all_chunks = []
    base_folder = ''
    file_base_name = ''
    processing_failed = False  # Initialize failure flag
    status = 'parsed'  # Default status

    # Record start time
    paris_tz = ZoneInfo("Europe/Paris")
    paris_time = datetime.now(paris_tz)
    session_start = paris_time.replace(tzinfo=None)  # Strip time zone info to keep local Paris time

    try:
        # **Step 1: Programmatic File Processing**

        # Initialize storage client within the function
        storage_client = storage.Client()
        bucket = storage_client.bucket(bucket_name)

        # Find the folder that starts with number_prefix
        blobs = storage_client.list_blobs(bucket_name, prefix=base_gcs_folder + '/', delimiter='/')
        folder_prefixes = set()
        for page in blobs.pages:
            folder_prefixes.update(page.prefixes)

        folder_name = None
        for prefix in folder_prefixes:
            folder = prefix.rstrip('/').split('/')[-1]
            if folder.startswith(number_prefix + '_'):
                folder_name = folder
                break

        if not folder_name:
            print(f"No folder found for number_prefix {number_prefix}")
            status = 'failed'

            # Return failure status
            return {
                'file_number': file_number,
                'status': status,
                'ndjson_gcs_path': None,
                'timestamp_parsed': datetime.now(paris_tz).replace(tzinfo=None).isoformat()
            }

        folder_path = f"{base_gcs_folder}/{folder_name}"

        print(f"Processing folder {folder_path}...")

        # Now list all blobs under that folder
        blobs = storage_client.list_blobs(bucket_name, prefix=folder_path + '/')

        file_blob = None
        for blob in blobs:
            blob_name = blob.name.split('/')[-1]
            if blob_name.startswith(number_prefix + '_'):
                file_blob = blob
                break

        if not file_blob:
            print(f"No file found in folder {folder_path} starting with {number_prefix}_")
            status = 'failed'

            # Return failure status
            return {
                'file_number': file_number,
                'status': status,
                'ndjson_gcs_path': None,
                'timestamp_parsed': datetime.now(paris_tz).replace(tzinfo=None).isoformat()
            }

        # Now we have the file_blob to process
        source_blob = file_blob
        source_filename = source_blob.name.split('/')[-1]
        filename_without_ext = os.path.splitext(source_filename)[0]
        local_file_name = source_filename
        local_file_path = f"/tmp/{local_file_name}"

        # Check file extension
        file_extension = os.path.splitext(local_file_name)[1].lower()
        if file_extension != '.pdf':
            print(f"File {local_file_name} is not a PDF. Skipping.")
            status = 'failed'

            # Return failure status
            return {
                'file_number': file_number,
                'status': status,
                'ndjson_gcs_path': None,
                'timestamp_parsed': datetime.now(paris_tz).replace(tzinfo=None).isoformat()
            }

        # Download the file locally for processing
        source_blob.download_to_filename(local_file_path)

        print(f"Processing file {source_filename}...")

        # Open the PDF with PyMuPDF and analyze it
        doc = pymupdf.open(local_file_path)
        num_pages = doc.page_count
        layout = "landscape" if doc[0].rect.width > doc[0].rect.height else "portrait"

        # Adjust page numbering format based on total pages
        num_digits = len(str(num_pages))

        # Create local directory for pages with prefix included
        pages_folder_name = f"{filename_without_ext}_pages"
        local_pages_folder = f"/tmp/{pages_folder_name}"
        os.makedirs(local_pages_folder, exist_ok=True)

        # Save each page as a separate PDF file and upload to GCS
        for i in tqdm(range(num_pages), desc=f'Processing {source_filename}', unit='page'):
            try:
                # Format page number based on total pages
                page_num_str = f"{i+1:0{num_digits}d}"
                page_filename = f"{filename_without_ext}_page_{page_num_str}_{num_pages}.pdf"
                local_page_path = f"{local_pages_folder}/{page_filename}"
                # GCS pages folder
                gcs_page_path = f"{folder_path}/{pages_folder_name}/{page_filename}"

                # Create a new document with just this page
                single_page_doc = pymupdf.open()
                single_page_doc.insert_pdf(doc, from_page=i, to_page=i)
                single_page_doc.save(local_page_path)
                single_page_doc.close()

                # Upload the single-page PDF to GCS
                page_blob = bucket.blob(gcs_page_path)
                page_blob.upload_from_filename(local_page_path, content_type='application/pdf')

                # Optionally, delete the local page file to save space
                os.remove(local_page_path)
            except Exception as e:
                print(f"An error occurred while processing page {i+1} of {source_filename}: {e}")
                processing_failed = True
                status = 'failed'
                break  # Stop processing pages if one fails

        if processing_failed:
            print(f"Processing of pages failed for {source_filename}")

        # Close the main document
        doc.close()

        # Calculate SHA-256 hash of the file
        sha256_hash = hashlib.sha256()
        with open(local_file_path, "rb") as f:
            for byte_block in iter(lambda: f.read(4096), b""):
                sha256_hash.update(byte_block)

        # Record end time and calculate session duration
        paris_time_end = datetime.now(paris_tz)
        session_end = paris_time_end.replace(tzinfo=None)  # Strip time zone info to keep local Paris time
        session_duration = session_end - session_start
        duration_str = str(session_duration)

        # Construct the authenticated URL using the blob's properties
        authenticated_url = f"https://storage.cloud.google.com/{bucket_name}/{quote(source_blob.name)}"

        # Construct the gsutil URI
        gsutil_uri = f"gs://{bucket_name}/{source_blob.name}"

        # Save JSON metadata
        metadata = {
            "type": "object",
            "properties": {
                "j_file_number": int(number_prefix),
                "k_file_hash": sha256_hash.hexdigest(),
                "l_file_name": filename_without_ext,
                "u_file_gsutil": gsutil_uri,
                "v_file_url": authenticated_url,
                "m_file_layout": layout,
                "c_file_pages": num_pages,
                "w_file_session_start": session_start.isoformat(),
                "x_file_session_end": session_end.isoformat(),
                "y_file_session_duration": duration_str,
                "z_file_session_status": "done"
            }
        }

        json_filename = f"{filename_without_ext}_prep.json"
        json_path = f"{folder_path}/{json_filename}"

        # Save JSON to GCS
        try:
            json_blob = bucket.blob(json_path)
            json_blob.upload_from_string(json.dumps(metadata, indent=4), content_type='application/json')
        except Exception as e:
            print(f"An error occurred while saving JSON metadata to GCS for {source_filename}: {e}")
            processing_failed = True
            status = 'failed'

        # Clean up local files
        os.remove(local_file_path)
        os.rmdir(local_pages_folder)

        print(f"Programmatic processing of {source_filename} completed successfully.\n")

        # **Step 2: Concurrent LLM Processing**

        # Extract base folder and file name
        base_folder = os.path.dirname(gsutil_uri.replace(f"gs://{bucket_name}/", ""))
        file_name = os.path.basename(gsutil_uri)
        file_base_name = file_name.rsplit('.', 1)[0]
        pages_folder = f"{base_folder}/{filename_without_ext}_pages"

        # Initialize storage client
        storage_client = storage.Client()
        bucket = storage_client.bucket(bucket_name)
        blobs = list(bucket.list_blobs(prefix=pages_folder))

        if not blobs:
            print(f"No files found in the folder: {pages_folder}")
            processing_failed = True
            status = 'failed'

            # Return failure status
            return {
                'file_number': file_number,
                'status': status,
                'ndjson_gcs_path': None,
                'timestamp_parsed': datetime.now(paris_tz).replace(tzinfo=None).isoformat()
            }

        # Filter and sort blobs
        blobs = [blob for blob in sorted(blobs, key=lambda x: x.name) if blob.name.endswith('.pdf')]

        # Prepare tasks
        tasks = []

        # Add the process_pdf_gen task
        tasks.append(('gen', gsutil_uri))

        # Add per-page processing tasks
        for blob in blobs:
            tasks.append(('page', blob))

        # Process tasks concurrently
        total_tasks = len(tasks)

        # Use a ThreadPoolExecutor with max_workers set to num_pages + 1 (for 'gen' task)
        with ThreadPoolExecutor(max_workers=total_tasks) as executor:
            future_to_task = {}
            for task in tasks:
                if task[0] == 'gen':
                    # Submit the process_pdf_gen function
                    future = executor.submit(process_pdf_gen, task[1], bucket_name)
                elif task[0] == 'page':
                    # Submit the process_page function
                    future = executor.submit(process_page, task[1], bucket_name)
                future_to_task[future] = task

            for future in as_completed(future_to_task):
                task = future_to_task[future]
                try:
                    result = future.result()
                    if task[0] == 'gen':
                        gen_result_local, file_base_name_local, base_folder_local = result
                        if not gen_result_local:
                            processing_failed = True
                            status = 'failed'
                            print(f"Processing of general summary failed for {source_filename}")
                        else:
                            gen_result = gen_result_local
                            file_base_name = file_base_name_local
                            base_folder = base_folder_local
                    elif task[0] == 'page':
                        if result:
                            all_chunks.extend(result)
                        else:
                            processing_failed = True
                            status = 'failed'
                            print(f"Processing of page failed for {task[1].name}")
                except Exception as e:
                    print(f"An exception occurred while processing task {task}: {e}")
                    processing_failed = True
                    status = 'failed'

        if not processing_failed:
            # **Save all_chunks before merging**
            chunks_output_file_name = f"{file_base_name}_chunks.json"
            chunks_output_path = f"{base_folder}/{chunks_output_file_name}"

            try:
                chunks_blob = bucket.blob(chunks_output_path)
                chunks_blob.upload_from_string(json.dumps(all_chunks, indent=2), content_type='application/json')
            except Exception as e:
                print(f"An error occurred while saving chunks JSON to GCS for {source_filename}: {e}")
                processing_failed = True
                status = 'failed'

            # **Merge Metadata and Results**

            final_chunks = all_chunks
            sorted_chunks = []

            cleaned_metadata = metadata.get('properties', {})

            for chunk in final_chunks:
                chunk.update(cleaned_metadata)  # Merge metadata fields into the chunk
                chunk.update(gen_result)        # Merge result fields into the chunk

                # Define a sorting function that extracts the prefix before the underscore
                def sort_key(item):
                    key = item[0]
                    prefix = key.split('_')[0]
                    return (len(prefix), prefix)

                # Sort the keys using the custom sort function
                sorted_chunk = OrderedDict(sorted(chunk.items(), key=sort_key))
                sorted_chunks.append(sorted_chunk)

            # Ensure 'b_page_number' is present and is an integer
            sorted_chunks = sorted(sorted_chunks, key=lambda chunk: int(chunk.get('b_page_number', 0)))

            # Convert the updated chunks to JSON without sorting the keys
            merged_json = json.dumps(sorted_chunks, indent=2)

            # Save JSON to GCS
            output_file_name = f"{file_base_name}_merged.json"
            output_path = f"{base_folder}/{output_file_name}"

            # Create a blob and upload the JSON string
            try:
                output_blob = bucket.blob(output_path)
                output_blob.upload_from_string(merged_json, content_type='application/json')
            except Exception as e:
                print(f"An error occurred while saving merged JSON to GCS for {source_filename}: {e}")
                processing_failed = True
                status = 'failed'

            print(f"Saved merged JSON to {output_path}")

            # **Convert JSON to NDJSON**

            # Convert JSON string to NDJSON
            ndjson_data = convert_to_ndjson(merged_json)

            if not ndjson_data:
                print(f"Conversion to NDJSON failed for {source_filename}")
                processing_failed = True
                status = 'failed'
            else:
                # Save NDJSON to GCS
                output_file_name_nd = f"{file_base_name}_merged_nd.json"
                output_path_nd = f"{base_folder}/{output_file_name_nd}"

                try:
                    output_blob_nd = bucket.blob(output_path_nd)
                    output_blob_nd.upload_from_string(ndjson_data, content_type='application/json')
                except Exception as e:
                    print(f"An error occurred while saving NDJSON to GCS for {source_filename}: {e}")
                    processing_failed = True
                    status = 'failed'

                print(f"Saved merged NDJSON to {output_path_nd}")

            # After merging, record timestamp_parsed
            timestamp_parsed = datetime.now(paris_tz).replace(tzinfo=None)  # Strip timezone info

            # Return the necessary information for later processing
            return {
                'file_number': file_number,
                'status': status,
                'ndjson_gcs_path': output_path_nd if not processing_failed else None,
                'timestamp_parsed': timestamp_parsed.isoformat()
            }

        else:
            print(f"Skipping merging due to earlier failures for {source_filename}")
            timestamp_parsed = datetime.now(paris_tz).replace(tzinfo=None)

            # Return failure status with timestamp_parsed
            return {
                'file_number': file_number,
                'status': status,
                'ndjson_gcs_path': None,
                'timestamp_parsed': timestamp_parsed.isoformat()
            }

    except Exception as e:
        print(f"An error occurred while processing file number {file_number}: {e}")
        processing_failed = True
        status = 'failed'
        timestamp_parsed = datetime.now(paris_tz).replace(tzinfo=None)

        # Handle exception, update the metadata if necessary
        return {
            'file_number': file_number,
            'status': status,
            'ndjson_gcs_path': None,
            'timestamp_parsed': timestamp_parsed.isoformat()
        }



## Execution

In [None]:
# ------ Eligible-file discovery, page-count batching and parsing loop ------
#
# Every run:
#   1. queries FILE_LIST for files already copied to GCS whose `parsed` status is
#      'false' or 'failed';
#   2. downloads each eligible PDF to count its pages and groups the files into
#      batches whose combined page count stays within API_LIMIT (a file that alone
#      exceeds the limit gets a batch of its own);
#   3. runs `process_file` concurrently for the files of each batch, pausing
#      BATCH_PAUSE_SECONDS between batches;
#   4. writes every processed file's new `parsed` status back to FILE_LIST through a
#      temporary table and a MERGE;
#   5. concatenates the NDJSON outputs of the parsed files into one session file in
#      GCS and passes it to `load_into_bigquery_from_gcs`.
# The loop stops after MAX_RUNS runs, when no file is eligible, or when the number of
# eligible files is unchanged from the previous run.

MAX_RUNS = 5  # Number of maximum runs you want; change as needed
run_number = 0
previous_files_count = None

while True:
    run_number += 1
    print(f"\n========== RUN #{run_number}/{MAX_RUNS} ==========\n")

    # --- Per-run configuration and clients ---
    # NOTE(review): `process_file` (defined earlier) appears to read globals such as
    # `paris_tz`; keep these names at module level.
    storage_client = storage.Client()
    bq_client = bigquery.Client()
    paris_tz = ZoneInfo("Europe/Paris")

    MAX_FILES_TO_PROCESS = None  # Change to control number of files processed in one run
    API_LIMIT = 100  # Your API limit for concurrent tasks (max total pages per batch)
    BATCH_PAUSE_SECONDS = 60  # Set to 0 to disable pause between batches

    base_gcs_folder = 'Sample Drive Folder'
    bucket_name = 'your-project-id-knowledge-base'
    bucket = storage_client.bucket(bucket_name)

    # --- Find eligible files: copied to GCS, not parsed yet or failed previously ---
    query = """
    SELECT file_number
    FROM `your-project-id.YOUR_DATASET.FILE_LIST`
    WHERE copied_to_gcs = TRUE
      AND parsed IN ('false', 'failed')
    """
    query_job = bq_client.query(query)
    results = query_job.result()

    eligible_files = [int(row.file_number) for row in results]
    total_eligible_files = len(eligible_files)
    print(f"Total number of eligible files: {total_eligible_files}")

    # Apply the processing limit if specified
    if MAX_FILES_TO_PROCESS is not None:
        files_to_process = eligible_files[:MAX_FILES_TO_PROCESS]
    else:
        files_to_process = eligible_files

    print(f"Number of files to be processed in this run: {len(files_to_process)}")

    total_files_processed = 0
    script_start_time = datetime.now(paris_tz)

    # --- Count pages per file ---
    # The sub-folders of base_gcs_folder do not depend on the file being looked up,
    # so they are listed once per run instead of once per file.
    try:
        folder_listing = storage_client.list_blobs(
            bucket_name, prefix=base_gcs_folder + '/', delimiter='/'
        )
        # With a delimiter, the sub-folder prefixes are exposed per result page.
        folder_names = [
            prefix.rstrip('/').split('/')[-1]
            for page in folder_listing.pages
            for prefix in page.prefixes
        ]
    except Exception as e:
        print(f"An error occurred while listing folders under {base_gcs_folder}: {e}")
        folder_names = []

    file_page_counts = []

    for file_number in files_to_process:
        local_file_path = None
        try:
            # Both the folder and the file are matched on "<6-digit file number>_"
            number_prefix = f"{int(file_number):06d}"
            name_prefix = number_prefix + '_'

            folder_name = next(
                (name for name in folder_names if name.startswith(name_prefix)), None
            )
            if not folder_name:
                print(f"No folder found for number_prefix {number_prefix}")
                continue  # Skip to next file

            folder_path = f"{base_gcs_folder}/{folder_name}"

            file_blob = next(
                (
                    blob
                    for blob in storage_client.list_blobs(bucket_name, prefix=folder_path + '/')
                    if blob.name.split('/')[-1].startswith(name_prefix)
                ),
                None,
            )
            if file_blob is None:
                print(f"No file found in folder {folder_path} starting with {number_prefix}_")
                continue  # Skip to next file

            source_filename = file_blob.name.split('/')[-1]
            if os.path.splitext(source_filename)[1].lower() != '.pdf':
                print(f"File {source_filename} is not a PDF. Skipping.")
                continue  # Skip to next file

            # Download the file locally to read its page count
            local_file_path = f"/tmp/{source_filename}"
            file_blob.download_to_filename(local_file_path)
            with pymupdf.open(local_file_path) as doc:
                num_pages = doc.page_count

            file_page_counts.append((file_number, num_pages))

        except Exception as e:
            print(f"An error occurred while getting page count for file number {file_number}: {e}")
        finally:
            # Remove the temporary download even when opening/reading the PDF failed.
            if local_file_path and os.path.exists(local_file_path):
                os.remove(local_file_path)

    # --- Batch files so each batch's total page count stays within API_LIMIT ---
    batches = []
    current_batch = []
    current_batch_page_count = 0

    for file_number, num_pages in file_page_counts:
        # Close the current batch when this file would push it over the limit. The
        # `current_batch` guard avoids emitting an empty batch when a single file is
        # larger than API_LIMIT by itself: an empty batch would make
        # ThreadPoolExecutor(max_workers=0) raise ValueError.
        if current_batch and current_batch_page_count + num_pages > API_LIMIT:
            batches.append(current_batch)
            current_batch = []
            current_batch_page_count = 0
        current_batch.append(file_number)
        current_batch_page_count += num_pages

    if current_batch:
        batches.append(current_batch)

    # --- Parse the batches ---
    processing_results = []

    for batch_num, batch in enumerate(batches, start=1):
        print(f"Processing batch {batch_num}/{len(batches)} with files: {batch}")

        # One worker per file in the batch (batches are never empty, see above)
        with ThreadPoolExecutor(max_workers=len(batch)) as executor:
            future_to_file_number = {
                executor.submit(process_file, file_number): file_number for file_number in batch
            }

            for future in as_completed(future_to_file_number):
                file_number = future_to_file_number[future]
                try:
                    result = future.result()
                    processing_results.append(result)
                    if result['status'] == 'parsed':
                        total_files_processed += 1
                    else:
                        print(f"Processing failed for file number {file_number}")
                except Exception as e:
                    print(f"File {file_number} generated an exception: {e}")

        # Pause between batches (never after the last one)
        if BATCH_PAUSE_SECONDS > 0 and batch_num < len(batches):
            print(f"Pausing for {BATCH_PAUSE_SECONDS} seconds before processing the next batch...")
            time.sleep(BATCH_PAUSE_SECONDS)

    # --- Write parsing statuses back to FILE_LIST ---
    status_updates = []
    for result in processing_results:
        status_updates.append({
            'file_number': result['file_number'],
            'parsed': result['status'],
            'timestamp_parsed': result.get(
                'timestamp_parsed', datetime.now(paris_tz).replace(tzinfo=None).isoformat()
            ),
        })

    if not status_updates:
        # Nothing was processed: skip the empty status files, load job and MERGE.
        print("No status updates to write for this run.")
    else:
        status_updates_json = json.dumps(status_updates, indent=2)
        status_updates_ndjson = '\n'.join(json.dumps(record) for record in status_updates)

        # Keep a JSON and an NDJSON copy of the statuses in GCS
        timestamp_str = datetime.now(paris_tz).strftime("%Y%m%d%H%M%S")
        status_json_path = f"{base_gcs_folder}/!status_json/status_{timestamp_str}.json"
        status_ndjson_path = f"{base_gcs_folder}/!status_json/status_{timestamp_str}.ndjson"

        bucket.blob(status_json_path).upload_from_string(
            status_updates_json, content_type='application/json'
        )
        bucket.blob(status_ndjson_path).upload_from_string(
            status_updates_ndjson, content_type='application/json'
        )

        # Load the NDJSON statuses into a scratch table (replaced on every run) ...
        job_config = bigquery.LoadJobConfig(
            source_format=bigquery.SourceFormat.NEWLINE_DELIMITED_JSON,
            write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE,
            schema=[
                bigquery.SchemaField("file_number", "INTEGER"),
                bigquery.SchemaField("parsed", "STRING"),
                bigquery.SchemaField("timestamp_parsed", "DATETIME"),
            ],
        )
        tmp_table_id = "your-project-id.YOUR_DATASET._tmp_status_updates"

        load_job = bq_client.load_table_from_uri(
            f'gs://{bucket_name}/{status_ndjson_path}',
            tmp_table_id,
            job_config=job_config
        )
        load_job.result()

        # ... then MERGE them into FILE_LIST and drop the scratch table
        merge_query = """
    MERGE `your-project-id.YOUR_DATASET.FILE_LIST` T
    USING `your-project-id.YOUR_DATASET._tmp_status_updates` S
    ON T.file_number = S.file_number
    WHEN MATCHED THEN
      UPDATE SET
        T.parsed = S.parsed,
        T.timestamp_parsed = S.timestamp_parsed
    """
        query_job = bq_client.query(merge_query)
        query_job.result()

        bq_client.delete_table(tmp_table_id, not_found_ok=True)

    # --- Concatenate the parsed files' NDJSON and load it into BigQuery ---
    # NOTE(review): statuses are merged above before this load runs, so if the load
    # fails the files stay marked 'parsed' and are no longer eligible. Consider loading
    # first and recording load failures in FILE_LIST.
    ndjson_contents = []
    successful_file_numbers = []

    for result in processing_results:
        if result['ndjson_gcs_path'] and result['status'] == 'parsed':
            ndjson_contents.append(bucket.blob(result['ndjson_gcs_path']).download_as_text())
            successful_file_numbers.append(result['file_number'])

    if ndjson_contents:
        combined_ndjson_data = '\n'.join(ndjson_contents)
        timestamp_str = datetime.now(paris_tz).strftime("%Y%m%d%H%M%S")
        combined_ndjson_path = f"{base_gcs_folder}/!session_json/session_{timestamp_str}.json"
        bucket.blob(combined_ndjson_path).upload_from_string(
            combined_ndjson_data, content_type='application/json'
        )

        try:
            load_into_bigquery_from_gcs(combined_ndjson_path)
        except Exception as e:
            print(f"An error occurred while loading combined NDJSON into BigQuery: {e}")
        else:
            print("Data loaded into BigQuery successfully.")

    # --- Run summary ---
    script_end_time = datetime.now(paris_tz)
    total_duration_str = str(script_end_time - script_start_time)

    print("====================================")
    print(f"Total number of files processed: {total_files_processed}")
    print(f"Total processing time: {total_duration_str}")
    print("====================================")

    # --- Decide whether to run again ---
    current_files_count = len(files_to_process)
    if run_number >= MAX_RUNS:
        print(f"Reached the maximum of {MAX_RUNS} runs. Stopping.")
        break

    # Stop when nothing was eligible (on any run, including the first) or when the
    # eligible count did not change since the previous run.
    if current_files_count == 0 or (run_number > 1 and current_files_count == previous_files_count):
        print("No new files to process or same as previous run. Stopping.")
        break

    previous_files_count = current_files_count

## BigQuery Upload Status

This step updates `FILE_LIST`, the BigQuery table that tracks every file. For each file whose chunks are present in the `CHUNKS` table, it sets `added_to_bq` to TRUE and records `timestamp_added`, marking the file as available for RAG.

In [None]:
# Mark files in FILE_LIST as added to BigQuery once their chunks appear in CHUNKS.
# For every file_number that has rows in CHUNKS and whose `added_to_bq` flag is still
# FALSE, set `added_to_bq = TRUE` and copy the chunk-session end time into
# `timestamp_added`.

# Initialize BigQuery client
client = bigquery.Client()

# CHUNKS is aggregated to exactly one row per file number and joined straight to the
# target table. Routing the join through a second copy of FILE_LIST would produce
# several source rows for one target row whenever FILE_LIST holds a duplicated
# file_number, and BigQuery rejects such an UPDATE at runtime.
# MAX() makes the recorded timestamp deterministic (the latest chunk session) when a
# file's chunks come from more than one session; ANY_VALUE() would pick an arbitrary one.
# NOTE(review): rows whose added_to_bq is NULL are not matched by `= FALSE`; confirm the
# column is always initialised to FALSE.
UPDATE_QUERY = """
UPDATE `your-project-id.YOUR_DATASET.FILE_LIST` AS T
SET
    T.added_to_bq = TRUE,
    T.timestamp_added = U.ag_chunks_session_end
FROM (
    SELECT
        j_file_number,
        MAX(ag_chunks_session_end) AS ag_chunks_session_end
    FROM `your-project-id.YOUR_DATASET.CHUNKS`
    WHERE j_file_number IS NOT NULL  -- Only chunks that can be matched to a file
    GROUP BY j_file_number
) AS U
WHERE
    T.file_number = U.j_file_number
    AND T.added_to_bq = FALSE;  -- Update only rows where added_to_bq is currently FALSE
"""

# Execute the query (best effort: errors are reported, not raised)
try:
    query_job = client.query(UPDATE_QUERY)
    query_job.result()  # Wait for the query to finish

    # Get the exact count of updated rows
    updated_rows = query_job.num_dml_affected_rows
    print(f"Update query executed successfully. Rows updated: {updated_rows}")

except Exception as e:
    print(f"Error executing query: {e}")


# Update RAG Corpus

### Setup Environment

In [None]:
PROJECT_ID = "your-project-id"  # @param {type:"string"}
LOCATION = "europe-west4"  # @param {type:"string"}

# Initialise the Vertex AI SDK for this project and region.
vertexai.init(
    location=LOCATION,
    project=PROJECT_ID,
)

In [None]:
# BigQuery dataset and table of the vector store (passed to BigQueryVectorStore and
# referenced by the chunk queries). The `# @param` comments are Colab form-field
# annotations, so these two lines keep their exact form.
DATASET = "KNOWLEDGE_BASE"  # @param {type:"string"}
TABLE = "CORPUS"  # @param {type:"string"}

In [None]:
# Vertex AI text-embedding model handed to the BigQuery vector store.
embedding_model = VertexAIEmbeddings(
    project=PROJECT_ID,
    model_name="text-embedding-005",
)

In [None]:
# Vector store backed by the BigQuery table PROJECT_ID.DATASET.TABLE, using the
# embedding model above and Euclidean distance.
bq_store = BigQueryVectorStore(
    embedding=embedding_model,
    project_id=PROJECT_ID,
    location=LOCATION,
    dataset_name=DATASET,
    table_name=TABLE,
    distance_type="EUCLIDEAN",
)

In [None]:
# --- Setup your BigQuery client ---
bq_client = bigquery.Client(project=PROJECT_ID)

# --- References to the vector-store dataset and table ---
# Built explicitly: `Client.dataset()` is deprecated in google-cloud-bigquery in favour
# of DatasetReference objects. PROJECT_ID is the same project the client uses.
dataset_ref = bigquery.DatasetReference(PROJECT_ID, DATASET)
table_ref = bigquery.TableReference(dataset_ref, TABLE)

### Queries

In [None]:
# --- Query used when the vector-store table already exists ---
# Selects CHUNKS rows whose a_chunk_id is not yet in PROJECT_ID.DATASET.TABLE and that
# pass the content filter: text chunks of at least 300 characters, or any
# table/chart/diagram/image chunk. Table chunks get their title and summary prepended
# to the contents; keyword arrays are flattened to comma-separated strings.
#
# NOT EXISTS is used rather than `a_chunk_id NOT IN (SELECT a_chunk_id ...)`: a single
# NULL a_chunk_id in the corpus table would make NOT IN evaluate to NULL for every row,
# and the query would silently return nothing.
# IFNULL keeps a missing title/summary/contents from turning the whole CONCAT into NULL
# (BigQuery's CONCAT returns NULL when any argument is NULL).
# The corpus table is addressed with PROJECT_ID/DATASET/TABLE, the same values bq_store
# is configured with.
QUERY_EXISTING = f"""
SELECT
  a_chunk_id,
  b_page_number,
  c_file_pages,
  CAST(d_document_date AS TIMESTAMP) AS d_document_date,
  e_chunk_type,
  f_chunk_title,
  CASE
    WHEN e_chunk_type = 'table' THEN CONCAT(IFNULL(f_chunk_title, ''), '. ', IFNULL(i_chunk_summary, ''), ' ', IFNULL(g_chunk_contents, ''))
    ELSE g_chunk_contents
  END AS g_chunk_contents,
  ARRAY_TO_STRING(h_chunk_keywords, ',') AS h_chunk_keywords,
  i_chunk_summary,
  j_file_number,
  k_file_hash,
  l_file_name,
  m_file_layout,
  n_document_name,
  o_document_author,
  p_document_type,
  ARRAY_TO_STRING(q_document_keywords, ',') AS q_document_keywords,
  r_document_summary,
  s_page_gsutil,
  t_page_url,
  u_file_gsutil,
  v_file_url,
  aa_document_session_id,
  ae_chunks_session_id
FROM `your-project-id.YOUR_DATASET.CHUNKS` AS chunks
WHERE
  NOT EXISTS (
    SELECT 1
    FROM `{PROJECT_ID}.{DATASET}.{TABLE}` AS corpus
    WHERE corpus.a_chunk_id = chunks.a_chunk_id
  )
  AND (
    (e_chunk_type = 'text' AND LENGTH(g_chunk_contents) >= 300)
    OR
    (e_chunk_type IN ('table', 'chart', 'diagram', 'image'))
  )
"""


In [None]:
# --- Query used when the vector-store table does not exist yet (or has no schema) ---
# Same columns and content filter as QUERY_EXISTING, without excluding chunks already in
# the vector store. IFNULL keeps a missing title/summary/contents from turning the
# whole CONCAT into NULL (BigQuery's CONCAT returns NULL when any argument is NULL).
# Plain string: there is nothing to interpolate.
QUERY_NEW = """
SELECT
  a_chunk_id,
  b_page_number,
  c_file_pages,
  CAST(d_document_date AS TIMESTAMP) AS d_document_date,
  e_chunk_type,
  f_chunk_title,
  CASE
    WHEN e_chunk_type = 'table' THEN CONCAT(IFNULL(f_chunk_title, ''), '. ', IFNULL(i_chunk_summary, ''), ' ', IFNULL(g_chunk_contents, ''))
    ELSE g_chunk_contents
  END AS g_chunk_contents,
  ARRAY_TO_STRING(h_chunk_keywords, ',') AS h_chunk_keywords,
  i_chunk_summary,
  j_file_number,
  k_file_hash,
  l_file_name,
  m_file_layout,
  n_document_name,
  o_document_author,
  p_document_type,
  ARRAY_TO_STRING(q_document_keywords, ',') AS q_document_keywords,
  r_document_summary,
  s_page_gsutil,
  t_page_url,
  u_file_gsutil,
  v_file_url,
  aa_document_session_id,
  ae_chunks_session_id
FROM `your-project-id.YOUR_DATASET.CHUNKS`
WHERE
  (
    (e_chunk_type = 'text' AND LENGTH(g_chunk_contents) >= 300)
    OR
    (e_chunk_type IN ('table', 'chart', 'diagram', 'image'))
  )
"""

In [None]:
# --- Decide which query to use based on table existence and schema ---
try:
    table = bq_client.get_table(table_ref)  # API call
    # If the table exists, check if the schema is not empty
    if table.schema:
        print(f"Table '{TABLE}' found with schema. Using QUERY_EXISTING.")
        chosen_query = QUERY_EXISTING
    else:
        print(f"Table '{TABLE}' found but no schema. Using QUERY_NEW.")
        chosen_query = QUERY_NEW
except NotFound:
    print(f"Table '{TABLE}' does not exist. Using QUERY_NEW.")
    chosen_query = QUERY_NEW

## Setup Loader

In [None]:
# --- Continue with the loader ---
# --- Document loader over the chosen query ---
# g_chunk_contents becomes the page content; every other column selected by the query
# (same order as its SELECT list) is carried along as metadata.
loader = BigQueryLoader(
    chosen_query,
    page_content_columns=["g_chunk_contents"],
    metadata_columns=[
        "a_chunk_id", "b_page_number", "c_file_pages", "d_document_date",
        "e_chunk_type", "f_chunk_title", "h_chunk_keywords", "i_chunk_summary",
        "j_file_number", "k_file_hash", "l_file_name", "m_file_layout",
        "n_document_name", "o_document_author", "p_document_type",
        "q_document_keywords", "r_document_summary",
        "s_page_gsutil", "t_page_url", "u_file_gsutil", "v_file_url",
        "aa_document_session_id", "ae_chunks_session_id",
    ],
)



In [None]:
# --- Optional: Run a separate COUNT query to see how many rows match ---
# Informational only: count the rows the chosen query returns by wrapping it in COUNT(*).
count_query = f"SELECT COUNT(*) as row_count FROM ({chosen_query})"
count_job = bq_client.query(count_query)
count_result = list(count_job.result())

if count_result:
    num_rows = count_result[0].row_count
else:
    num_rows = 0

print(f"Number of new rows meeting criteria: {num_rows}")

In [None]:
# Run the chosen query and materialise the rows as LangChain documents.
docs = loader.load()
num_docs = len(docs)

# The page content comes back labelled with its column name ("g_chunk_contents: ...");
# with a single page-content column that label sits at the start. Strip only that
# leading label: str.replace would also delete the same text anywhere inside a chunk.
for doc in docs:
    doc.page_content = doc.page_content.removeprefix("g_chunk_contents: ")

print("Number of documents loaded into Python memory:", num_docs)

## Run Loader

In [None]:
# Only call add_documents if docs is non-empty
# Insert the loaded documents into the vector store; skip the call when there are none.
if num_docs <= 0:
    print("No new rows found. Skipping vector store insertion.")
else:
    doc_ids = bq_store.add_documents(docs)
    print("Number of documents successfully added to vector store:", len(doc_ids))
    print("Document IDs:", doc_ids)
