In [9]:
import kfp
import kfp.dsl as dsl
from kfp.dsl import InputPath, OutputPath
from kfp.dsl import component as component
from kfp import kubernetes

In [10]:
# Container image passed as `base_image` to the retraining component.
# The component imports tensorflow, numpy, pandas and the feature-store /
# model-metrics SDKs but only installs `requests` at start-up, so this image
# is expected to ship those packages already.
# NOTE(review): `:latest` is a floating tag — consider pinning a version for
# reproducible runs.
BASE_IMAGE = "traininghost/pipelineimage:latest"

In [None]:
@component(base_image=BASE_IMAGE, packages_to_install=['requests'])
def train_export_model(trainingjobName: str, epochs: str, version: str):
    """Retrain a stored model on freshly fetched features and upload the result.

    The step fetches the `pdcpBytesDl` / `pdcpBytesUl` features for the
    training job, builds sliding windows (10 past rows -> 1 future row),
    downloads and unpacks the model archive of the given version, continues
    training it, and finally uploads an accuracy metric together with the
    retrained model under `version + 1`.

    Args:
        trainingjobName: Training job whose features and model are used.
        epochs: Number of training epochs, as a decimal string.
        version: Version of the model to download, as a decimal string.

    Raises:
        ValueError: If `epochs` or `version` is not an integer string, or if
            there are too few feature rows to build a single training window.
        Exception: If the model archive download does not return HTTP 200.
    """
    # Imports and helpers live inside the function on purpose: a KFP
    # lightweight component must be self-contained.
    import os
    import zipfile

    import numpy as np
    import pandas as pd
    import requests
    import tensorflow as tf
    from featurestoresdk.feature_store_sdk import FeatureStoreSdk
    from modelmetricsdk.model_metrics_sdk import ModelMetricsSdk

    feature_columns = ['pdcpBytesDl', 'pdcpBytesUl']
    n_past, n_future = 10, 1

    # Fail fast on malformed parameters instead of discovering them only
    # after the download and the whole training run have completed.
    n_epochs = int(epochs)
    new_version = str(int(version) + 1)

    fs_sdk = FeatureStoreSdk()
    mm_sdk = ModelMetricsSdk()

    print("Job name is:", trainingjobName)

    # Fetch features for training
    features = fs_sdk.get_features(trainingjobName, feature_columns)
    print("Features dataframe:")
    print(features)

    print("Converting data types...")
    # Work on a copy of just the needed columns so the fetched frame is not
    # mutated and pandas does not warn about assigning into a slice.
    features_cellc2b2 = features[feature_columns].copy()
    for column in feature_columns:
        features_cellc2b2[column] = pd.to_numeric(features_cellc2b2[column], downcast="float")

    def split_series(series, n_past, n_future):
        """Cut `series` into overlapping (past, future) windows.

        Returns two arrays: inputs of `n_past` consecutive rows and targets of
        the `n_future` rows that immediately follow each input window.
        """
        n_windows = len(series) - n_past - n_future + 1
        X, y = [], []
        for window_start in range(max(n_windows, 0)):
            past_end = window_start + n_past
            X.append(series[window_start:past_end, :])
            y.append(series[past_end:past_end + n_future, :])
        return np.array(X), np.array(y)

    X, y = split_series(features_cellc2b2.values, n_past, n_future)
    if X.shape[0] == 0:
        raise ValueError(
            f"Need at least {n_past + n_future} feature rows to build a training "
            f"window, got {len(features_cellc2b2)}"
        )
    # Drop the future-step axis; this is valid because n_future == 1.
    y = y.reshape((y.shape[0], y.shape[2]))
    print(X.shape, y.shape)

    # Download the model
    model_url = f"http://tm.traininghost:32002/model/{trainingjobName}/{version}/Model.zip"
    zip_file_path = 'Model.zip'
    # (connect, read) timeouts: without them an unresponsive server would
    # block this pipeline step forever.
    response = requests.get(model_url, timeout=(10, 120))
    if response.status_code != 200:
        raise Exception(
            f"Failed to download model from {model_url} (HTTP {response.status_code})"
        )
    with open(zip_file_path, 'wb') as file:
        file.write(response.content)
    print(f'Downloaded file saved to {zip_file_path}')

    # Extract the model, then discard the archive
    extract_to_dir = "./Model"
    os.makedirs(extract_to_dir, exist_ok=True)
    with zipfile.ZipFile(zip_file_path, 'r') as zip_ref:
        zip_ref.extractall(extract_to_dir)
    os.remove(zip_file_path)

    # Load the model
    # NOTE(review): the model is always loaded from sub-directory `1`,
    # independent of `version` — confirm this matches the archive layout.
    model_path = "./Model/1"
    model = tf.keras.models.load_model(model_path)
    model.compile(loss='mse', optimizer='adam', metrics=['mse'])

    # Keep a full-model checkpoint whenever the validation loss improves
    checkpoint_dir = "./checkpoints"
    os.makedirs(checkpoint_dir, exist_ok=True)
    checkpoint_path = os.path.join(checkpoint_dir, "model_epoch_{epoch:02d}_val_loss_{val_loss:.2f}.h5")
    checkpoint_callback = tf.keras.callbacks.ModelCheckpoint(
        filepath=checkpoint_path,
        monitor='val_loss',
        save_best_only=True,
        save_weights_only=False,
        mode='min',
        verbose=0
    )

    # Retrain the model
    print("Retraining the model with checkpoints...")
    model.fit(X, y, batch_size=10, epochs=n_epochs, validation_split=0.2, callbacks=[checkpoint_callback])

    yhat = model.predict(X, verbose=0)

    # Save the retrained model under the incremented version number
    retrained_model_path = f"./retrain/{new_version}"
    os.makedirs(retrained_model_path, exist_ok=True)
    model.save(retrained_model_path)
    print(f"Retrained model saved at {retrained_model_path} with version {new_version}")

    # Upload metrics: "Accuracy" is the fraction of predicted values whose
    # absolute error against the target is below 5.
    accuracy = np.mean(np.absolute(np.asarray(y) - np.asarray(yhat)) < 5)
    data = {
        'metrics': [{'Accuracy': str(accuracy)}]
    }

    mm_sdk.upload_metrics(data, trainingjobName, new_version)
    mm_sdk.upload_model(retrained_model_path, trainingjobName, new_version)


In [12]:
# Single-step pipeline: runs the `train_export_model` component with the
# pipeline's three string parameters forwarded unchanged.
@dsl.pipeline(
    name="qoe Pipeline retrain",
    description="qoe retrain",
)
def super_model_pipeline(trainingjob_name: str, epochs: str, version: str):
    retrain_task = train_export_model(
        trainingjobName=trainingjob_name,
        epochs=epochs,
        version=version,
    )
    # Caching is switched off so the step is executed on every run.
    retrain_task.set_caching_options(False)
    # Pull the component image only when it is not already present.
    kubernetes.set_image_pull_policy(retrain_task, "IfNotPresent")

In [13]:
# Compile the pipeline definition into `<file_name>.yaml` in the working
# directory. `file_name` is reused when the package is uploaded.
file_name = "qoe_model_pipeline_retrain_2"
pipeline_func = super_model_pipeline

kfp.compiler.Compiler().compile(
    pipeline_func=pipeline_func,
    package_path=f"{file_name}.yaml",
)

In [None]:
import requests

# Upload the compiled pipeline package to the training host under
# `pipeline_name`.
pipeline_name = "qoe_Pipeline_retrain_2"
pipeline_file = file_name + '.yaml'

# Open the package in a `with` block so the file handle is closed once the
# request finishes, and bound the request so an unresponsive server cannot
# hang the notebook.
with open(pipeline_file, 'rb') as pipeline_fh:
    upload_response = requests.post(
        "http://tm.traininghost:32002/pipelines/{}/upload".format(pipeline_name),
        files={'file': pipeline_fh},
        timeout=60,
    )

# Last expression of the cell: shows the HTTP response as the cell output.
upload_response

In [14]:
from kfp.client import Client

# Connect to the Kubeflow Pipelines endpoint and start one run of the
# compiled package with explicit parameter values.
client = Client(host='http://ml-pipeline-ui.kubeflow:80')

# Alternative: register the package as a new pipeline version instead.
# client.upload_pipeline_version(pipeline_package_path='pipeline.yaml',pipeline_version_name='v2', pipeline_name='sample-pipeline')

result = client.create_run_from_pipeline_package(
    'qoe_model_pipeline_retrain_2.yaml',
    arguments={
        'trainingjob_name': 'testing_influxdb_301',
        'epochs': '5',
        'version': '1',
    },
)