From d0dce62a8a35bd4c75f77d389781424d4f705c07 Mon Sep 17 00:00:00 2001 From: Josh Schneier Date: Sat, 21 Oct 2023 20:44:41 -0400 Subject: [PATCH] [ftp] support passwords with urlchars (#1329) --- storages/backends/ftp.py | 47 ++++++++++++++++++++-------------------- tests/test_ftp.py | 24 +++++++++++--------- 2 files changed, 37 insertions(+), 34 deletions(-) diff --git a/storages/backends/ftp.py b/storages/backends/ftp.py index 05f639f1..4d41d375 100644 --- a/storages/backends/ftp.py +++ b/storages/backends/ftp.py @@ -18,8 +18,8 @@ import ftplib import io import os -from urllib.parse import urljoin -from urllib.parse import urlparse +import re +import urllib.parse from django.conf import settings from django.core.exceptions import ImproperlyConfigured @@ -55,29 +55,28 @@ def __init__(self, location=None, base_url=None, encoding=None): def _decode_location(self, location): """Return splitted configuration data from location.""" - splitted_url = urlparse(location) - config = {} - - if splitted_url.scheme not in ("ftp", "aftp", "ftps"): - raise ImproperlyConfigured("FTPStorage works only with FTP protocol!") - if splitted_url.hostname == "": - raise ImproperlyConfigured("You must at least provide hostname!") + splitted_url = re.search( + r"^(?P.+)://(?P.+):(?P.+)@" + r"(?P.+):(?P\d+)/(?P.*)$", + location, + ) + + if splitted_url is None: + raise ImproperlyConfigured("Improperly formatted location URL") + if splitted_url["scheme"] not in ("ftp", "aftp", "ftps"): + raise ImproperlyConfigured("Only ftp, aftp, ftps schemes supported") + if splitted_url["host"] == "": + raise ImproperlyConfigured("You must at least provide host!") - if splitted_url.scheme == "aftp": - config["active"] = True - else: - config["active"] = False - - if splitted_url.scheme == "ftps": - config["secure"] = True - else: - config["secure"] = False + config = {} + config["active"] = splitted_url["scheme"] == "aftp" + config["secure"] = splitted_url["scheme"] == "ftps" - config["path"] = splitted_url.path - config["host"] = splitted_url.hostname - config["user"] = splitted_url.username - config["passwd"] = splitted_url.password - config["port"] = int(splitted_url.port) + config["path"] = splitted_url["path"] or "/" + config["host"] = splitted_url["host"] + config["user"] = splitted_url["user"] + config["passwd"] = splitted_url["passwd"] + config["port"] = int(splitted_url["port"]) return config @@ -235,7 +234,7 @@ def size(self, name): def url(self, name): if self._base_url is None: raise ValueError("This file is not accessible via a URL.") - return urljoin(self._base_url, name).replace("\\", "/") + return urllib.parse.urljoin(self._base_url, name).replace("\\", "/") class FTPStorageFile(File): diff --git a/tests/test_ftp.py b/tests/test_ftp.py index d472f96e..ecfb5167 100644 --- a/tests/test_ftp.py +++ b/tests/test_ftp.py @@ -11,18 +11,22 @@ PASSWORD = "b@r" HOST = "localhost" PORT = 2121 -URL = "ftp://{user}:{passwd}@{host}:{port}/".format( - user=USER, passwd=PASSWORD, host=HOST, port=PORT -) -URL_TLS = "ftps://{user}:{passwd}@{host}:{port}/".format( - user=USER, passwd=PASSWORD, host=HOST, port=PORT -) LIST_FIXTURE = """drwxr-xr-x 2 ftp nogroup 4096 Jul 27 09:46 dir -rw-r--r-- 1 ftp nogroup 1024 Jul 27 09:45 fi -rw-r--r-- 1 ftp nogroup 2048 Jul 27 09:50 fi2""" +def geturl(scheme="ftp", pwd=PASSWORD): + return URL_TEMPLATE.format( + scheme=scheme, user=USER, passwd=pwd, host=HOST, port=PORT + ) + + +URL_TEMPLATE = "{scheme}://{user}:{passwd}@{host}:{port}/" +URL = geturl() + + def list_retrlines(cmd, func): for line in LIST_FIXTURE.splitlines(): func(line) @@ -72,9 +76,9 @@ def test_decode_location_error(self): self.storage._decode_location("foo") with self.assertRaises(ImproperlyConfigured): self.storage._decode_location("http://foo.pt") - # TODO: Cannot not provide a port - # with self.assertRaises(ImproperlyConfigured): - # self.storage._decode_location('ftp://') + + def test_decode_location_urlchars_password(self): + self.storage._decode_location(geturl(pwd="b#r")) @patch("ftplib.FTP") def test_start_connection(self, mock_ftp): @@ -248,7 +252,7 @@ def test_close(self, mock_ftp, mock_storage): class FTPTLSTest(TestCase): def setUp(self): - self.storage = ftp.FTPStorage(location=URL_TLS) + self.storage = ftp.FTPStorage(location=geturl(scheme="ftps")) def test_decode_location(self): wanted_config = {