**V3: Claim Preprocessing**

Input:
- A single BigQuery table with a `publication_number` column containing Jefferson publication numbers.

What happens:
- This Colab handles all the data fetching, joining, and parsing that we previously did in six separate steps.

It generates one claims-level output table, described below.

Output Table: a claims-level table with the following columns:


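     -  publication_number (str)
     -  publication_date (date)
     -  claim_id (str)
     -  claim_num_str (str)
     -  claim_num_int (int)
     -  claim_type (str)
     -  claim_indep (TRUE/FALSE)
     -  claim_indep_order (int; 0 for dependent claims)
     -  claim_text (str)
     -  claim_list_parentclaim_ids (str)
     -  claim_list_parentclaim_nums (str)
     -  claim_list_parentclaim_texts (str)
     -  claim_list_children_ids (str)
     -  claim_immediate_parent_id (str)
     -  claim_immediate_child_id (str)
     -  processed_date (date)
     -  processed_timestamp (timestamp)

     List-valued columns are stored as "||"-joined strings of their sorted, distinct values ("" when empty).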


Output:
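- Save results to BigQuery in `gpatents.publications_v2`. The output name is the input table name with its `_backlog` suffix removed (default: `us_claims_backlog` → `us_claims`). New rows are first written to a temporary `us_claims_new_<timestamp>` table. They are then merged with the existing `us_claims` rows into:
  - gpatents.publications_v2.us_claims_new_merged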


More Info:
- See this [doc](https://docs.google.com/document/d/1t7KpJkHVhbK3D6C3syMo0QYVicB92YjAW39ALGL4Rrk/edit?pli=1&tab=t.chh254ik1msx)

Older Versions:






In [None]:
#@title Init Inputs

# Colab form parameters: each `# @param` line renders as a form field.
GPATENTS_PROJECT = "gpatents" # @param ["gpatents"]
# NOTE(review): PATENTS_DST_PROJECT, LOCATION and PATENTS_DST_DATASET_ID are not
# referenced in any visible cell — confirm they are still needed.
PATENTS_DST_PROJECT = "google.com:patents-dst" # @param ["google.com:patents-dst"]
LOCATION = "us-central1" # @param ["us-central1"]

# Dataset that holds both the input (backlog) table and the output claims tables.
GPATENTS_DATASET_ID = "publications_v2" # @param ["publications_v2"]
PATENTS_DST_DATASET_ID = "elysian" # @param ["elysian"]

# Table whose publication_number column lists the publications to process (read in Step 1).
INPUT_TABLE_ID = "us_claims_backlog" # @param ["us_claims_backlog"]

####################

# Output table name = input table name with "_backlog" removed
# (us_claims_backlog -> us_claims).
# NOTE(review): str.replace removes "_backlog" anywhere in the name. If the input
# name has no "_backlog", the output name equals the input name — confirm that
# inputs always carry this suffix.
OUTPUT_CLAIMS_TABLE_ID = INPUT_TABLE_ID.replace('_backlog', '')

print(f"INPUT_TABLE_ID: {INPUT_TABLE_ID}")

print(f"OUTPUT_CLAIMS_TABLE_ID: {OUTPUT_CLAIMS_TABLE_ID}")


# Init

In [None]:
#@title Installs
# Install/upgrade third-party packages. pandas_gbq provides
# dataframe_to_bigquery_fields, which is imported in the next cell.
# NOTE(review): spacy is not imported in any visible cell — confirm it is still needed.
!pip install -U spacy pandas_gbq

In [None]:
#@title Imports

import gc
import re
import json
import base64
import numpy as np
import pandas as pd
import multiprocessing
import subprocess
import concurrent.futures
from ast import literal_eval
from time import sleep, time
from datetime import date, datetime
from pandas_gbq.schema.pandas_to_bigquery import dataframe_to_bigquery_fields
from concurrent.futures import ThreadPoolExecutor, as_completed
import pytz
PST = pytz.timezone('America/Los_Angeles')

from functools import partial

from typing import List, Dict, Tuple, Any, Set, Optional
from collections import defaultdict

from bs4 import BeautifulSoup

import random
from tqdm import tqdm_notebook

from google.colab import drive
from google.cloud import bigquery
from google.cloud import storage
from google.colab import auth, runtime

# Number of vCPUs on this runtime; used as the thread-pool size in Step 2.
num_workers = multiprocessing.cpu_count()
# Shared BigQuery client, bound to the gpatents project, reused by every step below.
bq_client = bigquery.Client(project=GPATENTS_PROJECT)

In [None]:
#@title Helper Functions - Generic

################################################################################
# Generic helper functions
def get_timestamp(timestamp_label:bool=False):
  """Return the current Pacific time as a printable string.

  The time is formatted as ``YYYY/MM/DD HH:MM AM|PM``. When
  ``timestamp_label`` is true the string is prefixed with ``"Timestamp:"``.
  In both cases a single space separates the (possibly empty) prefix from
  the time, so the unlabelled form starts with a space.
  """
  formatted_now = datetime.now(PST).strftime("%Y/%m/%d %I:%M %p")
  label = "Timestamp:" if timestamp_label else ""
  return " ".join((label, formatted_now))


# Get Colab runtime uptime

# Hours a Colab runtime is assumed to live before it is recycled.
_MAX_RUNTIME_HOURS = 24

# Hours contributed by each unit that `uptime -p` can report. Minutes are
# ignored, which keeps the estimate at hour granularity.
_UPTIME_UNIT_HOURS = {"week": 168, "day": 24, "hour": 1}


def _estimate_hours_left(uptime_str: str) -> int:
  """Estimate the whole hours left in the session from an `uptime -p` string.

  Weeks, days and hours are all counted, so strings such as
  "up 1 day, 3 hours" no longer crash. A sub-hour uptime ("up 12 minutes")
  returns one hour less than the maximum, as a conservative estimate.
  """
  elapsed_hours = sum(
      int(count) * _UPTIME_UNIT_HOURS[unit]
      for count, unit in re.findall(r"(\d+)\s+(week|day|hour)s?\b", uptime_str)
  )
  if elapsed_hours == 0:
    return _MAX_RUNTIME_HOURS - 1
  return _MAX_RUNTIME_HOURS - elapsed_hours


def get_uptime(hours_needed:int=0, uptime_str: Optional[str] = None):
  """Get the uptime of the Colab runtime.

  Args:
    hours_needed: if > 0, return a warning string when fewer than this many
      hours are estimated to remain in the session.
    uptime_str: optional pre-fetched `uptime -p` output (e.g. "up 3 hours,
      5 minutes"). When None, `uptime -p` is run once.

  Returns:
    "System Uptime: <uptime>", or a "!!!!! WARNING !!!!! ..." string when the
    remaining hours are below ``hours_needed``.
  """
  if uptime_str is None:
    uptime_result = subprocess.run(['uptime', '-p'], capture_output=True, text=True)
    uptime_str = uptime_result.stdout.strip()

  if hours_needed > 0:
    # Parse the string we already have instead of re-running `uptime`.
    hours_left = _estimate_hours_left(uptime_str)
    if hours_left < hours_needed:
      return f"!!!!! WARNING !!!!! System Uptime: {uptime_str} | !!!! {hours_left} hours left but {hours_needed} hours needed !!!!"
  return f"System Uptime: {uptime_str}"


# Log wall-clock time, runtime uptime and vCPU count when this cell runs.
print(f"{get_timestamp(True)} | {get_uptime()} | vCPUs: {multiprocessing.cpu_count()}")


################################################################################


# Generate Unique ID
# Timezone used by generate_custom_unique_id. 'US/Pacific' is a legacy alias for
# the same zone as PST ('America/Los_Angeles'), which is defined in the imports cell.
pacific_tz = pytz.timezone('US/Pacific')


def generate_custom_unique_id(include_random=False, include_counter=False, counter_digits:int=3, include_uuid=False, now=None):
  """Build a human-readable ID from the current Pacific time plus optional suffixes.

  The base part is ``DDMONYYYY_HHMMAM`` / ``DDMONYYYY_HHMMPM``. It uses a
  zero-padded 12-hour clock, so midnight is ``12..AM`` and 13:05 is
  ``0105PM``. Optional parts are appended, joined with ``_``.

  Args:
    include_random: append a random 4-digit number (1000-9999).
    include_counter: append a per-minute counter, zero-padded to
      ``counter_digits`` digits. The state lives in the module globals
      ``counter`` / ``last_timestamp``. The counter is 0 for the first ID of
      a minute (including the very first call) and increments for each
      further ID in the same minute.
    counter_digits: width of the counter suffix.
    include_uuid: append the first 8 hex characters of a random UUID4.
    now: optional datetime to use instead of the current Pacific time
      (for reproducible IDs and tests).

  Returns:
    The generated ID string.
  """
  global counter, last_timestamp
  # Local import: the file-level `import uuid` lives in a later notebook cell.
  import uuid

  current_datetime = now if now is not None else datetime.now(pacific_tz)

  day = current_datetime.strftime('%d')
  month_abbr = current_datetime.strftime('%b').upper()
  year = current_datetime.strftime('%Y')
  # 12-hour clock: 0 -> 12 AM, 12 -> 12 PM, 13 -> 01 PM (always two digits).
  hour_12 = current_datetime.hour % 12 or 12
  am_pm = "PM" if current_datetime.hour >= 12 else "AM"
  minute = current_datetime.strftime('%M')

  timestamp = f"{day}{month_abbr}{year}_{hour_12:02d}{minute}{am_pm}"
  parts = [timestamp]

  if include_random:
    parts.append(str(random.randint(1000, 9999)))

  if include_counter:
    # Restart at 0 on the first call and whenever the minute changes.
    if 'counter' not in globals() or globals().get('last_timestamp') != timestamp:
      counter = 0
      last_timestamp = timestamp
    else:
      counter += 1
    parts.append(f"{counter:0{counter_digits}d}")

  if include_uuid:
    parts.append(uuid.uuid4().hex[:8])

  return '_'.join(parts)

# Example usage
# Runs when the cell executes and prints one sample ID. Because it uses
# include_counter=True, it also sets/advances the module-level `counter` global.
unique_id = generate_custom_unique_id(include_counter=True)
print("Generated Unique ID:", unique_id)

################################################################################

In [None]:
# @title Big Query Helper Functions
import uuid

def does_table_exist(bq_client, table_id):
  """Return True if ``bq_client.get_table(table_id)`` succeeds, else False.

  Any ``Exception`` from the lookup (not-found, but also permission or network
  errors) is treated as "does not exist", which keeps the original
  best-effort behavior. ``KeyboardInterrupt`` / ``SystemExit`` are not
  swallowed, so the notebook can still be interrupted.
  """
  try:
    bq_client.get_table(table_id)
  except Exception:
    return False
  return True

def does_dataset_exist(bq_client, dataset_id):
  """Return True if ``bq_client.get_dataset(dataset_id)`` succeeds, else False.

  Any ``Exception`` from the lookup is treated as "does not exist" (best
  effort, as before). ``KeyboardInterrupt`` / ``SystemExit`` are not swallowed.
  """
  try:
    bq_client.get_dataset(dataset_id)
  except Exception:
    return False
  return True

def _merge_df_via_staging(
    bq_client,
    df,
    merge_keys,
    table_id,
    output_table_schema,
    job_config_labels,
):
  """Upsert ``df`` into ``table_id`` through a temporary staging table and MERGE.

  If BigQuery rejects the MERGE because several staging rows match one target
  row, the target rows whose merge-key tuple appears in the staging table are
  deleted and the MERGE is retried. After that delete no target row matches,
  so every staging row is inserted. The staging table is always dropped.
  """
  staging_table_id = f"{table_id}_staging_{uuid.uuid4().hex}"

  all_cols = [field.name for field in output_table_schema]
  update_cols = [col for col in all_cols if col not in merge_keys]
  on_clause = " AND ".join([f"T.`{key}` = S.`{key}`" for key in merge_keys])
  update_clause = ", ".join([f"T.`{col}` = S.`{col}`" for col in update_cols])
  insert_clause_cols = ", ".join([f"`{col}`" for col in all_cols])
  insert_clause_vals = ", ".join([f"S.`{col}`" for col in all_cols])

  merge_sql = f"""
        MERGE `{table_id}` AS T
        USING (SELECT * FROM `{staging_table_id}` GROUP BY ALL) AS S
        ON {on_clause}
        WHEN MATCHED THEN
          UPDATE SET {update_clause}
        WHEN NOT MATCHED THEN
          INSERT ({insert_clause_cols})
          VALUES ({insert_clause_vals})
      """

  try:
    print(f"Inserting new data into Temp Table: {staging_table_id}")
    job_config = bigquery.LoadJobConfig(
        schema=output_table_schema,
        write_disposition="WRITE_TRUNCATE",  # the staging table is always rebuilt from scratch
        labels=job_config_labels,
    )
    bq_client.load_table_from_dataframe(df, staging_table_id, job_config=job_config).result()

    merge_job = bq_client.query(merge_sql)
    merge_job.result()
    print(f"MERGE operation successful. {merge_job.num_dml_affected_rows} rows affected.")
  except Exception as e:
    if "UPDATE/MERGE must match at most one source row for each target row" not in str(e):
      raise  # any other failure is not recoverable here
    print("MERGE failed due to multiple matches. Deleting old records and retrying.")

    if not all(key in df.columns for key in merge_keys):
      print("Warning: One or more merge keys not found in DataFrame. Skipping DELETE and re-attempting MERGE.")
      raise

    # Delete exactly the target rows whose key tuple exists in the staging
    # table. A correlated EXISTS matches whole key tuples, which avoids both
    # cross-column IN lists (which would delete unrelated rows) and building
    # SQL from raw values.
    delete_sql = (
        f"DELETE FROM `{table_id}` AS T "
        f"WHERE EXISTS (SELECT 1 FROM `{staging_table_id}` AS S WHERE {on_clause})"
    )
    print(f"Executing DELETE statements:\n{delete_sql}")
    bq_client.query(delete_sql).result()

    print("Re-attempting MERGE after successful deletion.")
    merge_job = bq_client.query(merge_sql)
    merge_job.result()
    print(f"Second MERGE operation successful. {merge_job.num_dml_affected_rows} rows affected.")
  finally:
    bq_client.delete_table(staging_table_id, not_found_ok=True)


def df_to_bq(
    bq_client: bigquery.Client,
    df: pd.DataFrame,
    merge_keys: List[str],
    output_table: str,
    output_dataset: str,
    project: str,
    output_table_description: str,
    output_table_schema: List[bigquery.SchemaField],
    job_config_labels: Dict[str, str],
    batch_size: int = 50000,
    debug_flag: bool = False
):
  """Write ``df`` to the BigQuery table ``output_table`` and set its description.

  The write mode depends on what already exists:
    * table missing            -> first batch WRITE_TRUNCATE (creates it), then
                                  WRITE_APPEND for the remaining batches;
    * table exists, no keys    -> WRITE_APPEND in ``batch_size``-row batches;
    * table exists, merge_keys -> upsert via a staging table + MERGE on
                                  ``merge_keys`` (see _merge_df_via_staging).

  Args:
    bq_client: client used for every BigQuery call.
    df: rows to write.
    merge_keys: key columns for MERGE mode; a falsy value disables MERGE.
    output_table: table reference as accepted by the client (e.g.
      "dataset.table"). The MERGE path prefixes it with ``project``.
    output_dataset: dataset to create if it does not exist yet.
    project: project id used to fully qualify the MERGE target/staging tables.
    output_table_description: description set on the table after the write.
    output_table_schema: schema for the load jobs; its field names also define
      the MERGE column lists.
    job_config_labels: labels attached to the load jobs.
    batch_size: rows per load job in the non-MERGE path.
    debug_flag: print which write mode was chosen.

  Raises:
    Any BigQuery error from the loads, MERGE or DELETE is propagated.
  """
  if not does_dataset_exist(bq_client, output_dataset):
    bq_client.create_dataset(output_dataset)

  table_exists = does_table_exist(bq_client, output_table)

  # "WRITE_MERGE" is an internal marker, not a BigQuery write disposition.
  if table_exists and merge_keys:
    if debug_flag:
      print(f"Table '{output_table}' exists. Data will be merged and inserted.")
    write_disposition = "WRITE_MERGE"
  elif table_exists:
    if debug_flag:
      print(f"Table '{output_table}' exists. Data will be appended.")
    write_disposition = "WRITE_APPEND"
  else:
    if debug_flag:
      print(f"Table '{output_table}' does not exist.")
    write_disposition = "WRITE_TRUNCATE"

  if write_disposition == "WRITE_MERGE":
    _merge_df_via_staging(
        bq_client,
        df,
        merge_keys,
        f"{project}.{output_table}",
        output_table_schema,
        job_config_labels,
    )
  else:
    for start_idx in range(0, len(df), batch_size):
      batch_df = df.iloc[start_idx: start_idx + batch_size]
      job_config = bigquery.LoadJobConfig(
          schema=output_table_schema,
          write_disposition=write_disposition,
          labels=job_config_labels,
      )
      bq_client.load_table_from_dataframe(
          batch_df, output_table, job_config=job_config
      ).result()

      # After the first batch has created/overwritten the table, the rest must append.
      if write_disposition == "WRITE_TRUNCATE":
        write_disposition = "WRITE_APPEND"

      sleep(1)

  table = bq_client.get_table(output_table)
  table.description = output_table_description
  bq_client.update_table(table, ["description"])

def save_output_to_bq(
    df: pd.DataFrame,
    input_tables_id: List[str],
    output_table: str,
    output_dataset: str,
    project: str,
    job_config_labels: Dict[str, str],
    merge_keys: List[str] = None,
    output_table_schema: List[bigquery.SchemaField] = None,
    colab_description: str = "Quick Description of it",
    table_notes: str = "",
    display_timer: bool = False,
    bq_client: bigquery.Client = None # pass BQ_CLIENT
):
  """Write ``df`` to BigQuery via df_to_bq, attaching a generated table description.

  The description records ``colab_description``, ``table_notes`` and the
  input table ids. When no schema is given, it is inferred from ``df`` with
  pandas_gbq's dataframe_to_bigquery_fields. Errors from the write are
  printed rather than raised (best effort). ``display_timer`` is accepted but
  not used.

  Raises:
    ValueError: if ``bq_client`` is None.
  """
  if bq_client is None:
    raise ValueError("bq_client must be provided to save_output_to_bq")
  print(f"{get_timestamp()} | Saving dataframe to BQ")

  input_tables_str = " \n".join(input_tables_id)
  description_lines = [
      "",
      "  Colab:",
      f"    {colab_description}",
      "",
      "  Notes:",
      f"    {table_notes}",
      "",
      "  Input Parameters:",
      f"    INPUT_TABLES: {input_tables_str}",
      "  ",
  ]
  output_table_description = "\n".join(description_lines)

  # Fall back to a schema inferred from the DataFrame when none (or an empty one) is given.
  schema = output_table_schema or list(dataframe_to_bigquery_fields(df))

  try:
    df_to_bq(
        bq_client=bq_client,
        df=df,
        merge_keys=merge_keys,
        output_table=output_table,
        output_dataset=output_dataset,
        project=project,
        output_table_schema=schema,
        output_table_description=output_table_description,
        job_config_labels=job_config_labels,
        debug_flag=True,
    )
    print(f"{get_timestamp()} | Saved {len(df)} rows to {output_table}")
  except Exception as err:
    print(f"Error saving data to BigQuery: {err}")

# Colab-specific Helper Functions

In [None]:
#@title Parsing Claims Helpers
from copy import deepcopy
from bs4 import NavigableString

def _split_claim_text_tag(tag, raw_parts, math_formulas):
  """Split one <claim-text> tag on ':' / ';' and append the usable pieces.

  Args:
    tag: a BeautifulSoup <claim-text> element.
    raw_parts: list the pieces are appended to (mutated in place).
    math_formulas: set that receives every piece taken from a tag containing a
      <maths> element (mutated in place), so the later connector-merging step
      leaves those pieces alone.
  """
  has_math = bool(tag.find("maths"))
  for part in re.split('[:;]', tag.get_text()):
    if not (part and part.strip()):
      continue
    if has_math:
      # Formula pieces are kept whole (only stripped) and remembered.
      formula_text = part.strip()
      raw_parts.append(formula_text)
      math_formulas.add(formula_text)
    else:
      # Plain text: collapse newlines / double spaces and drop bare connectors.
      text = part.strip().replace("\n", " ").replace("  ", " ")
      if text not in ('and', 'where', '.', ' '):
        raw_parts.append(text)


def parse_html_claims(row, debug=False):
  """Parse one publication's claims HTML into a claims-level DataFrame.

  Args:
    row: mapping/Series with "claims_html", "publication_number" and
      "publication_date".
    debug: when True, print each claim that is skipped because it could not
      be parsed.

  Returns:
    DataFrame with one row per parseable <claim> tag and these columns:
    publication_number, publication_date, claim_id, claim_num_str,
    claim_num_int, claim_text, claim_type, limitations ("#!#"-joined
    limitation strings), and claim_list_parentclaim_ids /
    claim_list_parentclaim_nums (lists built from <claim-ref idref=...>).
    Claims that raise while being parsed (e.g. a missing or non-integer
    ``num``) are skipped.
  """
  soup = BeautifulSoup(row["claims_html"], "html.parser")

  claims = []

  for claim_tag in soup.find_all("claim"):
    claim_id = claim_tag.get("id")
    claim_num = claim_tag.get("num")
    list_claim_tags = claim_tag.find_all("claim-text")

    try:
      raw_parts = []
      # Exact text of math-formula pieces, so they are never merged with neighbours.
      math_formulas = set()

      for i, tag in enumerate(list_claim_tags):
        if i == 0 and len(list_claim_tags) > 1:
          outer_text = tag.get_text()
          inner_text = list_claim_tags[1].get_text()
          if inner_text in outer_text:
            # The first <claim-text> wraps the following ones (nested markup).
            # Take only its preamble, i.e. the text before the nested tag; the
            # nested tags are handled in the following iterations.
            first_limitation = outer_text.split(inner_text)[0]
            raw_parts.extend(
                piece for piece in re.split('[:;]', first_limitation.replace("\n", ""))
                if piece != ''
            )
            continue
        _split_claim_text_tag(tag, raw_parts, math_formulas)

      # Normalise whitespace and drop empty pieces.
      all_parts = [part.strip().replace("\n", " ").replace("  ", " ") for part in raw_parts if part and part.strip()]

      # A lone single word (typically a connector) is glued to the following
      # part, unless it is a math formula that must stay on its own.
      merged_parts = []
      i = 0
      while i < len(all_parts):
        current_part = all_parts[i]
        if len(current_part.split()) == 1 and current_part not in math_formulas and i + 1 < len(all_parts):
          merged_parts.append(f"{current_part} {all_parts[i + 1]}")
          i += 2
        else:
          merged_parts.append(current_part)
          i += 1

      full_text = " ".join(merged_parts)
      limitations = "#!#".join(merged_parts)

      # Parent claims referenced by this claim, e.g. idref="CLM-00001" -> "00001".
      parent_claim_ids = [p["idref"] for p in claim_tag.find_all("claim-ref")]
      parent_claim_nums = [ref.split("CLM-")[1] for ref in parent_claim_ids]

      claims.append({
          "publication_number": row["publication_number"],
          "publication_date": row["publication_date"],
          "claim_id": claim_id,
          "claim_num_str": claim_num,
          "claim_num_int": int(claim_num),
          "claim_text": full_text,
          "claim_type": get_claim_type(full_text),
          "limitations": limitations,
          "claim_list_parentclaim_ids": parent_claim_ids,
          "claim_list_parentclaim_nums": parent_claim_nums
      })
    except Exception as exc:
      # Best effort: one malformed claim must not abort the whole publication.
      if debug:
        print(f"Skipping claim {claim_id} of {row.get('publication_number')}: {exc!r}")
      continue

  return pd.DataFrame(claims)

def find_first_of_words(sentence, words):
  """Return the leftmost whole-word match of any of ``words`` in ``sentence``.

  Matching is case-insensitive and respects word boundaries. The matched text
  is returned exactly as it appears in ``sentence``. Returns None when
  nothing matches.
  """
  pattern = rf'\b(?:{"|".join(map(re.escape, words))})\b'
  # re.search returns the leftmost match, which is what picking the minimum
  # start offset over all matches produced.
  match = re.search(pattern, sentence, re.IGNORECASE)
  return match.group() if match else None


# Claim-type keyword (searched in lower-cased, de-hyphenated text) -> claim type label.
# NOTE(review): the previous version also mapped "media" -> "CRM" but never
# searched for it, so that branch was unreachable. Adding it now would
# reclassify claims such as "A media processing system", so it is left out.
_CLAIM_TYPE_BY_KEYWORD = {
    "method": "Method",
    "system": "System",
    "apparatus": "Apparatus",
    "medium": "CRM",
    "device": "Apparatus",
    "cancelled": "Canceled",
    "canceled": "Canceled",
    "non transitory": "CRM",
    "computer readable": "CRM",
}


def get_claim_type(claim_text: str):
  """Classify a claim by the first type keyword found in its first 70 characters.

  Hyphens are treated as spaces and matching is case-insensitive, so
  "non-transitory computer-readable" matches the CRM keywords. Claims with no
  keyword default to "Apparatus".

  Returns:
    One of "Method", "System", "Apparatus", "CRM" or "Canceled".
  """
  claim_intro = claim_text[:70].replace("-", " ").lower()
  first_word = find_first_of_words(claim_intro, list(_CLAIM_TYPE_BY_KEYWORD))
  return _CLAIM_TYPE_BY_KEYWORD.get(first_word, "Apparatus")


In [None]:
#@title Patent Hierarchy & Generic Assessment
import numpy as np
from collections import defaultdict

def _clean_cell(cell):
    """Normalise a possibly-missing list cell.

    Array-like values (list, tuple, numpy array) are returned unchanged.
    Scalar NA values (None, NaN, NaT, pd.NA) become an empty list, and any
    other scalar is returned as-is. Array-likes are checked first because
    pd.isna on them returns an array, and an array's truth value is ambiguous.
    """
    is_array_like = isinstance(cell, (list, tuple, np.ndarray))
    return [] if not is_array_like and pd.isna(cell) else cell

def create_hierarchy_vectorized(df_junction, df_pub_claims):
    """
    Vectorized replacement for the first slow loop using process_tree_hierarchy.

    Builds per-claim parent-text and child-id lists from a parent/child edge table.

    Args:
        df_junction: one row per (claim, parent) edge, with columns
            publication_number, claim_id and claim_list_parentclaim_ids (a
            single parent id, or NaN for claims without parents), as produced
            by exploding claim_list_parentclaim_ids in
            generate_publication_hierarchy.
        df_pub_claims: claims table with publication_number, claim_id and
            claim_text, used to look up each parent's text.

    Returns:
        DataFrame with publication_number, claim_id,
        claim_list_parentclaim_texts (texts of the claim's parents) and
        claim_list_children_ids (ids of claims that reference it). It holds
        one row per claim that has at least one parent text or one child.
        Claims with neither are absent; the caller fills them in with a left join.
    """
    if df_junction.empty:
        return pd.DataFrame(columns=['publication_number', 'claim_id', 'claim_list_parentclaim_texts', 'claim_list_children_ids'])

    # Create a lookup table: (publication_number, claim_id) -> claim_text
    claim_text_lookup = df_pub_claims.groupby(
        ["publication_number", "claim_id"]
    )['claim_text'].first().reset_index()

    # Rename columns to prepare for the merge: the lookup key becomes the
    # parent id, so each edge row picks up its parent's text as parent_text.
    claim_text_lookup = claim_text_lookup.rename(columns={
        'claim_id': 'claim_list_parentclaim_ids',
        'claim_text': 'parent_text'
    })

    df_junction = pd.merge(
        df_junction,
        claim_text_lookup,
        on=['publication_number', 'claim_list_parentclaim_ids'],
        how='left' # 'left' join keeps all rows from the original dataframe
    )

    # Per child claim: list of its parents' texts (edges with no found parent text are dropped).
    parent_texts_agg = df_junction.dropna(subset=['parent_text']) \
        .groupby(['publication_number', 'claim_id'])['parent_text'] \
        .agg(list) \
        .rename('claim_list_parentclaim_texts')

    # Per parent claim: list of the child claim ids that reference it. The
    # index is renamed so the parent id lines up with claim_id for the merge below.
    children_ids_agg = df_junction.groupby(['publication_number', 'claim_list_parentclaim_ids'])['claim_id'] \
        .agg(list) \
        .rename('claim_list_children_ids')
    children_ids_agg.index.names = ['publication_number', 'claim_id']

    # Outer merge on the shared index levels keeps claims that only have parents or only have children.
    df_hierarchy = pd.merge(
        parent_texts_agg,
        children_ids_agg,
        on=['publication_number', 'claim_id'],
        how='outer'
    ).reset_index()

    return df_hierarchy

def find_nodes_vectorized(df_junction):
    """
    Vectorized replacement for the second slow loop using find_intermediate_nodes_multi_parent_df.

    Builds per-claim lists of directly referenced parents and referencing children.

    Args:
        df_junction: one row per (claim, parent) edge with publication_number,
            claim_id and claim_list_parentclaim_ids (NaN when the claim has no parent).

    Returns:
        An empty DataFrame when there are no edges. Otherwise a DataFrame with
        publication_number, claim_id, claim_immediate_parent_id (list of parent
        ids) and claim_immediate_child_id (list of child ids).

    NOTE(review): both lists come from the same edges as
    claim_list_parentclaim_ids / claim_list_children_ids, so they appear to
    carry the same ids. Confirm that "immediate" is intended to differ.
    """
    # Keep only real edges (claims that reference a parent).
    df_edges = df_junction.dropna(subset=['claim_list_parentclaim_ids'])

    if df_edges.empty:
        return pd.DataFrame()

    # Per claim: ids of the parents it references.
    parent_ids_agg = df_edges.groupby(['publication_number', 'claim_id'])['claim_list_parentclaim_ids'] \
        .agg(list) \
        .rename('claim_immediate_parent_id')

    # Per parent: ids of the claims referencing it, re-keyed as claim_id for the merge.
    child_ids_agg = df_edges.groupby(['publication_number', 'claim_list_parentclaim_ids'])['claim_id'] \
        .agg(list) \
        .rename('claim_immediate_child_id')
    child_ids_agg.index.names = ['publication_number', 'claim_id']

    df_nodes = pd.merge(
        parent_ids_agg,
        child_ids_agg,
        on=['publication_number', 'claim_id'],
        how='outer'
    ).reset_index()

    return df_nodes

def generate_publication_hierarchy(df_pub_claims):
  """Attach parent/child hierarchy columns to a claims-level DataFrame.

  Each claim's ``claim_list_parentclaim_ids`` is exploded into one
  (claim, parent) edge per row. The result is then left-joined onto
  ``df_pub_claims`` to add:
    * claim_list_parentclaim_texts / claim_list_children_ids, from
      create_hierarchy_vectorized;
    * claim_immediate_parent_id / claim_immediate_child_id, from
      find_nodes_vectorized. When there are no edges at all, these are set
      to empty lists on every row.
  Missing cells in those four columns are replaced by empty lists via _clean_cell.
  """
  key_cols = ["publication_number", "claim_id"]
  edges = df_pub_claims[key_cols + ["claim_text", "claim_list_parentclaim_ids"]].explode("claim_list_parentclaim_ids")

  hierarchy = create_hierarchy_vectorized(edges.copy(), df_pub_claims)
  result = pd.merge(df_pub_claims, hierarchy, how="left", on=key_cols)

  node_lists = find_nodes_vectorized(edges)
  if not node_lists.empty:
    result = pd.merge(result, node_lists, how="left", on=key_cols)
  else:
    for empty_col in ("claim_immediate_parent_id", "claim_immediate_child_id"):
      result[empty_col] = [[] for _ in range(len(result))]

  list_cols = (
      'claim_list_parentclaim_texts', 'claim_list_children_ids',
      'claim_immediate_parent_id', 'claim_immediate_child_id',
  )
  # Replace NaN left behind by the left joins with empty lists.
  for col in (c for c in list_cols if c in result.columns):
    result[col] = result[col].apply(_clean_cell)

  return result

In [None]:
#@title Claim HTML Processor
def _join_unique_sorted(values):
  """Join the distinct values of a list-like cell with '||' in sorted order; '' otherwise."""
  if isinstance(values, (list, tuple, np.ndarray)):
    return "||".join(sorted(set(values)))
  return ''


def process_pub_claim_html(df_input, debug=False):
  """Parse a batch of publications' claims HTML into the final claims table.

  Steps:
    1. run parse_html_claims on every row of ``df_input`` (which needs
       claims_html, publication_number and publication_date) and concatenate
       the results;
    2. add the hierarchy columns with generate_publication_hierarchy;
    3. flatten the six list columns into '||'-joined strings of their sorted
       distinct values ('' when empty or missing);
    4. flag independent claims (no parent claim ids) and number them 1..n
       within each publication in numeric claim-number order
       (claim_indep_order; 0 for dependent claims);
    5. drop the internal "limitations" column.

  Args:
    df_input: batch of publications to parse.
    debug: forwarded to parse_html_claims.
  """
  list_columns = [
      "claim_list_parentclaim_ids", "claim_list_parentclaim_nums",
      "claim_list_parentclaim_texts", "claim_list_children_ids",
      "claim_immediate_parent_id", "claim_immediate_child_id"
  ]

  df_pub_claims = pd.concat(df_input.apply(lambda x: parse_html_claims(x, debug), axis = 1).values, ignore_index=True)

  df_pub_claims_final = generate_publication_hierarchy(df_pub_claims)

  for col in list_columns:
    df_pub_claims_final[col] = df_pub_claims_final[col].apply(_join_unique_sorted)

  # A claim is independent when it references no parent claim (empty joined string).
  df_pub_claims_final["claim_indep"] = df_pub_claims_final["claim_list_parentclaim_ids"].eq("")

  # Number the independent claims per publication in numeric claim order.
  # Sorting the string column would put "10" before "2" if numbers are not zero-padded.
  independent = (
      df_pub_claims_final[df_pub_claims_final["claim_indep"]]
      .sort_values(by=["publication_number", "claim_num_int"])
      .copy()
  )
  independent["claim_indep_order"] = independent.groupby("publication_number").cumcount() + 1

  df_pub_claims_final = df_pub_claims_final.merge(
      independent[["publication_number", "claim_num_str", "claim_indep_order"]],
      on=["publication_number", "claim_num_str"],
      how="left",
  )

  # Dependent claims get order 0.
  df_pub_claims_final["claim_indep_order"] = df_pub_claims_final["claim_indep_order"].fillna(0).astype(int)

  return df_pub_claims_final.drop(columns=["limitations"])

# Main

In [None]:
#@title Assert Data Checks
def assert_data_checks(claims_data):
  """Validate a processed claims batch before it is written to BigQuery.

  Checks:
    1. no column contains null values;
    2. within each publication, the independent claims' claim_indep_order
       values are exactly 1..N, where N is that publication's number of
       independent claims.

  Raises:
    AssertionError: if either check fails. The error is raised explicitly so
      the checks still run when Python assertions are disabled (-O).
  """
  # No fields as NULL
  null_counts = claims_data.isnull().sum()
  null_cols = list(null_counts[null_counts > 0].index)
  if null_cols:
    raise AssertionError(f'Claims DataFrame has Null values, columns: {null_cols}')

  # Indep Order must be exactly 1..N per publication. Compare the actual
  # order values, not just their count: a count-vs-count comparison over the
  # same rows is always equal and can never fail.
  indep = claims_data.loc[claims_data["claim_indep"] == True, ["publication_number", "claim_indep_order"]]
  for _, orders in indep.groupby("publication_number")["claim_indep_order"]:
    if sorted(orders.tolist()) != list(range(1, len(orders) + 1)):
      raise AssertionError('Independent Order of Claims do not match the number of independent Claims')

In [None]:
%%time
#@title Step 1: Loading Data from Public Data
print(f"{get_timestamp(True)} | {get_uptime()} | Loading Data from Public Data")

################################################################################

# Job labels; label_colab now matches the "claim_preprocessing" value used by the other steps.
label_colab = "claim_preprocessing" # Name of the colab; e.g. google_embeddings_vector_index
label_step = "loading_data_from_public_data"  # Descriptive name of what is happening, e.g. backlog_query
label_pipeline = "claim_etl_v0" # Name of the BigQuery pipeline; e.g. patent_embeddings_v0

# Fetch claims HTML, title, abstract and URL for the backlog publications.
# NOTE(review): the WHERE clause keeps only publications ALREADY present in the
# output claims table, i.e. it reprocesses existing rows. If the intent is to
# process new backlog publications, this should probably be NOT IN — confirm.
# NOTE(review): UNNEST(claims_localized_html) yields one row per localized
# claims document. A publication with several localizations would be parsed
# several times — confirm that US publications only carry one.
input_sql = f"""
  WITH JeffersonTmp AS (
    SELECT
      JPubs.publication_number,
      PARSE_DATE('%Y%m%d', CAST(JPubs.publication_date AS STRING)) AS publication_date,
      HtmlClaims.text as html_claim_text,
      GPubs.title AS title,
      GPubs.abstract AS abstract,
      GPubs.url AS gpatents_url
    FROM `{GPATENTS_PROJECT}.{GPATENTS_DATASET_ID}.{INPUT_TABLE_ID}` AS Pubs
    INNER JOIN `jefferson-1790.patents.publications` AS JPubs
    ON Pubs.publication_number = JPubs.publication_number
    LEFT JOIN UNNEST(JPubs.claims_localized_html) AS HtmlClaims
    LEFT JOIN `jefferson-1790.google_patents_research.publications` AS GPubs
      ON GPubs.publication_number = JPubs.publication_number
    GROUP BY
      ALL
  )

  SELECT
    JPubs.publication_number,
    JPubs.publication_date,
    JPubs.html_claim_text AS claims_html,
    JPubs.title,
    JPubs.abstract,
    JPubs.gpatents_url
  FROM JeffersonTmp AS JPubs
  WHERE JPubs.publication_number IN (
    SELECT
      DISTINCT
        publication_number
    FROM `{GPATENTS_PROJECT}.{GPATENTS_DATASET_ID}.{OUTPUT_CLAIMS_TABLE_ID}`
  )
  LIMIT 200000
"""

job_config = bigquery.QueryJobConfig(
    labels = {
        "colab": label_colab,
        "step": label_step,
        "pipeline": label_pipeline
    }
)

query_result = bq_client.query(input_sql, job_config=job_config)

df_input = query_result.result().to_dataframe()

# Row count, kept under its original name for any later cells that read it.
new_data_bool = df_input.shape[0]

if df_input.empty:
  print("No new data found to process it. Task complete. Disconnecting runtime to save resources.")
  runtime.unassign()

# Publications without claims HTML (e.g. from the LEFT JOIN UNNEST) cannot be parsed.
df_input_filt = df_input[pd.notnull(df_input["claims_html"])].reset_index(drop=True)

del df_input
gc.collect()

In [None]:
%%time
#@title Step 2: Processing Claims in Parallel
print(f"{get_timestamp(True)} | {get_uptime()} | Processing Limitations in Parallel")

################################################################################

label_colab = "claim_preprocessing" # Name of the colab; e.g. google_embeddings_vector_index
label_step = "processing_claims_in_parallel"  # Descriptive name of what is happening, e.g. backlog_query
label_pipeline = "claim_etl_v0" # Name of the BigQuery pipeline; e.g. patent_embeddings_v0

job_config_labels = {
  "colab": label_colab,
  "step": label_step,
  "pipeline": label_pipeline
}

# Run id (e.g. 20240305_093000) used to name this run's temporary output table.
start_time = str(datetime.fromtimestamp(int(time()))).replace(' ', '_').replace(":", "").replace("-", "")

# Rows per task submitted to the thread pool (form values may arrive as strings).
batch_size = "5000" # @param [10, 50, 100, 200, 500, 800, 1000, 1500, 2000, 5000, 10000] {allow_type: true}
batch_rows = int(batch_size)
# Rows handed to the thread pool per outer iteration.
chunk_rows = 10000

for chunk_start in tqdm_notebook(range(0, len(df_input_filt), chunk_rows)):
  df_chunk = df_input_filt.iloc[chunk_start: chunk_start + chunk_rows]
  batches = [df_chunk.iloc[x: x + batch_rows] for x in range(0, len(df_chunk), batch_rows)]

  del df_chunk
  gc.collect()

  # NOTE(review): parsing is pure-Python, CPU-bound work, so threads share the
  # GIL and may not run in parallel. Consider ProcessPoolExecutor if throughput matters.
  with ThreadPoolExecutor(max_workers=num_workers) as executor:
    futures = [executor.submit(process_pub_claim_html, batch) for batch in batches]

    # Results are validated and saved sequentially in this thread as they finish.
    for future in tqdm_notebook(as_completed(futures), total=len(batches), desc="Processing Batches"):
      try:
        claims_data = future.result()

        assert_data_checks(claims_data)

        # One clock reading, so processed_date always agrees with processed_timestamp.
        processed_at = datetime.fromtimestamp(int(time()))
        claims_data["processed_date"] = processed_at.date()
        claims_data["processed_timestamp"] = processed_at

        save_output_to_bq(
          claims_data,
          input_tables_id = [
              f"{GPATENTS_PROJECT}.{GPATENTS_DATASET_ID}.{INPUT_TABLE_ID}"
          ],
          merge_keys = ["publication_number", "claim_id"],
          output_table = f"{GPATENTS_DATASET_ID}.{OUTPUT_CLAIMS_TABLE_ID}_new_{start_time}",
          output_dataset = GPATENTS_DATASET_ID,
          project = GPATENTS_PROJECT,
          colab_description = "Publications Claims Metrics",
          job_config_labels = job_config_labels,
          bq_client = bq_client
        )

        del claims_data
        gc.collect()
      except Exception as e:
        # Best effort: a failing batch is reported and skipped; the others continue.
        print(f"A batch generated an exception: {e}")

## Recreating the Main Table (merging new and existing rows; no clustering or partitioning is applied yet)

In [None]:
%%time
#@title Step 3: Merging Temporary Table with Previous Main Table
print(f"{get_timestamp(True)} | {get_uptime()} | Merging Temporary Table with Previous Main Table")

################################################################################

label_colab = "claim_preprocessing" # Name of the colab; e.g. google_embeddings_vector_index
label_step = "creating_merged_table"  # Descriptive name of what is happening, e.g. backlog_query
label_pipeline = "claim_etl_v0" # Name of the BigQuery pipeline; e.g. patent_embeddings_v0

job_config = bigquery.QueryJobConfig(
  labels = {
    "colab": label_colab,
    "step": label_step,
    "pipeline": label_pipeline
  }
)

# Union this run's rows with the existing main table and keep one row per
# (publication_number, claim_id). New rows win. Ties are then broken by the
# most recent processed_timestamp; processed_date alone could tie within a day
# and make the pick non-deterministic.
append_output_sql = f"""
CREATE OR REPLACE TABLE `{GPATENTS_PROJECT}.{GPATENTS_DATASET_ID}.{OUTPUT_CLAIMS_TABLE_ID}_new_merged` AS (
  SELECT
    -- Add all the columns you want to keep from your tables, except for 'is_new'
    * EXCEPT(is_new)
  FROM (
    -- Combine new and old records into a single source
    SELECT
      publication_number,
      publication_date,
      claim_id,
      claim_num_str,
      claim_num_int,
      claim_text,
      claim_type,
      claim_list_parentclaim_ids,
      claim_list_parentclaim_nums,
      claim_list_parentclaim_texts,
      claim_list_children_ids,
      claim_immediate_parent_id,
      claim_immediate_child_id,
      claim_indep,
      claim_indep_order,
      processed_date,
      CAST(processed_timestamp AS TIMESTAMP) AS processed_timestamp,
      TRUE AS is_new -- Flag new records
    FROM
      `{GPATENTS_PROJECT}.{GPATENTS_DATASET_ID}.{OUTPUT_CLAIMS_TABLE_ID}_new_{start_time}` t
    UNION ALL
    SELECT
      publication_number,
      publication_date,
      claim_id,
      claim_num_str,
      claim_num_int,
      claim_text,
      claim_type,
      claim_list_parentclaim_ids,
      claim_list_parentclaim_nums,
      claim_list_parentclaim_texts,
      claim_list_children_ids,
      claim_immediate_parent_id,
      claim_immediate_child_id,
      claim_indep,
      claim_indep_order,
      processed_date,
      CAST(processed_timestamp AS TIMESTAMP) AS processed_timestamp,
      FALSE AS is_new -- Flag old records
    FROM
      `{GPATENTS_PROJECT}.{GPATENTS_DATASET_ID}.{OUTPUT_CLAIMS_TABLE_ID}` t
  )
  -- Keep the newest row for each (publication_number, claim_id)
  QUALIFY ROW_NUMBER() OVER (PARTITION BY publication_number, claim_id ORDER BY is_new DESC, processed_timestamp DESC, processed_date DESC) = 1
)
"""

append_output = bq_client.query(append_output_sql, job_config=job_config)
try:
  # result() blocks until the job finishes and raises if it failed.
  append_output.result()
except Exception as ex:
  print(f"Please re-check the previous query, error: {ex}")
  raise
print("Successfully Created Temporary new Main Table")

In [None]:
%%time
#@title Step 4: Dropping Temporary Table
print(f"{get_timestamp(True)} | {get_uptime()} | Dropping Temporary Table")

################################################################################

# Removes this run's temporary table (`..._new_{start_time}`), whose rows were
# already folded into `_new_merged` by Step 3.

label_colab = "claim_preprocessing" # Name of the colab; e.g. google_embeddings_vector_index
label_step = "dropping_temporary_table"  # Descriptive name of what is happening, e.g. backlog_query
label_pipeline = "claim_etl_v0" # Name of the BigQuery pipeline; e.g. patent_embeddings_v0

job_config = bigquery.QueryJobConfig(
  labels = {
    "colab": label_colab,
    "step": label_step,
    "pipeline": label_pipeline
  }
)

# Fully qualified with GPATENTS_PROJECT, matching the name Step 3 read from.
# A bare `dataset.table` resolves against bq_client's default project, which
# is not guaranteed to be GPATENTS_PROJECT.
# IF EXISTS keeps re-runs of this cell harmless once the table is gone.
drop_older_table = f"DROP TABLE IF EXISTS `{GPATENTS_PROJECT}.{GPATENTS_DATASET_ID}.{OUTPUT_CLAIMS_TABLE_ID}_new_{start_time}`"
try:
  # Job submission is inside the try too, so this cleanup stays best-effort.
  drop_old_output = bq_client.query(drop_older_table, job_config=job_config)
  drop_old_output_result = drop_old_output.result()
  if drop_old_output.state == 'DONE':
    print("Successfully Dropped Temp tables")
  else:
    print("Please re-check the previous query")
except Exception as ex:
  # Deliberately not re-raised: a leftover temp table does not block later steps.
  print(f"Please re-check the previous query, error: {ex}")

In [None]:
%%time
#@title Step 5: Dropping Old Main Table
print(f"{get_timestamp(True)} | {get_uptime()} | Dropping Old Main Table")

################################################################################

# Drops the live main table so the next step can recreate it from `_new_merged`
# with PARTITION BY / CLUSTER BY. (Presumably needed because BigQuery's
# CREATE OR REPLACE refuses to change an existing table's partitioning spec.)
# NOTE(review): until the main table is recreated, `_new_merged` is the only
# copy of the claims data. Only run this after Step 3 has succeeded.

label_colab = "claim_preprocessing" # Name of the colab; e.g. google_embeddings_vector_index
label_step = "dropping_old_main_table"  # Descriptive name of what is happening, e.g. backlog_query
label_pipeline = "claim_etl_v0" # Name of the BigQuery pipeline; e.g. patent_embeddings_v0

job_config = bigquery.QueryJobConfig(
  labels = {
    "colab": label_colab,
    "step": label_step,
    "pipeline": label_pipeline
  }
)

# Fully qualified with GPATENTS_PROJECT, the same name Step 3 reads and the
# recreate step writes. A bare `dataset.table` would resolve against
# bq_client's default project instead.
# IF EXISTS makes re-running after a partial failure harmless.
drop_older_table = f"DROP TABLE IF EXISTS `{GPATENTS_PROJECT}.{GPATENTS_DATASET_ID}.{OUTPUT_CLAIMS_TABLE_ID}`"
try:
  drop_old_output = bq_client.query(drop_older_table, job_config=job_config)
  drop_old_output_result = drop_old_output.result()
  if drop_old_output.state == 'DONE':
    print("Successfully Dropped Old original table")
  else:
    print("Please re-check the previous query")
except Exception as ex:
  # Not re-raised (as before): the recreate step uses CREATE OR REPLACE and
  # will surface its own error if the table could not be replaced.
  print(f"Please re-check the previous query, error: {ex}")

In [None]:
%%time
#@title Step 6: Creating new Main Table
print(f"{get_timestamp(True)} | {get_uptime()} | Creating New Main Table")

################################################################################

# Rebuilds the main claims table from `<OUTPUT_CLAIMS_TABLE_ID>_new_merged`,
# partitioned by processed_date and clustered by (claim_indep, claim_num_int).

# Labels attached to the BigQuery job (useful for filtering job history).
label_colab, label_step, label_pipeline = (
  "claim_preprocessing",      # Name of the colab
  "creating_new_main_table",  # Descriptive name of what is happening
  "claim_etl_v0",             # Name of the BigQuery pipeline
)
job_config = bigquery.QueryJobConfig(
  labels=dict(colab=label_colab, step=label_step, pipeline=label_pipeline)
)

append_output_sql = f"""
CREATE OR REPLACE TABLE `{GPATENTS_PROJECT}.{GPATENTS_DATASET_ID}.{OUTPUT_CLAIMS_TABLE_ID}`
PARTITION BY processed_date
CLUSTER BY claim_indep, claim_num_int
AS (
  SELECT
    *
  FROM `{GPATENTS_PROJECT}.{GPATENTS_DATASET_ID}.{OUTPUT_CLAIMS_TABLE_ID}_new_merged`
)
"""

append_output = bq_client.query(append_output_sql, job_config=job_config)
try:
  # Waits for the job; raises if BigQuery reports a failure.
  append_output_result = append_output.result()
except Exception as ex:
  print(f"Please re-check the previous query, error: {ex}")
  raise ex

print(
  "Successfully Created Output new Main Table"
  if append_output.state == 'DONE'
  else "Please re-check the previous query"
)