Skip to content

Commit

Permalink
Implement NSCopying, NSCoding, able to remove event handlers
Browse files Browse the repository at this point in the history
  • Loading branch information
travisjeffery committed Dec 20, 2013
1 parent ff125d5 commit 3421732
Show file tree
Hide file tree
Showing 4 changed files with 141 additions and 56 deletions.
2 changes: 1 addition & 1 deletion TRVSEventSource.xcodeproj/project.pbxproj
Expand Up @@ -210,7 +210,7 @@
ORGANIZATIONNAME = "Travis Jeffery";
TargetAttributes = {
3275255018047A3B00CF5D6F = {
TestTargetID = 3275253518047A3A00CF5D6F;
TestTargetID = 3275253518047A3A00CF5D6F /* TRVSEventSource */;
};
};
};
Expand Down
49 changes: 47 additions & 2 deletions TRVSEventSource/TRVSEventSource.h
Expand Up @@ -14,9 +14,17 @@

typedef void (^TRVSEventSourceEventHandler)(TRVSServerSentEvent *event, NSError *error);

@interface TRVSEventSource : NSObject <NSURLSessionDelegate, NSURLSessionDataDelegate>

// The URL that the event source receives events from.
/**
`TRVSEventSource` is an Objective-C implementation of the EventSource DOM interface supported by modern browsers.
An event source opens an HTTP connection, and receives events as they are sent from the server. Each event is encoded as an `TRVSServerSentEvent` object, and dispatched to all listeners for that particular event type.
@see http://www.w3.org/TR/eventsource/
*/
@interface TRVSEventSource : NSObject <NSURLSessionDelegate, NSURLSessionDataDelegate, NSCopying, NSCoding>

// The URL the event source receives events from.
@property (nonatomic, strong, readonly) NSURL *URL;
// The managed session.
@property (nonatomic, strong, readonly) NSURLSession *URLSession;
Expand All @@ -29,23 +37,60 @@ typedef void (^TRVSEventSourceEventHandler)(TRVSServerSentEvent *event, NSError

// @name connection state

// The connection state can be in only one state at any given time.
- (BOOL)isConnecting;
- (BOOL)isOpen;
- (BOOL)isClosed;
- (BOOL)isClosing;

// @name initializing an event source

/**
* Initializes an `TRVSEventSource` object with the specified URL. The event source will open only by calling -[TRVSEventSource open].
*
* @param URL The url the event source will receive events from.
*
* @return The newly-initialized event source.
*/
- (instancetype)initWithURL:(NSURL *)URL;

// @name opening and closing an event source

/**
* Opens a connection to the `URL` to receive events. The request specifies an `Accept` HTTP header field value of `text/event-stream`.
*/
- (void)open;

/**
* Closes the connection.
*/
- (void)close;

// @name listening for events

/**
* Adds a listener to the event source thats runs the `eventHandler` block whenever an event is received with the given `event` name.
*
* @param event The name of the event to listen for.
* @param eventHandler The block to run when events with the given name are received.
*
* @return The identifier associated with the listener for the specified event. Pass this to `-[TRVSEventSource removeEventListenerWithIdentifier:]` to remove the listener.
*/
- (NSUInteger)addListenerForEvent:(NSString *)event
usingEventHandler:(TRVSEventSourceEventHandler)eventHandler;

/**
Removes the event listener with the given identifier
@param identifier The identifier associated with the event listener.
@discussion The event listener identifier is returned when added with `-[TRVSEventSource addListenerForEvent:usingBlock:]`.
*/
- (void)removeEventListenerWithIdentifier:(NSUInteger)identifier;

/**
* Removes all listeners for events of the given type.
*/
- (void)removeAllListenersForEvent:(NSString *)event;

@end
133 changes: 86 additions & 47 deletions TRVSEventSource/TRVSEventSource.m
Expand Up @@ -7,32 +7,28 @@
//

#import "TRVSEventSource.h"
#import "TRVSServerSentEvent.h"


NSString *const TRVSEventSourceErrorDomain = @"com.travisjeffery.TRVSEventSource";
const NSInteger TRVSEventSourceErrorSourceClosed = 666;
static NSUInteger const TRVSEventSourceListenersCapacity = 100;
static char *const TRVSEventSourceSyncQueueLabel = "com.travisjeffery.TRVSEventSource.syncQueue";
static NSString *const TRVSEventSourceOperationQueueName = @"com.travisjeffery.TRVSEventSource.operationQueue";

static NSDictionary *TRVSServerSentEventFieldsFromData(NSData *data, NSError * __autoreleasing *error) {
if (!data || [data length] == 0) return nil;

NSString *string = [[NSString alloc] initWithData:data encoding:NSUTF8StringEncoding];
NSMutableDictionary *mutableFields = [NSMutableDictionary dictionary];

for (NSString *line in [string componentsSeparatedByCharactersInSet:[NSCharacterSet newlineCharacterSet]]) {
if (!line || [line length] == 0 || [line hasPrefix:@":"]) continue;

@autoreleasepool {
NSScanner *scanner = [[NSScanner alloc] initWithString:line];
scanner.charactersToBeSkipped = [NSCharacterSet whitespaceCharacterSet];
NSString *key, *value;
[scanner scanUpToString:@":" intoString:&key];
[scanner scanString:@":" intoString:nil];
[scanner scanUpToString:@"\n" intoString:&value];

if (key && value) {
if (mutableFields[key]) {
mutableFields[key] = [mutableFields[key] stringByAppendingFormat:@"\n%@", value];
Expand All @@ -42,7 +38,7 @@
}
}
}

return mutableFields;
}

Expand Down Expand Up @@ -70,41 +66,54 @@ @implementation TRVSEventSource

- (instancetype)initWithURL:(NSURL *)URL {
if (!(self = [super init])) return nil;

self.operationQueue = [[NSOperationQueue alloc] init];
self.operationQueue.name = TRVSEventSourceOperationQueueName;
self.URL = URL;
self.listenersKeyedByEvent = [[NSMapTable alloc] initWithKeyOptions:NSPointerFunctionsCopyIn valueOptions:NSPointerFunctionsStrongMemory capacity:TRVSEventSourceListenersCapacity];
self.URLSession = [NSURLSession sessionWithConfiguration:[NSURLSessionConfiguration defaultSessionConfiguration] delegate:self delegateQueue:self.operationQueue];
self.syncQueue = dispatch_queue_create(TRVSEventSourceSyncQueueLabel, NULL);

return self;
}

- (void)open {
__weak typeof(self) weakSelf = self;
dispatch_sync(self.syncQueue, ^{
[weakSelf transitionToConnecting];
});
[self transitionToConnecting];
}

- (void)close {
__weak typeof(self) weakSelf = self;
dispatch_sync(self.syncQueue, ^{
[weakSelf transitionToClosing];
});
[self transitionToClosing];
}

- (NSUInteger)addListenerForEvent:(NSString *)event usingEventHandler:(TRVSEventSourceEventHandler)eventHandler {
NSMutableDictionary *mutableListenersKeyedByIdentifier = [self.listenersKeyedByEvent objectForKey:event];
if (!mutableListenersKeyedByIdentifier) mutableListenersKeyedByIdentifier = [NSMutableDictionary dictionary];

NSUInteger identifier = [[NSUUID UUID] hash];
mutableListenersKeyedByIdentifier[@(identifier)] = [eventHandler copy];

[self.listenersKeyedByEvent setObject:mutableListenersKeyedByIdentifier forKey:event];

return identifier;
}

- (void)removeEventListenerWithIdentifier:(NSUInteger)identifier {
NSEnumerator *enumerator = [self.listenersKeyedByEvent keyEnumerator];
id event = nil;
while ((event = [enumerator nextObject])) {
NSMutableDictionary *mutableListenersKeyedByIdentifier = [self.listenersKeyedByEvent objectForKey:event];
if ([mutableListenersKeyedByIdentifier objectForKey:@(identifier)]) {
[mutableListenersKeyedByIdentifier removeObjectForKey:@(identifier)];
[self.listenersKeyedByEvent setObject:mutableListenersKeyedByIdentifier forKey:event];
return;
}
}
}

- (void)removeAllListenersForEvent:(NSString *)event {
[self.listenersKeyedByEvent removeObjectForKey:event];
}

#pragma mark - State

- (BOOL)isConnecting {
Expand All @@ -125,13 +134,13 @@ - (BOOL)isClosing {

#pragma mark - NSURLSessionDelegate

- (void)URLSession:(NSURLSession *)session dataTask:(NSURLSessionDataTask *)dataTask didReceiveData:(NSData *)data {
- (void)URLSession:(NSURLSession *)session dataTask:(NSURLSessionDataTask *)dataTask didReceiveData:(NSData *)data {
NSUInteger length = data.length;
while (YES) {
NSInteger totalNumberOfBytesWritten = 0;
if (self.outputStream.hasSpaceAvailable) {
const uint8_t *dataBuffer = (uint8_t *)[data bytes];

NSInteger numberOfBytesWritten = 0;
while (totalNumberOfBytesWritten < (NSInteger)length) {
numberOfBytesWritten = [self.outputStream write:&dataBuffer[0] maxLength:length];
Expand All @@ -141,7 +150,7 @@ - (void)URLSession:(NSURLSession *)session dataTask:(NSURLSessionDataTask *)data
totalNumberOfBytesWritten += numberOfBytesWritten;
}
}

break;
}
}
Expand Down Expand Up @@ -170,7 +179,7 @@ - (void)stream:(NSStream *)stream handleEvent:(NSStreamEvent)eventCode {
NSError *error = nil;
TRVSServerSentEvent *event = [TRVSServerSentEvent eventWithFields:TRVSServerSentEventFieldsFromData([data subdataWithRange:NSMakeRange(self.offset, [data length] - self.offset)], &error)];
self.offset = [data length];

if (error) {
[self transitionToFailedWithError:error];
}
Expand All @@ -179,7 +188,7 @@ - (void)stream:(NSStream *)stream handleEvent:(NSStreamEvent)eventCode {
[[self.listenersKeyedByEvent objectForKey:event.event] enumerateKeysAndObjectsUsingBlock:^(id _, TRVSEventSourceEventHandler eventHandler, BOOL *stop) {
eventHandler(event, nil);
}];

if ([self.delegate respondsToSelector:@selector(eventSource:didReceiveEvent:)]) {
[self.delegate eventSource:self didReceiveEvent:event];
}
Expand All @@ -195,27 +204,36 @@ - (void)stream:(NSStream *)stream handleEvent:(NSStreamEvent)eventCode {
}
}

#pragma mark - Private
#pragma NSCoding

- (void)setupOutputStream {
self.outputStream = [NSOutputStream outputStreamToMemory];
self.outputStream.delegate = self;
[self.outputStream scheduleInRunLoop:[NSRunLoop currentRunLoop] forMode:NSDefaultRunLoopMode];
[self.outputStream open];
- (id)initWithCoder:(NSCoder *)aDecoder {
NSURL *URL = [aDecoder decodeObjectForKey:@"URL"];
if (!(self = [self initWithURL:URL])) return nil;
return self;
}

- (void)closeOutputStream {
self.outputStream.delegate = nil;
[self.outputStream removeFromRunLoop:[NSRunLoop currentRunLoop] forMode:NSDefaultRunLoopMode];
[self.outputStream close];
- (void)encodeWithCoder:(NSCoder *)aCoder {
[aCoder encodeObject:self.URL forKey:@"URL"];
}

#pragma NSCopying

- (id)copyWithZone:(NSZone *)zone {
return [[[self class] allocWithZone:zone] initWithURL:self.URL];
}

#pragma mark - Private

- (void)transitionToOpenIfNeeded {
if (self.state != TRVSEventSourceConnecting) return;
self.state = TRVSEventSourceOpen;
if ([self.delegate respondsToSelector:@selector(eventSourceDidOpen:)]) {
[self.delegate eventSourceDidOpen:self];
}
__weak typeof(self) weakSelf = self;
dispatch_sync(self.syncQueue, ^{
__strong typeof(weakSelf) strongSelf = weakSelf;
if (strongSelf.state != TRVSEventSourceConnecting) return;
strongSelf.state = TRVSEventSourceOpen;
if ([strongSelf.delegate respondsToSelector:@selector(eventSourceDidOpen:)]) {
[strongSelf.delegate eventSourceDidOpen:self];
}
});
}

- (void)transitionToFailedWithError:(NSError *)error {
Expand All @@ -233,16 +251,37 @@ - (void)transitionToClosed {
}

- (void)transitionToConnecting {
self.state = TRVSEventSourceConnecting;
[self setupOutputStream];
self.URLSessionTask = [self.URLSession dataTaskWithURL:self.URL];
[self.URLSessionTask resume];
__weak typeof(self) weakSelf = self;
dispatch_sync(self.syncQueue, ^{
__strong typeof(weakSelf) strongSelf = weakSelf;
strongSelf.state = TRVSEventSourceConnecting;
[strongSelf setupOutputStream];
strongSelf.URLSessionTask = [strongSelf.URLSession dataTaskWithURL:strongSelf.URL];
[strongSelf.URLSessionTask resume];
});
}

- (void)transitionToClosing {
self.state = TRVSEventSourceClosing;
[self closeOutputStream];
[self.URLSession invalidateAndCancel];
__weak typeof(self) weakSelf = self;
dispatch_sync(self.syncQueue, ^{
__strong typeof(weakSelf) strongSelf = weakSelf;
strongSelf.state = TRVSEventSourceClosing;
[strongSelf closeOutputStream];
[strongSelf.URLSession invalidateAndCancel];
});
}

- (void)setupOutputStream {
self.outputStream = [NSOutputStream outputStreamToMemory];
self.outputStream.delegate = self;
[self.outputStream scheduleInRunLoop:[NSRunLoop currentRunLoop] forMode:NSDefaultRunLoopMode];
[self.outputStream open];
}

- (void)closeOutputStream {
self.outputStream.delegate = nil;
[self.outputStream removeFromRunLoop:[NSRunLoop currentRunLoop] forMode:NSDefaultRunLoopMode];
[self.outputStream close];
}

@end
13 changes: 7 additions & 6 deletions TRVSEventSourceTests/TRVSEventSourceTests.m
Expand Up @@ -21,12 +21,13 @@ @implementation TRVSEventSourceTests
- (void)testEventSourceStreaming {
// you must be running the local server. see README.md.
TRVSEventSource *eventSource = [[TRVSEventSource alloc] initWithURL:[NSURL URLWithString:@"http://127.0.0.1:8000"]];

__block TRVSMonitor *monitor = [[TRVSMonitor alloc] initWithExpectedSignalCount:3];

[eventSource addListenerForEvent:@"message" usingEventHandler:^(TRVSServerSentEvent *event, NSError *error) {
NSDictionary *dictionary = [NSJSONSerialization JSONObjectWithData:event.data options:0 error:NULL];

XCTAssert(event);
XCTAssertEqualObjects(@"message", event.event);
NSDictionary *dictionary = [NSJSONSerialization JSONObjectWithData:event.data options:0 error:NULL];
XCTAssertEqualObjects(@1, dictionary[@"author_id"]);
XCTAssertEqualObjects(@1, dictionary[@"conversation_id"]);
XCTAssert(dictionary[@"body"]);
Expand Down Expand Up @@ -56,8 +57,8 @@ - (void)testEventSourceOpening {
}] eventSourceDidOpen:eventSource];

[eventSource open];
XCTAssert(eventSource.isConnecting);

XCTAssert(eventSource.isConnecting);
XCTAssert([monitor wait]);
[delegate verify];
}
Expand All @@ -80,14 +81,14 @@ - (void)testEventSourceClosing {
[[[delegate stub] andDo:^(NSInvocation *invocation) {
[monitor signal];
}] eventSourceDidOpen:eventSource];

[eventSource open];


[[[delegate stub] andDo:^(NSInvocation *invocation) {
XCTAssert(eventSource.isClosed);
[monitor signal];
}] eventSourceDidClose:eventSource];

[eventSource open];

XCTAssert([monitor wait]);
[delegate verify];
}
Expand Down

0 comments on commit 3421732

Please sign in to comment.