diff --git a/test/test_downloader_external.py b/test/test_downloader_external.py new file mode 100644 index 00000000000..e5b02ba5a40 --- /dev/null +++ b/test/test_downloader_external.py @@ -0,0 +1,133 @@ +#!/usr/bin/env python3 + +# Allow direct execution +import os +import sys +import unittest + +sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) + +import http.cookiejar + +from test.helper import FakeYDL +from yt_dlp.downloader.external import ( + Aria2cFD, + AxelFD, + CurlFD, + FFmpegFD, + HttpieFD, + WgetFD, +) + +TEST_COOKIE = { + 'version': 0, + 'name': 'test', + 'value': 'ytdlp', + 'port': None, + 'port_specified': False, + 'domain': '.example.com', + 'domain_specified': True, + 'domain_initial_dot': False, + 'path': '/', + 'path_specified': True, + 'secure': False, + 'expires': None, + 'discard': False, + 'comment': None, + 'comment_url': None, + 'rest': {}, +} + +TEST_INFO = {'url': 'http://www.example.com/'} + + +class TestHttpieFD(unittest.TestCase): + def test_make_cmd(self): + with FakeYDL() as ydl: + downloader = HttpieFD(ydl, {}) + self.assertEqual( + downloader._make_cmd('test', TEST_INFO), + ['http', '--download', '--output', 'test', 'http://www.example.com/']) + + # Test cookie header is added + ydl.cookiejar.set_cookie(http.cookiejar.Cookie(**TEST_COOKIE)) + self.assertEqual( + downloader._make_cmd('test', TEST_INFO), + ['http', '--download', '--output', 'test', 'http://www.example.com/', 'Cookie:test=ytdlp']) + + +class TestAxelFD(unittest.TestCase): + def test_make_cmd(self): + with FakeYDL() as ydl: + downloader = AxelFD(ydl, {}) + self.assertEqual( + downloader._make_cmd('test', TEST_INFO), + ['axel', '-o', 'test', '--', 'http://www.example.com/']) + + # Test cookie header is added + ydl.cookiejar.set_cookie(http.cookiejar.Cookie(**TEST_COOKIE)) + self.assertEqual( + downloader._make_cmd('test', TEST_INFO), + ['axel', '-o', 'test', 'Cookie: test=ytdlp', '--max-redirect=0', '--', 'http://www.example.com/']) + + +class TestWgetFD(unittest.TestCase): + def test_make_cmd(self): + with FakeYDL() as ydl: + downloader = WgetFD(ydl, {}) + self.assertNotIn('--load-cookies', downloader._make_cmd('test', TEST_INFO)) + # Test cookiejar tempfile arg is added + ydl.cookiejar.set_cookie(http.cookiejar.Cookie(**TEST_COOKIE)) + self.assertIn('--load-cookies', downloader._make_cmd('test', TEST_INFO)) + + +class TestCurlFD(unittest.TestCase): + def test_make_cmd(self): + with FakeYDL() as ydl: + downloader = CurlFD(ydl, {}) + self.assertNotIn('--cookie-jar', downloader._make_cmd('test', TEST_INFO)) + # Test cookiejar tempfile arg is added + ydl.cookiejar.set_cookie(http.cookiejar.Cookie(**TEST_COOKIE)) + self.assertIn('--cookie-jar', downloader._make_cmd('test', TEST_INFO)) + + +class TestAria2cFD(unittest.TestCase): + def test_make_cmd(self): + with FakeYDL() as ydl: + downloader = Aria2cFD(ydl, {}) + downloader._make_cmd('test', TEST_INFO) + self.assertFalse(hasattr(downloader, '_cookies_tempfile')) + + # Test cookiejar tempfile arg is added + ydl.cookiejar.set_cookie(http.cookiejar.Cookie(**TEST_COOKIE)) + cmd = downloader._make_cmd('test', TEST_INFO) + self.assertIn(f'--load-cookies={downloader._cookies_tempfile}', cmd) + + +@unittest.skipUnless(FFmpegFD.available(), 'ffmpeg not found') +class TestFFmpegFD(unittest.TestCase): + _args = [] + + def _test_cmd(self, args): + self._args = args + + def test_make_cmd(self): + with FakeYDL() as ydl: + downloader = FFmpegFD(ydl, {}) + downloader._debug_cmd = self._test_cmd + + downloader._call_downloader('test', {**TEST_INFO, 'ext': 'mp4'}) + self.assertEqual(self._args, [ + 'ffmpeg', '-y', '-hide_banner', '-i', 'http://www.example.com/', + '-c', 'copy', '-f', 'mp4', 'file:test']) + + # Test cookies arg is added + ydl.cookiejar.set_cookie(http.cookiejar.Cookie(**TEST_COOKIE)) + downloader._call_downloader('test', {**TEST_INFO, 'ext': 'mp4'}) + self.assertEqual(self._args, [ + 'ffmpeg', '-y', '-hide_banner', '-cookies', 'test=ytdlp; path=/; domain=.example.com;\r\n', + '-i', 'http://www.example.com/', '-c', 'copy', '-f', 'mp4', 'file:test']) + + +if __name__ == '__main__': + unittest.main() diff --git a/yt_dlp/cookies.py b/yt_dlp/cookies.py index f21e4f7e7b8..53fe0ec2d31 100644 --- a/yt_dlp/cookies.py +++ b/yt_dlp/cookies.py @@ -1327,6 +1327,13 @@ def get_cookie_header(self, url): self.add_cookie_header(cookie_req) return cookie_req.get_header('Cookie') + def get_cookies_for_url(self, url): + """Generate a list of Cookie objects for a given url""" + # Policy `_now` attribute must be set before calling `_cookies_for_request` + # Ref: https://github.com/python/cpython/blob/3.7/Lib/http/cookiejar.py#L1360 + self._policy._now = self._now = int(time.time()) + return self._cookies_for_request(urllib.request.Request(escape_url(sanitize_url(url)))) + def clear(self, *args, **kwargs): with contextlib.suppress(KeyError): return super().clear(*args, **kwargs) diff --git a/yt_dlp/downloader/external.py b/yt_dlp/downloader/external.py index f637a100bfa..d4045e58f91 100644 --- a/yt_dlp/downloader/external.py +++ b/yt_dlp/downloader/external.py @@ -1,9 +1,10 @@ import enum import json -import os.path +import os import re import subprocess import sys +import tempfile import time import uuid @@ -42,6 +43,7 @@ class ExternalFD(FragmentFD): def real_download(self, filename, info_dict): self.report_destination(filename) tmpfilename = self.temp_name(filename) + self._cookies_tempfile = None try: started = time.time() @@ -54,6 +56,9 @@ def real_download(self, filename, info_dict): # should take place retval = 0 self.to_screen('[%s] Interrupted by user' % self.get_basename()) + finally: + if self._cookies_tempfile: + self.try_remove(self._cookies_tempfile) if retval == 0: status = { @@ -125,6 +130,16 @@ def _configuration_args(self, keys=None, *args, **kwargs): self.get_basename(), self.params.get('external_downloader_args'), self.EXE_NAME, keys, *args, **kwargs) + def _write_cookies(self): + if not self.ydl.cookiejar.filename: + tmp_cookies = tempfile.NamedTemporaryFile(suffix='.cookies', delete=False) + tmp_cookies.close() + self._cookies_tempfile = tmp_cookies.name + self.to_screen(f'[download] Writing temporary cookies file to "{self._cookies_tempfile}"') + # real_download resets _cookies_tempfile; if it's None then save() will write to cookiejar.filename + self.ydl.cookiejar.save(self._cookies_tempfile) + return self.ydl.cookiejar.filename or self._cookies_tempfile + def _call_downloader(self, tmpfilename, info_dict): """ Either overwrite this or implement _make_cmd """ cmd = [encodeArgument(a) for a in self._make_cmd(tmpfilename, info_dict)] @@ -184,6 +199,8 @@ class CurlFD(ExternalFD): def _make_cmd(self, tmpfilename, info_dict): cmd = [self.exe, '--location', '-o', tmpfilename, '--compressed'] + if self.ydl.cookiejar.get_cookie_header(info_dict['url']): + cmd += ['--cookie-jar', self._write_cookies()] if info_dict.get('http_headers') is not None: for key, val in info_dict['http_headers'].items(): cmd += ['--header', f'{key}: {val}'] @@ -214,6 +231,9 @@ def _make_cmd(self, tmpfilename, info_dict): if info_dict.get('http_headers') is not None: for key, val in info_dict['http_headers'].items(): cmd += ['-H', f'{key}: {val}'] + cookie_header = self.ydl.cookiejar.get_cookie_header(info_dict['url']) + if cookie_header: + cmd += [f'Cookie: {cookie_header}', '--max-redirect=0'] cmd += self._configuration_args() cmd += ['--', info_dict['url']] return cmd @@ -223,7 +243,9 @@ class WgetFD(ExternalFD): AVAILABLE_OPT = '--version' def _make_cmd(self, tmpfilename, info_dict): - cmd = [self.exe, '-O', tmpfilename, '-nv', '--no-cookies', '--compression=auto'] + cmd = [self.exe, '-O', tmpfilename, '-nv', '--compression=auto'] + if self.ydl.cookiejar.get_cookie_header(info_dict['url']): + cmd += ['--load-cookies', self._write_cookies()] if info_dict.get('http_headers') is not None: for key, val in info_dict['http_headers'].items(): cmd += ['--header', f'{key}: {val}'] @@ -279,6 +301,8 @@ def _make_cmd(self, tmpfilename, info_dict): else: cmd += ['--min-split-size', '1M'] + if self.ydl.cookiejar.get_cookie_header(info_dict['url']): + cmd += [f'--load-cookies={self._write_cookies()}'] if info_dict.get('http_headers') is not None: for key, val in info_dict['http_headers'].items(): cmd += ['--header', f'{key}: {val}'] @@ -417,6 +441,14 @@ def _make_cmd(self, tmpfilename, info_dict): if info_dict.get('http_headers') is not None: for key, val in info_dict['http_headers'].items(): cmd += [f'{key}:{val}'] + + # httpie 3.1.0+ removes the Cookie header on redirect, so this should be safe for now. [1] + # If we ever need cookie handling for redirects, we can export the cookiejar into a session. [2] + # 1: https://github.com/httpie/httpie/security/advisories/GHSA-9w4w-cpc8-h2fq + # 2: https://httpie.io/docs/cli/sessions + cookie_header = self.ydl.cookiejar.get_cookie_header(info_dict['url']) + if cookie_header: + cmd += [f'Cookie:{cookie_header}'] return cmd @@ -527,6 +559,11 @@ def _call_downloader(self, tmpfilename, info_dict): selected_formats = info_dict.get('requested_formats') or [info_dict] for i, fmt in enumerate(selected_formats): + cookies = self.ydl.cookiejar.get_cookies_for_url(fmt['url']) + if cookies: + args.extend(['-cookies', ''.join( + f'{cookie.name}={cookie.value}; path={cookie.path}; domain={cookie.domain};\r\n' + for cookie in cookies)]) if fmt.get('http_headers') and re.match(r'^https?://', fmt['url']): # Trailing \r\n after each HTTP header is important to prevent warning from ffmpeg/avconv: # [http @ 00000000003d2fa0] No trailing CRLF found in HTTP header.