Merge pull request #367 from liftbridge-io/fix_stream_delete
Fix bug related to stream deletion
tylertreat committed Nov 8, 2021
2 parents e5a67df + 53c8c19 commit 44bde68
Showing 8 changed files with 203 additions and 60 deletions.
2 changes: 1 addition & 1 deletion go.mod
@@ -22,7 +22,7 @@ require (
github.com/liftbridge-io/raft-boltdb v0.0.0-20200414234651-aaf6e08d8f73
github.com/mattn/go-colorable v0.1.7 // indirect
github.com/mitchellh/mapstructure v1.3.2 // indirect
-	github.com/natefinch/atomic v0.0.0-20200526193002-18c0533a5b09
+	github.com/natefinch/atomic v1.0.1
github.com/nats-io/nats-server/v2 v2.1.9
github.com/nats-io/nats.go v1.10.0
github.com/nats-io/nkeys v0.2.0 // indirect
2 changes: 2 additions & 0 deletions go.sum
@@ -298,6 +298,8 @@ github.com/modern-go/reflect2 v1.0.1/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3Rllmb
github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U=
github.com/natefinch/atomic v0.0.0-20200526193002-18c0533a5b09 h1:DXR0VtCesBD2ss3toN9OEeXszpQmW9dc3SvUbUfiBC0=
github.com/natefinch/atomic v0.0.0-20200526193002-18c0533a5b09/go.mod h1:1rLVY/DWf3U6vSZgH16S7pymfrhK2lcUlXjgGglw/lY=
github.com/natefinch/atomic v1.0.1 h1:ZPYKxkqQOx3KZ+RsbnP/YsgvxWQPGxjC0oBt2AhwV0A=
github.com/natefinch/atomic v1.0.1/go.mod h1:N/D/ELrljoqDyT3rZrsUmtsuzvHkeB/wWjHV22AZRbM=
github.com/nats-io/jwt v0.3.0/go.mod h1:fRYCDE99xlTsqUzISS1Bi75UBJ6ljOJQOAAu5VglpSg=
github.com/nats-io/jwt v0.3.2/go.mod h1:/euKqTS1ZD+zzjYrY7pseZrTtWQSjujC7xjPc8wL6eU=
github.com/nats-io/jwt v1.1.0 h1:+vOlgtM0ZsF46GbmUoadq0/2rChNS45gtxHEa3H1gqM=
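The only dependency change is bumping github.com/natefinch/atomic from a pre-release pseudo-version to the tagged v1.0.1. The diff does not show where Liftbridge calls into the package, but its core API is a write-to-temp-file-then-rename helper; a minimal, hypothetical usage sketch (the file path and payload are made up for illustration):

package main

import (
	"bytes"
	"log"

	"github.com/natefinch/atomic"
)

func main() {
	// atomic.WriteFile stages the data in a temporary file and renames it into
	// place, so a crash mid-write never leaves a torn file at the target path.
	payload := bytes.NewReader([]byte("example contents"))
	if err := atomic.WriteFile("/tmp/example.state", payload); err != nil {
		log.Fatalf("atomic write failed: %v", err)
	}
}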
36 changes: 25 additions & 11 deletions server/fsm.go
@@ -57,10 +57,15 @@ func (s *Server) Apply(l *raft.Log) interface{} {
// to recover the last committed Raft FSM log entry, if any, to determine
// the recovery high watermark. Once we apply all entries up to that point,
// we know we've completed the recovery process and subsequent entries are
- // newly committed operations. During the recovery process, any recovered
- // streams will not be started until recovery is finished to avoid starting
- // streams in an intermediate state. When recovery completes, we'll call
- // finishedRecovery() to start the recovered streams.
+ // newly committed operations.
+ //
+ // During the recovery process, any recovered streams will not be started
+ // until recovery is finished to avoid starting streams in an intermediate
+ // state. Streams that are deleted will only be marked for deletion to
+ // avoid deleting potentially valid stream data, e.g. in the case of a
+ // stream being deleted, recreated, and then published to. When recovery
+ // completes, we'll call finishedRecovery() to start the recovered streams
+ // and delete any tombstoned streams.
if !s.recoveryStarted {
lastCommittedLog, err := s.recoverLatestCommittedFSMLog(l.Index)
// If this returns an error, something is very wrong.
@@ -83,7 +88,7 @@ func (s *Server) Apply(l *raft.Log) interface{} {
if l.Index == s.latestRecoveredLog.Index {
// We've applied all entries up to the latest recovered log, so
// recovery is finished. Call finishedRecovery() to start any
- 		// recovered streams.
+ 		// recovered streams and delete tombstoned streams.
defer func() {
count, err := s.finishedRecovery()
if err != nil {
@@ -170,7 +175,7 @@ func (s *Server) apply(log *proto.RaftLog, index uint64, recovered bool) (interf
var (
stream = log.DeleteStreamOp.Stream
)
- 		if err := s.applyDeleteStream(stream); err != nil {
+ 		if err := s.applyDeleteStream(stream, recovered); err != nil {
return nil, err
}
case proto.Op_PAUSE_STREAM:
@@ -221,15 +226,21 @@ func (s *Server) startedRecovery() {

// finishedRecovery should be called when the FSM has finished replaying any
// unapplied log entries. This will start any stream partitions recovered
- // during the replay. It returns the number of streams which had partitions
- // that were recovered.
+ // during the replay and delete any tombstoned streams. It returns the number
+ // of streams which had partitions that were recovered.
func (s *Server) finishedRecovery() (int, error) {
// If LogRecovery is disabled, we need to restore the previous log output.
if !s.config.LogRecovery {
s.logger.SetWriter(s.loggerOut)
}
recoveredStreams := make(map[string]struct{})
for _, stream := range s.metadata.GetStreams() {
+ 		if stream.IsTombstoned() {
+ 			if err := s.metadata.RemoveTombstonedStream(stream); err != nil {
+ 				return 0, errors.Wrap(err, "failed to delete tombstoned stream")
+ 			}
+ 			continue
+ 		}
for _, partition := range stream.GetPartitions() {
recovered, err := partition.StartRecovered()
if err != nil {
@@ -411,14 +422,17 @@ func (s *Server) applyChangePartitionLeader(stream, leader string, partitionID i
return nil
}

- // applyDeleteStream deletes the given stream partition.
- func (s *Server) applyDeleteStream(streamName string) error {
+ // applyDeleteStream deletes the given stream partition. If this operation is
+ // being applied during recovery, this will only mark the stream with a
+ // tombstone. Tombstoned streams will be deleted after the recovery process
+ // completes.
+ func (s *Server) applyDeleteStream(streamName string, recovered bool) error {
stream := s.metadata.GetStream(streamName)
if stream == nil {
return ErrStreamNotFound
}

- 	err := s.metadata.CloseAndDeleteStream(stream)
+ 	err := s.metadata.RemoveStream(stream, recovered)
if err != nil {
return errors.Wrap(err, "failed to delete stream")
}
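The scenario the tombstoning above protects against is a stream that is deleted, recreated under the same name, and then published to before the server replays its Raft log on restart. A condensed sketch of that sequence using the same Go client calls as the test below (the import path is assumed to be the v2 Go client and the address is illustrative):

package main

import (
	"context"
	"log"

	lift "github.com/liftbridge-io/go-liftbridge/v2"
)

func main() {
	client, err := lift.Connect([]string{"localhost:9292"})
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close()

	ctx := context.Background()
	// Create, delete, and recreate the same stream, then publish to it. Before
	// this fix, replaying these operations on restart could remove the
	// recreated stream's data; with tombstoning, the recreate un-tombstones it.
	if err := client.CreateStream(ctx, "bar", "bar"); err != nil {
		log.Fatal(err)
	}
	if err := client.DeleteStream(ctx, "bar"); err != nil {
		log.Fatal(err)
	}
	if err := client.CreateStream(ctx, "bar", "bar"); err != nil {
		log.Fatal(err)
	}
	if _, err := client.Publish(ctx, "bar", []byte("hello")); err != nil {
		log.Fatal(err)
	}
}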
74 changes: 74 additions & 0 deletions server/fsm_test.go
@@ -2,6 +2,9 @@ package server

import (
"context"
"os"
"path/filepath"
"strconv"
"testing"
"time"

@@ -45,3 +48,74 @@ func TestFSMSnapshotRestore(t *testing.T) {
waitForPartition(t, 10*time.Second, "bar", 2, s1)
require.Len(t, s1.metadata.GetStreams(), 2)
}

// Ensure streams that get deleted are deleted on restart but streams that get
// deleted and then recreated do not get deleted on restart.
func TestTombstoneStreamsOnRestart(t *testing.T) {
defer cleanupStorage(t)

// Configure the server as a seed.
s1Config := getTestConfig("a", true, 5050)
s1 := runServerWithConfig(t, s1Config)
defer s1.Stop()

// Wait to elect self as leader.
getMetadataLeader(t, 10*time.Second, s1)

client, err := lift.Connect([]string{"localhost:5050"})
require.NoError(t, err)
defer client.Close()

// Create some streams.
require.NoError(t, client.CreateStream(context.Background(), "foo", "foo"))
require.NoError(t, client.CreateStream(context.Background(), "bar", "bar"))

// Delete stream foo.
require.NoError(t, client.DeleteStream(context.Background(), "foo"))

// Delete and then recreate stream bar.
require.NoError(t, client.DeleteStream(context.Background(), "bar"))
require.NoError(t, client.CreateStream(context.Background(), "bar", "bar"))

// Publish some messages to bar.
for i := 0; i < 2; i++ {
_, err = client.Publish(context.Background(), "bar", []byte(strconv.Itoa(i)))
require.NoError(t, err)
}

// Restart the server.
s1.Stop()
s1.config.Port = 5051
s1 = runServerWithConfig(t, s1.config)
defer s1.Stop()

// Wait to elect self as leader.
getMetadataLeader(t, 10*time.Second, s1)
waitForPartition(t, 5*time.Second, "bar", 0, s1)

client, err = lift.Connect([]string{"localhost:5051"})
require.NoError(t, err)
defer client.Close()

// Ensure bar stream exists and has the expected messages.
ch := make(chan *lift.Message)
ctx, cancel := context.WithCancel(context.Background())
err = client.Subscribe(ctx, "bar", func(msg *lift.Message, err error) {
require.NoError(t, err)
ch <- msg
}, lift.StartAtEarliestReceived())
require.NoError(t, err)
for i := 0; i < 2; i++ {
select {
case msg := <-ch:
require.Equal(t, []byte(strconv.Itoa(i)), msg.Value())
case <-time.After(5 * time.Second):
t.Fatal("Did not receive expected message")
}
}
cancel()

// Ensure foo stream doesn't exist.
_, err = os.Stat(filepath.Join(s1Config.DataDir, "streams", "foo"))
require.True(t, os.IsNotExist(err))
}
103 changes: 78 additions & 25 deletions server/metadata.go
@@ -734,9 +734,26 @@ func (m *metadataAPI) AddStream(protoStream *proto.Stream, recovered bool) (*str
m.mu.Lock()
defer m.mu.Unlock()

- 	_, ok := m.streams[protoStream.Name]
+ 	existing, ok := m.streams[protoStream.Name]
if ok {
- 		return nil, ErrStreamExists
+ 		if !recovered {
+ 			return nil, ErrStreamExists
+ 		}
+ 		// If this operation is being applied during recovery, check if this
+ 		// stream is tombstoned, i.e. was marked for deletion previously. If it
+ 		// is, un-tombstone it by closing the existing stream and then
+ 		// recreating it, leaving the existing data intact.
+ 		if !existing.IsTombstoned() {
+ 			// This is an invalid state because it means the stream already
+ 			// exists.
+ 			return nil, ErrStreamExists
+ 		}
+ 		// Un-tombstone by closing the existing stream and removing it from
+ 		// the streams store so that it can be recreated.
+ 		if err := existing.Close(); err != nil {
+ 			return nil, err
+ 		}
+ 		m.removeStream(existing)
}

config := protoStream.GetConfig()
@@ -746,7 +763,7 @@ func (m *metadataAPI) AddStream(protoStream *proto.Stream, recovered bool) (*str

for _, partition := range protoStream.Partitions {
if err := m.addPartition(stream, partition, recovered, config); err != nil {
- 			delete(m.streams, protoStream.Name)
+ 			m.removeStream(stream)
return nil, err
}
}
@@ -1000,31 +1017,25 @@ func (m *metadataAPI) Reset() error {
return nil
}

- // CloseStream close a streams and clears corresponding state in the metadata
- // store.
- func (m *metadataAPI) CloseAndDeleteStream(stream *stream) error {
+ // RemoveStream closes the stream, removes it from the metadata store, and
+ // deletes the associated on-disk data for it. However, if this operation is
+ // being applied during Raft recovery, this will only mark the stream with a
+ // tombstone. Tombstoned streams will be deleted after the recovery process
+ // completes.
+ func (m *metadataAPI) RemoveStream(stream *stream, recovered bool) error {
m.mu.Lock()
defer m.mu.Unlock()

- 	err := stream.Delete()
- 	if err != nil {
- 		return errors.Wrap(err, "failed to delete stream")
- 	}
- 
- 	// Remove the stream data directory
- 	streamDataDir := filepath.Join(m.Server.config.DataDir, "streams", stream.GetName())
- 	err = os.RemoveAll(streamDataDir)
- 	if err != nil {
- 		return errors.Wrap(err, "failed to delete stream data directory")
- 	}
- 
- 	delete(m.streams, stream.GetName())
- 
- 	for _, partition := range stream.GetPartitions() {
- 		report, ok := m.leaderReports[partition]
- 		if ok {
- 			report.cancel()
- 			delete(m.leaderReports, partition)
+ 	// If this operation is being applied during recovery, only tombstone the
+ 	// stream. We don't want to delete streams until recovery finishes to avoid
+ 	// deleting potentially valid data, e.g. in the case of a stream being
+ 	// deleted, recreated, and then published to. In this scenario, the
+ 	// recreate will un-tombstone the stream.
+ 	if recovered {
+ 		stream.Tombstone()
+ 	} else {
+ 		if err := m.deleteStream(stream); err != nil {
+ 			return err
}
}

@@ -1043,6 +1054,17 @@ func (m *metadataAPI) LostLeadership() {
return nil
}

// RemoveTombstonedStream closes the tombstoned stream, removes it from the
// metadata store, and deletes the associated on-disk data for it.
func (m *metadataAPI) RemoveTombstonedStream(stream *stream) error {
if !stream.IsTombstoned() {
return fmt.Errorf("cannot delete stream %s because it is not tombstoned", stream)
}
m.mu.Lock()
defer m.mu.Unlock()
return m.deleteStream(stream)
}

// LostLeadership should be called when the server loses metadata leadership.
func (m *metadataAPI) LostLeadership() {
m.mu.Lock()
@@ -1053,6 +1075,37 @@ func (m *metadataAPI) LostLeadership() {
m.leaderReports = make(map[*partition]*leaderReport)
}

// deleteStream deletes the stream and the associated on-disk data for it.
func (m *metadataAPI) deleteStream(stream *stream) error {
err := stream.Delete()
if err != nil {
return errors.Wrap(err, "failed to delete stream")
}

// Remove the stream data directory
streamDataDir := filepath.Join(m.Server.config.DataDir, "streams", stream.GetName())
err = os.RemoveAll(streamDataDir)
if err != nil {
return errors.Wrap(err, "failed to delete stream data directory")
}

m.removeStream(stream)
return nil
}

// removeStream removes the stream from the stream store and removes any
// leaderReports for its partitions.
func (m *metadataAPI) removeStream(stream *stream) {
delete(m.streams, stream.GetName())
for _, partition := range stream.GetPartitions() {
report, ok := m.leaderReports[partition]
if ok {
report.cancel()
delete(m.leaderReports, partition)
}
}
}

func (m *metadataAPI) getStreams() []*stream {
streams := make([]*stream, 0, len(m.streams))
for _, stream := range m.streams {
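RemoveStream and AddStream above rely on Tombstone() and IsTombstoned() methods on the stream type. That type lives in server/stream.go, which is one of the 8 changed files but is not captured in this view; a minimal sketch of how such a flag might be tracked (hypothetical, the real stream struct carries much more state):

package server

import "sync"

// stream is a trimmed-down stand-in for the real type in server/stream.go.
type stream struct {
	mu         sync.RWMutex
	tombstoned bool
	// name, partitions, etc. omitted.
}

// Tombstone marks the stream for deletion once Raft recovery completes.
func (s *stream) Tombstone() {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.tombstoned = true
}

// IsTombstoned reports whether the stream has been marked for deletion.
func (s *stream) IsTombstoned() bool {
	s.mu.RLock()
	defer s.mu.RUnlock()
	return s.tombstoned
}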
12 changes: 6 additions & 6 deletions server/signal.go
@@ -1,22 +1,22 @@
- // +build !windows
- 
package server

import (
"os"
"os/signal"
"syscall"
)

// handleSignals sets up a handler for SIGINT to do a graceful shutdown.
func (s *Server) handleSignals() {
c := make(chan os.Signal, 1)
- 	signal.Notify(c, syscall.SIGINT)
+ 	signal.Notify(c, os.Interrupt)
go func() {
for sig := range c {
switch sig {
- 			case syscall.SIGINT:
- 				s.Stop()
+ 			case os.Interrupt:
+ 				if err := s.Stop(); err != nil {
+ 					s.logger.Errorf("Error occurred shutting down server while handling interrupt: %v", err)
+ 					os.Exit(1)
+ 				}
os.Exit(0)
}
}
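The switch from syscall.SIGINT to os.Interrupt matters because os.Interrupt is defined on every platform (it maps to SIGINT on Unix and to Ctrl+C handling on Windows), which is presumably what allows the Windows-specific handler below to be deleted. A standalone sketch of the portable pattern:

package main

import (
	"log"
	"os"
	"os/signal"
)

func main() {
	c := make(chan os.Signal, 1)
	// os.Interrupt works on both Unix and Windows, so no build-tagged
	// per-platform variant of this file is needed.
	signal.Notify(c, os.Interrupt)
	<-c
	log.Println("interrupt received, shutting down")
}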
17 changes: 0 additions & 17 deletions server/signal_windows.go

This file was deleted.
