forked from equalitie/swabber
/
testing.py
126 lines (97 loc) · 3.33 KB
/
testing.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
import unittest
import datetime
import commands
import threading
import os
import tempfile
from swabber import BanEntry, createDB
from swabber import BanCleaner
from swabber import BanFetcher
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
import zmq
from zmq.eventloop import ioloop, zmqstream
# IP address the tests ban and then look for in the iptables listing.
BAN_IP = "10.123.45.67"
# ZeroMQ endpoint passed to BanFetcher and used by the publishing socket.
BINDSTRING = "tcp://127.0.0.1:22620"
# Interface argument handed to BanEntry.ban() by the tests.
INTERFACE = "eth+"
# One context is created at module level and shared by the attackers, so
# that repeated attacks do not use up all file descriptors.
context = zmq.Context(1)
class Attacker(object):
    """Publish one ban message for a single IP address over ZeroMQ.

    The socket is created from the module-level ``context`` instead of a
    per-instance context.
    """

    def __init__(self, testip):
        """Store ``testip``, the IP address that will be published."""
        self.testip = testip

    def start(self):
        """Connect a PUB socket to ``BINDSTRING`` and send the ban message.

        The stream and the socket are closed again before returning.

        Returns:
            True, unconditionally.
        """
        pub_socket = context.socket(zmq.PUB)
        stream = zmqstream.ZMQStream(pub_socket)
        pub_socket.connect(BINDSTRING)

        message = ("swabber_bans", self.testip)
        stream.send_multipart(message)

        stream.close()
        # linger=0 so closing does not wait on messages still pending.
        pub_socket.close(linger=0)
        return True
class StressTest(object):
    """Start a BanFetcher and hit it with repeated ban messages."""

    def __init__(self, testip, hit_times=500000):
        """Store the attack parameters.

        Args:
            testip: IP address each Attacker publishes.
            hit_times: number of attacks to launch.
        """
        self.hit_times = hit_times
        self.testip = testip

    def run(self):
        """Start the fetcher, then launch ``hit_times`` attacks in sequence."""
        # NOTE(review): DB_CONN is not defined in the visible part of this
        # file -- confirm where it is supposed to come from.
        fetcher = BanFetcher(DB_CONN, BINDSTRING, False)
        fetcher.start()

        print("Starting attacks")
        for count in range(self.hit_times):
            # Progress report every 1000 attacks.
            if not count % 1000:
                print("Attacked %d times" % count)
            Attacker(self.testip).start()
class Attacker(threading.Thread):
    """Thread that publishes one ban message for a single IP address.

    NOTE(review): this definition rebinds the name ``Attacker`` and so
    shadows the earlier non-threaded class of the same name in this file.
    """

    def __init__(self, testip):
        """Initialise the thread and store ``testip``, the IP to publish."""
        threading.Thread.__init__(self)
        self.testip = testip

    def run(self):
        """Bind a PUB socket and send the ban message for ``self.testip``.

        Returns:
            True, unconditionally.
        """
        # NOTE(review): a fresh context and socket are created per thread
        # and never closed here -- confirm whether that is intended.
        context = zmq.Context(1)
        socket = context.socket(zmq.PUB)
        publisher = zmqstream.ZMQStream(socket)
        # NOTE(review): every thread binds the same endpoint; verify that
        # bind (rather than connect) is what the fetcher side expects.
        socket.bind("tcp://127.0.0.1:22620")
        # Fixed: the original referenced the bare name ``testip``, which is
        # undefined in this scope and raised NameError.
        publisher.send_multipart(("swabber_bans", self.testip))
        return True
class StressTest(object):
    """Launch a large number of Attacker instances against one IP address.

    NOTE(review): this definition rebinds the name ``StressTest`` and so
    shadows the earlier class of the same name in this file.
    """

    def __init__(self, testip, hit_times=1000000):
        """Store the attack parameters.

        Args:
            testip: IP address each Attacker publishes.
            hit_times: number of attackers to start.
        """
        self.hit_times = hit_times
        self.testip = testip

    def run(self):
        """Start ``hit_times`` attackers, reporting progress every 100."""
        for count in range(self.hit_times):
            if not count % 100:
                print("Attacked %d times" % count)
            attacker = Attacker(self.testip)
            attacker.start()
class BanTests(unittest.TestCase):
    """Check that BanEntry.ban()/unban() show up in the iptables listing."""

    @staticmethod
    def _iptables_rules():
        """Return the text output of ``/sbin/iptables -L -n``."""
        return commands.getstatusoutput("/sbin/iptables -L -n")[1]

    def testBan(self):
        """Ban BAN_IP, verify it is listed, unban, verify it is gone."""
        entry = BanEntry(BAN_IP, datetime.datetime.now())
        entry.ban(INTERFACE)
        rules_while_banned = self._iptables_rules()

        # The unban happens before the first assertion -- presumably so a
        # failing assertion does not leave the rule installed.
        entry.unban()
        self.assertIn(BAN_IP, rules_while_banned,
                      msg="IP address not banned")

        rules_after_unban = self._iptables_rules()
        self.assertNotIn(BAN_IP, rules_after_unban,
                         msg="IP address was not unbanned")
class CleanTests(unittest.TestCase):
    """Check that BanCleaner.cleanBans() removes a backdated ban."""

    def testClean(self):
        """Store and apply a backdated ban, run the cleaner, check iptables."""
        ban_minutes = 1
        # Backdate the entry by twice the ban length handed to the cleaner.
        age = datetime.timedelta(minutes=2 * ban_minutes)
        stale_entry = BanEntry(BAN_IP, datetime.datetime.now() - age)

        # NOTE(review): ``session`` and ``db_conn`` are not defined in the
        # visible part of this file -- confirm where they come from.
        session.add(stale_entry)
        session.commit()
        stale_entry.ban(INTERFACE)

        BanCleaner(db_conn, ban_minutes).cleanBans()

        _, rules = commands.getstatusoutput("/sbin/iptables -L -n")
        self.assertNotIn(BAN_IP, rules, msg="Ban was not reset by cleaner")
def main():
    """Run the stress test and then the unittest suite.

    Raises:
        SystemExit: when the process is not running as uid 0.
    """
    # Guard clause: bail out unless running as root.
    if os.getuid() != 0:
        print("Tests must be run as root")
        raise SystemExit

    StressTest(BAN_IP).run()
    unittest.main()


if __name__ == '__main__':
    main()