<p align="center">
  <img src="techno_volcano.png" />
</p>


# Introduction
In this notebook we will demonstrate using concepts from the entire course to provision a volume, train a model using the training operator and Volcano, serve the model with Kserve, and test the model once we have served it. Run the notebook from start to finish to see the pipeline. Once your pipeline is done, you can test the model.


In [None]:
import kfp
from kfp import dsl
from kfp import compiler
from kfp.client import Client


client = kfp.Client()
print(client.list_experiments())

In [None]:
@dsl.component(target_image='chasechristensen/volume_component:v2')
def create_volume(pvc_name: str,timeout_period: float,max_timeout: float,storage_size: float)-> str:
    # Define the PVC specifications
    config.load_incluster_config()
    v1 = client.CoreV1Api()
    pvc_name=generate_unique_volume_name(pvc_name)
    pvc_spec = client.V1PersistentVolumeClaim(
        metadata=client.V1ObjectMeta(name=pvc_name),
        spec=client.V1PersistentVolumeClaimSpec(
            access_modes=["ReadWriteOnce"],  # Adjust access modes as needed
            resources=client.V1ResourceRequirements(
                requests={"storage": str(storage_size)+"Gi"}
            ),
            storage_class_name="standard",
        ),
    )
    print(pvc_spec)
    namespace=get_namespace()
    created_pvc = v1.create_namespaced_persistent_volume_claim(namespace=namespace, body=pvc_spec)
    print(f"PVC {created_pvc.metadata.name} created in namespace {namespace}")
    monitor_pvc_status(api_instance=v1, namespace=namespace, pvc_name=pvc_name, timeout_period=timeout_period, max_timeout=max_timeout)
    return pvc_name

In [None]:
@dsl.component(target_image='chasechristensen/xgboost_train_component:v2')
def xgboost_train(claim_name: str, raw_manifest_url:str, timeout_period: float,max_timeout: float, worker_replicas: float,master_replicas: float) -> None:
    config.load_incluster_config()
    api_instance = client.CustomObjectsApi()
    crd_manifest = get_yaml_from_git(raw_manifest_url,claim_name)
    group = 'kubeflow.org'
    version = 'v1'
    plural = 'xgboostjobs'
    # Get namespace
    namespace = get_namespace()
    apply_custom_resource_conditionally(api_instance, group, version, namespace, plural, crd_manifest, timeout_period,max_timeout,worker_replicas,master_replicas)


In [None]:
@dsl.component(target_image='chasechristensen/xgboost_serve_component:v3')
def create_inference_service(timeout_period: float,max_timeout: float,name: str,storageUri: str,raw_manifest_url: str)-> None:
    config.load_incluster_config()
    api_instance = client.CustomObjectsApi()
    body = get_yaml_from_git(raw_manifest_url)
    group = 'serving.kserve.io'
    version = 'v1beta1'
    plural = 'inferenceservices'
    body['metadata']['name'] = name
    print( body['metadata']['name'])
    body['spec']['predictor']['xgboost']['storageUri']="pvc://"+storageUri
    namespace=get_namespace()
    try:
        api_response = api_instance.create_namespaced_custom_object(group=group, version=version, namespace=namespace, plural=plural, body=body)
        print("Custom resource applied. Status: %s" % api_response) 
        print(f"Monitoring status...")
        monitor_server_status(api_instance, group, version, namespace, plural, body['metadata']['name'],timeout_period,max_timeout)      
    except client.exceptions.ApiException as e:
        print(f"Failed to apply {name}: {e}")
        sys.exit(1)

In [None]:
@dsl.pipeline
def xgboost_train_pipeline(claim: str, train_manifest_url:str,timeout: float, max_time: float, workers: float,masters: float,storage: float, serve_manifest_url: str, model_name: str) -> None:
    task1=create_volume(pvc_name=claim,timeout_period=timeout,max_timeout=max_time,storage_size=storage)
    task2 = xgboost_train(claim_name=task1.output, raw_manifest_url=train_manifest_url,timeout_period=timeout,max_timeout=max_time,worker_replicas=workers,master_replicas=masters)
    task3 = create_inference_service(timeout_period=timeout,max_timeout=max_time,name=model_name,storageUri=task1.output,raw_manifest_url=serve_manifest_url).after(task2)
    

In [None]:
compiler.Compiler().compile(xgboost_train_pipeline, 'pipeline.yml')

In [165]:
client = kfp.Client()
run = client.create_run_from_pipeline_package(
    'pipeline.yml',
    arguments={
        'claim':"iris",
        'train_manifest_url':"https://raw.githubusercontent.com/chasecadet/kubeflow_course/main/kubeflow_integrations/volcano_iris_train.yml",
        'timeout':15.0,
        'max_time':20.0,
        'workers':3,
        'masters':1,
        'storage':4,
        'serve_manifest_url':"https://raw.githubusercontent.com/chasecadet/kubeflow_course/main/kubeflow_integrations/kserve_pvc.yml",
        'model_name': "iris",
    },
)

AttributeError: module 'kubernetes.client' has no attribute 'create_run_from_pipeline_package'

In [None]:
inference_input = {
  "inputs": [
    {
      "name": "feature_name_or_tensor",
      "shape": [1, 4],
      "datatype": "FP32",
      "data": [6.8, 2.8, 4.8, 1.4]
    }
  ]
}

isvc_url="http://iris-predictor.christensenc3526.svc.cluster.local/v2/models/iris/infer"
response = requests.post(isvc_url, json=inference_input)
print(response.text)


In [None]:
!curl http://iris-predictor.christensenc3526.svc.cluster.local