Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix deadlock in txsub.System.Tick() and tx_bad_seq errors #815

Merged
merged 4 commits into from
Jan 17, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion services/horizon/internal/actions_root.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ func (action *RootAction) JSON() {
ledger.CurrentState(),
action.App.horizonVersion,
action.App.coreVersion,
action.App.networkPassphrase,
action.App.config.NetworkPassphrase,
action.App.protocolVersion,
action.App.config.FriendbotURL,
)
Expand Down
3 changes: 2 additions & 1 deletion services/horizon/internal/actions_root_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ import (
"encoding/json"
"testing"

"github.com/stellar/go/services/horizon/internal/test"
"github.com/stellar/go/protocols/horizon"
"github.com/stellar/go/services/horizon/internal/test"
)

func TestRootAction(t *testing.T) {
Expand All @@ -23,6 +23,7 @@ func TestRootAction(t *testing.T) {

ht.App.horizonVersion = "test-horizon"
ht.App.config.StellarCoreURL = server.URL
ht.App.config.NetworkPassphrase = "test"
ht.App.UpdateStellarCoreInfo()

w := ht.Get("/")
Expand Down
46 changes: 27 additions & 19 deletions services/horizon/internal/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,13 @@ import (
"database/sql"
"fmt"
"net/http"
"os"
"runtime"
"sync"
"time"

"github.com/gomodule/redigo/redis"
"github.com/rcrowley/go-metrics"
"github.com/stellar/go/build"
"github.com/stellar/go/clients/stellarcore"
horizonContext "github.com/stellar/go/services/horizon/internal/context"
"github.com/stellar/go/services/horizon/internal/db2/core"
Expand All @@ -32,22 +32,21 @@ import (

// App represents the root of the state of a horizon instance.
type App struct {
config Config
web *Web
historyQ *history.Q
coreQ *core.Q
ctx context.Context
cancel func()
redis *redis.Pool
coreVersion string
horizonVersion string
networkPassphrase string
protocolVersion int32
submitter *txsub.System
paths paths.Finder
ingester *ingest.System
reaper *reap.System
ticks *time.Ticker
config Config
web *Web
historyQ *history.Q
coreQ *core.Q
ctx context.Context
cancel func()
redis *redis.Pool
coreVersion string
horizonVersion string
protocolVersion int32
submitter *txsub.System
paths paths.Finder
ingester *ingest.System
reaper *reap.System
ticks *time.Ticker

// metrics
metrics metrics.Registry
Expand All @@ -64,7 +63,6 @@ func NewApp(config Config) (*App, error) {

result := &App{config: config}
result.horizonVersion = app.Version()
result.networkPassphrase = build.TestNetwork.Passphrase
result.ticks = time.NewTicker(1 * time.Second)
result.init()
return result, nil
Expand Down Expand Up @@ -270,8 +268,18 @@ func (a *App) UpdateStellarCoreInfo() {
return
}

// Check if NetworkPassphrase is different, if so exit Horizon as it can break the
// state of the application.
if resp.Info.Network != a.config.NetworkPassphrase {
log.Errorf(
"Network passphrase of stellar-core (%s) does not match Horizon configuration (%s). Exiting...",
resp.Info.Network,
a.config.NetworkPassphrase,
)
os.Exit(1)
}

a.coreVersion = resp.Info.Build
a.networkPassphrase = resp.Info.Network
a.protocolVersion = int32(resp.Info.ProtocolVersion)
}

Expand Down
1 change: 1 addition & 0 deletions services/horizon/internal/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ type Config struct {
RateLimit *throttled.RateQuota
RateLimitRedisKey string
RedisURL string
NetworkPassphrase string
FriendbotURL *url.URL
LogLevel logrus.Level
LogFile string
Expand Down
2 changes: 2 additions & 0 deletions services/horizon/internal/helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"log"
"time"

"github.com/stellar/go/network"
"github.com/stellar/go/services/horizon/internal/test"
supportLog "github.com/stellar/go/support/log"
"github.com/throttled/throttled"
Expand All @@ -32,6 +33,7 @@ func NewTestConfig() Config {
},
ConnectionTimeout: 55 * time.Second, // Default
LogLevel: supportLog.InfoLevel,
NetworkPassphrase: network.TestNetworkPassphrase,
}
}

Expand Down
4 changes: 2 additions & 2 deletions services/horizon/internal/init_ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,12 @@ func initIngester(app *App) {
return
}

if app.networkPassphrase == "" {
if app.config.NetworkPassphrase == "" {
log.Fatal("Cannot start ingestion without network passphrase. Please confirm connectivity with stellar-core.")
}

app.ingester = ingest.New(
app.networkPassphrase,
app.config.NetworkPassphrase,
app.config.StellarCoreURL,
app.CoreSession(nil),
app.HorizonSession(nil),
Expand Down
2 changes: 1 addition & 1 deletion services/horizon/internal/init_txsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ func initSubmissionSystem(app *App) {
History: &history.Q{Session: app.HorizonSession(nil)},
},
Sequences: cq.SequenceProvider(),
NetworkPassphrase: app.networkPassphrase,
NetworkPassphrase: app.config.NetworkPassphrase,
}
}

Expand Down
2 changes: 1 addition & 1 deletion services/horizon/internal/txsub/open_submission_list.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ type openSubmission struct {

type submissionList struct {
sync.Mutex
submissions map[string]*openSubmission
submissions map[string]*openSubmission // hash => `*openSubmission`
log *log.Entry
}

Expand Down
52 changes: 39 additions & 13 deletions services/horizon/internal/txsub/system.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,10 @@ import (
// Its methods tie together the various pieces used to reliably submit transactions
// to a stellar-core instance.
type System struct {
initializer sync.Once
tickInProgress bool
initializer sync.Once

tickMutex sync.Mutex
tickInProgress bool

Pending OpenSubmissionList
Results ResultProvider
Expand Down Expand Up @@ -73,12 +74,20 @@ func (sys *System) Submit(ctx context.Context, env string) (result <-chan Result
// check the configured result provider for an existing result
r := sys.Results.ResultByHash(ctx, info.Hash)

if r.Err != ErrNoResults {
if r.Err == nil {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nothing wrong with the logic here but what is the reason that we have an Err field in r as oppose to have ResultByHash return r and err directly?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's also used as an object returned by channel.

sys.Log.Ctx(ctx).WithField("hash", info.Hash).Info("Found submission result in a DB")
sys.finish(ctx, response, r)
return
}

if r.Err != ErrNoResults {
sys.Log.Ctx(ctx).WithField("hash", info.Hash).Info("Error getting submission result from a DB")
sys.finish(ctx, response, r)
return
}

// From now: r.Err == ErrNoResults

curSeq, err := sys.Sequences.Get([]string{info.SourceAddress})
if err != nil {
sys.finish(ctx, response, Result{Err: err, EnvelopeXDR: env})
Expand Down Expand Up @@ -170,25 +179,40 @@ func (sys *System) submitOnce(ctx context.Context, env string) SubmissionResult
return sr
}

// setTickInProgress sets `tickInProgress` to `true` if it's not
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like we set tickInProgress to true only if it is false?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch! #824

// `false`. Returns `true` if `tickInProgress` has been switched
// to `true` inside this method and `Tick()` should continue.
func (sys *System) setTickInProgress(ctx context.Context) bool {
sys.tickMutex.Lock()
defer sys.tickMutex.Unlock()

if sys.tickInProgress {
logger := log.Ctx(ctx)
logger.Info("ticking in progress")
return false
}

sys.tickInProgress = true
return true
}

func (sys *System) unsetTickInProgress() {
sys.tickMutex.Lock()
defer sys.tickMutex.Unlock()
sys.tickInProgress = false
}

// Tick triggers the system to update itself with any new data available.
func (sys *System) Tick(ctx context.Context) {
sys.Init()
logger := log.Ctx(ctx)

// Make sure Tick is not run concurrently
sys.tickMutex.Lock()
if sys.tickInProgress {
logger.Debug("ticking in progress")
if !sys.setTickInProgress(ctx) {
return
}
sys.tickInProgress = true
sys.tickMutex.Unlock()

defer func() {
sys.tickMutex.Lock()
sys.tickInProgress = false
sys.tickMutex.Unlock()
}()
defer sys.unsetTickInProgress()

logger.
WithField("queued", sys.SubmissionQueue.String()).
Expand All @@ -199,6 +223,7 @@ func (sys *System) Tick(ctx context.Context) {
curSeq, err := sys.Sequences.Get(addys)
if err != nil {
logger.WithStack(err).Error(err)
return
} else {
sys.SubmissionQueue.Update(curSeq)
}
Expand Down Expand Up @@ -229,6 +254,7 @@ func (sys *System) Tick(ctx context.Context) {
stillOpen, err := sys.Pending.Clean(ctx, sys.SubmissionTimeout)
if err != nil {
logger.WithStack(err).Error(err)
return
}

sys.Metrics.OpenSubmissionsGauge.Update(int64(stillOpen))
Expand Down
49 changes: 46 additions & 3 deletions services/horizon/internal/txsub/system_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package txsub
import (
"context"
"errors"
"fmt"
"testing"
"time"

Expand Down Expand Up @@ -52,9 +53,9 @@ func (suite *SystemTestSuite) SetupTest() {
Err: ErrBadSequence,
}

suite.sequences.Results = map[string]uint64{
"GBRPYHIL2CI3FNQ4BXLFMNDLFJUNPU2HY3ZMFSHONUCEOASW7QC7OX2H": 0,
}
suite.sequences.On("Get", []string{"GBRPYHIL2CI3FNQ4BXLFMNDLFJUNPU2HY3ZMFSHONUCEOASW7QC7OX2H"}).
Return(map[string]uint64{"GBRPYHIL2CI3FNQ4BXLFMNDLFJUNPU2HY3ZMFSHONUCEOASW7QC7OX2H": 0}, nil).
Once()
}

// Returns the result provided by the ResultProvider.
Expand Down Expand Up @@ -117,6 +118,48 @@ func (suite *SystemTestSuite) TestTick_Noop() {
suite.system.Tick(suite.ctx)
}

// TestTick_Deadlock is a regression test for Tick() deadlock: if for any reason
// call to Tick() takes more time and another Tick() is called.
// This test starts two go routines: both calling Tick() but the call to
// `sys.Sequences.Get(addys)` is delayed by 1 second. It allows to simulate two
// calls to `Tick()` executed at the same time.
func (suite *SystemTestSuite) TestTick_Deadlock() {
secondDone := make(chan bool, 1)
testDone := make(chan bool)

go func() {
select {
case <-secondDone:
// OK!
case <-time.After(5 * time.Second):
assert.Fail(suite.T(), "Timeout, likely a deadlock in Tick()")
}

testDone <- true
}()

// Start first Tick
suite.system.SubmissionQueue.Push("address", 0)
// Configure suite.sequences to return after 1 second in a first call
suite.sequences.On("Get", []string{"address"}).After(time.Second).Return(map[string]uint64{}, nil)

go func() {
fmt.Println("Starting first Tick()")
suite.system.Tick(suite.ctx)
fmt.Println("Finished first Tick()")
}()

go func() {
// Start second Tick - should be deadlocked if mutex is not Unlock()'ed.
fmt.Println("Starting second Tick()")
suite.system.Tick(suite.ctx)
fmt.Println("Finished second Tick()")
secondDone <- true
}()

<-testDone
}

// Test that Tick finishes any available transactions,
func (suite *SystemTestSuite) TestTick_FinishesTransactions() {
l := make(chan Result, 1)
Expand Down
10 changes: 6 additions & 4 deletions services/horizon/internal/txsub/test_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ package txsub

import (
"context"

"github.com/stretchr/testify/mock"
)

// MockSubmitter is a test helper that simplements the Submitter interface
Expand Down Expand Up @@ -43,11 +45,11 @@ func (results *MockResultProvider) ResultByHash(ctx context.Context, hash string
// MockSequenceProvider is a test helper that simplements the SequenceProvider
// interface
type MockSequenceProvider struct {
Results map[string]uint64
Err error
mock.Mock
}

// Get implements `txsub.SequenceProvider`
func (results *MockSequenceProvider) Get(addresses []string) (map[string]uint64, error) {
return results.Results, results.Err
func (o *MockSequenceProvider) Get(addresses []string) (map[string]uint64, error) {
args := o.Called(addresses)
return args.Get(0).(map[string]uint64), args.Error(1)
}
8 changes: 7 additions & 1 deletion services/horizon/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/sirupsen/logrus"
"github.com/spf13/cobra"
"github.com/spf13/viper"
"github.com/stellar/go/network"
"github.com/stellar/go/services/horizon/internal"
"github.com/stellar/go/support/log"
"github.com/throttled/throttled"
Expand Down Expand Up @@ -180,7 +181,7 @@ func init() {

rootCmd.PersistentFlags().String(
"network-passphrase",
"",
network.TestNetworkPassphrase,
"Override the network passphrase",
)

Expand Down Expand Up @@ -239,6 +240,10 @@ func initConfig() {
stdLog.Fatal("Invalid config: stellar-core-url is blank. Please specify --stellar-core-url on the command line or set the STELLAR_CORE_URL environment variable.")
}

if viper.GetString("network-passphrase") == "" {
stdLog.Fatal("Invalid config: network-passphrase is blank. Please specify --network-passphrase on the command line or set the NETWORK_PASSPHRASE environment variable.")
}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hrm, is it a breaking change to make this required? It seems like it might be, and it'd be better to leave network-passphrase optional + default to testnet as we do currently.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point, set default value to testnet passphrase but in the next minor we need to make this empty by default. Ex. if you run ingestion node without web server and when stellar-core is not running it should be explicitly set by Horizon admin (#817).

ll, err := logrus.ParseLevel(viper.GetString("log-level"))

if err != nil {
Expand Down Expand Up @@ -299,6 +304,7 @@ func initConfig() {
LogLevel: ll,
LogFile: lf,
MaxPathLength: uint(viper.GetInt("max-path-length")),
NetworkPassphrase: viper.GetString("network-passphrase"),
SentryDSN: viper.GetString("sentry-dsn"),
LogglyToken: viper.GetString("loggly-token"),
LogglyTag: viper.GetString("loggly-tag"),
Expand Down