-
Notifications
You must be signed in to change notification settings - Fork 484
/
ZMQ.java
1683 lines (1437 loc) · 51.2 KB
/
ZMQ.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
/*
Copyright (c) 1991-2011 iMatix Corporation <www.imatix.com>
Copyright other contributors as noted in the AUTHORS file.
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the GNU Lesser General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.jeromq;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectableChannel;
import java.nio.channels.Selector;
import zmq.Ctx;
import zmq.DecoderBase;
import zmq.EncoderBase;
import zmq.SocketBase;
import zmq.ZError;
public class ZMQ {
/**
* Socket flag to indicate that more message parts are coming.
*/
public static final int SNDMORE = zmq.ZMQ.ZMQ_SNDMORE;
// Values for flags in Socket's send and recv functions.
/**
* Socket flag to indicate a nonblocking send or recv mode.
*/
public static final int DONTWAIT = zmq.ZMQ.ZMQ_DONTWAIT;
/**
* Alias of {@link #DONTWAIT}; both are initialised from the same underlying constant.
*/
public static final int NOBLOCK = zmq.ZMQ.ZMQ_DONTWAIT;
// Socket types, used when creating a Socket.
/**
* Flag to specify an exclusive pair of sockets.
*/
public static final int PAIR = zmq.ZMQ.ZMQ_PAIR;
/**
* Flag to specify a PUB socket, receiving side must be a SUB or XSUB.
*/
public static final int PUB = zmq.ZMQ.ZMQ_PUB;
/**
* Flag to specify the receiving part of the PUB or XPUB socket.
*/
public static final int SUB = zmq.ZMQ.ZMQ_SUB;
/**
* Flag to specify a REQ socket, receiving side must be a REP.
*/
public static final int REQ = zmq.ZMQ.ZMQ_REQ;
/**
* Flag to specify the receiving part of a REQ socket.
*/
public static final int REP = zmq.ZMQ.ZMQ_REP;
/**
* Flag to specify a DEALER socket (aka XREQ).
* DEALER is really a combined ventilator / sink
* that does load-balancing on output and fair-queuing on input
* with no other semantics. It is the only socket type that lets
* you shuffle messages out to N nodes and shuffle the replies
* back, in a raw bidirectional asynchronous pattern.
*/
public static final int DEALER = zmq.ZMQ.ZMQ_DEALER;
/**
* Old alias for DEALER flag.
* Flag to specify a XREQ socket, receiving side must be a XREP.
*
* @deprecated As of release 3.0 of zeromq, replaced by {@link #DEALER}
*/
public static final int XREQ = DEALER;
/**
* Flag to specify ROUTER socket (aka XREP).
* ROUTER is the socket that creates and consumes request-reply
* routing envelopes. It is the only socket type that lets you route
* messages to specific connections if you know their identities.
*/
public static final int ROUTER = zmq.ZMQ.ZMQ_ROUTER;
/**
* Old alias for ROUTER flag.
* Flag to specify the receiving part of a XREQ socket.
*
* @deprecated As of release 3.0 of zeromq, replaced by {@link #ROUTER}
*/
public static final int XREP = ROUTER;
/**
* Flag to specify the receiving part of a PUSH socket.
*/
public static final int PULL = zmq.ZMQ.ZMQ_PULL;
/**
* Flag to specify a PUSH socket, receiving side must be a PULL.
*/
public static final int PUSH = zmq.ZMQ.ZMQ_PUSH;
/**
* Flag to specify a XPUB socket, receiving side must be a SUB or XSUB.
* Subscriptions can be received as a message. Subscriptions start with
* a '1' byte. Unsubscriptions start with a '0' byte.
*/
public static final int XPUB = zmq.ZMQ.ZMQ_XPUB;
/**
* Flag to specify an XSUB socket, the receiving part of the PUB or XPUB socket.
*/
public static final int XSUB = zmq.ZMQ.ZMQ_XSUB;
/**
* Flag to specify a STREAMER device.
*/
public static final int STREAMER = zmq.ZMQ.ZMQ_STREAMER ;
/**
* Flag to specify a FORWARDER device.
*/
public static final int FORWARDER = zmq.ZMQ.ZMQ_FORWARDER ;
/**
* Flag to specify a QUEUE device.
*/
public static final int QUEUE = zmq.ZMQ.ZMQ_QUEUE ;
/**
* Old alias; holds the same value as {@link #PULL}.
*
* @see ZMQ#PULL
* @deprecated use {@link #PULL} instead.
*/
@Deprecated
public static final int UPSTREAM = PULL;
/**
* Old alias; holds the same value as {@link #PUSH}.
*
* @see ZMQ#PUSH
* @deprecated use {@link #PUSH} instead.
*/
@Deprecated
public static final int DOWNSTREAM = PUSH;
/**
* Event flag (ZMQ_POLLIN): set when a message can be read from the socket.
*/
public static final int POLLIN = zmq.ZMQ.ZMQ_POLLIN;
/**
* Event flag (ZMQ_POLLOUT): set when a message can be written to the socket.
*/
public static final int POLLOUT = zmq.ZMQ.ZMQ_POLLOUT;
/**
* Event flag mirroring ZMQ_POLLERR.
* NOTE(review): presumably signals an error condition on a polled item; confirm against the poller.
*/
public static final int POLLERR = zmq.ZMQ.ZMQ_POLLERR;
/**
 * Builds a fresh {@link Context} backed by the requested number of I/O threads.
 *
 * @param ioThreads
 *            number of threads to use; usually 1 is sufficient for most use cases.
 * @return the newly constructed Context.
 */
public static Context context(int ioThreads) {
    final Context created = new Context(ioThreads);
    return created;
}
/**
 * Builds a fresh {@link Context} that uses a single I/O thread.
 *
 * @return the newly constructed Context.
 */
public static Context context() {
    final int singleIoThread = 1;
    return new Context(singleIoThread);
}
/**
 * Wrapper around a {@code zmq.Ctx}; serves as the factory for {@link Socket}
 * and {@link Poller} instances.
 */
public static class Context {

    /** The wrapped context obtained from {@code zmq.ZMQ.zmq_init}. */
    private final Ctx ctx;

    /**
     * Class constructor.
     *
     * @param ioThreads
     *            size of the thread pool that handles I/O operations.
     */
    protected Context(int ioThreads) {
        this.ctx = zmq.ZMQ.zmq_init(ioThreads);
    }

    /**
     * Explicit "destructor". Call it to make sure the corresponding 0MQ
     * context has been disposed of; it delegates to {@code Ctx.terminate()}.
     */
    public void term() {
        this.ctx.terminate();
    }

    /**
     * Creates a new Socket bound to this context.
     *
     * @param type
     *            the socket type.
     * @return the newly created Socket.
     */
    public Socket socket(int type) {
        final Socket created = new Socket(this, type);
        return created;
    }

    /**
     * Creates a new Poller bound to this context, using the default size.
     *
     * @return the newly created Poller.
     */
    public Poller poller () {
        final Poller created = new Poller (this);
        return created;
    }

    /**
     * Creates a new Poller bound to this context with the given initial size.
     *
     * @param size
     *            the poller initial size.
     * @return the newly created Poller.
     */
    public Poller poller (int size) {
        final Poller created = new Poller (this, size);
        return created;
    }
}
public static class Socket {
// This port range is defined by IANA for dynamic or private ports
// We use this when choosing a port for dynamic binding.
// DYNFROM (0xc000 = 49152) is the first port tried by bind(String) for a ":*" endpoint.
private static final int DYNFROM = 0xc000;
// DYNTO (0xffff = 65535) is the last port tried, inclusive.
private static final int DYNTO = 0xffff;
// Context core this socket was created from; null when the socket wraps an existing SocketBase.
private final Ctx ctx;
// Underlying socket implementation that the methods of this class delegate to.
private final SocketBase base;
/**
* Class constructor.
*
* @param context
* a 0MQ context previously created.
* @param type
* the socket type.
*/
protected Socket(Context context_, int type) {
ctx = context_.ctx;
base = ctx.create_socket(type);
mayRaise();
}
/**
 * Wraps an already existing {@code SocketBase}. No context is recorded for
 * such a socket (the context reference is left {@code null}).
 *
 * @param base_
 *            the underlying socket to wrap.
 */
protected Socket(SocketBase base_) {
    this.base = base_;
    this.ctx = null;
}
/**
 * Exposes the wrapped low-level socket.
 *
 * @return the underlying {@code SocketBase}.
 */
public SocketBase base() {
    return this.base;
}
/**
 * Translates the current {@code zmq.ZError} state into an exception.
 * Nothing happens when the error is 0 or EAGAIN; ETERM becomes
 * {@code ZMQException.CtxTerminated}; any other value becomes a
 * {@code ZMQException} carrying the current errno.
 */
private final static void mayRaise() {
    final boolean benign = zmq.ZError.is(0) || zmq.ZError.is(zmq.ZError.EAGAIN);
    if (benign) {
        return;
    }
    if (zmq.ZError.is(zmq.ZError.ETERM)) {
        throw new ZMQException.CtxTerminated();
    }
    throw new ZMQException(zmq.ZError.errno());
}
/**
 * Explicit "destructor". Call it to make sure the corresponding 0MQ socket
 * has been disposed of; it delegates to the underlying socket's close().
 */
public final void close() {
    this.base.close();
}
/**
 * Reads the 'ZMQ_TYPE' option, i.e. the socket type. The type is fixed at
 * socket creation time and cannot be modified afterwards.
 *
 * @return the socket type.
 * @since 2.1.0
 */
public final int getType () {
    final int socketType = base.getsockopt(zmq.ZMQ.ZMQ_TYPE);
    return socketType;
}
/**
 * Reads the 'ZMQ_LINGER' option.
 *
 * @see #setLinger(long)
 *
 * @return the linger period.
 * @since 2.1.0
 */
public final long getLinger() {
    final long linger = base.getsockopt(zmq.ZMQ.ZMQ_LINGER);
    return linger;
}
/**
 * Sets the 'ZMQ_LINGER' option: the period for pending outbound messages to
 * linger in memory after closing the socket. A value of -1 means infinite;
 * pending messages are kept until they are fully transferred to the peer.
 * A value of 0 means that all pending messages are dropped immediately when
 * the socket is closed. A positive value is the number of milliseconds to
 * keep trying to send the pending messages before discarding them.
 *
 * @param value
 *            the linger period; it is narrowed to an int before being applied.
 * @since 2.1.0
 */
public final void setLinger (long value)
{
    final int period = (int) value;
    base.setsockopt (zmq.ZMQ.ZMQ_LINGER, period);
}
/**
 * Reads the 'ZMQ_RECONNECT_IVL' option (the initial reconnection interval).
 * The maximum interval is a separate option; see {@link #getReconnectIVLMax()}.
 *
 * @see #setReconnectIVL(long)
 *
 * @return the reconnectIVL.
 * @since 3.0.0
 */
public final long getReconnectIVL() {
    // Must be ZMQ_RECONNECT_IVL: reading ZMQ_RECONNECT_IVL_MAX here would make
    // this getter return the same value as getReconnectIVLMax().
    return base.getsockopt(zmq.ZMQ.ZMQ_RECONNECT_IVL);
}
/**
 * Sets the 'ZMQ_RECONNECT_IVL' option (the initial reconnection interval).
 * The maximum interval is a separate option; see {@link #setReconnectIVLMax(long)}.
 *
 * @param value
 *            the reconnection interval; it is narrowed to an int before being
 *            applied. NOTE(review): the 0MQ zmq_setsockopt documentation
 *            expresses this option in milliseconds -- confirm for this port.
 * @throws ZMQException
 *             if the underlying error state reports a failure afterwards.
 * @since 3.0.0
 */
public final void setReconnectIVL(long value) {
    // Must be ZMQ_RECONNECT_IVL: writing ZMQ_RECONNECT_IVL_MAX here would
    // silently overwrite the maximum interval instead.
    base.setsockopt(zmq.ZMQ.ZMQ_RECONNECT_IVL, (int) value);
    mayRaise();
}
/**
 * Reads the 'ZMQ_BACKLOG' option.
 *
 * @see #setBacklog(long)
 *
 * @return the backlog.
 * @since 3.0.0
 */
public final long getBacklog() {
    final long backlog = base.getsockopt(zmq.ZMQ.ZMQ_BACKLOG);
    return backlog;
}
/**
 * Sets the 'ZMQ_BACKLOG' option, then raises any pending error.
 *
 * @param value
 *            the backlog; it is narrowed to an int before being applied.
 * @since 3.0.0
 */
public final void setBacklog(long value) {
    final int backlog = (int) value;
    base.setsockopt(zmq.ZMQ.ZMQ_BACKLOG, backlog);
    mayRaise();
}
/**
 * Reads the 'ZMQ_RECONNECT_IVL_MAX' option.
 *
 * @see #setReconnectIVLMax(long)
 *
 * @return the reconnectIVLMax.
 * @since 3.0.0
 */
public final long getReconnectIVLMax () {
    final long maxInterval = base.getsockopt(zmq.ZMQ.ZMQ_RECONNECT_IVL_MAX);
    return maxInterval;
}
/**
 * Sets the 'ZMQ_RECONNECT_IVL_MAX' option, then raises any pending error.
 *
 * @param value
 *            the maximum reconnection interval; it is narrowed to an int
 *            before being applied.
 * @since 3.0.0
 */
public final void setReconnectIVLMax (long value) {
    final int maxInterval = (int) value;
    base.setsockopt(zmq.ZMQ.ZMQ_RECONNECT_IVL_MAX, maxInterval);
    mayRaise();
}
/**
 * Reads the 'ZMQ_MAXMSGSIZE' option. The underlying value is fetched as an
 * object and unboxed from {@code Long}.
 *
 * @see #setMaxMsgSize(long)
 *
 * @return the maxMsgSize.
 * @since 3.0.0
 */
public final long getMaxMsgSize() {
    final Object raw = base.getsockoptx(zmq.ZMQ.ZMQ_MAXMSGSIZE);
    return (Long) raw;
}
/**
 * Sets the 'ZMQ_MAXMSGSIZE' option, then raises any pending error. Unlike
 * most numeric options the value is passed on as a long, without narrowing.
 *
 * @param value
 *            the maximum message size.
 * @since 3.0.0
 */
public final void setMaxMsgSize(long value) {
    this.base.setsockopt(zmq.ZMQ.ZMQ_MAXMSGSIZE, value);
    mayRaise();
}
/**
 * Reads the 'ZMQ_SNDHWM' option.
 *
 * @see #setSndHWM(long)
 *
 * @return the SndHWM.
 * @since 3.0.0
 */
public final long getSndHWM() {
    final long sendHwm = base.getsockopt(zmq.ZMQ.ZMQ_SNDHWM);
    return sendHwm;
}
/**
 * Sets the 'ZMQ_SNDHWM' option, then raises any pending error.
 *
 * @param value
 *            the send high water mark; it is narrowed to an int before being
 *            applied.
 * @since 3.0.0
 */
public final void setSndHWM(long value) {
    final int sendHwm = (int) value;
    base.setsockopt(zmq.ZMQ.ZMQ_SNDHWM, sendHwm);
    mayRaise();
}
/**
 * Reads the 'ZMQ_RCVHWM' option.
 *
 * @see #setRcvHWM(long)
 *
 * @return the receive high water mark.
 * @since 3.0.0
 */
public final long getRcvHWM() {
    final long receiveHwm = base.getsockopt(zmq.ZMQ.ZMQ_RCVHWM);
    return receiveHwm;
}
/**
 * Sets the 'ZMQ_RCVHWM' option, then raises any pending error.
 *
 * @param value
 *            the receive high water mark; it is narrowed to an int before
 *            being applied.
 * @since 3.0.0
 */
public final void setRcvHWM(long value) {
    final int receiveHwm = (int) value;
    base.setsockopt(zmq.ZMQ.ZMQ_RCVHWM, receiveHwm);
    mayRaise();
}
/**
 * Legacy accessor for the single high water mark. This implementation does
 * not query the socket and always returns -1.
 *
 * @see #setHWM(long)
 *
 * @return always -1.
 */
@Deprecated
public final long getHWM() {
    final long notAvailable = -1;
    return notAvailable;
}
/**
 * Sets the high water mark for the socket by applying the same value to the
 * send side first and the receive side second.
 *
 * The high water mark is a hard limit on the maximum number of outstanding
 * messages 0MQ shall queue in memory for any single peer that the socket is
 * communicating with. If this limit has been reached the socket shall enter
 * an exceptional state and, depending on the socket type, 0MQ shall take
 * appropriate action such as blocking or dropping sent messages. Refer to
 * the individual socket descriptions in the man page of zmq_socket[3] for
 * details on the exact action taken for each socket type.
 *
 * @param hwm
 *            the number of messages to queue.
 */
public final void setHWM(long hwm) {
    this.setSndHWM(hwm);
    this.setRcvHWM(hwm);
}
/**
 * Legacy accessor for the swap size. This implementation does not query the
 * socket and always returns -1.
 *
 * @see #setSwap(long)
 *
 * @return always -1.
 */
@Deprecated
public final long getSwap() {
    // not supported in zeromq 3
    final long notSupported = -1L;
    return notSupported;
}
/**
 * Legacy setter for the 'ZMQ_SWAP' option (disk offload size). Historically,
 * a socket with a non-zero 'ZMQ_SWAP' could exceed its high water mark, with
 * outstanding messages offloaded to storage on disk rather than held in
 * memory. This implementation is a no-op: the argument is ignored.
 *
 * @param value
 *            the maximum size of the swap space in bytes (ignored).
 */
@Deprecated
public final void setSwap(long value) {
    // not supported in zeromq 3 -- intentionally empty
}
/**
 * Reads the 'ZMQ_AFFINITY' option. The underlying value is fetched as an
 * object and unboxed from {@code Long}.
 *
 * @see #setAffinity(long)
 *
 * @return the affinity.
 */
public final long getAffinity() {
    final Object raw = base.getsockoptx(zmq.ZMQ.ZMQ_AFFINITY);
    return (Long) raw;
}
/**
 * Sets the 'ZMQ_AFFINITY' option: the I/O thread affinity for newly created
 * connections on this socket. Any pending error is raised afterwards.
 *
 * Affinity determines which threads from the 0MQ I/O thread pool associated
 * with the socket's _context_ shall handle newly created connections. A
 * value of zero specifies no affinity, meaning that work shall be
 * distributed fairly among all 0MQ I/O threads in the thread pool. For
 * non-zero values, the lowest bit corresponds to thread 1, second lowest bit
 * to thread 2 and so on. For example, a value of 3 specifies that subsequent
 * connections on the socket shall be handled exclusively by I/O threads 1
 * and 2.
 *
 * See also the man page of zmq_init[3] for details on allocating the number
 * of I/O threads for a specific _context_.
 *
 * @param value
 *            the affinity bit mask.
 */
public final void setAffinity(long value) {
    this.base.setsockopt(zmq.ZMQ.ZMQ_AFFINITY, value);
    mayRaise();
}
/**
 * Reads the 'ZMQ_IDENTITY' option.
 *
 * @see #setIdentity(byte[])
 *
 * @return the identity.
 */
public final byte[] getIdentity() {
    final Object raw = base.getsockoptx(zmq.ZMQ.ZMQ_IDENTITY);
    return (byte[]) raw;
}
/**
 * Sets the 'ZMQ_IDENTITY' option, then raises any pending error. Socket
 * identity determines if existing 0MQ infrastructure (_message queues_,
 * _forwarding devices_) shall be identified with a specific application and
 * persist across multiple runs of the application.
 *
 * If the socket has no identity, each run of an application is completely
 * separate from other runs. However, with identity set the socket shall
 * re-use any existing 0MQ infrastructure configured by the previous run(s).
 * Thus the application may receive messages that were sent in the meantime,
 * _message queue_ limits shall be shared with previous run(s) and so on.
 *
 * Identity should be at least one byte and at most 255 bytes long.
 * Identities starting with binary zero are reserved for use by 0MQ
 * infrastructure.
 *
 * @param identity
 *            the identity bytes.
 */
public final void setIdentity(byte[] identity) {
    this.base.setsockopt(zmq.ZMQ.ZMQ_IDENTITY, identity);
    mayRaise();
}
/**
 * String convenience overload of {@link #setIdentity(byte[])}.
 * The text is converted with {@code String.getBytes()}, i.e. using the
 * platform default charset.
 *
 * @param identity
 *            the identity text.
 */
public final void setIdentity(String identity) {
    final byte[] encoded = identity.getBytes();
    setIdentity(encoded);
}
/**
 * Reads the 'ZMQ_RATE' option.
 *
 * @see #setRate(long)
 *
 * @return the rate.
 */
public final long getRate() {
    final long rate = base.getsockopt(zmq.ZMQ.ZMQ_RATE);
    return rate;
}
/**
 * Sets the 'ZMQ_RATE' option: the maximum send or receive data rate for
 * multicast transports such as described in the man page of zmq_pgm[7].
 * Any pending error is raised afterwards.
 *
 * @param value
 *            the rate; it is narrowed to an int before being applied.
 */
public final void setRate(long value) {
    final int rate = (int) value;
    base.setsockopt(zmq.ZMQ.ZMQ_RATE, rate);
    mayRaise();
}
/**
 * Reads the 'ZMQ_RECOVERY_IVL' option.
 *
 * @see #setRecoveryInterval(long)
 *
 * @return the recovery interval.
 */
public final long getRecoveryInterval () {
    final long interval = base.getsockopt(zmq.ZMQ.ZMQ_RECOVERY_IVL);
    return interval;
}
/**
 * Sets the 'ZMQ_RECOVERY_IVL' option: the recovery interval for multicast
 * transports. The recovery interval determines the maximum time in seconds
 * that a receiver can be absent from a multicast group before unrecoverable
 * data loss will occur. Any pending error is raised afterwards.
 *
 * CAUTION: Exercise care when setting large recovery intervals as the data
 * needed for recovery will be held in memory. For example, a 1 minute
 * recovery interval at a data rate of 1Gbps requires a 7GB in-memory buffer.
 *
 * @param value
 *            the recovery interval; it is narrowed to an int before being
 *            applied.
 */
public final void setRecoveryInterval (long value) {
    final int interval = (int) value;
    base.setsockopt(zmq.ZMQ.ZMQ_RECOVERY_IVL, interval);
    mayRaise();
}
/**
 * Legacy accessor for the multicast loopback flag. This implementation does
 * not query the socket and always returns {@code false}.
 *
 * @see #setMulticastLoop(boolean)
 *
 * @return always {@code false}.
 */
@Deprecated
public final boolean hasMulticastLoop () {
    final boolean loopback = false;
    return loopback;
}
/**
 * Legacy setter for the 'ZMQ_MCAST_LOOP' option. Historically the option
 * controlled whether data sent via multicast transports could also be
 * received by the sending host via loopback: zero disabled the loopback
 * functionality, while the default value of 1 enabled it. Leaving multicast
 * loopback enabled when it is not required can have a negative impact on
 * performance, so it was recommended to disable it in production where
 * possible. This implementation is a no-op: the argument is ignored.
 *
 * @param mcast_loop
 *            the requested loopback setting (ignored).
 */
@Deprecated
public final void setMulticastLoop (boolean mcast_loop) {
    // intentionally empty
}
/**
 * Reads the 'ZMQ_MULTICAST_HOPS' option.
 *
 * @see #setMulticastHops(long)
 *
 * @return the multicast hops.
 */
public final long getMulticastHops () {
    final long hops = base.getsockopt(zmq.ZMQ.ZMQ_MULTICAST_HOPS);
    return hops;
}
/**
 * Sets the time-to-live field in every multicast packet sent from this
 * socket ('ZMQ_MULTICAST_HOPS'). The default is 1, which means that the
 * multicast packets don't leave the local network. Any pending error is
 * raised afterwards.
 *
 * @param value
 *            the hop count; it is narrowed to an int before being applied.
 */
public final void setMulticastHops (long value) {
    final int hops = (int) value;
    base.setsockopt(zmq.ZMQ.ZMQ_MULTICAST_HOPS, hops);
    mayRaise();
}
/**
 * Reads the 'ZMQ_RCVTIMEO' option.
 *
 * @see #setReceiveTimeOut(int)
 *
 * @return the receive timeout.
 */
public final int getReceiveTimeOut() {
    final int timeout = base.getsockopt(zmq.ZMQ.ZMQ_RCVTIMEO);
    return timeout;
}
/**
 * Sets the timeout for receive operations on the socket ('ZMQ_RCVTIMEO').
 * If the value is 0, recv will return immediately, with null if there is no
 * message to receive. If the value is -1, it will block until a message is
 * available. For all other values, it will wait for a message for that
 * amount of time before returning with a null. Any pending error is raised
 * afterwards.
 *
 * @param value
 *            the receive timeout.
 */
public final void setReceiveTimeOut(int value) {
    this.base.setsockopt(zmq.ZMQ.ZMQ_RCVTIMEO, value);
    mayRaise();
}
/**
 * Reads the 'ZMQ_SNDTIMEO' option.
 *
 * @see #setSendTimeOut(int)
 *
 * @return the send timeout.
 */
public final int getSendTimeOut() {
    final int timeout = (int) base.getsockopt(zmq.ZMQ.ZMQ_SNDTIMEO);
    return timeout;
}
/**
 * Sets the timeout for send operations on the socket ('ZMQ_SNDTIMEO').
 * If the value is 0, send will return immediately, with a false if the
 * message cannot be sent. If the value is -1, it will block until the
 * message is sent. For all other values, it will try to send the message
 * for that amount of time before returning with a false. Any pending error
 * is raised afterwards.
 *
 * @param value
 *            the send timeout.
 */
public final void setSendTimeOut(int value) {
    this.base.setsockopt(zmq.ZMQ.ZMQ_SNDTIMEO, value);
    mayRaise();
}
/**
 * Reads the 'ZMQ_SNDBUF' option.
 *
 * @see #setSendBufferSize(long)
 *
 * @return the kernel send buffer size.
 */
public final long getSendBufferSize() {
    final long size = base.getsockopt(zmq.ZMQ.ZMQ_SNDBUF);
    return size;
}
/**
 * Sets the 'ZMQ_SNDBUF' option: the underlying kernel transmit buffer size
 * for the socket, in bytes. A value of zero means leave the OS default
 * unchanged. For details please refer to your operating system
 * documentation for the 'SO_SNDBUF' socket option. Any pending error is
 * raised afterwards.
 *
 * @param value
 *            the buffer size; it is narrowed to an int before being applied.
 */
public final void setSendBufferSize(long value) {
    final int size = (int) value;
    base.setsockopt(zmq.ZMQ.ZMQ_SNDBUF, size);
    mayRaise();
}
/**
 * Reads the 'ZMQ_RCVBUF' option.
 *
 * @see #setReceiveBufferSize(long)
 *
 * @return the kernel receive buffer size.
 */
public final long getReceiveBufferSize() {
    final long size = base.getsockopt(zmq.ZMQ.ZMQ_RCVBUF);
    return size;
}
/**
 * Sets the 'ZMQ_RCVBUF' option: the underlying kernel receive buffer size
 * for the socket, in bytes. A value of zero means leave the OS default
 * unchanged. For details refer to your operating system documentation for
 * the 'SO_RCVBUF' socket option. Any pending error is raised afterwards.
 *
 * @param value
 *            the buffer size; it is narrowed to an int before being applied.
 */
public final void setReceiveBufferSize(long value) {
    final int size = (int) value;
    base.setsockopt(zmq.ZMQ.ZMQ_RCVBUF, size);
    mayRaise();
}
/**
 * Reads the 'ZMQ_RCVMORE' option, which indicates whether the multi-part
 * message currently being read from the socket has more message parts to
 * follow. The option is zero when there are no parts to follow or the
 * message is not a multi-part message, and 1 otherwise.
 *
 * @return true if the option value equals 1, i.e. there are more message
 *         parts to receive.
 */
public final boolean hasReceiveMore ()
{
    final int more = base.getsockopt (zmq.ZMQ.ZMQ_RCVMORE);
    return more == 1;
}
/**
 * Reads the 'ZMQ_FD' option: the descriptor associated with the 0MQ socket,
 * exposed here as a {@code SelectableChannel}. The descriptor can be used
 * to integrate the 0MQ socket into an existing event loop. It should never
 * be used for anything other than polling -- such as reading or writing.
 * The descriptor signals an edge-triggered IN event when something has
 * happened within the 0MQ socket. It does not necessarily mean that
 * messages can be read or written. Check the ZMQ_EVENTS option to find out
 * whether the 0MQ socket is readable or writeable.
 *
 * @return the underlying file descriptor.
 * @since 2.1.0
 */
public final SelectableChannel getFD() {
    final Object raw = base.getsockoptx(zmq.ZMQ.ZMQ_FD);
    return (SelectableChannel) raw;
}
/**
 * Reads the 'ZMQ_EVENTS' option: the event flags for this socket. If a
 * message can be read from the socket the ZMQ_POLLIN flag is set. If a
 * message can be written to the socket the ZMQ_POLLOUT flag is set.
 *
 * @return the mask of outstanding events.
 * @since 2.1.0
 */
public final int getEvents() {
    final int eventMask = base.getsockopt(zmq.ZMQ.ZMQ_EVENTS);
    return eventMask;
}
/**
 * Applies the 'ZMQ_SUBSCRIBE' option, establishing a new message filter on
 * a 'ZMQ_SUB' socket. Newly created 'ZMQ_SUB' sockets shall filter out all
 * incoming messages, therefore you should call this method to establish an
 * initial message filter. Any pending error is raised afterwards.
 *
 * An empty topic of length zero shall subscribe to all incoming messages.
 * A non-empty topic shall subscribe to all messages beginning with the
 * specified prefix. Multiple filters may be attached to a single 'ZMQ_SUB'
 * socket, in which case a message shall be accepted if it matches at least
 * one filter.
 *
 * @param topic
 *            the prefix to subscribe to.
 */
public final void subscribe(byte[] topic) {
    this.base.setsockopt(zmq.ZMQ.ZMQ_SUBSCRIBE, topic);
    mayRaise();
}
/**
 * String convenience overload of {@link #subscribe(byte[])}.
 * The text is converted with {@code String.getBytes()}, i.e. using the
 * platform default charset.
 *
 * @param topic
 *            the prefix to subscribe to.
 */
public final void subscribe(String topic) {
    final byte[] prefix = topic.getBytes();
    subscribe(prefix);
}
/**
 * Applies the 'ZMQ_UNSUBSCRIBE' option, removing an existing message filter
 * on a 'ZMQ_SUB' socket. The filter specified must match an existing filter
 * previously established with the 'ZMQ_SUBSCRIBE' option. If the socket has
 * several instances of the same filter attached, only one instance is
 * removed, leaving the rest in place and functional. Any pending error is
 * raised afterwards.
 *
 * @param topic
 *            the prefix to unsubscribe from.
 */
public final void unsubscribe(byte[] topic) {
    this.base.setsockopt(zmq.ZMQ.ZMQ_UNSUBSCRIBE, topic);
    mayRaise();
}
/**
 * String convenience overload of {@link #unsubscribe(byte[])}.
 * The text is converted with {@code String.getBytes()}, i.e. using the
 * platform default charset.
 *
 * @param topic
 *            the prefix to unsubscribe from.
 */
public final void unsubscribe(String topic) {
    final byte[] prefix = topic.getBytes();
    unsubscribe(prefix);
}
/**
 * Installs a custom encoder by passing its class as the 'ZMQ_ENCODER'
 * option. No error check is performed afterwards.
 *
 * @param cls
 *            the encoder class.
 */
public final void setEncoder(Class<? extends EncoderBase> cls) {
    this.base.setsockopt(zmq.ZMQ.ZMQ_ENCODER, cls);
}
/**
 * Installs a custom decoder by passing its class as the 'ZMQ_DECODER'
 * option. No error check is performed afterwards.
 *
 * @param cls
 *            the decoder class.
 */
public final void setDecoder(Class<? extends DecoderBase> cls) {
    this.base.setsockopt(zmq.ZMQ.ZMQ_DECODER, cls);
}
/**
 * Bind to network interface. Start listening for new connections.
 * An endpoint ending in ":*" is bound to the first port in the dynamic
 * range 0xc000-0xffff for which binding succeeds.
 *
 * @param addr
 *            the endpoint to bind to.
 * @return the bound port; 0 when binding succeeded but the endpoint does
 *         not end in a numeric port; -1 when binding failed.
 */
public final int bind (String addr)
{
    final int firstPort = DYNFROM;
    final int lastPort = DYNTO;
    return bind (addr, firstPort, lastPort);
}
/**
 * Bind to network interface. Start listening for new connections.
 *
 * When the endpoint ends in ":*", ports from {@code min} to {@code max}
 * (inclusive) are tried in ascending order and the first one that binds is
 * returned. Otherwise the endpoint is bound as given and the text after the
 * last ':' is parsed as the port number.
 *
 * @param addr
 *            the endpoint to bind to.
 * @param min
 *            The minimum port in the range of ports to try.
 * @param max
 *            The maximum port in the range of ports to try.
 * @return the bound port; 0 when a fixed endpoint was bound but does not end
 *         in a numeric port; -1 when no bind succeeded.
 */
private final int bind (String addr, int min, int max)
{
    final int lastColon = addr.lastIndexOf (':');

    if (!addr.endsWith (":*")) {
        // Fixed endpoint: a single bind attempt.
        if (!base.bind (addr)) {
            return -1;
        }
        try {
            return Integer.parseInt (addr.substring (lastColon + 1));
        } catch (NumberFormatException ignored) {
            // The endpoint has no numeric port suffix; report port 0 for a successful bind.
            return 0;
        }
    }

    // Wildcard endpoint: keep everything up to and including the last ':'
    // and try each candidate port in turn.
    final String prefix = addr.substring (0, lastColon + 1);
    for (int port = min; port <= max; port++) {
        if (base.bind (prefix + port)) {
            return port;
        }
    }
    return -1;
}
/**
 * Bind to network interface on a dynamically chosen port. Start listening
 * for new connections. The endpoint gets ":*" appended and ports of the
 * dynamic range 0xc000-0xffff are tried in ascending order until one binds.
 *
 * @param addr
 *            the endpoint to bind to, without a port.
 * @return the bound port, or -1 when no port in the range could be bound.
 */
public int bindToRandomPort (String addr)
{
    final String wildcardEndpoint = addr + ":*";
    return bind (wildcardEndpoint, DYNFROM, DYNTO);
}
/**
 * Bind to network interface on a dynamically chosen port. Start listening
 * for new connections. The endpoint gets ":*" appended and ports from
 * {@code min} to {@code max} (inclusive) are tried in ascending order until
 * one binds.
 *
 * @param addr
 *            the endpoint to bind to, without a port.
 * @param min
 *            The minimum port in the range of ports to try.
 * @param max
 *            The maximum port in the range of ports to try.
 * @return the bound port, or -1 when no port in the range could be bound.
 */
public int bindToRandomPort (String addr, int min, int max)
{
    final String wildcardEndpoint = addr + ":*";
    return bind (wildcardEndpoint, min, max);
}
/**
 * Connect to remote application.
 *
 * @param addr_
 *            the endpoint to connect to.
 * @return the result reported by the underlying socket's connect call.
 */
public final boolean connect(String addr_) {
    final boolean connected = base.connect(addr_);
    return connected;
}
/**
 * Sends a text message with no flags.
 *
 * @param data
 *            the text to send.
 * @return the result reported by the underlying socket's send call.
 */
public final boolean send (String data) {
    return send (data, 0);
}
/**
 * Sends a text message with the ZMQ_SNDMORE flag set, announcing that more
 * message parts are coming.
 *
 * @param data
 *            the text to send.
 * @return the result reported by the underlying socket's send call.
 */
public final boolean sendMore (String data) {
    return send (data, zmq.ZMQ.ZMQ_SNDMORE);
}
/**
 * Sends a text message, wrapped in a new {@code zmq.Msg}, with the given flags.
 *
 * @param data
 *            the text to send.
 * @param flags
 *            the flags to apply to the send operation.
 * @return the result reported by the underlying socket's send call.
 */
public final boolean send (String data, int flags) {
    return base.send(new zmq.Msg(data), flags);
}
/**
 * Sends a binary message with no flags.
 *
 * @param data
 *            the bytes to send.
 * @return the result reported by the underlying socket's send call.
 */
public final boolean send (byte[] data) {
    return send (data, 0);
}
/**
 * Sends a binary message with the ZMQ_SNDMORE flag set, announcing that
 * more message parts are coming.
 *
 * @param data
 *            the bytes to send.
 * @return the result reported by the underlying socket's send call.
 */
public final boolean sendMore (byte[] data) {
    return send (data, zmq.ZMQ.ZMQ_SNDMORE);
}
/**
 * Sends a binary message, wrapped in a new {@code zmq.Msg}, with the given flags.
 *
 * @param data
 *            the bytes to send.
 * @param flags
 *            the flags to apply to the send operation.
 * @return the result reported by the underlying socket's send call.
 */
public final boolean send (byte[] data, int flags) {
    return base.send(new zmq.Msg(data), flags);
}
/**
 * Send a message with no flags.
 *
 * @param msg
 *            the message to send.
 * @return true if send was successful, false otherwise.
 */
public final boolean send (Msg msg) {
    return send (msg, 0);
}
/**
 * Send a message with the ZMQ_SNDMORE flag set, announcing that more
 * message parts are coming.
 *
 * @param msg
 *            the message to send.
 * @return true if send was successful, false otherwise.
 */
public final boolean sendMore (Msg msg) {
    return send (msg, zmq.ZMQ.ZMQ_SNDMORE);
}
/**
 * Send a message by handing its wrapped low-level message to the
 * underlying socket.
 *
 * @param msg
 *            the message to send.
 * @param flags
 *            the flags to apply to the send operation.
 * @return true if send was successful, false otherwise.
 */
public final boolean send (Msg msg, int flags) {
    final boolean sent = base.send(msg.base, flags);
    return sent;
}
/**
* Receive a message.
*
* @return the message received, as an array of bytes; null on error.
*/
public final byte[] recv() {
zmq.Msg msg = base.recv(0);
if (msg != null) {
return msg.data();
}
mayRaise();