In [1]:
import kfp
from kfp import dsl
from kfp.components import create_component_from_func, InputPath, OutputPath
from kubernetes import client as k8s

In [2]:
# KFP API client used to submit runs. No host/auth arguments are passed, so it
# relies on kfp's default connection settings.
# NOTE(review): confirm this resolves to the intended KFP endpoint for your deployment.
client = kfp.Client()

In [4]:
# Container image shared by every lightweight component created below with
# create_component_from_func; packages_to_install are pip-installed on top of it.
# NOTE(review): Python 3.7 is end-of-life and limits which TensorFlow versions pip
# can resolve; bumping it may require re-checking compatibility with /mnt/data/myModel.
BASE_IMAGE = "python:3.7"

In [5]:
def getData(
    output_dataframe_path: OutputPath()
):
    """Pull the last day of M16M sensor readings from InfluxDB and write them as CSV.

    Connection settings come from ``/mnt/data/config.ini`` (the volume mounted by
    ``pod_defaults``). The query reads the ``MaiL`` bucket for ``SensorData`` points
    tagged ``Name == "M16M"`` over the last day, builds one column per Flux table
    field, and writes ``data.csv`` (header row, no index) into
    ``output_dataframe_path``.

    Args:
        output_dataframe_path: Directory path supplied by KFP; created if missing.

    Raises:
        ValueError: if the query result lacks any of the expected columns.
    """
    import pandas as pd
    import influxdb_client
    from pathlib import Path

    bucket = "MaiL"
    config_path = "/mnt/data/config.ini"
    # Column order matters: dataSplit treats the LAST column ("Brightness") as the
    # target and every preceding column as a feature.
    expected_columns = ["MM", "DD", "Day", "HH", "Min", "Sec",
                        "Illuminance", "Movement", "Manual", "On",
                        "Brightness"]

    query = (
        f'from(bucket:"{bucket}")'
        ' |> range(start:-1d)'
        ' |> filter(fn:(r)=> r._measurement=="SensorData" and r.Name=="M16M")'
    )

    # The context manager closes the client's HTTP resources even if the query fails;
    # the returned table list is fully materialized, so it stays usable afterwards.
    with influxdb_client.InfluxDBClient.from_config_file(config_path) as client:
        result = client.query_api().query(query=query)

    # Build all columns first and construct the frame once instead of growing it.
    columns = {}
    for table in result:
        if not table.records:
            continue
        field = table.records[0].get_field()
        # int() truncates, matching the integer CSV that dataSplit reads.
        columns[field] = [int(record.get_value()) for record in table.records]

    missing = [name for name in expected_columns if name not in columns]
    if missing:
        raise ValueError(f"InfluxDB query returned no data for columns: {missing}")

    retDf = pd.DataFrame(columns)[expected_columns]

    output_path = Path(output_dataframe_path)
    output_path.mkdir(parents=True, exist_ok=True)
    retDf.to_csv(output_path / "data.csv", mode='w', index=False)

In [6]:
# pathlib is part of Python 3's standard library, so it is not pip-installed;
# the PyPI "pathlib" package is an obsolete backport that only slows pod start-up.
getData_op = create_component_from_func(getData,
                                        base_image=BASE_IMAGE,
                                        packages_to_install=["pandas", "influxdb_client"])

In [7]:
def dataSplit(
    dataframe_path : InputPath(),
    output_dataframe_path: OutputPath()
):
    """Split getData's CSV into chronological train/test feature and target files.

    The last column of ``data.csv`` is the target; all others are features.
    The split keeps row order (``shuffle=False``) and puts the final 10% of rows
    in the test set. Writes ``x_train.csv``, ``x_test.csv``, ``y_train.csv`` and
    ``y_test.csv`` (integer, comma-separated, no header) into
    ``output_dataframe_path``.

    Args:
        dataframe_path: Directory containing ``data.csv`` from getData.
        output_dataframe_path: Directory path supplied by KFP; created if missing.
    """
    import numpy as np
    from pathlib import Path
    from sklearn.model_selection import train_test_split

    input_path = Path(dataframe_path)
    output_path = Path(output_dataframe_path)
    output_path.mkdir(parents=True, exist_ok=True)

    # data.csv is written by pandas WITH a header row. Without skip_header,
    # genfromtxt's default loose int conversion turns every header name into -1,
    # silently injecting a bogus all -1 sample at the start of the training set.
    my_data = np.atleast_2d(
        np.genfromtxt(input_path / "data.csv", delimiter=",", dtype=int, skip_header=1)
    )
    feature_size = my_data.shape[1] - 1
    x_train, x_test, y_train, y_test = train_test_split(
        my_data[:, :feature_size], my_data[:, feature_size:],
        test_size=0.1, random_state=42, shuffle=False,
    )

    np.savetxt(output_path / "x_train.csv", x_train, delimiter=",", fmt="%d")
    np.savetxt(output_path / "x_test.csv", x_test, delimiter=",", fmt="%d")
    np.savetxt(output_path / "y_train.csv", y_train, delimiter=",", fmt="%d")
    np.savetxt(output_path / "y_test.csv", y_test, delimiter=",", fmt="%d")

In [8]:
# pathlib ships with Python 3's standard library; no pip install needed.
dataSplit_op = create_component_from_func(dataSplit,
                                          base_image=BASE_IMAGE,
                                          packages_to_install=["numpy", "pandas",
                                                               "scikit-learn"])

In [11]:
def modelTransferLearning(
    dataframe_path : InputPath(),
    output_model_path : OutputPath()
):
    """Fine-tune the pretrained Keras model at /mnt/data/myModel on the new train split.

    The pretrained layers are reused in a new Sequential model. If the number of
    feature columns differs from the pretrained input width, the first layer is
    replaced by a fresh Dense layer with the same output width. It uses ReLU and
    stays trainable. All reused layers except the last two are frozen. The model is
    trained for 10 epochs with Adam/MSE.

    Writes to ``output_model_path``:
        * ``myModel``  - the fine-tuned model,
        * ``loss.png`` - train/validation loss curves,
        * ``x_test.csv`` / ``y_test.csv`` - passed through for modelTest.

    Args:
        dataframe_path: Directory with the four CSVs produced by dataSplit.
        output_model_path: Directory path supplied by KFP; created if missing.
    """
    import tensorflow as tf
    import matplotlib.pyplot as plt
    from pathlib import Path
    import numpy as np

    dataframe_path = Path(dataframe_path)
    output_path = Path(output_model_path)
    output_path.mkdir(parents=True, exist_ok=True)

    x_train = np.genfromtxt(dataframe_path / "x_train.csv", delimiter=",", dtype=int)
    y_train = np.genfromtxt(dataframe_path / "y_train.csv", delimiter=",", dtype=int)
    x_test = np.genfromtxt(dataframe_path / "x_test.csv", delimiter=",", dtype=int)
    y_test = np.genfromtxt(dataframe_path / "y_test.csv", delimiter=",", dtype=int)

    model_path = "/mnt/data/myModel"
    pre_model = tf.keras.models.load_model(model_path)
    first_layer = pre_model.layers[0]

    pre_feat = first_layer.input.shape[1]
    # Number of feature columns (previously read from row 1, which needs >= 2 rows).
    new_feat = x_train.shape[1]
    feat_change = pre_feat != new_feat

    model = tf.keras.models.Sequential()
    if feat_change:
        # The reused layers[1] was built for first_layer's output width, so the
        # replacement layer must produce that same width rather than a fixed 16.
        replacement_units = first_layer.output.shape[-1]
        model.add(tf.keras.layers.Dense(replacement_units, activation='relu',
                                        input_shape=(new_feat, )))
    else:
        model.add(first_layer)

    for layer in pre_model.layers[1:]:
        model.add(layer)

    # Freeze all reused layers except the last two; a freshly created first
    # layer (feat_change) is left trainable.
    for i in range(len(pre_model.layers) - 2):
        if feat_change and i == 0:
            continue
        model.layers[i].trainable = False

    model.compile(optimizer='adam', loss='mse')
    history = model.fit(x_train, y_train,
                        epochs=10,
                        batch_size=32,
                        validation_split=0.18)

    # plt.show() displays nothing inside a headless pipeline pod, so persist the
    # loss curves as an artifact instead.
    fig, ax = plt.subplots()
    ax.plot(history.history['loss'], label='Train Loss')
    ax.plot(history.history['val_loss'], label='Validation Loss')
    ax.set_xlabel('Epochs')
    ax.set_ylabel('Loss')
    ax.legend()
    fig.savefig(output_path / "loss.png")
    plt.close(fig)

    model.save(output_path / "myModel")
    np.savetxt(output_path / "x_test.csv", x_test, delimiter=",", fmt="%d")
    np.savetxt(output_path / "y_test.csv", y_test, delimiter=",", fmt="%d")

In [12]:
# pathlib ships with Python 3's standard library; no pip install needed.
modelTransferLearning_op = create_component_from_func(modelTransferLearning,
                                                      base_image=BASE_IMAGE,
                                                      packages_to_install=["tensorflow", "matplotlib",
                                                                           "pandas", "numpy"])

In [13]:
def modelTest(
    data_path : InputPath(),
    model_path : OutputPath()
):
    """Evaluate the fine-tuned model on the held-out split and re-export it.

    Reads ``x_test.csv``, ``y_test.csv`` and ``myModel`` from ``data_path``.
    It prints the evaluation result, i.e. the loss the model was compiled with
    upstream ('mse').

    Writes to ``model_path`` (the output directory despite the name):
        * ``prediction.png`` - actual vs predicted values per test sample,
        * ``myModel`` - the model, for modelConvert.

    Args:
        data_path: Directory produced by modelTransferLearning.
        model_path: Directory path supplied by KFP; created if missing.
    """
    import tensorflow as tf
    import matplotlib.pyplot as plt
    from pathlib import Path
    import numpy as np

    data_path = Path(data_path)
    output_path = Path(model_path)
    output_path.mkdir(parents=True, exist_ok=True)

    x_test = np.genfromtxt(data_path / "x_test.csv", delimiter=",", dtype=int)
    y_test = np.genfromtxt(data_path / "y_test.csv", delimiter=",", dtype=int)
    model = tf.keras.models.load_model(data_path / "myModel")

    # Surface the metric in the pod logs instead of discarding it.
    score = model.evaluate(x_test, y_test)
    print(f"Test loss: {score}")

    y_pred = model.predict(x_test)
    sample_index = np.arange(x_test.shape[0])

    # plt.show() displays nothing in a headless pod; save the figure as an artifact.
    fig, ax = plt.subplots()
    ax.scatter(sample_index, y_test, label="Actual", s=0.01)
    ax.scatter(sample_index, np.ravel(y_pred), label="Predicted", s=0.01)
    ax.set_xlabel('Sample count')
    ax.set_ylabel('Value')
    ax.legend()
    fig.savefig(output_path / "prediction.png")
    plt.close(fig)

    model.save(output_path / "myModel")

In [14]:
# pathlib ships with Python 3's standard library; no pip install needed.
modelTest_op = create_component_from_func(modelTest,
                                          base_image=BASE_IMAGE,
                                          packages_to_install=["tensorflow", "matplotlib",
                                                               "pandas", "numpy"])

In [29]:
def modelConvert(
    model_path : InputPath(),
    tflite_path : OutputPath()
):
    """Convert the Keras model from modelTest into a TensorFlow Lite flatbuffer.

    Loads ``<model_path>/myModel`` and converts it with ``TFLiteConverter``.
    The result is written to ``<tflite_path>/myModel.tflite``, creating the
    directory if needed. Prints "success" when done.
    """
    import tensorflow as tf
    from pathlib import Path

    source_dir = Path(model_path)
    keras_model = tf.keras.models.load_model(source_dir / "myModel")
    flatbuffer = tf.lite.TFLiteConverter.from_keras_model(keras_model).convert()

    destination_dir = Path(tflite_path)
    destination_dir.mkdir(parents=True, exist_ok=True)
    (destination_dir / "myModel.tflite").write_bytes(flatbuffer)
    print("success")

In [30]:
# pathlib ships with Python 3's standard library; no pip install needed.
modelConvert_op = create_component_from_func(modelConvert,
                                             base_image=BASE_IMAGE,
                                             packages_to_install=["tensorflow"])

In [36]:
def ModelServe(
    input_path : InputPath(),
    bucket: str = "mlpipeline"
):
    """Upload modelConvert's ``myModel.tflite`` to MinIO as ``M16M.tflite``.

    The component runs as an isolated function in its own container, so the
    destination bucket must be a parameter. Previously it referenced an undefined
    module-level ``bucket`` name and failed with NameError. The bucket is created
    if it does not exist.

    Args:
        input_path: Directory produced by modelConvert containing ``myModel.tflite``.
        bucket: Destination MinIO bucket. The default ``mlpipeline`` is presumably
            the artifact bucket of the bundled KFP MinIO. Override it if a different
            bucket was intended.
    """
    import os
    import minio
    from pathlib import Path

    # Defaults are the values previously hard-coded here; prefer injecting real
    # credentials via environment variables (e.g. from a Kubernetes Secret).
    minio_client = minio.Minio(
        os.environ.get("MINIO_ENDPOINT", "minio-service.kubeflow:9000"),
        access_key=os.environ.get("MINIO_ACCESS_KEY", "minio"),
        secret_key=os.environ.get("MINIO_SECRET_KEY", "minio123"),
        secure=False
    )
    if not minio_client.bucket_exists(bucket):
        minio_client.make_bucket(bucket)

    minio_path = "M16M.tflite"
    local_file = Path(input_path) / "myModel.tflite"
    minio_client.fput_object(bucket, minio_path, str(local_file))

In [37]:
# pathlib ships with Python 3's standard library; no pip install needed.
modelServe_op = create_component_from_func(ModelServe,
                                           base_image=BASE_IMAGE,
                                           packages_to_install=["minio"])

In [38]:
# Pod settings shared by every task in the pipeline.
def pod_defaults(op, volume_name="a305-storages", volume_mount_path="/mnt/data"):
    """Apply the shared resource requests and PVC mount to a pipeline task.

    Sets a 10Mi memory / 10m CPU *request* (scheduling hint, not a limit). It then
    mounts the PersistentVolumeClaim ``volume_name`` at ``volume_mount_path``.
    getData and modelTransferLearning read files under ``/mnt/data``, so the
    default mount path must stay in sync with them.

    Args:
        op: The ContainerOp returned by a component factory.
        volume_name: Name of the PVC; also used as the pod volume name.
        volume_mount_path: Where the claim is mounted inside the container.

    Returns:
        The same ``op``, so calls can be nested.
    """
    op.set_memory_request('10Mi').set_cpu_request('10m')
    pvc_volume = k8s.V1Volume(
        name=volume_name,
        persistent_volume_claim=k8s.V1PersistentVolumeClaimVolumeSource(claim_name=volume_name),
    )
    # add_pvolumes attaches the volume AND creates the container mount, so no
    # separate V1VolumeMount object is needed.
    op.add_pvolumes({volume_mount_path: pvc_volume})
    return op

In [39]:
# Build the pipeline from the components above as a linear chain:
# fetch -> split -> fine-tune -> evaluate -> convert to TFLite -> upload.
# Every task gets the shared pod_defaults (resource requests + PVC mount).

@dsl.pipeline(
    name = 'simple-test',
    description = 'Toy Pipeline'
)
def simple_pipeline():
    # Each stage consumes the single output artifact of the previous one.
    fetch_task = pod_defaults(getData_op())
    split_task = pod_defaults(dataSplit_op(fetch_task.output))
    train_task = pod_defaults(modelTransferLearning_op(split_task.output))
    eval_task = pod_defaults(modelTest_op(train_task.output))
    convert_task = pod_defaults(modelConvert_op(eval_task.output))
    pod_defaults(modelServe_op(convert_task.output))

In [40]:
# Submit the pipeline as a new run in the named experiment, reusing the client
# created at the top of the notebook instead of constructing a second one.
arguments = {}
EXPERIMENT_NAME = 'my_simple_pipeline'
client.create_run_from_pipeline_func(simple_pipeline,
                                     arguments=arguments,
                                     experiment_name=EXPERIMENT_NAME)

RunPipelineResult(run_id=b9ee11da-15cc-46a0-8a7d-bc315809963a)