Skip to content

Commit

Permalink
[tsan] Add HB edges for GCD barrier blocks
Browse files Browse the repository at this point in the history
Adding support for GCD barrier blocks in concurrent queues.  This uses two sync object in the same way as read-write locks do.  This also simplifies the use of dispatch groups (the notifications act as barrier blocks).

Differential Revision: http://reviews.llvm.org/D21604

llvm-svn: 273893
  • Loading branch information
kubamracek committed Jun 27, 2016
1 parent a36aa41 commit 2621dea
Show file tree
Hide file tree
Showing 3 changed files with 132 additions and 48 deletions.
83 changes: 35 additions & 48 deletions compiler-rt/lib/tsan/rtl/tsan_libdispatch_mac.cc
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,9 @@ typedef struct {
dispatch_queue_t queue;
void *orig_context;
dispatch_function_t orig_work;
uptr sync_object;
dispatch_object_t object_to_release;
bool free_context_in_callback;
bool release_sync_object_in_callback;
bool submitted_synchronously;
bool is_barrier_block;
} tsan_block_context_t;

// The offsets of different fields of the dispatch_queue_t structure, exported
Expand Down Expand Up @@ -77,35 +76,32 @@ static tsan_block_context_t *AllocContext(ThreadState *thr, uptr pc,
new_context->queue = queue;
new_context->orig_context = orig_context;
new_context->orig_work = orig_work;
new_context->sync_object = (uptr)new_context;
new_context->object_to_release = nullptr;
new_context->free_context_in_callback = true;
new_context->release_sync_object_in_callback = false;
new_context->submitted_synchronously = false;
new_context->is_barrier_block = false;
return new_context;
}

static void dispatch_callback_wrap(void *param) {
SCOPED_INTERCEPTOR_RAW(dispatch_callback_wrap);
tsan_block_context_t *context = (tsan_block_context_t *)param;
dispatch_queue_t q = context->queue;

Acquire(thr, pc, context->sync_object);
uptr serial_sync = (uptr)q;
uptr concurrent_sync = ((uptr)q) + sizeof(uptr);
uptr submit_sync = (uptr)context;
bool serial_task = IsQueueSerial(q) || context->is_barrier_block;

// Extra retain/release is required for dispatch groups. We use the group
// itself to synchronize, but in a notification (dispatch_group_notify
// callback), it may be disposed already. To solve this, we retain the group
// and release it here.
if (context->object_to_release) dispatch_release(context->object_to_release);
Acquire(thr, pc, submit_sync);
Acquire(thr, pc, serial_sync);
if (serial_task) Acquire(thr, pc, concurrent_sync);

// In serial queues, work items can be executed on different threads, we need
// to explicitly synchronize on the queue itself.
if (IsQueueSerial(context->queue)) Acquire(thr, pc, (uptr)context->queue);
SCOPED_TSAN_INTERCEPTOR_USER_CALLBACK_START();
context->orig_work(context->orig_context);
SCOPED_TSAN_INTERCEPTOR_USER_CALLBACK_END();
if (IsQueueSerial(context->queue)) Release(thr, pc, (uptr)context->queue);

if (context->release_sync_object_in_callback)
Release(thr, pc, context->sync_object);
Release(thr, pc, serial_task ? serial_sync : concurrent_sync);
if (context->submitted_synchronously) Release(thr, pc, submit_sync);

if (context->free_context_in_callback) user_free(thr, pc, context);
}
Expand All @@ -116,54 +112,55 @@ static void invoke_and_release_block(void *param) {
Block_release(block);
}

#define DISPATCH_INTERCEPT_B(name) \
#define DISPATCH_INTERCEPT_B(name, barrier) \
TSAN_INTERCEPTOR(void, name, dispatch_queue_t q, dispatch_block_t block) { \
SCOPED_TSAN_INTERCEPTOR(name, q, block); \
SCOPED_TSAN_INTERCEPTOR_USER_CALLBACK_START(); \
dispatch_block_t heap_block = Block_copy(block); \
SCOPED_TSAN_INTERCEPTOR_USER_CALLBACK_END(); \
tsan_block_context_t *new_context = \
AllocContext(thr, pc, q, heap_block, &invoke_and_release_block); \
new_context->is_barrier_block = barrier; \
Release(thr, pc, (uptr)new_context); \
SCOPED_TSAN_INTERCEPTOR_USER_CALLBACK_START(); \
REAL(name##_f)(q, new_context, dispatch_callback_wrap); \
SCOPED_TSAN_INTERCEPTOR_USER_CALLBACK_END(); \
}

#define DISPATCH_INTERCEPT_SYNC_B(name) \
#define DISPATCH_INTERCEPT_SYNC_B(name, barrier) \
TSAN_INTERCEPTOR(void, name, dispatch_queue_t q, dispatch_block_t block) { \
SCOPED_TSAN_INTERCEPTOR(name, q, block); \
SCOPED_TSAN_INTERCEPTOR_USER_CALLBACK_START(); \
dispatch_block_t heap_block = Block_copy(block); \
SCOPED_TSAN_INTERCEPTOR_USER_CALLBACK_END(); \
tsan_block_context_t new_context = { \
q, heap_block, &invoke_and_release_block, 0, 0, false, true}; \
new_context.sync_object = (uptr)&new_context; \
q, heap_block, &invoke_and_release_block, false, true, barrier}; \
Release(thr, pc, (uptr)&new_context); \
SCOPED_TSAN_INTERCEPTOR_USER_CALLBACK_START(); \
REAL(name##_f)(q, &new_context, dispatch_callback_wrap); \
SCOPED_TSAN_INTERCEPTOR_USER_CALLBACK_END(); \
Acquire(thr, pc, (uptr)&new_context); \
}

#define DISPATCH_INTERCEPT_F(name) \
#define DISPATCH_INTERCEPT_F(name, barrier) \
TSAN_INTERCEPTOR(void, name, dispatch_queue_t q, void *context, \
dispatch_function_t work) { \
SCOPED_TSAN_INTERCEPTOR(name, q, context, work); \
tsan_block_context_t *new_context = \
AllocContext(thr, pc, q, context, work); \
new_context->is_barrier_block = barrier; \
Release(thr, pc, (uptr)new_context); \
SCOPED_TSAN_INTERCEPTOR_USER_CALLBACK_START(); \
REAL(name)(q, new_context, dispatch_callback_wrap); \
SCOPED_TSAN_INTERCEPTOR_USER_CALLBACK_END(); \
}

#define DISPATCH_INTERCEPT_SYNC_F(name) \
#define DISPATCH_INTERCEPT_SYNC_F(name, barrier) \
TSAN_INTERCEPTOR(void, name, dispatch_queue_t q, void *context, \
dispatch_function_t work) { \
SCOPED_TSAN_INTERCEPTOR(name, q, context, work); \
tsan_block_context_t new_context = {q, context, work, 0, 0, false, true}; \
new_context.sync_object = (uptr)&new_context; \
tsan_block_context_t new_context = { \
q, context, work, false, true, barrier}; \
Release(thr, pc, (uptr)&new_context); \
SCOPED_TSAN_INTERCEPTOR_USER_CALLBACK_START(); \
REAL(name)(q, &new_context, dispatch_callback_wrap); \
Expand All @@ -175,14 +172,14 @@ static void invoke_and_release_block(void *param) {
// context, which is used to synchronize (we release the context before
// submitting, and the callback acquires it before executing the original
// callback).
DISPATCH_INTERCEPT_B(dispatch_async)
DISPATCH_INTERCEPT_B(dispatch_barrier_async)
DISPATCH_INTERCEPT_F(dispatch_async_f)
DISPATCH_INTERCEPT_F(dispatch_barrier_async_f)
DISPATCH_INTERCEPT_SYNC_B(dispatch_sync)
DISPATCH_INTERCEPT_SYNC_B(dispatch_barrier_sync)
DISPATCH_INTERCEPT_SYNC_F(dispatch_sync_f)
DISPATCH_INTERCEPT_SYNC_F(dispatch_barrier_sync_f)
DISPATCH_INTERCEPT_B(dispatch_async, false)
DISPATCH_INTERCEPT_B(dispatch_barrier_async, true)
DISPATCH_INTERCEPT_F(dispatch_async_f, false)
DISPATCH_INTERCEPT_F(dispatch_barrier_async_f, true)
DISPATCH_INTERCEPT_SYNC_B(dispatch_sync, false)
DISPATCH_INTERCEPT_SYNC_B(dispatch_barrier_sync, true)
DISPATCH_INTERCEPT_SYNC_F(dispatch_sync_f, false)
DISPATCH_INTERCEPT_SYNC_F(dispatch_barrier_sync_f, true)

TSAN_INTERCEPTOR(void, dispatch_after, dispatch_time_t when,
dispatch_queue_t queue, dispatch_block_t block) {
Expand Down Expand Up @@ -314,13 +311,8 @@ TSAN_INTERCEPTOR(void, dispatch_group_notify, dispatch_group_t group,
SCOPED_TSAN_INTERCEPTOR_USER_CALLBACK_END();
tsan_block_context_t *new_context =
AllocContext(thr, pc, q, heap_block, &invoke_and_release_block);
new_context->sync_object = (uptr)group;

// Will be released in dispatch_callback_wrap.
new_context->object_to_release = group;
dispatch_retain(group);

Release(thr, pc, (uptr)group);
new_context->is_barrier_block = true;
Release(thr, pc, (uptr)new_context);
REAL(dispatch_group_notify_f)(group, q, new_context,
dispatch_callback_wrap);
}
Expand All @@ -329,13 +321,8 @@ TSAN_INTERCEPTOR(void, dispatch_group_notify_f, dispatch_group_t group,
dispatch_queue_t q, void *context, dispatch_function_t work) {
SCOPED_TSAN_INTERCEPTOR(dispatch_group_notify_f, group, q, context, work);
tsan_block_context_t *new_context = AllocContext(thr, pc, q, context, work);
new_context->sync_object = (uptr)group;

// Will be released in dispatch_callback_wrap.
new_context->object_to_release = group;
dispatch_retain(group);

Release(thr, pc, (uptr)group);
new_context->is_barrier_block = true;
Release(thr, pc, (uptr)new_context);
REAL(dispatch_group_notify_f)(group, q, new_context,
dispatch_callback_wrap);
}
Expand Down
48 changes: 48 additions & 0 deletions compiler-rt/test/tsan/Darwin/gcd-barrier-race.mm
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
// RUN: %clang_tsan %s -o %t -framework Foundation
// RUN: %env_tsan_opts=ignore_interceptors_accesses=1 %deflake %run %t 2>&1 | FileCheck %s

#import <Foundation/Foundation.h>

#import "../test.h"

long global;

int main() {
fprintf(stderr, "Hello world.\n");
print_address("addr=", 1, &global);
barrier_init(&barrier, 2);

dispatch_queue_t q = dispatch_queue_create("my.queue", DISPATCH_QUEUE_CONCURRENT);
dispatch_queue_t bgq = dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0);

dispatch_barrier_sync(q, ^{
global = 42;
});

dispatch_async(bgq, ^{
dispatch_sync(q, ^{
global = 43;
barrier_wait(&barrier);
});
});

dispatch_async(bgq, ^{
dispatch_sync(q, ^{
barrier_wait(&barrier);
global = 44;

dispatch_sync(dispatch_get_main_queue(), ^{
CFRunLoopStop(CFRunLoopGetCurrent());
});
});
});

CFRunLoopRun();
fprintf(stderr, "Done.\n");
}

// CHECK: Hello world.
// CHECK: addr=[[ADDR:0x[0-9,a-f]+]]
// CHECK: WARNING: ThreadSanitizer: data race
// CHECK: Location is global 'global' {{(of size 8 )?}}at [[ADDR]] (gcd-barrier-race.mm.tmp+0x{{[0-9,a-f]+}})
// CHECK: Done.
49 changes: 49 additions & 0 deletions compiler-rt/test/tsan/Darwin/gcd-barrier.mm
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
// RUN: %clang_tsan %s -o %t -framework Foundation
// RUN: %env_tsan_opts=ignore_interceptors_accesses=1 %run %t 2>&1 | FileCheck %s

#import <Foundation/Foundation.h>

#import "../test.h"

long global;

int main() {
fprintf(stderr, "Hello world.\n");
print_address("addr=", 1, &global);
barrier_init(&barrier, 2);

dispatch_queue_t q = dispatch_queue_create("my.queue", DISPATCH_QUEUE_CONCURRENT);
dispatch_queue_t bgq = dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0);

dispatch_async(bgq, ^{
dispatch_sync(q, ^{
global = 42;
});
barrier_wait(&barrier);
});

dispatch_async(bgq, ^{
barrier_wait(&barrier);
dispatch_barrier_sync(q, ^{
global = 43;
});

dispatch_async(bgq, ^{
barrier_wait(&barrier);
global = 44;
});

barrier_wait(&barrier);

dispatch_sync(dispatch_get_main_queue(), ^{
CFRunLoopStop(CFRunLoopGetCurrent());
});
});

CFRunLoopRun();
fprintf(stderr, "Done.\n");
}

// CHECK: Hello world.
// CHECK: Done.
// CHECK-NOT: WARNING: ThreadSanitizer

0 comments on commit 2621dea

Please sign in to comment.