[core] Fix HTTP headers and cookie handling
- Remove `Cookie` header from `http_headers` immediately after loading into cookiejar
- Restore compat for `--load-info-json` cookies
- Add more tests
- Fix improper passing of Cookie header by `MailRu` extractor

Closes #7558
Authored by: bashonly, pukkandan
bashonly committed Jul 15, 2023
1 parent 2b029ca commit 6c5211c
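
The underlying issue (GHSA-v8mc-9377-rwjj, referenced in the diff below) is that a `Cookie` entry in the global `http_headers` was sent to every domain yt-dlp contacted. The fix moves such cookies into the cookiejar, scoped to a single domain, before the raw header is dropped. A minimal, stdlib-only sketch of that scoping idea follows; `scope_header_cookies` is illustrative and not part of yt-dlp:

import urllib.parse
from http.cookiejar import Cookie, CookieJar
from http.cookies import SimpleCookie

def scope_header_cookies(jar, cookie_header, url):
    # Move cookies from a raw `Cookie` header string into `jar`, scoped to the URL's host
    hostname = urllib.parse.urlparse(url).hostname
    for name, morsel in SimpleCookie(cookie_header).items():
        jar.set_cookie(Cookie(
            version=0, name=name, value=morsel.value,
            port=None, port_specified=False,
            domain=f'.{hostname}', domain_specified=True, domain_initial_dot=True,
            path='/', path_specified=True, secure=False, expires=None,
            discard=False, comment=None, comment_url=None, rest={}))

headers = {'User-Agent': 'demo', 'Cookie': 'sid=abc123'}
jar = CookieJar()
scope_header_cookies(jar, headers.pop('Cookie'), 'https://example.com/watch?v=1')
# 'sid' now lives in the jar, scoped to .example.com, instead of being a global
# header that would be sent to every requested domain.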
Showing 5 changed files with 120 additions and 33 deletions.
85 changes: 77 additions & 8 deletions test/test_YoutubeDL.py
@@ -11,7 +11,7 @@
import copy
import json

from test.helper import FakeYDL, assertRegexpMatches
from test.helper import FakeYDL, assertRegexpMatches, try_rm
from yt_dlp import YoutubeDL
from yt_dlp.compat import compat_os_name
from yt_dlp.extractor import YoutubeIE
@@ -24,6 +24,8 @@
int_or_none,
match_filter_func,
)
from yt_dlp.utils.traversal import traverse_obj


TEST_URL = 'http://localhost/sample.mp4'

@@ -1227,10 +1229,10 @@ def cookie(name, value, version=None, domain='', path='', secure=False, expires=

_test_url = 'https://yt.dlp/test'

def test(encoded_cookies, cookies, headers=False, round_trip=None, error=None):
def test(encoded_cookies, cookies, *, headers=False, round_trip=None, error_re=None):
def _test():
ydl.cookiejar.clear()
ydl._load_cookies(encoded_cookies, from_headers=headers)
ydl._load_cookies(encoded_cookies, autoscope=headers)
if headers:
ydl._apply_header_cookies(_test_url)
data = {'url': _test_url}
@@ -1245,14 +1247,14 @@ def _test():
ydl.__dict__['_YoutubeDL__header_cookies'] = []

with self.subTest(msg=encoded_cookies):
if not error:
if not error_re:
_test()
return
with self.assertRaisesRegex(Exception, error):
with self.assertRaisesRegex(Exception, error_re):
_test()

test('test=value; Domain=.yt.dlp', [cookie('test', 'value', domain='.yt.dlp')])
test('test=value', [cookie('test', 'value')], error='Unscoped cookies are not allowed')
test('test=value', [cookie('test', 'value')], error_re=r'Unscoped cookies are not allowed')
test('cookie1=value1; Domain=.yt.dlp; Path=/test; cookie2=value2; Domain=.yt.dlp; Path=/', [
cookie('cookie1', 'value1', domain='.yt.dlp', path='/test'),
cookie('cookie2', 'value2', domain='.yt.dlp', path='/')])
@@ -1265,9 +1267,76 @@ def _test():
round_trip='name=""; Domain=.yt.dlp')

test('test=value', [cookie('test', 'value', domain='.yt.dlp')], headers=True)
test('cookie1=value; Domain=.yt.dlp; cookie2=value', [], headers=True, error='Invalid syntax')
test('cookie1=value; Domain=.yt.dlp; cookie2=value', [], headers=True, error_re=r'Invalid syntax')
ydl.deprecated_feature = ydl.report_error
test('test=value', [], headers=True, error='Passing cookies as a header is a potential security risk')
test('test=value', [], headers=True, error_re=r'Passing cookies as a header is a potential security risk')

def test_infojson_cookies(self):
TEST_FILE = 'test_infojson_cookies.info.json'
TEST_URL = 'https://example.com/example.mp4'
COOKIES = 'a=b; Domain=.example.com; c=d; Domain=.example.com'
COOKIE_HEADER = {'Cookie': 'a=b; c=d'}

ydl = FakeYDL()
ydl.process_info = lambda x: ydl._write_info_json('test', x, TEST_FILE)

def make_info(info_header_cookies=False, fmts_header_cookies=False, cookies_field=False):
fmt = {'url': TEST_URL}
if fmts_header_cookies:
fmt['http_headers'] = COOKIE_HEADER
if cookies_field:
fmt['cookies'] = COOKIES
return _make_result([fmt], http_headers=COOKIE_HEADER if info_header_cookies else None)

def test(initial_info, note):
result = {}
result['processed'] = ydl.process_ie_result(initial_info)
self.assertTrue(ydl.cookiejar.get_cookies_for_url(TEST_URL),
msg=f'No cookies set in cookiejar after initial process when {note}')
ydl.cookiejar.clear()
with open(TEST_FILE) as infojson:
result['loaded'] = ydl.sanitize_info(json.load(infojson), True)
result['final'] = ydl.process_ie_result(result['loaded'].copy(), download=False)
self.assertTrue(ydl.cookiejar.get_cookies_for_url(TEST_URL),
msg=f'No cookies set in cookiejar after final process when {note}')
ydl.cookiejar.clear()
for key in ('processed', 'loaded', 'final'):
info = result[key]
self.assertIsNone(
traverse_obj(info, ((None, ('formats', 0)), 'http_headers', 'Cookie'), casesense=False, get_all=False),
msg=f'Cookie header not removed in {key} result when {note}')
self.assertEqual(
traverse_obj(info, ((None, ('formats', 0)), 'cookies'), get_all=False), COOKIES,
msg=f'No cookies field found in {key} result when {note}')

test({'url': TEST_URL, 'http_headers': COOKIE_HEADER, 'id': '1', 'title': 'x'}, 'no formats field')
test(make_info(info_header_cookies=True), 'info_dict header cookies')
test(make_info(fmts_header_cookies=True), 'format header cookies')
test(make_info(info_header_cookies=True, fmts_header_cookies=True), 'info_dict and format header cookies')
test(make_info(info_header_cookies=True, fmts_header_cookies=True, cookies_field=True), 'all cookies fields')
test(make_info(cookies_field=True), 'cookies format field')
test({'url': TEST_URL, 'cookies': COOKIES, 'id': '1', 'title': 'x'}, 'info_dict cookies field only')

try_rm(TEST_FILE)

def test_add_headers_cookie(self):
def check_for_cookie_header(result):
return traverse_obj(result, ((None, ('formats', 0)), 'http_headers', 'Cookie'), casesense=False, get_all=False)

ydl = FakeYDL({'http_headers': {'Cookie': 'a=b'}})
ydl._apply_header_cookies(_make_result([])['webpage_url']) # Scope to input webpage URL: .example.com

fmt = {'url': 'https://example.com/video.mp4'}
result = ydl.process_ie_result(_make_result([fmt]), download=False)
self.assertIsNone(check_for_cookie_header(result), msg='http_headers cookies in result info_dict')
self.assertEqual(result.get('cookies'), 'a=b; Domain=.example.com', msg='No cookies were set in cookies field')
self.assertIn('a=b', ydl.cookiejar.get_cookie_header(fmt['url']), msg='No cookies were set in cookiejar')

fmt = {'url': 'https://wrong.com/video.mp4'}
result = ydl.process_ie_result(_make_result([fmt]), download=False)
self.assertIsNone(check_for_cookie_header(result), msg='http_headers cookies for wrong domain')
self.assertFalse(result.get('cookies'), msg='Cookies set in cookies field for wrong domain')
self.assertFalse(ydl.cookiejar.get_cookie_header(fmt['url']), msg='Cookies set in cookiejar for wrong domain')


if __name__ == '__main__':
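
These tests cover the user-facing behaviour: a `Cookie` entry passed through `http_headers` is loaded as a header cookie, a deprecation warning is emitted, and the header itself is removed from the params. A rough usage sketch; the commented values are what the new code paths should produce, not captured output:

from yt_dlp import YoutubeDL

ydl = YoutubeDL({'http_headers': {'Cookie': 'sid=abc123'}})
# warns: 'Passing cookies as a header is a potential security risk; ...'
print(ydl.params['http_headers'].get('Cookie'))  # None - the header was stripped
print(len(list(ydl.cookiejar)))  # 0 - the cookie is held back until a URL scopes it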
8 changes: 8 additions & 0 deletions test/test_YoutubeDLCookieJar.py
@@ -53,6 +53,14 @@ def test_get_cookie_header(self):
header = cookiejar.get_cookie_header('https://www.foobar.foobar')
self.assertIn('HTTPONLY_COOKIE', header)

def test_get_cookies_for_url(self):
cookiejar = YoutubeDLCookieJar('./test/testdata/cookies/session_cookies.txt')
cookiejar.load(ignore_discard=True, ignore_expires=True)
cookies = cookiejar.get_cookies_for_url('https://www.foobar.foobar/')
self.assertEqual(len(cookies), 2)
cookies = cookiejar.get_cookies_for_url('https://foobar.foobar/')
self.assertFalse(cookies)


if __name__ == '__main__':
unittest.main()
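
`get_cookies_for_url` is the new jar helper the core code relies on: it returns only the cookies whose domain and path match the given URL. For a plain `http.cookiejar` jar, roughly the same check can be made by letting the jar fill in a request's `Cookie` header; this is a stdlib approximation, not the yt-dlp implementation:

import urllib.request
from http.cookiejar import MozillaCookieJar

jar = MozillaCookieJar('./test/testdata/cookies/session_cookies.txt')
jar.load(ignore_discard=True, ignore_expires=True)

req = urllib.request.Request('https://www.foobar.foobar/')
jar.add_cookie_header(req)       # attaches only the cookies scoped to this URL
print(req.get_header('Cookie'))  # the matching cookies; None if nothing matches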
46 changes: 32 additions & 14 deletions yt_dlp/YoutubeDL.py
@@ -680,14 +680,15 @@ def process_color_policy(stream):

self.params['compat_opts'] = set(self.params.get('compat_opts', ()))
self.params['http_headers'] = HTTPHeaderDict(std_headers, self.params.get('http_headers'))
self.__header_cookies = []
self._load_cookies(self.params['http_headers'].get('Cookie')) # compat
self.params['http_headers'].pop('Cookie', None)

self._request_director = self.build_request_director(
sorted(_REQUEST_HANDLERS.values(), key=lambda rh: rh.RH_NAME.lower()))
if auto_init and auto_init != 'no_verbose_header':
self.print_debug_header()

self.__header_cookies = []
self._load_cookies(traverse_obj(self.params.get('http_headers'), 'cookie', casesense=False)) # compat

def check_deprecated(param, option, suggestion):
if self.params.get(param) is not None:
self.report_warning(f'{option} is deprecated. Use {suggestion} instead')
@@ -1645,18 +1646,19 @@ def progress(msg):
self.to_screen('')
raise

def _load_cookies(self, data, *, from_headers=True):
def _load_cookies(self, data, *, autoscope=True):
"""Loads cookies from a `Cookie` header
This tries to work around the security vulnerability of passing cookies to every domain.
See: https://github.com/yt-dlp/yt-dlp/security/advisories/GHSA-v8mc-9377-rwjj
The unscoped cookies are saved for later to be stored in the jar with a limited scope.
@param data The Cookie header as string to load the cookies from
@param from_headers If `False`, allows Set-Cookie syntax in the cookie string (at least a domain will be required)
@param autoscope If `False`, scope cookies using Set-Cookie syntax and error for cookie without domains
If `True`, save cookies for later to be stored in the jar with a limited scope
If a URL, save cookies in the jar with the domain of the URL
"""
for cookie in LenientSimpleCookie(data).values():
if from_headers and any(cookie.values()):
if autoscope and any(cookie.values()):
raise ValueError('Invalid syntax in Cookie Header')

domain = cookie.get('domain') or ''
@@ -1670,17 +1672,23 @@ def _load_cookies(self, data, *, from_headers=True):

if domain:
self.cookiejar.set_cookie(prepared_cookie)
elif from_headers:
elif autoscope is True:
self.deprecated_feature(
'Passing cookies as a header is a potential security risk; '
'they will be scoped to the domain of the downloaded urls. '
'Please consider loading cookies from a file or browser instead.')
self.__header_cookies.append(prepared_cookie)
elif autoscope:
self.report_warning(
'The extractor result contains an unscoped cookie as an HTTP header. '
f'If you are using yt-dlp with an input URL{bug_reports_message(before=",")}',
only_once=True)
self._apply_header_cookies(autoscope, [prepared_cookie])
else:
self.report_error('Unscoped cookies are not allowed; please specify some sort of scoping',
tb=False, is_error=False)

def _apply_header_cookies(self, url):
def _apply_header_cookies(self, url, cookies=None):
"""Applies stray header cookies to the provided url
This loads header cookies and scopes them to the domain provided in `url`.
@@ -1691,7 +1699,7 @@ def _apply_header_cookies(self, url):
if not parsed.hostname:
return

for cookie in map(copy.copy, self.__header_cookies):
for cookie in map(copy.copy, cookies or self.__header_cookies):
cookie.domain = f'.{parsed.hostname}'
self.cookiejar.set_cookie(cookie)

@@ -2481,9 +2489,16 @@ def restore_last_token(self):
parsed_selector = _parse_format_selection(iter(TokenIterator(tokens)))
return _build_selector_function(parsed_selector)

def _calc_headers(self, info_dict):
def _calc_headers(self, info_dict, load_cookies=False):
res = HTTPHeaderDict(self.params['http_headers'], info_dict.get('http_headers'))
clean_headers(res)

if load_cookies: # For --load-info-json
self._load_cookies(res.get('Cookie'), autoscope=info_dict['url']) # compat
self._load_cookies(info_dict.get('cookies'), autoscope=False)
# The `Cookie` header is removed to prevent leaks and unscoped cookies.
# See: https://github.com/yt-dlp/yt-dlp/security/advisories/GHSA-v8mc-9377-rwjj
res.pop('Cookie', None)
cookies = self.cookiejar.get_cookies_for_url(info_dict['url'])
if cookies:
encoder = LenientSimpleCookie()
@@ -2762,7 +2777,12 @@ def is_wellformed(f):
and info_dict.get('duration') and format.get('tbr')
and not format.get('filesize') and not format.get('filesize_approx')):
format['filesize_approx'] = int(info_dict['duration'] * format['tbr'] * (1024 / 8))
format['http_headers'] = self._calc_headers(collections.ChainMap(format, info_dict))
format['http_headers'] = self._calc_headers(collections.ChainMap(format, info_dict), load_cookies=True)

# Safeguard against old/insecure infojson when using --load-info-json
if info_dict.get('http_headers'):
info_dict['http_headers'] = HTTPHeaderDict(info_dict['http_headers'])
info_dict['http_headers'].pop('Cookie', None)

# This is copied to http_headers by the above _calc_headers and can now be removed
if '__x_forwarded_for_ip' in info_dict:
@@ -3508,8 +3528,6 @@ def download_with_info_file(self, info_filename):
infos = [self.sanitize_info(info, self.params.get('clean_infojson', True))
for info in variadic(json.loads('\n'.join(f)))]
for info in infos:
self._load_cookies(info.get('cookies'), from_headers=False)
self._load_cookies(traverse_obj(info.get('http_headers'), 'Cookie', casesense=False)) # compat
try:
self.__download_wrapper(self.process_ie_result)(info, download=True)
except (DownloadError, EntryNotInPlaylist, ReExtractInfo) as e:
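
The net effect in `_calc_headers` is that the `Cookie` header is always dropped from `http_headers`, and per-URL cookies are instead surfaced through the `cookies` field (the `'a=b; Domain=.example.com'` form the tests above expect). A simplified, hypothetical illustration of that serialization — real matching also honours path, secure and expiry rules, which `get_cookies_for_url` handles:

import urllib.parse
from http.cookiejar import CookieJar

def cookies_field_for_url(jar: CookieJar, url: str) -> str:
    # Serialize the cookies that (approximately) apply to `url` as
    # 'name=value; Domain=...' pairs, the format used by the `cookies` field
    host = urllib.parse.urlparse(url).hostname or ''
    parts = [
        f'{c.name}={c.value}; Domain={c.domain}'
        for c in jar
        if host.endswith(c.domain.lstrip('.'))
    ]
    return '; '.join(parts)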
6 changes: 0 additions & 6 deletions yt_dlp/downloader/common.py
@@ -32,7 +32,6 @@
timetuple_from_msec,
try_call,
)
from ..utils.traversal import traverse_obj


class FileDownloader:
@@ -453,11 +452,6 @@ def download(self, filename, info_dict, subtitle=False):
self.to_screen(f'[download] Sleeping {sleep_interval:.2f} seconds ...')
time.sleep(sleep_interval)

# Filter the `Cookie` header from the info_dict to prevent leaks.
# See: https://github.com/yt-dlp/yt-dlp/security/advisories/GHSA-v8mc-9377-rwjj
info_dict['http_headers'] = dict(traverse_obj(info_dict, (
'http_headers', {dict.items}, lambda _, pair: pair[0].lower() != 'cookie'))) or None

ret = self.real_download(filename, info_dict)
self._finish_multiline_status()
return ret, True
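
The lines removed from the downloader performed a case-insensitive strip of `Cookie` from `info_dict['http_headers']` just before the download; that responsibility now lives upstream in `_calc_headers`. The removed `traverse_obj` expression is roughly equivalent to this plain-dict filter (hypothetical helper name):

def drop_cookie_header(headers):
    # Case-insensitively remove any `Cookie` key; return None if nothing is left
    return {k: v for k, v in (headers or {}).items() if k.lower() != 'cookie'} or None

print(drop_cookie_header({'cookie': 'a=b', 'Referer': 'https://example.com'}))
# {'Referer': 'https://example.com'}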
8 changes: 3 additions & 5 deletions yt_dlp/extractor/mailru.py
@@ -1,6 +1,7 @@
import itertools
import json
import re
import urllib.parse

from .common import InfoExtractor
from ..compat import compat_urllib_parse_unquote
@@ -140,25 +141,22 @@ def _real_extract(self, url):
'http://api.video.mail.ru/videos/%s.json?new=1' % video_id,
video_id, 'Downloading video JSON')

headers = {}

video_key = self._get_cookies('https://my.mail.ru').get('video_key')
if video_key:
headers['Cookie'] = 'video_key=%s' % video_key.value

formats = []
for f in video_data['videos']:
video_url = f.get('url')
if not video_url:
continue
if video_key:
self._set_cookie(urllib.parse.urlparse(video_url).hostname, 'video_key', video_key.value)
format_id = f.get('key')
height = int_or_none(self._search_regex(
r'^(\d+)[pP]$', format_id, 'height', default=None)) if format_id else None
formats.append({
'url': video_url,
'format_id': format_id,
'height': height,
'http_headers': headers,
})

meta_data = video_data['meta']
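
The MailRu change shows the extractor-side pattern this commit standardizes on: rather than injecting a raw `Cookie` header into each format's `http_headers`, the cookie is stored in the jar scoped to the media host. A trimmed, hypothetical extractor following the same pattern (`HypotheticalIE` and its URLs are placeholders):

import urllib.parse

from yt_dlp.extractor.common import InfoExtractor

class HypotheticalIE(InfoExtractor):
    _VALID_URL = r'https?://example\.com/video/(?P<id>\w+)'

    def _real_extract(self, url):
        video_id = self._match_id(url)
        video_url = f'https://cdn.example.com/{video_id}.mp4'  # placeholder
        video_key = self._get_cookies('https://my.example.com').get('video_key')
        if video_key:
            # Scope the cookie to the host serving the media instead of
            # attaching a `Cookie` header to the format
            self._set_cookie(urllib.parse.urlparse(video_url).hostname, 'video_key', video_key.value)
        return {'id': video_id, 'title': video_id, 'url': video_url}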
