Skip to content

Commit

Permalink
Merge pull request #21 from clarkfitzg/data-example
Browse files Browse the repository at this point in the history
Data example
  • Loading branch information
fun-indra committed Oct 7, 2016
2 parents 5f36c0a + 3fe2b2d commit f6d7056
Show file tree
Hide file tree
Showing 5 changed files with 174 additions and 0 deletions.
34 changes: 34 additions & 0 deletions examples/pems_data/pems_base.R
@@ -0,0 +1,34 @@
# Sequential reference implementation of the task, using only base R

# Provides `read1`, `pems_files`, `breaks` and `break_names`
source("pems_common.R")


# Read the files one after another, then stack them into one data frame
slist <- lapply(X = pems_files, FUN = read1)
station <- do.call(what = rbind, args = slist)

# 1.6 GB
#print(object.size(station), units = "GB")

# 40 million rows x 8 columns
#dim(station)


s1 <- station$speed1
s2 <- station$speed2

# Define the subset: both lane speeds are recorded and lie within [45, 90].
# Putting the NA checks in the mask guarantees it holds only TRUE / FALSE.
both_present <- !is.na(s1) & !is.na(s2)
in50_90 <- both_present &
  s1 >= 45 & s1 <= 90 &
  s2 >= 45 & s2 <= 90

# Lane 1 speed minus lane 2 speed for the selected observations
delta <- s1[in50_90] - s2[in50_90]

# About 2 seconds
binned <- cut(delta, breaks = breaks, labels = break_names)

# `binned` is a factor, so plot() draws the count per bin
pdf("base_plot.pdf")
plot(binned)
dev.off()
32 changes: 32 additions & 0 deletions examples/pems_data/pems_common.R
@@ -0,0 +1,32 @@
# California PEMS data can be downloaded as the txt.gz
# files here:
# http://www.stat.ucdavis.edu/~clarkf/
#
# The goal here is to plot
# the speed differences between the first and second lanes.

# My machine has 4 physical cores and 16 GB memory. The full objects will be 1.3
# GB each, which is a little too big for comfort. Better just read the first two
# lanes in to cut this size in half.


# Full paths of everything in the PEMS data directory
# (the author's copy holds 4 gzipped files)
pems_files <- list.files(path = "~/data/pems/", full.names = TRUE)

#' Read a single PEMS station file
#'
#' Reads a comma-separated file without a header row. The first eight
#' columns are kept with the names and types listed below; the remaining
#' 18 columns are skipped by giving them the column class "NULL".
#'
#' @param file Path of the file to read.
#' @param nrows Maximum number of rows to read; the default of -1 reads the
#'   whole file. Lower it if you run short on memory.
#' @return A data frame with columns timestamp, station, flow1, occupancy1,
#'   speed1, flow2, occupancy2 and speed2.
read1 <- function(file, nrows = -1) {
  # Name -> class of the columns that are kept, in file order
  kept <- c(
    timestamp = "character",
    station = "factor",
    flow1 = "integer",
    occupancy1 = "numeric",
    speed1 = "integer",
    flow2 = "integer",
    occupancy2 = "numeric",
    speed2 = "integer"
  )
  # Placeholder used both as name and as class for the skipped columns
  skipped <- rep("NULL", 18)

  read.table(
    file,
    header = FALSE,
    sep = ",",
    nrows = nrows,
    col.names = c(names(kept), skipped),
    # colClasses must stay unnamed so that it is matched by position
    colClasses = c(unname(kept), skipped)
  )
}

# Bin edges for the speed differences: 3-wide bins whose edges run from
# -16.5 to 16.5, plus one open-ended bin on each side
breaks <- c(-Inf, seq(from = -16.5, to = 16.5, by = 3), Inf)

# Labels: each finite bin is named by its midpoint (upper edge minus 1.5);
# the two open-ended bins get "<-17" and ">17"
break_names <- c(
  "<-17",
  as.character(breaks[seq(from = 3, to = length(breaks) - 1)] - 1.5),
  ">17"
)
55 changes: 55 additions & 0 deletions examples/pems_data/pems_ddR.R
@@ -0,0 +1,55 @@
# ddR version of the task.
# See pems_common.R for a description of the data and task
#
# Loads `read1`, `pems_files`, `breaks`, and `break_names`
source("pems_common.R")

library(ddR)

# Can try various options with ddR: useBackend(parallel, type = 'PSOCK')


# Distributed station dataframe: `read1` is applied to every entry of
# `pems_files` and the results are combined with rbind into a dframe
station <- dmapply(read1, pems_files, output.type = "dframe", nparts = c(4, 1),
combine = "rbind")


# Pull out columns 5 and 8 for every row; going by the col.names given in
# `read1` these are speed1 and speed2
allrows <- seq.int(nrow(station))
speed1 <- station[allrows, 5]
speed2 <- station[allrows, 8]

# This operation is not happening in ddR
# Subset mask: both speeds are non-missing and lie within [45, 90]
# (note the lower bound is 45 despite the `in50_90` name)
in50_90 <- 45 <= speed1 & speed1 <= 90 &
45 <= speed2 & speed2 <= 90 &
!is.na(speed1) & !is.na(speed2)

# This has been converted to data.frame. So convert it back.
# NOTE(review): presumably the indexing result `s` is an ordinary local
# data.frame, which is why the columns are re-wrapped with as.darray()
# below -- confirm against ddR's `[` method for dframe.
s <- station[which(in50_90), seq.int(ncol(station))]

# NOTE(review): matrix(x) without dimensions gives a one-column (n x 1)
# matrix, so these look like column vectors rather than the "1 x n row
# vectors" the original comment described; the partition size c(50000, 1)
# is consistent with that.
s1 <- as.darray(matrix(s$speed1), psize = c(50000, 1))
s2 <- as.darray(matrix(s$speed2), psize = c(50000, 1))


# Subtraction for two ParallelObj operands, implemented by mapping `-` over
# both with dmapply and combining the results with cbind into a darray.
# A more robust version should find its way into ddR.
setMethod(
  f = "-",
  signature = signature(e1 = "ParallelObj", e2 = "ParallelObj"),
  definition = function(e1, e2) {
    dmapply(`-`, e1, e2, output.type = "darray", combine = "cbind")
  }
)

# Speed difference between the first and second lanes
delta <- s1 - s2


# Parallelized binning
#
# Applies cut() + table() through dlapply to every piece of `x`, collects
# the resulting tables and adds them up.
#
# x       distributed object holding the values to bin
# breaks  bin edges, passed on to cut()
# labels  optional bin labels, passed on to cut()
#
# Returns the element-wise sum of the collected tables.
cut.DObject <- function(x, breaks, labels = NULL) {
  # cut() is kept inline inside table() so the result carries no
  # dimension name
  count_bins <- function(piece) table(cut(piece, breaks = breaks, labels = labels))
  partial_counts <- dlapply(x, count_bins)
  Reduce(`+`, collect(partial_counts))
}

# Count of speed differences per bin, computed through ddR
binned <- cut.DObject(delta, breaks = breaks, labels = break_names)


# Write the plot of the bin counts to a PDF
pdf(file = "ddR_plot.pdf")
plot(binned)
dev.off()
43 changes: 43 additions & 0 deletions examples/pems_data/pems_parallel.R
@@ -0,0 +1,43 @@
# Use R's built in parallel library

# Loads `read1`, `pems_files`, `breaks` and `break_names`
source("pems_common.R")

# Number of physical cores. detectCores() returns NA when it cannot
# determine the answer, which mclapply() would reject, so fall back to a
# single core in that case.
ncores <- parallel::detectCores(logical = FALSE)
if (is.na(ncores)) {
  ncores <- 1L
}

# Read files in parallel, one forked worker per file (up to `ncores`)
slist <- parallel::mclapply(pems_files, read1, mc.cores = ncores)

station <- do.call(rbind, slist)

s1 <- station$speed1
s2 <- station$speed2

# Define the subset: both lane speeds are recorded and lie within [45, 90].
# The is.na() terms guarantee the mask holds only TRUE / FALSE.
in50_90 <- 45 <= s1 & s1 <= 90 &
  45 <= s2 & s2 <= 90 &
  !is.na(s1) & !is.na(s2)

# Lane 1 speed minus lane 2 speed for the selected observations
delta <- s1[in50_90] - s2[in50_90]

#' Parallelized version of cut
#'
#' Splits `x` round-robin into `mc.cores` groups, bins and tabulates each
#' group in a forked worker, and adds the per-group tables together.
#'
#' @param x Numeric vector to bin.
#' @param breaks Bin edges, passed on to `cut()`.
#' @param labels Optional bin labels, passed on to `cut()`.
#' @param mc.cores Number of groups / worker processes. Defaults to the
#'   global `ncores`.
#' @return A one-dimensional table with the count of `x` in each bin.
pcut <- function(x, breaks, labels = NULL, mc.cores = ncores) {
  # cut() is kept inline inside table() so the result carries no
  # dimension name
  localcut <- function(y) {
    table(cut(y, breaks = breaks, labels = labels))
  }
  n <- length(x)
  # Nothing to distribute: return the all-zero table directly instead of
  # the NULL that Reduce() yields for an empty list
  if (n == 0L) {
    return(localcut(x))
  }
  # For `fork` systems it makes sense to have data structures like the
  # one created here: the base objects augmented with a split for
  # parallelization. The split follows `mc.cores` so that the argument,
  # not the global `ncores`, decides how the work is divided.
  sp <- rep_len(seq_len(mc.cores), length.out = n)
  grouped <- split(x, sp)
  tabs <- parallel::mclapply(grouped, localcut, mc.cores = mc.cores)
  Reduce(`+`, tabs)
}

# About 1 second
binned <- pcut(delta, breaks = breaks, labels = break_names)

# Write the plot of the bin counts to a PDF
pdf(file = "parallel_plot.pdf")
plot(binned)
dev.off()
10 changes: 10 additions & 0 deletions examples/pems_data/timer.sh
@@ -0,0 +1,10 @@
#!/bin/sh
#
# Time each implementation of the PEMS example in turn.
#
# printf is used for the headings because the treatment of "\n" by echo
# differs between shells: some print a newline, others print the two
# characters literally. Each heading is followed by one blank line.

printf '%s\n\n' "Sequential version base R"
time Rscript pems_base.R

printf '%s\n\n' "R's standard parallel library"
time Rscript pems_parallel.R

printf '%s\n\n' "ddR"
time Rscript pems_ddR.R

0 comments on commit f6d7056

Please sign in to comment.