Making of a toy-dataset. More interesting notes are further below.

---

Picking up the last *n1* feature columns without modification and saving the resulting data in a new file. The single decision column (*y*s), which is second to last for some reason is saved in another file as well. 

In [37]:
n1 = 60

from pandas import read_csv

madelon = read_csv("madelon.csv", header=None)
madelon.iloc[:,-(n1 + 2):-2].to_csv('madelon_X_tiny.csv', header=False, index=False)
madelon.iloc[:,-2].to_csv('madelon_y.csv', header=False, index=False)

In [12]:
# read_csv("madelon_y.csv", header=None)[:10]

Similarly picking up preceeding *n2* columns, discretising with various values of the *divisions* parameter, and saving to another file.

In [1]:
%%writefile wrap_discretize.cpp
/*
<%
cfg['compiler_args'] = ['-std=c++11', '-fopenmp']
cfg['linker_args'] = ['-fopenmp']
setup_pybind11(cfg)
%>
*/

#include <algorithm>
#include <cstdint>
#include <cstring>
#include <random>
#include <vector>
#include <pybind11/pybind11.h>
//#include <pybind11/stl.h>
#include <pybind11/numpy.h>

namespace py = pybind11;
using namespace pybind11::literals;

void discretize(
    uint32_t seed,
    uint32_t discretization_index,
    uint32_t feature_id,
    std::size_t divisions,
    std::size_t object_count,
    py::array_t<double> py_in_data,
    py::array_t<double> py_sorted_in_data,
    py::array_t<uint8_t> py_out_data,
    double range_
) {
    
    // the python part
    py::buffer_info py_in_data_buf = py_in_data.request();
    auto *in_data = static_cast<const double *>(py_in_data_buf.ptr);    
    
    py::buffer_info py_sorted_in_data_buf = py_sorted_in_data.request();
    auto *sorted_in_data = static_cast<const double *>(py_sorted_in_data_buf.ptr);
    
    py::buffer_info py_out_data_buf = py_out_data.request();
    auto *out_data = static_cast<uint8_t *>(py_out_data_buf.ptr);
    
    // end of the python part
    
    double* thresholds = new double[divisions];

    // brackets to limit scope
    {
        double sum = 0.0f;
        // brackets to limit scope
        {
            std::mt19937 seed_random_generator0(seed);
            std::mt19937 seed_random_generator1(seed_random_generator0() ^ discretization_index);
            std::mt19937 random_generator(seed_random_generator1() ^ feature_id);

            // E(X) = (a + b) / 2 = (1 - range + 1 + range) / 2 = 1
            std::uniform_real_distribution<double> uniform_range(1.0f - range_, 1.0f + range_);

            for (std::size_t d = 0; d < divisions; ++d) {
                thresholds[d] = uniform_range(random_generator);
                sum += thresholds[d];
            }

            sum += uniform_range(random_generator);
        }

        std::size_t done = 0;
        const double length_step = static_cast<double>(object_count) / sum;

        // thresholds are converted from an arbitrary space into real values (via indices)
        // d - iterates over divisions (of a variable in a discretization)
        for (std::size_t d = 0; d < divisions; ++d) {
            done += std::lround(thresholds[d] * length_step);

            // Note: Check when will this happen, maybe could be skipped
            if (done >= object_count) {
                done = object_count - 1;
            }

            thresholds[d] = sorted_in_data[done];
        }
    }

    // o - iterates over objects
    for (std::size_t o = 0; o < object_count; ++o) {
        out_data[o] = 0;

        // out_data[o] (starting with 0) is incremented every time in_data[o] is above a threshold
        // divisions is a small number (<=15), no reason to use binsearch, hence linear
        // d - iterates over divisions (per object o)
        for (std::size_t d = 0; d < divisions; ++d) {
            out_data[o] += in_data[o] > thresholds[d];
        }
    }

    delete[] thresholds;
}


PYBIND11_MODULE(wrap_discretize, module) {
    module.doc() = "Wrapper of https://bitbucket.org/mdfs/mdfs/src/master/src/cpu/discretize.cpp";
    module.def("discretize", &discretize,
               "In place discretization.",
               "seed"_a,
               "discretization_index"_a,
               "feature_id"_a,
               "divisions"_a,
               "object_count"_a,
               "py_in_data"_a,
               "py_sorted_in_data"_a,
               "py_out_data"_a,
               "range_"_a
              );
}

Writing wrap_discretize.cpp


In [2]:
import cppimport
#cppimport.set_quiet(False)

cppimport.imp("wrap_discretize")

<module 'wrap_discretize' from '/home/olszewskip/Desktop/git-repos/MDFS_playground/python/scheduler/ver1/wrap_discretize.cpython-35m-x86_64-linux-gnu.so'>

In [3]:
import wrap_discretize
import numpy as np
from pandas import read_csv
import pandas as pd

In [38]:
n2 = 10
divisions_list = [1, 1, 2, 2, 3, 3, 4, 4, 5, 5]
assert n2 == len(divisions_list)

data_df = read_csv("madelon.csv", header=None).iloc[:,-n2-n1-2:-n1-2]
in_data = np.ascontiguousarray(np.array(data_df.values.T, dtype='float64'))
out_data = np.empty_like(in_data, dtype='uint8')

for col_idx in range(n2):
    wrap_discretize.discretize(
        seed=123,
        discretization_index = 0,
        feature_id = 0,
        divisions = divisions_list[col_idx],
        object_count = in_data.shape[1],
        py_in_data = in_data[col_idx],
        py_sorted_in_data = np.sort(in_data),
        py_out_data = out_data[col_idx],
        range_ = 0        
    )

pd.DataFrame(out_data.T.astype('int')).to_csv("madelon_X_tiny_discrete.csv", header=None, index=None)

Input data

In [39]:
big_data = read_csv("madelon_X_tiny.csv", header=None)
small_data = read_csv("madelon_X_tiny_discrete.csv", header=None)
labels = read_csv("madelon_y.csv", header=None)
big_data.shape, small_data.shape, labels.shape 

((2000, 60), (2000, 10), (2000, 1))

In [40]:
big_data[:3]

Unnamed: 0,0,1,2,3,4,5,6,7,8,9,...,50,51,52,53,54,55,56,57,58,59
0,541,487,568,492,467,479,483,479,546,476,...,477,481,477,485,511,485,481,479,475,496
1,494,496,463,460,472,478,457,487,420,463,...,463,478,487,338,513,486,483,492,510,517
2,469,480,503,472,490,478,471,492,516,483,...,487,481,492,650,506,501,480,489,499,498


In [41]:
small_data[:5]

Unnamed: 0,0,1,2,3,4,5,6,7,8,9
0,1,1,1,2,2,3,0,4,5,2
1,0,1,1,0,2,0,3,4,2,2
2,1,1,0,2,3,2,0,0,0,1
3,0,0,0,2,1,2,0,4,1,3
4,0,1,0,0,2,0,0,4,0,0


In [42]:
labels[:20].T

Unnamed: 0,0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19
0,0,0,0,1,1,1,1,0,1,0,0,0,1,0,0,0,0,1,0,1


---
---

From this point on we will treat the three numpy arrays contained in the above data frames as provided by the user, with some specific choice of all the other parameters needed to define the forthcoming discretization and calculation of IGs, say:
* *divisions* = 2
* *range*= 0.1
* *tile_width* = 9
* *k*-dimension = 3
* information gain *threshold* $IG_{tr}$ = 100

Although the program of course has to be able to handle arbitrary (in some reasonable range) values of those parameters.

The first major task is - broadly speaking - to create a sequence of tiles, ie. instances of some data structure which will hold the information that can be easily sent to a worker. The sequence is to be generated dynamically (i.e. by a generator). Being given a tile, the worker will then - without requiring any further communication - load appropriate small subset of columns out of (some or all of) the arrays. The worker will then, presumably, locally generate its own specific sequence of k-dimensional tuples of columns. As an additional requirement, there actually need to be two separate tile-sequences:
1. The first one will provide tiles than comply with some simplifying assumptions and - as a result - can be handled by a worker with a GPU. These assumptions are that all the columns in a tile need to come from the *big_data* array (and hence they have equal number of *divisions*), and that the tile has all edges of equal lenth (a statement that is not explained here but it hopefully is clear to the Reader).
2. The second sequence is basically equal to whole sequence minus the first one. It will produce tiles that will actually be harder to handle beacuse those tiles will be composed of columns of unequal *divisions* parameter, and will be of a rectangular (rather than square) shape (so to speak, using 2d adjectives for arbitrary k-dimension).

The program will be written in python and it will be distributed among computing nodes using mpi4py package. One of the nodes will be running the *scheduling* set of instructions: that is it will create the two tile-sequences described above. All other nodes, regarded as *workers*, will perform the computational heave-lifting and they will call wrapped C++11-functions. Every worker subsequently:
1. Sends the result of its previous tile-calculation to the scheduler (starting with an empty result).
2. The scheduler receives the result and sends the next tile to this worker (and proceeds to record the result and generate a next tile.)
3. The worker checks whether the tile it has received is actually a termination signal. If not, it processes the tile and loops back to point 1.
4. Once a sequence is exhausted, the scheduler sends termination signals rather than a proper tiles.
5. Having received a terminating signal the worker returns.
6. Having exhausted both queues, the scheduler does the last fixed-size loop, recording results from the remaining workers and sending the termination signal. After the loop the scheduler returns as well.

Preferably, at each moment the scheduler keeps a record about which tile is being processed by which worker, and how many tiles had been sent to each worker thus far.

The tiles need to constructed in such a way that all the k-tuples they contain are characterized by the same number of degrees of freedom (dof), which means that set of columns in each edge of a given tile has homogenous *divisions* parameter.

Hence, a specific dof parametrizes each result sent back by a worker. Aside from that, the result is essentially a map from column indeces (of columns that were present in the tile) to an unknown-length-list of pairs. Each pair is composed of the $IG$ scalar and the k-tuple of column-indeces that the $IG$ corresponds to. The list is of an unknown length if a nonzero $IG_{tr}$ was given. Then the list has to contain all $IG$s for the given column which came out larger than $IG_{tr}$. If $IG_{tr}=0$ then the largest $IG$ obtained for than column is to be kept and sent.

Part of the task of the scheduler is to aggregate the incoming results: to either keep the maximum $IG$ for each column, or a growing list of $IG$s larger than $IG_{tr}$ (along with the information, which k-tuple those $IG$s originate from). 

Open questions are:
- What exactly is the *tile* data structure than unambiguously defines the task for worker, that can be generated dynamically by the tile-generator on the python side, and that is simple enough that the C function can use it to generate its sequence of k-tuples.
- What exactly is the format of the final output. We need to agree on something for the sake of testing and comparing results. My proposal is: separate csv for each dof, each line composed of:

column-index \t IG_0; tuple0-index_0, ..., tuple0-index-(k-1) \t IG_1; tuple1-index0, ... \t ... \t IG_?; tuple?-index0, ... \n

where column-indeces start with the indeces (starting with 0) from *big-data* and then continue directly by refering to subsequent column-indeces in *small-data*. The rows in each csv file are sorted in an ascending order of the column indeces.