# Environment

## Libraries

In [1]:
import os, pathlib, sys, re, string, spacy, bs4, apache_beam as beam

## Working Directory

In [2]:
# Make the project root -- the nearest of the current directory and its five
# closest ancestors that contains a 'notebook' entry -- both the working
# directory and the first import path, so project-relative paths such as
# 'data/...' and local imports resolve.
_start_dir = pathlib.Path(os.getcwd())
# Slicing the ancestor list (instead of repeatedly taking parents[0]) stops
# cleanly at the filesystem root, where parents[0] raises IndexError.
_search_dirs = [_start_dir, *_start_dir.parents][:6]
workingdir = next(
    (str(path) for path in _search_dirs if 'notebook' in os.listdir(str(path))),
    None,
)
if workingdir is None:
    # Fall back to the starting directory rather than silently moving to an
    # unrelated ancestor.
    workingdir = str(_start_dir)
    print(f"No 'notebook' directory found near {workingdir}; staying here.")
sys.path.insert(0, workingdir)
os.chdir(workingdir)

## Configuring spaCy for NLP Operations

In [3]:
! python -m spacy download en_core_web_sm

✔ Download and installation successful
You can now load the model via spacy.load('en_core_web_sm')


## Apache Beam and GCP Settings

In [4]:
# Start from an explicit empty flag list: without flags, PipelineOptions falls
# back to parsing sys.argv, which inside a notebook holds the Jupyter kernel's
# launch arguments rather than pipeline flags.
pipeline_options = beam.options.pipeline_options.PipelineOptions(flags=[])
# Option classes used with pipeline_options.view_as(...) to set typed options.
gcp_options = beam.options.pipeline_options.GoogleCloudOptions
standard_options = beam.options.pipeline_options.StandardOptions

In [14]:
# Dataflow job settings: GCP project, job name, region, and GCS staging/temp locations.
google_cloud_options = pipeline_options.view_as(gcp_options)
google_cloud_options.project = 'nlp-text-classification'
google_cloud_options.job_name = 'stackoverflow-preprocessing-2'
google_cloud_options.region = 'us-central1'
google_cloud_options.staging_location = 'gs://nlp-text-classification/beam/stage'
google_cloud_options.temp_location = 'gs://nlp-text-classification/beam/temp'
pipeline_options.view_as(standard_options).runner = 'DataflowRunner'

# CleanText is defined in this notebook (__main__) and uses module-level
# imports (bs4, spacy, string); ship the main session so those names are
# defined when the DoFn runs on the Dataflow workers.
pipeline_options.view_as(beam.options.pipeline_options.SetupOptions).save_main_session = True
# NOTE(review): the workers also need spacy, bs4 and the en_core_web_sm model
# installed (e.g. via --requirements_file or --setup_file) -- TODO.
# NOTE(review): "KeyError: 'ClassType'" while unpickling the DoFn on the
# workers is commonly reported when the local dill version differs from the
# workers'; pin dill to the version your apache-beam release requires.

# Creating a DoFn Object

In [20]:
class CleanText(beam.DoFn):
    """Beam DoFn that normalises the free-text columns of a post record.

    Each input element is treated as a mapping: a BigQuery row dict, or
    presumably the dict built by ``Split`` in the local pipeline (confirm).
    For every column named in ``text_fields`` whose value is a non-empty
    string, HTML markup is stripped. The remaining text is then lemmatised
    with spaCy, lower-cased, and filtered of stop words, punctuation and
    digits. All other columns are passed through unchanged.

    Args:
        text_fields: Names of the columns to clean.
        spacy_model: Name of the spaCy model loaded in ``setup``. It must be
            installed wherever the pipeline runs.
    """

    # Lemmas dropped after stop-word filtering: single punctuation and digit
    # characters, plus '-pron-' (lower-cased '-PRON-', presumably the
    # placeholder lemma spaCy 2.x assigns to pronouns).
    STOPWORDS = frozenset(string.punctuation + string.digits) | {'-pron-'}

    def __init__(self, text_fields=('title', 'body'), spacy_model='en_core_web_sm'):
        super().__init__()
        self.text_fields = tuple(text_fields)
        self.spacy_model = spacy_model
        # Loaded in setup() so the model itself is never pickled with the DoFn.
        self.spacy = None

    def setup(self):
        """Load the spaCy pipeline once per DoFn instance."""
        self.spacy = spacy.load(self.spacy_model)

    def __decode_html(self, input_str: str) -> str:
        """Return the text content of an HTML fragment."""
        return bs4.BeautifulSoup(input_str, 'html.parser').text

    def __nlp(self, input_str: str) -> str:
        """Return the lower-cased lemmas of ``input_str`` joined by single spaces.

        Drops tokens spaCy flags as stop words and lemmas listed in ``STOPWORDS``.
        """
        if self.spacy is None:
            # Fallback for callers that never invoked setup().
            self.setup()
        lemmas = (token.lemma_.lower() for token in self.spacy(input_str)
                  if not token.is_stop)
        return ' '.join(lemma for lemma in lemmas if lemma not in self.STOPWORDS)

    def process(self, element):
        """Yield one copy of ``element`` with its text fields cleaned."""
        cleaned = dict(element)  # copy: never mutate the input element in place
        for field in self.text_fields:
            value = cleaned.get(field)
            if isinstance(value, str) and value:
                cleaned[field] = self.__nlp(self.__decode_html(value))
        # Yield rather than return: Beam iterates over whatever process()
        # returns, so returning the dict itself would emit its keys one by one.
        yield cleaned

# Pipeline

## Local Pipeline

In [None]:
# Input CSV for the local DirectRunner smoke test.
local_file = 'data/beam_test.csv'

# WriteToText('data/beam_output.txt') writes sharded files such as
# 'data/beam_output.txt-00000-of-00001', so checking for the bare prefix never
# matches. Remove every file a previous run produced under that prefix.
_output_dir = pathlib.Path('data')
if _output_dir.is_dir():
    for _stale_output in _output_dir.glob('beam_output.txt*'):
        if _stale_output.is_file():
            _stale_output.unlink()

In [None]:
# Smoke-test the transforms locally. The runner is passed explicitly instead
# of argv=sys.argv: inside a notebook, sys.argv holds the Jupyter kernel's
# launch arguments, not pipeline flags.
# NOTE(review): Split is not defined in this part of the notebook. It
# presumably turns each CSV line into a dict; confirm it is defined in a cell
# above so the notebook survives Restart & Run All.
with beam.Pipeline(runner='DirectRunner') as p:
    file = p                  | "ReadLocalFile" >> beam.io.ReadFromText(local_file)
    table = file              | "CreateDictionary"  >> beam.ParDo(Split())
    clean_text = table        | "ProcessFields" >> beam.ParDo(CleanText())
    clean_text                | "WriteLocalFile" >> beam.io.WriteToText('data/beam_output.txt')

## GCP Pipeline

In [24]:
# Standard-SQL query selecting the columns to clean from the source table.
query = (
    'SELECT\n'
    '  id,\n'
    '  title,\n'
    '  body,\n'
    '  tags\n'
    'FROM\n'
    '  `nlp-text-classification.stackoverflow.posts_p1_subset`'
)

In [22]:
# BigQuery schema for the output table: the four columns selected by `query`,
# with tags as a REPEATED (array) STRING column.
table_schema = {
    'fields': [
        {'name': column, 'type': bq_type, 'mode': mode}
        for column, bq_type, mode in (
            ('id', 'INTEGER', 'NULLABLE'),
            ('title', 'STRING', 'NULLABLE'),
            ('body', 'STRING', 'NULLABLE'),
            ('tags', 'STRING', 'REPEATED'),
        )
    ]
}
# Destination table for the cleaned rows, in the same project and dataset as the source.
new_table = beam.io.gcp.internal.clients.bigquery.TableReference(
    tableId='posts_p2_subset',
    datasetId='stackoverflow',
    projectId='nlp-text-classification',
)

In [23]:
# Dataflow job: run `query` against BigQuery, clean every row with CleanText,
# and write the result to `new_table` (WRITE_TRUNCATE / CREATE_IF_NEEDED).
with beam.Pipeline(options=pipeline_options) as p:
    source = beam.io.BigQuerySource(query=query, use_standard_sql=True)
    sink = beam.io.WriteToBigQuery(
        new_table,
        schema=table_schema,
        create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
        write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE,
    )
    table = p | "QueryTable" >> beam.io.Read(source)
    clean_text = table | "Preprocessing" >> beam.ParDo(CleanText())
    clean_text | "WriteTable" >> sink



DataflowRuntimeException: Dataflow pipeline failed. State: FAILED, Error:
Traceback (most recent call last):
  File "/usr/local/lib/python3.6/site-packages/apache_beam/internal/pickler.py", line 261, in loads
    return dill.loads(s)
  File "/usr/local/lib/python3.6/site-packages/dill/_dill.py", line 317, in loads
    return load(file, ignore)
  File "/usr/local/lib/python3.6/site-packages/dill/_dill.py", line 305, in load
    obj = pik.load()
  File "/usr/local/lib/python3.6/site-packages/dill/_dill.py", line 577, in _load_type
    return _reverse_typemap[name]
KeyError: 'ClassType'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.6/site-packages/dataflow_worker/batchworker.py", line 649, in do_work
    work_executor.execute()
  File "/usr/local/lib/python3.6/site-packages/dataflow_worker/executor.py", line 176, in execute
    op.start()
  File "apache_beam/runners/worker/operations.py", line 587, in apache_beam.runners.worker.operations.DoOperation.start
  File "apache_beam/runners/worker/operations.py", line 588, in apache_beam.runners.worker.operations.DoOperation.start
  File "apache_beam/runners/worker/operations.py", line 589, in apache_beam.runners.worker.operations.DoOperation.start
  File "apache_beam/runners/worker/operations.py", line 220, in apache_beam.runners.worker.operations.Operation.start
  File "apache_beam/runners/worker/operations.py", line 224, in apache_beam.runners.worker.operations.Operation.start
  File "apache_beam/runners/worker/operations.py", line 535, in apache_beam.runners.worker.operations.DoOperation.setup
  File "apache_beam/runners/worker/operations.py", line 540, in apache_beam.runners.worker.operations.DoOperation.setup
  File "/usr/local/lib/python3.6/site-packages/apache_beam/internal/pickler.py", line 265, in loads
    return dill.loads(s)
  File "/usr/local/lib/python3.6/site-packages/dill/_dill.py", line 317, in loads
    return load(file, ignore)
  File "/usr/local/lib/python3.6/site-packages/dill/_dill.py", line 305, in load
    obj = pik.load()
  File "/usr/local/lib/python3.6/site-packages/dill/_dill.py", line 577, in _load_type
    return _reverse_typemap[name]
KeyError: 'ClassType'
