
# Introduction to Scala

![](/notebooks/img/scala-logo.png)

* Install instructions (OS X):

    `$ brew install scala`


* Launch REPL (shell environment) with:

    `$ scala`


* This notebook covers the basic dos and don'ts of Scala. Run with:

    `$ docker run -it -v <local dir>:/notebooks/ -p 8888:8888 aghorbani/spark-jupyter-scala`

## An object-oriented language

Let's begin by defining a simple class:

In [None]:
class Point(xc: Int, yc: Int) {  // braces define scope (not indentation)
    var x: Int = xc  // this is a member variable or *field*
    private var y: Int = yc; // a semicolon or line break must separate statements

    def move(dx: Int, dy: Int): Unit = {  // explicit Unit return type (procedure syntax is deprecated)
        x = x + dx
        y = y + dy
        println("Point x location : " + x);
        println(f"Point y location : $y%d"); // f interpolator for formatted strings
    }
}

We instantiate an object in a traditional way:

In [None]:
val pt = new Point(10, 20);

Calling member functions also follows a conventional syntax:

In [None]:
pt.move(10, 10)

Member variables too:

In [None]:
println(pt.x)

(but not private ones)

In [None]:
println(pt.y)

Note that Scala supports abstract classes, as well as *traits*, which resemble interfaces that may also carry concrete implementations.
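
As a minimal sketch of a trait with one abstract and one concrete member (the names `Shape`, `Circle` and `Square` are hypothetical, chosen only for illustration):

```scala
// Hypothetical example types; not part of the notebook's own code.
trait Shape {
    def area: Double                        // abstract member: no body
    def describe: String = s"area = $area"  // concrete member inherited by every subclass
}

class Circle(r: Double) extends Shape {
    def area: Double = math.Pi * r * r
}

class Square(side: Double) extends Shape {
    def area: Double = side * side
}

val shapes: List[Shape] = List(new Circle(1.0), new Square(2.0))
shapes.foreach(s => println(s.describe))
```

Each subclass only has to supply `area`; `describe` comes from the trait.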

## Basic Syntax

### Values and variables

Variables are declared with the `var` keyword:

In [None]:
var myVar = 1;

Here Scala infers the type of the variable (*type inference*). We can also declare the data type explicitly:

In [None]:
var myVar : Int = 1;

Either way, `myVar` is statically typed:

In [None]:
myVar = 2  // fine

In [None]:
myVar = "Foo" // not fine

A constant can be declared with the `val` keyword:

In [None]:
val myVal = 1;

Now it is immutable:

In [None]:
myVal = 2;

One may also define multiple variables at once:

In [None]:
val (myVar1, myVar2) = (40, "Foo")  // tuple literal (Pair is deprecated)

In [None]:
println(myVar1)
println(myVar2)

Note strings require double quote marks. Single quote marks indicate a *character* type.

### Logical Operators

In [None]:
val A : Boolean = true // Declare with `Boolean` type 
val B = false // or not :)

In [None]:
if(A == B){
    println("A == B") 
} else if(!A == B && A == B) {
    println("!A == B && A == B")
} else if (!A == B || A == B) {
    println("!A == B || A == B")
} else {
    println("A != B")
}

Note that in the REPL the `else if` / `else` keywords must follow on the same line as the closing brace of the previous block; otherwise the statement is treated as already complete.

### Arrays

Arrays are declared with a fixed size, à la C/Java (there is no direct equivalent among Python's built-in types):

In [None]:
var z : Array[String] = new Array[String](3)

Elements are indexed (from 0) with parentheses. Elements of a reference type such as `String` default to `null` until they are assigned:

In [None]:
println(z(0)) ; println(z(1)) ; println(z(2))

Of course, indexing outside the allocated memory throws an exception:

In [None]:
z(3)

Notice the exception is from Java? We'll return to this later... An array does know its own length, however:

In [None]:
var nums = Array(100, 5000, 700)
var total = 0;

for (i <- 0 until nums.length) {
    total += nums(i);
}
println(total)

### Collections

Scala has several built-in collection types: Lists, Sets, Maps, Tuples, ... For Python users: Lists in Scala are *linked lists* ($\mathcal{O}(n)$ indexing). The closest analogue of the Python list is the resizable `scala.collection.mutable.ArrayBuffer`; the immutable `Vector` also offers fast indexing.
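
A short sketch of three of these collection types, using made-up values for illustration:

```scala
val xs = List(1, 2, 3)
val ys = 0 :: xs          // prepending to a linked list is constant time
val third = ys(2)         // indexing walks the list from its head

val s = Set(1, 2, 2, 3)   // duplicates are dropped
val t = (40, "Foo")       // a tuple may mix element types

println(ys)
println(s.size)
println(t._2)
```

Prepending with `::` leaves `xs` untouched, since these collections are immutable by default.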

A map is a hash map (dictionary in Python):

In [None]:
// A map with keys and values.
var colors = Map("red" -> "#FF0000", "blue" -> "#0000FF")

Index by key string:

In [None]:
colors("red")

Add items with the `+=` operator (since `colors` is a `var` holding an immutable map, this rebinds it to a new map):

In [None]:
colors += ("green" -> "#00FF00")

A Map object has various methods:

In [None]:
colors.keys
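
Indexing a map with a missing key throws an exception, so it may help to see the safer lookups. A minimal sketch, using a hypothetical `palette` map so as not to disturb `colors`:

```scala
// `palette` is a hypothetical map introduced for this example only.
val palette = Map("red" -> "#FF0000", "blue" -> "#0000FF")

val maybeGreen = palette.get("green")               // Option[String]; None when the key is absent
val fallback = palette.getOrElse("green", "#000000")  // supply a default instead
val hasRed = palette.contains("red")

println(maybeGreen)
println(fallback)
println(hasRed)
```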

### Iteration

A `for` loop iterates over a range:

In [None]:
for(i <- 1 to 3) { // to: inclusive upper bound
    println(i)
}

In [None]:
for(i <- 1 until 3) { // until: exclusive upper bound
    println(i)
}

`to` and `until` are methods (not keywords) that create a collection object called a `Range`:

In [None]:
1 until 3

However, the `<-` operator introduces a *generator*; it is valid only inside a `for` expression, so the following fails:

In [None]:
i <- 1 to 3

We can, of course, loop over a collection:

In [None]:
val myRange = 1 to 6

for (i <- myRange) {
    println(i)
}

One nice feature is looping with filters:

In [None]:
for (i <- myRange if i % 2 == 0; if i > 3) { // all evens above 3
    println(i)
}

Indefinite iteration works in an obvious way:

In [None]:
var i : Int = 1

while(i <= 6) {
    println(i);
    i += 1;  // i++ not supported! :(
}

Scala also graciously offers a `do while` routine! :)

In [None]:
do {
    println("I always run at least once!")
}
while (false)

## A functional language

Why functional?

* Safer: everything is scoped (no changing state) $\implies$ easier to predict/prove results


* Derives from lambda calculus, a formal specification


* Elegant, encapsulated coding

Because it allows changes in state, Scala is not purely functional, but it supports many of the tenets of functional programming. **N.B.** Other languages, such as Python, Java 8 and C++11, also incorporate some functional features.

In Scala, a function, like a method (class member function), may be defined in the following way:

In [None]:
def addInt(a : Int, b : Int) : Int = {
    var sum:Int = 0
    sum = a + b
    return sum  // we can actually omit the return statement!
}

We can then call it in the usual way:

In [None]:
addInt(1, 1)

In Scala (as in Python, etc.), functions are `first-class citizens`, which essentially means they are objects: they can be assigned to variables, passed as arguments, and returned from other functions.
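
To illustrate, here is a small sketch in which a function is stored in a value, placed in a map, and used inside another call. The names `add` and `ops` are hypothetical, and `addInt` is redefined in shortened form so the cell stands alone:

```scala
def addInt(a: Int, b: Int): Int = a + b

// Store the method as a function value (the expected type triggers the conversion).
val add: (Int, Int) => Int = addInt

// Function values can live in collections like any other object.
val ops = Map("add" -> add, "mul" -> ((a: Int, b: Int) => a * b))

println(ops("add")(2, 3))
println(ops("mul")(2, 3))
println(List(1, 2, 3).map(x => add(x, 10)))
```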

### Higher-order functions

Higher-order functions take other functions as parameters or return them as results, which permits the parameterisation of behaviour.

In [None]:
object Demo {
    def main(args: Array[String]): Unit = {
        println(apply(layout, 10))
    }
    
    def apply(f: Int => String, v: Int) = f(v)
    
    def layout[A](x: A) = "[" + x.toString() + "]"
}

### Anonymous functions

Anonymous functions are functions not bound to an identifier. This means they may be defined and invoked on the fly, which allows some powerful functional behaviours, as we will see shortly.

In [None]:
// Assign anonymous function to variable mul
var mul = (x: Int, y: Int) => x * y
// Use mul as if a function
println(mul(3, 4))

### Nested functions

Functions within functions!

In [None]:
def factorial(i : Int): Int = {
    def fact(i : Int, accumulator : Int): Int = {
        if (i <= 1)  // no need for braces here
            accumulator  // implied return!
        else
            fact(i - 1, i * accumulator)
    }
    return fact(i, 1)
}

In [None]:
factorial(5)
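
Because the inner function calls itself in tail position, the compiler can turn it into a loop. The sketch below is one way to have that checked, using the standard `scala.annotation.tailrec` annotation. The name `factorialBig` and the switch to `BigInt` are assumptions made here to avoid `Int` overflow; they are not part of the original example.

```scala
import scala.annotation.tailrec

// Hypothetical variant of the factorial above, returning BigInt.
def factorialBig(n: Int): BigInt = {
    @tailrec  // compilation fails if the recursive call is not in tail position
    def fact(i: Int, accumulator: BigInt): BigInt = {
        if (i <= 1) accumulator
        else fact(i - 1, accumulator * i)
    }
    fact(n, BigInt(1))
}

println(factorialBig(5))
println(factorialBig(20))
```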

### Closures

A closure is a function whose return value depends on values defined outside the function scope. For example:

In [None]:
var factor = 3  // free variable, defined outside the function below
val multiplier = (i:Int) => i * factor
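
A minimal sketch of how this closure behaves when the captured variable changes (the two definitions are repeated so the cell is self-contained; `before` and `after` are hypothetical names):

```scala
var factor = 3
val multiplier = (i: Int) => i * factor

val before = multiplier(2)  // evaluated while factor is 3
factor = 10
val after = multiplier(2)   // the closure sees the updated factor

println(before)
println(after)
```

The closure holds a reference to `factor` rather than a copy of its value, which is why the second call gives a different result.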

Let's try something more interesting. First we declare a simple printing function:

In [None]:
def getText(name : String) : String = {
    return f"Hello, $name%s!"
}

println(getText("Joe"))

Now, let's say we want to decorate the output of this function. We can write a higher-order function acting as a template:

In [None]:
def getTextWithTags(f: String => String) : String => String = {
    def hector(name : String) : String = {
        return "<h1>" + f(name) + "</h1>"
    }
    return hector
}

In [None]:
var f = getTextWithTags(getText)  // returns a wrapped version of the function!
f("Joe")

We can also do this in a more concise way:

In [None]:
def getTextWithTags(f: String => String) = {  // return type inference!
    (name : String) => {  // anonymous function
        "<h1>" + f(name) + "</h1>" // use the argument, not a hard-coded name
    }
}

In [None]:
var f = getTextWithTags(getText)
f("Joe")

### Currying

Currying is the technique of translating the evaluation of a function that takes multiple arguments (or a tuple of arguments) into evaluating a sequence of functions, each with a single argument.

In [None]:
def strcat(s1: String)(s2: String) = s1 + s2

In [None]:
strcat("Cu")("rie")

Or rather with an anonymous function:

In [None]:
def strcat(s1: String) = (s2: String) => s1 + s2
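
One practical use of a curried function is partial application: fixing the first argument yields a new single-argument function. A small sketch, where `greet` is a hypothetical name and `strcat` is repeated in its curried form so the cell stands alone:

```scala
def strcat(s1: String)(s2: String): String = s1 + s2

// Fix the first argument only; the result is a String => String function.
val greet: String => String = strcat("Hello, ")

println(greet("Joe"))
println(List("Ann", "Bob").map(strcat("Hi, ")))
```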

## Practical usage

Import standard libraries

In [None]:
import scala.math
println(math.Pi)

The Scala build tool (sbt) can be installed with:

`$ brew install sbt`

Use with:

`$ sbt package`

First impressions: community support is not as good as it is for, say, Python.

## Connection to Java

* Scala compiles to Java byte code $\implies$ runs on Java VM (>= 1.8). Can also import/execute Java code (language *interoperability*).


* Scala was designed to address certain shortcomings of Java, in particular functional aspects, which have since been incorporated into Java (Java 8, 2014).

In [None]:
import java.time  // import java library
var now = time.LocalDate.now  // use seamlessly!
println(now)

## Spark

![](/notebooks/img/spark-logo.png)

Apache Spark is a cluster computing framework that runs atop distributed storage software such as HDFS (among others) and offers a substantial performance improvement over Hadoop MapReduce. The gain comes primarily from avoiding the bottleneck of writing/replicating to disk, which in practice overwhelms MapReduce computations, especially when MapReduce jobs are chained together. Spark introduces Resilient Distributed Datasets (RDDs) (superseded by Datasets in Spark 2.0+), which are shared objects stored in main memory. `Spark Core` is the basis of the project; `Spark SQL`, `MLlib`, `Spark Streaming`, and `GraphX` are libraries atop Spark Core that are also considered components of Spark.

### Installation

Apart from the Docker image associated with this notebook, Spark may be installed in a non-distributed manner on any PC. With Scala installed, download any version (preferably 2.0+) from http://spark.apache.org/downloads.html and unzip. Navigate to the unzipped directory (or add to `PATH`) and launch shell with:

`$ ./bin/spark-shell`

### Getting started

Since we're using the full-blown HDFS, we first copy a local file onto the HDFS:

In [None]:
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.fs.Path

val hadoopConf = new Configuration()
val hdfs = FileSystem.get(hadoopConf)

val srcPath = new Path("README.md")
val destPath = new Path("README.md")

hdfs.copyFromLocalFile(srcPath, destPath)

We now read this file into memory:

In [None]:
val textFile = spark.read.textFile("README.md")  // creates dataset
textFile  // print object

This creates an object called a `Dataset`, the main programming interface of Spark in recent versions. In earlier versions (before 2.0), this role was played by the `RDD`, which remains available through the global Spark context object `sc`:

In [None]:
val textFile = sc.textFile("README.md")  // creates RDD
textFile  // print object

The basic functionality of RDDs and Datasets is the same. We can perform the same basic operations on either:

In [None]:
textFile.count()

Print the first line:

In [None]:
textFile.first()

We can filter to return a new, reduced RDD/dataset:

In [None]:
val linesWithSpark = textFile.filter(line => line.contains("Spark"))

In [None]:
linesWithSpark.count()

The `collect()` method returns an array object:

In [None]:
var collect_to_array = linesWithSpark.collect()

for (i <- 0 until collect_to_array.length) {
    println(f"Line $i%s:")
    println(collect_to_array(i))
}

A MapReduce computation can be accomplished spectacularly with:

In [None]:
textFile.map(line => line.split(" ").size).reduce((a, b) => if (a > b) a else b)  // find longest line (in words)

As an aside, recall we can import Java libraries:

In [None]:
import java.lang.Math
textFile.map(line => line.split(" ").size).reduce((a, b) => Math.max(a, b))  // replace the custom code

### Map reduce

First we copy a local file onto the HDFS. Copy code taken from (https://stackoverflow.com/a/37348396).

In [None]:
val srcPath = new Path("data/hamlet.txt")
val destPath = new Path("hamlet.txt")

hdfs.copyFromLocalFile(srcPath, destPath)

Load the HDFS file into an RDD with Spark (the `reduceByKey` call below is an RDD operation):

In [None]:
val textFile = sc.textFile("hamlet.txt")  // creates RDD

Another spectacular deployment of MapReduce:

In [None]:
val wordCounts = textFile.flatMap(
    line => line.split(" ")).map(
    word => (word, 1)).reduceByKey(_ + _)

Let's quickly examine the results:

In [None]:
wordCounts.collect().sortBy(x => -x._2)  // '-' indicates descending order
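
For intuition, the same map/reduce pattern can be sketched on an ordinary local collection, without Spark. The sample lines and the names `sampleLines`, `words`, `pairs` and `localCounts` are made up for this illustration, and `groupBy` followed by a sum stands in for `reduceByKey`:

```scala
// Hypothetical in-memory input standing in for the lines of a text file.
val sampleLines = Seq("to be or not to be", "that is the question")

val words = sampleLines.flatMap(line => line.split(" "))   // one element per word
val pairs = words.map(word => (word, 1))                   // map phase: emit (word, 1)
val localCounts = pairs.groupBy(_._1).map {                // reduce phase: sum per key
    case (word, group) => (word, group.map(_._2).sum)
}

localCounts.toSeq.sortBy(x => -x._2).foreach(println)
```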

### PageRank

Though the PageRank vector $\mathbf{v}$ is theoretically the principal eigenvector of the transition matrix $\mathbf{M}$, computing it directly is a cubic-time operation, which is out of the question for a graph the size of the web. Therefore, we seek a (good) approximation by repeated ($\sim50$) matrix-vector multiplications,

$$\mathbf{v}' = \beta\mathbf{M}\mathbf{v} + (1 - \beta)\mathbf{e}/n$$

where $\beta \in (0, 1]$ is a weighting constant and $\mathbf{e}$ is a vector of ones. The transition matrix is stored in a sparse representation, and the computation is parallelised with MapReduce. Note that a matrix-vector multiplication returns a vector $\mathbf{v}'$ such that,

$$v'_i = \sum_{j=1}^n m_{ij} v_j$$
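
The update rule can be tried out on a toy graph in plain Scala. This is only a sketch under stated assumptions: a hypothetical three-page graph (page 0 links to 1 and 2, page 1 links to 2, page 2 links to 0), a dense matrix, and $\beta = 0.85$; none of these values come from the data used later.

```scala
val beta = 0.85
val n = 3

// Column-stochastic transition matrix of the hypothetical graph: m(i)(j) = 1 / outdegree(j) if j links to i.
val m: Array[Array[Double]] = Array(
    Array(0.0, 0.0, 1.0),
    Array(0.5, 0.0, 0.0),
    Array(0.5, 1.0, 0.0))

// One application of v' = beta * M v + (1 - beta) e / n.
def step(v: Array[Double]): Array[Double] =
    Array.tabulate(n) { i =>
        val mv = (0 until n).map(j => m(i)(j) * v(j)).sum
        beta * mv + (1 - beta) / n
    }

var v = Array.fill(n)(1.0 / n)   // start from the uniform distribution
for (_ <- 1 to 50) v = step(v)   // repeated matrix-vector multiplication

println(v.mkString(", "))
```

Since every column of this matrix sums to one, the entries of `v` keep summing to one throughout the iteration.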

#### The ideal case

For a MapReduce matrix-vector computation, we ideally load the vector into memory at each compute node. Then, each mapper receiving a chunk of $\mathbf{M}$ emits $(i, m_{ij} v_j)$ for each $m_{ij}$. The reducer simply sums all the values it receives and emits $(i, \sum_{j=1}^n m_{ij} v_j)$.
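
The map and reduce steps just described can be sketched with local collections. The sparse entries, the vector values and the names `entries`, `vec`, `mapped` and `reduced` are hypothetical, and `groupBy` plays the role of the shuffle to the reducers:

```scala
// Sparse entries (i, j, m_ij) of a hypothetical 3x3 matrix, and a dense vector.
val entries = Seq((0, 2, 1.0), (1, 0, 0.5), (2, 0, 0.5), (2, 1, 1.0))
val vec = Vector(0.2, 0.3, 0.5)

// Map phase: each entry emits the pair (i, m_ij * v_j).
val mapped = entries.map { case (i, j, mij) => (i, mij * vec(j)) }

// Reduce phase: sum the values that share the same row index i.
val reduced = mapped.groupBy(_._1).map { case (i, group) => (i, group.map(_._2).sum) }

println(reduced.toSeq.sortBy(_._1))
```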

#### The realistic case: striping

Sadly, ranking tens of billions of web sites tends to prevent storing the entire PageRank vector in memory. Therefore, we may devise a strategy based on *stripes* (groups of contiguous columns), where both the matrix and vector are partitioned into corresponding bands, stored in separate files on the DFS. Mappers receive chunks from a particular stripe, then load only the relevant stripe of the rank vector into memory. The algorithm proceeds as above.

#### The real-world case

Notice, however, that the striping approach is rather a naive solution. If we have to read the corresponding piece of the rank vector from disk for every chunk we receive, we incur a large overhead, and in the extreme case, *disk thrashing*.

A workable solution is to further partition the matrix into $k^2$ blocks of size $n/k \times n/k$. Each mapper task then receives one square of the partition and one stripe of the vector, so each piece of the matrix is sent only once.

Now, whereas multiple blocks contribute to each partition of the vector, they contribute only to this partition. In the former case, each stripe contributed (via at least one of its rows) to every element (hence partition) of $\mathbf{v}$. So, if each block of the matrix only receives one mapper and vice versa, the blocks are read only once.
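
A rough local sketch of the blocking idea, assuming a hypothetical $4 \times 4$ sparse matrix with $k = 2$ (all names and values below are illustrative): entries are keyed by block $(p, q)$, each block reads only stripe $q$ of the vector, and its products land only in stripe $p$ of the result.

```scala
val n = 4
val k = 2
val blockSize = n / k

// Hypothetical sparse entries (i, j, m_ij) and vector.
val entries = Seq((0, 1, 1.0), (1, 3, 0.5), (2, 0, 0.25), (3, 3, 1.0))
val vec = Vector(1.0, 2.0, 3.0, 4.0)

// Assign every entry to its block (p, q) = (i / blockSize, j / blockSize).
val blocks = entries.groupBy { case (i, j, _) => (i / blockSize, j / blockSize) }

// Each block works with stripe q of the vector only.
val partial = blocks.toSeq.flatMap { case ((p, q), es) =>
    val stripe = vec.slice(q * blockSize, (q + 1) * blockSize)
    es.map { case (i, j, mij) => (i, mij * stripe(j - q * blockSize)) }
}

// Sum the partial products per row index.
val result = partial.groupBy(_._1).map { case (i, group) => (i, group.map(_._2).sum) }

println(result.toSeq.sortBy(_._1))
```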



First we load the data into HDFS:

In [None]:
val srcPath = new Path("data/links.txt")
val destPath = new Path("links.txt")
hdfs.copyFromLocalFile(srcPath, destPath)

Then we create a Spark session (the entry point to the Spark API):

In [None]:
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder.appName("SparkPageRank").getOrCreate()

Read the file into memory:

In [None]:
val lines = spark.read.textFile("links.txt").rdd
lines.collect()

Next we apply a map to split each line into a (source, destination) pair, drop duplicates, and group the destinations by source page:

In [None]:
val link_rdd = lines.map{
    s => val parts = s.split("\\s+"); (parts(0), parts(1))}.distinct().groupByKey()

We can view the data using the RDD's `collect()` function:

In [None]:
link_rdd.collect()

To make this available in memory, we use the `cache()` function:

In [None]:
val links = link_rdd.cache()

We initialise the rank data:

In [None]:
var ranks = links.mapValues(v => 1.0)

Now we iterate (10 times):

In [None]:
val iters = 10

for (i <- 1 to iters) {
    val contribs = links.join(ranks).values.flatMap {
        case (urls, rank) => val size = urls.size; urls.map(url => (url, rank / size))
    }
    ranks = contribs.reduceByKey(_ + _).mapValues(0.15 + 0.85 * _)
}

// Collect into local variable
val output = ranks.collect()
output.foreach(tup => println(tup._1 + " has rank: " + tup._2 + "."))  // nice foreach() method on iterable object

// Close session
spark.stop()

### Machine learning

MLlib is Spark’s machine learning (ML) library. Its goal is to make practical machine learning scalable and easy. First we load some sample data onto HDFS:

In [None]:
// Create new Spark session
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder.appName("SparkLogisticRegression").getOrCreate()

// Load data
val srcPath = new Path("data/sample_libsvm_data.txt")
val destPath = new Path("sample_libsvm_data.txt")

hdfs.copyFromLocalFile(srcPath, destPath)

The following code was retrieved from https://spark.apache.org/examples.html:

In [None]:
import org.apache.spark.ml.classification.LogisticRegression

// Load training data
val training = spark.read.format("libsvm").load("sample_libsvm_data.txt")

val lr = new LogisticRegression()
lr.setMaxIter(10)
lr.setRegParam(0.3)
lr.setElasticNetParam(0.8)

In [None]:
// Fit the model
val lrModel = lr.fit(training)

// Print the coefficients and intercept for logistic regression
println(s"Coefficients: ${lrModel.coefficients} Intercept: ${lrModel.intercept}")

## Credits

This tutorial is modelled after readings of the following resources:

* https://www.tutorialspoint.com/scala/

* https://spark.apache.org/docs/latest/quick-start.html