Skip to content
This repository has been archived by the owner on Dec 3, 2019. It is now read-only.

Commit

Permalink
allow customer to set only part of client option
Browse files Browse the repository at this point in the history
  • Loading branch information
datoug committed Mar 14, 2017
1 parent 1cdcfc4 commit 19a34aa
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 14 deletions.
50 changes: 40 additions & 10 deletions client/cherami/client.go
Expand Up @@ -85,10 +85,29 @@ func NewHyperbahnClient(serviceName string, bootstrapFile string, options *Clien
return newClientWithTChannel(ch, options)
}

// NewClientWithFE is used by Frontend to create a Cherami client for itself.
// It is used by non-streaming publish/consume APIs.
// NewClientWithFEClient is used by Cherami Frontend to create a Cherami client for itself.
// Cherami customers shouldn't need to use this function.
func NewClientWithFEClient(feClient cherami.TChanBFrontend, options *ClientOptions) (Client, error) {
options, err := verifyOptions(options)
if err != nil {
return nil, err
}

return &clientImpl{
client: feClient,
options: options,
retryPolicy: createDefaultRetryPolicy(),
}, nil
}

// NewClientWithFEClient is used by Cherami Frontend to create a Cherami client for itself.
// Cherami customers shouldn't need to use this function.
// This function is deprecated, use NewClientWithFEClient instead
func NewClientWithFE(feClient cherami.TChanBFrontend, options *ClientOptions) Client {
options = verifyOptions(options)
options, err := verifyOptions(options)
if err != nil {
panic(fmt.Sprintf(`Client option is invalid (and NewClientWithFEn is deprecated, use NewClientWithFEClient instead). Error: %v`, err))
}

return &clientImpl{
client: feClient,
Expand All @@ -98,7 +117,10 @@ func NewClientWithFE(feClient cherami.TChanBFrontend, options *ClientOptions) Cl
}

func newClientWithTChannel(ch *tchannel.Channel, options *ClientOptions) (Client, error) {
options = verifyOptions(options)
options, err := verifyOptions(options)
if err != nil {
return nil, err
}

tClient := thrift.NewClient(ch, getFrontEndServiceName(options.DeploymentStr), nil)

Expand Down Expand Up @@ -255,7 +277,10 @@ func (c *clientImpl) CreatePublisher(request *CreatePublisherRequest) Publisher

func (c *clientImpl) CreateConsumer(request *CreateConsumerRequest) Consumer {
if request.Options != nil {
request.Options = verifyOptions(request.Options)
var err error
if request.Options, err = verifyOptions(request.Options); err != nil {
panic(fmt.Sprintf(`Client option is invalid (and CreateConsumerRequest.Options is deprecated). Error: %v`, err))
}
} else {
request.Options = c.options
}
Expand Down Expand Up @@ -344,11 +369,16 @@ func getDefaultOptions() *ClientOptions {
// verifyOptions is used to verify if we have a metrics reporter and
// a logger. If not, just setup a default logger and a null reporter
// it also validate the timeout is sane
func verifyOptions(opts *ClientOptions) *ClientOptions{
func verifyOptions(opts *ClientOptions) (*ClientOptions, error){
if opts == nil {
opts = getDefaultOptions()
}
common.ValidateTimeout(opts.Timeout)
if opts.Timeout.Nanoseconds() == 0 {
opts.Timeout = getDefaultOptions().Timeout
}
if err := common.ValidateTimeout(opts.Timeout); err != nil {
return nil, err
}

if opts.Logger == nil {
opts.Logger = getDefaultLogger()
Expand All @@ -357,14 +387,14 @@ func verifyOptions(opts *ClientOptions) *ClientOptions{
if opts.MetricsReporter == nil {
opts.MetricsReporter = metrics.NewNullReporter()
}
// Now make sure we init the default metrics as well
opts.MetricsReporter.InitMetrics(metrics.MetricDefs)

if int64(opts.ReconfigurationPollingInterval/time.Second) == 0 {
opts.ReconfigurationPollingInterval = defaultReconfigurationPollingInterval
}

// Now make sure we init the default metrics as well
opts.MetricsReporter.InitMetrics(metrics.MetricDefs)
return opts
return opts, nil
}

func createDefaultRetryPolicy() backoff.RetryPolicy {
Expand Down
9 changes: 5 additions & 4 deletions common/util.go
Expand Up @@ -24,6 +24,7 @@ import (
"encoding/binary"
"encoding/hex"
"encoding/json"
"errors"
"fmt"
"io/ioutil"
"math/rand"
Expand Down Expand Up @@ -276,13 +277,13 @@ func (avg *GeometricRollingAverage) GetGeometricRollingAverage() float64 {
return float64(*avg)
}

// ValidateTimeout panics if the passed timeout is unreasonable
func ValidateTimeout(t time.Duration) {
// ValidateTimeout returns an error if the passed timeout is unreasonable
func ValidateTimeout(t time.Duration) error {
if t >= time.Millisecond*100 && t <= time.Minute*5 {
return
return nil
}

panic(fmt.Sprintf(`Configured timeout is out of range: %v`, t))
return errors.New(fmt.Sprintf(`Configured timeout is out of range: %v`, t))
}

// MinInt returns the minimum of values (a, b)
Expand Down

0 comments on commit 19a34aa

Please sign in to comment.