In [None]:
!pip uninstall -y kfp 
!pip install --upgrade kfp==1.8.20  

In [None]:
# List the installed packages whose name contains "kfp" to confirm the version.
!pip list | grep kfp

# Import Kubeflow Pipelines libraries


In [1]:
import kfp
import kfp.components as comp
import kfp.dsl as dsl
from kfp.components import InputPath, OutputPath

## Download Dataset

In [2]:
#Download data
def download_data(download_link: str, data_path: OutputPath(str)):
    """Download the train and test archives and unpack them into ``data_path``.

    Args:
        download_link: URL template with a ``{file}`` placeholder; it is
            formatted once with ``train`` and once with ``test``.
        data_path: Output directory that receives ``train_csv.zip``,
            ``test_csv.zip`` and the files extracted from them.
    """
    import os
    import zipfile

    import wget

    # exist_ok avoids the check-then-create race and matches the other steps.
    os.makedirs(data_path, exist_ok=True)

    for split in ('train', 'test'):
        archive_path = f'{data_path}/{split}_csv.zip'
        wget.download(download_link.format(file=split), archive_path)
        with zipfile.ZipFile(archive_path, "r") as zip_ref:
            zip_ref.extractall(data_path)

    print('Done!')

In [3]:
# Wrap download_data as a pipeline component running on python:3.7.
download_op = kfp.components.create_component_from_func(
    func=download_data,
    packages_to_install=['wget', 'zipfile36'],
    base_image="python:3.7",
)

## Load Data

In [4]:
def load_data(data_path:comp.InputPath(str),load_data_path: comp.OutputPath()):
    """Read the train/test CSVs and pickle them for the next step.

    Args:
        data_path: Directory containing ``train.csv`` and ``test.csv``.
        load_data_path: Output directory; a pickle named ``df`` holding
            ``(ntrain, df, tweet_df)`` is written into it.
    """
    import os
    import pickle

    import pandas as pd

    tweet_df = pd.read_csv(os.path.join(data_path, 'train.csv'))
    test_df = pd.read_csv(os.path.join(data_path, 'test.csv'))

    # join train and test together; ntrain records how many leading rows are train rows
    df = pd.concat([tweet_df, test_df])
    ntrain = tweet_df.shape[0]

    # creating the output directory for this step
    os.makedirs(load_data_path, exist_ok=True)

    with open(os.path.join(load_data_path, 'df'), 'wb') as f:
        pickle.dump((ntrain, df, tweet_df), f)

    print('Done!')

In [5]:
# Wrap load_data as a pipeline component running on python:3.7.
load_data_step = kfp.components.create_component_from_func(
    load_data,
    base_image='python:3.7',
    packages_to_install=['pandas', 'pickle5'],
    # Optional: saves the component spec for future use.
    output_component_file='load_data_component.yaml',
)

In [6]:
def preprocess_data(load_data_path:comp.InputPath(str), preprocess_data_path:comp.OutputPath(str)):
    """Strip URLs, HTML tags, emoji and punctuation from the ``text`` column.

    Args:
        load_data_path: Directory holding the ``df`` pickle
            ``(ntrain, df, tweet_df)``.
        preprocess_data_path: Output directory; the same tuple, with
            ``df['text']`` cleaned, is pickled to a file named ``df``.
    """
    import os
    import pickle
    import re
    import string

    import pandas as pd

    # loading the combined train/test frame
    with open(f'{load_data_path}/df', 'rb') as f:
        ntrain, df, tweet_df = pickle.load(f)

    # Patterns and the translation table are built once, not once per row.
    url_pattern = re.compile(r'https?://\S+|www\.\S+')
    html_pattern = re.compile(r'<.*?>')
    # NOTE(review): the last range (U+24C2..U+1F251) spans far more than emoji
    # (it includes CJK and other scripts); kept as-is to preserve the output.
    emoji_pattern = re.compile("["
                               u"\U0001F600-\U0001F64F"  # emoticons
                               u"\U0001F300-\U0001F5FF"  # symbols & pictographs
                               u"\U0001F680-\U0001F6FF"  # transport & map symbols
                               u"\U0001F1E0-\U0001F1FF"  # flags (iOS)
                               u"\U00002702-\U000027B0"
                               u"\U000024C2-\U0001F251"
                               "]+", flags=re.UNICODE)
    punct_table = str.maketrans('', '', string.punctuation)

    def clean_text(text):
        # Same order as before: URLs, HTML tags, emoji, then punctuation.
        text = url_pattern.sub(r'', text)
        text = html_pattern.sub(r'', text)
        text = emoji_pattern.sub(r'', text)
        return text.translate(punct_table)

    df['text'] = df['text'].apply(clean_text)

    # creating the preprocess directory
    os.makedirs(preprocess_data_path, exist_ok=True)

    with open(f'{preprocess_data_path}/df', 'wb') as f:
        pickle.dump((ntrain, df, tweet_df), f)

    print('Done!')

In [7]:
# Wrap preprocess_data as a component and save its spec to preprocess_data_component.yaml.
preprocess_data_step = kfp.components.create_component_from_func(
    func=preprocess_data,
    output_component_file='preprocess_data_component.yaml',
    packages_to_install=['pandas', 'regex', 'pickle5'],
)

In [8]:
def corpus_creation(preprocess_data_path: comp.InputPath(str), corpus_path: comp.OutputPath(str)):
    """Tokenize every tweet into lower-cased alphabetic, non-stopword tokens.

    Args:
        preprocess_data_path: Directory holding the ``df`` pickle
            ``(ntrain, df, tweet_df)``.
        corpus_path: Output directory; ``(corpus, tweet_df)`` is pickled to a
            file named ``corpus``, where ``corpus`` is a list of token lists,
            one per row of ``df``.
    """
    import os
    import pickle

    import nltk
    import pandas as pd
    from tqdm import tqdm

    nltk.download('stopwords')
    nltk.download('punkt')
    from nltk.corpus import stopwords
    from nltk.tokenize import word_tokenize

    stop = set(stopwords.words('english'))

    with open(f'{preprocess_data_path}/df', 'rb') as f:
        ntrain, df, tweet_df = pickle.load(f)

    def create_corpus(df):
        corpus = []
        for tweet in tqdm(df['text']):
            words = []
            for word in word_tokenize(tweet):
                if not word.isalpha():
                    continue
                lowered = word.lower()
                # Compare the lower-cased token: the stopword list is lower
                # case, so testing the raw token let "The", "I", ... through.
                if lowered not in stop:
                    words.append(lowered)
            corpus.append(words)
        return corpus

    # creating the corpus directory
    os.makedirs(corpus_path, exist_ok=True)

    corpus = create_corpus(df)
    with open(f'{corpus_path}/corpus', 'wb') as f:
        pickle.dump((corpus, tweet_df), f)

    print('Done!')
        

In [9]:
# Wrap corpus_creation as a component and save its spec to corpus_creation_component.yaml.
corpus_creation_step = kfp.components.create_component_from_func(
    func=corpus_creation,
    output_component_file='corpus_creation_component.yaml',
    packages_to_install=['pandas', 'pickle5', 'nltk', 'tqdm'],
)

In [10]:
def embedding_step(download_link_glove: str,corpus_path: comp.InputPath(str), embedded_path: comp.OutputPath(str)):
    """Tokenize the corpus and build a GloVe-initialised embedding matrix.

    Args:
        download_link_glove: URL of a zip archive that contains
            ``glove.6B.100d.txt`` at its root.
        corpus_path: Directory holding the ``corpus`` pickle
            ``(corpus, tweet_df)``.
        embedded_path: Output directory; the tuple ``(embedding_matrix,
            num_words, tweet_pad, tweet_df, MAX_LEN)`` is pickled to a file
            named ``embedding``.
    """
    import os
    import pickle
    import tempfile
    import zipfile

    import numpy as np
    import pandas as pd
    import wget
    from keras.preprocessing.text import Tokenizer
    from keras.utils import pad_sequences
    from tqdm import tqdm

    GLOVE_FILE = 'glove.6B.100d.txt'
    EMBEDDING_DIM = 100  # width of the vectors in GLOVE_FILE
    MAX_LEN = 50

    with open(f'{corpus_path}/corpus', 'rb') as f:
        corpus, tweet_df = pickle.load(f)

    os.makedirs(embedded_path, exist_ok=True)

    # Download and unpack GloVe in a scratch directory so the archive and the
    # vector file do not end up inside the step's output directory.
    embedding_dict = {}
    with tempfile.TemporaryDirectory() as glove_dir:
        archive_path = os.path.join(glove_dir, 'glove.6B.zip')
        wget.download(download_link_glove, archive_path)

        # Only the 100-dimensional file is read, so only that member is extracted.
        with zipfile.ZipFile(archive_path, 'r') as zip_ref:
            zip_ref.extract(GLOVE_FILE, glove_dir)

        # Explicit encoding so parsing does not depend on the container locale.
        with open(os.path.join(glove_dir, GLOVE_FILE), 'r', encoding='utf-8') as f:
            for line in f:
                values = line.split()
                embedding_dict[values[0]] = np.asarray(values[1:], 'float32')

    tokenizer_obj = Tokenizer()
    tokenizer_obj.fit_on_texts(corpus)
    sequences = tokenizer_obj.texts_to_sequences(corpus)

    tweet_pad = pad_sequences(sequences, maxlen=MAX_LEN, truncating='post', padding='post')
    word_index = tokenizer_obj.word_index
    # +1 because row 0 of the matrix is left as zeros; word indices start at 1.
    num_words = len(word_index) + 1
    embedding_matrix = np.zeros((num_words, EMBEDDING_DIM))

    for word, i in tqdm(word_index.items()):
        emb_vec = embedding_dict.get(word)
        # Words without a GloVe vector keep their all-zero row.
        if emb_vec is not None:
            embedding_matrix[i] = emb_vec

    with open(f'{embedded_path}/embedding', 'wb') as f:
        pickle.dump((embedding_matrix, num_words, tweet_pad, tweet_df, MAX_LEN), f)

    print('Done!')


In [11]:
# Wrap embedding_step as a component and save its spec to embedding_component.yaml.
embedding_creation_step = kfp.components.create_component_from_func(
    func=embedding_step,
    output_component_file='embedding_component.yaml',
    packages_to_install=[
        'pandas',
        'zipfile36',
        'wget',
        'tqdm',
        'keras',
        'numpy',
        'tensorflow',
        'pickle5',
    ],
)

In [12]:
def model_building_and_training(embedded_path: comp.InputPath(str), model_path: comp.OutputPath(str)):
    """Train an LSTM classifier on the padded tweets and save it.

    Args:
        embedded_path: Directory holding the ``embedding`` pickle
            ``(embedding_matrix, num_words, tweet_pad, tweet_df, MAX_LEN)``.
        model_path: Output directory; receives ``model.h5`` and a ``values``
            pickle containing ``(test_loss, test_acc)``.
    """
    import os
    import pickle

    import numpy as np
    import pandas as pd
    from keras.initializers import Constant
    from keras.layers import Embedding, LSTM, Dense, SpatialDropout1D
    from keras.models import Sequential
    from sklearn.model_selection import train_test_split
    from tensorflow.keras.optimizers import Adam

    with open(f'{embedded_path}/embedding', 'rb') as f:
        embedding_matrix, num_words, tweet_pad, tweet_df, MAX_LEN = pickle.load(f)

    # Only the first len(tweet_df) rows of tweet_pad are paired with labels;
    # the remaining rows are not used by this step.
    train = tweet_pad[:tweet_df.shape[0]]

    # Fixed seed so the train/hold-out split is the same on every run.
    X_train, X_test, y_train, y_test = train_test_split(
        train, tweet_df['target'].values, test_size=0.15, random_state=42)

    # defining model
    model = Sequential()

    # Frozen embedding layer initialised from the precomputed matrix.
    embedding = Embedding(num_words, embedding_matrix.shape[1],
                          embeddings_initializer=Constant(embedding_matrix),
                          input_length=MAX_LEN, trainable=False)

    model.add(embedding)
    model.add(SpatialDropout1D(0.2))
    model.add(LSTM(64, dropout=0.2, recurrent_dropout=0.2))
    model.add(Dense(1, activation='sigmoid'))

    optimizer = Adam(learning_rate=1e-5)

    # Compiling the classifier model with Adam optimizer
    model.compile(loss='binary_crossentropy', optimizer=optimizer, metrics=['accuracy'])

    # fitting the model; the hold-out split doubles as validation data
    model.fit(X_train, y_train, batch_size=4, epochs=5,
              validation_data=(X_test, y_test), verbose=2)

    # evaluate model on the same hold-out split
    test_loss, test_acc = model.evaluate(np.array(X_test), np.array(y_test), verbose=0)
    print("Test_loss: {}, Test_accuracy: {} ".format(test_loss, test_acc))

    # creating the model directory
    os.makedirs(model_path, exist_ok=True)

    # saving the model
    model.save(f'{model_path}/model.h5')

    # dumping the evaluation metrics
    with open(f'{model_path}/values', 'wb') as f:
        pickle.dump((test_loss, test_acc), f)

    print('Done!')

In [13]:
# Wrap model_building_and_training as a component and save its spec to
# model_creation_component.yaml. The PyPI distribution is 'scikit-learn':
# the old 'sklearn' alias is deprecated and its installation now fails.
model_building_step = kfp.components.create_component_from_func(
    model_building_and_training,
    packages_to_install=[
        'pandas',
        'zipfile36',
        'wget',
        'tqdm',
        'keras',
        'numpy',
        'tensorflow',
        'scikit-learn',
        'pickle5',
    ],
    output_component_file='model_creation_component.yaml',
)

In [14]:
@dsl.pipeline(
   name='Trial pipeline',
   description='An example pipeline that downloads the tweet data, preprocesses it, builds GloVe embeddings and trains an LSTM classifier.'
)
def trial_pipeline(
    download_link: str,
    data_path: str,
    load_data_path: str, 
    preprocess_data_path: str,
    corpus_path:str,
    download_link_glove:str,
    model_path:str,
):
    """Chain the six components: download, load, preprocess, corpus, embedding, model.

    Only ``download_link`` and ``download_link_glove`` are consumed. The
    ``*_path`` parameters are kept so the pipeline signature stays unchanged,
    but they are not used: every step receives the previous step's output.
    """
    download_task = download_op(download_link)
    load_task = load_data_step(download_task.output)
    preprocess_task = preprocess_data_step(load_task.output)
    corpus_task = corpus_creation_step(preprocess_task.output)
    embedding_task = embedding_creation_step(download_link_glove, corpus_task.output)
    model_task = model_building_step(embedding_task.output)

In [15]:
# Replace download_link with the repo URL where the data is stored, e.g.
# https://<github-repo>/<data-dir>/{file}.csv.zip?raw=true
# The {file} placeholder is filled with 'train' and 'test' by the download step.
download_link = 'https://github.com/AnkitRai-22/natural-language-processing-with-disaster-tweets-kaggle-competition/blob/main/data/{file}.csv.zip?raw=true'
data_path = "/mnt"
load_data_path = "load"
preprocess_data_path = "preprocess"
# A stray trailing comma previously turned this value into the tuple ("corpus",).
corpus_path = "corpus"
download_link_glove = "http://nlp.stanford.edu/data/glove.6B.zip"
model_path = "model"

In [16]:
from kfp import compiler

In [17]:
# Compile trial_pipeline into trial_pipeline.yaml using the V1_LEGACY execution mode.
mode = kfp.dsl.PipelineExecutionMode.V1_LEGACY
cmplr = compiler.Compiler(mode=mode)
cmplr.compile(
    pipeline_func=trial_pipeline,
    package_path='trial_pipeline.yaml',
)