# Anagrams

In this exercise we revisit the Anagrams example from TP1.
This time we solve it with a Spark program running locally.

Let's start by importing the `SparkContext` class:

In [2]:
from pyspark import SparkContext

Next, we create an instance of SparkContext:

In [3]:
sc = SparkContext()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


Now we want to get our data from HDFS.
Let's check whether the file is already there:

> Note that starting a line with `!` runs it as a shell command instead of Python code.

In [4]:
!hadoop fs -ls common_words_en_subset.txt

-rw-r--r--   1 vagrant supergroup       9301 2022-12-27 04:53 common_words_en_subset.txt


If the file doesn't show up, you can put it on HDFS:

In [None]:
!hadoop fs -put /vagrant/tp/1/common_words_en_subset.txt

Next, we load the data into a Spark RDD. Note that this command is lazily evaluated: nothing is read until an action such as `collect()` is called.

In [5]:
words = sc.textFile('common_words_en_subset.txt')
words

common_words_en_subset.txt MapPartitionsRDD[1] at textFile at NativeMethodAccessorImpl.java:0
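
As a rough analogy for lazy evaluation, here is a plain-Python sketch (not Spark code). The names `upper`, `calls` and `words_local` are made up for this illustration. Building the generator does no work, much like declaring an RDD; consuming it plays the role of an action.

```python
# Plain-Python analogy for lazy evaluation (this is not Spark code).
calls = []  # records every word the mapping function actually processes

def upper(word):
    calls.append(word)
    return word.upper()

words_local = ['abode', 'adobe', 'lake']

# Like an RDD transformation, building the generator does no work yet.
lazy = (upper(w) for w in words_local)
calls_before = len(calls)
print(calls_before)  # 0: nothing has been computed so far

# Consuming the generator plays the role of an action such as collect().
result = list(lazy)
print(len(calls))    # 3: every element was processed on demand
print(result)        # ['ABODE', 'ADOBE', 'LAKE']
```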

Let's display the content of the `words` RDD:

In [6]:
words.collect()

                                                                                

['acrimonious',
 'aerodyne',
 'acariasis',
 'actually',
 'abomasum',
 'achene',
 'abscess',
 'adorn',
 'acentric',
 'absentee',
 'afterheat',
 'abbreviated',
 'aft',
 'adrift',
 'adsorbate',
 'acquirement',
 'acheron',
 'accelerator',
 'abohm',
 'aargau',
 'afoot',
 'aegean',
 'acrimony',
 'actuality',
 'aeolipile',
 'acrobatics',
 'acceptation',
 'affinal',
 'aeneid',
 'acquit',
 'affectionate',
 'afrikaans',
 'abundant',
 'abducent',
 'acrodont',
 'aforethought',
 'adulterate',
 'acolyte',
 'adown',
 'abbatial',
 'addams',
 'teatime',
 'adverse',
 'abele',
 'aerostation',
 'afro',
 'accompaniment',
 'afeard',
 'afterthought',
 'afterworld',
 'ablate',
 'achaea',
 'abscond',
 'accountable',
 'acerbity',
 'acrocarpous',
 'aerostatics',
 'aegisthus',
 'adjust',
 'acetyl',
 'acquainted',
 'account',
 'africanize',
 'abstracted',
 'aerobic',
 'absorber',
 'aeromedical',
 'admetus',
 'acromion',
 'acuminate',
 'aa',
 'afrikaner',
 'addictive',
 'accentuation',
 'a',
 'abecedarian',
 'accre

Observe the results; make sure the data has been successfully loaded.
Then, run the map transformation:

In [7]:
tuples = words.map(lambda x: (''.join(sorted(x)), x))
tuples.collect()

                                                                                

[('aciimnoorsu', 'acrimonious'),
 ('adeenory', 'aerodyne'),
 ('aaaciirss', 'acariasis'),
 ('aaclltuy', 'actually'),
 ('aabmmosu', 'abomasum'),
 ('aceehn', 'achene'),
 ('abcesss', 'abscess'),
 ('adnor', 'adorn'),
 ('acceinrt', 'acentric'),
 ('abeeenst', 'absentee'),
 ('aaeefhrtt', 'afterheat'),
 ('aabbdeeirtv', 'abbreviated'),
 ('aft', 'aft'),
 ('adfirt', 'adrift'),
 ('aabdeorst', 'adsorbate'),
 ('aceeimnqrtu', 'acquirement'),
 ('acehnor', 'acheron'),
 ('aacceelorrt', 'accelerator'),
 ('abhmo', 'abohm'),
 ('aaagru', 'aargau'),
 ('afoot', 'afoot'),
 ('aaeegn', 'aegean'),
 ('acimnory', 'acrimony'),
 ('aacilttuy', 'actuality'),
 ('aeeiillop', 'aeolipile'),
 ('aabcciorst', 'acrobatics'),
 ('aacceinoptt', 'acceptation'),
 ('aaffiln', 'affinal'),
 ('adeein', 'aeneid'),
 ('aciqtu', 'acquit'),
 ('aaceeffinott', 'affectionate'),
 ('aaafiknrs', 'afrikaans'),
 ('aabdnntu', 'abundant'),
 ('abcdentu', 'abducent'),
 ('acdnoort', 'acrodont'),
 ('aefghhoorttu', 'aforethought'),
 ('aadeelrttu', 'adulter
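
To see why the sorted letters make a good key, here is a small plain-Python sketch of the same mapping, runnable without Spark. The helper name `signature` is an assumption made for this illustration. Two words are anagrams exactly when they yield the same key.

```python
def signature(word):
    """Return the letters of `word` in sorted order; anagrams share this key."""
    return ''.join(sorted(word))

# Same (key, word) shape as the tuples produced by the map transformation.
pairs = [(signature(w), w) for w in ['abohm', 'abmho', 'adorn']]
print(pairs)
# [('abhmo', 'abohm'), ('abhmo', 'abmho'), ('adnor', 'adorn')]
```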

Then, run `groupByKey` (roughly equivalent to Hadoop's "shuffle" phase):

In [8]:
grouped = tuples.groupByKey().mapValues(list)
grouped.collect()

                                                                                

[('adeenory', ['aerodyne']),
 ('aceehn', ['achene']),
 ('abcesss', ['abscess']),
 ('adnor', ['adorn']),
 ('adfirt', ['adrift']),
 ('aabdeorst', ['adsorbate']),
 ('aceeimnqrtu', ['acquirement']),
 ('acehnor', ['acheron']),
 ('abhmo', ['abohm', 'abmho']),
 ('aaagru', ['aargau']),
 ('aacilttuy', ['actuality']),
 ('aabcciorst', ['acrobatics']),
 ('aacceinoptt', ['acceptation']),
 ('aaffiln', ['affinal']),
 ('adeein', ['aeneid']),
 ('aciqtu', ['acquit']),
 ('aaafiknrs', ['afrikaans']),
 ('aabdnntu', ['abundant']),
 ('abcdentu', ['abducent']),
 ('acdnoort', ['acrodont']),
 ('aceloty', ['acolyte']),
 ('aaabbilt', ['abbatial']),
 ('aaddms', ['addams']),
 ('aeeimtt', ['teatime']),
 ('adeersv', ['adverse']),
 ('aadefr', ['afeard']),
 ('aefghhortttu', ['afterthought']),
 ('aaaceh', ['achaea']),
 ('aabccelnotu', ['accountable']),
 ('abceirty', ['acerbity']),
 ('aaccooprrsu', ['acrocarpous']),
 ('adjstu', ['adjust']),
 ('accnotu', ['account']),
 ('aacefiinrz', ['africanize']),
 ('aacdeeilmor', ['ae
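
What the grouping step computes can be sketched on a single machine with a dictionary. The function `group_by_key` below is a hypothetical stand-in, not a Spark API. It ignores the distributed part, where Spark moves all records with the same key to the same partition.

```python
from collections import defaultdict

def group_by_key(pairs):
    """Collect all values that share a key, like groupByKey on one machine."""
    groups = defaultdict(list)
    for key, value in pairs:
        groups[key].append(value)
    return dict(groups)

pairs = [('abhmo', 'abohm'), ('adnor', 'adorn'), ('abhmo', 'abmho')]
groups = group_by_key(pairs)
print(groups)
# {'abhmo': ['abohm', 'abmho'], 'adnor': ['adorn']}
```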

Observe the results. Then, run filter:

In [9]:
filtered = grouped.filter(lambda x: len(x[1]) > 1)
filtered.collect()

[('abhmo', ['abohm', 'abmho']),
 ('ace', ['aec', 'ace']),
 ('abel', ['abel', 'able']),
 ('deer', ['deer', 'reed']),
 ('aabdor', ['aboard', 'abroad']),
 ('elmno', ['lemon', 'melon']),
 ('acceortu', ['accoutre', 'accouter']),
 ('abilnotu', ['ablution', 'abutilon']),
 ('aekl', ['leak', 'lake']),
 ('abdeo', ['abode', 'adobe'])]

Then, let's run mapValues:

In [10]:
res1 = filtered.mapValues(lambda x: ", ".join(x))
res1.collect()

[('abhmo', 'abohm, abmho'),
 ('ace', 'aec, ace'),
 ('abel', 'abel, able'),
 ('deer', 'deer, reed'),
 ('aabdor', 'aboard, abroad'),
 ('elmno', 'lemon, melon'),
 ('acceortu', 'accoutre, accouter'),
 ('abilnotu', 'ablution, abutilon'),
 ('aekl', 'leak, lake'),
 ('abdeo', 'abode, adobe')]
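
The `filter` and `mapValues` steps have direct plain-Python counterparts. The sketch below uses a small hand-written dictionary, `grouped_local`, as a stand-in for the `grouped` RDD; it is an illustration, not a replacement for the Spark code.

```python
# Stand-in for a few entries of the grouped RDD.
grouped_local = {
    'abhmo': ['abohm', 'abmho'],
    'adnor': ['adorn'],
    'aekl': ['leak', 'lake'],
}

# filter: keep only keys with more than one word, i.e. real anagram groups.
filtered_local = {k: v for k, v in grouped_local.items() if len(v) > 1}

# mapValues: transform each value and leave the key untouched.
res_local = {k: ', '.join(v) for k, v in filtered_local.items()}
print(res_local)
# {'abhmo': 'abohm, abmho', 'aekl': 'leak, lake'}
```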

Finally, apply `mapValues` to the unfiltered RDD as well, and save both datasets
to HDFS:

In [11]:
res2 = grouped.mapValues(lambda x: ", ".join(x))
res2.collect()

[('adeenory', 'aerodyne'),
 ('aceehn', 'achene'),
 ('abcesss', 'abscess'),
 ('adnor', 'adorn'),
 ('adfirt', 'adrift'),
 ('aabdeorst', 'adsorbate'),
 ('aceeimnqrtu', 'acquirement'),
 ('acehnor', 'acheron'),
 ('abhmo', 'abohm, abmho'),
 ('aaagru', 'aargau'),
 ('aacilttuy', 'actuality'),
 ('aabcciorst', 'acrobatics'),
 ('aacceinoptt', 'acceptation'),
 ('aaffiln', 'affinal'),
 ('adeein', 'aeneid'),
 ('aciqtu', 'acquit'),
 ('aaafiknrs', 'afrikaans'),
 ('aabdnntu', 'abundant'),
 ('abcdentu', 'abducent'),
 ('acdnoort', 'acrodont'),
 ('aceloty', 'acolyte'),
 ('aaabbilt', 'abbatial'),
 ('aaddms', 'addams'),
 ('aeeimtt', 'teatime'),
 ('adeersv', 'adverse'),
 ('aadefr', 'afeard'),
 ('aefghhortttu', 'afterthought'),
 ('aaaceh', 'achaea'),
 ('aabccelnotu', 'accountable'),
 ('abceirty', 'acerbity'),
 ('aaccooprrsu', 'acrocarpous'),
 ('adjstu', 'adjust'),
 ('accnotu', 'account'),
 ('aacefiinrz', 'africanize'),
 ('aacdeeilmor', 'aeromedical'),
 ('ademstu', 'admetus'),
 ('aaceimntu', 'acuminate'),
 ('a

In [12]:
res1.saveAsTextFile('res-words-filtered')
res2.saveAsTextFile('res-words-unfiltered')

                                                                                

Let's check that the results were written to HDFS:

In [13]:
!hadoop fs -ls res-words-filtered

Found 3 items
-rw-r--r--   1 vagrant supergroup          0 2022-12-27 10:40 res-words-filtered/_SUCCESS
-rw-r--r--   1 vagrant supergroup         69 2022-12-27 10:40 res-words-filtered/part-00000
-rw-r--r--   1 vagrant supergroup        197 2022-12-27 10:40 res-words-filtered/part-00001


In [14]:
!hadoop fs -ls res-words-unfiltered

Found 3 items
-rw-r--r--   1 vagrant supergroup          0 2022-12-27 10:41 res-words-unfiltered/_SUCCESS
-rw-r--r--   1 vagrant supergroup      12660 2022-12-27 10:41 res-words-unfiltered/part-00000
-rw-r--r--   1 vagrant supergroup      12883 2022-12-27 10:40 res-words-unfiltered/part-00001


In [15]:
!hadoop fs -cat res-words-filtered/*

('abhmo', 'abohm, abmho')
('ace', 'aec, ace')
('abel', 'abel, able')
('deer', 'deer, reed')
('aabdor', 'aboard, abroad')
('elmno', 'lemon, melon')
('acceortu', 'accoutre, accouter')
('abilnotu', 'ablution, abutilon')
('aekl', 'leak, lake')
('abdeo', 'abode, adobe')
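
Each saved line is the string form of a Python tuple, as the output above shows. If you later want to read the results back into Python, one possible approach is `ast.literal_eval`. The sketch below hard-codes two lines from the output instead of reading from HDFS, and the variable names are made up for the illustration.

```python
import ast

# Two lines copied from the `hadoop fs -cat` output above.
lines = [
    "('abhmo', 'abohm, abmho')",
    "('aekl', 'leak, lake')",
]

# Each line is the repr of a Python tuple, so literal_eval can parse it safely.
records = [ast.literal_eval(line) for line in lines]

# Split the joined string back into a list of words.
anagrams = {key: value.split(', ') for key, value in records}
print(anagrams)
# {'abhmo': ['abohm', 'abmho'], 'aekl': ['leak', 'lake']}
```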


In [16]:
!hadoop fs -cat res-words-unfiltered/*

('adeenory', 'aerodyne')
('aceehn', 'achene')
('abcesss', 'abscess')
('adnor', 'adorn')
('adfirt', 'adrift')
('aabdeorst', 'adsorbate')
('aceeimnqrtu', 'acquirement')
('acehnor', 'acheron')
('abhmo', 'abohm, abmho')
('aaagru', 'aargau')
('aacilttuy', 'actuality')
('aabcciorst', 'acrobatics')
('aacceinoptt', 'acceptation')
('aaffiln', 'affinal')
('adeein', 'aeneid')
('aciqtu', 'acquit')
('aaafiknrs', 'afrikaans')
('aabdnntu', 'abundant')
('abcdentu', 'abducent')
('acdnoort', 'acrodont')
('aceloty', 'acolyte')
('aaabbilt', 'abbatial')
('aaddms', 'addams')
('aeeimtt', 'teatime')
('adeersv', 'adverse')
('aadefr', 'afeard')
('aefghhortttu', 'afterthought')
('aaaceh', 'achaea')
('aabccelnotu', 'accountable')
('abceirty', 'acerbity')
('aaccooprrsu', 'acrocarpous')
('adjstu', 'adjust')
('accnotu', 'account')
('aacefiinrz', 'africanize')
('aacdeeilmor', 'aeromedical')
('ademstu', 'admetus')
('aaceimntu', 'acuminate')
('aa', 'aa')
('aaabcdeeinr', 'abecedaria

Question: Why is the `grouped` RDD computed twice? How can you force Spark to evaluate it only once?

You can also try out the other operations described during the course to familiarize yourself with them.