-
Notifications
You must be signed in to change notification settings - Fork 14
/
extractor.py
81 lines (57 loc) · 1.71 KB
/
extractor.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
import validators
from timing_utils import timing
# Inclusive bounds on token length: is_correct_length() rejects any token
# shorter than MIN_LENGTH or longer than MAX_LENGTH characters.
MIN_LENGTH = 5
MAX_LENGTH = 240
def clean_comment(text):
    """Drop a trailing ``#`` comment from *text*.

    When *text* contains ``#``, everything from the first ``#`` onward is
    removed and the remainder is stripped of surrounding whitespace. When
    there is no ``#``, *text* is returned unchanged (it is not stripped).
    """
    before, marker, _ = text.partition('#')
    if not marker:
        # No '#' present: hand the input back untouched.
        return text
    return before.strip()
def clean_whitespace(text):
    """Return *text* with carriage returns removed and tabs turned into spaces."""
    # One translation pass: map '\t' to ' ' and delete '\r'.
    table = str.maketrans('\t', ' ', '\r')
    return text.translate(table)
def is_correct_length(line):
    """Return True when ``len(line)`` is between MIN_LENGTH and MAX_LENGTH.

    Both bounds are inclusive; anything shorter or longer yields False.
    """
    return MIN_LENGTH <= len(line) <= MAX_LENGTH
def is_ip_address(text):
    """Check *text* against the IPv4 and then the IPv6 validator.

    Returns True as soon as the IPv4 validator accepts *text*; otherwise the
    result of the IPv6 validator is returned as-is, so callers should rely
    on its truthiness.
    """
    ip_checks = validators.ip_address
    if not ip_checks.ipv4(text):
        return ip_checks.ipv6(text)
    return True
def decode(domain):
    """Convert *domain* to its ASCII (IDNA / punycode) representation.

    Args:
        domain: Domain name as a ``str``; may contain non-ASCII labels.

    Returns:
        The IDNA-encoded domain as an ASCII ``str``, or ``None`` when the
        name cannot be encoded (a message is printed in that case).
    """
    try:
        return domain.encode('idna').decode('ascii')
    except UnicodeError:
        # The idna codec reports every encoding failure (empty label,
        # over-long label, disallowed character) as a UnicodeError, so there
        # is no need to swallow unrelated exceptions here.
        print(f'Failed to decode idna domain: {domain}')
        return None
def extract_domain(text):
    """Return the first valid domain name found in *text*, or None.

    The ``#`` comment part of *text* is removed, the rest is split on
    whitespace, and each token is checked in order. A token is skipped when
    its length is out of range, when it is an IP address, when it cannot be
    IDNA-encoded, or when its encoded form contains an underscore. The first
    remaining token accepted by ``validators.domain`` is returned in its
    IDNA-encoded form.
    """
    for token in clean_comment(text).split():
        if not is_correct_length(token) or is_ip_address(token):
            continue
        candidate = decode(token)
        if not candidate or '_' in candidate:
            continue
        if validators.domain(candidate):
            return candidate
    return None
@timing
def extract_domains(text):
    """Extract at most one domain from every line of *text*.

    Carriage returns and tabs are normalised first, then each line is passed
    to ``extract_domain``.

    Args:
        text: Multi-line input text.

    Returns:
        list: The domains found, in line order. Lines that yield no domain
        are left out.
    """
    lines = clean_whitespace(text).splitlines()
    candidates = (extract_domain(line) for line in lines)
    # Return a real list rather than a lazy ``filter`` object: the result can
    # then be iterated more than once and measured with len(), like the
    # lists returned by the other pipeline steps in this module.
    return [domain for domain in candidates if domain]
@timing
def dedup_domains(domains):
    """Return the distinct entries of *domains* as a list.

    Duplicates are removed by passing the values through a set, so the order
    of the result is not tied to the input order.
    """
    return [*set(domains)]
@timing
def exclude_whitelist_domains(domains, whitelist):
    """Return the entries of *domains* that are not on *whitelist*.

    Every whitelist entry is passed through ``decode`` before comparison, so
    the whitelist is matched in IDNA-encoded form. The relative order of the
    remaining domains is kept.
    """
    blocked = {decode(entry) for entry in whitelist}
    return [domain for domain in domains if domain not in blocked]
@timing
def sort_domains(domains):
    """Return a sorted list of *domains*, compared from the right-most label.

    Each domain is keyed by its dot-separated labels joined back together in
    reverse order (``www.example.com`` -> ``com.example.www``), and the keys
    are compared as plain strings.
    """
    return sorted(domains, key=lambda name: '.'.join(name.split('.')[::-1]))