Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move server and scheduler start code. #71

Merged
merged 10 commits into from
Apr 14, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 0 additions & 3 deletions .gitmodules
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,6 @@
[submodule "src/vendor/github.com/nu7hatch/gouuid"]
path = src/vendor/github.com/nu7hatch/gouuid
url = https://github.com/nu7hatch/gouuid
[submodule "src/vendor/github.com/gorilla/mux"]
path = src/vendor/github.com/gorilla/mux
url = https://github.com/gorilla/mux
[submodule "src/vendor/github.com/ghodss/yaml"]
path = src/vendor/github.com/ghodss/yaml
url = https://github.com/ghodss/yaml
Expand Down
8 changes: 4 additions & 4 deletions .travis.yml
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
language: python
language: go
sudo: required
services:
- docker
python:
- "2.7"
go:
- 1.8
os:
- linux
before_install:
- sudo apt-get update
- sudo apt-get install -y golang
- sudo apt-get -y install python-pip
- pip install -r tests/requirements.txt
- docker pull minio/minio
- docker pull ubuntu
Expand Down
8 changes: 6 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -54,11 +54,13 @@ lint:
./buildtools/bin/gometalinter --install > /dev/null
./buildtools/bin/gometalinter --disable-all --enable=vet --enable=golint --enable=gofmt --vendor -s ga4gh -s proto ./src/funnel/...

test:
go-test:
go test funnel/...

test: go-test
docker build -t tes-wait -f tests/docker_files/tes-wait/Dockerfile tests/docker_files/tes-wait/
pip2.7 install -q -r tests/requirements.txt
nosetests-2.7 tests/
go test funnel/...

web:
cd web && \
Expand All @@ -71,5 +73,7 @@ gce-bundle:
GOOS=linux GOARCH=amd64 make
tar --exclude share/node_modules -czvf bin/gce-bundle.tar.gz bin/* gce/* share/*

full: proto install prune_deps add_deps tidy lint test web

.PHONY: proto web

113 changes: 50 additions & 63 deletions src/funnel/cmd/server.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package cmd

import (
"context"
"funnel/config"
"funnel/logger"
"funnel/scheduler"
Expand All @@ -12,34 +13,69 @@ import (
"github.com/imdario/mergo"
"github.com/spf13/cobra"
"os"
"strings"
)

var serverLog = logger.New("funnel-server")
var configFile string
var baseConf = config.Config{}

// serverCmd represents the server command
var serverCmd = &cobra.Command{
Use: "server",
Short: "",
Short: "Starts a Funnel server.",
Long: ``,
Run: func(cmd *cobra.Command, args []string) {
RunE: func(cmd *cobra.Command, args []string) error {
var err error
var conf = config.DefaultConfig()
if configFile != "" {
config.LoadConfigOrExit(configFile, &conf)
}

// file vals <- cli val
err := mergo.MergeWithOverwrite(&conf, baseConf)
err = mergo.MergeWithOverwrite(&conf, baseConf)
if err != nil {
panic(err)
return err
}

// make sure the proper defaults are set
conf.Worker = config.WorkerInheritConfigVals(conf)

startServer(conf)
initLogging(conf)

db, err := server.NewTaskBolt(conf)
if err != nil {
logger.Error("Couldn't open database", err)
return err
}

srv, err := server.NewServer(db, conf)
if err != nil {
return err
}

sched, err := scheduler.NewScheduler(db, conf)
if err != nil {
return err
}

sched.AddBackend(gce.Plugin)
sched.AddBackend(condor.Plugin)
sched.AddBackend(openstack.Plugin)
sched.AddBackend(local.Plugin)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

// Start server
srv.Start(ctx)

// Start scheduler
err = sched.Start(ctx)
if err != nil {
return err
}

// Block
<-ctx.Done()
return nil
},
}

Expand All @@ -56,70 +92,21 @@ func init() {
serverCmd.Flags().StringVar(&baseConf.Scheduler, "scheduler", baseConf.Scheduler, "Name of scheduler to enable")
}

func startServer(conf config.Config) {
func initLogging(conf config.Config) {
logger.SetLevel(conf.LogLevel)
workerLog.Debug("Server Config", "config.Config", conf)

var err error

// TODO Good defaults, configuration, and reusable way to configure logging.
// Also, how do we get this to default to /var/log/tes/worker.log
// without having file permission problems?
// without having file permission problems? syslog?
// Configure logging
if conf.LogPath != "" {
logFile, err := os.OpenFile(conf.LogPath, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0666)
logFile, err := os.OpenFile(
conf.LogPath, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0666,
)
if err != nil {
workerLog.Error("Can't open log output file", "path", conf.LogPath)
logger.Error("Can't open log output file", "path", conf.LogPath)
} else {
logger.SetOutput(logFile)
}
}

err = os.MkdirAll(conf.WorkDir, 0755)
if err != nil {
panic(err)
}

taski, err := server.NewTaskBolt(conf)
if err != nil {
serverLog.Error("Couldn't open database", err)
return
}

s := server.NewGA4GHServer()
s.RegisterTaskServer(taski)
s.RegisterScheduleServer(taski)
go s.Start(conf.RPCPort)

var sched scheduler.Scheduler
switch strings.ToLower(conf.Scheduler) {
case "local":
// TODO worker will stay alive if the parent process panics
sched, err = local.NewScheduler(conf)
case "condor":
sched = condor.NewScheduler(conf)
case "gce":
sched, err = gce.NewScheduler(conf)
case "openstack":
sched, err = openstack.NewScheduler(conf)
default:
serverLog.Error("Unknown scheduler", "scheduler", conf.Scheduler)
return
}

if err != nil {
serverLog.Error("Couldn't create scheduler", err)
return
}

go scheduler.ScheduleLoop(taski, sched, conf)

// If the scheduler implements the Scaler interface,
// start a scaler loop
if s, ok := sched.(scheduler.Scaler); ok {
go scheduler.ScaleLoop(taski, s, conf)
}

// TODO if port 8000 is already busy, does this lock up silently?
server.StartHTTPProxy(conf.HostName+":"+conf.RPCPort, conf.HTTPPort, conf.ContentDir)
}
49 changes: 14 additions & 35 deletions src/funnel/cmd/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,23 +2,20 @@ package cmd

import (
"funnel/config"
"funnel/logger"
"funnel/scheduler"
"funnel/worker"
"github.com/imdario/mergo"
"github.com/spf13/cobra"
"os"
)

var workerLog = logger.New("funnel-worker")
var workerConfigFile string
var workerBaseConf = config.Config{}

// workerCmd represents the worker command
var workerCmd = &cobra.Command{
Use: "worker",
Short: "",
Run: func(cmd *cobra.Command, args []string) {
RunE: func(cmd *cobra.Command, args []string) error {
conf := config.DefaultConfig()

if workerConfigFile != "" {
Expand All @@ -30,10 +27,21 @@ var workerCmd = &cobra.Command{
// file vals <- cli val
err := mergo.MergeWithOverwrite(&conf.Worker, workerDconf)
if err != nil {
panic(err)
return err
}

startWorker(conf.Worker)
initLogging(conf)

if conf.Worker.ID == "" {
conf.Worker.ID = scheduler.GenWorkerID("funnel")
}

w, err := worker.NewWorker(conf.Worker)
if err != nil {
return err
}
w.Run()
return nil
},
}

Expand All @@ -48,32 +56,3 @@ func init() {
workerCmd.Flags().StringVar(&workerBaseConf.LogLevel, "log-level", workerBaseConf.LogLevel, "Level of logging")
workerCmd.Flags().StringVar(&workerBaseConf.LogPath, "log-path", workerBaseConf.LogLevel, "File path to write logs to")
}

func startWorker(conf config.Worker) {
logger.SetLevel(conf.LogLevel)
workerLog.Debug("Worker Config", "config.Worker", conf)

// TODO Good defaults, configuration, and reusable way to configure logging.
// Also, how do we get this to default to /var/log/tes/worker.log
// without having file permission problems?
// Configure logging
if conf.LogPath != "" {
logFile, err := os.OpenFile(conf.LogPath, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0666)
if err != nil {
workerLog.Error("Can't open log output file", "path", conf.LogPath)
} else {
logger.SetOutput(logFile)
}
}

if conf.ID == "" {
conf.ID = scheduler.GenWorkerID("funnel")
}

w, err := worker.NewWorker(conf)
if err != nil {
workerLog.Error("Can't create worker", err)
return
}
w.Run()
}
Loading