# -*- test-case-name: twisted.test.test_amp -*-
# Copyright (c) 2005 Divmod, Inc.
# Copyright (c) Twisted Matrix Laboratories.
# See LICENSE for details.
"""
This module implements AMP, the Asynchronous Messaging Protocol.

AMP is a protocol for sending multiple asynchronous request/response pairs over
the same connection. Requests and responses are both collections of key/value
pairs.

AMP is a very simple protocol which is not an application. This module is a
"protocol construction kit" of sorts; it attempts to be the simplest wire-level
implementation of Deferreds. AMP provides the following base-level features:

    - Asynchronous request/response handling (hence the name)

    - Requests and responses are both key/value pairs

    - Binary transfer of all data: all data is length-prefixed. Your
      application will never need to worry about quoting.

    - Command dispatching (like HTTP Verbs): the protocol is extensible, and
      multiple AMP sub-protocols can be grouped together easily.

You can also use AMP to tunnel other protocols: because AMP has well-defined
message boundaries and maintains all incoming and outgoing requests for you,
you can start a connection over AMP and then switch to another protocol. This
makes it ideal for firewall-traversal applications where you may have only one
forwarded port but multiple applications that want to use it.

Using AMP with Twisted is simple. Each message is a command, with a response.
You begin by defining a command type. Commands specify their input and output
in terms of the types that they expect to see in the request and response
key-value pairs. Here's an example of a command that adds two integers, 'a'
and 'b'::

    class Sum(amp.Command):
        arguments = [(b'a', amp.Integer()),
                     (b'b', amp.Integer())]
        response = [(b'total', amp.Integer())]

Once you have specified a command, you need to make it part of a protocol, and
define a responder for it. Here's a 'JustSum' protocol that includes a
responder for our 'Sum' command::

    class JustSum(amp.AMP):
        def sum(self, a, b):
            total = a + b
            print('Did a sum: %d + %d = %d' % (a, b, total))
            return {'total': total}
        Sum.responder(sum)

Later, when you want to actually do a sum, the following expression will return
a L{Deferred} which will fire with the result::

    ClientCreator(reactor, amp.AMP).connectTCP(...).addCallback(
        lambda p: p.callRemote(Sum, a=13, b=81)).addCallback(
            lambda result: result['total'])

Command responders may also return Deferreds, causing the response to be sent
only once the Deferred fires::

    class DelayedSum(amp.AMP):
        def slowSum(self, a, b):
            total = a + b
            result = defer.Deferred()
            reactor.callLater(3, result.callback, {'total': total})
            return result
        Sum.responder(slowSum)

This is transparent to the caller.

You can also define the propagation of specific errors in AMP. For example,
for the slightly more complicated case of division, we might have to deal with
division by zero::

    class Divide(amp.Command):
        arguments = [(b'numerator', amp.Integer()),
                     (b'denominator', amp.Integer())]
        response = [(b'result', amp.Float())]
        errors = {ZeroDivisionError: b'ZERO_DIVISION'}

The 'errors' mapping here tells AMP that if a responder to Divide raises a
L{ZeroDivisionError}, then the other side should be informed that an error of
the type 'ZERO_DIVISION' has occurred. Writing a responder which takes
advantage of this is very simple - just raise your exception normally::

    class JustDivide(amp.AMP):
        def divide(self, numerator, denominator):
            result = numerator / denominator
            print('Divided: %d / %d = %f' % (numerator, denominator, result))
            return {'result': result}
        Divide.responder(divide)

On the client side, the errors mapping will be used to determine what the
'ZERO_DIVISION' error means, and translated into an asynchronous exception,
which can be handled normally as any L{Deferred} would be::

    def trapZero(result):
        result.trap(ZeroDivisionError)
        print("Divided by zero: returning INF")
        return 1e1000
    ClientCreator(reactor, amp.AMP).connectTCP(...).addCallback(
        lambda p: p.callRemote(Divide, numerator=1234,
                               denominator=0)
        ).addErrback(trapZero)

For a complete, runnable example of both of these commands, see the files in
the Twisted repository::

    doc/core/examples/ampserver.py
    doc/core/examples/ampclient.py

On the wire, AMP is a protocol which uses 2-byte lengths to prefix keys and
values, and empty keys to separate messages::

    <2-byte length><key><2-byte length><value>
    <2-byte length><key><2-byte length><value>
    ...
    <2-byte length><key><2-byte length><value>
    <NUL><NUL>                  # Empty Key == End of Message

And so on. Because it's tedious to refer to lengths and NULs constantly, the
documentation will refer to packets as if they were newline delimited, like
so::

    C: _command: sum
    C: _ask: ef639e5c892ccb54
    C: a: 13
    C: b: 81

    S: _answer: ef639e5c892ccb54
    S: total: 94

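
The framing described above can be sketched with the standard library alone.
This is only an illustration under stated assumptions: C{encode_box} and
C{decode_box} are hypothetical helper names, not part of this module, and the
sketch assumes a complete, well-formed message is already in memory. It frames
the example 'sum' request and parses it back.

```python
from struct import pack, unpack

MAX_KEY_LENGTH = 0xFF  # keys: at most 255 bytes
MAX_VALUE_LENGTH = 0xFFFF  # values: at most 65535 bytes


def encode_box(box):
    # Hypothetical helper: frame a dict of bytes keys and bytes values.
    chunks = []
    for key, value in sorted(box.items()):
        if not 0 < len(key) <= MAX_KEY_LENGTH:
            raise ValueError("key length out of range: %r" % (key,))
        if len(value) > MAX_VALUE_LENGTH:
            raise ValueError("value too long for key %r" % (key,))
        for item in (key, value):
            chunks.append(pack("!H", len(item)))  # 2-byte big-endian length
            chunks.append(item)
    chunks.append(pack("!H", 0))  # an empty key marks the end of the message
    return b"".join(chunks)


def decode_box(data):
    # Hypothetical helper: parse one framed message from the start of data.
    # Returns the decoded dict and whatever bytes follow the message.
    box = {}
    offset = 0
    while True:
        (key_length,) = unpack("!H", data[offset:offset + 2])
        offset += 2
        if key_length == 0:
            return box, data[offset:]
        key = data[offset:offset + key_length]
        offset += key_length
        (value_length,) = unpack("!H", data[offset:offset + 2])
        offset += 2
        box[key] = data[offset:offset + value_length]
        offset += value_length


request = {
    b"_command": b"sum",
    b"_ask": b"ef639e5c892ccb54",
    b"a": b"13",
    b"b": b"81",
}
wire = encode_box(request)
decoded, rest = decode_box(wire)
```

Sorting the keys is a choice made here for reproducible output; as the notes
in this docstring say, receivers must accept any key order.
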
Notes:

In general, the order of keys is arbitrary. Specific uses of AMP may impose an
ordering requirement, but unless this is specified explicitly, any ordering may
be generated and any ordering must be accepted. This applies to the
command-related keys I{_command} and I{_ask} as well as any other keys.

Values are limited to the maximum encodable size in a 16-bit length, 65535
bytes.

Keys are limited to the maximum encodable size in an 8-bit length, 255 bytes.
Note that we still use 2-byte lengths to encode keys. This small redundancy
has several features:

    - If an implementation becomes confused and starts emitting corrupt data,
      or gets keys confused with values, many common errors will be signalled
      immediately instead of delivering obviously corrupt packets.

    - A single NUL will separate every key, and a double NUL separates
      messages. This provides some redundancy when debugging traffic dumps.

    - NULs will be present at regular intervals along the protocol, providing
      some padding for otherwise braindead C implementations of the protocol,
      so that <stdio.h> string functions will see the NUL and stop.

    - This makes it possible to run an AMP server on a port also used by a
      plain-text protocol, and easily distinguish between non-AMP clients (like
      web browsers) which issue non-NUL as the first byte, and AMP clients,
      which always issue NUL as the first byte.

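
The last point in the list above can be sketched as a first-byte check. This
is an illustration only: C{looks_like_amp} is a hypothetical name, and a real
server would still have to buffer and hand off the bytes it has peeked at.

```python
def looks_like_amp(first_bytes):
    # Hypothetical helper: guess whether a new connection speaks AMP.
    # Keys are at most 255 bytes long, so the high byte of the first
    # 2-byte key length is always zero.
    return first_bytes[:1] == bytes([0])


# An AMP client opens with a 2-byte key length followed by the key.
amp_start = bytes([0, 8]) + b"_command"
# A web browser opens with printable text.
http_start = b"GET / HTTP/1.1"

amp_guess = looks_like_amp(amp_start)
http_guess = looks_like_amp(http_start)
```
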
@var MAX_KEY_LENGTH: The maximum length, in bytes, of a key.
@type MAX_KEY_LENGTH: L{int}

@var MAX_VALUE_LENGTH: The maximum length, in bytes, of a value.
@type MAX_VALUE_LENGTH: L{int}

@var ASK: Marker for an Ask packet.
@type ASK: L{bytes}

@var ANSWER: Marker for an Answer packet.
@type ANSWER: L{bytes}

@var COMMAND: Marker for a Command packet.
@type COMMAND: L{bytes}

@var ERROR: Marker for an AMP box of error type.
@type ERROR: L{bytes}

@var ERROR_CODE: Marker for an AMP box containing the code of an error.
@type ERROR_CODE: L{bytes}

@var ERROR_DESCRIPTION: Marker for an AMP box containing the description of the
    error.
@type ERROR_DESCRIPTION: L{bytes}
"""

from __future__ import annotations
import datetime
import decimal
import warnings
from functools import partial
from io import BytesIO
from itertools import count
from struct import pack
from types import MethodType
from typing import Any, Callable, ClassVar, TypeVar
from zope.interface import Interface, implementer
from twisted.internet.defer import Deferred, fail, maybeDeferred
from twisted.internet.error import ConnectionClosed, ConnectionLost, PeerVerifyError
from twisted.internet.interfaces import IFileDescriptorReceiver
from twisted.internet.main import CONNECTION_LOST
from twisted.internet.protocol import Protocol
from twisted.logger import Logger
from twisted.protocols.basic import Int16StringReceiver, StatefulStringProtocol
from twisted.python import filepath
from twisted.python._tzhelper import (
UTC as utc,
FixedOffsetTimeZone as _FixedOffsetTZInfo,
)
from twisted.python.compat import nativeString
from twisted.python.failure import Failure
from twisted.python.reflect import accumulateClassDict
try:
    from twisted.internet import ssl as _ssl

    if _ssl.supported:
        from twisted.internet.ssl import DN, Certificate, CertificateOptions, KeyPair
    else:
        ssl = None
except ImportError:
    ssl = None
else:
    ssl = _ssl

__all__ = [
"AMP",
"ANSWER",
"ASK",
"AmpBox",
"AmpError",
"AmpList",
"Argument",
"BadLocalReturn",
"BinaryBoxProtocol",
"Boolean",
"Box",
"BoxDispatcher",
"COMMAND",
"Command",
"CommandLocator",
"Decimal",
"Descriptor",
"ERROR",
"ERROR_CODE",
"ERROR_DESCRIPTION",
"Float",
"IArgumentType",
"IBoxReceiver",
"IBoxSender",
"IResponderLocator",
"IncompatibleVersions",
"Integer",
"InvalidSignature",
"ListOf",
"MAX_KEY_LENGTH",
"MAX_VALUE_LENGTH",
"MalformedAmpBox",
"NoEmptyBoxes",
"OnlyOneTLS",
"PROTOCOL_ERRORS",
"PYTHON_KEYWORDS",
"Path",
"ProtocolSwitchCommand",
"ProtocolSwitched",
"QuitBox",
"RemoteAmpError",
"SimpleStringLocator",
"StartTLS",
"String",
"TooLong",
"UNHANDLED_ERROR_CODE",
"UNKNOWN_ERROR_CODE",
"UnhandledCommand",
"utc",
"Unicode",
"UnknownRemoteError",
"parse",
"parseString",
]
_log = Logger()
_T_Callable = TypeVar("_T_Callable", bound=Callable[..., object])
ASK = b"_ask"
ANSWER = b"_answer"
COMMAND = b"_command"
ERROR = b"_error"
ERROR_CODE = b"_error_code"
ERROR_DESCRIPTION = b"_error_description"
UNKNOWN_ERROR_CODE = b"UNKNOWN"
UNHANDLED_ERROR_CODE = b"UNHANDLED"
MAX_KEY_LENGTH = 0xFF
MAX_VALUE_LENGTH = 0xFFFF
class IArgumentType(Interface):
    """
    An L{IArgumentType} can serialize a Python object into an AMP box and
    deserialize information from an AMP box back into a Python object.

    @since: 9.0
    """

    def fromBox(name, strings, objects, proto):
        """
        Given an argument name and an AMP box containing serialized values,
        extract one or more Python objects and add them to the C{objects}
        dictionary.

        @param name: The name associated with this argument. Most commonly
            this is the key which can be used to find a serialized value in
            C{strings}.
        @type name: C{bytes}

        @param strings: The AMP box from which to extract one or more
            values.
        @type strings: C{dict}

        @param objects: The output dictionary to populate with the value for
            this argument. The key used will be derived from C{name}. It may
            differ; in Python 3, for example, the key will be a Unicode/native
            string. See L{_wireNameToPythonIdentifier}.
        @type objects: C{dict}

        @param proto: The protocol instance which received the AMP box being
            interpreted. Most likely this is an instance of L{AMP}, but
            this is not guaranteed.

        @return: L{None}
        """

    def toBox(name, strings, objects, proto):
        """
        Given an argument name and a dictionary containing structured Python
        objects, serialize values into one or more strings and add them to
        the C{strings} dictionary.

        @param name: The name associated with this argument. Most commonly
            this is the key in C{strings} to associate with a C{bytes} giving
            the serialized form of that object.
        @type name: C{bytes}

        @param strings: The AMP box into which to insert one or more strings.
        @type strings: C{dict}

        @param objects: The input dictionary from which to extract Python
            objects to serialize. The key used will be derived from C{name}.
            It may differ; in Python 3, for example, the key will be a
            Unicode/native string. See L{_wireNameToPythonIdentifier}.
        @type objects: C{dict}

        @param proto: The protocol instance which will send the AMP box once
            it is fully populated. Most likely this is an instance of
            L{AMP}, but this is not guaranteed.

        @return: L{None}
        """


class IBoxSender(Interface):
    """
    A transport which can send L{AmpBox} objects.
    """

    def sendBox(box):
        """
        Send an L{AmpBox}.

        @raise ProtocolSwitched: if the underlying protocol has been
            switched.

        @raise ConnectionLost: if the underlying connection has already been
            lost.
        """

    def unhandledError(failure):
        """
        An unhandled error occurred in response to a box. Log it
        appropriately.

        @param failure: a L{Failure} describing the error that occurred.
        """


class IBoxReceiver(Interface):
    """
    An application object which can receive L{AmpBox} objects and dispatch them
    appropriately.
    """

    def startReceivingBoxes(boxSender):
        """
        The L{IBoxReceiver.ampBoxReceived} method will start being called;
        boxes may be responded to by responding to the given L{IBoxSender}.

        @param boxSender: an L{IBoxSender} provider.
        """

    def ampBoxReceived(box):
        """
        A box was received from the transport; dispatch it appropriately.
        """

    def stopReceivingBoxes(reason):
        """
        No further boxes will be received on this connection.

        @type reason: L{Failure}
        """


class IResponderLocator(Interface):
    """
    An application object which can look up appropriate responder methods for
    AMP commands.
    """

    def locateResponder(name):
        """
        Locate a responder method appropriate for the named command.

        @param name: the wire-level name (commandName) of the AMP command to be
            responded to.
        @type name: C{bytes}

        @return: a 1-argument callable that takes an L{AmpBox} with argument
            values for the given command, and returns an L{AmpBox} containing
            argument values for the named command, or a L{Deferred} that fires the
            same.
        """


class AmpError(Exception):
    """
    Base class of all Amp-related exceptions.
    """


class ProtocolSwitched(Exception):
    """
    Connections which have been switched to other protocols can no longer
    accept traffic at the AMP level. This is raised when you try to send it.
    """


class OnlyOneTLS(AmpError):
    """
    This is an implementation limitation; TLS may only be started once per
    connection.
    """


class NoEmptyBoxes(AmpError):
    """
    You can't have empty boxes on the connection. This is raised when you
    receive or attempt to send one.
    """


class InvalidSignature(AmpError):
    """
    You didn't pass all the required arguments.
    """


class TooLong(AmpError):
    """
    One of the protocol's length limitations was violated.

    @ivar isKey: true if the string was being encoded in a key position, false
        if it was in a value position.

    @ivar isLocal: Was the string encoded locally, or received too long from
        the network? (It's only physically possible to encode "too long" values on
        the network for keys.)

    @ivar value: The string that was too long.

    @ivar keyName: If the string being encoded was in a value position, what
        key was it being encoded for?
    """

    def __init__(self, isKey, isLocal, value, keyName=None):
        AmpError.__init__(self)
        self.isKey = isKey
        self.isLocal = isLocal
        self.value = value
        self.keyName = keyName

    def __repr__(self) -> str:
        hdr = self.isKey and "key" or "value"
        if not self.isKey:
            hdr += " " + repr(self.keyName)
        lcl = self.isLocal and "local" or "remote"
        return "%s %s too long: %d" % (lcl, hdr, len(self.value))


class BadLocalReturn(AmpError):
    """
    A bad value was returned from a local command; we were unable to coerce it.
    """

    def __init__(self, message: str, enclosed: Failure) -> None:
        AmpError.__init__(self)
        self.message = message
        self.enclosed = enclosed

    def __repr__(self) -> str:
        return self.message + " " + self.enclosed.getBriefTraceback()

    __str__ = __repr__


class RemoteAmpError(AmpError):
    """
    This error indicates that something went wrong on the remote end of the
    connection, and the error was serialized and transmitted to you.
    """

    def __init__(self, errorCode, description, fatal=False, local=None):
        """Create a remote error with an error code and description.

        @param errorCode: the AMP error code of this error.
        @type errorCode: C{bytes}

        @param description: some text to show to the user.
        @type description: C{str}

        @param fatal: a boolean, true if this error should terminate the
            connection.

        @param local: a local Failure, if one exists.
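
The message built by this constructor renders C{errorCode} as text, with
bytes at or above 0x80 shown as backslash escapes. On Python 3.5 and later the
same rendering appears to be available from the standard C{backslashreplace}
error handler; the helper name below is hypothetical and the snippet is a
sketch, not the code this class runs.

```python
def escape_error_code(error_code):
    # Hypothetical helper: render an error code (bytes) as printable text,
    # escaping every byte that is not ASCII.
    return error_code.decode("ascii", "backslashreplace")


# "OK" followed by one non-ASCII byte (0xFF).
escaped = escape_error_code(b"OK" + bytes([0xFF]))
```
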
        """
        if local:
            localwhat = " (local)"
            othertb = local.getBriefTraceback()
        else:
            localwhat = ""
            othertb = ""

        # Backslash-escape errorCode. Python 3.5 can do this natively
        # ("backslashreplace") but Python 2.7 and Python 3.4 can't.
        errorCodeForMessage = "".join(
            f"\\x{c:2x}" if c >= 0x80 else chr(c) for c in errorCode
        )

        if othertb:
            message = "Code<{}>{}: {}\n{}".format(
                errorCodeForMessage,
                localwhat,
                description,
                othertb,
            )
        else:
            message = "Code<{}>{}: {}".format(
                errorCodeForMessage, localwhat, description
            )

        super().__init__(message)
        self.local = local
        self.errorCode = errorCode
        self.description = description
        self.fatal = fatal


class UnknownRemoteError(RemoteAmpError):
    """
    This means that an error whose type we can't identify was raised from the
    other side.
    """

    def __init__(self, description):
        errorCode = UNKNOWN_ERROR_CODE
        RemoteAmpError.__init__(self, errorCode, description)


class MalformedAmpBox(AmpError):
    """
    This error indicates that the wire-level protocol was malformed.
    """


class UnhandledCommand(AmpError):
    """
    A command received via amp could not be dispatched.
    """


class IncompatibleVersions(AmpError):
    """
    It was impossible to negotiate a compatible version of the protocol with
    the other end of the connection.
    """


PROTOCOL_ERRORS = {UNHANDLED_ERROR_CODE: UnhandledCommand}


class AmpBox(dict[bytes, bytes]):
    """
    I am a packet in the AMP protocol, much like a
    regular bytes:bytes dictionary.
    """

    # be like a regular dictionary don't magically
    # acquire a __dict__...
    __slots__: list[str] = []

    def __init__(self, *args, **kw):
        """
        Initialize a new L{AmpBox}.

        In Python 3, keyword arguments MUST be Unicode/native strings whereas
        in Python 2 they could be either byte strings or Unicode strings.

        However, all keys of an L{AmpBox} MUST be byte strings, or possible to
        transparently coerce into byte strings (i.e. Python 2).

        In Python 3, therefore, native string keys are coerced to byte strings
        by encoding as ASCII. This can result in C{UnicodeEncodeError} being
        raised.

        @param args: See C{dict}, but all keys and values should be C{bytes}.
            On Python 3, native strings may be used as keys provided they
            contain only ASCII characters.

        @param kw: See C{dict}, but all keys and values should be C{bytes}.
            On Python 3, native strings may be used as keys provided they
            contain only ASCII characters.

        @raise UnicodeEncodeError: When a native string key cannot be coerced
            to an ASCII byte string (Python 3 only).
        """
        super().__init__(*args, **kw)
        nonByteNames = [n for n in self if not isinstance(n, bytes)]
        for nonByteName in nonByteNames:
            byteName = nonByteName.encode("ascii")
            self[byteName] = self.pop(nonByteName)

    def copy(self):
        """
        Return another AmpBox just like me.
        """
        newBox = self.__class__()
        newBox.update(self)
        return newBox

    def serialize(self):
        """
        Convert me into a wire-encoded string.

        @return: a C{bytes} encoded according to the rules described in the
            module docstring.
        """
        i = sorted(self.items())
        L = []
        w = L.append
        for k, v in i:
            if type(k) == str:
                raise TypeError("Unicode key not allowed: %r" % k)
            if type(v) == str:
                raise TypeError(f"Unicode value for key {k!r} not allowed: {v!r}")
            if len(k) > MAX_KEY_LENGTH:
                raise TooLong(True, True, k, None)
            if len(v) > MAX_VALUE_LENGTH:
                raise TooLong(False, True, v, k)
            for kv in k, v:
                w(pack("!H", len(kv)))
                w(kv)
        w(pack("!H", 0))
        return b"".join(L)

    def _sendTo(self, proto):
        """
        Serialize and send this box to an Amp instance. By the time it is being
        sent, several keys are required. I must have exactly ONE of::

            _ask
            _answer
            _error

        If the '_ask' key is set, then the '_command' key must also be
        set.

        @param proto: an AMP instance.
        """
        proto.sendBox(self)

    def __repr__(self) -> str:
        return f"AmpBox({dict.__repr__(self)})"


# amp.Box => AmpBox

Box = AmpBox


class QuitBox(AmpBox):
    """
    I am an AmpBox that, upon being sent, terminates the connection.
    """

    __slots__: list[str] = []

    def __repr__(self) -> str:
        return f"QuitBox(**{super().__repr__()})"

    def _sendTo(self, proto):
        """
        Immediately call loseConnection after sending.
        """
        super()._sendTo(proto)
        proto.transport.loseConnection()


class _SwitchBox(AmpBox):
    """
    Implementation detail of ProtocolSwitchCommand: I am an AmpBox which sets
    up state for the protocol to switch.
    """

    # DON'T set __slots__ here; we do have an attribute.

    def __init__(self, innerProto, **kw):
        """
        Create a _SwitchBox with the protocol to switch to after being sent.

        @param innerProto: the protocol instance to switch to.
        @type innerProto: an IProtocol provider.
        """
        super().__init__(**kw)
        self.innerProto = innerProto

    def __repr__(self) -> str:
        return "_SwitchBox({!r}, **{})".format(
            self.innerProto,
            dict.__repr__(self),
        )

    def _sendTo(self, proto):
        """
        Send me; I am the last box on the connection. All further traffic will be
        over the new protocol.
        """
        super()._sendTo(proto)
        proto._lockForSwitch()
        proto._switchTo(self.innerProto)


@implementer(IBoxReceiver)
class BoxDispatcher:
    """
    A L{BoxDispatcher} dispatches '_ask', '_answer', and '_error' L{AmpBox}es,
    both incoming and outgoing, to their appropriate destinations.

    Outgoing commands are converted into L{Deferred}s and outgoing boxes, and
    associated tracking state to fire those L{Deferred} when '_answer' boxes
    come back. Incoming '_answer' and '_error' boxes are converted into
    callbacks and errbacks on those L{Deferred}s, respectively.

    Incoming '_ask' boxes are converted into method calls on a supplied method
    locator.

    @ivar _outstandingRequests: a dictionary mapping request IDs to
        L{Deferred}s which were returned for those requests.

    @ivar locator: an object with a L{CommandLocator.locateResponder} method
        that locates a responder function that takes a Box and returns a result
        (either a Box or a Deferred which fires one).

    @ivar boxSender: an object which can send boxes, via the L{_sendBoxCommand}
        method, such as an L{AMP} instance.
    @type boxSender: L{IBoxSender}
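
The request tracking described above can be sketched without Twisted. This is
a simplified model under stated assumptions: C{TinyDispatcher} and its method
names are hypothetical, plain callables stand in for L{Deferred}s, boxes are
ordinary dicts, and incoming '_ask' boxes are not handled at all.

```python
class TinyDispatcher:
    # Hypothetical stand-in for the bookkeeping described above.  "send" is
    # any function that accepts a dict representing an outgoing box.

    def __init__(self, send):
        self._send = send
        self._counter = 0
        self._outstanding = {}

    def ask(self, command, arguments, on_answer, on_error):
        # Tag the outgoing box so that the reply can be matched to it later.
        self._counter += 1
        tag = b"%x" % (self._counter,)
        box = dict(arguments)
        box[b"_command"] = command
        box[b"_ask"] = tag
        self._outstanding[tag] = (on_answer, on_error)
        self._send(box)
        return tag

    def box_received(self, box):
        # Route a reply to whichever callback was registered for its tag.
        if b"_answer" in box:
            on_answer, _ = self._outstanding.pop(box[b"_answer"])
            on_answer(box)
        elif b"_error" in box:
            _, on_error = self._outstanding.pop(box[b"_error"])
            on_error(box[b"_error_code"], box[b"_error_description"])


sent = []
results = []
dispatcher = TinyDispatcher(sent.append)
tag = dispatcher.ask(b"sum", {b"a": b"13", b"b": b"81"}, results.append, print)
dispatcher.box_received({b"_answer": tag, b"total": b"94"})
```

Because every request carries its own tag, replies may arrive in any order and
several requests can be in flight on one connection at once.
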
    """

    _failAllReason = None
    _outstandingRequests = None
    _counter = 0
    boxSender = None

    def __init__(self, locator):
        self._outstandingRequests = {}
        self.locator = locator

    def startReceivingBoxes(self, boxSender):
        """
        The given boxSender is going to start calling ampBoxReceived on this
        L{BoxDispatcher}.

        @param boxSender: The L{IBoxSender} to send command responses to.
        """
        self.boxSender = boxSender

    def stopReceivingBoxes(self, reason):
        """
        No further boxes will be received here. Terminate all currently
        outstanding command deferreds with the given reason.
        """
        self.failAllOutgoing(reason)

    def failAllOutgoing(self, reason):
        """
        Call the errback on all outstanding requests awaiting responses.

        @param reason: the Failure instance to pass to those errbacks.
        """
        self._failAllReason = reason
        OR = self._outstandingRequests.items()
        self._outstandingRequests = None  # we can never send another request
        for key, value in OR:
            value.errback(reason)

    def _nextTag(self):
        """
        Generate protocol-local serial numbers for _ask keys.

        @return: a string that has not yet been used on this connection.
        """
        self._counter += 1
        return b"%x" % (self._counter,)

    def _sendBoxCommand(self, command, box, requiresAnswer=True):
        """
        Send a command across the wire with the given C{amp.Box}.

        Mutate the given box to give it any additional keys (_command, _ask)
        required for the command and request/response machinery, then send it.

        If requiresAnswer is True, returns a C{Deferred} which fires when a
        response is received. The C{Deferred} is fired with an C{amp.Box} on
        success, or with an C{amp.RemoteAmpError} if an error is received.

        If the Deferred fails and the error is not handled by the caller of
        this method, the failure will be logged and the connection dropped.

        @param command: a C{bytes}, the name of the command to issue.

        @param box: an AmpBox with the arguments for the command.

        @param requiresAnswer: a boolean. Defaults to True. If True, return a
            Deferred which will fire when the other side responds to this command.
            If False, return None and do not ask the other side for acknowledgement.

        @return: a Deferred which fires the AmpBox that holds the response to
            this command, or None, as specified by requiresAnswer.

        @raise ProtocolSwitched: if the protocol has been switched.
        """
        if self._failAllReason is not None:
            if requiresAnswer:
                return fail(self._failAllReason)
            else:
                return None
        box[COMMAND] = command
        tag = self._nextTag()
        if requiresAnswer:
            box[ASK] = tag
        box._sendTo(self.boxSender)
        if requiresAnswer:
            result = self._outstandingRequests[tag] = Deferred()
        else:
            result = None
        return result

    def callRemoteString(self, command, requiresAnswer=True, **kw):
        """
        This is a low-level API, designed only for optimizing simple messages
        for which the overhead of parsing is too great.

        @param command: a C{bytes} naming the command.

        @param kw: arguments to the amp box.

        @param requiresAnswer: a boolean. Defaults to True. If True, return a
            Deferred which will fire when the other side responds to this command.
            If False, return None and do not ask the other side for acknowledgement.

        @return: a Deferred which fires the AmpBox that holds the response to
            this command, or None, as specified by requiresAnswer.
        """
        box = Box(kw)
        return self._sendBoxCommand(command, box, requiresAnswer)

    def callRemote(self, commandType, *a, **kw):
        """
        This is the primary high-level API for sending messages via AMP. Invoke it
        with a command and appropriate arguments to send a message to this
        connection's peer.

        @param commandType: a subclass of Command.
        @type commandType: L{type}

        @param a: Positional (special) parameters taken by the command.
            Positional parameters will typically not be sent over the wire. The
            only command included with AMP which uses positional parameters is
            L{ProtocolSwitchCommand}, which takes the protocol that will be
            switched to as its first argument.

        @param kw: Keyword arguments taken by the command. These are the
            arguments declared in the command's 'arguments' attribute. They will
            be encoded and sent to the peer as arguments for the L{commandType}.

        @return: If L{commandType} has a C{requiresAnswer} attribute set to
            L{False}, then return L{None}. Otherwise, return a L{Deferred} which
            fires with a dictionary of objects representing the result of this
            call. Additionally, this L{Deferred} may fail with an exception
            representing a connection failure, with L{UnknownRemoteError} if the
            other end of the connection fails for an unknown reason, or with any
            error specified as a key in L{commandType}'s C{errors} dictionary.
        """
        # XXX this takes command subclasses and not command objects on purpose.
        # There's really no reason to have all this back-and-forth between
        # command objects and the protocol, and the extra object being created
        # (the Command instance) is pointless. Command is kind of like
        # Interface, and should be more like it.

        # In other words, the fact that commandType is instantiated here is an
        # implementation detail. Don't rely on it.

        try:
            co = commandType(*a, **kw)
        except BaseException:
            return fail()
        return co._doCommand(self)

    def unhandledError(self, failure):
        """
        This is a terminal callback called after application code has had a
        chance to quash any errors.
        """
        return self.boxSender.unhandledError(failure)

    def _answerReceived(self, box):
        """
        An AMP box was received that answered a command previously sent with
        L{callRemote}.

        @param box: an AmpBox with a value for its L{ANSWER} key.
        """
        question = self._outstandingRequests.pop(box[ANSWER])
        question.addErrback(self.unhandledError)
        question.callback(box)

    def _errorReceived(self, box):
        """
        An AMP box was received that answered a command previously sent with
        L{callRemote}, with an error.

        @param box: an L{AmpBox} with a value for its L{ERROR}, L{ERROR_CODE},
            and L{ERROR_DESCRIPTION} keys.
        """
        question = self._outstandingRequests.pop(box[ERROR])
        question.addErrback(self.unhandledError)
        errorCode = box[ERROR_CODE]
        description = box[ERROR_DESCRIPTION]
        if isinstance(description, bytes):
            description = description.decode("utf-8", "replace")
        if errorCode in PROTOCOL_ERRORS:
            exc = PROTOCOL_ERRORS[errorCode](errorCode, description)
        else:
            exc = RemoteAmpError(errorCode, description)
        question.errback(Failure(exc))

    def _commandReceived(self, box):
        """
        @param box: an L{AmpBox} with a value for its L{COMMAND} and L{ASK}
            keys.
        """

        def formatAnswer(answerBox):
            answerBox[ANSWER] = box[ASK]
            return answerBox

        def formatError(error: Failure) -> AmpBox:
            errorBox: AmpBox
            if error.check(RemoteAmpError):
                code = error.value.errorCode
                desc = error.value.description
                if isinstance(desc, str):
                    desc = desc.encode("utf-8", "replace")
                if error.value.fatal:
                    errorBox = QuitBox()
                else:
                    errorBox = AmpBox()
            else:
                errorBox = QuitBox()
                _log.failure("while receiving response to command", error)
                # if the error isn't handled
                code = UNKNOWN_ERROR_CODE
                desc = b"Unknown Error"