In [72]:
from pyspark import SparkContext
sc = SparkContext()

# Ways to create RDDs

### Parallelized Collections

In [3]:
data=("Hello, Welcome to Edureka".split(" "))

In [4]:
modified_data=sc.parallelize(data,2)

In [5]:
modified_data.collect()

['Hello,', 'Welcome', 'to', 'Edureka']

### From RDDs

In [25]:
r1=sc.parallelize("Happy Learning".split(" "))

In [26]:
r2=r1.map(lambda line: line.upper())

In [27]:
r2.collect()

['HAPPY', 'LEARNING']

### External Data

In [95]:
text_file=sc.textFile("/user/edureka_1165500/demo.txt")

In [96]:
r4=text_file.map(lambda line: line.upper())

In [97]:
r4.collect

<bound method PipelinedRDD.collect of PythonRDD[43] at RDD at PythonRDD.scala:48>

In [98]:
print(r4)

PythonRDD[43] at RDD at PythonRDD.scala:48


# Transformations: Create new dataset from an existing one

### MAP: returns new distributed dataset by passing each element of the datasource through the function

In [22]:
data=sc.parallelize("Hello Harry, How was your day?".split(" "))

In [23]:
modified_data=data.map(lambda word:(word, word[0], word.startswith("H")))

In [24]:
modified_data.collect()

[('Hello', 'H', True),
 ('Harry,', 'H', True),
 ('How', 'H', True),
 ('was', 'w', False),
 ('your', 'y', False),
 ('day?', 'd', False)]

### FILTER: returns a new dataset by selecting those elements of the source for which function returns true

In [25]:
modified_data.filter(lambda word:word[2]).take(2)

[('Hello', 'H', True), ('Harry,', 'H', True)]

### Distinct: returns a new dataset by removing duplicates

In [3]:
data=sc.parallelize("Welcome to edureka")

In [4]:
data.distinct()

PythonRDD[5] at RDD at PythonRDD.scala:48

In [28]:
data.distinct().count()

13

# Other Transformations

### Intersection

In [33]:
rdd1 = sc.parallelize(((1,"jan",2016),(3,"nov",2014), (16,"feb",2014)))
rdd2 = sc.parallelize(((5,"dec",2014),(1,"jan",2016)))

In [34]:
comman = rdd1.intersection(rdd2)

In [35]:
comman.collect()

[(1, 'jan', 2016)]

### flatmap: allows emitting more than one item in map function

In [5]:
RDD = sc.parallelize(["hello world","how are you"])
RDD_flatmap = RDD.flatMap(lambda x: x.split(" "))

In [6]:
RDD_flatmap.collect()

['hello', 'world', 'how', 'are', 'you']

In [8]:
RDD = sc.parallelize(["hello world","how are you"])
RDD_map = RDD.map(lambda x: x.split(" "))

In [9]:
RDD_map.collect()

[['hello', 'world'], ['how', 'are', 'you']]

### Union: Union function returns a new dataset that contains the combination of elements present in the different datasets.

In [60]:
rdd1 = sc.parallelize([1, 2, 3])
rdd2 = sc.parallelize([4, 1, 6])
rdd3 = sc.parallelize([7, 2, 9])

rdd = sc.union([rdd1, rdd2, rdd3])
rdd.collect()


[1, 2, 3, 4, 1, 6, 7, 2, 9]

# pair RDDs in PySpark

Real life datasets are usually key/value pairs
Each row is a key and maps to one or more values
Pair RDD is a special data structure to work with this kind of datasets
Pair RDD: Key is the identier and value is data

### Creating pair RDDs

Two common ways to create pair RDDs:

From a list of key-value tuple
From a regular RDD

In [78]:
my_tuple = [('Sam', 23), ('Mary', 34), ('Peter', 25)]
pairRDD_tuple = sc.parallelize(my_tuple)

In [79]:
my_list = ['Sam 23','Mary 34','Peter 25']
regularRDD = sc.parallelize(my_list)
pairRDD_RDD = regularRDD.map(lambda s: (s.split(' ')[0], s.split(' ')[1]))

### groupBy: groups all the values with the same key in the pair RDD

In [43]:
airports = [("US","JFK"),("UK","LHR"),("FR","CDG"),("US","SFO")]
regularRDD = sc.parallelize(airports)
pairRDD_group = regularRDD.groupByKey().collect()
for cont, air in pairRDD_group:
    print(cont, list(air))

('FR', ['CDG'])
('UK', ['LHR'])
('US', ['SFO', 'JFK'])


### reduceBy: combines values with the same key

It runs parallel operations for each key in the dataset

In [44]:
regularRDD = sc.parallelize([("Messi", 23), ("Ronaldo", 34),("Neymar", 22), ("Messi", 24)])
pairRDD_reducebykey = regularRDD.reduceByKey(lambda x,y : x + y)
pairRDD_reducebykey.collect()


[('Messi', 47), ('Neymar', 22), ('Ronaldo', 34)]

### SORTBY: returns a new dataset by sorting the RDDs, orders pair RDD by key

It returns an RDD sorted by key in ascending or descending order


In [None]:
regularRDD = sc.parallelize([("Messi", 23), ("Ronaldo", 34),("Neymar", 22), ("Messi", 24)])

pairRDD_reducebykey = regularRDD.reduceByKey(lambda x,y : x + y)

pairRDD_reducebykey_rev = pairRDD_reducebykey.map(lambda x: (x[1], x[0]))

pairRDD_reducebykey_rev.sortByKey(ascending=False).collect()

### key-value

In [43]:
words=sc.parallelize("Hello World, Welcome to edureka".split(" "))

In [44]:
keyword = words.keyBy(lambda word: word.lower()[0])

In [45]:
keyword.collect()

[('h', 'Hello'),
 ('w', 'World,'),
 ('w', 'Welcome'),
 ('t', 'to'),
 ('e', 'edureka')]

In [48]:
keyword.lookup("w")

['World,', 'Welcome']

In [49]:
keyword.mapValues(lambda word: word.upper()).collect() #operates only on values

[('h', 'HELLO'),
 ('w', 'WORLD,'),
 ('w', 'WELCOME'),
 ('t', 'TO'),
 ('e', 'EDUREKA')]

# Mathemarical Operations

In [57]:
a = sc . parallelize ([11, 12, 9, 7, 5, 10])
a.mean()

9.0

In [58]:
sc.parallelize([1, 2, 3, 5, 6, 7, 8, 9, 10]).variance()

8.88888888888889

In [59]:
sc.parallelize([39, 21, 34, 11, 5, 70]).sum()

180

In [60]:
sc.parallelize([10, 20, 30, 40, 50, 60, 70, 80, 90, 100]).stats()

(count: 10, mean: 55.0, stdev: 28.7228132327, max: 100.0, min: 10.0)

In [73]:
#general rdd functions

In [46]:
words=sc.parallelize("Python is one of the programming languages supported by Spark")

In [47]:
words.coalesce(1).getNumPartitions()

2

In [82]:
words=sc.parallelize("Python is one of the programming languages supported by Spark".split(" "))

In [83]:
numRange = sc.parallelize(range(10), 2)

In [84]:
ans=words.zip(numRange)

In [85]:
ans.collect()

[('Python', 0),
 ('is', 1),
 ('one', 2),
 ('of', 3),
 ('the', 4),
 ('programming', 5),
 ('languages', 6),
 ('supported', 7),
 ('by', 8),
 ('Spark', 9)]

In [86]:
words=sc.parallelize("Python is one of the programming languages supported by Spark".split(" "))

In [87]:
words.min()

'Python'

In [88]:
words.max()

'the'

In [95]:
#Joins

In [102]:
x = sc.parallelize([("spark", 1), ("hadoop", 4)])
y = sc.parallelize([("spark", 2), ("hadoop", 5)])

In [103]:
joined = x.join(y)

In [107]:
joined.collect()

[('hadoop', (4, 5)), ('spark', (1, 2))]

# RDD Actions

Operation return a value after running a computation on the RDD

Basic RDD Actions
collect()
take(N)
rst()
count()

### collect() return all the elements of the dataset as an array

### take(N) returns an array with the first N elements of the dataset

In [73]:
rdd_map = sc.parallelize([1, 2, 3])

In [74]:
rdd_map.collect()

[1, 2, 3]

In [75]:
rdd_map.take(2)

[1, 2]

### first() prints the rst element of the RDD

### count() return the number of elements in the RDD


In [76]:
rdd_map.first()

1

In [77]:
rdd_map.count()

3

### More Actions

### reduce(func) action is used for aggregating the elements of a regular RDD

reduce(func) action is used for aggregating the elements of a regular RDD

The function should be commutative (changing the order of the operands does not change
the result) and associative

An example of reduce() action in PySpark

In [None]:
x = [1,3,4,6]
RDD = sc.parallelize(x)
RDD.reduce(lambda x, y : x + y)

### saveAsTextFile() action

saves RDD into a text file inside a directory with each partition as a separate file

In [92]:
x = [1,3,4,6]
RDD8 = sc.parallelize(x)
RDD8.saveAsTextFile("/user/edureka_1165500/output.txt")

### coalesce() method can be used to save RDD as a single text file


In [None]:
x = [1,3,4,6]
RDD9 = sc.parallelize(x)
RDD9.coalesce(1).saveAsTextFile("/user/edureka_1165500/output.txt")

# Action Operations on pair RDDs

### countByKey() only available for type (K, V)

countByKey() action counts the number of elements for each key

Example of countByKey() on a simple list

In [93]:
rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
for kee, val in rdd.countByKey().items():
    print(kee, val)

('a', 2)
('b', 1)


### collectAsMap() return the key-value pairs in the RDD as a dictionary

Example of collectAsMap() on a simple tuple

In [94]:
sc.parallelize([(1, 2), (3, 4)]).collectAsMap()

{1: 2, 3: 4}