-
Notifications
You must be signed in to change notification settings - Fork 71
/
disallow_streams_by_port.py
executable file
·70 lines (58 loc) · 1.99 KB
/
disallow_streams_by_port.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
from __future__ import print_function
#
# This uses a very simple custom txtorcon.IStreamAttacher to disallow
# certain streams based solely on their port; by default it closes
# all streams on port 80 or 25 without ever attaching them to a
# circuit.
#
# For a more complex IStreamAttacher example, see
# attach_streams_by_country.py
#
from twisted.python import log
from twisted.internet.task import react
from twisted.internet.defer import inlineCallbacks, Deferred
from twisted.internet.endpoints import clientFromString
from zope.interface import implementer
import txtorcon
@implementer(txtorcon.IStreamAttacher)
class PortFilterAttacher:
def __init__(self, state):
self.state = state
self.disallow_ports = [80, 25]
print(
"Disallowing all streams to ports: {ports}".format(
ports=",".join(map(str, self.disallow_ports)),
)
)
def attach_stream(self, stream, circuits):
"""
IStreamAttacher API
"""
def stream_closed(x):
print("Stream closed:", x)
if stream.target_port in self.disallow_ports:
print(
"Disallowing {stream} to port {stream.target_port}".format(
stream=stream,
)
)
d = self.state.close_stream(stream)
d.addCallback(stream_closed)
d.addErrback(log.err)
return txtorcon.TorState.DO_NOT_ATTACH
# Ask Tor to assign stream to a circuit by itself
return None
@react
@inlineCallbacks
def main(reactor):
control_ep = clientFromString(reactor, "tcp:localhost:9051")
tor = yield txtorcon.connect(reactor, control_ep)
print("Connected to a Tor version={version}".format(
version=tor.protocol.version,
))
state = yield tor.create_state()
yield state.set_attacher(PortFilterAttacher(state), reactor)
print("Existing streams:")
for s in state.streams.values():
print(" ", s)
yield Deferred()