Skip to content

Commit

Permalink
dataflow-expression: Add jsonb_set PostgreSQL function
Browse files Browse the repository at this point in the history
The target JSON value is updated at the given key/index path with the
provided JSON value. If `create_if_missing` is true, the value will be
inserted if no value exists at the path.

Addresses: ENG-1504
Release-Note-Core: Added support for the PostgreSQL function
  `jsonb_set`.
Change-Id: I023fc9be2ad774bd72bbac44e1fc95487ae769e0
Reviewed-on: https://gerrit.readyset.name/c/readyset/+/3889
Tested-by: Buildkite CI
Reviewed-by: Nick Marino <nick@readyset.io>
Reviewed-by: Griffin Smith <griffin@readyset.io>
  • Loading branch information
nvzqz committed Dec 12, 2022
1 parent 15d53f7 commit 9c91a77
Show file tree
Hide file tree
Showing 4 changed files with 264 additions and 1 deletion.
208 changes: 208 additions & 0 deletions dataflow-expression/src/eval/builtins.rs
Expand Up @@ -611,6 +611,30 @@ impl BuiltinFunction {

Ok(target_json.into())
}
BuiltinFunction::JsonbSet(target_json, key_path, new_json, create_if_missing) => {
let mut target_json = non_null!(target_json.eval(record)?).to_json()?;

let key_path = non_null!(key_path.eval(record)?);
let key_path = key_path.as_array()?.values();

let new_json = non_null!(new_json.eval(record)?).to_json()?;

let create_if_missing = match create_if_missing {
Some(create_if_missing) => {
bool::try_from(non_null!(create_if_missing.eval(record)?))?
}
None => true,
};

crate::eval::json::json_set(
&mut target_json,
key_path,
new_json,
create_if_missing,
)?;

Ok(target_json.into())
}
BuiltinFunction::Coalesce(arg1, rest_args) => {
let val1 = arg1.eval(record)?;
let rest_vals = rest_args
Expand Down Expand Up @@ -2002,5 +2026,189 @@ mod tests {
test_non_null(object, &keys, "42", true, expected);
}
}

mod jsonb_set {
use super::*;

#[track_caller]
fn test_nullable(
json: &str,
keys: &str,
new_json: &str,
create_if_missing: Option<bool>,
expected_json: Option<&str>,
) {
// Normalize formatting and convert.
let expected_json = expected_json
.map(normalize_json)
.map(DfValue::from)
.unwrap_or_default();

let create_if_missing_args: &[&str] = match create_if_missing {
// Test calling with explicit and implicit true `create_if_missing` argument.
Some(true) => &[", true", ""],
Some(false) => &[", false"],

// Test null result.
None => &[", null"],
};

for create_if_missing_arg in create_if_missing_args {
let expr =
format!("jsonb_set({json}, {keys}, {new_json}{create_if_missing_arg})");
assert_eq!(
eval_expr(&expr, PostgreSQL),
expected_json,
"incorrect result for for `{expr}`"
);
}
}

#[track_caller]
fn test_non_null(
json: &str,
keys: &[&str],
new_json: &str,
create_if_missing: bool,
expected_json: &str,
) {
test_nullable(
&format!("'{json}'"),
&strings_to_array_expr(keys),
&format!("'{new_json}'"),
Some(create_if_missing),
Some(expected_json),
)
}

#[track_caller]
fn test_error(json: &str, keys: &[&str], new_json: &str, create_if_missing: bool) {
let keys = strings_to_array_expr(keys);
let expr =
format!("jsonb_set('{json}', {keys}, '{new_json}', {create_if_missing})");

if let Ok(value) = try_eval_expr(&expr, PostgreSQL) {
panic!("Expected error for `{expr}`, got {value:?}");
}
}

#[test]
fn null_propagation() {
test_nullable("null", "array[]", "'42'", Some(false), None);
test_nullable("'{}'", "null", "'42'", Some(false), None);
test_nullable("'{}'", "array[]", "null", Some(false), None);
test_nullable("'{}'", "array[]", "'42'", None, None);
}

#[test]
fn scalar() {
test_error("1", &["0"], "42", false);
test_error("1", &["0"], "42", true);

test_error("1.5", &["0"], "42", false);
test_error("1.5", &["0"], "42", true);

test_error("true", &["0"], "42", false);
test_error("true", &["0"], "42", true);

test_error("\"hi\"", &["0"], "42", false);
test_error("\"hi\"", &["0"], "42", true);

test_error("null", &["0"], "42", false);
test_error("null", &["0"], "42", true);
}

#[test]
fn array_empty() {
test_non_null("[]", &["0"], "42", false, "[]");
test_non_null("[]", &["0"], "42", true, "[42]");

test_non_null("[]", &["1"], "42", false, "[]");
test_non_null("[]", &["1"], "42", true, "[42]");

test_non_null("[]", &["-1"], "42", false, "[]");
test_non_null("[]", &["-1"], "42", true, "[42]");
}

#[test]
fn array_positive() {
let array = "[0, 1, 2]";

// Positive start:
test_non_null(array, &["0"], "42", false, "[42, 1, 2]");
test_non_null(array, &["0"], "42", true, "[42, 1, 2]");

// Positive middle:
test_non_null(array, &["1"], "42", false, "[0, 42, 2]");
test_non_null(array, &["1"], "42", true, "[0, 42, 2]");

// Positive end:
test_non_null(array, &["2"], "42", false, "[0, 1, 42]");
test_non_null(array, &["2"], "42", true, "[0, 1, 42]");

// Positive end, out-of-bounds:
test_non_null(array, &["3"], "42", false, "[0, 1, 2]");
test_non_null(array, &["3"], "42", true, "[0, 1, 2, 42]");
test_non_null(array, &["4"], "42", false, "[0, 1, 2]");
test_non_null(array, &["4"], "42", true, "[0, 1, 2, 42]");
}

#[test]
fn array_negative() {
let array = "[0, 1, 2]";

// Negative start, out-of-bounds:
test_non_null(array, &["-4"], "42", false, "[0, 1, 2]");
test_non_null(array, &["-4"], "42", true, "[42, 0, 1, 2]");
test_non_null(array, &["-5"], "42", false, "[0, 1, 2]");
test_non_null(array, &["-5"], "42", true, "[42, 0, 1, 2]");

// Negative start:
test_non_null(array, &["-3"], "42", false, "[42, 1, 2]");
test_non_null(array, &["-3"], "42", true, "[42, 1, 2]");

// Negative middle:
test_non_null(array, &["-2"], "42", false, "[0, 42, 2]");
test_non_null(array, &["-2"], "42", true, "[0, 42, 2]");

// Negative end:
test_non_null(array, &["-1"], "42", false, "[0, 1, 42]");
test_non_null(array, &["-1"], "42", true, "[0, 1, 42]");
}

#[test]
fn object() {
// Empty:
test_non_null("{}", &["a"], "42", false, "{}");
test_non_null("{}", &["a"], "42", true, r#"{"a": 42}"#);

// Override:
test_non_null("{\"a\": 0}", &["a"], "42", false, "{\"a\": 42}");
test_non_null("{\"a\": 0}", &["a"], "42", true, "{\"a\": 42}");

// Not found:
test_non_null("{}", &["a", "b"], "42", false, "{}");
test_non_null("{}", &["a", "b"], "42", true, "{}");
}

#[test]
fn array_nested() {
let array = "[[[0, 1, 2]]]";
let keys = ["0", "0", "0"];

test_non_null(array, &keys, "42", false, "[[[42, 1, 2]]]");
test_non_null(array, &keys, "42", true, "[[[42, 1, 2]]]");
}

#[test]
fn object_nested() {
let object = r#"{ "a": { "b": { "c": 0 } } }"#;
let expected = r#"{ "a": { "b": { "c": 42 } } }"#;
let keys = ["a", "b", "c"];

test_non_null(object, &keys, "42", false, expected);
test_non_null(object, &keys, "42", true, expected);
}
}
}
}
48 changes: 48 additions & 0 deletions dataflow-expression/src/eval/json.rs
Expand Up @@ -4,6 +4,7 @@ use std::mem;

use readyset_data::DfValue;
use readyset_errors::{invalid_err, ReadySetError, ReadySetResult};
use serde_json::map::Entry as JsonEntry;
use serde_json::{Number as JsonNumber, Value as JsonValue};

use crate::utils;
Expand Down Expand Up @@ -237,6 +238,53 @@ pub(crate) fn json_insert<'k>(
Ok(())
}

pub(crate) fn json_set<'k>(
target_json: &mut JsonValue,
key_path: impl IntoIterator<Item = &'k DfValue>,
new_json: JsonValue,
create_if_missing: bool,
) -> ReadySetResult<()> {
if json_is_scalar(target_json) {
return Err(invalid_err!("cannot set path in scalar"));
}

let key_path: Vec<&str> = key_path
.into_iter()
.map(TryInto::try_into)
.collect::<ReadySetResult<_>>()?;

let Some((&inner_key, search_keys)) = key_path.split_last() else {
return Ok(());
};

match json_find_mut(target_json, search_keys)? {
Some(JsonValue::Array(array)) => {
let index = parse_json_index(inner_key)
.ok_or_else(|| parse_json_index_error(search_keys.len()))?;

if let Some(entry) = utils::index_bidirectional_mut(array, index) {
*entry = new_json;
} else if create_if_missing {
let index = if index.is_negative() { 0 } else { array.len() };
array.insert(index, new_json);
}
}
Some(JsonValue::Object(object)) => match object.entry(inner_key) {
JsonEntry::Occupied(mut entry) => {
entry.insert(new_json);
}
JsonEntry::Vacant(entry) => {
if create_if_missing {
entry.insert(new_json);
}
}
},
_ => {}
}

Ok(())
}

/// Returns a mutable reference to an inner JSON value located at the end of a key path.
fn json_find_mut<K>(mut json: &mut JsonValue, key_path: K) -> ReadySetResult<Option<&mut JsonValue>>
where
Expand Down
5 changes: 4 additions & 1 deletion dataflow-expression/src/lib.rs
Expand Up @@ -56,6 +56,8 @@ pub enum BuiltinFunction {
JsonExtractPath { json: Expr, keys: Vec1<Expr> },
/// [`jsonb_insert`](https://www.postgresql.org/docs/current/functions-json.html)
JsonbInsert(Expr, Expr, Expr, Option<Expr>),
/// [`jsonb_set`](https://www.postgresql.org/docs/current/functions-json.html)
JsonbSet(Expr, Expr, Expr, Option<Expr>),
/// [`coalesce`](https://www.postgresql.org/docs/current/functions-conditional.html#FUNCTIONS-COALESCE-NVL-IFNULL)
Coalesce(Expr, Vec<Expr>),
/// [`concat`](https://dev.mysql.com/doc/refman/8.0/en/string-functions.html#function_concat)
Expand Down Expand Up @@ -110,6 +112,7 @@ impl BuiltinFunction {
JsonStripNulls { .. } => "json_strip_nulls",
JsonExtractPath { .. } => "json_extract_path",
JsonbInsert { .. } => "jsonb_insert",
JsonbSet { .. } => "jsonb_set",
Coalesce { .. } => "coalesce",
Concat { .. } => "concat",
Substring { .. } => "substring",
Expand Down Expand Up @@ -160,7 +163,7 @@ impl Display for BuiltinFunction {
JsonExtractPath { json, keys } => {
write!(f, "({}, {})", json, keys.iter().join(", "))
}
JsonbInsert(arg1, arg2, arg3, arg4) => {
JsonbInsert(arg1, arg2, arg3, arg4) | JsonbSet(arg1, arg2, arg3, arg4) => {
write!(f, "({arg1}, {arg2}, {arg3}")?;
if let Some(arg4) = arg4 {
write!(f, ", {arg4}")?;
Expand Down
4 changes: 4 additions & 0 deletions dataflow-expression/src/lower.rs
Expand Up @@ -248,6 +248,10 @@ impl BuiltinFunction {
Self::JsonbInsert(next_arg()?, next_arg()?, next_arg()?, args.next()),
DfType::Jsonb,
),
"jsonb_set" => (
Self::JsonbSet(next_arg()?, next_arg()?, next_arg()?, args.next()),
DfType::Jsonb,
),
"coalesce" => {
let arg1 = next_arg()?;
let ty = arg1.ty().clone();
Expand Down

0 comments on commit 9c91a77

Please sign in to comment.