Skip to content

Commit

Permalink
update activity timer
Browse files Browse the repository at this point in the history
  • Loading branch information
DarienRaymond committed May 27, 2018
1 parent 46dbbff commit ac6a0f7
Show file tree
Hide file tree
Showing 20 changed files with 140 additions and 218 deletions.
4 changes: 2 additions & 2 deletions app/commander/commander.go
Expand Up @@ -10,7 +10,7 @@ import (
"google.golang.org/grpc"
"v2ray.com/core"
"v2ray.com/core/common"
"v2ray.com/core/common/signal"
"v2ray.com/core/common/signal/done"
)

// Commander is a V2Ray feature that provides gRPC methods to external clients.
Expand Down Expand Up @@ -64,7 +64,7 @@ func (c *Commander) Start() error {

listener := &OutboundListener{
buffer: make(chan net.Conn, 4),
done: signal.NewDone(),
done: done.New(),
}

go func() {
Expand Down
3 changes: 2 additions & 1 deletion app/commander/outbound.go
Expand Up @@ -8,13 +8,14 @@ import (
"v2ray.com/core/common"
"v2ray.com/core/common/net"
"v2ray.com/core/common/signal"
"v2ray.com/core/common/signal/done"
"v2ray.com/core/transport/pipe"
)

// OutboundListener is a net.Listener for listening gRPC connections.
type OutboundListener struct {
buffer chan net.Conn
done *signal.Done
done *done.Instance
}

func (l *OutboundListener) add(conn net.Conn) {
Expand Down
10 changes: 5 additions & 5 deletions app/proxyman/inbound/worker.go
Expand Up @@ -13,7 +13,7 @@ import (
"v2ray.com/core/common/buf"
"v2ray.com/core/common/net"
"v2ray.com/core/common/session"
"v2ray.com/core/common/signal"
"v2ray.com/core/common/signal/done"
"v2ray.com/core/proxy"
"v2ray.com/core/transport/internet"
"v2ray.com/core/transport/internet/tcp"
Expand Down Expand Up @@ -115,7 +115,7 @@ type udpConn struct {
output func([]byte) (int, error)
remote net.Addr
local net.Addr
done *signal.Done
done *done.Instance
uplink core.StatCounter
downlink core.StatCounter
}
Expand Down Expand Up @@ -223,7 +223,7 @@ type udpWorker struct {
uplinkCounter core.StatCounter
downlinkCounter core.StatCounter

done *signal.Done
done *done.Instance
activeConn map[connID]*udpConn
}

Expand All @@ -248,7 +248,7 @@ func (w *udpWorker) getConnection(id connID) (*udpConn, bool) {
IP: w.address.IP(),
Port: int(w.port),
},
done: signal.NewDone(),
done: done.New(),
uplink: w.uplinkCounter,
downlink: w.downlinkCounter,
}
Expand Down Expand Up @@ -305,7 +305,7 @@ func (w *udpWorker) removeConn(id connID) {

func (w *udpWorker) Start() error {
w.activeConn = make(map[connID]*udpConn, 16)
w.done = signal.NewDone()
w.done = done.New()
h, err := udp.ListenUDP(w.address, w.port, w.callback, udp.HubReceiveOriginalDestination(w.recvOrigDest), udp.HubCapacity(256))
if err != nil {
return err
Expand Down
6 changes: 3 additions & 3 deletions app/proxyman/mux/mux.go
Expand Up @@ -16,7 +16,7 @@ import (
"v2ray.com/core/common/log"
"v2ray.com/core/common/net"
"v2ray.com/core/common/protocol"
"v2ray.com/core/common/signal"
"v2ray.com/core/common/signal/done"
"v2ray.com/core/proxy"
"v2ray.com/core/transport/pipe"
)
Expand Down Expand Up @@ -77,7 +77,7 @@ func (m *ClientManager) onClientFinish() {
type Client struct {
sessionManager *SessionManager
link core.Link
done *signal.Done
done *done.Instance
manager *ClientManager
concurrency uint32
}
Expand All @@ -100,7 +100,7 @@ func NewClient(pctx context.Context, p proxy.Outbound, dialer proxy.Dialer, m *C
Reader: downlinkReader,
Writer: upLinkWriter,
},
done: signal.NewDone(),
done: done.New(),
manager: m,
concurrency: m.config.Concurrency,
}
Expand Down
11 changes: 6 additions & 5 deletions common/log/logger.go
Expand Up @@ -7,7 +7,8 @@ import (
"time"

"v2ray.com/core/common/platform"
"v2ray.com/core/common/signal"
"v2ray.com/core/common/signal/done"
"v2ray.com/core/common/signal/semaphore"
)

// Writer is the interface for writing logs.
Expand All @@ -22,17 +23,17 @@ type WriterCreator func() Writer
type generalLogger struct {
creator WriterCreator
buffer chan Message
access *signal.Semaphore
done *signal.Done
access *semaphore.Instance
done *done.Instance
}

// NewLogger returns a generic log handler that can handle all type of messages.
func NewLogger(logWriterCreator WriterCreator) Handler {
return &generalLogger{
creator: logWriterCreator,
buffer: make(chan Message, 16),
access: signal.NewSemaphore(1),
done: signal.NewDone(),
access: semaphore.New(1),
done: done.New(),
}
}

Expand Down
6 changes: 3 additions & 3 deletions common/net/connection.go
Expand Up @@ -7,7 +7,7 @@ import (

"v2ray.com/core/common"
"v2ray.com/core/common/buf"
"v2ray.com/core/common/signal"
"v2ray.com/core/common/signal/done"
)

type ConnectionOption func(*connection)
Expand Down Expand Up @@ -56,7 +56,7 @@ func ConnectionOnClose(n io.Closer) ConnectionOption {

func NewConnection(opts ...ConnectionOption) net.Conn {
c := &connection{
done: signal.NewDone(),
done: done.New(),
local: &net.TCPAddr{
IP: []byte{0, 0, 0, 0},
Port: 0,
Expand All @@ -77,7 +77,7 @@ func NewConnection(opts ...ConnectionOption) net.Conn {
type connection struct {
reader *buf.BufferedReader
writer buf.Writer
done *signal.Done
done *done.Instance
onClose io.Closer
local Addr
remote Addr
Expand Down
19 changes: 10 additions & 9 deletions common/protocol/address.go
Expand Up @@ -3,10 +3,11 @@ package protocol
import (
"io"

"v2ray.com/core/common/task"

"v2ray.com/core/common"
"v2ray.com/core/common/buf"
"v2ray.com/core/common/net"
"v2ray.com/core/common/signal"
)

type AddressOption func(*AddressParser)
Expand Down Expand Up @@ -153,9 +154,9 @@ func (p *AddressParser) ReadAddressPort(buffer *buf.Buffer, input io.Reader) (ne
var err error

if p.portFirst {
err = signal.Execute(pTask, aTask)
err = task.Run(task.Sequential(pTask, aTask))()
} else {
err = signal.Execute(aTask, pTask)
err = task.Run(task.Sequential(aTask, pTask))()
}

if err != nil {
Expand All @@ -177,21 +178,21 @@ func (p *AddressParser) writeAddress(writer io.Writer, address net.Address) erro

switch address.Family() {
case net.AddressFamilyIPv4, net.AddressFamilyIPv6:
return signal.Execute(func() error {
return task.Run(task.Sequential(func() error {
return common.Error2(writer.Write([]byte{tb}))
}, func() error {
return common.Error2(writer.Write(address.IP()))
})
}))()
case net.AddressFamilyDomain:
domain := address.Domain()
if isDomainTooLong(domain) {
return newError("Super long domain is not supported: ", domain)
}
return signal.Execute(func() error {
return task.Run(task.Sequential(func() error {
return common.Error2(writer.Write([]byte{tb, byte(len(domain))}))
}, func() error {
return common.Error2(writer.Write([]byte(domain)))
})
}))()
default:
panic("Unknown family type.")
}
Expand All @@ -207,8 +208,8 @@ func (p *AddressParser) WriteAddressPort(writer io.Writer, addr net.Address, por
}

if p.portFirst {
return signal.Execute(pTask, aTask)
return task.Run(task.Sequential(pTask, aTask))()
}

return signal.Execute(aTask, pTask)
return task.Run(task.Sequential(aTask, pTask))()
}
18 changes: 9 additions & 9 deletions common/signal/done.go → common/signal/done/done.go
@@ -1,25 +1,25 @@
package signal
package done

import (
"sync"
)

// Done is a utility for notifications of something being done.
type Done struct {
// Instance is a utility for notifications of something being done.
type Instance struct {
access sync.Mutex
c chan struct{}
closed bool
}

// NewDone returns a new Done.
func NewDone() *Done {
return &Done{
// New returns a new Done.
func New() *Instance {
return &Instance{
c: make(chan struct{}),
}
}

// Done returns true if Close() is called.
func (d *Done) Done() bool {
func (d *Instance) Done() bool {
select {
case <-d.Wait():
return true
Expand All @@ -29,12 +29,12 @@ func (d *Done) Done() bool {
}

// Wait returns a channel for waiting for done.
func (d *Done) Wait() <-chan struct{} {
func (d *Instance) Wait() <-chan struct{} {
return d.c
}

// Close marks this Done 'done'. This method may be called multiple times. All calls after first call will have no effect on its status.
func (d *Done) Close() error {
func (d *Instance) Close() error {
d.access.Lock()
defer d.access.Unlock()

Expand Down
47 changes: 0 additions & 47 deletions common/signal/exec.go

This file was deleted.

43 changes: 0 additions & 43 deletions common/signal/exec_test.go

This file was deleted.

27 changes: 0 additions & 27 deletions common/signal/semaphore.go

This file was deleted.

0 comments on commit ac6a0f7

Please sign in to comment.