# Introduction to Spark

Spark is a fast and general engine for big data processing, with built-in modules for streaming, SQL, machine learning and graph processing.

Spark applications can be written in Python, Java, Scala, or R. Spark integrates well with IPython and the rest of the Python stack (e.g. NumPy).

The company Databricks is the main contributor to the open-source project. 





This graph, when published, created much excitement about Spark (and some controversy):

![Spark](http://spark.apache.org/images/logistic-regression.png)




Spark includes multiple modules:

![spark-modules](http://spark.apache.org/images/spark-stack.png)




A Spark cluster contains a master and workers:

![spark-master-worker](https://camo.githubusercontent.com/8db49b5f39c2ba95614d1fbe98e905b4694f9999/687474703a2f2f737061726b2e6170616368652e6f72672f646f63732f6c61746573742f696d672f636c75737465722d6f766572766965772e706e67)
![executors](http://spark-mooc.github.io/web-assets/images/executors.png)


This is a quick introduction. Please refer to the programming guide if you want to go deeper:
http://spark.apache.org/docs/latest/programming-guide.html

# Tutorial

In [None]:
import pyspark

We need to create a SparkContext (unless one has already been created by default):

In [None]:
sc = pyspark.SparkContext()

In [None]:
print("Running Spark Version %s" % sc.version)

The SparkContext comes with a default configuration for local computations.

By modifying a SparkConf and passing it to the SparkContext when it is created (`pyspark.SparkContext(conf=conf)`), we can set our cluster configuration:

In [None]:
conf = pyspark.SparkConf()

conf.set("spark.python.profile", "true", 
         # ...
        )

# Read more about configurations here: http://spark.apache.org/docs/latest/configuration.html

## RDD


A Resilient Distributed Dataset (RDD) is the fundamental data structure of Spark: a distributed collection of objects. Each RDD is divided into partitions, which may be computed on different nodes of the cluster.

RDDs support two types of operations: 

1. **Transformations** which create a new dataset from an existing one

2. **Actions** which return a value to the driver program after running a computation on the dataset.
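
One consequence worth knowing: transformations are lazy, and Spark only runs them once an action asks for a result. The plain-Python analogy below (no Spark required) uses the built-in lazy `map` to mimic a transformation and `list` to mimic an action. The `calls` list and the `minus_one` helper are illustrative names, not part of any Spark API.

```python
calls = []  # records every element that actually gets processed

def minus_one(x):
    calls.append(x)
    return x - 1

data = range(5)

lazy = map(minus_one, data)   # like a transformation: nothing runs yet
calls_before = len(calls)     # no element has been processed so far

result = list(lazy)           # like an action: forces the computation
calls_after = len(calls)      # every element has now been processed

print(calls_before, calls_after, result)
```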



In [None]:
# Create a range of integers from 0 to 9999 (plain Python)

data = range(10000)

In [None]:
# Parallelize the list, create an RDD

dataRDD = sc.parallelize(
    data, # python collection that will be distributed
    4 # number of slices
)

In [None]:
dataRDD.getNumPartitions()
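
To get a feel for what "slices" means, here is a plain-Python sketch that cuts a collection into four contiguous chunks. It is only an illustration that assumes an even split: `split_into_slices` is a hypothetical helper, and Spark's own partitioning logic may place the boundaries differently.

```python
def split_into_slices(items, num_slices):
    """Cut a sequence into num_slices contiguous chunks of near-equal size."""
    items = list(items)
    n = len(items)
    return [
        items[i * n // num_slices:(i + 1) * n // num_slices]
        for i in range(num_slices)
    ]

slices = split_into_slices(range(10000), 4)
sizes = [len(s) for s in slices]
print(sizes)
```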

In [None]:
# We can collect an RDD (an action: it brings every element back to the driver)
dataRDD.collect()

# or take only the first 5 elements
dataRDD.take(5)

Let's subtract 1 from every element in the collection:

In [None]:
subRDD = dataRDD.map(lambda x: x - 1)

In [None]:
subRDD.take(5)

In [None]:
subRDD.count()

Here is a list of selected operations:

**Transformations** :

* map
* filter
* sample
* groupByKey
* reduceByKey

[cf. doc](http://spark.apache.org/docs/latest/programming-guide.html#transformations)

**Actions** :

* first()
* count()
* take()
* collect()
* reduce()
* takeOrdered() : returns the first n elements in ascending order, or ordered by a key function (passed as the second argument)
* top()

[cf. doc](http://spark.apache.org/docs/latest/programming-guide.html#actions)
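
For intuition, `takeOrdered` and `top` behave much like the standard library's `heapq.nsmallest` and `heapq.nlargest` on a local list. This is a plain-Python analogy with made-up sample values, not Spark code.

```python
import heapq

values = [5, 3, 8, 1, 9, 2]

# Comparable to takeOrdered(3): the three smallest elements, ascending
smallest_three = heapq.nsmallest(3, values)

# Comparable to takeOrdered(3, lambda x: -x): ordering by a key function
largest_by_key = heapq.nsmallest(3, values, key=lambda x: -x)

# Comparable to top(3): the three largest elements, descending
largest_three = heapq.nlargest(3, values)

print(smallest_three, largest_by_key, largest_three)
```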

Let's remove from the collection the elements that are multiples of 2 or 5

### Filter

In [None]:
dataRDD.take(5)

In [None]:
dataRDD.filter(lambda x: x % 2 != 0 and x % 5 != 0).take(5)

### Reduce

In [None]:
dataRDD.filter(lambda x: x % 2 != 0 and x % 5 != 0) \
    .reduce(lambda a, b: a + b)
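
Because each partition is reduced independently before the partial results are combined, the function passed to `reduce` should be commutative and associative. Here is a plain-Python sketch of that two-stage evaluation, using a small range and hand-made partitions that are purely hypothetical:

```python
from functools import reduce

def add(a, b):
    return a + b

# Keep values that are multiples of neither 2 nor 5, as in the filter above
kept = [x for x in range(20) if x % 2 != 0 and x % 5 != 0]

# Pretend the data lives in two partitions
partitions = [kept[:4], kept[4:]]

partials = [reduce(add, part) for part in partitions]  # one result per partition
total = reduce(add, partials)                          # combine the partial results

print(kept, partials, total)
```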

### countByValue (similar to pandas `value_counts`)

In [None]:
repetitiveRDD = sc.parallelize([1, 2, 3, 1, 2, 3, 1, 2, 1, 2, 3, 3, 3, 4, 5, 4, 6])
print(repetitiveRDD.countByValue())
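
On a local list, the same tally can be reproduced with `collections.Counter`, which is a handy way to check the result by hand (plain Python, no Spark involved):

```python
from collections import Counter

values = [1, 2, 3, 1, 2, 3, 1, 2, 1, 2, 3, 3, 3, 4, 5, 4, 6]

# Count how many times each distinct value occurs
counts = Counter(values)
print(dict(counts))
```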

### map and flatMap

In [None]:
simpleRDD = sc.parallelize([2, 3, 4])
print(simpleRDD.map(lambda x: list(range(1, x))).collect())

# flatMap: one-to-many mapping, the results are flattened
print(simpleRDD.flatMap(lambda x: range(1, x)).collect())

In [None]:
# Let's create a new base RDD to work from
wordsList = ['cat', 'elephant', 'rat', 'rat', 'cat']
wordsRDD = sc.parallelize(wordsList, 4)

# Use map
singularAndPluralWordsRDDMap = wordsRDD.map(lambda x: (x, x + 's'))
# Use flatMap
singularAndPluralWordsRDD = wordsRDD.flatMap(lambda x: (x, x + 's'))

# View the results
print(singularAndPluralWordsRDDMap.collect())
print(singularAndPluralWordsRDD.collect())
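
The difference between the two can be reproduced on a local list: `map` yields one output per input, while `flatMap` concatenates the outputs. A plain-Python sketch using `itertools.chain` (an analogy, not Spark code):

```python
from itertools import chain

words = ['cat', 'elephant', 'rat']

# Like map: one tuple per word
mapped = [(w, w + 's') for w in words]

# Like flatMap: the tuples are flattened into a single sequence
flat_mapped = list(chain.from_iterable((w, w + 's') for w in words))

print(mapped)
print(flat_mapped)
```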

### reduceByKey

![reduceByKey() figure](http://spark-mooc.github.io/web-assets/images/reduce_by.png)

In [None]:
data = ["a", "b", "a", "a", "b", "b", "a", "a", "a", "b", "b", "b"]

redRDD = sc.parallelize(data, 4)

redRDD = redRDD.map(lambda x: (x, 1))
    
redRDD = redRDD.reduceByKey(lambda a, b: a + b)

In [None]:
redRDD.collect() # we just did sum aggregation by key 
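
As the figure suggests, values are first combined per key inside each partition, and the partial results are then merged. A plain-Python sketch of those two steps follows. `reduce_by_key` is a hypothetical local helper, and the partition layout is made up for illustration.

```python
def reduce_by_key(pairs, func):
    """Merge the values of each key with func, on a local list of pairs."""
    merged = {}
    for key, value in pairs:
        merged[key] = func(merged[key], value) if key in merged else value
    return merged

def add(a, b):
    return a + b

data = ["a", "b", "a", "a", "b", "b", "a", "a", "a", "b", "b", "b"]

# Pretend the 12 records sit in 4 partitions of 3 records each
partitions = [data[i:i + 3] for i in range(0, len(data), 3)]

# Step 1: reduce inside each partition
local_results = [reduce_by_key(((x, 1) for x in part), add) for part in partitions]

# Step 2: reduce the per-partition results together
all_pairs = [pair for local in local_results for pair in local.items()]
counts = reduce_by_key(all_pairs, add)

print(local_results)
print(counts)
```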

## Reading files

In [None]:
titanic = sc.textFile("data/titanic_train.csv")

In [None]:
# Count the elements (lines) in the RDD

titanic.count()

In [None]:
titanic.take(5)

In [None]:
titanic.takeSample(False, 10, 1) # Return a fixed-size sampled subset of this RDD

# takeSample(withReplacement, num, seed=None)

In [None]:
header = titanic.first()
header

In [None]:
titanic = titanic.filter(lambda x: x != '').filter(lambda line: line != header)

In [None]:
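def parse_row(row):
    # Naive split on every comma. The indices below assume that the quoted
    # name field contains exactly one comma, which shifts later columns by one.
    row = [segs.replace('"','') for segs in row.split(',')]
    return {
        'survived': row[1],
        'pclass': row[2],
        'sex': row[5],
        'age': row[6],
        'ticket': row[9],
        # fares without a decimal point (or missing) are flagged as -1
        'fare': float(row[10]) if '.' in row[10] else -1,
        'embarked': row[12]
    }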

In [None]:
parsed_titanic = titanic.map(parse_row)

In [None]:
parsed_titanic.take(3)
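
Splitting on every comma is fragile when a quoted field contains commas of its own. A possibly more robust sketch uses the standard `csv` module, which respects quoting. The sample line is an assumed example of the file's layout and `parse_line` is a hypothetical helper. The column indices differ from `parse_row` because the name stays in a single field.

```python
import csv

def parse_line(line):
    """Parse one CSV line, keeping quoted fields (such as names) intact."""
    fields = next(csv.reader([line]))
    return {
        'survived': fields[1],
        'pclass': fields[2],
        'sex': fields[4],
        'age': fields[5],
        'ticket': fields[8],
        # an empty fare is flagged as -1
        'fare': float(fields[9]) if fields[9] else -1,
        'embarked': fields[11],
    }

# Assumed sample line; the real file may differ
sample = '1,0,3,"Braund, Mr. Owen Harris",male,22,1,0,A/5 21171,7.25,,S'
parsed = parse_line(sample)
print(parsed)
```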

Parsing by hand is a rather cumbersome process ...

### Aggregation

In [None]:
parsed_titanic.map(lambda person: person['sex']).countByValue()

#### What is the average fare?

In [None]:
parsed_titanic.map(lambda x: x['fare']).take(10)

In [None]:
fares = parsed_titanic.map(lambda x: x['fare']).filter(lambda x: x != -1)
average_fare = fares.reduce(lambda a, b: a + b) / fares.count()
average_fare

In [None]:
parsed_titanic.filter(lambda row: row['fare'] != -1) \
    .map(lambda row: (row['pclass'], (row['fare'], 1))) \
    .reduceByKey(lambda a, b: (a[0] + b[0], a[1] + b[1])) \
    .mapValues(lambda u: u[0] / u[1]).collect()
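
The `(fare, 1)` pairs are there because an average cannot be merged pairwise, whereas a running sum and a running count can. The division happens only once, at the end. A plain-Python sketch of the same pattern, with hypothetical fares:

```python
# Hypothetical (pclass, fare) records
fares_by_class = [('1', 80.0), ('3', 8.0), ('1', 50.0), ('3', 7.0), ('3', 9.0)]

# Accumulate a (sum, count) pair per class
totals = {}
for pclass, fare in fares_by_class:
    running_sum, running_count = totals.get(pclass, (0.0, 0))
    totals[pclass] = (running_sum + fare, running_count + 1)

# Divide once at the end, like the final mapValues step
averages = {pclass: s / c for pclass, (s, c) in totals.items()}
print(averages)
```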

It is not much fun to write these long, low-level operations ...

## DataFrames

In [None]:
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)

In [None]:
df = sqlContext.createDataFrame(parsed_titanic)

In [None]:
df.sample(False, 0.1).show()

In [None]:
df.printSchema()

In [None]:
df.select("pclass", "survived").show(5)

In [None]:
df.select("pclass", "survived").filter(df["survived"] == 1).show(5)

In [None]:
df.groupBy("sex").count().show()

In [None]:
# The default format is Parquet

df.write.save('output/titanic.parquet')

In [None]:
df2 = sqlContext.read.parquet('output/titanic.parquet')

In [None]:
df2.show(5)

In [None]:
import pyspark.sql.functions as func

In [None]:
df.groupby("pclass") \
    .agg(func.mean("fare"), func.mean("survived")) \
    .show(10)

In [None]:
df.groupby("sex") \
    .agg(func.sum("fare").alias("total")) \
    .show(10)

### Use SQL

1 - We can run a SQL query directly on a Parquet file

In [None]:
sqlContext.sql("SELECT count(*) FROM parquet.`output/titanic.parquet`").show()

2 - We can register an existing DataFrame as a temporary SQL table

In [None]:
df.registerTempTable("titanic")

In [None]:
sqlContext.sql("""
    SELECT pclass, avg(fare)
    FROM titanic
    GROUP BY pclass
""").show()

### Make a join

In [None]:
referential = sc.parallelize([('S', 'Southampton'), ('C', 'Cherbourg'), ('Q', 'Queenstown')])

In [None]:
ref_df = sqlContext.createDataFrame(referential)

In [None]:
ref_df.show()

In [None]:
ref_df = ref_df.withColumnRenamed('_1', 'embarked') \
    .withColumnRenamed('_2', 'port_name')

In [None]:
ref_df.show()

In [None]:
df.join(ref_df, on='embarked', how='left').show(5)

In [None]:
df.join(ref_df, on='embarked', how='left').filter(df['embarked'] == 'S').show(5)
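
A left join keeps every row of the left table and fills in the right-hand columns where the key matches. Rows without a match get a null. A plain-Python sketch of that behaviour, using a dict lookup and a few hypothetical passenger records:

```python
ports = {'S': 'Southampton', 'C': 'Cherbourg', 'Q': 'Queenstown'}

# Hypothetical passenger records
passengers = [
    {'sex': 'male', 'embarked': 'S'},
    {'sex': 'female', 'embarked': 'C'},
    {'sex': 'female', 'embarked': ''},   # no port recorded
]

# Left join: keep every passenger; unmatched keys get None for port_name
joined = [dict(p, port_name=ports.get(p['embarked'])) for p in passengers]

for row in joined:
    print(row)
```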

## Machine Learning with MLlib

First, we need to prepare the data. MLlib models take **LabeledPoint** objects as input (a label plus a feature vector).

We will predict survival (survived=1), first with a logistic regression and then with a decision tree:

In [None]:
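from pyspark.mllib.regression import LabeledPoint

def create_point(row):
    # age is a string and may be empty, so convert it before comparing
    try:
        is_adult = float(row['age']) > 18
    except ValueError:
        is_adult = False

    features = [
        int(row['pclass']) - 1,
        (1 if is_adult else 0),
        (1 if row['sex'] == 'female' else 0)
    ]
    return LabeledPoint(1 if row['survived'] == '1' else 0, features)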
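
Since `parse_row` keeps `age` as a string, a threshold test on it needs a numeric conversion: strings compare character by character, so `'9' > '18'` is true. A small sketch of the difference follows. `to_float` is a hypothetical helper, and treating a missing age as 0 is an arbitrary assumption.

```python
ages = ['9', '22', '2', '40', '']

# Lexicographic comparison on strings: misleading for ages
as_strings = [a > '18' for a in ages]

def to_float(text, default=0.0):
    """Convert text to float, falling back to default when it is not numeric."""
    try:
        return float(text)
    except ValueError:
        return default

# Numeric comparison after conversion
as_numbers = [to_float(a) > 18 for a in ages]

print(as_strings)
print(as_numbers)
```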

In [None]:
titanic_points = parsed_titanic.map(create_point)

In [None]:
titanic_points.takeSample(False, 5, 0)

In [None]:
# let's validate our model using the split function randomSplit:

training_rdd, test_rdd = titanic_points.randomSplit([0.7, 0.3], seed=0)

training_count = training_rdd.count()
test_count = test_rdd.count()

# size of two new data sets:
training_count, test_count
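
The weights passed to `randomSplit` are probabilities rather than exact proportions, so the resulting sizes are only approximately 70/30. A plain-Python sketch of that idea follows. `random_split` is a hypothetical helper and does not reproduce Spark's actual sampling.

```python
import random

def random_split(items, train_weight, seed):
    """Send each item to train with probability train_weight, else to test."""
    rng = random.Random(seed)
    train, test = [], []
    for item in items:
        (train if rng.random() < train_weight else test).append(item)
    return train, test

train, test = random_split(range(1000), 0.7, seed=0)

# The sizes are close to 700 and 300, but not exactly
print(len(train), len(test))
```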

### Logistic Regression

In [None]:
from pyspark.mllib.classification import LogisticRegressionWithLBFGS

lr = LogisticRegressionWithLBFGS.train(training_rdd)

In [None]:
predictions_rdd = lr.predict(test_rdd.map(lambda x: x.features))
predictions_rdd.take(10)

#### Validation

In [None]:
# first let's inspect test dataset

test_rdd.take(5)

In [None]:
# zip pairs up two RDDs element by element into an RDD of tuples
# (both RDDs must have the same number of partitions and of elements per partition)

truth_and_predictions_rdd = test_rdd.map(lambda x: x.label).zip(predictions_rdd)
truth_and_predictions_rdd.take(5)

In [None]:
accuracy = truth_and_predictions_rdd.filter(lambda v_p: v_p[0] == v_p[1]).count() / float(test_count)
print('Accuracy =', accuracy)
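
Accuracy is simply the share of pairs in which the label equals the prediction. It says nothing about which kind of error dominates, so it can help to count false positives and false negatives as well. A plain-Python sketch on hypothetical (truth, prediction) pairs:

```python
# Hypothetical (truth, prediction) pairs
truth_and_predictions = [(1.0, 1), (0.0, 0), (1.0, 0), (0.0, 0), (0.0, 1)]

correct = sum(1 for truth, pred in truth_and_predictions if truth == pred)
accuracy = correct / len(truth_and_predictions)

# Break the errors down by type
false_positives = sum(1 for t, p in truth_and_predictions if t == 0 and p == 1)
false_negatives = sum(1 for t, p in truth_and_predictions if t == 1 and p == 0)

print('Accuracy =', accuracy)
print('False positives =', false_positives, 'False negatives =', false_negatives)
```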

### Decision Tree

In [None]:
from pyspark.mllib.tree import DecisionTree

dt = DecisionTree.trainClassifier(
    training_rdd,
    numClasses=2,
    categoricalFeaturesInfo={
        # Map from categorical feature index to number of categories.
        # Any feature not in this map is treated as continuous.
        0: 3,
        1: 2,
        2: 2
    })

In [None]:
predictions_rdd = dt.predict(test_rdd.map(lambda x: x.features))
predictions_rdd.take(10)

#### Validation

In [None]:
truth_and_predictions_rdd = test_rdd.map(lambda x: x.label).zip(predictions_rdd)
truth_and_predictions_rdd.take(5)

accuracy = truth_and_predictions_rdd.filter(lambda v_p: v_p[0] == v_p[1]).count() / float(test_count)
print('Accuracy =', accuracy)