In [1]:
import pandas as pd
from xgboost import XGBClassifier
from sklearn.metrics import confusion_matrix
from sklearn.model_selection import train_test_split
from sklearn.model_selection import cross_val_score, KFold
from sklearn.feature_extraction.text import HashingVectorizer
import sys

import pickle
import re

import warnings

import kfp
import kfp.components as components
import kfp.dsl as dsl
import kubeflow.fairing.utils

from kfp.components import InputPath, OutputPath

import urllib.request
import os

import requests

import os
import boto3
from botocore.client import Config

import pickle

from kale.common.serveutils import serve

warnings.filterwarnings('ignore')
NAMESPACE = kubeflow.fairing.utils.get_current_k8s_namespace()

In [2]:
NAMESPACE

'hatterasuser'

In [3]:
#!/usr/bin/env python
import re
import pickle
import os

import pandas as pd

class TextCleaner:
    """TextCleaner METHODS:
    TextCleaner.alpha_iterator():
    TextCleaner.alpha_iterator(text) ==>  Returns lower-case letters stripped of
    punctuation and numbers.
    By default alpha_iterator removes numbers and emoticons.To keep numbers:
        cleaner.alpha_iterator(text, remove_numeric = False)
        Likewise, to keep emoticons:
        cleaner.alpha_iterator(text, remove_emoticon = False)
        To Keep both:
        cleaner.alpha_iterator(text, remove_emoticon = False, remove_numeric = False)

    TextCleaner.stop_word_iterator():
    TextCleaner.stop_word_iterator(text) ==>  Removes common "stop" words, like "and".

    TextCleaner.custom_stop_word_iterator():
    TextCleaner.custom_stop_word_iterator(text, stop_words) ==> Custom stop_words in
    list format. stop_words are words to be removed.
    can use this in-lieu of stop_word_iterator, or in addition to. Alternatively, you can add words
    TextCleaner.stop_words list object, using TextCleaner.stop_words.append(word). This is the
    preferred method if you are using the in-built stopwords.

    TextCleaner.streaming_cleaner_and_tokenizer():
    TextCleaner.streaming_cleaner_and_tokenizer ==> A text cleaner, tokenizer, and stop-word remover
    that is designed to be used with bag-of-word, out-of-processor, or other algorithms
    that have an argument for a tokenizer.
    Method is applied for each row in a list. Pass this as an argument wherever you need a
    tokenizer/processor for algorithms like bag-of-words. Note that this method keeps emoticons,
    since they can help in determining polarity of text.

     STATIC METHODS:
        TextCleaner.tokenizer(text) ==> Returns tokens (unigrams) from a
        list of sentences.

    GENERAL USAGE:
    Import:
        2) from  TextCleaner2000.TextCleaner import TextCleaner
        Instance Instantiation:
        3) Simply  instantiate a cleaner object with empty call:
        cleaner = TextCleaner()

    CLASS METHOD USAGE:
        For the following examples, text refers to an array-like object.
        For best results, pass text as a list() or a Pandas
         DataFrame column: (assuming data_frame is a pandas DataFrame) data_frame["column_name"].
        For stop words used in custom stop word removal, pass stop words as a list().
    GENERAL NUMBER AND PUNCTUATION REMOVAL:
        alpha_words = cleaner.alpha_iterator(text, remove_emoticon = True, remove_numeric = True)
    COMMON STOPWORD REMOVAL:
        cleaned_of_stops = cleaner.stop_word_iterator(text)
    CUSTOM STOPWORD REMOVAL:
        cleaned_of_custom_stops = cleaner.custom_stop_word_iterator(text, stop_words)
        Remember that stop_words is a comma-separated list().
    STREAMING TEXT:
        Pass TextCleaner.streaming_cleaner_and_tokenizer as an argument to an algorithm
        that calls for a tokenizer. For example, with HashingVectorizer which can
        be used with SGDClassifier:
            HashingVectorizer(decode_error = 'ignore',
                        n_features = 2**21,
                        preprocessor = None,
                        tokenizer = cleaner.streaming_cleaner_and_tokenizer)"""
    
    URL = "http://minio-service.kubeflow.svc.cluster.local:9000/hatterasuser/data/stops.pkl" #minio server

    STOPWORDS = pickle.load(urllib.request.urlopen(URL))
    
    def __init__(self):

        self.emoticons = None


    @classmethod 
    def __alphaizer(self, text, remove_numeric, remove_emoticon):
        """Given a string (text), removes all punctuation and numbers.
        Returns lower-case words. Called by the iterator method
        alpha_iterator to apply this to lists, or array-like (pandas dataframe)
        objects."""

        if remove_numeric and remove_emoticon:
            non_numeric = ''.join(i for i in text if not i.isdigit())
            cleaned = re.sub('[^A-Za-z0-9]+', ' ', non_numeric)
            return cleaned.lower().strip()

        if not remove_numeric and remove_emoticon:
            cleaned = re.sub('[^A-Za-z0-9]+', ' ', text)
            return cleaned.lower().strip()

        if not remove_numeric and not remove_emoticon:

            emoticons = TextCleaner.__emoticon_finder(text)
            emoticons.replace('-', '')
            self.set_emoticons(emoticons)
            cleaned = re.sub('[^A-Za-z0-9]+', ' ', text)
            clean = cleaned.lower().strip() + ' ' + emoticons
            return clean

        if remove_numeric and not remove_emoticon:
            emoticons = TextCleaner.__emoticon_finder(text)
            emoticons.replace('-', '')
            self.set_emoticons(emoticons)
            
            non_numeric = ''.join(i for i in text if not i.isdigit())
            cleaned = re.sub('[^A-Za-z0-9]+', ' ', non_numeric)
            clean = cleaned.lower().strip() + ' ' + emoticons
            return clean

    @staticmethod
    def __tokenizer(text):
        """Given a sentence, splits sentence on blanks and returns a list of ngrams or tokens"""
           
        text = re.sub('<[^>]*>', '', text)
        emoticons = re.findall('(?::|;|=)(?:-)?(?:\)|\(|D|P)', text.lower())
        text = re.sub('[\W]+', ' ', text.lower())\
        + ' '.join(emoticons).replace('-','')
        tokenized = [t for t in text.split() if t not in TextCleaner.STOPWORDS]
        return tokenized

    @classmethod
    def __stop_word_remover(self, text, stop):
        """Removes common stop-words like: "and", "or","but", etc. Called by
        stop_word_iterator to apply this to lists, or array-like (pandas dataframe)
        objects. """

        clean = ''
        tokens = (t for t in self.__tokenizer(text) if t not in stop)
        for t in list(tokens):
            clean += " " + t
        return clean.lstrip()

    @staticmethod
    def __emoticon_finder(text):
        """Finds emoticons."""

        emoticons_ = ""
        emoticons = re.findall(r'(?::|;|=)(?:-)?(?:\)|\(|D|P)', text)
        for e in emoticons:
            emoticons_ += ' ' + e
        return emoticons_.lstrip()

    @classmethod 
    def stop_word_iterator(self, text):
        """Calls __stop_word_remover to apply this method to array-like objects.
        Usage: TextCleaner.stop_word_iterator(Text)."""

        clean = (self.__stop_word_remover(t, TextCleaner.STOPWORDS) for t in text)
        return list(clean)

    @classmethod 
    def alpha_iterator(self, text, remove_numeric=True, remove_emoticon=True):
        """Calls __alphaizer to apply this method to array-like objects. Usage:
        TextCleaner.alphaizer(Text).
        Note: By default this method removes numbers from each string.
        To change this behavior pass the flag remove_numerals:
        alphaizer(Text, remove_numerals = False)
        """

        clean = (self.__alphaizer(t, remove_numeric, remove_emoticon) for t in text)
        return list(clean)
    
    @classmethod 
    def custom_stop_word_iterator(self, text, stop_words):
        """Removes custom stop-words. For cleaned example, "patient", or "medicine", if
        one is dealing with medical Text and do not want to include those words
        in analysis. Can use this method to pass any set of stop
        words, or in-lieu of common stop-word method stop_word_iterator.Calls
        __stop_word_remover to apply this method to array-like objects. Usage:
        TextCleaner.custom_stop_word_iterator(Text, stop_words), where
        stop-words and Text are in a comma-
        separated list, or iterable."""

        clean = (self.__stop_word_remover(t, stop_words) for t in text)
        return list(clean)
    
    @classmethod 
    def streaming_cleaner_and_tokenizer(self, text, remove_numeric=True, remove_emoticon=False):
        """Called per line of text in a a stream application, such as SGD.
        Can be passed directly to SGD algorithms as TextCleaner.streaming_cleaner_and_tokenizer"""

        alpha = self.__alphaizer(text=text, remove_numeric=remove_numeric,
                                 remove_emoticon=remove_emoticon)

        clean = self.__stop_word_remover(alpha, self.stop_words)
        tokens = TextCleaner.tokenizer(clean)
        return tokens
    
    @classmethod 
    def set_emoticons(self, emoticons):
        """Called from __alphaizer. 
        Sets self.emoticons value to be retrieved."""

        self.emoticons = emoticons



In [4]:
url = "http://minio-service.kubeflow.svc.cluster.local:9000/hatterasuser/data/ratings_shuffled.csv" #minio server

req = requests.get(url)
content = req.content

In [5]:
with open('data/ratings_shuffled.csv', 'wb') as csv_file:

    csv_file.write(content)

In [6]:
df = pd.read_csv('data/ratings_shuffled.csv')

In [7]:
df.head()

Unnamed: 0,review,sentiment
0,"First of all, I have to start this comment by ...",1
1,The brilliance of this story delivers at least...,1
2,Spheeris debut must be one of the best music d...,1
3,I have to admit that I had low expectations fo...,1
4,"Ying, a Chinese girl who speaks Czech, invited...",1


In [8]:
holdout = df.iloc[[5,6,7,8]]

In [9]:
indexes = [x for x in df.index if x not in holdout.index]

In [10]:
model_data = df.iloc[indexes]

In [11]:
holdout.to_csv("data/holdout.csv", index=False)

In [12]:
cleaner = TextCleaner()

In [13]:
model_data['review'] = cleaner.alpha_iterator(model_data['review'], remove_emoticon=False)

In [14]:
model_data['review'] = cleaner.stop_word_iterator(model_data['review'])

In [15]:
model_data.head()

Unnamed: 0,review,sentiment
0,start comment saying huge nightmare elm street...,1
1,brilliance story delivers skillfully crafted m...,1
2,spheeris debut best music documentaries time f...,1
3,admit low expectations movie surprised enterta...,1
4,ying chinese girl speaks czech invited screeni...,1


In [16]:
x = model_data['review'].values
y = model_data['sentiment'].values

In [17]:
xtrain, xtest, ytrain, ytest=train_test_split(x, y, test_size=0.15)

In [20]:
vect = HashingVectorizer(decode_error = 'ignore',
                        n_features = 2**20,
                        preprocessor = None,
                        tokenizer = cleaner.__tokenizer())

AttributeError: 'TextCleaner' object has no attribute '__tokenizer'

In [17]:
xtrain = vect.transform(xtrain)

In [18]:
xtest = vect.transform(xtest)

In [19]:
x = vect.transform(x) # for final model

In [20]:
xgbc = XGBClassifier()
print(xgbc)
XGBClassifier(base_score=0.5, booster='gbtree', colsample_bylevel=1,
       colsample_bynode=1, colsample_bytree=1, gamma=0, learning_rate=0.1,
       max_delta_step=0, max_depth=3, min_child_weight=1, missing=None,
       n_estimators=100, n_jobs=1, nthread=None,
       objective='multi:softprob', random_state=0, reg_alpha=0,
       reg_lambda=1, scale_pos_weight=1, seed=None, silent=None,
       subsample=1, verbosity=1) 

XGBClassifier()


XGBClassifier(objective='multi:softprob')

In [21]:
xgbc.fit(xtrain, ytrain)

XGBClassifier()

In [22]:
scores = cross_val_score(xgbc, xtrain, ytrain, cv=5)


In [23]:
print("Mean cross-validation score: {}".format(scores.mean())) 

Mean cross-validation score: 0.8022873445318826


In [None]:
kfold = KFold(n_splits=10, shuffle=True)
kf_cv_scores = cross_val_score(xgbc, xtrain, ytrain, cv=kfold )
print("K-fold CV average score: {}".format(kf_cv_scores.mean()))

In [None]:
ypred = xgbc.predict(xtest)
cm = confusion_matrix(ytest,ypred) 

In [None]:
cm

In [None]:
model = xgbc.fit(x, y)

In [None]:
# save file to pickle
with open("model.pkl", "wb") as file:
    
    pickle.dump(model, file)

In [None]:
with open("credentials.txt", "r") as f:
    
    creds = f.read()
    
aws_access_key_id = creds.split()[0]
aws_secret_access_key = creds.split()[1]

In [None]:
s3 = boto3.client(
        "s3",
        endpoint_url="http://10.100.97.179:9000",
        aws_access_key_id=aws_access_key_id,
        aws_secret_access_key=aws_secret_access_key,
        config=Config(signature_version="s3v4"),
    )


In [None]:
response = s3.list_buckets()
export_bucket = "sentiment-model"
export_bucket_exists = False

for bucket in response["Buckets"]:
    if bucket["Name"] == export_bucket:
        export_bucket_exists = True
        
if not export_bucket_exists:
    
    s3.create_bucket(ACL="public-read-write", Bucket=export_bucket)

In [None]:
model_name = "sentiment-model"
model_version = "1"

s3.upload_file("model.pkl",
                export_bucket,
                f"{model_name}/{model_version}.pkl",
                ExtraArgs={"ACL": "public-read"},
            )

In [37]:
help(kfp.run_pipeline_func_on_cluster)

Help on function run_pipeline_func_on_cluster in module kfp._runners:

run_pipeline_func_on_cluster(pipeline_func:Callable, arguments:Mapping[str, str], run_name:str=None, experiment_name:str=None, kfp_client:kfp._client.Client=None, pipeline_conf:kfp.dsl._pipeline.PipelineConf=None)
    Runs pipeline on KFP-enabled Kubernetes cluster.
    
    This command compiles the pipeline function, creates or gets an experiment
    and submits the pipeline for execution.
    
    Feature stage:
    [Alpha](https://github.com/kubeflow/pipelines/blob/07328e5094ac2981d3059314cc848fbb71437a76/docs/release/feature-stages.md#alpha)
    
    Args:
      pipeline_func: A function that describes a pipeline by calling components
      and composing them into execution graph.
      arguments: Arguments to the pipeline function provided as a dict.
      run_name: Optional. Name of the run to be shown in the UI.
      experiment_name: Optional. Name of the experiment to add the run to.
      kfp_client: Opti

In [41]:
kfp.run_pipeline_func_on_cluster(run_name="test-1119", pipeline_func)

TypeError: run_pipeline_func_on_cluster() missing 2 required positional arguments: 'pipeline_func' and 'arguments'

In [38]:
minio.kubeflow.svc.cluster.local:9000

/bin/bash: ping: command not found


In [43]:
# hatterasdatabase : model,....,pipeline
# need pipeline name, yaml need uuid from pipeline 
# when we dpeloy we can get pipelines for namespace -> then they can get the latest? need this to get yaml
# 

In [None]:
# deploy querying mysql for 