Skip to content

Commit

Permalink
first part to move feature interfaces into dedicated directory
Browse files Browse the repository at this point in the history
  • Loading branch information
DarienRaymond committed Oct 11, 2018
1 parent 88387f2 commit b6dc31d
Show file tree
Hide file tree
Showing 40 changed files with 263 additions and 196 deletions.
4 changes: 3 additions & 1 deletion app/commander/commander.go
Expand Up @@ -8,9 +8,11 @@ import (
"sync"

"google.golang.org/grpc"

"v2ray.com/core"
"v2ray.com/core/common"
"v2ray.com/core/common/signal/done"
"v2ray.com/core/features/outbound"
)

// Commander is a V2Ray feature that provides gRPC methods to external clients.
Expand All @@ -19,7 +21,7 @@ type Commander struct {
server *grpc.Server
config Config
v *core.Instance
ohm core.OutboundHandlerManager
ohm outbound.HandlerManager
}

// NewCommander creates a new Commander based on the given config.
Expand Down
10 changes: 5 additions & 5 deletions app/commander/outbound.go
Expand Up @@ -4,10 +4,10 @@ import (
"context"
"sync"

"v2ray.com/core"
"v2ray.com/core/common"
"v2ray.com/core/common/net"
"v2ray.com/core/common/signal/done"
"v2ray.com/core/common/vio"
"v2ray.com/core/transport/pipe"
)

Expand Down Expand Up @@ -60,16 +60,16 @@ func (l *OutboundListener) Addr() net.Addr {
}
}

// Outbound is a core.OutboundHandler that handles gRPC connections.
// Outbound is a outbound.Handler that handles gRPC connections.
type Outbound struct {
tag string
listener *OutboundListener
access sync.RWMutex
closed bool
}

// Dispatch implements core.OutboundHandler.
func (co *Outbound) Dispatch(ctx context.Context, link *core.Link) {
// Dispatch implements outbound.Handler.
func (co *Outbound) Dispatch(ctx context.Context, link *vio.Link) {
co.access.RLock()

if co.closed {
Expand All @@ -86,7 +86,7 @@ func (co *Outbound) Dispatch(ctx context.Context, link *core.Link) {
<-closeSignal.Wait()
}

// Tag implements core.OutboundHandler.
// Tag implements outbound.Handler.
func (co *Outbound) Tag() string {
return co.tag
}
Expand Down
23 changes: 13 additions & 10 deletions app/dispatcher/default.go
Expand Up @@ -16,6 +16,9 @@ import (
"v2ray.com/core/common/protocol"
"v2ray.com/core/common/session"
"v2ray.com/core/common/stats"
"v2ray.com/core/common/vio"
"v2ray.com/core/features/outbound"
"v2ray.com/core/features/routing"
"v2ray.com/core/transport/pipe"
)

Expand Down Expand Up @@ -79,8 +82,8 @@ func (r *cachedReader) CloseError() {

// DefaultDispatcher is a default implementation of Dispatcher.
type DefaultDispatcher struct {
ohm core.OutboundHandlerManager
router core.Router
ohm outbound.HandlerManager
router routing.Router
policy core.PolicyManager
stats core.StatManager
}
Expand All @@ -95,7 +98,7 @@ func NewDefaultDispatcher(ctx context.Context, config *Config) (*DefaultDispatch
stats: v.Stats(),
}

if err := v.RegisterFeature((*core.Dispatcher)(nil), d); err != nil {
if err := v.RegisterFeature((*routing.Dispatcher)(nil), d); err != nil {
return nil, newError("unable to register Dispatcher").Base(err)
}
return d, nil
Expand All @@ -109,17 +112,17 @@ func (*DefaultDispatcher) Start() error {
// Close implements common.Closable.
func (*DefaultDispatcher) Close() error { return nil }

func (d *DefaultDispatcher) getLink(ctx context.Context) (*core.Link, *core.Link) {
func (d *DefaultDispatcher) getLink(ctx context.Context) (*vio.Link, *vio.Link) {
opt := pipe.OptionsFromContext(ctx)
uplinkReader, uplinkWriter := pipe.New(opt...)
downlinkReader, downlinkWriter := pipe.New(opt...)

inboundLink := &core.Link{
inboundLink := &vio.Link{
Reader: downlinkReader,
Writer: uplinkWriter,
}

outboundLink := &core.Link{
outboundLink := &vio.Link{
Reader: uplinkReader,
Writer: downlinkWriter,
}
Expand Down Expand Up @@ -159,8 +162,8 @@ func shouldOverride(result SniffResult, domainOverride []string) bool {
return false
}

// Dispatch implements core.Dispatcher.
func (d *DefaultDispatcher) Dispatch(ctx context.Context, destination net.Destination) (*core.Link, error) {
// Dispatch implements routing.Dispatcher.
func (d *DefaultDispatcher) Dispatch(ctx context.Context, destination net.Destination) (*vio.Link, error) {
if !destination.IsValid() {
panic("Dispatcher: Invalid destination.")
}
Expand Down Expand Up @@ -214,7 +217,7 @@ func sniffer(ctx context.Context, cReader *cachedReader) (SniffResult, error) {
cReader.Cache(payload)
if !payload.IsEmpty() {
result, err := sniffer.Sniff(payload.Bytes())
if err != core.ErrNoClue {
if err != common.ErrNoClue {
return result, err
}
}
Expand All @@ -225,7 +228,7 @@ func sniffer(ctx context.Context, cReader *cachedReader) (SniffResult, error) {
}
}

func (d *DefaultDispatcher) routedDispatch(ctx context.Context, link *core.Link, destination net.Destination) {
func (d *DefaultDispatcher) routedDispatch(ctx context.Context, link *vio.Link, destination net.Destination) {
dispatcher := d.ohm.GetDefaultHandler()
if d.router != nil {
if tag, err := d.router.PickRoute(ctx); err == nil {
Expand Down
6 changes: 3 additions & 3 deletions app/dispatcher/sniffer.go
@@ -1,7 +1,7 @@
package dispatcher

import (
"v2ray.com/core"
"v2ray.com/core/common"
"v2ray.com/core/common/protocol/bittorrent"
"v2ray.com/core/common/protocol/http"
"v2ray.com/core/common/protocol/tls"
Expand Down Expand Up @@ -34,7 +34,7 @@ func (s *Sniffer) Sniff(payload []byte) (SniffResult, error) {
var pendingSniffer []protocolSniffer
for _, s := range s.sniffer {
result, err := s(payload)
if err == core.ErrNoClue {
if err == common.ErrNoClue {
pendingSniffer = append(pendingSniffer, s)
continue
}
Expand All @@ -46,7 +46,7 @@ func (s *Sniffer) Sniff(payload []byte) (SniffResult, error) {

if len(pendingSniffer) > 0 {
s.sniffer = pendingSniffer
return nil, core.ErrNoClue
return nil, common.ErrNoClue
}

return nil, errUnknownContent
Expand Down
4 changes: 2 additions & 2 deletions app/dns/udpns.go
Expand Up @@ -7,9 +7,9 @@ import (
"time"

"v2ray.com/core/common/session"
"v2ray.com/core/features/routing"

"github.com/miekg/dns"
"v2ray.com/core"
"v2ray.com/core/common"
"v2ray.com/core/common/buf"
"v2ray.com/core/common/net"
Expand Down Expand Up @@ -48,7 +48,7 @@ type ClassicNameServer struct {
clientIP net.IP
}

func NewClassicNameServer(address net.Destination, dispatcher core.Dispatcher, clientIP net.IP) *ClassicNameServer {
func NewClassicNameServer(address net.Destination, dispatcher routing.Dispatcher, clientIP net.IP) *ClassicNameServer {
s := &ClassicNameServer{
address: address,
ips: make(map[string][]IPRecord),
Expand Down
7 changes: 4 additions & 3 deletions app/proxyman/command/command.go
Expand Up @@ -6,6 +6,7 @@ import (
grpc "google.golang.org/grpc"
"v2ray.com/core"
"v2ray.com/core/common"
"v2ray.com/core/features/outbound"
"v2ray.com/core/proxy"
)

Expand All @@ -18,7 +19,7 @@ type InboundOperation interface {
// OutboundOperation is the interface for operations that applies to outbound handlers.
type OutboundOperation interface {
// ApplyOutbound applies this operation to the given outbound handler.
ApplyOutbound(context.Context, core.OutboundHandler) error
ApplyOutbound(context.Context, outbound.Handler) error
}

func getInbound(handler core.InboundHandler) (proxy.Inbound, error) {
Expand Down Expand Up @@ -62,7 +63,7 @@ func (op *RemoveUserOperation) ApplyInbound(ctx context.Context, handler core.In
type handlerServer struct {
s *core.Instance
ihm core.InboundHandlerManager
ohm core.OutboundHandlerManager
ohm outbound.HandlerManager
}

func (s *handlerServer) AddInbound(ctx context.Context, request *AddInboundRequest) (*AddInboundResponse, error) {
Expand Down Expand Up @@ -104,7 +105,7 @@ func (s *handlerServer) AddOutbound(ctx context.Context, request *AddOutboundReq
if err != nil {
return nil, err
}
handler, ok := rawHandler.(core.OutboundHandler)
handler, ok := rawHandler.(outbound.Handler)
if !ok {
return nil, newError("not an OutboundHandler.")
}
Expand Down
4 changes: 2 additions & 2 deletions app/proxyman/inbound/inbound.go
Expand Up @@ -67,7 +67,7 @@ func (m *Manager) GetHandler(ctx context.Context, tag string) (core.InboundHandl
// RemoveHandler implements core.InboundHandlerManager.
func (m *Manager) RemoveHandler(ctx context.Context, tag string) error {
if len(tag) == 0 {
return core.ErrNoClue
return common.ErrNoClue
}

m.access.Lock()
Expand All @@ -81,7 +81,7 @@ func (m *Manager) RemoveHandler(ctx context.Context, tag string) error {
return nil
}

return core.ErrNoClue
return common.ErrNoClue
}

// Start implements common.Runnable.
Expand Down
5 changes: 3 additions & 2 deletions app/proxyman/inbound/worker.go
Expand Up @@ -15,6 +15,7 @@ import (
"v2ray.com/core/common/session"
"v2ray.com/core/common/signal/done"
"v2ray.com/core/common/task"
"v2ray.com/core/features/routing"
"v2ray.com/core/proxy"
"v2ray.com/core/transport/internet"
"v2ray.com/core/transport/internet/tcp"
Expand All @@ -36,7 +37,7 @@ type tcpWorker struct {
stream *internet.MemoryStreamConfig
recvOrigDest bool
tag string
dispatcher core.Dispatcher
dispatcher routing.Dispatcher
sniffingConfig *proxyman.SniffingConfig
uplinkCounter core.StatCounter
downlinkCounter core.StatCounter
Expand Down Expand Up @@ -223,7 +224,7 @@ type udpWorker struct {
port net.Port
tag string
stream *internet.MemoryStreamConfig
dispatcher core.Dispatcher
dispatcher routing.Dispatcher
uplinkCounter core.StatCounter
downlinkCounter core.StatCounter

Expand Down
24 changes: 13 additions & 11 deletions app/proxyman/mux/mux.go
Expand Up @@ -18,6 +18,8 @@ import (
"v2ray.com/core/common/protocol"
"v2ray.com/core/common/session"
"v2ray.com/core/common/signal/done"
"v2ray.com/core/common/vio"
"v2ray.com/core/features/routing"
"v2ray.com/core/proxy"
"v2ray.com/core/transport/pipe"
)
Expand All @@ -42,7 +44,7 @@ func NewClientManager(p proxy.Outbound, d proxy.Dialer, c *proxyman.Multiplexing
}
}

func (m *ClientManager) Dispatch(ctx context.Context, link *core.Link) error {
func (m *ClientManager) Dispatch(ctx context.Context, link *vio.Link) error {
m.access.Lock()
defer m.access.Unlock()

Expand Down Expand Up @@ -77,7 +79,7 @@ func (m *ClientManager) onClientFinish() {

type Client struct {
sessionManager *SessionManager
link core.Link
link vio.Link
done *done.Instance
manager *ClientManager
concurrency uint32
Expand All @@ -99,7 +101,7 @@ func NewClient(pctx context.Context, p proxy.Outbound, dialer proxy.Dialer, m *C

c := &Client{
sessionManager: NewSessionManager(),
link: core.Link{
link: vio.Link{
Reader: downlinkReader,
Writer: upLinkWriter,
},
Expand All @@ -109,7 +111,7 @@ func NewClient(pctx context.Context, p proxy.Outbound, dialer proxy.Dialer, m *C
}

go func() {
if err := p.Process(ctx, &core.Link{Reader: uplinkReader, Writer: downlinkWriter}, dialer); err != nil {
if err := p.Process(ctx, &vio.Link{Reader: uplinkReader, Writer: downlinkWriter}, dialer); err != nil {
errors.New("failed to handler mux client connection").Base(err).WriteToLog()
}
common.Must(c.done.Close())
Expand Down Expand Up @@ -188,7 +190,7 @@ func fetchInput(ctx context.Context, s *Session, output buf.Writer) {
}
}

func (m *Client) Dispatch(ctx context.Context, link *core.Link) bool {
func (m *Client) Dispatch(ctx context.Context, link *vio.Link) bool {
sm := m.sessionManager
if sm.Size() >= int(m.concurrency) || sm.Count() >= maxTotal {
return false
Expand Down Expand Up @@ -297,7 +299,7 @@ func (m *Client) fetchOutput() {
}

type Server struct {
dispatcher core.Dispatcher
dispatcher routing.Dispatcher
}

// NewServer creates a new mux.Server.
Expand All @@ -308,7 +310,7 @@ func NewServer(ctx context.Context) *Server {
return s
}

func (s *Server) Dispatch(ctx context.Context, dest net.Destination) (*core.Link, error) {
func (s *Server) Dispatch(ctx context.Context, dest net.Destination) (*vio.Link, error) {
if dest.Address != muxCoolAddress {
return s.dispatcher.Dispatch(ctx, dest)
}
Expand All @@ -319,14 +321,14 @@ func (s *Server) Dispatch(ctx context.Context, dest net.Destination) (*core.Link

worker := &ServerWorker{
dispatcher: s.dispatcher,
link: &core.Link{
link: &vio.Link{
Reader: uplinkReader,
Writer: downlinkWriter,
},
sessionManager: NewSessionManager(),
}
go worker.run(ctx)
return &core.Link{Reader: downlinkReader, Writer: uplinkWriter}, nil
return &vio.Link{Reader: downlinkReader, Writer: uplinkWriter}, nil
}

// Start implements common.Runnable.
Expand All @@ -340,8 +342,8 @@ func (s *Server) Close() error {
}

type ServerWorker struct {
dispatcher core.Dispatcher
link *core.Link
dispatcher routing.Dispatcher
link *vio.Link
sessionManager *SessionManager
}

Expand Down

0 comments on commit b6dc31d

Please sign in to comment.