# SparkR

In [1]:
library(ggplot2)
library(SparkR)
library(magrittr)

# Start a local-mode SparkR session whose driver is given 1 GB of memory
sparkR.session(
  master = "local[*]",
  sparkConfig = list(spark.driver.memory = "1g")
)


Attaching package: ‘SparkR’


The following object is masked from ‘package:ggplot2’:

    expr


The following objects are masked from ‘package:stats’:

    cov, filter, lag, na.omit, predict, sd, var, window


The following objects are masked from ‘package:base’:

    as.data.frame, colnames, colnames<-, drop, endsWith, intersect,
    rank, rbind, sample, startsWith, subset, summary, transform, union



Attaching package: ‘magrittr’


The following object is masked from ‘package:SparkR’:

    not


Spark package found in SPARK_HOME: /usr/local/spark



Launching java with spark-submit command /usr/local/spark/bin/spark-submit   --driver-memory "1g" sparkr-shell /tmp/RtmpgVkZRK/backend_port6434c9e3ca 


Java ref type org.apache.spark.sql.SparkSession id 1 

In [34]:
# Copy a local R data.frame (or tibble) into Spark, split across 8 partitions.
diamonds_in_spark <- as.DataFrame(
  diamonds,
  numPartitions = 8
)

In [3]:
diamonds_in_spark[1,2] # fails: the row slot of `[` on a Spark DataFrame only accepts a filtering predicate, not a row number (see the error below)

ERROR: Error in diamonds_in_spark[1, 2]: Expressions other than filtering predicates are not supported in the first parameter of extract operator [ or subset() method.


In [26]:
# Pull the Spark data back into the R session as an ordinary R data.frame
diamonds_back_in_R <- SparkR::collect(diamonds_in_spark)

In [35]:
head(diamonds_in_spark)  # show the first few rows of the Spark DataFrame

Unnamed: 0_level_0,carat,cut,color,clarity,depth,table,price,x,y,z
Unnamed: 0_level_1,<dbl>,<chr>,<chr>,<chr>,<dbl>,<dbl>,<int>,<dbl>,<dbl>,<dbl>
1,0.23,Ideal,E,SI2,61.5,55,326,3.95,3.98,2.43
2,0.21,Premium,E,SI1,59.8,61,326,3.89,3.84,2.31
3,0.23,Good,E,VS1,56.9,65,327,4.05,4.07,2.31
4,0.29,Premium,I,VS2,62.4,58,334,4.2,4.23,2.63
5,0.31,Good,J,SI2,63.3,58,335,4.34,4.35,2.75
6,0.24,Very Good,J,VVS2,62.8,57,336,3.94,3.96,2.48


In [5]:
# Export to CSV. `source` must be given explicitly: without it write.df() uses
# Spark's default data source (Parquet unless reconfigured), regardless of the
# ".csv" in the path. Spark writes a directory of part files at this path.
write.df(diamonds_in_spark, "sparkdiamonds.csv", source = "csv",
         mode = "overwrite", header = "true")
# Read the CSV back into Spark; header/inferSchema restore the column names
# and the numeric column types instead of generic string columns.
diamonds_in_spark_2 <- read.df("sparkdiamonds.csv", source = "csv",
                               header = "true", inferSchema = "true")

## Data wrangling

In [33]:
# NOTE: large_diamonds_in_spark is only created in the "filtering" cell below
# (In [32] was executed before this In [33]); run that cell first.
head(large_diamonds_in_spark)

Unnamed: 0_level_0,carat,cut,color,clarity,depth,table,price,x,y,z
Unnamed: 0_level_1,<dbl>,<chr>,<chr>,<chr>,<dbl>,<dbl>,<int>,<dbl>,<dbl>,<dbl>
1,1.17,Very Good,J,I1,60.2,61,2774,6.83,6.9,4.13
2,1.01,Premium,F,I1,61.8,60,2781,6.39,6.36,3.94
3,1.01,Fair,E,I1,64.5,58,2788,6.29,6.21,4.03
4,1.01,Premium,H,SI2,62.7,59,2788,6.31,6.22,3.93
5,1.05,Very Good,J,SI2,63.2,56,2789,6.49,6.45,4.09
6,1.05,Fair,J,SI2,65.8,59,2789,6.41,6.27,4.18


### filtering

In [32]:
# Keep only the diamonds heavier than 1 carat.
large_diamonds_in_spark <- filter(
  diamonds_in_spark,
  diamonds_in_spark$carat > 1
)
# Equivalent spellings:
#   large_diamonds_in_spark <- filter(diamonds_in_spark, column("carat") > 1)
#   large_diamonds_in_spark <- diamonds_in_spark[diamonds_in_spark$carat > 1, ]

### new variables

In [8]:
# Add a `mass` column computed as carat * 0.2.
large_diamonds_in_spark <- mutate(
  large_diamonds_in_spark,
  mass = large_diamonds_in_spark$carat * 0.2
)
# Equivalent spellings:
#   large_diamonds_in_spark <- mutate(large_diamonds_in_spark, mass = column("carat") * 0.2)
#   large_diamonds_in_spark$mass <- large_diamonds_in_spark$carat * 0.2

### selecting variables

In [10]:
# Keep only the mass, cut and price columns.
large_diamonds_in_spark <- select(
  large_diamonds_in_spark,
  c("mass", "cut", "price")
)
# Equivalent spelling:
#   large_diamonds_in_spark <- large_diamonds_in_spark[, c("mass", "cut", "price")]

### sorting data

In [12]:
large_diamonds_in_spark <- arrange(large_diamonds_in_spark, "price")   # sorts by price; pass decreasing=TRUE for descending order

### grouping and summarising data

In [14]:
# Minimum price within each cut.
summary_in_spark <- summarize(
  group_by(large_diamonds_in_spark, "cut"),
  price = "min"
)
# Alternative using a Column expression (minimum of price / mass per cut):
#   summary_in_spark <- summarize(group_by(large_diamonds_in_spark, "cut"),
#                                 min(column("price") / column("mass")))

In [15]:
head(summary_in_spark)

Unnamed: 0_level_0,cut,min(price)
Unnamed: 0_level_1,<chr>,<int>
1,Premium,2017
2,Ideal,2416
3,Good,2066
4,Fair,1262
5,Very Good,2080


In [16]:
# For each cut, the smallest price-to-mass ratio.
summary_in_spark2 <- summarize(
  group_by(large_diamonds_in_spark, "cut"),
  min(column("price") / column("mass"))
)

In [17]:
head(summary_in_spark2)

Unnamed: 0_level_0,cut,min((price / mass))
Unnamed: 0_level_1,<chr>,<dbl>
1,Premium,9833.333
2,Ideal,11960.396
3,Good,9838.095
4,Fair,6126.214
5,Very Good,9811.321


In [18]:
collect(summary_in_spark2)  # bring back to R

Unnamed: 0_level_0,cut,min((price / mass))
Unnamed: 0_level_1,<chr>,<dbl>
1,Premium,9833.333
2,Ideal,11960.396
3,Good,9838.095
4,Fair,6126.214
5,Very Good,9811.321


### merging data sets

In [19]:
# Lookup table that maps every cut to a coarser cut group
cut_groups <- as.DataFrame(
  data.frame(
    cut        = c("Fair", "Good",        "Very Good",   "Premium",  "Ideal"),
    coarse_cut = c("Fair", "(Very) Good", "(Very) Good", "Premium+", "Premium+")
  )
)

head(cut_groups)

Unnamed: 0_level_0,cut,coarse_cut
Unnamed: 0_level_1,<chr>,<chr>
1,Fair,Fair
2,Good,(Very) Good
3,Very Good,(Very) Good
4,Premium,Premium+
5,Ideal,Premium+


In [36]:
# Rebuild the > 1 carat subset from the full diamonds data
large_diamonds_in_spark <- filter(diamonds_in_spark, column("carat") > 1)

# Join the coarse cut groups onto the diamonds via the shared "cut" column.
# The SparkR:: prefix makes sure SparkR's merge is called rather than base R's.
large_diamonds_in_spark <- SparkR::merge(
  large_diamonds_in_spark,
  cut_groups,
  by = "cut"
)

head(large_diamonds_in_spark)

Unnamed: 0_level_0,carat,cut_x,color,clarity,depth,table,price,x,y,z,cut_y,coarse_cut
Unnamed: 0_level_1,<dbl>,<chr>,<chr>,<chr>,<dbl>,<dbl>,<int>,<dbl>,<dbl>,<dbl>,<chr>,<chr>
1,1.01,Fair,E,I1,64.5,58,2788,6.29,6.21,4.03,Fair,Fair
2,1.05,Fair,J,SI2,65.8,59,2789,6.41,6.27,4.18,Fair,Fair
3,1.01,Fair,E,SI2,67.4,60,2797,6.19,6.05,4.13,Fair,Fair
4,1.2,Fair,F,I1,64.6,56,2809,6.73,6.66,4.33,Fair,Fair
5,1.01,Fair,H,SI2,65.4,59,2846,6.3,6.26,4.11,Fair,Fair
6,1.02,Fair,I,SI1,53.0,63,2856,6.84,6.77,3.66,Fair,Fair


## Pipelines

In [38]:
# SparkR has no %>% operator of its own; it comes from magrittr, which has to be loaded first.
# Several of the commands from the sections above, chained into one pipeline.
summary_in_spark <- diamonds_in_spark %>%
  filter(column("carat") > 1) %>%
  mutate(mass = column("carat") * 0.2) %>%
  select(c("mass", "cut", "color", "clarity", "price")) %>%
  SparkR::merge(cut_groups, by = "cut") %>%
  group_by("coarse_cut") %>%
  summarize(min(column("price") / column("mass")))

In [11]:
explain(summary_in_spark)

AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[coarse_cut#125], functions=[min((cast(price#46 as double) / mass#179))], output=[coarse_cut#125, min((price / mass))#227])
   +- Exchange hashpartitioning(coarse_cut#125, 200), ENSURE_REQUIREMENTS, [id=#311]
      +- HashAggregate(keys=[coarse_cut#125], functions=[partial_min((cast(price#46 as double) / mass#179))], output=[coarse_cut#125, min#231])
         +- Project [mass#179, price#46, coarse_cut#125]
            +- SortMergeJoin [cut_x#196], [cut_y#197], Inner
               :- Sort [cut_x#196 ASC NULLS FIRST], false, 0
               :  +- Exchange hashpartitioning(cut_x#196, 200), ENSURE_REQUIREMENTS, [id=#303]
               :     +- Project [(carat#40 * 0.2) AS mass#179, cut#41 AS cut_x#196, price#46]
               :        +- Filter ((isnotnull(carat#40) AND (carat#40 > 1.0)) AND isnotnull(cut#41))
               :           +- Scan ExistingRDD[carat#40,cut#41,color#42,clarity#43,depth#44,table#45,price#46,x#47,y#48,

In [14]:
collect(summary_in_spark)

Unnamed: 0_level_0,cut,avg(price)
Unnamed: 0_level_1,<chr>,<dbl>
1,Premium,8487.249
2,Ideal,8674.227
3,Good,7753.601
4,Fair,7177.856
5,Very Good,8340.549


## SQL interface

In [39]:
# Register diamonds_in_spark under the name "diamonds" so SparkSQL queries can refer to it
createOrReplaceTempView(diamonds_in_spark, "diamonds")
# Average price per cut for diamonds above 1 carat, written as SQL
summary_in_spark <- sql(
  "SELECT cut, AVG(price) from diamonds WHERE carat>1 GROUP BY cut"
)
collect(summary_in_spark)

Unnamed: 0_level_0,cut,avg(price)
Unnamed: 0_level_1,<chr>,<dbl>
1,Premium,8487.249
2,Ideal,8674.227
3,Good,7753.601
4,Fair,7177.856
5,Very Good,8340.549


### dapply: distributed apply

In [44]:
#' Append a coarse cut group to a data.frame of diamonds
#'
#' Maps the `cut` column onto three groups, using the same labels as the
#' `cut_groups` lookup table: "Fair" stays "Fair", "Good" and "Very Good"
#' become "(Very) Good", and every other value becomes "Premium+".
#'
#' @param data A data.frame with a `cut` column.
#' @return `data` with an extra character column `coarse_cut` appended as the
#'   last column.
add_coarse_cut <- function(data) {
  # Start from the catch-all label, then overwrite the rows of the other groups.
  coarse_cut <- rep("Premium+", nrow(data))
  # The data spell the level "Very Good" (capital G); matching "Very good"
  # would silently leave those rows labelled "Premium+".
  coarse_cut[data$cut %in% c("Good", "Very Good")] <- "(Very) Good"
  coarse_cut[data$cut %in% "Fair"] <- "Fair"
  # Make sure R does not turn strings into factors, which Spark doesn't understand.
  # The new column takes its name from the variable, so it must stay `coarse_cut`.
  cbind(data, coarse_cut, stringsAsFactors = FALSE)
}

# Output schema handed to dapply: the columns (names and types) of the data.frame that
# add_coarse_cut returns.
# NOTE(review): the schema lists exactly mass, cut, price, coarse_cut, i.e. it assumes the
# input frame holds only mass, cut and price (the result of the "selecting variables" cell).
# After the "merging data sets" cell, large_diamonds_in_spark instead holds
# carat, cut_x, ..., cut_y, coarse_cut, which would not match this schema — possibly related
# to the failure shown below; confirm by re-running the cells in order.
# NOTE(review): the name `structure` shadows base::structure() in the global environment.
structure <- structType(structField("mass", "double"), structField("cut", "string"),
                        structField("price", "integer"), structField("coarse_cut", "string"))

# Run add_coarse_cut on the Spark side and overwrite the frame with the result.
large_diamonds_in_spark <- dapply(large_diamonds_in_spark, add_coarse_cut,  structure)

# head() triggers the computation; the error below is raised at this point.
head(large_diamonds_in_spark)

ERROR: Error in handleErrors(returnStatus, conn): org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 87.0 failed 1 times, most recent failure: Lost task 0.0 in stage 87.0 (TID 156) (jupyter-2871228b executor driver): org.apache.spark.SparkException: R unexpectedly exited.
R worker produced errors: Error in writeBin(batch, con, endian = "big") : ignoring SIGPIPE signal
In addition: Warning message:
In writeBin(as.integer(value), con, endian = "big") :
  problem writing to connection
Error in readBin(con, raw(), len, endian = "big") : invalid 'n' argument
Error in readBin(con, raw(), len, endian = "big") : invalid 'n' argument

	at org.apache.spark.api.r.BaseRRunner$ReaderIterator$$anonfun$1.applyOrElse(BaseRRunner.scala:144)
	at org.apache.spark.api.r.BaseRRunner$ReaderIterator$$anonfun$1.applyOrElse(BaseRRunner.scala:137)
	at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:38)
	at org.apache.spark.api.r.RRunner$$anon$1.read(RRunner.scala:127)
	at org.apache.spark.api.r.BaseRRunner$ReaderIterator.hasNext(BaseRRunner.scala:113)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage7.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:759)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:349)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:898)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:898)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:131)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1462)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.io.EOFException
	at java.base/java.io.DataInputStream.readInt(DataInputStream.java:397)
	at org.apache.spark.api.r.RRunner$$anon$1.read(RRunner.scala:97)
	... 20 more

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2454)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2403)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2402)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2402)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1160)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1160)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1160)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2642)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2584)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2573)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:938)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2214)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2235)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2254)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:476)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:429)
	at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:48)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$executeCollect$1(AdaptiveSparkPlanExec.scala:338)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.withFinalPlanUpdate(AdaptiveSparkPlanExec.scala:366)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.executeCollect(AdaptiveSparkPlanExec.scala:338)
	at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:3715)
	at org.apache.spark.sql.Dataset.$anonfun$collect$1(Dataset.scala:2971)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3706)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3704)
	at org.apache.spark.sql.Dataset.collect(Dataset.scala:2971)
	at org.apache.spark.sql.api.r.SQLUtils$.dfToCols(SQLUtils.scala:175)
	at org.apache.spark.sql.api.r.SQLUtils.dfToCols(SQLUtils.scala)
	at jdk.internal.reflect.GeneratedMethodAccessor76.invoke(Unknown Source)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at org.apache.spark.api.r.RBackendHandler.handleMethodCall(RBackendHandler.scala:164)
	at org.apache.spark.api.r.RBackendHandler.channelRead0(RBackendHandler.scala:105)
	at org.apache.spark.api.r.RBackendHandler.channelRead0(RBackendHandler.scala:39)
	at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:286)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:324)
	at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:296)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(Def


#### Machine learning: fitting a GLM to the diamonds data

In [17]:
# Rebuild the > 1 carat subset from the full data and add the mass column to it.
large_diamonds_in_spark <- filter(
  diamonds_in_spark,
  diamonds_in_spark$carat > 1
)
large_diamonds_in_spark <- mutate(
  large_diamonds_in_spark,
  mass = large_diamonds_in_spark$carat * 0.2
)

# Bring the whole result back into R and display it
collect(large_diamonds_in_spark)

Unnamed: 0_level_0,carat,cut,color,clarity,depth,table,price,x,y,z,mass
Unnamed: 0_level_1,<dbl>,<chr>,<chr>,<chr>,<dbl>,<dbl>,<int>,<dbl>,<dbl>,<dbl>,<dbl>
1,1.17,Very Good,J,I1,60.2,61,2774,6.83,6.90,4.13,0.234
2,1.01,Premium,F,I1,61.8,60,2781,6.39,6.36,3.94,0.202
3,1.01,Fair,E,I1,64.5,58,2788,6.29,6.21,4.03,0.202
4,1.01,Premium,H,SI2,62.7,59,2788,6.31,6.22,3.93,0.202
5,1.05,Very Good,J,SI2,63.2,56,2789,6.49,6.45,4.09,0.210
6,1.05,Fair,J,SI2,65.8,59,2789,6.41,6.27,4.18,0.210
7,1.01,Fair,E,SI2,67.4,60,2797,6.19,6.05,4.13,0.202
8,1.04,Premium,G,I1,62.2,58,2801,6.46,6.41,4.00,0.208
9,1.20,Fair,F,I1,64.6,56,2809,6.73,6.66,4.33,0.240
10,1.02,Premium,G,I1,60.3,58,2815,6.55,6.50,3.94,0.204


In [18]:
# Fit a Gaussian GLM of price on mass, cut, color and clarity with Spark
model <- spark.glm(
  price ~ mass + cut + color + clarity,
  data = large_diamonds_in_spark,
  family = "gaussian"
)
# Print residual quantiles and the coefficient table
summary(model)
# Predictions for the same rows the model was fitted on
# (the variable name is kept exactly as spelled in the notebook)
predited_price <- predict(model, large_diamonds_in_spark)


Deviance Residuals: 
(Note: These are approximate quantiles with relative error <= 0.01)
     Min        1Q    Median        3Q       Max  
-18863.4    -737.8    -139.3     603.0    7526.9  

Coefficients:
               Estimate  Std. Error   t value    Pr(>|t|)
(Intercept)     -600.09     119.353   -5.0279  5.0088e-07
mass           50968.79     153.823  331.3475  0.0000e+00
cut_Premium     1158.56      59.127   19.5945  0.0000e+00
cut_Ideal       1609.01      59.537   27.0253  0.0000e+00
cut_Very Good   1283.64      60.729   21.1369  0.0000e+00
cut_Good         897.57      66.268   13.5446  0.0000e+00
color_G        -1207.94      45.631  -26.4721  0.0000e+00
color_H        -1994.34      45.634  -43.7033  0.0000e+00
color_I        -2792.08      48.344  -57.7543  0.0000e+00
color_F         -524.84      47.690  -11.0053  0.0000e+00
color_E         -285.09      50.443   -5.6517  1.6134e-08
color_J        -4108.99      53.183  -77.2612  0.0000e+00
clarity_SI2    -6066.99      93.642  -6