In [1]:
# demonstration of sparkR
# what's here:
# @ Setting up (setting SPARK_HOME etc.)
# @ Initialization of a local cluster (TODO: find out exactly what's happening under the hood)
# @ import a dataset, minor manipulation
# @ run both OLS and logistic regression
# @ (kmeans and NBC only available on spark 2.0 onwards)

In [2]:
# Environment setup.
# SPARK_HOME is set to the root of the Spark distribution, and the R/lib
# directory underneath it is prepended to the library search path before
# library(SparkR) is called.
Sys.setenv(SPARK_HOME = "C:/stack/spark-1.6.2-bin-hadoop2.6") # <--- change this to connect to server
# "R" and "lib" must be arguments of file.path(), not of Sys.getenv(): inside
# Sys.getenv() they would be taken as its `unset` / `names` parameters and only
# SPARK_HOME itself (without /R/lib) would end up on the library path.
.libPaths(c(file.path(Sys.getenv("SPARK_HOME"), "R", "lib"), .libPaths()))

library(SparkR)
library(caret)
library(magrittr)


Attaching package: 'SparkR'

The following objects are masked from 'package:stats':

    cov, filter, lag, na.omit, predict, sd, var

The following objects are masked from 'package:base':

    colnames, colnames<-, endsWith, intersect, rank, rbind, sample,
    startsWith, subset, summary, table, transform

Loading required package: lattice
Loading required package: ggplot2


In [3]:
# initialize local spark cluster and SQL context
# `sc` is the Spark context handle returned for the "local" master;
# `sqlContext` is built from it and is the handle passed to createDataFrame()
# and sql() in the cells below.
sc <- sparkR.init(master = "local")
sqlContext <- sparkRSQL.init(sc)

Launching java with spark-submit command C:/stack/spark-1.6.2-bin-hadoop2.6/bin/spark-submit.cmd   sparkr-shell C:\Users\WEIZHO~1\AppData\Local\Temp\Rtmpkhh5kz\backend_port369035f53213 


In [6]:
# Load the adult dataset from local disk into an ordinary (non-Spark) R data.frame.
adult_csv <- "C:/development/SparkR-nb-scripts/data/adult.csv"
adult <- read.csv(file = adult_csv)

In [7]:
# create an ID column to create training/testing sets later
# seq_len() rather than seq(1, n): for a zero-row frame seq(1, 0) counts down
# to c(1, 0), whereas seq_len(0) is correctly empty.
adult$id <- seq_len(nrow(adult))

In [8]:
# Hand the local data.frame to Spark as a SparkR DataFrame, then inspect it
# three ways: first rows, compact structure, and the Spark-side schema.
DF <- adult %>% createDataFrame(sqlContext, .)
DF %>% head
DF %>% str
DF %>% printSchema

In FUN(X[[i]], ...): Use native_country instead of native.country  as column name

Unnamed: 0,age,workclass,fnlwgt,education,education_num,marital_status,occupation,relationship,race,sex,capital_gain,capital_loss,hours_per_week,native_country,income,id
1,39,State-gov,77516,Bachelors,13,Never-married,Adm-clerical,Not-in-family,White,Male,2174,0,40,United-States,<=50K,1
2,50,Self-emp-not-inc,83311,Bachelors,13,Married-civ-spouse,Exec-managerial,Husband,White,Male,0,0,13,United-States,<=50K,2
3,38,Private,215646,HS-grad,9,Divorced,Handlers-cleaners,Not-in-family,White,Male,0,0,40,United-States,<=50K,3
4,53,Private,234721,11th,7,Married-civ-spouse,Handlers-cleaners,Husband,Black,Male,0,0,40,United-States,<=50K,4
5,28,Private,338409,Bachelors,13,Married-civ-spouse,Prof-specialty,Wife,Black,Female,0,0,40,Cuba,<=50K,5
6,37,Private,284582,Masters,14,Married-civ-spouse,Exec-managerial,Wife,White,Female,0,0,40,United-States,<=50K,6


'DataFrame': 16 variables:
 $ age           : int 39 50 38 53 28 37
 $ workclass     : chr " State-gov" " Self-emp-not-inc" " Private" " Private" " Private" " Private"
 $ fnlwgt        : int 77516 83311 215646 234721 338409 284582
 $ education     : chr " Bachelors" " Bachelors" " HS-grad" " 11th" " Bachelors" " Masters"
 $ education_num : int 13 13 9 7 13 14
 $ marital_status: chr " Never-married" " Married-civ-spouse" " Divorced" " Married-civ-spouse" " Married-civ-spouse" " 
 $ occupation    : chr " Adm-clerical" " Exec-managerial" " Handlers-cleaners" " Handlers-cleaners" " Prof-specialty" " 
 $ relationship  : chr " Not-in-family" " Husband" " Not-in-family" " Husband" " Wife" " Wife"
 $ race          : chr " White" " White" " White" " Black" " Black" " White"
 $ sex           : chr " Male" " Male" " Male" " Male" " Female" " Female"
 $ capital_gain  : int 2174 0 0 0 0 0
 $ capital_loss  : int 0 0 0 0 0 0
 $ hours_per_week: int 40 13 40 40 40 40
 $ native_country: chr " United-Sta

In [11]:
# some minor manipulation
#DF$income <- as.factor(DF$income) # <- is this necessary? No

# Doesn't work: DF$education is a Spark Column expression, not a local R vector, so which() cannot index it:
#idx <- which(DF$education == " Bachelors")
#idx <- which(DF[,"education"] == " Bachelors")
#idx <- which(DF[,4] == " Bachelors")

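# NOTE(review): the original note ("This doesnt works") is ambiguous; subset() with a Column condition is a documented SparkR row filter -- confirm: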
#DF2 <- subset(DF, DF$education == " Bachelors")
#dim(DF2) # takes awhile

# Doesn't work: base R coercions such as as.character()/unclass() do not apply to a Spark Column
#DF$income %>% as.character
#unclass(DF$age) %>% head

In [12]:
# Hold-out split: one ~70/30 train/test partition (not k-fold cross validation).
# The `id` column is the key used to build the test set as the complement of
# the sampled training rows.
# NOTE(review): the sample is unseeded, so the split differs between runs;
# pass a seed if the installed SparkR version supports one.
trainDF <- SparkR::sample(DF, FALSE, 0.7)
trainID <- SparkR::collect(select(trainDF, "id"))$id
# Take the universe of ids from the local data.frame: this avoids an extra
# Spark count job and does not assume the ids are exactly 1..n.
testID <- setdiff(adult$id, trainID)
# Kept so that "DF" remains queryable through sql(sqlContext, ...).
registerTempTable(DF, "DF")
# Filter with a Column predicate instead of pasting ids into a SQL string:
# the ids stay integers (shQuote() turned them into quoted string literals)
# and no hand-built "id in ( ... )" clause can end up malformed.
testDF <- filter(DF, DF$id %in% testID)

In [13]:
# Run the following algorithms
# @ OLS
# @ logistic regression

# ====

# OLS: gaussian-family GLM of age on income and education_num, fitted on the
# training split.
ols0 <- SparkR::glm(data = trainDF, age ~ income + education_num,
                    family = "gaussian")
summary(ols0)

# Score the test split and bring actual and predicted values back in ONE
# collect, so every prediction is paired with the age from its own row instead
# of relying on two separate Spark jobs returning rows in the same order.
ols0_scored <- SparkR::predict(ols0, testDF) %>%
  select("age", "prediction") %>%
  collect
# Same result shape as before: a local one-column data.frame of predictions.
ols0_pred <- ols0_scored["prediction"]

Metrics::rmse(actual = ols0_scored$age,
              predicted = ols0_scored$prediction)

Unnamed: 0,Min,Max
,-26.17678,54.35031

Unnamed: 0,Estimate,Std. Error,t value,Pr(>|t|)
(Intercept),47.0957816,0.4535186,103.845313,0.0
income_ <=50K,-8.0878313,0.2177393,-37.1445578,0.0
education_num,-0.2398758,0.03591054,-6.679814,2.447398e-11


In [None]:
# Logistic regression: binomial-family GLM of income on age and education,
# fitted on the training split.
lg0 <- SparkR::glm(data = trainDF, income ~ age + education,
                   family = "binomial")
summary(lg0)

# Score the test split and collect the "prediction" column once; the preview
# is taken from the local copy rather than re-running the same Spark job.
lg0_pred_prob <- SparkR::predict(lg0, testDF) %>%
  select("prediction") %>%
  collect
head(lg0_pred_prob)

#confusionMatrix(lg0_pred_prob, select(testDF, "income") %>% collect)
# will need to change the target variable in the original dataset to {0,1}

In [None]:
# Stop the Spark context created by sparkR.init() at the top of the notebook.
sparkR.stop()