In [1]:
# !git clone https://github.com/WongKinYiu/yolov7.git /home/jovyan/yolov7

# mkdir weights
# curl https://github.com/WongKinYiu/yolov7/releases/download/v0.1/yolov7_training.pt > weights/yolov7_training.pt

In [3]:
# !python -m ipykernel install --user --name yolov7 --display-name yolov7
# !{sys.executable} -m pip install -r requirements.txt --quiet
!{sys.executable} -m pip install kfp==1.8.22 --user --quiet

In [1]:
import sys
import os

import kfp
from kfp import compiler, dsl, components
from kfp.components import InputPath, OutputPath
from typing import NamedTuple
from kfp.dsl import component

from typing import Dict, List

# from kfp import compiler
# from kfp import dsl
# from kfp.dsl import Input, InputPath, Output, OutputPath, Dataset, Model, component


In [2]:
kfp.__version__

'1.8.22'

In [3]:
# @component(
#     packages_to_install=['google-cloud-storage', 'numpy', 'chevron', 'tarfile']
# )
def prepare_labelstudio_data_for_yolo(
        # model_name: str,
        project: str,
        labels: list,
        namespace: str,
        domain: str,
        config_template_url: str,
        transfer_weights_url: str,
        hyp_file_url: str,
        train_frac: float,
        validate_frac: float,
        workspace_path: OutputPath(str)
) -> NamedTuple('YOLOArgs',
                [('data_file', str),
                 ('config_file', str),
                 ('weights_file', str),
                 ('hyp_file', str),
                 ('names_file', str)]):
    '''
    Prepares Labelstudio Data for YOLO training
    Example weights_url: https://github.com/WongKinYiu/yolov7/releases/download/v0.1/yolov7_training.pt
    '''

    print(f'Prepares Labelstudio Data for training of YOLOv7')
    import chevron
    from google.cloud import storage
    import json
    import os
    from pathlib import Path
    # import numpy as np
    from urllib.parse import urlparse  #, ParseResult
    # from random import shuffle
    from collections import namedtuple
    import tarfile

    def create_directory(directory_name, basedir=None):
        if basedir is None:
            pth = Path(directory_name)
        else:
            pth = Path(os.path.join(basedir, directory_name))
        pth.mkdir(parents=True, exist_ok=True)
        return pth.as_posix()

    # Directory structure
    basedir = create_directory("/workspace")  # /workspace
    configdir = create_directory('config', basedir)  # /workspace/model
    datadir = create_directory('dataset', basedir)  # /workspace/dataset
    imagesdir = create_directory('images', datadir)  # /workspace/dataset/images
    trainimagesdir = create_directory('train', imagesdir)  # /workspace/dataset/images/train
    valimagesdir = create_directory('val', imagesdir)  # /workspace/dataset/images/val
    testimagesdir = create_directory('test', imagesdir)  # /workspace/dataset/images/test
    labelsdir = create_directory('labels', datadir)  # /workspace/dataset/labels
    trainlabelsdir = create_directory('train', labelsdir)  # /workspace/dataset/labels/train
    vallabelsdir = create_directory('val', labelsdir)  # /workspace/dataset/labels/val
    testlabelsdir = create_directory('test', labelsdir)  # /workspace/dataset/labels/test

    # Remove lost and found folder
    try:
        Path.rmdir("/workspace/lost+found")
    except:
        print("Could Not Remove 'lost+found'")

    bucket_name = f"{namespace}.{domain}"

    print(namespace)
    print(domain)
    print(bucket_name)

    storage_client = storage.Client()
    bucket = storage_client.get_bucket(bucket_name)

    my_prefix = f"label-studio/projects/{project}/results/"  # the name of the subfolder
    blobs = list(bucket.list_blobs(prefix=my_prefix, delimiter='/'))

    if len(blobs) == 0:
        print("NO RESULTS FOUND FOR {}".format(project))
        # output = namedtuple('YOLOArgs', ['data_file', 'config_file', 'transfer_weights_file', 'hyp_file', 'names_file'])
        # return output("", "", "", "", "")
        return

    test_frac = 1.0 - train_frac - validate_frac
    # shuffle(blobs)
    # need to pre-define specific train/val/test indices (avoid risk of leakage - especially with video data)
    # This needs to be discussed further. Sampling is challenging.
    # shuffle will work for random images that have no temporal relationships provided we ALWAYS keep the same
    # test set as a hold out, i.e. don't resample every time we retrain.
    m = len(blobs)
    train_end = int(train_frac * m)
    validate_end = int(validate_frac * m) + train_end

    print(f'Number of images: {m}')
    print(f'Train set size: {int(train_frac * m)}')
    print(f'Validate set size: {int(validate_frac * m)}')
    print(f'Test set size: {int(test_frac * m)}')

    def save_to_yolo_fmt(file_name, annotation, local_image_folder, local_label_folder):
        # <object-class> - integer number of object from 0 to (classes-1)
        # <x> <y> <width> <height> - float values relative to width and height of image, it can be equal from (0.0 to 1.0]
        # for example: <x> = <absolute_x> / <image_width> or <height> = <absolute_height> / <image_height>
        # attention: <x> <y> - are center of rectangle (are not top-left corner)

        # file_name is the name of the label studio file to which an annotation is stored (usually a numerical ID)
        # annotation is the parsed json content of a label studio annotation file
        # local_image/label_folder tells us where to save annotations and images for training

        image_data_url = annotation['task']['data']['image']
        try:
            _, ext = os.path.splitext(image_data_url)
            parsed_url = urlparse(image_data_url)
            blob = bucket.blob(parsed_url.path[1:])
            image_destination = os.path.join(local_image_folder, file_name + ext)
            print(image_destination)
            blob.download_to_filename(image_destination)
        except Exception as err_msg:
            print(f"WARNING: {err_msg}\n - Could not download {image_data_url}, the traing data will be incomplete.")
            return

        with open(os.path.join(local_label_folder, file_name + '.txt'), 'w') as f:
            for r in annotation['result']:
                scale = 100.0

                # Labelstudio has different format depending on original size in result:
                if ('original_width' not in r or r['original_width'] == 1) or (
                        'original_height' not in r or r['original_height'] == 1):
                    scale = 10000.0

                w = r['value']['width'] / scale
                h = r['value']['height'] / scale
                x = (r['value']['x'] + (r['value']['width'] / 2)) / scale
                y = (r['value']['y'] + (r['value']['height'] / 2)) / scale

                for l in r['value']['rectanglelabels']:
                    if l in labels:
                        idx = labels.index(l)
                        # There are a lot of annotations of very small objects, sort them out
                        if (w > 0.005) and (h > 0.005):
                            f.write(f'{idx} {x} {y} {w} {h}\n')
                            print(f'Adding:   {idx} {x} {y} {w} {h}')
                        else:
                            print(f'Skipping: {idx} {x} {y} {w} {h}')

    print(trainimagesdir)
    print(valimagesdir)
    print(testimagesdir)
    print(trainlabelsdir)
    print(vallabelsdir)
    print(testlabelsdir)

    for i, blob in enumerate(blobs):
        if (blob.name != my_prefix):  # ignoring the subfolder itself
            if i < train_end:
                file_name = blob.name.replace(my_prefix, "")
                print(f'adding {i}, {file_name} to train')
                annotation = json.loads(blob.download_as_string().decode())
                save_to_yolo_fmt(file_name, annotation, trainimagesdir, trainlabelsdir)
            elif i < validate_end:
                file_name = blob.name.replace(my_prefix, "")
                print(f'adding {i}, {file_name} to val')
                annotation = json.loads(blob.download_as_string().decode())
                save_to_yolo_fmt(file_name, annotation, valimagesdir, vallabelsdir)
            else:
                file_name = blob.name.replace(my_prefix, "")
                print(f'adding {i}, {file_name} to test')
                annotation = json.loads(blob.download_as_string().decode())
                save_to_yolo_fmt(file_name, annotation, testimagesdir, testlabelsdir)

    # DATA
    print(f"Writing DATA file")
    data_file = os.path.join(configdir, f'data.yaml')
    print(f"- DST: {data_file}")

    num_classes = len(labels)

    names_file = os.path.join(datadir, "object.names")
    train_data_file = os.path.join(datadir, "train.txt")
    val_data_file = os.path.join(datadir, "val.txt")
    test_data_file = os.path.join(datadir, "test.txt")

    with open(data_file, 'w') as out:
        out.write(f'train: {train_data_file}\n')
        out.write(f'val: {val_data_file}\n')
        out.write(f'test: {test_data_file}\n')
        out.write(f'nc: {num_classes}\n')
        out.write(f'names: [{", ".join(labels)}]')

    with open(names_file, 'w') as out:
        for l in labels:
            out.write(l + '\n')

    # /workspace/dataset/train.txt
    with open(train_data_file, 'w') as out:
        for f in os.listdir(trainimagesdir):
            out.write(f'{os.path.join(trainimagesdir, f)}\n')

    # /workspace/dataset/val.txt
    with open(val_data_file, 'w') as out:
        for f in os.listdir(valimagesdir):
            out.write(f'{os.path.join(valimagesdir, f)}\n')

    # /workspace/dataset/test.txt
    with open(test_data_file, 'w') as out:
        for f in os.listdir(testimagesdir):
            out.write(f'{os.path.join(testimagesdir, f)}\n')

    # CONFIG (from GCS)
    print(f"Writing YOLOv7 CONFIG file")
    print(f"- SRC: {config_template_url}")
    custom_config_file = os.path.join(configdir, "config.yaml")
    print(f"- DST: {custom_config_file}")
    parsed_url = urlparse(config_template_url)
    blob = bucket.blob(parsed_url.path[1:])
    with open(custom_config_file, 'w') as fout:
        parsed_config = chevron.render(blob.download_as_string().decode(),
                                       {"num_classes": num_classes})
        # to configure anything else, create a copy, modify, and reference.
        # full parameterization would likely be more confusing than helpful
        fout.write(parsed_config)
        print(parsed_config)

    # HYP FILE (from GCS)
    print("Copying HYP file")
    print(f"- SRC: {hyp_file_url}")
    hyp_file = os.path.join(configdir, "hyp.yaml")
    print(f"- DST: {hyp_file}")
    parsed_url = urlparse(hyp_file_url)
    blob = bucket.blob(parsed_url.path[1:])
    blob.download_to_filename(hyp_file)

    # PRETRAINED WEIGHTS (from GCS)
    transfer_weights_file = ""
    if transfer_weights_url:
        print(f"Copying weights for transfer learning")
        print(f"- SRC: {transfer_weights_url}")
        transfer_weights_file = os.path.join(configdir, 'transfer_weights.pt')
        print(f"- DST: {transfer_weights_file}")
        parsed_url = urlparse(transfer_weights_url)
        blob = bucket.blob(parsed_url.path[1:])
        blob.download_to_filename(transfer_weights_file)

    # COMPRESS WORKSPACE
    def make_tarfile(output_filename, source_dir):
        with tarfile.open(output_filename, "w:gz") as tar:
            tar.add(source_dir, arcname=os.path.basename(source_dir))

    make_tarfile(workspace_path, basedir)

    output = namedtuple('YOLOArgs', ['data_file', 'config_file', 'transfer_weights_file', 'hyp_file', 'names_file'])
    return output(data_file, custom_config_file, transfer_weights_file, hyp_file, names_file)


prepare_labelstudio_data_for_yolo_op = components.create_component_from_func(
    prepare_labelstudio_data_for_yolo,
    base_image='python:3.8',
    packages_to_install=['google-cloud-storage', 'numpy', 'chevron'],
    output_component_file='prepare_labelstudio_data_for_yolo.yaml')

In [4]:
def train_op(model_name: str,
             workspace: InputPath(str),
             weights: str,
             data: str,
             config: str,
             hyp: str,
             batch_size: int,
             img_size_train: int,
             img_size_test: int,
             epochs: int):
    return dsl.ContainerOp(
        name='Train YOLOv7',
        image='gcr.io/teknoir/yolov7-training:main-amd64',  # nvidia pytorch base image is VERY large
        command=['sh', '-c'],
        arguments=[f'''mkdir -p /workspace && tar -xzvf /workspace.tgz -C / && 
            echo "WEIGHTS: {weights}" &&
            echo "DATA: {data}" &&
            echo "CONFIG: {config}" &&
            echo "HYP: {hyp}" &&
            python3 train.py --workers 0 --device 0 --batch-size {batch_size} --data {data} --img {img_size_train} {img_size_test} --cfg {config} --name {model_name} --weights {weights} --hyp {hyp} --epochs {epochs} --project=/workspace --name=model --exist-ok'''],
        container_kwargs={'env': [V1EnvVar('MODEL_NAME', model_name)]},
        artifact_argument_paths=[workspace],
        file_outputs={
            'best_model': '/workspace/model/weights/best.pt',
            'last_model': '/workspace/model/weights/last.pt',
            'names': '/workspace/dataset/object.names',
            'data': '/workspace/config/data.yaml',
            'config': '/workspace/config/config.yaml',
            'hyp': '/workspace/config/hyp.yaml',
            'F1_curve_image': '/workspace/model/F1_curve.png',
            'P_curve_image': '/workspace/model/P_curve.png',
            'PR_curve_image': '/workspace/model/PR_curve.png',
            'R_curve_image': '/workspace/model/R_curve.png',
            'results_txt': '/workspace/model/results.txt',
            'results_image': '/workspace/model/results.png',
            'confusion_matrix_image': '/workspace/model/confusion_matrix.png',
        }
    )

In [19]:
# @component(
#     packages_to_install=['google-cloud-storage', 'six']
# )
def save_yolo_model(model_name: str,
                    namespace: str,
                    domain: str,
                    description: str,
                    best_model: InputPath(str),
                    last_model: InputPath(str),
                    names: InputPath(str),
                    data: InputPath(str),
                    config: InputPath(str),
                    hyp: InputPath(str),
                    f1_curve: InputPath(str),
                    p_curve: InputPath(str),
                    pr_curve: InputPath(str),
                    r_curve: InputPath(str),
                    results_txt: InputPath(str),
                    results_image: InputPath(str),
                    confusion_matrix_image: InputPath(str)):
    # -> NamedTuple('YOLOModel', [('outputs', dict)]):
    from google.cloud import storage
    import os
    from collections import namedtuple
    # from pathlib import Path

    def upload_blob(bucket_name, source_file_name, destination_blob_name):
        """Uploads a file to the bucket."""
        storage_client = storage.Client()
        bucket = storage_client.bucket(bucket_name)
        blob = bucket.blob(destination_blob_name)
        blob.upload_from_filename(source_file_name)
        print("File {} uploaded to {}.".format(source_file_name, destination_blob_name))

    bucket_name = f"{namespace}.{domain}"
    destination_prefix = os.path.join('models', model_name)

    # 'yolov7-tiny.weights', 'yolov7-tiny.cfg', 'object.names', 'confusion_matrix.png',
    # 'F1_curve.png', 'P_curve.png', 'PR_curve.png', 'R_curve.png', 'results.png', 
    # 'hyp.yaml', 'opt.yaml', ...

    #Upload best weights file (runs/train/<model name>/weights/best.pt)
    upload_blob(bucket_name, best_model, os.path.join(destination_prefix, "best_weights.pt"))

    #Upload last weights file as well
    upload_blob(bucket_name, last_model, os.path.join(destination_prefix, "last_weights.pt"))

    #Upload object names - eventually deprecate in favor of data.yaml
    upload_blob(bucket_name, names, os.path.join(destination_prefix, "object.names"))

    #Upload data.yaml
    upload_blob(bucket_name, data, os.path.join(destination_prefix, "training_data.yaml"))

    #Upload config.yaml
    upload_blob(bucket_name, config, os.path.join(destination_prefix, "training_config.yaml"))

    #Upload hyp.yaml
    upload_blob(bucket_name, hyp, os.path.join(destination_prefix, "training_hyp.yaml"))
    
    #Upload
    upload_blob(bucket_name, f1_curve, os.path.join(destination_prefix, "F1_curve.png"))
    upload_blob(bucket_name, p_curve, os.path.join(destination_prefix, "P_curve.png"))
    upload_blob(bucket_name, pr_curve, os.path.join(destination_prefix, "PR_curve.png"))
    upload_blob(bucket_name, r_curve, os.path.join(destination_prefix, "R_curve.png"))
    upload_blob(bucket_name, results_txt, os.path.join(destination_prefix, "results.txt"))
    upload_blob(bucket_name, results_image, os.path.join(destination_prefix, "results.png"))
    upload_blob(bucket_name, confusion_matrix_image, os.path.join(destination_prefix, "confusion_matrix.png"))

    catalog_info_t = f'''
---
apiVersion: backstage.io/v1alpha1
kind: Component
metadata:
  title: {model_name}
  name: {model_name}
  namespace: {namespace}
  description: {description}
  annotations:
    'teknoir.org/model-registry/f1_curve': {os.path.join(destination_prefix, "F1_curve.png")}
    'teknoir.org/model-registry/p_curve': {os.path.join(destination_prefix, "P_curve.png")}
    'teknoir.org/model-registry/pr_curve': {os.path.join(destination_prefix, "PR_curve.png")}
    'teknoir.org/model-registry/r_curve': {os.path.join(destination_prefix, "R_curve.png")}
    'teknoir.org/model-registry/results_txt': {os.path.join(destination_prefix, "results.txt")}
    'teknoir.org/model-registry/results': {os.path.join(destination_prefix, "results.png")}
    'teknoir.org/model-registry/confusion_matrix': {os.path.join(destination_prefix, "confusion_matrix.png")}
spec:
  type: model
  owner: group:{namespace}/{namespace}
  lifecycle: experimental
  system: system:{namespace}/{namespace}
'''
    with open("catalog-info.yaml", 'w') as f:
        f.write(catalog_info_t)
    upload_blob(bucket_name, "catalog-info.yaml", os.path.join(destination_prefix, "catalog-info.yaml"))


save_yolo_model_op = components.create_component_from_func(
    save_yolo_model,
    base_image='python:3.8',
    packages_to_install=['google-cloud-storage', 'six'],
    output_component_file='save_yolo_model.yaml')

In [20]:
# import os
# import kubernetes as k8s
from kubernetes.client.models import V1EnvVar, V1ContainerPort

pipeline_name = 'Train YOLOv7 with Labelstudio and 1 T4 GPU'
pipeline_description = 'A pipeline to train YOLOv7 on a custom data set from Teknoir Labelstudio. Transfer learn from "transfer_weights."'


@dsl.pipeline(name=pipeline_name, description=pipeline_description)
def train_yolov7(model_name: str,
                 labelstudio_project: str,
                 labels: list,
                 namespace: str,
                 domain: str,
                 config_template_url: str = 'gs://teknoir-ai.teknoir.cloud/yolov7/cfg/yolov7-tiny-config-template.yaml',
                 transfer_weights_url: str = 'gs://teknoir-ai.teknoir.cloud/yolov7/weights/yolov7_training.pt',
                 hyp_file_url: str = 'gs://teknoir-ai.teknoir.cloud/yolov7/hyp/hyp.scratch.tiny.yaml',
                 train_frac: float = 0.7,
                 # fraction of samples to use for training (test_frac = 1 - train_frac - validate_frac)
                 validate_frac: float = 0.2,
                 # fraction of samples to use for validation (test_frac = 1 - train_frac - validate_frac)
                 batch_size: int = 16,
                 img_size_train: int = 416,
                 img_size_test: int = 416,
                 epochs: int = 300,
                 ):
    """Train YOLOv7 with Labelstudio"""

    gpu_instance_type = "nvidia-tesla-t4"  # https://cloud.google.com/compute/docs/gpus

    prepare = prepare_labelstudio_data_for_yolo_op(
        project=labelstudio_project,
        labels=labels,
        namespace=namespace,
        domain=domain,
        config_template_url=config_template_url,
        transfer_weights_url=transfer_weights_url,
        hyp_file_url=hyp_file_url,
        train_frac=train_frac,
        validate_frac=validate_frac
    )
    prepare.execution_options.caching_strategy.max_cache_staleness = "P0D"


    train = train_op(
        workspace=kfp.dsl.InputArgumentPath(argument=prepare.outputs['workspace'], path='/workspace.tgz'),
        model_name=model_name,
        weights=prepare.outputs['weights_file'],
        data=prepare.outputs['data_file'],
        config=prepare.outputs['config_file'],
        hyp=prepare.outputs['hyp_file'],
        batch_size=batch_size,
        img_size_train=img_size_train,
        img_size_test=img_size_test,
        epochs=epochs)
    train.after(prepare) \
        .add_port(V1ContainerPort(container_port=8099)) \
        .add_port(V1ContainerPort(container_port=8079)) \
        .set_gpu_limit(1) \
        .add_node_selector_constraint('cloud.google.com/gke-accelerator', gpu_instance_type) \
        .execution_options.caching_strategy.max_cache_staleness = "P0D"


    save_model = save_yolo_model_op(
        model_name=model_name,
        namespace=namespace,
        domain=domain,
        description=pipeline_description,
        best_model=train.outputs['best_model'],
        last_model=train.outputs['last_model'],
        names=train.outputs['names'],
        data=train.outputs['data'],
        config=train.outputs['config'],
        hyp=train.outputs['hyp'],
        f1_curve=train.outputs['F1_curve_image'],
        p_curve=train.outputs['P_curve_image'],
        pr_curve=train.outputs['PR_curve_image'],
        r_curve=train.outputs['R_curve_image'],
        results_txt=train.outputs['results_txt'],
        results_image=train.outputs['results_image'],
        confusion_matrix_image=train.outputs['confusion_matrix_image']
    ).after(train)
    save_model.execution_options.caching_strategy.max_cache_staleness = "P0D"

In [21]:
client = kfp.Client(namespace='teknoir')
# 5h timeout
pipeline_conf = kfp.dsl.PipelineConf().set_timeout(3600 * 5).set_image_pull_policy(policy="Always")
workflow = kfp.compiler.Compiler().compile(pipeline_func=train_yolov7,
                                           package_path="train_yolov7.yaml",
                                           pipeline_conf=pipeline_conf)

ERROR:root:Failed to read a token from file '/var/run/secrets/kubeflow/pipelines/token' ([Errno 2] No such file or directory: '/var/run/secrets/kubeflow/pipelines/token').


In [22]:
import uuid
import json

pipeline_version_file = pipeline_file = 'train_yolov7.yaml'

client = kfp.Client(namespace='teknoir')
# 5h timeout
pipeline_conf = kfp.dsl.PipelineConf().set_timeout(3600 * 5).set_image_pull_policy(policy="Always")
workflow = kfp.compiler.Compiler().compile(pipeline_func=train_yolov7,
                                           package_path=pipeline_file,
                                           pipeline_conf=pipeline_conf)

filter = json.dumps({'predicates': [{'key': 'name',
                                     'op': 1,
                                     'string_value': pipeline_name}]})
pipelines = client.pipelines.list_pipelines(filter=filter)

if not pipelines.pipelines:
    pipeline = client.pipeline_uploads.upload_pipeline(pipeline_file,
                                                       name=pipeline_name,
                                                       description=pipeline_description)
else:
    pipeline_version_name = pipeline_name + f' - {str(uuid.uuid4())[:6]}'
    pipeline_version = client.pipeline_uploads.upload_pipeline_version(pipeline_version_file,
                                                                       name=pipeline_version_name,
                                                                       pipelineid=pipelines.pipelines[0].id)

ERROR:root:Failed to read a token from file '/var/run/secrets/kubeflow/pipelines/token' ([Errno 2] No such file or directory: '/var/run/secrets/kubeflow/pipelines/token').
