Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #4060 from mplattner/mapremote-addon
add mapremote addon to modify request URLs
- Loading branch information
Showing
6 changed files
with
203 additions
and
21 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,65 @@ | ||
import os | ||
import re | ||
import typing | ||
|
||
from mitmproxy import exceptions | ||
from mitmproxy import ctx | ||
from mitmproxy.addons.modifyheaders import parse_modify_spec, ModifySpec | ||
|
||
|
||
class MapRemote: | ||
def __init__(self): | ||
self.replacements: typing.List[ModifySpec] = [] | ||
|
||
def load(self, loader): | ||
loader.add_option( | ||
"map_remote", typing.Sequence[str], [], | ||
""" | ||
Replacement pattern of the form "[/flow-filter]/regex/[@]replacement", where | ||
the separator can be any character. The @ allows to provide a file path that | ||
is used to read the replacement string. | ||
""" | ||
) | ||
|
||
def configure(self, updated): | ||
if "map_remote" in updated: | ||
self.replacements = [] | ||
for option in ctx.options.map_remote: | ||
try: | ||
spec = parse_modify_spec(option) | ||
try: | ||
re.compile(spec.subject) | ||
except re.error: | ||
raise ValueError(f"Invalid regular expression: {spec.subject}") | ||
except ValueError as e: | ||
raise exceptions.OptionsError( | ||
f"Cannot parse map_remote option {option}: {e}" | ||
) from e | ||
|
||
self.replacements.append(spec) | ||
|
||
def request(self, flow): | ||
if not flow.reply.has_message: | ||
for spec in self.replacements: | ||
if spec.matches(flow): | ||
self.replace(flow.request, spec.subject, spec.replacement) | ||
|
||
def replace(self, obj, search, repl): | ||
""" | ||
Replaces all matches of the regex search in the url of the request with repl. | ||
Returns: | ||
The number of replacements made. | ||
""" | ||
if repl.startswith(b"@"): | ||
path = os.path.expanduser(repl[1:]) | ||
try: | ||
with open(path, "rb") as f: | ||
repl = f.read() | ||
except IOError: | ||
ctx.log.warn("Could not read replacement file: %s" % repl) | ||
return | ||
|
||
replacements = 0 | ||
obj.url, replacements = re.subn(search, repl, obj.pretty_url.encode("utf8", "surrogateescape"), flags=re.DOTALL) | ||
return replacements |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,72 @@ | ||
import pytest | ||
|
||
from mitmproxy.addons import mapremote | ||
from mitmproxy.test import taddons | ||
from mitmproxy.test import tflow | ||
|
||
|
||
class TestMapRemote: | ||
|
||
def test_configure(self): | ||
mr = mapremote.MapRemote() | ||
with taddons.context(mr) as tctx: | ||
tctx.configure(mr, map_remote=["one/two/three"]) | ||
with pytest.raises(Exception, match="Cannot parse map_remote .* Invalid number"): | ||
tctx.configure(mr, map_remote = ["/"]) | ||
with pytest.raises(Exception, match="Cannot parse map_remote .* Invalid filter"): | ||
tctx.configure(mr, map_remote=["/~b/two/three"]) | ||
with pytest.raises(Exception, match="Cannot parse map_remote .* Invalid regular expression"): | ||
tctx.configure(mr, map_remote=["/foo/+/three"]) | ||
tctx.configure(mr, map_remote=["/a/b/c/"]) | ||
|
||
def test_simple(self): | ||
mr = mapremote.MapRemote() | ||
with taddons.context(mr) as tctx: | ||
tctx.configure( | ||
mr, | ||
map_remote=[ | ||
":example.org/images/:mitmproxy.org/img/", | ||
] | ||
) | ||
f = tflow.tflow() | ||
f.request.url = b"https://example.org/images/test.jpg" | ||
mr.request(f) | ||
assert f.request.url == "https://mitmproxy.org/img/test.jpg" | ||
|
||
|
||
class TestMapRemoteFile: | ||
def test_simple(self, tmpdir): | ||
mr = mapremote.MapRemote() | ||
with taddons.context(mr) as tctx: | ||
tmpfile = tmpdir.join("replacement") | ||
tmpfile.write("mitmproxy.org") | ||
tctx.configure( | ||
mr, | ||
map_remote=[":example.org:@" + str(tmpfile)] | ||
) | ||
f = tflow.tflow() | ||
f.request.url = b"https://example.org/test" | ||
mr.request(f) | ||
assert f.request.url == "https://mitmproxy.org/test" | ||
|
||
@pytest.mark.asyncio | ||
async def test_nonexistent(self, tmpdir): | ||
mr = mapremote.MapRemote() | ||
with taddons.context(mr) as tctx: | ||
with pytest.raises(Exception, match="Invalid file path"): | ||
tctx.configure( | ||
mr, | ||
map_remote=[":~q:example.org:@nonexistent"] | ||
) | ||
|
||
tmpfile = tmpdir.join("replacement") | ||
tmpfile.write("mitmproxy.org") | ||
tctx.configure( | ||
mr, | ||
map_remote=[":example.org:@" + str(tmpfile)] | ||
) | ||
tmpfile.remove() | ||
f = tflow.tflow() | ||
f.request.url = b"https://example.org/test" | ||
mr.request(f) | ||
assert await tctx.master.await_log("could not read") |