ARROW-9869: [R] Implement full S3FileSystem/S3Options constructor
Also adds tests that run on minio (which I've tested locally), adds documentation for how to run them, and adds them to the macOS ("autobrew") nightly test run, because the Linux builds still don't have aws-sdk-cpp support.
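For context, the constructor options and the `filesystem` argument threaded through the readers and writers in this diff can be exercised against a local minio server roughly like this (a hedged sketch, not part of the commit: the `minioadmin` credentials and port 9000 are minio's defaults, and the bucket name is hypothetical):

```r
library(arrow)
# Assumes a local server started with: minio server /tmp/minio-data
minio <- S3FileSystem$create(
  access_key = "minioadmin",   # minio's default credentials (assumption)
  secret_key = "minioadmin",
  scheme = "http",
  endpoint_override = "localhost:9000"
)
minio$CreateDir("test-bucket")
write_feather(mtcars, "test-bucket/mtcars.feather", filesystem = minio)
df <- read_feather("test-bucket/mtcars.feather", filesystem = minio)
```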

Closes apache#8197 from nealrichardson/r-s3-options

Authored-by: Neal Richardson <neal.p.richardson@gmail.com>
Signed-off-by: Neal Richardson <neal.p.richardson@gmail.com>
nealrichardson authored and GeorgeAp committed Jun 7, 2021
1 parent c69ccee commit 5d158df
Showing 31 changed files with 585 additions and 100 deletions.
6 changes: 4 additions & 2 deletions dev/tasks/homebrew-formulae/travis.osx.r.yml
@@ -45,9 +45,11 @@ before_install:
- sed -i.bak -E -e 's@https://github.com/apache/arrow.git"$@{{ arrow.remote }}.git", :revision => "{{ arrow.head }}"@' tools/apache-arrow.rb && rm -f tools/apache-arrow.rb.bak
# Sometimes crossbow gives a remote URL with .git and sometimes not. Make sure there's only one
- sed -i.bak -E -e 's@.git.git@.git@' tools/apache-arrow.rb && rm -f tools/apache-arrow.rb.bak
# Get minio for S3 testing
- brew install minio/stable/minio
script:
- Rscript -e 'install.packages("rcmdcheck")'
- Rscript -e 'install.packages(c("rcmdcheck", "sys"))'
# Note that this is not --as-cran. CRAN doesn't do macOS checks --as-cran
- travis_wait Rscript -e "rcmdcheck::rcmdcheck(build_args = '--no-build-vignettes', args = c('--no-manual', '--ignore-vignettes', '--run-donttest'), error_on = 'warning', check_dir = 'check')"
- travis_wait Rscript -e "minio_dir <- tempfile(); dir.create(minio_dir); pid <- sys::exec_background('minio', c('server', minio_dir)); on.exit(tools::pskill(pid)); rcmdcheck::rcmdcheck(build_args = '--no-build-vignettes', args = c('--no-manual', '--ignore-vignettes', '--run-donttest'), error_on = 'warning', check_dir = 'check')"
# If there's a build failure, it's probably in this log. Let's print it regardless though
- cat check/arrow.Rcheck/00install.out
2 changes: 1 addition & 1 deletion docs/source/python/filesystems.rst
@@ -43,7 +43,7 @@ and Amazon S3-compatible storage (:class:`S3FileSystem`).
Usage
-----

A FileSystem object can be created with one of the constuctors (and check the
A FileSystem object can be created with one of the constructors (and check the
respective constructor for its options)::

>>> from pyarrow import fs
3 changes: 3 additions & 0 deletions r/NAMESPACE
@@ -72,6 +72,7 @@ S3method(mean,Scalar)
S3method(min,Array)
S3method(min,ChunkedArray)
S3method(names,Dataset)
S3method(names,FeatherReader)
S3method(names,RecordBatch)
S3method(names,ScannerBuilder)
S3method(names,Schema)
@@ -287,6 +288,7 @@ importFrom(rlang,enquos)
importFrom(rlang,env)
importFrom(rlang,env_bind)
importFrom(rlang,eval_tidy)
importFrom(rlang,exec)
importFrom(rlang,is_false)
importFrom(rlang,is_integerish)
importFrom(rlang,list2)
@@ -309,6 +311,7 @@ importFrom(tidyselect,vars_rename)
importFrom(tidyselect,vars_select)
importFrom(utils,head)
importFrom(utils,install.packages)
importFrom(utils,modifyList)
importFrom(utils,tail)
importFrom(vctrs,s3_register)
importFrom(vctrs,vec_cast)
2 changes: 1 addition & 1 deletion r/NEWS.md
@@ -28,7 +28,7 @@
## AWS S3 support

* S3 support is now enabled in binary macOS and Windows (Rtools40 only, i.e. R >= 4.0) packages. To enable it on Linux, you will need to build and install `aws-sdk-cpp` from source, then set the environment variable `EXTRA_CMAKE_FLAGS="-DARROW_S3=ON -DAWSSDK_SOURCE=SYSTEM"` prior to building the R package (with bundled C++ build, not with Arrow system libraries) from source.
* File readers and writers (`read_parquet()`, `write_feather()`, et al.) now accept an `s3://` URI as the source or destination file, as do `open_dataset()` and `write_dataset()`. See `vignette("fs", package = "arrow")` for details.
* File readers and writers (`read_parquet()`, `write_feather()`, et al.), as well as `open_dataset()` and `write_dataset()`, allow you to access resources on S3 (or on file systems that emulate S3) either by providing an `s3://` URI or by passing an additional `filesystem` argument. See `vignette("fs", package = "arrow")` for details.

## Computation

2 changes: 1 addition & 1 deletion r/R/arrow-package.R
@@ -18,7 +18,7 @@
#' @importFrom R6 R6Class
#' @importFrom purrr as_mapper map map2 map_chr map_dfr map_int map_lgl
#' @importFrom assertthat assert_that is.string
#' @importFrom rlang list2 %||% is_false abort dots_n warn enquo quo_is_null enquos is_integerish quos eval_tidy new_data_mask syms env env_bind as_label set_names
#' @importFrom rlang list2 %||% is_false abort dots_n warn enquo quo_is_null enquos is_integerish quos eval_tidy new_data_mask syms env env_bind as_label set_names exec
#' @importFrom tidyselect vars_select
#' @useDynLib arrow, .registration = TRUE
#' @keywords internal
8 changes: 2 additions & 6 deletions r/R/arrowExports.R

Some generated files are not rendered by default.

9 changes: 6 additions & 3 deletions r/R/csv.R
@@ -68,6 +68,8 @@
#' parsing options provided in other arguments (e.g. `delim`, `quote`, etc.).
#' @param convert_options see [file reader options][CsvReadOptions]
#' @param read_options see [file reader options][CsvReadOptions]
#' @param filesystem A [FileSystem] where `file` can be found if it is a
#' string file path; default is the local file system
#' @param as_data_frame Should the function return a `data.frame` (default) or
#' an Arrow [Table]?
#'
@@ -98,6 +100,7 @@ read_delim_arrow <- function(file,
parse_options = NULL,
convert_options = NULL,
read_options = NULL,
filesystem = NULL,
as_data_frame = TRUE) {

if (is.null(parse_options)) {
@@ -119,7 +122,7 @@ }
}

if (!inherits(file, "InputStream")) {
file <- make_readable_file(file)
file <- make_readable_file(file, filesystem = filesystem)
on.exit(file$close())
}
reader <- CsvTableReader$create(
@@ -206,7 +209,7 @@ read_tsv_arrow <- function(file,
#' The `CsvTableReader$create()` and `JsonTableReader$create()` factory methods
#' take the following arguments:
#'
#' - `file` A character path to a local file, or an Arrow input stream
#' - `file` An Arrow [InputStream]
#' - `convert_options` (CSV only), `parse_options`, `read_options`: see
#' [CsvReadOptions]
#' - `...` additional parameters.
@@ -227,7 +230,7 @@ CsvTableReader$create <- function(file,
parse_options = CsvParseOptions$create(),
convert_options = CsvConvertOptions$create(),
...) {
file <- make_readable_file(file)
assert_is(file, "InputStream")
shared_ptr(
CsvTableReader,
csv___TableReader__Make(file, read_options, parse_options, convert_options)
18 changes: 12 additions & 6 deletions r/R/feather.R
@@ -25,6 +25,8 @@
#'
#' @param x `data.frame`, [RecordBatch], or [Table]
#' @param sink A string file path, URI, or [OutputStream]
#' @param filesystem A [FileSystem] where `sink` should be written if it is a
#' string file path; default is the local file system
#' @param version integer Feather file version. Version 2 is the current.
#' Version 1 is the more limited legacy format.
#' @param chunk_size For V2 files, the number of rows that each chunk of data
@@ -52,6 +54,7 @@
#' @include arrow-package.R
write_feather <- function(x,
sink,
filesystem = NULL,
version = 2,
chunk_size = 65536L,
compression = c("default", "lz4", "uncompressed", "zstd"),
@@ -106,7 +109,7 @@ write_feather <- function(x,
assert_is(x, "Table")

if (is.string(sink)) {
sink <- make_output_stream(sink)
sink <- make_output_stream(sink, filesystem)
on.exit(sink$close())
}
assert_is(sink, "OutputStream")
@@ -141,17 +144,16 @@
#' # Can select columns
#' df <- read_feather(tf, col_select = starts_with("d"))
#' }
read_feather <- function(file, col_select = NULL, as_data_frame = TRUE, ...) {
read_feather <- function(file, col_select = NULL, as_data_frame = TRUE, filesystem = NULL, ...) {
if (!inherits(file, "RandomAccessFile")) {
file <- make_readable_file(file)
file <- make_readable_file(file, filesystem = filesystem)
on.exit(file$close())
}
reader <- FeatherReader$create(file, ...)

all_columns <- ipc___feather___Reader__column_names(reader)
col_select <- enquo(col_select)
columns <- if (!quo_is_null(col_select)) {
vars_select(all_columns, !!col_select)
vars_select(names(reader), !!col_select)
}

out <- reader$Read(columns)
@@ -198,10 +200,14 @@ FeatherReader <- R6Class("FeatherReader", inherit = ArrowObject,
),
active = list(
# versions are officially 2 for V1 and 3 for V2 :shrug:
version = function() ipc___feather___Reader__version(self) - 1L
version = function() ipc___feather___Reader__version(self) - 1L,
column_names = function() ipc___feather___Reader__column_names(self)
)
)

#' @export
names.FeatherReader <- function(x) x$column_names

FeatherReader$create <- function(file, mmap = TRUE, ...) {
assert_is(file, "RandomAccessFile")
shared_ptr(FeatherReader, ipc___feather___Reader__Open(file))
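The new `names()` method for `FeatherReader` and the `col_select` rewiring above can be exercised roughly like this (a sketch, not from the commit; assumes the arrow package is installed with Feather support):

```r
library(arrow)
tf <- tempfile(fileext = ".feather")
write_feather(mtcars, tf)

reader <- FeatherReader$create(ReadableFile$create(tf))
names(reader)   # column names, via the new names.FeatherReader method

# col_select is now resolved against names(reader) rather than a
# separate column_names lookup:
df <- read_feather(tf, col_select = tidyselect::starts_with("d"))
```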
94 changes: 84 additions & 10 deletions r/R/filesystem.R
@@ -122,11 +122,39 @@ FileSelector$create <- function(base_dir, allow_not_found = FALSE, recursive = F
#'
#' @section Factory:
#'
#' The `$create()` factory methods instantiate the `FileSystem` object and
#' take the following arguments, depending on the subclass:
#' `LocalFileSystem$create()` returns the object and takes no arguments.
#'
#' - no argument is needed for instantiating a `LocalFileSystem`
#' - `base_path` and `base_fs` for instantiating a `SubTreeFileSystem`
#' `SubTreeFileSystem$create()` takes the following arguments:
#'
#' - `base_path`, a string path
#' - `base_fs`, a `FileSystem` object
#'
#' `S3FileSystem$create()` optionally takes arguments:
#'
#' - `anonymous`: logical, default `FALSE`. If true, will not attempt to look up
#' credentials using standard AWS configuration methods.
#' - `access_key`, `secret_key`: authentication credentials. If one is provided,
#' the other must be as well. If both are provided, they will override any
#' AWS configuration set at the environment level.
#' - `session_token`: optional string for authentication along with
#' `access_key` and `secret_key`
#' - `role_arn`: string AWS ARN of an AccessRole. If provided instead of `access_key` and
#' `secret_key`, temporary credentials will be fetched by assuming this role.
#' - `session_name`: optional string identifier for the assumed role session.
#' - `external_id`: optional unique string identifier that might be required
#' when you assume a role in another account.
#' - `load_frequency`: integer, frequency (in seconds) with which temporary
#' credentials from an assumed role session will be refreshed. Default is
#' 900 (i.e. 15 minutes)
#' - `region`: AWS region to connect to. If omitted, the AWS library will
#' provide a sensible default based on client configuration, falling back
#' to "us-east-1" if no other alternatives are found.
#' - `endpoint_override`: If non-empty, override region with a connect string
#' such as "localhost:9000". This is useful for connecting to file systems
#' that emulate S3.
#' - `scheme`: S3 connection transport (default "https")
#' - `background_writes`: logical, whether `OutputStream` writes will be issued
#' in the background, without blocking (default `TRUE`)
#'
#' @section Methods:
#'
@@ -279,13 +307,56 @@ LocalFileSystem$create <- function() {
#' @usage NULL
#' @format NULL
#' @rdname FileSystem
#' @importFrom utils modifyList
#' @export
S3FileSystem <- R6Class("S3FileSystem", inherit = FileSystem)
S3FileSystem$create <- function() {
fs___EnsureS3Initialized()
shared_ptr(S3FileSystem, fs___S3FileSystem__create())
S3FileSystem$create <- function(anonymous = FALSE, ...) {
args <- list2(...)
if (anonymous) {
invalid_args <- intersect(c("access_key", "secret_key", "session_token", "role_arn", "session_name", "external_id", "load_frequency"), names(args))
if (length(invalid_args)) {
stop("Cannot specify ", oxford_paste(invalid_args), " when anonymous = TRUE", call. = FALSE)
}
} else {
keys_present <- length(intersect(c("access_key", "secret_key"), names(args)))
if (keys_present == 1) {
stop("Key authentication requires both access_key and secret_key", call. = FALSE)
}
if ("session_token" %in% names(args) && keys_present != 2) {
stop(
"In order to initialize a session with temporary credentials, ",
"both secret_key and access_key must be provided ",
"in addition to session_token.",
call. = FALSE
)
}
arn <- "role_arn" %in% names(args)
if (keys_present == 2 && arn) {
stop("Cannot provide both key authentication and role_arn", call. = FALSE)
}
arn_extras <- intersect(c("session_name", "external_id", "load_frequency"), names(args))
if (length(arn_extras) > 0 && !arn) {
stop("Cannot specify ", oxford_paste(arn_extras), " without providing a role_arn string", call. = FALSE)
}
}
args <- c(modifyList(default_s3_options, args), anonymous = anonymous)
shared_ptr(S3FileSystem, exec(fs___S3FileSystem__create, !!!args))
}

default_s3_options <- list(
access_key = "",
secret_key = "",
session_token = "",
role_arn = "",
session_name = "",
external_id = "",
load_frequency = 900L,
region = "",
endpoint_override = "",
scheme = "",
background_writes = TRUE
)

arrow_with_s3 <- function() {
.Call(`_s3_available`)
}
@@ -295,9 +366,12 @@ arrow_with_s3 <- function() {
#' @rdname FileSystem
#' @export
SubTreeFileSystem <- R6Class("SubTreeFileSystem", inherit = FileSystem)
SubTreeFileSystem$create <- function(base_path, base_fs) {
xp <- fs___SubTreeFileSystem__create(clean_path_rel(base_path), base_fs)
shared_ptr(SubTreeFileSystem, xp)
SubTreeFileSystem$create <- function(base_path, base_fs = NULL) {
fs_and_path <- get_path_and_filesystem(base_path, base_fs)
shared_ptr(
SubTreeFileSystem,
fs___SubTreeFileSystem__create(fs_and_path$path, fs_and_path$fs)
)
}

#' Copy files, including between FileSystems
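A quick sketch of how the argument validation in `S3FileSystem$create()` above behaves (hypothetical calls; the role ARN is invented, and the error messages are paraphrased from the code rather than captured output):

```r
library(arrow)

# Anonymous access cannot be combined with credentials:
try(S3FileSystem$create(anonymous = TRUE, access_key = "foo"))
# Error: Cannot specify access_key when anonymous = TRUE

# Key authentication requires both halves of the key pair:
try(S3FileSystem$create(access_key = "foo"))
# Error: Key authentication requires both access_key and secret_key

# Explicit keys and an assumed role are mutually exclusive:
try(S3FileSystem$create(access_key = "foo", secret_key = "bar",
                        role_arn = "arn:aws:iam::000000000000:role/example"))
# Error: Cannot provide both key authentication and role_arn
```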
10 changes: 7 additions & 3 deletions r/R/io.R
@@ -257,12 +257,16 @@ make_readable_file <- function(file, mmap = TRUE, compression = NULL, filesystem
file
}

make_output_stream <- function(x) {
make_output_stream <- function(x, filesystem = NULL) {
if (is_url(x)) {
fs_and_path <- FileSystem$from_uri(x)
fs_and_path$fs$OpenOutputStream(fs_and_path$path)
} else {
filesystem <- fs_and_path$fs
x <- fs_and_path$path
}
if (is.null(filesystem)) {
FileOutputStream$create(x)
} else {
filesystem$OpenOutputStream(x)
}
}

10 changes: 6 additions & 4 deletions r/R/ipc_stream.R
@@ -35,13 +35,13 @@
#' serialize data to a buffer.
#' [RecordBatchWriter] for a lower-level interface.
#' @export
write_ipc_stream <- function(x, sink, ...) {
write_ipc_stream <- function(x, sink, filesystem = NULL, ...) {
x_out <- x # So we can return the data we got
if (is.data.frame(x)) {
x <- Table$create(x)
}
if (is.string(sink)) {
sink <- make_output_stream(sink)
sink <- make_output_stream(sink, filesystem)
on.exit(sink$close())
}
assert_is(sink, "OutputStream")
@@ -90,16 +90,18 @@ write_to_raw <- function(x, format = c("stream", "file")) {
#' open.
#' @param as_data_frame Should the function return a `data.frame` (default) or
#' an Arrow [Table]?
#' @param filesystem A [FileSystem] where `file` can be found if it is a
#' string file path; default is the local file system
#' @param ... extra parameters passed to `read_feather()`.
#'
#' @return A `data.frame` if `as_data_frame` is `TRUE` (the default), or an
#' Arrow [Table] otherwise
#' @seealso [read_feather()] for writing IPC files. [RecordBatchReader] for a
#' lower-level interface.
#' @export
read_ipc_stream <- function(file, as_data_frame = TRUE, ...) {
read_ipc_stream <- function(file, as_data_frame = TRUE, filesystem = NULL, ...) {
if (!inherits(file, "InputStream")) {
file <- make_readable_file(file)
file <- make_readable_file(file, filesystem = filesystem)
on.exit(file$close())
}

13 changes: 10 additions & 3 deletions r/R/json.R
@@ -35,7 +35,15 @@
#' ', tf, useBytes=TRUE)
#' df <- read_json_arrow(tf)
#' }
read_json_arrow <- function(file, col_select = NULL, as_data_frame = TRUE, ...) {
read_json_arrow <- function(file,
col_select = NULL,
as_data_frame = TRUE,
filesystem = NULL,
...) {
if (!inherits(file, "InputStream")) {
file <- make_readable_file(file, filesystem = filesystem)
on.exit(file$close())
}
tab <- JsonTableReader$create(file, ...)$Read()

col_select <- enquo(col_select)
@@ -64,8 +72,7 @@ JsonTableReader$create <- function(file,
read_options = JsonReadOptions$create(),
parse_options = JsonParseOptions$create(),
...) {

file <- make_readable_file(file)
assert_is(file, "InputStream")
shared_ptr(
JsonTableReader,
json___TableReader__Make(file, read_options, parse_options)
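The `filesystem` argument added to `read_json_arrow()` mirrors the other readers; a minimal local sketch (assuming arrow was built with JSON support — passing the default local file system explicitly just to show the plumbing):

```r
library(arrow)
tf <- tempfile()
writeLines('{ "hello": 3.5, "world": false }', tf)
df <- read_json_arrow(tf, filesystem = LocalFileSystem$create())
```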
