From b1fdd541d425ed22194630ccfd50eed0b50607eb Mon Sep 17 00:00:00 2001 From: Pavel Feldman Date: Tue, 19 Jan 2021 16:29:32 -0800 Subject: [PATCH] test: add some proxy tests --- tests/async/conftest.py | 28 ++++- tests/async/test_browsercontext_proxy.py | 123 ++++++++++++++++++++++ tests/async/test_interception.py | 7 -- tests/async/test_proxy.py | 124 +++++++++++++++++++++++ tests/conftest.py | 1 - tests/server.py | 27 ++--- 6 files changed, 286 insertions(+), 24 deletions(-) create mode 100644 tests/async/test_browsercontext_proxy.py create mode 100644 tests/async/test_proxy.py diff --git a/tests/async/conftest.py b/tests/async/conftest.py index f6dbec34e..1048be55c 100644 --- a/tests/async/conftest.py +++ b/tests/async/conftest.py @@ -48,10 +48,16 @@ def browser_type(playwright, browser_name: str): @pytest.fixture(scope="session") async def browser_factory(launch_arguments, browser_type): + browsers = [] + async def launch(**kwargs): - return await browser_type.launch(**launch_arguments, **kwargs) + browser = await browser_type.launch(**launch_arguments, **kwargs) + browsers.append(browser) + return browser - return launch + yield launch + for browser in browsers: + await browser.close() @pytest.fixture(scope="session") @@ -62,8 +68,22 @@ async def browser(browser_factory): @pytest.fixture -async def context(browser): - context = await browser.new_context() +async def context_factory(browser): + contexts = [] + + async def launch(**kwargs): + context = await browser.new_context(**kwargs) + contexts.append(context) + return context + + yield launch + for context in contexts: + await context.close() + + +@pytest.fixture +async def context(context_factory): + context = await context_factory() yield context await context.close() diff --git a/tests/async/test_browsercontext_proxy.py b/tests/async/test_browsercontext_proxy.py new file mode 100644 index 000000000..a0edb4521 --- /dev/null +++ b/tests/async/test_browsercontext_proxy.py @@ -0,0 +1,123 @@ +# Copyright (c) Microsoft Corporation. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import base64 + +import pytest + + +@pytest.fixture(scope="session") +async def browser(browser_factory): + browser = await browser_factory(proxy={"server": "dummy"}) + yield browser + await browser.close() + + +async def test_should_use_proxy(context_factory, server): + server.set_route( + "/target.html", + lambda r: ( + r.write(b"Served by the proxy"), + r.finish(), + ), + ) + context = await context_factory(proxy={"server": f"localhost:{server.PORT}"}) + page = await context.new_page() + await page.goto("http://non-existent.com/target.html") + assert await page.title() == "Served by the proxy" + + +async def test_should_use_proxy_for_second_page(context_factory, server): + server.set_route( + "/target.html", + lambda r: ( + r.write(b"Served by the proxy"), + r.finish(), + ), + ) + context = await context_factory(proxy={"server": f"localhost:{server.PORT}"}) + + page1 = await context.new_page() + await page1.goto("http://non-existent.com/target.html") + assert await page1.title() == "Served by the proxy" + + page2 = await context.new_page() + await page2.goto("http://non-existent.com/target.html") + assert await page2.title() == "Served by the proxy" + + +async def test_should_work_with_ip_port_notion(context_factory, server): + server.set_route( + "/target.html", + lambda r: ( + r.write(b"Served by the proxy"), + r.finish(), + ), + ) + context = await context_factory(proxy={"server": f"127.0.0.1:{server.PORT}"}) + page = await context.new_page() + await page.goto("http://non-existent.com/target.html") + assert await page.title() == "Served by the proxy" + + +async def test_should_authenticate(context_factory, server): + def handler(req): + print(req) + auth = req.getHeader("proxy-authorization") + if not auth: + req.setHeader( + b"Proxy-Authenticate", b'Basic realm="Access to internal site"' + ) + req.setResponseCode(407) + else: + req.write(f"{auth}".encode("utf-8")) + req.finish() + + server.set_route("/target.html", handler) + + context = await context_factory( + proxy={ + "server": f"localhost:{server.PORT}", + "username": "user", + "password": "secret", + } + ) + page = await context.new_page() + await page.goto("http://non-existent.com/target.html") + assert await page.title() == "Basic " + base64.b64encode(b"user:secret").decode( + "utf-8" + ) + + +async def test_should_authenticate_with_empty_password(context_factory, server): + def handler(req): + print(req) + auth = req.getHeader("proxy-authorization") + if not auth: + req.setHeader( + b"Proxy-Authenticate", b'Basic realm="Access to internal site"' + ) + req.setResponseCode(407) + else: + req.write(f"{auth}".encode("utf-8")) + req.finish() + + server.set_route("/target.html", handler) + + context = await context_factory( + proxy={"server": f"localhost:{server.PORT}", "username": "user", "password": ""} + ) + page = await context.new_page() + await page.goto("http://non-existent.com/target.html") + assert await page.title() == "Basic " + base64.b64encode(b"user:").decode("utf-8") diff --git a/tests/async/test_interception.py b/tests/async/test_interception.py index 80d1145b5..9304bc544 100644 --- a/tests/async/test_interception.py +++ b/tests/async/test_interception.py @@ -460,13 +460,6 @@ async def test_page_route_should_work_with_encoded_server(page, server): assert response.status == 404 -async def test_page_route_should_work_with_badly_encoded_server(page, server): - server.set_route("/malformed?rnd=%911", lambda req: req.finish()) - await page.route("**/*", lambda route: route.continue_()) - response = await page.goto(server.PREFIX + "/malformed?rnd=%911") - assert response.status == 200 - - async def test_page_route_should_work_with_encoded_server___2(page, server): # The requestWillBeSent will report URL as-is, whereas interception will # report encoded URL for stylesheet. @see crbug.com/759388 diff --git a/tests/async/test_proxy.py b/tests/async/test_proxy.py new file mode 100644 index 000000000..4a9ead417 --- /dev/null +++ b/tests/async/test_proxy.py @@ -0,0 +1,124 @@ +# Copyright (c) Microsoft Corporation. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import base64 + +import pytest + +from playwright.async_api import Error + + +async def test_should_throw_for_bad_server_value(browser_factory): + with pytest.raises(Error) as exc_info: + await browser_factory(proxy={"server": 123}) + assert "proxy.server: expected string, got number" in exc_info.value.message + + +async def test_should_use_proxy(browser_factory, server): + server.set_route( + "/target.html", + lambda r: ( + r.write(b"Served by the proxy"), + r.finish(), + ), + ) + browser = await browser_factory(proxy={"server": f"localhost:{server.PORT}"}) + page = await browser.new_page() + await page.goto("http://non-existent.com/target.html") + assert await page.title() == "Served by the proxy" + + +async def test_should_use_proxy_for_second_page(browser_factory, server): + server.set_route( + "/target.html", + lambda r: ( + r.write(b"Served by the proxy"), + r.finish(), + ), + ) + browser = await browser_factory(proxy={"server": f"localhost:{server.PORT}"}) + + page1 = await browser.new_page() + await page1.goto("http://non-existent.com/target.html") + assert await page1.title() == "Served by the proxy" + + page2 = await browser.new_page() + await page2.goto("http://non-existent.com/target.html") + assert await page2.title() == "Served by the proxy" + + +async def test_should_work_with_ip_port_notion(browser_factory, server): + server.set_route( + "/target.html", + lambda r: ( + r.write(b"Served by the proxy"), + r.finish(), + ), + ) + browser = await browser_factory(proxy={"server": f"127.0.0.1:{server.PORT}"}) + page = await browser.new_page() + await page.goto("http://non-existent.com/target.html") + assert await page.title() == "Served by the proxy" + + +async def test_should_authenticate(browser_factory, server): + def handler(req): + print(req) + auth = req.getHeader("proxy-authorization") + if not auth: + req.setHeader( + b"Proxy-Authenticate", b'Basic realm="Access to internal site"' + ) + req.setResponseCode(407) + else: + req.write(f"{auth}".encode("utf-8")) + req.finish() + + server.set_route("/target.html", handler) + + browser = await browser_factory( + proxy={ + "server": f"localhost:{server.PORT}", + "username": "user", + "password": "secret", + } + ) + page = await browser.new_page() + await page.goto("http://non-existent.com/target.html") + assert await page.title() == "Basic " + base64.b64encode(b"user:secret").decode( + "utf-8" + ) + + +async def test_should_authenticate_with_empty_password(browser_factory, server): + def handler(req): + print(req) + auth = req.getHeader("proxy-authorization") + if not auth: + req.setHeader( + b"Proxy-Authenticate", b'Basic realm="Access to internal site"' + ) + req.setResponseCode(407) + else: + req.write(f"{auth}".encode("utf-8")) + req.finish() + + server.set_route("/target.html", handler) + + browser = await browser_factory( + proxy={"server": f"localhost:{server.PORT}", "username": "user", "password": ""} + ) + page = await browser.new_page() + await page.goto("http://non-existent.com/target.html") + assert await page.title() == "Basic " + base64.b64encode(b"user:").decode("utf-8") diff --git a/tests/conftest.py b/tests/conftest.py index 2aa90a115..cccce0d63 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -50,7 +50,6 @@ def assetdir(): def launch_arguments(pytestconfig): return { "headless": not pytestconfig.getoption("--headful"), - "chromium_sandbox": False, } diff --git a/tests/server.py b/tests/server.py index f37a80383..6fc86122e 100644 --- a/tests/server.py +++ b/tests/server.py @@ -20,6 +20,7 @@ import threading from contextlib import closing from http import HTTPStatus +from urllib.parse import urlparse from autobahn.twisted.websocket import WebSocketServerFactory, WebSocketServerProtocol from OpenSSL import crypto @@ -78,18 +79,20 @@ def process(self): request = self self.post_body = request.content.read() request.content.seek(0, 0) - uri = request.uri.decode() - if request_subscribers.get(uri): - request_subscribers[uri].set_result(request) - request_subscribers.pop(uri) + uri = urlparse(request.uri.decode()) + path = uri.path - if auth.get(uri): + if request_subscribers.get(path): + request_subscribers[path].set_result(request) + request_subscribers.pop(path) + + if auth.get(path): authorization_header = request.requestHeaders.getRawHeaders( "authorization" ) creds_correct = False if authorization_header: - creds_correct = auth.get(uri) == ( + creds_correct = auth.get(path) == ( request.getUser(), request.getPassword(), ) @@ -100,19 +103,19 @@ def process(self): request.setResponseCode(HTTPStatus.UNAUTHORIZED) request.finish() return - if csp.get(uri): - request.setHeader(b"Content-Security-Policy", csp[uri]) - if routes.get(uri): - routes[uri](request) + if csp.get(path): + request.setHeader(b"Content-Security-Policy", csp[path]) + if routes.get(path): + routes[path](request) return file_content = None try: file_content = ( static_path / request.path.decode()[1:] ).read_bytes() - request.setHeader(b"Content-Type", mimetypes.guess_type(uri)[0]) + request.setHeader(b"Content-Type", mimetypes.guess_type(path)[0]) request.setHeader(b"Cache-Control", "no-cache, no-store") - if uri in gzip_routes: + if path in gzip_routes: request.setHeader("Content-Encoding", "gzip") request.write(gzip.compress(file_content)) else: