## Setting things up

In [1]:
from operator import add

from pyspark.sql import SparkSession

spark = SparkSession\
    .builder\
    .appName("Spark Examples")\
    .getOrCreate()


## Creating an RDD

In [2]:
# The elements of this data set are copied to form a distributed data set that can be operated
# on in parallel.
# Call sc.parallelize(iterable, num_partitions) to create a distributed data set with one
# task run per partition (typically 2-4 partitions for each cluster CPU). 
data = [1, 2, 3, 4, 5]
num_partitions = 3
rdd_data = spark.sparkContext.parallelize(data, num_partitions)
rdd_data

ParallelCollectionRDD[0] at parallelize at PythonRDD.scala:480

In [3]:
# Since an RDD is distributed amongst multiple computers, printing out its values
# on *this* computer is nontrivial
rdd_data.take(3)

[1, 2, 3]

In [4]:
# Warning: only do this for small RDDs as it'll bring back all the data !
rdd_data.collect()

[1, 2, 3, 4, 5]

In [5]:
rdd_data.reduce(lambda a, b: a+b)

15

In [6]:
def my_fun(a, b):
    return a + b

rdd_data.reduce(my_fun)

15

In [7]:
# PySpark can create distributed datasets from any storage source supported by Hadoop,
# including your local file system, HDFS, Cassandra, HBase, Amazon S3 etc. Spark supports
# text files, SequenceFiles and any other Hadoop InputFormat.

In [8]:
lines = spark.sparkContext.textFile("SQL Notes.txt")
lines

SQL Notes.txt MapPartitionsRDD[6] at textFile at NativeMethodAccessorImpl.java:0

In [9]:
# Calculating the total length of all lines in the text file

# map() is a *transformation*: it turns an RDD into another RDD.
# In this case, it turns our RDD representing the text file lines into
# an RDD containing the length of each line in the file.
line_lengths = lines.map(lambda s: len(s))

# Uncomment the following line to see the first 10 elements 
#line_lengths.take(10)

In [10]:
# reduce() is an *action*: it actually returns a value.
# This reduction adds all the line lengths together to produce the total
# length of all lines.
# Due to lazy evaluation, the map() above does not actually run until we
# perform this reduce() action.
total_length = line_lengths.reduce(lambda a, b: a + b)
total_length

24646

## Creating a DataFrame

These work similarly to data frames in pandas and R. DataFrames are part of Spark SQL, which gives Spark more information than raw RDDs about the structure of the data and the type of computation you're performing. This allows Spark to perform extra optimisations than would be the case for raw RDDs.

For ML purposes you should generally use DataFrames over raw RDDs.

In [11]:
# As with RDDs, you can create DataFrames in many ways: from files, external DBs, existing RDDs...
people_df = spark.read.json("people.json")
people_df.show()

+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+



In [12]:
people_df.printSchema()

root
 |-- age: long (nullable = true)
 |-- name: string (nullable = true)



In [13]:
# Select people older than 21
people_df.filter(people_df['age'] > 21).show()

+---+----+
|age|name|
+---+----+
| 30|Andy|
+---+----+



In [14]:
# Count people by age
people_df.groupBy("age").count().show()

+----+-----+
| age|count|
+----+-----+
|  19|    1|
|null|    1|
|  30|    1|
+----+-----+



## Word counts

In [15]:
# read.text() is unlike textFile() in that it creates a Spark DataFrame, not an RDD
lines_df = spark.read.text("SQL Notes.txt")
lines_df.printSchema()

root
 |-- value: string (nullable = true)



In [16]:
lines_df.show()

+--------------------+
|               value|
+--------------------+
|* Resources: Cour...|
|                    |
|* Relational alge...|
|    Mathematical ...|
|    Theory refers...|
|    Algebra is cl...|
|    SQL has relat...|
|                    |
| * Relational model:|
|  - A *database* ...|
|                    |
|  - Each relation...|
|                    |
|  - Each tuple in...|
|                    |
|  - Each attribut...|
|                    |
|  - *Schema*: Str...|
|    - DB schema i...|
|                    |
+--------------------+
only showing top 20 rows



In [17]:
# Let's have a look at the first element of this DataFrame.
# Note that it's a PySpark SQL row: https://spark.apache.org/docs/1.1.1/api/python/pyspark.sql.Row-class.html
lines_df.rdd.first()

Row(value='* Resources: Coursera (Introduction to DBs)')

In [18]:
# 1st 5 elements, just so you can see what they look like
lines_df.rdd.take(5)

[Row(value='* Resources: Coursera (Introduction to DBs)'),
 Row(value=''),
 Row(value='* Relational algebra:'),
 Row(value='    Mathematical basis of relational databases: specifies set operators like intersection, union, difference, selection, projection, join'),
 Row(value='    Theory refers to sets (each element is unique), but in practice we usually deal with bags (which do allow duplicates)')]

In [19]:
lines_rdd = lines_df.rdd.map(lambda r: r.value)

# Equivalently...
#lines_rdd = lines_df.rdd.map(lambda r: r[0])

In [20]:
# Split each line into individual words
# Unlike map(), flatMap() "flattens" a list of lists into a single list
split_rdd = lines_rdd.flatMap(lambda x: x.split(' '))
split_rdd.take(10)

['*',
 'Resources:',
 'Coursera',
 '(Introduction',
 'to',
 'DBs)',
 '',
 '*',
 'Relational',
 'algebra:']

In [21]:
# This attaches the number "1" to each word. See the next cell to find out why we do this!
word_one_rdd = split_rdd.map(lambda x: (x, 1))
word_one_rdd.take(5)

[('*', 1), ('Resources:', 1), ('Coursera', 1), ('(Introduction', 1), ('to', 1)]

In [22]:
# This will group every value with the same key and add the values together
# Since each value is just 1, this operation will give us the word count for each
# key
word_counts = word_one_rdd.reduceByKey(add)

In [23]:
# We've finished, so use collect() to pull back the final word counts
word_counts.collect()

[('*', 49),
 ('Resources:', 1),
 ('Coursera', 1),
 ('(Introduction', 1),
 ('to', 49),
 ('DBs)', 1),
 ('', 4317),
 ('Relational', 2),
 ('algebra:', 1),
 ('Mathematical', 1),
 ('basis', 1),
 ('of', 63),
 ('relational', 2),
 ('databases:', 1),
 ('specifies', 1),
 ('set', 7),
 ('operators', 1),
 ('like', 8),
 ('intersection,', 1),
 ('union,', 1),
 ('difference,', 1),
 ('selection,', 1),
 ('projection,', 1),
 ('join', 4),
 ('Theory', 1),
 ('refers', 1),
 ('sets', 2),
 ('(each', 1),
 ('element', 1),
 ('is', 24),
 ('unique),', 1),
 ('but', 8),
 ('in', 37),
 ('practice', 1),
 ('we', 9),
 ('usually', 1),
 ('deal', 1),
 ('with', 19),
 ('bags', 1),
 ('(which', 1),
 ('do', 5),
 ('allow', 2),
 ('duplicates)', 1),
 ('Algebra', 1),
 ('closed:', 2),
 ('results', 4),
 ('operations', 1),
 ('are', 11),
 ('also', 6),
 ('relations', 3),
 ('SQL', 9),
 ('has', 8),
 ('algebra', 1),
 ('as', 18),
 ('its', 3),
 ('foundation', 1),
 ('model:', 1),
 ('-', 145),
 ('A', 4),
 ('*database*', 1),
 ('a', 72),
 ('named', 

In [24]:
# We can chain together all the previous operations in the following one-liner
word_counts = lines_rdd.flatMap(lambda x: x.split(' ')) \
                       .map(lambda x: (x, 1)) \
                       .reduceByKey(add)

In [25]:
word_counts.collect()

[('*', 49),
 ('Resources:', 1),
 ('Coursera', 1),
 ('(Introduction', 1),
 ('to', 49),
 ('DBs)', 1),
 ('', 4317),
 ('Relational', 2),
 ('algebra:', 1),
 ('Mathematical', 1),
 ('basis', 1),
 ('of', 63),
 ('relational', 2),
 ('databases:', 1),
 ('specifies', 1),
 ('set', 7),
 ('operators', 1),
 ('like', 8),
 ('intersection,', 1),
 ('union,', 1),
 ('difference,', 1),
 ('selection,', 1),
 ('projection,', 1),
 ('join', 4),
 ('Theory', 1),
 ('refers', 1),
 ('sets', 2),
 ('(each', 1),
 ('element', 1),
 ('is', 24),
 ('unique),', 1),
 ('but', 8),
 ('in', 37),
 ('practice', 1),
 ('we', 9),
 ('usually', 1),
 ('deal', 1),
 ('with', 19),
 ('bags', 1),
 ('(which', 1),
 ('do', 5),
 ('allow', 2),
 ('duplicates)', 1),
 ('Algebra', 1),
 ('closed:', 2),
 ('results', 4),
 ('operations', 1),
 ('are', 11),
 ('also', 6),
 ('relations', 3),
 ('SQL', 9),
 ('has', 8),
 ('algebra', 1),
 ('as', 18),
 ('its', 3),
 ('foundation', 1),
 ('model:', 1),
 ('-', 145),
 ('A', 4),
 ('*database*', 1),
 ('a', 72),
 ('named', 

## Machine learning with Spark
Spark's MLlib contains a large number of distributed implementations of machine learning algorithms. Generally, it tries to follow a similar interface to scikit-learn so you should be able to pick it up very easily.

Keep in mind that there are two versions of MLlib: the older one is based on RDDs and the newer uses DataFrames as the primary structure for storing data sets. Make sure you use the DataFrame-based API as the RDD-based version is in maintenance mode and will soon be deprecated!

The following toy example shows a few fundamental concepts in MLlib: `Transformer`s, `Estimator`s and `Pipeline`s. These work as follows:

* `Estimator`: An Estimator is an algorithm which can be fit on a DataFrame to produce a Transformer. Example: `LogisticRegression` is an Estimator which is trained on a training DataFrame and produces a `LogisticRegressionModel` Transformer (which can be run on a test set to make predictions).

* `Transformer`: A Transformer is an algorithm which can transform one DataFrame into another DataFrame. Examples: Feature transformers like `Tokenizer`s or `CountVectorizer`s. A `LogisticRegressionModel` is also a Transformer which transforms a DataFrame with features (e.g. a test set) into a DataFrame with predictions.

* `Pipeline`: A Pipeline chains multiple Transformers and Estimators together to specify an ML workflow. As in scikit-learn, at training time we call `fit()` to perform transformations on the training data and fit the model specified by the `Estimator`. To evaluate the model, we call `transform()` on the test data to make predictions. Note that this last step is a bit different from scikit-learn, where we call `predict()` to make predictions.

In [26]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import CountVectorizer, Tokenizer

# Prepare training documents from a list of (id, text, label) tuples. In this example, we're
# simply looking to output "1.0" when the text contains "spark" and "0.0" otherwise. Usually
# text classification problems contain a much larger vocabulary than this!
training = spark.createDataFrame([
    (0, "a b c d e spark", 1.0),
    (1, "b d", 0.0),
    (2, "spark f g h", 1.0),
    (3, "hadoop mapreduce", 0.0)
], ["id", "text", "label"])

# Set up an ML pipeline, which consists of three stages: a tokenizer, a count vectorizer
# and the logistic regression estimator itself.
# The tokenizer and count vectorizer are Transformers; logistic regression is an Estimator.
tokenizer = Tokenizer(inputCol="text", outputCol="words")
count_vectorizer = CountVectorizer(inputCol=tokenizer.getOutputCol(), outputCol="features")
logistic_regression = LogisticRegression(maxIter=10, regParam=0.001)
pipeline = Pipeline(stages=[tokenizer, count_vectorizer, logistic_regression])

# Fit pipeline to training documents
model = pipeline.fit(training)

In [27]:
# Prepare test documents, which are *unlabeled* (id, text) tuples
test = spark.createDataFrame([
    (4, "spark i j k"),
    (5, "l m n"),
    (6, "spark hadoop spark"),
    (7, "apache hadoop")
], ["id", "text"])

# Make predictions on test documents and print columns of interest
prediction = model.transform(test)
selected = prediction.select("id", "text", "probability", "prediction")
for row in selected.collect():
    rid, text, prob, prediction = row
    print("({0}, {1}) --> prob={2}, prediction={3}".format(rid, text, prob, prediction))

(4, spark i j k) --> prob=[0.155543713844,0.844456286156], prediction=1.0
(5, l m n) --> prob=[0.830707735211,0.169292264789], prediction=0.0
(6, spark hadoop spark) --> prob=[0.0696218406195,0.93037815938], prediction=1.0
(7, apache hadoop) --> prob=[0.981518350351,0.018481649649], prediction=0.0


In [28]:
spark.stop()