# Filter-Based Feature Selection, mRMR

In this notebook we will use a public dataset to perform a selection of features using minimum-Redundancy Maximum-Relevancy.

* [Hanchuan Peng, Fuhui Long, and Chris Ding. Feature selection based on mutual information: criteria of max-dependency, max-relevance, and min-redundancy. IEEE Transactions on Pattern Analysis and Machine Intelligence,
Vol. 27, No. 8, pp.1226-1238, 2005.](http://home.penglab.com/papersall/docpdf/2005_TPAMI_FeaSel.pdf)

## Data

The website http://home.penglab.com/proj/mRMR/ provides a collection of datasets used to test the original algorithm. We will use some of them to test the distributed version of mRMR.

In [1]:
import urllib

filename = "test_colon_s3.csv"
url = "http://home.penglab.com/proj/mRMR/" + filename
f = urllib.urlretrieve(url, filename)

## Preprocessing of the data

_MLlib_ relies on _LabeledPoint_ as data structure to that stores a numerical vector (dense or sparse) and a numerical label. An RDD of LabeledPoint represents the dataset given as input to train or test supervised machine learning models.

The function _csv2lp_ transforms each line of the csv file into a LabeledPoint. The second parameter of the function is the number of the first _n_ features to filter, as the downstream analysis would require high amount of memory with the whole set of features. In this example we reduce the initial number of features to 20.

In [2]:
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.linalg import Vectors

def csv2lp(x, nf):
    xs = [float(y) for y in x.split(",")][0:nf]
    c  = xs[0]
    f  = xs[1:]
    
    sv = Vectors.sparse(len(f), [(i,j) for i,j in enumerate(f) if j != 0 ])
    lp = LabeledPoint(c, sv)
    
    return lp

file_no_header = sc.textFile(filename).filter(lambda x: x[0] != "c")  # remove the first line as header of the file.
rdd = file_no_header.map(lambda x: csv2lp(x, 20))  # transform the text file into an RDD[LabeledPoint]
ncols = rdd.first().features.size  # number of columns (no class) of the dataset

In [3]:
rdd.first()

LabeledPoint(-1.0, (19,[0,4,6,8,11,12,13,15,17,18],[2.0,-2.0,-2.0,2.0,-2.0,-2.0,-2.0,-2.0,-2.0,-2.0]))

## Distributed computations of Mutual Information between class-features pairs, and feature-feature pairs

In this notebook we calculate first the mutual information between the every feature and the class vectors (_fscores_), then the mutual information between every pair of features (_ffscores_). Given these intermediate results we proceed into performing the feature selection according to the mRMR algorithm.

Here below we define the functions used during the _map_ and _reduce_ stage of the distributed calculations.

In [4]:
from sklearn.metrics import normalized_mutual_info_score
from sklearn.feature_selection import mutual_info_regression
import numpy as np

def meltLPclass(lp):
    '''
    This function creates a list of k,v tuples, one per each
    label-feature combination. 'k' corresponds to the index
    of the feature and 'v' corresponds to a tuple of two
    elements: value of the label, value of the feature
    
    Parameters
    ----------
    lp : LabeledPoint
        a point in the feature space with label
    '''
    label = lp.label
    features = lp.features
    r = range(features.size)
    return [(i, (label, features[i])) for i in r]

def meltLPfeatures(lp):
    '''
    This function creates a list of k,v tuples, one per each
    feature-feature combination. 'k' corresponds to the index
    of the features and 'v' corresponds to a tuple of two
    elements: value of the feature1, value of the feature2
    
    Parameters
    ----------
    lp : LabeledPoint
        a point in the feature space with label
    '''
    features = lp.features
    r = range(features.size)
    return [((i, j), (features[i], features[j])) for i in r for j in r if i < j]

def mi(x):
    '''
    This function calculates the Mutual information between two discrete variables.
    It relies on the 'adjusted_mutual_info_score' function available in the
    sklearn package.
    
    Doc: http://scikit-learn.org/stable/modules/generated/sklearn.metrics.mutual_info_score.html
    
    Parameters
    ----------
    x : tuple
        x[0] is a scalar value (or a tuple), representing the index(es) of the feature(s)
        x[1] is a pyspark.resultiterable.ResultIterable object
    '''
    idx = x[0]
    values = list(x[1])
    
    l = list(values)
    v1, v2 = zip(*values)
    res = normalized_mutual_info_score(v1, v2)
    
    return (idx, res)

def miCont(x):
    '''
    This function calculates the Mutual information between two continuous variables.
    It relies on the 'mutual_info_regression' function available in the
    sklearn package.
    
    Doc: http://scikit-learn.org/stable/modules/generated/sklearn.feature_selection.mutual_info_regression.html
    
    Parameters
    ----------
    x : tuple
        x[0] is a scalar value (or a tuple), representing the index(es) of the feature(s)
        x[1] is a pyspark.resultiterable.ResultIterable object
    '''
    idx = x[0]
    values = list(x[1])
    
    l = list(values)
    v1, v2 = zip(*values)
    V1 = np.array(v1).reshape(len(v1),1)  # 1-column matrix layout transformation
    res = mutual_info_regression(V1, v2, discrete_features=False, random_state=42)[0]  # random_state is set to provide deterministic results
    
    return (idx, res)

From our RDD (_rdd_) that represents our dataset,
- the _flatMap_ operation iterates over every single instance and produce intermediate tuples containing the values of the pair label-feature and the feature index. We need the feature index to be the key of the tuple, so we can group every tuple regarding such feature in the Reducers.
- the _groupByKey_ operation sort and gather tuples having the same key in the Reducers (one Reducer per each key)
- the _map_ operation of feature _j_ has been provided with all the data label-feature of the feature _j_. That is the vectors of the label and the feature _j_. Having such data in one place, the _mi_ function can calculate the mutual information. You can plug in other (custom) functions to assess the association of the two variables, such as _miCont_.
- the _collect_ operation with collect all the resulting data in the spark driver process.

In [5]:
fscores = rdd.flatMap(meltLPclass).groupByKey().map(mi).collect()
fdict = dict(fscores)

In [6]:
dict(fscores[0:5])

{0: 0.038187438441575725,
 2: 0.030097363097944068,
 4: 0.045972729310487084,
 6: 0.007314754207585962,
 8: 0.038187438441575725}

From our RDD (_rdd_) that represents our dataset,
- the _flatMap_ operation iterates over every single instance and produce intermediate tuples containing the values of the pair feature-feature and the feature indexes. We need the feature indexes to be the key of the tuple, so we can group every tuple regarding the pair of features in the Reducers. This is our distributed way of computing each element of the mutual information matrix.
- Following steps are described above.

In [7]:
ffscores = rdd.flatMap(meltLPfeatures).groupByKey().map(mi).collect()
ffdict = dict(ffscores)

In [8]:
dict(ffscores[0:5])

{(0, 12): 0.04549685587602393,
 (2, 18): 0.048374754181886546,
 (3, 15): 0.10831458954134202,
 (4, 6): 0.10776993745903882,
 (6, 10): 0.089789535509841714}

## Feature Selection

### minimum-Redundancy Maximum-Relevancy (mRMR)

Given the mutual information matrixes of class-feature and feature-features, we select the top _nfs_ features according to the mRMR algorithm. _selFidx_ stores the indexes of the selected features.

In [9]:
nfs = 5  # number of feature to select

In [10]:
def compute(cidx, fdict, ffdict, selFidx):
    mrmrC = fdict[cidx]
    mrmrF = 0
    for sidx in selFidx:
        if cidx < sidx:
            mrmrF = mrmrF + ffdict[(cidx, sidx)]
        else:
            mrmrF = mrmrF + ffdict[(sidx, cidx)]
    if len(selFidx) > 0:
        return mrmrC - (mrmrF / len(selFidx))
    else:
        return mrmrC

selFidx = []
selFscore = []
canFidx = range(ncols)
out = []
for i in range(nfs):
    mrmr_i = [(cidx, compute(cidx, fdict, ffdict, selFidx)) for cidx in canFidx]
    mrmr_i.sort(key=lambda tup: tup[1], reverse=True)
    best_feature_i = mrmr_i[0]
    selFidx.append(best_feature_i[0])
    selFscore.append(best_feature_i[1])
    canFidx.remove(best_feature_i[0])

zip(selFidx, selFscore)

[(14, 0.1045852767092034),
 (15, 0.069298782833987979),
 (2, -0.00604816180222233),
 (16, -0.040546956404271237),
 (8, -0.04021388673654136)]

The final step is to reduce the dimensionality of _rdd_ according to the selected features.

In [11]:
from pyspark.mllib.linalg import Vectors

def reduceLP(lp, fsIdx):
    label = lp.label
    features = lp.features
    v = [features[i] for i in fsIdx]
    return LabeledPoint(label, Vectors.dense(v))

rddFS = rdd.map(lambda x: reduceLP(x, selFidx))

In [12]:
rddFS.first()

LabeledPoint(-1.0, [0.0,-2.0,0.0,0.0,2.0])

## Food for thought

1. does this algorithm scale with the number of instances? That is, given 100M instances instead of the current 1K, will this code work or will it crash?
2. what about the same question above concerning the number of feature instead?
3. for this feature selection, do we need to rely on the LabeledPoint data structure? Can we use another (simpler) data structure?
4. can I directly calculate the mutual information in _map_ phase instead of going through the _map_ and _reduce_ phases? Why?
5. Can you calcuate the number of tuples produced at the _flatMap_ operation? Can you estimate the ratio between the size of such intermediate results and the original dataset?