From d26b503dcaa59707723212db8c2d86af0c1b0d30 Mon Sep 17 00:00:00 2001 From: James Shubin Date: Sat, 18 Jun 2016 03:07:21 -0400 Subject: [PATCH] Copy in out of tree patches These patches are proposed upstream changes and code for and from etcd. Ideally we would revert this patch when/if things are merged upstream! The majority of the work is in: https://github.com/coreos/etcd/pull/5584 --- etcd.go | 404 +++++++++++++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 400 insertions(+), 4 deletions(-) diff --git a/etcd.go b/etcd.go index 103400e56..b4c3c4ab2 100644 --- a/etcd.go +++ b/etcd.go @@ -60,7 +60,6 @@ import ( "time" etcd "github.com/coreos/etcd/clientv3" // "clientv3" - "github.com/coreos/etcd/etcdmain" "github.com/coreos/etcd/etcdserver" rpctypes "github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes" etcdtypes "github.com/coreos/etcd/pkg/types" @@ -69,6 +68,37 @@ import ( "google.golang.org/grpc" ) +// XXX: these imports are used for the EtcdConn code temporarily sitting here +import ( + "crypto/tls" + "github.com/cockroachdb/cmux" + "github.com/coreos/etcd/etcdserver/api/v2http" + "github.com/coreos/etcd/etcdserver/api/v3rpc" + "github.com/coreos/etcd/pkg/cors" + runtimeutil "github.com/coreos/etcd/pkg/runtime" + etcdtransport "github.com/coreos/etcd/pkg/transport" + "github.com/coreos/etcd/rafthttp" + "github.com/coreos/pkg/capnslog" + defaultLog "log" + "net" + "net/http" + "path" +) + +var plog = capnslog.NewPackageLogger("github.com/coreos/etcd", "mgmt") // XXX EtcdConn + +// internal fd usage includes disk usage and transport usage. +// To read/write snapshot, snap pkg needs 1. In normal case, wal pkg needs +// at most 2 to read/lock/write WALs. One case that it needs to 2 is to +// read all logs after some snapshot index, which locates at the end of +// the second last and the head of the last. For purging, it needs to read +// directory, so it needs 1. For fd monitor, it needs 1. +// For transport, rafthttp builds two long-polling connections and at most +// four temporary connections with each member. There are at most 9 members +// in a cluster, so it should reserve 96. +// For the safety, we set the total reserved number to 150. +const reservedInternalFDNum = 150 // XXX: used for the EtcdConn code temporarily + const ( NS = "_mgmt" // root namespace for mgmt operations seedSentinel = "_seed" // you must not name your hostname this @@ -176,7 +206,7 @@ type EmbdEtcd struct { // EMBeddeD etcd // etcd server related serverwg sync.WaitGroup // wait for server to shutdown - etcdConn *etcdmain.EtcdConn + etcdConn *EtcdConn server *etcdserver.EtcdServer dataDir string // XXX: incorporate into the "/var" functionality... } @@ -1526,7 +1556,7 @@ func (obj *EmbdEtcd) StartServer(newCluster bool, peerURLsMap etcdtypes.URLsMap) initialPeerURLsMap[memberName] = peerURLs } - obj.etcdConn = etcdmain.NewEtcdConn( + obj.etcdConn = NewEtcdConn( // XXX: get this upstream! false, false, // TODO: for now nil, nil, // TODO: for now nil, peerURLs, obj.clientURLs, @@ -1535,7 +1565,7 @@ func (obj *EmbdEtcd) StartServer(newCluster bool, peerURLsMap etcdtypes.URLsMap) log.Printf("Etcd: StartServer: Initializing connections...") if err := obj.etcdConn.Init(); err != nil { - if fatalErr, ok := err.(*etcdmain.ErrEtcdConnFatal); ok { // match? + if fatalErr, ok := err.(*ErrEtcdConnFatal); ok { // match? log.Printf("Etcd: StartServer: Fatal: %+v", fatalErr.Err) return fatalErr.Err } @@ -2116,3 +2146,369 @@ func ApplyDeltaEvents(re *RE, urlsmap etcdtypes.URLsMap) (etcdtypes.URLsMap, err } return urlsmap, nil } + +// XXX Everything below here is intended to go upstream into etcd + +type ErrEtcdConnFatal struct { + Err error +} + +func (e *ErrEtcdConnFatal) Error() string { + return fmt.Sprintf("ErrEtcdConnFatal: %v", e.Err) +} + +// EtcdConn is a helper struct that contains all the connection starting code +type EtcdConn struct { + PeerAutoTLS bool + ClientAutoTLS bool + PeerTLSInfo *etcdtransport.TLSInfo + ClientTLSInfo *etcdtransport.TLSInfo + CorsInfo *cors.CORSInfo + Lpurls []url.URL + Lcurls []url.URL + Dir string + Plog *capnslog.PackageLogger + + listen func(*etcdserver.EtcdServer) + defers []func() + err error +} + +// NewEtcdConn wraps the common net and listener code for embedded etcd reuse! +func NewEtcdConn(peerAutoTLS, clientAutoTLS bool, peerTLSInfo, clientTLSInfo *etcdtransport.TLSInfo, corsInfo *cors.CORSInfo, lpurls, lcurls []url.URL, dir string, plog *capnslog.PackageLogger) *EtcdConn { + return &EtcdConn{ + PeerAutoTLS: peerAutoTLS, + ClientAutoTLS: clientAutoTLS, + PeerTLSInfo: peerTLSInfo, + ClientTLSInfo: clientTLSInfo, + CorsInfo: corsInfo, + Lpurls: lpurls, + Lcurls: lcurls, + Dir: dir, + Plog: plog, + + listen: nil, + defers: []func(){}, + err: fmt.Errorf("not initialized"), + } +} + +func (ec *EtcdConn) Init() error { + var err error + if ec.PeerAutoTLS && ec.PeerTLSInfo != nil && ec.PeerTLSInfo.Empty() { + var phosts []string + for _, u := range ec.Lpurls { + phosts = append(phosts, u.Host) + } + peerTLSInfoPointer, err := etcdtransport.SelfCert(path.Join(ec.Dir, "fixtures/peer"), phosts) + ec.PeerTLSInfo = &peerTLSInfoPointer + if err != nil { + if ec.Plog != nil { + ec.Plog.Fatalf("could not get certs (%v)", err) + } else { + ec.err = fmt.Errorf("could not get certs (%v)", err) + return &ErrEtcdConnFatal{err} + } + } + } else if ec.PeerAutoTLS { + if ec.Plog != nil { + ec.Plog.Warningf("ignoring peer auto TLS since certs given") + } + } + if ec.Plog != nil { + if !ec.PeerTLSInfo.Empty() { + ec.Plog.Infof("peerTLS: %s", ec.PeerTLSInfo) + } + } + + var plns []net.Listener + for _, u := range ec.Lpurls { + if u.Scheme == "http" { + if ec.Plog != nil { + if !ec.PeerTLSInfo.Empty() { + ec.Plog.Warningf("The scheme of peer url %s is HTTP while peer key/cert files are presented. Ignored peer key/cert files.", u.String()) + } + if ec.PeerTLSInfo.ClientCertAuth { + ec.Plog.Warningf("The scheme of peer url %s is HTTP while client cert auth (--peer-client-cert-auth) is enabled. Ignored client cert auth for this url.", u.String()) + } + } + } + var ( + l net.Listener + tlscfg *tls.Config + ) + + if ec.PeerTLSInfo != nil && !ec.PeerTLSInfo.Empty() { + tlscfg, err = ec.PeerTLSInfo.ServerConfig() + if err != nil { + ec.err = err + return err + } + } + + l, err = rafthttp.NewListener(u, tlscfg) + if err != nil { + ec.err = err + return err + } + + urlStr := u.String() + if ec.Plog != nil { + ec.Plog.Info("listening for peers on ", urlStr) + } + ll := l // make a unique copy for the closure + d1 := func() { + if err != nil { // XXX + ll.Close() + if ec.Plog != nil { + ec.Plog.Info("stopping listening for peers on ", urlStr) + } + } + } + ec.defers = append(ec.defers, d1) + plns = append(plns, ll) + } + + if ec.ClientAutoTLS && ec.ClientTLSInfo != nil && ec.ClientTLSInfo.Empty() { + var chosts []string + for _, u := range ec.Lcurls { + chosts = append(chosts, u.Host) + } + clientTLSInfoPointer, err := etcdtransport.SelfCert(path.Join(ec.Dir, "fixtures/client"), chosts) + ec.ClientTLSInfo = &clientTLSInfoPointer + if err != nil { + if ec.Plog != nil { + ec.Plog.Fatalf("could not get certs (%v)", err) + } else { + ec.err = fmt.Errorf("could not get certs (%v)", err) + return &ErrEtcdConnFatal{err} + } + } + } else if ec.ClientAutoTLS { + if ec.Plog != nil { + ec.Plog.Warningf("ignoring client auto TLS since certs given") + } + } + + var ctlscfg *tls.Config + if ec.ClientTLSInfo != nil && !ec.ClientTLSInfo.Empty() { + if ec.Plog != nil { + ec.Plog.Infof("clientTLS: %s", ec.ClientTLSInfo) + } + ctlscfg, err = ec.ClientTLSInfo.ServerConfig() + if err != nil { + ec.err = err + return err + } + } + + sctxs := make(map[string]*serveCtx) + for _, u := range ec.Lcurls { + if u.Scheme == "http" { + if ec.ClientTLSInfo != nil && !ec.ClientTLSInfo.Empty() { + if ec.Plog != nil { + ec.Plog.Warningf("The scheme of client url %s is HTTP while peer key/cert files are presented. Ignored key/cert files.", u.String()) + } + } + if ec.ClientTLSInfo != nil && ec.ClientTLSInfo.ClientCertAuth { + if ec.Plog != nil { + ec.Plog.Warningf("The scheme of client url %s is HTTP while client cert auth (--client-cert-auth) is enabled. Ignored client cert auth for this url.", u.String()) + } + } + } + if u.Scheme == "https" && ctlscfg == nil { + err = fmt.Errorf("TLS key/cert (--cert-file, --key-file) must be provided for client url %s with HTTPs scheme", u.String()) + ec.err = err + return err + } + + ctx := &serveCtx{host: u.Host} + + if u.Scheme == "https" { + ctx.secure = true + } else { + ctx.insecure = true + } + + if sctxs[u.Host] != nil { + if ctx.secure { + sctxs[u.Host].secure = true + } + if ctx.insecure { + sctxs[u.Host].insecure = true + } + continue + } + + var l net.Listener + + l, err = net.Listen("tcp", u.Host) + if err != nil { + ec.err = err + return err + } + + var fdLimit uint64 + if fdLimit, err = runtimeutil.FDLimit(); err == nil { + if fdLimit <= reservedInternalFDNum { + if ec.Plog != nil { + ec.Plog.Fatalf("file descriptor limit[%d] of etcd process is too low, and should be set higher than %d to ensure internal usage", fdLimit, reservedInternalFDNum) + } else { + err = fmt.Errorf("file descriptor limit[%d] of etcd process is too low, and should be set higher than %d to ensure internal usage", fdLimit, reservedInternalFDNum) + ec.err = err + return &ErrEtcdConnFatal{err} + } + } + l = etcdtransport.LimitListener(l, int(fdLimit-reservedInternalFDNum)) + } + + l, err = etcdtransport.NewKeepAliveListener(l, "tcp", nil) + ctx.l = l + if err != nil { + ec.err = err + return err + } + + if ec.Plog != nil { + ec.Plog.Info("listening for client requests on ", u.Host) + } + ll := l + msg := u.Host + d2 := func() { + if err != nil { // XXX + ll.Close() + if ec.Plog != nil { + ec.Plog.Info("stopping listening for client requests on ", msg) + } + } + } + ec.defers = append(ec.defers, d2) + sctxs[u.Host] = ctx + } + + // plns []net.Listener, ctlscfg *tls.Config, sctxs map[string]*serveCtx + listen := func(s *etcdserver.EtcdServer) { + ch := http.Handler(&cors.CORSHandler{ + Handler: v2http.NewClientHandler(s, s.Cfg.ReqTimeout()), + Info: ec.CorsInfo, + }) + ph := v2http.NewPeerHandler(s) + + // Start the peer server in a goroutine + for _, l := range plns { + go func(l net.Listener) { + e := servePeerHTTP(l, ph) + if ec.Plog != nil { + ec.Plog.Fatal(e) + } else { + os.Exit(1) // TODO: write on a ch to notify + } + }(l) + } + // Start a client server goroutine for each listen address + for _, sctx := range sctxs { + go func(sctx *serveCtx) { + // read timeout does not work with http close notify + // TODO: https://github.com/golang/go/issues/9524 + e := serve(sctx, s, ctlscfg, ch) + if ec.Plog != nil { + ec.Plog.Fatal(e) + } else { + os.Exit(1) // TODO: write on a ch to notify + } + }(sctx) + } + } + ec.listen = listen + ec.err = err // likely nil + return err +} + +func (ec *EtcdConn) Listen(s *etcdserver.EtcdServer) error { + if ec.err != nil { // error on init + return ec.err + } + ec.listen(s) + return nil +} + +func (ec *EtcdConn) Close() { + for _, f := range ec.defers { + f() + } +} + +// copied from etcdmain/serve.go +type serveCtx struct { + l net.Listener + host string + secure bool + insecure bool +} + +// serve accepts incoming connections on the listener l, +// creating a new service goroutine for each. The service goroutines +// read requests and then call handler to reply to them. +func serve(sctx *serveCtx, s *etcdserver.EtcdServer, tlscfg *tls.Config, handler http.Handler) error { + logger := defaultLog.New(ioutil.Discard, "etcdhttp", 0) + + <-s.ReadyNotify() + plog.Info("ready to serve client requests") + + m := cmux.New(sctx.l) + + if sctx.insecure { + gs := v3rpc.Server(s, nil) + grpcl := m.Match(cmux.HTTP2()) + go func() { plog.Fatal(gs.Serve(grpcl)) }() + + srvhttp := &http.Server{ + Handler: handler, + ErrorLog: logger, // do not log user error + } + httpl := m.Match(cmux.HTTP1()) + go func() { plog.Fatal(srvhttp.Serve(httpl)) }() + plog.Noticef("serving insecure client requests on %s, this is strongly discouraged!", sctx.host) + } + + if sctx.secure { + gs := v3rpc.Server(s, tlscfg) + handler = grpcHandlerFunc(gs, handler) + + tlsl := tls.NewListener(m.Match(cmux.Any()), tlscfg) + // TODO: add debug flag; enable logging when debug flag is set + srv := &http.Server{ + Handler: handler, + TLSConfig: tlscfg, + ErrorLog: logger, // do not log user error + } + go func() { plog.Fatal(srv.Serve(tlsl)) }() + + plog.Infof("serving client requests on %s", sctx.host) + } + + return m.Serve() +} + +// grpcHandlerFunc returns an http.Handler that delegates to grpcServer on incoming gRPC +// connections or otherHandler otherwise. Copied from cockroachdb. +func grpcHandlerFunc(grpcServer *grpc.Server, otherHandler http.Handler) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.ProtoMajor == 2 && strings.Contains(r.Header.Get("Content-Type"), "application/grpc") { + grpcServer.ServeHTTP(w, r) + } else { + otherHandler.ServeHTTP(w, r) + } + }) +} + +func servePeerHTTP(l net.Listener, handler http.Handler) error { + logger := defaultLog.New(ioutil.Discard, "etcdhttp", 0) + // TODO: add debug flag; enable logging when debug flag is set + srv := &http.Server{ + Handler: handler, + ReadTimeout: 5 * time.Minute, + ErrorLog: logger, // do not log user error + } + return srv.Serve(l) +}