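# Creating a Spam-Filter Pipeline

This notebook defines two Kubeflow Pipelines components (feature generation and model training), wires them into a pipeline, and submits a run through the Tekton client.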

In [31]:
import os
import warnings; warnings.simplefilter("ignore")

import pandas as pd
import pyarrow.parquet as pq
import kfp.components as comp
# Base container image used by every pipeline component below.
train_image = 'gcr.io/stobias-dev/spam-trainer-image:latest'
# NOTE(review): despite the name, this is the path of the *training* data file,
# and it appears unused in this notebook (train_model's `features_path` is that
# function's own parameter).
features_path = 'data/training.parquet'

# Create some utility functions

In [34]:
def s3_upload(local_path=None, s3_path=None):
    """Return an S3 (MinIO) filesystem client, optionally uploading one file.

    Args:
        local_path: Path of a local file to upload, or None to skip uploading.
        s3_path: Destination written as ``"<bucket>/<key>"`` (the same form
            used with ``s3.open`` elsewhere in this notebook), or None.

    Returns:
        The client produced by ``get_s3_client()``.

    Raises:
        ValueError: if only one of ``local_path`` / ``s3_path`` is given.
    """
    if (local_path is None) != (s3_path is None):
        raise ValueError("s3_upload needs both local_path and s3_path, or neither")
    # Reuse the shared helper so the endpoint/credentials are defined once.
    s3 = get_s3_client()
    if local_path is not None:
        s3.put(local_path, s3_path)
    return s3
    
def get_s3_client():
    """Return an ``s3fs.S3FileSystem`` connected to the MinIO endpoint.

    The endpoint and credentials come from the ``MINIO_ENDPOINT``,
    ``MINIO_ACCESS_KEY`` and ``MINIO_SECRET_KEY`` environment variables. When a
    variable is unset, the value this notebook originally hard-coded is used.

    NOTE: this helper is only usable from the notebook kernel. KFP's
    ``create_component_from_func`` packages just the component function's own
    source, so pipeline components cannot call it.
    """
    import os
    import s3fs

    # TODO: drop the hard-coded fallbacks once the env vars are provisioned.
    endpoint_url = os.environ.get(
        'MINIO_ENDPOINT',
        'https://minio-kubeflow.apps.kubeflow.openshift.red8.cloud')
    access_key = os.environ.get('MINIO_ACCESS_KEY', 'minio')
    secret_access_key = os.environ.get('MINIO_SECRET_KEY', 'minio123')

    return s3fs.S3FileSystem(
        client_kwargs=dict(endpoint_url=endpoint_url),
        key=access_key,
        secret=secret_access_key,
    )
def create_training_data_vol(namespace, model_name, storage_class):
    """Declare a 10Gi ReadWriteMany persistent volume claim as a KFP ``VolumeOp``.

    Args:
        namespace: Not used by this function; kept for call compatibility.
        model_name: Prefix of the claim name, which becomes
            ``"<model_name>-modelpvc"``.
        storage_class: Storage class requested for the claim.

    Returns:
        The ``dsl.VolumeOp`` instance.
    """
    # `dsl` was never imported in this notebook, so the original raised NameError.
    from kfp import dsl

    return dsl.VolumeOp(
        name='model_volume',
        resource_name=str(model_name) + '-modelpvc',
        size='10Gi',
        modes=['ReadWriteMany'],
        storage_class=storage_class,
    )

# Create the feature file

In [37]:
def gen_features(bucket_name, training_data_path, features_output_path):
    """KFP component: turn raw training text into TF-IDF feature vectors.

    Reads the parquet file ``s3://<bucket_name>/<training_data_path>``, which
    must have ``text`` and ``label`` columns. Each text is hashed into 1024
    features and TF-IDF weighted. The result is written as parquet to
    ``<bucket_name>/<features_output_path>`` with columns ``index``, ``label``
    and then the feature columns ``"0"`` ... ``"1023"``.

    Every import and the S3 client live inside this function because
    ``create_component_from_func`` ships only this function's source into the
    component container; notebook-level helpers are not available there.
    """
    import os
    import pandas as pd
    import pyarrow.parquet as pq
    import s3fs
    from sklearn.feature_extraction.text import HashingVectorizer, TfidfTransformer
    from sklearn.pipeline import Pipeline
    from mlworkflows import util

    # Inline equivalent of the notebook's get_s3_client(), which the original
    # called but which does not exist inside the component container.
    s3 = s3fs.S3FileSystem(
        client_kwargs=dict(endpoint_url=os.environ.get(
            'MINIO_ENDPOINT',
            'https://minio-kubeflow.apps.kubeflow.openshift.red8.cloud')),
        key=os.environ.get('MINIO_ACCESS_KEY', 'minio'),
        secret=os.environ.get('MINIO_SECRET_KEY', 'minio123'),
    )

    # Download the dataset (the original later referred to it as the
    # undefined name `data`).
    training_df = pq.ParquetDataset(
        f"s3://{bucket_name}/{training_data_path}", filesystem=s3
    ).read_pandas().to_pandas()

    feat_pipeline = Pipeline([
        # Token = an ASCII letter followed by one or more word characters.
        ('vect', HashingVectorizer(norm=None,
                                   token_pattern='(?u)\\b[A-Za-z]\\w+\\b',
                                   n_features=1024,
                                   alternate_sign=False)),
        ('tfidf', TfidfTransformer()),
    ])
    feature_vecs = feat_pipeline.fit_transform(training_df["text"]).toarray()

    # Serialize only after fitting. The original saved the pipeline before
    # fit_transform, so the stored TfidfTransformer was unfitted.
    # NOTE(review): this path is relative to the container's working directory;
    # confirm whether it should be uploaded to S3 instead.
    util.serialize_to(feat_pipeline, "feature_pipeline.sav")

    # NOTE(review): assumes the frame's index is unnamed, so reset_index()
    # names the new column "index".
    labeled_vecs = pd.concat(
        [training_df.reset_index()[["index", "label"]], pd.DataFrame(feature_vecs)],
        axis=1)
    # The feature columns are integer-labelled; to_parquet needs string names.
    labeled_vecs.columns = labeled_vecs.columns.astype(str)
    with s3.open(f"{bucket_name}/{features_output_path}", 'wb') as f:
        labeled_vecs.to_parquet(f)
# Wrap gen_features as a KFP component: this is the feature-generation step.
gen_features_op = comp.create_component_from_func(
    func=gen_features,
    base_image=train_image,
)

# Download features and train model

In [44]:
def train_model(bucket_name, features_path, model_output_path):
    """KFP component: fit a logistic-regression spam classifier.

    Reads the feature parquet at ``s3://<bucket_name>/<features_path>`` (in the
    pipeline this is the file written by ``gen_features``). It holds out part of
    the rows (sklearn's default split), fits the model, prints held-out accuracy
    and writes the model to ``<bucket_name>/<model_output_path>`` via
    ``mlworkflows.util.serialize_to``.

    Self-contained for the same reason as ``gen_features``:
    ``create_component_from_func`` ships only this function's source.
    """
    import os
    import pyarrow.parquet as pq
    import s3fs
    from sklearn import model_selection
    from sklearn.linear_model import LogisticRegression
    from mlworkflows import util

    # The original used `s3` without ever defining it (NameError).
    s3 = s3fs.S3FileSystem(
        client_kwargs=dict(endpoint_url=os.environ.get(
            'MINIO_ENDPOINT',
            'https://minio-kubeflow.apps.kubeflow.openshift.red8.cloud')),
        key=os.environ.get('MINIO_ACCESS_KEY', 'minio'),
        secret=os.environ.get('MINIO_SECRET_KEY', 'minio123'),
    )

    # Load features from features path.
    feats = pq.ParquetDataset(
        f"s3://{bucket_name}/{features_path}", filesystem=s3
    ).read_pandas().to_pandas()

    # gen_features writes "index" and "label" as the first two columns;
    # everything after them is a feature.
    train, test = model_selection.train_test_split(feats, random_state=43)
    model = LogisticRegression(solver='lbfgs', max_iter=4000)
    model.fit(X=train.iloc[:, 2:], y=train["label"])
    # Use the held-out rows, which the original split off but never looked at.
    held_out_acc = model.score(test.iloc[:, 2:], test["label"])
    print(f"held-out accuracy: {held_out_acc:.4f}")

    # Save the model (the original wrote to the undefined name `output_path`).
    with s3.open(f"{bucket_name}/{model_output_path}", 'wb') as f:
        util.serialize_to(model, f)
# Wrap train_model as a KFP component: this is the training step.
train_op = comp.create_component_from_func(
    func=train_model,
    base_image=train_image,
)

# Create the pipeline

In [56]:
# MinIO/S3 connection settings for the notebook, overridable from the
# environment. `bucket_name` is used by spam_filter_pipeline; the other values
# appear unused in this notebook.
# TODO: remove the hard-coded credential fallbacks once the env vars are set.
s3_endpoint = os.environ.get(
    'MINIO_ENDPOINT', 'https://minio-kubeflow.apps.kubeflow.openshift.red8.cloud')
access_key = os.environ.get('MINIO_ACCESS_KEY', 'minio')
secret_access_key = os.environ.get('MINIO_SECRET_KEY', 'minio123')
bucket_name = os.environ.get('MINIO_BUCKET', 'mlpipeline')
region = os.environ.get('MINIO_REGION', 'us-east-1')

def spam_filter_pipeline(name='spam_finder',
                         training_data_path='data/training.parquet',
                         features_output_path='data/features.parquet',
                         model_output_path='data/model_output.'):
    """Two-step spam-filter pipeline: featurize the training data, then train.

    Both steps use the notebook-level ``bucket_name``.

    Args:
        name: Not used by the steps below.
        training_data_path: Key of the raw training parquet in the bucket.
        features_output_path: Key where ``gen_features`` writes the features
            and from which ``train_model`` reads them.
        model_output_path: Key where ``train_model`` writes the model.
            NOTE(review): the default ends in a bare '.', which looks like a
            truncated file extension; confirm the intended name.
    """
    features_step = gen_features_op(
        bucket_name,
        training_data_path=training_data_path,
        features_output_path=features_output_path,
    )
    # The features file is handed over as a plain path string rather than a
    # step output, so the ordering must be declared explicitly.
    train_op(
        bucket_name,
        features_path=features_output_path,
        model_output_path=model_output_path,
    ).after(features_step)

In [57]:
# Submit a run of spam_filter_pipeline through the Tekton-backed KFP client.
from kfp_tekton import TektonClient

# NOTE(review): host=None presumably makes the client use its default
# endpoint; confirm for the target cluster.
host = None
tekton_client = TektonClient(host=host)
run_result = tekton_client.create_run_from_pipeline_func(
    spam_filter_pipeline,
    experiment_name='spam_filter',
    arguments={},
)
run_result

RunPipelineResult(run_id=bd1b041c-80b0-4517-99c2-32b61fab5a20)