diff --git a/ExecutionContext/PThreadExecutionContext.swift b/ExecutionContext/PThreadExecutionContext.swift index 38acae1..2f96330 100644 --- a/ExecutionContext/PThreadExecutionContext.swift +++ b/ExecutionContext/PThreadExecutionContext.swift @@ -26,6 +26,9 @@ import Foundation import CoreFoundation import Result + #if os(Linux) + import Glibc + #endif private func thread_proc(pm: UnsafeMutablePointer) -> UnsafeMutablePointer { let pthread = Unmanaged.fromOpaque(COpaquePointer(pm)).takeRetainedValue() @@ -54,70 +57,137 @@ pthread_create(thread, nil, thread_proc, UnsafeMutablePointer(Unmanaged.passRetained(self).toOpaque())) } } + + + private class RunLoopFinalizer { + private let rl: CFRunLoop! + init(_ runLoop: CFRunLoop!) { + self.rl = runLoop + } + deinit { + CFRunLoopStop(rl) + } + } + + private class RunLoopObject { + private var cfObject:AnyObject? = nil + private let task:SafeTask + private let finalizer:RunLoopFinalizer? + + init(_ task:SafeTask, runLoopFinalizer: RunLoopFinalizer?) { + self.task = task + self.finalizer = runLoopFinalizer + } + + func addToRunLoop(runLoop:CFRunLoop, mode: CFString) { + if cfObject == nil { + self.cfObject = createCFObject() + } + addCFObject(runLoop, mode: mode) + } + + func signal() {} + + private func createCFObject() -> AnyObject? { return nil } + + private func addCFObject(runLoop:CFRunLoop, mode: CFString) {} + } private func sourceMain(rls: UnsafeMutablePointer) { - let runLoopSource = Unmanaged.fromOpaque(COpaquePointer(rls)).takeUnretainedValue() - runLoopSource.cfSource = nil + let runLoopSource = Unmanaged.fromOpaque(COpaquePointer(rls)).takeUnretainedValue() + runLoopSource.cfObject = nil runLoopSource.task() } private func sourceCancel(rls: UnsafeMutablePointer, rL: CFRunLoop!, mode:CFString!) { - let runLoopSource = Unmanaged.fromOpaque(COpaquePointer(rls)).takeUnretainedValue() - runLoopSource.cfSource = nil + let runLoopSource = Unmanaged.fromOpaque(COpaquePointer(rls)).takeUnretainedValue() + runLoopSource.cfObject = nil } private func sourceRetain(rls: UnsafePointer) -> UnsafePointer { - Unmanaged.fromOpaque(COpaquePointer(rls)).retain() + Unmanaged.fromOpaque(COpaquePointer(rls)).retain() return rls } private func sourceRelease(rls: UnsafePointer) { - Unmanaged.fromOpaque(COpaquePointer(rls)).release() + Unmanaged.fromOpaque(COpaquePointer(rls)).release() } - private class RunLoopSource { - private var cfSource:CFRunLoopSource? = nil - private let task:SafeTask + private class RunLoopSource : RunLoopObject { private let priority:Int - init(_ task: SafeTask, priority: Int = 0) { - self.task = task + init(_ task: SafeTask, priority: Int = 0, finalizer: RunLoopFinalizer?) { self.priority = priority + super.init(task, runLoopFinalizer: finalizer) } deinit { - if let s = cfSource { + if let s = cfObject as! CFRunLoopSource? { if CFRunLoopSourceIsValid(s) { CFRunLoopSourceInvalidate(s) } } } - - func addToRunLoop(runLoop:CFRunLoop, mode: CFString) { - if cfSource == nil { - var context = CFRunLoopSourceContext( - version: 0, - info: UnsafeMutablePointer(Unmanaged.passUnretained(self).toOpaque()), - retain: sourceRetain, - release: sourceRelease, - copyDescription: nil, - equal: nil, - hash: nil, - schedule: nil, - cancel: sourceCancel, - perform: sourceMain - ) - self.cfSource = CFRunLoopSourceCreate(nil, priority, &context) - } - - CFRunLoopAddSource(runLoop, cfSource!, mode) + + private override func createCFObject() -> AnyObject? { + var context = CFRunLoopSourceContext( + version: 0, + info: UnsafeMutablePointer(Unmanaged.passUnretained(self).toOpaque()), + retain: sourceRetain, + release: sourceRelease, + copyDescription: nil, + equal: nil, + hash: nil, + schedule: nil, + cancel: sourceCancel, + perform: sourceMain + ) + return CFRunLoopSourceCreate(nil, priority, &context) } - - func signal() { - if let s = cfSource { + + private override func addCFObject(runLoop:CFRunLoop, mode: CFString) { + CFRunLoopAddSource(runLoop, (cfObject as! CFRunLoopSource?)!, mode) + } + + override func signal() { + if let s = cfObject as! CFRunLoopSource? { CFRunLoopSourceSignal(s) } } } + private func timerCallback(timer: CFRunLoopTimer!, rlt: UnsafeMutablePointer) { + sourceMain(rlt) + } + + private class RunLoopDelay : RunLoopObject { + private let delay:CFTimeInterval + + init(_ task: SafeTask, delay: CFTimeInterval, finalizer: RunLoopFinalizer?) { + self.delay = delay + super.init(task, runLoopFinalizer: finalizer) + } + + deinit { + if let t = cfObject as! CFRunLoopTimer? { + if CFRunLoopTimerIsValid(t) { CFRunLoopTimerInvalidate(t) } + } + } + + private override func createCFObject() -> AnyObject? { + var context = CFRunLoopTimerContext( + version: 0, + info: UnsafeMutablePointer(Unmanaged.passUnretained(self).toOpaque()), + retain: sourceRetain, + release: sourceRelease, + copyDescription: nil + ) + return CFRunLoopTimerCreate(nil, CFAbsoluteTimeGetCurrent()+delay, -1, 0, 0, timerCallback, &context) + } + + private override func addCFObject(runLoop:CFRunLoop, mode: CFString) { + CFRunLoopAddTimer(runLoop, (cfObject as! CFRunLoopTimer?)!, mode) + } + } + private extension ExecutionContextType { func syncThroughAsync(task:() throws -> ReturnType) throws -> ReturnType { var result:Result? @@ -143,6 +213,18 @@ thread.start() } + func async(after:Double, task:SafeTask) { + let thread = PThread(task: { + let sec = time_t(after) + let nsec = Int((after - Double(sec)) * 1000 * 1000 * 1000)//nano seconds + var time = timespec(tv_sec:sec, tv_nsec: nsec) + + nanosleep(&time, nil) + task() + }) + thread.start() + } + func sync(task:() throws -> ReturnType) throws -> ReturnType { return try syncThroughAsync(task) } @@ -150,7 +232,7 @@ private class SerialContext : ExecutionContextBase, ExecutionContextType { private let rl:CFRunLoop! - private let ownRunLoop:Bool + private let finalizer: RunLoopFinalizer? #if !os(Linux) private static let defaultMode:CFString = "kCFRunLoopDefaultMode" as NSString @@ -159,7 +241,6 @@ #endif override init() { - ownRunLoop = true var runLoop:CFRunLoop? let cond = NSCondition() cond.lock() @@ -172,22 +253,12 @@ cond.wait() cond.unlock() self.rl = runLoop! + finalizer = RunLoopFinalizer(self.rl) } init(runLoop:CFRunLoop!) { - ownRunLoop = false rl = runLoop - } - - deinit { - if ownRunLoop { - let runLoop = rl - performRunLoopSource(RunLoopSource({ - CFRunLoopStop(runLoop) - }, - priority: -32768) - ) - } + finalizer = nil } #if !os(Linux) @@ -200,13 +271,18 @@ } #endif - private func performRunLoopSource(rls: RunLoopSource) { - rls.addToRunLoop(rl, mode: SerialContext.defaultMode) - rls.signal() + private func performRunLoopObject(rlo: RunLoopObject) { + rlo.addToRunLoop(rl, mode: SerialContext.defaultMode) + rlo.signal() + CFRunLoopWakeUp(rl) } func async(task:SafeTask) { - performRunLoopSource(RunLoopSource(task)) + performRunLoopObject(RunLoopSource(task, finalizer: finalizer)) + } + + func async(after:Double, task:SafeTask) { + performRunLoopObject(RunLoopDelay(task, delay: after, finalizer: finalizer)) } func sync(task:() throws -> ReturnType) throws -> ReturnType { @@ -240,6 +316,10 @@ return try inner.sync(task) } + public func async(after:Double, task:SafeTask) { + inner.async(after, task: task) + } + public static let main:ExecutionContextType = PThreadExecutionContext(inner: SerialContext(runLoop: CFRunLoopGetMain())) public static let global:ExecutionContextType = PThreadExecutionContext(kind: .Parallel) }