## Machine learning analysis of calcium imaging data with Spark
Statistical learning approaches are multivariate analysis techniques that allow the quantification of information content about a stimulus or behaviour in a population of neurons. Essentially, we try to decode the presented stimulus or observed behaviour directly from the activity level of the neurons. This is done by fitting a model to a part of the data set (the training set), similar to what was done in the Regression tutorial. To avoid over-fitting, the derived model is then applied to an independent data set (the test set). We can iteratively leave out different parts of our data in a procedure called cross-validation. Machine learning techniques are widely used for data mining in many different fields and consequently Spark provides highly optimized routines for many different algorithms. Here, we will explore the use of a Random Forrest algorithm. 

### Initial setup & data import
This section is largely identical to the first part of [Tutorial_Basics](Tutorial_Basics.ipynb). To run it all in one go, you can select the next section heading ([Prepare stimulus data](#prep_stim)) and choose Cell --> Run All Above.

In [None]:
# Import required modules
import numpy as np
import pylab as plt
import h5py
import os, sys
import seaborn as sns

# Set figure style options for Seaborn
sns.set_style('darkgrid')
sns.set_context('notebook')

# show figure in notebook
%matplotlib inline

In [None]:
# add folder 'utils' to the Python path
# this folder contains custom written code that is required for data import and analysis
utils_dir = os.path.join(os.getcwd(), 'utils')
sys.path.append(utils_dir)

In [None]:
# starting Spark depends on where the notebook is running (local computer or OpenStack cluster)
# choose 'local' or 'openstack'
nbBackend = 'openstack'
print "Running notebook on " + nbBackend + " backend"

In [None]:
# Initialize Spark
# returns the SparkContext object 'sc' which tells Spark how to access the cluster
from setupSpark import initSpark
sc = initSpark(nbBackend)
sc.setLogLevel('WARN') # only show most relevant output

In [None]:
# add Python files in 'utils' folder to the SparkContext 
# this is required so that all files are available on all the cluster workers
for filename in os.listdir(utils_dir):
    if filename.endswith('.py'):
        sc.addPyFile(os.path.join(utils_dir, filename))

In [None]:
# full path to directory containing HDF5 files
directory = '/home/ubuntu/example_data'

# select HDF5 file
# following files are available: 
# Monyer_Leitner_F296_spot01.h5
# Monyer_Leitner_F397_spot01.h5
# Monyer_Leitner_F400_spot02.h5
# Monyer_Leitner_F400_spot04.h5
h5file = 'Monyer_Leitner_F296_spot01.h5'
h5file = directory + os.sep + h5file

In [None]:
# obtain further information about the dataset (size, sampling rate, number of trials)
from NeuroH5Utils import getFileInfo
dsetSz, sampF, nTrials = getFileInfo(h5file)

In [None]:
# create the Spark RDD
from NeuroH5Utils import convert2RDD
numPartitions = 10 # how many partitions?
rdd = convert2RDD(sc, h5file, numPartitions=numPartitions)

In [None]:
# # compute number of neurons, time points and time axis
nNeurons = rdd.count()
s  = np.asarray(rdd.lookup(0))
t = (np.linspace(1, len(s[0]), len(s[0]))) / sampF 
nTimepoints = len(t)

<a name="prep_stim"></a>
### Prepare stimulus data

In [None]:
# Next, we import the stimulus
from NeuroH5Utils import getStimData
stimData, stimNames = getStimData(h5file)
# in this tutorial, we convert the stimulus vector to another format, to allow more efficient processing
# we use a Python dictionary which is a key-value based data type
# in our case, the key is the stimulus ID whereas the value is an array of start indices for the respective stimulus
# have a look at the output of this cell and this will make more sense
uniqueStims = np.unique(stimData[stimData>1]) # ignore air
stimDict = {}
for iStim in uniqueStims:
    stimDict[int(iStim)-1] = np.where(stimData==iStim)[0]
print stimDict

In [None]:
# the advantage of using a dictionary is that we can now easily retrieve the indices for a particular stimulus:
stimDict[1]

### Introducing the Spark machine learning library: MLlib
MLlib is Spark’s machine learning library. Its goal is to make practical machine learning scalable and easy. It consists of common learning algorithms and utilities, including classification, regression, clustering, collaborative filtering, dimensionality reduction, as well as lower-level optimization primitives and higher-level pipeline APIs.

In [None]:
# import the relevant modules from MLlib
from pyspark.mllib.tree import RandomForest, RandomForestModel
from pyspark.mllib.util import MLUtils
from pyspark.mllib.regression import LabeledPoint

### Single neuron classification analysis
First, we apply machine learning to quantify the stimulus-specific information content in each Roi's response individually. To do this, we extract the peri-stimulus data as in [Tutorial_Basics](Tutorial_Basics.ipynb). 

In [None]:
# prepare the data
from CalciumAnalysisUtils import psAnalysis

# select time interval around stimulus (in frames; timepoints before stimulus onset will be ignored later)
baseFrames = 10
evokedFrames = 100

# compute PSdata for all neurons
# note that psAnalysis ignores stims with ID = 1 (air)
psData = rdd.map(lambda (k, v): (k, psAnalysis(v, stimData, (baseFrames, evokedFrames))))
psData = psData.partitionBy(numPartitions).cache()
tPs = (np.linspace(0, evokedFrames, baseFrames+evokedFrames)-baseFrames)/sampF

Now we prepare the data for classification analysis with Spark's MLlib. The most common data type for this in MLlib is called a Labeled Point. A labeled point is a local vector, either dense or sparse, associated with a label/response. In MLlib, labeled points are used in supervised learning algorithms, like Random Forrest. We use a double to store a label, so we can use labeled points in both regression and classification. For binary classification, a label should be either 0 (negative) or 1 (positive). For multiclass classification, labels should be class indices starting from zero: 0, 1, 2, ....

In [None]:
# here we prepare a list of LabeledPoints, one for each neuron
# neurons2classify = nNeurons # select a subset for testing
debug = 0 # plot selected features if debug=1
labeledPoint_rddList = []
# loop over all Rois
for nCell in xrange(nNeurons):
    nCell_lp = []
    # get ps-data for this Roi
    cellPsData = psData.lookup(nCell)[0]
    # loop over stimulus types
    for nStim in range(len(cellPsData)):
        currentData = cellPsData[nStim]
        # loop over trials per stimulus
        for nTrial in range(np.shape(currentData)[0]):
            # select data points after stimulus onset
            v = np.squeeze(currentData[nTrial, np.where(tPs > 0)])
            # baseline normalization (subtract mean of pre-stimulus baseline)
            v = v - np.mean(currentData[nTrial, np.where(tPs < 0)])
            t = tPs[np.where(tPs > 0)]
            # feature selection using linear interpolation
            tinterp = np.linspace(np.min(t)+0.5, np.max(t), 10)
            vinterp = np.interp(tinterp, t, v)
            # build the LabeledPoint list
            nCell_lp.append(LabeledPoint(nStim, vinterp))
            # debug: plot original and interpolated data for first trial
            if debug and not nTrial:
                plt.plot(t,v)
                plt.plot(tinterp, vinterp, 'o')
                plt.show()
    # Parallelize for each neuron and append to RDD list
    labeledPoint_rddList.append(sc.parallelize(nCell_lp))

In [None]:
# First, an example how to run the classification with one iteration and one neuron
neuronIx = 0 # index of the neuron
# randomSplit allows us to split the data into training and test set
# we use 70% of the data for training and 30% for testing
(trainingData, testData) = labeledPoint_rddList[neuronIx].randomSplit([0.7, 0.3])
# train classifier on the training data
# note that number of decision trees is quite low here, for performance reasons
# in production use 50 - 100 trees
model = RandomForest.trainClassifier(trainingData, numClasses=len(stimNames)-1, categoricalFeaturesInfo={}, 
                                     numTrees=10, featureSubsetStrategy="auto", 
                                     impurity='gini', maxDepth=4, maxBins=20)
# use the model to obtain predictions for the test data set
predictions = model.predict(testData.map(lambda x: x.features))
labelsAndPredictions = testData.map(lambda lp: lp.label).zip(predictions)
# compute prediction accuracy
correct = labelsAndPredictions.filter(lambda (v, p): v == p).count() / float(testData.count())
print correct

In [None]:
# Next, we run the classification analysis for each Roi and several train / test splits (iterations)
# depending on the number of iterations, this can take a whiles
iters = 10 # number of iterations (train-test splits)
fractCorrect = np.zeros((len(labeledPoint_rddList), iters))
for ix, labeledPointRDD in enumerate(labeledPoint_rddList):
    for nIter in range(iters):
        (trainingData, testData) = labeledPointRDD.randomSplit([0.7, 0.3])
        model = RandomForest.trainClassifier(trainingData, numClasses=len(stimNames)-1, categoricalFeaturesInfo={}, 
                                             numTrees=10, featureSubsetStrategy="auto", 
                                             impurity='gini', maxDepth=4, maxBins=32)
        predictions = model.predict(testData.map(lambda x: x.features))
        labelsAndPredictions = testData.map(lambda lp: lp.label).zip(predictions)
        fractCorrect[ix][nIter] = labelsAndPredictions.filter(lambda (v, p): v == p).count() / float(testData.count())
    progress = (float(ix)/len(labeledPoint_rddList))*100
    sys.stdout.write("\r%02.0f%% completed" % progress)
    sys.stdout.flush()
sys.stdout.write("\rDone!")

In [None]:
# plot classification performance for all neurons
fig = plt.figure(figsize=(15,6))
plt.plot([0, nNeurons], [1.0/(len(stimNames)-1), 1.0/(len(stimNames)-1)], color='k', linestyle='--')
plt.bar(np.linspace(1,nNeurons,nNeurons), np.mean(fractCorrect, axis=1), color='r', yerr=np.std(fractCorrect, axis=1))
plt.ylabel('Fraction correct +- SD');
plt.xlabel('Neuron ID');
plt.title('Single Cell Decoding');

### Population classification analysis
Next, we apply the same approach to the population response. At each time point (relative to the stimulus), the population activity vector of all neurons can be used as input for the Random Forrest algorithm. Thus, we obtain a time course of the classification accuracy relative to the stimulus.

In [None]:
# First, we write a function that performs the Random Forrest classification on a LabeledPoint RDD
# this code is identical to the corresponding section in Single Cell classification 
# (we could recycle the function there too)
def classify_RandomForrest(labeledPointRDD, iters, numClasses):
    correct = np.zeros(iters)
    labeledPointRDD = labeledPointRDD.cache()
    for iIter in range(iters):
        (trainingData, testData) = labeledPointRDD.randomSplit([0.7, 0.3])
        model = RandomForest.trainClassifier(trainingData, numClasses=numClasses, categoricalFeaturesInfo={}, 
                                             numTrees=10, featureSubsetStrategy="auto", 
                                             impurity='gini', maxDepth=4, maxBins=20)
        predictions = model.predict(testData.map(lambda x: x.features))
        labelsAndPredictions = testData.map(lambda lp: lp.label).zip(predictions)
        correct[iIter] = labelsAndPredictions.filter(lambda (v, p): v == p).count() / float(testData.count())
    return correct

In [None]:
# Next, we write a function to plot the results (to make the code more reusable)
def plot_results(accuracy_array, psTime):
    fig = plt.figure(figsize=(15,6))
    plt.plot([np.min(psTime)-0.5, np.max(psTime)+1], [1.0/(len(stimNames)-1), 1.0/(len(stimNames)-1)], 
             color='k', linestyle='--')
    plt.bar(psTime, np.mean(accuracy_array, axis=0), color='r', 
            yerr=np.std(accuracy_array, axis=0))
    plt.ylabel('Fraction correct +- SD')
    plt.xlabel('Timepoint')
    plt.xlim((np.min(psTime)-0.5, np.max(psTime)+1))
    plt.title('Population Decoding')

For this example, we use a more interesting way to extract the data from the RDD and convert it into a Labeled Point. This is necessary because we potentially deal with very many neurons ("Big Data"), so we cannot simply use rdd.lookup to extract the data (it might not fit on the Spark drivers memory). The strategy thus is to use successive RDD transformations which never have to load the full dataset into memory. First, we extract all the data points sorted according to the particular stimuli. Then, we combine these values into a new RDD with neuron index as key and the corresponding data points as values. Finally, we convert this RDD into a LabeledPoint RDD.

In [None]:
# Function: extract points
# this function extracts the data points of all neurons for a given stimulus / ps-timepoint combination
def extract_points(neuron_ix, arr, stimDict, tPs):
    points = []
    for stim, idx_list in stimDict.iteritems():
        for ix in idx_list:
            points.append(((stim-1, ix+tPs), (neuron_ix, arr[ix+tPs])))
    return points

In [None]:
# Function: add elements
# this function takes the values from extract_points and arranges them in a new RDD
def add_elements(arr, new):
    neuron_ix, val = new
    arr[neuron_ix] = val
    return arr

In [None]:
# Now we define timepoints for classification around the stimulus 
# (independent population analyses are run for each timepoint)
baseFrames = 10
evokedFrames = 60
# the more time points we have, the denser the sampling but the longer the analysis will take
psTimepoints = range(-baseFrames,evokedFrames, 5)
psTime = psTimepoints/sampF

In [None]:
# create labeled point RDD list by RDD transformation
# because Spark executes transformations lazily (only when needed), this runs very fast
labeledPoint_rddList = []
for tPs in psTimepoints:
    point_rdd = rdd.flatMap(lambda (k,v): extract_points(k, v, stimDict, tPs))
    point_rdd = point_rdd.aggregateByKey(np.zeros(nNeurons), add_elements, lambda a,b: a+b)
    labeledPointRDD = point_rdd.map(lambda (k,v): LabeledPoint(k[0], v))
    labeledPoint_rddList.append(labeledPointRDD)

In [None]:
# Now run the classifications on the different time points
iters = 10 # number of iterations (train-test splits)
accuracy_array = np.zeros((iters, len(psTimepoints)))
for ix, labeledPointRDD in enumerate(labeledPoint_rddList):
    correct = classify_RandomForrest(labeledPointRDD, iters, len(stimNames)-1)
    accuracy_array[:, ix] = correct
    progress = (float(ix)/len(psTimepoints))*100
    sys.stdout.write("\r%02.0f%% completed" % progress)
    sys.stdout.flush()
sys.stdout.write("\rDone!")

In [None]:
# plot the results
plot_results(accuracy_array, psTime)