Skip to content

Commit

Permalink
Previously, if you called one of the read or write methods, the read/…
Browse files Browse the repository at this point in the history
…write could possibly start AND finish before the method even returned. E.g.:

[asyncSocket readDataToLength:15 timeout:5 tag:10];
// Read has already finished, and invoked your delegate method!
state = READING; // Oops!

This isn't what people generally suspect, and it runs a bit counter to the asynchronous idea.
Thus this has been changed. Calling a read or write method will no longer finish prior to the method returning. The read/write is guaranteed to finish until at least the next invocation of the run loop.
  • Loading branch information
robbiehanson committed Jun 5, 2008
1 parent e5752cd commit be7716f
Show file tree
Hide file tree
Showing 2 changed files with 105 additions and 51 deletions.
44 changes: 25 additions & 19 deletions AsyncSocket.h
Expand Up @@ -30,8 +30,9 @@ typedef enum AsyncSocketError AsyncSocketError;
@interface NSObject (AsyncSocketDelegate)

/**
* In the event of an error, the socket is closed. You may call "readDataWithTimeout:tag:" during this call-back to
* get the last bit of data off the socket. When connecting, this delegate method may be called
* In the event of an error, the socket is closed.
* You may call "unreadData" during this call-back to get the last bit of data off the socket.
* When connecting, this delegate method may be called
* before"onSocket:didAcceptNewSocket:" or "onSocket:didConnectToHost:".
**/
- (void)onSocket:(AsyncSocket *)sock willDisconnectWithError:(NSError *)err;
Expand Down Expand Up @@ -91,20 +92,20 @@ typedef enum AsyncSocketError AsyncSocketError;

@interface AsyncSocket : NSObject
{
CFSocketRef theSocket; // IPv4/IPv6 accept or connect socket.
CFSocketRef theSocket6; // IPv6 accept socket.
CFSocketRef theSocket; // IPv4 accept or connect socket
CFSocketRef theSocket6; // IPv6 accept or connect socket
CFReadStreamRef theReadStream;
CFWriteStreamRef theWriteStream;

CFRunLoopSourceRef theSource; // For theSocket.
CFRunLoopSourceRef theSource6; // For theSocket6.
CFRunLoopSourceRef theSource; // For theSocket
CFRunLoopSourceRef theSource6; // For theSocket6
CFRunLoopRef theRunLoop;
CFSocketContext theContext;

NSMutableArray *theReadQueue;
AsyncReadPacket *theCurrentRead;
NSTimer *theReadTimer;
NSData *partialReadBuffer;
NSMutableData *partialReadBuffer;

NSMutableArray *theWriteQueue;
AsyncWritePacket *theCurrentWrite;
Expand All @@ -116,13 +117,12 @@ typedef enum AsyncSocketError AsyncSocketError;
long theUserData;
}

- (id) init;
- (id) initWithDelegate:(id)delegate;
- (id) initWithDelegate:(id)delegate userData:(long)userData;
- (void) dealloc;
- (id)init;
- (id)initWithDelegate:(id)delegate;
- (id)initWithDelegate:(id)delegate userData:(long)userData;

/* String representation is long but has no "\n". */
- (NSString *) description;
- (NSString *)description;

/**
* Use "canSafelySetDelegate" to see if there is any pending business (reads and writes) with the current delegate
Expand All @@ -137,9 +137,9 @@ typedef enum AsyncSocketError AsyncSocketError;
- (void)setUserData:(long)userData;

/* Don't use these to read or write. And don't close them, either! */
- (CFSocketRef) getCFSocket;
- (CFReadStreamRef) getCFReadStream;
- (CFWriteStreamRef) getCFWriteStream;
- (CFSocketRef)getCFSocket;
- (CFReadStreamRef)getCFReadStream;
- (CFWriteStreamRef)getCFWriteStream;

/**
* Once one of these methods is called, the AsyncSocket instance is locked in, and the rest can't be called without
Expand Down Expand Up @@ -219,10 +219,16 @@ typedef enum AsyncSocketError AsyncSocketError;
- (float)progressOfReadReturningTag:(long *)tag bytesDone:(CFIndex *)done total:(CFIndex *)total;
- (float)progressOfWriteReturningTag:(long *)tag bytesDone:(CFIndex *)done total:(CFIndex *)total;

/**
* In the event of an error, this method may be called during onSocket:willDisconnectWithError: to read
* any data that's left on the socket.
**/
- (NSData *)unreadData;

/* A few common line separators, for use with "readDataToData:withTimeout:tag:". */
+ (NSData *)CRLFData; // 0x0D0A
+ (NSData *)CRData; // 0x0D
+ (NSData *)LFData; // 0x0A
+ (NSData *)ZeroData; // 0x00
+ (NSData *)CRLFData; // 0x0D0A
+ (NSData *)CRData; // 0x0D
+ (NSData *)LFData; // 0x0A
+ (NSData *)ZeroData; // 0x00

@end
112 changes: 80 additions & 32 deletions AsyncSocket.m
Expand Up @@ -30,10 +30,10 @@

enum AsyncSocketFlags
{
kDidCallConnectDeleg = 0x01, // If set, connect delegate has been called.
kDidPassConnectMethod = 0x02, // If set, disconnection results in delegate call.
kForbidReadsWrites = 0x04, // If set, no new reads or writes are allowed.
kDisconnectSoon = 0x08 // If set, disconnect as soon as nothing is queued.
kDidCallConnectDeleg = 0x01, // If set, connect delegate has been called.
kDidPassConnectMethod = 0x02, // If set, disconnection results in delegate call.
kForbidReadsWrites = 0x04, // If set, no new reads or writes are allowed.
kDisconnectSoon = 0x08 // If set, disconnect as soon as nothing is queued.
};

@interface AsyncSocket (Private)
Expand Down Expand Up @@ -847,8 +847,8 @@ - (BOOL)setSocketFromStreamsAndReturnError:(NSError **)errPtr
#pragma mark Disconnect Implementation:
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////

// Sends error message and disconnects.
- (void) closeWithError:(NSError *)err
// Sends error message and disconnects
- (void)closeWithError:(NSError *)err
{
if (theFlags & kDidPassConnectMethod)
{
Expand All @@ -857,20 +857,30 @@ - (void) closeWithError:(NSError *)err

// Let the delegate know, so it can try to recover if it likes.
if ([theDelegate respondsToSelector:@selector(onSocket:willDisconnectWithError:)])
{
[theDelegate onSocket:self willDisconnectWithError:err];
}
}
[self close];
}

// Prepare partially read data for recovery.
- (void) recoverUnreadData
- (void)recoverUnreadData
{
if (theCurrentRead) [theCurrentRead->buffer setLength: theCurrentRead->bytesDone];
partialReadBuffer = (theCurrentRead ? [theCurrentRead->buffer copy] : nil);
if (theCurrentRead)
{
[theCurrentRead->buffer setLength: theCurrentRead->bytesDone];
partialReadBuffer = [theCurrentRead->buffer mutableCopy];
}
else
{
partialReadBuffer = [[NSMutableData alloc] initWithLength:0];
}

[self emptyQueues];
}

- (void) emptyQueues
- (void)emptyQueues
{
if (theCurrentRead != nil) [self endCurrentRead];
if (theCurrentWrite != nil) [self endCurrentWrite];
Expand All @@ -881,7 +891,7 @@ - (void) emptyQueues
}

// Disconnects. This is called for both error and clean disconnections.
- (void) close
- (void)close
{
// Empty queues.
[self emptyQueues];
Expand Down Expand Up @@ -940,7 +950,9 @@ - (void) close
{
// Delay notification to give him freedom to release without returning here and core-dumping.
if ([theDelegate respondsToSelector: @selector(onSocketDidDisconnect:)])
{
[theDelegate performSelector:@selector(onSocketDidDisconnect:) withObject:self afterDelay:0];
}
}

// Clear flags.
Expand All @@ -950,7 +962,7 @@ - (void) close
/**
* Disconnects immediately. Any pending reads or writes are dropped.
**/
- (void) disconnect
- (void)disconnect
{
[self close];
}
Expand All @@ -960,13 +972,56 @@ - (void) disconnect
* After calling this, the read and write methods (including "readDataWithTimeout:tag:") will do nothing.
* The socket will disconnect even if there are still pending reads.
**/
- (void) disconnectAfterWriting
- (void)disconnectAfterWriting
{
theFlags |= kForbidReadsWrites;
theFlags |= kDisconnectSoon;
[self maybeScheduleDisconnect];
}

/**
* In the event of an error, this method may be called during onSocket:willDisconnectWithError: to read
* any data that's left on the socket.
**/
- (NSData *)unreadData
{
// If the onSocket:willDisconnectWithError: method has been called, the partialReadBuffer will be initialized.
// Otherwise it remains nil.
// Thus checking this variable ensures this method will only return data in the event of an error.
if(partialReadBuffer == nil) return nil;

if(theReadStream == NULL) return nil;

CFIndex totalBytesRead = [partialReadBuffer length];
BOOL error = NO;
while(!error && CFReadStreamHasBytesAvailable(theReadStream))
{
[partialReadBuffer increaseLengthBy:READALL_CHUNKSIZE];

// Number of bytes to read is space left in packet buffer.
CFIndex bytesToRead = [partialReadBuffer length] - totalBytesRead;

// Read data into packet buffer
UInt8 *packetbuf = (UInt8 *)( [partialReadBuffer mutableBytes] + totalBytesRead );
CFIndex bytesRead = CFReadStreamRead(theReadStream, packetbuf, bytesToRead);

// Check results
if(bytesRead < 0)
{
bytesRead = 0;
error = YES;
}
else
{
totalBytesRead += bytesRead;
}
}

[partialReadBuffer setLength:totalBytesRead];

return partialReadBuffer;
}

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
#pragma mark Errors
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
Expand Down Expand Up @@ -1245,7 +1300,7 @@ - (UInt16)addressPort:(CFDataRef)cfaddr
return ntohs (pAddr->sin_port);
}

- (NSString *) description
- (NSString *)description
{
static const char *statstr[] = { "not open", "opening", "open", "reading", "writing", "at end", "closed", "has error" };
CFStreamStatus rs = (theReadStream != NULL) ? CFReadStreamGetStatus (theReadStream) : 0;
Expand Down Expand Up @@ -1322,7 +1377,7 @@ - (NSString *) description
if (theFlags & kDisconnectSoon) [ms appendString: @", will disconnect soon"];
if (![self isConnected]) [ms appendString: @", not connected"];

[ms appendString: @">"];
[ms appendString: @">"];

return [ms autorelease];
}
Expand All @@ -1345,7 +1400,7 @@ - (void)readDataToLength:(CFIndex)length withTimeout:(NSTimeInterval)timeout tag
bufferOffset:0];

[theReadQueue addObject:packet];
[self maybeDequeueRead];
[self scheduleDequeueRead];

[packet release];
[buffer release];
Expand All @@ -1365,7 +1420,7 @@ - (void)readDataToData:(NSData *)data withTimeout:(NSTimeInterval)timeout tag:(l
bufferOffset:0];

[theReadQueue addObject:packet];
[self maybeDequeueRead];
[self scheduleDequeueRead];

[packet release];
[buffer release];
Expand All @@ -1375,24 +1430,16 @@ - (void)readDataWithTimeout:(NSTimeInterval)timeout tag:(long)tag
{
if (theFlags & kForbidReadsWrites) return;

// The partialReadBuffer is used when recovering data from a broken connection.
NSMutableData *buffer;
if(partialReadBuffer) {
buffer = [partialReadBuffer mutableCopy];
}
else {
buffer = [[NSMutableData alloc] initWithLength:0];
}

NSMutableData *buffer = [[NSMutableData alloc] initWithLength:0];
AsyncReadPacket *packet = [[AsyncReadPacket alloc] initWithData:buffer
timeout:timeout
tag:tag
readAllAvailable:YES
terminator:nil
bufferOffset:[buffer length]];
bufferOffset:0];

[theReadQueue addObject:packet];
[self maybeDequeueRead];
[self scheduleDequeueRead];

[packet release];
[buffer release];
Expand Down Expand Up @@ -1578,12 +1625,12 @@ - (void)writeData:(NSData *)data withTimeout:(NSTimeInterval)timeout tag:(long)t
AsyncWritePacket *packet = [[AsyncWritePacket alloc] initWithData:data timeout:timeout tag:tag];

[theWriteQueue addObject:packet];
[self maybeDequeueWrite];
[self scheduleDequeueWrite];

[packet release];
}

- (void) scheduleDequeueWrite
- (void)scheduleDequeueWrite
{
[self performSelector:@selector(maybeDequeueWrite) withObject:nil afterDelay:0];
}
Expand Down Expand Up @@ -1659,10 +1706,12 @@ - (void)doSendBytes
- (void)completeCurrentWrite
{
NSAssert (theCurrentWrite, @"Trying to complete current write when there is no current write.");

if ([theDelegate respondsToSelector:@selector(onSocket:didWriteDataWithTag:)])
{
[theDelegate onSocket:self didWriteDataWithTag:theCurrentWrite->tag];
}

if (theCurrentWrite != nil) [self endCurrentWrite]; // Caller may have disconnected.
}

Expand Down Expand Up @@ -1697,10 +1746,9 @@ - (void)doWriteTimeout:(NSTimer *)timer
if (timer != theWriteTimer) return; // Old timer. Ignore it.
if (theCurrentWrite != nil)
{
// Send what we got
[self endCurrentWrite];
}
[self closeWithError: [self getWriteTimeoutError]];
[self closeWithError:[self getWriteTimeoutError]];
}

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
Expand Down

0 comments on commit be7716f

Please sign in to comment.