In [1]:
import pyspark

In [2]:
from pyspark import SparkContext

In [3]:
# getOrCreate() returns the already-running SparkContext if there is one, so
# re-running this cell does not fail with "Cannot run multiple SparkContexts at once".
sc = SparkContext.getOrCreate()

In [4]:
# List the effective configuration of the running SparkContext as (key, value) pairs
sc.getConf().getAll()

[('spark.app.id', 'local-1646269895698'),
 ('spark.executor.id', 'driver'),
 ('spark.app.name', 'pyspark-shell'),
 ('spark.driver.extraJavaOptions',
  '-Dio.netty.tryReflectionSetAccessible=true'),
 ('spark.rdd.compress', 'True'),
 ('spark.driver.host', '04d404fbf5f6'),
 ('spark.serializer.objectStreamReset', '100'),
 ('spark.master', 'local[*]'),
 ('spark.submit.pyFiles', ''),
 ('spark.submit.deployMode', 'client'),
 ('spark.app.startTime', '1646269894202'),
 ('spark.executor.extraJavaOptions',
  '-Dio.netty.tryReflectionSetAccessible=true'),
 ('spark.driver.port', '37431'),
 ('spark.ui.showConsoleProgress', 'true')]

In [8]:
# Deliberately request 9 partitions for only 6 elements: some partitions end up
# empty, which glom() and fold() make visible further down.
my_rdd = sc.parallelize([1, 2, 3, 4, 5, 6], numSlices=9)

In [9]:
# collect() is an action: it returns every element to the driver as a Python list (fine for a tiny RDD like this)
my_rdd.collect()

[1, 2, 3, 4, 5, 6]

In [12]:
# parallelize() returns a plain pyspark RDD
type(my_rdd)

pyspark.rdd.RDD

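#### `glom()` shows how the data is split into partitions
`glom()` is a transformation that turns each partition into a single list, so `glom().collect()` returns one list per partition. The result is a `PipelinedRDD`, which is a subclass of `RDD`.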

In [10]:
# One list per partition: 9 partitions for 6 elements, so 3 of them are empty
my_rdd.glom().collect()

[[], [1], [2], [], [3], [4], [], [5], [6]]

In [11]:
# glom() is a transformation, so it returns another RDD (a PipelinedRDD), not the data itself
type(my_rdd.glom())

pyspark.rdd.PipelinedRDD

### Aggregate Functions

In [15]:
# Two separate actions; the cell displays the results as a (max, min) tuple
my_rdd.max(), my_rdd.min()

(6, 1)

### RDD Actions and Transformations

#### Actions
- `collect()`
- `count()`
- `countByValue()`
- `take()`
- `top()`
- `reduce()`
- `fold()`
- `foreach()`
- `saveAsTextFile()`

#### Transformations
- `map()`
- `flatMap()`
- `filter()`
- `distinct()`
- `reduceByKey()`
- `groupByKey()`
- `mapValues()`
- `flatMapValues()`
- `sortByKey()`

In [16]:
# Number of elements across all partitions (the empty partitions contribute nothing)
my_rdd.count()

6

In [17]:
# Maps each distinct value to how many times it occurs (here every value appears once)
my_rdd.countByValue()

defaultdict(int, {1: 1, 2: 1, 3: 1, 4: 1, 5: 1, 6: 1})

In [18]:
# First n elements of the RDD, without any sorting
my_rdd.take(2)

[1, 2]

In [22]:
# n largest elements, returned in descending order
my_rdd.top(2)

[6, 5]

### reduce

In [None]:
# reduce(f) is an action. f takes exactly two arguments and returns one value of the
# same kind. Spark first reduces each partition locally (empty partitions are skipped)
# and then merges the partial results, so f must be commutative and associative
# (e.g. + or *). Calling reduce on a completely empty RDD raises ValueError.
# Show the API documentation (evaluating the bare attribute only prints a bound-method repr):
help(my_rdd.reduce)

In [29]:
def fun(x, y):
    """Add two values; used below as the binary operator for reduce() and fold().

    Args:
        x: left operand (an element or a partial result).
        y: right operand (an element or a partial result).

    Returns:
        The result of ``x + y``.
    """
    combined = x + y
    return combined

In [35]:
# Reduce with the two-argument fun defined above: sums all elements
my_rdd.reduce(fun)

21

In [36]:
# Show the data again before tracing reduce(fun) step by step
my_rdd.collect()

[1, 2, 3, 4, 5, 6]

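1+2 = 3 <br>
3+3 = 6 <br>
6+4 = 10 <br>
10+5 = 15<br>
15+6 = 21<br>

This is the conceptual order. Spark actually reduces each partition first and then merges the partial results, so the real order of the additions can differ. Because `+` is commutative and associative, the result (21) is the same either way.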

In [37]:
# Product of all elements: multiplication is commutative and associative, so it is safe for reduce
my_rdd.reduce(lambda left, right: left * right)

720

1 x 2 = 2 <br>
2 x 3 = 6 <br>
6 x 4 = 24 <br>
24 x 5 = 120 <br>
120 x 6 = 720 <br>

In [46]:
# n smallest elements in ascending order (the counterpart of top())
my_rdd.takeOrdered(2)

[1, 2]

In [47]:
# Same as above with n = 4
my_rdd.takeOrdered(4)

[1, 2, 3, 4]

### fold

In [48]:
# fold(zeroValue, op) works like reduce(op) but starts from zeroValue.
# Spark applies zeroValue once inside every partition (empty ones included) and once
# more when merging the per-partition results, so it should be the identity element
# of op (0 for +). With this RDD's 9 partitions:
#   fold(1, fun) = 21 + 1 * (9 + 1) = 31
#   fold(2, fun) = 21 + 2 * (9 + 1) = 41

In [49]:
# 0 is the identity for +, so fold gives the same result as reduce(fun)
my_rdd.fold(0,fun)

21

In [51]:
# Re-check the partition layout: 9 partitions (3 empty); fold applies its zero value in each one
my_rdd.glom().collect()

[[], [1], [2], [], [3], [4], [], [5], [6]]

In [52]:
# 1 is not the identity for +: it is added once per partition (9) and once at the final merge -> 21 + 10
my_rdd.fold(1, fun)

31

In [53]:
# 21 + 2 * (9 partitions + 1 final merge)
my_rdd.fold(2, fun)

41

## Transformations on RDDs

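Transformations are lazy. Applying a transformation returns a new RDD but computes nothing; Spark only records the operation in the RDD's lineage, a DAG (Directed Acyclic Graph) of operations. The computation runs only when an action such as `collect()` or `count()` is called.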

Transformations are of two types:
1. **Narrow**: each input partition contributes to at most one output partition. Every partition (node) can therefore be processed independently and in parallel, with no data exchanged between partitions.
2. **Wide**: an output partition may need data from many input partitions. Spark must shuffle data between partitions (nodes) before it can produce the result.

#### Narrow Transformation
`map`, `flatMap`, `filter`, `mapPartitions`, `sample`, `union`

#### Wide Transformation
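`intersection`, `join`, `distinct`, `groupByKey`, `reduceByKey`, `sortByKey`. These require a shuffle. `join` can be narrow when both RDDs are already partitioned the same way.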