Skip to content

Commit

Permalink
Close HTTP server not the listeners
Browse files Browse the repository at this point in the history
  • Loading branch information
xichen2020 committed Feb 12, 2019
1 parent 30e0837 commit b9ca2aa
Show file tree
Hide file tree
Showing 9 changed files with 164 additions and 97 deletions.
2 changes: 1 addition & 1 deletion integration/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ const (
// NB(wjang): YAML does NOT allow tabs for indentation between levels, remember to use spaces.
testConfig1 = `
http:
listenAddress: 0.0.0.0:5678
listenAddress: localhost:5678
readTimeout: 1m
writeTimeout: 1m
handler:
Expand Down
27 changes: 20 additions & 7 deletions integration/raw_query_no_filter_orderby_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,27 @@
package integration

import (
"sort"
"strings"
"testing"
"time"

"github.com/stretchr/testify/require"
)

func TestRawQueryNoFilterOrderBy(t *testing.T) {
if testing.Short() {
t.SkipNow()
}

// Create server.
ts := newTestServerSetup(t, testConfig1)
defer ts.close(t)

// Start the server.
log := ts.dbOpts.InstrumentOptions().Logger()
log.Info("testing raw query without filter with order by clauses")
require.NoError(t, ts.startServer())
log.Info("server is now up")

testData := `
{"service":"testNamespace","@timestamp":"2019-01-22T13:25:42-08:00","st":true,"sid":1,"tt":"active","tz":-6,"v":1.5}
{"service":"testNamespace","@timestamp":"2019-01-22T13:26:42-08:00","st":true,"sid":1,"tt":"active","tz":-6,"v":1.5}
Expand Down Expand Up @@ -104,18 +116,19 @@ func TestRawQueryNoFilterOrderBy(t *testing.T) {
},
}

ts := newTestServerSetup(t, testConfig1)
ts.startServer()
defer ts.close(t)
// Write data.
client := ts.newClient()
require.NoError(t, ts.waitUntil(10*time.Second, client.serverIsHealthy))
require.NoError(t, client.write([]byte(strings.TrimSpace(testData))))

// Test queries.
for _, test := range tests {
resp, err := client.queryRaw([]byte(test.queryJSON))
require.NoError(t, err)
actual := resp.Raw
sort.Strings(actual)
require.Equal(t, test.expectedSortedResults, actual)
}

// Stop the server.
require.NoError(t, ts.stopServer())
log.Info("server is now down")
}
27 changes: 20 additions & 7 deletions integration/raw_query_with_filter_orderby_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,27 @@
package integration

import (
"sort"
"strings"
"testing"
"time"

"github.com/stretchr/testify/require"
)

func TestRawQueryWithFilterOrderBy(t *testing.T) {
if testing.Short() {
t.SkipNow()
}

// Create server.
ts := newTestServerSetup(t, testConfig1)
defer ts.close(t)

// Start the server.
log := ts.dbOpts.InstrumentOptions().Logger()
log.Info("testing raw query without filter with order by clauses")
require.NoError(t, ts.startServer())
log.Info("server is now up")

testData := `
{"service":"testNamespace","@timestamp":"2019-01-22T13:25:42-08:00","st":true,"sid":{"foo":1,"bar":2},"tt":"active","tz":-6,"v":1.5}
{"service":"testNamespace","@timestamp":"2019-01-22T13:26:42-08:00","st":true,"sid":{"foo":1,"bar":2},"tt":"active","tz":-6,"v":1.5}
Expand Down Expand Up @@ -147,18 +159,19 @@ func TestRawQueryWithFilterOrderBy(t *testing.T) {
},
}

ts := newTestServerSetup(t, testConfig1)
ts.startServer()
defer ts.close(t)
// Write data.
client := ts.newClient()
require.NoError(t, ts.waitUntil(10*time.Second, client.serverIsHealthy))
require.NoError(t, client.write([]byte(strings.TrimSpace(testData))))

// Test queries.
for _, test := range tests {
resp, err := client.queryRaw([]byte(test.queryJSON))
require.NoError(t, err)
actual := resp.Raw
sort.Strings(actual)
require.Equal(t, test.expectedSortedResults, actual)
}

// Stop the server.
require.NoError(t, ts.stopServer())
log.Info("server is now down")
}
122 changes: 67 additions & 55 deletions integration/setup.go
Original file line number Diff line number Diff line change
@@ -1,33 +1,38 @@
package integration

import (
"fmt"
"errors"
"testing"
"time"

"github.com/m3db/m3x/instrument"
"github.com/xichen2020/eventdb/server/http"
"github.com/xichen2020/eventdb/server/http/handlers"
"github.com/xichen2020/eventdb/services/eventdb/serve"
"github.com/xichen2020/eventdb/sharding"
"github.com/xichen2020/eventdb/storage"

"github.com/m3db/m3x/instrument"
"github.com/m3db/m3x/log"
"github.com/stretchr/testify/require"
"github.com/uber-go/tally"
validator "gopkg.in/validator.v2"
yaml "gopkg.in/yaml.v2"
)

const (
gracefulShutdownTimeout = 15 * time.Second
serverStateChangeTimeout = 5 * time.Second
)

var (
logger = log.NullLogger
errServerStartTimedOut = errors.New("server took too long to start")
)

type testServerSetup struct {
db storage.Database
opts instrument.Options
cfg configuration
addr string
db storage.Database
namespaces []storage.NamespaceMetadata
shardSet sharding.ShardSet
dbOpts *storage.Options
handlerOpts *handlers.Options
serverOpts *http.Options

// Signals.
doneCh chan struct{}
Expand All @@ -37,82 +42,89 @@ type testServerSetup struct {
func newTestServerSetup(t *testing.T, config string) *testServerSetup {
cfg := loadConfig(t, config)

iOpts := instrument.NewOptions().
SetMetricsScope(tally.NoopScope).
SetLogger(logger)

namespaces, err := cfg.Database.NewNamespacesMetadata()
require.NoError(t, err)

shardSet, err := cfg.Database.NewShardSet()
require.NoError(t, err)

dbOpts, err := cfg.Database.NewOptions(iOpts)
dbOpts, err := cfg.Database.NewOptions(instrument.NewOptions())
require.NoError(t, err)

db := storage.NewDatabase(namespaces, shardSet, dbOpts)
require.NoError(t, db.Open())

return &testServerSetup{
db: db,
opts: iOpts,
cfg: cfg,
doneCh: make(chan struct{}),
closedCh: make(chan struct{}),
addr: cfg.HTTP.ListenAddress,
namespaces: namespaces,
shardSet: shardSet,
dbOpts: dbOpts,
handlerOpts: cfg.HTTP.Handler.NewOptions(dbOpts.InstrumentOptions()),
serverOpts: cfg.HTTP.NewServerOptions(dbOpts.InstrumentOptions()),
doneCh: make(chan struct{}),
closedCh: make(chan struct{}),
}
}

func (ts *testServerSetup) startServer() {
func (ts *testServerSetup) newClient() client {
return newClient(ts.addr)
}

func (ts *testServerSetup) startServer() error {
errCh := make(chan error, 1)

ts.db = storage.NewDatabase(ts.namespaces, ts.shardSet, ts.dbOpts)
if err := ts.db.Open(); err != nil {
return err
}

go func() {
// TODO (wjang): pass in 0.0.0.0:0 instead, have an automatically generated port and use it.
if err := serve.Serve(
ts.cfg.HTTP.ListenAddress,
ts.cfg.HTTP.Handler.NewOptions(ts.opts),
ts.cfg.HTTP.NewServerOptions(),
ts.addr,
ts.handlerOpts,
ts.serverOpts,
ts.db,
logger,
ts.dbOpts.InstrumentOptions().Logger(),
ts.doneCh,
); err != nil {
logger.Fatalf("could not start serving traffic: %v", err)
select {
case errCh <- err:
default:
}
}
close(ts.closedCh)
}()
}

func (ts *testServerSetup) stopServer(t *testing.T) {
close(ts.doneCh)
go func() {
select {
case errCh <- ts.waitUntilServerIsUp():
default:
}
}()

select {
case <-ts.closedCh:
t.Log("server closed clean")
case <-time.After(gracefulShutdownTimeout):
t.Logf("server closed due to %v timeout", gracefulShutdownTimeout)
}
return <-errCh
}

func (ts *testServerSetup) stopDB(t *testing.T) {
// TODO(wjang): Delete the database files as well.
require.NoError(t, ts.db.Close())
func (ts *testServerSetup) waitUntilServerIsUp() error {
c := ts.newClient()
serverIsUp := func() bool { return c.serverIsHealthy() }
if waitUntil(serverIsUp, serverStateChangeTimeout) {
return nil
}
return errServerStartTimedOut
}

func (ts *testServerSetup) close(t *testing.T) {
ts.stopServer(t)
ts.stopDB(t)
}
func (ts *testServerSetup) stopServer() error {
if err := ts.db.Close(); err != nil {
return err
}
close(ts.doneCh)

func (ts *testServerSetup) newClient() client {
return newClient(ts.cfg.HTTP.ListenAddress)
// Wait for graceful server shutdown.
<-ts.closedCh
return nil
}

func (ts *testServerSetup) waitUntil(timeout time.Duration, condition func() bool) error {
start := time.Now()
for !condition() {
time.Sleep(time.Millisecond * 10)
if dur := time.Now().Sub(start); dur >= timeout {
return fmt.Errorf("timeout waiting for condition")
}
}
return nil
func (ts *testServerSetup) close(t *testing.T) {
// TODO(wjang): Delete the database files as well.
}

func loadConfig(t *testing.T, config string) configuration {
Expand Down
16 changes: 16 additions & 0 deletions integration/util.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package integration

import "time"

type conditionFn func() bool

func waitUntil(fn conditionFn, timeout time.Duration) bool {
deadline := time.Now().Add(timeout)
for time.Now().Before(deadline) {
if fn() {
return true
}
time.Sleep(time.Second)
}
return false
}
24 changes: 20 additions & 4 deletions server/http/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package http

import (
"time"

"github.com/m3db/m3x/instrument"
)

const (
Expand All @@ -11,19 +13,33 @@ const (

// Options provide a set of HTTP server options.
type Options struct {
readTimeout time.Duration
writeTimeout time.Duration
instrumentOpts instrument.Options
readTimeout time.Duration
writeTimeout time.Duration
}

// NewOptions creates a new set of server options.
func NewOptions() *Options {
o := &Options{
readTimeout: defaultReadTimeout,
writeTimeout: defaultWriteTimeout,
instrumentOpts: instrument.NewOptions(),
readTimeout: defaultReadTimeout,
writeTimeout: defaultWriteTimeout,
}
return o
}

// SetInstrumentOptions sets the instrument options.
func (o *Options) SetInstrumentOptions(value instrument.Options) *Options {
opts := *o
opts.instrumentOpts = value
return &opts
}

// InstrumentOptions returns the instrument options.
func (o *Options) InstrumentOptions() instrument.Options {
return o.instrumentOpts
}

// SetReadTimeout sets the timeout for a read request.
func (o *Options) SetReadTimeout(value time.Duration) *Options {
opts := *o
Expand Down
Loading

0 comments on commit b9ca2aa

Please sign in to comment.