In [2]:
#| default_exp core

# run

> API details.

Features

* each pipeline step is a function with defined outputs
    * train, test, build_model, build_dataset
* Use hydra and pydantic for config management
* Add logging decorator

In [3]:
#|hide
from nbdev.showdoc import *

In [4]:
from fastcore.test import *

In [5]:
#|export
def foo():
    """Do nothing and return None."""
    return None

## User defined pipeline functions

The user can define any number of functions, each of which may or may not depend on the output of other functions. For example:

In [6]:
def get_dataset() -> str:
    """Print a trace line and return the placeholder string "dataset"."""
    dataset_name = "dataset"
    print("calling get_dataset")
    return dataset_name

In [7]:
def get_model() -> str:
    """Print a trace line and return the placeholder string "model"."""
    print("calling get_model")
    # Return the string promised by the `-> str` annotation instead of
    # falling off the end of the function and yielding None.
    return "model"

In [8]:
def get_model() -> str:
    """Print a trace line and return the placeholder string "model"."""
    model_name = "model"
    print("calling get_model")
    return model_name

In [9]:
def train(dataset: str, model: str) -> str:
    """Print the received arguments and return a "dataset=..., model=..." summary.

    Args:
        dataset: Value produced by the dataset step.
        model: Value produced by the model step.

    Returns:
        A string showing both arguments by name with their repr.
    """
    # Snapshot locals() before binding anything else, so the trace line
    # lists only the two parameters.
    call_info = f"calling train with: {locals()}"
    print(call_info)
    summary = f"{dataset=}, {model=}"
    return summary

In [10]:
def test(trained_model: str) -> int: 
    print(f"calling test with {locals()}")
    return 2

## Tool

The built-in `all` returns `True` for an empty list, but here we want "no keys given" to count as `False`, so we wrap it in a small helper.

In [11]:
# Built-in all() is True for an empty list (see the output below).
all([])

True

In [12]:
def all_keys_in_dict(dictionary, keys):
    """Tell whether every entry of `keys` is a key of `dictionary`.

    Unlike a bare `all(...)`, an empty `keys` yields False.

    Args:
        dictionary: Mapping whose keys are checked.
        keys: Iterable of candidate keys.

    Returns:
        True if `keys` is non-empty and each of its entries is in
        `dictionary`; False otherwise.
    """
    found = []
    for key in keys:
        found.append(key in dictionary.keys())
    # No keys to check: report False rather than the vacuous True of all([]).
    if not found:
        return False
    return all(found)

In [13]:
# Every requested key is present -> True.
test_eq(all_keys_in_dict({"A": 1, "B": 2},  ["A", "B"]), True) 
test_eq(all_keys_in_dict({"A": 1, "B": 2},  ["A"]), True) 
# At least one requested key is missing -> False.
test_eq(all_keys_in_dict({"A": 1, "B": 2},  ["A", "B", "C"]), False) 
# An empty key list is False, not the vacuous True of all([]).
test_eq(all_keys_in_dict({"A": 1, "B": 2},  []), False)
# An empty dictionary never contains the requested keys.
test_eq(all_keys_in_dict({},  ["A"]), False)
test_eq(all_keys_in_dict({},  []), False)

The pipeline is described by a dictionary whose keys are the function names and whose values are dictionaries holding the function itself (`fn`) and the names of the functions whose outputs it takes as parameters (`params`):

In [14]:
# Pipeline description: step name -> {"fn": callable, "params": names of the
# steps whose return values are passed to "fn", in order}.
fns = {
    # Steps with no parameters.
    "get_dataset": {"fn": get_dataset, "params": []},
    "get_model": {"fn": get_model, "params": []},
    # Steps that consume the results of the steps named in "params".
    "train": {"fn": train, "params": ["get_dataset", "get_model"]},
    "test": {"fn": test, "params": ["train"]},
}

In [15]:
def run_fns(fns):
    """Run every pipeline step once, passing each step its dependencies' results.

    Steps run in levels: each pass executes every pending step whose
    dependencies were all produced by earlier passes, so steps without
    parameters run in the first pass. Within a pass, steps run in the
    iteration order of `fns`.

    Args:
        fns: Mapping of step name -> {"fn": callable, "params": [step names]}.
            Each entry of "params" names another step of `fns`; that step's
            return value is passed positionally, in the listed order, to
            "fn". A missing or empty "params" means "fn" takes no arguments.
            The mapping is not modified.

    Returns:
        dict mapping each step name to the value its function returned, in
        execution order.

    Raises:
        ValueError: if some steps can never run because a dependency is not
            a step of `fns` or the dependencies form a cycle.
    """
    # Work on a shallow copy so the caller's mapping is left intact.
    pending = dict(fns)
    results = {}

    while pending:
        # Decide the whole level against the results available before the
        # level starts. all() over an empty params list is True, which is
        # what lets dependency-free steps run in the first pass.
        ready = [
            name
            for name, step in pending.items()
            if all(dep in results for dep in (step.get("params") or ()))
        ]

        if not ready:
            # Nothing can make progress: bail out instead of spinning forever.
            unresolved = {
                name: list(step.get("params") or ())
                for name, step in pending.items()
            }
            raise ValueError(
                "Cannot resolve pipeline steps "
                f"(missing or cyclic dependencies): {unresolved}"
            )

        for name in ready:
            step = pending.pop(name)
            args = [results[dep] for dep in (step.get("params") or ())]
            # Key the result by step name (not fn.__name__) so that lookups
            # through "params" always match the mapping's own keys.
            results[name] = step["fn"](*args)

    return results

In [16]:
# Run the example pipeline; the printed lines show the order in which the steps are called.
run_fns(fns)

calling get_dataset
calling get_model
calling train with: {'dataset': 'dataset', 'model': 'model'}
calling test with {'trained_model': "dataset='dataset', model='model'"}


{'get_dataset': 'dataset',
 'get_model': 'model',
 'train': "dataset='dataset', model='model'",
 'test': 2}

## Parsing a config yaml

The next step is to parse a config YAML into a dictionary similar to the `fns` above. The YAML will have a `pipeline_steps` section, as shown below:

```
pipeline_steps:
    - get_dataset
    - get_model
    - train:get_dataset:get_model
    - test:train
```

In [17]:
import yaml

In [18]:
def load_yaml(filename):
    """Parse the YAML file at `filename`, print the result and return it.

    Args:
        filename: Path of the YAML file to read.

    Returns:
        The object produced by `yaml.safe_load` for the file's content.

    Raises:
        yaml.YAMLError: if the file content is not valid YAML. The error is
            printed before being re-raised.
    """
    with open(filename, 'r') as stream:
        try:
            parsed_yaml = yaml.safe_load(stream)
        except yaml.YAMLError as exc:
            # Report and re-raise: carrying on would reach
            # `return parsed_yaml` with the name unbound and hide the real
            # cause behind an UnboundLocalError.
            print(exc)
            raise
        else:
            print(parsed_yaml)

    return parsed_yaml

In [19]:
# NOTE(review): absolute, machine-specific path; consider a path relative to the project or a configurable directory.
parsed_yaml = load_yaml("/home/IBEO.AS/jawa/projects/mlpipeline/src/config.yaml")

{'pipeline_steps': ['get_dataset', 'get_model', 'train:get_dataset:get_model', 'test:train']}


In [20]:
# The raw step specs: one "name[:dependency...]" string per pipeline step.
parsed_yaml["pipeline_steps"]

['get_dataset', 'get_model', 'train:get_dataset:get_model', 'test:train']

In [21]:
# Target structure for the YAML parsing below; same mapping as the `fns`
# defined earlier in this notebook, rebuilt here:
# step name -> {"fn": callable, "params": names of the steps it depends on}.
fns = {
    "get_dataset": {"fn": get_dataset, "params": []},
    "get_model": {"fn": get_model, "params": []},
    "train": {"fn": train, "params": ["get_dataset", "get_model"]},
    "test": {"fn": test, "params": ["train"]},
}

In [27]:
# TODO - carry on here

# Prototype: split one "name:dep1:dep2" step spec into its name and dependencies.
s = parsed_yaml["pipeline_steps"][1]
sl = s.split(":")
# Note: "fn" is still the step name (a string) here, not the callable used in `fns`.
{"fn": sl[0], "params": sl[1:]}

{'fn': 'get_model', 'params': []}