Spark / PySpark tutorial
========================

Spark is an increasingly popular cluster computing system that integrates with the Apache Hadoop ecosystem (it can, for example, run on Hadoop's YARN and read data from HDFS). It is attractive mainly because of its speed and ease of use. We are going to have a look at it here, with special focus on the Python interface to Spark: PySpark.

Prerequisites
-------------

Download one of the pre-built versions of Spark from the website (http://spark.apache.org/downloads.html), and untar it (`tar -xvf <filename>`). Alternatively, build Spark yourself by running `./sbt/sbt assembly` from the Spark directory.

We will run Spark locally on our machine in this tutorial. After you set the `SPARK_HOME` environment variable and add both Spark's `python` directory and the Py4J archive that ships with Spark (`python/lib/py4j-*-src.zip`) to the `PYTHONPATH`, you're good to go.

*Note:
When running locally inside a virtualenv, we need to tell Spark to use the virtualenv's Python interpreter; otherwise, it uses the default system Python. Put this in your code before the `SparkContext` is created: `os.environ['PYSPARK_PYTHON'] = sys.executable`.*

*Note:
Spark has a web UI that shows running tasks, workers and various statistics. When running locally, it can be reached at http://localhost:4040/.*

Calling PySpark
---------------

To call Spark from Python, we need to use the PySpark interface. It can, for example, be called as an interactive shell from your Spark home dir:

    ./bin/pyspark

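As an IPython shell (Spark 2.0 and later no longer support `IPYTHON=1`; there, use `PYSPARK_DRIVER_PYTHON=ipython ./bin/pyspark` instead):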

    IPYTHON=1 ./bin/pyspark

To run a standalone script, use `spark-submit` (`my_script.py` is a placeholder for your own script):

    ./bin/spark-submit --master local my_script.py

Below we show how you would use the PySpark API inside a Python script.

In [1]:
import glob
import os
import sys

# Spark's home directory (here it's: ~/spark-1.6.0) should be set as an environment variable.
# (Of course setting an env. variable doesn't need to be done from Python; any method will do.)
os.environ['SPARK_HOME'] = os.path.join(os.path.expanduser('~'), 'spark-1.6.0')

# Add Spark's Python interface (PySpark) and the Py4J library it depends on
# (bundled with Spark as python/lib/py4j-*-src.zip) to the Python path.
# (Again: this doesn't need to be done from Python.)
sys.path.append(os.path.join(os.environ['SPARK_HOME'], 'python'))
sys.path.extend(glob.glob(os.path.join(os.environ['SPARK_HOME'], 'python', 'lib', 'py4j-*-src.zip')))

# This can be useful for running in virtualenvs:
# os.environ['PYSPARK_PYTHON'] = '/home/user/virtualenv/bin/python'

# OK, now we can import PySpark
from pyspark import SparkContext, SparkConf


Inside our *driver program*, the connection to Spark is represented by a `SparkContext` instance. For running Spark locally, you can simply instantiate one with:

    sc = SparkContext('local', 'mySparkApp')

Alternatively, you can use a `SparkConf` instance to control various Spark configuration properties, which is what we are going to demonstrate here.

In [2]:
conf = SparkConf()
# To run locally with a single worker thread, use 'local'
# ('local[4]' would use 4 threads, 'local[*]' one thread per CPU core):
conf.setMaster('local')
conf.setAppName('spark_tutorial')
# SparkConf's 'set', 'setAll' and 'setIfMissing' can be used to set a variety of
# configuration properties, e.g. (these two mainly matter when running on a cluster):
conf.setIfMissing("spark.cores.max", "4")
conf.set("spark.executor.memory", "1g")
# Alternatively:
conf.setAll([("spark.cores.max", "4"), ("spark.executor.memory", "1g")])

# Now instantiate SparkContext
sc = SparkContext(conf=conf)

# Note: SparkContexts can be stopped manually with:
# sc.stop()


How Spark works, in brief
-------------------------

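Spark uses a *cluster manager* (e.g., Spark's own standalone manager, YARN or Mesos) and a number of *worker nodes*. Once the `SparkContext` in the driver program has connected to the cluster manager, Spark acquires *executors* on the worker nodes: processes that run computations and store data for the application. The driver then sends the application code and the individual *tasks* to these executors.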

Spark's primary abstraction is the so-called *Resilient Distributed Dataset (RDD)*: a collection of elements that is partitioned across the worker nodes and stored in RAM or on disk. Spark can create RDDs from any storage source supported by Hadoop, and RDDs also hold intermediate computational results. If a node fails, the lost partitions of an RDD can be rebuilt by recomputing them from the RDD's *lineage*, i.e., the sequence of transformations that produced it.

Thanks to the distributed nature of RDDs, many operations can be executed in parallel, and pipelining and lazy execution avoid the need to save intermediate results for the next step. Importantly, Spark supports pulling data sets into a cluster-wide *in-memory cache* for fast access.

RDD operations can be divided into 2 groups: *transformations* and *actions*. Transformations (e.g., `map`) always produce new RDDs, while actions (e.g., `reduce`) compute a result from an RDD and return it to the driver program (or write it to storage).
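To make the distinction concrete, here is a minimal single-machine sketch in plain Python. `ToyRDD` is a hypothetical class made up for this illustration (it is not PySpark's API and ignores partitioning entirely): its transformations merely record what should be done and return a new object, and only its actions execute the recorded steps and hand back a plain Python value.

```python
import functools


class ToyRDD:
    """Hypothetical, single-machine stand-in for an RDD (not the PySpark API)."""

    def __init__(self, source, ops=()):
        self._source = source    # the original data
        self._ops = tuple(ops)   # recorded transformations, in order

    # Transformations: return a new ToyRDD and compute nothing yet.
    def map(self, func):
        return ToyRDD(self._source, self._ops + (("map", func),))

    def filter(self, pred):
        return ToyRDD(self._source, self._ops + (("filter", pred),))

    # Internal helper: push each element through all recorded steps.
    def _iterate(self):
        for x in self._source:
            keep = True
            for kind, func in self._ops:
                if kind == "map":
                    x = func(x)
                elif not func(x):
                    keep = False
                    break
            if keep:
                yield x

    # Actions: execute the recorded steps and return a result to the caller.
    def collect(self):
        return list(self._iterate())

    def reduce(self, func):
        return functools.reduce(func, self._iterate())


calls = []


def square(x):
    calls.append(x)  # log every evaluation so we can see when work happens
    return x ** 2


squares = ToyRDD(range(14)).map(square)              # transformation
even_squares = squares.filter(lambda x: x % 2 == 0)  # transformation
print(calls)                                         # [] (nothing computed yet)
result = even_squares.collect()                      # action: triggers the work
print(result)                                        # [0, 4, 16, 36, 64, 100, 144]
print(ToyRDD(range(14)).reduce(lambda a, b: a + b))  # 91
```

Since each element passes through all recorded steps before the next element is read, no intermediate list is ever built; this is a simplified picture of how Spark pipelines consecutive transformations.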

The above and more will be demonstrated in the code examples below.

### RDDs are distributed data sets

In [8]:
# 'parallelize' creates an RDD by distributing data over the cluster
rdd = sc.parallelize(range(14))
print("Number of partitions: {}".format(rdd.getNumPartitions()))
# 'glom' lists all elements within each partition
print(rdd.glom().collect())

Number of partitions: 4
[[0, 1, 2], [3, 4, 5], [6, 7, 8], [9, 10, 11, 12, 13]]
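How `parallelize` assigns elements to partitions is an implementation detail. As a rough mental model, the plain-Python sketch below (the helper `slice_evenly` is hypothetical) cuts a list into contiguous, nearly equal chunks, where chunk `i` covers the indices from `i * n // k` up to `(i + 1) * n // k`. It does not reproduce the 3/3/3/5 split printed above exactly: as far as we can tell, PySpark first groups the elements into serialization batches and then slices those batches, which shifts the boundaries.

```python
def slice_evenly(data, num_slices):
    """Split data into num_slices contiguous chunks of (nearly) equal size.

    Chunk i covers indices [i * n // num_slices, (i + 1) * n // num_slices).
    """
    data = list(data)
    n = len(data)
    return [data[i * n // num_slices:(i + 1) * n // num_slices]
            for i in range(num_slices)]


partitions = slice_evenly(range(14), 4)
print("Number of partitions: {}".format(len(partitions)))  # cf. rdd.getNumPartitions()
print(partitions)                                          # cf. rdd.glom().collect()
# [[0, 1, 2], [3, 4, 5, 6], [7, 8, 9], [10, 11, 12, 13]]
```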


### Spark is lazy
Transformations are not executed right away: Spark only runs a computation once an *action* is called on an RDD. This allows it to pipeline consecutive transformations so that intermediate results do not have to be saved.

In [9]:
rddSquared = rdd.map(lambda x: x ** 2)
# Alternatively, you can use a normal function:
# def squared(x):
#     return x ** 2
# rddSquared = rdd.map(squared)

# The 'collect' action triggers Spark: the above transformation is performed,
# and results are collected.
print(rddSquared.collect())

[0, 1, 4, 9, 16, 25, 36, 49, 64, 81, 100, 121, 144, 169]


In [10]:
# Popular transformations
# -----------------------

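rdd.map(lambda x: -x)
rdd.flatMap(lambda x: [x, -x])    # like map, but flattens the returned iterables
rdd.filter(lambda x: x % 2 == 0)  # keeps the elements for which the function returns True
rdd.sortBy(lambda x: -x)          # sorts by the value the function returns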

# Popular actions
# ---------------

rdd.reduce(lambda x, y: x + y)
rdd.count()

# Actions with which to take data from an RDD:
print(rdd.collect())                    # get all elements
print(rdd.first())                      # get first element
print(rdd.take(5))                      # get N first elements
print(rdd.top(3))                       # get N highest elements in descending order
print(rdd.takeOrdered(7, lambda x: -x)) # get N first elements in ascending (or a function's) order

[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13]
0
[0, 1, 2, 3, 4]
[13, 12, 11]
[13, 12, 11, 10, 9, 8, 7]
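The taking actions above have close plain-Python counterparts in the `heapq` module. The sketch below mimics, on the four partitions printed earlier, how `top` and `takeOrdered` can be computed in a distributed way: each partition keeps only its own best N elements, and these small per-partition results are then merged. To our understanding PySpark's own implementations follow this pattern with `heapq`, but treat the helpers `top` and `take_ordered` here as illustrations, not copies. The last lines show how `flatMap` differs from `map`.

```python
import heapq

# The four partitions printed by rdd.glom().collect() earlier
partitions = [[0, 1, 2], [3, 4, 5], [6, 7, 8], [9, 10, 11, 12, 13]]


def top(parts, n):
    # Keep the n largest per partition, then the n largest of those candidates
    candidates = [x for part in parts for x in heapq.nlargest(n, part)]
    return heapq.nlargest(n, candidates)


def take_ordered(parts, n, key=None):
    # Same idea with the n smallest elements (optionally by a key function)
    candidates = [x for part in parts for x in heapq.nsmallest(n, part, key=key)]
    return heapq.nsmallest(n, candidates, key=key)


print(top(partitions, 3))                             # [13, 12, 11]
print(take_ordered(partitions, 7, key=lambda x: -x))  # [13, 12, 11, 10, 9, 8, 7]

# map keeps one output per input; flatMap flattens the returned iterables
data = [0, 1, 2]
print([[x, -x] for x in data])             # map:     [[0, 0], [1, -1], [2, -2]]
print([y for x in data for y in [x, -x]])  # flatMap: [0, 0, 1, -1, 2, -2]
```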


### RDDs can be cached to memory (or disk)
Spark allows the user to control what data is cached and how. Proper caching of RDDs can be hugely beneficial! Whenever you have an RDD that will be re-used multiple times later on, you should consider caching it.

In [11]:
import numpy as np

NUM_SAMPLES = int(1e6)
rddBig = sc.parallelize(np.random.random(NUM_SAMPLES))

# no caching: will be recalculated every time we go through the loop
rddBigTrans = rddBig.map(lambda x: (x ** 2 - 0.1) ** 0.5)
print(rddBigTrans.getStorageLevel())
for threshold in (0.2, 0.4, 0.6, 0.8):
    %timeit -n 1 -r 1 rddBigTrans.filter(lambda x: x >= threshold).count()

# we cache this intermediate result because it will be reused repeatedly
rddBigTrans_c = rddBig.map(lambda x: (x ** 2 - 0.1) ** 0.5).cache()
print(rddBigTrans_c.getStorageLevel())
for threshold in (0.2, 0.4, 0.6, 0.8):
    %timeit -n 1 -r 1 rddBigTrans_c.filter(lambda x: x >= threshold).count()

# use unpersist to remove from cache
print(rddBigTrans_c.unpersist().getStorageLevel())
# for even finer-grained control of caching, use the 'persist' function
from pyspark import storagelevel
print(rddBigTrans.persist(storagelevel.StorageLevel.MEMORY_AND_DISK_SER).getStorageLevel())

Serialized 1x Replicated
1 loops, best of 1: 2.4 s per loop
1 loops, best of 1: 1.41 s per loop
1 loops, best of 1: 1.62 s per loop
1 loops, best of 1: 1.33 s per loop
Memory Serialized 1x Replicated
1 loops, best of 1: 2.66 s per loop
1 loops, best of 1: 916 ms per loop
1 loops, best of 1: 786 ms per loop
1 loops, best of 1: 1.16 s per loop
Serialized 1x Replicated
Disk Memory Serialized 1x Replicated
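Why the cached version wins after its first (slower, cache-filling) run can be seen with a small plain-Python analogy. This is not Spark code: `CountingTransform` is a hypothetical stand-in for the expensive `map` function that counts how often it is called. Without a cache, each of the four "actions" recomputes the transformation for all elements; with a cache, the transformation runs once and its stored results are reused.

```python
class CountingTransform:
    """Hypothetical stand-in for an expensive map function; counts its calls."""

    def __init__(self):
        self.calls = 0

    def __call__(self, x):
        self.calls += 1
        return x ** 2


data = list(range(1000))
thresholds = (200, 400, 600, 800)

# No caching: each "action" re-evaluates the transformation from scratch
uncached = CountingTransform()
uncached_counts = [sum(1 for x in data if uncached(x) >= t) for t in thresholds]

# Caching: evaluate once, store the results, and reuse them for every action
cached = CountingTransform()
cached_values = [cached(x) for x in data]
cached_counts = [sum(1 for y in cached_values if y >= t) for t in thresholds]

print(uncached.calls, cached.calls)      # 4000 1000
print(uncached_counts == cached_counts)  # True
```

The price of caching is the memory (or disk space) for the stored results, which is why Spark lets you pick a storage level and `unpersist` RDDs you no longer need.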


### Spark can work with key-value pairs
So-called PairRDDs are RDDs that store key-value pairs. Spark has a variety of special operations that make use of this, such as joining by key, grouping by key, etc.

In [12]:
# PairRDDs are simply RDDs whose elements are key-value tuples; no special constructor is needed
# Here we transform rdd and create a key based on even/odd flags
rddP1 = rdd.map(lambda x: (x % 2 == 0, x))
# A clearer shortcut for this is:
rddP1 = rdd.keyBy(lambda x: x % 2 == 0)

# Another way to create a PairRDD is to zip two RDDs (both must have the same number
# of partitions and the same number of elements in each partition)
print("Zipped: {}".format(rdd.zip(rdd).collect()))

# Access to the keys and values
print("Keys: {}".format(rddP1.keys().collect()))
print("Values: {}".format(rddP1.values().collect()))

# This is how you can map a function to a pairRDD; x[0] is the key, x[1] the value
print(rddP1.map(lambda x: (x[0], x[1] ** 2)).collect())
# Better: mapValues/flatMapValues, which operate on the values only, keep the keys in place
# and preserve the RDD's partitioning
print(rddP1.mapValues(lambda x: x ** 2).collect())
# We can also go back from a PairRDD to a normal RDD by simply dropping the key
print(rddP1.map(lambda x: x[1] ** 2).collect())

Zipped: [(0, 0), (1, 1), (2, 2), (3, 3), (4, 4), (5, 5), (6, 6), (7, 7), (8, 8), (9, 9), (10, 10), (11, 11), (12, 12), (13, 13)]
Keys: [True, False, True, False, True, False, True, False, True, False, True, False, True, False]
Values: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13]
[(True, 0), (False, 1), (True, 4), (False, 9), (True, 16), (False, 25), (True, 36), (False, 49), (True, 64), (False, 81), (True, 100), (False, 121), (True, 144), (False, 169)]
[(True, 0), (False, 1), (True, 4), (False, 9), (True, 16), (False, 25), (True, 36), (False, 49), (True, 64), (False, 81), (True, 100), (False, 121), (True, 144), (False, 169)]
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81, 100, 121, 144, 169]


In [13]:
# Various aggregations by key are possible, such as reduceByKey, combineByKey, and foldByKey
# reduceByKey example:
print("Sum per key: {}".format(rddP1.reduceByKey(lambda x, y: x + y).collect()))

# Also, some common operations are available in 'ByKey' form, e.g.:
rddP1.sortByKey()
rddP1.countByKey()

Sum per key: [(False, 49), (True, 42)]


defaultdict(<type 'int'>, {False: 7, True: 7})
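For readers who prefer to see the semantics spelled out, here is a single-machine, plain-Python sketch (`reduce_by_key` is a hypothetical helper, not Spark code) of what `reduceByKey`, `countByKey` and `sortByKey` compute on the even/odd pairs used above. Unlike this sketch, Spark combines values in parallel, partition by partition, which is why the function passed to `reduceByKey` should be associative and commutative.

```python
from collections import Counter

pairs = [(x % 2 == 0, x) for x in range(14)]  # same pairs as rdd.keyBy(lambda x: x % 2 == 0)


def reduce_by_key(kv_pairs, func):
    """Combine all values that share a key with the binary function func."""
    result = {}
    for key, value in kv_pairs:
        result[key] = func(result[key], value) if key in result else value
    return result


sums = reduce_by_key(pairs, lambda x, y: x + y)     # like reduceByKey(...)
counts = Counter(key for key, _ in pairs)           # like countByKey()
sorted_pairs = sorted(pairs, key=lambda kv: kv[0])  # like sortByKey()

print(sums)          # {True: 42, False: 49}
print(dict(counts))  # {True: 7, False: 7}
```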

In [14]:
# Grouping and joining by key

# There are various possible ways of joining 2 RDDs together by key:
rddP2 = sc.parallelize(range(0, 28, 2)).map(lambda x: (x % 2 == 0, x))
# inner join: for each key present in both RDDs, all combinations of their values
print("Join: {}".format(rddP1.join(rddP2).collect()))
# left/right outer join
rddP1.leftOuterJoin(rddP2)
rddP1.rightOuterJoin(rddP2)

# for every key in rddP1 or rddP2, cogroup returns a tuple of iterables with that key's values in each RDD
print("Cogroup: {}".format(rddP1.cogroup(rddP2).collect()))
# cogrouping together more than two RDDs by key can be done with groupWith
rddP1.groupWith(rddP2, rddP2)

# groupByKey gathers all values of a key into a single iterable, so all values of a key end up in the same partition
print("After groupByKey: {}".format(rddP1.groupByKey().glom().collect()))

Join: [(True, (6, 0)), (True, (6, 2)), (True, (6, 4)), (True, (6, 6)), (True, (6, 8)), (True, (6, 10)), (True, (6, 18)), (True, (6, 20)), (True, (6, 22)), (True, (6, 24)), (True, (6, 26)), (True, (6, 12)), (True, (6, 14)), (True, (6, 16)), (True, (8, 0)), (True, (8, 2)), (True, (8, 4)), (True, (8, 6)), (True, (8, 8)), (True, (8, 10)), (True, (8, 18)), (True, (8, 20)), (True, (8, 22)), (True, (8, 24)), (True, (8, 26)), (True, (8, 12)), (True, (8, 14)), (True, (8, 16)), (True, (4, 0)), (True, (4, 2)), (True, (4, 4)), (True, (4, 6)), (True, (4, 8)), (True, (4, 10)), (True, (4, 18)), (True, (4, 20)), (True, (4, 22)), (True, (4, 24)), (True, (4, 26)), (True, (4, 12)), (True, (4, 14)), (True, (4, 16)), (True, (10, 0)), (True, (10, 2)), (True, (10, 4)), (True, (10, 6)), (True, (10, 8)), (True, (10, 10)), (True, (10, 18)), (True, (10, 20)), (True, (10, 22)), (True, (10, 24)), (True, (10, 26)), (True, (10, 12)), (True, (10, 14)), (True, (10, 16)), (True, (12, 0)), (True, (12, 2)), (True, (12, 4
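The join output is long, so here is a compact plain-Python sketch of what the join-type operations compute (the helpers are hypothetical; Spark does the same per key, but distributed over partitions, and returns iterables rather than lists for `cogroup`). For each key present in both inputs, the inner join emits every combination of a left and a right value. That is why the 7 even values of `rddP1` and the 14 values of `rddP2` (all keyed `True`) produce 7 × 14 = 98 pairs, while the odd values have no partner.

```python
from collections import defaultdict


def group(kv_pairs):
    """Collect the values of each key into a list."""
    groups = defaultdict(list)
    for key, value in kv_pairs:
        groups[key].append(value)
    return groups


def join(left, right):
    """Inner join: all (left value, right value) combinations per shared key."""
    right_groups = group(right)
    return [(k, (v, w)) for k, v in left for w in right_groups.get(k, [])]


def left_outer_join(left, right):
    """Like join, but keeps unmatched left values, pairing them with None."""
    right_groups = group(right)
    return [(k, (v, w)) for k, v in left for w in right_groups.get(k, [None])]


def cogroup(left, right):
    """For every key in either input: (values from left, values from right)."""
    lg, rg = group(left), group(right)
    return {k: (lg.get(k, []), rg.get(k, [])) for k in set(lg) | set(rg)}


p1 = [(x % 2 == 0, x) for x in range(14)]        # like rddP1
p2 = [(x % 2 == 0, x) for x in range(0, 28, 2)]  # like rddP2

joined = join(p1, p2)
print(len(joined))                   # 98
print(len(left_outer_join(p1, p2)))  # 105: 98 matches + 7 odd values paired with None
print(cogroup(p1, p2)[False])        # ([1, 3, 5, 7, 9, 11, 13], [])
```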

### Spark can directly create RDDs from text files

In [15]:
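# 'textFile' creates an RDD with one element per line of a text file. In local mode,
# a local path works; on a cluster, the file must be readable from every worker
# (e.g., stored on HDFS or a shared file system).
rddT = sc.textFile(os.path.join(os.environ.get('SPARK_HOME'), 'LICENSE'))
print(rddT.take(5))
# ('sc.addFile' is meant for shipping files to the executors, where they can be
# located with 'pyspark.SparkFiles.get'.)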

### RDDs support simple statistical actions

In [16]:
print(rdd.stats())
print(rdd.count())
print(rdd.sum())
print(rdd.mean())
print(rdd.stdev(), rdd.sampleStdev())
print(rdd.variance(), rdd.sampleVariance())
print(rdd.min(), rdd.max())
print(rdd.histogram(5))

(count: 14, mean: 6.5, stdev: 4.03112887415, max: 13.0, min: 0.0)
14
91
6.5
(4.0311288741492746, 4.1833001326703778)
(16.25, 17.5)
(0, 13)
([0.0, 2.6, 5.2, 7.800000000000001, 10.4, 13], [3, 3, 2, 3, 3])
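The numbers above are easy to double-check with Python's standard `statistics` module. The difference between `stdev` and `sampleStdev` (and between `variance` and `sampleVariance`) is the divisor: the population versions divide the sum of squared deviations by n, the sample versions by n - 1. The hypothetical `histogram` helper below mirrors `histogram(5)`: five equal-width buckets between the minimum and the maximum, all open on the right except the last one, which also includes the maximum.

```python
import bisect
import statistics

data = list(range(14))

print(statistics.mean(data))                                  # 6.5
print(statistics.pvariance(data), statistics.variance(data))  # 16.25 17.5
print(statistics.pstdev(data), statistics.stdev(data))        # ~4.0311 ~4.1833


def histogram(values, num_buckets):
    """Equal-width buckets from min to max; only the last bucket is closed on the right."""
    lo, hi = min(values), max(values)
    width = (hi - lo) / num_buckets
    bounds = [lo + i * width for i in range(num_buckets)] + [hi]
    counts = [0] * num_buckets
    for x in values:
        if x == hi:
            counts[-1] += 1  # the maximum falls into the last (closed) bucket
        else:
            counts[bisect.bisect_right(bounds, x) - 1] += 1
    return bounds, counts


bounds, counts = histogram(data, 5)
print(counts)  # [3, 3, 2, 3, 3]
```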


### RDDs support set transformations

In [17]:
rddB = sc.parallelize(range(0, 26, 2))
print(rdd.union(rddB).collect()) # or: rdd + rddB
print(rdd.union(rddB).distinct().collect())
print(rdd.intersection(rddB).collect())
print(rdd.subtract(rddB).collect())
print(rdd.cartesian(rddB).collect())

[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 0, 2, 4, 6, 8, 10, 12, 14, 16, 18, 20, 22, 24]
[24, 16, 0, 8, 1, 9, 2, 10, 18, 3, 11, 4, 12, 20, 13, 5, 14, 6, 22, 7]
[0, 8, 10, 2, 12, 4, 6]
[9, 1, 11, 3, 5, 13, 7]
[(0, 0), (0, 2), (0, 4), (1, 0), (1, 2), (1, 4), (2, 0), (2, 2), (2, 4), (0, 6), (0, 8), (0, 10), (1, 6), (1, 8), (1, 10), (2, 6), (2, 8), (2, 10), (0, 12), (0, 14), (0, 16), (1, 12), (1, 14), (1, 16), (2, 12), (2, 14), (2, 16), (0, 18), (0, 20), (0, 22), (1, 18), (1, 20), (1, 22), (2, 18), (2, 20), (2, 22), (0, 24), (1, 24), (2, 24), (3, 0), (3, 2), (3, 4), (4, 0), (4, 2), (4, 4), (5, 0), (5, 2), (5, 4), (3, 6), (3, 8), (3, 10), (4, 6), (4, 8), (4, 10), (5, 6), (5, 8), (5, 10), (3, 12), (3, 14), (3, 16), (4, 12), (4, 14), (4, 16), (5, 12), (5, 14), (5, 16), (3, 18), (3, 20), (3, 22), (4, 18), (4, 20), (4, 22), (5, 18), (5, 20), (5, 22), (3, 24), (4, 24), (5, 24), (6, 0), (6, 2), (6, 4), (7, 0), (7, 2), (7, 4), (8, 0), (8, 2), (8, 4), (6, 6), (6, 8), (6, 10), (7, 6), (7, 8), (
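The same operations on plain Python lists and sets make the expected results easy to verify. One difference: Spark returns the results of `distinct`, `intersection` and `subtract` in no particular order, because these operations shuffle data between partitions (as the scrambled output above shows). Note also that `union` keeps duplicates, much like list concatenation, whereas `intersection` removes them.

```python
from itertools import product

a = list(range(14))        # like rdd
b = list(range(0, 26, 2))  # like rddB
b_set = set(b)

union = a + b                                # rdd.union(rddB): duplicates are kept
distinct_union = sorted(set(a) | b_set)      # rdd.union(rddB).distinct()
intersection = sorted(set(a) & b_set)        # rdd.intersection(rddB): no duplicates
subtract = [x for x in a if x not in b_set]  # rdd.subtract(rddB)
cartesian = list(product(a, b))              # rdd.cartesian(rddB)

print(len(union), len(distinct_union))  # 27 20
print(intersection)                     # [0, 2, 4, 6, 8, 10, 12]
print(subtract)                         # [1, 3, 5, 7, 9, 11, 13]
print(len(cartesian))                   # 182
```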

### Spark supports cluster-wide shared variables

In [18]:
# A broadcast variable is copied to each machine only once, in an efficient manner.
# It is very suitable when each node uses the data in it, and especially so if the data is
# large and would otherwise be sent across the network multiple times.
broadcastVar = sc.broadcast({'CA': 'California', 'NL': 'Netherlands'})
print(broadcastVar.value)

# An accumulator is a shared variable whose value lives on the driver; tasks can only
# add to it, and only the driver can read it. (Basically, it's a simple reducer.)
accu = sc.accumulator(0)
# 'foreach' just applies a function to each RDD element without returning anything
rdd.foreach(lambda x: accu.add(x))
print(accu.value)

{'CA': 'California', 'NL': 'Netherlands'}
91
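Conceptually, each task works on its own zero-initialized copy of the accumulator and sends its accumulated update back to the driver, which merges all updates. A plain-Python sketch of that flow over the four partitions shown earlier (`run_task` is a hypothetical helper, not a Spark function):

```python
partitions = [[0, 1, 2], [3, 4, 5], [6, 7, 8], [9, 10, 11, 12, 13]]


def run_task(partition):
    """Hypothetical task: adds its elements to a local, zero-initialized copy."""
    local_value = 0
    for x in partition:
        local_value += x  # what accu.add(x) does inside a task
    return local_value    # the update reported back to the driver


driver_value = 0  # the accumulator's value, readable only on the driver
for update in map(run_task, partitions):
    driver_value += update  # the driver merges each task's update

print(driver_value)  # 91
```

This flow also explains a caveat from the Spark programming guide: updates made inside actions such as `foreach` are applied once per task, but updates made inside transformations may be applied more than once if a task or stage is re-executed.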


### Spark allows for customizable partitioning and parallelization

In [19]:
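# First of all, the default level of parallelism can be configured through SparkConf
# *before* the SparkContext is created, e.g., conf.set('spark.default.parallelism', '4').
# (sc.setLocalProperty only sets thread-local job properties, not these defaults.)
# The resulting defaults can be inspected on the running SparkContext:
print(sc.defaultParallelism)
print(sc.defaultMinPartitions)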

# Also, aggregations such as 'reduceByKey' allow specifying the level of parallelism manually
rddP1.reduceByKey(lambda x, y: x + y, 10)

# Also, a pair RDD can be hash-partitioned by key into a given number of partitions
rddRepart = rddP1.partitionBy(100)

# Finally, there are some methods for manual re-partitioning of RDDs.
# (Warning: these can be expensive, but in certain cases very useful.)

# Efficient downscaling of partitions is done with 'coalesce'
rddRepart = rdd.coalesce(2)
# 'repartition' can increase or decrease the number of partitions, but always does a full shuffle
rddRepart = rdd.repartition(10)

A more complicated example of `partitionBy` is to use it for predefining how an RDD is partitioned, and to use this partitioning on other RDDs in exactly the same way. This way, we avoid shuffling the entire data set during, e.g., a `join` operation between these data sets. Depending on the application, this can translate into significant speedups.

In [20]:
rddP1 = sc.parallelize(range(14)).map(lambda x: (x % 2 == 0, x)).partitionBy(2)
rddP2 = sc.parallelize(range(0, 28, 2)).map(lambda x: (x % 2 == 0, x)).partitionBy(2)

print(rddP1.glom().collect())
print(rddP2.glom().collect())

# Now this 'join' does not require a full shuffle of the data,
# since the 'False' and 'True' keys are on the same node
rddJ = rddP1.join(rddP2)
# If we also want to keep 'rddJ' in the same partitioning, we have to specify it again
rddJ = rddJ.partitionBy(2)
print(rddJ.glom().collect())

[[(False, 7), (False, 9), (False, 11), (False, 13), (False, 1), (False, 3), (False, 5)], [(True, 0), (True, 2), (True, 10), (True, 12), (True, 4), (True, 6), (True, 8)]]
[[], [(True, 12), (True, 14), (True, 16), (True, 18), (True, 20), (True, 22), (True, 24), (True, 26), (True, 6), (True, 8), (True, 10), (True, 0), (True, 2), (True, 4)]]
[[], [(True, (0, 18)), (True, (0, 20)), (True, (0, 22)), (True, (0, 24)), (True, (0, 26)), (True, (0, 12)), (True, (0, 14)), (True, (0, 16)), (True, (0, 6)), (True, (0, 8)), (True, (0, 10)), (True, (0, 0)), (True, (0, 2)), (True, (0, 4)), (True, (2, 18)), (True, (2, 20)), (True, (2, 22)), (True, (2, 24)), (True, (2, 26)), (True, (2, 12)), (True, (2, 14)), (True, (2, 16)), (True, (2, 6)), (True, (2, 8)), (True, (2, 10)), (True, (2, 0)), (True, (2, 2)), (True, (2, 4)), (True, (4, 18)), (True, (4, 20)), (True, (4, 22)), (True, (4, 24)), (True, (4, 26)), (True, (4, 12)), (True, (4, 14)), (True, (4, 16)), (True, (4, 6)), (True, (4, 8)), (True, (4, 10)), (Tr
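By default, `partitionBy` assigns a pair to partition `portable_hash(key) % numPartitions`; for booleans, `portable_hash` comes down to Python's own `hash`, which is 0 for `False` and 1 for `True`. That explains the partition contents printed above. The plain-Python sketch below uses the built-in `hash` as a stand-in (`partition_by` is a hypothetical helper) to show why a join between two RDDs partitioned in the same way can combine partition `i` of one with partition `i` of the other, without moving data between partitions.

```python
def partition_by(kv_pairs, num_partitions, partition_func=hash):
    """Hash-partition (key, value) pairs into num_partitions lists."""
    parts = [[] for _ in range(num_partitions)]
    for key, value in kv_pairs:
        parts[partition_func(key) % num_partitions].append((key, value))
    return parts


p1 = partition_by([(x % 2 == 0, x) for x in range(14)], 2)
p2 = partition_by([(x % 2 == 0, x) for x in range(0, 28, 2)], 2)

# Same partitioning on both sides: equal keys always land in the same partition
# index, so each pair of partitions (p1[i], p2[i]) can be joined locally.
joined = [(k, (v, w))
          for i in range(2)
          for k, v in p1[i]
          for k2, w in p2[i]
          if k == k2]

print([len(part) for part in p1], [len(part) for part in p2])  # [7, 7] [0, 14]
print(len(joined))                                             # 98
```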

Pitfalls
--------

### Not caching intermediate results that are re-used later on

In [21]:
print("Not so great:")
rddBigTrans = rddBig.map(lambda x: (x ** 2 - 0.1) ** 0.5)
for threshold in (0.2, 0.4, 0.6, 0.8):
    %timeit -n 1 -r 1 rddBigTrans.filter(lambda x: x >= threshold).count()

print("Better:")
rddBigTrans_c = rddBig.map(lambda x: (x ** 2 - 0.1) ** 0.5).cache()
for threshold in (0.2, 0.4, 0.6, 0.8):
    %timeit -n 1 -r 1 rddBigTrans_c.filter(lambda x: x >= threshold).count()

Not so great:
1 loops, best of 1: 1.44 s per loop
1 loops, best of 1: 1.49 s per loop
1 loops, best of 1: 1.57 s per loop
1 loops, best of 1: 1.18 s per loop
Better:
1 loops, best of 1: 2.35 s per loop
1 loops, best of 1: 857 ms per loop
1 loops, best of 1: 864 ms per loop
1 loops, best of 1: 901 ms per loop


### Not considering when and how data is transferred through the cluster
Keep in mind that Spark is a distributed computing framework, and that transferring data over the network within a cluster should be kept to a minimum (network bandwidth can be roughly 100 times lower than memory bandwidth).

In [22]:
# groupByKey triggers a shuffle in which every single (key, value) pair is copied over the network
sumPerKey = rddP1.groupByKey().mapValues(sum).collect()

# Better: reduceByKey first combines the values per key within each partition,
# so only one partial result per key and partition is shuffled
sumPerKey = rddP1.reduceByKey(lambda x, y: x + y).collect()
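The difference can be quantified with a plain-Python sketch (hypothetical helpers, no Spark involved) that counts how many records would have to be shuffled for the four partitions shown earlier. With only two keys the saving is modest, but it grows with the number of values per key: with `groupByKey` every record is shuffled, while `reduceByKey` ships at most one partial result per key and partition.

```python
# The four partitions printed by rdd.glom().collect() earlier, keyed by even/odd
partitions = [[0, 1, 2], [3, 4, 5], [6, 7, 8], [9, 10, 11, 12, 13]]
keyed = [[(x % 2 == 0, x) for x in part] for part in partitions]


def combine_locally(part):
    """Sum the values per key within a single partition (map-side combine)."""
    local = {}
    for key, value in part:
        local[key] = local[key] + value if key in local else value
    return list(local.items())


def merge_shuffled(records):
    """Final step after the shuffle: sum all records that share a key."""
    totals = {}
    for key, value in records:
        totals[key] = totals.get(key, 0) + value
    return totals


# groupByKey-style: every (key, value) record crosses the network
shuffled_by_group = [kv for part in keyed for kv in part]
# reduceByKey-style: only one partial sum per key and partition crosses the network
shuffled_by_reduce = [kv for part in keyed for kv in combine_locally(part)]

print(len(shuffled_by_group), len(shuffled_by_reduce))                          # 14 8
print(merge_shuffled(shuffled_by_group) == merge_shuffled(shuffled_by_reduce))  # True
```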

### Not working with an appropriate number of partitions

In [23]:
# Not enough partitions results in bad concurrency in the cluster.
# It also puts pressure on the memory for certain operations.

# On the other hand, suppose an RDD is distributed over 1000 partitions, but we work only
# on a small subset of the data in the RDD, e.g.:
rddF = rdd.filter(lambda x: x < 3).map(lambda x: x ** 2)

# We are then effectively creating many (nearly) empty tasks, and using coalesce or
# repartition to create an RDD with fewer partitions would be beneficial:
rddF = rdd.filter(lambda x: x < 3).coalesce(10).map(lambda x: x ** 2)

### Using map when the overhead per element is high (mapPartitions is better)

In [24]:
# For example, opening and closing a database connection takes time.
def db_operation(x):
    # Open connection to DB
    # Do something with the element
    # Close DB connection
    return x

# This is especially costly if you repeat it for every element:
rdd.map(db_operation)

# Better: do this at the level of a partition rather than of a single element.
# mapPartitions passes an iterator over all elements of a partition to the
# function, which must return (or yield) an iterable of results.
def vectorized_db_operation(iterator):
    # Open connection to DB (once per partition)
    # Do something with all elements of the partition, e.g., in one batch
    results = list(iterator)
    # Close DB connection
    return results

result = rdd.mapPartitions(vectorized_db_operation)
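To see what the per-element vs. per-partition pattern above saves, here is a plain-Python sketch in which `FakeConnection` is a hypothetical stand-in for a database connection that merely counts how often it is opened. Processing element by element opens one connection per element; processing partition by partition opens one per partition, with identical results.

```python
class FakeConnection:
    """Hypothetical stand-in for a database connection; counts how often one is opened."""

    opened = 0

    def __init__(self):
        type(self).opened += 1

    def process(self, items):
        return [x * 10 for x in items]  # pretend to send items to the DB and get results

    def close(self):
        pass


def per_element(x):
    conn = FakeConnection()  # one connection per element
    try:
        return conn.process([x])[0]
    finally:
        conn.close()


def per_partition(iterator):
    conn = FakeConnection()  # one connection per partition
    try:
        return conn.process(list(iterator))
    finally:
        conn.close()


partitions = [[0, 1, 2], [3, 4, 5], [6, 7, 8], [9, 10, 11, 12, 13]]

FakeConnection.opened = 0
by_element = [per_element(x) for part in partitions for x in part]  # like rdd.map
element_opens = FakeConnection.opened

FakeConnection.opened = 0
by_partition = [y for part in partitions for y in per_partition(iter(part))]  # like rdd.mapPartitions
partition_opens = FakeConnection.opened

print(element_opens, partition_opens)  # 14 4
print(by_element == by_partition)      # True
```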

### Sending a lot of data along with a function call to each element

In [26]:
bigData = np.random.random(int(1e6))

def myFunc(x):
    return x * np.random.choice(bigData)

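# bigData is serialized together with myFunc and shipped along with the tasks
# (not once per element, but still wasteful for large data that is used repeatedly)
rdd.map(myFunc)

# Better: make the big data a read-only broadcast variable, so that it is
# efficiently shipped to each machine once and cached there
bigDataBC = sc.broadcast(bigData)

def myFuncBC(x):
    # access the broadcast data through its 'value' attribute
    return x * np.random.choice(bigDataBC.value)

rdd.map(myFuncBC)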

Code examples
-------------

### Simple scikit-learn

In [None]:
# (sklearn.model_selection requires scikit-learn >= 0.18; older versions used sklearn.cross_validation)
from sklearn.model_selection import train_test_split, ShuffleSplit
from sklearn.datasets import make_regression
from sklearn import pipeline
from sklearn.linear_model import Ridge
from sklearn.preprocessing import StandardScaler

N = 10000   # number of data points
D = 100     # number of dimensions

X, y = make_regression(n_samples=N, n_features=D, n_informative=int(D*0.1),
                       n_targets=1, bias=-6., noise=50., random_state=42)
X_train, X_test, y_train, y_test = train_test_split(X, y)

# draw 8 random subsamples of the training data (as index arrays) and distribute them
splits = ShuffleSplit(n_splits=8, random_state=42).split(X_train)
samples = sc.parallelize([train_index for train_index, _ in splits])
reg_model = pipeline.Pipeline([("scaler", StandardScaler()), ("ridge", Ridge())])

# train a model on each subsample (each Spark task works on its own copy of
# reg_model) and score it on the test set
def fit_and_score(index):
    return reg_model.fit(X_train[index], y_train[index]).score(X_test, y_test)

mean_rsq = samples.map(fit_and_score).mean()
print(mean_rsq)

### Stochastic gradient descent using scikit-learn (from: https://gist.github.com/MLnick/4707012)
Each partition serves as a mini-batch for SGD; in every iteration, the models trained on the individual partitions are merged by averaging their weights.

In [None]:
from sklearn import linear_model as lm
import copy

N = 10000   # Number of data points
D = 10      # Number of dimensions
ITERATIONS = 5
np.random.seed(seed=42)

def generate_data(N):
    # each element: [label] and a (1, D) feature array (partial_fit expects 2-D input)
    return [[[1] if np.random.rand() < 0.5 else [0], np.random.randn(1, D)]
            for _ in range(N)]

def train(iterator, sgd):
    for x in iterator:
        sgd.partial_fit(x[1], x[0], classes=np.array([0, 1]))
    yield sgd

def merge(left, right):
    new = copy.deepcopy(left)
    new.coef_ += right.coef_
    new.intercept_ += right.intercept_
    return new

def avg_model(sgd, slices):
    sgd.coef_ /= slices
    sgd.intercept_ /= slices
    return sgd

slices = 4
data = generate_data(N)
print(len(data))

# init stochastic gradient descent
sgd = lm.SGDClassifier(loss='log_loss')  # logistic regression; use loss='log' in scikit-learn < 1.1
# training
for ii in range(ITERATIONS):
    sgd = sc.parallelize(data, numSlices=slices) \
            .mapPartitions(lambda x: train(x, sgd)) \
            .reduce(lambda x, y: merge(x, y))
    # averaging weight vector => iterative parameter mixtures
    sgd = avg_model(sgd, slices)
    print("Iteration %d:" % (ii + 1))
    print("Model: ")
    print(sgd.coef_)
    print(sgd.intercept_)

The Spark universe
------------------

Other interesting tools for Spark:

- Spark SQL: http://spark.apache.org/docs/latest/sql-programming-guide.html
- MLlib, Spark's machine learning library: http://spark.apache.org/docs/latest/mllib-guide.html
- Spark Streaming, for streaming data applications: http://spark.apache.org/docs/latest/streaming-programming-guide.html

More information
----------------

### Documentation

Spark documentation: https://spark.apache.org/docs/latest/index.html

Spark programming guide: http://spark.apache.org/docs/latest/programming-guide.html

PySpark documentation: https://spark.apache.org/docs/latest/api/python/index.html

### Books

Learning Spark: http://shop.oreilly.com/product/0636920028512.do

(preview: https://www.safaribooksonline.com/library/view/learning-spark/9781449359034/)

### Talks (best watched in this order)

Parallel programming with Spark: https://www.youtube.com/watch?v=7k4yDKBYOcw

Advanced Spark features: https://www.youtube.com/watch?v=w0Tisli7zn4

PySpark: Python API for Spark: https://www.youtube.com/watch?v=xc7Lc8RA8wE

Understanding Spark performance: https://www.youtube.com/watch?v=NXp3oJHNM7E

A deeper understanding of Spark's internals: https://www.youtube.com/watch?v=dmL0N3qfSc8