Skip to content

Commit

Permalink
libs: Call Flush() before rename #2428 (#2439)
Browse files Browse the repository at this point in the history
* fix Group.RotateFile need call Flush() before rename. #2428
* fix some review issue. #2428
 refactor Group's config: replace  setting member with initial option
* fix a handwriting mistake
* fix a time window error between rename and write.
* fix a syntax mistake.
* change option name Get_ to With_
* fix review issue
* fix review issue
  • Loading branch information
goolAdapter authored and xla committed Sep 25, 2018
1 parent 587116d commit 110b07f
Show file tree
Hide file tree
Showing 7 changed files with 146 additions and 61 deletions.
1 change: 1 addition & 0 deletions CHANGELOG_PENDING.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,5 @@ IMPROVEMENTS:
- [p2p] [\#2169](https://github.com/cosmos/cosmos-sdk/issues/2169) add additional metrics

BUG FIXES:
- [autofile] \#2428 Group.RotateFile needs to call Flush() before rename (@goolAdapter)
- [node] \#2434 Make node respond to signal interrupts while sleeping for genesis time
4 changes: 2 additions & 2 deletions consensus/wal.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,13 +73,13 @@ type baseWAL struct {
enc *WALEncoder
}

func NewWAL(walFile string) (*baseWAL, error) {
func NewWAL(walFile string, groupOptions ...func(*auto.Group)) (*baseWAL, error) {
err := cmn.EnsureDir(filepath.Dir(walFile), 0700)
if err != nil {
return nil, errors.Wrap(err, "failed to ensure WAL directory is in place")
}

group, err := auto.OpenGroup(walFile)
group, err := auto.OpenGroup(walFile, groupOptions...)
if err != nil {
return nil, err
}
Expand Down
38 changes: 24 additions & 14 deletions consensus/wal_generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"bufio"
"bytes"
"fmt"
"io"
"os"
"path/filepath"
"strings"
Expand All @@ -23,12 +24,11 @@ import (
"github.com/tendermint/tendermint/types"
)

// WALWithNBlocks generates a consensus WAL. It does this by spining up a
// WALGenerateNBlocks generates a consensus WAL. It does this by spinning up a
// stripped down version of node (proxy app, event bus, consensus state) with a
// persistent kvstore application and special consensus wal instance
// (byteBufferWAL) and waits until numBlocks are created. Then it returns a WAL
// content. If the node fails to produce given numBlocks, it returns an error.
func WALWithNBlocks(numBlocks int) (data []byte, err error) {
// (byteBufferWAL) and waits until numBlocks are created. If the node fails to produce given numBlocks, it returns an error.
func WALGenerateNBlocks(wr io.Writer, numBlocks int) (err error) {
config := getConfig()

app := kvstore.NewPersistentKVStoreApplication(filepath.Join(config.DBDir(), "wal_generator"))
Expand All @@ -43,26 +43,26 @@ func WALWithNBlocks(numBlocks int) (data []byte, err error) {
privValidator := privval.LoadOrGenFilePV(privValidatorFile)
genDoc, err := types.GenesisDocFromFile(config.GenesisFile())
if err != nil {
return nil, errors.Wrap(err, "failed to read genesis file")
return errors.Wrap(err, "failed to read genesis file")
}
stateDB := db.NewMemDB()
blockStoreDB := db.NewMemDB()
state, err := sm.MakeGenesisState(genDoc)
if err != nil {
return nil, errors.Wrap(err, "failed to make genesis state")
return errors.Wrap(err, "failed to make genesis state")
}
blockStore := bc.NewBlockStore(blockStoreDB)
proxyApp := proxy.NewAppConns(proxy.NewLocalClientCreator(app))
proxyApp.SetLogger(logger.With("module", "proxy"))
if err := proxyApp.Start(); err != nil {
return nil, errors.Wrap(err, "failed to start proxy app connections")
return errors.Wrap(err, "failed to start proxy app connections")
}
defer proxyApp.Stop()

eventBus := types.NewEventBus()
eventBus.SetLogger(logger.With("module", "events"))
if err := eventBus.Start(); err != nil {
return nil, errors.Wrap(err, "failed to start event bus")
return errors.Wrap(err, "failed to start event bus")
}
defer eventBus.Stop()
mempool := sm.MockMempool{}
Expand All @@ -78,29 +78,39 @@ func WALWithNBlocks(numBlocks int) (data []byte, err error) {
/////////////////////////////////////////////////////////////////////////////

// set consensus wal to buffered WAL, which will write all incoming msgs to buffer
var b bytes.Buffer
wr := bufio.NewWriter(&b)
numBlocksWritten := make(chan struct{})
wal := newByteBufferWAL(logger, NewWALEncoder(wr), int64(numBlocks), numBlocksWritten)
// see wal.go#103
wal.Write(EndHeightMessage{0})
consensusState.wal = wal

if err := consensusState.Start(); err != nil {
return nil, errors.Wrap(err, "failed to start consensus state")
return errors.Wrap(err, "failed to start consensus state")
}

select {
case <-numBlocksWritten:
consensusState.Stop()
wr.Flush()
return b.Bytes(), nil
return nil
case <-time.After(1 * time.Minute):
consensusState.Stop()
return []byte{}, fmt.Errorf("waited too long for tendermint to produce %d blocks (grep logs for `wal_generator`)", numBlocks)
return fmt.Errorf("waited too long for tendermint to produce %d blocks (grep logs for `wal_generator`)", numBlocks)
}
}

// WALWithNBlocks returns the content of a WAL with numBlocks blocks.
//
// The WAL is produced by WALGenerateNBlocks into an in-memory buffer. If the
// generator fails, or the buffered writer cannot be flushed, an empty slice
// and the error are returned.
func WALWithNBlocks(numBlocks int) (data []byte, err error) {
	var b bytes.Buffer
	wr := bufio.NewWriter(&b)

	if err := WALGenerateNBlocks(wr, numBlocks); err != nil {
		return []byte{}, err
	}

	// A bufio.Writer reports the first write error it encountered from Flush,
	// so checking it here keeps a short write from passing as a complete WAL.
	if err := wr.Flush(); err != nil {
		return []byte{}, err
	}
	return b.Bytes(), nil
}

// f**ing long, but unique for each test
func makePathname() string {
// get path
Expand Down
50 changes: 50 additions & 0 deletions consensus/wal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,68 @@ import (
"bytes"
"crypto/rand"
"fmt"
"io/ioutil"
"os"
"path/filepath"

// "sync"
"testing"
"time"

"github.com/tendermint/tendermint/consensus/types"
"github.com/tendermint/tendermint/libs/autofile"
tmtypes "github.com/tendermint/tendermint/types"
tmtime "github.com/tendermint/tendermint/types/time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestWALTruncate(t *testing.T) {
walDir, err := ioutil.TempDir("", "wal")
if err != nil {
panic(fmt.Errorf("failed to create temp WAL file: %v", err))
}
defer os.RemoveAll(walDir)

walFile := filepath.Join(walDir, "wal")

//this magic number 4K can truncate the content when RotateFile. defaultHeadSizeLimit(10M) is hard to simulate.
//this magic number 1 * time.Millisecond make RotateFile check frequently. defaultGroupCheckDuration(5s) is hard to simulate.
wal, err := NewWAL(walFile, autofile.GroupHeadSizeLimit(4096), autofile.GroupCheckDuration(1*time.Millisecond))
if err != nil {
t.Fatal(err)
}

wal.Start()
defer wal.Stop()

//60 block's size nearly 70K, greater than group's headBuf size(4096 * 10), when headBuf is full, truncate content will Flush to the file.
//at this time, RotateFile is called, truncate content exist in each file.
err = WALGenerateNBlocks(wal.Group(), 60)
if err != nil {
t.Fatal(err)
}

time.Sleep(1 * time.Millisecond) //wait groupCheckDuration, make sure RotateFile run

wal.Group().Flush()

h := int64(50)
gr, found, err := wal.SearchForEndHeight(h, &WALSearchOptions{})
assert.NoError(t, err, fmt.Sprintf("expected not to err on height %d", h))
assert.True(t, found, fmt.Sprintf("expected to find end height for %d", h))
assert.NotNil(t, gr, "expected group not to be nil")
defer gr.Close()

dec := NewWALDecoder(gr)
msg, err := dec.Decode()
assert.NoError(t, err, "expected to decode a message")
rs, ok := msg.Msg.(tmtypes.EventDataRoundState)
assert.True(t, ok, "expected message of type EventDataRoundState")
assert.Equal(t, rs.Height, h+1, fmt.Sprintf("wrong height"))
}

func TestWALEncoderDecoder(t *testing.T) {
now := tmtime.Now()
msgs := []TimedWALMessage{
Expand Down
5 changes: 2 additions & 3 deletions libs/autofile/cmd/logjack.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,13 +39,12 @@ func main() {
}

// Open Group
group, err := auto.OpenGroup(headPath)
group, err := auto.OpenGroup(headPath, auto.GroupHeadSizeLimit(chopSize), auto.GroupTotalSizeLimit(limitSize))
if err != nil {
fmt.Printf("logjack couldn't create output file %v\n", headPath)
os.Exit(1)
}
group.SetHeadSizeLimit(chopSize)
group.SetTotalSizeLimit(limitSize)

err = group.Start()
if err != nil {
fmt.Printf("logjack couldn't start with file %v\n", headPath)
Expand Down
105 changes: 66 additions & 39 deletions libs/autofile/group.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,10 @@ import (
)

const (
groupCheckDuration = 5000 * time.Millisecond
defaultHeadSizeLimit = 10 * 1024 * 1024 // 10MB
defaultTotalSizeLimit = 1 * 1024 * 1024 * 1024 // 1GB
maxFilesToRemove = 4 // needs to be greater than 1
defaultGroupCheckDuration = 5000 * time.Millisecond
defaultHeadSizeLimit = 10 * 1024 * 1024 // 10MB
defaultTotalSizeLimit = 1 * 1024 * 1024 * 1024 // 1GB
maxFilesToRemove = 4 // needs to be greater than 1
)

/*
Expand Down Expand Up @@ -56,40 +56,47 @@ assuming that marker lines are written occasionally.
type Group struct {
cmn.BaseService

ID string
Head *AutoFile // The head AutoFile to write to
headBuf *bufio.Writer
Dir string // Directory that contains .Head
ticker *time.Ticker
mtx sync.Mutex
headSizeLimit int64
totalSizeLimit int64
minIndex int // Includes head
maxIndex int // Includes head, where Head will move to
ID string
Head *AutoFile // The head AutoFile to write to
headBuf *bufio.Writer
Dir string // Directory that contains .Head
ticker *time.Ticker
mtx sync.Mutex
headSizeLimit int64
totalSizeLimit int64
groupCheckDuration time.Duration
minIndex int // Includes head
maxIndex int // Includes head, where Head will move to

// TODO: When we start deleting files, we need to start tracking GroupReaders
// and their dependencies.
}

// OpenGroup creates a new Group with head at headPath. It returns an error if
// it fails to open head file.
func OpenGroup(headPath string) (g *Group, err error) {
func OpenGroup(headPath string, groupOptions ...func(*Group)) (g *Group, err error) {
dir := path.Dir(headPath)
head, err := OpenAutoFile(headPath)
if err != nil {
return nil, err
}

g = &Group{
ID: "group:" + head.ID,
Head: head,
headBuf: bufio.NewWriterSize(head, 4096*10),
Dir: dir,
headSizeLimit: defaultHeadSizeLimit,
totalSizeLimit: defaultTotalSizeLimit,
minIndex: 0,
maxIndex: 0,
ID: "group:" + head.ID,
Head: head,
headBuf: bufio.NewWriterSize(head, 4096*10),
Dir: dir,
headSizeLimit: defaultHeadSizeLimit,
totalSizeLimit: defaultTotalSizeLimit,
groupCheckDuration: defaultGroupCheckDuration,
minIndex: 0,
maxIndex: 0,
}

for _, option := range groupOptions {
option(g)
}

g.BaseService = *cmn.NewBaseService(nil, "Group", g)

gInfo := g.readGroupInfo()
Expand All @@ -98,10 +105,31 @@ func OpenGroup(headPath string) (g *Group, err error) {
return
}

// GroupCheckDuration returns a Group option that replaces the default
// groupCheckDuration with the given duration.
func GroupCheckDuration(duration time.Duration) func(*Group) {
	apply := func(grp *Group) { grp.groupCheckDuration = duration }
	return apply
}

// GroupHeadSizeLimit returns a Group option that replaces the default head
// size limit (10MB) with the given limit.
func GroupHeadSizeLimit(limit int64) func(*Group) {
	apply := func(grp *Group) { grp.headSizeLimit = limit }
	return apply
}

// GroupTotalSizeLimit returns a Group option that replaces the default total
// size limit of the group (1GB) with the given limit.
func GroupTotalSizeLimit(limit int64) func(*Group) {
	apply := func(grp *Group) { grp.totalSizeLimit = limit }
	return apply
}

// OnStart implements Service by starting the goroutine that checks file and
// group limits.
func (g *Group) OnStart() error {
g.ticker = time.NewTicker(groupCheckDuration)
g.ticker = time.NewTicker(g.groupCheckDuration)
go g.processTicks()
return nil
}
Expand All @@ -122,28 +150,13 @@ func (g *Group) Close() {
g.mtx.Unlock()
}

// SetHeadSizeLimit allows you to overwrite default head size limit - 10MB.
func (g *Group) SetHeadSizeLimit(limit int64) {
g.mtx.Lock()
g.headSizeLimit = limit
g.mtx.Unlock()
}

// HeadSizeLimit reports the group's current head size limit. The value is
// read while holding the group's mutex.
func (g *Group) HeadSizeLimit() int64 {
	g.mtx.Lock()
	limit := g.headSizeLimit
	g.mtx.Unlock()
	return limit
}

// SetTotalSizeLimit allows you to overwrite default total size limit of the
// group - 1GB.
func (g *Group) SetTotalSizeLimit(limit int64) {
g.mtx.Lock()
g.totalSizeLimit = limit
g.mtx.Unlock()
}

// TotalSizeLimit returns total size limit of the group.
func (g *Group) TotalSizeLimit() int64 {
g.mtx.Lock()
Expand Down Expand Up @@ -266,6 +279,14 @@ func (g *Group) RotateFile() {

headPath := g.Head.Path

if err := g.headBuf.Flush(); err != nil {
panic(err) //panic is used for consistent with below
}

if err := g.Head.Sync(); err != nil {
panic(err)
}

if err := g.Head.closeFile(); err != nil {
panic(err)
}
Expand All @@ -275,6 +296,12 @@ func (g *Group) RotateFile() {
panic(err)
}

// Make sure the head file exists: there is a time window between the rename and the next write
// in which NewReader(maxIndex) would otherwise fail with "open /tmp/wal058868562/wal: no such file or directory".
if err := g.Head.openFile(); err != nil {
panic(err)
}

g.maxIndex++
}

Expand Down
4 changes: 1 addition & 3 deletions libs/autofile/group_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,10 @@ func createTestGroupWithHeadSizeLimit(t *testing.T, headSizeLimit int64) *Group
require.NoError(t, err, "Error creating dir")

headPath := testDir + "/myfile"
g, err := OpenGroup(headPath)
g, err := OpenGroup(headPath, GroupHeadSizeLimit(headSizeLimit))
require.NoError(t, err, "Error opening Group")
require.NotEqual(t, nil, g, "Failed to create Group")

g.SetHeadSizeLimit(headSizeLimit)

return g
}

Expand Down

0 comments on commit 110b07f

Please sign in to comment.