diff --git a/api/api.go b/api/api.go index 64c385f357..af02f971cd 100644 --- a/api/api.go +++ b/api/api.go @@ -32,6 +32,7 @@ import ( etcd "github.com/coreos/etcd/client" "golang.org/x/net/context" + "github.com/skydive-project/skydive/common" "github.com/skydive-project/skydive/config" shttp "github.com/skydive-project/skydive/http" "github.com/skydive-project/skydive/logging" @@ -117,7 +118,7 @@ func (a *ApiServer) RegisterApiHandler(handler ApiHandler) error { // keep the original ID id := resource.ID() - if err := json.NewDecoder(r.Body).Decode(&resource); err != nil { + if err := common.JsonDecode(r.Body, &resource); err != nil { writeError(w, http.StatusBadRequest, err) return } diff --git a/cmd/client/gremlin.go b/cmd/client/gremlin.go index 4719a686ef..dd8fdfe4ab 100644 --- a/cmd/client/gremlin.go +++ b/cmd/client/gremlin.go @@ -30,6 +30,7 @@ import ( "net/http" "github.com/skydive-project/skydive/api" + "github.com/skydive-project/skydive/common" "github.com/skydive-project/skydive/flow" shttp "github.com/skydive-project/skydive/http" "github.com/skydive-project/skydive/topology/graph" @@ -63,10 +64,7 @@ func (g *GremlinQueryHelper) Query(query string, values interface{}) error { return fmt.Errorf("%s: %s", resp.Status, string(data)) } - decoder := json.NewDecoder(resp.Body) - decoder.UseNumber() - - if err = decoder.Decode(values); err != nil { + if err = common.JsonDecode(resp.Body, values); err != nil { return err } diff --git a/common/common.go b/common/common.go index 652eee8583..f589425fc7 100644 --- a/common/common.go +++ b/common/common.go @@ -23,8 +23,10 @@ package common import ( + "encoding/json" "errors" "fmt" + "io" "io/ioutil" "net" "os" @@ -416,3 +418,9 @@ func ServiceAddressFromString(addressPort string) (ServiceAddress, error) { Port: port, }, nil } + +func JsonDecode(r io.Reader, i interface{}) error { + decoder := json.NewDecoder(r) + decoder.UseNumber() + return decoder.Decode(i) +} diff --git a/http/client.go b/http/client.go index efd44149c4..b2b8764a61 100644 --- a/http/client.go +++ b/http/client.go @@ -30,6 +30,8 @@ import ( "io" "io/ioutil" "net/http" + + "github.com/skydive-project/skydive/common" ) type RestClient struct { @@ -99,7 +101,7 @@ func (c *CrudClient) List(resource string, values interface{}) error { return errors.New(fmt.Sprintf("Failed to list %s, %s: %s", resource, resp.Status, readBody(resp))) } - return json.NewDecoder(resp.Body).Decode(values) + return common.JsonDecode(resp.Body, values) } func (c *CrudClient) Get(resource string, id string, value interface{}) error { @@ -114,7 +116,7 @@ func (c *CrudClient) Get(resource string, id string, value interface{}) error { return errors.New(fmt.Sprintf("Failed to get %s, %s: %s", resource, resp.Status, readBody(resp))) } - return json.NewDecoder(resp.Body).Decode(value) + return common.JsonDecode(resp.Body, value) } func (c *CrudClient) Create(resource string, value interface{}) error { @@ -137,7 +139,7 @@ func (c *CrudClient) Create(resource string, value interface{}) error { return errors.New(fmt.Sprintf("Failed to create %s, %s: %s", resource, resp.Status, readBody(resp))) } - return json.NewDecoder(resp.Body).Decode(value) + return common.JsonDecode(resp.Body, value) } func (c *CrudClient) Update(resource string, id string, value interface{}) error { @@ -159,7 +161,7 @@ func (c *CrudClient) Update(resource string, id string, value interface{}) error return errors.New(fmt.Sprintf("Failed to update %s, %s: %s", resource, resp.Status, readBody(resp))) } - return json.NewDecoder(resp.Body).Decode(value) + return common.JsonDecode(resp.Body, value) } func (c *CrudClient) Delete(resource string, id string) error { diff --git a/packet_injector/server.go b/packet_injector/server.go index ec4666b2a4..0392962a7e 100644 --- a/packet_injector/server.go +++ b/packet_injector/server.go @@ -23,10 +23,11 @@ package packet_injector import ( - "encoding/json" + "bytes" "fmt" "net/http" + "github.com/skydive-project/skydive/common" shttp "github.com/skydive-project/skydive/http" "github.com/skydive-project/skydive/logging" "github.com/skydive-project/skydive/topology/graph" @@ -50,7 +51,7 @@ func (pis *PacketInjectorServer) injectPacket(msg shttp.WSMessage) (bool, string Payload string Count int }{} - if err := json.Unmarshal([]byte(*msg.Obj), ¶ms); err != nil { + if err := common.JsonDecode(bytes.NewBuffer([]byte(*msg.Obj)), ¶ms); err != nil { e := fmt.Sprintf("Unable to decode packet inject param message %v", msg) return false, e } diff --git a/tests/alert_test.go b/tests/alert_test.go index cf22e2a24c..f1d794e490 100644 --- a/tests/alert_test.go +++ b/tests/alert_test.go @@ -23,7 +23,7 @@ package tests import ( - "encoding/json" + "bytes" "fmt" "io/ioutil" "net" @@ -38,6 +38,7 @@ import ( "github.com/hydrogen18/stoppableListener" "github.com/skydive-project/skydive/alert" "github.com/skydive-project/skydive/api" + "github.com/skydive-project/skydive/common" "github.com/skydive-project/skydive/config" shttp "github.com/skydive-project/skydive/http" "github.com/skydive-project/skydive/tests/helper" @@ -46,7 +47,7 @@ import ( func checkMessage(t *testing.T, b []byte, al *api.Alert) (bool, error) { var alertMsg alert.AlertMessage - if err := json.Unmarshal(b, &alertMsg); err == nil { + if err := common.JsonDecode(bytes.NewReader(b), &alertMsg); err == nil { if alertMsg.UUID == al.UUID { var nodes []*graph.Node switch arr := alertMsg.ReasonData.(type) { @@ -254,7 +255,7 @@ func TestAlertWithTimer(t *testing.T) { } var msg shttp.WSMessage - if err = json.Unmarshal(m, &msg); err != nil { + if err = common.JsonDecode(bytes.NewReader(m), &msg); err != nil { t.Fatalf("Failed to unmarshal message: %s", err.Error()) } diff --git a/topology/graph/elasticsearch.go b/topology/graph/elasticsearch.go index 85ec8f1cff..2d6c02db03 100644 --- a/topology/graph/elasticsearch.go +++ b/topology/graph/elasticsearch.go @@ -23,12 +23,14 @@ package graph import ( + "bytes" "encoding/json" "errors" "fmt" "strings" "time" + "github.com/skydive-project/skydive/common" "github.com/skydive-project/skydive/config" "github.com/skydive-project/skydive/logging" "github.com/skydive-project/skydive/storage/elasticsearch" @@ -207,18 +209,15 @@ func (b *ElasticSearchBackend) getElement(kind string, i Identifier, t *time.Tim if resp.Found { var obj map[string]interface{} - if err := json.Unmarshal([]byte(*resp.Source), &obj); err != nil { + if err := common.JsonDecode(bytes.NewReader([]byte(*resp.Source)), &obj); err != nil { return err } - deletedAt, ok := obj["DeletedAt"].(float64) - if ok && deletedAt == 0 { - switch e := element.(type) { - case *Node: - e.Decode(obj) - case *Edge: - e.Decode(obj) - } + switch e := element.(type) { + case *Node: + e.Decode(obj) + case *Edge: + e.Decode(obj) } } @@ -289,7 +288,7 @@ func (b *ElasticSearchBackend) unflattenMetadata(obj map[string]interface{}) { func (b *ElasticSearchBackend) hitToNode(source *json.RawMessage, node *Node) error { var obj map[string]interface{} - if err := json.Unmarshal([]byte(*source), &obj); err != nil { + if err := common.JsonDecode(bytes.NewReader([]byte(*source)), &obj); err != nil { return err } b.unflattenMetadata(obj) @@ -301,7 +300,7 @@ func (b *ElasticSearchBackend) hitToNode(source *json.RawMessage, node *Node) er func (b *ElasticSearchBackend) hitToEdge(source *json.RawMessage, edge *Edge) error { var obj map[string]interface{} - if err := json.Unmarshal([]byte(*source), &obj); err != nil { + if err := common.JsonDecode(bytes.NewReader([]byte(*source)), &obj); err != nil { return err } b.unflattenMetadata(obj) diff --git a/topology/graph/message.go b/topology/graph/message.go index c1d98a2007..c4a8f36107 100644 --- a/topology/graph/message.go +++ b/topology/graph/message.go @@ -27,24 +27,21 @@ import ( "encoding/json" "time" + "github.com/skydive-project/skydive/common" shttp "github.com/skydive-project/skydive/http" ) func UnmarshalWSMessage(msg shttp.WSMessage) (string, interface{}, error) { + var obj interface{} + if err := common.JsonDecode(bytes.NewReader([]byte(*msg.Obj)), &obj); err != nil { + return "", msg, err + } + switch msg.Type { case "SyncRequest": - var obj map[string]interface{} - if msg.Obj != nil { - decoder := json.NewDecoder(bytes.NewReader([]byte(*msg.Obj))) - decoder.UseNumber() - - if err := decoder.Decode(&obj); err != nil { - return "", msg, err - } - } - + m := obj.(map[string]interface{}) var context GraphContext - switch v := obj["Time"].(type) { + switch v := m["Time"].(type) { case json.Number: i, err := v.Int64() if err != nil { @@ -57,17 +54,8 @@ func UnmarshalWSMessage(msg shttp.WSMessage) (string, interface{}, error) { return msg.Type, context, nil case "HostGraphDeleted": - var obj interface{} - if err := json.Unmarshal([]byte(*msg.Obj), &obj); err != nil { - return "", msg, err - } return msg.Type, obj, nil case "NodeUpdated", "NodeDeleted", "NodeAdded": - var obj interface{} - if err := json.Unmarshal([]byte(*msg.Obj), &obj); err != nil { - return "", msg, err - } - var node Node if err := node.Decode(obj); err != nil { return "", msg, err @@ -75,12 +63,6 @@ func UnmarshalWSMessage(msg shttp.WSMessage) (string, interface{}, error) { return msg.Type, &node, nil case "EdgeUpdated", "EdgeDeleted", "EdgeAdded": - var obj interface{} - err := json.Unmarshal([]byte(*msg.Obj), &obj) - if err != nil { - return "", msg, err - } - var edge Edge if err := edge.Decode(obj); err != nil { return "", msg, err diff --git a/topology/graph/orientdb/client.go b/topology/graph/orientdb/client.go index 517207c632..777ac34013 100644 --- a/topology/graph/orientdb/client.go +++ b/topology/graph/orientdb/client.go @@ -32,6 +32,8 @@ import ( "io/ioutil" "net/http" "strings" + + "github.com/skydive-project/skydive/common" ) type Document map[string]interface{} @@ -100,7 +102,7 @@ type DocumentClass struct { func parseError(body io.Reader) error { var errs Errors - if err := json.NewDecoder(body).Decode(&errs); err != nil { + if err := common.JsonDecode(body, &errs); err != nil { return fmt.Errorf("Error while parsing error: %s (%s)", err.Error(), body) } var s string @@ -134,9 +136,7 @@ func parseResponse(resp *http.Response, result interface{}) error { } else { content, _ := ioutil.ReadAll(body) if len(content) != 0 { - decoder := json.NewDecoder(bytes.NewBuffer(content)) - decoder.UseNumber() - if err := decoder.Decode(result); err != nil { + if err := common.JsonDecode(bytes.NewBuffer(content), result); err != nil { return fmt.Errorf("Error while parsing OrientDB response: %s (%s)", err.Error(), content) } } diff --git a/topology/probes/docker.go b/topology/probes/docker.go index 53a34ddfb7..b36e908ac7 100644 --- a/topology/probes/docker.go +++ b/topology/probes/docker.go @@ -23,7 +23,6 @@ package probes import ( - "encoding/json" "fmt" "io" "sync" @@ -200,10 +199,9 @@ func (probe *DockerProbe) connect() error { defer probe.wg.Done() - dec := json.NewDecoder(body) for { var event events.Message - err := dec.Decode(&event) + err := common.JsonDecode(body, &event) if err != nil { if err == io.EOF { break