
Spark AWS Exercise1

Jason Baldridge edited this page Apr 3, 2013 · 3 revisions

Introduction

This exercise shows you how to work with Spark, a distributed computation framework that is geared toward the kinds of iterative algorithms commonly used in machine learning. Programs written with it are also far more concise than their Hadoop equivalents. Spark itself is written in Scala, but it provides Scala, Java and Python APIs.

There is lots of excellent documentation for Spark, including:

However, it can be hard for those new to the big data game to get started because usually there are various things left unsaid or assumed. This guide will attempt to take you from virtually zero to computing on the cloud with plenty of sign-posts along the way.

With this tutorial, I'll attempt to give the steps that you need to complete things without lots of extra distracting information. In many cases, others already have extensive documentation that is relevant. If that documentation is perfect, I'll just provide the link. However, in many cases, I'll say what you need to do and then give a link with the text "additional documentation" after my own description. These links provide useful supplementary information, but be prepared for a lot of material that is not immediately relevant or is more detailed than you really need.

Writing and running programs for big data truly do fall squarely under the term cloud computing (which is used loosely these days for just about anything that involves running code on other machines). Rather than running programs on the machine in front of you, it is possible to requisition machines from providers such as Amazon and Rackspace. With this model, you are essentially renting computation and storage, which typically reduces the costs for using large clusters of computers that you would otherwise have to purchase, install, and maintain yourself.

A key concept here is the cluster, which is composed of a master computer (node) and multiple worker (a.k.a. "slave") nodes. The master is a designated node that directs the work to be done and makes sure that all of the nodes have work to do; the workers then perform those tasks to the best of their abilities. I prefer the term 'worker', but 'slave' is the term Spark primarily uses.

Setup

This tutorial assumes that you will use Amazon Web Services (AWS). For this, you'll need to sign up for an AWS account and provide a valid credit card so that Amazon can charge you for your usage. When you first sign up for AWS, you are on the Free Usage Tier, and the actions in this tutorial should fall well below the limits for free usage, so doing this tutorial should not cost you anything. Otherwise, you may end up paying something like $5-$20 to complete it.

For their EC2 (Elastic Compute Cloud) service, AWS has compute nodes available in several different regions. For this tutorial, you should work with the region referred to as "US East", which is also indicated as "North Virginia" in some menus. (This will make it easier to work with Spark since some of the tools for creating Spark clusters on Amazon assume you are using this region.)

Because you will be accessing other computers remotely, you need to provide identification that proves you have the right to log in to them. To do this, you need to create both an access key and a key pair.

To create your access key, go to the security credentials page for your account. On the tab for "Access Keys", click "Create a new Access Key" (additional documentation). This will produce two values: an Access Key ID and a Secret Access Key. In your .profile or .bash_aliases file, add the following export statements, using the values for the access key you just created.

export AWS_ACCESS_KEY_ID=<your access key id>
export AWS_SECRET_ACCESS_KEY=<your secret access key>
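If you want to confirm that both values are actually visible to programs started from your shell (which is what spark-ec2 needs), a small check along these lines may help. This is only a sketch: check_aws_env is a name I made up for illustration, and it deliberately reports only whether each variable is exported, never its value.

```shell
# Report whether each AWS credential variable is exported.
# check_aws_env is a hypothetical helper name, not part of AWS or Spark.
check_aws_env() {
  missing=0
  for name in AWS_ACCESS_KEY_ID AWS_SECRET_ACCESS_KEY; do
    # A child shell only sees exported variables, so this tests "exported
    # and non-empty" without ever printing the secret itself.
    if sh -c "[ -n \"\${$name:-}\" ]"; then
      echo "$name is exported"
    else
      echo "$name is MISSING"
      missing=1
    fi
  done
  return "$missing"
}
```

Open a new terminal (or source the file you edited) and run check_aws_env; both lines should end with "is exported".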

Next, you need to create a key pair. The key pair must be associated with a specific AWS region. As mentioned previously, we are working in the us-east-1 region, so go to the AWS page for creating key pairs for us-east-1 and click on the "Create Key Pair" button. It will ask you for a name; use "spark-tutorial". After you click the "create" button, a file named spark-tutorial.pem will be downloaded. Put that file into a directory called .aws in your home directory (create the directory if necessary), and then restrict its permissions so that only your own user can access it, e.g.:

$ chmod 600 ~/.aws/spark-tutorial.pem 
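The reason for this step is that ssh generally refuses to use a private key that other users on the machine can read. A quick way to double-check the result is sketched below; perm_string and is_private are hypothetical helper names of my own, and the check simply inspects the permission string that ls reports.

```shell
# Print the ten-character permission string of a file, e.g. "-rw-------".
perm_string() {
  ls -ld "$1" | cut -c1-10
}

# Succeed only when group and others have no access to the file.
# is_private is a hypothetical helper name used for illustration.
is_private() {
  case "$(perm_string "$1")" in
    ????------) return 0 ;;
    *) return 1 ;;
  esac
}
```

After the chmod above, perm_string ~/.aws/spark-tutorial.pem should print -rw-------, and is_private ~/.aws/spark-tutorial.pem && echo "key file is private" should print its message.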

There is some additional documentation on this step.

Finally, you need to get Spark. Here, we'll use Spark 0.7.0. Download the Spark 0.7.0 source package and unpack it somewhere convenient on your file system.

Checklist of things you need to proceed:

  • AWS account
  • Access keys created, and their values exported for your shell
  • EC2 us-east-1 key pair, and the .pem file put into your ~/.aws directory with permissions that restrict access to your own user (chmod 600).
  • Spark 0.7.0 source distribution

Running Spark interactively on EC2

Having created the above credentials, we can now spin up machines on Amazon EC2. There are a number of ways of doing this, including using Amazon's web interface. But since Spark provides a convenient command-line tool called spark-ec2 that makes this easy, let's use that. It is located in the ec2 directory of the Spark distribution. (I'll assume that you are in that directory to run the commands that follow.)

Note: while this guide shows how to run Spark on EC2 and perform some specific actions, you should also consider consulting the primary documentation, Running Spark on EC2.

The first thing we need to do is launch a cluster by requesting compute nodes.

$ ./spark-ec2 -k spark-tutorial -i ~/.aws/spark-tutorial.pem -s 1 -w 360 launch SparkTutorial

This specifies the name of the key pair (with -k), the key file (-i), and the number of workers (-s). The -w option specifies the number of seconds the setup will wait for the instances to be ready (-w 360 allows it to wait for 6 minutes; the default is 2 minutes).
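If you expect to launch more than once, it can help to keep the settings in named variables so that each flag is labeled. The following is a sketch rather than anything provided by Spark: the variable names and launch_command are my own, and the function only prints the command instead of running it.

```shell
KEY_NAME=spark-tutorial                     # -k: name of the EC2 key pair
KEY_FILE="$HOME/.aws/spark-tutorial.pem"    # -i: private key file for that pair
WORKERS=1                                   # -s: number of worker nodes
WAIT_MINUTES=6                              # converted to seconds for -w
CLUSTER=SparkTutorial                       # name used by later spark-ec2 commands

# Print (but do not run) the launch command built from the settings above.
# launch_command is a hypothetical helper name used for illustration.
launch_command() {
  echo "./spark-ec2 -k $KEY_NAME -i $KEY_FILE -s $WORKERS -w $((WAIT_MINUTES * 60)) launch $CLUSTER"
}

launch_command
```

With these values the printed command uses -w 360, matching the one above; once it looks right, you can copy it to the prompt and run it from the ec2 directory.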

This process will take a while (5-10 minutes) as the nodes are obtained, the Spark AMI (Amazon Machine Image) is installed, software is updated on the fresh instances, and various other setup processes are completed. While this is ongoing, you can monitor progress on the EC2 Dashboard for your us-east-1 instances.

If everything is going well, you'll see logging messages about launching the instances, copying SSH keys, waiting (the time indicated by -w), deploying files to the master node, initializing ephemeral and persistent HDFS (Hadoop Distributed File System), configuring many things, setting up ganglia, etc. If everything went well, the last message will be "Done!". However, it is quite possible that you'll encounter some problems at this stage, e.g.:

  • AWS might reject your request for instances (for example, they might say there are no instances available). One strategy that sometimes works is to wait and try again.
  • It waits for the period of time indicated with -w, but then has an SSH failure. In that case, increase the wait time.
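The "wait and try again" strategy can be automated with a small retry loop. This is a generic sketch, not a spark-ec2 feature: retry is a hypothetical helper, and the attempt count and pause used in the usage note are arbitrary values chosen for illustration.

```shell
# retry ATTEMPTS PAUSE_SECONDS COMMAND [ARGS...]
# Run COMMAND until it succeeds or ATTEMPTS runs have failed.
# retry is a hypothetical helper name, not part of Spark or AWS.
retry() {
  attempts=$1
  pause=$2
  shift 2
  n=1
  while :; do
    "$@" && return 0
    [ "$n" -ge "$attempts" ] && return 1
    echo "attempt $n failed; retrying in ${pause}s" >&2
    sleep "$pause"
    n=$((n + 1))
  done
}
```

For example, retry 3 60 ./spark-ec2 -k spark-tutorial -i ~/.aws/spark-tutorial.pem -s 1 -w 600 launch SparkTutorial would make up to three attempts, one minute apart, with a longer wait. Before relying on this, it is worth checking the EC2 Dashboard after a failure, since a failed attempt may have left instances running.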

If the launch is successful, find the address of the master node by using the get-master command:

$ ./spark-ec2 -k spark-tutorial -i ~/.aws/spark-tutorial.pem get-master SparkTutorial
Searching for existing cluster SparkTutorial...
Found 1 master(s), 1 slaves, 0 ZooKeeper nodes
ec2-50-19-34-150.compute-1.amazonaws.com

You will of course have a different address. You can now go to port 8080 for that address to see the web UI for your cluster. For example, for the cluster I obtained above, it would be:

http://ec2-50-19-34-150.compute-1.amazonaws.com:8080/
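To avoid copying the address by hand, you can build the URL from the get-master output. The sketch below assumes, based on the sample output above, that the master's hostname is the last line printed; web_ui_url is a hypothetical helper name.

```shell
# Read get-master output on stdin and print the cluster web UI URL.
# Assumes the hostname is the last line, as in the sample output above.
web_ui_url() {
  host=$(tail -n 1)
  echo "http://$host:8080/"
}
```

Usage would look like ./spark-ec2 -k spark-tutorial -i ~/.aws/spark-tutorial.pem get-master SparkTutorial | web_ui_url.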

More usefully, you can now ssh to that machine.

$ ./spark-ec2 -k spark-tutorial -i ~/.aws/spark-tutorial.pem login SparkTutorial

This will log you into your cluster and give you a prompt. If you list the contents of the directory, you'll see the following.

[root@ip-10-114-98-53 ~]# ls
ephemeral-hdfs  mesos      persistent-hdfs  shark-0.2  spark-ec2
hive-0.9.0-bin  mesos-ec2  scala-2.9.2      spark

Enter the spark directory and load up the Spark shell.

[root@ip-10-114-98-53 ~]# cd spark
[root@ip-10-114-98-53 spark]# ./spark-shell 

We'll start by counting the words in the Spark README file in the master node's filesystem. Go into paste mode (with :paste) and enter the following.

val file = sc.textFile("README.md")
val counts = file
  .flatMap(line => line.split(" "))
  .map(word => (word, 1))
  .reduceByKey(_ + _)
  .collect

This is all like the previous word count example, except that the Spark Scala REPL provides a SparkContext (via sc) and we turn the RDD resulting from reduceByKey into an Array by using collect. Let's get the most frequent words:

scala> counts.sortBy(-_._2).take(10).foreach(println)
(,35)
(the,25)
(to,17)
(you,9)
(Spark,8)
(run,6)
(of,6)
(on,6)
(or,6)
(##,5)
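The entry with nothing before the comma is the empty string, which split(" ") produces wherever two spaces occur in a row or a line is blank. As a sanity check, you can approximate the same count with standard Unix tools in a regular shell on the master node. This is a rough sketch, and token_counts is a hypothetical helper name; the count for the empty token can differ slightly from Spark's, because Scala's split drops trailing empty strings on lines that end with a space.

```shell
# Count space-separated tokens read from stdin and print them as
# "(token,count)" lines, most frequent first.
token_counts() {
  tr ' ' '\n' |
    LC_ALL=C sort |
    uniq -c |
    LC_ALL=C sort -k1,1nr |
    awk '{ n = $1; sub(/^ *[0-9]+ ?/, ""); print "(" $0 "," n ")" }'
}
```

From the spark directory, token_counts < README.md | head -10 should give a list close to the one above, though ties may appear in a different order.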

Running Spark on a file in the distributed file system

Next, we'd like to be able to do the same using a file on the distributed file system. This can be one of the more confusing aspects when one is starting to work with big data, but it is actually fairly straightforward. Basically, your cluster has an HDFS set up and ready to use, but it has nothing on it at first. The way you interact with the HDFS is by using the fs command of the hadoop executable in ephemeral-hdfs/bin. For example, after you change to the home directory, the following will list the contents of the HDFS.

[root@ip-10-114-98-53 ~]# ephemeral-hdfs/bin/hadoop fs -ls
ls: Cannot access .: No such file or directory.

Oops! There aren't any files in the system yet. Let's put one there.

[root@ip-10-114-98-53 ~]# ephemeral-hdfs/bin/hadoop fs -put spark/LICENSE Spark-license.txt

So, this puts the Spark license file onto HDFS. (There are many other commands for working with HDFS, including ones for getting files onto and off of it.)
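Since every HDFS command goes through the same long path, a small wrapper can save typing. This is a sketch under the assumption that the hadoop executable lives in ephemeral-hdfs/bin under your home directory, as in the listing above; hfs and HADOOP_BIN are names I chose for illustration.

```shell
# Location of the hadoop executable; the default matches the layout shown above.
HADOOP_BIN=${HADOOP_BIN:-$HOME/ephemeral-hdfs/bin/hadoop}

# hfs ARGS... runs "hadoop fs ARGS..." from any directory.
# hfs is a hypothetical wrapper name, not a command shipped with Hadoop.
hfs() {
  "$HADOOP_BIN" fs "$@"
}

# Examples (run on the master node):
#   hfs -ls                                 # list your HDFS home directory
#   hfs -cat Spark-license.txt | head -5    # print the start of a file on HDFS
#   hfs -get Spark-license.txt copy.txt     # copy a file from HDFS to local disk
#   hfs -rm Spark-license.txt               # delete a file from HDFS
```

The examples are left as comments so that pasting the block does not change anything on HDFS.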

Let's list the contents of HDFS again.

[root@ip-10-114-98-53 ~]# ephemeral-hdfs/bin/hadoop fs -ls
Found 1 items
-rw-r--r--   3 root supergroup       1571 2013-04-03 04:42 /user/root/Spark-license.txt

I of course could have kept the name LICENSE, but I gave it a new name on HDFS so that it is clear which file is which in what follows. (A common beginner error is to try working with a file in HDFS, but to pick up a file that is on the local file system with the same name.)

Let's now count the words in that file. In the Spark REPL, get the RDD for the file as follows (substituting your machine address at the appropriate place).

scala> val license = sc.textFile("hdfs://ec2-50-19-34-150.compute-1.amazonaws.com:9000/user/root/Spark-license.txt")
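The address has three parts: the master's hostname, port 9000, and the path of the file on HDFS (relative names given to hadoop fs end up under /user/root, as the listing above shows). The helper below is a sketch that assembles such an address so you can paste it into the REPL; hdfs_uri is a hypothetical name, and the port and home directory are taken from the examples in this tutorial.

```shell
# hdfs_uri HOST PATH: print the hdfs:// address for a file on the cluster.
# Relative paths are resolved against /user/root, root's HDFS home directory.
hdfs_uri() {
  host=$1
  path=$2
  case "$path" in
    /*) ;;                           # already absolute
    *) path="/user/root/$path" ;;    # relative name: prepend the home directory
  esac
  echo "hdfs://$host:9000$path"
}
```

For example, hdfs_uri ec2-50-19-34-150.compute-1.amazonaws.com Spark-license.txt prints the address used in the line above.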

Getting the word counts is then the same as it was before.

scala> val licenseCounts = license.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey(_ + _).collect

So, now we've run word count using Spark on HDFS. Woohoo! Except that was still a tiny little file. Let's do a slightly bigger one that is not sitting on the master node. A good candidate for this is the 6.2M file big.txt from Peter Norvig's excellent spelling correction tutorial. The following commands will fetch the file from Peter's site, and then put it onto HDFS. (Again, I give it a different name to make sure we are using the right file when we do word count.)

[root@ip-10-169-1-223 ~]# wget http://norvig.com/big.txt
[root@ip-10-169-1-223 ~]# ephemeral-hdfs/bin/hadoop fs -put big.txt spellcorrect-big.txt
[root@ip-10-169-1-223 ~]# ephemeral-hdfs/bin/hadoop fs -ls
Found 2 items
-rw-r--r--   3 root supergroup       1571 2013-04-03 16:27 /user/root/Spark-license.txt
-rw-r--r--   3 root supergroup    6488666 2013-04-03 16:29 /user/root/spellcorrect-big.txt

Note: Getting data onto EC2 is not free, so this process of getting that file just cost you some pennies.

Now, in the Spark REPL, do the following (make sure to substitute your machine's information as before).

scala> val bigger = sc.textFile("hdfs://ec2-107-20-38-124.compute-1.amazonaws.com:9000/user/root/spellcorrect-big.txt")
scala> val biggerCounts = bigger.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey(_ + _).collect

To check you got the same result, here is some of my output.

scala> biggerCounts.length
res0: Int = 81410

scala> biggerCounts.sortBy(-_._2).take(10).foreach(println)
(the,71744)
(,68870)
(of,39169)
(and,35968)
(to,27895)
(a,19811)
(in,19515)
(that,11216)
(was,11129)
(his,9561)

scala> val theCount = biggerCounts.toSeq.toMap.apply("the")
theCount: Int = 71744

scala> val numTokens = biggerCounts.unzip._2.sum
numTokens: Int = 1164553

scala> val theProb = theCount/numTokens.toDouble
theProb: Double = 0.06160647046549191
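The .toDouble in the last step matters: dividing one Int by another discards the fractional part, so the result would have been 0. The same effect is easy to see in the shell, as in this small sketch (int_ratio and float_ratio are hypothetical helper names).

```shell
# Integer division, like Int / Int in Scala: the fractional part is discarded.
int_ratio() {
  echo $(( $1 / $2 ))
}

# Floating-point division, like theCount / numTokens.toDouble.
float_ratio() {
  awk -v c="$1" -v n="$2" 'BEGIN { printf "%.6f\n", c / n }'
}

int_ratio 71744 1164553      # prints 0
float_ratio 71744 1164553    # prints 0.061606
```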

Okay, so that still isn't a very big file, but it doesn't matter: it all works the same, as long as your file fits on HDFS.

Writing results out to disk and retrieving them

More often, you run programs that read data in and output the results to disk, rather than running code interactively. Spark makes this easy as well. For example, the following will do the word counts on spellcorrect-big.txt and then dump them to disk.

scala> :paste
val bigger = sc.textFile("hdfs://ec2-107-20-38-124.compute-1.amazonaws.com:9000/user/root/spellcorrect-big.txt")
val biggerCounts = bigger
  .flatMap(line => line.split(" "))
  .map(word => (word, 1))
  .reduceByKey(_ + _)
biggerCounts.saveAsTextFile("hdfs://ec2-107-20-38-124.compute-1.amazonaws.com:9000/user/root/spellcorrect-counts")

Now, in a shell in the root directory on the master node, check the contents of the ephemeral HDFS.

[root@ip-10-169-1-223 ~]# ephemeral-hdfs/bin/hadoop fs -ls
Found 3 items
-rw-r--r--   3 root supergroup       1571 2013-04-03 16:27 /user/root/Spark-license.txt
-rw-r--r--   3 root supergroup    6488666 2013-04-03 16:29 /user/root/spellcorrect-big.txt
drwxr-xr-x   - root supergroup          0 2013-04-03 16:58 /user/root/spellcorrect-counts

The results have been written to the directory spellcorrect-counts. Looking into that directory, we have a single file part-00000.

[root@ip-10-169-1-223 ~]# ephemeral-hdfs/bin/hadoop fs -ls /user/root/spellcorrect-counts
Found 1 items
-rw-r--r--   3 root supergroup    1099915 2013-04-03 16:58 /user/root/spellcorrect-counts/part-00000

We only have one worker node---otherwise there would be more parts, one for each reducer (try again with a new cluster that has multiple workers). Each part gives, well, part of the solution. We can get a single file using get.

[root@ip-10-169-1-223 ~]# ephemeral-hdfs/bin/hadoop fs -get /user/root/spellcorrect-counts/part-00000 counts.txt
[root@ip-10-169-1-223 ~]# head -5 counts.txt 
(,68870)
(deadly.,1)
(time?,2)
(buildings--schools,,1)
(Phosphorus,1)
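Each line of the file is a (word,count) pair, so it can be processed with ordinary tools. As a sketch, the following recomputes a word's relative frequency from such a file; rel_freq is a hypothetical helper name, and it treats everything before the last comma as the word, since tokens like "buildings--schools," contain commas themselves.

```shell
# rel_freq WORD < counts.txt
# Print "count/total = probability" for WORD, reading "(word,count)" lines.
rel_freq() {
  awk -v target="$1" '
    {
      line = substr($0, 2, length($0) - 2)   # drop the surrounding parentheses
      cut = match(line, /,[0-9]+$/)          # the count follows the last comma
      word = substr(line, 1, cut - 1)
      count = substr(line, cut + 1) + 0
      total += count
      if (word == target) hits = count
    }
    END { if (total > 0) printf "%d/%d = %.6f\n", hits, total, hits / total }
  '
}
```

Running rel_freq the < counts.txt should agree with the theProb value computed in the REPL earlier.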

When there are multiple files (e.g. from when you are using more nodes to handle larger input data), it is more convenient to use getmerge, which gets all the files in an HDFS directory and copies them to a single file in the normal file system.

[root@ip-10-169-1-223 ~]# ephemeral-hdfs/bin/hadoop fs -getmerge /user/root/spellcorrect-counts countsmerge.txt

Of course, in this case, counts.txt and countsmerge.txt are equivalent, but this will not usually be the case since you'll have multiple part-XXXXX files.
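Conceptually, getmerge does little more than concatenate the part files. If you have already copied a whole output directory to the local file system, you can get roughly the same effect with cat, as in this sketch (merge_parts is a hypothetical helper name, and it works on local files only).

```shell
# merge_parts DIR OUT: concatenate DIR/part-* (in name order) into OUT.
# Works on local files only; it is a stand-in for what getmerge does on HDFS.
merge_parts() {
  cat "$1"/part-* > "$2"
}
```

For example, merge_parts spellcorrect-counts countsmerge.txt would combine part-00000, part-00001, and so on from a local copy of the directory.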

Spark also allows you to write directly to the standard file system, bypassing HDFS for the output. For example, if the last line above were the following:

biggerCounts.saveAsTextFile("/root/spellcorrect-counts-local-fs")

The output would then be sitting right there on the local file system, with no need to get it off HDFS. For this simple word counting example, that probably makes more sense, but in other cases, you are going to store some large result to HDFS so that another Spark job can pick it up and compute something useful with it.

Wrap-up

Make sure to tear down your cluster so that you don't pay for machines you aren't using.

$ ./spark-ec2 -k spark-tutorial -i ~/.aws/spark-tutorial.pem destroy SparkTutorial

You can verify that the instances have been properly shut down by looking at your EC2 Dashboard for us-east-1 instances. If you refresh your browser several times, you'll see their state value going from "running" to "shutting-down" to "terminated".