In [1]:
# `sparkContext` is an attribute of the SparkSession object, not an importable
# module, so it must be assigned rather than imported.
# NOTE(review): assumes the environment pre-creates a SparkSession named `spark`
# (as Databricks notebooks / the pyspark shell do) — confirm for your setup.
sc = spark.sparkContext

#### RDD Operations
**Transformations**: operations applied on an RDD to create a new RDD.
- map( )
- flatMap( )
- filter( )
- mapPartitions( )
- mapPartitionsWithIndex( )
- sample( )
- union( )
- intersection( )
- distinct( )
- groupByKey( )
- reduceByKey( )
- aggregateByKey( )
- sortByKey( )
- join( )
- cogroup( )
- cartesian( )
- pipe( )
- coalesce( )
- repartition( ) 
- repartitionAndSortWithinPartitions( )

<!-- 
and some of the most common actions are reduce(), collect(), first(), take(), count(), saveAsHadoopFile()

**Actions**: operations applied on RDD to get outputs
- reduce( ): aggregate all elements with a binary function (e.g. a sum)
- collect( )
- first( )
- take( ) 
- count( )
- saveAsHadoopFile( ) -->

map(f)  
Return a new RDD by applying a function to each element of this RDD.

In [4]:
def addOne(x):
    """Return ``x`` incremented by one (the per-element function handed to map())."""
    return x + 1


a_list = [1, 2, 3, 4, 5]
a_list_rdd = sc.parallelize(a_list)

# map() is a lazy transformation; collect() is the action that triggers it.
a_list_rdd.map(addOne).collect()

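flatMap(f)  
Return a new RDD by first applying a function to all elements of this RDD, and then flattening the results.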

In [6]:
a_list = [1, 2, 3, 4, 5]
a_list_rdd = sc.parallelize(a_list)

# Each element n expands to the pair [n, n*n]; flatMap() splices those pairs
# into one flat RDD instead of keeping them as nested lists.
a_list_rdd.flatMap(lambda n: [n, n * n]).collect()

Difference between map() and flatMap()

In [8]:
a_list = [2, 4, 6, 8]
# Build the RDD once and reuse it for both transformations.
numbers_rdd = sc.parallelize(a_list)

# map() emits exactly one output element per input, so each [x, x*x] stays a nested list.
map_output = numbers_rdd.map(lambda x: [x, x * x]).collect()

# flatMap() flattens each returned list, so x and x*x become separate elements.
flatmap_output = numbers_rdd.flatMap(lambda x: [x, x * x]).collect()

print("Output of map() is a nested list:")
print(map_output)
print()
print("Output of flatMap() is a flattened list:")
print(flatmap_output)

filter(f)  
Return a new RDD containing only the elements that satisfy a predicate.

In [10]:
word_list = [
    'python', 'scala', 'java', 'c',
    'spark', 'pyspark', 'sparkR', 'sparkling',
]
word_list_rdd = sc.parallelize(word_list)

# Keep only the words that contain the substring 'spark' (Python `in` is case-sensitive).
word_filtered = word_list_rdd.filter(lambda word: 'spark' in word)
word_filtered.collect()

mapPartitions  
Return a new RDD by applying a function to each partition of this RDD.

In [12]:
rdd = sc.parallelize([1, 2, 3, 4], 2)


def sum_partition(iterator):
    """Yield the sum of the elements of one partition.

    mapPartitions() calls this once per partition with an iterator over that
    partition's elements; yielding a single value produces one output element
    per partition.
    """
    yield sum(iterator)


# Two partitions -> two sums (the PySpark docs show [3, 7] for this input).
rdd.mapPartitions(sum_partition).collect()

mapPartitionsWithIndex  
Return a new RDD by applying a function to each partition of this RDD, while tracking the index of the original partition.

In [14]:
rdd = sc.parallelize([1, 2, 3, 4], 4)


def partition_index(splitIndex, iterator):
    """Yield the index of the partition being processed.

    mapPartitionsWithIndex() passes the partition index and an iterator over
    its elements; the elements are ignored here.
    """
    yield splitIndex


# Four partitions have indices 0..3, so the sum is 0 + 1 + 2 + 3 = 6.
rdd.mapPartitionsWithIndex(partition_index).sum()

sample(withReplacement, fraction, seed=None)  
Return a sampled subset of this RDD.

In [16]:
rdd = sc.parallelize(range(100), 4)

# Without replacement, each of the 100 elements is kept with probability 0.1,
# so roughly 10 survive; the fixed seed makes the draw repeatable.
6 <= rdd.sample(withReplacement=False, fraction=0.1, seed=81).count() <= 14

sampleByKey(withReplacement, fractions, seed=None)  
Return a subset of this RDD sampled by key (via stratified sampling). Create a sample of this RDD using variable sampling rates for different keys as specified by fractions, a key to sampling rate map.

In [18]:
# Per-key sampling rates: about 20% of "a" records and 10% of "b" records.
fractions = {"a": 0.2, "b": 0.1}

# Pair every key with every integer in range(0, 1000).
rdd = sc.parallelize(fractions.keys()).cartesian(sc.parallelize(range(0, 1000)))

# Stratified sample, then gather the surviving values per key into a dict.
sample = dict(
    rdd.sampleByKey(withReplacement=False, fractions=fractions, seed=2)
       .groupByKey()
       .collect()
)

In [19]:
# Expect about 0.2 * 1000 = 200 values for "a" and 0.1 * 1000 = 100 for "b".
(100 < len(sample["a"]) < 300
 and 50 < len(sample["b"]) < 150)

In [20]:
# Every sampled value for key "a" must come from range(0, 1000).
(max(sample["a"]) <= 999
 and 0 <= min(sample["a"]))

In [21]:
# Every sampled value for key "b" must come from range(0, 1000).
(max(sample["b"]) <= 999
 and 0 <= min(sample["b"]))

sampleStdev()   
Compute the sample standard deviation of this RDD’s elements (which corrects for bias in estimating the standard deviation by dividing by N-1 instead of N).

In [23]:
# Squared deviations from the mean 2 sum to 2; 2 / (3 - 1) = 1, so the result is 1.0.
(sc.parallelize([1, 2, 3])
   .sampleStdev())

In [24]:
# Sample variance is 23475 / 9 (about 2608.33), so the sample std-dev is about 51.07.
(sc.parallelize([5, 20, 100])
   .sampleStdev())

sampleVariance()    
Compute the sample variance of this RDD’s elements (which corrects for bias in estimating the variance by dividing by N-1 instead of N).

In [26]:
# Squared deviations from the mean 2 sum to 2; dividing by N - 1 = 2 gives 1.0.
(sc.parallelize([1, 2, 3])
   .sampleVariance())

In [27]:
# Squared deviations sum to 46950 / 9; dividing by N - 1 = 2 gives about 2608.33.
(sc.parallelize([5, 20, 100])
   .sampleVariance())