From 9c12fb7ed83659b99ad9a1fa1e51360c886d3ea7 Mon Sep 17 00:00:00 2001 From: Paul Dix Date: Thu, 10 Jul 2014 17:59:45 -0400 Subject: [PATCH] Move time series metadata to Raft Fixes #414. All series metadata, that is which series exist, what columns they have, and what the ids of those columns has been moved into Raft. This means that list series queries are now very fast since they operate on an in memory data structure. It also means that writes no longer have to do gets agains the storage engine to resolve what a column's id is. Fixes #358. List series now returns as a single series with a `name` column. This fix has the potential to slow things down in scenarios that people suddenly write many thousands of new series in less than a second. That's because the definition of new series now needs to go through the raft leader and get consensus. I think it's unlikely that it'll be a problem, but we'll need to do extensive testing. Finally, there is the issue that the Raft snapshots can potentially become big. We should do testing with setups that have millions of series to ensure that performance and startup time remain good. --- Makefile.in | 4 + src/api/http/api_test.go | 2 +- src/cluster/cluster_configuration.go | 34 +- src/cluster/shard.go | 52 +-- src/coordinator/command.go | 54 +++ src/coordinator/coordinator.go | 112 +++-- src/coordinator/coordinator_test.go | 5 +- src/coordinator/interface.go | 1 + src/coordinator/protobuf_request_handler.go | 9 - src/coordinator/raft_server.go | 78 +++- src/datastore/shard.go | 427 ++++++++------------ src/datastore/shard_datastore.go | 12 +- src/datastore/shard_datastore_test.go | 2 +- src/datastore/storage/leveldb.go | 9 + src/integration/data_test.go | 31 +- src/integration/multiple_servers_test.go | 37 +- src/integration/single_server_test.go | 61 ++- src/integration/test_config_single.toml | 2 +- src/integration/test_missing_points.go | 6 +- src/metastore/store.go | 227 +++++++++++ src/protocol/protocol.proto | 1 + src/server/server.go | 9 +- 22 files changed, 745 insertions(+), 430 deletions(-) create mode 100644 src/metastore/store.go diff --git a/Makefile.in b/Makefile.in index ad3f5538e30..e189cd0f6cd 100644 --- a/Makefile.in +++ b/Makefile.in @@ -134,6 +134,10 @@ endif ifneq ($(GO_BUILD_TAGS),) GO_BUILD_OPTIONS += -tags '$(GO_BUILD_TAGS)' endif +race = off +ifneq ($(race),off) + GO_BUILD_OPTIONS += -race +endif ifneq ($(uname_S),Linux) PYTHONPATH ?= /usr/local/lib/python2.7/site-packages/ diff --git a/src/api/http/api_test.go b/src/api/http/api_test.go index 8d57d945d69..fddafc9e0c5 100644 --- a/src/api/http/api_test.go +++ b/src/api/http/api_test.go @@ -189,7 +189,7 @@ func (self *ApiSuite) SetUpSuite(c *C) { dir, self.coordinator, self.manager, - cluster.NewClusterConfiguration(&configuration.Configuration{}, nil, nil, nil), + cluster.NewClusterConfiguration(&configuration.Configuration{}, nil, nil, nil, nil), nil) var err error self.listener, err = net.Listen("tcp4", ":8081") diff --git a/src/cluster/cluster_configuration.go b/src/cluster/cluster_configuration.go index df604b07b09..3026fb61c75 100644 --- a/src/cluster/cluster_configuration.go +++ b/src/cluster/cluster_configuration.go @@ -11,6 +11,7 @@ import ( "fmt" "math" "math/rand" + "metastore" "parser" "protocol" "sort" @@ -85,6 +86,7 @@ type ClusterConfiguration struct { shardsByIdLock sync.RWMutex LocalRaftName string writeBuffers []*WriteBuffer + MetaStore *metastore.Store } type ContinuousQuery struct { @@ -100,7 +102,8 @@ func NewClusterConfiguration( config 
*configuration.Configuration, wal WAL, shardStore LocalShardStore, - connectionCreator func(string) ServerConnection) *ClusterConfiguration { + connectionCreator func(string) ServerConnection, + metaStore *metastore.Store) *ClusterConfiguration { return &ClusterConfiguration{ DatabaseReplicationFactors: make(map[string]struct{}), clusterAdmins: make(map[string]*ClusterAdmin), @@ -117,6 +120,7 @@ func NewClusterConfiguration( shortTermShards: make([]*ShardData, 0), random: rand.New(rand.NewSource(time.Now().UnixNano())), shardsById: make(map[uint32]*ShardData, 0), + MetaStore: metaStore, } } @@ -329,6 +333,17 @@ func (self *ClusterConfiguration) DropDatabase(name string) error { defer self.usersLock.Unlock() delete(self.dbUsers, name) + + fields, err := self.MetaStore.DropDatabase(name) + if err != nil { + return err + } + go func() { + shards := self.GetAllShards() + for _, s := range shards { + s.DropFields(fields) + } + }() return nil } @@ -514,6 +529,7 @@ type SavedConfiguration struct { ShortTermShards []*NewShardData LongTermShards []*NewShardData ContinuousQueries map[string][]*ContinuousQuery + MetaStore *metastore.Store LastShardIdUsed uint32 } @@ -528,6 +544,7 @@ func (self *ClusterConfiguration) Save() ([]byte, error) { ShortTermShards: self.convertShardsToNewShardData(self.shortTermShards), LongTermShards: self.convertShardsToNewShardData(self.longTermShards), LastShardIdUsed: self.lastShardIdUsed, + MetaStore: self.MetaStore, } for k := range self.DatabaseReplicationFactors { @@ -590,6 +607,7 @@ func (self *ClusterConfiguration) Recovery(b []byte) error { self.clusterAdmins = data.Admins self.dbUsers = data.DbUsers self.servers = data.Servers + self.MetaStore.UpdateFromSnapshot(data.MetaStore) for _, server := range self.servers { log.Info("Checking whether %s is the local server %s", server.RaftName, self.LocalRaftName) @@ -1049,6 +1067,20 @@ func (self *ClusterConfiguration) DropShard(shardId uint32, serverIds []uint32) return nil } +func (self *ClusterConfiguration) DropSeries(database, series string) error { + fields, err := self.MetaStore.DropSeries(database, series) + if err != nil { + return err + } + go func() { + shards := self.GetAllShards() + for _, s := range shards { + s.DropFields(fields) + } + }() + return nil +} + func (self *ClusterConfiguration) RecoverFromWAL() error { writeBuffer := NewWriteBuffer("local", self.shardStore, self.wal, self.LocalServer.Id, self.config.LocalStoreWriteBufferSize) self.writeBuffers = append(self.writeBuffers, writeBuffer) diff --git a/src/cluster/shard.go b/src/cluster/shard.go index 59a29ca06d0..da31240431d 100644 --- a/src/cluster/shard.go +++ b/src/cluster/shard.go @@ -1,14 +1,16 @@ package cluster import ( - "common" - "engine" "fmt" - "parser" - p "protocol" "sort" "strings" "time" + + "common" + "engine" + "metastore" + "parser" + p "protocol" "wal" log "code.google.com/p/log4go" @@ -111,7 +113,7 @@ var ( type LocalShardDb interface { Write(database string, series []*p.Series) error Query(*parser.QuerySpec, QueryProcessor) error - DropDatabase(database string) error + DropFields(fields []*metastore.Field) error IsClosed() bool } @@ -179,6 +181,17 @@ func (self *ShardData) ServerIds() []uint32 { return self.serverIds } +func (self *ShardData) DropFields(fields []*metastore.Field) error { + if !self.IsLocal { + return nil + } + shard, err := self.store.GetOrCreateShard(self.id) + if err != nil { + return err + } + return shard.DropFields(fields) +} + func (self *ShardData) SyncWrite(request *p.Request) error { request.ShardId 
= &self.id for _, server := range self.clusterServers { @@ -283,10 +296,12 @@ func (self *ShardData) Query(querySpec *parser.QuerySpec, response chan *p.Respo } defer self.store.ReturnShard(self.id) err = shard.Query(querySpec, processor) - processor.Close() + // if we call Close() in case of an error it will mask the error if err != nil { response <- &p.Response{Type: &endStreamResponse, ErrorMessage: p.String(err.Error())} + return } + processor.Close() response <- &p.Response{Type: &endStreamResponse} return } @@ -320,31 +335,6 @@ func (self *ShardData) randomHealthyServer() *ClusterServer { return nil } -func (self *ShardData) DropDatabase(database string, sendToServers bool) { - if self.IsLocal { - if shard, err := self.store.GetOrCreateShard(self.id); err == nil { - defer self.store.ReturnShard(self.id) - shard.DropDatabase(database) - } - } - - if !sendToServers { - return - } - - responses := make([]chan *p.Response, len(self.clusterServers), len(self.clusterServers)) - for i, server := range self.clusterServers { - responseChan := make(chan *p.Response, 1) - responses[i] = responseChan - request := &p.Request{Type: &dropDatabaseRequest, Database: &database, ShardId: &self.id} - go server.MakeRequest(request, responseChan) - } - for _, responseChan := range responses { - // TODO: handle error responses - <-responseChan - } -} - func (self *ShardData) String() string { serversString := make([]string, 0) for _, s := range self.servers { diff --git a/src/coordinator/command.go b/src/coordinator/command.go index 1bbc27da97e..ca7629ee54a 100644 --- a/src/coordinator/command.go +++ b/src/coordinator/command.go @@ -9,6 +9,8 @@ import ( log "code.google.com/p/log4go" "github.com/goraft/raft" + + "protocol" ) var internalRaftCommands map[string]raft.Command @@ -30,6 +32,8 @@ func init() { &SetContinuousQueryTimestampCommand{}, &CreateShardsCommand{}, &DropShardCommand{}, + &CreateSeriesFieldIdsCommand{}, + &DropSeriesCommand{}, } { internalRaftCommands[command.CommandName()] = command } @@ -378,3 +382,53 @@ func (c *DropShardCommand) Apply(server raft.Server) (interface{}, error) { err := config.DropShard(c.ShardId, c.ServerIds) return nil, err } + +type CreateSeriesFieldIdsCommand struct { + Database string + Series []*protocol.Series +} + +func NewCreateSeriesFieldIdsCommand(database string, series []*protocol.Series) *CreateSeriesFieldIdsCommand { + return &CreateSeriesFieldIdsCommand{Database: database, Series: series} +} + +func (c *CreateSeriesFieldIdsCommand) CommandName() string { + return "create_series_field_ids" +} + +// TODO: Encode/Decode are not needed once this pr +// https://github.com/goraft/raft/pull/221 is merged in and our goraft +// is updated to a commit that includes the pr + +func (c *CreateSeriesFieldIdsCommand) Encode(w io.Writer) error { + return json.NewEncoder(w).Encode(c) +} + +func (c *CreateSeriesFieldIdsCommand) Decode(r io.Reader) error { + return json.NewDecoder(r).Decode(c) +} + +func (c *CreateSeriesFieldIdsCommand) Apply(server raft.Server) (interface{}, error) { + config := server.Context().(*cluster.ClusterConfiguration) + err := config.MetaStore.GetOrSetFieldIds(c.Database, c.Series) + return c.Series, err +} + +type DropSeriesCommand struct { + Database string + Series string +} + +func NewDropSeriesCommand(database, series string) *DropSeriesCommand { + return &DropSeriesCommand{Database: database, Series: series} +} + +func (c *DropSeriesCommand) CommandName() string { + return "drop_series" +} + +func (c *DropSeriesCommand) Apply(server 
raft.Server) (interface{}, error) { + config := server.Context().(*cluster.ClusterConfiguration) + err := config.DropSeries(c.Database, c.Series) + return nil, err +} diff --git a/src/coordinator/coordinator.go b/src/coordinator/coordinator.go index 61f8e4fb7f6..46450526c7c 100644 --- a/src/coordinator/coordinator.go +++ b/src/coordinator/coordinator.go @@ -11,9 +11,9 @@ import ( "protocol" "regexp" "strings" - "sync" "time" + "code.google.com/p/goprotobuf/proto" log "code.google.com/p/log4go" ) @@ -21,9 +21,14 @@ type CoordinatorImpl struct { clusterConfiguration *cluster.ClusterConfiguration raftServer ClusterConsensus config *configuration.Configuration + metastore Metastore permissions Permissions } +type Metastore interface { + ReplaceFieldNamesWithFieldIds(database string, series []*protocol.Series) error +} + const ( // this is the key used for the persistent atomic ints for sequence numbers POINT_SEQUENCE_NUMBER_KEY = "p" @@ -56,11 +61,16 @@ type SeriesWriter interface { Close() } -func NewCoordinatorImpl(config *configuration.Configuration, raftServer ClusterConsensus, clusterConfiguration *cluster.ClusterConfiguration) *CoordinatorImpl { +func NewCoordinatorImpl( + config *configuration.Configuration, + raftServer ClusterConsensus, + clusterConfiguration *cluster.ClusterConfiguration, + metastore Metastore) *CoordinatorImpl { coordinator := &CoordinatorImpl{ config: config, clusterConfiguration: clusterConfiguration, raftServer: raftServer, + metastore: metastore, permissions: Permissions{}, } @@ -160,43 +170,20 @@ func (self *CoordinatorImpl) runQuery(querySpec *parser.QuerySpec, seriesWriter } func (self *CoordinatorImpl) runListSeriesQuery(querySpec *parser.QuerySpec, seriesWriter SeriesWriter) error { - shortTermShards := self.clusterConfiguration.GetShortTermShards() - if len(shortTermShards) > SHARDS_TO_QUERY_FOR_LIST_SERIES { - shortTermShards = shortTermShards[:SHARDS_TO_QUERY_FOR_LIST_SERIES] - } - longTermShards := self.clusterConfiguration.GetLongTermShards() - if len(longTermShards) > SHARDS_TO_QUERY_FOR_LIST_SERIES { - longTermShards = longTermShards[:SHARDS_TO_QUERY_FOR_LIST_SERIES] - } - seriesYielded := make(map[string]bool) - - var shards []*cluster.ShardData - shards = append(shards, shortTermShards...) - shards = append(shards, longTermShards...) 
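As an aside for readers of this hunk: the new code path builds the whole answer from the in-memory metastore, so no shard is touched for this statement. Below is a minimal standalone sketch (plain Go types and made-up series names, not the repo's protocol structs) of the single-series shape it produces and how a consumer pulls the names back out:

    package main

    import "fmt"

    // listSeriesResult mimics the shape of the single series returned by
    // "list series": one row per time series, with the name in a "name" column.
    type listSeriesResult struct {
        Name    string
        Columns []string
        Points  [][]interface{}
    }

    func main() {
        res := listSeriesResult{
            Name:    "list_series_result",
            Columns: []string{"name"},
            Points:  [][]interface{}{{"cpu_idle"}, {"events"}},
        }
        names := make([]string, 0, len(res.Points))
        for _, p := range res.Points {
            names = append(names, p[0].(string))
        }
        fmt.Println(names) // [cpu_idle events]
    }

Depending on the client layer, extra columns such as time may be prepended to each point, which is why the integration tests further down read the name from index 1 rather than 0.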
+ series := self.clusterConfiguration.MetaStore.GetSeriesForDatabase(querySpec.Database()) + name := "list_series_result" + fields := []string{"name"} + points := make([]*protocol.Point, len(series), len(series)) - var err error - for _, shard := range shards { - responseChan := make(chan *protocol.Response, shard.QueryResponseBufferSize(querySpec, self.config.StoragePointBatchSize)) - go shard.Query(querySpec, responseChan) - for { - response := <-responseChan - if *response.Type == endStreamResponse || *response.Type == accessDeniedResponse { - if response.ErrorMessage != nil && err != nil { - log.Debug("Error when querying shard: %s", err) - err = common.NewQueryError(common.InvalidArgument, *response.ErrorMessage) - } - break - } - for _, series := range response.MultiSeries { - if !seriesYielded[*series.Name] { - seriesYielded[*series.Name] = true - seriesWriter.Write(series) - } - } - } + for i, s := range series { + fieldValues := []*protocol.FieldValue{{StringValue: proto.String(s)}} + points[i] = &protocol.Point{Values: fieldValues} } + + seriesResult := &protocol.Series{Name: &name, Fields: fields, Points: points} + seriesWriter.Write(seriesResult) seriesWriter.Close() - return err + return nil } func (self *CoordinatorImpl) runDeleteQuery(querySpec *parser.QuerySpec, seriesWriter SeriesWriter) error { @@ -216,8 +203,14 @@ func (self *CoordinatorImpl) runDropSeriesQuery(querySpec *parser.QuerySpec, ser if ok, err := self.permissions.AuthorizeDropSeries(user, db, series); !ok { return err } - querySpec.RunAgainstAllServersInShard = true - return self.runQuerySpec(querySpec, seriesWriter) + defer seriesWriter.Close() + fmt.Println("DROP series") + err := self.raftServer.DropSeries(db, series) + if err != nil { + return err + } + fmt.Println("DROP returning nil") + return nil } func (self *CoordinatorImpl) shouldAggregateLocally(shards []*cluster.ShardData, querySpec *parser.QuerySpec) bool { @@ -334,11 +327,11 @@ func (self *CoordinatorImpl) readFromResponseChannels(processor cluster.QueryPro writer SeriesWriter, isExplainQuery bool, errors chan<- error, - channels <-chan (<-chan *protocol.Response)) { + responseChannels <-chan (<-chan *protocol.Response)) { defer close(errors) - for responseChan := range channels { + for responseChan := range responseChannels { for response := range responseChan { //log.Debug("GOT RESPONSE: ", response.Type, response.Series) @@ -365,7 +358,7 @@ func (self *CoordinatorImpl) readFromResponseChannels(processor cluster.QueryPro if processor != nil { // if the data wasn't aggregated at the shard level, aggregate // the data here - log.Debug("YIELDING: %d points with %d columns", len(response.Series.Points), len(response.Series.Fields)) + log.Debug("YIELDING: %d points with %d columns for %s", len(response.Series.Points), len(response.Series.Fields), response.Series.GetName()) processor.YieldSeries(response.Series) continue } @@ -701,26 +694,36 @@ func (self *CoordinatorImpl) CommitSeriesData(db string, serieses []*protocol.Se } func (self *CoordinatorImpl) write(db string, series []*protocol.Series, shard cluster.Shard, sync bool) error { + // replace all the field names, or error out if we can't assign the field ids. 
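For context on what that replacement relies on: field ids are looked up per database and series and allocated from a monotonically increasing counter when a name has not been seen before. A simplified standalone sketch of that mapping (invented type names, no locking and no Raft round-trip, both of which the real metastore.Store adds):

    package main

    import "fmt"

    // metaStore is a simplified stand-in for metastore.Store: a nested map of
    // database -> series -> field name -> field id, plus the last id handed out.
    type metaStore struct {
        ids    map[string]map[string]map[string]uint64
        lastID uint64
    }

    func newMetaStore() *metaStore {
        return &metaStore{ids: make(map[string]map[string]map[string]uint64)}
    }

    // getOrSetFieldIDs returns ids for the given field names, allocating new ids
    // for names that have not been seen before on this series.
    func (m *metaStore) getOrSetFieldIDs(db, series string, fields []string) []uint64 {
        if m.ids[db] == nil {
            m.ids[db] = make(map[string]map[string]uint64)
        }
        if m.ids[db][series] == nil {
            m.ids[db][series] = make(map[string]uint64)
        }
        out := make([]uint64, len(fields))
        for i, f := range fields {
            id, ok := m.ids[db][series][f]
            if !ok {
                m.lastID++
                id = m.lastID
                m.ids[db][series][f] = id
            }
            out[i] = id
        }
        return out
    }

    func main() {
        m := newMetaStore()
        fmt.Println(m.getOrSetFieldIDs("db1", "cpu", []string{"value", "host"}))   // [1 2]
        fmt.Println(m.getOrSetFieldIDs("db1", "cpu", []string{"host", "region"})) // [2 3]
    }

The write path then carries only FieldIds down to the storage engine, which is why the old per-shard column-index gets could be removed.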
+ err := self.metastore.ReplaceFieldNamesWithFieldIds(db, series) + if err != nil { + return err + } + + return self.writeWithoutAssigningId(db, series, shard, sync) +} + +func (self *CoordinatorImpl) writeWithoutAssigningId(db string, series []*protocol.Series, shard cluster.Shard, sync bool) error { request := &protocol.Request{Type: &write, Database: &db, MultiSeries: series} // break the request if it's too big if request.Size() >= MAX_REQUEST_SIZE { if l := len(series); l > 1 { // create two requests with half the serie - if err := self.write(db, series[:l/2], shard, sync); err != nil { + if err := self.writeWithoutAssigningId(db, series[:l/2], shard, sync); err != nil { return err } - return self.write(db, series[l/2:], shard, sync) + return self.writeWithoutAssigningId(db, series[l/2:], shard, sync) } // otherwise, split the points of the only series s := series[0] l := len(s.Points) - s1 := &protocol.Series{Name: s.Name, Fields: s.Fields, Points: s.Points[:l/2]} - if err := self.write(db, []*protocol.Series{s1}, shard, sync); err != nil { + s1 := &protocol.Series{Name: s.Name, FieldIds: s.FieldIds, Points: s.Points[:l/2]} + if err := self.writeWithoutAssigningId(db, []*protocol.Series{s1}, shard, sync); err != nil { return err } - s2 := &protocol.Series{Name: s.Name, Fields: s.Fields, Points: s.Points[l/2:]} - return self.write(db, []*protocol.Series{s2}, shard, sync) + s2 := &protocol.Series{Name: s.Name, FieldIds: s.FieldIds, Points: s.Points[l/2:]} + return self.writeWithoutAssigningId(db, []*protocol.Series{s2}, shard, sync) } if sync { return shard.SyncWrite(request) @@ -813,20 +816,7 @@ func (self *CoordinatorImpl) DropDatabase(user common.User, db string) error { return err } - if err := self.raftServer.DropDatabase(db); err != nil { - return err - } - - var wait sync.WaitGroup - for _, shard := range self.clusterConfiguration.GetAllShards() { - wait.Add(1) - go func(shard *cluster.ShardData) { - shard.DropDatabase(db, true) - wait.Done() - }(shard) - } - wait.Wait() - return nil + return self.raftServer.DropDatabase(db) } func (self *CoordinatorImpl) AuthenticateDbUser(db, username, password string) (common.User, error) { diff --git a/src/coordinator/coordinator_test.go b/src/coordinator/coordinator_test.go index 865739372ca..6ecd3b4fc04 100644 --- a/src/coordinator/coordinator_test.go +++ b/src/coordinator/coordinator_test.go @@ -4,9 +4,10 @@ import ( "cluster" "configuration" "fmt" - . "launchpad.net/gocheck" "parser" "time" + + . 
"launchpad.net/gocheck" ) type CoordinatorSuite struct{} @@ -20,7 +21,7 @@ func (self *CoordinatorSuite) TestShouldQuerySequentially(c *C) { shards := []*cluster.ShardData{shard} coordinator := NewCoordinatorImpl(&configuration.Configuration{ ClusterMaxResponseBufferSize: 1000, - }, nil, nil) + }, nil, nil, nil) queries := map[string]bool{ "list series": false, "select count(foo) from /.*bar.*/ group by time(1d)": true, diff --git a/src/coordinator/interface.go b/src/coordinator/interface.go index 0ca02199804..0e4e7a0404e 100644 --- a/src/coordinator/interface.go +++ b/src/coordinator/interface.go @@ -32,6 +32,7 @@ type Coordinator interface { type ClusterConsensus interface { CreateDatabase(name string) error DropDatabase(name string) error + DropSeries(database, series string) error CreateContinuousQuery(db string, query string) error DeleteContinuousQuery(db string, id uint32) error SaveClusterAdminUser(u *cluster.ClusterAdmin) error diff --git a/src/coordinator/protobuf_request_handler.go b/src/coordinator/protobuf_request_handler.go index 13fffbec0a2..67eb6bc8029 100644 --- a/src/coordinator/protobuf_request_handler.go +++ b/src/coordinator/protobuf_request_handler.go @@ -33,8 +33,6 @@ func (self *ProtobufRequestHandler) HandleRequest(request *protocol.Request, con switch *request.Type { case protocol.Request_WRITE: go self.handleWrites(request, conn) - case protocol.Request_DROP_DATABASE: - go self.handleDropDatabase(request, conn) case protocol.Request_QUERY: go self.handleQuery(request, conn) case protocol.Request_HEARTBEAT: @@ -107,13 +105,6 @@ func (self *ProtobufRequestHandler) handleQuery(request *protocol.Request, conn } } -func (self *ProtobufRequestHandler) handleDropDatabase(request *protocol.Request, conn net.Conn) { - shard := self.clusterConfig.GetLocalShardById(*request.ShardId) - shard.DropDatabase(*request.Database, false) - response := &protocol.Response{Type: &endStreamResponse, RequestId: request.Id} - self.WriteResponse(conn, response) -} - func (self *ProtobufRequestHandler) WriteResponse(conn net.Conn, response *protocol.Response) error { if response.Size() >= MAX_RESPONSE_SIZE { l := len(response.Series.Points) diff --git a/src/coordinator/raft_server.go b/src/coordinator/raft_server.go index d8ad5abd998..728f8f0d267 100644 --- a/src/coordinator/raft_server.go +++ b/src/coordinator/raft_server.go @@ -17,6 +17,7 @@ import ( "parser" "path/filepath" "protocol" + "reflect" "strings" "sync" "time" @@ -173,9 +174,7 @@ func SendCommandToServer(url string, command raft.Command) (interface{}, error) return nil, errors.New(strings.TrimSpace(string(body))) } - var js interface{} - json.Unmarshal(body, &js) - return js, err2 + return body, err2 } @@ -188,6 +187,14 @@ func (s *RaftServer) CreateDatabase(name string) error { func (s *RaftServer) DropDatabase(name string) error { command := NewDropDatabaseCommand(name) _, err := s.doOrProxyCommand(command) + // TODO: Dropping database from the metastore is synchronous, but the underlying data + // delete is asynchronous. If the server crashes or restarts while this is happening + // there will be orphaned data sitting around. Not a huge deal, but we should fix this + // at some point. 
+ // force a log compaction because we don't want this replaying after a server restarts + if err == nil { + err = s.ForceLogCompaction() + } return err } @@ -682,9 +689,14 @@ func (s *RaftServer) processCommandHandler(w http.ResponseWriter, req *http.Requ vars := mux.Vars(req) value := vars["command_type"] command := internalRaftCommands[value] + v := reflect.New(reflect.Indirect(reflect.ValueOf(command)).Type()).Interface() + copy, ok := v.(raft.Command) + if !ok { + panic(fmt.Sprintf("raft: Unable to copy command: %s (%v)", command.CommandName(), reflect.ValueOf(v).Kind().String())) + } - if result, err := s.marshalAndDoCommandFromBody(command, req); err != nil { - log.Error("command %T failed: %s", command, err) + if result, err := s.marshalAndDoCommandFromBody(copy, req); err != nil { + log.Error("command %T failed: %s", copy, err) http.Error(w, err.Error(), http.StatusInternalServerError) } else { if result != nil { @@ -702,17 +714,20 @@ func (self *RaftServer) CreateShards(shards []*cluster.NewShardData) ([]*cluster log.Error("RAFT: CreateShards: ", err) return nil, err } - js, err := json.Marshal(createShardsResult) - if err != nil { - return nil, err + if x, k := createShardsResult.([]byte); k { + newShards := make([]*cluster.NewShardData, 0) + err = json.Unmarshal(x, &newShards) + if err != nil { + log.Error("RAFT: error parsing new shard result: ", err) + return nil, err + } + return self.clusterConfig.MarshalNewShardArrayToShards(newShards) } - newShards := make([]*cluster.NewShardData, 0) - err = json.Unmarshal(js, &newShards) - if err != nil { - return nil, err + if x, k := createShardsResult.([]*cluster.NewShardData); k { + return self.clusterConfig.MarshalNewShardArrayToShards(x) } - log.Debug("NEW SHARDS: %v", newShards) - return self.clusterConfig.MarshalNewShardArrayToShards(newShards) + + return nil, fmt.Errorf("Unable to marshal Raft AddShards result!") } func (self *RaftServer) DropShard(id uint32, serverIds []uint32) error { @@ -724,3 +739,38 @@ func (self *RaftServer) DropShard(id uint32, serverIds []uint32) error { _, err := self.doOrProxyCommand(command) return err } + +func (self *RaftServer) GetOrSetFieldIdsForSeries(database string, series []*protocol.Series) ([]*protocol.Series, error) { + command := NewCreateSeriesFieldIdsCommand(database, series) + result, err := self.doOrProxyCommand(command) + if result == nil || err != nil { + return nil, err + } + if x, k := result.([]byte); k { + s := []*protocol.Series{} + err := json.Unmarshal(x, &s) + if err != nil { + return nil, err + } + return s, nil + } + if x, k := result.([]*protocol.Series); k { + return x, nil + } + return nil, nil +} + +func (self *RaftServer) DropSeries(database, series string) error { + command := NewDropSeriesCommand(database, series) + _, err := self.doOrProxyCommand(command) + + // TODO: Dropping series from the metastore is synchronous, but the underlying data + // delete is asynchronous. If the server crashes or restarts while this is happening + // there will be orphaned data sitting around. Not a huge deal, but we should fix this + // at some point. 
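A note on the reflect dance in processCommandHandler above: internalRaftCommands maps a command name to a shared prototype value, and the handler now builds a fresh value of the same concrete type before decoding the request body, presumably so one request cannot leak state into the prototype shared with later requests. A standalone sketch of that idiom (sampleCommand is an invented stand-in):

    package main

    import (
        "fmt"
        "reflect"
    )

    // sampleCommand stands in for a Raft command prototype registered in a map.
    type sampleCommand struct {
        Database string
    }

    func main() {
        var prototype interface{} = &sampleCommand{}

        // Build a brand-new *sampleCommand from the prototype's concrete type, so
        // the shared prototype is never written to when decoding a request.
        fresh := reflect.New(reflect.Indirect(reflect.ValueOf(prototype)).Type()).Interface()

        cmd, ok := fresh.(*sampleCommand)
        fmt.Println(ok, cmd.Database == "", cmd != prototype) // true true true
    }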
+ // force a log compaction because we don't want this replaying after a server restarts + if err == nil { + err = self.ForceLogCompaction() + } + return err +} diff --git a/src/datastore/shard.go b/src/datastore/shard.go index e6386815867..1baddb3f9ca 100644 --- a/src/datastore/shard.go +++ b/src/datastore/shard.go @@ -9,11 +9,11 @@ import ( "errors" "fmt" "math" + "metastore" "parser" "protocol" "regexp" "strings" - "sync" "time" "code.google.com/p/goprotobuf/proto" @@ -22,32 +22,18 @@ import ( type Shard struct { db storage.Engine - lastIdUsed uint64 - columnIdMutex sync.Mutex closed bool pointBatchSize int writeBatchSize int + metaStore *metastore.Store } -func NewShard(db storage.Engine, pointBatchSize, writeBatchSize int) (*Shard, error) { - lastIdBytes, err2 := db.Get(NEXT_ID_KEY) - if err2 != nil { - return nil, err2 - } - - lastId := uint64(0) - if lastIdBytes != nil { - lastId, err2 = binary.ReadUvarint(bytes.NewBuffer(lastIdBytes)) - if err2 != nil { - return nil, err2 - } - } - +func NewShard(db storage.Engine, pointBatchSize, writeBatchSize int, metaStore *metastore.Store) (*Shard, error) { return &Shard{ db: db, - lastIdUsed: lastId, pointBatchSize: pointBatchSize, writeBatchSize: writeBatchSize, + metaStore: metaStore, }, nil } @@ -58,21 +44,21 @@ func (self *Shard) Write(database string, series []*protocol.Series) error { if len(s.Points) == 0 { return errors.New("Unable to write no data. Series was nil or had no points.") } + if len(s.FieldIds) == 0 { + return errors.New("Unable to write points without fields") + } count := 0 - for fieldIndex, field := range s.Fields { - temp := field - id, err := self.createIdForDbSeriesColumn(&database, s.Name, &temp) - if err != nil { - return err - } + for fieldIndex, id := range s.FieldIds { for _, point := range s.Points { + // keyBuffer and dataBuffer have to be recreated since we are + // batching the writes, otherwise new writes will override the + // old writes that are still in memory keyBuffer := bytes.NewBuffer(make([]byte, 0, 24)) dataBuffer := proto.NewBuffer(nil) - keyBuffer.Reset() - dataBuffer.Reset() + var err error - keyBuffer.Write(id) + binary.Write(keyBuffer, binary.BigEndian, &id) timestamp := self.convertTimestampToUint(point.GetTimestampInMicroseconds()) // pass the uint64 by reference so binary.Write() doesn't create a new buffer // see the source code for intDataSize() in binary.go @@ -112,8 +98,6 @@ func (self *Shard) Query(querySpec *parser.QuerySpec, processor cluster.QueryPro return self.executeListSeriesQuery(querySpec, processor) } else if querySpec.IsDeleteFromSeriesQuery() { return self.executeDeleteQuery(querySpec, processor) - } else if querySpec.IsDropSeriesQuery() { - return self.executeDropSeriesQuery(querySpec, processor) } seriesAndColumns := querySpec.SelectQuery().GetReferencedColumns() @@ -124,7 +108,7 @@ func (self *Shard) Query(querySpec *parser.QuerySpec, processor cluster.QueryPro for series, columns := range seriesAndColumns { if regex, ok := series.GetCompiledRegex(); ok { - seriesNames := self.getSeriesForDbAndRegex(querySpec.Database(), regex) + seriesNames := self.metaStore.GetSeriesForDatabaseAndRegex(querySpec.Database(), regex) for _, name := range seriesNames { if !querySpec.HasReadAccess(name) { continue @@ -144,18 +128,6 @@ func (self *Shard) Query(querySpec *parser.QuerySpec, processor cluster.QueryPro return nil } -func (self *Shard) DropDatabase(database string) error { - seriesNames := self.getSeriesForDatabase(database) - for _, name := range seriesNames { - if err := 
self.dropSeries(database, name); err != nil { - log.Error("DropDatabase: ", err) - return err - } - } - self.db.Compact() - return nil -} - func (self *Shard) IsClosed() bool { return self.closed } @@ -166,15 +138,8 @@ func (self *Shard) executeQueryForSeries(querySpec *parser.QuerySpec, seriesName fields, err := self.getFieldsForSeries(querySpec.Database(), seriesName, columns) if err != nil { - // because a db is distributed across the cluster, it's possible we don't have the series indexed here. ignore - switch err := err.(type) { - case FieldLookupError: - log.Debug("Cannot find fields %v", columns) - return nil - default: - log.Error("Error looking up fields for %s: %s", seriesName, err) - return fmt.Errorf("Error looking up fields for %s: %s", seriesName, err) - } + log.Error("Error looking up fields for %s: %s", seriesName, err) + return err } fieldCount := len(fields) @@ -224,7 +189,7 @@ func (self *Shard) executeQueryForSeries(querySpec *parser.QuerySpec, seriesName continue } - if !isPointInRange(fields[i].Id, startTimeBytes, endTimeBytes, key) { + if !isPointInRange(fields[i].IdAsBytes(), startTimeBytes, endTimeBytes, key) { continue } @@ -404,37 +369,10 @@ func (self *Shard) executeDeleteQuery(querySpec *parser.QuerySpec, processor clu return nil } -func (self *Shard) executeDropSeriesQuery(querySpec *parser.QuerySpec, processor cluster.QueryProcessor) error { - database := querySpec.Database() - series := querySpec.Query().DropSeriesQuery.GetTableName() - err := self.dropSeries(database, series) - if err != nil { - return err - } - self.db.Compact() - return nil -} - -func (self *Shard) dropSeries(database, series string) error { +func (self *Shard) DropFields(fields []*metastore.Field) error { startTimeBytes := []byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00} endTimeBytes := []byte{0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF} - - if err := self.deleteRangeOfSeriesCommon(database, series, startTimeBytes, endTimeBytes); err != nil { - return err - } - - wb := []storage.Write{} - - for _, name := range self.getColumnNamesForSeries(database, series) { - indexKey := append(SERIES_COLUMN_INDEX_PREFIX, []byte(database+"~"+series+"~"+name)...) - wb = append(wb, storage.Write{indexKey, nil}) - } - - key := append(DATABASE_SERIES_INDEX_PREFIX, []byte(database+"~"+series)...) - wb = append(wb, storage.Write{key, nil}) - - // remove the column indeces for this time series - return self.db.BatchPut(wb) + return self.deleteRangeOfFields(fields, startTimeBytes, endTimeBytes) } func (self *Shard) byteArrayForTimeInt(time int64) []byte { @@ -449,26 +387,21 @@ func (self *Shard) byteArraysForStartAndEndTimes(startTime, endTime int64) ([]by } func (self *Shard) deleteRangeOfSeriesCommon(database, series string, startTimeBytes, endTimeBytes []byte) error { - columns := self.getColumnNamesForSeries(database, series) - fields, err := self.getFieldsForSeries(database, series, columns) - if err != nil { - // because a db is distributed across the cluster, it's possible we don't have the series indexed here. 
ignore - switch err := err.(type) { - case FieldLookupError: - return nil - default: - return err - } - } + fields := self.metaStore.GetFieldsForSeries(database, series) + return self.deleteRangeOfFields(fields, startTimeBytes, endTimeBytes) +} + +func (self *Shard) deleteRangeOfFields(fields []*metastore.Field, startTimeBytes, endTimeBytes []byte) error { startKey := bytes.NewBuffer(nil) endKey := bytes.NewBuffer(nil) for _, field := range fields { + idBytes := field.IdAsBytes() startKey.Reset() - startKey.Write(field.Id) + startKey.Write(idBytes) startKey.Write(startTimeBytes) startKey.Write([]byte{0, 0, 0, 0, 0, 0, 0, 0}) endKey.Reset() - endKey.Write(field.Id) + endKey.Write(idBytes) endKey.Write(endTimeBytes) endKey.Write([]byte{0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff}) @@ -492,7 +425,7 @@ func (self *Shard) deleteRangeOfSeries(database, series string, startTime, endTi } func (self *Shard) deleteRangeOfRegex(database string, regex *regexp.Regexp, startTime, endTime time.Time) error { - series := self.getSeriesForDbAndRegex(database, regex) + series := self.metaStore.GetSeriesForDatabaseAndRegex(database, regex) for _, name := range series { err := self.deleteRangeOfSeries(database, name, startTime, endTime) if err != nil { @@ -502,72 +435,6 @@ func (self *Shard) deleteRangeOfRegex(database string, regex *regexp.Regexp, sta return nil } -func (self *Shard) getFieldsForSeries(db, series string, columns []string) ([]*Field, error) { - isCountQuery := false - if len(columns) > 0 && columns[0] == "*" { - columns = self.getColumnNamesForSeries(db, series) - } else if len(columns) == 0 { - isCountQuery = true - columns = self.getColumnNamesForSeries(db, series) - } - if len(columns) == 0 { - return nil, FieldLookupError{"Coulnd't look up columns for series: " + series} - } - - fields := make([]*Field, len(columns), len(columns)) - - for i, name := range columns { - id, errId := self.getIdForDbSeriesColumn(&db, &series, &name) - if errId != nil { - return nil, errId - } - if id == nil { - return nil, FieldLookupError{"Field " + name + " doesn't exist in series " + series} - } - fields[i] = &Field{Name: name, Id: id} - } - - // if it's a count query we just want the column that will be the most efficient to - // scan through. So find that and return it. - if isCountQuery { - bestField := fields[0] - return []*Field{bestField}, nil - } - return fields, nil -} - -// TODO: WHY NO RETURN AN ERROR -func (self *Shard) getColumnNamesForSeries(db, series string) []string { - dbNameStart := len(SERIES_COLUMN_INDEX_PREFIX) - seekKey := append(SERIES_COLUMN_INDEX_PREFIX, []byte(db+"~"+series+"~")...) 
- pred := func(key []byte) bool { - return len(key) >= dbNameStart && bytes.Equal(key[:dbNameStart], SERIES_COLUMN_INDEX_PREFIX) - } - it := self.db.Iterator() - defer it.Close() - - names := make([]string, 0) - for it.Seek(seekKey); it.Valid(); it.Next() { - key := it.Key() - if !pred(key) { - break - } - dbSeriesColumn := string(key[dbNameStart:]) - parts := strings.Split(dbSeriesColumn, "~") - if len(parts) > 2 { - if parts[0] != db || parts[1] != series { - break - } - names = append(names, parts[2]) - } - } - if err := it.Error(); err != nil { - log.Error("Error while getting columns for series %s: %s", series, err) - return nil - } - return names -} - func (self *Shard) hasReadAccess(querySpec *parser.QuerySpec) bool { for series := range querySpec.SeriesValuesAndColumns() { if _, isRegex := series.GetCompiledRegex(); !isRegex { @@ -586,88 +453,6 @@ func (self *Shard) byteArrayForTime(t time.Time) []byte { return timeBuffer.Bytes() } -func (self *Shard) getSeriesForDbAndRegex(database string, regex *regexp.Regexp) []string { - names := []string{} - allSeries := self.getSeriesForDatabase(database) - for _, name := range allSeries { - if regex.MatchString(name) { - names = append(names, name) - } - } - return names -} - -func (self *Shard) getSeriesForDatabase(database string) (series []string) { - err := self.yieldSeriesNamesForDb(database, func(name string) bool { - series = append(series, name) - return true - }) - if err != nil { - log.Error("Cannot get series names for db %s: %s", database, err) - return nil - } - return series -} - -func (self *Shard) createIdForDbSeriesColumn(db, series, column *string) (ret []byte, err error) { - ret, err = self.getIdForDbSeriesColumn(db, series, column) - if err != nil { - return - } - - if ret != nil { - return - } - - self.columnIdMutex.Lock() - defer self.columnIdMutex.Unlock() - ret, err = self.getIdForDbSeriesColumn(db, series, column) - if err != nil { - return - } - - if ret != nil { - return - } - - ret, err = self.getNextIdForColumn(db, series, column) - if err != nil { - return - } - s := fmt.Sprintf("%s~%s~%s", *db, *series, *column) - b := []byte(s) - key := append(SERIES_COLUMN_INDEX_PREFIX, b...) - err = self.db.Put(key, ret) - return -} - -func (self *Shard) getIdForDbSeriesColumn(db, series, column *string) (ret []byte, err error) { - s := fmt.Sprintf("%s~%s~%s", *db, *series, *column) - b := []byte(s) - key := append(SERIES_COLUMN_INDEX_PREFIX, b...) - if ret, err = self.db.Get(key); err != nil { - return nil, err - } - return ret, nil -} - -func (self *Shard) getNextIdForColumn(db, series, column *string) (ret []byte, err error) { - id := self.lastIdUsed + 1 - self.lastIdUsed += 1 - idBytes := make([]byte, 8, 8) - binary.PutUvarint(idBytes, id) - wb := make([]storage.Write, 0, 3) - wb = append(wb, storage.Write{NEXT_ID_KEY, idBytes}) - databaseSeriesIndexKey := append(DATABASE_SERIES_INDEX_PREFIX, []byte(*db+"~"+*series)...) - wb = append(wb, storage.Write{databaseSeriesIndexKey, []byte{}}) - seriesColumnIndexKey := append(SERIES_COLUMN_INDEX_PREFIX, []byte(*db+"~"+*series+"~"+*column)...) 
- wb = append(wb, storage.Write{seriesColumnIndexKey, idBytes}) - if err = self.db.BatchPut(wb); err != nil { - return nil, err - } - return idBytes, nil -} - func (self *Shard) close() { self.closed = true self.db.Close() @@ -681,7 +466,7 @@ func (self *Shard) convertTimestampToUint(t *int64) uint64 { return uint64(*t) + uint64(math.MaxInt64) + uint64(1) } -func (self *Shard) fetchSinglePoint(querySpec *parser.QuerySpec, series string, fields []*Field) (*protocol.Series, error) { +func (self *Shard) fetchSinglePoint(querySpec *parser.QuerySpec, series string, fields []*metastore.Field) (*protocol.Series, error) { query := querySpec.SelectQuery() fieldCount := len(fields) fieldNames := make([]string, 0, fieldCount) @@ -701,9 +486,11 @@ func (self *Shard) fetchSinglePoint(querySpec *parser.QuerySpec, series string, timeAndSequenceBytes := timeAndSequenceBuffer.Bytes() for _, field := range fields { - pointKey := append(field.Id, timeAndSequenceBytes...) + pointKeyBuff := bytes.NewBuffer(make([]byte, 0, 24)) + pointKeyBuff.Write(field.IdAsBytes()) + pointKeyBuff.Write(timeAndSequenceBytes) - if data, err := self.db.Get(pointKey); err != nil { + if data, err := self.db.Get(pointKeyBuff.Bytes()); err != nil { return nil, err } else { fieldValue := &protocol.FieldValue{} @@ -723,21 +510,20 @@ func (self *Shard) fetchSinglePoint(querySpec *parser.QuerySpec, series string, return result, nil } -func (self *Shard) getIterators(fields []*Field, start, end []byte, isAscendingQuery bool) (fieldNames []string, iterators []storage.Iterator) { +func (self *Shard) getIterators(fields []*metastore.Field, start, end []byte, isAscendingQuery bool) (fieldNames []string, iterators []storage.Iterator) { iterators = make([]storage.Iterator, len(fields)) fieldNames = make([]string, len(fields)) // start the iterators to go through the series data for i, field := range fields { + idBytes := field.IdAsBytes() fieldNames[i] = field.Name iterators[i] = self.db.Iterator() if isAscendingQuery { - firstKey := append(field.Id, start...) - iterators[i].Seek(firstKey) + iterators[i].Seek(append(idBytes, start...)) } else { - firstKey := append(append(field.Id, end...), MAX_SEQUENCE...) 
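For reference, the 24-byte point-key layout these iterators assume is an 8-byte big-endian field id, then the timestamp shifted into unsigned space, then the sequence number; the exact encoding of the last two segments is inferred from the surrounding code, so treat this standalone sketch as illustrative:

    package main

    import (
        "encoding/binary"
        "fmt"
        "math"
    )

    // shiftTimestamp mirrors convertTimestampToUint: move a signed microsecond
    // timestamp into unsigned space so byte-wise key order matches time order.
    func shiftTimestamp(t int64) uint64 {
        return uint64(t) + uint64(math.MaxInt64) + uint64(1)
    }

    // pointKey lays out [field id | shifted timestamp | sequence], each 8 bytes
    // big-endian, so one field's points are contiguous and time-ordered on disk.
    func pointKey(fieldID uint64, timestampMicro int64, sequence uint64) []byte {
        k := make([]byte, 24)
        binary.BigEndian.PutUint64(k[0:8], fieldID)
        binary.BigEndian.PutUint64(k[8:16], shiftTimestamp(timestampMicro))
        binary.BigEndian.PutUint64(k[16:24], sequence)
        return k
    }

    func main() {
        k := pointKey(2, 0, 1)
        fmt.Println(len(k))         // 24
        fmt.Printf("%x\n", k[:8])   // 0000000000000002 (field id)
        fmt.Printf("%x\n", k[8:16]) // 8000000000000000 (time 0, shifted)
        fmt.Printf("%x\n", k[16:])  // 0000000000000001 (sequence)
    }

Dropping a field then reduces to deleting the key range from the field id followed by all-zero time and sequence bytes to the field id followed by all-0xFF time and sequence bytes, which is what DropFields and deleteRangeOfFields above do.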
- iterators[i].Seek(firstKey) + iterators[i].Seek(append(append(idBytes, end...), MAX_SEQUENCE...)) if iterators[i].Valid() { iterators[i].Prev() } @@ -757,3 +543,142 @@ func (self *Shard) convertUintTimestampToInt64(t *uint64) int64 { } return int64(*t) - math.MaxInt64 - int64(1) } + +func (self *Shard) getFieldsForSeries(db, series string, columns []string) ([]*metastore.Field, error) { + allFields := self.metaStore.GetFieldsForSeries(db, series) + if len(allFields) == 0 { + return nil, FieldLookupError{"Couldn't look up columns for series: " + series} + } + if len(columns) > 0 && columns[0] == "*" { + return allFields, nil + } + + fields := make([]*metastore.Field, len(columns), len(columns)) + + for i, name := range columns { + hasField := false + for _, f := range allFields { + if f.Name == name { + field := f + hasField = true + fields[i] = field + break + } + } + if !hasField { + return nil, FieldLookupError{"Field " + name + " doesn't exist in series " + series} + } + } + return fields, nil +} + +/* DEPRECATED methods do not use*/ + +// TODO: remove this on version 0.9 after people have had a chance to do migrations +func (self *Shard) getSeriesForDbAndRegexDEPRECATED(database string, regex *regexp.Regexp) []string { + names := []string{} + allSeries := self.metaStore.GetSeriesForDatabase(database) + for _, name := range allSeries { + if regex.MatchString(name) { + names = append(names, name) + } + } + return names +} + +// TODO: remove this on version 0.9 after people have had a chance to do migrations +func (self *Shard) getSeriesForDatabaseDEPRECATED(database string) (series []string) { + err := self.yieldSeriesNamesForDb(database, func(name string) bool { + series = append(series, name) + return true + }) + if err != nil { + log.Error("Cannot get series names for db %s: %s", database, err) + return nil + } + return series +} + +// TODO: remove this function. I'm keeping it around for the moment since it'll probably have to be +// used in the DB upgrate/migration that moves metadata from the shard to Raft +func (self *Shard) getFieldsForSeriesDEPRECATED(db, series string, columns []string) ([]*metastore.Field, error) { + isCountQuery := false + if len(columns) > 0 && columns[0] == "*" { + columns = self.getColumnNamesForSeriesDEPRECATED(db, series) + } else if len(columns) == 0 { + isCountQuery = true + columns = self.getColumnNamesForSeriesDEPRECATED(db, series) + } + if len(columns) == 0 { + return nil, FieldLookupError{"Couldn't look up columns for series: " + series} + } + + fields := make([]*metastore.Field, len(columns), len(columns)) + + for i, name := range columns { + id, errId := self.getIdForDbSeriesColumnDEPRECATED(&db, &series, &name) + if errId != nil { + return nil, errId + } + if id == nil { + return nil, FieldLookupError{"Field " + name + " doesn't exist in series " + series} + } + idInt, err := binary.ReadUvarint(bytes.NewBuffer(id)) + if err != nil { + return nil, err + } + fields[i] = &metastore.Field{Name: name, Id: idInt} + } + + // if it's a count query we just want the column that will be the most efficient to + // scan through. So find that and return it. + if isCountQuery { + bestField := fields[0] + return []*metastore.Field{bestField}, nil + } + return fields, nil +} + +// TODO: remove this function. 
I'm keeping it around for the moment since it'll probably have to be +// used in the DB upgrate/migration that moves metadata from the shard to Raft +func (self *Shard) getColumnNamesForSeriesDEPRECATED(db, series string) []string { + dbNameStart := len(SERIES_COLUMN_INDEX_PREFIX) + seekKey := append(SERIES_COLUMN_INDEX_PREFIX, []byte(db+"~"+series+"~")...) + pred := func(key []byte) bool { + return len(key) >= dbNameStart && bytes.Equal(key[:dbNameStart], SERIES_COLUMN_INDEX_PREFIX) + } + it := self.db.Iterator() + defer it.Close() + + names := make([]string, 0) + for it.Seek(seekKey); it.Valid(); it.Next() { + key := it.Key() + if !pred(key) { + break + } + dbSeriesColumn := string(key[dbNameStart:]) + parts := strings.Split(dbSeriesColumn, "~") + if len(parts) > 2 { + if parts[0] != db || parts[1] != series { + break + } + names = append(names, parts[2]) + } + } + if err := it.Error(); err != nil { + log.Error("Error while getting columns for series %s: %s", series, err) + return nil + } + return names +} + +// TODO: remove this after a version that doesn't support migration from old non-raft metastore +func (self *Shard) getIdForDbSeriesColumnDEPRECATED(db, series, column *string) (ret []byte, err error) { + s := fmt.Sprintf("%s~%s~%s", *db, *series, *column) + b := []byte(s) + key := append(SERIES_COLUMN_INDEX_PREFIX, b...) + if ret, err = self.db.Get(key); err != nil { + return nil, err + } + return ret, nil +} diff --git a/src/datastore/shard_datastore.go b/src/datastore/shard_datastore.go index 45ca0634423..d52ca22047c 100644 --- a/src/datastore/shard_datastore.go +++ b/src/datastore/shard_datastore.go @@ -7,6 +7,7 @@ import ( "fmt" "io/ioutil" "math" + "metastore" "os" "path" "path/filepath" @@ -32,6 +33,7 @@ type ShardDatastore struct { maxOpenShards int pointBatchSize int writeBatchSize int + metaStore *metastore.Store } const ( @@ -59,18 +61,13 @@ var ( TRUE = true ) -type Field struct { - Id []byte - Name string -} - type rawColumnValue struct { time []byte sequence []byte value []byte } -func NewShardDatastore(config *configuration.Configuration) (*ShardDatastore, error) { +func NewShardDatastore(config *configuration.Configuration, metaStore *metastore.Store) (*ShardDatastore, error) { baseDbDir := filepath.Join(config.DataDir, SHARD_DATABASE_DIR) err := os.MkdirAll(baseDbDir, 0744) if err != nil { @@ -87,6 +84,7 @@ func NewShardDatastore(config *configuration.Configuration) (*ShardDatastore, er shardsToClose: make(map[uint32]bool), pointBatchSize: config.StoragePointBatchSize, writeBatchSize: config.StorageWriteBatchSize, + metaStore: metaStore, }, nil } @@ -192,7 +190,7 @@ func (self *ShardDatastore) GetOrCreateShard(id uint32) (cluster.LocalShardDb, e } se, err := init.Initialize(dbDir, c) - db, err = NewShard(se, self.pointBatchSize, self.writeBatchSize) + db, err = NewShard(se, self.pointBatchSize, self.writeBatchSize, self.metaStore) if err != nil { log.Error("Error creating shard: ", err) se.Close() diff --git a/src/datastore/shard_datastore_test.go b/src/datastore/shard_datastore_test.go index e94f8d13938..56849850eb0 100644 --- a/src/datastore/shard_datastore_test.go +++ b/src/datastore/shard_datastore_test.go @@ -24,7 +24,7 @@ func (self *ShardDatastoreSuite) TestWillEnforceMaxOpenShards(c *C) { config.StorageMaxOpenShards = 2 config.StorageDefaultEngine = "leveldb" - store, err := NewShardDatastore(config) + store, err := NewShardDatastore(config, nil) c.Assert(err, IsNil) shard, err := store.GetOrCreateShard(uint32(2)) diff --git 
a/src/datastore/storage/leveldb.go b/src/datastore/storage/leveldb.go index 2db1ba641e1..30d4956229f 100644 --- a/src/datastore/storage/leveldb.go +++ b/src/datastore/storage/leveldb.go @@ -3,6 +3,7 @@ package storage import ( "bytes" "fmt" + "strings" "sync" "configuration" @@ -69,8 +70,16 @@ func NewLevelDB(path string, config interface{}) (Engine, error) { opts.SetCreateIfMissing(true) opts.SetMaxOpenFiles(c.MaxOpenFiles) db, err := levigo.Open(path, opts) + wopts := levigo.NewWriteOptions() ropts := levigo.NewReadOptions() + + // these sentinel values are here so that we can seek to the end of + // the keyspace and have the iterator still be valid. this is for + // the series that is at either end of the keyspace. + db.Put(wopts, []byte(strings.Repeat("\x00", 24)), []byte{}) + db.Put(wopts, []byte(strings.Repeat("\xff", 24)), []byte{}) + return LevelDB{db, opts, wopts, ropts, path}, err } diff --git a/src/integration/data_test.go b/src/integration/data_test.go index 4e6d7a7c6be..b8a19fbf387 100644 --- a/src/integration/data_test.go +++ b/src/integration/data_test.go @@ -1026,7 +1026,7 @@ func (self *DataTestSuite) FilterWithInvalidCondition(c *C) (Fun, Fun) { data := CreatePoints("test_invalid_where_condition", 1, 1) client.WriteData(data, c) }, func(client Client) { - data := client.RunQuery("select cpu from test_invalid_where_condition where column0 > 0.1s", c, "m") + data := client.RunQuery("select * from test_invalid_where_condition where column0 > 0.1s", c, "m") // TODO: this should return an error c.Assert(data, HasLen, 0) } @@ -1084,7 +1084,7 @@ func (self *DataTestSuite) Issue85(c *C) (Fun, Fun) { data := CreatePoints("test_issue_85", 1, 1) client.WriteData(data, c) }, func(client Client) { - _ = client.RunQuery("select new_column from test_issue_85", c, "m") + _ = client.RunInvalidQuery("select new_column from test_issue_85", c, "m") data := client.RunQuery("select * from test_issue_85", c, "m") c.Assert(data, HasLen, 1) c.Assert(data[0].Columns, HasLen, 3) @@ -1552,15 +1552,16 @@ func (self *DataTestSuite) SinglePointSelectWithNullValues(c *C) (Fun, Fun) { query := "select * from test_single_points_with_nulls where name='john';" data := client.RunQuery(query, c, "u") - c.Assert(data[0].Points, HasLen, 1) + c.Assert(data, HasLen, 1) + maps := ToMap(data[0]) - for _, point := range data[0].Points { - query := fmt.Sprintf("select * from test_single_points_with_nulls where time = %.0fu and sequence_number = %0.f;", point[0].(float64), point[1]) + for _, m := range maps { + query := fmt.Sprintf("select * from test_single_points_with_nulls where time = %.0fu and sequence_number = %0.f;", m["time"].(float64), m["sequence_number"].(float64)) data := client.RunQuery(query, c, "u") c.Assert(data, HasLen, 1) - c.Assert(data[0].Points, HasLen, 1) - c.Assert(data[0].Points[0], HasLen, 3) - c.Assert(data[0].Points[0][2], Equals, point[3]) + actualMaps := ToMap(data[0]) + c.Assert(actualMaps, HasLen, 1) + c.Assert(actualMaps[0]["name"], Equals, maps[0]["name"]) } } } @@ -1621,9 +1622,11 @@ func (self *DataTestSuite) SeriesListing(c *C) (Fun, Fun) { `, c) }, func(client Client) { data := client.RunQuery("list series", c, "m") + c.Assert(data, HasLen, 1) + maps := ToMap(data[0]) names := map[string]bool{} - for _, series := range data { - names[series.Name] = true + for _, m := range maps { + names[m["name"].(string)] = true } c.Assert(names["test_series_listing"], Equals, true) } @@ -1983,11 +1986,11 @@ func (self *DataTestSuite) ListSeries(c *C) (Fun, Fun) { client.WriteJsonData(data, c, 
"s") }, func(client Client) { collection := client.RunQuery("list series", c) - c.Assert(collection, HasLen, 2) + c.Assert(collection, HasLen, 1) + maps := ToMap(collection[0]) names := map[string]bool{} - for _, s := range collection { - c.Assert(s.Points, HasLen, 0) - names[s.Name] = true + for _, m := range maps { + names[m["name"].(string)] = true } c.Assert(names["cluster_query"], Equals, true) c.Assert(names["another_query"], Equals, true) diff --git a/src/integration/multiple_servers_test.go b/src/integration/multiple_servers_test.go index bcbf03054ad..6adf6fb3b02 100644 --- a/src/integration/multiple_servers_test.go +++ b/src/integration/multiple_servers_test.go @@ -156,7 +156,10 @@ func (self *ServerSuite) TestRestartAfterCompaction(c *C) { }] ` self.serverProcesses[0].Post("/db/test_rep/series?u=paul&p=pass", data, c) - self.serverProcesses[0].WaitForServerToSync() + + for _, s := range self.serverProcesses { + s.WaitForServerToSync() + } collection := self.serverProcesses[0].Query("test_rep", "select * from test_restart_after_compaction", false, c) c.Assert(collection.Members, HasLen, 1) @@ -168,6 +171,10 @@ func (self *ServerSuite) TestRestartAfterCompaction(c *C) { self.serverProcesses[0].Start() self.serverProcesses[0].WaitForServerToStart() + for _, s := range self.serverProcesses { + s.WaitForServerToSync() + } + collection = self.serverProcesses[0].Query("test_rep", "select * from test_restart_after_compaction", false, c) c.Assert(collection.Members, HasLen, 1) series = collection.GetSeries("test_restart_after_compaction", c) @@ -204,7 +211,9 @@ func (self *ServerSuite) TestRestartServers(c *C) { ` self.serverProcesses[0].Post("/db/test_rep/series?u=paul&p=pass", data, c) - self.serverProcesses[0].WaitForServerToSync() + for _, s := range self.serverProcesses { + s.WaitForServerToSync() + } collection := self.serverProcesses[0].Query("test_rep", "select * from test_restart", false, c) c.Assert(collection.Members, HasLen, 1) @@ -323,7 +332,9 @@ func (self *ServerSuite) TestDeleteFullReplication(c *C) { "columns": ["val_1", "val_2"] }]` self.serverProcesses[0].Post("/db/full_rep/series?u=paul&p=pass", data, c) - self.serverProcesses[0].WaitForServerToSync() + for _, s := range self.serverProcesses { + s.WaitForServerToSync() + } collection := self.serverProcesses[0].Query("full_rep", "select count(val_1) from test_delete_full_replication", true, c) series := collection.GetSeries("test_delete_full_replication", c) c.Assert(series.GetValueForPointAndColumn(0, "count", c), Equals, 1.0) @@ -345,7 +356,9 @@ func (self *ServerSuite) TestDeleteReplication(c *C) { "columns": ["val_1", "val_2"] }]` self.serverProcesses[0].Post("/db/test_rep/series?u=paul&p=pass", data, c) - self.serverProcesses[0].WaitForServerToSync() + for _, s := range self.serverProcesses { + s.WaitForServerToSync() + } collection := self.serverProcesses[0].Query("test_rep", "select count(val_1) from test_delete_replication", false, c) series := collection.GetSeries("test_delete_replication", c) c.Assert(series.GetValueForPointAndColumn(0, "count", c), Equals, 1.0) @@ -416,8 +429,8 @@ func (self *ServerSuite) TestDropDatabase(c *C) { } for _, s := range self.serverProcesses { fmt.Printf("Running query against: %d\n", s.ApiPort()) - collection := s.Query("drop_db", "select * from cluster_query", true, c) - c.Assert(collection.Members, HasLen, 0) + error, _ := s.GetErrorBody("drop_db", "select * from cluster_query", "paul", "pass", true, c) + c.Assert(error, Matches, ".*Couldn't look up column.*") } } @@ -434,7 
+447,9 @@ func (self *ServerSuite) TestDropSeries(c *C) { "points": [[1]] }]` self.serverProcesses[0].Post("/db/drop_series/series?u=paul&p=pass", data, c) - self.serverProcesses[0].WaitForServerToSync() + for _, s := range self.serverProcesses { + s.WaitForServerToSync() + } for _, s := range self.serverProcesses { fmt.Printf("Running query against: %d\n", s.ApiPort()) collection := s.Query("drop_series", "select * from cluster_query.1", true, c) @@ -463,8 +478,8 @@ func (self *ServerSuite) TestDropSeries(c *C) { for _, s := range self.serverProcesses { fmt.Printf("Running query against: %d\n", s.ApiPort()) - collection := s.Query("drop_series", "select * from cluster_query.1", true, c) - c.Assert(collection.Members, HasLen, 0) + error, _ := s.GetErrorBody("drop_series", "select * from cluster_query.1", "paul", "pass", true, c) + c.Assert(error, Matches, ".*Couldn't look up.*") } } } @@ -512,7 +527,9 @@ func (self *ServerSuite) TestFailureAndReplicationReplays(c *C) { }]` self.serverProcesses[0].Post("/db/full_rep/series?u=paul&p=pass", data, c) - self.serverProcesses[0].WaitForServerToSync() + for _, s := range self.serverProcesses { + s.WaitForServerToSync() + } for _, s := range self.serverProcesses { collection := s.Query("full_rep", "select sum(val) from test_failure_replays;", true, c) diff --git a/src/integration/single_server_test.go b/src/integration/single_server_test.go index c024d894011..a4973f8cf05 100644 --- a/src/integration/single_server_test.go +++ b/src/integration/single_server_test.go @@ -78,12 +78,29 @@ func (self *SingleServerSuite) TestListSeriesAfterDropSeries(c *C) { series, err := client.Query("list series") c.Assert(err, IsNil) c.Assert(series, HasLen, 1) - c.Assert(series[0].Name, Equals, "test_drop_series") + c.Assert(series[0].Name, Equals, "list_series_result") + + hasSeries := false + for _, p := range series[0].Points { + if p[1].(string) == "test_drop_series" { + hasSeries = true + break + } + } + c.Assert(hasSeries, Equals, true) + _, err = client.Query("drop series test_drop_series") c.Assert(err, IsNil) series, err = client.Query("list series") - c.Assert(err, IsNil) - c.Assert(series, HasLen, 0) + + hasSeries = false + for _, p := range series[0].Points { + if p[1].(string) == "test_drop_series" { + hasSeries = true + break + } + } + c.Assert(hasSeries, Equals, false) } // issue #497 @@ -226,7 +243,7 @@ func (self *SingleServerSuite) TestUserWritePermissions(c *C) { // create two users one that can only read and one that can only write. 
both can access test_should_read // series only - /* c.Assert(rootUser.CreateDatabase("db1"), IsNil) */ + rootUser.CreateDatabase("db1") c.Assert(rootUser.CreateDatabaseUser("db1", "limited_user", "pass", "^$", "^$"), IsNil) verifyPermissions("db1", "limited_user", "^$", "^$") @@ -262,21 +279,26 @@ func (self *SingleServerSuite) TestUserWritePermissions(c *C) { c.Assert(json.Unmarshal([]byte(data), &series), IsNil) // readUser shouldn't be able to write c.Assert(user.WriteSeries(series), NotNil) - content := self.server.RunQueryAsRoot("select * from test_should_write", "m", c) - c.Assert(content, HasLen, 0) + actualSeries, err := rootUser.Query("select * from test_should_write", "s") + // if this test ran by itself there will be no shards to query, + // therefore no error will be returned + if err != nil { + c.Assert(err, ErrorMatches, ".*Couldn't look up.*") + } else { + c.Assert(actualSeries, HasLen, 0) + } rootUser.ChangeDatabaseUser("db1", "limited_user", "pass", false, "^$", "test_should_write") verifyPermissions("db1", "limited_user", "^$", "test_should_write") // write the data to test the write permissions c.Assert(user.WriteSeries(series), IsNil) - self.server.WaitForServerToSync() - invalidSeries := []*influxdb.Series{} - content = self.server.RunQueryAsRoot("select * from test_should_write", "m", c) + content := self.server.RunQueryAsRoot("select * from test_should_write", "m", c) c.Assert(content, HasLen, 1) + invalidSeries := []*influxdb.Series{} c.Assert(json.Unmarshal([]byte(invalidData), &invalidSeries), IsNil) c.Assert(user.WriteSeries(invalidSeries), NotNil) self.server.WaitForServerToSync() - content = self.server.RunQueryAsRoot("select * from test_should_not_write", "m", c) - c.Assert(content, HasLen, 0) + _, err = rootUser.Query("select * from test_should_not_write", "m") + c.Assert(err, ErrorMatches, ".*Couldn't look up.*") rootUser.ChangeDatabaseUser("db1", "limited_user", "pass", false, "^$", "test_.*") verifyPermissions("db1", "limited_user", "^$", "test_.*") c.Assert(user.WriteSeries(invalidSeries), IsNil) @@ -343,16 +365,12 @@ func (self *SingleServerSuite) TestAdminPermissionToDeleteData(c *C) { "name": "test_delete_admin_permission", "columns": ["val_1", "val_2"] }]` - fmt.Println("TESTAD writing") self.server.WriteData(data, c) - fmt.Println("TESTAD query root") series := self.server.RunQueryAsRoot("select count(val_1) from test_delete_admin_permission", "s", c) c.Assert(series[0].Points, HasLen, 1) c.Assert(series[0].Points[0][1], Equals, float64(1)) - fmt.Println("TESTAD deleting") _ = self.server.RunQueryAsRoot("delete from test_delete_admin_permission", "s", c) - fmt.Println("TESTAD query") series = self.server.RunQueryAsRoot("select count(val_1) from test_delete_admin_permission", "s", c) c.Assert(series, HasLen, 0) } @@ -379,7 +397,6 @@ func (self *SingleServerSuite) TestDeletingNewDatabase(c *C) { s := CreatePoints("data_resurrection", 1, 10) self.server.WriteData(s, c) self.server.WaitForServerToSync() - fmt.Printf("wrote some data\n") for i := 0; i < 2; i++ { c.Assert(client.CreateDatabase("delete1"), IsNil) @@ -411,7 +428,6 @@ func (self *SingleServerSuite) TestDataResurrectionAfterRestart(c *C) { s := CreatePoints("data_resurrection", 1, 10) self.server.WriteData(s, c) self.server.WaitForServerToSync() - fmt.Printf("wrote some data\n") series := self.server.RunQuery("select count(column0) from data_resurrection", "s", c) c.Assert(series, HasLen, 1) c.Assert(series[0].Points[0][1], Equals, 10.0) @@ -431,10 +447,11 @@ func (self *SingleServerSuite) 
TestDataResurrectionAfterRestart(c *C) { self.server.Stop() c.Assert(self.server.Start(), IsNil) self.server.WaitForServerToStart() - series = self.server.RunQuery("select count(column0) from data_resurrection", "s", c) - c.Assert(series, HasLen, 0) + error, _ := self.server.GetErrorBody("db1", "select count(column0) from data_resurrection", "user", "pass", true, c) + c.Assert(error, Matches, ".*Couldn't look up.*") series = self.server.RunQuery("list series", "s", c) - c.Assert(series, HasLen, 0) + c.Assert(series, HasLen, 1) + c.Assert(series[0].Points, HasLen, 0) } // issue https://github.com/influxdb/influxdb/issues/702. Dropping shards can cause server crash @@ -594,8 +611,8 @@ func (self *SingleServerSuite) TestDbDelete(c *C) { self.createUser(c) // this shouldn't return any data - data = self.server.RunQuery("select val1 from test_deletetions", "m", c) - c.Assert(data, HasLen, 0) + error, _ := self.server.GetErrorBody("db1", "select val1 from test_deletetions", "root", "root", true, c) + c.Assert(error, Matches, ".*Couldn't look up.*") } // test delete query diff --git a/src/integration/test_config_single.toml b/src/integration/test_config_single.toml index 594c7022e27..d3fc00a1519 100644 --- a/src/integration/test_config_single.toml +++ b/src/integration/test_config_single.toml @@ -80,7 +80,7 @@ dir = "/tmp/influxdb/development/db" write-buffer-size = 10000 # the engine to use for new shards, old shards will continue to use the same engine -default-engine = "hyperleveldb" +default-engine = "leveldb" # The default setting on this is 0, which means unlimited. Set this to something if you want to # limit the max number of open files. max-open-files is per shard so this * that will be max. diff --git a/src/integration/test_missing_points.go b/src/integration/test_missing_points.go index 418662c3c54..bea51682311 100644 --- a/src/integration/test_missing_points.go +++ b/src/integration/test_missing_points.go @@ -51,13 +51,16 @@ func (self *MissingPointsSuite) TearDownSuite(c *C) { // test missing points when we have a split func (self *MissingPointsSuite) TestMissingPoints(c *C) { numberOfSeries := 25000 - batchSize := 5000 + batchSize := 100 numberOfPoints := 5 parallelism := 10 timeBase := 1399035078 client := self.serverProcesses[0].GetClient("", c) client.CreateDatabase("test_missing_points") + for _, s := range self.serverProcesses { + s.WaitForServerToSync() + } batches := make(chan []*influxdb.Series) @@ -86,7 +89,6 @@ func (self *MissingPointsSuite) TestMissingPoints(c *C) { t := (timeBase + i) * 1000 batch = append(batch, &influxdb.Series{name, columns, [][]interface{}{{p, t}}}) if len(batch) >= batchSize { - fmt.Printf("Inserting batch of %d points\n", len(batch)) batches <- batch batch = make([]*influxdb.Series, 0, batchSize) } diff --git a/src/metastore/store.go b/src/metastore/store.go new file mode 100644 index 00000000000..d26e6ad34de --- /dev/null +++ b/src/metastore/store.go @@ -0,0 +1,227 @@ +package metastore + +import ( + "bytes" + "encoding/binary" + "encoding/json" + "regexp" + "sync" + + "protocol" +) + +type Store struct { + fieldsLock sync.RWMutex + + // Map of databases > series > fields + StringsToIds map[string]map[string]map[string]uint64 + // Map of ids to the field names. 
Don't need to know which database or series + // they track to since we have that information elsewhere + LastIdUsed uint64 + clusterConsensus ClusterConsensus +} + +type Field struct { + Id uint64 + Name string +} + +func (f *Field) IdAsBytes() []byte { + idBytes := bytes.NewBuffer(make([]byte, 0, 8)) + binary.Write(idBytes, binary.BigEndian, f.Id) + return idBytes.Bytes() +} + +type ClusterConsensus interface { + // Will return a collection of series objects that have their name, fields, and fieldIds set. + // This can be used to set the field ids and fill the cache. + GetOrSetFieldIdsForSeries(database string, series []*protocol.Series) ([]*protocol.Series, error) +} + +func NewStore() *Store { + return &Store{ + StringsToIds: make(map[string]map[string]map[string]uint64), + } +} + +func NewStoreFromJson(data []byte) (*Store, error) { + store := &Store{} + err := json.Unmarshal(data, store) + return store, err +} + +func (self *Store) ToJson() ([]byte, error) { + return json.Marshal(self) +} + +func (self *Store) UpdateFromSnapshot(other *Store) { + clusterConsensus := self.clusterConsensus + *self = *other + self.clusterConsensus = clusterConsensus +} + +func (self *Store) SetClusterConsensus(c ClusterConsensus) { + self.clusterConsensus = c +} + +func (self *Store) ReplaceFieldNamesWithFieldIds(database string, series []*protocol.Series) error { + allSetFromCache := self.setFieldIdsFromCache(database, series) + if allSetFromCache { + return nil + } + + // one or more of the fields weren't in the cache, so just get the whole list from Raft + seriesWithOnlyFields := make([]*protocol.Series, len(series), len(series)) + for i, s := range series { + seriesWithOnlyFields[i] = &protocol.Series{Name: s.Name, Fields: s.Fields} + } + seriesWithFieldIds, err := self.clusterConsensus.GetOrSetFieldIdsForSeries(database, seriesWithOnlyFields) + if err != nil { + return err + } + + for i, s := range series { + s.Fields = nil + s.FieldIds = seriesWithFieldIds[i].FieldIds + } + return nil +} + +func (self *Store) GetOrSetFieldIds(database string, series []*protocol.Series) error { + self.fieldsLock.Lock() + defer self.fieldsLock.Unlock() + databaseSeries := self.StringsToIds[database] + if databaseSeries == nil { + databaseSeries = make(map[string]map[string]uint64) + self.StringsToIds[database] = databaseSeries + } + for _, s := range series { + fieldIds := make([]uint64, len(s.Fields), len(s.Fields)) + seriesFieldMap := databaseSeries[*s.Name] + if seriesFieldMap == nil { + seriesFieldMap = make(map[string]uint64) + databaseSeries[*s.Name] = seriesFieldMap + } + for i, fieldName := range s.Fields { + fieldId, ok := seriesFieldMap[fieldName] + if !ok { + self.LastIdUsed += 1 + seriesFieldMap[fieldName] = self.LastIdUsed + fieldId = self.LastIdUsed + } + fieldIds[i] = fieldId + } + s.FieldIds = fieldIds + } + return nil +} + +func (self *Store) GetSeriesForDatabaseAndRegex(database string, regex *regexp.Regexp) []string { + self.fieldsLock.RLock() + defer self.fieldsLock.RUnlock() + databaseSeries, ok := self.StringsToIds[database] + if !ok { + return nil + } + matchingSeries := make([]string, 0) + for series := range databaseSeries { + if regex.MatchString(series) { + matchingSeries = append(matchingSeries, series) + } + } + return matchingSeries +} + +func (self *Store) GetSeriesForDatabase(database string) []string { + self.fieldsLock.RLock() + defer self.fieldsLock.RUnlock() + databaseSeries, ok := self.StringsToIds[database] + if !ok { + return nil + } + series := make([]string, 0, 
len(databaseSeries)) + for s := range databaseSeries { + series = append(series, s) + } + return series +} + +func (self *Store) getFieldsForDatabase(database string) []*Field { + fields := make([]*Field, 0) + databaseSeries, ok := self.StringsToIds[database] + if !ok { + return nil + } + for _, series := range databaseSeries { + for fieldName, fieldId := range series { + fields = append(fields, &Field{Id: fieldId, Name: fieldName}) + } + } + return fields +} + +func (self *Store) GetFieldsForDatabase(database string) []*Field { + self.fieldsLock.RLock() + defer self.fieldsLock.RUnlock() + return self.getFieldsForDatabase(database) +} + +func (self *Store) getFieldsForSeries(database, series string) []*Field { + databaseSeries, ok := self.StringsToIds[database] + if !ok { + return nil + } + fieldMap := databaseSeries[series] + fields := make([]*Field, 0, len(fieldMap)) + for name, id := range fieldMap { + fields = append(fields, &Field{Name: name, Id: id}) + } + return fields +} + +func (self *Store) GetFieldsForSeries(database, series string) []*Field { + self.fieldsLock.RLock() + defer self.fieldsLock.RUnlock() + return self.getFieldsForSeries(database, series) +} + +func (self *Store) DropSeries(database, series string) ([]*Field, error) { + self.fieldsLock.Lock() + defer self.fieldsLock.Unlock() + fields := self.getFieldsForSeries(database, series) + databaseSeries := self.StringsToIds[database] + if databaseSeries != nil { + delete(databaseSeries, series) + } + return fields, nil +} + +func (self *Store) DropDatabase(database string) ([]*Field, error) { + self.fieldsLock.Lock() + defer self.fieldsLock.Unlock() + fields := self.getFieldsForDatabase(database) + delete(self.StringsToIds, database) + return fields, nil +} + +func (self *Store) setFieldIdsFromCache(database string, series []*protocol.Series) bool { + self.fieldsLock.RLock() + defer self.fieldsLock.RUnlock() + databaseSeries := self.StringsToIds[database] + for _, s := range series { + seriesFields, ok := databaseSeries[*s.Name] + if !ok { + return false + } + fieldIds := make([]uint64, len(s.Fields), len(s.Fields)) + for i, f := range s.Fields { + fieldId, ok := seriesFields[f] + if !ok { + return false + } + fieldIds[i] = fieldId + } + s.FieldIds = fieldIds + } + return true +} diff --git a/src/protocol/protocol.proto b/src/protocol/protocol.proto index bf78e8d3dce..1614f00b2e2 100644 --- a/src/protocol/protocol.proto +++ b/src/protocol/protocol.proto @@ -18,6 +18,7 @@ message Series { repeated Point points = 1; required string name = 2; repeated string fields = 3; + repeated uint64 fieldIds = 4; } message QueryResponseChunk { diff --git a/src/server/server.go b/src/server/server.go index f9108d65371..16e6a236d68 100644 --- a/src/server/server.go +++ b/src/server/server.go @@ -9,6 +9,7 @@ import ( "configuration" "coordinator" "datastore" + "metastore" "runtime" "time" "wal" @@ -36,7 +37,8 @@ type Server struct { func NewServer(config *configuration.Configuration) (*Server, error) { log.Info("Opening database at %s", config.DataDir) - shardDb, err := datastore.NewShardDatastore(config) + metaStore := metastore.NewStore() + shardDb, err := datastore.NewShardDatastore(config, metaStore) if err != nil { return nil, err } @@ -49,13 +51,14 @@ func NewServer(config *configuration.Configuration) (*Server, error) { return nil, err } - clusterConfig := cluster.NewClusterConfiguration(config, writeLog, shardDb, newClient) + clusterConfig := cluster.NewClusterConfiguration(config, writeLog, shardDb, newClient, metaStore) raftServer 
:= coordinator.NewRaftServer(config, clusterConfig) + metaStore.SetClusterConsensus(raftServer) clusterConfig.LocalRaftName = raftServer.GetRaftName() clusterConfig.SetShardCreator(raftServer) clusterConfig.CreateFutureShardsAutomaticallyBeforeTimeComes() - coord := coordinator.NewCoordinatorImpl(config, raftServer, clusterConfig) + coord := coordinator.NewCoordinatorImpl(config, raftServer, clusterConfig, metaStore) requestHandler := coordinator.NewProtobufRequestHandler(coord, clusterConfig) protobufServer := coordinator.NewProtobufServer(config.ProtobufListenString(), requestHandler)
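
The heart of this change is the new metastore.Store: field names are resolved to ids once, in memory, instead of with gets against the storage engine. The sketch below walks that lifecycle using only the API added in src/metastore/store.go. It is a minimal, standalone example and not part of the patch: the package main wrapper, the "mydb" database, and the "cpu_load" series are made up, and it assumes the repo's GOPATH-style src/ layout so that the "metastore" and "protocol" imports resolve.

package main

import (
	"fmt"

	"metastore"
	"protocol"
)

func main() {
	store := metastore.NewStore()

	name := "cpu_load"
	series := []*protocol.Series{
		{Name: &name, Fields: []string{"value", "host"}},
	}

	// First time this series is seen: ids are assigned from LastIdUsed and
	// cached in the database > series > field-name map.
	if err := store.GetOrSetFieldIds("mydb", series); err != nil {
		panic(err)
	}
	fmt.Println(series[0].FieldIds) // e.g. [1 2]

	// A later write with the same field names gets the same ids back.
	again := []*protocol.Series{{Name: &name, Fields: []string{"value", "host"}}}
	if err := store.GetOrSetFieldIds("mydb", again); err != nil {
		panic(err)
	}
	fmt.Println(again[0].FieldIds) // same ids as the first call

	// Field lookups come straight from the in-memory map.
	for _, f := range store.GetFieldsForSeries("mydb", "cpu_load") {
		fmt.Printf("%s -> %d\n", f.Name, f.Id)
	}

	// Dropping the series removes it from the cache and returns its fields so
	// the shards can delete the matching column data.
	fields, _ := store.DropSeries("mydb", "cpu_load")
	fmt.Println(len(fields)) // 2
}

In the running server, writes do not call GetOrSetFieldIds directly: they go through Store.ReplaceFieldNamesWithFieldIds, which fills ids from the local cache when every field is already known and otherwise hands the series to the ClusterConsensus (the Raft server) via GetOrSetFieldIdsForSeries, so all nodes end up with the same id assignments.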