In [8]:
from src.dl_data_pipeline.validator import TypeValidator, ShapeValidator, MinMaxValidator, MeanVarValidator
from src.dl_data_pipeline.pipeline.pipe_node import PipeNode
from src.dl_data_pipeline.pipeline.data_pipeline import Pipeline
from src.dl_data_pipeline import deferred_execution
from src.dl_data_pipeline.process_functions import (any_process, process_2d)
import numpy as np

@deferred_execution
def mul(a, b):
    return a*b

input1 = PipeNode(name = "Input1")
input2 = PipeNode(name = "Input2")
x = any_process.rescale(input1, -1, 1)
x = process_2d.resize_with_max_distortion(x, (256, 256), 0)
x = process_2d.padding_2d(x, (256, 256), 1)
x = process_2d.image_hwc_to_chw(x)
out1 = mul(x, input2)

pipe = Pipeline([input1, input2], out1)
pipe.add_validator(TypeValidator(np.ndarray), 0)
pipe.add_validator(ShapeValidator((3, 256, 256)), 0)

pipe(np.random.rand(255, 253,3), 5)



array([[[ 5.        , -2.61292827, -4.42140078, ...,  4.48656157,
          5.        ,  5.        ],
        [ 5.        , -2.54745835,  4.10549163, ...,  2.77437772,
          5.        ,  5.        ],
        [ 5.        ,  2.15823947, -0.40813999, ...,  3.02380534,
          5.        ,  5.        ],
        ...,
        [ 5.        ,  2.08573226,  2.25624513, ..., -0.2017486 ,
          5.        ,  5.        ],
        [ 5.        , -1.10069021,  2.18526702, ..., -1.12005752,
          5.        ,  5.        ],
        [ 5.        ,  5.        ,  5.        , ...,  5.        ,
          5.        ,  5.        ]],

       [[ 5.        , -0.06202532, -4.49431224, ...,  4.11166736,
          5.        ,  5.        ],
        [ 5.        , -3.02523814, -2.02113289, ...,  3.81246693,
          5.        ,  5.        ],
        [ 5.        ,  2.64062473,  0.63177532, ..., -4.09521589,
          5.        ,  5.        ],
        ...,
        [ 5.        , -0.26310522, -4.60301597, ...,  

In [22]:
import subprocess
import os

MIN_COVER = 75
log_path = os.path.normpath("test/test.log")
if os.path.exists(log_path):
    os.remove(log_path)
result = subprocess.run(
    ["python3", "-m", "pytest", "--cov=src", "--cov-report=html:test/htmlcov", f"--cov-fail-under={MIN_COVER}", "test/"],
    stdout=open(log_path, "w"),stderr=subprocess.STDOUT)
exc = result.returncode
if exc != 0:
    print(f"Test fails. Exit code {exc}")
    print(f"Check log file : {log_path} for more info")
else:
    if os.name == 'nt':
        os.startfile(os.path.normpath("test/htmlcov/index.html"))
    elif os.name == 'posix':
        subprocess.run(["open", "test/htmlcov/index.html"])

Test fails. Exit code 1
Check log file : test/test.log for more info


In [19]:
from src.dl_data_pipeline import deferred
from src.dl_data_pipeline import Pipeline, DATA_PLACEHOLDER as DP
from src.dl_data_pipeline import process_functions
import numpy as np


# deferred.use_deferred_execution()

pipe = Pipeline()
pipe.add_forward_process(process_functions.any_process.rescale, 0, 1)

pipe.forward(np.random.rand(10)*50)

array([0.9695446 , 0.03420365, 1.        , 0.90440622, 0.00927965,
       0.78291949, 0.23193117, 0.89277142, 0.        , 0.58972763])