Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactoring bulk import #27

Merged
merged 7 commits into from Jun 18, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
12 changes: 0 additions & 12 deletions R/assert.R
@@ -1,16 +1,4 @@

assert_files_exist <- function(files){
if(length(files) == 0){
warning("no file specified")
}
if(length(files) == 1){
if(!file.exists(files))
warning(sprintf("file '%s' not found", files), call. = FALSE)
}
lapply(files, function(f)
if(!file.exists(f))
warning(sprintf("'%s' not found", f), call. = FALSE))
}

assert_dir_exists <- function(dir){
if(!dir.exists(dir))
Expand Down
80 changes: 56 additions & 24 deletions R/bulk_importers.R
@@ -1,3 +1,9 @@
## FIXME rewrite bulk_monetdb to use S3 method
## to use more robust SQL generalization of monet.read.csv that does not involve
## collisions in arguments between it and passing to read.table (quotes, delim/sep, nrows)
## Should have an S3 generic: bulk(con, file, tablename, ...) that passes all ...
## to read.table / read_tsv and understands column headers etc


#' @importFrom R.utils gunzip
#' @importFrom tools file_path_sans_ext
Expand All @@ -12,46 +18,72 @@ bulk_importer <- function(db_con, streamable_table){
if(is.null(delim)) return(NULL)

quote <- switch(streamable_table$extension,
"tsv" = "",
"tsv" = "\"", # hmmm...
"csv" = "\"",
NULL)

bulk_monetdb <- function(conn, file, tablename){

## if compressed: uncompress first. (remote urls will also be downloaded)
if(tools::file_ext(file) %in% c("gz", "bz2", "xz")){
if(grepl("://", file)){ # replace URL paths with local path
tmp <- file.path(tempdir(), basename(file))
utils::download.file(file, tmp)
file <- tmp
}
dest <- tools::file_path_sans_ext(file)
R.utils::gunzip(file, dest, remove = FALSE)
} else {
dest <- file
}


## Method returns this function
bulk_monetdb <- function(conn, file, tablename, ...){
file <- expand_if_compressed(file)
suppress_msg({
MonetDBLite::monet.read.csv(
conn, dest, tablename,
delim = delim,
quote = quote,
nrow.check = 1e4,
best.effort = FALSE)
MonetDBLite::monet.read.csv(
conn,
file,
tablename,
delim = delim,
quote = quote,
nrow.check = 1e4,
best.effort = FALSE)
## For now, omit ... from monet.read.csv call. `...` may
## contain arguments to readr::read_tsv that will not be recognized by `read.table`
## and thus may cause errors
})

return(0)
}

## Should do this as an S3 method for MonetDBEmbeddedConnection
if (inherits(db_con, "MonetDBEmbeddedConnection")){
return(bulk_monetdb)
}

## Make this the return if no method can be found?
NULL
}



## FIXME Consider migrating these up into arkdb::unark behavior, so that standard R-DBI method
## can also work with URLs of .bz2 and .xz files (currently bzfile & xzfile don't work over URLs)
download_if_remote <- function(file){
if(grepl("://", file)){ # replace URL paths with local path
tmp <- file.path(tempdir(), basename(file))
utils::download.file(file, tmp)
tmp
} else if (file.exists(file)) {
file
} else {
warning(paste("file", file, "not found."), call.=FALSE)
}
}
expand_if_compressed <- function(file){
if(tools::file_ext(file) %in% c("gz", "bz2", "xz")){
file <- download_if_remote(file)
dest <- tools::file_path_sans_ext(file)
if(!file.exists(dest)) # assumes if it exists that it is the same thing :/
R.utils::gunzip(file, dest, remove = FALSE)
} else {
dest <- file
}
dest
}








suppress_msg <- function(expr, pattern = "reserved SQL"){
withCallingHandlers(expr,
message = function(e){
Expand Down
13 changes: 10 additions & 3 deletions R/unark.R
Expand Up @@ -62,7 +62,6 @@ unark <- function(files,
try_native = TRUE,
...){

assert_files_exist(files)
assert_dbi(db_con)

## Guess streamable table
Expand Down Expand Up @@ -123,12 +122,20 @@ unark_file <- function(filename,
}

## Check for a bulk importer first
## FIXME use S3 method
bulk <- bulk_importer(db_con, streamable_table)
if(!is.null(bulk) && try_native){
t0 <- Sys.time()
message(paste("Native bulk importer found, attempting fast import of", basename(filename)))
status <-
tryCatch(bulk(db_con, filename, tablename),
tryCatch(bulk(db_con, filename, tablename, ...),
error = function(e) 1)
if(status == 0) return(invisible(db_con))
if(status == 0){
message(sprintf("\t...Done! (in %s)", format(Sys.time() - t0)))
return(invisible(db_con))
} else {
message("Native import failed, falling back on R-based parser")
}
}

con <- compressed_file(filename, "r", encoding = encoding)
Expand Down
2 changes: 2 additions & 0 deletions arkdb.Rproj
Expand Up @@ -16,3 +16,5 @@ BuildType: Package
PackageUseDevtools: Yes
PackageInstallArgs: --no-multiarch --with-keep.source
PackageRoxygenize: rd,collate,namespace

QuitChildProcessesOnExit: Yes
3 changes: 1 addition & 2 deletions tests/testthat/test-bulk-import.R
Expand Up @@ -12,9 +12,8 @@ test_that("We can do fast bulk import with MonetDBLite in most cases", {
library(nycflights13)

MonetDBLite::monetdblite_shutdown()
tmp <- tempdir()

# test unark on alternate DB
tmp <- tempdir()
monet_dir <- fs::dir_create(fs::path(tmp, "monet"))
db_con <- DBI::dbConnect(MonetDBLite::MonetDBLite(), monet_dir)

Expand Down
12 changes: 6 additions & 6 deletions tests/testthat/test-errors.R
@@ -1,12 +1,12 @@
testthat::context("test-errors")
context("test-errors")

testthat::test_that("we can detect errors on types", {
test_that("we can detect errors on types", {

x <- NULL
testthat::expect_warning(assert_files_exist("not-a-file"), "not found")
testthat::expect_warning(assert_files_exist(
c("not-a-file", "not")), "not found")
testthat::expect_warning(assert_files_exist(x), "no file specified")
# testthat::expect_warning(assert_files_exist("not-a-file"), "not found")
# testthat::expect_warning(assert_files_exist(
# c("not-a-file", "not")), "not found")
# testthat::expect_warning(assert_files_exist(x), "no file specified")

testthat::expect_error(assert_dir_exists("not-a-dir"), "not found")
testthat::expect_error(assert_dbi(x), "DBI")
Expand Down