- RDD storage levels
- Caching and distributed persistence of RDDs
- RDD checkpointing
- Writing RDDs to files

## RDD lineage

![RDD lineage](https://github.com/israelzuniga/dlatam-bigdata-workshop/blob/master/notebooks/img/rdd_lineage.png?raw=true)

In [1]:
!wget https://raw.githubusercontent.com/israelzuniga/dlatam-bigdata-workshop/master/notebooks/data/lorem.txt

--2018-11-03 15:33:13--  https://raw.githubusercontent.com/israelzuniga/dlatam-bigdata-workshop/master/notebooks/data/lorem.txt
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 151.101.0.133, 151.101.64.133, 151.101.128.133, ...
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|151.101.0.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 29769 (29K) [text/plain]
Saving to: ‘lorem.txt.1’


2018-11-03 15:33:13 (1.05 MB/s) - ‘lorem.txt.1’ saved [29769/29769]



In [2]:
APP_NAME = "RDDS"
SPARK_URL = "local[*]"


from pyspark import SparkConf, SparkContext

In [3]:
conf = (SparkConf()
        .setMaster(SPARK_URL)
        .setAppName(APP_NAME))

sc = SparkContext(conf=conf)

In [4]:
# File: lorem.txt
lorem = sc.textFile('lorem.txt')

In [5]:
#RDD: lorem

words = lorem.flatMap(lambda x: x.split())


words.take(12)


['Lorem',
 'ipsum',
 'dolor',
 'sit',
 'amet,',
 'consectetur',
 'adipiscing',
 'elit.',
 'Phasellus',
 'ante',
 'enim,',
 'sagittis']

In [6]:
# RDD: words

longwords = words.filter(lambda x: len(x) > 5)



longwords.take(6)

['consectetur',
 'adipiscing',
 'Phasellus',
 'sagittis',
 'pellentesque',
 'vulputate']

In [7]:
# RDD: longwords

numwords = longwords.count()


print(numwords)

2227


In [8]:
print(longwords.toDebugString())

b'(2) PythonRDD[5] at RDD at PythonRDD.scala:49 []\n |  lorem.txt MapPartitionsRDD[1] at textFile at NativeMethodAccessorImpl.java:0 []\n |  lorem.txt HadoopRDD[0] at textFile at NativeMethodAccessorImpl.java:0 []'


The action `longwords.count()` forces evaluation of every parent RDD up to `longwords`. If this action (or any other, such as `longwords.take(6)` or `longwords.collect()`) is called again later, the entire lineage is re-evaluated. In simple cases, with small data and one or two stages, re-evaluation is not a problem. In many circumstances, however, it is inefficient and can lengthen recovery time after a failure.
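To make the cost of re-evaluation concrete, here is a minimal pure-Python sketch. `ToyRDD`, `flat_map`, and `read_source` are hypothetical stand-ins invented for illustration and are not part of the Spark API; the sketch only mimics lazy evaluation and counts how many times the source is read.

```python
class ToyRDD:
    """Hypothetical stand-in for an RDD: it stores a recipe, not the data."""

    def __init__(self, compute):
        self._compute = compute   # zero-argument function returning a list
        self._use_cache = False
        self._cached = None

    def _evaluate(self):
        # Reuse the stored result if there is one, else walk the lineage again.
        if self._cached is not None:
            return self._cached
        data = self._compute()
        if self._use_cache:
            self._cached = data
        return data

    def flat_map(self, fn):
        return ToyRDD(lambda: [y for x in self._evaluate() for y in fn(x)])

    def filter(self, pred):
        return ToyRDD(lambda: [x for x in self._evaluate() if pred(x)])

    def cache(self):
        self._use_cache = True
        return self

    def count(self):
        return len(self._evaluate())

    def take(self, n):
        return self._evaluate()[:n]


reads = []  # one entry per read of the source "file"


def read_source():
    reads.append(1)
    return ["Lorem ipsum dolor sit amet,", "consectetur adipiscing elit."]


lorem = ToyRDD(read_source)
longwords = lorem.flat_map(str.split).filter(lambda w: len(w) > 5)

longwords.count()
longwords.take(2)
uncached_reads = len(reads)   # each action walked back to the source

reads.clear()
cached = lorem.flat_map(str.split).filter(lambda w: len(w) > 5).cache()
cached.count()
cached.take(2)
cached_reads = len(reads)     # only the first action touched the source

print(uncached_reads, cached_reads)  # 2 1
```

In this toy model the second action costs a full pass over the source unless the result is kept, which is the situation that caching and persistence address.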

## RDD Storage Levels


Storage Level|Description
------------|------
MEMORY_ONLY|(Default) RDD partitions are stored in memory only.
MEMORY_AND_DISK| RDD partitions are stored in memory; partitions that do not fit in memory are stored on disk.
MEMORY_ONLY_SER| RDD partitions are stored as serialized objects in memory. This option can be used to save memory (as serialized objects may consume less space than their deserialized equivalent).
MEMORY_AND_DISK_SER| RDD partitions are stored as serialized objects in memory. Objects that do not fit into memory are spilled to disk.
DISK_ONLY| RDD partitions are stored on disk only.




## Storage Level Flags

### StorageLevel Constructor
```
StorageLevel(useDisk,
              useMemory,
              useOffHeap,
              deserialized,
              replication=1)
```


`useDisk`, `useMemory`, `useOffHeap`, and `deserialized` are Boolean arguments, while `replication` is an integer (default 1).
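As a reading aid for the five arguments, the sketch below turns a set of flags into a short label. `Level` and `describe` are hypothetical helpers written for this example; the wording imitates the labels Spark prints in `toDebugString()` output (for example `Memory Serialized 1x Replicated`), but this is not Spark's own implementation.

```python
from collections import namedtuple

# Hypothetical stand-in that mirrors the constructor's argument order.
Level = namedtuple(
    "Level", "useDisk useMemory useOffHeap deserialized replication"
)


def describe(level):
    """Build a human-readable label from the storage flags."""
    parts = []
    if level.useDisk:
        parts.append("Disk")
    if level.useMemory:
        parts.append("Memory")
    if level.useOffHeap:
        parts.append("OffHeap")
    parts.append("Deserialized" if level.deserialized else "Serialized")
    parts.append("%dx Replicated" % level.replication)
    return " ".join(parts)


print(describe(Level(False, True, False, False, 1)))   # Memory Serialized 1x Replicated
print(describe(Level(True, True, False, False, 2)))    # Disk Memory Serialized 2x Replicated
print(describe(Level(False, False, False, False, 1)))  # Serialized 1x Replicated
```

The last call corresponds to an RDD with no storage assigned: neither the disk flag nor the memory flag is set.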


### Spark API: `getStorageLevel()`

In [9]:
lorem.getStorageLevel()

StorageLevel(False, False, False, False, 1)

In [10]:
lorem_sl = lorem.getStorageLevel()

In [13]:
lorem.getStorageLevel().useDisk

False

In [11]:
lorem_sl.useDisk

False

In [12]:
lorem_sl.useMemory

False

## Choosing a storage level

RDD storage levels let you tune how Spark jobs run and accommodate operations that would otherwise not fit in the cluster's memory. In addition, the available replication options can reduce recovery time after a failure.

Generally speaking, if an RDD fits in the cluster's available memory, the default storage level is sufficient and gives the best performance.
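The rule of thumb above can be written as a small helper. This is only a sketch: `suggest_storage_level` and its parameters are hypothetical, the size estimates are assumed to be supplied by the caller, and a real decision would also weigh recomputation cost and disk speed.

```python
def suggest_storage_level(rdd_bytes, cluster_memory_bytes, replicate=False):
    """Return the name of a StorageLevel constant (rule of thumb only).

    rdd_bytes and cluster_memory_bytes are caller-provided estimates.
    """
    # If the RDD fits in memory, the default level is enough.
    name = "MEMORY_ONLY" if rdd_bytes <= cluster_memory_bytes else "MEMORY_AND_DISK"
    # The "_2" variants keep two replicas of each partition.
    return name + "_2" if replicate else name


print(suggest_storage_level(5 * 1024**2, 8 * 1024**3))                   # MEMORY_ONLY
print(suggest_storage_level(64 * 1024**3, 8 * 1024**3, replicate=True))  # MEMORY_AND_DISK_2
```

With PySpark available, `getattr(StorageLevel, name)` would turn the returned string into the corresponding constant to pass to `persist()`.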


-----




# Caching / Persistence / Checkpointing


## Caching

In [14]:
!wget https://raw.githubusercontent.com/israelzuniga/dlatam-bigdata-workshop/master/notebooks/data/all-shakespeare.txt

--2018-11-03 15:44:44--  https://raw.githubusercontent.com/israelzuniga/dlatam-bigdata-workshop/master/notebooks/data/all-shakespeare.txt
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 151.101.0.133, 151.101.64.133, 151.101.128.133, ...
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|151.101.0.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 5342761 (5.1M) [text/plain]
Saving to: ‘all-shakespeare.txt.1’


2018-11-03 15:44:46 (2.39 MB/s) - ‘all-shakespeare.txt.1’ saved [5342761/5342761]



In [15]:
doc = sc.textFile("all-shakespeare.txt") 

In [16]:
words = doc.flatMap( lambda x: x.split()) \
    .map( lambda x: (x, 1))\
    .reduceByKey( lambda x, y: x + y) 


In [17]:
words.cache()   # mark the RDD to be cached; nothing is computed yet
words.count()   # first action: computes the RDD and fills the cache

73032

In [18]:
words.take(30)  # read from the cache; the lineage is not recomputed

[('1', 48),
 ('IV', 501),
 ('PERSONAE', 37),
 ('Fourth.', 3),
 ('(KING', 15),
 ('HENRY,', 16),
 ('Prince', 78),
 ('of', 16526),
 ('HENRY:)', 2),
 ('|', 626),
 ('Lancaster', 21),
 ('(LANCASTER:)', 1),
 ('SIR', 486),
 ('WALTER', 21),
 ('BLUNT:', 2),
 ('PERCY', 35),
 ('OF', 1071),
 ('(NORTHUMBERLAND:)', 4),
 ('HOTSPUR,', 6),
 ('his', 6712),
 ('(HOTSPUR:)', 1),
 ('MORTIMER', 29),
 ('(MORTIMER:)', 2),
 ('RICHARD', 409),
 ('ARCHIBALD', 1),
 ('VERNON', 32),
 ('FALSTAFF', 492),
 ('MICHAEL', 10),
 ('POINS:', 2),
 ('GADSHILL:', 1)]

In [19]:
words.count()  # read from the cache; the lineage is not recomputed

73032

## Persistence `persist()`

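```python
# Signature: the default storage level in PySpark is MEMORY_ONLY
RDD.persist(storageLevel=StorageLevel.MEMORY_ONLY)

# Persist myrdd in memory and on disk, with 2 replicas
myrdd.persist(StorageLevel.MEMORY_AND_DISK_SER_2)

# Equivalent call using the StorageLevel constructor
myrdd.persist(StorageLevel(True, True, False, False, 2))
```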


##  `unpersist()`
When an RDD no longer needs to be persisted, we call this method. It is also required when we want to change the persistence settings of an RDD: we must first drop its current storage level and then specify the new one.
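The unpersist-then-persist sequence can be wrapped in a helper. The sketch below is an assumption-laden illustration: `repersist` is a hypothetical name, and `FakeRDD` is a stand-in that exists only so the example runs without a Spark cluster. The helper relies on nothing but the `unpersist()` and `persist(level)` methods, so a real RDD could be passed in the same way.

```python
def repersist(rdd, new_level):
    """Drop the current storage level of `rdd`, then assign `new_level`."""
    rdd.unpersist()               # releases cached data and clears the level
    return rdd.persist(new_level)


class FakeRDD:
    """Stand-in for an RDD (hypothetical): refuses to change an assigned level."""

    def __init__(self):
        self.level = None

    def persist(self, level):
        if self.level is not None:
            raise RuntimeError("Cannot change storage level after it was assigned")
        self.level = level
        return self

    def unpersist(self):
        self.level = None
        return self


rdd = FakeRDD().persist("MEMORY_ONLY")

try:
    rdd.persist("MEMORY_AND_DISK")    # direct change is rejected
    changed_directly = True
except RuntimeError:
    changed_directly = False

repersist(rdd, "MEMORY_AND_DISK")     # unpersist first, then persist
print(changed_directly, rdd.level)    # False MEMORY_AND_DISK
```

With PySpark, the equivalent call would look like `repersist(words, StorageLevel.MEMORY_AND_DISK)`.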

In [20]:
doc = sc.textFile("all-shakespeare.txt") 

In [21]:
words = doc.flatMap( lambda x: x.split()) \
    .map( lambda x: (x, 1))\
    .reduceByKey( lambda x, y: x + y) 


In [22]:
words.persist()

PythonRDD[22] at RDD at PythonRDD.scala:49

In [23]:
words.count()

73032

In [24]:
words.take(3)

[('1', 48), ('IV', 501), ('PERSONAE', 37)]

In [25]:
print(words.toDebugString())

b'(2) PythonRDD[22] at RDD at PythonRDD.scala:49 [Memory Serialized 1x Replicated]\n |       CachedPartitions: 2; MemorySize: 1380.9 KB; ExternalBlockStoreSize: 0.0 B; DiskSize: 0.0 B\n |  MapPartitionsRDD[21] at mapPartitions at PythonRDD.scala:129 [Memory Serialized 1x Replicated]\n |  ShuffledRDD[20] at partitionBy at NativeMethodAccessorImpl.java:0 [Memory Serialized 1x Replicated]\n +-(2) PairwiseRDD[19] at reduceByKey at <ipython-input-21-bb809d795f06>:1 [Memory Serialized 1x Replicated]\n    |  PythonRDD[18] at reduceByKey at <ipython-input-21-bb809d795f06>:1 [Memory Serialized 1x Replicated]\n    |  all-shakespeare.txt MapPartitionsRDD[17] at textFile at NativeMethodAccessorImpl.java:0 [Memory Serialized 1x Replicated]\n    |  all-shakespeare.txt HadoopRDD[16] at textFile at NativeMethodAccessorImpl.java:0 [Memory Serialized 1x Replicated]'


In [26]:
words.getStorageLevel()

StorageLevel(False, True, False, False, 1)

In [27]:
from pyspark import StorageLevel

In [43]:
words.persist(storageLevel=StorageLevel(False, True, False, False, 10))

Py4JJavaError: An error occurred while calling o140.persist.
: java.lang.UnsupportedOperationException: Cannot change storage level of an RDD after it was already assigned a level
	at org.apache.spark.rdd.RDD.persist(RDD.scala:170)
	at org.apache.spark.rdd.RDD.persist(RDD.scala:195)
	at org.apache.spark.api.java.JavaRDD.persist(JavaRDD.scala:47)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)


In [46]:
words.unpersist()

PythonRDD[22] at RDD at PythonRDD.scala:49

In [45]:
words.persist(storageLevel=StorageLevel(True, True, True, False, 10))

PythonRDD[22] at RDD at PythonRDD.scala:49

In [39]:
words.unpersist()

PythonRDD[22] at RDD at PythonRDD.scala:49

## Checkpointing `checkpoint()`

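```python
# Set the directory where checkpoint files are written
SparkContext.setCheckpointDir(dirName)

# Mark the RDD for checkpointing; call this before running any action on it
RDD.checkpoint()

# True once the RDD has been checkpointed
RDD.isCheckpointed()

# Path of the file the RDD was checkpointed to
RDD.getCheckpointFile()
```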

In [47]:
sc.setCheckpointDir('claseNoviembre/')

In [48]:
doc = sc.textFile('all-shakespeare.txt')

In [49]:
words = doc.flatMap( lambda x: x.split()) \
    .map( lambda x: (x, 1))\
    .reduceByKey( lambda x, y: x + y) 


In [50]:
words.checkpoint()

In [51]:
words.count()

73032

In [52]:
words.isCheckpointed()

True

In [54]:
words.getCheckpointFile()

'file:/home/jovyan/work/notebooks/claseNoviembre/1ab0f255-b717-4627-ad74-71f5d85cbb77/rdd-31'

In [55]:
sc.stop()

-----

Exercise: Checkpointing