In [1]:
# import d3m utility
from d3m import utils, index
from d3m.container.dataset import D3MDatasetLoader, Dataset
from d3m.metadata.pipeline import PrimitiveStep, ArgumentType
from d3m.metadata.problem import parse_problem_description, TaskType, TaskSubtype
from d3m.metadata import base as metadata_base
from d3m.metadata.base import Metadata, ALL_ELEMENTS

# import evaluation packages
# remember to install it from https://gitlab.datadrivendiscovery.org/nist/nist_eval_output_validation_scoring
# from d3m_outputs import Predictions

# import python packages
import argparse
import json
import os
# import networkx
import csv
# from importlib import reload

import warnings
warnings.filterwarnings('ignore')


from multiprocessing import Pool

# Upper bound on the number of pipelines kept per dataset; read by top_selection().
TOP_NUM = 20

In [2]:
from dsbox.pipeline.fitted_pipeline import FittedPipeline
from dsbox.template.runtime import Runtime, add_target_columns_metadata
from dsbox.template.search import get_target_columns

In [3]:
def load_one_pipeline(path) -> str:
    '''
    Read one pipeline description file and return the pipeline's id.

    Parameters
    ----------
    path : path of a JSON file whose top-level object has an "id" key.

    Returns
    -------
    The value stored under "id", returned unchanged
    (presumably a string id -- TODO confirm against the pipeline files).

    Raises
    ------
    OSError if the file cannot be opened, json.JSONDecodeError if it is not
    valid JSON, KeyError if the document has no "id" key.
    '''
    with open(path, "r") as f:
        pipeline = json.load(f)
    # Only the id is used by the callers; the rest of the document is discarded.
    return pipeline["id"]


def load_for_dataset(path, res) -> list:
    '''
    Collect the ids of every pipeline stored for one problem run.

    Parameters
    ----------
    path : directory that contains the problem-run folders.
    res  : name of one run folder inside ``path``.

    Returns
    -------
    A list of ``(res, pipeline_id)`` tuples, one per entry found in
    ``<path>/<res>/pipelines``, in the order reported by ``os.listdir``.
    An empty list is returned (and a message printed) when that directory
    is missing, is not a directory, or contains nothing.
    '''
    res_loc = os.path.join(path, res)
    pipeline_loc = os.path.join(res_loc, "pipelines")

    # A run entry without a "pipelines" directory (e.g. a stray file or an
    # aborted run) is reported the same way as an empty one instead of
    # aborting the whole scan with an OSError from os.listdir.
    if not os.path.isdir(pipeline_loc):
        print("No fitted pipeline generated for", res_loc)
        return []

    pipeline_files = os.listdir(pipeline_loc)
    if not pipeline_files:
        print("No fitted pipeline generated for", res_loc)
        return []

    return [
        (res, load_one_pipeline(os.path.join(pipeline_loc, name)))
        for name in pipeline_files
    ]


def load_all_fitted_pipeline(pipeline_path) -> list:
    '''
    Go through all the directories produced by the problem runs.

    Parameters
    ----------
    pipeline_path : directory whose entries are the individual run folders.

    Returns
    -------
    A flat list concatenating the result of ``load_for_dataset`` for every
    entry of ``pipeline_path`` whose name does not end in ".txt".
    '''
    pip_mapper = []
    for res in os.listdir(pipeline_path):
        # Entries ending in ".txt" are not run folders; skip them.
        if res.endswith(".txt"):
            continue
        pip_mapper += load_for_dataset(pipeline_path, res)
    return pip_mapper


def top_selection(mapper, top_num=None) -> dict:
    '''
    Keep at most ``top_num`` pipelines per key and drop keys with no pipelines.

    Parameters
    ----------
    mapper  : dict mapping a key to a sequence of pipelines. It is modified
              in place.
    top_num : maximum number of leading entries kept per key; defaults to the
              module-level ``TOP_NUM`` when omitted.

    Returns
    -------
    The same ``mapper`` object, after empty entries were deleted and long
    entries were truncated to their first ``top_num`` items.
    '''
    if top_num is None:
        top_num = TOP_NUM

    # Collect the empty keys first: deleting while iterating over the dict
    # would raise a RuntimeError.
    empty_keys = [key for key, pipelines in mapper.items() if len(pipelines) == 0]
    for key in empty_keys:
        del mapper[key]

    # Iterate over a snapshot of the keys; only values are replaced here.
    for key in list(mapper):
        if len(mapper[key]) >= top_num:
            mapper[key] = mapper[key][0:top_num]
    return mapper


def create_fitted_pipelines_for_dataset(path, pipelines, log_dir) -> "typing.Iterator":
    '''
    Lazily load the fitted pipelines of one dataset.

    This is a generator: nothing is loaded until the result is iterated, and
    each pipeline is loaded only when its turn comes.

    Parameters
    ----------
    path      : run folder passed to ``FittedPipeline.load`` as ``folder_loc``.
    pipelines : iterable of indexable entries; element ``[0]`` of each entry
                is used as the pipeline id.
    log_dir   : directory passed to ``FittedPipeline.load`` as ``log_dir``.

    Yields
    ------
    Whatever ``FittedPipeline.load`` returns for each entry (the caller in
    this file unpacks it as ``(fitted_pipeline, run)`` -- TODO confirm).
    '''
    for entry in pipelines:
        yield FittedPipeline.load(folder_loc=path, pipeline_id=entry[0], log_dir=log_dir)


def load_test_dataset_for_pipeline(config_path) -> tuple:
    '''
    Build the test dataset and problem named by ``test_config.json``.

    ``config_path`` is the folder holding ``test_config.json``; its
    "dataset_schema" and "problem_schema" entries locate the dataset and the
    problem document. Returns ``(dataset, problem)`` where the dataset has
    been passed through ``add_target_columns_metadata``.
    '''
    with open(os.path.join(config_path, "test_config.json"), "r") as config_file:
        config = json.load(config_file)
    dataset_uri = config["dataset_schema"]
    problem_file = config["problem_schema"]

    loader = D3MDatasetLoader()
    # Plain filesystem paths are turned into absolute file:// URIs.
    if "file:" not in dataset_uri:
        dataset_uri = 'file://{dataset_path}'.format(dataset_path=os.path.abspath(dataset_uri))

    with open(problem_file) as problem_handle:
        problem = Metadata(json.load(problem_handle))

    loaded = loader.load(dataset_uri=dataset_uri)
    return add_target_columns_metadata(loaded, problem), problem


def predict_and_write(pipeline, test_dataset, test_problem, saving_path):
    '''
    Run ``produce`` for a fitted pipeline on the test dataset and return the
    predictions.

    Despite the name, nothing is written to disk: ``saving_path`` is accepted
    for backward compatibility but is not used.

    Parameters
    ----------
    pipeline     : fitted pipeline exposing ``produce`` and
                   ``runtime.produce_outputs``.
    test_dataset : dataset whose metadata and target columns are read.
    test_problem : problem metadata; the resource id of its first target of
                   its first data input selects the table that is inspected.
    saving_path  : unused.

    Returns
    -------
    The last produce output reduced to two columns: "d3mIndex" and the
    prediction column renamed to the target column's name.

    Raises
    ------
    ValueError if no column of the target resource carries the TrueTarget
    semantic type. This is checked before the pipeline is run.
    '''
    true_target = "https://metadata.datadrivendiscovery.org/types/TrueTarget"

    resID = test_problem.query(())["inputs"]["data"][0]["targets"][0]["resID"]
    test_length = test_dataset.metadata.query((resID, ALL_ELEMENTS))["dimension"]["length"]

    target_col_name = None
    for v in range(0, test_length):
        column_metadata = test_dataset.metadata.query((resID, ALL_ELEMENTS, v))
        if true_target in column_metadata["semantic_types"]:
            # When several columns are marked, the last one wins (unchanged
            # from the previous behaviour).
            target_col_name = column_metadata["name"]

    # Fail early with a clear message instead of an UnboundLocalError after
    # the (potentially expensive) produce call.
    if target_col_name is None:
        raise ValueError(
            "No column with semantic type {} found in resource {!r}".format(true_target, resID)
        )

    pipeline.produce(inputs=[test_dataset])
    prediction = pipeline.runtime.produce_outputs[-1]
    d3m_index = get_target_columns(test_dataset, test_problem)["d3mIndex"]
    d3m_index = d3m_index.reset_index().drop(columns=["index"])
    # The last column of the produce output is taken as the prediction.
    prediction_col_name = prediction.columns[-1]
    prediction["d3mIndex"] = d3m_index
    prediction = prediction[["d3mIndex", prediction_col_name]]
    prediction = prediction.rename(columns={prediction_col_name: target_col_name})
    return prediction


def score_prediction(prediction_file, ground_truth_dir) -> dict:
    '''
    Placeholder for NIST-based scoring of a prediction file.

    Not implemented yet: both arguments are ignored and a new empty dict is
    returned on every call.
    '''
    return {}

In [4]:
# Root folder of the problem-run outputs; scanned below by load_all_fitted_pipeline.
# NOTE(review): absolute, machine-specific path -- adjust before running elsewhere.
runs_loc = "/nfs1/dsbox-repo/runs/wade-run/ll0"
# Root folder of the per-dataset config folders; each one is read for its test_config.json.
configs_loc = "/nfs1/dsbox-repo/muxin/all_confs/ll0"

In [5]:
%%time
# Flat list of (run folder name, pipeline id) pairs for every run found under runs_loc.
all_pipelines = load_all_fitted_pipeline(runs_loc);
# run_pipelines = top_selection(all_pipelines)

CPU times: user 1.75 s, sys: 651 ms, total: 2.4 s
Wall time: 6.73 s


In [6]:
# Use the first (run folder name, pipeline id) pair as the example to test.
dataset_pipeline, pipe_ID = all_pipelines[0]
dataset_pipeline

'LL0_285_flags'

In [7]:
# Run folder of the selected dataset, plus its test dataset and problem loaded
# from the config folder of the same name.
folder_path = os.path.join(runs_loc, dataset_pipeline)
dataset, problem = load_test_dataset_for_pipeline(os.path.join(configs_loc, dataset_pipeline))

In [8]:
# Show the loaded dataset as the cell output.
dataset

Dataset(id='LL0_285_flags_dataset', name='flags', location_uris='('file:///nfs1/dsbox-repo/data/datasets-v31/training_datasets/LL0/LL0_285_flags/LL0_285_flags_dataset/datasetDoc.json',)')

In [9]:
# Load the selected fitted pipeline from its run folder, logging under <run folder>/logs.
# NOTE(review): the saved output of this cell ends in InvalidArgumentValueError, so
# fitted_pipe was not assigned in that run -- confirm the pipeline loads before relying on it.
fitted_pipe = FittedPipeline.load(folder_loc=folder_path, pipeline_id=pipe_ID, log_dir= os.path.join(folder_path, "logs"))
# fitted_pipelines = create_fitted_pipelines_for_dataset(folder_path, run_pipelines[dataset_pipeline], os.path.join(folder_path, "logs"))

Using TensorFlow backend.
Could not load the primitive: d3m.primitives.data.AudioReader
Traceback (most recent call last):
  File "/nfs1/dsbox-repo/qasemi/d3m/d3m/index.py", line 162, in load_all
    get_primitive(primitive_path)
  File "/nfs1/dsbox-repo/qasemi/d3m/d3m/index.py", line 103, in get_primitive
    return getattr(module, name)
  File "/nfs1/dsbox-repo/qasemi/d3m/d3m/namespace.py", line 107, in __getattr__
    primitive = entry_point.load(require=True)
  File "/nfs1/dsbox-repo/qasemi/miniconda/envs/d3m-devel/lib/python3.6/site-packages/pkg_resources/__init__.py", line 2318, in load
    return self.resolve()
  File "/nfs1/dsbox-repo/qasemi/miniconda/envs/d3m-devel/lib/python3.6/site-packages/pkg_resources/__init__.py", line 2324, in resolve
    module = __import__(self.module_name, fromlist=['__name__'], level=0)
  File "/nfs1/dsbox-repo/qasemi/common-primitives/common_primitives/audio_reader.py", line 8, in <module>
    import prctl  # type: ignore
ModuleNotFoundError: No mo

InvalidArgumentValueError: Not all hyper-parameters are specified: {'split_on_column_with_avg_len'}

In [10]:
# Show the loaded pipeline; raises NameError whenever the FittedPipeline.load cell failed.
fitted_pipe

NameError: name 'fitted_pipe' is not defined

In [10]:
# NOTE(review): unfinished cell. Pool.map needs a function and an iterable; called
# with no arguments it raises TypeError. TODO: pass the worker function and the
# list of work items, or delete this cell.
with Pool() as p:
    p.map()


ERROR:d3m.index:Could not load the primitive: d3m.primitives.common_primitives.BayesianLogisticRegression
Traceback (most recent call last):
  File "/nfs1/dsbox-repo/qasemi/d3m/d3m/namespace.py", line 107, in __getattr__
    primitive = entry_point.load(require=True)
  File "/nfs1/dsbox-repo/qasemi/miniconda/envs/d3m-devel/lib/python3.6/site-packages/pkg_resources/__init__.py", line 2317, in load
    self.require(*args, **kwargs)
  File "/nfs1/dsbox-repo/qasemi/miniconda/envs/d3m-devel/lib/python3.6/site-packages/pkg_resources/__init__.py", line 2340, in require
    items = working_set.resolve(reqs, env, installer, extras=self.extras)
  File "/nfs1/dsbox-repo/qasemi/miniconda/envs/d3m-devel/lib/python3.6/site-packages/pkg_resources/__init__.py", line 779, in resolve
    raise VersionConflict(dist, req).with_context(dependent_req)
pkg_resources.ContextualVersionConflict: (pycurl 7.43.0.2 (/nfs1/dsbox-repo/qasemi/miniconda/envs/d3m-devel/lib/python3.6/site-packages), Requirement.parse('p

(<dsbox.pipeline.fitted_pipeline.FittedPipeline at 0x7f0ff1b8b208>,
 <dsbox.template.runtime.Runtime at 0x7f0ff1b9aa20>)

In [3]:
def main(args):
    '''
    Run every stored fitted pipeline against its test dataset.

    Parameters
    ----------
    args : namespace with ``path`` (root folder of the problem runs) and
           ``configs`` (root folder of the per-dataset config folders).

    For each run folder that has pipelines, the test dataset and problem are
    loaded from ``<args.configs>/<run folder>``, a ``results`` folder is
    created inside the run folder, and ``predict_and_write`` is called for
    each fitted pipeline (up to the limit applied by ``top_selection``).
    '''
    all_pipelines = load_all_fitted_pipeline(args.path)

    # load_all_fitted_pipeline returns a flat list of (run folder, pipeline id)
    # pairs; group them per run folder so they can be looked up by dataset.
    # Each entry is a 1-tuple because create_fitted_pipelines_for_dataset reads
    # the pipeline id from element [0] of an entry.
    run_pipelines = {}
    for dataset_name, pipeline_id in all_pipelines:
        run_pipelines.setdefault(dataset_name, []).append((pipeline_id,))
    run_pipelines = top_selection(run_pipelines)

    for dataset_pipeline in run_pipelines.keys():
        print("Start testing", dataset_pipeline)
        folder_path = os.path.join(args.path, dataset_pipeline)
        dataset, problem = load_test_dataset_for_pipeline(os.path.join(args.configs, dataset_pipeline))
        print("Using dataset", dataset, "and problem description", problem)
        fitted_pipelines = create_fitted_pipelines_for_dataset(folder_path, run_pipelines[dataset_pipeline], os.path.join(folder_path, "logs"))
        # Pipelines are processed in the order they were listed for the dataset.
        for fitted_pipeline, run in fitted_pipelines:
            fname = fitted_pipeline.id + ".csv"
            dir_path = os.path.join(folder_path, "results")
            os.makedirs(dir_path, exist_ok=True)
            saving_path = os.path.join(dir_path, fname)
            # NOTE(review): predict_and_write currently returns the prediction
            # table and does not write saving_path; the result is not used yet.
            prediction_file = predict_and_write(fitted_pipeline, dataset, problem, saving_path)

In [None]:
if __name__ == "__main__":
    # Command-line entry point: collect the run and config locations, then test
    # every stored pipeline.
    parser = argparse.ArgumentParser(description="Test all fitted pipeline and give results")
    parser.add_argument(
        "--path",
        help="Where the pipelines stored, example: /nfs1/dsbox-repo/muxin/ta2-outputs/seed",
        default="/nfs1/dsbox-repo/muxin/ta2-outputs/seed",
    )
    parser.add_argument(
        "--configs",
        help="Where the configuration files stored, example: /nfs1/dsbox-repo/muxin/all_confs/seed",
        default="/nfs1/dsbox-repo/muxin/all_confs/seed",
    )
    parser.add_argument("--filename", help="Name of the output csv", default=-1)
    # Inside a Jupyter kernel sys.argv carries the kernel's own "-f <connection
    # file>" argument, which parse_args() would reject and exit on. Parse only
    # the options defined above and report anything else instead of aborting.
    args, unknown_args = parser.parse_known_args()
    if unknown_args:
        print("Ignoring unrecognized arguments:", unknown_args)
    main(args)