Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add comma separated value parsing as an option in iterate_many #2016

Merged
merged 5 commits into from
Jun 15, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
5 changes: 5 additions & 0 deletions doc/iterate_many.md
Original file line number Diff line number Diff line change
Expand Up @@ -237,3 +237,8 @@ This will print:
```

Importantly, you should only call `truncated_bytes()` after iterating through all of the documents since the stream cannot tell whether there are truncated documents at the very end when it may not have accessed that part of the data yet.

Comma-separated documents
-----------

`iterate_many` also accepts an option that allows parsing comma-separated documents. In this mode, the entire buffer is processed in a single batch, and the batch size is increased to be as large as the JSON input. The capacity of the parser must therefore be sufficient to support that batch size.
7 changes: 6 additions & 1 deletion include/simdjson/generic/ondemand/document_stream-inl.h
Original file line number Diff line number Diff line change
Expand Up @@ -84,12 +84,14 @@ simdjson_inline document_stream::document_stream(
ondemand::parser &_parser,
const uint8_t *_buf,
size_t _len,
size_t _batch_size
size_t _batch_size,
bool _allow_comma_separated
) noexcept
: parser{&_parser},
buf{_buf},
len{_len},
batch_size{_batch_size <= MINIMAL_BATCH_SIZE ? MINIMAL_BATCH_SIZE : _batch_size},
allow_comma_separated{_allow_comma_separated},
error{SUCCESS}
#ifdef SIMDJSON_THREADS_ENABLED
, use_thread(_parser.threaded) // we need to make a copy because _parser.threaded can change
Expand All @@ -107,6 +109,7 @@ simdjson_inline document_stream::document_stream() noexcept
buf{nullptr},
len{0},
batch_size{0},
allow_comma_separated{false},
error{UNINITIALIZED}
#ifdef SIMDJSON_THREADS_ENABLED
, use_thread(false)
Expand Down Expand Up @@ -290,6 +293,8 @@ inline void document_stream::next_document() noexcept {
if (error) { return; }
// Always set depth=1 at the start of document
doc.iter._depth = 1;
// consume comma if comma separated is allowed
if (allow_comma_separated) { doc.iter.consume_character(','); }
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I absolutely love that this is so confined to a single place! I'm a little surprised we couldn't use advance(), though. Switching from thinking in terms of tokens (of which `,` is one) to thinking in terms of characters seems dangerous to me, even if it's confined to this one place where it works.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jkeiser I merged this because, as you have remarked, it is a very neat patch that is quite isolated.

We can change the design.

Are you saying that we should just skip over the token, no matter what it is? Ignoring its nature?

Can you elaborate on your concern?

(I am 100% open to changing this.)

// Resets the string buffer at the beginning, thus invalidating the strings.
doc.iter._string_buf_loc = parser->string_buf.get();
doc.iter._root = doc.iter.position();
Expand Down
4 changes: 3 additions & 1 deletion include/simdjson/generic/ondemand/document_stream.h
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,8 @@ class document_stream {
ondemand::parser &parser,
const uint8_t *buf,
size_t len,
size_t batch_size
size_t batch_size,
bool allow_comma_separated
) noexcept;

/**
Expand Down Expand Up @@ -271,6 +272,7 @@ class document_stream {
const uint8_t *buf;
size_t len;
size_t batch_size;
bool allow_comma_separated;
/**
* We are going to use just one document instance. The document owns
* the json_iterator. It implies that we only ever pass a reference
Expand Down
8 changes: 8 additions & 0 deletions include/simdjson/generic/ondemand/json_iterator-inl.h
Original file line number Diff line number Diff line change
Expand Up @@ -337,6 +337,14 @@ simdjson_inline void json_iterator::reenter_child(token_position position, depth
_depth = child_depth;
}

/**
 * Consume the upcoming token if, and only if, its first byte equals `c`.
 *
 * @param c The character expected at the position peek() points to.
 * @return SUCCESS when the byte matched (the iterator is then advanced via
 *         return_current_and_advance()); TAPE_ERROR otherwise, in which case
 *         nothing is advanced.
 */
simdjson_inline error_code json_iterator::consume_character(char c) noexcept {
  // Compare as unsigned bytes. Where plain `char` is signed, integer promotion
  // would otherwise turn a byte >= 0x80 into a value that can never equal the
  // byte read from the input (peek() yields unsigned bytes in this code base).
  if (*peek() != static_cast<uint8_t>(c)) { return TAPE_ERROR; }
  return_current_and_advance();
  return SUCCESS;
}

#if SIMDJSON_DEVELOPMENT_CHECKS

simdjson_inline token_position json_iterator::start_position(depth_t depth) const noexcept {
Expand Down
1 change: 1 addition & 0 deletions include/simdjson/generic/ondemand/json_iterator.h
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,7 @@ class json_iterator {
simdjson_inline simdjson_result<std::string_view> unescape_wobbly(raw_json_string in) noexcept;
simdjson_inline void reenter_child(token_position position, depth_t child_depth) noexcept;

simdjson_inline error_code consume_character(char c) noexcept;
#if SIMDJSON_DEVELOPMENT_CHECKS
simdjson_inline token_position start_position(depth_t depth) const noexcept;
simdjson_inline void set_start_position(depth_t depth, token_position position) noexcept;
Expand Down
17 changes: 9 additions & 8 deletions include/simdjson/generic/ondemand/parser-inl.h
Original file line number Diff line number Diff line change
Expand Up @@ -84,18 +84,19 @@ simdjson_warn_unused simdjson_inline simdjson_result<json_iterator> parser::iter
return json_iterator(reinterpret_cast<const uint8_t *>(json.data()), this);
}

inline simdjson_result<document_stream> parser::iterate_many(const uint8_t *buf, size_t len, size_t batch_size) noexcept {
inline simdjson_result<document_stream> parser::iterate_many(const uint8_t *buf, size_t len, size_t batch_size, bool allow_comma_separated) noexcept {
if(batch_size < MINIMAL_BATCH_SIZE) { batch_size = MINIMAL_BATCH_SIZE; }
return document_stream(*this, buf, len, batch_size);
if(allow_comma_separated && batch_size < len) { batch_size = len; }
return document_stream(*this, buf, len, batch_size, allow_comma_separated);
}
inline simdjson_result<document_stream> parser::iterate_many(const char *buf, size_t len, size_t batch_size) noexcept {
return iterate_many(reinterpret_cast<const uint8_t *>(buf), len, batch_size);
inline simdjson_result<document_stream> parser::iterate_many(const char *buf, size_t len, size_t batch_size, bool allow_comma_separated) noexcept {
return iterate_many(reinterpret_cast<const uint8_t *>(buf), len, batch_size, allow_comma_separated);
}
inline simdjson_result<document_stream> parser::iterate_many(const std::string &s, size_t batch_size) noexcept {
return iterate_many(s.data(), s.length(), batch_size);
inline simdjson_result<document_stream> parser::iterate_many(const std::string &s, size_t batch_size, bool allow_comma_separated) noexcept {
return iterate_many(s.data(), s.length(), batch_size, allow_comma_separated);
}
inline simdjson_result<document_stream> parser::iterate_many(const padded_string &s, size_t batch_size) noexcept {
return iterate_many(s.data(), s.length(), batch_size);
inline simdjson_result<document_stream> parser::iterate_many(const padded_string &s, size_t batch_size, bool allow_comma_separated) noexcept {
return iterate_many(s.data(), s.length(), batch_size, allow_comma_separated);
}

simdjson_inline size_t parser::capacity() const noexcept {
Expand Down
12 changes: 6 additions & 6 deletions include/simdjson/generic/ondemand/parser.h
Original file line number Diff line number Diff line change
Expand Up @@ -218,15 +218,15 @@ class parser {
* - other json errors if parsing fails. You should not rely on these errors to always the same for the
* same document: they may vary under runtime dispatch (so they may vary depending on your system and hardware).
*/
inline simdjson_result<document_stream> iterate_many(const uint8_t *buf, size_t len, size_t batch_size = DEFAULT_BATCH_SIZE) noexcept;
inline simdjson_result<document_stream> iterate_many(const uint8_t *buf, size_t len, size_t batch_size = DEFAULT_BATCH_SIZE, bool allow_comma_separated = false) noexcept;
/** @overload parse_many(const uint8_t *buf, size_t len, size_t batch_size) */
inline simdjson_result<document_stream> iterate_many(const char *buf, size_t len, size_t batch_size = DEFAULT_BATCH_SIZE) noexcept;
inline simdjson_result<document_stream> iterate_many(const char *buf, size_t len, size_t batch_size = DEFAULT_BATCH_SIZE, bool allow_comma_separated = false) noexcept;
/** @overload parse_many(const uint8_t *buf, size_t len, size_t batch_size) */
inline simdjson_result<document_stream> iterate_many(const std::string &s, size_t batch_size = DEFAULT_BATCH_SIZE) noexcept;
inline simdjson_result<document_stream> iterate_many(const std::string &&s, size_t batch_size) = delete;// unsafe
inline simdjson_result<document_stream> iterate_many(const std::string &s, size_t batch_size = DEFAULT_BATCH_SIZE, bool allow_comma_separated = false) noexcept;
inline simdjson_result<document_stream> iterate_many(const std::string &&s, size_t batch_size, bool allow_comma_separated = false) = delete;// unsafe
/** @overload parse_many(const uint8_t *buf, size_t len, size_t batch_size) */
inline simdjson_result<document_stream> iterate_many(const padded_string &s, size_t batch_size = DEFAULT_BATCH_SIZE) noexcept;
inline simdjson_result<document_stream> iterate_many(const padded_string &&s, size_t batch_size) = delete;// unsafe
inline simdjson_result<document_stream> iterate_many(const padded_string &s, size_t batch_size = DEFAULT_BATCH_SIZE, bool allow_comma_separated = false) noexcept;
inline simdjson_result<document_stream> iterate_many(const padded_string &&s, size_t batch_size, bool allow_comma_separated = false) = delete;// unsafe

/** @private We do not want to allow implicit conversion from C string to std::string. */
simdjson_result<document_stream> iterate_many(const char *buf, size_t batch_size = DEFAULT_BATCH_SIZE) noexcept = delete;
Expand Down
1 change: 1 addition & 0 deletions tests/ondemand/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ add_cpp_test(ondemand_readme_examples LABELS ondemand acceptance per_impl
add_cpp_test(ondemand_scalar_tests LABELS ondemand acceptance per_implementation)
add_cpp_test(ondemand_twitter_tests LABELS ondemand acceptance per_implementation)
add_cpp_test(ondemand_wrong_type_error_tests LABELS ondemand acceptance per_implementation)
add_cpp_test(ondemand_iterate_many_csv LABELS ondemand acceptance per_implementation)

if(HAVE_POSIX_FORK AND HAVE_POSIX_WAIT) # assert tests use fork and wait, which aren't on MSVC
add_cpp_test(ondemand_assert_out_of_order_values LABELS assert per_implementation explicitonly ondemand)
Expand Down
160 changes: 160 additions & 0 deletions tests/ondemand/ondemand_iterate_many_csv.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
#include "simdjson.h"
#include "test_ondemand.h"

#include <cstdint>

using namespace simdjson;

namespace iterate_many_csv_tests {
using namespace std;

// Streams a comma-separated mix of numbers, strings, an object and an array,
// and requires every document produced by the stream to be error-free.
bool normal() {
  TEST_START();
  const auto json = R"( 1, 2, 3, 4, "a", "b", "c", {"hello": "world"} , [1, 2, 3])"_padded;
  ondemand::parser parser;
  ondemand::document_stream doc_stream;
  // Second argument is the batch size; the trailing `true` turns on
  // comma-separated parsing.
  ASSERT_SUCCESS(parser.iterate_many(json, json.size(), true).get(doc_stream));

  for (auto doc : doc_stream) { ASSERT_SUCCESS(doc); }

  TEST_SUCCEED();
}

// Same input as the basic case, but asks for a 32-byte batch, which is shorter
// than the input itself. Every document must still come back without error.
bool small_batch_size() {
  TEST_START();
  const auto json = R"( 1, 2, 3, 4, "a", "b", "c", {"hello": "world"} , [1, 2, 3])"_padded;
  ondemand::parser parser;
  ondemand::document_stream doc_stream;
  // 32 is the requested batch size; `true` turns on comma-separated parsing.
  ASSERT_SUCCESS(parser.iterate_many(json, 32, true).get(doc_stream));

  for (auto doc : doc_stream) { ASSERT_SUCCESS(doc); }

  TEST_SUCCEED();
}

// Feeds a single value followed by a dangling comma ("1,") and requires every
// document produced by the stream to be error-free.
bool trailing_comma() {
  TEST_START();
  const auto json = R"(1,)"_padded;
  ondemand::parser parser;
  ondemand::document_stream doc_stream;
  // Batch size equals the input size; `true` turns on comma-separated parsing.
  ASSERT_SUCCESS(parser.iterate_many(json, json.size(), true).get(doc_stream));

  for (auto doc : doc_stream) { ASSERT_SUCCESS(doc); }

  TEST_SUCCEED();
}

// Parses four comma-separated documents (an integer, a string, an array and an
// object, followed by a trailing comma) and checks the value of each one, then
// checks that exactly four documents were seen and the stream is exhausted.
bool check_parsed_values() {
  TEST_START();

  auto json = R"( 1 , "a" , [100, 1] , {"hello" : "world"} , )"_padded;
  ondemand::parser parser;
  ondemand::document_stream doc_stream;
  ASSERT_SUCCESS(parser.iterate_many(json, json.size(), true).get(doc_stream));

  auto begin = doc_stream.begin();
  auto end = doc_stream.end();
  int cnt = 0;
  auto it = begin;
  // Stop after four documents even if the stream yields more, so the checks
  // after the loop can tell "too few" from "too many".
  for (; it != end && cnt < 4; ++it, ++cnt) {
    auto doc = *it;
    switch (cnt)
    {
      case 0:
      {
        // First document: the integer 1.
        int64_t actual;
        ASSERT_SUCCESS(doc.get_int64().get(actual));
        ASSERT_EQUAL(actual, 1);
        break;
      }
      case 1:
      {
        // Second document: the string "a".
        std::string_view sv;
        ASSERT_SUCCESS(doc.get_string().get(sv));
        ASSERT_EQUAL(sv, "a");
        break;
      }
      case 2:
      {
        // Third document: the array [100, 1].
        std::vector<int64_t> expected{100, 1};
        ondemand::array arr;
        ASSERT_SUCCESS(doc.get_array().get(arr));
        size_t element_count;
        ASSERT_SUCCESS(arr.count_elements().get(element_count));
        ASSERT_EQUAL(element_count, 2);
        // Unsigned index: it is only ever used to subscript `expected`.
        size_t i = 0;
        for (auto a : arr)
        {
          int64_t actual;
          ASSERT_SUCCESS(a.get(actual));
          ASSERT_EQUAL(actual, expected[i++]);
        }
        break;
      }
      case 3:
      {
        // Fourth document: the object {"hello": "world"}.
        ondemand::object obj;
        ASSERT_SUCCESS(doc.get_object().get(obj));
        std::string_view sv;
        // Check the lookup itself so a missing field is reported as such
        // rather than as a value mismatch on an unset string_view.
        ASSERT_SUCCESS(obj.find_field("hello").get(sv));
        ASSERT_EQUAL(sv, "world");
        break;
      }
      default:
        TEST_FAIL("Too many cases")
    }
  }

  // Exactly four documents, and nothing left in the stream afterwards.
  ASSERT_EQUAL(cnt, 4);
  ASSERT_TRUE(!(it != end));

  TEST_SUCCEED();
}

#if SIMDJSON_EXCEPTIONS

bool leading_comma() {
TEST_START();
auto json = R"(,1)"_padded;
ondemand::parser parser;
ondemand::document_stream doc_stream;
ASSERT_SUCCESS(parser.iterate_many(json, json.size(), true).get(doc_stream));

try {
auto begin = doc_stream.begin();
auto end = doc_stream.end();
for (auto it = begin; it != end; ++it) {}
} catch (simdjson_error& e) {
ASSERT_ERROR(e.error(), TAPE_ERROR);
}

TEST_SUCCEED();
}

#endif

// Runs every test in this file in declaration order and stops at the first
// failure. Returns true only when all of them passed.
bool run() {
  if (!normal()) { return false; }
  if (!small_batch_size()) { return false; }
  if (!trailing_comma()) { return false; }
  if (!check_parsed_values()) { return false; }
#if SIMDJSON_EXCEPTIONS
  // Only built when exceptions are enabled.
  if (!leading_comma()) { return false; }
#endif
  return true;
}

}

// Entry point: passes the command line and this file's test runner to
// test_main and returns whatever it reports.
int main(int argc, char *argv[]) {
  const int exit_code = test_main(argc, argv, iterate_many_csv_tests::run);
  return exit_code;
}