<h1> Machine Learning in the cloud - Cloud ML </h1>

In this notebook we show how to move a simple TensorFlow model to GCP and run predictions with it.

<h2> Predictions from text </h2>

<b>Data source</b>: Yelp Restaurant Reviews (https://www.yelp.com/dataset/challenge)

Among other things, the dataset contains information about restaurants and customer reviews.

The task is to predict whether restaurants will pass a (US) health department inspection, based on guest reviews and additional information such as the location and the types of cuisine served.

<h2> Setting environment variables, importing libraries </h2>

In [1]:
import warnings
warnings.filterwarnings('ignore')

In [7]:
BUCKET = 'dswbiznesie'
PROJECT = 'dswbiznesie'
REGION = 'europe-west1'
REPO = '/content/datalab/dswbiznesie'

In [3]:
import os
os.environ['BUCKET'] = BUCKET
os.environ['PROJECT'] = PROJECT
os.environ['REGION'] = REGION

In [4]:
%datalab project set -p $PROJECT

In [5]:
%%javascript
$.getScript('https://kmahelona.github.io/ipython_notebook_goodies/ipython_notebook_toc.js')

In [6]:
import google.datalab.ml as ml
import tensorflow as tf
import apache_beam as beam
import shutil
import datetime
from apache_beam.io.gcp.internal.clients import bigquery
import pandas as pd
import google.datalab.bigquery as bq
print tf.__version__

1.2.1


<h2> Source data </h2>


The dataset downloaded from the Yelp website consists of the following files:

In [9]:
!gsutil -q ls -l gs://$BUCKET/rawdata/hygiene

 103941968  2017-11-06T00:52:39Z  gs://dswbiznesie/rawdata/hygiene/hygiene.dat
    831242  2017-11-03T01:14:18Z  gs://dswbiznesie/rawdata/hygiene/hygiene.dat.additional
    159046  2017-11-03T01:14:17Z  gs://dswbiznesie/rawdata/hygiene/hygiene.dat.labels
TOTAL: 3 objects, 104932256 bytes (100.07 MiB)


* <b>hygiene.dat</b>: each line contains the concatenated customer reviews of one restaurant.
* <b>hygiene.dat.labels</b>: the first 546 lines have an additional field in which 0 means that the restaurant passed the inspection and 1 means that it did <b>not</b> pass. The remaining entries contain "[None]", which means they belong to the test set.
* <b>hygiene.dat.additional</b>: a CSV file whose first field is the list of cuisine types offered and whose second field is the restaurant's zip code, which can be treated as an approximation of its location. The third field is the number of reviews and the fourth is the average rating (on a 0-5 scale, where 5 is the best rating).
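A minimal sketch of how one line of <b>hygiene.dat.additional</b> could be parsed with Python's standard `csv` module. It assumes, as the key extraction in the Beam code below suggests (`x.split(",")[0]`), that each line in the bucket copy starts with a restaurant id followed by the quoted category list; the sample line and the `parse_additional` name are hypothetical. The `csv` reader keeps the commas inside the quoted list from being treated as field separators, which the pipeline below handles manually by splitting on the quote character.

```python
import csv
import io

def parse_additional(line):
    """Parse one id-prefixed line of hygiene.dat.additional (assumed format)."""
    # csv.reader keeps the quoted category list (which contains commas) in one field
    identity, categories_raw, zip_code, review_count, avg_rating = next(csv.reader(io.StringIO(line)))
    # "['Chinese', 'Restaurants']" -> "Chinese,Restaurants", the same shape create_record produces
    categories = ",".join(c.strip(" '") for c in categories_raw.strip("[]").split(","))
    return {
        "identity": int(identity),
        "categories": categories,
        "zip_code": zip_code,
        "review_count": int(review_count),
        "avg_rating": float(avg_rating),
    }

# Hypothetical sample line
record = parse_additional("7,\"['Chinese', 'Restaurants']\",98118,12,3.5")
print(record)
```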

<h2> Feature engineering using Apache Beam and BigQuery</h2>

The source data has to be processed into a form that can easily be used to train and evaluate the model.
The most convenient, though not the cheapest, option is to load the data into BigQuery.

A suitable tool for this task is Apache Beam together with Google Cloud Dataflow, the managed service that runs Beam pipelines.
A job started with the DataflowRunner runs in the cloud, and its progress can be followed in the GCP Console (https://console.cloud.google.com/dataflow); the call below uses the DirectRunner, which executes the same pipeline locally.
Running the job takes more than a minute.
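The core of the pipeline below (turn each file into `(restaurant_id, value)` pairs, then join them with `beam.CoGroupByKey`) can be illustrated in plain Python without Beam. This is only a local sketch of the join semantics, not the Beam API: `to_kv` and `co_group_by_key` are hypothetical helpers, and the input lines are made-up samples in the `id,payload` format the pipeline assumes.

```python
from collections import defaultdict

def to_kv(line):
    # Same as x.split(",")[0] and ",".join(x.split(",")[1:]) in the pipeline
    key, _, rest = line.partition(",")
    return key, rest

def co_group_by_key(**tagged):
    """Plain-Python stand-in for beam.CoGroupByKey: {key: {tag: [values]}}."""
    # Every key gets an (possibly empty) list for every tag, as in Beam
    grouped = defaultdict(lambda: {tag: [] for tag in tagged})
    for tag, pairs in tagged.items():
        for key, value in pairs:
            grouped[key][tag].append(value)
    return dict(grouped)

# Hypothetical sample lines in the "id,payload" format
reviews = ["1,Great food.|Friendly staff.", "2,Dirty tables."]
labels = ["1,0", "2,[None]"]

# The pipeline removes "|" from the review text before joining
reviews_kv = [(k, v.replace("|", "")) for k, v in map(to_kv, reviews)]
labels_kv = [to_kv(line) for line in labels]

grouped = co_group_by_key(reviews_kv=reviews_kv, labels_kv=labels_kv)
print(grouped["1"])
```

`create_record` then takes the first element of each list, e.g. `grouped["1"]["labels_kv"][0]`.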

In [11]:
def create_record(rest_tuple):
    #print(rest_tuple)
    identity = rest_tuple[0]
    reviews = rest_tuple[1]['reviews_kv'][0]
    inspection_result = int(rest_tuple[1]['labels_kv'][0]) if rest_tuple[1]['labels_kv'][0] != "[None]" else None
    categories_temp = rest_tuple[1]['attributes_kv'][0].split("\"")
    categories = ",".join([ x.replace('\'', '').replace('[','').replace(']','').strip() for x in categories_temp[1].split(',')])
    #categories = categories_temp[1].replace('\'', '').replace('[','').replace(']','')
    attributes_temp = categories_temp[2].split(",")
    zip_code = attributes_temp[1]
    review_count = int(attributes_temp[2])
    avg_rating = float(attributes_temp[3])
    
    return { 
        'identity': identity, 
        'reviews': reviews,
        'inspection_result': inspection_result,
        'categories': categories,
        'zip_code': zip_code,
        'review_count': review_count,
        'avg_rating': avg_rating
    }

def preprocess(RUNNER,BUCKET,BIGQUERY_TABLE):
    job_name = 'hygiene-ftng' + '-' + datetime.datetime.now().strftime('%y%m%d-%H%M%S')
    print 'Launching Dataflow job {} ... hang on'.format(job_name)
    OUTPUT_DIR = 'gs://{0}/data/hygiene/'.format(BUCKET)
    options = {
        'staging_location': os.path.join(OUTPUT_DIR, 'tmp', 'staging'),
        'temp_location': os.path.join(OUTPUT_DIR, 'tmp'),
        'job_name': job_name,  # reuse the name printed above instead of computing a second timestamp
        'project': PROJECT,
        'teardown_policy': 'TEARDOWN_ALWAYS',
        'no_save_main_session': True
    }
    opts = beam.pipeline.PipelineOptions(flags=[], **options)
    p = beam.Pipeline(RUNNER, options=opts)
    
    # Adding table definition
    table_schema = bigquery.TableSchema()
    
    # Fields definition
    identity_schema = bigquery.TableFieldSchema()
    identity_schema.name = 'identity'
    identity_schema.type = 'integer'
    identity_schema.mode = 'required'
    table_schema.fields.append(identity_schema)
    
    
    reviews_schema = bigquery.TableFieldSchema()
    reviews_schema.name = 'reviews'
    reviews_schema.type = 'string'
    reviews_schema.mode = 'required'
    table_schema.fields.append(reviews_schema)

    inspection_result_schema = bigquery.TableFieldSchema()
    inspection_result_schema.name = 'inspection_result'
    inspection_result_schema.type = 'integer'
    inspection_result_schema.mode = 'nullable'
    table_schema.fields.append(inspection_result_schema)
    
    categories_schema = bigquery.TableFieldSchema()
    categories_schema.name = 'categories'
    categories_schema.type = 'string'
    categories_schema.mode = 'required'
    table_schema.fields.append(categories_schema)
    
    zip_code_schema = bigquery.TableFieldSchema()
    zip_code_schema.name = 'zip_code'
    zip_code_schema.type = 'string'
    zip_code_schema.mode = 'required'
    table_schema.fields.append(zip_code_schema)
    
    review_count_schema = bigquery.TableFieldSchema()
    review_count_schema.name = 'review_count'
    review_count_schema.type = 'integer'
    review_count_schema.mode = 'required'
    table_schema.fields.append(review_count_schema)
    
    avg_rating_schema = bigquery.TableFieldSchema()
    avg_rating_schema.name = 'avg_rating'
    avg_rating_schema.type = 'float'
    avg_rating_schema.mode = 'required'
    table_schema.fields.append(avg_rating_schema)
    
    #processing logic
    reviews = p | 'readreviews' >> beam.io.ReadFromText('gs://{0}/rawdata/hygiene/hygiene.dat'.format(BUCKET))
    labels = p | 'readlabels' >> beam.io.ReadFromText('gs://{0}/rawdata/hygiene/hygiene.dat.labels'.format(BUCKET))
    attributes = p | 'readattributes' >> beam.io.ReadFromText('gs://{0}/rawdata/hygiene/hygiene.dat.additional'.format(BUCKET))
    
    reviews_kv = reviews | 'mapreviews to kv' >> beam.Map(lambda x: (x.split(",")[0], ",".join(x.split(",")[1:]).replace("|","")))
    labels_kv = labels | 'maplabels to kv' >> beam.Map(lambda x: (x.split(",")[0], x.split(",")[1]))
    attributes_kv = attributes | 'mapattributes to kv' >> beam.Map(lambda x: (x.split(",")[0], x))
    
    restaurants = (
        {'reviews_kv': reviews_kv, 'labels_kv': labels_kv, 'attributes_kv': attributes_kv}
        | 'CoGroupByRestaurantKey' >> beam.CoGroupByKey())
    
    records = restaurants | 'CreateRecords' >> beam.Map(create_record)
    
    records | 'write' >> beam.io.Write(
        beam.io.BigQuerySink(
            BIGQUERY_TABLE,
            schema=table_schema,
            create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
            write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE))
    p.run().wait_until_finish()
    print("Done!")

<b> Running the ETL </b>

In [12]:
# BigQuery table spec in the form project:dataset.table (here: dswbiznesie:dswbiznesie.hygiene)
bigquery_dataset = PROJECT+":"+PROJECT+".hygiene"
preprocess('DirectRunner',BUCKET, bigquery_dataset)

Launching Dataflow job hygiene-ftng-171109-214638 ... hang on
Done!


<h2> Preparing the data for model training </h2>

<b> Loading the data into a DataFrame (pandas) </b>

In [10]:
query="""
SELECT
  #identity,
  reviews,
  inspection_result #,
  #categories,
  #zip_code,
  #review_count,
  #avg_rating
FROM
  `dswbiznesie.hygiene`
WHERE
  inspection_result IS NOT NULL
"""

In [11]:
#df = bq.Query(query).execute().result().to_dataframe()

<b> Splitting the data into training and evaluation sets </b>

In [12]:
traindf = bq.Query(query + " AND MOD(ABS(FARM_FINGERPRINT(reviews)),4) > 0").execute().result().to_dataframe()
evaldf  = bq.Query(query + " AND MOD(ABS(FARM_FINGERPRINT(reviews)),4) = 0").execute().result().to_dataframe()
traindf.head()

   reviews                                           inspection_result
0  I was expecting a lot more given all the grea...  0
1  I usually can be found here every now & t...      0
2  Om nom nom. Not the best I've ever had, but i...  0
3  Sweet tea, sweet potato fries, fried green to...  0
4  I remember coming here as a wide-eyed 8 year ...  0


In [13]:
traindf['inspection_result'].value_counts()

0    205
1    203
Name: inspection_result, dtype: int64

In [14]:
evaldf['inspection_result'].value_counts()

1    70
0    68
Name: inspection_result, dtype: int64
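The FARM_FINGERPRINT condition is a deterministic, hash-based split: a review always lands in the same bucket, so the split is reproducible between runs and identical reviews never end up in both sets. Bucket 0 of 4 goes to evaluation and buckets 1-3 to training, which matches the roughly 3:1 ratio above (408 vs 138 of the 546 labelled rows). The sketch below reproduces the idea in Python but swaps FARM_FINGERPRINT for MD5 from `hashlib`, so the concrete bucket assignments differ from BigQuery's; no ABS is needed because the Python integer is already non-negative. The function names are hypothetical.

```python
import hashlib

def bucket_of(text, n_buckets=4):
    # MD5 stands in for BigQuery's FARM_FINGERPRINT; both are stable across runs
    digest = hashlib.md5(text.encode("utf-8")).hexdigest()
    return int(digest, 16) % n_buckets

def split_train_eval(reviews, n_buckets=4):
    # Same rule as the queries: bucket 0 -> evaluation, buckets 1..3 -> training
    train = [r for r in reviews if bucket_of(r, n_buckets) > 0]
    evaluation = [r for r in reviews if bucket_of(r, n_buckets) == 0]
    return train, evaluation

sample = ["review number %d" % i for i in range(1000)]
train, evaluation = split_train_eval(sample)
print(len(train), len(evaluation))
```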

In [16]:
traindf.to_csv('train.csv', header=False, index=False, encoding='utf-8', sep='|')
evaldf.to_csv('eval.csv', header=False, index=False, encoding='utf-8', sep='|')
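The pipe separator (`sep='|'`) works here because the reviews contain commas, while the Beam step removed every `|` from the review text (`.replace("|","")`), so the pipe cannot collide with the data. A small round trip with the standard `csv` module, using made-up rows, shows that a comma inside a review survives unchanged:

```python
import csv
import io

# Made-up (review, label) rows; the first review contains a comma
rows = [("Great food, friendly staff", 0), ("Dirty tables", 1)]

buf = io.StringIO()
csv.writer(buf, delimiter="|", lineterminator="\n").writerows(rows)
text = buf.getvalue()
print(text)

# Read it back: the comma stays inside the review field
parsed = [(review, int(label)) for review, label in csv.reader(io.StringIO(text), delimiter="|")]
print(parsed)
```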

In [17]:
!head -3 train.csv

 I was expecting a lot more given all the great reviews!I ordered the dumplings, Kung Pao Chicken and the chow mein with homemade noodles. The dumplings were pretty good but I wouldn't say they were really anything special. I didn't even eat more than a few bites of the Kung Pao Chicken. The chicken was cheap and gross and I couldn't even choke it down. The noodles were good enough, but I agree with the reviewer who thought they were a bit dry. I followed that reviewer's advice and poured my leftover dumpling sauce all over them. Once again, the chicken in the noodles was inedible and I ate around it. I was expecting to have leftovers afterwards, but I have no desire to take up space in my fridge with not very good Chinese food I won't even want to eat later. I won't be ordering again! This was a last minute desperation kind of visit. &#160;We were driving around the area looking for something to eat and nothing sounded good. &#160;Chinese food is always sort of a safe bet for us. &#16

In [18]:
!wc -l *.csv

    138 eval.csv
    408 train.csv
    408 vocab.csv
    954 total


In [19]:
%bash
gsutil cp *.csv gs://${BUCKET}/data/

Copying file://eval.csv [Content-Type=text/csv]...
Copying file://train.csv [Content-Type=text/csv]...
Copying file://vocab.csv [Content-Type=text/csv]...
- [3 files][  7.6 MiB/  7.6 MiB]                                                
Operation completed over 3 objects/7.6 MiB.                                      

In [20]:
!gsutil -q ls -l gs://$BUCKET/data

         0  2017-11-03T00:08:16Z  gs://dswbiznesie/data/
   1236330  2017-11-09T22:53:48Z  gs://dswbiznesie/data/eval.csv
   3334214  2017-11-09T22:53:48Z  gs://dswbiznesie/data/train.csv
   3354636  2017-11-09T22:53:49Z  gs://dswbiznesie/data/vocab.csv
     25091  2017-11-09T22:27:47Z  gs://dswbiznesie/data/vocab_words
TOTAL: 5 objects, 7950271 bytes (7.58 MiB)


<h2> TensorFlow model </h2>

The actual TensorFlow model is in the file <b>model.py</b>, and the Cloud ML job definition is in the file <b>task.py</b>.
The code below is meant to illustrate how the TensorFlow text-processing code works.

In [21]:
import tensorflow as tf
from tensorflow.contrib import lookup
from tensorflow.python.platform import gfile

print tf.__version__
MAX_DOCUMENT_LENGTH = 100000  
PADWORD = 'ZYXW'

# vocabulary
lines = ['This might be the best "taco" truck on the planet. Hidden between a smoke shop and The Home Depot, this semi permanent, stylish and clean cafe on wheels.', 
         'Im always looking for a good place to sing karaoke. This is one of them. Drinks are cheap. Food is delicious. And its laid back and fun. I shall return!', 
         'Crazy Man just Crazy there are like 3 different places called Saigon Deli at this intersection but this one is on Jackson just west of twelfthand with $2 pork Banh Mi you cant go wrong, I am going to Indiana for work so I went in and got 6 Pork', 
         'Ive eaten here a few times in the past and thought it was decent. I went last night, however, and our meal was really subpar so the place seems to have gone down hill.We had chow fun noodles and the hollow vegetables with chili sauce']

# create vocabulary
vocab_processor = tf.contrib.learn.preprocessing.VocabularyProcessor(MAX_DOCUMENT_LENGTH)
vocab_processor.fit(lines)
with gfile.Open('vocab.tsv', 'wb') as f:
    f.write("{}\n".format(PADWORD))
    for word, index in vocab_processor.vocabulary_._mapping.iteritems():
      f.write("{}\n".format(word))
N_WORDS = len(vocab_processor.vocabulary_)
print '{} words into vocab.tsv'.format(N_WORDS)

# can use the vocabulary to convert words to numbers
table = lookup.index_table_from_file(
  vocabulary_file='vocab.tsv', num_oov_buckets=1, vocab_size=None, default_value=-1)
numbers = table.lookup(tf.constant(lines[0].split()))
with tf.Session() as sess:
  tf.tables_initializer().run()
  print "{} --> {}".format(lines[0], numbers.eval())   

1.2.1
118 words into vocab.tsv
This might be the best "taco" truck on the planet. Hidden between a smoke shop and The Home Depot, this semi permanent, stylish and clean cafe on wheels. --> [ 42  12  41 118  33 119  51  47 118 119  96  39 112  54   1  87 111  56
 119  81  19 119  23  87 117  99  47 119]
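What VocabularyProcessor together with `index_table_from_file(..., num_oov_buckets=1)` does can be sketched in plain Python: PADWORD is written first so it gets id 0, every known word gets its line index, and every unknown word goes to the single OOV bucket, whose id equals the vocabulary size. That is presumably why raw tokens such as `planet.` or `wheels.` map to 119 above: `vocab.tsv` has 119 lines (the pad word plus 118 entries), and `str.split()` keeps the punctuation that the vocabulary tokenizer stripped. The regex tokenizer below is an assumption and is simpler than VocabularyProcessor's; the vocabulary is sorted to make ids deterministic, whereas the cell above writes words in dictionary order.

```python
import re

PADWORD = "ZYXW"

def tokenize(text):
    # Assumed tokenizer: runs of word characters (simpler than VocabularyProcessor's)
    return re.findall(r"\w+", text)

def build_vocab(lines):
    # PADWORD first so it gets id 0; remaining words sorted for deterministic ids
    words = sorted({w for line in lines for w in tokenize(line)})
    return [PADWORD] + words

def lookup(words, vocab):
    index = {w: i for i, w in enumerate(vocab)}
    oov_id = len(vocab)  # one OOV bucket: every unknown word gets id == vocab size
    return [index.get(w, oov_id) for w in words]

vocab = build_vocab(["good food", "bad service"])
print(vocab)
print(lookup("good bad pizza".split(), vocab))
```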


In [22]:
!head vocab.tsv

ZYXW
shop
just
cheap
go
Indiana
its
We
seems
laid


In [23]:
# string operations
reviews = tf.constant(lines)
words = tf.string_split(reviews)
densewords = tf.sparse_tensor_to_dense(words, default_value=PADWORD)
numbers = table.lookup(densewords)

# now pad out with zeros and then slice to constant length
padding = tf.constant([[0,0],[0,MAX_DOCUMENT_LENGTH]])
padded = tf.pad(numbers, padding)
sliced = tf.slice(padded, [0,0], [-1, MAX_DOCUMENT_LENGTH])

with tf.Session() as sess:
  tf.tables_initializer().run()
  print "reviews=", reviews.eval(), reviews.shape
  print "words=", words.eval()
  print "dense=", densewords.eval(), densewords.shape
  print "numbers=", numbers.eval(), numbers.shape
  print "padding=", padding.eval(), padding.shape
  print "padded=", padded.eval(), padded.shape
  print "sliced=", sliced.eval(), sliced.shape

reviews= [ 'This might be the best "taco" truck on the planet. Hidden between a smoke shop and The Home Depot, this semi permanent, stylish and clean cafe on wheels.'
 'Im always looking for a good place to sing karaoke. This is one of them. Drinks are cheap. Food is delicious. And its laid back and fun. I shall return!'
 'Crazy Man just Crazy there are like 3 different places called Saigon Deli at this intersection but this one is on Jackson just west of twelfthand with $2 pork Banh Mi you cant go wrong, I am going to Indiana for work so I went in and got 6 Pork'
 'Ive eaten here a few times in the past and thought it was decent. I went last night, however, and our meal was really subpar so the place seems to have gone down hill.We had chow fun noodles and the hollow vegetables with chili sauce'] (4,)
words= SparseTensorValue(indices=array([[ 0,  0],
       [ 0,  1],
       [ 0,  2],
       [ 0,  3],
       [ 0,  4],
       [ 0,  5],
       [ 0,  6],
       [ 0,  7],
       [ 0,  8],
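The padding-and-slicing cell above turns variable-length reviews into rows of exactly MAX_DOCUMENT_LENGTH ids. Because PADWORD has id 0 in `vocab.tsv`, padding with zeros is the same as padding with the pad word, so the intermediate step that fills ragged rows with PADWORD and the `tf.pad` step agree. A plain-Python sketch of the pad-then-slice step on ids that are already converted (`pad_or_truncate` is a hypothetical helper):

```python
PAD_ID = 0  # id of PADWORD, which is the first line of vocab.tsv

def pad_or_truncate(batch_ids, max_len, pad_id=PAD_ID):
    # Append max_len pad ids to every row (like tf.pad), then keep the first max_len (like tf.slice)
    return [(list(ids) + [pad_id] * max_len)[:max_len] for ids in batch_ids]

batch = [[4, 8, 15], [16, 23, 42, 7, 9]]
print(pad_or_truncate(batch, 4))
```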


Checking that the model works on a small dataset

In [11]:
%bash
echo "bucket=${BUCKET}"
rm -rf outputdir
export PYTHONPATH=${PYTHONPATH}:${PWD}/trainer
python -m trainer.task \
   --bucket=${BUCKET} \
   --output_dir=outputdir \
   --job-dir=./tmp --train_steps=200

bucket=dswbiznesie
3710 words into gs://dswbiznesie/data/vocab_words
{'reviews': <tf.Tensor 'DecodeCSV:0' shape=(?, 1) dtype=string>} Tensor("hash_table_Lookup:0", shape=(?, 1), dtype=int64)
Tensor("string_to_index_1_Lookup:0", shape=(?, ?), dtype=int64)
words_sliced=SparseTensor(indices=Tensor("StringSplit:0", shape=(?, 2), dtype=int64), values=Tensor("StringSplit:1", shape=(?,), dtype=string), dense_shape=Tensor("StringSplit:2", shape=(2,), dtype=int64))
words_embed=Tensor("EmbedSequence/embedding_lookup:0", shape=(?, 77838, 10), dtype=float32)
words_conv=Tensor("Squeeze_1:0", shape=(?, 15568), dtype=float32)
{'reviews': <tf.Tensor 'DecodeCSV:0' shape=(?, 1) dtype=string>} Tensor("hash_table_Lookup:0", shape=(?, 1), dtype=int64)
Tensor("string_to_index_1_Lookup:0", shape=(?, ?), dtype=int64)
words_sliced=SparseTensor(indices=Tensor("StringSplit:0", shape=(?, 2), dtype=int64), values=Tensor("StringSplit:1", shape=(?,), dtype=string), dense_shape=Tensor("StringSplit:2", shape=(2,), dty

Copying gs://dswbiznesie/data/train.csv...
/ [1 files][  3.2 MiB/  3.2 MiB]                                                
Operation completed over 1 objects/3.2 MiB.                                      
INFO:tensorflow:Using default config.
INFO:tensorflow:Using config: {'_save_checkpoints_secs': 600, '_num_ps_replicas': 0, '_keep_checkpoint_max': 5, '_task_type': None, '_is_chief': True, '_cluster_spec': <tensorflow.python.training.server_lib.ClusterSpec object at 0x7fcb08b91790>, '_model_dir': 'outputdir/', '_save_checkpoints_steps': None, '_keep_checkpoint_every_n_hours': 10000, '_session_config': None, '_tf_random_seed': None, '_environment': 'local', '_num_worker_replicas': 0, '_task_id': 0, '_save_summary_steps': 100, '_tf_config': gpu_options {
  per_process_gpu_memory_fraction: 1.0
}
, '_evaluation_master': '', '_master': ''}
Instructions for updating:
Monitors are deprecated. Please use tf.train.SessionRunHook.
INFO:tensorflow:Create CheckpointSaverHook.
2017-11-09 23:58:19

As you can see, the Python code runs locally. It can also be tested locally with the Cloud ML Engine tooling (gcloud ml-engine local train).

In [18]:
%bash
rm -rf trained_model
gcloud ml-engine local train \
   --module-name=trainer.task \
   --package-path=${REPO}/notebooks/trainer \
   -- \
   --bucket=${BUCKET} \
   --output_dir=${REPO}/notebooks/trained_model \
   --job-dir=./tmp --train_steps=200

ERROR: (gcloud.ml-engine.local.train) /usr/bin/python2: command not found

In [None]:
from google.datalab.ml import TensorBoard
TensorBoard().start('{}/notebooks/outputdir'.format(REPO))

In [None]:
for pid in TensorBoard.list()['pid']:
  TensorBoard().stop(pid)
  print 'Stopped TensorBoard with pid {}'.format(pid)

Next, the training can be run in Cloud ML. The job status can be monitored in the GCP Console.
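The two jobs in this notebook are named differently: the Dataflow job uses hyphens (hygiene-ftng-171109-214638), the Cloud ML Engine job uses underscores (hygiene_171110_001336). As far as we know, Dataflow job names may contain only lowercase letters, digits and hyphens, while ML Engine job ids may contain only letters, digits and underscores, both starting with a letter. Length limits are left out of the regexes below, which are assumptions to check against the current GCP documentation. The sketch rebuilds both names from fixed timestamps; `timestamped_name` is a hypothetical helper.

```python
import datetime
import re

# Naming rules as we understand them (assumption; length limits omitted)
DATAFLOW_JOB_NAME = re.compile(r"^[a-z]([-a-z0-9]*[a-z0-9])?$")  # lowercase, digits, hyphens
MLENGINE_JOB_ID = re.compile(r"^[A-Za-z][A-Za-z0-9_]*$")         # letters, digits, underscores

def timestamped_name(prefix, sep, when=None):
    """Return prefix + sep + yymmdd + sep + HHMMSS, e.g. hygiene_171110_001336."""
    if when is None:
        when = datetime.datetime.now(datetime.timezone.utc)
    return sep.join([prefix, when.strftime("%y%m%d"), when.strftime("%H%M%S")])

dataflow_name = timestamped_name("hygiene-ftng", "-", datetime.datetime(2017, 11, 9, 21, 46, 38))
mlengine_id = timestamped_name("hygiene", "_", datetime.datetime(2017, 11, 10, 0, 13, 36))
print(dataflow_name, mlengine_id)
print(bool(DATAFLOW_JOB_NAME.match(dataflow_name)), bool(MLENGINE_JOB_ID.match(mlengine_id)))
```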

In [15]:
%bash
OUTDIR=gs://${BUCKET}/trained_model
JOBNAME=hygiene_$(date -u +%y%m%d_%H%M%S)
echo $OUTDIR $REGION $JOBNAME
gsutil -m rm -rf $OUTDIR
gsutil cp trainer/*.py $OUTDIR
gcloud ml-engine jobs submit training $JOBNAME \
   --region=$REGION \
   --module-name=trainer.task \
   --package-path=$(pwd)/trainer \
   --job-dir=$OUTDIR \
   --staging-bucket=gs://$BUCKET \
   --scale-tier=BASIC --runtime-version=1.2 \
   -- \
   --bucket=${BUCKET} \
   --output_dir=${OUTDIR} \
   --train_steps=36000

gs://dswbiznesie/trained_model europe-west1 hygiene_171110_001336
jobId: hygiene_171110_001336
state: QUEUED


Removing gs://dswbiznesie/trained_model/__init__.py#1510272792000207...
Removing gs://dswbiznesie/trained_model/model.py#1510272792248554...
Removing gs://dswbiznesie/trained_model/task.py#1510272792618131...
/ [3/3 objects] 100% Done                                                       
Operation completed over 3 objects.                                              
Copying file://trainer/__init__.py [Content-Type=text/x-python]...
Copying file://trainer/model.py [Content-Type=text/x-python]...
Copying file://trainer/task.py [Content-Type=text/x-python]...
-
Operation completed over 3 objects/8.9 KiB.                                      
Job [hygiene_171110_001336] submitted successfully.
Your job is still active. You may view the status of your job with the command

  $ gcloud ml-engine jobs describe hygiene_171110_001336

or continue streaming the logs with the command

  $ gcloud ml-engine jobs stream-logs hygiene_171110_001336

29506

Stopped TensorBoard with pid 27841
Stopped TensorBoard with pid 29057


<h2> Training the model in Cloud ML </h2>

<h2> Serving predictions </h2>