From b6dc31d3fe10c0a00c9f50770363183debe79428 Mon Sep 17 00:00:00 2001 From: Darien Raymond Date: Thu, 11 Oct 2018 20:43:37 +0200 Subject: [PATCH] first part to move feature interfaces into dedicated directory --- app/commander/commander.go | 4 +- app/commander/outbound.go | 10 ++--- app/dispatcher/default.go | 23 +++++----- app/dispatcher/sniffer.go | 6 +-- app/dns/udpns.go | 4 +- app/proxyman/command/command.go | 7 +-- app/proxyman/inbound/inbound.go | 4 +- app/proxyman/inbound/worker.go | 5 ++- app/proxyman/mux/mux.go | 24 +++++----- app/proxyman/outbound/handler.go | 12 ++--- app/proxyman/outbound/handler_test.go | 6 +-- app/proxyman/outbound/outbound.go | 27 ++++++------ app/router/router.go | 11 ++--- common/common.go | 7 +++ common/protocol/bittorrent/bittorrent.go | 4 +- common/protocol/http/sniff.go | 6 +-- common/protocol/tls/sniff.go | 16 +++---- common/vio/link.go | 9 ++++ features/feature.go | 9 ++++ features/outbound/outbound.go | 30 +++++++++++++ features/routing/dispatcher.go | 18 ++++++++ features/routing/router.go | 15 +++++++ network.go | 54 +++++++---------------- proxy/blackhole/blackhole.go | 4 +- proxy/dokodemo/dokodemo.go | 3 +- proxy/freedom/freedom.go | 3 +- proxy/http/server.go | 7 +-- proxy/mtproto/client.go | 4 +- proxy/mtproto/server.go | 3 +- proxy/proxy.go | 7 +-- proxy/shadowsocks/client.go | 3 +- proxy/shadowsocks/server.go | 7 +-- proxy/socks/client.go | 3 +- proxy/socks/server.go | 9 ++-- proxy/vmess/inbound/inbound.go | 3 +- proxy/vmess/outbound/outbound.go | 3 +- router.go | 44 ++++-------------- transport/internet/udp/dispatcher.go | 9 ++-- transport/internet/udp/dispatcher_test.go | 10 ++--- v2ray.go | 26 ++++++----- 40 files changed, 263 insertions(+), 196 deletions(-) create mode 100644 common/vio/link.go create mode 100644 features/feature.go create mode 100644 features/outbound/outbound.go create mode 100644 features/routing/dispatcher.go create mode 100644 features/routing/router.go diff --git a/app/commander/commander.go b/app/commander/commander.go index 26794d04c2..0048128f84 100644 --- a/app/commander/commander.go +++ b/app/commander/commander.go @@ -8,9 +8,11 @@ import ( "sync" "google.golang.org/grpc" + "v2ray.com/core" "v2ray.com/core/common" "v2ray.com/core/common/signal/done" + "v2ray.com/core/features/outbound" ) // Commander is a V2Ray feature that provides gRPC methods to external clients. @@ -19,7 +21,7 @@ type Commander struct { server *grpc.Server config Config v *core.Instance - ohm core.OutboundHandlerManager + ohm outbound.HandlerManager } // NewCommander creates a new Commander based on the given config. diff --git a/app/commander/outbound.go b/app/commander/outbound.go index 4af1c65244..e69659c680 100644 --- a/app/commander/outbound.go +++ b/app/commander/outbound.go @@ -4,10 +4,10 @@ import ( "context" "sync" - "v2ray.com/core" "v2ray.com/core/common" "v2ray.com/core/common/net" "v2ray.com/core/common/signal/done" + "v2ray.com/core/common/vio" "v2ray.com/core/transport/pipe" ) @@ -60,7 +60,7 @@ func (l *OutboundListener) Addr() net.Addr { } } -// Outbound is a core.OutboundHandler that handles gRPC connections. +// Outbound is a outbound.Handler that handles gRPC connections. type Outbound struct { tag string listener *OutboundListener @@ -68,8 +68,8 @@ type Outbound struct { closed bool } -// Dispatch implements core.OutboundHandler. -func (co *Outbound) Dispatch(ctx context.Context, link *core.Link) { +// Dispatch implements outbound.Handler. +func (co *Outbound) Dispatch(ctx context.Context, link *vio.Link) { co.access.RLock() if co.closed { @@ -86,7 +86,7 @@ func (co *Outbound) Dispatch(ctx context.Context, link *core.Link) { <-closeSignal.Wait() } -// Tag implements core.OutboundHandler. +// Tag implements outbound.Handler. func (co *Outbound) Tag() string { return co.tag } diff --git a/app/dispatcher/default.go b/app/dispatcher/default.go index fe020215bd..553ab77271 100644 --- a/app/dispatcher/default.go +++ b/app/dispatcher/default.go @@ -16,6 +16,9 @@ import ( "v2ray.com/core/common/protocol" "v2ray.com/core/common/session" "v2ray.com/core/common/stats" + "v2ray.com/core/common/vio" + "v2ray.com/core/features/outbound" + "v2ray.com/core/features/routing" "v2ray.com/core/transport/pipe" ) @@ -79,8 +82,8 @@ func (r *cachedReader) CloseError() { // DefaultDispatcher is a default implementation of Dispatcher. type DefaultDispatcher struct { - ohm core.OutboundHandlerManager - router core.Router + ohm outbound.HandlerManager + router routing.Router policy core.PolicyManager stats core.StatManager } @@ -95,7 +98,7 @@ func NewDefaultDispatcher(ctx context.Context, config *Config) (*DefaultDispatch stats: v.Stats(), } - if err := v.RegisterFeature((*core.Dispatcher)(nil), d); err != nil { + if err := v.RegisterFeature((*routing.Dispatcher)(nil), d); err != nil { return nil, newError("unable to register Dispatcher").Base(err) } return d, nil @@ -109,17 +112,17 @@ func (*DefaultDispatcher) Start() error { // Close implements common.Closable. func (*DefaultDispatcher) Close() error { return nil } -func (d *DefaultDispatcher) getLink(ctx context.Context) (*core.Link, *core.Link) { +func (d *DefaultDispatcher) getLink(ctx context.Context) (*vio.Link, *vio.Link) { opt := pipe.OptionsFromContext(ctx) uplinkReader, uplinkWriter := pipe.New(opt...) downlinkReader, downlinkWriter := pipe.New(opt...) - inboundLink := &core.Link{ + inboundLink := &vio.Link{ Reader: downlinkReader, Writer: uplinkWriter, } - outboundLink := &core.Link{ + outboundLink := &vio.Link{ Reader: uplinkReader, Writer: downlinkWriter, } @@ -159,8 +162,8 @@ func shouldOverride(result SniffResult, domainOverride []string) bool { return false } -// Dispatch implements core.Dispatcher. -func (d *DefaultDispatcher) Dispatch(ctx context.Context, destination net.Destination) (*core.Link, error) { +// Dispatch implements routing.Dispatcher. +func (d *DefaultDispatcher) Dispatch(ctx context.Context, destination net.Destination) (*vio.Link, error) { if !destination.IsValid() { panic("Dispatcher: Invalid destination.") } @@ -214,7 +217,7 @@ func sniffer(ctx context.Context, cReader *cachedReader) (SniffResult, error) { cReader.Cache(payload) if !payload.IsEmpty() { result, err := sniffer.Sniff(payload.Bytes()) - if err != core.ErrNoClue { + if err != common.ErrNoClue { return result, err } } @@ -225,7 +228,7 @@ func sniffer(ctx context.Context, cReader *cachedReader) (SniffResult, error) { } } -func (d *DefaultDispatcher) routedDispatch(ctx context.Context, link *core.Link, destination net.Destination) { +func (d *DefaultDispatcher) routedDispatch(ctx context.Context, link *vio.Link, destination net.Destination) { dispatcher := d.ohm.GetDefaultHandler() if d.router != nil { if tag, err := d.router.PickRoute(ctx); err == nil { diff --git a/app/dispatcher/sniffer.go b/app/dispatcher/sniffer.go index 8301c5dd2d..665a413c8f 100644 --- a/app/dispatcher/sniffer.go +++ b/app/dispatcher/sniffer.go @@ -1,7 +1,7 @@ package dispatcher import ( - "v2ray.com/core" + "v2ray.com/core/common" "v2ray.com/core/common/protocol/bittorrent" "v2ray.com/core/common/protocol/http" "v2ray.com/core/common/protocol/tls" @@ -34,7 +34,7 @@ func (s *Sniffer) Sniff(payload []byte) (SniffResult, error) { var pendingSniffer []protocolSniffer for _, s := range s.sniffer { result, err := s(payload) - if err == core.ErrNoClue { + if err == common.ErrNoClue { pendingSniffer = append(pendingSniffer, s) continue } @@ -46,7 +46,7 @@ func (s *Sniffer) Sniff(payload []byte) (SniffResult, error) { if len(pendingSniffer) > 0 { s.sniffer = pendingSniffer - return nil, core.ErrNoClue + return nil, common.ErrNoClue } return nil, errUnknownContent diff --git a/app/dns/udpns.go b/app/dns/udpns.go index fe1513c08d..4ccb4b59d2 100644 --- a/app/dns/udpns.go +++ b/app/dns/udpns.go @@ -7,9 +7,9 @@ import ( "time" "v2ray.com/core/common/session" + "v2ray.com/core/features/routing" "github.com/miekg/dns" - "v2ray.com/core" "v2ray.com/core/common" "v2ray.com/core/common/buf" "v2ray.com/core/common/net" @@ -48,7 +48,7 @@ type ClassicNameServer struct { clientIP net.IP } -func NewClassicNameServer(address net.Destination, dispatcher core.Dispatcher, clientIP net.IP) *ClassicNameServer { +func NewClassicNameServer(address net.Destination, dispatcher routing.Dispatcher, clientIP net.IP) *ClassicNameServer { s := &ClassicNameServer{ address: address, ips: make(map[string][]IPRecord), diff --git a/app/proxyman/command/command.go b/app/proxyman/command/command.go index 2b9b92c000..30fb2e480b 100755 --- a/app/proxyman/command/command.go +++ b/app/proxyman/command/command.go @@ -6,6 +6,7 @@ import ( grpc "google.golang.org/grpc" "v2ray.com/core" "v2ray.com/core/common" + "v2ray.com/core/features/outbound" "v2ray.com/core/proxy" ) @@ -18,7 +19,7 @@ type InboundOperation interface { // OutboundOperation is the interface for operations that applies to outbound handlers. type OutboundOperation interface { // ApplyOutbound applies this operation to the given outbound handler. - ApplyOutbound(context.Context, core.OutboundHandler) error + ApplyOutbound(context.Context, outbound.Handler) error } func getInbound(handler core.InboundHandler) (proxy.Inbound, error) { @@ -62,7 +63,7 @@ func (op *RemoveUserOperation) ApplyInbound(ctx context.Context, handler core.In type handlerServer struct { s *core.Instance ihm core.InboundHandlerManager - ohm core.OutboundHandlerManager + ohm outbound.HandlerManager } func (s *handlerServer) AddInbound(ctx context.Context, request *AddInboundRequest) (*AddInboundResponse, error) { @@ -104,7 +105,7 @@ func (s *handlerServer) AddOutbound(ctx context.Context, request *AddOutboundReq if err != nil { return nil, err } - handler, ok := rawHandler.(core.OutboundHandler) + handler, ok := rawHandler.(outbound.Handler) if !ok { return nil, newError("not an OutboundHandler.") } diff --git a/app/proxyman/inbound/inbound.go b/app/proxyman/inbound/inbound.go index c64367136a..c249bcf4ac 100644 --- a/app/proxyman/inbound/inbound.go +++ b/app/proxyman/inbound/inbound.go @@ -67,7 +67,7 @@ func (m *Manager) GetHandler(ctx context.Context, tag string) (core.InboundHandl // RemoveHandler implements core.InboundHandlerManager. func (m *Manager) RemoveHandler(ctx context.Context, tag string) error { if len(tag) == 0 { - return core.ErrNoClue + return common.ErrNoClue } m.access.Lock() @@ -81,7 +81,7 @@ func (m *Manager) RemoveHandler(ctx context.Context, tag string) error { return nil } - return core.ErrNoClue + return common.ErrNoClue } // Start implements common.Runnable. diff --git a/app/proxyman/inbound/worker.go b/app/proxyman/inbound/worker.go index 965d2025fe..b7081f582d 100644 --- a/app/proxyman/inbound/worker.go +++ b/app/proxyman/inbound/worker.go @@ -15,6 +15,7 @@ import ( "v2ray.com/core/common/session" "v2ray.com/core/common/signal/done" "v2ray.com/core/common/task" + "v2ray.com/core/features/routing" "v2ray.com/core/proxy" "v2ray.com/core/transport/internet" "v2ray.com/core/transport/internet/tcp" @@ -36,7 +37,7 @@ type tcpWorker struct { stream *internet.MemoryStreamConfig recvOrigDest bool tag string - dispatcher core.Dispatcher + dispatcher routing.Dispatcher sniffingConfig *proxyman.SniffingConfig uplinkCounter core.StatCounter downlinkCounter core.StatCounter @@ -223,7 +224,7 @@ type udpWorker struct { port net.Port tag string stream *internet.MemoryStreamConfig - dispatcher core.Dispatcher + dispatcher routing.Dispatcher uplinkCounter core.StatCounter downlinkCounter core.StatCounter diff --git a/app/proxyman/mux/mux.go b/app/proxyman/mux/mux.go index cdcd637269..71284872e4 100644 --- a/app/proxyman/mux/mux.go +++ b/app/proxyman/mux/mux.go @@ -18,6 +18,8 @@ import ( "v2ray.com/core/common/protocol" "v2ray.com/core/common/session" "v2ray.com/core/common/signal/done" + "v2ray.com/core/common/vio" + "v2ray.com/core/features/routing" "v2ray.com/core/proxy" "v2ray.com/core/transport/pipe" ) @@ -42,7 +44,7 @@ func NewClientManager(p proxy.Outbound, d proxy.Dialer, c *proxyman.Multiplexing } } -func (m *ClientManager) Dispatch(ctx context.Context, link *core.Link) error { +func (m *ClientManager) Dispatch(ctx context.Context, link *vio.Link) error { m.access.Lock() defer m.access.Unlock() @@ -77,7 +79,7 @@ func (m *ClientManager) onClientFinish() { type Client struct { sessionManager *SessionManager - link core.Link + link vio.Link done *done.Instance manager *ClientManager concurrency uint32 @@ -99,7 +101,7 @@ func NewClient(pctx context.Context, p proxy.Outbound, dialer proxy.Dialer, m *C c := &Client{ sessionManager: NewSessionManager(), - link: core.Link{ + link: vio.Link{ Reader: downlinkReader, Writer: upLinkWriter, }, @@ -109,7 +111,7 @@ func NewClient(pctx context.Context, p proxy.Outbound, dialer proxy.Dialer, m *C } go func() { - if err := p.Process(ctx, &core.Link{Reader: uplinkReader, Writer: downlinkWriter}, dialer); err != nil { + if err := p.Process(ctx, &vio.Link{Reader: uplinkReader, Writer: downlinkWriter}, dialer); err != nil { errors.New("failed to handler mux client connection").Base(err).WriteToLog() } common.Must(c.done.Close()) @@ -188,7 +190,7 @@ func fetchInput(ctx context.Context, s *Session, output buf.Writer) { } } -func (m *Client) Dispatch(ctx context.Context, link *core.Link) bool { +func (m *Client) Dispatch(ctx context.Context, link *vio.Link) bool { sm := m.sessionManager if sm.Size() >= int(m.concurrency) || sm.Count() >= maxTotal { return false @@ -297,7 +299,7 @@ func (m *Client) fetchOutput() { } type Server struct { - dispatcher core.Dispatcher + dispatcher routing.Dispatcher } // NewServer creates a new mux.Server. @@ -308,7 +310,7 @@ func NewServer(ctx context.Context) *Server { return s } -func (s *Server) Dispatch(ctx context.Context, dest net.Destination) (*core.Link, error) { +func (s *Server) Dispatch(ctx context.Context, dest net.Destination) (*vio.Link, error) { if dest.Address != muxCoolAddress { return s.dispatcher.Dispatch(ctx, dest) } @@ -319,14 +321,14 @@ func (s *Server) Dispatch(ctx context.Context, dest net.Destination) (*core.Link worker := &ServerWorker{ dispatcher: s.dispatcher, - link: &core.Link{ + link: &vio.Link{ Reader: uplinkReader, Writer: downlinkWriter, }, sessionManager: NewSessionManager(), } go worker.run(ctx) - return &core.Link{Reader: downlinkReader, Writer: uplinkWriter}, nil + return &vio.Link{Reader: downlinkReader, Writer: uplinkWriter}, nil } // Start implements common.Runnable. @@ -340,8 +342,8 @@ func (s *Server) Close() error { } type ServerWorker struct { - dispatcher core.Dispatcher - link *core.Link + dispatcher routing.Dispatcher + link *vio.Link sessionManager *SessionManager } diff --git a/app/proxyman/outbound/handler.go b/app/proxyman/outbound/handler.go index 10300e42f5..fbab94399a 100644 --- a/app/proxyman/outbound/handler.go +++ b/app/proxyman/outbound/handler.go @@ -9,6 +9,8 @@ import ( "v2ray.com/core/common" "v2ray.com/core/common/net" "v2ray.com/core/common/session" + "v2ray.com/core/common/vio" + "v2ray.com/core/features/outbound" "v2ray.com/core/proxy" "v2ray.com/core/transport/internet" "v2ray.com/core/transport/pipe" @@ -19,11 +21,11 @@ type Handler struct { senderSettings *proxyman.SenderConfig streamSettings *internet.MemoryStreamConfig proxy proxy.Outbound - outboundManager core.OutboundHandlerManager + outboundManager outbound.HandlerManager mux *mux.ClientManager } -func NewHandler(ctx context.Context, config *core.OutboundHandlerConfig) (core.OutboundHandler, error) { +func NewHandler(ctx context.Context, config *core.OutboundHandlerConfig) (outbound.Handler, error) { v := core.MustFromContext(ctx) h := &Handler{ config: config, @@ -75,13 +77,13 @@ func NewHandler(ctx context.Context, config *core.OutboundHandlerConfig) (core.O return h, nil } -// Tag implements core.OutboundHandler. +// Tag implements outbound.Handler. func (h *Handler) Tag() string { return h.config.Tag } // Dispatch implements proxy.Outbound.Dispatch. -func (h *Handler) Dispatch(ctx context.Context, link *core.Link) { +func (h *Handler) Dispatch(ctx context.Context, link *vio.Link) { if h.mux != nil { if err := h.mux.Dispatch(ctx, link); err != nil { newError("failed to process mux outbound traffic").Base(err).WriteToLog(session.ExportIDToError(ctx)) @@ -115,7 +117,7 @@ func (h *Handler) Dial(ctx context.Context, dest net.Destination) (internet.Conn uplinkReader, uplinkWriter := pipe.New(opts...) downlinkReader, downlinkWriter := pipe.New(opts...) - go handler.Dispatch(ctx, &core.Link{Reader: uplinkReader, Writer: downlinkWriter}) + go handler.Dispatch(ctx, &vio.Link{Reader: uplinkReader, Writer: downlinkWriter}) return net.NewConnection(net.ConnectionInputMulti(uplinkWriter), net.ConnectionOutputMulti(downlinkReader)), nil } diff --git a/app/proxyman/outbound/handler_test.go b/app/proxyman/outbound/handler_test.go index bd15e03495..3423ad8d1e 100644 --- a/app/proxyman/outbound/handler_test.go +++ b/app/proxyman/outbound/handler_test.go @@ -3,14 +3,14 @@ package outbound_test import ( "testing" - "v2ray.com/core" . "v2ray.com/core/app/proxyman/outbound" + "v2ray.com/core/features/outbound" . "v2ray.com/ext/assert" ) func TestInterfaces(t *testing.T) { assert := With(t) - assert((*Handler)(nil), Implements, (*core.OutboundHandler)(nil)) - assert((*Manager)(nil), Implements, (*core.OutboundHandlerManager)(nil)) + assert((*Handler)(nil), Implements, (*outbound.Handler)(nil)) + assert((*Manager)(nil), Implements, (*outbound.HandlerManager)(nil)) } diff --git a/app/proxyman/outbound/outbound.go b/app/proxyman/outbound/outbound.go index 6c5df2759b..adae1f7133 100644 --- a/app/proxyman/outbound/outbound.go +++ b/app/proxyman/outbound/outbound.go @@ -9,24 +9,25 @@ import ( "v2ray.com/core" "v2ray.com/core/app/proxyman" "v2ray.com/core/common" + "v2ray.com/core/features/outbound" ) // Manager is to manage all outbound handlers. type Manager struct { access sync.RWMutex - defaultHandler core.OutboundHandler - taggedHandler map[string]core.OutboundHandler - untaggedHandlers []core.OutboundHandler + defaultHandler outbound.Handler + taggedHandler map[string]outbound.Handler + untaggedHandlers []outbound.Handler running bool } // New creates a new Manager. func New(ctx context.Context, config *proxyman.OutboundConfig) (*Manager, error) { m := &Manager{ - taggedHandler: make(map[string]core.OutboundHandler), + taggedHandler: make(map[string]outbound.Handler), } v := core.MustFromContext(ctx) - if err := v.RegisterFeature((*core.OutboundHandlerManager)(nil), m); err != nil { + if err := v.RegisterFeature((*outbound.HandlerManager)(nil), m); err != nil { return nil, newError("unable to register OutboundHandlerManager").Base(err) } return m, nil @@ -72,8 +73,8 @@ func (m *Manager) Close() error { return nil } -// GetDefaultHandler implements core.OutboundHandlerManager. -func (m *Manager) GetDefaultHandler() core.OutboundHandler { +// GetDefaultHandler implements outbound.HandlerManager. +func (m *Manager) GetDefaultHandler() outbound.Handler { m.access.RLock() defer m.access.RUnlock() @@ -83,8 +84,8 @@ func (m *Manager) GetDefaultHandler() core.OutboundHandler { return m.defaultHandler } -// GetHandler implements core.OutboundHandlerManager. -func (m *Manager) GetHandler(tag string) core.OutboundHandler { +// GetHandler implements outbound.HandlerManager. +func (m *Manager) GetHandler(tag string) outbound.Handler { m.access.RLock() defer m.access.RUnlock() if handler, found := m.taggedHandler[tag]; found { @@ -93,8 +94,8 @@ func (m *Manager) GetHandler(tag string) core.OutboundHandler { return nil } -// AddHandler implements core.OutboundHandlerManager. -func (m *Manager) AddHandler(ctx context.Context, handler core.OutboundHandler) error { +// AddHandler implements outbound.HandlerManager. +func (m *Manager) AddHandler(ctx context.Context, handler outbound.Handler) error { m.access.Lock() defer m.access.Unlock() @@ -116,10 +117,10 @@ func (m *Manager) AddHandler(ctx context.Context, handler core.OutboundHandler) return nil } -// RemoveHandler implements core.OutboundHandlerManager. +// RemoveHandler implements outbound.HandlerManager. func (m *Manager) RemoveHandler(ctx context.Context, tag string) error { if len(tag) == 0 { - return core.ErrNoClue + return common.ErrNoClue } m.access.Lock() defer m.access.Unlock() diff --git a/app/router/router.go b/app/router/router.go index ae7f641271..1be4778be6 100644 --- a/app/router/router.go +++ b/app/router/router.go @@ -6,6 +6,7 @@ import ( "context" "v2ray.com/core/common/session" + "v2ray.com/core/features/routing" "v2ray.com/core" "v2ray.com/core/common" @@ -13,7 +14,7 @@ import ( "v2ray.com/core/proxy" ) -// Router is an implementation of core.Router. +// Router is an implementation of routing.Router. type Router struct { domainStrategy Config_DomainStrategy rules []Rule @@ -38,7 +39,7 @@ func NewRouter(ctx context.Context, config *Config) (*Router, error) { r.rules[idx].Condition = cond } - if err := v.RegisterFeature((*core.Router)(nil), r); err != nil { + if err := v.RegisterFeature((*routing.Router)(nil), r); err != nil { return nil, newError("unable to register Router").Base(err) } return r, nil @@ -72,7 +73,7 @@ func (r *ipResolver) Resolve() []net.Address { return r.ip } -// PickRoute implements core.Router. +// PickRoute implements routing.Router. func (r *Router) PickRoute(ctx context.Context) (string, error) { resolver := &ipResolver{ dns: r.dns, @@ -93,7 +94,7 @@ func (r *Router) PickRoute(ctx context.Context) (string, error) { } if outbound == nil || !outbound.Target.IsValid() { - return "", core.ErrNoClue + return "", common.ErrNoClue } dest := outbound.Target @@ -110,7 +111,7 @@ func (r *Router) PickRoute(ctx context.Context) (string, error) { } } - return "", core.ErrNoClue + return "", common.ErrNoClue } // Start implements common.Runnable. diff --git a/common/common.go b/common/common.go index f7f8b02c2b..b67f187095 100644 --- a/common/common.go +++ b/common/common.go @@ -2,8 +2,15 @@ // See each sub-package for detail. package common +import "v2ray.com/core/common/errors" + //go:generate errorgen +var ( + // ErrNoClue is for the situation that existing information is not enough to make a decision. For example, Router may return this error when there is no suitable route. + ErrNoClue = errors.New("not enough information for making a decision") +) + // Must panics if err is not nil. func Must(err error) { if err != nil { diff --git a/common/protocol/bittorrent/bittorrent.go b/common/protocol/bittorrent/bittorrent.go index fbb8157024..a0285f23d9 100644 --- a/common/protocol/bittorrent/bittorrent.go +++ b/common/protocol/bittorrent/bittorrent.go @@ -3,7 +3,7 @@ package bittorrent import ( "errors" - "v2ray.com/core" + "v2ray.com/core/common" ) type SniffHeader struct { @@ -21,7 +21,7 @@ var errNotBittorrent = errors.New("not bittorrent header") func SniffBittorrent(b []byte) (*SniffHeader, error) { if len(b) < 20 { - return nil, core.ErrNoClue + return nil, common.ErrNoClue } if b[0] == 19 && string(b[1:20]) == "BitTorrent protocol" { diff --git a/common/protocol/http/sniff.go b/common/protocol/http/sniff.go index 70432ab534..6aadb48f80 100644 --- a/common/protocol/http/sniff.go +++ b/common/protocol/http/sniff.go @@ -5,7 +5,7 @@ import ( "errors" "strings" - "v2ray.com/core" + "v2ray.com/core/common" ) type version byte @@ -48,7 +48,7 @@ func beginWithHTTPMethod(b []byte) error { } if len(b) < len(m) { - return core.ErrNoClue + return common.ErrNoClue } } @@ -86,5 +86,5 @@ func SniffHTTP(b []byte) (*SniffHeader, error) { return sh, nil } - return nil, core.ErrNoClue + return nil, common.ErrNoClue } diff --git a/common/protocol/tls/sniff.go b/common/protocol/tls/sniff.go index 36ee1792eb..784f2a5194 100644 --- a/common/protocol/tls/sniff.go +++ b/common/protocol/tls/sniff.go @@ -4,7 +4,7 @@ import ( "errors" "strings" - "v2ray.com/core" + "v2ray.com/core/common" "v2ray.com/core/common/serial" ) @@ -31,15 +31,15 @@ func IsValidTLSVersion(major, minor byte) bool { // https://github.com/golang/go/blob/master/src/crypto/tls/handshake_messages.go#L300 func ReadClientHello(data []byte, h *SniffHeader) error { if len(data) < 42 { - return core.ErrNoClue + return common.ErrNoClue } sessionIDLen := int(data[38]) if sessionIDLen > 32 || len(data) < 39+sessionIDLen { - return core.ErrNoClue + return common.ErrNoClue } data = data[39+sessionIDLen:] if len(data) < 2 { - return core.ErrNoClue + return common.ErrNoClue } // cipherSuiteLen is the number of bytes of cipher suite numbers. Since // they are uint16s, the number must be even. @@ -49,11 +49,11 @@ func ReadClientHello(data []byte, h *SniffHeader) error { } data = data[2+cipherSuiteLen:] if len(data) < 1 { - return core.ErrNoClue + return common.ErrNoClue } compressionMethodsLen := int(data[0]) if len(data) < 1+compressionMethodsLen { - return core.ErrNoClue + return common.ErrNoClue } data = data[1+compressionMethodsLen:] @@ -124,7 +124,7 @@ func ReadClientHello(data []byte, h *SniffHeader) error { func SniffTLS(b []byte) (*SniffHeader, error) { if len(b) < 5 { - return nil, core.ErrNoClue + return nil, common.ErrNoClue } if b[0] != 0x16 /* TLS Handshake */ { @@ -135,7 +135,7 @@ func SniffTLS(b []byte) (*SniffHeader, error) { } headerLen := int(serial.BytesToUint16(b[3:5])) if 5+headerLen > len(b) { - return nil, core.ErrNoClue + return nil, common.ErrNoClue } h := &SniffHeader{} diff --git a/common/vio/link.go b/common/vio/link.go new file mode 100644 index 0000000000..2fe8dfd45c --- /dev/null +++ b/common/vio/link.go @@ -0,0 +1,9 @@ +package vio + +import "v2ray.com/core/common/buf" + +// Link is a utility for connecting between an inbound and an outbound proxy handler. +type Link struct { + Reader buf.Reader + Writer buf.Writer +} diff --git a/features/feature.go b/features/feature.go new file mode 100644 index 0000000000..cc52c18d39 --- /dev/null +++ b/features/feature.go @@ -0,0 +1,9 @@ +package features + +import "v2ray.com/core/common" + +// Feature is the interface for V2Ray features. All features must implement this interface. +// All existing features have an implementation in app directory. These features can be replaced by third-party ones. +type Feature interface { + common.Runnable +} diff --git a/features/outbound/outbound.go b/features/outbound/outbound.go new file mode 100644 index 0000000000..d9f5bb2f5a --- /dev/null +++ b/features/outbound/outbound.go @@ -0,0 +1,30 @@ +package outbound + +import ( + "context" + + "v2ray.com/core/common" + "v2ray.com/core/common/vio" + "v2ray.com/core/features" +) + +// Handler is the interface for handlers that process outbound connections. +type Handler interface { + common.Runnable + Tag() string + Dispatch(ctx context.Context, link *vio.Link) +} + +// HandlerManager is a feature that manages outbound.Handlers. +type HandlerManager interface { + features.Feature + // GetHandler returns an outbound.Handler for the given tag. + GetHandler(tag string) Handler + // GetDefaultHandler returns the default outbound.Handler. It is usually the first outbound.Handler specified in the configuration. + GetDefaultHandler() Handler + // AddHandler adds a handler into this outbound.HandlerManager. + AddHandler(ctx context.Context, handler Handler) error + + // RemoveHandler removes a handler from outbound.HandlerManager. + RemoveHandler(ctx context.Context, tag string) error +} diff --git a/features/routing/dispatcher.go b/features/routing/dispatcher.go new file mode 100644 index 0000000000..ac3177a090 --- /dev/null +++ b/features/routing/dispatcher.go @@ -0,0 +1,18 @@ +package routing + +import ( + "context" + + "v2ray.com/core/common/net" + "v2ray.com/core/common/vio" + "v2ray.com/core/features" +) + +// Dispatcher is a feature that dispatches inbound requests to outbound handlers based on rules. +// Dispatcher is required to be registered in a V2Ray instance to make V2Ray function properly. +type Dispatcher interface { + features.Feature + + // Dispatch returns a Ray for transporting data for the given request. + Dispatch(ctx context.Context, dest net.Destination) (*vio.Link, error) +} diff --git a/features/routing/router.go b/features/routing/router.go new file mode 100644 index 0000000000..ba70a37c8d --- /dev/null +++ b/features/routing/router.go @@ -0,0 +1,15 @@ +package routing + +import ( + "context" + + "v2ray.com/core/features" +) + +// Router is a feature to choose an outbound tag for the given request. +type Router interface { + features.Feature + + // PickRoute returns a tag of an OutboundHandler based on the given context. + PickRoute(ctx context.Context) (string, error) +} diff --git a/network.go b/network.go index 2f8513672c..060da26764 100644 --- a/network.go +++ b/network.go @@ -6,6 +6,7 @@ import ( "v2ray.com/core/common" "v2ray.com/core/common/net" + "v2ray.com/core/features/outbound" ) // InboundHandler is the interface for handlers that process inbound connections. @@ -18,13 +19,6 @@ type InboundHandler interface { GetRandomInboundProxy() (interface{}, net.Port, int) } -// OutboundHandler is the interface for handlers that process outbound connections. -type OutboundHandler interface { - common.Runnable - Tag() string - Dispatch(ctx context.Context, link *Link) -} - // InboundHandlerManager is a feature that manages InboundHandlers. type InboundHandlerManager interface { Feature @@ -94,77 +88,63 @@ func (m *syncInboundHandlerManager) Set(manager InboundHandlerManager) { m.InboundHandlerManager = manager } -// OutboundHandlerManager is a feature that manages OutboundHandlers. -type OutboundHandlerManager interface { - Feature - // GetHandler returns an OutboundHandler for the given tag. - GetHandler(tag string) OutboundHandler - // GetDefaultHandler returns the default OutboundHandler. It is usually the first OutboundHandler specified in the configuration. - GetDefaultHandler() OutboundHandler - // AddHandler adds a handler into this OutboundHandlerManager. - AddHandler(ctx context.Context, handler OutboundHandler) error - - // RemoveHandler removes a handler from OutboundHandlerManager. - RemoveHandler(ctx context.Context, tag string) error -} - type syncOutboundHandlerManager struct { sync.RWMutex - OutboundHandlerManager + outbound.HandlerManager } -func (m *syncOutboundHandlerManager) GetHandler(tag string) OutboundHandler { +func (m *syncOutboundHandlerManager) GetHandler(tag string) outbound.Handler { m.RLock() defer m.RUnlock() - if m.OutboundHandlerManager == nil { + if m.HandlerManager == nil { return nil } - return m.OutboundHandlerManager.GetHandler(tag) + return m.HandlerManager.GetHandler(tag) } -func (m *syncOutboundHandlerManager) GetDefaultHandler() OutboundHandler { +func (m *syncOutboundHandlerManager) GetDefaultHandler() outbound.Handler { m.RLock() defer m.RUnlock() - if m.OutboundHandlerManager == nil { + if m.HandlerManager == nil { return nil } - return m.OutboundHandlerManager.GetDefaultHandler() + return m.HandlerManager.GetDefaultHandler() } -func (m *syncOutboundHandlerManager) AddHandler(ctx context.Context, handler OutboundHandler) error { +func (m *syncOutboundHandlerManager) AddHandler(ctx context.Context, handler outbound.Handler) error { m.RLock() defer m.RUnlock() - if m.OutboundHandlerManager == nil { + if m.HandlerManager == nil { return newError("OutboundHandlerManager not set.").AtError() } - return m.OutboundHandlerManager.AddHandler(ctx, handler) + return m.HandlerManager.AddHandler(ctx, handler) } func (m *syncOutboundHandlerManager) Start() error { m.RLock() defer m.RUnlock() - if m.OutboundHandlerManager == nil { + if m.HandlerManager == nil { return newError("OutboundHandlerManager not set.").AtError() } - return m.OutboundHandlerManager.Start() + return m.HandlerManager.Start() } func (m *syncOutboundHandlerManager) Close() error { m.RLock() defer m.RUnlock() - return common.Close(m.OutboundHandlerManager) + return common.Close(m.HandlerManager) } -func (m *syncOutboundHandlerManager) Set(manager OutboundHandlerManager) { +func (m *syncOutboundHandlerManager) Set(manager outbound.HandlerManager) { if manager == nil { return } @@ -172,6 +152,6 @@ func (m *syncOutboundHandlerManager) Set(manager OutboundHandlerManager) { m.Lock() defer m.Unlock() - common.Close(m.OutboundHandlerManager) // nolint: errcheck - m.OutboundHandlerManager = manager + common.Close(m.HandlerManager) // nolint: errcheck + m.HandlerManager = manager } diff --git a/proxy/blackhole/blackhole.go b/proxy/blackhole/blackhole.go index 4451efd463..60806db23f 100644 --- a/proxy/blackhole/blackhole.go +++ b/proxy/blackhole/blackhole.go @@ -7,8 +7,8 @@ import ( "context" "time" - "v2ray.com/core" "v2ray.com/core/common" + "v2ray.com/core/common/vio" "v2ray.com/core/proxy" "v2ray.com/core/transport/pipe" ) @@ -30,7 +30,7 @@ func New(ctx context.Context, config *Config) (*Handler, error) { } // Process implements OutboundHandler.Dispatch(). -func (h *Handler) Process(ctx context.Context, link *core.Link, dialer proxy.Dialer) error { +func (h *Handler) Process(ctx context.Context, link *vio.Link, dialer proxy.Dialer) error { nBytes := h.response.WriteTo(link.Writer) if nBytes > 0 { // Sleep a little here to make sure the response is sent to client. diff --git a/proxy/dokodemo/dokodemo.go b/proxy/dokodemo/dokodemo.go index 4d372d875c..4b44285fdf 100644 --- a/proxy/dokodemo/dokodemo.go +++ b/proxy/dokodemo/dokodemo.go @@ -13,6 +13,7 @@ import ( "v2ray.com/core/common/session" "v2ray.com/core/common/signal" "v2ray.com/core/common/task" + "v2ray.com/core/features/routing" "v2ray.com/core/transport/internet" "v2ray.com/core/transport/pipe" ) @@ -56,7 +57,7 @@ type hasHandshakeAddress interface { HandshakeAddress() net.Address } -func (d *DokodemoDoor) Process(ctx context.Context, network net.Network, conn internet.Connection, dispatcher core.Dispatcher) error { +func (d *DokodemoDoor) Process(ctx context.Context, network net.Network, conn internet.Connection, dispatcher routing.Dispatcher) error { newError("processing connection from: ", conn.RemoteAddr()).AtDebug().WriteToLog(session.ExportIDToError(ctx)) dest := net.Destination{ Network: network, diff --git a/proxy/freedom/freedom.go b/proxy/freedom/freedom.go index a55619c4ff..f7a1c3fd14 100644 --- a/proxy/freedom/freedom.go +++ b/proxy/freedom/freedom.go @@ -15,6 +15,7 @@ import ( "v2ray.com/core/common/session" "v2ray.com/core/common/signal" "v2ray.com/core/common/task" + "v2ray.com/core/common/vio" "v2ray.com/core/proxy" "v2ray.com/core/transport/internet" ) @@ -75,7 +76,7 @@ func isValidAddress(addr *net.IPOrDomain) bool { } // Process implements proxy.Outbound. -func (h *Handler) Process(ctx context.Context, link *core.Link, dialer proxy.Dialer) error { +func (h *Handler) Process(ctx context.Context, link *vio.Link, dialer proxy.Dialer) error { outbound := session.OutboundFromContext(ctx) if outbound == nil || !outbound.Target.IsValid() { return newError("target not specified.") diff --git a/proxy/http/server.go b/proxy/http/server.go index a3e1e77d40..e7499c39a5 100755 --- a/proxy/http/server.go +++ b/proxy/http/server.go @@ -20,6 +20,7 @@ import ( "v2ray.com/core/common/session" "v2ray.com/core/common/signal" "v2ray.com/core/common/task" + "v2ray.com/core/features/routing" "v2ray.com/core/transport/internet" "v2ray.com/core/transport/pipe" ) @@ -101,7 +102,7 @@ type readerOnly struct { io.Reader } -func (s *Server) Process(ctx context.Context, network net.Network, conn internet.Connection, dispatcher core.Dispatcher) error { +func (s *Server) Process(ctx context.Context, network net.Network, conn internet.Connection, dispatcher routing.Dispatcher) error { reader := bufio.NewReaderSize(readerOnly{conn}, buf.Size) Start: @@ -165,7 +166,7 @@ Start: return err } -func (s *Server) handleConnect(ctx context.Context, request *http.Request, reader *bufio.Reader, conn internet.Connection, dest net.Destination, dispatcher core.Dispatcher) error { +func (s *Server) handleConnect(ctx context.Context, request *http.Request, reader *bufio.Reader, conn internet.Connection, dest net.Destination, dispatcher routing.Dispatcher) error { _, err := conn.Write([]byte("HTTP/1.1 200 Connection established\r\n\r\n")) if err != nil { return newError("failed to write back OK response").Base(err) @@ -222,7 +223,7 @@ func (s *Server) handleConnect(ctx context.Context, request *http.Request, reade var errWaitAnother = newError("keep alive") -func (s *Server) handlePlainHTTP(ctx context.Context, request *http.Request, writer io.Writer, dest net.Destination, dispatcher core.Dispatcher) error { +func (s *Server) handlePlainHTTP(ctx context.Context, request *http.Request, writer io.Writer, dest net.Destination, dispatcher routing.Dispatcher) error { if !s.config.AllowTransparent && len(request.URL.Host) <= 0 { // RFC 2068 (HTTP/1.1) requires URL to be absolute URL in HTTP proxy. response := &http.Response{ diff --git a/proxy/mtproto/client.go b/proxy/mtproto/client.go index d2f37b98fb..ef1d55bd9c 100644 --- a/proxy/mtproto/client.go +++ b/proxy/mtproto/client.go @@ -3,13 +3,13 @@ package mtproto import ( "context" - "v2ray.com/core" "v2ray.com/core/common" "v2ray.com/core/common/buf" "v2ray.com/core/common/crypto" "v2ray.com/core/common/net" "v2ray.com/core/common/session" "v2ray.com/core/common/task" + "v2ray.com/core/common/vio" "v2ray.com/core/proxy" ) @@ -20,7 +20,7 @@ func NewClient(ctx context.Context, config *ClientConfig) (*Client, error) { return &Client{}, nil } -func (c *Client) Process(ctx context.Context, link *core.Link, dialer proxy.Dialer) error { +func (c *Client) Process(ctx context.Context, link *vio.Link, dialer proxy.Dialer) error { outbound := session.OutboundFromContext(ctx) if outbound == nil || !outbound.Target.IsValid() { return newError("unknown destination.") diff --git a/proxy/mtproto/server.go b/proxy/mtproto/server.go index 108e445807..880d4a78c5 100644 --- a/proxy/mtproto/server.go +++ b/proxy/mtproto/server.go @@ -14,6 +14,7 @@ import ( "v2ray.com/core/common/session" "v2ray.com/core/common/signal" "v2ray.com/core/common/task" + "v2ray.com/core/features/routing" "v2ray.com/core/transport/internet" "v2ray.com/core/transport/pipe" ) @@ -74,7 +75,7 @@ func isValidConnectionType(c [4]byte) bool { return false } -func (s *Server) Process(ctx context.Context, network net.Network, conn internet.Connection, dispatcher core.Dispatcher) error { +func (s *Server) Process(ctx context.Context, network net.Network, conn internet.Connection, dispatcher routing.Dispatcher) error { sPolicy := s.policy.ForLevel(s.user.Level) if err := conn.SetDeadline(time.Now().Add(sPolicy.Timeouts.Handshake)); err != nil { diff --git a/proxy/proxy.go b/proxy/proxy.go index 96b22eabd7..f47b82ab94 100755 --- a/proxy/proxy.go +++ b/proxy/proxy.go @@ -8,9 +8,10 @@ package proxy import ( "context" - "v2ray.com/core" "v2ray.com/core/common/net" "v2ray.com/core/common/protocol" + "v2ray.com/core/common/vio" + "v2ray.com/core/features/routing" "v2ray.com/core/transport/internet" ) @@ -20,13 +21,13 @@ type Inbound interface { Network() net.NetworkList // Process processes a connection of given network. If necessary, the Inbound can dispatch the connection to an Outbound. - Process(context.Context, net.Network, internet.Connection, core.Dispatcher) error + Process(context.Context, net.Network, internet.Connection, routing.Dispatcher) error } // An Outbound process outbound connections. type Outbound interface { // Process processes the given connection. The given dialer may be used to dial a system outbound connection. - Process(context.Context, *core.Link, Dialer) error + Process(context.Context, *vio.Link, Dialer) error } // Dialer is used by OutboundHandler for creating outbound connections. diff --git a/proxy/shadowsocks/client.go b/proxy/shadowsocks/client.go index 9ca16df1a3..799cc2be3c 100644 --- a/proxy/shadowsocks/client.go +++ b/proxy/shadowsocks/client.go @@ -5,6 +5,7 @@ import ( "v2ray.com/core/common/session" "v2ray.com/core/common/task" + "v2ray.com/core/common/vio" "v2ray.com/core" "v2ray.com/core/common" @@ -44,7 +45,7 @@ func NewClient(ctx context.Context, config *ClientConfig) (*Client, error) { } // Process implements OutboundHandler.Process(). -func (c *Client) Process(ctx context.Context, link *core.Link, dialer proxy.Dialer) error { +func (c *Client) Process(ctx context.Context, link *vio.Link, dialer proxy.Dialer) error { outbound := session.OutboundFromContext(ctx) if outbound == nil || !outbound.Target.IsValid() { return newError("target not specified") diff --git a/proxy/shadowsocks/server.go b/proxy/shadowsocks/server.go index f460135328..323bd0763f 100644 --- a/proxy/shadowsocks/server.go +++ b/proxy/shadowsocks/server.go @@ -13,6 +13,7 @@ import ( "v2ray.com/core/common/session" "v2ray.com/core/common/signal" "v2ray.com/core/common/task" + "v2ray.com/core/features/routing" "v2ray.com/core/transport/internet" "v2ray.com/core/transport/internet/udp" "v2ray.com/core/transport/pipe" @@ -57,7 +58,7 @@ func (s *Server) Network() net.NetworkList { return list } -func (s *Server) Process(ctx context.Context, network net.Network, conn internet.Connection, dispatcher core.Dispatcher) error { +func (s *Server) Process(ctx context.Context, network net.Network, conn internet.Connection, dispatcher routing.Dispatcher) error { switch network { case net.Network_TCP: return s.handleConnection(ctx, conn, dispatcher) @@ -68,7 +69,7 @@ func (s *Server) Process(ctx context.Context, network net.Network, conn internet } } -func (s *Server) handlerUDPPayload(ctx context.Context, conn internet.Connection, dispatcher core.Dispatcher) error { +func (s *Server) handlerUDPPayload(ctx context.Context, conn internet.Connection, dispatcher routing.Dispatcher) error { udpServer := udp.NewDispatcher(dispatcher, func(ctx context.Context, payload *buf.Buffer) { request := protocol.RequestHeaderFromContext(ctx) if request == nil { @@ -143,7 +144,7 @@ func (s *Server) handlerUDPPayload(ctx context.Context, conn internet.Connection return nil } -func (s *Server) handleConnection(ctx context.Context, conn internet.Connection, dispatcher core.Dispatcher) error { +func (s *Server) handleConnection(ctx context.Context, conn internet.Connection, dispatcher routing.Dispatcher) error { sessionPolicy := s.v.PolicyManager().ForLevel(s.user.Level) conn.SetReadDeadline(time.Now().Add(sessionPolicy.Timeouts.Handshake)) diff --git a/proxy/socks/client.go b/proxy/socks/client.go index f6f789ddc9..1f3398af51 100644 --- a/proxy/socks/client.go +++ b/proxy/socks/client.go @@ -6,6 +6,7 @@ import ( "v2ray.com/core/common/session" "v2ray.com/core/common/task" + "v2ray.com/core/common/vio" "v2ray.com/core" "v2ray.com/core/common" @@ -46,7 +47,7 @@ func NewClient(ctx context.Context, config *ClientConfig) (*Client, error) { } // Process implements proxy.Outbound.Process. -func (c *Client) Process(ctx context.Context, link *core.Link, dialer proxy.Dialer) error { +func (c *Client) Process(ctx context.Context, link *vio.Link, dialer proxy.Dialer) error { outbound := session.OutboundFromContext(ctx) if outbound == nil || !outbound.Target.IsValid() { return newError("target not specified.") diff --git a/proxy/socks/server.go b/proxy/socks/server.go index d328ca7448..590ec19190 100644 --- a/proxy/socks/server.go +++ b/proxy/socks/server.go @@ -14,6 +14,7 @@ import ( "v2ray.com/core/common/session" "v2ray.com/core/common/signal" "v2ray.com/core/common/task" + "v2ray.com/core/features/routing" "v2ray.com/core/transport/internet" "v2ray.com/core/transport/internet/udp" "v2ray.com/core/transport/pipe" @@ -58,7 +59,7 @@ func (s *Server) Network() net.NetworkList { } // Process implements proxy.Inbound. -func (s *Server) Process(ctx context.Context, network net.Network, conn internet.Connection, dispatcher core.Dispatcher) error { +func (s *Server) Process(ctx context.Context, network net.Network, conn internet.Connection, dispatcher routing.Dispatcher) error { switch network { case net.Network_TCP: return s.processTCP(ctx, conn, dispatcher) @@ -69,7 +70,7 @@ func (s *Server) Process(ctx context.Context, network net.Network, conn internet } } -func (s *Server) processTCP(ctx context.Context, conn internet.Connection, dispatcher core.Dispatcher) error { +func (s *Server) processTCP(ctx context.Context, conn internet.Connection, dispatcher routing.Dispatcher) error { plcy := s.policy() if err := conn.SetReadDeadline(time.Now().Add(plcy.Timeouts.Handshake)); err != nil { newError("failed to set deadline").Base(err).WriteToLog(session.ExportIDToError(ctx)) @@ -131,7 +132,7 @@ func (*Server) handleUDP(c io.Reader) error { return common.Error2(io.Copy(buf.DiscardBytes, c)) } -func (s *Server) transport(ctx context.Context, reader io.Reader, writer io.Writer, dest net.Destination, dispatcher core.Dispatcher) error { +func (s *Server) transport(ctx context.Context, reader io.Reader, writer io.Writer, dest net.Destination, dispatcher routing.Dispatcher) error { ctx, cancel := context.WithCancel(ctx) timer := signal.CancelAfterInactivity(ctx, cancel, s.policy().Timeouts.ConnectionIdle) @@ -172,7 +173,7 @@ func (s *Server) transport(ctx context.Context, reader io.Reader, writer io.Writ return nil } -func (s *Server) handleUDPPayload(ctx context.Context, conn internet.Connection, dispatcher core.Dispatcher) error { +func (s *Server) handleUDPPayload(ctx context.Context, conn internet.Connection, dispatcher routing.Dispatcher) error { udpServer := udp.NewDispatcher(dispatcher, func(ctx context.Context, payload *buf.Buffer) { newError("writing back UDP response with ", payload.Len(), " bytes").AtDebug().WriteToLog(session.ExportIDToError(ctx)) diff --git a/proxy/vmess/inbound/inbound.go b/proxy/vmess/inbound/inbound.go index 4f6243263c..97205df323 100644 --- a/proxy/vmess/inbound/inbound.go +++ b/proxy/vmess/inbound/inbound.go @@ -20,6 +20,7 @@ import ( "v2ray.com/core/common/signal" "v2ray.com/core/common/task" "v2ray.com/core/common/uuid" + "v2ray.com/core/features/routing" "v2ray.com/core/proxy/vmess" "v2ray.com/core/proxy/vmess/encoding" "v2ray.com/core/transport/internet" @@ -213,7 +214,7 @@ func isInsecureEncryption(s protocol.SecurityType) bool { } // Process implements proxy.Inbound.Process(). -func (h *Handler) Process(ctx context.Context, network net.Network, connection internet.Connection, dispatcher core.Dispatcher) error { +func (h *Handler) Process(ctx context.Context, network net.Network, connection internet.Connection, dispatcher routing.Dispatcher) error { sessionPolicy := h.policyManager.ForLevel(0) if err := connection.SetReadDeadline(time.Now().Add(sessionPolicy.Timeouts.Handshake)); err != nil { return newError("unable to set read deadline").Base(err).AtWarning() diff --git a/proxy/vmess/outbound/outbound.go b/proxy/vmess/outbound/outbound.go index 0e531b3693..2dc072b6a6 100644 --- a/proxy/vmess/outbound/outbound.go +++ b/proxy/vmess/outbound/outbound.go @@ -16,6 +16,7 @@ import ( "v2ray.com/core/common/session" "v2ray.com/core/common/signal" "v2ray.com/core/common/task" + "v2ray.com/core/common/vio" "v2ray.com/core/proxy" "v2ray.com/core/proxy/vmess" "v2ray.com/core/proxy/vmess/encoding" @@ -49,7 +50,7 @@ func New(ctx context.Context, config *Config) (*Handler, error) { } // Process implements proxy.Outbound.Process(). -func (v *Handler) Process(ctx context.Context, link *core.Link, dialer proxy.Dialer) error { +func (v *Handler) Process(ctx context.Context, link *vio.Link, dialer proxy.Dialer) error { var rec *protocol.ServerSpec var conn internet.Connection diff --git a/router.go b/router.go index 692a8a3707..ba05b3687b 100644 --- a/router.go +++ b/router.go @@ -5,32 +5,17 @@ import ( "sync" "v2ray.com/core/common" - "v2ray.com/core/common/buf" - "v2ray.com/core/common/errors" "v2ray.com/core/common/net" + "v2ray.com/core/common/vio" + "v2ray.com/core/features/routing" ) -// Link is a utility for connecting between an inbound and an outbound proxy handler. -type Link struct { - Reader buf.Reader - Writer buf.Writer -} - -// Dispatcher is a feature that dispatches inbound requests to outbound handlers based on rules. -// Dispatcher is required to be registered in a V2Ray instance to make V2Ray function properly. -type Dispatcher interface { - Feature - - // Dispatch returns a Ray for transporting data for the given request. - Dispatch(ctx context.Context, dest net.Destination) (*Link, error) -} - type syncDispatcher struct { sync.RWMutex - Dispatcher + routing.Dispatcher } -func (d *syncDispatcher) Dispatch(ctx context.Context, dest net.Destination) (*Link, error) { +func (d *syncDispatcher) Dispatch(ctx context.Context, dest net.Destination) (*vio.Link, error) { d.RLock() defer d.RUnlock() @@ -59,7 +44,7 @@ func (d *syncDispatcher) Close() error { return common.Close(d.Dispatcher) } -func (d *syncDispatcher) Set(disp Dispatcher) { +func (d *syncDispatcher) Set(disp routing.Dispatcher) { if disp == nil { return } @@ -71,22 +56,9 @@ func (d *syncDispatcher) Set(disp Dispatcher) { d.Dispatcher = disp } -var ( - // ErrNoClue is for the situation that existing information is not enough to make a decision. For example, Router may return this error when there is no suitable route. - ErrNoClue = errors.New("not enough information for making a decision") -) - -// Router is a feature to choose an outbound tag for the given request. -type Router interface { - Feature - - // PickRoute returns a tag of an OutboundHandler based on the given context. - PickRoute(ctx context.Context) (string, error) -} - type syncRouter struct { sync.RWMutex - Router + routing.Router } func (r *syncRouter) PickRoute(ctx context.Context) (string, error) { @@ -94,7 +66,7 @@ func (r *syncRouter) PickRoute(ctx context.Context) (string, error) { defer r.RUnlock() if r.Router == nil { - return "", ErrNoClue + return "", common.ErrNoClue } return r.Router.PickRoute(ctx) @@ -118,7 +90,7 @@ func (r *syncRouter) Close() error { return common.Close(r.Router) } -func (r *syncRouter) Set(router Router) { +func (r *syncRouter) Set(router routing.Router) { if router == nil { return } diff --git a/transport/internet/udp/dispatcher.go b/transport/internet/udp/dispatcher.go index 565e9053df..075406217a 100644 --- a/transport/internet/udp/dispatcher.go +++ b/transport/internet/udp/dispatcher.go @@ -5,18 +5,19 @@ import ( "sync" "time" - "v2ray.com/core" "v2ray.com/core/common" "v2ray.com/core/common/buf" "v2ray.com/core/common/net" "v2ray.com/core/common/session" "v2ray.com/core/common/signal" + "v2ray.com/core/common/vio" + "v2ray.com/core/features/routing" ) type ResponseCallback func(ctx context.Context, payload *buf.Buffer) type connEntry struct { - link *core.Link + link *vio.Link timer signal.ActivityUpdater cancel context.CancelFunc } @@ -24,11 +25,11 @@ type connEntry struct { type Dispatcher struct { sync.RWMutex conns map[net.Destination]*connEntry - dispatcher core.Dispatcher + dispatcher routing.Dispatcher callback ResponseCallback } -func NewDispatcher(dispatcher core.Dispatcher, callback ResponseCallback) *Dispatcher { +func NewDispatcher(dispatcher routing.Dispatcher, callback ResponseCallback) *Dispatcher { return &Dispatcher{ conns: make(map[net.Destination]*connEntry), dispatcher: dispatcher, diff --git a/transport/internet/udp/dispatcher_test.go b/transport/internet/udp/dispatcher_test.go index 60e7d69004..06ba01deea 100644 --- a/transport/internet/udp/dispatcher_test.go +++ b/transport/internet/udp/dispatcher_test.go @@ -6,19 +6,19 @@ import ( "testing" "time" - "v2ray.com/core" "v2ray.com/core/common/buf" "v2ray.com/core/common/net" + "v2ray.com/core/common/vio" . "v2ray.com/core/transport/internet/udp" "v2ray.com/core/transport/pipe" . "v2ray.com/ext/assert" ) type TestDispatcher struct { - OnDispatch func(ctx context.Context, dest net.Destination) (*core.Link, error) + OnDispatch func(ctx context.Context, dest net.Destination) (*vio.Link, error) } -func (d *TestDispatcher) Dispatch(ctx context.Context, dest net.Destination) (*core.Link, error) { +func (d *TestDispatcher) Dispatch(ctx context.Context, dest net.Destination) (*vio.Link, error) { return d.OnDispatch(ctx, dest) } @@ -50,9 +50,9 @@ func TestSameDestinationDispatching(t *testing.T) { var count uint32 td := &TestDispatcher{ - OnDispatch: func(ctx context.Context, dest net.Destination) (*core.Link, error) { + OnDispatch: func(ctx context.Context, dest net.Destination) (*vio.Link, error) { atomic.AddUint32(&count, 1) - return &core.Link{Reader: downlinkReader, Writer: uplinkWriter}, nil + return &vio.Link{Reader: downlinkReader, Writer: uplinkWriter}, nil }, } dest := net.UDPDestination(net.LocalHostIP, 53) diff --git a/v2ray.go b/v2ray.go index 06a50f906b..20e23a7b72 100755 --- a/v2ray.go +++ b/v2ray.go @@ -7,6 +7,8 @@ import ( "v2ray.com/core/common" "v2ray.com/core/common/serial" "v2ray.com/core/common/uuid" + "v2ray.com/core/features/outbound" + "v2ray.com/core/features/routing" ) // Server is an instance of V2Ray. At any time, there must be at most one Server instance running. @@ -76,12 +78,12 @@ func New(config *Config) (*Instance, error) { } } - for _, outbound := range config.Outbound { - rawHandler, err := CreateObject(server, outbound) + for _, outboundConfig := range config.Outbound { + rawHandler, err := CreateObject(server, outboundConfig) if err != nil { return nil, err } - handler, ok := rawHandler.(OutboundHandler) + handler, ok := rawHandler.(outbound.Handler) if !ok { return nil, newError("not an OutboundHandler") } @@ -147,14 +149,14 @@ func (s *Instance) RegisterFeature(feature interface{}, instance Feature) error s.dnsClient.Set(instance.(DNSClient)) case PolicyManager, *PolicyManager: s.policyManager.Set(instance.(PolicyManager)) - case Router, *Router: - s.router.Set(instance.(Router)) - case Dispatcher, *Dispatcher: - s.dispatcher.Set(instance.(Dispatcher)) + case routing.Router, *routing.Router: + s.router.Set(instance.(routing.Router)) + case routing.Dispatcher, *routing.Dispatcher: + s.dispatcher.Set(instance.(routing.Dispatcher)) case InboundHandlerManager, *InboundHandlerManager: s.ihm.Set(instance.(InboundHandlerManager)) - case OutboundHandlerManager, *OutboundHandlerManager: - s.ohm.Set(instance.(OutboundHandlerManager)) + case outbound.HandlerManager, *outbound.HandlerManager: + s.ohm.Set(instance.(outbound.HandlerManager)) case StatManager, *StatManager: s.stats.Set(instance.(StatManager)) default: @@ -198,12 +200,12 @@ func (s *Instance) PolicyManager() PolicyManager { } // Router returns the Router used by this Instance. The returned Router is always functional. -func (s *Instance) Router() Router { +func (s *Instance) Router() routing.Router { return &(s.router) } // Dispatcher returns the Dispatcher used by this Instance. If Dispatcher was not registered before, the returned value doesn't work, although it is not nil. -func (s *Instance) Dispatcher() Dispatcher { +func (s *Instance) Dispatcher() routing.Dispatcher { return &(s.dispatcher) } @@ -213,7 +215,7 @@ func (s *Instance) InboundHandlerManager() InboundHandlerManager { } // OutboundHandlerManager returns the OutboundHandlerManager used by this Instance. If OutboundHandlerManager was not registered before, the returned value doesn't work. -func (s *Instance) OutboundHandlerManager() OutboundHandlerManager { +func (s *Instance) OutboundHandlerManager() outbound.HandlerManager { return &(s.ohm) }