From 6c5211cebeacfc53ad5d5ddf4a659be76039656f Mon Sep 17 00:00:00 2001 From: bashonly Date: Sat, 15 Jul 2023 15:22:10 -0500 Subject: [PATCH] [core] Fix HTTP headers and cookie handling - Remove `Cookie` header from `http_headers` immediately after loading into cookiejar - Restore compat for `--load-info-json` cookies - Add more tests - Fix improper passing of Cookie header by `MailRu` extractor Closes #7558 Authored by: bashonly, pukkandan --- test/test_YoutubeDL.py | 85 +++++++++++++++++++++++++++++---- test/test_YoutubeDLCookieJar.py | 8 ++++ yt_dlp/YoutubeDL.py | 46 ++++++++++++------ yt_dlp/downloader/common.py | 6 --- yt_dlp/extractor/mailru.py | 8 ++-- 5 files changed, 120 insertions(+), 33 deletions(-) diff --git a/test/test_YoutubeDL.py b/test/test_YoutubeDL.py index c15c7704c54..b4f770ca585 100644 --- a/test/test_YoutubeDL.py +++ b/test/test_YoutubeDL.py @@ -11,7 +11,7 @@ import copy import json -from test.helper import FakeYDL, assertRegexpMatches +from test.helper import FakeYDL, assertRegexpMatches, try_rm from yt_dlp import YoutubeDL from yt_dlp.compat import compat_os_name from yt_dlp.extractor import YoutubeIE @@ -24,6 +24,8 @@ int_or_none, match_filter_func, ) +from yt_dlp.utils.traversal import traverse_obj + TEST_URL = 'http://localhost/sample.mp4' @@ -1227,10 +1229,10 @@ def cookie(name, value, version=None, domain='', path='', secure=False, expires= _test_url = 'https://yt.dlp/test' - def test(encoded_cookies, cookies, headers=False, round_trip=None, error=None): + def test(encoded_cookies, cookies, *, headers=False, round_trip=None, error_re=None): def _test(): ydl.cookiejar.clear() - ydl._load_cookies(encoded_cookies, from_headers=headers) + ydl._load_cookies(encoded_cookies, autoscope=headers) if headers: ydl._apply_header_cookies(_test_url) data = {'url': _test_url} @@ -1245,14 +1247,14 @@ def _test(): ydl.__dict__['_YoutubeDL__header_cookies'] = [] with self.subTest(msg=encoded_cookies): - if not error: + if not error_re: _test() return - with self.assertRaisesRegex(Exception, error): + with self.assertRaisesRegex(Exception, error_re): _test() test('test=value; Domain=.yt.dlp', [cookie('test', 'value', domain='.yt.dlp')]) - test('test=value', [cookie('test', 'value')], error='Unscoped cookies are not allowed') + test('test=value', [cookie('test', 'value')], error_re=r'Unscoped cookies are not allowed') test('cookie1=value1; Domain=.yt.dlp; Path=/test; cookie2=value2; Domain=.yt.dlp; Path=/', [ cookie('cookie1', 'value1', domain='.yt.dlp', path='/test'), cookie('cookie2', 'value2', domain='.yt.dlp', path='/')]) @@ -1265,9 +1267,76 @@ def _test(): round_trip='name=""; Domain=.yt.dlp') test('test=value', [cookie('test', 'value', domain='.yt.dlp')], headers=True) - test('cookie1=value; Domain=.yt.dlp; cookie2=value', [], headers=True, error='Invalid syntax') + test('cookie1=value; Domain=.yt.dlp; cookie2=value', [], headers=True, error_re=r'Invalid syntax') ydl.deprecated_feature = ydl.report_error - test('test=value', [], headers=True, error='Passing cookies as a header is a potential security risk') + test('test=value', [], headers=True, error_re=r'Passing cookies as a header is a potential security risk') + + def test_infojson_cookies(self): + TEST_FILE = 'test_infojson_cookies.info.json' + TEST_URL = 'https://example.com/example.mp4' + COOKIES = 'a=b; Domain=.example.com; c=d; Domain=.example.com' + COOKIE_HEADER = {'Cookie': 'a=b; c=d'} + + ydl = FakeYDL() + ydl.process_info = lambda x: ydl._write_info_json('test', x, TEST_FILE) + + def make_info(info_header_cookies=False, fmts_header_cookies=False, cookies_field=False): + fmt = {'url': TEST_URL} + if fmts_header_cookies: + fmt['http_headers'] = COOKIE_HEADER + if cookies_field: + fmt['cookies'] = COOKIES + return _make_result([fmt], http_headers=COOKIE_HEADER if info_header_cookies else None) + + def test(initial_info, note): + result = {} + result['processed'] = ydl.process_ie_result(initial_info) + self.assertTrue(ydl.cookiejar.get_cookies_for_url(TEST_URL), + msg=f'No cookies set in cookiejar after initial process when {note}') + ydl.cookiejar.clear() + with open(TEST_FILE) as infojson: + result['loaded'] = ydl.sanitize_info(json.load(infojson), True) + result['final'] = ydl.process_ie_result(result['loaded'].copy(), download=False) + self.assertTrue(ydl.cookiejar.get_cookies_for_url(TEST_URL), + msg=f'No cookies set in cookiejar after final process when {note}') + ydl.cookiejar.clear() + for key in ('processed', 'loaded', 'final'): + info = result[key] + self.assertIsNone( + traverse_obj(info, ((None, ('formats', 0)), 'http_headers', 'Cookie'), casesense=False, get_all=False), + msg=f'Cookie header not removed in {key} result when {note}') + self.assertEqual( + traverse_obj(info, ((None, ('formats', 0)), 'cookies'), get_all=False), COOKIES, + msg=f'No cookies field found in {key} result when {note}') + + test({'url': TEST_URL, 'http_headers': COOKIE_HEADER, 'id': '1', 'title': 'x'}, 'no formats field') + test(make_info(info_header_cookies=True), 'info_dict header cokies') + test(make_info(fmts_header_cookies=True), 'format header cookies') + test(make_info(info_header_cookies=True, fmts_header_cookies=True), 'info_dict and format header cookies') + test(make_info(info_header_cookies=True, fmts_header_cookies=True, cookies_field=True), 'all cookies fields') + test(make_info(cookies_field=True), 'cookies format field') + test({'url': TEST_URL, 'cookies': COOKIES, 'id': '1', 'title': 'x'}, 'info_dict cookies field only') + + try_rm(TEST_FILE) + + def test_add_headers_cookie(self): + def check_for_cookie_header(result): + return traverse_obj(result, ((None, ('formats', 0)), 'http_headers', 'Cookie'), casesense=False, get_all=False) + + ydl = FakeYDL({'http_headers': {'Cookie': 'a=b'}}) + ydl._apply_header_cookies(_make_result([])['webpage_url']) # Scope to input webpage URL: .example.com + + fmt = {'url': 'https://example.com/video.mp4'} + result = ydl.process_ie_result(_make_result([fmt]), download=False) + self.assertIsNone(check_for_cookie_header(result), msg='http_headers cookies in result info_dict') + self.assertEqual(result.get('cookies'), 'a=b; Domain=.example.com', msg='No cookies were set in cookies field') + self.assertIn('a=b', ydl.cookiejar.get_cookie_header(fmt['url']), msg='No cookies were set in cookiejar') + + fmt = {'url': 'https://wrong.com/video.mp4'} + result = ydl.process_ie_result(_make_result([fmt]), download=False) + self.assertIsNone(check_for_cookie_header(result), msg='http_headers cookies for wrong domain') + self.assertFalse(result.get('cookies'), msg='Cookies set in cookies field for wrong domain') + self.assertFalse(ydl.cookiejar.get_cookie_header(fmt['url']), msg='Cookies set in cookiejar for wrong domain') if __name__ == '__main__': diff --git a/test/test_YoutubeDLCookieJar.py b/test/test_YoutubeDLCookieJar.py index 2c73d7d8534..0b7a0acdb51 100644 --- a/test/test_YoutubeDLCookieJar.py +++ b/test/test_YoutubeDLCookieJar.py @@ -53,6 +53,14 @@ def test_get_cookie_header(self): header = cookiejar.get_cookie_header('https://www.foobar.foobar') self.assertIn('HTTPONLY_COOKIE', header) + def test_get_cookies_for_url(self): + cookiejar = YoutubeDLCookieJar('./test/testdata/cookies/session_cookies.txt') + cookiejar.load(ignore_discard=True, ignore_expires=True) + cookies = cookiejar.get_cookies_for_url('https://www.foobar.foobar/') + self.assertEqual(len(cookies), 2) + cookies = cookiejar.get_cookies_for_url('https://foobar.foobar/') + self.assertFalse(cookies) + if __name__ == '__main__': unittest.main() diff --git a/yt_dlp/YoutubeDL.py b/yt_dlp/YoutubeDL.py index c49960782d9..1a2f42fe9a4 100644 --- a/yt_dlp/YoutubeDL.py +++ b/yt_dlp/YoutubeDL.py @@ -680,14 +680,15 @@ def process_color_policy(stream): self.params['compat_opts'] = set(self.params.get('compat_opts', ())) self.params['http_headers'] = HTTPHeaderDict(std_headers, self.params.get('http_headers')) + self.__header_cookies = [] + self._load_cookies(self.params['http_headers'].get('Cookie')) # compat + self.params['http_headers'].pop('Cookie', None) + self._request_director = self.build_request_director( sorted(_REQUEST_HANDLERS.values(), key=lambda rh: rh.RH_NAME.lower())) if auto_init and auto_init != 'no_verbose_header': self.print_debug_header() - self.__header_cookies = [] - self._load_cookies(traverse_obj(self.params.get('http_headers'), 'cookie', casesense=False)) # compat - def check_deprecated(param, option, suggestion): if self.params.get(param) is not None: self.report_warning(f'{option} is deprecated. Use {suggestion} instead') @@ -1645,18 +1646,19 @@ def progress(msg): self.to_screen('') raise - def _load_cookies(self, data, *, from_headers=True): + def _load_cookies(self, data, *, autoscope=True): """Loads cookies from a `Cookie` header This tries to work around the security vulnerability of passing cookies to every domain. See: https://github.com/yt-dlp/yt-dlp/security/advisories/GHSA-v8mc-9377-rwjj - The unscoped cookies are saved for later to be stored in the jar with a limited scope. @param data The Cookie header as string to load the cookies from - @param from_headers If `False`, allows Set-Cookie syntax in the cookie string (at least a domain will be required) + @param autoscope If `False`, scope cookies using Set-Cookie syntax and error for cookie without domains + If `True`, save cookies for later to be stored in the jar with a limited scope + If a URL, save cookies in the jar with the domain of the URL """ for cookie in LenientSimpleCookie(data).values(): - if from_headers and any(cookie.values()): + if autoscope and any(cookie.values()): raise ValueError('Invalid syntax in Cookie Header') domain = cookie.get('domain') or '' @@ -1670,17 +1672,23 @@ def _load_cookies(self, data, *, from_headers=True): if domain: self.cookiejar.set_cookie(prepared_cookie) - elif from_headers: + elif autoscope is True: self.deprecated_feature( 'Passing cookies as a header is a potential security risk; ' 'they will be scoped to the domain of the downloaded urls. ' 'Please consider loading cookies from a file or browser instead.') self.__header_cookies.append(prepared_cookie) + elif autoscope: + self.report_warning( + 'The extractor result contains an unscoped cookie as an HTTP header. ' + f'If you are using yt-dlp with an input URL{bug_reports_message(before=",")}', + only_once=True) + self._apply_header_cookies(autoscope, [prepared_cookie]) else: self.report_error('Unscoped cookies are not allowed; please specify some sort of scoping', tb=False, is_error=False) - def _apply_header_cookies(self, url): + def _apply_header_cookies(self, url, cookies=None): """Applies stray header cookies to the provided url This loads header cookies and scopes them to the domain provided in `url`. @@ -1691,7 +1699,7 @@ def _apply_header_cookies(self, url): if not parsed.hostname: return - for cookie in map(copy.copy, self.__header_cookies): + for cookie in map(copy.copy, cookies or self.__header_cookies): cookie.domain = f'.{parsed.hostname}' self.cookiejar.set_cookie(cookie) @@ -2481,9 +2489,16 @@ def restore_last_token(self): parsed_selector = _parse_format_selection(iter(TokenIterator(tokens))) return _build_selector_function(parsed_selector) - def _calc_headers(self, info_dict): + def _calc_headers(self, info_dict, load_cookies=False): res = HTTPHeaderDict(self.params['http_headers'], info_dict.get('http_headers')) clean_headers(res) + + if load_cookies: # For --load-info-json + self._load_cookies(res.get('Cookie'), autoscope=info_dict['url']) # compat + self._load_cookies(info_dict.get('cookies'), autoscope=False) + # The `Cookie` header is removed to prevent leaks and unscoped cookies. + # See: https://github.com/yt-dlp/yt-dlp/security/advisories/GHSA-v8mc-9377-rwjj + res.pop('Cookie', None) cookies = self.cookiejar.get_cookies_for_url(info_dict['url']) if cookies: encoder = LenientSimpleCookie() @@ -2762,7 +2777,12 @@ def is_wellformed(f): and info_dict.get('duration') and format.get('tbr') and not format.get('filesize') and not format.get('filesize_approx')): format['filesize_approx'] = int(info_dict['duration'] * format['tbr'] * (1024 / 8)) - format['http_headers'] = self._calc_headers(collections.ChainMap(format, info_dict)) + format['http_headers'] = self._calc_headers(collections.ChainMap(format, info_dict), load_cookies=True) + + # Safeguard against old/insecure infojson when using --load-info-json + if info_dict.get('http_headers'): + info_dict['http_headers'] = HTTPHeaderDict(info_dict['http_headers']) + info_dict['http_headers'].pop('Cookie', None) # This is copied to http_headers by the above _calc_headers and can now be removed if '__x_forwarded_for_ip' in info_dict: @@ -3508,8 +3528,6 @@ def download_with_info_file(self, info_filename): infos = [self.sanitize_info(info, self.params.get('clean_infojson', True)) for info in variadic(json.loads('\n'.join(f)))] for info in infos: - self._load_cookies(info.get('cookies'), from_headers=False) - self._load_cookies(traverse_obj(info.get('http_headers'), 'Cookie', casesense=False)) # compat try: self.__download_wrapper(self.process_ie_result)(info, download=True) except (DownloadError, EntryNotInPlaylist, ReExtractInfo) as e: diff --git a/yt_dlp/downloader/common.py b/yt_dlp/downloader/common.py index 2c404ee9022..b71d7ee8f2a 100644 --- a/yt_dlp/downloader/common.py +++ b/yt_dlp/downloader/common.py @@ -32,7 +32,6 @@ timetuple_from_msec, try_call, ) -from ..utils.traversal import traverse_obj class FileDownloader: @@ -453,11 +452,6 @@ def download(self, filename, info_dict, subtitle=False): self.to_screen(f'[download] Sleeping {sleep_interval:.2f} seconds ...') time.sleep(sleep_interval) - # Filter the `Cookie` header from the info_dict to prevent leaks. - # See: https://github.com/yt-dlp/yt-dlp/security/advisories/GHSA-v8mc-9377-rwjj - info_dict['http_headers'] = dict(traverse_obj(info_dict, ( - 'http_headers', {dict.items}, lambda _, pair: pair[0].lower() != 'cookie'))) or None - ret = self.real_download(filename, info_dict) self._finish_multiline_status() return ret, True diff --git a/yt_dlp/extractor/mailru.py b/yt_dlp/extractor/mailru.py index 387d211fe11..0f0550c921b 100644 --- a/yt_dlp/extractor/mailru.py +++ b/yt_dlp/extractor/mailru.py @@ -1,6 +1,7 @@ import itertools import json import re +import urllib.parse from .common import InfoExtractor from ..compat import compat_urllib_parse_unquote @@ -140,17 +141,15 @@ def _real_extract(self, url): 'http://api.video.mail.ru/videos/%s.json?new=1' % video_id, video_id, 'Downloading video JSON') - headers = {} - video_key = self._get_cookies('https://my.mail.ru').get('video_key') - if video_key: - headers['Cookie'] = 'video_key=%s' % video_key.value formats = [] for f in video_data['videos']: video_url = f.get('url') if not video_url: continue + if video_key: + self._set_cookie(urllib.parse.urlparse(video_url).hostname, 'video_key', video_key.value) format_id = f.get('key') height = int_or_none(self._search_regex( r'^(\d+)[pP]$', format_id, 'height', default=None)) if format_id else None @@ -158,7 +157,6 @@ def _real_extract(self, url): 'url': video_url, 'format_id': format_id, 'height': height, - 'http_headers': headers, }) meta_data = video_data['meta']