This repository has been archived by the owner on Apr 22, 2024. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 46
/
atcp_server.py
194 lines (147 loc) · 6.95 KB
/
atcp_server.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
"""AsyncIO TCP Server for Kytos."""
import asyncio
import errno
import logging
from kytos.core.connection import Connection
from kytos.core.events import KytosEvent
LOG = logging.getLogger(__name__)
def exception_handler(loop, context):
    """Keep expected network errors from producing full tracebacks.

    Socket timeouts and operations on an already closed descriptor
    (``EBADF``) are reported with a single INFO line. Every other error is
    delegated to the loop's default exception handler.

    Args:
        loop: Event loop that reported the error.
        context (dict): Exception context supplied by the loop; only the
            ``'exception'`` and ``'transport'`` keys are inspected here.
    """
    error = context.get('exception')
    peer = context.get('transport')

    if isinstance(error, TimeoutError):
        LOG.info('Socket timeout: %r', peer)
        return

    bad_descriptor = (isinstance(error, OSError)
                      and error.errno == errno.EBADF)
    if bad_descriptor:
        LOG.info('Socket closed: %r', peer)
        return

    loop.default_exception_handler(context)
class KytosServer:
    """Abstraction of a TCP Server to listen to packages from the network.

    The KytosServer will listen on the specified port for any new TCP
    request from the network. ``server_protocol`` is handed to
    ``loop.create_server`` as the protocol factory, so it is instantiated
    to handle each accepted connection.
    """

    def __init__(self,  # pylint: disable=too-many-arguments
                 server_address, server_protocol, controller,
                 protocol_name, loop=None):
        """Create the object without starting the server.

        Args:
            server_address (tuple): Address where the server is listening.
                example: ('127.0.0.1', 80)
            server_protocol (asyncio.Protocol):
                Class that will be instantiated to handle each request.
            controller (:class:`~kytos.core.controller.Controller`):
                An instance of Kytos Controller class.
            protocol_name (str): Southbound protocol name that will be used
            loop (asyncio.AbstractEventLoop): Event loop that runs the
                server. Defaults to ``asyncio.get_event_loop()``.
        """
        self.server_address = server_address
        self.server_protocol = server_protocol
        self.controller = controller
        self.protocol_name = protocol_name

        # Becomes an `asyncio.Server` instance once the start-up task
        # scheduled by `serve_forever` has bound the listening socket.
        self._server = None
        # Task running `_start_server`; kept so `shutdown` can cancel a
        # start-up that has not finished yet.
        self._server_task = None

        # Here we compose the received `server_protocol` class with a `server`
        # object pointing to this instance
        self.server_protocol.server = self

        self.loop = loop or asyncio.get_event_loop()
        self.loop.set_exception_handler(exception_handler)

    def serve_forever(self):
        """Schedule the server start-up on the event loop.

        This call does not block: it only creates the task that binds the
        listening socket. The server accepts connections while
        ``self.loop`` is running and until ``shutdown()`` is called.

        Raises:
            Exception: whatever ``loop.create_task`` raised when the
                start-up task could not be scheduled.
        """
        addr, port = self.server_address[0], self.server_address[1]
        startup = self._start_server(addr, port)

        try:
            self._server_task = self.loop.create_task(startup)
        except Exception:
            LOG.error('Failed to start Kytos TCP Server at %s:%s', addr, port)
            # The coroutine was never scheduled; close it explicitly so it
            # does not end up as a "never awaited" coroutine.
            startup.close()
            raise

        LOG.info("Kytos listening at %s:%s", addr, port)

    async def _start_server(self, addr, port):
        """Bind the listening socket and store the resulting server."""
        try:
            self._server = await self.loop.create_server(
                self.server_protocol, addr, port)
        except asyncio.CancelledError:
            # shutdown() was called before the socket was bound.
            raise
        except Exception:  # pylint: disable=broad-except
            # Nothing in this class awaits the start-up task, so the
            # failure is reported here instead of being re-raised.
            LOG.exception('Failed to start Kytos TCP Server at %s:%s',
                          addr, port)

    def shutdown(self):
        """Stop the TCP server.

        Cancels a start-up that is still pending and calls ``.close()`` on
        the underlying server, which stops listening for new connections
        (asyncio leaves already established connections open). Calling
        this before ``serve_forever()`` is a no-op.
        """
        startup_task = self._server_task
        if startup_task is not None and not startup_task.done():
            startup_task.cancel()

        if self._server is not None:
            self._server.close()
class KytosServerProtocol(asyncio.Protocol):
    """Kytos' main request handler.

    It is instantiated once per connection between each switch and the
    controller.

    ``connection_made`` dispatches a
    ``kytos/core.{protocol}.connection.new`` KytosEvent on the controller's
    raw buffer, ``data_received`` forwards the received bytes in
    ``kytos/core.{protocol}.raw.in`` events on the same buffer, and
    ``connection_lost`` closes the connection and dispatches a
    ``kytos/core.{protocol}.connection.lost`` event on the app buffer.
    """

    known_ports = {
        6633: 'openflow',
        6653: 'openflow'
    }

    # Assigned on the protocol class by KytosServer.__init__ before any
    # instance is created; None means no server was attached.
    server = None

    def __init__(self):
        """Initialize protocol and check if server attribute was set.

        Raises:
            ValueError: if no server was assigned to the class beforehand.
        """
        # Validate first so a misconfigured protocol fails with a clear
        # message before anything else is set up.
        if not self.server:
            raise ValueError("server instance must be assigned before init")

        self._loop = asyncio.get_event_loop()

        self.connection = None
        self.transport = None
        self._rest = b''

    def connection_made(self, transport):
        """Handle new client connection, passing it to the controller.

        Build a new Kytos `Connection` and send a
        ``kytos/core.{protocol}.connection.new`` KytosEvent through the raw
        buffer.

        Args:
            transport: Transport of the accepted connection; its
                ``peername``, ``sockname`` and ``socket`` extra info are
                read.
        """
        self.transport = transport

        # IPv6 addresses are (host, port, flowinfo, scope_id) tuples, so
        # pick the fields by position instead of unpacking a pair.
        addr, port = transport.get_extra_info('peername')[:2]
        server_port = transport.get_extra_info('sockname')[1]
        socket = transport.get_extra_info('socket')

        LOG.info("New connection from %s:%s", addr, port)

        self.connection = Connection(addr, port, socket)

        # This allows someone to inherit from KytosServer and start a server
        # on another port to handle a different protocol.
        if self.server.protocol_name:
            self.known_ports[server_port] = self.server.protocol_name

        # Unknown ports fall back to the zero-padded port number as name.
        protocol_name = self.known_ports.get(server_port,
                                             f'{server_port:04d}')
        self.connection.protocol.name = protocol_name

        event_name = f'kytos/core.{protocol_name}.connection.new'
        event = KytosEvent(name=event_name,
                           content={'source': self.connection})

        self._loop.create_task(self.server.controller.buffers.raw.aput(event))

    def data_received(self, data):
        """Handle each request and place its data in the raw event buffer.

        Sends the received binary data in a ``kytos/core.{protocol}.raw.in``
        event on the raw buffer.

        Args:
            data (bytes): Bytes received from the peer.
        """
        # `_rest` is only ever set to b'' in this class, so this currently
        # leaves `data` unchanged.
        data = self._rest + data

        LOG.debug("New data from %s:%s (%s bytes)",
                  self.connection.address, self.connection.port, len(data))

        content = {'source': self.connection, 'new_data': data}
        event_name = f'kytos/core.{self.connection.protocol.name}.raw.in'
        event = KytosEvent(name=event_name, content=content)

        self._loop.create_task(self.server.controller.buffers.raw.aput(event))

    def connection_lost(self, exc):
        """Close the connection socket and generate connection lost event.

        Emits a ``kytos/core.{protocol}.connection.lost`` event through the
        App buffer.

        Args:
            exc (Exception): Error that ended the connection; a falsy
                value is reported as a close requested by the client.
        """
        reason = exc or "Request closed by client"
        LOG.info("Connection lost with client %s:%s. Reason: %s",
                 self.connection.address, self.connection.port, reason)

        self.connection.close()

        content = {'source': self.connection}
        if exc:
            content['exception'] = exc

        event_name = \
            f'kytos/core.{self.connection.protocol.name}.connection.lost'
        event = KytosEvent(name=event_name, content=content)

        self._loop.create_task(self.server.controller.buffers.app.aput(event))