-
-
Notifications
You must be signed in to change notification settings - Fork 601
/
Copy pathtcp_echo_server.py
51 lines (40 loc) · 1.34 KB
/
tcp_echo_server.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
# -*- coding: utf-8 -*-
"""
proxy.py
~~~~~~~~
⚡⚡⚡ Fast, Lightweight, Pluggable, TLS interception capable proxy server focused on
Network monitoring, controls & Application development, testing, debugging.
:copyright: (c) 2013-present by Abhinav Singh and contributors.
:license: BSD, see LICENSE for more details.
"""
import time
from typing import Any, Optional
from proxy import Proxy
from proxy.core.base import BaseTcpServerHandler
from proxy.core.connection import TcpClientConnection
class EchoServerHandler(BaseTcpServerHandler[TcpClientConnection]):
    """TCP work handler that echoes every received payload back to its client.

    The client socket is switched to non-blocking mode in ``initialize``.
    """

    @staticmethod
    def create(*args: Any) -> TcpClientConnection:  # pragma: no cover
        """Build the ``TcpClientConnection`` work object from ``args``.

        All positional arguments are forwarded unchanged to the
        ``TcpClientConnection`` constructor.
        """
        connection = TcpClientConnection(*args)
        return connection

    def initialize(self) -> None:
        """Put the client socket into non-blocking mode."""
        client_socket = self.work.connection
        client_socket.setblocking(False)

    def handle_data(self, data: memoryview) -> Optional[bool]:
        """Queue the received ``data`` so it is echoed back to the client.

        Always returns ``None``.
        NOTE(review): presumably ``None`` tells the base handler to keep the
        connection open -- confirm against ``BaseTcpServerHandler``.
        """
        client = self.work
        # The very same buffer that was received is handed back for sending.
        client.queue(data)
        return None
def main() -> None:
    """Run the echo server on port 12345 until interrupted from the keyboard.

    ``Proxy`` is used as a context manager with ``EchoServerHandler`` as the
    work class; the calling thread then just sleeps in one-second steps.
    A ``KeyboardInterrupt`` ends the loop silently and leaves the ``with``
    block, which lets ``Proxy`` run its exit handling.
    """
    idle_seconds = 1
    # This example requires `threadless=True`
    with Proxy(
        work_klass=EchoServerHandler,
        threadless=True,
        num_workers=1,
        port=12345,
    ):
        try:
            # Nothing to do in this thread; keep it alive while the
            # server started by ``Proxy`` handles connections.
            while True:
                time.sleep(idle_seconds)
        except KeyboardInterrupt:
            # Ctrl-C is the expected way to stop the example.
            pass
# Start the echo server only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == '__main__':
    main()