In [15]:
%python
from spylon_kernel import register_ipython_magics
register_ipython_magics()

# Spark

* Cluster computing software
* Extends the MapReduce model to queries, DataFrames, and stream processing, abstracting over cluster infrastructure and its complexity
* In-memory intermediate storage
* Structured data (tables) and semi-structured data (JSON, XML)

# Spark components
* Language support: Scala, Java, Python, R...
* Additional libraries: Spark SQL, MLlib, GraphX, Streaming
* Base libraries: Spark Core, RDD API, DataFrame API
* Cluster management: YARN, Mesos, Standalone, Kubernetes (K8s)
* Storage / data sources: Local, HDFS, S3, RDBMS, NoSQL

# Spark cluster architecture

Each Spark application gets its own set of executor JVMs, which run tasks and store data only for that application. 
Key configuration parameters:
 * spark.executor.cores: number of cores per executor, which bounds how many tasks it can run concurrently.
 * spark.executor.memory: amount of heap memory per executor process (for example `4g`).
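
As a minimal sketch, both settings can be supplied through a `SparkConf` before a session is created. The values `2` and `4g` and the application name are illustrative assumptions, not recommendations; on a real cluster they are often passed on the `spark-submit` command line instead.

```scala
import org.apache.spark.SparkConf

// Illustrative values only: tune them for your own cluster.
val conf = new SparkConf()
  .setAppName("executor-config-sketch")  // hypothetical application name
  .set("spark.executor.cores", "2")      // cores per executor, bounds concurrent tasks
  .set("spark.executor.memory", "4g")    // heap size of each executor JVM

// Read the values back to confirm what will be requested.
println(conf.get("spark.executor.cores"))
println(conf.get("spark.executor.memory"))
```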

![spark architecture](https://spark.apache.org/docs/latest/img/cluster-overview.png)

# Key Spark abstractions & concepts
* Job: parallel computation consisting of multiple tasks, spawned in response to an action (e.g. `collect`, `count`)
* RDD (Resilient Distributed Dataset): a distributed collection of elements.
    - Immutable
    - Partitioned
    - Enables efficient data reuse
    - Fault-tolerant parallel data structure: lost partitions can be recomputed from lineage
    - Intermediate persistence (persist, cache)
    - Partition & placement control
    - Manipulation through coarse-grained transformations (map, filter, groupByKey, join...)
* Task: unit of work executed on a single RDD partition
* DAG (Directed Acyclic Graph) Scheduler.
    - Transforms the logical execution plan (the RDD lineage and its dependencies) into a physical execution plan of stages and tasks.

* DataFrame: two-dimensional, table-like data structure with named columns, where each column can have its own type
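
The concepts above can be sketched in a few lines. This is an illustration under assumptions, not part of the original notebook: the local master, the application name, and the names `squares`, `total`, and `howMany` are made up here. Inside this notebook the kernel already provides `spark` and `sc`, so the first two `val` lines are only needed when running elsewhere.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.storage.StorageLevel

// Reuses an existing session if there is one, otherwise starts a local one (assumption).
val spark = SparkSession.builder().master("local[*]").appName("rdd-concepts-sketch").getOrCreate()
val sc = spark.sparkContext

// Transformations are lazy: these lines only record lineage, no job runs yet.
val squares = sc.parallelize(1 to 10, numSlices = 4)  // 4 partitions, so 4 tasks per stage
  .map(x => x * x)
  .filter(_ % 2 == 0)

// Ask Spark to keep the computed partitions in memory for reuse by later actions.
squares.persist(StorageLevel.MEMORY_ONLY)

// The lineage is the chain of parent RDDs that the DAG scheduler turns into stages.
println(squares.toDebugString)

// Actions trigger jobs; the second action can read the persisted partitions.
val total = squares.sum()      // Double
val howMany = squares.count()  // Long
println(s"partitions=${squares.getNumPartitions} count=$howMany sum=$total")
```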

# RDD
Resilient Distributed Dataset

In [2]:
val data = Range(0, 100)
val distData = sc.parallelize(data)

data: scala.collection.immutable.Range = Range 0 until 100
distData: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:26


In [3]:
val doubled = distData.map(x => x*2)

doubled: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[1] at map at <console>:26


In [26]:
doubled.getClass.getName

res6: String = org.apache.spark.rdd.MapPartitionsRDD


In [29]:
val even = doubled.filter(_ % 2 == 0).collect()

even: Array[Int] = Array(0, 2, 4, 6, 8, 10, 12, 14, 16, 18, 20, 22, 24, 26, 28, 30, 32, 34, 36, 38, 40, 42, 44, 46, 48, 50, 52, 54, 56, 58, 60, 62, 64, 66, 68, 70, 72, 74, 76, 78, 80, 82, 84, 86, 88, 90, 92, 94, 96, 98, 100, 102, 104, 106, 108, 110, 112, 114, 116, 118, 120, 122, 124, 126, 128, 130, 132, 134, 136, 138, 140, 142, 144, 146, 148, 150, 152, 154, 156, 158, 160, 162, 164, 166, 168, 170, 172, 174, 176, 178, 180, 182, 184, 186, 188, 190, 192, 194, 196, 198)


In [5]:
doubled.partitions.size

res1: Int = 32
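
The partition count above is inherited from `parallelize`, which falls back to `sc.defaultParallelism` when no count is given, and `map` keeps the partitioning of its parent. The sketch below, with an assumed local master and hypothetical names `small` and `layout`, requests an explicit count and uses `glom()` to make the layout visible.

```scala
import org.apache.spark.sql.SparkSession

// Reuses an existing session if there is one, otherwise starts a local one (assumption).
val spark = SparkSession.builder().master("local[*]").appName("partitions-sketch").getOrCreate()
val sc = spark.sparkContext

// The default used by parallelize when numSlices is omitted; depends on the environment.
println(sc.defaultParallelism)

// Ask for 4 partitions explicitly; map does not change the number of partitions.
val small = sc.parallelize(Range(0, 8), numSlices = 4).map(_ * 2)
println(small.partitions.size)

// glom() gathers the elements of each partition into one array per partition.
val layout = small.glom().collect().map(_.toList).toList
println(layout)
```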


[RDD documentation](https://spark.apache.org/docs/latest/api/scala/org/apache/spark/rdd/RDD.html)

# DataFrame

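A Dataset is a strongly typed collection of objects that can be transformed in parallel using functional or relational operations. A DataFrame is a Dataset of generic, untyped `Row` objects organized into named columns.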

* [DataFrame](https://spark.apache.org/docs/latest/api/scala/org/apache/spark/sql/index.html)
* [Dataset](https://spark.apache.org/docs/latest/api/scala/org/apache/spark/sql/Dataset.html)

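```
// Type alias defined in the org.apache.spark.sql package object:
type DataFrame = Dataset[Row]
// A Row is a generic, untyped sequence of values:
val row = Row(1, true, "a string", null)
```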
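
To make the typed/untyped distinction concrete, here is a minimal sketch written as notebook cells. The `Person` case class, the local master, and all value names are assumptions introduced for illustration; the rows mirror the sample data used later in this notebook.

```scala
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}

// Hypothetical record type, used only for this illustration.
case class Person(label: Int, name: String, age: Int, weight: Int)

// Reuses an existing session if there is one, otherwise starts a local one (assumption).
val spark = SparkSession.builder().master("local[*]").appName("dataset-sketch").getOrCreate()
import spark.implicits._

val people: Dataset[Person] = Seq(
  Person(1, "John", 30, 75),
  Person(0, "Mary", 20, 60)
).toDS()

// Typed API: field access is checked by the compiler.
val names: Array[String] = people.filter(_.age > 20).map(_.name).collect()

// Untyped API: the same data as a DataFrame, i.e. a Dataset[Row].
val untyped: DataFrame = people.toDF()
val firstRow: Row = untyped.orderBy("name").head()
val firstAge: Int = firstRow.getAs[Int]("age")  // field looked up by name at runtime
```

With the typed API a misspelled field such as `_.agee` fails at compile time, whereas a misspelled column name in the untyped API only fails when the query is analyzed.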


In [34]:
val df = Seq(
    (1, "John", 30, 75),
    (0, "Mary", 20, 60),
    (1, "Pete", 51, 80)
).toDF("label", "name", "age", "weight")

df: org.apache.spark.sql.DataFrame = [label: int, name: string ... 2 more fields]


In [35]:
df.show()

+-----+----+---+------+
|label|name|age|weight|
+-----+----+---+------+
|    1|John| 30|    75|
|    0|Mary| 20|    60|
|    1|Pete| 51|    80|
+-----+----+---+------+



In [50]:
val weightGain = df("weight") + 15

weightGain: org.apache.spark.sql.Column = (weight + 15)


In [51]:
df.select(weightGain).collect()

res21: Array[org.apache.spark.sql.Row] = Array([90], [75], [95])


In [54]:
df.filter(df("age") > 20)

res24: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [label: int, name: string ... 2 more fields]


In [53]:
df.filter("age > 20").show()

+-----+----+---+------+
|label|name|age|weight|
+-----+----+---+------+
|    1|John| 30|    75|
|    1|Pete| 51|    80|
+-----+----+---+------+



In [55]:
df.filter("age > 20").count()

res25: Long = 2


In [69]:
val column_exp = df("age") === 30
df.filter(column_exp).show()

+-----+----+---+------+
|label|name|age|weight|
+-----+----+---+------+
|    1|John| 30|    75|
+-----+----+---+------+



column_exp: org.apache.spark.sql.Column = (age = 30)


# Differences from Pandas

Scala:
```
def filter(condition: Column): Dataset[T]
           ^^^^^^^^^^^^^^^^^
```

Most DataFrame functions in Spark accept column expressions (`Column` objects), which describe a computation rather than hold its result.
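
A column expression can be built in several ways. The sketch below shows four spellings of the same predicate; the local master and the names `sample`, `counts`, and `heavyAdults` are assumptions made for this illustration, and the rows repeat the sample data from the DataFrame section.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, expr}

// Reuses an existing session if there is one, otherwise starts a local one (assumption).
val spark = SparkSession.builder().master("local[*]").appName("column-expr-sketch").getOrCreate()
import spark.implicits._

val sample = Seq((1, "John", 30, 75), (0, "Mary", 20, 60), (1, "Pete", 51, 80))
  .toDF("label", "name", "age", "weight")

// Four ways to express "age > 20"; each builds a Column, none of them runs a job.
val viaApply  = sample("age") > 20
val viaCol    = col("age") > 20
val viaDollar = $"age" > 20
val viaExpr   = expr("age > 20")

// filter accepts any of them; count() is the action that triggers execution.
val counts = Seq(viaApply, viaCol, viaDollar, viaExpr).map(c => sample.filter(c).count())

// Columns compose with operators such as && and >=.
val heavyAdults = sample
  .filter(col("age") > 20 && col("weight") >= 80)
  .select("name").as[String].collect()
```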

Pandas:

Pandas evaluates eagerly, and rows are selected by indexing with a boolean Series.

```
In [27]: import pandas as pd 
    ...: df = pd.DataFrame([ 
    ...:     [1, 'John', 30, 75], 
    ...:     [1, 'Mary', 20, 60], 
    ...:     [1, 'Pete', 51, 80], 
    ...: ]) 
    ...: df.columns = ['label','name','age','weight'] 
    ...:  
```


```
In [28]: df                                                                                                             
Out[28]: 
   label  name  age  weight
0      1  John   30      75
1      1  Mary   20      60
2      1  Pete   51      80
In [29]: df['age']>20                                                                                                   
Out[29]: 
0     True
1    False
2     True
Name: age, dtype: bool
In [30]: df[df['age']>20]                                                                                               
Out[30]: 
   label  name  age  weight
0      1  John   30      75
2      1  Pete   51      80
```
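
For contrast with the eager pandas session above, the following sketch shows that a Spark transformation does no work until an action is called. It uses an RDD and an accumulator because that makes the effect easy to observe; DataFrame transformations are lazy in the same way. The local master and the names `seen`, `tagged`, `before`, and `after` are assumptions made for this illustration.

```scala
import org.apache.spark.sql.SparkSession

// Reuses an existing session if there is one, otherwise starts a local one (assumption).
val spark = SparkSession.builder().master("local[*]").appName("lazy-eval-sketch").getOrCreate()
val sc = spark.sparkContext

// Counts how many elements the map function has actually processed.
val seen = sc.longAccumulator("rows seen")

// Defining the transformation only records it in the lineage.
val tagged = sc.parallelize(1 to 5).map { x => seen.add(1); x }
val before: Long = seen.value  // still 0: no action has run yet

// count() is an action, so the map function now runs on every element.
tagged.count()
val after: Long = seen.value   // 5, assuming no task had to be retried

println(s"before=$before after=$after")
```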