Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added heapqueue collection. Fixed timers in asyncdispatch. #4122

Merged
merged 1 commit into from Apr 28, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
44 changes: 26 additions & 18 deletions lib/pure/asyncdispatch.nim
Expand Up @@ -9,7 +9,7 @@

include "system/inclrtl"

import os, oids, tables, strutils, macros, times
import os, oids, tables, strutils, macros, times, heapqueue

import nativesockets, net

Expand Down Expand Up @@ -354,16 +354,22 @@ proc `or`*[T, Y](fut1: Future[T], fut2: Future[Y]): Future[void] =

type
PDispatcherBase = ref object of RootRef
timers: seq[tuple[finishAt: float, fut: Future[void]]]

proc processTimers(p: PDispatcherBase) =
var oldTimers = p.timers
p.timers = @[]
for t in oldTimers:
if epochTime() >= t.finishAt:
t.fut.complete()
else:
p.timers.add(t)
timers: HeapQueue[tuple[finishAt: float, fut: Future[void]]]

proc processTimers(p: PDispatcherBase) {.inline.} =
while p.timers.len > 0 and epochTime() >= p.timers[0].finishAt:
p.timers.pop().fut.complete()

proc adjustedTimeout(p: PDispatcherBase, timeout: int): int {.inline.} =
# If dispatcher has active timers this proc returns the timeout
# of the nearest timer. Returns `timeout` otherwise.
result = timeout
if p.timers.len > 0:
let timerTimeout = p.timers[0].finishAt
let curTime = epochTime()
if timeout == -1 or (curTime + (timeout / 1000)) > timerTimeout:
result = int((timerTimeout - curTime) * 1000)
if result < 0: result = 0

when defined(windows) or defined(nimdoc):
import winlean, sets, hashes
Expand Down Expand Up @@ -396,7 +402,7 @@ when defined(windows) or defined(nimdoc):
new result
result.ioPort = createIoCompletionPort(INVALID_HANDLE_VALUE, 0, 0, 1)
result.handles = initSet[AsyncFD]()
result.timers = @[]
result.timers.newHeapQueue()

var gDisp{.threadvar.}: PDispatcher ## Global dispatcher
proc getGlobalDispatcher*(): PDispatcher =
Expand Down Expand Up @@ -427,9 +433,11 @@ when defined(windows) or defined(nimdoc):
raise newException(ValueError,
"No handles or timers registered in dispatcher.")

let llTimeout =
if timeout == -1: winlean.INFINITE
else: timeout.int32
let at = p.adjustedTimeout(timeout)
var llTimeout =
if at == -1: winlean.INFINITE
else: at.int32

var lpNumberOfBytesTransferred: Dword
var lpCompletionKey: ULONG
var customOverlapped: PCustomOverlapped
Expand Down Expand Up @@ -956,7 +964,7 @@ else:
proc newDispatcher*(): PDispatcher =
new result
result.selector = newSelector()
result.timers = @[]
result.timers.newHeapQueue()

var gDisp{.threadvar.}: PDispatcher ## Global dispatcher
proc getGlobalDispatcher*(): PDispatcher =
Expand Down Expand Up @@ -1014,7 +1022,7 @@ else:

proc poll*(timeout = 500) =
let p = getGlobalDispatcher()
for info in p.selector.select(timeout):
for info in p.selector.select(p.adjustedTimeout(timeout)):
let data = PData(info.key.data)
assert data.fd == info.key.fd.AsyncFD
#echo("In poll ", data.fd.cint)
Expand Down Expand Up @@ -1215,7 +1223,7 @@ proc sleepAsync*(ms: int): Future[void] =
## ``ms`` milliseconds.
var retFuture = newFuture[void]("sleepAsync")
let p = getGlobalDispatcher()
p.timers.add((epochTime() + (ms / 1000), retFuture))
p.timers.push((epochTime() + (ms / 1000), retFuture))
return retFuture

proc accept*(socket: AsyncFD,
Expand Down
107 changes: 107 additions & 0 deletions lib/pure/collections/heapqueue.nim
@@ -0,0 +1,107 @@
##[ Heap queue algorithm (a.k.a. priority queue). Ported from Python heapq.

Heaps are arrays for which a[k] <= a[2*k+1] and a[k] <= a[2*k+2] for
all k, counting elements from 0. For the sake of comparison,
non-existing elements are considered to be infinite. The interesting
property of a heap is that a[0] is always its smallest element.

]##

type HeapQueue*[T] = distinct seq[T]

proc newHeapQueue*[T](): HeapQueue[T] {.inline.} = HeapQueue[T](newSeq[T]())
proc newHeapQueue*[T](h: var HeapQueue[T]) {.inline.} = h = HeapQueue[T](newSeq[T]())

proc len*[T](h: HeapQueue[T]): int {.inline.} = seq[T](h).len
proc `[]`*[T](h: HeapQueue[T], i: int): T {.inline.} = seq[T](h)[i]
proc `[]=`[T](h: var HeapQueue[T], i: int, v: T) {.inline.} = seq[T](h)[i] = v
proc add[T](h: var HeapQueue[T], v: T) {.inline.} = seq[T](h).add(v)

proc heapCmp[T](x, y: T): bool {.inline.} =
return (x < y)

# 'heap' is a heap at all indices >= startpos, except possibly for pos. pos
# is the index of a leaf with a possibly out-of-order value. Restore the
# heap invariant.
proc siftdown[T](heap: var HeapQueue[T], startpos, p: int) =
var pos = p
var newitem = heap[pos]
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

system.shallowCopy is faster than '=' for these things usually, where data is only moved around internally.

# Follow the path to the root, moving parents down until finding a place
# newitem fits.
while pos > startpos:
let parentpos = (pos - 1) shr 1
let parent = heap[parentpos]
if heapCmp(newitem, parent):
heap[pos] = parent
pos = parentpos
else:
break
heap[pos] = newitem

proc siftup[T](heap: var HeapQueue[T], p: int) =
let endpos = len(heap)
var pos = p
let startpos = pos
let newitem = heap[pos]
# Bubble up the smaller child until hitting a leaf.
var childpos = 2*pos + 1 # leftmost child position
while childpos < endpos:
# Set childpos to index of smaller child.
let rightpos = childpos + 1
if rightpos < endpos and not heapCmp(heap[childpos], heap[rightpos]):
childpos = rightpos
# Move the smaller child up.
heap[pos] = heap[childpos]
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same.

pos = childpos
childpos = 2*pos + 1
# The leaf at pos is empty now. Put newitem there, and bubble it up
# to its final resting place (by sifting its parents down).
heap[pos] = newitem
siftdown(heap, startpos, pos)

proc push*[T](heap: var HeapQueue[T], item: T) =
## Push item onto heap, maintaining the heap invariant.
(seq[T](heap)).add(item)
siftdown(heap, 0, len(heap)-1)

proc pop*[T](heap: var HeapQueue[T]): T =
## Pop the smallest item off the heap, maintaining the heap invariant.
let lastelt = seq[T](heap).pop()
if heap.len > 0:
result = heap[0]
heap[0] = lastelt
siftup(heap, 0)
else:
result = lastelt

proc replace*[T](heap: var HeapQueue[T], item: T): T =
## Pop and return the current smallest value, and add the new item.
## This is more efficient than pop() followed by push(), and can be
## more appropriate when using a fixed-size heap. Note that the value
## returned may be larger than item! That constrains reasonable uses of
## this routine unless written as part of a conditional replacement:

## if item > heap[0]:
## item = replace(heap, item)
result = heap[0]
heap[0] = item
siftup(heap, 0)

proc pushpop*[T](heap: var HeapQueue[T], item: T): T =
## Fast version of a push followed by a pop.
if heap.len > 0 and heapCmp(heap[0], item):
swap(item, heap[0])
siftup(heap, 0)
return item

when isMainModule:
# Simple sanity test
var heap = newHeapQueue[int]()
let data = [1, 3, 5, 7, 9, 2, 4, 6, 8, 0]
for item in data:
push(heap, item)
doAssert(heap[0] == 0)
var sort = newSeq[int]()
while heap.len > 0:
sort.add(pop(heap))
doAssert(sort == @[0, 1, 2, 3, 4, 5, 6, 7, 8, 9])