Skip to content

Commit

Permalink
fix lint warnings
Browse files Browse the repository at this point in the history
  • Loading branch information
DarienRaymond committed May 31, 2018
1 parent fe3ddb0 commit adade2b
Show file tree
Hide file tree
Showing 19 changed files with 155 additions and 76 deletions.
3 changes: 2 additions & 1 deletion .vscode/settings.json
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@
"--enable-gc",
"--no-config",
"--exclude=.*\\.pb\\.go",
"--disable=gas"
"--disable=gas",
"--disable=gocyclo"
],
"go.formatTool": "goimports",

Expand Down
2 changes: 1 addition & 1 deletion app/commander/commander.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func (c *Commander) Start() error {
if err != nil {
return err
}
rawService, err := c.v.CreateObject(config)
rawService, err := core.CreateObject(c.v, config)
if err != nil {
return err
}
Expand Down
4 changes: 2 additions & 2 deletions app/proxyman/command/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ type handlerServer struct {
}

func (s *handlerServer) AddInbound(ctx context.Context, request *AddInboundRequest) (*AddInboundResponse, error) {
rawHandler, err := s.s.CreateObject(request.Inbound)
rawHandler, err := core.CreateObject(s.s, request.Inbound)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -96,7 +96,7 @@ func (s *handlerServer) AlterInbound(ctx context.Context, request *AlterInboundR
}

func (s *handlerServer) AddOutbound(ctx context.Context, request *AddOutboundRequest) (*AddOutboundResponse, error) {
rawHandler, err := s.s.CreateObject(request.Outbound)
rawHandler, err := core.CreateObject(s.s, request.Outbound)
if err != nil {
return nil, err
}
Expand Down
13 changes: 11 additions & 2 deletions app/proxyman/inbound/always.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"v2ray.com/core/common"
"v2ray.com/core/common/dice"
"v2ray.com/core/common/net"
"v2ray.com/core/common/serial"
"v2ray.com/core/proxy"
)

Expand Down Expand Up @@ -113,10 +114,18 @@ func (h *AlwaysOnInboundHandler) Start() error {
}

func (h *AlwaysOnInboundHandler) Close() error {
var errors []interface{}
for _, worker := range h.workers {
worker.Close()
if err := worker.Close(); err != nil {
errors = append(errors, err)
}
}
if err := h.mux.Close(); err != nil {
errors = append(errors, err)
}
if len(errors) > 0 {
return newError("failed to close all resources").Base(newError(serial.Concat(errors...)))
}
h.mux.Close()
return nil
}

Expand Down
6 changes: 4 additions & 2 deletions app/proxyman/inbound/dynamic.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,9 @@ func (h *DynamicInboundHandler) closeWorkers(workers []worker) {
ports2Del := make([]net.Port, len(workers))
for idx, worker := range workers {
ports2Del[idx] = worker.Port()
worker.Close()
if err := worker.Close(); err != nil {
newError("failed to close worker").Base(err).WriteToLog()
}
}

h.portMutex.Lock()
Expand All @@ -95,7 +97,7 @@ func (h *DynamicInboundHandler) refresh() error {

for i := uint32(0); i < concurrency; i++ {
port := h.allocatePort()
rawProxy, err := h.v.CreateObject(h.proxyConfig)
rawProxy, err := core.CreateObject(h.v, h.proxyConfig)
if err != nil {
newError("failed to create proxy instance").Base(err).AtWarning().WriteToLog()
continue
Expand Down
14 changes: 12 additions & 2 deletions app/proxyman/inbound/inbound.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"v2ray.com/core"
"v2ray.com/core/app/proxyman"
"v2ray.com/core/common"
"v2ray.com/core/common/serial"
)

// Manager is to manage all inbound handlers.
Expand Down Expand Up @@ -110,11 +111,20 @@ func (m *Manager) Close() error {

m.running = false

var errors []interface{}
for _, handler := range m.taggedHandlers {
handler.Close()
if err := handler.Close(); err != nil {
errors = append(errors, err)
}
}
for _, handler := range m.untaggedHandler {
handler.Close()
if err := handler.Close(); err != nil {
errors = append(errors, err)
}
}

if len(errors) > 0 {
return newError("failed to close all handlers").Base(newError(serial.Concat(errors...)))
}

return nil
Expand Down
87 changes: 56 additions & 31 deletions app/proxyman/inbound/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,10 @@ import (
"v2ray.com/core/common"
"v2ray.com/core/common/buf"
"v2ray.com/core/common/net"
"v2ray.com/core/common/serial"
"v2ray.com/core/common/session"
"v2ray.com/core/common/signal/done"
"v2ray.com/core/common/task"
"v2ray.com/core/proxy"
"v2ray.com/core/transport/internet"
"v2ray.com/core/transport/internet/tcp"
Expand Down Expand Up @@ -97,9 +99,17 @@ func (w *tcpWorker) Start() error {
}

func (w *tcpWorker) Close() error {
var errors []interface{}
if w.hub != nil {
common.Close(w.hub)
common.Close(w.proxy)
if err := common.Close(w.hub); err != nil {
errors = append(errors, err)
}
if err := common.Close(w.proxy); err != nil {
errors = append(errors, err)
}
}
if len(errors) > 0 {
return newError("failed to close all resources").Base(newError(serial.Concat(errors...)))
}

return nil
Expand Down Expand Up @@ -227,7 +237,7 @@ type udpWorker struct {
uplinkCounter core.StatCounter
downlinkCounter core.StatCounter

done *done.Instance
checker *task.Periodic
activeConn map[connID]*udpConn
}

Expand Down Expand Up @@ -295,7 +305,7 @@ func (w *udpWorker) callback(b *buf.Buffer, source net.Destination, originalDest
if err := w.proxy.Process(ctx, net.Network_UDP, conn, w.dispatcher); err != nil {
newError("connection ends").Base(err).WriteToLog()
}
conn.Close()
conn.Close() // nolint: errcheck
w.removeConn(id)
}()
}
Expand All @@ -309,12 +319,37 @@ func (w *udpWorker) removeConn(id connID) {

func (w *udpWorker) Start() error {
w.activeConn = make(map[connID]*udpConn, 16)
w.done = done.New()
h, err := udp.ListenUDP(w.address, w.port, w.callback, udp.HubReceiveOriginalDestination(w.recvOrigDest), udp.HubCapacity(256))
if err != nil {
return err
}
go w.monitor()
w.checker = &task.Periodic{
Interval: time.Second * 16,
Execute: func() error {
nowSec := time.Now().Unix()
w.Lock()
if len(w.activeConn) == 0 {
return nil
}

for addr, conn := range w.activeConn {
if nowSec-atomic.LoadInt64(&conn.lastActivityTime) > 8 {
delete(w.activeConn, addr)
conn.Close() // nolint: errcheck
}
}

if len(w.activeConn) == 0 {
w.activeConn = make(map[connID]*udpConn, 16)
}
w.Unlock()

return nil
},
}
if err := w.checker.Start(); err != nil {
return err
}
w.hub = h
return nil
}
Expand All @@ -323,38 +358,28 @@ func (w *udpWorker) Close() error {
w.Lock()
defer w.Unlock()

var errors []interface{}

if w.hub != nil {
w.hub.Close()
if err := w.hub.Close(); err != nil {
errors = append(errors, err)
}
}

if w.done != nil {
common.Must(w.done.Close())
if w.checker != nil {
if err := w.checker.Close(); err != nil {
errors = append(errors, err)
}
}

common.Close(w.proxy)
return nil
}

func (w *udpWorker) monitor() {
timer := time.NewTicker(time.Second * 16)
defer timer.Stop()
if err := common.Close(w.proxy); err != nil {
errors = append(errors, err)
}

for {
select {
case <-w.done.Wait():
return
case <-timer.C:
nowSec := time.Now().Unix()
w.Lock()
for addr, conn := range w.activeConn {
if nowSec-atomic.LoadInt64(&conn.lastActivityTime) > 8 {
delete(w.activeConn, addr)
conn.Close()
}
}
w.Unlock()
}
if len(errors) > 0 {
return newError("failed to close all resources").Base(newError(serial.Concat(errors...)))
}
return nil
}

func (w *udpWorker) Port() net.Port {
Expand Down
4 changes: 3 additions & 1 deletion app/proxyman/mux/mux.go
Original file line number Diff line number Diff line change
Expand Up @@ -327,10 +327,12 @@ func (s *Server) Dispatch(ctx context.Context, dest net.Destination) (*core.Link
return &core.Link{Reader: downlinkReader, Writer: uplinkWriter}, nil
}

// Start implements common.Runnable.
func (s *Server) Start() error {
return nil
}

// Close implements common.Closable.
func (s *Server) Close() error {
return nil
}
Expand Down Expand Up @@ -463,7 +465,7 @@ func (w *ServerWorker) run(ctx context.Context) {
input := w.link.Reader
reader := &buf.BufferedReader{Reader: input}

defer w.sessionManager.Close()
defer w.sessionManager.Close() // nolint: errcheck

for {
select {
Expand Down
18 changes: 10 additions & 8 deletions common/buf/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,6 @@ type BufferToBytesWriter struct {
io.Writer
}

// NewBufferToBytesWriter returns a new BufferToBytesWriter.
func NewBufferToBytesWriter(writer io.Writer) *BufferToBytesWriter {
return &BufferToBytesWriter{
Writer: writer,
}
}

// WriteMultiBuffer implements Writer. This method takes ownership of the given buffer.
func (w *BufferToBytesWriter) WriteMultiBuffer(mb MultiBuffer) error {
defer mb.Release()
Expand Down Expand Up @@ -113,7 +106,16 @@ func (w *BufferedWriter) WriteMultiBuffer(b MultiBuffer) error {
// Flush flushes buffered content into underlying writer.
func (w *BufferedWriter) Flush() error {
if !w.buffer.IsEmpty() {
if err := w.writer.WriteMultiBuffer(NewMultiBufferValue(w.buffer)); err != nil {
b := w.buffer
w.buffer = nil

if writer, ok := w.writer.(io.Writer); ok {
_, err := writer.Write(b.Bytes())
b.Release()
if err != nil {
return err
}
} else if err := w.writer.WriteMultiBuffer(NewMultiBufferValue(b)); err != nil {
return err
}

Expand Down
2 changes: 1 addition & 1 deletion dns.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,6 @@ func (d *syncDNSClient) Set(client DNSClient) {
d.Lock()
defer d.Unlock()

common.Close(d.DNSClient)
common.Close(d.DNSClient) // nolint: errcheck
d.DNSClient = client
}
16 changes: 16 additions & 0 deletions functions.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package core

import (
"context"

"v2ray.com/core/common"
)

// CreateObject creates a new object based on the given V2Ray instance and config. The V2Ray instance may be nil.
func CreateObject(v *Instance, config interface{}) (interface{}, error) {
ctx := context.Background()
if v != nil {
ctx = context.WithValue(ctx, v2rayKey, v)
}
return common.CreateObject(ctx, config)
}
4 changes: 2 additions & 2 deletions network.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ func (m *syncInboundHandlerManager) Set(manager InboundHandlerManager) {
m.Lock()
defer m.Unlock()

common.Close(m.InboundHandlerManager)
common.Close(m.InboundHandlerManager) // nolint: errcheck
m.InboundHandlerManager = manager
}

Expand Down Expand Up @@ -172,6 +172,6 @@ func (m *syncOutboundHandlerManager) Set(manager OutboundHandlerManager) {
m.Lock()
defer m.Unlock()

common.Close(m.OutboundHandlerManager)
common.Close(m.OutboundHandlerManager) // nolint: errcheck
m.OutboundHandlerManager = manager
}
4 changes: 3 additions & 1 deletion policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,15 @@ type BufferPolicy struct {
PerConnection int32
}

// SystemStatsPolicy contains stat policy settings on system level.
type SystemStatsPolicy struct {
// Whether or not to enable stat counter for uplink traffic in inbound handlers.
InboundUplink bool
// Whether or not to enable stat counter for downlink traffic in inbound handlers.
InboundDownlink bool
}

// SystemPolicy contains policy settings at system level.
type SystemPolicy struct {
Stats SystemStatsPolicy
Buffer BufferPolicy
Expand Down Expand Up @@ -178,6 +180,6 @@ func (m *syncPolicyManager) Set(manager PolicyManager) {
m.Lock()
defer m.Unlock()

common.Close(m.PolicyManager)
common.Close(m.PolicyManager) // nolint: errcheck
m.PolicyManager = manager
}
Loading

0 comments on commit adade2b

Please sign in to comment.