Skip to content

Commit

Permalink
Merge pull request #95 from /issues/94/xdist-fixtures
Browse files Browse the repository at this point in the history
Allow for distributed tests and reduce inference test time.

Closes #94
  • Loading branch information
tallamjr committed Jun 12, 2022
2 parents 64b5024 + c0d4817 commit e90513c
Show file tree
Hide file tree
Showing 5 changed files with 189 additions and 165 deletions.
168 changes: 61 additions & 107 deletions astronet/tests/conftest.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,18 @@
import inspect
import json
import subprocess

import numpy as np
import pandas as pd
import pytest
import tensorflow as tf
from filelock import FileLock

from astronet.constants import ASTRONET_WORKING_DIRECTORY as asnwd
from astronet.constants import LOCAL_DEBUG
from astronet.utils import astronet_logger

log = astronet_logger(__file__)

ISA = subprocess.run(
"uname -m",
Expand All @@ -20,8 +27,59 @@
BATCH_SIZE = 64


@pytest.fixture
def fixt_UGRIZY_wZ_numpy(scope="session"):
class NumpyEncoder(json.JSONEncoder):
    """JSON encoder that converts NumPy scalars and arrays to built-in types.

    Pass it as ``cls=NumpyEncoder`` to ``json.dump``/``json.dumps`` so that
    payloads containing NumPy values can be serialised.
    """

    def default(self, obj):
        """Return a JSON-serialisable stand-in for ``obj``.

        Parameters
        ----------
        obj
            The object the stock encoder could not serialise.

        Returns
        -------
        bool, int, float or list
            ``bool`` for ``np.bool_``, ``int`` for NumPy integers, ``float``
            for NumPy floats and a (nested) ``list`` for ``np.ndarray``.

        Raises
        ------
        TypeError
            Raised by the base class for any other unsupported type.
        """
        # np.bool_ is not a subclass of np.integer, so it needs its own
        # branch; without it json.dumps raises TypeError on NumPy booleans.
        if isinstance(obj, np.bool_):
            return bool(obj)
        if isinstance(obj, np.integer):
            return int(obj)
        if isinstance(obj, np.floating):
            return float(obj)
        if isinstance(obj, np.ndarray):
            return obj.tolist()
        # Defer to the base implementation, which raises TypeError.
        return super().default(obj)


def pandas_encoder(obj):
    """Serialise ``obj`` to a JSON string by way of a ``pandas.DataFrame``.

    Logs a critical "Not Fully Implemented Yet" message on every call, then
    wraps ``obj`` in a DataFrame and returns ``to_json(orient="values")``.
    """
    # TODO: Reshape required to fix ValueError: Must pass 2-d input. shape=(869864, 100, 6)
    # Refs:
    # - https://stackoverflow.com/a/32034565/4521950
    # - https://stackoverflow.com/a/32838859/4521950
    # - https://stackoverflow.com/a/44752209/4521950
    # Frame 0 of the inspected stack is this function itself, so the log
    # line is prefixed with this function's own name.
    this_function = inspect.stack()[0].function
    log.critical(f"{this_function} -- Not Fully Implemented Yet")
    frame = pd.DataFrame(obj)
    return frame.to_json(orient="values")


@pytest.fixture(scope="session")
def get_fixt_UGRIZY_wZ(tmp_path_factory, worker_id, name="fixt_UGRIZY_wZ"):
    """Provide ``(X_test, y_test, Z_test)`` once per session, shared across xdist workers.

    In a non-distributed run the arrays are produced directly by
    ``fixt_UGRIZY_wZ()`` and cached by pytest's session scope. Under
    pytest-xdist the first worker to take the file lock produces the arrays
    and dumps them to ``data.json`` in the temp directory shared by all
    workers; every other worker reads them back from that file.

    Parameters
    ----------
    tmp_path_factory
        pytest's built-in temp-path factory, used to find the shared directory.
    worker_id
        pytest-xdist's worker identifier fixture.
    name
        Not used by the body; kept so the signature stays unchanged.

    Returns
    -------
    tuple
        ``(X_test, y_test, Z_test)``.
    """
    # pytest-xdist's ``worker_id`` fixture yields the string "master" (not an
    # empty value) when the run is not distributed, so a plain truthiness
    # test never takes this branch and every run pays for the JSON dump.
    if not worker_id or worker_id == "master":
        # not executing with multiple workers, just produce the data and let
        # pytest's fixture caching do its job
        return fixt_UGRIZY_wZ()

    # get the temp directory shared by all workers
    root_tmp_dir = tmp_path_factory.getbasetemp().parent

    fn = root_tmp_dir / "data.json"
    # The lock serialises workers so only one of them writes the cache file.
    with FileLock(str(fn) + ".lock"):
        if fn.is_file():
            # NOTE(review): arrays rebuilt from JSON lists get an inferred
            # dtype, which may differ from the dtype stored in the .npy
            # files -- confirm this does not matter to the models.
            data = json.loads(fn.read_text())
            X_test = np.asarray(data["X_test"])
            y_test = np.asarray(data["y_test"])
            Z_test = np.asarray(data["Z_test"])
        else:
            X_test, y_test, Z_test = fixt_UGRIZY_wZ()
            fn.write_text(
                json.dumps(
                    {"X_test": X_test, "y_test": y_test, "Z_test": Z_test},
                    cls=NumpyEncoder,
                    # default=pandas_encoder,
                )
            )
    return X_test, y_test, Z_test


def fixt_UGRIZY_wZ():
"""This fixture will only be available within the scope of TestPlots"""
X_test = np.load(
f"{asnwd}/data/plasticc/test_set/infer/X_test.npy",
Expand All @@ -33,108 +91,4 @@ def fixt_UGRIZY_wZ_numpy(scope="session"):
f"{asnwd}/data/plasticc/test_set/infer/Z_test.npy",
)

inputs = [X_test, Z_test]

return X_test, y_test, Z_test, inputs


@pytest.fixture(scope="session")
def fixt_UGRIZY_wZ():
    """Load the PLAsTiCC inference arrays and wrap them as batched datasets.

    The scope is given to the ``pytest.fixture`` decorator. Written as a
    default argument of the function it is ignored by pytest, which leaves
    the fixture function-scoped and reloads the ``.npy`` files for every test.

    Returns
    -------
    tuple
        ``(test_ds, y_test_ds, test_input)``: a ``tf.data.Dataset`` of
        ``({"input_1": X, "input_2": Z}, y)`` batches, a dataset of ``y``
        batches, and the raw ``[X_test, Z_test]`` list.
    """
    X_test = np.load(
        f"{asnwd}/data/plasticc/test_set/infer/X_test.npy",
    )
    y_test = np.load(
        f"{asnwd}/data/plasticc/test_set/infer/y_test.npy",
    )
    Z_test = np.load(
        f"{asnwd}/data/plasticc/test_set/infer/Z_test.npy",
    )

    test_input = [X_test, Z_test]

    test_ds = (
        tf.data.Dataset.from_tensor_slices(
            ({"input_1": test_input[0], "input_2": test_input[1]}, y_test)
        )
        .batch(BATCH_SIZE, drop_remainder=False)
        .prefetch(tf.data.AUTOTUNE)
    )

    y_test_ds = (
        tf.data.Dataset.from_tensor_slices(y_test)
        .batch(BATCH_SIZE, drop_remainder=False)
        .prefetch(tf.data.AUTOTUNE)
    )

    if LOCAL_DEBUG is not None:
        # The datasets are already batched, so take(300) keeps 300 batches.
        print("LOCAL_DEBUG set, reducing dataset size...")
        test_ds = test_ds.take(300)
        y_test_ds = y_test_ds.take(300)

    return test_ds, y_test_ds, test_input


@pytest.fixture(scope="session")
def fixt_UGRIZY_noZ():
    """Load the PLAsTiCC inference ``X``/``y`` arrays as batched datasets.

    The scope is given to the ``pytest.fixture`` decorator. Written as a
    default argument of the function it is ignored by pytest, which leaves
    the fixture function-scoped and reloads the ``.npy`` files for every test.

    Returns
    -------
    tuple
        ``(test_ds, y_test_ds)``: a ``tf.data.Dataset`` of ``(X, y)`` batches
        and a dataset of ``y`` batches.
    """
    X_test = np.load(
        f"{asnwd}/data/plasticc/test_set/infer/X_test.npy",
    )
    y_test = np.load(
        f"{asnwd}/data/plasticc/test_set/infer/y_test.npy",
    )

    test_input = X_test

    test_ds = (
        tf.data.Dataset.from_tensor_slices((test_input, y_test))
        .batch(BATCH_SIZE, drop_remainder=False)
        .prefetch(tf.data.AUTOTUNE)
    )

    y_test_ds = (
        tf.data.Dataset.from_tensor_slices(y_test)
        .batch(BATCH_SIZE, drop_remainder=False)
        .prefetch(tf.data.AUTOTUNE)
    )

    if LOCAL_DEBUG is not None:
        # The datasets are already batched, so take(300) keeps 300 batches.
        print("LOCAL_DEBUG set, reducing dataset size...")
        test_ds = test_ds.take(300)
        y_test_ds = y_test_ds.take(300)

    return test_ds, y_test_ds


@pytest.fixture(scope="session")
def fixt_GR_noZ():
    """Load the PLAsTiCC inference arrays, keep two channels, and batch them.

    The scope is given to the ``pytest.fixture`` decorator. Written as a
    default argument of the function it is ignored by pytest, which leaves
    the fixture function-scoped and reloads the ``.npy`` files for every test.

    Returns
    -------
    tuple
        ``(test_ds, y_test_ds)``: a ``tf.data.Dataset`` of ``(X, y)`` batches
        and a dataset of ``y`` batches.
    """
    X_test = np.load(
        f"{asnwd}/data/plasticc/test_set/infer/X_test.npy",
    )
    y_test = np.load(
        f"{asnwd}/data/plasticc/test_set/infer/y_test.npy",
    )

    # Keep indices 0 and 2 of the last axis.
    # NOTE(review): presumably these are the g and r passbands -- confirm.
    X_test = X_test[:, :, 0:3:2]
    test_input = X_test

    test_ds = (
        tf.data.Dataset.from_tensor_slices((test_input, y_test))
        .batch(BATCH_SIZE, drop_remainder=False)
        .prefetch(tf.data.AUTOTUNE)
    )

    y_test_ds = (
        tf.data.Dataset.from_tensor_slices(y_test)
        .batch(BATCH_SIZE, drop_remainder=False)
        .prefetch(tf.data.AUTOTUNE)
    )

    if LOCAL_DEBUG is not None:
        # The datasets are already batched, so take(300) keeps 300 batches.
        print("LOCAL_DEBUG set, reducing dataset size...")
        test_ds = test_ds.take(300)
        y_test_ds = y_test_ds.take(300)

    return test_ds, y_test_ds
return X_test, y_test, Z_test
143 changes: 105 additions & 38 deletions astronet/tests/reg/test_inference.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@
from tensorflow import keras

from astronet.constants import ASTRONET_WORKING_DIRECTORY as asnwd
from astronet.constants import LOCAL_DEBUG
from astronet.metrics import WeightedLogLoss
from astronet.tests.conftest import BATCH_SIZE
from astronet.tinho.lite import LiteModel
from astronet.utils import astronet_logger

Expand Down Expand Up @@ -50,13 +52,41 @@ class TestInference:
),
)
def test_inference_UGRIZY_wZ(
self, architecture, dataset, model_name, fixt_UGRIZY_wZ
self, architecture, dataset, model_name, get_fixt_UGRIZY_wZ
):

# Previous models were trained using numpy data as the inputs, newer models leverage
# tf.data.Dataset instead for faster inference. This is a legacy requirement.
# Fix ValueError of shape mismatch.
test_ds, y_test_ds, test_inputs = fixt_UGRIZY_wZ
X_test, y_test, Z_test = get_fixt_UGRIZY_wZ

test_input = [X_test, Z_test]

test_ds = (
tf.data.Dataset.from_tensor_slices(
({"input_1": test_input[0], "input_2": test_input[1]}, y_test)
)
.batch(BATCH_SIZE, drop_remainder=False)
.prefetch(tf.data.AUTOTUNE)
)

y_test_ds = (
tf.data.Dataset.from_tensor_slices(y_test)
.batch(BATCH_SIZE, drop_remainder=False)
.prefetch(tf.data.AUTOTUNE)
)

if LOCAL_DEBUG is not None:
log.info("LOCAL_DEBUG set, reducing dataset size...")
test_ds = test_ds.take(300)
y_test_ds = y_test_ds.take(300)

worker_id = (
os.environ.get("PYTEST_XDIST_WORKER")
if "PYTEST_CURRENT_TEST" in os.environ
else 0
)
log.info(f"Data loaded successfully on worker: {worker_id}")

model = keras.models.load_model(
f"{asnwd}/astronet/{architecture}/models/{dataset}/model-{model_name}",
Expand All @@ -65,7 +95,7 @@ def test_inference_UGRIZY_wZ(
)

wloss = WeightedLogLoss()
y_preds = model.predict(test_inputs)
y_preds = model.predict(test_input)

y_test = np.concatenate([y for y in y_test_ds], axis=0)

Expand All @@ -84,10 +114,30 @@ def test_inference_UGRIZY_wZ(
),
)
def test_inference_UGRIZY_noZ(
self, architecture, dataset, model_name, fixt_UGRIZY_noZ
self, architecture, dataset, model_name, get_fixt_UGRIZY_wZ
):

test_ds, y_test_ds = fixt_UGRIZY_noZ
X_test, y_test, _ = get_fixt_UGRIZY_wZ

test_input = X_test

test_ds = (
tf.data.Dataset.from_tensor_slices((test_input, y_test))
.batch(BATCH_SIZE, drop_remainder=False)
.prefetch(tf.data.AUTOTUNE)
)

y_test_ds = (
tf.data.Dataset.from_tensor_slices(y_test)
.batch(BATCH_SIZE, drop_remainder=False)
.prefetch(tf.data.AUTOTUNE)
)

if LOCAL_DEBUG is not None:
print("LOCAL_DEBUG set, reducing dataset size...")
test_ds = test_ds.take(300)
y_test_ds = y_test_ds.take(300)

y_test = np.concatenate([y for y in y_test_ds], axis=0)
x_test = np.concatenate([x for x, y in test_ds], axis=0)

Expand Down Expand Up @@ -125,9 +175,32 @@ def test_inference_UGRIZY_noZ(
),
),
)
def test_inference_GR_noZ(self, architecture, dataset, model_name, fixt_GR_noZ):
def test_inference_GR_noZ(
self, architecture, dataset, model_name, get_fixt_UGRIZY_wZ
):

X_test, y_test, _ = get_fixt_UGRIZY_wZ
X_test = X_test[:, :, 0:3:2]

test_input = X_test

test_ds = (
tf.data.Dataset.from_tensor_slices((test_input, y_test))
.batch(BATCH_SIZE, drop_remainder=False)
.prefetch(tf.data.AUTOTUNE)
)

y_test_ds = (
tf.data.Dataset.from_tensor_slices(y_test)
.batch(BATCH_SIZE, drop_remainder=False)
.prefetch(tf.data.AUTOTUNE)
)

if LOCAL_DEBUG is not None:
print("LOCAL_DEBUG set, reducing dataset size...")
test_ds = test_ds.take(300)
y_test_ds = y_test_ds.take(300)

test_ds, y_test_ds = fixt_GR_noZ
y_test = np.concatenate([y for y in y_test_ds], axis=0)

model = keras.models.load_model(
Expand All @@ -152,20 +225,41 @@ def test_inference_GR_noZ(self, architecture, dataset, model_name, fixt_GR_noZ):
(
"tinho",
"plasticc",
"model-GR-28341-1654269564-0.5.1.dev73+g70f85f8-LL0.836.tflite",
"model-GR-noZ-28341-1654269564-0.5.1.dev73+g70f85f8-LL0.836.tflite",
),
(
"tinho-quantized",
"plasticc",
"quantized-model-GR-28341-1654269564-0.5.1.dev73+g70f85f8-LL0.836.tflite",
"quantized-model-GR-noZ-28341-1654269564-0.5.1.dev73+g70f85f8-LL0.836.tflite",
),
),
)
def test_inference_GR_noZ_TFLITE(
self, architecture, dataset, model_name, fixt_GR_noZ
self, architecture, dataset, model_name, get_fixt_UGRIZY_wZ
):

test_ds, y_test_ds = fixt_GR_noZ
X_test, y_test, _ = get_fixt_UGRIZY_wZ
X_test = X_test[:, :, 0:3:2]

test_input = X_test

test_ds = (
tf.data.Dataset.from_tensor_slices((test_input, y_test))
.batch(BATCH_SIZE, drop_remainder=False)
.prefetch(tf.data.AUTOTUNE)
)

y_test_ds = (
tf.data.Dataset.from_tensor_slices(y_test)
.batch(BATCH_SIZE, drop_remainder=False)
.prefetch(tf.data.AUTOTUNE)
)

if LOCAL_DEBUG is not None:
print("LOCAL_DEBUG set, reducing dataset size...")
test_ds = test_ds.take(300)
y_test_ds = y_test_ds.take(300)

y_test = np.concatenate([y for y in y_test_ds], axis=0)
x_test = np.concatenate([x for x, y in test_ds], axis=0)

Expand All @@ -183,30 +277,3 @@ def test_inference_GR_noZ_TFLITE(
loss = wloss(y_test, y_preds).numpy()
log.info(f"LOSS tinho-quantized: {loss:.3f}")
assert loss == pytest.approx(0.834, 0.001)

# @pytest.mark.parametrize(
# ("architecture", "dataset", "model_name"),
# (
# (
# "tinho",
# "plasticc",
# "UGRIZY-31367-1654360237-0.5.1.dev78+g702e399.d20220604-LL0.450",
# ),
# ),
# )
# def test_inference_with_z_tfdata(self, architecture, dataset, model_name, fixt):

# test_ds, y_test_ds = fixt
# y_test = np.concatenate([y for y in y_test_ds], axis=0)

# model = keras.models.load_model(
# f"{asnwd}/astronet/{architecture}/models/{dataset}/model-{model_name}",
# custom_objects={"WeightedLogLoss": WeightedLogLoss()},
# compile=False,
# )

# wloss = WeightedLogLoss()
# y_preds = model.predict(test_ds)

# if architecture == "tinho":
# assert wloss(y_test, y_preds).numpy() == pytest.approx(0.450, 0.01)
Loading

0 comments on commit e90513c

Please sign in to comment.