In [39]:
import warnings
# Blanket filter: hides every warning category for the rest of the session
# (including deprecation warnings), keeping cell output compact.
warnings.filterwarnings('ignore')

In [40]:
import os

# Directory used for local runs of the step functions (downloaded CSV, encoders,
# train/test pickles, model). Set the PS_OUTPUT_DIR environment variable to
# point somewhere else; the default is the path this notebook was written against.
output_dir = os.environ.get(
    "PS_OUTPUT_DIR", "/home/jovyan/stage-f-10-police-shootings/data/out"
)

In [41]:
def get_data(download_url, out_path):
    """Download the shootings CSV to ``<out_path>/shootings.csv`` using ``wget``.

    Args:
        download_url: Source URL. Only URLs that start with
            ``https://storage.googleapis.com/`` are used as given; any other
            value (blank string, ``None``, another host) falls back to the
            built-in default dataset URL.
        out_path: Directory that receives ``shootings.csv``.

    Raises:
        subprocess.CalledProcessError: If ``wget`` exits with a non-zero status.
    """
    # Imports live inside the function because it is converted into a
    # standalone container component with ``func_to_container_op``.
    import subprocess

    default_url = "https://storage.googleapis.com/used-cars/shootings/shootings.csv"
    # Match the full host plus the trailing slash so that look-alike hosts such
    # as "storage.googleapis.evil.example" are not accepted.
    trusted_prefix = "https://storage.googleapis.com/"

    is_trusted = isinstance(download_url, str) and download_url.startswith(trusted_prefix)
    url = download_url if is_trusted else default_url

    # check=True: a failed download must stop the step instead of being
    # reported as success by the print below.
    subprocess.run(["wget", "-O", f"{out_path}/shootings.csv", url], check=True)

    print("File Downloaded")

    

In [42]:
# get_data("", output_dir)

In [43]:
def preprocess(out_path):
    """Turn ``<out_path>/shootings.csv`` into pickled train and test sets.

    The step derives a binary ``label`` column, splits the rows 80/20
    (``random_state=100``), encodes the categorical columns and oversamples the
    training split with SMOTE.

    Files written to ``out_path``:
        * ``state_encoder.pkl``, ``arms_category_encoder.pkl`` and
          ``race_encoder.pkl`` -- label encoders fitted on the training split.
        * ``trainset.pkl`` and ``testset.pkl`` -- pickled ``(features, target)``
          tuples.

    Args:
        out_path: Directory that contains ``shootings.csv`` and receives the
            output files.
    """
    # Imports (and the best-effort dependency install) live inside the function
    # because it is converted into a standalone container component.
    import subprocess
    import sys

    subprocess.run([sys.executable, '-m', 'pip', 'install', 'pandas', 'scikit-learn', 'imblearn'])

    from sklearn.model_selection import train_test_split
    from sklearn.preprocessing import LabelEncoder
    from imblearn.over_sampling import SMOTE
    import pandas as pd
    from sklearn.utils import shuffle
    import logging
    import pickle

    def pre_process(data, out_path, is_train=False):
        """Encode one split and return it as ``(features, target)``.

        With ``is_train=True`` the label encoders are fitted and saved and the
        result is oversampled with SMOTE; otherwise the saved encoders are
        loaded and applied and no resampling happens.
        """
        cat_cols = ['state', 'arms_category', 'race']
        onehot_encoding_columns = ['gender', 'signs_of_mental_illness', 'manner_of_death', 'body_camera']

        # NOTE(review): dummies are built per split, so a category value that is
        # absent from one split would give train and test different columns.
        data = pd.get_dummies(data, drop_first=True, columns=onehot_encoding_columns, prefix_sep='-')

        if is_train:
            for column in cat_cols:
                encoder = LabelEncoder()
                data[column] = encoder.fit_transform(data[column])
                with open(f"{out_path}/{column}_encoder.pkl", "wb") as encoder_file:
                    pickle.dump(encoder, encoder_file)
        else:
            for column in cat_cols:
                with open(f"{out_path}/{column}_encoder.pkl", "rb") as encoder_file:
                    encoder = pickle.load(encoder_file)
                # NOTE(review): LabelEncoder.transform raises ValueError for a
                # category that was not present in the training split.
                data[column] = encoder.transform(data[column])

        # Seeded so repeated runs produce the same row order, in line with the
        # seeded split and SMOTE.
        shuffled = shuffle(data, random_state=0)
        features = shuffled.drop(columns=['name', 'date', 'label', 'id', 'armed', 'city', 'threat_level', 'flee'])
        target = shuffled['label']

        # Oversample the minority label on the training split only.
        if is_train:
            smote = SMOTE(random_state=0)
            # fit_resample is the supported name; fit_sample was a deprecated alias.
            X, y = smote.fit_resample(features, target)
        else:
            X, y = features, target

        # Normalise to pandas objects whether or not the sampler returned arrays.
        X = pd.DataFrame(X, columns=features.columns)
        y = pd.Series(y, name=target.name)

        return X, y

    def label_row(row):
        """Return 1 for an 'unjustified' case and 0 otherwise.

        A row is labelled 1 when ``threat_level`` is 'undetermined' or 'other'
        and ``flee`` is 'Not fleeing'.
        """
        unclear_threat = row['threat_level'] in ('undetermined', 'other')
        return 1 if unclear_threat and row['flee'] == 'Not fleeing' else 0

    data = pd.read_csv(f'{out_path}/shootings.csv')
    data['label'] = data.apply(label_row, axis=1)
    train_df, test_df = train_test_split(data, test_size=0.2, random_state=100)

    trainset = pre_process(train_df, out_path, is_train=True)
    testset = pre_process(test_df, out_path)

    logging.info(f"Training data count: {trainset[0].shape}")
    logging.info(f"Testing data count: {testset[0].shape}")

    with open(f"{out_path}/trainset.pkl", "wb") as trainset_file:
        pickle.dump(trainset, trainset_file)

    with open(f"{out_path}/testset.pkl", "wb") as testset_file:
        pickle.dump(testset, testset_file)

In [44]:
def hyperparameter_tuning():
    """Placeholder step: performs no tuning and returns a fixed status string."""
    status = "Hyperparameters tuned"
    return status

In [45]:
# preprocess(output_dir)

In [46]:
def train(out_path, trainset, bucket_name, model_path):
    """Fit a logistic-regression classifier on the pickled training set.

    Args:
        out_path: Directory holding the training pickle; the fitted model is
            written there as ``model.joblib``.
        trainset: File name of the pickle inside ``out_path``. Element 0 of the
            unpickled object is used as the features, element 1 as the targets.
        bucket_name: Accepted but not used by this function.
        model_path: Accepted but not used by this function.

    Returns:
        A JSON string of the form ``{"features": [[...], ...]}`` containing the
        training feature rows.
    """
    # Imports (and the dependency install) live inside the function because it
    # is converted into a standalone container component.
    import sys
    import subprocess
    subprocess.run([sys.executable, '-m', 'pip', 'install', 'pandas', 'scikit-learn'])
    import json
    from sklearn.linear_model import LogisticRegression
    import numpy as np
    import pickle
    import joblib

    trainset_file = f"{out_path}/{trainset}"
    with open(trainset_file, 'rb') as data_handle:
        dataset = pickle.load(data_handle)

    features, targets = dataset[0], dataset[1]

    classifier = LogisticRegression()
    classifier.fit(features, targets)

    model_file = f'{out_path}/model.joblib'
    with open(model_file, "wb") as model_handle:
        joblib.dump(classifier, model_handle)

    payload = {"features": features.values.tolist()}
    return json.dumps(payload)

    
    


In [47]:
# train(output_dir, 'trainset.pkl', 'police-shootings', 'model')

In [48]:
def test(out_path, testset, model_name):
    
    import sys
    import subprocess
    subprocess.run([sys.executable, '-m', 'pip', 'install', 'pandas', 'scikit-learn'])
    from sklearn.linear_model import LogisticRegression
    from sklearn.metrics import roc_auc_score
    import numpy as np
    import pickle
    import joblib
    import logging
    
    with open(f"{out_path}/{testset}", 'rb') as f:
        preprocessed_data = pickle.load(f)
        
    with open(f"{out_path}/{model_name}", 'rb') as f:
        lrc = joblib.load(f)
        
    features = preprocessed_data[0]
    targets = preprocessed_data[1]
    
    lrc_pred = lrc.predict_proba(features)
    auc_score = roc_auc_score(targets, lrc_pred[:,1])
    
    print("AUC Score", auc_score)
    logging.info(f"ROC Score: {auc_score}")
    
    return features
    
    


In [49]:
# Evaluate the saved model on the held-out set. This relies on testset.pkl and
# model.joblib already being present in `output_dir`: the calls that would
# create them are commented out in the cells above.
test(output_dir, 'testset.pkl', 'model.joblib')

AUC Score 0.6045497280635986


Unnamed: 0,age,race,state,arms_category,gender-M,signs_of_mental_illness-True,manner_of_death-shot and Tasered,body_camera-True
4720,34.0,5,34,3,1,1,0,0
917,30.0,4,47,8,1,0,0,0
4558,35.0,5,26,3,1,0,0,0
2609,61.0,5,25,10,1,0,0,0
1599,23.0,2,4,3,1,0,0,0
...,...,...,...,...,...,...,...,...
4779,24.0,1,33,3,1,0,0,1
4746,67.0,5,9,3,1,0,0,0
3537,25.0,5,49,8,1,0,0,0
1709,28.0,1,22,3,1,0,0,1


In [50]:
def upload(out_path, bucket_name):
    """Upload the label encoders and the trained model to a GCS bucket.

    Files in ``out_path`` whose name contains ``set`` (for example the
    train/test pickles) are skipped. Of the remaining files, ``*.pkl`` goes to
    ``encoders/<name>`` and ``*.joblib`` goes to ``model/<name>``; everything
    else is ignored.

    Args:
        out_path: Directory to scan for artifacts.
        bucket_name: Name of the destination bucket.
    """
    # Imports (and the dependency install) live inside the function because it
    # is converted into a standalone container component.
    import os
    import subprocess
    import sys
    subprocess.run([sys.executable, '-m', 'pip', 'install', 'google-cloud-storage'])
    from google.cloud import storage

    def upload_blob(bucket_name, source_file_name, destination_blob_name):
        """Uploads a file to gcp bucket."""
        storage_client = storage.Client()
        bucket = storage_client.bucket(bucket_name)
        blob = bucket.blob(destination_blob_name)

        blob.upload_from_filename(source_file_name)

        print(
            "File {} uploaded to {}.".format(
                source_file_name, destination_blob_name
            )
        )

    # List the directory and build full paths rather than calling os.chdir,
    # which would change the working directory of the whole process.
    for file_name in os.listdir(out_path):
        if 'set' in file_name:
            continue

        source_path = os.path.join(out_path, file_name)
        # Match on the extension so unrelated names that merely contain
        # "pkl" or "joblib" are not uploaded.
        if file_name.endswith('.pkl'):
            upload_blob(bucket_name, source_path, f'encoders/{file_name}')
        elif file_name.endswith('.joblib'):
            upload_blob(bucket_name, source_path, f'model/{file_name}')





In [51]:
# upload(output_dir, 'police-shootings')

In [52]:
import kfp
from kfp import dsl
import kfp.components as comp
from string import Template
import json

In [53]:
# Wrap each step function as a lightweight Kubeflow Pipelines component.
# The download step uses the full image while every other step uses the slim
# variant -- presumably because get_data shells out to wget; verify that the
# full image ships it.
FULL_IMAGE = "python:3.7"
SLIM_IMAGE = "python:3.7-slim"

download_op = comp.func_to_container_op(get_data, base_image=FULL_IMAGE)
preprocess_op = comp.func_to_container_op(preprocess, base_image=SLIM_IMAGE)
tuning_op = comp.func_to_container_op(hyperparameter_tuning, base_image=SLIM_IMAGE)
train_op = comp.func_to_container_op(train, base_image=SLIM_IMAGE)
validate_op = comp.func_to_container_op(test, base_image=SLIM_IMAGE)
upload_op = comp.func_to_container_op(upload, base_image=SLIM_IMAGE)


In [54]:
# NOTE(review): the pipeline name contains "Sales", which looks like a leftover
# from another project -- confirm before renaming, since the name is what gets
# registered.
@dsl.pipeline(
    name="Police Shootings Sales Justification Pipeline",
    description="A Machine Learning Pipeline for determining police shootings justification"
)
def police_shootings_pipeline(
    out_path="/mnt",
    trainset="trainset.pkl",
    testset="testset.pkl",
    model_name="model.joblib",
    bucket_name="used-cars",
    model_path="model",
    download_url=" ",
    serving_name="my-ps-serving",
    serving_namespace="kubeflow",
    serving_export_dir="gs://used-cars/model",
    transform_image="gcr.io/kubeflow-292422/police-shootings-processing:latest",
):
    # Steps: download -> preprocess -> tuning -> train -> validate -> upload.
    # Every step mounts, at `out_path`, the volume handed on by the step before
    # it; the first step mounts the freshly created volume.
    # The serving_* and transform_image parameters are not used by the steps below.

    data_volume = dsl.VolumeOp(
        name="volume",
        resource_name="data-volume",
        size="2Gi",
        modes=dsl.VOLUME_MODE_RWO,
    )

    download_task = download_op(download_url, out_path).add_pvolumes(
        {out_path: data_volume.volume}
    )

    preprocess_task = preprocess_op(out_path).add_pvolumes(
        {out_path: download_task.pvolume}
    )

    tuning_task = tuning_op().add_pvolumes(
        {out_path: preprocess_task.pvolume}
    )

    train_task = train_op(out_path, trainset, bucket_name, model_path).add_pvolumes(
        {out_path: tuning_task.pvolume}
    )

    validate_task = validate_op(out_path, testset, model_name).add_pvolumes(
        {out_path: train_task.pvolume}
    )

    upload_task = upload_op(out_path, bucket_name).add_pvolumes(
        {out_path: validate_task.pvolume}
    )
    
    
    
#     kfserving_template = Template(
#         """
#             {
#                   "apiVersion": "serving.kubeflow.org/v1alpha2",
#                   "kind": "InferenceService",
#                   "metadata": {
#                     "labels": {
#                       "controller-tools.k8s.io": "1.0"
#                     },
#                     "name": "$name",
#                     "namespace": "$namespace"
#                   },
#                   "spec": {
#                     "default": {
#                       "predictor": {
#                         "minReplicas": 1,
#                         "serviceAccountName": "kf-user",
#                         "custom": {
#                             "container": {
#                                 "name": "predictor",
#                                 "image": "$transformer",
#                                 "command": 
#                             }
#                         }
#                       }
#                     }
#                   }
#         }
#         """
#     )
    
#     kfservingjson = kfserving_template.substitute({'name': str(serving_name),
#                                                   'namespace': str(serving_namespace),
#                                                   'bucket': str(serving_export_dir),
#                                                   'transformer': str(transform_image)})
    
#     kfservingdeployment = json.loads(kfservingjson)
    
#     serve = dsl.ResourceOp(
#         name="serve",
#         k8s_resource=kfservingdeployment,
#         action="apply",
#         success_condition="status.url"
#     )
    
#     (
#         serve
#         .after(test)
#         .add_volume({out_path: test.pvolume})
#     )
    



In [55]:
# Pipeline entry point plus the experiment/run labels used when compiling and
# submitting it.
# NOTE(review): the labels say "used-cars" although the pipeline is about
# police shootings -- confirm this is intended.
pipeline_func = police_shootings_pipeline
experiment_name = 'used-cars-training'
run_name = 'used-cars-pipeline run'

In [56]:
# OUT_PATH = '/mnt',
# TRAINSET = 'trainset.pkl',
# TESTSET = 'testset.pkl',
# MODEL_URI = '/tmp/export',
# DOWNLOAD_URL = ""

In [57]:

# arguments = {
#     "out_path": OUT_PATH,
#     "trainset": TRAINSET,
#     "testset": TESTSET,
#     "model_uri": MODEL_URI,
#     "download_url": DOWNLOAD_URL,
# }

# Compile the pipeline into a package named after the experiment
# ("<experiment_name>.zip"), written relative to the current working directory.
kfp.compiler.Compiler().compile(pipeline_func, f'{experiment_name}.zip')


In [58]:
# No host or credentials are passed, so the client falls back to its defaults.
client = kfp.Client()

# Submit a run built straight from the pipeline function; `arguments` is empty,
# so no pipeline parameter is overridden here.
run_result = client.create_run_from_pipeline_func(pipeline_func,
                                                    experiment_name=experiment_name,
                                                    run_name=run_name,
                                                    arguments={}
                                                 )