Skip to content

Commit

Permalink
Async parsing of continuous _changes feed
Browse files Browse the repository at this point in the history
TDSocketChangeTracker parses batches of lines on a background thread.
It doesn't read more data from the stream until parsing has finished.
This adds parallelism, and also makes the stream buffering work a lot better because data isn't read until it's ready to be parsed.
  • Loading branch information
snej committed Mar 28, 2012
1 parent 8a67933 commit 34ddf84
Show file tree
Hide file tree
Showing 3 changed files with 94 additions and 22 deletions.
1 change: 1 addition & 0 deletions Source/ChangeTracker/TDChangeTracker.h
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ typedef enum TDChangeTrackerMode {
@property (readonly) NSURL* changesFeedURL;
@property (readonly) NSString* changesFeedPath;
- (void) setUpstreamError: (NSString*)message;
- (BOOL) receivedChange: (NSDictionary*)change;
- (BOOL) receivedChunk: (NSData*)chunk;
- (BOOL) receivedPollResponse: (NSData*)body;
- (void) stopped; // override this
Expand Down
2 changes: 2 additions & 0 deletions Source/ChangeTracker/TDSocketChangeTracker.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,5 +21,7 @@
NSMutableData* _inputBuffer;
NSMutableData* _changeBuffer;
int _state;
bool _parsing;
bool _inputAvailable;
}
@end
113 changes: 91 additions & 22 deletions Source/ChangeTracker/TDSocketChangeTracker.m
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

#import "TDSocketChangeTracker.h"
#import "TDBase64.h"
#import "MYBlockUtils.h"


enum {
Expand Down Expand Up @@ -132,7 +133,10 @@ - (BOOL) failUnparseable: (NSString*)line {
}


- (BOOL) appendToChangeLine: (const void*)bytes length: (NSUInteger)length {
- (void) readChangeLine: (const void*)bytes
length: (NSUInteger)length
intoArray: (NSMutableArray*)changes
{
if (_changeBuffer)
[_changeBuffer appendBytes: bytes length: length];
else
Expand All @@ -145,15 +149,14 @@ - (BOOL) appendToChangeLine: (const void*)bytes length: (NSUInteger)length {
NSData* line = [_changeBuffer subdataWithRange: NSMakeRange(0, eol-start)];
[_changeBuffer replaceBytesInRange: NSMakeRange(0, eol-start+1)
withBytes: NULL length: 0];
// Finally! Parse the line as JSON:
if (![self receivedChunk: line])
return NO;
if (line.length > 0)
[changes addObject: line];
}
return YES;
}


- (void) readLines {
NSMutableArray* changes = $marray();
const char* pos = _inputBuffer.bytes;
const char* end = pos + _inputBuffer.length;
BOOL keepGoing = YES;
Expand All @@ -167,7 +170,6 @@ - (void) readLines {
length: lineLength
encoding: NSUTF8StringEncoding] autorelease];
pos = crlf + 2;
LogTo(ChangeTracker, @"%@: LINE: \"%@\"", self, line);
if (!line) {
[self failUnparseable: @"invalid UTF-8"];
break;
Expand Down Expand Up @@ -202,7 +204,7 @@ - (void) readLines {
break; // Don't read the chunk till it's complete
}
// Append the chunk to the current change line:
[self appendToChangeLine: pos length: chunkLength];
[self readChangeLine: pos length: chunkLength intoArray: changes];
pos += chunkLength;
}
}
Expand All @@ -211,6 +213,84 @@ - (void) readLines {
// Remove the parsed lines:
[_inputBuffer replaceBytesInRange: NSMakeRange(0, pos - (const char*)_inputBuffer.bytes)
withBytes: NULL length: 0];

if (changes.count > 0)
[self asyncParseChangeLines: changes];
}


#pragma mark - ASYNC PARSING:


- (void) asyncParseChangeLines: (NSArray*)lines {
static NSOperationQueue* sParseQueue;
if (!sParseQueue)
sParseQueue = [[NSOperationQueue alloc] init];

LogTo(ChangeTracker, @"%@: Async parsing %u changes...", self, lines.count);
Assert(!_parsing);
_parsing = true;
NSThread* resultThread = [NSThread currentThread];
[sParseQueue addOperationWithBlock: ^{
// Parse on background thread:
bool allParsed = true;
NSMutableArray* parsedChanges = [NSMutableArray arrayWithCapacity: lines.count];
for (NSData* line in lines) {
id change = [TDJSON JSONObjectWithData: line options: 0 error: NULL];
if (!change) {
Warn(@"TDSocketChangeTracker received unparseable change line from server: %@", [line my_UTF8ToString]);
allParsed = false;
break;
}
[parsedChanges addObject: change];
}
MYOnThread(resultThread, ^{
// Process change lines on original thread:
Assert(_parsing);
_parsing = false;
if (!_trackingInput)
return;
LogTo(ChangeTracker, @"%@: Notifying %u changes...", self, parsedChanges.count);
for (id change in parsedChanges) {
if (![self receivedChange: change]) {
[self failUnparseable: change];
break;
}
}
if (!allParsed) {
[self setUpstreamError: @"Unparseable change line"];
[self stop];
}

if (_inputAvailable)
[self readFromInput];
});
}];
}


#pragma mark - STREAM HANDLING:


- (void) readFromInput {
Assert(!_parsing);
Assert(_inputAvailable);
_inputAvailable = false;

uint8_t* buffer;
NSUInteger bufferLength;
NSInteger bytesRead;
if ([_trackingInput getBuffer: &buffer length: &bufferLength]) {
[_inputBuffer appendBytes: buffer length: bufferLength];
bytesRead = bufferLength;
} else {
uint8_t buffer[8192];
bytesRead = [_trackingInput read: buffer maxLength: sizeof(buffer)];
if (bytesRead > 0)
[_inputBuffer appendBytes: buffer length: bytesRead];
}
LogTo(ChangeTracker, @"%@: read %ld bytes", self, (long)bytesRead);
[self readLines];
}


Expand Down Expand Up @@ -245,21 +325,10 @@ - (void) stream: (NSInputStream*)stream handleEvent: (NSStreamEvent)eventCode {
}
case NSStreamEventHasBytesAvailable: {
LogTo(ChangeTracker, @"%@: HasBytesAvailable %@", self, stream);
uint8_t* buffer;
NSUInteger bufferLength;
NSInteger bytesRead;
if ([stream getBuffer: &buffer length: &bufferLength]) {
[_inputBuffer appendBytes: buffer length: bufferLength];
bytesRead = bufferLength;
} else {
uint8_t buffer[8192];
bytesRead = [stream read: buffer maxLength: sizeof(buffer)];
if (bytesRead > 0) {
[_inputBuffer appendBytes: buffer length: bytesRead];
}
}
LogTo(ChangeTracker, @"%@: read %ld bytes", self, (long)bytesRead);
[self readLines];
_inputAvailable = true;
// If still chewing on last bytes, don't eat any more yet
if (!_parsing)
[self readFromInput];
break;
}
case NSStreamEventEndEncountered:
Expand Down

0 comments on commit 34ddf84

Please sign in to comment.