Skip to content

Commit

Permalink
Support quoted newlines automatically
Browse files Browse the repository at this point in the history
If a quoted newline is encountered we throw an exception which is caught
and then restart the indexing with only a single thread so the newline
is read properly.

Part of #282
  • Loading branch information
jimhester committed Dec 18, 2020
1 parent 12f7152 commit a398be4
Show file tree
Hide file tree
Showing 11 changed files with 138 additions and 98 deletions.
2 changes: 2 additions & 0 deletions NEWS.md
@@ -1,5 +1,7 @@
# vroom (development version)

* Data with newlines within quoted fields will now automatically revert to using a single thread and be properly read (#282)

* Vectors are now subset properly when given invalid subscripts (#283)

* vroom now tracks indexing and parsing errors like readr. The first time an issue is encountered a warning will be signalled. A tibble of all found problems can be retrieved with `vroom::problems()`. (#247)
Expand Down
4 changes: 2 additions & 2 deletions R/vroom.R
Expand Up @@ -10,8 +10,8 @@ NULL
#' file. If `NULL` the delimiter is guessed from the set of `c(",", "\t", " ",
#' "|", ":", ";")`.
#' @param num_threads Number of threads to use when reading and materializing
#' vectors. If your data contains embedded newlines (newlines within fields)
#' you _must_ use `num_threads = 1` to read the data properly.
#' vectors. If your data contains newlines within fields the parser will
#' automatically be forced to use a single thread only.
#' @param escape_double Does the file escape quotes by doubling them?
#' i.e. If this option is `TRUE`, the value '""' represents
#' a single quote, '"'.
Expand Down
4 changes: 2 additions & 2 deletions man/vroom.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions man/vroom_fwf.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions man/vroom_lines.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions man/vroom_write.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

167 changes: 89 additions & 78 deletions src/delimited_index.cc
Expand Up @@ -132,90 +132,101 @@ delimited_index::delimited_index(
num_threads = 1;
}

idx_ = std::vector<idx_t>(num_threads + 1);

// Index the first row
idx_[0].push_back(start - 1);
size_t cols = 0;
bool in_quote = false;
size_t lines_read = index_region(
mmap_,
idx_[0],
delim_.c_str(),
quote,
in_quote,
start,
first_nl + 1,
0,
n_max,
cols,
0,
errors,
pb,
-1);
columns_ = idx_[0].size() - 1;

std::vector<std::thread> threads;

if (nmax_set) {
threads.emplace_back([&] {
n_max -= lines_read;
index_region(
mmap_,
idx_[1],
delim_.c_str(),
quote,
in_quote,
first_nl,
file_size,
0,
n_max,
cols,
columns_,
errors,
pb,
file_size / 100);
});
} else {
threads = parallel_for(
file_size - first_nl,
[&](size_t start, size_t end, size_t id) {
idx_[id + 1].reserve((guessed_rows / num_threads) * columns_);
start = find_next_newline(mmap_, first_nl + start, false);
end = find_next_newline(mmap_, first_nl + end, false) + 1;
size_t cols = 0;
bool in_quote = false;
index_region(
mmap_,
idx_[id + 1],
delim_.c_str(),
quote,
in_quote,
start,
end,
0,
n_max,
cols,
columns_,
errors,
pb,
file_size / 100);
},
start_indexing:

try {

idx_ = std::vector<idx_t>(num_threads + 1);

// Index the first row
idx_[0].push_back(start - 1);
size_t cols = 0;
bool in_quote = false;
size_t lines_read = index_region(
mmap_,
idx_[0],
delim_.c_str(),
quote,
in_quote,
start,
first_nl + 1,
0,
n_max,
cols,
0,
errors,
pb,
num_threads,
use_threads,
false);
}
-1);
columns_ = idx_[0].size() - 1;

std::vector<std::future<void>> threads;

if (nmax_set) {
threads.emplace_back(std::async(std::launch::async, [&] {
n_max -= lines_read;
index_region(
mmap_,
idx_[1],
delim_.c_str(),
quote,
in_quote,
first_nl,
file_size,
0,
n_max,
cols,
columns_,
errors,
pb,
num_threads,
file_size / 100);
}));
} else {
threads = parallel_for(
file_size - first_nl,
[&](size_t start, size_t end, size_t id) {
idx_[id + 1].reserve((guessed_rows / num_threads) * columns_);
start = find_next_newline(mmap_, first_nl + start, false);
end = find_next_newline(mmap_, first_nl + end, false) + 1;
size_t cols = 0;
bool in_quote = false;
index_region(
mmap_,
idx_[id + 1],
delim_.c_str(),
quote,
in_quote,
start,
end,
0,
n_max,
cols,
columns_,
errors,
pb,
num_threads,
file_size / 100);
},
num_threads,
use_threads,
false);
}

if (progress_) {
if (progress_) {
#ifndef VROOM_STANDALONE
pb->display_progress();
pb->display_progress();
#endif
}
}

for (auto& t : threads) {
t.join();
}
for (auto& t : threads) {
t.get();
}

} catch (newline_error& e) {
num_threads = 1;
goto start_indexing;
}
size_t total_size = std::accumulate(
idx_.begin(), idx_.end(), std::size_t{0}, [](size_t sum, const idx_t& v) {
sum += v.size() > 0 ? v.size() - 1 : 0;
Expand Down
10 changes: 9 additions & 1 deletion src/delimited_index.h
Expand Up @@ -30,6 +30,7 @@ struct cell {

class delimited_index : public index,
public std::enable_shared_from_this<delimited_index> {
class newline_error {};

public:
delimited_index(
Expand Down Expand Up @@ -215,7 +216,8 @@ class delimited_index : public index,
const size_t num_cols,
std::shared_ptr<vroom_errors> errors,
P& pb,
const size_t update_size = -1) {
const size_t num_threads,
const size_t update_size) {

// If there are no quotes quote will be '\0', so will just work
std::array<char, 5> query = {delim[0], '\n', '\\', quote, '\0'};
Expand All @@ -239,6 +241,12 @@ class delimited_index : public index,

else if (c == '\n') {
if (in_quote) { // This will work as long as num_threads = 1
if (num_threads != 1) {
if (progress_ && pb) {
pb->finish();
}
throw newline_error();
}
++pos;
continue;
}
Expand Down
8 changes: 6 additions & 2 deletions src/delimited_index_connection.cc
Expand Up @@ -150,7 +150,9 @@ delimited_index_connection::delimited_index_connection(
cols,
0,
errors,
empty_pb);
empty_pb,
1,
-1);

columns_ = idx_[0].size() - 1;

Expand Down Expand Up @@ -186,7 +188,9 @@ delimited_index_connection::delimited_index_connection(
cols,
columns_,
errors,
empty_pb);
empty_pb,
1,
-1);
});
}

Expand Down
19 changes: 12 additions & 7 deletions src/parallel.h
Expand Up @@ -20,7 +20,7 @@
/// @param use_threads : enable / disable threads.
///
///
static std::vector<std::thread> parallel_for(
static std::vector<std::future<void>> parallel_for(
size_t nb_elements,
std::function<void(size_t start, size_t end, size_t thread_id)> functor,
size_t nb_threads,
Expand All @@ -32,19 +32,24 @@ static std::vector<std::thread> parallel_for(

size_t batch_remainder = nb_elements % nb_threads;

auto my_threads = std::vector<std::thread>(nb_threads);
auto my_threads = std::vector<std::future<void>>(nb_threads);

if (use_threads) {
// Multithread execution
for (size_t i = 0; i < (nb_threads - 1); ++i) {
size_t start = i * batch_size;
my_threads[i] = std::thread(functor, start, start + batch_size, i);
my_threads[i] =
std::async(std::launch::async, functor, start, start + batch_size, i);
}

// Last batch includes the remainder
size_t start = (nb_threads - 1) * batch_size;
my_threads[nb_threads - 1] = std::thread(
functor, start, start + batch_size + batch_remainder, nb_threads - 1);
my_threads[nb_threads - 1] = std::async(
std::launch::async,
functor,
start,
start + batch_size + batch_remainder,
nb_threads - 1);
} else {
// Single thread execution (for easy debugging)
for (size_t i = 0; i < (nb_threads - 1); ++i) {
Expand All @@ -55,13 +60,13 @@ static std::vector<std::thread> parallel_for(
size_t start = (nb_threads - 1) * batch_size;
functor(start, start + batch_size + batch_remainder, nb_threads - 1);

return std::vector<std::thread>();
return std::vector<std::future<void>>();
}

// Wait for the other thread to finish their task
if (use_threads && cleanup) {
for (auto& t : my_threads) {
t.join();
t.get();
}
}
return my_threads;
Expand Down
10 changes: 10 additions & 0 deletions tests/testthat/test-vroom.R
Expand Up @@ -529,3 +529,13 @@ test_that("quotes at delimiters are used", {
expect_equal(z$y[[1]], "foo\"bar")
expect_equal(z$y[[2]], "baz")
})

test_that("vroom reads files with embedded newlines even when num_threads > 1", {
tf <- tempfile()
on.exit(unlink(tf))
writeLines(c("x", rep("foo", 1000), '"bar\nbaz"', rep("qux", 1000)), tf)

res <- vroom(tf, delim = ",", num_threads = 5)
expect_equal(nrow(res), 1000 + 1 + 1000)
expect_equal(res$x[[1001]], "bar\nbaz")
})

0 comments on commit a398be4

Please sign in to comment.