# test big query view access for data scientist
for info: Filippo Burnelli 
-  test to make sure d.s. can access bigquery view
- https://beam.apache.org/documentation/io/built-in/google-bigquery/#storage-api (see limitations)

In [1]:
BUCKET = 'axa-ch-machine-learning-poc-dev'
PROJECT = 'axa-ch-machine-learning-dev'
REGION = 'europe-west1'
ZONE = 'europe-west6-a'

In [2]:
import os
os.environ['BUCKET'] = BUCKET
os.environ['PROJECT'] = PROJECT
os.environ['REGION'] = REGION


## Access a view for exploration

In [5]:
# Create SQL query 
""" CREATE OR REPLACE VIEW `axa-ch-machine-learning-dev.vw_frenc_claims.vw_freMTPL2freq` AS select   vehAge,drivAge from `axa-ch-machine-learning-dev.french_claims.freMTPL2freq`;"""

query = """SELECT *  FROM `axa-ch-machine-learning-dev.vw_frenc_claims.vw_freMTPL2freq`  """

# Call BigQuery and examine in dataframe
import google.datalab.bigquery as bq
df = bq.Query(query + " LIMIT 100").execute().result().to_dataframe()
df.head()

Unnamed: 0,vehAge,drivAge
0,0,36
1,0,36
2,11,35
3,18,44
4,12,31


## Beam processing

In [25]:
import apache_beam as beam
import datetime, os

def to_csv(rowdict):
  s=''
  #for key, value in rowdict.iteritems():
  #      s=s+value+','
  s=','.join([str(rowdict[k]) for k,v in rowdict.iteritems()])
  yield s
    
def preprocess(in_test_mode,query):
  import shutil, os, subprocess
  job_name = 'preprocess-french-claims-features' + '-' + datetime.datetime.now().strftime('%y%m%d-%H%M%S')
  
  if in_test_mode:
      print('Launching local job ... hang on')
      OUTPUT_DIR = './preproc'
      shutil.rmtree(OUTPUT_DIR, ignore_errors=True)
      os.makedirs(OUTPUT_DIR)
  else:
      print('Launching Dataflow job {} ... hang on'.format(job_name))
      OUTPUT_DIR = 'gs://{0}/project/test-bq-vw/'.format(BUCKET)
      try:
        subprocess.check_call('gsutil -m rm -r {}'.format(OUTPUT_DIR).split())
      except:
        pass
    
    
  options = {'staging_location': os.path.join(OUTPUT_DIR, 'tmp', 'staging'),
              'temp_location': os.path.join(OUTPUT_DIR, 'tmp'),
              'job_name': job_name,
              'project': PROJECT,
              'region': REGION,
              'zone': ZONE,
              'teardown_policy': 'TEARDOWN_ALWAYS',
              'no_save_main_session': True,
              'max_num_workers': 6  }

  opts = beam.pipeline.PipelineOptions(flags = [], **options)
  if in_test_mode:
      RUNNER = 'DirectRunner'
  else:
      RUNNER = 'DataflowRunner'

  p = beam.Pipeline(RUNNER, options = opts)
  

  if in_test_mode:
    query = query + ' LIMIT 1000' 

  for step in ['train', 'eval']:
    if step == 'train':
      selquery = 'SELECT * FROM ({}) WHERE MOD(ABS(drivAge),10) < 2'.format(query)
    else:
      selquery = 'SELECT * FROM ({}) WHERE MOD(ABS(drivAge),4) >2 '.format(query)

    (p 
     | '{}_read'.format(step) >> beam.io.Read(beam.io.BigQuerySource(query = selquery, use_standard_sql = True))
     | '{}_csv'.format(step) >> beam.FlatMap(to_csv)
     | '{}_out'.format(step) >> beam.io.Write(beam.io.WriteToText(os.path.join(OUTPUT_DIR, '{}.csv'.format(step))))
    )


  job = p.run()
  if in_test_mode:
    job.wait_until_finish()
    print("Done!")


## Access a view through Apache Beam local

In [16]:
query = """ SELECT *  FROM `axa-ch-machine-learning-dev.vw_frenc_claims.vw_freMTPL2freq`  """
preprocess(in_test_mode = True,query)

Launching local job ... hang on




Done!


In [18]:
!ls preproc

eval.csv-00000-of-00001  train.csv-00000-of-00001


In [19]:
!head -10 preproc/eval.csv-00000-of-00001 

23,0
23,0
23,0
27,0
31,0
31,0
35,0
35,0
39,0
47,0


## Access a view through Apache Beam Remote (test scaling)

In [20]:
query = """ SELECT *  FROM `axa-ch-machine-learning-dev.vw_frenc_claims.vw_freMTPL2freq`  """
preprocess(in_test_mode = False,query)

Launching Dataflow job preprocess-french-claims-features-190508-071853 ... hang on


In [21]:
!gsutil ls gs://axa-ch-machine-learning-poc-dev/project/test-bq-vw/

gs://axa-ch-machine-learning-poc-dev/project/test-bq-vw/eval.csv-00000-of-00002
gs://axa-ch-machine-learning-poc-dev/project/test-bq-vw/eval.csv-00001-of-00002
gs://axa-ch-machine-learning-poc-dev/project/test-bq-vw/train.csv-00000-of-00001
gs://axa-ch-machine-learning-poc-dev/project/test-bq-vw/tmp/


#### OK but the job ran on only 1 worker ( jobid 2019-05-08_00_18_59-16469127876986107248)
so i try with a view 10 x bigger 

In [27]:
# Create SQL query 
#Link to the View  https://console.cloud.google.com/bigquery?sq=1009147027531:98d6376b193041c69d436c726b40af8f
query = """ SELECT *  FROM `axa-ch-machine-learning-dev.vw_frenc_claims.vw_freMTPL2freq_x_10`  """
preprocess(in_test_mode = False,query=query)


Launching Dataflow job preprocess-french-claims-features-190508-073935 ... hang on


In [28]:
!gsutil ls gs://axa-ch-machine-learning-poc-dev/project/test-bq-vw/

gs://axa-ch-machine-learning-poc-dev/project/test-bq-vw/tmp/


#### ok code  worked with 5 Worker

open points
- check without max_num_workers = 6
- check why workks
- check with some view from Data engineering