Skip to content

Commit

Permalink
Merge examples branch.
Browse files Browse the repository at this point in the history
  • Loading branch information
jeremy-w committed Feb 11, 2011
2 parents 840fdb2 + 230cb75 commit f5e254a
Show file tree
Hide file tree
Showing 7 changed files with 88 additions and 31 deletions.
44 changes: 20 additions & 24 deletions README.mkd
Expand Up @@ -25,7 +25,7 @@ for its context.

Thread Safety
-------------
ZeroMQ has some restrictive thread safety and coupling issues:
Early versions of ZeroMQ had some restrictive thread safety and coupling issues:

* Sockets can only be used from the thread that created them.
* All ZMQ sockets provided in a single call to `zmq_poll()` must have been created using the same context.
Expand All @@ -37,38 +37,34 @@ you have to track
each socket's context
and make sure not to mix them.
(The `ZMQSocket` class tracks this for you.)
This is not as restrictive as it sounds,
because most applications will only ever use
a single context.

Because each socket
is bound to the thread that created it,
you must be very careful when using
Prior to version 2.1.0,
each socket was permanently bound to the thread that created it.
This made it very difficult to use
ZeroMQ sockets
with Grand Central Dispatch
or `NSOperationQueue`.
The only persistent thread
that these two expose
or `NSOperationQueue`, because
the only persistent thread
that these two APIs expose
is the thread
you're least likely
to want to perform socket operations on:
the main thread.

Starting with
ZeroMQ 2.1.0,
you can use a socket
from any thread
provided you issue
a full memory barrier
between usage on one thread
and on the other.
The easiest way is to use
`OSMemoryBarrier()`
from `<libkern/OSAtomic.h>`.
Starting with version 2.1.0,
a socket can be used from different threads
provided a full memory barrier,
such as that introduced by
`<libkern/OSAtomic.h>`'s
`OSMemoryBarrier` function,
separates the socket's use on one thread
from its use on another.

To Do
-----
* Add functional tests in the form of sample code.
* Provide a more (Core)Foundation-like API
(`CFSocket`, `CFFileDescriptor`)
that takes advantage of the runloop.
This is complicated by
the threading constraints
present in libzmq-2.0.7.
* Tie polling into the runloop, similar to
`NSStream`, `CFSocket`, or `CFFileDescriptor`.
12 changes: 12 additions & 0 deletions src/ZMQContext.h
Expand Up @@ -2,6 +2,10 @@
#import "ZMQSocket.h" // ZMQSocketType
#import <libkern/OSAtomic.h>

/* Special polling timeout values. */
#define ZMQPollTimeoutNever (-1)
#define ZMQPollTimeoutNow (0)

@interface ZMQContext : NSObject {
void *context;
NSMutableArray *sockets;
Expand All @@ -10,13 +14,21 @@
}
+ (void)getZMQVersionMajor:(int *)major minor:(int *)minor patch:(int *)patch;

/* Polling */
// Generic poll interface.
+ (int)pollWithItems:(zmq_pollitem_t *)ioItems count:(int)itemCount
timeoutAfterUsec:(long)usec;

// Creates a ZMQContext using |threadCount| threads for I/O.
- (id)initWithIOThreads:(NSUInteger)threadCount;

- (ZMQSocket *)socketWithType:(ZMQSocketType)type;
// Sockets associated with this context.
@property(readonly, retain, NS_NONATOMIC_IPHONEONLY) NSArray *sockets;

// Closes all associated sockets.
- (void)closeSockets;

// Initiates termination. All associated sockets will be shut down.
- (void)terminate;

Expand Down
13 changes: 13 additions & 0 deletions src/ZMQContext.m
Expand Up @@ -15,6 +15,13 @@ + (void)getZMQVersionMajor:(int *)major minor:(int *)minor patch:(int *)patch {
(void)zmq_version(major, minor, patch);
}

#pragma mark Polling
+ (int)pollWithItems:(zmq_pollitem_t *)ioItems count:(int)itemCount
timeoutAfterUsec:(long)usec {
int ret = zmq_poll(ioItems, itemCount, usec);
return ret;
}

- (id)initWithIOThreads:(NSUInteger)threadCount {
self = [super init];
if (!self) return nil;
Expand Down Expand Up @@ -52,6 +59,12 @@ - (ZMQSocket *)socketWithType:(ZMQSocketType)type {
return socket;
}

- (void)closeSockets {
for (ZMQSocket *socket in self.sockets) {
[socket close];
}
}

@synthesize terminated;
- (void)terminate {
(void)zmq_term(self.context);
Expand Down
3 changes: 3 additions & 0 deletions src/ZMQObjC.h
@@ -0,0 +1,3 @@
/* Convenience header - imports all ZMQObjc headers. */
#import "ZMQContext.h"
#import "ZMQSocket.h"
5 changes: 5 additions & 0 deletions src/ZMQSocket.h
Expand Up @@ -10,16 +10,21 @@ typedef int ZMQMessageReceiveFlags;
@interface ZMQSocket : NSObject {
void *socket;
ZMQContext *context; // not retained
NSString *endpoint;
ZMQSocketType type;
BOOL closed;
}
// Returns @"ZMQ_PUB" for ZMQ_PUB, for example.
+ (NSString *)nameForSocketType:(ZMQSocketType)type;

// Create a socket using -[ZMQContext socketWithType:].
@property(readonly, assign, NS_NONATOMIC_IPHONEONLY) ZMQContext *context;
@property(readonly, NS_NONATOMIC_IPHONEONLY) ZMQSocketType type;

- (void)close;
// KVOable.
@property(readonly, getter=isClosed, NS_NONATOMIC_IPHONEONLY) BOOL closed;
@property(readonly, copy, NS_NONATOMIC_IPHONEONLY) NSString *endpoint;

#pragma mark Socket Options
- (BOOL)setData:(NSData *)data forOption:(ZMQSocketOption)option;
Expand Down
36 changes: 29 additions & 7 deletions src/ZMQSocket.m
Expand Up @@ -12,11 +12,27 @@ @interface ZMQContext (ZMQSocketIsFriend)
@interface ZMQSocket ()
@property(readwrite, getter=isClosed, NS_NONATOMIC_IPHONEONLY) BOOL closed;
@property(readonly) void *socket;
@property(readwrite, copy, NS_NONATOMIC_IPHONEONLY) NSString *endpoint;
@end

static inline void ZMQLogError(id object, NSString *msg);

@implementation ZMQSocket
+ (NSString *)nameForSocketType:(ZMQSocketType)type {
static NSString *const typeNames[] = {
@"ZMQ_PAIR",
@"ZMQ_PUB", @"ZMQ_SUB",
@"ZMQ_REQ", @"ZMQ_REP",
@"ZMQ_XREQ", @"ZMQ_XREP",
@"ZMQ_PULL", @"ZMQ_PUSH"
};
static const ZMQSocketType
typeNameCount = sizeof(typeNames)/sizeof(*typeNames);

if (type < 0 || type >= typeNameCount) return @"UNKNOWN";
return typeNames[type];
}

- (id)init {
self = [super init];
if (self) [self release];
Expand Down Expand Up @@ -48,7 +64,6 @@ - (id)initWithContext:(ZMQContext *)context_ type:(ZMQSocketType)type_ {
@synthesize socket;
@synthesize closed;
- (void)close {
// FIXME: This is not thread-safe.
if (!self.closed) {
int err = zmq_close(self.socket);
if (err) {
Expand All @@ -61,15 +76,19 @@ - (void)close {

- (void)dealloc {
[self close];
[endpoint release], endpoint = nil;
[super dealloc];
}

@synthesize context;
@synthesize type;
- (NSString *)description {
NSString *typeName = [[self class] nameForSocketType:self.type];
NSString *
desc = [NSString stringWithFormat:@"<%@: %p (ctx=%p, type=%d, closed=%d)>",
[self class], self, self.context, (int)self.type, (int)self.closed];
desc = [NSString stringWithFormat:
@"<%@: %p (ctx=%p, type=%@, endpoint=%@, closed=%d)>",
[self class], self, self.context, typeName, self.endpoint,
(int)self.closed];
return desc;
}

Expand Down Expand Up @@ -101,8 +120,10 @@ - (NSData *)dataForOption:(ZMQSocketOption)option {
}

#pragma mark Endpoint Configuration
- (BOOL)bindToEndpoint:(NSString *)endpoint {
const char *addr = [endpoint UTF8String];
@synthesize endpoint;
- (BOOL)bindToEndpoint:(NSString *)endpoint_ {
[self setEndpoint:endpoint_];
const char *addr = [endpoint_ UTF8String];
int err = zmq_bind(self.socket, addr);
if (err) {
ZMQLogError(self, @"zmq_bind");
Expand All @@ -111,8 +132,9 @@ - (BOOL)bindToEndpoint:(NSString *)endpoint {
return YES;
}

- (BOOL)connectToEndpoint:(NSString *)endpoint {
const char *addr = [endpoint UTF8String];
- (BOOL)connectToEndpoint:(NSString *)endpoint_ {
[self setEndpoint:endpoint_];
const char *addr = [endpoint_ UTF8String];
int err = zmq_connect(self.socket, addr);
if (err) {
ZMQLogError(self, @"zmq_connect");
Expand Down
6 changes: 6 additions & 0 deletions zmqobjc.xcodeproj/project.pbxproj
Expand Up @@ -17,6 +17,8 @@
3F62DF7F11DEDAAA008EA78B /* ZMQSocket.m in Sources */ = {isa = PBXBuildFile; fileRef = 3F62DF7B11DEDAAA008EA78B /* ZMQSocket.m */; };
3F62E09E11DEFB28008EA78B /* Foundation.framework in Frameworks */ = {isa = PBXBuildFile; fileRef = 3F62E09D11DEFB28008EA78B /* Foundation.framework */; };
3F62E0A211DEFB4B008EA78B /* libzmq.dylib in Frameworks */ = {isa = PBXBuildFile; fileRef = 3F62E0A111DEFB4B008EA78B /* libzmq.dylib */; };
3FFFDB8012F121D200E2D54F /* ZMQObjC.h in Headers */ = {isa = PBXBuildFile; fileRef = 3FFFDB7F12F121D200E2D54F /* ZMQObjC.h */; };
3FFFDB8112F121D200E2D54F /* ZMQObjC.h in Headers */ = {isa = PBXBuildFile; fileRef = 3FFFDB7F12F121D200E2D54F /* ZMQObjC.h */; };
/* End PBXBuildFile section */

/* Begin PBXFileReference section */
Expand All @@ -30,6 +32,7 @@
3F62E09011DEF8AC008EA78B /* dylib.xcconfig */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = text.xcconfig; path = dylib.xcconfig; sourceTree = "<group>"; };
3F62E09D11DEFB28008EA78B /* Foundation.framework */ = {isa = PBXFileReference; lastKnownFileType = wrapper.framework; name = Foundation.framework; path = System/Library/Frameworks/Foundation.framework; sourceTree = SDKROOT; };
3F62E0A111DEFB4B008EA78B /* libzmq.dylib */ = {isa = PBXFileReference; lastKnownFileType = "compiled.mach-o.dylib"; name = libzmq.dylib; path = /usr/local/lib/libzmq.dylib; sourceTree = "<absolute>"; };
3FFFDB7F12F121D200E2D54F /* ZMQObjC.h */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.c.h; path = ZMQObjC.h; sourceTree = "<group>"; };
D2AAC0630554660B00DB518D /* libzmqobjc.dylib */ = {isa = PBXFileReference; explicitFileType = "compiled.mach-o.dylib"; includeInIndex = 0; path = libzmqobjc.dylib; sourceTree = BUILT_PRODUCTS_DIR; };
/* End PBXFileReference section */

Expand Down Expand Up @@ -69,6 +72,7 @@
08FB7795FE84155DC02AAC07 /* Source */ = {
isa = PBXGroup;
children = (
3FFFDB7F12F121D200E2D54F /* ZMQObjC.h */,
3F62DF6411DED7A4008EA78B /* ZMQContext.h */,
3F62DF6511DED7A4008EA78B /* ZMQContext.m */,
3F62DF7A11DEDAAA008EA78B /* ZMQSocket.h */,
Expand Down Expand Up @@ -105,6 +109,7 @@
files = (
3F62DF6811DED7A4008EA78B /* ZMQContext.h in Headers */,
3F62DF7E11DEDAAA008EA78B /* ZMQSocket.h in Headers */,
3FFFDB8012F121D200E2D54F /* ZMQObjC.h in Headers */,
);
runOnlyForDeploymentPostprocessing = 0;
};
Expand All @@ -114,6 +119,7 @@
files = (
3F62DF6611DED7A4008EA78B /* ZMQContext.h in Headers */,
3F62DF7C11DEDAAA008EA78B /* ZMQSocket.h in Headers */,
3FFFDB8112F121D200E2D54F /* ZMQObjC.h in Headers */,
);
runOnlyForDeploymentPostprocessing = 0;
};
Expand Down

0 comments on commit f5e254a

Please sign in to comment.