Permalink
Switch branches/tags
Nothing to show
Find file Copy path
Fetching contributors…
Cannot retrieve contributors at this time
166 lines (158 sloc) 3.51 KB
package dockerplugin
import (
"fmt"
"math"
"net"
"net/http"
"os"
"time"
"github.com/gengo/grpc-gateway/runtime"
"github.com/golang/glog"
"github.com/golang/protobuf/proto"
"github.com/tylerb/graceful"
"golang.org/x/net/context"
"google.golang.org/grpc"
)
const (
protocolTCP = iota
protocolUnix
)
type server struct {
protocol int
driverName string
apiServer *apiServer
registerFunc func(*grpc.Server)
httpRegisterFunc func(context.Context, *runtime.ServeMux, *grpc.ClientConn) error
groupOrAddress string
opts ServerOptions
}
func newServer(
protocol int,
driverName string,
implements []string,
registerFunc func(*grpc.Server),
httpRegisterFunc func(context.Context, *runtime.ServeMux, *grpc.ClientConn) error,
groupOrAddress string,
opts ServerOptions,
) *server {
return &server{
protocol,
driverName,
newAPIServer(
implements,
),
registerFunc,
httpRegisterFunc,
groupOrAddress,
opts,
}
}
func (s *server) Serve() (retErr error) {
var listener net.Listener
var spec string
var err error
var addr string
switch s.protocol {
case protocolTCP:
listener, spec, err = newTCPListener(s.driverName, s.groupOrAddress)
addr = s.groupOrAddress
case protocolUnix:
listener, spec, err = newUnixListener(s.driverName, s.groupOrAddress)
addr = s.driverName
default:
return fmt.Errorf("unknown protocol: %d", s.protocol)
}
if err != nil {
return err
}
grpcPort := s.opts.GRPCPort
if grpcPort == 0 {
grpcPort = DefaultGRPCPort
}
grpcServer := grpc.NewServer(grpc.MaxConcurrentStreams(math.MaxUint32))
RegisterAPIServer(grpcServer, s.apiServer)
s.registerFunc(grpcServer)
grpcListener, err := net.Listen("tcp", fmt.Sprintf(":%d", grpcPort))
if err != nil {
return err
}
grpcErrC := make(chan error)
httpErrC := make(chan error)
errCCount := 1
go func() { grpcErrC <- grpcServer.Serve(grpcListener) }()
time.Sleep(1 * time.Second)
ctx, cancel := context.WithCancel(context.Background())
mux := runtime.NewServeMux(
runtime.WithForwardResponseOption(
func(_ context.Context, responseWriter http.ResponseWriter, _ proto.Message) error {
responseWriter.Header().Set("Content-Type", "application/vnd.docker.plugins.v1.1+json")
return nil
},
),
)
conn, err := grpc.Dial(fmt.Sprintf("0.0.0.0:%d", grpcPort), grpc.WithInsecure())
if err != nil {
glog.Flush()
cancel()
return err
}
go func() {
<-ctx.Done()
_ = conn.Close()
}()
if err := RegisterAPIHandler(ctx, mux, conn); err != nil {
_ = conn.Close()
glog.Flush()
cancel()
return err
}
if err := s.httpRegisterFunc(ctx, mux, conn); err != nil {
_ = conn.Close()
glog.Flush()
cancel()
return err
}
httpServer := &http.Server{
Addr: addr,
Handler: mux,
}
gracefulServer := &graceful.Server{
Timeout: 1 * time.Second,
ShutdownInitiated: func() {
glog.Flush()
cancel()
if spec != "" {
_ = os.Remove(spec)
}
},
Server: httpServer,
}
errCCount++
go func() {
httpErrC <- gracefulServer.Serve(listener)
}()
var errs []error
grpcStopped := false
for i := 0; i < errCCount; i++ {
select {
case grpcErr := <-grpcErrC:
if grpcErr != nil {
errs = append(errs, fmt.Errorf("grpc error: %s", grpcErr.Error()))
}
grpcStopped = true
case httpErr := <-httpErrC:
if httpErr != nil {
errs = append(errs, fmt.Errorf("http error: %s", httpErr.Error()))
}
if !grpcStopped {
grpcServer.Stop()
_ = listener.Close()
grpcStopped = true
}
}
}
if len(errs) > 0 {
return fmt.Errorf("%v", errs)
}
return nil
}