In [2]:
import $file.Qa
import Qa._

import $file.$
import Qa._

# Scala Spark: RDDs and DataFrames

## Spark Context Handler


For RDDs, the central entry point of a Spark session is the `SparkContext` object: it is your handle for calling Spark functions.

In [2]:
val sc = spark.sparkContext

sc: org.apache.spark.SparkContext = org.apache.spark.SparkContext@588553cb

## Creating RDDs

There are many ways to create RDD objects:
1. From lists or arrays defined within the program
2. By reading from normal files
3. By reading from Hadoop HDFS
4. From the output of Hive queries
5. From the output of normal database queries

### Lists

Below, we create RDDs from lists and arrays directly, using the `SparkContext.parallelize()` method.

In [3]:
val rdd = sc.parallelize(Array(1,2,3,4))

rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at cmd2.sc:1

When we inspect the object we have created, we see that it is not a core Scala collection but an RDD.

In [4]:
rdd

res3: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at cmd2.sc:1

However, we can easily extract the data stored in the RDD by using the `collect()` method.

In [None]:
rdd.collect()

* This *localizes* the dataset: it was distributed in an RDD, now it's here in the memory of this machine.
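
To make the distributed-versus-local distinction concrete, here is a minimal sketch that is not part of the original notebook. It asks for four partitions explicitly; that number and the names `demo`, `parts` and `local` are assumptions chosen for illustration. `glom()` groups the elements of each partition into an array, while `collect()` returns one flat local array.

```scala
import org.apache.spark.sql.SparkSession

// Reuses the running session if there is one, otherwise starts a local one
val spark = SparkSession.builder().master("local[*]").getOrCreate()
val sc = spark.sparkContext

// Distribute eight numbers over 4 partitions (hypothetical choice)
val demo = sc.parallelize(1 to 8, 4)

val numParts = demo.getNumPartitions   // how many partitions hold the data
val parts = demo.glom().collect()      // one Array[Int] per partition
val local = demo.collect()             // a single local Array[Int]

parts.foreach(p => println(p.mkString("[", ", ", "]")))
```

On a real cluster the partitions would live on different executors; only `collect()` moves all of them into the driver's memory, so it should be used with care on large datasets.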

### Files

For standard files we can use the `textFile()` method to read data in from a specified filepath.

In [None]:
val rdd_file = sc.textFile("etc/ny-taxi/test.csv")

The `take()` method returns the first `n` lines of the RDD as a local array. Below we take two lines and index into the result to show the second one.

In [None]:
rdd_file.take(2)(1)

For HDFS files we can use the same method, with an `hdfs://` URI in place of the local path.

In [None]:
// val rdd_hdfs = sc.textFile("hdfs:///path_to_file")

## Actions

RDD objects are *lazy* data structures, in that they only contain the logic to obtain results. They do not compute or output anything *until* an action method is called.

Suppose we have an RDD. All the methods below are actions: each one triggers the computation and returns its result in a local, non-distributed form.
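
The laziness described above can be observed directly. The following is a small sketch under stated assumptions, not code from the original notebook: it counts how often the mapping function runs with a `LongAccumulator`, and the names `calls` and `doubled` are made up for the example.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").getOrCreate()
val sc = spark.sparkContext

// Accumulator that counts invocations of the mapping function
val calls = sc.longAccumulator("calls")

// Defining the transformation does not run it
val doubled = sc.parallelize(1 to 10).map { x => calls.add(1); 2 * x }
val before: Long = calls.value   // still zero: no action has been called

// sum() is an action, so the map is evaluated now
val total = doubled.sum()
val after: Long = calls.value    // one call per element in this single run

println(s"before=$before after=$after total=$total")
```

Accumulators updated inside transformations can be incremented more than once if a stage is recomputed, so this counting trick is meant only as a demonstration.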

In [None]:
import scala.util.Random

val r = new Random()
val normals = Array.fill(20)(r.nextGaussian).map(2 * _ + 10).map(_.toInt)

In [None]:
val rdd = sc.parallelize(normals)

In [None]:
rdd.collect()

In [None]:
rdd.take(5)

In [None]:
rdd.top(5)

In [None]:
rdd.count()

If we wish to understand the distribution of values within the dataset, we can use the `countByValue()` method to count the occurrences of each distinct value.

In [None]:
val hist = rdd.countByValue()

In [None]:
val x = hist.keySet.toSeq
val y = hist.values.toSeq
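
As an alternative way to prepare the histogram data, the sketch below sorts the `(value, count)` pairs before splitting them into two sequences, which keeps each key aligned with its count and orders the bars by value. The input numbers and the names `xs` and `ys` are assumptions for illustration only.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").getOrCreate()
val sc = spark.sparkContext

// Small made-up dataset: 1 occurs once, 2 twice, 3 three times
val hist = sc.parallelize(Seq(3, 1, 2, 3, 2, 3)).countByValue()

// Sort the pairs by value, then split into keys and counts
val (xs, ys) = hist.toSeq.sortBy(_._1).unzip

println(xs.zip(ys).mkString(", "))
```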

In [None]:
import plotly._, plotly.Almond._

plot(Seq(Bar(x,y)))

### Reduction

The methods below all perform reductions on the RDD, in that each collapses the whole dataset into a single value.

In [None]:
rdd.sum()

In [None]:
rdd.min()

In [None]:
rdd.max()

In [None]:
rdd.mean()

In [None]:
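// note: reduce expects a commutative, associative function; this one is neither, so the result can depend on partitioning
rdd.reduce((t, e) => t + 2 * e)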

In [None]:
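// product of all elements; 1 is the neutral element for multiplication (Int arithmetic can overflow silently)
rdd.fold(1)(_ * _)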
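
Because `reduce()` and `fold()` combine results per partition and then merge them, the function and zero value matter. The sketch below is illustrative only; the data, the partition counts and all names are assumptions. It compares the same four numbers stored in one and in two partitions.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").getOrCreate()
val sc = spark.sparkContext

val one = sc.parallelize(Seq(1, 2, 3, 4), 1)   // a single partition
val two = sc.parallelize(Seq(1, 2, 3, 4), 2)   // partitions [1, 2] and [3, 4]

// Addition is associative and commutative: partitioning does not matter
val sumOne = one.reduce(_ + _)   // 10
val sumTwo = two.reduce(_ + _)   // 10

// t + 2 * e is neither, so the answer depends on the partitioning
val skewOne = one.reduce((t, e) => t + 2 * e)   // 19, evaluated left to right
val skewTwo = two.reduce((t, e) => t + 2 * e)   // differs from 19

// fold applies the zero value once per partition and once when merging,
// so a non-neutral zero leaks into the result: 10 + 3 * 1
val foldTwo = two.fold(1)(_ + _)   // 13
```

With two partitions the skewed reduction also depends on which partition finishes first, which is why only neutral zero values and associative, commutative functions give reproducible results.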

In [None]:
Qa.dir(rdd)

## Transformations

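Transformations such as `map()`, `flatMap()`, `filter()`, `distinct()` and `sample()` each return a new RDD. They are lazy, so below we follow each one with `collect()` to see its result.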

In [None]:
rdd.collect().slice(0, 5)

In [None]:
rdd.map( 2 * _ ).collect().slice(0, 5)

In [None]:
rdd.flatMap(x => Array(x-0.1, x+0.1)).collect().slice(0, 5)

In [None]:
rdd.filter(_ % 2 == 0).collect().slice(0, 5)

In [None]:
rdd.distinct().collect()

In [None]:
// take a random sample: each element is kept with probability 0.5, without replacement
rdd.sample(false, 0.5).collect()
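
The difference between `map()` and `flatMap()` is easiest to see on a tiny input. This is a minimal sketch with made-up data; the names `nums`, `nested`, `flat` and `evens` are assumptions for illustration.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").getOrCreate()
val sc = spark.sparkContext

val nums = sc.parallelize(Seq(1, 2, 3))

// map: exactly one output element per input element (here a Seq each)
val nested = nums.map(x => Seq(x, x * 10)).collect()

// flatMap: the returned sequences are flattened into one RDD[Int]
val flat = nums.flatMap(x => Seq(x, x * 10))

// transformations chain, and nothing runs until collect() is called
val evens = flat.filter(_ % 2 == 0).collect()

println(nested.mkString(", "))
println(evens.mkString(", "))
```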

## Example

In the example below, we demonstrate a number of Spark functions by loading in a dataset and extracting information from it.

In [None]:
val rsp = sc.textFile("etc/ny-taxi/test.csv")

val parse = (line: String) => line.trim().split(",")
    
val header = parse(rsp.take(1)(0))

After reading in the file, we look to isolate the third column. To do this, we write a lambda function which splits each line on commas and picks out the column at index 2. We then apply this to the RDD by using the `map()` function.

In [None]:
rsp.map( row => parse(row)(2) ).take(10)
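
When mapping over every line of a CSV file, the header line is processed too. One possible way to skip it and to select a column by name is sketched below. The three lines of data and the column names are invented for the example and are not the contents of `test.csv`.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").getOrCreate()
val sc = spark.sparkContext

// Hypothetical CSV content standing in for a real file
val lines = sc.parallelize(Seq(
  "id,vendor,pickup",
  "a1,1,2016-06-30",
  "a2,2,2016-07-01"
))

val parse = (line: String) => line.trim().split(",")

val headerLine = lines.first()                    // first line holds the column names
val col = parse(headerLine).indexOf("pickup")     // look the column up by name

val pickups = lines
  .filter(_ != headerLine)                        // drop the header line
  .map(row => parse(row)(col))
  .collect()

println(pickups.mkString(", "))
```

A plain `split(",")` does not handle quoted fields that contain commas; for such files the DataFrame CSV reader is the safer choice.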

## Key-Value RDDs

Key-value RDDs contain a pair of values for each item in the collection, again distributed across multiple nodes.

The first element of each tuple is called the "key", and the second the "value".

**Note**: These should not be confused with a Map. Key-value RDDs may contain duplicate keys, whereas Maps may not.

Below, we instantiate a key-value RDD.

In [None]:
val kv = sc.parallelize(
    Array(
        ("Alice", 5),
        ("Bob", 2),
        ("Charlie", 3),
        ("Alice", 2),
        ("Charlie", 1)
    )
)
kv.collect()

### Special Operations

There are a number of operations which are specific to key-value RDDs. We demonstrate several of these below.

If we wish to consolidate keys which are not unique by combining their values, we can use the `reduceByKey()` method.

In [None]:
kv.reduceByKey(_ + _ ).collect()
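
To illustrate how duplicate keys behave, the sketch below compares `reduceByKey()` with `collectAsMap()` on the same pairs as above. The name `pairs` is an assumption used to avoid clashing with `kv`.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").getOrCreate()
val sc = spark.sparkContext

val pairs = sc.parallelize(Seq(
  ("Alice", 5), ("Bob", 2), ("Charlie", 3), ("Alice", 2), ("Charlie", 1)
))

// reduceByKey merges the values of equal keys with the given function
val totals = pairs.reduceByKey(_ + _).collectAsMap()

// Converting straight to a Map keeps only one value per key
val asMap = pairs.collectAsMap()

println(s"pairs: ${pairs.count()}, map entries: ${asMap.size}")
println(totals)
```

The RDD holds five pairs, while the map built directly from it has only three entries, so combining values with `reduceByKey()` first avoids silently losing data.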

If we want to sort the k-v RDD by key, we can use `sortByKey()`. If we wish to sort by value, we can use `sortBy()` and specify a lambda function.

In [None]:
kv.sortByKey().collect()

In [None]:
kv.sortBy(_._2).collect()
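
Sorting by value is often combined with an aggregation, for example to rank keys by their totals. The following sketch assumes the same sample pairs as above and uses the `ascending` parameter of `sortBy()` to get the largest total first; the names `pairs` and `ranked` are illustrative.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").getOrCreate()
val sc = spark.sparkContext

val pairs = sc.parallelize(Seq(
  ("Alice", 5), ("Bob", 2), ("Charlie", 3), ("Alice", 2), ("Charlie", 1)
))

// Total per key, then order by the total (the second tuple element), descending
val ranked = pairs
  .reduceByKey(_ + _)
  .sortBy(_._2, ascending = false)
  .collect()

ranked.foreach { case (name, total) => println(s"$name: $total") }
```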

If we want a collection of just the keys, we use `keys`; if we want the values, we use `values`. In Scala both are called without parentheses.

In [None]:
kv.keys.collect()

In [None]:
kv.values.collect()

### Transforming Values

Below, we demonstrate `mapValues()` and `flatMapValues()`, which work like `map()` and `flatMap()` but apply the function only to the value of each pair, leaving the key unchanged.

In [None]:
kv.mapValues(_ * 2).collect()

In [None]:
kv.mapValues(x => (x, x*2)).collect()

In [None]:
kv.flatMapValues(x => Array(x, x*2)).collect()
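
A common use of `mapValues()` is to compute a per-key average together with `reduceByKey()`. This is one possible sketch, using the same sample pairs as above; the names `pairs` and `averages` are assumptions for the example.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").getOrCreate()
val sc = spark.sparkContext

val pairs = sc.parallelize(Seq(
  ("Alice", 5), ("Bob", 2), ("Charlie", 3), ("Alice", 2), ("Charlie", 1)
))

val averages = pairs
  .mapValues(v => (v, 1))                                           // (sum, count) seed per pair
  .reduceByKey { case ((s1, n1), (s2, n2)) => (s1 + s2, n1 + n2) }  // add sums and counts per key
  .mapValues { case (sum, n) => sum.toDouble / n }                  // average per key
  .collectAsMap()

println(averages)
```

Carrying the count alongside the sum keeps the combining function associative and commutative, which `reduceByKey()` relies on.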

# Spark SQL DataFrame

Spark allows one to create DataFrames similar to the ones provided by Pandas, and allows SQL queries to be run on them. As with RDDs, both the DataFrame and the SQL operations are stored and performed on the distributed cluster.

## SQLContext

To use Spark SQL features, we need a dedicated SQL context handler. This serves as the point of call for all SQL-related operations, and can be instantiated using the Spark context we already have.

An `SQLContext` can also be obtained directly from a `SparkSession`, which is the preferred method in newer versions.

## Creating a DataFrame from a SparkSession

Aside: Spark code often uses long chains of method calls; for clarity, each call is conventionally placed on its own line.

In [None]:
import org.apache.spark.sql._

val spark = SparkSession.builder()
    .master("local[*]")
    .getOrCreate()

## .read

Spark SQL provides `spark.read`, which will parse and distribute your data if it is in one of the supported formats (e.g., CSV, JSON, Parquet).

If your file contains its own schema (e.g., a CSV with a header row), Spark can use it to structure your data appropriately. If it does not, you can define one manually, as below.

In [None]:
import org.apache.spark.sql.types.{StructField, StructType, StringType, LongType}
import org.apache.spark.sql.types.Metadata

val schema_item = StructType(Array(
  StructField("UserID", LongType),
  StructField("Title", StringType),
))

val dfi = (
  spark
  .read
  .schema(schema_item)
  .option("header", "false")
  .option("delimiter", "|")
  .csv("etc/ml-100k/u.item")
)

dfi.show(3)
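
To try out an explicit schema without a file on disk, the reader can also parse an in-memory `Dataset[String]`. The sketch below assumes a Spark version that provides the `csv(Dataset[String])` overload (2.2 or later); the two sample lines and the column names `ItemID` and `Title` are invented for illustration.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{StructField, StructType, StringType, LongType}

val spark = SparkSession.builder().master("local[*]").getOrCreate()
import spark.implicits._

// Hypothetical schema: a numeric id followed by a title
val schema = StructType(Array(
  StructField("ItemID", LongType),
  StructField("Title", StringType)
))

// Made-up pipe-delimited lines standing in for a file
val lines = Seq("1|First Title", "2|Second Title").toDS()

val items = spark.read
  .schema(schema)
  .option("delimiter", "|")
  .csv(lines)

items.printSchema()
val rows = items.collect().map(r => (r.getLong(0), r.getString(1))).toList
```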

## Querying Spark DataFrame

In [None]:
val df = (
  spark
  .read
  .option("inferSchema", "true")
  .option("header", "true")
  .csv("etc/responses.csv")
)


In [None]:
df.select("Music", "Internet").show(5)

In [None]:
df.columns.slice(40, 50)

In [None]:
import spark.implicits._

df
    .select("Music", "Internet")
    .filter($"Music" > 2)
    .show(5)

In [None]:
import org.apache.spark.sql.functions.{desc, asc}

df
    .select("Music", "Internet")
    .filter($"Music" > 2)
    .orderBy(desc("Internet"))
    .show(5)

In [None]:
df
    .select("Music", "Internet", "Physics", "Religion")
    .filter($"Music" > 2)
    .groupBy($"Religion")
    .mean()
    .orderBy($"Religion")
    .show()

The next cell expresses the following SQL query with the DataFrame API:

    SELECT Religion, AVG(Physics) AS Epx, STDDEV(Physics) AS Spx, COUNT(Religion) AS Nr
    FROM results
    WHERE Music > 2
    GROUP BY Religion
    HAVING Religion > 0
    ORDER BY Religion ASC

In [None]:
import org.apache.spark.sql.functions._

val results = ( 
    df
    .filter($"Music" > 2)
    .select("Physics", "Religion")
    .groupBy("Religion")
    .agg(
        mean("Physics").alias("Epx"),
        stddev("Physics").alias("Spx"),
        count("Religion").alias("Nr")
    )
    .filter($"Religion" > 0)
    .orderBy("Religion")
)

In [None]:
results.show()
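
The same kind of aggregation can also be written as a SQL string by registering a DataFrame as a temporary view. This is a minimal sketch on made-up data: the six rows, the view name `responses` and the variable names are assumptions, not the contents of `responses.csv`.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").getOrCreate()
import spark.implicits._

// Hypothetical survey answers: (Music, Physics, Religion)
val responses = Seq(
  (5, 3, 1), (4, 5, 1), (1, 2, 1), (3, 4, 2), (5, 2, 2), (2, 1, 3)
).toDF("Music", "Physics", "Religion")

// Make the DataFrame visible to SQL under a table name
responses.createOrReplaceTempView("responses")

val viaSql = spark.sql("""
  SELECT Religion, AVG(Physics) AS Epx, COUNT(Religion) AS Nr
  FROM responses
  WHERE Music > 2 AND Religion > 0
  GROUP BY Religion
  ORDER BY Religion
""")

viaSql.show()
val sqlRows = viaSql.collect().map(r => (r.getInt(0), r.getDouble(1), r.getLong(2))).toList
```

The result is an ordinary DataFrame, so SQL strings and DataFrame method chains can be mixed freely.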

In [None]:
val x = results.select("Religion").as[Int].collect().toSeq
val y = results.select("Epx").as[Double].collect().toSeq


In [None]:
plot(Seq(Bar(x, y)))

In [None]:
import plotly.element._, plotly.layout._

lazy val layout = Layout(
  title = "R vs P"
)

plot(Seq(Scatter(x,y, mode = ScatterMode(ScatterMode.Markers))), layout)

## Stopping

Last, but not least, we stop the `SparkContext` object, much as we would close a file handle once we are done with it.

In [None]:
// sc.stop()