diff --git a/README.mkd b/README.mkd index e2b6a7b..020e571 100644 --- a/README.mkd +++ b/README.mkd @@ -25,7 +25,7 @@ for its context. Thread Safety ------------- -ZeroMQ has some restrictive thread safety and coupling issues: +Early versions of ZeroMQ had some restrictive thread safety and coupling issues: * Sockets can only be used from the thread that created them. * All ZMQ sockets provided in a single call to `zmq_poll()` must have been created using the same context. @@ -37,38 +37,34 @@ you have to track each socket's context and make sure not to mix them. (The `ZMQSocket` class tracks this for you.) +This is not as restrictive as it sounds, +because most applications will only ever use +a single context. -Because each socket -is bound to the thread that created it, -you must be very careful when using +Prior to version 2.1.0, +each socket was permanently bound to the thread that created it. +This made it very difficult to use ZeroMQ sockets with Grand Central Dispatch -or `NSOperationQueue`. -The only persistent thread -that these two expose +or `NSOperationQueue`, because +the only persistent thread +that these two APIs expose is the thread you're least likely to want to perform socket operations on: the main thread. -Starting with -ZeroMQ 2.1.0, -you can use a socket -from any thread -provided you issue -a full memory barrier -between usage on one thread -and on the other. -The easiest way is to use -`OSMemoryBarrier()` -from ``. +Starting with version 2.1.0, +a socket can be used from different threads +provided a full memory barrier, +such as that introduced by +``'s +`OSMemoryBarrier` function, +separates the socket's use on one thread +from its use on another. To Do ----- * Add functional tests in the form of sample code. -* Provide a more (Core)Foundation-like API -(`CFSocket`, `CFFileDescriptor`) -that takes advantage of the runloop. -This is complicated by -the threading constraints -present in libzmq-2.0.7. +* Tie polling into the runloop, similar to +`NSStream`, `CFSocket`, or `CFFileDescriptor`. diff --git a/src/ZMQContext.h b/src/ZMQContext.h index b939cf9..f33c823 100644 --- a/src/ZMQContext.h +++ b/src/ZMQContext.h @@ -2,6 +2,10 @@ #import "ZMQSocket.h" // ZMQSocketType #import +/* Special polling timeout values. */ +#define ZMQPollTimeoutNever (-1) +#define ZMQPollTimeoutNow (0) + @interface ZMQContext : NSObject { void *context; NSMutableArray *sockets; @@ -10,6 +14,11 @@ } + (void)getZMQVersionMajor:(int *)major minor:(int *)minor patch:(int *)patch; +/* Polling */ +// Generic poll interface. ++ (int)pollWithItems:(zmq_pollitem_t *)ioItems count:(int)itemCount + timeoutAfterUsec:(long)usec; + // Creates a ZMQContext using |threadCount| threads for I/O. - (id)initWithIOThreads:(NSUInteger)threadCount; @@ -17,6 +26,9 @@ // Sockets associated with this context. @property(readonly, retain, NS_NONATOMIC_IPHONEONLY) NSArray *sockets; +// Closes all associated sockets. +- (void)closeSockets; + // Initiates termination. All associated sockets will be shut down. - (void)terminate; diff --git a/src/ZMQContext.m b/src/ZMQContext.m index 68239ef..dac3993 100644 --- a/src/ZMQContext.m +++ b/src/ZMQContext.m @@ -15,6 +15,13 @@ + (void)getZMQVersionMajor:(int *)major minor:(int *)minor patch:(int *)patch { (void)zmq_version(major, minor, patch); } +#pragma mark Polling ++ (int)pollWithItems:(zmq_pollitem_t *)ioItems count:(int)itemCount + timeoutAfterUsec:(long)usec { + int ret = zmq_poll(ioItems, itemCount, usec); + return ret; +} + - (id)initWithIOThreads:(NSUInteger)threadCount { self = [super init]; if (!self) return nil; @@ -52,6 +59,12 @@ - (ZMQSocket *)socketWithType:(ZMQSocketType)type { return socket; } +- (void)closeSockets { + for (ZMQSocket *socket in self.sockets) { + [socket close]; + } +} + @synthesize terminated; - (void)terminate { (void)zmq_term(self.context); diff --git a/src/ZMQObjC.h b/src/ZMQObjC.h new file mode 100644 index 0000000..3adc5db --- /dev/null +++ b/src/ZMQObjC.h @@ -0,0 +1,3 @@ +/* Convenience header - imports all ZMQObjc headers. */ +#import "ZMQContext.h" +#import "ZMQSocket.h" diff --git a/src/ZMQSocket.h b/src/ZMQSocket.h index ade3968..36efb23 100644 --- a/src/ZMQSocket.h +++ b/src/ZMQSocket.h @@ -10,9 +10,13 @@ typedef int ZMQMessageReceiveFlags; @interface ZMQSocket : NSObject { void *socket; ZMQContext *context; // not retained + NSString *endpoint; ZMQSocketType type; BOOL closed; } +// Returns @"ZMQ_PUB" for ZMQ_PUB, for example. ++ (NSString *)nameForSocketType:(ZMQSocketType)type; + // Create a socket using -[ZMQContext socketWithType:]. @property(readonly, assign, NS_NONATOMIC_IPHONEONLY) ZMQContext *context; @property(readonly, NS_NONATOMIC_IPHONEONLY) ZMQSocketType type; @@ -20,6 +24,7 @@ typedef int ZMQMessageReceiveFlags; - (void)close; // KVOable. @property(readonly, getter=isClosed, NS_NONATOMIC_IPHONEONLY) BOOL closed; +@property(readonly, copy, NS_NONATOMIC_IPHONEONLY) NSString *endpoint; #pragma mark Socket Options - (BOOL)setData:(NSData *)data forOption:(ZMQSocketOption)option; diff --git a/src/ZMQSocket.m b/src/ZMQSocket.m index a44d058..f7d0534 100644 --- a/src/ZMQSocket.m +++ b/src/ZMQSocket.m @@ -12,11 +12,27 @@ @interface ZMQContext (ZMQSocketIsFriend) @interface ZMQSocket () @property(readwrite, getter=isClosed, NS_NONATOMIC_IPHONEONLY) BOOL closed; @property(readonly) void *socket; +@property(readwrite, copy, NS_NONATOMIC_IPHONEONLY) NSString *endpoint; @end static inline void ZMQLogError(id object, NSString *msg); @implementation ZMQSocket ++ (NSString *)nameForSocketType:(ZMQSocketType)type { + static NSString *const typeNames[] = { + @"ZMQ_PAIR", + @"ZMQ_PUB", @"ZMQ_SUB", + @"ZMQ_REQ", @"ZMQ_REP", + @"ZMQ_XREQ", @"ZMQ_XREP", + @"ZMQ_PULL", @"ZMQ_PUSH" + }; + static const ZMQSocketType + typeNameCount = sizeof(typeNames)/sizeof(*typeNames); + + if (type < 0 || type >= typeNameCount) return @"UNKNOWN"; + return typeNames[type]; +} + - (id)init { self = [super init]; if (self) [self release]; @@ -48,7 +64,6 @@ - (id)initWithContext:(ZMQContext *)context_ type:(ZMQSocketType)type_ { @synthesize socket; @synthesize closed; - (void)close { - // FIXME: This is not thread-safe. if (!self.closed) { int err = zmq_close(self.socket); if (err) { @@ -61,15 +76,19 @@ - (void)close { - (void)dealloc { [self close]; + [endpoint release], endpoint = nil; [super dealloc]; } @synthesize context; @synthesize type; - (NSString *)description { + NSString *typeName = [[self class] nameForSocketType:self.type]; NSString * - desc = [NSString stringWithFormat:@"<%@: %p (ctx=%p, type=%d, closed=%d)>", - [self class], self, self.context, (int)self.type, (int)self.closed]; + desc = [NSString stringWithFormat: + @"<%@: %p (ctx=%p, type=%@, endpoint=%@, closed=%d)>", + [self class], self, self.context, typeName, self.endpoint, + (int)self.closed]; return desc; } @@ -101,8 +120,10 @@ - (NSData *)dataForOption:(ZMQSocketOption)option { } #pragma mark Endpoint Configuration -- (BOOL)bindToEndpoint:(NSString *)endpoint { - const char *addr = [endpoint UTF8String]; +@synthesize endpoint; +- (BOOL)bindToEndpoint:(NSString *)endpoint_ { + [self setEndpoint:endpoint_]; + const char *addr = [endpoint_ UTF8String]; int err = zmq_bind(self.socket, addr); if (err) { ZMQLogError(self, @"zmq_bind"); @@ -111,8 +132,9 @@ - (BOOL)bindToEndpoint:(NSString *)endpoint { return YES; } -- (BOOL)connectToEndpoint:(NSString *)endpoint { - const char *addr = [endpoint UTF8String]; +- (BOOL)connectToEndpoint:(NSString *)endpoint_ { + [self setEndpoint:endpoint_]; + const char *addr = [endpoint_ UTF8String]; int err = zmq_connect(self.socket, addr); if (err) { ZMQLogError(self, @"zmq_connect"); diff --git a/zmqobjc.xcodeproj/project.pbxproj b/zmqobjc.xcodeproj/project.pbxproj index 4f8bb4c..3c48768 100644 --- a/zmqobjc.xcodeproj/project.pbxproj +++ b/zmqobjc.xcodeproj/project.pbxproj @@ -17,6 +17,8 @@ 3F62DF7F11DEDAAA008EA78B /* ZMQSocket.m in Sources */ = {isa = PBXBuildFile; fileRef = 3F62DF7B11DEDAAA008EA78B /* ZMQSocket.m */; }; 3F62E09E11DEFB28008EA78B /* Foundation.framework in Frameworks */ = {isa = PBXBuildFile; fileRef = 3F62E09D11DEFB28008EA78B /* Foundation.framework */; }; 3F62E0A211DEFB4B008EA78B /* libzmq.dylib in Frameworks */ = {isa = PBXBuildFile; fileRef = 3F62E0A111DEFB4B008EA78B /* libzmq.dylib */; }; + 3FFFDB8012F121D200E2D54F /* ZMQObjC.h in Headers */ = {isa = PBXBuildFile; fileRef = 3FFFDB7F12F121D200E2D54F /* ZMQObjC.h */; }; + 3FFFDB8112F121D200E2D54F /* ZMQObjC.h in Headers */ = {isa = PBXBuildFile; fileRef = 3FFFDB7F12F121D200E2D54F /* ZMQObjC.h */; }; /* End PBXBuildFile section */ /* Begin PBXFileReference section */ @@ -30,6 +32,7 @@ 3F62E09011DEF8AC008EA78B /* dylib.xcconfig */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = text.xcconfig; path = dylib.xcconfig; sourceTree = ""; }; 3F62E09D11DEFB28008EA78B /* Foundation.framework */ = {isa = PBXFileReference; lastKnownFileType = wrapper.framework; name = Foundation.framework; path = System/Library/Frameworks/Foundation.framework; sourceTree = SDKROOT; }; 3F62E0A111DEFB4B008EA78B /* libzmq.dylib */ = {isa = PBXFileReference; lastKnownFileType = "compiled.mach-o.dylib"; name = libzmq.dylib; path = /usr/local/lib/libzmq.dylib; sourceTree = ""; }; + 3FFFDB7F12F121D200E2D54F /* ZMQObjC.h */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.c.h; path = ZMQObjC.h; sourceTree = ""; }; D2AAC0630554660B00DB518D /* libzmqobjc.dylib */ = {isa = PBXFileReference; explicitFileType = "compiled.mach-o.dylib"; includeInIndex = 0; path = libzmqobjc.dylib; sourceTree = BUILT_PRODUCTS_DIR; }; /* End PBXFileReference section */ @@ -69,6 +72,7 @@ 08FB7795FE84155DC02AAC07 /* Source */ = { isa = PBXGroup; children = ( + 3FFFDB7F12F121D200E2D54F /* ZMQObjC.h */, 3F62DF6411DED7A4008EA78B /* ZMQContext.h */, 3F62DF6511DED7A4008EA78B /* ZMQContext.m */, 3F62DF7A11DEDAAA008EA78B /* ZMQSocket.h */, @@ -105,6 +109,7 @@ files = ( 3F62DF6811DED7A4008EA78B /* ZMQContext.h in Headers */, 3F62DF7E11DEDAAA008EA78B /* ZMQSocket.h in Headers */, + 3FFFDB8012F121D200E2D54F /* ZMQObjC.h in Headers */, ); runOnlyForDeploymentPostprocessing = 0; }; @@ -114,6 +119,7 @@ files = ( 3F62DF6611DED7A4008EA78B /* ZMQContext.h in Headers */, 3F62DF7C11DEDAAA008EA78B /* ZMQSocket.h in Headers */, + 3FFFDB8112F121D200E2D54F /* ZMQObjC.h in Headers */, ); runOnlyForDeploymentPostprocessing = 0; };