Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
184 changes: 132 additions & 52 deletions ExecutionContext/PThreadExecutionContext.swift
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@
import Foundation
import CoreFoundation
import Result
#if os(Linux)
import Glibc
#endif

private func thread_proc(pm: UnsafeMutablePointer<Void>) -> UnsafeMutablePointer<Void> {
let pthread = Unmanaged<PThread>.fromOpaque(COpaquePointer(pm)).takeRetainedValue()
Expand Down Expand Up @@ -54,70 +57,137 @@
pthread_create(thread, nil, thread_proc, UnsafeMutablePointer<Void>(Unmanaged.passRetained(self).toOpaque()))
}
}


private class RunLoopFinalizer {
private let rl: CFRunLoop!
init(_ runLoop: CFRunLoop!) {
self.rl = runLoop
}
deinit {
CFRunLoopStop(rl)
}
}

private class RunLoopObject {
private var cfObject:AnyObject? = nil
private let task:SafeTask
private let finalizer:RunLoopFinalizer?

init(_ task:SafeTask, runLoopFinalizer: RunLoopFinalizer?) {
self.task = task
self.finalizer = runLoopFinalizer
}

func addToRunLoop(runLoop:CFRunLoop, mode: CFString) {
if cfObject == nil {
self.cfObject = createCFObject()
}
addCFObject(runLoop, mode: mode)
}

func signal() {}

private func createCFObject() -> AnyObject? { return nil }

private func addCFObject(runLoop:CFRunLoop, mode: CFString) {}
}

private func sourceMain(rls: UnsafeMutablePointer<Void>) {
let runLoopSource = Unmanaged<RunLoopSource>.fromOpaque(COpaquePointer(rls)).takeUnretainedValue()
runLoopSource.cfSource = nil
let runLoopSource = Unmanaged<RunLoopObject>.fromOpaque(COpaquePointer(rls)).takeUnretainedValue()
runLoopSource.cfObject = nil
runLoopSource.task()
}

private func sourceCancel(rls: UnsafeMutablePointer<Void>, rL: CFRunLoop!, mode:CFString!) {
let runLoopSource = Unmanaged<RunLoopSource>.fromOpaque(COpaquePointer(rls)).takeUnretainedValue()
runLoopSource.cfSource = nil
let runLoopSource = Unmanaged<RunLoopObject>.fromOpaque(COpaquePointer(rls)).takeUnretainedValue()
runLoopSource.cfObject = nil
}

private func sourceRetain(rls: UnsafePointer<Void>) -> UnsafePointer<Void> {
Unmanaged<RunLoopSource>.fromOpaque(COpaquePointer(rls)).retain()
Unmanaged<RunLoopObject>.fromOpaque(COpaquePointer(rls)).retain()
return rls
}

private func sourceRelease(rls: UnsafePointer<Void>) {
Unmanaged<RunLoopSource>.fromOpaque(COpaquePointer(rls)).release()
Unmanaged<RunLoopObject>.fromOpaque(COpaquePointer(rls)).release()
}

private class RunLoopSource {
private var cfSource:CFRunLoopSource? = nil
private let task:SafeTask
private class RunLoopSource : RunLoopObject {
private let priority:Int

init(_ task: SafeTask, priority: Int = 0) {
self.task = task
init(_ task: SafeTask, priority: Int = 0, finalizer: RunLoopFinalizer?) {
self.priority = priority
super.init(task, runLoopFinalizer: finalizer)
}

deinit {
if let s = cfSource {
if let s = cfObject as! CFRunLoopSource? {
if CFRunLoopSourceIsValid(s) { CFRunLoopSourceInvalidate(s) }
}
}

func addToRunLoop(runLoop:CFRunLoop, mode: CFString) {
if cfSource == nil {
var context = CFRunLoopSourceContext(
version: 0,
info: UnsafeMutablePointer<Void>(Unmanaged.passUnretained(self).toOpaque()),
retain: sourceRetain,
release: sourceRelease,
copyDescription: nil,
equal: nil,
hash: nil,
schedule: nil,
cancel: sourceCancel,
perform: sourceMain
)
self.cfSource = CFRunLoopSourceCreate(nil, priority, &context)
}

CFRunLoopAddSource(runLoop, cfSource!, mode)

private override func createCFObject() -> AnyObject? {
var context = CFRunLoopSourceContext(
version: 0,
info: UnsafeMutablePointer<Void>(Unmanaged.passUnretained(self).toOpaque()),
retain: sourceRetain,
release: sourceRelease,
copyDescription: nil,
equal: nil,
hash: nil,
schedule: nil,
cancel: sourceCancel,
perform: sourceMain
)
return CFRunLoopSourceCreate(nil, priority, &context)
}

func signal() {
if let s = cfSource {

private override func addCFObject(runLoop:CFRunLoop, mode: CFString) {
CFRunLoopAddSource(runLoop, (cfObject as! CFRunLoopSource?)!, mode)
}

override func signal() {
if let s = cfObject as! CFRunLoopSource? {
CFRunLoopSourceSignal(s)
}
}
}

private func timerCallback(timer: CFRunLoopTimer!, rlt: UnsafeMutablePointer<Void>) {
sourceMain(rlt)
}

private class RunLoopDelay : RunLoopObject {
private let delay:CFTimeInterval

init(_ task: SafeTask, delay: CFTimeInterval, finalizer: RunLoopFinalizer?) {
self.delay = delay
super.init(task, runLoopFinalizer: finalizer)
}

deinit {
if let t = cfObject as! CFRunLoopTimer? {
if CFRunLoopTimerIsValid(t) { CFRunLoopTimerInvalidate(t) }
}
}

private override func createCFObject() -> AnyObject? {
var context = CFRunLoopTimerContext(
version: 0,
info: UnsafeMutablePointer<Void>(Unmanaged.passUnretained(self).toOpaque()),
retain: sourceRetain,
release: sourceRelease,
copyDescription: nil
)
return CFRunLoopTimerCreate(nil, CFAbsoluteTimeGetCurrent()+delay, -1, 0, 0, timerCallback, &context)
}

private override func addCFObject(runLoop:CFRunLoop, mode: CFString) {
CFRunLoopAddTimer(runLoop, (cfObject as! CFRunLoopTimer?)!, mode)
}
}

private extension ExecutionContextType {
func syncThroughAsync<ReturnType>(task:() throws -> ReturnType) throws -> ReturnType {
var result:Result<ReturnType, AnyError>?
Expand All @@ -143,14 +213,26 @@
thread.start()
}

func async(after:Double, task:SafeTask) {
let thread = PThread(task: {
let sec = time_t(after)
let nsec = Int((after - Double(sec)) * 1000 * 1000 * 1000)//nano seconds
var time = timespec(tv_sec:sec, tv_nsec: nsec)

nanosleep(&time, nil)
task()
})
thread.start()
}

func sync<ReturnType>(task:() throws -> ReturnType) throws -> ReturnType {
return try syncThroughAsync(task)
}
}

private class SerialContext : ExecutionContextBase, ExecutionContextType {
private let rl:CFRunLoop!
private let ownRunLoop:Bool
private let finalizer: RunLoopFinalizer?

#if !os(Linux)
private static let defaultMode:CFString = "kCFRunLoopDefaultMode" as NSString
Expand All @@ -159,7 +241,6 @@
#endif

override init() {
ownRunLoop = true
var runLoop:CFRunLoop?
let cond = NSCondition()
cond.lock()
Expand All @@ -172,22 +253,12 @@
cond.wait()
cond.unlock()
self.rl = runLoop!
finalizer = RunLoopFinalizer(self.rl)
}

init(runLoop:CFRunLoop!) {
ownRunLoop = false
rl = runLoop
}

deinit {
if ownRunLoop {
let runLoop = rl
performRunLoopSource(RunLoopSource({
CFRunLoopStop(runLoop)
},
priority: -32768)
)
}
finalizer = nil
}

#if !os(Linux)
Expand All @@ -200,13 +271,18 @@
}
#endif

private func performRunLoopSource(rls: RunLoopSource) {
rls.addToRunLoop(rl, mode: SerialContext.defaultMode)
rls.signal()
private func performRunLoopObject(rlo: RunLoopObject) {
rlo.addToRunLoop(rl, mode: SerialContext.defaultMode)
rlo.signal()
CFRunLoopWakeUp(rl)
}

func async(task:SafeTask) {
performRunLoopSource(RunLoopSource(task))
performRunLoopObject(RunLoopSource(task, finalizer: finalizer))
}

func async(after:Double, task:SafeTask) {
performRunLoopObject(RunLoopDelay(task, delay: after, finalizer: finalizer))
}

func sync<ReturnType>(task:() throws -> ReturnType) throws -> ReturnType {
Expand Down Expand Up @@ -240,6 +316,10 @@
return try inner.sync(task)
}

public func async(after:Double, task:SafeTask) {
inner.async(after, task: task)
}

public static let main:ExecutionContextType = PThreadExecutionContext(inner: SerialContext(runLoop: CFRunLoopGetMain()))
public static let global:ExecutionContextType = PThreadExecutionContext(kind: .Parallel)
}
Expand Down