Skip to content

Commit

Permalink
del _
Browse files Browse the repository at this point in the history
  • Loading branch information
shaovie committed Sep 1, 2023
1 parent 031512b commit 56d733f
Show file tree
Hide file tree
Showing 3 changed files with 58 additions and 61 deletions.
2 changes: 1 addition & 1 deletion epoll.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ func (ep *evPoll) remove(fd int, events uint32) error {
ev := syscall.EpollEvent{Events: ed.events}
*(**evData)(unsafe.Pointer(&ev.Fd)) = ed
if err := syscall.EpollCtl(ep.efd, syscall.EPOLL_CTL_MOD, fd, &ev); err != nil {
ed.events |= events;
ed.events |= events
return errors.New("epoll_ctl mod: " + err.Error())
}
return nil
Expand Down
77 changes: 37 additions & 40 deletions io_handle.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,74 +12,71 @@ import (
type IOHandle struct {
noCopy

_asyncWriteWaiting bool
_fd int32
_asyncWriteBufSize int // total size of wait to write
asyncWriteWaiting bool
fd int32
asyncWriteBufSize int // total size of wait to write

_r *Reactor

_ep *evPoll

_ti *timerItem

_asyncWriteBufQ *RingBuffer[asyncWriteBuf] // 保存未直接发送完成的
r *Reactor
ep *evPoll
ti *timerItem
asyncWriteBufQ *RingBuffer[asyncWriteBuf] // 保存未直接发送完成的
}

// Init IOHandle must be called when reusing it.
func (h *IOHandle) Init() {
h._r, h._ep, h._ti = nil, nil, nil
h.r, h.ep, h.ti = nil, nil, nil
h.setFd(-1)
}

func (h *IOHandle) setParams(fd int, ep *evPoll) {
h.setFd(fd)
h._ep = ep
h.ep = ep
}

func (h *IOHandle) getEvPoll() *evPoll {
return h._ep
return h.ep
}

func (h *IOHandle) setReactor(r *Reactor) {
h._r = r
h.r = r
}

// GetReactor can retrieve the current event object bound to which Reactor
func (h *IOHandle) GetReactor() *Reactor {
return h._r
return h.r
}

func (h *IOHandle) setTimerItem(ti *timerItem) {
h._ti = ti
h.ti = ti
}

func (h *IOHandle) getTimerItem() *timerItem {
return h._ti
return h.ti
}

// Fd return fd
func (h *IOHandle) Fd() int {
return int(atomic.LoadInt32(&(h._fd)))
return int(atomic.LoadInt32(&(h.fd)))
}
func (h *IOHandle) setFd(fd int) {
atomic.StoreInt32(&(h._fd), int32(fd))
atomic.StoreInt32(&(h.fd), int32(fd))
}

// ScheduleTimer Add a timer event to an IOHandle that is already registered with the reactor
// to ensure that all event handling occurs within the same evpoll
//
// Only supports binding timers to I/O objects within evpoll internally.
func (h *IOHandle) ScheduleTimer(eh EvHandler, delay, interval int64) error {
if h._ep != nil {
return h._ep.scheduleTimer(eh, delay, interval)
if h.ep != nil {
return h.ep.scheduleTimer(eh, delay, interval)
}
return errors.New("ev handler has not been added to the reactor yet")
}

// CancelTimer cancels a timer that has been successfully scheduled
func (h *IOHandle) CancelTimer(eh EvHandler) {
if h._ep != nil {
h._ep.cancelTimer(eh)
if h.ep != nil {
h.ep.cancelTimer(eh)
}
}

Expand All @@ -91,8 +88,8 @@ func (h *IOHandle) Read() (bf []byte, n int, err error) {
if fd < 1 {
return nil, 0, syscall.EBADF
}
if h._ep != nil {
return h._ep.read(fd)
if h.ep != nil {
return h.ep.read(fd)
}
panic("goev: IOHandle.Read fd not register to evpoll")
}
Expand All @@ -101,15 +98,15 @@ func (h *IOHandle) Read() (bf []byte, n int, err error) {
//
// Can only be used within the poller goroutine
func (h *IOHandle) WriteBuff() []byte {
if h._ep != nil {
return h._ep.writeBuff()
if h.ep != nil {
return h.ep.writeBuff()
}
panic("goev: IOHandle.WriteBuff fd not register to evpoll")
}

// PCachedGet returns cached data store in evPoll, it's lock free
func (h *IOHandle) PCachedGet(id int) (any, bool) {
return h._ep.pCacheGet(id)
return h.ep.pCacheGet(id)
}

// Write synchronous write.
Expand All @@ -119,14 +116,14 @@ func (h *IOHandle) Write(bf []byte) (n int, err error) {
if fd < 1 { // NOTE fd must > 0
return 0, syscall.EBADF
}
if h._asyncWriteBufQ != nil && !h._asyncWriteBufQ.IsEmpty() {
if h.asyncWriteBufQ != nil && !h.asyncWriteBufQ.IsEmpty() {
abf := ioAllocBuff(len(bf))
n = copy(abf, bf)
h._asyncWriteBufQ.PushBack(asyncWriteBuf{
h.asyncWriteBufQ.PushBack(asyncWriteBuf{
len: n,
buf: abf,
})
h._asyncWriteBufSize += n
h.asyncWriteBufSize += n
return
}
for {
Expand All @@ -142,17 +139,17 @@ func (h *IOHandle) Write(bf []byte) (n int, err error) {
if n < len(bf) {
abf := ioAllocBuff(len(bf) - n)
n = copy(abf, bf[n:])
if h._asyncWriteBufQ == nil {
h._asyncWriteBufQ = NewRingBuffer[asyncWriteBuf](2)
if h.asyncWriteBufQ == nil {
h.asyncWriteBufQ = NewRingBuffer[asyncWriteBuf](2)
}
h._asyncWriteBufQ.PushBack(asyncWriteBuf{
h.asyncWriteBufQ.PushBack(asyncWriteBuf{
len: n,
buf: abf,
})
h._asyncWriteBufSize += n
if h._asyncWriteWaiting == false {
h._asyncWriteWaiting = true
h._ep.append(fd, EvOut) // No need to use ET mode
h.asyncWriteBufSize += n
if h.asyncWriteWaiting == false {
h.asyncWriteWaiting = true
h.ep.append(fd, EvOut) // No need to use ET mode
// eh needs to implement the OnWrite method, and the OnWrite method
// needs to call AsyncOrderedFlush.
}
Expand All @@ -170,9 +167,9 @@ func (h *IOHandle) Destroy(eh EvHandler) {
h.setFd(-1)
}

if h._asyncWriteBufQ != nil && !h._asyncWriteBufQ.IsEmpty() {
if h.asyncWriteBufQ != nil && !h.asyncWriteBufQ.IsEmpty() {
for {
abf, ok := h._asyncWriteBufQ.PopFront()
abf, ok := h.asyncWriteBufQ.PopFront()
if !ok {
break
}
Expand Down
40 changes: 20 additions & 20 deletions io_handle_async.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,29 +34,29 @@ func (h *IOHandle) AsyncOrderedFlush(eh EvHandler) {
if fd < 1 {
return
}
n := h._asyncWriteBufQ.Len()
n := h.asyncWriteBufQ.Len()
// It is necessary to use n to limit the number of sending attempts.
// If there is a possibility of sending failure, the data should be saved again in _asyncWriteBufQ
for i := 0; i < n; i++ {
abf, ok := h._asyncWriteBufQ.PopFront()
abf, ok := h.asyncWriteBufQ.PopFront()
if !ok {
break
}
n, _ := syscall.Write(fd, abf.buf[abf.writen:abf.len])
if n > 0 {
h._asyncWriteBufSize -= n
h.asyncWriteBufSize -= n
if n == (abf.len - abf.writen) { // send completely
ioFreeBuff(abf.buf)
continue
}
abf.writen += n // Partially write, shift n
}
h._asyncWriteBufQ.PushFront(abf)
h.asyncWriteBufQ.PushFront(abf)
break
}
if h._asyncWriteBufQ.IsEmpty() {
h._ep.remove(fd, EvOut)
h._asyncWriteWaiting = false
if h.asyncWriteBufQ.IsEmpty() {
h.ep.remove(fd, EvOut)
h.asyncWriteWaiting = false
}
}

Expand All @@ -73,7 +73,7 @@ func (h *IOHandle) AsyncWrite(eh EvHandler, buf []byte) {
}
abf := ioAllocBuff(len(buf))
n := copy(abf, buf) // if n != len(buf) panic ?
h._ep.push(asyncWriteItem{
h.ep.push(asyncWriteItem{
fd: fd,
eh: eh,
abf: asyncWriteBuf{
Expand All @@ -89,15 +89,15 @@ func (h *IOHandle) asyncOrderedWrite(eh EvHandler, abf asyncWriteBuf) {
ioFreeBuff(abf.buf)
return
}
h._asyncWriteBufSize += abf.len
if h._asyncWriteBufQ != nil && !h._asyncWriteBufQ.IsEmpty() {
h._asyncWriteBufQ.PushBack(abf)
h.asyncWriteBufSize += abf.len
if h.asyncWriteBufQ != nil && !h.asyncWriteBufQ.IsEmpty() {
h.asyncWriteBufQ.PushBack(abf)
return
}

n, _ := syscall.Write(fd, abf.buf[abf.writen:abf.len])
if n > 0 {
h._asyncWriteBufSize -= n
h.asyncWriteBufSize -= n
if n == (abf.len - abf.writen) {
ioFreeBuff(abf.buf)
return
Expand All @@ -106,14 +106,14 @@ func (h *IOHandle) asyncOrderedWrite(eh EvHandler, abf asyncWriteBuf) {
}

// Error or Partially
if h._asyncWriteBufQ == nil {
h._asyncWriteBufQ = NewRingBuffer[asyncWriteBuf](4)
if h.asyncWriteBufQ == nil {
h.asyncWriteBufQ = NewRingBuffer[asyncWriteBuf](4)
}
h._asyncWriteBufQ.PushBack(abf)
h.asyncWriteBufQ.PushBack(abf)

if h._asyncWriteWaiting == false {
h._asyncWriteWaiting = true
h._ep.append(fd, EvOut) // No need to use ET mode
if h.asyncWriteWaiting == false {
h.asyncWriteWaiting = true
h.ep.append(fd, EvOut) // No need to use ET mode
// eh needs to implement the OnWrite method, and the OnWrite method
// needs to call AsyncOrderedFlush.
}
Expand All @@ -123,8 +123,8 @@ func (h *IOHandle) asyncOrderedWrite(eh EvHandler, abf asyncWriteBuf) {
//
// If it is too long, it indicates that the sending is slow and the receiving end is abnormal
func (h *IOHandle) AsyncWaitWriteQLen() int {
if h._asyncWriteBufQ == nil {
if h.asyncWriteBufQ == nil {
return 0
}
return h._asyncWriteBufQ.Len()
return h.asyncWriteBufQ.Len()
}

0 comments on commit 56d733f

Please sign in to comment.