# Basic setup

Here we will import the `pyspark` module and set up a `SparkContext` running on the local machine with one executor for every core on the local machine.

In [1]:
import pyspark
spark = pyspark.SparkContext("local[*]")

# Creating a resilient distributed dataset (RDD)

One of the easiest ways to create a resilient distributed dataset is from a local collection, with the `parallelize` method on the `SparkContext` object.

In [3]:
numberRDD = spark.parallelize(range(1, 10000))

You can also create RDDs from files, S3 objects, and other external data sources.  See [the documentation](https://spark.apache.org/docs/latest/programming-guide.html#external-datasets) for more information. 

# Basic RDD transformations 

RDDs are _immutable_, so to transform a RDD, we create a new one.  RDDs are also _lazy_, so instead of transforming the elements when we create the new RDD, we store a reference to the original and the operation we'd need to apply to it to construct the transformed RDD.

In [10]:
# filter numberRDD, keeping only even numbers
evens = numberRDD.filter(lambda x: x % 2 == 0)

# produce an RDD by doubling every element in numberRDD
doubled = numberRDD.map(lambda x: x * 2)

# filter numberRDD, keeping only multiples of five
fives = numberRDD.filter(lambda x: x % 5 == 0)

# return an RDD of the elements in both evens and fives
tens = evens.intersection(fives)
sortedTens = tens.sortBy(lambda x: x)

You can see other RDD transformations in the [Spark documentation](https://spark.apache.org/docs/latest/programming-guide.html#transformations).

# RDD actions

Since RDDs are lazy and RDD transformations don't actually compute anything, we need some way to force Spark to actually schedule a computation.  RDD _actions_ are operations that schedule the graph of computations implied by an RDD and return a result to the main program.  Here are a few examples:

In [11]:
(evens.count(), doubled.count())

(4999, 9999)

In [8]:
# note that we may not get results in order!
tens.take(5)

[8320, 2080, 4160, 480, 8800]

In [9]:
# ...unless we sort
sortedTens.take(5)

[10, 20, 30, 40, 50]

In [12]:
# we can take a sample from an RDD (with or without replacement)
sortedTens.takeSample(False, 10)

[2570, 2910, 8620, 900, 870, 6390, 6270, 6700, 2610, 3330]

In [13]:
sortedTens.reduce(lambda x, y: max(x, y))

9990

You can see some other RDD actions in the [Spark documentation](https://spark.apache.org/docs/latest/programming-guide.html#actions).

# Structured query and data frames

Spark also includes support for structured queries, including SQL and pandas- or R-like "data frame" operations through a query DSL.  In order to use this support (in Spark 1.x), you'll need to create a `SQLContext` object:

In [4]:
from pyspark.sql import SQLContext
sqlc = SQLContext(spark)

Let's see structured query in action by loading a [Parquet](http://parquet.apache.org/) file with some simplified [fedmsg](https://fedora-fedmsg.readthedocs.io/en/latest/) log messages.

In [9]:
df = sqlc.read.parquet("/msgs.parquet")
df.printSchema()

root
 |-- category: string (nullable = true)
 |-- i: long (nullable = true)
 |-- id: long (nullable = true)
 |-- msg: string (nullable = true)
 |-- msg_id: string (nullable = true)
 |-- source_name: string (nullable = true)
 |-- source_version: string (nullable = true)
 |-- timestamp: string (nullable = true)
 |-- topic: string (nullable = true)



We can use the data frame DSL to do RDBMS-style queries on these data, which is great for characterizing or exploring it.  Because these queries can execute in parallel or across multiple machines, aggregations can be _much_ faster in Spark than they are on a traditional RDBMS.  

It is simple to do basic aggregations, like this:

In [48]:
df.groupBy('category').count().orderBy('count', ascending=False).show()

+------------+-------+
|    category|  count|
+------------+-------+
|    buildsys|3494565|
|         git|  92611|
|        copr|  76774|
|       pkgdb|  43328|
|       bodhi|  33624|
|fedoratagger|  30835|
|   fedbadges|  29494|
|        wiki|  17609|
|      askbot|  15278|
|         fas|  13022|
|  summershum|   6996|
|        trac|   6496|
|     compose|   5340|
|        null|   5095|
|     ansible|   4333|
|      github|   4291|
|      planet|   3926|
|     meetbot|   3016|
|      anitya|   1300|
|         fmn|    817|
+------------+-------+
only showing top 20 rows



(This file isn't huge, but it's in the Docker image.)

In [101]:
df.count()

3889881

We can use the `show` method to quickly inspect a few rows of a data frame (not just the results of a query).  This is often helpful to sanity-check a new data source.

In [10]:
df.show(10)

+------------+---+--------+--------------------+--------------------+-----------+--------------+--------------------+--------------------+
|    category|  i|      id|                 msg|              msg_id|source_name|source_version|           timestamp|               topic|
+------------+---+--------+--------------------+--------------------+-----------+--------------+--------------------+--------------------+
|       bodhi|  1|       1|{"comment":{"anon...|                null| datanommer|             0|2012-10-08T17:38:...|org.fedoraproject...|
|    buildsys|282|10000000|{"build_id":23690...|2014-bf43ca16-aa9...| datanommer|         0.6.4|2014-08-22T04:20:...|org.fedoraproject...|
|    buildsys|283|10000001|{"build_id":23690...|2014-bd536558-253...| datanommer|         0.6.4|2014-08-22T04:20:...|org.fedoraproject...|
|    buildsys|286|10000004|{"build_id":17994...|2014-46706d25-846...| datanommer|         0.6.4|2014-08-22T04:20:...|org.fedoraproject...|
|        copr|133|10000017|

# Data cleaning

Uh oh!  It looks like the `msg` field of this data frame is JSON-encoded message structures instead of actual message structures.  While we'd _never_ see messy data in the real world, this really throws a wrench into our tutorial.  Let's fix that by asking Spark to infer a schema for the JSON fields.

In [125]:
msgRDD = df.select("msg").rdd.map(lambda x: x[0])
# structs = sqlc.jsonRDD(msgRDD)
# structs.printSchema()

You'll notice that the last two lines are commented out there, and with good reason.  You can uncomment them and run them, but only if you're patient and willing to scroll. You'll get a huge schema with objects that have one field for (as one example) every Fedora user who has ever participated in an IRC meeting! (Alternatively, [click here](https://gist.github.com/willb/ede22cdcd25b64e8cda952f927701d96) to see a rendered version of the inferred schema.) 

Spark can't infer a useful schema for these JSON records, because their schemas diverge and because of some of the unusual ways that `fedmsg` data uses JSON to encode maps.  While there are a few reasons for the schema divergence (see [a practical treatment](http://chapeau.freevariable.com/2014/10/fedmsg-and-spark.html) or a more [type-theoretic one](http://chapeau.freevariable.com/2014/11/algebraic-types.html)), in this case one problem is that different `fedmsg` messages use the `branches` field to refer to values with incompatible types.

Fortunately, we can fix that with a pretty quick hack.  We'll just go through every record and retain only a few fields that we know are not going to give us grief.  (You'd probably want to do something more sophisticated in a real application.)  We'll use Spark's _user-defined function_ mechanism to achieve this.

In [102]:
import json

# define the fields we want to keep
interesting_fields = ['agent', 'author', 'copr', 'user', 'msg', 'meeting_topic', 'name', 'owner', 'package']

# describe the return type of our user-defined function
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
resultType = StructType([StructField(f, StringType(), nullable=True) for f in interesting_fields])

# this is the body of our first user-defined function, to restrict us to a subset of fields
def trimFieldImpl(js):
    try:
        d = json.loads(js)
        return [d.get(f) for f in interesting_fields]
    except:
        # return an empty struct if we fail to parse this message
        return [None] * len(interesting_fields)
    
from pyspark.sql.functions import udf

# register trimFieldImpl as a user-defined function
trimFields = udf(trimFieldImpl, resultType)

trimmedDF = df.withColumn("msg", trimFields("msg"))

In [103]:
trimmedDF.printSchema()

root
 |-- category: string (nullable = true)
 |-- i: long (nullable = true)
 |-- id: long (nullable = true)
 |-- msg: struct (nullable = true)
 |    |-- agent: string (nullable = true)
 |    |-- author: string (nullable = true)
 |    |-- copr: string (nullable = true)
 |    |-- user: string (nullable = true)
 |    |-- msg: string (nullable = true)
 |    |-- meeting_topic: string (nullable = true)
 |    |-- name: string (nullable = true)
 |    |-- owner: string (nullable = true)
 |    |-- package: string (nullable = true)
 |-- msg_id: string (nullable = true)
 |-- source_name: string (nullable = true)
 |-- source_version: string (nullable = true)
 |-- timestamp: string (nullable = true)
 |-- topic: string (nullable = true)



Data frames are a great way to explore structured data, but you can also train models against them (either by converting to RDDs and using [MLlib](https://spark.apache.org/docs/latest/mllib-guide.html) or by using [ML Pipelines](https://spark.apache.org/docs/latest/ml-guide.html) to define learning pipelines directly on data frames).

Let's extract bug and update comments from our `fedmsg` data and use those to train a word2vec model.

In [135]:
def getComments(js):
    try:
        d = json.loads(js)
        cs = d.get('comment', []) + d.get('update', {}).get('comments', [])
        notes = 'notes' in d and [d['notes']] or []
        return [c['text'] for c in cs if c.has_key('text')] + []
    except:
        return []

commentsRDD = msgRDD.flatMap(lambda js: getComments(js))

# turn comments into sequences of words.  don't bother stripping punctuation or stemming #yolo
wordSeqs = commentsRDD.map(lambda s: s.split())

In [140]:
# actually train a model

from pyspark.mllib.feature import Word2Vec

w2v = Word2Vec()
model = w2v.fit(wordSeqs)

# find synonyms for a given word
synonyms = model.findSynonyms('works', 5)

for word, distance in synonyms:
    print("{}: {}".format(word, distance))

Works: 2.20867294376
fine: 2.08998290367
good: 1.8792983991
me: 1.86386831913
work: 1.85234287117


In [139]:
# see what words are in the model

model.getVectors().keys()

[u'breaks',
 u'(fedora)',
 u'yaneti.',
 u'immanetize.',
 u'hpejakle.',
 u'submitted',
 u'plugin',
 u'looks',
 u'kkeithle.',
 u'tgl.',
 u'Error:',
 u'jdunn.',
 u'nalin',
 u'used',
 u'automatic',
 u'mystro256.',
 u'oget.',
 u'frafra.',
 u'Still',
 u'Please',
 u'oliver.',
 u'stahnma.',
 u'sddm-0.2.0-0.19.20130914git50ca5b20.fc20.',
 u'unpushed',
 u'pavlix.',
 u'mitr.',
 u'emaldonado',
 u'goeran.',
 u'jsteffan.',
 u'mmilata.',
 u'kumarpraveen.',
 u'davidz.',
 u'Does',
 u"it's",
 u'goldmann.',
 u'rjones',
 u'canl-c',
 u'application',
 u'please',
 u'fpokorny.',
 u'nodejs-form-data-0.1.0-1.el6,',
 u'desktop',
 u'noarch.',
 u'number',
 u'mluscon.',
 u'able',
 u"that's",
 u'devrim.',
 u'failure',
 u'roma.',
 u'jvcelak.',
 u'for',
 u'iankent.',
 u'tingping.',
 u'jfsaucier.',
 u'mschwendt',
 u'rfenkhuber.',
 u'adrian.',
 u'network',
 u'averi.',
 u'scottt.',
 u'anishpatil.',
 u'functionality',
 u'logic.',
 u'support',
 u'<module>',
 u'paller.',
 u'buildroot',
 u'post',
 u'works.',
 u'test',
 u'fur