Skip to content

Commit

Permalink
refactor addTypeSuffix
Browse files Browse the repository at this point in the history
  • Loading branch information
bughou committed Dec 4, 2018
1 parent 2894593 commit ac56329
Show file tree
Hide file tree
Showing 5 changed files with 245 additions and 143 deletions.
2 changes: 1 addition & 1 deletion collector/reader/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func (r *Reader) readSize(targetSize int) (rows []map[string]interface{}, drain
for err == nil && size < targetSize {
var line []byte
if line, err = r.readLine(); len(line) > 0 {
if row := r.parseLine(line); row != nil {
if row := r.parseLine(line); len(row) > 0 {
rows = append(rows, row)
size += len(line)
}
Expand Down
4 changes: 3 additions & 1 deletion collector/reader/reader_support.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,9 @@ func (r *Reader) readLine() ([]byte, error) {

func (r *Reader) parseLine(line []byte) map[string]interface{} {
var row map[string]interface{}
if err := json.Unmarshal(line, &row); err == nil {
var decoder = json.NewDecoder(bytes.NewReader(line))
decoder.UseNumber()
if err := decoder.Decode(&row); err == nil {
return row
} else {
if line = bytes.TrimSpace(line); len(line) > 0 {
Expand Down
81 changes: 81 additions & 0 deletions outputs/elasticsearch/typesuffix.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
package elasticsearch

import (
"encoding/json"
"strings"
)

func arrayAddTypeSuffixToMapKeys(rows []map[string]interface{}, mapping map[string]interface{}) {
var excludes map[string]interface{}
if len(mapping) > 0 {
excludes, _ = mapping["properties"].(map[string]interface{})
}
for _, row := range rows {
addTypeSuffixToMapKeys(row, excludes)
}
}

func addTypeSuffixToMapKeys(
m map[string]interface{}, excludes map[string]interface{},
) {
suffixes := make(map[string]uint8)
for k, v := range m {
var ex interface{}
var exMap map[string]interface{}
if len(excludes) > 0 {
ex = excludes[k]
exMap, _ = ex.(map[string]interface{})
}
if suffix := getTypeSuffix(v, exMap); suffix > 0 && ex == nil {
suffixes[k] = suffix
}
}
for k, suffix := range suffixes {
m[k+string([]byte{'_', suffix})] = m[k]
}
for k := range suffixes {
delete(m, k)
}
}

func getTypeSuffix(v interface{}, mapping map[string]interface{}) uint8 {
// Unmarshal into interface, generates the 5 data type, see encoding/json/#Unmarshal
switch value := v.(type) {
case string:
return 's'
case json.Number:
if strings.IndexByte(string(value), '.') >= 0 {
return 'f'
} else {
return 'i'
}
case bool:
return 'b'
case map[string]interface{}:
var excludes map[string]interface{}
if len(mapping) > 0 {
excludes, _ = mapping["properties"].(map[string]interface{})
}
addTypeSuffixToMapKeys(value, excludes)
return 'o'
case []interface{}:
return getArrayTypeSuffix(value, mapping)
}
return 0
}

func getArrayTypeSuffix(slice []interface{}, mapping map[string]interface{}) uint8 {
if len(slice) == 0 {
return 0
}
suffix := getTypeSuffix(slice[0], mapping)
if suffix == 0 {
return 0
}
for i := 1; i < len(slice); i++ {
if suffix != getTypeSuffix(slice[i], mapping) {
return 0
}
}
return suffix
}
159 changes: 159 additions & 0 deletions outputs/elasticsearch/typesuffix_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
package elasticsearch

import (
"encoding/json"
"fmt"
"log"
"sort"
)

func ExampleAddTypeSuffixToMapKeys_empty() {
m := map[string]interface{}{}
addTypeSuffixToMapKeys(m, nil)
fmt.Println(m)

// Output:
// map[]
}

func ExampleAddTypeSuffixToMapKeys_basic() {
m := map[string]interface{}{
"bool": true,
"float": json.Number("12.3"),
"int": json.Number("123"),
"intArray": []interface{}{json.Number("123")},
"str": "str",
}
addTypeSuffixToMapKeys(m, nil)
sortPrint(m)

// Output:
// bool_b: true
// float_f: 12.3
// intArray_i: [123]
// int_i: 123
// str_s: str
}

func ExampleAddTypeSuffixToMapKeys_basicWithExcludes() {
m := map[string]interface{}{
"bool": true,
"float": json.Number("12.3"),
"int": json.Number("123"),
"intArray": []interface{}{json.Number("123")},
"str": "str",
}
addTypeSuffixToMapKeys(m, map[string]interface{}{
"bool": "bool",
"float": map[string]string{"type": "float"},
})
sortPrint(m)

// Output:
// bool: true
// float: 12.3
// intArray_i: [123]
// int_i: 123
// str_s: str
}

func ExampleAddTypeSuffixToMapKeys_object() {
m := map[string]interface{}{
"object": map[string]interface{}{
"bool": true,
"boolArray": []interface{}{true, false},
"float": json.Number("12.3"),
"int": json.Number("123"),
"str": "str",
},
}
addTypeSuffixToMapKeys(m, nil)
sortPrint(m["object_o"].(map[string]interface{}))
// Output:
// boolArray_b: [true false]
// bool_b: true
// float_f: 12.3
// int_i: 123
// str_s: str
}

func ExampleAddTypeSuffixToMapKeys_objectWithExcludes() {
m := map[string]interface{}{
"object": map[string]interface{}{
"bool": true,
"boolArray": []interface{}{true, false},
"float": json.Number("12.3"),
"int": json.Number("123"),
"str": "str",
},
}
addTypeSuffixToMapKeys(m, map[string]interface{}{
"object": map[string]interface{}{
"properties": map[string]interface{}{
"bool": "bool",
"float": map[string]string{"type": "float"},
},
},
})
sortPrint(m["object"].(map[string]interface{}))
// Output:
// bool: true
// boolArray_b: [true false]
// float: 12.3
// int_i: 123
// str_s: str
}

func ExampleAddTypeSuffixToMapKeys_objectWithExcludes2() {
m := map[string]interface{}{
"object": map[string]interface{}{
"bool": true,
"boolArray": []interface{}{true, false},
"float": json.Number("12.3"),
"int": json.Number("123"),
"str": "str",
},
}
addTypeSuffixToMapKeys(m, map[string]interface{}{
"object": map[string]interface{}{},
})
sortPrint(m["object"].(map[string]interface{}))
// Output:
// boolArray_b: [true false]
// bool_b: true
// float_f: 12.3
// int_i: 123
// str_s: str
}

func ExampleUnmarshal() {
var data = make(map[string]interface{})
if err := json.Unmarshal([]byte(`{
"int": 1,
"intArray": [1,2],
"objectArray": [ { "k": "v" } ],
"map": { "k": "v" }
}`), &data); err != nil {
log.Panic(err)
}
fmt.Printf("%T\n", data["int"])
fmt.Printf("%T\n", data["intArray"])
fmt.Printf("%T\n", data["objectArray"])
fmt.Printf("%T\n", data["map"])
// Output:
// float64
// []interface {}
// []interface {}
// map[string]interface {}
}

func sortPrint(m map[string]interface{}) {
var slice [][2]interface{}
for k, v := range m {
slice = append(slice, [2]interface{}{k, v})
}
sort.Slice(slice, func(i, j int) bool { return slice[i][0].(string) < slice[j][0].(string) })
for _, row := range slice {
fmt.Printf("%s: %v\n", row[0], row[1])
}
}

0 comments on commit ac56329

Please sign in to comment.