Skip to content

Commit

Permalink
improve bsd
Browse files Browse the repository at this point in the history
  • Loading branch information
smallnest committed Aug 5, 2023
1 parent 7b2a7ce commit 31dd563
Show file tree
Hide file tree
Showing 8 changed files with 147 additions and 136 deletions.
32 changes: 32 additions & 0 deletions .github/workflows/go.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
name: Go

on:
push:
branches: [ master ]
pull_request:
branches: [ master ]

jobs:

build:
name: Build
runs-on: ubuntu-latest
steps:

- name: Set up Go 1.x
uses: actions/setup-go@v2
with:
go-version: ^1.20

- name: Check out code into the Go module directory
uses: actions/checkout@v2

- name: Get dependencies
run: |
go get -v -t -d ./...
- name: Build
run: go build -v ./...

- name: Test
run: go test -v ./...
81 changes: 5 additions & 76 deletions conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,94 +2,23 @@ package epoller

import (
"net"
"time"
)

func netConnToConn(in net.Conn) net.Conn {
// newConnImpl returns a net.Conn with GetFD() method.
func newConnImpl(in net.Conn) net.Conn {
if _, ok := in.(connImpl); ok {
return in
}

return connImpl{
conn: in,
Conn: in,
fd: socketFD(in),
}
}

type connImpl struct {
conn net.Conn
fd int
}

// Read reads data from the connection.
// Read can be made to time out and return an error after a fixed
// time limit; see SetDeadline and SetReadDeadline.
func (c connImpl) Read(b []byte) (n int, err error) {
return c.conn.Read(b)
}

// Write writes data to the connection.
// Write can be made to time out and return an error after a fixed
// time limit; see SetDeadline and SetWriteDeadline.
func (c connImpl) Write(b []byte) (n int, err error) {
return c.conn.Write(b)
}

// Close closes the connection.
// Any blocked Read or Write operations will be unblocked and return errors.
func (c connImpl) Close() error {
return c.conn.Close()
}

// LocalAddr returns the local network address, if known.
func (c connImpl) LocalAddr() net.Addr {
return c.conn.LocalAddr()
}

// RemoteAddr returns the remote network address, if known.
func (c connImpl) RemoteAddr() net.Addr {
return c.conn.RemoteAddr()
}

// SetDeadline sets the read and write deadlines associated
// with the connection. It is equivalent to calling both
// SetReadDeadline and SetWriteDeadline.
//
// A deadline is an absolute time after which I/O operations
// fail instead of blocking. The deadline applies to all future
// and pending I/O, not just the immediately following call to
// Read or Write. After a deadline has been exceeded, the
// connection can be refreshed by setting a deadline in the future.
//
// If the deadline is exceeded a call to Read or Write or to other
// I/O methods will return an error that wraps os.ErrDeadlineExceeded.
// This can be tested using errors.Is(err, os.ErrDeadlineExceeded).
// The error's Timeout method will return true, but note that there
// are other possible errors for which the Timeout method will
// return true even if the deadline has not been exceeded.
//
// An idle timeout can be implemented by repeatedly extending
// the deadline after successful Read or Write calls.
//
// A zero value for t means I/O operations will not time out.
func (c connImpl) SetDeadline(t time.Time) error {
return c.conn.SetDeadline(t)
}

// SetReadDeadline sets the deadline for future Read calls
// and any currently-blocked Read call.
// A zero value for t means Read will not time out.
func (c connImpl) SetReadDeadline(t time.Time) error {
return c.conn.SetReadDeadline(t)
}

// SetWriteDeadline sets the deadline for future Write calls
// and any currently-blocked Write call.
// Even if write times out, it may return n > 0, indicating that
// some of the data was successfully written.
// A zero value for t means Write will not time out.
func (c connImpl) SetWriteDeadline(t time.Time) error {
return c.conn.SetWriteDeadline(t)
net.Conn
fd int
}

func (c connImpl) GetFD() int {
Expand Down
11 changes: 9 additions & 2 deletions epoll.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,20 @@ import (
"syscall"
)

// Poller is the interface for epoll/kqueue poller, special for network connections.
type Poller interface {
// Add adds the connection to poller.
Add(conn net.Conn) error
// Remove removes the connection from poller. Notice it doesn't call the conn.Close method.
Remove(conn net.Conn) error
// Wait waits for at most count events and returns the connections.
Wait(count int) ([]net.Conn, error)
// WaitWithBuffer waits for events with the buffered slice and returns the connections.
WaitWithBuffer() ([]net.Conn, error)
WaitChan(count int) <-chan []net.Conn
Close() error
// WaitChan waits for events and returns the connections with a channel.
WaitChan(count, chanBuffer int) <-chan []net.Conn
// Close closes the poller. If closeConns is true, it will close all the connections.
Close(closeConns bool) error
}

func socketFD(conn net.Conn) int {
Expand Down
84 changes: 54 additions & 30 deletions epoll_bsd.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,26 +4,29 @@
package epoller

import (
"errors"
"net"
"sync"
"syscall"
)

type epoll struct {
fd int
ts syscall.Timespec
changes []syscall.Kevent_t
connections map[int]net.Conn
mu *sync.RWMutex
connbuf []net.Conn
events []syscall.Kevent_t
type Epoll struct {
fd int
ts syscall.Timespec
changes []syscall.Kevent_t
conns map[int]net.Conn
mu *sync.RWMutex
connbuf []net.Conn
events []syscall.Kevent_t
}

func NewPoller() (Poller, error) {
// NewPoller creates a new poller instance.
func NewPoller() (*Epoll, error) {
return NewPollerWithBuffer(128)
}

func NewPollerWithBuffer(count int) (Poller, error) {
// NewPollerWithBuffer creates a new poller instance with buffer size.
func NewPollerWithBuffer(count int) (*Epoll, error) {
p, err := syscall.Kqueue()
if err != nil {
panic(err)
Expand All @@ -37,28 +40,41 @@ func NewPollerWithBuffer(count int) (Poller, error) {
panic(err)
}

return &epoll{
fd: p,
ts: syscall.NsecToTimespec(1e9),
mu: &sync.RWMutex{},
connections: make(map[int]net.Conn),
connbuf: make([]net.Conn, count),
events: make([]syscall.Kevent_t, count),
return &Epoll{
fd: p,
ts: syscall.NsecToTimespec(1e9),
mu: &sync.RWMutex{},
conns: make(map[int]net.Conn),
connbuf: make([]net.Conn, count),
events: make([]syscall.Kevent_t, count),
}, nil
}

func (e *epoll) Close() error {
// Close closes the poller.
func (e *Epoll) Close(closeConns bool) error {
e.mu.Lock()
defer e.mu.Unlock()

e.connections = nil
if closeConns {
for _, conn := range e.conns {
conn.Close()
}
}

e.conns = nil
e.changes = nil
e.connbuf = e.connbuf[:0]

return syscall.Close(e.fd)
}

func (e *epoll) Add(conn net.Conn) error {
conn = netConnToConn(conn)
// Add adds a network connection to the poller.
func (e *Epoll) Add(conn net.Conn) error {
conn = newConnImpl(conn)
fd := socketFD(conn)
if e := syscall.SetNonblock(int(fd), true); e != nil {
return errors.New("udev: unix.SetNonblock failed")
}

e.mu.Lock()
defer e.mu.Unlock()
Expand All @@ -69,11 +85,14 @@ func (e *epoll) Add(conn net.Conn) error {
},
)

e.connections[fd] = conn
e.conns[fd] = conn

return nil
}

func (e *epoll) Remove(conn net.Conn) error {
// Remove removes a connection from the poller.
// If close is true, the connection will be closed.
func (e *Epoll) Remove(conn net.Conn) error {
fd := socketFD(conn)

e.mu.Lock()
Expand All @@ -92,11 +111,13 @@ func (e *epoll) Remove(conn net.Conn) error {
e.changes = changes
}

delete(e.connections, fd)
delete(e.conns, fd)

return nil
}

func (e *epoll) Wait(count int) ([]net.Conn, error) {
// Wait waits for events and returns the connections.
func (e *Epoll) Wait(count int) ([]net.Conn, error) {
events := make([]syscall.Kevent_t, count)

e.mu.RLock()
Expand All @@ -115,7 +136,7 @@ retry:
connections := make([]net.Conn, 0, n)
e.mu.RLock()
for i := 0; i < n; i++ {
conn := e.connections[int(events[i].Ident)]
conn := e.conns[int(events[i].Ident)]
if (events[i].Flags & syscall.EV_EOF) == syscall.EV_EOF {
conn.Close()
}
Expand All @@ -126,7 +147,9 @@ retry:
return connections, nil
}

func (e *epoll) WaitWithBuffer() ([]net.Conn, error) {
// WaitWithBuffer waits for events and returns the connections.
// It uses a connection slice as buffer to reduce memory allocations.
func (e *Epoll) WaitWithBuffer() ([]net.Conn, error) {
e.mu.RLock()
changes := e.changes
e.mu.RUnlock()
Expand All @@ -143,7 +166,7 @@ retry:
connections := e.connbuf[:0]
e.mu.RLock()
for i := 0; i < n; i++ {
conn := e.connections[int(e.events[i].Ident)]
conn := e.conns[int(e.events[i].Ident)]
if (e.events[i].Flags & syscall.EV_EOF) == syscall.EV_EOF {
conn.Close()
}
Expand All @@ -153,8 +176,9 @@ retry:
return connections, nil
}

func (e *epoll) WaitChan(count int) <-chan []net.Conn {
ch := make(chan []net.Conn)
// WaitChan returns a channel that you can use to receive connections.
func (e *Epoll) WaitChan(count, chanBuffer int) <-chan []net.Conn {
ch := make(chan []net.Conn, chanBuffer)
go func() {
for {
conns, err := e.Wait(count)
Expand Down

0 comments on commit 31dd563

Please sign in to comment.