diff --git a/src/kafka/encode_decode.go b/src/kafka/encode_decode.go index a654d14..33fa25d 100644 --- a/src/kafka/encode_decode.go +++ b/src/kafka/encode_decode.go @@ -10,10 +10,10 @@ import ( "sync" - "github.com/inloco/kafka-elasticsearch-injector/src/models" - "github.com/inloco/kafka-elasticsearch-injector/src/schema_registry" "github.com/Shopify/sarama" "github.com/inloco/goavro" + "github.com/inloco/kafka-elasticsearch-injector/src/models" + "github.com/inloco/kafka-elasticsearch-injector/src/schema_registry" ) // DecodeMessageFunc extracts a user-domain request object from an Kafka @@ -93,12 +93,13 @@ func makeTimestamp(timestamp time.Time) int64 { func (d *Decoder) JsonMessageToRecord(context context.Context, msg *sarama.ConsumerMessage) (*models.Record, error) { var jsonValue map[string]interface{} err := json.Unmarshal(msg.Value, &jsonValue) - jsonValue[kafkaTimestampKey] = makeTimestamp(msg.Timestamp) if err != nil { return nil, err } + jsonValue[kafkaTimestampKey] = makeTimestamp(msg.Timestamp) + return &models.Record{ Topic: msg.Topic, Partition: msg.Partition, diff --git a/src/kafka/encode_decode_test.go b/src/kafka/encode_decode_test.go index c1118ae..a088e6d 100644 --- a/src/kafka/encode_decode_test.go +++ b/src/kafka/encode_decode_test.go @@ -35,3 +35,17 @@ func TestDecoder_JsonMessageToRecord(t *testing.T) { assert.Nil(t, err) assert.Equal(t, val, returnedVal) } + +func TestDecoder_JsonMessageToRecord_MalformedJson(t *testing.T) { + d := &Decoder{CodecCache: sync.Map{}} + jsonBytes := []byte(`{"alo": 60"`) + record, err := d.JsonMessageToRecord(context.Background(), &sarama.ConsumerMessage{ + Value: jsonBytes, + Topic: "test", + Partition: 1, + Offset: 54, + Timestamp: time.Now(), + }) + assert.Nil(t, record) + assert.NotNil(t, err) +}