Skip to content

Commit

Permalink
Merge 25edca5 into 2862985
Browse files Browse the repository at this point in the history
  • Loading branch information
swkeemink committed Jun 8, 2021
2 parents 2862985 + 25edca5 commit a326847
Show file tree
Hide file tree
Showing 3 changed files with 198 additions and 4 deletions.
18 changes: 16 additions & 2 deletions fissa/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -281,11 +281,16 @@ def separation_prep(self, redo=False):
fname = os.path.join(self.folder, 'preparation.npy')

# try to load data from filename
if fname is None or not os.path.isfile(fname):
redo = True
if not redo:
try:
nCell, raw, roi_polys = np.load(fname, allow_pickle=True)
print('Reloading previously prepared data...')
except BaseException:
except BaseException as err:
print("An error occurred while loading {}".format(fname))
print(err)
print("Extraction will be redone and {} overwritten".format(fname))
redo = True

if redo:
Expand Down Expand Up @@ -387,6 +392,7 @@ def separate(self, redo_prep=False, redo_sep=False):
# Do data preparation
if redo_prep or self.raw is None:
self.separation_prep(redo_prep)
if redo_prep:
redo_sep = True

# Define filename to store data in
Expand All @@ -395,11 +401,19 @@ def separate(self, redo_prep=False, redo_sep=False):
redo_sep = True
else:
fname = os.path.join(self.folder, 'separated.npy')
if fname is None or not os.path.isfile(fname):
redo_sep = True
if not redo_sep:
try:
info, mixmat, sep, result = np.load(fname, allow_pickle=True)
print('Reloading previously separated data...')
except BaseException:
except BaseException as err:
print("An error occurred while loading {}".format(fname))
print(err)
print(
"Signal separation will be redone and {} overwritten"
"".format(fname)
)
redo_sep = True

# separate data, if necessary
Expand Down
65 changes: 65 additions & 0 deletions fissa/tests/base_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,14 @@
import unittest
import os.path
from inspect import getsourcefile
import sys

import numpy as np
from numpy.testing import (assert_almost_equal,
assert_array_equal,
assert_allclose,
assert_equal)
import pytest


# Check where the test directory is located, to be used when fetching
Expand Down Expand Up @@ -44,6 +46,37 @@ def subTest(self, *args, **kwargs):
else:
yield None

@pytest.fixture(autouse=True)
def capsys(self, capsys):
self.capsys = capsys

def recapsys(self, *captures):
"""
Capture stdout and stderr, then write them back to stdout and stderr.
Capture is done using the `pytest.capsys` fixture.
Parameters
----------
*captures : pytest.CaptureResult, optional
A series of extra captures to output. For each `capture` in
`captures`, `capture.out` and `capture.err` are written to stdout
and stderr.
Returns
-------
capture : pytest.CaptureResult
`capture.out` and `capture.err` contain all the outputs to stdout
and stderr since the previous capture with `~pytest.capsys`.
"""
capture_now = self.capsys.readouterr()
for capture in captures:
sys.stdout.write(capture.out)
sys.stderr.write(capture.err)
sys.stdout.write(capture_now.out)
sys.stderr.write(capture_now.err)
return capture_now

def assert_almost_equal(self, *args, **kwargs):
return assert_almost_equal(*args, **kwargs)

Expand Down Expand Up @@ -72,3 +105,35 @@ def assert_equal_dict_of_array(self, desired, actual):
self.assertEqual(desired.keys(), actual.keys())
for k in desired.keys():
self.assertEqual(desired[k], actual[k])

def assert_starts_with(self, desired, actual):
"""
Check that a string starts with a certain substring.
Parameters
----------
desired : str
Desired initial string.
actual : str-like
Actual string or string-like object.
"""
try:
self.assertTrue(len(actual) >= len(desired))
except BaseException as err:
print("Actual string too short ({} < {} characters)".format(len(actual), len(desired)))
print("ACTUAL: {}".format(actual))
raise
try:
return assert_equal(str(actual)[:len(desired)], desired)
except BaseException as err:
msg = "ACTUAL: {}".format(actual)
if isinstance(getattr(err, "args", None), str):
err.args += "\n" + msg
elif isinstance(getattr(err, "args", None), tuple):
if len(err.args) == 1:
err.args = (err.args[0] + "\n" + msg, )
else:
err.args += (msg, )
else:
print(msg)
raise
119 changes: 117 additions & 2 deletions fissa/tests/test_core.py
Original file line number Diff line number Diff line change
Expand Up @@ -244,32 +244,147 @@ def test_prepfirst(self):

def test_redo(self):
exp = core.Experiment(self.images_dir, self.roi_zip_path, self.output_dir)
capture_pre = self.capsys.readouterr() # Clear stdout
exp.separate()
capture_post = self.recapsys(capture_pre)
self.assert_starts_with("Doing", capture_post.out)
capture_pre = self.capsys.readouterr() # Clear stdout
exp.separate(redo_prep=True, redo_sep=True)
capture_post = self.recapsys(capture_pre)
self.assert_starts_with("Doing", capture_post.out)

def test_load_cache(self):
image_path = self.images_dir
roi_path = self.roi_zip_path
exp1 = core.Experiment(image_path, roi_path, self.output_dir)
exp1.separate()
exp = core.Experiment(image_path, roi_path, self.output_dir)
# Cache should be loaded without calling separate
actual = exp.result
self.assert_equal(len(actual), 1)
self.assert_equal(len(actual[0]), 1)
self.assert_allclose(actual[0][0], self.expected_00)

def test_noredo(self):
def test_load_cache_piecemeal(self):
image_path = self.images_dir
roi_path = self.roi_zip_path
exp1 = core.Experiment(image_path, roi_path, self.output_dir)
exp1.separate()
exp = core.Experiment(image_path, roi_path, self.output_dir)
capture_pre = self.capsys.readouterr() # Clear stdout
exp.separation_prep()
capture_post = self.recapsys(capture_pre)
self.assert_starts_with("Reloading previously prepared", capture_post.out)
capture_pre = self.capsys.readouterr() # Clear stdout
exp.separate()
capture_post = self.recapsys(capture_pre)
self.assert_starts_with("Reloading previously separated", capture_post.out)
actual = exp.result
self.assert_equal(len(actual), 1)
self.assert_equal(len(actual[0]), 1)
self.assert_allclose(actual[0][0], self.expected_00)

def test_previousprep(self):
def test_load_cached_prep(self):
image_path = self.images_dir
roi_path = self.roi_zip_path
exp1 = core.Experiment(image_path, roi_path, self.output_dir)
exp1.separation_prep()
capture_pre = self.capsys.readouterr() # Clear stdout
exp = core.Experiment(image_path, roi_path, self.output_dir)
capture_post = self.recapsys(capture_pre) # Capture and then re-output
self.assert_starts_with("Reloading previously prepared", capture_post.out)
capture_pre = self.capsys.readouterr() # Clear stdout
exp.separation_prep()
capture_post = self.recapsys(capture_pre) # Capture and then re-output
self.assert_starts_with("Reloading previously prepared", capture_post.out)
capture_pre = self.capsys.readouterr() # Clear stdout
exp.separate()
capture_post = self.recapsys(capture_pre)
self.assert_starts_with("Doing signal separation", capture_post.out)
actual = exp.result
self.assert_equal(len(actual), 1)
self.assert_equal(len(actual[0]), 1)
self.assert_allclose(actual[0][0], self.expected_00)

@unittest.expectedFailure
def test_badprepcache_init1(self):
image_path = self.images_dir
roi_path = self.roi_zip_path
if not os.path.exists(self.output_dir):
os.makedirs(self.output_dir)
with open(os.path.join(self.output_dir, "preparation.npy"), "w") as f:
f.write("badfilecontents")

capture_pre = self.capsys.readouterr() # Clear stdout
exp = core.Experiment(image_path, roi_path, self.output_dir)
capture_post = self.recapsys(capture_pre) # Capture and then re-output
self.assert_starts_with("An error occurred", capture_post.out)

self.assertTrue(exp.raw is None)

@unittest.expectedFailure
def test_badprepcache_init2(self):
image_path = self.images_dir
roi_path = self.roi_zip_path
if not os.path.exists(self.output_dir):
os.makedirs(self.output_dir)
with open(os.path.join(self.output_dir, "preparation.npy"), "w") as f:
f.write("badfilecontents")

capture_pre = self.capsys.readouterr() # Clear stdout
exp = core.Experiment(image_path, roi_path, self.output_dir)
capture_post = self.recapsys(capture_pre) # Capture and then re-output
self.assert_starts_with("An error occurred", capture_post.out)

capture_pre = self.capsys.readouterr() # Clear stdout
exp.separation_prep()
capture_post = self.recapsys(capture_pre) # Capture and then re-output
self.assert_starts_with("Doing region growing", capture_post.out)

actual = exp.result
self.assert_equal(len(actual), 1)
self.assert_equal(len(actual[0]), 1)
self.assert_allclose(actual[0][0], self.expected_00)

def test_badprepcache(self):
image_path = self.images_dir
roi_path = self.roi_zip_path
if not os.path.exists(self.output_dir):
os.makedirs(self.output_dir)
exp = core.Experiment(image_path, roi_path, self.output_dir)
with open(os.path.join(self.output_dir, "preparation.npy"), "w") as f:
f.write("badfilecontents")

capture_pre = self.capsys.readouterr() # Clear stdout
exp.separation_prep()
capture_post = self.recapsys(capture_pre) # Capture and then re-output
self.assert_starts_with("An error occurred", capture_post.out)

capture_pre = self.capsys.readouterr() # Clear stdout
exp.separate()
capture_post = self.recapsys(capture_pre) # Capture and then re-output
self.assert_starts_with("Doing signal separation", capture_post.out)

actual = exp.result
self.assert_equal(len(actual), 1)
self.assert_equal(len(actual[0]), 1)
self.assert_allclose(actual[0][0], self.expected_00)

def test_badsepcache(self):
image_path = self.images_dir
roi_path = self.roi_zip_path
if not os.path.exists(self.output_dir):
os.makedirs(self.output_dir)
exp = core.Experiment(image_path, roi_path, self.output_dir)
exp.separation_prep()
with open(os.path.join(self.output_dir, "separated.npy"), "w") as f:
f.write("badfilecontents")

capture_pre = self.capsys.readouterr() # Clear stdout
exp.separate()
capture_post = self.recapsys(capture_pre) # Capture and then re-output
self.assert_starts_with("An error occurred", capture_post.out)

actual = exp.result
self.assert_equal(len(actual), 1)
self.assert_equal(len(actual[0]), 1)
Expand Down

0 comments on commit a326847

Please sign in to comment.