# CMU 10-405/10-605 auto-graded notebook

Before you turn this assignment in, make sure everything runs as expected. First, **restart the kernel** (in the menubar, select Kernel$\rightarrow$Restart) and then **run all cells** (in the menubar, select Cell$\rightarrow$Run All).

Make sure you fill in any place that says `YOUR CODE HERE` or "YOUR ANSWER HERE."

---

# Homework 3: Logistic Regression & PCA

In [4]:
# Who did you collaborate with on this assignment? 
# if no one, collaborators should contain an empty string,
# else list your collaborators below

collaborators = [""]
# YOUR CODE HERE
#raise NotImplementedError()

In [5]:
try:
    collaborators
except:
    raise AssertionError("you did not list your collaborators, if any")

# Click-Through Rate Prediction
In this section you will go through the steps for creating a click-through rate (CTR) prediction pipeline. You will work with the Criteo Labs dataset.

## This section will cover:

* *Part 1:* Featurize categorical data using one-hot-encoding (OHE)

* *Part 2:* Construct an OHE dictionary

* *Part 3:* Parse CTR data and generate OHE features
 * *Visualization 1:* Feature frequency

* *Part 4:* CTR prediction and logloss evaluation
 * *Visualization 2:* ROC curve

* *Part 5:* Reduce feature dimension via feature hashing

> Note that, for reference, you can look up the details of:
> * the relevant Spark methods in [PySpark's DataFrame API](https://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.DataFrame)
> * the relevant NumPy methods in the [NumPy Reference](http://docs.scipy.org/doc/numpy/reference/index.html)

In [7]:
from nose.tools import assert_equal, assert_true

In [8]:
# YOU CAN MOST LIKELY IGNORE THIS CELL. This is only of use for running this notebook locally.

# THIS CELL DOES NOT NEED TO BE RUN ON DATABRICKS. 
# Note that Databricks already creates a SparkContext for you, so this cell can be skipped.
import findspark
findspark.init()
import pyspark
from pyspark.sql import SQLContext
sc = pyspark.SparkContext(appName="hw")
sqlContext = SQLContext(sc)

print("spark context started")

## Part 1: Featurize categorical data using one-hot-encoding

### (1a) One-hot-encoding

We would like to develop code to convert categorical features to numerical ones. In order to build intuition, we will work with a unlabeled dataset with three data points, with each data point representing an animal. The first feature indicates the type of animal (bear, cat, mouse); the second feature describes the animal's color (black, tabby); and the third (optional) feature describes what the animal eats (mouse, salmon).

In a one-hot-encoding (OHE) scheme, we want to represent each tuple of `(featureID, category)` via its own binary feature.  We can do this in Python by creating a dictionary that maps each tuple to a distinct integer, where the integer corresponds to a binary feature. To start, manually enter the entries in the OHE dictionary associated with the sample dataset by mapping the tuples to consecutive integers starting from zero,  ordering the tuples first by featureID and next by category.

Later in this lab, we'll use OHE dictionaries to transform data points into compact lists of features that can be used in machine learning algorithms.

In [11]:
# By default, when a shuffle operation occurs with DataFrames, the post-shuffle partition
# count is 200. This is controlled by Spark configuration value spark.sql.shuffle.partitions.
# 200 is a little too high for this data set, so we set the post-shuffle partition count to
# twice the number of available threads in Community Edition.
sqlContext.setConf('spark.sql.shuffle.partitions', '6')  # Set default partitions for DataFrame operations

In [12]:
from collections import defaultdict
# Data for manual OHE
# Note: the first data point does not include any value for the optional third feature
sample_one = [(0, 'mouse'), (1, 'black')]
sample_two = [(0, 'cat'), (1, 'tabby'), (2, 'mouse')]
sample_three =  [(0, 'bear'), (1, 'black'), (2, 'salmon')]

def sample_to_row(sample):
    tmp_dict = defaultdict(lambda: None)
    tmp_dict.update(sample)
    return [tmp_dict[i] for i in range(3)]

sqlContext.createDataFrame(map(sample_to_row, [sample_one, sample_two, sample_three]),
                           ['animal', 'color', 'food']).show()
sample_data_df = sqlContext.createDataFrame([(sample_one,), (sample_two,), (sample_three,)], ['features'])
sample_data_df.show(truncate=False)

In [13]:
# # TODO: Replace <FILL IN> with appropriate code
# sample_ohe_dict_manual = {}
# sample_ohe_dict_manual[(0, 'bear')] = <FILL IN >
# sample_ohe_dict_manual[(0, 'cat')] = <FILL IN >
# sample_ohe_dict_manual[(0, 'mouse')] = <FILL IN >
# sample_ohe_dict_manual < FILL IN >
# sample_ohe_dict_manual < FILL IN >
# sample_ohe_dict_manual < FILL IN >
# sample_ohe_dict_manual < FILL IN >

sample_ohe_dict_manual = {}
sample_ohe_dict_manual[(0, 'bear')] = 0
sample_ohe_dict_manual[(0, 'cat')] = 1
sample_ohe_dict_manual[(0, 'mouse')] = 2
sample_ohe_dict_manual [(1,'black')]  = 3
sample_ohe_dict_manual [(1,'tabby')] = 4
sample_ohe_dict_manual [(2,'mouse')] = 5
sample_ohe_dict_manual [(2,'salmon')] = 6

# YOUR CODE HERE
#raise NotImplementedError()

In [14]:
# TEST One-hot-encoding (1a)
assert_equal(sample_ohe_dict_manual[(0, 'bear')], 0)
assert_equal(sample_ohe_dict_manual[(0, 'cat')], 1)
assert_equal(sample_ohe_dict_manual[(0, 'mouse')], 2)
assert_equal(len(sample_ohe_dict_manual.keys()), 7)

### (1b) Sparse vectors

Data points can typically be represented with a small number of non-zero OHE features which are relative to the total number of features that occur in the dataset.  By leveraging this sparsity and using sparse vector representations for OHE data, we can reduce storage and computational burdens.  Below are a few sample vectors represented as dense numpy arrays.  Use [SparseVector](https://spark.apache.org/docs/latest/api/python/pyspark.mllib.html#pyspark.mllib.linalg.SparseVector) to represent them in a sparse fashion, and verify that both the sparse and dense representations yield the same results when computing [dot products](http://en.wikipedia.org/wiki/Dot_product) (we will later use MLlib to train classifiers via gradient descent, and MLlib will need to compute dot products between SparseVectors and dense parameter vectors).

Use `SparseVector(size, *args)` to create a new sparse vector where size is the length of the vector and args are either:
1. A list of indices and a list of values corresponding to the indices. The indices list must be sorted in ascending order. For example, SparseVector(5, [1, 3, 4], [10, 30, 40]) will represent the vector [0, 10, 0, 30, 40]. The non-zero indices are 1, 3 and 4. On the other hand, SparseVector(3, [2, 1], [5, 5]) will give you an error because the indices list [2, 1] is not in ascending order. Note: you cannot simply sort the indices list, because otherwise the values will not correspond to the respective indices anymore.
2. A list of (index, value) pair. In this case, the indices need not be sorted. For example, SparseVector(5, [(3, 1), (1, 2)]) will give you the vector [0, 2, 0, 1, 0].

SparseVectors are much more efficient when working with sparse data because they do not store zero values (only store non-zero values and their indices). You'll need to create a sparse vector representation of each dense vector `a_dense` and `b_dense`.

In [16]:
import numpy as np
from pyspark.ml.linalg import SparseVector

In [17]:
# # TODO: Uncomment the lines below and replace <FILL IN> with appropriate code
# a_dense = np.array([0., 3., 0., 4.])
# a_sparse = <FILL IN >
a_dense = np.array([0., 3., 0., 4.])
a_sparse = SparseVector(len(a_dense),[i for i in range(len(a_dense)) if a_dense[i] != 0.0],[x for x in a_dense if x!= 0])

# b_dense = np.array([0., 0., 0., 1.])
# b_sparse = <FILL IN >
b_dense = np.array([0., 0., 0., 1.])
b_sparse = SparseVector(len(b_dense),[i for i in range(len(b_dense)) if b_dense[i] != 0.0],[x for x in b_dense if x!= 0])

# YOUR CODE HERE
#raise NotImplementedError()

w = np.array([0.4, 3.1, -1.4, -.5])
print(a_dense.dot(w))
print(a_sparse.dot(w))
print(b_dense.dot(w))
print(b_sparse.dot(w))

In [18]:
# TEST Sparse Vectors (1b)
assert_true(isinstance(a_sparse, SparseVector), 'a_sparse needs to be an instance of SparseVector')
assert_true(b_dense.dot(w) == b_sparse.dot(w),
                'dot product of b_dense and w should equal dot product of b_sparse and w')
assert_true(a_sparse.numNonzeros() == 2, 'a_sparse should not store zero values')

### (1c) OHE features as sparse vectors

Now let's see how we can represent the OHE features for points in our sample dataset.  Using the mapping defined by the OHE dictionary from Part (1a), manually define OHE features for the three sample data points using SparseVector format.  In this case, all the features will have a value of 1.0.  For example, the `DenseVector` for a point with features 2 and 4 would be `[0.0, 0.0, 1.0, 0.0, 1.0, 0.0, 0.0]`.

In [20]:
# Reminder of the sample features
sample_one = [(0, 'mouse'), (1, 'black')]
sample_two = [(0, 'cat'), (1, 'tabby'), (2, 'mouse')]
sample_three =  [(0, 'bear'), (1, 'black'), (2, 'salmon')]

In [21]:
# # TODO: Uncomment the lines below and replace <FILL IN> with appropriate code. Use SparseVector
# sample_one_ohe_feat_manual = <FILL IN >
# sample_two_ohe_feat_manual = <FILL IN >
# sample_three_ohe_feat_manual = <FILL IN >

sample_one_ohe_feat_manual = SparseVector(len(sample_ohe_dict_manual),[sample_ohe_dict_manual[feature] for feature in  sample_one], [1 for i in range(len(sample_one))] )
sample_two_ohe_feat_manual =SparseVector(len(sample_ohe_dict_manual),[sample_ohe_dict_manual[feature] for feature in  sample_two], [1 for i in range(len(sample_two))] )
sample_three_ohe_feat_manual = SparseVector(len(sample_ohe_dict_manual),[sample_ohe_dict_manual[feature] for feature in  sample_three], [1 for i in range(len(sample_three))] )


# YOUR CODE HERE
#raise NotImplementedError()

In [22]:
# TEST OHE Features as sparse vectors (1c)
assert_true(isinstance(sample_one_ohe_feat_manual, SparseVector),
                'sample_one_ohe_feat_manual needs to be a SparseVector')
assert_true(isinstance(sample_two_ohe_feat_manual, SparseVector),
                'sample_two_ohe_feat_manual needs to be a SparseVector')
assert_true(isinstance(sample_three_ohe_feat_manual, SparseVector),
                'sample_three_ohe_feat_manual needs to be a SparseVector')

assert_equal(sample_one_ohe_feat_manual[2], 1.0, 'incorrect value for sample_one_ohe_feat_manual')
assert_equal(sample_one_ohe_feat_manual[3], 1.0, 'incorrect value for sample_one_ohe_feat_manual')


### (1d) Define a OHE function

Next we will use the OHE dictionary from Part (1a) to programatically generate OHE features from the original categorical data.  First write a function called `one_hot_encoding` that creates OHE feature vectors in `SparseVector` format.  Then use this function to create OHE features for the first sample data point and verify that the result matches the result from Part (1c).

> Note: We'll pass in the OHE dictionary as a [Broadcast](https://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.Broadcast) variable, which will greatly improve performance when we call this function as part of a UDF. **When accessing a broadcast variable, you _must_ use `.value`.** For instance: `ohe_dict_broadcast.value`.

In [24]:
# TODO: Uncomment the lines below and replace <FILL IN> with appropriate code
def one_hot_encoding(raw_feats, ohe_dict_broadcast, num_ohe_feats):
    """Produce a one-hot-encoding from a list of features and an OHE dictionary.

    Note:
        You should ensure that the indices used to create a SparseVector are sorted.

    Args:
        raw_feats (list of (int, str)): The features corresponding to a single observation.  Each
            feature consists of a tuple of featureID and the feature's value. (e.g. sample_one)
        ohe_dict_broadcast (Broadcast of dict): Broadcast variable containing a dict that maps
            (featureID, value) to unique integer.
        num_ohe_feats (int): The total number of unique OHE features (combinations of featureID and
            value).

    Returns:
        SparseVector: A SparseVector of length num_ohe_feats with indices equal to the unique
            identifiers for the (featureID, value) combinations that occur in the observation and
            with values equal to 1.0.
    """
    # <FILL IN>
    # ohe_features = [<FILL IN>]
    # return SparseVector(<FILL IN>)
    
    
    #<FILL IN>
    ohe_features = [ohe_dict_broadcast.value[feature] for feature in  raw_feats]
    ohe_features.sort()  # increase order
    return SparseVector(num_ohe_feats, ohe_features, [1 for i in range(len(ohe_features))])

    # YOUR CODE HERE
    #raise NotImplementedError()
  
# # Calculate the number of features in sample_ohe_dict_manual
# num_sample_ohe_feats = <FILL IN >
# sample_ohe_dict_manual_broadcast = sc.broadcast(sample_ohe_dict_manual)
num_sample_ohe_feats = len(sample_ohe_dict_manual)
sample_ohe_dict_manual_broadcast = sc.broadcast(sample_ohe_dict_manual)

# # Run one_hot_encoding() on sample_one.  Make sure to pass in the Broadcast variable.
# sample_one_ohe_feat = <FILL IN >  
sample_one_ohe_feat = one_hot_encoding(sample_one,sample_ohe_dict_manual_broadcast,num_sample_ohe_feats)
  
# YOUR CODE HERE
#raise NotImplementedError()

print(sample_one_ohe_feat)

In [25]:
# TEST Define an OHE Function (1d)
assert_true(sample_one_ohe_feat == sample_one_ohe_feat_manual,
                'sample_one_ohe_feat should equal sample_one_ohe_feat_manual')
assert_equal(sample_one_ohe_feat, SparseVector(7, [2, 3], [1.0, 1.0]),
                  'incorrect value for sample_one_ohe_feat')
assert_equal(one_hot_encoding([(1, 'black'), (0, 'mouse')], sample_ohe_dict_manual_broadcast,
                                   num_sample_ohe_feats), SparseVector(7, [2, 3], [1.0, 1.0]),
                  'incorrect definition for one_hot_encoding')

### (1e) Apply OHE to a dataset

Finally, use the function from Part (1d) to create OHE features for all 3 data points in the sample dataset.  You'll need to generate a [UDF](https://spark.apache.org/docs/1.6.1/api/python/pyspark.sql.html#pyspark.sql.functions.udf) that can be used in a `DataFrame` `select` statement.

> Note: Your implemenation of `ohe_udf_generator` needs to call your `one_hot_encoding` function.

In [27]:
# TODO: Uncomment the lines below and replace <FILL IN> with appropriate code
from pyspark.sql.functions import udf
from pyspark.ml.linalg import VectorUDT

def ohe_udf_generator(ohe_dict_broadcast):
    """Generate a UDF that is setup to one-hot-encode rows with the given dictionary.

    Note:
        We'll reuse this function to generate a UDF that can one-hot-encode rows based on a
        one-hot-encoding dictionary built from the training data.  Also, you should calculate
        the number of features before calling the one_hot_encoding function.

    Args:
        ohe_dict_broadcast (Broadcast of dict): Broadcast variable containing a dict that maps
            (featureID, value) to unique integer.

    Returns:
        UserDefinedFunction: A UDF can be used in `DataFrame` `select` statement to call a
            function on each row in a given column.  This UDF should call the one_hot_encoding
            function with the appropriate parameters.
    """
#     length = <FILL IN>
#     return udf(lambda x: <FILL IN>, VectorUDT())
    length = len(ohe_dict_broadcast.value)
    return udf(lambda x: one_hot_encoding(x, ohe_dict_broadcast,length) , VectorUDT()) 
#udf是一个返回的是一个function
#x是sample_ohe_dict_udf('features') features这一列的每一个数据，也就是每一个sample point

# YOUR CODE HERE
#raise NotImplementedError()

# sample_ohe_dict_udf = ohe_udf_generator(sample_ohe_dict_manual_broadcast)
# sample_ohe_df = sample_data_df.select( < FILL IN >)
# sample_ohe_df.show(truncate=False)

sample_ohe_dict_udf = ohe_udf_generator(sample_ohe_dict_manual_broadcast)
#print(type(sample_ohe_dict_udf)) #返回的是一个function

sample_ohe_df = sample_data_df.select(sample_ohe_dict_udf('features'))
sample_ohe_df.show(truncate=False)

# YOUR CODE HERE
#raise NotImplementedError()

In [28]:
# TEST Apply OHE to a dataset (1e)
sample_ohe_data_values = sample_ohe_df.collect()
assert_true(len(sample_ohe_data_values) == 3, 'sample_ohe_data_values should have three elements')
assert_equal(sample_ohe_data_values[0], (SparseVector(7, {2: 1.0, 3: 1.0}),),
                  'incorrect OHE for first sample')
assert_equal(sample_ohe_data_values[1], (SparseVector(7, {1: 1.0, 4: 1.0, 5: 1.0}),),
                  'incorrect OHE for second sample')
assert_equal(sample_ohe_data_values[2], (SparseVector(7, {0: 1.0, 3: 1.0, 6: 1.0}),),
                  'incorrect OHE for third sample')

## Part 2: Construct an OHE dictionary

### (2a) DataFrame with rows of `(featureID, category)`

To start, create a DataFrame of distinct `(feature_id, category)` tuples. In our sample dataset, the 7 items in the resulting DataFrame are `(0, 'bear')`, `(0, 'cat')`, `(0, 'mouse')`, `(1, 'black')`, `(1, 'tabby')`, `(2, 'mouse')`, `(2, 'salmon')`. Notably `'black'` appears twice in the dataset but only contributes one item to the DataFrame: `(1, 'black')`, while `'mouse'` also appears twice and contributes two items: `(0, 'mouse')` and `(2, 'mouse')`.  Use [explode](http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.functions.explode) and [distinct](http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame.distinct).

In [31]:
# TODO: Uncomment the lines below and replace <FILL IN> with appropriate code
from pyspark.sql.functions import explode
# sample_distinct_feats_df = (sample_data_df
#                               <FILL IN>)

#sample_distinct_feats_df = (sample_data_df.select(explode("features")).distinct() )

sample_distinct_feats_df = (sample_data_df.select(explode("features")).distinct() )
# YOUR CODE HERE
#raise NotImplementedError()
sample_distinct_feats_df.show()

In [32]:
# TEST DataFrame with rows of `(featureID, category)` (2a)
assert_equal(sorted(map(lambda r: r[0], sample_distinct_feats_df.collect())),
                  [(0, 'bear'), (0, 'cat'), (0, 'mouse'), (1, 'black'),
                   (1, 'tabby'), (2, 'mouse'), (2, 'salmon')],
                  'incorrect value for sample_distinct_feats_df')

### (2b) OHE Dictionary from distinct features

Next, create an RDD of key-value tuples, where each `(feature_id, category)` tuple in `sample_distinct_feats_df` is a key and the values are distinct integers ranging from 0 to (number of keys - 1).  Then convert this RDD into a dictionary, which can be done using the `collectAsMap` action.  Note that there is no unique mapping from keys to values, as all we require is that each `(featureID, category)` key be mapped to a unique integer between 0 and the number of keys.  In this exercise, any valid mapping is acceptable.  Use [zipWithIndex](https://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.RDD.zipWithIndex) followed by [collectAsMap](https://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.RDD.collectAsMap).

In our sample dataset, one valid list of key-value tuples is: `[((0, 'bear'), 0), ((2, 'salmon'), 1), ((1, 'tabby'), 2), ((2, 'mouse'), 3), ((0, 'mouse'), 4), ((0, 'cat'), 5), ((1, 'black'), 6)]`. The dictionary defined in Part (1a) illustrates another valid mapping between keys and integers.

> Note: We provide the code to convert the DataFrame to an RDD.

In [34]:
# # TODO: Uncomment the lines below and replace <FILL IN> with appropriate code
# sample_ohe_dict = (sample_distinct_feats_df
#                      .rdd
#                      .map(lambda r: tuple(r[0]))
#                      <FILL IN>)

sample_ohe_dict = (sample_distinct_feats_df
                     .rdd
                     .map(lambda r: tuple(r[0]))
                     .zipWithIndex()
                     .collectAsMap())

# YOUR CODE HERE
#raise NotImplementedError()
print(sample_ohe_dict)

In [35]:
# TEST OHE Dictionary from distinct features (2b)
assert_equal(sorted(sample_ohe_dict.keys()),
                  [(0, 'bear'), (0, 'cat'), (0, 'mouse'), (1, 'black'),
                   (1, 'tabby'), (2, 'mouse'), (2, 'salmon')],
                  'sample_ohe_dict has unexpected keys')
assert_equal(sorted(sample_ohe_dict.values()), list(range(7)), 'sample_ohe_dict has unexpected values')

### (2c) Automated creation of an OHE dictionary

Now use the code from Parts (2a) and (2b) to write a function that takes an input dataset and outputs an OHE dictionary.  Then use this function to create an OHE dictionary for the sample dataset, and verify that it matches the dictionary from Part (2b).

In [37]:
# TODO: Uncomment the lines below and replace <FILL IN> with appropriate code
def create_one_hot_dict(input_df):

#     """Creates a one-hot-encoder dictionary based on the input data.

#     Args:
#         input_df (DataFrame with 'features' column): A DataFrame where each row contains a list of
#             (featureID, value) tuples.

#     Returns:
#         dict: A dictionary where the keys are (featureID, value) tuples and map to values that are
#             unique integers.
#     """
#    <FILL IN>
#    distinct_feature_df = (<FILL IN>)
#    key_value_tuple_list = (<FILL IN)
#    return dict(key_value_tuple_list)
   
  distinct_feature_df = input_df.select(explode("features")).distinct()
  key_value_tuple_list = distinct_feature_df.rdd.map(lambda r: tuple(r[0])).zipWithIndex().collect()
  return dict(key_value_tuple_list)
  

# YOUR CODE HERE

#raise NotImplementedError()

sample_ohe_dict_auto = create_one_hot_dict(sample_data_df)

In [38]:
# TEST Automated creation of an OHE dictionary (2c)
assert_equal(sorted(sample_ohe_dict_auto.keys()),
                  [(0, 'bear'), (0, 'cat'), (0, 'mouse'), (1, 'black'),
                   (1, 'tabby'), (2, 'mouse'), (2, 'salmon')],
                  'sample_ohe_dict_auto has unexpected keys')
assert_equal(sorted(sample_ohe_dict_auto.values()), list(range(7)),
                  'sample_ohe_dict_auto has unexpected values')

## Part 3: Parse CTR data and generate OHE features

***Before we can proceed, you'll first need to obtain the sample data.  ***

The data is from a past [kaggle competition](https://www.kaggle.com/c/criteo-display-ad-challenge/overview). The original data was too big. So we randomly samepled the data for this assignment from the original dataset.

For data fileds:
* Label - Target variable that indicates if an ad was clicked (1) or not (0).
* I1-I13 - A total of 13 columns of integer features (mostly count features).
* C1-C26 - A total of 26 columns of categorical features. The values of these features have been hashed onto 32 bits for anonymization purposes.
The semantic of the features is undisclosed.
Just run the cell below.

In [41]:
# Run this cell for getting data from the github repo.
from pyspark import SparkFiles
from pyspark.sql import Row
url = "https://raw.githubusercontent.com/10605/data/master/hw3/dac.txt"
sc.addFile(url)

raw_df = sc.textFile("file://" + SparkFiles.get("dac.txt")).map(lambda r: Row(r)).toDF(["text"])

In [42]:
raw_df.show()

### (3a) Loading and splitting the data

We are now ready to start working with the actual CTR data, and our first task involves splitting it into training, validation, and test sets.  Usually we use the [randomSplit method](https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame.randomSplit) with the specified weights and seed to create DFs storing each of these datasets. BUT randomSplit may generate non-deterministic results. So for the sake of testing, we manually split the data in to train, validation, test by the ratio of 0.8, 0.1, 0.1.

Then your work is to [cache](https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame.cache) each of these DFs, as we will access them multiple times in the remainder of this lab. Finally, compute the size of each dataset.

In [44]:
import pyspark.sql.functions as f
indexDf = raw_df.withColumn('index', f.monotonically_increasing_id())
total_count = raw_df.count()
train_count = int(total_count * 4 / 5)
dev_count = int(total_count / 5)
val_count = int(total_count / 10)
test_count = int(total_count / 10)
# first 20% rows
raw_dev_df = indexDf.sort('index').limit(dev_count)
# last 80% rows
raw_train_df = indexDf.sort('index', ascending=False).limit(train_count).drop('index')
# first 10% rows
raw_validation_df = raw_dev_df.sort('index').limit(val_count).drop('index')
# last 10% rows
raw_test_df = raw_dev_df.sort('index', ascending=False).limit(test_count).drop('index')

# # Cache and count the DataFrames
# n_train = raw_train_df.<FILL IN>
# n_val = raw_validation_df.<FILL IN>
# n_test = raw_test_df.<FILL IN>

n_train = raw_train_df.cache().count()
n_val = raw_validation_df.cache().count()
n_test = raw_test_df.cache().count()

# YOUR CODE HERE
#raise NotImplementedError()
print(n_train, n_val, n_test, n_train + n_val + n_test)
raw_df.show(1)

In [45]:
# TEST Loading and splitting the data (3a)
assert_true(all([raw_train_df.is_cached, raw_validation_df.is_cached, raw_test_df.is_cached]),
                'you must cache the split data')
assert_equal(n_train, 80000, 'incorrect value for n_train')
assert_equal(n_val, 10000, 'incorrect value for n_val')
assert_equal(n_test, 10000, 'incorrect value for n_test')

### (3b) Extract features

We will now parse the raw training data in order to create a DataFrame that we can subsequently use to create an OHE dictionary. Note from the `show()` command in Part (3a) that each raw data point is a string containing several fields separated by some delimiters.  For now, we will ignore the first field (which is just the 0-1 label), and parse the remaining fields (or raw features).  To do this, complete the implemention of the `parse_point` function.

In [47]:
# TODO: Uncomment the lines below and replace <FILL IN> with appropriate code
def parse_point(point):
#     """Converts a comma separated string into a list of (featureID, value) tuples.

#     Note:
#         featureIDs should start at 0 and increase to the number of features - 1.

#     Args:
#         point (str): A comma separated string where the first value is the label and the rest
#             are features.

#     Returns:
#         list: A list of (featureID, value) tuples.
#     """
  all = point.split(",")
  useful = all[1:]
  length  = len(useful)
  ret = []
  for id in range(length):
    newtuple = (id,useful[id])
    ret.append(newtuple)
  return ret

# YOUR CODE HERE
#raise NotImplementedError()

print(parse_point(raw_df.select('text').first()[0]))

In [48]:
# TEST Extract features (3b)
assert_equal(parse_point(raw_df.select('text').first()[0])[:3], [(0, u'0'), (1, u'127'), (2, u'1')],
                  'incorrect implementation of parse_point')

### (3c) Extracting features continued

Next, we'll create a `parse_raw_df` function that creates a `label` column for the first value in a data point and a `features` column for the rest.  The `features` column will be created using `parse_point_udf`, which we've provided and is based on your `parse_point` function.  Note that to name your columns you should use [alias](http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.Column.alias).  You can split the `text` field in `raw_df` using [split](http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.functions.split) and retrieve the first value of the resulting array with [getItem](http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.Column.getItem). Be sure to call [cast](http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.Column.cast) to cast the column value to `double`. Your `parse_raw_df` function should also cache the DataFrame it returns.

In [50]:
# TODO: Uncomment the lines below and replace <FILL IN> with the appropriate code
from pyspark.sql.functions import udf, split
from pyspark.sql.types import ArrayType, StructType, StructField, LongType, StringType

parse_point_udf = udf(parse_point, ArrayType(StructType([StructField('_1', LongType()),
                                                         StructField('_2', StringType())])))

def parse_raw_df(raw_df):
#     """Convert a DataFrame consisting of rows of comma separated text into labels and features.

#     Args:
#         raw_df (DataFrame with a 'text' column): DataFrame containing the raw comma separated data.

#     Returns:
#         DataFrame: A DataFrame with 'label' and 'features' columns.
#     """
#     return raw_df.select(<FILL IN>)
#           .<FILL IN>
#           .<FILL IN>
#           .<FILL IN>
#           .<FILL IN>
#           .<FILL IN>
#           .cache()

    return raw_df.select(split('text',",").getItem(0).cast("double").alias("label"), parse_point_udf("text").alias("features") ).cache()



  
# YOUR CODE HERE
#raise NotImplementedError()

# # Parse the raw training DataFrame
parsed_train_df = parse_raw_df(raw_train_df)

# YOUR CODE HERE
#raise NotImplementedError()

from pyspark.sql.functions import (explode, col)
num_categories = (parsed_train_df
                   .select(explode('features').alias('features'))
                   .distinct()
                   .select(col('features').getField('_1').alias('featureNumber'))
                   .groupBy('featureNumber')
                   .sum()
                   .orderBy('featureNumber')
                   .collect())

print(num_categories[2][1])

In [51]:
print(parsed_train_df)

In [52]:
print(parsed_train_df)# TEST Extract features (3c)
assert_true(parsed_train_df.is_cached, 'parse_raw_df should return a cached DataFrame')
assert_equal(num_categories[2][1], 1356, 'incorrect implementation of parse_point or parse_raw_df')


### (3d) Create an OHE dictionary from the dataset

Note that `parse_point` returns a data point in the format of a list of `(featureID, category)` tuples, which is the same format as the sample dataset studied in Parts 1 and 2 of this lab.  Using this observation, create an OHE dictionary from the parsed training data using the function implemented in Part (2c). Note that we will assume for simplicity that all features in our CTR dataset are categorical.

In [54]:
# # TODO: Uncomment the lines below and replace <FILL IN> with appropriate code
# ctr_ohe_dict = <FILL IN>
# num_ctr_ohe_feats = <FILL IN>
# YOUR CODE HERE
#raise NotImplementedError()


ctr_ohe_dict =  create_one_hot_dict(parsed_train_df)
num_ctr_ohe_feats = len(ctr_ohe_dict)
print(ctr_ohe_dict[(0, '4')])    

In [55]:
# TEST Create an OHE dictionary from the dataset (3d)
assert_equal(num_ctr_ohe_feats, 215556, 'incorrect number of features in ctr_ohe_dict')
assert_true((0, '4') in ctr_ohe_dict, 'incorrect features in ctr_ohe_dict')

### (3e) Apply OHE to the dataset

Now let's use this OHE dictionary, by starting with the training data that we've parsed into `label` and `features` columns, to create one-hot-encoded features.  Recall that we created a function `ohe_udf_generator` that can create the UDF that we need to convert row into `features`.  Make sure that `ohe_train_df` contains a `label` and `features` column and is cached.

In [57]:
# # TODO: Uncomment the lines below and replace <FILL IN> with the appropriate code
# ohe_dict_broadcast = <FILL IN>
# ohe_dict_udf = <FILL IN>
# ohe_train_df = (parsed_train_df
#                   <FILL IN>)



ohe_dict_broadcast = sc.broadcast(ctr_ohe_dict)
ohe_dict_udf = ohe_udf_generator(ohe_dict_broadcast)
ohe_train_df = (parsed_train_df.select("label" , ohe_dict_udf("features").alias("features")).cache())



# YOUR CODE HERE
#raise NotImplementedError()

print(ohe_train_df.count())
print(ohe_train_df.take(1))

In [58]:
# TEST Apply OHE to the dataset (3e)
assert_true('label' in ohe_train_df.columns and 'features' in ohe_train_df.columns, 'ohe_train_df should have label and features columns')
assert_true(ohe_train_df.is_cached, 'ohe_train_df should be cached')
num_nz = sum(parsed_train_df.rdd.map(lambda r: len(r[1])).take(5))
num_nz_alt = sum(ohe_train_df.rdd.map(lambda r: len(r[1].indices)).take(5))
assert_equal(num_nz, num_nz_alt, 'incorrect value for ohe_train_df')

### Visualization 1: Feature frequency

We will now visualize the number of times each of the 233,941 OHE features appears in the training data. We first compute the number of times each feature appears, then bucket the features by these counts.  The buckets are sized by powers of 2, so the first bucket corresponds to features that appear exactly once ( \\( \scriptsize 2^0 \\) ), the second to features that appear twice ( \\( \scriptsize 2^1 \\) ), the third to features that occur between three and four ( \\( \scriptsize 2^2 \\) ) times, the fifth bucket is five to eight ( \\( \scriptsize 2^3 \\) ) times and so on. The scatter plot below shows the logarithm of the bucket thresholds versus the logarithm of the number of features that have counts that fall in the buckets.

In [60]:
from pyspark.sql.types import ArrayType, IntegerType
from pyspark.sql.functions import log

get_indices = udf(lambda sv: list(map(int, sv.indices)), ArrayType(IntegerType()))
feature_counts = (ohe_train_df
                   .select(explode(get_indices('features')))
                   .groupBy('col')
                   .count()
                   .withColumn('bucket', log('count').cast('int'))
                   .groupBy('bucket')
                   .count()
                   .orderBy('bucket')
                   .collect())


In [61]:
import matplotlib.pyplot as plt

x, y = zip(*feature_counts)
x, y = x, np.log(y)

def prepare_plot(xticks, yticks, figsize=(10.5, 6), hide_labels=False, grid_color='#999999',
                 grid_width=1.0):
    """Template for generating the plot layout."""
    plt.close()
    fig, ax = plt.subplots(figsize=figsize, facecolor='white', edgecolor='white')
    ax.axes.tick_params(labelcolor='#999999', labelsize='10')
    for axis, ticks in [(ax.get_xaxis(), xticks), (ax.get_yaxis(), yticks)]:
        axis.set_ticks_position('none')
        axis.set_ticks(ticks)
        axis.label.set_color('#999999')
        if hide_labels: axis.set_ticklabels([])
    plt.grid(color=grid_color, linewidth=grid_width, linestyle='-')
    map(lambda position: ax.spines[position].set_visible(False), ['bottom', 'top', 'left', 'right'])
    return fig, ax

# generate layout and plot data
fig, ax = prepare_plot(np.arange(0, 12, 1), np.arange(0, 14, 2))
ax.set_xlabel(r'$\log_e(bucketSize)$'), ax.set_ylabel(r'$\log_e(countInBucket)$')
plt.scatter(x, y, s=14**2, c='#d6ebf2', edgecolors='#8cbfd0', alpha=0.75)
display(fig)

### (3f) Handling unseen features

We naturally would like to repeat the process from Part (3e), to compute OHE features for the validation and test datasets.  However, we must be careful, as some categorical values will likely appear in new data that did not exist in the training data. To deal with this situation, update the `one_hot_encoding()` function from Part (1d) to ignore previously unseen categories, and then compute OHE features for the validation data.  Remember that you can parse a raw DataFrame using `parse_raw_df`.
> Note: you'll have to generate a new UDF using `ohe_udf_generator` so that the updated `one_hot_encoding` function is used.  And make sure to cache `ohe_validation_df`.

In [63]:
# TODO: Uncomment the lines below and replace <FILL IN> with appropriate code
def one_hot_encoding(raw_feats, ohe_dict_broadcast, num_ohe_feats):
    """Produce a one-hot-encoding from a list of features and an OHE dictionary.

    Note:
        You should ensure that the indices used to create a SparseVector are sorted, and that the
        function handles missing features.

    Args:
        raw_feats (list of (int, str)): The features corresponding to a single observation.  Each
            feature consists of a tuple of featureID and the feature's value. (e.g. sample_one)
        ohe_dict_broadcast (Broadcast of dict): Broadcast variable containing a dict that maps
            (featureID, value) to unique integer.
        num_ohe_feats (int): The total number of unique OHE features (combinations of featureID and
            value).

    Returns:
        SparseVector: A SparseVector of length num_ohe_feats with indices equal to the unique
            identifiers for the (featureID, value) combinations that occur in the observation and
            with values equal to 1.0.
    """

#     ohe_feature = <FILL IN>
#     return SparseVector(<FILL IN>)

    ohe_features = [ohe_dict_broadcast.value[feature] for feature in  raw_feats if feature in ohe_dict_broadcast.value]
    ohe_features.sort()
    return SparseVector(num_ohe_feats, ohe_features, [1 for i in range(len(ohe_features))])

# YOUR CODE HERE
#raise NotImplementedError()

# ohe_dict_missing_udf = <FILL IN>
# ohe_validation_df = (<FILL IN>)

ohe_dict_missing_udf = ohe_udf_generator(ohe_dict_broadcast)
ohe_validation_df = parse_raw_df(raw_validation_df).select('label', ohe_dict_missing_udf('features').alias('features')).cache()

# YOUR CODE HERE
#raise NotImplementedError()

ohe_validation_df.count()
display(ohe_validation_df) # replace with ohe_validate_df.show() if running outside of Databricks

label,features
1.0,"List(0, 215556, List(6, 9, 22, 165, 168, 271, 35880, 35882, 35890, 35907, 35927, 35988, 36079, 36141, 36150, 36286, 37957, 43131, 71635, 71638, 71664, 71700, 72063, 73636, 78836, 81978, 107883, 107907, 107949, 107958, 108060, 108127, 115070, 115071, 179594, 179598, 180270, 181174, 186813), List(1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0))"
0.0,"List(0, 215556, List(0, 1, 3, 9, 13, 14, 20, 60, 454, 602, 2058, 2766, 35881, 35882, 35907, 36310, 71630, 71632, 71637, 71642, 72077, 72173, 73731, 74437, 74438, 107881, 107898, 107931, 110643, 110644, 143872, 143918, 144206, 146595, 180003, 180077, 182342, 182404, 186319), List(1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0))"
0.0,"List(0, 215556, List(1, 6, 9, 25, 49, 107, 2271, 35880, 35881, 35882, 35891, 35907, 35959, 36017, 36630, 37057, 71638, 71707, 71913, 105838, 105839, 105840, 107884, 107898, 107948, 144268, 144488, 145510, 177533, 179589, 179598, 179661, 179809, 179812, 179895, 181682), List(1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0))"
0.0,"List(0, 215556, List(9, 12, 13, 14, 170, 177, 286, 739, 5567, 5568, 35880, 35881, 35882, 35927, 36132, 36278, 36382, 38608, 71632, 71635, 71638, 71670, 72549, 107881, 107972, 107993, 143886, 143898, 143900, 179576, 179581, 179682, 180205, 180822, 185150), List(1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0))"
0.0,"List(0, 215556, List(1, 9, 12, 20, 25, 189, 201, 518, 815, 816, 35882, 35884, 35892, 36024, 36420, 36421, 36422, 71630, 71632, 71641, 72711, 107881, 107883, 107886, 107896, 108376, 108377, 143872, 143876, 145004, 179582, 179598, 179625, 179850, 180131, 180133, 180448, 182234, 194008), List(1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0))"
1.0,"List(0, 215556, List(1, 6, 9, 12, 13, 14, 17, 30, 290, 291, 1712, 2888, 16147, 35882, 35889, 35907, 36005, 36136, 36219, 38554, 38828, 71632, 71636, 71680, 71867, 71956, 74539, 107884, 108169, 143911, 144143, 144149, 146376, 179625, 179632, 179671, 179676, 179690, 179891), List(1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0))"
0.0,"List(0, 215556, List(1, 6, 12, 13, 25, 26, 38, 39, 218, 1535, 13016, 35880, 35882, 35884, 35890, 35893, 36380, 37452, 71630, 71632, 71641, 71651, 71677, 71963, 71965, 107882, 107884, 107886, 107898, 108172, 143876, 145381, 179591, 179632, 179676, 179690, 179894, 179931, 183691), List(1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0))"
0.0,"List(0, 215556, List(1, 12, 13, 14, 49, 53, 64, 632, 21011, 21012, 21013, 35882, 35890, 35907, 36107, 36686, 71630, 71700, 74608, 92729, 107898, 107906, 108002, 108149, 128802, 143881, 143898, 143900, 144342, 144455, 148131, 179596, 179600, 179609, 179614, 179658, 179691, 179705, 179980), List(1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0))"
0.0,"List(0, 215556, List(6, 12, 13, 25, 35881, 35882, 35884, 35886, 35907, 35967, 35968, 36050, 44155, 44156, 71630, 71632, 71851, 77806, 107882, 107883, 107884, 107886, 107931, 108493, 143876, 143915, 143917, 143955, 144054, 144056, 145824, 145826, 149640, 179666, 179682, 179816, 179915), List(1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0))"
0.0,"List(0, 215556, List(3, 6, 9, 13, 18, 25, 30, 170, 183, 285, 2270, 2271, 12526, 12527, 35882, 35891, 35907, 35959, 35998, 36011, 36442, 37057, 48474, 71632, 71635, 71638, 71700, 71707, 84187, 107883, 107884, 107958, 120367, 143886, 143902, 145955, 179625, 179661, 180134), List(1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0))"


In [64]:
# TEST Handling unseen features (3f)
from pyspark.sql.functions import size, sum as sqlsum

assert_true(ohe_validation_df.is_cached, 'you need to cache ohe_validation_df')
num_nz_val = (ohe_validation_df
                .select(sqlsum(size(get_indices('features'))))
                .first()[0])

nz_expected = 367573
assert_equal(num_nz_val, nz_expected, 'incorrect number of features: Got {0}, expected {1}'.format(num_nz_val, nz_expected))

## Part 4: CTR prediction and logloss evaluation

### (4a) Logistic regression

We are now ready to train our first CTR classifier.  A natural classifier for this setting is logistic regression, since it models the probability of a click-through event rather than returning a simple binary "yes" or "no". Also, when working with rare events like clicking-through, probabilistic predictions are usually more accurate.

First use [LogisticRegression](https://spark.apache.org/docs/latest/api/python/pyspark.ml.html#pyspark.ml.classification.LogisticRegression) from the pyspark.ml package to train a model using `ohe_train_df` with a given hyperparameter configuration.  `LogisticRegression.fit` returns a [LogisticRegressionModel](https://spark.apache.org/docs/latest/api/python/pyspark.ml.html#pyspark.ml.classification.LogisticRegressionModel).  Next, we'll use the `LogisticRegressionModel.coefficients` and `LogisticRegressionModel.intercept` to print out some details of the model's parameters.  Note that these are the names of the object's attributes and should be called using a syntax like `model.coefficients` for a given `model`.

In [67]:
# TODO: Uncomment the lines below and replace <FILL IN> with appropriate code
# Given hyperparameters
standardization = False
elastic_net_param = 0.0
reg_param = .01
max_iter = 20

from pyspark.ml.classification import LogisticRegression
# lr = (<FILL IN>)
lr = LogisticRegression(standardization=standardization,
                       elasticNetParam=elastic_net_param,
                       regParam=reg_param,
                       maxIter=max_iter)

# lr_model_basic = <FILL IN>
lr_model_basic = lr.fit(ohe_train_df)

# YOUR CODE HERE
#raise NotImplementedError()

print('intercept: {0}'.format(lr_model_basic.intercept))
print('length of coefficients: {0}'.format(len(lr_model_basic.coefficients)))
sorted_coefficients = sorted(lr_model_basic.coefficients)[:5]

In [68]:
# TEST Logistic regression (4a)
assert_true(np.allclose(lr_model_basic.intercept,  -1.1870497039599432, atol=1e-2), 'incorrect value for model intercept')
assert_true(np.allclose(sorted_coefficients,
                           [-0.10347285277044568, -0.10296978958368273, -0.10296978958368273, -0.10296978958368273, -0.10296978958368273], atol=1e-2),
                           'incorrect value for model coefficients')

### (4b) Log loss
Throughout this lab, we will use log loss to evaluate the quality of models.  Log loss is defined as: \\[ \scriptsize \ell_{log}(p, y) = \begin{cases} -\log (p) & \text{if } y = 1 \\\ -\log(1-p) & \text{if } y = 0 \end{cases} \\] where \\( \scriptsize p\\) is a probability between 0 and 1 and \\( \scriptsize y\\) is a label of either 0 or 1. Log loss is a standard evaluation criterion when predicting rare-events such as click-through rate prediction.

Write a function `add_log_loss` for a DataFrame, and evaluate it on sample inputs.  This operation does not require a UDF.  You can perform a conditional branching with DataFrame columns using [when](http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.functions.when).

In [70]:
# Some example data
example_log_loss_df = sqlContext.createDataFrame([(.5, 1), (.5, 0), (.99, 1), (.99, 0), (.01, 1),
                                                  (.01, 0), (1., 1), (.0, 1), (1., 0)], ['p', 'label'])
example_log_loss_df.show()

In [71]:
# TODO: Uncomment the lines below and replace <FILL IN> with appropriate code
from pyspark.sql.functions import when, log, col
epsilon = 1e-16

def add_log_loss(df):
    """Computes and adds a 'log_loss' column to a DataFrame using 'p' and 'label' columns.

    Note:
        log(0) is undefined, so when p is 0 we add a small value (epsilon) to it and when
        p is 1 we subtract a small value (epsilon) from it.

    Args:
        df (DataFrame with 'p' and 'label' columns): A DataFrame with a probability column
            'p' and a 'label' column that corresponds to y in the log loss formula.

    Returns:
        DataFrame: A new DataFrame with an additional column called 'log_loss'
    """
#     return df.withColum(<FILL IN>)
#becareful of the epsilon!!!
    return df.withColumn("log_loss",when( df.label == 0.0,-log(1 - df.p + epsilon) ).otherwise( -log(df.p + epsilon)) )
  

# YOUR CODE HERE
#raise NotImplementedError()

add_log_loss(example_log_loss_df).show()

In [72]:
# TEST Log loss (4b)
log_loss_values = add_log_loss(example_log_loss_df).select('log_loss').rdd.map(lambda r: r[0]).collect()
assert_true(np.allclose(log_loss_values[:-2],
                            [0.6931471805599451, 0.6931471805599451, 0.010050335853501338, 4.60517018598808,
                             4.605170185988081, 0.010050335853501338, -0.0], atol=1e-2), 'log loss is not correct')
assert_true(not(any(map(lambda x: x is None, log_loss_values[-2:]))),
                'log loss needs to bound p away from 0 and 1 by epsilon')

### (4c)  Baseline log loss

Next we will use the function we have in Part (4b) to compute the baseline log loss of the training data. A very simple yet natural baseline model is that we always make the same prediction regardless of datapoints, therefore the predicted value would be equal to the fraction of training points that correspond to click-through events (i.e., where the label is one). Compute this value (which is simply the mean of the training labels), and then use it to compute the training log loss for the baseline model.

> Note: you'll need to add a `p` column to the `ohe_train_df` DataFrame so that it can be used in your function from Part (4b).  To represent a constant value as a column you can use the [lit](http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.functions.lit) function to wrap the value.

In [74]:
# TODO: Uncomment the lines below and replace <FILL IN> with appropriate code
# Note that our dataset has a very high click-through rate by design
# In practice click-through rate can be one to two orders of magnitude lower

from pyspark.sql.functions import lit
# class_one_frac_train = (<FILL IN>)
class_one_frac_train = ohe_train_df.selectExpr("avg(label)").first()[0]

# YOUR CODE HERE
#raise NotImplementedError()
print('Training class one fraction = {0:.3f}'.format(class_one_frac_train))

# log_loss_tr_base = (<FILL IN>)
log_loss_tr_base = add_log_loss(ohe_train_df.withColumn( "p",lit(class_one_frac_train)) ).selectExpr("avg(log_loss)").first()[0]

# YOUR CODE HERE
#raise NotImplementedError()
#rint('Baseline Train Logloss = {0:.3f}\n'.format(log_loss_tr_base))

In [75]:
# TEST Baseline log loss (4c)
expected_frac = 0.2339125
expected_log_loss = 0.5439608117656105
assert_true(np.allclose(class_one_frac_train, expected_frac, atol=1e-2), 'incorrect value for class_one_frac_train. Got {0}, expected {1}'.format(class_one_frac_train, expected_frac))
assert_true(np.allclose(log_loss_tr_base, expected_log_loss, atol=1e-2), 'incorrect value for log_loss_tr_base. Got {0}, expected {1}'.format(log_loss_tr_base, expected_log_loss))

### (4d) Predict probability

In order to compute the log loss for the model we trained in Part (4a), we need to generate predictions from this model. Write a function that computes the raw linear prediction from this logistic regression model and then passes it through a [sigmoid function](http://en.wikipedia.org/wiki/Sigmoid_function) \\( \scriptsize \sigma(t) = (1+ e^{-t})^{-1} \\) to return the model's probabilistic prediction. Then compute probabilistic predictions on the training data.

Note that when incorporating an intercept into our predictions, we simply add the intercept to the value of the prediction obtained from the weights and features.  Alternatively, if the intercept was included as the first weight, we would need to add a corresponding feature to our data where the feature has the value one.  This is not the case here.

In [77]:
# TODO: Uncomment the lines below and replace <FILL IN> with appropriate code
from pyspark.sql.types import DoubleType
from math import exp #  exp(-t) = e^-t
import math

def add_probability(df, model):
    """Adds a probability column ('p') to a DataFrame given a model"""
    coefficients_broadcast = sc.broadcast(model.coefficients)
    intercept = model.intercept

    def get_p(features):
        """Calculate the probability for an observation given a list of features.

        Note:
            We'll bound our raw prediction between 20 and -20 for numerical purposes.

        Args:
            features: the features

        Returns:
            float: A probability between 0 and 1.
        """
#         # Compute the raw value
#         raw_prediction = <FILL IN>
#         # Bound the raw value between 20 and -20
#         raw_prediction = <FILL IN>
#         # Return the probability
#         <FILL IN>

        # Compute the raw value
        raw_prediction = features.dot( coefficients_broadcast.value )+intercept
        # Bound the raw value between 20 and -20
        raw_prediction =  min(20, max(-20, raw_prediction))
        # Return the probability
        return math.pow((1 + exp(-raw_prediction)) , -1)



        
# YOUR CODE HERE
#raise NotImplementedError()

    get_p_udf = udf(get_p, DoubleType())
    return df.withColumn("p", get_p_udf("features"))

add_probability_model_basic = lambda df: add_probability(df, lr_model_basic)
training_predictions = add_probability_model_basic(ohe_train_df).cache()

training_predictions.show(5)

In [78]:
# TEST Predicted probability (4d)
expected = 18746.356946150863
got = training_predictions.selectExpr('sum(p)').first()[0]
assert_true(np.allclose(got, expected, atol=1e-2),
                'incorrect value for training_predictions. Got {0}, expected {1}'.format(got, expected))

### (4e) Evaluate the model

We are now ready to evaluate the performance of the model we trained in Part (4a). To do this, first write a general function that takes a model and a DataFrame as its input, and outputs the log loss. Note that the log loss for multiple observations should be the mean of all the individual log losses. Then run this function on the OHE training data, and compare the result with the baseline log loss.

In [80]:
# TODO: Uncomment the lines below and replace <FILL IN> with appropriate code
def evaluate_results(df, model, baseline=None):
    """Calculates the log loss for the data given the model.

    Note:
        If baseline has a value the probability should be set to baseline before
        the log loss is calculated.  Otherwise, use add_probability to add the
        appropriate probabilities to the DataFrame.

    Args:
        df (DataFrame with 'label' and 'features' columns): A DataFrame containing
            labels and features.
        model (LogisticRegressionModel): A trained logistic regression model. This
            can be None if baseline is set.
        baseline (float): A baseline probability to use for the log loss calculation.

    Returns:
        float: Log loss for the data.
    """
    
#     with_probability_df = <FILL IN>
#     with_log_loss_df = <FILL IN>
#     log_loss = <FILL IN>
#     return log_loss
  
  
    if model:
      with_probability_df = add_probability(df, model)
    else:
      with_probability_df = df.withColumn("p", lit(baseline))
    
    with_log_loss_df = add_log_loss(with_probability_df)
    log_loss = with_log_loss_df.selectExpr("avg(log_loss)").first()[0]
    return log_loss



# YOUR CODE HERE
#raise NotImplementedError()


log_loss_train_model_basic = evaluate_results(ohe_train_df, lr_model_basic)
print('OHE Features Train Logloss:\n\tBaseline = {0:.3f}\n\tLogReg = {1:.3f}'
       .format(log_loss_tr_base, log_loss_train_model_basic))

In [81]:
# TEST Evaluate the model (4e)
expected_log_loss = 0.48971226940239815
assert_true(np.allclose(log_loss_train_model_basic, expected_log_loss, atol=1e-2),
                'incorrect value for log_loss_train_model_basic. Got {0}, expected {1}'.format(log_loss_train_model_basic, expected_log_loss))
expected_res = 0.6931471805600546
res = evaluate_results(ohe_train_df, None,  0.5)
assert_true(np.allclose(res, expected_res, atol=1e-2),
                'evaluate_results needs to handle baseline models. Got {0}, expected {1}'.format(res, expected_res))

### (4f) Log loss of validation dataset

Next, use the `evaluate_results` function and compute the log loss of validation dataset for both the baseline and logistic regression models. Notably, the baseline model for the validation dataset should still be based on the label fraction from the training dataset.

In [83]:
# # TODO: Uncomment the lines below and replace <FILL IN> with appropriate code
# log_loss_val_base = <FILL IN>
log_loss_val_base =  evaluate_results(ohe_validation_df,None, class_one_frac_train)

# log_loss_val_l_r0 = <FILL IN>
log_loss_val_l_r0 = evaluate_results(ohe_validation_df,lr_model_basic)

# YOUR CODE HERE
#raise NotImplementedError()

print(('OHE Features Validation Logloss:\n\tBaseline = {0:.3f}\n\tLogReg = {1:.3f}'
       .format(log_loss_val_base, log_loss_val_l_r0)))

In [84]:
# TEST Validation log loss (4f)
expected_val_base = 0.6344644324013423
assert_true(np.allclose(log_loss_val_base, expected_val_base, atol=1e-2),
                'incorrect value for log_loss_val_base. Got {0}, expected {1}'.format(log_loss_val_base, expected_val_base))
expected_val_model_basic = 0.5793520014798194
assert_true(np.allclose(log_loss_val_l_r0, expected_val_model_basic, atol=1e-2),
                'incorrect value for log_loss_val_l_r0. Got {0}, expected {1}'.format(log_loss_val_l_r0, expected_val_model_basic))

### Visualization 2: ROC curve

We will now visualize the performance of our model.  We generate a plot called the ROC curve.  The ROC curve shows us the trade-off between the false positive rate and true positive rate, as we liberalizing the threshold required for a positive prediction.  The performance of a random model is represented by the dashed line in the plot.

In [86]:
labels_and_scores = add_probability_model_basic(ohe_validation_df).select('label', 'p')
labels_and_weights = labels_and_scores.collect()
labels_and_weights.sort(key=lambda x: x[1], reverse=True)
labels_by_weight = np.array([k for (k, v) in labels_and_weights])

length = labels_by_weight.size
true_positives = labels_by_weight.cumsum()
num_positive = true_positives[-1]
false_positives = np.arange(1.0, length + 1, 1.) - true_positives

true_positive_rate = true_positives / num_positive
false_positive_rate = false_positives / (length - num_positive)

# Generate layout and plot data
fig, ax = prepare_plot(np.arange(0., 1.1, 0.1), np.arange(0., 1.1, 0.1))
ax.set_xlim(-.05, 1.05), ax.set_ylim(-.05, 1.05)
ax.set_ylabel('True Positive Rate (Sensitivity)')
ax.set_xlabel('False Positive Rate (1 - Specificity)')
plt.plot(false_positive_rate, true_positive_rate, color='#8cbfd0', linestyle='-', linewidth=3.)
plt.plot((0., 1.), (0., 1.), linestyle='--', color='#d6ebf2', linewidth=2.)  # Baseline model
display(fig)

## Part 5: Reduce features' dimension via feature hashing

### (5a) Hash function

As we just saw, using an one-hot-encoding featurization can yield a model with good statistical accuracy.  However, the number of distinct categories across all features is quite large -- recall that we observed 233K categories in the training data in Part (3c).  Moreover, the full training dataset includes more than 33M distinct categories, and the training dataset itself is just a small subset of labeled data in real world.  Hence, featurizing via an one-hot-encoding representation could lead to a very large feature vector. To reduce the dimensionality of the feature space, we will use feature hashing.

Below is a hash function that we will use for this part of the lab.  We will first use this hash function with the three sample data points from Part (1a) to gain some intuition.  Implement the following code to hash those three sample points using two different values for `numBuckets` and observe the resulting hashed feature dictionaries.

In [89]:
from collections import defaultdict
import functools 
import hashlib

def hash_function(raw_feats, num_buckets, print_mapping=False):
    """Calculate a feature dictionary for an observation's features based on hashing.

    Note:
        Use print_mapping=True for debug purposes and to better understand how the hashing works.

    Args:
        raw_feats (list of (int, str)): A list of features for an observation.  Represented as
            (featureID, value) tuples.
        num_buckets (int): Number of buckets to use as features.
        print_mapping (bool, optional): If true, the mappings of featureString to index will be
            printed.

    Returns:
        dict of int to float:  The keys will be integers which represent the buckets that the
            features have been hashed to.  The value for a given key will contain the count of the
            (featureID, value) tuples that have hashed to that key.
    """
    mapping = { category + ':' + str(ind):
                int(int(hashlib.md5((category + ':' + str(ind)).encode('utf-8')).hexdigest(), 16) % num_buckets)
                for ind, category in raw_feats}
    if(print_mapping): print(mapping)

    def map_update(l, r):
        l[r] += 1.0
        return l

    sparse_features = functools.reduce(map_update, mapping.values(), defaultdict(float))
    return dict(sparse_features)

# Reminder of the sample values:
# sample_one = [(0, 'mouse'), (1, 'black')]
# sample_two = [(0, 'cat'), (1, 'tabby'), (2, 'mouse')]
# sample_three =  [(0, 'bear'), (1, 'black'), (2, 'salmon')]

In [90]:
# # TODO: Uncomment the lines below and replace <FILL IN> with appropriate code
# # Use four buckets
# samp_one_four_buckets = hash_function(sample_one, < FILL IN >, True)
# samp_two_four_buckets = hash_function(sample_two, < FILL IN >, True)
# samp_three_four_buckets = hash_function(sample_three, < FILL IN >, True)

samp_one_four_buckets = hash_function(sample_one,4,True)
samp_two_four_buckets = hash_function(sample_two,4,True)
samp_three_four_buckets = hash_function(sample_three,4,True)


# # Use one hundred buckets
# samp_one_hundred_buckets = hash_function(sample_one, < FILL IN >, True)
# samp_two_hundred_buckets = hash_function(sample_two, < FILL IN >, True)
# samp_three_hundred_buckets = hash_function(sample_three, < FILL IN >, True)

samp_one_hundred_buckets = hash_function(sample_one,100,True)
samp_two_hundred_buckets = hash_function(sample_two,100,True)
samp_three_hundred_buckets = hash_function(sample_three,100,True)

# YOUR CODE HERE
#raise NotImplementedError()

print('\n\t\t 4 Buckets \t\t\t 100 Buckets')
print('Sample One:\t {0}\t\t\t {1}'.format(samp_one_four_buckets, samp_one_hundred_buckets))
print('Sample Two:\t {0}\t\t {1}'.format(samp_two_four_buckets, samp_two_hundred_buckets))
print('Sample Three:\t {0}\t {1}'.format(samp_three_four_buckets, samp_three_hundred_buckets))

In [91]:
# TEST Hash function (5a)
assert_equal(samp_one_four_buckets, {3: 2.0}, 'incorrect value for samp_one_four_buckets')
assert_equal(samp_three_hundred_buckets, {80: 1.0, 82: 1.0, 51: 1.0},
                  'incorrect value for samp_three_hundred_buckets')

### (5b) Create hashed features

Next, we will use this hash function to create hashed features for our CTR datasets. Use the given UDF to create a function that takes in a DataFrame and returns both labels and hashed features.  Then use it to create new training, validation and test datasets with hashed features.

In [93]:
# TODO: Uncomment the lines below and replace <FILL IN> with appropriate code
from pyspark.ml.linalg import Vectors
num_hash_buckets = 2 ** 15

# UDF that returns a vector of hashed features given an Array of tuples
tuples_to_hash_features_udf = udf(lambda x: Vectors.sparse(num_hash_buckets, hash_function(x, num_hash_buckets)), VectorUDT())

def add_hashed_features(df):
    """Return a DataFrame with labels and hashed features.

    Note:
        Make sure you cache the DataFrame that you are returning.

    Args:
        df (DataFrame with 'label' and 'features' column): A DataFrame containing labels and the features to be hashed.

    Returns:
        DataFrame: A DataFrame with a 'label' column and a 'features' column that contains a
            SparseVector of hashed features.
    """
#     return     <FILL IN>
    return df.select( "label" , tuples_to_hash_features_udf("features").alias("features") )

# YOUR CODE HERE
#raise NotImplementedError()

# hash_train_df = <FILL IN>
# hash_validation_df = <FILL IN>
# hash_test_df = <FILL IN>

hash_train_df = add_hashed_features(parse_raw_df(raw_train_df))
hash_validation_df = add_hashed_features(parse_raw_df(raw_validation_df))
hash_test_df = add_hashed_features(parse_raw_df(raw_test_df))

# YOUR CODE HERE
#raise NotImplementedError()

hash_train_df.show()

In [94]:
# TEST Creating hashed features (5b)
hash_train_df_feature_sum = sum(hash_train_df
                                  .rdd
                                  .map(lambda r: sum(r[1].indices))
                                  .take(10))
hash_validation_df_feature_sum = sum(hash_validation_df
                                       .rdd
                                       .map(lambda r: sum(r[1].indices))
                                       .take(10))
hash_test_df_feature_sum = sum(hash_test_df
                                 .rdd
                                 .map(lambda r: sum(r[1].indices))
                                 .take(10))

expected_train_sum = 6333443
assert_equal(hash_train_df_feature_sum, expected_train_sum,
                  'incorrect number of features in hash_train_df. Got {0}, expected {1}'.format(hash_train_df_feature_sum, expected_train_sum))

expected_validation_sum = 6340030
assert_equal(hash_validation_df_feature_sum, expected_validation_sum,
                  'incorrect number of features in hash_validation_df. Got {0}, expected {1}'.format(hash_validation_df_feature_sum, expected_validation_sum))

### (5c) Sparsity

Since we only have 33K hashed features versus 233K OHE features, we could expect our OHE features to be sparser. Verify this hypothesis by computing the average sparsity of the OHE and the hashed training datasets.

Note that if you have a `SparseVector` named `sparse`, calling `len(sparse)` returns the total number of features, not the number features with entries.  `SparseVector` objects have the attributes `indices` and `values` that contain information about non-zero features.  Continuing with our example, these can be accessed using `sparse.indices` and `sparse.values`, respectively.

In [96]:
# TODO: Uncomment the lines below and replace <FILL IN> with appropriate code
def vector_feature_sparsity(sparse_vector):
    """Calculates the sparsity of a SparseVector.

    Args:
        sparse_vector (SparseVector): The vector containing the features.

    Returns:
        float: The ratio of features found in the vector to the total number of features.
    """
#     return     <FILL IN>
    return  len(sparse_vector.values)/len(sparse_vector)

# YOUR CODE HERE
#raise NotImplementedError()

a_sparse_vector = Vectors.sparse(5, {0: 1.0, 3: 1.0})
a_sparse_vector_sparsity = vector_feature_sparsity(a_sparse_vector)
print('This vector should have sparsity 2/5 or .4.')
print('Sparsity = {0:.2f}.'.format(a_sparse_vector_sparsity))

In [97]:
# TEST Sparsity (5c)
assert_equal(a_sparse_vector_sparsity, .4,
                'incorrect value for a_sparse_vector_sparsity')

### (5d) Sparsity continued

Now we have a function to calculate vector sparsity, we'll wrap it up in a UDF and apply it to the entire DataFrame to obtain the average sparsity of features in that DataFrame.  We'll use this function to calculate the average sparsity of both the one-hot-encoded training DataFrame and  the hashed training DataFrame.

In [99]:
# TODO: Uncomment the lines below and replace <FILL IN> with appropriate code
feature_sparsity_udf = udf(vector_feature_sparsity, DoubleType())

def get_sparsity(df):
    """Calculates the average sparsity for the features in a DataFrame.

    Args:
        df (DataFrame with 'features' column): A DataFrame with sparse features.

    Returns:
        float: The average feature sparsity.
    """
#     return (<FILL IN>)
    return df.select(feature_sparsity_udf("features").alias("sparsity")).selectExpr("avg(sparsity)").first()[0]

# YOUR CODE HERE
#raise NotImplementedError()

# average_sparsity_ohe = <FILL IN>
# average_sparsity_hash = <FILL IN>

average_sparsity_ohe = get_sparsity(ohe_train_df)
average_sparsity_hash = get_sparsity(hash_train_df)

# YOUR CODE HERE
#raise NotImplementedError()

print('Average OHE Sparsity: {0:.7e}'.format(average_sparsity_ohe))
print('Average Hash Sparsity: {0:.7e}'.format(average_sparsity_hash))

In [100]:
# TEST Sparsity (5d)
expected_ohe = 1.8092746e-04
assert_true(np.allclose(average_sparsity_ohe, expected_ohe, atol=1e-2),
                'incorrect value for average_sparsity_ohe. Got {0}, expected {1}'.format(average_sparsity_ohe, expected_ohe))
expected_hash = 1.1895943e-03
assert_true(np.allclose(average_sparsity_hash, expected_hash, atol=1e-2),
                'incorrect value for average_sparsity_hash. Got {0}, expected {1}'.format(average_sparsity_hash, expected_hash))

### (5e) Logistic model with hashed features

Now let's train a logistic regression model using the hashed training features. Use the given hyperparameters, train and fit the model, then evaluate the log loss on the training set.

In [102]:
# TODO: Uncomment the lines below and replace <FILL IN> with appropriate code
# Given hyperparameters
standardization = False
elastic_net_param = 0.7
reg_param = .001
max_iter = 20

# lr_hash = (<FILL IN>)

# lr_model_hashed = <FILL IN>

lr_hash = LogisticRegression(standardization=standardization,
                            elasticNetParam=elastic_net_param,
                            regParam=reg_param,
                            maxIter=max_iter)


lr_model_hashed = lr_hash.fit(hash_train_df)

# YOUR CODE HERE
#raise NotImplementedError()

print('intercept: {0}'.format(lr_model_hashed.intercept))
print(len(lr_model_hashed.coefficients))

log_loss_train_model_hashed = evaluate_results(hash_train_df, lr_model_hashed)
print(('OHE Features Train Logloss:\n\tBaseline = {0:.3f}\n\thashed = {1:.3f}'
       .format(log_loss_tr_base, log_loss_train_model_hashed)))

In [103]:
# TEST Logistic model with hashed features (5e)
expected =  0.481478172974873
assert_true(np.allclose(log_loss_train_model_hashed, expected, atol=1e-2),
                'incorrect value for log_loss_train_model_hashed. Got {0}, expected {1}'.format(log_loss_train_model_hashed, expected))

### (5f) Evaluate the performance on the test set

Finally, we will evaluate the model from Part (5e) on the test set.  Compare the resulting log loss with the baseline log loss on the test set, which can be computed in the same way where the validation log loss was computed in Part (4f).

In [105]:
# # TODO: Uncomment the lines below and replace <FILL IN> with appropriate code
# # Log loss for the best model from (5e)
# log_loss_test = <FILL IN>
log_loss_test = evaluate_results(hash_test_df,lr_model_hashed)

# YOUR CODE HERE
#raise NotImplementedError()

# ## Log loss for the baseline model
# class_one_frac_test = <FILL IN>
# print('Class one fraction for test data: {0}'.format(class_one_frac_test))
# log_loss_test_baseline = <FILL IN>

class_one_frac_test = hash_test_df.selectExpr('avg(label)').first()[0]
print('Class one fraction for test data: {0}'.format(class_one_frac_test))
log_loss_test_baseline = evaluate_results(hash_test_df, None, class_one_frac_test)

# YOUR CODE HERE
#raise NotImplementedError()

print(('Hashed Features Test Log Loss:\n\tBaseline = {0:.3f}\n\tLogReg = {1:.3f}'
       .format(log_loss_test_baseline, log_loss_test)))

In [106]:
# TEST Evaluate on the test set (5f)
expected_test_baseline = 0.6257336255412367
assert_true(np.allclose(log_loss_test_baseline, expected_test_baseline, atol=1e-2),
                'incorrect value for log_loss_test_baseline. Got {0}, expected {1}'.format(log_loss_test_baseline, expected_test_baseline))
expected_test = 0.5748571343610652
assert_true(np.allclose(log_loss_test, expected_test, atol=1e-2),
                'incorrect value for log_loss_test. Got {0}, expected {1}'.format(log_loss_test, expected_test))

# Principle Component Analysis

In this part we will dive into exploratory analysis of neuroscience data, specifically using principal component analysis (PCA) and feature-based aggregation. We will use a dataset of light-sheet imaging recorded by the [Ahrens Lab](http://www.janelia.org/lab/ahrens-lab) at Janelia Research Campus.

Our dataset is generated by studying the movement of a larval [zebrafish](http://en.wikipedia.org/wiki/Zebrafish), an animal that is especially useful in neuroscience because it is transparent, making it possible to record activity over its entire brain using a technique called [light-sheet microscopy](http://en.wikipedia.org/wiki/Light_sheet_fluorescence_microscopy).   Specifically, we'll work with time-varying images containing patterns of the zebrafish's neural activity as it is presented with a moving visual pattern.   Different stimuli induce different patterns across the brain, and we can use exploratory analyses to identify these patterns.  Read ["Mapping brain activity at scale with cluster computing"](http://thefreemanlab.com/work/papers/freeman-2014-nature-methods.pdf) for more information about these kinds of analyses.

During this lab you will learn about PCA, and then compare and contrast different exploratory analyses of the same data set to identify which neural patterns they best highlight.

## This section will cover:

*  *Part 1:* Work through the steps of PCA on a sample dataset
 * *Visualization 1:* Two-dimensional Gaussians

*  *Part 2:* Write a PCA function and evaluate PCA on sample datasets
 * *Visualization 2:* PCA projection
 * *Visualization 3:* Three-dimensional data
 * *Visualization 4:* 2D representation of 3D data

*  *Part 3:* Parse, inspect, and preprocess neuroscience data then perform PCA
 * *Visualization 5:* Pixel intensity
 * *Visualization 6:* Normalized data
 * *Visualization 7:* Top two components as images
 * *Visualization 8:* Top two components as one image

*  *Part 4:* Perform feature-based aggregation followed by PCA
 * *Visualization 9:* Top two components by time
 * *Visualization 10:* Top two components by direction

> Note that, for reference, you can look up the details of:
> * the relevant Spark methods in [PySpark's DataFrame API](https://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.DataFrame)
> * the relevant NumPy methods in the [NumPy Reference](http://docs.scipy.org/doc/numpy/reference/index.html)

## Part 1: Work through the steps of PCA on a sample dataset

### Visualization 1: Two-dimensional Gaussians

Principal Component Analysis, or PCA, is a strategy for dimensionality reduction. To better understand PCA, we'll work with synthetic data generated by sampling from the [two-dimensional Gaussian distribution](http://en.wikipedia.org/wiki/Multivariate_normal_distribution).  This distribution takes as input the mean and variance of each dimension, as well as the covariance between the two dimensions.

In our visualizations below, we will specify the mean of each dimension to be 50 and the variance along each dimension to be 1.  We will explore two different values for the covariance: 0 and 0.9. When the covariance is zero, the two dimensions are uncorrelated, and hence the data looks spherical.  In contrast, when the covariance is 0.9, the two dimensions are strongly (positively) correlated and thus the data is non-spherical.  As we'll see in Parts 1 and 2, the non-spherical data is amenable to dimensionality reduction via PCA, while the spherical data is not.

In [110]:
import matplotlib.pyplot as plt
import numpy as np

def prepare_plot(xticks, yticks, figsize=(10.5, 6), hide_labels=False, grid_color='#999999',
                 grid_width=1.0):
    """Template for generating the plot layout."""
    plt.close()
    fig, ax = plt.subplots(figsize=figsize, facecolor='white', edgecolor='white')
    ax.axes.tick_params(labelcolor='#999999', labelsize='10')
    for axis, ticks in [(ax.get_xaxis(), xticks), (ax.get_yaxis(), yticks)]:
        axis.set_ticks_position('none')
        axis.set_ticks(ticks)
        axis.label.set_color('#999999')
        if hide_labels: axis.set_ticklabels([])
    plt.grid(color=grid_color, linewidth=grid_width, linestyle='-')
    map(lambda position: ax.spines[position].set_visible(False), ['bottom', 'top', 'left', 'right'])
    return fig, ax

def create_2D_gaussian(mn, variance, cov, n):
    """Randomly sample points from a two-dimensional Gaussian distribution"""
    np.random.seed(142)
    return np.random.multivariate_normal(np.array([mn, mn]), np.array([[variance, cov], [cov, variance]]), n)

In [111]:
data_random = create_2D_gaussian(mn=50, variance=1, cov=0, n=100)

# generate layout and plot data
fig, ax = prepare_plot(np.arange(46, 55, 2), np.arange(46, 55, 2))
ax.set_xlabel(r'Simulated $x_1$ values'), ax.set_ylabel(r'Simulated $x_2$ values')
ax.set_xlim(45, 54.5), ax.set_ylim(45, 54.5)
plt.scatter(data_random[:,0], data_random[:,1], s=14**2, c='#d6ebf2', edgecolors='#8cbfd0', alpha=0.75)
display(fig)

In [112]:
data_correlated = create_2D_gaussian(mn=50, variance=1, cov=.9, n=100)

# generate layout and plot data
fig, ax = prepare_plot(np.arange(46, 55, 2), np.arange(46, 55, 2))
ax.set_xlabel(r'Simulated $x_1$ values'), ax.set_ylabel(r'Simulated $x_2$ values')
ax.set_xlim(45.5, 54.5), ax.set_ylim(45.5, 54.5)
plt.scatter(data_correlated[:,0], data_correlated[:,1], s=14**2, c='#d6ebf2',
            edgecolors='#8cbfd0', alpha=0.75)
display(fig)

### (1a) Interpreting PCA

PCA can be interpreted as identifying the "directions" along which the data vary the most. In the first step of PCA, we must first center our data.  Working with our correlated dataset, first compute the mean of each feature (column) in the dataset.  Then for each observation, modify the features by subtracting their corresponding mean, to create a zero mean dataset.

> Note:
> * `correlated_data` is an RDD of NumPy arrays.
> * This allows us to perform certain operations more succinctly.
> * For example, we can sum the columns of our dataset using `correlated_data.sum()`.

In [114]:
# TODO: Uncomment the lines below and replace <FILL IN> with appropriate code to interprete PCA
correlated_data = sc.parallelize(data_correlated)

# # Interpreting PCA, first compute the mean and subtract it from the original feature
# mean_correlated = <FILL IN>
# correlated_data_zero_mean = correlated_data.<FILL IN>

mean_correlated = correlated_data.sum()/correlated_data.count()
correlated_data_zero_mean = correlated_data.map(lambda x:x-mean_correlated)

# YOUR CODE HERE
#raise NotImplementedError()

print(mean_correlated)
print(correlated_data.take(1))
print(correlated_data_zero_mean.take(1))

In [115]:
# TEST Interpreting PCA (1a)
from nose.tools import assert_true, assert_equal
assert_true(np.allclose(mean_correlated, [49.95739037, 49.97180477], atol=1e-2),
                'incorrect value for mean_correlated')
assert_true(np.allclose(correlated_data_zero_mean.take(1)[0], [-0.28561917, 0.10351492], atol=1e-2),
                'incorrect value for correlated_data_zero_mean')


### (1b) Sample covariance matrix

We are now ready to compute the sample covariance matrix. If we define \\(\scriptsize \mathbf{X} \in \mathbb{R}^{n \times d}\\) as the zero mean data matrix, then the sample covariance matrix is defined as: \\[ \mathbf{C}_{\mathbf X} = \frac{1}{n} \mathbf{X}^\top \mathbf{X} \,.\\]

To compute this matrix, compute the outer product of each data point, add together these outer products, and divide by the number of data points. The data are two dimensional, so the resulting covariance matrix should be a 2x2 matrix.

> Note:
> * [np.outer()](http://docs.scipy.org/doc/numpy/reference/generated/numpy.outer.html) can be used to calculate the outer product of two NumPy arrays.

In [117]:
# TODO: Uncomment the lines below and replace <FILL IN> with appropriate code to compute cov matrix
# Compute the covariance matrix using outer products
# # 1. Compute the outer product of each data point
# # 2. Add together these outer products
# # 3. Divide by the number of data points

# correlated_cov = (correlated_data_zero_mean
#                  .map(<FILL_IN>)
#                  .<FILL_IN> ) / correlated_data_zero_mean.<FILL_IN>

correlated_cov = (correlated_data_zero_mean
                 .map( lambda x:np.outer(x, x) )
                 .sum() ) / correlated_data_zero_mean.count()

# YOUR CODE HERE
#raise NotImplementedError()
print(correlated_cov)

In [118]:
# TEST Sample covariance matrix (1b)
cov_result = [[ 0.99558386,  0.90148989], [0.90148989, 1.08607497]]
assert_true(np.allclose(cov_result, correlated_cov, atol=1e-2), 'incorrect value for correlated_cov')


### (1c) Covariance Function

Next, use the expressions above to write a function to compute the sample covariance matrix for an arbitrary `data` RDD.

In [120]:
# TODO: Uncomment the lines below and replace <FILL IN> with appropriate code to compute the cov matrix
# # Write a function to compute the sample covariance matrix for an arbitrary RDD
def estimate_covariance(data):
    """Compute the covariance matrix for a given rdd.

    Note:
        The multi-dimensional covariance array should be calculated using outer products.  Don't
        forget to normalize the data by first subtracting the mean.

    Args:
        data (RDD of np.ndarray):  An `RDD` consisting of NumPy arrays.

    Returns:
        np.ndarray: A multi-dimensional array where the number of rows and columns both equal the
            length of the arrays in the input `RDD`.
    """
    n = data.count()
    mean = data.sum()/n
    data_zero_mean = data.map(lambda x:x-mean)
    sample_cov = (data_zero_mean.map(lambda x:np.outer(x,x)).sum())/n
    return sample_cov
    
    # YOUR CODE HERE
    #raise NotImplementedError()
  


correlated_cov_auto= estimate_covariance(correlated_data)
print(correlated_cov_auto)

In [121]:
# TEST Covariance function (1c)
correct_cov = [[ 0.99558386,  0.90148989], [0.90148989, 1.08607497]]
assert_true(np.allclose(correct_cov, correlated_cov_auto, atol=1e-2),
                'incorrect value for correlated_cov_auto')

test_data = np.array([[0,1,2,3], [4,5,6,7], [8,9,10,11], [12,13,14,15]])
cov_test_data = sc.parallelize(test_data)
correct_test_cov = [[20., 20., 20., 20.],
                    [ 20.,  20.,  20.,  20.],
                    [ 20.,  20.,  20.,  20.],
                    [ 20.,  20.,  20.,  20.]]
assert_true(np.allclose(correct_test_cov, estimate_covariance(cov_test_data), atol=1e-2), 'incorrect value returned by estimate_covariance')


### (1d) Eigendecomposition

Now that we've computed the sample covariance matrix, we can use it to find directions of maximal variance in the data.  Specifically, we can perform an eigendecomposition of this matrix to find its eigenvalues and eigenvectors.  The \\(\scriptsize d \\) eigenvectors of the covariance matrix give us the directions of maximal variance, and are often called the "principal components."  The associated eigenvalues are the variances in these directions.  In particular, the eigenvector corresponding to the largest eigenvalue is the direction of maximal variance (this is sometimes called the "top" eigenvector). Eigendecomposition of a \\(\scriptsize d \times d \\) covariance matrix has a (roughly) cubic runtime complexity with respect to \\(\scriptsize d \\).  Whenever \\(\scriptsize d \\) is relatively small (e.g., less than a few thousand) we can quickly perform this eigendecomposition locally.

Use a function from `numpy.linalg` called [eigh](http://docs.scipy.org/doc/numpy/reference/generated/numpy.linalg.eigh.html) to perform the eigendecomposition.  Next, sort the eigenvectors based on their corresponding eigenvalues (from high to low), yielding a matrix where the columns are the eigenvectors (and the first column is the top eigenvector).  Note that [np.argsort](http://docs.scipy.org/doc/numpy/reference/generated/numpy.argsort.html#numpy-argsort) can be used to obtain the indices of the eigenvalues that correspond to the ascending order of eigenvalues.  Finally, set the `top_component` variable equal to the top eigenvector or prinicipal component, which is a \\(\scriptsize 2 \\)-dimensional vector (array with two values).

> Note:
> * The eigenvectors returned by `eigh` appear in the columns and not the rows.
> * For example, the first eigenvector of `eig_vecs` would be found in the first column and could be accessed using `eig_vecs[:,0]`.

In [123]:
# TODO: Uncomment the lines below and replace <FILL IN> with appropriate code to calculate eigenvalues and eigenvectors
from numpy.linalg import eigh

# # Calculate the eigenvalues and eigenvectors from correlated_cov_auto

# eig_vals, eig_vecs = <FILL_IN>
# # Use np.argsort to find the top eigenvector based on the largest eigenvalue

# inds = np.argsort(<FILL_IN>)
# top_component = <FILL_IN>


eig_vals, eig_vecs = eigh(correlated_cov_auto)
inds = np.argsort(eig_vals)
print(inds)
top_component = eig_vecs[:,inds[-1]]


# YOUR CODE HERE
#raise NotImplementedError()

print('eigenvalues: {0}'.format(eig_vals))
print('\neigenvectors: \n{0}'.format(eig_vecs))
print('\ntop principal component: {0}'.format(top_component))

In [124]:
# TEST Eigendecomposition (1d)
def check_basis(vectors, correct):
    return np.allclose(vectors, correct) or np.allclose(np.negative(vectors), correct)

assert_true(check_basis(top_component, [0.68915649, 0.72461254]),
                'incorrect value for top_component')


### (1e) PCA scores

We just computed the top principal component for a 2-dimensional non-spherical dataset.  Now let's use this principal component to derive a one-dimensional representation for the original data. To compute these compact representations, which are sometimes called PCA "scores", calculate the dot product between each data point in the raw data and the top principal component.

In [126]:
# TODO: Uncomment the lines below and replace <FILL IN> with appropriate code to generate PCA scores using the top_component
# Use the top_component and the data from correlated_data to generate PCA scores

# correlated_data_scores = <FILL IN>

correlated_data_scores =  correlated_data.map(lambda x:x.dot(top_component))

# YOUR CODE HERE
#raise NotImplementedError()

print('one-dimensional data (first three):\n{0}'.format(np.asarray(correlated_data_scores.take(3))))

In [127]:
# TEST PCA Scores (1e)
first_three = [70.51682806, 69.30622356, 71.13588168]
assert_true(check_basis(correlated_data_scores.take(3), first_three),
                'incorrect value for correlated_data_scores')


## Part 2: Write a PCA function and evaluate PCA on sample datasets

### (2a) PCA function

We now have all the ingredients to write a general PCA function.  Instead of working with just the top principal component, our function will compute the top \\(\scriptsize k\\) principal components and principal scores for a given dataset. The top \\(\scriptsize k\\) principal components should be returned in descending order when ranked by their corresponding principal scores. Write this general function `pca`, and run it with `correlated_data` and \\(\scriptsize k = 2\\). Hint: Use results from Part (1c), Part (1d), and Part (1e).

Note: As discussed in lecture, our implementation is a reasonable strategy when \\(\scriptsize d \\) is small, though more efficient distributed algorithms exist when \\(\scriptsize d \\) is large.

In [130]:
# TODO: Uncomment the lines below and replace <FILL IN> with appropriate code to compute top-k principle components, scores and all eigenvalues

def pca(data, k=2):
    """Computes the top `k` principal components, corresponding scores, and all eigenvalues.

    Note:
        All eigenvalues should be returned in sorted order (largest to smallest). `eigh` returns
        each eigenvectors as a column.  This function should also return eigenvectors as columns.
 
    Args:
        data (RDD of np.ndarray): An `RDD` consisting of NumPy arrays.
        k (int): The number of principal components to return.

    Returns:
        tuple of (np.ndarray, RDD of np.ndarray, np.ndarray): A tuple of (eigenvectors, `RDD` of
            scores, eigenvalues).  Eigenvectors is a multi-dimensional array where the number of
            rows equals the length of the arrays in the input `RDD` and the number of columns equals
            `k`.  The `RDD` of scores has the same number of rows as `data` and consists of arrays
            of length `k`.  Eigenvalues is an array of length d (the number of features).
    """
    # sample_cov = <FILL_IN>
    # eig_vals, eig_vecs = <FILL_IN>
    # inds = <FILL_IN>
    # # Return the `k` principal components, `k` scores, and all eigenvalues
    # components = <FILL_IN>
    # eigenvalues = <FILL_IN
    # scores = data.map(<FILL_IN>)
    # return <FILL_IN>

    sample_cov = estimate_covariance(data)
    eig_vals, eig_vecs = eigh(sample_cov)
    inds = np.argsort(eig_vals)
    # Return the `k` principal components, `k` scores, and all eigenvalues
    components = np.take(eig_vecs, [inds[-x-1] for x in range(k)], axis=1)
    eigenvalues = sorted(eig_vals, reverse=True)
    scores = data.map(lambda x:[ x.dot(components[:,i]) for i in range(k) ])
    return   (components, scores, eigenvalues)

    # YOUR CODE HERE
    #raise NotImplementedError()

# Run pca on correlated_data with k = 2
top_components_correlated, correlated_data_scores_auto, eigenvalues_correlated  = pca(correlated_data, 2)

# Note that the 1st principal component is in the first column
print('top_components_correlated: \n{0}'.format(top_components_correlated))
print('\ncorrelated_data_scores_auto (first three): \n{0}'
       .format('\n'.join(map(str, correlated_data_scores_auto.take(3)))))
print('\neigenvalues_correlated: \n{0}'.format(eigenvalues_correlated))

# Create a higher dimensional test set
pca_test_data = sc.parallelize([np.arange(x, x + 4) for x in np.arange(0, 20, 4)])
components_test, test_scores, eigenvalues_test = pca(pca_test_data, 3)

print('\npca_test_data: \n{0}'.format(np.array(pca_test_data.collect())))
print('\ncomponents_test: \n{0}'.format(components_test))
print('\ntest_scores (first three): \n{0}'
       .format('\n'.join(map(str, test_scores.take(3)))))
print('\neigenvalues_test: \n{0}'.format(eigenvalues_test))

In [131]:
# TEST PCA Function (2a)
assert_true(check_basis(top_components_correlated.T,
                            [[0.68915649,  0.72461254], [-0.72461254, 0.68915649]]),
                'incorrect value for top_components_correlated')
first_three_correlated = [[70.51682806, 69.30622356, 71.13588168], [1.48305648, 1.5888655, 1.86710679]]
assert_true(np.allclose(first_three_correlated,
                            np.vstack(np.abs(correlated_data_scores_auto.take(3))).T, atol=1e-2),
                'incorrect value for first three correlated values')
assert_true(np.allclose(eigenvalues_correlated, [1.94345403, 0.13820481], atol=1e-2),
                           'incorrect values for eigenvalues_correlated')
top_components_correlated_k1, correlated_data_scores_k1, eigenvalues_correlated_k1 = pca(correlated_data, 1)
assert_true(check_basis(top_components_correlated_k1.T, [0.68915649, 0.72461254]),
               'incorrect value for components when k=1')
assert_true(np.allclose([70.51682806, 69.30622356, 71.13588168],
                            np.vstack(np.abs(correlated_data_scores_k1.take(3))).T, atol=1e-2),
                'incorrect value for scores when k=1')


### (2b) PCA on `data_random`

Next, use the PCA function we just developed to find the top two principal components of the spherical `data_random` we created in Visualization 1.

First, we need to convert `data_random` to the RDD `random_data_rdd`, and do all subsequent operations on `random_data_rdd`.

In [133]:
# TODO: Uncomment the lines below and replace <FILL IN> with appropriate code to convert data_random to RDD random_data_rdd
random_data_rdd = sc.parallelize(data_random)

# # Use pca on data_random
# top_components_random, random_data_scores_auto, eigenvalues_random = <FILL_IN>
top_components_random, random_data_scores_auto, eigenvalues_random = pca(random_data_rdd)

# YOUR CODE HERE
#raise NotImplementedError()


print('top_components_random: \n{0}'.format(top_components_random))
print('\nrandom_data_scores_auto (first three): \n{0}'
       .format('\n'.join(map(str, random_data_scores_auto.take(3)))))
print('\neigenvalues_random: \n{0}'.format(eigenvalues_random))

In [134]:
# TEST PCA on `data_random` (2b)
assert_true(check_basis(top_components_random.T,
                            [[-0.2522559 ,  0.96766056], [-0.96766056,  -0.2522559]]),
                'incorrect value for top_components_random')
first_three_random = [[36.61068572, 35.97314295, 35.59836628],
                      [61.3489929 ,  62.08813671,  60.61390415]]
assert_true(np.allclose(first_three_random, np.vstack(np.abs(random_data_scores_auto.take(3))).T, atol=1e-2),
                'incorrect value for random_data_scores_auto')
assert_true(np.allclose(eigenvalues_random, [1.4204546, 0.99521397], atol=1e-2),
                            'incorrect value for eigenvalues_random')


### Visualization 2: PCA projection

Plot the original data and the 1-dimensional reconstruction using the top principal component to see how the PCA solution looks.  The original data is plotted as before; however, the 1-dimensional reconstruction (projection) is plotted in green on top of the original data and the vectors (lines) representing the two principal components are shown as dotted lines.

In [136]:
def project_points_and_get_lines(data, components, x_range):
    """Project original data onto first component and get line details for top two components."""
    top_component = components[:, 0]
    slope1, slope2 = components[1, :2] / components[0, :2]

    means = data.mean()[:2]
    demeaned = data.map(lambda v: v - means)
    projected = demeaned.map(lambda v: (v.dot(top_component) /
                                        top_component.dot(top_component)) * top_component)
    remeaned = projected.map(lambda v: v + means)
    x1,x2 = zip(*remeaned.collect())

    line_start_P1_X1, line_start_P1_X2 = means - np.asarray([x_range, x_range * slope1])
    line_end_P1_X1, line_end_P1_X2 = means + np.asarray([x_range, x_range * slope1])
    line_start_P2_X1, line_start_P2_X2 = means - np.asarray([x_range, x_range * slope2])
    line_end_P2_X1, line_end_P2_X2 = means + np.asarray([x_range, x_range * slope2])

    return ((x1, x2), ([line_start_P1_X1, line_end_P1_X1], [line_start_P1_X2, line_end_P1_X2]),
            ([line_start_P2_X1, line_end_P2_X1], [line_start_P2_X2, line_end_P2_X2]))

In [137]:
((x1, x2), (line1X1, line1X2), (line2X1, line2X2)) = \
    project_points_and_get_lines(correlated_data, top_components_correlated, 5)

# generate layout and plot data
fig, ax = prepare_plot(np.arange(46, 55, 2), np.arange(46, 55, 2), figsize=(7, 7))
ax.set_xlabel(r'Simulated $x_1$ values'), ax.set_ylabel(r'Simulated $x_2$ values')
ax.set_xlim(45.5, 54.5), ax.set_ylim(45.5, 54.5)
plt.plot(line1X1, line1X2, linewidth=3.0, c='#8cbfd0', linestyle='--')
plt.plot(line2X1, line2X2, linewidth=3.0, c='#d6ebf2', linestyle='--')
plt.scatter(data_correlated[:,0], data_correlated[:,1], s=14**2, c='#d6ebf2',
            edgecolors='#8cbfd0', alpha=0.75)
plt.scatter(x1, x2, s=14**2, c='#62c162', alpha=.75)
display(fig)

In [138]:
((x1, x2), (line1X1, line1X2), (line2X1, line2X2)) = \
    project_points_and_get_lines(random_data_rdd, top_components_random, 5)

# generate layout and plot data
fig, ax = prepare_plot(np.arange(46, 55, 2), np.arange(46, 55, 2), figsize=(7, 7))
ax.set_xlabel(r'Simulated $x_1$ values'), ax.set_ylabel(r'Simulated $x_2$ values')
ax.set_xlim(45.5, 54.5), ax.set_ylim(45.5, 54.5)
plt.plot(line1X1, line1X2, linewidth=3.0, c='#8cbfd0', linestyle='--')
plt.plot(line2X1, line2X2, linewidth=3.0, c='#d6ebf2', linestyle='--')
plt.scatter(data_random[:,0], data_random[:,1], s=14**2, c='#d6ebf2',
            edgecolors='#8cbfd0', alpha=0.75)
plt.scatter(x1, x2, s=14**2, c='#62c162', alpha=.75)
display(fig)

### Visualization 3: Three-dimensional data

So far we have worked with two-dimensional data. Now let's generate three-dimensional data with highly correlated features. As in Visualization 1, we'll create samples from a multivariate Gaussian distribution, which in three dimensions requires us to specify three means, three variances, and three covariances.

In the 3D graphs below, we have included the 2D plane that corresponds to the top two principal components, i.e. the plane with the smallest euclidean distance between the points and itself. Notice that the data points, despite living in three-dimensions, are found near a two-dimensional plane: the left graph shows how most points are close to the plane when it is viewed from its side, while the right graph shows that the plane covers most of the variance in the data.  Note that darker blues correspond to points with higher values for the third dimension.

In [140]:
from mpl_toolkits.mplot3d import Axes3D

m = 100
mu = np.array([50, 50, 50])
r1_2 = 0.9
r1_3 = 0.7
r2_3 = 0.1
sigma1 = 5
sigma2 = 20
sigma3 = 20
c = np.array([[sigma1 ** 2, r1_2 * sigma1 * sigma2, r1_3 * sigma1 * sigma3],
             [r1_2 * sigma1 * sigma2, sigma2 ** 2, r2_3 * sigma2 * sigma3],
             [r1_3 * sigma1 * sigma3, r2_3 * sigma2 * sigma3, sigma3 ** 2]])
np.random.seed(142)
data_threeD = np.random.multivariate_normal(mu, c, m)

from matplotlib.colors import ListedColormap, Normalize
from matplotlib.cm import get_cmap
norm = Normalize()
cmap = get_cmap("Blues")
clrs = cmap(np.array(norm(data_threeD[:,2])))[:,0:3]

fig = plt.figure(figsize=(11, 6))
ax = fig.add_subplot(121, projection='3d')
ax.azim=-100
ax.scatter(data_threeD[:,0], data_threeD[:,1], data_threeD[:,2], c=clrs, s=14**2)

xx, yy = np.meshgrid(np.arange(-15, 10, 1), np.arange(-50, 30, 1))
normal = np.array([0.96981815, -0.188338, -0.15485978])
z = (-normal[0] * xx - normal[1] * yy) * 1. / normal[2]
xx = xx + 50
yy = yy + 50
z = z + 50

ax.set_zlim((-20, 120)), ax.set_ylim((-20, 100)), ax.set_xlim((30, 75))
ax.plot_surface(xx, yy, z, alpha=.10)

ax = fig.add_subplot(122, projection='3d')
ax.azim=10
ax.elev=20
#ax.dist=8
ax.scatter(data_threeD[:,0], data_threeD[:,1], data_threeD[:,2], c=clrs, s=14**2)

ax.set_zlim((-20, 120)), ax.set_ylim((-20, 100)), ax.set_xlim((30, 75))
ax.plot_surface(xx, yy, z, alpha=.1)
plt.tight_layout()
display(fig)

### (2c) 3D to 2D

We will now use PCA to see if we can recover the 2-dimensional plane on which the data live. Parallelize the data, and use our PCA function from above, with \\( \scriptsize k=2 \\) components.

In [142]:
# TODO: Uncomment the lines below and replace <FILL IN> with appropriate code to apply PCA function to 3D data. You need to first parallelize the data and use the PCA function defined above with k=2 components
threeD_data = sc.parallelize(data_threeD)

# components_threeD, threeD_scores, eigenvalues_threeD = <FILL_IN>

components_threeD, threeD_scores, eigenvalues_threeD = pca(threeD_data)

# YOUR CODE HERE
#raise NotImplementedError()

print('components_threeD: \n{0}'.format(components_threeD))
print('\nthreeD_scores (first three): \n{0}'
       .format('\n'.join(map(str, threeD_scores.take(3)))))
print('\neigenvalues_threeD: \n{0}'.format(eigenvalues_threeD))

In [143]:
# TEST 3D to 2D (2c)
assert_equal(components_threeD.shape, (3, 2), 'incorrect shape for components_threeD')
assert_true(np.allclose(np.sum(eigenvalues_threeD), 969.796443367, atol=1e-2),
                'incorrect value for eigenvalues_threeD')
assert_true(np.allclose(np.abs(np.sum(components_threeD)), 1.77238943258, atol=1e-2),
                'incorrect value for components_threeD')
assert_true(np.allclose(np.abs(np.sum(threeD_scores.take(3))), 237.782834092, atol=1e-2),
                'incorrect value for threeD_scores')


### Visualization 4: 2D representation of 3D data

See the 2D version of the data that captures most of its original structure.  Note that darker blues correspond to points with higher values for the original data's third dimension.

In [145]:
scores_threeD = np.asarray(threeD_scores.collect())

# generate layout and plot data
fig, ax = prepare_plot(np.arange(20, 150, 20), np.arange(-40, 110, 20))
ax.set_xlabel(r'New $x_1$ values'), ax.set_ylabel(r'New $x_2$ values')
ax.set_xlim(5, 150), ax.set_ylim(-45, 50)
plt.scatter(scores_threeD[:, 0], scores_threeD[:, 1], s=14 ** 2, c=clrs, edgecolors='#8cbfd0', alpha=0.75)
display(fig)

### (2d) Variance explained

Finally, let's quantify how much of the variance is being captured by PCA in each of the three synthetic datasets we've analyzed.  To do this, we'll compute the fraction of retained variance by the top principal components.  Recall that the eigenvalue corresponding to each principal component captures the variance along this direction.  If our initial data is \\(\scriptsize d\\)-dimensional, then the total variance in our data equals: \\( \scriptsize \sum_{i=1}^d \lambda_i \\), where \\(\scriptsize \lambda_i\\) is the eigenvalue corresponding to the \\(\scriptsize i\\)th principal component. Moreover, if we use PCA with some \\(\scriptsize k < d\\), then we can compute the variance retained by these principal components by adding the top \\(\scriptsize k\\) eigenvalues.  The fraction of retained variance equals the sum of the top \\(\scriptsize k\\) eigenvalues divided by the sum of all of the eigenvalues.

In [147]:
# TODO: Uncomment the lines below and replace <FILL IN> with appropriate code to compute the fraction of retained variance by the top principle components

def variance_explained(data, k=1):
    """Calculate the fraction of variance explained by the top `k` eigenvectors.

    Args:
        data (RDD of np.ndarray): An RDD that contains NumPy arrays which store the
            features for an observation.
        k: The number of principal components to consider.

    Returns:
        float: A number between 0 and 1 representing the percentage of variance explained
            by the top `k` eigenvectors.
    """
    # components, scores, eigenvalues = <FILL IN>
    # variances = <FILL IN>
    # return <FILL IN>
    
    
    components, scores, eigenvalues = pca(data)
    variances = eigenvalues[:k] 
    return np.sum(variances)/np.sum(eigenvalues)
    
    
    # YOUR CODE HERE
    #raise NotImplementedError()

variance_random_1 = variance_explained(random_data_rdd, 1)
variance_correlated_1 = variance_explained(correlated_data, 1)
variance_random_2 = variance_explained(random_data_rdd, 2)
variance_correlated_2 = variance_explained(correlated_data, 2)
variance_threeD_2 = variance_explained(threeD_data, 2)
print ('Percentage of variance explained by the first component of random_data_rdd: {0:.1f}%'
       .format(variance_random_1 * 100))
print ('Percentage of variance explained by both components of random_data_rdd: {0:.1f}%'
       .format(variance_random_2 * 100))
print ('\nPercentage of variance explained by the first component of correlated_data: {0:.1f}%'.
       format(variance_correlated_1 * 100))
print ('Percentage of variance explained by both components of correlated_data: {0:.1f}%'
       .format(variance_correlated_2 * 100))
print ('\nPercentage of variance explained by the first two components of threeD_data: {0:.1f}%'
       .format(variance_threeD_2 * 100))

In [148]:
# TEST Variance explained (2d)
assert_true(np.allclose(variance_random_1, 0.588017172066, atol=1e-2), 'incorrect value for variance_random_1')
assert_true(np.allclose(variance_correlated_1, 0.933608329586, atol=1e-2),
                'incorrect value for varianceCorrelated1')
assert_true(np.allclose(variance_random_2, 1.0, atol=1e-2), 'incorrect value for variance_random_2')
assert_true(np.allclose(variance_correlated_2, 1.0, atol=1e-2), 'incorrect value for variance_correlated_2')
assert_true(np.allclose(variance_threeD_2, 0.993967356912, atol=1e-2), 'incorrect value for variance_threeD_2')


## Part 3:  Parse, inspect, and preprocess neuroscience data then perform PCA

### Data introduction

A central challenge in neuroscience is understanding the organization and function of neurons, the cells responsible for processing and representing information in the brain. New technologies make it possible to monitor the responses of large populations of neurons in awake animals. In general, neurons communicate through electrical impulses that must be recorded with electrodes, which is a challenging process. As an alternative, we can genetically engineer animals so that their neurons express special proteins that fluoresce or light up when active, and then use microscopy to record neural activity as images.

Light-sheet microscopy lets us do this in a special, transparent animal, the larval zebrafish, over nearly its entire brain. The resulting data are time-varying images containing the activity of hundreds of thousands of neurons. Given the raw data, which is enormous, we want to find compact spatial and temporal patterns: Which groups of neurons are active together? What is the time course of their activity? Are those patterns specific to particular events happening during the experiment (e.g. a stimulus that we might present). PCA is a powerful technique for finding spatial and temporal patterns in these kinds of data, and that's what we'll explore here!

### (3a) Load neuroscience data

In the next sections we will use PCA to capture structure in neural datasets. Before doing the analysis, we will load and do some basic inspection of the data. The raw data are currently stored as a text file. Every line in the file contains the time series of image intensity for a single pixel in a time-varying image (i.e. a movie). The first two numbers in each line are the spatial coordinates of the pixel, and the remaining numbers are the time series. We'll use `first()` to inspect a single row, and print just the first 100 characters.

In [152]:
url = 'https://raw.githubusercontent.com/10605/data/master/hw3/neuro.txt'

from pyspark import SparkFiles

sc.addFile(url)

lines = sc.textFile("file://" + SparkFiles.get("neuro.txt"))
print(lines.first()[0:100])

# Check that everything loaded properly
assert len(lines.first()) == 1397
assert lines.count() == 46460

### (3b) Parse the data

Parse the data into a key-value representation. We want each key to be a tuple of two-dimensional spatial coordinates and each value to be a NumPy array storing the associated time series. Write a function that converts a line of text into a (`tuple`, `np.ndarray`) pair. Then apply this function to each record in the RDD, and inspect the first entry of the new parsed data set. Now would be a good time to cache the data, and force a computation by calling count, to ensure the data are cached.

Note: `tuple` contains 2 integer values and the numpy array consists of float numbers.

In [154]:
# TODO: Uncomment the lines below and replace <FILL IN> with appropriate code to parse the data into (`tuple`, `np.ndarray`) format 

def parse(line):
    """Parse the raw data into a (`tuple`, `np.ndarray`) pair.

    Note:
        You should store the pixel coordinates as a tuple of two ints and the elements of the pixel intensity
        time series as an np.ndarray of floats.

    Args:
        line (str): A string representing an observation.  Elements are separated by spaces.  The
            first two elements represent the coordinates of the pixel, and the rest of the elements
            represent the pixel intensity over time.

    Returns:
        tuple of tuple, np.ndarray: A (coordinate, pixel intensity array) `tuple` where coordinate is
            a `tuple` containing two values and the pixel intensity is stored in an NumPy array
            which contains 240 values.
    """
    # vec = <FILL_IN>
    # key = <FILL_IN>
    # ts = <FILL_IN>
    # return <FILL_IN>
    
    
    vec = line.split(" ")
    key = ((int)(vec[0]),(int)(vec[1]))
    temp = [ (float)(vec[i]) for i in range(2,len(vec)) ]
    ts = np.array(temp)
    return (key,ts)

    
    # YOUR CODE HERE
    #raise NotImplementedError()

raw_data = lines.map(parse)
raw_data.cache()
entry = raw_data.first()
print ('Length of movie is {0} seconds'.format(len(entry[1])))
print ('Number of pixels in movie is {0:,}'.format(raw_data.count()))
print ('\nFirst entry of raw_data (with only the first five values of the NumPy array):\n({0}, {1})'
       .format(entry[0], entry[1][:5]))

In [155]:
# TEST Parse the data (3b)
assert_true(isinstance(entry[0], tuple), "entry's key should be a tuple")
assert_equal(len(entry), 2, 'entry should have a key and a value')
assert_true(isinstance(entry[0][1], int), 'coordinate tuple should contain ints')
assert_equal(len(entry[0]), 2, "entry's key should have two values")
assert_true(isinstance(entry[1], np.ndarray), "entry's value should be an np.ndarray")
assert_true(isinstance(entry[1][0], np.float), 'the np.ndarray should consist of np.float values')
assert_equal(entry[0], (0, 0), 'incorrect key for entry')
assert_equal(entry[1].size, 240, 'incorrect length of entry array')
assert_true(np.allclose(np.sum(entry[1]), 24683.5), 'incorrect values in entry array')
assert_true(raw_data.is_cached, 'raw_data is not cached')


### (3c) Min and max fluorescence

Next we'll do some basic preprocessing on the data. The raw time-series data are in units of image fluorescence, and baseline fluorescence varies somewhat arbitrarily from pixel to pixel. First, compute the minimum and maximum values across all pixels.

In [157]:
# TODO: Uncomment the lines below and replace <FILL IN> with appropriate code to calculate min and max
# mn = raw_data.<FILL_IN>
# mx = raw_data.<FILL_IN>

mn = raw_data.map(lambda x:np.min(x[1])).min()
mx = raw_data.map(lambda x:np.max(x[1])).max()

# YOUR CODE HERE
#raise NotImplementedError()

print (mn, mx)

In [158]:
# TEST Min and max fluorescence (3c)
assert_true(np.allclose(mn, 100.6, atol=1e-2), 'incorrect value for mn')
assert_true(np.allclose(mx, 940.8, atol=1e-2), 'incorrect value for mx')


### Visualization 5: Pixel intensity

Let's now see how a random pixel varies in value over the course of the time series.  We'll visualize a pixel that exhibits a standard deviation of over 100.

In [160]:
example = raw_data.filter(lambda x: np.std(x[1]) > 100).values().first()

# generate layout and plot data
fig, ax = prepare_plot(np.arange(0, 300, 50), np.arange(300, 800, 100))
ax.set_xlabel(r'time'), ax.set_ylabel(r'fluorescence')
ax.set_xlim(-20, 270), ax.set_ylim(270, 730)
plt.plot(range(len(example)), example, c='#8cbfd0', linewidth='3.0')
display(fig)

### (3d) Fractional signal change

To convert from these raw fluorescence units to more intuitive units of fractional signal change, write a function that takes a time series for a particular pixel and subtracts and divides by the mean.  Then apply this function to all the pixels. Confirm that this changes the maximum and minimum values.

In [162]:
# TODO: Uncomment the lines below and replace <FILL IN> with appropriate code to standardize the array

def rescale(ts):
    """Take a np.ndarray and return the standardized array by subtracting and dividing by the mean.

    Note:
        You should first subtract the mean and then divide by the mean.

    Args:
        ts (np.ndarray): Time series data (`np.float`) representing pixel intensity.

    Returns:
        np.ndarray: The times series adjusted by subtracting the mean and dividing by the mean.
    """
    mean = np.average(ts)
    return np.array([(x-mean)/mean for x in ts])
    # YOUR CODE HERE
    #raise NotImplementedError()

scaled_data = raw_data.mapValues(lambda v: rescale(v))
mn_scaled = scaled_data.map(lambda x: x[1]).map(lambda v: min(v)).min()
mx_scaled = scaled_data.map(lambda x: x[1]).map(lambda v: max(v)).max()
print(mn_scaled, mx_scaled)

In [163]:
# TEST Fractional signal change (3d)
assert_true(isinstance(scaled_data.first()[1], np.ndarray), 'incorrect type returned by rescale')
assert_true(np.allclose(mn_scaled, -0.27151288, atol=1e-2), 'incorrect value for mn_scaled')
assert_true(np.allclose(mx_scaled, 0.90544876, atol=1e-2), 'incorrect value for mx_scaled')


### Visualization 6: Normalized data

Now that we've normalized our data, let's once again see how a random pixel varies in value over the course of the time series.  We'll visualize a pixel that exhibits a standard deviation of over 0.1.  Note the change in scale on the y-axis compared to the previous visualization.

In [165]:
example = scaled_data.filter(lambda x: np.std(x[1]) > 0.1).values().first()

# generate layout and plot data
fig, ax = prepare_plot(np.arange(0, 300, 50), np.arange(-.1, .6, .1))
ax.set_xlabel(r'time'), ax.set_ylabel(r'fluorescence')
ax.set_xlim(-20, 260), ax.set_ylim(-.12, .52)
plt.plot(range(len(example)), example, c='#8cbfd0', linewidth='3.0')
display(fig)

### (3e) PCA on the scaled data

We now have a preprocessed dataset with \\(\scriptsize n = 46460\\) pixels and \\(\scriptsize d = 240\\) seconds of time series data for each pixel.  We can interpret the pixels as our observations and each pixel value in the time series as a feature.  We would like to find patterns in brain activity during this time series, and we expect to find correlations over time.  We can thus use PCA to find a more compact representation of our data and allow us to visualize it.

Use the `pca` function from Part (2a) to perform PCA on the preprocessed neuroscience data with \\(\scriptsize k = 3\\), resulting in a new low-dimensional 46460 by 3 dataset.  The `pca` function takes an RDD of arrays, but `scaled_data` is an RDD of key-value pairs, so you'll need to extract the values.

In [167]:
# # TODO: Uncomment the lines below and replace <FILL IN> with appropriate code to run pca using scaled_data

# components_scaled, scaled_scores, eigenvalues_scaled = <FILL IN>
components_scaled, scaled_scores, eigenvalues_scaled = pca(scaled_data.map(lambda x:x[1]),3)

# YOUR CODE HERE
#raise NotImplementedError()

print ('components_scaled: (first five) \n{0}'.format(components_scaled[:5, :]))
print ('\nscaled_scores (first three): \n{0}'
       .format('\n'.join(map(str, scaled_scores.take(3)))))
print ('\neigenvalues_scaled: (first five) \n{0}'.format(eigenvalues_scaled[:5]))

In [168]:
# TEST PCA on the scaled data (3e)
assert_equal(components_scaled.shape, (240, 3), 'incorrect shape for components_scaled')
assert_true(np.allclose(np.abs(np.sum(components_scaled[:5, :])), 0.283150995232, atol=1e-2),
                'incorrect value for components_scaled')
assert_true(np.allclose(np.abs(np.sum(scaled_scores.take(3))), 0.0285507449251, atol=1e-2),
                'incorrect value for scaled_scores')
assert_true(np.allclose(np.sum(eigenvalues_scaled[:5]), 0.206987501564, atol=1e-2),
                'incorrect value for eigenvalues_scaled')


### Visualization 7: Top two components as images

Now, we'll view the scores for the top two components as images.  Note that we reshape the vectors by the dimensions of the original image, 230 x 202.
These graphs map the values for the single component to a grayscale image.  This provides us with a visual representation which we can use to see the overall structure of the zebrafish brain and to identify where high and low values occur.  However, using this representation, there is a substantial amount of useful information that is difficult to interpret.  In the next visualization, we'll see how we can improve interpretability by combining the two principal components into a single image using a color mapping.

In [170]:
import matplotlib.cm as cm

scores_scaled = np.vstack(scaled_scores.collect())
image_one_scaled = scores_scaled[:, 0].reshape(230, 202).T

# generate layout and plot data
fig, ax = prepare_plot(np.arange(0, 10, 1), np.arange(0, 10, 1), figsize=(9.0, 7.2), hide_labels=True)
ax.grid(False)
ax.set_title('Top Principal Component', color='#888888')
image = plt.imshow(image_one_scaled, interpolation='nearest', aspect='auto', cmap=cm.gray)
display(fig)

In [171]:
image_two_scaled = scores_scaled[:, 1].reshape(230, 202).T

# generate layout and plot data
fig, ax = prepare_plot(np.arange(0, 10, 1), np.arange(0, 10, 1), figsize=(9.0, 7.2), hide_labels=True)
ax.grid(False)
ax.set_title('Second Principal Component', color='#888888')
image = plt.imshow(image_two_scaled, interpolation='nearest', aspect='auto', cmap=cm.gray)
display(fig)

### Visualization 8: Top two components as one image

When we perform PCA and color neurons based on their location in the low-dimensional space, we can interpret areas with similar colors as exhibiting similar responses (at least in terms of the simple representation we recover with PCA). Below, the first graph shows how low-dimensional representations, which correspond to the first two principal components, are mapped to colors. The second graph shows the result of this color mapping using the zebrafish neural data.

The second graph clearly exhibits patterns of neural similarity throughout different regions of the brain.  However, when performing PCA on the full dataset, there are multiple reasons why neurons might have similar responses. The neurons might respond similarly to different stimulus directions, their responses might have  similar temporal dynamics, or their response similarity could be influenced by both temporal and stimulus-specific factors. However, with our initial PCA analysis, we cannot pin down the underlying factors, and hence it is hard to interpret what "similarity" really means.

Optional Details: Note that we use [polar coordinates](https://en.wikipedia.org/wiki/Polar_coordinate_system) to map our low-dimensional points to colors.  Using polar coordinates provides us with an angle \\( (\phi) \\) and magnitude \\( (\rho) \\).  We then use the well-known polar color space, [hue-saturation-value](https://en.wikipedia.org/wiki/HSL_and_HSV) (HSV), and map the angle to hue and the magnitude to value (brightness).  This maps low magnitude points to black while allowing larger magnitude points to be differentiated by their angle. Additionally, the function `polarTransform` that maps low-dimensional representations to colors has an input parameter called `scale`, which we set  to 2.0, and you can try lower values for the two graphs to see more nuanced mappings -- values near 1.0 are particularly interesting.

In [174]:
# Adapted from python-thunder's Colorize.transform where cmap='polar'.
# Checkout the library at: https://github.com/thunder-project/thunder and
# http://thunder-project.org/

def polar_transform(scale, img):
    """Convert points from cartesian to polar coordinates and map to colors."""
    from matplotlib.colors import hsv_to_rgb

    img = np.asarray(img)
    dims = img.shape

    phi = ((np.arctan2(-img[0], -img[1]) + np.pi/2) % (np.pi*2)) / (2 * np.pi)
    rho = np.sqrt(img[0]**2 + img[1]**2)
    saturation = np.ones((dims[1], dims[2]))

    out = hsv_to_rgb(np.dstack((phi, saturation, scale * rho)))

    return np.clip(out * scale, 0, 1)

In [175]:
# Show the polar mapping from principal component coordinates to colors.
x1_abs_max = np.max(np.abs(image_one_scaled))
x2_abs_max = np.max(np.abs(image_two_scaled))

num_of_pixels = 300
x1_vals = np.arange(-x1_abs_max, x1_abs_max, (2 * x1_abs_max) / num_of_pixels)
x2_vals = np.arange(x2_abs_max, -x2_abs_max, -(2 * x2_abs_max) / num_of_pixels)
x2_vals.shape = (num_of_pixels, 1)

x1_data = np.tile(x1_vals, (num_of_pixels, 1))
x2_data = np.tile(x2_vals, (1, num_of_pixels))

# Try changing the first parameter to lower values
polar_map = polar_transform(2.0, [x1_data, x2_data])

grid_range = np.arange(0, num_of_pixels + 25, 25)
fig, ax = prepare_plot(grid_range, grid_range, figsize=(9.0, 7.2), hide_labels=True)
image = plt.imshow(polar_map, interpolation='nearest', aspect='auto')
ax.set_xlabel('Principal component one'), ax.set_ylabel('Principal component two')
grid_marks = (2 * grid_range / float(num_of_pixels) - 1.0)
x1_marks = x1_abs_max * grid_marks
x2_marks = -x2_abs_max * grid_marks
ax.get_xaxis().set_ticklabels(map(lambda x: '{0:.1f}'.format(x), x1_marks))
ax.get_yaxis().set_ticklabels(map(lambda x: '{0:.1f}'.format(x), x2_marks))
display(fig)

In [176]:
# Use the same transformation on the image data
# Try changing the first parameter to lower values
brainmap = polar_transform(2.0, [image_one_scaled, image_two_scaled])

# generate layout and plot data
fig, ax = prepare_plot(np.arange(0, 10, 1), np.arange(0, 10, 1), figsize=(9.0, 7.2), hide_labels=True)
ax.grid(False)
image = plt.imshow(brainmap,interpolation='nearest', aspect='auto')
display(fig)

## Part 4: Feature-based aggregation and PCA

### (4a) Aggregation using arrays

In the analysis in Part 3, we performed PCA on the full time series data, trying to find global patterns across all 240 seconds of the time series. However, our analysis doesn't use the fact that different events happened during those 240 seconds. Specifically, during those 240 seconds, the zebrafish was presented with 12 different direction-specific visual patterns, with each one lasting for 20 seconds, for a total of 12 x 20 = 240 features. Stronger patterns are likely to emerge if we incorporate knowledge of our experimental setup into our analysis.  As we'll see, we can isolate the impact of temporal response or direction-specific impact by appropriately aggregating our features.

In order to aggregate the features we will use basic ideas from matrix multiplication.  First, note that if we use `np.dot` with a two-dimensional array, then NumPy performs the equivalent matrix-multiply calculation.  For example, `np.array([[1, 2, 3], [4, 5, 6]]).dot(np.array([2, 0, 1]))` produces `np.array([5, 14])`.

\\[\begin{bmatrix} 1 & 2 & 3 \\\ 4 & 5 & 6 \end{bmatrix} \begin{bmatrix} 2 \\\ 0 \\\ 1 \end{bmatrix} = \begin{bmatrix} 5 \\\ 14 \end{bmatrix} \\]

By setting up our multi-dimensional array properly we can multiply it by a vector to perform certain aggregation operations.  For example, imagine we had a 3 dimensional vector, \\( \scriptsize \begin{bmatrix} 1 & 2 & 3 \end{bmatrix}^\top \\)  and we wanted to create a 2 dimensional vector containing the sum of its first and last elements as one value and three times its second value as another value, i.e., \\( \scriptsize \begin{bmatrix} 4 & 6 \end{bmatrix}^\top \\). We can generate this result via matrix multiplication as follows: `np.array([[1, 0, 1], [0, 3, 0]]).dot(np.array([1, 2, 3])` which produces `np.array([4, 6]`.

\\[\begin{bmatrix} 1 & 0 & 1 \\\ 0 & 3 & 0 \end{bmatrix} \begin{bmatrix} 1 \\\ 2 \\\ 3 \end{bmatrix} = \begin{bmatrix} 4 \\\ 6 \end{bmatrix} \\]

For this exercise, you'll create several arrays that perform different types of aggregation.  The aggregation is specified in the comments before each array.  You should fill in the array values by hand.  We'll automate array creation in the next two exercises.

In [179]:
# TODO: Uncomment the lines below and replace <FILL IN> with appropriate code to create arrays for different types of arregation
vector = np.array([0., 1., 2., 3., 4., 5.])

# # Create a multi-dimensional array that when multiplied (using .dot) against vector, results in
# # a two element array where the first element is the sum of the 0, 2, and 4 indexed elements of
# # vector and the second element is the sum of the 1, 3, and 5 indexed elements of vector.
# # This should be a 2 row by 6 column array

# sum_every_other = np.array(<FILL_IN)
sum_every_other = np.array( [ [1, 0, 1, 0, 1, 0],[0, 1, 0, 1, 0, 1] ] )

# YOUR CODE HERE
#raise NotImplementedError()

# # Create a multi-dimensional array that when multiplied (using .dot) against vector, results in a
# # three element array where the first element is the sum of the 0 and 3 indexed elements of vector,
# # the second element is the sum of the 1 and 4 indexed elements of vector, and the third element is
# # the sum of the 2 and 5 indexed elements of vector.
# # This should be a 3 row by 6 column array

# sum_every_third = np.array(<FILL_IN)
sum_every_third = np.array( [ [1, 0, 0, 1, 0, 0],[0, 1, 0, 0, 1, 0],[0, 0, 1, 0, 0, 1] ] )

# YOUR CODE HERE
#raise NotImplementedError()

# # Create a multi-dimensional array that can be used to sum the first three elements of vector and
# # the last three elements of vector, which returns a two element array with those values when dotted
# # with vector.
# # This should be a 2 row by 6 column array

# sum_by_three = np.array(<FILL_IN>)

sum_by_three = np.array( [ [1, 1, 1, 0, 0, 0],[0, 0, 0, 1, 1, 1] ] )

# YOUR CODE HERE
#raise NotImplementedError()

# # Create a multi-dimensional array that that sums the first two elements, second two elements, and
# # last two elements of vector, which returns a three element array with those values when dotted
# # with vector.
# # This should be a 3 row by 6 column array
# sum_by_two = np.array(<FILL_IN>)


sum_by_two = np.array( [ [1, 1, 0, 0, 0, 0],[0, 0, 1, 1, 0, 0],[0, 0, 0, 0, 1, 1] ] )

# YOUR CODE HERE
#raise NotImplementedError()

print ('sum_every_other.dot(vector):\t{0}'.format(sum_every_other.dot(vector)))
print ('sum_every_third.dot(vector):\t{0}'.format(sum_every_third.dot(vector)))

print ('\nsum_by_three.dot(vector):\t{0}'.format(sum_by_three.dot(vector)))
print ('sum_by_two.dot(vector): \t{0}'.format(sum_by_two.dot(vector)))

In [180]:
# TEST Aggregation using arrays (4a)
assert_equal(sum_every_other.shape, (2, 6), 'incorrect shape for sum_every_other')
assert_equal(sum_every_third.shape, (3, 6), 'incorrect shape for sum_every_third')
assert_true(np.allclose(sum_every_other.dot(vector), [6, 9]), 'incorrect value for sum_every_other')
assert_true(np.allclose(sum_every_third.dot(vector), [3, 5, 7]),
                'incorrect value for sum_every_third')
assert_equal(sum_by_three.shape, (2, 6), 'incorrect shape for sum_by_three')
assert_equal(sum_by_two.shape, (3, 6), 'incorrect shape for sum_by_two')
assert_true(np.allclose(sum_by_three.dot(vector), [3, 12]), 'incorrect value for sum_by_three')
assert_true(np.allclose(sum_by_two.dot(vector), [1, 5, 9]), 'incorrect value for sum_by_two')


### (4b) Recreate with `np.tile` and `np.eye`
[np.tile](http://docs.scipy.org/doc/numpy/reference/generated/numpy.tile.html) is useful for repeating arrays in one or more dimensions.  For example, `np.tile(np.array([[1, 2], [3, 4]]), 2)` produces `np.array([[1, 2, 1, 2], [3, 4, 3, 4]]))`.

 \\[ np.tile( \begin{bmatrix} 1 & 2 \\\ 3 & 4 \end{bmatrix} , 2) \to \begin{bmatrix} 1 & 2 & 1& 2 \\\ 3 & 4 & 3 & 4 \end{bmatrix} \\]

Recall that [np.eye](http://docs.scipy.org/doc/numpy/reference/generated/numpy.eye.html) can be used to create an identity array \\( (\mathbf{I_n}) \\).  For example, `np.eye(3)` produces `np.array([[1, 0, 0], [0, 1, 0], [0, 0, 1]])`.

\\[ np.eye( 3 ) \to \begin{bmatrix} 1 & 0 & 0 \\\ 0 & 1 & 0 \\\ 0 & 0 & 1 \end{bmatrix} \\]

In this exercise, recreate `sum_every_other` and `sum_every_third` using `np.tile` and `np.eye`.

In [182]:
# Reference for what to recreate
print ('sum_every_other: \n{0}'.format(sum_every_other))
print ('\nsum_every_third: \n{0}'.format(sum_every_third))

In [183]:
# TODO: Uncomment the lines below and replace <FILL IN> with appropriate code to use np.tile and np.eye to recreate the arrays

# sum_every_other_tile = <FILL_IN>
# sum_every_third_tile = <FILL_IN>

sum_every_other_tile = np.tile(np.array([[1, 0],[0, 1]]),3)
sum_every_third_tile = np.tile(np.eye(3),2)

# YOUR CODE HERE
#raise NotImplementedError()

print (sum_every_other_tile)
print ('sum_every_other_tile.dot(vector): {0}'.format(sum_every_other_tile.dot(vector)))
print ('\n', sum_every_third_tile)
print ('sum_every_third_tile.dot(vector): {0}'.format(sum_every_third_tile.dot(vector)))

In [184]:
# TEST Recreate with `np.tile` and `np.eye` (4b)
assert_equal(sum_every_other_tile.shape, (2, 6), 'incorrect shape for sum_every_other_tile')
assert_equal(sum_every_third_tile.shape, (3, 6), 'incorrect shape for sum_every_third_tile')
assert_true(np.allclose(sum_every_other_tile.dot(vector), [6, 9]),
                'incorrect value for sum_every_other_tile')
assert_true(np.allclose(sum_every_third_tile.dot(vector), [3, 5, 7]),
                'incorrect value for sum_every_third_tile')


### (4c) Recreate with `np.kron`
The Kronecker product is the generalization of outer products involving matrices, and we've included some examples below to illustrate the idea.  Please refer to the [Wikipedia page](https://en.wikipedia.org/wiki/Kronecker_product) for a detailed definition.  We can use [np.kron](http://docs.scipy.org/doc/numpy/reference/generated/numpy.kron.html) to compute Kronecker products and recreate the `sum_by` arrays.  Note that \\( \otimes \\) indicates a Kronecker product.

\\[ \begin{bmatrix} 1 & 2 \\\ 3 & 4 \end{bmatrix} \otimes \begin{bmatrix} 1 & 2 \end{bmatrix}  = \begin{bmatrix} 1 \cdot 1 & 1 \cdot 2 & 2 \cdot 1 & 2 \cdot 2 \\\ 3 \cdot 1 & 3 \cdot 2 & 4 \cdot 1 & 4 \cdot 2 \end{bmatrix} = \begin{bmatrix} 1 & 2 & 2 & 4 \\\ 3 & 6 & 4 & 8 \end{bmatrix}  \\]

We can see how the Kronecker product continues to expand if there are multiple rows in the second array.

\\[ \begin{bmatrix} 1 & 2 \\\ 3 & 4 \end{bmatrix} \otimes \begin{bmatrix} 5 & 6 \\\ 7 & 8 \end{bmatrix} = \begin{bmatrix} 1 \cdot \begin{bmatrix} 5 & 6 \\\ 7 & 8 \end{bmatrix} & 2 \cdot \begin{bmatrix} 5 & 6 \\\ 7 & 8 \end{bmatrix} \\\ \\\ 3 \cdot \begin{bmatrix} 5 & 6 \\\ 7 & 8 \end{bmatrix} & 4 \cdot \begin{bmatrix} 5 & 6 \\\ 7 & 8 \end{bmatrix} \end{bmatrix} = \begin{bmatrix} 5 & 6 & 10 & 12 \\\ 7 & 8 & 14 & 16 \\\ 15 & 18 & 20 & 24 \\\ 21 & 24 & 28 & 32 \end{bmatrix} \\]

For this exercise, you'll recreate the `sum_by_three` and `sum_by_two` arrays using `np.kron`, `np.eye`, and `np.ones`.  Note that `np.ones` creates an array of all ones.

In [186]:
# Reference for what to recreate
print ('sum_by_three: \n{0}'.format(sum_by_three))
print ('\nsum_by_two: \n{0}'.format(sum_by_two))

In [187]:
# TODO: Uncomment the lines below and replace <FILL IN> with appropriate code to use np.kron, np.eye, and np.ones to recreate the arrays

# sum_by_three_kron = <FILL IN>
# sum_by_two_kron = <FILL IN>


sum_by_three_kron = np.kron(np.eye(2),np.ones(3))
sum_by_two_kron = np.kron(np.eye(3),np.ones(2))

# YOUR CODE HERE
#raise NotImplementedError()

print (sum_by_three_kron)
print ('sum_by_three_kron.dot(vector): {0}'.format(sum_by_three_kron.dot(vector)))
print ('\n', sum_by_two_kron)
print ('sum_by_two_kron.dot(vector): {0}'.format(sum_by_two_kron.dot(vector)))

In [188]:
# TEST Recreate with `np.kron` (4c)
assert_equal(sum_by_three_kron.shape, (2, 6), 'incorrect shape for sum_by_three_kron')
assert_equal(sum_by_two_kron.shape, (3, 6), 'incorrect shape for sum_by_two_kron')
assert_true(np.allclose(sum_by_three_kron.dot(vector), [3, 12]),
                'incorrect value for sum_by_three_kron')
assert_true(np.allclose(sum_by_two_kron.dot(vector), [1, 5, 9]),
                'incorrect value for sum_by_two_kron')


### (4d) Aggregate by time

As we discussed in Part (4a), we would like to incorporate knowledge of our experimental setup into our analysis. To do this, we'll first study the temporal aspects of neural response, by aggregating our features by time. In other words, we want to see how different pixels (and the underlying neurons captured in these pixels) react in each of the 20 seconds after a new visual pattern is displayed, regardless of what the pattern is.  Hence, instead of working with the 240 features individually, we'll aggregate the original features into 20 new features, where the first new feature captures the pixel response one second after a visual pattern appears, the second new feature is the response after two seconds, and so on.

We can perform this aggregation using a map operation. First, build a multi-dimensional array \\( \scriptsize \mathbf{T} \\) that, when dotted with a 240-component vector, sums every 20-th component of this vector and returns a 20-component vector. Note that this exercise is similar to (4b).  Once you have created your multi-dimensional array \\( \scriptsize \mathbf{T} \\), use a `map` operation with that array and each time series to generate a transformed dataset. We'll cache and count the output, as we'll be using it again.

In [190]:
# TODO: Uncomment the lines below and replace <FILL IN> with appropriate code to create a multi-dimensional array to perform the aggregation
# # Create a multi-dimensional array to perform the aggregation
# T = <FILL IN>
T = np.tile(np.eye(20),12)

# # Transform scaled_data using T.  Make sure to retain the keys.
# time_data = scaled_data.<FILL IN>

time_data = scaled_data.mapValues(lambda x:T.dot(x))


# YOUR CODE HERE
#raise NotImplementedError()

time_data.cache()
print (time_data.count())
print (time_data.first())

In [191]:
# TEST Aggregate by time (4d)
assert_equal(T.shape, (20, 240), 'incorrect shape for T')
time_data_first = time_data.values().first()
time_data_fifth = time_data.values().take(5)[4]
assert_equal(time_data.count(), 46460, 'incorrect length of time_data')
assert_equal(time_data_first.size, 20, 'incorrect value length of time_data')
assert_equal(time_data.keys().first(), (0, 0), 'incorrect keys in time_data')
assert_true(np.allclose(time_data_first[:2], [0.00802155, 0.00607693], atol=1e-2),
                'incorrect values in time_data')
assert_true(np.allclose(time_data_fifth[-2:], [-0.00636676, -0.0179427], atol=1e-2),
                'incorrect values in time_data')


### (4e) Obtain a compact representation

We now have a time-aggregated dataset with \\(\scriptsize n = 46460\\) pixels and \\(\scriptsize d = 20\\) aggregated time features, and we want to use PCA to find a more compact representation.  Use the `pca` function from Part (2a) to perform PCA on the this data with \\(\scriptsize k = 3\\), resulting in a new low-dimensional 46,460 by 3 dataset. As before, you'll need to extract the values from `time_data` since it is an RDD of key-value pairs.

In [193]:
# TODO: Uncomment the lines below and replace <FILL IN> with appropriate code to extract a compact representation from time_data using pca function from Part (2a)

# components_time, time_scores, eigenvalues_time = <FILL IN>
components_time, time_scores, eigenvalues_time = pca(time_data.map(lambda x:x[1]),3)
# YOUR CODE HERE
#raise NotImplementedError()

print ('components_time: (first five) \n{0}'.format(components_time[:5, :]))
print ('\ntime_scores (first three): \n{0}'
       .format('\n'.join(map(str, time_scores.take(3)))))
print ('\neigenvalues_time: (first five) \n{0}'.format(eigenvalues_time[:5]))

In [194]:
# TEST Obtain a compact representation (4e)
assert_equal(components_time.shape, (20, 3), 'incorrect shape for components_time')
assert_true(np.allclose(np.abs(np.sum(components_time[:5, :])), 2.37299020, atol=1e-2),
                'incorrect value for components_time')
assert_true(np.allclose(np.abs(np.sum(time_scores.take(3))), 0.0213119114, atol=1e-2),
                'incorrect value for time_scores')
assert_true(np.allclose(np.sum(eigenvalues_time[:5]), 0.844764792, atol=1e-2),
                'incorrect value for eigenvalues_time')


### Visualization 9: Top two components by time

Let's view the scores from the first two principal components as a composite image. When we preprocess by aggregating by time and then perform PCA, we are only looking at variability related to temporal dynamics. As a result, if neurons appear similar -- have similar colors -- in the resulting image, it means that their responses vary similarly over time, regardless of how they might be encoding direction. In the image below, we can define the midline as the horizontal line across the middle of the brain.  We see clear patterns of neural activity in different parts of the brain, and crucially note that the regions on either side of the midline are similar, which suggests that temporal dynamics do not differ across the two sides of the brain.

In [196]:
scores_time = np.vstack(time_scores.collect())
image_one_time = scores_time[:, 0].reshape(230, 202).T
image_two_time = scores_time[:, 1].reshape(230, 202).T
brainmap = polar_transform(3, [image_one_time, image_two_time])

# generate layout and plot data
fig, ax = prepare_plot(np.arange(0, 10, 1), np.arange(0, 10, 1), figsize=(9.0, 7.2), hide_labels=True)
ax.grid(False)
image = plt.imshow(brainmap,interpolation='nearest', aspect='auto')
display(fig)

### (4f) Aggregate by direction

Next, let's perform a second type of feature aggregation so that we can study the direction-specific aspects of neural response, by aggregating our features by direction. In other words, we want to see how different pixels (and the underlying neurons captured in these pixels) react when the zebrafish is presented with 12 direction-specific patterns, ignoring the temporal aspect of the reaction.  Hence, instead of working with the 240 features individually, we'll aggregate the original features into 12 new features, where the first new feature captures the average pixel response to the first direction-specific visual pattern, the second new feature is the response to the second direction-specific visual pattern, and so on.

As in Part (4c), we'll design a multi-dimensional array \\( \scriptsize \mathbf{D} \\) that, when multiplied by a 240-dimensional vector, sums the first 20 components, then the second 20 components, and so on. Note that this is similar to exercise (4c).  First create \\( \scriptsize \mathbf{D} \\), then use a `map` operation with that array and each time series to generate a transformed dataset. We'll cache and count the output, as we'll be using it again.

In [198]:
# TODO: Uncomment the lines below and replace <FILL IN> with appropriate code to create a multi-dimensional array to perform the aggregation
# D = <FILL IN>
D = np.kron(np.eye(12),np.ones(20))

# # Transform scaled_data using D.  Make sure to retain the keys.
# direction_data = scaled_data. <FILL IN>
direction_data = scaled_data.mapValues(lambda x:D.dot(x))

# YOUR CODE HERE
#raise NotImplementedError()

direction_data.cache()
print (direction_data.count())
print (direction_data.first())

In [199]:
# TEST Aggregate by direction (4f)
assert_equal(D.shape, (12, 240), 'incorrect shape for D')
direction_data_first = direction_data.values().first()
direction_data_fifth = direction_data.values().take(5)[4]
assert_equal(direction_data.count(), 46460, 'incorrect length of direction_data')
assert_equal(direction_data_first.size, 12, 'incorrect value length of direction_data')
assert_equal(direction_data.keys().first(), (0, 0), 'incorrect keys in direction_data')
assert_true(np.allclose(direction_data_first[:2], [0.03346365, 0.03638058], atol=1e-2),
                'incorrect values in direction_data')
assert_true(np.allclose(direction_data_fifth[:2], [0.01479147, -0.02090099], atol=1e-2),
                'incorrect values in direction_data')


### (4g) Compact representation of direction data

We now have a direction-aggregated dataset with \\(\scriptsize n = 46460\\) pixels and \\(\scriptsize d = 12\\) aggregated direction features, and we want to use PCA to find a more compact representation.  Use the `pca` function from Part (2a) to perform PCA on the this data with \\(\scriptsize k = 3\\), resulting in a new low-dimensional 46460 by 3 dataset. As before, you'll need to extract the values from `direction_data` since it is an RDD of key-value pairs.

In [201]:
# TODO: Uncomment the lines below and replace <FILL IN> with appropriate code to get compact representation of direction data using pca function
# components_direction, direction_scores, eigenvalues_direction = <FILL IN>
components_direction, direction_scores, eigenvalues_direction = pca(direction_data.map(lambda x:x[1]),3)

# YOUR CODE HERE
#raise NotImplementedError()

print ('components_direction: (first five) \n{0}'.format(components_direction[:5, :]))
print ('\ndirection_scores (first three): \n{0}'
       .format('\n'.join(map(str, direction_scores.take(3)))))
print ('\neigenvalues_direction: (first five) \n{0}'.format(eigenvalues_direction[:5]))

In [202]:
# TEST Compact representation of direction data (4g)
assert_equal(components_direction.shape, (12, 3), 'incorrect shape for components_direction')
assert_true(np.allclose(np.abs(np.sum(components_direction[:5, :])), 1.080232069, atol=1e-2),
                'incorrect value for components_direction')
assert_true(np.allclose(np.abs(np.sum(direction_scores.take(3))), 0.10993162084, atol=1e-2),
                'incorrect value for direction_scores')
assert_true(np.allclose(np.sum(eigenvalues_direction[:5]), 2.0089720377, atol=1e-2),
                'incorrect value for eigenvalues_direction')


### Visualization 10: Top two components by direction

Again, let's view the scores from the first two principal components as a composite image.  When we preprocess by averaging across time (group by direction), and then perform PCA, we are only looking at variability related to stimulus direction. As a result, if neurons appear similar -- have similar colors -- in the image, it means that their responses vary similarly across directions, regardless of how they evolve over time. In the image below, we see a different pattern of similarity across regions of the brain.  Moreover, regions on either side of the midline are colored differently, which suggests that we are looking at a property, direction selectivity, that has a different representation across the two sides of the brain.

In [204]:
scores_direction = np.vstack(direction_scores.collect())
image_one_direction = scores_direction[:, 0].reshape(230, 202).T
image_two_direction = scores_direction[:, 1].reshape(230, 202).T
brainmap = polar_transform(2, [image_one_direction, image_two_direction])
# with thunder: Colorize(cmap='polar', scale=2).transform([image_one_direction, image_two_direction])

# generate layout and plot data
fig, ax = prepare_plot(np.arange(0, 10, 1), np.arange(0, 10, 1), figsize=(9.0, 7.2), hide_labels=True)
ax.grid(False)
image = plt.imshow(brainmap, interpolation='nearest', aspect='auto')
display(fig)