In [174]:
import kfp
from kfp import dsl
from datetime import datetime
from kfp.components import InputPath, InputTextFile, OutputPath, OutputTextFile
from kfp.components import func_to_container_op

In [175]:
def clean_train_data(cleaned_data_path: OutputPath('CSV')):
    """Download the airline-tweet training set, normalise the text and write it as CSV.

    For every entry of the ``text`` column the function removes @mentions,
    replaces every non-letter character with a space, lower-cases the text,
    drops English stop words and Porter-stems the remaining words. The result
    is stored in a new ``processed_tweets`` column alongside the original
    columns.

    All imports live inside the function because it is converted into a
    container component with ``func_to_container_op`` and therefore has to be
    self-contained.

    Args:
        cleaned_data_path: File path the cleaned DataFrame is written to as CSV.
    """
    import re

    import pandas as pd
    import nltk
    nltk.download('stopwords')  # the stop-word corpus is fetched at run time
    from nltk.stem.porter import PorterStemmer
    from nltk.corpus import stopwords

    data = pd.read_csv('https://raw.githubusercontent.com/roshankonda98/kubeflow-sentiment-analysis/main/train_data.csv')

    stemmer = PorterStemmer()
    # A set gives constant-time membership tests; the plain list returned by
    # NLTK would be scanned once for every word of every tweet.
    stop_words = set(stopwords.words('english'))

    def _normalise(tweet):
        # Missing values are read by pandas as float NaN, which re.sub rejects;
        # treat anything that is not a string as empty text.
        if not isinstance(tweet, str):
            return ""
        without_mentions = re.sub(r'@\w+', '', tweet)  # remove mentions from tweet
        # remove any punctuation, numbers, and other characters
        words = re.sub('[^a-zA-Z]', ' ', without_mentions).lower().split()
        # remove stop words, then reduce the remaining words to their stem
        return " ".join(stemmer.stem(w) for w in words if w not in stop_words)

    data['processed_tweets'] = [_normalise(t) for t in data['text']]

    data.to_csv(cleaned_data_path)

In [176]:
def split_data(cleaned_data_input: InputPath('CSV'), tr_x: OutputPath('NPY'), tr_y: OutputPath('NPY'), ts_x: OutputPath('NPY'), ts_y: OutputPath('NPY')):
    """Tokenise the cleaned tweets and write an 80/20 train/test split as NumPy files.

    The ``processed_tweets`` column is converted to integer sequences with a
    Keras ``Tokenizer`` limited to 7500 words and left-padded to a common
    length. The ``airline_sentiment`` column is one-hot encoded. Rows with a
    missing value in either column are dropped first.

    Imports stay inside the function because it is converted into a container
    component with ``func_to_container_op``.

    Args:
        cleaned_data_input: Path of the CSV holding ``processed_tweets`` and
            ``airline_sentiment`` columns.
        tr_x: Output path for the training sequences.
        tr_y: Output path for the training labels.
        ts_x: Output path for the test sequences.
        ts_y: Output path for the test labels.
    """
    from tensorflow.keras.preprocessing.text import Tokenizer
    import pandas as pd
    from sklearn.model_selection import train_test_split
    import numpy as np
    from tensorflow.keras.preprocessing.sequence import pad_sequences

    data = pd.read_csv(cleaned_data_input)
    data = data[['processed_tweets', 'airline_sentiment']]  # get only required columns
    data = data.dropna()  # remove any nan values
    print(data.head())
    print(data.columns)

    # Must stay in sync with the Embedding input size (7500) used by the
    # training components.
    vocab_size = 7500
    tokenizer = Tokenizer(num_words=vocab_size)
    tokenizer.fit_on_texts(data['processed_tweets'])

    tweet_vectors = tokenizer.texts_to_sequences(data['processed_tweets'])
    tweet_vectors = pad_sequences(tweet_vectors, padding="pre")

    # One column per distinct sentiment value. The dtype is pinned to float32
    # because the default dummy dtype differs between pandas versions
    # (uint8 in older releases, bool in newer ones).
    labels = pd.get_dummies(data[['airline_sentiment']], dtype='float32').values

    # A fixed random_state makes the split reproducible across pipeline runs.
    train_x, test_x, train_y, test_y = train_test_split(
        tweet_vectors, labels, test_size=0.2, random_state=42
    )

    # np.save is given an open file handle rather than a path so the data is
    # written to exactly the path supplied by the pipeline.
    for path, array in ((tr_x, train_x), (tr_y, train_y), (ts_x, test_x), (ts_y, test_y)):
        with open(path, 'wb') as f:
            np.save(f, array)

In [177]:
def train_lstm_model(tr_x: InputPath('NPY'), tr_y: InputPath('NPY'), model_output: OutputPath('H5')):
    """Train a two-layer LSTM classifier on the padded tweet sequences and save it.

    Architecture: Embedding(7500, 64) -> LSTM(64, relu, return_sequences)
    -> Dropout(0.2) -> LSTM(32, relu) -> Dropout(0.2) -> Dense(3, softmax),
    compiled with categorical cross-entropy, the Adam optimizer and an
    accuracy metric, and fitted for a single epoch on CPU.

    Imports stay inside the function because it is converted into a container
    component with ``func_to_container_op``.

    Args:
        tr_x: Path of the ``np.save``-d training sequences (2-D integer array).
        tr_y: Path of the ``np.save``-d one-hot training labels.
        model_output: Path the trained Keras model is saved to.
    """
    import os

    # Hide GPUs *before* TensorFlow is imported so the setting is guaranteed to
    # be in place ahead of any CUDA initialisation.
    os.environ['CUDA_VISIBLE_DEVICES'] = '-1'  # make sure gpu is not used

    import numpy as np
    from tensorflow.keras.layers import LSTM, Dropout, Dense, Embedding
    from tensorflow.keras.models import Sequential

    with open(tr_x, 'rb') as f:
        train_x = np.load(f)

    with open(tr_y, 'rb') as f:
        train_y = np.load(f)

    # Sequence length comes from the array shape, so no row has to be indexed.
    sequence_length = train_x.shape[1]

    model = Sequential()
    # 7500 matches the tokenizer vocabulary size used by split_data.
    model.add(Embedding(7500, 64, input_length=sequence_length))
    model.add(LSTM(64, activation='relu', return_sequences=True))
    model.add(Dropout(0.2))
    model.add(LSTM(32, activation='relu'))
    model.add(Dropout(0.2))
    model.add(Dense(3, activation='softmax'))

    model.compile(loss='categorical_crossentropy', optimizer="adam", metrics=['accuracy'])
    model.summary()

    model.fit(train_x, train_y, epochs=1)

    # NOTE(review): the pipeline-provided path may not end in ".h5"; depending on
    # the TensorFlow/Keras version the on-disk format can then differ from HDF5
    # despite the 'H5' type annotation - confirm against the base image.
    model.save(model_output)

In [178]:
def train_gru_model(tr_x: InputPath('NPY'), tr_y: InputPath('NPY'), model_output: OutputPath('H5')):
    """Train a two-layer GRU classifier on the padded tweet sequences and save it.

    Architecture: Embedding(7500, 64) -> GRU(64, relu, return_sequences)
    -> Dropout(0.2) -> GRU(32, relu) -> Dropout(0.2) -> Dense(3, softmax),
    compiled with categorical cross-entropy, the Adam optimizer and an
    accuracy metric, and fitted for a single epoch on CPU.

    Imports stay inside the function because it is converted into a container
    component with ``func_to_container_op``.

    Args:
        tr_x: Path of the ``np.save``-d training sequences (2-D integer array).
        tr_y: Path of the ``np.save``-d one-hot training labels.
        model_output: Path the trained Keras model is saved to.
    """
    import os

    # Hide GPUs *before* TensorFlow is imported so the setting is guaranteed to
    # be in place ahead of any CUDA initialisation.
    os.environ['CUDA_VISIBLE_DEVICES'] = '-1'  # make sure gpu is not used

    import numpy as np
    from tensorflow.keras.layers import GRU, Dropout, Dense, Embedding
    from tensorflow.keras.models import Sequential

    with open(tr_x, 'rb') as f:
        train_x = np.load(f)

    with open(tr_y, 'rb') as f:
        train_y = np.load(f)

    # Sequence length comes from the array shape, so no row has to be indexed.
    sequence_length = train_x.shape[1]

    model = Sequential()
    # 7500 matches the tokenizer vocabulary size used by split_data.
    model.add(Embedding(7500, 64, input_length=sequence_length))
    model.add(GRU(64, activation='relu', return_sequences=True))
    model.add(Dropout(0.2))
    model.add(GRU(32, activation='relu'))
    model.add(Dropout(0.2))
    model.add(Dense(3, activation='softmax'))

    model.compile(loss='categorical_crossentropy', optimizer="adam", metrics=['accuracy'])
    model.summary()

    model.fit(train_x, train_y, epochs=1)

    # NOTE(review): the pipeline-provided path may not end in ".h5"; depending on
    # the TensorFlow/Keras version the on-disk format can then differ from HDF5
    # despite the 'H5' type annotation - confirm against the base image.
    model.save(model_output)

In [179]:
def evaluate_model(input_model: InputPath('H5'), ts_x: InputPath('NPY'), ts_y: InputPath('NPY')):
    """Load a saved Keras model and evaluate it on the held-out test split.

    The evaluation result is printed so that it appears in the step's log; the
    component declares no outputs.

    Imports stay inside the function because it is converted into a container
    component with ``func_to_container_op``.

    Args:
        input_model: Path of the model written by one of the training components.
        ts_x: Path of the ``np.save``-d test sequences.
        ts_y: Path of the ``np.save``-d one-hot test labels.
    """
    import numpy as np
    import tensorflow as tf

    with open(ts_x, 'rb') as f:
        test_x = np.load(f)

    with open(ts_y, 'rb') as f:
        test_y = np.load(f)

    model = tf.keras.models.load_model(input_model)

    # Keep the value returned by evaluate() instead of discarding it, and log
    # it explicitly rather than relying only on the Keras progress output.
    results = model.evaluate(test_x, test_y)
    print('Evaluation results {}: {}'.format(model.metrics_names, results))

In [180]:
# Public image created for this repo; every component runs on top of it.
BASE_IMAGE = 'dockerkonda1998/kubeflow-repo:ds-tensorflow'

# Wrap each Python function as a container-based pipeline component.
clean_train_data_op = func_to_container_op(clean_train_data, base_image=BASE_IMAGE)
split_data_op = func_to_container_op(split_data, base_image=BASE_IMAGE)
train_lstm_model_op = func_to_container_op(train_lstm_model, base_image=BASE_IMAGE)
train_gru_model_op = func_to_container_op(train_gru_model, base_image=BASE_IMAGE)
evaluate_model_op = func_to_container_op(evaluate_model, base_image=BASE_IMAGE)

In [181]:
@dsl.pipeline(
    name="Sentiment Analysis Pipeline",
    description="Predict Sentiment of Airline Tweets",
)
def sentiment_analysis_pipeline():
    """Wire the components into a DAG: clean -> split -> train (LSTM, GRU) -> evaluate each model."""
    cleaned = clean_train_data_op()
    split_outputs = split_data_op(cleaned.output).outputs

    # Both trainers consume the same training split; they are created first,
    # LSTM then GRU, before any evaluation step is added.
    trained_steps = [
        train_op(split_outputs['tr_x'], split_outputs['tr_y'])
        for train_op in (train_lstm_model_op, train_gru_model_op)
    ]

    # One evaluation step per trained model, all against the same test split.
    for trained in trained_steps:
        evaluate_model_op(
            trained.outputs['model_output'],
            split_outputs['ts_x'],
            split_outputs['ts_y'],
        )

In [182]:
# Kubeflow Pipelines API client used below to submit the run.
# NOTE(review): no host is passed, so the endpoint is presumably resolved from
# the in-cluster / local kube configuration - confirm for your environment.
client = kfp.Client()

In [183]:
# Experiment to file the run under; the run name carries a timestamp of the
# submission so repeated runs stay distinguishable.
experiment_name = "sentiment_analysis_kubeflow"
run_name = "-".join([experiment_name, datetime.now().strftime('%Y-%m-%d-%H-%M-%S')])

# Write the compiled pipeline package next to the notebook.
kfp.compiler.Compiler().compile(sentiment_analysis_pipeline, "sentiment_analysis_pipeline.zip")

# Submit the pipeline; kept as the cell's last expression so the returned
# run result is displayed.
client.create_run_from_pipeline_func(
    sentiment_analysis_pipeline,
    experiment_name=experiment_name,
    run_name=run_name,
    arguments={},
)

RunPipelineResult(run_id=830be29f-bedc-4e97-b487-3e2500d830c3)