From db779e93ecd0f500d4ab7c785a1b1a4d2b9eaecb Mon Sep 17 00:00:00 2001 From: lambday Date: Fri, 25 Mar 2016 00:00:23 +0530 Subject: [PATCH] shogunized data manager src --- .../internals/DataManager.cpp | 231 +++++++++++------- .../internals/DataManager.h | 192 +++++++++++++-- 2 files changed, 307 insertions(+), 116 deletions(-) diff --git a/src/shogun/statistical_testing/internals/DataManager.cpp b/src/shogun/statistical_testing/internals/DataManager.cpp index 6bde98471b8..c1e1dbe7333 100644 --- a/src/shogun/statistical_testing/internals/DataManager.cpp +++ b/src/shogun/statistical_testing/internals/DataManager.cpp @@ -1,33 +1,46 @@ /* - * Restructuring Shogun's statistical hypothesis testing framework. - * Copyright (C) 2016 Soumyajit De + * Copyright (c) The Shogun Machine Learning Toolbox + * Written (w) 2014 - 2016 Soumyajit De + * All rights reserved. * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation, either version 3 of the License, or - * (at your option) any later version. + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. + * 1. Redistributions of source code must retain the above copyright notice, this + * list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. * - * You should have received a copy of the GNU General Public License - * along with this program. If not, see . 
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND + * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED + * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE + * DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR + * ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES + * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; + * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND + * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS + * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + * The views and conclusions contained in the software and documentation are those + * of the authors and should not be interpreted as representing official policies, + * either expressed or implied, of the Shogun Development Team. 
*/ -#include // TODO remove +#include +#include #include #include #include #include -#include using namespace shogun; using namespace internal; -DataManager::DataManager(index_t num_distributions) +DataManager::DataManager(size_t num_distributions) { + SG_SDEBUG("Data manager instance initialized with %d data sources!\n", num_distributions); fetchers.resize(num_distributions); std::fill(fetchers.begin(), fetchers.end(), nullptr); } @@ -38,170 +51,198 @@ DataManager::~DataManager() index_t DataManager::get_num_samples() const { - index_t n = 0; - using fetcher_type = const std::unique_ptr; - if (std::any_of(fetchers.begin(), fetchers.end(), [](fetcher_type& f) { return f->m_num_samples == 0; })) - { - std::cout << "number of samples from all the distributions are not set" << std::endl; - } + SG_SDEBUG("Entering!\n"); + index_t n=0; + using fetcher_type=const std::unique_ptr; + if (std::any_of(fetchers.begin(), fetchers.end(), [](fetcher_type& f) { return f->m_num_samples==0; })) + SG_SERROR("number of samples from all the distributions are not set!") else - { - std::for_each(fetchers.begin(), fetchers.end(), [&n](fetcher_type& f) { n+= f->m_num_samples; }); - } + std::for_each(fetchers.begin(), fetchers.end(), [&n](fetcher_type& f) { n+=f->m_num_samples; }); + SG_SDEBUG("Leaving!\n"); return n; } index_t DataManager::get_min_blocksize() const { - index_t min_blocksize = 0; - using fetcher_type = const std::unique_ptr; - if (std::any_of(fetchers.begin(), fetchers.end(), [](fetcher_type& f) { return f->m_num_samples == 0; })) - { - std::cout << "number of samples from all the distributions are not set" << std::endl; - } + SG_SDEBUG("Entering!\n"); + index_t min_blocksize=0; + using fetcher_type=const std::unique_ptr; + if (std::any_of(fetchers.begin(), fetchers.end(), [](fetcher_type& f) { return f->m_num_samples==0; })) + SG_SERROR("number of samples from all the distributions are not set!") else { - index_t divisor = 0; - std::function gcd = [&gcd](index_t m, 
index_t n) + index_t divisor=0; + std::function gcd=[&gcd](index_t m, index_t n) { - return n == 0 ? m : gcd(n, m % n); + return n==0?m:gcd(n, m%n); }; - for (auto i = 0; i < fetchers.size(); ++i) - { - divisor = gcd(divisor, fetchers[i]->m_num_samples); - } - min_blocksize = get_num_samples() / divisor; + for (size_t i=0; im_num_samples); + min_blocksize=get_num_samples()/divisor; } - std::cout << "min blocksize is " << min_blocksize << std::endl; + SG_SDEBUG("min blocksize is %d!", min_blocksize); + SG_SDEBUG("Leaving!\n"); return min_blocksize; } void DataManager::set_blocksize(index_t blocksize) { - auto n = get_num_samples(); - - ASSERT(n > 0); - ASSERT(blocksize > 0 && blocksize <= n); - ASSERT(n % blocksize == 0); - - for (auto i = 0; i < fetchers.size(); ++i) + SG_SDEBUG("Entering!\n"); + auto n=get_num_samples(); + + REQUIRE(n>0, + "Total number of samples is 0! Please set the number of samples!\n"); + REQUIRE(blocksize>0 && blocksize<=n, + "The blocksize has to be within [0, %d], given = %d!\n", + n, blocksize); + REQUIRE(n%blocksize==0, + "Total number of samples (%d) has to be divisble by the blocksize (%d)!\n", + n, blocksize); + + for (size_t i=0; im_num_samples; - ASSERT((blocksize * m) % n == 0); - fetchers[i]->fetch_blockwise().with_blocksize(blocksize * m / n); - std::cout << "block[" << i << "].size = " << blocksize * m / n << std::endl; + index_t m=fetchers[i]->m_num_samples; + REQUIRE((blocksize*m)%n==0, + "Blocksize (%d) cannot be even distributed with a ratio of %f!\n", + blocksize, m/n); + fetchers[i]->fetch_blockwise().with_blocksize(blocksize*m/n); + SG_SDEBUG("block[%d].size = ", i, blocksize*m/n); } + SG_SDEBUG("Leaving!\n"); } void DataManager::set_num_blocks_per_burst(index_t num_blocks_per_burst) { - ASSERT(num_blocks_per_burst > 0); + SG_SDEBUG("Entering!\n"); + REQUIRE(num_blocks_per_burst>0, + "Number of blocks per burst (%d) has to be greater than 0!\n", + num_blocks_per_burst); - index_t blocksize = 0; - using fetcher_type = 
std::unique_ptr; + index_t blocksize=0; + using fetcher_type=std::unique_ptr; std::for_each(fetchers.begin(), fetchers.end(), [&blocksize](fetcher_type& f) { - blocksize += f->m_block_details.m_blocksize; + blocksize+=f->m_block_details.m_blocksize; }); - ASSERT(blocksize > 0); + REQUIRE(blocksize>0, + "Blocksizes are not set!\n"); - index_t max_num_blocks_per_burst = get_num_samples() / blocksize; - ASSERT(num_blocks_per_burst <= max_num_blocks_per_burst); + index_t max_num_blocks_per_burst=get_num_samples()/blocksize; + REQUIRE(num_blocks_per_burst<=max_num_blocks_per_burst, + "There can only be %d blocks per burst given the blocksize (%d)!", + max_num_blocks_per_burst, blocksize); - for (auto i = 0; i < fetchers.size(); ++i) - { + for (size_t i=0; ifetch_blockwise().with_num_blocks_per_burst(num_blocks_per_burst); - } + SG_SDEBUG("Leaving!\n"); } -InitPerFeature DataManager::samples_at(index_t i) +InitPerFeature DataManager::samples_at(size_t i) { - std::cout << "DataManager::samples_at()" << std::endl; - ASSERT(i < fetchers.size()); + SG_SDEBUG("Entering!\n"); + REQUIRE(im_samples.get(); } -index_t& DataManager::num_samples_at(index_t i) +index_t& DataManager::num_samples_at(size_t i) { - std::cout << "DataManager::num_samples_at()" << std::endl; - ASSERT(i < fetchers.size()); + SG_SDEBUG("Entering!\n"); + REQUIRE(im_num_samples; } -const index_t DataManager::num_samples_at(index_t i) const +const index_t DataManager::num_samples_at(size_t i) const { - std::cout << "DataManager::num_samples_at() const" << std::endl; - ASSERT(i < fetchers.size()); + SG_SDEBUG("Entering!\n"); + REQUIRE(im_num_samples; } -const index_t DataManager::blocksize_at(index_t i) const +const index_t DataManager::blocksize_at(size_t i) const { - std::cout << "DataManager::blocksize_at() const" << std::endl; - ASSERT(i < fetchers.size()); + SG_SDEBUG("Entering!\n"); + REQUIRE(im_block_details.m_blocksize; } void DataManager::start() { - using fetcher_type = std::unique_ptr; + 
SG_SDEBUG("Entering!\n"); + using fetcher_type=std::unique_ptr; std::for_each(fetchers.begin(), fetchers.end(), [](fetcher_type& f) { f->start(); }); + SG_SDEBUG("Leaving!\n"); } NextSamples DataManager::next() { - std::cout << "DataManager::next()" << std::endl; + SG_SDEBUG("Entering!\n"); NextSamples next_samples(fetchers.size()); // fetch a number of blocks (per burst) from each distribution - for (auto i = 0; i < fetchers.size(); ++i) + for (size_t i=0; inext(); - if (feats != nullptr) + auto feats=fetchers[i]->next(); + if (feats!=nullptr) { - auto blocksize = fetchers[i]->m_block_details.m_blocksize; - auto num_blocks_curr_burst = feats->get_num_vectors() / blocksize; - if (next_samples.m_num_blocks == 0) - { - next_samples.m_num_blocks = num_blocks_curr_burst; - } + auto blocksize=fetchers[i]->m_block_details.m_blocksize; + auto num_blocks_curr_burst=feats->get_num_vectors()/blocksize; + if (next_samples.m_num_blocks==0) + next_samples.m_num_blocks=num_blocks_curr_burst; else - { - ASSERT(next_samples.m_num_blocks == num_blocks_curr_burst); - } + ASSERT(next_samples.m_num_blocks==num_blocks_curr_burst); // next samples are gonna hold one feats obj per block for this burst next_samples[i].resize(num_blocks_curr_burst); SGVector inds(blocksize); std::iota(inds.vector, inds.vector + inds.vlen, 0); - for (auto j = 0; j < num_blocks_curr_burst; ++j) + for (auto j=0; jadd_subset(inds); - auto block = static_cast(feats->clone()); - next_samples[i][j] = std::shared_ptr(block, [](CFeatures* ptr) { SG_UNREF(ptr); }); + auto block=static_cast(feats->clone()); + next_samples[i][j]=std::shared_ptr(block, [](CFeatures* ptr) { SG_UNREF(ptr); }); feats->remove_subset(); - std::for_each(inds.vector, inds.vector + inds.vlen, [&blocksize](index_t& val) { val += blocksize; }); + std::for_each(inds.vector, inds.vector+inds.vlen, [&blocksize](index_t& val) { val+=blocksize; }); } } } + SG_SDEBUG("Leaving!\n"); return next_samples; } void DataManager::end() { - using fetcher_type 
= std::unique_ptr; + SG_SDEBUG("Entering!\n"); + using fetcher_type=std::unique_ptr; std::for_each(fetchers.begin(), fetchers.end(), [](fetcher_type& f) { f->end(); }); + SG_SDEBUG("Leaving!\n"); } void DataManager::reset() { - using fetcher_type = std::unique_ptr; + SG_SDEBUG("Entering!\n"); + using fetcher_type=std::unique_ptr; std::for_each(fetchers.begin(), fetchers.end(), [](fetcher_type& f) { f->reset(); }); + SG_SDEBUG("Leaving!\n"); } diff --git a/src/shogun/statistical_testing/internals/DataManager.h b/src/shogun/statistical_testing/internals/DataManager.h index 3bc1eea9750..22936e6ddd5 100644 --- a/src/shogun/statistical_testing/internals/DataManager.h +++ b/src/shogun/statistical_testing/internals/DataManager.h @@ -1,19 +1,31 @@ /* - * Restructuring Shogun's statistical hypothesis testing framework. - * Copyright (C) 2016 Soumyajit De + * Copyright (c) The Shogun Machine Learning Toolbox + * Written (w) 2014 - 2016 Soumyajit De + * All rights reserved. * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation, either version 3 of the License, or - * (at your option) any later version. + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. + * 1. Redistributions of source code must retain the above copyright notice, this + * list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. 
* - * You should have received a copy of the GNU General Public License - * along with this program. If not, see . + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND + * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED + * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE + * DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR + * ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES + * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; + * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND + * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS + * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + * The views and conclusions contained in the software and documentation are those + * of the authors and should not be interpreted as representing official policies, + * either expressed or implied, of the Shogun Development Team. */ #ifndef DATA_MANAGER_H__ @@ -35,33 +47,171 @@ namespace internal class DataFetcher; class NextSamples; +/** + * @brief Class DataManager for fetching/streaming test data block-wise. + * It can handle data coming from multiple sources. The number of data + * sources is represented by the num_distributions parameter in the constructor + * of the data manager. It can handle heterogeneous data sources, and it can + * stream multiple blocks per burst, as the computation would require. The size + * of the blocks and the number of blocks to be fetched per burst can be set + * externally. + * + * This class is designed to be used on a stack. An instance of DataManager + * should not be serialized or copied or moved around. In Shogun, it is helpful + * when used only inside the implementation class of a PIMPL.
+ */ class DataManager { public: - DataManager(index_t num_distributions); + /** + * Constructor. + * + * @param num_distributions number of data sources (i.e. CFeatures objects) + */ + DataManager(size_t num_distributions); + + /** + * Disabled copy constructor + * @param other other instance + */ DataManager(const DataManager& other) = delete; + + /** + * Disabled assignment operator + * @param other other instance + */ DataManager& operator=(const DataManager& other) = delete; + + /** + * Destructor + */ ~DataManager(); + /** + * Sets the blocksize for block-wise data fetching. It divides the block-size + * per data source according to the total number of feature vectors available + * from that source. More formally, if there are \f$K\f$ data sources, \f$X_k\f$, + * \f$k\in[1,K]\f$, with number of feature vectors \f$n_{X_k}\f$ from each, then + * setting a block-size of \f$B\f$ would mean that in each next() call of the + * data manager instance, it will fetch \f$\rho_{X_k} B\f$ samples from each + * \f$X_k\f$, where \f$\rho_{X_k}=n_{X_k}/n\f$, \f$n=\sum_k n_{X_k}\f$. + * + * @param blocksize The size of the block consisting of data from all the sources. + */ void set_blocksize(index_t blocksize); - void set_num_blocks_per_burst(index_t num_blocks_per_burst); - - InitPerFeature samples_at(index_t i); - CFeatures* samples_at(index_t i) const; - index_t& num_samples_at(index_t i); - const index_t num_samples_at(index_t i) const; - - const index_t blocksize_at(index_t i) const; + /** + * In order to speed up the computation, usually a number of blocks are fetched at + * once per next() call. This method sets that number. + * + * @param num_blocks_per_burst The number of blocks to be fetched in a burst. + */ + void set_num_blocks_per_burst(index_t num_blocks_per_burst); + /** + * Setter for feature object as a data source. Since multiple data sources are + * supported, this method takes an index in which the feature object is set.
+ * Internally, it initializes a data fetcher object for the provided feature + * object. + * + * Example usage: + * @code + * + * DataManager data_mgr(2); + * // feats_0 = some CFeatures instance + * // feats_1 = some CFeatures instance + * data_mgr.samples_at(0) = feats_0; + * data_mgr.samples_at(1) = feats_1; + * + * @endcode + * + * @param i The data source index, at which the feature object is to be set as a + * data source. + * @return An initializer for the specified data source (that sets up a fetcher + * for this feature), to be used as lvalue. + */ + InitPerFeature samples_at(size_t i); + + /** + * Getter for feature object at a given data source index. + * + * @param i The data source index, from which the feature object is to be obtained + * @return The underlying CFeatures object at the specified data source. + */ + CFeatures* samples_at(size_t i) const; + + /** + * Setter for the number of samples. Setting this number is mandatory for + * streaming features. For other types of feature objects, this number equals + * the number of vectors, and is set internally. + * + * Example usage: + * @code + * + * DataManager data_mgr(2); + * data_mgr.num_samples_at(0) = 10; + * data_mgr.num_samples_at(1) = 15; + * + * @endcode + * + * @param i The data source index, at which the number of samples is to be set. + * @return A reference to the number of samples for the specified data source + * to be used as lvalue. + */ + index_t& num_samples_at(size_t i); + + /** + * Getter for the number of samples. + * + * @param i The data source index, from which the number of samples is to be obtained. + * @return The number of samples for the specified data source. + */ + const index_t num_samples_at(size_t i) const; + + /** + * Getter for the number of samples from a specified data source in a block. + * + * @param i The data source index. + * @return The number of samples from i-th data source in a block.
+ */ + const index_t blocksize_at(size_t i) const; + + /** + * @return Total number of samples that can be fetched from all the data sources. + */ index_t get_num_samples() const; + + /** + * @return The minimum block-size that can be fetched from the specified data sources. + * For example, if there are two data sources, with samples 20 and 30, respectively, + * then minimum blocksize can be 5 (2 from 1st data source, 3 from the 2nd), and there + * can be then 10 such blocks. + */ index_t get_min_blocksize() const; + /** + * Call this method before fetching the data from the data manager + */ void start(); + + /** + * @return The next bunch of blocks fetched at any given burst. + */ NextSamples next(); + + /** + * call this method after fetching the data is done. + */ void end(); + + /** + * Resets the fetchers to the initial states. + */ void reset(); private: + /** + * The internal data fetcher instances. + */ std::vector> fetchers; };