### **03_video_insights.ipynb**
### **Video Insights Pipeline**

* ##### 01 - Install packages
* ##### 02 - Import packages
* ##### 03 - Create tasks
* ##### 04 - Create pipeline
* ##### 05 - Create pipeline yaml
* ##### 06 - Create pipeline run

### 01 - Install packages

In [None]:
!pip install kfp

### 02 - Import packages

In [None]:
import os
import sys
sys.path.append(os.path.dirname(os.getcwd()))

import kfp
import kfp.kubernetes as kubernetes

from components.delete_artifacts             import delete_artifacts
from components.download_video               import download_video
from components.extract_audio                import extract_audio
from components.extract_speeches             import extract_speeches
from components.extract_summary              import extract_summary
from components.prepare_video                import prepare_video
from components.translate_english_portuguese import translate_english_portuguese
from components.translate_english_spanish    import translate_english_spanish
from components.upload_artifacts             import upload_artifacts

### 03 - Create tasks

In [None]:
# Container image used as base_image by every component created in this section.
task_base_image: str = 'registry.access.redhat.com/ubi9/python-311'

In [None]:
# Turn download_video into a KFP component that runs on the shared base image
# with boto3 listed as an extra package to install.
download_video_op = kfp.dsl.component(
    download_video,
    base_image=task_base_image,
    packages_to_install=['boto3'],
)

In [None]:
# Turn prepare_video into a KFP component; it needs nothing beyond the base image.
prepare_video_op = kfp.dsl.component(prepare_video, base_image=task_base_image)

In [None]:
# Turn extract_audio into a KFP component that runs on the shared base image
# with moviepy listed as an extra package to install.
extract_audio_op = kfp.dsl.component(
    extract_audio,
    base_image=task_base_image,
    packages_to_install=['moviepy'],
)

In [None]:
# Turn extract_speeches into a KFP component that runs on the shared base image
# with torch and transformers listed as extra packages to install.
extract_speeches_op = kfp.dsl.component(
    extract_speeches,
    base_image=task_base_image,
    packages_to_install=['torch', 'transformers'],
)

In [None]:
# Turn extract_summary into a KFP component that runs on the shared base image
# with torch and transformers listed as extra packages to install.
extract_summary_op = kfp.dsl.component(
    extract_summary,
    base_image=task_base_image,
    packages_to_install=['torch', 'transformers'],
)

In [None]:
# Turn translate_english_spanish into a KFP component that runs on the shared
# base image with torch, sentencepiece and transformers as extra packages.
translate_english_spanish_op = kfp.dsl.component(
    translate_english_spanish,
    base_image=task_base_image,
    packages_to_install=['torch', 'sentencepiece', 'transformers'],
)

In [None]:
# Turn translate_english_portuguese into a KFP component that runs on the
# shared base image with torch and transformers as extra packages.
# NOTE(review): the Spanish translation component also installs sentencepiece;
# confirm whether the Portuguese model's tokenizer needs it as well.
translate_english_portuguese_op = kfp.dsl.component(
    translate_english_portuguese,
    base_image=task_base_image,
    packages_to_install=['torch', 'transformers'],
)

In [None]:
# Turn upload_artifacts into a KFP component that runs on the shared base image
# with boto3 listed as an extra package to install.
upload_artifacts_op = kfp.dsl.component(
    upload_artifacts,
    base_image=task_base_image,
    packages_to_install=['boto3'],
)

In [None]:
# Turn delete_artifacts into a KFP component; it needs nothing beyond the base image.
delete_artifacts_op = kfp.dsl.component(delete_artifacts, base_image=task_base_image)

### 04 - Create pipeline

In [None]:
# Name and description given to the @kfp.dsl.pipeline decorator below; the name
# is also forwarded to the download/upload tasks and used for the YAML file name.
pipeline_name, pipeline_description = '03_video_insights', 'Video Insights Pipeline'

In [None]:
@kfp.dsl.pipeline(
    name        = pipeline_name,
    description = pipeline_description
)
def pipeline(
    s3_service_name      : str,
    s3_endpoint_url      : str,
    s3_access_key_id     : str,
    s3_secret_access_key : str,
    s3_region            : str,
    s3_bucket            : str
):
    # Task graph (every step shares one PVC mounted at /pipeline):
    #
    #   create PVC -> download_video -> prepare_video -> extract_audio
    #     -> extract_speeches -> extract_summary
    #     -> (translate_english_spanish | translate_english_portuguese)
    #     -> upload_artifacts -> delete_artifacts -> delete PVC
    #
    # The six s3_* parameters are forwarded unchanged to the download and
    # upload tasks.
    #
    # Plain comments are used here instead of a docstring on purpose, so that
    # the compiled pipeline spec is not affected by documentation text.
    #
    # NOTE(review): the S3 credentials travel as plain pipeline parameters;
    # consider injecting them from a Kubernetes secret
    # (kubernetes.use_secret_as_env) instead.

    import os

    create_pvc_task = kubernetes.CreatePVC(
        pvc_name_suffix    = '-pipeline-pvc',
        size               = '1Gi',
        access_modes       = ['ReadWriteOnce'],
        storage_class_name = '<storage_class_name>'
    )

    pvc_directory = os.path.join('/', 'pipeline')
    pvc_name      = create_pvc_task.outputs['name']

    def mount_pipeline_pvc(task):
        """Mount the pipeline PVC on *task* at ``pvc_directory`` and return the task."""
        kubernetes.mount_pvc(
            task       = task,
            pvc_name   = pvc_name,
            mount_path = pvc_directory,
        )
        return task

    download_video_task = download_video_op(
        s3_service_name      = s3_service_name,
        s3_endpoint_url      = s3_endpoint_url,
        s3_access_key_id     = s3_access_key_id,
        s3_secret_access_key = s3_secret_access_key,
        s3_region            = s3_region,
        s3_bucket            = s3_bucket,
        pipeline_name        = pipeline_name
    )
    # Fix: the original never mounted the PVC here, although this task is
    # ordered after PVC creation and the following steps receive no inputs
    # other than the shared volume.
    # NOTE(review): confirm download_video writes under pvc_directory.
    mount_pipeline_pvc(download_video_task)
    download_video_task.after(create_pvc_task)

    prepare_video_task = prepare_video_op()
    mount_pipeline_pvc(prepare_video_task)
    prepare_video_task.after(download_video_task)

    extract_audio_task = extract_audio_op()
    mount_pipeline_pvc(extract_audio_task)
    extract_audio_task.after(prepare_video_task)

    extract_speeches_task = extract_speeches_op()
    mount_pipeline_pvc(extract_speeches_task)
    extract_speeches_task.after(extract_audio_task)

    extract_summary_task = extract_summary_op()
    mount_pipeline_pvc(extract_summary_task)
    extract_summary_task.after(extract_speeches_task)

    # Both translations depend only on the summary step, so they may be
    # scheduled in parallel.
    translate_english_spanish_task = translate_english_spanish_op()
    mount_pipeline_pvc(translate_english_spanish_task)
    translate_english_spanish_task.after(extract_summary_task)

    translate_english_portuguese_task = translate_english_portuguese_op()
    mount_pipeline_pvc(translate_english_portuguese_task)
    translate_english_portuguese_task.after(extract_summary_task)

    upload_artifacts_task = upload_artifacts_op(
        s3_service_name      = s3_service_name,
        s3_endpoint_url      = s3_endpoint_url,
        s3_access_key_id     = s3_access_key_id,
        s3_secret_access_key = s3_secret_access_key,
        s3_region            = s3_region,
        s3_bucket            = s3_bucket,
        pipeline_name        = pipeline_name
    )
    mount_pipeline_pvc(upload_artifacts_task)
    # Fix: wait for BOTH translations; the original only waited for the
    # Portuguese one, so the upload could start before the Spanish one finished.
    upload_artifacts_task.after(
        translate_english_spanish_task,
        translate_english_portuguese_task
    )

    delete_artifacts_task = delete_artifacts_op()
    # Fix: the original mounted the PVC on upload_artifacts_task a second time
    # here, leaving delete_artifacts_task without the volume.
    mount_pipeline_pvc(delete_artifacts_task)
    delete_artifacts_task.after(upload_artifacts_task)

    # Release the volume once the artifacts have been cleaned up.
    kubernetes.DeletePVC(pvc_name = pvc_name).after(delete_artifacts_task)

### 05 - Create pipeline yaml

In [None]:
# Relative path of the compiled pipeline package: yaml/<pipeline_name>.yaml
pipeline_package_path = os.path.join('yaml', pipeline_name + '.yaml')

In [None]:
# Create the output directory first so that compiling does not depend on the
# folder already being present in the working directory.
os.makedirs(os.path.dirname(pipeline_package_path) or '.', exist_ok = True)

# Compile the pipeline function into a YAML package at pipeline_package_path.
kfp.compiler.Compiler().compile(
    pipeline_func = pipeline,
    package_path  = pipeline_package_path
)

### 06 - Create pipeline run

In [None]:
# Connection settings handed to kfp.client.Client below (host and
# existing_token). Both values are placeholders to be replaced before running.
kubeflow_host: str = '<kubeflow_host>'
kubeflow_token: str = '<kubeflow_token>'

In [None]:
# Run-time values for the pipeline's parameters, keyed by parameter name.
# Everything except the service name is a placeholder to be filled in.
pipeline_arguments = dict(
    s3_service_name='s3',
    s3_endpoint_url='<s3_endpoint_url>',
    s3_access_key_id='<s3_access_key_id>',
    s3_secret_access_key='<s3_secret_access_key>',
    s3_region='<s3_region>',
    s3_bucket='<s3_bucket>',
)

In [None]:
# Connect to the Kubeflow Pipelines API with the host/token defined above ...
kubeflow_client = kfp.client.Client(
    host=kubeflow_host,
    existing_token=kubeflow_token,
)

# ... and submit a run of the compiled package with the prepared arguments.
# Kept as the cell's last expression so the notebook still displays the result.
kubeflow_client.create_run_from_pipeline_package(
    pipeline_file=pipeline_package_path,
    arguments=pipeline_arguments,
)