# MLlib Introduction

## Check out Jupyter

1. Click the "Help" menu above, then "User Interface Tour"
2. Click the grey cell below and hit `Ctrl + Enter` to run it
3. Click on the `+` button above to make a new "cell". What's 1337 * 1337?

In [None]:
import random
%matplotlib inline
print("Hello Ignite %s Community!" % random.choice(["Big Data", "Machine Learning"]))

## Basic Spark usage

Spark is a successor to Hadoop MapReduce: a general-purpose engine for distributed data processing that keeps the MapReduce programming model and extends it. It lets you write programs that scale out across a cluster by focussing on **transforming** data stored in Resilient Distributed Datasets (RDDs) using functions with no side effects.

You can read more about the thinking behind MapReduce here: https://data-flair.training/blogs/hadoop-mapreduce-tutorial/


In [None]:
# First we're going to snag a file from the web and save it to a local file
# This kind of brevity is why they say Python comes with "batteries included"

import urllib.request  # importing "urllib" alone does not reliably load the request submodule
urllib.request.urlretrieve("https://github.com/joehalliwell/ml-workshop/raw/master/data/war-and-peace.txt", "war-and-peace.txt")

In [None]:
# Now we're going to load the file into spark and count the number of lines
# We access Spark through a global variable "sc" that's been magically created already...

text = sc.textFile("war-and-peace.txt")
text.count()

In [None]:
# Have a look at the first 10 lines

text.take(10)

In [None]:
# Split each line into words. Try map() instead: how does it differ?

words = text.flatMap(lambda line: line.split())
words.take(10)
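
If the difference is hard to see on the full text, the following plain-Python sketch may help. It does not use Spark at all: the list comprehension stands in for `map()`, `itertools.chain` stands in for `flatMap()`, and the two sample lines are made up for illustration.

```python
from itertools import chain

# Made-up sample lines standing in for the RDD of text lines
lines = ["well prince", "so genoa and lucca"]

# map-like: exactly one output element per input line (here, one list per line)
mapped = [line.split() for line in lines]

# flatMap-like: apply the same function, then flatten the results into one sequence
flat_mapped = list(chain.from_iterable(line.split() for line in lines))

print(mapped)
print(flat_mapped)
```

The first result is a list of lists, the second a single flat list of words, which is the shape the word count needs.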

In [None]:
# Now we use a famous trick to count up each word. Can you see how it works?

counts = words.map(lambda w: (w,1)).reduceByKey(lambda a,b: a + b)
counts.take(10)
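
One way to picture the trick is to replay it on a small list in plain Python. This is only a sketch of the idea, not how Spark implements it: the sample words are made up, and the explicit grouping step stands in for the shuffle that Spark performs across the cluster.

```python
from collections import defaultdict
from functools import reduce

# Made-up sample standing in for the "words" RDD
words = ["the", "war", "and", "the", "peace", "and", "the"]

# map step: pair every word with the count 1
pairs = [(w, 1) for w in words]

# grouping step: collect the values that share a key
grouped = defaultdict(list)
for key, value in pairs:
    grouped[key].append(value)

# reduce step: fold each key's values together with the same lambda used above
counts = {key: reduce(lambda a, b: a + b, values) for key, values in grouped.items()}

print(counts)
```

Because addition is associative and commutative, the values for a key can be combined in any order and on any machine, which is what lets the real thing run in parallel.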

In [None]:
# One of the ways Spark differs from classic MapReduce is by adding more powerful operations like sorting...

counts.sortBy(lambda wordcount: -wordcount[1]).take(10)

### Spark challenges!

1. How can you modify the code to lower case words before counting?
2. What's the median word?
3. (Tricky) How many times does "Russia" appear?
4. (Tricky) What are the most frequent word pairs?

In [None]:
# Try your answers here!

## SparkSQL DataFrames

Spark DataFrames are like a scalable version of the in-memory Pandas dataframes we used in the last ML workshop. 


In [None]:
# Snag a data file

urllib.request.urlretrieve("https://raw.githubusercontent.com/joehalliwell/ml-workshop/master/data/houses.csv", "houses.csv")


In [None]:
# This is much simpler/nicer in later versions of Spark (the ones we use...)
# We're accessing Spark's SQL functionality through another magic global variable

houses = sqlContext.read \
    .format("com.databricks.spark.csv") \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .load("houses.csv")
    
# Let's have a look at the first few items...

houses.take(10)

In [None]:
# Hmm that wasn't very readable. But we can convert it to a Pandas dataframe to display it nicely.

houses.limit(10).toPandas()

In [None]:
# You can perform LINQ-style manipulations...

houses.select("id", "price") \
    .withColumn("price_k", houses["price"] / 1000) \
    .limit(10).toPandas()

In [None]:
houses.where(houses["price"] < 300000).limit(10).toPandas()

In [None]:
# ...alternatively (and for interop with reporting tools) you can use SQL!

sqlContext.registerDataFrameAsTable(df=houses, tableName="houses")
sqlContext.sql("SELECT bedrooms, COUNT(1) AS count FROM houses GROUP BY bedrooms").toPandas()

Hmm.... that's quite a lot of houses with zero bedrooms...

### Dataframes challenges!

1. How many waterfront houses are there? (Hint: you can use `groupBy().sum("columnname")` to sum a column)
2. What's the average number of bedrooms?
3. Do waterfront houses have more or fewer bedrooms than average?

In [None]:
# Try your answers here!

## Machine Learning in Spark

We're going to train a simple linear regression model to predict house prices.

Spark ML's algorithms and implementations are all carefully designed to scale well with data volumes. The details are a bit complicated, but you can read more at https://spark.apache.org/docs/1.6.0/ml-classification-regression.html#regression

In [None]:
# Let's look at a toy example first
from pyspark.ml.regression import LinearRegression
from pyspark.sql import Row
from pyspark.mllib.linalg import DenseVector
import random

# This is our toy data: y = 2 * x + 1 + noise
noise = 10.0
rows = [Row(label=2.0 * x + 1.0 + random.gauss(0, noise), features=DenseVector([x])) for x in range(100)]
data = sqlContext.createDataFrame(rows)

# Split into training and test sets
train, test = data.randomSplit([0.8, 0.2])

# Build an estimator and use it to fit a model
estimator = LinearRegression()
model = estimator.fit(train)

# Run the model on unseen values and evaluate (by eye!)
model.transform(test).toPandas().plot()
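
As a sanity check on what the fit should recover, here is a plain-Python sketch of ordinary least squares for a single feature. It assumes no regularisation (which we believe matches the `LinearRegression()` defaults, but check the docs for your version), uses a fixed seed that is not in the cell above, and fits on all 100 points rather than a training split.

```python
import random

rng = random.Random(0)  # fixed seed (an assumption) so the sketch is repeatable
noise = 10.0
xs = [float(x) for x in range(100)]
ys = [2.0 * x + 1.0 + rng.gauss(0, noise) for x in xs]

# Closed-form least squares for one feature:
# slope = cov(x, y) / var(x), intercept = mean(y) - slope * mean(x)
mean_x = sum(xs) / len(xs)
mean_y = sum(ys) / len(ys)
sxy = sum((x - mean_x) * (y - mean_y) for x, y in zip(xs, ys))
sxx = sum((x - mean_x) ** 2 for x in xs)
slope = sxy / sxx
intercept = mean_y - slope * mean_x

print("slope = %g, intercept = %g" % (slope, intercept))
```

The slope should land close to 2; the intercept is much noisier because the noise (standard deviation 10) is large compared with the true intercept of 1.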

In [None]:
# Drop the suspicious rows with no bedrooms or bathrooms, then add a numeric "label" column
good = sqlContext.sql("SELECT * FROM houses WHERE bedrooms > 0 AND bathrooms > 0")
good = good.withColumn("label", good["price"].cast("double"))
print("Dropped {0} items".format(houses.count() - good.count()))

In [None]:
# ML expects a dataframe with "features" and "label" columns

from pyspark.ml.feature import VectorAssembler

selector = VectorAssembler(
    inputCols=["bedrooms", "bathrooms", "sqft_living", "sqft_lot", "floors", "lat", "long"],
    outputCol="features")
data = selector.transform(good).select("features", "label")
data.limit(10).toPandas()
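
Conceptually, the assembler just packs the chosen columns of each row into one vector. The sketch below mimics that in plain Python with dictionaries instead of a DataFrame; the two rows and their values are made up, and only three of the input columns are used to keep it short.

```python
# Made-up rows standing in for the "good" DataFrame
rows = [
    {"bedrooms": 3, "bathrooms": 1.0, "sqft_living": 1180, "price": 221900},
    {"bedrooms": 2, "bathrooms": 2.5, "sqft_living": 770, "price": 180000},
]
input_cols = ["bedrooms", "bathrooms", "sqft_living"]

# Pack the selected columns, in order, into one list of floats per row
assembled = [
    {"features": [float(row[c]) for c in input_cols], "label": float(row["price"])}
    for row in rows
]

for item in assembled:
    print(item)
```

The order of `input_cols` matters: it fixes which position in the vector corresponds to which column.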

In [None]:
# Finally we'll split the data into a training and test set

train, test = data.randomSplit([0.8, 0.2])
test.count(), train.count()
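
Note that the counts will not be exactly 80% and 20%, and will change from run to run. As far as we understand it, each row is assigned to a split independently at random. The plain-Python sketch below illustrates that idea; `random_split` is a hypothetical helper written for this notebook, not a Spark function.

```python
import random

def random_split(rows, weights, seed):
    """Assign each row to one split independently, in proportion to the weights."""
    rng = random.Random(seed)
    total = float(sum(weights))
    # Cumulative boundaries, e.g. roughly [0.8, 1.0] for weights [0.8, 0.2]
    bounds = []
    running = 0.0
    for w in weights:
        running += w / total
        bounds.append(running)
    splits = [[] for _ in weights]
    for row in rows:
        draw = rng.random()
        for i, bound in enumerate(bounds):
            # The last split catches any draw left over by rounding
            if draw < bound or i == len(bounds) - 1:
                splits[i].append(row)
                break
    return splits

train_rows, test_rows = random_split(range(1000), [0.8, 0.2], seed=42)
print(len(train_rows), len(test_rows))
```

Passing a fixed seed makes the split repeatable, which is handy when comparing models; Spark's `randomSplit` also accepts an optional `seed` argument.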

In [None]:
# Now we use the estimator to create a fitted model...
estimator = LinearRegression()
model = estimator.fit(train)
print("Fitted {0}".format(model))

In [None]:
# ...which we can then use to generate a set of predictions

predictions = model.transform(test)
predictions.limit(10).toPandas()

In [None]:
# Now we evaluate the predictions by looking at R-squared and RMSE

from pyspark.ml.evaluation import RegressionEvaluator

# By default the evaluator compares the "prediction" column with the "label" column
evaluator = RegressionEvaluator(metricName="r2")
r2 = evaluator.evaluate(predictions)
print("R-squared (R2) on test data = %g" % r2)

evaluator = RegressionEvaluator(metricName="rmse")
rmse = evaluator.evaluate(predictions)
print("Root mean-squared error (RMSE) on test data = %g" % rmse)
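
To make the two metrics less of a black box, here is a plain-Python sketch of the standard textbook definitions. The helper names `rmse` and `r_squared` and the four label/prediction pairs are made up for illustration; the evaluator may differ in details.

```python
import math

def rmse(labels, predictions):
    """Root mean-squared error: the typical size of a prediction error."""
    squared_errors = [(y - p) ** 2 for y, p in zip(labels, predictions)]
    return math.sqrt(sum(squared_errors) / len(labels))

def r_squared(labels, predictions):
    """1 - (residual sum of squares / total sum of squares around the mean)."""
    mean_label = sum(labels) / len(labels)
    ss_res = sum((y - p) ** 2 for y, p in zip(labels, predictions))
    ss_tot = sum((y - mean_label) ** 2 for y in labels)
    return 1.0 - ss_res / ss_tot

# Made-up labels and predictions
labels = [100.0, 200.0, 300.0, 400.0]
predictions = [110.0, 190.0, 310.0, 390.0]

print("RMSE = %g" % rmse(labels, predictions))
print("R2 = %g" % r_squared(labels, predictions))
```

RMSE is in the same units as the label (here, price), while R-squared is unitless: 1 means perfect predictions and 0 means no better than always predicting the mean.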


### Spark ML challenges!

The results are frankly disappointing... it's over to you to make them better.

0. Try setting/tweaking some hyperparameters
1. Try using a different regression model e.g. GBTRegressor https://spark.apache.org/docs/1.6.0/ml-classification-regression.html#regression
2. Try a different feature set -- include more features (date?) or scale/transform existing ones https://spark.apache.org/docs/2.2.0/ml-features.html
3. Experiment and share your R-squared with the group!
4. Refactor the code/notebook to facilitate experimentation

## References

- MLlib example adapted from http://www.techpoweredmath.com/spark-dataframes-mllib-tutorial/#.WgTPv7y69hE
- Interpreting R-squared http://blog.minitab.com/blog/adventures-in-statistics-2/regression-analysis-how-do-i-interpret-r-squared-and-assess-the-goodness-of-fit