# RDD Operations

RDDs support two types of operations: **_transformations_**, which create a new dataset from an existing one, and **_actions_**, which return a value to the driver program after running a computation on the dataset.
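The split between lazy transformations and eager actions can be sketched in plain Python. `ToyRDD` below is a hypothetical class, not part of PySpark: `map` only records the function, and `reduce` (an action) replays the recorded steps and returns a value. It runs on one machine and ignores partitions entirely.

```python
import functools


class ToyRDD:
    """Hypothetical stand-in for an RDD: records map() steps, runs them on reduce()."""

    def __init__(self, source, steps=()):
        self._source = list(source)   # the input data, held locally
        self._steps = tuple(steps)    # transformations recorded so far

    def map(self, f):
        # Transformation: returns a new ToyRDD with one more step; runs nothing.
        return ToyRDD(self._source, self._steps + (f,))

    def _compute(self):
        # Replay every recorded step over each input element.
        for item in self._source:
            for f in self._steps:
                item = f(item)
            yield item

    def reduce(self, f):
        # Action: forces the computation and returns a plain value.
        return functools.reduce(f, self._compute())


calls = []


def traced_len(s):
    calls.append(s)   # record each time the mapped function actually runs
    return len(s)


lengths = ToyRDD(["a,b", "cde", ""]).map(traced_len)
print(len(calls))          # 0: map() only recorded the step
total = lengths.reduce(lambda a, b: a + b)
print(total, len(calls))   # 6 3
```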

## Basics

In [0]:
file_name = '/FileStore/tables/transaction_detail.csv'
lines = sc.textFile(file_name)


In [0]:
#lines.collect()

In [0]:
lineLengths = lines.map(lambda s: len(s))
#lineLengths.collect()

In [0]:
totalLength = lineLengths.reduce(lambda a, b: a + b)
totalLength

Out[4]: 80852
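Because `reduce` runs on each partition before the partial results are combined on the driver, the function passed to it should be associative and commutative. The plain-Python sketch below mimics that two-step evaluation for `map(len)` followed by `reduce(add)`; the sample lines and the round-robin partitioning are invented for illustration. Note that `sc.textFile` does not include line terminators in each element, so a total like the one above counts characters excluding newlines.

```python
import functools
import operator

# Invented sample data standing in for the CSV file.
lines = ["id,amount,product", "1,10,TV", "2,25,Laptop", "3,5,Pen Drive"]


def split_into_partitions(items, n):
    # Round-robin assignment for illustration; Spark partitions by input split.
    return [items[i::n] for i in range(n)]


partitions = split_into_partitions(lines, 2)

# Step 1 (per executor): map each line to its length, then reduce locally.
# functools.reduce without an initial value assumes no partition is empty.
partials = [functools.reduce(operator.add, map(len, part)) for part in partitions]

# Step 2 (driver): combine the per-partition results.
total = functools.reduce(operator.add, partials)

# Same answer as one sequential pass, because + is associative and commutative.
assert total == sum(len(s) for s in lines)
print(partials, total)
```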

## Passing Functions to Spark

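Let's count how many times each `transaction_product_name` appears. Besides lambdas, Spark accepts named functions: `myFunc` turns every line into a `(product_name, 1)` pair, and `reduceByKey` then adds up the 1s for each product.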

In [0]:
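def myFunc(s):
    # Emit (product_name, 1) for one CSV line; assumes the product name is
    # the last column and that no field contains a quoted comma.
    words = s.split(",")
    return (words[-1], 1)


transaction_product_name = lines.map(myFunc)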

In [0]:
#transaction_product_name.collect()

In [0]:
counts = transaction_product_name.reduceByKey(lambda a, b: a + b)
counts.collect()

Out[7]: [('Wrist Band', 97),
 ('TV', 88),
 ('Text Books', 86),
 ('LAN Cable', 80),
 ('HDMI Cable', 102),
 ('transaction_product_name', 1),
 ('Laptop', 86),
 ('TV Stand', 75),
 ('External Hard Drive', 82),
 ('Wrist Watch', 73),
 ('Mobile Phone', 84),
 ('Pen Drive', 78),
 ('Desktop Computer', 69)]
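The output above contains `('transaction_product_name', 1)`: the CSV header row was counted as if it were a product. The plain-Python sketch below shows two adjustments worth trying: dropping the header before mapping, and parsing with the `csv` module so that a quoted field containing a comma is not split apart. `product_pair` and `reduce_by_key` are hypothetical helpers (`reduce_by_key` is a local stand-in for `RDD.reduceByKey`), and the sample rows are invented. The comments show where `lines.first()` and `lines.filter(...)` would go in PySpark.

```python
import csv


def product_pair(line):
    # Parse one CSV line and emit (product_name, 1); product name is the last column.
    fields = next(csv.reader([line]))
    return (fields[-1], 1)


def reduce_by_key(pairs, func):
    # Local stand-in for RDD.reduceByKey: merge the values of each key with func.
    merged = {}
    for key, value in pairs:
        if key in merged:
            merged[key] = func(merged[key], value)
        else:
            merged[key] = value
    return merged


# Invented sample rows; only the last column matters here.
lines = [
    "transaction_id,transaction_product_name",
    "1,TV",
    "2,Laptop",
    '3,"Cable, HDMI"',
    "4,TV",
]

header = lines[0]                           # in Spark: header = lines.first()
rows = [l for l in lines if l != header]    # in Spark: lines.filter(lambda l: l != header)
counts = reduce_by_key(map(product_pair, rows), lambda a, b: a + b)
print(counts)
```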

## Closures

In [0]:
counter = 0
data = [1,2,3,4,5]
rdd = sc.parallelize(data)

# Wrong: Don't do this!!
def increment_counter(x):
    global counter
    counter += x
rdd.foreach(increment_counter)

print("Counter value: ", counter)

Counter value:  0

To execute jobs, Spark breaks up the processing of RDD operations into tasks, each of which is executed by an executor. Prior to execution, Spark computes the task’s closure: the variables and methods that must be visible for the executor to perform its computations on the RDD (in this case, inside `foreach()`). This closure is serialized and sent to each executor.

The variables within the closure sent to each executor are copies, so when `counter` is referenced inside the `foreach` function, it is no longer the `counter` on the driver node. The driver still holds its own `counter` in memory, but the executors cannot see it; they only see the copy from the serialized closure. As a result, the driver’s `counter` stays zero, because every update was applied to an executor-side copy. To bring values from the executors back to the driver, use an action such as `reduce()` or an accumulator (see Accumulators below).
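The copy semantics can be reproduced without a cluster. In this sketch, `pickle` stands in for the closure serializer (PySpark actually uses cloudpickle), a dict stands in for the driver’s `counter`, and each simulated task unpickles its own copy. The two-way partition split is invented.

```python
import pickle

driver_state = {"counter": 0}      # the variable that lives on the driver
partitions = [[1, 2], [3, 4, 5]]   # pretend the RDD has two partitions


def run_task(closure_bytes, partition):
    state = pickle.loads(closure_bytes)   # executor-side copy of the closure
    for x in partition:
        state["counter"] += x             # mutates the copy only
    return state["counter"]               # foreach would discard this in Spark


serialized = pickle.dumps(driver_state)   # "compute and serialize the closure"
task_results = [run_task(serialized, p) for p in partitions]

print(driver_state["counter"])   # 0: the driver's value never changed
print(task_results)              # [3, 12]: each copy saw only its own partition

# What works instead: return the values through an action, as rdd.sum() would.
print(sum(sum(p) for p in partitions))   # 15
```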

## Shuffle operations

Certain operations within Spark trigger an event known as the shuffle. The shuffle is Spark’s mechanism for re-distributing data so that it’s grouped differently across partitions. This typically involves copying data across executors and machines, making the shuffle a complex and costly operation.


Operations that can cause a shuffle include repartition operations like `repartition` and `coalesce`, `*ByKey` operations (except for counting) like `groupByKey` and `reduceByKey`, and join operations like `cogroup` and `join`.
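One reason the choice of `*ByKey` operation matters: `reduceByKey` merges the values for each key within a partition before the shuffle, while `groupByKey` sends every pair across the network. The toy model below only counts shuffled records on invented data; it ignores serialization, the hashing of keys to reducers, and disk spills.

```python
# Invented (product, 1) pairs spread over two partitions.
partitions = [
    [("TV", 1), ("Laptop", 1), ("TV", 1), ("TV", 1)],
    [("Laptop", 1), ("TV", 1), ("Laptop", 1)],
]


def map_side_combine(part):
    # Sum the values of each key inside one partition, before any shuffle.
    combined = {}
    for key, value in part:
        combined[key] = combined.get(key, 0) + value
    return combined


# groupByKey-style: every (key, value) pair is shuffled as-is.
shuffled_group = sum(len(part) for part in partitions)

# reduceByKey-style: only one partial sum per key per partition is shuffled.
combined = [map_side_combine(part) for part in partitions]
shuffled_reduce = sum(len(c) for c in combined)

# Reduce side: merge the partial sums that arrived for each key.
final = {}
for c in combined:
    for key, value in c.items():
        final[key] = final.get(key, 0) + value

print(shuffled_group, shuffled_reduce, final)
```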



## RDD Persistence
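You can mark an RDD to be persisted using its `persist()` or `cache()` method. Persisting is lazy: the RDD is stored the first time an action computes it, and later actions reuse the stored partitions instead of recomputing them. `cache()` is shorthand for `persist()` with the default storage level, `MEMORY_ONLY`, which keeps the partitions in memory on the nodes. Spark’s cache is fault-tolerant: if any partition of a persisted RDD is lost, it is automatically recomputed using the transformations that originally created it.

`persist()` also accepts an explicit storage level. The available storage levels in Python include `MEMORY_ONLY`, `MEMORY_ONLY_2`, `MEMORY_AND_DISK`, `MEMORY_AND_DISK_2`, `DISK_ONLY`, `DISK_ONLY_2`, and `DISK_ONLY_3`; the `_2` and `_3` variants replicate each partition on two or three nodes.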
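The effect of caching can be modelled in plain Python. `ToyPersistedRDD` is a hypothetical class that keeps its lineage as a function: without persistence every action calls it again, with persistence only the first action does. In PySpark itself you would call `rdd.cache()`, or `rdd.persist(StorageLevel.MEMORY_AND_DISK)` after `from pyspark import StorageLevel`, and release the storage with `rdd.unpersist()`.

```python
class ToyPersistedRDD:
    """Hypothetical model: lineage is a function; cache() stores its first result."""

    def __init__(self, lineage):
        self._lineage = lineage   # rebuilds the data from the source
        self._persist = False
        self._stored = None

    def cache(self):
        # Lazy, like RDD.cache(): only marks the RDD, computes nothing yet.
        self._persist = True
        return self

    def _data(self):
        if not self._persist:
            return self._lineage()            # recompute on every action
        if self._stored is None:
            self._stored = self._lineage()    # first action fills the cache
        return self._stored

    def count(self):
        return len(self._data())

    def sum(self):
        return sum(self._data())              # builtin sum over the data


runs = {"n": 0}


def expensive_lineage():
    runs["n"] += 1   # count how often the "source" is re-read
    return [len(s) for s in ["1,TV", "2,Laptop"]]


plain = ToyPersistedRDD(expensive_lineage)
plain.count()
plain.sum()
uncached_runs = runs["n"]

runs["n"] = 0
cached = ToyPersistedRDD(expensive_lineage).cache()
cached.count()
cached.sum()
cached_runs = runs["n"]

print(uncached_runs, cached_runs)   # 2 1
```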

## Shared Variables

Spark provides two limited types of shared variables for two common usage patterns: broadcast variables and accumulators.

### Broadcast Variables

Broadcast variables allow the programmer to keep a read-only variable cached on each machine rather than shipping a copy of it with tasks. They can be used, for example, to give every node a copy of a large input dataset in an efficient manner.
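A common use of a broadcast variable is a map-side join: a small lookup table is broadcast once and read inside each task, instead of joining two RDDs with a shuffle. The sketch below keeps the per-record logic in a plain function so it can be tested locally. `PRODUCT_CATEGORY` and its categories are hypothetical, the counts are taken from the product-count output earlier, and the commented lines show how it might be wired up with `sc.broadcast` and `.value`.

```python
# Hypothetical lookup table. In Spark it would be broadcast once:
#   bc = sc.broadcast(PRODUCT_CATEGORY)
#   with_category = counts.map(lambda kv: add_category(kv, bc.value))
PRODUCT_CATEGORY = {
    "TV": "Electronics",
    "Laptop": "Computers",
    "Text Books": "Books",
}


def add_category(pair, lookup):
    # Attach the product's category; products missing from the table get "Unknown".
    product, count = pair
    return (product, lookup.get(product, "Unknown"), count)


local_counts = [("TV", 88), ("Laptop", 86), ("Pen Drive", 78)]
print([add_category(kv, PRODUCT_CATEGORY) for kv in local_counts])
```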


In [0]:
broadcastVar = sc.broadcast([1, 2, 3])
broadcastVar.value

Out[9]: [1, 2, 3]

### Accumulators
Accumulators are variables that are only “added” to through an associative and commutative operation and can therefore be efficiently supported in parallel. They can be used to implement counters (as in MapReduce) or sums. Spark natively supports accumulators of numeric types, and programmers can add support for new types.
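Why the operation must be associative and commutative shows up in a small model of how accumulator updates flow: each task adds into its own copy that starts at zero, and the driver merges the per-task results in whatever order the tasks finish. The plain-Python sketch below tries every completion order for the `[1, 2, 3, 4]` example used in this section, with an invented three-way partitioning.

```python
import itertools

partitions = [[1, 2], [3], [4]]   # invented split of [1, 2, 3, 4]


def run_task(partition):
    local = 0            # each task's copy starts from the zero value
    for x in partition:
        local += x       # what accum.add(x) does inside the task
    return local


task_updates = [run_task(p) for p in partitions]

totals = set()
for order in itertools.permutations(task_updates):   # every possible finish order
    driver_value = 0
    for update in order:
        driver_value += update                        # driver-side merge
    totals.add(driver_value)

print(task_updates, totals)   # [3, 3, 4] {10}
```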

In [0]:
accum = sc.accumulator(0)
accum

Out[10]: Accumulator<id=0, value=0>

In [0]:
sc.parallelize([1, 2, 3, 4]).foreach(lambda x: accum.add(x))

In [0]:
accum.value

Out[12]: 10

In [0]:
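data = [1, 2, 3, 4, 5]
rdd = sc.parallelize(data)

# Right: accumulator updates are sent back and merged on the driver,
# unlike updates to a plain global variable (see Closures above).
accum = sc.accumulator(0)
rdd.foreach(lambda x: accum.add(x))

print("accum value: ", accum.value)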

accum value:  15
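The Spark documentation guarantees that accumulator updates made inside actions such as `foreach()` are applied only once per task, but updates made inside transformations such as `map()` run again whenever the RDD is recomputed. The toy model below (plain Python, with a dict standing in for `sc.accumulator(0)`) shows the usual symptom: two actions on an uncached RDD double the count. Caching the RDD, or moving the update into an action, avoids it.

```python
data = [1, 2, 3, 4, 5]
accum = {"value": 0}   # stands in for sc.accumulator(0); not a real accumulator


def doubled_and_counted(x):
    accum["value"] += 1   # accumulator update inside a transformation
    return x * 2


def mapped():
    # Lazy and uncached: every action re-runs the map over the source data.
    return (doubled_and_counted(x) for x in data)


first = sum(mapped())              # action 1, like rdd.map(...).sum()
count = sum(1 for _ in mapped())   # action 2, like rdd.map(...).count()
print(first, count, accum["value"])   # 30 5 10: each element counted twice
```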


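### AccumulatorParam

PySpark’s built-in accumulators handle numeric values. To accumulate another type, subclass `AccumulatorParam` and implement `zero()`, which returns the starting value, and `addInPlace()`, which merges a new value into the running one. The example below accumulates a list.
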
In [0]:
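from pyspark.accumulators import AccumulatorParam

class ListParam(AccumulatorParam):

    def zero(self, value):
        # Starting value for every task's local copy: an empty list.
        return []

    def addInPlace(self, variable, value):
        # `value` is a single element when called via listAccum.add(x), or a
        # whole list when the driver merges a finished task's partial result.
        if not isinstance(value, list):
            value = [value]
        variable.extend(value)
        return variable

# Then, create an Accumulator of this type:
listAccum = sc.accumulator([], ListParam())
listAccum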

Out[14]: Accumulator<id=2, value=[]>

In [0]:
rdd = sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8])

In [0]:
def g(x):
    # No `global` needed: the function only calls a method on listAccum.
    listAccum.add(x)
rdd.foreach(g)

print(listAccum.value)

[2, 4, 8, 3, 1, 6, 5, 7]
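The printed list is not sorted because the driver merges each task’s list as that task finishes, and completion order is not fixed, so running the cell again may give a different order. The plain-Python model below copies `zero()` and `addInPlace()` from `ListParam` into a class that needs no PySpark import, uses an invented four-way partitioning, and shuffles the task results to stand in for arbitrary completion order. Sort the accumulated value if a stable order is needed.

```python
import random


class ListParamModel:
    """Same zero()/addInPlace() logic as ListParam, without PySpark."""

    def zero(self, value):
        return []

    def addInPlace(self, variable, value):
        if not isinstance(value, list):
            value = [value]
        variable.extend(value)
        return variable


param = ListParamModel()
partitions = [[1, 2], [3, 4], [5, 6], [7, 8]]   # invented split

task_lists = []
for part in partitions:
    local = param.zero([])                  # each task starts from zero()
    for x in part:
        local = param.addInPlace(local, x)  # listAccum.add(x) inside a task
    task_lists.append(local)

random.shuffle(task_lists)                  # tasks finish in arbitrary order
driver = param.zero([])
for task_list in task_lists:
    driver = param.addInPlace(driver, task_list)   # driver merges whole lists

print(driver)           # some ordering of 1..8
print(sorted(driver))   # [1, 2, 3, 4, 5, 6, 7, 8]
```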
