In [1]:
%%writefile requirements.txt
pandas
boto3
botocore
tensorflow==1.15.2
joblib
numpy
xgboost
scikit-learn>=0.21.0
seldon-core
tornado>=6.0.3

Overwriting requirements.txt


In [2]:
!pip install -r requirements.txt

Collecting joblib
  Using cached joblib-1.0.0-py3-none-any.whl (302 kB)
Collecting scikit-learn>=0.21.0
  Using cached scikit_learn-0.24.1-cp36-cp36m-manylinux2010_x86_64.whl (22.2 MB)
Collecting seldon-core
  Using cached seldon_core-1.5.1-py3-none-any.whl (127 kB)
Collecting threadpoolctl>=2.0.0
  Using cached threadpoolctl-2.1.0-py3-none-any.whl (12 kB)
Collecting Flask-cors<4.0.0
  Using cached Flask_Cors-3.0.10-py2.py3-none-any.whl (14 kB)
Processing /home/jovyan/.cache/pip/wheels/83/73/4c/0e331f57d4702becb1fca9d9148277aca96d127bd838faf85e/opentracing-2.4.0-py3-none-any.whl
Processing /home/jovyan/.cache/pip/wheels/a8/37/30/d86c970966efbf6da89e8085db50f2be4cf36ca68513eff785/jaeger_client-4.3.0-py3-none-any.whl
Collecting flatbuffers<2.0.0
  Using cached flatbuffers-1.12-py2.py3-none-any.whl (15 kB)
Collecting redis<4.0.0
  Using cached redis-3.5.3-py2.py3-none-any.whl (72 kB)
Processing /home/jovyan/.cache/pip/wheels/ad/4b/2d/24ff0da0a0b53c7c77ce59b843bcceaf644c88703241e59615/Flas

In [3]:
import argparse
import logging
import joblib
import sys
import pandas as pd
import numpy as np
from sklearn.metrics import mean_absolute_error
from sklearn.model_selection import train_test_split
from sklearn.impute import SimpleImputer
from xgboost import XGBRegressor

from sklearn.metrics import mean_absolute_error
from sklearn.preprocessing import LabelEncoder
from tensorflow import keras
layers = keras.layers

logging.basicConfig(format='%(message)s')
logging.getLogger().setLevel(logging.INFO)




In [4]:
def read_input(data_file_path):
    # Get the data: original source is here: https://www.kaggle.com/zynicide/wine-reviews/data
    data = pd.read_csv(data_file_path)
    
    # Shuffle the data
    data = data.sample(frac=1)
    
    return data

def preprocess(data):
    # Do some preprocessing to limit the # of wine varities in the dataset
    data = data[pd.notnull(data['country'])]
    
    print("preprocess 1")
    data = data[pd.notnull(data['price'])]
    data = data.drop(data.columns[0], axis=1) 

    print("preprocess 2")
    variety_threshold = 500 # Anything that occurs less than this will be removed.
    value_counts = data['variety'].value_counts()
    to_remove = value_counts[value_counts <= variety_threshold].index
    
    print("preprocess 3")
    data.replace(to_remove, np.nan, inplace=True)
    data = data[pd.notnull(data['variety'])]
    
    return data

def train_model(data):
    train_size = int(len(data) * .8)
    print ("Train size: %d" % train_size)
    print ("Test size: %d" % (len(data) - train_size))
    print("train model 1")

    # Train features
    description_train = data['description'][:train_size]
    print("train model 2")
    variety_train = data['variety'][:train_size]
    labels_train = data['price'][:train_size]

    # Create a tokenizer to preprocess our text descriptions
    vocab_size = 12000 # This is a hyperparameter, experiment with different values for your dataset
    print("train model 3")
    tokenize = keras.preprocessing.text.Tokenizer(num_words=vocab_size, char_level=False)
    tokenize.fit_on_texts(description_train) # only fit on train
    print("train model 4")

    # Wide feature 1: sparse bag of words (bow) vocab_size vector 
    description_bow_train = tokenize.texts_to_matrix(description_train)
    print("train model 5: wide feature 1 BOW")

    # Wide feature 2: one-hot vector of variety categories
    # Use sklearn utility to convert label strings to numbered index
    encoder = LabelEncoder()
    encoder.fit(variety_train)
    variety_train = encoder.transform(variety_train)
    num_classes = np.max(variety_train) + 1
    print("train model 6: wide feature 2")

    # Convert labels to one hot
    variety_train = keras.utils.to_categorical(variety_train, num_classes)
    print("train model 7: wide feature 2/one hot encode")

    # Define our wide model with the functional API
    bow_inputs = layers.Input(shape=(vocab_size,))
    variety_inputs = layers.Input(shape=(num_classes,))
    merged_layer = layers.concatenate([bow_inputs, variety_inputs])
    merged_layer = layers.Dense(256, activation='relu')(merged_layer)
    predictions = layers.Dense(1)(merged_layer)
    wide_model = keras.Model(inputs=[bow_inputs, variety_inputs], outputs=predictions)
    print("train model 8")

    wide_model.compile(loss='mse', optimizer='adam', metrics=['accuracy'])
    #print(wide_model.summary())
    print("train model 9")

    # Deep model feature: word embeddings of wine descriptions
    train_embed = tokenize.texts_to_sequences(description_train)
    print("train model 10: deep model/train")

    max_seq_length = 170
    train_embed = keras.preprocessing.sequence.pad_sequences(train_embed, maxlen=max_seq_length, padding="post")

    # Define our deep model with the Functional API
    print("train model 11")
    deep_inputs = layers.Input(shape=(max_seq_length,))
    embedding = layers.Embedding(vocab_size, 8, input_length=max_seq_length)(deep_inputs)

    print("train model 12")
    embedding = layers.Flatten()(embedding)
    embed_out = layers.Dense(1)(embedding)
    deep_model = keras.Model(inputs=deep_inputs, outputs=embed_out)
    #print(deep_model.summary())
    print("train model 13")

    deep_model.compile(loss='mse',
                       optimizer='adam',
                       metrics=['accuracy'])

    print("train model 14")
   
    # Combine wide and deep into one model
    merged_out = layers.concatenate([wide_model.output, deep_model.output])
    merged_out = layers.Dense(1)(merged_out)
    combined_model = keras.Model(wide_model.input + [deep_model.input], merged_out)
    #print(combined_model.summary())

    print("train model 15")
    combined_model.compile(loss='mse',
                               optimizer='adam',
                               metrics=['accuracy'])
    print("train model 16")

    # Run training
    combined_model.fit([description_bow_train, variety_train] + [train_embed], labels_train, epochs=10, batch_size=128)
    return combined_model

def eval_model(filename, data):
    """Evaluate the model performance."""
    print("eval model 1")
    
    model = keras.models.load_model(filename)
    
    train_size = int(len(data) * .8)
    description_test = data['description'][train_size:]
    variety_test = data['variety'][train_size:]
    
    print("eval model 2")
    vocab_size = 12000 # This is a hyperparameter, experiment with different values for your dataset
    tokenize = keras.preprocessing.text.Tokenizer(num_words=vocab_size, char_level=False)
    tokenize.fit_on_texts(description_test) # only fit on train
    
    print("eval model 3")
    # Test labels
    labels_test = data['price'][train_size:]

    print("eval model 4")
    # Wide feature 1: sparse bag of words (bow) vocab_size vector 
    description_bow_test = tokenize.texts_to_matrix(description_test)

    print("eval model 5")
    # Wide feature 2: one-hot vector of variety categories
    # Use sklearn utility to convert label strings to numbered index
    encoder = LabelEncoder()
    encoder.fit(variety_test)
    variety_test = encoder.transform(variety_test)
    num_classes = np.max(variety_test) + 1

    print("eval model 6")
    # Convert labels to one hot
    variety_test = keras.utils.to_categorical(variety_test, num_classes)

    print("eval model 7")
    # Deep model feature: word embeddings of wine descriptions
    test_embed = tokenize.texts_to_sequences(description_test)

    print("eval model 8")
    max_seq_length = 170
    test_embed = keras.preprocessing.sequence.pad_sequences(test_embed, maxlen=max_seq_length, padding="post")

    print("eval model 9")
    # Generate predictions
    predictions = model.predict([description_bow_test, variety_test] + [test_embed])
    
    num_predictions = 40
    diff = 0

    for i in range(num_predictions):
        val = predictions[i]
        print(description_test.iloc[i])
        print('Predicted: ', val[0], 'Actual: ', labels_test.iloc[i], '\n')
        diff += abs(val[0] - labels_test.iloc[i])
        
    # Compare the average difference between actual price and the model's predicted price
    print('Average prediction difference: ', diff / num_predictions)
    print("eval model 11")

def save_model(model, model_file):
    model.save(model_file)
    print("Model export success: %s", model_file)
    
class WinePricer(object):
    
    def __init__(self):
        self.train_input = "wine_data.csv"
        self.n_estimators = 50
        self.learning_rate = 0.1
        self.model_file = "trained_wine_model.dat"
        self.model = None

    def train(self):
        data = read_input(self.train_input)
        data = preprocess(data)
        model = train_model(data)
        save_model(model, self.model_file)
        eval_model(self.model_file, data)

    def predict(self, X, feature_names=None):
        """Predict using the model for given ndarray."""
        if not self.model:
            self.model = keras.models.load_model(self.model_file)
        # Do any preprocessing
        prediction = self.model.predict(data=X)
        # Do any postprocessing
        return prediction

In [5]:
model = WinePricer()
model.train()

preprocess 1
preprocess 2
preprocess 3
Train size: 95646
Test size: 23912
train model 1
train model 2
train model 3
train model 4
train model 5: wide feature 1 BOW
train model 6: wide feature 2
train model 7: wide feature 2/one hot encode
Instructions for updating:
If using Keras pass *_constraint arguments to layers.


From /usr/local/lib/python3.6/dist-packages/tensorflow_core/python/ops/resource_variable_ops.py:1630: calling BaseResourceVariable.__init__ (from tensorflow.python.ops.resource_variable_ops) with constraint is deprecated and will be removed in a future version.
Instructions for updating:
If using Keras pass *_constraint arguments to layers.


train model 8
train model 9
train model 10: deep model/train
train model 11
Instructions for updating:
Call initializer instance with the dtype argument instead of passing it to the constructor


From /usr/local/lib/python3.6/dist-packages/tensorflow_core/python/keras/initializers.py:119: calling RandomUniform.__init__ (from tensorflow.python.ops.init_ops) with dtype is deprecated and will be removed in a future version.
Instructions for updating:
Call initializer instance with the dtype argument instead of passing it to the constructor


train model 12
train model 13
train model 14
train model 15
train model 16
Train on 95646 samples
Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10
Model export success: %s trained_wine_model.dat
eval model 1
Instructions for updating:
Call initializer instance with the dtype argument instead of passing it to the constructor


From /usr/local/lib/python3.6/dist-packages/tensorflow_core/python/ops/init_ops.py:97: calling GlorotUniform.__init__ (from tensorflow.python.ops.init_ops) with dtype is deprecated and will be removed in a future version.
Instructions for updating:
Call initializer instance with the dtype argument instead of passing it to the constructor


Instructions for updating:
Call initializer instance with the dtype argument instead of passing it to the constructor


From /usr/local/lib/python3.6/dist-packages/tensorflow_core/python/ops/init_ops.py:97: calling Zeros.__init__ (from tensorflow.python.ops.init_ops) with dtype is deprecated and will be removed in a future version.
Instructions for updating:
Call initializer instance with the dtype argument instead of passing it to the constructor


eval model 2
eval model 3
eval model 4
eval model 5
eval model 6
eval model 7
eval model 8
eval model 9
A heavy wine, atypical of the appellation, which usually produces light-bodied and elegant Pinots. There's something earthy and thick about this one, almost rustic. The flavors are fruity but it lacks the delicacy an Anderson Valley Pinot Noir ought to have. And there's an unpleasant sting of acidity.
Predicted:  38.960835 Actual:  16.0 

Seems like the hot sun of central Spain got to this three-grape blend because it's undeniably sweet and baked. The aromas are pure raisin, while the palate is drawn down and syrupy. Too thick and sweet for its own good.
Predicted:  25.963655 Actual:  18.0 

Snappy, crisp and green on the nose, but well balanced and showing the good side of Chilean SB, which is oceanic, fresh, green and citrusy. Flavors of lime, grapefruit and passion fruit are true, while the finish is fruity at first before breaking up.
Predicted:  13.438766 Actual:  12.0 

A pleas

In [6]:
import random, string
HASH = ''.join([random.choice(string.ascii_lowercase) for n in range(3)] + [random.choice(string.digits) for n in range(3)])
AWS_REGION = 'us-east-1'
!aws s3 mb s3://{HASH}'-kubeflow-pipeline-data' --region $AWS_REGION --endpoint-url https://s3.us-east-1.amazonaws.com
#!aws s3 mb s3://{HASH}'-kubeflow-pipeline-data' --region $AWS_REGION

make_bucket: leb547-kubeflow-pipeline-data


In [7]:
from kubeflow import fairing
from kubeflow.fairing import TrainJob
from kubeflow.fairing.backends import KubeflowAWSBackend


from kubeflow import fairing

FAIRING_BACKEND = 'KubeflowAWSBackend'

AWS_ACCOUNT_ID = fairing.cloud.aws.guess_account_id()
AWS_REGION = 'us-east-1'
DOCKER_REGISTRY = '{}.dkr.ecr.{}.amazonaws.com'.format(AWS_ACCOUNT_ID, AWS_REGION)
S3_BUCKET = f'{HASH}-kubeflow-pipeline-data'

In [8]:
import importlib

if FAIRING_BACKEND == 'KubeflowAWSBackend':
    from kubeflow.fairing.builders.cluster.s3_context import S3ContextSource
    BuildContext = S3ContextSource(
        aws_account=AWS_ACCOUNT_ID, region=AWS_REGION,
        bucket_name=S3_BUCKET
    )

BackendClass = getattr(importlib.import_module('kubeflow.fairing.backends'), FAIRING_BACKEND)

In [9]:
from kubeflow.fairing import TrainJob
train_job = TrainJob(WinePricer, input_files=['wine_data.csv', "requirements.txt"],
                     docker_registry=DOCKER_REGISTRY,
                     backend=BackendClass(build_context_source=BuildContext))
train_job.submit()

Using default base docker image: registry.hub.docker.com/library/python:3.6.9
Using builder: <class 'kubeflow.fairing.builders.cluster.cluster.ClusterBuilder'>
Building the docker image.
Building image using cluster builder.
/usr/local/lib/python3.6/dist-packages/kubeflow/fairing/__init__.py already exists in Fairing context, skipping...
Creating docker context: /tmp/fairing_context_q3r5iyrf
/usr/local/lib/python3.6/dist-packages/kubeflow/fairing/__init__.py already exists in Fairing context, skipping...
Not able to find aws credentials secret: aws-secret
Waiting for fairing-builder-5t22s-n66kn to start...
Waiting for fairing-builder-5t22s-n66kn to start...
Waiting for fairing-builder-5t22s-n66kn to start...
Pod started running True


[36mINFO[0m[0006] Resolved base name registry.hub.docker.com/library/python:3.6.9 to registry.hub.docker.com/library/python:3.6.9
[36mINFO[0m[0006] Resolved base name registry.hub.docker.com/library/python:3.6.9 to registry.hub.docker.com/library/python:3.6.9
[36mINFO[0m[0006] Downloading base image registry.hub.docker.com/library/python:3.6.9
[36mINFO[0m[0006] Error while retrieving image from cache: getting file info: stat /cache/sha256:036d4ab50fa49df89e746cf1b5369c88db46e8af2fbd08531788e7d920e9a491: no such file or directory
[36mINFO[0m[0006] Downloading base image registry.hub.docker.com/library/python:3.6.9
[36mINFO[0m[0006] Built cross stage deps: map[]
[36mINFO[0m[0006] Downloading base image registry.hub.docker.com/library/python:3.6.9
[36mINFO[0m[0006] Error while retrieving image from cache: getting file info: stat /cache/sha256:036d4ab50fa49df89e746cf1b5369c88db46e8af2fbd08531788e7d920e9a491: no such file or directory
[36mINFO[0m[0006] Downloading base ima

Not able to find aws credentials secret: aws-secret
The job fairing-job-9bs5k launched.
Waiting for fairing-job-9bs5k-zm9hr to start...
Waiting for fairing-job-9bs5k-zm9hr to start...
Waiting for fairing-job-9bs5k-zm9hr to start...
Pod started running True


From /usr/local/lib/python3.6/site-packages/tensorflow_core/python/ops/resource_variable_ops.py:1630: calling BaseResourceVariable.__init__ (from tensorflow.python.ops.resource_variable_ops) with constraint is deprecated and will be removed in a future version.
Instructions for updating:
If using Keras pass *_constraint arguments to layers.
From /usr/local/lib/python3.6/site-packages/tensorflow_core/python/keras/initializers.py:119: calling RandomUniform.__init__ (from tensorflow.python.ops.init_ops) with dtype is deprecated and will be removed in a future version.
Instructions for updating:
Call initializer instance with the dtype argument instead of passing it to the constructor
2021-02-02 19:34:09.290172: I tensorflow/core/platform/cpu_feature_guard.cc:142] Your CPU supports instructions that this TensorFlow binary was not compiled to use: AVX2 AVX512F FMA
2021-02-02 19:34:09.298040: I tensorflow/core/platform/profile_utils/cpu_utils.cc:94] CPU Frequency: 2499995000 Hz
2021-02-02 19

Cleaning up job fairing-job-9bs5k...


'fairing-job-9bs5k'

In [None]:
from kubeflow.fairing import PredictionEndpoint
endpoint = PredictionEndpoint(WinePricer, input_files=['trained_wine_model.dat', "requirements.txt"],
                              docker_registry=DOCKER_REGISTRY,
                              service_type='ClusterIP',
                              backend=BackendClass(build_context_source=BuildContext))
endpoint.create()

In [None]:
# Wait service a while to be ready and replace `<endpoint>` with the output from last step.
# Here's an example !nc -vz fairing-service-srwh2.anonymous.svc.cluster.local 5000

!netcat fairing-service-rfmnj.eksworkspace.svc.cluster.local 5000

In [None]:

# PR https://github.com/kubeflow/fairing/pull/376
# Add `:5000/predict` to mitigate the issue.
endpoint.url='http://fairing-service-n8qv2.anonymous.svc.cluster.local:5000/predict'

endpoint.predict_nparray(test_X)