<a href="https://colab.research.google.com/github/shinchan75034/TFE_BigQuery/blob/master/GCP_TFE2_BigQuery_Colab_Authentication.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

### Reference
I had to change how the one-shot iterator is created: in TensorFlow 2, `make_one_shot_iterator` is only available under `tf.compat.v1.data` (that is, `tf.compat.v1.data.make_one_shot_iterator`). <br />
This notebook is adapted from [this Google Cloud blog post](https://cloud.google.com/blog/products/ai-machine-learning/tensorflow-enterprise-makes-accessing-data-on-google-cloud-faster-and-easier "TFE and BigQuery").

### Install TensorFlow IO
This step is necessary in Colab. In Google Cloud environments such as an AI Platform notebook or a Deep Learning VM, TensorFlow IO is already installed as part of the TensorFlow Enterprise distribution.
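Before running `pip install`, a minimal sketch like the following can check whether TensorFlow IO is already present (for example on a TensorFlow Enterprise image). It assumes Python 3.8+ for `importlib.metadata` (older runtimes can use the `importlib_metadata` backport); the helper name `installed_version` is hypothetical.

```python
from importlib import metadata
from typing import Optional


def installed_version(dist_name: str) -> Optional[str]:
    """Return the installed version of a pip distribution, or None if it is absent."""
    try:
        return metadata.version(dist_name)
    except metadata.PackageNotFoundError:
        return None


# In Colab, run `!pip install tensorflow-io` only if this prints None.
print("tensorflow-io:", installed_version("tensorflow-io"))
```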

In [0]:
 !pip install tensorflow-io

Collecting tensorflow-io
  Downloading https://files.pythonhosted.org/packages/28/d5/fed76bdd291d3ada5b159e47638fb639dd01bfb3361a58b00fb61f60682f/tensorflow_io-0.13.0-cp36-cp36m-manylinux2010_x86_64.whl (20.9MB)
Installing collected packages: tensorflow-io
Successfully installed tensorflow-io-0.13.0


In [0]:
!ls -lrt

total 4
drwxr-xr-x 1 root root 4096 May 13 16:29 sample_data


In [0]:

from google.colab import auth
auth.authenticate_user()
print('Authenticated')

Authenticated


Set the project ID and the `GCLOUD_PROJECT` environment variable

In [0]:
PROJECT_ID = "project1-190517" #@param {type:"string"}
! gcloud config set project $PROJECT_ID
%env GCLOUD_PROJECT=$PROJECT_ID

Updated property [core/project].
env: GCLOUD_PROJECT=project1-190517
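`! gcloud config set project` only updates the gcloud CLI configuration, while IPython's `%env NAME=value` magic sets the variable in the kernel's own `os.environ`. A plain-Python sketch of the same step follows, assuming the project ID above. To our understanding, the `google-auth` library also consults `GCLOUD_PROJECT` (a legacy name for `GOOGLE_CLOUD_PROJECT`) when it infers a default project.

```python
import os

PROJECT_ID = "project1-190517"  # same value as the notebook cell above

# Plain-Python equivalent of `%env GCLOUD_PROJECT=$PROJECT_ID`: the variable is
# set in this process and inherited by subprocesses started afterwards.
os.environ["GCLOUD_PROJECT"] = PROJECT_ID

# Later cells can read the project back instead of hard-coding it again.
project_from_env = os.environ.get("GCLOUD_PROJECT", "")
print(project_from_env)
```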


In [0]:
!ls -lrt

total 8
drwxr-xr-x 1 root root 4096 May 13 16:29 sample_data
-rw-r--r-- 1 root root 2664 May 19 21:31 adc.json


### Authentication 


A Colab notebook requires the extra authentication steps below. You don't need them when running in a Google Cloud environment such as an AI Platform notebook, because there you are already authenticated to your Google Cloud project.

Following the instructions [here](https://cloud.google.com/iam/docs/creating-managing-service-account-keys#iam-service-account-keys-create-console  "Google Cloud IAM & Admins"), go to the IAM & Admin section of the Google Cloud Console, choose a service account, create a key, and download the key in JSON format to your local computer. Then upload it to the Colab notebook with the cell below.

In [0]:

from google.colab import files
uploaded = files.upload()

Saving project1-190517-858599adc951.json to project1-190517-858599adc951.json


In [0]:
!rm -f service_account.json  # -f: no error if the link does not exist yet
!ln -s project1-190517-858599adc951.json service_account.json


In [0]:
!ls -lrt ./service_account.json

lrwxrwxrwx 1 root root 33 May 19 21:31 ./service_account.json -> project1-190517-858599adc951.json


In [0]:
!cat ./service_account.json  # Caution: this prints the private key; clear the output before sharing the notebook.
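Printing the whole key file exposes the private key in the notebook output. A minimal sketch that checks only the non-secret fields instead is shown below. It assumes the standard service-account key layout (`type`, `project_id`, `client_email`, `private_key`, ...); the helper `describe_key_file` is hypothetical.

```python
import json
from pathlib import Path

# Fields of a service-account key file that are safe to display.
SAFE_FIELDS = ("type", "project_id", "client_email")


def describe_key_file(path: str) -> dict:
    """Load a service-account key and return only its non-secret fields."""
    info = json.loads(Path(path).read_text())
    if info.get("type") != "service_account":
        raise ValueError(f"{path} is not a service-account key (type={info.get('type')!r})")
    missing = [f for f in ("private_key", "client_email", "project_id") if f not in info]
    if missing:
        raise ValueError(f"{path} is missing fields: {missing}")
    return {field: info[field] for field in SAFE_FIELDS}


# Usage in the notebook, instead of `!cat`:
# print(describe_key_file("./service_account.json"))
```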

In [0]:
from google.cloud import bigquery
from google.oauth2 import service_account

# TODO(developer): Set key_path to the path to the service account key
#                  file.
key_path = "./service_account.json"

credentials = service_account.Credentials.from_service_account_file(
    key_path,
    scopes=["https://www.googleapis.com/auth/cloud-platform"],
)

client = bigquery.Client(
    credentials=credentials,
    project=credentials.project_id,
)

In [0]:
client

<google.cloud.bigquery.client.Client at 0x7fead431e6d8>

This completes the client authentication steps this Colab notebook needs to use the BigQuery API.

### Run BigQuery API without using TensorFlow IO
The first approach calls the BigQuery API directly from Python: the client submits the query, BigQuery executes it, and the full result is downloaded into a pandas DataFrame.

In [0]:
project_id = "project1-190517"

In [0]:
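from google.cloud import bigquery

# Note: this replaces the service-account client created earlier. Without explicit
# credentials, the client uses Application Default Credentials (in Colab, the user
# credentials from auth.authenticate_user()).
client = bigquery.Client(project=project_id)

sample_count = 2000
# Total number of rows, used to compute the sampling fraction.
row_count = client.query('''
  SELECT 
    COUNT(*) as total
  FROM `bigquery-public-data.samples.gsod`''').to_dataframe().total[0]

# Keep each row with probability sample_count / row_count; this returns
# roughly (not exactly) sample_count rows.
df = client.query('''
  SELECT
    *
  FROM
    `bigquery-public-data.samples.gsod`
  WHERE RAND() < %d/%d
''' % (sample_count, row_count)).to_dataframe()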

print('Full dataset has %d rows' % row_count)

Full dataset has 114420316 rows
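The `WHERE RAND() < sample_count/row_count` filter keeps each row independently with that probability, so the sample size is only approximately `sample_count` (the `count` row of `df.describe()` below shows 1946 rows rather than 2000). A pure-Python simulation of the same idea follows; the row counts are illustrative assumptions, not the GSOD table size.

```python
import random


def bernoulli_sample(num_rows: int, target: int, seed: int = 0) -> int:
    """Simulate `WHERE RAND() < target/num_rows` and return how many rows are kept."""
    rng = random.Random(seed)
    fraction = target / num_rows  # BigQuery's `/` is float division too
    # Each row is kept independently with probability `fraction`.
    return sum(1 for _ in range(num_rows) if rng.random() < fraction)


kept = bernoulli_sample(num_rows=200_000, target=2_000)
print(kept)  # close to 2,000, but generally not exactly 2,000
```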


In [0]:
df.describe()

| | station_number | wban_number | year | month | day | mean_temp | num_mean_temp_samples | mean_dew_point | num_mean_dew_point_samples | mean_sealevel_pressure | num_mean_sealevel_pressure_samples | mean_station_pressure | num_mean_station_pressure_samples | mean_visibility | num_mean_visibility_samples | mean_wind_speed | num_mean_wind_speed_samples | max_sustained_wind_speed | max_gust_wind_speed | max_temperature | total_precipitation | snow_depth |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| count | 1946.0 | 1946.0 | 1946.0 | 1946.0 | 1946.0 | 1946.0 | 1946.0 | 1861.0 | 1861.0 | 1488.0 | 1488.0 | 717.0 | 717.0 | 1730.0 | 1730.0 | 1920.0 | 1920.0 | 1890.0 | 317.0 | 1945.0 | 1786.0 | 101.0 |
| mean | 500668.700925 | 89078.760021 | 1987.696814 | 6.594553 | 15.810894 | 51.513309 | 12.966598 | 40.438098 | 12.897367 | 1014.314717 | 11.594086 | 960.870013 | 12.152022 | 12.358786 | 12.63526 | 7.128958 | 12.895833 | 12.643862 | 25.454574 | 43.028946 | 0.066529 | 9.09901 |
| std | 299115.051121 | 27732.276606 | 15.581828 | 3.451992 | 8.834799 | 24.246043 | 7.951486 | 22.701581 | 7.968196 | 9.712622 | 7.622606 | 79.920111 | 7.886807 | 8.848615 | 7.875141 | 5.14076 | 7.913984 | 7.076316 | 8.712668 | 24.212716 | 0.298507 | 9.729198 |
| min | 10010.0 | 137.0 | 1938.0 | 1.0 | 1.0 | -90.400002 | 4.0 | -97.400002 | 4.0 | 950.0 | 4.0 | 603.400024 | 4.0 | 0.0 | 4.0 | 0.0 | 4.0 | 1.9 | 7.8 | -105.199997 | 0.0 | 0.4 |
| 25% | 238152.5 | 99999.0 | 1978.0 | 4.0 | 8.0 | 38.299999 | 7.0 | 28.6 | 7.0 | 1008.799988 | 6.0 | 949.5 | 6.0 | 6.6 | 6.0 | 3.7 | 7.0 | 7.8 | 19.4 | 31.5 | 0.0 | 2.0 |
| 50% | 529230.0 | 99999.0 | 1989.0 | 7.0 | 16.0 | 53.900002 | 8.0 | 43.099998 | 8.0 | 1014.099976 | 8.0 | 991.900024 | 8.0 | 9.9 | 8.0 | 6.1 | 8.0 | 11.7 | 23.299999 | 45.5 | 0.0 | 6.3 |
| 75% | 725107.75 | 99999.0 | 2001.0 | 10.0 | 23.0 | 69.800003 | 23.0 | 55.799999 | 23.0 | 1019.799988 | 22.0 | 1009.0 | 23.0 | 15.075 | 23.0 | 9.2 | 23.0 | 15.9 | 29.9 | 60.099998 | 0.01 | 14.2 |
| max | 999999.0 | 99999.0 | 2010.0 | 12.0 | 31.0 | 98.400002 | 24.0 | 81.400002 | 24.0 | 1051.800049 | 24.0 | 1037.800049 | 24.0 | 91.800003 | 24.0 | 62.200001 | 24.0 | 69.900002 | 62.200001 | 87.800003 | 7.24 | 63.0 |


In [0]:
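import pandas as pd

# Top 100 names in Texas, summed over all years in the usa_1910_2013 table.
# pd.read_gbq delegates to the pandas-gbq package (deprecated in pandas 2.2
# in favor of pandas_gbq.read_gbq).
df = pd.read_gbq('''
  SELECT name, SUM(number) as count
  FROM `bigquery-public-data.usa_names.usa_1910_2013`
  WHERE state = 'TX'
  GROUP BY name
  ORDER BY count DESC
  LIMIT 100
''', project_id=project_id, dialect='standard')

df.head()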

| | name | count |
|---|---|---|
| 0 | James | 272793 |
| 1 | John | 235139 |
| 2 | Michael | 225320 |
| 3 | Robert | 220399 |
| 4 | David | 219028 |


### Run BigQuery API with TensorFlow IO

For feeding BigQuery data to TensorFlow, it is better to call the BigQuery API through TensorFlow IO. TensorFlow IO returns a `tf.data.Dataset` that streams table rows (read through the BigQuery Storage API) instead of downloading the entire result at once, as the previous approach does. A `tf.data.Dataset` is TensorFlow's mechanism for streaming data to a model during training, which becomes necessary once the data is too large to fit in memory.
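The difference between materializing a result and streaming it can be illustrated without TensorFlow. The sketch below is a pure-Python analogy, not TensorFlow IO code: `row_source` is a hypothetical stand-in for a large table, and `batched` mimics the default behavior of `Dataset.batch()`, which keeps a final partial batch.

```python
from itertools import islice
from typing import Iterable, Iterator, List, TypeVar

T = TypeVar("T")


def batched(rows: Iterable[T], batch_size: int) -> Iterator[List[T]]:
    """Yield lists of up to `batch_size` items, pulling rows lazily."""
    it = iter(rows)
    while True:
        batch = list(islice(it, batch_size))
        if not batch:
            return
        yield batch  # the last batch may be shorter than batch_size


def row_source(n: int) -> Iterator[dict]:
    """Hypothetical stand-in for a table too large to load at once."""
    for i in range(n):
        yield {"id": i}


# Only one batch of rows is held in memory at a time.
stream = batched(row_source(10_000_000), batch_size=2048)
first = next(stream)
print(len(first), first[0], first[-1])
```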

In [0]:

import tensorflow as tf
from tensorflow_io.bigquery import BigQueryClient

In [0]:
PROJECT_ID = "project1-190517" # Replace with the ID of a project in your own Google Cloud account.
DATASET_GCP_PROJECT_ID = "bigquery-public-data"
DATASET_ID = "samples"
TABLE_ID = "wikipedia"

Let's wrap the client and read-session calls in a function. Note the tradeoffs. First, we must know the table schema (column names and types) in advance. Second, unlike the earlier approach without TensorFlow IO, we cannot run an ad hoc SQL query: we read selected columns from an existing table, optionally filtered with `row_restriction`. Third, we have to create a read session before reading any data.
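Because `read_session` takes the column names and their types as two parallel lists, a small helper can keep the schema in one ordered mapping so the lists cannot drift out of order. This is a sketch: `split_schema` and `WIKIPEDIA_SCHEMA` are hypothetical names, and strings stand in for `tf.string` / `tf.int64` so it runs without TensorFlow. In the notebook, the dictionary values would be the TensorFlow dtypes and the two lists would be passed as the fifth and sixth positional arguments.

```python
from typing import Dict, List, Tuple, TypeVar

D = TypeVar("D")


def split_schema(schema: Dict[str, D]) -> Tuple[List[str], List[D]]:
    """Split an ordered {column: dtype} mapping into parallel lists."""
    # Dicts preserve insertion order (Python 3.7+), so names[i] matches types[i].
    return list(schema.keys()), list(schema.values())


# Strings stand in for tf.string / tf.int64 in this sketch.
WIKIPEDIA_SCHEMA = {
    "title": "string",
    "id": "int64",
    "num_characters": "int64",
    "language": "string",
    "timestamp": "int64",
    "wp_namespace": "int64",
    "contributor_username": "string",
}

selected_fields, output_types = split_schema(WIKIPEDIA_SCHEMA)
print(selected_fields[1], output_types[1])  # id int64
```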

In [0]:
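# Use this function to create a BigQuery client and a read session, and
# return a batched tf.data.Dataset over the selected columns.
def run_bqsession(num_iterations):  # num_iterations is not used inside the function
  batch_size = 2048
  client = BigQueryClient()
  read_session = client.read_session(
      "projects/" + PROJECT_ID,  # parent project that owns the read session
      DATASET_GCP_PROJECT_ID, TABLE_ID, DATASET_ID,  # note: table ID comes before dataset ID
      # Columns to read...
      ["title",
       "id",
       "num_characters",
       "language",
       "timestamp",
       "wp_namespace",
       "contributor_username"],
      # ...and their TensorFlow types, in the same order.
      [tf.string,
       tf.int64,
       tf.int64,
       tf.string,
       tf.int64,
       tf.int64,
       tf.string],
      requested_streams=10  # ask for up to 10 parallel streams; the server may return fewer
  )

  # sloppy=True lets rows from different streams arrive in any order,
  # trading determinism for throughput.
  dataset = read_session.parallel_read_rows(sloppy=True).batch(batch_size)
  return dataset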

In [0]:
batcheddataset = run_bqsession(1000)

In [0]:
type(batcheddataset)

tensorflow.python.data.ops.dataset_ops.BatchDataset

### Examine dataset returned by TensorFlow IO Ops

In eager mode, a `tf.data.Dataset` is a Python iterable: `iter()` returns an iterator over its batches, and `next()` on that iterator returns the actual content of the first batch.
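The iterable/iterator distinction behaves the same way for plain Python containers. In the minimal analogy below, a list plays the role of the dataset (an assumption for illustration). Note that `next(iter(...))` builds a fresh iterator each time, so it restarts from the beginning rather than advancing.

```python
# A list, like a tf.data.Dataset in eager mode, is iterable but not an iterator.
batches = [["row0", "row1"], ["row2", "row3"]]

try:
    next(batches)  # TypeError: 'list' object is not an iterator
except TypeError as err:
    print("next() on the iterable fails:", err)

it = iter(batches)       # each iter() call starts a fresh pass from the beginning
first_batch = next(it)   # ['row0', 'row1']
second_batch = next(it)  # ['row2', 'row3']
print(first_batch, second_batch)
```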

In [0]:
next(iter(batcheddataset))

OrderedDict([('contributor_username',
              <tf.Tensor: shape=(2048,), dtype=string, numpy=array([b'', b'MSGJ', b'', ..., b'', b'', b''], dtype=object)>),
             ('id',
              <tf.Tensor: shape=(2048,), dtype=int64, numpy=array([ 1462053, 15851098,  6459333, ..., 21315055,  1533342, 10607447])>),
             ('language',
              <tf.Tensor: shape=(2048,), dtype=string, numpy=array([b'', b'', b'', ..., b'', b'', b''], dtype=object)>),
             ('num_characters',
              <tf.Tensor: shape=(2048,), dtype=int64, numpy=array([20009, 81301, 30391, ..., 16407, 11407,  2163])>),
             ('timestamp', <tf.Tensor: shape=(2048,), dtype=int64, numpy=
              array([1173977859, 1252849786, 1167244494, ..., 1175541864, 1262753759,
                     1184276488])>),
             ('title', <tf.Tensor: shape=(2048,), dtype=string, numpy=
              array([b'Strait of Messina Bridge', b'Template talk:WPBannerMeta',
                     b'2007 in musi

Next, let's run the same code directly instead of wrapping it in a function.

In [0]:
batch_size = 2048
client = BigQueryClient()
read_session = client.read_session(
    "projects/" + PROJECT_ID,
    DATASET_GCP_PROJECT_ID, TABLE_ID, DATASET_ID,
    ["title",
     "id",
     "num_characters",
     "language",
     "timestamp",
     "wp_namespace",
     "contributor_username"],
    [tf.string,
     tf.int64,
     tf.int64,
     tf.string,
     tf.int64,
     tf.int64,
     tf.string],
    requested_streams=10
)

In [0]:
dataset = read_session.parallel_read_rows(sloppy=True).batch(batch_size)

In [0]:
type(dataset)

tensorflow.python.data.ops.dataset_ops.BatchDataset

In [0]:
itr = tf.compat.v1.data.make_one_shot_iterator(
    dataset
)

Let's get a rough sense of the read throughput.

In [0]:
import time
n = 0
mini_batch = 100
num_iterations = 10000
for i in range(num_iterations // mini_batch):
    local_start = time.time()
    start_n = n
    for j in range(mini_batch):
        n += batch_size
        batch = itr.get_next()    
    local_end = time.time()
    print('Processed %d entries in %f seconds. [%f] examples/s' % (n - start_n, local_end - local_start, (mini_batch * batch_size) / (local_end - local_start)))
    

Processed 204800 entries in 6.675380 seconds. [30679.900312] examples/s
Processed 204800 entries in 0.803941 seconds. [254745.134064] examples/s
Processed 204800 entries in 0.786988 seconds. [260232.678021] examples/s
Processed 204800 entries in 0.794928 seconds. [257633.371784] examples/s
Processed 204800 entries in 0.785177 seconds. [260832.751855] examples/s
Processed 204800 entries in 0.800716 seconds. [255770.957061] examples/s
Processed 204800 entries in 0.794140 seconds. [257889.004568] examples/s
Processed 204800 entries in 0.806647 seconds. [253890.617887] examples/s
Processed 204800 entries in 0.789359 seconds. [259450.992445] examples/s
Processed 204800 entries in 0.804452 seconds. [254583.187009] examples/s
Processed 204800 entries in 0.814778 seconds. [251356.710134] examples/s
Processed 204800 entries in 0.854899 seconds. [239560.549236] examples/s
Processed 204800 entries in 0.813641 seconds. [251708.188344] examples/s
Processed 204800 entries in 0.856135 seconds. [23921
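The loop above counts `batch_size` rows per batch, which assumes every batch is full. A reusable sketch that counts the rows actually returned is shown below; `measure_throughput` is a hypothetical helper, and the notebook usage `batch_len=lambda b: int(b["id"].shape[0])` (batch dimension of the `id` column) is an assumption. The first group in the output above (about 6.7 s versus about 0.8 s afterwards) likely includes stream start-up, so one might discard it when comparing runs.

```python
import time
from typing import Any, Callable, Iterator, List


def measure_throughput(
    batches: Iterator[Any],
    num_groups: int,
    group_size: int,
    batch_len: Callable[[Any], int] = len,
) -> List[float]:
    """Pull `num_groups` groups of `group_size` batches; return rows/second per group."""
    rates = []
    for _ in range(num_groups):
        rows = 0
        start = time.perf_counter()
        for _ in range(group_size):
            try:
                batch = next(batches)  # the actual read happens here
            except StopIteration:
                break
            rows += batch_len(batch)  # count real rows; the last batch may be short
        elapsed = time.perf_counter() - start
        if rows == 0:  # the iterator is exhausted
            break
        rates.append(rows / elapsed if elapsed > 0 else float("inf"))
    return rates


# Demo with 300 fake batches of 2048 rows each.
demo = iter([[0] * 2048] * 300)
print(["%.0f rows/s" % r for r in measure_throughput(demo, num_groups=3, group_size=100)])
```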

Now let's read a different table, `covid19_usafacts.confirmed_cases`, stored in our own project.

In [0]:
DATASET_GCP_PROJECT_ID = 'project1-190517'
TABLE_ID = 'confirmed_cases'
DATASET_ID = 'covid19_usafacts'

In [0]:
read_session2 = client.read_session(
    "projects/" + PROJECT_ID,
    DATASET_GCP_PROJECT_ID, TABLE_ID, DATASET_ID,
    ["county_fips_code",
     "county_name",
     "state",
     "state_fips_code",
     "_1_22_20"],
    [tf.string,
     tf.string,
     tf.string,
     tf.string,
     tf.int64],
    requested_streams=10
)

In [0]:
dataset2 = read_session2.parallel_read_rows(sloppy=True).batch(batch_size)

In [0]:
itr2 = tf.compat.v1.data.make_one_shot_iterator(dataset2)

In [0]:
next(itr2)

OrderedDict([('_1_22_20',
              <tf.Tensor: shape=(2048,), dtype=int64, numpy=array([0, 0, 0, ..., 0, 0, 0])>),
             ('county_fips_code',
              <tf.Tensor: shape=(2048,), dtype=string, numpy=
              array([b'01073', b'06001', b'06013', ..., b'28003', b'48277', b'54081'],
                    dtype=object)>),
             ('county_name', <tf.Tensor: shape=(2048,), dtype=string, numpy=
              array([b'Jefferson County', b'Alameda County', b'Contra Costa County',
                     ..., b'Alcorn County', b'Lamar County', b'Raleigh County'],
                    dtype=object)>),
             ('state',
              <tf.Tensor: shape=(2048,), dtype=string, numpy=array([b'AL', b'CA', b'CA', ..., b'MS', b'TX', b'WV'], dtype=object)>),
             ('state_fips_code',
              <tf.Tensor: shape=(2048,), dtype=string, numpy=array([b'01', b'06', b'06', ..., b'28', b'48', b'54'], dtype=object)>)])
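As the output shows, `tf.string` columns come back as bytes (`b'AL'`), while keeping FIPS codes as strings preserves their leading zeros (`b'01073'`). A minimal sketch for turning a batch into readable Python values follows. The helper `decode_columns` is hypothetical, and the notebook usage in the comments assumes eager mode, where each tensor has `.numpy()`.

```python
from typing import Dict, List, Sequence


def decode_columns(batch: Dict[str, Sequence], encoding: str = "utf-8") -> Dict[str, List]:
    """Return a copy of a column -> values batch with bytes values decoded to str."""
    decoded = {}
    for name, values in batch.items():
        decoded[name] = [v.decode(encoding) if isinstance(v, bytes) else v for v in values]
    return decoded


# In the notebook (assumes eager mode):
# batch = next(itr2)
# readable = decode_columns({name: t.numpy() for name, t in batch.items()})

sample = {"state": [b"AL", b"CA"], "county_fips_code": [b"01073", b"06001"], "_1_22_20": [0, 0]}
print(decode_columns(sample))
```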