# Introduction to Spark

The purpose of this lab is to get some experience with Spark. 

The first code cell below (after the AWS notes) sets up the "Spark Context", which is how we interact with Spark from Python.

## AWS 
 
For the next lab, we will repeat this activity on Amazon Web Services (AWS), Amazon’s computing-as-a-utility service. To prepare for that lab, you’ll need an AWS account. Please do the following, preferably today!

1) Sign up for an AWS account. Go to aws.amazon.com to sign up if you don’t already have an account. Note that you will need a credit card to sign up, but there is no cost until you actually rent machines. Then wait for your account to be activated. This may take 24 hours.

2) Optionally: once you have your account number, sign up for AWS credit here: https://aws.amazon.com/education/awseducate/apply/. This will get you some free time on AWS.

In [1]:
import pyspark

sc = pyspark.SparkContext()

In [None]:
#sc.stop() #commented out so that you don't stop your context by mistake

## Word Count

Below is the countWords function from the Spark lecture.  We are going to run this code on a set of files.

In [1]:
def countWords (fileName):
    lines = sc.textFile(fileName)
    tokens = lines.flatMap(lambda line: line.split(" "))
    instances = tokens.map (lambda word: (word, 1))
    aggregatedCounts = instances.reduceByKey (lambda a, b: a + b)
    return aggregatedCounts.top (200, key=lambda p : p[1])
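
To see what each step of countWords produces without a cluster, the same pipeline can be mimicked in plain Python on a few in-memory lines. This is only a local sketch: the sample lines and the helper name `count_words_local` are made up for illustration, and `collections.Counter` and `heapq.nlargest` stand in for `reduceByKey` and `top` rather than reproducing Spark's distributed behavior.

```python
from collections import Counter
import heapq

def count_words_local(lines, n):
    """Plain-Python stand-in for countWords (hypothetical helper, no Spark involved)."""
    # flatMap: split every line on single spaces and flatten into one token list
    tokens = [word for line in lines for word in line.split(" ")]
    # map: pair every token with a count of 1
    instances = [(word, 1) for word in tokens]
    # reduceByKey: add up the 1s for each distinct word
    aggregated = Counter()
    for word, one in instances:
        aggregated[word] += one
    # top: keep the n pairs with the largest counts
    return heapq.nlargest(n, aggregated.items(), key=lambda p: p[1])

sample_lines = ["the cat sat", "the cat ran", "the dog"]  # made-up data
top_two = count_words_local(sample_lines, 2)
print(top_two)  # prints [('the', 3), ('cat', 2)]
```

Because the split is on a single space, a run of consecutive spaces in a line yields empty-string tokens, so an empty string may show up among the counted "words" on real files.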

Upload the following files from the course Box folder and run countWords on each of them.

https://rice.box.com/v/RiceDSToolsAndModels

Holmes.txt

war.txt

william.txt

dictionary.txt

In [None]:
countWords("<path to your uploaded data here>")

## Completing a Spark Program

For this part of the lab, we are going to use a subset of the 20 Newsgroups dataset:
http://qwone.com/~jason/20Newsgroups/

The data file you need (20-news-same-line-small.txt) is also in the following folder:

https://rice.box.com/v/RiceDSToolsAndModels

This dataset is from back in the day when there were electronic "bulletin board" discussions on different topics on Usenet.

Load the data into an RDD and take a peek at it.

In [None]:
myRDD = sc.textFile('data/20-news-same-line-small.txt')
myRDD.take(3)

There are 19997 lines in this file, each
corresponding to a different text document. The goal here is to build, as an RDD, a
dictionary of (word, rank) pairs. The key of each pair is a word, and the value is a
number from 0 to 19,999 giving that word's rank. The words will be ranked
according to their frequency in the corpus, with 0 being the most frequent and 19,999 being
the 20-thousandth most frequent.

Take a look at the code. 

Note that nothing interesting happens until you enter the line that tries to collect the
results (using the call to top), at which point Spark goes off, performs all of the
computations, and tries to collect the results into topWords. At this point, topWords is a
local variable. In your Jupyter Notebook, you can manipulate it, change it, and print it, just as
you would any other local variable.
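
The deferred execution described above can be illustrated with a loose analogy in plain Python, where the built-in `map` is also lazy. This is not Spark code: the function `add_one` and the `calls` log are hypothetical names used only to show that building a pipeline does no work until something consumes it.

```python
calls = []  # records every time the mapping function actually runs

def add_one(x):
    calls.append(x)
    return x + 1

# Building the pipeline does no work yet, much like chaining RDD transformations
pipeline = map(add_one, range(5))
calls_before = list(calls)

# Consuming the pipeline triggers the computation, much like the call to top
results = list(pipeline)
calls_after = list(calls)

print(calls_before)  # prints []
print(results)       # prints [1, 2, 3, 4, 5]
print(calls_after)   # prints [0, 1, 2, 3, 4]
```

After the pipeline is consumed, `results` is an ordinary local list, in the same way that topWords is an ordinary local variable once Spark has returned it.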

Once you’ve made it to the question marks, come up with an appropriate lambda that is
going to attach the correct word to each number, putting the results into an RDD.
Replace the question marks with your lambda. The key of the tuple you create and put
into the RDD via the lambda should be the word; the value is the index (the number
from 0 to 19,999). Your lambda will refer to the local variable topWords.

After you run dictionary.take(10) you can get checked off! Note that this will
return the first 10 words by index.


In [3]:
import re
import numpy as np

# load up all of the 19997 documents in the corpus
corpus = sc.textFile ("./data/20-news-same-line-small.txt")

# each entry in validLines will be a line from the text file
validLines = corpus.filter(lambda x : 'id' in x)

# now we transform it into a bunch of (docID, text) pairs
keyAndText = validLines.map(lambda x : (x[x.index('id="') + 4 : x.index('" url=')], x[x.index('">') + 2:]))

# now we split the text in each (docID, text) pair into a list of words
# after this, we have a data set with (docID, ["word1", "word2", "word3", ...])
# we have a bit of fancy regular expression stuff here to make sure that we do not
# die on some of the documents
regex = re.compile('[^a-zA-Z]')
keyAndListOfWords = keyAndText.map(lambda x : (str(x[0]), regex.sub(' ', x[1]).lower().split()))

# now get the top 20,000 words... first change (docID, ["word1", "word2", "word3", ...])
# to ("word1", 1) ("word2", 1)...
allWords = keyAndListOfWords.flatMap(lambda x: ((j, 1) for j in x[1]))

# now, count all of the words, giving us ("word1", 1433), ("word2", 3423423), etc.
allCounts = allWords.reduceByKey (lambda a, b: a + b)

# and get the top 20,000 words in a local array
# each entry is a ("word1", count) pair
topWords = allCounts.top (20000, lambda x : x[1])

# and we'll create an RDD that has a bunch of (word, dictNum) pairs
# start by creating an RDD that has the numbers 0 through 19999
# 20000 is the number of words that will be in our dictionary
twentyK = sc.parallelize(range(20000))
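
The parsing and tokenizing steps in the cell above can be tried on a single string without Spark. The sketch below applies the same slicing and the same regular expression to one made-up line. The line's contents are hypothetical, and its layout (`<doc id="..." url="..." title="...">text`) is an assumption inferred from the `index` calls in the code, not taken from the data file.

```python
import re

# Made-up line in the assumed layout: <doc id="..." url="..." title="...">text
sample_line = '<doc id="sample/doc-001" url="" title="sample">Hello, World! Spark 2.4 is fun.'

# same slicing as the keyAndText step: docID sits between id=" and " url=
doc_id = sample_line[sample_line.index('id="') + 4 : sample_line.index('" url=')]
# the text is everything after the first "> in the line
text = sample_line[sample_line.index('">') + 2:]

# same cleanup as the keyAndListOfWords step: non-letters become spaces
regex = re.compile('[^a-zA-Z]')
words = regex.sub(' ', text).lower().split()

print(doc_id)  # prints sample/doc-001
print(words)   # prints ['hello', 'world', 'spark', 'is', 'fun']
```

Note that digits and punctuation are replaced by spaces before splitting, so "2.4" contributes no tokens at all.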

### Your Code Here

Your task is to map the parallelized range $(0, 1, 2, \ldots, 19999)$
into a set of tuples ("mostcommonword", 0), ("nextmostcommon", 1), ....

In [None]:
# now, we transform (0), (1), (2), ... to ("mostcommonword", 0) ("nextmostcommon", 1), ...
# the number will be the spot in the dictionary used to tell us where the word is located
# HINT: make use of topWords in the lambda that you supply
dictionary = twentyK.map (?????????)

# finally, print out some of the dictionary, just for debugging and to get checked off
dictionary.take(10)

Stop the Spark context when you are done.

In [4]:
sc.stop()  # stop the Spark context now that you are done with the lab

Copyright ©2019 Christopher M Jermaine (cmj4@rice.edu), and Risa B Myers  (rbm2@rice.edu)

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    https://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.