-
Notifications
You must be signed in to change notification settings - Fork 42
/
poll_linux.go
131 lines (115 loc) · 3.03 KB
/
poll_linux.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
// +build linux
package poll
import (
"context"
"github.com/ipfs/fs-repo-migrations/fs-repo-6-to-7/gx/ipfs/QmPXvegq26x982cQjSfbTvSzZXn7GiaMwhhVPHkeTEhrPT/sys/unix"
"sync"
"github.com/ipfs/fs-repo-migrations/fs-repo-6-to-7/gx/ipfs/QmSGRM5Udmy1jsFBr1Cawez7Lt7LZ3ZKA23GGVEsiEW6F3/eventfd"
)
// Poller waits for a single file descriptor to become writable using epoll,
// and can be interrupted through an eventfd when a context is done.
type Poller struct {
	// epfd is the epoll instance descriptor created in New.
	epfd int

	// eventMain is the epoll registration for the watched descriptor
	// (EPOLLOUT); eventWait is the registration for the wake-up eventfd
	// (EPOLLIN).
	eventMain, eventWait unix.EpollEvent

	// events is the buffer handed to EpollWait to receive ready events.
	events []unix.EpollEvent

	// wake is the eventfd used to wake up a blocked epoll wait. It is set to
	// nil by Close.
	wake *eventfd.EventFD

	// wakeMutex guards access to wake.
	wakeMutex sync.Mutex
}
// New creates a Poller that watches fd for writability.
//
// It creates an epoll instance and an eventfd, then registers fd for EPOLLOUT
// and the eventfd for EPOLLIN. The eventfd lets WaitWriteCtx interrupt a
// blocked epoll wait. If any step fails, every descriptor opened so far is
// closed and a nil Poller is returned together with the error.
func New(fd int) (p *Poller, err error) {
	p = &Poller{
		events: make([]unix.EpollEvent, 32),
	}
	if p.epfd, err = unix.EpollCreate1(0); err != nil {
		return nil, err
	}

	wake, err := eventfd.New()
	if err != nil {
		// Only the epoll descriptor exists at this point.
		unix.Close(p.epfd)
		return nil, err
	}
	p.wake = wake

	// Poll for the watched descriptor becoming writable.
	p.eventMain.Events = unix.EPOLLOUT
	p.eventMain.Fd = int32(fd)
	if err = unix.EpollCtl(p.epfd, unix.EPOLL_CTL_ADD, fd, &p.eventMain); err != nil {
		p.Close()
		return nil, err
	}

	// Poll for the eventfd becoming readable.
	p.eventWait.Events = unix.EPOLLIN
	p.eventWait.Fd = int32(wake.Fd())
	if err = unix.EpollCtl(p.epfd, unix.EPOLL_CTL_ADD, wake.Fd(), &p.eventWait); err != nil {
		// Close releases both the eventfd and the epoll descriptor. The
		// eventfd must not be closed separately beforehand: that would close
		// it twice, and the second close could hit an unrelated descriptor
		// that happened to reuse the same number.
		p.Close()
		return nil, err
	}
	return p, nil
}
// Close releases the eventfd and the epoll descriptor held by the Poller.
//
// It returns the error from closing the eventfd if there was one, otherwise
// the error from closing the epoll descriptor. Calling Close again after the
// Poller has been closed is a no-op that returns nil.
func (p *Poller) Close() error {
	p.wakeMutex.Lock()
	if p.wake == nil {
		// A nil wake means Close already ran; closing the descriptors again
		// could hit unrelated descriptors that reused the same numbers.
		p.wakeMutex.Unlock()
		return nil
	}
	wakeErr := p.wake.Close()
	// Set wake to nil so that a pending wake-up goroutine never writes to a
	// closed eventfd.
	p.wake = nil
	p.wakeMutex.Unlock()

	epollErr := unix.Close(p.epfd)
	if wakeErr != nil {
		return wakeErr
	}
	return epollErr
}
// WaitWriteCtx blocks until the watched descriptor is reported by epoll or
// ctx is done.
//
// It returns nil when the watched descriptor is ready, ctx.Err() when the
// wait was interrupted because ctx is done, and the underlying error when
// the epoll wait itself fails (interrupted waits are retried).
func (p *Poller) WaitWriteCtx(ctx context.Context) error {
	doneChan := make(chan struct{})
	defer close(doneChan)

	// Helper goroutine: when ctx is done before this call returns, signal
	// the eventfd so the blocked epoll wait below wakes up.
	go func() {
		select {
		case <-doneChan:
			return
		case <-ctx.Done():
			select {
			case <-doneChan:
				// This call already returned: do not write to p.wake, it
				// might be closed and its descriptor reused for a different
				// purpose.
				return
			default:
			}
			p.wakeMutex.Lock()
			if p.wake != nil {
				p.wake.WriteEvents(1) // send event to wake up epoll
			}
			// If wake is nil the Poller has already been closed.
			p.wakeMutex.Unlock()
		}
	}()

	for {
		n, err := unix.EpollWait(p.epfd, p.events, -1)
		if err == unix.EINTR {
			// Interrupted by a signal before any event arrived; not a
			// failure, just wait again.
			continue
		}
		if err != nil {
			return err
		}

		good := false
		for i := 0; i < n; i++ {
			switch p.events[i].Fd {
			case p.eventMain.Fd:
				good = true
			case p.eventWait.Fd:
				p.wakeMutex.Lock()
				// wake may have been set to nil by a concurrent Close.
				if p.wake != nil {
					p.wake.ReadEvents() // clear eventfd
				}
				p.wakeMutex.Unlock()
			default:
				// Shouldn't happen, as epoll should only return events we
				// registered.
			}
		}

		if good {
			// If both eventMain and eventWait fired, eventMain wins: it is
			// the success condition, and both firing together means the
			// descriptor became ready right as the context finished.
			return nil
		}
		if ctxErr := ctx.Err(); ctxErr != nil {
			// Woken up by the helper goroutine because ctx is done.
			return ctxErr
		}
		// Woken up while ctx is still live: the eventfd was signalled by the
		// helper goroutine of an earlier call that lost the race with that
		// call's return. The stale event has been drained above, so keep
		// waiting.
	}
}