/
lxmlhtml.py
112 lines (92 loc) · 4.24 KB
/
lxmlhtml.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
"""
Link extractor based on lxml.html
"""
import six
from six.moves.urllib.parse import urlparse, urljoin
import lxml.etree as etree
from scrapy.link import Link
from scrapy.utils.misc import arg_to_iter, rel_has_nofollow
from scrapy.utils.python import unique as unique_list, to_native_str
from scrapy.linkextractors import FilteringLinkExtractor
from scrapy.utils.response import get_base_url
# from lxml/src/lxml/html/__init__.py
XHTML_NAMESPACE = "http://www.w3.org/1999/xhtml"
_collect_string_content = etree.XPath("string()")
def _nons(tag):
if isinstance(tag, six.string_types):
if tag[0] == '{' and tag[1:len(XHTML_NAMESPACE)+1] == XHTML_NAMESPACE:
return tag.split('}')[-1]
return tag
class LxmlParserLinkExtractor(object):
def __init__(self, tag="a", attr="href", process=None, unique=False):
self.scan_tag = tag if callable(tag) else lambda t: t == tag
self.scan_attr = attr if callable(attr) else lambda a: a == attr
self.process_attr = process if callable(process) else lambda v: v
self.unique = unique
def _iter_links(self, document):
for el in document.iter(etree.Element):
if not self.scan_tag(_nons(el.tag)):
continue
attribs = el.attrib
for attrib in attribs:
if not self.scan_attr(attrib):
continue
yield (el, attrib, attribs[attrib])
def _extract_links(self, selector, response_url, response_encoding, base_url):
links = []
# hacky way to get the underlying lxml parsed document
for el, attr, attr_val in self._iter_links(selector.root):
# pseudo lxml.html.HtmlElement.make_links_absolute(base_url)
try:
attr_val = urljoin(base_url, attr_val)
except ValueError:
continue # skipping bogus links
else:
url = self.process_attr(attr_val)
if url is None:
continue
url = to_native_str(url, encoding=response_encoding)
# to fix relative links after process_value
url = urljoin(response_url, url)
link = Link(url, _collect_string_content(el) or u'',
nofollow=rel_has_nofollow(el.get('rel')))
links.append(link)
return self._deduplicate_if_needed(links)
def extract_links(self, response):
base_url = get_base_url(response)
return self._extract_links(response.selector, response.url, response.encoding, base_url)
def _process_links(self, links):
""" Normalize and filter extracted links
The subclass should override it if neccessary
"""
return self._deduplicate_if_needed(links)
def _deduplicate_if_needed(self, links):
if self.unique:
return unique_list(links, key=lambda link: link.url)
return links
class LxmlLinkExtractor(FilteringLinkExtractor):
def __init__(self, allow=(), deny=(), allow_domains=(), deny_domains=(), restrict_xpaths=(),
tags=('a', 'area'), attrs=('href',), canonicalize=True,
unique=True, process_value=None, deny_extensions=None, restrict_css=()):
tags, attrs = set(arg_to_iter(tags)), set(arg_to_iter(attrs))
tag_func = lambda x: x in tags
attr_func = lambda x: x in attrs
lx = LxmlParserLinkExtractor(tag=tag_func, attr=attr_func,
unique=unique, process=process_value)
super(LxmlLinkExtractor, self).__init__(lx, allow=allow, deny=deny,
allow_domains=allow_domains, deny_domains=deny_domains,
restrict_xpaths=restrict_xpaths, restrict_css=restrict_css,
canonicalize=canonicalize, deny_extensions=deny_extensions)
def extract_links(self, response):
base_url = get_base_url(response)
if self.restrict_xpaths:
docs = [subdoc
for x in self.restrict_xpaths
for subdoc in response.xpath(x)]
else:
docs = [response.selector]
all_links = []
for doc in docs:
links = self._extract_links(doc, response.url, response.encoding, base_url)
all_links.extend(self._process_links(links))
return unique_list(all_links)