fix incorrect propagation of dtype in Series normalize and other methods #46

Merged
merged 9 commits on Nov 15, 2014
97 changes: 77 additions & 20 deletions python/test/test_images.py
@@ -1,18 +1,16 @@
from collections import Counter
import glob
import struct
import unittest
import os
from operator import mul
from numpy import allclose, arange, array, array_equal, dtype, prod, vstack, zeros
from numpy import allclose, arange, array, array_equal, dtype, prod, zeros
import itertools
from nose.tools import assert_equals, assert_true, assert_almost_equal, assert_raises

from thunder.rdds.fileio.imagesloader import ImagesLoader
from thunder.rdds.fileio.seriesloader import SeriesLoader
from thunder.rdds.images import _BlockMemoryAsReversedSequence
from test_utils import PySparkTestCase, PySparkTestCaseWithOutputDir

from test_utils import *

_have_image = False
try:
@@ -46,22 +44,6 @@ def test_castToFloat(self):
assert_equals('float16', str(castdata.dtype))
assert_equals('float16', str(castdata.first()[1].dtype))

def test_mean(self):
from numpy import mean
arys, shape, size = _generate_test_arrays(2, 'uint8')
imagedata = ImagesLoader(self.sc).fromArrays(arys)
meanval = imagedata.mean()

def elementwise_mean(arys):
# surprising that numpy doesn't have this built in?
combined = vstack([ary.ravel() for ary in arys])
meanary = mean(combined, axis=0)
return meanary.reshape(arys[0].shape)

expected = elementwise_mean(arys).astype('float16')
assert_true(allclose(expected, meanval))
assert_equals('float16', str(meanval.dtype))

def test_toSeries(self):
# create 3 arrays of 4x3x3 images (C-order), containing sequential integers
narys = 3
@@ -274,6 +256,81 @@ def test_toBlocksBySlices(self):
assert_true(array_equal(arys[i], gatheredary[i]))


class TestImagesStats(PySparkTestCase):
def test_mean(self):
from test_utils import elementwise_mean
arys, shape, size = _generate_test_arrays(2, 'uint8')
imagedata = ImagesLoader(self.sc).fromArrays(arys)
meanval = imagedata.mean()

expected = elementwise_mean(arys).astype('float16')
assert_true(allclose(expected, meanval))
assert_equals('float16', str(meanval.dtype))

def test_sum(self):
from numpy import add
arys, shape, size = _generate_test_arrays(2, 'uint8')
imagedata = ImagesLoader(self.sc).fromArrays(arys)
sumval = imagedata.sum(dtype='uint32')

arys = [ary.astype('uint32') for ary in arys]
expected = reduce(add, arys)
assert_true(array_equal(expected, sumval))
assert_equals('uint32', str(sumval.dtype))

def test_variance(self):
from test_utils import elementwise_var
arys, shape, size = _generate_test_arrays(2, 'uint8')
imagedata = ImagesLoader(self.sc).fromArrays(arys)
varval = imagedata.variance()

expected = elementwise_var([ary.astype('float16') for ary in arys])
assert_true(allclose(expected, varval))
assert_equals('float16', str(varval.dtype))

def test_stdev(self):
from test_utils import elementwise_stdev
arys, shape, size = _generate_test_arrays(2, 'uint8')
imagedata = ImagesLoader(self.sc).fromArrays(arys)
stdval = imagedata.stdev()

expected = elementwise_stdev([ary.astype('float16') for ary in arys])
assert_true(allclose(expected, stdval))
#assert_equals('float16', str(stdval.dtype))
# it isn't clear to me why this comes out as float32 and not float16, especially
# given that var returns float16, as expected. But I'm not too concerned about it.
# Consider this documentation of current behavior rather than a description of
# desired behavior.
assert_equals('float32', str(stdval.dtype))

def test_stats(self):
from test_utils import elementwise_mean, elementwise_var
arys, shape, size = _generate_test_arrays(2, 'uint8')
imagedata = ImagesLoader(self.sc).fromArrays(arys)
statsval = imagedata.stats()

floatarys = [ary.astype('float16') for ary in arys]
# StatsCounter contains a few different measures; only test a couple:
expectedmean = elementwise_mean(floatarys)
expectedvar = elementwise_var(floatarys)
assert_true(allclose(expectedmean, statsval.mean()))
assert_true(allclose(expectedvar, statsval.variance()))

def test_max(self):
from numpy import maximum
arys, shape, size = _generate_test_arrays(2, 'uint8')
imagedata = ImagesLoader(self.sc).fromArrays(arys)
maxval = imagedata.max()
assert_true(array_equal(reduce(maximum, arys), maxval))

def test_min(self):
from numpy import minimum
arys, shape, size = _generate_test_arrays(2, 'uint8')
imagedata = ImagesLoader(self.sc).fromArrays(arys)
minval = imagedata.min()
assert_true(array_equal(reduce(minimum, arys), minval))


class TestImagesUsingOutputDir(PySparkTestCaseWithOutputDir):

@staticmethod
141 changes: 127 additions & 14 deletions python/test/test_series.py
@@ -1,8 +1,8 @@
from numpy import array, allclose
from nose.tools import assert_equals
from numpy import allclose, arange, array, array_equal, dtype
from nose.tools import assert_equals, assert_true

from thunder.rdds.series import Series
from test_utils import PySparkTestCase
from test_utils import *


class TestSeriesConversions(PySparkTestCase):
@@ -38,6 +38,79 @@ def test_cast_to_float(self):
assert_equals('float16', str(castseries.first()[1].dtype))


class TestSeriesDataStatsMethods(PySparkTestCase):
def generateTestSeries(self):
from thunder.rdds.fileio.seriesloader import SeriesLoader
ary1 = arange(8, dtype=dtype('uint8')).reshape((2, 4))
ary2 = arange(8, 16, dtype=dtype('uint8')).reshape((2, 4))
return SeriesLoader(self.sc).fromArrays([ary1, ary2])

def test_mean(self):
from test_utils import elementwise_mean
series = self.generateTestSeries()
meanval = series.mean()

expected = elementwise_mean(series.values().collect())
assert_true(allclose(expected, meanval))
assert_equals('float16', str(meanval.dtype))

def test_sum(self):
from numpy import add
series = self.generateTestSeries()
sumval = series.sum(dtype='uint32')

arys = series.values().collect()
expected = reduce(add, arys)
assert_true(array_equal(expected, sumval))
assert_equals('uint32', str(sumval.dtype))

def test_variance(self):
from test_utils import elementwise_var
series = self.generateTestSeries()
varval = series.variance()

arys = series.values().collect()
expected = elementwise_var([ary.astype('float16') for ary in arys])
assert_true(allclose(expected, varval))
assert_equals('float16', str(varval.dtype))

def test_stdev(self):
from test_utils import elementwise_stdev
series = self.generateTestSeries()
stdval = series.stdev()

arys = series.values().collect()
expected = elementwise_stdev([ary.astype('float16') for ary in arys])
assert_true(allclose(expected, stdval, atol=0.001))
assert_equals('float32', str(stdval.dtype)) # why not float16? see equivalent Images test

def test_stats(self):
from test_utils import elementwise_mean, elementwise_var
series = self.generateTestSeries()
statsval = series.stats()

arys = series.values().collect()
floatarys = [ary.astype('float16') for ary in arys]
expectedmean = elementwise_mean(floatarys)
expectedvar = elementwise_var(floatarys)
assert_true(allclose(expectedmean, statsval.mean()))
assert_true(allclose(expectedvar, statsval.variance()))

def test_max(self):
from numpy import maximum
series = self.generateTestSeries()
maxval = series.max()
arys = series.values().collect()
assert_true(array_equal(reduce(maximum, arys), maxval))

def test_min(self):
from numpy import minimum
series = self.generateTestSeries()
minval = series.min()
arys = series.values().collect()
assert_true(array_equal(reduce(minimum, arys), minval))


class TestSeriesMethods(PySparkTestCase):

def test_between(self):
@@ -73,30 +146,69 @@ def test_series_stats(self):
assert(allclose(data.seriesStats().select('count').first()[1], 5))

def test_normalization(self):
rdd = self.sc.parallelize([(0, array([1, 2, 3, 4, 5]))])
rdd = self.sc.parallelize([(0, array([1, 2, 3, 4, 5], dtype='int16'))])
data = Series(rdd)
out = data.normalize('percentile')
# check that _dtype has been set properly *before* calling first(), b/c first() will update this
# value even if it hasn't been correctly set
assert_equals('float32', str(out._dtype))
vals = out.first()[1]
assert_equals('float32', str(vals.dtype))
assert(allclose(vals, array([-0.42105, 0.10526, 0.63157, 1.15789, 1.68421]), atol=1e-4))

def test_normalization_bymean(self):
rdd = self.sc.parallelize([(0, array([1, 2, 3, 4, 5], dtype='int16'))])
data = Series(rdd)
assert(allclose(data.normalize('percentile').first()[1],
array([-0.42105, 0.10526, 0.63157, 1.15789, 1.68421]), atol=1e-4))
out = data.normalize('mean')
# check that _dtype has been set properly *before* calling first(), b/c first() will update this
# value even if it hasn't been correctly set
assert_equals('float32', str(out._dtype))
vals = out.first()[1]
assert_equals('float32', str(vals.dtype))
assert(allclose(out.first()[1],
array([-0.64516, -0.32258, 0.0, 0.32258, 0.64516]), atol=1e-4))

def test_standardization_axis0(self):
rdd = self.sc.parallelize([(0, array([1, 2, 3, 4, 5]))])
data = Series(rdd)
assert(allclose(data.center(0).first()[1], array([-2, -1, 0, 1, 2])))
assert(allclose(data.standardize(0).first()[1], array([0.70710, 1.41421, 2.12132, 2.82842, 3.53553])))
assert(allclose(data.zscore(0).first()[1], array([-1.41421, -0.70710, 0, 0.70710, 1.41421])))
centered = data.center(0)
standardized = data.standardize(0)
zscored = data.zscore(0)
assert_equals(None, centered._dtype)
assert_equals(None, standardized._dtype)
assert_equals(None, zscored._dtype)
assert(allclose(centered.first()[1], array([-2, -1, 0, 1, 2])))
assert(allclose(standardized.first()[1], array([0.70710, 1.41421, 2.12132, 2.82842, 3.53553])))
assert(allclose(zscored.first()[1], array([-1.41421, -0.70710, 0, 0.70710, 1.41421])))
# TODO: use smaller dtype here by default, and propagate w/o requiring first() call
assert_equals('float64', str(centered._dtype))
assert_equals('float64', str(standardized._dtype))
assert_equals('float64', str(zscored._dtype))

def test_standardization_axis1(self):
rdd = self.sc.parallelize([(0, array([1, 2])), (0, array([3, 4]))])
data = Series(rdd)
assert(allclose(data.center(1).first()[1], array([-1, -1])))
assert(allclose(data.standardize(1).first()[1], array([1, 2])))
assert(allclose(data.zscore(1).first()[1], array([-1, -1])))
centered = data.center(1)
standardized = data.standardize(1)
zscored = data.zscore(1)
assert_equals(None, centered._dtype)
assert_equals(None, standardized._dtype)
assert_equals(None, zscored._dtype)
assert(allclose(centered.first()[1], array([-1, -1])))
assert(allclose(standardized.first()[1], array([1, 2])))
assert(allclose(zscored.first()[1], array([-1, -1])))
# TODO: use smaller dtype here by default, and propagate w/o requiring first() call
assert_equals('float64', str(centered._dtype))
assert_equals('float64', str(standardized._dtype))
assert_equals('float64', str(zscored._dtype))

def test_correlate(self):
rdd = self.sc.parallelize([(0, array([1, 2, 3, 4, 5]))])
data = Series(rdd)
sig1 = [4, 5, 6, 7, 8]
corr = data.correlate(sig1).values().collect()
corrdata = data.correlate(sig1)
assert_equals(None, corrdata._dtype)
corr = corrdata.values().collect()
assert(allclose(corr[0], 1))
sig12 = [[4, 5, 6, 7, 8], [8, 7, 6, 5, 4]]
corrs = data.correlate(sig12).values().collect()
@@ -141,4 +253,5 @@ def test_query_linear_singleton(self):

inds = array([array([1, 2])])
keys, values = data.query(inds)
assert(allclose(values[0, :], array([1.5, 2., 3.5])))
assert(allclose(values[0, :], array([1.5, 2., 3.5])))
assert_equals(data.dtype, values[0, :].dtype)
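
Aside: the normalization tests above check _dtype *before* calling first() because first() lazily back-fills the attribute from the first record, which would mask a propagation bug. A minimal self-contained sketch of that pitfall (LazyData is a hypothetical stand-in for thunder's Data, not part of the codebase):

# Hypothetical sketch only; LazyData stands in for thunder's Data class.
from numpy import array

class LazyData(object):
    def __init__(self, records, dtype=None):
        self._records = records
        self._dtype = dtype

    def first(self):
        record = self._records[0]
        if self._dtype is None:
            # mirrors populateParamsFromFirstRecord: first() back-fills _dtype
            self._dtype = str(record.dtype)
        return record

data = LazyData([array([1.0, 2.0], dtype='float32')])
assert data._dtype is None        # a missing dtype is only observable before first()
data.first()
assert data._dtype == 'float32'   # back-filled either way, so the check is now too late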
24 changes: 23 additions & 1 deletion python/test/test_utils.py
@@ -2,6 +2,7 @@
import tempfile
import unittest
import logging
from numpy import vstack
from pyspark import SparkContext


@@ -28,4 +29,25 @@ def setUp(self):

def tearDown(self):
super(PySparkTestCaseWithOutputDir, self).tearDown()
shutil.rmtree(self.outputdir)
shutil.rmtree(self.outputdir)


def elementwise_mean(arys):
from numpy import mean
combined = vstack([ary.ravel() for ary in arys])
meanary = mean(combined, axis=0)
return meanary.reshape(arys[0].shape)


def elementwise_var(arys):
from numpy import var
combined = vstack([ary.ravel() for ary in arys])
varary = var(combined, axis=0)
return varary.reshape(arys[0].shape)


def elementwise_stdev(arys):
from numpy import std
combined = vstack([ary.ravel() for ary in arys])
stdary = std(combined, axis=0)
return stdary.reshape(arys[0].shape)
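
A quick sanity check of these helpers (the arrays here are illustrative, not from the test suite):

# Illustrative check; assumes the elementwise_* helpers defined above are in scope.
from numpy import allclose, arange

a = arange(6, dtype='float32').reshape((2, 3))
b = a + 1
assert allclose(elementwise_mean([a, b]), (a + b) / 2.0)
# b - a == 1 everywhere, so the population variance is 0.25 at every position
assert allclose(elementwise_var([a, b]), 0.25)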
10 changes: 7 additions & 3 deletions python/thunder/rdds/data.py
@@ -43,7 +43,7 @@ def populateParamsFromFirstRecord(self):
self._dtype = str(record[1].dtype)
return record

def __finalize__(self, other):
def __finalize__(self, other, nopropagate=()):
"""
Lazily propagate attributes from other to self, only if attributes
are not already defined in self
@@ -53,11 +53,15 @@ def __finalize__(self, other):
other : the object from which to get the attributes that we are going
to propagate

nopropagate : iterable of string attribute names (with underscores), default empty tuple
attributes found in nopropagate will *not* have their values propagated forward from self,
but will keep their existing values, even if these are None
"""

Review comment (Member): Are we formally requiring that attributes have underscores in their names?

Reply (Contributor Author): Eh. Was just trying to indicate here that the string attribute name to be passed in should be the "private" (e.g. _dtype) version rather than the public (dtype) version. Seems like a potential point of confusion. Can try to clarify this.

Review comment (Member): Isn't it "to self" from other?

Reply (Contributor Author): Doh. You're right. By the time __finalize__ is called, self is now the new object being created, and the self that's getting passed in in the argument list is other. Will fix the comment.

if isinstance(other, Data):
for name in self._metadata:
if (getattr(other, name, None) is not None) and (getattr(self, name, None) is None):
object.__setattr__(self, name, getattr(other, name, None))
if name not in nopropagate:
if (getattr(other, name, None) is not None) and (getattr(self, name, None) is None):
object.__setattr__(self, name, getattr(other, name, None))
return self

@property
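
To make the nopropagate semantics concrete, here is a self-contained sketch reproducing just the propagation rule from the diff above; ToyData and its attribute names are hypothetical:

# Hypothetical stand-in for thunder's Data; only the propagation logic is reproduced.
class ToyData(object):
    _metadata = ['_dtype', '_nrecords']

    def __init__(self, dtype=None, nrecords=None):
        self._dtype = dtype
        self._nrecords = nrecords

    def __finalize__(self, other, nopropagate=()):
        for name in self._metadata:
            if name not in nopropagate:
                if (getattr(other, name, None) is not None) and (getattr(self, name, None) is None):
                    object.__setattr__(self, name, getattr(other, name, None))
        return self

old = ToyData(dtype='uint8', nrecords=100)
new = ToyData().__finalize__(old, nopropagate=('_dtype',))
assert new._nrecords == 100   # propagated from the source object
assert new._dtype is None     # deliberately excluded, left for lazy determination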
16 changes: 14 additions & 2 deletions python/thunder/rdds/images.py
@@ -486,7 +486,7 @@ def subtract(self, val):

return self.apply(lambda x: x - val)

def apply(self, func):
def apply(self, func, expectedDtype=None):

Review comment (Member): Might prefer to call this newdtype, to match the use of newindex throughout many of the other functions where it's playing a similar purpose.

Reply (Contributor Author): I was trying to make clear with the parameter name that passing in a dtype here will not guarantee that that's the dtype you'll get in the underlying array - it's not like a casting operation, it's merely setting the attribute on the outer Data object. So I didn't want it to be just dtype, as in the casting methods, and I worry that newdtype would have the same implication. But probably I'm overthinking this. :)

"""
Apply a function to all images / volumes,
otherwise preserving attributes
@@ -495,8 +495,20 @@ def apply(self, func):
----------
func : function
Function to apply
expectedDtype : numpy dtype or dtype specifier, or None (default), or string 'unset' or 'same'
Numpy dtype expected from output of func. This will be set as the dtype attribute
of the output Data object. If 'same', then the resulting `dtype` will be the same as that of `self`. If
the string 'unset' or None is passed, the `dtype` of the output will be lazily determined as needed. Note
that this argument, if passed, does not *enforce* that the function output will actually be of the given
dtype. If in doubt, leaving this as None is the safest thing to do.
"""
return self._constructor(self.rdd.mapValues(func)).__finalize__(self)
rdd = self.rdd.mapValues(func)
if isinstance(expectedDtype, basestring):
if expectedDtype == 'same':
expectedDtype = self._dtype
elif expectedDtype == 'unset':
expectedDtype = None
return self._constructor(rdd, dtype=expectedDtype).__finalize__(self, nopropagate=('_dtype',))


class _BlockMemoryAsSequence(object):
Expand Down