/
listener.go
94 lines (81 loc) · 1.95 KB
/
listener.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
package listener
import (
"io"
"net"
"sync/atomic"
"sync"
"github.com/zenhotels/astranet/addr"
"github.com/zenhotels/astranet/protocol"
"github.com/zenhotels/astranet/service"
"github.com/zenhotels/astranet/socket"
"github.com/zenhotels/astranet/transport"
)
// NewConnRequest describes one incoming connection attempt that has been
// handed to a Listener via Recv and is waiting to be picked up by Accept.
type NewConnRequest struct {
// Transport is the upstream transport the request arrived on; Accept
// passes it on to socket.NewServerSocket.
Transport transport.Transport
// HostId is taken from the Local field of the protocol.Op given to Recv.
// NOTE(review): presumably the host id of the connecting peer - confirm.
HostId uint64
// PortId is taken from the LPort field of the protocol.Op given to Recv.
// NOTE(review): presumably the port id of the connecting peer - confirm.
PortId uint32
}
// Listener queues incoming connection requests (see Recv) and hands them out
// one at a time through Accept. It also exposes Close and Addr.
// NOTE(review): the method set looks like net.Listener - confirm that is the
// intended interface.
//
// A Listener contains a sync.Mutex and a sync.Cond and must not be copied;
// create it with New and use it through the returned pointer.
type Listener struct {
// ServiceInfo carries the Host, Port and Service values supplied to New.
service.ServiceInfo
// network is the network name supplied to New; it is forwarded to
// socket.NewServerSocket and to the address returned by Addr.
network string
// opChan is the FIFO queue of pending requests: Recv appends to the
// tail, Accept removes from the head. Both do so while holding opLock.
opChan []NewConnRequest
// opLock guards opChan and is the Locker of opNew.
opLock sync.Mutex
// opNew is broadcast by Recv when a request is queued and by Close when
// the listener is shut down; Accept waits on it.
opNew sync.Cond
// closed is zero while the listener is open and non-zero once Close has
// been called. It is accessed with sync/atomic operations.
closed uint32
// onClose holds the callbacks registered via OnClose; they are run by the
// first call to Close.
onClose []func()
}
// New builds a Listener for the given network name whose ServiceInfo is
// populated with hostId, lPort and service. The listener's condition variable
// is bound to its own mutex before the pointer is returned, so the result is
// immediately usable by Recv, Accept and Close.
func New(network string, hostId uint64, lPort uint32, service string) *Listener {
	l := &Listener{network: network}

	// The service parameter shadows the imported service package here, so
	// the embedded struct is filled in field by field through a pointer.
	info := &l.ServiceInfo
	info.Host = hostId
	info.Port = lPort
	info.Service = service

	l.opNew.L = &l.opLock
	return l
}
// OnClose registers c to be invoked when the listener is closed. Callbacks are
// appended in registration order and are run by the first call to Close.
// No locking is performed here.
func (self *Listener) OnClose(c func()) {
	callbacks := append(self.onClose, c)
	self.onClose = callbacks
}
// Close marks the listener as closed, wakes every goroutine blocked in Accept
// and, on the first call only, runs the callbacks registered through OnClose
// in registration order. Subsequent calls only repeat the wake-up. It always
// returns nil.
//
// Close acquires opLock briefly, so it must not be called while that lock is
// already held by the calling goroutine.
func (self *Listener) Close() error {
	// Compare-and-swap instead of an incrementing counter: Accept calls
	// Close again on every invocation after shutdown, and a counter could
	// eventually wrap back to zero and make the listener look open again.
	var first = atomic.CompareAndSwapUint32(&self.closed, 0, 1)

	// Broadcast while holding opLock. Accept tests the closed flag under
	// this lock and only then calls Wait; without the lock a Broadcast
	// could slip in between the test and the Wait, and the waiter would
	// sleep forever.
	self.opLock.Lock()
	self.opNew.Broadcast()
	self.opLock.Unlock()

	// Callbacks run outside the lock so they are free to call back into
	// the listener.
	if first {
		for _, c := range self.onClose {
			c()
		}
	}
	return nil
}
// Recv enqueues a new connection request built from op and the transport it
// arrived on, then wakes any goroutines waiting in Accept. The request records
// upstream together with op.Local and op.LPort. The closed flag is not
// consulted here, so requests arriving after Close are still appended to the
// queue.
func (self *Listener) Recv(op protocol.Op, upstream transport.Transport) {
	request := NewConnRequest{
		Transport: upstream,
		HostId:    op.Local,
		PortId:    op.LPort,
	}

	self.opLock.Lock()
	defer self.opLock.Unlock()

	self.opChan = append(self.opChan, request)
	self.opNew.Broadcast()
}
// Accept blocks until a connection request queued by Recv is available or the
// listener is closed.
//
// On success it removes the oldest pending request and returns a server socket
// created by socket.NewServerSocket from the listener's network, host and port
// plus the request's host id, port id and transport.
//
// If the listener is closed (before or while waiting) it returns io.EOF; any
// requests still in the queue at that point are not handed out. A dequeued
// request whose Transport is nil is treated the same way and also closes the
// listener.
func (self *Listener) Accept() (net.Conn, error) {
	var op NewConnRequest

	self.opLock.Lock()
	for atomic.LoadUint32(&self.closed) == 0 {
		if len(self.opChan) == 0 {
			// Wait releases opLock while blocked and re-acquires it
			// before returning; Recv and Close broadcast on opNew.
			self.opNew.Wait()
			continue
		}

		// Pop the head of the queue by shifting the remainder down.
		op = self.opChan[0]
		var n = copy(self.opChan, self.opChan[1:])
		// The shift leaves a stale copy of the last request just past the
		// new length. Zero it so the backing array does not keep that
		// request's Transport reachable after it has been handed out.
		self.opChan[n] = NewConnRequest{}
		self.opChan = self.opChan[:n]
		break
	}
	self.opLock.Unlock()

	// A zero-valued op means the loop ended without dequeuing anything,
	// i.e. the listener was closed.
	if op.Transport == nil {
		self.Close()
		return nil, io.EOF
	}

	return socket.NewServerSocket(
		self.network, self.ServiceInfo.Host, self.ServiceInfo.Port,
		op.HostId, op.PortId, op.Transport,
	), nil
}
// Addr returns the listener's own address: a freshly allocated addr.Addr built
// from the network name, host id and port the listener was created with.
func (self *Listener) Addr() net.Addr {
	var local = addr.Addr{self.network, self.ServiceInfo.Host, self.ServiceInfo.Port}
	return &local
}