Skip to content

Commit

Permalink
Adding ability to run receive filter synchronously.
Browse files Browse the repository at this point in the history
  • Loading branch information
robbiehanson committed Apr 13, 2012
1 parent fb41b56 commit 66b59a1
Show file tree
Hide file tree
Showing 3 changed files with 105 additions and 59 deletions.
20 changes: 20 additions & 0 deletions GCD/GCDAsyncUdpSocket.h
Original file line number Diff line number Diff line change
Expand Up @@ -652,9 +652,29 @@ typedef BOOL (^GCDAsyncUdpSocketReceiveFilterBlock)(NSData *data, NSData *addres
*
* For more information about GCDAsyncUdpSocketReceiveFilterBlock, see the documentation for its typedef.
* To remove a previously set filter, invoke this method and pass a nil filterBlock and NULL filterQueue.
*
* Note: This method invokes setReceiveFilter:withQueue:isAsynchronous: (documented below),
* passing YES for the isAsynchronous parameter.
**/
- (void)setReceiveFilter:(GCDAsyncUdpSocketReceiveFilterBlock)filterBlock withQueue:(dispatch_queue_t)filterQueue;

/**
* The receive filter can be run via dispatch_async or dispatch_sync.
* Most typical situations call for asynchronous operation.
*
* However, there are a few situations in which synchronous operation is preferred.
* Such is the case when the filter is extremely minimal and fast.
* This is because dispatch_sync is faster than dispatch_async.
*
* If you choose synchronous operation, be aware of possible deadlock conditions.
* Since the socket queue is executing your block via dispatch_sync,
* then you cannot perform any tasks which may invoke dispatch_sync on the socket queue.
* For example, you can't query properties on the socket.
**/
- (void)setReceiveFilter:(GCDAsyncUdpSocketReceiveFilterBlock)filterBlock
withQueue:(dispatch_queue_t)filterQueue
isAsynchronous:(BOOL)isAsynchronous;

#pragma mark Closing

/**
Expand Down
142 changes: 84 additions & 58 deletions GCD/GCDAsyncUdpSocket.m
Original file line number Diff line number Diff line change
Expand Up @@ -146,8 +146,9 @@ @interface GCDAsyncUdpSocket ()
id delegate;
dispatch_queue_t delegateQueue;

GCDAsyncUdpSocketReceiveFilterBlock filterBlock;
dispatch_queue_t filterQueue;
GCDAsyncUdpSocketReceiveFilterBlock receiveFilterBlock;
dispatch_queue_t receiveFilterQueue;
BOOL receiveFilterAsync;

uint32_t flags;
uint16_t config;
Expand Down Expand Up @@ -3921,33 +3922,35 @@ - (void)pauseReceiving
dispatch_async(socketQueue, block);
}

- (void)setReceiveFilter:(GCDAsyncUdpSocketReceiveFilterBlock)inFilterBlock withQueue:(dispatch_queue_t)inFilterQueue
- (void)setReceiveFilter:(GCDAsyncUdpSocketReceiveFilterBlock)filterBlock withQueue:(dispatch_queue_t)filterQueue
{
GCDAsyncUdpSocketReceiveFilterBlock newFilterBlock;
dispatch_queue_t newFilterQueue;
[self setReceiveFilter:filterBlock withQueue:filterQueue isAsynchronous:YES];
}

- (void)setReceiveFilter:(GCDAsyncUdpSocketReceiveFilterBlock)filterBlock
withQueue:(dispatch_queue_t)filterQueue
isAsynchronous:(BOOL)isAsynchronous
{
GCDAsyncUdpSocketReceiveFilterBlock newFilterBlock = NULL;
dispatch_queue_t newFilterQueue = NULL;

if (inFilterBlock)
if (filterBlock)
{
NSAssert(inFilterQueue, @"Must provide a dispatch_queue in which to run the filter block.");
NSAssert(filterQueue, @"Must provide a dispatch_queue in which to run the filter block.");

newFilterBlock = [inFilterBlock copy];
newFilterQueue = inFilterQueue;
newFilterBlock = [filterBlock copy];
newFilterQueue = filterQueue;
dispatch_retain(newFilterQueue);
}
else
{
newFilterBlock = NULL;
newFilterQueue = NULL;
}

dispatch_block_t block = ^{

if (receiveFilterQueue)
dispatch_release(receiveFilterQueue);

if (filterQueue)
dispatch_release(filterQueue);

filterBlock = newFilterBlock;
filterQueue = newFilterQueue;
receiveFilterBlock = newFilterBlock;
receiveFilterQueue = newFilterQueue;
receiveFilterAsync = isAsynchronous;
};

if (dispatch_get_current_queue() == socketQueue)
Expand Down Expand Up @@ -4116,7 +4119,8 @@ - (void)doReceive


BOOL waitingForSocket = NO;
BOOL ignoredDueToAddress = NO;
BOOL notifiedDelegate = NO;
BOOL ignored = NO;

NSError *error = nil;

Expand All @@ -4136,60 +4140,78 @@ - (void)doReceive
if (flags & kDidConnect)
{
if (addr4 && ![self isConnectedToAddress4:addr4])
ignoredDueToAddress = YES;
ignored = YES;
if (addr6 && ![self isConnectedToAddress6:addr6])
ignoredDueToAddress = YES;
ignored = YES;
}

NSData *addr = (addr4 != nil) ? addr4 : addr6;

if (!ignoredDueToAddress)
if (!ignored)
{
if (filterBlock && filterQueue)
if (receiveFilterBlock && receiveFilterQueue)
{
// Run data through filter, and if approved, notify delegate
pendingFilterOperations++;

dispatch_async(filterQueue, ^{ @autoreleasepool {

id filterContext = nil;
BOOL allowed = filterBlock(data, addr, &filterContext);

// Transition back to socketQueue to get the current delegate / delegateQueue

dispatch_async(socketQueue, ^{ @autoreleasepool {

pendingFilterOperations--;
__block id filterContext = nil;
__block BOOL allowed = NO;

if (receiveFilterAsync)
{
pendingFilterOperations++;
dispatch_async(receiveFilterQueue, ^{ @autoreleasepool {

if (allowed)
{
[self notifyDidReceiveData:data fromAddress:addr withFilterContext:filterContext];
}
allowed = receiveFilterBlock(data, addr, &filterContext);

if (flags & kReceiveOnce)
{
// Transition back to socketQueue to get the current delegate / delegateQueue
dispatch_async(socketQueue, ^{ @autoreleasepool {

pendingFilterOperations--;

if (allowed)
{
// The delegate has been notified,
// so our receive once operation has completed.
flags &= ~kReceiveOnce;
[self notifyDidReceiveData:data fromAddress:addr withFilterContext:filterContext];
}
else if (pendingFilterOperations == 0)

if (flags & kReceiveOnce)
{
// All pending filter operations have completed,
// and none were allowed through.
// Our receive once operation hasn't completed yet.
[self doReceive];
if (allowed)
{
// The delegate has been notified,
// so our receive once operation has completed.
flags &= ~kReceiveOnce;
}
else if (pendingFilterOperations == 0)
{
// All pending filter operations have completed,
// and none were allowed through.
// Our receive once operation hasn't completed yet.
[self doReceive];
}
}
}
}});
}});
}
else // if (!receiveFilterAsync)
{
dispatch_sync(receiveFilterQueue, ^{ @autoreleasepool {

allowed = receiveFilterBlock(data, addr, &filterContext);
}});

}});
if (allowed) {
[self notifyDidReceiveData:data fromAddress:addr withFilterContext:filterContext];
notifiedDelegate = YES;
}
else {
ignored = YES;
}
}
}
else
else // if (!receiveFilterBlock || !receiveFilterQueue)
{
// Notify delegate
[self notifyDidReceiveData:data fromAddress:addr withFilterContext:nil];
notifiedDelegate = YES;
}
}
}
Expand Down Expand Up @@ -4219,16 +4241,20 @@ - (void)doReceive
else
{
// One-at-a-time receive mode
if (ignoredDueToAddress)
{
[self doReceive];
}
else if (pendingFilterOperations == 0)
if (notifiedDelegate)
{
// The delegate has been notified (no set filter).
// So our receive once operation has completed.
flags &= ~kReceiveOnce;
}
else if (ignored)
{
[self doReceive];
}
else
{
// Waiting on asynchronous receive filter...
}
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion GCD/Xcode/EchoServer/EchoServer.xcodeproj/project.pbxproj
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@
29B97313FDCFA39411CA2CEA /* Project object */ = {
isa = PBXProject;
attributes = {
LastUpgradeCheck = 0420;
LastUpgradeCheck = 0430;
};
buildConfigurationList = C01FCF4E08A954540054247B /* Build configuration list for PBXProject "EchoServer" */;
compatibilityVersion = "Xcode 3.2";
Expand Down

0 comments on commit 66b59a1

Please sign in to comment.