## Using SparkR with Anaconda

Setup:

1. Install r-essentials on all cluster nodes via `acluster conda install r-essentials -c r`

1. Install IRkernel on the head node (https://github.com/IRkernel/IRkernel)

1. On all nodes, create a symbolic link at /usr/bin/Rscript that points to /opt/anaconda/bin/Rscript (e.g. `ln -s /opt/anaconda/bin/Rscript /usr/bin/Rscript`)

### Set path and import SparkR library

In [1]:
# Point SPARK_HOME at the Anaconda-provided Spark install, then put Spark's
# bundled R library directory at the front of the library search path.
Sys.setenv(SPARK_HOME = "/opt/anaconda/share/spark")
sparkr_lib_dir <- file.path(Sys.getenv("SPARK_HOME"), "R", "lib")
.libPaths(c(sparkr_lib_dir, .libPaths()))

In [2]:
# Attach SparkR. As the attach message shows, this masks several stats and
# base functions (e.g. filter, sample, summary, table).
library(SparkR)


Attaching package: 'SparkR'

The following objects are masked from 'package:stats':

    filter, na.omit

The following objects are masked from 'package:base':

    intersect, rbind, sample, subset, summary, table, transform



### Initialize the SparkContext

In [3]:
# Connect to the Spark master at the given spark:// URL; the call starts the
# SparkR backend through spark-submit.
master_url <- "spark://ip-172-31-9-200:7077"
sc <- sparkR.init(master_url)

Launching java with spark-submit command /opt/anaconda/share/spark/bin/spark-submit   sparkr-shell /tmp/Rtmp7CVgW6/backend_port226960341be9 


In [4]:
# Build the SQL context from the Spark context; the DataFrame and SQL calls
# in the rest of this notebook take it as their first argument.
sqlContext <- sparkRSQL.init(sc)

### Create a SparkR DataFrame using the `faithful` dataset from R

In [5]:
# Turn R's built-in faithful data frame into a SparkR DataFrame.
df <- createDataFrame(sqlContext, data = faithful)

In [6]:
# Display the DataFrame itself: this shows column names and types, not rows.
df

DataFrame[eruptions:double, waiting:double]

In [7]:
# Project just the eruptions column and preview its first rows.
eruptions_only <- select(df, df$eruptions)
head(eruptions_only)

Unnamed: 0,eruptions
1,3.6
2,1.8
3,3.333
4,2.283
5,4.533
6,2.883


In [8]:
# Keep only the rows whose waiting value is below 50, then preview them.
short_waits <- filter(df, df$waiting < 50)
head(short_waits)

Unnamed: 0,eruptions,waiting
1,1.75,47
2,1.75,47
3,1.867,48
4,1.75,48
5,2.167,48
6,2.1,49


In [9]:
# Count how many rows share each waiting value and preview the result.
by_waiting <- groupBy(df, df$waiting)
head(summarize(by_waiting, count = n(df$waiting)))

Unnamed: 0,waiting,count
1,81,13
2,60,6
3,93,2
4,68,1
5,47,4
6,80,8


In [10]:
# Tally rows per waiting value, then list the most frequent values first.
by_waiting <- groupBy(df, df$waiting)
waiting_counts <- summarize(by_waiting, count = n(df$waiting))
most_common_first <- arrange(waiting_counts, desc(waiting_counts$count))
head(most_common_first)

Unnamed: 0,waiting,count
1,78,15
2,83,14
3,81,13
4,77,12
5,82,12
6,79,10


In [11]:
# Derive a new column as waiting * 60 (presumably minutes to seconds, going by
# the column name), then preview the widened DataFrame.
waiting_in_secs <- df$waiting * 60
df$waiting_secs <- waiting_in_secs
head(df)

Unnamed: 0,eruptions,waiting,waiting_secs
1,3.6,79,4740
2,1.8,54,3240
3,3.333,74,4440
4,2.283,62,3720
5,4.533,85,5100
6,2.883,55,3300


### Load JSON data from HDFS

In [12]:
# Load the JSON file into a SparkR DataFrame using the "json" data source.
people <- read.df(sqlContext, path = "mock_data.json", source = "json")

In [13]:
# Number of rows in the people DataFrame.
count(people)

In [14]:
# Preview the first rows of the loaded data.
head(people)

Unnamed: 0,email,first_name,gender,id,ip_address,last_name
1,areyes0@home.pl,Arthur,Male,1,163.138.188.169,Reyes
2,smorales1@scientificamerican.com,Samuel,Male,2,237.78.10.190,Morales
3,jfisher2@samsung.com,Julia,Female,3,2.149.116.117,Fisher
4,kreid3@prlog.org,Kathy,Female,4,14.165.16.179,Reid
5,jjackson4@addthis.com,Jeremy,Male,5,70.140.102.156,Jackson
6,emills5@google.co.uk,Ernest,Male,6,182.51.21.159,Mills


In [15]:
# Print the column names, types and nullability of the people DataFrame.
printSchema(people)

root
 |-- email: string (nullable = true)
 |-- first_name: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- id: long (nullable = true)
 |-- ip_address: string (nullable = true)
 |-- last_name: string (nullable = true)


### Running SQL queries from SparkR

In [16]:
# Register the DataFrame under the name "people" so the SQL queries below can
# refer to it in their FROM clause.
registerTempTable(people, "people")

In [17]:
# Query the "people" table for the first names of records marked Male and
# preview the result.
male_query <- "SELECT first_name FROM people WHERE gender = 'Male'"
males <- sql(sqlContext, male_query)
head(males)

Unnamed: 0,first_name
1,Arthur
2,Samuel
3,Jeremy
4,Ernest
5,Louis
6,William


In [18]:
# Query the "people" table for the first names of records marked Female and
# preview the result.
female_query <- "SELECT first_name FROM people WHERE gender = 'Female'"
females <- sql(sqlContext, female_query)
head(females)

Unnamed: 0,first_name
1,Julia
2,Kathy
3,Ann
4,Debra
5,Rose
6,Kathleen


### Machine learning

In [19]:
# Convert R's built-in iris data frame to a SparkR DataFrame. This reassigns
# df, replacing the faithful-based DataFrame used earlier. Dots in column
# names are replaced by underscores (e.g. Petal.Width becomes Petal_Width),
# which is what the warning printed below reports.
df <- createDataFrame(sqlContext, iris)

In FUN(X[[i]], ...): Use Petal_Width instead of Petal.Width  as column name

In [20]:
# Fit a Gaussian GLM of Sepal_Length on Sepal_Width and Species, using the
# underscore column names of the Spark DataFrame.
sepal_formula <- Sepal_Length ~ Sepal_Width + Species
model <- glm(sepal_formula, data = df, family = "gaussian")

In [21]:
# Show the fitted coefficient estimates (intercept, Sepal_Width, and the
# Species levels).
summary(model)

Unnamed: 0,Estimate
(Intercept),2.251393
Sepal_Width,0.8035609
Species__versicolor,1.458743
Species__virginica,1.946817


In [22]:
# Score the same DataFrame with the fitted model, then show the observed
# Sepal_Length next to the model's prediction for the first rows.
predictions <- predict(model, newData = df)
observed_vs_predicted <- select(predictions, "Sepal_Length", "prediction")
head(observed_vs_predicted)

Unnamed: 0,Sepal_Length,prediction
1,5.1,5.063856
2,4.9,4.662076
3,4.7,4.822788
4,4.6,4.742432
5,5.0,5.144212
6,5.4,5.385281
