## Problem 14.1. Spark

In [1]:
import pyspark
from pyspark import SparkConf, SparkContext

from nose.tools import assert_equal, assert_is_instance

In [2]:
# getOrCreate keeps this cell re-runnable: calling SparkContext(...) again
# while a context is active raises "Cannot run multiple SparkContexts at once".
sc = SparkContext.getOrCreate(SparkConf().setMaster('local[*]'))

In [3]:
# Raw MovieLens ratings: one RDD element per line of the CSV (header line included).
text_file = sc.textFile(name='ml-latest-small/ratings.csv')

assert_is_instance(text_file, pyspark.rdd.RDD)

### Part 1: Parse `ratings.csv` into typed tuples

In [45]:
def read_ratings_csv(rdd):
    '''
    Parses the raw lines of `ratings.csv` into typed rating tuples.

    Each line is split on commas and its first four fields are kept.
    The header row (a row with a field equal to 'userId') and any row
    with a field equal to 'NA' are dropped. The remaining rows become
    (userId: int, movieId: int, rating: float, timestamp: int).

    Parameters
    ----------
    rdd: A pyspark.rdd.RDD instance of CSV text lines.

    Returns
    -------
    A pyspark.rdd.PipelinedRDD instance of 4-tuples.
    '''
    def first_four_fields(line):
        fields = line.split(",")
        return fields[0], fields[1], fields[2], fields[3]

    def is_data_row(fields):
        # Membership is tested against whole fields, not substrings.
        return 'userId' not in fields and 'NA' not in fields

    def to_typed_row(fields):
        return int(fields[0]), int(fields[1]), float(fields[2]), int(fields[3])

    return rdd.map(first_four_fields).filter(is_data_row).map(to_typed_row)

In [46]:
# Parse the raw lines and preview the first three typed rows.
ratings = read_ratings_csv(rdd=text_file)
print(ratings.take(num=3))

[(1, 31, 2.5, 1260759144), (1, 1029, 3.0, 1260759179), (1, 1061, 3.0, 1260759182)]


In [48]:
# Grading checks: the chained map/filter calls yield a PipelinedRDD, the
# parsed RDD has the expected number of rows, and each row has 4 fields.
assert_is_instance(ratings, pyspark.rdd.PipelinedRDD)
assert_equal(ratings.count(), 100004)
assert_equal(len(ratings.first()), 4)

### Part 2: Keep favorable ratings (rating > 3)

In [49]:
def filter_favorable_ratings(rdd):
    '''
    Keeps only the rows whose rating is strictly greater than 3.

    Parameters
    ----------
    rdd: A pyspark.rdd.RDD instance of rows where index 2 holds the rating,
        as produced by `read_ratings_csv`.

    Returns
    -------
    A pyspark.rdd.PipelinedRDD instance with the same row layout.
    '''
    def is_favorable(row):
        return row[2] > 3

    return rdd.filter(is_favorable)

In [50]:
# Ratings above 3 only; still a lazy RDD at this point.
favorable = filter_favorable_ratings(rdd=ratings)

In [51]:
# Grading checks: result type and the expected number of ratings above 3.
assert_is_instance(favorable, pyspark.rdd.PipelinedRDD)
assert_equal(favorable.count(), 62106)

### Part 3: Count reviews for a movie

In [65]:
def find_n_reviews(rdd, movie_id):
    '''
    Counts the rows whose movieId (index 1) equals `movie_id`.

    Parameters
    ----------
    rdd: A pyspark.rdd.RDD instance of rating rows.
    movie_id: An int.

    Returns
    -------
    An int, the number of matching rows (0 when the movie is absent).
    '''
    matching_rows = rdd.filter(lambda row: row[1] == movie_id)
    return matching_rows.count()

In [66]:
# Favorable-review count for movieId 1 (the variable name indicates Toy Story).
n_toy_story = find_n_reviews(favorable, movie_id=1)
print(n_toy_story)

182


In [69]:
# Grading checks: `count()` returns a plain Python int.
assert_is_instance(n_toy_story, int)

# Favorable-review counts for movieIds 0 through 4; movieId 0 is expected to have none.
test = [find_n_reviews(favorable, n) for n in range(5)]
assert_equal(test, [0, 182, 51, 24, 1])

### Cleanup

In [70]:
# Shut down this problem's SparkContext; Problem 14.2 creates its own.
sc.stop()

## Problem 14.2. Spark DataFrames

In [72]:
import pyspark
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql.types import StructField, StructType, IntegerType, FloatType, StringType
import pandas as pd

from nose.tools import assert_equal, assert_is_instance
from pandas.util.testing import assert_frame_equal

import warnings
warnings.filterwarnings("ignore")

In [73]:
# getOrCreate keeps this cell re-runnable: calling SparkContext(...) again
# while a context is active raises "Cannot run multiple SparkContexts at once".
sc = SparkContext.getOrCreate(pyspark.SparkConf().setMaster('local[*]'))

In [74]:
# The same path is reused below to build a pandas reference frame.
csv_path = 'ml-latest-small/ratings.csv'
text_file = sc.textFile(name=csv_path)

In [77]:
def create_df(rdd):
    '''
    Builds a Spark DataFrame from the raw text lines of `ratings.csv`.

    Lines are split on commas and the first four fields are kept. The
    header row (a field equal to 'userId') and rows with a field equal to
    'NA' are dropped. The parsing is done locally, so this problem does
    not depend on a function defined in an earlier, separately set-up
    section of the notebook.

    Parameters
    ----------
    rdd: A pyspark.rdd.RDD instance of CSV text lines.

    Returns
    -------
    A pyspark.sql.dataframe.DataFrame instance with nullable columns
    userId (IntegerType), movieId (IntegerType), rating (FloatType),
    timestamp (IntegerType).
    '''
    def to_fields(line):
        p = line.split(",")
        return (p[0], p[1], p[2], p[3])

    def is_data_row(fields):
        return 'userId' not in fields and 'NA' not in fields

    def to_typed_row(fields):
        return (int(fields[0]), int(fields[1]), float(fields[2]), int(fields[3]))

    typed_rows = rdd.map(to_fields).filter(is_data_row).map(to_typed_row)

    schema = StructType([
        StructField('userId', IntegerType(), True),
        StructField('movieId', IntegerType(), True),
        StructField('rating', FloatType(), True),
        StructField('timestamp', IntegerType(), True),
    ])

    # Use the SparkContext that owns `rdd` instead of the notebook-global
    # `sc`, so the function works with whichever context produced the RDD.
    sql_context = SQLContext(rdd.context)
    return sql_context.createDataFrame(typed_rows, schema)

In [78]:
# Build the DataFrame and display its first 20 rows (show's default).
df = create_df(rdd=text_file)
df.show(n=20)

+------+-------+------+----------+
|userId|movieId|rating| timestamp|
+------+-------+------+----------+
|     1|     31|   2.5|1260759144|
|     1|   1029|   3.0|1260759179|
|     1|   1061|   3.0|1260759182|
|     1|   1129|   2.0|1260759185|
|     1|   1172|   4.0|1260759205|
|     1|   1263|   2.0|1260759151|
|     1|   1287|   2.0|1260759187|
|     1|   1293|   2.0|1260759148|
|     1|   1339|   3.5|1260759125|
|     1|   1343|   2.0|1260759131|
|     1|   1371|   2.5|1260759135|
|     1|   1405|   1.0|1260759203|
|     1|   1953|   4.0|1260759191|
|     1|   2105|   4.0|1260759139|
|     1|   2150|   3.0|1260759194|
|     1|   2193|   2.0|1260759198|
|     1|   2294|   2.0|1260759108|
|     1|   2455|   2.5|1260759113|
|     1|   2968|   1.0|1260759200|
|     1|   3671|   3.0|1260759117|
+------+-------+------+----------+
only showing top 20 rows



In [79]:
assert_is_instance(df, pyspark.sql.dataframe.DataFrame)

# Reference copy: read the same CSV directly with pandas. The Spark
# DataFrame, converted with toPandas(), must match it exactly.
df_pd = pd.read_csv(csv_path)
assert_frame_equal(df.toPandas(), df_pd)

In [80]:
def filter_favorable_ratings(df):
    '''
    Keeps ratings strictly greater than 3 and projects to (movieId, rating).

    Parameters
    ----------
    df: A pyspark.sql.dataframe.DataFrame instance with at least
        `movieId` and `rating` columns.

    Returns
    -------
    A pyspark.sql.dataframe.DataFrame instance with columns movieId, rating.
    '''
    is_favorable = df['rating'] > 3
    favorable_rows = df.filter(is_favorable)
    return favorable_rows.select(df['movieId'], df['rating'])

In [81]:
# Favorable ratings projected to (movieId, rating); display the first 20 rows.
favorable = filter_favorable_ratings(df=df)
favorable.show(n=20)

+-------+------+
|movieId|rating|
+-------+------+
|   1172|   4.0|
|   1339|   3.5|
|   1953|   4.0|
|   2105|   4.0|
|     10|   4.0|
|     17|   5.0|
|     39|   5.0|
|     47|   4.0|
|     50|   4.0|
|    110|   4.0|
|    150|   5.0|
|    153|   4.0|
|    222|   5.0|
|    253|   4.0|
|    261|   4.0|
|    265|   5.0|
|    266|   5.0|
|    273|   4.0|
|    296|   4.0|
|    314|   4.0|
+-------+------+
only showing top 20 rows



In [82]:
assert_is_instance(favorable, pyspark.sql.dataframe.DataFrame)

# Same filter and projection done in pandas. reset_index(drop=True) makes the
# row index 0..n-1, matching the index that toPandas() produces.
favorable_pd = df_pd.loc[df_pd['rating'] > 3.0, ['movieId', 'rating']].reset_index(drop=True)
assert_frame_equal(favorable.toPandas(), favorable_pd)

In [83]:
def find_n_reviews(df, movie_id):
    '''
    Counts the rows of `df` whose movieId equals `movie_id`.

    Parameters
    ----------
    df: A pyspark.sql.dataframe.DataFrame instance with a `movieId` column.
    movie_id: An int.

    Returns
    -------
    n_reviews: An int (0 when the movie is absent).
    '''
    same_movie = df['movieId'] == movie_id
    return df.filter(same_movie).count()

In [84]:
# Same query as Problem 14.1, now against the DataFrame version.
n_toy_story = find_n_reviews(favorable, movie_id=1)
print(n_toy_story)

182


In [85]:
assert_is_instance(n_toy_story, int)

# Compare Spark counts for movieIds 1-5 against pandas group sizes.
# NOTE(review): `[:5]` takes the first five groups in (sorted) movieId order.
# That lines up with range(1, 6) only if movieIds 1-5 each have at least one
# favorable rating. Confirm this if the data changes.
test = [find_n_reviews(favorable, n) for n in range(1, 6)]
test_pd = favorable_pd.groupby('movieId').size()[:5].tolist()
assert_equal(test, test_pd)

### Cleanup

In [86]:
# Shut down this problem's SparkContext; Problem 14.3 creates its own.
sc.stop()

## Problem 14.3. Spark MLlib

In [88]:
import pyspark
from pyspark import SparkContext
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.classification import LogisticRegressionWithLBFGS

from nose.tools import (
    assert_equal, assert_is_instance,
    assert_true, assert_almost_equal
    )

In [89]:
# getOrCreate keeps this cell re-runnable: calling SparkContext(...) again
# while a context is active raises "Cannot run multiple SparkContexts at once".
sc = SparkContext.getOrCreate(pyspark.SparkConf().setMaster('local[*]'))

In [150]:
text_file = sc.textFile(name='2001-1.csv')

# Keep (ArrDelay, DepDelay), i.e. columns 14 and 15. Drop the header row and
# rows where either delay is the literal 'NA', then cast both delays to int.
data = (
    text_file
    .map(lambda line: line.split(","))
    .map(lambda fields: (fields[14], fields[15]))
    .filter(lambda pair: 'ArrDelay' not in pair and 'NA' not in pair)
    .map(lambda pair: (int(pair[0]), int(pair[1])))
)

len_data = data.count()


### Function: to_binary

In [156]:
def to_binary(rdd, threshold=15):
    '''
    Transforms the "ArrDelay" column into binary labels
    that indicate whether a flight arrived late (1) or not (0).

    A flight counts as late when its arrival delay is greater than or
    equal to `threshold`; the boundary value itself is labelled late.

    Parameters
    ----------
    rdd: A pyspark.rdd.RDD instance of pairs whose element 0 is the
        arrival delay; element 1 (departure delay) is passed through.
    threshold: A number, default 15. The smallest arrival delay labelled late.

    Returns
    -------
    A pyspark.rdd.PipelinedRDD instance of (label, element 1) pairs,
    where label is the int 1 or 0.
    '''
    # int(bool) gives exactly 1 or 0, the same values as an if/else mapping.
    return rdd.map(lambda p: (int(p[0] >= threshold), p[1]))

In [157]:
# Preview the first five (late-flag, departure-delay) pairs.
binary_labels = to_binary(rdd=data)
print(binary_labels.take(num=5))

[(0, -4), (0, -5), (1, 11), (0, -3), (1, 0)]


In [158]:
# Labelling is a 1:1 map, so no rows may be gained or lost.
assert_is_instance(binary_labels, pyspark.rdd.PipelinedRDD)
assert_equal(binary_labels.count(), len_data)

In [159]:
# Boundary checks: exactly 15 counts as late, just below 15 does not.
assert_equal(to_binary(sc.parallelize([(15.0, 120.0)])).first(), (1, 120.0))
assert_equal(to_binary(sc.parallelize([(14.9, 450.0)])).first(), (0, 450.0))

### Function: to_labeled_point

In [160]:
def to_labeled_point(rdd):
    '''
    Wraps each (label, feature) row in an MLlib LabeledPoint.

    Element 0 of each row (the binary arrival-delay flag) becomes the
    label. Element 1 (the departure delay) becomes the single feature.

    Parameters
    ----------
    rdd: A pyspark.rdd.RDD instance of (label, feature) rows.

    Returns
    -------
    A pyspark.rdd.PipelinedRDD instance of LabeledPoint values.
    '''
    def as_labeled_point(row):
        # The feature is wrapped in a list so it becomes a one-element vector.
        return LabeledPoint(row[0], [row[1]])

    return rdd.map(as_labeled_point)

In [161]:
# Preview the first five LabeledPoints.
labeled_point = to_labeled_point(rdd=binary_labels)
print(labeled_point.take(num=5))

[LabeledPoint(0.0, [-4.0]), LabeledPoint(0.0, [-5.0]), LabeledPoint(1.0, [11.0]), LabeledPoint(0.0, [-3.0]), LabeledPoint(1.0, [0.0])]


In [163]:
# Grading checks:
# - row count is preserved;
# - every element is a LabeledPoint;
# - the labels match the binary flags previewed above;
# - the features are stored as DenseVectors.
assert_is_instance(labeled_point, pyspark.rdd.PipelinedRDD)
assert_equal(labeled_point.count(), len_data)
assert_true(all(isinstance(p, LabeledPoint) for p in labeled_point.take(5)))
assert_equal([p.label for p in labeled_point.take(5)], [0.0, 0.0, 1.0, 0.0, 1.0])
assert_true(all(
    isinstance(p.features, pyspark.mllib.linalg.DenseVector)
    for p
    in labeled_point.take(5)
    ))

### Function: fit_and_predict

In [164]:
def fit_and_predict(rdd, iterations=10):
    '''
    Fits a logistic regression model and predicts on the same data.

    NOTE: predictions are made on the training RDD itself, so any accuracy
    computed from the result is training accuracy, not held-out accuracy.

    Parameters
    ----------
    rdd: A pyspark.rdd.RDD instance of LabeledPoint values.
    iterations: An int, default 10. Passed to
        LogisticRegressionWithLBFGS.train as its `iterations` argument.

    Returns
    -------
    A pyspark.rdd.PipelinedRDD of (label, prediction) pairs, with the
    prediction cast to float to match the float label.
    '''
    model = LogisticRegressionWithLBFGS.train(rdd, iterations=iterations)
    # The model object is captured by the closure and shipped to the executors.
    return rdd.map(lambda point: (point.label, float(model.predict(point.features))))

In [165]:
# Train, then preview the first five (label, prediction) pairs.
labels_and_preds = fit_and_predict(rdd=labeled_point)
print(labels_and_preds.take(num=5))

[(0.0, 0.0), (0.0, 0.0), (1.0, 1.0), (0.0, 0.0), (1.0, 0.0)]


In [None]:
# One (label, prediction) pair per input row.
assert_is_instance(labels_and_preds, pyspark.rdd.PipelinedRDD)
assert_equal(labels_and_preds.count(), len_data)

### Function: get_accuracy

In [166]:
def get_accuracy(rdd):
    '''
    Computes accuracy as the fraction of pairs whose two elements are equal.

    Parameters
    ----------
    rdd: A pyspark.rdd.RDD instance of (label, prediction) pairs.

    Returns
    -------
    A float: the mean of the per-pair equality booleans.
    '''
    def is_correct(pair):
        label, prediction = pair[0], pair[1]
        return label == prediction

    return rdd.map(is_correct).mean()

In [167]:
# Training-set accuracy of the fitted model.
accuracy = get_accuracy(rdd=labels_and_preds)
print(accuracy)

0.7547458269632241


### Cleanup

In [None]:
# Shut down the SparkContext created for Problem 14.3.
sc.stop()