Commit
lorepozo authored and zachjs committed Jun 10, 2016
1 parent 59a747a commit 12c5b1a
Showing 26 changed files with 1,480 additions and 226 deletions.
92 changes: 63 additions & 29 deletions mongoimport/common.go
@@ -4,19 +4,52 @@ import (
"bufio"
"bytes"
"fmt"
"io"
"sort"
"strconv"
"strings"
"sync"

"github.com/mongodb/mongo-tools/common/bsonutil"
"github.com/mongodb/mongo-tools/common/db"
"github.com/mongodb/mongo-tools/common/log"
"github.com/mongodb/mongo-tools/common/util"
"gopkg.in/mgo.v2/bson"
"gopkg.in/tomb.v2"
"io"
"sort"
"strconv"
"strings"
"sync"
)

type ParseGrace int

const (
pgAutoCast ParseGrace = iota
pgSkipField
pgSkipRow
pgStop
)

// ValidatePG ensures the user-provided parseGrace is one of the allowed
// values.
func ValidatePG(pg string) (ParseGrace, error) {
switch pg {
case "autoCast":
return pgAutoCast, nil
case "skipField":
return pgSkipField, nil
case "skipRow":
return pgSkipRow, nil
case "stop":
return pgStop, nil
default:
return pgAutoCast, fmt.Errorf("invalid parse grace: %s", pg)
}
}

// ParsePG interprets the user-provided parseGrace, assuming it is valid.
func ParsePG(pg string) (res ParseGrace) {
res, _ = ValidatePG(pg)
return
}
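
A minimal usage sketch (not part of this commit, assumed to compile alongside the code above in package mongoimport): ValidatePG is meant for validating the user-supplied value up front, while ParsePG is the non-validating variant that silently falls back to pgAutoCast on bad input.

// resolveParseGrace is a hypothetical helper showing the intended
// division of labor: validate the user-supplied value once, then
// reuse the parsed result everywhere else.
func resolveParseGrace(raw string) (ParseGrace, error) {
	pg, err := ValidatePG(raw) // e.g. raw comes from a parseGrace option
	if err != nil {
		return pgAutoCast, err // unknown mode: reject the options early
	}
	return pg, nil // one of pgAutoCast, pgSkipField, pgSkipRow, pgStop
}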

// Converter is an interface that adds the basic Convert method which returns a
// valid BSON document that has been converted by the underlying implementation.
// If conversion fails, err will be set.
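
The interface definition itself is folded out of this diff; judging from the comment above and the document/err pairs handled further down, its assumed shape is roughly:

// Assumed reconstruction (the real definition is not shown in this
// diff): Convert returns the converted BSON document, with err set
// on failure.
type Converter interface {
	Convert() (bson.D, error)
}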
@@ -169,22 +202,6 @@ func doSequentialStreaming(workers []*importWorker, readDocs chan Converter, out
}
}

// getParsedValue returns the appropriate concrete type for the given token.
// It first attempts to convert it to an int; if that doesn't succeed, it
// attempts conversion to a float; if that doesn't succeed, it returns the
// token as is.
func getParsedValue(token string) interface{} {
parsedInt, err := strconv.Atoi(token)
if err == nil {
return parsedInt
}
parsedFloat, err := strconv.ParseFloat(token, 64)
if err == nil {
return parsedFloat
}
return token
}
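
This commit removes getParsedValue in favor of per-column parsers, with autoParse (added elsewhere in the commit) as the fallback. A self-contained sketch of the int -> float -> string fallback both names describe, with expected results taken from the removed TestGetParsedValue below; autoParse is assumed to behave the same, as its use in tokensToBSON suggests:

package main

import (
	"fmt"
	"strconv"
)

// parseToken mirrors the fallback described above: try int, then
// float64, then return the raw string unchanged.
func parseToken(token string) interface{} {
	if i, err := strconv.Atoi(token); err == nil {
		return i
	}
	if f, err := strconv.ParseFloat(token, 64); err == nil {
		return f
	}
	return token
}

func main() {
	fmt.Println(parseToken("3"))   // 3 (int)
	fmt.Println(parseToken(".33")) // 0.33 (float64)
	fmt.Println(parseToken("sd"))  // sd (string)
}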

// getUpsertValue takes a given BSON document and a given field, and returns the
// field's associated value in the document. The field is specified using dot
notation for nested fields. e.g. "person.age" would return 34
@@ -328,23 +345,37 @@ func streamDocuments(ordered bool, numDecoders int, readDocs chan Converter, out
return
}

// tokensToBSON reads in a slice of records - along with ordered field names -
// tokensToBSON reads in a slice of records - along with ordered column names -
// and returns a BSON document for the record.
func tokensToBSON(fields, tokens []string, numProcessed uint64) (bson.D, error) {
func tokensToBSON(colSpecs []ColumnSpec, tokens []string, numProcessed uint64) (bson.D, error) {
log.Logf(log.DebugHigh, "got line: %v", tokens)
var parsedValue interface{}
document := bson.D{}
for index, token := range tokens {
parsedValue = getParsedValue(token)
if index < len(fields) {
if strings.Index(fields[index], ".") != -1 {
setNestedValue(fields[index], parsedValue, &document)
if index < len(colSpecs) {
parsedValue, err := colSpecs[index].Parser.Parse(token)
if err != nil {
switch colSpecs[index].ParseGrace {
case pgAutoCast:
parsedValue = autoParse(token)
case pgSkipField:
continue
case pgSkipRow:
return nil, nil
case pgStop:
return nil, fmt.Errorf("type coercion failure for field name %s on token %s in document #%v",
colSpecs[index].Name, token, numProcessed)
}
}
if strings.Index(colSpecs[index].Name, ".") != -1 {
setNestedValue(colSpecs[index].Name, parsedValue, &document)
} else {
document = append(document, bson.DocElem{fields[index], parsedValue})
document = append(document, bson.DocElem{colSpecs[index].Name, parsedValue})
}
} else {
parsedValue = autoParse(token)
key := "field" + strconv.Itoa(index)
if util.StringSliceContains(fields, key) {
if util.StringSliceContains(ColumnNames(colSpecs), key) {
return nil, fmt.Errorf("duplicate field name - on %v - for token #%v ('%v') in document #%v",
key, index+1, parsedValue, numProcessed)
}
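
The switch above is the core of the new behavior. Below is a hypothetical walk-through (not in this commit) of what tokensToBSON yields under each mode when a column's parser rejects its token; failingParser is a made-up stand-in, and the Parser field's interface is assumed from its use above to be Parse(string) (interface{}, error):

// failingParser is a hypothetical parser whose Parse always fails,
// used only to exercise each ParseGrace mode.
type failingParser struct{}

func (failingParser) Parse(in string) (interface{}, error) {
	return nil, fmt.Errorf("cannot parse %q", in)
}

func parseGraceWalkthrough() {
	specs := []ColumnSpec{{"age", failingParser{}, pgSkipRow}}
	doc, err := tokensToBSON(specs, []string{"not-a-number"}, 0)
	// pgAutoCast:  falls back to autoParse -> doc = {"age": "not-a-number"}
	// pgSkipField: the field is dropped; the rest of the row is kept
	// pgSkipRow:   doc == nil, err == nil  -> the row is silently skipped
	// pgStop:      doc == nil, err != nil  -> the import aborts
	_, _ = doc, err
}

The (nil, nil) return for pgSkipRow is why processDocuments, in the next hunk, gains a nil-document check before sending to the output channel.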
@@ -424,6 +455,9 @@ func (iw *importWorker) processDocuments(ordered bool) error {
if err != nil {
return err
}
if document == nil {
continue
}
iw.processedDocumentChan <- document
case <-iw.tomb.Dying():
return nil
137 changes: 85 additions & 52 deletions mongoimport/common_test.go
@@ -2,15 +2,16 @@ package mongoimport

import (
"fmt"
"io"
"testing"

"github.com/mongodb/mongo-tools/common/db"
"github.com/mongodb/mongo-tools/common/log"
"github.com/mongodb/mongo-tools/common/options"
"github.com/mongodb/mongo-tools/common/testutil"
. "github.com/smartystreets/goconvey/convey"
"gopkg.in/mgo.v2/bson"
"gopkg.in/tomb.v2"
"io"
"testing"
)

func init() {
@@ -23,29 +24,49 @@ var (
index = uint64(0)
csvConverters = []CSVConverter{
CSVConverter{
fields: []string{"field1", "field2", "field3"},
data: []string{"a", "b", "c"},
index: index,
colSpecs: []ColumnSpec{
{"field1", new(FieldAutoParser), pgAutoCast},
{"field2", new(FieldAutoParser), pgAutoCast},
{"field3", new(FieldAutoParser), pgAutoCast},
},
data: []string{"a", "b", "c"},
index: index,
},
CSVConverter{
fields: []string{"field4", "field5", "field6"},
data: []string{"d", "e", "f"},
index: index,
colSpecs: []ColumnSpec{
{"field4", new(FieldAutoParser), pgAutoCast},
{"field5", new(FieldAutoParser), pgAutoCast},
{"field6", new(FieldAutoParser), pgAutoCast},
},
data: []string{"d", "e", "f"},
index: index,
},
CSVConverter{
fields: []string{"field7", "field8", "field9"},
data: []string{"d", "e", "f"},
index: index,
colSpecs: []ColumnSpec{
{"field7", new(FieldAutoParser), pgAutoCast},
{"field8", new(FieldAutoParser), pgAutoCast},
{"field9", new(FieldAutoParser), pgAutoCast},
},
data: []string{"d", "e", "f"},
index: index,
},
CSVConverter{
fields: []string{"field10", "field11", "field12"},
data: []string{"d", "e", "f"},
index: index,
colSpecs: []ColumnSpec{
{"field10", new(FieldAutoParser), pgAutoCast},
{"field11", new(FieldAutoParser), pgAutoCast},
{"field12", new(FieldAutoParser), pgAutoCast},
},
data: []string{"d", "e", "f"},
index: index,
},
CSVConverter{
fields: []string{"field13", "field14", "field15"},
data: []string{"d", "e", "f"},
index: index,
colSpecs: []ColumnSpec{
{"field13", new(FieldAutoParser), pgAutoCast},
{"field14", new(FieldAutoParser), pgAutoCast},
{"field15", new(FieldAutoParser), pgAutoCast},
},
data: []string{"d", "e", "f"},
index: index,
},
}
expectedDocuments = []bson.D{
@@ -192,22 +213,6 @@ func TestConstructUpsertDocument(t *testing.T) {
})
}

func TestGetParsedValue(t *testing.T) {
testutil.VerifyTestType(t, testutil.UnitTestType)

Convey("Given a string token to parse", t, func() {
Convey("an int token should return the underlying int value", func() {
So(getParsedValue("3"), ShouldEqual, 3)
})
Convey("a float token should return the underlying float value", func() {
So(getParsedValue(".33"), ShouldEqual, 0.33)
})
Convey("a string token should return the underlying string value", func() {
So(getParsedValue("sd"), ShouldEqual, "sd")
})
})
}

func TestSetNestedValue(t *testing.T) {
testutil.VerifyTestType(t, testutil.UnitTestType)

@@ -296,22 +301,31 @@ func TestRemoveBlankFields(t *testing.T) {
func TestTokensToBSON(t *testing.T) {
testutil.VerifyTestType(t, testutil.UnitTestType)

Convey("Given an slice of fields and tokens to convert to BSON", t, func() {
Convey("the expected ordered BSON should be produced for the fields/tokens given", func() {
fields := []string{"a", "b", "c"}
Convey("Given an slice of column specs and tokens to convert to BSON", t, func() {
Convey("the expected ordered BSON should be produced for the given"+
"column specs and tokens", func() {
colSpecs := []ColumnSpec{
{"a", new(FieldAutoParser), pgAutoCast},
{"b", new(FieldAutoParser), pgAutoCast},
{"c", new(FieldAutoParser), pgAutoCast},
}
tokens := []string{"1", "2", "hello"}
expectedDocument := bson.D{
bson.DocElem{"a", 1},
bson.DocElem{"b", 2},
bson.DocElem{"c", "hello"},
}
bsonD, err := tokensToBSON(fields, tokens, uint64(0))
bsonD, err := tokensToBSON(colSpecs, tokens, uint64(0))
So(err, ShouldBeNil)
So(bsonD, ShouldResemble, expectedDocument)
})
Convey("if there are more tokens than fields, additional fields should be prefixed"+
" with 'fields' and an index indicating the header number", func() {
fields := []string{"a", "b", "c"}
colSpecs := []ColumnSpec{
{"a", new(FieldAutoParser), pgAutoCast},
{"b", new(FieldAutoParser), pgAutoCast},
{"c", new(FieldAutoParser), pgAutoCast},
}
tokens := []string{"1", "2", "hello", "mongodb", "user"}
expectedDocument := bson.D{
bson.DocElem{"a", 1},
@@ -320,18 +334,26 @@ func TestTokensToBSON(t *testing.T) {
bson.DocElem{"field3", "mongodb"},
bson.DocElem{"field4", "user"},
}
bsonD, err := tokensToBSON(fields, tokens, uint64(0))
bsonD, err := tokensToBSON(colSpecs, tokens, uint64(0))
So(err, ShouldBeNil)
So(bsonD, ShouldResemble, expectedDocument)
})
Convey("an error should be thrown if duplicate headers are found", func() {
fields := []string{"a", "b", "field3"}
colSpecs := []ColumnSpec{
{"a", new(FieldAutoParser), pgAutoCast},
{"b", new(FieldAutoParser), pgAutoCast},
{"field3", new(FieldAutoParser), pgAutoCast},
}
tokens := []string{"1", "2", "hello", "mongodb", "user"}
_, err := tokensToBSON(fields, tokens, uint64(0))
_, err := tokensToBSON(colSpecs, tokens, uint64(0))
So(err, ShouldNotBeNil)
})
Convey("fields with nested values should be set appropriately", func() {
fields := []string{"a", "b", "c.a"}
colSpecs := []ColumnSpec{
{"a", new(FieldAutoParser), pgAutoCast},
{"b", new(FieldAutoParser), pgAutoCast},
{"c.a", new(FieldAutoParser), pgAutoCast},
}
tokens := []string{"1", "2", "hello"}
expectedDocument := bson.D{
bson.DocElem{"a", 1},
@@ -340,7 +362,7 @@ func TestProcessDocuments(t *testing.T) {
bson.DocElem{"a", "hello"},
}},
}
bsonD, err := tokensToBSON(fields, tokens, uint64(0))
bsonD, err := tokensToBSON(colSpecs, tokens, uint64(0))
So(err, ShouldBeNil)
So(expectedDocument[0].Name, ShouldResemble, bsonD[0].Name)
So(expectedDocument[0].Value, ShouldResemble, bsonD[0].Value)
Expand All @@ -359,14 +381,22 @@ func TestProcessDocuments(t *testing.T) {
index := uint64(0)
csvConverters := []CSVConverter{
CSVConverter{
fields: []string{"field1", "field2", "field3"},
data: []string{"a", "b", "c"},
index: index,
colSpecs: []ColumnSpec{
{"field1", new(FieldAutoParser), pgAutoCast},
{"field2", new(FieldAutoParser), pgAutoCast},
{"field3", new(FieldAutoParser), pgAutoCast},
},
data: []string{"a", "b", "c"},
index: index,
},
CSVConverter{
fields: []string{"field4", "field5", "field6"},
data: []string{"d", "e", "f"},
index: index,
colSpecs: []ColumnSpec{
{"field4", new(FieldAutoParser), pgAutoCast},
{"field5", new(FieldAutoParser), pgAutoCast},
{"field6", new(FieldAutoParser), pgAutoCast},
},
data: []string{"d", "e", "f"},
index: index,
},
}
expectedDocuments := []bson.D{
@@ -498,9 +528,12 @@ func TestStreamDocuments(t *testing.T) {
Convey("the entire pipeline should complete with error if an error is encountered", func() {
// stream in some documents - create duplicate headers to simulate an error
csvConverter := CSVConverter{
fields: []string{"field1", "field2"},
data: []string{"a", "b", "c"},
index: uint64(0),
colSpecs: []ColumnSpec{
{"field1", new(FieldAutoParser), pgAutoCast},
{"field2", new(FieldAutoParser), pgAutoCast},
},
data: []string{"a", "b", "c"},
index: uint64(0),
}
inputChannel <- csvConverter
close(inputChannel)
