# -*- coding: utf-8 -*-
"""`tldextract` accurately separates the gTLD or ccTLD (generic or country code
top-level domain) from the registered domain and subdomains of a URL.

    >>> import tldextract
    >>> tldextract.extract('http://forums.news.cnn.com/')
    ExtractResult(subdomain='forums.news', domain='cnn', suffix='com')
    >>> tldextract.extract('http://forums.bbc.co.uk/') # United Kingdom
    ExtractResult(subdomain='forums', domain='bbc', suffix='co.uk')
    >>> tldextract.extract('http://www.worldbank.org.kg/') # Kyrgyzstan
    ExtractResult(subdomain='www', domain='worldbank', suffix='org.kg')

`ExtractResult` is a namedtuple, so it's simple to access the parts you want.

    >>> ext = tldextract.extract('http://forums.bbc.co.uk')
    >>> ext.domain
    'bbc'
    >>> '.'.join(ext[:2]) # rejoin subdomain and domain
    'forums.bbc'
"""
from __future__ import with_statement

try:
    import cPickle as pickle
except ImportError:
    import pickle

from contextlib import closing
import errno
from functools import wraps
import logging
from operator import itemgetter
import os
import re
import socket
import sys
import warnings

try:
    import pkg_resources
except ImportError:
    class pkg_resources(object):
        """Fake pkg_resources interface which falls back to getting resources
        inside `tldextract`'s directory.
        """
        @classmethod
        def resource_stream(cls, package, resource_name):
            moddir = os.path.dirname(__file__)
            path = os.path.join(moddir, resource_name)
            # The bundled snapshot is a pickle, so open it in binary mode to
            # match the stream the real pkg_resources would return.
            return open(path, 'rb')

try:
    string_types = basestring  # Python 2: accept both str and unicode URLs
except NameError:
    string_types = str  # Python 3

try:  # pragma: no cover
    # Python 2
    from urllib2 import urlopen
    from urlparse import scheme_chars
except ImportError:  # pragma: no cover
    # Python 3
    from urllib.request import urlopen
    from urllib.parse import scheme_chars
    unicode = str

LOG = logging.getLogger("tldextract")

CACHE_FILE_DEFAULT = os.path.join(os.path.dirname(__file__), '.tld_set')
CACHE_FILE = os.path.expanduser(os.environ.get("TLDEXTRACT_CACHE", CACHE_FILE_DEFAULT))

PUBLIC_SUFFIX_LIST_URLS = (
    'http://mxr.mozilla.org/mozilla-central/source/netwerk/dns/effective_tld_names.dat?raw=1',
    'https://raw.github.com/mozilla/gecko-dev/master/netwerk/dns/effective_tld_names.dat',
)

SCHEME_RE = re.compile(r'^([' + scheme_chars + ']+:)?//')
IP_RE = re.compile(r'^(([0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])\.){3}([0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])$')
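
# A quick illustration of these patterns (comments only, not executed):
# SCHEME_RE strips a leading scheme like 'http://' or a scheme-relative '//';
# IP_RE matches a dotted-quad IPv4 address with each octet capped at 255.
#
#   SCHEME_RE.sub('', 'http://example.com/a')  ->  'example.com/a'
#   bool(IP_RE.match('127.0.0.1'))             ->  True
#   bool(IP_RE.match('256.1.1.1'))             ->  False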


class ExtractResult(tuple):
    'ExtractResult(subdomain, domain, suffix)'

    __slots__ = ()

    _fields = ('subdomain', 'domain', 'suffix')

    def __new__(_cls, subdomain, domain, suffix):
        'Create new instance of ExtractResult(subdomain, domain, suffix)'
        return tuple.__new__(_cls, (subdomain, domain, suffix))

    @classmethod
    def _make(cls, iterable, new=tuple.__new__, len=len):
        'Make a new ExtractResult object from a sequence or iterable'
        result = new(cls, iterable)
        if len(result) != 3:
            raise TypeError('Expected 3 arguments, got %d' % len(result))
        return result

    def __repr__(self):
        'Return a nicely formatted representation string'
        return 'ExtractResult(subdomain=%r, domain=%r, suffix=%r)' % self

    def _asdict(self):
        'Return a new dict which maps field names to their values'
        # list() is required under Python 3, where zip returns an iterator
        # that cannot be concatenated with a list.
        zipped = list(zip(self._fields, self)) + [('tld', self.tld)]
        return dict(zipped)

    def _replace(_self, **kwds):
        'Return a new ExtractResult object replacing specified fields with new values'
        result = _self._make(map(kwds.pop, ('subdomain', 'domain', 'suffix'), _self))
        if kwds:
            raise ValueError('Got unexpected field names: %r' % kwds.keys())
        return result

    def __getnewargs__(self):
        'Return self as a plain tuple. Used by copy and pickle.'
        return tuple(self)

    subdomain = property(itemgetter(0), doc='Alias for field number 0')
    domain = property(itemgetter(1), doc='Alias for field number 1')
    suffix = property(itemgetter(2), doc='Alias for field number 2')

    @property
    def tld(self):
        warnings.warn('This use of tld is misleading. Use `suffix` instead.', DeprecationWarning)
        return self.suffix

    @property
    def registered_domain(self):
        """
        Joins the domain and suffix fields with a dot, if they're both set.

        >>> extract('http://forums.bbc.co.uk').registered_domain
        'bbc.co.uk'
        >>> extract('http://localhost:8080').registered_domain
        ''
        """
        if self.domain and self.suffix:
            return self.domain + '.' + self.suffix
        return ''
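
# Example of working with an ExtractResult (comments only; values assume
# standard Public Suffix List data):
#
#   ext = extract('http://forums.bbc.co.uk')
#   (ext.subdomain, ext.domain, ext.suffix)  ->  ('forums', 'bbc', 'co.uk')
#   ext.registered_domain                    ->  'bbc.co.uk'
#   ext._asdict()['suffix']                  ->  'co.uk'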


class TLDExtract(object):
    def __init__(self, cache_file=CACHE_FILE, suffix_list_url=PUBLIC_SUFFIX_LIST_URLS,
                 fetch=True, fallback_to_snapshot=True):
        """
        Constructs a callable for extracting subdomain, domain, and suffix
        components from a URL.

        Upon calling it, it first checks for a Python-pickled `cache_file`.
        By default, the `cache_file` lives in the tldextract directory. You
        can disable caching entirely by setting `cache_file` to False.

        If the `cache_file` does not exist (such as on the first run), a live
        HTTP request will be made to obtain the data at the `suffix_list_url`
        -- unless `suffix_list_url` evaluates to `False`. You can therefore
        disable the HTTP request by setting this argument to `False` or
        `None`, e.g. `suffix_list_url=None`.

        `suffix_list_url` may be a single URL string or a sequence of URL
        strings, tried in order. The default URLs point to the latest version
        of the Mozilla Public Suffix List, but any similarly formatted
        document can be specified. Local files can be specified with the
        `file://` protocol. (See the `urllib2` documentation.)

        If there is no `cache_file` loaded and no data is found at the
        `suffix_list_url`, the module will fall back to the included TLD set
        snapshot. If you do not want this behavior, set `fallback_to_snapshot`
        to False, and an exception will be raised instead.
        """
        if not fetch:
            LOG.warning("The 'fetch' argument is deprecated. Instead of specifying fetch, "
                        "you should specify suffix_list_url. The equivalent of fetch=False "
                        "would be suffix_list_url=None.")

        self.suffix_list_urls = ()
        if suffix_list_url and fetch:
            if isinstance(suffix_list_url, string_types):
                self.suffix_list_urls = (suffix_list_url,)
            else:
                # suffix_list_url can be a sequence of URL strings, tried in
                # order until one fetch succeeds.
                self.suffix_list_urls = suffix_list_url
        self.suffix_list_urls = tuple(url.strip() for url in self.suffix_list_urls if url.strip())

        self.cache_file = os.path.expanduser(cache_file or '')
        self.fallback_to_snapshot = fallback_to_snapshot
        if not (self.suffix_list_urls or self.cache_file or self.fallback_to_snapshot):
            raise ValueError("The arguments you have provided disable all ways for tldextract "
                             "to obtain data. Please provide suffix list data, a cache_file, "
                             "or set `fallback_to_snapshot` to `True`.")

        self._extractor = None
    def __call__(self, url):
        """
        Takes a string URL and splits it into its subdomain, domain, and
        suffix (effective TLD, gTLD, ccTLD, etc.) components.

        >>> extract = TLDExtract()
        >>> extract('http://forums.news.cnn.com/')
        ExtractResult(subdomain='forums.news', domain='cnn', suffix='com')
        >>> extract('http://forums.bbc.co.uk/')
        ExtractResult(subdomain='forums', domain='bbc', suffix='co.uk')
        """
        netloc = SCHEME_RE.sub("", url) \
            .partition("/")[0] \
            .partition("?")[0] \
            .partition("#")[0] \
            .split("@")[-1] \
            .partition(":")[0] \
            .rstrip(".")

        registered_domain, tld = self._get_tld_extractor().extract(netloc)
        if not tld and netloc and netloc[0].isdigit():
            try:
                # inet_aton raises socket.error if netloc is not a valid
                # IPv4 address.
                socket.inet_aton(netloc)
                return ExtractResult('', netloc, '')
            except AttributeError:
                # Some platforms lack inet_aton; fall back to a regex check.
                if IP_RE.match(netloc):
                    return ExtractResult('', netloc, '')
            except socket.error:
                pass

        subdomain, _, domain = registered_domain.rpartition('.')
        return ExtractResult(subdomain, domain, tld)
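
    # Worked example of the netloc-stripping chain above (comments only): for
    # a URL like 'https://user:pass@forums.news.cnn.com:8080/path?q=1#frag',
    # the chained operations strip the scheme, path, query, fragment,
    # credentials, and port in turn, leaving 'forums.news.cnn.com', which the
    # suffix list then splits into ('forums.news', 'cnn', 'com').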

    def update(self, fetch_now=False):
        if os.path.exists(self.cache_file):
            os.unlink(self.cache_file)
        self._extractor = None
        if fetch_now:
            self._get_tld_extractor()

    def _get_tld_extractor(self):
        if self._extractor:
            return self._extractor

        if self.cache_file:
            try:
                # The cache is written with pickle in binary mode below, so
                # read it back in binary mode too.
                with open(self.cache_file, 'rb') as f:
                    self._extractor = _PublicSuffixListTLDExtractor(pickle.load(f))
                    return self._extractor
            except IOError as ioe:
                file_not_found = ioe.errno == errno.ENOENT
                if not file_not_found:
                    LOG.error("error reading TLD cache file %s: %s", self.cache_file, ioe)
            except Exception as ex:
                LOG.error("error reading TLD cache file %s: %s", self.cache_file, ex)

        tlds = frozenset()
        if self.suffix_list_urls:
            raw_suffix_list_data = fetch_file(self.suffix_list_urls)
            tlds = get_tlds_from_raw_suffix_list_data(raw_suffix_list_data)

        if not tlds:
            if self.fallback_to_snapshot:
                with closing(pkg_resources.resource_stream(__name__, '.tld_set_snapshot')) as snapshot_file:
                    self._extractor = _PublicSuffixListTLDExtractor(pickle.load(snapshot_file))
                    return self._extractor
            else:
                raise Exception("tlds is empty, but fallback_to_snapshot is set"
                                " to False. Cannot proceed without tlds.")

        LOG.info("computed TLDs: [%s, ...]", ', '.join(list(tlds)[:10]))
        if LOG.isEnabledFor(logging.DEBUG):
            import difflib
            with closing(pkg_resources.resource_stream(__name__, '.tld_set_snapshot')) as snapshot_file:
                snapshot = sorted(pickle.load(snapshot_file))
            new = sorted(tlds)
            for line in difflib.unified_diff(snapshot, new, fromfile=".tld_set_snapshot", tofile=self.cache_file):
                if sys.version_info < (3,):
                    sys.stderr.write(line.encode('utf-8') + "\n")
                else:
                    sys.stderr.write(line + "\n")

        if self.cache_file:
            try:
                with open(self.cache_file, 'wb') as f:
                    pickle.dump(tlds, f)
            except IOError as e:
                LOG.warning("unable to cache TLDs in file %s: %s", self.cache_file, e)

        self._extractor = _PublicSuffixListTLDExtractor(tlds)
        return self._extractor
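
# A minimal sketch of customizing the extractor (comments only; the cache
# path below is hypothetical):
#
#   no_fetch_extract = TLDExtract(suffix_list_url=None)  # never hits the network
#   cached_extract = TLDExtract(cache_file='/tmp/my_tld_cache')  # custom cache
#   no_fetch_extract('http://www.google.com')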

TLD_EXTRACTOR = TLDExtract()


@wraps(TLD_EXTRACTOR.__call__)
def extract(url):
    return TLD_EXTRACTOR(url)


@wraps(TLD_EXTRACTOR.update)
def update(*args, **kwargs):
    return TLD_EXTRACTOR.update(*args, **kwargs)


def get_tlds_from_raw_suffix_list_data(suffix_list_source):
    tld_finder = re.compile(r'^(?P<tld>[.*!]*\w[\S]*)', re.UNICODE | re.MULTILINE)
    tld_iter = (m.group('tld') for m in tld_finder.finditer(suffix_list_source))
    return frozenset(tld_iter)
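
# For reference (comments only): raw Public Suffix List data consists of
# comment lines starting with '//', blank lines, and rule lines. The regex
# above keeps the first whitespace-delimited token of each rule line; comment
# and blank lines never match because a match must start with '.', '*', '!',
# or a word character.
#
#   '// this is a comment'  ->  skipped
#   'com'                   ->  'com'
#   '*.ck'                  ->  '*.ck'    (wildcard rule)
#   '!www.ck'               ->  '!www.ck' (exception rule)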


def fetch_file(urls):
    """Decode the first successfully fetched URL, from UTF-8 encoding to
    Python unicode.
    """
    for url in urls:
        try:
            with closing(urlopen(url)) as conn:
                s = conn.read()
        except Exception as e:
            LOG.error('Exception reading Public Suffix List url %s: %s', url, e)
        else:
            return _decode_utf8(s)

    LOG.error('No Public Suffix List found. Consider using a mirror, or construct '
              'your TLDExtract with `suffix_list_url=None`.')
    return u''


def _decode_utf8(s):
    """Decode from utf8 to Python unicode string.

    The suffix list, wherever its origin, should be UTF-8 encoded.
    """
    return unicode(s, 'utf-8')


class _PublicSuffixListTLDExtractor(object):
    def __init__(self, tlds):
        self.tlds = tlds

    def extract(self, netloc):
        spl = netloc.split('.')
        lower_spl = tuple(el.lower() for el in spl)
        for i in range(len(spl)):
            maybe_tld = '.'.join(lower_spl[i:])

            # An exception rule (e.g. '!metro.tokyo.jp') means the label to
            # its left belongs to the registered domain, not the suffix.
            exception_tld = '!' + maybe_tld
            if exception_tld in self.tlds:
                return '.'.join(spl[:i + 1]), '.'.join(spl[i + 1:])

            if maybe_tld in self.tlds:
                return '.'.join(spl[:i]), '.'.join(spl[i:])

            # A wildcard rule (e.g. '*.ck') matches any single label in the
            # wildcard's position.
            wildcard_tld = '*.' + '.'.join(lower_spl[i + 1:])
            if wildcard_tld in self.tlds:
                return '.'.join(spl[:i]), '.'.join(spl[i:])

        return netloc, ''
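

if __name__ == '__main__':
    # A small smoke-test sketch, not part of the original public interface.
    # Expected outputs (in the trailing comments) assume current Public
    # Suffix List data.
    for test_url in (
        'http://forums.news.cnn.com/',      # ('forums.news', 'cnn', 'com')
        'http://forums.bbc.co.uk/',         # ('forums', 'bbc', 'co.uk')
        'http://127.0.0.1:8080/deployed/',  # ('', '127.0.0.1', '')
    ):
        print(repr(extract(test_url)))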