forked from gongjianhui/AppleDNS
-
Notifications
You must be signed in to change notification settings - Fork 0
/
fetch-timeout.py
executable file
·107 lines (83 loc) · 2.83 KB
/
fetch-timeout.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
#!/usr/bin/env python3
import argparse
import json
#import multiprocessing
from datetime import datetime
import itertools
# Timeout for a single TCP connect attempt, in milliseconds
# (request() divides it by 1000.0 before handing it to the socket).
timeout = 400 # unit ms
# Intended worker-pool size; only referenced by the commented-out
# multiprocessing.Pool line in fetch(), so it has no effect in the visible code.
concurrent = 10
# Each IP list is repeated this many times in fetch(), i.e. every address
# is probed this many times.
testing_times = 10
def check_requirements():
    """Run the start-up checks and tell the caller whether to proceed.

    Two checks are performed, in this order:

    * the Python version check, which is currently disabled and always passes;
    * the proxy check, which only *reports* proxy-related environment
      variables and never fails.

    Returns:
        True when every check passes (with the checks as written, always).
    """
    def check_python_version():
        # The minimum-version gate (3.4.0, based on sys.hexversion) is
        # disabled; re-enabling it requires importing ``sys`` again.
        return True

    def check_is_use_proxy():
        from os import environ

        variable_names = ('http_proxy', 'HTTP_PROXY', 'https_proxy', 'HTTPS_PROXY')
        settings = [(key, environ.get(key)) for key in variable_names]

        # Warn only when at least one proxy variable holds a non-empty value.
        if any(value for _, value in settings):
            print('you are using a proxy test')
            for key, value in settings:
                if value:
                    print('%s=%s' % (key, value))
            print()
        return True

    version_ok = check_python_version()
    proxy_ok = check_is_use_proxy()
    return version_ok and proxy_ok
def request(target, timeout_ms=None):
    """Measure how long a TCP connect to ``target`` takes.

    Args:
        target: ``(host, port)`` pair to connect to.
        timeout_ms: Connect timeout in milliseconds. When omitted, the
            module-level ``timeout`` setting is used.

    Returns:
        ``(host, elapsed_ms)`` on success, where ``elapsed_ms`` is a float
        rounded to microsecond precision, or ``(host, False)`` when the
        socket could not be created or the connection failed / timed out.
    """
    from socket import error
    from socket import socket
    # default_timer is the platform's best clock for measuring intervals and,
    # unlike datetime.now(), is available without depending on wall-clock time.
    from timeit import default_timer

    host, port = target
    if timeout_ms is None:
        timeout_ms = timeout

    conn = None
    try:
        conn = socket()
        conn.settimeout(timeout_ms / 1000.0)
        started = default_timer()
        conn.connect((host, port))
        elapsed = default_timer() - started
    except error:
        # Refused, unreachable, timed out, ...: report the probe as failed.
        return host, False
    finally:
        # Always release the descriptor; the caller issues many probes and
        # would otherwise leak one socket per attempt.
        if conn is not None:
            conn.close()
    return host, round(elapsed * 1000.0, 3)
def handle_ip(target):
    """Split a ``host[:port]`` string into a ``(hostname, port)`` pair.

    Args:
        target: Address text such as ``'1.2.3.4'`` or ``'1.2.3.4:443'``.

    Returns:
        ``(hostname, port)``; the port falls back to 80 when ``target``
        does not specify one.
    """
    # Use the standard library directly instead of requiring the third-party
    # ``six`` package; the fallback keeps Python 2 working.
    try:
        from urllib.parse import urlparse
    except ImportError:
        from urlparse import urlparse

    # Prefixing a scheme lets urlparse treat the text as a network location.
    address = urlparse('http://%s' % target)
    return address.hostname, address.port or 80
def fetch(payload):
    """Probe every IP listed in ``payload`` and rank them by connect time.

    For each service item the title and domains are printed, then every IP
    group under ``item['ips']`` is probed ``testing_times`` times per address
    via ``request``. Each group is replaced in place by a list of
    ``{'ip': ..., 'delta': ...}`` dicts sorted by ascending ``delta``
    (milliseconds); failed probes are left out.

    Args:
        payload: Iterable of dicts with ``'title'``, ``'domains'`` (list of
            strings) and ``'ips'`` (mapping of group name to a list of
            address strings) keys.

    Returns:
        The same ``payload`` object, with its ``'ips'`` groups rewritten.
    """
    # NOTE: probes run sequentially; the multiprocessing pool is not used.
    for service_item in payload:
        print(service_item['title'])
        print(', '.join(service_item['domains']))
        groups = service_item['ips']
        for name, ips in groups.items():
            # A generator expression replaces itertools.imap, which does not
            # exist on Python 3.
            probes = (request(handle_ip(ip)) for ip in ips * testing_times)
            # request() signals failure with False; compare by identity so a
            # genuine 0.0 ms measurement is not discarded.
            ranked = sorted(
                ({'ip': ip, 'delta': delta}
                 for ip, delta in probes if delta is not False),
                key=lambda item: item['delta']
            )
            # Re-assigning an existing key is safe while iterating items().
            groups[name] = ranked
            print('\t%s' % name)
            for item in ranked:
                print('\t\t%(ip)-15s\t%(delta)sms' % item)
    return payload
def main():
    """Load the payload file, probe it and write the result to ``result.json``.

    The payload path comes from the ``--payload`` command-line option and
    defaults to ``payload.json``. The structure returned by ``fetch`` is
    dumped as JSON to ``result.json`` in the current working directory.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument('--payload', dest='payload', default='payload.json')
    args = parser.parse_args()

    # Context managers close both files deterministically, so the result is
    # flushed to disk even on interpreters without reference counting.
    with open(args.payload) as payload_file:
        payload = json.load(payload_file)

    payload = fetch(payload)

    with open('result.json', 'w') as result_file:
        json.dump(payload, result_file)
# Script entry point: run main() only when executed directly and the
# start-up checks pass.
if __name__ == '__main__':
    if check_requirements():
        main()