Skip to content

Commit

Permalink
Darwin: Improve MTRAsyncWorkQueue logging by tracking unique item ids (
Browse files Browse the repository at this point in the history
…#29568)

* Darwin: Improve MTRAsyncWorkQueue logging by tracking unique item ids

Also return more information from the batching handler so we can log
both partial and full batching from the queue itself.

* Avoid unused variable warning / error

* Include work item id in various log messages as per review

* Tweaks from review

* restyle
  • Loading branch information
ksperling-apple committed Oct 11, 2023
1 parent 62cdbab commit 5647e0a
Show file tree
Hide file tree
Showing 5 changed files with 193 additions and 97 deletions.
33 changes: 26 additions & 7 deletions src/darwin/Framework/CHIP/MTRAsyncWorkQueue.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,12 @@ typedef NS_ENUM(NSInteger, MTRAsyncWorkOutcome) {
/// being cancelled).
typedef BOOL (^MTRAsyncWorkCompletionBlock)(MTRAsyncWorkOutcome outcome);

typedef NS_ENUM(NSInteger, MTRBatchingOutcome) {
MTRNotBatched = 0,
MTRBatchedPartially, // some work was batched but the source item has work remaining
MTRBatchedFully, // the source item is now empty and can be dropped from the queue
};

/// An optional handler that controls batching of MTRAsyncWorkItem.
///
/// When a work item is dequeued to run, if it is of a type that can be
Expand All @@ -47,15 +53,16 @@ typedef BOOL (^MTRAsyncWorkCompletionBlock)(MTRAsyncWorkOutcome outcome);
/// `opaqueDataNext` is the data for the next item. The `fullyMerged` parameter
/// will be initialized to NO by the caller.
///
/// The handler is expected to mutate the data as needed to achieve batching.
///
/// If after the data mutations opaqueDataNext no longer requires any work, the
/// handler should set `fullyMerged` to YES to indicate that the next item can
/// be dropped from the queue. In this case, the handler may be called again to
/// possibly also batch the work item after the one that was dropped.
/// The handler is expected to mutate the data as needed to achieve batching,
/// and return an `MTRBatchingOutcome` to indicate if any or all of the work
/// from the next item was merged into the current item. A return value of
/// `MTRBatchedFully` indicates that `opaqueDataNext` no longer requires any
/// work and should be dropped from the queue. In this case, the handler may be
/// called again to possibly also batch the work item after the one that was
/// dropped.
///
/// @see MTRAsyncWorkItem
typedef void (^MTRAsyncWorkBatchingHandler)(id opaqueDataCurrent, id opaqueDataNext, BOOL * fullyMerged);
typedef MTRBatchingOutcome (^MTRAsyncWorkBatchingHandler)(id opaqueDataCurrent, id opaqueDataNext);

/// An optional handler than enables duplicate checking for MTRAsyncWorkItem.
///
Expand Down Expand Up @@ -100,6 +107,10 @@ MTR_TESTABLE
/// Creates a work item that will run on the specified dispatch queue.
- (instancetype)initWithQueue:(dispatch_queue_t)queue;

/// A unique (modulo overflow) ID automatically assigned to each work item for
/// the purpose of correlating log messages from the work queue.
@property (readonly, nonatomic) uint64_t uniqueID;

/// Called by the work queue to start this work item.
///
/// Will be called on the dispatch queue associated with this item.
Expand Down Expand Up @@ -188,6 +199,14 @@ MTR_TESTABLE
/// re-used.
- (void)enqueueWorkItem:(MTRAsyncWorkItem<ContextType> *)item;

/// Same as `enqueueWorkItem:` but includes the description in queue logging.
- (void)enqueueWorkItem:(MTRAsyncWorkItem<ContextType> *)item
description:(nullable NSString *)description;

/// Convenience for `enqueueWorkItem:description:` with a formatted string.
- (void)enqueueWorkItem:(MTRAsyncWorkItem<ContextType> *)item
descriptionWithFormat:(NSString *)format, ... NS_FORMAT_FUNCTION(2, 3);

/// Checks whether the queue already contains a work item matching the provided
/// details. A client may call this method to avoid enqueueing duplicate work.
///
Expand Down
135 changes: 109 additions & 26 deletions src/darwin/Framework/CHIP/MTRAsyncWorkQueue.mm
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,11 @@
#import "MTRDefines_Internal.h"
#import "MTRLogging_Internal.h"

#import <atomic>
#import <os/lock.h>

NS_ASSUME_NONNULL_BEGIN

typedef NS_ENUM(NSInteger, MTRAsyncWorkItemState) {
MTRAsyncWorkItemMutable,
MTRAsyncWorkItemComplete,
Expand All @@ -29,6 +32,20 @@ typedef NS_ENUM(NSInteger, MTRAsyncWorkItemState) {
MTRAsyncWorkItemRetryCountBase = MTRAsyncWorkItemRunning, // values >= MTRAsyncWorkItemRunning encode retryCount
};

// A helper struct that facilitates access to _context while
// - only reading the _context weak reference once and retaining a strong
// reference for the duration of a particular queue method call
// - avoiding calls to `[context description]` under our lock
struct ContextSnapshot {
id _Nullable reference;
NSString * _Nullable description;
ContextSnapshot(id _Nullable context)
{
reference = context;
description = [context description];
}
};

MTR_DIRECT_MEMBERS
@implementation MTRAsyncWorkItem {
dispatch_queue_t _queue;
Expand All @@ -41,19 +58,21 @@ - (instancetype)initWithQueue:(dispatch_queue_t)queue
{
NSParameterAssert(queue);
if (self = [super init]) {
static std::atomic<NSUInteger> nextUniqueID(1);
_uniqueID = nextUniqueID++;
_queue = queue;
_state = MTRAsyncWorkItemMutable;
}
return self;
}

- (void)setReadyHandler:(void (^)(id context, NSInteger retryCount, MTRAsyncWorkCompletionBlock completion))readyHandler
- (void)setReadyHandler:(nullable void (^)(id context, NSInteger retryCount, MTRAsyncWorkCompletionBlock completion))readyHandler
{
[self assertMutable];
_readyHandler = readyHandler;
}

- (void)setCancelHandler:(void (^)(void))cancelHandler
- (void)setCancelHandler:(nullable void (^)(void))cancelHandler
{
[self assertMutable];
_cancelHandler = cancelHandler;
Expand Down Expand Up @@ -103,7 +122,6 @@ - (NSInteger)retryCount

- (void)callReadyHandlerWithContext:(id)context completion:(MTRAsyncWorkCompletionBlock)completion
{
//
NSAssert(_state >= MTRAsyncWorkItemEnqueued, @"work item is not enqueued (%ld)", (long) _state);
NSInteger retryCount = 0;
if (_state == MTRAsyncWorkItemEnqueued) {
Expand All @@ -117,8 +135,14 @@ - (void)callReadyHandlerWithContext:(id)context completion:(MTRAsyncWorkCompleti

// Always dispatch even if there is no readyHandler as this avoids synchronously
// re-entering the MTRAsyncWorkQueueCode, simplifying the implementation.
auto uniqueID = _uniqueID;
auto readyHandler = _readyHandler;
dispatch_async(_queue, ^{
if (!retryCount) {
MTR_LOG_DEFAULT("MTRAsyncWorkQueue<%@> executing work item [%llu]", context, uniqueID);
} else {
MTR_LOG_DEFAULT("MTRAsyncWorkQueue<%@> executing work item [%llu] (retry %zd)", context, uniqueID, retryCount);
}
if (readyHandler) {
readyHandler(context, retryCount, completion);
} else {
Expand Down Expand Up @@ -160,6 +184,25 @@ - (void)markComplete
_duplicateCheckHandler = nil;
}

- (NSString *)description
{
NSString * state;
switch (_state) {
case MTRAsyncWorkItemMutable:
state = @"mutable";
break;
case MTRAsyncWorkItemComplete:
state = @"complete";
break;
case MTRAsyncWorkItemEnqueued:
state = @"enqueued";
break;
default:
return [NSString stringWithFormat:@"<%@ %llu running retry: %tu>", self.class, _uniqueID, self.retryCount];
}
return [NSString stringWithFormat:@"<%@ %llu %@>", self.class, _uniqueID, state];
}

@end

MTR_DIRECT_MEMBERS
Expand All @@ -182,37 +225,66 @@ - (instancetype)initWithContext:(id)context

- (NSString *)description
{
NSUInteger itemsCount;
os_unfair_lock_lock(&_lock);
auto * result = [NSString stringWithFormat:@"<%@ context: %@ items count: %tu>", self.class, _context, _items.count];
itemsCount = _items.count;
os_unfair_lock_unlock(&_lock);
return result;
return [NSString stringWithFormat:@"<%@ context: %@ items count: %tu>", self.class, _context, itemsCount];
}

- (void)enqueueWorkItem:(MTRAsyncWorkItem *)item
{
[self enqueueWorkItem:item description:nil];
}

- (void)enqueueWorkItem:(MTRAsyncWorkItem *)item descriptionWithFormat:(NSString *)format, ...
{
va_list args;
va_start(args, format);
NSString * description = [[NSString alloc] initWithFormat:format arguments:args];
va_end(args);
[self enqueueWorkItem:item description:description];
}

- (void)enqueueWorkItem:(MTRAsyncWorkItem *)item
description:(nullable NSString *)description
{
NSParameterAssert(item);
NSAssert(_context, @"context has been lost");
ContextSnapshot context(_context); // outside of lock
NSAssert(context.reference, @"context has been lost");

os_unfair_lock_lock(&_lock);
[item markEnqueued];
[_items addObject:item];
[self _callNextReadyWorkItem];

if (description) {
// Logging the description once is enough because other log messages
// related to the work item (execution, completion etc) can easily be
// correlated using the unique id.
MTR_LOG_DEFAULT("MTRAsyncWorkQueue<%@> enqueued work item [%llu]: %@", context.description, item.uniqueID, description);
} else {
MTR_LOG_DEFAULT("MTRAsyncWorkQueue<%@> enqueued work item [%llu]", context.description, item.uniqueID);
}

[self _callNextReadyWorkItemWithContext:context];
os_unfair_lock_unlock(&_lock);
}

- (void)invalidate
{
NSString * contextDescription = [_context description]; // outside of lock
ContextSnapshot context(_context); // outside of lock
os_unfair_lock_lock(&_lock);
MTR_LOG_INFO("MTRAsyncWorkQueue<%@> invalidate %tu items", contextDescription, _items.count);
MTR_LOG_INFO("MTRAsyncWorkQueue<%@> invalidate %tu items", context.description, _items.count);
for (MTRAsyncWorkItem * item in _items) {
[item cancel];
}
[_items removeAllObjects];
os_unfair_lock_unlock(&_lock);
}

- (void)_postProcessWorkItem:(MTRAsyncWorkItem *)workItem retry:(BOOL)retry
- (void)_postProcessWorkItem:(MTRAsyncWorkItem *)workItem
context:(ContextSnapshot const &)context
retry:(BOOL)retry
{
os_unfair_lock_assert_owner(&_lock);

Expand All @@ -222,18 +294,20 @@ - (void)_postProcessWorkItem:(MTRAsyncWorkItem *)workItem retry:(BOOL)retry
return;
}

// if work item is done (no need to retry), remove from queue and call ready on the next item
if (!retry) {
if (retry) {
MTR_LOG_DEFAULT("MTRAsyncWorkQueue<%@> retry needed for work item [%llu]", context.description, workItem.uniqueID);
} else {
[workItem markComplete];
[_items removeObjectAtIndex:0];
MTR_LOG_DEFAULT("MTRAsyncWorkQueue<%@> completed work item [%llu]", context.description, workItem.uniqueID);
}

// when "concurrency width" is implemented this will be decremented instead
_runningWorkItemCount = 0;
[self _callNextReadyWorkItem];
[self _callNextReadyWorkItemWithContext:context];
}

- (void)_callNextReadyWorkItem
- (void)_callNextReadyWorkItemWithContext:(ContextSnapshot const &)context
{
os_unfair_lock_assert_owner(&_lock);

Expand All @@ -246,9 +320,8 @@ - (void)_callNextReadyWorkItem
return; // nothing to run
}

id context = _context;
if (!context) {
MTR_LOG_ERROR("MTRAsyncWorkQueue context has been lost, dropping queued work items");
if (!context.reference) {
MTR_LOG_ERROR("MTRAsyncWorkQueue<%@> context has been lost, dropping queued work items", (id) nil);
[_items removeAllObjects];
return;
}
Expand All @@ -264,27 +337,35 @@ - (void)_callNextReadyWorkItem
while (_items.count >= 2) {
MTRAsyncWorkItem * nextWorkItem = _items[1];
if (!nextWorkItem.batchingHandler || nextWorkItem.batchingID != workItem.batchingID) {
break; // next item is not eligible to merge with this one
goto done; // next item is not eligible to merge with this one
}

BOOL fullyMerged = NO;
batchingHandler(workItem.batchableData, nextWorkItem.batchableData, &fullyMerged);
if (!fullyMerged) {
break; // not removing the next item, so we can't merge anything else
switch (batchingHandler(workItem.batchableData, nextWorkItem.batchableData)) {
case MTRNotBatched:
goto done; // can't merge anything else
case MTRBatchedPartially:
MTR_LOG_DEFAULT("MTRAsyncWorkQueue<%@> partially merged work item [%llu] into %llu",
context.description, nextWorkItem.uniqueID, workItem.uniqueID);
goto done; // can't merge anything else
case MTRBatchedFully:
MTR_LOG_DEFAULT("MTRAsyncWorkQueue<%@> fully merged work item [%llu] into %llu",
context.description, nextWorkItem.uniqueID, workItem.uniqueID);
[_items removeObjectAtIndex:1];
continue; // try to batch the next item (if any)
}

[_items removeObjectAtIndex:1];
}
done:;
}

mtr_weakify(self);
[workItem callReadyHandlerWithContext:context completion:^(MTRAsyncWorkOutcome outcome) {
[workItem callReadyHandlerWithContext:context.reference completion:^(MTRAsyncWorkOutcome outcome) {
mtr_strongify(self);
BOOL handled = NO;
if (self) {
ContextSnapshot context(self->_context); // re-acquire a new snapshot
os_unfair_lock_lock(&self->_lock);
if (!workItem.isComplete) {
[self _postProcessWorkItem:workItem retry:(outcome == MTRAsyncWorkNeedsRetry)];
[self _postProcessWorkItem:workItem context:context retry:(outcome == MTRAsyncWorkNeedsRetry)];
handled = YES;
}
os_unfair_lock_unlock(&self->_lock);
Expand Down Expand Up @@ -315,3 +396,5 @@ - (BOOL)hasDuplicateForTypeID:(NSUInteger)opaqueDuplicateTypeID workItemData:(id
}

@end

NS_ASSUME_NONNULL_END
10 changes: 10 additions & 0 deletions src/darwin/Framework/CHIP/MTRDefines_Internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,4 +46,14 @@
_Pragma("clang diagnostic ignored \"-Wshadow\"") \
__strong typeof(local) _Nullable local = _mtr_weak_##local \
_Pragma("clang diagnostic pop")

/// Declares an unused local of unspecified type, to prevent accidental
/// references to a shadowed variable of the same name. Note that hiding
/// `self` does not prevent implicit references to self due to ivar access.
#define mtr_hide(local) \
_Pragma("clang diagnostic push") \
_Pragma("clang diagnostic ignored \"-Wshadow\"") \
__attribute__((unused)) variable_hidden_by_mtr_hide local; \
_Pragma("clang diagnostic pop")
typedef struct {} variable_hidden_by_mtr_hide;
// clang-format on
Loading

0 comments on commit 5647e0a

Please sign in to comment.