forked from influxdata/influxdb
-
Notifications
You must be signed in to change notification settings - Fork 0
/
engine.go
129 lines (105 loc) · 3.48 KB
/
engine.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
package tsdb
import (
"errors"
"fmt"
"io"
"os"
"sort"
"time"
"github.com/influxdata/influxdb/influxql"
"github.com/influxdata/influxdb/models"
)
// ErrFormatNotFound is the sentinel error for the case where no engine
// format can be determined from a path.
var ErrFormatNotFound = errors.New("format not found")

// ErrUnknownEngineFormat is the sentinel error for an unrecognised engine
// format. NewEngine returns it when the shard path exists but is not a
// directory, i.e. when the on-disk data cannot be a tsm1 shard.
var ErrUnknownEngineFormat = errors.New("unknown engine format")
// Engine is the contract a storage engine has to satisfy so that it can be
// swapped in as the backing store of a shard.
type Engine interface {
	// Embedding io.WriterTo means every engine also exposes
	// WriteTo(io.Writer) (int64, error).
	io.WriterTo

	// Opening, closing and runtime switches.
	Open() error
	Close() error
	SetEnabled(enabled bool)
	SetLogOutput(io.Writer)

	// Metadata, format and statistics.
	LoadMetadataIndex(shardID uint64, index *DatabaseIndex) error
	MeasurementFields(measurement string) *MeasurementFields
	SeriesCount() (n int, err error)
	// Format will return the format for the engine.
	Format() EngineFormat
	// Statistics will return statistics relevant to this engine.
	Statistics(tags map[string]string) []models.Statistic

	// Writing and querying points.
	WritePoints(points []models.Point) error
	CreateIterator(opt influxql.IteratorOptions) (influxql.Iterator, error)
	ContainsSeries(keys []string) (map[string]bool, error)

	// Deleting series and measurements.
	DeleteSeries(keys []string) error
	DeleteSeriesRange(keys []string, min, max int64) error
	DeleteMeasurement(name string, seriesKeys []string) error

	// Backup, restore and snapshots.
	Backup(w io.Writer, basePath string, since time.Time) error
	Restore(r io.Reader, basePath string) error
	CreateSnapshot() (string, error)
}
// EngineFormat is an integer code identifying the format of an engine.
type EngineFormat int

// TSM1Format is the format used by the tsm1 engine.
const TSM1Format EngineFormat = 2
// NewEngineFunc is the signature of an engine constructor: given a shard id,
// the data path, the WAL path and the engine options it returns an Engine.
type NewEngineFunc func(id uint64, path string, walPath string, options EngineOptions) Engine

// newEngineFuncs holds the registered engine constructors, keyed by name.
var newEngineFuncs = map[string]NewEngineFunc{}
// RegisterEngine stores fn in the engine registry under name.
// It panics if a constructor has already been registered for that name.
func RegisterEngine(name string, fn NewEngineFunc) {
	if _, taken := newEngineFuncs[name]; !taken {
		newEngineFuncs[name] = fn
		return
	}
	panic("engine already registered: " + name)
}
// RegisteredEngines lists the names of all registered engines in ascending
// lexical order. The result is a fresh, non-nil slice on every call.
func RegisteredEngines() []string {
	names := make([]string, len(newEngineFuncs))
	i := 0
	for name := range newEngineFuncs {
		names[i] = name
		i++
	}
	// Map iteration order is random, so sort for a stable result.
	sort.Strings(names)
	return names
}
// NewEngine returns an engine instance for the shard stored at path.
//
// If path does not exist, a new engine is created with the constructor
// registered under options.EngineVersion. If path exists it must be a
// directory, in which case the "tsm1" constructor is used; any other kind of
// file yields ErrUnknownEngineFormat. Errors from os.Stat other than
// "not exist" are returned unchanged. When no constructor is registered for
// the selected format an "invalid engine format" error is returned instead
// of panicking on a nil function.
func NewEngine(id uint64, path string, walPath string, options EngineOptions) (Engine, error) {
	// Stat once and branch on the result, so the existence check and the
	// directory check cannot observe two different filesystem states.
	var format string
	fi, err := os.Stat(path)
	switch {
	case os.IsNotExist(err):
		// Nothing on disk yet: honour the version requested by the caller.
		format = options.EngineVersion
	case err != nil:
		return nil, err
	case !fi.Mode().IsDir():
		return nil, ErrUnknownEngineFormat
	default:
		// If it's a dir then it's a tsm1 engine.
		format = "tsm1"
	}

	// Look up the engine constructor by format; a missing entry would be a
	// nil func, and calling it would panic.
	fn := newEngineFuncs[format]
	if fn == nil {
		return nil, fmt.Errorf("invalid engine format: %q", format)
	}
	return fn(id, path, walPath, options), nil
}
// EngineOptions represents the options used to initialize the engine.
// A value is handed unchanged to the engine constructor by NewEngine.
type EngineOptions struct {
	// EngineVersion is the registry name NewEngine uses to pick a constructor
	// when the shard path does not exist yet. NewEngineOptions sets it to
	// DefaultEngine.
	EngineVersion string
	// ShardID is left at its zero value by NewEngineOptions.
	// NOTE(review): presumably the id of the owning shard — confirm with callers.
	ShardID uint64
	// Config is initialised from NewConfig() by NewEngineOptions.
	Config Config
}
// NewEngineOptions builds an EngineOptions populated with defaults: the
// engine version is DefaultEngine and the config comes from NewConfig().
// ShardID is left at its zero value.
func NewEngineOptions() EngineOptions {
	var opts EngineOptions
	opts.EngineVersion = DefaultEngine
	opts.Config = NewConfig()
	return opts
}