Skip to content

Commit

Permalink
Merge pull request #9785 from influxdata/rename-bad-tsm-file
Browse files Browse the repository at this point in the history
Rename & log corrupt tsm files on load
  • Loading branch information
benbjohnson committed Apr 30, 2018
2 parents 63b9c98 + 108fa09 commit f459d87
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 5 deletions.
3 changes: 1 addition & 2 deletions cmd/influx_inspect/deletetsm/deletetsm.go
Expand Up @@ -148,6 +148,5 @@ Usage: influx_inspect deletetsm [flags] path...
-measurement NAME
The name of the measurement to remove.
-v
Enable verbose logging.
`)
Enable verbose logging.`)
}
17 changes: 14 additions & 3 deletions tsdb/engine/tsm1/file_store.go
Expand Up @@ -26,6 +26,9 @@ import (
const (
// The extension used to describe temporary snapshot files.
TmpTSMFileExtension = "tmp"

// The extension used to describe corrupt snapshot files.
BadTSMFileExtension = "bad"
)

// TSMFile represents an on-disk TSM file.
Expand Down Expand Up @@ -488,9 +491,15 @@ func (f *FileStore) Open() error {
zap.Int("id", idx),
zap.Duration("duration", time.Since(start)))

// If we are unable to read a TSM file then log the error, rename
// the file, and continue loading the shard without it.
if err != nil {
readerC <- &res{r: df, err: fmt.Errorf("error opening memory map for file %s: %v", file.Name(), err)}
return
f.logger.Error("Cannot read corrupt tsm file, renaming", zap.String("path", file.Name()), zap.Int("id", idx), zap.Error(err))
if e := os.Rename(file.Name(), file.Name()+"."+BadTSMFileExtension); e != nil {
f.logger.Error("Cannot rename corrupt tsm file", zap.String("path", file.Name()), zap.Int("id", idx), zap.Error(e))
readerC <- &res{r: df, err: fmt.Errorf("cannot rename corrupt file %s: %v", file.Name(), e)}
return
}
}
readerC <- &res{r: df}
}(i, file)
Expand All @@ -500,10 +509,12 @@ func (f *FileStore) Open() error {
for range files {
res := <-readerC
if res.err != nil {

return res.err
} else if res.r == nil {
continue
}
f.files = append(f.files, res.r)

// Accumulate file store size stats
atomic.AddInt64(&f.stats.DiskBytes, int64(res.r.Size()))
for _, ts := range res.r.TombstoneFiles() {
Expand Down

0 comments on commit f459d87

Please sign in to comment.