# SparkR Template

Reference: https://rpubs.com/wendyu/sparkr

In [1]:
library(SparkR)
library(magrittr)


Attaching package: ‘SparkR’

The following objects are masked from ‘package:stats’:

    cov, filter, lag, na.omit, predict, sd, var, window

The following objects are masked from ‘package:base’:

    as.data.frame, colnames, colnames<-, drop, endsWith, intersect,
    rank, rbind, sample, startsWith, subset, summary, transform, union


Attaching package: ‘magrittr’

The following object is masked from ‘package:SparkR’:

    not



In [2]:
sc <- sparkR.init(master = "local",sparkEnvir = list(spark.driver.memory="1g"))

“'sparkR.init' is deprecated.
Use 'sparkR.session' instead.
See help("Deprecated")”

Launching java with spark-submit command /usr/local/spark/bin/spark-submit   --driver-memory "1g" sparkr-shell /tmp/RtmpGbOX3j/backend_port48f778e77fa 


In [3]:
sqlContext <- sparkR.session(sc)

## Dataframe Operations

### 1. Create DataFrame

In [4]:
df <- createDataFrame(iris)
head(df)

“Use Petal_Width instead of Petal.Width  as column name”

Sepal_Length,Sepal_Width,Petal_Length,Petal_Width,Species
<dbl>,<dbl>,<dbl>,<dbl>,<chr>
5.1,3.5,1.4,0.2,setosa
4.9,3.0,1.4,0.2,setosa
4.7,3.2,1.3,0.2,setosa
4.6,3.1,1.5,0.2,setosa
5.0,3.6,1.4,0.2,setosa
5.4,3.9,1.7,0.4,setosa


### 2. Select and Filter 

In [5]:
head(select(df, df$Sepal_Length, df$Species )) 

Sepal_Length,Species
<dbl>,<chr>
5.1,setosa
4.9,setosa
4.7,setosa
4.6,setosa
5.0,setosa
5.4,setosa


In [6]:
head(filter(df, df$Sepal_Length >5.5))

Sepal_Length,Sepal_Width,Petal_Length,Petal_Width,Species
<dbl>,<dbl>,<dbl>,<dbl>,<chr>
5.8,4.0,1.2,0.2,setosa
5.7,4.4,1.5,0.4,setosa
5.7,3.8,1.7,0.3,setosa
7.0,3.2,4.7,1.4,versicolor
6.4,3.2,4.5,1.5,versicolor
6.9,3.1,4.9,1.5,versicolor


In [7]:
head(select(filter(df, df$Sepal_Length >5.5), df$Sepal_Length, df$Species))

Sepal_Length,Species
<dbl>,<chr>
5.8,setosa
5.7,setosa
5.7,setosa
7.0,versicolor
6.4,versicolor
6.9,versicolor


### 3. Grouping and Aggregation 

In [8]:
df2<-summarize(groupBy(df, df$Species), mean=mean(df$Sepal_Length), count=n(df$Sepal_Length))
head(df2)

Species,mean,count
<chr>,<dbl>,<dbl>
virginica,6.588,50
versicolor,5.936,50
setosa,5.006,50


In [9]:
head(arrange(df2, desc(df2$mean)))

Species,mean,count
<chr>,<dbl>,<dbl>
virginica,6.588,50
versicolor,5.936,50
setosa,5.006,50


### 4. Combine queries with Marittr 

In [10]:
finaldf<-filter(df, df$Sepal_Length >5.5) %>%
  group_by(df$Species)%>%
  summarize(mean=mean(df$Sepal_Length))
arrange(finaldf, desc(finaldf$mean)) %>% head

Species,mean
<chr>,<dbl>
virginica,6.622449
versicolor,6.120513
setosa,5.733333


### 5. SQL Queries 

In [11]:
registerTempTable(df,"df")

“'registerTempTable' is deprecated.
Use 'createOrReplaceTempView' instead.
See help("Deprecated")”

In [12]:
dfSQL<-sql(sqlContext, "SELECT * FROM df WHERE Sepal_Length > 5.5")

“'sql(sqlContext...)' is deprecated.
Use 'sql(sqlQuery)' instead.
See help("Deprecated")”

In [13]:
dflocal<-collect(dfSQL)
print(dflocal[1:10,])

   Sepal_Length Sepal_Width Petal_Length Petal_Width    Species
1           5.8         4.0          1.2         0.2     setosa
2           5.7         4.4          1.5         0.4     setosa
3           5.7         3.8          1.7         0.3     setosa
4           7.0         3.2          4.7         1.4 versicolor
5           6.4         3.2          4.5         1.5 versicolor
6           6.9         3.1          4.9         1.5 versicolor
7           6.5         2.8          4.6         1.5 versicolor
8           5.7         2.8          4.5         1.3 versicolor
9           6.3         3.3          4.7         1.6 versicolor
10          6.6         2.9          4.6         1.3 versicolor


## Machine Learning - Linear Regression

### 1. Preparing a train/test data set

In [14]:
#create an ID column
iris$ID<-c(1:nrow(iris))
df <- createDataFrame(sqlContext, iris)

“'createDataFrame(sqlContext...)' is deprecated.
Use 'createDataFrame(data, schema = NULL)' instead.
“Use Petal_Width instead of Petal.Width  as column name”

In [15]:
#total number of observations
nrow(df)

In [16]:
#20% data as test set
df_test<-sample(df, FALSE, 0.2)
nrow(df_test)  

In [17]:
#80% data as train set
testID<-collect(select(df_test, "ID"))$ID
df$istest<-df$ID %in% testID
df_train<-subset(df, df$istest==FALSE)
nrow(df_train)

### 2. Train a linear model

In [18]:
#fit model
model<-glm(Sepal_Length ~ . - ID - istest , data=df_train, family="gaussian")
#look at model summary
summary(model)


Deviance Residuals: 
(Note: These are approximate quantiles with relative error <= 0.01)
     Min        1Q    Median        3Q       Max  
-0.63533  -0.23613  -0.00251   0.17668   0.69852  

Coefficients:
                    Estimate  Std. Error  t value    Pr(>|t|)
(Intercept)          1.89992    0.313912   6.0524  1.8663e-08
Sepal_Width          0.55253    0.095745   5.7709  6.9009e-08
Petal_Length         0.87188    0.074605  11.6866  0.0000e+00
Petal_Width         -0.27418    0.171935  -1.5947  1.1356e-01
Species_versicolor  -0.83700    0.259295  -3.2280  1.6282e-03
Species_virginica   -1.20329    0.362506  -3.3194  1.2119e-03

(Dispersion parameter for gaussian family taken to be 0.09053587)

    Null deviance: 82.975  on 119  degrees of freedom
Residual deviance: 10.321  on 114  degrees of freedom
AIC: 60.15

Number of Fisher Scoring iterations: 1


### 3. Model evaluation using the test set 

In [19]:
#makde predictions 
prediction<-predict(model, newData=df_test)
head(select(prediction, "Sepal_Length", "prediction"))

Sepal_Length,prediction
<dbl>,<dbl>
4.6,4.86576
4.6,4.916914
4.9,4.893178
5.7,5.529217
5.7,5.399492
5.1,5.225115


In [20]:
#mean of Sepal_Length
smean<-collect(agg(df_train, mean=mean(df_train$Sepal_Length)))$mean
smean

In [21]:
#Squared residual and squared total
prediction<-transform(
  prediction,
  s_res=(prediction$Sepal_Length - prediction$prediction)**2,
  s_tot=(prediction$Sepal_Length - smean)**2)
head(select(prediction, "Sepal_Length", "prediction", "s_res", "s_tot"))

Sepal_Length,prediction,s_res,s_tot
<dbl>,<dbl>,<dbl>,<dbl>
4.6,4.86576,0.07062827,1.62987778
4.6,4.916914,0.1004344,1.62987778
4.9,4.893178,4.654589e-05,0.95387778
5.7,5.529217,0.02916669,0.03121111
5.7,5.399492,0.09030532,0.03121111
5.1,5.225115,0.01565383,0.60321111


In [22]:
#Sum of squares
res<-collect(agg(prediction, 
                 ss_res=sum(prediction$s_res),
                 ss_tot=sum(prediction$s_tot)
))
res

ss_res,ss_tot
<dbl>,<dbl>
3.506208,19.36033


In [23]:
#R-squared
R2=1-(res$ss_res/res$ss_tot)
R2