In [6]:
import yaml
import re
import os
from itertools import chain
from copy import deepcopy
from pathlib import Path
from dataclasses import dataclass
from miniutils.progress_bar import parallel_progbar

In [7]:
# yaml_loader.py
class PyTorchToolboxLoader(yaml.SafeLoader):
    pass

def var_constructor(loader, node):
    return Variable(name=node.value)

@dataclass
class Variable:
    name: str
        
def ref_constructor(loader, node):
    ref_node_name_and_output_name = node.value.split(".")
    assert len(ref_node_name_and_output_name) == 2
    ref_node_name, output_name = ref_node_name_and_output_name
    return Reference(ref_node_name=ref_node_name, output_name=output_name)

@dataclass
class Reference:
    ref_node_name: str
    output_name: str
        
def replace_config_variables(config, resource_key="Resources", variable_key="Variables"):
    config = deepcopy(config)
    try:
        replaced_resources = replace_variables(config[resource_key], config[variable_key])
        config[resource_key] = replaced_resources
    except KeyError:
        return config
    return config

def replace_variables(resources, variables):
    if isinstance(resources, dict):
        for name, resource in resources.items():
            resources[name] = replace_variables(resource, variables)
    elif isinstance(resources, list):
        for i, resource in enumerate(resources):
            resources[i] = replace_variables(resource, variables)
    elif isinstance(resources, Variable):
        resources = variables[resources.name]
    else:
        return resources
    return resources


# This tells the loader that when it sees "!path" it will pass the value proceeding the !path value into the path constructor
PyTorchToolboxLoader.add_constructor('!Var', var_constructor)
PyTorchToolboxLoader.add_constructor('!Ref', ref_constructor)

In [8]:
config = yaml.load(Path("yaml_loader_test_resource.yml").open("r"), Loader=PyTorchToolboxLoader)

In [9]:
replaced_config = replace_config_variables(config)

In [10]:
replaced_config

{'Variables': {'single_variable': 'hello', 'list_variable': ['foo', 'bar']},
 'Resources': {'TestSingleVariableReplacement': {'single_variable': 'hello'},
  'TestListVariableReplacement': {'list_variable': ['foo', 'bar']},
  'TestVariableInListReplacement': ['hello', ['foo', 'bar']],
  'TestVariableInListOfDictionaryReplacement': [{'dict_1': 'hello'},
   {'dict_2': ['foo', 'bar']}],
  'MockReferenceForTest': {'output': ['some_reference']},
  'TestFindReference': {'ref_var': Reference(ref_node_name='MockReferenceForTest', output_name='some_reference')},
  'TestFindReferenceInList': [Reference(ref_node_name='MockReferenceForTest', output_name='some_reference')],
  'TestFindReferenceInListOfDictionary': {'ref_var_in_list_of_dictionary': [{'key_1': Reference(ref_node_name='MockReferenceForTest', output_name='some_reference')}]}}}

In [11]:
for name, resource in config['Resources'].items():
    print(name)
    print(find_references(resource))

TestSingleVariableReplacement


NameError: name 'find_references' is not defined

#### Try on original configuration file

In [91]:
from collections import Counter
from typing import Optional, Dict, Any, Iterator, Iterable, Sequence, Union, Callable, Tuple, List, Any, Collection

import pandas as pd
import numpy as np

def listify(p=None, q=None):
    "Make `p` same length as `q`"
    if p is None:
        p = []
    elif isinstance(p, str):
        p = [p]
    elif not isinstance(p, Iterable):
        p = [p]
    n = q if type(q) == int else len(p) if q is None else len(q)
    if len(p) == 1: p = p * n
    assert len(p) == n, f'List len mismatch ({len(p)} vs {n})'
    return list(p)

def load_training_data(root_image_paths, root_label_paths, use_n_samples):
    train_df = load_training_data_df(root_image_paths, root_label_paths, use_n_samples)
    labels = train_df['Target'].values
    labels_one_hot = make_one_hot(labels, n_classes=28)
    return np.array(train_df['ImagePath'].values), np.array(labels), np.array(labels_one_hot)

def load_training_data_df(root_image_paths, root_label_paths, use_n_samples):
    labels_df = load_training_labels(root_label_paths)
    labels_df.sort_values(["Id"], ascending=[True], inplace=True)
    labels_df_sorted_by_id = labels_df

    # As some duplicate images were removed, we only use the images that have labels
    image_paths = load_training_images(root_image_paths)
    image_paths_with_labels = filter_image_paths_with_labels(image_paths, labels_df)

    # Sort by ID so that the labels and the image matches up
    train_df = labels_df_sorted_by_id
    image_paths_sorted_by_id = sorted(image_paths_with_labels, key=lambda path: path.stem)
    train_df["ImagePath"] = image_paths_sorted_by_id

    if use_n_samples:
        train_df = train_df.sample(use_n_samples)
    return train_df


def load_training_labels(training_labels_path):
    labels_df = pd.read_csv(training_labels_path)
    labels_df['Target'] = [[int(i) for i in s.split()] for s in labels_df['Target']]
    labels_df['TargetTuple'] = [tuple(t) for t in labels_df['Target']]
    return labels_df

def load_training_images(training_images_path):
    image_paths = []
    for p in listify(training_images_path):
        image_paths.extend(Path(p).glob("*"))
    return image_paths

def filter_image_paths_with_labels(image_paths, labels_df):
    # We use a Counter to filter in O(n) instead of O(n^2) time
    image_id_with_labels_lookup = Counter(labels_df['Id'])
    image_paths_used_for_training = [Path(p) for p in image_paths if
                                     image_id_with_labels_lookup.get(Path(p).stem) is not None]
    return np.array(image_paths_used_for_training)

def add_number_of_labels_column(train_df):
    train_df.sort_values(["TargetTuple"], ascending=[True])
    label_counts = Counter([tuple(l) for l in train_df['Target'].values])
    train_df['Count'] = [label_counts[tuple(l)] for l in train_df['Target']]
    return train_df


def add_one_hot_labels_index_column(train_df):
    target_tuple_to_label = {v: k for k, v in enumerate(train_df['TargetTuple'].unique())}
    train_df['OneHotLabelIndex'] = train_df['TargetTuple'].map(lambda x: target_tuple_to_label[x])
    return train_df


def load_testing_data(root_image_paths, use_n_samples=None):
    X = sorted(list(Path(root_image_paths).glob("*")), key=lambda p: p.stem)
    if use_n_samples is not None:
        X = X[:use_n_samples]
    return np.array(X)


def make_one_hot(labels, n_classes=28):
    one_hots = []
    for label in labels:
        one_hot = np.zeros(n_classes)
        for label_idx in label:
            one_hot[label_idx] = 1
        one_hots.append(one_hot.astype(np.float32))
    return one_hots

def calculate_mean_and_std_for_dataset(some_var, data_paths, shared_state):
    print(some_var)
    print(shared_state)
    flattened_data_paths = list(chain(*data_paths))
    means, stds = list(zip(*parallel_progbar(calculate_mean_and_std, flattened_data_paths)))
    mean = np.stack(means).mean(axis=0)
    std = np.stack(stds).mean(axis=0)
    logging.info(f"Mean of dataset is: {mean}")
    logging.info(f"Standard deviation of dataset is: {std}")
    return mean, std

def calculate_mean_and_std(img_path):
    img = open_numpy(img_path, with_image_wrapper=True).tensor
    mean = np.mean(img.numpy(), axis=(1, 2))
    std = np.std(img.numpy(), axis=(1, 2))
    return mean, std


lookup = {
    "load_testing_data": load_testing_data,
    "load_training_data": load_training_data,
    "calculate_mean_and_std_for_dataset": calculate_mean_and_std_for_dataset
}

In [124]:
# pipeline.py
from functools import partial
import networkx as nx
import inspect

class Pipeline:
    def __init__(self, graph, config):
        self.graph = graph
        self.config = config
        self.shared_state = {
            "config": deepcopy(config)
        }
        
    @classmethod
    def create_graph_from_config(cls, config):
        assert config.get("Resources") is not None, "There is no Resources key in the configuration file"
        graph = nx.DiGraph()
        flattened_resources = flatten_dict(config["Resources"])
        graph = cls._add_nodes_to_graph(graph, flattened_resources)
        graph = cls._add_edges_to_graph(graph)
        return cls(graph, config)
        
    @staticmethod
    def _add_nodes_to_graph(graph, resources):
        for name, resource in resources.items():
            references = find_references(resource)
            properties = load_properties_with_default_values(resource["properties"])
            node = Node(name=name, references=references, **properties)
            graph.add_node(name, node=node)
        return graph
    
    @staticmethod
    def _add_edges_to_graph(graph):
        for name, node_wrapper in graph.nodes(data=True):
            node = node_wrapper["node"]
            for reference in node.references:
                referenced_node_name = reference.ref_node_name
                assert referenced_node_name in graph.nodes, f"The reference: {referenced_node_name} in node: {node.name} does not exist"
                graph.add_edge(referenced_node_name, name)
        return graph
    
    def run(self, to_node=None):
        for node_name in nx.algorithms.dag.topological_sort(self.graph):
            if to_node == node_name:
                break
            node = self.graph.nodes(data=True)[node_name]["node"]
            self.run_node(node)
            
    def run_node(self, node):
        replace_arguments(self.graph, self.shared_state, node)
        node.create_output()

class Node:
    def __init__(self, name, references, pointer, partial, arguments, output_names):
        self.name = name
        self.references = references
        self.pointer = pointer
        self.arguments = arguments
        self.output_names = output_names
        self.partial = partial
        self.reference_replaced_arguments = None
        self.output = None
        
    def create_output(self):
        assert self.output_names is not None, f"There are no outputs defined for node: {self.name}"
        if self.partial:
            assert len(self.output_names) == 1, "If the output of node: {self.name} is partial, then there should be one output, {len(self.output_names)} outputs are found"
            self.output = {self.output_names[0]: partial(self.pointer, **self.reference_replaced_arguments)}
        else:
            output = self.pointer(**self.reference_replaced_arguments)
            iterable_output = [output] if len(self.output_names) == 1 else output
            self.output = {output_name: output_value for output_name, output_value in zip(self.output_names, iterable_output)}

# graph_construction.py
def flatten_dict(d):
    flattened = dict()
    for resources in d.values():
        flattened = {**flattened, **resources}
    return flattened

def find_references(resource):
    references = []
    if isinstance(resource, dict):
        for _, value in resource.items():
            references.extend(find_references(value))
    elif isinstance(resource, list):
        for value in resource:
            references.extend(find_references(value))
    elif isinstance(resource, Reference):
        references.append(resource)
    else:
        pass
    return references

def replace_arguments(graph, shared_state, node):
    try:
        arguments = node.arguments
        needs_shared_state = "shared_state" in inspect.signature(node.pointer).parameters
        print(needs_shared_state)
        if arguments is not None:
            reference_replaced_arguments = replace_references(graph, deepcopy(arguments))
            if needs_shared_state:
                reference_replaced_arguments["shared_state"] = shared_state
            node.reference_replaced_arguments = reference_replaced_arguments
    except AttributeError as e:
        print(e)
        pass   
    


def replace_references(graph, arguments):
    if isinstance(arguments, dict):
        for name, argument in arguments.items():
            arguments[name] = replace_references(graph, argument)
    elif isinstance(arguments, list):
        for i, argument in enumerate(arguments):
            arguments[i] = replace_references(graph, argument)
    elif isinstance(arguments, Reference):
        # reassign name it make the intent clearer
        reference = arguments
        ref_node = graph.nodes(data=True)[reference.ref_node_name]["node"]
        ref_node_outputs = ref_node.output
        assert ref_node_outputs is not None, f"Node: {reference.ref_node_name} has no output"
        assert reference.output_name in ref_node_outputs, f"Node: {reference_node_name} has no output named {reference.output_name}"
        arguments = ref_node_outputs[reference.output_name]
    else:
        return arguments
    return arguments

def load_properties_with_default_values(properties):
    assert properties["pointer"] in lookup, f"There is no lookup called: {pointer}"
    return {
        "pointer": lookup[properties["pointer"]],
        "partial": properties.get("partial", False),
        "arguments": properties.get("arguments", {}),
        "output_names": properties.get("output_names")
    }

In [125]:
config = yaml.load(Path("densenet121_two_input_fc_with_tta_template.yml").open("r"), Loader=PyTorchToolboxLoader)
replaced_config = replace_config_variables(config)

In [126]:
p = Pipeline.create_graph_from_config(replaced_config)

In [127]:
p.run()

False
False
True


In [128]:
p.graph.nodes(data=True)["LoadTestingData"]["node"].output

{'test_X': array([PosixPath('/home/kevin/Documents/Kaggle/human-protein-image-classification/data/test_combined/00008af0-bad0-11e8-b2b8-ac1f6b6435d0.npy'),
        PosixPath('/home/kevin/Documents/Kaggle/human-protein-image-classification/data/test_combined/0000a892-bacf-11e8-b2b8-ac1f6b6435d0.npy'),
        PosixPath('/home/kevin/Documents/Kaggle/human-protein-image-classification/data/test_combined/0006faa6-bac7-11e8-b2b7-ac1f6b6435d0.npy'),
        PosixPath('/home/kevin/Documents/Kaggle/human-protein-image-classification/data/test_combined/0008baca-bad7-11e8-b2b9-ac1f6b6435d0.npy'),
        PosixPath('/home/kevin/Documents/Kaggle/human-protein-image-classification/data/test_combined/000cce7e-bad4-11e8-b2b8-ac1f6b6435d0.npy')],
       dtype=object)}

In [129]:
p.graph.nodes(data=True)["LoadTestingData"]["node"].output

{'test_X': array([PosixPath('/home/kevin/Documents/Kaggle/human-protein-image-classification/data/test_combined/00008af0-bad0-11e8-b2b8-ac1f6b6435d0.npy'),
        PosixPath('/home/kevin/Documents/Kaggle/human-protein-image-classification/data/test_combined/0000a892-bacf-11e8-b2b8-ac1f6b6435d0.npy'),
        PosixPath('/home/kevin/Documents/Kaggle/human-protein-image-classification/data/test_combined/0006faa6-bac7-11e8-b2b7-ac1f6b6435d0.npy'),
        PosixPath('/home/kevin/Documents/Kaggle/human-protein-image-classification/data/test_combined/0008baca-bad7-11e8-b2b9-ac1f6b6435d0.npy'),
        PosixPath('/home/kevin/Documents/Kaggle/human-protein-image-classification/data/test_combined/000cce7e-bad4-11e8-b2b8-ac1f6b6435d0.npy')],
       dtype=object)}

In [130]:
p.graph.nodes(data=True)["LoadTrainingData"]["node"].output

{'train_X': array([PosixPath('/home/kevin/Documents/Kaggle/human-protein-image-classification/data/train_combined_HPAv18/11938_90_D11_1.npy'),
        PosixPath('/home/kevin/Documents/Kaggle/human-protein-image-classification/data/train_combined_HPAv18/76087_1608_A3_3.npy'),
        PosixPath('/home/kevin/Documents/Kaggle/human-protein-image-classification/data/train_combined/44b5e136-bbca-11e8-b2bc-ac1f6b6435d0.npy'),
        PosixPath('/home/kevin/Documents/Kaggle/human-protein-image-classification/data/train_combined_HPAv18/62471_1282_H9_2.npy'),
        PosixPath('/home/kevin/Documents/Kaggle/human-protein-image-classification/data/train_combined_HPAv18/49781_727_B4_1.npy')],
       dtype=object),
 'train_y': array([list([25, 7, 2]), list([2]), list([25, 17]), list([0]), list([2])],
       dtype=object),
 'train_y_one_hot': array([[0., 0., 1., 0., 0., 0., 0., 1., 0., 0., 0., 0., 0., 0., 0., 0.,
         0., 0., 0., 0., 0., 0., 0., 0., 0., 1., 0., 0.],
        [0., 0., 1., 0., 0., 0

In [131]:
p.graph.nodes(data=True)["LoadTestingData"]["node"].output

{'test_X': array([PosixPath('/home/kevin/Documents/Kaggle/human-protein-image-classification/data/test_combined/00008af0-bad0-11e8-b2b8-ac1f6b6435d0.npy'),
        PosixPath('/home/kevin/Documents/Kaggle/human-protein-image-classification/data/test_combined/0000a892-bacf-11e8-b2b8-ac1f6b6435d0.npy'),
        PosixPath('/home/kevin/Documents/Kaggle/human-protein-image-classification/data/test_combined/0006faa6-bac7-11e8-b2b7-ac1f6b6435d0.npy'),
        PosixPath('/home/kevin/Documents/Kaggle/human-protein-image-classification/data/test_combined/0008baca-bad7-11e8-b2b9-ac1f6b6435d0.npy'),
        PosixPath('/home/kevin/Documents/Kaggle/human-protein-image-classification/data/test_combined/000cce7e-bad4-11e8-b2b8-ac1f6b6435d0.npy')],
       dtype=object)}

In [132]:
p.graph.nodes(data=True)["CalculateMeanAndStdForDataset"]["node"].reference_replaced_arguments

{'data_paths': [array([PosixPath('/home/kevin/Documents/Kaggle/human-protein-image-classification/data/train_combined_HPAv18/11938_90_D11_1.npy'),
         PosixPath('/home/kevin/Documents/Kaggle/human-protein-image-classification/data/train_combined_HPAv18/76087_1608_A3_3.npy'),
         PosixPath('/home/kevin/Documents/Kaggle/human-protein-image-classification/data/train_combined/44b5e136-bbca-11e8-b2bc-ac1f6b6435d0.npy'),
         PosixPath('/home/kevin/Documents/Kaggle/human-protein-image-classification/data/train_combined_HPAv18/62471_1282_H9_2.npy'),
         PosixPath('/home/kevin/Documents/Kaggle/human-protein-image-classification/data/train_combined_HPAv18/49781_727_B4_1.npy')],
        dtype=object),
  array([PosixPath('/home/kevin/Documents/Kaggle/human-protein-image-classification/data/test_combined/00008af0-bad0-11e8-b2b8-ac1f6b6435d0.npy'),
         PosixPath('/home/kevin/Documents/Kaggle/human-protein-image-classification/data/test_combined/0000a892-bacf-11e8-b2b8-ac1f6b

In [133]:
p.graph.nodes(data=True)["CalculateMeanAndStdForDataset"]["node"].output["calc_fn"]("WOW")

WOW
{'config': {'Variables': {'test_image_paths': '/home/kevin/Documents/Kaggle/human-protein-image-classification/data/test_combined', 'train_image_paths': ['/home/kevin/Documents/Kaggle/human-protein-image-classification/data/train_combined', '/home/kevin/Documents/Kaggle/human-protein-image-classification/data/train_combined_HPAv18'], 'train_label_paths': '/home/kevin/Documents/Kaggle/human-protein-image-classification/data/train_all_no_dupes.csv'}, 'Resources': {'Preprocessing': {'CalculateMeanAndStdForDataset': {'properties': {'pointer': 'calculate_mean_and_std_for_dataset', 'partial': True, 'arguments': {'data_paths': [Reference(ref_node_name='LoadTrainingData', output_name='train_X'), Reference(ref_node_name='LoadTestingData', output_name='test_X')]}, 'output_names': ['calc_fn']}}}, 'Data': {'LoadTestingData': {'properties': {'pointer': 'load_testing_data', 'arguments': {'use_n_samples': 5, 'root_image_paths': '/home/kevin/Documents/Kaggle/human-protein-image-classification/data

HBox(children=(IntProgress(value=0, max=10), HTML(value='')))




NameError: name 'open_numpy' is not defined