In [1]:
import urllib.request as urllib
urllib.urlretrieve ("https://kannu.csc.fi/index.php/s/2FKIuCYWfzdc2hn/download", "HP.txt")
urllib.urlretrieve ("https://kannu.csc.fi/index.php/s/oU72rsNmnDqJn0d/download", "GoT.txt")

('GoT.txt', <http.client.HTTPMessage at 0x7f5019b75780>)

In [2]:
from pyspark import SparkContext
sc = SparkContext()

## CREATING RDDs USING PYTHON COLLECTIONS

In [3]:
sc.parallelize(['this', 'is', 'an', 'example'])

ParallelCollectionRDD[0] at parallelize at PythonRDD.scala:423

In [8]:
range_rdd = sc.parallelize(range(1,100))
#range_rdd.collect() #muuttaa rdd:n takaisin python listaksi, pakko tehdä jos haluaa nähdä sisällön
range_rdd.count()

99

## CREATING RDDs USING TEXT FILES

In [9]:
rdd_hp = sc.textFile('HP.txt')

In [10]:
rdd_hp # Check the type of the RDD

MapPartitionsRDD[9] at textFile at NativeMethodAccessorImpl.java:-2

In [11]:
# Check the contents of the rdd created
rdd_hp.collect()

['The place where things are hidden',
 'If you have to ask you will never know',
 'If you know you need only ask',
 'The hidden room of requirement']

In [12]:
# Check the total elements in the RDD
rdd_hp.count()

4

## SPARK TRANSFORMATIONS & ACTIONS

#### map(func) : Return a new distributed dataset formed by passing each element of the source through a function func. 

In [13]:
ten_range_rdd = sc.parallelize(range(1,10)) # [1,2,.....9]
ten_range_rdd_mapped = ten_range_rdd.map(lambda x: (x, x)) # RDD created from an RDD
ten_range_rdd_mapped.collect()  # collect is an action!

[(1, 1), (2, 2), (3, 3), (4, 4), (5, 5), (6, 6), (7, 7), (8, 8), (9, 9)]

In [None]:
#lambda lyhentää kirjoittamista
#def nimi(x):
#    return (x, x)

In [14]:
ten_range_rdd_mapped_again = ten_range_rdd_mapped.map(lambda x: (x[0]*2, x[1])).collect() # Python collection
print(ten_range_rdd_mapped_again)

[(2, 1), (4, 2), (6, 3), (8, 4), (10, 5), (12, 6), (14, 7), (16, 8), (18, 9)]


#### flatMap(func) : Similar to map, but flattens the final result. 

In [16]:
ten_range_rdd_flatmapped = ten_range_rdd_mapped.flatMap(lambda x: (x[0], x[1] + 1))
ten_range_rdd_flatmapped.collect()

[1, 2, 2, 3, 3, 4, 4, 5, 5, 6, 6, 7, 7, 8, 8, 9, 9, 10]

#### filter(func) : return a new dataset formed by selecting those elements of the source on which func returns true. 

In [17]:
rdd_hp.collect() # Get back to the RDD we created

['The place where things are hidden',
 'If you have to ask you will never know',
 'If you know you need only ask',
 'The hidden room of requirement']

In [18]:
rdd_hp_filtered = rdd_hp.filter(lambda line: line.startswith('If'))
rdd_hp_filtered.collect()

['If you have to ask you will never know', 'If you know you need only ask']

##### USER EXERCISE

In [23]:
#Convert all the lines to Uppercase
rdd_hp_upper = rdd_hp.map(lambda line: line.upper())
rdd_hp_upper.collect()

['THE PLACE WHERE THINGS ARE HIDDEN',
 'IF YOU HAVE TO ASK YOU WILL NEVER KNOW',
 'IF YOU KNOW YOU NEED ONLY ASK',
 'THE HIDDEN ROOM OF REQUIREMENT']

In [26]:
# Remove the lines which contain the word "HIDDEN” in it
rdd_no_hidden = rdd_hp.filter(lambda line: "hidden" not in line)
rdd_no_hidden.collect()

['If you have to ask you will never know', 'If you know you need only ask']

In [70]:
# Arrange all the words in a single list using a one-liner and count the total number of words
rdd_one_liner = rdd_hp.flatMap(lambda x: x.split())
rdd_one_liner.collect()

['The',
 'place',
 'where',
 'things',
 'are',
 'hidden',
 'If',
 'you',
 'have',
 'to',
 'ask',
 'you',
 'will',
 'never',
 'know',
 'If',
 'you',
 'know',
 'you',
 'need',
 'only',
 'ask',
 'The',
 'hidden',
 'room',
 'of',
 'requirement']

In [72]:
# Arrange the words with the length of each word in a tuple for eg. ("word", 4)
rdd_results = rdd_one_liner.map(lambda x: (x, len(x)))
rdd_results.collect()

[('The', 3),
 ('place', 5),
 ('where', 5),
 ('things', 6),
 ('are', 3),
 ('hidden', 6),
 ('If', 2),
 ('you', 3),
 ('have', 4),
 ('to', 2),
 ('ask', 3),
 ('you', 3),
 ('will', 4),
 ('never', 5),
 ('know', 4),
 ('If', 2),
 ('you', 3),
 ('know', 4),
 ('you', 3),
 ('need', 4),
 ('only', 4),
 ('ask', 3),
 ('The', 3),
 ('hidden', 6),
 ('room', 4),
 ('of', 2),
 ('requirement', 11)]

#### groupByKey() : When called on a dataset of (K, V) pairs, returns a dataset of (K, Iterable &lt;V&gt;) pairs. 

In [73]:
rdd_places = sc.parallelize([("Finland", "Helsinki"), ("Norway", "Oslo"), ("Sweden", "Stockholm"),
                             ("Denmark", "Copenhagen"), ("Norway", "Bergen"), ("Finland", "Tampere"),
                             ("Denmark", "Aarhus"), ("Finland", "Turku")])
rdd_places.collect()

[('Finland', 'Helsinki'),
 ('Norway', 'Oslo'),
 ('Sweden', 'Stockholm'),
 ('Denmark', 'Copenhagen'),
 ('Norway', 'Bergen'),
 ('Finland', 'Tampere'),
 ('Denmark', 'Aarhus'),
 ('Finland', 'Turku')]

In [74]:
rdd_places_grouped = rdd_places.groupByKey()
rdd_places_grouped.collect()

[('Denmark', <pyspark.resultiterable.ResultIterable at 0x7f5007068208>),
 ('Finland', <pyspark.resultiterable.ResultIterable at 0x7f5007068710>),
 ('Sweden', <pyspark.resultiterable.ResultIterable at 0x7f5007068438>),
 ('Norway', <pyspark.resultiterable.ResultIterable at 0x7f50070687f0>)]

In [75]:
# User readable format
rdd_places_grouped.mapValues(list).collect()

[('Denmark', ['Copenhagen', 'Aarhus']),
 ('Finland', ['Helsinki', 'Tampere', 'Turku']),
 ('Sweden', ['Stockholm']),
 ('Norway', ['Oslo', 'Bergen'])]

#### join(otherDataset, [numTasks]) : When called on datasets of type (K, V) and (K, W), returns a dataset of (K, (V, W)) pairs with all pairs of elements for each key. Outer joins are also supported

In [78]:
rdd_places_other = sc.parallelize([("Finland", "Espoo"), ("Norway", "Stavanger"), ("Sweden", "Gothenburg"), ("Finland", "Vantaa")])
rdd_places_other.collect()

[('Finland', 'Espoo'),
 ('Norway', 'Stavanger'),
 ('Sweden', 'Gothenburg'),
 ('Finland', 'Vantaa')]

In [79]:
rdd_places.join(rdd_places_other).collect()

[('Norway', ('Oslo', 'Stavanger')),
 ('Norway', ('Bergen', 'Stavanger')),
 ('Finland', ('Helsinki', 'Espoo')),
 ('Finland', ('Helsinki', 'Vantaa')),
 ('Finland', ('Tampere', 'Espoo')),
 ('Finland', ('Tampere', 'Vantaa')),
 ('Finland', ('Turku', 'Espoo')),
 ('Finland', ('Turku', 'Vantaa')),
 ('Sweden', ('Stockholm', 'Gothenburg'))]

In [80]:
rdd_places_grouped.mapValues(list).join(rdd_places_other).collect()

[('Norway', (['Oslo', 'Bergen'], 'Stavanger')),
 ('Finland', (['Helsinki', 'Tampere', 'Turku'], 'Espoo')),
 ('Finland', (['Helsinki', 'Tampere', 'Turku'], 'Vantaa')),
 ('Sweden', (['Stockholm'], 'Gothenburg'))]

#### cogroup(otherDataset, [numTasks]) : When called on datasets of type (K, V) and (K, W), returns a dataset of (K, (Iterable&lt;V&gt; , Iterable&lt;W&gt;)) tuples.

In [83]:
rdd_places_other_cogrouped = rdd_places.cogroup(rdd_places_other)
rdd_places_other_cogrouped.collect()

[('Denmark',
  (<pyspark.resultiterable.ResultIterable at 0x7f5007170ef0>,
   <pyspark.resultiterable.ResultIterable at 0x7f500705ec88>)),
 ('Norway',
  (<pyspark.resultiterable.ResultIterable at 0x7f5007170898>,
   <pyspark.resultiterable.ResultIterable at 0x7f500705e0f0>)),
 ('Finland',
  (<pyspark.resultiterable.ResultIterable at 0x7f500705ec18>,
   <pyspark.resultiterable.ResultIterable at 0x7f500705e4e0>)),
 ('Sweden',
  (<pyspark.resultiterable.ResultIterable at 0x7f500705e128>,
   <pyspark.resultiterable.ResultIterable at 0x7f500705ef28>))]

In [82]:
rdd_places_other_cogrouped.map(lambda x: (x[0], (list(x[1][0]), list(x[1][1])))).collect()

[('Denmark', (['Copenhagen', 'Aarhus'], [])),
 ('Norway', (['Oslo', 'Bergen'], ['Stavanger'])),
 ('Finland', (['Helsinki', 'Tampere', 'Turku'], ['Espoo', 'Vantaa'])),
 ('Sweden', (['Stockholm'], ['Gothenburg']))]

#### sortByKey([ascending], [numTasks]) : When called on a dataset of (K, V) pairs, returns a dataset of (K, V) pairs sorted by keys in ascending or descending order, as specified in the boolean ascending argument.

In [84]:
# Sort by key
rdd_places.sortByKey().collect()

[('Denmark', 'Copenhagen'),
 ('Denmark', 'Aarhus'),
 ('Finland', 'Helsinki'),
 ('Finland', 'Tampere'),
 ('Finland', 'Turku'),
 ('Norway', 'Oslo'),
 ('Norway', 'Bergen'),
 ('Sweden', 'Stockholm')]

#### reduceByKey(func, [numTasks]) 	When called on a dataset of (K, V) pairs, returns a dataset of (K, V) pairs where the values for each key are aggregated using the given reduce function func, which must be of type (V,V) => V.

In [89]:
rdd_reduce_example = sc.parallelize([("Messi", 2), ("Suarez", 2), ("Ronaldo", 0), ("Ronaldo", 3), ("Messi", 2)])
rdd_reduce_example_reduced = rdd_reduce_example.reduceByKey(lambda a,b: a+b)
rdd_reduce_example_reduced.collect()

[('Ronaldo', 3), ('Messi', 4), ('Suarez', 2)]

In [88]:
# Sort by value
rdd_reduce_example.sortBy(lambda x: x[1]).collect()

[('Ronaldo', 0), ('Messi', 2), ('Suarez', 2), ('Messi', 2), ('Ronaldo', 3)]

## SOME RDD ACTIONS

In [90]:
rdd_places.collect()

[('Finland', 'Helsinki'),
 ('Norway', 'Oslo'),
 ('Sweden', 'Stockholm'),
 ('Denmark', 'Copenhagen'),
 ('Norway', 'Bergen'),
 ('Finland', 'Tampere'),
 ('Denmark', 'Aarhus'),
 ('Finland', 'Turku')]

In [91]:
rdd_places.count()

8

In [92]:
rdd_places.first()

('Finland', 'Helsinki')

In [93]:
rdd_places.take(2)

[('Finland', 'Helsinki'), ('Norway', 'Oslo')]

In [94]:
rdd_places.takeSample(True, 2)

[('Finland', 'Tampere'), ('Denmark', 'Copenhagen')]

In [95]:
rdd_places.takeOrdered(2)

[('Denmark', 'Aarhus'), ('Denmark', 'Copenhagen')]

In [96]:
rdd_places.countByKey()

defaultdict(int, {'Denmark': 2, 'Finland': 3, 'Norway': 2, 'Sweden': 1})