Skip to content

Commit

Permalink
feat: implement unset operator for update API
Browse files Browse the repository at this point in the history
  • Loading branch information
himank committed Oct 10, 2022
1 parent fdf06cc commit 853434e
Show file tree
Hide file tree
Showing 4 changed files with 299 additions and 23 deletions.
57 changes: 41 additions & 16 deletions query/update/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,18 +16,19 @@ package update

import (
"fmt"
"strings"

"github.com/buger/jsonparser"
jsoniter "github.com/json-iterator/go"
"github.com/tigrisdata/tigris/errors"
"github.com/tigrisdata/tigris/util/log"
)

// FieldOPType is the field operator passed in the Update API.
type FieldOPType string

const (
Set FieldOPType = "$set"
Set FieldOPType = "$set"
UnSet FieldOPType = "$unset"
)

// BuildFieldOperators un-marshals request "fields" present in the Update API and returns a FieldOperatorFactory
Expand All @@ -43,6 +44,8 @@ func BuildFieldOperators(reqFields []byte) (*FieldOperatorFactory, error) {
for op, val := range decodedOperators {
if op == string(Set) {
operators[string(Set)] = NewFieldOperator(Set, val)
} else if op == string(UnSet) {
operators[string(UnSet)] = NewFieldOperator(UnSet, val)
}
}

Expand All @@ -57,30 +60,52 @@ type FieldOperatorFactory struct {
FieldOperators map[string]*FieldOperator
}

// MergeAndGet method to converts the input to the output after applying all the operators.
// MergeAndGet method to converts the input to the output after applying all the operators. First "$set" operation is
// applied and then "$unset" which means if a field is present in both $set and $unset then it won't be stored in the
// resulting document.
func (factory *FieldOperatorFactory) MergeAndGet(existingDoc jsoniter.RawMessage) (jsoniter.RawMessage, error) {
setFieldOp := factory.FieldOperators[string(Set)]
if setFieldOp == nil {
return nil, errors.InvalidArgument("set operator not present in the fields parameter")
out := existingDoc
var err error
if setFieldOp, ok := factory.FieldOperators[string(Set)]; ok {
if out, err = factory.set(out, setFieldOp.Input); err != nil {
return nil, err
}
}
out, err := factory.apply(existingDoc, setFieldOp.Document)
if err != nil {
if unsetFieldOp, ok := factory.FieldOperators[string(UnSet)]; ok {
if out, err = factory.remove(out, unsetFieldOp.Input); err != nil {
return nil, err
}
}

return out, nil
}

func (factory *FieldOperatorFactory) remove(out jsoniter.RawMessage, toRemove jsoniter.RawMessage) (jsoniter.RawMessage, error) {
var unsetArray []string
if err := jsoniter.Unmarshal(toRemove, &unsetArray); err != nil {
return nil, err
}

for _, unset := range unsetArray {
unsetKeys := strings.Split(unset, ".")
out = jsonparser.Delete(out, unsetKeys...)
}

return out, nil
}

func (factory *FieldOperatorFactory) apply(input jsoniter.RawMessage, setDoc jsoniter.RawMessage) (jsoniter.RawMessage, error) {
func (factory *FieldOperatorFactory) set(existingDoc jsoniter.RawMessage, setDoc jsoniter.RawMessage) (jsoniter.RawMessage, error) {
var (
output []byte = input
output []byte = existingDoc
err error
)
err = jsonparser.ObjectEach(setDoc, func(key []byte, value []byte, dataType jsonparser.ValueType, offset int) error {
if dataType == jsonparser.String {
value = []byte(fmt.Sprintf(`"%s"`, value))
}
output, err = jsonparser.Set(output, value, string(key))

keys := strings.Split(string(key), ".")
output, err = jsonparser.Set(output, value, keys...)
if err != nil {
return err
}
Expand All @@ -97,16 +122,16 @@ func (factory *FieldOperatorFactory) apply(input jsoniter.RawMessage, setDoc jso
// A FieldOperator can be of the following type:
// { "$set": { <field1>: <value1>, ... } }
// { "$incr": { <field1>: <value> } }
// { "$remove": ["d"] }.
// { "$unset": ["d"] }.
type FieldOperator struct {
Op FieldOPType
Document jsoniter.RawMessage
Op FieldOPType
Input jsoniter.RawMessage
}

// NewFieldOperator returns a FieldOperator.
func NewFieldOperator(op FieldOPType, val jsoniter.RawMessage) *FieldOperator {
return &FieldOperator{
Op: op,
Document: val,
Op: op,
Input: val,
}
}
137 changes: 137 additions & 0 deletions query/update/operator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,10 @@ import (
"fmt"
"testing"

"github.com/buger/jsonparser"
jsoniter "github.com/json-iterator/go"
"github.com/stretchr/testify/require"
"github.com/tigrisdata/tigris/lib/json"
)

func TestMergeAndGet(t *testing.T) {
Expand Down Expand Up @@ -54,6 +56,11 @@ func TestMergeAndGet(t *testing.T) {
[]byte(`{"a": 1, "b": "foo", "c": 1.01, "d": {"f": 22, "g": 44}}`),
[]byte(`{"a": 1.000000022, "b": "foo", "c": 23, "d": {"f": 22, "g": 44},"e":"again"}`),
Set,
}, {
[]byte(`{"e": "again", "d.f": 29, "d.g": "bar", "d.h": "new nested"}`),
[]byte(`{"a":1, "b":"foo", "c":1.01, "d": {"f": 22, "g": "foo"}}`),
[]byte(`{"a":1, "b":"foo", "c":1.01, "d": {"f": 29, "g": "bar","h":"new nested"},"e":"again"}`),
Set,
},
}
for _, c := range cases {
Expand All @@ -67,6 +74,56 @@ func TestMergeAndGet(t *testing.T) {
}
}

func TestMergeAndGetWithUnset(t *testing.T) {
cases := []struct {
inputSet jsoniter.RawMessage
inputRemove jsoniter.RawMessage
existingDoc jsoniter.RawMessage
outputDoc jsoniter.RawMessage
}{
{
[]byte(`{"a":10}`),
[]byte(`["a", "nested"]`),
[]byte(`{"a":1,"b":"first","c":1.01,"nested":{"f":22,"g":44}}`),
[]byte(`{"b":"first","c":1.01}`),
}, {
[]byte(`{"b":"second","a":10}`),
[]byte(`["c"]`),
[]byte(`{"a":1,"b":"first","c":1.01,"nested":{"f":22,"g":44}}`),
[]byte(`{"a":10,"b":"second","nested":{"f":22,"g":44}}`),
}, {
[]byte(`{"b":"second","c":10.22}`),
[]byte(`["nested.f"]`),
[]byte(`{"a":1,"b":"first","c":1.01,"nested":{"f":22,"g":44}}`),
[]byte(`{"a":1,"b":"second","c":10.22,"nested":{"g":44}}`),
}, {
[]byte(`{"c":10.000022,"e":"not_present"}`),
[]byte(`["nested.f", "nested.g"]`),
[]byte(`{"a":1,"b":"first","c":1.01,"nested":{"f":22,"g": 4}}`),
[]byte(`{"a":1,"b":"first","c":10.000022,"nested":{},"e":"not_present"}`),
}, {
[]byte(`{"e":"not_present","a":1.000000022,"c":23}`),
[]byte(`["c", "b"]`),
[]byte(`{"a":1,"b":"first","c":1.01,"nested":{"f":22,"g":44}}`),
[]byte(`{"a":1.000000022,"nested":{"f":22,"g":44},"e":"not_present"}`),
}, {
[]byte(`{"e":"not_present","nested.f":29,"nested.g":"bar","nested.h":"new nested"}`),
[]byte(`["z", "y"]`),
[]byte(`{"a":1,"b":"first","c":1.01,"nested":{"f":22,"g":"foo"}}`),
[]byte(`{"a":1,"b":"first","c":1.01,"nested":{"f":29,"g":"bar","h":"new nested"},"e":"not_present"}`),
},
}
for _, c := range cases {
reqInput := []byte(fmt.Sprintf(`{"$set": %s, "$unset": %s}`, c.inputSet, c.inputRemove))
f, err := BuildFieldOperators(reqInput)
require.NoError(t, err)

actualOut, err := f.MergeAndGet(c.existingDoc)
require.NoError(t, err)
require.Equal(t, c.outputDoc, actualOut, fmt.Sprintf("exp '%s' actual '%s'", string(c.outputDoc), string(actualOut)))
}
}

func TestMergeAndGet_MarshalInput(t *testing.T) {
cases := []struct {
inputDoc map[string]interface{}
Expand Down Expand Up @@ -108,3 +165,83 @@ func TestMergeAndGet_MarshalInput(t *testing.T) {
require.JSONEqf(t, string(c.outputDoc), string(actualOut), fmt.Sprintf("exp '%s' actual '%s'", string(c.outputDoc), string(actualOut)))
}
}

func BenchmarkSetNoDeserialization(b *testing.B) {
existingDoc := []byte(`{
"name": "Women's Fiona Handbag",
"brand": "Michael Cors",
"labels": "Handbag, Purse, Women's fashion",
"price": 99999.12345,
"key": "1",
"categories": ["random", "fashion", "handbags", "women's"],
"description": "A typical product catalog will have many json objects like this. This benchmark is testing if not deserializing is better than deserializing JSON inputs and existing doc.",
"random": "abc defg hij klm nopqr stuv wxyz 1234 56 78 90 abcd efghijkl mnopqrstuvwxyzA BCD EFGHIJKL MNOPQRS TUVW XYZ"
}`)

f, err := BuildFieldOperators([]byte(`{"$set": {"b": "bar", "a": 10}}`))
require.NoError(b, err)
for i := 0; i < b.N; i++ {
err = f.testSetNoDeserialization(existingDoc, []byte(`{"$set": {"name": "Men's Wallet", "labels": "Handbag, Purse, Men's fashion, shoes, clothes", "price": 75}}`))
require.NoError(b, err)
}
}

func BenchmarkSetDeserializeInput(b *testing.B) {
existingDoc := []byte(`{
"name": "Women's Fiona Handbag",
"brand": "Michael Cors",
"labels": "Handbag, Purse, Women's fashion",
"price": 99999.12345,
"key": "1",
"categories": ["random", "fashion", "handbags", "women's"],
"description": "A typical product catalog will have many json objects like this. This benchmark is testing if deserializing is better than not deserializing JSON inputs and existing doc.",
"random": "abc defg hij klm nopqr stuv wxyz 1234 56 78 90 abcd efghijkl mnopqrstuvwxyzA BCD EFGHIJKL MNOPQRS TUVW XYZ"
}`)

f, err := BuildFieldOperators([]byte(`{"$set": {"b": "bar", "a": 10}}`))
require.NoError(b, err)

for i := 0; i < b.N; i++ {
mp, err := json.Decode(existingDoc)
require.NoError(b, err)

err = f.testSetDeserializeInput(mp, []byte(`{"$set": {"name": "Men's Wallet", "labels": "Handbag, Purse, Men's fashion, shoes, clothes", "price": 75}}`))
require.NoError(b, err)
}
}

func (factory *FieldOperatorFactory) testSetDeserializeInput(outMap map[string]any, setDoc jsoniter.RawMessage) error {
setMap, err := json.Decode(setDoc)
if err != nil {
return err
}

for key, value := range setMap {
outMap[key] = value
}

return nil
}

func (factory *FieldOperatorFactory) testSetNoDeserialization(input jsoniter.RawMessage, setDoc jsoniter.RawMessage) error {
var (
output []byte = input
err error
)
err = jsonparser.ObjectEach(setDoc, func(key []byte, value []byte, dataType jsonparser.ValueType, offset int) error {
if dataType == jsonparser.String {
value = []byte(fmt.Sprintf(`"%s"`, value))
}
output, err = jsonparser.Set(output, value, string(key))
if err != nil {
return err
}
return nil
})

if err != nil {
return err
}

return nil
}
15 changes: 8 additions & 7 deletions server/services/v1/query_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -445,13 +445,12 @@ func (runner *UpdateQueryRunner) Run(ctx context.Context, tx transaction.Tx, ten
return nil, ctx, err
}

fieldOperator := factory.FieldOperators[string(update.Set)]
if fieldOperator == nil {
return nil, ctx, api.Errorf(api.Code_INVALID_ARGUMENT, "missing '$set operator")
}
fieldOperator.Document, err = runner.mutateAndValidatePayload(collection, fieldOperator.Document)
if err != nil {
return nil, ctx, err
if fieldOperator, ok := factory.FieldOperators[string(update.Set)]; ok {
// Set operation needs schema validation as well as mutation if we need to convert numeric fields from string to int64
fieldOperator.Input, err = runner.mutateAndValidatePayload(collection, fieldOperator.Input)
if err != nil {
return nil, ctx, err
}
}

table, err := runner.encoder.EncodeTableName(tenant.GetNamespace(), db, collection)
Expand Down Expand Up @@ -502,6 +501,8 @@ func (runner *UpdateQueryRunner) Run(ctx context.Context, tx transaction.Tx, ten
return nil, ctx, err
}

// MergeAndGet merge the user input with existing doc and return the merged JSON document which we need to
// persist back.
merged, er := factory.MergeAndGet(row.Data.RawData)
if er != nil {
return nil, ctx, err
Expand Down
Loading

0 comments on commit 853434e

Please sign in to comment.