# A crash course on IPython (Jupyter) notebooks

This notebook contains a tutorial on analyzing music listening data with Spark.
But first, a primer on how to use the notebook.

## A notebook is divided into cells

The boxes labeled `In [ ]:` are called cells. They contain Python code.
To run the code in a cell, click on the cell and press **Shift + Enter**.

Try it now on the cell below:

In [None]:
1 + 1

You should see an output like `Out[1]: 2`.

Next, edit the above cell to say `1 + 2` and run it.

## Check that the SparkContext is loaded

Since you launched the notebook with PySpark, an object called `sc` is automatically loaded.
This object is the connection to the Spark cluster.
Run the cell below.
You *should* see an output like

`<pyspark.context.SparkContext at 0x...>`

where the digits after `0x` (a memory address) will differ on your machine.
If you don't see this, it won't be possible to proceed.

In [None]:
sc

### Note: How to reset the Spark cluster

If you run into strange errors, you might have to reset the cluster.
 1. In the ssh session, reattach to the notebook's screen with `screen -r nb`, then press Control + C, then Y and Enter to stop the notebook.
 2. Follow the instructions in the setup to restart the Spark cluster and launch the notebook.
 3. Refresh this page and run everything from the beginning.

# Music recommendations with PySpark

## 1. Load some modules (libraries)

The following cell imports the modules we need:

In [None]:
import os
import numpy as np
import time

## 2. Load the artist info

We will load the data in `/root/data/artist_data.txt` into Python itself as a *dictionary*
(a hash table of key-value pairs).
This allows us to make sense of the numeric IDs we will get in the analysis.

In [None]:
# Load the lines of the text into a list of strings
# The with-block closes the file automatically
with open('/root/data/artist_data.txt', 'r') as f:
    txt = f.read().split('\n')
txt[0:5]

In [None]:
# Process each line to a key-value pair
artist_ids = dict()
for line in txt:
    split = line.split('\t')
    if len(split) > 1:
        artist_ids[int(split[0])] = split[1]
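
The loop above skips lines that have no tab, but `int(split[0])` would still raise `ValueError` on a line whose ID is not a number. A slightly more defensive variant is sketched below as a plain Python function; the name `parse_artist_lines` and the sample lines are made up for illustration, and whether the real `artist_data.txt` contains such lines is an assumption, not something this tutorial states.

```python
def parse_artist_lines(lines):
    """Build an {artist_id: name} dict from 'id<TAB>name' lines.

    Hypothetical helper: lines without a tab, or whose ID is not an
    integer, are skipped instead of raising an error.
    """
    ids = {}
    for line in lines:
        parts = line.split('\t', 1)  # split only on the first tab
        if len(parts) < 2:
            continue  # no tab: not a key-value line
        try:
            key = int(parts[0])
        except ValueError:
            continue  # the ID field is not an integer
        ids[key] = parts[1].strip()  # drop surrounding whitespace from the name
    return ids

# Made-up sample lines in the same 'id<TAB>name' format
sample = ['1\tArtist One', 'not_an_id\tSomebody', '', '2\tArtist Two ']
print(parse_artist_lines(sample))  # {1: 'Artist One', 2: 'Artist Two'}
```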

In [None]:
# Check some IDs
artist_ids[1291], artist_ids[1989]

Chances are that some of your favorite artists are in the list.
To find them, build another dictionary for reverse lookup, mapping artist names back to IDs:

In [None]:
# Construct the reverse search dict
artist_to_id = {v: k for k, v in artist_ids.items()}  # .items() works in Python 2 and 3

In [None]:
# Enter your favorite artist below!
artist_to_id['The Beatles']
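
Looking up `artist_to_id['The Beatles']` raises a `KeyError` unless the name matches exactly, and if several IDs happen to share the same name, the reverse dictionary keeps only one of them. A hypothetical helper like `find_artists` below sidesteps both issues by searching the original `artist_ids` dictionary case-insensitively and returning every match. It is a sketch, shown here on a small made-up dictionary.

```python
def find_artists(ids, query):
    """Return sorted (name, id) pairs whose name contains `query`, ignoring case.

    Hypothetical helper; `ids` is an {artist_id: name} dict such as artist_ids.
    """
    q = query.lower()
    return sorted((name, k) for k, name in ids.items() if q in name.lower())

# Small made-up dict; in the notebook you would pass artist_ids instead
demo_ids = {1: 'The Beatles', 2: 'the beatles', 3: 'Beatles Tribute Band', 4: 'Queen'}
print(find_artists(demo_ids, 'beatles'))
# [('Beatles Tribute Band', 3), ('The Beatles', 1), ('the beatles', 2)]
```

In the notebook, `find_artists(artist_ids, 'beatles')` would list all candidate spellings along with their IDs.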

## 3. Load the data into Hadoop (in ssh)

Go back to your ssh session. We need to copy `user_artist_data.txt` into HDFS (the Hadoop Distributed File System).

Make sure you are detached from any screen session (press Control + A, then D). Then create a new screen session and go to the Hadoop `bin` folder:
```
screen -S hdfs
cd /root/ephemeral-hdfs/bin
```

Copy the local file into HDFS:
```
./hadoop fs -put /root/data/user_artist_data.txt data.txt
```

Check that you copied the file:
```
./hadoop fs -ls
```
You should see the file listed as `/user/root/data.txt`.

## 4. Load the file into Spark

The following code creates an RDD (resilient distributed dataset) with the raw strings from `data.txt`, one element per line.
The second argument, `4`, sets the minimum number of partitions (chunks) the data is split into.
You can change the number of partitions and see if you get a performance improvement.

In [None]:
rawdata = sc.textFile('data.txt', 4)

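Let's take a look at what was loaded. The `takeSample` command lets you peek at random contents of an RDD: `takeSample(True, 5)` returns 5 randomly chosen records to the notebook, and the first argument, `True`, means the sample is drawn with replacement.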

This might take a while! To pass the time, switch to the browser tab `XX.XX.XX.XXX:8080`, which shows the cluster status. There should be one running application: PySparkShell. Click on its name to see what is going on inside Spark.

In [None]:
rawlines = rawdata.takeSample(True, 5)

Why does it take so long? Spark uses *lazy evaluation*: `textFile` only records how to build the RDD, and the file is not actually read until an *action* such as `takeSample` needs the data. Let's run it again and note the time:
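
Python generators give a rough local analogy for lazy evaluation (an analogy only; Spark's scheduler does far more). In the sketch below, building the generator does no work at all; `slow_parse` (a made-up function) runs only when the results are consumed, much as Spark transformations only describe a computation until an action runs it.

```python
calls = []

def slow_parse(line):
    # Record each call so we can see when work actually happens
    calls.append(line)
    return len(line)

lines = ['a b c', 'dd ee', 'fff']

# Like a transformation: describes the work but does not run it yet
lengths = (slow_parse(x) for x in lines)
print(len(calls))  # 0: nothing has been computed yet

# Like an action: consuming the generator triggers the computation
total = sum(lengths)
print(len(calls))  # 3: one call per line
print(total)       # 13
```

One difference worth keeping in mind: a generator can be consumed only once, whereas an RDD is recomputed from its source every time an action runs, unless it has been cached.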

In [None]:
t1 = time.time()
rawlines = rawdata.takeSample(True, 5)
time.time() - t1

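Spark includes a `cache()` command, which keeps an RDD in memory once it has been computed, so later actions do not have to re-read the file. `cache()` is itself lazy: the data is stored the first time an action runs after it. Run the code below, then run the timing cell above a couple of times. Does the speed improve?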

In [None]:
# Mark rawdata to be kept in memory; cache() is lazy, so the data
# is actually stored the next time an action computes it
rawdata.cache()
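
To see why caching matters, here is a toy, purely local model. The class `ToyRDD` is hypothetical and is not part of PySpark: each action recomputes the data from its source unless `cache()` was called, and, as in Spark, `cache()` itself computes nothing; the next action fills the cache.

```python
class ToyRDD(object):
    """Hypothetical stand-in for an RDD that counts how often its source is read."""

    def __init__(self, source):
        self._source = source        # function that produces the data
        self._cache_enabled = False
        self._cached = None
        self.reads = 0               # number of times the source was read

    def cache(self):
        # Only marks the data for caching; nothing is computed here
        self._cache_enabled = True
        return self

    def _compute(self):
        if self._cached is not None:
            return self._cached      # served from memory
        self.reads += 1
        data = list(self._source())
        if self._cache_enabled:
            self._cached = data      # filled by the first action after cache()
        return data

    def count(self):
        # An "action": forces the computation
        return len(self._compute())


rdd = ToyRDD(lambda: ['line 1', 'line 2', 'line 3'])
rdd.count()
rdd.count()
print(rdd.reads)  # 2: recomputed for every action
rdd.cache()
rdd.count()
rdd.count()
print(rdd.reads)  # 3: one more read fills the cache, then it is reused
```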

Let's get the number of records in the RDD. The `one()` function maps every record to the number 1, and the `add()` function sums up all of the ones.

In [None]:
def one(x):
    return 1

def add(x, y):
    return x + y

nrecords = rawdata.map(one).reduce(add)
nrecords
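
The same map-then-reduce pattern can be tried on an ordinary Python list with the built-in `map` and `functools.reduce`, which is a handy way to check the logic of `one` and `add` before running it on the cluster. The sample lines below are made up. On a real RDD, the built-in action `rawdata.count()` gives the same number, and `operator.add` can stand in for the hand-written `add`.

```python
from functools import reduce
from operator import add  # same behavior as the add() defined above

def one(x):
    return 1

# Made-up lines standing in for the contents of the RDD
local_lines = ['1 10 5', '1 20 3', '2 10 7']

# map every record to 1, then sum the ones pairwise
nrecords_local = reduce(add, map(one, local_lines))
print(nrecords_local)  # 3
```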

#### Exercise:

Write and run a command in the empty line below to find the total number of characters in the raw data.

*Hint:* `len(x)` returns the number of characters in `x`.

In [None]:
# Your code below:
# write ncharacters = rawdata.map(??).reduce(??)


Scroll down for the answer
<br/><br/><br/><br/><br/><br/>
...
<br/><br/><br/><br/><br/><br/>
...
<br/><br/><br/><br/><br/><br/>
...
<br/><br/><br/><br/><br/><br/>
...
<br/><br/><br/><br/><br/><br/>
...
<br/><br/><br/><br/><br/><br/>
...
<br/><br/><br/><br/><br/><br/>
...

In [None]:
# ANSWER TO EXERCISE:
ncharacters = rawdata.map(len).reduce(add)

Let us compute the mean number of characters per line.

In [None]:
float(ncharacters)/nrecords

We got the above result by carrying out two MapReduce jobs: one to compute the number of records, and one to compute the total character count.

But can we get the same result in *one* MapReduce cycle?

This leads to a classic MapReduce pattern.

Map each line `line` to a tuple `(1, len(line))`.
The first element is the count and the second element is the average length of all the lines aggregated into that tuple (for a single line, simply its length).
The reduce step combines tuples `x` and `y` by adding their counts and averaging their mean lengths, *weighted* by their counts.
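
Written out, if tuple $x$ summarizes $n_x$ lines with mean length $m_x$, and tuple $y$ summarizes $n_y$ lines with mean length $m_y$, the reduce step computes

$$
n = n_x + n_y, \qquad m = \frac{n_x}{n}\, m_x + \frac{n_y}{n}\, m_y .
$$

Since $n_x m_x$ is the total length of the lines in $x$, we get $n m = n_x m_x + n_y m_y$, the total length of all the combined lines, so $m$ is exactly their mean no matter how Spark groups the lines before combining them.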

In [None]:
def count_len(line):
    # (count, mean length) for a single line
    return (1, len(line))

def mean_reducer(x, y):
    # Combine two (count, mean length) tuples into one
    sumcount = x[0] + y[0]
    return (sumcount, x[1]*float(x[0])/sumcount + y[1]*float(y[0])/sumcount)

In [None]:
rawdata.map(count_len).reduce(mean_reducer)
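
A quick local check on made-up lines: reducing the `(count, mean)` tuples with `mean_reducer` should agree with the direct mean whatever the grouping (the functions are repeated so the block runs on its own). The sketch also shows a common alternative, which is not the approach used above: carry `(count, total_length)` pairs, add them element-wise, and divide only once at the end.

```python
from functools import reduce

def count_len(line):
    return (1, len(line))

def mean_reducer(x, y):
    # Combine two (count, mean length) tuples into one
    sumcount = x[0] + y[0]
    return (sumcount, x[1]*float(x[0])/sumcount + y[1]*float(y[0])/sumcount)

def sum_reducer(x, y):
    # Alternative: combine (count, total length) tuples by element-wise addition
    return (x[0] + y[0], x[1] + y[1])

lines = ['1 10 5', '1 20 33', '2 10 7', '3 400 1234']  # made-up records
direct_mean = float(sum(len(line) for line in lines)) / len(lines)

pairs = list(map(count_len, lines))
left_to_right = reduce(mean_reducer, pairs)
# A different grouping, like two partitions reduced separately and then merged
split_groups = mean_reducer(reduce(mean_reducer, pairs[:1]),
                            reduce(mean_reducer, pairs[1:]))

count, total = reduce(sum_reducer, pairs)
print(left_to_right[1], split_groups[1], float(total) / count, direct_mean)
```

All four printed values should agree up to floating-point rounding. The sum-based version needs only integer additions in the reduce step, which is why it is often preferred.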

## 5. Process the raw data

The raw text data is not very useful to us.
We need to process the data into relevant (key, value) pairs.
What should we set as the key, and what should we set as the value?
This depends on the application at hand.

For now, let's narrow our focus to the artist and the counts while ignoring the users.
The following function will convert a raw line into a (key, value) pair with the key being an integer artist ID and the value being the integer count.

In [None]:
def raw_to_artist_count(line):
    line = str(line)
    parts = line.split(' ')
    if len(parts) != 3:
        return (-1, 0) # a (k, v) pair indicating error
    key = int(parts[1]) # the artist ID
    value = int(parts[2]) # the count
    return (key, value) # return the (k,v) pair
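
Before running the parser on the cluster, it can be checked on a few hand-written lines. The sample lines below are made up but follow the space-separated `user artist count` layout that `raw_to_artist_count` assumes; the function is repeated so the block runs on its own.

```python
def raw_to_artist_count(line):
    line = str(line)
    parts = line.split(' ')
    if len(parts) != 3:
        return (-1, 0)  # a (k, v) pair indicating error
    key = int(parts[1])    # the artist ID
    value = int(parts[2])  # the count
    return (key, value)

print(raw_to_artist_count('1000 42 7'))                # (42, 7)
print(raw_to_artist_count('malformed line here too'))  # (-1, 0)
```

Note that a three-field line with non-numeric fields would still raise `ValueError`. Also, any error pairs end up in the results under artist ID `-1`; one option is to drop them with `count_rdd.filter(lambda kv: kv[0] != -1)`.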

Check that it works correctly using the sampled text lines.

In [None]:
print(rawlines[0])
raw_to_artist_count(rawlines[0])

Using this function, we will make a new RDD with the (key, value) pairs:

In [None]:
count_rdd = rawdata.map(raw_to_artist_count).cache()

Let's check the results (and time it):

In [None]:
t1 = time.time()
sample_counts = count_rdd.takeSample(True, 5)
time.time() - t1

In [None]:
sample_counts

Now, can we find the *most popular* artist?
First we need to combine the counts for each artist.
The `reduceByKey` function is perfect for this task.

In [None]:
artist_count_rdd = count_rdd.reduceByKey(add)
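
Conceptually, `reduceByKey(add)` groups the pairs by key and folds each group's values with `add`. Spark does this in a distributed way, combining values within each partition before shuffling them across the cluster, but the result matches this local sketch on made-up pairs (the function `local_reduce_by_key` is hypothetical, not a PySpark API).

```python
from operator import add

def local_reduce_by_key(pairs, func):
    """Fold the values of each key with `func`, like RDD.reduceByKey."""
    result = {}
    for key, value in pairs:
        if key in result:
            result[key] = func(result[key], value)
        else:
            result[key] = value
    # Sorted only for a deterministic display; Spark guarantees no order
    return sorted(result.items())

pairs = [(979, 5), (1000, 2), (979, 10), (1000, 1), (42, 7)]
print(local_reduce_by_key(pairs, add))  # [(42, 7), (979, 15), (1000, 3)]
```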

In [None]:
t1 = time.time()
sample_totals = artist_count_rdd.takeSample(True, 5)
time.time() - t1

In [None]:
sample_totals

Now we need to sort in descending order.
The `sortBy` function allows us to sort by a numeric function of the (key, value) pairs.
Sorting by the negative value gives us a list by count in descending order.

In [None]:
def get_negative_value(x):
    return -x[1]

artist_count_rdd = artist_count_rdd.sortBy(get_negative_value)
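
Locally, the same descending order comes from Python's `sorted` with the same key function. On the RDD, `sortBy` also accepts `ascending=False`, and if only the top few artists are needed, the action `takeOrdered(n, key=get_negative_value)` returns them without a full sort. The block below mirrors both ideas on made-up totals, using `heapq.nsmallest` as a local stand-in for `takeOrdered`.

```python
import heapq

def get_negative_value(x):
    return -x[1]

totals = [(42, 7), (979, 15), (1000, 3), (7, 12)]  # made-up (artist, total) pairs

# Full sort in descending order of count, like sortBy(get_negative_value)
by_count = sorted(totals, key=get_negative_value)

# Just the top 2, like takeOrdered(2, key=get_negative_value)
top2 = heapq.nsmallest(2, totals, key=get_negative_value)

print(by_count)  # [(979, 15), (7, 12), (42, 7), (1000, 3)]
print(top2)      # [(979, 15), (7, 12)]
```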

Who is the most popular artist?

In [None]:
artist_count_rdd.first()

Artist #979 is the most popular. Any guesses as to who it is?

In [None]:
artist_ids[979]

Instead of converting IDs one at a time, we can convert all the IDs in the RDD into artist names.
IDs with no entry in `artist_ids` are kept as the string form of the ID.
Note that the local object `artist_ids` is used inside `convert_keys`, so it gets shipped to the Spark cluster along with the function.
If you look at the screen session `nb`, you may see messages about this object being "broadcast".

In [None]:
def convert_keys(x):
    # Replace the artist ID with its name; fall back to the ID as a string
    try:
        ans = (artist_ids[x[0]], x[1])
    except KeyError:
        ans = (str(x[0]), x[1])
    return ans

In [None]:
converted_count = artist_count_rdd.map(convert_keys)

In [None]:
converted_count.first()

## 6. Exporting the result to a text file

Now that we have produced a useful result, we would like to copy it back to our local computer.

This takes several steps.

  1. Convert the (key, value) pairs in the RDD to strings
  2. Write the RDD to HDFS
  3. Copy it from HDFS to the local disk of the master node
  4. Use `scp` to copy it from the master node to our local computer
  
The following function formats each (key, value) pair as a tab-delimited string:

In [None]:
def to_string(x):
    return str(x[0]) + '\t' + str(x[1])
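
A quick round trip shows what each output line will look like and how it could be parsed back once the file is on your computer. The inverse function `from_string` is hypothetical; it splits on the last tab so the count is recovered even if a name were to contain a tab.

```python
def to_string(x):
    return str(x[0]) + '\t' + str(x[1])

def from_string(s):
    # Hypothetical inverse of to_string: split on the last tab only
    name, count = s.rsplit('\t', 1)
    return (name, int(count))

line = to_string(('The Beatles', 123))
print(repr(line))         # 'The Beatles\t123'
print(from_string(line))  # ('The Beatles', 123)
```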

Now convert the RDD

In [None]:
output_rdd = converted_count.map(to_string)

Write the result to HDFS under the name `top_artists.txt`.

Note: if you have already done this before, first delete the old output with `./hadoop fs -rmr top_artists.txt` in the `hdfs` screen session, because `saveAsTextFile` fails if the output path already exists.

In [None]:
output_rdd.saveAsTextFile('top_artists.txt')

In the ssh session, reattach to the `hdfs` screen with `screen -r hdfs` and type
```
./hadoop fs -ls top_artists.txt
```
You will see that `top_artists.txt` is actually a directory containing one file per partition (`part-00000`, `part-00001`, etc.).

We can copy the directory to the local disk and merge all the separate parts into a single file with the command
```
./hadoop fs -getmerge top_artists.txt top_artists.txt
```

Check the result with `more top_artists.txt`.
Then move it to the home directory with `mv top_artists.txt ~/`.



### In another terminal tab (non-ssh)

Navigate to the directory where `class.pem` is stored. Then type:
```
scp -i class.pem root@XX.XX.XX.XXX:~/top_artists.txt .
```

## 7. Extra challenges

This concludes the tutorial.
If you finish early, have a look at the PySpark RDD API documentation:
 -  https://spark.apache.org/docs/1.2.0/api/python/pyspark.html#pyspark.RDD

and see if you can do the following:

 - Merge IDs which are equivalent according to `artist_alias.txt`
 - Find the mean and variance of the counts for each artist
 - Pick two artists and find out how many users listen to both vs. how many listen to only one of them
 - Adjust the number of partitions and the use of `cache()` at each step, and see how this affects performance
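
For the mean-and-variance challenge, one possible approach (a sketch, not the only one) extends the single-pass pattern from section 4: map each `(artist, count)` pair to `(artist, (1, count, count**2))`, add the triples per artist, then compute the mean $S/n$ and the population variance $Q/n - (S/n)^2$ from the totals $n$, $S$, $Q$. The block below runs that logic locally on made-up pairs, with a plain loop standing in for `reduceByKey`; on the cluster, the last step would be a `mapValues`. The helper names are hypothetical.

```python
def to_triple(pair):
    # (artist, count) -> (artist, (n, sum, sum of squares))
    artist, count = pair
    return (artist, (1, count, count * count))

def add_triples(a, b):
    # Element-wise sum of (n, sum, sum of squares)
    return (a[0] + b[0], a[1] + b[1], a[2] + b[2])

def mean_var(t):
    n, s, q = t
    mean = float(s) / n
    return (mean, float(q) / n - mean * mean)  # population variance

pairs = [(979, 2), (979, 4), (979, 6), (42, 5)]  # made-up (artist, count) pairs

# Local stand-in for map(to_triple).reduceByKey(add_triples).mapValues(mean_var)
acc = {}
for artist, triple in map(to_triple, pairs):
    if artist in acc:
        acc[artist] = add_triples(acc[artist], triple)
    else:
        acc[artist] = triple
stats = {artist: mean_var(t) for artist, t in acc.items()}
print(stats)
```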