# Instructions and Exercises

## Introduction

These notes contain instructions and questions for the labs portion of the "Big Data: tools and statistics" course. Within this document, command-line steps are presented as follows:

In [None]:
scala>

All commands will be in a separate grey "cell" (as above).
<br><br>
Exercises will be listed as a bulleted item and italicized. For example:

<ul><li><i>Create a new directory in your home directory called sample. Upload data.csv into the sample directory on HDFS.</i></li></ul>

## Objectives

In this lab you will be expected to achieve the following:
<ol>
<li>Create a Spark Resilient Distributed Dataset (RDD)
<li>Transform an RDD
<li>Perform an action on an RDD
<li>Perform MapReduce operations in Spark
<li>Understand and use MLlib for statistical analysis of data
</ol>

# Connecting to the cluster

To connect to the Spark cluster from a Windows PC, follow these instructions:
<ol>
<li>Start PuTTY (type putty into the search bar on the Start menu).</li>
<li>In the hostname textbox, type the HOSTNAME field provided on the printout, ensuring that Connection type is set to SSH.</li>
<li>Click Open.</li>
<li>On first connection, you will be asked a question about connection security. Please click "yes".</li>
<li>You will be asked for your username and password.</li>
<li>The username is trainingN (with N replaced by your allocated number); the password is provided on the printout.</li>
</ol>

To connect to the Spark cluster from a Mac, follow these instructions:
<ol>
<li>Start Terminal (type terminal into the spotlight search bar).</li>
<li>Type the following command, followed by Enter, replacing trainingN with your username (N is your allocated number) and HOSTNAME with the value provided on the printout:
<br>ssh trainingN@HOSTNAME </li>
<li>On first connection, you will be asked a question about connection security. Please type "yes".</li>
<li>You will be asked for your password, which is provided on the printout.</li>
</ol>


## Exercises

### Exercise 1

You will now connect to the Spark REPL, using the following command:

In [None]:
spark-shell

After a short delay, you will be presented with a new prompt that looks as follows (NB: it is safe to ignore any error messages when Spark loads):

In [None]:
scala>

To avoid ambiguity, all Spark commands listed within this document will follow this <i>scala</i> prompt.<br>
As part of the Spark startup process, a Spark context (sc) is created, which is the main interface to the requested Spark environment. To view the available functions, type the following and then immediately press the Tab key:

In [None]:
scala> sc.

You will now create an RDD by loading the Heathrow airport temperature data:

In [None]:
scala> val heathrowFile = sc.textFile("hdfs:///user/USERNAME/temperatureData/heathrowdata.txt")

Note that <b>USERNAME</b> will need to be replaced with your compute cluster username (the one you logged in with).
<br><br>
As displayed in the console, the value heathrowFile is of type org.apache.spark.rdd.RDD[String], that is, the function textFile has created an RDD of type String (as one would perhaps expect when reading a file of text). Due to Spark’s lazy evaluation model, the file has not been read; the file has not even been checked to see if it exists! This only happens when an action is performed. You will now bring back 5 lines of data to the console and print each line, to force Spark to undertake an <i>action</i>.

In [None]:
scala> val fiveLines = heathrowFile.take(5)

In [None]:
scala> fiveLines.foreach(println)

The first line creates a value fiveLines by taking the first 5 elements of the RDD (lines, in the case of a textFile). fiveLines is of type Array[String] because the data is now held in the memory of the local console, rather than being managed by Spark as an RDD. The second line calls the function println on each element of fiveLines, which displays the data.
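The lazy behaviour described above can be imitated on a single machine. The sketch below is an analogy only: it uses a plain Scala Iterator rather than an RDD, and the line contents and the counter linesRead are made up for illustration. It shows that a map does no work until something (here take followed by toList) actually asks for elements.

```scala
// Analogy only: a plain Scala Iterator stands in for the RDD.
// The lines and the counter below are hypothetical.
var linesRead = 0

val source = Iterator("line1", "line2", "line3", "line4", "line5", "line6", "line7")

// Like an RDD transformation, map on an Iterator is lazy: nothing runs yet.
val lazyLines = source.map { line => linesRead += 1; line }
println(s"lines read after map: $linesRead")   // prints 0

// Asking for five elements plays the role of the action.
val fiveLines = lazyLines.take(5).toList
println(s"lines read after take: $linesRead")  // prints 5
fiveLines.foreach(println)
```

Only the five requested lines are ever processed; the remaining two are never touched.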

<ul>
<li><i>Create a value called wickairportFile, that is an RDD[String], based on the wickairportdata.txt file in HDFS. Display the first five lines of this data file.</i>
</ul>

### Exercise 2

You will now create a function that is able to determine whether a line of data contains the header information:

In [None]:
scala> def isHeader(line: String): Boolean = {
line.contains("yyyy") || line.split(" ").size != 5
}

A function is created by using the keyword def, followed by the function name, input arguments, and output type. The function body is specified in between the braces. This function checks whether the string yyyy, which is only present in the header line, occurs in the line of data that is passed into the function. It also treats any line that does not split into exactly five space-separated fields as a header.
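To see what the function returns, it can be tried on a few lines without touching the cluster. This is a minimal sketch: the three sample lines are invented, and they assume fields separated by single spaces, as the split(" ") call implies. The real file may be laid out differently.

```scala
// Same definition as above, repeated so the snippet stands alone.
def isHeader(line: String): Boolean = {
  line.contains("yyyy") || line.split(" ").size != 5
}

// Hypothetical sample lines (not taken from the real data file).
val samples = Seq(
  "yyyy mm tmax tmin rain",     // header: contains "yyyy"
  "1948 1 8.9 3.3 85.0",        // data: exactly five fields
  "Heathrow (London Airport)"   // free text: not five fields
)

val flags = samples.map(isHeader)
samples.zip(flags).foreach { case (line, flag) => println(s"$flag <- $line") }
```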
<br><br>
You will now create a case class corresponding to the structure of the data:

In [None]:
scala> case class TemperatureRecord(year: Long, month: Int, tmax: Float, tmin: Float, rain: Float)

This class will act as a data structure to allow us to parse and manipulate the data in a convenient manner. You can create an RDD of TemperatureRecord objects by creating a parse function as follows:

In [None]:
scala> def parse(line: String) = {
val pieces = line.split('\t')
val year = pieces(0).toLong
val mm = pieces(1).toInt
val tmax = pieces(2).toFloat
val tmin = pieces(3).toFloat
val rain = pieces(4).toFloat
TemperatureRecord(year, mm, tmax, tmin, rain)
}

and by creating the following transformation:

In [None]:
scala> val heathrowData = heathrowFile.filter(x => !isHeader(x)).map(parse)

As before, due to Spark’s lazy evaluation model, this command will not be executed until we perform an action on the data. The value heathrowData is of type org.apache.spark.rdd.RDD[TemperatureRecord].
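Before running the transformation on the cluster, the parsing logic can be checked on a local collection. The following is a sketch under stated assumptions: the sample lines are invented and tab-separated (matching the split('\t') in parse), and the header test is reduced to the yyyy check because these samples contain no spaces.

```scala
case class TemperatureRecord(year: Long, month: Int, tmax: Float, tmin: Float, rain: Float)

// Same parsing logic as above, written compactly.
def parse(line: String): TemperatureRecord = {
  val pieces = line.split('\t')
  TemperatureRecord(pieces(0).toLong, pieces(1).toInt,
    pieces(2).toFloat, pieces(3).toFloat, pieces(4).toFloat)
}

// Hypothetical tab-separated lines standing in for the file contents.
val lines = Seq(
  "yyyy\tmm\ttmax\ttmin\train",
  "1948\t1\t8.9\t3.3\t85.0",
  "1948\t2\t7.9\t2.2\t26.0"
)

// filter then map, mirroring the RDD transformation on a plain Seq.
val records = lines.filter(line => !line.contains("yyyy")).map(parse)
records.foreach(println)
```

On an RDD the same filter and map calls are only recorded; on a Seq they run immediately, which makes mistakes in parse easier to spot.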

<ul>
<li><i>Perform the same transformation on the value wickairportFile, to create a value wickairportData, which is of type org.apache.spark.rdd.RDD[TemperatureRecord]. NB: You will need to create an additional filter transformation to remove the lines of data in the wickairportFile that contain missing values (i.e. the records that contain "---" in place of a value). [It is recommended that you test this set of transformations using the take command.]</i>
</ul>

### Exercise 3

It is often the case that you need to sort the data in accordance with one of the fields in the data. You will now sort the data by month rather than year:

In [None]:
scala> val heathrowMonth = heathrowData.sortBy(_.month)

<ul>
<li><i>Sort the wickairportData by maximum temperature, creating a value called wickairportTemp. <b>NB: As often occurs in data analysis, you will need to clean the data first, by filtering any erroneous data points that contain "---" as the temperature values.</b></i>
</li>
</ul>

### Exercise 4

You will now convert all temperature values from degrees Celsius into degrees Fahrenheit:

In [None]:
scala> val heathrowMonthFah = heathrowMonth.map( x => {
val tmax = x.tmax * 9/5 + 32
val tmin = x.tmin * 9/5 + 32
TemperatureRecord(x.year, x.month, tmax, tmin, x.rain)
})
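The same conversion can also be written with the copy method that Scala generates for every case class, which avoids repeating the unchanged fields. This is an alternative sketch rather than the course's method; the helper name toFahrenheit and the sample record are hypothetical.

```scala
case class TemperatureRecord(year: Long, month: Int, tmax: Float, tmin: Float, rain: Float)

// Hypothetical helper: copy keeps year, month and rain as they are.
def toFahrenheit(r: TemperatureRecord): TemperatureRecord =
  r.copy(tmax = r.tmax * 9 / 5 + 32, tmin = r.tmin * 9 / 5 + 32)

// Made-up record: 10 degrees C max, 0 degrees C min.
val celsius = TemperatureRecord(2000L, 1, 10.0f, 0.0f, 50.0f)
val fahrenheit = toFahrenheit(celsius)
println(fahrenheit) // prints TemperatureRecord(2000,1,50.0,32.0,50.0)
```

With an RDD, such a helper could be passed directly to map, e.g. heathrowMonth.map(toFahrenheit).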

<ul>
<li><i>Create an RDD[Float] containing the monthly rainfall in centimeters for Wick airport (the rainfall data is represented in mm in the data), sorted by tmax.</i>
</ul>

### Exercise 5

<ul>
<li><i>Create two RDDs (heathrowData and wickairportData) of type org.apache.spark.rdd.RDD[TemperatureRecord]. NB: Remember to remove header lines and missing data lines.</i>
<li><i>Using the function that you have created to remove missing data (or otherwise), make a note of the year(s) and month(s) of the missing data records.</i>
</ul>

### Exercise 6

It is possible to replicate MapReduce processing in Spark. Consider the following statement:

In [None]:
scala> val heathrowAverageRain = heathrowData.map(x => (x.year,x.rain)).aggregateByKey((0.0, 0.0))((acc, value) => (acc._1 + value, acc._2 + 1), (acc1, acc2) => (acc1._1 + acc2._1, acc1._2 + acc2._2)).mapValues(sumCount => 1.0 * sumCount._1 / sumCount._2).sortBy(_._1)

The RDD heathrowAverageRain is of type org.apache.spark.rdd.RDD[(Long, Double)], with the first element representing the year and the second element representing the average rainfall in that year. The RDD is sorted by year, as shown by the final function call.

There are three key components used in this transformation.
<ol>
<li>The map function extracts the (year, rainfall) (key,value) pairs. The output type of this transformation is org.apache.spark.rdd.RDD[(Long, Float)], with an entry per line in the data.
<li>The aggregateByKey function computes a pair of values for each key (year); the first is the sum of rainfall, and the second is a count of the number of elements. Both values are initialized with 0.0. The first function passed to aggregateByKey adds one rainfall value to a running (sum, count) pair; the second merges two such pairs. The output type of this second transformation is org.apache.spark.rdd.RDD[(Long, (Double, Double))], linking the key (year) to the two aforementioned aggregated values.
<li>The third function, mapValues, computes the average rainfall for each key by combining the two Double values.
</ol>
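The three steps above can be traced by hand on a small example. The sketch below is not Spark code: it imitates aggregateByKey with plain Scala collections, using two invented "partitions" of (year, rainfall) pairs. The names seqOp, combOp, aggregatePartition and mergePartitions are chosen for illustration only.

```scala
// Hypothetical (year, rainfall) pairs, split across two pretend partitions.
val partition1 = Seq((1948L, 10.0f), (1948L, 20.0f), (1949L, 30.0f))
val partition2 = Seq((1948L, 30.0f), (1949L, 50.0f))

type Acc = (Double, Double)        // (sum of rainfall, count)
val zero: Acc = (0.0, 0.0)

// First function given to aggregateByKey: fold one value into an accumulator.
def seqOp(acc: Acc, value: Float): Acc = (acc._1 + value, acc._2 + 1)
// Second function: merge the accumulators built on different partitions.
def combOp(a: Acc, b: Acc): Acc = (a._1 + b._1, a._2 + b._2)

// Apply seqOp per key within one partition.
def aggregatePartition(part: Seq[(Long, Float)]): Map[Long, Acc] =
  part.foldLeft(Map.empty[Long, Acc]) { case (m, (k, v)) =>
    m.updated(k, seqOp(m.getOrElse(k, zero), v))
  }

// Apply combOp per key across partitions.
def mergePartitions(a: Map[Long, Acc], b: Map[Long, Acc]): Map[Long, Acc] =
  b.foldLeft(a) { case (m, (k, acc)) =>
    m.updated(k, m.get(k).map(combOp(_, acc)).getOrElse(acc))
  }

val sumsAndCounts =
  mergePartitions(aggregatePartition(partition1), aggregatePartition(partition2))

// Equivalent of mapValues followed by sortBy(_._1).
val averages = sumsAndCounts.toSeq
  .map { case (year, (sum, count)) => (year, sum / count) }
  .sortBy(_._1)

averages.foreach(println) // prints (1948,20.0) then (1949,40.0)
```

Keeping a (sum, count) pair rather than a running average is what allows partial results from different partitions to be merged correctly.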

<ul>
<li><i>Using a similar transformation to that above, compute the average monthly max temperature for both airports (heathrowAverageTMax and wickAverageTMax).</i>
</ul>

### Exercise 7

<ul>
<li><i>Using the appropriate information contained on the following webpage: http://spark.apache.org/docs/latest/programming-guide.html#transformations, join the heathrowData and wickairportData datasets (using the join operation) to create an RDD called combinedData. The output should be of type org.apache.spark.rdd.RDD[((Long, Int), (Float, Float))], where the first tuple corresponds to the (year, month) and the second tuple corresponds to the (Heathrow.TMax, Wick.TMax).</i>
</ul>

### Exercise 8

It is possible to use MLlib to compute basic summary statistics of the data using the following exemplar commands:

In [None]:
scala> import org.apache.spark.mllib.linalg.Vectors

In [None]:
scala> import org.apache.spark.mllib.stat.{MultivariateStatisticalSummary, Statistics}

This is how to convert the heathrowData.rain field into an RDD[Vector]:

In [None]:
scala> val observations = heathrowData.map(_.rain).map(x => Vectors.dense(x.toDouble))

Note that the constructor for a dense vector takes an array of double values - it may be necessary to convert each Tuple record to an array of doubles to produce the required RDD[Vector]; see <a href="https://spark.apache.org/docs/1.5.1/api/java/org/apache/spark/mllib/linalg/Vectors.html">here</a>.

In [None]:
scala> val summary: MultivariateStatisticalSummary = Statistics.colStats(observations)

In [None]:
scala> println(summary.mean)

In [None]:
scala> println(summary.variance)

Note that <i>observations</i> is an RDD[Vector], which can be constructed by converting the input array into a dense Vector (see http://spark.apache.org/docs/latest/mllib-data-types.html)
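To sanity-check the numbers that such a summary reports, the column statistics can be recomputed by hand on a small local sample. This is a sketch in plain Scala, not MLlib code: the rainfall values are invented, and it assumes the variance is the sample variance (dividing by n - 1).

```scala
// Hypothetical rainfall values, standing in for one column of observations.
val rain = Seq(2.0, 4.0, 4.0, 4.0, 5.0, 5.0, 7.0, 9.0)

val n = rain.size
val mean = rain.sum / n

// Sample variance: squared deviations from the mean, divided by n - 1.
val variance = rain.map(x => math.pow(x - mean, 2)).sum / (n - 1)

println(s"mean = $mean")
println(s"variance = $variance")
```

Here the mean is 5.0 and the variance is 32/7 (about 4.57).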

<ul>
<li><i>Compute summary statistics of the monthly max temperature for each airport, using the appropriate columns of the joined RDD (combinedData) from the previous exercise.</i>
</ul>

### Exercise 9

The following command produces the Pearson correlation coefficient for two data series (both of type RDD[Double], labelled seriesX and seriesY here):

In [None]:
scala> import org.apache.spark.mllib.stat.Statistics

In [None]:
scala> val correlation = Statistics.corr(seriesX, seriesY, "pearson")
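For reference, the quantity being computed is the covariance of the two series divided by the product of their standard deviations. The sketch below implements that formula on plain Scala sequences, not RDDs; the function name pearson and the sample values are hypothetical.

```scala
// Pearson correlation of two equally long series (plain Scala, no Spark).
def pearson(xs: Seq[Double], ys: Seq[Double]): Double = {
  require(xs.size == ys.size && xs.nonEmpty, "series must be non-empty and equally long")
  val meanX = xs.sum / xs.size
  val meanY = ys.sum / ys.size
  // Sum of products of deviations (proportional to the covariance).
  val cov = xs.zip(ys).map { case (x, y) => (x - meanX) * (y - meanY) }.sum
  // Square roots of the summed squared deviations.
  val devX = math.sqrt(xs.map(x => math.pow(x - meanX, 2)).sum)
  val devY = math.sqrt(ys.map(y => math.pow(y - meanY, 2)).sum)
  cov / (devX * devY)
}

// Made-up series: the second is exactly twice the first, the third runs in reverse.
val rising = Seq(1.0, 2.0, 3.0, 4.0)
val doubled = Seq(2.0, 4.0, 6.0, 8.0)
val falling = Seq(8.0, 6.0, 4.0, 2.0)

val positive = pearson(rising, doubled)
val negative = pearson(rising, falling)
println(s"positive = $positive, negative = $negative")
```

A value close to 1 indicates that the two series rise and fall together, a value close to -1 that one rises as the other falls, and a value near 0 that there is little linear relationship.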

<ul>
<li><i>Compute the Pearson correlation coefficient for the two average max temperature datasets computed in Exercise 6 (heathrowAverageTMax and wickAverageTMax). What does this tell you about the data?</i>
</ul>

### Exercise 10

The following commands demonstrate how to estimate the parameters of a linear regression model:

In [None]:
scala> import org.apache.spark.mllib.regression.LabeledPoint

In [None]:
scala> import org.apache.spark.mllib.regression.LinearRegressionModel

In [None]:
scala> import org.apache.spark.mllib.regression.LinearRegressionWithSGD

In [None]:
scala> import org.apache.spark.mllib.linalg.Vectors

In [None]:
scala> val data = sc.textFile("data.txt")

In [None]:
scala> val parsedData = data.map { line =>
  val parts = line.split(',')
  LabeledPoint(parts(0).toDouble, Vectors.dense(parts(1).split(' ').map(_.toDouble)))
}.cache()

Now to build the model:

In [None]:
scala> val numIterations = 100

In [None]:
scala> val model = LinearRegressionWithSGD.train(parsedData, numIterations)

Evaluate the model on training examples and compute training error:

In [None]:
scala> val valuesAndPreds = parsedData.map { point =>
  val prediction = model.predict(point.features)
  (point.label, prediction)
}

In [None]:
scala> val MSE = valuesAndPreds.map{case(v, p) => math.pow((v - p), 2)}.mean()

In [None]:
scala> println("training Mean Squared Error = " + MSE)

<ul>
<li><i>Estimate the parameters of a linear regression model using the combined data, with max temperature for Heathrow airport as the input and max temperature for Wick as the output variable.</i>
</ul>

HINT: The default step size is too large for this particular example. It is possible to reduce the step size to a smaller value (0.01 is recommended). The train function, and its input parameters, are described <a href="https://spark.apache.org/docs/1.5.2/api/java/org/apache/spark/mllib/regression/LinearRegressionWithSGD.html#train(org.apache.spark.rdd.RDD,%20int,%20double)">here</a>.
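Why the step size matters can be seen with a much simplified stand-in for the optimiser. The sketch below is not MLlib's algorithm: it is full-batch gradient descent with a constant step, a single weight and no intercept, run on two invented points lying on y = 2x. The function name fit is hypothetical.

```scala
// Hypothetical (input, output) pairs lying exactly on y = 2x.
val points = Seq((5.0, 10.0), (10.0, 20.0))

// Simplified full-batch gradient descent for the model y = w * x.
def fit(stepSize: Double, numIterations: Int): Double = {
  var w = 0.0
  for (_ <- 1 to numIterations) {
    // Gradient of the (halved) mean squared error with respect to w.
    val gradient = points.map { case (x, y) => (w * x - y) * x }.sum / points.size
    w -= stepSize * gradient
  }
  w
}

val smallStep = fit(0.01, 100) // settles very close to the true weight 2.0
val largeStep = fit(1.0, 100)  // overshoots further on every iteration

println(s"step 0.01 -> w = $smallStep")
println(s"step 1.0  -> w = $largeStep")
```

Because the inputs are temperatures rather than values near 1, each gradient is large; with a step of 1.0 every update overshoots the minimum by more than the previous error, so the weight diverges instead of converging.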