# Lesson 11 - Starter Code

### Part 1: Intro to Databricks Notebook

This notebook is very similar to an IPython Notebook. Cells can contain Markdown or code, and the notebook is attached to a cluster where the code is executed in a distributed environment.

In [3]:
print("hello, world!")

A SparkContext (`sc`) is the entry point to Spark for a Spark application.  
Similarly, a SQLContext (`sqlContext`) is the entry point to Spark SQL and DataFrame functionality.  
Read more at the [Spark Docs](http://spark.apache.org/docs/latest/)

In [5]:
# A Spark Context is already created for you.
# Do not create another or unspecified behavior may occur.
sc

In [6]:
# A SQLContext is also already created for you.
# Do not create another or unspecified behavior may occur.
sqlContext

There are a large number of Databricks hosted datasets available for you.  
See the full [List of Databricks Hosted Datasets available here](https://docs.cloud.databricks.com/docs/latest/databricks_guide/index.html#03%20Data%20Sources/6%20Databricks%20Public%20Datasets/1%20DBFS%20Hosted%20Datasets.html)

In [8]:
%fs ls /databricks-datasets

### Part 2: (DEMO) Word Count Example

Load data from a text file into a Resilient Distributed Dataset (RDD).  
An RDD is Spark's main data abstraction: a collection of elements partitioned across the nodes of the cluster that can be operated on in parallel.
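
To make the idea of a partitioned collection concrete, here is a minimal plain-Python sketch that needs no Spark. The `partition` helper and the sample lines are hypothetical, and the round-robin split is a simplification: Spark decides for itself how a text file is split into partitions.

```python
# Plain-Python stand-in for an RDD: a list of partitions (not the Spark API).
lines = ["to be or not to be", "that is the question",
         "whether tis nobler", "in the mind"]

def partition(elements, num_partitions):
    """Hypothetical helper: split elements round-robin into num_partitions chunks."""
    return [elements[i::num_partitions] for i in range(num_partitions)]

partitions = partition(lines, 2)

# Each partition can be processed on its own, which is what makes parallelism possible.
per_partition_counts = [len(p) for p in partitions]

# Combining the per-partition results gives the same answer as counting directly.
total = sum(per_partition_counts)
print(per_partition_counts)
print(total)
```

The point is only that a result such as `count()` can be computed per partition and then combined.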

In [11]:
# Read a text file into an RDD
dataRDD = sc.textFile("dbfs:/databricks-datasets/cs100/lab1/data-001/shakespeare.txt")

print("Data type of dataRDD:")
print(type(dataRDD))

print("Number of elements (lines) in dataRDD:")
print(dataRDD.count())

print("First ten elements (lines) in dataRDD:")
print(dataRDD.take(10))

In [12]:
# RDD transformations
word_counts = dataRDD \
    .flatMap(lambda line: line.split(" ")) \
    .map(lambda word: (word, 1)) \
    .reduceByKey(lambda a, b: a + b)

In [13]:
# RDD actions
word_counts.collect()

In [14]:
# Get top ten words
word_counts.takeOrdered(10, key=lambda x: -x[1])
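
The pipeline above can be mirrored in plain Python to show what each step does to the data. This is a sketch on two made-up lines, not Spark code: `reduce_by_key` is a hypothetical helper that imitates `reduceByKey` on a single machine.

```python
from itertools import chain

lines = ["to be or not to be", "to see or not to see"]

# flatMap: split every line into words and flatten the results into one sequence
words = list(chain.from_iterable(line.split(" ") for line in lines))

# map: pair each word with the count 1
pairs = [(word, 1) for word in words]

def reduce_by_key(pairs, func):
    """Hypothetical helper: combine the values of equal keys with func."""
    result = {}
    for key, value in pairs:
        result[key] = func(result[key], value) if key in result else value
    return result

# reduceByKey: add up the 1s for each distinct word
word_counts = reduce_by_key(pairs, lambda a, b: a + b)

# Counterpart of takeOrdered(3, key=lambda x: -x[1]): the largest counts first
top_three = sorted(word_counts.items(), key=lambda x: -x[1])[:3]
print(top_three)
```

In Spark the transformations are lazy and only run when an action such as `collect()` or `takeOrdered()` is called. The plain-Python version evaluates each step immediately.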

### Part 3: (GUIDED PRACTICE) Word Length Count

Perform the word count as above, but add an additional transformation to convert each word to its length.

In [16]:
# RDD transformations
word_len_counts = dataRDD \
    .flatMap(lambda line: line.split(" ")) \
    .map(### FILL IN ###) \
    .map(lambda word_len: (word_len, 1)) \
    .reduceByKey(lambda a, b: a + b)

In [17]:
# RDD actions
word_len_counts.### FILL IN ###

In [18]:
# Get top ten word lengths
word_len_counts.### FILL IN ###

In [19]:
# RDD transformations to filter long words
word_lengths = dataRDD \
    .flatMap(lambda line: line.split(" ")) \
    .map(lambda word: (word, len(word))) \
    .filter(lambda x: x[1]>20)

In [20]:
# RDD actions
word_lengths.collect()

In [21]:
# Get top ten longest words
word_lengths.### FILL IN ###

### Part 4: (DEMO) Spark DataFrame

#### 4.1 Load data

In [24]:
# Read titanic data as DataFrame using spark-csv package, and cache it
titanic = sqlContext.read \
    .format('com.databricks.spark.csv') \
    .options(header='true', inferSchema='true') \
    .load('/databricks-datasets/Rdatasets/data-001/csv/COUNT/titanic.csv') \
    .cache()

print("Data type of titanic:")
print(type(titanic))

print("Number of elements (records) in titanic:")
print(titanic.count())

print("First ten elements (records) in titanic:")
print(titanic.head(10))

#### 4.2 Explore data

In [26]:
display(titanic.where("class like '1st class' and age like 'child'"))

#### 4.3 Pre-process data

In this section, we convert string categorical columns into ordered indices usable by a linear model. Note that these categorical features are special: There is a natural ordering, so it makes sense to treat them as continuous. For general categorical features, you should probably use one-hot encoding instead.

In [28]:
# Compute lists of string categories
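def getCategories(col):
  # Collect the distinct values of the column and sort them
  vals = sorted(titanic.select(col).distinct().rdd.map(lambda x: x[0]).collect())
  # Map each value to its position in the sorted list
  valDict = dict((val, i) for i, val in enumerate(vals))
  print(col + ': ' + ', '.join(str(v) for v in vals))
  return (vals, valDict)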

(classes, classDict) = getCategories("class")
(ages, ageDict) = getCategories("age")
(sexes, sexDict) = getCategories("sex")
(survived, survivedDict) = getCategories("survived")

In [29]:
# Convert the string categories into indices
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType

classUDF = udf(lambda x: classDict[x], IntegerType())
ageUDF = udf(lambda x: ageDict[x], IntegerType())
sexUDF = udf(lambda x: sexDict[x], IntegerType())
survivedUDF = udf(lambda x: survivedDict[x], IntegerType())

titanicIndexed = titanic.select(
  classUDF(titanic["class"]).alias("class"),
  ageUDF(titanic["age"]).alias("age"),
  sexUDF(titanic["sex"]).alias("sex"),
  survivedUDF(titanic["survived"]).alias("survived")).cache()

display(titanicIndexed.describe())
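
The difference between ordered indices and one-hot encoding, mentioned at the start of this section, can be shown with a small plain-Python sketch. The category list is illustrative and `one_hot` is a hypothetical helper, not part of Spark.

```python
# Illustrative category values, sorted the same way getCategories sorts them
classes = sorted(["3rd class", "1st class", "2nd class"])

# Ordered index encoding: each category becomes its position in the sorted list
class_index = dict((value, i) for i, value in enumerate(classes))

def one_hot(value, categories):
    """Hypothetical helper: a vector with a single 1 at the category's position."""
    return [1 if value == c else 0 for c in categories]

print(class_index["2nd class"])
print(one_hot("2nd class", classes))
```

With an index, a linear model treats "2nd class" as lying between the other two classes. With one-hot encoding, each category gets its own weight and no ordering is implied.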

#### 4.4 Train a model

We now train a Logistic Regression model using `LogisticRegressionWithSGD`. Since we will use the traditional MLlib API (not the Pipelines API), we first have to extract the label and features columns and create an RDD of LabeledPoints. (The Pipelines API takes DataFrames instead of RDDs.)

In [31]:
# Convert data to RDD of LabeledPoint
from pyspark.mllib.regression import LabeledPoint

featureCols = ["age", "sex", "class"]
# Go through .rdd explicitly so that map() runs on an RDD of Rows
titanicLabels = titanicIndexed.select("survived").rdd.map(lambda row: row[0])
titanicFeatures = titanicIndexed.select(*featureCols).rdd.map(lambda x: list(x))
titanicData = titanicLabels.zip(titanicFeatures).map(lambda l_p: LabeledPoint(l_p[0], l_p[1])).cache()

In [32]:
# Train the model, and print the intercept and weight vector
# We use L1 (sparsifying) regularization, but you can also use None or "l2".
from pyspark.mllib.classification import LogisticRegressionWithSGD

lr = LogisticRegressionWithSGD.train(titanicData, regParam=0.1, regType="l1", intercept=True, iterations=100)

#### 4.5 Evaluate the model

In [34]:
# We can make a single prediction:
oneInstance = [0, 1, 0]
prediction = lr.predict(oneInstance)
print('Example prediction:')
print('  features: ' + str(oneInstance))
print('  prediction: %d' % prediction)

In [35]:
# We can also make predictions on the whole dataset and compute accuracy
import numpy

def accuracy(model, labelsRDD, featuresRDD):
  predictionsRDD = featuresRDD.map(lambda x: model.predict(x))
  return labelsRDD.zip(predictionsRDD).map(lambda labelAndPred: labelAndPred[0] == labelAndPred[1]).mean()

print('Training accuracy: %g' % accuracy(lr, titanicLabels, titanicFeatures))
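
The accuracy computed above is the mean of a per-record "label equals prediction" flag. A plain-Python sketch of the same arithmetic, on made-up labels and predictions rather than the Titanic data:

```python
# Made-up labels and predictions, for illustration only
labels = [1, 0, 1, 1, 0]
predictions = [1, 0, 0, 1, 1]

# True where the prediction matches the label, False otherwise
matches = [label == pred for label, pred in zip(labels, predictions)]

# The mean of the flags (True counts as 1) is the fraction of correct predictions
accuracy_value = sum(matches) / float(len(matches))
print(accuracy_value)
```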

In [36]:
# Previously, we were making 0/1 predictions.  We can clear the model threshold to make soft predictions.
# Note: Soft predictions are currently only supported for binary classification.
lr.clearThreshold()
print('Predicted probability of label 1: %g' % lr.predict(oneInstance))
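
To show how hard and soft predictions relate, here is a minimal plain-Python sketch of a logistic regression prediction. The weights and intercept are hypothetical, not the values learned above, and `predict_probability` and `predict` are illustrative helpers rather than the MLlib implementation.

```python
import math

# Hypothetical parameters, NOT the weights learned by the model above
weights = [0.5, -1.2, 0.3]
intercept = 0.1

def predict_probability(features):
    """Logistic (sigmoid) function applied to the linear margin."""
    margin = intercept + sum(w * x for w, x in zip(weights, features))
    return 1.0 / (1.0 + math.exp(-margin))

def predict(features, threshold=0.5):
    """Return a 0/1 label, or the raw probability when threshold is None."""
    p = predict_probability(features)
    if threshold is None:
        return p
    return 1 if p > threshold else 0

one_instance = [0, 1, 0]
hard = predict(one_instance)        # 0/1 label using the 0.5 threshold
soft = predict(one_instance, None)  # probability, as after clearing the threshold
print(hard)
print(soft)
```

Passing `None` as the threshold plays the role of `clearThreshold()` in this sketch.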

Read the Spark MLlib guide for a complete list of the ML algorithms in Spark:  
[Spark Machine Learning Library (MLlib) Guide](http://spark.apache.org/docs/latest/mllib-guide.html)