View
404 etcd.go
@@ -60,7 +60,6 @@ import (
"time"
etcd "github.com/coreos/etcd/clientv3" // "clientv3"
- "github.com/coreos/etcd/etcdmain"
"github.com/coreos/etcd/etcdserver"
rpctypes "github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
etcdtypes "github.com/coreos/etcd/pkg/types"
@@ -69,6 +68,37 @@ import (
"google.golang.org/grpc"
)
+// XXX: these imports are used for the EtcdConn code temporarily sitting here
+import (
+ "crypto/tls"
+ "github.com/cockroachdb/cmux"
+ "github.com/coreos/etcd/etcdserver/api/v2http"
+ "github.com/coreos/etcd/etcdserver/api/v3rpc"
+ "github.com/coreos/etcd/pkg/cors"
+ runtimeutil "github.com/coreos/etcd/pkg/runtime"
+ etcdtransport "github.com/coreos/etcd/pkg/transport"
+ "github.com/coreos/etcd/rafthttp"
+ "github.com/coreos/pkg/capnslog"
+ defaultLog "log"
+ "net"
+ "net/http"
+ "path"
+)
+
+var plog = capnslog.NewPackageLogger("github.com/coreos/etcd", "mgmt") // XXX EtcdConn
+
+// internal fd usage includes disk usage and transport usage.
+// To read/write snapshot, snap pkg needs 1. In normal case, wal pkg needs
+// at most 2 to read/lock/write WALs. One case that it needs to 2 is to
+// read all logs after some snapshot index, which locates at the end of
+// the second last and the head of the last. For purging, it needs to read
+// directory, so it needs 1. For fd monitor, it needs 1.
+// For transport, rafthttp builds two long-polling connections and at most
+// four temporary connections with each member. There are at most 9 members
+// in a cluster, so it should reserve 96.
+// For the safety, we set the total reserved number to 150.
+const reservedInternalFDNum = 150 // XXX: used for the EtcdConn code temporarily
+
const (
NS = "_mgmt" // root namespace for mgmt operations
seedSentinel = "_seed" // you must not name your hostname this
@@ -176,7 +206,7 @@ type EmbdEtcd struct { // EMBeddeD etcd
// etcd server related
serverwg sync.WaitGroup // wait for server to shutdown
- etcdConn *etcdmain.EtcdConn
+ etcdConn *EtcdConn
server *etcdserver.EtcdServer
dataDir string // XXX: incorporate into the "/var" functionality...
}
@@ -1526,7 +1556,7 @@ func (obj *EmbdEtcd) StartServer(newCluster bool, peerURLsMap etcdtypes.URLsMap)
initialPeerURLsMap[memberName] = peerURLs
}
- obj.etcdConn = etcdmain.NewEtcdConn(
+ obj.etcdConn = NewEtcdConn( // XXX: get this upstream!
false, false, // TODO: for now
nil, nil, // TODO: for now
nil, peerURLs, obj.clientURLs,
@@ -1535,7 +1565,7 @@ func (obj *EmbdEtcd) StartServer(newCluster bool, peerURLsMap etcdtypes.URLsMap)
log.Printf("Etcd: StartServer: Initializing connections...")
if err := obj.etcdConn.Init(); err != nil {
- if fatalErr, ok := err.(*etcdmain.ErrEtcdConnFatal); ok { // match?
+ if fatalErr, ok := err.(*ErrEtcdConnFatal); ok { // match?
log.Printf("Etcd: StartServer: Fatal: %+v", fatalErr.Err)
return fatalErr.Err
}
@@ -2116,3 +2146,369 @@ func ApplyDeltaEvents(re *RE, urlsmap etcdtypes.URLsMap) (etcdtypes.URLsMap, err
}
return urlsmap, nil
}
+
+// XXX Everything below here is intended to go upstream into etcd
+
+type ErrEtcdConnFatal struct {
+ Err error
+}
+
+func (e *ErrEtcdConnFatal) Error() string {
+ return fmt.Sprintf("ErrEtcdConnFatal: %v", e.Err)
+}
+
+// EtcdConn is a helper struct that contains all the connection starting code
+type EtcdConn struct {
+ PeerAutoTLS bool
+ ClientAutoTLS bool
+ PeerTLSInfo *etcdtransport.TLSInfo
+ ClientTLSInfo *etcdtransport.TLSInfo
+ CorsInfo *cors.CORSInfo
+ Lpurls []url.URL
+ Lcurls []url.URL
+ Dir string
+ Plog *capnslog.PackageLogger
+
+ listen func(*etcdserver.EtcdServer)
+ defers []func()
+ err error
+}
+
+// NewEtcdConn wraps the common net and listener code for embedded etcd reuse!
+func NewEtcdConn(peerAutoTLS, clientAutoTLS bool, peerTLSInfo, clientTLSInfo *etcdtransport.TLSInfo, corsInfo *cors.CORSInfo, lpurls, lcurls []url.URL, dir string, plog *capnslog.PackageLogger) *EtcdConn {
+ return &EtcdConn{
+ PeerAutoTLS: peerAutoTLS,
+ ClientAutoTLS: clientAutoTLS,
+ PeerTLSInfo: peerTLSInfo,
+ ClientTLSInfo: clientTLSInfo,
+ CorsInfo: corsInfo,
+ Lpurls: lpurls,
+ Lcurls: lcurls,
+ Dir: dir,
+ Plog: plog,
+
+ listen: nil,
+ defers: []func(){},
+ err: fmt.Errorf("not initialized"),
+ }
+}
+
+func (ec *EtcdConn) Init() error {
+ var err error
+ if ec.PeerAutoTLS && ec.PeerTLSInfo != nil && ec.PeerTLSInfo.Empty() {
+ var phosts []string
+ for _, u := range ec.Lpurls {
+ phosts = append(phosts, u.Host)
+ }
+ peerTLSInfoPointer, err := etcdtransport.SelfCert(path.Join(ec.Dir, "fixtures/peer"), phosts)
+ ec.PeerTLSInfo = &peerTLSInfoPointer
+ if err != nil {
+ if ec.Plog != nil {
+ ec.Plog.Fatalf("could not get certs (%v)", err)
+ } else {
+ ec.err = fmt.Errorf("could not get certs (%v)", err)
+ return &ErrEtcdConnFatal{err}
+ }
+ }
+ } else if ec.PeerAutoTLS {
+ if ec.Plog != nil {
+ ec.Plog.Warningf("ignoring peer auto TLS since certs given")
+ }
+ }
+ if ec.Plog != nil {
+ if !ec.PeerTLSInfo.Empty() {
+ ec.Plog.Infof("peerTLS: %s", ec.PeerTLSInfo)
+ }
+ }
+
+ var plns []net.Listener
+ for _, u := range ec.Lpurls {
+ if u.Scheme == "http" {
+ if ec.Plog != nil {
+ if !ec.PeerTLSInfo.Empty() {
+ ec.Plog.Warningf("The scheme of peer url %s is HTTP while peer key/cert files are presented. Ignored peer key/cert files.", u.String())
+ }
+ if ec.PeerTLSInfo.ClientCertAuth {
+ ec.Plog.Warningf("The scheme of peer url %s is HTTP while client cert auth (--peer-client-cert-auth) is enabled. Ignored client cert auth for this url.", u.String())
+ }
+ }
+ }
+ var (
+ l net.Listener
+ tlscfg *tls.Config
+ )
+
+ if ec.PeerTLSInfo != nil && !ec.PeerTLSInfo.Empty() {
+ tlscfg, err = ec.PeerTLSInfo.ServerConfig()
+ if err != nil {
+ ec.err = err
+ return err
+ }
+ }
+
+ l, err = rafthttp.NewListener(u, tlscfg)
+ if err != nil {
+ ec.err = err
+ return err
+ }
+
+ urlStr := u.String()
+ if ec.Plog != nil {
+ ec.Plog.Info("listening for peers on ", urlStr)
+ }
+ ll := l // make a unique copy for the closure
+ d1 := func() {
+ if err != nil { // XXX
+ ll.Close()
+ if ec.Plog != nil {
+ ec.Plog.Info("stopping listening for peers on ", urlStr)
+ }
+ }
+ }
+ ec.defers = append(ec.defers, d1)
+ plns = append(plns, ll)
+ }
+
+ if ec.ClientAutoTLS && ec.ClientTLSInfo != nil && ec.ClientTLSInfo.Empty() {
+ var chosts []string
+ for _, u := range ec.Lcurls {
+ chosts = append(chosts, u.Host)
+ }
+ clientTLSInfoPointer, err := etcdtransport.SelfCert(path.Join(ec.Dir, "fixtures/client"), chosts)
+ ec.ClientTLSInfo = &clientTLSInfoPointer
+ if err != nil {
+ if ec.Plog != nil {
+ ec.Plog.Fatalf("could not get certs (%v)", err)
+ } else {
+ ec.err = fmt.Errorf("could not get certs (%v)", err)
+ return &ErrEtcdConnFatal{err}
+ }
+ }
+ } else if ec.ClientAutoTLS {
+ if ec.Plog != nil {
+ ec.Plog.Warningf("ignoring client auto TLS since certs given")
+ }
+ }
+
+ var ctlscfg *tls.Config
+ if ec.ClientTLSInfo != nil && !ec.ClientTLSInfo.Empty() {
+ if ec.Plog != nil {
+ ec.Plog.Infof("clientTLS: %s", ec.ClientTLSInfo)
+ }
+ ctlscfg, err = ec.ClientTLSInfo.ServerConfig()
+ if err != nil {
+ ec.err = err
+ return err
+ }
+ }
+
+ sctxs := make(map[string]*serveCtx)
+ for _, u := range ec.Lcurls {
+ if u.Scheme == "http" {
+ if ec.ClientTLSInfo != nil && !ec.ClientTLSInfo.Empty() {
+ if ec.Plog != nil {
+ ec.Plog.Warningf("The scheme of client url %s is HTTP while peer key/cert files are presented. Ignored key/cert files.", u.String())
+ }
+ }
+ if ec.ClientTLSInfo != nil && ec.ClientTLSInfo.ClientCertAuth {
+ if ec.Plog != nil {
+ ec.Plog.Warningf("The scheme of client url %s is HTTP while client cert auth (--client-cert-auth) is enabled. Ignored client cert auth for this url.", u.String())
+ }
+ }
+ }
+ if u.Scheme == "https" && ctlscfg == nil {
+ err = fmt.Errorf("TLS key/cert (--cert-file, --key-file) must be provided for client url %s with HTTPs scheme", u.String())
+ ec.err = err
+ return err
+ }
+
+ ctx := &serveCtx{host: u.Host}
+
+ if u.Scheme == "https" {
+ ctx.secure = true
+ } else {
+ ctx.insecure = true
+ }
+
+ if sctxs[u.Host] != nil {
+ if ctx.secure {
+ sctxs[u.Host].secure = true
+ }
+ if ctx.insecure {
+ sctxs[u.Host].insecure = true
+ }
+ continue
+ }
+
+ var l net.Listener
+
+ l, err = net.Listen("tcp", u.Host)
+ if err != nil {
+ ec.err = err
+ return err
+ }
+
+ var fdLimit uint64
+ if fdLimit, err = runtimeutil.FDLimit(); err == nil {
+ if fdLimit <= reservedInternalFDNum {
+ if ec.Plog != nil {
+ ec.Plog.Fatalf("file descriptor limit[%d] of etcd process is too low, and should be set higher than %d to ensure internal usage", fdLimit, reservedInternalFDNum)
+ } else {
+ err = fmt.Errorf("file descriptor limit[%d] of etcd process is too low, and should be set higher than %d to ensure internal usage", fdLimit, reservedInternalFDNum)
+ ec.err = err
+ return &ErrEtcdConnFatal{err}
+ }
+ }
+ l = etcdtransport.LimitListener(l, int(fdLimit-reservedInternalFDNum))
+ }
+
+ l, err = etcdtransport.NewKeepAliveListener(l, "tcp", nil)
+ ctx.l = l
+ if err != nil {
+ ec.err = err
+ return err
+ }
+
+ if ec.Plog != nil {
+ ec.Plog.Info("listening for client requests on ", u.Host)
+ }
+ ll := l
+ msg := u.Host
+ d2 := func() {
+ if err != nil { // XXX
+ ll.Close()
+ if ec.Plog != nil {
+ ec.Plog.Info("stopping listening for client requests on ", msg)
+ }
+ }
+ }
+ ec.defers = append(ec.defers, d2)
+ sctxs[u.Host] = ctx
+ }
+
+ // plns []net.Listener, ctlscfg *tls.Config, sctxs map[string]*serveCtx
+ listen := func(s *etcdserver.EtcdServer) {
+ ch := http.Handler(&cors.CORSHandler{
+ Handler: v2http.NewClientHandler(s, s.Cfg.ReqTimeout()),
+ Info: ec.CorsInfo,
+ })
+ ph := v2http.NewPeerHandler(s)
+
+ // Start the peer server in a goroutine
+ for _, l := range plns {
+ go func(l net.Listener) {
+ e := servePeerHTTP(l, ph)
+ if ec.Plog != nil {
+ ec.Plog.Fatal(e)
+ } else {
+ os.Exit(1) // TODO: write on a ch to notify
+ }
+ }(l)
+ }
+ // Start a client server goroutine for each listen address
+ for _, sctx := range sctxs {
+ go func(sctx *serveCtx) {
+ // read timeout does not work with http close notify
+ // TODO: https://github.com/golang/go/issues/9524
+ e := serve(sctx, s, ctlscfg, ch)
+ if ec.Plog != nil {
+ ec.Plog.Fatal(e)
+ } else {
+ os.Exit(1) // TODO: write on a ch to notify
+ }
+ }(sctx)
+ }
+ }
+ ec.listen = listen
+ ec.err = err // likely nil
+ return err
+}
+
+func (ec *EtcdConn) Listen(s *etcdserver.EtcdServer) error {
+ if ec.err != nil { // error on init
+ return ec.err
+ }
+ ec.listen(s)
+ return nil
+}
+
+func (ec *EtcdConn) Close() {
+ for _, f := range ec.defers {
+ f()
+ }
+}
+
+// copied from etcdmain/serve.go
+type serveCtx struct {
+ l net.Listener
+ host string
+ secure bool
+ insecure bool
+}
+
+// serve accepts incoming connections on the listener l,
+// creating a new service goroutine for each. The service goroutines
+// read requests and then call handler to reply to them.
+func serve(sctx *serveCtx, s *etcdserver.EtcdServer, tlscfg *tls.Config, handler http.Handler) error {
+ logger := defaultLog.New(ioutil.Discard, "etcdhttp", 0)
+
+ <-s.ReadyNotify()
+ plog.Info("ready to serve client requests")
+
+ m := cmux.New(sctx.l)
+
+ if sctx.insecure {
+ gs := v3rpc.Server(s, nil)
+ grpcl := m.Match(cmux.HTTP2())
+ go func() { plog.Fatal(gs.Serve(grpcl)) }()
+
+ srvhttp := &http.Server{
+ Handler: handler,
+ ErrorLog: logger, // do not log user error
+ }
+ httpl := m.Match(cmux.HTTP1())
+ go func() { plog.Fatal(srvhttp.Serve(httpl)) }()
+ plog.Noticef("serving insecure client requests on %s, this is strongly discouraged!", sctx.host)
+ }
+
+ if sctx.secure {
+ gs := v3rpc.Server(s, tlscfg)
+ handler = grpcHandlerFunc(gs, handler)
+
+ tlsl := tls.NewListener(m.Match(cmux.Any()), tlscfg)
+ // TODO: add debug flag; enable logging when debug flag is set
+ srv := &http.Server{
+ Handler: handler,
+ TLSConfig: tlscfg,
+ ErrorLog: logger, // do not log user error
+ }
+ go func() { plog.Fatal(srv.Serve(tlsl)) }()
+
+ plog.Infof("serving client requests on %s", sctx.host)
+ }
+
+ return m.Serve()
+}
+
+// grpcHandlerFunc returns an http.Handler that delegates to grpcServer on incoming gRPC
+// connections or otherHandler otherwise. Copied from cockroachdb.
+func grpcHandlerFunc(grpcServer *grpc.Server, otherHandler http.Handler) http.Handler {
+ return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+ if r.ProtoMajor == 2 && strings.Contains(r.Header.Get("Content-Type"), "application/grpc") {
+ grpcServer.ServeHTTP(w, r)
+ } else {
+ otherHandler.ServeHTTP(w, r)
+ }
+ })
+}
+
+func servePeerHTTP(l net.Listener, handler http.Handler) error {
+ logger := defaultLog.New(ioutil.Discard, "etcdhttp", 0)
+ // TODO: add debug flag; enable logging when debug flag is set
+ srv := &http.Server{
+ Handler: handler,
+ ReadTimeout: 5 * time.Minute,
+ ErrorLog: logger, // do not log user error
+ }
+ return srv.Serve(l)
+}