Skip to content

Commit

Permalink
common: use same decoder for json unmarshalling
Browse files Browse the repository at this point in the history
Change-Id: Icdcbe31d24dae9ed1b76c6847c9b05f51a0ca075
Reviewed-on: https://softwarefactory-project.io/r/6245
Reviewed-by: Nicolas PLANEL <nplanel@redhat.com>
Tested-by: Nicolas PLANEL <nplanel@redhat.com>
Workflow: Nicolas PLANEL <nplanel@redhat.com>
  • Loading branch information
lebauce committed Feb 17, 2017
1 parent defc920 commit 3f668d0
Show file tree
Hide file tree
Showing 10 changed files with 48 additions and 58 deletions.
3 changes: 2 additions & 1 deletion api/api.go
Expand Up @@ -32,6 +32,7 @@ import (
etcd "github.com/coreos/etcd/client"
"golang.org/x/net/context"

"github.com/skydive-project/skydive/common"
"github.com/skydive-project/skydive/config"
shttp "github.com/skydive-project/skydive/http"
"github.com/skydive-project/skydive/logging"
Expand Down Expand Up @@ -117,7 +118,7 @@ func (a *ApiServer) RegisterApiHandler(handler ApiHandler) error {
// keep the original ID
id := resource.ID()

if err := json.NewDecoder(r.Body).Decode(&resource); err != nil {
if err := common.JsonDecode(r.Body, &resource); err != nil {
writeError(w, http.StatusBadRequest, err)
return
}
Expand Down
6 changes: 2 additions & 4 deletions cmd/client/gremlin.go
Expand Up @@ -30,6 +30,7 @@ import (
"net/http"

"github.com/skydive-project/skydive/api"
"github.com/skydive-project/skydive/common"
"github.com/skydive-project/skydive/flow"
shttp "github.com/skydive-project/skydive/http"
"github.com/skydive-project/skydive/topology/graph"
Expand Down Expand Up @@ -63,10 +64,7 @@ func (g *GremlinQueryHelper) Query(query string, values interface{}) error {
return fmt.Errorf("%s: %s", resp.Status, string(data))
}

decoder := json.NewDecoder(resp.Body)
decoder.UseNumber()

if err = decoder.Decode(values); err != nil {
if err = common.JsonDecode(resp.Body, values); err != nil {
return err
}

Expand Down
8 changes: 8 additions & 0 deletions common/common.go
Expand Up @@ -23,8 +23,10 @@
package common

import (
"encoding/json"
"errors"
"fmt"
"io"
"io/ioutil"
"net"
"os"
Expand Down Expand Up @@ -416,3 +418,9 @@ func ServiceAddressFromString(addressPort string) (ServiceAddress, error) {
Port: port,
}, nil
}

func JsonDecode(r io.Reader, i interface{}) error {
decoder := json.NewDecoder(r)
decoder.UseNumber()
return decoder.Decode(i)
}
10 changes: 6 additions & 4 deletions http/client.go
Expand Up @@ -30,6 +30,8 @@ import (
"io"
"io/ioutil"
"net/http"

"github.com/skydive-project/skydive/common"
)

type RestClient struct {
Expand Down Expand Up @@ -99,7 +101,7 @@ func (c *CrudClient) List(resource string, values interface{}) error {
return errors.New(fmt.Sprintf("Failed to list %s, %s: %s", resource, resp.Status, readBody(resp)))
}

return json.NewDecoder(resp.Body).Decode(values)
return common.JsonDecode(resp.Body, values)
}

func (c *CrudClient) Get(resource string, id string, value interface{}) error {
Expand All @@ -114,7 +116,7 @@ func (c *CrudClient) Get(resource string, id string, value interface{}) error {
return errors.New(fmt.Sprintf("Failed to get %s, %s: %s", resource, resp.Status, readBody(resp)))
}

return json.NewDecoder(resp.Body).Decode(value)
return common.JsonDecode(resp.Body, value)
}

func (c *CrudClient) Create(resource string, value interface{}) error {
Expand All @@ -137,7 +139,7 @@ func (c *CrudClient) Create(resource string, value interface{}) error {
return errors.New(fmt.Sprintf("Failed to create %s, %s: %s", resource, resp.Status, readBody(resp)))
}

return json.NewDecoder(resp.Body).Decode(value)
return common.JsonDecode(resp.Body, value)
}

func (c *CrudClient) Update(resource string, id string, value interface{}) error {
Expand All @@ -159,7 +161,7 @@ func (c *CrudClient) Update(resource string, id string, value interface{}) error
return errors.New(fmt.Sprintf("Failed to update %s, %s: %s", resource, resp.Status, readBody(resp)))
}

return json.NewDecoder(resp.Body).Decode(value)
return common.JsonDecode(resp.Body, value)
}

func (c *CrudClient) Delete(resource string, id string) error {
Expand Down
5 changes: 3 additions & 2 deletions packet_injector/server.go
Expand Up @@ -23,10 +23,11 @@
package packet_injector

import (
"encoding/json"
"bytes"
"fmt"
"net/http"

"github.com/skydive-project/skydive/common"
shttp "github.com/skydive-project/skydive/http"
"github.com/skydive-project/skydive/logging"
"github.com/skydive-project/skydive/topology/graph"
Expand All @@ -50,7 +51,7 @@ func (pis *PacketInjectorServer) injectPacket(msg shttp.WSMessage) (bool, string
Payload string
Count int
}{}
if err := json.Unmarshal([]byte(*msg.Obj), &params); err != nil {
if err := common.JsonDecode(bytes.NewBuffer([]byte(*msg.Obj)), &params); err != nil {
e := fmt.Sprintf("Unable to decode packet inject param message %v", msg)
return false, e
}
Expand Down
7 changes: 4 additions & 3 deletions tests/alert_test.go
Expand Up @@ -23,7 +23,7 @@
package tests

import (
"encoding/json"
"bytes"
"fmt"
"io/ioutil"
"net"
Expand All @@ -38,6 +38,7 @@ import (
"github.com/hydrogen18/stoppableListener"
"github.com/skydive-project/skydive/alert"
"github.com/skydive-project/skydive/api"
"github.com/skydive-project/skydive/common"
"github.com/skydive-project/skydive/config"
shttp "github.com/skydive-project/skydive/http"
"github.com/skydive-project/skydive/tests/helper"
Expand All @@ -46,7 +47,7 @@ import (

func checkMessage(t *testing.T, b []byte, al *api.Alert) (bool, error) {
var alertMsg alert.AlertMessage
if err := json.Unmarshal(b, &alertMsg); err == nil {
if err := common.JsonDecode(bytes.NewReader(b), &alertMsg); err == nil {
if alertMsg.UUID == al.UUID {
var nodes []*graph.Node
switch arr := alertMsg.ReasonData.(type) {
Expand Down Expand Up @@ -254,7 +255,7 @@ func TestAlertWithTimer(t *testing.T) {
}

var msg shttp.WSMessage
if err = json.Unmarshal(m, &msg); err != nil {
if err = common.JsonDecode(bytes.NewReader(m), &msg); err != nil {
t.Fatalf("Failed to unmarshal message: %s", err.Error())
}

Expand Down
21 changes: 10 additions & 11 deletions topology/graph/elasticsearch.go
Expand Up @@ -23,12 +23,14 @@
package graph

import (
"bytes"
"encoding/json"
"errors"
"fmt"
"strings"
"time"

"github.com/skydive-project/skydive/common"
"github.com/skydive-project/skydive/config"
"github.com/skydive-project/skydive/logging"
"github.com/skydive-project/skydive/storage/elasticsearch"
Expand Down Expand Up @@ -207,18 +209,15 @@ func (b *ElasticSearchBackend) getElement(kind string, i Identifier, t *time.Tim

if resp.Found {
var obj map[string]interface{}
if err := json.Unmarshal([]byte(*resp.Source), &obj); err != nil {
if err := common.JsonDecode(bytes.NewReader([]byte(*resp.Source)), &obj); err != nil {
return err
}

deletedAt, ok := obj["DeletedAt"].(float64)
if ok && deletedAt == 0 {
switch e := element.(type) {
case *Node:
e.Decode(obj)
case *Edge:
e.Decode(obj)
}
switch e := element.(type) {
case *Node:
e.Decode(obj)
case *Edge:
e.Decode(obj)
}
}

Expand Down Expand Up @@ -289,7 +288,7 @@ func (b *ElasticSearchBackend) unflattenMetadata(obj map[string]interface{}) {

func (b *ElasticSearchBackend) hitToNode(source *json.RawMessage, node *Node) error {
var obj map[string]interface{}
if err := json.Unmarshal([]byte(*source), &obj); err != nil {
if err := common.JsonDecode(bytes.NewReader([]byte(*source)), &obj); err != nil {
return err
}
b.unflattenMetadata(obj)
Expand All @@ -301,7 +300,7 @@ func (b *ElasticSearchBackend) hitToNode(source *json.RawMessage, node *Node) er

func (b *ElasticSearchBackend) hitToEdge(source *json.RawMessage, edge *Edge) error {
var obj map[string]interface{}
if err := json.Unmarshal([]byte(*source), &obj); err != nil {
if err := common.JsonDecode(bytes.NewReader([]byte(*source)), &obj); err != nil {
return err
}
b.unflattenMetadata(obj)
Expand Down
34 changes: 8 additions & 26 deletions topology/graph/message.go
Expand Up @@ -27,24 +27,21 @@ import (
"encoding/json"
"time"

"github.com/skydive-project/skydive/common"
shttp "github.com/skydive-project/skydive/http"
)

func UnmarshalWSMessage(msg shttp.WSMessage) (string, interface{}, error) {
var obj interface{}
if err := common.JsonDecode(bytes.NewReader([]byte(*msg.Obj)), &obj); err != nil {
return "", msg, err
}

switch msg.Type {
case "SyncRequest":
var obj map[string]interface{}
if msg.Obj != nil {
decoder := json.NewDecoder(bytes.NewReader([]byte(*msg.Obj)))
decoder.UseNumber()

if err := decoder.Decode(&obj); err != nil {
return "", msg, err
}
}

m := obj.(map[string]interface{})
var context GraphContext
switch v := obj["Time"].(type) {
switch v := m["Time"].(type) {
case json.Number:
i, err := v.Int64()
if err != nil {
Expand All @@ -57,30 +54,15 @@ func UnmarshalWSMessage(msg shttp.WSMessage) (string, interface{}, error) {
return msg.Type, context, nil

case "HostGraphDeleted":
var obj interface{}
if err := json.Unmarshal([]byte(*msg.Obj), &obj); err != nil {
return "", msg, err
}
return msg.Type, obj, nil
case "NodeUpdated", "NodeDeleted", "NodeAdded":
var obj interface{}
if err := json.Unmarshal([]byte(*msg.Obj), &obj); err != nil {
return "", msg, err
}

var node Node
if err := node.Decode(obj); err != nil {
return "", msg, err
}

return msg.Type, &node, nil
case "EdgeUpdated", "EdgeDeleted", "EdgeAdded":
var obj interface{}
err := json.Unmarshal([]byte(*msg.Obj), &obj)
if err != nil {
return "", msg, err
}

var edge Edge
if err := edge.Decode(obj); err != nil {
return "", msg, err
Expand Down
8 changes: 4 additions & 4 deletions topology/graph/orientdb/client.go
Expand Up @@ -32,6 +32,8 @@ import (
"io/ioutil"
"net/http"
"strings"

"github.com/skydive-project/skydive/common"
)

type Document map[string]interface{}
Expand Down Expand Up @@ -100,7 +102,7 @@ type DocumentClass struct {

func parseError(body io.Reader) error {
var errs Errors
if err := json.NewDecoder(body).Decode(&errs); err != nil {
if err := common.JsonDecode(body, &errs); err != nil {
return fmt.Errorf("Error while parsing error: %s (%s)", err.Error(), body)
}
var s string
Expand Down Expand Up @@ -134,9 +136,7 @@ func parseResponse(resp *http.Response, result interface{}) error {
} else {
content, _ := ioutil.ReadAll(body)
if len(content) != 0 {
decoder := json.NewDecoder(bytes.NewBuffer(content))
decoder.UseNumber()
if err := decoder.Decode(result); err != nil {
if err := common.JsonDecode(bytes.NewBuffer(content), result); err != nil {
return fmt.Errorf("Error while parsing OrientDB response: %s (%s)", err.Error(), content)
}
}
Expand Down
4 changes: 1 addition & 3 deletions topology/probes/docker.go
Expand Up @@ -23,7 +23,6 @@
package probes

import (
"encoding/json"
"fmt"
"io"
"sync"
Expand Down Expand Up @@ -200,10 +199,9 @@ func (probe *DockerProbe) connect() error {

defer probe.wg.Done()

dec := json.NewDecoder(body)
for {
var event events.Message
err := dec.Decode(&event)
err := common.JsonDecode(body, &event)
if err != nil {
if err == io.EOF {
break
Expand Down

0 comments on commit 3f668d0

Please sign in to comment.