# Exploratory Data Analysis and Modeling with R and Spark

This module describes how to do exploratory data analysis with R and Spark using the `sparklyr` package, and Microsoft R Server.

## Copying Library Over from HDFS

Before we get started, let's make sure we have the necessary libraries. If you saved your user library from the edge node over to HDFS, we can copy that over to our head node's user library.

In [None]:
.libPaths()

In [None]:
list.files(.libPaths()[1])

In [None]:
rxHadoopListFiles("/Rlib/3.3")

In [None]:
rxHadoopCopyToLocal("/Rlib/3.3/*", .libPaths()[1])

In [None]:
list.files(.libPaths()[1])

# Create Spark Context

The `sparklyr` package has a handy function for creating a Spark context. This differs from the method that is used by the `SparkR` package.

There seems to be some error when loading the namespace of the copied version of `sparklyr`, probably due to some environmental variables. Let's reinstall the package.

In [None]:
r <- getOption('repos')
# set mirror to something a bit more recent
mran_date <- Sys.Date() - 1
r[["CRAN"]] <- paste0("https://mran.revolutionanalytics.com/snapshot/", mran_date)
options(repos = r)

In [None]:
install.packages("sparklyr")

In [None]:
library(sparklyr)

## D13V2, 56 gigs of ram, 8 cores of memory
## 6/2 = 3 executors with two cores each
## 54/3 = 18 gigs of ram per executor

conf <- spark_config()
conf$spark.executor.instances <- 12
conf$
conf$'sparklyr.shell.executor-memory' <- "17g"
conf$'sparklyr.shell.driver-memory' <- "17g"
conf$spark.executor.cores <- 2
conf$spark.executor.memory <- "17G"
conf$spark.yarn.am.cores  <- 2
conf$spark.yarn.am.memory <- "1G"
conf$spark.dynamicAllocation.enabled <- "false"

sc <- spark_connect(master = "yarn-client", config = conf)

## Create Spark DataFrame

Check the existing tables in the Hive metastore of our cluster.

In [None]:
src_tbls(sc)

Create a path to the downloaded taxi dataset.

In [None]:
wasb_taxi <- "/NYCTaxi/sample"
# rxHadoopListFiles("/")
# rxHadoopMakeDir(wasb_taxi)
# rxHadoopCopyFromLocal("taxi_large.csv", wasb_taxi)
# rxHadoopCommand("fs -cat /NYCTaxi/sample/taxi_large.csv | head")


In [None]:
system.time(taxi <- spark_read_csv(sc,
                       path = wasb_taxi,
                       "taxisample",
                       header = TRUE))

Now if we check, we should see the taxi dataset in our Hive metastore.

In [None]:
src_tbls(sc)

## Exploratory Data Analysis

### Feature Engineering -- Add Tip Column

The `sparklyr` package can be used in conjuction with the `dplyr` package. It basically uses SparkSQL/HiveSQL to convert `dplyr` functions into `SQL` equivalents that can be delivered to Spark SQL and the Catalyst optimizer. The reason this works is that SQL DataFrames created by the `sparklyr` package all inherit the *tbl_sql* class.

In [None]:
library(dplyr)
class(taxi)
sample_taxi <- mutate(taxi, tip_pct = tip_amount/fare_amount)
explain(sample_taxi)


Same exact syntax as before, but this time, all computation takes place in Spark.


### Create summary functions

Just as before, we can create aggregation functions. Here's the same function from before, that worked with `data.frames` and with `xdfs` using the `dplyrXdf` package.

In [None]:

taxi_hood_sum <- function(taxi_data = taxi_df, ...) {
  
  load(url("http://alizaidi.blob.core.windows.net/training/manhattan.RData"))
  
  taxi_data %>% 
    filter(pickup_nhood %in% manhattan_hoods,
           dropoff_nhood %in% manhattan_hoods, ...) %>% 
    group_by(dropoff_nhood, pickup_nhood) %>% 
    summarize(ave_tip = mean(tip_pct), 
              ave_dist = mean(trip_distance)) %>% 
    filter(ave_dist > 3, ave_tip > 0.05) -> sum_df
  
  return(sum_df)
  
}


## Visualize Summarized Data

We can visualize our summarized data above using our favorite R visualization tools. In this case, our summarized dataset is small enough to fit in-memory of our local R driver. We'll use the `collect` function to convert the Spark DataFrame into an R `data.frame`. Let's check and make sure our data is small enough to fit comfortably in-memory of our local context.

In [None]:
library(dplyr)
count(ungroup(taxi_hood_sum(sample_taxi)))

We'll use a wrapper function to create our plot, taking in the collected R data.frame and visualizing it using `ggplot2`.

In [None]:

tile_plot_hood <- function(df = taxi_hood_sum()) {
  
  library(ggplot2)
  
  ggplot(data = df, aes(x = pickup_nhood, y = dropoff_nhood)) + 
    geom_tile(aes(fill = ave_tip), colour = "white") + 
    theme_bw() +
    theme(axis.text.x = element_text(angle = 45, hjust = 1),
          legend.position = 'bottom') +
    scale_fill_gradient(low = "white", high = "steelblue") -> gplot
  
  return(gplot)
}

### Calculate Summary, Collect Results, Plot... Profit

We can chain together our results. We can use the `collect` function to bring the aggregated/summarized dataset into memory, so `ggplot2` can understand it.


In [None]:
taxi_summary <- taxi_hood_sum(sample_taxi)
class(taxi_summary)

taxi_df <- taxi_summary %>% collect

taxi_df
g <- tile_plot_hood(taxi_df)

In [None]:
g

In [None]:
IRdisplay::display_html(repr::repr_html(ggplotly(g)))

## Machine Learning Pipelines with Spark 

Since Spark 1.3, there has been much interest in creating a simple, interactive machine learning pipeline that represents a full data science application from start to end. The `mllib` package was created to address this need.

Looks very similar to the python `scikit-learn` package. For more of the available functionality, refer to [sparklyr-ml](http://spark.rstudio.com/mllib.html) website.


### Create Training and Split

In addition to modeling functions, there are numerous `mllib` functions for pre-processing and transformations. Here's well use the partition function to split our data into training and test sets.

In [None]:
partitions <- sample_taxi %>%
  sdf_partition(training = 0.75, test = 0.25, seed = 1099)

str(partitions)

### Fit a Linear Regression Model

We'll now train a simple linear regression model (one feature variable) on our training dataset.

In [None]:

system.time(
    fit <- partitions$training %>% 
      filter(tip_pct < 0.5) %>% 
      ml_linear_regression(response = "tip_pct", features = c("trip_distance"))
)
fit

## Plot Results

This may not work for larger datasets, so we'll sample, and for our current example we can simply select the two columns we used in our model, and collect them into local memory for visualization.

In [None]:

library(ggplot2)

partitions$test %>%
  select(tip_pct, trip_distance) %>%
  filter(tip_pct < 0.5) %>% 
  sample_n(10^5) %>% 
  collect %>%
  ggplot(aes(trip_distance, tip_pct)) +
  geom_point(size = 2, alpha = 0.5) +
  geom_abline(aes(slope = coef(fit)[["trip_distance"]],
                  intercept = coef(fit)[["(Intercept)"]]),
              color = "red") +
  scale_y_continuous(label = scales::percent) +
  labs(
    x = "Trip Distance in Miles",
    y = "Tip Percentage (%)",
    title = "Linear Regression: Tip Percent ~ Trip Distance",
    subtitle = "Spark.ML linear regression to predict tip percentage."
  )


## Classification Tree

Try out a binary classification model using the ensemble tree algoritms. 

Let's first create a binary column to use as our response variable:

In [None]:

taxi_binary <- sample_taxi %>%
  ft_binarizer(input_col = "tip_pct",
               output_col = "good_tip",
               threshold = 0.1)

partitions <- taxi_binary %>%
  sdf_partition(training = 0.75, test = 0.25, seed = 1099)


Train a single decision tree.

In [None]:

system.time(
    fit_dtree <- partitions$training %>% 
      ml_decision_tree(response = "good_tip", 
                       features = c("payment_type", "passenger_count", "trip_distance"), 
                       type = "classification")

)

Let's try predicting with the fitted decision tree model:


In [None]:
score_dtree <- sdf_predict(fit_dtree, partitions$test)


In [None]:
ml_tree_feature_importance(sc, fit_dtree)

In [None]:
ml_tree_feature_importance(sc, fit_dtree) %>%
    mutate(importance = as.numeric(as.character(importance))) %>%
    ggplot(aes(x = feature, y = importance)) +
        geom_bar(stat = 'identity') +
        theme_minimal() +
        coord_flip()

That was for a single tree. Let's try to train a ensemble tree using the random forest function:


In [None]:
system.time(
    fit_dforest <- partitions$training %>% 
      ml_random_forest(response = "good_tip", 
                       features = c("payment_type", "passenger_count", "trip_distance"), 
                       type = "classification"))

In [None]:
score_dforest <- sdf_predict(fit_dforest, partitions$test)

## Create a Confusion Matrix

Now that we have our predicted results, we could create a confusion matrix of our predictions vs the actuals.

If we want to work entirely with Spark DataFrames, we can use the `sdf_predict` function and then group_by the predictions and actual values:

In [None]:
sdf_conf_df <- function(predictions = score_dtree) {
  
  
  conf_sdf <- predictions %>% group_by(prediction, good_tip) %>% tally()
  
  return(conf_sdf)
  
}

sdf_conf_df()

In [None]:
library(tidyr)

conf_spread <- . %>% spread(key = "good_tip", value  = "n")

dtree_conf <- sdf_conf_df() %>% collect %>% conf_spread

# create ratios for confusion matrix
dtree_conf <- dtree_conf[, 2:3]
dtree_conf/sum(dtree_conf)

In [None]:

dtree_auc <- ml_binary_classification_eval(predicted_tbl_spark = score_dtree, 
                                            label = "good_tip", score = "probability")

dforest_auc <- ml_binary_classification_eval(predicted_tbl_spark = score_dforest, 
                                             label = "good_tip", score = "probability")


### metric choices: c("f1", "precision", "recall", "weightedPrecision", "weightedRecall", "accuracy")

dforest_accuracy <- ml_classification_eval(predicted_tbl_spark = score_dforest, 
                                           label = "good_tip", 
                                           predicted_lbl = "prediction", 
                                           metric = "accuracy")

dtree_auc
dforest_auc
dforest_accuracy


Let's see how the feature variables effected our predictions.

In [None]:
feature_importance <- ml_tree_feature_importance(sc, fit_dforest) %>%
    mutate(importance = as.numeric(levels(importance))[importance]) %>%
    mutate(feature = as.character(feature))

In [None]:
feature_importance %>%
    mutate(importance = as.numeric(as.character(importance))) %>%
    ggplot(aes(x = feature, y = importance)) +
        geom_bar(stat = 'identity') +
        theme_minimal() +
        coord_flip()

## Modeling with RxSpark

This section shows how to use the `RxSpark` compute context for modeling.

# Locate RevoShare dir

Every MRS installation on a HDFS environment creates a share directory on HDFS. By default, each user will have her own shared directory under the `/user/RevoShare/` file path.

In [None]:

rxHadoopListFiles("/user/RevoShare/")
username <- "sshuser"
data_path <- file.path("/user/RevoShare", username)

In [None]:
rxHadoopListFiles("/user/RevoShare/sshuser/taxiTextXdf")

# Saving the Spark DataFrame to CSV

The `RxSpark` and the Spark Compute contexts are completely distinct compute environments. In order to use the `rx` functions, we need to move the Spark DataFrame into a format that MRS can understand.


### Write Sample Taxi to RevoShare 

In [None]:
library(sparklyr)

spark_write_csv(taxi_binary, 
                path = file.path(data_path, 'sampleTaxi'))

# spark_write_parquet(taxi_binary,
#                     path = file.path(data_path, "taxiParquet"))


Since we've saved our Spark DataFrame from the `sparklyr` application to persistent storage, we can shut down our `sparklyr` connection.

In [None]:
spark_disconnect(sc)

Writing to HDFS adds a directory giving a flag for the completion rate of our write job. Whenever it completes successfully, we'd like to delete the _SUCCESS_ flag.

In [None]:

rxHadoopListFiles(file.path(data_path, "sampleTaxi"))
file_to_delete <- file.path(data_path, 
                            "sampleTaxi", "_SUCCESS")
delete_command <- paste("fs -rm", file_to_delete)
rxHadoopCommand(delete_command)

## Create HDFS and Spark Contexts for Revo

Let's create the pointers to the file paths and HDFS to use the `RxSpark` compute context. We'll need a pointer to our `inData` object, which is the CSV file in the `sampleTaxi` directory, as well as a pointer to an XDF file we will save our results to (the `outFile` argument).

In [None]:

myNameNode <- "default"
myPort <- 0
hdfsFS <- RxHdfsFileSystem()

taxi_text <- RxTextData(file.path(data_path,
                                  "sampleTaxi"),
                        fileSystem = hdfsFS)

# taxi_parquet <- RxParquetData(file.path(data_path, "taxiParquet"),
#                               fileSystem = hdfsFS)

taxi_xdf <- RxXdfData(file.path(data_path, "taxiTextXdf"),
                      fileSystem = hdfsFS)


### Create RxSpark Compute Context

In [None]:
computeContext <- RxSpark(consoleOutput=TRUE,
                          nameNode=myNameNode,
                          port=myPort, 
                          numExecutors = 12,
                          executorCores=2, 
                          executorMem = "12g", 
                          executorOverheadMem = "5g", 
                          persistentRun = TRUE, 
                          extraSparkConfig = "--conf spark.speculation=true")

rxSetComputeContext(computeContext)

Import CSV file into XDF File. 

In [None]:
col_classes <- c('VendorID' = "factor",
                 'passenger_count' = "integer",
                 'trip_distance' = "numeric",
                 'RateCodeID' = "factor",
                 'store_and_fwd_flag' = "factor",
                 'payment_type' = "factor",
                 'fare_amount' = "numeric",
                 'tip_amount' = "numeric",
                 'tolls_amount' = "numeric",
                 'pickup_hour' = "factor",
                 'pickup_dow' = "factor", 
                 'dropoff_hour' = "factor",
                 'dropoff_dow' = "factor",
                 'pickup_nhood' = "factor",
                 'dropoff_nhood' = "factor",
                 'kSplits' = "factor",
                 'tip_pct' = "numeric",
                 'good_tip' = "factor")

system.time(
    rxImport(inData = taxi_text, 
         taxi_xdf, 
         overwrite = TRUE, colClasses = col_classes)

)

In [None]:
rxGetInfo(taxi_xdf, getVarInfo = TRUE)

## Visualizations with MRS

Let's use MRS to summarize our data. We'll use the `rxCube` function to create a tabulation of average tips for each combination of pickup day of week and pickup hour. This gives us a sense of the temporal distribution of the tip percent variable.

In [None]:

tip_dist_df <- rxCube(tip_pct ~ pickup_hour + pickup_dow, 
                      data = taxi_xdf, returnDataFrame = TRUE)

library(ggplot2)
library(magrittr)

tip_dist_df %>% ggplot(aes(x = pickup_hour, y = pickup_dow, fill = tip_pct)) +
  geom_tile() + theme_minimal() + 
  scale_fill_continuous(label = scales::percent) +
  labs(x = "Pickup Hour", y = "Pickup Day of Week", fill = "Tip Percent",
      title = "Distribution of Tip Percents",
      subtitle = "Do Passengers Tip More in the AM?")


## Modeling with MRS

### Creating Linear Models

Let's predict tip_pct as a function of distance and neighborhoods. In order to ensure that the neighbhorhood columns are treated as categorical, we need them to factors. `RevoScaleR` and the `RxSpark` compute context are more picky about factor types than base R models, since they utilize data that is chunked and stored in distributed file systems. 


In [None]:
system.time(linmod <- rxLinMod(tip_pct ~ pickup_nhood + pickup_hour + trip_distance, 
                               data = taxi_xdf, 
                               cube = TRUE))

The `cube` argument parallelizes training over the levels of the categorical feature variables. In our case, this means we'll train, in parallel, the coefficients for the various pickup neighborhoods and pickup hour combinations.

In [None]:
system.time(dtree <- rxDTree(good_tip ~ pickup_nhood + pickup_hour + trip_distance, 
                               data = taxi_xdf, minSplit = 10))

In [None]:
library(RevoTreeView)
plot(createTreeView(dtree))
IRdisplay::display_html((plot(createTreeView(dtree))))