# Spark

[Spark](https://spark.apache.org/docs/1.6.2/index.html) (we are going to use version 1.6.2) is a fast and general-purpose cluster computing system that lets you process large datasets on several machines. In the era of *big data*, being able to distribute computing power and process data in parallel has become a necessity. We introduce here the basics of the framework.

In any case, you should have a look at the [official tutorial](https://spark.apache.org/docs/1.6.2/programming-guide.html).

## Hadoop Distributed File System (HDFS)

The main abstraction Spark offers is the *Resilient Distributed Dataset* (RDD), a collection of elements partitioned across the nodes of the cluster. The idea is that you typically **do not** load the whole dataset into memory, but rather process it *lazily*: operations are only computed when a result is actually needed. Here, RDDs are initialized from files stored in the *Hadoop Distributed File System* (HDFS).
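Lazy evaluation can be illustrated without Spark. The sketch below is only an analogy, assuming plain Python generators behave like RDD transformations in this one respect: building the pipeline computes nothing, and the work happens when something consumes the result (the counterpart of a Spark *action*). The names `shout` and `calls` are hypothetical.

```python
# Plain-Python analogy (not Spark): a generator expression is lazy,
# like an RDD transformation, and only runs when it is consumed.
calls = []

def shout(line):
    calls.append(line)  # record every time the function actually runs
    return line.upper()

lines = ["#Senate race", "Make Republicans #PayAPrice!", "#Vote"]

# "Transformation": the pipeline is defined, but nothing is computed yet.
shouted = (shout(line) for line in lines)
print(len(calls))  # 0

# "Action": consuming the generator triggers the computation.
result = list(shouted)
print(len(calls))  # 3
print(result[0])   # #SENATE RACE
```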

Let us first see what there is in HDFS.

In [2]:
!hdfs dfs -ls

Found 2 items
drwxr-xr-x   - shirinov hdfs          0 2019-02-26 17:44 .sparkStaging
-rw-r--r--   3 shirinov hdfs    2147813 2019-02-24 20:09 election-day-tweets.txt


*Note:* a line starting with `!` indicates a shell command run from the notebook. You can therefore run the same command (without the `!`) in a bash terminal.

We can now add a dataset to HDFS. In the `data` folder, there is a text file containing 30000 tweets collected on [Inauguration Day](https://en.wikipedia.org/wiki/United_States_presidential_inauguration), January 20, 2017.

In [3]:
!hdfs dfs -put data/election-day-tweets.txt

put: `election-day-tweets.txt': File exists


The `put` command reports that the file already exists, since it was uploaded earlier. Let us check that it is indeed in HDFS.

In [4]:
!hdfs dfs -ls

Found 2 items
drwxr-xr-x   - shirinov hdfs          0 2019-02-26 17:44 .sparkStaging
-rw-r--r--   3 shirinov hdfs    2147813 2019-02-24 20:09 election-day-tweets.txt


*Note:* you can remove files from HDFS using `hdfs dfs -rm FILE`. 

## Dataset processing

We can now start playing with the tweets. We will instantiate the dataset from the text file.

In [5]:
tweets = sc.textFile('election-day-tweets.txt')

The `tweets` variable is now an RDD. An RDD is *resilient* to faults because it records the sequence of operations applied to it (its *lineage*), so it can recompute lost data in case of a failure. It is also *distributed*: it is split into partitions that can be processed on several machines.
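To make lineage concrete, here is a toy model written for this tutorial; it is a hypothetical `ToyRDD` class, not Spark's implementation. It assumes the source data is already split into partitions (lists) and that each transformation is just a function appended to the lineage, so any single partition can be rebuilt by replaying the lineage on its source data.

```python
# Toy model of lineage (NOT Spark's implementation): a dataset remembers
# its source partitions and the functions applied to them, so any
# partition can be recomputed from scratch if its result is lost.
class ToyRDD:
    def __init__(self, partitions, ops=()):
        self.partitions = partitions  # list of lists: the source data
        self.ops = list(ops)          # lineage: functions to apply, in order

    def map(self, f):
        # Transformation: return a new ToyRDD with one more step in its lineage.
        return ToyRDD(self.partitions, self.ops + [f])

    def compute_partition(self, i):
        # Replay the whole lineage on partition i only.
        items = self.partitions[i]
        for f in self.ops:
            items = [f(x) for x in items]
        return items

    def collect(self):
        # Action: compute every partition and gather the results.
        return [x for i in range(len(self.partitions))
                for x in self.compute_partition(i)]

data = ToyRDD([["a", "b"], ["c"]])  # two "partitions", as if on two machines
upper = data.map(str.upper).map(lambda s: s + "!")
print(upper.collect())  # ['A!', 'B!', 'C!']

# If partition 1 were lost, it could be recomputed from the lineage alone.
print(upper.compute_partition(1))  # ['C!']
```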

The `sc` object is a special object, namely a [Spark Context](https://spark.apache.org/docs/1.6.2/api/python/pyspark.html#pyspark.SparkContext) object, that has been configured to be available everywhere in your Python environment. It is the main entry point to Spark and has numerous useful methods. Check the documentation!

Let us display a few tweets.

In [6]:
some_tweets = tweets.take(10)
for i, tweet in enumerate(some_tweets):
    print("%d: %s" % (i, tweet))

0: @Lawrence @HillaryClinton Two first  @SenSchumer tomorrow. @TheLastWord #brooklyn  TheRealAmerica #Vote #Democrats #NastyWomenVote #Senate
1: My @latimesopinion op-ed on historic #California #Senate race. First time an elected woman senator succeeds another.
2: https://t.co/cbjQTK0Q1V
3: #Senate Wisconsin Senate Preview: Johnson vs. Feingold
4: If Rubio Wins and #Trump Loses in #Florida... #HillaryClinton #Senate #RepublicanPrimary #Senaterace #Miami... https://t.co/zIeNEcVnMO
5: #Senate Wisconsin Senate Preview: Johnson vs. Feingold
6: bob day is an honest  person   #senate patterson . a loss to the senate
7: Make Republicans #PayAPrice!
8:  💙🇺🇸#VoteBLUE🔃theBallot🇺🇸💙
9:  #Congress #Senate #FlipItDem


Once created, the `tweets` dataset can be acted on by dataset **operations**. RDDs support two types of operations: *transformations*, which create a new RDD from an existing one, and *actions*, which return a value to the driver program after running a computation on the RDD. For instance, we can `count` the number of tweets. Is it a transformation or an action?

In [7]:
tweets.count()

30000

We can also `filter` the lines containing a specific string. Is it a transformation or an action?

*Note*: `lower()` is a `str` method that converts the string to lower case. What happens if you do not do that? 

In [8]:
# Reminder: `in` tests whether a substring occurs in a string
print('in' in 'this is how to use in')
lines_with_trump = tweets.filter(lambda line: "trump" in line.lower())
lines_with_trump.count()

True


1023
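The effect of `lower()` can be checked without Spark. This sketch applies the same test as the `filter` lambda to two (abridged) tweets from the sample displayed earlier: the substring test is case-sensitive, so `#Trump` is only matched once the line is lower-cased.

```python
# Two tweets from the sample above (the first one is abridged).
sample = [
    "If Rubio Wins and #Trump Loses in #Florida...",
    "#Senate Wisconsin Senate Preview: Johnson vs. Feingold",
]

# Same condition as the Spark filter, with and without lower().
case_sensitive = [line for line in sample if "trump" in line]
case_insensitive = [line for line in sample if "trump" in line.lower()]

print(len(case_sensitive))    # 0: "#Trump" does not contain "trump"
print(len(case_insensitive))  # 1
```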

A typical processing pattern is [MapReduce](https://en.wikipedia.org/wiki/MapReduce), as popularized by Hadoop. The idea is to apply a function to every element of the dataset (**map**), typically producing key-value pairs, and then combine the outputs that share a key by aggregating them (**reduce**). The typical example is a word count: how many times does each word occur among our tweets?
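Before running it on Spark, the same word-count pipeline can be sketched in plain Python on a hypothetical two-line sample. Each step is labelled with the Spark operation it mirrors; the grouping step uses `sort` plus `itertools.groupby`, which is only a local stand-in for what `reduceByKey` does across the cluster. Variable names are prefixed to avoid clobbering the notebook's RDDs.

```python
from functools import reduce
from itertools import groupby

# A hypothetical two-line sample standing in for the tweets RDD.
wc_lines = ["#Senate Wisconsin Senate Preview", "Senate race #Senate"]

# flatMap: split each line into words and flatten into a single list.
local_words = [word for line in wc_lines for word in line.split()]

# map: pair each word with the count 1.
local_pairs = [(word, 1) for word in local_words]

# reduceByKey: bring equal keys together, then add up their counts.
local_pairs.sort(key=lambda kv: kv[0])
local_counts = {
    word: reduce(lambda a, b: a + b, (count for _, count in group))
    for word, group in groupby(local_pairs, key=lambda kv: kv[0])
}
print(local_counts)  # {'#Senate': 2, 'Preview': 1, 'Senate': 2, 'Wisconsin': 1, 'race': 1}
```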

In [9]:
# Split every line into words and flatten the results into a single RDD of words
words = tweets.flatMap(lambda line: line.split())
words.take(15)

['@Lawrence',
 '@HillaryClinton',
 'Two',
 'first',
 '@SenSchumer',
 'tomorrow.',
 '@TheLastWord',
 '#brooklyn',
 'TheRealAmerica',
 '#Vote',
 '#Democrats',
 '#NastyWomenVote',
 '#Senate',
 'My',
 '@latimesopinion']

In [10]:
# Assign 1 to each word
counts = words.map(lambda word: (word, 1))
counts.take(10)

[('@Lawrence', 1),
 ('@HillaryClinton', 1),
 ('Two', 1),
 ('first', 1),
 ('@SenSchumer', 1),
 ('tomorrow.', 1),
 ('@TheLastWord', 1),
 ('#brooklyn', 1),
 ('TheRealAmerica', 1),
 ('#Vote', 1)]

In [11]:
# Combine the counts by adding up the values (1) of equal keys (words)
word_counts = counts.reduceByKey(lambda a, b: a + b)
word_counts.take(10)

[('@HillaryClinton', 121),
 ('Two', 8),
 ('#Democrats', 20),
 ('op-ed', 1),
 ('historic', 10),
 ('race.', 5),
 ('an', 316),
 ('senator', 10),
 ('another.', 3),
 ('https://t.co/cbjQTK0Q1V', 1)]
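Conceptually, `reduceByKey` combines values in two stages: first within each partition, then across partitions once the pairs sharing a key have been shuffled together. The sketch below is a simplified single-machine model with a hypothetical `reduce_by_key` helper, not Spark's code. Because values may be grouped in any order, the function given to `reduceByKey` should be associative and commutative, as addition is.

```python
def reduce_pairs(pairs, func):
    # Fold the values of equal keys together with func.
    acc = {}
    for key, value in pairs:
        acc[key] = func(acc[key], value) if key in acc else value
    return acc

def reduce_by_key(partitions, func):
    # Stage 1 (on each machine): combine values within every partition.
    partials = [reduce_pairs(partition, func) for partition in partitions]
    # Stage 2 (after the shuffle): combine the partial results of all partitions.
    return reduce_pairs((kv for partial in partials for kv in partial.items()), func)

# Two hypothetical partitions of (word, 1) pairs.
partitions = [
    [("#Senate", 1), ("Two", 1), ("#Senate", 1)],
    [("an", 1), ("#Senate", 1)],
]
merged = reduce_by_key(partitions, lambda a, b: a + b)
print(merged)  # {'#Senate': 3, 'Two': 1, 'an': 1}
```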

You should read the list of common [transformations](https://spark.apache.org/docs/1.6.2/programming-guide.html#transformations) and [actions](https://spark.apache.org/docs/1.6.2/programming-guide.html#actions) to get an idea of what you can do.

## Dataset statistics

Your turn! Try to extract some statistics from the dataset:

1. Total number of words
2. Average number of words per tweet
3. Ten most frequent words
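One way to approach these statistics is sketched below in plain Python on a hypothetical three-tweet sample, so it runs without a cluster. On the RDDs, the corresponding steps would presumably be `words.count()` for the total, `words.count() / float(tweets.count())` for the average (the `float` guards against integer division under Python 2), and `word_counts.takeOrdered(10, key=lambda kv: -kv[1])` for the ten most frequent words.

```python
from collections import Counter

# Hypothetical sample standing in for the tweets RDD.
sample_tweets = ["#Senate race tonight", "Vote #Senate", "Vote Vote"]

tokens = [word for line in sample_tweets for word in line.split()]
total_words = len(tokens)                         # total number of words
average_words = total_words / len(sample_tweets)  # average words per tweet
top_words = Counter(tokens).most_common(10)       # most frequent words first

print(total_words, round(average_words, 2))  # 7 2.33
print(top_words[:2])  # [('Vote', 3), ('#Senate', 2)]
```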

## Bigrams

A bigram is a pair of two consecutive words.

- Extract the ten most frequent bigrams from the dataset.

Be careful not to pair the last word of a tweet with the first word of the next tweet!
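The pitfall is easy to see in plain Python with two tweets from the sample above. Flattening all words first and then pairing them creates a spurious bigram across the tweet boundary; building the bigrams inside each tweet and only then flattening avoids it. The helper `pairs_of` is hypothetical.

```python
# Two short tweets from the sample above.
bigram_lines = ["#Congress #Senate #FlipItDem", "Make Republicans #PayAPrice!"]

def pairs_of(words):
    # Consecutive pairs (w0, w1), (w1, w2), ...
    return list(zip(words, words[1:]))

# Wrong: flattening all words first creates a bigram across the tweet boundary.
all_words = [w for line in bigram_lines for w in line.split()]
wrong = pairs_of(all_words)

# Right: build bigrams inside each tweet, then flatten.
right = [pair for line in bigram_lines for pair in pairs_of(line.split())]

print(("#FlipItDem", "Make") in wrong)  # True
print(("#FlipItDem", "Make") in right)  # False
```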

*Hint:* note that the `map` and `flatMap` methods take a **function** as argument, meaning that you can define a function that you pass as argument:

```python
def process(line):
    # Do something with the line, e.g. split it into words
    return line.split()

x = tweets.flatMap(process)
```

In [16]:
# Your code goes here
def make_bigrams(line):
    # Build bigrams within a single tweet, so that none spans two tweets
    words = line.split()
    return ['%s %s' % (words[i], words[i + 1]) for i in range(len(words) - 1)]

bigrams = tweets.flatMap(make_bigrams)
# Pair each bigram with the count 1
counts = bigrams.map(lambda bigram: (bigram, 1))
# Combine the counts by adding up the values (1) of equal keys (bigrams)
bigrams_counts = counts.reduceByKey(lambda a, b: a + b)
bigrams_counts.takeOrdered(10)

[('! #BlackMoney', 1),
 ('! #Election2016', 1),
 ('! ...and', 1),
 ('! 1471', 1),
 ('! 1500', 1),
 ('! @RBI', 1),
 ('! @Wade4Justice', 1),
 ('! A', 1),
 ('! Aaj', 1),
 ('! Am', 1)]
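Note that the output above is sorted alphabetically rather than by frequency: called without a key, `takeOrdered(n)` returns the `n` smallest elements in ascending natural order, and tuples compare by their first item, the bigram. PySpark's `takeOrdered` also accepts an optional `key` function, so `bigrams_counts.takeOrdered(10, key=lambda kv: -kv[1])` should return the most frequent bigrams instead. The sketch below mimics both orderings locally with `heapq.nsmallest`; the pairs are hypothetical, not results from the dataset.

```python
import heapq

# Hypothetical (bigram, count) pairs standing in for bigrams_counts.
bigram_pairs = [("! A", 1), ("x y", 5), ("y z", 9), ("! Aaj", 1)]

# Like takeOrdered(2): the 2 smallest elements in natural (tuple) order,
# i.e. sorted by the bigram string first.
by_key = heapq.nsmallest(2, bigram_pairs)

# Like takeOrdered(2, key=lambda kv: -kv[1]): negating the count
# puts the most frequent bigrams first.
by_count = heapq.nsmallest(2, bigram_pairs, key=lambda kv: -kv[1])

print(by_key)    # [('! A', 1), ('! Aaj', 1)]
print(by_count)  # [('y z', 9), ('x y', 5)]
```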