
Creating covariance and correlation matrices via RHadoop in an R environment

What are covariance and correlation matrices?

In probability theory and statistics, the mathematical concepts of covariance and correlation are very similar. Both describe the degree to which two random variables or sets of random variables tend to deviate from their expected values in similar ways. Covariance matrices and correlation matrices are used frequently in multivariate statistics.
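When the data fit in memory, base R computes both objects directly; a minimal sketch with simulated data:

X <- matrix(rnorm(40), nrow = 10, ncol = 4)  # 10 observations of 4 variables
cov(X)           # 4 x 4 covariance matrix
cor(X)           # 4 x 4 correlation matrix
cov2cor(cov(X))  # rescaling the covariance matrix recovers the correlation matrix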

Links:
Covariance and correlation matrices. Bristol University
Correlation, Variance and Covariance (Matrices). R-manual

What is RHadoop?

RHadoop is a collection of five R packages that allow users to manage and analyze data with Hadoop. The packages are tested before every release on recent Cloudera and Hortonworks Hadoop distributions and should be broadly compatible with open-source Hadoop and MapR's distribution. A rough install sketch follows the list below.

rhdfs
rhbase
plyrmr
rmr2
ravro
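The RHadoop packages are not on CRAN; they are installed from tarballs published on the Revolution Analytics GitHub releases. A rough sketch, with file names and versions as placeholders for whatever you download, and an approximate dependency list:

# Dependencies first (this list is approximate), then the local tarballs
install.packages(c("rJava", "Rcpp", "RJSONIO", "bitops", "digest",
                   "functional", "reshape2", "stringr", "plyr", "caTools"))
install.packages("rmr2_X.Y.Z.tar.gz",  repos = NULL, type = "source")
install.packages("rhdfs_X.Y.Z.tar.gz", repos = NULL, type = "source")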

To compute the covariance/correlation matrices we only need two of these packages:

rhdfs: This package provides basic connectivity to the Hadoop Distributed File System. R programmers can browse, read, write, and modify files stored in HDFS from within R. Install this package only on the node that will run the R client.
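A short sketch of the file operations this enables (paths are illustrative):

library(rhdfs)
hdfs.init()                                    # connect to HDFS
hdfs.ls("/user")                               # browse a directory
hdfs.put("local.csv", "/user/data/local.csv")  # copy a local file into HDFS
hdfs.get("/user/data/local.csv", "copy.csv")   # copy it back out of HDFS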

rmr2: A package that allows R developers to perform statistical analysis in R via Hadoop MapReduce on a Hadoop cluster. Install this package on every node in the cluster.
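As a warm-up, here is the classic squares example from the rmr2 tutorial, showing the to.dfs / mapreduce / from.dfs round trip used throughout the rest of this page:

library(rmr2)
small.ints <- to.dfs(1:10)                # push data to HDFS
squares <- mapreduce(
  input = small.ints,
  map = function(k, v) keyval(v, v^2))    # square each value
from.dfs(squares)                         # pull the key/value pairs back into R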

Computing the matrices with RHadoop via MapReduce in R

Small data example:

# X is an n x p numeric matrix of observations
N <- nrow(X)                            # number of observations
mu <- colSums(X) / N                    # column means
Xc <- sweep(X, MARGIN = 2, STATS = mu)  # center each column (FUN defaults to "-")
Cov.X <- crossprod(Xc) / (N - 1)        # t(Xc) %*% Xc / (N - 1)
print(Cov.X)
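As a sanity check, this should match base R's cov() on the same matrix:

all.equal(Cov.X, cov(X))  # TRUE, up to floating-point tolerance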

Big data example:

Set up environment:

Sys.setenv(HADOOP_CMD='/usr/bin/hadoop')
Sys.setenv(HADOOP_HOME='/usr/lib/hadoop-0.20-mapreduce')
Sys.setenv(HADOOP_STREAMING='/usr/lib/hadoop-0.20-mapreduce/contrib/streaming/hadoop-streaming-2.6.0-mr1-cdh5.7.1.jar')
library(rJava)
library(rmr2)
library(rhdfs)
hdfs.init()
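A quick check that the connection works (the path is just an example):

hdfs.ls("/")  # an error here usually means HADOOP_CMD or the jar path is wrong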

The reduce function sums a list of matrices:

reducer <- function(., A) {
  keyval(1, list(Reduce('+', A)))  # element-wise sum of the matrices in A
}

1st map-reduce: count the rows

mapper3 <- function(., Xr) {
  keyval(1, list(nrow(Xr)))  # row count of this chunk
}

Calculate the number of rows:

N <- values(
  from.dfs(
    mapreduce(
      input = table,    # 'table' is assumed to hold the to.dfs() handle of the data
      map = mapper3,
      reduce = reducer,
      combine = TRUE)))[[1]]

2nd map-reduce: column sums

mapper2 <- function(., Xr) {
  keyval(1, list(colSums(Xr)))  # column sums of this chunk
}

Calculate mu, the vector of column means:

mu.N <- values(
  from.dfs(
    mapreduce(
      input = table,
      map = mapper2,
      reduce = reducer,
      combine = TRUE)))[[1]]
mu <- mu.N / N

Center the data with sweep() and push it back to HDFS:

s.table <- sweep(table1, MARGIN = 2, STATS = mu)  # 'table1' is assumed to be the in-memory copy of the data
s.table <- to.dfs(s.table)

3rd map-reduce: cross-product

mapper1 <- function(., Xr) {
  keyval(1, list(crossprod(Xr)))  # t(Xr) %*% Xr for this chunk
}

Calculate crossprod(X); the division by (N - 1) follows below:

Cov.X.n1 <- values(
  from.dfs(
    mapreduce(
      input = s.table,
      map = mapper1,
      reduce = reducer,
      combine = TRUE)))[[1]]

Divide by (N - 1) to obtain the covariance matrix:

cov.x <- Cov.X.n1 / (N - 1)  # sample covariance matrix
cov.x
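The title also promises a correlation matrix; base R's cov2cor() rescales a covariance matrix into one, so no extra map-reduce pass is needed:

cor.x <- cov2cor(cov.x)  # correlation matrix from the covariance matrix
cor.x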


Juan Garcia | linkedin