Skip to content

Commit

Permalink
Added heapqueue collection. Fixed timers in asyncdispatch.
Browse files Browse the repository at this point in the history
  • Loading branch information
Yuriy Glukhov committed Apr 27, 2016
1 parent e31ec74 commit e213a9a
Show file tree
Hide file tree
Showing 2 changed files with 131 additions and 16 deletions.
40 changes: 24 additions & 16 deletions lib/pure/asyncdispatch.nim
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

include "system/inclrtl"

import os, oids, tables, strutils, macros, times
import os, oids, tables, strutils, macros, times, heapqueue

import nativesockets, net

Expand Down Expand Up @@ -354,16 +354,22 @@ proc `or`*[T, Y](fut1: Future[T], fut2: Future[Y]): Future[void] =

type
PDispatcherBase = ref object of RootRef
timers: seq[tuple[finishAt: float, fut: Future[void]]]
timers: HeapQueue[tuple[finishAt: float, fut: Future[void]]]

proc processTimers(p: PDispatcherBase) =
var oldTimers = p.timers
p.timers = @[]
for t in oldTimers:
if epochTime() >= t.finishAt:
t.fut.complete()
else:
p.timers.add(t)
while p.timers.len > 0 and epochTime() >= p.timers[0].finishAt:
let t = p.timers.pop()
t.fut.complete()

proc adjustedTimeout(p: PDispatcherBase, timeout: int): int =
# If dispatcher has active timers this proc returns the timeout
# of the nearest timer. Returns `timeout` otherwise.
result = timeout
if p.timers.len > 0:
let timerTimeout = p.timers[0].finishAt
let curTime = epochTime()
if timeout == -1 or (curTime + (timeout / 1000)) > timerTimeout:
result = int((timerTimeout - curTime) * 1000)

when defined(windows) or defined(nimdoc):
import winlean, sets, hashes
Expand Down Expand Up @@ -396,7 +402,7 @@ when defined(windows) or defined(nimdoc):
new result
result.ioPort = createIoCompletionPort(INVALID_HANDLE_VALUE, 0, 0, 1)
result.handles = initSet[AsyncFD]()
result.timers = @[]
result.timers.newHeapQueue()

var gDisp{.threadvar.}: PDispatcher ## Global dispatcher
proc getGlobalDispatcher*(): PDispatcher =
Expand Down Expand Up @@ -427,9 +433,11 @@ when defined(windows) or defined(nimdoc):
raise newException(ValueError,
"No handles or timers registered in dispatcher.")

let llTimeout =
if timeout == -1: winlean.INFINITE
else: timeout.int32
let at = p.adjustedTimeout(timeout)
var llTimeout =
if at == -1: winlean.INFINITE
else: at.int32

var lpNumberOfBytesTransferred: Dword
var lpCompletionKey: ULONG
var customOverlapped: PCustomOverlapped
Expand Down Expand Up @@ -956,7 +964,7 @@ else:
proc newDispatcher*(): PDispatcher =
new result
result.selector = newSelector()
result.timers = @[]
result.timers.newHeapQueue()

var gDisp{.threadvar.}: PDispatcher ## Global dispatcher
proc getGlobalDispatcher*(): PDispatcher =
Expand Down Expand Up @@ -1014,7 +1022,7 @@ else:

proc poll*(timeout = 500) =
let p = getGlobalDispatcher()
for info in p.selector.select(timeout):
for info in p.selector.select(p.adjustedTimeout(timeout)):
let data = PData(info.key.data)
assert data.fd == info.key.fd.AsyncFD
#echo("In poll ", data.fd.cint)
Expand Down Expand Up @@ -1215,7 +1223,7 @@ proc sleepAsync*(ms: int): Future[void] =
## ``ms`` milliseconds.
var retFuture = newFuture[void]("sleepAsync")
let p = getGlobalDispatcher()
p.timers.add((epochTime() + (ms / 1000), retFuture))
p.timers.push((epochTime() + (ms / 1000), retFuture))
return retFuture

proc accept*(socket: AsyncFD,
Expand Down
107 changes: 107 additions & 0 deletions lib/pure/collections/heapqueue.nim
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
##[ Heap queue algorithm (a.k.a. priority queue). Ported from Python heapq.
Heaps are arrays for which a[k] <= a[2*k+1] and a[k] <= a[2*k+2] for
all k, counting elements from 0. For the sake of comparison,
non-existing elements are considered to be infinite. The interesting
property of a heap is that a[0] is always its smallest element.
]##

type HeapQueue*[T] = distinct seq[T]

proc newHeapQueue*[T](): HeapQueue[T] {.inline.} = HeapQueue[T](newSeq[T]())
proc newHeapQueue*[T](h: var HeapQueue[T]) {.inline.} = h = HeapQueue[T](newSeq[T]())

proc len*[T](h: HeapQueue[T]): int {.inline.} = seq[T](h).len
proc `[]`*[T](h: HeapQueue[T], i: int): T {.inline.} = seq[T](h)[i]
proc `[]=`[T](h: var HeapQueue[T], i: int, v: T) {.inline.} = seq[T](h)[i] = v
proc add[T](h: var HeapQueue[T], v: T) {.inline.} = seq[T](h).add(v)

proc heapCmp[T](x, y: T): bool {.inline.} =
return (x < y)

# 'heap' is a heap at all indices >= startpos, except possibly for pos. pos
# is the index of a leaf with a possibly out-of-order value. Restore the
# heap invariant.
proc siftdown[T](heap: var HeapQueue[T], startpos, p: int) =
var pos = p
var newitem = heap[pos]
# Follow the path to the root, moving parents down until finding a place
# newitem fits.
while pos > startpos:
let parentpos = (pos - 1) shr 1
let parent = heap[parentpos]
if heapCmp(newitem, parent):
heap[pos] = parent
pos = parentpos
else:
break
heap[pos] = newitem

proc siftup[T](heap: var HeapQueue[T], p: int) =
let endpos = len(heap)
var pos = p
let startpos = pos
let newitem = heap[pos]
# Bubble up the smaller child until hitting a leaf.
var childpos = 2*pos + 1 # leftmost child position
while childpos < endpos:
# Set childpos to index of smaller child.
let rightpos = childpos + 1
if rightpos < endpos and not heapCmp(heap[childpos], heap[rightpos]):
childpos = rightpos
# Move the smaller child up.
heap[pos] = heap[childpos]
pos = childpos
childpos = 2*pos + 1
# The leaf at pos is empty now. Put newitem there, and bubble it up
# to its final resting place (by sifting its parents down).
heap[pos] = newitem
siftdown(heap, startpos, pos)

proc push*[T](heap: var HeapQueue[T], item: T) =
## Push item onto heap, maintaining the heap invariant.
(seq[T](heap)).add(item)
siftdown(heap, 0, len(heap)-1)

proc pop*[T](heap: var HeapQueue[T]): T =
## Pop the smallest item off the heap, maintaining the heap invariant.
let lastelt = seq[T](heap).pop()
if heap.len > 0:
result = heap[0]
heap[0] = lastelt
siftup(heap, 0)
else:
result = lastelt

proc replace*[T](heap: var HeapQueue[T], item: T): T =
## Pop and return the current smallest value, and add the new item.
## This is more efficient than pop() followed by push(), and can be
## more appropriate when using a fixed-size heap. Note that the value
## returned may be larger than item! That constrains reasonable uses of
## this routine unless written as part of a conditional replacement:

## if item > heap[0]:
## item = replace(heap, item)
result = heap[0]
heap[0] = item
siftup(heap, 0)

proc pushpop*[T](heap: var HeapQueue[T], item: T): T =
## Fast version of a push followed by a pop.
if heap.len > 0 and heapCmp(heap[0], item):
swap(item, heap[0])
siftup(heap, 0)
return item

when isMainModule:
# Simple sanity test
var heap = newHeapQueue[int]()
let data = [1, 3, 5, 7, 9, 2, 4, 6, 8, 0]
for item in data:
push(heap, item)
doAssert(heap[0] == 0)
var sort = newSeq[int]()
while heap.len > 0:
sort.add(pop(heap))
doAssert(sort == @[0, 1, 2, 3, 4, 5, 6, 7, 8, 9])

0 comments on commit e213a9a

Please sign in to comment.