diff --git a/dataflow-expression/src/eval/builtins.rs b/dataflow-expression/src/eval/builtins.rs index 4b74de4b2c..b542e97e81 100644 --- a/dataflow-expression/src/eval/builtins.rs +++ b/dataflow-expression/src/eval/builtins.rs @@ -611,6 +611,30 @@ impl BuiltinFunction { Ok(target_json.into()) } + BuiltinFunction::JsonbSet(target_json, key_path, new_json, create_if_missing) => { + let mut target_json = non_null!(target_json.eval(record)?).to_json()?; + + let key_path = non_null!(key_path.eval(record)?); + let key_path = key_path.as_array()?.values(); + + let new_json = non_null!(new_json.eval(record)?).to_json()?; + + let create_if_missing = match create_if_missing { + Some(create_if_missing) => { + bool::try_from(non_null!(create_if_missing.eval(record)?))? + } + None => true, + }; + + crate::eval::json::json_set( + &mut target_json, + key_path, + new_json, + create_if_missing, + )?; + + Ok(target_json.into()) + } BuiltinFunction::Coalesce(arg1, rest_args) => { let val1 = arg1.eval(record)?; let rest_vals = rest_args @@ -2002,5 +2026,189 @@ mod tests { test_non_null(object, &keys, "42", true, expected); } } + + mod jsonb_set { + use super::*; + + #[track_caller] + fn test_nullable( + json: &str, + keys: &str, + new_json: &str, + create_if_missing: Option<bool>, + expected_json: Option<&str>, + ) { + // Normalize formatting and convert. + let expected_json = expected_json + .map(normalize_json) + .map(DfValue::from) + .unwrap_or_default(); + + let create_if_missing_args: &[&str] = match create_if_missing { + // Test calling with explicit and implicit true `create_if_missing` argument. + Some(true) => &[", true", ""], + Some(false) => &[", false"], + + // Test null result.
+ None => &[", null"], + }; + + for create_if_missing_arg in create_if_missing_args { + let expr = + format!("jsonb_set({json}, {keys}, {new_json}{create_if_missing_arg})"); + assert_eq!( + eval_expr(&expr, PostgreSQL), + expected_json, + "incorrect result for `{expr}`" + ); + } + } + + #[track_caller] + fn test_non_null( + json: &str, + keys: &[&str], + new_json: &str, + create_if_missing: bool, + expected_json: &str, + ) { + test_nullable( + &format!("'{json}'"), + &strings_to_array_expr(keys), + &format!("'{new_json}'"), + Some(create_if_missing), + Some(expected_json), + ) + } + + #[track_caller] + fn test_error(json: &str, keys: &[&str], new_json: &str, create_if_missing: bool) { + let keys = strings_to_array_expr(keys); + let expr = + format!("jsonb_set('{json}', {keys}, '{new_json}', {create_if_missing})"); + + if let Ok(value) = try_eval_expr(&expr, PostgreSQL) { + panic!("Expected error for `{expr}`, got {value:?}"); + } + } + + #[test] + fn null_propagation() { + test_nullable("null", "array[]", "'42'", Some(false), None); + test_nullable("'{}'", "null", "'42'", Some(false), None); + test_nullable("'{}'", "array[]", "null", Some(false), None); + test_nullable("'{}'", "array[]", "'42'", None, None); + } + + #[test] + fn scalar() { + test_error("1", &["0"], "42", false); + test_error("1", &["0"], "42", true); + + test_error("1.5", &["0"], "42", false); + test_error("1.5", &["0"], "42", true); + + test_error("true", &["0"], "42", false); + test_error("true", &["0"], "42", true); + + test_error("\"hi\"", &["0"], "42", false); + test_error("\"hi\"", &["0"], "42", true); + + test_error("null", &["0"], "42", false); + test_error("null", &["0"], "42", true); + } + + #[test] + fn array_empty() { + test_non_null("[]", &["0"], "42", false, "[]"); + test_non_null("[]", &["0"], "42", true, "[42]"); + + test_non_null("[]", &["1"], "42", false, "[]"); + test_non_null("[]", &["1"], "42", true, "[42]"); + + test_non_null("[]", &["-1"], "42", false, "[]"); +
test_non_null("[]", &["-1"], "42", true, "[42]"); + } + + #[test] + fn array_positive() { + let array = "[0, 1, 2]"; + + // Positive start: + test_non_null(array, &["0"], "42", false, "[42, 1, 2]"); + test_non_null(array, &["0"], "42", true, "[42, 1, 2]"); + + // Positive middle: + test_non_null(array, &["1"], "42", false, "[0, 42, 2]"); + test_non_null(array, &["1"], "42", true, "[0, 42, 2]"); + + // Positive end: + test_non_null(array, &["2"], "42", false, "[0, 1, 42]"); + test_non_null(array, &["2"], "42", true, "[0, 1, 42]"); + + // Positive end, out-of-bounds: + test_non_null(array, &["3"], "42", false, "[0, 1, 2]"); + test_non_null(array, &["3"], "42", true, "[0, 1, 2, 42]"); + test_non_null(array, &["4"], "42", false, "[0, 1, 2]"); + test_non_null(array, &["4"], "42", true, "[0, 1, 2, 42]"); + } + + #[test] + fn array_negative() { + let array = "[0, 1, 2]"; + + // Negative start, out-of-bounds: + test_non_null(array, &["-4"], "42", false, "[0, 1, 2]"); + test_non_null(array, &["-4"], "42", true, "[42, 0, 1, 2]"); + test_non_null(array, &["-5"], "42", false, "[0, 1, 2]"); + test_non_null(array, &["-5"], "42", true, "[42, 0, 1, 2]"); + + // Negative start: + test_non_null(array, &["-3"], "42", false, "[42, 1, 2]"); + test_non_null(array, &["-3"], "42", true, "[42, 1, 2]"); + + // Negative middle: + test_non_null(array, &["-2"], "42", false, "[0, 42, 2]"); + test_non_null(array, &["-2"], "42", true, "[0, 42, 2]"); + + // Negative end: + test_non_null(array, &["-1"], "42", false, "[0, 1, 42]"); + test_non_null(array, &["-1"], "42", true, "[0, 1, 42]"); + } + + #[test] + fn object() { + // Empty: + test_non_null("{}", &["a"], "42", false, "{}"); + test_non_null("{}", &["a"], "42", true, r#"{"a": 42}"#); + + // Override: + test_non_null("{\"a\": 0}", &["a"], "42", false, "{\"a\": 42}"); + test_non_null("{\"a\": 0}", &["a"], "42", true, "{\"a\": 42}"); + + // Not found: + test_non_null("{}", &["a", "b"], "42", false, "{}"); + test_non_null("{}", &["a", "b"], "42", 
true, "{}"); + } + + #[test] + fn array_nested() { + let array = "[[[0, 1, 2]]]"; + let keys = ["0", "0", "0"]; + + test_non_null(array, &keys, "42", false, "[[[42, 1, 2]]]"); + test_non_null(array, &keys, "42", true, "[[[42, 1, 2]]]"); + } + + #[test] + fn object_nested() { + let object = r#"{ "a": { "b": { "c": 0 } } }"#; + let expected = r#"{ "a": { "b": { "c": 42 } } }"#; + let keys = ["a", "b", "c"]; + + test_non_null(object, &keys, "42", false, expected); + test_non_null(object, &keys, "42", true, expected); + } + } } } diff --git a/dataflow-expression/src/eval/json.rs b/dataflow-expression/src/eval/json.rs index ff7a594bd6..55123cc545 100644 --- a/dataflow-expression/src/eval/json.rs +++ b/dataflow-expression/src/eval/json.rs @@ -4,6 +4,7 @@ use std::mem; use readyset_data::DfValue; use readyset_errors::{invalid_err, ReadySetError, ReadySetResult}; +use serde_json::map::Entry as JsonEntry; use serde_json::{Number as JsonNumber, Value as JsonValue}; use crate::utils; @@ -237,6 +238,53 @@ pub(crate) fn json_insert<'k>( Ok(()) } +pub(crate) fn json_set<'k>( + target_json: &mut JsonValue, + key_path: impl IntoIterator<Item = &'k DfValue>, + new_json: JsonValue, + create_if_missing: bool, +) -> ReadySetResult<()> { + if json_is_scalar(target_json) { + return Err(invalid_err!("cannot set path in scalar")); + } + + let key_path: Vec<&str> = key_path + .into_iter() + .map(TryInto::try_into) + .collect::<ReadySetResult<_>>()?; + + let Some((&inner_key, search_keys)) = key_path.split_last() else { + return Ok(()); + }; + + match json_find_mut(target_json, search_keys)?
{ + Some(JsonValue::Array(array)) => { + let index = parse_json_index(inner_key) + .ok_or_else(|| parse_json_index_error(search_keys.len()))?; + + if let Some(entry) = utils::index_bidirectional_mut(array, index) { + *entry = new_json; + } else if create_if_missing { + let index = if index.is_negative() { 0 } else { array.len() }; + array.insert(index, new_json); + } + } + Some(JsonValue::Object(object)) => match object.entry(inner_key) { + JsonEntry::Occupied(mut entry) => { + entry.insert(new_json); + } + JsonEntry::Vacant(entry) => { + if create_if_missing { + entry.insert(new_json); + } + } + }, + _ => {} + } + + Ok(()) +} + /// Returns a mutable reference to an inner JSON value located at the end of a key path. fn json_find_mut<K>(mut json: &mut JsonValue, key_path: K) -> ReadySetResult<Option<&mut JsonValue>> where diff --git a/dataflow-expression/src/lib.rs b/dataflow-expression/src/lib.rs index 2d056242ed..322be3ab98 100644 --- a/dataflow-expression/src/lib.rs +++ b/dataflow-expression/src/lib.rs @@ -56,6 +56,8 @@ pub enum BuiltinFunction { JsonExtractPath { json: Expr, keys: Vec1<Expr> }, /// [`jsonb_insert`](https://www.postgresql.org/docs/current/functions-json.html) JsonbInsert(Expr, Expr, Expr, Option<Expr>), + /// [`jsonb_set`](https://www.postgresql.org/docs/current/functions-json.html) + JsonbSet(Expr, Expr, Expr, Option<Expr>), /// [`coalesce`](https://www.postgresql.org/docs/current/functions-conditional.html#FUNCTIONS-COALESCE-NVL-IFNULL) Coalesce(Expr, Vec<Expr>), /// [`concat`](https://dev.mysql.com/doc/refman/8.0/en/string-functions.html#function_concat) @@ -110,6 +112,7 @@ impl BuiltinFunction { JsonStripNulls { .. } => "json_strip_nulls", JsonExtractPath { .. } => "json_extract_path", JsonbInsert { .. } => "jsonb_insert", + JsonbSet { .. } => "jsonb_set", Coalesce { .. } => "coalesce", Concat { .. } => "concat", Substring { ..
} => "substring", @@ -160,7 +163,7 @@ impl Display for BuiltinFunction { JsonExtractPath { json, keys } => { write!(f, "({}, {})", json, keys.iter().join(", ")) } - JsonbInsert(arg1, arg2, arg3, arg4) => { + JsonbInsert(arg1, arg2, arg3, arg4) | JsonbSet(arg1, arg2, arg3, arg4) => { write!(f, "({arg1}, {arg2}, {arg3}")?; if let Some(arg4) = arg4 { write!(f, ", {arg4}")?; diff --git a/dataflow-expression/src/lower.rs b/dataflow-expression/src/lower.rs index c1565892cd..536830b947 100644 --- a/dataflow-expression/src/lower.rs +++ b/dataflow-expression/src/lower.rs @@ -248,6 +248,10 @@ impl BuiltinFunction { Self::JsonbInsert(next_arg()?, next_arg()?, next_arg()?, args.next()), DfType::Jsonb, ), + "jsonb_set" => ( + Self::JsonbSet(next_arg()?, next_arg()?, next_arg()?, args.next()), + DfType::Jsonb, + ), "coalesce" => { let arg1 = next_arg()?; let ty = arg1.ty().clone();