diff --git a/mongoimport/common.go b/mongoimport/common.go index a3f98de48..2a9dcb5a6 100644 --- a/mongoimport/common.go +++ b/mongoimport/common.go @@ -4,19 +4,52 @@ import ( "bufio" "bytes" "fmt" + "io" + "sort" + "strconv" + "strings" + "sync" + "github.com/mongodb/mongo-tools/common/bsonutil" "github.com/mongodb/mongo-tools/common/db" "github.com/mongodb/mongo-tools/common/log" "github.com/mongodb/mongo-tools/common/util" "gopkg.in/mgo.v2/bson" "gopkg.in/tomb.v2" - "io" - "sort" - "strconv" - "strings" - "sync" ) +type ParseGrace int + +const ( + pgAutoCast ParseGrace = iota + pgSkipField + pgSkipRow + pgStop +) + +// ValidatePG ensures the user-provided parseGrace is one of the allowed +// values. +func ValidatePG(pg string) (ParseGrace, error) { + switch pg { + case "autoCast": + return pgAutoCast, nil + case "skipField": + return pgSkipField, nil + case "skipRow": + return pgSkipRow, nil + case "stop": + return pgStop, nil + default: + return pgAutoCast, fmt.Errorf("invalid parse grace: %s", pg) + } +} + +// ParsePG interprets the user-provided parseGrace, assuming it is valid. +func ParsePG(pg string) (res ParseGrace) { + res, _ = ValidatePG(pg) + return +} + // Converter is an interface that adds the basic Convert method which returns a // valid BSON document that has been converted by the underlying implementation. // If conversion fails, err will be set. @@ -169,22 +202,6 @@ func doSequentialStreaming(workers []*importWorker, readDocs chan Converter, out } } -// getParsedValue returns the appropriate concrete type for the given token -// it first attempts to convert it to an int, if that doesn't succeed, it -// attempts conversion to a float, if that doesn't succeed, it returns the -// token as is. -func getParsedValue(token string) interface{} { - parsedInt, err := strconv.Atoi(token) - if err == nil { - return parsedInt - } - parsedFloat, err := strconv.ParseFloat(token, 64) - if err == nil { - return parsedFloat - } - return token -} - // getUpsertValue takes a given BSON document and a given field, and returns the // field's associated value in the document. The field is specified using dot // notation for nested fields. e.g. "person.age" would return 34 would return @@ -328,23 +345,37 @@ func streamDocuments(ordered bool, numDecoders int, readDocs chan Converter, out return } -// tokensToBSON reads in slice of records - along with ordered fields names - +// tokensToBSON reads in a slice of records - along with ordered column names - // and returns a BSON document for the record.
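+// When a column's parser rejects a token, that column's ParseGrace decides +// the outcome (see the switch below): pgAutoCast falls back to autoParse, +// pgSkipField omits just that field, pgSkipRow makes tokensToBSON return +// (nil, nil) so callers drop the whole row, and pgStop aborts with an error. +// For example (illustrative only, not part of the original patch), the spec +// ColumnSpec{"n", new(FieldInt32Parser), pgSkipRow} given the token "x" +// yields a nil document rather than an error.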
-func tokensToBSON(fields, tokens []string, numProcessed uint64) (bson.D, error) { +func tokensToBSON(colSpecs []ColumnSpec, tokens []string, numProcessed uint64) (bson.D, error) { log.Logf(log.DebugHigh, "got line: %v", tokens) var parsedValue interface{} document := bson.D{} for index, token := range tokens { - parsedValue = getParsedValue(token) - if index < len(fields) { - if strings.Index(fields[index], ".") != -1 { - setNestedValue(fields[index], parsedValue, &document) + if index < len(colSpecs) { + parsedValue, err := colSpecs[index].Parser.Parse(token) + if err != nil { + switch colSpecs[index].ParseGrace { + case pgAutoCast: + parsedValue = autoParse(token) + case pgSkipField: + continue + case pgSkipRow: + return nil, nil + case pgStop: + return nil, fmt.Errorf("type coercion failure for field name %s on token %s in document #%v", + colSpecs[index].Name, token, numProcessed) + } + } + if strings.Index(colSpecs[index].Name, ".") != -1 { + setNestedValue(colSpecs[index].Name, parsedValue, &document) } else { - document = append(document, bson.DocElem{fields[index], parsedValue}) + document = append(document, bson.DocElem{colSpecs[index].Name, parsedValue}) } } else { + parsedValue = autoParse(token) key := "field" + strconv.Itoa(index) - if util.StringSliceContains(fields, key) { + if util.StringSliceContains(ColumnNames(colSpecs), key) { return nil, fmt.Errorf("duplicate field name - on %v - for token #%v ('%v') in document #%v", key, index+1, parsedValue, numProcessed) } @@ -424,6 +455,9 @@ func (iw *importWorker) processDocuments(ordered bool) error { if err != nil { return err } + if document == nil { + continue + } iw.processedDocumentChan <- document case <-iw.tomb.Dying(): return nil diff --git a/mongoimport/common_test.go b/mongoimport/common_test.go index fe3b134cc..d8a50ea22 100644 --- a/mongoimport/common_test.go +++ b/mongoimport/common_test.go @@ -2,6 +2,9 @@ package mongoimport import ( "fmt" + "io" + "testing" + "github.com/mongodb/mongo-tools/common/db" "github.com/mongodb/mongo-tools/common/log" "github.com/mongodb/mongo-tools/common/options" @@ -9,8 +12,6 @@ import ( . 
"github.com/smartystreets/goconvey/convey" "gopkg.in/mgo.v2/bson" "gopkg.in/tomb.v2" - "io" - "testing" ) func init() { @@ -23,29 +24,49 @@ var ( index = uint64(0) csvConverters = []CSVConverter{ CSVConverter{ - fields: []string{"field1", "field2", "field3"}, - data: []string{"a", "b", "c"}, - index: index, + colSpecs: []ColumnSpec{ + {"field1", new(FieldAutoParser), pgAutoCast}, + {"field2", new(FieldAutoParser), pgAutoCast}, + {"field3", new(FieldAutoParser), pgAutoCast}, + }, + data: []string{"a", "b", "c"}, + index: index, }, CSVConverter{ - fields: []string{"field4", "field5", "field6"}, - data: []string{"d", "e", "f"}, - index: index, + colSpecs: []ColumnSpec{ + {"field4", new(FieldAutoParser), pgAutoCast}, + {"field5", new(FieldAutoParser), pgAutoCast}, + {"field6", new(FieldAutoParser), pgAutoCast}, + }, + data: []string{"d", "e", "f"}, + index: index, }, CSVConverter{ - fields: []string{"field7", "field8", "field9"}, - data: []string{"d", "e", "f"}, - index: index, + colSpecs: []ColumnSpec{ + {"field7", new(FieldAutoParser), pgAutoCast}, + {"field8", new(FieldAutoParser), pgAutoCast}, + {"field9", new(FieldAutoParser), pgAutoCast}, + }, + data: []string{"d", "e", "f"}, + index: index, }, CSVConverter{ - fields: []string{"field10", "field11", "field12"}, - data: []string{"d", "e", "f"}, - index: index, + colSpecs: []ColumnSpec{ + {"field10", new(FieldAutoParser), pgAutoCast}, + {"field11", new(FieldAutoParser), pgAutoCast}, + {"field12", new(FieldAutoParser), pgAutoCast}, + }, + data: []string{"d", "e", "f"}, + index: index, }, CSVConverter{ - fields: []string{"field13", "field14", "field15"}, - data: []string{"d", "e", "f"}, - index: index, + colSpecs: []ColumnSpec{ + {"field13", new(FieldAutoParser), pgAutoCast}, + {"field14", new(FieldAutoParser), pgAutoCast}, + {"field15", new(FieldAutoParser), pgAutoCast}, + }, + data: []string{"d", "e", "f"}, + index: index, }, } expectedDocuments = []bson.D{ @@ -192,22 +213,6 @@ func TestConstructUpsertDocument(t *testing.T) { }) } -func TestGetParsedValue(t *testing.T) { - testutil.VerifyTestType(t, testutil.UnitTestType) - - Convey("Given a string token to parse", t, func() { - Convey("an int token should return the underlying int value", func() { - So(getParsedValue("3"), ShouldEqual, 3) - }) - Convey("a float token should return the underlying float value", func() { - So(getParsedValue(".33"), ShouldEqual, 0.33) - }) - Convey("a string token should return the underlying string value", func() { - So(getParsedValue("sd"), ShouldEqual, "sd") - }) - }) -} - func TestSetNestedValue(t *testing.T) { testutil.VerifyTestType(t, testutil.UnitTestType) @@ -296,22 +301,31 @@ func TestRemoveBlankFields(t *testing.T) { func TestTokensToBSON(t *testing.T) { testutil.VerifyTestType(t, testutil.UnitTestType) - Convey("Given an slice of fields and tokens to convert to BSON", t, func() { - Convey("the expected ordered BSON should be produced for the fields/tokens given", func() { - fields := []string{"a", "b", "c"} + Convey("Given an slice of column specs and tokens to convert to BSON", t, func() { + Convey("the expected ordered BSON should be produced for the given"+ + "column specs and tokens", func() { + colSpecs := []ColumnSpec{ + {"a", new(FieldAutoParser), pgAutoCast}, + {"b", new(FieldAutoParser), pgAutoCast}, + {"c", new(FieldAutoParser), pgAutoCast}, + } tokens := []string{"1", "2", "hello"} expectedDocument := bson.D{ bson.DocElem{"a", 1}, bson.DocElem{"b", 2}, bson.DocElem{"c", "hello"}, } - bsonD, err := tokensToBSON(fields, tokens, uint64(0)) 
+ bsonD, err := tokensToBSON(colSpecs, tokens, uint64(0)) So(err, ShouldBeNil) So(bsonD, ShouldResemble, expectedDocument) }) Convey("if there are more tokens than fields, additional fields should be prefixed"+ " with 'fields' and an index indicating the header number", func() { - fields := []string{"a", "b", "c"} + colSpecs := []ColumnSpec{ + {"a", new(FieldAutoParser), pgAutoCast}, + {"b", new(FieldAutoParser), pgAutoCast}, + {"c", new(FieldAutoParser), pgAutoCast}, + } tokens := []string{"1", "2", "hello", "mongodb", "user"} expectedDocument := bson.D{ bson.DocElem{"a", 1}, @@ -320,18 +334,26 @@ func TestTokensToBSON(t *testing.T) { bson.DocElem{"field3", "mongodb"}, bson.DocElem{"field4", "user"}, } - bsonD, err := tokensToBSON(fields, tokens, uint64(0)) + bsonD, err := tokensToBSON(colSpecs, tokens, uint64(0)) So(err, ShouldBeNil) So(bsonD, ShouldResemble, expectedDocument) }) Convey("an error should be thrown if duplicate headers are found", func() { - fields := []string{"a", "b", "field3"} + colSpecs := []ColumnSpec{ + {"a", new(FieldAutoParser), pgAutoCast}, + {"b", new(FieldAutoParser), pgAutoCast}, + {"field3", new(FieldAutoParser), pgAutoCast}, + } tokens := []string{"1", "2", "hello", "mongodb", "user"} - _, err := tokensToBSON(fields, tokens, uint64(0)) + _, err := tokensToBSON(colSpecs, tokens, uint64(0)) So(err, ShouldNotBeNil) }) Convey("fields with nested values should be set appropriately", func() { - fields := []string{"a", "b", "c.a"} + colSpecs := []ColumnSpec{ + {"a", new(FieldAutoParser), pgAutoCast}, + {"b", new(FieldAutoParser), pgAutoCast}, + {"c.a", new(FieldAutoParser), pgAutoCast}, + } tokens := []string{"1", "2", "hello"} expectedDocument := bson.D{ bson.DocElem{"a", 1}, @@ -340,7 +362,7 @@ func TestTokensToBSON(t *testing.T) { bson.DocElem{"a", "hello"}, }}, } - bsonD, err := tokensToBSON(fields, tokens, uint64(0)) + bsonD, err := tokensToBSON(colSpecs, tokens, uint64(0)) So(err, ShouldBeNil) So(expectedDocument[0].Name, ShouldResemble, bsonD[0].Name) So(expectedDocument[0].Value, ShouldResemble, bsonD[0].Value) @@ -359,14 +381,22 @@ func TestProcessDocuments(t *testing.T) { index := uint64(0) csvConverters := []CSVConverter{ CSVConverter{ - fields: []string{"field1", "field2", "field3"}, - data: []string{"a", "b", "c"}, - index: index, + colSpecs: []ColumnSpec{ + {"field1", new(FieldAutoParser), pgAutoCast}, + {"field2", new(FieldAutoParser), pgAutoCast}, + {"field3", new(FieldAutoParser), pgAutoCast}, + }, + data: []string{"a", "b", "c"}, + index: index, }, CSVConverter{ - fields: []string{"field4", "field5", "field6"}, - data: []string{"d", "e", "f"}, - index: index, + colSpecs: []ColumnSpec{ + {"field4", new(FieldAutoParser), pgAutoCast}, + {"field5", new(FieldAutoParser), pgAutoCast}, + {"field6", new(FieldAutoParser), pgAutoCast}, + }, + data: []string{"d", "e", "f"}, + index: index, }, } expectedDocuments := []bson.D{ @@ -498,9 +528,12 @@ func TestStreamDocuments(t *testing.T) { Convey("the entire pipeline should complete with error if an error is encountered", func() { // stream in some documents - create duplicate headers to simulate an error csvConverter := CSVConverter{ - fields: []string{"field1", "field2"}, - data: []string{"a", "b", "c"}, - index: uint64(0), + colSpecs: []ColumnSpec{ + {"field1", new(FieldAutoParser), pgAutoCast}, + {"field2", new(FieldAutoParser), pgAutoCast}, + }, + data: []string{"a", "b", "c"}, + index: uint64(0), } inputChannel <- csvConverter close(inputChannel) diff --git a/mongoimport/csv.go b/mongoimport/csv.go index 
9beab3212..d556fd0d5 100644 --- a/mongoimport/csv.go +++ b/mongoimport/csv.go @@ -2,16 +2,16 @@ package mongoimport import ( "fmt" + "io" + "github.com/mongodb/mongo-tools/mongoimport/csv" "gopkg.in/mgo.v2/bson" - "io" ) // CSVInputReader implements the InputReader interface for CSV input types. type CSVInputReader struct { - - // fields is a list of field names in the BSON documents to be imported - fields []string + // colSpecs is a list of column specifications in the BSON documents to be imported + colSpecs []ColumnSpec // csvReader is the underlying reader used to read data in from the CSV or CSV file csvReader *csv.Reader @@ -31,21 +31,22 @@ type CSVInputReader struct { // CSVConverter implements the Converter interface for CSV input. type CSVConverter struct { - fields, data []string - index uint64 + colSpecs []ColumnSpec + data []string + index uint64 } // NewCSVInputReader returns a CSVInputReader configured to read data from the -// given io.Reader, extracting only the specified fields using exactly "numDecoders" +// given io.Reader, extracting only the specified columns using exactly "numDecoders" // goroutines. -func NewCSVInputReader(fields []string, in io.Reader, numDecoders int) *CSVInputReader { +func NewCSVInputReader(colSpecs []ColumnSpec, in io.Reader, numDecoders int) *CSVInputReader { szCount := newSizeTrackingReader(newBomDiscardingReader(in)) csvReader := csv.NewReader(szCount) - // allow variable number of fields in document + // allow a variable number of fields per record csvReader.FieldsPerRecord = -1 csvReader.TrimLeadingSpace = true return &CSVInputReader{ - fields: fields, + colSpecs: colSpecs, csvReader: csvReader, numProcessed: uint64(0), numDecoders: numDecoders, @@ -60,8 +61,22 @@ func (r *CSVInputReader) ReadAndValidateHeader() (err error) { if err != nil { return err } - r.fields = fields - return validateReaderFields(r.fields) + r.colSpecs = ParseAutoHeaders(fields) + return validateReaderFields(ColumnNames(r.colSpecs)) +} + +// ReadAndValidateTypedHeader reads the typed header from the underlying reader +// and validates the header fields. It sets err if the read/validation fails. +func (r *CSVInputReader) ReadAndValidateTypedHeader(parseGrace ParseGrace) (err error) { + fields, err := r.csvReader.Read() + if err != nil { + return err + } + r.colSpecs, err = ParseTypedHeaders(fields, parseGrace) + if err != nil { + return err + } + return validateReaderFields(ColumnNames(r.colSpecs)) } // StreamDocument takes a boolean indicating if the documents should be streamed @@ -87,9 +102,9 @@ func (r *CSVInputReader) StreamDocument(ordered bool, readDocs chan bson.D) (ret return } csvRecordChan <- CSVConverter{ - fields: r.fields, - data: r.csvRecord, - index: r.numProcessed, + colSpecs: r.colSpecs, + data: r.csvRecord, + index: r.numProcessed, } r.numProcessed++ } @@ -106,7 +121,7 @@ func (r *CSVInputReader) StreamDocument(ordered bool, readDocs chan bson.D) (ret // CSVConverter struct to a BSON document. func (c CSVConverter) Convert() (bson.D, error) { return tokensToBSON( - c.fields, + c.colSpecs, c.data, c.index, ) diff --git a/mongoimport/csv_test.go b/mongoimport/csv_test.go index 6a7b461f4..9d0bcdfb3 100644 --- a/mongoimport/csv_test.go +++ b/mongoimport/csv_test.go @@ -2,15 +2,16 @@ package mongoimport import ( "bytes" + "io" + "os" + "strings" + "testing" + "github.com/mongodb/mongo-tools/common/log" "github.com/mongodb/mongo-tools/common/options" "github.com/mongodb/mongo-tools/common/testutil" . 
"github.com/smartystreets/goconvey/convey" "gopkg.in/mgo.v2/bson" - "io" - "os" - "strings" - "testing" ) func init() { @@ -24,61 +25,85 @@ func TestCSVStreamDocument(t *testing.T) { Convey("With a CSV input reader", t, func() { Convey("badly encoded CSV should result in a parsing error", func() { contents := `1, 2, foo"bar` - fields := []string{"a", "b", "c"} - r := NewCSVInputReader(fields, bytes.NewReader([]byte(contents)), 1) + colSpecs := []ColumnSpec{ + {"a", new(FieldAutoParser), pgAutoCast}, + {"b", new(FieldAutoParser), pgAutoCast}, + {"c", new(FieldAutoParser), pgAutoCast}, + } + r := NewCSVInputReader(colSpecs, bytes.NewReader([]byte(contents)), 1) docChan := make(chan bson.D, 1) So(r.StreamDocument(true, docChan), ShouldNotBeNil) }) Convey("escaped quotes are parsed correctly", func() { contents := `1, 2, "foo""bar"` - fields := []string{"a", "b", "c"} - r := NewCSVInputReader(fields, bytes.NewReader([]byte(contents)), 1) + colSpecs := []ColumnSpec{ + {"a", new(FieldAutoParser), pgAutoCast}, + {"b", new(FieldAutoParser), pgAutoCast}, + {"c", new(FieldAutoParser), pgAutoCast}, + } + r := NewCSVInputReader(colSpecs, bytes.NewReader([]byte(contents)), 1) docChan := make(chan bson.D, 1) So(r.StreamDocument(true, docChan), ShouldBeNil) }) Convey("multiple escaped quotes separated by whitespace parsed correctly", func() { contents := `1, 2, "foo"" ""bar"` - fields := []string{"a", "b", "c"} + colSpecs := []ColumnSpec{ + {"a", new(FieldAutoParser), pgAutoCast}, + {"b", new(FieldAutoParser), pgAutoCast}, + {"c", new(FieldAutoParser), pgAutoCast}, + } expectedRead := bson.D{ bson.DocElem{"a", 1}, bson.DocElem{"b", 2}, bson.DocElem{"c", `foo" "bar`}, } - r := NewCSVInputReader(fields, bytes.NewReader([]byte(contents)), 1) + r := NewCSVInputReader(colSpecs, bytes.NewReader([]byte(contents)), 1) docChan := make(chan bson.D, 1) So(r.StreamDocument(true, docChan), ShouldBeNil) So(<-docChan, ShouldResemble, expectedRead) }) Convey("integer valued strings should be converted", func() { contents := `1, 2, " 3e"` - fields := []string{"a", "b", "c"} + colSpecs := []ColumnSpec{ + {"a", new(FieldAutoParser), pgAutoCast}, + {"b", new(FieldAutoParser), pgAutoCast}, + {"c", new(FieldAutoParser), pgAutoCast}, + } expectedRead := bson.D{ bson.DocElem{"a", 1}, bson.DocElem{"b", 2}, bson.DocElem{"c", " 3e"}, } - r := NewCSVInputReader(fields, bytes.NewReader([]byte(contents)), 1) + r := NewCSVInputReader(colSpecs, bytes.NewReader([]byte(contents)), 1) docChan := make(chan bson.D, 1) So(r.StreamDocument(true, docChan), ShouldBeNil) So(<-docChan, ShouldResemble, expectedRead) }) Convey("extra fields should be prefixed with 'field'", func() { contents := `1, 2f , " 3e" , " may"` - fields := []string{"a", "b", "c"} + colSpecs := []ColumnSpec{ + {"a", new(FieldAutoParser), pgAutoCast}, + {"b", new(FieldAutoParser), pgAutoCast}, + {"c", new(FieldAutoParser), pgAutoCast}, + } expectedRead := bson.D{ bson.DocElem{"a", 1}, bson.DocElem{"b", "2f"}, bson.DocElem{"c", " 3e"}, bson.DocElem{"field3", " may"}, } - r := NewCSVInputReader(fields, bytes.NewReader([]byte(contents)), 1) + r := NewCSVInputReader(colSpecs, bytes.NewReader([]byte(contents)), 1) docChan := make(chan bson.D, 1) So(r.StreamDocument(true, docChan), ShouldBeNil) So(<-docChan, ShouldResemble, expectedRead) }) Convey("nested CSV fields should be imported properly", func() { contents := `1, 2f , " 3e" , " may"` - fields := []string{"a", "b.c", "c"} + colSpecs := []ColumnSpec{ + {"a", new(FieldAutoParser), pgAutoCast}, + {"b.c", new(FieldAutoParser), 
pgAutoCast}, + {"c", new(FieldAutoParser), pgAutoCast}, + } expectedRead := bson.D{ bson.DocElem{"a", 1}, bson.DocElem{"b", bson.D{ @@ -87,7 +112,7 @@ func TestCSVStreamDocument(t *testing.T) { bson.DocElem{"c", " 3e"}, bson.DocElem{"field3", " may"}, } - r := NewCSVInputReader(fields, bytes.NewReader([]byte(contents)), 1) + r := NewCSVInputReader(colSpecs, bytes.NewReader([]byte(contents)), 1) docChan := make(chan bson.D, 4) So(r.StreamDocument(true, docChan), ShouldBeNil) @@ -100,22 +125,34 @@ func TestCSVStreamDocument(t *testing.T) { }) Convey("whitespace separated quoted strings are still an error", func() { contents := `1, 2, "foo" "bar"` - fields := []string{"a", "b", "c"} - r := NewCSVInputReader(fields, bytes.NewReader([]byte(contents)), 1) + colSpecs := []ColumnSpec{ + {"a", new(FieldAutoParser), pgAutoCast}, + {"b", new(FieldAutoParser), pgAutoCast}, + {"c", new(FieldAutoParser), pgAutoCast}, + } + r := NewCSVInputReader(colSpecs, bytes.NewReader([]byte(contents)), 1) docChan := make(chan bson.D, 1) So(r.StreamDocument(true, docChan), ShouldNotBeNil) }) Convey("nested CSV fields causing header collisions should error", func() { contents := `1, 2f , " 3e" , " may", june` - fields := []string{"a", "b.c", "field3"} - r := NewCSVInputReader(fields, bytes.NewReader([]byte(contents)), 1) + colSpecs := []ColumnSpec{ + {"a", new(FieldAutoParser), pgAutoCast}, + {"b.c", new(FieldAutoParser), pgAutoCast}, + {"field3", new(FieldAutoParser), pgAutoCast}, + } + r := NewCSVInputReader(colSpecs, bytes.NewReader([]byte(contents)), 1) docChan := make(chan bson.D, 1) So(r.StreamDocument(true, docChan), ShouldNotBeNil) }) Convey("calling StreamDocument() for CSVs should return next set of "+ "values", func() { contents := "1, 2, 3\n4, 5, 6" - fields := []string{"a", "b", "c"} + colSpecs := []ColumnSpec{ + {"a", new(FieldAutoParser), pgAutoCast}, + {"b", new(FieldAutoParser), pgAutoCast}, + {"c", new(FieldAutoParser), pgAutoCast}, + } expectedReadOne := bson.D{ bson.DocElem{"a", 1}, bson.DocElem{"b", 2}, @@ -126,7 +163,7 @@ func TestCSVStreamDocument(t *testing.T) { bson.DocElem{"b", 5}, bson.DocElem{"c", 6}, } - r := NewCSVInputReader(fields, bytes.NewReader([]byte(contents)), 1) + r := NewCSVInputReader(colSpecs, bytes.NewReader([]byte(contents)), 1) docChan := make(chan bson.D, 2) So(r.StreamDocument(true, docChan), ShouldBeNil) So(<-docChan, ShouldResemble, expectedReadOne) @@ -134,7 +171,11 @@ func TestCSVStreamDocument(t *testing.T) { }) Convey("valid CSV input file that starts with the UTF-8 BOM should "+ "not raise an error", func() { - fields := []string{"a", "b", "c"} + colSpecs := []ColumnSpec{ + {"a", new(FieldAutoParser), pgAutoCast}, + {"b", new(FieldAutoParser), pgAutoCast}, + {"c", new(FieldAutoParser), pgAutoCast}, + } expectedReads := []bson.D{ bson.D{ bson.DocElem{"a", 1}, @@ -149,7 +190,7 @@ func TestCSVStreamDocument(t *testing.T) { } fileHandle, err := os.Open("testdata/test_bom.csv") So(err, ShouldBeNil) - r := NewCSVInputReader(fields, fileHandle, 1) + r := NewCSVInputReader(colSpecs, fileHandle, 1) docChan := make(chan bson.D, len(expectedReads)) So(r.StreamDocument(true, docChan), ShouldBeNil) for _, expectedRead := range expectedReads { @@ -168,98 +209,106 @@ func TestCSVReadAndValidateHeader(t *testing.T) { Convey("With a CSV input reader", t, func() { Convey("setting the header should read the first line of the CSV", func() { contents := "extraHeader1, extraHeader2, extraHeader3" - fields := []string{} - r := NewCSVInputReader(fields, bytes.NewReader([]byte(contents)), 
1) + colSpecs := []ColumnSpec{} + r := NewCSVInputReader(colSpecs, bytes.NewReader([]byte(contents)), 1) So(r.ReadAndValidateHeader(), ShouldBeNil) - So(len(r.fields), ShouldEqual, 3) + So(len(r.colSpecs), ShouldEqual, 3) }) Convey("setting non-colliding nested CSV headers should not raise an error", func() { contents := "a, b, c" - fields := []string{} - r := NewCSVInputReader(fields, bytes.NewReader([]byte(contents)), 1) + colSpecs := []ColumnSpec{} + r := NewCSVInputReader(colSpecs, bytes.NewReader([]byte(contents)), 1) So(r.ReadAndValidateHeader(), ShouldBeNil) - So(len(r.fields), ShouldEqual, 3) + So(len(r.colSpecs), ShouldEqual, 3) contents = "a.b.c, a.b.d, c" - fields = []string{} - r = NewCSVInputReader(fields, bytes.NewReader([]byte(contents)), 1) + colSpecs = []ColumnSpec{} + r = NewCSVInputReader(colSpecs, bytes.NewReader([]byte(contents)), 1) So(r.ReadAndValidateHeader(), ShouldBeNil) - So(len(r.fields), ShouldEqual, 3) + So(len(r.colSpecs), ShouldEqual, 3) contents = "a.b, ab, a.c" - fields = []string{} - r = NewCSVInputReader(fields, bytes.NewReader([]byte(contents)), 1) + colSpecs = []ColumnSpec{} + r = NewCSVInputReader(colSpecs, bytes.NewReader([]byte(contents)), 1) So(r.ReadAndValidateHeader(), ShouldBeNil) - So(len(r.fields), ShouldEqual, 3) + So(len(r.colSpecs), ShouldEqual, 3) contents = "a, ab, ac, dd" - fields = []string{} - r = NewCSVInputReader(fields, bytes.NewReader([]byte(contents)), 1) + colSpecs = []ColumnSpec{} + r = NewCSVInputReader(colSpecs, bytes.NewReader([]byte(contents)), 1) So(r.ReadAndValidateHeader(), ShouldBeNil) - So(len(r.fields), ShouldEqual, 4) + So(len(r.colSpecs), ShouldEqual, 4) }) Convey("setting colliding nested CSV headers should raise an error", func() { contents := "a, a.b, c" - fields := []string{} - r := NewCSVInputReader(fields, bytes.NewReader([]byte(contents)), 1) + colSpecs := []ColumnSpec{} + r := NewCSVInputReader(colSpecs, bytes.NewReader([]byte(contents)), 1) So(r.ReadAndValidateHeader(), ShouldNotBeNil) contents = "a.b.c, a.b.d.c, a.b.d" - fields = []string{} - r = NewCSVInputReader(fields, bytes.NewReader([]byte(contents)), 1) + colSpecs = []ColumnSpec{} + r = NewCSVInputReader(colSpecs, bytes.NewReader([]byte(contents)), 1) So(r.ReadAndValidateHeader(), ShouldNotBeNil) contents = "a, a, a" - fields = []string{} - r = NewCSVInputReader(fields, bytes.NewReader([]byte(contents)), 1) + colSpecs = []ColumnSpec{} + r = NewCSVInputReader(colSpecs, bytes.NewReader([]byte(contents)), 1) So(r.ReadAndValidateHeader(), ShouldNotBeNil) }) Convey("setting the header that ends in a dot should error", func() { contents := "c, a., b" - fields := []string{} + colSpecs := []ColumnSpec{} So(err, ShouldBeNil) - So(NewCSVInputReader(fields, bytes.NewReader([]byte(contents)), 1).ReadAndValidateHeader(), ShouldNotBeNil) + So(NewCSVInputReader(colSpecs, bytes.NewReader([]byte(contents)), 1).ReadAndValidateHeader(), ShouldNotBeNil) }) Convey("setting the header that starts in a dot should error", func() { contents := "c, .a, b" - fields := []string{} - So(NewCSVInputReader(fields, bytes.NewReader([]byte(contents)), 1).ReadAndValidateHeader(), ShouldNotBeNil) + colSpecs := []ColumnSpec{} + So(NewCSVInputReader(colSpecs, bytes.NewReader([]byte(contents)), 1).ReadAndValidateHeader(), ShouldNotBeNil) }) Convey("setting the header that contains multiple consecutive dots should error", func() { contents := "c, a..a, b" - fields := []string{} - So(NewCSVInputReader(fields, bytes.NewReader([]byte(contents)), 1).ReadAndValidateHeader(), ShouldNotBeNil) + 
colSpecs := []ColumnSpec{} + So(NewCSVInputReader(colSpecs, bytes.NewReader([]byte(contents)), 1).ReadAndValidateHeader(), ShouldNotBeNil) contents = "c, a.a, b.b...b" - fields = []string{} - So(NewCSVInputReader(fields, bytes.NewReader([]byte(contents)), 1).ReadAndValidateHeader(), ShouldNotBeNil) + colSpecs = []ColumnSpec{} + So(NewCSVInputReader(colSpecs, bytes.NewReader([]byte(contents)), 1).ReadAndValidateHeader(), ShouldNotBeNil) }) Convey("setting the header using an empty file should return EOF", func() { contents := "" - fields := []string{} - r := NewCSVInputReader(fields, bytes.NewReader([]byte(contents)), 1) + colSpecs := []ColumnSpec{} + r := NewCSVInputReader(colSpecs, bytes.NewReader([]byte(contents)), 1) So(r.ReadAndValidateHeader(), ShouldEqual, io.EOF) - So(len(r.fields), ShouldEqual, 0) + So(len(r.colSpecs), ShouldEqual, 0) }) - Convey("setting the header with fields already set, should "+ - "the header line with the existing fields", func() { + Convey("setting the header with column specs already set should replace "+ + "the existing column specs", func() { contents := "extraHeader1,extraHeader2,extraHeader3" - fields := []string{"a", "b", "c"} - r := NewCSVInputReader(fields, bytes.NewReader([]byte(contents)), 1) + colSpecs := []ColumnSpec{ + {"a", new(FieldAutoParser), pgAutoCast}, + {"b", new(FieldAutoParser), pgAutoCast}, + {"c", new(FieldAutoParser), pgAutoCast}, + } + r := NewCSVInputReader(colSpecs, bytes.NewReader([]byte(contents)), 1) So(r.ReadAndValidateHeader(), ShouldBeNil) - // if ReadAndValidateHeader() is called with fields already passed in, - // the header should be replaced with the read header line - So(len(r.fields), ShouldEqual, 3) - So(r.fields, ShouldResemble, strings.Split(contents, ",")) + // if ReadAndValidateHeader() is called with column specs already passed + // in, the header should be replaced with the read header line + So(len(r.colSpecs), ShouldEqual, 3) + So(ColumnNames(r.colSpecs), ShouldResemble, strings.Split(contents, ",")) }) Convey("plain CSV input file sources should be parsed correctly and "+ "subsequent imports should parse correctly", func() { - fields := []string{"a", "b", "c"} + colSpecs := []ColumnSpec{ + {"a", new(FieldAutoParser), pgAutoCast}, + {"b", new(FieldAutoParser), pgAutoCast}, + {"c", new(FieldAutoParser), pgAutoCast}, + } expectedReadOne := bson.D{ bson.DocElem{"a", 1}, bson.DocElem{"b", 2}, @@ -272,7 +321,7 @@ func TestCSVReadAndValidateHeader(t *testing.T) { } fileHandle, err := os.Open("testdata/test.csv") So(err, ShouldBeNil) - r := NewCSVInputReader(fields, fileHandle, 1) + r := NewCSVInputReader(colSpecs, fileHandle, 1) docChan := make(chan bson.D, 50) So(r.StreamDocument(true, docChan), ShouldBeNil) So(<-docChan, ShouldResemble, expectedReadOne) @@ -286,9 +335,13 @@ func TestCSVConvert(t *testing.T) { Convey("With a CSV input reader", t, func() { Convey("calling convert on a CSVConverter should return the expected BSON document", func() { csvConverter := CSVConverter{ - fields: []string{"field1", "field2", "field3"}, - data: []string{"a", "b", "c"}, - index: uint64(0), + colSpecs: []ColumnSpec{ + {"field1", new(FieldAutoParser), pgAutoCast}, + {"field2", new(FieldAutoParser), pgAutoCast}, + {"field3", new(FieldAutoParser), pgAutoCast}, + }, + data: []string{"a", "b", "c"}, + index: uint64(0), } expectedDocument := bson.D{ bson.DocElem{"field1", "a"}, diff --git a/mongoimport/dateconv/dateconv.go b/mongoimport/dateconv/dateconv.go new file mode 100644 index 000000000..ba9745301 --- /dev/null +++ 
b/mongoimport/dateconv/dateconv.go @@ -0,0 +1,73 @@ +package dateconv + +import ( + "strings" +) + +var ( + msReplacers = []string{ + "dddd", "Monday", + "ddd", "Mon", + "dd", "02", + "d", "2", + "MMMM", "January", + "MMM", "Jan", + "MM", "01", + "M", "1", + // "gg", "?", + "hh", "03", + "h", "3", + "HH", "15", + "H", "15", + "mm", "04", + "m", "4", + "ss", "05", + "s", "5", + // "f", "?", + "tt", "PM", + // "t", "?", + "yyyy", "2006", + "yyy", "2006", + "yy", "06", + // "y", "?", + "zzz", "-07:00", + "zz", "-07", + // "z", "?", + } + msStringReplacer = strings.NewReplacer(msReplacers...) +) + +// FromMS reformats a datetime layout string from the Microsoft SQL Server +// FORMAT function into Go's parse format. +func FromMS(layout string) string { + return msStringReplacer.Replace(layout) +} + +var ( + oracleReplacers = []string{ + "AM", "PM", + "DAY", "Monday", + "DY", "Mon", + "DD", "02", + "HH12", "03", + "HH24", "15", + "HH", "03", + "MI", "04", + "MONTH", "January", + "MON", "Jan", + "MM", "01", + "SS", "05", + "TZD", "MST", + "TZH:TZM", "-07:00", + "TZH", "-07", + "YYYY", "2006", + "YY", "06", + } + oracleStringReplacer = strings.NewReplacer(oracleReplacers...) +) + +// FromOracle reformats a datetime layout string from the Oracle Database +// TO_DATE function into Go's parse format. +func FromOracle(layout string) string { + return oracleStringReplacer.Replace(strings.ToUpper(layout)) +} diff --git a/mongoimport/json.go b/mongoimport/json.go index d5d37a50c..3c405cb1f 100644 --- a/mongoimport/json.go +++ b/mongoimport/json.go @@ -3,12 +3,13 @@ package mongoimport import ( "errors" "fmt" + "io" + "strings" + "github.com/mongodb/mongo-tools/common/bsonutil" "github.com/mongodb/mongo-tools/common/json" "github.com/mongodb/mongo-tools/common/log" "gopkg.in/mgo.v2/bson" - "io" - "strings" ) // JSONInputReader is an implementation of InputReader that reads documents @@ -86,6 +87,11 @@ func (r *JSONInputReader) ReadAndValidateHeader() error { return nil } +// ReadAndValidateTypedHeader is a no-op for JSON imports; it always returns nil. +func (r *JSONInputReader) ReadAndValidateTypedHeader(parseGrace ParseGrace) error { + return nil +} + // StreamDocument takes a boolean indicating if the documents should be streamed // in read order and a channel on which to stream the documents processed from // the underlying reader. Returns a non-nil error if encountered diff --git a/mongoimport/json_test.go b/mongoimport/json_test.go index 01da428b6..a06097527 100644 --- a/mongoimport/json_test.go +++ b/mongoimport/json_test.go @@ -2,12 +2,13 @@ package mongoimport import ( "bytes" - "github.com/mongodb/mongo-tools/common/testutil" - . "github.com/smartystreets/goconvey/convey" - "gopkg.in/mgo.v2/bson" "io" "os" "testing" + + "github.com/mongodb/mongo-tools/common/testutil" + . 
"github.com/smartystreets/goconvey/convey" + "gopkg.in/mgo.v2/bson" ) func TestJSONArrayStreamDocument(t *testing.T) { diff --git a/mongoimport/main/mongoimport.go b/mongoimport/main/mongoimport.go index 70d125ca5..225c44358 100644 --- a/mongoimport/main/mongoimport.go +++ b/mongoimport/main/mongoimport.go @@ -3,13 +3,14 @@ package main import ( "fmt" + "os" + "github.com/mongodb/mongo-tools/common/db" "github.com/mongodb/mongo-tools/common/log" "github.com/mongodb/mongo-tools/common/options" "github.com/mongodb/mongo-tools/common/signals" "github.com/mongodb/mongo-tools/common/util" "github.com/mongodb/mongo-tools/mongoimport" - "os" ) func main() { diff --git a/mongoimport/mongoimport.go b/mongoimport/mongoimport.go index 47ceed83b..43df2fe07 100644 --- a/mongoimport/mongoimport.go +++ b/mongoimport/mongoimport.go @@ -74,6 +74,11 @@ type InputReader interface { // nil otherwise. No-op for JSON input readers. ReadAndValidateHeader() error + // ReadAndValidateTypedHeader is the same as ReadAndValidateHeader, + // except it also parses types from the fields of the header. Parse errors + // will be handled according parseGrace. + ReadAndValidateTypedHeader(parseGrace ParseGrace) error + // embedded io.Reader that tracks number of bytes read, to allow feeding into progress bar. sizeTracker } @@ -126,6 +131,10 @@ func (imp *MongoImport) ValidateSettings(args []string) error { return fmt.Errorf("incompatible options: --fieldFile and --headerline") } } + + if _, err := ValidatePG(imp.InputOptions.ParseGrace); err != nil { + return err + } } else { // input type is JSON if imp.InputOptions.HeaderLine { @@ -266,7 +275,12 @@ func (imp *MongoImport) ImportDocuments() (uint64, error) { } if imp.InputOptions.HeaderLine { - if err = inputReader.ReadAndValidateHeader(); err != nil { + if imp.InputOptions.ColumnsHaveTypes { + err = inputReader.ReadAndValidateTypedHeader(ParsePG(imp.InputOptions.ParseGrace)) + } else { + err = inputReader.ReadAndValidateHeader() + } + if err != nil { return 0, err } } @@ -492,30 +506,59 @@ func (up *upserter) Flush() error { return nil } +func splitInlineHeader(header string) (headers []string) { + var level uint8 + var currentField string + for _, c := range header { + if c == '(' { + level++ + } else if c == ')' && level > 0 { + level-- + } + if c == ',' && level == 0 { + headers = append(headers, currentField) + currentField = "" + } else { + currentField = currentField + string(c) + } + } + headers = append(headers, currentField) // add last field + return +} + // getInputReader returns an implementation of InputReader based on the input type func (imp *MongoImport) getInputReader(in io.Reader) (InputReader, error) { - var fields []string + var colSpecs []ColumnSpec + var headers []string var err error if imp.InputOptions.Fields != nil { - fields = strings.Split(*imp.InputOptions.Fields, ",") + headers = splitInlineHeader(*imp.InputOptions.Fields) } else if imp.InputOptions.FieldFile != nil { - fields, err = util.GetFieldsFromFile(*imp.InputOptions.FieldFile) + headers, err = util.GetFieldsFromFile(*imp.InputOptions.FieldFile) if err != nil { return nil, err } } + if imp.InputOptions.ColumnsHaveTypes { + colSpecs, err = ParseTypedHeaders(headers, ParsePG(imp.InputOptions.ParseGrace)) + if err != nil { + return nil, err + } + } else { + colSpecs = ParseAutoHeaders(headers) + } // header fields validation can only happen once we have an input reader if !imp.InputOptions.HeaderLine { - if err = validateReaderFields(fields); err != nil { + if err = 
validateReaderFields(ColumnNames(colSpecs)); err != nil { return nil, err } } if imp.InputOptions.Type == CSV { - return NewCSVInputReader(fields, in, imp.ToolOptions.NumDecodingWorkers), nil + return NewCSVInputReader(colSpecs, in, imp.ToolOptions.NumDecodingWorkers), nil } else if imp.InputOptions.Type == TSV { - return NewTSVInputReader(fields, in, imp.ToolOptions.NumDecodingWorkers), nil + return NewTSVInputReader(colSpecs, in, imp.ToolOptions.NumDecodingWorkers), nil } return NewJSONInputReader(imp.InputOptions.JSONArray, in, imp.ToolOptions.NumDecodingWorkers), nil } diff --git a/mongoimport/mongoimport_test.go b/mongoimport/mongoimport_test.go index 05dc3d65d..f028907aa 100644 --- a/mongoimport/mongoimport_test.go +++ b/mongoimport/mongoimport_test.go @@ -2,16 +2,17 @@ package mongoimport import ( "fmt" - "github.com/mongodb/mongo-tools/common/db" - "github.com/mongodb/mongo-tools/common/options" - "github.com/mongodb/mongo-tools/common/testutil" - . "github.com/smartystreets/goconvey/convey" - "gopkg.in/mgo.v2/bson" "io" "io/ioutil" "os" "reflect" "testing" + + "github.com/mongodb/mongo-tools/common/db" + "github.com/mongodb/mongo-tools/common/options" + "github.com/mongodb/mongo-tools/common/testutil" + . "github.com/smartystreets/goconvey/convey" + "gopkg.in/mgo.v2/bson" ) const ( @@ -71,7 +72,9 @@ func getBasicToolOptions() *options.ToolOptions { func NewMongoImport() (*MongoImport, error) { toolOptions := getBasicToolOptions() - inputOptions := &InputOptions{} + inputOptions := &InputOptions{ + ParseGrace: "stop", + } ingestOptions := &IngestOptions{} provider, err := db.NewSessionProvider(*toolOptions) if err != nil { @@ -340,6 +343,26 @@ func TestGetSourceReader(t *testing.T) { func TestGetInputReader(t *testing.T) { testutil.VerifyTestType(t, testutil.UnitTestType) Convey("Given a io.Reader on calling getInputReader", t, func() { + Convey("should parse --fields using valid csv escaping", func() { + imp, err := NewMongoImport() + So(err, ShouldBeNil) + imp.InputOptions.Fields = new(string) + *imp.InputOptions.Fields = "foo.auto(),bar.date(January 2, 2006)" + imp.InputOptions.File = "/path/to/input/file/dot/input.txt" + imp.InputOptions.ColumnsHaveTypes = true + _, err = imp.getInputReader(&os.File{}) + So(err, ShouldBeNil) + }) + Convey("should complain about non-escaped new lines in --fields", func() { + imp, err := NewMongoImport() + So(err, ShouldBeNil) + imp.InputOptions.Fields = new(string) + *imp.InputOptions.Fields = "foo.auto(),\nblah.binary(hex),bar.date(January 2, 2006)" + imp.InputOptions.File = "/path/to/input/file/dot/input.txt" + imp.InputOptions.ColumnsHaveTypes = true + _, err = imp.getInputReader(&os.File{}) + So(err, ShouldBeNil) + }) Convey("no error should be thrown if neither --fields nor --fieldFile "+ "is used", func() { imp, err := NewMongoImport() diff --git a/mongoimport/options.go b/mongoimport/options.go index ebdba1285..b29b571e0 100644 --- a/mongoimport/options.go +++ b/mongoimport/options.go @@ -9,7 +9,7 @@ See http://docs.mongodb.org/manual/reference/program/mongoimport/ for more infor // InputOptions defines the set of options for reading input data. type InputOptions struct { // Fields is an option to directly specify comma-separated fields to import to CSV. - Fields *string `long:"fields" value-name:"[,]*" short:"f" description:"comma separated list of field names, e.g. -f name,age"` + Fields *string `long:"fields" value-name:"[,]*" short:"f" description:"comma separated list of fields, e.g. 
-f name,age"` // FieldFile is a filename that refers to a list of fields to import, 1 per line. FieldFile *string `long:"fieldFile" value-name:"" description:"file with field names - 1 per line"` @@ -23,8 +23,14 @@ type InputOptions struct { // Indicates that the underlying input source contains a single JSON array with the documents to import. JSONArray bool `long:"jsonArray" description:"treat input source as a JSON array"` + // Indicates how to handle type coercion failures + ParseGrace string `long:"parseGrace" value-name:"" default:"stop" description:"controls behavior when type coercion fails - one of: autoCast, skipField, skipRow, stop (defaults to 'stop')"` + // Specifies the file type to import. The default format is JSON, but it’s possible to import CSV and TSV files. Type string `long:"type" value-name:"" default:"json" default-mask:"-" description:"input format to import: json, csv, or tsv (defaults to 'json')"` + + // Indicates that field names include type descriptions + ColumnsHaveTypes bool `long:"columnsHaveTypes" description:"field list specifies types (CSV and TSV only)"` } // Name returns a description of the InputOptions struct. diff --git a/mongoimport/testdata/test_type.csv b/mongoimport/testdata/test_type.csv new file mode 100644 index 000000000..444321ee5 --- /dev/null +++ b/mongoimport/testdata/test_type.csv @@ -0,0 +1,4 @@ +zip.string(),number.double() +12345,20.2 +12345-1234,40.4 +23455,BLAH diff --git a/mongoimport/tsv.go b/mongoimport/tsv.go index 61e038ec5..f40edd94f 100644 --- a/mongoimport/tsv.go +++ b/mongoimport/tsv.go @@ -3,9 +3,10 @@ package mongoimport import ( "bufio" "fmt" - "gopkg.in/mgo.v2/bson" "io" "strings" + + "gopkg.in/mgo.v2/bson" ) const ( @@ -16,8 +17,8 @@ const ( // TSVInputReader is a struct that implements the InputReader interface for a // TSV input source. type TSVInputReader struct { - // fields is a list of field names in the BSON documents to be imported - fields []string + // colSpecs is a list of column specifications in the BSON documents to be imported + colSpecs []ColumnSpec // tsvReader is the underlying reader used to read data in from the TSV // or TSV file @@ -38,17 +39,17 @@ type TSVInputReader struct { // TSVConverter implements the Converter interface for TSV input. type TSVConverter struct { - fields []string - data string - index uint64 + colSpecs []ColumnSpec + data string + index uint64 } // NewTSVInputReader returns a TSVInputReader configured to read input from the -// given io.Reader, extracting the specified fields only. -func NewTSVInputReader(fields []string, in io.Reader, numDecoders int) *TSVInputReader { +// given io.Reader, extracting the specified columns only. +func NewTSVInputReader(colSpecs []ColumnSpec, in io.Reader, numDecoders int) *TSVInputReader { szCount := newSizeTrackingReader(newBomDiscardingReader(in)) return &TSVInputReader{ - fields: fields, + colSpecs: colSpecs, tsvReader: bufio.NewReader(szCount), numProcessed: uint64(0), numDecoders: numDecoders, @@ -64,9 +65,30 @@ func (r *TSVInputReader) ReadAndValidateHeader() (err error) { return err } for _, field := range strings.Split(header, tokenSeparator) { - r.fields = append(r.fields, strings.TrimRight(field, "\r\n")) + r.colSpecs = append(r.colSpecs, ColumnSpec{ + Name: strings.TrimRight(field, "\r\n"), + Parser: new(FieldAutoParser), + }) + } + return validateReaderFields(ColumnNames(r.colSpecs)) +} + +// ReadAndValidateTypedHeader reads the header from the underlying reader and validates +// the header fields. 
It sets err if the read/validation fails. +func (r *TSVInputReader) ReadAndValidateTypedHeader(parseGrace ParseGrace) (err error) { + header, err := r.tsvReader.ReadString(entryDelimiter) + if err != nil { + return err + } + var headerFields []string + for _, field := range strings.Split(header, tokenSeparator) { + headerFields = append(headerFields, strings.TrimRight(field, "\r\n")) + } + r.colSpecs, err = ParseTypedHeaders(headerFields, parseGrace) + if err != nil { + return err } - return validateReaderFields(r.fields) + return validateReaderFields(ColumnNames(r.colSpecs)) } // StreamDocument takes a boolean indicating if the documents should be streamed @@ -92,9 +114,9 @@ func (r *TSVInputReader) StreamDocument(ordered bool, readDocs chan bson.D) (ret return } tsvRecordChan <- TSVConverter{ - fields: r.fields, - data: r.tsvRecord, - index: r.numProcessed, + colSpecs: r.colSpecs, + data: r.tsvRecord, + index: r.numProcessed, } r.numProcessed++ } @@ -112,7 +134,7 @@ func (r *TSVInputReader) StreamDocument(ordered bool, readDocs chan bson.D) (ret // TSVConverter struct to a BSON document. func (c TSVConverter) Convert() (bson.D, error) { return tokensToBSON( - c.fields, + c.colSpecs, strings.Split(strings.TrimRight(c.data, "\r\n"), tokenSeparator), c.index, ) diff --git a/mongoimport/tsv_test.go b/mongoimport/tsv_test.go index 6f6fd8741..5debd684f 100644 --- a/mongoimport/tsv_test.go +++ b/mongoimport/tsv_test.go @@ -2,11 +2,12 @@ package mongoimport import ( "bytes" + "os" + "testing" + "github.com/mongodb/mongo-tools/common/testutil" . "github.com/smartystreets/goconvey/convey" "gopkg.in/mgo.v2/bson" - "os" - "testing" ) func TestTSVStreamDocument(t *testing.T) { @@ -14,13 +15,17 @@ func TestTSVStreamDocument(t *testing.T) { Convey("With a TSV input reader", t, func() { Convey("integer valued strings should be converted tsv1", func() { contents := "1\t2\t3e\n" - fields := []string{"a", "b", "c"} + colSpecs := []ColumnSpec{ + {"a", new(FieldAutoParser), pgAutoCast}, + {"b", new(FieldAutoParser), pgAutoCast}, + {"c", new(FieldAutoParser), pgAutoCast}, + } expectedRead := bson.D{ bson.DocElem{"a", 1}, bson.DocElem{"b", 2}, bson.DocElem{"c", "3e"}, } - r := NewTSVInputReader(fields, bytes.NewReader([]byte(contents)), 1) + r := NewTSVInputReader(colSpecs, bytes.NewReader([]byte(contents)), 1) docChan := make(chan bson.D, 1) So(r.StreamDocument(true, docChan), ShouldBeNil) So(<-docChan, ShouldResemble, expectedRead) @@ -28,7 +33,11 @@ func TestTSVStreamDocument(t *testing.T) { Convey("valid TSV input file that starts with the UTF-8 BOM should "+ "not raise an error", func() { - fields := []string{"a", "b", "c"} + colSpecs := []ColumnSpec{ + {"a", new(FieldAutoParser), pgAutoCast}, + {"b", new(FieldAutoParser), pgAutoCast}, + {"c", new(FieldAutoParser), pgAutoCast}, + } expectedRead := bson.D{ bson.DocElem{"a", 1}, bson.DocElem{"b", 2}, @@ -36,7 +45,7 @@ func TestTSVStreamDocument(t *testing.T) { } fileHandle, err := os.Open("testdata/test_bom.tsv") So(err, ShouldBeNil) - r := NewTSVInputReader(fields, fileHandle, 1) + r := NewTSVInputReader(colSpecs, fileHandle, 1) docChan := make(chan bson.D, 2) So(r.StreamDocument(true, docChan), ShouldBeNil) So(<-docChan, ShouldResemble, expectedRead) @@ -44,29 +53,37 @@ func TestTSVStreamDocument(t *testing.T) { Convey("integer valued strings should be converted tsv2", func() { contents := "a\tb\t\"cccc,cccc\"\td\n" - fields := []string{"a", "b", "c"} + colSpecs := []ColumnSpec{ + {"a", new(FieldAutoParser), pgAutoCast}, + {"b", new(FieldAutoParser), 
pgAutoCast}, + {"c", new(FieldAutoParser), pgAutoCast}, + } expectedRead := bson.D{ bson.DocElem{"a", "a"}, bson.DocElem{"b", "b"}, bson.DocElem{"c", `"cccc,cccc"`}, bson.DocElem{"field3", "d"}, } - r := NewTSVInputReader(fields, bytes.NewReader([]byte(contents)), 1) + r := NewTSVInputReader(colSpecs, bytes.NewReader([]byte(contents)), 1) docChan := make(chan bson.D, 1) So(r.StreamDocument(true, docChan), ShouldBeNil) So(<-docChan, ShouldResemble, expectedRead) }) - Convey("extra fields should be prefixed with 'field'", func() { + Convey("extra columns should be prefixed with 'field'", func() { contents := "1\t2\t3e\t may\n" - fields := []string{"a", "b", "c"} + colSpecs := []ColumnSpec{ + {"a", new(FieldAutoParser), pgAutoCast}, + {"b", new(FieldAutoParser), pgAutoCast}, + {"c", new(FieldAutoParser), pgAutoCast}, + } expectedRead := bson.D{ bson.DocElem{"a", 1}, bson.DocElem{"b", 2}, bson.DocElem{"c", "3e"}, bson.DocElem{"field3", " may"}, } - r := NewTSVInputReader(fields, bytes.NewReader([]byte(contents)), 1) + r := NewTSVInputReader(colSpecs, bytes.NewReader([]byte(contents)), 1) docChan := make(chan bson.D, 1) So(r.StreamDocument(true, docChan), ShouldBeNil) So(<-docChan, ShouldResemble, expectedRead) @@ -74,14 +91,19 @@ func TestTSVStreamDocument(t *testing.T) { Convey("mixed values should be parsed correctly", func() { contents := "12\t13.3\tInline\t14\n" - fields := []string{"a", "b", "c", "d"} + colSpecs := []ColumnSpec{ + {"a", new(FieldAutoParser), pgAutoCast}, + {"b", new(FieldAutoParser), pgAutoCast}, + {"c", new(FieldAutoParser), pgAutoCast}, + {"d", new(FieldAutoParser), pgAutoCast}, + } expectedRead := bson.D{ bson.DocElem{"a", 12}, bson.DocElem{"b", 13.3}, bson.DocElem{"c", "Inline"}, bson.DocElem{"d", 14}, } - r := NewTSVInputReader(fields, bytes.NewReader([]byte(contents)), 1) + r := NewTSVInputReader(colSpecs, bytes.NewReader([]byte(contents)), 1) docChan := make(chan bson.D, 1) So(r.StreamDocument(true, docChan), ShouldBeNil) So(<-docChan, ShouldResemble, expectedRead) @@ -90,7 +112,11 @@ func TestTSVStreamDocument(t *testing.T) { Convey("calling StreamDocument() in succession for TSVs should "+ "return the correct next set of values", func() { contents := "1\t2\t3\n4\t5\t6\n" - fields := []string{"a", "b", "c"} + colSpecs := []ColumnSpec{ + {"a", new(FieldAutoParser), pgAutoCast}, + {"b", new(FieldAutoParser), pgAutoCast}, + {"c", new(FieldAutoParser), pgAutoCast}, + } expectedReads := []bson.D{ bson.D{ bson.DocElem{"a", 1}, @@ -103,7 +129,7 @@ func TestTSVStreamDocument(t *testing.T) { bson.DocElem{"c", 6}, }, } - r := NewTSVInputReader(fields, bytes.NewReader([]byte(contents)), 1) + r := NewTSVInputReader(colSpecs, bytes.NewReader([]byte(contents)), 1) docChan := make(chan bson.D, len(expectedReads)) So(r.StreamDocument(true, docChan), ShouldBeNil) for i := 0; i < len(expectedReads); i++ { @@ -117,7 +143,11 @@ func TestTSVStreamDocument(t *testing.T) { Convey("calling StreamDocument() in succession for TSVs that contain "+ "quotes should return the correct next set of values", func() { contents := "1\t2\t3\n4\t\"\t6\n" - fields := []string{"a", "b", "c"} + colSpecs := []ColumnSpec{ + {"a", new(FieldAutoParser), pgAutoCast}, + {"b", new(FieldAutoParser), pgAutoCast}, + {"c", new(FieldAutoParser), pgAutoCast}, + } expectedReadOne := bson.D{ bson.DocElem{"a", 1}, bson.DocElem{"b", 2}, @@ -128,7 +158,7 @@ func TestTSVStreamDocument(t *testing.T) { bson.DocElem{"b", `"`}, bson.DocElem{"c", 6}, } - r := NewTSVInputReader(fields, bytes.NewReader([]byte(contents)), 1) + r 
:= NewTSVInputReader(colSpecs, bytes.NewReader([]byte(contents)), 1) docChan := make(chan bson.D, 2) So(r.StreamDocument(true, docChan), ShouldBeNil) So(<-docChan, ShouldResemble, expectedReadOne) @@ -138,7 +168,11 @@ func TestTSVStreamDocument(t *testing.T) { Convey("plain TSV input file sources should be parsed correctly and "+ "subsequent imports should parse correctly", func() { - fields := []string{"a", "b", "c"} + colSpecs := []ColumnSpec{ + {"a", new(FieldAutoParser), pgAutoCast}, + {"b", new(FieldAutoParser), pgAutoCast}, + {"c", new(FieldAutoParser), pgAutoCast}, + } expectedReadOne := bson.D{ bson.DocElem{"a", 1}, bson.DocElem{"b", 2}, @@ -151,7 +185,7 @@ func TestTSVStreamDocument(t *testing.T) { } fileHandle, err := os.Open("testdata/test.tsv") So(err, ShouldBeNil) - r := NewTSVInputReader(fields, fileHandle, 1) + r := NewTSVInputReader(colSpecs, fileHandle, 1) docChan := make(chan bson.D, 50) So(r.StreamDocument(true, docChan), ShouldBeNil) So(<-docChan, ShouldResemble, expectedReadOne) @@ -165,10 +199,10 @@ func TestTSVReadAndValidateHeader(t *testing.T) { Convey("With a TSV input reader", t, func() { Convey("setting the header should read the first line of the TSV", func() { contents := "extraHeader1\textraHeader2\textraHeader3\n" - fields := []string{} - r := NewTSVInputReader(fields, bytes.NewReader([]byte(contents)), 1) + colSpecs := []ColumnSpec{} + r := NewTSVInputReader(colSpecs, bytes.NewReader([]byte(contents)), 1) So(r.ReadAndValidateHeader(), ShouldBeNil) - So(len(r.fields), ShouldEqual, 3) + So(len(r.colSpecs), ShouldEqual, 3) }) }) } @@ -178,9 +212,13 @@ func TestTSVConvert(t *testing.T) { Convey("With a TSV input reader", t, func() { Convey("calling convert on a TSVConverter should return the expected BSON document", func() { tsvConverter := TSVConverter{ - fields: []string{"field1", "field2", "field3"}, - data: "a\tb\tc", - index: uint64(0), + colSpecs: []ColumnSpec{ + {"field1", new(FieldAutoParser), pgAutoCast}, + {"field2", new(FieldAutoParser), pgAutoCast}, + {"field3", new(FieldAutoParser), pgAutoCast}, + }, + data: "a\tb\tc", + index: uint64(0), } expectedDocument := bson.D{ bson.DocElem{"field1", "a"}, diff --git a/mongoimport/typed_fields.go b/mongoimport/typed_fields.go new file mode 100644 index 000000000..717e1080d --- /dev/null +++ b/mongoimport/typed_fields.go @@ -0,0 +1,267 @@ +package mongoimport + +import ( + "encoding/base32" + "encoding/base64" + "encoding/hex" + "fmt" + "regexp" + "strconv" + "strings" + "time" + + "github.com/mongodb/mongo-tools/mongoimport/dateconv" +) + +// columnType defines different types for columns that can be parsed distinctly +type columnType int + +const ( + ctAuto columnType = iota + ctBinary + ctBoolean + ctDate + ctDateGo + ctDateMS + ctDateOracle + ctDouble + ctInt32 + ctInt64 + ctString +) + +var ( + columnTypeRE = regexp.MustCompile(`(?s)^(.*)\.(\w+)\((.*)\)$`) + columnTypeNameMap = map[string]columnType{ + "auto": ctAuto, + "binary": ctBinary, + "boolean": ctBoolean, + "date": ctDate, + "date_go": ctDateGo, + "date_ms": ctDateMS, + "date_oracle": ctDateOracle, + "double": ctDouble, + "int32": ctInt32, + "int64": ctInt64, + "string": ctString, + } +) + +type binaryEncoding int + +const ( + beBase64 binaryEncoding = iota + beBase32 + beHex +) + +var binaryEncodingNameMap = map[string]binaryEncoding{ + "base64": beBase64, + "base32": beBase32, + "hex": beHex, +} + +// ColumnSpec keeps information for each 'column' of import. 
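+// For instance (illustrative, mirroring typed_fields_test.go), the typed +// header "bar.date(January 2, 2006)" is parsed by ParseTypedHeader below into +// roughly ColumnSpec{Name: "bar", Parser: &FieldDateParser{"January 2, 2006"}, ParseGrace: parseGrace}.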
+type ColumnSpec struct { + Name string + Parser FieldParser + ParseGrace ParseGrace +} + +// ColumnNames maps a ColumnSpec slice to the associated column names +func ColumnNames(fs []ColumnSpec) (s []string) { + for _, f := range fs { + s = append(s, f.Name) + } + return +} + +// ParseTypedHeader produces a ColumnSpec from a header item, extracting type +// information from it. The parseGrace is passed along to the new ColumnSpec. +func ParseTypedHeader(header string, parseGrace ParseGrace) (f ColumnSpec, err error) { + match := columnTypeRE.FindStringSubmatch(header) + if len(match) != 4 { + err = fmt.Errorf("could not parse type from header %s", header) + return + } + t, ok := columnTypeNameMap[match[2]] + if !ok { + err = fmt.Errorf("invalid type %s in header %s", match[2], header) + return + } + p, err := NewFieldParser(t, match[3]) + if err != nil { + return + } + return ColumnSpec{match[1], p, parseGrace}, nil +} + +// ParseTypedHeaders performs ParseTypedHeader on each item, returning an +// error if any single one fails. +func ParseTypedHeaders(headers []string, parseGrace ParseGrace) (fs []ColumnSpec, err error) { + fs = make([]ColumnSpec, len(headers)) + for i, f := range headers { + fs[i], err = ParseTypedHeader(f, parseGrace) + if err != nil { + return + } + } + return +} + +// ParseAutoHeaders converts a list of header items to ColumnSpec objects, with +// automatic parsers. +func ParseAutoHeaders(headers []string) (fs []ColumnSpec) { + fs = make([]ColumnSpec, len(headers)) + for i, f := range headers { + fs[i] = ColumnSpec{f, new(FieldAutoParser), pgAutoCast} + } + return +} + +// FieldParser is the interface for any parser of a field item. +type FieldParser interface { + Parse(in string) (interface{}, error) +} + +var ( + escapeReplacements = []string{ + `\\`, `\`, + `\(`, `(`, + `\)`, `)`, + } + escapeReplacer = strings.NewReplacer(escapeReplacements...) +) + +// NewFieldParser yields a FieldParser corresponding to the given columnType. +// arg is passed along to the specific type's parser, if it permits an +// argument. An error will be raised if arg is not valid for the type's +// parser. 
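+// +// For instance (illustrative, mirroring typed_fields_test.go), +// NewFieldParser(ctBinary, "hex") returns a parser whose Parse("400a11") +// yields []byte{64, 10, 17}, while NewFieldParser(ctInt32, "0") returns an +// error because int32 takes no argument.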
+func NewFieldParser(t columnType, arg string) (parser FieldParser, err error) { + arg = escapeReplacer.Replace(arg) + + switch t { // validate argument + case ctBinary: + case ctDate: + case ctDateGo: + case ctDateMS: + case ctDateOracle: + default: + if arg != "" { + err = fmt.Errorf("type %v does not support arguments", t) + return + } + } + + switch t { + case ctBinary: + parser, err = NewFieldBinaryParser(arg) + case ctBoolean: + parser = new(FieldBooleanParser) + case ctDate: + fallthrough + case ctDateGo: + parser = &FieldDateParser{arg} + case ctDateMS: + parser = &FieldDateParser{dateconv.FromMS(arg)} + case ctDateOracle: + parser = &FieldDateParser{dateconv.FromOracle(arg)} + case ctDouble: + parser = new(FieldDoubleParser) + case ctInt32: + parser = new(FieldInt32Parser) + case ctInt64: + parser = new(FieldInt64Parser) + case ctString: + parser = new(FieldStringParser) + default: // ctAuto + parser = new(FieldAutoParser) + } + return +} + +func autoParse(in string) interface{} { + parsedInt, err := strconv.Atoi(in) + if err == nil { + return parsedInt + } + parsedFloat, err := strconv.ParseFloat(in, 64) + if err == nil { + return parsedFloat + } + return in +} + +type FieldAutoParser struct{} + +func (ap *FieldAutoParser) Parse(in string) (interface{}, error) { + return autoParse(in), nil +} + +type FieldBinaryParser struct { + enc binaryEncoding +} + +func (bp *FieldBinaryParser) Parse(in string) (interface{}, error) { + switch bp.enc { + case beBase32: + return base32.StdEncoding.DecodeString(in) + case beBase64: + return base64.StdEncoding.DecodeString(in) + default: // beHex + return hex.DecodeString(in) + } +} + +func NewFieldBinaryParser(arg string) (*FieldBinaryParser, error) { + enc, ok := binaryEncodingNameMap[arg] + if !ok { + return nil, fmt.Errorf("invalid binary encoding: %s", arg) + } + return &FieldBinaryParser{enc}, nil +} + +type FieldBooleanParser struct{} + +func (bp *FieldBooleanParser) Parse(in string) (interface{}, error) { + if strings.ToLower(in) == "true" || in == "1" { + return true, nil + } + if strings.ToLower(in) == "false" || in == "0" { + return false, nil + } + return nil, fmt.Errorf("failed to parse boolean: %s", in) +} + +type FieldDateParser struct { + layout string +} + +func (dp *FieldDateParser) Parse(in string) (interface{}, error) { + return time.Parse(dp.layout, in) +} + +type FieldDoubleParser struct{} + +func (dp *FieldDoubleParser) Parse(in string) (interface{}, error) { + return strconv.ParseFloat(in, 64) +} + +type FieldInt32Parser struct{} + +func (ip *FieldInt32Parser) Parse(in string) (interface{}, error) { + value, err := strconv.ParseInt(in, 10, 32) + return int32(value), err +} + +type FieldInt64Parser struct{} + +func (ip *FieldInt64Parser) Parse(in string) (interface{}, error) { + return strconv.ParseInt(in, 10, 64) +} + +type FieldStringParser struct{} + +func (sp *FieldStringParser) Parse(in string) (interface{}, error) { + return in, nil +} diff --git a/mongoimport/typed_fields_test.go b/mongoimport/typed_fields_test.go new file mode 100644 index 000000000..9d164451c --- /dev/null +++ b/mongoimport/typed_fields_test.go @@ -0,0 +1,386 @@ +package mongoimport + +import ( + "testing" + "time" + + "github.com/mongodb/mongo-tools/common/log" + "github.com/mongodb/mongo-tools/common/options" + "github.com/mongodb/mongo-tools/common/testutil" + . 
"github.com/smartystreets/goconvey/convey" +) + +func init() { + log.SetVerbosity(&options.Verbosity{ + VLevel: 4, + }) +} + +func TestTypedHeaderParser(t *testing.T) { + testutil.VerifyTestType(t, testutil.UnitTestType) + + Convey("Using 'zip.string(),number.double(),foo.auto()'", t, func() { + var headers = []string{"zip.string()", "number.double()", "foo.auto()", "bar.date(January 2, 2006)"} + var colSpecs []ColumnSpec + var err error + + Convey("with parse grace: auto", func() { + colSpecs, err = ParseTypedHeaders(headers, pgAutoCast) + So(colSpecs, ShouldResemble, []ColumnSpec{ + {"zip", new(FieldStringParser), pgAutoCast}, + {"number", new(FieldDoubleParser), pgAutoCast}, + {"foo", new(FieldAutoParser), pgAutoCast}, + {"bar", &FieldDateParser{"January 2, 2006"}, pgAutoCast}, + }) + So(err, ShouldBeNil) + }) + Convey("with parse grace: skipRow", func() { + colSpecs, err = ParseTypedHeaders(headers, pgSkipRow) + So(colSpecs, ShouldResemble, []ColumnSpec{ + {"zip", new(FieldStringParser), pgSkipRow}, + {"number", new(FieldDoubleParser), pgSkipRow}, + {"foo", new(FieldAutoParser), pgSkipRow}, + {"bar", &FieldDateParser{"January 2, 2006"}, pgSkipRow}, + }) + So(err, ShouldBeNil) + }) + }) + + Convey("Using various bad headers", t, func() { + var columnSpec ColumnSpec + var err error + + Convey("with non-empty arguments for types that don't want them", func() { + columnSpec, err = ParseTypedHeader("zip.string(blah)", pgAutoCast) + So(err, ShouldNotBeNil) + columnSpec, err = ParseTypedHeader("zip.string(0)", pgAutoCast) + So(err, ShouldNotBeNil) + columnSpec, err = ParseTypedHeader("zip.int32(0)", pgAutoCast) + So(err, ShouldNotBeNil) + columnSpec, err = ParseTypedHeader("zip.int64(0)", pgAutoCast) + So(err, ShouldNotBeNil) + columnSpec, err = ParseTypedHeader("zip.double(0)", pgAutoCast) + So(err, ShouldNotBeNil) + columnSpec, err = ParseTypedHeader("zip.auto(0)", pgAutoCast) + So(err, ShouldNotBeNil) + }) + Convey("with bad arguments for the binary type", func() { + columnSpec, err = ParseTypedHeader("zip.binary(blah)", pgAutoCast) + So(err, ShouldNotBeNil) + columnSpec, err = ParseTypedHeader("zip.binary(binary)", pgAutoCast) + So(err, ShouldNotBeNil) + columnSpec, err = ParseTypedHeader("zip.binary(decimal)", pgAutoCast) + So(err, ShouldNotBeNil) + }) + }) +} + +func TestAutoHeaderParser(t *testing.T) { + Convey("Using 'zip,number'", t, func() { + var headers = []string{"zip", "number", "foo"} + var colSpecs = ParseAutoHeaders(headers) + So(colSpecs, ShouldResemble, []ColumnSpec{ + {"zip", new(FieldAutoParser), pgAutoCast}, + {"number", new(FieldAutoParser), pgAutoCast}, + {"foo", new(FieldAutoParser), pgAutoCast}, + }) + }) +} + +func TestFieldParsers(t *testing.T) { + testutil.VerifyTestType(t, testutil.UnitTestType) + + Convey("Using FieldAutoParser", t, func() { + var p, _ = NewFieldParser(ctAuto, "") + var value interface{} + var err error + + Convey("parses integers when it can", func() { + value, err = p.Parse("2147483648") + So(value.(int), ShouldEqual, 2147483648) + So(err, ShouldBeNil) + value, err = p.Parse("42") + So(value.(int), ShouldEqual, 42) + So(err, ShouldBeNil) + value, err = p.Parse("-2147483649") + So(value.(int), ShouldEqual, -2147483649) + }) + Convey("parses decimals when it can", func() { + value, err = p.Parse("3.14159265") + So(value.(float64), ShouldEqual, 3.14159265) + So(err, ShouldBeNil) + value, err = p.Parse("0.123123") + So(value.(float64), ShouldEqual, 0.123123) + So(err, ShouldBeNil) + value, err = p.Parse("-123456.789") + So(value.(float64), 
+
+// FieldAutoParser parses a field via autoParse.
+type FieldAutoParser struct{}
+
+func (ap *FieldAutoParser) Parse(in string) (interface{}, error) {
+	return autoParse(in), nil
+}
+
+// FieldBinaryParser decodes a field from hex, base32, or base64 encoding.
+type FieldBinaryParser struct {
+	enc binaryEncoding
+}
+
+func (bp *FieldBinaryParser) Parse(in string) (interface{}, error) {
+	switch bp.enc {
+	case beBase32:
+		return base32.StdEncoding.DecodeString(in)
+	case beBase64:
+		return base64.StdEncoding.DecodeString(in)
+	default: // beHex
+		return hex.DecodeString(in)
+	}
+}
+
+// NewFieldBinaryParser returns a FieldBinaryParser for the named encoding.
+func NewFieldBinaryParser(arg string) (*FieldBinaryParser, error) {
+	enc, ok := binaryEncodingNameMap[arg]
+	if !ok {
+		return nil, fmt.Errorf("invalid binary encoding: %s", arg)
+	}
+	return &FieldBinaryParser{enc}, nil
+}
+
+// FieldBooleanParser parses "true"/"1" and "false"/"0" (case-insensitively).
+type FieldBooleanParser struct{}
+
+func (bp *FieldBooleanParser) Parse(in string) (interface{}, error) {
+	if strings.ToLower(in) == "true" || in == "1" {
+		return true, nil
+	}
+	if strings.ToLower(in) == "false" || in == "0" {
+		return false, nil
+	}
+	return nil, fmt.Errorf("failed to parse boolean: %s", in)
+}
+
+// FieldDateParser parses a field according to its Go time layout.
+type FieldDateParser struct {
+	layout string
+}
+
+func (dp *FieldDateParser) Parse(in string) (interface{}, error) {
+	return time.Parse(dp.layout, in)
+}
+
+// FieldDoubleParser parses a field as a float64.
+type FieldDoubleParser struct{}
+
+func (dp *FieldDoubleParser) Parse(in string) (interface{}, error) {
+	return strconv.ParseFloat(in, 64)
+}
+
+// FieldInt32Parser parses a field as an int32.
+type FieldInt32Parser struct{}
+
+func (ip *FieldInt32Parser) Parse(in string) (interface{}, error) {
+	value, err := strconv.ParseInt(in, 10, 32)
+	return int32(value), err
+}
+
+// FieldInt64Parser parses a field as an int64.
+type FieldInt64Parser struct{}
+
+func (ip *FieldInt64Parser) Parse(in string) (interface{}, error) {
+	return strconv.ParseInt(in, 10, 64)
+}
+
+// FieldStringParser returns the field unchanged.
+type FieldStringParser struct{}
+
+func (sp *FieldStringParser) Parse(in string) (interface{}, error) {
+	return in, nil
+}
diff --git a/mongoimport/typed_fields_test.go b/mongoimport/typed_fields_test.go
new file mode 100644
index 000000000..9d164451c
--- /dev/null
+++ b/mongoimport/typed_fields_test.go
@@ -0,0 +1,386 @@
+package mongoimport
+
+import (
+	"testing"
+	"time"
+
+	"github.com/mongodb/mongo-tools/common/log"
+	"github.com/mongodb/mongo-tools/common/options"
+	"github.com/mongodb/mongo-tools/common/testutil"
+	. "github.com/smartystreets/goconvey/convey"
+)
+
+func init() {
+	log.SetVerbosity(&options.Verbosity{
+		VLevel: 4,
+	})
+}
+
+func TestTypedHeaderParser(t *testing.T) {
+	testutil.VerifyTestType(t, testutil.UnitTestType)
+
+	Convey("Using 'zip.string(),number.double(),foo.auto(),bar.date(January 2, 2006)'", t, func() {
+		var headers = []string{"zip.string()", "number.double()", "foo.auto()", "bar.date(January 2, 2006)"}
+		var colSpecs []ColumnSpec
+		var err error
+
+		Convey("with parse grace: auto", func() {
+			colSpecs, err = ParseTypedHeaders(headers, pgAutoCast)
+			So(colSpecs, ShouldResemble, []ColumnSpec{
+				{"zip", new(FieldStringParser), pgAutoCast},
+				{"number", new(FieldDoubleParser), pgAutoCast},
+				{"foo", new(FieldAutoParser), pgAutoCast},
+				{"bar", &FieldDateParser{"January 2, 2006"}, pgAutoCast},
+			})
+			So(err, ShouldBeNil)
+		})
+		Convey("with parse grace: skipRow", func() {
+			colSpecs, err = ParseTypedHeaders(headers, pgSkipRow)
+			So(colSpecs, ShouldResemble, []ColumnSpec{
+				{"zip", new(FieldStringParser), pgSkipRow},
+				{"number", new(FieldDoubleParser), pgSkipRow},
+				{"foo", new(FieldAutoParser), pgSkipRow},
+				{"bar", &FieldDateParser{"January 2, 2006"}, pgSkipRow},
+			})
+			So(err, ShouldBeNil)
+		})
+	})
+
+	Convey("Using various bad headers", t, func() {
+		var err error
+
+		Convey("with non-empty arguments for types that don't want them", func() {
+			_, err = ParseTypedHeader("zip.string(blah)", pgAutoCast)
+			So(err, ShouldNotBeNil)
+			_, err = ParseTypedHeader("zip.string(0)", pgAutoCast)
+			So(err, ShouldNotBeNil)
+			_, err = ParseTypedHeader("zip.int32(0)", pgAutoCast)
+			So(err, ShouldNotBeNil)
+			_, err = ParseTypedHeader("zip.int64(0)", pgAutoCast)
+			So(err, ShouldNotBeNil)
+			_, err = ParseTypedHeader("zip.double(0)", pgAutoCast)
+			So(err, ShouldNotBeNil)
+			_, err = ParseTypedHeader("zip.auto(0)", pgAutoCast)
+			So(err, ShouldNotBeNil)
+		})
+		Convey("with bad arguments for the binary type", func() {
+			_, err = ParseTypedHeader("zip.binary(blah)", pgAutoCast)
+			So(err, ShouldNotBeNil)
+			_, err = ParseTypedHeader("zip.binary(binary)", pgAutoCast)
+			So(err, ShouldNotBeNil)
+			_, err = ParseTypedHeader("zip.binary(decimal)", pgAutoCast)
+			So(err, ShouldNotBeNil)
+		})
+	})
+}
+
+func TestAutoHeaderParser(t *testing.T) {
+	testutil.VerifyTestType(t, testutil.UnitTestType)
+
+	Convey("Using 'zip,number,foo'", t, func() {
+		var headers = []string{"zip", "number", "foo"}
+		var colSpecs = ParseAutoHeaders(headers)
+		So(colSpecs, ShouldResemble, []ColumnSpec{
+			{"zip", new(FieldAutoParser), pgAutoCast},
+			{"number", new(FieldAutoParser), pgAutoCast},
+			{"foo", new(FieldAutoParser), pgAutoCast},
+		})
+	})
+}
+
+func TestFieldParsers(t *testing.T) {
+	testutil.VerifyTestType(t, testutil.UnitTestType)
+
+	Convey("Using FieldAutoParser", t, func() {
+		var p, _ = NewFieldParser(ctAuto, "")
+		var value interface{}
+		var err error
+
+		Convey("parses integers when it can", func() {
+			// values beyond the int32 range assume a 64-bit platform int
+			value, err = p.Parse("2147483648")
+			So(value.(int), ShouldEqual, 2147483648)
+			So(err, ShouldBeNil)
+			value, err = p.Parse("42")
+			So(value.(int), ShouldEqual, 42)
+			So(err, ShouldBeNil)
+			value, err = p.Parse("-2147483649")
+			So(value.(int), ShouldEqual, -2147483649)
+			So(err, ShouldBeNil)
+		})
+		Convey("parses decimals when it can", func() {
+			value, err = p.Parse("3.14159265")
+			So(value.(float64), ShouldEqual, 3.14159265)
+			So(err, ShouldBeNil)
+			value, err = p.Parse("0.123123")
+			So(value.(float64), ShouldEqual, 0.123123)
+			So(err, ShouldBeNil)
+			value, err = p.Parse("-123456.789")
+			So(value.(float64), ShouldEqual, -123456.789)
+			So(err, ShouldBeNil)
+			value, err = p.Parse("-1.")
+			So(value.(float64), ShouldEqual, -1.0)
+			So(err, ShouldBeNil)
+		})
+		Convey("leaves everything else as a string", func() {
+			value, err = p.Parse("12345-6789")
+			So(value.(string), ShouldEqual, "12345-6789")
+			So(err, ShouldBeNil)
+			value, err = p.Parse("06/02/1997")
+			So(value.(string), ShouldEqual, "06/02/1997")
+			So(err, ShouldBeNil)
+			value, err = p.Parse("")
+			So(value.(string), ShouldEqual, "")
+			So(err, ShouldBeNil)
+		})
+	})
parse invalid dates", func() { + _, err = p.Parse("01/04/2000 5:38:10pm") + So(err, ShouldNotBeNil) + _, err = p.Parse("01/04/2000 5:38:10 pm UTC") + So(err, ShouldNotBeNil) + _, err = p.Parse("01/04/2000") + So(err, ShouldNotBeNil) + }) + }) + Convey("with MS's format", func() { + var p, _ = NewFieldParser(ctDateMS, "MM/dd/yyyy h:mm:sstt") + Convey("parses valid timestamps correctly", func() { + value, err = p.Parse("01/04/2000 5:38:10PM") + So(value.(time.Time), ShouldResemble, time.Date(2000, 1, 4, 17, 38, 10, 0, time.UTC)) + So(err, ShouldBeNil) + }) + Convey("does not parse invalid dates", func() { + _, err = p.Parse("01/04/2000 :) 05:38:10PM") + So(err, ShouldNotBeNil) + _, err = p.Parse("01/04/2000 005:38:10PM") + So(err, ShouldNotBeNil) + _, err = p.Parse("01/04/2000 5:38:10 PM") + So(err, ShouldNotBeNil) + _, err = p.Parse("01/04/2000") + So(err, ShouldNotBeNil) + }) + }) + Convey("with Oracle's format", func() { + var p, _ = NewFieldParser(ctDateOracle, "mm/Dd/yYYy hh:MI:SsAm") + Convey("parses valid timestamps correctly", func() { + value, err = p.Parse("01/04/2000 05:38:10PM") + So(value.(time.Time), ShouldResemble, time.Date(2000, 1, 4, 17, 38, 10, 0, time.UTC)) + So(err, ShouldBeNil) + }) + Convey("does not parse invalid dates", func() { + _, err = p.Parse("01/04/2000 :) 05:38:10PM") + So(err, ShouldNotBeNil) + _, err = p.Parse("01/04/2000 005:38:10PM") + So(err, ShouldNotBeNil) + _, err = p.Parse("01/04/2000 5:38:10 PM") + So(err, ShouldNotBeNil) + _, err = p.Parse("01/04/2000") + So(err, ShouldNotBeNil) + }) + }) + }) + + Convey("Using FieldDoubleParser", t, func() { + var p, _ = NewFieldParser(ctDouble, "") + var value interface{} + var err error + + Convey("parses valid decimal values correctly", func() { + value, err = p.Parse("3.14159265") + So(value.(float64), ShouldEqual, 3.14159265) + So(err, ShouldBeNil) + value, err = p.Parse("0.123123") + So(value.(float64), ShouldEqual, 0.123123) + So(err, ShouldBeNil) + value, err = p.Parse("-123456.789") + So(value.(float64), ShouldEqual, -123456.789) + So(err, ShouldBeNil) + value, err = p.Parse("-1.") + So(value.(float64), ShouldEqual, -1.0) + So(err, ShouldBeNil) + }) + Convey("does not parse invalid numbers", func() { + _, err = p.Parse("") + So(err, ShouldNotBeNil) + _, err = p.Parse("1.1.1") + So(err, ShouldNotBeNil) + _, err = p.Parse("1-2.0") + So(err, ShouldNotBeNil) + _, err = p.Parse("80-") + So(err, ShouldNotBeNil) + }) + }) + + Convey("Using FieldInt32Parser", t, func() { + var p, _ = NewFieldParser(ctInt32, "") + var value interface{} + var err error + + Convey("parses valid integer values correctly", func() { + value, err = p.Parse("2147483647") + So(value.(int32), ShouldEqual, 2147483647) + So(err, ShouldBeNil) + value, err = p.Parse("42") + So(value.(int32), ShouldEqual, 42) + So(err, ShouldBeNil) + value, err = p.Parse("-2147483648") + So(value.(int32), ShouldEqual, -2147483648) + }) + Convey("does not parse invalid numbers", func() { + _, err = p.Parse("") + So(err, ShouldNotBeNil) + _, err = p.Parse("42.0") + So(err, ShouldNotBeNil) + _, err = p.Parse("1-2") + So(err, ShouldNotBeNil) + _, err = p.Parse("80-") + So(err, ShouldNotBeNil) + value, err = p.Parse("2147483648") + So(err, ShouldNotBeNil) + value, err = p.Parse("-2147483649") + So(err, ShouldNotBeNil) + }) + }) + + Convey("Using FieldInt64Parser", t, func() { + var p, _ = NewFieldParser(ctInt64, "") + var value interface{} + var err error + + Convey("parses valid integer values correctly", func() { + value, err = p.Parse("2147483648") + 
+
+	Convey("Using FieldInt64Parser", t, func() {
+		var p, _ = NewFieldParser(ctInt64, "")
+		var value interface{}
+		var err error
+
+		Convey("parses valid integer values correctly", func() {
+			value, err = p.Parse("2147483648")
+			So(value.(int64), ShouldEqual, 2147483648)
+			So(err, ShouldBeNil)
+			value, err = p.Parse("42")
+			So(value.(int64), ShouldEqual, 42)
+			So(err, ShouldBeNil)
+			value, err = p.Parse("-2147483649")
+			So(value.(int64), ShouldEqual, -2147483649)
+			So(err, ShouldBeNil)
+		})
+		Convey("does not parse invalid numbers", func() {
+			_, err = p.Parse("")
+			So(err, ShouldNotBeNil)
+			_, err = p.Parse("42.0")
+			So(err, ShouldNotBeNil)
+			_, err = p.Parse("1-2")
+			So(err, ShouldNotBeNil)
+			_, err = p.Parse("80-")
+			So(err, ShouldNotBeNil)
+		})
+	})
+
+	Convey("Using FieldStringParser", t, func() {
+		var p, _ = NewFieldParser(ctString, "")
+		var value interface{}
+		var err error
+
+		Convey("parses strings as strings only", func() {
+			value, err = p.Parse("42")
+			So(value.(string), ShouldEqual, "42")
+			So(err, ShouldBeNil)
+			value, err = p.Parse("true")
+			So(value.(string), ShouldEqual, "true")
+			So(err, ShouldBeNil)
+			value, err = p.Parse("")
+			So(value.(string), ShouldEqual, "")
+			So(err, ShouldBeNil)
+		})
+	})
+}
diff --git a/test/qa-tests/jstests/import/parse_grace.js b/test/qa-tests/jstests/import/parse_grace.js
new file mode 100644
index 000000000..1dc713322
--- /dev/null
+++ b/test/qa-tests/jstests/import/parse_grace.js
@@ -0,0 +1,114 @@
+(function() {
+  if (typeof getToolTest === 'undefined') {
+    load('jstests/configs/plain_28.config.js');
+  }
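+
+  // Each case below amounts to an invocation of this form (illustrative;
+  // only the --parseGrace value varies):
+  //   mongoimport --type csv --headerline --columnsHaveTypes \
+  //     --parseGrace <mode> --db testdb --collection testcoll \
+  //     --file jstests/import/testdata/parse_grace.csv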
"--db", "testdb", + "--collection", "testcoll", + "--columnsHaveTypes", + "--parseGrace", "skipField", + "--headerline"].concat(commonToolArgs)) + checkCollectionContents(c) + assert.eq(c.count(), 3) + assert.neq(c.findOne({a: "bar"}), null) + assert.eq(c.findOne({a: "bar"}).c.xyz, undefined) + reset(c) + + // parseGrace=autoCast should import the un-coercable field + var ret = toolTest.runTool.apply(toolTest, ["import", "--file", + "jstests/import/testdata/parse_grace.csv", + "--type", "csv", + "--db", "testdb", + "--collection", "testcoll", + "--columnsHaveTypes", + "--parseGrace", "autoCast", + "--headerline"].concat(commonToolArgs)) + checkCollectionContents(c) + assert.eq(c.count(), 3) + var importedDoc = c.findOne({a: "bar"}) + delete importedDoc["_id"] + assert.docEq(importedDoc, expectedDocs[1]) + reset(c) + + toolTest.stop(); +}()); diff --git a/test/qa-tests/jstests/import/testdata/parse_grace.csv b/test/qa-tests/jstests/import/testdata/parse_grace.csv new file mode 100644 index 000000000..447c1bd64 --- /dev/null +++ b/test/qa-tests/jstests/import/testdata/parse_grace.csv @@ -0,0 +1,4 @@ +a.string(),b.int32(),"c.xyz.date_oracle(Month dd, yyyy HH24:mi:ss)",c.noop.boolean(),d.hij.lkm.binary(hex) +foo,12,"June 02, 1997 15:24:00",true,7bc3049f36681723260fb5921dd36b149c8493c3 +bar,24,"06/08/2016 09:26:00",true,746573740a +baz,36,"June 08, 2016 09:26:00",false,746573740a diff --git a/test/qa-tests/jstests/import/testdata/typed_extrafields.csv b/test/qa-tests/jstests/import/testdata/typed_extrafields.csv new file mode 100644 index 000000000..8d398a374 --- /dev/null +++ b/test/qa-tests/jstests/import/testdata/typed_extrafields.csv @@ -0,0 +1,3 @@ +foo,12,"June 02, 1997 15:24:00",true,7bc3049f36681723260fb5921dd36b149c8493c3 +bar,24,"June 08, 2016 09:26:00",false,746573740a +one,2,"May 08, 2016 09:26:00",false,746573740a,extra1,extra2 diff --git a/test/qa-tests/jstests/import/testdata/typed_extrafields.tsv b/test/qa-tests/jstests/import/testdata/typed_extrafields.tsv new file mode 100644 index 000000000..a4ca42f45 --- /dev/null +++ b/test/qa-tests/jstests/import/testdata/typed_extrafields.tsv @@ -0,0 +1,3 @@ +foo 12 June 02, 1997 15:24:00 true 7bc3049f36681723260fb5921dd36b149c8493c3 +bar 24 June 08, 2016 09:26:00 false 746573740a +one 2 May 08, 2016 09:26:00 false 746573740a extra1 extra2 diff --git a/test/qa-tests/jstests/import/testdata/typed_header.csv b/test/qa-tests/jstests/import/testdata/typed_header.csv new file mode 100644 index 000000000..1140f31b2 --- /dev/null +++ b/test/qa-tests/jstests/import/testdata/typed_header.csv @@ -0,0 +1,3 @@ +a.string(),b.int32(),"c.xyz.date_oracle(Month dd, yyyy HH24:mi:ss)",c.noop.boolean(),d.hij.lkm.binary(hex) +foo,12,"June 02, 1997 15:24:00",true,7bc3049f36681723260fb5921dd36b149c8493c3 +bar,24,"June 08, 2016 09:26:00",false,746573740a diff --git a/test/qa-tests/jstests/import/testdata/typed_header.tsv b/test/qa-tests/jstests/import/testdata/typed_header.tsv new file mode 100644 index 000000000..a80b16848 --- /dev/null +++ b/test/qa-tests/jstests/import/testdata/typed_header.tsv @@ -0,0 +1,3 @@ +a.string() b.int32() c.xyz.date_oracle(Month dd, yyyy HH24:mi:ss) c.noop.boolean() d.hij.lkm.binary(hex) +foo 12 June 02, 1997 15:24:00 true 7bc3049f36681723260fb5921dd36b149c8493c3 +bar 24 June 08, 2016 09:26:00 false 746573740a diff --git a/test/qa-tests/jstests/import/testdata/typed_noheader.csv b/test/qa-tests/jstests/import/testdata/typed_noheader.csv new file mode 100644 index 000000000..50eeda2d8 --- /dev/null +++ 
diff --git a/test/qa-tests/jstests/import/testdata/typed_noheader.csv b/test/qa-tests/jstests/import/testdata/typed_noheader.csv
new file mode 100644
index 000000000..50eeda2d8
--- /dev/null
+++ b/test/qa-tests/jstests/import/testdata/typed_noheader.csv
@@ -0,0 +1,2 @@
+foo,12,"June 02, 1997 15:24:00",true,7bc3049f36681723260fb5921dd36b149c8493c3
+bar,24,"June 08, 2016 09:26:00",false,746573740a
diff --git a/test/qa-tests/jstests/import/testdata/typed_noheader.tsv b/test/qa-tests/jstests/import/testdata/typed_noheader.tsv
new file mode 100644
index 000000000..a4eb1896c
--- /dev/null
+++ b/test/qa-tests/jstests/import/testdata/typed_noheader.tsv
@@ -0,0 +1,2 @@
+foo	12	June 02, 1997 15:24:00	true	7bc3049f36681723260fb5921dd36b149c8493c3
+bar	24	June 08, 2016 09:26:00	false	746573740a
diff --git a/test/qa-tests/jstests/import/testdata/typedfieldfile b/test/qa-tests/jstests/import/testdata/typedfieldfile
new file mode 100644
index 000000000..006816600
--- /dev/null
+++ b/test/qa-tests/jstests/import/testdata/typedfieldfile
@@ -0,0 +1,5 @@
+a.string()
+b.int32()
+c.xyz.date_oracle(Month dd, yyyy HH24:mi:ss)
+c.noop.boolean()
+d.hij.lkm.binary(hex)
diff --git a/test/qa-tests/jstests/import/typed_fields.js b/test/qa-tests/jstests/import/typed_fields.js
new file mode 100644
index 000000000..195a0c875
--- /dev/null
+++ b/test/qa-tests/jstests/import/typed_fields.js
@@ -0,0 +1,110 @@
+(function() {
+  if (typeof getToolTest === 'undefined') {
+    load('jstests/configs/plain_28.config.js');
+  }
+  var formats = ["csv", "tsv"];
+  var header = "a.string(),b.int32(),c.xyz.date_oracle(Month dd, yyyy HH24:mi:ss),c.noop.boolean(),d.hij.lkm.binary(hex)";
+  var expectedDocs = [{
+    a: "foo",
+    b: 12,
+    c: {
+      xyz: ISODate("1997-06-02T15:24:00Z"),
+      noop: true,
+    },
+    d: {hij: {lkm: BinData(0, "e8MEnzZoFyMmD7WSHdNrFJyEk8M=")}},
+  }, {
+    a: "bar",
+    b: 24,
+    c: {
+      xyz: ISODate("2016-06-08T09:26:00Z"),
+      noop: false,
+    },
+    d: {hij: {lkm: BinData(0, "dGVzdAo=")}},
+  }];
+  jsTest.log('Testing typed fields in CSV/TSV');
+
+  var checkCollectionContents = function(coll) {
+    var importedDoc = coll.findOne({a: "foo"});
+    delete importedDoc["_id"];
+    assert.docEq(importedDoc, expectedDocs[0]);
+    importedDoc = coll.findOne({a: "bar"});
+    delete importedDoc["_id"];
+    assert.docEq(importedDoc, expectedDocs[1]);
+    assert.eq(coll.count(), 2);
+  };
+
+  var reset = function(coll) {
+    coll.drop();
+    assert.eq(coll.count(), 0);
+  };
+
+  var toolTest = getToolTest("import_fields");
+  var db1 = toolTest.db;
+  var commonToolArgs = getCommonToolArguments();
+  for(var i=0;i