This notebook will be collected automatically at **6pm on Monday** from the `/home/data_scientist/assignments/Week14` directory on the course JupyterHub server. If you work on this assignment on the course JupyterHub server, just make sure that you save your work; instructors will pull your notebooks automatically after the deadline. If you work on this assignment locally, note that the only way to submit assignments is via JupyterHub: you must place the notebook file in the correct directory with the correct file name before the deadline.

1. Make sure everything runs as expected. First, restart the kernel (in the menubar, select `Kernel` → `Restart`) and then run all cells (in the menubar, select `Cell` → `Run All`).
2. Make sure you fill in every place that says `YOUR CODE HERE`. Do not write your answer anywhere other than where it says `YOUR CODE HERE`. Anything you write elsewhere will be removed by the autograder.
3. Do not change the file path or the file name of this notebook.
4. Make sure that you save your work (in the menubar, select `File` → `Save and Checkpoint`).

## Problem 14.3. Spark MLlib

In this problem, we will use Spark MLlib to perform a logistic regression on the flight data to determine whether a flight would be delayed or not.

In [None]:
import pyspark
from pyspark import SparkContext
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.classification import LogisticRegressionWithSGD

from nose.tools import (
    assert_equal, assert_is_instance,
    assert_true, assert_almost_equal
    )

We run Spark in [local mode](http://spark.apache.org/docs/latest/programming-guide.html#local-vs-cluster-modes) from within our Docker container.

In [None]:
sc = SparkContext('local[*]')

We use code similar to the RDD code from the [Introduction to Spark](https://github.com/UI-DataScience/info490-sp16/blob/master/Week14/notebooks/intro2spark.ipynb) notebook to import two columns: `ArrDelay` and `DepDelay`.
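To see what the split, filter, and convert steps do to each line, here is a minimal plain-Python sketch that needs no Spark. The three CSV lines are made up for illustration (the `filler` values stand in for columns 0 through 13); they are not taken from `2001.csv`.

```python
# Hypothetical three-line CSV: a header, a complete row, and a row with a missing value.
filler = ["x"] * 14  # stand-ins for columns 0-13, which we ignore here
lines = [
    ",".join(filler + ["ArrDelay", "DepDelay"]),
    ",".join(filler + ["-3", "-4"]),
    ",".join(filler + ["NA", "7"]),
]

# Keep only columns 14 (ArrDelay) and 15 (DepDelay) as a tuple of strings.
pairs = [tuple(line.split(",")[14:16]) for line in lines]

# For a tuple, `in` compares whole elements, so these tests drop the header row
# and any row where either column is exactly the string "NA".
pairs = [p for p in pairs if "ArrDelay" not in p and "NA" not in p]

# Convert the surviving strings to integers.
pairs = [(int(arr), int(dep)) for arr, dep in pairs]
print(pairs)
```

Only the complete row survives. The same three stages appear as chained `map` and `filter` calls on the RDD.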

In [None]:
text_file = sc.textFile('/home/data_scientist/data/2001.csv')

data = (
    text_file
    .map(lambda line: line.split(","))
    # 14: ArrDelay, 15: DepDelay
    .map(lambda p: (p[14], p[15]))
    .filter(lambda line: 'ArrDelay' not in line)
    .filter(lambda line: 'NA' not in line)
    .map(lambda p: (int(p[0]), int(p[1])))
    )

len_data = data.count()
assert_equal(len_data, 5723673)
assert_equal(
    data.take(5),
    [(-3, -4),
     (4, -5),
     (23, 11),
     (10, -3),
     (20, 0)])

- Write a function that transforms the `ArrDelay` column into binary labels that indicate whether a flight arrived late or not. We define a flight to be delayed if its arrival delay is 15 minutes or more, the same definition used by the FAA (source: [Wikipedia](https://en.wikipedia.org/wiki/Flight_cancellation_and_delay)).

- The `DepDelay` column should remain unchanged.
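The labeling rule itself can be sketched in plain Python on a handful of pairs. The names `DELAY_THRESHOLD` and `label_pair` are hypothetical and chosen only for this illustration; the sketch shows the rule on a list, not the RDD-based `to_binary` function.

```python
DELAY_THRESHOLD = 15  # minutes; a flight counts as delayed at or above this value

def label_pair(pair):
    """Map (arr_delay, dep_delay) to (label, dep_delay), leaving dep_delay untouched."""
    arr_delay, dep_delay = pair
    label = 1 if arr_delay >= DELAY_THRESHOLD else 0
    return (label, dep_delay)

sample = [(-3, -4), (23, 11), (15.0, 490.0), (14.9, 490.0)]
labeled = [label_pair(p) for p in sample]
print(labeled)
```

Note the boundary case: an arrival delay of exactly 15 minutes is labeled 1, while 14.9 is labeled 0.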

In [None]:
def to_binary(rdd):
    '''
    Transforms the "ArrDelay" column into binary labels
    that indicate whether a flight arrived late or not.
    
    Parameters
    ----------
    rdd: A pyspark.rdd.RDD instance.
    
    Returns
    -------
    A pyspark.rdd.PipelinedRDD instance.
    '''
    
    # YOUR CODE HERE
    
    return rdd

In [None]:
binary_labels = to_binary(data)
print(binary_labels.take(5))

In [None]:
assert_is_instance(binary_labels, pyspark.rdd.PipelinedRDD)
assert_equal(binary_labels.count(), len_data)
assert_equal(
    binary_labels.take(5),
    [(0, -4),
     (0, -5),
     (1, 11),
     (0, -3),
     (1, 0)])
assert_equal(to_binary(sc.parallelize([(15.0, 490.0)])).first(), (1, 490.0))
assert_equal(to_binary(sc.parallelize([(14.9, 490.0)])).first(), (0, 490.0))

Our data must be in a Spark-specific data structure called [LabeledPoint](https://spark.apache.org/docs/latest/mllib-data-types.html#labeled-point).

- Write a function that turns a Spark sequence of tuples into a sequence containing LabeledPoint values for each row. The arrival delay should be the label, and the departure delay should be the feature.

In [None]:
def to_labeled_point(rdd):
    '''
    Transforms a Spark sequence of tuples into
    a sequence containing LabeledPoint values for each row.
    
    The arrival delay is the label.
    The departure delay is the feature.
    
    Parameters
    ----------
    rdd: A pyspark.rdd.RDD instance.
    
    Returns
    -------
    A pyspark.rdd.PipelinedRDD instance.
    '''
    
    # YOUR CODE HERE
    
    return rdd

In [None]:
labeled_point = to_labeled_point(binary_labels)
print(labeled_point.take(5))

In [None]:
assert_is_instance(labeled_point, pyspark.rdd.PipelinedRDD)
assert_equal(labeled_point.count(), len_data)
assert_true(all(isinstance(p, LabeledPoint) for p in labeled_point.take(5)))
assert_equal([p.label for p in labeled_point.take(5)], [0.0, 0.0, 1.0, 0.0, 1.0])
assert_true(all(
    isinstance(p.features, pyspark.mllib.linalg.DenseVector)
    for p
    in labeled_point.take(5)
    ))
assert_equal(
    [p.label for p in labeled_point.take(5)],
    [0.0,
     0.0,
     1.0,
     0.0,
     1.0]
    )
assert_equal(
    [p.features.values.tolist() for p in labeled_point.take(5)],
    [[-4.0],
     [-5.0],
     [11.0],
     [-3.0],
     [0.0]]
    )

- Use [LogisticRegressionWithSGD](http://spark.apache.org/docs/latest/api/python/pyspark.mllib.html#pyspark.mllib.classification.LogisticRegressionWithSGD) to train a [logistic regression](http://spark.apache.org/docs/latest/mllib-linear-methods.html#logistic-regression) model.
- Use 10 iterations, and keep the default values for all other parameters.
- Use the resulting logistic regression model to make predictions on the entire data, and return an RDD of (label, prediction) pairs.
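For intuition about what fitting and predicting involve, the following is a minimal plain-Python sketch of one-feature logistic regression trained by batch gradient descent. It is not the MLlib implementation: it assumes no intercept term, no regularization, and a fixed step size, and the helper names (`sigmoid`, `fit_logistic`, `predict`) and the toy data are hypothetical.

```python
import math

def sigmoid(z):
    """Numerically stable logistic function."""
    if z >= 0:
        return 1.0 / (1.0 + math.exp(-z))
    ez = math.exp(z)
    return ez / (1.0 + ez)

def fit_logistic(points, iterations=10, step=1.0):
    """Fit the weight w in P(y=1 | x) = sigmoid(w * x) on (label, feature) pairs."""
    w = 0.0
    n = len(points)
    for _ in range(iterations):
        # Average gradient of the log loss with respect to w.
        grad = sum((sigmoid(w * x) - y) * x for y, x in points) / n
        w -= step * grad
    return w

def predict(w, x):
    """Predict 1.0 when the estimated probability exceeds 0.5, else 0.0."""
    return 1.0 if sigmoid(w * x) > 0.5 else 0.0

# Toy (label, feature) pairs, made up for illustration.
toy = [(0.0, -4.0), (0.0, -5.0), (1.0, 11.0), (0.0, -3.0), (1.0, 0.0)]
w = fit_logistic(toy)
toy_labels_and_preds = [(y, predict(w, x)) for y, x in toy]
print(w > 0, toy_labels_and_preds)
```

Without an intercept, a feature value of exactly 0 always yields a probability of 0.5, which does not exceed the threshold, so the sketch predicts 0.0 for the last toy point regardless of the fitted weight.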

In [None]:
def fit_and_predict(rdd):
    '''
    Fits a logistic regression model.
    
    Parameters
    ----------
    rdd: A pyspark.rdd.RDD instance.
    
    Returns
    -------
    An RDD of (label, prediction) pairs.
    '''
    
    # YOUR CODE HERE
    
    return rdd

In [None]:
labels_and_preds = fit_and_predict(labeled_point)
print(labels_and_preds.take(5))

In [None]:
assert_is_instance(labels_and_preds, pyspark.rdd.PipelinedRDD)
assert_equal(labels_and_preds.count(), len_data)
assert_equal(
    labels_and_preds.take(5),
    [(0.0, 0.0),
     (0.0, 0.0),
     (1.0, 1.0),
     (0.0, 0.0),
     (1.0, 0.0)]
    )

- Write a function that computes the accuracy from a Spark sequence of (label, prediction) pairs.
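Accuracy is the fraction of pairs whose prediction equals the label. A minimal sketch on a plain Python list, using the hypothetical helper name `accuracy_of`:

```python
def accuracy_of(pairs):
    """Return the fraction of (label, prediction) pairs that agree."""
    pairs = list(pairs)
    correct = sum(1 for label, prediction in pairs if label == prediction)
    return correct / len(pairs)

print(accuracy_of([(1.0, 1.0), (0.0, 0.0), (0.0, 1.0)]))
```

On an RDD, one way to apply the same idea is to combine `filter` and `count`, dividing the number of matching pairs by the total number of pairs.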

In [None]:
def get_accuracy(rdd):
    '''
    Computes accuracy.
    
    Parameters
    ----------
    rdd: A pyspark.rdd.RDD instance.
    
    Returns
    -------
    A float.
    '''
    
    # YOUR CODE HERE
    
    return accuracy

In [None]:
accuracy = get_accuracy(labels_and_preds)
print(accuracy)

In [None]:
assert_is_instance(accuracy, float)
assert_almost_equal(get_accuracy(sc.parallelize([(0.0, 1.0), (1.0, 0.0)])), 0.0)
assert_almost_equal(get_accuracy(sc.parallelize([(0.0, 1.0), (0.0, 0.0)])), 0.5)
assert_almost_equal(get_accuracy(sc.parallelize([(0.0, 0.0), (1.0, 0.0)])), 0.5)
assert_almost_equal(get_accuracy(sc.parallelize([(0.0, 0.0), (1.0, 1.0)])), 1.0)
assert_almost_equal(get_accuracy(sc.parallelize([(1.0, 0.0), (0.0, 1.0), (0.0, 1.0)])), 0.0)
assert_almost_equal(get_accuracy(sc.parallelize([(1.0, 1.0), (0.0, 1.0), (0.0, 1.0)])), 1/3)
assert_almost_equal(get_accuracy(sc.parallelize([(1.0, 1.0), (0.0, 0.0), (0.0, 1.0)])), 2/3)
assert_almost_equal(get_accuracy(sc.parallelize([(1.0, 1.0), (0.0, 0.0), (1.0, 1.0)])), 1.0)
assert_almost_equal(accuracy, 0.7503940214613938)

## Cleanup

We must stop the SparkContext in order to release the Spark resources before exiting this notebook.

In [None]:
sc.stop()