From d02ebb3c21a13ed3bf18a8222cf7deae597b1aba Mon Sep 17 00:00:00 2001 From: david-ragazzi Date: Wed, 15 Apr 2015 22:42:22 -0300 Subject: [PATCH] Remove FlatSpatialPooler --- nupic/bindings/algorithms.i | 105 ---- nupic/research/flat_spatial_pooler.py | 254 --------- .../flat_spatial_pooler_compatability_test.py | 533 ------------------ .../flat_spatial_pooler_cpp_api_test.py | 34 -- .../flat_spatial_pooler_py_api_test.py | 69 --- .../research/flat_spatial_pooler_unit_test.py | 157 ------ .../research/spatial_pooler_unit_test.py | 52 ++ 7 files changed, 52 insertions(+), 1152 deletions(-) delete mode 100644 nupic/research/flat_spatial_pooler.py delete mode 100755 tests/unit/nupic/research/flat_spatial_pooler_compatability_test.py delete mode 100644 tests/unit/nupic/research/flat_spatial_pooler_cpp_api_test.py delete mode 100644 tests/unit/nupic/research/flat_spatial_pooler_py_api_test.py delete mode 100755 tests/unit/nupic/research/flat_spatial_pooler_unit_test.py diff --git a/nupic/bindings/algorithms.i b/nupic/bindings/algorithms.i index 0f96b8d170..b1f53bff6e 100644 --- a/nupic/bindings/algorithms.i +++ b/nupic/bindings/algorithms.i @@ -97,7 +97,6 @@ _ALGORITHMS = _algorithms #include #include #include -#include #include #include @@ -2035,110 +2034,6 @@ inline PyObject* generate2DGaussianSample(nupic::UInt32 nrows, nupic::UInt32 nco } -%include - -%extend nupic::algorithms::spatial_pooler::FlatSpatialPooler -{ - %pythoncode %{ - import numpy - - def __init__(self, - inputShape=(32, 32), - inputBorder=8, - inputDensity=1.0, - coincidencesShape=(48, 48), - coincInputRadius=16, - coincInputPoolPct=0.5, - gaussianDist=False, - commonDistributions=False, - localAreaDensity=-1.0, - numActivePerInhArea=10.0, - stimulusThreshold=0, - synPermInactiveDec=0.01, - synPermActiveInc=0.1, - synPermActiveSharedDec=0.0, - synPermOrphanDec=0.0, - synPermConnected=0.10, - minPctDutyCycleBeforeInh=0.001, - minPctDutyCycleAfterInh=0.001, - dutyCyclePeriod=1000, - maxFiringBoost=10.0, - maxSSFiringBoost=2.0, - maxSynPermBoost=10.0, - minDistance=0.0, - cloneMap=None, - numCloneMasters=-1, - seed=-1, - spVerbosity=0, - printPeriodicStats=0, - testMode=False, - globalInhibition=False, - spReconstructionParam="unweighted_mean", - useHighTier=True, - randomSP=False, - ): - - self.this = _ALGORITHMS.new_FlatSpatialPooler() - _ALGORITHMS.FlatSpatialPooler_initializeFlat( - self, - numInputs=numpy.prod(inputShape), - numColumns=numpy.prod(coincidencesShape), - potentialPct = coincInputPoolPct, - localAreaDensity=localAreaDensity, - numActiveColumnsPerInhArea=numActivePerInhArea, - stimulusThreshold=stimulusThreshold, - synPermInactiveDec=synPermInactiveDec, - synPermActiveInc=synPermActiveInc, - synPermConnected=synPermConnected, - minPctOverlapDutyCycles=minPctDutyCycleBeforeInh, - minPctActiveDutyCycles=minPctDutyCycleAfterInh, - dutyCyclePeriod=dutyCyclePeriod, - maxBoost=maxFiringBoost, - minDistance=minDistance, - randomSP=randomSP, - seed=seed, - spVerbosity=spVerbosity - ) - - def __getstate__(self): - # Save the local attributes but override the C++ flat spatial pooler with - # the string representation. - d = dict(self.__dict__) - d["this"] = self.getCState() - return d - - def __setstate__(self, state): - # Create an empty C++ flat spatial pooler and populate it from the - # serialized string. - self.this = _ALGORITHMS.new_FlatSpatialPooler() - if isinstance(state, str): - self.loadFromString(state) - self.valueToCategory = {} - else: - self.loadFromString(state["this"]) - # Use the rest of the state to set local Python attributes. - del state["this"] - self.__dict__.update(state) - %} - - inline void compute(PyObject *py_x, bool learn, PyObject *py_y, - bool stripNeverLearned) - { - PyArrayObject* x = (PyArrayObject*) py_x; - PyArrayObject* y = (PyArrayObject*) py_y; - self->compute((nupic::UInt*) x->data, (bool)learn, (nupic::UInt*) y->data, - (bool)stripNeverLearned); - } - - inline void compute(PyObject *py_x, bool learn, PyObject *py_y) - { - PyArrayObject* x = (PyArrayObject*) py_x; - PyArrayObject* y = (PyArrayObject*) py_y; - self->compute((nupic::UInt*) x->data, (bool)learn, (nupic::UInt*) y->data); - } - -} - %include %pythoncode %{ diff --git a/nupic/research/flat_spatial_pooler.py b/nupic/research/flat_spatial_pooler.py deleted file mode 100644 index 041c5f42f3..0000000000 --- a/nupic/research/flat_spatial_pooler.py +++ /dev/null @@ -1,254 +0,0 @@ -# ---------------------------------------------------------------------- -# Numenta Platform for Intelligent Computing (NuPIC) -# Copyright (C) 2013, Numenta, Inc. Unless you have an agreement -# with Numenta, Inc., for a separate license for this software code, the -# following terms and conditions apply: -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License version 3 as -# published by the Free Software Foundation. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. -# See the GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see http://www.gnu.org/licenses. -# -# http://numenta.org/licenses/ -# ---------------------------------------------------------------------- - -"""Implements the flat spatial pooler.""" - -import numpy - -from nupic.bindings.math import GetNTAReal -from nupic.research.spatial_pooler import SpatialPooler - -realDType = GetNTAReal() - - - -class FlatSpatialPooler(SpatialPooler): - """ - This class implements the flat spatial pooler. This version of the spatial - pooler contains no toplogy information. It uses global coverage and global - inhibition. It implements 'high tier' learning. - - High tier learning gives preference to unlearned columns. An unlearned - column will always win unless another column has learned to perfectly - represent an input pattern. Once this initial phase has passed, it should - behave like the normal spatial pooler. This option is useful if you might - encounter very small datasets where you might want a unique representation - for every input, regardless of similarity. - - The randomSP option allows you to use a flat spatial pooler without invoking - any learning. This is extremely useful for understanding the properties of a - basic SP that is initialized with random permanences. A randomSP will give - reasonable SDR's and is easier to analyze and reason about. (A properly - trained SP should give even better SDR's.) You can't achieve this function - with SpatialPooler because it normally strips out unlearned columns when - learning is turned off. If the randomSP functionality is generally useful, we - might move this option to SpatialPooler. - """ - - - def setMinDistance(self, minDistance): - self._minDistance = minDistance - - - def getMinDistance(self): - return self._minDistance - - - def setRandomSP(self, randomSP): - self._randomSP = randomSP - - - def getRandomSP(self): - return self._randomSP - - - def __init__(self, - inputShape=(32, 32), - inputBorder=8, - inputDensity=1.0, - coincidencesShape=(48, 48), - coincInputRadius=16, - coincInputPoolPct=0.5, - gaussianDist=False, - commonDistributions=False, - localAreaDensity=-1.0, - numActivePerInhArea=10.0, - stimulusThreshold=0, - synPermInactiveDec=0.01, - synPermActiveInc=0.1, - synPermActiveSharedDec=0.0, - synPermOrphanDec=0.0, - synPermConnected=0.10, - minPctDutyCycleBeforeInh=0.001, - minPctDutyCycleAfterInh=0.001, - dutyCyclePeriod=1000, - maxFiringBoost=10.0, - maxSSFiringBoost=2.0, - maxSynPermBoost=10.0, - minDistance=0.0, - cloneMap=None, - numCloneMasters=-1, - seed=-1, - spVerbosity=0, - printPeriodicStats=0, - testMode=False, - globalInhibition=False, - spReconstructionParam="unweighted_mean", - useHighTier=True, - randomSP=False, - ): - - assert useHighTier, "useHighTier must be True in flat_spatial_pooler" - - numInputs = numpy.array(inputShape).prod() - numColumns = numpy.array(coincidencesShape).prod() - super(FlatSpatialPooler, self).__init__( - inputDimensions=numpy.array(inputShape), - columnDimensions=numpy.array(coincidencesShape), - potentialRadius=numInputs, - potentialPct=coincInputPoolPct, - globalInhibition=True, # Global inhibition always true in the flat pooler - localAreaDensity=localAreaDensity, - numActiveColumnsPerInhArea=numActivePerInhArea, - stimulusThreshold=stimulusThreshold, - synPermInactiveDec=synPermInactiveDec, - synPermActiveInc=synPermActiveInc, - synPermConnected=synPermConnected, - minPctOverlapDutyCycle=minPctDutyCycleBeforeInh, - minPctActiveDutyCycle=minPctDutyCycleAfterInh, - dutyCyclePeriod=dutyCyclePeriod, - maxBoost=maxFiringBoost, - seed=seed, - spVerbosity=spVerbosity, - ) - - # save arguments - self._numInputs = numpy.prod(numpy.array(inputShape)) - self._numColumns = numpy.prod(numpy.array(coincidencesShape)) - self._minDistance = minDistance - self._randomSP = randomSP - - #set active duty cycles to ones, because they set anomaly scores to 0 - self._activeDutyCycles = numpy.ones(self._numColumns) - - # set of columns to be 'hungry' for learning - self._boostFactors *= maxFiringBoost - - # For high tier to work we need to set the min duty cycles to be non-zero - # This will ensure that columns with 0 active duty cycle get high boost - # in the beginning. - self._minOverlapDutyCycles.fill(1e-6) - self._minActiveDutyCycles.fill(1e-6) - - if self._spVerbosity > 0: - self.printFlatParameters() - - - def compute(self, inputArray, learn, activeArray, stripNeverLearned=True): - """ - This is the primary public method of the SpatialPooler class. This function - takes a input vector and outputs the indices of the active columns. If - 'learn' is set to True, and randomSP is set to false, this method also - updates the permanences of the columns. - - Parameters: - ---------------------------- - inputVector: a numpy array of 0's and 1's thata comprises the input to - the spatial pooler. The array will be treated as a one - dimensional array, therefore the dimensions of the array - do not have to much the exact dimensions specified in the - class constructor. In fact, even a list would suffice. - The number of input bits in the vector must, however, - match the number of bits specified by the call to the - constructor. Therefore there must be a '0' or '1' in the - array for every input bit. - learn: a boolean value indicating whether learning should be - performed. Learning entails updating the permanence - values of the synapses, and hence modifying the 'state' - of the model. Setting learning to 'off' freezes the SP - and has many uses. For example, you might want to feed in - various inputs and examine the resulting SDR's. - :param stripNeverLearned: If True and learn=False, then columns that - have never learned will be stripped out of the active columns. This - should be set to False when using a random SP with learning disabled. - NOTE: This parameter should be set explicitly as the default will - likely be changed to False in the near future and if you want to retain - the current behavior you should additionally pass the resulting - activeArray to the stripUnlearnedColumns method manually. - """ - if self._randomSP: - learn=False - - assert (numpy.size(inputArray) == self._numInputs) - self._updateBookeepingVars(learn) - inputVector = numpy.array(inputArray, dtype=realDType) - overlaps = self._calculateOverlap(inputVector) - overlapsPct = self._calculateOverlapPct(overlaps) - highTierColumns = self._selectHighTierColumns(overlapsPct) - virginColumns = self._selectVirginColumns() - - if learn: - vipOverlaps = self._boostFactors * overlaps - else: - vipOverlaps = overlaps.copy() - - # Ensure one of the high tier columns win - # If learning is on, ensure an unlearned column wins - vipBonus = max(vipOverlaps) + 1.0 - if learn: - vipOverlaps[virginColumns] = vipBonus - vipOverlaps[highTierColumns] += vipBonus - - activeColumns = self._inhibitColumns(vipOverlaps) - - if learn: - self._adaptSynapses(inputVector, activeColumns) - self._updateDutyCycles(overlaps, activeColumns) - self._bumpUpWeakColumns() - self._updateBoostFactors() - - if self._isUpdateRound(): - self._updateInhibitionRadius() - self._updateMinDutyCycles() - elif stripNeverLearned: - activeColumns = self.stripUnlearnedColumns(activeColumns) - - activeArray.fill(0) - if activeColumns.size > 0: - activeArray[activeColumns] = 1 - - - - def _selectVirginColumns(self): - """ - returns a set of virgin columns. Virgin columns are columns that have never - been active. - """ - return numpy.where(self._activeDutyCycles == 0)[0] - - - def _selectHighTierColumns(self, overlapsPct): - """ - returns the set of high tier columns. High tier columns are columns who - have learned to represent a particular input pattern. How well a column - represents an input pattern is represented by the percent of connected - synapses connected to inputs bits which are turned on. 'self._minDistance' - determines with how much precision a column must learn to represent an - input pattern in order to be considered a 'high tier' column. - """ - return numpy.where(overlapsPct >= (1.0 - self._minDistance))[0] - - - def printFlatParameters(self): - """Print parameters specific to this class.""" - print " PY FlatSpatialPooler Parameters" - print "minDistance = ", self.getMinDistance() - print "randomSP = ", self.getRandomSP() diff --git a/tests/unit/nupic/research/flat_spatial_pooler_compatability_test.py b/tests/unit/nupic/research/flat_spatial_pooler_compatability_test.py deleted file mode 100755 index 08bd20ce83..0000000000 --- a/tests/unit/nupic/research/flat_spatial_pooler_compatability_test.py +++ /dev/null @@ -1,533 +0,0 @@ -#! /usr/bin/env python -# ---------------------------------------------------------------------- -# Numenta Platform for Intelligent Computing (NuPIC) -# Copyright (C) 2013, Numenta, Inc. Unless you have an agreement -# with Numenta, Inc., for a separate license for this software code, the -# following terms and conditions apply: -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License version 3 as -# published by the Free Software Foundation. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. -# See the GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see http://www.gnu.org/licenses. -# -# http://numenta.org/licenses/ -# ---------------------------------------------------------------------- - -import cPickle as pickle -import numpy -import unittest2 as unittest - -from nupic.support.unittesthelpers.algorithm_test_helpers import ( - getNumpyRandomGenerator, convertPermanences ) - -from nupic.bindings.algorithms import FlatSpatialPooler as CPPFlatSpatialPooler -from nupic.bindings.math import GetNTAReal, Random -from nupic.research.flat_spatial_pooler import ( - FlatSpatialPooler as PyFlatSpatialPooler) - -realType = GetNTAReal() -uintType = "uint32" -NUM_RECORDS = 100 - - - -class FlatSpatialPoolerCompatabilityTest(unittest.TestCase): - - def setUp(self): - # Set to 1 for more verbose debugging output - self.verbosity = 0 - - - def assertListAlmostEqual(self, alist, blist): - self.assertEqual(len(alist), len(blist), - "Lists have different length") - for idx, val in enumerate(alist): - self.assertAlmostEqual( - val, - blist[idx], - places = 3, - msg = "Lists different at index %d with values %g and %g" - % (idx, val, blist[idx])) - - - def compare(self, pySp, cppSp): - self.assertAlmostEqual(pySp.getNumColumns(), - cppSp.getNumColumns()) - self.assertAlmostEqual(pySp.getNumInputs(), - cppSp.getNumInputs()) - self.assertAlmostEqual(pySp.getPotentialRadius(), - cppSp.getPotentialRadius()) - self.assertAlmostEqual(pySp.getPotentialPct(), - cppSp.getPotentialPct()) - self.assertAlmostEqual(pySp.getGlobalInhibition(), - cppSp.getGlobalInhibition()) - self.assertAlmostEqual(pySp.getNumActiveColumnsPerInhArea(), - cppSp.getNumActiveColumnsPerInhArea()) - self.assertAlmostEqual(pySp.getLocalAreaDensity(), - cppSp.getLocalAreaDensity()) - self.assertAlmostEqual(pySp.getStimulusThreshold(), - cppSp.getStimulusThreshold()) - self.assertAlmostEqual(pySp.getInhibitionRadius(), - cppSp.getInhibitionRadius()) - self.assertAlmostEqual(pySp.getDutyCyclePeriod(), - cppSp.getDutyCyclePeriod()) - self.assertAlmostEqual(pySp.getMaxBoost(), - cppSp.getMaxBoost()) - - self.assertAlmostEqual(pySp.getIterationNum(), - cppSp.getIterationNum()) - self.assertAlmostEqual(pySp.getIterationLearnNum(), - cppSp.getIterationLearnNum()) - - self.assertAlmostEqual(pySp.getSpVerbosity(), - cppSp.getSpVerbosity()) - self.assertAlmostEqual(pySp.getUpdatePeriod(), - cppSp.getUpdatePeriod()) - self.assertAlmostEqual(pySp.getSynPermTrimThreshold(), - cppSp.getSynPermTrimThreshold()) - self.assertAlmostEqual(pySp.getSynPermActiveInc(), - cppSp.getSynPermActiveInc()) - self.assertAlmostEqual(pySp.getSynPermInactiveDec(), - cppSp.getSynPermInactiveDec()) - self.assertAlmostEqual(pySp.getSynPermBelowStimulusInc(), - cppSp.getSynPermBelowStimulusInc()) - self.assertAlmostEqual(pySp.getSynPermConnected(), - cppSp.getSynPermConnected()) - self.assertAlmostEqual(pySp.getMinPctOverlapDutyCycles(), - cppSp.getMinPctOverlapDutyCycles()) - self.assertAlmostEqual(pySp.getMinPctActiveDutyCycles(), - cppSp.getMinPctActiveDutyCycles()) - - numColumns = pySp.getNumColumns() - numInputs = pySp.getNumInputs() - - pyBoost = numpy.zeros(numColumns).astype(realType) - cppBoost = numpy.zeros(numColumns).astype(realType) - pySp.getBoostFactors(pyBoost) - cppSp.getBoostFactors(cppBoost) - self.assertListAlmostEqual(list(pyBoost), list(cppBoost)) - - self.assertEqual(pySp.getRandomSP(), cppSp.getRandomSP()) - - pyOverlap = numpy.zeros(numColumns).astype(realType) - cppOverlap = numpy.zeros(numColumns).astype(realType) - pySp.getOverlapDutyCycles(pyOverlap) - cppSp.getOverlapDutyCycles(cppOverlap) - self.assertListAlmostEqual(list(pyOverlap), list(cppOverlap)) - - pyActive = numpy.zeros(numColumns).astype(realType) - cppActive = numpy.zeros(numColumns).astype(realType) - pySp.getActiveDutyCycles(pyActive) - cppSp.getActiveDutyCycles(cppActive) - self.assertListAlmostEqual(pyActive, cppActive) - - pyMinOverlap = numpy.zeros(numColumns).astype(realType) - cppMinOverlap = numpy.zeros(numColumns).astype(realType) - pySp.getMinOverlapDutyCycles(pyMinOverlap) - cppSp.getMinOverlapDutyCycles(cppMinOverlap) - self.assertListAlmostEqual(list(pyMinOverlap), list(cppMinOverlap)) - - pyMinActive = numpy.zeros(numColumns).astype(realType) - cppMinActive = numpy.zeros(numColumns).astype(realType) - pySp.getMinActiveDutyCycles(pyMinActive) - cppSp.getMinActiveDutyCycles(cppMinActive) - self.assertListAlmostEqual(list(pyMinActive), list(cppMinActive)) - - for i in xrange(pySp.getNumColumns()): - pyPot = numpy.zeros(numInputs).astype(uintType) - cppPot = numpy.zeros(numInputs).astype(uintType) - pySp.getPotential(i, pyPot) - cppSp.getPotential(i, cppPot) - self.assertListEqual(list(pyPot),list(cppPot)) - - pyPerm = numpy.zeros(numInputs).astype(realType) - cppPerm = numpy.zeros(numInputs).astype(realType) - pySp.getPermanence(i, pyPerm) - cppSp.getPermanence(i, cppPerm) - self.assertListAlmostEqual(list(pyPerm),list(cppPerm)) - - pyCon = numpy.zeros(numInputs).astype(uintType) - cppCon = numpy.zeros(numInputs).astype(uintType) - pySp.getConnectedSynapses(i, pyCon) - cppSp.getConnectedSynapses(i, cppCon) - self.assertListEqual(list(pyCon), list(cppCon)) - - - pyConCounts = numpy.zeros(numColumns).astype(uintType) - cppConCounts = numpy.zeros(numColumns).astype(uintType) - pySp.getConnectedCounts(pyConCounts) - cppSp.getConnectedCounts(cppConCounts) - self.assertListEqual(list(pyConCounts), list(cppConCounts)) - - - def debugPrint(self, sp, name): - """ - Helpful debug print statements while debugging this test. - """ - minDutyCycle = numpy.zeros(sp.getNumColumns(), dtype = GetNTAReal()) - sp.getMinActiveDutyCycles(minDutyCycle) - - activeDutyCycle = numpy.zeros(sp.getNumColumns(), dtype = GetNTAReal()) - sp.getActiveDutyCycles(activeDutyCycle) - - boost = numpy.zeros(sp.getNumColumns(), dtype = GetNTAReal()) - sp.getBoostFactors(boost) - print "====================\n",name - print "Learning iteration:", sp.getIterationNum() - print "Min duty cycles:",minDutyCycle[0] - print "Active duty cycle", activeDutyCycle - print - print "Boost factor for sp:",boost - - - def createSp(self, imp, params): - """ - Create the SP implementation according to the parameters. Validate that - the SP created properly. - """ - if (imp == "py"): - spClass = PyFlatSpatialPooler - elif (imp == "cpp"): - spClass = CPPFlatSpatialPooler - else: - raise RuntimeError("unrecognized implementation") - - sp = spClass( - inputShape=params["inputShape"], - coincidencesShape=params["coincidencesShape"], - localAreaDensity=params["localAreaDensity"], - numActivePerInhArea=params["numActivePerInhArea"], - stimulusThreshold=params["stimulusThreshold"], - synPermInactiveDec=params["synPermInactiveDec"], - synPermActiveInc=params["synPermActiveInc"], - synPermConnected=params["synPermConnected"], - minPctDutyCycleBeforeInh=params["minPctDutyCycleBeforeInh"], - minPctDutyCycleAfterInh=params["minPctDutyCycleAfterInh"], - dutyCyclePeriod=params["dutyCyclePeriod"], - maxFiringBoost=params["maxFiringBoost"], - minDistance=params["minDistance"], - seed=params["seed"], - spVerbosity=params["spVerbosity"], - randomSP=params["randomSP"], - coincInputPoolPct=params.get("coincInputPoolPct",0.5), - ) - - self.assertEqual(params["randomSP"], sp.getRandomSP()) - self.assertEqual(params["spVerbosity"], sp.getSpVerbosity()) - self.assertAlmostEqual(params["minDistance"], sp.getMinDistance()) - self.assertAlmostEqual(params.get("coincInputPoolPct",0.5), - sp.getPotentialPct()) - - return sp - - - def runSideBySide(self, params, seed = None, - learnMode = None, - convertEveryIteration = False, - numRecords = None): - """ - Run the PY and CPP implementations side by side on random inputs. - If seed is None a random seed will be chosen based on time, otherwise - the fixed seed will be used. - - If learnMode is None learning will be randomly turned on and off. - If it is False or True then set it accordingly. - - If convertEveryIteration is True, the CPP will be copied from the PY - instance on every iteration just before each compute. - - If numRecords is None, use the default global value NUM_RECORDS - """ - if numRecords is None: - numRecords = NUM_RECORDS - randomState = getNumpyRandomGenerator(seed) - pySp = self.createSp("py", params) - numColumns = pySp.getNumColumns() - numInputs = pySp.getNumInputs() - cppSp = self.createSp("cpp", params) - self.compare(pySp, cppSp) - threshold = 0.8 - # Create numRecords records, each numInputs long, where each input - # is an unsigned 32 bit integer of either 0 or 1 - inputMatrix = ( - randomState.rand(numRecords,numInputs) > threshold).astype(uintType) - for i,inputVector in enumerate(inputMatrix): - if learnMode is None: - learn = (randomState.rand() > 0.5) - else: - learn = learnMode - if self.verbosity > 1: - print "\nIteration:",i,"learn=",learn - PyActiveArray = numpy.zeros(numColumns).astype(uintType) - CppActiveArray = numpy.zeros(numColumns).astype(uintType) - pySp.compute(inputVector, learn, PyActiveArray) - cppSp.compute(inputVector, learn, CppActiveArray) - self.compare(pySp, cppSp) - self.assertListEqual(list(PyActiveArray), list(CppActiveArray)) - - # The permanence values for the two implementations drift ever so slowly - # over time due to numerical precision issues. This occasionally causes - # different permanences to be connected. By transferring the permanence - # values every so often, we can avoid this drift but still check that - # the logic is applied equally for both implementations. - if convertEveryIteration or ((i+1)%10 == 0): - convertPermanences(pySp, cppSp) - - - - def runSerialize(self, imp, params, - learnMode = None, - seed = None, - numRecords = None): - """ - Create an SP instance. Run it for half the iterations and then pickle it. - Then unpickle it and run for the rest of the iterations. Ensure output - is identical to the unpickled instance. - """ - if numRecords is None: - numRecords = NUM_RECORDS - randomState = getNumpyRandomGenerator(seed) - sp1 = self.createSp(imp, params) - numColumns = sp1.getNumColumns() - numInputs = sp1.getNumInputs() - threshold = 0.8 - inputMatrix = ( - randomState.rand(numRecords,numInputs) > threshold).astype(uintType) - - for i in xrange(numRecords/2): - activeArray = numpy.zeros(numColumns).astype(uintType) - inputVector = inputMatrix[i,:] - if learnMode is None: - learn = (randomState.rand() > 0.5) - else: - learn = learnMode - if self.verbosity > 1: - print "\nIteration:",i,"learn=",learn - sp1.compute(inputVector, learn, activeArray) - - sp2 = pickle.loads(pickle.dumps(sp1)) - self.compare(sp1, sp2) - for i in xrange(numRecords/2+1,numRecords): - activeArray1 = numpy.zeros(numColumns).astype(uintType) - activeArray2 = numpy.zeros(numColumns).astype(uintType) - inputVector = inputMatrix[i,:] - if learnMode is None: - learn = (randomState.rand() > 0.5) - else: - learn = learnMode - if self.verbosity > 1: - print "\nIteration:",i,"learn=",learn - sp1.compute(inputVector, learn, activeArray1) - sp2.compute(inputVector, learn, activeArray2) - self.compare(sp1, sp2) - self.assertListEqual(list(activeArray1), list(activeArray2)) - - - def testCompatability1(self): - params = { - "inputShape" : 20, - "coincidencesShape" : 21, - "localAreaDensity" : 0, - "numActivePerInhArea" : 7, - "stimulusThreshold" : 0, - "synPermInactiveDec" : 0.01, - "synPermActiveInc" : 0.1, - "synPermConnected" : 0.10, - "minPctDutyCycleBeforeInh" : 0.001, - "minPctDutyCycleAfterInh" : 0.001, - "dutyCyclePeriod" : 30, - "maxFiringBoost" : 10.0, - "minDistance" : 0.0, - "seed" : 3, - "spVerbosity" : 0, - "randomSP" : False - } - # We test a few combinations - self.runSideBySide(params, learnMode=False) - self.runSideBySide(params, convertEveryIteration = True) - - - def testCompatability2(self): - params = { - "inputShape" : 15, - "coincidencesShape" : 36, - "localAreaDensity" : 0.2, - "numActivePerInhArea" : 0, - "stimulusThreshold" : 2, - "synPermInactiveDec" : 0.025, - "synPermActiveInc" : 0.2, - "synPermConnected" : 0.13, - "minPctDutyCycleBeforeInh" : 0.031, - "minPctDutyCycleAfterInh" : 0.032, - "dutyCyclePeriod" : 30, - "maxFiringBoost" : 10.0, - "minDistance" : 0.2, - "seed" : 7, - "spVerbosity" : 0, - "randomSP" : False - } - self.runSideBySide(params, convertEveryIteration = True) - - - def testCompatability3(self): - params = { - "inputShape" : 27, - "coincidencesShape" : 63, - "localAreaDensity" : 0.4, - "numActivePerInhArea" : 0, - "stimulusThreshold" : 2, - "synPermInactiveDec" : 0.02, - "synPermActiveInc" : 0.1, - "synPermConnected" : 0.15, - "minPctDutyCycleBeforeInh" : 0.001, - "minPctDutyCycleAfterInh" : 0.002, - "dutyCyclePeriod" : 31, - "maxFiringBoost" : 14.0, - "minDistance" : 0.4, - "seed" : 19, - "spVerbosity" : 0, - "randomSP" : True - } - self.runSideBySide(params) - - - def testCompatability3NoLearn(self): - params = { - "inputShape" : 27, - "coincidencesShape" : 63, - "localAreaDensity" : 0.4, - "numActivePerInhArea" : 0, - "stimulusThreshold" : 2, - "synPermInactiveDec" : 0.02, - "synPermActiveInc" : 0.1, - "synPermConnected" : 0.15, - "minPctDutyCycleBeforeInh" : 0.001, - "minPctDutyCycleAfterInh" : 0.002, - "dutyCyclePeriod" : 31, - "maxFiringBoost" : 14.0, - "minDistance" : 0.4, - "seed" : 19, - "spVerbosity" : self.verbosity, - "randomSP" : True - } - self.runSideBySide(params, learnMode = False) - - - def testNormalParams(self): - """ - Larger parameters more representative of problems such as hotgym - """ - params = { - "inputShape" : 45, - "coincidencesShape" : 2048, - "localAreaDensity" : 0, - "numActivePerInhArea" : 40, - "stimulusThreshold" : 2, - "synPermInactiveDec" : 0.02, - "synPermActiveInc" : 0.1, - "synPermConnected" : 0.15, - "minPctDutyCycleBeforeInh" : 0.001, - "minPctDutyCycleAfterInh" : 0.002, - "dutyCyclePeriod" : 31, - "maxFiringBoost" : 14.0, - "minDistance" : 0.0, - "seed" : 19, - "spVerbosity" : self.verbosity, - "randomSP" : True, - "coincInputPoolPct": 0.5, - } - self.runSideBySide(params, learnMode = False, numRecords = 5) - - - def testSmallerPoolPct(self): - params = { - "inputShape" : 78, - "coincidencesShape" : 63, - "localAreaDensity" : 0.0, - "numActivePerInhArea" : 10, - "stimulusThreshold" : 2, - "synPermInactiveDec" : 0.02, - "synPermActiveInc" : 0.1, - "synPermConnected" : 0.15, - "minPctDutyCycleBeforeInh" : 0.001, - "minPctDutyCycleAfterInh" : 0.002, - "dutyCyclePeriod" : 31, - "maxFiringBoost" : 14.0, - "minDistance" : 0.4, - "seed" : 19, - "spVerbosity" : self.verbosity, - "randomSP" : True, - "coincInputPoolPct": 0.3, - } - self.runSideBySide(params, learnMode = False) - self.runSideBySide(params, convertEveryIteration = True) - - - def testSerialization(self): - params = { - 'inputShape' : 27, - 'coincidencesShape' : 63, - 'localAreaDensity' : 0.4, - 'numActivePerInhArea' : 0, - 'stimulusThreshold' : 2, - 'synPermInactiveDec' : 0.02, - 'synPermActiveInc' : 0.1, - 'synPermConnected' : 0.15, - 'minPctDutyCycleBeforeInh' : 0.001, - 'minPctDutyCycleAfterInh' : 0.002, - 'dutyCyclePeriod' : 31, - 'maxFiringBoost' : 14.0, - 'minDistance' : 0.4, - 'seed' : 19, - 'spVerbosity' : 0, - 'randomSP' : True - } - sppy1 = self.createSp("py", params) - sppy2 = pickle.loads(pickle.dumps(sppy1)) - self.compare(sppy1, sppy2) - - spcpp1 = self.createSp("cpp", params) - spcpp2 = pickle.loads(pickle.dumps(spcpp1)) - self.compare(spcpp1, spcpp2) - - # Now compare the original PY instance with the unpickled CPP instance - self.compare(sppy1, spcpp2) - - - def testSerializationRun(self): - params = { - 'inputShape' : 27, - 'coincidencesShape' : 63, - 'localAreaDensity' : 0.4, - 'numActivePerInhArea' : 0, - 'stimulusThreshold' : 2, - 'synPermInactiveDec' : 0.02, - 'synPermActiveInc' : 0.1, - 'synPermConnected' : 0.15, - 'minPctDutyCycleBeforeInh' : 0.001, - 'minPctDutyCycleAfterInh' : 0.002, - 'dutyCyclePeriod' : 31, - 'maxFiringBoost' : 14.0, - 'minDistance' : 0.4, - 'seed' : 19, - 'spVerbosity' : 0, - 'randomSP' : False - } - self.runSerialize("py", params) - self.runSerialize("cpp", params) - params['randomSP'] = True - self.runSerialize("cpp", params) - self.runSerialize("py", params) - - - - -if __name__ == "__main__": - unittest.main() diff --git a/tests/unit/nupic/research/flat_spatial_pooler_cpp_api_test.py b/tests/unit/nupic/research/flat_spatial_pooler_cpp_api_test.py deleted file mode 100644 index e456e6f280..0000000000 --- a/tests/unit/nupic/research/flat_spatial_pooler_cpp_api_test.py +++ /dev/null @@ -1,34 +0,0 @@ -#! /usr/bin/env python -# ---------------------------------------------------------------------- -# Numenta Platform for Intelligent Computing (NuPIC) -# Copyright (C) 2013, Numenta, Inc. Unless you have an agreement -# with Numenta, Inc., for a separate license for this software code, the -# following terms and conditions apply: -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License version 3 as -# published by the Free Software Foundation. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. -# See the GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see http://www.gnu.org/licenses. -# -# http://numenta.org/licenses/ -# ---------------------------------------------------------------------- - - -import unittest2 as unittest - -from nupic.bindings.algorithms import FlatSpatialPooler as CPPFlatSpatialPooler - -import flat_spatial_pooler_py_api_test - -flat_spatial_pooler_py_api_test.SpatialPooler = CPPFlatSpatialPooler -FlatSpatialPoolerCPPAPITest = flat_spatial_pooler_py_api_test.FlatSpatialPoolerAPITest - -if __name__ == "__main__": - unittest.main() diff --git a/tests/unit/nupic/research/flat_spatial_pooler_py_api_test.py b/tests/unit/nupic/research/flat_spatial_pooler_py_api_test.py deleted file mode 100644 index 4beb8d64d4..0000000000 --- a/tests/unit/nupic/research/flat_spatial_pooler_py_api_test.py +++ /dev/null @@ -1,69 +0,0 @@ -#! /usr/bin/env python -# ---------------------------------------------------------------------- -# Numenta Platform for Intelligent Computing (NuPIC) -# Copyright (C) 2013, Numenta, Inc. Unless you have an agreement -# with Numenta, Inc., for a separate license for this software code, the -# following terms and conditions apply: -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License version 3 as -# published by the Free Software Foundation. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. -# See the GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see http://www.gnu.org/licenses. -# -# http://numenta.org/licenses/ -# ---------------------------------------------------------------------- - - -import unittest2 as unittest - -from nupic.bindings.algorithms import FlatSpatialPooler as FlatSpatialPooler - -import spatial_pooler_py_api_test - -spatial_pooler_py_api_test.SpatialPooler = FlatSpatialPooler -FlatSpatialPoolerBaseAPITest = spatial_pooler_py_api_test.SpatialPoolerAPITest - - -def flatSetUp(self): - self.sp = FlatSpatialPooler(inputShape=[5], coincidencesShape=[5]) - - -spatial_pooler_py_api_test.SpatialPoolerAPITest.setUp = flatSetUp - - - -class FlatSpatialPoolerAPITest(unittest.TestCase): - - - def testGetMinDistance(self): - sp = FlatSpatialPooler() - inParam = 0.2 - sp.setMinDistance(inParam) - outParam = sp.getMinDistance() - self.assertAlmostEqual(inParam, outParam) - - - def testGetRandomSP(self): - sp = FlatSpatialPooler() - inParam = True - sp.setRandomSP(inParam) - outParam = sp.getRandomSP() - self.assertEqual(inParam, outParam) - - - inParam = False - sp.setRandomSP(inParam) - outParam = sp.getRandomSP() - self.assertEqual(inParam, outParam) - - - -if __name__ == "__main__": - unittest.main() diff --git a/tests/unit/nupic/research/flat_spatial_pooler_unit_test.py b/tests/unit/nupic/research/flat_spatial_pooler_unit_test.py deleted file mode 100755 index 7154cb661e..0000000000 --- a/tests/unit/nupic/research/flat_spatial_pooler_unit_test.py +++ /dev/null @@ -1,157 +0,0 @@ -#! /usr/bin/env python -# ---------------------------------------------------------------------- -# Numenta Platform for Intelligent Computing (NuPIC) -# Copyright (C) 2013, Numenta, Inc. Unless you have an agreement -# with Numenta, Inc., for a separate license for this software code, the -# following terms and conditions apply: -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License version 3 as -# published by the Free Software Foundation. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. -# See the GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see http://www.gnu.org/licenses. -# -# http://numenta.org/licenses/ -# ---------------------------------------------------------------------- - -from mock import Mock, patch, ANY, call -import numpy -import unittest2 as unittest -from copy import copy - -from nupic.bindings.math import (count_gte, - GetNTAReal, - SM_01_32_32 as SparseBinaryMatrix, - SM32 as SparseMatrix) -from nupic.research.flat_spatial_pooler import FlatSpatialPooler - -# Globals -realType = GetNTAReal() -uintType = "uint32" - -class FlatSpatialPoolerTest(unittest.TestCase): - - - def setUp(self): - self._sp = FlatSpatialPooler( - inputShape=(5, 1), - coincidencesShape=(10, 1)) - - - def testSelectVirginColumns(self): - sp = self._sp - sp._numColumns = 6 - sp._activeDutyCycles = numpy.zeros(sp._numColumns) - virgins = list(sp._selectVirginColumns()) - trueVirgins = range(sp._numColumns) - self.assertListEqual(trueVirgins,virgins) - - sp._activeDutyCycles = numpy.zeros(sp._numColumns) - sp._activeDutyCycles[[3,4]] = 0.2 - virgins = list(sp._selectVirginColumns()) - trueVirgins = [0,1,2,5] - self.assertListEqual(trueVirgins,virgins) - - - def testSelectHighTierColumns(self): - sp = self._sp - sp._numColumns = 7 - sp._minDistance = 0.0 - sp._overlapsPct = numpy.array([1.0, 0.7, 0.8, 0.1, 1.0, 0.3, 0.1]) - vipColumns = sp._selectHighTierColumns(sp._overlapsPct) - trueVIPColumns = [0, 4] - self.assertListEqual(trueVIPColumns, list(vipColumns)) - - sp._numColumns = 7 - sp._minDistance = 0.1 - sp._overlapsPct = numpy.array([0.0, 0.9, 0.85, 0.91, 1.0, 0.3, 0.89]) - vipColumns = sp._selectHighTierColumns(sp._overlapsPct) - trueVIPColumns = [1, 3, 4] - self.assertListEqual(trueVIPColumns, list(vipColumns)) - - sp._numColumns = 7 - sp._minDistance = 0.15 - sp._overlapsPct = numpy.array([0.0, 0.9, 0.85, 0.91, 1.0, 0.3, 0.89]) - vipColumns = sp._selectHighTierColumns(sp._overlapsPct) - trueVIPColumns = [1, 2, 3, 4, 6] - self.assertListEqual(trueVIPColumns, list(vipColumns)) - - sp._numColumns = 7 - sp._minDistance = 1.0 - sp._overlapsPct = numpy.array([0.0, 0.9, 0.85, 0.91, 1.0, 0.3, 0.89]) - vipColumns = sp._selectHighTierColumns(sp._overlapsPct) - trueVIPColumns = range(7) - self.assertListEqual(trueVIPColumns, list(vipColumns)) - - sp._numColumns = 7 - sp._minDistance = 0.99 - sp._overlapsPct = numpy.array([0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0]) - vipColumns = sp._selectHighTierColumns(sp._overlapsPct) - trueVIPColumns = [] - self.assertListEqual(trueVIPColumns, list(vipColumns)) - - def testRandomSPDoesNotLearn(self): - - sp = FlatSpatialPooler(inputShape=5, - coincidencesShape=10, - randomSP=True) - inputArray = (numpy.random.rand(5) > 0.5).astype(uintType) - activeArray = numpy.zeros(sp._numColumns).astype(realType) - # Should start off at 0 - self.assertEqual(sp._iterationNum, 0) - self.assertEqual(sp._iterationLearnNum, 0) - - # Store the initialized state - initialPerms = copy(sp._permanences) - - sp.compute(inputArray, False, activeArray) - # Should have incremented general counter but not learning counter - self.assertEqual(sp._iterationNum, 1) - self.assertEqual(sp._iterationLearnNum, 0) - - # Should not learn even if learning set to True - sp.compute(inputArray, True, activeArray) - self.assertEqual(sp._iterationNum, 2) - self.assertEqual(sp._iterationLearnNum, 0) - - # Check the initial perm state was not modified either - self.assertEqual(sp._permanences, initialPerms) - - def testActiveColumnsEqualNumActive(self): - ''' - After feeding in a record the number of active columns should - always be equal to numActivePerInhArea - ''' - - for i in [1, 10, 50]: - numActive = i - inputShape = 10 - sp = FlatSpatialPooler(inputShape=inputShape, - coincidencesShape=100, - numActivePerInhArea=numActive) - inputArray = (numpy.random.rand(inputShape) > 0.5).astype(uintType) - inputArray2 = (numpy.random.rand(inputShape) > 0.8).astype(uintType) - activeArray = numpy.zeros(sp._numColumns).astype(realType) - - # Random SP - sp._randomSP = True - sp.compute(inputArray, False, activeArray) - sp.compute(inputArray2, False, activeArray) - self.assertEqual(sum(activeArray), numActive) - - # Default, learning on - sp._randomSP = False - sp.compute(inputArray, True, activeArray) - sp.compute(inputArray2, True, activeArray) - self.assertEqual(sum(activeArray), numActive) - - - -if __name__ == "__main__": - unittest.main() diff --git a/tests/unit/nupic/research/spatial_pooler_unit_test.py b/tests/unit/nupic/research/spatial_pooler_unit_test.py index 33f2c9b423..ec800a801e 100755 --- a/tests/unit/nupic/research/spatial_pooler_unit_test.py +++ b/tests/unit/nupic/research/spatial_pooler_unit_test.py @@ -26,6 +26,7 @@ import numbers import tempfile import unittest +from copy import copy import capnp from mock import Mock @@ -40,6 +41,7 @@ from nupic.bindings.proto import SpatialPoolerProto_capnp from nupic.research.spatial_pooler import SpatialPooler +uintDType = "uint32" realDType = GetNTAReal() class SpatialPoolerTest(unittest.TestCase): @@ -1789,6 +1791,56 @@ def testWrite(self): self.assertSetEqual(indices1, indices2) + def testRandomSPDoesNotLearn(self): + + sp = SpatialPooler(inputDimensions=[5], + columnDimensions=[10]) + inputArray = (numpy.random.rand(5) > 0.5).astype(uintDType) + activeArray = numpy.zeros(sp._numColumns).astype(realDType) + # Should start off at 0 + self.assertEqual(sp._iterationNum, 0) + self.assertEqual(sp._iterationLearnNum, 0) + + # Store the initialized state + initialPerms = copy(sp._permanences) + + sp.compute(inputArray, False, activeArray) + # Should have incremented general counter but not learning counter + self.assertEqual(sp._iterationNum, 1) + self.assertEqual(sp._iterationLearnNum, 0) + + # Check the initial perm state was not modified either + self.assertEqual(sp._permanences, initialPerms) + + + @unittest.skip("Ported from the removed FlatSpatialPooler but fails. \ + See: https://github.com/numenta/nupic/issues/1897") + def testActiveColumnsEqualNumActive(self): + ''' + After feeding in a record the number of active columns should + always be equal to numActivePerInhArea + ''' + + for i in [1, 10, 50]: + numActive = i + inputShape = 10 + sp = SpatialPooler(inputDimensions=[inputShape], + columnDimensions=[100], + numActiveColumnsPerInhArea=numActive) + inputArray = (numpy.random.rand(inputShape) > 0.5).astype(uintDType) + inputArray2 = (numpy.random.rand(inputShape) > 0.8).astype(uintDType) + activeArray = numpy.zeros(sp._numColumns).astype(realDType) + + # Default, learning on + sp.compute(inputArray, True, activeArray) + sp.compute(inputArray2, True, activeArray) + self.assertEqual(sum(activeArray), numActive) + + # learning OFF + sp.compute(inputArray, False, activeArray) + sp.compute(inputArray2, False, activeArray) + self.assertEqual(sum(activeArray), numActive) + if __name__ == "__main__": unittest.main()