# Preprocessing using Beam/Dataflow
## Setup Working Directory

In [1]:
import sys
import os
import pathlib

def find_project_root(start, marker='notebook', max_levels=6):
    """Return the nearest directory at or above ``start`` that contains ``marker``.

    Arguments:
        -start: directory where the search begins.
        -marker: name of the directory entry that identifies the project root.
        -max_levels: maximum number of parent directories inspected above ``start``.
    Returns:
        The matching directory as a string, or None when none of the inspected
        directories contains ``marker``.
    """
    start_path = pathlib.Path(start)
    # ``parents`` is empty at the filesystem root; slicing the list (instead of
    # indexing ``parents[0]`` in a loop) therefore cannot raise IndexError.
    candidates = [start_path] + list(start_path.parents)[:max_levels]
    for candidate in candidates:
        try:
            entries = os.listdir(str(candidate))
        except OSError:
            # A directory that cannot be listed is treated as not holding the marker.
            continue
        if marker in entries:
            return str(candidate)
    return None


workingdir = find_project_root(os.getcwd())
if workingdir is None:
    # Marker not found: stay where we are rather than silently switching to an
    # unrelated ancestor directory.
    workingdir = os.getcwd()
    print("Directory 'notebook' not found at or above {}; keeping the current directory".format(workingdir),
          file=sys.stderr)
sys.path.insert(0, workingdir)
os.chdir(workingdir)

## Import libraries

In [7]:
import os
import logging
import subprocess
import datetime
import subprocess, requests
import apache_beam as beam
from google.cloud import bigquery
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import StandardOptions
from apache_beam.options.pipeline_options import GoogleCloudOptions

## Define GCP env variables

In [8]:
# Resolve the GCP settings through the gcloud CLI and export them as env variables.
def _gcloud_output(command):
    """Run ``command`` in a shell and return its stdout with every newline and
    carriage-return character removed. Raises CalledProcessError on a non-zero exit."""
    completed = subprocess.run(command, shell=True, check=True, stdout=subprocess.PIPE)
    text = completed.stdout.decode()
    for line_break in ('\n', '\r'):
        text = text.replace(line_break, '')
    return text


os.environ['PROJECT_ID'] = _gcloud_output('gcloud config list project --format "value(core.project)"')
os.environ['REGION'] = _gcloud_output('gcloud config get-value compute/region  2> /dev/null')

# bucket storage name
os.environ['BUCKET_NAME'] = 'axa-ch-machine-learning-poc-dev'

In [10]:
def missing_env_vars(names, environ=None):
    """Return the entries of ``names`` that are not defined in ``environ``.

    Arguments:
        -names: iterable of environment variable names to look up.
        -environ: mapping to check; defaults to ``os.environ``.
    Returns:
        List of the missing names, in the order they appear in ``names``.
    """
    if environ is None:
        environ = os.environ
    return [name for name in names if name not in environ]


# Environment variables this notebook checks for before running the pipeline.
EXPECTED_ENV_VARS = [
    'PROJECT_ID',
    'BUCKET_NAME',
    'REGION',
    'GOOGLE_APPLICATION_CREDENTIALS',
    'REQUESTS_CA_BUNDLE',
    'AXA_CH_CA_BUNDLE',
]

# Report only; a missing variable does not stop the notebook here.
for _missing_name in missing_env_vars(EXPECTED_ENV_VARS):
    print('Env variable {} not defined!'.format(_missing_name))

Env variable GOOGLE_APPLICATION_CREDENTIALS not defined!
Env variable REQUESTS_CA_BUNDLE not defined!
Env variable AXA_CA_CA_BUNDLE not defined!


## Preprocessing using Beam/Dataflow

### Define a query to get data

In [11]:
# define query table
def create_query(limit=100):
    """
    Build the standard-SQL query selecting rows from the public
    `bigquery-public-data.stackoverflow.tags` table.

    Arguments:
        -limit: maximum number of rows to return; non-negative int, defaults to 100.
    Returns:
        The query text as a string.
    Raises:
        TypeError: if limit is not an int.
        ValueError: if limit is negative.
    """
    # bool is a subclass of int; reject it (and anything non-int) so only a
    # plain integer can ever be interpolated into the SQL text.
    if isinstance(limit, bool) or not isinstance(limit, int):
        raise TypeError('limit must be an int, got {!r}'.format(limit))
    if limit < 0:
        raise ValueError('limit must be non-negative, got {}'.format(limit))

    query = """
    SELECT
    *
    FROM
    `bigquery-public-data.stackoverflow.tags`
    LIMIT {limit}
    """.format(limit=limit)

    return query

### Get the schema of the input table

In [12]:
# getting schema
! bq show --format=prettyjson bigquery-public-data:stackoverflow.tags 

{
  "creationTime": "1477491432274", 
  "etag": "MA4KCkNQ6uZukuWZs/YFNQ==", 
  "id": "bigquery-public-data:stackoverflow.tags", 
  "kind": "bigquery#table", 
  "lastModifiedTime": "1569811976063", 
  "location": "US", 
  "numBytes": "2284573", 
  "numLongTermBytes": "0", 
  "numRows": "55665", 
  "schema": {
    "fields": [
      {
        "mode": "NULLABLE", 
        "name": "id", 
        "type": "INTEGER"
      }, 
      {
        "mode": "NULLABLE", 
        "name": "tag_name", 
        "type": "STRING"
      }, 
      {
        "mode": "NULLABLE", 
        "name": "count", 
        "type": "INTEGER"
      }, 
      {
        "mode": "NULLABLE", 
        "name": "excerpt_post_id", 
        "type": "INTEGER"
      }, 
      {
        "mode": "NULLABLE", 
        "name": "wiki_post_id", 
        "type": "INTEGER"
      }
    ]
  }, 
  "selfLink": "https://bigquery.googleapis.com/bigquery/v2/projects/bigquery-public-data/datasets/stackoverflow/tables/tags", 
  "tableReference": {
    

In [13]:
# (column name, BigQuery type) pairs, in the same order as the schema printed by
# `bq show` for bigquery-public-data:stackoverflow.tags.
_schema_columns = (
    ('id', 'INTEGER'),
    ('tag_name', 'STRING'),
    ('count', 'INTEGER'),
    ('excerpt_post_id', 'INTEGER'),
    ('wiki_post_id', 'INTEGER'),
)

# Schema dictionary handed to the BigQuery sink; every column is NULLABLE.
table_schema = {
    'fields': [
        {'mode': 'NULLABLE', 'name': column_name, 'type': column_type}
        for column_name, column_type in _schema_columns
    ]
}

### Define the preprocessing steps

In [21]:
def preprocess():
    """
    Build and run a Beam pipeline that copies the result of create_query()
    into the BigQuery table test.test_stackoverflow_beam.

    Arguments:
        None. The project, region and bucket are read from the env variables
        PROJECT_ID, REGION and BUCKET_NAME. The runner is not set here; it is
        taken from the pipeline options (the default runner is used when it
        is missing).
    Side-effects:
        -Creates and executes the pipeline.
        -Creates the destination table if needed and truncates it before writing.
        -With the 'DataflowRunner' the job is only submitted; with any other
         runner the call blocks until the pipeline has finished.
        See https://beam.apache.org/documentation/programming-guide/#creating-a-pipeline
    Raises:
        KeyError: if PROJECT_ID, REGION or BUCKET_NAME is not defined.
    """
    job_name = 'test-stackoverflow' + '-' + datetime.datetime.now().strftime('%y%m%d-%H%M%S')
    project = os.environ['PROJECT_ID']
    region = os.environ['REGION']
    output_dir = "gs://{0}/stackoverflow/".format(os.environ['BUCKET_NAME'])
    # GCS URIs always use '/', so build them by string concatenation instead of
    # os.path.join, which would insert '\\' separators on Windows.
    temp_location = output_dir + 'tmp'
    staging_location = temp_location + '/staging'

    # options
    # NOTE(review): PipelineOptions() without arguments presumably picks up the
    # command-line flags (including the runner) - confirm against the Beam docs.
    options = PipelineOptions()
    google_cloud_options = options.view_as(GoogleCloudOptions)
    google_cloud_options.project = project
    google_cloud_options.job_name = job_name
    google_cloud_options.region = region
    google_cloud_options.staging_location = staging_location
    google_cloud_options.temp_location = temp_location
    # done by command line
    #options.view_as(StandardOptions).runner = 'DataflowRunner'

    # instantiate Pipeline object using PipelineOptions
    print('Launching Dataflow job {} ... hang on'.format(job_name))

    p = beam.Pipeline(options=options)
    output = p | 'Read from BigQuery' >> beam.io.Read(beam.io.BigQuerySource(
        # query
        query=create_query(),
        # use standard SQL for the above query
        use_standard_sql=True)
        )
    output | 'Write to BigQuery' >> beam.io.WriteToBigQuery(
        # The table name is a required argument for the BigQuery
        table='test_stackoverflow_beam',
        dataset='test',
        project=project,
        # Here we use the schema dictionary defined in this notebook (table_schema).
        # Specifying the schema allows the API to create the table correctly if it does not yet exist.
        schema=table_schema,
        # Creates the table in BigQuery if it does not yet exist.
        create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
        # Deletes all data in the BigQuery table before writing.
        write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE)

    if options.view_as(StandardOptions).runner == 'DataflowRunner':
        # The job is submitted without waiting for it to complete.
        print('DataflowRunner')
        p.run()
    else:
        # Any other (or missing) runner: block until the pipeline has finished.
        print('Default: DirectRunner')
        result = p.run()
        result.wait_until_finish()
    print('Done')

In [22]:
if __name__ == '__main__':
    # Switch the root logger to DEBUG so the pipeline's log records are shown.
    root_logger = logging.getLogger()
    root_logger.setLevel(logging.DEBUG)

    # Announce the start, then build and run the pipeline.
    print('Starting main process ...')
    preprocess()

INFO:root:Missing pipeline option (runner). Executing pipeline using the default runner: DirectRunner.
DEBUG:root:Unhandled type_constraint: Union[]
DEBUG:root:Unhandled type_constraint: Union[]
DEBUG:root:Unhandled type_constraint: Any
INFO:root:Running pipeline with DirectRunner.


Starting main process ...
Launching Dataflow job test-stackoverflow-191023-075445 ... hang on
Default: DirectRunner


INFO:root:Using location 'US' from table <TableReference
 datasetId: 'stackoverflow'
 projectId: 'bigquery-public-data'
 tableId: 'tags'> referenced by query 
    SELECT
    *
    FROM
    `bigquery-public-data.stackoverflow.tags`
    LIMIT 100
    
DEBUG:root:Creating or getting table <TableReference
 datasetId: 'test'
 projectId: 'axa-ch-machine-learning-dev'
 tableId: 'test_stackoverflow_beam'> with schema {'fields': [{'mode': 'NULLABLE', 'name': 'id', 'type': 'INTEGER'}, {'mode': 'NULLABLE', 'name': 'tag_name', 'type': 'STRING'}, {'mode': 'NULLABLE', 'name': 'count', 'type': 'INTEGER'}, {'mode': 'NULLABLE', 'name': 'excerpt_post_id', 'type': 'INTEGER'}, {'mode': 'NULLABLE', 'name': 'wiki_post_id', 'type': 'INTEGER'}]}.
DEBUG:root:Created the table with id test_stackoverflow_beam
INFO:root:Created table axa-ch-machine-learning-dev.test.test_stackoverflow_beam with schema <TableSchema
 fields: [<TableFieldSchema
 fields: []
 mode: 'NULLABLE'
 name: 'id'
 type: 'INTEGER'>, <TableField

Done
