-
Notifications
You must be signed in to change notification settings - Fork 840
/
blocking_connection.py
2616 lines (2111 loc) · 105 KB
/
blocking_connection.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
"""The blocking connection adapter module implements blocking semantics on top
of Pika's core AMQP driver. While most of the asynchronous expectations are
removed when using the blocking connection adapter, it attempts to remain true
to the asynchronous RPC nature of the AMQP protocol, supporting server sent
RPC commands.
The user facing classes in the module consist of the
:py:class:`~pika.adapters.blocking_connection.BlockingConnection`
and the :class:`~pika.adapters.blocking_connection.BlockingChannel`
classes.
"""
# Suppress too-many-lines
# pylint: disable=C0302
# Disable "access to protected member warnings: this wrapper implementation is
# a friend of those instances
# pylint: disable=W0212
from collections import namedtuple, deque
import contextlib
import functools
import logging
import time
import pika.channel
from pika import compat
from pika import exceptions
import pika.spec
# NOTE: import SelectConnection after others to avoid circular dependency
from pika.adapters.select_connection import SelectConnection
LOGGER = logging.getLogger(__name__)
class _CallbackResult(object):
    """Non-thread-safe receiver for results delivered through callbacks;
    INTERNAL USE ONLY!

    An instance starts out "not ready". It becomes ready when one of
    `signal_once`, `set_value_once` or `append_element` is called, and goes
    back to "not ready" on `reset` (also performed when leaving a `with`
    block that manages the instance).
    """
    __slots__ = ('_value_class', '_ready', '_values')

    def __init__(self, value_class=None):
        """
        :param callable value_class: only needed if the CallbackResult
            instance will be used with `set_value_once` and
            `append_element`. *args and **kwargs of the value setter methods
            will be passed to this class.
        """
        self._value_class = value_class
        self._ready = None
        self._values = None
        self.reset()

    def reset(self):
        """Reset value, but not _value_class"""
        self._ready = False
        self._values = None

    def __bool__(self):
        """ Called by python runtime to implement truth value testing and the
        built-in operation bool(); NOTE: python 3.x
        """
        return self.is_ready()

    # python 2.x version of __bool__
    __nonzero__ = __bool__

    def __enter__(self):
        """ Entry into context manager that automatically resets the object
        on exit; this usage pattern helps garbage-collection by eliminating
        potential circular references.
        """
        return self

    def __exit__(self, *args, **kwargs):
        """Reset value"""
        self.reset()

    def is_ready(self):
        """
        :returns: True if the object is in a signaled state
        """
        return self._ready

    @property
    def ready(self):
        """True if the object is in a signaled state"""
        return self._ready

    def signal_once(self, *_args, **_kwargs):
        """ Set as ready; any positional/keyword args are ignored

        :raises AssertionError: if result was already signalled
        """
        assert not self._ready, '_CallbackResult was already set'
        self._ready = True

    def set_value_once(self, *args, **kwargs):
        """ Set as ready with value; the value may be retrieved via the `value`
        property getter

        The value is built by calling `value_class(*args, **kwargs)`. The
        result is marked ready only after the value was built successfully,
        so a failing `value_class` never leaves the object signalled without
        a value.

        :raises AssertionError: if result was already set
        """
        # Reject a duplicate call before doing any work
        assert not self._ready, '_CallbackResult was already set'

        try:
            value = self._value_class(*args, **kwargs)
        except Exception:
            LOGGER.error(
                "set_value_once failed: value_class=%r; args=%r; kwargs=%r",
                self._value_class, args, kwargs)
            raise

        # Store the value first, then signal readiness
        self._values = (value,)
        self.signal_once()

    def append_element(self, *args, **kwargs):
        """Append an element to values and mark the result as ready.

        The element is built by calling `value_class(*args, **kwargs)`.

        :raises AssertionError: if the result is already ready in a way that
            is incompatible with `append_element`
        """
        assert not self._ready or isinstance(self._values, list), (
            '_CallbackResult state is incompatible with append_element: '
            'ready=%r; values=%r' % (self._ready, self._values))

        try:
            value = self._value_class(*args, **kwargs)
        except Exception:
            LOGGER.error(
                "append_element failed: value_class=%r; args=%r; kwargs=%r",
                self._value_class, args, kwargs)
            raise

        if self._values is None:
            self._values = [value]
        else:
            self._values.append(value)

        self._ready = True

    @property
    def value(self):
        """
        :returns: a reference to the value that was set via `set_value_once`
        :raises AssertionError: if result was not set or value is incompatible
            with `set_value_once`
        """
        assert self._ready, '_CallbackResult was not set'
        assert isinstance(self._values, tuple) and len(self._values) == 1, (
            '_CallbackResult value is incompatible with set_value_once: %r'
            % (self._values,))
        return self._values[0]

    @property
    def elements(self):
        """
        :returns: a reference to the list containing one or more elements that
            were added via `append_element`
        :raises AssertionError: if result was not set or value is incompatible
            with `append_element`
        """
        assert self._ready, '_CallbackResult was not set'
        assert isinstance(self._values, list) and self._values, (
            '_CallbackResult value is incompatible with append_element: %r'
            % (self._values,))
        return self._values
class _IoloopTimerContext(object):
    """Context manager that registers a timer on a connection on entry and
    removes it on exit if it has not fired by then.
    """

    def __init__(self, duration, connection):
        """
        :param float duration: non-negative timer duration in seconds
        :param SelectConnection connection: object providing `add_timeout`
            and `remove_timeout`
        """
        assert hasattr(connection, 'add_timeout'), connection
        self._connection = connection
        self._duration = duration
        self._timer_id = None
        # Becomes ready when the timer fires
        self._callback_result = _CallbackResult()

    def __enter__(self):
        """Register the timer and return this context object"""
        on_expiry = self._callback_result.signal_once
        self._timer_id = self._connection.add_timeout(self._duration,
                                                      on_expiry)
        return self

    def __exit__(self, *_args, **_kwargs):
        """Unregister the timer unless it has already fired"""
        if self._callback_result.is_ready():
            return
        self._connection.remove_timeout(self._timer_id)

    def is_ready(self):
        """
        :returns: True if timer has fired, False otherwise
        """
        return self._callback_result.is_ready()
class _TimerEvt(object):
    """A timer created via `BlockingConnection.add_timeout`, queued for
    dispatch once it has expired.
    """
    __slots__ = ('timer_id', '_callback')

    def __init__(self, callback):
        """
        :param callback: see callback_method in `BlockingConnection.add_timeout`
        """
        # Filled in by the creator with the timer id returned from the
        # underlying implementation's `add_timeout` method
        self.timer_id = None
        self._callback = callback

    def __repr__(self):
        details = (type(self).__name__, self.timer_id, self._callback)
        return '<%s timer_id=%s callback=%s>' % details

    def dispatch(self):
        """Invoke the user's callback with no arguments"""
        self._callback()
class _ConnectionBlockedUnblockedEvtBase(object):
    """Common implementation shared by `_ConnectionBlockedEvt` and
    `_ConnectionUnblockedEvt`: pairs a user callback with the method frame
    it should receive.
    """
    __slots__ = ('_callback', '_method_frame')

    def __init__(self, callback, method_frame):
        """
        :param callback: see callback_method parameter in
            `BlockingConnection.add_on_connection_blocked_callback` and
            `BlockingConnection.add_on_connection_unblocked_callback`
        :param pika.frame.Method method_frame: with method_frame.method of type
            `pika.spec.Connection.Blocked` or `pika.spec.Connection.Unblocked`
        """
        self._method_frame = method_frame
        self._callback = callback

    def __repr__(self):
        details = (type(self).__name__, self._callback, self._method_frame)
        return '<%s callback=%s, frame=%s>' % details

    def dispatch(self):
        """Invoke the user's callback, passing it the stored method frame"""
        self._callback(self._method_frame)
class _ConnectionBlockedEvt(_ConnectionBlockedUnblockedEvtBase):
    """Represents a `Connection.Blocked` notification from RabbitMQ broker;
    all behavior is inherited from the base class.
    """
class _ConnectionUnblockedEvt(_ConnectionBlockedUnblockedEvtBase):
    """Represents a `Connection.Unblocked` notification from RabbitMQ broker;
    all behavior is inherited from the base class.
    """
class BlockingConnection(object):
"""The BlockingConnection creates a layer on top of Pika's asynchronous core
providing methods that will block until their expected response has
returned. Due to the asynchronous nature of the `Basic.Deliver` and
`Basic.Return` calls from RabbitMQ to your application, you can still
implement continuation-passing style asynchronous methods if you'd like to
receive messages from RabbitMQ using
:meth:`basic_consume <BlockingChannel.basic_consume>` or if you want to be
notified of a delivery failure when using
:meth:`basic_publish <BlockingChannel.basic_publish>`.
For more information about communicating with the blocking_connection
adapter, be sure to check out the
:class:`BlockingChannel <BlockingChannel>` class which implements the
:class:`Channel <pika.channel.Channel>` based communication for the
blocking_connection adapter.
To prevent recursion/reentrancy, the blocking connection and channel
implementations queue asynchronously-delivered events received
in nested context (e.g., while waiting for `BlockingConnection.channel` or
`BlockingChannel.queue_declare` to complete), dispatching them synchronously
once nesting returns to the desired context. This concerns all callbacks,
such as those registered via `BlockingConnection.add_timeout`,
`BlockingConnection.add_on_connection_blocked_callback`,
`BlockingConnection.add_on_connection_unblocked_callback`,
`BlockingChannel.basic_consume`, etc.
Blocked Connection deadlock avoidance: when RabbitMQ becomes low on
resources, it emits Connection.Blocked (AMQP extension) to the client
connection when client makes a resource-consuming request on that connection
or its channel (e.g., `Basic.Publish`); subsequently, RabbitMQ suspends
processing requests from that connection until the affected resources are
restored. See http://www.rabbitmq.com/connection-blocked.html. This
may impact `BlockingConnection` and `BlockingChannel` operations in a
way that users might not be expecting. For example, if the user dispatches
`BlockingChannel.basic_publish` in non-publisher-confirmation mode while
RabbitMQ is in this low-resource state followed by a synchronous request
(e.g., `BlockingConnection.channel`, `BlockingChannel.consume`,
`BlockingChannel.basic_consume`, etc.), the synchronous request will block
indefinitely (until Connection.Unblocked) waiting for RabbitMQ to reply. If
the blocked state persists for a long time, the blocking operation will
appear to hang. In this state, `BlockingConnection` instance and its
channels will not dispatch user callbacks. SOLUTION: To break this potential
deadlock, applications may configure the `blocked_connection_timeout`
connection parameter when instantiating `BlockingConnection`. Upon blocked
connection timeout, this adapter will raise ConnectionClosed exception with
first exception arg of
`pika.connection.InternalCloseReasons.BLOCKED_CONNECTION_TIMEOUT`. See
`pika.connection.ConnectionParameters` documentation to learn more about
`blocked_connection_timeout` configuration.
"""
# Value classes for the `_CallbackResult` receivers created by this class:
# each namedtuple captures the arguments passed to the corresponding
# callback.

# Connection-opened callback args: (connection,)
_OnOpenedArgs = namedtuple('BlockingConnection__OnOpenedArgs',
'connection')
# Connection-establishment error callback args: (connection, error)
_OnOpenErrorArgs = namedtuple('BlockingConnection__OnOpenErrorArgs',
'connection error')
# Connection-closing callback args: (connection, reason_code, reason_text)
_OnClosedArgs = namedtuple('BlockingConnection__OnClosedArgs',
'connection reason_code reason_text')
# Channel-opened callback args: (channel,)
_OnChannelOpenedArgs = namedtuple(
'BlockingConnection__OnChannelOpenedArgs',
'channel')
def __init__(self, parameters=None, _impl_class=None):
    """Create a new instance of the Connection object and block until the
    connection is open or connection setup has failed.

    :param pika.connection.Parameters parameters: Connection parameters
    :param _impl_class: for tests/debugging only; implementation class;
        None=default
    :raises AMQPConnectionError: on connection open error (raised by
        `_process_io_for_connection_setup`)
    """
    # Receivers for the args of the implementation's on_open_callback,
    # on_open_error_callback and on_close_callback, respectively
    self._opened_result = _CallbackResult(self._OnOpenedArgs)
    self._open_error_result = _CallbackResult(self._OnOpenErrorArgs)
    self._closed_result = _CallbackResult(self._OnClosedArgs)

    # Nesting depth maintained by `_acquire_event_dispatch`; when already
    # greater than 0, event dispatch is owned higher up the call stack
    self._event_dispatch_suspend_depth = 0

    # Connection-specific events that are ready for dispatch: _TimerEvt,
    # _ConnectionBlockedEvt, _ConnectionUnblockedEvt
    self._ready_events = deque()

    # Channel numbers of channels that are requesting a call to their
    # BlockingChannel._dispatch_events method; See
    # `_request_channel_dispatch`
    self._channels_pending_dispatch = set()

    # Set to True when user calls close() on the connection
    # NOTE: this is a workaround to detect socket error because
    # on_close_callback passes reason_code=0 when called due to socket error
    self._user_initiated_close = False

    connection_factory = _impl_class or SelectConnection
    self._impl = connection_factory(
        parameters=parameters,
        on_open_callback=self._opened_result.set_value_once,
        on_open_error_callback=self._open_error_result.set_value_once,
        on_close_callback=self._closed_result.set_value_once,
        stop_ioloop_on_close=False)

    self._impl.ioloop.activate_poller()

    # Drive I/O until the connection is open or opening has failed
    self._process_io_for_connection_setup()
def __repr__(self):
return '<%s impl=%r>' % (self.__class__.__name__, self._impl)
def _cleanup(self):
"""Clean up members that might inhibit garbage collection"""
self._impl.ioloop.close()
self._ready_events.clear()
self._opened_result.reset()
self._open_error_result.reset()
self._closed_result.reset()
@contextlib.contextmanager
def _acquire_event_dispatch(self):
""" Context manager that controls access to event dispatcher for
preventing reentrancy.
The "as" value is True if the managed code block owns the event
dispatcher and False if caller higher up in the call stack already owns
it. Only managed code that gets ownership (got True) is permitted to
dispatch
"""
try:
# __enter__ part
self._event_dispatch_suspend_depth += 1
yield self._event_dispatch_suspend_depth == 1
finally:
# __exit__ part
self._event_dispatch_suspend_depth -= 1
def _process_io_for_connection_setup(self):
""" Perform follow-up processing for connection setup request: flush
connection output and process input while waiting for connection-open
or connection-error.
:raises AMQPConnectionError: on connection open error
"""
if not self._open_error_result.ready:
self._flush_output(self._opened_result.is_ready,
self._open_error_result.is_ready)
if self._open_error_result.ready:
try:
exception_or_message = self._open_error_result.value.error
if isinstance(exception_or_message, Exception):
raise exception_or_message
raise exceptions.AMQPConnectionError(exception_or_message)
finally:
self._cleanup()
assert self._opened_result.ready
assert self._opened_result.value.connection is self._impl
def _flush_output(self, *waiters):
""" Flush output and process input while waiting for any of the given
callbacks to return true. The wait is aborted upon connection-close.
Otherwise, processing continues until the output is flushed AND at least
one of the callbacks returns true. If there are no callbacks, then
processing ends when all output is flushed.
:param waiters: sequence of zero or more callables taking no args and
returning true when it's time to stop processing.
Their results are OR'ed together.
"""
if self.is_closed:
raise exceptions.ConnectionClosed()
# Conditions for terminating the processing loop:
# connection closed
# OR
# empty outbound buffer and no waiters
# OR
# empty outbound buffer and any waiter is ready
is_done = (lambda:
self._closed_result.ready or
(not self._impl.outbound_buffer and
(not waiters or any(ready() for ready in waiters))))
# Process I/O until our completion condition is satisified
while not is_done():
self._impl.ioloop.poll()
self._impl.ioloop.process_timeouts()
if self._open_error_result.ready or self._closed_result.ready:
try:
if not self._user_initiated_close:
if self._open_error_result.ready:
maybe_exception = self._open_error_result.value.error
LOGGER.error('Connection open failed - %r',
maybe_exception)
if isinstance(maybe_exception, Exception):
raise maybe_exception
else:
raise exceptions.ConnectionClosed(maybe_exception)
else:
result = self._closed_result.value
LOGGER.error('Connection close detected; result=%r',
result)
raise exceptions.ConnectionClosed(result.reason_code,
result.reason_text)
else:
LOGGER.info('Connection closed; result=%r',
self._closed_result.value)
finally:
self._cleanup()
def _request_channel_dispatch(self, channel_number):
"""Called by BlockingChannel instances to request a call to their
_dispatch_events method or to terminate `process_data_events`;
BlockingConnection will honor these requests from a safe context.
:param int channel_number: positive channel number to request a call
to the channel's `_dispatch_events`; a negative channel number to
request termination of `process_data_events`
"""
self._channels_pending_dispatch.add(channel_number)
def _dispatch_channel_events(self):
"""Invoke the `_dispatch_events` method on open channels that requested
it
"""
if not self._channels_pending_dispatch:
return
with self._acquire_event_dispatch() as dispatch_acquired:
if not dispatch_acquired:
# Nested dispatch or dispatch blocked higher in call stack
return
candidates = list(self._channels_pending_dispatch)
self._channels_pending_dispatch.clear()
for channel_number in candidates:
if channel_number < 0:
# This was meant to terminate process_data_events
continue
try:
impl_channel = self._impl._channels[channel_number]
except KeyError:
continue
if impl_channel.is_open:
impl_channel._get_cookie()._dispatch_events()
def _on_timer_ready(self, evt):
"""Handle expiry of a timer that was registered via `add_timeout`
:param _TimerEvt evt:
"""
self._ready_events.append(evt)
def _on_threadsafe_callback(self, user_callback):
"""Handle callback that was registered via `add_callback_threadsafe`.
:param user_callback: callback passed to `add_callback_threadsafe` by
the application.
"""
# Turn it into a 0-delay timeout to take advantage of our existing logic
# that deals with reentrancy
self.add_timeout(0, user_callback)
def _on_connection_blocked(self, user_callback, method_frame):
    """Handle Connection.Blocked notification from RabbitMQ broker by
    queueing an event that will pass the frame to the user's callback.

    :param callable user_callback: callback_method passed to
        `add_on_connection_blocked_callback`
    :param pika.frame.Method method_frame: method frame having `method`
        member of type `pika.spec.Connection.Blocked`
    """
    blocked_evt = _ConnectionBlockedEvt(user_callback, method_frame)
    self._ready_events.append(blocked_evt)
def _on_connection_unblocked(self, user_callback, method_frame):
    """Handle Connection.Unblocked notification from RabbitMQ broker by
    queueing an event that will pass the frame to the user's callback.

    :param callable user_callback: callback_method passed to
        `add_on_connection_unblocked_callback`
    :param pika.frame.Method method_frame: method frame having `method`
        member of type `pika.spec.Connection.Unblocked`
    """
    unblocked_evt = _ConnectionUnblockedEvt(user_callback, method_frame)
    self._ready_events.append(unblocked_evt)
def _dispatch_connection_events(self):
    """Dispatch ready connection events. Does nothing when no events are
    queued or when event dispatch is already owned higher up the call
    stack.
    """
    if not self._ready_events:
        return

    with self._acquire_event_dispatch() as dispatch_acquired:
        if not dispatch_acquired:
            # Nested dispatch or dispatch blocked higher in call stack
            return

        # Limit dispatch to the number of events that are ready right now,
        # so that events queued by the callbacks themselves cannot keep
        # this loop going
        budget = len(self._ready_events)
        while budget > 0 and self._ready_events:
            # The queue may run dry early: some events (e.g., timers) may
            # have been cancelled by a callback in the meantime
            budget -= 1
            self._ready_events.popleft().dispatch()
def add_on_connection_blocked_callback(self, callback_method):
"""Add a callback to be notified when RabbitMQ has sent a
`Connection.Blocked` frame indicating that RabbitMQ is low on
resources. Publishers can use this to voluntarily suspend publishing,
instead of relying on back pressure throttling. The callback
will be passed the `Connection.Blocked` method frame.
See also `ConnectionParameters.blocked_connection_timeout`.
:param method callback_method: Callback to call on `Connection.Blocked`,
having the signature `callback_method(pika.frame.Method)`, where the
method frame's `method` member is of type
`pika.spec.Connection.Blocked`
"""
self._impl.add_on_connection_blocked_callback(
functools.partial(self._on_connection_blocked, callback_method))
def add_on_connection_unblocked_callback(self, callback_method):
"""Add a callback to be notified when RabbitMQ has sent a
`Connection.Unblocked` frame letting publishers know it's ok
to start publishing again. The callback will be passed the
`Connection.Unblocked` method frame.
:param method callback_method: Callback to call on
`Connection.Unblocked`, having the signature
`callback_method(pika.frame.Method)`, where the method frame's
`method` member is of type `pika.spec.Connection.Unblocked`
"""
self._impl.add_on_connection_unblocked_callback(
functools.partial(self._on_connection_unblocked, callback_method))
def add_timeout(self, deadline, callback_method):
"""Create a single-shot timer to fire after deadline seconds. Do not
confuse with Tornado's timeout where you pass in the time you want to
have your callback called. Only pass in the seconds until it's to be
called.
NOTE: the timer callbacks are dispatched only in the scope of
specially-designated methods: see
`BlockingConnection.process_data_events` and
`BlockingChannel.start_consuming`.
:param float deadline: The number of seconds to wait to call callback
:param callable callback_method: The callback method with the signature
callback_method()
:returns: opaque timer id
"""
if not callable(callback_method):
raise ValueError(
'callback_method parameter must be callable, but got %r'
% (callback_method,))
evt = _TimerEvt(callback=callback_method)
timer_id = self._impl.add_timeout(
deadline,
functools.partial(self._on_timer_ready, evt))
evt.timer_id = timer_id
return timer_id
def add_callback_threadsafe(self, callback):
"""Requests a call to the given function as soon as possible in the
context of this connection's thread.
NOTE: This is the only thread-safe method in `BlockingConnection`. All
other manipulations of `BlockingConnection` must be performed from the
connection's thread.
For example, a thread may request a call to the
`BlockingChannel.basic_ack` method of a `BlockingConnection` that is
running in a different thread via
```
connection.add_callback_threadsafe(
functools.partial(channel.basic_ack, delivery_tag=...))
```
NOTE: if you know that the requester is running on the same thread as
the connection it is more efficient to use the
`BlockingConnection.add_timeout()` method with a deadline of 0.
:param method callback: The callback method; must be callable
"""
self._impl.add_callback_threadsafe(
functools.partial(self._on_threadsafe_callback, callback))
def remove_timeout(self, timeout_id):
"""Remove a timer if it's still in the timeout stack
:param timeout_id: The opaque timer id to remove
"""
# Remove from the impl's timeout stack
self._impl.remove_timeout(timeout_id)
# Remove from ready events, if the timer fired already
for i, evt in enumerate(self._ready_events):
if isinstance(evt, _TimerEvt) and evt.timer_id == timeout_id:
index_to_remove = i
break
else:
# Not found
return
del self._ready_events[index_to_remove]
def close(self, reply_code=200, reply_text='Normal shutdown'):
    """Disconnect from RabbitMQ. If there are any open channels, it will
    attempt to close them prior to fully disconnecting. Channels which
    have active consumers will attempt to send a Basic.Cancel to RabbitMQ
    to cleanly stop the delivery of messages prior to closing the channel.

    Calling this on an already closed connection only logs a debug message.

    :param int reply_code: The code number for the close
    :param str reply_text: The text reason for the close
    """
    if self.is_closed:
        LOGGER.debug('Close called on closed connection (%s): %s',
                     reply_code, reply_text)
        return

    LOGGER.info('Closing connection (%s): %s', reply_code, reply_text)

    # Lets `_flush_output` tell a requested close from an unexpected one
    self._user_initiated_close = True

    # Close channels that remain opened
    for impl_channel in pika.compat.dictvalues(self._impl._channels):
        proxy = impl_channel._get_cookie()
        if not proxy.is_open:
            continue
        try:
            proxy.close(reply_code, reply_text)
        except exceptions.ChannelClosed as exc:
            # Log and suppress broker-closed channel
            LOGGER.warning('Got ChannelClosed while closing channel '
                           'from connection.close: %r', exc)

    # Close the connection and drive I/O until the close completes
    self._impl.close(reply_code, reply_text)
    self._flush_output(self._closed_result.is_ready)
def process_data_events(self, time_limit=0):
"""Will make sure that data events are processed. Dispatches timer and
channel callbacks if not called from the scope of BlockingConnection or
BlockingChannel callback. Your app can block on this method.
:param float time_limit: suggested upper bound on processing time in
seconds. The actual blocking time depends on the granularity of the
underlying ioloop. Zero means return as soon as possible. None means
there is no limit on processing time and the function will block
until I/O produces actionable events. Defaults to 0 for backward
compatibility. This parameter is NEW in pika 0.10.0.
"""
with self._acquire_event_dispatch() as dispatch_acquired:
# Check if we can actually process pending events
common_terminator = lambda: bool(dispatch_acquired and
(self._channels_pending_dispatch or self._ready_events))
if time_limit is None:
self._flush_output(common_terminator)
else:
with _IoloopTimerContext(time_limit, self._impl) as timer:
self._flush_output(timer.is_ready, common_terminator)
if self._ready_events:
self._dispatch_connection_events()
if self._channels_pending_dispatch:
self._dispatch_channel_events()
def sleep(self, duration):
"""A safer way to sleep than calling time.sleep() directly that would
keep the adapter from ignoring frames sent from the broker. The
connection will "sleep" or block the number of seconds specified in
duration in small intervals.
:param float duration: The time to sleep in seconds
"""
assert duration >= 0, duration
deadline = time.time() + duration
time_limit = duration
# Process events at least once
while True:
self.process_data_events(time_limit)
time_limit = deadline - time.time()
if time_limit <= 0:
break
def channel(self, channel_number=None):
    """Create a new channel with the next available channel number or pass
    in a channel number to use. Must be non-zero if you would like to
    specify but it is recommended that you let Pika manage the channel
    numbers.

    Blocks until the channel is reported open.

    :param int channel_number: channel number to use; None (the default)
        is passed through to the implementation unchanged
    :rtype: pika.adapters.blocking_connection.BlockingChannel
    """
    with _CallbackResult(self._OnChannelOpenedArgs) as open_result:
        impl_channel = self._impl.channel(
            on_open_callback=open_result.set_value_once,
            channel_number=channel_number)

        # Create our proxy channel and link the implementation channel
        # with it
        proxy = BlockingChannel(impl_channel, self)
        impl_channel._set_cookie(proxy)

        # Drive I/O until Channel.Open-ok
        proxy._flush_output(open_result.is_ready)

    return proxy
def __enter__(self):
# Prepare `with` context
return self
def __exit__(self, exc_type, value, traceback):
# Close connection after `with` context
self.close()
#
# Connections state properties
#
@property
def is_closed(self):
"""
Returns a boolean reporting the current connection state.
"""
return self._impl.is_closed
@property
def is_closing(self):
"""
Returns True if connection is in the process of closing due to
client-initiated `close` request, but closing is not yet complete.
"""
return self._impl.is_closing
@property
def is_open(self):
"""
Returns a boolean reporting the current connection state.
"""
return self._impl.is_open
#
# Properties that reflect server capabilities for the current connection
#
@property
def basic_nack_supported(self):
"""Specifies if the server supports basic.nack on the active connection.
:rtype: bool
"""
return self._impl.basic_nack
@property
def consumer_cancel_notify_supported(self):
"""Specifies if the server supports consumer cancel notification on the
active connection.
:rtype: bool
"""
return self._impl.consumer_cancel_notify
@property
def exchange_exchange_bindings_supported(self):
"""Specifies if the active connection supports exchange to exchange
bindings.
:rtype: bool
"""
return self._impl.exchange_exchange_bindings
@property
def publisher_confirms_supported(self):
"""Specifies if the active connection can use publisher confirmations.
:rtype: bool
"""
return self._impl.publisher_confirms
# Legacy property names for backward compatibility; each alias is bound to
# the very same property object as the `*_supported` name on its right, so
# both names always return the same value
basic_nack = basic_nack_supported
consumer_cancel_notify = consumer_cancel_notify_supported
exchange_exchange_bindings = exchange_exchange_bindings_supported
publisher_confirms = publisher_confirms_supported
class _ChannelPendingEvt(object):
    """Base class for BlockingChannel pending events; defines no members of
    its own.
    """
class _ConsumerDeliveryEvt(_ChannelPendingEvt):
    """This event represents consumer message delivery `Basic.Deliver`; it
    contains method, properties, and body of the delivered message.
    """
    __slots__ = ('method', 'properties', 'body')

    def __init__(self, method, properties, body):
        """
        :param spec.Basic.Deliver method: NOTE: consumer_tag and delivery_tag
            are valid only within source channel
        :param spec.BasicProperties properties: message properties
        :param body: message body; empty string if no body
        :type body: str or unicode
        """
        self.method, self.properties, self.body = method, properties, body
class _ConsumerCancellationEvt(_ChannelPendingEvt):
    """This event represents server-initiated consumer cancellation delivered to
    client via Basic.Cancel. After receiving Basic.Cancel, there will be no
    further deliveries for the consumer identified by `consumer_tag` in
    `Basic.Cancel`
    """
    __slots__ = ('method_frame',)

    def __init__(self, method_frame):
        """
        :param pika.frame.Method method_frame: method frame with method of type
            `spec.Basic.Cancel`
        """
        self.method_frame = method_frame

    def __repr__(self):
        details = (type(self).__name__, self.method_frame)
        return '<%s method_frame=%r>' % details

    @property
    def method(self):
        """method of type spec.Basic.Cancel, taken from the stored frame"""
        frame = self.method_frame
        return frame.method
class _ReturnedMessageEvt(_ChannelPendingEvt):
    """This event represents a message returned by broker via `Basic.Return`"""
    __slots__ = ('callback', 'channel', 'method', 'properties', 'body')

    def __init__(self, callback, channel, method, properties, body):
        """
        :param callable callback: user's callback, having the signature
            callback(channel, method, properties, body), where
                channel: pika.Channel
                method: pika.spec.Basic.Return
                properties: pika.spec.BasicProperties
                body: str, unicode, or bytes (python 3.x)
        :param pika.Channel channel:
        :param pika.spec.Basic.Return method:
        :param pika.spec.BasicProperties properties:
        :param body: str, unicode, or bytes (python 3.x)
        """
        self.callback = callback
        self.channel = channel
        self.method, self.properties, self.body = method, properties, body

    def __repr__(self):
        # The body is limited to 300 characters of its repr
        template = ('<%s callback=%r channel=%r method=%r properties=%r '
                    'body=%.300r>')
        details = (type(self).__name__, self.callback, self.channel,
                   self.method, self.properties, self.body)
        return template % details

    def dispatch(self):
        """Call the user's callback with the channel and the returned
        message's method, properties and body.
        """
        self.callback(self.channel, self.method, self.properties, self.body)
class ReturnedMessage(object):
    """Represents a message returned via Basic.Return in publish-acknowledgments
    mode; a plain container for the message's method, properties and body.
    """
    __slots__ = ('method', 'properties', 'body')

    def __init__(self, method, properties, body):
        """
        :param spec.Basic.Return method:
        :param spec.BasicProperties properties: message properties
        :param body: message body; empty string if no body
        :type body: str or unicode
        """
        self.method, self.properties, self.body = method, properties, body
class _ConsumerInfo(object):
"""Information about an active consumer"""
__slots__ = ('consumer_tag', 'no_ack', 'consumer_cb',
'alternate_event_sink', 'state')
# Consumer states
SETTING_UP = 1