/
cookies.py
199 lines (150 loc) · 5.87 KB
/
cookies.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
import re
import time
from http.cookiejar import Cookie
from http.cookiejar import CookieJar as _CookieJar
from http.cookiejar import DefaultCookiePolicy
from typing import Sequence
from scrapy import Request
from scrapy.http import Response
from scrapy.utils.httpobj import urlparse_cached
from scrapy.utils.python import to_unicode
# Defined in the http.cookiejar module, but undocumented:
# https://github.com/python/cpython/blob/v3.9.0/Lib/http/cookiejar.py#L527
# Heuristic: a hostname whose last label is all digits (e.g. "127.0.0.1")
# is treated as an IPv4 address, so no domain-suffix matching applies.
IPV4_RE = re.compile(r"\.\d+$", re.ASCII)
class CookieJar:
    """Wrapper around :class:`http.cookiejar.CookieJar` that speaks Scrapy's
    ``Request``/``Response`` objects.

    The main difference from the stdlib jar is ``add_cookie_header``: instead
    of scanning every stored cookie domain, it only looks at domains that
    could possibly match the request host.
    """

    def __init__(self, policy=None, check_expired_frequency=10000):
        # Use the stdlib default policy when none is supplied.
        self.policy = policy or DefaultCookiePolicy()
        self.jar = _CookieJar(self.policy)
        # Replace the jar's real lock with a no-op one; each jar is used
        # from a single thread, so locking is pure overhead.
        self.jar._cookies_lock = _DummyLock()
        self.check_expired_frequency = check_expired_frequency
        self.processed = 0

    def extract_cookies(self, response, request):
        """Store cookies set by *response* as if *request* had fetched it."""
        return self.jar.extract_cookies(
            WrappedResponse(response), WrappedRequest(request)
        )

    def add_cookie_header(self, request: Request) -> None:
        """Add a ``Cookie`` header to *request* from the stored cookies.

        Does nothing when the request has no hostname, when no cookie
        matches, or when the request already carries a ``Cookie`` header.
        """
        wreq = WrappedRequest(request)
        now = int(time.time())
        self.policy._now = now
        self.jar._now = now

        req_host = urlparse_cached(request).hostname
        if not req_host:
            return

        # The stdlib cookiejar iterates through all stored domains; we
        # instead restrict the lookup to domains that could match the host.
        if IPV4_RE.search(req_host):
            # IP addresses only ever match themselves exactly.
            hosts = [req_host]
        else:
            hosts = potential_domain_matches(req_host)
            if "." not in req_host:
                # Bare hostnames are stored with a ".local" suffix.
                hosts.append(req_host + ".local")

        cookies = []
        for host in hosts:
            if host in self.jar._cookies:
                cookies.extend(self.jar._cookies_for_domain(host, wreq))

        attrs = self.jar._cookie_attrs(cookies)
        if attrs and not wreq.has_header("Cookie"):
            wreq.add_unredirected_header("Cookie", "; ".join(attrs))

        self.processed += 1
        if self.processed % self.check_expired_frequency == 0:
            # Still O(total cookies), hence only done periodically.
            self.jar.clear_expired_cookies()

    @property
    def _cookies(self):
        # Expose the underlying jar's domain -> path -> name mapping.
        return self.jar._cookies

    def clear_session_cookies(self, *args, **kwargs):
        return self.jar.clear_session_cookies(*args, **kwargs)

    def clear(self, domain=None, path=None, name=None):
        return self.jar.clear(domain, path, name)

    def __iter__(self):
        return iter(self.jar)

    def __len__(self):
        return len(self.jar)

    def set_policy(self, pol):
        return self.jar.set_policy(pol)

    def make_cookies(self, response: Response, request: Request) -> Sequence[Cookie]:
        """Return the cookies *response* would set, without storing them."""
        return self.jar.make_cookies(
            WrappedResponse(response), WrappedRequest(request)
        )

    def set_cookie(self, cookie):
        self.jar.set_cookie(cookie)

    def set_cookie_if_ok(self, cookie: Cookie, request: Request) -> None:
        """Store *cookie* only if the policy allows it for *request*."""
        self.jar.set_cookie_if_ok(cookie, WrappedRequest(request))
def potential_domain_matches(domain):
    """Return every domain string a stored cookie could use to match *domain*.

    The result contains the host itself, each parent domain down to (but not
    including) the public-suffix-like final label, and a dot-prefixed
    variant of each of those.

    >>> potential_domain_matches('www.example.com')
    ['www.example.com', 'example.com', '.www.example.com', '.example.com']
    """
    matches = [domain]
    try:
        pos = domain.index(".") + 1
        last_dot = domain.rindex(".")
        while pos < last_dot:
            matches.append(domain[pos:])
            pos = domain.index(".", pos) + 1
    except ValueError:
        # Domain contains no dot at all: nothing more to strip.
        pass
    return matches + ["." + match for match in matches]
class _DummyLock:
def acquire(self):
pass
def release(self):
pass
class WrappedRequest:
    """Wraps a scrapy Request class with methods defined by urllib2.Request class to interact with CookieJar class

    see http://docs.python.org/library/urllib2.html#urllib2.Request
    """

    def __init__(self, request):
        self.request = request

    def get_full_url(self):
        return self.request.url

    def get_host(self):
        return urlparse_cached(self.request).netloc

    def get_type(self):
        return urlparse_cached(self.request).scheme

    def is_unverifiable(self):
        """Unverifiable should indicate whether the request is unverifiable, as defined by RFC 2965.

        It defaults to False. An unverifiable request is one whose URL the user did not have the
        option to approve. For example, if the request is for an image in an
        HTML document, and the user had no option to approve the automatic
        fetching of the image, this should be true.
        """
        return self.request.meta.get("is_unverifiable", False)

    # http.cookiejar accesses these as attributes; delegate to the
    # corresponding getter methods above.
    @property
    def full_url(self):
        return self.get_full_url()

    @property
    def host(self):
        return self.get_host()

    @property
    def type(self):
        return self.get_type()

    @property
    def unverifiable(self):
        return self.is_unverifiable()

    @property
    def origin_req_host(self):
        return urlparse_cached(self.request).hostname

    def has_header(self, name):
        return name in self.request.headers

    def get_header(self, name, default=None):
        # Missing header with a None default yields None (not a decoded
        # string); otherwise the raw bytes are decoded with replacement.
        raw = self.request.headers.get(name, default)
        if raw is None:
            return None
        return to_unicode(raw, errors="replace")

    def header_items(self):
        decoded = []
        for key, values in self.request.headers.items():
            decoded.append(
                (
                    to_unicode(key, errors="replace"),
                    [to_unicode(value, errors="replace") for value in values],
                )
            )
        return decoded

    def add_unredirected_header(self, name, value):
        self.request.headers.appendlist(name, value)
class WrappedResponse:
    """Wraps a scrapy Response so http.cookiejar can read its headers."""

    def __init__(self, response):
        self.response = response

    def info(self):
        # http.cookiejar calls response.info() to obtain the headers object;
        # this class plays both roles, so it returns itself.
        return self

    def get_all(self, name, default=None):
        # NOTE(review): ``default`` is accepted for interface compatibility
        # but never used — an empty list is returned when the header is
        # absent (getlist yields []).
        values = self.response.headers.getlist(name)
        return [to_unicode(value, errors="replace") for value in values]