In [16]:
from IPython.core.display import display, HTML
display(HTML("<style>.container { width:100% !important; }</style>"))

In [11]:
import sys
import os.path
import logging

import os
from six.moves import urllib
import tempfile

import math
import numpy as np
import pandas as pd
import tensorflow as tf

from enum import Enum

import datetime
from tensorflow import keras
from tensorflow.keras.callbacks import *

from google.cloud import bigquery
from google.api_core.exceptions import GoogleAPIError

from tensorflow.python.framework import ops
from tensorflow.python.framework import dtypes
from tensorflow_io.bigquery import BigQueryClient
from tensorflow_io.bigquery import BigQueryReadSession
from tensorflow.python.client import device_lib

from tensorflow.python.data.experimental.ops import interleave_ops
from tensorflow.python.data.ops import dataset_ops

import google.cloud.logging

import argparse

LOCATION = 'us'
PROJECT_ID = "alekseyv-scalableai-dev"


CSV_SCHEMA = [
      bigquery.SchemaField("label", "INTEGER", mode='REQUIRED'),
      bigquery.SchemaField("int1", "INTEGER"),
      bigquery.SchemaField("int2", "INTEGER"),
      bigquery.SchemaField("int3", "INTEGER"),
      bigquery.SchemaField("int4", "INTEGER"),
      bigquery.SchemaField("int5", "INTEGER"),
      bigquery.SchemaField("int6", "INTEGER"),
      bigquery.SchemaField("int7", "INTEGER"),
      bigquery.SchemaField("int8", "INTEGER"),
      bigquery.SchemaField("int9", "INTEGER"),
      bigquery.SchemaField("int10", "INTEGER"),
      bigquery.SchemaField("int11", "INTEGER"),
      bigquery.SchemaField("int12", "INTEGER"),
      bigquery.SchemaField("int13", "INTEGER"),
      bigquery.SchemaField("cat1", "STRING"),
      bigquery.SchemaField("cat2", "STRING"),
      bigquery.SchemaField("cat3", "STRING"),
      bigquery.SchemaField("cat4", "STRING"),
      bigquery.SchemaField("cat5", "STRING"),
      bigquery.SchemaField("cat6", "STRING"),
      bigquery.SchemaField("cat7", "STRING"),
      bigquery.SchemaField("cat8", "STRING"),
      bigquery.SchemaField("cat9", "STRING"),
      bigquery.SchemaField("cat10", "STRING"),
      bigquery.SchemaField("cat11", "STRING"),
      bigquery.SchemaField("cat12", "STRING"),
      bigquery.SchemaField("cat13", "STRING"),
      bigquery.SchemaField("cat14", "STRING"),
      bigquery.SchemaField("cat15", "STRING"),
      bigquery.SchemaField("cat16", "STRING"),
      bigquery.SchemaField("cat17", "STRING"),
      bigquery.SchemaField("cat18", "STRING"),
      bigquery.SchemaField("cat19", "STRING"),
      bigquery.SchemaField("cat20", "STRING"),
      bigquery.SchemaField("cat21", "STRING"),
      bigquery.SchemaField("cat22", "STRING"),
      bigquery.SchemaField("cat23", "STRING"),
      bigquery.SchemaField("cat24", "STRING"),
      bigquery.SchemaField("cat25", "STRING"),
      bigquery.SchemaField("cat26", "STRING")
  ]


In [35]:
def get_mean_and_std_dicts():
  #client = bigquery.Client(location="US", project=PROJECT_ID)
  client = bigquery.Client(project=PROJECT_ID)
  query = """
    select
    AVG(int1) as avg_int1, STDDEV(int1) as std_int1,
    AVG(int2) as avg_int2, STDDEV(int2) as std_int2,
    AVG(int3) as avg_int3, STDDEV(int3) as std_int3,
    AVG(int4) as avg_int4, STDDEV(int4) as std_int4,
    AVG(int5) as avg_int5, STDDEV(int5) as std_int5,
    AVG(int6) as avg_int6, STDDEV(int6) as std_int6,
    AVG(int7) as avg_int7, STDDEV(int7) as std_int7,
    AVG(int8) as avg_int8, STDDEV(int8) as std_int8,
    AVG(int9) as avg_int9, STDDEV(int9) as std_int9,
    AVG(int10) as avg_int10, STDDEV(int10) as std_int10,
    AVG(int11) as avg_int11, STDDEV(int11) as std_int11,
    AVG(int12) as avg_int12, STDDEV(int12) as std_int12,
    AVG(int13) as avg_int13, STDDEV(int13) as std_int13
    from `alekseyv-scalableai-dev.criteo_kaggle.days`
  """
  query_job = client.query(
      query,
      location="US",
  )  # API request - starts the query

  df = query_job.to_dataframe()
  #print(query_job.result())
  #print(query_job.errors)
  #print(df)

  mean_dict = dict((field[0].replace('avg_', ''), df[field[0]][0]) for field in df.items() if field[0].startswith('avg'))
  std_dict = dict((field[0].replace('std_', ''), df[field[0]][0]) for field in df.items() if field[0].startswith('std'))
  return (mean_dict, std_dict)

(mean_dict, std_dict) = get_mean_and_std_dicts()

print(mean_dict)
print(std_dict)



{'int1': 3.502413317075398, 'int2': 105.8484197976657, 'int3': 26.913041020611118, 'int4': 7.322680248873331, 'int5': 18538.99166487135, 'int6': 116.06185085211608, 'int7': 16.333130032135006, 'int8': 12.51704213755677, 'int9': 106.10982343805144, 'int10': 0.6175294977722182, 'int11': 2.7328343170173177, 'int12': 0.991035628772124, 'int13': 8.217461161174013}
{'int1': 9.429076407105086, 'int2': 391.4578226870704, 'int3': 397.97258302273474, 'int4': 8.793230712645805, 'int5': 69394.60184622335, 'int6': 382.5664493712364, 'int7': 66.04975524511708, 'int8': 16.688884567787582, 'int9': 220.28309398647906, 'int10': 0.6840505553977025, 'int11': 5.199070884811354, 'int12': 5.597723872237179, 'int13': 16.21193255817379}


In [54]:
@tf.function
def transofrom_row(row_dict, mean_dict, std_dict):
  tf.print(OrderedDict(row_dict))
  dict_without_label = row_dict.copy()
  #tf.print(dict_without_label)
  label = dict_without_label.pop('label')
  for field in CSV_SCHEMA:
    if (field.name.startswith('int')):
        if dict_without_label[field.name] == 0:
            value = float(dict_without_label[field.name])
            dict_without_label[field.name] = (value - mean_dict[field.name]) / std_dict[field.name]
        else:
            dict_without_label[field.name] = 0.0 # don't use normalized 0 value for nulls

  tf.print(OrderedDict(dict_without_label)
  return (dict_without_label, label)

def read_bigquery(dataset_id, table_name):


  tensorflow_io_bigquery_client = BigQueryClient()
  read_session = tensorflow_io_bigquery_client.read_session(
      "projects/" + PROJECT_ID,
      PROJECT_ID, table_name, dataset_id,
      list(field.name for field in CSV_SCHEMA),
      list(dtypes.int64 if field.field_type == 'INTEGER'
           else dtypes.string for field in CSV_SCHEMA),
      requested_streams=10)

  #dataset = read_session.parallel_read_rows()

  streams = read_session.get_streams()
  tf.print('bq streams: !!!!!!!!!!!!!!!!!!!!!!')
  tf.print(streams)
  streams_count = 10 # len(streams)
  #streams_count = read_session.get_streams().shape
  tf.print('big query read session returned {} streams'.format(streams_count))

  streams_ds = dataset_ops.Dataset.from_tensor_slices(streams).shuffle(buffer_size=streams_count)
  dataset = streams_ds.interleave(
            read_session.read_rows,
            cycle_length=streams_count,
            num_parallel_calls=streams_count)
  transformed_ds = dataset.map (lambda row: transofrom_row(row, mean_dict, std_dict), num_parallel_calls=streams_count)

  # Interleave dataset is not shardable, turning off sharding
  # See https://www.tensorflow.org/tutorials/distribute/multi_worker_with_keras#dataset_sharding_and_batch_size
  # Instead we are shuffling data.
  options = tf.data.Options()
#  options.experimental_distribute.auto_shard_policy = tf.data.experimental.AutoShardPolicy.OFF
  result = transformed_ds.with_options(options)
  tf.print(str(result))
  return result



SyntaxError: invalid syntax (<ipython-input-54-c5091cb2fe50>, line 18)

In [164]:
from collections import OrderedDict 

@tf.function
def transoform_row(row_dict, mean_dict, std_dict):
  #dict_without_label = row_dict.copy() - OrderedDict.copy does not work in AutoGraph
  dict_without_label = dict(row_dict)
  label = dict_without_label.pop('label')
  for field in CSV_SCHEMA:
    if (field.name.startswith('int')):
        if dict_without_label[field.name] != 0:
            value = float(dict_without_label[field.name])
            dict_without_label[field.name] = (value - mean_dict[field.name]) / std_dict[field.name]
        else:
            dict_without_label[field.name] = 0.0 # don't use normalized 0 value for nulls
  return (dict_without_label, label)
           
def read_gcs(table_name, **kwargs):
  gcs_filename_glob = 'gs://alekseyv-scalableai-dev-public-bucket/criteo_kaggle_from_bq/{}*'.format(table_name)
  selected_columns = list(field.name for field in CSV_SCHEMA)
  column_names = selected_columns + ['row_hash']

  print(column_names)

  dataset = tf.data.experimental.make_csv_dataset(
      gcs_filename_glob,
      batch_size=1,
      column_names = column_names,
      select_columns = selected_columns,
      num_epochs=1,
      field_delim='\t',
      header=False,
      ignore_errors=False,
      **kwargs)

    #.apply(tf.data.experimental.unbatch()) \
  (mean_dict, std_dict) = get_mean_and_std_dicts()
  transformed_ds = dataset.unbatch().map (lambda row: transofrom_row(row, mean_dict, std_dict))

  return transformed_ds

dataset = read_gcs('test_small')
row_index = 0
for row,label in dataset.take(1):
  print("row %d: %s \n\n" % (row_index, row))
  row_index += 1
    
tf.autograph.to_code(transofrom_row.python_function)

['label', 'int1', 'int2', 'int3', 'int4', 'int5', 'int6', 'int7', 'int8', 'int9', 'int10', 'int11', 'int12', 'int13', 'cat1', 'cat2', 'cat3', 'cat4', 'cat5', 'cat6', 'cat7', 'cat8', 'cat9', 'cat10', 'cat11', 'cat12', 'cat13', 'cat14', 'cat15', 'cat16', 'cat17', 'cat18', 'cat19', 'cat20', 'cat21', 'cat22', 'cat23', 'cat24', 'cat25', 'cat26', 'row_hash']




row 0: {'int1': <tf.Tensor: id=14934920, shape=(), dtype=float32, numpy=-0.15933833>, 'int2': <tf.Tensor: id=14934925, shape=(), dtype=float32, numpy=-0.26784092>, 'int3': <tf.Tensor: id=14934926, shape=(), dtype=float32, numpy=-0.052548945>, 'int4': <tf.Tensor: id=14934927, shape=(), dtype=float32, numpy=0.19075122>, 'int5': <tf.Tensor: id=14934928, shape=(), dtype=float32, numpy=-0.2663895>, 'int6': <tf.Tensor: id=14934929, shape=(), dtype=float32, numpy=-0.27985165>, 'int7': <tf.Tensor: id=14934930, shape=(), dtype=float32, numpy=-0.21700503>, 'int8': <tf.Tensor: id=14934931, shape=(), dtype=float32, numpy=-0.39050195>, 'int9': <tf.Tensor: id=14934932, shape=(), dtype=float32, numpy=-0.44084102>, 'int10': <tf.Tensor: id=14934921, shape=(), dtype=float32, numpy=0.5591261>, 'int11': <tf.Tensor: id=14934922, shape=(), dtype=float32, numpy=-0.33329692>, 'int12': <tf.Tensor: id=14934923, shape=(), dtype=float32, numpy=0.0>, 'int13': <tf.Tensor: id=14934924, shape=(), dtype=float32, numpy

AttributeError: 'function' object has no attribute 'python_function'

In [168]:
def transform_row(row_dict, mean_dict, std_dict):
  #dict_without_label = row_dict.copy() - OrderedDict.copy does not work in AutoGraph
  dict_without_label = dict(row_dict)
  label = dict_without_label.pop('label')
  for field in CSV_SCHEMA:
    if (field.name.startswith('int')):
        if dict_without_label[field.name] != 0:
            value = float(dict_without_label[field.name])
            dict_without_label[field.name] = (value - mean_dict[field.name]) / std_dict[field.name]
        else:
            dict_without_label[field.name] = 0.0 # don't use normalized 0 value for nulls
  return (dict_without_label, label)

def transofrom_row_gcs(row_tuple, mean_dict, std_dict):
    row_dict = dict(zip(list(field.name for field in CSV_SCHEMA) + ['row_hash'], list(row_tuple)))
    row_dict.pop('row_hash')
    return transform_row(row_dict, mean_dict, std_dict)

def read_gcs(table_name):
  gcs_filename_glob = 'gs://alekseyv-scalableai-dev-public-bucket/criteo_kaggle_from_bq/{}.csv'.format(table_name)
  record_defaults = list(tf.int32 if field.name == 'label' else tf.constant(0, dtype=tf.int32) if field.name.startswith('int') else tf.constant('', dtype=tf.string) for field in CSV_SCHEMA) + [tf.string]
  dataset = tf.data.experimental.CsvDataset(
      gcs_filename_glob,
      record_defaults,
      field_delim='\t',
      header=False)
  (mean_dict, std_dict) = get_mean_and_std_dicts()
  transformed_ds = dataset.map (lambda *row_tuple: transofrom_row_gcs(row_tuple, mean_dict, std_dict))
  return transformed_ds

dataset = read_gcs('test_small')
row_index = 0
for row in dataset.take(1):
  print("row %d: %s \n\n" % (row_index, row))
  row_index += 1
    




row 0: ({'int1': <tf.Tensor: id=14936090, shape=(), dtype=float32, numpy=-0.053283405>, 'int2': <tf.Tensor: id=14936095, shape=(), dtype=float32, numpy=0.0>, 'int3': <tf.Tensor: id=14936096, shape=(), dtype=float32, numpy=-0.05003621>, 'int4': <tf.Tensor: id=14936097, shape=(), dtype=float32, numpy=0.53192276>, 'int5': <tf.Tensor: id=14936098, shape=(), dtype=float32, numpy=-0.26637506>, 'int6': <tf.Tensor: id=14936099, shape=(), dtype=float32, numpy=-0.25632632>, 'int7': <tf.Tensor: id=14936100, shape=(), dtype=float32, numpy=-0.17158473>, 'int8': <tf.Tensor: id=14936101, shape=(), dtype=float32, numpy=0.44837976>, 'int9': <tf.Tensor: id=14936102, shape=(), dtype=float32, numpy=-0.17300385>, 'int10': <tf.Tensor: id=14936091, shape=(), dtype=float32, numpy=0.5591261>, 'int11': <tf.Tensor: id=14936092, shape=(), dtype=float32, numpy=-0.14095487>, 'int12': <tf.Tensor: id=14936093, shape=(), dtype=float32, numpy=0.0>, 'int13': <tf.Tensor: id=14936094, shape=(), dtype=float32, numpy=0.2333

In [143]:
tpl = (1, 3, 6)

def print_tpl(tpla):
    print(tpla)
    
print_tpl(tpl)

(1, 3, 6)


In [109]:
!gsutil cat -r 0-500 gs://alekseyv-scalableai-dev-public-bucket/criteo_kaggle_from_bq/test_small.csv

0	3	0	7	12	54	18	5	20	68	1	2	0	12	8cf07265	38a947a1	642efc0f	48ee52c9	25c83c98	7e0ccccf	c7575e05	0b153874	a73ee510	e286f1e6	fd7856c1	7cad8267	6a430a5b	b28479f6	4e8979f6	da76182e	e5ba7672	002c3270			f492dbbf		3a171ecb	ba02f03a			6264987978780750083
0		0	3	1	266290			2					1	05db9164	38a947a1	78a10995	a5a18f25	4cf72387	7e0ccccf	4b3c7cfe	51d76abe	7cc72ec2	f6fd64a2	8b94178b	a79b473e	025225f2	b28479f6	4f047de8	9f3b50db	07c540c4	002c3270			f14a2f09	ad3062eb	32c7478e	ba02f03a			3598378017670020087
0	1	4