Skip to content

Commit

Permalink
Move time series metadata to Raft
Browse files Browse the repository at this point in the history
Fixes #414. All series metadata, that is which series exist, what columns they have, and what the ids of those columns has been moved into Raft.

This means that list series queries are now very fast since they operate on an in memory data structure. It also means that writes no longer have to do gets agains the storage engine to resolve what a column's id is.

Fixes #358. List series now returns as a single series with a `name` column.

This fix has the potential to slow things down in scenarios that people suddenly write many thousands of new series in less than a second. That's because the definition of new series now needs to go through the raft leader and get consensus. I think it's unlikely that it'll be a problem, but we'll need to do extensive testing.

Finally, there is the issue that the Raft snapshots can potentially become big. We should do testing with setups that have millions of series to ensure that performance and startup time remain good.
  • Loading branch information
pauldix committed Jul 10, 2014
1 parent 8a10160 commit 9c12fb7
Show file tree
Hide file tree
Showing 22 changed files with 745 additions and 430 deletions.
4 changes: 4 additions & 0 deletions Makefile.in
Expand Up @@ -134,6 +134,10 @@ endif
ifneq ($(GO_BUILD_TAGS),)
GO_BUILD_OPTIONS += -tags '$(GO_BUILD_TAGS)'
endif
race = off
ifneq ($(race),off)
GO_BUILD_OPTIONS += -race
endif

ifneq ($(uname_S),Linux)
PYTHONPATH ?= /usr/local/lib/python2.7/site-packages/
Expand Down
2 changes: 1 addition & 1 deletion src/api/http/api_test.go
Expand Up @@ -189,7 +189,7 @@ func (self *ApiSuite) SetUpSuite(c *C) {
dir,
self.coordinator,
self.manager,
cluster.NewClusterConfiguration(&configuration.Configuration{}, nil, nil, nil),
cluster.NewClusterConfiguration(&configuration.Configuration{}, nil, nil, nil, nil),
nil)
var err error
self.listener, err = net.Listen("tcp4", ":8081")
Expand Down
34 changes: 33 additions & 1 deletion src/cluster/cluster_configuration.go
Expand Up @@ -11,6 +11,7 @@ import (
"fmt"
"math"
"math/rand"
"metastore"
"parser"
"protocol"
"sort"
Expand Down Expand Up @@ -85,6 +86,7 @@ type ClusterConfiguration struct {
shardsByIdLock sync.RWMutex
LocalRaftName string
writeBuffers []*WriteBuffer
MetaStore *metastore.Store
}

type ContinuousQuery struct {
Expand All @@ -100,7 +102,8 @@ func NewClusterConfiguration(
config *configuration.Configuration,
wal WAL,
shardStore LocalShardStore,
connectionCreator func(string) ServerConnection) *ClusterConfiguration {
connectionCreator func(string) ServerConnection,
metaStore *metastore.Store) *ClusterConfiguration {
return &ClusterConfiguration{
DatabaseReplicationFactors: make(map[string]struct{}),
clusterAdmins: make(map[string]*ClusterAdmin),
Expand All @@ -117,6 +120,7 @@ func NewClusterConfiguration(
shortTermShards: make([]*ShardData, 0),
random: rand.New(rand.NewSource(time.Now().UnixNano())),
shardsById: make(map[uint32]*ShardData, 0),
MetaStore: metaStore,
}
}

Expand Down Expand Up @@ -329,6 +333,17 @@ func (self *ClusterConfiguration) DropDatabase(name string) error {
defer self.usersLock.Unlock()

delete(self.dbUsers, name)

fields, err := self.MetaStore.DropDatabase(name)
if err != nil {
return err
}
go func() {
shards := self.GetAllShards()
for _, s := range shards {
s.DropFields(fields)
}
}()
return nil
}

Expand Down Expand Up @@ -514,6 +529,7 @@ type SavedConfiguration struct {
ShortTermShards []*NewShardData
LongTermShards []*NewShardData
ContinuousQueries map[string][]*ContinuousQuery
MetaStore *metastore.Store
LastShardIdUsed uint32
}

Expand All @@ -528,6 +544,7 @@ func (self *ClusterConfiguration) Save() ([]byte, error) {
ShortTermShards: self.convertShardsToNewShardData(self.shortTermShards),
LongTermShards: self.convertShardsToNewShardData(self.longTermShards),
LastShardIdUsed: self.lastShardIdUsed,
MetaStore: self.MetaStore,
}

for k := range self.DatabaseReplicationFactors {
Expand Down Expand Up @@ -590,6 +607,7 @@ func (self *ClusterConfiguration) Recovery(b []byte) error {
self.clusterAdmins = data.Admins
self.dbUsers = data.DbUsers
self.servers = data.Servers
self.MetaStore.UpdateFromSnapshot(data.MetaStore)

for _, server := range self.servers {
log.Info("Checking whether %s is the local server %s", server.RaftName, self.LocalRaftName)
Expand Down Expand Up @@ -1049,6 +1067,20 @@ func (self *ClusterConfiguration) DropShard(shardId uint32, serverIds []uint32)
return nil
}

func (self *ClusterConfiguration) DropSeries(database, series string) error {
fields, err := self.MetaStore.DropSeries(database, series)
if err != nil {
return err
}
go func() {
shards := self.GetAllShards()
for _, s := range shards {
s.DropFields(fields)
}
}()
return nil
}

func (self *ClusterConfiguration) RecoverFromWAL() error {
writeBuffer := NewWriteBuffer("local", self.shardStore, self.wal, self.LocalServer.Id, self.config.LocalStoreWriteBufferSize)
self.writeBuffers = append(self.writeBuffers, writeBuffer)
Expand Down
52 changes: 21 additions & 31 deletions src/cluster/shard.go
@@ -1,14 +1,16 @@
package cluster

import (
"common"
"engine"
"fmt"
"parser"
p "protocol"
"sort"
"strings"
"time"

"common"
"engine"
"metastore"
"parser"
p "protocol"
"wal"

log "code.google.com/p/log4go"
Expand Down Expand Up @@ -111,7 +113,7 @@ var (
type LocalShardDb interface {
Write(database string, series []*p.Series) error
Query(*parser.QuerySpec, QueryProcessor) error
DropDatabase(database string) error
DropFields(fields []*metastore.Field) error
IsClosed() bool
}

Expand Down Expand Up @@ -179,6 +181,17 @@ func (self *ShardData) ServerIds() []uint32 {
return self.serverIds
}

func (self *ShardData) DropFields(fields []*metastore.Field) error {
if !self.IsLocal {
return nil
}
shard, err := self.store.GetOrCreateShard(self.id)
if err != nil {
return err
}
return shard.DropFields(fields)
}

func (self *ShardData) SyncWrite(request *p.Request) error {
request.ShardId = &self.id
for _, server := range self.clusterServers {
Expand Down Expand Up @@ -283,10 +296,12 @@ func (self *ShardData) Query(querySpec *parser.QuerySpec, response chan *p.Respo
}
defer self.store.ReturnShard(self.id)
err = shard.Query(querySpec, processor)
processor.Close()
// if we call Close() in case of an error it will mask the error
if err != nil {
response <- &p.Response{Type: &endStreamResponse, ErrorMessage: p.String(err.Error())}
return
}
processor.Close()
response <- &p.Response{Type: &endStreamResponse}
return
}
Expand Down Expand Up @@ -320,31 +335,6 @@ func (self *ShardData) randomHealthyServer() *ClusterServer {
return nil
}

func (self *ShardData) DropDatabase(database string, sendToServers bool) {
if self.IsLocal {
if shard, err := self.store.GetOrCreateShard(self.id); err == nil {
defer self.store.ReturnShard(self.id)
shard.DropDatabase(database)
}
}

if !sendToServers {
return
}

responses := make([]chan *p.Response, len(self.clusterServers), len(self.clusterServers))
for i, server := range self.clusterServers {
responseChan := make(chan *p.Response, 1)
responses[i] = responseChan
request := &p.Request{Type: &dropDatabaseRequest, Database: &database, ShardId: &self.id}
go server.MakeRequest(request, responseChan)
}
for _, responseChan := range responses {
// TODO: handle error responses
<-responseChan
}
}

func (self *ShardData) String() string {
serversString := make([]string, 0)
for _, s := range self.servers {
Expand Down
54 changes: 54 additions & 0 deletions src/coordinator/command.go
Expand Up @@ -9,6 +9,8 @@ import (

log "code.google.com/p/log4go"
"github.com/goraft/raft"

"protocol"
)

var internalRaftCommands map[string]raft.Command
Expand All @@ -30,6 +32,8 @@ func init() {
&SetContinuousQueryTimestampCommand{},
&CreateShardsCommand{},
&DropShardCommand{},
&CreateSeriesFieldIdsCommand{},
&DropSeriesCommand{},
} {
internalRaftCommands[command.CommandName()] = command
}
Expand Down Expand Up @@ -378,3 +382,53 @@ func (c *DropShardCommand) Apply(server raft.Server) (interface{}, error) {
err := config.DropShard(c.ShardId, c.ServerIds)
return nil, err
}

type CreateSeriesFieldIdsCommand struct {
Database string
Series []*protocol.Series
}

func NewCreateSeriesFieldIdsCommand(database string, series []*protocol.Series) *CreateSeriesFieldIdsCommand {
return &CreateSeriesFieldIdsCommand{Database: database, Series: series}
}

func (c *CreateSeriesFieldIdsCommand) CommandName() string {
return "create_series_field_ids"
}

// TODO: Encode/Decode are not needed once this pr
// https://github.com/goraft/raft/pull/221 is merged in and our goraft
// is updated to a commit that includes the pr

func (c *CreateSeriesFieldIdsCommand) Encode(w io.Writer) error {
return json.NewEncoder(w).Encode(c)
}

func (c *CreateSeriesFieldIdsCommand) Decode(r io.Reader) error {
return json.NewDecoder(r).Decode(c)
}

func (c *CreateSeriesFieldIdsCommand) Apply(server raft.Server) (interface{}, error) {
config := server.Context().(*cluster.ClusterConfiguration)
err := config.MetaStore.GetOrSetFieldIds(c.Database, c.Series)
return c.Series, err
}

type DropSeriesCommand struct {
Database string
Series string
}

func NewDropSeriesCommand(database, series string) *DropSeriesCommand {
return &DropSeriesCommand{Database: database, Series: series}
}

func (c *DropSeriesCommand) CommandName() string {
return "drop_series"
}

func (c *DropSeriesCommand) Apply(server raft.Server) (interface{}, error) {
config := server.Context().(*cluster.ClusterConfiguration)
err := config.DropSeries(c.Database, c.Series)
return nil, err
}

0 comments on commit 9c12fb7

Please sign in to comment.