Skip to content

Commit

Permalink
Make worker required service for cadence startup (#1228)
Browse files Browse the repository at this point in the history
* Make worker a required service

* Expose worker port from docker

* Edit docker startup to include worker service

* revert to using old tag
  • Loading branch information
andrewjdawson2016 committed Nov 8, 2018
1 parent 2cf59fb commit b074c80
Show file tree
Hide file tree
Showing 7 changed files with 45 additions and 26 deletions.
11 changes: 1 addition & 10 deletions cmd/server/cadence.go
Expand Up @@ -34,9 +34,6 @@ import (
// validServices is the list of all valid cadence services
var validServices = []string{historyService, matchingService, frontendService, workerService}

// inDevelopmentServices is the list of services we want to support skipping logic on startup if config does not exist
var inDevelopmentServices = map[string]bool{workerService: true}

// main entry point for the cadence server
func main() {
app := buildCLI()
Expand Down Expand Up @@ -71,15 +68,9 @@ func startHandler(c *cli.Context) {
}

services := getServices(c)
LoadServiceLoop:
for _, svc := range services {
if _, ok := cfg.Services[svc]; !ok {
if _, ok := inDevelopmentServices[svc]; len(services) > 1 && ok {
log.Printf("Config missing for development service `%v`. Skipping to load service.\n", svc)
continue LoadServiceLoop
} else {
log.Fatalf("`%v` service missing config", svc)
}
log.Fatalf("`%v` service missing config", svc)
}
server := newServer(svc, &cfg)
server.Start()
Expand Down
11 changes: 11 additions & 0 deletions config/development.yaml
Expand Up @@ -54,6 +54,17 @@ services:
pprof:
port: 7937

worker:
rpc:
port: 7939
bindOnLocalHost: true
metrics:
statsd:
hostPort: "127.0.0.1:8125"
prefix: "cadence"
pprof:
port: 7940

clustersInfo:
enableGlobalDomain: false
failoverVersionIncrement: 10
Expand Down
2 changes: 1 addition & 1 deletion docker/Dockerfile
Expand Up @@ -67,7 +67,7 @@ RUN go get -u github.com/golang/lint/golint
RUN git clone https://github.com/uber/cadence.git $CADENCE_HOME
RUN cd $CADENCE_HOME && git checkout $git_branch && make bins_nothrift

EXPOSE 7933 7934 7935
EXPOSE 7933 7934 7935 7939

COPY ./start.sh $CADENCE_HOME/start.sh
COPY ./config_template.yaml $CADENCE_HOME/config/docker_template.yaml
Expand Down
9 changes: 9 additions & 0 deletions docker/config_template.yaml
Expand Up @@ -52,6 +52,15 @@ services:
hostPort: "${STATSD_ENDPOINT}"
prefix: "cadence-history"

worker:
rpc:
port: 7939
bindOnLocalHost: ${BIND_ON_LOCALHOST}
metrics:
statsd:
hostPort: "${STATSD_ENDPOINT}"
prefix: "cadence-worker"

clustersInfo:
enableGlobalDomain: false
failoverVersionIncrement: 10
Expand Down
1 change: 1 addition & 0 deletions docker/docker-compose.yml
Expand Up @@ -17,6 +17,7 @@ services:
- "7933:7933"
- "7934:7934"
- "7935:7935"
- "7939:7939"
environment:
- "CASSANDRA_SEEDS=cassandra"
- "STATSD_ENDPOINT=statsd:8125"
Expand Down
4 changes: 2 additions & 2 deletions docker/start.sh
Expand Up @@ -78,7 +78,7 @@ init_env() {
fi

if [ -z "$RINGPOP_SEEDS" ]; then
export RINGPOP_SEEDS_JSON_ARRAY="[\"$HOST_IP:7933\",\"$HOST_IP:7934\",\"$HOST_IP:7935\"]"
export RINGPOP_SEEDS_JSON_ARRAY="[\"$HOST_IP:7933\",\"$HOST_IP:7934\",\"$HOST_IP:7935\",\"$HOST_IP:7939\"]"
else
array=(${RINGPOP_SEEDS//,/ })
export RINGPOP_SEEDS_JSON_ARRAY=$(json_array "${array[@]}")
Expand All @@ -100,7 +100,7 @@ if [ -z "$RF" ]; then
fi

if [ -z "$SERVICES" ]; then
SERVICES="history,matching,frontend"
SERVICES="history,matching,frontend,worker"
fi

init_env
Expand Down
33 changes: 20 additions & 13 deletions service/worker/service.go
Expand Up @@ -21,6 +21,7 @@
package worker

import (
"github.com/uber-common/bark"
"github.com/uber/cadence/common"
"github.com/uber/cadence/common/metrics"
persistencefactory "github.com/uber/cadence/common/persistence/persistence-factory"
Expand Down Expand Up @@ -80,6 +81,25 @@ func (s *Service) Start() {

s.metricsClient = base.GetMetricsClient()

if s.params.ClusterMetadata.IsGlobalDomainEnabled() {
s.startReplicator(params, base, log)
}

log.Infof("%v started", common.WorkerServiceName)
<-s.stopC
base.Stop()
}

// Stop is called to stop the service
func (s *Service) Stop() {
select {
case s.stopC <- struct{}{}:
default:
}
s.params.Logger.Infof("%v stopped", common.WorkerServiceName)
}

func (s *Service) startReplicator(params *service.BootstrapParams, base service.Service, log bark.Logger) {
pConfig := params.PersistenceConfig
pConfig.SetMaxQPS(pConfig.DefaultStore, s.config.PersistenceMaxQPS())
pFactory := persistencefactory.New(&pConfig, params.ClusterMetadata.GetCurrentClusterName(), s.metricsClient, log)
Expand All @@ -100,17 +120,4 @@ func (s *Service) Start() {
replicator.Stop()
log.Fatalf("Fail to start replicator: %v", err)
}

log.Infof("%v started", common.WorkerServiceName)
<-s.stopC
base.Stop()
}

// Stop is called to stop the service
func (s *Service) Stop() {
select {
case s.stopC <- struct{}{}:
default:
}
s.params.Logger.Infof("%v stopped", common.WorkerServiceName)
}

0 comments on commit b074c80

Please sign in to comment.