diff --git a/ExecutionContext/PThreadExecutionContext.swift b/ExecutionContext/PThreadExecutionContext.swift index b774770..4db46f8 100644 --- a/ExecutionContext/PThreadExecutionContext.swift +++ b/ExecutionContext/PThreadExecutionContext.swift @@ -83,7 +83,7 @@ sema.wait() - self.rl = RunLoop(runLoop!, autoStop: true) + self.rl = RunLoop(runLoop!) } init(runLoop:RunLoop) { @@ -91,7 +91,7 @@ } func async(task:SafeTask) { - rl.addSource(RunLoopSource(task), mode: RunLoop.defaultMode) + rl.addTask(task) } func async(after:Double, task:SafeTask) { diff --git a/ExecutionContext/RunLoop.swift b/ExecutionContext/RunLoop.swift index e7abb00..641236a 100644 --- a/ExecutionContext/RunLoop.swift +++ b/ExecutionContext/RunLoop.swift @@ -23,6 +23,54 @@ import CoreFoundation var cfString: CFString { return unsafeBitCast(self, CFString.self) } } + private class TaskQueueElement { + private let task : SafeTask + private let source: RunLoopSource + var next: TaskQueueElement? = nil + + init(_ task: SafeTask, runLoopSource: RunLoopSource) { + self.task = task + self.source = runLoopSource + } + + func run() { + task() + } + } + + private class TaskQueue { + private let lock = NSLock() + private var head:TaskQueueElement? = nil + private var tail:TaskQueueElement? = nil + + func enqueue(elem: TaskQueueElement) { + defer { + lock.unlock() + } + lock.lock() + if tail == nil { + head = elem + tail = elem + } else { + tail!.next = elem + tail = elem + } + } + + func dequeue() -> TaskQueueElement? { + defer { + lock.unlock() + } + lock.lock() + let elem = head + head = head?.next + if head == nil { + tail = nil + } + return elem + } + } + private class RunLoopCallbackInfo { private var task: SafeTask private var runLoops: [RunLoop] = [] @@ -37,10 +85,19 @@ import CoreFoundation } private func runLoopCallbackInfoRun(i: UnsafeMutablePointer) { - let info = Unmanaged.fromOpaque(COpaquePointer(i)).takeRetainedValue() + let info = Unmanaged.fromOpaque(COpaquePointer(i)).takeUnretainedValue() info.run() } + private func runLoopCallbackInfoRetain(i: UnsafePointer) -> UnsafePointer { + Unmanaged.fromOpaque(COpaquePointer(i)).retain() + return i + } + + private func runLoopCallbackInfoRelease(i: UnsafePointer) { + Unmanaged.fromOpaque(COpaquePointer(i)).release() + } + private protocol RunLoopCallback { var info : RunLoopCallbackInfo { get } var cfObject: AnyObject { get } @@ -56,9 +113,9 @@ import CoreFoundation if _source == nil { var context = CFRunLoopSourceContext( version: 0, - info: UnsafeMutablePointer(Unmanaged.passRetained(info).toOpaque()), - retain: nil, - release: nil, + info: UnsafeMutablePointer(Unmanaged.passUnretained(info).toOpaque()), + retain: runLoopCallbackInfoRetain, + release: runLoopCallbackInfoRelease, copyDescription: nil, equal: nil, hash: nil, @@ -76,6 +133,19 @@ import CoreFoundation self.info = RunLoopCallbackInfo(task) self.priority = priority } + + deinit { + if _source != nil && CFRunLoopSourceIsValid(_source) { + CFRunLoopSourceInvalidate(_source) + _source = nil + } + } + + func signal() { + if _source != nil { + CFRunLoopSourceSignal(_source) + } + } } private func timerRunCallback(timer: CFRunLoopTimer!, i: UnsafeMutablePointer) { @@ -92,9 +162,9 @@ import CoreFoundation if _timer == nil { var context = CFRunLoopTimerContext( version: 0, - info: UnsafeMutablePointer(Unmanaged.passRetained(info).toOpaque()), - retain: nil, - release: nil, + info: UnsafeMutablePointer(Unmanaged.passUnretained(info).toOpaque()), + retain: runLoopCallbackInfoRetain, + release: runLoopCallbackInfoRelease, copyDescription: nil ) _timer = CFRunLoopTimerCreate(nil, CFAbsoluteTimeGetCurrent()+delay, -1, 0, 0, timerRunCallback, &context) @@ -111,7 +181,9 @@ import CoreFoundation class RunLoop { private let cfRunLoop: CFRunLoop! - private let autoStop: Bool + + private var taskQueueSource: RunLoopSource + private var taskQueue: TaskQueue #if !os(Linux) static let defaultMode:NSString = "kCFRunLoopDefaultMode" as NSString @@ -119,27 +191,32 @@ import CoreFoundation static let defaultMode:NSString = "kCFRunLoopDefaultMode".bridge() #endif - init(_ cfRunLoop: CFRunLoop, autoStop: Bool = true) { + init(_ cfRunLoop: CFRunLoop) { self.cfRunLoop = cfRunLoop - self.autoStop = autoStop + + let queue = TaskQueue() + + taskQueueSource = RunLoopSource({ + var elem = queue.dequeue() + while elem != nil { + elem!.run() + elem = queue.dequeue() + } + }) + taskQueue = queue + addSource(taskQueueSource, mode: RunLoop.defaultMode, retainLoop: false) } - convenience init(_ runLoop: AnyObject, autoStop: Bool = true) { - self.init(unsafeBitCast(runLoop, CFRunLoop.self), autoStop: autoStop) + convenience init(_ runLoop: AnyObject) { + self.init(unsafeBitCast(runLoop, CFRunLoop.self)) } - deinit { - if autoStop && cfRunLoop != nil { - CFRunLoopStop(cfRunLoop) - } - } - - static func currentRunLoop(autoStop: Bool = false) -> RunLoop { - return RunLoop(CFRunLoopGetCurrent(), autoStop: autoStop) + static func currentRunLoop() -> RunLoop { + return RunLoop(CFRunLoopGetCurrent()) } static func mainRunLoop() -> RunLoop { - return RunLoop(CFRunLoopGetMain(), autoStop: false) + return RunLoop(CFRunLoopGetMain()) } static func currentCFRunLoop() -> AnyObject { @@ -156,9 +233,15 @@ import CoreFoundation static func runInMode(mode: NSString) { #if !os(Linux) - while CFRunLoopRunInMode(mode.cfString, Double.infinity, false) != .Stopped {} + var result:CFRunLoopRunResult + repeat { + result = CFRunLoopRunInMode(mode.cfString, Double.infinity, false) + } while result != .Finished && result != .Stopped #else - while CFRunLoopRunInMode(mode.cfString, Double.infinity, false) != Int32(kCFRunLoopRunStopped) {} + var result:Int32 + repeat { + result = CFRunLoopRunInMode(mode.cfString, Double.infinity, false) + } while result != Int32(kCFRunLoopRunStopped) && result != Int32(kCFRunLoopRunFinished) #endif } @@ -166,23 +249,29 @@ import CoreFoundation while true { run() } } - func addSource(rls: RunLoopSource, mode: NSString) { + func addSource(rls: RunLoopSource, mode: NSString, retainLoop: Bool = true) { let crls = unsafeBitCast(rls.cfObject, CFRunLoopSource.self) if CFRunLoopSourceIsValid(crls) { CFRunLoopAddSource(cfRunLoop, crls, mode.cfString) - rls.info.runLoops.append(self) - CFRunLoopSourceSignal(crls) + if retainLoop { rls.info.runLoops.append(self) } + rls.signal() CFRunLoopWakeUp(cfRunLoop) } } func addDelay(rld: RunLoopDelay, mode: NSString) { let crld = unsafeBitCast(rld.cfObject, CFRunLoopTimer.self) - if CFRunLoopTimerIsValid(crld) { + if CFRunLoopTimerIsValid(crld) && (rld.info.runLoops.count == 0 || rld.info.runLoops[0] === self) { CFRunLoopAddTimer(cfRunLoop, crld, mode.cfString) rld.info.runLoops.append(self) CFRunLoopWakeUp(cfRunLoop) } } + + func addTask(task: SafeTask) { + taskQueue.enqueue(TaskQueueElement(task, runLoopSource: taskQueueSource)) + taskQueueSource.signal() + CFRunLoopWakeUp(cfRunLoop) + } } //#endif \ No newline at end of file