From ad7c2c36aa7af65242fb9a347594c5adfec27839 Mon Sep 17 00:00:00 2001 From: Saptarshi Guha Date: Tue, 22 Jan 2013 11:57:03 -0800 Subject: [PATCH] Almost done moving to rJava, read and write functions to be implemented --- .gitignore | 1 + code/R/DESCRIPTION | 4 - code/R/NAMESPACE | 1 + code/R/R/dfs.R | 40 -- code/R/R/dohelp.R | 145 ----- code/R/R/extra.functions.R | 39 +- code/R/R/mapfile.functions.R | 12 +- code/R/R/rhdel.R | 5 +- code/R/R/rhex.R | 17 +- code/R/R/rhget.R | 19 +- code/R/R/rhinit.R | 110 ---- code/R/R/rhkill.R | 10 +- code/R/R/rhls.R | 19 +- code/R/R/rhmerge.R | 39 -- code/R/R/rhmv.R | 1 - code/R/R/rhoptions.R | 4 +- code/R/R/rhput.R | 12 +- code/R/R/rhstatus.R | 12 +- code/R/R/server.R | 274 --------- code/R/R/zzz.R | 312 +++++----- code/R/man/hdfsReadLines.Rd | 16 + code/java/build.num | 4 +- code/java/org/godhuli/rhipe/FileUtils.java | 117 +--- .../org/godhuli/rhipe/PersonalServer.java | 573 ++++-------------- 24 files changed, 346 insertions(+), 1440 deletions(-) delete mode 100644 code/R/R/dohelp.R delete mode 100644 code/R/R/rhinit.R delete mode 100644 code/R/R/rhmerge.R delete mode 100644 code/R/R/server.R create mode 100644 code/R/man/hdfsReadLines.Rd diff --git a/.gitignore b/.gitignore index 4b7f54e..430e973 100644 --- a/.gitignore +++ b/.gitignore @@ -3,6 +3,7 @@ togit.sh code/java/build */.DS_Store *.o +*.so ._* *.class 19RHIPE diff --git a/code/R/DESCRIPTION b/code/R/DESCRIPTION index e5f0d67..639dad7 100644 --- a/code/R/DESCRIPTION +++ b/code/R/DESCRIPTION @@ -13,18 +13,15 @@ Depends: methods Packaged: Fri Mar 6 17:02:01 2012; jrounds Collate: - 'dohelp.R' 'rhcp.R' 'rhdel.R' 'rhex.R' 'rhget.R' 'rhgetkey.R' - 'rhinit.R' 'rhjoin.R' 'rhkill.R' 'rhload.R' 'rhls.R' - 'rhmerge.R' 'rhmv.R' 'rhoptions.R' 'rhput.R' @@ -36,7 +33,6 @@ Collate: 'rhwrite.R' 'dfs.R' 'rhmr.R' - 'server.R' 'zzz.R' 'extra.functions.R' 'rhwatch.R' diff --git a/code/R/NAMESPACE b/code/R/NAMESPACE index 4619f0a..542dbc3 100644 --- a/code/R/NAMESPACE +++ b/code/R/NAMESPACE @@ -1,5 +1,6 @@ export(bashRhipeArchive) export(hdfs.getwd) +export(hdfsReadLines) export(hdfs.setwd) export("[<-.mapfile") export("[.mapfile") diff --git a/code/R/R/dfs.R b/code/R/R/dfs.R index 06e191e..6315e0b 100644 --- a/code/R/R/dfs.R +++ b/code/R/R/dfs.R @@ -12,23 +12,6 @@ rhreadBin <- function(file,maxnum=-1, readbuf=0,mc=lapply,verb=FALSE){ mc(x,function(r) list(rhuz(r[[1]]),rhuz(r[[2]]))) } -#test! -## rhrename <- function(src,dest,delete=T,ignore.stderr=T,verbose=F){ -## Rhipe:::doCMD(rhoptions()$cmd['rename'],infiles=src,ofile=dest,ignore.stderr=T,verbose=F) -## if(delete) rhdel(src) -## } - -##returns the data files in a directory of map files (the index files can't be sent to ##a mapreduce), which can be used for mapreduce jobs as the ifolder param -## rhmap.sqs <- function(x){ -## v=rhls(x) -## sapply(v$file,function(r){ -## sprintf("%s/data",r) -## },USE.NAMES=F)} - - -rhmerge <- function(inr,ou){ - system(paste(paste(Sys.getenv("HADOOP_BIN"),"hadoop",sep=.Platform$file.sep,collapse=""),"dfs","-cat",inr,">", ou,collapse=" ")) -} print.rhversion <- function(x,...){ al <- paste(sapply(seq_along(attr(x,"notes")),function(y) sprintf("%s. 
%s",y,attr(x,"notes")[y])),collapse="\n") @@ -40,11 +23,6 @@ print.rhversion <- function(x,...){ invisible(y) } -rhmv <- function(ifile, ofile) { - system(command=paste(paste(Sys.getenv("HADOOP_BIN"), "hadoop", -sep=.Platform$file.sep), "fs", "-mv", ifile, ofile, sep=" ")) - -} ############################################################################ ### from old rhmr.R @@ -68,21 +46,3 @@ print.jobtoken <- function(s,verbose=1,...){ } -## rhsubset <- function(ifolder,ofolder,subs,inout=c('text','text'),local=T){ -## if(!is.function(subs)) stop('subs must be a function') -## setup <- list(map=parse(text=paste("userFUN...=",paste(deparse(subs),collapse="\n"))), -## reduce=expression()) - -## m <- expression({ -## for(x1 in 1:length(map.values)){ -## y <- userFUN...(map.keys[[x1]],map.values[[x1]]) -## if(!is.null(y)) -## rhcollect(map.keys[[x1]],y) -## }}) -## mpr <- list(mapred.textoutputformat.separator=" ") -## if(local) mpr$mapred.job.tracker <- 'local' -## z <- rhmr(map=m,ifolder=ifolder,ofolder=ofolder,inout=inout,setup=setup,mapred=mpr) -## rhex(z) -## } -## rhsubset("/tmp/small","/tmp/s",msub) - diff --git a/code/R/R/dohelp.R b/code/R/R/dohelp.R deleted file mode 100644 index 139484e..0000000 --- a/code/R/R/dohelp.R +++ /dev/null @@ -1,145 +0,0 @@ -doCMD <- function(CMD=0,needoutput=F,opts=rhoptions(),verbose=T,ignore.stderr=F - ,fold=NA,src=NA,dest=NA,locals=NA - ,overwrite=NA,tempf=NA,output=NA - ,groupsize=NA,howmany=NA,N=NA, sequence=F,skip=0, - infiles=NA, ofile=NA,ilocal=NA,keys=NA,recursive='FALSE', jobid=NULL, - joinwordy=TRUE,rhreaddebug=FALSE){ - on.exit({unlink(tm)}) - tm <- paste(tempfile(),paste(sample(letters,5),sep="",collapse=""),sep="",collapse="") -## cp <- opts$cp - ## ip <- paste("java -cp ",paste(cp,sep="",collapse=":")," ", -## "org.godhuli.rhipe.FileUtils",sep="",collapse="") - - ip <- paste(Sys.getenv("HADOOP_BIN"),"/hadoop jar ",opts$jarloc," ", - "org.godhuli.rhipe.FileUtils",sep="",collapse="") - - p <- switch(as.integer(CMD), - { - rhsz(list(fold,recursive)) - }, - { - rhsz(c(src,dest)) # - }, - { - rhsz(fold) - }, - { - rhsz(list(locals,dest,overwrite)) - }, - { - rhsz(c(tempf,output,groupsize,howmany,N)) - }, - { - rhsz(c(ofile,ilocal*1,howmany,infiles)) ##rhread - }, - { - rhsz(list(keys,src,dest,sequence,skip)) ##getkey - }, - { - rhsz(c(infiles,ofile,ilocal*1)) - }, - { - rhsz(list(infiles,ofile)) ##rename - }, - { - rhsz(list(jobid,joinwordy)) ##join a running RHIPE job - }, - { - rhsz(list(jobid)) ##find status of a running job - } - ) - if(!is.null(p)){ - f <- file(tm, open='wb') - writeBin(p,f) - close(f); - } - cmd <- paste(ip," ",CMD, " ",tm,sep='',collapse='') - if(verbose) cat(cmd,"\n") - if(CMD==6){ ##rhread - if(ignore.stderr) cmd <- sprintf("%s 2>/dev/null",cmd) - ## - ## system(cmd, - ## intern=F,ignore.stderr=ignore.stderr) - ## cmd = ofile - return(.Call("readSQFromPipe",cmd,as.integer(1024*1024L),rhreaddebug,PACKAGE="Rhipe")) - } - r <- system(cmd, - intern=F,ignore.stderr=ignore.stderr) - if(verbose) message("Error Code is ",r) - if(r==256){ - f <- file(tm, open='rb') - s <- readBin(f,'int',1,size=4,endian='big') -## close(f) - xx=rhuz(readBin(f,'raw',n=s)) - close(f) - stop(xx) - }else - if(needoutput){ - f <- file(tm, open='rb') - s <- readBin(f,'int',1,size=4,endian='big') - x <- rhuz(readBin(f,'raw',n=s)) - close(f) - return(x) - } -} - - - -## checkEx <- function(socket){ -## v <- readBin(socket,'raw',n=1) -## if(v==as.raw(0x0)){ -## v <- readBin(socket,'int',n=1,size=4,endian='big') -## v <- readBin(socket,'raw',n=v) -## 
stop(rawToChar(v)) -## } -## } - - - -## ## doDeletableFile <- function(fold,sock){ -## ## pl <- rhsz(fold) -## ## writeBin(2L,sock,size=4,endian='big') -## ## writeBin(length(pl),sock,size=4,endian='big') -## ## writeBin(pl,sock,endian='big') -## ## checkEx(sock) -## ## } - -## doPut <- function(fold1,dest,bool,sock){ -## pl <- rhsz(list(fold1,dest,bool)) -## writeBin(3L,sock,size=4,endian='big') -## writeBin(length(pl),sock,size=4,endian='big') -## writeBin(pl,sock,endian='big') -## checkEx(sock) -## } - -## doGet <- function(fold1,fold2,sock){ -## pl <- rhsz(c(fold1,fold2)) -## writeBin(4L,sock,size=4,endian='big') -## writeBin(length(pl),sock,size=4,endian='big') -## writeBin(pl,sock,endian='big') -## checkEx(sock) -## } -## doLS <- function(fold,sock){ -## pl <- rhsz(fold) -## writeBin(5L,sock,size=4,endian='big') -## writeBin(length(pl),sock,size=4,endian='big') -## writeBin(pl,sock,endian='big') -## checkEx(sock) -## v <- readBin(sock,'int',n=1,endian='big') -## v <- readBin(sock,'raw',n=v) -## rhuz(v) -## } -## doDel <- function(fold,sock){ -## pl <- rhsz(fold) -## writeBin(6L,sock,size=4,endian='big') -## writeBin(length(pl),sock,size=4,endian='big') -## writeBin(pl,sock,endian='big') -## checkEx(sock) -## } -## doConvertBinToSequence <- function(infile,outfile,group,nfiles, total,sock){ -## pl <- rhsz(c(infile,outfile, group,nfiles, total)) -## writeBin(7L,sock,size=4,endian='big') -## writeBin(length(pl),sock,size=4,endian='big') -## writeBin(pl,sock,endian='big') -## checkEx(sock) -## } diff --git a/code/R/R/extra.functions.R b/code/R/R/extra.functions.R index a68436a..6b163ab 100644 --- a/code/R/R/extra.functions.R +++ b/code/R/R/extra.functions.R @@ -29,8 +29,45 @@ initPRNG <- function(seed=NULL){ mi } - +#' Reads all lines from a text file located on the HDFS +#' @param inp The location of the text file, interolated based on hdfs.getwd +#' @keywords HDFS TextFile +#' @export +hdfsReadLines <- function(inp){ + rhoptions()$server$readTextFile(rhabsolute.hdfs.path(inp)) +} + + +getypes <- function(files,type,skip){ + type = match.arg(type,c("sequence","map","text","gzip","index")) + files <- switch(type, + "text"={ + unclass(rhls(files)['file'])$file + }, + "gzip"={ + uu=unclass(rhls(files)['file'])$file + uu[grep("gz$",uu)] + }, + "sequence"={ + unclass(rhls(files)['file'])$file + }, + "map"={ + uu=unclass(rhls(files,rec=TRUE)['file'])$file + uu[grep("data$",uu)] + }, + "index"={ + uu=unclass(rhls(files,rec=TRUE)['file'])$file + uu[grep("data$",uu)] + } + ) + for(s in skip){ + remr <- c(grep(s,files)) + if(length(remr)>0) + files <- files[-remr] + } + return(files) +} diff --git a/code/R/R/mapfile.functions.R b/code/R/R/mapfile.functions.R index 3ad38e4..48f3439 100644 --- a/code/R/R/mapfile.functions.R +++ b/code/R/R/mapfile.functions.R @@ -5,9 +5,7 @@ #' Keep \code{openhandles} below the maximum number of open sockets #' @export rh.init.cache <- function(mbsize=1024*1024*100, openhandles=100){ - a <- Rhipe:::send.cmd(rhoptions()$child$handle, list("initializeCaches", - list(as.integer(c(mbsize,openhandles)) - ))) + rhoptions()$server$initializeCaches(as.integer(mbsize),as.integer(openhandles)) } @@ -25,9 +23,7 @@ rhmapfile <- function(paths){ stop("paths must be a character vector of mapfiles( a directory containing them or a single one)") akey <- paste(head(strsplit(paths[1], "/")[[1]], -1), sep = "", collapse = "/") - a <- Rhipe:::send.cmd(rhoptions()$child$handle, - list("initializeMapFile", - list(paths, akey))) + a <- rhoptions()$server$initializeMapFile(paths, akey) 
obj <- new.env() obj$filename <- akey obj$paths <- paths @@ -84,9 +80,7 @@ getkey <- function(v,keys,size=3000,mc=lapply){ #' @export rhcacheStats <- function(which=c("filehandles","valuebytes")){ which <- list(filehandles=0L, valuebytes=1L)[[which]] - v <- Rhipe:::send.cmd(rhoptions()$child$handle, - list("cacheStatistics", - list(which)))[[1]] + v <- rhuz(rhoptions()$server$cacheStatistics(as.integer(which))) v <- data.frame(measure=c("averageLoadPenalty","evictionCount", "hitCount","hitRate","loadCount","loadExceptionCount", "loadExceptionRate","loadSuccessCount","missCount","missRate", diff --git a/code/R/R/rhdel.R b/code/R/R/rhdel.R index 48d9e01..7b02ce0 100644 --- a/code/R/R/rhdel.R +++ b/code/R/R/rhdel.R @@ -14,9 +14,8 @@ #' @keywords delete HDFS directory #' @export rhdel <- function(folder){ - folder = rhabsolute.hdfs.path(folder) - x <- Rhipe:::send.cmd(rhoptions()$child$handle,list("rhdel",folder)) - x[[1]]=="OK" + folder = rhabsolute.hdfs.path(folder) + rhoptions()$server$rhdel(folder) } diff --git a/code/R/R/rhex.R b/code/R/R/rhex.R index 63c69d2..116b45d 100644 --- a/code/R/R/rhex.R +++ b/code/R/R/rhex.R @@ -60,17 +60,14 @@ rhex <- function (conf,async=TRUE,mapred,...) if(!is.null(paramaters)){ lines <- Rhipe:::saveParams(paramaters,lines=lines) } - cmd <- sprintf("%s/hadoop jar %s org.godhuli.rhipe.RHMR %s ",Sys.getenv("HADOOP_BIN"),rhoptions()$jarloc,zonf) - x. <- paste("Running: ", cmd) - y. <- paste(rep("-",min(nchar(x.),40))) - message(y.);message(x.);message(y.) - if(rhoptions()$mode=="current"){ - result <- system(cmd,...) - }else if (rhoptions()$mode=="experimental"){ - result <- Rhipe:::send.cmd(rhoptions()$child$handle,list("rhex", zonf)) - result <- as.integer(result[[1]]) + ## cmd <- sprintf("%s/hadoop jar %s org.godhuli.rhipe.RHMR %s ",Sys.getenv("HADOOP_BIN"),rhoptions()$jarloc,zonf) + ## x. <- paste("Running: ", cmd) + ## y. <- paste(rep("-",min(nchar(x.),40))) + ## message(y.);message(x.);message(y.) + { + result <- rhoptions()$server$rhex(zonf) if(result == 1) result <- 256 - cat(sprintf("result:%s\n",result)) + ## cat(sprintf("result:%s\n",result)) } f3=NULL if(result==256){ diff --git a/code/R/R/rhget.R b/code/R/R/rhget.R index b54ce17..68cc13d 100644 --- a/code/R/R/rhget.R +++ b/code/R/R/rhget.R @@ -18,21 +18,6 @@ #' @keywords get HDFS directory #' @export rhget <- function(src, dest){ - src = rhabsolute.hdfs.path(src) - x <- Rhipe:::send.cmd(rhoptions()$child$handle, list("rhget",src,path.expand(dest))) + src = rhabsolute.hdfs.path(src) + rhoptions()$server$rhget(src,path.expand(dest)) } - -# rhget <- function(src,dest,ignore.stderr=T,verbose=F){ -# ## Copies src to dest -# ## If src is a directory and dest exists, -# ## src is copied inside dest(i.e a folder inside dest) -# ## If not, src's contents is copied to a new folder called dest -# ## -# ## If source is a file, and dest exists as a dire -# ## source is copied inside dest -# ## If dest does not exits, it is copied to that file -# ## Wildcards allowed -# ## OVERWRITES! -# doCMD(rhoptions()$cmd['get'],src=src,dest=dest,needout=F,ignore.stderr=ignore.stderr,verbose=verbose) -# ## doGet(src,dest,if(is.null(socket)) rhoptions()$socket else socket) -# } diff --git a/code/R/R/rhinit.R b/code/R/R/rhinit.R deleted file mode 100644 index 31d60d3..0000000 --- a/code/R/R/rhinit.R +++ /dev/null @@ -1,110 +0,0 @@ -#' Initialize Rhipe -#' -#' Necessary to call before using Rhipe. Rhipe likely will not work if any -#' functions in the package are called before this. 
Generally, a user just -#' calls it with default arguments as \code{rhinit()}. -#' -#' @param errors If TRUE errors are printed to the display. -#' @param info If TRUE reports command line information related to how it is starting Rhipe PersonalServer. -#' @param path Depreciated. Still used internally. -#' @param cleanup Force the shutdown of Rhipe's Java server on garbage collection of rhoptions()$child$handle? Not really needed can leave FALSE. -#' @param bufsize Size of buffer from which data is sent to and from hadoop -#' client. -#' @param buglevel The higher the number the more information that is outputted -#' during Rhipe operations. Currently 2000 prints all information. -#' @param first Deprecated. Still used internally. -#' @return TRUE if successful -#' @author Saptarshi Guha -#' @seealso \code{\link{rhoptions}} -#' @examples -#' -#' # rhinit() #typical use in day to day Rhipe -#' # prints a slew of information; sometimes useful if Rhipe is not installed correctly. -#' # rhinit(TRUE,TRUE,buglevel=2000) -#' @export -rhinit <-function(errors=TRUE,buglevel=0,info=FALSE,path=NULL,cleanup=FALSE,bufsize=as.integer(3*1024*1024),first=TRUE){ - ## for debug: rhinit(errors=TRUE,info - Rhipe:::.rhinit(errors,info,path,cleanup,bufsize,buglevel) - if(first){ - ## if(buglevel>0) message("Initial call to personal server") - ## Rhipe:::.rhinit(errors=TRUE,info=if(buglevel) TRUE else FALSE,path,cleanup,bufsize,buglevel=buglevel) - rhoptions(mode = Rhipe:::Mode,mropts=rhmropts(),quiet=FALSE) # "experimental" - ## if(buglevel>0) message("Secondary call to personal server") - ## Rhipe:::.rhinit(errors=TRUE,info=if(buglevel) TRUE else FALSE,path,cleanup,bufsize,buglevel=buglevel) - Sys.sleep(2) - message("Rhipe first run complete") - message("Initializing mapfile caches") - rh.init.cache() - return(TRUE) - } - message("Initializing mapfile caches") - rh.init.cache() -} - - -.rhinit <- function(errors=FALSE, info=FALSE,path=NULL,cleanup=FALSE,bufsize=as.integer(3*1024*1024),buglevel=0){ - - - Sys.setenv(HADOOP_CLASSPATH=sprintf("%s:%s", Sys.getenv("HADOOP_CLASSPATH"),paste(rhoptions()$mycp,collapse=":"))) - rhoptions(.code.in=sample(1e6,1)) - ntimeout <- options("timeout")[[1]] - options(timeout = if(!is.null(rhoptions()$timeout)) as.integer(rhoptions()$timeout) else 15552000L) - on.exit({ - options(timeout = ntimeout) - unlink(r) - unlink(r2) - }) - - - f1 <- "localhost" - r <- tempfile(pattern="sockets");r2 <- tempfile(pattern="signal") - if(is.null(path)) - cmda <- paste( c(sprintf("%s/hadoop jar",Sys.getenv("HADOOP_BIN")),rhoptions()$jarloc,"org.godhuli.rhipe.PersonalServer",f1,r,r2,as.integer(buglevel)),collapse=" ") - else cmda <- path - if(info){ - message(cmda) - } - - - - ################################################################################################ - # Right before creating a new java process at this point so trying to shut down what we already have - ################################################################################################ - try({killServer(rhoptions()$child$handle)},silent=TRUE) - ################################################################################################ - # Now making a new one - ################################################################################################ - - j <- .Call("createProcess", cmda, c(as.integer(errors),as.integer(info)),as.integer(bufsize),as.integer(buglevel),PACKAGE="Rhipe") - ## This is a potential race here, the child starts the Java server - ## but before it even starts we arrive here ... 
- ## so we busy wait - ## to fix this I simply need to read from the Java standard output. - ## will implement one day - while(TRUE){ - if(!is.na(file.info(r2)[1,]$size)){ - if(buglevel>1000) message(sprintf("Found signal file (created by personalserver): %s",r2)) - break - } - } - x <- read.table(r,head=TRUE) - y <- new.env() - y$ports <- x - y$tojava <- socketConnection(f1,as.numeric(y$ports['fromR']),open='wb',blocking=TRUE) - y$fromjava <- socketConnection(f1,as.numeric(y$ports['toR']),open='rb',blocking=TRUE) - y$err <- socketConnection(f1,as.numeric(y$ports['err']),open='rb',blocking=TRUE) - y$killed = FALSE - - reg.finalizer(y, function(r){ - if(cleanup) { - if(!is.null(rhoptions()$quiet) && !rhoptions()$quiet) - - ## tryCatch({writeBin(as.integer(-1),con=r$tojava,endian="big")},error=function(e) {},warning=function(e){}) - killServer(r) - } - },onexit=TRUE) - if(is.null(errors)) errors <- FALSE - if(is.null(info)) info <- FALSE - message("Rhipe initialization complete") - rhoptions(child=list(errors=errors,info=info,handle=y,bufsize=bufsize)) -} diff --git a/code/R/R/rhkill.R b/code/R/R/rhkill.R index 3b3b06c..8eec080 100644 --- a/code/R/R/rhkill.R +++ b/code/R/R/rhkill.R @@ -19,13 +19,5 @@ rhkill <- function(job) { job <- job[[1]] id <- job[['job.id']] } - result <- Rhipe:::send.cmd(rhoptions()$child$handle,list("rhkill", list(id))) + result <- rhoptions()$server$rhkill(id) } - -# rhkill <- function(w,...){ -# if(class(w)=="jobtoken") -# w= w[[1]][['job.id']] else { -# if(length(grep("^job_",w))==0) w=paste("job_",w,sep="",collapse="") -# } -# system(command=paste(paste(Sys.getenv("HADOOP_BIN"),"hadoop",sep=.Platform$file.sep,collapse=""),"job","-kill",w,collapse=" "),...) -# } diff --git a/code/R/R/rhls.R b/code/R/R/rhls.R index b57df7c..4478691 100644 --- a/code/R/R/rhls.R +++ b/code/R/R/rhls.R @@ -23,7 +23,8 @@ rhls <- function(folder,recurse=FALSE){ if( is(folder,"rhmr") || is(folder, "rhwatch")) folder <- rhofolder(folder) folder <- rhabsolute.hdfs.path(folder) - v <- Rhipe:::send.cmd(rhoptions()$child$handle,list("rhls",folder, if(recurse) 1L else 0L)) + v <- rhoptions()$server$rhls(folder, if(recurse) 1L else 0L) + v <- rhuz(v) if(is.null(v)) return(NULL) #condition nothing in the directory? 
if(length(v) == 1 && length(v[[1]]) == 0){ @@ -37,19 +38,3 @@ rhls <- function(folder,recurse=FALSE){ unique(f) } -# rhls <- function(fold,recurse=FALSE,ignore.stderr=T,verbose=F){ -# ## List of files, -# v <- Rhipe:::doCMD(rhoptions()$cmd['ls'],fold=fold,recur=as.integer(recurse),needoutput=T,ignore.stderr=ignore.stderr,verbose=verbose) -# if(is.null(v)) return(NULL) -# if(length(v)==0) { -# warning(sprintf("Is not a readable directory %s",fold)) -# return(v) -# } -# ## k <- strsplit(v,"\n")[[1]] -# ## k1 <- do.call("rbind",sapply(v,strsplit,"\t")) -# f <- as.data.frame(do.call("rbind",sapply(v,strsplit,"\t")),stringsAsFactors=F) -# rownames(f) <- NULL -# colnames(f) <- c("permission","owner","group","size","modtime","file") -# f$size <- as.numeric(f$size) -# unique(f) -# } diff --git a/code/R/R/rhmerge.R b/code/R/R/rhmerge.R deleted file mode 100644 index c863f0a..0000000 --- a/code/R/R/rhmerge.R +++ /dev/null @@ -1,39 +0,0 @@ -rhmerge <- function(inr,ou){ - inr = rhabsolute.hdfs.path(inr) - system(paste(paste(Sys.getenv("HADOOP_BIN"),"hadoop", - sep=.Platform$file.sep,collapse=""),"dfs","-cat",inr,">", ou,collapse=" ")) -} - -hmerge <- function(inputfiles,buffsize=2*1024*1024,max=-1L,type,verb=FALSE){ - - type=switch(type, "text"=0L, "gzip"=1L,-1L) - if(type<0) stop(sprintf("In reading a file, wrong value of type")) - x <- Rhipe:::send.cmd(rhoptions()$child$handle,list("rhcat",inputfiles,as.integer(buffsize),as.integer(max),as.integer(type)), - getresponse=0L,conti=function(){ - k <- length(inputfiles) - z <- rhoptions()$child$handle - su <- 0;nlines <- 0 - byt <- c() - while(TRUE){ - a=readBin(z$fromjava,integer(),n=1,endian="big") - if(a<0) break - byt <- c(byt,readBin(z$fromjava,raw(),n=a)) - su <- su+a - if(verb) cat(sprintf("Read %s bytes\n", su)) - } - if(verb) cat("Converting to characters\n") - lines <- rawToChar(byt) - if(verb) cat("Splitfiying\n") - t.t <- strsplit(lines,"\n") - if(verb) cat("Extracting\n") - t.t <- t.t[[1]] - if(verb) cat("As Matrix\n") - lines <- matrix(t.t,ncol=1) - nlines <- nrow(lines); - pfx <- if(k>1) "s" else "" - cat(sprintf("Read %s bytes, %s lines from %s file%s\n",prettyNum(su,big.mark = ",") - ,prettyNum(nlines,big.mark = ","),prettyNum(k,big.mark = ","),pfx)) - lines - }) - x -} diff --git a/code/R/R/rhmv.R b/code/R/R/rhmv.R index 756558d..262e3ee 100644 --- a/code/R/R/rhmv.R +++ b/code/R/R/rhmv.R @@ -19,5 +19,4 @@ rhmv <- function(ifile, ofile) { system(command=paste(paste(Sys.getenv("HADOOP_BIN"), "hadoop", sep=.Platform$file.sep), "fs", "-cp", ifile, ofile, sep=" ")) rhdel(ifile) - ## v <- Rhipe:::send.cmd(rhoptions()$child$handlygene,list("rhcp",ifile, ofile)) } diff --git a/code/R/R/rhoptions.R b/code/R/R/rhoptions.R index c022cd3..0b4ecf3 100644 --- a/code/R/R/rhoptions.R +++ b/code/R/R/rhoptions.R @@ -65,7 +65,5 @@ optmerge <- function(la,lb){ } rhmropts <- function(){ - ## List of files, - v <- Rhipe:::send.cmd(rhoptions()$child$handle,list("rhmropts")) - v[[1]] + rhuz(rhoptions()$server$rhmropts()) } diff --git a/code/R/R/rhput.R b/code/R/R/rhput.R index 6ce9bac..8746c72 100644 --- a/code/R/R/rhput.R +++ b/code/R/R/rhput.R @@ -16,13 +16,9 @@ #' @keywords put HDFS file #' @export rhput <- function(src, dest,deletedest=TRUE){ - dest = rhabsolute.hdfs.path(dest) - x <- Rhipe:::send.cmd(rhoptions()$child$handle, list("rhput",path.expand(src),dest,as.logical(deletedest))) + dest = rhabsolute.hdfs.path(dest) + y <- as.logical(deletedest) + Y <- if(is.na(y) || y==FALSE) FALSE else TRUE + rhoptions()$server$rhput(path.expand(src),dest,y) } -# 
rhput <- function(src,dest,deleteDest=TRUE,ignore.stderr=T,verbose=F){ -# doCMD(rhoptions()$cmd['put'],locals=path.expand(src),dest=dest,overwrite=deleteDest,needoutput=F -# ,ignore.stderr=ignore.stderr,verbose=verbose) -# ## doCMD(src,dest,deleteDest,if(is.null(socket)) rhoptions()$socket else socket) -# } - diff --git a/code/R/R/rhstatus.R b/code/R/R/rhstatus.R index 92f0e53..2af0ae3 100644 --- a/code/R/R/rhstatus.R +++ b/code/R/R/rhstatus.R @@ -44,9 +44,7 @@ rhstatus <- function(job,mon.sec=5,autokill=TRUE,showErrors=TRUE,verbose=FALSE id <- job[['job.id']] } if(mon.sec==Inf){ - result <- Rhipe:::send.cmd(rhoptions()$child$handle,list("rhjoin", list(id, - needoutput=as.character(TRUE), - joinwordy = as.character(as.logical(TRUE)))))[[1]] + result <-rhoptions()$server$rhjoin(id,TRUE) mon.sec=1 } if(mon.sec<=0) { @@ -58,7 +56,7 @@ rhstatus <- function(job,mon.sec=5,autokill=TRUE,showErrors=TRUE,verbose=FALSE } while(TRUE){ y <- .rhstatus(id,autokill=TRUE,showErrors) - cat(sprintf("\n[%s] Name:%s Job: %s, State: %s, Duration: %s\nURL: %s\n",date(),y$jobname, id,y$state,y$duration,y$tracking)) + cat(sprintf("\n[%s] Name:%s Job: %s State: %s Duration: %s\nURL: %s\n",date(),y$jobname, id,y$state,y$duration,y$tracking)) print(y$progress) if(!is.null(y$warnings)){ cat("\n--Warnings Present, follows:\n") @@ -81,7 +79,8 @@ rhstatus <- function(job,mon.sec=5,autokill=TRUE,showErrors=TRUE,verbose=FALSE } .rhstatus <- function(id,autokill=FALSE,showErrors=FALSE){ - result <- Rhipe:::send.cmd(rhoptions()$child$handle, list("rhstatus", list(id, as.integer(showErrors))))[[1]] + result <- rhoptions()$server$rhstatus(as.character(id), as.logical(showErrors)) + result <- rhuz(result) d <- data.frame("pct"=result[[3]],"numtasks"=c(result[[4]][1],result[[5]][[1]]), "pending"=c(result[[4]][2],result[[5]][[2]]), "running" = c(result[[4]][3],result[[5]][[3]]), @@ -165,10 +164,11 @@ rhstatus <- function(job,mon.sec=5,autokill=TRUE,showErrors=TRUE,verbose=FALSE if(haveRError) state <- "FAILED" ro <- result[[6]];trim.trailing <- function (x) sub("\\s+$", "", x) ro2 <- lapply(ro, function(r) { s = as.matrix(r); rownames(s) = trim.trailing(rownames(s)); s }) - return(list(state=state,duration=duration,progress=d, warnings=wrns,counters=ro2,rerrors=haveRError,errors=errs,jobname=result[[9]],tracking=result[[8]])); + return(list(state=state,duration=duration,progress=d, warnings=wrns,counters=ro2,rerrors=haveRError,errors=errs,jobname=result[[9]],tracking=result[[8]],config=result[[10]])); } + # rhstatus <- function(x){ # if(class(x)!="jobtoken" && class(x)!="character" ) stop("Must give a jobtoken object(as obtained from rhex)") # if(class(x)=="character") id <- x else { diff --git a/code/R/R/server.R b/code/R/R/server.R deleted file mode 100644 index f459e77..0000000 --- a/code/R/R/server.R +++ /dev/null @@ -1,274 +0,0 @@ - -isalive <- function(z) { - tryCatch({ - writeBin(as.integer(0),con=z$tojava,endian="big") - o <- readBin(con=z$fromjava,what=raw(),n=1,endian="big") - #if(length(o) == 0) - # warning("Zero length read in isalive") #now understand why this happens (ctrl + C kills RunJar) - if(length(o) > 0 && o==0x01) TRUE else FALSE - },error=function(e){ - return(FALSE) - }) -} - -restartR <- function(){ - z <- rhoptions()$child$hdl - rm(z);gc() - if(!is.null(rhoptions()$quiet) && !rhoptions()$quiet) - warning("RHIPE: restarting server") - rhinit(errors = rhoptions()$child$errors,info=rhoptions()$child$info,cleanup=TRUE,first=FALSE) - z <- rhoptions()$child$hdl -} - -send.cmd <- function(z,command, 
getresponse=TRUE,continuation=NULL,...){ - if(is.null(z)) - stop("Rhipe not initialized.") - if(!Rhipe:::isalive(z)){ - # rm(z);gc() #removed this line. gc does almost nothing here because z is not being rm from other objects that point to it. - if(!is.null(rhoptions()$quiet) && !rhoptions()$quiet) - warning("RHIPE: Creating a new RHIPE connection object, previous one died!") - rhinit(errors = rhoptions()$child$errors,info=rhoptions()$child$info,first=FALSE) - z <- rhoptions()$child$handle - } - ## browser() - command <- rhsz(command) - writeBin(length(command),z$tojava, endian='big') - writeBin(command, z$tojava, endian='big') - flush(z$tojava) - if(getresponse){ - sz <- readBin(z$fromjava,integer(),n=1,endian="big") - if(sz<0) { - #abs(sz) because sendMessage in Java PersonalServer sends negative sizes on error. - #We must read this to clean the pipe for subsequent commands. - resp <- readBin(z$fromjava,raw(),n=abs(sz),endian="big") - #resp <- rhuz(resp) - #Used to report resp, but it goes to stderr too. - stop(paste("Error response from Rhipe Java server.")) - } - resp <- readBin(z$fromjava,raw(),n=sz,endian="big") - resp <- rhuz(resp) - nx <- unclass(resp) - return(nx) - } - if(!is.null(continuation)) return(continuation()) -} - - -getypes <- function(files,type,skip){ - type = match.arg(type,c("sequence","map","text","gzip","index")) - files <- switch(type, - "text"={ - unclass(rhls(files)['file'])$file - }, - "gzip"={ - uu=unclass(rhls(files)['file'])$file - uu[grep("gz$",uu)] - }, - "sequence"={ - unclass(rhls(files)['file'])$file - }, - "map"={ - uu=unclass(rhls(files,rec=TRUE)['file'])$file - uu[grep("data$",uu)] - }, - "index"={ - uu=unclass(rhls(files,rec=TRUE)['file'])$file - uu[grep("data$",uu)] - } - ) - for(s in skip){ - remr <- c(grep(s,files)) - if(length(remr)>0) - files <- files[-remr] - } - return(files) -} - - - - - -## print.jobtoken <- function(s,verbose=1,...){ -## r <- s[[1]] -## v <- sprintf("RHIPE Job Token Information\n--------------------------\nURL: %s\nName: %s\nID: %s\nSubmission Time: %s\n", -## r[1],r[2],r[3],r[4]) -## cat(v) -## if(verbose>0){ -## result <- rhstatus(s) -## cat(sprintf("State: %s\n",result[[1]])) -## cat(sprintf("Duration(sec): %s\n",result[[2]])) -## cat(sprintf("Progess\n")) -## print(result[[3]]) -## if(verbose==2) -## print(result[[4]]) -## } -## } - - - -rbstream <- function(z,size=3000,mc,asraw=FALSE,quiet=FALSE){ - v <- vector(mode='list',length=size) - i <- 0;by <- 0;ed <- 0 - while(TRUE){ - sz1 <- readBin(z$fromjava,integer(),n=1,endian="big") - if(sz1<=0) { - ed=sz1; - break - } - rw.k <- readBin(z$fromjava,raw(),n=sz1,endian="big") - sz2 <- readBin(z$fromjava,integer(),n=1,endian="big") - if(sz2<=0) { - ed = sz2; - break - } - rw.v <- readBin(z$fromjava,raw(),n=sz2,endian="big") - i <- i+1 - if(i %% size == 0) - v <- append(v,vector(mode='list',length=size)) - v[[i]] <- list(rw.k,rw.v) - by <- by+ sz1+sz2 - } - if(ed<0) { - rwe <- rhuz(readBin(z$fromjava,raw(),n=-ed,endian="big")) - stop(rwe) - } - prs <- if(i>1) "pairs" else "pair" - if(!quiet){ - if( (by < 1024)) - message(sprintf("RHIPE: Read %s %s occupying %s bytes, deserializing", i,prs,by)) - else if( (by < 1024*1024)) - message(sprintf("RHIPE: Read %s %s occupying %s KB, deserializing", i,prs, round(by/1024,3))) - else - message(sprintf("RHIPE: Read %s %s occupying %s MB, deserializing", i,prs, round(by/1024^2,3))) - } - MCL <- mc - p <- v[unlist(MCL(v,function(r) !is.null(r)))] - if (!asraw) - MCL(p,function(r) list(rhuz(r[[1]]),rhuz(r[[2]]))) - else - p - -} - 
-rhstreamsequence <- function(inputfile,type='sequence',batch=1000,quiet=TRUE,...){ - ## We can't afford the java server to crash now, else it will - ## throw all the reads off sync - calledcode <- rhoptions()[[".code.in"]] - files <- Rhipe:::getypes(inputfile,type) - index <- 1;max.file <- length(files) - if(!quiet) cat(sprintf("Moved to file %s (%s/%s)\n", files[index],index,max.file)) - x <- Rhipe:::send.cmd(rhoptions()$child$handle,list("rhopensequencefile",files[1]),getresponse=1L) - if(x[[1]]=="OK"){ - return(list(get=function(mc=FALSE){ - quantum <- batch - ## if (rhoptions()[[".code.in"]]!=calledcode) warning("Server has been restarted, excpect an error") - p <- Rhipe:::send.cmd(rhoptions()$child$handle, list("rhgetnextkv", files[index],as.integer(quantum)) - ,getresponse=0L, - conti = function(){ - return(Rhipe:::rbstream(rhoptions()$child$handle,size=quantum,mc=mc,quiet=quiet,...)) - }) - if(length(p)==quantum) return(p) - ## if p is of length 0, either fast forward to next file in files - ## that is not empty! OR if already at end, return empty list - ## also user requested quantum but we got less, so read some more - ## p <- list() - while(TRUE){ - index<<-index+1 - if(index> max.file) break - if(!quiet) cat(sprintf("Moved to file %s (%s/%s)\n", files[index],index,max.file)) - x <- Rhipe:::send.cmd(rhoptions()$child$handle,list("rhopensequencefile",files[index]),getresponse=1L) - if(x[[1]]!="OK") stop(sprintf("Problem reading next in sequence %s",files[index])) - p <- append(p,Rhipe:::send.cmd(rhoptions()$child$handle, list("rhgetnextkv", files[index],as.integer(quantum)) - ,getresponse=0L, - conti = function(){ - return(Rhipe:::rbstream(rhoptions()$child$handle,size=quantum,mc=mc,quiet=quiet,...)) - })) - if(length(p)==quantum) break - } - return(p) - },close=function(){ - ## if (rhoptions()[[".code.in"]]!=calledcode) warning("Server has been restarted, excpect an error") - x <- Rhipe:::send.cmd(rhoptions()$child$handle,list("rhclosesequencefile",files[index],getresponse=1L)) - })) - }else stop(sprintf("Could not open %s for readin",inputfile)) -} - - -rhbiglm.stream.hdfs <- function(filename,type='sequence',modifier=NULL,batch=100,...){ - a <- NULL - index = 1 - return(function(reset=FALSE){ - if(reset){ - index<<-1 - if(!is.null(a)) a$close() - a <<- Rhipe::rhstreamsequence(filename,type,batch,...) 
- modifier(NULL,TRUE) - }else{ - dd <- a$get() - if(length(dd)==0) return(NULL) - p=do.call("rbind",lapply(dd,"[[",2)) - p <- if(!is.null(modifier)) modifier(p,reset) else p - return(p) - }})} - - -scalarSummer <- expression( - pre={ total=0 }, - reduce = { total <- total+sum(unlist(reduce.values)) }, - post = { rhcollect(reduce.key, total)} - ) - - - -################################################################################################ -# shutdownJavaServer -# Tries to shutdown a java process from handle -################################################################################################ -javaServerCommand = function(con,command ){ - command <- rhsz(command) - writeBin(length(command),con, endian='big') - writeBin(command, con, endian='big') - flush(con) - -} - - - -################################################################################################ -# HELPER FUNCTION TO KILL THE PROCESS WITH THE HANDLE GENERATED BY .rhinit -# Tries to let the Java server shut itself off and then kill it -# Author: Jeremiah Rounds -################################################################################################ -killServer = function(handle){ - if(is.null(handle)) - stop("Handle is NULL") - if(!is.environment(handle)) - stop("Handle must be environment.") - - - killed=FALSE - if(!is.null(handle$killed)) - killed= handle$killed - if(killed) - return(TRUE) - handle$killed = TRUE - - - #Decided best practice at the moment is to do nothing if the handle won't respond to heart beats. - #We did nothing for a year in all cases. Reverting to that in the case where the server is - #unresponsive cannot hurt compared to older versions of Rhipe. - #necessary to wait for a response before trying to shutdown - if(isalive(handle)){ - if(!is.null(rhoptions()$quiet) && !rhoptions()$quiet) - cat(sprintf("Rhipe shutting down old Java server.\n")) #,handle$ports['PID'])); - #assumes it will shutdown if it is responding to heartbeats. 
- try({javaServerCommand(handle$tojava, list("shutdownJavaServer"))}, silent=TRUE) - for(x in list(handle$tojava, handle$fromjava,handle$err)) - try({close(x)},silent=TRUE) - } - - - - return(TRUE) - -} diff --git a/code/R/R/zzz.R b/code/R/R/zzz.R index 9549fea..810cbae 100644 --- a/code/R/R/zzz.R +++ b/code/R/R/zzz.R @@ -6,184 +6,176 @@ attr(vvvv,"date") <- 'Sunday 20th January' class(vvvv) <- "rhversion" assign("rhipeOptions" ,list(version=vvvv) ,envir=.rhipeEnv ) -Mode <- "experimental" - - - - .onLoad <- function(libname,pkgname){ library.dynam("Rhipe", pkgname, libname) - #require(methods) onload.2(libname,pkgname) } + onload.2 <- function(libname, pkgname){ opts <- get("rhipeOptions",envir=.rhipeEnv) - ################################################################################################ - # JAVA AND HADOOP - ################################################################################################ - - opts$jarloc <- list.files(paste(system.file(package="Rhipe"),"java",sep=.Platform$file.sep),pattern="Rhipe.jar$",full=T) - opts$mycp <- list.files(paste(system.file(package="Rhipe"),"java",sep=.Platform$file.sep),pattern="jar$",full=T) - opts$mycp <- setdiff(opts$mycp, opts$jarloc) - if(Sys.getenv("HADOOP")=="" && Sys.getenv("HADOOP_BIN")=="") - warning("Rhipe requires the HADOOP or HADOOP_BIN environment variable to be present\n $HADOOP/bin/hadoop or $HADOOP_BIN/hadoop should exists") - - if(Sys.getenv("HADOOP_BIN")==""){ - warning("Rhipe: HADOOP_BIN is missing, using $HADOOP/bin") - Sys.setenv(HADOOP_BIN=sprintf("%s/bin",Sys.getenv("HADOOP"))) - } - - ################################################################################################ - # RhipeMapReduce, runner, and checks - ################################################################################################ - - opts$RhipeMapReduce <- list.files(paste(system.file(package="Rhipe"),"bin",sep=.Platform$file.sep), - pattern="^RhipeMapReduce$",full=T) - - if(is.null(opts$RhipeMapReduce) || length(opts$RhipeMapReduce) != 1){ - warning("RhipeMapReduce executable not found in package bin folder as expected") - } - #RhipeMapReduce is the executable, but the simpliest way to run it is via R CMD which sets up environment variables. 
- opts$runner <-paste("R","CMD", opts$RhipeMapReduce ,"--slave","--silent","--vanilla") #,"--max-ppsize=100000","--max-nsize=1G") + ## ############################################################################################## + ## JAVA AND HADOOP + ## ############################################################################################# + opts$jarloc <- list.files(paste(system.file(package="Rhipe"),"java",sep=.Platform$file.sep),pattern="Rhipe.jar$",full=T) + opts$mycp <- list.files(paste(system.file(package="Rhipe"),"java",sep=.Platform$file.sep),pattern="jar$",full=T) + opts$mycp <- setdiff(opts$mycp, opts$jarloc) + if(Sys.getenv("HADOOP")=="" && Sys.getenv("HADOOP_HOME") && Sys.getenv("HADOOP_BIN")=="") + warning("Rhipe requires HADOOP_HOME or HADOOP or HADOOP_BIN environment variable to be present\n $HADOOP/bin/hadoop or $HADOOP_BIN/hadoop should exist") + if(Sys.getenv("HADOOP_BIN")==""){ + warning("Rhipe: HADOOP_BIN is missing, using $HADOOP/bin") + Sys.setenv(HADOOP_BIN=sprintf("%s/bin",Sys.getenv("HADOOP"))) + } + + if(Sys.getenv("HADOOP_HOME")=="") + warning("HADOOP_HOME missing") + if(Sys.getenv("HADOOP_CONF_DIR")=="") + warning("HADOOP_CONF_DIR missing, you are probably going to have a problem running RHIPE.\nHADOOP_CONF_DIR should be the location of the directory that contains the configuration files") + opts$hadoop.env <-Sys.getenv(c("HADOOP_HOME","HADOOP_CONF_DIR")) + ## ############################################################################################## + ## RhipeMapReduce, runner, and checks + ## ############################################################################################## + opts$RhipeMapReduce <- list.files(paste(system.file(package="Rhipe"),"bin",sep=.Platform$file.sep), + pattern="^RhipeMapReduce$",full=T) + if(is.null(opts$RhipeMapReduce) || length(opts$RhipeMapReduce) != 1){ + warning("RhipeMapReduce executable not found in package bin folder as expected") + } + ##RhipeMapReduce is the executable, but the simpliest way to run it is via R CMD which sets up environment variables. 
+ opts$runner <-paste("R","CMD", opts$RhipeMapReduce ,"--slave","--silent","--vanilla") #,"--max-ppsize=100000","--max-nsize=1G") - ################################################################################################ - # OTHER DEFAULTS - ################################################################################################ - - opts$file.types.remove.regex ="(/_SUCCESS|/_LOG|/_log|rhipe_debug|rhipe_merged_index_db)" - opts$max.read.in.size <- 200*1024*1024 ## 100MB - opts$reduce.output.records.warn <- 200*1000 - opts$rhmr.max.records.to.read.in <- NA - opts$HADOOP.TMP.FOLDER <- "/tmp" - opts$readback <- TRUE - opts$zips <- c() - opts$hdfs.working.dir = "/" - ## other defaults - opts$copyObjects <- list(auto=TRUE,maxsize=100*1024*1024, exclude=c("map.values","map.keys","reduce.values","reduce.key","rhcollect")) - opts$templates <- list() - opts$templates$scalarsummer <- expression( - pre={.sum <- 0}, - reduce={.sum <- .sum+ sum(unlist(reduce.values),na.rm=TRUE)}, - post = { {rhcollect(reduce.key,.sum)}} ) - opts$templates$scalarsummer <- structure(opts$templates$scalarsummer,combine=TRUE) - opts$templates$colsummer <- expression( - pre={.sum <- 0}, - reduce={.sum <- .sum + apply(do.call('rbind', reduce.values),2,sum)}, - post = { {rhcollect(reduce.key,.sum)}} ) - opts$templates$colsummer <- structure(opts$templates$colsummer,combine=TRUE) + ## ############################################################################################## + ## OTHER DEFAULTS + ## ############################################################################################## + opts$file.types.remove.regex ="(/_SUCCESS|/_LOG|/_log|rhipe_debug|rhipe_merged_index_db)" + opts$max.read.in.size <- 200*1024*1024 ## 100MB + opts$reduce.output.records.warn <- 200*1000 + opts$rhmr.max.records.to.read.in <- NA + opts$HADOOP.TMP.FOLDER <- "/tmp" + opts$readback <- TRUE + opts$zips <- c() + opts$hdfs.working.dir = "/" + ## other defaults + opts$copyObjects <- list(auto=TRUE,maxsize=100*1024*1024, exclude=c("map.values","map.keys","reduce.values","reduce.key","rhcollect")) + opts$templates <- list() + opts$templates$scalarsummer <- expression( + pre={.sum <- 0}, + reduce={.sum <- .sum+ sum(unlist(reduce.values),na.rm=TRUE)}, + post = { {rhcollect(reduce.key,.sum)}} ) + opts$templates$scalarsummer <- structure(opts$templates$scalarsummer,combine=TRUE) + opts$templates$colsummer <- expression( + pre={.sum <- 0}, + reduce={.sum <- .sum + apply(do.call('rbind', reduce.values),2,sum)}, + post = { {rhcollect(reduce.key,.sum)}} ) + opts$templates$colsummer <- structure(opts$templates$colsummer,combine=TRUE) + + opts$templates$rbinder <- function(r=NULL,combine=FALSE,dfname='adata'){ + ..r <- substitute(r) + r <- if( is(..r,"name")) get(as.character(..r)) else ..r + def <- if(is.null(r)) TRUE else FALSE + r <- if(is.null(r)) substitute({rhcollect(reduce.key, adata)}) else r + y <-bquote(expression( + pre = { adata <- list()}, + reduce = { adata[[length(adata) + 1 ]] <- reduce.values }, + post = { + adata <- do.call("rbind", unlist(adata,recursive=FALSE)); + .(P) + }), list(P=r)) + y <- if(combine || def) structure(y,combine=TRUE) else y + environment(y) <- .BaseNamespaceEnv + y + } + opts$templates$raggregate <- function(r=NULL,combine=FALSE,dfname='adata'){ + ..r <- substitute(r) + ..r <- if( is(..r,"name")) get(as.character(..r)) else r + def <- if(is.null(..r)) TRUE else FALSE + r <- if(is.null(..r)) substitute({ adata <- unlist(adata, recursive = FALSE); rhcollect(reduce.key, adata)}) else ..r + y 
<-bquote(expression( + pre = { adata <- list()}, + reduce = { adata[[length(adata) + 1 ]] <- reduce.values }, + post = { + .(P) + }), list(P=r)) + y <- if(combine || def) structure(y,combine=TRUE) else y + environment(y) <-.BaseNamespaceEnv ## Using GlobalEnv screws thing sup ... + + y + } + opts$templates$identity <- expression(reduce={ lapply(reduce.values,function(r) rhcollect(reduce.key,r)) }) + opts$templates$range <- expression( + pre = { + rng <- c(Inf,-Inf) + }, + reduce = { + rx <- unlist(reduce.values) + rng <- c(min(rng[1],rx,na.rm=TRUE),max(rng[2],rx,na.rm=TRUE)) + }, + post={rhcollect(reduce.key,rng)} + ) + opts$templates$range <- structure(opts$templates$range,combine=TRUE) + opts$debug <- list() + opts$debug$map <- list() + opts$debug$map$collect <- list(setup= expression({ + rhAccumulateError <- local({ + maxm <- tryCatch(rh.max.errors,error=function(e) 20) + x <- function(maximum.errors=maxm){ + errors <- list() + maximum.errors <- maximum.errors + counter <- 0 + return(function(X,retrieve=FALSE){ + if(retrieve) return(errors) + if(counter0){ + save(rhipe.errors,file=sprintf("./tmp/rhipe_debug_%s",Sys.getenv("mapred.task.id"))) + rhcounter("@R_DebugFile","saved.files",1) + } + }), handler=function(e,k,r){ + rhcounter("R_UNTRAPPED_ERRORS",as.character(e),1) + rhAccumulateError(list(as.character(e),k,r)) + }) + opts$debug$map$count <- list(setup=NA, cleanup=NA, handler=function(e,k,r) rhcounter("R_UNTRAPPED_ERRORS",as.character(e),1)) + opts$debug$map[["stop"]] <- list(setup=NA, cleanup=NA, handler=function(e,k,r) rhcounter("R_ERRORS", as.character(e),1)) - opts$templates$rbinder <- function(r=NULL,combine=FALSE,dfname='adata'){ - ..r <- substitute(r) - r <- if( is(..r,"name")) get(as.character(..r)) else ..r - def <- if(is.null(r)) TRUE else FALSE - r <- if(is.null(r)) substitute({rhcollect(reduce.key, adata)}) else r - y <-bquote(expression( - pre = { adata <- list()}, - reduce = { adata[[length(adata) + 1 ]] <- reduce.values }, - post = { - adata <- do.call("rbind", unlist(adata,recursive=FALSE)); - .(P) - }), list(P=r)) - y <- if(combine || def) structure(y,combine=TRUE) else y - environment(y) <- .BaseNamespaceEnv - y - } - opts$templates$raggregate <- function(r=NULL,combine=FALSE,dfname='adata'){ - ..r <- substitute(r) - ..r <- if( is(..r,"name")) get(as.character(..r)) else r - def <- if(is.null(..r)) TRUE else FALSE - r <- if(is.null(..r)) substitute({ adata <- unlist(adata, recursive = FALSE); rhcollect(reduce.key, adata)}) else ..r - y <-bquote(expression( - pre = { adata <- list()}, - reduce = { adata[[length(adata) + 1 ]] <- reduce.values }, - post = { - .(P) - }), list(P=r)) - y <- if(combine || def) structure(y,combine=TRUE) else y - environment(y) <-.BaseNamespaceEnv ## Using GlobalEnv screws thing sup ... 
- - y - } - opts$templates$identity <- expression(reduce={ lapply(reduce.values,function(r) rhcollect(reduce.key,r)) }) - opts$templates$range <- expression( - pre = { - rng <- c(Inf,-Inf) - }, - reduce = { - rx <- unlist(reduce.values) - rng <- c(min(rng[1],rx,na.rm=TRUE),max(rng[2],rx,na.rm=TRUE)) - }, - post={rhcollect(reduce.key,rng)} - ) - opts$templates$range <- structure(opts$templates$range,combine=TRUE) - opts$debug <- list() - opts$debug$map <- list() - opts$debug$map$collect <- list(setup= expression({ - rhAccumulateError <- local({ - maxm <- tryCatch(rh.max.errors,error=function(e) 20) - x <- function(maximum.errors=maxm){ - errors <- list() - maximum.errors <- maximum.errors - counter <- 0 - return(function(X,retrieve=FALSE){ - if(retrieve) return(errors) - if(counter0){ - save(rhipe.errors,file=sprintf("./tmp/rhipe_debug_%s",Sys.getenv("mapred.task.id"))) - rhcounter("@R_DebugFile","saved.files",1) - } - }), handler=function(e,k,r){ - rhcounter("R_UNTRAPPED_ERRORS",as.character(e),1) - rhAccumulateError(list(as.character(e),k,r)) - }) - opts$debug$map$count <- list(setup=NA, cleanup=NA, handler=function(e,k,r) rhcounter("R_UNTRAPPED_ERRORS",as.character(e),1)) - opts$debug$map[["stop"]] <- list(setup=NA, cleanup=NA, handler=function(e,k,r) rhcounter("R_ERRORS", as.character(e),1)) - ## ##################### ## Handle IO Formats ## ###################### opts <- handleIOFormats(opts) - ################################################################################################ - # FINSHING - ################################################################################################ + ## ############################################################################################## + ## FINSHING + ## ############################################################################################# assign("rhipeOptions",opts,envir=.rhipeEnv) - message("-------------------------------------------------------- -| IMPORTANT: Before using Rhipe call rhinit() | -| Rhipe will not work or most probably crash | ---------------------------------------------------------") + initialize() } -first.run <- function(buglevel=0){ - ## if(buglevel>0) message("Initial call to personal server") - ## Rhipe::rhinit(errors=TRUE,info=if(buglevel) TRUE else FALSE,buglevel=buglevel) - ## rhoptions(mode = Rhipe:::Mode,mropts=rhmropts(),quiet=FALSE) # "experimental" - ## if(buglevel>0) message("Secondary call to personal server") - ## Rhipe::rhinit(errors=TRUE,info=if(buglevel) TRUE else FALSE,buglevel=buglevel) - ## Sys.sleep(2) - ## message("Rhipe first run complete") - ## return(TRUE) - - stop("Function has been deprecated, call rhinit(first=TRUE)") +initialize <- function(){ + opts <- rhoptions() + hadoop <- opts$hadoop.env + library(rJava) + c1 <- list.files(hadoop["HADOOP_HOME"],pattern="jar$",full=T,rec=TRUE) + c2 <- hadoop["HADOOP_CONF_DIR"] + .jinit() + .jaddClassPath(c(c2,c1,opts$jarloc,opts$mycp)) + message("Initializing Rhipe") + server <- .jnew("org/godhuli/rhipe/PersonalServer") + dbg <- as.integer(Sys.getenv("RHIPE_DEBUG_LEVEL")) + tryCatch(server$run(if(is.na(dbg)) 0L else dbg),Exception=function(e) e$printStackTrace()) + rhoptions(server=server) + rhoptions(mropts = Rhipe:::rhmropts()) + message("Initializing mapfile caches") + rh.init.cache() } - - - - - - - - - +#' @export +rhinit <- function(...){ + warning("Defunct. 
Will disappear soon, no need to call this") +} diff --git a/code/R/man/hdfsReadLines.Rd b/code/R/man/hdfsReadLines.Rd new file mode 100644 index 0000000..427b214 --- /dev/null +++ b/code/R/man/hdfsReadLines.Rd @@ -0,0 +1,16 @@ +\name{hdfsReadLines} +\alias{hdfsReadLines} +\title{Reads all lines from a text file located on the HDFS} +\usage{ + hdfsReadLines(inp) +} +\arguments{ + \item{inp}{The location of the text file, interolated + based on hdfs.getwd} +} +\description{ + Reads all lines from a text file located on the HDFS +} +\keyword{HDFS} +\keyword{TextFile} + diff --git a/code/java/build.num b/code/java/build.num index 34e9840..7eb12f5 100644 --- a/code/java/build.num +++ b/code/java/build.num @@ -1,3 +1,3 @@ #Build Number for ANT. Do not edit! -#Wed Dec 19 16:40:54 PST 2012 -build.number=475 +#Tue Jan 22 11:54:22 PST 2013 +build.number=500 diff --git a/code/java/org/godhuli/rhipe/FileUtils.java b/code/java/org/godhuli/rhipe/FileUtils.java index 85ff387..2c14905 100644 --- a/code/java/org/godhuli/rhipe/FileUtils.java +++ b/code/java/org/godhuli/rhipe/FileUtils.java @@ -150,11 +150,10 @@ public void copyMain(String src,String dest) throws IOException{ } } - public String[] ls(REXP r,int f) throws IOException,FileNotFoundException, + public String[] ls(String[] r,int f) throws IOException,FileNotFoundException, URISyntaxException{ ArrayList lsco = new ArrayList(); - for(int i=0;i0 ? true:false); } return(lsco.toArray(new String[]{})); @@ -449,20 +448,13 @@ public void getKeys(REXP rexp0, DataOutputStream out,boolean vint) throws Except } rw.close(); } - } - public REXP getstatus(REXP r) throws Exception{ - String jd = r.getRexpValue(0).getStringValue(0).getStrval(); - boolean geterrors = r.getRexpValue(1).getIntValue(0)==1? true :false; - // org.apache.hadoop.mapreduce.JobID jid = org.apache.hadoop.mapreduce.JobID.forName(jd); + public REXP getstatus(String jd, boolean geterrors) throws Exception{ org.apache.hadoop.mapred.JobID jj = org.apache.hadoop.mapred.JobID.forName(jd); if(jj==null) throw new IOException("Jobtracker could not find jobID: "+jd); - // org.apache.hadoop.mapred.JobClient jclient = new org.apache.hadoop.mapred.JobClient( - // org.apache.hadoop.mapred.JobTracker.getAddress(c),c); - // org.apache.hadoop.mapred.JobID jj = org.apache.hadoop.mapred.JobID.downgrade(jid); org.apache.hadoop.mapred.RunningJob rj = jclient.getJob(jj); if(rj==null) throw new IOException("No such job: "+jd+" available, wrong job? 
or try the History Viewer (see the Web UI) "); @@ -571,6 +563,7 @@ public REXP getstatus(REXP r) throws Exception{ thevals.addRexpValue( errcontainer); thevals.addRexpValue( RObjects.makeStringVector(rj.getTrackingURL())); thevals.addRexpValue( RObjects.makeStringVector( new String[]{ jobname})); + thevals.addRexpValue( RObjects.makeStringVector( new String[]{ jobfile})); return(thevals.build()); } @@ -585,18 +578,13 @@ public long getStart(org.apache.hadoop.mapred.JobClient jc,org.apache.hadoop.map } return(0); } - public void killjob(REXP r) throws Exception{ - String jd = r.getRexpValue(0).getStringValue(0).getStrval(); - // org.apache.hadoop.mapreduce.JobID jid = org.apache.hadoop.mapreduce.JobID.forName(jd); - // org.apache.hadoop.mapred.JobID jj = org.apache.hadoop.mapred.JobID.downgrade(jid); + public void killjob(String jd) throws Exception{ org.apache.hadoop.mapred.JobID jj = org.apache.hadoop.mapred.JobID.forName(jd); org.apache.hadoop.mapred.RunningJob rj = jclient.getJob(jj); rj.killJob(); } - public REXP joinjob(REXP r) throws Exception{ - String jd = r.getRexpValue(0).getStringValue(0).getStrval(); - boolean verbose = r.getRexpValue(1).getStringValue(0).getStrval().equals("TRUE")? true:false; + public REXP joinjob(String jd, boolean verbose) throws Exception{ org.apache.hadoop.mapred.JobID jj = org.apache.hadoop.mapred.JobID.forName(jd); org.apache.hadoop.mapred.RunningJob rj = jclient.getJob(jj); String jobfile = rj.getJobFile(); @@ -639,98 +627,5 @@ public static REXP buildlistFromOldCounter(org.apache.hadoop.mapred.Counters c,d return(RObjects.makeList(groupdispname,cn)); } public static void main(String[] args) throws Exception{ - int cmd = Integer.parseInt(args[0]); - //parse data - //invokes class CMD inputfile - //writes results(or erors) to inputfile - REXP r; - REXP b=null; - boolean error = false; - FileUtils fu= new FileUtils(new Configuration()); - try{ - switch(cmd){ - case 0: - // hadoop options - b = fu.mapredopts(); - fu.writeTo(args[1], b); - break; - case 1: - // ls - r = fu.readInfo(args[1]); - String[] result0 = fu.ls(r.getRexpValue(0),r.getRexpValue(1).getIntValue(0)); - b = RObjects.makeStringVector(result0); - fu.writeTo(args[1], b); - break; - case 2: - //copy from hdfs to local - r = fu.readInfo(args[1]); - String src = r.getStringValue(0).getStrval(); - String dest = r.getStringValue(1).getStrval(); - fu.copyMain(src,dest); - break; - case 3: - //delete from the hdfs - r = fu.readInfo(args[1]); - for(int i=0;i< r.getStringValueCount();i++){ - String s = r.getStringValue(i).getStrval(); - fu.delete(s,true); - } - break; - case 4: - //copy local files to hdfs - r = fu.readInfo(args[1]); - String[] locals = new String[r.getRexpValue(0).getStringValueCount()]; - for(int i=0;i commands = new Vector(); - commands.add("/bin/bash"); - commands.add("-c"); - commands.add("echo $PPID"); - ProcessBuilder pb = new ProcessBuilder(commands); - Process pr = pb.start(); - pr.waitFor(); - if (pr.exitValue() == 0) { - BufferedReader outReader = new BufferedReader( - new InputStreamReader(pr.getInputStream())); - return outReader.readLine().trim(); - } else { - throw new IOException("Problem getting PPID"); - } - } - - public void docrudehack(String temp) throws IOException { - FileWriter outFile = new FileWriter(temp); - String x = "DONE"; - outFile.write(x, 0, x.length()); - outFile.flush(); - outFile.close(); - } - public void setUserInfo(String ipaddress, String tempfile, String tempfile2, - int bugl) throws InterruptedException, FileNotFoundException, - 
UnknownHostException, SecurityException, IOException { + public void setUserInfo( int bugl) throws InterruptedException,IOException { bbuf = new byte[100]; this.buglevel = bugl; seqhash = new Hashtable(); mrhash = new Hashtable(); mapfilehash = new Hashtable(); - REXP.Builder thevals = REXP.newBuilder(); - thevals.setRclass(REXP.RClass.LOGICAL); - thevals.addBooleanValue(REXP.RBOOLEAN.T); - yesalive = thevals.build(); - if (buglevel > 10) - LOG.info("Calling FileUtils"); - fu = new FileUtils(getConf()); - if (buglevel > 10) - LOG.info("Got FileUtils object:" + fu); - - ServerSocket fromRsock, errsock, toRsock; - if (buglevel > 10) - LOG.info("Creating listening and writing sockets"); - fromRsock = new ServerSocket(0, 0, InetAddress.getByName(ipaddress)); - toRsock = new ServerSocket(0); - errsock = new ServerSocket(0); - if (buglevel > 10) - LOG.info("Got fromRsock=" + fromRsock + " toRsock=" + toRsock - + " errsock=" + errsock); - FileWriter outFile = new FileWriter(tempfile); - if (buglevel > 10) - LOG.info("Writing information to file:" + outFile); - String x = "fromR toR err PID\n"; - outFile.write(x, 0, x.length()); - x = fromRsock.getLocalPort() + " " + toRsock.getLocalPort() + " " - + errsock.getLocalPort() + " " + getPID() + "\n"; - outFile.write(x, 0, x.length()); - outFile.flush(); - outFile.close(); - docrudehack(tempfile2); - if (buglevel > 10) - LOG.info("Finished with crudehack by creating a file called " - + tempfile2); - Socket a = fromRsock.accept(); - _fromR = new DataInputStream(new BufferedInputStream( - a.getInputStream(), 1024)); - a = toRsock.accept(); - _toR = new DataOutputStream(new BufferedOutputStream(a - .getOutputStream(), 1024)); - a = errsock.accept(); - _err = new DataOutputStream(new BufferedOutputStream(a - .getOutputStream(), 1024)); - - if(buglevel > 10) - LOG.info("Initializing Caches"); - if (buglevel > 10) - LOG.info("Now waiting on all sockets"); + fu = new FileUtils(_configuration); } - public void rhmropts(REXP r) throws Exception { + public byte[] rhmropts() throws Exception { REXP b = fu.mapredopts(); - send_result(b); + return b.toByteArray(); } - public void rhcat(REXP r) throws Exception { - final int buff = r.getRexpValue(2).getIntValue(0); - final int mx = r.getRexpValue(3).getIntValue(0); - final int whattype = r.getRexpValue(4).getIntValue(0); - for (int i = 0; i < r.getRexpValue(1).getStringValueCount(); i++) { - Path srcPattern = new Path(r.getRexpValue(1).getStringValue(i) - .getStrval()); - new DelayedExceptionThrowing() { - void process(Path p, FileSystem srcFs) throws IOException { - if (srcFs.getFileStatus(p).isDir()) { - throw new IOException("Source must be a file."); - } - // System.err.println("INPUT="+p); - InputStream ins = srcFs.open(p); - if (whattype == 1) { - ins = new java.util.zip.GZIPInputStream(ins); - } - printToStdout(ins, buff, mx); - } - } - .globAndProcess(srcPattern, srcPattern.getFileSystem(fu - .getConf())); - } - _toR.writeInt(-1); - _toR.flush(); + public byte[] rhls(String p, int a) throws Exception { + return rhls(new String[]{p}, a); + } + public byte[] rhls(String[] p, int a) throws Exception { + String[] result0 = fu.ls(p, a); + REXP b = RObjects.makeStringVector(result0); + return b.toByteArray(); } - private void printToStdout(InputStream in, int buffsize, int mx) - throws IOException { - try { - byte buf[] = new byte[buffsize]; - int bytesRead = in.read(buf); - int totalread = bytesRead; - while (bytesRead >= 0) { - // System.err.println("Wrote "+bytesRead); - _toR.writeInt(bytesRead); - 
-				_toR.write(buf, 0, bytesRead);
-				_toR.flush();
-				if (mx > -1 && totalread >= mx)
-					break;
-				bytesRead = in.read(buf);
-				totalread += bytesRead;
-			}
-		} finally {
-			in.close();
+
+	public void rhdel(String folder) throws Exception {
+		rhdel(new String[] {folder});
+	}
+	public void rhdel(String[] folder) throws Exception {
+		for (String s: folder){
+			fu.delete(s, true);
 		}
 	}
+	public void rhget(String src, String dest) throws Exception {
+		System.err.println("Copying " + src + " to " + dest);
+		fu.copyMain(src, dest);
+	}
+	public void rhput(String local,String dest2, boolean overwrite) throws Exception {
+		rhput(new String[] {local}, dest2, overwrite);
+	}
+	public void rhput(String[] locals,String dest2, boolean overwrite) throws Exception {
+		fu.copyFromLocalFile(locals, dest2, overwrite);
+	}

-	public void rhls(REXP r) throws Exception {
-		String[] result0 = fu.ls(r.getRexpValue(1) // This is a string vector
-				, r.getRexpValue(2).getIntValue(0));
-		REXP b = RObjects.makeStringVector(result0);
-		send_result(b);
+	public byte[] rhstatus(String s,boolean b) throws Exception {
+		REXP result = fu.getstatus(s,b);
+		return result.toByteArray();
 	}

-	public void rhdel(REXP r) throws Exception {
-		for (int i = 0; i < r.getRexpValue(1).getStringValueCount(); i++) {
-			String s = r.getRexpValue(1).getStringValue(i).getStrval();
-			fu.delete(s, true);
-		}
-		send_result("OK");
+	public byte[] rhjoin(String a, boolean b) throws Exception {
+		REXP result = fu.joinjob(a,b);
+		return result.toByteArray();
 	}

-	public void rhget(REXP r) throws Exception {
-		String src = r.getRexpValue(1).getStringValue(0).getStrval();
-		String dest = r.getRexpValue(2).getStringValue(0).getStrval();
-		System.err.println("Copying " + src + " to " + dest);
-		fu.copyMain(src, dest);
-		send_result("OK");
+	public void rhkill(String s) throws Exception {
+		fu.killjob(s);
 	}
-
-	public void rhput(REXP r) throws Exception {
-		String[] locals = new String[r.getRexpValue(1).getStringValueCount()];
-		for (int i = 0; i < locals.length; i++)
-			locals[i] = r.getRexpValue(1).getStringValue(i).getStrval();
-		String dest2 = r.getRexpValue(2).getStringValue(0).getStrval();
-		REXP.RBOOLEAN overwrite_ = r.getRexpValue(3).getBooleanValue(0);
-		boolean overwrite;
-		if (overwrite_ == REXP.RBOOLEAN.F)
-			overwrite = false;
-		else if (overwrite_ == REXP.RBOOLEAN.T)
-			overwrite = true;
-		else
-			overwrite = false;
-		fu.copyFromLocalFile(locals, dest2, overwrite);
-		send_result("OK");
+	public int rhex(String zonf) throws Exception {
+		int result = RHMR.fmain(new String[] {zonf});
+		return result;
+	}
+	public int rhex(String[] zonf) throws Exception {
+		int result = RHMR.fmain(zonf);
+		return result;
+	}
+
+	public void initializeCaches(int a, int b) throws Exception{
+		valueCache = CacheBuilder.newBuilder()
+			.maximumWeight(a) //max MB in bytes, set to 100MB
+			.weigher(new Weigher<ValuePair, RHBytesWritable>() {
+				public int weigh(ValuePair k, RHBytesWritable g) {
+					return k.getKey().getLength() + g.getLength();
+				}
+			})
+			.recordStats()
+			.build();
+
+		RemovalListener<String, MapFile.Reader> removalListener = new RemovalListener<String, MapFile.Reader>() {
+			public void onRemoval(RemovalNotification<String, MapFile.Reader> removal) throws RuntimeException{
+				try{
+					MapFile.Reader conn = removal.getValue();
+					LOG.info("Closing a mapfile, emptied from cache:"+conn);
+					conn.close();
+				}catch(IOException e){
+					throw new RuntimeException(e);
+				}
+			}
+		};
+		mapfileReaderCache = CacheBuilder.newBuilder()
+			.maximumSize(b)
+			.removalListener(removalListener)
+			.recordStats()
+			.build();
 	}

 	public void sequenceAsBinary(REXP r) throws Exception { // works
@@ -286,13 +205,11 @@ public void sequenceAsBinary(REXP r) throws Exception { // works
 	}

 	public void rhopensequencefile(REXP r) throws Exception {
-		// System.out.println("----Called-----");
 		String name = r.getRexpValue(1).getStringValue(0).getStrval();
 		Configuration cfg = new Configuration();
 		SequenceFile.Reader sqr = new SequenceFile.Reader(FileSystem.get(cfg),
 				new Path(name), cfg);
 		seqhash.put(name, sqr);
-		send_result("OK");
 	}

 	public void rhgetnextkv(REXP r) throws Exception {
@@ -321,7 +238,8 @@ public void rhgetnextkv(REXP r) throws Exception {
 		cdo.flush();
 	}

-	public void rhclosesequencefile(REXP r) throws Exception {
+	public void rhclosesequencefile(byte[] b) throws Exception {
+		REXP r = REXP.newBuilder().mergeFrom(b, 0,b.length).build();
 		String name = r.getRexpValue(1).getStringValue(0).getStrval();
 		SequenceFile.Reader sqr = seqhash.get(name);
 		if (sqr != null) {
@@ -331,128 +249,6 @@ public void rhclosesequencefile(REXP r) throws Exception {
 			} catch (Exception e) {
 			}
 		}
-		send_result("OK");
-	}
-
-	public void rhstatus(REXP r) throws Exception {
-		REXP jid = r.getRexpValue(1);
-		REXP result = fu.getstatus(jid);
-		send_result(result);
-	}
-
-	public void rhjoin(REXP r) throws Exception {
-		REXP result = fu.joinjob(r.getRexpValue(1));
-		send_result(result);
-	}
-
-	public void rhkill(REXP r) throws Exception {
-		REXP jid = r.getRexpValue(1);
-		fu.killjob(jid);
-		send_result("OK");
-	}
-	public void send_alive() throws Exception {
-		try {
-
-			_toR.writeByte(1);
-			_toR.flush();
-		} catch (IOException e) {
-			System.err.println("RHIPE: Could not tell R it is alive");
-			System.exit(1);
-		}
-	}
-
-	public void send_error_message(Throwable e) {
-		ByteArrayOutputStream bs = new ByteArrayOutputStream();
-		e.printStackTrace(new PrintStream(bs));
-		String s = bs.toString();
-		send_error_message(s);
-	}
-
-	public void send_error_message(String s) {
-		REXP clattr = RObjects.makeStringVector("worker_error");
-		REXP r = RObjects
-				.addAttr(RObjects.buildStringVector(new String[] { s }),
-						"class", clattr).build();
-		System.err.println(s);
-		sendMessage(r, true);
-	}
-
-	public void sendMessage(REXP r) {
-		sendMessage(r, false);
-	}
-
-	public void sendMessage(REXP r, boolean bb) {
-		try {
-			byte[] b = r.toByteArray();
-			DataOutputStream dos = _toR;
-			// if(bb) dos = _err;
-			if (bb)
-				dos.writeInt(-b.length);
-			else
-				dos.writeInt(b.length);
-			dos.write(b, 0, b.length);
-			dos.flush();
-		} catch (IOException e) {
-			System.err
-					.println("RHIPE: Could not send data back to R master, sending to standard error");
-			System.err.println(r);
-			System.exit(1);
-		}
-	}
-
-	public void send_result(String s) {
-		REXP r = RObjects.makeStringVector(s);
-		send_result(r);
-	}
-
-	public void send_result(REXP r) {
-		// we create a list of class "worker_result"
-		// it is a list of element given by s
-		// all results are class worker_result and are a list
-		REXP.Builder thevals = REXP.newBuilder();
-		thevals.setRclass(REXP.RClass.LIST);
-		thevals.addRexpValue(r);
-		RObjects.addAttr(thevals, "class", RObjects
-				.makeStringVector("worker_result"));
-		sendMessage(thevals.build());
-	}
-
-	public void initializeCaches(REXP rexp) throws Exception{
-		REXP rexp0 = rexp.getRexpValue(1);
-		// RemovalListener rl = new RemovalListener() {
-		// public void onRemoval(RemovalNotification removal) throws RuntimeException{
-		// LOG.info("Extterminate key, emptied from cache:"+removal.getKey());
-		// }
-		// };
-		valueCache = CacheBuilder.newBuilder()
-			.maximumWeight(rexp0.getRexpValue(0).getIntValue(0)) //max MB in bytes, set to 100MB
-			.weigher(new Weigher<ValuePair, RHBytesWritable>() {
-				public int weigh(ValuePair k, RHBytesWritable g) {
-					return k.getKey().getLength() + g.getLength();
-				}
-			})
-			// .removalListener(rl)
-			.recordStats()
-			.build();
-
-		RemovalListener<String, MapFile.Reader> removalListener = new RemovalListener<String, MapFile.Reader>() {
-			public void onRemoval(RemovalNotification<String, MapFile.Reader> removal) throws RuntimeException{
-				try{
-					MapFile.Reader conn = removal.getValue();
-					LOG.info("Closing a mapfile, emptied from cache:"+conn);
-					conn.close();
-				}catch(IOException e){
-					throw new RuntimeException(e);
-				}
-			}
-		};
-		mapfileReaderCache = CacheBuilder.newBuilder()
-			.maximumSize(rexp0.getRexpValue(0).getIntValue(1))
-			.removalListener(removalListener)
-			.recordStats()
-			.build();
-
-		send_result("OK");
 	}

 	public void clearEntiresFor(String forkey) throws Exception{
@@ -463,14 +259,12 @@ public void clearEntiresFor(String forkey) throws Exception{
 		cachedHandle.clear();
 		cachedValues.clear();
 	}
-	public void initializeMapFile(REXP rexp) throws Exception{
-		REXP rexp0 = rexp.getRexpValue(1);
-		REXP paths = rexp0.getRexpValue(0); // paths to read from
-		String akey = rexp0.getRexpValue(1).getStringValue(0).getStrval();
-		String[] pathsForMap = new String[ paths.getStringValueCount() ];
-		for (int i = 0; i < pathsForMap.length; i++) {
-			pathsForMap[i] = paths.getStringValue(i).getStrval();
-		}
+
+
+	public void initializeMapFile(String pathsForMap, String akey) throws Exception{
+		initializeMapFile(new String[] {pathsForMap}, akey);
+	}
+	public void initializeMapFile(String[] pathsForMap, String akey) throws Exception{
 		if(mapfilehash.get(akey)!=null){
 			LOG.info("Clearing Caches for "+akey);
 			clearEntiresFor(akey);
@@ -479,7 +273,6 @@ public void initializeMapFile(REXP rexp) throws Exception{
 			mapToValueCacheHandles.put(akey, new ArrayList(100));
 		}
 		mapfilehash.put(akey, pathsForMap);
-		send_result("OK");
 	}

 	public void rhgetkeys2(REXP rexp) throws Exception,IOException
@@ -516,9 +309,7 @@ public void rhgetkeys2(REXP rexp) throws Exception,IOException
 		out.flush();
 	}

-	public void cacheStatistics(REXP rexp){
-		REXP rexp0 = rexp.getRexpValue(1);
-		int which = rexp0.getRexpValue(0).getIntValue(0);
+	public byte[] cacheStatistics(int which){
 		CacheStats c = null;
 		if(which == 0)
 			c = mapfileReaderCache.stats();
@@ -531,71 +322,10 @@ else if(which == 1)
 				(double)c.loadSuccessCount(),(double)c.missCount(),(double)c.missRate(),
 				(double)c.requestCount(),((double)c.totalLoadTime())*1e-6
 			}).build();
-		send_result(r);
-	}
-
-	public void rhgetkeys(REXP rexp00) throws Exception {
-		REXP rexp0 = rexp00.getRexpValue(1);
-		REXP keys = rexp0.getRexpValue(0); // keys
-		REXP paths = rexp0.getRexpValue(1); // paths to read from
-		String tempdest = rexp0.getRexpValue(2).getStringValue(0).getStrval(); // tempdest
-		REXP.RBOOLEAN b = rexp0.getRexpValue(3).getBooleanValue(0); // as
-
-		// sequence
-		// or binary
-		Configuration c = fu.getConf();
-		DataOutputStream out = _toR;
-		String akey = rexp0.getRexpValue(4).getStringValue(0).getStrval(); // tempdest
-		String[] pnames = new String[paths.getStringValueCount()];
-		for (int i = 0; i < pnames.length; i++) {
-			pnames[i] = paths.getStringValue(i).getStrval();
-		}
-		MapFile.Reader[] mr = mrhash.get(akey);
-		if(mr == null){
-			// LOG.info("Did not find in hashtable");
-			mr = RHMapFileOutputFormat.getReaders(pnames, c);
-			mrhash.put(akey, mr);
-		}
-
-		int numkeys = keys.getRexpValueCount();
-		RHBytesWritable k = new RHBytesWritable();
-		RHBytesWritable v = new RHBytesWritable();
-		boolean closeOut = false;
-		if (b == REXP.RBOOLEAN.F) { // binary style
-			if (out == null) {
-				closeOut = true;
-				out = new DataOutputStream(new FileOutputStream(tempdest));
-			}
-			for (int i = 0; i < numkeys; i++) {
-				k.set(keys.getRexpValue(i).getRawValue().toByteArray());
-				RHMapFileOutputFormat.getEntry(mr, k, v);
-				k.writeAsInt(out);
-				v.writeAsInt(out);
-			}
-			if (closeOut)
-				out.close();
-			else {
-				out.writeInt(0);
-				out.flush();
-			}
-		} else {// these will be written out as a sequence file
-			RHWriter rw = new RHWriter(tempdest, fu.getConf());
-			SequenceFile.Writer w = rw.getSQW();
-			for (int i = 0; i < numkeys; i++) {
-				k.set(keys.getRexpValue(i).getRawValue().toByteArray());
-				RHMapFileOutputFormat.getEntry(mr, k, v);
-				w.append(k, v);
-			}
-			rw.close();
-		}
+		return r.toByteArray();
 	}

-	public void rhex(REXP rexp0) throws Exception {
-		String[] zonf = new String[] { rexp0.getRexpValue(1).getStringValue(0)
-				.getStrval() };
-		int result = RHMR.fmain(zonf);
-		send_result("" + result);
-	}

 	public void binaryAsSequence(REXP r) throws Exception { // works
 		Configuration cfg = new Configuration();
@@ -622,8 +352,22 @@ public void binaryAsSequence(REXP r) throws Exception { // works
 			w.doWriteFile(in, count);
 			w.close();
 		}
-		send_result("OK");
 	}
+	public String[] readTextFile(String inp) throws Exception{
+		FSDataInputStream in = _filesystem.open(new Path(inp));
+		BufferedReader inb = new BufferedReader(new InputStreamReader(in));
+		ArrayList<String> arl = new ArrayList<String>();
+
+		while(true){
+			String s = inb.readLine();
+			if(s == null) break;
+			arl.add(s);
+		}
+		String[] s = new String[arl.size()];
+		s = arl.toArray(s);
+		return s;
+	}
+
 	public void binaryAsSequence2(REXP r) throws Exception { // works
 		Configuration cfg = new Configuration();
 		String ofolder = r.getRexpValue(1).getStringValue(0).getStrval();
@@ -647,130 +391,17 @@ public void binaryAsSequence2(REXP r) throws Exception { // works
 		try{
 			w.close();
 		}catch(Exception e){}
-		send_result("OK");
 	}
-	public void shutdownJavaServer() throws Exception {
-		//send_alive(); Actually, I will let R ask for send_alive().
-		System.exit(0);
-	}
-	public void startme() {
-		while (true) {
-			try {
-				int size = _fromR.readInt();
-				if (size > bbuf.length) {
-					bbuf = new byte[size];
-				}
-				if (size < 0)
-					break;
-				else if (size == 0)
-					send_alive();
-				else {
-					_fromR.readFully(bbuf, 0, size);
-					REXP r = REXP.newBuilder().mergeFrom(bbuf, 0, size).build();
-					if (r.getRclass() == REXP.RClass.NULLTYPE)
-						send_alive();
-					// the first element of list is function, the rest are
-					// arguments
-					String tag = r.getRexpValue(0).getStringValue(0)
-							.getStrval();
-					// if (tag.equals("rhmropts"))
-					// rhmropts(r);
-					// else if (tag.equals("rhls"))
-					// rhls(r);
-					// else if (tag.equals("rhget"))
-					// rhget(r);
-					// else if (tag.equals("rhput"))
-					// rhput(r);
-					// else if (tag.equals("rhdel"))
-					// rhdel(r);
-					// else if (tag.equals("rhgetkeys"))
-					// rhgetkeys(r);
-					// else if (tag.equals("binaryAsSequence"))
-					// binaryAsSequence(r);
-					// else if (tag.equals("sequenceAsBinary"))
-					// sequenceAsBinary(r);
-					// else if (tag.equals("rhstatus"))
-					// rhstatus(r);
-					// else if (tag.equals("rhjoin"))
-					// rhjoin(r);
-					// else if (tag.equals("rhkill"))
-					// rhkill(r);
-					// else if (tag.equals("rhex"))
-					// rhex(r);
-					// else if (tag.equals("rhcat"))
-					// rhcat(r);
-					// else if (tag.equals("rhopensequencefile"))
-					// rhopensequencefile(r);
-					// else if (tag.equals("rhgetnextkv"))
-					// rhgetnextkv(r);
-					// else if (tag.equals("initializeCaches"))
-					// initializeCaches(r);
-					// else if (tag.equals("initializeMapFile"))
-					// initializeMapFile(r);
-					// else if (tag.equals("rhgetkeys2"))
-					// rhgetkeys2(r);
-					// else if (tag.equals("rhclosesequencefile"))
-					// rhclosesequencefile(r);
-					if (tag.equals("shutdownJavaServer"))
-						shutdownJavaServer();
-					else{
-						Method method = Class.forName("org.godhuli.rhipe.PersonalServer").getMethod(tag, new Class[]{REXP.class});
-						method.invoke(this, r);
-					}
-
-					// else if(tag.equals("rhcp")) rhcp(r);
-					// else if(tag.equals("rhmv")) rhmv(r);
-					// else if(tag.equals("rhmerge")) rhmerge(r);
-
-					// else
-					// send_error_message("Could not find method with name: "
-					// + tag + "\n");
-				}
-			} catch (EOFException e) {
-				System.exit(0);
-			} catch (SecurityException e) {
-				send_error_message(e.getCause());
-			} catch (RuntimeException e) {
-				send_error_message(e);
-			} catch (IOException e) {
-				send_error_message(e);
-			} catch(NoSuchMethodException e){
-				send_error_message(e.getCause());
-			} catch(IllegalAccessException e){
-				send_error_message(e.getCause());
-			} catch(InvocationTargetException e){
-				send_error_message(e.getCause());
-			} catch (Exception e) {
-				send_error_message(e);
-			}
-		}
-	}
-
-	public int run(String[] args) throws Exception {
-		// Configuration processed by ToolRunner
-		Configuration conf = getConf();
-		_configuration = getConf();
+	public int run(int buglevel) throws Exception {
+		_configuration = new Configuration();
 		_filesystem = FileSystem.get(_configuration);
 		_hp = RHMapFileOutputFormat.getHP();
-		int buglevel = Integer.parseInt(args[3]);
-		setUserInfo(args[0], args[1], args[2],
-				buglevel);
-		startme();
-
-		// while (true) {
-		// try {
-		// } catch (Exception e) {
-		// System.err.println(Thread.currentThread().getStackTrace());
-		// }
-		// }
+		setUserInfo(buglevel);
 		return(0);
 	}

-	public static void main(String[] args) throws Exception {
-		int res = ToolRunner.run(new Configuration(), new PersonalServer(), args);
-	}

 	public abstract class DelayedExceptionThrowing {
 		abstract void process(Path p, FileSystem srcFs) throws IOException;
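Note on the new calling convention: with the socket protocol and the startme() dispatcher gone, R is expected to drive PersonalServer directly through rJava. The following is only an illustrative sketch of what calls against the signatures introduced above could look like; the object name, the classpath setup and the debug level are assumptions, not part of this patch:

    library(rJava)
    .jinit()  # assumes the RHIPE jar and the Hadoop jars are already on the class path

    ## construct the server and let run(int) set up Configuration/FileSystem/FileUtils
    server <- .jnew("org/godhuli/rhipe/PersonalServer")
    .jcall(server, "I", "run", 0L)   # debug level 0

    ## methods returning byte[] hand back a serialized REXP as an R raw vector
    bytes <- .jcall(server, "[B", "rhls", .jarray("/tmp"), 1L)

    ## void methods map one-to-one onto the new plain-typed overloads
    .jcall(server, "V", "rhget", "/tmp/src", "/home/user/dest")
    .jcall(server, "V", "rhput", .jarray("/home/user/file"), "/tmp/dest", TRUE)
    .jcall(server, "V", "rhdel", .jarray("/tmp/old"))

Because every entry point now takes plain Java types (String, String[], int, boolean) and returns void or byte[], the R side no longer wraps each request in a tagged REXP list; it only deserializes the returned raw vector (e.g. with rhuz) where a result is expected.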