-
Notifications
You must be signed in to change notification settings - Fork 0
/
test.py
90 lines (68 loc) · 2.42 KB
/
test.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
import socket
import random
from time import sleep
# how many total connections?
HOW_MANY = 100
port = 9005
NUM_CONCUR = 10
# you can modify this list with pairs of request/response pairs
# (request, response)
pairs = [("GET /nothere", "404 Not Found"),
("GET /little.txt", "200 OK"),
("GET /big.txt", "403 Access Denied"),
("GT BENT BUEHLER", "400 Bad Request")
]
my_sockets = []
total_good = 0
total_bad = 0
red = "\033[91m"
gre = "\033[92m"
yel = "\033[93m"
c = "\033[0m"
def test_socket(sock, atuple):
global total_good, total_bad
sent = sock.send(bytes(atuple[0] + " HTTP/1.1", "utf-8"))
for i in range(100):
try:
chunk = sock.recv(15)
astr = chunk.decode("utf-8")
if astr[:16] == "HTTP/1.1 " + atuple[1][:6]:
print(gre + "good: " + c, end="")
total_good+=1
elif astr[:16] == "HTTP/1.0 " + atuple[1][:6]:
print(gre + "good: " + c, end="")
total_good+=1
else:
print(red + "bad -> " + c, end="")
total_bad+=1
print(yel + "requested:" + c, atuple[0], yel + "wanted:" + c, "'HTTP/1.* " + atuple[1][:6] + "'", yel + "got:" + c, "'" + astr + "'")
return
except socket.error as e:
if e.args[0] == 11:
sleep(0.05)
print(red + "bad -> " + c, end="")
print(yel + "requested:" + c, atuple[0], yel + "wanted:" + c, "'HTTP/1.* " + atuple[1][:6] + "'", yel + "got no reply. Concurrent much?" + c)
total_bad+=1
def make_new_conn():
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.connect(("localhost", port))
s.setblocking(0)
return s
def setup(count):
for i in range(count):
my_sockets.append(make_new_conn())
# let's pick a random socket
setup(NUM_CONCUR) # start with 10 processes
for i in range(HOW_MANY):
rando = random.randint(0, len(my_sockets)-1)
test_socket(my_sockets[rando],pairs[random.randint(0,3)])
my_sockets[rando].close()
my_sockets.pop(rando)
my_sockets.append(make_new_conn())
while len(my_sockets) > 0:
rando = random.randint(0, len(my_sockets)-1)
test_socket(my_sockets[rando],pairs[random.randint(0,3)])
my_sockets[rando].close()
my_sockets.pop(rando)
print("\n" + gre + "good:", total_good, c, red + "bad:", total_bad,c);
print("Success %: ", total_good/(total_good + total_bad));