Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

graceful shutdown demo #109

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,9 +129,10 @@ type BaseSink struct {

// Compaction represents a configuration for compaction sinks.
type Compaction struct {
	Sinks            []Sink `yaml:"sinks"`
	NameFunc         string `json:"nameFunc" yaml:"nameFunc" env:"NAMEFUNC"`                         // The lua script to compute file name given a row
	Interval         int    `json:"interval" yaml:"interval" env:"INTERVAL"`                         // The compaction interval, in seconds
	CompactAllOnQuit bool   `json:"compactAllOnQuit" yaml:"compactAllOnQuit" env:"COMPACTALLONQUIT"` // Whether to read, compact and flush all remaining data on shutdown
}

// Streams are lists of sinks to be streamed to
Expand Down
6 changes: 0 additions & 6 deletions internal/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,12 +157,6 @@ func (s *Server) Close() {
s.s3sqs.Close()
}

// Close all the open tables
for _, t := range s.tables {
if err := t.Close(); err != nil {
s.monitor.Error(err)
}
}
}

// ------------------------------------------------------------------------------------------------------------
Expand Down
12 changes: 6 additions & 6 deletions internal/storage/compact/compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,26 +36,26 @@ type Storage struct {
}

// New creates a new storage implementation.
func New(buffer storage.Storage, dest BlockWriter, monitor monitor.Monitor, interval time.Duration) *Storage {
func New(ctx context.Context, buffer storage.Storage, dest BlockWriter, monitor monitor.Monitor, interval time.Duration) *Storage {
s := &Storage{
monitor: monitor,
buffer: buffer,
dest: dest,
}
s.compact = compactEvery(interval, s.Compact)
s.compact = compactEvery(ctx, interval, s.Compact)
return s
}

// compactEvery returns the task that compacts on a regular interval.
func compactEvery(interval time.Duration, compact async.Work) async.Task {
return async.Invoke(context.Background(), func(ctx context.Context) (interface{}, error) {
func compactEvery(ctx context.Context, interval time.Duration, compact async.Work) async.Task {
return async.Invoke(ctx, func(ictx context.Context) (interface{}, error) {
for {
select {
case <-ctx.Done():
case <-ictx.Done():
return nil, nil
default:
time.Sleep(interval)
compact(ctx)
compact(ictx)
}
}
})
Expand Down
2 changes: 1 addition & 1 deletion internal/storage/compact/compact_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func TestRange(t *testing.T) {
return nil
}

store := New(buffer, dest, monitor.NewNoop(), 100*time.Millisecond)
store := New(context.Background(), dest, monitor.NewNoop(), 100*time.Millisecond)

// Insert out of order
_ = store.Append(key.New("A", time.Unix(0, 0)), input, 60*time.Second)
Expand Down
7 changes: 7 additions & 0 deletions internal/storage/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
package storage

import (
"context"
"io"
"time"

Expand All @@ -18,6 +19,7 @@ type Storage interface {
io.Closer
Iterator
Appender
Compacter
Delete(...key.Key) error
}

Expand All @@ -31,6 +33,11 @@ type Appender interface {
Append(key key.Key, value []byte, ttl time.Duration) error
}

// Compacter represents a contract for a storage that can force a compaction
// of its buffered data on demand (e.g. during graceful shutdown).
type Compacter interface {
	Compact(ctx context.Context) (interface{}, error)
}

// Merger represents a contract that merges two or more blocks together.
type Merger interface {
Merge([]block.Block, typeof.Schema) ([]byte, []byte)
Expand Down
4 changes: 2 additions & 2 deletions internal/storage/writer/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func ForStreaming(config config.Streams, monitor monitor.Monitor) (storage.Strea
}

// ForCompaction creates a compaction writer
func ForCompaction(config *config.Compaction, monitor monitor.Monitor, store storage.Storage) (*compact.Storage, error) {
func ForCompaction(ctx context.Context, config *config.Compaction, monitor monitor.Monitor, store storage.Storage) (*compact.Storage, error) {
writer, err := newWriter(config.Sinks, monitor)
if err != nil {
return nil, err
Expand Down Expand Up @@ -76,7 +76,7 @@ func ForCompaction(config *config.Compaction, monitor monitor.Monitor, store sto
return nil, err
}

return compact.New(store, flusher, monitor, interval), nil
return compact.New(ctx, store, flusher, monitor, interval), nil
}

// NewWriter creates a new writer from the configuration.
Expand Down
3 changes: 2 additions & 1 deletion internal/storage/writer/writer_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package writer

import (
"context"
"testing"

"github.com/kelindar/talaria/internal/config"
Expand All @@ -22,7 +23,7 @@ func TestForCompaction(t *testing.T) {
},
}

compact, err := ForCompaction(cfg,
compact, err := ForCompaction(context.Background(), cfg,
monitor.New(logging.NewStandard(), statsd.NewNoop(), "x", "x"),
disk.New(monitor.NewNoop()),
)
Expand Down
5 changes: 5 additions & 0 deletions internal/table/timeseries/timeseries.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,11 @@ func New(name string, cluster Membership, monitor monitor.Monitor, store storage
return t
}

// Compact forces an immediate compaction of all buffered data in the
// underlying store, so that nothing is left behind on graceful shutdown.
// NOTE(review): the original declared this as a second Close() method on
// *Table (a duplicate of the Close below — a compile error) and called
// t.store.Compact() with no arguments, which does not match the Compacter
// contract Compact(ctx) (interface{}, error). Renamed and adapted here;
// confirm "context" is imported in this file.
func (t *Table) Compact(ctx context.Context) error {
	_, err := t.store.Compact(ctx)
	return err
}

// Close implements the io.Closer interface by closing the underlying store.
func (t *Table) Close() error {
	return t.store.Close()
Expand Down
33 changes: 23 additions & 10 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,17 +65,30 @@ func main() {
// Open every table configured
tables := []table.Table{nodes.New(gossip), logTable}
for name, tableConf := range conf.Tables {
tables = append(tables, openTable(name, conf.Storage, tableConf, gossip, monitor))
tables = append(tables, openTable(ctx, name, conf.Storage, tableConf, gossip, monitor))
}

// Start the new server
server := server.New(configure, monitor, tables...)
serverIns := server.New(configure, monitor, tables...)

// onSignal will be called when a OS-level signal is received.
// gracefulShutdown will be called when a OS-level signal is received.
onSignal(func(_ os.Signal) {
cancel() // Cancel the context
gossip.Close() // Close the gossip layer
server.Close() // Close the server and database
cancel() // Cancel the context, which also stops the periodic compaction task
gossip.Close() // Close the gossip layer
serverIns.Close() // Close the server and database, waiting for in-flight RPC/SQS ingestion requests to finish
for _, t := range tables {
// Close all the open tables
if err := t.Close(); err != nil {
monitor.Error(err)
}
compactConf := conf.Tables[t.Name()].Compact
if compactConf != nil && compactConf.CompactAllOnQuit {

}
}
// for _, tableConf := range conf.Tables {
// }
//db.Consume()
})

// Join the cluster
Expand All @@ -90,20 +103,20 @@ func main() {
// Start listenHandler
monitor.Info("server: starting...")
monitor.Count1(logTag, "start")
if err := server.Listen(ctx, conf.Readers.Presto.Port, conf.Writers.GRPC.Port); err != nil {
if err := serverIns.Listen(ctx, conf.Readers.Presto.Port, conf.Writers.GRPC.Port); err != nil {
panic(err)
}
}

// openTable creates a new table with storage & optional compaction fully configured
func openTable(name string, storageConf config.Storage, tableConf config.Table, cluster cluster.Membership, monitor monitor.Monitor) table.Table {
func openTable(ctx context.Context, name string, storageConf config.Storage, tableConf config.Table, cluster cluster.Membership, monitor monitor.Monitor) table.Table {
monitor.Info("server: opening table %s...", name)

// Create a new storage layer and optional compaction
store := storage.Storage(disk.Open(storageConf.Directory, name, monitor, storageConf.Badger))
if tableConf.Compact != nil {
var err error
store, err = writer.ForCompaction(tableConf.Compact, monitor, store)
store, err = writer.ForCompaction(ctx, tableConf.Compact, monitor, store)
if err != nil {
panic(err)
}
Expand All @@ -121,7 +134,7 @@ func openTable(name string, storageConf config.Storage, tableConf config.Table,
// onSignal hooks a callback for a signal.
func onSignal(callback func(sig os.Signal)) {
c := make(chan os.Signal, 1)
signal.Notify(c, syscall.SIGINT, syscall.SIGTERM)
signal.Notify(c, syscall.SIGINT, syscall.SIGTERM) // SIGQUIT is deliberately not handled so that pressing ^\ still exits with a stack dump
go func() {
for sig := range c {
callback(sig)
Expand Down