Skip to content

Commit

Permalink
*: remove unix socket. (#684)
Browse files Browse the repository at this point in the history
  • Loading branch information
disksing authored and siddontang committed Jul 19, 2017
1 parent c76b8c5 commit f4fbc5a
Show file tree
Hide file tree
Showing 23 changed files with 66 additions and 233 deletions.
4 changes: 0 additions & 4 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,4 @@ endif
mkdir -p _vendor
mv vendor _vendor/src

clean:
# clean unix socket
find . -type s -exec rm {} \;

.PHONY: update clean
14 changes: 1 addition & 13 deletions pd-client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,6 @@
package pd

import (
"net"
"net/url"
"strings"
"sync"
"time"
Expand Down Expand Up @@ -206,17 +204,7 @@ func (c *client) getOrCreateGRPCConn(addr string) (*grpc.ClientConn, error) {
return conn, nil
}

cc, err := grpc.Dial(addr, grpc.WithDialer(func(addr string, d time.Duration) (net.Conn, error) {
u, err := url.Parse(addr)
if err != nil {
return nil, errors.Trace(err)
}
// For tests.
if u.Scheme == "unix" || u.Scheme == "unixs" {
return net.DialTimeout(u.Scheme, u.Host, d)
}
return net.DialTimeout("tcp", u.Host, d)
}), grpc.WithInsecure()) // TODO: Support HTTPS.
cc, err := grpc.Dial(strings.TrimPrefix(addr, "http://"), grpc.WithInsecure()) // TODO: Support HTTPS.
if err != nil {
return nil, errors.Trace(err)
}
Expand Down
19 changes: 1 addition & 18 deletions pd-client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
package pd

import (
"net"
"os"
"strings"
"sync"
Expand Down Expand Up @@ -58,17 +57,9 @@ var (
}
)

var stripUnix = strings.NewReplacer("unix://", "")

func cleanServer(cfg *server.Config) {
// Clean data directory
os.RemoveAll(cfg.DataDir)

// Clean unix sockets
os.Remove(stripUnix.Replace(cfg.PeerUrls))
os.Remove(stripUnix.Replace(cfg.ClientUrls))
os.Remove(stripUnix.Replace(cfg.AdvertisePeerUrls))
os.Remove(stripUnix.Replace(cfg.AdvertiseClientUrls))
}

type cleanupFunc func()
Expand Down Expand Up @@ -120,16 +111,8 @@ func newServer(c *C) (*server.Server, cleanupFunc) {
return s, cleanup
}

var unixStripper = strings.NewReplacer("unix://", "", "unixs://", "")

func unixGrpcDialer(addr string, timeout time.Duration) (net.Conn, error) {
sock, err := net.DialTimeout("unix", unixStripper.Replace(addr), timeout)
return sock, err
}

func mustNewGrpcClient(c *C, addr string) pdpb.PDClient {
conn, err := grpc.Dial(addr, grpc.WithInsecure(),
grpc.WithDialer(unixGrpcDialer))
conn, err := grpc.Dial(strings.TrimPrefix(addr, "http://"), grpc.WithInsecure())

c.Assert(err, IsNil)
return pdpb.NewPDClient(conn)
Expand Down
15 changes: 0 additions & 15 deletions pkg/apiutil/apiutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@ import (
"encoding/json"
"io"
"io/ioutil"
"net"
"net/http"

"github.com/juju/errors"
)
Expand All @@ -39,16 +37,3 @@ func ReadJSON(r io.ReadCloser, data interface{}) error {

return nil
}

// NewHTTPTransport returns a proper http.RoundTripper.
func NewHTTPTransport(scheme string) *http.Transport {
tr := &http.Transport{}
if scheme == "unix" || scheme == "unixs" {
tr.Dial = unixDial
}
return tr
}

func unixDial(_, addr string) (net.Conn, error) {
return net.Dial("unix", addr)
}
24 changes: 4 additions & 20 deletions pkg/etcdutil/etcdutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,14 @@
package etcdutil

import (
"net/url"
"strings"
"net/http"
"time"

log "github.com/Sirupsen/logrus"
"github.com/coreos/etcd/clientv3"
"github.com/coreos/etcd/etcdserver"
"github.com/coreos/etcd/pkg/types"
"github.com/juju/errors"
"github.com/pingcap/pd/pkg/apiutil"
"golang.org/x/net/context"
)

Expand All @@ -43,9 +41,6 @@ const (
checkEtcdRunningDelay = 1 * time.Second
)

// unixToHTTP replace unix scheme with http.
var unixToHTTP = strings.NewReplacer("unix://", "http://", "unixs://", "http://")

// CheckClusterID checks Etcd's cluster ID, returns an error if mismatch.
// This function will never block even quorum is not satisfied.
func CheckClusterID(localClusterID types.ID, um types.URLsMap) error {
Expand All @@ -58,20 +53,9 @@ func CheckClusterID(localClusterID types.ID, um types.URLsMap) error {
peerURLs = append(peerURLs, urls.StringSlice()...)
}

for i, u := range peerURLs {
u, gerr := url.Parse(u)
if gerr != nil {
return errors.Trace(gerr)
}
trp := apiutil.NewHTTPTransport(u.Scheme)

// For tests, change scheme to http.
// etcdserver/api/v3rpc does not recognize unix protocol.
if u.Scheme == "unix" || u.Scheme == "unixs" {
peerURLs[i] = unixToHTTP.Replace(peerURLs[i])
}

remoteCluster, gerr := etcdserver.GetClusterFromRemotePeers([]string{peerURLs[i]}, trp)
for _, u := range peerURLs {
trp := &http.Transport{}
remoteCluster, gerr := etcdserver.GetClusterFromRemotePeers([]string{u}, trp)
trp.CloseIdleConnections()
if gerr != nil {
// Do not return error, because other members may be not ready.
Expand Down
22 changes: 2 additions & 20 deletions pkg/etcdutil/etcdutil_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (
"io/ioutil"
"net/url"
"os"
"strings"
"testing"

"github.com/coreos/etcd/clientv3"
Expand All @@ -43,10 +42,10 @@ func newTestSingleConfig() *embed.Config {
cfg.Dir, _ = ioutil.TempDir("/tmp", "test_etcd")
cfg.WalDir = ""

pu, _ := url.Parse(testutil.UnixURL())
pu, _ := url.Parse(testutil.AllocTestURL())
cfg.LPUrls = []url.URL{*pu}
cfg.APUrls = cfg.LPUrls
cu, _ := url.Parse(testutil.UnixURL())
cu, _ := url.Parse(testutil.AllocTestURL())
cfg.LCUrls = []url.URL{*cu}
cfg.ACUrls = cfg.LCUrls

Expand All @@ -56,26 +55,9 @@ func newTestSingleConfig() *embed.Config {
return cfg
}

// TODO: move to package testutil.
var stripUnix = strings.NewReplacer("unix://", "")

func cleanConfig(cfg *embed.Config) {
// Clean data directory
os.RemoveAll(cfg.Dir)

// Clean unix sockets
for _, u := range cfg.APUrls {
os.Remove(stripUnix.Replace(u.String()))
}
for _, u := range cfg.LPUrls {
os.Remove(stripUnix.Replace(u.String()))
}
for _, u := range cfg.ACUrls {
os.Remove(stripUnix.Replace(u.String()))
}
for _, u := range cfg.LCUrls {
os.Remove(stripUnix.Replace(u.String()))
}
}

func (s *testEtcdutilSuite) TestMemberHelpers(c *C) {
Expand Down
21 changes: 15 additions & 6 deletions pkg/testutil/testutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,21 @@ package testutil

import (
"fmt"
"sync/atomic"
)
"net"

var unixURLCount uint64 = 1024
log "github.com/Sirupsen/logrus"
)

// UnixURL returns a unique unix socket url, used for test only.
func UnixURL() string {
return fmt.Sprintf("unix://localhost:%d", atomic.AddUint64(&unixURLCount, 1))
// AllocTestURL allocates a local URL for testing.
func AllocTestURL() string {
l, err := net.Listen("tcp", "127.0.0.1:0")
if err != nil {
log.Fatal(err)
}
addr := fmt.Sprintf("http://%s", l.Addr())
err = l.Close()
if err != nil {
log.Fatal(err)
}
return addr
}
3 changes: 1 addition & 2 deletions server/api/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,7 @@ func (s *testClusterInfo) SetUpSuite(c *C) {
mustWaitLeader(c, []*server.Server{s.svr})

addr := s.svr.GetAddr()
httpAddr := mustUnixAddrToHTTPAddr(c, addr)
s.urlPrefix = fmt.Sprintf("%s%s/api/v1", httpAddr, apiPrefix)
s.urlPrefix = fmt.Sprintf("%s%s/api/v1", addr, apiPrefix)
}

func (s *testClusterInfo) TearDownSuite(c *C) {
Expand Down
18 changes: 6 additions & 12 deletions server/api/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import (
"encoding/json"
"math/rand"
"net/http"
"strings"
"time"

. "github.com/pingcap/check"
Expand All @@ -31,7 +30,7 @@ type testConfigSuite struct {
}

func (s *testConfigSuite) SetUpSuite(c *C) {
s.hc = newUnixSocketClient()
s.hc = newHTTPClient()
}

func checkConfigResponse(c *C, body []byte, cfgs []*server.Config) {
Expand All @@ -46,8 +45,7 @@ func (s *testConfigSuite) TestConfigAll(c *C) {
cfgs, _, clean := mustNewCluster(c, num)
defer clean()

parts := []string{cfgs[rand.Intn(len(cfgs))].ClientUrls, apiPrefix, "/api/v1/config"}
addr := mustUnixAddrToHTTPAddr(c, strings.Join(parts, ""))
addr := cfgs[rand.Intn(len(cfgs))].ClientUrls + apiPrefix + "/api/v1/config"
resp, err := s.hc.Get(addr)
c.Assert(err, IsNil)
cfg := &server.Config{}
Expand Down Expand Up @@ -85,17 +83,15 @@ func (s *testConfigSuite) TestConfigSchedule(c *C) {
cfgs, _, clean := mustNewCluster(c, num)
defer clean()

parts := []string{cfgs[rand.Intn(len(cfgs))].ClientUrls, apiPrefix, "/api/v1/config/schedule"}
addr := mustUnixAddrToHTTPAddr(c, strings.Join(parts, ""))
addr := cfgs[rand.Intn(len(cfgs))].ClientUrls + apiPrefix + "/api/v1/config/schedule"
resp, err := s.hc.Get(addr)
c.Assert(err, IsNil)
sc := &server.ScheduleConfig{}
readJSON(resp.Body, sc)

sc.MaxStoreDownTime.Duration = time.Second
postData, err := json.Marshal(sc)
postURL := []string{cfgs[rand.Intn(len(cfgs))].ClientUrls, apiPrefix, "/api/v1/config/schedule"}
postAddr := mustUnixAddrToHTTPAddr(c, strings.Join(postURL, ""))
postAddr := cfgs[rand.Intn(len(cfgs))].ClientUrls + apiPrefix + "/api/v1/config/schedule"
err = postJSON(s.hc, postAddr, postData)
c.Assert(err, IsNil)

Expand All @@ -113,8 +109,7 @@ func (s *testConfigSuite) TestConfigReplication(c *C) {
cfgs, _, clean := mustNewCluster(c, num)
defer clean()

parts := []string{cfgs[rand.Intn(len(cfgs))].ClientUrls, apiPrefix, "/api/v1/config/replicate"}
addr := mustUnixAddrToHTTPAddr(c, strings.Join(parts, ""))
addr := cfgs[rand.Intn(len(cfgs))].ClientUrls + apiPrefix + "/api/v1/config/replicate"
resp, err := s.hc.Get(addr)
c.Assert(err, IsNil)

Expand All @@ -126,8 +121,7 @@ func (s *testConfigSuite) TestConfigReplication(c *C) {

rc1 := map[string]int{"max-replicas": 5}
postData, err := json.Marshal(rc1)
postURL := []string{cfgs[rand.Intn(len(cfgs))].ClientUrls, apiPrefix, "/api/v1/config/replicate"}
postAddr := mustUnixAddrToHTTPAddr(c, strings.Join(postURL, ""))
postAddr := cfgs[rand.Intn(len(cfgs))].ClientUrls + apiPrefix + "/api/v1/config/replicate"
err = postJSON(s.hc, postAddr, postData)
c.Assert(err, IsNil)
rc.LocationLabels = []string{"zone", "rack"}
Expand Down
5 changes: 2 additions & 3 deletions server/api/history_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,10 @@ func (s *testHistorySuite) SetUpSuite(c *C) {
mustWaitLeader(c, []*server.Server{s.svr})

addr := s.svr.GetAddr()
httpAddr := mustUnixAddrToHTTPAddr(c, addr)
s.urlPrefix = fmt.Sprintf("%s%s/api/v1", httpAddr, apiPrefix)
s.urlPrefix = fmt.Sprintf("%s%s/api/v1", addr, apiPrefix)

mustBootstrapCluster(c, s.svr)
s.cli = newUnixSocketClient()
s.cli = newHTTPClient()

s.grpcPDClient = mustNewGrpcClient(c, s.svr.GetAddr())
regionHeartbeat, err := s.grpcPDClient.RegionHeartbeat(context.Background())
Expand Down
3 changes: 1 addition & 2 deletions server/api/label_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,8 +102,7 @@ func (s *testLabelsStoreSuite) SetUpSuite(c *C) {
mustWaitLeader(c, []*server.Server{s.svr})

addr := s.svr.GetAddr()
httpAddr := mustUnixAddrToHTTPAddr(c, addr)
s.urlPrefix = fmt.Sprintf("%s%s/api/v1", httpAddr, apiPrefix)
s.urlPrefix = fmt.Sprintf("%s%s/api/v1", addr, apiPrefix)

mustBootstrapCluster(c, s.svr)
for _, store := range s.stores {
Expand Down
14 changes: 5 additions & 9 deletions server/api/member_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ type testMemberAPISuite struct {
}

func (s *testMemberAPISuite) SetUpSuite(c *C) {
s.hc = newUnixSocketClient()
s.hc = newHTTPClient()
}

func relaxEqualStings(c *C, a, b []string) {
Expand Down Expand Up @@ -73,8 +73,7 @@ func (s *testMemberAPISuite) TestMemberList(c *C) {
cfgs, _, clean := mustNewCluster(c, num)
defer clean()

parts := []string{cfgs[rand.Intn(len(cfgs))].ClientUrls, apiPrefix, "/api/v1/members"}
addr := mustUnixAddrToHTTPAddr(c, strings.Join(parts, ""))
addr := cfgs[rand.Intn(len(cfgs))].ClientUrls + apiPrefix + "/api/v1/members"
resp, err := s.hc.Get(addr)
c.Assert(err, IsNil)
buf, err := ioutil.ReadAll(resp.Body)
Expand Down Expand Up @@ -134,8 +133,7 @@ func (s *testMemberAPISuite) TestMemberDelete(c *C) {
}

for _, t := range table {
parts := []string{clientURL, apiPrefix, "/api/v1/members/", t.name}
addr := mustUnixAddrToHTTPAddr(c, strings.Join(parts, ""))
addr := clientURL + apiPrefix + "/api/v1/members/" + t.name
req, err := http.NewRequest("DELETE", addr, nil)
c.Assert(err, IsNil)
resp, err := s.hc.Do(req)
Expand All @@ -147,8 +145,7 @@ func (s *testMemberAPISuite) TestMemberDelete(c *C) {
}

for i := 0; i < 10; i++ {
parts := []string{clientURL, apiPrefix, "/api/v1/members"}
addr := mustUnixAddrToHTTPAddr(c, strings.Join(parts, ""))
addr := clientURL + apiPrefix + "/api/v1/members"
resp, err := s.hc.Get(addr)
c.Assert(err, IsNil)
defer resp.Body.Close()
Expand All @@ -169,8 +166,7 @@ func (s *testMemberAPISuite) TestMemberLeader(c *C) {
leader, err := svrs[0].GetLeader()
c.Assert(err, IsNil)

parts := []string{cfgs[rand.Intn(len(cfgs))].ClientUrls, apiPrefix, "/api/v1/leader"}
addr := mustUnixAddrToHTTPAddr(c, strings.Join(parts, ""))
addr := cfgs[rand.Intn(len(cfgs))].ClientUrls + apiPrefix + "/api/v1/leader"
c.Assert(err, IsNil)
resp, err := s.hc.Get(addr)
c.Assert(err, IsNil)
Expand Down
Loading

0 comments on commit f4fbc5a

Please sign in to comment.