# Actions


Unlike **transformations**, which produce RDDs, **action** functions return a value to the caller. In Spark, transformations are evaluated *lazily*, a call-by-need strategy that delays the evaluation of an expression until its value is needed. **Lazy evaluation** allows for more efficient processing: it avoids evaluating expressions that are not needed for the requested value, combines multiple transformations efficiently, and shares intermediate results when possible. As a rule, transformations describe a lazy process flow, and an **action** triggers execution of the part of that flow needed to produce its result.
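The call-by-need idea can be illustrated without Spark. The following is a minimal plain-Python sketch that uses a generator as a stand-in for a lazy transformation. The names `calls`, `shout`, and `pipeline` are hypothetical, and this is only an analogy for how work is deferred, not a description of Spark's internals.

```python
calls = []  # records every element that actually gets processed


def shout(name):
    """Stand-in for an expensive per-element function."""
    calls.append(name)
    return name.upper()


names = ["Peter", "Mike", "James"]

# Describing the pipeline does no work yet (comparable to a transformation).
pipeline = (shout(n) for n in names)
processed_before = len(calls)

# Consuming the pipeline forces evaluation (comparable to an action).
result = list(pipeline)
processed_after = len(calls)

print(processed_before, processed_after, result)
```

Nothing is processed when the pipeline is defined; the per-element function only runs once the result is requested.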

In [2]:
elements = sc.parallelize(["Peter", "Mike", "James", "John", "Luke", "Phil", "Mike"])

The **collect()** action returns all the elements of the RDD as a Python list.

In [3]:
elements.collect()

['Peter', 'Mike', 'James', 'John', 'Luke', 'Phil', 'Mike']

For inspection, the **first()** action returns the first element as a value, and the **take(n)** action returns the first n elements as a Python list.

In [4]:
elements.first()

'Peter'

In [5]:
elements.take(5)

['Peter', 'Mike', 'James', 'John', 'Luke']

Using the **count()** action we can see the size of the RDD.

In [6]:
elements.count()

7

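The **reduce(f)** action aggregates the values with a custom function f. The function f must accept two values and produce one, and it should be commutative and associative so that it can be computed correctly in parallel. For example, we can use a max function, which for strings returns the value that sorts last.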

In [10]:
elements.reduce(lambda x, y: max(x, y))

'Phil'
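The reason f should be commutative and associative is that each partition is reduced separately and the partial results are then combined. The sketch below imitates that in plain Python with `functools.reduce`; the helper `reduce_by_partition` and the partition layouts are hypothetical, chosen only for illustration, and do not reflect how Spark actually splits this RDD.

```python
from functools import reduce


def reduce_by_partition(f, partitions):
    """Reduce each non-empty partition on its own, then reduce the partial results."""
    partials = [reduce(f, part) for part in partitions if part]
    return reduce(f, partials)


names = ["Peter", "Mike", "James", "John", "Luke", "Phil", "Mike"]

# max is commutative and associative: the way the data is split does not matter.
max_a = reduce_by_partition(max, [names[:3], names[3:]])
max_b = reduce_by_partition(max, [names[:5], names[5:]])

# Subtraction is neither, so the result depends on the split.
sub_a = reduce_by_partition(lambda x, y: x - y, [[10, 3], [2, 1]])
sub_b = reduce_by_partition(lambda x, y: x - y, [[10], [3, 2, 1]])

print(max_a, max_b, sub_a, sub_b)
```

With `max`, both splits give the same answer. With subtraction, the two splits disagree, which is why such a function is not a safe choice for `reduce`.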

The **foreach(f)** action applies the function f to every element in the RDD. The function f can, for instance, update an accumulator or store the results externally. The use of foreach falls outside the scope of this tutorial.

The **saveAsTextFile(directory)** action stores the RDD as text in the given directory. If the filesystem is distributed (e.g. the Hadoop Distributed FS), the data is likely to be stored local to the nodes that hold the data, and replicated to avoid loss and to improve accessibility. Even on a local filesystem, the data is often stored in numbered parts.

In [12]:
import os, shutil
if os.path.isdir('elements'): # must remove if exists
    shutil.rmtree('elements')
elements.saveAsTextFile('elements')

Reading back a text file:

In [13]:
elements1 = sc.textFile('elements')
print(elements1.collect())

['Peter', 'Mike', 'James', 'John', 'Luke', 'Phil', 'Mike']


## Caching

All transformations in Spark are lazy, in that they do not compute their results right away. Instead, they just remember the transformations applied to some base dataset (e.g. a file). The transformations are only computed when an action requires a result to be returned to the driver program. This design enables Spark to run more efficiently. For example, we can realize that a dataset created through map will be used in a reduce and return only the result of the reduce to the driver, rather than the larger mapped dataset.

By default, each transformed RDD may be recomputed each time you run an action on it. However, you may also persist an RDD using the **persist()** method, in which case Spark will keep the elements around on the cluster for much faster access the next time you query it. A special case of **persist()** is **cache()**, which uses the default storage level and asks Spark to keep the RDD in memory. There is also support for persisting RDDs on disk, or replicated across multiple nodes.

In [14]:
elements1.getStorageLevel()

StorageLevel(False, False, False, False, 1)

In [17]:
elements1.cache()
elements1.getStorageLevel() # lists: disk, memory, offheap, deserialized, #replications

StorageLevel(False, True, False, False, 1)
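The difference between recomputing and caching can be sketched with a toy class in plain Python. `LazyDataset` and its attributes are hypothetical and only mimic the behaviour described above: the dataset stores a recipe, every action replays it, and `cache()` merely marks the result to be kept once the first action has computed it.

```python
class LazyDataset:
    """Toy stand-in for an RDD: stores a recipe, not the data."""

    def __init__(self, source, func):
        self.source = source    # base data
        self.func = func        # deferred per-element transformation
        self.use_cache = False
        self._cached = None
        self.evaluations = 0    # how many times func has been called

    def cache(self):
        """Mark the dataset to be kept after its first computation."""
        self.use_cache = True
        return self

    def _compute(self):
        if self._cached is not None:
            return self._cached
        result = []
        for item in self.source:
            self.evaluations += 1
            result.append(self.func(item))
        if self.use_cache:
            self._cached = result
        return result

    def collect(self):
        return list(self._compute())

    def count(self):
        return len(self._compute())


names = ["Peter", "Mike", "James"]

# Without caching, each action replays the transformation.
plain = LazyDataset(names, str.upper)
plain.collect()
plain.count()

# With caching, the second action reuses the stored result.
cached = LazyDataset(names, str.upper).cache()
evaluations_after_cache_call = cached.evaluations
cached.collect()
cached.count()

print(plain.evaluations, cached.evaluations)
```

In this sketch the uncached dataset evaluates its function twice per element across the two actions, while the cached one evaluates it once. Calling `cache()` by itself does no work.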

## Top

The **top(n, key)** action retrieves the n largest elements, ranked according to a given key function.

First, let's set up an RDD on the babynames collection, as in the previous tutorial.

In [22]:
babyrddprimitive = sc.textFile("../data/babynames.csv")
firstline = babyrddprimitive.first()
babyrddnofirstline = babyrddprimitive.filter(lambda x: x != firstline)
babyrdd = babyrddnofirstline.map(lambda x: x.split(','))

Then use `top` to find the 5 baby name records with the highest counts in 2013.

In [42]:
babyrdd.filter(lambda x: x[0] == '2013').top(5, lambda x: int(x[4]))

[['2013', 'DAVID', 'KINGS', 'M', '272'],
 ['2013', 'JAYDEN', 'KINGS', 'M', '268'],
 ['2013', 'MOSHE', 'KINGS', 'M', '219'],
 ['2013', 'JAYDEN', 'QUEENS', 'M', '219'],
 ['2013', 'ETHAN', 'QUEENS', 'M', '216']]
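For a list that fits in memory, `heapq.nlargest` from the Python standard library ranks by a key function in a comparable way. The sketch below is plain Python, not Spark; it reuses four of the rows shown above in shuffled order, and the two-element list `['99', '272']` is made up purely to show why the key converts the count with `int`.

```python
import heapq

rows = [
    ['2013', 'MOSHE', 'KINGS', 'M', '219'],
    ['2013', 'ETHAN', 'QUEENS', 'M', '216'],
    ['2013', 'DAVID', 'KINGS', 'M', '272'],
    ['2013', 'JAYDEN', 'KINGS', 'M', '268'],
]

# Rank by the numeric value of the last column, largest first.
top2 = heapq.nlargest(2, rows, key=lambda x: int(x[4]))

# Counts are stored as strings, so the int() conversion matters:
# compared as text, '99' sorts above '272'.
as_text = max(['99', '272'])
as_number = max(['99', '272'], key=int)

print(top2, as_text, as_number)
```

Without the conversion, counts with a different number of digits would be ranked as text rather than as numbers.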