# Spark exercises
Exercises adapted from the Spark examples page:
https://spark.apache.org/examples.html

* [textsearch exercise](#textsearch)
* [wordcount exercise](#wordcount)
* [flatMap vs map example](#flatmap)
* [total characters in a file](#totalcount)
* [estimate pi](#pi)

Note that the DataFrame examples need `Row` imported from `pyspark.sql` and `col` imported from `pyspark.sql.functions`.

In [1]:
from pyspark.sql import SparkSession

# Start a local Spark session. Its SparkContext (sc) is the entry point for the RDD API
spark = SparkSession.builder.master('local').getOrCreate()
sc = spark.sparkContext

In [2]:
%%writefile example.txt
03/22 08:51:01 INFO   :.main: *************** RSVP Agent started ***************
03/22 08:51:01 INFO   :...locate_configFile: Specified configuration file: /u/user10/rsvpd1.conf
03/22 08:51:01 INFO   :.main: Using log level 511
03/22 08:51:01 INFO   :..settcpimage: Get TCP images rc - EDC8112I Operation not supported on socket.
03/22 08:51:01 INFO   :ERROR..MySQLsettcpimage: Associate with TCP/IP image name = TCPCS
03/22 08:51:02 INFO   :..reg_process: registering process with the system
03/22 08:51:02 INFO   :..reg_process: attempt OS/390 registration
03/22 08:51:02 INFO   :ERROR..MySQLreg_process: return from registration rc=0

Writing example.txt



In [None]:
# Create an RDD from example.txt with the textFile method; you can then run operations on it.
# sc is the SparkContext, which connects to a Spark cluster (here, a local one).
# Each element of the resulting RDD is one line of the file.
textFile = sc.textFile('example.txt')

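#### We can perform actions or transformations on RDD objects
Actions return a value to the driver program (for example, a count or a list of elements).
Transformations return a new RDD, a pointer to a dataset that has not been computed yet.

* `count()` returns the number of elements in the RDD
* each element is one line of the file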

In [6]:
textFile.count()

8

`first()` returns the first element, i.e. the first line:

In [7]:
textFile.first()

'03/22 08:51:01 INFO   :.main: *************** RSVP Agent started ***************'

A transformation returns a new RDD containing only what you want. For example, `filter` keeps only the lines that contain `ERROR`.

Transformations are like recipes: they are not evaluated until an action (such as `count()` or `collect()`) asks for a result.
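The "recipe" idea can be tried without Spark, because Python's built-in `filter()` is lazy in a similar way. This is only a rough analogy: the three lines are copied from example.txt, `has_error` is a hypothetical predicate that records each call, and unlike an RDD a Python iterator can be consumed only once.

```python
# Plain-Python analogy for lazy transformations (no Spark involved).
log_lines = [
    "03/22 08:51:01 INFO   :.main: Using log level 511",
    "03/22 08:51:01 INFO   :ERROR..MySQLsettcpimage: Associate with TCP/IP image name = TCPCS",
    "03/22 08:51:02 INFO   :ERROR..MySQLreg_process: return from registration rc=0",
]

inspected = []  # records every line the predicate actually looks at


def has_error(line):
    """Hypothetical predicate; logs each call so we can see when work happens."""
    inspected.append(line)
    return "ERROR" in line


# Like textFile.filter(...), this only builds a recipe: nothing is inspected yet.
find_error = filter(has_error, log_lines)
print(len(inspected))  # 0

# Materializing the iterator plays the role of an action such as collect().
errors = list(find_error)
print(len(inspected))  # 3
print(len(errors))     # 2
```

In Spark, each new action re-runs the recipe (unless the RDD is cached), whereas the Python iterator above is exhausted after `list()`.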

In [8]:
find_error = textFile.filter(lambda line: 'ERROR' in line)

In [9]:
find_error

PythonRDD[5] at RDD at PythonRDD.scala:53

In [10]:
find_error.collect()

['03/22 08:51:01 INFO   :ERROR..MySQLsettcpimage: Associate with TCP/IP image name = TCPCS',
 '03/22 08:51:02 INFO   :ERROR..MySQLreg_process: return from registration rc=0']

<a id='textsearch'></a>
## Textsearch example: converting the RDD to a DataFrame and using the DataFrame API

In [17]:
from pyspark.sql import Row

# Create a DataFrame with a single column named "line", one Row per line of the RDD
df = textFile.map(lambda r: Row(r)).toDF(["line"])

In [14]:
df.show()

+--------------------+
|                line|
+--------------------+
|03/22 08:51:01 IN...|
|03/22 08:51:01 IN...|
|03/22 08:51:01 IN...|
|03/22 08:51:01 IN...|
|03/22 08:51:01 IN...|
|03/22 08:51:02 IN...|
|03/22 08:51:02 IN...|
|03/22 08:51:02 IN...|
+--------------------+



Filter the new DataFrame so that it keeps only the rows containing ERROR. `like` takes a SQL LIKE pattern, in which `%` matches any sequence of characters.

In [19]:
from pyspark.sql.functions import col

errors = df.filter(col("line").like("%ERROR%"))

In [20]:
errors.count()

2

In [21]:
errors.filter(col("line").like("%MySQL%")).count()

2

In [22]:
errors.filter(col("line").like("%MySQL%")).collect()

[Row(line='03/22 08:51:01 INFO   :ERROR..MySQLsettcpimage: Associate with TCP/IP image name = TCPCS'),
 Row(line='03/22 08:51:02 INFO   :ERROR..MySQLreg_process: return from registration rc=0')]

In [None]:
from pyspark.sql import Row
from pyspark.sql.functions import col

# Creates a DataFrame having a single column named "line"
df = textFile.map(lambda r: Row(r)).toDF(["line"])
errors = df.filter(col("line").like("%ERROR%"))
# Counts all the errors
errors.count()
# Counts errors mentioning MySQL
errors.filter(col("line").like("%MySQL%")).count()
# Fetches the MySQL errors as a list of Row objects
errors.filter(col("line").like("%MySQL%")).collect()
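LIKE patterns match the whole string: `%` matches any run of characters (including none) and `_` matches exactly one. The `like` function below is a hypothetical plain-Python helper that imitates those two wildcards so the filters above can be checked without a cluster. It is a sketch only and does not handle escape characters.

```python
import re


def like(value: str, pattern: str) -> bool:
    """Hypothetical helper: match `value` against a SQL LIKE pattern.

    '%' matches any run of characters, '_' matches one character, and every
    other character is matched literally. Escape characters are not handled.
    """
    regex = "".join(
        ".*" if ch == "%" else "." if ch == "_" else re.escape(ch)
        for ch in pattern
    )
    # LIKE must match the whole string, hence fullmatch; DOTALL lets '%' span newlines.
    return re.fullmatch(regex, value, flags=re.DOTALL) is not None


# A few lines copied from example.txt
lines = [
    "03/22 08:51:01 INFO   :.main: Using log level 511",
    "03/22 08:51:01 INFO   :ERROR..MySQLsettcpimage: Associate with TCP/IP image name = TCPCS",
    "03/22 08:51:02 INFO   :..reg_process: registering process with the system",
    "03/22 08:51:02 INFO   :ERROR..MySQLreg_process: return from registration rc=0",
]

errors = [line for line in lines if like(line, "%ERROR%")]
mysql_errors = [line for line in errors if like(line, "%MySQL%")]
print(len(errors), len(mysql_errors))  # 2 2
```

Without the surrounding `%`, a pattern such as `"ERROR"` would only match a line that is exactly `ERROR`.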

<a id='wordcount'></a>
## Wordcount example

In [24]:
%%writefile dickens.txt
My father's family name being Pirrip, and my Christian name Philip, my
infant tongue could make of both names nothing longer or more explicit
than Pip. So, I called myself Pip, and came to be called Pip.

I give Pirrip as my father's family name, on the authority of his
tombstone and my sister,--Mrs. Joe Gargery, who married the blacksmith.
As I never saw my father or my mother, and never saw any likeness
of either of them (for their days were long before the days of
photographs), my first fancies regarding what they were like were
unreasonably derived from their tombstones. The shape of the letters on
my father's, gave me an odd idea that he was a square, stout, dark man,
with curly black hair. From the character and turn of the inscription,
“Also Georgiana Wife of the Above,” I drew a childish conclusion that
my mother was freckled and sickly. To five little stone lozenges, each
about a foot and a half long, which were arranged in a neat row beside
their grave, and were sacred to the memory of five little brothers of
mine,--who gave up trying to get a living, exceedingly early in
that universal struggle,--I am indebted for a belief I religiously
entertained that they had all been born on their backs with their hands
in their trousers-pockets, and had never taken them out in this state of
existence.

Ours was the marsh country, down by the river, within, as the river
wound, twenty miles of the sea. My first most vivid and broad impression
of the identity of things seems to me to have been gained on a memorable
raw afternoon towards evening. At such a time I found out for certain
that this bleak place overgrown with nettles was the churchyard; and
that Philip Pirrip, late of this parish, and also Georgiana wife of the
above, were dead and buried; and that Alexander, Bartholomew, Abraham,
Tobias, and Roger, infant children of the aforesaid, were also dead
and buried; and that the dark flat wilderness beyond the churchyard,
intersected with dikes and mounds and gates, with scattered cattle
feeding on it, was the marshes; and that the low leaden line beyond
was the river; and that the distant savage lair from which the wind was
rushing was the sea; and that the small bundle of shivers growing afraid
of it all and beginning to cry, was Pip.

Writing dickens.txt


In [55]:
text_file = sc.textFile('dickens.txt')
# Split each line on spaces to get its words. flatMap flattens the per-line
# lists of words into a single RDD of words
counts = text_file.flatMap(lambda line: line.split(" "))

In [57]:
# Pair each word with a count of 1. map produces exactly one (word, 1) tuple per word.
# flatMap with the same lambda would flatten each tuple instead, giving ['My', 1, "father's", 1, ...]
counts = counts.map(lambda word: (word, 1))

In [59]:
# reduceByKey groups the pairs by key (the word) and combines their values with a + b.
# Since every value is 1, the result is the number of occurrences of each word
counts = counts.reduceByKey(lambda a, b: a + b)

In [60]:
# see the result
counts.collect()

[('My', 2),
 ("father's", 2),
 ('family', 2),
 ('name', 2),
 ('being', 1),
 ('Pirrip,', 2),
 ('and', 23),
 ('my', 9),
 ('Christian', 1),
 ('Philip,', 1),
 ('infant', 2),
 ('tongue', 1),
 ('could', 1),
 ('make', 1),
 ('of', 19),
 ('both', 1),
 ('names', 1),
 ('nothing', 1),
 ('longer', 1),
 ('or', 2),
 ('more', 1),
 ('explicit', 1),
 ('than', 1),
 ('Pip.', 3),
 ('So,', 1),
 ('I', 6),
 ('called', 2),
 ('myself', 1),
 ('Pip,', 1),
 ('came', 1),
 ('to', 6),
 ('be', 1),
 ('', 2),
 ('give', 1),
 ('Pirrip', 1),
 ('as', 2),
 ('name,', 1),
 ('on', 5),
 ('the', 25),
 ('authority', 1),
 ('his', 1),
 ('tombstone', 1),
 ('sister,--Mrs.', 1),
 ('Joe', 1),
 ('Gargery,', 1),
 ('who', 1),
 ('married', 1),
 ('blacksmith.', 1),
 ('As', 1),
 ('never', 3),
 ('saw', 2),
 ('father', 1),
 ('mother,', 1),
 ('any', 1),
 ('likeness', 1),
 ('either', 1),
 ('them', 2),
 ('(for', 1),
 ('their', 6),
 ('days', 2),
 ('were', 7),
 ('long', 1),
 ('before', 1),
 ('photographs),', 1),
 ('first', 2),
 ('fancies', 1),
 ('re

In [71]:
output = counts.collect()
for (word, count) in output:
    print("%s: %i" % (word, count))

My: 2
father's: 2
family: 2
name: 2
being: 1
Pirrip,: 2
and: 23
my: 9
Christian: 1
Philip,: 1
infant: 2
tongue: 1
could: 1
make: 1
of: 19
both: 1
names: 1
nothing: 1
longer: 1
or: 2
more: 1
explicit: 1
than: 1
Pip.: 3
So,: 1
I: 6
called: 2
myself: 1
Pip,: 1
came: 1
to: 6
be: 1
: 2
give: 1
Pirrip: 1
as: 2
name,: 1
on: 5
the: 25
authority: 1
his: 1
tombstone: 1
sister,--Mrs.: 1
Joe: 1
Gargery,: 1
who: 1
married: 1
blacksmith.: 1
As: 1
never: 3
saw: 2
father: 1
mother,: 1
any: 1
likeness: 1
either: 1
them: 2
(for: 1
their: 6
days: 2
were: 7
long: 1
before: 1
photographs),: 1
first: 2
fancies: 1
regarding: 1
what: 1
they: 2
like: 1
unreasonably: 1
derived: 1
from: 2
tombstones.: 1
The: 1
shape: 1
letters: 1
father's,: 1
gave: 2
me: 2
an: 1
odd: 1
idea: 1
that: 11
he: 1
was: 9
a: 9
square,: 1
stout,: 1
dark: 2
man,: 1
with: 5
curly: 1
black: 1
hair.: 1
From: 1
character: 1
turn: 1
inscription,: 1
“Also: 1
Georgiana: 2
Wife: 1
Above,”: 1
drew: 1
childish: 1
conclusion: 1
mother: 1
freckled: 

In [None]:
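# The whole word count pipeline, chained into one expression
text_file = sc.textFile('dickens.txt')
counts = text_file.flatMap(lambda line: line.split(" ")) \
             .map(lambda word: (word, 1)) \
             .reduceByKey(lambda a, b: a + b)
# saveAsTextFile writes a directory named dickens_count.txt containing one
# part file per partition; it fails if that directory already exists
counts.saveAsTextFile('dickens_count.txt')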
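To see what each step does without a cluster, here is a plain-Python sketch of the same flatMap, map, and reduceByKey pipeline. `reduce_by_key` and `word_count` are hypothetical helpers written for this illustration, not Spark APIs. The optional `normalize` step is an assumed variation addressing two quirks visible in the output above: `'Pip.'` and `'Pip,'` are counted as different words, and blank lines contribute the empty string `''`.

```python
import string
from itertools import chain


def reduce_by_key(pairs, func):
    """Merge the values of pairs that share a key with func, like RDD.reduceByKey."""
    result = {}
    for key, value in pairs:
        result[key] = func(result[key], value) if key in result else value
    return result


def word_count(lines, normalize=False):
    # flatMap: split every line on spaces and flatten into one stream of words
    words = chain.from_iterable(line.split(" ") for line in lines)
    if normalize:
        # Hypothetical cleanup: strip surrounding punctuation (including curly
        # quotes), lowercase, and drop the empty strings left by blank lines
        words = (w.strip(string.punctuation + "“”").lower() for w in words)
        words = (w for w in words if w)
    # map: pair each word with a count of 1
    pairs = ((w, 1) for w in words)
    # reduceByKey: add up the 1s for each distinct word
    return reduce_by_key(pairs, lambda a, b: a + b)


lines = ["Pip, Pip.", "", "and Pip"]
print(word_count(lines))                  # {'Pip,': 1, 'Pip.': 1, '': 1, 'and': 1, 'Pip': 1}
print(word_count(lines, normalize=True))  # {'pip': 3, 'and': 1}
```

In Spark, the same cleanup could be expressed as one extra `map` and a `filter` between `flatMap` and the `(word, 1)` step.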

<a id='flatmap'></a>
### flatMap vs map

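Notice that `flatMap` flattens all the results into one list: splitting each line produces a list of words, and `flatMap` merges those lists into a single RDD of words. `map` keeps exactly one result per input element, so splitting each line with `map` gives a list of lists, one list of words per line (note the `['']` produced by the blank line).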

In [31]:
text_file.collect()

["My father's family name being Pirrip, and my Christian name Philip, my",
 'infant tongue could make of both names nothing longer or more explicit',
 'than Pip. So, I called myself Pip, and came to be called Pip.',
 '',
 "I give Pirrip as my father's family name, on the authority of his",
 'tombstone and my sister,--Mrs. Joe Gargery, who married the blacksmith.',
 'As I never saw my father or my mother, and never saw any likeness',
 'of either of them (for their days were long before the days of',
 'photographs), my first fancies regarding what they were like were',
 'unreasonably derived from their tombstones. The shape of the letters on',
 "my father's, gave me an odd idea that he was a square, stout, dark man,",
 'with curly black hair. From the character and turn of the inscription,',
 '“Also Georgiana Wife of the Above,” I drew a childish conclusion that',
 'my mother was freckled and sickly. To five little stone lozenges, each',
 'about a foot and a half long, which were arrange

In [45]:
# using flatMap: one flat list of words
text_file.flatMap(lambda line: line.split(" ")).collect()

['My',
 "father's",
 'family',
 'name',
 'being',
 'Pirrip,',
 'and',
 'my',
 'Christian',
 'name',
 'Philip,',
 'my',
 'infant',
 'tongue',
 'could',
 'make',
 'of',
 'both',
 'names',
 'nothing',
 'longer',
 'or',
 'more',
 'explicit',
 'than',
 'Pip.',
 'So,',
 'I',
 'called',
 'myself',
 'Pip,',
 'and',
 'came',
 'to',
 'be',
 'called',
 'Pip.',
 '',
 'I',
 'give',
 'Pirrip',
 'as',
 'my',
 "father's",
 'family',
 'name,',
 'on',
 'the',
 'authority',
 'of',
 'his',
 'tombstone',
 'and',
 'my',
 'sister,--Mrs.',
 'Joe',
 'Gargery,',
 'who',
 'married',
 'the',
 'blacksmith.',
 'As',
 'I',
 'never',
 'saw',
 'my',
 'father',
 'or',
 'my',
 'mother,',
 'and',
 'never',
 'saw',
 'any',
 'likeness',
 'of',
 'either',
 'of',
 'them',
 '(for',
 'their',
 'days',
 'were',
 'long',
 'before',
 'the',
 'days',
 'of',
 'photographs),',
 'my',
 'first',
 'fancies',
 'regarding',
 'what',
 'they',
 'were',
 'like',
 'were',
 'unreasonably',
 'derived',
 'from',
 'their',
 'tombstones.',
 'The'

In [43]:
# using map: one list of words per line
text_file.map(lambda line: line.split(" ")).collect()

[['My',
  "father's",
  'family',
  'name',
  'being',
  'Pirrip,',
  'and',
  'my',
  'Christian',
  'name',
  'Philip,',
  'my'],
 ['infant',
  'tongue',
  'could',
  'make',
  'of',
  'both',
  'names',
  'nothing',
  'longer',
  'or',
  'more',
  'explicit'],
 ['than',
  'Pip.',
  'So,',
  'I',
  'called',
  'myself',
  'Pip,',
  'and',
  'came',
  'to',
  'be',
  'called',
  'Pip.'],
 [''],
 ['I',
  'give',
  'Pirrip',
  'as',
  'my',
  "father's",
  'family',
  'name,',
  'on',
  'the',
  'authority',
  'of',
  'his'],
 ['tombstone',
  'and',
  'my',
  'sister,--Mrs.',
  'Joe',
  'Gargery,',
  'who',
  'married',
  'the',
  'blacksmith.'],
 ['As',
  'I',
  'never',
  'saw',
  'my',
  'father',
  'or',
  'my',
  'mother,',
  'and',
  'never',
  'saw',
  'any',
  'likeness'],
 ['of',
  'either',
  'of',
  'them',
  '(for',
  'their',
  'days',
  'were',
  'long',
  'before',
  'the',
  'days',
  'of'],
 ['photographs),',
  'my',
  'first',
  'fancies',
  'regarding',
  'what',
  't

In [46]:
test = sc.parallelize([3,4,5])
test.collect()

[3, 4, 5]

In [37]:
test.map(lambda x: range(1,x)).collect()

[range(1, 3), range(1, 4), range(1, 5)]

In [39]:
test.flatMap(lambda x: range(1,x)).collect()

[1, 2, 1, 2, 3, 1, 2, 3, 4]
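The same distinction can be reproduced in plain Python, which may help when reasoning about what an RDD will contain. This sketch is an analogy rather than Spark code: a list comprehension stands in for `map`, and `itertools.chain.from_iterable` stands in for the flattening that `flatMap` performs.

```python
from itertools import chain

test = [3, 4, 5]

# map: one output per input element, so the result is a list of ranges
mapped = [range(1, x) for x in test]
print([list(r) for r in mapped])  # [[1, 2], [1, 2, 3], [1, 2, 3, 4]]

# flatMap: apply the function, then flatten all the outputs into one sequence
flat = list(chain.from_iterable(range(1, x) for x in test))
print(flat)  # [1, 2, 1, 2, 3, 1, 2, 3, 4]
```

A quick rule of thumb: `map` never changes the number of elements, while `flatMap` can grow or shrink it (an empty result for an element simply disappears).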

<a id='totalcount'></a>
## Count the number of characters in a text file
https://spark.apache.org/docs/2.1.1/programming-guide.html#basics

The first line defines a base RDD from an external file. This dataset is not loaded in memory or otherwise acted on: lines is merely a pointer to the file. The second line defines lineLengths as the result of a map transformation. Again, lineLengths is not immediately computed, due to laziness. Finally, we run reduce, which is an action. At this point Spark breaks the computation into tasks to run on separate machines, and each machine runs both its part of the map and a local reduction, returning only its answer to the driver program.

In [63]:
lines = sc.textFile("dickens.txt")
# number of characters in each line (textFile drops the newline characters)
lineLengths = lines.map(lambda s: len(s))
# add the line lengths together, "reducing" them to a single number
totalLength = lineLengths.reduce(lambda a, b: a + b)

In [69]:
totalLength

2241
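Because `textFile` drops line terminators, the total above counts only the characters on each line, not the newlines. The sketch below mirrors the same map and reduce steps in plain Python. `total_length` is a hypothetical helper, and `str.splitlines()` stands in for `textFile`; the two may not split on exactly the same set of line terminators.

```python
from functools import reduce


def total_length(text):
    # Like sc.textFile: one element per line, with the line terminators dropped
    lines = text.splitlines()
    # Like lines.map(lambda s: len(s)); Python's map is lazy as well
    line_lengths = map(len, lines)
    # Like lineLengths.reduce(lambda a, b: a + b). The initial 0 handles empty
    # input, whereas RDD.reduce on an empty RDD raises an error
    return reduce(lambda a, b: a + b, line_lengths, 0)


print(total_length("ab\n\ncde\n"))  # 5: the three newline characters are not counted
```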

<a id='pi'></a>
## Pi exercise

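We build an RDD from the numbers `0` to `NUM_SAMPLES - 1` with `parallelize`, which splits the data into partitions so the work can be spread over the available nodes. `map` calls `sample` on each element, and `reduce` adds up all the results. Note that `count` itself is not an RDD: `reduce` is an action, so `count` is a plain Python integer.

The `sample` function draws two random numbers `x` and `y` in $[0, 1)$ and returns 1 if the point $(x, y)$ lies inside the circle of radius 1 ($x^2 + y^2 < 1$) and 0 otherwise. Its argument (the RDD element) is ignored.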

In [85]:
import random

NUM_SAMPLES = 100000

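def sample(p):
    # p (the RDD element) is ignored; each call draws one random point in [0, 1) x [0, 1)
    x, y = random.random(), random.random()
    # 1 if the point falls inside the circle of radius 1, else 0
    return 1 if x*x + y*y < 1 else 0

# count = number of points that landed inside the circle
count = sc.parallelize(range(0, NUM_SAMPLES)) \
            .map(sample) \
            .reduce(lambda a, b: a + b)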

print("Pi is roughly %f" % (4.0 * count / NUM_SAMPLES))

Pi is roughly 3.140160


This computation is based on the following heuristic. By definition, $\pi$ is the area $A_{\text{Circle}}$ of a circle with radius $r = 1$ (in general, $\pi r^2$ is the area of a circle of radius $r$).

One then circumscribes this unit circle with a square of area $A_{\text{Square}} = 4$. The ratio of the two areas is therefore $\frac{A_{\text{Circle}}}{A_{\text{Square}}} = \frac{\pi}{4}$, which is the geometric probability that a point chosen uniformly at random inside the square lies inside the circle.

Now assume that we pick a large number $n$ of points uniformly at random inside the square, for example by throwing darts or letting raindrops fall onto it. Some number $n_{\text{in}}$ of these points end up inside the circle, while the remaining $n_{\text{out}}$ points land outside it (but inside the square). Thus $n_{\text{in}} + n_{\text{out}} = n$, and the observed fraction of points inside the circle is $\frac{n_{\text{in}}}{n}$.
Heuristically, $\frac{A_{\text{Circle}}}{A_{\text{Square}}} \approx \frac{n_{\text{in}}}{n}$, and hence $\pi \approx 4 \cdot \frac{n_{\text{in}}}{n}$.

The code above works with one quadrant of this picture: `x` and `y` are drawn from $[0, 1)$, so the points fall in a unit square that contains a quarter circle of area $\frac{\pi}{4}$. The ratio of areas is the same, $\frac{\pi}{4}$, so `count` plays the role of $n_{\text{in}}$, `NUM_SAMPLES` plays the role of $n$, and the printed value is $4 \cdot \frac{n_{\text{in}}}{n}$.

The algorithm is non-deterministic, so the result will usually change slightly from run to run.

In short, this approach approximates $\pi$ simply by counting the fraction of randomly thrown points that land inside the circle out of all the points thrown at the circumscribed square.
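For comparison, here is a single-machine sketch of the same estimator in plain Python. `estimate_pi` and its `seed` parameter are hypothetical additions for this illustration: passing a seed to a private `random.Random` makes a run repeatable, which the Spark version above does not attempt. The typical error shrinks roughly like $1/\sqrt{n}$, so each 100-fold increase in samples buys about one more correct decimal digit.

```python
import math
import random


def sample(rng):
    # Draw one point uniformly from the unit square [0, 1) x [0, 1);
    # return 1 if it lies inside the quarter circle x^2 + y^2 < 1, else 0
    x, y = rng.random(), rng.random()
    return 1 if x * x + y * y < 1 else 0


def estimate_pi(num_samples, seed=None):
    rng = random.Random(seed)  # private generator, so a seed makes runs reproducible
    n_in = sum(sample(rng) for _ in range(num_samples))
    # n_in / n approximates (pi / 4) / 1, so multiply by 4
    return 4.0 * n_in / num_samples


for n in (100, 10_000, 100_000):
    est = estimate_pi(n, seed=1)
    print(f"n={n:>7}: pi ~ {est:.5f} (error {abs(est - math.pi):.5f})")
```

The Spark version distributes exactly this loop: `parallelize` plus `map(sample)` replaces the generator expression, and `reduce` replaces `sum`.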