Skip to content

Commit

Permalink
Reading multi-line JSON in string columns using runtime configurable …
Browse files Browse the repository at this point in the history
…delimiter (#15556)

Addresses #15277
Given a JSON lines buffer with records separated by a delimiter passed at runtime, the idea is to modify the JSON tokenization FST to consider the passed delimiter to generate EOL token instead of the newline character currently hard-coded. 
This PR does not modify the whitespace normalization FST to [strip out unquoted `\n` and `\r`](#14865 (comment)). Whitespace normalization will be handled in follow-up works.
Note that this is not a multi-object JSON reader since we are not using the offsets data in the string column, and hence there is no resetting of the start state at every row offset.

Current status:
- [X] Semantic bracket/brace DFA 
- [X] DFA removing excess characters after record in line
- [X] Pushdown automata generating tokens
- [x] Test passing arbitrary delimiter that does not occur in input to the reader

Authors:
  - Shruti Shivakumar (https://github.com/shrshi)

Approvers:
  - Paul Mattione (https://github.com/pmattione-nvidia)
  - Vukasin Milovanovic (https://github.com/vuule)
  - Elias Stehle (https://github.com/elstehle)
  - Karthikeyan (https://github.com/karthikeyann)

URL: #15556
  • Loading branch information
shrshi committed May 20, 2024
1 parent e6e6761 commit 9ce1721
Show file tree
Hide file tree
Showing 5 changed files with 476 additions and 149 deletions.
45 changes: 45 additions & 0 deletions cpp/include/cudf/io/json.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,8 @@ class json_reader_options {
bool _lines = false;
// Parse mixed types as a string column
bool _mixed_types_as_string = false;
// Delimiter separating records in JSON lines
char _delimiter = '\n';
// Prune columns on read, selected based on the _dtypes option
bool _prune_columns = false;

Expand Down Expand Up @@ -229,6 +231,13 @@ class json_reader_options {
return base_padding + num_columns * column_bytes;
}

/**
* @brief Returns delimiter separating records in JSON lines
*
* @return Delimiter separating records in JSON lines
*/
char get_delimiter() const { return _delimiter; }

/**
* @brief Whether to read the file as a json object per line.
*
Expand Down Expand Up @@ -340,6 +349,30 @@ class json_reader_options {
*/
void set_byte_range_size(size_type size) { _byte_range_size = size; }

/**
* @brief Set delimiter separating records in JSON lines
*
* @param delimiter Delimiter separating records in JSON lines
*/
void set_delimiter(char delimiter)
{
switch (delimiter) {
case '{':
case '[':
case '}':
case ']':
case ',':
case ':':
case '"':
case '\'':
case '\\':
case ' ':
case '\t':
case '\r': CUDF_FAIL("Unsupported delimiter character.", std::invalid_argument); break;
}
_delimiter = delimiter;
}

/**
* @brief Set whether to read the file as a json object per line.
*
Expand Down Expand Up @@ -507,6 +540,18 @@ class json_reader_options_builder {
return *this;
}

/**
* @brief Set delimiter separating records in JSON lines
*
* @param delimiter Delimiter separating records in JSON lines
* @return this for chaining
*/
json_reader_options_builder& delimiter(char delimiter)
{
options.set_delimiter(delimiter);
return *this;
}

/**
* @brief Set whether to read the file as a json object per line.
*
Expand Down
6 changes: 4 additions & 2 deletions cpp/src/io/json/nested_json.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,8 @@ enum class stack_behavior_t : char {
PushPopWithoutReset,

/// Opening brackets and braces, [, {, push onto the stack, closing brackets and braces, ], }, pop
/// from the stack. Newline characters are considered delimiters and therefore reset to an empty
/// stack.
/// from the stack. Delimiter characters are passed when the stack context is constructed to
/// reset to an empty stack.
ResetOnDelimiter
};

Expand Down Expand Up @@ -198,11 +198,13 @@ namespace detail {
* within the context of a struct, a '[' represents that it is within the context of an array, and a
* '_' symbol that it is at the root of the JSON.
* @param[in] stack_behavior Specifies the stack's behavior
* @param[in] delimiter Specifies the delimiter to use as separator for JSON lines input
* @param[in] stream The cuda stream to dispatch GPU kernels to
*/
void get_stack_context(device_span<SymbolT const> json_in,
SymbolT* d_top_of_stack,
stack_behavior_t stack_behavior,
SymbolT delimiter,
rmm::cuda_stream_view stream);

/**
Expand Down
144 changes: 84 additions & 60 deletions cpp/src/io/json/nested_json_gpu.cu
Original file line number Diff line number Diff line change
Expand Up @@ -131,12 +131,13 @@ constexpr auto NUM_SYMBOL_GROUPS = static_cast<uint32_t>(dfa_symbol_group_id::NU
* @brief Function object to map (input_symbol,stack_context) tuples to a symbol group.
*/
struct SymbolPairToSymbolGroupId {
SymbolT delimiter = '\n';
CUDF_HOST_DEVICE SymbolGroupT operator()(thrust::tuple<SymbolT, StackSymbolT> symbol) const
{
auto const input_symbol = thrust::get<0>(symbol);
auto const stack_symbol = thrust::get<1>(symbol);
return static_cast<SymbolGroupT>(
input_symbol == '\n'
input_symbol == delimiter
? dfa_symbol_group_id::DELIMITER
: (stack_symbol == '_' ? dfa_symbol_group_id::ROOT : dfa_symbol_group_id::OTHER));
}
Expand Down Expand Up @@ -331,50 +332,72 @@ enum class dfa_symbol_group_id : uint8_t {
CLOSING_BRACKET, ///< Closing bracket SG: ]
QUOTE_CHAR, ///< Quote character SG: "
ESCAPE_CHAR, ///< Escape character SG: '\'
NEWLINE_CHAR, ///< Newline character SG: '\n'
DELIMITER_CHAR, ///< Delimiter character SG
OTHER_SYMBOLS, ///< SG implicitly matching all other characters
NUM_SYMBOL_GROUPS ///< Total number of symbol groups
};

constexpr auto TT_NUM_STATES = static_cast<StateT>(dfa_states::TT_NUM_STATES);
constexpr auto NUM_SYMBOL_GROUPS = static_cast<uint32_t>(dfa_symbol_group_id::NUM_SYMBOL_GROUPS);

// The i-th string representing all the characters of a symbol group
std::array<std::string, NUM_SYMBOL_GROUPS - 1> const symbol_groups{
{{"{"}, {"["}, {"}"}, {"]"}, {"\""}, {"\\"}, {"\n"}}};
// The DFA's starting state
constexpr auto start_state = static_cast<StateT>(TT_OOS);

// Transition table for the default JSON and JSON lines formats
std::array<std::array<dfa_states, NUM_SYMBOL_GROUPS>, TT_NUM_STATES> const transition_table{
{/* IN_STATE { [ } ] " \ \n OTHER */
/* TT_OOS */ {{TT_OOS, TT_OOS, TT_OOS, TT_OOS, TT_STR, TT_OOS, TT_OOS, TT_OOS}},
/* TT_STR */ {{TT_STR, TT_STR, TT_STR, TT_STR, TT_OOS, TT_ESC, TT_STR, TT_STR}},
/* TT_ESC */ {{TT_STR, TT_STR, TT_STR, TT_STR, TT_STR, TT_STR, TT_STR, TT_STR}}}};

// Transition table for the JSON lines format that recovers from invalid JSON lines
std::array<std::array<dfa_states, NUM_SYMBOL_GROUPS>, TT_NUM_STATES> const
resetting_transition_table{
template <typename SymbolT>
auto get_sgid_lut(SymbolT delim)
{
// The i-th string representing all the characters of a symbol group
std::array<std::vector<SymbolT>, NUM_SYMBOL_GROUPS - 1> symbol_groups{
{{'{'}, {'['}, {'}'}, {']'}, {'"'}, {'\\'}, {delim}}};

return symbol_groups;
}

auto get_transition_table(stack_behavior_t stack_behavior)
{
// Transition table for the default JSON and JSON lines formats
std::array<std::array<dfa_states, NUM_SYMBOL_GROUPS>, TT_NUM_STATES> const transition_table{
{/* IN_STATE { [ } ] " \ \n OTHER */
/* TT_OOS */ {{TT_OOS, TT_OOS, TT_OOS, TT_OOS, TT_STR, TT_OOS, TT_OOS, TT_OOS}},
/* TT_STR */ {{TT_STR, TT_STR, TT_STR, TT_STR, TT_OOS, TT_ESC, TT_OOS, TT_STR}},
/* TT_ESC */ {{TT_STR, TT_STR, TT_STR, TT_STR, TT_STR, TT_STR, TT_OOS, TT_STR}}}};

// Translation table for the default JSON and JSON lines formats
std::array<std::array<std::vector<char>, NUM_SYMBOL_GROUPS>, TT_NUM_STATES> const translation_table{
{/* IN_STATE { [ } ] " \ \n OTHER */
/* TT_OOS */ {{{'{'}, {'['}, {'}'}, {']'}, {}, {}, {}, {}}},
/* TT_STR */ {{{}, {}, {}, {}, {}, {}, {}, {}}},
/* TT_ESC */ {{{}, {}, {}, {}, {}, {}, {}, {}}}}};

// Translation table for the JSON lines format that recovers from invalid JSON lines
std::array<std::array<std::vector<char>, NUM_SYMBOL_GROUPS>, TT_NUM_STATES> const
resetting_translation_table{
{/* IN_STATE { [ } ] " \ \n OTHER */
/* TT_OOS */ {{{'{'}, {'['}, {'}'}, {']'}, {}, {}, {'\n'}, {}}},
/* TT_STR */ {{{}, {}, {}, {}, {}, {}, {'\n'}, {}}},
/* TT_ESC */ {{{}, {}, {}, {}, {}, {}, {'\n'}, {}}}}};
/* TT_STR */ {{TT_STR, TT_STR, TT_STR, TT_STR, TT_OOS, TT_ESC, TT_STR, TT_STR}},
/* TT_ESC */ {{TT_STR, TT_STR, TT_STR, TT_STR, TT_STR, TT_STR, TT_STR, TT_STR}}}};

// Transition table for the JSON lines format that recovers from invalid JSON lines
std::array<std::array<dfa_states, NUM_SYMBOL_GROUPS>, TT_NUM_STATES> const
resetting_transition_table{
{/* IN_STATE { [ } ] " \ \n OTHER */
/* TT_OOS */ {{TT_OOS, TT_OOS, TT_OOS, TT_OOS, TT_STR, TT_OOS, TT_OOS, TT_OOS}},
/* TT_STR */ {{TT_STR, TT_STR, TT_STR, TT_STR, TT_OOS, TT_ESC, TT_OOS, TT_STR}},
/* TT_ESC */ {{TT_STR, TT_STR, TT_STR, TT_STR, TT_STR, TT_STR, TT_OOS, TT_STR}}}};

// Transition table specialized on the choice of whether to reset on newlines
return (stack_behavior == stack_behavior_t::ResetOnDelimiter) ? resetting_transition_table
: transition_table;
}

auto get_translation_table(stack_behavior_t stack_behavior)
{
// Translation table for the default JSON and JSON lines formats
std::array<std::array<std::vector<char>, NUM_SYMBOL_GROUPS>, TT_NUM_STATES> const
translation_table{
{/* IN_STATE { [ } ] " \ <delim> OTHER */
/* TT_OOS */ {{{'{'}, {'['}, {'}'}, {']'}, {}, {}, {}, {}}},
/* TT_STR */ {{{}, {}, {}, {}, {}, {}, {}, {}}},
/* TT_ESC */ {{{}, {}, {}, {}, {}, {}, {}, {}}}}};

// Translation table for the JSON lines format that recovers from invalid JSON lines
std::array<std::array<std::vector<char>, NUM_SYMBOL_GROUPS>, TT_NUM_STATES> const
resetting_translation_table{
{/* IN_STATE { [ } ] " \ <delim> OTHER */
/* TT_OOS */ {{{'{'}, {'['}, {'}'}, {']'}, {}, {}, {'\n'}, {}}},
/* TT_STR */ {{{}, {}, {}, {}, {}, {}, {'\n'}, {}}},
/* TT_ESC */ {{{}, {}, {}, {}, {}, {}, {'\n'}, {}}}}};

// Translation table specialized on the choice of whether to reset on newlines
return stack_behavior == stack_behavior_t::ResetOnDelimiter ? resetting_translation_table
: translation_table;
}

// The DFA's starting state
constexpr auto start_state = static_cast<StateT>(TT_OOS);
} // namespace to_stack_op

// JSON tokenizer pushdown automaton
Expand Down Expand Up @@ -572,6 +595,7 @@ static __constant__ PdaSymbolGroupIdT tos_sg_to_pda_sgid[] = {
* visibly pushdown automaton (DVPA)
*/
struct PdaSymbolToSymbolGroupId {
SymbolT delimiter = '\n';
template <typename SymbolT, typename StackSymbolT>
__device__ __forceinline__ PdaSymbolGroupIdT
operator()(thrust::tuple<SymbolT, StackSymbolT> symbol_pair) const
Expand All @@ -593,8 +617,15 @@ struct PdaSymbolToSymbolGroupId {
// The relative symbol group id of the current input symbol
constexpr auto pda_sgid_lookup_size =
static_cast<int32_t>(sizeof(tos_sg_to_pda_sgid) / sizeof(tos_sg_to_pda_sgid[0]));
// We map the delimiter character to LINE_BREAK symbol group id, and the newline character
// to OTHER. Note that delimiter cannot be any of opening(closing) brace, bracket, quote,
// escape, comma, colon or whitespace characters.
auto const symbol_position =
symbol == delimiter
? static_cast<int32_t>('\n')
: (symbol == '\n' ? static_cast<int32_t>(delimiter) : static_cast<int32_t>(symbol));
PdaSymbolGroupIdT symbol_gid =
tos_sg_to_pda_sgid[min(static_cast<int32_t>(symbol), pda_sgid_lookup_size - 1)];
tos_sg_to_pda_sgid[min(symbol_position, pda_sgid_lookup_size - 1)];
return stack_idx * static_cast<PdaSymbolGroupIdT>(symbol_group_id::NUM_PDA_INPUT_SGS) +
symbol_gid;
}
Expand Down Expand Up @@ -1398,6 +1429,7 @@ namespace detail {
void get_stack_context(device_span<SymbolT const> json_in,
SymbolT* d_top_of_stack,
stack_behavior_t stack_behavior,
SymbolT delimiter,
rmm::cuda_stream_view stream)
{
check_input_size(json_in.size());
Expand All @@ -1423,20 +1455,11 @@ void get_stack_context(device_span<SymbolT const> json_in,
constexpr auto max_translation_table_size =
to_stack_op::NUM_SYMBOL_GROUPS * to_stack_op::TT_NUM_STATES;

// Transition table specialized on the choice of whether to reset on newlines
const auto transition_table = (stack_behavior == stack_behavior_t::ResetOnDelimiter)
? to_stack_op::resetting_transition_table
: to_stack_op::transition_table;

// Translation table specialized on the choice of whether to reset on newlines
const auto translation_table = (stack_behavior == stack_behavior_t::ResetOnDelimiter)
? to_stack_op::resetting_translation_table
: to_stack_op::translation_table;

auto json_to_stack_ops_fst = fst::detail::make_fst(
fst::detail::make_symbol_group_lut(to_stack_op::symbol_groups),
fst::detail::make_transition_table(transition_table),
fst::detail::make_translation_table<max_translation_table_size>(translation_table),
fst::detail::make_symbol_group_lut(to_stack_op::get_sgid_lut(delimiter)),
fst::detail::make_transition_table(to_stack_op::get_transition_table(stack_behavior)),
fst::detail::make_translation_table<max_translation_table_size>(
to_stack_op::get_translation_table(stack_behavior)),
stream);

// "Search" for relevant occurrence of brackets and braces that indicate the beginning/end
Expand Down Expand Up @@ -1539,16 +1562,16 @@ std::pair<rmm::device_uvector<PdaTokenT>, rmm::device_uvector<SymbolOffsetT>> ge
// Range of encapsulating function that parses to internal columnar data representation
CUDF_FUNC_RANGE();

auto const new_line_delimited_json = options.is_enabled_lines();
auto const delimited_json = options.is_enabled_lines();
auto const delimiter = options.get_delimiter();

// (!new_line_delimited_json) => JSON
// (new_line_delimited_json and recover_from_error) => JSON_LINES_RECOVER
// (new_line_delimited_json and !recover_from_error) => JSON_LINES
auto format = new_line_delimited_json
? (options.recovery_mode() == json_recovery_mode_t::RECOVER_WITH_NULL
? tokenizer_pda::json_format_cfg_t::JSON_LINES_RECOVER
: tokenizer_pda::json_format_cfg_t::JSON_LINES)
: tokenizer_pda::json_format_cfg_t::JSON;
// (!delimited_json) => JSON
// (delimited_json and recover_from_error) => JSON_LINES_RECOVER
// (delimited_json and !recover_from_error) => JSON_LINES
auto format = delimited_json ? (options.recovery_mode() == json_recovery_mode_t::RECOVER_WITH_NULL
? tokenizer_pda::json_format_cfg_t::JSON_LINES_RECOVER
: tokenizer_pda::json_format_cfg_t::JSON_LINES)
: tokenizer_pda::json_format_cfg_t::JSON;

// Prepare for PDA transducer pass, merging input symbols with stack symbols
auto const recover_from_error = (format == tokenizer_pda::json_format_cfg_t::JSON_LINES_RECOVER);
Expand All @@ -1559,7 +1582,7 @@ std::pair<rmm::device_uvector<PdaTokenT>, rmm::device_uvector<SymbolOffsetT>> ge
// Identify what is the stack context for each input character (JSON-root, struct, or list)
auto const stack_behavior =
recover_from_error ? stack_behavior_t::ResetOnDelimiter : stack_behavior_t::PushPopWithoutReset;
get_stack_context(json_in, stack_symbols.data(), stack_behavior, stream);
get_stack_context(json_in, stack_symbols.data(), stack_behavior, delimiter, stream);

// Input to the full pushdown automaton finite-state transducer, where a input symbol comprises
// the combination of a character from the JSON input together with the stack context for that
Expand All @@ -1573,7 +1596,7 @@ std::pair<rmm::device_uvector<PdaTokenT>, rmm::device_uvector<SymbolOffsetT>> ge
if (recover_from_error) {
auto fix_stack_of_excess_chars = fst::detail::make_fst(
fst::detail::make_symbol_group_lookup_op(
fix_stack_of_excess_chars::SymbolPairToSymbolGroupId{}),
fix_stack_of_excess_chars::SymbolPairToSymbolGroupId{delimiter}),
fst::detail::make_transition_table(fix_stack_of_excess_chars::transition_table),
fst::detail::make_translation_functor(fix_stack_of_excess_chars::TransduceInputOp{}),
stream);
Expand All @@ -1592,8 +1615,9 @@ std::pair<rmm::device_uvector<PdaTokenT>, rmm::device_uvector<SymbolOffsetT>> ge
constexpr auto max_translation_table_size =
tokenizer_pda::NUM_PDA_SGIDS *
static_cast<tokenizer_pda::StateT>(tokenizer_pda::pda_state_t::PD_NUM_STATES);

auto json_to_tokens_fst = fst::detail::make_fst(
fst::detail::make_symbol_group_lookup_op(tokenizer_pda::PdaSymbolToSymbolGroupId{}),
fst::detail::make_symbol_group_lookup_op(tokenizer_pda::PdaSymbolToSymbolGroupId{delimiter}),
fst::detail::make_transition_table(tokenizer_pda::get_transition_table(format)),
fst::detail::make_translation_table<max_translation_table_size>(
tokenizer_pda::get_translation_table(recover_from_error)),
Expand Down

0 comments on commit 9ce1721

Please sign in to comment.