Skip to content

Conversation

ktoso
Copy link
Contributor

@ktoso ktoso commented Nov 28, 2022

rdar://101965913
A TaskPool is similar to a TaskGroup, but functions very differently.

This is pending more discussion and would need a Swift Evolution proposal.

Tasks added to the pool execute the same way as in a group, however they cannot be awaited on explicitly. The implementation does not track ready tasks at all and therefore is not able to track the exact number of times a next() would have to be resumed.

Instead, it automatically removes completed child tasks, and removes their pending counts from the pool status. This allows us to implement a waitAll() however it is not possible to wait for individual completions (one could "wait for the next completion", but that is not very useful, as we would not know which one it was).

A TaskPool is useful in "runs forever" core loops of rpc and http servers, where the top level task of the server runs a TaskPool, and dispatches new tasks to handle each incoming request; and these may be running concurrently. We want to use child tasks to keep this efficient, but using a Group this is not possible because we must collect the tasks as soon as they complete, but cannot do so from the task group since it is a single task awaiting on a sequence.


Specifically, when implementing servers which stick to structured concurrency, we often end up with such code:

try await withThrowingTaskGroup(of: Void.self) { group in
    while let newConnection = try await listeningSocket.accept() {
        group.addTask {
            handleConnection(newConnection)
        }
    }
}

however, this use of a group is ill-suited for this use-case. As written, this task group will leak all the child Task objects until the listening socket either terminates or throws. If this was written for a long-running server, it is entirely possible for this Task Group to survive for a period of days, leaking thousands of Task objects.

Instead, a TaskPool removes child tasks as soon as they complete, making such implementation not only possible but efficient:

try await withTaskPool() { pool in
    while let newConnection = try await listeningSocket.accept() {
        pool.addTask {
            handleConnection(newConnection)
        }
    }
}

This is the only 100% safe way to utilize structured concurrency to implement such long lived servers, but we'll discuss this more in an upcoming proposal that @Lukasa has been working on.


TODOs:

  • semantics and shape discussions
  • backdeployment impl is not super clean; if we can and want to, I'll spend the cycles to clean up
  • the overflow protection here is actually silly, since we have 62 bits for the pending count; but the same protection should be ported back to TaskGroup where we have way less leeway, and there it'd make sense.
  • support DumpConcurrency
  • cleanup of all comments; some may be mentioning task group wrongly
  • remove the lock in TaskPool, it is not used for anything really -- everything we do is either record status lock protected (children list) or is the atomic status; there is no ready queue, unlike the task group
  • more tests

cc @Lukasa @FranzBusch @PeterAdams-A

A TaskPool is similar to a TaskGroup, but functions very differently.
Tasks added to the pool execute the same way as in a group, however they
cannot be awaited on explicitly. The implementation does not track ready
tasks at all and therefore is not able to track the exact number of
times a next() would have to be resumed.

Instead, it automatically removes completed child tasks, and removes
their pending counts from the pool status. This allows us to implement a
waitAll() however it is not possible to wait for individual completions
(one could "wait for the next completion", but that is not very useful,
as we would not know _which_ one it was).

A TaskPool is useful in "runs forever" core loops of rpc and http
servers, where the top level task of the server runs a TaskPool, and
dispatches new tasks to handle each incoming request; and these may be
running concurrently. We want to use child tasks to keep this efficient,
but using a Group this is not possible because we must collect the
tasks as soon as they complete, but cannot do so from the task group
since it is a single task awaiting on a sequence.
Task_CopyTaskLocals = 10,
Task_InheritContext = 11,
Task_EnqueueJob = 12,
Task_AddPendingGroupTaskUnconditionally = 13, // used for: TaskGroup, TaskPool
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ABI: Can we rename this actually? I suspect no since ABI even though the value remains the same.

BUILTIN_OPERAND_OWNERSHIP(DestroyingConsume, EndAsyncLetLifetime)
BUILTIN_OPERAND_OWNERSHIP(InstantaneousUse, CreateTaskGroup)
BUILTIN_OPERAND_OWNERSHIP(InstantaneousUse, DestroyTaskGroup)
BUILTIN_OPERAND_OWNERSHIP(InstantaneousUse, DestroyTaskGroup) // TODO: should this be destroying consume?
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fly by question, wasn't sure?

SWIFT_EXPORT_FROM(swift_Concurrency), SWIFT_CC(swift),
swift::, (TaskPool *pool, const Metadata *Void), (pool, Void))

OVERRIDE_TASK_STATUS(taskPool_attachChild, void,
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, STATUS on purpose; has to be there to access the locks AFAIR

priority: priority, isChildTask: true, copyTaskLocals: false,
inheritContext: false, enqueueJob: true,
addPendingGroupTaskUnconditionally: false
addPendingTaskUnconditionally: false
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ABI: I think we can rename the parameter...?


void lock() const { mutex_.lock(); }
void unlock() const { mutex_.unlock(); }
#endif
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TODO: we definitely can remove this lock; so will do so once we have agreed on exact semantics of the pool.

/// This is because we will NOT add a task to a cancelled group, unless doing
/// so unconditionally.
///
/// Returns *assumed* new status, including the just performed +1.
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  • TODO: go over all comments again and remove mentions of group and make sure the comments actually cover the details of the Pool impl


void lock() const { mutex_.lock(); }
void unlock() const { mutex_.unlock(); }
#endif
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TODO: remove the locks, we don't need them at all


TaskPoolTaskStatusRecord * TaskPool::getTaskRecord() {
return asImpl(this)->getTaskRecord();
}
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The actual TaskPool impl

swift_asyncLet_end(nullptr);
}

// TODO: handle TaskPool
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TODO

var isChildTask: Bool
var isFuture: Bool
var isGroupChildTask: Bool
// TODO: isPoolChildTask
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TODO: support in dump concurrency

@ktoso
Copy link
Contributor Author

ktoso commented Nov 28, 2022

@swift-ci please smoke test

@ktoso ktoso added concurrency Feature: umbrella label for concurrency language features swift evolution proposal needed Flag → feature: A feature that warrants a Swift evolution proposal labels Nov 28, 2022
@ktoso
Copy link
Contributor Author

ktoso commented Nov 29, 2022

@swift-ci please build toolchain

public func withTaskPool<PoolResult>(
returning returnType: PoolResult.Type = PoolResult.self,
body: (inout TaskPool) async throws -> PoolResult
) async rethrows -> PoolResult {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think rethrows is right here: if the child tasks throw, this throws.

Copy link
Contributor Author

@ktoso ktoso Nov 30, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah that's wrong. The entire throws handling is not done :) this'll need the same dance as the group, so two with... method versions - thank you for noticing :)

@ktoso
Copy link
Contributor Author

ktoso commented Dec 14, 2022

We're going with the "option" version of the implementation for now: #62361

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

concurrency Feature: umbrella label for concurrency language features swift evolution proposal needed Flag → feature: A feature that warrants a Swift evolution proposal

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants