In [1]:
import datetime
import importlib.resources as pkg_resources
import kfp
from typing import NamedTuple

from kfp_ca import config
from kfp_ca import sql as sql_dir

In [2]:
# Load the kfp_ca project configuration.
# NOTE(review): `conf` is not referenced by any later cell visible in this notebook — confirm it is still needed.
conf = config.load()

In [3]:
# Read the text of 'big_query_extract_dataset.sql' from the kfp_ca.sql package;
# it is passed to the pipeline runs below as the `qry` argument.
src_qry = pkg_resources.read_text(sql_dir,'big_query_extract_dataset.sql')

In [4]:
def seeded_bq_query(seed, qry, bucket, name_str, ts) -> NamedTuple('DataFileLocation',[('bucket',str),('filename',str)]):
    """Run a seeded BigQuery query and stage the result in Cloud Storage as Feather.

    Every occurrence of the literal token ``SEEDS`` in ``qry`` is replaced with the
    seed string wrapped in double quotes and padded with one space on each side.
    The resulting frame has its ``rnk`` column dropped, is written to
    ``/tmp/df.feather`` and uploaded to ``bucket`` as ``{name_str}.{ts}.feather``.

    Args:
        seed: Seed string substituted into the query (e.g. ``'ff3r|ckxv'``).
        qry: SQL text containing the ``SEEDS`` placeholder.
        bucket: Name of the destination Cloud Storage bucket.
        name_str: Prefix of the uploaded object name.
        ts: Timestamp string embedded in the uploaded object name.

    Returns:
        ``(bucket_name, object_name)`` of the uploaded Feather file.
    """
    # Imports live inside the function because it is packaged on its own as a
    # container component (see the func_to_container_op call in this notebook).
    from google.cloud import bigquery, storage

    # NOTE(review): the previous version also computed a pipe-spaced variant of
    # the seed (seed.replace('|', ' | ')) but never used it. The raw seed is what
    # has always been substituted, so that behavior is kept here — confirm which
    # form the query actually expects.
    # NOTE(review): the seed is spliced into the SQL text as-is; only pass
    # trusted seed values.
    quoted_seed = '" ' + seed + ' "'

    local_path = '/tmp/df.feather'
    result_df = bigquery.Client().query(qry.replace("SEEDS", quoted_seed)).to_dataframe()
    result_df.drop('rnk', axis=1).to_feather(local_path)

    # Keep the `bucket` argument as the bucket *name*; use a separate local for
    # the Bucket object instead of rebinding the parameter.
    filename = f"{name_str}.{ts}.feather"
    blob = storage.Client().bucket(bucket).blob(filename)
    blob.upload_from_filename(local_path)

    return blob.bucket.name, blob.name

In [5]:
def download_and_train(input_bucket: str,
                       input_file: str,
                       output_bucket: str,
                       ts: str,
                       model_label: str) -> NamedTuple('ModelOutput',[('bucket',str),
                                                                     ('model_label',str),
                                                                     ('eval_data',str)]):
    """Download a Feather dataset, train a text classifier and upload the artifacts.

    The dataset is fetched from ``input_bucket/input_file`` and split into train
    and test parts. The held-out ``(X_test, y_test)`` tuple is dumped with joblib
    and uploaded to ``output_bucket``; a ``CountVectorizer`` + ``SGDClassifier``
    pipeline is then fitted on the training part, dumped with joblib and uploaded
    to the same bucket.

    Args:
        input_bucket: Bucket holding the input Feather file.
        input_file: Object name of the Feather file; it is read for its
            ``label`` and ``content`` columns.
        output_bucket: Bucket that receives the evaluation data and the model.
        ts: Timestamp string embedded in local and uploaded file names.
        model_label: Label embedded in the uploaded file names.

    Returns:
        ``(output_bucket, model_object_name, eval_object_name)``.
    """
    # Imports live inside the function because it is packaged on its own as a
    # container component (see the func_to_container_op call in this notebook).
    import re

    import joblib
    import pandas as pd
    import sklearn
    from google.cloud import storage
    from sklearn.feature_extraction.text import CountVectorizer
    from sklearn.linear_model import SGDClassifier
    from sklearn.model_selection import train_test_split
    from sklearn.pipeline import Pipeline

    def download_blob(bucket_name, source_blob_name, destination_file_name):
        """Download one object from a bucket to a local file."""
        # `Bucket.blob` only builds a client-side handle (no metadata request),
        # which is all that is needed to download the content.
        blob = storage.Client().bucket(bucket_name).blob(source_blob_name)
        blob.download_to_filename(destination_file_name)

    def single_uploader(local_file, bucket):
        """Upload a local file to ``bucket`` under its base file name."""
        stripped_file_name = local_file.split('/')[-1]
        blob = storage.Client().bucket(bucket).blob(stripped_file_name)
        blob.upload_from_filename(local_file)

    local_data_path = f'/tmp/df-{ts}.feather'
    download_blob(input_bucket, input_file, local_data_path)
    df = pd.read_feather(local_data_path)

    y = df['label'].values
    X = df['content'].values

    # NOTE(review): the split is not seeded, so the train/eval partition differs
    # between runs — pass random_state here if reproducibility is required.
    X_train, X_test, y_train, y_test = train_test_split(X, y)

    # Persist the held-out data so a downstream component can score the model.
    eval_label = f'evaldata-{model_label}-{ts}.pkl'
    eval_path = "/tmp/" + eval_label
    with open(eval_path, 'wb') as f:
        joblib.dump((X_test, y_test), f)
    print(f"Xtest shape: {X_test.shape}")

    single_uploader(eval_path, output_bucket)

    # scikit-learn 1.1 renamed the logistic loss from 'log' to 'log_loss' and
    # 1.3 removed the old spelling. The component installs scikit-learn
    # unpinned, so pick the spelling that the installed version accepts.
    version_match = re.match(r'(\d+)\.(\d+)', sklearn.__version__)
    sk_version = tuple(int(part) for part in version_match.groups()) if version_match else (0, 0)
    logistic_loss = 'log_loss' if sk_version >= (1, 1) else 'log'

    cls = SGDClassifier(loss=logistic_loss,
        penalty='elasticnet',
        learning_rate='adaptive',
        eta0=2,
        verbose=1,
        tol=1e-2,
        max_iter=10)

    pipeline = Pipeline([
        ('vect', CountVectorizer()),
        ('cls', cls)
        ])

    # A grid search (cls__alpha in [0.01, 0.5, 0.99, 2, 5], cls__l1_ratio in
    # [0.01, 0.5, 0.99], scoring='roc_auc') was sketched here previously but is
    # disabled; the pipeline is fitted with the fixed settings above.
    model = pipeline.fit(X=X_train.ravel(), y=y_train)

    # Use a separate name for the artifact file instead of rebinding the
    # `model_label` parameter.
    model_file = f'{model_label}-{ts}.joblib'
    joblib.dump(model, f'/tmp/{model_file}')

    single_uploader(f'/tmp/{model_file}', output_bucket)

    return output_bucket, model_file, eval_label
    

In [6]:
def metrics_component(bucket, eval_data, model_file) -> NamedTuple('Outputs', [
  ('mlpipeline_metrics', 'Metrics'),
]):
    """Score a stored model on stored evaluation data and emit pipeline metrics.

    Downloads the joblib-dumped ``(X, y)`` evaluation tuple and the joblib-dumped
    model from ``bucket``, prints a coefficient table joined with
    ``macro_content.csv`` (also read from ``bucket``), then computes accuracy and
    ROC AUC.

    Args:
        bucket: Bucket holding the evaluation data, the model and ``macro_content.csv``.
        eval_data: Object name of the evaluation data file.
        model_file: Object name of the model file.

    Returns:
        A one-element list holding the JSON-encoded metrics dictionary.
    """
    import json

    import joblib
    import pandas as pd
    from google.cloud import storage
    from sklearn.metrics import roc_auc_score

    def download_blob(bucket_name, source_blob_name, destination_file_name):
        """Download one object from a bucket to a local file."""
        remote = storage.Client().bucket(bucket_name).blob(source_blob_name)
        remote.download_to_filename(destination_file_name)

    def top_and_bottom(fitted, macro_df):
        """Return per-term coefficients joined with ``macro_df``, highest coefficient first."""
        vocabulary = fitted[0].vocabulary_
        weights = fitted[1].coef_[0]

        # Invert the vocabulary so that it maps column id -> term ({id: 'contentid'}).
        term_by_id = {column_id: term for term, column_id in vocabulary.items()}

        dictionary_df = pd.DataFrame(term_by_id.items())
        dictionary_df = dictionary_df.rename(columns={0:'id', 1:'content_id'}).set_index('id')

        coef_df = pd.DataFrame({'coef': weights, 'id': range(len(weights))})
        coef_df = coef_df.set_index('id')

        content_df = dictionary_df.join(coef_df).set_index('content_id')
        return content_df.join(macro_df).sort_values("coef", ascending=False)

    eval_path = '/tmp/eval_data.joblib'
    download_blob(bucket, eval_data, eval_path)
    X, y = joblib.load(eval_path)

    model_path = '/tmp/model_file.joblib'
    download_blob(bucket, model_file, model_path)
    model = joblib.load(model_path)

    macro_path = '/tmp/macro_content.csv'
    download_blob(bucket, 'macro_content.csv', macro_path)
    macro_content_df = pd.read_csv(macro_path).set_index('content_id')
    print(top_and_bottom(model, macro_content_df))

    # Column 1 of predict_proba is used as the score for the AUC computation.
    positive_scores = model.predict_proba(X)[:, 1]
    accuracy = model.score(X, y)
    auc = roc_auc_score(y_true=y, y_score=positive_scores)
    print(f"AUC Score: {auc}")

    # Each entry carries a display name, a numeric value and a display format
    # ("RAW" or "PERCENTAGE").
    accuracy_metric = {
        'name': 'Accuracy',
        'numberValue': accuracy,
        'format': "PERCENTAGE",
    }
    auc_metric = {
        'name': 'AUC',
        'numberValue': auc,
        'format': 'PERCENTAGE',
    }
    metrics = {'metrics': [accuracy_metric, auc_metric]}

    return [json.dumps(metrics)]

    

In [7]:
# Wrap each Python function as a container component. Every call also writes
# the component definition to the YAML file named by output_component_file.

# BigQuery extract -> Feather file in Cloud Storage.
bq_op = kfp.components.func_to_container_op(
    func=seeded_bq_query,
    packages_to_install=[
        'google-cloud-storage',
        'google-cloud-bigquery-storage',
        'pyarrow',
        'pandas',
        'google-cloud-bigquery',
    ],
    output_component_file='seeded_bq_op.yaml',
)

# Download the dataset, train the classifier, upload model and eval data.
train_op = kfp.components.func_to_container_op(
    func=download_and_train,
    packages_to_install=[
        'google-cloud-storage',
        'pandas',
        'pyarrow',
        'scikit-learn',
    ],
    output_component_file='download_op.yaml',
)

# Score the trained model and publish pipeline metrics.
metrics_op = kfp.components.func_to_container_op(
    func=metrics_component,
    packages_to_install=[
        'google-cloud-storage',
        'scikit-learn',
        'pandas',
    ],
    output_component_file='metrics_op.yaml',
)

In [8]:
# Client used by the cells below to submit pipeline runs.
# NOTE(review): the endpoint host is hard-coded — consider reading it from configuration.
client = kfp.Client(host='3fad3dc513a320a4-dot-us-central2.pipelines.googleusercontent.com')

In [9]:
@kfp.dsl.pipeline(name='Scikit_KFP', description='')
def my_pipeline_func(
    qry,
    seed,
    bq_output_path,
    ts,
    model_label
):
    """Chain the three components: BigQuery extract -> train -> metrics.

    Args:
        qry: SQL text handed to the extract step.
        seed: Seed string handed to the extract step.
        bq_output_path: Bucket name used for the extract output and for the
            training artifacts.
        ts: Timestamp string forwarded to the extract and training steps.
        model_label: Label forwarded to the training step.
    """
    # Step 1: run the query and stage the result as a Feather file.
    extract_task = bq_op(seed, qry, bq_output_path, name_str="bq_data", ts=ts)

    # Step 2: train on the staged file; artifacts go to the same bucket.
    train_task = train_op(
        input_bucket=extract_task.outputs['bucket'],
        input_file=extract_task.outputs['filename'],
        output_bucket=bq_output_path,
        ts=ts,
        model_label=model_label,
    )

    # Step 3: evaluate the trained model on the held-out data.
    metrics_op(
        bucket=train_task.outputs['bucket'],
        eval_data=train_task.outputs['eval_data'],
        model_file=train_task.outputs['model_label'],
    )


In [16]:
# Submit a pipeline run for the 'OL7454 - Music Fans' label.
model_label = 'OL7454 - Music Fans'
model_seed = 'ff3r|ckxv'

# Note: `ts` and the run-name suffix come from two separate now() calls, so
# the two timestamps differ slightly.
pipeline_args = dict(
    qry=src_qry,
    seed=model_seed,
    bq_output_path='kfpca_bq',
    ts=datetime.datetime.now().isoformat(),
    model_label=model_label,
)

my_run = client.create_run_from_pipeline_func(
    pipeline_func=my_pipeline_func,
    arguments=pipeline_args,
    run_name=f"{model_label}-{datetime.datetime.now().isoformat()}",
    experiment_name="KFP Demo",
)

In [14]:
# Submit a pipeline run for the 'OL7608 - Millenial Media' label.
model_label = 'OL7608 - Millenial Media'
model_seed = '4sk|dwbo|xupk'

# Note: `ts` and the run-name suffix come from two separate now() calls, so
# the two timestamps differ slightly.
pipeline_args = dict(
    qry=src_qry,
    seed=model_seed,
    bq_output_path='kfpca_bq',
    ts=datetime.datetime.now().isoformat(),
    model_label=model_label,
)

my_run = client.create_run_from_pipeline_func(
    pipeline_func=my_pipeline_func,
    arguments=pipeline_args,
    run_name=f"{model_label}-{datetime.datetime.now().isoformat()}",
    experiment_name="KFP Demo",
)

In [15]:
# Submit a pipeline run for the 'OL8206 - Wedding Planning' label.
model_label = 'OL8206 - Wedding Planning'
# NOTE(review): this seed is comma-separated, while the other run cells in this
# notebook use '|' as the separator — confirm which form the query expects.
model_seed = 'cjbq,fzdx,bzck'

# Note: `ts` and the run-name suffix come from two separate now() calls, so
# the two timestamps differ slightly.
pipeline_args = dict(
    qry=src_qry,
    seed=model_seed,
    bq_output_path='kfpca_bq',
    ts=datetime.datetime.now().isoformat(),
    model_label=model_label,
)

my_run = client.create_run_from_pipeline_func(
    pipeline_func=my_pipeline_func,
    arguments=pipeline_args,
    run_name=f"{model_label}-{datetime.datetime.now().isoformat()}",
    experiment_name="KFP Demo",
)