In [25]:
import kfp
from kfp import dsl
from kfp.components import func_to_container_op, InputPath, OutputPath
from typing import NamedTuple

In [50]:
@func_to_container_op
def TigerGraph_dataloader(query: str) -> str:
    """Loads a dataset from a TigerGraph instance."""
    # Placeholder body: nothing is fetched here; the component only emits a
    # label that records which query it received, so downstream steps can
    # show the data lineage.
    template = "dataset=TG[{}]"
    return template.format(query)

In [51]:
@func_to_container_op
def DataSplitter(dataset: str) -> NamedTuple('Outputs', [
        ('train_set', str),
        ('test_set', str),
    ]):
    """Splits inputted dataset into training and testing parts."""
    # Placeholder split: both parts are labels derived from the input label.
    # They are returned positionally, matching the declared output order
    # (train_set first, then test_set).
    train_part = "training_{}".format(dataset)
    test_part = "testing_{}".format(dataset)
    return train_part, test_part

In [52]:
@func_to_container_op
def ClassicFeatures(dataset: str) -> str:
    """Generates classic features for a given dataset."""
    # Placeholder: wraps the incoming dataset label rather than computing
    # any real features.
    wrapped = "classic_features[{}]".format(dataset)
    return wrapped

In [53]:
@func_to_container_op
def GraphFeatures(dataset: str) -> str:
    """Generates graph features for a given dataset."""
    # Placeholder: wraps the incoming dataset label rather than computing
    # any real graph features.
    return "graph_features[{source}]".format(source=dataset)

In [54]:
@func_to_container_op
def Join_datasets(dataset_a: str, dataset_b: str) -> str:
    """Joins two datasets."""
    # Placeholder join: concatenates both labels with '+' so the output
    # shows which two inputs were combined, in argument order.
    joined = "{0}+{1}".format(dataset_a, dataset_b)
    return joined

In [55]:
@func_to_container_op
def Train(dataset: str) -> str:
    """Trains a model on given dataset."""
    # Placeholder: no training happens; the "model" is a label that names
    # the dataset it was built from.
    model_label = "Model[{}]".format(dataset)
    return model_label

In [56]:
@func_to_container_op
def Validate(dataset: str, model: str) -> int:
    """Validates a model on given dataset."""
    return f"Score[{dataset},{model}]"

In [57]:
@func_to_container_op
def CompareAndVisualize(score_a: int, score_b: int) -> str:
    """Compares scores and visualize result."""
    return f"Plot[{score_a},{score_b}]"

In [60]:
@dsl.pipeline(
    name='TG Demo Proposal',
    description='Shows control flow of a demo TG pipeline.'
)
def TGDemoProposal(query: str = "Cora"):
    """Wire the demo components into a baseline-vs-enriched model comparison.

    The dataset loaded for ``query`` is split into train/test parts. Model A is
    trained on classic features only. Model B is trained on classic features
    joined with graph features. Each model is validated on test features built
    the same way as its training data, and both scores go to
    CompareAndVisualize.

    Args:
        query: Query passed to TigerGraph_dataloader. Defaults to "Cora", the
            value that was previously hard-coded here.
    """
    data = TigerGraph_dataloader(query)
    splits = DataSplitter(data.output)
    train_set = splits.outputs['train_set']
    test_set = splits.outputs['test_set']

    # Classic features for both partitions.
    train_features = ClassicFeatures(train_set)
    test_features = ClassicFeatures(test_set)

    # Graph features, joined onto the classic ones to build the enriched inputs.
    train_graph_features = GraphFeatures(train_set)
    test_graph_features = GraphFeatures(test_set)
    train_enriched_features = Join_datasets(train_features.output, train_graph_features.output)
    test_enriched_features = Join_datasets(test_features.output, test_graph_features.output)

    # Model A: baseline on classic features. Model B: on enriched features.
    model_a = Train(train_features.output)
    model_b = Train(train_enriched_features.output)

    score_a = Validate(test_features.output, model_a.output)
    score_b = Validate(test_enriched_features.output, model_b.output)

    # Final step; its task handle was never used, so it is not bound to a name.
    CompareAndVisualize(score_a.output, score_b.output)

In [61]:
# Compile the TGDemoProposal pipeline function into the package file TGDemo.yaml.
kfp.compiler.Compiler().compile(
    pipeline_func=TGDemoProposal,
    package_path='TGDemo.yaml',
)