# Calculating Value at Risk for a portfolio of bonds

In [8]:
# Imports
import kfp
import json, os, shutil
import kfp.dsl as dsl
from kfp.dsl import component, ContainerOp
from kfp.dsl.types import Dict, List
import kfp.compiler as compiler
from kfp.components import InputPath, InputTextFile, InputBinaryFile, OutputPath, OutputTextFile, OutputBinaryFile
from kubernetes.client.models.v1_local_object_reference import V1LocalObjectReference

In [9]:
def create_pvc():
    """Build the VolumeOp that requests the pipeline's persistent volume claim.

    The claim is requested from the ``azurefile-csi`` storage class with the
    ``dsl.VOLUME_MODE_RWM`` access mode and a size of ``10M``.

    Returns:
        The ``dsl.VolumeOp``; callers mount it through its ``.volume``
        attribute.
    """
    pvc_settings = dict(
        name='create-pipeline-volume',
        resource_name='pipeline-pvc-resource',
        storage_class='azurefile-csi',
        modes=dsl.VOLUME_MODE_RWM,
        size='10M',
    )
    return dsl.VolumeOp(**pvc_settings)

In [10]:
# STEP 0.5
# Cleaning step: runs `python scripts/BV_STEP_05.py --path <path>` in the
# notebook image, with the volume from create_pvc() mounted at /mnt and the
# pod labelled yq-environment=true.
# NOTE(review): documented with comments instead of a docstring / return
# annotation because the kfp `component` decorator may read those into the
# component metadata -- confirm before converting.
@component
def clean(path:str):
    # The volume is created first, so it exists before the step that mounts it.
    cleaner_volume = create_pvc().volume
    cleaner_op = ContainerOp(
        name='cleaner_',
        image='docker.yq.credo.be/cust/yq-python-ml-notebook:v2.5.6.1',
        command=['python', 'scripts/BV_STEP_05.py'],
        arguments=['--path', path],
        pvolumes={"/mnt": cleaner_volume},
    )
    return cleaner_op.add_pod_label('yq-environment', 'true')

In [11]:
# STEP 1
# Scenario generation step: runs `python scripts/BV_STEP_1.py` in the notebook
# image, forwarding --scenarios, --bonds, --seed and --path, and labels the
# pod yq-environment=true.
# NOTE(review): documented with comments instead of a docstring / return
# annotation because the kfp `component` decorator may read those into the
# component metadata -- confirm before converting.
@component
def generate(scenarios:int, bonds:int, seed:int, path:str):
    cli_arguments = [
        '--scenarios', scenarios,
        '--bonds', bonds,
        '--seed', seed,
        '--path', path,
    ]
    scenario_op = ContainerOp(
        name='scenarios_',
        image='docker.yq.credo.be/cust/yq-python-ml-notebook:v2.5.6.1',
        command=['python', 'scripts/BV_STEP_1.py'],
        arguments=cli_arguments,
    )
    return scenario_op.add_pod_label('yq-environment', 'true')

In [12]:
# STEP 2
# Consolidation step: runs `python scripts/BV_STEP_2.py` in the notebook
# image, forwarding --path and --confidence, and labels the pod
# yq-environment=true.
# NOTE(review): documented with comments instead of a docstring / return
# annotation because the kfp `component` decorator may read those into the
# component metadata -- confirm before converting.
@component
def consolidate(path:str, confidence:float):
    cli_arguments = [
        '--path', path,
        '--confidence', confidence,
    ]
    consolidation_op = ContainerOp(
        name='consolidation_',
        image='docker.yq.credo.be/cust/yq-python-ml-notebook:v2.5.6.1',
        command=['python', 'scripts/BV_STEP_2.py'],
        arguments=cli_arguments,
    )
    return consolidation_op.add_pod_label('yq-environment', 'true')

In [13]:
# BONDS VaR PIPELINE
# Name given to the run when the pipeline is submitted from the notebook.
runName = 'Bohdan_Test_Pipeline'

# Pipeline layout:
#   step 0.5  clean(path)                  -- once
#   step 1    generate(..., seed=pod)      -- once per entry of `pods`, in parallel
#   step 2    consolidate(path, confidence) -- once, after the whole fan-out
#
# Parameters:
#   pods        list of values to fan out over; each entry is passed to one
#               generate step as its seed
#   scenarios   forwarded to generate as --scenarios
#   bonds       forwarded to generate as --bonds
#   path        forwarded to every step as --path
#   confidence  forwarded to consolidate as --confidence
#
# NOTE: the list default for `pods` is kept on purpose; it is the pipeline
# parameter's default value and is never mutated here.
# NOTE(review): documented with comments instead of a docstring because kfp
# may read the docstring into the pipeline description -- confirm before
# converting.
@dsl.pipeline(name='Bonds Test')
def bonds_pipeline(pods:list=[1],
                   scenarios:int=100,
                   bonds:int=22,
                   path:str="scripts/Bonds_VaR",
                   confidence:float=0.01):

    step_05 = clean(path)

    # Kubeflow garbage collection: TTL of 720 seconds after the run finishes
    # (86400 is the alternative value that was used here before).
    dsl.get_pipeline_conf().set_ttl_seconds_after_finished(720)

    # TODO: delete the PVC when the run exits. A dsl.ResourceOp named
    # 'remove-tmp-pvc', fed from step_05.outputs["create-pipeline-volume-manifest"]
    # and wrapped in dsl.ExitHandler, was sketched here but is disabled.

    # Fan out: one scenario-generation step per entry in `pods`.
    with dsl.ParallelFor(pods) as pod:
        step_1 = generate(scenarios=scenarios,
                          bonds=bonds,
                          seed=pod,
                          path=path).after(step_05).set_memory_limit('1G').set_cpu_request('1.0')

    # Fan in: consolidate receives nothing pod-specific, so it is created once,
    # outside the loop. Inside the loop it was instantiated per pod and each
    # copy only waited for its own generate step, so it could start while
    # other pods were still producing scenarios.
    step_2 = consolidate(path, confidence).after(step_1)

In [14]:
# Submit the pipeline as a run directly from the notebook, using the default
# pipeline parameters. Follow the "here" buttons below to open it.
if __name__ == '__main__':
    kfp.Client().create_run_from_pipeline_func(
        bonds_pipeline,
        arguments={},
        run_name=runName,
        namespace='bohdana',
    )

In [7]:
# Use this cell when you want to compile the pipeline to a YAML file
# (scripts/BV_111P.yaml) and download it.
if __name__ == '__main__':
    compiler.Compiler().compile(
        bonds_pipeline,
        'scripts/BV_111P.yaml',
        type_check=True,
    )