-
Notifications
You must be signed in to change notification settings - Fork 0
/
multiplexing.py
227 lines (157 loc) · 5.21 KB
/
multiplexing.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
"""
Multiplexed stream connections over AMP.
"""
from twisted.internet import interfaces
from twisted.protocols import amp
from txampext import errors, exposed
from uuid import uuid4
from zope import interface
class NoSuchFactory(errors.Error):
    """Error: no factory is registered under the requested identifier."""
class ConnectionRefused(errors.Error):
    """Error: the factory declined to build a protocol for the connection."""
class NoSuchConnection(errors.Error):
    """Error: the connection identifier does not match a known connection."""
class Connect(amp.Command):
    """
    Opens a new stream connection multiplexed over this AMP connection.

    Takes the identifier of the factory to connect to (``factory``) and
    an exposed protocol (``remote``); answers with the identifier of the
    new connection (``connection``). Can fail with ``NoSuchFactory`` or
    ``ConnectionRefused``.
    """
    arguments = [("factory", amp.String()), ("remote", exposed.ExposedProtocol())]
    response = [("connection", amp.String())]
    # Each ``asAMP()`` result is one (key, value) pair of the error mapping.
    errors = dict(
        failure.asAMP() for failure in (NoSuchFactory, ConnectionRefused)
    )
class Transmit(amp.Command):
    """
    Delivers a chunk of data over a multiplexed connection.

    Takes the connection identifier (``connection``) and the payload
    (``data``); the response is empty. Can fail with
    ``NoSuchConnection``.
    """
    arguments = [("connection", amp.String()), ("data", amp.String())]
    response = []
    # ``asAMP()`` yields the single (key, value) pair of the error mapping.
    errors = dict((NoSuchConnection.asAMP(),))
class Disconnect(amp.Command):
    """
    Disconnects a multiplexed connection.

    Any later attempt to use the connection will fail. The disconnect is
    symmetric: the transport stops working in the other direction as
    well.

    Takes the connection identifier (``connection``); the response is
    empty. Can fail with ``NoSuchConnection``.
    """
    arguments = [("connection", amp.String())]
    response = []
    # ``asAMP()`` yields the single (key, value) pair of the error mapping.
    errors = dict((NoSuchConnection.asAMP(),))
class MultiplexingCommandLocator(amp.CommandLocator):
    """An AMP locator for multiplexing stream connections over AMP.

    This is for the "server" or "listening" end of the multiplexed
    stream connections. Of course, since AMP is symmetrical, both
    sides can have such an object.

    Factories are registered under an identifier with ``addFactory``.
    The ``Connect``, ``Transmit`` and ``Disconnect`` responders then
    build, feed and drop protocols keyed by a per-connection
    identifier.
    """

    def __init__(self):
        # Factory identifier -> factory that peers may connect to.
        self._factories = {}
        # Connection identifier -> protocol built for that connection.
        self._protocols = {}

    def addFactory(self, identifier, factory):
        """Adds a factory.

        After calling this method, remote clients will be able to
        connect to it.

        This will call ``factory.doStart``.

        :param identifier: The key remote peers use to name the factory.
        :param factory: The factory to register.
        """
        factory.doStart()
        self._factories[identifier] = factory

    def removeFactory(self, identifier):
        """Removes a factory.

        After calling this method, remote clients will no longer be
        able to connect to it.

        This will call the factory's ``doStop`` method.

        :param identifier: The key the factory was registered under.
        :returns: The factory that was removed.
        :raises KeyError: If no factory is registered under
            ``identifier``.
        """
        factory = self._factories.pop(identifier)
        factory.doStop()
        return factory

    @Connect.responder
    def connect(self, factory, remote):
        """Attempts to connect using a given factory.

        This will find the requested factory and use it to build a
        protocol as if the AMP protocol's peer was making the
        connection. It will create a transport for the protocol and
        connect it immediately. It will then store the protocol under
        a unique identifier, and return that identifier.

        :param factory: The identifier of a registered factory.
        :param remote: The exposed AMP protocol; its transport supplies
            the peer address and it carries the multiplexed traffic.
        :returns: A dict with the new identifier under ``"connection"``.
        :raises NoSuchFactory: If ``factory`` is not registered.
        :raises ConnectionRefused: If the factory's ``buildProtocol``
            returns ``None``.
        """
        try:
            registered = self._factories[factory]
        except KeyError:
            raise NoSuchFactory()

        addr = remote.transport.getPeer()
        proto = registered.buildProtocol(addr)
        if proto is None:
            raise ConnectionRefused()

        # ``UUID.hex`` is a string attribute, not a method; calling it
        # raised ``TypeError`` on every otherwise successful connect.
        identifier = uuid4().hex
        transport = MultiplexedTransport(identifier, remote)
        proto.makeConnection(transport)

        self._protocols[identifier] = proto
        return {"connection": identifier}

    @Transmit.responder
    def receiveData(self, connection, data):
        """
        Receives some data for the given connection.

        The first parameter is named ``connection`` because AMP passes
        responder arguments by keyword, using the names declared in
        ``Transmit.arguments``.

        :param connection: The identifier returned by ``connect``.
        :param data: The data to hand to the protocol's
            ``dataReceived``.
        :returns: An empty dict (``Transmit`` has an empty response).
        :raises NoSuchConnection: If ``connection`` is unknown.
        """
        try:
            protocol = self._protocols[connection]
        except KeyError:
            raise NoSuchConnection()

        protocol.dataReceived(data)
        return {}

    @Disconnect.responder
    def disconnect(self, connection):
        """
        Disconnects the given protocol.

        The protocol is forgotten and its ``transport`` is cleared, so
        later ``Transmit`` calls for this identifier fail.

        :param connection: The identifier returned by ``connect``.
        :returns: An empty dict (``Disconnect`` has an empty response).
        :raises NoSuchConnection: If ``connection`` is unknown.
        """
        try:
            proto = self._protocols.pop(connection)
        except KeyError:
            # Report the error declared by ``Disconnect.errors`` rather
            # than leaking a bare ``KeyError``.
            raise NoSuchConnection()

        proto.transport = None
        return {}
@interface.implementer(interfaces.ITransport)
class MultiplexedTransport(object):
    """
    A local transport that makes calls over the AMP connection.

    Writes and disconnects are turned into ``Transmit`` and
    ``Disconnect`` commands sent through ``remote``, tagged with this
    transport's connection identifier.
    """

    def __init__(self, identifier, remote):
        """
        :param identifier: The identifier of the multiplexed connection
            this transport belongs to.
        :param remote: The object whose ``callRemote`` sends the
            commands and whose ``transport`` supplies the peer and host
            addresses.
        """
        self.identifier = identifier
        self.remote = remote

    def _callRemote(self, command, **kwargs):
        """Calls the command remotely for this transport with ``kwargs``.

        This passes the ``connection`` keyword argument to
        ``callRemote``, with the connection's identifier.

        :returns: Whatever ``remote.callRemote`` returns, so callers
            may inspect the outcome if they want to.
        """
        # The commands declare the argument as ``connection``; the
        # keyword must use that name, not ``transport``.
        return self.remote.callRemote(
            command, connection=self.identifier, **kwargs)

    def write(self, data):
        """Sends some data to the other side for writing.
        """
        self._callRemote(Transmit, data=data)

    def writeSequence(self, seq):
        """Write a bunch of pieces of data sequentially.

        Each piece is sent as its own ``Transmit`` command.
        """
        for data in seq:
            self.write(data)

    def loseConnection(self):
        """Tells the other side to disconnect.
        """
        self._callRemote(Disconnect)

    def getPeer(self):
        """Gets the AMP connection's peer.
        """
        return self.remote.transport.getPeer()

    def getHost(self):
        """Gets the AMP connection's host.
        """
        return self.remote.transport.getHost()