- Apache Spark 1.5.2 compiled for Scala 2.11
- Hadoop 2.7.1
- Scala 2.11.7
- SBT 0.13.9
If you haven't used your local installation of Hadoop before, start by formatting HDFS.
$HADOOP_PREFIX/bin/hdfs namenode -format localnode
$HADOOP_PREFIX/sbin/hadoop-daemon.sh --config $HADOOP_CONF_DIR --script hdfs start namenode
$HADOOP_PREFIX/sbin/hadoop-daemon.sh --config $HADOOP_CONF_DIR --script hdfs start datanode
hadoop fs -mkdir -p /var/log
Resilient Distributed Datasets (RDDs):
- fault-tolerant, as in zero data loss
- are operated on in parallel: big data, cluster of workers
- contain collections of data elements: text, numbers, business entities, etc.
See the official Programming Guide for more details.
RDD Operations
- Transformations
- create new RDDs according to transformation rules
- are lazy, i.e. nothing happens until the changes are 'committed' by calling an action
- represent the Map part of the MapReduce pattern
- Actions
- trigger the transformation workflow
- return the aggregated result to the driver program (beware of memory consumption and resource utilization)
- represent the Reduce part of the MapReduce pattern
See the official Programming Guide for more details.
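The lazy/eager split can be illustrated without a cluster. The sketch below uses plain Scala lazy views as an analogy (this is not Spark's API): the view records the map function without running it, like a transformation, and a terminal operation forces the whole pipeline, like an action.

```scala
// Illustration of lazy transformations vs. eager actions, using Scala views.
object LazyPipeline {
  var mapCalls = 0

  // "Transformation": the mapping is recorded, not executed
  val pipeline = (1 to 5).view.map { x => mapCalls += 1; x * 2 }

  val before = mapCalls        // still 0: no element has been mapped yet
  val result = pipeline.sum    // "action": forces evaluation, returns 30
  val after  = mapCalls        // 5: the map ran once per element
}
```

An RDD behaves the same way: `rdd.map(...)` returns instantly, and only an action such as `collect` or `count` triggers the computation.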
A word count example is the 'hello world' of MapReduce and big data processing. The example parses a text file comprising ten paragraphs of random text (lorem ipsum) and produces a lexically ordered set of word counts.
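The core logic can be sketched with plain Scala collections, whose API closely mirrors the RDD API; the names below are illustrative and not taken from WordCount.scala. On an RDD you would use `map((_, 1)).reduceByKey(_ + _)` instead of the `groupBy` shown here.

```scala
// A minimal word-count sketch on local collections (mirrors the RDD chain).
object WordCountSketch {
  val lines = Seq("Lorem ipsum dolor", "lorem IPSUM amet")

  val counts: Seq[(String, Int)] =
    lines
      .flatMap(_.split("""\W+"""))                           // split lines into words
      .filter(_.nonEmpty)                                    // drop empty tokens
      .map(_.toLowerCase)                                    // normalize case
      .groupBy(identity)                                     // group equal words
      .map { case (word, occurrences) => (word, occurrences.size) }
      .toSeq
      .sortBy(_._1)                                          // lexical order, as in the output below
}
```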
Output:
(a,9)
(ac,13)
(accumsan,1)
(ad,1)
(adipiscing,1)
(aenean,6)
(aliquam,7)
..
(lorem,5)
(luctus,5)
(maecenas,1)
(magna,8)
..
Source: WordCount.scala, WordCountTest.scala
The input file is located in the resources directory. Please copy it to HDFS as follows:
hadoop fs -copyFromLocal src/main/resources/loremipsum.txt /var/log/
Run the following command in the project root directory:
sbt clean assembly
Assuming your local installation of Hadoop is up and running, the text file has been copied to HDFS, and the project has been built, you are ready to submit the application to Spark. To do so, run the following command in the project root directory:
$SPARK_HOME/bin/spark-submit --class "basic.WordCount" target/scala-2.11/spark-by-example-assembly-1.0.jar
Once Spark execution is done, the resulting word counts are stored as text files (two or more) in HDFS. Here is how to view one of the result files without moving it from HDFS to your local filesystem:
hadoop fs -cat '/var/out/wordcount/part-00000' | less
Don't forget to delete the output HDFS location before submitting the job again.
hadoop fs -rm -r /var/out/wordcount/
Accumulators:
- counters or sums that can be reliably used in parallel processing
- native support for numeric types, extensions possible via API
- workers (transformations) can modify, but cannot read
- only a driver (action) can read the accumulated value
See the official Programming Guide for more details.
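The write-only worker / read-only driver split can be illustrated with a plain-JVM analogy (this is not Spark's API): many parallel tasks may only add to a counter, and a single coordinator reads the total once the work is done.

```scala
import java.util.concurrent.atomic.LongAdder

// Accumulator-style counting: workers add, only the coordinator reads.
object AccumulatorSketch {
  val totalWords = new LongAdder  // add-only from workers, like an accumulator

  val workers = (1 to 4).map { _ =>
    new Thread(new Runnable {
      def run(): Unit = (1 to 1000).foreach(_ => totalWords.add(1))
    })
  }
  workers.foreach(_.start())
  workers.foreach(_.join())

  // Read only after the parallel phase completes, like the driver in an action
  val total: Long = totalWords.sum()
}
```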
Broadcast Variables:
- allow for efficient sharing of potentially large data sets
- workers have read-only access
- useful for reference data lookups
See the official Programming Guide for more details.
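The idea can again be illustrated without a cluster (this is not Spark's API): a single read-only data set is shared by every parallel task instead of being copied into each one. In Spark this would be `sc.broadcast(commonWords)`, with `commonWords.value` read inside the closure.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// Broadcast-style lookup: every task reads the same shared set, none mutates it.
object BroadcastSketch {
  val commonWords: Set[String] = Set("the", "a", "of", "and")  // shared, read-only

  // Three "partitions" processed by parallel tasks
  val partitions = Seq(Vector("the", "nautilus"), Vector("and", "captain"), Vector("a", "nemo"))

  val futures = partitions.map(p => Future(p.filterNot(commonWords.contains)))
  val interesting = Await.result(Future.sequence(futures), 10.seconds).flatten
}
```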
The problem of text analysis expands on the word count example from the previous section.
Accumulators are used to collect basic, admittedly not very exciting, facts about the analysed text, such as the total number of characters and words:
class TextAnalyser(val sc: SparkContext, ...) {
  ...
  // Instance variables
  val _totalChars = sc.accumulator(0, "Total Characters")
  val _totalWords = sc.accumulator(0, "Total Words")
  ...
  // In a worker thread
  def analyse(rdd: RDD[String]): TextStats = {
    ...
    // This limits serialization to the reference variables only
    val totalChars = _totalChars
    val totalWords = _totalWords
    ...
    // Accumulators can be safely used in parallel computations
    .map(x => {totalWords += 1; x})
    ...
Arguably the most exciting part is an effort to capture the essence of a piece of English text, such as a book, within the N most frequently used words.
First and foremost, casual expressions that don't carry substantial information need to be filtered out. The English Club helped me arrive at a list of common words I chose to skip: commonwords.txt.
The actual word count naturally makes use of the logic implemented in the word count example. However, minor tweaks apply:
- word count pairs are sorted by counts rather than alphabetically
- descending sort order guarantees the most frequent pairs are always on top of the list
This is where broadcast variables come into play. The list of common words could potentially grow long, and it would be inefficient to create a copy in each and every worker. Using a broadcast variable helps performance via caching and reduced network traffic thanks to a specialized broadcast protocol. It's worth mentioning that broadcast data is subject to (de)serialization.
Code excerpt:
class TextAnalyser(val sc: SparkContext, ...) {
  ...
  val _commonWords = sc.broadcast(TextAnalyser.loadCommonWords())
  ...
  // Like accumulators, a Broadcast is a wrapper; the 'value' method provides access to the actual data
  .filter(!commonWords.value.contains(_)) // Filter out common words
  .map((_, 1))
  .reduceByKey(_ + _)
  .sortBy(_._2, ascending = false)
Now, the fun part. Forget the boring 'loremipsum' and reach for a genuine masterpiece, such as 20,000 Leagues Under the Sea by Jules Verne, courtesy of textfiles.com. Here is what the text analyser concluded about the remarkable book:
characters: 568889, words: 101838, the most frequent words:
(captain,564)
(nautilus,493)
(nemo,334)
(ned,283)
(sea,273)
Source: TextAnalyser.scala, TextAnalyserTest.scala
20,000 Leagues Under the Sea by Jules Verne
The example below is for Mac OS X; your 'Downloads' directory location might differ.
hadoop fs -copyFromLocal ~/Downloads/2000010.txt /var/log/
Run the following command in the project root directory:
sbt clean assembly
$SPARK_HOME/bin/spark-submit --class "basic.TextAnalyser" target/scala-2.11/spark-by-example-assembly-1.0.jar
The results are printed directly to the console, so no extra work is needed.