-
-
Notifications
You must be signed in to change notification settings - Fork 29.9k
/
cookiejar.py
2113 lines (1777 loc) · 75 KB
/
cookiejar.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
r"""HTTP cookie handling for web clients.
This module has (now fairly distant) origins in Gisle Aas' Perl module
HTTP::Cookies, from the libwww-perl library.
Docstrings, comments and debug strings in this code refer to the
attributes of the HTTP cookie system as cookie-attributes, to distinguish
them clearly from Python attributes.
Class diagram (note that BSDDBCookieJar and the MSIE* classes are not
distributed with the Python standard library, but are available from
http://wwwsearch.sf.net/):
CookieJar____
/ \ \
FileCookieJar \ \
/ | \ \ \
MozillaCookieJar | LWPCookieJar \ \
| | \
| ---MSIEBase | \
| / | | \
| / MSIEDBCookieJar BSDDBCookieJar
|/
MSIECookieJar
"""
__all__ = ['Cookie', 'CookieJar', 'CookiePolicy', 'DefaultCookiePolicy',
'FileCookieJar', 'LWPCookieJar', 'LoadError', 'MozillaCookieJar']
import os
import copy
import datetime
import re
import time
import urllib.parse, urllib.request
import threading as _threading
import http.client # only for the default HTTP port
from calendar import timegm
debug = False # set to True to enable debugging via the logging module
logger = None
def _debug(*args):
if not debug:
return
global logger
if not logger:
import logging
logger = logging.getLogger("http.cookiejar")
return logger.debug(*args)
DEFAULT_HTTP_PORT = str(http.client.HTTP_PORT)
MISSING_FILENAME_TEXT = ("a filename was not supplied (nor was the CookieJar "
"instance initialised with one)")
def _warn_unhandled_exception():
# There are a few catch-all except: statements in this module, for
# catching input that's bad in unexpected ways. Warn if any
# exceptions are caught there.
import io, warnings, traceback
f = io.StringIO()
traceback.print_exc(None, f)
msg = f.getvalue()
warnings.warn("http.cookiejar bug!\n%s" % msg, stacklevel=2)
# Date/time conversion
# -----------------------------------------------------------------------------
EPOCH_YEAR = 1970
def _timegm(tt):
year, month, mday, hour, min, sec = tt[:6]
if ((year >= EPOCH_YEAR) and (1 <= month <= 12) and (1 <= mday <= 31) and
(0 <= hour <= 24) and (0 <= min <= 59) and (0 <= sec <= 61)):
return timegm(tt)
else:
return None
# Abbreviated English weekday names, indexed like datetime.weekday()
# (Monday == 0).
DAYS = ["Mon", "Tue", "Wed", "Thu", "Fri", "Sat", "Sun"]
# Abbreviated English month names; index + 1 is the month number.
MONTHS = ["Jan", "Feb", "Mar", "Apr", "May", "Jun",
          "Jul", "Aug", "Sep", "Oct", "Nov", "Dec"]
# Lower-cased copy of MONTHS used for case-insensitive month lookups.  Built
# with a comprehension so no loop variable is left behind in the module
# namespace.
MONTHS_LOWER = [name.lower() for name in MONTHS]
def time2isoz(t=None):
    """Return a string representing time in seconds since epoch, t.

    If the function is called without an argument, it will use the current
    time.

    The format of the returned string is like "YYYY-MM-DD hh:mm:ssZ",
    representing Universal Time (UTC, aka GMT).  An example of this format is:

    1994-11-24 08:49:37Z

    """
    # Use timezone-aware UTC datetimes: datetime.utcnow() and
    # datetime.utcfromtimestamp() are deprecated since Python 3.12.
    utc = datetime.timezone.utc
    if t is None:
        dt = datetime.datetime.now(tz=utc)
    else:
        dt = datetime.datetime.fromtimestamp(t, tz=utc)
    return "%04d-%02d-%02d %02d:%02d:%02dZ" % (
        dt.year, dt.month, dt.day, dt.hour, dt.minute, dt.second)
def time2netscape(t=None):
    """Return a string representing time in seconds since epoch, t.

    If the function is called without an argument, it will use the current
    time.

    The format of the returned string is like this:

    Wed, DD-Mon-YYYY HH:MM:SS GMT

    """
    # Use timezone-aware UTC datetimes: datetime.utcnow() and
    # datetime.utcfromtimestamp() are deprecated since Python 3.12.
    utc = datetime.timezone.utc
    if t is None:
        dt = datetime.datetime.now(tz=utc)
    else:
        dt = datetime.datetime.fromtimestamp(t, tz=utc)
    return "%s, %02d-%s-%04d %02d:%02d:%02d GMT" % (
        DAYS[dt.weekday()], dt.day, MONTHS[dt.month-1],
        dt.year, dt.hour, dt.minute, dt.second)
UTC_ZONES = {"GMT": None, "UTC": None, "UT": None, "Z": None}

TIMEZONE_RE = re.compile(r"^([-+])?(\d\d?):?(\d\d)?$", re.ASCII)


def offset_from_tz_string(tz):
    """Return the UTC offset, in seconds, described by the string *tz*.

    Names listed in UTC_ZONES give 0.  Numeric zones of the form
    [+-]H[H][:][MM] give the signed number of seconds.  Anything else
    gives None.
    """
    if tz in UTC_ZONES:
        return 0
    parsed = TIMEZONE_RE.search(tz)
    if parsed is None:
        return None
    sign, hours, minutes = parsed.groups()
    seconds = 3600 * int(hours)
    if minutes:
        seconds += 60 * int(minutes)
    return -seconds if sign == '-' else seconds
def _str2time(day, mon, yr, hr, min, sec, tz):
yr = int(yr)
if yr > datetime.MAXYEAR:
return None
# translate month name to number
# month numbers start with 1 (January)
try:
mon = MONTHS_LOWER.index(mon.lower())+1
except ValueError:
# maybe it's already a number
try:
imon = int(mon)
except ValueError:
return None
if 1 <= imon <= 12:
mon = imon
else:
return None
# make sure clock elements are defined
if hr is None: hr = 0
if min is None: min = 0
if sec is None: sec = 0
day = int(day)
hr = int(hr)
min = int(min)
sec = int(sec)
if yr < 1000:
# find "obvious" year
cur_yr = time.localtime(time.time())[0]
m = cur_yr % 100
tmp = yr
yr = yr + cur_yr - m
m = m - tmp
if abs(m) > 50:
if m > 0: yr = yr + 100
else: yr = yr - 100
# convert UTC time tuple to seconds since epoch (not timezone-adjusted)
t = _timegm((yr, mon, day, hr, min, sec, tz))
if t is not None:
# adjust time using timezone string, to get absolute time since epoch
if tz is None:
tz = "UTC"
tz = tz.upper()
offset = offset_from_tz_string(tz)
if offset is None:
return None
t = t - offset
return t
STRICT_DATE_RE = re.compile(
    r"^[SMTWF][a-z][a-z], (\d\d) ([JFMASOND][a-z][a-z]) "
    r"(\d\d\d\d) (\d\d):(\d\d):(\d\d) GMT$", re.ASCII)
WEEKDAY_RE = re.compile(
    r"^(?:Sun|Mon|Tue|Wed|Thu|Fri|Sat)[a-z]*,?\s*", re.I | re.ASCII)
LOOSE_HTTP_DATE_RE = re.compile(
    r"""^
(\d\d?) # day
(?:\s+|[-\/])
(\w+) # month
(?:\s+|[-\/])
(\d+) # year
(?:
(?:\s+|:) # separator before clock
(\d\d?):(\d\d) # hour:min
(?::(\d\d))? # optional seconds
)? # optional clock
\s*
(?:
([-+]?\d{2,4}|(?![APap][Mm]\b)[A-Za-z]+) # timezone
\s*
)?
(?:
\(\w+\) # ASCII representation of timezone in parens.
\s*
)?$""", re.X | re.ASCII)


def http2time(text):
    """Returns time in seconds since epoch of time represented by a string.

    Return value is an integer.

    None is returned if the format of text is unrecognized, the time is
    outside the representable range, or the timezone string is not
    recognized.  If the string contains no timezone, UTC is assumed.

    The timezone in the string may be numerical (like "-0800" or "+0100") or a
    string timezone (like "UTC", "GMT", "BST" or "EST").  Currently, only the
    timezone strings equivalent to UTC (zero offset) are known to the function.

    The function loosely parses the following formats:

    Wed, 09 Feb 1994 22:23:32 GMT       -- HTTP format
    Tuesday, 08-Feb-94 14:15:29 GMT     -- old rfc850 HTTP format
    Tuesday, 08-Feb-1994 14:15:29 GMT   -- broken rfc850 HTTP format
    09 Feb 1994 22:23:32 GMT            -- HTTP format (no weekday)
    08-Feb-94 14:15:29 GMT              -- rfc850 format (no weekday)
    08-Feb-1994 14:15:29 GMT            -- broken rfc850 format (no weekday)

    The parser ignores leading and trailing whitespace.  The time may be
    absent.

    If the year is given with only 2 digits, the function will select the
    century that makes the year closest to the current date.

    """
    # fast exit for strictly conforming string
    strict = STRICT_DATE_RE.search(text)
    if strict:
        mday, mon_name, year, hour, minute, second = strict.groups()
        try:
            mon = MONTHS_LOWER.index(mon_name.lower()) + 1
        except ValueError:
            # The strict pattern only checks the *shape* of the month
            # ("Foo" passes); an unknown name is a bad date, not a crash.
            return None
        # int(), not float(), for the seconds so the result is an integer
        # as documented and as produced by the loose path below.
        return _timegm((int(year), mon, int(mday),
                        int(hour), int(minute), int(second)))

    # No, we need some messy parsing...

    # clean up
    text = text.lstrip()
    text = WEEKDAY_RE.sub("", text, 1)  # Useless weekday

    # loose regexp parse
    loose = LOOSE_HTTP_DATE_RE.search(text)
    if loose is None:
        return None  # bad format

    # tz is time zone specifier string
    day, mon, yr, hr, min, sec, tz = loose.groups()
    return _str2time(day, mon, yr, hr, min, sec, tz)
ISO_DATE_RE = re.compile(
r"""^
(\d{4}) # year
[-\/]?
(\d\d?) # numerical month
[-\/]?
(\d\d?) # day
(?:
(?:\s+|[-:Tt]) # separator before clock
(\d\d?):?(\d\d) # hour:min
(?::?(\d\d(?:\.\d*)?))? # optional seconds (and fractional)
)? # optional clock
\s*
(?:
([-+]?\d\d?:?(:?\d\d)?
|Z|z) # timezone (Z is "zero meridian", i.e. GMT)
\s*
)?$""", re.X | re. ASCII)
def iso2time(text):
"""
As for http2time, but parses the ISO 8601 formats:
1994-02-03 14:15:29 -0100 -- ISO 8601 format
1994-02-03 14:15:29 -- zone is optional
1994-02-03 -- only date
1994-02-03T14:15:29 -- Use T as separator
19940203T141529Z -- ISO 8601 compact format
19940203 -- only date
"""
# clean up
text = text.lstrip()
# tz is time zone specifier string
day, mon, yr, hr, min, sec, tz = [None]*7
# loose regexp parse
m = ISO_DATE_RE.search(text)
if m is not None:
# XXX there's an extra bit of the timezone I'm ignoring here: is
# this the right thing to do?
yr, mon, day, hr, min, sec, tz, _ = m.groups()
else:
return None # bad format
return _str2time(day, mon, yr, hr, min, sec, tz)
# Header parsing
# -----------------------------------------------------------------------------
def unmatched(match):
    """Return the searched string with the matched span removed.

    *match* is an re.Match object; the text before and after its overall
    span is concatenated and returned.
    """
    begin, finish = match.span(0)
    subject = match.string
    return subject[:begin] + subject[finish:]
HEADER_TOKEN_RE = re.compile(r"^\s*([^=\s;,]+)")
HEADER_QUOTED_VALUE_RE = re.compile(r"^\s*=\s*\"([^\"\\]*(?:\\.[^\"\\]*)*)\"")
HEADER_VALUE_RE = re.compile(r"^\s*=\s*([^\s;,]*)")
HEADER_ESCAPE_RE = re.compile(r"\\(.)")


def split_header_words(header_values):
    r"""Split header values into lists of (key, value) pairs.

    *header_values* is a sequence of header strings (not a single string).
    ",", ";" and "=" are understood, as are quoted values following "=".
    Space-separated tokens are handled as though ";" separated them, and
    several header values are handled as though they had been one value
    joined with ",".

    That makes the function suitable for header fields with this syntax
    (BNF as from the HTTP/1.1 specification, but we relax the requirement
    for tokens).

      headers           = #header
      header            = (token | parameter) *( [";"] (token | parameter))

      token             = 1*<any CHAR except CTLs or separators>
      separators        = "(" | ")" | "<" | ">" | "@"
                        | "," | ";" | ":" | "\" | <">
                        | "/" | "[" | "]" | "?" | "="
                        | "{" | "}" | SP | HT

      quoted-string     = ( <"> *(qdtext | quoted-pair ) <"> )
      qdtext            = <any TEXT except <">>
      quoted-pair       = "\" CHAR

      parameter         = attribute "=" value
      attribute         = token
      value             = token | quoted-string

    The result holds one list of pairs per header.  A bare token (one that
    is not part of a parameter) gets the value None.  Syntactically
    incorrect headers will not necessarily be parsed as you would want.

    Examples:

    >>> split_header_words(['foo="bar"; port="80,81"; discard, bar=baz'])
    [[('foo', 'bar'), ('port', '80,81'), ('discard', None)], [('bar', 'baz')]]
    >>> split_header_words(['text/html; charset="iso-8859-1"'])
    [[('text/html', None), ('charset', 'iso-8859-1')]]
    >>> split_header_words([r'Basic realm="\"foo\bar\""'])
    [[('Basic', None), ('realm', '"foobar"')]]

    """
    assert not isinstance(header_values, str)

    def consume(found):
        # What is left of the searched text once the matched span is cut out.
        lo, hi = found.span(0)
        return found.string[:lo] + found.string[hi:]

    parsed = []
    for header in header_values:
        remaining = header
        group = []
        while remaining:
            token = HEADER_TOKEN_RE.search(remaining)
            if token:
                remaining = consume(token)
                key = token.group(1)
                quoted = HEADER_QUOTED_VALUE_RE.search(remaining)
                if quoted:
                    # quoted value: strip the backslash escapes
                    remaining = consume(quoted)
                    val = HEADER_ESCAPE_RE.sub(r"\1", quoted.group(1))
                else:
                    bare = HEADER_VALUE_RE.search(remaining)
                    if bare:
                        # unquoted value
                        remaining = consume(bare)
                        val = bare.group(1).rstrip()
                    else:
                        # no value, a lone token
                        val = None
                group.append((key, val))
                continue

            stripped = remaining.lstrip()
            if stripped.startswith(","):
                # concatenated headers, as per RFC 2616 section 4.2
                remaining = stripped[1:]
                if group:
                    parsed.append(group)
                group = []
                continue

            # skip junk
            cleaned, junk_count = re.subn(r"^[=\s;]*", "", remaining)
            assert junk_count > 0, (
                "split_header_words bug: '%s', '%s', %s" %
                (header, remaining, group))
            remaining = cleaned
        if group:
            parsed.append(group)
    return parsed
HEADER_JOIN_ESCAPE_RE = re.compile(r"([\"\\])")


def join_header_words(lists):
    """Build one header value from lists of (key, value) pairs.

    This is (almost) the inverse of split_header_words.  Pairs within one
    list are joined with "; " and the lists themselves with ", ".  A value
    of None yields the bare key; any other value is appended after "=",
    quoted (with quotes and backslashes escaped) when it needs it.

    >>> join_header_words([[("text/plain", None), ("charset", "iso-8859-1")]])
    'text/plain; charset="iso-8859-1"'
    >>> join_header_words([[("text/plain", None)], [("charset", "iso-8859-1")]])
    'text/plain, charset="iso-8859-1"'

    """
    def render(key, value):
        if value is None:
            return key
        if not re.search(r"^\w+$", value):
            # escape " and \ before wrapping the value in quotes
            value = '"%s"' % HEADER_JOIN_ESCAPE_RE.sub(r"\\\1", value)
        return "%s=%s" % (key, value)

    headers = []
    for pairs in lists:
        words = [render(key, value) for key, value in pairs]
        if words:
            headers.append("; ".join(words))
    return ", ".join(headers)
def strip_quotes(text):
    """Return *text* without one leading and one trailing double quote.

    Each end is handled independently, so an unbalanced quote is removed
    too.
    """
    begin = 1 if text.startswith('"') else 0
    text = text[begin:]
    return text[:-1] if text.endswith('"') else text
def parse_ns_headers(ns_headers):
    """Ad-hoc parser for Netscape protocol cookie-attributes.

    The old Netscape Set-Cookie format may, for example, carry an unquoted
    "," inside the expires field, which split_header_words cannot cope
    with; hence this separate parser.

    Each header becomes a list of (key, value) pairs.  The first pair is
    the cookie's name and value, kept as given.  In later pairs the names
    of known cookie-attributes are lower-cased, a "version" value has its
    quotes stripped, and an "expires" value is converted to seconds since
    the epoch (None if it cannot be parsed).  A ("version", "0") pair is
    appended when the header did not carry a version of its own.  Headers
    whose first pair has an empty name are dropped.

    XXX This may not make the best possible effort to parse all the crap
    that Netscape Cookie headers contain.  Ronald Tschalar's HTTPClient
    parser is probably better, so could do worse than following that if
    this ever gives any trouble.

    Currently, this is also used for parsing RFC 2109 cookies.

    """
    known_attrs = ("expires", "domain", "path", "secure",
                   # RFC 2109 attrs (may turn up in Netscape cookies, too)
                   "version", "port", "max-age")

    parsed = []
    for header in ns_headers:
        attrs = []
        saw_version = False

        # XXX: The following does not strictly adhere to RFCs in that empty
        # names and values are legal (the former will only appear once and
        # will be overwritten if multiple occurrences are present). This is
        # mostly to deal with backwards compatibility.
        for index, chunk in enumerate(header.split(';')):
            name, equals, raw = chunk.strip().partition('=')
            name = name.strip()

            if not name:
                if index == 0:
                    # no cookie name at all: give up on this header
                    break
                continue

            # keep "present but empty" ('') distinct from "no '=' at all"
            # (None)
            value = raw.strip() if equals else None

            if index != 0:
                lowered = name.lower()
                if lowered in known_attrs:
                    name = lowered

                if name == "version":
                    # This is an RFC 2109 cookie.
                    if value is not None:
                        value = strip_quotes(value)
                    saw_version = True
                elif name == "expires" and value is not None:
                    # convert expires date to seconds since epoch
                    value = http2time(strip_quotes(value))  # None if invalid
            attrs.append((name, value))

        if attrs:
            if not saw_version:
                attrs.append(("version", "0"))
            parsed.append(attrs)

    return parsed
IPV4_RE = re.compile(r"\.\d+$", re.ASCII)


def is_HDN(text):
    """Return True if text is a host domain name."""
    # XXX
    # This may well be wrong.  Which RFC is HDN defined in, if any (for
    #  the purposes of RFC 2965)?
    # For the current implementation, what about IPv6?  Remember to look
    #  at other uses of IPV4_RE also, if change this.
    if IPV4_RE.search(text) or text == "":
        # ends in ".<digits>" (taken to be an IP address), or empty
        return False
    return not (text.startswith(".") or text.endswith("."))
def domain_match(A, B):
    """Return True if domain A domain-matches domain B, according to RFC 2965.

    A and B may be host domain names or IP addresses.

    RFC 2965, section 1:

    Host names can be specified either as an IP address or a HDN string.
    Sometimes we compare one host name with another.  (Such comparisons SHALL
    be case-insensitive.)  Host A's name domain-matches host B's if

         *  their host name strings string-compare equal; or

         * A is a HDN string and has the form NB, where N is a non-empty
            name string, B has the form .B', and B' is a HDN string.  (So,
            x.y.com domain-matches .Y.com but not Y.com.)

    Note that domain-match is not a commutative operation: a.b.c.com
    domain-matches .c.com, but not the reverse.

    """
    # Note that, if A or B are IP addresses, the only relevant part of the
    # definition of the domain-match algorithm is the direct string-compare.
    A = A.lower()
    B = B.lower()
    if A == B:
        return True
    # "A has the form NB" means A *ends with* B.  Merely containing B is not
    # enough: "www.acme.com.evil.org" must not domain-match ".acme.com".
    # Because A != B here, a successful endswith() also guarantees that N
    # is non-empty.
    if not B.startswith(".") or not A.endswith(B):
        return False
    return is_HDN(A) and is_HDN(B[1:])
def liberal_is_HDN(text):
    """Return True if text looks roughly like a host domain name.

    Used when accepting/blocking domains.  The only thing rejected is text
    that IPV4_RE recognises.
    """
    return IPV4_RE.search(text) is None
def user_domain_match(A, B):
    """Decide whether host A is covered by the user-supplied domain B.

    Used for blocking/accepting domains.  A and B may be host domain names
    or IP addresses; the comparison is case-insensitive.  If either one is
    not accepted by liberal_is_HDN, or B has no leading dot, only an exact
    match counts.  A B with a leading dot matches any A that ends with it.
    """
    host = A.lower()
    pattern = B.lower()
    if not liberal_is_HDN(host) or not liberal_is_HDN(pattern):
        # at least one is an IP address: equal or nothing
        return host == pattern
    if pattern.startswith("."):
        return host.endswith(pattern)
    return host == pattern
cut_port_re = re.compile(r":\d+$", re.ASCII)


def request_host(request):
    """Return request-host, as defined by RFC 2965.

    The host is taken from the request's full URL, falling back to its
    "Host" header when the URL has none.  Any trailing ":port" is removed.

    Variation from RFC: returned value is lowercased, for convenient
    comparison.

    """
    netloc = urllib.parse.urlparse(request.get_full_url()).netloc
    if netloc == "":
        netloc = request.get_header("Host", "")

    # remove port, if present
    return cut_port_re.sub("", netloc, 1).lower()
def eff_request_host(request):
    """Return a tuple (request-host, effective request-host name).

    As defined by RFC 2965, except both are lowercased.  The effective
    name is the request-host with ".local" appended when the host has no
    dot and is not recognised by IPV4_RE; otherwise both are the same.

    """
    host = request_host(request)
    needs_suffix = "." not in host and not IPV4_RE.search(host)
    return host, (host + ".local" if needs_suffix else host)
def request_path(request):
    """Path component of request-URI, as defined by RFC 2965.

    The path of the request's full URL is passed through escape_path and
    given a leading "/" if it lacks one.
    """
    raw_path = urllib.parse.urlsplit(request.get_full_url()).path
    escaped = escape_path(raw_path)
    # fix bad RFC 2396 absoluteURI
    return escaped if escaped.startswith("/") else "/" + escaped
def request_port(request):
    """Return the port of request.host as a string.

    Everything after the first ":" in the host is taken as the port.
    Without a ":" the default HTTP port is returned; a port that int()
    rejects is logged via _debug and None is returned.
    """
    _, colon, port = request.host.partition(':')
    if not colon:
        return DEFAULT_HTTP_PORT
    try:
        int(port)
    except ValueError:
        _debug("nonnumeric port: '%s'", port)
        return None
    return port
# Characters in addition to A-Z, a-z, 0-9, '_', '.', and '-' that don't
# need to be escaped to form a valid HTTP URL (RFCs 2396 and 1738).
HTTP_PATH_SAFE = "%/;:@&=+$,!~*'()"
ESCAPED_CHAR_RE = re.compile(r"%([0-9a-fA-F][0-9a-fA-F])")


def uppercase_escaped_char(match):
    """Return the %-escape captured by *match* with upper-case hex digits.

    Meant to be used as the replacement callable for ESCAPED_CHAR_RE.
    """
    return "%" + match.group(1).upper()
def escape_path(path):
    """Escape any invalid characters in HTTP URL, and uppercase all escapes."""
    # There's no knowing what character encoding was used to create URLs
    # containing %-escapes, but since we have to pick one to escape invalid
    # path characters, we pick UTF-8, as recommended in the HTML 4.0
    # specification:
    # http://www.w3.org/TR/REC-html40/appendix/notes.html#h-B.2.1
    # And here, kind of: draft-fielding-uri-rfc2396bis-03
    # (And in draft IRI specification: draft-duerst-iri-05)
    # (And here, for new URI schemes: RFC 2718)
    quoted = urllib.parse.quote(path, HTTP_PATH_SAFE)
    return ESCAPED_CHAR_RE.sub(uppercase_escaped_char, quoted)
def reach(h):
    """Return reach of host h, as defined by RFC 2965, section 1.

    The reach R of a host name H is defined as follows:

       *  If

          -  H is the host domain name of a host; and,

          -  H has the form A.B; and

          -  A has no embedded (that is, interior) dots; and

          -  B has at least one embedded dot, or B is the string "local".
             then the reach of H is .B.

       *  Otherwise, the reach of H is H.

    >>> reach("www.acme.com")
    '.acme.com'
    >>> reach("acme.com")
    'acme.com'
    >>> reach("acme.local")
    '.local'

    """
    # Split at the first dot: "A" is the part before it, "B" the rest.
    _, dot, b = h.partition(".")
    if dot and is_HDN(h) and ("." in b or b == "local"):
        return "." + b
    return h
def is_third_party(request):
    """Return True if *request* goes to a third-party host.

    RFC 2965, section 3.3.6:

        An unverifiable transaction is to a third-party host if its request-
        host U does not domain-match the reach R of the request-host O in the
        origin transaction.

    """
    origin_reach = reach(request.origin_req_host)
    return not domain_match(request_host(request), origin_reach)
class Cookie:
    """HTTP Cookie.

    One class covers both Netscape and RFC 2965 cookies.

    It is intentionally a plain attribute holder, and nothing stops you
    from building instances that break the cookie standards.  Cookie
    objects are normally produced by CookieJar.make_cookies, which parses
    the headers, fills in defaults and normalises values into the form
    stored here.  Whether a cookie may be accepted from, or returned to, a
    server is decided by CookiePolicy.

    Note that the port may be present in the headers, but unspecified ("Port"
    rather than"Port=80", for example); if this is the case, port is None.

    """

    def __init__(self, version, name, value,
                 port, port_specified,
                 domain, domain_specified, domain_initial_dot,
                 path, path_specified,
                 secure,
                 expires,
                 discard,
                 comment,
                 comment_url,
                 rest,
                 rfc2109=False,
                 ):
        """Store the cookie-attributes, coercing version and expires to int.

        Raises ValueError if port is None while port_specified is True.
        *rest* (the non-standard cookie-attributes) is shallow-copied.
        """
        if version is not None:
            version = int(version)
        if expires is not None:
            expires = int(float(expires))
        if port is None and port_specified is True:
            raise ValueError("if port is None, port_specified must be false")

        self.version = version
        self.name = name
        self.value = value
        self.port = port
        self.port_specified = port_specified
        # normalise case, as per RFC 2965 section 3.3.3
        self.domain = domain.lower()
        self.domain_specified = domain_specified
        # Sigh.  We need to know whether the domain given in the
        # cookie-attribute had an initial dot, in order to follow RFC 2965
        # (as clarified in draft errata).  Needed for the returned $Domain
        # value.
        self.domain_initial_dot = domain_initial_dot
        self.path = path
        self.path_specified = path_specified
        self.secure = secure
        self.expires = expires
        self.discard = discard
        self.comment = comment
        self.comment_url = comment_url
        self.rfc2109 = rfc2109

        self._rest = copy.copy(rest)

    def has_nonstandard_attr(self, name):
        """Return whether the non-standard cookie-attribute *name* is set."""
        return name in self._rest

    def get_nonstandard_attr(self, name, default=None):
        """Return non-standard cookie-attribute *name*, or *default*."""
        return self._rest.get(name, default)

    def set_nonstandard_attr(self, name, value):
        """Set non-standard cookie-attribute *name* to *value*."""
        self._rest[name] = value

    def is_expired(self, now=None):
        """Return True if the cookie has an expiry time that is <= *now*.

        *now* defaults to the current time; a cookie whose expires is None
        never counts as expired.
        """
        if now is None:
            now = time.time()
        return self.expires is not None and self.expires <= now

    def __str__(self):
        port_part = "" if self.port is None else ":" + self.port
        limit = self.domain + port_part + self.path
        if self.value is None:
            namevalue = self.name
        else:
            namevalue = "%s=%s" % (self.name, self.value)
        return "<Cookie %s for %s>" % (namevalue, limit)

    def __repr__(self):
        shown = ("version", "name", "value",
                 "port", "port_specified",
                 "domain", "domain_specified", "domain_initial_dot",
                 "path", "path_specified",
                 "secure", "expires", "discard", "comment", "comment_url",
                 )
        parts = ["%s=%r" % (field, getattr(self, field)) for field in shown]
        parts.append("rest=%r" % (self._rest,))
        parts.append("rfc2109=%r" % (self.rfc2109,))
        return "%s(%s)" % (self.__class__.__name__, ", ".join(parts))
class CookiePolicy:
    """Interface deciding which cookies are accepted and which are returned.

    A policy may also modify cookies, though this is probably a bad idea.

    DefaultCookiePolicy is the subclass implementing the standard rules for
    Netscape and RFC 2965 cookies -- derive from that one if you want a
    customized policy.

    """

    def set_ok(self, cookie, request):
        """Return true if (and only if) cookie should be accepted from server.

        Pre-expired cookies currently never reach this method, because the
        CookieJar class removes them itself.

        Must be overridden; this base version raises NotImplementedError.

        """
        raise NotImplementedError()

    def return_ok(self, cookie, request):
        """Return true if (and only if) cookie should be returned to server.

        Must be overridden; this base version raises NotImplementedError.
        """
        raise NotImplementedError()

    def domain_return_ok(self, domain, request):
        """Return false if cookies should not be returned, given cookie domain.

        This base version always returns True.
        """
        return True

    def path_return_ok(self, path, request):
        """Return false if cookies should not be returned, given cookie path.

        This base version always returns True.
        """
        return True
class DefaultCookiePolicy(CookiePolicy):
"""Implements the standard rules for accepting and returning cookies."""
DomainStrictNoDots = 1
DomainStrictNonDomain = 2
DomainRFC2965Match = 4
DomainLiberal = 0
DomainStrict = DomainStrictNoDots|DomainStrictNonDomain
def __init__(self,
blocked_domains=None, allowed_domains=None,
netscape=True, rfc2965=False,
rfc2109_as_netscape=None,
hide_cookie2=False,
strict_domain=False,
strict_rfc2965_unverifiable=True,
strict_ns_unverifiable=False,
strict_ns_domain=DomainLiberal,
strict_ns_set_initial_dollar=False,
strict_ns_set_path=False,
secure_protocols=("https", "wss")
):
"""Constructor arguments should be passed as keyword arguments only."""
self.netscape = netscape
self.rfc2965 = rfc2965
self.rfc2109_as_netscape = rfc2109_as_netscape
self.hide_cookie2 = hide_cookie2
self.strict_domain = strict_domain
self.strict_rfc2965_unverifiable = strict_rfc2965_unverifiable
self.strict_ns_unverifiable = strict_ns_unverifiable
self.strict_ns_domain = strict_ns_domain
self.strict_ns_set_initial_dollar = strict_ns_set_initial_dollar
self.strict_ns_set_path = strict_ns_set_path
self.secure_protocols = secure_protocols
if blocked_domains is not None:
self._blocked_domains = tuple(blocked_domains)
else:
self._blocked_domains = ()
if allowed_domains is not None:
allowed_domains = tuple(allowed_domains)
self._allowed_domains = allowed_domains
def blocked_domains(self):
"""Return the sequence of blocked domains (as a tuple)."""
return self._blocked_domains
def set_blocked_domains(self, blocked_domains):
"""Set the sequence of blocked domains."""
self._blocked_domains = tuple(blocked_domains)
def is_blocked(self, domain):
for blocked_domain in self._blocked_domains:
if user_domain_match(domain, blocked_domain):
return True
return False
def allowed_domains(self):
"""Return None, or the sequence of allowed domains (as a tuple)."""
return self._allowed_domains
def set_allowed_domains(self, allowed_domains):
"""Set the sequence of allowed domains, or None."""
if allowed_domains is not None:
allowed_domains = tuple(allowed_domains)
self._allowed_domains = allowed_domains
def is_not_allowed(self, domain):
if self._allowed_domains is None:
return False
for allowed_domain in self._allowed_domains:
if user_domain_match(domain, allowed_domain):
return False
return True
def set_ok(self, cookie, request):
"""
If you override .set_ok(), be sure to call this method. If it returns
false, so should your subclass (assuming your subclass wants to be more
strict about which cookies to accept).
"""
_debug(" - checking cookie %s=%s", cookie.name, cookie.value)
assert cookie.name is not None
for n in "version", "verifiability", "name", "path", "domain", "port":
fn_name = "set_ok_"+n
fn = getattr(self, fn_name)
if not fn(cookie, request):
return False
return True
def set_ok_version(self, cookie, request):
if cookie.version is None:
# Version is always set to 0 by parse_ns_headers if it's a Netscape
# cookie, so this must be an invalid RFC 2965 cookie.
_debug(" Set-Cookie2 without version attribute (%s=%s)",
cookie.name, cookie.value)
return False
if cookie.version > 0 and not self.rfc2965:
_debug(" RFC 2965 cookies are switched off")
return False
elif cookie.version == 0 and not self.netscape:
_debug(" Netscape cookies are switched off")
return False
return True
def set_ok_verifiability(self, cookie, request):
if request.unverifiable and is_third_party(request):
if cookie.version > 0 and self.strict_rfc2965_unverifiable:
_debug(" third-party RFC 2965 cookie during "
"unverifiable transaction")
return False
elif cookie.version == 0 and self.strict_ns_unverifiable:
_debug(" third-party Netscape cookie during "
"unverifiable transaction")
return False
return True
def set_ok_name(self, cookie, request):
# Try and stop servers setting V0 cookies designed to hack other
# servers that know both V0 and V1 protocols.
if (cookie.version == 0 and self.strict_ns_set_initial_dollar and
cookie.name.startswith("$")):
_debug(" illegal name (starts with '$'): '%s'", cookie.name)
return False
return True
def set_ok_path(self, cookie, request):
if cookie.path_specified:
req_path = request_path(request)
if ((cookie.version > 0 or