## Installing the necessary pipelines

In [1]:
#!python -m pip install --user --upgrade pip

#!pip3 install pandas==0.23.4 matplotlib==3.0.3 scipy==1.2.1 scikit-learn==0.22 tensorflow==2.0 keras==1.2.2 --user

Collecting pip
[?25l  Downloading https://files.pythonhosted.org/packages/ab/11/2dc62c5263d9eb322f2f028f7b56cd9d096bb8988fcf82d65fa2e4057afe/pip-20.3.1-py2.py3-none-any.whl (1.5MB)
[K     |████████████████████████████████| 1.5MB 7.1MB/s eta 0:00:01
[?25hInstalling collected packages: pip
Successfully installed pip-20.3.1
You should consider upgrading via the 'pip install --upgrade pip' command.[0m
Please see https://github.com/pypa/pip/issues/5599 for advice on fixing the underlying issue.
To avoid this problem you can invoke Python with '-m pip' instead of running pip directly.
Collecting keras==1.2.2
  Downloading Keras-1.2.2.tar.gz (175 kB)
[K     |████████████████████████████████| 175 kB 5.1 MB/s eta 0:00:01
Collecting matplotlib==3.0.3
  Downloading matplotlib-3.0.3-cp36-cp36m-manylinux1_x86_64.whl (13.0 MB)
[K     |████████████████████████████████| 13.0 MB 18.8 MB/s eta 0:00:01
Collecting pandas==0.23.4
  Downloading pandas-0.23.4-cp36-cp36m-manylinux1_x86_64.whl (8.9 MB)


## Installling kubeflow pipeline SDK

In [2]:
# install kubeflow pipeline sdk
#!pip3 install kfp --upgrade --user

Please see https://github.com/pypa/pip/issues/5599 for advice on fixing the underlying issue.
To avoid this problem you can invoke Python with '-m pip' instead of running pip directly.
Collecting kfp
  Downloading kfp-1.1.2.tar.gz (159 kB)
[K     |████████████████████████████████| 159 kB 6.8 MB/s eta 0:00:01
Collecting click
  Downloading click-7.1.2-py2.py3-none-any.whl (82 kB)
[K     |████████████████████████████████| 82 kB 988 kB/s  eta 0:00:01
[?25hCollecting Deprecated
  Downloading Deprecated-1.2.10-py2.py3-none-any.whl (8.7 kB)
Collecting docstring-parser>=0.7.3
  Downloading docstring_parser-0.7.3.tar.gz (13 kB)
  Installing build dependencies ... [?25ldone
[?25h  Getting requirements to build wheel ... [?25ldone
[?25h    Preparing wheel metadata ... [?25ldone
Collecting kfp-pipeline-spec<0.2.0,>=0.1.0
  Downloading kfp_pipeline_spec-0.1.3.1-py3-none-any.whl (11 kB)
Collecting kfp-server-api<2.0.0,>=1.1.1b1
  Downloading kfp-server-api-1.1.2rc1.tar.gz (54 kB)
[K     |█

In [1]:
#checking if installation was successful
!which dsl-compile

In [2]:
# import libraries for pipeline
import kfp
import kfp.dsl as dsl
import kfp.components as comp

In [3]:
# create  directory for outputs.
output_dir = "/home/jovyan/data/"

In [4]:
#create preprocess pipeline
def preprocess(data_path):
    
    # Import Libraries
    import pickle
    import sys, subprocess;
    subprocess.run([sys.executable, '-m', 'pip', 'install', 'pandas==0.23.4'])
    subprocess.run([sys.executable, '-m', 'pip', 'install', 'scikit-learn==0.22'])
    subprocess.run([sys.executable, '-m', 'pip', 'install', 'keras==1.2.2'])
    subprocess.run([sys.executable, '-m', 'pip', 'install', 'nltk==3.2.5']) 
    import numpy as np
    import pandas as pd
    import re
    import nltk
    from nltk.corpus import stopwords
    nltk.download('stopwords')
    nltk.download('punkt')
    import tensorflow as tf
    import keras
    from tensorflow.keras.preprocessing.text import Tokenizer
    from keras.preprocessing.sequence import pad_sequences
    from sklearn.model_selection import train_test_split
    
    #downloading the dataset
    url = 'https://raw.githubusercontent.com/HamoyeHQ/g04-brazillian-commerce/master/data/olist_order_reviews_dataset.csv'
    
    
    #reading data from url
    review_df = pd.read_csv(url)
    
    # Preprocessing the reviews dataset
    review_data_title = review_df['review_comment_title']
    review_data = review_df.drop(['review_comment_title'],axis=1)

    # Dropping NaN values
    review_data  = review_data.dropna()
    review_data_title = review_data_title.dropna()

    # Resetting the reviews index and visualizing the data
    review_data = review_data.reset_index(drop=True)
    review_data_title = review_data_title.reset_index(drop=True)
    
    # Mapping the ratings
    review_data['Sentiment_rating'] = np.where(review_data.review_score > 3,1,0)

    # Removing neutral reviews 
    review_data = review_data[review_data.review_score != 3]
    
    # Getting rid of stopwords, tokenizing and making the text lowercase
    comments = []
    stop_words = set(stopwords.words('portuguese'))
    
    for words in review_data['review_comment_message']:
        only_letters = re.sub("[^a-zA-Z]", " ",words)
        tokens = nltk.word_tokenize(only_letters) #tokenize the sentences
        lower_case = [l.lower() for l in tokens] #convert all letters to lower case
        filtered_result = list(filter(lambda l: l not in stop_words, lower_case)) #Remove stopwords from the comments
        comments.append(' '.join(filtered_result))
    
    # making the text an array
    data = np.array(comments)
    
    #making the target categorical
    labels = np.array(review_data['Sentiment_rating'])
    y = []
    for i in range(len(labels)):
        if labels[i] == 0:
            y.append(0)
        if labels[i] == 1:
            y.append(1)
    y = np.array(y)
    labels = tf.keras.utils.to_categorical(y, 3, dtype="float32")
    del y

    max_words = 5000
    max_len = 200

    # Encoding and padding the texts
    tokenizer = Tokenizer(num_words=max_words)
    tokenizer.fit_on_texts(data)
    sequences = tokenizer.texts_to_sequences(data)
    encoded_data = pad_sequences(sequences, maxlen=max_len)
    
     #Save the whole_data as a pickle file to be used by the preprocess component.
    with open(f'{data_path}/clean_data','wb') as f:
        pickle.dump((encoded_data, labels),f)
        
    
    return (print('Done!'))  

In [5]:
preprocess(output_dir)

[nltk_data] Downloading package stopwords to /home/jovyan/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!
[nltk_data] Downloading package punkt to /home/jovyan/nltk_data...
[nltk_data]   Package punkt is already up-to-date!


Using TensorFlow backend.


Done!


In [6]:
#create train pipeline
def train(data_path):
    
    # Import Libraries
    import pickle
    import sys, subprocess;
    subprocess.run([sys.executable, '-m', 'pip', 'install', 'pandas==0.23.4'])
    subprocess.run([sys.executable, '-m', 'pip', 'install', 'scikit-learn==0.22'])
    subprocess.run([sys.executable, '-m', 'pip', 'install', 'keras==1.2.2'])
    
    import numpy as np
    import pandas as pd
    import re
    import tensorflow as tf
    import keras
    from tensorflow.keras.models import Sequential
    from tensorflow.keras import layers
    from tensorflow.keras.optimizers import RMSprop,Adam
    from tensorflow.keras.preprocessing.text import Tokenizer
    from tensorflow.keras.preprocessing.sequence import pad_sequences
    from tensorflow.keras import regularizers
    from tensorflow.keras import backend as K
    from tensorflow.keras.callbacks import ModelCheckpoint
    from tensorflow.keras.callbacks import EarlyStopping
    from sklearn.model_selection import train_test_split
    
    # Load and unpack the clean_data
    with open(f'{data_path}/clean_data','rb') as f:
        clean_data = pickle.load(f)
        
    # Separate the independent data (X) from the dependent data(y).
    encoded_data, labels = clean_data
    
    #Splitting the data
    X_train, X_test, y_train, y_test = train_test_split(encoded_data,labels, random_state=0)
    
    max_words = 5000
    max_len = 200
    
    # Building and training a bidirectional LSTM model
    classifier = Sequential()
    classifier.add(layers.Embedding(max_words, 40, input_length=max_len))
    classifier.add(layers.Bidirectional(layers.LSTM(20,dropout=0.6)))
    classifier.add(layers.Dense(3,activation='softmax'))
    classifier.compile(optimizer='rmsprop',loss='categorical_crossentropy', metrics=['accuracy'])
    checkpoint = tf.keras.callbacks.ModelCheckpoint('/home/jovyan/best_model.hdf5', monitor='val_accuracy', verbose=1,save_best_only=True, mode='auto', period=1,save_weights_only=False)
    classifier.fit(X_train,y_train, epochs=25,validation_data=(X_test, y_test),callbacks=[checkpoint])
    
    classifier.summary()
    
    #checking the model loss
    test_loss, test_acc = classifier.evaluate(X_test, y_test, verbose=2)
    print('Model accuracy: ',test_acc) 
    
    #Save the model to the designated 
    classifier.save(f'{data_path}/sentiment_model.h5')
    
    #Save the test_data as a pickle file to be used by the predict component.
    with open(f'{data_path}/test_data', 'wb') as f:
        pickle.dump((X_test,  y_test), f)
    
    return (print('Done!'))  

In [7]:
classifier = train(output_dir)

Train on 28566 samples, validate on 9522 samples
Epoch 1/25
Epoch 00001: val_accuracy improved from -inf to 0.92313, saving model to /home/jovyan/best_model.hdf5
Epoch 2/25
Epoch 00002: val_accuracy improved from 0.92313 to 0.92785, saving model to /home/jovyan/best_model.hdf5
Epoch 3/25
Epoch 00003: val_accuracy improved from 0.92785 to 0.92838, saving model to /home/jovyan/best_model.hdf5
Epoch 4/25
Epoch 00004: val_accuracy improved from 0.92838 to 0.92922, saving model to /home/jovyan/best_model.hdf5
Epoch 5/25
Epoch 00005: val_accuracy improved from 0.92922 to 0.93216, saving model to /home/jovyan/best_model.hdf5
Epoch 6/25
Epoch 00006: val_accuracy did not improve from 0.93216
Epoch 7/25
Epoch 00007: val_accuracy improved from 0.93216 to 0.93331, saving model to /home/jovyan/best_model.hdf5
Epoch 8/25
Epoch 00008: val_accuracy did not improve from 0.93331
Epoch 9/25
Epoch 00009: val_accuracy improved from 0.93331 to 0.93541, saving model to /home/jovyan/best_model.hdf5
Epoch 10/2

In [8]:
def predict(data_path):
    
    import pickle
    import sys, subprocess;
    subprocess.run([sys.executable, '-m', 'pip', 'install', 'keras==1.2.2'])
    import numpy as np
    import tensorflow 
    import keras
    
    
    # Load the saved Keras model
    classifier = tensorflow.keras.models.load_model(f'{data_path}/sentiment_model.h5')

    # Load and unpack the test_data
    with open(f'{data_path}/test_data','rb') as f:
        test_data = pickle.load(f)
        
    # Separate the X_test from y_test.
    X_test,  y_test = test_data

    # make predictions.
    y_pred = classifier.predict(X_test)

    # create a threshold
    y_pred=(y_pred>0.5)
    
    with open(f'{data_path}/result.txt', 'w') as result:
        result.write(" Prediction: {}, Actual: {} ".format(y_pred,y_test.astype("int64")))
    
    print('Prediction has be saved successfully!')


In [9]:
predict(output_dir)

Prediction has be saved successfully!


In [10]:
# Create preprocess, train and predict lightweight components.
preprocess_op = comp.func_to_container_op(preprocess , base_image = "tensorflow/tensorflow:latest-gpu-py3")
train_op = comp.func_to_container_op(train , base_image = "tensorflow/tensorflow:latest-gpu-py3")
predict_op = comp.func_to_container_op(predict , base_image = "tensorflow/tensorflow:latest-gpu-py3")

## Build kubeflow pipeline

In [11]:
#Create a client to enable communication with the Pipelines API server.
client = kfp.Client()

In [12]:
# Define the pipeline
@dsl.pipeline(
   name='Sentiment Analysis Pipeline',
   description='An ML pipeline that performs sentiment analysis model training and prediction on customer reviews .'
)

# Define parameters to be fed into pipeline
def sentiment_container_pipeline(
    data_path: str
):
    
    # Define volume to share data between components.
    vop = dsl.VolumeOp(
    name="create_volume",
    resource_name="data-volume", 
    size="1Gi", 
    modes=dsl.VOLUME_MODE_RWO)
    
    # Create sentiment analysis preprocess component.
    sentiment_preprocess_container = preprocess_op(data_path) \
                                    .add_pvolumes({data_path: vop.volume})
    
    # Create sentiment analysis training component.
    sentiment_training_container = train_op(data_path) \
                                    .add_pvolumes({data_path: sentiment_preprocess_container.pvolume})

    # Create sentiment analysis prediction component.
    sentiment_predict_container = predict_op(data_path) \
                                    .add_pvolumes({data_path: sentiment_training_container.pvolume})
    
    # Print the result of the prediction
    sentiment_result_container = dsl.ContainerOp(
        name="print_prediction",
        image='library/bash:4.4.23',
        pvolumes={data_path: sentiment_predict_container.pvolume},
        arguments=['cat', f'{data_path}/result.txt']
    )

## Run the pipeline

In [13]:
DATA_PATH = '/home/jovyan/data/clean_data'

In [14]:
pipeline_func = sentiment_container_pipeline

In [15]:
experiment_name = 'sentiment_analysis_kubeflow'
run_name = pipeline_func.__name__ + ' run'

arguments = {"data_path":DATA_PATH}

# Compile pipeline to generate compressed YAML definition of the pipeline.
kfp.compiler.Compiler().compile(pipeline_func,  
  '{}.zip'.format(experiment_name))

# Submit pipeline directly from pipeline function
run_result = client.create_run_from_pipeline_func(pipeline_func, 
                                                  experiment_name=experiment_name, 
                                                  run_name=run_name, 
                                                  arguments=arguments)

