# BigQuery and Dataflow (Apache Beam on GCP)
This is an excerpt from a Dataflow pipeline project built for an NLP client. We needed to tokenize the (massive) [Google Patents](https://cloud.google.com/blog/topics/public-datasets/google-patents-public-datasets-connecting-public-paid-and-private-patent-data) dataset for later training of several custom Transformers models.

# Installs

In [None]:
import sys
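!{sys.executable} -m pip install -U "apache-beam[gcp,interactive]"
!{sys.executable} -m pip install -U spacy
!{sys.executable} -m pip install -U spacy-lookups-data
# spaCy v3 removed the bare 'en' shortcut; download the small English pipeline instead
!{sys.executable} -m spacy download en_core_web_sm
!{sys.executable} -m pip install nltk
!{sys.executable} -m pip install transformers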

# Imports

In [None]:
import re
import json
import sys
import os

import apache_beam as beam
from apache_beam.options import pipeline_options
from apache_beam.options.pipeline_options import GoogleCloudOptions
from apache_beam.runners import DataflowRunner
from apache_beam.runners.interactive.interactive_runner import InteractiveRunner
import apache_beam.runners.interactive.interactive_beam as ib
from apache_beam.io.gcp.internal.clients import bigquery

import google.auth

### Setup (Omitted)
This cell contains proprietary cloud infrastructure information, so it has been redacted out of an abundance of caution. In short, at this step I:
1) Instantiate a [pipeline_options.PipelineOptions](https://beam.apache.org/releases/pydoc/2.5.0/apache_beam.options.pipeline_options.html) object, `options`, which the later cells pass to the pipeline and the runner.
2) Configure its GCP project, zone, and Cloud Storage bucket for the client's project.
3) Set the Google Cloud Storage output location for the pipeline, `output_gcs_location`, which the Parquet writer uses as its prefix.

# BigQuery Setup

In [None]:
from apache_beam.io.gcp.internal.clients import bigquery
import pyarrow as pa
import pickle
import transformers
from transformers import AutoTokenizer
import os


table_spec = bigquery.TableReference(
    projectId=os.getenv('CLIENT_PROJECT_ID'),
    datasetId=os.getenv('CLIENT_BQ_DATASET'),
    tableId=os.getenv('CLIENT_BQ_TABLE'))


# Output Parquet schema: one binary column holding each pickled list of token ids.
fields = [
    ('block', pa.binary())
]
schema = pa.schema(fields)

# Define One or Multiple Pipeline Functions

In [None]:
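def clean_text(e):
    '''
    A basic text cleaning function for BigQuery patent rows. It joins the
    abstract, description and claims fields, casts everything to lowercase,
    strips every character that is not a letter or whitespace (digits and
    punctuation included), and collapses runs of whitespace into one space.
    Accepts either a JSON string or an already-parsed row dict.
    '''
    jdata = json.loads(e) if isinstance(e, str) else e
    # NULL columns come back as None; treat them as empty strings
    parts = (jdata.get(k) or '' for k in ('abs_text', 'desc_text', 'claims_text'))
    txt = ' '.join(parts)  # the space keeps words at field boundaries apart
    low = txt.lower()
    alpha = re.sub(r'[^a-z\s]+', '', low)
    rm_space = re.sub(r'\s+', ' ', alpha).strip()
    return rm_space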
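To see what those cleaning steps do to realistic text, the sketch below applies the same three transformations (lowercasing, dropping everything except letters and whitespace, collapsing whitespace) to a hypothetical patent-style string. It also makes one side effect visible: digits and hyphens are removed outright, so `3D-printing` becomes `dprinting`.

```python
import re

# Hypothetical sample resembling a patent abstract.
txt = 'A   Method\nfor 3D-printing (claim 1): widgets!'

low = txt.lower()
# Drop every run of characters that is not a lowercase letter or whitespace.
alpha = re.sub(r'[^a-z\s]+', '', low)
# Collapse any whitespace run (spaces, newlines, tabs) into a single space.
collapsed = re.sub(r'\s+', ' ', alpha).strip()

print(collapsed)  # a method for dprinting claim widgets
```

If terms such as part numbers or chemical formulas matter to the downstream models, a less aggressive character class may be worth considering.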
    

# Build the Pipeline

In [None]:
p = beam.Pipeline(InteractiveRunner(), options=options)

# Instantiate a Transformers tokenizer object using the GPT-2 presets
tok = AutoTokenizer.from_pretrained('gpt2')

from_table = (
    p
    | 'ReadTable' >> beam.io.ReadFromBigQuery(table=table_spec)
    # Rows arrive as dicts; NULL text columns are treated as empty strings
    | 'Tokenize' >> beam.Map(lambda x: tok.encode(' '.join(
        x[k] or '' for k in ('abs_text', 'desc_text', 'claims_text'))))
    # x is already a list of token ids, so pickle it directly (no second encode)
    | 'Serialize for Parquet' >> beam.Map(lambda x: {'block': pickle.dumps(x)})
    | 'Write to GCS' >> beam.io.WriteToParquet(os.path.join(output_gcs_location, 'pq_out'), schema)
)


# Optionally, Inspect the Pipeline

In [None]:
ib.show_graph(p)

# RUN IT

In [None]:
pipeline_result = DataflowRunner().run_pipeline(p, options=options)

In [None]:
from IPython.display import display, HTML
url = ('https://console.cloud.google.com/dataflow/jobs/%s/%s?project=%s' % 
      (pipeline_result._job.location, pipeline_result._job.id, pipeline_result._job.projectId))
display(HTML('Click <a href="%s" target="_new">here</a> for the details of your Dataflow job!' % url))

## Sanity Check
Once the Dataflow job has finished, download one of the output Parquet files from the output bucket and load it locally. Then decode one of the blocks to confirm that tokenization completed successfully.

In [None]:
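import pickle
import pyarrow.parquet as pq

# Load one downloaded output shard (WriteToParquet names shards <prefix>-SSSSS-of-NNNNN)
a = pq.read_table('pq_out-00000-of-00001')

arr = a['block']

# Unpickle the first block back into token ids and decode it with the same tokenizer
tok.decode(pickle.loads(arr[0].as_py()))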