In [None]:
%pip install --quiet google-cloud-documentai==2.31.0

import sys

# Interactive authentication is only attempted when the google.colab package
# is already loaded in this interpreter; elsewhere this branch is skipped.
# NOTE(review): presumably other runtimes rely on credentials that are
# already configured in the environment — confirm.
if "google.colab" in sys.modules:
    from google.colab import auth

    auth.authenticate_user()
    print("Authenticated")

# Notebook configuration. Every value below is an angle-bracket placeholder
# that has to be replaced by hand before the following cells are run; the
# placeholder text names the setup notebook as the source of each value.
PROJECT_ID = "<PROJECT_NAME_FROM_SETUP_NOTEBOOK>"
# PROJECT_NUMBER is not referenced anywhere in the visible part of this file.
PROJECT_NUMBER = "<PROJECT_NUMBER_FROM_SETUP_NOTEBOOK>"
# Dataset that holds the external tables and every derived table.
DATASET_ID = "<DATASET_ID_FROM_SETUP_NOTEBOOK>"

# Connection named in the WITH CONNECTION clause of the external-table DDL.
CONNECTION_NAME = "<CONNECTION_NAME_FROM_SETUP_NOTEBOOK>"
# Cloud Storage bucket that is scanned for PDF objects.
BUCKET_NAME = "<BUCKET_NAME_FROM_SETUP_NOTEBOOK>"
from google.cloud import bigquery, storage


# Set up the BigQuery and Cloud Storage clients for the configured project.
bq_client = bigquery.Client(project=PROJECT_ID)
storage_client = storage.Client(project=PROJECT_ID)

# Table ids already present in the dataset, so that a PDF that was registered
# by an earlier run is not registered a second time.
existing_tables = []
for table in bq_client.list_tables(DATASET_ID):
    existing_tables.append(table.table_id)

# Walk every object in the bucket; only names ending in ".pdf" are considered.
bucket = storage_client.bucket(BUCKET_NAME)
blobs = bucket.list_blobs()

for blob in blobs:
    if blob.name.endswith(".pdf"):
        # The table is named after the object: everything before the last dot.
        table_name = blob.name.rpartition(".")[0]
        if table_name in existing_tables:
            print(f"Table {DATASET_ID}.{table_name} already exists. Skipping.")
        else:
            # DDL that points an external table at this single object.
            query = f"""
            CREATE OR REPLACE EXTERNAL TABLE `{DATASET_ID}.{table_name}`
            WITH CONNECTION `{CONNECTION_NAME}`
            OPTIONS (
              uris = ['gs://{BUCKET_NAME}/{blob.name}'],
              object_metadata = 'DIRECTORY'
            );
            """

            # Submit the statement and block until BigQuery has finished it.
            bq_client.query(query).result()

            print(f"External table {DATASET_ID}.{table_name} created successfully.")

from google.cloud import bigquery, storage
import re

# Snapshot of the table ids currently in the dataset.
existing_tables = [
    entry.table_id for entry in bq_client.list_tables(DATASET_ID)
]

# Keep only the tables whose name is made up of digits; the derived
# "<digits>_result..." tables do not match the pattern.
numeric_name = re.compile(r'^\d+$')
numeric_tables = list(filter(numeric_name.match, existing_tables))

# For every digits-only table build two derived tables, in this order:
#   <name>_result         raw ML.PROCESS_DOCUMENT output
#   <name>_result_parsed  one row per chunk with selected JSON fields
for table_name in numeric_tables:
    result_table_name = f"{table_name}_result"
    result_parsed_table_name = f"{table_name}_result_parsed"

    # Runs the layout_parser model over the source table (chunk size 250).
    result_query = f"""
    CREATE OR REPLACE TABLE {DATASET_ID}.{result_table_name} AS (
      SELECT * FROM ML.PROCESS_DOCUMENT(
        MODEL {DATASET_ID}.layout_parser,
        TABLE {DATASET_ID}.{table_name},
        PROCESS_OPTIONS => (JSON '{{"layout_config": {{"chunking_config": {{"chunk_size": 250}}}}}}')
      )
    );
    """

    # Unnests the chunk array of the _result table into flat columns.
    result_parsed_query = f"""
    CREATE OR REPLACE TABLE {DATASET_ID}.{result_parsed_table_name} AS (
      SELECT
        uri,
        JSON_EXTRACT_SCALAR(json , '$.chunkId') AS id,
        JSON_EXTRACT_SCALAR(json , '$.content') AS content,
        JSON_EXTRACT_SCALAR(json , '$.pageFooters[0].text') AS page_footers_text,
        JSON_EXTRACT_SCALAR(json , '$.pageSpan.pageStart') AS page_span_start,
        JSON_EXTRACT_SCALAR(json , '$.pageSpan.pageEnd') AS page_span_end
      FROM {DATASET_ID}.{result_table_name}, UNNEST(JSON_EXTRACT_ARRAY(ml_process_document_result.chunkedDocument.chunks, '$')) json
    );
    """

    # The parsed table reads from the _result table, so the statements are
    # executed strictly one after the other, each awaited before the next.
    pending = (
        (result_table_name, result_query),
        (result_parsed_table_name, result_parsed_query),
    )
    for created_name, statement in pending:
        bq_client.query(statement).result()
        print(f"Table {created_name} created successfully.")


# Fresh BigQuery client for the configured project.
bq_client = bigquery.Client(project=PROJECT_ID)

# Re-read the dataset contents so tables created by earlier cells are visible.
existing_tables = []
for entry in bq_client.list_tables(DATASET_ID):
    existing_tables.append(entry.table_id)

# Digits-only names; the pattern leaves out the _result / _result_parsed
# tables because their names contain non-digit characters.
digits_only = re.compile(r'^\d+$')
numeric_tables = [name for name in existing_tables if digits_only.match(name)]

# Build "<name>_embeddings" from "<name>_result_parsed" for every such table.
for table_name in numeric_tables:
    result_parsed_table_name = f"{table_name}_result_parsed"
    embeddings_table_name = f"{table_name}_embeddings"

    if result_parsed_table_name not in existing_tables:
        # Without the parsed chunks there is nothing to embed.
        print(f"Required table {result_parsed_table_name} does not exist. Skipping embedding table creation for {table_name}.")
    else:
        embeddings_query = f"""
        CREATE OR REPLACE TABLE {DATASET_ID}.{embeddings_table_name} AS
        SELECT * FROM ML.GENERATE_EMBEDDING(
          MODEL {DATASET_ID}.embedding_model,
          TABLE {DATASET_ID}.{result_parsed_table_name}
        );
        """

        # Submit the statement and wait for it to finish before reporting.
        bq_client.query(embeddings_query).result()
        print(f"Table {embeddings_table_name} created successfully.")

def get_deal_ids():
    """Return the deal ids that still have to be processed.

    A deal id is the integer value of any table name in ``DATASET_ID`` that
    casts cleanly to INT64 (that is, the digits-only tables), minus the ids
    already present in ``{DATASET_ID}.engagement_details``.

    Returns:
        list: the ``deal_id`` value of every row returned by the query;
        empty when there is nothing left to process.
    """
    client = bigquery.Client()

    # This must be an f-string: a plain string would send the literal text
    # "{DATASET_ID}" to BigQuery instead of the configured dataset name.
    sows_query = f"""
    SELECT SAFE_CAST(table_name AS INT64) AS deal_id
    FROM {DATASET_ID}.INFORMATION_SCHEMA.TABLES
    WHERE SAFE_CAST(table_name AS INT64) IS NOT NULL
    AND SAFE_CAST(table_name AS INT64) not in (SELECT deal_id from {DATASET_ID}.engagement_details )
    """

    # result() blocks until the job is done and yields the rows.
    rows = client.query(sows_query).result()
    return [row["deal_id"] for row in rows]

def _deal_select_sql(deal_id):
    """Return the SELECT statement that summarises one deal.

    The statement defines three CTEs (objectives, engagement_summary and
    engagement_deliverables). Each one calls ML.GENERATE_TEXT with the
    gemini_flash model on a prompt assembled from a VECTOR_SEARCH over the
    deal's ``<deal_id>_embeddings`` table. The outer SELECT joins the three
    CTEs on deal_id, removes the markdown code fences from the model output
    and extracts the JSON fields into columns and arrays of structs.

    ``DATASET_ID`` is read from module scope; doubled braces in the template
    are f-string escapes for literal JSON braces.
    """
    return f"""
        select * from (with objectives as (
        SELECT
          {deal_id} AS deal_id,
          ml_generate_text_llm_result AS objectives_json,
          NULL AS engagement_summary,
          NULL AS engagement_deliverables
        FROM
          ML.GENERATE_TEXT(
            MODEL `{DATASET_ID}.gemini_flash`,
            (
              SELECT
                CONCAT(
                  'What are the (up to 10) objectives, expected business benefits and measurement metrics for this project? Be concise, add no preamble and respond in JSON format using output fields "objective_number", "objective_details", "expected_benefit","objective_metric" using the following context:',
                  STRING_AGG(FORMAT("context: %s and reference: %s", base.content, base.uri), ',')) AS prompt
              FROM VECTOR_SEARCH(
                TABLE `{DATASET_ID}.{deal_id}_embeddings`,
                'ml_generate_embedding_result',
                (
                  SELECT
                    ml_generate_embedding_result,
                    content AS query
                  FROM
                    ML.GENERATE_EMBEDDING(
                      MODEL `{DATASET_ID}.embedding_model`,
                      (
                        SELECT
                          'What are the objectives of this project? Be concise and use the following context' AS content
                      )
                    )
                ),
                TOP_K => 10,
                OPTIONS => '{{"fraction_lists_to_search": 0.01}}'
              )
            ),
            STRUCT(2048 AS max_output_tokens, TRUE AS flatten_json_output)
          )),
        engagement_summary as (
        -- Engagement summary for deal_id {deal_id}
        SELECT
          {deal_id} AS deal_id,
          NULL AS objectives_json,
          ml_generate_text_llm_result AS engagement_summary,
          NULL AS engagement_deliverables
        FROM
          ML.GENERATE_TEXT(
            MODEL `{DATASET_ID}.gemini_flash`,
            (
              SELECT
                CONCAT(
                  'Summarise the background, business requirements, solution and assumptions for this project. Be concise and respond in JSON format using output fields "Background", "Requirements","Solution", "Assumptions", add no preamble and be factual:',
                  STRING_AGG(FORMAT("context: %s and reference: %s", base.content, base.uri), ',')) AS prompt,
              FROM VECTOR_SEARCH(
                TABLE `{DATASET_ID}.{deal_id}_embeddings`,
                'ml_generate_embedding_result',
                (
                  SELECT
                    ml_generate_embedding_result,
                    content AS query
                  FROM
                    ML.GENERATE_EMBEDDING(
                      MODEL `{DATASET_ID}.embedding_model`,
                      (
                        SELECT
                          'Summarise the background, requirements and solution for this project' AS content
                      )
                    )
                ),
                TOP_K => 10,
                OPTIONS => '{{"fraction_lists_to_search": 0.01}}'
              )
            ),
            STRUCT(2048 AS max_output_tokens, TRUE AS flatten_json_output)
          )),

        engagement_deliverables as (
        SELECT
          {deal_id} AS deal_id,
          NULL AS objectives_json,
          NULL AS engagement_summary,
          ml_generate_text_llm_result AS engagement_deliverables
        FROM
          ML.GENERATE_TEXT(
            MODEL `{DATASET_ID}.gemini_flash`,
            (
              SELECT
                CONCAT(
                  'What are the (up to 10) contractual deliverables listed for this project? Be concise, add no preamble and respond in JSON format using output fields "deliverable_number", "deliverable_details", "deliverable_format", "acceptance_criteria" using the following context:',
                  STRING_AGG(FORMAT("context: %s and reference: %s", base.content, base.uri), ',')) AS prompt
              FROM VECTOR_SEARCH(
                TABLE `{DATASET_ID}.{deal_id}_embeddings`,
                'ml_generate_embedding_result',
                (
                  SELECT
                    ml_generate_embedding_result,
                    content AS query
                  FROM
                    ML.GENERATE_EMBEDDING(
                      MODEL `{DATASET_ID}.embedding_model`,
                      (
                        SELECT
                          'What are the (up to 10) contractual deliverables listed for this project? Be concise and use the following context' AS content
                      )
                    )
                ),
                TOP_K => 10,
                OPTIONS => '{{"fraction_lists_to_search": 0.01}}'
              )
            ),
            STRUCT(2048 AS max_output_tokens, TRUE AS flatten_json_output)
          ))
        SELECT
        o.deal_id,
        JSON_VALUE(REPLACE(REPLACE(s.engagement_summary,'```json',''),'```',''), "$.Background") AS background,
        JSON_VALUE(REPLACE(REPLACE(s.engagement_summary,'```json',''),'```',''), "$.Requirements") AS requirements,
        JSON_VALUE(REPLACE(REPLACE(s.engagement_summary,'```json',''),'```',''), "$.Solution") AS solution,
        ARRAY(
          SELECT AS STRUCT
            JSON_EXTRACT_SCALAR(item, '$.objective_number') AS objective_number,
            JSON_EXTRACT_SCALAR(item, '$.objective_details') AS objective_details,
            JSON_EXTRACT_SCALAR(item, '$.expected_benefit') AS expected_benefit,
            JSON_EXTRACT_SCALAR(item, '$.objective_metric') AS objective_metric
          FROM UNNEST(JSON_EXTRACT_ARRAY(REPLACE(REPLACE(o.objectives_json,'```json',''),'```',''))) AS item
        ) AS objectives,
        ARRAY(
          SELECT AS STRUCT
            JSON_EXTRACT_SCALAR(item, '$.deliverable_number') AS deliverable_number,
            JSON_EXTRACT_SCALAR(item, '$.deliverable_details') AS deliverable_details,
            JSON_EXTRACT_SCALAR(item, '$.deliverable_format') AS deliverable_format,
            JSON_EXTRACT_SCALAR(item, '$.acceptance_criteria') AS acceptance_criteria
          FROM UNNEST(JSON_EXTRACT_ARRAY(REPLACE(REPLACE(d.engagement_deliverables,'```json',''),'```',''))) AS item
        ) AS deliverables,
        REPLACE(REPLACE(o.objectives_json,'```json',''),'```','') as objectives_json,
        REPLACE(REPLACE(d.engagement_deliverables,'```json',''),'```','') as deliverables_json
      FROM
      objectives o
      LEFT JOIN engagement_summary s ON o.deal_id = s.deal_id
      LEFT JOIN engagement_deliverables d ON o.deal_id = d.deal_id)
        """


def construct_query(deal_ids):
    """Build one SQL statement covering every deal id in ``deal_ids``.

    Each deal contributes one SELECT (see ``_deal_select_sql``); the
    per-deal statements are glued together with ``UNION ALL``. An empty
    ``deal_ids`` yields an empty string.
    """
    return " UNION ALL ".join(_deal_select_sql(deal_id) for deal_id in deal_ids)

def replicate_eu_to_europe_west2():
    """Copy ``engagement_details`` from the EU dataset to its europe-west2 twin.

    The copy goes through Cloud Storage, in three steps:

    1. Export ``<PROJECT_ID>.<DATASET_ID>.engagement_details`` to an Avro
       file in the staging bucket, with the job located in ``EU``.
    2. Load that file into
       ``<PROJECT_ID>.<DATASET_ID>_europe_west2.engagement_details`` with
       WRITE_TRUNCATE, with the job located in ``europe-west2``.
    3. Delete the staging file.

    Each job is awaited before the next step starts, so a failure in any
    step propagates as the exception raised by that step.
    """
    from google.cloud import bigquery
    from google.cloud import storage

    # Initialize BigQuery and GCS clients
    bq_client = bigquery.Client()
    storage_client = storage.Client()

    # Source and target are taken from the notebook configuration. They used
    # to be plain strings containing the literal text "{DATASET_ID}" (never
    # substituted) together with a hard-coded project name.
    source_project = PROJECT_ID
    source_dataset = DATASET_ID
    source_table = "engagement_details"
    source_region = "EU"

    target_project = PROJECT_ID
    target_dataset = f"{DATASET_ID}_europe_west2"
    target_table = "engagement_details"
    target_region = "europe-west2"

    gcs_bucket_name = "<BUCKET_NAME_FROM_PREVIOUS_NOTEBOOK>"  # Ensure this bucket is in the `EU` region
    export_blob_name = f"{source_table}.avro"  # Using Avro for structured data
    export_uri = f"gs://{gcs_bucket_name}/{export_blob_name}"

    # Step 1: Export the source table to GCS in Avro format
    extract_job = bq_client.extract_table(
        f"{source_project}.{source_dataset}.{source_table}",
        export_uri,
        job_config=bigquery.job.ExtractJobConfig(destination_format="AVRO"),
        location=source_region,  # The export has to run in the source table's region
    )
    extract_job.result()  # Wait for the job to complete
    print(f"Exported {source_table} to {export_uri}")

    # Step 2: Load the exported data from GCS into the target table
    job_config = bigquery.LoadJobConfig(
        source_format=bigquery.SourceFormat.AVRO,  # Avro preserves nested STRUCTs
        write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE,  # Overwrite the table if it exists
    )

    load_job = bq_client.load_table_from_uri(
        export_uri,
        f"{target_project}.{target_dataset}.{target_table}",
        job_config=job_config,
        location=target_region,  # The load has to run in the target dataset's region
    )
    load_job.result()  # Wait for the job to complete
    print(f"Loaded data into {target_dataset}.{target_table} in region {target_region}")

    # Step 3: Remove the staging file now that the load has succeeded
    bucket = storage_client.bucket(gcs_bucket_name)
    bucket.blob(export_blob_name).delete()
    print(f"Deleted temporary file {export_uri} from GCS")

def execute_query(query):
    """Insert the rows produced by ``query`` into ``engagement_details``.

    Args:
        query: a SELECT statement (as built by ``construct_query``) whose
            columns line up with the column list of the INSERT below.

    Returns:
        The result of the finished job, as returned by ``QueryJob.result()``.
    """
    client = bigquery.Client()
    # The prefix must be an f-string so that the configured dataset name is
    # substituted; as a plain string the literal text "{DATASET_ID}" was sent
    # to BigQuery.
    insert_prefix = (
        f"INSERT INTO {DATASET_ID}.engagement_details "
        "(deal_id, background, requirements, solution, objectives, "
        "deliverables, objectives_json, deliverables_json)  "
    )
    query_job = client.query(insert_prefix + query)
    # Block until the INSERT has completed.
    return query_job.result()

def main():
    """Run the summarisation pipeline end to end.

    When there are deals without an entry in engagement_details, their
    summaries are generated and inserted, the table is replicated, and the
    rows of the insert result are printed. When there are none, only the
    replication runs and a notice is printed.
    """
    pending_deal_ids = get_deal_ids()

    if pending_deal_ids:
        insert_result = execute_query(construct_query(pending_deal_ids))

        replicate_eu_to_europe_west2()

        for row in insert_result:
            print(row)
        return

    # Nothing new to summarise: still refresh the replica.
    replicate_eu_to_europe_west2()
    print("No deal_ids found in sows.")


if __name__ == "__main__":
    main()

