Skip to content

Commit

Permalink
patch: include a json merge patch field for UPDATE operations (#22)
Browse files Browse the repository at this point in the history
* proto: add patch field

* patch: include a json merge patch field

* patch: add basic tests

* patch: cover some happy cases
  • Loading branch information
tmc committed Sep 29, 2017
1 parent 9a78603 commit d02f3bd
Show file tree
Hide file tree
Showing 8 changed files with 253 additions and 37 deletions.
40 changes: 40 additions & 0 deletions patch.go
@@ -0,0 +1,40 @@
package pqstream

import (
"bytes"

jsonpatch "github.com/evanphx/json-patch"
"github.com/golang/protobuf/jsonpb"
ptypes_struct "github.com/golang/protobuf/ptypes/struct"
)

func generatePatch(a, b *ptypes_struct.Struct) (*ptypes_struct.Struct, error) {
abytes := &bytes.Buffer{}
bbytes := &bytes.Buffer{}
m := &jsonpb.Marshaler{}

if a != nil {
if err := m.Marshal(abytes, a); err != nil {
return nil, err
}
}
if b != nil {
if err := m.Marshal(bbytes, b); err != nil {
return nil, err
}
}
if abytes.Len() == 0 {
abytes.Write([]byte("{}"))
}
if bbytes.Len() == 0 {
bbytes.Write([]byte("{}"))
}
p, err := jsonpatch.CreateMergePatch(abytes.Bytes(), bbytes.Bytes())
if err != nil {
return nil, err
}
r := &ptypes_struct.Struct{}
rbytes := bytes.NewReader(p)
err = (&jsonpb.Unmarshaler{}).Unmarshal(rbytes, r)
return r, err
}
50 changes: 50 additions & 0 deletions patch_test.go
@@ -0,0 +1,50 @@
package pqstream

import (
"testing"

"github.com/golang/protobuf/jsonpb"
ptypes_struct "github.com/golang/protobuf/ptypes/struct"
"github.com/google/go-cmp/cmp"
)

func Test_generatePatch(t *testing.T) {
type args struct {
a *ptypes_struct.Struct
b *ptypes_struct.Struct
}
tests := []struct {
name string
args args
wantJson string
wantErr bool
}{
{"nils", args{nil, nil}, "{}", false},
{"empties", args{&ptypes_struct.Struct{}, &ptypes_struct.Struct{}}, "{}", false},
{"basic", args{&ptypes_struct.Struct{}, &ptypes_struct.Struct{
map[string]*ptypes_struct.Value{
"foo": &ptypes_struct.Value{
Kind: &ptypes_struct.Value_StringValue{
StringValue: "bar",
},
},
},
}}, `{"foo":"bar"}`, false},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, err := generatePatch(tt.args.a, tt.args.b)
if (err != nil) != tt.wantErr {
t.Errorf("generatePatch() error = %v, wantErr %v", err, tt.wantErr)
return
}
gotJson, err := (&jsonpb.Marshaler{}).MarshalToString(got)
if err != nil {
t.Error(err)
}
if !cmp.Equal(gotJson, tt.wantJson) {
t.Errorf("generatePatch() = %v, want %v\n%s", gotJson, tt.wantJson, cmp.Diff(gotJson, tt.wantJson))
}
})
}
}
117 changes: 94 additions & 23 deletions pqs/pqstream.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 12 additions & 0 deletions pqstream.proto
Expand Up @@ -23,6 +23,16 @@ enum Operation {
TRUNCATE = 4;
}

// RawEvent is an internal type.
message RawEvent {
string schema = 1;
string table = 2;
Operation op = 3;
string id = 4;
google.protobuf.Struct payload = 5;
google.protobuf.Struct previous = 6;
}

// A database event.
message Event {
string schema = 1;
Expand All @@ -32,5 +42,7 @@ message Event {
string id = 4;
// payload is a json encoded representation of the changed object.
google.protobuf.Struct payload = 5;
// patch is, in the event of op==UPDATE an RFC7386 JSON merge patch.
google.protobuf.Struct patch = 6;
}

15 changes: 14 additions & 1 deletion queries.go
Expand Up @@ -11,20 +11,33 @@ SELECT table_name
CREATE OR REPLACE FUNCTION pqstream_notify() RETURNS TRIGGER AS $$
DECLARE
payload json;
previous json;
notification json;
BEGIN
IF (TG_OP = 'DELETE') THEN
payload = row_to_json(OLD);
ELSE
payload = row_to_json(NEW);
END IF;
IF (TG_OP = 'UPDATE') THEN
previous = row_to_json(OLD);
END IF;
notification = json_build_object(
'schema', TG_TABLE_SCHEMA,
'table', TG_TABLE_NAME,
'op', TG_OP,
'id', json_extract_path(payload, 'id')::text,
'payload', payload);
'payload', payload,
'previous', previous);
IF (length(notification::text) > 8000) THEN
notification = json_build_object(
'schema', TG_TABLE_SCHEMA,
'table', TG_TABLE_NAME,
'op', TG_OP,
'id', json_extract_path(payload, 'id')::text,
'payload', payload);
END IF;
IF (length(notification::text) > 8000) THEN
notification = json_build_object(
'schema', TG_TABLE_SCHEMA,
Expand Down
16 changes: 12 additions & 4 deletions redactions.go
Expand Up @@ -15,13 +15,21 @@ func WithFieldRedactions(r FieldRedactions) ServerOption {

// redactFields search through redactionMap if there's any redacted fields
// specified that match the fields of the current event.
func (s *Server) redactFields(e *pqs.Event) {
func (s *Server) redactFields(e *pqs.RawEvent) {
if tables, ok := s.redactions[e.GetSchema()]; ok {
if fields, ok := tables[e.GetTable()]; ok {
for _, rf := range fields {
if _, ok := e.Payload.Fields[rf]; ok {
//remove field from payload
delete(e.Payload.Fields, rf)
if e.Payload != nil {
if _, ok := e.Payload.Fields[rf]; ok {
//remove field from payload
delete(e.Payload.Fields, rf)
}
}
if e.Previous != nil {
if _, ok := e.Previous.Fields[rf]; ok {
//remove field from previous payload
delete(e.Previous.Fields, rf)
}
}
}
}
Expand Down
16 changes: 11 additions & 5 deletions redactions_test.go
Expand Up @@ -23,7 +23,7 @@ func TestServer_redactFields(t *testing.T) {
t.Fatal(err)
}

event := &pqs.Event{
event := &pqs.RawEvent{
Schema: "public",
Table: "users",
Payload: &google_protobuf.Struct{
Expand All @@ -46,19 +46,25 @@ func TestServer_redactFields(t *testing.T) {

type args struct {
redactions FieldRedactions
incoming *pqs.Event
expected *pqs.Event
incoming *pqs.RawEvent
expected *pqs.RawEvent
}
tests := []struct {
name string
args args
}{
{"nil", args{redactions: rfields, incoming: nil}},
{"nil payload", args{redactions: rfields, incoming: &pqs.RawEvent{}}},
{"nil payload", args{redactions: rfields, incoming: &pqs.RawEvent{
Schema: "public",
Table: "users",
}}},
{
name: "found",
args: args{
redactions: rfields,
incoming: event,
expected: &pqs.Event{
expected: &pqs.RawEvent{
Schema: "public",
Table: "users",
Payload: &google_protobuf.Struct{
Expand Down Expand Up @@ -89,7 +95,7 @@ func TestServer_redactFields(t *testing.T) {
s.redactions = tt.args.redactions
s.redactFields(tt.args.incoming)

if got := tt.args.incoming; !cmp.Equal(got, tt.args.expected) {
if got := tt.args.incoming; tt.args.expected != nil && !cmp.Equal(got, tt.args.expected) {
t.Errorf("s.redactFields()= %v, want %v", got, tt.args.expected)
}
})
Expand Down

0 comments on commit d02f3bd

Please sign in to comment.