Skip to content

Commit

Permalink
Importer fixes. fixes #4650 #4651
Browse files Browse the repository at this point in the history
  • Loading branch information
corylanou committed Nov 3, 2015
1 parent ae8b458 commit ac563ad
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 11 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,8 @@
- [#4596](https://github.com/influxdb/influxdb/pull/4596): Skip empty string for start position when parsing line protocol @Thanks @ch33hau
- [#4610](https://github.com/influxdb/influxdb/pull/4610): Make internal stats names consistent with Go style.
- [#4625](https://github.com/influxdb/influxdb/pull/4625): Correctly handle bad write requests. Thanks @oiooj.
- [#4650](https://github.com/influxdb/influxdb/issues/4650): Importer should skip empty lines
- [#4651](https://github.com/influxdb/influxdb/issues/4651): Importer doesn't flush out last batch

## v0.9.4 [2015-09-14]

Expand Down
34 changes: 23 additions & 11 deletions importer/v8/importer.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,10 @@ func (i *Importer) processDDL(scanner *bufio.Scanner) {
if strings.HasPrefix(line, "#") {
continue
}
// Skip blank lines
if strings.TrimSpace(line) == "" {
continue
}
i.queryExecutor(line)
}
}
Expand All @@ -162,8 +166,14 @@ func (i *Importer) processDML(scanner *bufio.Scanner) {
if strings.HasPrefix(line, "#") {
continue
}
// Skip blank lines
if strings.TrimSpace(line) == "" {
continue
}
i.batchAccumulator(line, start)
}
// Call batchWrite one last time to flush anything out in the batch
i.batchWrite()
}

func (i *Importer) execute(command string) {
Expand All @@ -185,14 +195,7 @@ func (i *Importer) queryExecutor(command string) {
func (i *Importer) batchAccumulator(line string, start time.Time) {
i.batch = append(i.batch, line)
if len(i.batch) == batchSize {
if e := i.batchWrite(); e != nil {
log.Println("error writing batch: ", e)
// Output failed lines to STDOUT so users can capture lines that failed to import
fmt.Println(strings.Join(i.batch, "\n"))
i.failedInserts += len(i.batch)
} else {
i.totalInserts += len(i.batch)
}
i.batchWrite()
i.batch = i.batch[:0]
// Give some status feedback every 100000 lines processed
processed := i.totalInserts + i.failedInserts
Expand All @@ -204,7 +207,7 @@ func (i *Importer) batchAccumulator(line string, start time.Time) {
}
}

func (i *Importer) batchWrite() error {
func (i *Importer) batchWrite() {
// Accumulate the batch size to see how many points we have written this second
i.throttlePointsWritten += len(i.batch)

Expand All @@ -226,11 +229,20 @@ func (i *Importer) batchWrite() error {

// Decrement the batch size back out as it is going to get called again
i.throttlePointsWritten -= len(i.batch)
return i.batchWrite()
i.batchWrite()
return
}

_, e := i.client.WriteLineProtocol(strings.Join(i.batch, "\n"), i.database, i.retentionPolicy, i.config.Precision, i.config.WriteConsistency)
if e != nil {
log.Println("error writing batch: ", e)
// Output failed lines to STDOUT so users can capture lines that failed to import
fmt.Println(strings.Join(i.batch, "\n"))
i.failedInserts += len(i.batch)
} else {
i.totalInserts += len(i.batch)
}
i.throttlePointsWritten = 0
i.lastWrite = time.Now()
return e
return
}

0 comments on commit ac563ad

Please sign in to comment.