In [1]:
!pip freeze | grep tensorflow==2.1

In [2]:
!pip install --user apache-beam[gcp]==2.16.0 
!pip install --user httplib2==0.12.0 

Collecting apache-beam[gcp]==2.16.0
  Downloading apache_beam-2.16.0-cp37-cp37m-manylinux1_x86_64.whl (3.0 MB)
[K     |████████████████████████████████| 3.0 MB 4.5 MB/s eta 0:00:01
[?25hCollecting dill<0.3.1,>=0.3.0
  Downloading dill-0.3.0.tar.gz (151 kB)
[K     |████████████████████████████████| 151 kB 50.5 MB/s eta 0:00:01
[?25hCollecting pyyaml<4.0.0,>=3.12
  Downloading PyYAML-3.13.tar.gz (270 kB)
[K     |████████████████████████████████| 270 kB 50.4 MB/s eta 0:00:01
[?25hCollecting httplib2<=0.12.0,>=0.8
  Downloading httplib2-0.12.0.tar.gz (218 kB)
[K     |████████████████████████████████| 218 kB 43.3 MB/s eta 0:00:01
[?25hCollecting fastavro<0.22,>=0.21.4
  Downloading fastavro-0.21.24-cp37-cp37m-manylinux1_x86_64.whl (1.2 MB)
[K     |████████████████████████████████| 1.2 MB 35.6 MB/s eta 0:00:01
Collecting mock<3.0.0,>=1.0.1
  Downloading mock-2.0.0-py2.py3-none-any.whl (56 kB)
[K     |████████████████████████████████| 56 kB 4.6 MB/s  eta 0:00:01
Collecting pyarrow<0

In [3]:
# Core libraries: TensorFlow for the model, Apache Beam for the preprocessing pipeline.
import tensorflow as tf
import apache_beam as beam
import shutil  # NOTE(review): not used by any visible cell; candidate for removal
print(tf.__version__)  # this kernel reports 2.2.0-dlenv

2.2.0-dlenv


In [4]:
import os

# --- GCP configuration: edit these values for your own project. ---
# Project id (used by `gcloud config set project` and the Beam pipeline options).
PROJECT = 'iotsensor-276409'
# Bucket name; use a regional bucket in the region selected below.
BUCKET = 'iotsensor-276409'
# Region in which Cloud AI Platform is available.
REGION = 'us-central1'

In [5]:
# Expose the notebook configuration as environment variables for %%bash cells.
# CLOUDSDK_PYTHON makes the gcloud SDK run under the python3 interpreter.
# NOTE(review): TFVERSION says '2.1' but In [3] reports TF 2.2.0 — confirm.
os.environ.update({
    'PROJECT': PROJECT,
    'BUCKET': BUCKET,
    'REGION': REGION,
    'TFVERSION': '2.1',
    'CLOUDSDK_PYTHON': 'python3',
})

In [6]:
%%bash
# Point gcloud at this notebook's project and region (from the exported env vars).
gcloud config set project "${PROJECT}"
gcloud config set compute/region "${REGION}"

# Make `gcloud ai-platform local predict` use the same python found on this PATH.
gcloud config set ml_engine/local_python "$(which python)"

Updated property [core/project].
Updated property [compute/region].
Updated property [ml_engine/local_python].


In [48]:
def create_query(phase, EVERY_N):
  """Return the legacy-SQL query that samples the Shrip_ML table for one split.

  Args:
    phase: One of 'train', 'valid' or 'test'.
    EVERY_N: Intended subsampling factor (None meant "full dataset").
      NOTE(review): it is not used to filter rows; every phase is sampled with
      RAND() regardless of its value. Kept for interface compatibility.

  Returns:
    A query string selecting all columns of
    [iotsensor-276409:shrip.Shrip_ML] for roughly 80% (train), 2% (valid) or
    20% (test) of the rows.

  Raises:
    ValueError: If `phase` is not a supported split (instead of silently
      returning None, which would only fail later inside the Beam read).
  """
  # NOTE(review): each phase draws its own independent RAND() sample, so the
  # splits overlap (valid/test rows can also be in train) and change on every
  # run. A deterministic, disjoint split needs bucketing on a hash of the row.
  sample_fraction = {'train': 0.8, 'valid': 0.02, 'test': 0.2}
  if phase not in sample_fraction:
    raise ValueError('Unknown phase {!r}; expected one of {}'.format(
        phase, sorted(sample_fraction)))

  # Square-bracket table references are BigQuery legacy-SQL syntax.
  return """
SELECT * FROM [iotsensor-276409:shrip.Shrip_ML] WHERE
  RAND() < {}
  """.format(sample_fraction[phase])

In [44]:
# Summary statistics of `df`.
# NOTE(review): `df` is not defined in any visible cell (this cell ran as In [44],
# out of order) — load the data explicitly in or before this cell so the
# notebook survives Restart & Run All.
df.describe()

Unnamed: 0,LUX,Temperature,Humidity,Moisture,a,b,c,e,y,nony,final
count,1005.0,1005.0,1005.0,1005.0,1005.0,1005.0,1005.0,1005.0,1005.0,1005.0,1005.0
mean,18489.970149,27.344279,38.38209,546.644776,9.697625,4.118435,0.383821,63.167505,729.091194,-10005.382405,0.38209
std,8327.609364,6.289883,10.459716,284.423759,0.541257,0.943601,0.104597,23.557544,186.446575,8535.436324,0.48614
min,4000.0,17.0,21.0,50.0,8.29405,2.558755,0.21,13.269222,238.510264,-27631.322811,0.0
25%,11610.0,22.0,29.0,294.0,9.359622,3.339849,0.29,43.979616,599.687988,-17320.269417,0.0
50%,18276.0,27.0,39.0,550.0,9.813344,4.21442,0.39,66.062673,722.541629,-9418.645635,0.0
75%,25687.0,33.0,47.0,794.0,10.15374,4.89279,0.47,83.587834,866.935231,-2212.685723,1.0
max,32977.0,38.0,58.0,1100.0,10.403566,5.71957,0.58,102.820156,1174.53386,3568.106955,1.0


In [49]:
import datetime

####
# Convert one BigQuery row into a single CSV line.
# Arguments:
#   -rowdict: Dictionary. The Beam BigQuery reader emits each row as a python
#     dict keyed by column name; extra keys are ignored.
# Returns:
#   A comma-separated string of the 11 columns listed below, in that fixed
#   order, each value rendered with str() (no type conversion is performed).
#   Raises KeyError if one of the columns is missing from the row.
# NOTE(review): a NULL field presumably arrives as None and would be written
#   as the text 'None' — confirm the table has no NULLs or the trainer copes.
####
def to_csv(rowdict):
  csv_columns = ('LUX', 'Temperature', 'Humidity', 'Moisture',
                 'a', 'b', 'c', 'e', 'y', 'nony', 'final')
  return ','.join(map(str, (rowdict[name] for name in csv_columns)))


####
# Build and run the Beam pipeline that exports the 'train' and 'valid' samples
# of the BigQuery table as sharded CSV files under gs://BUCKET/shrip/preproc/.
# Arguments:
#   -EVERY_N: Integer. Passed through to create_query().
#     NOTE(review): create_query() does not use it to filter rows, so it does
#     not currently change the sample size.
#   -RUNNER: 'DirectRunner' or 'DataflowRunner'. Specify to run the pipeline
#     locally or on Google Cloud respectively.
# Side-effects:
#   -Creates and executes the pipeline, writing train.csv-*-of-* and
#     valid.csv-*-of-* shards (via to_csv) into OUTPUT_DIR. Staging/temp files
#     go under OUTPUT_DIR/tmp.
#     See https://beam.apache.org/documentation/programming-guide/#creating-a-pipeline
####
def preprocess(EVERY_N, RUNNER):
  # Computed once and reused in the options, so the name printed here is the
  # name the job is submitted under (two datetime.now() calls can straddle a
  # second boundary and disagree).
  job_name = 'preprocess-taxifeatures' + '-' + datetime.datetime.now().strftime('%y%m%d-%H%M%S')
  print('Launching Dataflow job {} ... hang on'.format(job_name))
  OUTPUT_DIR = 'gs://{0}/shrip/preproc/'.format(BUCKET)

  # Pipeline options, passed as keyword arguments to PipelineOptions.
  options = {
    'staging_location': os.path.join(OUTPUT_DIR, 'tmp', 'staging'),
    'temp_location': os.path.join(OUTPUT_DIR, 'tmp'),
    'job_name': job_name,
    'project': PROJECT,
    'runner': RUNNER,
    'num_workers': 4,
    'max_num_workers': 5,
  }
  opts = beam.pipeline.PipelineOptions(flags=[], **options)

  # One read -> format -> write branch per split, all in a single pipeline.
  # The pipeline is run when the `with` block exits.
  with beam.Pipeline(options=opts) as p:
    for phase in ['train', 'valid']:
      query = create_query(phase, EVERY_N)
      outfile = os.path.join(OUTPUT_DIR, '{}.csv'.format(phase))
      (
        p | 'read_{}'.format(phase) >> beam.io.Read(beam.io.BigQuerySource(query=query))
          | 'tocsv_{}'.format(phase) >> beam.Map(to_csv)
          | 'write_{}'.format(phase) >> beam.io.WriteToText(outfile)
      )
  print("Done")

In [50]:
# Run the pipeline locally with the DirectRunner before launching on Dataflow.
preprocess(EVERY_N=10, RUNNER='DirectRunner')

Launching Dataflow job preprocess-taxifeatures-200803-081300 ... hang on




Done


In [52]:
%%bash
# List the shards written by the preprocessing pipeline.
gsutil ls "gs://${BUCKET}/shrip/preproc/"

gs://iotsensor-276409/shrip/preproc/train.csv-00000-of-00002
gs://iotsensor-276409/shrip/preproc/train.csv-00001-of-00002
gs://iotsensor-276409/shrip/preproc/valid.csv-00000-of-00001


In [53]:
%%bash
# Delete any previous preprocessing output (e.g. the DirectRunner shards) so the
# Dataflow run does not leave a mix of old and new shards.
# `gsutil ls <prefix>` exits non-zero when nothing matches. A bare `gsutil ls`
# lists only bucket names, so its output can never contain this object prefix.
if gsutil ls "gs://${BUCKET}/shrip/preproc/" > /dev/null 2>&1; then
  gsutil -m rm -rf "gs://${BUCKET}/shrip/preproc/"
fi

In [54]:
# Run the same pipeline on Google Cloud Dataflow.
preprocess(EVERY_N=10, RUNNER='DataflowRunner')

Launching Dataflow job preprocess-taxifeatures-200803-174542 ... hang on




Done


In [55]:
%%bash
# Peek at the first 10 rows of the first train.csv shard.
gsutil cat "gs://${BUCKET}/shrip/preproc/train.csv-00000-of-*" | head -n 10

13310,20,34,1006,9.49627091138916,3.01029995663981,0.34,97.1637983883894,481.785344599776,-24924.0182871429,0
23227,20,34,718,10.0530706740756,3.01029995663981,0.34,78.3857944036106,725.074193574004,-14396.0450239844,0
19026,21,34,765,9.8535617437664,3.33984878303764,0.34,81.6265928757481,655.993498505886,-16195.215332107,0
19951,21,34,64,9.90103454637506,3.33984878303764,0.34,15.7617499584421,1062.43622861327,3103.76642289904,1
27009,22,34,382,10.2039254227766,3.31132995230379,0.34,52.1805453326313,917.169876693429,-3951.47235366662,1
15686,23,34,254,9.66052387377261,3.46184495013578,0.34,39.9503733568546,860.115348346824,-1217.6330624735,1
6209,24,34,901,8.73375513136489,3.61235994796777,0.34,90.5955548856321,350.774255334111,-21996.9053094396,0
32088,27,34,669,10.3762374074499,4.29409129247696,0.34,74.9185659309488,817.399689488597,-12424.8921516596,0
12971,28,34,945,9.47047137534067,4.21441993929574,0.34,93.3824058812068,491.376981930749,-22816.3697931334,0
14220,28,34,434,9.562404

In [57]:
%%bash
# Recreate ./sample with the first shard of each split for quick local training.
[ -d sample ] && rm -rf sample
mkdir sample
for split in train valid; do
  gsutil cat "gs://${BUCKET}/shrip/preproc/${split}.csv-00000-of-*" > "sample/${split}.csv"
done

In [59]:
%%bash
# Show the feature-column definitions (plus the following 20 lines) in the trainer.
grep -A 20 'INPUT_COLUMNS =' shripfare/trainer/model.py

INPUT_COLUMNS = [
    tf.feature_column.numeric_column('LUX'),
    tf.feature_column.numeric_column('Temperature'),
    tf.feature_column.numeric_column('Humidity'),
    tf.feature_column.numeric_column('Moisture'),
    tf.feature_column.numeric_column('a'),
    tf.feature_column.numeric_column('b'),
    tf.feature_column.numeric_column('c'),
    tf.feature_column.numeric_column('e'),
    tf.feature_column.numeric_column('y'),
    tf.feature_column.numeric_column('nony')
    ]

def build_estimator(model_dir, nbuckets, hidden_units):
    run_config = tf.estimator.RunConfig(save_checkpoints_secs = 30,
                                        keep_checkpoint_max = 3)
    my_feature_columns = []
    for col in df_features.columns:
       my_feature_columns.append(tf.feature_column.numeric_column(col))
    estimator = tf.estimator.DNNClassifier(
        model_dir = model_dir,


In [82]:
%%bash
# Start from a clean output directory. The training log shows model_dir is
# shrip_trained/, and tf.estimator restores any checkpoint it finds there, so
# leftovers would be resumed instead of retrained. They would also add extra
# export directories.
rm -rf shrip_trained
export PYTHONPATH="${PYTHONPATH}:${PWD}/shripfare"
python -m trainer.task \
  --train_data_paths="${PWD}/sample/train.csv" \
  --eval_data_paths="${PWD}/sample/valid.csv" \
  --output_dir="${PWD}/shrip_trained" \
  --train_steps=10 \
  --job-dir=/tmp

INFO:tensorflow:Using default config.
INFO:tensorflow:Using config: {'_model_dir': '/home/jupyter/shrip_trained/', '_tf_random_seed': None, '_save_summary_steps': 100, '_save_checkpoints_steps': None, '_save_checkpoints_secs': 600, '_session_config': allow_soft_placement: true
graph_options {
  rewrite_options {
    meta_optimizer_iterations: ONE
  }
}
, '_keep_checkpoint_max': 5, '_keep_checkpoint_every_n_hours': 10000, '_log_step_count_steps': 100, '_train_distribute': None, '_device_fn': None, '_protocol': None, '_eval_distribute': None, '_experimental_distribute': None, '_experimental_max_worker_delay_secs': None, '_session_creation_timeout_secs': 7200, '_service': None, '_cluster_spec': ClusterSpec({}), '_task_type': 'worker', '_task_id': 0, '_global_id_in_cluster': 0, '_master': '', '_evaluation_master': '', '_is_chief': True, '_num_ps_replicas': 0, '_num_worker_replicas': 1}
INFO:tensorflow:Using config: {'_model_dir': '/home/jupyter/shrip_trained/', '_tf_random_seed': None, '_s

In [83]:
%%bash
# List the timestamped SavedModel directories produced by the exporter.
ls -1 shrip_trained/export/exporter/

1596479691


In [84]:
%%bash
# Inspect the signatures of the last (highest-timestamp) SavedModel export.
export_root="${PWD}/shrip_trained/export/exporter"
latest_export=$(ls "${export_root}" | tail -n 1)
saved_model_cli show --dir "${export_root}/${latest_export}" --all


MetaGraphDef with tag-set: 'serve' contains the following SignatureDefs:

signature_def['predict']:
  The given SavedModel SignatureDef contains the following input(s):
    inputs['Humidity'] tensor_info:
        dtype: DT_DOUBLE
        shape: (-1)
        name: Placeholder_2:0
    inputs['LUX'] tensor_info:
        dtype: DT_DOUBLE
        shape: (-1)
        name: Placeholder:0
    inputs['Moisture'] tensor_info:
        dtype: DT_DOUBLE
        shape: (-1)
        name: Placeholder_3:0
    inputs['Temperature'] tensor_info:
        dtype: DT_DOUBLE
        shape: (-1)
        name: Placeholder_1:0
    inputs['a'] tensor_info:
        dtype: DT_DOUBLE
        shape: (-1)
        name: Placeholder_4:0
    inputs['b'] tensor_info:
        dtype: DT_DOUBLE
        shape: (-1)
        name: Placeholder_5:0
    inputs['c'] tensor_info:
        dtype: DT_DOUBLE
        shape: (-1)
        name: Placeholder_6:0
    inputs['e'] tensor_info:
        dtype: DT_DOUBLE
        shape: (-1)
    

In [85]:
%%writefile /tmp/test1.json
{"LUX": 4973, "Temperature": 28, "Humidity": 51, "Moisture": 198, "a": 8.51177855871474, "b": 4.21441993929574, "c": 0.51, "e": 33.8848331626289, "y": 641.828972179604, "nony": -1030.84349379631}

Overwriting /tmp/test1.json


In [87]:
%%writefile /tmp/test.json
{"LUX": 21650, "Temperature": 22, "Humidity": 24, "Moisture": 1012, "a": 9.98276073343064, "b": 3.31132995230379, "c": 0.24, "e": 97.530843046513, "y": 591.293104338195, "nony": -24590.2846315491}


Overwriting /tmp/test.json


In [88]:
%%bash
# Run a local prediction against the newest export. `ls` may list several
# timestamped export directories; keep only the last one (as the
# saved_model_cli cell does). Otherwise --model-dir would receive several
# word-split names and point at no valid path.
latest_export=$(ls "${PWD}/shrip_trained/export/exporter" | tail -n 1)
gcloud ai-platform local predict \
  --model-dir="${PWD}/shrip_trained/export/exporter/${latest_export}" \
  --json-instances=/tmp/test.json

ALL_CLASS_IDS  ALL_CLASSES  CLASS_IDS  CLASSES  LOGISTIC  LOGITS                PROBABILITIES
[0, 1]         ['0', '1']   [0]        ['0']    [0.0]     [-549.6578979492188]  [1.0, 0.0]


If the signature defined in the model is not serving_default then you must specify it via --signature-name flag, otherwise the command may fail.
Instructions for updating:
non-resource variables are not supported in the long term
2020-08-03 18:39:25.234133: I tensorflow/core/platform/profile_utils/cpu_utils.cc:102] CPU Frequency: 2200130000 Hz
2020-08-03 18:39:25.234479: I tensorflow/compiler/xla/service/service.cc:168] XLA service 0x562a16771690 initialized for platform Host (this does not guarantee that XLA will be used). Devices:
2020-08-03 18:39:25.234516: I tensorflow/compiler/xla/service/service.cc:176]   StreamExecutor device (0): Host, Default Version
2020-08-03 18:39:25.234930: I tensorflow/core/common_runtime/process_util.cc:147] Creating new thread pool with default inter op setting: 2. Tune using inter_op_parallelism_threads for best performance.
Instructions for updating:
This function will only be available through the v1 compatibility library as tf.compat.v1.saved_model.