Skip to content
Permalink
Browse files

Reorganized parallel processing.

  • Loading branch information
ChHaeni committed Jun 26, 2018
1 parent 2dfd4b0 commit b5acf2b59eca0b028ac0a561e05fb5aec14102ce
Showing with 109 additions and 91 deletions.
  1. +2 −2 DESCRIPTION
  2. +2 −2 NAMESPACE
  3. +67 −56 R/ALFAM2mod.R
  4. +38 −31 man/A2Mod.Rd
@@ -1,8 +1,8 @@
Package: ALFAM2
Type: Package
Title: Model and Data on Ammonia Emission from Field-Applied Manure
Version: 0.1.0
Date: 2018-06-11
Version: 0.1.1
Date: 2018-06-26
Author: Sasha D. Hafner, Christoph Haeni, ...
Maintainer: Sasha D. Hafner <sasha.hafner@eng.au.dk>
Description: An implementation of the ALFAM2 model for ammonia emission from field-applied manure. Model can be used with default parameters or applied to specific cases (locations, application methods, etc.). The completed ALFAM2 (alfam.dk) database, currently with ammonia emission measurements from 1800 field plots from 22 institutes in 12 countries, is also included.
@@ -1,3 +1,3 @@
import(doParallel)
import(foreach)
import(parallel)
importFrom("stats", "na.omit", "setNames")
export(ALFAM2mod)
@@ -39,14 +39,13 @@ ALFAM2mod <- function(
check.NA = TRUE,
pass.col = NULL,
add.incorp.rows = FALSE,
parallel = FALSE
parallel = FALSE,
n.cpus = 1
) {

#### NTS: not package-ready
if(parallel) {
library(foreach)
library(parallel) # NTS: shouldn't need, since in R-core??? -> hac: but you still have to load it, unless we det depend on those libraries...
library(doParallel)
requireNamespace("parallel")
}

#print(pars)
@@ -55,7 +54,6 @@ ALFAM2mod <- function(
# Add checks for all arguments
checkArgClassValue(dat, expected.class = 'data.frame')
checkArgClassValue(pars, expected.class = c('numeric', 'list'))

checkArgClassValue(time.incorp, expected.class = c('character', 'numeric', 'integer', 'NULL'))

# If pars was given as list, change to vector
@@ -198,72 +196,85 @@ ALFAM2mod <- function(
}
}

# e is output data frame
e <- NULL
# ToDo:
# - clean above for loop and f0 r1 etc parameters

# Not parallel
if(!parallel) {
for(i in sort(unique(dat$group))) {
dd <- dat[dat$group == i, ]
ff0 <- f0[dat$group == i]
rr1 <- r1[dat$group == i]
rr2 <- r2[dat$group == i]
rr3 <- r3[dat$group == i]
ff5 <- f5[dat$group == i]

# keep incorp rows?
if(add.incorp.rows){
dat[,"added.row"] <- rep(FALSE, nrow(dat))
}
s.dat <- split(cbind(dat,"__f0"=f0,"__r1"=r1,"__r2"=r2,"__r3"=r3,"__f5"=f5),dat$group)

# Not parallel
if(parallel) {
# starting cluster and trigger stop on.exit
cl <- parallel::makeCluster(n.cpus,type="SOCK")
on.exit(parallel::stopCluster(cl))

# sorting input for efficiency
s.nr <- sapply(s.dat,nrow)
do.nr <- order(s.nr,decreasing=TRUE)
e.list <- vector("list",length(s.dat))

# do parallel
# parallel::clusterExport(cl,c("calcEmis","time.name","app.name"))
e.list[do.nr] <- parallel::clusterApply(cl,s.dat[do.nr],function(sub.dat){
data.frame(group = sub.dat[!sub.dat$added.row,"group"], calcEmis(
ct = sub.dat[, time.name]
# Calculate a0 and u0 (f5 transfers done in calcEmis())
,a0 = sub.dat[1,"__f0"]*sub.dat[1, app.name]
,u0 = (1 - sub.dat[1,"__f0"])*sub.dat[1, app.name]
,r1 = sub.dat[,"__r1"]
,r2 = sub.dat[,"__r2"]
,r3 = sub.dat[,"__r3"]
,f5 = sub.dat[,"__f5"]
,ievent = sub.dat$ievent
,drop.rows = sub.dat$added.row)
, row.names = NULL, check.names = FALSE)
})

# stop cluster and empty on.exit
parallel::stopCluster(cl)
on.exit()
} else {
e.list <- vector("list",length(s.dat))
for(i in seq_along(s.dat)) {
# get subset
sub.dat <- s.dat[[i]]
# Check for duplicate ct
if(any(duplicated(dd[, time.name]))) {
if(any(duplicated(sub.dat[, time.name]))) {
stop('Look for 998123b in pmod.R. Duplicated ct values.')
}
# calculate emission
ce <- calcEmis(
ct = sub.dat[, time.name]
# Calculate a0 and u0 (f5 transfers done in calcEmis())
,a0 = sub.dat[1,"__f0"]*sub.dat[1, app.name]
,u0 = (1 - sub.dat[1,"__f0"])*sub.dat[1, app.name]
,r1 = sub.dat[,"__r1"]
,r2 = sub.dat[,"__r2"]
,r3 = sub.dat[,"__r3"]
,f5 = sub.dat[,"__f5"], ievent = sub.dat$ievent, drop.rows = sub.dat$added.row)
# add group
e.list[[i]] <- data.frame(group = sub.dat[!sub.dat$added.row,"group"], ce, row.names = NULL, check.names = FALSE)
}
}

# Calculate a0 and u0 (f5 transfers done in calcEmis())
u0 <- (1 - ff0[1])*dd[, app.name][1]
a0 <- ff0[1]*dd[, app.name][1]
ct <- dd[, time.name]
drop.rows <- dd$added.row
if(add.incorp.rows) drop.rows <- rep(FALSE, length(drop.rows))
ce <- calcEmis(ct = ct, a0 = a0, u0 = u0, r1 = rr1, r2 = rr2, r3 = rr3, f5 = ff5, ievent = dd$ievent, drop.rows = drop.rows)
e <- rbind(e, cbind(group = i, ce))
}
} else {
e <- foreach(i = sort(unique(dat$group)), .packages = 'minpack.lm', .export = 'calcEmis', .combine = rbind, .init = NULL) %dopar% {
dd <- dat[dat$group == i, ]
ff0 <- f0[dat$group == i]
rr1 <- r1[dat$group == i]
rr2 <- r2[dat$group == i]
rr3 <- r3[dat$group == i]
ff5 <- f5[dat$group == i]

# Check for duplicate ct
if(any(duplicated(dd[, time.name]))) {
stop('Look for 998123b in pmod.R. Duplicated ct values.')
}

# Calculate a0 and u0 (f5 transfers done in calcEmis())
u0 <- (1 - ff0[1])*dd[, app.name][1]
a0 <- ff0[1]*dd[, app.name][1]
ct <- dd[, time.name]
drop.rows <- dd$added.row
if(add.incorp.rows) drop.rows <- rep(FALSE, length(drop.rows))
ce <- calcEmis(ct = ct, a0 = a0, u0 = u0, r1 = rr1, r2 = rr2, r3 = rr3, f5 = ff5, ievent = dd$ievent, drop.rows = drop.rows)
# rbind e.list to data.frame
e <- do.call("rbind",e.list)

cbind(group = i, ce)
}
}

# rename 'group' column
if(!is.null(group)){
names(e)[1] <- group
}

# Sort to match original order
# NTS: check that this works
drop.rows <- dat$added.row
if(add.incorp.rows) drop.rows <- rep(FALSE, length(drop.rows))
e <- e[order(dat$orig.order[!drop.rows]), ]
e <- e[order(dat$orig.order[!dat$added.row]), ]

# Add pass-through column if requested
if(!is.null(pass.col)) {
e <- data.frame(setNames(dat[, paste0("pass_me.through_",pass.col)],pass.col), e)
e <- data.frame(setNames(dat[!dat$added.row, paste0("pass_me.through_",pass.col)],pass.col), e)
}

return(e)
@@ -12,35 +12,39 @@ The model is described in Hafner et al. (in preparation).

\usage{
ALFAM2mod(dat,
pars = c(int0 = 0.8,
int1 = -1,
int2 = -1,
int3 = -3,
app.methodos0 = -1,
app.rate0 = -0.02,
man.dm0 = 0.3,
man.sourcepig0 = -0.5,
incorpdeep5 = -4,
incorpshallow5 = -0.3,
app.methodbc1 = 0.6,
man.dm1 = -0.07,
air.temp1 = 0.04,
wind1 = 0.05,
air.temp3 = 0.005,
incorpdeep3 = -1,
app.methodos3 = -0.1)
app.name = "TAN.app", time.name = "ct",
time.incorp = NULL, group = NULL, center = TRUE,
cmns = c(app.rate = 40,
man.dm = 6,
man.tan = 1.2,
man.ph = 7.5,
air.temp = 13,
wind.1m = 2.7,
lwind = 0.43,
crop.z = 10),
check.NA = TRUE, pass.col = NULL,
add.incorp.rows = FALSE, parallel = FALSE)
pars = c(
int0 = -0.91400,
int1 = -1.16256,
int2 = -1.02444,
int3 = -2.92947,
app.methodos0 = -0.98384,
app.rate0 = -0.01602,
man.dm0 = 0.40164,
incorpdeep5 = -3.08108,
incorpshallow5 = -0.91376,
app.methodbc1 = 0.62870,
man.dm1 = -0.07974,
air.temp1 = 0.04909,
wind.1m1 = 0.04876,
air.temp3 = 0.01344,
incorpdeep3 = -0.74621,
app.methodos3 = -0.20088,
rain.rate2 = 0.38434
),
app.name = 'TAN.app', time.name = 'ct',
time.incorp = NULL,group = NULL, center = TRUE,
cmns = c(app.rate = 40,
man.dm = 6,
man.tan = 1.2,
man.ph = 7.5,
air.temp = 13,
wind.1m = 2.7,
lwind = 0.43,
crop.z = 10),
check.NA = TRUE, pass.col = NULL,
add.incorp.rows = FALSE,
parallel = FALSE,n.cpus = 1
)
}
%- maybe also 'usage' for other objects documented here.
\arguments{
@@ -81,6 +85,9 @@ ALFAM2mod(dat,
}
\item{parallel}{
should parallel processing be used? Helpful for very large \code{dat}.
}
\item{n.cpus}{
numeric. Defines the number of cores if \code{parallel} is \code{TRUE}.
}
}
\details{
@@ -163,8 +170,8 @@ ALFAM2mod(dat4, app.name = 'TAN.app', time.name = 'ctime', time.incorp = 4, add.

# Example 5
# Function accepts multiple groups
dat5 <- data.frame(field.plot = 1:5, ctime = 48, TAN.app = 100, man.dm = 5, air.temp = 15, app.methodbc = TRUE)
pred5 <- ALFAM2mod(dat6, app.name = 'TAN.app', time.name = 'ctime', group = 'plot', time.incorp = 't.incorp')
dat5 <- data.frame(field.plot = 1:5, ctime = 48, TAN.app = 100, man.dm = 5, air.temp = 15, app.methodbc = TRUE, t.incorp = 4)
pred5 <- ALFAM2mod(dat5, app.name = 'TAN.app', time.name = 'ctime', group = 'field.plot', time.incorp = 't.incorp')
pred5


4 comments on commit b5acf2b

@ChHaeni

This comment has been minimized.

Copy link
Collaborator Author

@ChHaeni ChHaeni replied Jun 26, 2018

#2 should now be solved.

@sashahafner

This comment has been minimized.

Copy link
Owner

@sashahafner sashahafner replied Sep 18, 2018

If we opt to use parallel::function_name for all the calls to parallel functions, then nothing more is needed, correct? Well, we need a "suggests" entry in DESCRIPTION, but no requireNamespace() and nothing in NAMESPACE. Right?

@sashahafner

This comment has been minimized.

Copy link
Owner

@sashahafner sashahafner replied Sep 18, 2018

And thanks for all the parallel code by the way :)

@ChHaeni

This comment has been minimized.

Copy link
Collaborator Author

@ChHaeni ChHaeni replied Sep 18, 2018

Yes, I think you're right, requireNamespace is not needed. I'm not sure, but I think that good practice would be to either put it under 'Imports' or put it under 'Suggests' and have an error throwing in the code, when needed but the package doesn't exist. On the other hand, parallel is in the standard library, so this is not necessary anyway...

Please sign in to comment.
You can’t perform that action at this time.