## Introduction

This notebook is a sparkR tutorial.

The examples in this notebook are taken from: http://spark.apache.org/docs/2.0.0/sparkr.html

Ensure you refer to the sparkR API for the function defintions.

## Creating SparkDataFrames
With a SparkSession, applications can create SparkDataFrames from a local R data frame, from a Hive table, or from other data sources.

### From local data frames
The simplest way to create a data frame is to convert a local R data frame into a SparkDataFrame. Specifically we can use as.DataFrame or createDataFrame and pass in the local R data frame to create a SparkDataFrame. As an example, the following creates a SparkDataFrame based using the faithful dataset from R.

In [None]:
df <- as.DataFrame(faithful)

# Displays the first part of the SparkDataFrame
head(df)

### SparkDataFrame Operations
SparkDataFrames support a number of functions to do structured data processing. Here we include some basic examples and a complete list can be found in the API docs:

#### Selecting rows, columns

In [None]:
# Select only the "eruptions" column
head(select(df, df$eruptions))

In [None]:
# You can also pass in column name as strings
head(select(df, "eruptions"))

In [None]:
# Filter the SparkDataFrame to only retain rows with wait times shorter than 50 mins
head(filter(df, df$waiting < 50))

#### Grouping, Aggregation
SparkR data frames support a number of commonly used functions to aggregate data after grouping. For example we can compute a histogram of the waiting time in the faithful dataset as shown below

In [None]:
# We use the `n` operator to count the number of times each waiting time appears
head(summarize(groupBy(df, df$waiting), count = n(df$waiting)))

In [None]:
# We can also sort the output from the aggregation to get the most common waiting times
waiting_counts <- summarize(groupBy(df, df$waiting), count = n(df$waiting))
head(arrange(waiting_counts, desc(waiting_counts$count)))

#### Operating on Columns
SparkR also provides a number of functions that can directly applied to columns for data processing and during aggregation. The example below shows the use of basic arithmetic functions.

In [None]:
# Convert waiting time from hours to seconds.
# Note that we can assign this to a new column in the same SparkDataFrame
df$waiting_secs <- df$waiting * 60
head(df)

#### Applying User-Defined Function
In SparkR, we support several kinds of User-Defined Functions:

Run a given function on a large dataset using dapply or dapplyCollect

##### dapply


Apply a function to each partition of a SparkDataFrame. The function to be applied to each partition of the SparkDataFrame and should have only one parameter, to which a data.frame corresponds to each partition will be passed. The output of function should be a data.frame. Schema specifies the row format of the resulting a SparkDataFrame. It must match to data types of returned value.

In [None]:
# reload the dataframe
df <- as.DataFrame(faithful)

# Convert waiting time from hours to seconds.
# Note that we can apply UDF to DataFrame.
schema <- structType(structField("eruptions", "double"), structField("waiting", "double"),
                     structField("waiting_secs", "double"))
df1 <- dapply(df, function(x) { x <- cbind(x, x$waiting * 60) }, schema)
head(collect(df1))

##### dapplyCollect

Like dapply, apply a function to each partition of a SparkDataFrame and collect the result back. The output of function should be a data.frame. But, Schema is not required to be passed. Note that dapplyCollect can fail if the output of UDF run on all the partition cannot be pulled to the driver and fit in driver memory.

In [None]:
# Convert waiting time from hours to seconds.
# Note that we can apply UDF to DataFrame and return a R's data.frame
ldf <- dapplyCollect(
         df,
         function(x) {
           x <- cbind(x, "waiting_secs" = x$waiting * 60)
         })
head(ldf, 3)

#### Run a given function on a large dataset grouping by input column(s) and using gapply or gapplyCollect

##### gapply

Apply a function to each group of a SparkDataFrame. The function is to be applied to each group of the SparkDataFrame and should have only two parameters: grouping key and R data.frame corresponding to that key. The groups are chosen from SparkDataFrames column(s). The output of function should be a data.frame. Schema specifies the row format of the resulting SparkDataFrame. It must represent R function’s output schema on the basis of Spark data types. The column names of the returned data.frame are set by user. Below is the data type mapping between R and Spark.

In [None]:
# Determine six waiting times with the largest eruption time in minutes.
schema <- structType(structField("waiting", "double"), structField("max_eruption", "double"))
result <- gapply(
    df,
    "waiting",
    function(key, x) {
        y <- data.frame(key, max(x$eruptions))
    },
    schema)
head(collect(arrange(result, "max_eruption", decreasing = TRUE)))

##### gapplyCollect

Like gapply, applies a function to each partition of a SparkDataFrame and collect the result back to R data.frame. The output of the function should be a data.frame. But, the schema is not required to be passed. Note that gapplyCollect can fail if the output of UDF run on all the partition cannot be pulled to the driver and fit in driver memory.



In [None]:
# Determine six waiting times with the largest eruption time in minutes.
result <- gapplyCollect(
    df,
    "waiting",
    function(key, x) {
        y <- data.frame(key, max(x$eruptions))
        colnames(y) <- c("waiting", "max_eruption")
        y
    })
head(result[order(result$max_eruption, decreasing = TRUE), ])

#### Run local R functions distributed using spark.lapply

##### spark.lapply

Similar to lapply in native R, spark.lapply runs a function over a list of elements and distributes the computations with Spark. Applies a function in a manner that is similar to doParallel or lapply to elements of a list. The results of all the computations should fit in a single machine. If that is not the case they can do something like df <- createDataFrame(list) and then use dapply

In [None]:
# Perform distributed training of multiple models with spark.lapply. Here, we pass
# a read-only list of arguments which specifies family the generalized linear model should be.
families <- c("gaussian", "poisson")
train <- function(family) {
  model <- glm(Sepal.Length ~ Sepal.Width + Species, iris, family = family)
  summary(model)
}
# Return a list of model's summaries
model.summaries <- spark.lapply(families, train)

# Print the summary of each model
print(model.summaries)

### Running SQL Queries from SparkR
A SparkDataFrame can also be registered as a temporary view in Spark SQL and that allows you to run SQL queries over its data. The sql function enables applications to run SQL queries programmatically and returns the result as a SparkDataFrame.

In [None]:
# First let's write a json file locally
fileConn<-file("people.json")
writeLines('{"name":"Michael"}\n{"name":"Andy", "age":30}\n{"name":"Justin", "age":19}', fileConn)
close(fileConn)

In [None]:
# Load a JSON file
people <- read.df("./people.json", "json")

# Register this SparkDataFrame as a temporary view.
createOrReplaceTempView(people, "people")

# SQL statements can be run by using the sql method
teenagers <- sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")
head(teenagers)

## Machine Learning
SparkR supports the following machine learning algorithms currently: Generalized Linear Model, Accelerated Failure Time (AFT) Survival Regression Model, Naive Bayes Model and KMeans Model. Under the hood, SparkR uses MLlib to train the model. Users can call summary to print a summary of the fitted model, predict to make predictions on new data, and write.ml/read.ml to save/load fitted models. SparkR supports a subset of the available R formula operators for model fitting, including ‘~’, ‘.’, ‘:’, ‘+’, and ‘-‘.

### Algorithms
#### Generalized Linear Model
spark.glm() or glm() fits generalized linear model against a Spark DataFrame. Currently “gaussian”, “binomial”, “poisson” and “gamma” families are supported.

In [None]:
irisDF <- suppressWarnings(createDataFrame(iris))
# Fit a generalized linear model of family "gaussian" with spark.glm
gaussianDF <- irisDF
gaussianTestDF <- irisDF
gaussianGLM <- spark.glm(gaussianDF, Sepal_Length ~ Sepal_Width + Species, family = "gaussian")

# Model summary
summary(gaussianGLM)

# Prediction
gaussianPredictions <- predict(gaussianGLM, gaussianTestDF)
showDF(gaussianPredictions)

# Fit a generalized linear model with glm (R-compliant)
gaussianGLM2 <- glm(Sepal_Length ~ Sepal_Width + Species, gaussianDF, family = "gaussian")
summary(gaussianGLM2)

# Fit a generalized linear model of family "binomial" with spark.glm
binomialDF <- filter(irisDF, irisDF$Species != "setosa")
binomialTestDF <- binomialDF
binomialGLM <- spark.glm(binomialDF, Species ~ Sepal_Length + Sepal_Width, family = "binomial")

# Model summary
summary(binomialGLM)

# Prediction
binomialPredictions <- predict(binomialGLM, binomialTestDF)
showDF(binomialPredictions)

##### Accelerated Failure Time (AFT) Survival Regression Model
spark.survreg() fits an accelerated failure time (AFT) survival regression model on a SparkDataFrame. Note that the formula of spark.survreg() does not support operator ‘.’ currently.

In [None]:
# Use the ovarian dataset available in R survival package
library(survival)

# Fit an accelerated failure time (AFT) survival regression model with spark.survreg
ovarianDF <- suppressWarnings(createDataFrame(ovarian))
aftDF <- ovarianDF
aftTestDF <- ovarianDF
aftModel <- spark.survreg(aftDF, Surv(futime, fustat) ~ ecog_ps + rx)

# Model summary
summary(aftModel)

# Prediction
aftPredictions <- predict(aftModel, aftTestDF)
showDF(aftPredictions)

##### Naive Bayes Model
spark.naiveBayes() fits a Bernoulli naive Bayes model against a SparkDataFrame. Only categorical data is supported.

In [None]:
# Fit a Bernoulli naive Bayes model with spark.naiveBayes
titanic <- as.data.frame(Titanic)
titanicDF <- createDataFrame(titanic[titanic$Freq > 0, -5])
nbDF <- titanicDF
nbTestDF <- titanicDF
nbModel <- spark.naiveBayes(nbDF, Survived ~ Class + Sex + Age)

# Model summary
summary(nbModel)

# Prediction
nbPredictions <- predict(nbModel, nbTestDF)
showDF(nbPredictions)

##### KMeans Model
spark.kmeans() fits a k-means clustering model against a Spark DataFrame, similarly to R’s kmeans().

In [None]:
# Fit a k-means model with spark.kmeans
irisDF <- suppressWarnings(createDataFrame(iris))
kmeansDF <- irisDF
kmeansTestDF <- irisDF
kmeansModel <- spark.kmeans(kmeansDF, ~ Sepal_Length + Sepal_Width + Petal_Length + Petal_Width,
                            k = 3)

# Model summary
summary(kmeansModel)

# Get fitted result from the k-means model
showDF(fitted(kmeansModel))

# Prediction
kmeansPredictions <- predict(kmeansModel, kmeansTestDF)
showDF(kmeansPredictions)

#### Model persistence
The following example shows how to save/load a MLlib model by SparkR.

In [None]:
irisDF <- suppressWarnings(createDataFrame(iris))
# Fit a generalized linear model of family "gaussian" with spark.glm
gaussianDF <- irisDF
gaussianTestDF <- irisDF
gaussianGLM <- spark.glm(gaussianDF, Sepal_Length ~ Sepal_Width + Species, family = "gaussian")

# Save and then load a fitted MLlib model
modelPath <- tempfile(pattern = "ml", fileext = ".tmp")
write.ml(gaussianGLM, modelPath)
gaussianGLM2 <- read.ml(modelPath)

# Check model summary
summary(gaussianGLM2)

# Check model prediction
gaussianPredictions <- predict(gaussianGLM2, gaussianTestDF)
showDF(gaussianPredictions)

unlink(modelPath)
