In [14]:
%%script false --no-raise-error
# Disabled cell: `%%script false` hands this body to the `false` program instead of the
# Python kernel, so none of the code below runs. On this Windows machine `false` is not on
# PATH, hence the "Couldn't find program: 'false'" output; --no-raise-error keeps that from
# aborting a Run All. Delete the first line to enable it (it needs a Colab runtime, since it
# imports google.colab).
import json
from google.colab import userdata
from google.oauth2 import service_account
from google.cloud.bigquery import magics

# Service-account key (JSON text) read from the Colab secret 'BIGQUERY_CREDENTIALS'.
credentials_json = userdata.get('BIGQUERY_CREDENTIALS')
# Global `credentials` is picked up by the BigQuery client setup via globals().get('credentials').
credentials = service_account.Credentials.from_service_account_info(json.loads(credentials_json))
# NOTE(review): this configures the legacy google.cloud.bigquery.magics context, while the
# setup cell loads the `bigquery_magics` extension for %%bigquery — verify that the
# extension actually uses these credentials.
magics.context.credentials = credentials

Couldn't find program: 'false'


In [15]:
from google.cloud import bigquery
from google.cloud.bigquery import magics
%load_ext bigquery_magics

data_set = "testing_set"
project_name = "emerald-entity-468916-f9"

job_config = bigquery.QueryJobConfig(default_dataset = f"{project_name}.{data_set}")
client = bigquery.Client(project = project_name, default_query_job_config = job_config, credentials = globals().get('credentials', None))
magics.context.default_query_job_config = job_config
magics.context.project = project_name

The bigquery_magics extension is already loaded. To reload it, use:
  %reload_ext bigquery_magics


In [16]:
import nltk
from nltk.tokenize import sent_tokenize, word_tokenize
from pydantic import BaseModel
from typing import List

# Tokenizer models required by sent_tokenize / word_tokenize used in the chunking helpers.
# Both are fetched; presumably punkt_tab is what newer NLTK releases load — keep both.
for nltk_resource in ('punkt', 'punkt_tab'):
    nltk.download(nltk_resource)

class Chunk(BaseModel):
    """One chunk of a long text, with its neighbouring context kept in separate fields.

    Instances are built by add_overlaps; the overlaps are not merged into `main`.
    """
    before: str  # tail of the previous chunk's text ("" for the first chunk)
    main: str  # this chunk's own text
    after: str  # head of the next chunk's text ("" for the last chunk)

def chunk_text(sentences: list, sentences_lenghts: list, max_tokens) -> tuple:
    """Greedily pack consecutive sentences into chunks of at most `max_tokens` tokens.

    Sentences are never split: a sentence longer than `max_tokens` ends up alone in
    its own chunk.

    Args:
        sentences: sentences in document order.
        sentences_lenghts: token count of each sentence, parallel to `sentences`
            (the parameter keeps its original spelling for keyword callers).
        max_tokens: token budget per chunk.

    Returns:
        `(chunks, lengths)`: `chunks` is a list of sentence lists and `lengths`
        holds the matching per-sentence token counts for each chunk.
    """
    chunks = []
    lengths = []

    current_sentences = []
    current_lengths = []
    current_length = 0

    for length, sentence in zip(sentences_lenghts, sentences):
        # Close the current chunk when this sentence would overflow it, unless the chunk
        # holds no tokens yet (an oversized sentence must still land somewhere).
        if current_length + length > max_tokens and current_length > 0:
            chunks.append(current_sentences)
            lengths.append(current_lengths)
            current_sentences = []
            current_lengths = []
            current_length = 0

        current_sentences.append(sentence)
        current_lengths.append(length)
        current_length += length

    # Flush on "has sentences", not "has tokens": a trailing run of zero-token
    # sentences would otherwise be silently dropped from the output.
    if current_sentences:
        chunks.append(current_sentences)
        lengths.append(current_lengths)

    return (chunks, lengths)

def get_prefix(sentences: list, lenghts: list, max_tokens):
    """Return the leading sentences whose summed token counts fit within `max_tokens`.

    `lenghts` is parallel to `sentences`. Sentences are taken in order until adding
    the next one would exceed the budget; the first sentence is always taken, even if
    it alone is over budget. Returns a new list.
    """
    picked = []
    used = 0
    for sentence, n_tokens in zip(sentences, lenghts):
        would_overflow = used + n_tokens > max_tokens
        if would_overflow and used > 0:
            break
        picked.append(sentence)
        used += n_tokens
    return picked

def get_prefixes(chunks: list, lengths: list, max_tokens) -> list:
    """For each chunk, join its leading sentences (budget `max_tokens`, via get_prefix) with spaces.

    `chunks` is a list of sentence lists and `lengths` the matching token-count lists.
    """
    heads = []
    for chunk_sentences, chunk_lengths in zip(chunks, lengths):
        leading = get_prefix(chunk_sentences, chunk_lengths, max_tokens)
        heads.append(" ".join(leading))
    return heads

def get_suffixes(chunks: list, lengths: list, max_tokens) -> list:
    """For each chunk, join its trailing sentences (budget `max_tokens`) with spaces, in reading order.

    Runs get_prefix over the reversed sentence/length lists, then restores the
    original order before joining.
    """
    tails = []
    for chunk_sentences, chunk_lengths in zip(chunks, lengths):
        trailing = get_prefix(list(reversed(chunk_sentences)), list(reversed(chunk_lengths)), max_tokens)
        trailing.reverse()
        tails.append(" ".join(trailing))
    return tails

def add_overlaps(chunks: list, prefixes: list, suffixes: list):
    """Wrap each chunk text in a Chunk carrying its neighbours' overlaps.

    `before` is the previous chunk's suffix ("" for the first chunk) and `after` is
    the next chunk's prefix ("" for the last chunk).
    """
    last_idx = len(chunks) - 1
    return [
        Chunk(
            before = suffixes[idx - 1] if idx > 0 else "",
            main = body,
            after = prefixes[idx + 1] if idx < last_idx else "",
        )
        for idx, body in enumerate(chunks)
    ]

# Divides a long text into chunks with overlaps. Unlike common implementations, the overlaps are not merged into the chunks but provided separately (Chunk.before / Chunk.after).
def chunk_text_with_overlaps(long_text, max_chunk_tokens, max_overlap_tokens) -> List[Chunk]:
    """Split `long_text` into sentence-aligned chunks with separately stored overlaps.

    Sentences come from NLTK `sent_tokenize`; token counts are `len(word_tokenize(s))`.

    Args:
        long_text: text to split.
        max_chunk_tokens: token budget for each chunk's main text.
        max_overlap_tokens: token budget for each before/after overlap.

    Returns:
        One Chunk per chunk; `before` is the tail of the previous chunk and `after`
        the head of the next one.
    """
    sentences = sent_tokenize(long_text)
    token_counts = [len(word_tokenize(s)) for s in sentences]

    grouped_sentences, grouped_counts = chunk_text(sentences, token_counts, max_chunk_tokens)

    heads = get_prefixes(grouped_sentences, grouped_counts, max_overlap_tokens)
    tails = get_suffixes(grouped_sentences, grouped_counts, max_overlap_tokens)
    bodies = [" ".join(group) for group in grouped_sentences]

    return add_overlaps(bodies, heads, tails)

[nltk_data] Downloading package punkt to
[nltk_data]     C:\Users\jurow\AppData\Roaming\nltk_data...
[nltk_data]   Package punkt is already up-to-date!
[nltk_data] Downloading package punkt_tab to
[nltk_data]     C:\Users\jurow\AppData\Roaming\nltk_data...
[nltk_data]   Package punkt_tab is already up-to-date!


In [None]:
import sys
from concurrent.futures import ThreadPoolExecutor, as_completed

# Tunable limits, named instead of inline magic numbers.
MAX_CHUNK_TOKENS = 3000     # token budget for each chunk's main text
MAX_OVERLAP_TOKENS = 1000   # token budget for each before/after overlap
MAX_INSERT_WORKERS = 10     # INSERT jobs submitted to BigQuery concurrently

clean_sql = "CREATE OR REPLACE TABLE tmp_correction_chunks(book_id STRING, chunk_number INTEGER, prefix STRING, original_txt STRING, suffix STRING, corrected_txt STRING)"
# Plain string (no f-prefix): values are bound as BigQuery query parameters, never interpolated.
insert_sql = "INSERT INTO tmp_correction_chunks(book_id, chunk_number, prefix, original_txt, suffix) VALUES(@id, @idx, @prefix, @txt, @suffix)"

# One parameterized INSERT config per chunk, accumulated across all books.
job_configs = []

# Recreate the staging table on every run, discarding chunks from previous runs.
client.query_and_wait(query = clean_sql)
select_query_job = client.query(query = "select * from books WHERE corrected_txt IS NULL")
any_row = False

for row in select_query_job.result():
    any_row = True
    # Inner quotes differ from the outer ones: reusing " inside an f-string only parses on Python 3.12+.
    print(f"\nChunking book: {row['title']}")
    text_to_split = row["original_txt"]
    book_id = row["book_id"]
    chunks = chunk_text_with_overlaps(text_to_split, max_chunk_tokens = MAX_CHUNK_TOKENS, max_overlap_tokens = MAX_OVERLAP_TOKENS)
    print(f"  Number of chunks: {len(chunks)}")
    print("  Processed: ", end = "")
    for idx, chunk in enumerate(chunks):
        print(f"{idx} ", end = "")
        # Deliberately not named `job_config`: that global holds the client's
        # default-dataset config and must not be clobbered by per-chunk configs.
        insert_config = bigquery.QueryJobConfig(query_parameters=[
            bigquery.ScalarQueryParameter("id", "STRING", book_id),
            bigquery.ScalarQueryParameter("idx", "INTEGER", idx),
            bigquery.ScalarQueryParameter("prefix", "STRING", chunk.before),
            bigquery.ScalarQueryParameter("txt", "STRING", chunk.main),
            bigquery.ScalarQueryParameter("suffix", "STRING", chunk.after)])
        job_configs.append(insert_config)

if any_row:
    print(f"\n\nExecuting {len(job_configs)} INSERT jobs...")
    print("Jobs Completed: ", end = "")

    def execute_insert_job(insert_config):
        """Run one parameterized INSERT and block until BigQuery finishes it."""
        client.query_and_wait(insert_sql, job_config = insert_config)

    completed_jobs = 0
    with ThreadPoolExecutor(max_workers = MAX_INSERT_WORKERS) as executor:
        futures = [executor.submit(execute_insert_job, config) for config in job_configs]
        for future in as_completed(futures):
            # result() re-raises a worker's exception, so a failed INSERT surfaces in this cell.
            future.result()
            completed_jobs += 1
            print(f"{completed_jobs} ", end = "")

    print("\nAll INSERT jobs completed.")
else:
    print("No books to correct.")

No books to correct.


In [18]:
%%bigquery
-- Calls the stored procedure phase2_correction_correct (its definition is not in this notebook).
-- Presumably it fills tmp_correction_chunks.corrected_txt from the chunks staged above — TODO confirm.
-- NOTE(review): the name is unqualified, so it relies on the %%bigquery default dataset being configured.
CALL phase2_correction_correct();

Query is running:   0%|          |